使用pycharm用pyspark怎么连接kafka
文心一言生成的代码
from pyspark.sql import SparkSession # 导入 SparkSession 模块
from pyspark.sql.functions import explode, split # 导入 explode 和 split 函数
if name == "main": # 程序入口点
spark = SparkSession \ # 创建 SparkSession 对象
.builder \ # 使用 builder 模式
.appName("StructuredKafkaWordCount") \ # 设置应用名
.getOrCreate() # 获取现有 SparkSession 或创建新的
# 从 Kafka 读取数据创建 DataFrame
lines = spark \ # 创建一个 DataFrame 对象
.readStream \ # 指定为流式读取
.format("kafka") \ # 指定数据源格式为 Kafka
.option("kafka.bootstrap.servers", "node:9092") \ # 设置 Kafka 服务器地址
.option("subscribe", "test01") \ # 订阅的主题
.load() # 从 Kafka 中加载数据
# 将 value 字段转换为字符串类型,并根据空格拆分成单词
words = lines.select( # 选择 DataFrame 中的列
explode(split(lines.value, " ")).alias("word") # 使用 explode 和 split 函数处理 value 列,将其拆分为单词,并起别名为 word
)
# 计算单词频率
wordCounts = words.groupBy("word").count() # 按 word 列进行分组,并计算每组的数量
# 打印结果到控制台
query = wordCounts \ # 将结果 DataFrame 赋值给变量 query
.writeStream \ # 指定为流式写入
.outputMode("complete") \ # 输出模式为完整模式
.format("console") \ # 输出格式为控制台
.start() # 启动流式查询
query.awaitTermination() # 等待流式查询终止
报错信息
