hadoop 2.9.2 版本
设置输出文件格式时,可以
job.setOutputFormatClass(TextOutputFormat.class); // 默认的输出组件
但是写成 job.setOutputFormatClass(SequenceFileOutputFormat.class); 时会报编译错误。原因:代码导入的是旧 API 的 org.apache.hadoop.mapred.SequenceFileOutputFormat(原始类型,不继承 org.apache.hadoop.mapreduce.OutputFormat);应改用新 API 的 org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat。
错误提示:
The method setOutputFormatClass(Class<? extends OutputFormat>) in the type Job is not applicable for the arguments (Class)
package cn.edu360.mr.indexSequance;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* 构建倒排索引,产生结果; hello a.txt->3 b.txt->4
*
* 分两步,
* 一
* map: 产生K: 单词+文档 V:次数(1)
* reduce: 产生 K:单词+文档 V:总次数
*
*二
* map: 产生 K:单词 V:文档 在文档中出现的次数
* reduce : 输出 单词 文档, 文档中出现的次数
*
*
* @author Administrator
*
*/
public class IndexStep1 {
/**
* 分割单词,
* 输出 单词+文档 次数 1
* @author Administrator
*
*/
public static class IndexMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取maptask所处理的数据任务上下文信息: 文件路径, 偏移量范围
// 若访问数据为数据库,则为: 库名, 表名, 行范围
InputSplit inputSplit = context.getInputSplit(); // InputSplit为抽象类
FileSplit fileSplit = (FileSplit) inputSplit; // 强转inputSplit为FileSplit类型, FileSplit为InputSplit的实现类,针对文件
String filename = fileSplit.getPath().getName(); // 获取文件名
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
word = format(word);
if (word.length() >2) {
context.write(new Text(word + "-" + filename), new IntWritable(1));
}
}
}
}
/**
* 统计文档+单词
*
* 输出: K:文档+单词 V: 单词在该文档中出现的总次数
* @author Administrator
*
*/
public static class IndexReduce1 extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count = count + value.get();
}
context.write(key, new IntWritable(count));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
JobConf jobConf = new JobConf(configuration);
Job job = Job.getInstance(configuration);
job.setMapperClass(IndexMapper1.class);
job.setReducerClass(IndexReduce1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
File file = new File("d:\\testOut");
if (file.exists()) {
deleteDir(file);
}
job.setOutputFormatClass(TextOutputFormat.class); // 默认的输出组件
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("d:\\test1"));
FileOutputFormat.setOutputPath(job, new Path("d:\\testOut"));
job.waitForCompletion(true);
}
// 使用正则表达式去除标点符号,大写转小写
public static String format(String s) {
String str = s.replaceAll("\\pP|\\pS", "");
return str.toLowerCase(); // 大写转小写
}
/**
* 递归删除文件夹及文件下的所有文件
* @param dir
* @return
*/
private static boolean deleteDir(File dir) {
if (dir.isDirectory()) {
String[] children = dir.list();
// 递归删除目录中的子目录下
for (int i=0; i<children.length; i++) {
boolean success = deleteDir(new File(dir, children[i]));
if (!success) {
return false;
}
}
}
// 目录此时为空,可以删除
return dir.delete();
}
}