不知道~- 2022-12-19 21:06 采纳率: 54.2%
浏览 37
已结题

expected org.apache.hadoop.io.Text, receivedorg.apache.hadoop.io.LongWritable

img


public class MysqllReadMapper extends Mapper<LongWritable,MysqlWritable,LongWritable,Text> {
    @Override
    protected void map(LongWritable key, MysqlWritable value, Context context) throws IOException, InterruptedException {
        context.write(key,new Text(value.toString()));
    }
}
package com.ll.mysqlreddata;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MysqlReadDriver {
    public MysqlReadDriver()  {
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
         final Configuration  conf=new Configuration();
        DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver","jdbc:mysql://localhost:3306/test","root","123456");
        final Job job= Job.getInstance(conf,MysqlReadDriver.class.getSimpleName());
        job.setJarByClass(MysqlReadDriver.class);
        job.setMapperClass(MysqllReadMapper.class);
        job.setReducerClass(MysqlReadReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        DBInputFormat.setInput(job,MysqlWritable.class," SELECT goodsType,AVG(goodsprice),AVG(goodsCount) from  goods_table GROUP BY goodsType","select count(goodsType) from goods_table");
        job.setInputFormatClass(DBInputFormat.class);
        Path output=new Path("./18one");
        FileOutputFormat.setOutputPath(job,output);
//        job.setPartitionerClass(GoodsPartitioner.class);
////    //一般来说 1个分区 就有一个ReduceTask执行该分区的数据
//        job.setNumReduceTasks(2);//与预计的分区一致
        boolean flag=job.waitForCompletion(true);
        System.exit(  flag ?   0 : 1  );
    }
//
    /*
    job.setJarByClass(MysqlReadDriver.class);
        job.setMapperClass(MysqllReadMapper.class);
        job.setReducerClass(MysqlReadReduce.class);
        ./mysqlreadoutput
     */




}

package com.ll.goods_data;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MysqlReadReduce extends Reducer<LongWritable,Text,LongWritable,Text> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        super.reduce(key, values, context);
    }
package com.ll.mysqlreddata;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MysqlWritable implements Writable,DBWritable {
    private String goodsType;
    private double avg_Price;
    private double avg_Count;
    private double goodPrice;

    public double getGoodPrice() {
        return goodPrice;
    }

    public void setGoodPrice(double goodPrice) {
        this.goodPrice = goodPrice;
    }

    public String getGoodsType() {
        return goodsType;
    }

    public void setGoodsType(String goodsType) {
        this.goodsType = goodsType;
    }

    public double getAvg_Price() {
        return avg_Price;
    }

    public void setAvg_Price(double avg_Price) {
        this.avg_Price = avg_Price;
    }

    public double getAvg_Count() {
        return avg_Count;
    }

    public void setAvg_Count(double avg_Count) {
        this.avg_Count = avg_Count;
    }

    @Override
    public String toString() {
        return "类型:" + goodsType+
                "商品的平均价格" +avg_Price  + "平均数量 "+avg_Count;
    }

    public MysqlWritable() {

    }

    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1,this.goodsType);
        preparedStatement.setDouble(2,this.avg_Price);
        preparedStatement.setDouble(3,this.avg_Count);


    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
                this.avg_Price=resultSet.getDouble(2);//从结果集获取你想要的数据
                this.avg_Count=resultSet.getDouble(3);//从结果集获取你想要的数据
                this.goodsType=resultSet.getString(1);
//                this.last_update=resultSet.getString("last_update");
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(goodsType);
        dataOutput.writeDouble(getAvg_Price());
              dataOutput.writeDouble(getAvg_Count());
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.goodsType=dataInput.readUTF();
        this.avg_Price=dataInput.readDouble();
        this.avg_Count=dataInput.readDouble();

    }
}
  • 写回答

1条回答 默认 最新

  • wux_labs 2022-12-19 22:16
    关注

    类型不匹配,需要文本类型,但接收到了数值类型。
    你的main函数中定义了:

    job.setMapperClass(MysqllReadMapper.class);
    job.setMapOutputKeyClass(Text.class);
    

    也就是你需要map输出Text类型。
    但是你的MysqllReadMapper的输出却定义成了LongWritable,两者类型不匹配。

    public class MysqllReadMapper extends Mapper<LongWritable,MysqlWritable,LongWritable,Text> {
        @Override
        protected void map(LongWritable key, MysqlWritable value, Context context) throws IOException, InterruptedException {
            context.write(key,new Text(value.toString()));
        }
    }
    

    要么改main函数,要么改自定义mapper,改成类型一致。
    望采纳,谢谢!

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录

报告相同问题?

问题事件

  • 系统已结题 12月29日
  • 已采纳回答 12月21日
  • 修改了问题 12月19日
  • 修改了问题 12月19日
  • 展开全部

悬赏问题

  • ¥15 filenotfounderror:文件是存在的,权限也给了,但还一直报错
  • ¥15 YOLOv5在进行trainpy训练后为什么会出现这种情况(语言-python)
  • ¥15 关于远程桌面的鼠标位置转换
  • ¥15 MATLAB和mosek的求解问题
  • ¥20 修改中兴光猫sn的时候提示失败
  • ¥15 java大作业爬取网页
  • ¥15 怎么获取欧易的btc永续合约和交割合约的5m级的历史数据用来回测套利策略?
  • ¥15 有没有办法利用libusb读取usb设备数据
  • ¥15 为什么openeluer里面按不了python3呢?
  • ¥15 关于#matlab#的问题:训练序列与输入层维度不一样