kafka数据可以写入数据,消费不可数据

kafka的offset值一直不变,可以往里面写数据
会是什么原因呢
手动改变offset的值是可以消费数据的

1个回答

消费者是否正确注册上了,是否有连接在线上

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
kafka消费数据老是丢失

WARN TaskSetManager: Lost task 9.0 in stage 26569.0 (TID 812602, 2, 2, 104-250-138-250.static.gorillaservers.com): k): k): ): ): kafka.common.NotLeaderForPForPForPartitionException 有两个groupID消费一个topic,出现上面的警告后,有一个groupID就消费不到数据了

kafka 消费者消费不到数据

[root@hzctc-kafka-5d61 ~]# kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group sbs-haodian-message1 --topic Message --zookeeper 10.1.5.61:2181 [2018-04-18 16:43:43,467] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/sbs-haodian-message1/offsets/Message/8. 用kafka的时候 用命令查看消费组消费情况 报这个错误 其他的消费组是正常的 哪位大神知道这是什么原因导致的 我在消费操作的时候加了缓存锁 每次poll操作之后的间隔时间不确定 可能是10S或者20S或者30S 不过我的sessiontimeiut设置了90s。这个会有什么影响吗

Spark Streaming读取kafka数据解析后写入ES,处理效率太低太慢

环境: * Kafka 0.10+(不影响) * Spark 2.4.0 + Yarn * ES 6.5.4 问题: 从Kafka读取获取消息,然后进行简单过滤清晰操作后,将消息写入到ES中,发现处理效率很低, Kafka有三个partition maxRatePerPartition=2000 batchInterval=1s //这种情况下刚刚好,就是处理延迟在1s左右浮动,不会出现任务堆积的情况 //此时处理配置 //num_executor=3 //executor_core=8 然后将读数据的maxRatePerPartition增大到10000乃至20000,发现处理速度始终没有变化 期间将num_executor设置为8,executor_core设置为8,还是没啥用 还增加了设置: ```java conf.set("spark.streaming,concurrentJobs","20") conf.set("spark.local.wait","100ms") ``` 还是没啥变化,大佬们,到底要咋调啊

kafka消费不到数据问题

kafka集群搭建正常,通过console都能正常生产和消费消息,但是通过JAVA程序就是读取不到消息,更换group都尝试过了 package test; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumer extends Thread { private String topic; public KafkaConsumer(String topic){ super(); this.topic=topic; } @Override public void run(){ //通过properties设置了Consumer的参数,并且创建了连接器,连接到Kafaka ConsumerConnector consumer = createConsumer(); //Map作用指定获取的topic以及partition Map<String,Integer> topicCountMap = new HashMap<String,Integer>(); topicCountMap.put(topic, 3); //consumer连接器获取消息 Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); //获取对应的topic中的某一个partition中的数据 KafkaStream<byte[],byte[]> kafkaStream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator(); while(iterator.hasNext()){ byte[] message = iterator.next().message(); System.out.println("message is:"+new String(message)); } } private ConsumerConnector createConsumer(){ Properties properties = new Properties(); properties.put("zookeeper.connect", "XXX:2181"); properties.put("auto.offset.reset", "smallest");//读取旧数据 properties.put("group.id", "333fcdcd"); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); } public static void main(String[] args) { new KafkaConsumer("testtest").start(); } }

kafka消费者无法消费信息

在生产环境部署kafka集群和消费者服务器后,通过logstash向kafka集群发送实时日志,消费者也能正常消费信息。但是两分钟之后消费者就停止消费信息了,想问下各位老师如何排查问题点在哪里。 1:查看了kafka服务器的日志,logstash还在向kafka推实时日志,kafka集群日志留存时间是两个小时。 2:kafka消费者一共有两台,两台都在同时运行。 3:kafka集群有三台服务器,查问题的时候发现,kafka消费者只连接到了一台broker上,不知道这是不是原因所在。

python消费kafka数据,为什么前面取几次都取不到?

python 消费kafka数据时,刚开始连接时为什么取不到数据? 代码如下: ``` # -*- coding:utf8 -*- from kafka import KafkaConsumer from kafka import TopicPartition import kafka import time # 测试kafka poll方法能拉取多少的记录 consumer = KafkaConsumer( bootstrap_servers=['192.168.13.202:9092'], group_id='group-1', auto_offset_reset='earliest', enable_auto_commit=False) consumer.subscribe('test') print ("t1",time.time()) while True: print("t2", time.time()) msg = consumer.poll(timeout_ms=100, max_records=5) # 从kafka获取消息 # print (len(msg)) for i in msg.values(): for k in i: print(k.offset, k.value) time.sleep(1) ``` 打印的结果却是 ``` t1 1567669170.438951 t2 1567669170.438951 t2 1567669171.8450315 t2 1567669172.945094 t2 1567669174.0471573 t2 1567669175.1472201 0 b'{"ast":"\xe7\x82\xb"}' 1 b'{"ast":"","dm":2}' 2 b'{"ast":"12"}' 3 b'{"ast":"sd"}' 4 b'{"ast":"12ds"}' t2 1567669176.1822793 ``` 为什么连接上kafka之后,会取5次才会取到数据?

kafka通过consumer java api实现消费者,KafkaStream打印不出来数据

kafka2.2.0 通过consumer java api实现消费者,KafkaStream打印不出来数据 ``` package kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumerTest extends Thread { //在linux环境运行正常 @Override public void run() { // TODO Auto-generated method stub String topic="powerTopic"; Properties pro=new Properties(); pro.put("zookeeper.connect", "10.2.2.61:2181,10.2.2.62:2181,10.2.2.63:2181"); pro.put("group.id", "test"); // pro.put("zookeeper.session.timeout.ms", "4000"); // pro.put("consumer.timeout.ms", "-1"); ConsumerConfig paramConsumerConfig=new ConsumerConfig(pro); ConsumerConnector cosumerConnector=Consumer.createJavaConsumerConnector(paramConsumerConfig); Map<String, Integer> paramMap=new HashMap<String, Integer>(); paramMap.put(topic,1); Map<String, List<KafkaStream<byte[], byte[]>>> messageStream=cosumerConnector.createMessageStreams(paramMap); KafkaStream<byte[], byte[]> kafkastream=messageStream.get(topic).get(0); // System.out.println(kafkastream.size()); System.out.println("hello"); ConsumerIterator<byte[], byte[]> iterator=kafkastream.iterator(); while(iterator.hasNext()){ // MessageAndMetadata<byte[], byte[]> message=iterator.next(); // String topic1=message.topic(); String msg=new String(iterator.next().message()); System.out.println(msg); } } public static void main(String[] args) { // TODO Auto-generated method stub new KafkaConsumerTest().start(); new MyProducer01().start(); } } ``` kafka环境在centos操作系统,在windows系统的eclipse运行程序,打印不出来数据,也不结束报错: ![图片说明](https://img-ask.csdn.net/upload/201912/20/1576807897_599340.jpg) 打包后在集群环境运行结果: ![图片说明](https://img-ask.csdn.net/upload/201912/20/1576808400_74063.jpg)

kafka 消费端 处理数据比较慢,会不会出现数据积压?

如题,kafka消费端接收到数据后 要进行部分业务逻辑操作,可能会有3秒左右,处理很慢 的话,对程序有什么影响呢?新手提问, 望各位大神不吝赐教!

kafka多个消费者消费同一数据

kafka不同groupid下的消费者,消费同一topic下的某一条数据,为什么offset值不变? 只被消费了一次吗?

spark读取kafka数据, 缓存当天数据

spark stream从kafka读取数据,10秒间隔;需要缓存当天数据用于业务分析。 思路1:定义static rdd用于union每次接收到的rdd;用window窗口(窗口长1小时,滑动步长20分钟);union之后checkpoint。 但是发现在利用static rdd做业务分析的时候,应该是因为磁盘io,所以执行时间太长。 思路2:一样定义static rdd, context调用remember(24小时)保留数据24小时(数据缓存在哪里了,暂时不清楚,汗);但是在业务分析时,发现static rdd的count结果为0 求教怎么缓存一段时间的rdd 数据放executor内存或分布放个worker都可以,一天的数据量大概在100g,过滤后再5g,机器内存256g

kafka数据上传hbase的问题

我使用的环境是hdp的伪分布集群 我的项目是flume采集数据发送到kafka的各个topic当中 再由jar文件使得从kafka当中获取数据 发送到hbase做持久化 然后因为数据量颇大 每次传个半个小时的数据 regionserver就挂掉了 项目是肯定没问题的 因为目前在学习阶段 别人是可以执行且不报错的 问题如下所示 ``` java.io.FileNotFoundException: File /tmp/hbase-root/hbase/lib does not exist at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:431) ~[hadoop-common-2.7.3.jar!/:na] at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517) ~[hadoop-common-2.7.3.jar!/:na] at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557) ~[hadoop-common-2.7.3.jar!/:na] at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674) ~[hadoop-common-2.7.3.jar!/:na] at org.apache.hadoop.hbase.util.DynamicClassLoader.loadNewJars(DynamicClassLoader.java:178) [hbase-common-1.1.2.jar!/:1.1.2] at org.apache.hadoop.hbase.util.DynamicClassLoader.loadClass(DynamicClassLoader.java:142) [hbase-common-1.1.2.jar!/:1.1.2] at java.lang.Class.forName0(Native Method) [na:1.8.0_161] at java.lang.Class.forName(Class.java:348) [na:1.8.0_161] at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toException(ProtobufUtil.java:1543) [hbase-client-1.1.2.jar!/:1.1.2] at org.apache.hadoop.hbase.protobuf.ResponseConverter.getResults(ResponseConverter.java:120) [hbase-client-1.1.2.jar!/:1.1.2] at org.apache.hadoop.hbase.client.MultiServerCallable.call(MultiServerCallable.java:134) [hbase-client-1.1.2.jar!/:1.1.2] at org.apache.hadoop.hbase.client.MultiServerCallable.call(MultiServerCallable.java:54) [hbase-client-1.1.2.jar!/:1.1.2] at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200) [hbase-client-1.1.2.jar!/:1.1.2] at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl$SingleServerRequestRunnable.run(AsyncProcess.java:708) [hbase-client-1.1.2.jar!/:1.1.2] ``` 他突然开始寻找 File /tmp/hbase-root/hbase/lib does not exist 这个路径的文件 我的项目中并没有从这个路径下寻找文件 我前往到这个路径 路径是空的 就是根本没有这个路径 然后我前往hbase的log中查看 hbase来了一套组合拳 ``` 2020-03-21 19:29:49,789 ERROR [Thread-19] util.PolicyRefresher: PolicyRefresher(serviceName=Sandbox_hbase): failed to refresh policies. Will continue to use last known version of policies (6) com.sun.jersey.api.client.ClientHandlerException: java.net.SocketTimeoutException: Read timed out at com.sun.jersey.client.urlconnection.URLConnectionClientHandler.handle(URLConnectionClientHandler.java:149) at com.sun.jersey.api.client.Client.handle(Client.java:648) at com.sun.jersey.api.client.WebResource.handle(WebResource.java:670) at com.sun.jersey.api.client.WebResource.access$200(WebResource.java:74) at com.sun.jersey.api.client.WebResource$Builder.get(WebResource.java:503) at org.apache.ranger.admin.client.RangerAdminRESTClient.getServicePoliciesIfUpdated(RangerAdminRESTClient.java:135) at org.apache.ranger.plugin.util.PolicyRefresher.loadPolicyfromPolicyAdmin(PolicyRefresher.java:264) at org.apache.ranger.plugin.util.PolicyRefresher.loadPolicy(PolicyRefresher.java:202) at org.apache.ranger.plugin.util.PolicyRefresher.run(PolicyRefresher.java:171) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735) at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678) at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587) at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492) at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) at com.sun.jersey.client.urlconnection.URLConnectionClientHandler._invoke(URLConnectionClientHandler.java:240) at com.sun.jersey.client.urlconnection.URLConnectionClientHandler.handle(URLConnectionClientHandler.java:147) ... 8 more ``` 然后就是读取超时 ``` com.sun.jersey.api.client.ClientHandlerException: java.net.SocketTimeoutException: Read timed out at com.sun.jersey.client.urlconnection.URLConnectionClientHandler.handle(URLConnectionClientHandler.java:149) at com.sun.jersey.api.client.Client.handle(Client.java:648) at com.sun.jersey.api.client.WebResource.handle(WebResource.java:670) at com.sun.jersey.api.client.WebResource.access$200(WebResource.java:74) at com.sun.jersey.api.client.WebResource$Builder.get(WebResource.java:503) at org.apache.ranger.admin.client.RangerAdminRESTClient.getServicePoliciesIfUpdated(RangerAdminRESTClient.java:135) at org.apache.ranger.plugin.util.PolicyRefresher.loadPolicyfromPolicyAdmin(PolicyRefresher.java:264) at org.apache.ranger.plugin.util.PolicyRefresher.loadPolicy(PolicyRefresher.java:202) at org.apache.ranger.plugin.util.PolicyRefresher.run(PolicyRefresher.java:171) ``` 然后就是最匪夷所思的异常 ``` 2020-03-21 19:33:36,252 ERROR [Thread-19] util.PolicyRefresher: PolicyRefresher(serviceName=Sandbox_hbase): failed to refresh policies. Will continue to use last known version of policies (6) com.sun.jersey.api.client.ClientHandlerException: java.net.SocketTimeoutException: Read timed out at com.sun.jersey.client.urlconnection.URLConnectionClientHandler.handle(URLConnectionClientHandler.java:149) at com.sun.jersey.api.client.Client.handle(Client.java:648) at com.sun.jersey.api.client.WebResource.handle(WebResource.java:670) at com.sun.jersey.api.client.WebResource.access$200(WebResource.java:74) at com.sun.jersey.api.client.WebResource$Builder.get(WebResource.java:503) at org.apache.ranger.admin.client.RangerAdminRESTClient.getServicePoliciesIfUpdated(RangerAdminRESTClient.java:135) at org.apache.ranger.plugin.util.PolicyRefresher.loadPolicyfromPolicyAdmin(PolicyRefresher.java:264) at org.apache.ranger.plugin.util.PolicyRefresher.loadPolicy(PolicyRefresher.java:202) at org.apache.ranger.plugin.util.PolicyRefresher.run(PolicyRefresher.java:171) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735) at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678) at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587) at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492) at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) at com.sun.jersey.client.urlconnection.URLConnectionClientHandler._invoke(URLConnectionClientHandler.java:240) at com.sun.jersey.client.urlconnection.URLConnectionClientHandler.handle(URLConnectionClientHandler.java:147) ... 8 more ``` 求大佬解答

kafkaconsumer数据不能批量消费

我用kafkaconsumer批量消费数据, 无法获取获取批量, 从日志看offset提交也异常, 规律性的每3次提交成功一次, 数据每次获取一条, 无法批量获取。 困扰了两天了, 莫名啊。 ``` public class KafkaManualConsumer { public static void main(String[] args) { Properties properties = new Properties(); System.setProperty("java.security.auth.login.config", "c:/kafka_client_jaas.conf"); //配置文件路径 properties.put("security.protocol", "SASL_PLAINTEXT"); properties.put("sasl.mechanism", "PLAIN"); properties.put("bootstrap.servers", "VM_0_16_centos:9092"); //kafka:9092 properties.put("enable.auto.commit", "false"); //properties.put("session.timeout.ms", 60000); properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); properties.put("fetch.max.wait.ms", 5000); properties.put("max.poll.records", 5000); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("group.id", "yuu67u36"); // properties.put("receive.buffer.bytes", 3276800); // properties.put("heartbeat.interval.ms", 59000); // properties.put("client.id", "t4t5t234f34f3f"); // properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32*1024*1024); // properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 64*1024*1024); // properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 128*1024*1024); //properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 2000*1024); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Arrays.asList("topic-video-dev-attendphotos")); //kafkaConsumer.subscribe(Arrays.asList("topic-video-dev-stat")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(1000L); System.out.println("-----------------"); System.out.println(records.count()); for (ConsumerRecord<String, String> record : records) { System.out.println("offset = " + record.offset()); VideoPhotoOuter dto = JSON.parseObject(record.value(), VideoPhotoOuter.class); System.out.println(dto.getPhotos().get(0).getPhotoFmt()); //System.out.printf("offset = %d, value = %s", record.offset(), record.value()); } try { kafkaConsumer.commitSync(); Thread.currentThread().sleep(1000L); } catch(Exception ex) { //手动抛出SQLException使用事务回滚 } } //kafkaConsumer.close(); } } ``` 下面是控制台日志 ``` 17:32:06.758 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [VM_0_16_centos:9092] check.crcs = true client.dns.lookup = default client.id = t4t5t234f34f3f client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 5000 fetch.min.bytes = 1 group.id = yuu67u36 group.instance.id = null heartbeat.interval.ms = 59000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 5000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 3276800 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 60000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = PLAIN security.protocol = SASL_PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 60000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer ....................... ........................ ----------------- 0 17:32:08.068 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90261 for partition topic-video-dev-attendphotos-0 ----------------- 0 17:32:10.095 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90261 for partition topic-video-dev-attendphotos-0 17:32:12.091 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Node 90 sent a full fetch response that created a new incremental fetch session 725149318 with 1 response partition(s) 17:32:12.092 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Fetch READ_UNCOMMITTED at offset 90261 for partition topic-video-dev-attendphotos-0 returned fetch data (error=NONE, highWaterMark=93666, lastStableOffset = 93666, logStartOffset = 5372, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576) 17:32:12.120 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-video-dev-attendphotos.bytes-fetched 17:32:12.120 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-video-dev-attendphotos.records-fetched 17:32:12.121 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic-video-dev-attendphotos-0.records-lag 17:32:12.121 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic-video-dev-attendphotos-0.records-lead 17:32:12.122 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Added READ_UNCOMMITTED fetch request for partition topic-video-dev-attendphotos-0 at position FetchPosition{offset=90262, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=VM_0_16_centos:9092 (id: 90 rack: null), epoch=0}} to node VM_0_16_centos:9092 (id: 90 rack: null) 17:32:12.122 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Built incremental fetch (sessionId=725149318, epoch=1) for node 90. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s) 17:32:12.122 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-video-dev-attendphotos-0), toForget=(), implied=()) to broker VM_0_16_centos:9092 (id: 90 rack: null) ----------------- 1 offset = 90261 JPG 17:32:12.239 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90262 for partition topic-video-dev-attendphotos-0 ----------------- 0 17:32:14.256 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90262 for partition topic-video-dev-attendphotos-0 ----------------- 0 17:32:16.279 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90262 for partition topic-video-dev-attendphotos-0 17:32:16.603 [kafka-coordinator-heartbeat-thread | yuu67u36] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Node 90 sent an incremental fetch response for session 725149318 with 1 response partition(s) 17:32:16.603 [kafka-coordinator-heartbeat-thread | yuu67u36] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Fetch READ_UNCOMMITTED at offset 90262 for partition topic-video-dev-attendphotos-0 returned fetch data (error=NONE, highWaterMark=93668, lastStableOffset = 93668, logStartOffset = 5372, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576) 17:32:17.280 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Added READ_UNCOMMITTED fetch request for partition topic-video-dev-attendphotos-0 at position FetchPosition{offset=90263, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=VM_0_16_centos:9092 (id: 90 rack: null), epoch=0}} to node VM_0_16_centos:9092 (id: 90 rack: null) 17:32:17.281 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Built incremental fetch (sessionId=725149318, epoch=2) for node 90. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s) 17:32:17.281 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-video-dev-attendphotos-0), toForget=(), implied=()) to broker VM_0_16_centos:9092 (id: 90 rack: null) ----------------- 1 ```

【求助】structed streaming 在消费kafka数据时,怎么保证数据完整&有且仅有1次被消费

看了下官方文档,原本streaming在使用direct模式时,可以自己维护offset,感觉上还比较靠谱。 现在structed streaming 使用kafka时,enable.auto.commit 是不可设置的,按照文档说的是,structed streaming 不提交任何offset, 那spark在新版本的消费kafka中,如何保证有且仅有一次,或者是至少被消费一次。

kafka 在子线程消费数据,有卡顿的情况?

2个程序消费数据,一个程序在主线程消费就没这个问题,另个一个子线程消费数据,有卡顿的情况?

Flink如何将kafka里的消息写入到对应的topic

已知所有kafka里topic为固定格式的json,目前想用flink处理所有topic里的数据,并且写入第二个kafka,sink的topic和source的topic一致,如何实现?

logstash配置文件到kafka,kafka接收不到数据

input { file { path => "/var/log/172.16.16.121.log" } } filter { } output { kafka { bootstrap_servers => "172.16.16.120:9092" topic_id => "test_SSA1" compression_type => "snappy" } }

spring-kafka做了分区,部分分区数据可以正常消费,部分分区始终无法消费?

spring-kafka(版本:1.0.6.RELEASE,kafka-client:0.9.0.1),创建了8个分区,有一个分区的数据始终无法消费,其他分区数据正常消费。有大神知道是为什么?

监控kafka数据源是否堆积

监控kafka数据源是否堆积,java怎么实现 Kafka版本是0.8.2.1

用flume读取kafka数据到hdfs,source创建时报错Kafka topic must be specified

计划使用flume读取kafka的数据传送到hdfs上,结果错误如下 ![图片说明](https://img-ask.csdn.net/upload/202003/20/1584694939_566000.png) 但是我的kafka里确实有对应的topic,名字为topic_start,下面是我的配置文件 ![图片说明](https://img-ask.csdn.net/upload/202003/20/1584695029_280148.png) 不知道问题出在哪里,目前测试结果为kafka里的topic,flume都无法读取出来,求大佬帮忙解决一下这个问题。目前我毫无头绪,如果不行只能考录重新安装flume等办法了

MySQL 8.0.19安装教程(windows 64位)

话不多说直接开干 目录 1-先去官网下载点击的MySQL的下载​ 2-配置初始化的my.ini文件的文件 3-初始化MySQL 4-安装MySQL服务 + 启动MySQL 服务 5-连接MySQL + 修改密码 先去官网下载点击的MySQL的下载 下载完成后解压 解压完是这个样子 配置初始化的my.ini文件的文件 ...

Python+OpenCV计算机视觉

Python+OpenCV计算机视觉系统全面的介绍。

Vue.js 2.0之全家桶系列视频课程

基于新的Vue.js 2.3版本, 目前新全的Vue.js教学视频,让你少走弯路,直达技术前沿! 1. 包含Vue.js全家桶(vue.js、vue-router、axios、vuex、vue-cli、webpack、ElementUI等) 2. 采用笔记+代码案例的形式讲解,通俗易懂

navicat(内含激活码)

navicat支持mysql的可视化操作,内涵激活码,不用再忍受弹框的痛苦。

HTML期末大作业

这是我自己做的HTML期末大作业,花了很多时间,稍加修改就可以作为自己的作业了,而且也可以作为学习参考

150讲轻松搞定Python网络爬虫

【为什么学爬虫?】 &nbsp; &nbsp; &nbsp; &nbsp;1、爬虫入手容易,但是深入较难,如何写出高效率的爬虫,如何写出灵活性高可扩展的爬虫都是一项技术活。另外在爬虫过程中,经常容易遇到被反爬虫,比如字体反爬、IP识别、验证码等,如何层层攻克难点拿到想要的数据,这门课程,你都能学到! &nbsp; &nbsp; &nbsp; &nbsp;2、如果是作为一个其他行业的开发者,比如app开发,web开发,学习爬虫能让你加强对技术的认知,能够开发出更加安全的软件和网站 【课程设计】 一个完整的爬虫程序,无论大小,总体来说可以分成三个步骤,分别是: 网络请求:模拟浏览器的行为从网上抓取数据。 数据解析:将请求下来的数据进行过滤,提取我们想要的数据。 数据存储:将提取到的数据存储到硬盘或者内存中。比如用mysql数据库或者redis等。 那么本课程也是按照这几个步骤循序渐进的进行讲解,带领学生完整的掌握每个步骤的技术。另外,因为爬虫的多样性,在爬取的过程中可能会发生被反爬、效率低下等。因此我们又增加了两个章节用来提高爬虫程序的灵活性,分别是: 爬虫进阶:包括IP代理,多线程爬虫,图形验证码识别、JS加密解密、动态网页爬虫、字体反爬识别等。 Scrapy和分布式爬虫:Scrapy框架、Scrapy-redis组件、分布式爬虫等。 通过爬虫进阶的知识点我们能应付大量的反爬网站,而Scrapy框架作为一个专业的爬虫框架,使用他可以快速提高我们编写爬虫程序的效率和速度。另外如果一台机器不能满足你的需求,我们可以用分布式爬虫让多台机器帮助你快速爬取数据。 &nbsp; 从基础爬虫到商业化应用爬虫,本套课程满足您的所有需求! 【课程服务】 专属付费社群+每周三讨论会+1v1答疑

三个项目玩转深度学习(附1G源码)

从事大数据与人工智能开发与实践约十年,钱老师亲自见证了大数据行业的发展与人工智能的从冷到热。事实证明,计算机技术的发展,算力突破,海量数据,机器人技术等,开启了第四次工业革命的序章。深度学习图像分类一直是人工智能的经典任务,是智慧零售、安防、无人驾驶等机器视觉应用领域的核心技术之一,掌握图像分类技术是机器视觉学习的重中之重。针对现有线上学习的特点与实际需求,我们开发了人工智能案例实战系列课程。打造:以项目案例实践为驱动的课程学习方式,覆盖了智能零售,智慧交通等常见领域,通过基础学习、项目案例实践、社群答疑,三维立体的方式,打造最好的学习效果。

基于STM32的电子时钟设计

时钟功能 还有闹钟功能,温湿度功能,整点报时功能 你值得拥有

学生成绩管理系统(PHP + MYSQL)

做的是数据库课程设计,使用的php + MySQL,本来是黄金搭配也就没啥说的,推荐使用wamp服务器,里面有详细的使用说明,带有界面的啊!呵呵 不行的话,可以给我留言!

面试了一个 31 岁程序员,让我有所触动,30岁以上的程序员该何去何从?

最近面试了一个31岁8年经验的程序猿,让我有点感慨,大龄程序猿该何去何从。

程序员的兼职技能课

获取讲师答疑方式: 在付费视频第一节(触摸命令_ALL)片头有二维码及加群流程介绍 限时福利 原价99元,今日仅需39元!购课添加小助手(微信号:itxy41)按提示还可领取价值800元的编程大礼包! 讲师介绍: 苏奕嘉&nbsp;前阿里UC项目工程师 脚本开发平台官方认证满级(六级)开发者。 我将如何教会你通过【定制脚本】赚到你人生的第一桶金? 零基础程序定制脚本开发课程,是完全针对零脚本开发经验的小白而设计,课程内容共分为3大阶段: ①前期将带你掌握Q开发语言和界面交互开发能力; ②中期通过实战来制作有具体需求的定制脚本; ③后期将解锁脚本的更高阶玩法,打通任督二脉; ④应用定制脚本合法赚取额外收入的完整经验分享,带你通过程序定制脚本开发这项副业,赚取到你的第一桶金!

实用主义学Python(小白也容易上手的Python实用案例)

原价169,限时立减100元! 系统掌握Python核心语法16点,轻松应对工作中80%以上的Python使用场景! 69元=72讲+源码+社群答疑+讲师社群分享会&nbsp; 【哪些人适合学习这门课程?】 1)大学生,平时只学习了Python理论,并未接触Python实战问题; 2)对Python实用技能掌握薄弱的人,自动化、爬虫、数据分析能让你快速提高工作效率; 3)想学习新技术,如:人工智能、机器学习、深度学习等,这门课程是你的必修课程; 4)想修炼更好的编程内功,优秀的工程师肯定不能只会一门语言,Python语言功能强大、使用高效、简单易学。 【超实用技能】 从零开始 自动生成工作周报 职场升级 豆瓣电影数据爬取 实用案例 奥运冠军数据分析 自动化办公:通过Python自动化分析Excel数据并自动操作Word文档,最终获得一份基于Excel表格的数据分析报告。 豆瓣电影爬虫:通过Python自动爬取豆瓣电影信息并将电影图片保存到本地。 奥运会数据分析实战 简介:通过Python分析120年间奥运会的数据,从不同角度入手分析,从而得出一些有趣的结论。 【超人气老师】 二两 中国人工智能协会高级会员 生成对抗神经网络研究者 《深入浅出生成对抗网络:原理剖析与TensorFlow实现》一书作者 阿里云大学云学院导师 前大型游戏公司后端工程师 【超丰富实用案例】 0)图片背景去除案例 1)自动生成工作周报案例 2)豆瓣电影数据爬取案例 3)奥运会数据分析案例 4)自动处理邮件案例 5)github信息爬取/更新提醒案例 6)B站百大UP信息爬取与分析案例 7)构建自己的论文网站案例

Java8零基础入门视频教程

这门课程基于主流的java8平台,由浅入深的详细讲解了java SE的开发技术,可以使java方向的入门学员,快速扎实的掌握java开发技术!

Python数据挖掘简易入门

&nbsp; &nbsp; &nbsp; &nbsp; 本课程为Python数据挖掘方向的入门课程,课程主要以真实数据为基础,详细介绍数据挖掘入门的流程和使用Python实现pandas与numpy在数据挖掘方向的运用,并深入学习如何运用scikit-learn调用常用的数据挖掘算法解决数据挖掘问题,为进一步深入学习数据挖掘打下扎实的基础。

零基础学C#编程—C#从小白到大咖

本课程从初学者角度出发,提供了C#从入门到成为程序开发高手所需要掌握的各方面知识和技术。 【课程特点】 1 由浅入深,编排合理; 2 视频讲解,精彩详尽; 3 丰富实例,轻松易学; 4 每章总结配有难点解析文档。 15大章节,228课时,1756分钟与你一同进步!

MySQL数据库面试题(2020最新版)

文章目录数据库基础知识为什么要使用数据库什么是SQL?什么是MySQL?数据库三大范式是什么mysql有关权限的表都有哪几个MySQL的binlog有有几种录入格式?分别有什么区别?数据类型mysql有哪些数据类型引擎MySQL存储引擎MyISAM与InnoDB区别MyISAM索引与InnoDB索引的区别?InnoDB引擎的4大特性存储引擎选择索引什么是索引?索引有哪些优缺点?索引使用场景(重点)...

多功能数字钟.zip

利用数字电子计数知识设计并制作的数字电子钟(含multisim仿真),该数字钟具有显示星期、24小时制时间、闹铃、整点报时、时间校准功能

极简JAVA学习营第四期(报名以后加助教微信:eduxy-1)

想学好JAVA必须要报两万的培训班吗? Java大神勿入 如果你: 零基础想学JAVA却不知道从何入手 看了一堆书和视频却还是连JAVA的环境都搭建不起来 囊中羞涩面对两万起的JAVA培训班不忍直视 在职没有每天大块的时间专门学习JAVA 那么恭喜你找到组织了,在这里有: 1. 一群志同道合立志学好JAVA的同学一起学习讨论JAVA 2. 灵活机动的学习时间完成特定学习任务+每日编程实战练习 3. 热心助人的助教和讲师及时帮你解决问题,不按时完成作业小心助教老师的家访哦 上一张图看看前辈的感悟: &nbsp; &nbsp; 大家一定迫不及待想知道什么是极简JAVA学习营了吧,下面就来给大家说道说道: 什么是极简JAVA学习营? 1. 针对Java小白或者初级Java学习者; 2. 利用9天时间,每天1个小时时间; 3.通过 每日作业 / 组队PK / 助教答疑 / 实战编程 / 项目答辩 / 社群讨论 / 趣味知识抢答等方式让学员爱上学习编程 , 最终实现能独立开发一个基于控制台的‘库存管理系统’ 的学习模式 极简JAVA学习营是怎么学习的? &nbsp; 如何报名? 只要购买了极简JAVA一:JAVA入门就算报名成功! &nbsp;本期为第四期极简JAVA学习营,我们来看看往期学员的学习状态: 作业看这里~ &nbsp; 助教的作业报告是不是很专业 不交作业打屁屁 助教答疑是不是很用心 &nbsp; 有奖抢答大家玩的很嗨啊 &nbsp; &nbsp; 项目答辩终于开始啦 &nbsp; 优秀者的获奖感言 &nbsp; 这是答辩项目的效果 &nbsp; &nbsp; 这么细致的服务,这么好的氛围,这样的学习效果,需要多少钱呢? 不要1999,不要199,不要99,只要9.9 是的你没听错,只要9.9以上所有就都属于你了 如果你: 1、&nbsp;想学JAVA没有基础 2、&nbsp;想学JAVA没有整块的时间 3、&nbsp;想学JAVA没有足够的预算 还等什么?赶紧报名吧,抓紧抢位,本期只招300人,错过只有等时间待定的下一期了 &nbsp; 报名请加小助手微信:eduxy-1 &nbsp; &nbsp;

Python可以这样学(第一季:Python内功修炼)

董付国系列教材《Python程序设计基础》、《Python程序设计(第2版)》、《Python可以这样学》配套视频,讲解Python 3.5.x和3.6.x语法、内置对象用法、选择与循环以及函数设计与使用、lambda表达式用法、字符串与正则表达式应用、面向对象编程、文本文件与二进制文件操作、目录操作与系统运维、异常处理结构。

Java基础知识面试题(2020最新版)

文章目录Java概述何为编程什么是Javajdk1.5之后的三大版本JVM、JRE和JDK的关系什么是跨平台性?原理是什么Java语言有哪些特点什么是字节码?采用字节码的最大好处是什么什么是Java程序的主类?应用程序和小程序的主类有何不同?Java应用程序与小程序之间有那些差别?Java和C++的区别Oracle JDK 和 OpenJDK 的对比基础语法数据类型Java有哪些数据类型switc...

机器学习实战系列套餐(必备基础+经典算法+案例实战)

机器学习实战系列套餐以实战为出发点,帮助同学们快速掌握机器学习领域必备经典算法原理并结合Python工具包进行实战应用。建议学习顺序:1.Python必备工具包:掌握实战工具 2.机器学习算法与实战应用:数学原理与应用方法都是必备技能 3.数据挖掘实战:通过真实数据集进行项目实战。按照下列课程顺序学习即可! 课程风格通俗易懂,用最接地气的方式带领大家轻松进军机器学习!提供所有课程代码,PPT与实战数据,有任何问题欢迎随时与我讨论。

Java面试题大全(2020版)

发现网上很多Java面试题都没有答案,所以花了很长时间搜集整理出来了这套Java面试题大全,希望对大家有帮助哈~ 本套Java面试题大全,全的不能再全,哈哈~ 一、Java 基础 1. JDK 和 JRE 有什么区别? JDK:Java Development Kit 的简称,java 开发工具包,提供了 java 的开发环境和运行环境。 JRE:Java Runtime Environ...

程序员垃圾简历长什么样?

已经连续五年参加大厂校招、社招的技术面试工作,简历看的不下于万份 这篇文章会用实例告诉你,什么是差的程序员简历! 疫情快要结束了,各个公司也都开始春招了,作为即将红遍大江南北的新晋UP主,那当然要为小伙伴们做点事(手动狗头)。 就在公众号里公开征简历,义务帮大家看,并一一点评。《启舰:春招在即,义务帮大家看看简历吧》 一石激起千层浪,三天收到两百多封简历。 花光了两个星期的所有空闲时...

深度学习原理+项目实战+算法详解+主流框架(套餐)

深度学习系列课程从深度学习基础知识点开始讲解一步步进入神经网络的世界再到卷积和递归神经网络,详解各大经典网络架构。实战部分选择当下最火爆深度学习框架PyTorch与Tensorflow/Keras,全程实战演示框架核心使用与建模方法。项目实战部分选择计算机视觉与自然语言处理领域经典项目,从零开始详解算法原理,debug模式逐行代码解读。适合准备就业和转行的同学们加入学习! 建议按照下列课程顺序来进行学习 (1)掌握深度学习必备经典网络架构 (2)深度框架实战方法 (3)计算机视觉与自然语言处理项目实战。(按照课程排列顺序即可)

HoloLens2开发入门教程

本课程为HoloLens2开发入门教程,讲解部署开发环境,安装VS2019,Unity版本,Windows SDK,创建Unity项目,讲解如何使用MRTK,编辑器模拟手势交互,打包VS工程并编译部署应用到HoloLens上等。

几率大的Redis面试题(含答案)

本文的面试题如下: Redis 持久化机制 缓存雪崩、缓存穿透、缓存预热、缓存更新、缓存降级等问题 热点数据和冷数据是什么 Memcache与Redis的区别都有哪些? 单线程的redis为什么这么快 redis的数据类型,以及每种数据类型的使用场景,Redis 内部结构 redis的过期策略以及内存淘汰机制【~】 Redis 为什么是单线程的,优点 如何解决redis的并发竞争key问题 Red...

MFC一站式终极全套课程包

该套餐共包含从C小白到C++到MFC的全部课程,整套学下来绝对成为一名C++大牛!!!

【数据结构与算法综合实验】欢乐连连看(C++ & MFC)案例

这是武汉理工大学计算机学院数据结构与算法综合实验课程的第三次项目:欢乐连连看(C++ & MFC)迭代开发代码。运行环境:VS2017。已经实现功能:开始游戏、消子、判断胜负、提示、重排、计时、帮助。

YOLOv3目标检测实战:训练自己的数据集

YOLOv3是一种基于深度学习的端到端实时目标检测方法,以速度快见长。本课程将手把手地教大家使用labelImg标注和使用YOLOv3训练自己的数据集。课程分为三个小项目:足球目标检测(单目标检测)、梅西目标检测(单目标检测)、足球和梅西同时目标检测(两目标检测)。 本课程的YOLOv3使用Darknet,在Ubuntu系统上做项目演示。包括:安装Darknet、给自己的数据集打标签、整理自己的数据集、修改配置文件、训练自己的数据集、测试训练出的网络模型、性能统计(mAP计算和画出PR曲线)和先验框聚类。 Darknet是使用C语言实现的轻型开源深度学习框架,依赖少,可移植性好,值得深入探究。 除本课程《YOLOv3目标检测实战:训练自己的数据集》外,本人推出了有关YOLOv3目标检测的系列课程,请持续关注该系列的其它课程视频,包括: 《YOLOv3目标检测实战:交通标志识别》 《YOLOv3目标检测:原理与源码解析》 《YOLOv3目标检测:网络模型改进方法》 敬请关注并选择学习!

u-boot-2015.07.tar.bz2

uboot-2015-07最新代码,喜欢的朋友请拿去

相关热词 c# 局部 截图 页面 c#实现简单的文件管理器 c# where c# 取文件夹路径 c# 对比 当天 c# fir 滤波器 c# 和站 队列 c# txt 去空格 c#移除其他类事件 c# 自动截屏
立即提问