MapReduce结果输出到HBase,一直连接不上MySQL
Error: java.io.IOException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.****
版本 HBase 2.5.5
mysql 8.0
mysql-connect 8.0.33
public class stationStatic {
private static StationMapper stationMapper;
public static class StationCountMapper extends TableMapper<Text, IntWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//读取hbase表中的一行
//定义列簇、列名
byte[] fm = Bytes.toBytes("record");
byte[] c1 = Bytes.toBytes("stationID");
byte[] c2 = Bytes.toBytes("time");
byte[] c3 = Bytes.toBytes("lineID");
if (value.containsColumn(fm, c1)) {
//提取地铁站名名称
String name = Bytes.toString(value.getValue(fm, c1));
String time = Bytes.toString(value.getValue(fm, c2));
String line = Bytes.toString(value.getValue(fm, c3));
//输出 2019-01-01,B,27
context.write(new Text(time.substring(0, 10) + "," + line + "," + name), new IntWritable(1));
}
}
}
public static class StationCountReducer extends TableReducer<Text, IntWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//统计客流量
float playCount = 0;
for (IntWritable v : values) {
playCount += v.get();
}
playCount = playCount / 20000;
//输出 2019-01-01,B,27
String[] items = key.toString().split(",");
//定义行键、列簇、列名、值
byte[] rk = Bytes.toBytes(key.toString());
byte[] fm = Bytes.toBytes("count");
byte[] c0 = Bytes.toBytes("station");
byte[] v0 = Bytes.toBytes(items[2]);
byte[] c1 = Bytes.toBytes("station_count");
byte[] v1 = Bytes.toBytes(String.valueOf(playCount));
byte[] c2 = Bytes.toBytes("time");
byte[] v2 = Bytes.toBytes(items[0]);
byte[] c3 = Bytes.toBytes("line");
byte[] v3 = Bytes.toBytes(items[1]);
stationMapper.addStationNum(v2.toString(),v0.toString(),v3.toString(),playCount);
//构建Put对象
Put put = new Put(rk);
put.addColumn(fm, c0, v0);
put.addColumn(fm, c1, v1);
put.addColumn(fm, c2, v2);
put.addColumn(fm, c3, v3);
//输出
context.write(NullWritable.get(), put);
}
}
public void run() {
try {
System.setProperty("HADOOP_USER_NAME","root");
//定义表名
String tableName = "pro:station_count";
HBaseUtil.createTable(tableName, "count", false);
Configuration conf = HBaseConfiguration.create();
DistributedCache.addFileToClassPath(new Path("hdfs://master:9000/user/hadoop/mysql-connector-j-8.0.33.jar"), conf);
DBConfiguration.configureDB(conf,
"com.mysql.cj.jdbc.Driver",
"jdbc:mysql://localhost:3306/underground",
"root",
"6636");
conf.set("mapreduce.app-submission.cross-platform","true");
//创建Job
Job job = Job.getInstance(conf, "station count");
job.setJar("D:\\gruaduating_project\\SuzhouUnderground\\target\\SuzhouUnderground-0.0.1-SNAPSHOT.jar");
job.setJarByClass(stationStatic.class);
//定义Sacn用于读取hbase表中数据
Scan scan = new Scan();
//定义列簇、列名
byte[] fm = Bytes.toBytes("record");
scan.addFamily(fm);
//设置mapper
TableMapReduceUtil.initTableMapperJob("pro:work", scan, StationCountMapper.class,
Text.class, IntWritable.class, job);
//设置reducer
job.setNumReduceTasks(1);
TableMapReduceUtil.initTableReducerJob(tableName, StationCountReducer.class, job);
job.setOutputKeyClass(StationOutput.class);
job.setOutputValueClass(NullWritable.class);
DBOutputFormat.setOutput(job, "stationnum" , "get_time","station","stationNum","line");
job.setOutputFormatClass(DBOutputFormat.class);
//运行
boolean flag = job.waitForCompletion(true);
if (flag) {
System.out.println("地铁站客流量统计完成~~~");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}