2 rgwangpeng003 rgwangpeng003 于 2014.02.26 14:01 提问

自己写的hadoop ,MapReduce程序不能并行
 **

学习hadoop有一段时间了,在写hadoop 程序时,尽管是根据官方例子,套着模板写出的,但是不能达到真正意义上的并行,也就是说,各分机没有任务运行。

**
运行环境如下:
操作系统: centOS6.3 32位, jdk1.7, hadoop-1.0.3, 1台master,3台worker。

为了具体说明问题,程序如下:

package campus;

import java.io.IOException;
import java.net.URI;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestSmsMR {
// map
public static class TSmsMap extends
Mapper {
private static Text keyWord = new Text(); //第一个结点
private static Text valueWord = new Text(); //第二个结点

        public void map(Object key, Text value, Context context) {   // value: tag: 0 1 u 

            String line = value.toString();
            String[] arr = line.split(" |\u0009|\\|");      // 通过 空格、 \t、 | 分割字符串


                if ( !(arr[0].equals(arr[1])) ) {
                    try {
                        String tmpKey = arr[0];

                        String tmpValue = "";

                        for(int i = 1; i < arr.length; i ++){
                            tmpValue += arr[i] + " ";
                        }

                        keyWord.set(tmpKey);
                        valueWord.set(tmpValue);
                        // 数据是非对称的,这就需要使用一次 write
                        context.write(keyWord, valueWord);

// context.write(valueWord, keyWord); //添加这句的话,必须先看图文件,如果重复则不需要这一行
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} // 这种方法可行
}

        }
    }


    //reduce write<Text,Set<Text>>
    public static class TSmsReduce extends
            Reducer<Text, Text, Text, Text> {

        private static Text keyStr = new Text();
        private static Text valueStr = new Text();

        public void reduce(Text key, Iterable<Text> values,Context context) {

            String writeKey = key.toString();

            String writeValues =  "";

            for (Text val : values) {

                writeValues +=  val.toString() + "\t";
            }

            keyStr.set(writeKey);
            valueStr.set(writeValues);

// System.out.println("writeKey: " + writeKey + "\twriteValues: " + writeValues);

            try {
                context.write(keyStr, valueStr);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    public static void preExectue(String inputPath, String outputPath)
            throws Exception {
        Configuration conf = new Configuration();

// conf.setBoolean("mapred.compress.map.output", true);
conf.setBoolean("mapred.output.compress", true);
// conf.setIfUnset("mapred.map.output.compression.type", "BLOCK");
conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
conf.addResource(new Path("/usr/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/usr/hadoop/conf/hdfs-site.xml"));
// 如果 outputPath 存在,那么先删除
Path outPutPath = new Path(outputPath);
FileSystem fs = FileSystem.get(URI.create(outputPath), conf);
fs.delete(outPutPath);
// 自己添加路径
String[] ars = new String[] { inputPath, outputPath };
String[] otherArgs = new GenericOptionsParser(conf, ars)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: sotTest ");
System.exit(2);
}
Job job = new Job(conf, "TestSmsMR");
job.setJarByClass(TestSmsMR.class);
job.setMapperClass(TSmsMap.class);
job.setReducerClass(TSmsReduce.class);
// job.setNumReduceTasks(4);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); //
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        if (job.waitForCompletion(true)) {
            System.out.println("The preprocess mapreduce has finished!");
        }

    }

    //main函数测试都好着,为什么就不能并行呢
    public static void main(String[] args) throws Exception {

        Long startTime = System.currentTimeMillis();
        String srcPath = "campusSms";
        String dstPath = "campusSmsLabelOut";

        preExectue(srcPath,dstPath);
        Long runTime = System.currentTimeMillis() - startTime;
        System.out.println("run time: " + runTime);
    }

}

还是觉得问题出在这个函数上:
public static void preExectue(String inputPath, String outputPath)

运行前提是: 环境已搭建好,而且测试主机分机都能正常通信,且主机从机都起来了。希望解答时,能多考虑些编程方面的问题。

该程序运行起来,就是在主机上跑,MapReduce机制到分机并没有得到任务,运行的数据250M,按照hadoop默认块64M来算,也应该分为4块,3台分机应得到执行任务的,可是程序就是不能并行。

请有经验的hadoop学习实践者给予指导。

在集群上运行Pi值计算,都能看到并行。就是自己写的程序,不行!......

2个回答

qingqingdede
qingqingdede   2016.01.30 16:41

请问解决了吗? 怎么查看作业是否分配到了各个子节点?

yingping1898
yingping1898   2016.10.14 10:15

请问解决了吗? 我也遇到了这样的问题?

Csdn user default icon
上传中...
上传图片
插入图片
准确详细的回答,更有利于被提问者采纳,从而获得C币。复制、灌水、广告等回答会被删除,是时候展现真正的技术了!