package com.neuedu.myhbase.mybase;
import com.neuedu.myhbase.mybase.HbsaseUtils;
import com.neuedu.myhbase.mybase.WeatherWritable;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
public class Step6 {
private static class Step6Mapper extends TableMapper<Text, WeatherWritable> {
private final int targetYear;
public Step6Mapper(int targetYear) {
this.targetYear = targetYear;
}
@Override
protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result,
Text, WeatherWritable>.Context context) throws IOException, InterruptedException {
byte[] cf = Bytes.toBytes("info");
byte[] c1 = Bytes.toBytes("max");
byte[] c2 = Bytes.toBytes("min");
byte[] c3 = Bytes.toBytes("avg");
if (value.isEmpty()) {
return;
}
String rowKey = Bytes.toString(key.get());
String[] parts = rowKey.split("_");
String code = parts[0];
String date = parts[1];
// Extract year and month
String yearMonth = date.substring(0, 4) + "-" + date.substring(4, 6);
String year = Integer.toString(Integer.parseInt(yearMonth.substring(0, 4)));
if (!year.equals(Integer.toString(targetYear))) { // Check if it's not the target year
return; // Skip non-target year data
}
double max = Bytes.toDouble(value.getValue(cf, c1));
double min = Bytes.toDouble(value.getValue(cf, c2));
double avg = Bytes.toDouble(value.getValue(cf, c3));
WeatherWritable w = new WeatherWritable(code, yearMonth, max, min, avg, 0);
context.write(new Text(code + "_" + yearMonth), w);
}
}
private static class Step6Reducer extends TableReducer<Text, WeatherWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<WeatherWritable> values, Reducer<Text, WeatherWritable,
NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
WeatherWritable w = new WeatherWritable();
int count = 0;
// Calculate the number of days in the month
String month = null;
int totalDays = getDaysInMonth(month);
for (WeatherWritable v : values) {
if (StringUtils.isBlank(w.getCode())) {
w.setCode(v.getCode());
w.setDate(v.getDate());
}
w.setMaxTemperature(w.getMaxTemperature() + v.getMaxTemperature());
w.setMinTemperature(w.getMinTemperature() + v.getMinTemperature());
w.setAvgTemperature(w.getAvgTemperature() + v.getAvgTemperature());
count++;
}
if (count > 0) {
w.setMaxTemperature(w.getMaxTemperature() / count);
w.setMinTemperature(w.getMinTemperature() / count);
w.setAvgTemperature(w.getAvgTemperature() / count);
}
// Update temperature values
w.setMaxTemperature(w.getMaxTemperature() * totalDays);
w.setMinTemperature(w.getMinTemperature() * totalDays);
w.setAvgTemperature(w.getAvgTemperature() * totalDays);
byte[] rk = Bytes.toBytes(w.getCode() + "_" + w.getDate());
byte[] cf = Bytes.toBytes("info");
byte[] c1 = Bytes.toBytes("max");
byte[] c2 = Bytes.toBytes("min");
byte[] c3 = Bytes.toBytes("avg");
byte[] v1 = Bytes.toBytes(w.getMaxTemperature());
byte[] v2 = Bytes.toBytes(w.getMinTemperature());
byte[] v3 = Bytes.toBytes(w.getAvgTemperature());
Put put = new Put(rk);
put.addColumn(cf, c1, v1);
put.addColumn(cf, c2, v2);
put.addColumn(cf, c3, v3);
context.write(NullWritable.get(), put);
}
}
public static int getDaysInMonth(String month) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM");
LocalDate date = LocalDate.parse(month, formatter);
return date.lengthOfMonth(); // 返回指定月的天数
}
public static void run(String sourceTableName, String targetTableName, int targetYear, String... columnFamilies) {
try {
// Create configuration object
Configuration conf = HbsaseUtils.getConf();
// Delete table (if exists) and create new table
HbsaseUtils.deleteTable(targetTableName);
HbsaseUtils.createTable(targetTableName, columnFamilies);
conf.set(TableOutputFormat.OUTPUT_TABLE, targetTableName);
// Define source table
byte[] cf = Bytes.toBytes("info");
byte[] c1 = Bytes.toBytes("max");
byte[] c2 = Bytes.toBytes("min");
byte[] c3 = Bytes.toBytes("avg");
Scan scan = new Scan();
scan.addColumn(cf, c1);
scan.addColumn(cf, c2);
scan.addColumn(cf, c3);
// Create job
Job job = Job.getInstance(conf, "Weather Group By Month");
Step6Mapper mapper = new Step6Mapper(targetYear);
TableMapReduceUtil.initTableMapperJob(sourceTableName, scan, mapper.getClass(), Text.class, WeatherWritable.class, job);
TableMapReduceUtil.initTableReducerJob(targetTableName, Step6Reducer.class, job);
boolean success = job.waitForCompletion(true);
if (success) {
System.out.println("Monthly weather summary completed~~·");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.neuedu.myhbase.mybase;
import org.jline.utils.Display;
public class Manger {
public static void main(String[] args) {
String input = "hdfs://master:9000/brazil_weather/*.csv";
String tableName = "weather";
String[] colmnFamilies = {"info"};
//数据清洗
//Step1.run(input, tableName,"info");
//查询指定日期的天气
//Step2.run("83377","31/12/2019");
// //统计汇总每年的天气
// String targetTable = "weatherofyears";
// Step3.run(tableName, targetTable,colmnFamilies);
//预测指定日期
// String date = "14/08";
// String predictTable = "predict";
//Step4.run(date,tableName,predictTable,colmnFamilies);
//预测未来7天的天气
//Step5.run(tableName,predictTable,colmnFamilies);
//显示所有统计汇总每年的天气
// Dispiay dispiay = new Dispiay();
// for (WeatherWritable w : dispiay.getWeatherOfYears()){
// System.out.println(w);
// }
// 显示指定年份的每月天气情况
// String year = "0";
// String targetTable = "weatherofmonths";
// Step6.run(tableName, targetTable,colmnFamilies);
// Dispiay dispiay = new Dispiay();
// for (WeatherWritable w : dispiay.getWeatherOfMonths(year)) {
// System.out.println(w);
// }
//通过phoenix读取数据
//建立表映射
// PhoenixTableMapping.predictMapping();
// //读取数据
// PhoenixQuery query = new PhoenixQuery();
// for (WeatherWritable w : query.selectAllOfPredict()){
// System.out.println("记录:"+w);
// }
int targetYear = 2011;
String targetTableName = "weatherofmonths";
Step6.run(tableName,targetTableName,targetYear,colmnFamilies);
PhoenixTableMapping.weatherofmonthsMapping();
PhoenixQuery query = new PhoenixQuery();
for (WeatherWritable w : query.selectAllOfMonths(targetYear)) {
System.out.println("记录:"+w);
}
}
// 指定年月天气情况
}
public List<WeatherWritable> selectAllOfMonths(int year) {
List<WeatherWritable> weathers = new ArrayList<>();
try {
this.conn = this.getConnection();
// 构造 SQL 查询语句
String sql = "SELECT \"ROW\", \"max\", \"min\", \"avg\" FROM \"weatherofmonths\" WHERE \"ROW\" LIKE ?";
this.stmt = this.conn.prepareStatement(sql);
// 设置 SQL 查询语句的参数
this.stmt.setString(1, "83377_" + year + "-%");
this.rs = this.stmt.executeQuery();
while (this.rs.next()) {
WeatherWritable w = new WeatherWritable();
String rowKey = this.rs.getString(1);
String[] parts = rowKey.split("_");
w.setCode(parts[0]); // 例如 "83377"
w.setDate(parts[1]); // 例如 "2023-01-01"
w.setMaxTemperature(this.rs.getDouble(2));
w.setMinTemperature(this.rs.getDouble(3));
w.setAvgTemperature(this.rs.getDouble(4));
// 如果需要,可以在这里添加获取 rainfall 的逻辑
// w.setRainfall(this.rs.getDouble(5));
weathers.add(w);
}
} catch (Exception e) {
e.printStackTrace();
}
return weathers;
}
无法打印出查询指定年份的每个月的天气信息(最高温度、最低温度、平均温度)哪部分代码出错