椰Ye 2023-05-16 00:43 · acceptance rate: 63.2%

MapReduce: partition IP output by class and count each type (2)

When using MapReduce to write different classes of IP address to different output files, how do I count the number of addresses of each class? The mapper result is wrapped in a bean, and the partitioning is done by a Partitioner, but I keep getting a java.lang.ClassCastException. I have changed the code many times following suggestions found online, and none of them work.

Mapper

package com.topview.log;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Title:
 * @Package
 * @Description:
 * @author: Yeeasy
 **/
public class LogMapper extends Mapper<LongWritable,Text,LogBean, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //178.199.96.56    2023-03-17 02:29:29    2023-03-17 03:09:25
        //get one line of input
        String line = value.toString();
        //parse the log line into a bean
        LogBean logBean = parseLog(line);

        //emit <bean, 1>
        context.write(logBean, new IntWritable(1));
    }

    private LogBean parseLog(String line) {
        String[] fields = line.split("\t");
        if (fields[0].length() < 15) {
            return new LogBean(fields[0], fields[1], fields[2]);
        } else{
            return new LogBean();
        }
    }
}
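
A side note on parseLog, independent of the ClassCastException: when the guard fails it returns new LogBean(), whose fields are all null, and DataOutput.writeUTF(null) throws a NullPointerException as soon as that empty bean is serialized into the shuffle buffer. The length() < 15 test also rejects valid 15-character addresses such as 100.100.100.100. A minimal defensive sketch (my rework, assuming the input really is tab-separated ip/login/logout):

    // a defensive variant of parseLog: skip malformed lines instead of
    // emitting an empty bean whose null fields break serialization
    private LogBean parseLog(String line) {
        String[] fields = line.split("\t");
        // a dotted-quad IPv4 address is at most 15 characters long
        if (fields.length >= 3 && fields[0].length() <= 15) {
            return new LogBean(fields[0], fields[1], fields[2]);
        }
        return null; // signal a malformed line to the caller
    }

    // ...and in map(), drop malformed records instead of writing them:
    // LogBean logBean = parseLog(line);
    // if (logBean == null) { return; }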

Reducer

package com.topview.log;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * @Title:
 * @Package
 * @Description:
 * @author: Yeeasy
 **/
public class LogReducer extends Reducer<LogBean,IntWritable,LogBean, NullWritable> {

    // reduce() is called once for each group of identical keys
    @Override
    protected void reduce(LogBean logBean, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count=0;
        for(IntWritable value:values){
            count++;
            context.write(logBean,NullWritable.get());
        }
    }

}
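
Separate from the ClassCastException: as written, this reduce() writes the key once per value and never emits the accumulated count, so it will not produce the per-group totals the question asks for even once the job runs. A sketch of a counting reducer (my rework: the output value type becomes IntWritable, so the Driver's setOutputValueClass would have to change to IntWritable.class to match; which records form "one group" depends on LogBean's compareTo, see the note after the Bean section below):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class LogReducer extends Reducer<LogBean, IntWritable, LogBean, IntWritable> {

    @Override
    protected void reduce(LogBean logBean, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // sum the 1s the mapper emitted for this key
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        // write the key once, together with its total
        context.write(logBean, new IntWritable(count));
    }
}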

Partitioner implementation

package com.topview.log;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @Title:
 * @Package
 * @Description:
 * @author: Yeeasy
 **/
public class ProvincePartitioner extends Partitioner<LogBean, IntWritable> {

    @Override
    public int getPartition(LogBean logBean, IntWritable intWritable, int i) {
        String logBeanIp=logBean.getIp();
        String[] key = logBeanIp.split("\\.");
        int ip = Integer.parseInt(key[0]);
        int partition = 0;
        if (ip > 0 && ip < 128) {
            partition = 0;
        } else if (ip > 127 && ip < 192) {
            partition = 1;
        } else if (ip > 191 && ip < 224) {
            partition = 2;
        } else if (ip > 223 && ip < 240) {
            partition = 3;
        } else if (ip > 239 && ip < 256) {
            partition = 4;
        } else {
        }
        return partition;
    }
}
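
The class boundaries here look right (first octet below 128, 192, 224, 240, 256), but three details are fragile: getPartition ignores the numPartitions argument (i), so any setNumReduceTasks value other than 5 would produce an "Illegal partition" error; a first octet of exactly 0 falls through the empty else into partition 0 silently; and Integer.parseInt would kill the task with a NumberFormatException on a malformed address. A hardened sketch of the same logic (my rework, not required for the ClassCastException fix):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<LogBean, IntWritable> {

    @Override
    public int getPartition(LogBean logBean, IntWritable value, int numPartitions) {
        int firstOctet;
        try {
            firstOctet = Integer.parseInt(logBean.getIp().split("\\.")[0]);
        } catch (RuntimeException e) {
            return 0; // unparsable or missing address: route to the first partition
        }
        int partition;
        if (firstOctet < 128)      partition = 0; // class A range (plus 0.x.x.x)
        else if (firstOctet < 192) partition = 1; // class B
        else if (firstOctet < 224) partition = 2; // class C
        else if (firstOctet < 240) partition = 3; // class D (multicast)
        else                       partition = 4; // class E (reserved)
        // never return a partition outside the configured reducer count
        return partition % numPartitions;
    }
}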

Bean

package com.topview.log;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Title:
 * @Package
 * @Description:
 * @author: Yeeasy
 **/
public class LogBean implements Writable {
    private String ip;
    private String login;
    private String logout;

    public LogBean(){
        super();
    }

    public LogBean(String ip,String login,String logout){
        this.ip=ip;
        this.login=login;
        this.logout=logout;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        //serialize the fields in a fixed order
        dataOutput.writeUTF(ip);
        dataOutput.writeUTF(login);
        dataOutput.writeUTF(logout);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        //deserialize in the same field order
        ip=dataInput.readUTF();
        login=dataInput.readUTF();
        logout=dataInput.readUTF();
    }

    public String toString(){
        return ip+"\t"+login+"\t"+logout;
    }

    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public String getLogin() {
        return login;
    }
    public void setLogin(String login) {
        this.login = login;
    }
    public String getLogout() {
        return logout;
    }
    public void setLogout(String logout) {
        this.logout = logout;
    }

}
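
This class is almost certainly the source of the exception: LogBean is used as the map output key, and Hadoop requires every map output key class to implement WritableComparable, not just Writable, because keys are sorted during the shuffle. When the map output collector initializes, it casts the key class to WritableComparable, and a plain Writable fails with exactly the "Initialization of all the collectors failed ... ClassCastException: class com.topview.log.LogBean" quoted below. A minimal sketch of the change (comparing only on ip is an assumption on my part, chosen so that all records for one address form a single reduce group for counting; adjust to whatever ordering you actually want):

import org.apache.hadoop.io.WritableComparable;

public class LogBean implements WritableComparable<LogBean> {

    // ...same fields, constructors, write(), readFields(), getters and setters as above...

    @Override
    public int compareTo(LogBean other) {
        // grouping and sorting by ip only: two beans with the same ip
        // compare as equal, so they reach the reducer as one group
        return this.ip.compareTo(other.ip);
    }
}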

Driver

package com.topview.log;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @Title:
 * @Package
 * @Description:
 * @author: Yeeasy
 **/
public class LogDriver {
    public static void main(String[] args) throws Exception{

        //job
        Job job= Job.getInstance(new Configuration());

        //jar
        job.setJarByClass(LogDriver.class);

        //map
        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(LogBean.class);
        job.setMapOutputValueClass(IntWritable.class);

        //reduce
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(LogBean.class);
        job.setOutputValueClass(NullWritable.class);

        //partitioning
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(5);

        //input and output paths
        FileInputFormat.setInputPaths(job,new Path("D:/IDEA/TVlog/input"));
        FileOutputFormat.setOutputPath(job,new Path("D:/IDEA/TVlog/success"));

        //submit the job and wait
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
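
Two practical notes on the Driver, unrelated to the exception: FileOutputFormat fails with a FileAlreadyExistsException when D:/IDEA/TVlog/success is left over from an earlier run, and with five reduce tasks each partition ends up in its own file, part-r-00000 through part-r-00004. A sketch of clearing the output directory before submission (an optional convenience):

// additional import needed:
import org.apache.hadoop.fs.FileSystem;

        //delete a stale output directory so reruns do not fail
        Path output = new Path("D:/IDEA/TVlog/success");
        FileSystem fs = FileSystem.get(job.getConfiguration());
        if (fs.exists(output)) {
            fs.delete(output, true); //true = delete recursively
        }
        FileOutputFormat.setOutputPath(job, output);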
I have modified and rerun the code, but it still fails with the error below. I am hoping for a working approach that keeps the bean wrapping and the partitioning, and also counts the number of IPs in each partition.

java.lang.Exception: java.io.IOException: Initialization of all the collectors failed. Error in last collector was:java.lang.ClassCastException: class com.topview.log.LogBean

Is there a problem with how I wrote my Bean class?

2 answers

  • CSDN-Ada助手 (CSDN-AI official account) 2023-05-16 02:07
    • The "Step 5: write the Mapper class, emitting <state, record (bean object)>" section of the blog post 【MapReduce】美国新冠疫情案例集 (MR与数据库交互) may solve your problem; you can read the excerpt below or read it in the original post:
    • import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      import java.io.IOException;
      
      public class Map1 extends Mapper<LongWritable, BeanSQL, Text, BeanSQL> {
      
          Text k = new Text();
      
          @Override
          protected void map(LongWritable key, BeanSQL value, Context context) throws IOException, InterruptedException {
              // set the key to the state field of the current record
              k.set(value.getState());
              // write out <state, bean>
              context.write(k,value);
          }
      }
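
The pattern in this excerpt also points to the simplest fix for the question above: keep the bean as the value (or drop it entirely) and key on a built-in WritableComparable such as Text, which avoids writing compareTo on LogBean at all. A sketch of the original Mapper adapted that way (my hypothetical rework; the Partitioner, Reducer, and Driver type parameters would all change to Text/IntWritable to match):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Text outKey = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length >= 1 && !fields[0].isEmpty()) {
            // Text already implements WritableComparable, so no custom key class is needed
            outKey.set(fields[0]);
            context.write(outKey, one);
        }
    }
}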
      



