TUBER727 2023-06-22 15:03 采纳率: 0%
浏览 42
已结题

我部署flink1.14.4版本,为啥 执行flinksql,读kafka数据会自动停止啊

我部署flink1.14.4版本,为啥 执行flinksql,读kafka数据会自动停止啊,一般情况不都是一直在运行吗

create table kafka1(
  id int,
  data STRING
) with (
  'connector' = 'kafka'
  ,'topic' = 'ylqtest002'
  ,'properties.zookeeper.connect' = '10.0.0.xx:2181'
  ,'properties.bootstrap.servers' = '10.0.0.xxx:9092'
  ,'format' = 'json'
  ,'properties.group.id'='iceberg1'
  ,'scan.startup.mode'='latest-offset'
);
select * from kafka1;

执行

img

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2023-06-22 16:26
    关注
    • 关于该问题,我找了一篇非常好的博客,你可以看看是否有帮助,链接:Flink 消费kafka 序列化
    • 除此之外, 这篇博客: Flink1.13中基于flinksql实时数仓简易demo中的 4.3 模拟kafka数据 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:
    • 1)子类数据构建:

      [bigdata_admin@dn5 ~]$ kafka-console-producer --broker-list dn3:9092,dn4:9092,dn5:9092 --topic son
      SLF4J: Class path contains multiple SLF4J bindings.
      SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
      SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
      22/05/11 23:20:40 INFO producer.ProducerConfig: ProducerConfig values: 
              acks = 1
              batch.size = 16384
              bootstrap.servers = [dn3:9092, dn4:9092, dn5:9092]
              buffer.memory = 33554432
              client.id = console-producer
              compression.type = none
              connections.max.idle.ms = 540000
              enable.idempotence = false
              interceptor.classes = null
              key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
              linger.ms = 1000
              max.block.ms = 60000
              max.in.flight.requests.per.connection = 5
              max.request.size = 1048576
              metadata.max.age.ms = 300000
              metric.reporters = []
              metrics.num.samples = 2
              metrics.recording.level = INFO
              metrics.sample.window.ms = 30000
              partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
              receive.buffer.bytes = 32768
              reconnect.backoff.max.ms = 1000
              reconnect.backoff.ms = 50
              request.timeout.ms = 1500
              retries = 3
              retry.backoff.ms = 100
              sasl.jaas.config = null
              sasl.kerberos.kinit.cmd = /usr/bin/kinit
              sasl.kerberos.min.time.before.relogin = 60000
              sasl.kerberos.service.name = null
              sasl.kerberos.ticket.renew.jitter = 0.05
              sasl.kerberos.ticket.renew.window.factor = 0.8
              sasl.mechanism = GSSAPI
              security.protocol = PLAINTEXT
              send.buffer.bytes = 102400
              ssl.cipher.suites = null
              ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
              ssl.endpoint.identification.algorithm = null
              ssl.key.password = null
              ssl.keymanager.algorithm = SunX509
              ssl.keystore.location = null
              ssl.keystore.password = null
              ssl.keystore.type = JKS
              ssl.protocol = TLS
              ssl.provider = null
              ssl.secure.random.implementation = null
              ssl.trustmanager.algorithm = PKIX
              ssl.truststore.location = null
              ssl.truststore.password = null
              ssl.truststore.type = JKS
              transaction.timeout.ms = 60000
              transactional.id = null
              value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
      
      22/05/11 23:20:40 INFO utils.AppInfoParser: Kafka version : 0.11.0-kafka-3.0.0
      22/05/11 23:20:40 INFO utils.AppInfoParser: Kafka commitId : unknown
      
      >1,张三,21,shanghai,1001
      >2,李四,22,beijing,1002
      >3,王五,23,guangzhou,1003
      >4,赵六,24,shenzhen,1004
      

      2)父类数据构建

      [bigdata_admin@dn5 ~]$ kafka-console-producer --broker-list dn3:9092,dn4:9092,dn5:9092 --topic father
      
      >1001,张杰
      >1003,王杰
      >1005,钱杰
      >1007,李杰
      
    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 6月22日
  • 赞助了问题酬金15元 6月22日
  • 修改了问题 6月22日
  • 创建了问题 6月22日

悬赏问题

  • ¥15 Python输入字符串转化为列表排序具体见图,严格按照输入
  • ¥20 XP系统在重新启动后进不去桌面,一直黑屏。
  • ¥15 opencv图像处理,需要四个处理结果图
  • ¥15 无线移动边缘计算系统中的系统模型
  • ¥15 深度学习中的画图问题
  • ¥15 java报错:使用mybatis plus查询一个只返回一条数据的sql,却报错返回了1000多条
  • ¥15 Python报错怎么解决
  • ¥15 simulink如何调用DLL文件
  • ¥15 关于用pyqt6的项目开发该怎么把前段后端和业务层分离
  • ¥30 线性代数的问题,我真的忘了线代的知识了