m0_75066946 2025-06-23 19:47 采纳率: 0%
浏览 7

spark实时处理并传输到mysql数据库时,显示流处理已启动,卡住很久不动然后报错是什么原因

img

img


spark实时处理并传输到mysql数据库时,显示流处理已启动,卡住很久不动然后报错是什么原因?
我们的流程是爬取股票数据,3s刷新,然后传输到kafka里面,接着spark实时处理并存储到mysql里面,mysql库和表我已经创建好了,最后可视化处理。spark实时处理和存储mysql出问题,报错为broker的问题

  • 写回答

5条回答 默认 最新

  • 阿里嘎多学长 2025-06-23 19:48
    关注

    阿里嘎多学长整理AIGC生成,因移动端显示问题导致当前答案未能完全显示,请使用PC端查看更加详细的解答过程

    解答

    Spark实时处理并传输到 MySQL 数据库时,卡住很久不动然后报错可能是由于以下原因:

    1. 网络连接问题:Spark 和 MySQL 之间的网络连接可能出现问题,导致卡住不动。可以尝试检查网络连接是否正常,或者使用 spark.sql.shuffle.partitions 参数来调整分区数量,以减少网络传输的压力。
    2. 数据量太大:如果数据量太大,Spark 可能会卡住不动,无法继续处理。可以尝试使用 spark.sql.shuffle.partitions 参数来调整分区数量,以减少数据量。
    3. MySQL 数据库连接问题:MySQL 数据库连接可能出现问题,导致 Spark 无法继续处理。可以尝试检查 MySQL 数据库连接是否正常,或者使用 spark.jars 参数来指定 MySQL 驱动程序的路径。
    4. Spark 配置问题:Spark 配置可能出现问题,导致卡住不动。可以尝试检查 Spark 配置文件(spark-defaults.conf)中的参数是否正确。
    5. Kafka topic 问题:Kafka topic 可能出现问题,导致 Spark 无法继续处理。可以尝试检查 Kafka topic 是否存在,或者使用 kafka-console-consumer 命令来消费 topic。

    以下是一个简单的示例代码,用于 Spark 实时处理并传输到 MySQL 数据库:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    
    object SparkRealtimeProcessing {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("Spark Realtime Processing").getOrCreate()
    
        // 读取 Kafka topic
        val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "stock_data").load()
    
        // 处理数据
        val processedDF = kafkaDF.selectExpr("CAST(value AS STRING)").selectExpr("split(value, ',')[0] AS stock_code", "split(value, ',')[1] AS stock_price")
    
        // 传输到 MySQL 数据库
        processedDF.write.format("jdbc").option("url", "jdbc:mysql://localhost:3306/stock_data").option("dbtable", "stock_data").option("user", "root").option("password", "password").save()
    
        spark.stop()
      }
    }
    

    请注意,这只是一个简单的示例代码,实际情况可能需要根据具体情况进行调整。

    评论

报告相同问题?

问题事件

  • 创建了问题 6月23日