tenc1239 2023-04-21 21:55 采纳率: 76%
浏览 22

disruptor 中多消费者 ,bufferSize 和 newFixedThreadPool数量对消费的影响为什么是这样

disruptor 中多消费者 ,bufferSize 和 newFixedThreadPool数量对消费的影响为什么是这样
bufferSize 设为 1024 ,消费者设置为10个, 线程池设置为5 时,运行一万次任务,打印出来只有1024 个
但是 线程池改为10 个就能打印出来 10000个
或者 bufferSize 设为 10000 就能打印出来 10000个
猜测 bufferSize 设为 2048 ,消费者设置为10个, 线程池设置为5 时 ,打印出来只有2048 个
到底为什么, disruptor 中 bufferSize 和线程数量 线程池数量的关系是什么?


public class Main { // 01:46:38 // https://www.bilibili.com/video/BV1xd4y1g7iU/?spm_id_from=333.1073.top_right_bar_window_history.content.click&vd_source=ff8b7f852278821525f11666b36f180a
    public static void main(String[] args) throws InterruptedException {
        RingBuffer<Order> ringBuffer = RingBuffer.create(
                ProducerType.MULTI,
                new EventFactory<Order>() {
                    @Override
                    public Order newInstance() {
                        return new Order();
                    }
                },
                1024,
                new BlockingWaitStrategy()
        );

        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        Consumer[] consumers = new Consumer[10];
        for(int i = 0 ; i < consumers.length; i ++){

            consumers[i] = new Consumer("C"+i); // 实例化每个消费者 并赋ID
        }

        WorkerPool<Order> orderWorkerPool = new WorkerPool<>(
                ringBuffer,
                sequenceBarrier,
                new EventExceptionHandler(),
                consumers
        );
        ringBuffer.addGatingSequences(orderWorkerPool.getWorkerSequences());

        orderWorkerPool.start(Executors.newFixedThreadPool(10)); // 启动线程池
        CyclicBarrier latch = new CyclicBarrier(100); // 控制并发

        for (int i =0 ; i < 100; i ++){
            Producer producer = new Producer(ringBuffer);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        latch.await();

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    for (int j = 0; j < 100; j++) { // 并发
                        producer.sendData(UUID.randomUUID().toString());
                    }
                }
            }
            ).start();
        }
        System.out.println("---------------线程创建完毕 , 开始生产数据---------------------");
        Thread.sleep(10000);
        System.out.println("任务总是" + consumers[2].getCount());

    }
}

public class Consumer implements WorkHandler<Order> {
    private  String cosumerId;
    private static AtomicInteger count = new AtomicInteger(0);
    public  Consumer(String cosumerId) {
        this.cosumerId = cosumerId;
    }

    public int getCount() {
        return count.get();
    }

    @Override
    public void onEvent(Order order) throws Exception {
        System.out.println("当前消费者:" + this.cosumerId + "消息id: " + order.getId());
        count.incrementAndGet();
    }
}

```java
public class Producer {
    private RingBuffer<Order> ringBuffer;
    public  Producer(RingBuffer<Order> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public  void  sendData(String uuid) {
        long sequence = ringBuffer.next();
        try {
            Order order = ringBuffer.get(sequence);
            order.setId(uuid);

        }finally {
            ringBuffer.publish(sequence);
        }
    }
}


```java
@Data
public class Order {
    private String id;
    private String name;
    private  double price;
}


public class EventExceptionHandler implements ExceptionHandler<Order> {
    @Override
    public void handleEventException(Throwable ex, long sequence, Order order) {
        System.out.println("异常sequence:" + sequence + "order:" + order);
    }

    @Override
    public void handleOnStartException(Throwable ex) {

    }

    @Override
    public void handleOnShutdownException(Throwable ex) {

    }
}
  • 写回答

1条回答 默认 最新

  • 创意程序员 2023-04-21 23:00
    关注

    disruptor使用ringBuffer环形队列,bufferSize存放就是的消息数量,线程池大小就是能消费消息的能力,队列满了再发送消息要么覆盖要么丢弃,所以打印少。线程池大能及时消费更多消息,或者队列长度大能存下更多消息以慢慢消费,所以打印多。

    评论

报告相同问题?

问题事件

  • 修改了问题 4月21日
  • 创建了问题 4月21日

悬赏问题

  • ¥15 无法输出helloworld
  • ¥15 高通uboot 打印ubi init err 22
  • ¥20 PDF元数据中的XMP媒体管理属性
  • ¥15 R语言中lasso回归报错
  • ¥15 网站突然不能访问了,上午还好好的
  • ¥15 有没有dl可以帮弄”我去图书馆”秒选道具和积分
  • ¥15 semrush,SEO,内嵌网站,api
  • ¥15 Stata:为什么reghdfe后的因变量没有被发现识别啊
  • ¥15 振荡电路,ADS仿真
  • ¥15 关于#c语言#的问题,请各位专家解答!