z183052114 2021-07-30 17:14 采纳率: 100%
浏览 20
已结题

Spark 与 Hive 版本问题?本机正常运行,服务器上报错。

程序在本机可以正常运行,但在服务器上通过 spark-submit 运行时报错。Spark 版本为 2.3.1,Hive 版本为 1.1.0。

错误信息:

org.apache.thrift.TApplicationException: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null, configuration:{use:database=zlgx_ods})
    at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
    at org.apache.hive.service.cli.thrift.TCLIService$Client.recv_OpenSession(TCLIService.java:156)
    at org.apache.hive.service.cli.thrift.TCLIService$Client.OpenSession(TCLIService.java:143)
    at org.apache.hive.jdbc.HiveConnection.openSession(HiveConnection.java:574)
    at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:194)
    at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
    at java.sql.DriverManager.getConnection(DriverManager.java:664)
    at java.sql.DriverManager.getConnection(DriverManager.java:247)
    at com.sz.mysql.SyncMysql.getHiveConnection(SyncMysql.java:128)
    at com.sz.mysql.SyncMysql.main(SyncMysql.java:72)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
java.sql.SQLException: Could not establish connection to jdbc:hive2://192.168.67.145:10000/zlgx_ods: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null, configuration:{use:database=zlgx_ods})
    at org.apache.hive.jdbc.HiveConnection.openSession(HiveConnection.java:586)
    at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:194)
    at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
    at java.sql.DriverManager.getConnection(DriverManager.java:664)
    at java.sql.DriverManager.getConnection(DriverManager.java:247)
    at com.sz.mysql.SyncMysql.getHiveConnection(SyncMysql.java:128)
    at com.sz.mysql.SyncMysql.main(SyncMysql.java:72)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.thrift.TApplicationException: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null, configuration:{use:database=zlgx_ods})
    at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
    at org.apache.hive.service.cli.thrift.TCLIService$Client.recv_OpenSession(TCLIService.java:156)
    at org.apache.hive.service.cli.thrift.TCLIService$Client.OpenSession(TCLIService.java:143)
    at org.apache

代码:


package com.sz.mysql;

import org.apache.spark.sql.*;

import java.sql.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Copies one day of rows from each configured MySQL table to HDFS as Parquet
 * and then registers the written directory as a Hive partition.
 *
 * <p>The tables to copy are listed in the {@code sync_mysql_to_hive} table of
 * the MySQL database returned by {@link #getMysqlConnection()}. For every row
 * of that table the job reads the source table through Spark's JDBC reader,
 * keeps the rows whose date column contains the requested day, casts every
 * column to string, overwrites {@code <target>/<yyyy>/<MM>/<dd>/} with the
 * result and finally issues {@code ALTER TABLE ... ADD IF NOT EXISTS PARTITION}
 * against HiveServer2.
 *
 * @author nanlei
 * @date 2021-05-10
 */
public class SyncMysql {

    /** Day that is synced when no command-line argument is given. */
    private static final String DEFAULT_DATE = "20210728";

    /** HiveServer2 endpoint used to register the partitions. */
    private static final String HIVE_URL = "jdbc:hive2://192.168.67.145:10000/zlgx_ods";

    /** Partition DDL; arguments are table, year, month, day, location. */
    private static final String ADD_PARTITION_TEMPLATE =
            "alter table zlgx_ods.%s add if not exists partition (year = '%s' ,month = '%s' , day ='%s' ) location  '%s'";

    /**
     * Entry point.
     *
     * <p>Any failure is printed to stderr and ends the run; all JDBC resources
     * and the Spark session are closed whether or not the run succeeds.
     *
     * @param args optional; {@code args[0]} is the day to sync formatted as
     *             {@code yyyyMMdd} (for example {@code 20210101}). When absent
     *             the built-in default day is used.
     * @throws ParseException declared for source compatibility; parse failures
     *                        are reported through the same path as every other error
     */
    public static void main(String[] args) throws ParseException {
        String date = args.length > 0 ? args[0] : DEFAULT_DATE;

        try {
            // Fail fast on a malformed day instead of hitting substring/parse errors per table.
            if (!date.matches("\\d{8}")) {
                throw new IllegalArgumentException("date must be formatted as yyyyMMdd but was: " + date);
            }
            // These values are identical for every table, so compute them once.
            String year = date.substring(0, 4);
            String month = date.substring(4, 6);
            String day = date.substring(6, 8);
            Date syncDay = new SimpleDateFormat("yyyyMMdd").parse(date);

            System.setProperty("HADOOP_USER_NAME", "hdfs");
            // Load the Hive driver once up front rather than once per table.
            Class.forName("org.apache.hive.jdbc.HiveDriver");

            // Resources are closed in reverse order: result set, statement, MySQL connection, Spark.
            try (SparkSession spark = SparkSession
                    .builder()
                    .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
                    .config("parquet.enable.summary-metadata", false)
                    .appName("SyncMysql")
                    .master("local[*]")
                    .getOrCreate();
                 Connection conn = requireMysqlConnection();
                 Statement stat = conn.createStatement();
                 ResultSet rs = stat.executeQuery("select * from sync_mysql_to_hive")) {
                while (rs.next()) {
                    syncTable(spark, rs, year, month, day, syncDay);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the MySQL connection or fails with a descriptive error, so that
     * callers never dereference the {@code null} that
     * {@link #getMysqlConnection()} returns on failure.
     */
    private static Connection requireMysqlConnection() throws SQLException {
        Connection conn = getMysqlConnection();
        if (conn == null) {
            throw new SQLException("Could not connect to the MySQL configuration database");
        }
        return conn;
    }

    /**
     * Syncs the single table described by the current row of {@code rs}.
     *
     * @param spark   active Spark session
     * @param rs      cursor positioned on a row of {@code sync_mysql_to_hive}
     * @param year    four-digit year of the day being synced
     * @param month   two-digit month of the day being synced
     * @param day     two-digit day of month of the day being synced
     * @param syncDay the same day as a {@link Date}, used to render the filter value
     * @throws SQLException          if reading the row or talking to Hive fails
     * @throws IllegalStateException if the source table could not be loaded
     */
    private static void syncTable(SparkSession spark, ResultSet rs, String year, String month, String day,
                                  Date syncDay) throws SQLException {
        String srcPath = rs.getString("src_path");
        // "target_parh" (sic) is the column name this job reads; do not "fix" the spelling here.
        String targetPath = rs.getString("target_parh");
        String tableName = rs.getString("table_name");
        String dateField = rs.getString("date_field");
        String dateFormat = rs.getString("date_format");
        String username = rs.getString("username");
        String password = rs.getString("password");
        System.out.println("参数列表:");
        System.out.println("src_path: " + srcPath);
        System.out.println("target_parh: " + targetPath);
        System.out.println("table_name: " + tableName);
        System.out.println("date_field: " + dateField);
        System.out.println("date_format: " + dateFormat);
        System.out.println("username: " + username);
        // Never echo the real secret into job logs.
        System.out.println("password: " + (password == null ? "null" : "******"));

        String hdfsPath = targetPath + "/" + year + "/" + month + "/" + day + "/";

        // Render the day in whatever textual form the source table stores it.
        String targetDate = new SimpleDateFormat(dateFormat).format(syncDay);

        Dataset<Row> source = getMetaDataset(spark, srcPath, tableName, username, password);
        if (source == null) {
            throw new IllegalStateException("Could not load table " + tableName + " from " + srcPath);
        }
        // NOTE(review): the filter is assembled from configuration-table values by string
        // concatenation; it is only safe while that table is trusted.
        Dataset<Row> dayRows = source.where(dateField + " like '%" + targetDate + "%'");

        // Cast every column to string before writing.
        String[] columnNames = dayRows.columns();
        Column[] asStrings = new Column[columnNames.length];
        for (int i = 0; i < columnNames.length; i++) {
            asStrings[i] = dayRows.col(columnNames[i]).cast("string");
        }
        dayRows.select(asStrings).write().mode(SaveMode.Overwrite).parquet(hdfsPath);

        // NOTE(review): a "Required field 'client_protocol' is unset" error from this call
        // typically indicates that the hive-jdbc client on the runtime classpath is a different
        // version from HiveServer2 - verify which Hive jars are loaded on the server.
        try (Connection hiveCon = DriverManager.getConnection(HIVE_URL, "hive", "hive");
             Statement hiveStat = hiveCon.createStatement()) {
            String sql = String.format(ADD_PARTITION_TEMPLATE, tableName, year, month, day, hdfsPath);
            System.out.println("执行sql--->" + sql);
            hiveStat.execute(sql);
            System.out.println("关联hive完成");
        }
    }

    /**
     * Loads a whole table through Spark's JDBC data source.
     *
     * @param spark     active Spark session
     * @param jdbcUrl   JDBC URL of the source database
     * @param tableName table to read
     * @param username  database user
     * @param password  database password
     * @return the table as a dataset, or {@code null} if loading failed (the
     *         failure is printed to stderr)
     */
    public static Dataset<Row> getMetaDataset(SparkSession spark, String jdbcUrl, String tableName, String username, String password) {
        Dataset<Row> dataset = null;
        try {
            // NOTE(review): "delimiter" and "header" look like CSV reader options and are
            // presumably unnecessary for the JDBC source - confirm before removing them.
            dataset = spark.read().format("jdbc").option("delimiter", ",")
                    .option("header", "true")
                    .option("url", jdbcUrl)
                    .option("dbtable", tableName)
                    .option("user", username)
                    .option("password", password)
                    .load();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return dataset;
    }

    /**
     * Opens a connection to the MySQL database that holds the sync configuration.
     *
     * @return an open connection, or {@code null} if the driver could not be
     *         loaded or the connection failed (the failure is printed to stderr)
     */
    public static Connection getMysqlConnection() {
        Connection con = null;
        try {
            // Register the MySQL JDBC driver.
            Class.forName("com.mysql.cj.jdbc.Driver");
            // NOTE(review): host and credentials are hard-coded; consider moving them to configuration.
            String url = "jdbc:mysql://192.168.67.189:3306/etl_2020?useSSL=false&characterEncoding=utf-8&serverTimezone=UTC";
            con = DriverManager.getConnection(url, "wangchen", "wc-jrgx");
            System.out.println("数据库连接成功!");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return con;
    }


}

maven配置:

    <!-- Build-time dependencies of the SyncMysql job. -->
    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/com.esotericsoftware/kryo -->
        <dependency>
            <groupId>com.esotericsoftware</groupId>
            <artifactId>kryo</artifactId>
            <version>4.0.2</version>
        </dependency>

        <!-- MySQL JDBC driver (Connector/J) -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
        <!-- Hive JDBC driver -->
        <!-- NOTE(review): when the job is launched through spark-submit, the Spark installation's
             own jars may put a different Hive client on the classpath ahead of this one; confirm
             which hive-jdbc version is actually loaded on the server. -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!-- Chinese word segmenter (IK analyzer) -->
        <dependency>
            <groupId>cn.bestwu</groupId>
            <artifactId>ik-analyzers</artifactId>
            <version>5.1.0</version>
        </dependency>
    </dependencies>


  • 写回答

1条回答

      报告相同问题?

      相关推荐 更多相似问题

      问题事件

      • 已结题 3月30日
      • 已采纳回答 3月30日
      • 创建了问题 7月30日

      悬赏问题

      • ¥15 内存管理的一段代码不是很理解
      • ¥20 打开anaconda时卡在Loading applications无法进入界面
      • ¥15 网页超时时间设置失效
      • ¥15 有关绿色信贷毕业论文的问题
      • ¥30 关于#机器人#的问题,如何解决?
      • ¥15 求MATLAB函数ScalarLayerDisplay的代码
      • ¥15 安卓如何自动执行检测到的NFC标签,无需再点确认
      • ¥15 pyHM库mouse模块的ValueError错误
      • ¥15 python opencv 摄像头 传感器
      • ¥30 eMMC&Android&C&Linux