椰Ye 2023-05-15 23:41 采纳率: 63.2%
浏览 27
已结题

MapReduce分区输出ip并且统计个数

MapReduce实现ip分类输出并统计个数
在使用mapReduce实现不同类别ip地址输出到不同文件如何统计每个类型ip地址的个数,将mapper结果用bean封装,分区主要靠partitioner实现,但是一直出现java.lang.ClassCastException的问题,按照网上的法子改了很多次都不行

Mapper代码

package com.topview.log;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable,Text,Text,LogBean> {

    Text outK=new Text();
    LogBean outV=new LogBean();
    @Override
    protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {

        //178.199.96.56    2023-03-17 02:29:29    2023-03-17 03:09:25
        //获取一行
        String line=value.toString();
        //日志解析
        LogBean logBean=parseLog(line);
        //合法性
        if(!logBean.isValid()){
            return;
        }
        outK.set(line);

        //写出
        context.write(outK,outV);
    }

    private LogBean parseLog(String line) {
        //切割
        String[] fields = line.split("\t");
        if(fields[0].length()<15) {
            outV.setIp(fields[0]);
            outV.setLogin(fields[1]);
            outV.setLogout(fields[2]);
            outV.setValid(true);
        }else{
            outV.setValid(false);
        }
        //判断是否合法
        return outV;

    }
}

Bean代码

package com.topview.log;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class LogBean implements Writable {
    private String ip;
    private String login;
    private String logout;
    private boolean valid = true;
    public LogBean(){
        super();
    }

    public LogBean(String ip,String login,String logout){
        super();
        this.ip=ip;
        this.login=login;
        this.logout=logout;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        //序列化
        dataOutput.writeUTF(ip);
        dataOutput.writeUTF(login);
        dataOutput.writeUTF(logout);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        //反序列化
        ip=dataInput.readUTF();
        login=dataInput.readUTF();
        logout=dataInput.readUTF();
    }

    public String toString(){
        return ip+"\t"+login+"\t"+logout;
    }

    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public String getLogin() {
        return login;
    }
    public void setLogin(String login) {
        this.login = login;
    }
    public String getLogout() {
        return logout;
    }
    public void setLogout(String logout) {
        this.logout = logout;
    }
    public boolean isValid() {
        return valid;
    }
    public void setValid(boolean valid) {
        this.valid = valid;
    }
}

Partitioner代码

package com.topview.log;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @Title:
 * @Package
 * @Description:
 * @author: Yeeasy
 **/
public class ProvincePartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text text, IntWritable intWritable, int i) {
        String key=text.toString();
        String[] keys=key.split(".");
        int ip=Integer.parseInt(keys[0]);
        int partition=0;
        if(ip>0&&ip<128){
            partition=0;
        }else if(ip>127&&ip<192){
            partition=1;
        }else if(ip>191&&ip<224){
            partition=2;
        }else if (ip>223&&ip<240){
            partition=3;
        }else if (ip>239&&ip<256){
            partition=4;
        }else{
        }
        return partition;
    }
}

Driver类

package com.topview.log;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @Title:
 * @Package
 * @Description:
 * @author: Yeeasy
 **/
public class LogDriver {
    public static void main(String[] args) throws Exception{

        //job
        Job job= Job.getInstance(new Configuration());

        //jar
        job.setJarByClass(LogDriver.class);

        //map
        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(LogBean.class);
        job.setMapOutputValueClass(IntWritable.class);

        //reduce
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(LogBean.class);
        job.setOutputValueClass(IntWritable.class);

        //分区
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(5);

        //路径
        FileInputFormat.setInputPaths(job,new Path("D:/IDEA/TVlog/input"));
        FileOutputFormat.setOutputPath(job,new Path("D:/IDEA/TVlog/success"));

        //提交
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}


java.lang.Exception: java.io.IOException: Initialization of all the collectors failed. Error in last collector was:java.lang.ClassCastException: class com.topview.log.LogBean
看了driver类,应该是没有问题的,不知道如何实现这个ip分类和统计
  • 写回答

1条回答 默认 最新

  • 卑以自牧w 2023-05-16 05:51
    关注

    bean类需要继承WritableComparable这个类,重写compareTo方法,单纯实现Writable,你自定义的对象没有实现自动排序,你要在compareTo中定义先来后到的存入

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录

报告相同问题?

问题事件

  • 系统已结题 5月24日
  • 已采纳回答 5月16日
  • 创建了问题 5月15日

悬赏问题

  • ¥15 求京东批量付款能替代天诚
  • ¥15 slaris 系统断电后,重新开机后一直自动重启
  • ¥15 51寻迹小车定点寻迹
  • ¥15 谁能帮我看看这拒稿理由啥意思啊阿啊
  • ¥15 关于vue2中methods使用call修改this指向的问题
  • ¥15 idea自动补全键位冲突
  • ¥15 请教一下写代码,代码好难
  • ¥15 iis10中如何阻止别人网站重定向到我的网站
  • ¥15 滑块验证码移动速度不一致问题
  • ¥15 Utunbu中vscode下cern root工作台中写的程序root的头文件无法包含