This is for a course project that uses MapReduce. My program runs, but it has two problems:
1. Only the first word gets its file name and TF-IDF value in the output; all the other words show NULL and 0.000000. Why is that? (I put a sketch of what I suspect is going on right after the Map class below.)
2. After introducing DistributedCache, the job reports that it cannot find the file. (A sketch of another way to open the cached file follows the full listing.)
(I compile and run it with Eclipse on Ubuntu, single-node Hadoop.)
Here is my code:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

public class TF_IDF {
public static class Map extends Mapper<LongWritable,Text,Text,Text>{
String filename="";//holds the input file name
int allcount=0; //word count for the doc
//private final static IntWritable one =new IntWritable(1);//value
private Text one=new Text("1");
private String word="";//key
private Path[] querypath=null;
private BufferedReader fis;
private Set<String> queryword=new HashSet<String>();
private FileSplit filesplit;
public void setup(Context context)throws IOException, InterruptedException{
Configuration conf=context.getConfiguration();
querypath=DistributedCache.getLocalCacheFiles(conf);
//FileSystem fsopen=FileSystem.getLocal(conf);
//FSDataInputStream in=fsopen.open(path[0]);
String queryfilename=querypath[0].getName().toString();
try{
fis=new BufferedReader(new FileReader(queryfilename));
String word=null;
while((word=fis.readLine())!=null){
queryword.add(word);
}
}catch(IOException ioe){
System.err.println("Caught exception while parsing the cached file '"
+ StringUtils.stringifyException(ioe));
}
}
public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{
String line=value.toString().toLowerCase();
line=line.replaceAll("[\\pP\\p{Punct}]"," ");
line=line.replaceAll("[\\pN\\p{Punct}]"," ");
StringTokenizer itr=new StringTokenizer(line);
filesplit=(FileSplit)context.getInputSplit();
filename=filesplit.getPath().getName();//the input file path is /xxx/xxx/xxx.txt
// InputSplit split=(InputSplit)context.getInputSplit();
//filename=((FileSplit)context.getInputSplit()).getPath().getName();//the input file path is /xxx/xxx/xxx.txt
//FileSplit split=(FileSplit)context.getInputSplit();
// String str=split.getPath().toString();
//filename=str.substring(str.lastIndexOf("/")+1);
//int abc=context.getInputSplit().toString().lastIndexOf(":");
//filename=context.getInputSplit().toString().substring(0, abc);
while(itr.hasMoreTokens()){
if(queryword.contains(itr.nextToken())){
word=itr.nextToken();
word = word+" ";
word=word+filename;
allcount+=1;
context.write(new Text(word),one);
}
}
}//end of map()
public void cleanup(Context context)throws IOException,InterruptedException{
String str="";//just converts allcount to a String here; no space needed
str=String.valueOf(allcount);
context.write(new Text("!"+" "+filename),new Text(str));
}//end of cleanup()
}//end of Map class
//the map output is <A doc1, 1>, <B doc1, 1>, ... where "A doc1" is the key
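About problem 1: one thing I am not sure of is that inside the while loop I call itr.nextToken() twice, once in the contains() test and once when assigning word, so the token that actually gets written is the one after the match (and the loop can even run out of tokens). I have not verified that this is the cause, but here is an untested sketch of the loop consuming each token exactly once:
while(itr.hasMoreTokens()){
String token=itr.nextToken();//read the current token exactly once
if(queryword.contains(token)){
word=token+" "+filename;//same "word filename" key format as before
allcount+=1;
context.write(new Text(word),one);
}
}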
private static class Combiner extends Reducer<Text,Text,Text,Text>{
double all=0;//holds the total word count
public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{
Set<Text> valueset=new TreeSet<Text>();
for(Text val:values){
valueset.add(val);
}
//get the value of all
int index=key.toString().indexOf(" ");
if(key.toString().substring(index-1, index).equals("!")){//I moved the "!" to the front, so the original approach can't be used
for(Text val:valueset){
all=Integer.parseInt(val.toString());
}
return;
}
double sum=0;
for(Text val:valueset){
sum+=Integer.parseInt(val.toString());
}
double tf=sum/all;
tf=Math.round(tf*100000)/100000.00000;
String tfstring=null;//no space needed here either, just converting to a string
tfstring=String.valueOf(tf);
String oldkey[]=key.toString().split(" ");
String newkey=oldkey[0];
String newvalue=oldkey[2]+" "+tfstring;
context.write(new Text(newkey),new Text(newvalue));
}//end of reduce()
}//end of Combiner class
//the Combiner output is <A, doc1 TF>, with A as the key
//after the partitioner, all values with the same key come together to compute that word's IDF
public static class Reduce extends Reducer<Text,Text,Text,Text>{
int a=0;//counts how many files the word appears in; note that at this point the key is just the word
public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{
Set<Text> valueset=new TreeSet<Text>();
for(Text val:values){
a+=1;
valueset.add(val);
}
//double filenumber=context.getNumReduceTasks()+1;
double filenumber=Integer.parseInt(context.getJobName());//the file count is smuggled in via the job name; see the sketch after this class
double idf=Math.log(filenumber/a);//compute the IDF (weight) for this word
double max=0;//track the maximum directly; otherwise it gets hard to handle later with three variables
String docs[]=new String[a+1];
double TFIDFs[]=new double[a+1];//parallel arrays, one-to-one, to find the matching pair
int i=0;//loop index
//one for loop to fill in each pair
//Iterable<Text> valuefromvalues=values;
for(Text val:valueset){
//split the value first
String value[]=val.toString().split(" ");
String doc=value[0];
String tf=value[1];
double tfdouble=Double.parseDouble(tf);
double tfidf=tfdouble*idf;
if(max<tfidf){
max=tfidf;
}
docs[i]=doc;
TFIDFs[i]=tfidf;
i++;
}
//after the loop every value has been split and the max tfidf has been found; now locate which pair it belongs to
int j=0;
for(j=0;j<i;j++){
if(TFIDFs[j]==max){
break;
}
}
String finalvalues=docs[j]+" "+String.format("%.7f",TFIDFs[j]);
context.write(key,new Text(finalvalues));
}//end of reduce()
}//end of Reduce class
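A side note on how the number of documents reaches the reducer: right now I smuggle it through the job name (job.setJobName(filenumber) in main and Integer.parseInt(context.getJobName()) here), which feels fragile. I have not tried it, but I think a Configuration property could carry it instead; the property name "doc.count" below is just something I made up:
//in main(), before creating the Job:
conf.setInt("doc.count", p.length);
//in Reduce.reduce(), instead of parsing the job name:
double filenumber=context.getConfiguration().getInt("doc.count", 1);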
public static void main(String[] args) throws Exception{
Configuration conf=new Configuration();
FileSystem input=FileSystem.get(conf);
FileStatus p[]=input.listStatus(new Path(args[0]));
DistributedCache.addCacheFile(new URI(args[2]), conf);//add in query.txt
Job job=Job.getInstance(conf,"TF_IDF");
job.setJarByClass(TF_IDF.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Combiner.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
String filenumber="";
filenumber+=p.length;
job.setJobName(filenumber);
// job.setNumReduceTasks(p.length+1);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}//end of main()
}//end of class
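For problem 2, my guess is that in setup() I open the file with new FileReader(queryfilename), where queryfilename is only the file's name (querypath[0].getName()), not the full local path that DistributedCache copied it to, so the reader looks in the working directory and fails. I have not confirmed this, but here is an untested sketch of setup() reading from the returned local path instead (same DistributedCache calls I already use):
public void setup(Context context)throws IOException, InterruptedException{
Configuration conf=context.getConfiguration();
Path[] cachefiles=DistributedCache.getLocalCacheFiles(conf);
if(cachefiles!=null&&cachefiles.length>0){
//open the file through its full local path, not just its name
BufferedReader reader=new BufferedReader(new FileReader(cachefiles[0].toString()));
String line;
while((line=reader.readLine())!=null){
queryword.add(line.trim());
}
reader.close();
}
}
I am also not sure whether the URI passed as args[2] needs to be an absolute path on my single-node setup; if getLocalCacheFiles() returns null, that is the first thing I would check.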