2 gll 89 gll_89 于 2013.10.05 16:22 提问

C#中Parallel.For并行处理中读取文件时出现的错误

1、读取“d:/users/v-lingao/from_lei/wordsegmentation/testdata”目录下的所有txt文档,利用Parallel.For并行处理各个txt文档中的内容,每次读取一行存储到string line中,利用line = sr.ReadLine() (StreamReader sr); 没处理一行也入“d:/users/v-lingao/from_lei/wordsegmentation/testdata1”目录下新创建的对应的txt文件中。方法ComputeIDF()实现次功能。

2、读取在“d:/users/v-lingao/from_lei/wordsegmentation/testdata1”目录下创建的txt文件,利用Parallel.For并行处理每个txt文档中的内容,类似于ComputeIDF()方法,利用line = sr.ReadLine().。法ComputingTfIdf()实现此功能。错误也就出现在此方法中,错误提示根据写入文件时编码方式的不同有所改变。

部分代码如下所示:

public static Dictionary ComputeIDF(List stopWordsList)
{
DirectoryInfo di = new DirectoryInfo(@"d:/users/v-lingao/from_lei/wordsegmentation/testdata");
FileInfo[] ff = di.GetFiles("*.txt");
Dictionary featureDoc = new Dictionary();

        Parallel.For(0, ff.Length, (part) =>
        {
            FileInfo file = ff[part];
            Dictionary<string, int> featureFile = new Dictionary<string,int>();  
            string name = file.Name.Substring(file.Name.LastIndexOf("\\") + 1);
            string path = Path.Combine(@"d:/users/v-lingao/from_lei/wordsegmentation/testdata1", name);
            FileStream aFile = new FileStream(path, FileMode.Create);
            StreamWriter sw = new StreamWriter(aFile, Encoding.UTF8);
            int lineCount = 0;
            char[] charArray = new char[] { ' ' };
            StreamReader sr = new StreamReader(file.OpenRead(),Encoding.UTF8);
            string line = sr.ReadLine();
            while (line != null)
            {
​    ​    ​    ​    ​    ​//部分代码省略
​    ​    ​    ​    ​​lineCount++;
                sw.Write(lineCount);
                foreach (KeyValuePair<string, int> keyvalue in featureLine)
                {
                   sw.Write(' ' + keyvalue.Key + ':' + (0.5 + 0.5 * ((float)keyvalue.Value / maxCount)));
                 }
                 sw.WriteLine();
                 line = sr.ReadLine();
            }                                           
                //combine the featureFiles into featureDoc without repeating
                featureDoc.Add(featurename, featureFile[featurename]);                 
                sr.Close();
                sw.Close();    
        });
        Dictionary<string, float> idf = new Dictionary<string, float>();
        foreach (KeyValuePair<string, int> keyvalue in featureDoc)
        {
            idf.Add(keyvalue.Key, (float)Math.Log10((float)sumLine / (float)keyvalue.Value));
        }
        return idf;
    }

这个方法没有问题。接下来是ComputingTfIdf(idf),问题出在这个方法中。
public static void ComputingTfIdf(Dictionary idf)
{
DirectoryInfo dir = new DirectoryInfo(@"d:/users/v-lingao/from_lei/wordsegmentation/testdata1");
FileInfo[] ff = dir.GetFiles("*.txt");
StreamReader sr;

        Parallel.For(0, ff.Length, (part) =>
        {
            FileInfo file = ff[part];
            List<string> idfList = new List<string>();
            idfList.AddRange(idf.Keys);
            int linenum = 0;
            sr = new StreamReader(file.OpenRead(),Encoding.UTF8);
            char[] charArray = new char[] { ' ' };
            char[] charArray1 = new char[] { ':' };


            string name = file.Name.Substring(file.Name.LastIndexOf("\\") + 1);
            string path = Path.Combine(@"d:/users/v-lingao/from_lei/wordsegmentation/idfdata", name);
            FileStream aFile = new FileStream(path, FileMode.Create);
            StreamWriter sw = new StreamWriter(aFile, Encoding.UTF8);
             ** *string line = sr.ReadLine();* **    //这行有时也会出错
            while (line != null)
            {
                linenum++;
                string[] words = line.Split(charArray);
                int i = 1;
                foreach (string word in words)
                {
                    if (i == 1)
                    {
                        sw.Write(word + ' ');
                        i++;
                    }
                    else
                    {
                        string[] wds = word.Split(charArray1);
                        if (wds.Length == 2)
                        {
                            string key = wds[0];
                            if (idf.Keys.Contains(key))
                            {
                                double tfidf = (double)idf[key] * (Convert.ToDouble(wds[1]));
                                sw.Write(idfList.IndexOf(key)+ ':'+tfidf +' ');
                            }
                        }
                    }
                }
                sw.WriteLine();
                 ** *line = sr.ReadLine();* **   //问题常常出现在这行
                }
            }
            sw.Close();
        });
    }

错误提示根据写入文件时编码方式的不同有所改变。当读取、写入文件用UTF8或者Unicode时,写入和读取的都是乱码,并且line = sr.ReadLine()出错,错误提示为: ** The output char buffer is too small to contain the decoded characters, encoding 'Unicode (UTF-8)' fallback 'System.Text.DecoderReplacementFallback' **
很是无语,功能相同的代码,为什么ComputeIDF()方法中line = sr.ReadLine()就不出错。我将编码换成Encoding.GetEncoding("GBK")读写文件不会出现乱码,但line = sr.ReadLine()还是出错,相当无语!
还有就是当不用并行处理Parallel.For,而是用for循环时也不出错。
求大侠帮忙,不胜感激!

1个回答

gll_89
gll_89   2013.10.06 13:08
已采纳

问题在朋友的帮助下已经解决,很感谢我的朋友!
现在把结果和大家分享下,希望遇到类似问题的同仁能从中有所启发。
用并行处理Parallel.For,要特别注意局部变量的位置。在我的代码中sr是在Parallel.For结构外面定义的,这样在执行的过程中几个线程会共享一个sr,最终导致异常的产生。

Csdn user default icon
上传中...
上传图片
插入图片
准确详细的回答,更有利于被提问者采纳,从而获得C币。复制、灌水、广告等回答会被删除,是时候展现真正的技术了!