配置:
Properties props = new Properties();
//broker地址
props.put("bootstrap.servers", "39.108.61.252:9092,39.108.61.252:9093,39.108.61.252:9094");
//请求时候需要验证
props.put("acks", "0");
//请求失败时候需要重试
props.put("retries", 1);
//生产者就会尝试将记录组合成一个batch的请求。 这有助于客户端和服务器的性能。不能大于此默认值,否则浪费内存,反而降低吞吐量
//props.put("batch.size", 16384);
//汇聚一定时间内的记录一起发出
// props.put("linger.ms", 50);
//内存缓存区大小
props.put("buffer.memory", 33554432);
//指定消息key序列化方式
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//指定消息本身的序列化方式
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
生产数据
没有Thread.sleep()就不能成功发送数据!,有了就可已在消费者端接受到数据。若把Thread.sleep()删除,在生产末尾加上close()方法也能成功生产
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
kafkaProducer.send(new ProducerRecord<>("topic_user_general_info_update", "simpleKey", "value-"+i));
}
困扰很久,不知道配置还是哪里问题。