Runs fine on my local machine but fails when run on the server. Spark version 2.3.1, Hive version 1.1.0.
Error message:
org.apache.thrift.TApplicationException: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null, configuration:{use:database=zlgx_ods})
at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
at org.apache.hive.service.cli.thrift.TCLIService$Client.recv_OpenSession(TCLIService.java:156)
at org.apache.hive.service.cli.thrift.TCLIService$Client.OpenSession(TCLIService.java:143)
at org.apache.hive.jdbc.HiveConnection.openSession(HiveConnection.java:574)
at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:194)
at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at com.sz.mysql.SyncMysql.getHiveConnection(SyncMysql.java:128)
at com.sz.mysql.SyncMysql.main(SyncMysql.java:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
java.sql.SQLException: Could not establish connection to jdbc:hive2://192.168.67.145:10000/zlgx_ods: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null, configuration:{use:database=zlgx_ods})
at org.apache.hive.jdbc.HiveConnection.openSession(HiveConnection.java:586)
at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:194)
at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at com.sz.mysql.SyncMysql.getHiveConnection(SyncMysql.java:128)
at com.sz.mysql.SyncMysql.main(SyncMysql.java:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.thrift.TApplicationException: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null, configuration:{use:database=zlgx_ods})
at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
at org.apache.hive.service.cli.thrift.TCLIService$Client.recv_OpenSession(TCLIService.java:156)
at org.apache.hive.service.cli.thrift.TCLIService$Client.OpenSession(TCLIService.java:143)
at org.apache
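For reference, this Thrift error ("Required field 'client_protocol' is unset") is the classic symptom of a version mismatch: the hive-jdbc client on the runtime classpath proposes a Thrift protocol version newer than the HiveServer2 it talks to understands, so the server deserializes the enum as null. A minimal probe, run on the server with the same classpath as the failing job, shows which driver class is actually loaded and whether a bare JDBC connection already fails. This is only a sketch; HiveProbe is a hypothetical class name, and the URL and credentials are taken from the code below:

import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;

public class HiveProbe {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        String url = "jdbc:hive2://192.168.67.145:10000/zlgx_ods";
        Driver d = DriverManager.getDriver(url);
        // Report which hive-jdbc actually got loaded and from which jar --
        // under spark-submit this can differ from the version declared in the pom.
        System.out.println("driver version " + d.getMajorVersion() + "." + d.getMinorVersion()
                + " loaded from " + d.getClass().getProtectionDomain().getCodeSource().getLocation());
        try (Connection c = DriverManager.getConnection(url, "hive", "hive")) {
            System.out.println("connected: " + !c.isClosed());
        }
    }
}

Running it once against the pom's jars and once with $SPARK_HOME/jars/* on the classpath should reveal whether a Spark-bundled hive-jdbc is shadowing the one from the pom.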
Code:
package com.sz.mysql;
import org.apache.spark.sql.*;
import java.sql.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * @author nanlei
 * @date 2021-05-10
 */
public class SyncMysql {

    public static void main(String[] args) throws ParseException {
        try {
            // Date in yyyyMMdd form, e.g. 20210101
            // String date = args[0];
            String date = "20210728";
            System.setProperty("HADOOP_USER_NAME", "hdfs");
            SparkSession spark = SparkSession
                    .builder()
                    .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
                    .config("parquet.enable.summary-metadata", false)
                    .appName("SyncMysql")
                    // Note: a hard-coded master overrides whatever --master is passed to spark-submit.
                    .master("local[*]")
                    .getOrCreate();
            Connection conn = getMysqlConnection();
            Statement stat = conn.createStatement();
            ResultSet rs = stat.executeQuery("select * from sync_mysql_to_hive");
            while (rs.next()) {
                String src_path = rs.getString("src_path");
                String target_parh = rs.getString("target_parh");
                String table_name = rs.getString("table_name");
                String date_field = rs.getString("date_field");
                String date_format = rs.getString("date_format");
                String username = rs.getString("username");
                String password = rs.getString("password");
                System.out.println("Parameter list:");
                System.out.println("src_path: " + src_path);
                System.out.println("target_parh: " + target_parh);
                System.out.println("table_name: " + table_name);
                System.out.println("date_field: " + date_field);
                System.out.println("date_format: " + date_format);
                System.out.println("username: " + username);
                System.out.println("password: " + password);
                // Partition path: <target>/yyyy/MM/dd/
                String hdfsPath = (target_parh + "/" + date.substring(0, 4) + "/" + date.substring(4, 6) + "/" + date.substring(6, 8)) + "/";
                SimpleDateFormat srcFormat = new SimpleDateFormat("yyyyMMdd");
                Date d = srcFormat.parse(date);
                SimpleDateFormat targetFormat = new SimpleDateFormat(date_format);
                String targetDate = targetFormat.format(d);
                Dataset<Row> row = getMetaDataset(spark, src_path, table_name, username, password);
                Dataset<Row> queryRow = row.where(date_field + " like '%" + targetDate + "%'");
                // Cast every column to string so the parquet schema stays uniform.
                String[] columnNames = queryRow.columns();
                Column[] columns = new Column[columnNames.length];
                for (int i = 0; i < columnNames.length; i++) {
                    columns[i] = queryRow.col(columnNames[i]).cast("string");
                }
                Dataset<Row> newRow = queryRow.select(columns);
                newRow.write().mode(SaveMode.Overwrite).parquet(hdfsPath);
                // Attach the freshly written directory as a Hive partition over JDBC.
                Class.forName("org.apache.hive.jdbc.HiveDriver");
                Connection hiveCon = DriverManager.getConnection("jdbc:hive2://192.168.67.145:10000/zlgx_ods", "hive", "hive");
                Statement hiveStat = hiveCon.createStatement();
                String template = "alter table zlgx_ods.%s add if not exists partition (year = '%s', month = '%s', day = '%s') location '%s'";
                String sql = String.format(template, table_name, date.substring(0, 4), date.substring(4, 6), date.substring(6, 8), hdfsPath);
                System.out.println("Executing SQL ---> " + sql);
                hiveStat.execute(sql);
                System.out.println("Hive partition attached");
                hiveStat.close();
                hiveCon.close();
            }
            rs.close();
            conn.close();
            spark.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static Dataset<Row> getMetaDataset(SparkSession spark, String jdbcUrl, String tableName, String username, String password) {
        Dataset<Row> result = null;
        try {
            // "delimiter" and "header" are CSV options and are ignored by the jdbc source, so they are dropped here.
            result = spark.read().format("jdbc")
                    .option("url", jdbcUrl)
                    .option("dbtable", tableName)
                    .option("user", username)
                    .option("password", password)
                    .load();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
    public static Connection getMysqlConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.cj.jdbc.Driver"); // register the JDBC driver
            String url = "jdbc:mysql://192.168.67.189:3306/etl_2020?useSSL=false&characterEncoding=utf-8&serverTimezone=UTC"; // connection URL
            con = DriverManager.getConnection(url, "wangchen", "wc-jrgx"); // open the connection
            System.out.println("MySQL connection established!");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return con; // return the connection
    }
}
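One side note on the code above: every connection and statement is opened outside try-with-resources, so if anything inside the loop throws, the MySQL and Hive resources stay open and the catch block only prints the trace. A sketch of the Hive part with deterministic cleanup, using the same identifiers as in main():

            try (Connection hiveCon = DriverManager.getConnection("jdbc:hive2://192.168.67.145:10000/zlgx_ods", "hive", "hive");
                 Statement hiveStat = hiveCon.createStatement()) {
                hiveStat.execute(sql); // both are closed even if execute() throws
            }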
Maven configuration:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.esotericsoftware/kryo -->
    <dependency>
        <groupId>com.esotericsoftware</groupId>
        <artifactId>kryo</artifactId>
        <version>4.0.2</version>
    </dependency>
    <!-- MySQL -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.25</version>
    </dependency>
    <!-- Hive dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>1.1.0</version>
    </dependency>
    <!-- Chinese word segmenter -->
    <dependency>
        <groupId>cn.bestwu</groupId>
        <artifactId>ik-analyzers</artifactId>
        <version>5.1.0</version>
    </dependency>
</dependencies>
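The pom already pins hive-jdbc to the server's 1.1.0, which would explain why the connection works locally, where the pom's jars come first on the classpath. Under spark-submit, however, Spark 2.3.1's own jars/ directory typically ships a newer hive-jdbc (1.2.1.spark2); that client shadows the 1.1.0 jar and proposes a Thrift protocol version HiveServer2 1.1.0 does not know, leaving client_protocol unset exactly as in the trace. Two angles worth trying: force the application jars first with the (experimental) spark.driver.userClassPathFirst=true setting, or skip HiveServer2 entirely and run the partition DDL through the SparkSession. A sketch of the latter, assuming hive-site.xml is on the classpath so enableHiveSupport() reaches the same metastore; table_name, date, and hdfsPath are as defined in main() above:

SparkSession spark = SparkSession.builder()
        .appName("SyncMysql")
        .enableHiveSupport() // talks to the metastore directly; no HiveServer2/JDBC involved
        .getOrCreate();

// Same partition DDL as before, executed by Spark's own Hive client.
String template = "alter table zlgx_ods.%s add if not exists partition "
        + "(year = '%s', month = '%s', day = '%s') location '%s'";
spark.sql(String.format(template, table_name,
        date.substring(0, 4), date.substring(4, 6), date.substring(6, 8), hdfsPath));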