u014542762 2025-05-13 08:52 采纳率: 0%
浏览 3

pyspark连接kafka无解(相关搜索:服务器)

本人尝试用pyspark 连接服务器kafka,尝尽了各种办法,看了所有的aI方法,都是调试不成功,希望会的朋友看看以下代码与报错信息,给予指导,有偿!

``from pyflink.table import TableEnvironment, EnvironmentSettings

# 1. 创建Table环境
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# 2. 设置JAR路径(根据实际路径修改)
jar_paths = [
    "file:///flink-connector-base-1.17.2.jar",
    "file:///kafka-clients-2.2.1.jar",
    "file:///flink-json-1.17.2.jar"
]
t_env.get_config().set("pipeline.jars", ";".join(jar_paths))


# 4. 定义Kafka源表(修正表名一致性)
source_ddl = """
CREATE TABLE sourceKafka(
    user_id VARCHAR,
    item_id VARCHAR,
    `timestamp` BIGINT 
) WITH (
    'connector' = 'kafka',
    'topic' = 'pyflink_test',
    'properties.bootstrap.servers' = '---------xxx----------------',
    'properties.group.id' = 'test_3',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
"""

t_env.execute_sql(source_ddl)

# 5. 执行查询(表名与CREATE语句一致)
result = t_env.execute_sql("SELECT * FROM sourceKafka LIMIT 10")
result.print()

报错信息如下:

``Traceback (most recent call last):
  File "xxx flink_test\sql方式.py", line 42, in <module>
    result.print()
  File "xxx pyflink\table\table_result.py", line 219, in print
    self._j_table_result.print()
  File "xxx lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "xxx .venv\lib\site-packages\pyflink\util\exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "xxx \flink_test\.venv\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o93.print.
: java.lang.RuntimeException: Failed to fetch next result
    
Caused by: java.io.IOException: Failed to fetch job execution result

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    
Caused by: java.lang.NoSuchMethodError: org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.<init>(Lorg/apache/flink/connector/base/source/reader/s


  • 写回答

3条回答 默认 最新

  • 檀越@新空间 2025-05-13 08:53
    关注

    上午好☀️☀️☀️️
    本答案参考通义千问

    从你的描述和报错信息来看,问题可能出在以下几个方面:

    1. 依赖版本不匹配:Flink 和 Kafka 的版本需要兼容。你使用的 Flink 版本是 1.17.2,而 Kafka 客户端版本是 2.2.1。这两个版本之间的兼容性可能存在问题。

    2. 配置错误:Kafka 源表的配置可能存在错误,比如 properties.bootstrap.servers 的值不正确,或者 Kafka 集群没有正确启动。

    3. JAR 文件路径问题:确保 JAR 文件路径正确,并且文件存在。

    4. 权限问题:如果运行环境是服务器,可能需要检查是否有足够的权限访问 Kafka 集群。

    以下是解决方案的详细步骤:

    1. 确认依赖版本兼容性

    确保 Flink 和 Kafka 的版本兼容。你可以参考 Flink 官方文档中的兼容性矩阵。例如,Flink 1.17.2 可以使用 Kafka 2.8.x 或更高版本。

    解决方案

    • 下载兼容的 Kafka 客户端 JAR 文件。例如,使用 Kafka 2.8.0 版本。
    • 修改 kafka-clients-2.2.1.jarkafka-clients-2.8.0.jar

    2. 检查 Kafka 配置

    确保 Kafka 集群正常运行,并且 properties.bootstrap.servers 的值正确。

    解决方案

    • 确保 Kafka 集群正在运行。
    • 检查 properties.bootstrap.servers 是否指向正确的 Kafka 服务器地址。

    3. 确保 JAR 文件路径正确

    确保 JAR 文件路径正确,并且文件存在。

    解决方案

    • 检查 jar_paths 列表中的路径是否正确。
    • 确保 JAR 文件存在于指定的路径中。

    4. 检查权限

    确保运行环境有足够的权限访问 Kafka 集群。

    解决方案

    • 检查服务器上的网络配置,确保可以访问 Kafka 集群。
    • 如果需要,联系系统管理员获取必要的权限。

    修改后的代码

    以下是修改后的代码示例:

    from pyflink.table import TableEnvironment, EnvironmentSettings
    
    # 1. 创建Table环境
    env_settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(env_settings)
    
    # 2. 设置JAR路径(根据实际路径修改)
    jar_paths = [
        "file:///path/to/flink-connector-base-1.17.2.jar",
        "file:///path/to/kafka-clients-2.8.0.jar",  # 修改为兼容的 Kafka 客户端版本
        "file:///path/to/flink-json-1.17.2.jar"
    ]
    t_env.get_config().set("pipeline.jars", ";".join(jar_paths))
    
    # 4. 定义Kafka源表(修正表名一致性)
    source_ddl = """
    CREATE TABLE sourceKafka(
        user_id STRING,
        item_id STRING,
        `timestamp` BIGINT 
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'pyflink_test',
        'properties.bootstrap.servers' = 'your-kafka-server:9092',  # 替换为实际的 Kafka 服务器地址
        'properties.group.id' = 'test_3',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
    """
    
    t_env.execute_sql(source_ddl)
    
    # 5. 执行查询(表名与CREATE语句一致)
    result = t_env.execute_sql("SELECT * FROM sourceKafka LIMIT 10")
    result.print()
    

    5. 调试和日志

    如果问题仍然存在,可以启用调试日志来进一步诊断问题。

    解决方案

    • t_env.get_config() 中添加日志级别设置:
      t_env.get_config().set('execution.checkpointing.interval', '5s')
      t_env.get_config().set('logging.level.root', 'DEBUG')
      

    通过以上步骤,你应该能够解决连接 Kafka 的问题。如果仍有问题,请提供更多的日志信息以便进一步分析。

    评论

报告相同问题?

问题事件

  • 修改了问题 5月13日
  • 创建了问题 5月13日