本人尝试用pyspark 连接服务器kafka,尝尽了各种办法,看了所有的aI方法,都是调试不成功,希望会的朋友看看以下代码与报错信息,给予指导,有偿!
``from pyflink.table import TableEnvironment, EnvironmentSettings
# 1. 创建Table环境
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
# 2. 设置JAR路径(根据实际路径修改)
jar_paths = [
"file:///flink-connector-base-1.17.2.jar",
"file:///kafka-clients-2.2.1.jar",
"file:///flink-json-1.17.2.jar"
]
t_env.get_config().set("pipeline.jars", ";".join(jar_paths))
# 4. 定义Kafka源表(修正表名一致性)
source_ddl = """
CREATE TABLE sourceKafka(
user_id VARCHAR,
item_id VARCHAR,
`timestamp` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'pyflink_test',
'properties.bootstrap.servers' = '---------xxx----------------',
'properties.group.id' = 'test_3',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
t_env.execute_sql(source_ddl)
# 5. 执行查询(表名与CREATE语句一致)
result = t_env.execute_sql("SELECT * FROM sourceKafka LIMIT 10")
result.print()
报错信息如下:
``Traceback (most recent call last):
File "xxx flink_test\sql方式.py", line 42, in <module>
result.print()
File "xxx pyflink\table\table_result.py", line 219, in print
self._j_table_result.print()
File "xxx lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "xxx .venv\lib\site-packages\pyflink\util\exceptions.py", line 146, in deco
return f(*a, **kw)
File "xxx \flink_test\.venv\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o93.print.
: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.NoSuchMethodError: org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.<init>(Lorg/apache/flink/connector/base/source/reader/s