国服冰 2021-10-19 13:54 采纳率: 0%
浏览 34

【Java并发编程系列】高并发工具类之线程协作工具类

一、CountDownLatch

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。

CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

CountDownLatch原理


现在有这样一个运动会场景,总共有5名运动员,1名裁判,比赛开始前,所有运动员需等待裁判打出发令枪,然后开始跑,等所有运动员跑完后,裁判需要号召全体运动员集合

public class SportsMeeting{
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(5);

        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("运动员" + no + "准备完毕");
                        begin.await();
                        System.out.println("运动员" + no + "正在跑!");
                        Thread.sleep(new Random().nextInt(2000));
                        System.out.println("运动员" + no + "跑到终点!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        end.countDown();
                    }
                }

            };
            poolExecutor.execute(runnable);
        }

        /* 裁判准备发令枪 */
        Thread.sleep(5000);
        System.out.println("---------发令枪响,比赛开始!---------");
        begin.countDown();
        end.await();
        System.out.println("---------比赛结束,所有参赛人员请集合!---------");

        poolExecutor.shutdown();
    }
}
运动员5准备完毕
运动员4准备完毕
运动员1准备完毕
运动员2准备完毕
运动员3准备完毕
---------发令枪响,比赛开始!---------
运动员4正在跑!
运动员2正在跑!
运动员1正在跑!
运动员5正在跑!
运动员3正在跑!
运动员5跑到终点!
运动员1跑到终点!
运动员2跑到终点!
运动员4跑到终点!
运动员3跑到终点!
---------比赛结束,所有参赛人员请集合!---------

ps:计数器必须大于0,计数器等于0的时候,调用await方法时就不会阻塞当前线程。CountDownLatch不能重新初始化或者修改CountDownLatch内部计数器的值

二、CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情就是让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行

CyclicBarrier有两组构造函数

public CyclicBarrier(int parties) {
        this(parties, null);
    }
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

第二组构造函数的不同之处就是当所有线程到达屏障时,优先执行barrierAction线程,方便处理更复杂的业务场景

前面介绍了CountDownLatch,这两者的主要区别是:

  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
  • CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束,主要用于事件。CyclicBarrier
    参主要用于线程。

现在有这样一个场景,有一组学生总共15个人需要乘车前往一个旅游景点,但是一辆车一次性只能乘坐5名同学,所以只能分批次乘车,而且每一批必须等所有人到齐了才能发车

public class Ride{
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("---------这波人都到了,乘车出发!---------");
            }
        });

        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,10,60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 15; i++) {
            final int no = i+1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("同学"+no+"正在前往指定地点");
                        Thread.sleep(new Random().nextInt(4000));
                        System.out.println("同学"+no+"到指定地点,正在等待其他人");
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            };
            poolExecutor.execute(runnable);
        }
        poolExecutor.shutdown();
    }
}
同学3正在前往指定地点
同学1正在前往指定地点
同学5正在前往指定地点
同学4正在前往指定地点
同学2正在前往指定地点
同学5到指定地点,正在等待其他人
同学3到指定地点,正在等待其他人
同学1到指定地点,正在等待其他人
同学4到指定地点,正在等待其他人
同学2到指定地点,正在等待其他人
---------这波人都到了,乘车出发!---------
同学6正在前往指定地点
同学7正在前往指定地点
同学10正在前往指定地点
同学9正在前往指定地点
同学8正在前往指定地点
同学8到指定地点,正在等待其他人
同学10到指定地点,正在等待其他人
同学6到指定地点,正在等待其他人
同学9到指定地点,正在等待其他人
同学7到指定地点,正在等待其他人
---------这波人都到了,乘车出发!---------
同学11正在前往指定地点
同学12正在前往指定地点
同学14正在前往指定地点
同学13正在前往指定地点
同学15正在前往指定地点
同学11到指定地点,正在等待其他人
同学13到指定地点,正在等待其他人
同学14到指定地点,正在等待其他人
同学12到指定地点,正在等待其他人
同学15到指定地点,正在等待其他人
---------这波人都到了,乘车出发!---------

三、Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,通过协调各个线程,给各个线程发放“通行证”,以保证合理地使用公共资源

Semaphore的常见使用场景:
Semaphore 的使用场景主要用于流量控制,比如数据库连接,同时使用的数据库连接会有数量限制,数据库连接不能超过一定的数量,当连接到达了限制数量后,后面的线程只能排队等前面的线程释放数据库连接后才能获得数据库连接。

再比如交通公路上的红绿灯,绿灯亮起时只能让 100 辆车通过,红灯亮起不允许车辆通过。

再比如停车场的场景中,一个停车场有有限数量的车位,同时能够容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。

Semaphore原理图:

Semaphore原理图


来解释一下 Semaphore ,Semaphore 有一个初始容量,这个初始容量就是 Semaphore 所能够允许的信号量。在调用 Semaphore 中的 acquire 方法后,Semaphore 的容量 -1,相对的在调用 release 方法后,Semaphore 的容量 + 1,在这个过程中,计数器一直在监控 Semaphore 数量的变化,等到流量超过 Semaphore 的容量后,多余的流量就会放入等待队列中进行排队等待。等到 Semaphore 的容量允许后,方可重新进入。

Semaphore 所控制的流量其实就是一个个的线程,因为并发工具最主要的研究对象就是线程。

现在有这样一个场景,需要读取几万个文件的数据,并且将文件数据写入到数据库中,但是数据库的连接数只有10个,所以这时必须控制只有10个线程拿到数据库连接资源去操作数据库

public class SavaMassiveData{
    private static final int ThreadCount = 30;
    private static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 12, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    private static Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < ThreadCount; i++) {
            poolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println("save data");
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        poolExecutor.shutdown();
    }
}

其他 Semaphore 方法

  • drainPermits : 获取并退还所有立即可用的许可,其实相当于使用 CAS 方法把内存值置为 0
  • reducePermits:和 nonfairTryAcquireShared 方法类似,只不过nonfairTryAcquireShared 是使用 CAS 使内存值 + 1,而 reducePermits 是使内存值 - 1 。
  • isFair:对 Semaphore 许可的争夺是采用公平还是非公平的方式,对应到内部的实现就是 FairSync 和NonfairSync
  • hasQueuedThreads:当前是否有线程由于要获取 Semaphore 许可而进入阻塞。
  • getQueuedThreads:返回一个包含了等待获取许可的线程集合。
  • getQueueLength:获取正在排队而进入阻塞状态的线程个数
  • 写回答

1条回答 默认 最新

  • 程序猿_水木 2022-08-29 09:27
    关注

    谢谢分享。

    评论

报告相同问题?

问题事件

  • 提问应符合社区要求 6月12日
  • 创建了问题 10月19日

悬赏问题

  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 对于相关问题的求解与代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 信号傅里叶变换在matlab上遇到的小问题请求帮助
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作
  • ¥15 求NPF226060磁芯的详细资料