MapReduce实现ip分类输出并统计个数
在使用mapReduce实现不同类别ip地址输出到不同文件如何统计每个类型ip地址的个数,将mapper结果用bean封装,分区主要靠partitioner实现,但是一直出现java.lang.ClassCastException的问题,按照网上的法子改了很多次都不行
Mapper代码
package com.topview.log;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogMapper extends Mapper<LongWritable,Text,Text,LogBean> {
Text outK=new Text();
LogBean outV=new LogBean();
@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
//178.199.96.56 2023-03-17 02:29:29 2023-03-17 03:09:25
//获取一行
String line=value.toString();
//日志解析
LogBean logBean=parseLog(line);
//合法性
if(!logBean.isValid()){
return;
}
outK.set(line);
//写出
context.write(outK,outV);
}
private LogBean parseLog(String line) {
//切割
String[] fields = line.split("\t");
if(fields[0].length()<15) {
outV.setIp(fields[0]);
outV.setLogin(fields[1]);
outV.setLogout(fields[2]);
outV.setValid(true);
}else{
outV.setValid(false);
}
//判断是否合法
return outV;
}
}
Bean代码
package com.topview.log;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class LogBean implements Writable {
private String ip;
private String login;
private String logout;
private boolean valid = true;
public LogBean(){
super();
}
public LogBean(String ip,String login,String logout){
super();
this.ip=ip;
this.login=login;
this.logout=logout;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
//序列化
dataOutput.writeUTF(ip);
dataOutput.writeUTF(login);
dataOutput.writeUTF(logout);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
//反序列化
ip=dataInput.readUTF();
login=dataInput.readUTF();
logout=dataInput.readUTF();
}
public String toString(){
return ip+"\t"+login+"\t"+logout;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getLogin() {
return login;
}
public void setLogin(String login) {
this.login = login;
}
public String getLogout() {
return logout;
}
public void setLogout(String logout) {
this.logout = logout;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
}
Partitioner代码
package com.topview.log;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @Title:
* @Package
* @Description:
* @author: Yeeasy
**/
public class ProvincePartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
String key=text.toString();
String[] keys=key.split(".");
int ip=Integer.parseInt(keys[0]);
int partition=0;
if(ip>0&&ip<128){
partition=0;
}else if(ip>127&&ip<192){
partition=1;
}else if(ip>191&&ip<224){
partition=2;
}else if (ip>223&&ip<240){
partition=3;
}else if (ip>239&&ip<256){
partition=4;
}else{
}
return partition;
}
}
Driver类
package com.topview.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @Title:
* @Package
* @Description:
* @author: Yeeasy
**/
public class LogDriver {
public static void main(String[] args) throws Exception{
//job
Job job= Job.getInstance(new Configuration());
//jar
job.setJarByClass(LogDriver.class);
//map
job.setMapperClass(LogMapper.class);
job.setMapOutputKeyClass(LogBean.class);
job.setMapOutputValueClass(IntWritable.class);
//reduce
job.setReducerClass(LogReducer.class);
job.setOutputKeyClass(LogBean.class);
job.setOutputValueClass(IntWritable.class);
//分区
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
//路径
FileInputFormat.setInputPaths(job,new Path("D:/IDEA/TVlog/input"));
FileOutputFormat.setOutputPath(job,new Path("D:/IDEA/TVlog/success"));
//提交
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}