package com.neuedu.myhbase.mybase;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Step4 {
private static class Step4Mapper extends TableMapper<Text,WeatherWritable>{
protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable,Result,
Text,WeatherWritable>.Context context)throws IOException, InterruptedException{
byte[] cf = Bytes.toBytes("info");
byte[] c1 = Bytes.toBytes("max");
byte[] c2 = Bytes.toBytes("min");
byte[] c3 = Bytes.toBytes("avg");
byte[] c4 = Bytes.toBytes("rain");
if (value.isEmpty()){
return;
}
String rowKey = Bytes.toString(key.get());
String code = rowKey.split("_")[0];
String date = rowKey.split("_")[1].substring(0,5);
String predictDate = context.getConfiguration().get("date");
if (!date.equals(predictDate)){
return;
}
double max = Bytes.toDouble(value.getValue(cf,c1));
double min = Bytes.toDouble(value.getValue(cf,c2));
double avg = Bytes.toDouble(value.getValue(cf,c3));
double rain = Bytes.toDouble(value.getValue(cf,c4));
WeatherWritable w = new WeatherWritable(code,date,max,min,avg,rain);
context.write(new Text(code+"_"+date),w);
}
}
private static class Step4Reducer extends TableReducer<Text,WeatherWritable, NullWritable>{
protected void reduce(Text key, Iterable<WeatherWritable> values, Reducer<Text,WeatherWritable,
NullWritable, Mutation>.Context context) throws IOException, InterruptedException{
WeatherWritable w = new WeatherWritable();
int count = 0;
for (WeatherWritable v : values) {
if (StringUtils.isBlank(w.getCode())) {
w.setCode(v.getCode());
w.setDate(v.getDate());
}
w.setMaxTemperature(w.getMaxTemperature() + v.getMaxTemperature());
w.setMinTemperature(w.getMinTemperature() + v.getMinTemperature());
w.setAvgTemperature(w.getAvgTemperature() + v.getAvgTemperature());
w.setRainfall(w.getRainfall() + v.getRainfall());
count++;
}
w.setMaxTemperature(w.getMaxTemperature() / count);
w.setMinTemperature(w.getMinTemperature() / count);
w.setAvgTemperature(w.getAvgTemperature() / count);
w.setRainfall(w.getRainfall() / count);
byte[] rk = Bytes.toBytes(w.getCode() + "_" +w.getDate());
byte[] cf = Bytes.toBytes("info");
byte[] c1 = Bytes.toBytes("max");
byte[] c2 = Bytes.toBytes("min");
byte[] c3 = Bytes.toBytes("avg");
byte[] c4 = Bytes.toBytes("rain");
byte[] v1 = Bytes.toBytes(w.getMaxTemperature());
byte[] v2 = Bytes.toBytes(w.getMinTemperature());
byte[] v3 = Bytes.toBytes(w.getAvgTemperature());
byte[] v4 = Bytes.toBytes(w.getRainfall());
Put put = new Put(rk);
put.addColumn(cf, c1, v1);
put.addColumn(cf,c2,v2);
put.addColumn(cf,c3,v3);
put.addColumn(cf,c4,v4);
context.write(NullWritable.get(),put);
}
}
public static WeatherWritable run(String date,String sourceTableName,String tableName,String...columnFamilies){
WeatherWritable w = new WeatherWritable();
try {
Configuration conf = HbsaseUtils.getConf();
conf.set("date", date);
// HbsaseUtils.deleteTable(tableName);
//HbsaseUtils.createTable(tableName,"info");
HbsaseUtils.createTable(tableName,columnFamilies);
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName);
byte[] cf = Bytes.toBytes("info");
byte[] c1 = Bytes.toBytes("max");
byte[] c2 = Bytes.toBytes("min");
byte[] c3 = Bytes.toBytes("avg");
byte[] c4 = Bytes.toBytes("rain");
Scan scan = new Scan();
scan.addColumn(cf,c1);
scan.addColumn(cf,c2);
scan.addColumn(cf,c3);
scan.addColumn(cf,c4);
Job job = Job.getInstance(conf,"predict weather");
TableMapReduceUtil.initTableMapperJob(sourceTableName,scan, Step4Mapper.class, Text.class, WeatherWritable.class,job);
TableMapReduceUtil.initTableReducerJob(tableName, Step4Reducer.class,job);
boolean success = job.waitForCompletion(true);
if (success){
System.out.println("预测指定日期的天气完成---");
}
} catch (Exception e){
e.printStackTrace();
}
return w;
}
}
package com.neuedu.myhbase.controller;
import com.neuedu.myhbase.mybase.*;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import com.neuedu.myhbase.mybase.Step4;
import java.util.List;
/**
 * Web endpoints under {@code /myhbase} that trigger the weather import,
 * query, yearly aggregation and prediction steps and hand their results to
 * the views.
 *
 * @author tzongt
 * 2024/8/8
 */
@Controller
@RequestMapping(value = "/myhbase")
public class MyBasseController {
    /** HBase table holding the cleaned source data. */
    private static final String WEATHER_TABLE = "weather";
    /** HBase table receiving prediction results. */
    private static final String PREDICT_TABLE = "predict";
    /** HBase table receiving the per-year aggregation. */
    private static final String YEARS_TABLE = "weatherofyears";
    /** The single column family used by all tables above. */
    private static final String INFO_FAMILY = "info";

    /**
     * Cleans the CSV files on HDFS and imports them into the weather table.
     *
     * @return {@code true} when the import step finished without throwing,
     *         {@code false} otherwise
     */
    @GetMapping(value = "/import")
    @ResponseBody
    public boolean dataImport() {
        // @ResponseBody: the return value is sent back as data instead of being
        // resolved as a page address.
        try {
            String input = "hdfs://master:9000/brazil_weather/*.csv";
            String[] columnFamilies = {INFO_FAMILY};
            // Data cleaning
            Step1.run(input, WEATHER_TABLE, columnFamilies);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Looks up the weather of station 83377 for the given date and shows it
     * on the {@code detail} view.
     *
     * @param date  date to query, bound from the request
     * @param model view model; receives the result as {@code weather}
     * @return the view name {@code detail}
     */
    @GetMapping(value = "/query")
    public String queryWeather(String date, Model model) {
        WeatherWritable w = Step2.run("83377", date);
        model.addAttribute("weather", w);
        return "detail";
    }

    /**
     * Aggregates the weather data per year and shows the result list.
     *
     * @param model view model; receives the list as {@code weathers}
     * @return the view name {@code weatherofyears}
     */
    @GetMapping(value = "/groupofyears")
    public String groupOfYears(Model model) {
        // Aggregate first
        String[] columnFamilies = {INFO_FAMILY};
        Step3.run(WEATHER_TABLE, YEARS_TABLE, columnFamilies);
        // Read the aggregated rows
        Dispiay dispiay = new Dispiay();
        List<WeatherWritable> weathers = dispiay.getWeatherOfYears();
        // Hand them to the front end
        model.addAttribute("weathers", weathers);
        return "weatherofyears";
    }

    /**
     * Runs the single-day prediction step and shows its result.
     *
     * @param date  day to predict, taken from the {@code date} request parameter
     * @param model view model; receives the result as {@code weather}
     * @return the view name {@code predicted_detail}
     */
    @GetMapping(value = "/predictofday")
    public String getPredictOfDay(@RequestParam("date") String date, Model model) {
        String[] columnFamilies = {INFO_FAMILY};
        // Run the prediction job with the request's date
        WeatherWritable w = Step4.run(date, WEATHER_TABLE, PREDICT_TABLE, columnFamilies);
        // Expose the prediction result to the view
        model.addAttribute("weather", w);
        return "predicted_detail";
    }

    /**
     * Runs the seven-day prediction step and shows the prediction view.
     *
     * @param model view model; receives a {@code weather} attribute
     * @return the view name {@code predicted_detail}
     */
    @GetMapping(value = "/predictofsevenday")
    public String getPredictofsevenday(Model model) {
        String[] columnFamilies = {INFO_FAMILY};
        Step5.run(WEATHER_TABLE, PREDICT_TABLE, columnFamilies);
        // The original code added an undefined variable here, which did not compile.
        // NOTE(review): Step5.run's result is not captured because its return type is
        // not visible from this class; an empty placeholder keeps the "weather"
        // attribute present for the view. TODO: populate it with the real
        // seven-day result once the way to read it is confirmed.
        WeatherWritable w = new WeatherWritable();
        model.addAttribute("weather", w);
        return "predicted_detail";
    }
}
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>天气数据分析与预测系统</title>
<style>
/* Global reset; the font stack ends in the generic sans-serif fallback.
   The generic keyword must stay unquoted, otherwise it is read as the name
   of a font family and no generic fallback applies. */
* {
margin: 0;
padding: 0;
box-sizing: border-box;
font-family: "Microsoft YaHei","微软雅黑","黑体","宋体",sans-serif;
}
/* Lists are used as plain containers, without bullets or numbers. */
ul, ol {
list-style: none;
}
/* Links are black in every state; underlined when hovered or visited. */
a:link,a:active{
color: black;
text-decoration: none;
}
a:hover,a:visited{
color: black;
text-decoration: underline;
}
/* Fixed-width, horizontally centered page body. */
.main{
width: 1096px;
margin: 0 auto;
}
/* Visible borders around the menu and each of its entries. */
ul,li{
border: 1px solid black;
}
</style>
<script type="text/javascript" src="/webjars/jquery/3.6.3/dist/jquery.js"></script>
<script>
// Asks the server to clean and import the weather data, then tells the user
// how it went. The endpoint replies with a JSON boolean: true when the import
// finished, false when it failed on the server side.
function dataImport(){
    $.ajax({
        url: "/myhbase/import",
        type: "get",
        dataType: "json",
        success: function(data){
            // A 200 response alone is not success; the body carries the real outcome.
            if (data === true) {
                console.log("数据清洗与导入成功!");
                alert("数据清洗与导入成功!");
            } else {
                console.error("数据清洗与导入失败!");
                alert("数据清洗与导入失败!");
            }
        },
        // Network errors, non-2xx responses and unparsable bodies end up here.
        error: function(xhr, status, err){
            console.error("数据清洗与导入失败:", status, err);
            alert("数据清洗与导入失败!");
        }
    })
}
</script>
</head>
<body>
<div class="main">
<h1>天气数据分析与预测系统</h1>
<ul>
<li><a href="javascript:void(0)" onclick="dataImport()">数据清洗与导入</a></li>
<li>查询指定时间段天气数据
<form action="/myhbase/query" method="get">
<table>
<tr>
<td>查询时间:</td>
<td><input type="text" name="date" placeholder="dd/MM/yyyy"></td>
</tr>
<tr>
<td><input type="submit" value="查询"></td>
<td><input type="reset" value="重置"></td>
</tr>
</table>
</form>
</li>
<li><a href="/myhbase/groupofyears">统计汇总每年的天气数据</a></li>
<li>预测指定日期的天气数据
<form action="/myhbase/predictofday" method="get">
<label for="query_date">查询时间:</label>
<input type="text" id="query_date" name="date" placeholder="dd/MM/yyyy" required>
<input type="submit" value="查询">
</form>
</li>
<li><a href="/myhbase/predictofsevenday">预测未来7天的天气数据</a></li>
<li><a href="/myecharts/weatherofyears">数据可视化(统计汇总每年的天气数据)</a></li>
<li><a href="/myecharts/predict">数据可视化(预测天气)</a></li>
</ul>
</div>
</body>
</html>