hadoop向mysql写入数据,map执行100%,reduce不执行

开发环境: haddoop2.7.1,jdk1.7
功能描述:从文件中读取数据写入mysql.
问题:为什么程序执行map100%,combine100%,reduce0%.程序没有运行到reduce中

package b508.demo;

import java.io.IOException;

import java.io.DataInput;

import java.io.DataOutput;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WriteDB3 {

// Map处理过程

public static class Map extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

    @Override
    protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        String line = value.toString();

        StringTokenizer tokenizer = new StringTokenizer(line);

        while (tokenizer.hasMoreTokens()) {

            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
        System.out.println("MAP");

    }
}
// combine

public static class Combine extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable num : values) {
            sum += num.get();
        }
        context.write(key, new IntWritable(sum));
    //  System.out.println(key);
    //  System.out.println(sum);
    //  System.out.println("Combine over");

    }
}
// Reduce处理过程

public static class Reduce extends Reducer<Text, IntWritable, WordRecord, Text> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, WordRecord, Text>.Context context) throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        System.out.println("reduce begin");
        int sum = 0;
        for (IntWritable num : values) {
            sum += num.get();
        }
        WordRecord wordcount = new WordRecord();

        wordcount.word = key.toString();

        wordcount.number = sum;
        context.write(wordcount, new Text());
        System.out.println("reduce over");
    }

}

public static class WordRecord implements Writable, DBWritable {

    public String word;

    public int number;

    @Override

    public void readFields(DataInput in) throws IOException {

        this.word = Text.readString(in);

        this.number = in.readInt();

    }

    @Override

    public void write(DataOutput out) throws IOException {

        Text.writeString(out, this.word);

        out.writeInt(this.number);

    }

    @Override

    public void readFields(ResultSet result) throws SQLException {

        this.word = result.getString(1);

        this.number = result.getInt(2);

    }

    @Override

    public void write(PreparedStatement stmt) throws SQLException {

        stmt.setString(1, this.word);

        stmt.setInt(2, this.number);

    }

}

public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "word");

    job.setJarByClass(WriteDB2.class);
    job.setMapperClass(Map.class);

    job.setCombinerClass(Combine.class);
    job.setReducerClass(Reduce.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(WordRecord.class);
    job.setOutputValueClass(Text.class);
    Path path = new Path("hdfs://master:9000/input");
    FileInputFormat.addInputPath(job, path);


    // 建立数据库连接

    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",

    "jdbc:mysql://localhost:3306/school", "root", "123456");

    // 写入"wordcount"表中的数据

    String[] fields = { "word", "number" };

    DBOutputFormat.setOutput(job, "wordcount", fields);


    job.waitForCompletion(true); 



}

}

0

1个回答

2
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
hadoop执行mapreduce过程reduce不执行原因
1.如果你的map过程中没有context.write()是不执行reduce过程的; 2.如果你的map过程中context.write()的map后的的部分数据出现问题,不符合reduce接受的数据也会不执行reduce 比如说你的日志文件中有一个空行是不符合reduce的接受数据reduce不执行;
hadoop reducer不执行问题及解决
一、问题描述:在eclipse上通过hadoop插件自动生成mapper、reducer、main方法类实现wordcount的统计。待处理的文件内容:hello tomhello joyhello rosehello joyhello jerryhello tomhello rosehello joy测试了几次,处理的结果都为:hello 1hello 1hello 1hello 1hello ...
遇到问题---hadoop---reduce执行时又重新map
遇到的情况 我们在运行一个2T的hive数据进行动态分区,发现运行了很长时间的mapreduce在reduce运行过程中又重新启动了一次map。 如图 原因 分析到的原因可能有两个 一是有异常报错,reduce入库时一直失败,很多个fail或者kill,hadoop启用推测执行机制。hadoop针对慢或者老是失败的任务额外启动一个备份任务,一起处理同一份数据,哪个先执行完,则采用...
hadoop执行过程中reduce没有执行,没有将key相同的value放在一起的问题
hadoop在过滤重复数据的问题中出现了一些问题,没有将相同的数据去掉,而是排好序都呈现了出来,于是我又写了一个字符计数的程序,也是这种效果,没有将同一个key的value放在一起,效果图如下 这个是原始数据 这个是处理之后的数据 仅仅是将每行的数据进行切分了,没有将key相同的放在一起。 原始代码如下 package ccnu.eisr; import java.io.IO...
hadoop 程序在运行时 reduce过程卡在某个进度不动的问题
今天写好hadoop 程序之后,进行线上测试,驶入数据为一个hive表的文件,location到了一个hdfs目录下,然后跑hadoop的过程中,map阶段没有出现问题,但是每次到了reduce阶段,进度都卡在33%不动了,刚开始以为是集群问题,后来重新启动了几次任务,都是这种情况。 可能的情况1: 后来在stackoverflow上找到了答案,这是hadoop上数据倾斜造成的问题(我的hiv
集群提交MapReduce作业执行卡住问题解决方案
关于MapReduce,Map能完全执行,Reduce执行到一半卡住不执行住了几个小时,死活停滞不前,发现后根据提示开始排错,进入50030,进入作业发现这说明作业在map之完后的shuffle阶段中,reduce无法从map处拷贝数据,是因为客户端与数据节点通讯失败造成的。客户端程序应该能够和所有的节点通讯才能保证数据的传输正常,然后开始各种检查:所有节点hosts中写的完全相通,不是节点名称不...
Hadoop Context 写文件遇到的问题(reduce 100% 后失败)
场景:        我在一个reduce中同时使用了context.write 和multipleOutputs.write,         结果却打出了如下信息:2018-03-07 17:45:39,425 INFO [submiter1] org.apache.hadoop.mapreduce.Job: map 100% reduce 98% 2018-03-07 17:45:44,4...
<hadoop>在hadoop集群的map和reduce函数中传递自定义对象
在hadoop集群中传递自定义对象
Hadoop,往map/reduce中传值的问题解决方法实例
Hadoop,往map/reduce中传值的问题解决方法实例最近在看一些map/reduce的程序,其中遇到一个问题:就是在类中定义的属性无法被mapreduce程序直接获取。具体代码如下public class KeyJob { public static class myMap extends Mapper<LongWritable, Text, Text, IntWritable> {
hadoop08--maptask、reducetask的并行度&数据倾斜问题
maptask的并行度 1.maptask:运行map部分的任务,我们就叫做maptask。 2.并行度:同时运行的maptask的任务的个数,一个maptask肯定只运行在一台节点上。 3.例如文件大小是500M: 存储为三块: blk_1:0-128 blk_2:128-256 blk_3:256-384 blk_4:384-500 启动一个maptask合适...
关于运行PI和wordcount的map0%,reduce0%问题解决方法
记得:没金刚钻别揽瓷器活!当然怎么说呢。。4个site.xml文件配置别搞的太高大上,用默认的就好了,这样就不会导致你机器尤其是master节点卡在map0%和reduce0%了。。。。为了明白这问题,一搞这问题又是一天啊。。。我的电脑配置:3台腾讯云:1核2G,50G;以下为我的4个site文件配置:mapred-site.xml&amp;lt;configuration&amp;gt;&amp;lt;property...
hadoop的小疑问:Map执行未结束便开始执行Reduce操作?
关于MapReduce的一个很基本的疑问: 以入门的WordCount案列示意: 在程序执行MR的过程中,经常会在map还未执行结束时,便开始执行了reduce. map执行结束后,通过shuffer过程,将想同的单词作为一个key放在一起 map的输出结果为(key,1) 假设Map执行了80%时,便已经开始了reduce操作,那么已经出现统计过的词
使用sqoop将数据从hdfs中导入mysql时,卡在INFO mapreduce.Job: map 100% reduce 0%的解决办法
最近在将hdfs中已经处理好的数据导入到mysql的时候遇到这么一个现象,当任务执行到 INFO mapreduce.Job: map 100% reduce 0% mapreduce任务卡在map100% ,任何reduce不继续执行。 总结了以下解决办法: 一,如果没有报错的话,可能是你mysql的数据库建表指定的主键不全,导致导入一部分数据后,主键出现相同字段,那么就造成阻塞了
Reduce运行到99.99%到100%,最后出现了错误,导致任务失败
Error: java.io.IOException: Failing write. Tried pipeline recovery 5 times without success.  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1113)  a
hadoop中每个节点map和reduce个数的设置调优
map red.tasktracker.map.tasks.maximum 这个是一个task tracker中可同时执行的map的最大个数,默认值为2,看《pro hadoop》:it is common to set this value to the effective number of CPUs on the node   把Job分割成map和reduce ...
hadoop入门级总结二:Map/Reduce
在上一篇博客:hadoop入门级总结一:HDFS中,简单的介绍了hadoop分布式文件系统HDFS的整体框架及文件写入读出机制。接下来,简要的总结一下hadoop的另外一大关键技术之一分布式计算框架:Map/Reduce。一、Map/Reduce是什么:Map/Reduce是在2004年谷歌的一篇论文中提出大数据并行编程框架,由两个基本的步骤Map(映射)和Reduce(化简)组成,Map/Red
Mapper类/Reducer类中的setup方法和cleanup方法以及run方法的介绍
在hadoop的源码中,基类Mapper类和Reducer类中都是只包含四个方法:setup方法,cleanup方法,run方法,map方法。 /**    * Expert users can override this method for more complete control over the    * execution of the Mapper.    * @pa
hadoop 多个mapreduce在java中串起来执行
注意:不建议这样做,用shell脚本把多个mr按顺序执行
模拟实现mapreduce中环形缓冲区的读写过程
原型: 在mapreduce中,map task调用map处理逻辑将处理后的key和value利用outputcollector.collect()放入一个环形缓冲区中,那么这个缓冲区是有一定大小的,那么如果放入的内容很多很多的时候怎么办呢?其实hadoop里面有这么个机制,在缓冲区达到某一个值或者比率的时候,比如80%,那么hadoop会利用Spiller.spill()将这个80%的数据读出
Hadoop执行MapReduce程序时,报自定义Mapper类或者Reducer类没有初始化方法<init>错误
java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: xxx.MapJoin$MyMapper.&amp;lt;init&amp;gt;() 原因:map类和reduce类前没有加static修饰。     hadoop在调用map和reduce类时利用的是反射机制,如果内部类不是静态的,则无法获取到...
hadoop上的两种运行mapreduce程序的方法
之前学习了一段时间的hadoop的相关知识 ,学习理论基础的时候要同时实际操作才能对它更熟练,废话不多说来说说在hadoop上运行一个最简单的words count的程序 首先我先贴上这个程序的源代码 供大家参考 代码分为三个部分写的 Run、 map阶段、 reduce阶段 Map: package wordsCount; import java.io.IOExcept
map 0% reduce 0% 然后卡死的解决方案
在运行几次mapreduce任务后,再启动一次会出现系统卡死的情况。。。百度到的几种方法都木有用。。。偶然发现ubuntu下有个system monitor,打开发现内存占用率很高,swap分区差不多快用光了,再运行一次map任务,当swap用光后电脑就死机了。。。重启,将swap分区从380M扩容到8G。。。妈妈再也不用担心我运行mapreduce死机了。BTW,昨晚顺便买了条4g内存。。。
执行mapreduce程序时直接退出,没有报错
运行mapreduce程序时直接退出 运行没有任何报错,什么错误都没,查看日志也没,但是就是忘看代码了 结果只因为少了下面这一行
mapreduce的cleanUp和setUp的特殊用法(TopN问题)和常规用法
特殊用法 我们上来不讲普通用法,普通用法放到最后。我们来谈一谈特殊用法,了解这一用法,让你的mapreduce编程能力提高一个档次,毫不夸张!!!扯淡了,让我们进入正题: 我们知道reduce和map都有一个局限性就是map是读一行执行一次,reduce是每一组执行一次,但是当我们想全部得到数据之后,按照需求删选然后再输出怎么办? 这时候只使用map和r...
mapreduce程序中避免reduce输出空文件
在mapreduce里,如果某个reduce输出为空,默认也会生成一个大小为0的文件。原因是reduce写的时候,不知道会不会有输出数据,所以默认初始化了一个文件。如果没有输出,close文件最终会生成一个空文件。如下。有几个缺点: 1)生成的很多小文件,对namenode形成一定压力 2)生成的数据下个阶段处理的时候,这些空的文件会浪费掉一些计算资源。 3)看着不爽 -rw-r--r--...
C++版的mapreduce程序运行在hadoop上(1)
因部门内一部分写c++而不会java的人想要将他们的程序改写成mapreduce程序运行在hadoop上,故作了几个简单的例子作说明。本篇是hadoop streaming运行c++。
Hive-为什么没有启动MapReduce任务
Hive-fetch task 优化 hive.fetch.task.conversion hive.fetch.task.aggr hive.fetch.task.conversion.threshold
Hadoop Map Reduce教程
Hadoop Map Reduce教程,介绍hadoop map/reduce框架的各个方面
Hadoop运行mapreduce任务过程中报错:Error: Java heap space问题解决
问题:Hadoop集群在运行mapreduce任务的时候报错:Error: Java heap space 问题分析:这个错误,首先一看到,便猜测是jvm的堆内存不够,于是便查询了hadoop运行mapreduce的时候jvm的默认值(我之前在搭建集群的时候是没有设置的),于是知道了在 mapred-site.xml中有一个mapred.child.java.opts的配置,用于jvm运行时he
学习Hadoop权威指南之Hadoop运行MapReduce日志查看
修改map配置文件 mapred-site.xml   [root@bigdata yar]# vim /opt/hadoop-2.8.3/etc/hadoop/mapred-site.xml   &amp;lt;property&amp;gt; &amp;lt;name&amp;gt;mapreduce.jobhistory.address &amp;lt;/name&amp;gt; &amp;lt;value&amp;gt;bigdata.cq...
hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序
利用hadoop的mapreduce把oracle/mysql中的数据导入到hbase和hdfs中的两个java程序
hadoop中MapReduce的sort(部分排序,完全排序,二次排序)
1.部分排序 MapReduce默认就是在每个分区里进行排序 2.完全排序 在所有的分区中,整体有序                 1)使用一个reduce             2)自定义分区函数 不同的key进入的到不同的分区之中,在每个分区中自动排序,实现完全分区.. import org.apache.hadoop.io.IntWritable; import org...
多个MapReduce之间的嵌套
多个MapReduce之间的嵌套在很多实际工作中,单个MR不能满足逻辑需求,而是需要多个MR之间的相互嵌套。很多场景下,一个MR的输入依赖于另一个MR的输出。结合案例实现一下两个MR的嵌套。 Tip:如果只关心多个MR嵌套的实现,可以直接跳到下面《多个MR嵌套源码》章节查看案例描述根据log日志计算log中不同的IP地址数量是多少。测试数据如下图所示: 该日志中每个字段都是用Tab建分割的。
MapReduce 只有Map阶段,写出到pc端
package ProOrder2;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.HashMap;import
hadoop的mapreduce任务的执行流程
hadoop2.x的三大核心:mapreduce 、hdfs以及yarn ,其中核心之一mapreduce,利用了分而治之的思想,Map(映射)和 Reduce(归约),分布式多处理然后进行汇总的思想,比如:清点扑克牌把里面的花色都分开,一个人清点那么可能耗时4分钟,如果利用mapreduce的思想,把扑克牌分成4份,每个人对自己的那一份进行清点,然后4个人都清点完成之后把各自的相同花色放一起进行汇
hadoop概念(3)-MapReduce各个执行阶段及Shuffle过程详解
MapReduce各个执行阶段(1)MapReduce框架使用InputFormat模块做Map前的预处理,比如验证输入的格式是否符合输入定义;然后,将输入文件切分为逻辑上的多个InputSplit,InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件进行实际切割,只是记录了要处理的数据的位置和长度。(2)因为InputSpl
Hadoop2.6.0运行mapreduce之推断(speculative)执行(上)
前言当一个应用向YARN集群提交作业后,此作业的多个任务由于负载不均衡、资源分布不均等原因都会导致各个任务运行完成的时间不一致,甚至会出现一个任务明显慢于同一作业的其它任务的情况。如果对这种情况不加优化,最慢的任务最终会拖慢整个作业的整体执行进度。好在mapreduce框架提供了任务推断执行机制,当有必要时就启动一个备份任务。最终会采用备份任务和原任务中率先执行完的结果作为最终结果。由于具体分析推
mapreduce运行遇到的问题-1
1.ShuffleError: error in shuffle in fetcher Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#1 at org.apache.hadoop.mapreduce.task.reduce.Shuffle.ru...
mapreduce job任务非常慢
1、application日志 2016-08-11 14:48:15,174 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Ramping down all scheduled reduces:0 2016-08-11 14:48:15,174 INFO [
大数据数据库的技术对垒:MapReduce vs. MPP[作者:李明]
大数据数据库的技术对垒:MapReduce vs. MPP --作者:李明(email: mli@pivotal.io) 这些年大数据概念已经成为IT界的热门,我们经常也会在新闻和报纸中看到。大数据概念中最为关键的技术就是数据库管理系统,伴随着hadoop和MapReduce技术的流行,大数据的数据库中Hive和
相关热词 c#异步发送kafka c#窗体编号 c# 操作二进制文件 c# 反射 机制 c#线程 窗体失去响应 c#角度转弧度 c# 解析gps数据 c# vs设置 语法版本 c# json含回车 c#多线程demo