package com.ctsig.cdn.log;
import com.ctsig.cdn.log.util.DealDate;
import com.ctsig.cdn.log.util.IP2CCUtil;
import com.ctsig.cdn.log.util.PropsUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
public class CleanData extends Configured implements Tool {
private static Logger logger = LoggerFactory.getLogger(CleanData.class);
public static class CleanMapper extends Mapper<Object, Text, Text, CleanBean> {

    /**
     * Tags each raw log line with the vendor prefix taken from the input
     * file name (the part before the first '_') and emits it under an
     * empty key, so the reducer can dispatch on the vendor token.
     */
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // The vendor/domain tag is encoded in the split's file name.
        FileSplit split = (FileSplit) context.getInputSplit();
        String vendorTag = split.getPath().getName().split("_")[0];
        // Re-join as "<vendor> <raw line>"; empty key keeps original grouping.
        context.write(new Text(), new CleanBean(vendorTag + " " + value.toString()));
    }
}
/**
 * Parses each tagged raw log line ("<vendor> <raw line>", built by CleanMapper),
 * normalizes it into a \001-delimited record, and writes it to a side output
 * file named after the log's date (yyyyMMdd) via MultipleOutputs.
 *
 * Three vendor formats are handled by positional field indexing:
 * "swiftserve", "tata", and a default branch (presumably level3 — the
 * timezone comments reference it; confirm with the upstream feed).
 * Any malformed line is logged and skipped; no exception escapes reduce().
 */
public static class CleanReducer extends Reducer<Text, CleanBean, Text, CleanBean> {
    // Date-partitioned side outputs; closed in cleanup().
    private MultipleOutputs<Text, CleanBean> multipleOutputs;
    // NOTE(review): static field re-assigned per task in setup(); shared across
    // reducer instances in a reused JVM — confirm IP2CCUtil is stateless.
    private static IP2CCUtil ip2CCUtil;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<Text, CleanBean>(context);
        ip2CCUtil = new IP2CCUtil();
    }

    @Override
    protected void reduce(Text key, Iterable<CleanBean> Values, Context context) {
        for (CleanBean value : Values) {
            // Mapper joined fields with spaces; split the tagged line back apart.
            // line[0] is the vendor tag added by CleanMapper.
            String[] line = value.getLine().split("\\s+");
            String ipvalue = "";
            String hit = "-1";          // cache hit flag: "HIT" / "MISS" / "-1" unknown
            String logInfo = "";        // the final \001-delimited output record
            String name = "";           // yyyyMMdd — used as the output file name
            String UserAgent = "-";
            String doamin = "-";        // (sic) domain extracted from the request URL
            String countryCode = "-";
            String countryName = "-";
            String timeTaken = "0";
            String referer = "-";
            if (line[0].equals("swiftserve")) {
                // Reject lines with too few fields for positional access below.
                if (line.length < 12 && line.length > 0) {
                    logger.info("错误的日志数据格式:" + value.getLine());
                } else {
                    // swiftserve cleaning path.
                    String datetime = line[4];
                    ipvalue = line[1];
                    try {
                        // Resolve country code and name from the client IP.
                        // Result format assumed "<code> <name>" — see split below.
                        String couStr = ip2CCUtil.getCountry(ipvalue);
                        if (couStr != null) {
                            countryCode = couStr.split(" ")[0];
                            countryName = couStr.split(" ")[1];
                        }
                        // Timezone handling: source timestamps are GMT+0; the
                        // un-zoned formatter below re-renders them in the JVM's
                        // default zone (intended to be GMT+8 per original notes).
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        sdf.setTimeZone(TimeZone.getTimeZone("GMT+0000")); // parse as GMT+0
                        datetime = DealDate.Swiftservedate(datetime);
                        try {
                            Date d = sdf.parse(datetime.toString());
                            Date date = new Date(d.getTime());
                            // No explicit zone: formats in the JVM default timezone.
                            SimpleDateFormat newsdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            String newdatetime = newsdf.format(date);
                            String d1[] = newdatetime.split(" ");
                            // Build the yyyyMMdd output-file name from the date part.
                            String namedata[] = newdatetime.split(" ")[0].split("-");
                            name = namedata[0] + namedata[1] + namedata[2];
                            // Quote-split the raw line to reach quoted fields
                            // (User-Agent, hit indicator).
                            String[] datavalue = value.toString().split("\"");
                            if (datavalue.length < 12 && datavalue.length > 0) {
                                logger.info("错误的日志数据格式:" + value.getLine());
                            } else {
                                // cs(User-Agent) field.
                                UserAgent = datavalue[5];
                                // Cache-hit indicator lives in the 12th quote segment.
                                String hitStr = datavalue[11];
                                if (hitStr.indexOf("HIT") != -1) {
                                    hit = "HIT";
                                } else {
                                    hit = "MISS";
                                }
                            }
                            // Domain is the host part of the URL in field 6.
                            String[] doStr = line[6].split("/");
                            if (doStr.length < 3 && doStr.length > 0) {
                                logger.info("错误的日志数据格式:" + value.getLine());
                            } else {
                                doamin = line[6].split("/")[2];
                                // Assemble the \001-delimited output record.
                                logInfo = d1[0] + "\001" + d1[1] + "\001" + line[1] + "\001" + countryName + "\001" + countryCode + "\001" + doamin +
                                        "\001" + line[5].replace("\"", "") + "\001" + line[6].replace("\"", "") + "\001" + line[7] +
                                        "\001" + line[10] + "\001" + hit + "\001" + line[8] + "\001" + line[11].replace("\"", "") +
                                        "\001" + UserAgent;
                                multipleOutputs.write(key, new CleanBean(logInfo), name);
                            }
                        } catch (ParseException e) {
                            logger.info("错误的日志数据格式:" + value.getLine());
                        }
                    } catch (Exception e) {
                        // Any other failure (bad index, IP lookup, ...) skips the line.
                        logger.info("错误的日志数据格式:" + value.getLine());
                    }
                }
            } else if (line[0].equals("tata")) {
                // Reject lines with too few fields.
                if (line.length > 0 && line.length < 12) {
                    logger.info("错误的日志数据格式:" + value.getLine());
                } else {
                    // tata cleaning path: timestamp carries its own zone offset
                    // in field 5 (e.g. "+0530]").
                    String datetime = line[4];
                    String timeZone = line[5].replace("]", "");
                    ipvalue = line[1];
                    try {
                        // Resolve country code and name from the client IP.
                        String couStr = ip2CCUtil.getCountry(ipvalue);
                        if (couStr != null) {
                            countryCode = couStr.split(" ")[0];
                            countryName = couStr.split(" ")[1];
                        }
                        // Parse in the log's own offset; format() below re-renders
                        // in the JVM default timezone.
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        sdf.setTimeZone(TimeZone.getTimeZone("GMT" + timeZone)); // source offset
                        datetime = DealDate.Tatadate(datetime);
                        try {
                            Date d = sdf.parse(datetime.toString());
                            Date date = new Date(d.getTime());
                            SimpleDateFormat newsdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            String newdatetime = newsdf.format(date);
                            String d1[] = newdatetime.split(" ");
                            // yyyyMMdd output-file name.
                            String namedata[] = newdatetime.split(" ")[0].split("-");
                            name = namedata[0] + namedata[1] + namedata[2];
                            // Quote-split for the quoted User-Agent / time-taken fields.
                            String[] datavalue = value.toString().split("\"");
                            if (datavalue.length > 0 && datavalue.length < 7) {
                                logger.info("错误的日志数据格式:" + value.getLine());
                            } else {
                                // cs(User-Agent) and time-taken fields.
                                timeTaken = datavalue[6];
                                UserAgent = datavalue[5];
                                // Note: hit flag is emitted as the literal "-1"
                                // (not derivable from the tata format here).
                                logInfo = d1[0] + "\001" + d1[1] + "\001" + line[1] + "\001" + countryName + "\001" + countryCode + "\001" + line[2] + "\001"
                                        + line[6].replace("\"", "") + "\001" + line[7] + "\001" + line[9] + "\001"
                                        + line[10] + "\001" + "-1" + "\001" + timeTaken.replaceAll(" ", "") + "\001"
                                        + line[11].replace("\"", "") + "\001" + UserAgent;
                                multipleOutputs.write(key, new CleanBean(logInfo), name);
                            }
                        } catch (ParseException e) {
                            logger.info("错误的日志数据格式:" + value.getLine());
                        }
                    } catch (Exception e) {
                        logger.info("错误的日志数据格式:" + value.getLine());
                    }
                }
            } else {
                // Default vendor branch (presumably level3 — confirm upstream).
                if (line.length > 0 && line.length < 9) {
                    logger.info("错误的日志数据格式:" + value.getLine());
                } else {
                    // Positional field layout: date + time, then client IP.
                    String datetime = line[1] + " " + line[2];
                    ipvalue = line[3];
                    // Hit status is encoded in the last digit of the
                    // second-to-last field: 0/3 = MISS, 1/2 = HIT.
                    String hitStr = line[line.length - 2];
                    String hitlast = hitStr.substring(hitStr.length() - 1, hitStr.length());
                    if (hitlast.equals("0") || hitlast.equals("3")) {
                        hit = "MISS";
                    } else if (hitlast.equals("1") || hitlast.equals("2")) {
                        hit = "HIT";
                    } else {
                        hit = "-1";
                    }
                    try {
                        // Resolve country code and name from the client IP.
                        String couStr = ip2CCUtil.getCountry(ipvalue);
                        if (couStr != null) {
                            countryCode = couStr.split(" ")[0];
                            countryName = couStr.split(" ")[1];
                        }
                        if (ipvalue.length() > 0) {
                            // Source timestamps are GMT+0; format() re-renders in
                            // the JVM default timezone (intended GMT+8).
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            sdf.setTimeZone(TimeZone.getTimeZone("GMT+0000")); // parse as GMT+0
                            try {
                                Date d = sdf.parse(datetime.toString());
                                Date date = new Date(d.getTime());
                                SimpleDateFormat newsdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                String newdatetime = newsdf.format(date);
                                String d1[] = newdatetime.split(" ");
                                // yyyyMMdd output-file name.
                                String namedata[] = newdatetime.split(" ")[0].split("-");
                                name = namedata[0] + namedata[1] + namedata[2];
                                // Quote-split for referer / User-Agent.
                                String[] datavalue = value.toString().split("\"");
                                if (datavalue.length > 0 && datavalue.length < 4) {
                                    logger.info("错误的日志数据格式:" + value.getLine());
                                } else {
                                    UserAgent = datavalue[3];
                                    referer = datavalue[1];
                                    logInfo = d1[0] + "\001" + d1[1] + "\001" + line[3] + "\001" + countryName +
                                            "\001" + countryCode + "\001" + line[0] + "\001" + line[4] +
                                            "\001" + line[5] + "\001" + line[6] + "\001" + line[7] +
                                            "\001" + hit + "\001" + line[8] + "\001" + referer + "\001" + UserAgent;
                                    multipleOutputs.write(key, new CleanBean(logInfo), name);
                                }
                            } catch (ParseException e) {
                                logger.info("错误的日志数据格式:" + value.getLine());
                            }
                        }
                    } catch (Exception e) {
                        logger.info("错误的日志数据格式:" + value.getLine());
                    }
                }
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Flush and close all date-partitioned side outputs.
        multipleOutputs.close();
    }
}
@Override
public int run(String[] args) throws Exception {
    /**
     * Configures and submits the cleaning job.
     * args[0] = raw-input path, args[1] = cleaned-output path.
     * Returns 0 on success or when the day is already cleaned, 1 on job failure.
     */
    // Fix: use the configuration injected by ToolRunner via setConf() so that
    // generic options (-D key=value, -conf, ...) survive, instead of silently
    // discarding them with a fresh Configuration. Fall back when run standalone.
    Configuration conf = getConf();
    if (conf == null) {
        conf = new Configuration();
    }
    // Work around "java.io.IOException: No FileSystem for scheme: hdfs" when
    // the hdfs FileSystem service entry is missing from a shaded jar.
    conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());

    // Skip the whole run if the day is already cleaned (_SUCCESS marker exists).
    Path fileSucc = new Path(args[1] + "/_SUCCESS");
    FileSystem fsSucc = fileSucc.getFileSystem(conf);
    if (fsSucc.exists(fileSucc)) {
        logger.info("该天的数据已经清洗完成!");
        return 0;
    } else {
        // A previous run failed part-way: remove the stale output directory
        // so FileOutputFormat does not reject it.
        Path output = new Path(args[1]);
        FileSystem fs = output.getFileSystem(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
    }

    Job job = Job.getInstance(conf);
    job.setJarByClass(CleanData.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(CleanMapper.class);
    job.setReducerClass(CleanReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CleanBean.class);
    // LazyOutputFormat suppresses empty part-r-* files; the real output is
    // written through MultipleOutputs in the reducer.
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Hadoop Writable value carrying one (raw or cleaned) log line between
 * the mapper and reducer.
 */
public static class CleanBean implements Writable {

    // The log line payload; may be null on a default-constructed bean.
    private String line;

    public CleanBean() {
        super();
    }

    public CleanBean(String line) {
        super();
        this.line = line;
    }

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Fix: writeUTF(null) throws NPE and would kill the task for a
        // half-initialized bean; serialize null as the empty string.
        out.writeUTF(line == null ? "" : line);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        line = in.readUTF();
    }

    @Override
    public String toString() {
        // Fix: return the line directly — `new String(this.line)` allocated a
        // pointless copy and threw NPE on a null line; mirror write()'s
        // null-as-empty convention instead.
        return line == null ? "" : line;
    }
}
public static void main(String[] args) throws Exception {
    // Fix: args[0] was dereferenced unguarded — running with no arguments
    // crashed with ArrayIndexOutOfBoundsException. Fail fast with usage text.
    if (args.length < 1) {
        System.err.println("Usage: CleanData <date-partition, e.g. 20180101>");
        System.exit(2);
    }
    // Resolve HDFS locations from config.properties.
    // (Dead commented-out PropertyReader variant removed; PropsUtil replaced it.)
    PropsUtil r = new PropsUtil("config.properties");
    String hdfsUrl = r.getString("hdfs.default.url");
    String originPath = r.getString("hdfs.default.origin.path");
    String cleanedPath = r.getString("hdfs.default.cleaned.path");
    // ag[0]: raw input directory for the day; ag[1]: cleaned output directory.
    String[] ag = {hdfsUrl + originPath + "/tata/" + args[0],
            hdfsUrl + cleanedPath + "/" + args[0]};
    int ec = ToolRunner.run(new Configuration(), new CleanData(), ag);
    System.exit(ec);
}
}