MapReduce: output IPs to partitions and count them (2
I am using MapReduce to write different classes of IP address to different files, and I also want to count how many IPs of each class there are. The mapper output is wrapped in a bean, and the partitioning is handled by a custom Partitioner, but the job keeps failing with java.lang.ClassCastException. I have tried many of the fixes suggested online and none of them work.
Mapper
package com.topview.log;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @Title:
* @Package
* @Description:
* @author: Yeeasy
**/
public class LogMapper extends Mapper<LongWritable, Text, LogBean, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Sample input line: 178.199.96.56  2023-03-17 02:29:29  2023-03-17 03:09:25
        // Read one line
        String line = value.toString();
        // Parse the log line into a bean
        LogBean logBean = parseLog(line);
        // Emit the bean as the key with a count of 1
        context.write(logBean, new IntWritable(1));
    }

    private LogBean parseLog(String line) {
        String[] fields = line.split("\t");
        if (fields[0].length() < 15) {
            return new LogBean(fields[0], fields[1], fields[2]);
        } else {
            return new LogBean();
        }
    }
}
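A side note, separate from the exception below: when a line does not parse, parseLog returns an empty LogBean whose fields stay null, so writeUTF(null) throws a NullPointerException as soon as that bean is serialized, and getIp() returns null in the partitioner. A minimal defensive sketch of the two methods inside LogMapper, keeping the original length-based check as an assumption about the log format:

    // Hypothetical defensive variant: return null for malformed lines and let
    // map() skip them instead of emitting a bean with null fields.
    private LogBean parseLog(String line) {
        String[] fields = line.split("\t");
        if (fields.length >= 3 && fields[0].length() < 15) {
            return new LogBean(fields[0], fields[1], fields[2]);
        }
        return null;
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        LogBean logBean = parseLog(value.toString());
        if (logBean != null) {
            context.write(logBean, new IntWritable(1));
        }
    }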
Reducer
package com.topview.log;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @Title:
* @Package
* @Description:
* @author: Yeeasy
**/
public class LogReducer extends Reducer<LogBean, IntWritable, LogBean, NullWritable> {
    // reduce() is called once per group of identical keys
    @Override
    protected void reduce(LogBean logBean, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count++;
            context.write(logBean, NullWritable.get());
        }
    }
}
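As written, count is incremented but never emitted, and the bean is written once per occurrence. A minimal sketch of a reducer that emits each key once together with its total, assuming the output value type is switched from NullWritable to IntWritable (the Driver's setOutputValueClass would have to change to match, and the class name LogCountReducer is only illustrative):

public class LogCountReducer extends Reducer<LogBean, IntWritable, LogBean, IntWritable> {
    @Override
    protected void reduce(LogBean logBean, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper for this key
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        // One output line per distinct key: "ip  login  logout  count"
        context.write(logBean, new IntWritable(count));
    }
}

Note that keys are grouped by their ordering, so with the full bean as the key the counts are per distinct (ip, login, logout) record; counting per IP alone would need either an ip-only key or a grouping comparator.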
Partitioner
package com.topview.log;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @Title:
* @Package
* @Description:
* @author: Yeeasy
**/
public class ProvincePartitioner extends Partitioner<LogBean, IntWritable> {
    @Override
    public int getPartition(LogBean logBean, IntWritable intWritable, int numPartitions) {
        String logBeanIp = logBean.getIp();
        String[] key = logBeanIp.split("\\.");
        int ip = Integer.parseInt(key[0]);
        // Route by the first octet, one partition per IP address class
        int partition = 0;
        if (ip > 0 && ip < 128) {
            partition = 0;   // class A: 1-127
        } else if (ip > 127 && ip < 192) {
            partition = 1;   // class B: 128-191
        } else if (ip > 191 && ip < 224) {
            partition = 2;   // class C: 192-223
        } else if (ip > 223 && ip < 240) {
            partition = 3;   // class D: 224-239
        } else if (ip > 239 && ip < 256) {
            partition = 4;   // class E: 240-255
        }
        return partition;
    }
}
Bean
package com.topview.log;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @Title:
* @Package
* @Description:
* @author: Yeeasy
**/
public class LogBean implements Writable {
    private String ip;
    private String login;
    private String logout;

    public LogBean() {
        super();
    }

    public LogBean(String ip, String login, String logout) {
        this.ip = ip;
        this.login = login;
        this.logout = logout;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // Serialize
        dataOutput.writeUTF(ip);
        dataOutput.writeUTF(login);
        dataOutput.writeUTF(logout);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Deserialize, in the same field order as write()
        ip = dataInput.readUTF();
        login = dataInput.readUTF();
        logout = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return ip + "\t" + login + "\t" + logout;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getLogin() {
        return login;
    }

    public void setLogin(String login) {
        this.login = login;
    }

    public String getLogout() {
        return logout;
    }

    public void setLogout(String logout) {
        this.logout = logout;
    }
}
Driver
package com.topview.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @Title:
* @Package
* @Description:
* @author: Yeeasy
**/
public class LogDriver {
    public static void main(String[] args) throws Exception {
        // Job
        Job job = Job.getInstance(new Configuration());
        // Jar
        job.setJarByClass(LogDriver.class);
        // Mapper and its output types
        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(LogBean.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer and the final output types
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(LogBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Partitioning: 5 reduce tasks, one per partition
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(5);
        // Input/output paths
        FileInputFormat.setInputPaths(job, new Path("D:/IDEA/TVlog/input"));
        FileOutputFormat.setOutputPath(job, new Path("D:/IDEA/TVlog/success"));
        // Submit and wait
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
I have made changes and rerun the job, but it still fails with the error below. I am hoping for a working approach that keeps the bean encapsulation and the custom partitioning, and also produces the IP count for each partition.
java.lang.Exception: java.io.IOException: Initialization of all the collectors failed. Error in last collector was:java.lang.ClassCastException: class com.topview.log.LogBean
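For reference, this particular trace is raised while the map-side output collector is being initialized: map output records are sorted by key before being spilled, so the map output key class must implement WritableComparable (or have a comparator registered with job.setSortComparatorClass(...)). LogBean only implements Writable, which is what the ClassCastException is pointing at. A minimal sketch of the comparable variant, where ordering by ip and then login is an assumption rather than anything required above:

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hedged sketch, not the original class: the structural change is implementing
// WritableComparable instead of Writable and adding compareTo(). The chosen
// ordering (ip, then login) is an assumption. Other getters/setters stay as in
// the original LogBean.
public class LogBean implements WritableComparable<LogBean> {
    private String ip;
    private String login;
    private String logout;

    public LogBean() {
    }

    public LogBean(String ip, String login, String logout) {
        this.ip = ip;
        this.login = login;
        this.logout = logout;
    }

    public String getIp() {
        return ip;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(ip);
        out.writeUTF(login);
        out.writeUTF(logout);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        ip = in.readUTF();
        login = in.readUTF();
        logout = in.readUTF();
    }

    // Map output keys are sorted within each partition, so the key class
    // must define an ordering.
    @Override
    public int compareTo(LogBean other) {
        int byIp = this.ip.compareTo(other.ip);
        return byIp != 0 ? byIp : this.login.compareTo(other.login);
    }

    @Override
    public String toString() {
        return ip + "\t" + login + "\t" + logout;
    }
}

The alternative would be to keep LogBean as a plain Writable and register a custom RawComparator for the sort via job.setSortComparatorClass(...), but implementing WritableComparable is the usual route for a custom key bean.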