小楼XiaoLou 2024-04-18 13:00 采纳率: 0%
浏览 8
已结题

MapReduce结果输出到HBase,一直连接不上MySQL

MapReduce结果输出到HBase,一直连接不上MySQL
Error: java.io.IOException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
****

img

版本 HBase 2.5.5
mysql 8.0
mysql-connect 8.0.33

public class stationStatic {
    private static StationMapper stationMapper;
    public static class StationCountMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//读取hbase表中的一行
            //定义列簇、列名
            byte[] fm = Bytes.toBytes("record");
            byte[] c1 = Bytes.toBytes("stationID");
            byte[] c2 = Bytes.toBytes("time");
            byte[] c3 = Bytes.toBytes("lineID");
            if (value.containsColumn(fm, c1)) {
                //提取地铁站名名称
                String name = Bytes.toString(value.getValue(fm, c1));
                String time = Bytes.toString(value.getValue(fm, c2));
                String line = Bytes.toString(value.getValue(fm, c3));
                //输出 2019-01-01,B,27
                context.write(new Text(time.substring(0, 10) + "," + line + "," + name), new IntWritable(1));
            }
        }
    }

    public static class StationCountReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //统计客流量
            float playCount = 0;
            for (IntWritable v : values) {
                playCount += v.get();
            }
            playCount = playCount / 20000;
            //输出 2019-01-01,B,27
            String[] items = key.toString().split(",");
            //定义行键、列簇、列名、值
            byte[] rk = Bytes.toBytes(key.toString());
            byte[] fm = Bytes.toBytes("count");
            byte[] c0 = Bytes.toBytes("station");
            byte[] v0 = Bytes.toBytes(items[2]);
            byte[] c1 = Bytes.toBytes("station_count");
            byte[] v1 = Bytes.toBytes(String.valueOf(playCount));
            byte[] c2 = Bytes.toBytes("time");
            byte[] v2 = Bytes.toBytes(items[0]);
            byte[] c3 = Bytes.toBytes("line");
            byte[] v3 = Bytes.toBytes(items[1]);
            stationMapper.addStationNum(v2.toString(),v0.toString(),v3.toString(),playCount);
            //构建Put对象
            Put put = new Put(rk);
            put.addColumn(fm, c0, v0);
            put.addColumn(fm, c1, v1);
            put.addColumn(fm, c2, v2);
            put.addColumn(fm, c3, v3);
            //输出
            context.write(NullWritable.get(), put);
        }
    }

    public void run() {
        try {
            System.setProperty("HADOOP_USER_NAME","root");
            //定义表名
            String tableName = "pro:station_count";
            HBaseUtil.createTable(tableName, "count", false);
            Configuration conf = HBaseConfiguration.create();
            DistributedCache.addFileToClassPath(new Path("hdfs://master:9000/user/hadoop/mysql-connector-j-8.0.33.jar"), conf);
            DBConfiguration.configureDB(conf,
                    "com.mysql.cj.jdbc.Driver",
                    "jdbc:mysql://localhost:3306/underground",
                    "root",
                    "6636");
            conf.set("mapreduce.app-submission.cross-platform","true");
            //创建Job
            Job job = Job.getInstance(conf, "station count");
            job.setJar("D:\\gruaduating_project\\SuzhouUnderground\\target\\SuzhouUnderground-0.0.1-SNAPSHOT.jar");
            job.setJarByClass(stationStatic.class);
            //定义Sacn用于读取hbase表中数据
            Scan scan = new Scan();
            //定义列簇、列名
            byte[] fm = Bytes.toBytes("record");
            scan.addFamily(fm);
            //设置mapper
            TableMapReduceUtil.initTableMapperJob("pro:work", scan, StationCountMapper.class,
                    Text.class, IntWritable.class, job);
            //设置reducer
            job.setNumReduceTasks(1);
            TableMapReduceUtil.initTableReducerJob(tableName, StationCountReducer.class, job);
            job.setOutputKeyClass(StationOutput.class);
            job.setOutputValueClass(NullWritable.class);
            DBOutputFormat.setOutput(job, "stationnum" , "get_time","station","stationNum","line");
            job.setOutputFormatClass(DBOutputFormat.class);
            //运行
            boolean flag = job.waitForCompletion(true);
            if (flag) {
                System.out.println("地铁站客流量统计完成~~~");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • 写回答

26条回答 默认 最新

  • 阿里嘎多学长 2024-04-18 13:06
    关注
    获得0.15元问题酬金

    以下内容由CHATGPT及阿里嘎多学长共同生成、有用望采纳:


    从您提供的链接内容来看,这是一个关于MapReduce结果输出到HBase时遇到的问题。用户遇到了一个java.io.IOException: Communications link failure的异常,这通常意味着与MySQL服务器的通信连接失败。这个问题可能是由多种原因造成的,例如网络问题、配置错误、驱动程序不兼容等。

    在您提供的代码示例中,尝试将MapReduce的结果输出到HBase,并且似乎还尝试连接MySQL。以下是一些可能导致连接失败的原因以及相应的解决方案:

    1. 网络问题:确保MySQL服务器正在运行,并且可以从运行MapReduce作业的机器上访问。

    2. 配置错误:检查MySQL的URL、用户名和密码是否正确。

    3. 驱动程序不兼容:确保使用的JDBC驱动程序与MySQL服务器的版本兼容。

    4. 权限问题:确保MySQL用户具有足够的权限来执行查询和操作数据库。

    5. 防火墙或安全组设置:如果MySQL服务器位于远程或受防火墙保护的网络中,确保相应的端口(默认是3306)已开放。

    6. HBase和MySQL的版本兼容性:确保HBase和MySQL的版本不冲突。

    针对您提供的代码,以下是一些改进建议:

    • 确保DistributedCache添加的MySQL JDBC驱动程序路径是正确的,并且该JAR文件在指定的HDFS路径上可用。
    • 使用DBConfiguration.configureDB方法配置数据库连接时,确保所有参数(包括URL、用户名和密码)都是正确的。
    • 检查job.setOutputFormatClass(DBOutputFormat.class);这一行是否正确设置了输出格式,DBOutputFormat需要正确配置以与MySQL配合使用。

    如果上述建议仍然无法解决问题,您可能需要提供更多的错误日志信息,或者检查MySQL服务器的状态和配置。

    此外,您的代码示例中包含了一个自定义的StationMapper类,但是在示例中并没有看到该类的定义。确保StationMapper类存在,并且其addStationNum方法可以正确执行。

    如果您需要示例代码来展示如何正确配置MapReduce作业以输出到HBase并连接MySQL,我可以提供一个简化的示例。但请注意,这只是一个示例,您需要根据您的具体需求进行调整:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    
    import java.io.IOException;
    
    public class HBaseMySQLExample {
    
        public static class HBaseMapper extends Mapper<ImmutableBytesWritable, Result, Text, IntWritable> {
            // Map逻辑...
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // 添加MySQL JDBC驱动到classpath
            DistributedCache.addFileToClassPath(new Path("path_to_your_mysql_connector_jar"), conf);
            
            // 配置数据库连接
            DBConfiguration.configureDB(conf,
                    "com.mysql.cj.jdbc.Driver",
                    "jdbc:mysql://your-mysql-host:3306/your-database",
                    "your-username",
                    "your-password");
    
            Job job = Job.getInstance(conf, "HBase to MySQL");
            job.setJarByClass(HBaseMySQLExample.class);
            job.setMapperClass(HBaseMapper.class);
            // 设置其他MapReduce配置...
    
            // 初始化TableReducerJob
            TableMapReduceUtil.initTableReducerJob(
                    "your-hbase-table",
                    null,
                    job,
                    null);
    
            // 设置输出格式为DBOutputFormat,指定输出表和列
            DBOutputFormat.setOutput(job, 
                    "your-mysql-output-table",
                    "your-mysql-output-columns");
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    

    请根据您的实际情况调整上述示例代码中的参数,例如数据库连接字符串、用户名、密码、HBase表名、MySQL输出表和列等。

    评论 编辑记录

报告相同问题?

问题事件

  • 系统已结题 4月26日
  • 创建了问题 4月18日

悬赏问题

  • ¥15 微信会员卡等级和折扣规则
  • ¥15 微信公众平台自制会员卡可以通过收款码收款码收款进行自动积分吗
  • ¥15 随身WiFi网络灯亮但是没有网络,如何解决?
  • ¥15 gdf格式的脑电数据如何处理matlab
  • ¥20 重新写的代码替换了之后运行hbuliderx就这样了
  • ¥100 监控抖音用户作品更新可以微信公众号提醒
  • ¥15 UE5 如何可以不渲染HDRIBackdrop背景
  • ¥70 2048小游戏毕设项目
  • ¥20 mysql架构,按照姓名分表
  • ¥15 MATLAB实现区间[a,b]上的Gauss-Legendre积分