TaroLee 2018-06-11 06:38 采纳率: 37.5%
浏览 662
已结题

在MapReduce处理数据时根据ip获取国家名称和国家码等信息

在Windows下可以获取数据,在linux开发环境下获取不到数据是为什么?代码如下:

package com.ctsig.cdn.log.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.maxmind.db.Reader;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**

  • @copyright:
  • @create: 2018-05-14 17:54
    */
    public class IP2CCUtil {
    private static Logger logger = LoggerFactory.getLogger(IpAddressService.class);

    public String getCountry(String ipAddress) {
    File database = new File("/home/hadoop/GeoLite2-City.mmdb");

    if(isIP(ipAddress)) {
        try {
            InetAddress address = InetAddress.getByName(ipAddress);
            Reader reader = new Reader(database);
            JsonNode response = reader.get(address);
            JsonNode country = response.get("country");
            reader.close();
            return String.format("%s %s",
                    // 国家编码
                    country.get("iso_code").asText(),
                    // 国家英文名
                    country.get("names").get("en").asText());
        }catch (UnknownHostException e1) {
            e1.printStackTrace();
        }catch (IOException e) {
            logger.debug("未查到地址ip为:"+ipAddress+"的国家码和国家名等信息!", e);
        } 
    }
    return null;
    

    }

    /**

    • 判断是否是有效的IP *
    • @param ip IP
    • @return true or false */ public static boolean isIP(String ip) { return ip.matches("\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"); }

    public static void main(String[] args) {
    String result = new IP2CCUtil().getCountry("83.83.205.108") ;
    if (result != null) {
    String code = result.split(" ")[0] ;
    String name = result.split(" ")[1] ;
    System.out.println(result);
    System.out.println("Country Code: "+code);
    System.out.println("Country Name: "+name);
    }

    }
    }

  • 写回答

2条回答 默认 最新

  • TaroLee 2018-06-11 06:39
    关注

    package com.ctsig.cdn.log;

    import com.ctsig.cdn.log.util.DealDate;
    import com.ctsig.cdn.log.util.IP2CCUtil;
    import com.ctsig.cdn.log.util.PropsUtil;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class CleanData extends Configured implements Tool {
    private static Logger logger = LoggerFactory.getLogger(CleanData.class);

    public static class CleanMapper extends Mapper<Object, Text, Text, CleanBean> {
    
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    
            //获取文件名称,根据文件名称获取域名
            InputSplit inputSplit = context.getInputSplit();
            String filename = ((FileSplit) inputSplit).getPath().getName();
            String[] str = filename.split("_");
            String fileName = str[0];
    
            String line = fileName + " " + value.toString();
            context.write(new Text(), new CleanBean(line));
        }
    }
    
    public static class CleanReducer extends Reducer<Text, CleanBean, Text, CleanBean> {
        private MultipleOutputs<Text, CleanBean> multipleOutputs;
        private static IP2CCUtil ip2CCUtil;
    
        @Override
        protected void setup(Context context) {
            multipleOutputs = new MultipleOutputs<Text, CleanBean>(context);
            ip2CCUtil = new IP2CCUtil();
        }
    
        @Override
        protected void reduce(Text key, Iterable<CleanBean> Values, Context context) {
    
            for (CleanBean value : Values) {
                String[] line = value.getLine().split("\\s+");
                String ipvalue = "";
                String hit = "-1";
                String logInfo = "";
                String name = "";
                String UserAgent = "-";
                String doamin = "-";
                String countryCode = "-";
                String countryName = "-";
                String timeTaken ="0";
                String referer = "-";
    
                if (line[0].equals("swiftserve")) {
                    //判断该行数据的长度是否满足需求
                    if(line.length <12 && line.length>0) {
                        logger.info("错误的日志数据格式:"+value.getLine());
                    } else {
                        //调用ss清洗程序
                        String datetime = line[4];
                        ipvalue = line[1];
                        try {
                            //根据ip获取国家码和国家名称
                            String couStr = ip2CCUtil.getCountry(ipvalue);
                            if(couStr != null) {
                                countryCode = couStr.split(" ")[0];
                                countryName = couStr.split(" ")[1];
                            }
    
                            //开始处理日志格式
                            //首先处理时区问题,level3的时区为GMT+0;改为GMT+8;
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            sdf.setTimeZone(TimeZone.getTimeZone("GMT+0000")); // 设置北京时区
                            datetime = DealDate.Swiftservedate(datetime);
                            try {
                                Date d = sdf.parse(datetime.toString());
                                Date date = new Date(d.getTime());
                                SimpleDateFormat newsdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                //时区处理完毕
                                String newdatetime = newsdf.format(date);
                                String d1[] = newdatetime.split(" ");
                                String namedata[] = newdatetime.split(" ")[0].split("-");
                                name = namedata[0] + namedata[1] + namedata[2];
                                String[] datavalue = value.toString().split("\"");
                                //判断截取的数据是否符合要求
                                if(datavalue.length<12 && datavalue.length >0) {
                                    logger.info("错误的日志数据格式:"+value.getLine());
                                } else {
                                    //处理cs(User-Agent)参数
                                    UserAgent = datavalue[5];
                                    //判断该行是否命中
                                    String hitStr = datavalue[11];
                                    if (hitStr.indexOf("HIT") != -1) {
                                        hit = "HIT";
                                    } else {
                                        hit = "MISS";
                                    }
                                }
    
                                String [] doStr = line[6].split("/");
                                if(doStr.length <3 && doStr.length>0) {
                                    logger.info("错误的日志数据格式:"+value.getLine());
                                } else {
                                    doamin = line[6].split("/")[2];
                                    logInfo = d1[0] + "\001" + d1[1] + "\001" + line[1] + "\001" + countryName + "\001" + countryCode + "\001" + doamin +
                                            "\001" + line[5].replace("\"", "") + "\001" + line[6].replace("\"", "") + "\001" + line[7] +
                                            "\001" + line[10] + "\001" + hit + "\001" + line[8] + "\001" + line[11].replace("\"", "") +
                                            "\001" + UserAgent;
                                    multipleOutputs.write(key, new CleanBean(logInfo), name);
                                }
                            } catch (ParseException e) {
                                logger.info("错误的日志数据格式:"+value.getLine());
                            }
                        } catch (Exception e) {
                            logger.info("错误的日志数据格式:"+value.getLine());
                        }
                    }
    
                } else if (line[0].equals("tata")) {
                    //判断该行数据是否符合要求
                    if(line.length >0 && line.length <12) {
                        logger.info("错误的日志数据格式:"+value.getLine());
                    } else {
                        //调用tata清洗程序
                        String datetime = line[4];
                        String timeZone = line[5].replace("]", "");
                        ipvalue = line[1];
                        try {
                            //根据ip获取国家码和国家名称
                            String couStr = ip2CCUtil.getCountry(ipvalue);
                            if(couStr != null) {
                                 countryCode = couStr.split(" ")[0];
                                 countryName = couStr.split(" ")[1];
                            }
    
                            //开始处理日志格式
                            //首先处理时区问题,level3的时区为GMT+0;改为GMT+8;
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            sdf.setTimeZone(TimeZone.getTimeZone("GMT" + timeZone)); // 设置北京时区
                            datetime = DealDate.Tatadate(datetime);
    
                            try {
                                Date d = sdf.parse(datetime.toString());
                                Date date = new Date(d.getTime());
                                SimpleDateFormat newsdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                //时区处理完毕
                                String newdatetime = newsdf.format(date);
                                String d1[] = newdatetime.split(" ");
                                String namedata[] = newdatetime.split(" ")[0].split("-");
                                name = namedata[0] + namedata[1] + namedata[2];
                                String[] datavalue = value.toString().split("\"");
                                //判断截取的数据是否符合要求
                                if(datavalue.length >0 && datavalue.length <7) {
                                    logger.info("错误的日志数据格式:"+value.getLine());
                                } else {
                                    //处理cs(User-Agent)参数
                                    timeTaken = datavalue[6];
                                    UserAgent = datavalue[5];
                                    logInfo = d1[0] + "\001" + d1[1] + "\001" + line[1] + "\001" + countryName + "\001" + countryCode + "\001" + line[2] + "\001"
                                            + line[6].replace("\"", "") + "\001" + line[7] + "\001" + line[9] + "\001"
                                            + line[10] + "\001" + "-1" + "\001" + timeTaken.replaceAll(" ", "") + "\001"
                                            + line[11].replace("\"", "") + "\001" + UserAgent;
                                    multipleOutputs.write(key, new CleanBean(logInfo), name);
                                }
                            } catch(ParseException e) {
                                logger.info("错误的日志数据格式:"+value.getLine());
                            }
                        } catch (Exception e) {
                            logger.info("错误的日志数据格式:"+value.getLine());
                        }
                    }
    
                } else {
                    //判断该行数据是否符合要求
                    if(line.length >0 && line.length <9) {
                        logger.info("错误的日志数据格式:"+value.getLine());
                    } else {
                        //循环一调参数根据下标确定列
                        String datetime = line[1] + " " + line[2];
                        ipvalue = line[3];
                        //判断该行是否命中
                        String hitStr = line[line.length - 2];
                        String hitlast = hitStr.substring(hitStr.length() - 1, hitStr.length());
                        if (hitlast.equals("0") || hitlast.equals("3")) {
                            hit = "MISS";
                        } else if (hitlast.equals("1") || hitlast.equals("2")) {
                            hit = "HIT";
                        } else {
                            hit = "-1";
                        }
                        try {
                            //根据ip获取国家码和国家名称
                            String couStr = ip2CCUtil.getCountry(ipvalue);
                            if(couStr != null) {
                                countryCode = couStr.split(" ")[0];
                                countryName = couStr.split(" ")[1];
                            }
    
                            if (ipvalue.length() > 0) {
                                //开始处理日志格式
                                //首先处理时区问题,level3的时区为GMT+0;改为GMT+8;
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                sdf.setTimeZone(TimeZone.getTimeZone("GMT+0000")); // 设置北京时区
                                try {
                                    Date d = sdf.parse(datetime.toString());
                                    Date date = new Date(d.getTime());
                                    SimpleDateFormat newsdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                    //时区处理完毕
                                    String newdatetime = newsdf.format(date);
                                    String d1[] = newdatetime.split(" ");
    
                                    String namedata[] = newdatetime.split(" ")[0].split("-");
                                    name = namedata[0] + namedata[1] + namedata[2];
                                    //处理cs(User-Agent)参数
                                    String[] datavalue = value.toString().split("\"");
                                    if(datavalue.length >0 && datavalue.length <4) {
                                        logger.info("错误的日志数据格式:"+value.getLine());
                                    } else {
                                        UserAgent = datavalue[3];
                                        referer = datavalue[1];
                                        logInfo = d1[0] + "\001" + d1[1] + "\001" + line[3] + "\001" + countryName +
                                                "\001" + countryCode + "\001" + line[0] + "\001" + line[4] +
                                                "\001" + line[5] + "\001" + line[6] + "\001" + line[7] +
                                                "\001" + hit + "\001" + line[8] + "\001" + referer + "\001" + UserAgent;
                                        multipleOutputs.write(key, new CleanBean(logInfo), name);
                                    }
    
                                } catch (ParseException e) {
                                    logger.info("错误的日志数据格式:"+value.getLine());
                                }
                            }
                        } catch (Exception e) {
                            logger.info("错误的日志数据格式:"+value.getLine());
                        }
                    }
                }
            }
        }
    
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }
    
    
    @Override
    public int run(String[] args) throws Exception {
        //读取配置文件
        Configuration conf = new Configuration();
        // 解决java.io.IOException: No FileSystem for scheme: hdfs异常
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        //判断该天的数据是否已经清洗完成(即是否有_SUCCESS文件)
        Path fileSucc = new Path(args[1]+"/_SUCCESS");
        FileSystem fsSucc = fileSucc.getFileSystem(conf);
        if(fsSucc.exists(fileSucc)) {
            logger.info("该天的数据已经清洗完成!");
            return 0;
        } else {
            //判断目录是否存在,如果存在,则删除
            Path output = new Path(args[1]);
            FileSystem fs = output.getFileSystem(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
        }
    
        //新建一个任务
        Job job = Job.getInstance(conf);
    
        //主类
        job.setJarByClass(CleanData.class);
    
        //输入路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
        //Mapper
        job.setMapperClass(CleanMapper.class);
        //Reducer
        job.setReducerClass(CleanReducer.class);
    
        //key输出类型
        job.setOutputKeyClass(Text.class);
        //value输出类型
        job.setOutputValueClass(CleanBean.class);
    
        //去掉job设置outputFormatClass,改成通过LazyOutputFormat设置
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
    
    public static class CleanBean implements Writable {
    
        /*
         * 成员变量
         */
        private String line;  //每行的数据
    
        public CleanBean() {
            super();
        }
    
        public CleanBean(String line) {
            super();
            this.line = line;
        }
    
        public String getLine() {
            return line;
        }
    
        public void setLine(String line) {
            this.line = line;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(line);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            line = in.readUTF();
        }
    
        @Override
        public String toString() {
            return new String(this.line);
        }
    }
    
    public static void main(String[] args) throws Exception {
        // 从配置文件中读取属性
        /**  服务器运行报:java.lang.NullPointerException
         PropertyReader reader = new PropertyReader("config.properties");
         String hdfsUrl = reader.getProperty("hdfs.default.url") ;
         String originPath = reader.getProperty("hdfs.default.origin.path") ;
         String cleanedPath = reader.getProperty("hdfs.default.cleaned.path");
         logger.debug("HDFS URL: {}",hdfsUrl);
         **/
        PropsUtil r = new PropsUtil("config.properties");
        String hdfsUrl = r.getString("hdfs.default.url");
        String originPath = r.getString("hdfs.default.origin.path");
        String cleanedPath = r.getString("hdfs.default.cleaned.path");
    
        //数据输入路径和输出路径
        String[] ag = {hdfsUrl + originPath + "/tata/" + args[0],
                hdfsUrl + cleanedPath + "/" + args[0]};
    
        int ec = ToolRunner.run(new Configuration(), new CleanData(), ag);
        System.exit(ec);
    }
    

    }

    评论

报告相同问题?

悬赏问题

  • ¥20 腾讯企业邮箱邮件可以恢复么
  • ¥15 有人知道怎么将自己的迁移策略布到edgecloudsim上使用吗?
  • ¥15 错误 LNK2001 无法解析的外部符号
  • ¥50 安装pyaudiokits失败
  • ¥15 计组这些题应该咋做呀
  • ¥60 更换迈创SOL6M4AE卡的时候,驱动要重新装才能使用,怎么解决?
  • ¥15 让node服务器有自动加载文件的功能
  • ¥15 jmeter脚本回放有的是对的有的是错的
  • ¥15 r语言蛋白组学相关问题
  • ¥15 Python时间序列如何拟合疏系数模型