meilidedabobo
夏未央-夜未眠
采纳率0%
2016-11-30 03:05 阅读 3.3k

Java多线程并发如何模拟

一个Java Application运行后,在系统中是作为一 个线程吗?运行main方法以后,为什么不能实现多个线程启动?我想用java程序模拟多个消费者和kafka生产者通信,可以实现吗
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**

  • Created by Administrator on 2016/10/17.
    */
    public class consumerTest extends Thread{
    public synchronized void run(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    //消费者组group
    props.put("group.id", "test");
    //是否自动确认offset
    props.put("enable.auto.commit", "true");
    //自动确认offset的时间间隔
    props.put("auto.commit.interval.ms", "1000");
    //会话超时时间
    props.put("session.timeout.ms", "30000");
    //key的序列化类
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    //value的序列化类
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer consumer = new KafkaConsumer(props);
    //消费者订阅的topic, 可同时订阅多个
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
    //读取数据,读取超时时间为100ms
    ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records)
    System.out.println(String.format("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()));
    }
    }

    public static void main(String[] args) {
    consumerTest test1=new consumerTest();
    Thread threads[] = new Thread[5];
    for (int i = 0; i < threads.length; i++)
    threads[i] = new Thread(test1);
    for (int i = 0; i < threads.length; i++)
    {
    threads[i].start();
    }
    }
    }

  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享

2条回答 默认 最新

  • wojiushiwo945you 毕小宝 2016-11-30 03:57
     你去搜索demo运行,看看结果,然后单步跟踪下执行过程就明白了。生产者消费者模式最重要的就是共享同一个资源队列,一个存一个取,这个过程中做好同步处理。
    
    点赞 评论 复制链接分享
  • linxingall linxingall 2016-11-30 05:56

    java有现成的线程池
    ExecutorService executor = Executors.newFixedThreadPool(poolSize);

    点赞 评论 复制链接分享

相关推荐