学习 Hadoop 有一段时间了。在写 Hadoop 程序时,尽管是照着官方示例、套着模板写出来的,但是不能达到真正意义上的并行——也就是说,各台分机上没有任务在运行。
运行环境如下:
操作系统: centOS6.3 32位, jdk1.7, hadoop-1.0.3, 1台master,3台worker。
为了具体说明问题,程序如下:
package campus;
import java.io.IOException;
import java.net.URI;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestSmsMR {
// map
public static class TSmsMap extends
Mapper {
private static Text keyWord = new Text(); //第一个结点
private static Text valueWord = new Text(); //第二个结点
public void map(Object key, Text value, Context context) { // value: tag: 0 1 u
String line = value.toString();
String[] arr = line.split(" |\u0009|\\|"); // 通过 空格、 \t、 | 分割字符串
if ( !(arr[0].equals(arr[1])) ) {
try {
String tmpKey = arr[0];
String tmpValue = "";
for(int i = 1; i < arr.length; i ++){
tmpValue += arr[i] + " ";
}
keyWord.set(tmpKey);
valueWord.set(tmpValue);
// 数据是非对称的,这就需要使用一次 write
context.write(keyWord, valueWord);
// context.write(valueWord, keyWord); //添加这句的话,必须先看图文件,如果重复则不需要这一行
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} // 这种方法可行
}
}
}
//reduce write<Text,Set<Text>>
public static class TSmsReduce extends
Reducer<Text, Text, Text, Text> {
private static Text keyStr = new Text();
private static Text valueStr = new Text();
public void reduce(Text key, Iterable<Text> values,Context context) {
String writeKey = key.toString();
String writeValues = "";
for (Text val : values) {
writeValues += val.toString() + "\t";
}
keyStr.set(writeKey);
valueStr.set(writeValues);
// System.out.println("writeKey: " + writeKey + "\twriteValues: " + writeValues);
try {
context.write(keyStr, valueStr);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void preExectue(String inputPath, String outputPath)
throws Exception {
Configuration conf = new Configuration();
// conf.setBoolean("mapred.compress.map.output", true);
conf.setBoolean("mapred.output.compress", true);
// conf.setIfUnset("mapred.map.output.compression.type", "BLOCK");
conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
conf.addResource(new Path("/usr/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/usr/hadoop/conf/hdfs-site.xml"));
// 如果 outputPath 存在,那么先删除
Path outPutPath = new Path(outputPath);
FileSystem fs = FileSystem.get(URI.create(outputPath), conf);
fs.delete(outPutPath);
// 自己添加路径
String[] ars = new String[] { inputPath, outputPath };
String[] otherArgs = new GenericOptionsParser(conf, ars)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: sotTest ");
System.exit(2);
}
Job job = new Job(conf, "TestSmsMR");
job.setJarByClass(TestSmsMR.class);
job.setMapperClass(TSmsMap.class);
job.setReducerClass(TSmsReduce.class);
// job.setNumReduceTasks(4);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); //
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
if (job.waitForCompletion(true)) {
System.out.println("The preprocess mapreduce has finished!");
}
}
//main函数测试都好着,为什么就不能并行呢
public static void main(String[] args) throws Exception {
Long startTime = System.currentTimeMillis();
String srcPath = "campusSms";
String dstPath = "campusSmsLabelOut";
preExectue(srcPath,dstPath);
Long runTime = System.currentTimeMillis() - startTime;
System.out.println("run time: " + runTime);
}
}
还是觉得问题出在这个函数上:
public static void preExectue(String inputPath, String outputPath)
运行前提是: 环境已搭建好,而且测试主机分机都能正常通信,且主机从机都起来了。希望解答时,能多考虑些编程方面的问题。
该程序运行起来,就是在主机上跑,MapReduce机制到分机并没有得到任务,运行的数据250M,按照hadoop默认块64M来算,也应该分为4块,3台分机应得到执行任务的,可是程序就是不能并行。
请有经验的hadoop学习实践者给予指导。
在集群上运行自带的 Pi 值计算示例时,能明显看到各分机并行执行;可是换成自己写的这个程序,就是不行。