Flink如何将kafka里的消息写入到对应的topic

已知所有kafka里topic为固定格式的json,目前想用flink处理所有topic里的数据,并且写入第二个kafka,sink的topic和source的topic一致,如何实现?

1个回答

DEMO

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val inputStream = env.addSource(new FlinkKafkaConsumer010[String]("sensor", new SimpleStringSchema(), properties))
    // Transform操作
    val dataStream = sourceStream.map(data => data.toString)

    // sink
    dataStream.addSink( new FlinkKafkaProducer010[String]("mym-sink", new SimpleStringSchema(), properties))
    dataStream.print("send to kafka")

    env.execute("kafka data process")
  }
u012220365
首席IT民工 你这个代码是把sensor这个topic的消息写入mym-sink这个topic吧
5 个月之前 回复
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
flink集群任务,FlinkKafkaConsumer010读取topic消息 ,将消息存储并上传至cos报错

用flink读取kafka集群的topic ,使用了一个java 的定时任务 Timer() 打包放到flink集群上  刚接触flink  实在不知道如何调 ![图片说明](https://img-ask.csdn.net/upload/201903/18/1552913971_808109.png)

flink集成kafka做数据分流报错org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

flink集成kafka做数据分流时出现了很诡异的异常,有谁遇到过吗? org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer 分流后会有2个sink写入两个topic,这个异常是偶发,但是每次发生就经常触发。。。 ``` org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593) at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360) ... 12 more ```

[flink]Gelly如何计算从KAFKA中读取的Datastream型数据?

小白求教;先有一需求将数据从KAFKA中取出,使用Flink kafka connector这里得到的是DataStream格式数据。想将这些数据使用Gelly进行图分析,但是Gelly是基于DataSet格式的。尝试过将DataStream转换为Table,再将Table转换为DataSet,但是报错如下。求问还有什么较好的解决办法么?![图片说明](https://img-ask.csdn.net/upload/201908/14/1565773740_150241.png)![图片说明](https://img-ask.csdn.net/upload/201908/14/1565773757_625389.png)

Flink任务在由不可预测异常导致任务挂掉,然后自动重新提交之后会重复消费kafka里的数据怎么解决?

如代码所示,同一个flink任务里有多个处理不同数据流的flatMap,从kafka不同的topic中取数据进行处理,其中stream1有不可预测的异常没有抓住,或者因为环境问题导致整个任务被cancel,然后自动重新启动,重启任务之后都会概率性出现有数条kafka里的数据被重复处理,有木有大神知道怎么解决啊? ``` StreamExecutionEnvitoment env = StreamExecutionEnvitoment.getExecutionEnvitoment(); env.enableCheckpointing(5000); FlinkKafkaConsumer011<String> myConsumer1 = new FlinkKafkaConsumer011<>(topic1, Charset.forName("ISO8859-1"), prop); myConsumer1.setStartFromLatest(); DataStream<String> stream1 = env.addSource(myConsumer).name(topic); stream1.flatMap(new Handle1()).setParallelism(1).setMaxParallelism(1).addSink(new Sink1()); FlinkKafkaConsumer011<String> myConsumer2 = new FlinkKafkaConsumer011<>(topic2, Charset.forName("ISO8859-1"), prop); myConsumer2.setStartFromLatest(); DataStream<String> stream2 = env.addSource(myConsumer).name(topic); stream2.flatMap(new Handle2()).setParallelism(1).setMaxParallelism(1).addSink(new Sink2()); ```

kafka消费不到数据问题

kafka集群搭建正常,通过console都能正常生产和消费消息,但是通过JAVA程序就是读取不到消息,更换group都尝试过了 package test; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumer extends Thread { private String topic; public KafkaConsumer(String topic){ super(); this.topic=topic; } @Override public void run(){ //通过properties设置了Consumer的参数,并且创建了连接器,连接到Kafaka ConsumerConnector consumer = createConsumer(); //Map作用指定获取的topic以及partition Map<String,Integer> topicCountMap = new HashMap<String,Integer>(); topicCountMap.put(topic, 3); //consumer连接器获取消息 Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); //获取对应的topic中的某一个partition中的数据 KafkaStream<byte[],byte[]> kafkaStream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator(); while(iterator.hasNext()){ byte[] message = iterator.next().message(); System.out.println("message is:"+new String(message)); } } private ConsumerConnector createConsumer(){ Properties properties = new Properties(); properties.put("zookeeper.connect", "XXX:2181"); properties.put("auto.offset.reset", "smallest");//读取旧数据 properties.put("group.id", "333fcdcd"); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); } public static void main(String[] args) { new KafkaConsumer("testtest").start(); } }

kafka集成flink报出如下错误如何解决

idea运行kafka集成flink的项目运行报错。 public class KafkaFlinkDemo1 { public static void main(String[] args) throws Exception { //获取执行环境 StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //创建一个Table Environment StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv); sTableEnv.connect(new Kafka() .version("0.10") .topic("topic1") .startFromLatest() .property("group.id", "group1") .property("bootstrap.servers", "172.168.30.105:21005") ).withFormat( new Json().failOnMissingField(false).deriveSchema() ).withSchema( new Schema().field("userId", Types.LONG()) .field("day", Types.STRING()) .field("begintime", Types.LONG()) .field("endtime", Types.LONG()) .field("data", ObjectArrayTypeInfo.getInfoFor( Row[].class, Types.ROW(new String[]{"package", "activetime"}, new TypeInformation[]{Types.STRING(), Types.LONG()} ) )) ).inAppendMode().registerTableSource("userlog"); Table result = sTableEnv.sqlQuery("select userId from userlog"); DataStream<Row> rowDataStream = sTableEnv.toAppendStream(result, Row.class); rowDataStream.print(); sEnv.execute("KafkaFlinkDemo1"); } } 报错信息如下: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/E:/develop/apache-maven-3.6.0-bin/repository/ch/qos/logback/logback-classic/1.1.3/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/E:/develop/apache-maven-3.6.0-bin/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder] Exception in thread "main" java.lang.AbstractMethodError: org.apache.flink.table.descriptors.ConnectorDescriptor.toConnectorProperties()Ljava/util/Map; at org.apache.flink.table.descriptors.ConnectorDescriptor.toProperties(ConnectorDescriptor.java:58) at org.apache.flink.table.descriptors.ConnectTableDescriptor.toProperties(ConnectTableDescriptor.scala:107) at org.apache.flink.table.descriptors.StreamTableDescriptor.toProperties(StreamTableDescriptor.scala:95) at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:39) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68) at com.huawei.bigdata.KafkaFlinkDemo1.main(KafkaFlinkDemo1.java:41) Process finished with exit code 1

kafka消费数据老是丢失

WARN TaskSetManager: Lost task 9.0 in stage 26569.0 (TID 812602, 2, 2, 104-250-138-250.static.gorillaservers.com): k): k): ): ): kafka.common.NotLeaderForPForPForPartitionException 有两个groupID消费一个topic,出现上面的警告后,有一个groupID就消费不到数据了

kafka 消费端 处理数据比较慢,会不会出现数据积压?

如题,kafka消费端接收到数据后 要进行部分业务逻辑操作,可能会有3秒左右,处理很慢 的话,对程序有什么影响呢?新手提问, 望各位大神不吝赐教!

如何确保我的消费者仅按顺序处理kafka主题中的消息?

<div class="post-text" itemprop="text"> <p>I've never used kafka before. I have two test Go programs accessing a local kafka instance: a reader and a writer. I'm trying to tweak my producer, consumer, and kafka server settings to get a particular behavior.</p> <p>My writer:</p> <pre><code>package main import ( "fmt" "math/rand" "strconv" "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { rand.Seed(time.Now().UnixNano()) topics := []string{ "policymanager-100", "policymanager-200", "policymanager-300", } progress := make(map[string]int) for _, t := range topics { progress[t] = 0 } producer, err := kafka.NewProducer(&amp;kafka.ConfigMap{ "bootstrap.servers": "localhost", "group.id": "0", }) if err != nil { panic(err) } defer producer.Close() fmt.Println("producing messages...") for i := 0; i &lt; 30; i++ { index := rand.Intn(len(topics)) topic := topics[index] num := progress[topic] num++ fmt.Printf("%s =&gt; %d ", topic, num) msg := &amp;kafka.Message{ Value: []byte(strconv.Itoa(num)), TopicPartition: kafka.TopicPartition{ Topic: &amp;topic, }, } err = producer.Produce(msg, nil) if err != nil { panic(err) } progress[topic] = num time.Sleep(time.Millisecond * 100) } fmt.Println("DONE") } </code></pre> <p>There are three topics that exist on my local kafka: policymanager-100, policymanager-200, policymanager-300. They each only have 1 partition to ensure all messages are sorted by the time kafka receives them. My writer will randomly pick one of those topics and issue a message consisting of a number that increments solely for that topic. When it's done running, I expect the queues to look something like this (topic names shortened for legibility):</p> <pre><code>100: 1 2 3 4 5 6 7 8 9 10 11 200: 1 2 3 4 5 6 7 300: 1 2 3 4 5 6 7 8 9 10 11 12 </code></pre> <p>So far so good. I'm trying to configure things so that any number of consumers can be spun up and consume these messages in order. By "in-order" I mean that no consumer should get message 2 for topic 100 until message 1 is COMPLETED (not just started). If message 1 for topic 100 is being worked on, consumers are free to consume from other topics that currently don't have a message being processed. If a message of a topic has been sent to a consumer, that entire topic should become "locked" until either a timeout assumes that the consumer failed or the consumer commits the message, then the topic is "unlocked" to have it's next message made available to be consumed.</p> <p>My reader:</p> <pre><code>package main import ( "fmt" "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { count := 2 for i := 0; i &lt; count; i++ { go consumer(i + 1) } fmt.Println("cosuming...") // hold this thread open indefinitely select {} } func consumer(id int) { c, err := kafka.NewConsumer(&amp;kafka.ConfigMap{ "bootstrap.servers": "localhost", "group.id": "0", // strconv.Itoa(id), "enable.auto.commit": "false", }) if err != nil { panic(err) } c.SubscribeTopics([]string{`^policymanager-.+$`}, nil) for { msg, err := c.ReadMessage(-1) if err != nil { panic(err) } fmt.Printf("%d) Message on %s: %s ", id, msg.TopicPartition, string(msg.Value)) time.Sleep(time.Second) _, err = c.CommitMessage(msg) if err != nil { fmt.Printf("ERROR commiting: %+v ", err) } } } </code></pre> <p>From my current understanding, the way I'm likely to achieve this is by setting up my consumer properly. I've tried many different variations of this program. I've tried having all my goroutines share the same consumer. I've tried using a different <code>group.id</code> for each goroutine. None of these was the right configuration to get the behavior I'm after.</p> <p>What the posted code does is empty out one topic at a time. Despite having multiple goroutines, the process will read all of 100 then move to 200 then 300 and only one goroutine will actually do all the reading. When I let each goroutine have a different <code>group.id</code> then messages get read by multiple goroutines which I would like to prevent.</p> <p>My example consumer is simply breaking things up with goroutines but when I begin working this project into my use case at work, I'll need this to work across multiple kubernetes instances that won't be talking to each other so using anything that interacts between goroutines won't work as soon as there are 2 instances on 2 kubes. That's why I'm hoping to make kafka do the gatekeeping I want.</p> </div>

springboot整合kafka超时的问题,错误提示如下所示

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 2019-10-10 00:06:20,395 ERROR [main] o.s.t.c.TestContextManager [TestContextManager.java:250] Caught exception while allowing TestExecutionListener [org.springframework.test.context.web.ServletTestExecutionListener@78e117e3] to prepare test instance [com.nowcoder.community.KafkaTests@9b3be1c] java.lang.IllegalStateException: Failed to load ApplicationContext ## 测试代码如下: ``` import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest @ContextConfiguration(classes = CommunityApplication.class) public class KafkaTests { @Autowired private KafkaProducer kafkaProducer; @Test public void testKafka() { kafkaProducer.sendMessage("test", "你好"); kafkaProducer.sendMessage("test", "在吗"); try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } } } @Component class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic, String content) { kafkaTemplate.send(topic, content); } } @Component class KafkaConsumer { @KafkaListener(topics = {"test"}) public void handleMessage(ConsumerRecord record) { System.out.println(record.value()); } } ``` 测试方法时报错,Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 求大佬看看。

关于flink的JDBCAppendTableSink不能往MySQL写入数据的问题

1、使用继承RichSinkFunction类的方法,自定义写入MySQL,就可以完成数据写入MySQL的操作; 使用JDBCAppendTableSink.builder()这种方法的时候,没法写数据到MySQL。 2、 ``` JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1) .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/flink") .setUsername("root") .setPassword("123456") .setQuery(sql2) .setParameterTypes(types) .build(); ```

FLINK SQL 1.10 如何直接将bigint类型时间数据转换为Timestamp类型数据而不把BIGINT类型数据转换为字符串类型时间数据再处理。

我是Flink新手,正在学习flink sql相关开发,参考的阿里云开发文档进行学习 https://www.alibabacloud.com/help/zh/faq-detail/64813.htm?spm=a2c63.q38357.a3.3.368d1352JdVsRU 我现在有一些13位的bigint格式时间数据,想把他们转换成timestamp格式数据。开发手册上说可以直接用TO_TIMESTAMP()函数直接处理,但我使用时会报错说 ``` Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 'TO_TIMESTAMP' to arguments of type 'TO_TIMESTAMP(<BIGINT>)'. Supported form(s): 'TO_TIMESTAMP(<CHARACTER>)' 'TO_TIMESTAMP(<CHARACTER>, <CHARACTER>)' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ``` 我使用的是最简单的格式,其中CREATETIME为长度13位的BIGINT类型数据 ``` SELECT TO_TIMESTAMP(CREATETIME), SUBMITTER AS username FROM kafka_databus ```

canal集成kafka之后,每次mysql变更数据,会传递多条数据至kafka

canal集成kafka之后,每次mysql变更数据,会传递多条数据至kafka,但这几条数据除一些不重要的字段如起止时间之外都是一致的,我想知道为何会发送多条数据,如果可以的话,能不能只发一条

求大牛指点,kafka 怎么设置 batch.size 合适?

我这里有一批测试数据,数据过滤之前的大小为65.2M 左右, 数据有 359021 行, 把这些发送到kafka中,一开始 batch.size 的大小设置为16K,发现每次插入数据的行数都不一样,差距特别大,后来我试着改变这个值,发现一直在发生变化很不稳定,最后在 70K 的时候发现是最接近 359021 行的, 行数稳定 358751 行‬,但是距离总行还是有所差别,所以想请教一下是什么原因导致的,batch.size 怎么设置才好详细代码如下: ```java object Producer { def main(args: Array[String]): Unit = { //创建 sparkContext val conf: SparkConf = new SparkConf().setAppName("sparkStream").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) //读取文件并过滤空行 val lines: RDD[String] = sc.textFile("D:\\dev\\大数据\\大数据资料\\spark\\rng_comment.txt").filter(_.trim != "") //指定连接的节点 val sink: KafkaProducerConf = new KafkaProducerConf("rng_comment", "hadoop01:9092,hadoop02:9092,hadoop03:9092") //发送数据到kafka sink.save(lines) //关闭连接 sink.kafkaProducer.close() sc.stop() } } ``` <br/> ``` class KafkaProducerConf(topic: String, severs: String) extends Serializable { def createKafkaConnection(): KafkaProducer[String, String] = { val props: Properties = new Properties() props.put("bootstrap.servers", severs) props.put("acks", "all");// acks=0 配置适用于实现非常高的吞吐量 , acks=all 这是最安全的模式 props.put("key.serializer", classOf[StringSerializer].getName) props.put("value.serializer", classOf[StringSerializer].getName) props.put("retries", "0") //设置重试次数 props.put("batch.size", "66765") //设置批量发送的数据大小 props.put("buffer.memory", "33554432") //设置缓冲区大小 props.put("linger.ms", "1000") //最多延迟10000毫秒 props.put("partitioner.class", (new MyPartitioner).getClass) new KafkaProducer[String, String](props) } lazy val kafkaProducer: KafkaProducer[String, String] = createKafkaConnection() //把数据保存到kafka def save(vs: RDD[String]): Unit = { try { vs.foreach(x => { println(x) val record: ProducerRecord[String, String] = new ProducerRecord[String, String](topic, x.split("\t")(0), x.toString) kafkaProducer.send(record) }) } catch { case e: Exception => println("发送数据出错:" + e) } } } ``` <br> 部分数据如下: ``` 0 11381 2018/10/20 21:07 今天你还相信网恋吗:希望越大失望越大,技能省得莫名其妙,小规模团战打得又乱,最后一句,骄兵必败! 1 219517 3873092044 今天你还相信网恋吗 5 1540040820 1 683 2018/10/20 21:07 趣游电竞:这几把打的什么呀, 0 36213 3784336787 趣游电竞 2 1540040820 2 0 2018/10/20 21:08 阴暗感i:既然选择了RNG,那么我会一直支持RNG! 0 0 6019401575 阴暗感i 0 1540040880 3 0 2018/10/20 21:08 买桔子的先森:孙大勇和heart亲妈今晚必死,我不想看到他们 0 0 5440806717 买桔子的先森 0 1540040880 4 0 2018/10/20 21:08 还能改三次名别浪费了:菜逼 0 1 5288544674 还能改三次名别浪费了 5 1540040880 5 0 2018/10/20 21:08 弓长威0:退役 0 0 5567911915 弓长威0 0 1540040880 6 0 2018/10/20 21:08 春眠不觉晓丫:滚 0 0 3295234971 春眠不觉晓丫 0 1540040880 7 0 2018/10/20 21:08 面朝大海觉得好冷:滚出lpl 0 0 6023555412 面朝大海觉得好冷 0 1540040880 8 0 2018/10/20 21:08 Whiteiiiiii:你们今天这种态度真的不配赢 0 0 1846699503 Whiteiiiiii 0 1540040880 9 0 2018/10/20 21:08 万年熊队长:天天明年吗,有没有一点新意,我真的没办法再等了。毕业了 0 0 2670160850 万年熊队长 0 1540040880 10 0 2018/10/20 21:08 Airw10:不用明年了,解散了吧 0 0 3169314130 Airw10 0 1540040880 11 0 2018/10/20 21:08 Reck1e55:还能赢回来吗?? 0 1 6337686460 Reck1e55 0 1540040880 12 0 2018/10/20 21:08 SayWanli:四年了本科四年全部都喜欢rng战队 心如死灰 0 0 2356759803 SayWanli 0 1540040880 13 0 2018/10/20 21:08 佳旸牌咖喱牛肉面:买尼玛的冠军皮肤,老子钱都存好了 0 0 6227448758 佳旸牌咖喱牛肉面 2 1540040880 14 0 2018/10/20 21:08 你是我年少时最冒险的梦-:我想问问你们??打的是个什么?努力了一整年??到最后打成这样??真的失望 1 0 3972972440 你是我年少时最冒险的梦- 0 1540040880 15 0 2018/10/20 21:08 深思76863:脱了 0 0 6264488668 深思76863 0 1540040880 16 0 2018/10/20 21:08 石榴哥w:失望透顶失望透顶,不再关注RNG 0 0 6731579474 石榴哥w 0 1540040880 17 0 2018/10/20 21:08 许向暖呐:滚 0 0 5670089757 许向暖呐 0 1540040880 18 0 2018/10/20 21:08 曹大老实人:你知道我在网吧看比赛我周围的人一直说rng回家的时候我心里多难受吗 0 0 6384972437 曹大老实人 3 1540040880 19 0 2018/10/20 21:08 錯過_0927:滾 0 0 2430243037 錯過_0927 0 1540040880 20 0 2018/10/20 21:08 沉着的肉夹馍馍:明年再见 0 0 2128400513 沉着的肉夹馍馍 5 1540040880 ```

kafka 集成 kerberos ,启动kafka报错

kafka 使用kerberos协议的时候,启动kakfa的时候报zookeeper校验不通过。 错误信息如下:![图片说明](https://img-ask.csdn.net/upload/201902/02/1549098295_69981.png) kerberos的用户密钥:![图片说明](https://img-ask.csdn.net/upload/201902/02/1549098396_900993.png) kerberos的etc/krb5.conf配置信息:[logging] default = FILE:/var/log/krb5libs.log kdc = FILE:/var/log/krb5kdc.log admin_server = FILE:/var/log/kadmind.log [libdefaults] default_realm = EXAMPLE.COM default_tkt_enctypes = arcfour-hmac-md5 dns_lookup_realm = false dns_lookup_kdc = false ticket_lifetime = 24h renew_lifetime = 7d forwardable = true [realms] EXAMPLE.COM = { kdc = 192.168.1.41 admin_server = 192.168.1.41 } [domain_realm] kafka = EXAMPLE.COM zookeeper = EXAMPLE.COM weiwei = EXAMPLE.COM 192.168.1.41 = EXAMPLE.COM 127.0.0.1 = EXAMPLE.COM kerberos 的var/kerberos/krb5kdc/kdc.conf的配置信息: [kdcdefaults] kdc_ports = 88 kdc_tcp_ports = 88 [realms] EXAMPLE.COM = { #master_key_type = aes256-cts acl_file = /var/kerberos/krb5kdc/kadm5.acl dict_file = /usr/share/dict/words admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab max_renewable_life = 7d supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal } kafka的kafka_server_jaas.conf的配置信息: KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/var/kerberos/krb5kdc/kafka.keytab" principal="kafka/weiwei@EXAMPLE.COM"; }; Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/var/kerberos/krb5kdc/kafka.keytab" principal="zookeeper/192.168.1.41@EXAMPLE.COM"; }; zookeeper_jaas.conf的配置信息: Server{ com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true useTicketCache=false keyTab="/var/kerberos/krb5kdc/kafka.keytab" principal="zookeeper/192.168.1.41@EXAMPLE.COM"; }; zookeeper.properties的新增配置信息: authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000 server.properties 新增的配置信息: advertised.host.name=192.168.1.41 advertised.listeners=SASL_PLAINTEXT://192.168.1.41:9092 listeners=SASL_PLAINTEXT://192.168.1.41:9092 #listeners=PLAINTEXT://127.0.0.1:9093 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=GSSAPI sasl.enabled.mechanisms=GSSAPI sasl.kerberos.service.name=kafka zookeeper-server-start.sh 新增的配置信息 export KAFKA_OPTS='-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/shubei/Downloads/kafka_2.12-1.0.0/config/zookeeper_jaas.conf' kafka-server-start.sh 新增的配置信息: export KAFKA_OPTS='-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/shubei/Downloads/kafka_2.12-1.0.0/config/kafka_server_jaas.conf' 配置信息基本是这样,快过年了,小弟在线求救,再预祝大侠们新年快乐。 ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ``` ```

实时采集oracle和sqlserver的增量数据传入kafka

有没有什么方案可以实现实时采集oracle和sqlserver的增量数据传入kafka,不能设置时间戳,触发器,最好是监控操作日志

windows下kafka跑不起来,报错无法加载主类,CLASSPATH已经有加引号了

折腾了老半天,还是不行,zookeeper已经跑起来了,就是kafka跑不起来,百度了很久,很多人说的CLASSPATH加引号的方法,我看了我的文件,直接就是加引号的。就真的不知道是什么问题了。 然后Java环境变量也没有放在有空格或者中文的路径下。 求大神指点迷津。 报错一直是:命令语法不正确. 错误:找不到或无法加载主类kafka.Kafka ![图片说明](https://img-ask.csdn.net/upload/201908/04/1564926225_237840.png) ![图片说明](https://img-ask.csdn.net/upload/201908/04/1564926390_37103.png)

flink从checkpoint中恢复任务失败?

您好,请教一个问题。 我手动执行命令从checkpoint中恢复,但是提示文件不存在。 生成的checkpoint文件如下 ![图片说明](https://img-ask.csdn.net/upload/201902/23/1550912301_776703.png) 为什么不是 ![图片说明](https://img-ask.csdn.net/upload/201902/23/1550912325_212463.png) 我用savepoint恢复就可以成功 状态后端用的是new FsStateBackend("file:///data/flink/checkpoints")

Flink中与spark PairFunction对应的是什么

Flink中与spark PairFunction对应的是什么

在中国程序员是青春饭吗?

今年,我也32了 ,为了不给大家误导,咨询了猎头、圈内好友,以及年过35岁的几位老程序员……舍了老脸去揭人家伤疤……希望能给大家以帮助,记得帮我点赞哦。 目录: 你以为的人生 一次又一次的伤害 猎头界的真相 如何应对互联网行业的「中年危机」 一、你以为的人生 刚入行时,拿着傲人的工资,想着好好干,以为我们的人生是这样的: 等真到了那一天,你会发现,你的人生很可能是这样的: ...

程序员请照顾好自己,周末病魔差点一套带走我。

程序员在一个周末的时间,得了重病,差点当场去世,还好及时挽救回来了。

和黑客斗争的 6 天!

互联网公司工作,很难避免不和黑客们打交道,我呆过的两家互联网公司,几乎每月每天每分钟都有黑客在公司网站上扫描。有的是寻找 Sql 注入的缺口,有的是寻找线上服务器可能存在的漏洞,大部分都...

搜狗输入法也在挑战国人的智商!

故事总是一个接着一个到来...上周写完《鲁大师已经彻底沦为一款垃圾流氓软件!》这篇文章之后,鲁大师的市场工作人员就找到了我,希望把这篇文章删除掉。经过一番沟通我先把这篇文章从公号中删除了...

总结了 150 余个神奇网站,你不来瞅瞅吗?

原博客再更新,可能就没了,之后将持续更新本篇博客。

副业收入是我做程序媛的3倍,工作外的B面人生是怎样的?

提到“程序员”,多数人脑海里首先想到的大约是:为人木讷、薪水超高、工作枯燥…… 然而,当离开工作岗位,撕去层层标签,脱下“程序员”这身外套,有的人生动又有趣,马上展现出了完全不同的A/B面人生! 不论是简单的爱好,还是正经的副业,他们都干得同样出色。偶尔,还能和程序员的特质结合,产生奇妙的“化学反应”。 @Charlotte:平日素颜示人,周末美妆博主 大家都以为程序媛也个个不修边幅,但我们也许...

MySQL数据库面试题(2020最新版)

文章目录数据库基础知识为什么要使用数据库什么是SQL?什么是MySQL?数据库三大范式是什么mysql有关权限的表都有哪几个MySQL的binlog有有几种录入格式?分别有什么区别?数据类型mysql有哪些数据类型引擎MySQL存储引擎MyISAM与InnoDB区别MyISAM索引与InnoDB索引的区别?InnoDB引擎的4大特性存储引擎选择索引什么是索引?索引有哪些优缺点?索引使用场景(重点)...

如果你是老板,你会不会踢了这样的员工?

有个好朋友ZS,是技术总监,昨天问我:“有一个老下属,跟了我很多年,做事勤勤恳恳,主动性也很好。但随着公司的发展,他的进步速度,跟不上团队的步伐了,有点...

我入职阿里后,才知道原来简历这么写

私下里,有不少读者问我:“二哥,如何才能写出一份专业的技术简历呢?我总感觉自己写的简历太烂了,所以投了无数份,都石沉大海了。”说实话,我自己好多年没有写过简历了,但我认识的一个同行,他在阿里,给我说了一些他当年写简历的方法论,我感觉太牛逼了,实在是忍不住,就分享了出来,希望能够帮助到你。 01、简历的本质 作为简历的撰写者,你必须要搞清楚一点,简历的本质是什么,它就是为了来销售你的价值主张的。往深...

优雅的替换if-else语句

场景 日常开发,if-else语句写的不少吧??当逻辑分支非常多的时候,if-else套了一层又一层,虽然业务功能倒是实现了,但是看起来是真的很不优雅,尤其是对于我这种有强迫症的程序"猿",看到这么多if-else,脑袋瓜子就嗡嗡的,总想着解锁新姿势:干掉过多的if-else!!!本文将介绍三板斧手段: 优先判断条件,条件不满足的,逻辑及时中断返回; 采用策略模式+工厂模式; 结合注解,锦...

离职半年了,老东家又发 offer,回不回?

有小伙伴问松哥这个问题,他在上海某公司,在离职了几个月后,前公司的领导联系到他,希望他能够返聘回去,他很纠结要不要回去? 俗话说好马不吃回头草,但是这个小伙伴既然感到纠结了,我觉得至少说明了两个问题:1.曾经的公司还不错;2.现在的日子也不是很如意。否则应该就不会纠结了。 老实说,松哥之前也有过类似的经历,今天就来和小伙伴们聊聊回头草到底吃不吃。 首先一个基本观点,就是离职了也没必要和老东家弄的苦...

2020阿里全球数学大赛:3万名高手、4道题、2天2夜未交卷

阿里巴巴全球数学竞赛( Alibaba Global Mathematics Competition)由马云发起,由中国科学技术协会、阿里巴巴基金会、阿里巴巴达摩院共同举办。大赛不设报名门槛,全世界爱好数学的人都可参与,不论是否出身数学专业、是否投身数学研究。 2020年阿里巴巴达摩院邀请北京大学、剑桥大学、浙江大学等高校的顶尖数学教师组建了出题组。中科院院士、美国艺术与科学院院士、北京国际数学...

男生更看重女生的身材脸蛋,还是思想?

往往,我们看不进去大段大段的逻辑。深刻的哲理,往往短而精悍,一阵见血。问:产品经理挺漂亮的,有点心动,但不知道合不合得来。男生更看重女生的身材脸蛋,还是...

程序员为什么千万不要瞎努力?

本文作者用对比非常鲜明的两个开发团队的故事,讲解了敏捷开发之道 —— 如果你的团队缺乏统一标准的环境,那么即使勤劳努力,不仅会极其耗时而且成果甚微,使用...

为什么程序员做外包会被瞧不起?

二哥,有个事想询问下您的意见,您觉得应届生值得去外包吗?公司虽然挺大的,中xx,但待遇感觉挺低,马上要报到,挺纠结的。

当HR压你价,说你只值7K,你该怎么回答?

当HR压你价,说你只值7K时,你可以流畅地回答,记住,是流畅,不能犹豫。 礼貌地说:“7K是吗?了解了。嗯~其实我对贵司的面试官印象很好。只不过,现在我的手头上已经有一份11K的offer。来面试,主要也是自己对贵司挺有兴趣的,所以过来看看……”(未完) 这段话主要是陪HR互诈的同时,从公司兴趣,公司职员印象上,都给予对方正面的肯定,既能提升HR的好感度,又能让谈判气氛融洽,为后面的发挥留足空间。...

面试:第十六章:Java中级开发(16k)

HashMap底层实现原理,红黑树,B+树,B树的结构原理 Spring的AOP和IOC是什么?它们常见的使用场景有哪些?Spring事务,事务的属性,传播行为,数据库隔离级别 Spring和SpringMVC,MyBatis以及SpringBoot的注解分别有哪些?SpringMVC的工作原理,SpringBoot框架的优点,MyBatis框架的优点 SpringCould组件有哪些,他们...

面试阿里p7,被按在地上摩擦,鬼知道我经历了什么?

面试阿里p7被问到的问题(当时我只知道第一个):@Conditional是做什么的?@Conditional多个条件是什么逻辑关系?条件判断在什么时候执...

面试了一个 31 岁程序员,让我有所触动,30岁以上的程序员该何去何从?

最近面试了一个31岁8年经验的程序猿,让我有点感慨,大龄程序猿该何去何从。

大三实习生,字节跳动面经分享,已拿Offer

说实话,自己的算法,我一个不会,太难了吧

程序员垃圾简历长什么样?

已经连续五年参加大厂校招、社招的技术面试工作,简历看的不下于万份 这篇文章会用实例告诉你,什么是差的程序员简历! 疫情快要结束了,各个公司也都开始春招了,作为即将红遍大江南北的新晋UP主,那当然要为小伙伴们做点事(手动狗头)。 就在公众号里公开征简历,义务帮大家看,并一一点评。《启舰:春招在即,义务帮大家看看简历吧》 一石激起千层浪,三天收到两百多封简历。 花光了两个星期的所有空闲时...

《Oracle Java SE编程自学与面试指南》最佳学习路线图2020年最新版(进大厂必备)

正确选择比瞎努力更重要!

《Oracle Java SE编程自学与面试指南》最佳学习路线图(2020最新版)

正确选择比瞎努力更重要!

都前后端分离了,咱就别做页面跳转了!统统 JSON 交互

文章目录1. 无状态登录1.1 什么是有状态1.2 什么是无状态1.3 如何实现无状态1.4 各自优缺点2. 登录交互2.1 前后端分离的数据交互2.2 登录成功2.3 登录失败3. 未认证处理方案4. 注销登录 这是本系列的第四篇,有小伙伴找不到之前文章,松哥给大家列一个索引出来: 挖一个大坑,Spring Security 开搞! 松哥手把手带你入门 Spring Security,别再问密...

字节跳动面试官竟然问了我JDBC?

轻松等回家通知

面试官:你连SSO都不懂,就别来面试了

大厂竟然要考我SSO,卧槽。

阿里面试官让我用Zk(Zookeeper)实现分布式锁

他可能没想到,我当场手写出来了

终于,月薪过5万了!

来看几个问题想不想月薪超过5万?想不想进入公司架构组?想不想成为项目组的负责人?想不想成为spring的高手,超越99%的对手?那么本文内容是你必须要掌握的。本文主要详解bean的生命...

自从喜欢上了B站这12个UP主,我越来越觉得自己是个废柴了!

不怕告诉你,我自从喜欢上了这12个UP主,哔哩哔哩成为了我手机上最耗电的软件,几乎每天都会看,可是吧,看的越多,我就越觉得自己是个废柴,唉,老天不公啊,不信你看看…… 间接性踌躇满志,持续性混吃等死,都是因为你们……但是,自己的学习力在慢慢变强,这是不容忽视的,推荐给你们! 都说B站是个宝,可是有人不会挖啊,没事,今天咱挖好的送你一箩筐,首先啊,我在B站上最喜欢看这个家伙的视频了,为啥 ,咱撇...

代码注释如此沙雕,会玩还是你们程序员!

某站后端代码被“开源”,同时刷遍全网的,还有代码里的那些神注释。 我们这才知道,原来程序员个个都是段子手;这么多年来,我们也走过了他们的无数套路… 首先,产品经理,是永远永远吐槽不完的!网友的评论也非常扎心,说看这些代码就像在阅读程序员的日记,每一页都写满了对产品经理的恨。 然后,也要发出直击灵魂的质问:你是尊贵的付费大会员吗? 这不禁让人想起之前某音乐app的穷逼Vip,果然,穷逼在哪里都是...

立即提问
相关内容推荐