rgwangpeng003 2014-02-26 06:01 采纳率: 0%
浏览 2942

自己写的hadoop ,MapReduce程序不能并行

 **

学习hadoop有一段时间了,在写hadoop 程序时,尽管是根据官方例子,套着模板写出的,但是不能达到真正意义上的并行,也就是说,各分机没有任务运行。

**
运行环境如下:
操作系统: centOS6.3 32位, jdk1.7, hadoop-1.0.3, 1台master,3台worker。

为了具体说明问题,程序如下:

package campus;

import java.io.IOException;
import java.net.URI;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestSmsMR {
// map
public static class TSmsMap extends
Mapper {
private static Text keyWord = new Text(); //第一个结点
private static Text valueWord = new Text(); //第二个结点

        public void map(Object key, Text value, Context context) {   // value: tag: 0 1 u 

            String line = value.toString();
            String[] arr = line.split(" |\u0009|\\|");      // 通过 空格、 \t、 | 分割字符串


                if ( !(arr[0].equals(arr[1])) ) {
                    try {
                        String tmpKey = arr[0];

                        String tmpValue = "";

                        for(int i = 1; i < arr.length; i ++){
                            tmpValue += arr[i] + " ";
                        }

                        keyWord.set(tmpKey);
                        valueWord.set(tmpValue);
                        // 数据是非对称的,这就需要使用一次 write
                        context.write(keyWord, valueWord);

// context.write(valueWord, keyWord); //添加这句的话,必须先看图文件,如果重复则不需要这一行
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} // 这种方法可行
}

        }
    }


    //reduce write<Text,Set<Text>>
    public static class TSmsReduce extends
            Reducer<Text, Text, Text, Text> {

        private static Text keyStr = new Text();
        private static Text valueStr = new Text();

        public void reduce(Text key, Iterable<Text> values,Context context) {

            String writeKey = key.toString();

            String writeValues =  "";

            for (Text val : values) {

                writeValues +=  val.toString() + "\t";
            }

            keyStr.set(writeKey);
            valueStr.set(writeValues);

// System.out.println("writeKey: " + writeKey + "\twriteValues: " + writeValues);

            try {
                context.write(keyStr, valueStr);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    public static void preExectue(String inputPath, String outputPath)
            throws Exception {
        Configuration conf = new Configuration();

// conf.setBoolean("mapred.compress.map.output", true);
conf.setBoolean("mapred.output.compress", true);
// conf.setIfUnset("mapred.map.output.compression.type", "BLOCK");
conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
conf.addResource(new Path("/usr/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/usr/hadoop/conf/hdfs-site.xml"));
// 如果 outputPath 存在,那么先删除
Path outPutPath = new Path(outputPath);
FileSystem fs = FileSystem.get(URI.create(outputPath), conf);
fs.delete(outPutPath);
// 自己添加路径
String[] ars = new String[] { inputPath, outputPath };
String[] otherArgs = new GenericOptionsParser(conf, ars)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: sotTest ");
System.exit(2);
}
Job job = new Job(conf, "TestSmsMR");
job.setJarByClass(TestSmsMR.class);
job.setMapperClass(TSmsMap.class);
job.setReducerClass(TSmsReduce.class);
// job.setNumReduceTasks(4);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); //
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        if (job.waitForCompletion(true)) {
            System.out.println("The preprocess mapreduce has finished!");
        }

    }

    //main函数测试都好着,为什么就不能并行呢
    public static void main(String[] args) throws Exception {

        Long startTime = System.currentTimeMillis();
        String srcPath = "campusSms";
        String dstPath = "campusSmsLabelOut";

        preExectue(srcPath,dstPath);
        Long runTime = System.currentTimeMillis() - startTime;
        System.out.println("run time: " + runTime);
    }

}

还是觉得问题出在这个函数上:
public static void preExectue(String inputPath, String outputPath)

运行前提是: 环境已搭建好,而且测试主机分机都能正常通信,且主机从机都起来了。希望解答时,能多考虑些编程方面的问题。

该程序运行起来,就是在主机上跑,MapReduce机制到分机并没有得到任务,运行的数据250M,按照hadoop默认块64M来算,也应该分为4块,3台分机应得到执行任务的,可是程序就是不能并行。

请有经验的hadoop学习实践者给予指导。

在集群上运行Pi值计算,都能看到并行。就是自己写的程序,不行!......

  • 写回答

2条回答 默认 最新

  • qingqingdede 2016-01-30 08:41
    关注

    请问解决了吗? 怎么查看作业是否分配到了各个子节点?

    评论

报告相同问题?

悬赏问题

  • ¥15 delta降尺度计算的一些细节,有偿
  • ¥15 Arduino红外遥控代码有问题
  • ¥15 数值计算离散正交多项式
  • ¥30 数值计算均差系数编程
  • ¥15 redis-full-check比较 两个集群的数据出错
  • ¥15 Matlab编程问题
  • ¥15 训练的多模态特征融合模型准确度很低怎么办
  • ¥15 kylin启动报错log4j类冲突
  • ¥15 超声波模块测距控制点灯,灯的闪烁很不稳定,经过调试发现测的距离偏大
  • ¥15 import arcpy出现importing _arcgisscripting 找不到相关程序