浅析Java中并发工具类的使用

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

浅析Java中并发工具类的使用

初念初恋   2022-12-07 我要评论

在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类提供了在线程间交换数据的一种方法。

它们都在java.util.concurrent包下。先总体概括一下都有哪些工具类,它们有什么作用,然后再分别介绍它们的主要使用方法和原理。

作用
CountDownLatch线程等待直到计数器减为0时开始工作
CyclicBarrier作用跟CountDownLatch类似,但是可以重复使用
Semaphore限制线程的数量
Exchanger两个线程交换数据

下面分别介绍这几个类。

CountDownLatch

概述

CountDownLatch可以使一个或多个线程等待其他线程各自执行完毕后再执行。

CountDownLatch定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器

案例

玩吃鸡游戏的时候,正式开始游戏之前,肯定会加载一些前置场景,例如:“加载地图”、“加载人物模型”、“加载背景音乐”等。

public class CountDownLatchDemo {
    // 定义前置任务线程
    static class PreTaskThread implements Runnable {

        private String task;
        private CountDownLatch countDownLatch;

        public PreTaskThread(String task, CountDownLatch countDownLatch) {
            this.task = task;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                Random random = new Random();
                Thread.sleep(random.nextInt(1000));
                System.out.println(task + " - 任务完成");
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // 假设有三个模块需要加载
        CountDownLatch countDownLatch = new CountDownLatch(3);

        // 主任务
        new Thread(() -> {
            try {
                System.out.println("等待数据加载...");
                System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));
                countDownLatch.await();
                System.out.println("数据加载完成,正式开始游戏!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 前置任务
        new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
        new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();
        new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();
    }
}

输出:

等待数据加载...
还有3个前置任务
加载地图数据 - 任务完成
加载人物模型 - 任务完成
加载背景音乐 - 任务完成
数据加载完成,正式开始游戏!

原理

CountDownLatch的方法很简单,如下:

// 构造方法:
public CountDownLatch(int count)

public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count

CountDownLatch构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值

与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他N 个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。

源码分析

CountDownLatch有一个内部类叫做Sync,它继承了AbstractQueuedSynchronizer类,其中维护了一个整数state,并且保证了修改state的可见性和原子性,源码如下:

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

创建CountDownLatch实例时,也会创建一个Sync的实例,同时把计数器的值传给Sync实例,源码如下:

public CountDownLatch(int count) {
  if (count < 0) throw new IllegalArgumentException("count < 0");
  this.sync = new Sync(count);
}

countDown方法中,只调用了Sync实例的releaseShared方法,源码如下:

public void countDown() {
    sync.releaseShared(1);
}

其中的releaseShared方法,先对计数器进行减1操作,如果减1后的计数器为0,唤醒被await方法阻塞的所有线程,源码如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { //对计数器进行减一操作
        doReleaseShared();//如果计数器为0,唤醒被await方法阻塞的所有线程
        return true;
    }
    return false;
}

其中的tryReleaseShared方法,先获取当前计数器的值,如果计数器为0时,就直接返回;如果不为0时,使用CAS方法对计数器进行减1操作,源码如下:

protected boolean tryReleaseShared(int releases) {
    for (;;) {//死循环,如果CAS操作失败就会不断继续尝试。
        int c = getState();//获取当前计数器的值。
        if (c == 0)// 计数器为0时,就直接返回。
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))// 使用CAS方法对计数器进行减1操作
            return nextc == 0;//如果操作成功,返回计数器是否为0
    }
}

await方法中,只调用了Sync实例的acquireSharedInterruptibly方法,源码如下:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

其中acquireSharedInterruptibly方法,判断计数器是否为0,如果不为0则阻塞当前线程,源码如下:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//判断计数器是否为0
        doAcquireSharedInterruptibly(arg);//如果不为0则阻塞当前线程
}

其中tryAcquireShared方法,是AbstractQueuedSynchronizer中的一个模板方法,其具体实现在Sync类中,其主要是判断计数器是否为零,如果为零则返回1,如果不为零则返回-1,源码如下:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

CyclicBarrier

概述

CyclicBarrier 翻译为中文是循环(Cyclic)栅栏(Barrier)的意思,它的大概含义是实现一个可循环利用的屏障。

CyclicBarrier 作用是让一组线程相互等待,当达到一个共同点时,所有之前等待的线程再继续执行,且 CyclicBarrier 功能可重复使用,使用reset()方法重置屏障。

案例

同样用玩游戏的例子。如果玩一个游戏有多个“关卡”,那使用CountDownLatch显然不太合适,那需要为每个关卡都创建一个实例。那我们可以使用CyclicBarrier来实现每个关卡的数据加载等待功能。

public class CyclicBarrierDemo {
    static class PreTaskThread implements Runnable {

        private String task;
        private CyclicBarrier cyclicBarrier;

        public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
            this.task = task;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            // 假设总共三个关卡
            for (int i = 1; i < 4; i++) {
                try {
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("关卡%d的任务%s完成", i, task));
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("本关卡所有前置任务完成,开始游戏...");
        });

        new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
    }
}

输出:

关卡1的任务加载背景音乐完成
关卡1的任务加载地图数据完成
关卡1的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏...
关卡2的任务加载人物模型完成
关卡2的任务加载背景音乐完成
关卡2的任务加载地图数据完成
本关卡所有前置任务完成,开始游戏...
关卡3的任务加载背景音乐完成
关卡3的任务加载地图数据完成
关卡3的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏...

与CountDownLatch有一些不同。CyclicBarrier没有分为await()countDown(),而是只有单独的一个await()方法。

一旦调用await()方法的线程数量等于构造方法中传入的任务总量,就代表达到屏障了。CyclicBarrier允许我们在达到屏障的时候可以执行一个任务,可以在构造方法传入一个Runnable类型的对象。

源码分析

构造函数:

public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) throw new IllegalArgumentException();
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
  this(parties, null);
}

默认barrierAction是null,这个参数是Runnable参数,当最后线程达到的时候执行的任务,上述案例就是在达到屏障时,输出“本关卡所有前置任务完成,开始游戏...”。parties 是参与的线程数。

接着看下await方法,有两个重载,区别是是否有等待超时,源码如下:

 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

重点看下dowait(),核心逻辑就是这个方法,源码如下:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {       
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 每次使用屏障都会生成一个实例
            final Generation g = generation;

            // 如果被破坏了就抛异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 线程中断检测
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 剩余的等待线程数
            int index = --count;
            // 最后线程到达时 
            if (index == 0) {  // tripped
                // 标记任务是否被执行(就是传进入的runable参数)
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 执行任务
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 完成后 进行下一组 初始化 generation 初始化 count 并唤醒所有等待的线程 
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // index 不为0时 进入自旋
            for (;;) {
                try {
                    // 先判断超时 没超时就继续等着
                    if (!timed)
                        trip.await();
                        // 如果超出指定时间 调用 awaitNanos 超时了释放锁
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                        // 中断异常捕获
                } catch (InterruptedException ie) {
                    // 判断是否被破坏
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // 否则的话中断当前线程
                        Thread.currentThread().interrupt();
                    }
                }

                // 被破坏抛异常
                if (g.broken)
                    throw new BrokenBarrierException();

                // 正常调用 就返回 
                if (g != generation)
                    return index;

                // 超时了而被唤醒的情况 调用 breakBarrier()
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

总结下dowait()方法的逻辑:

  • 线程调用后,会检查barrier的状态、线程状态,异常状态会中断。
  • 在初始化CyclicBarrier时,设置的资源值count,会进行--count
  • 当10个线程中前9个线程,执行dowait()后,由于count!=0,因此会进行for(;;),在内部会执行Condition的trip.await()方法,进行阻塞。
  • 阻塞结束的条件有:超时、被唤醒、线程中断。
  • 当第10个线程执行dowait()后,由于count==0,会先检查并执行command的内容。
  • 最后执行nextGeneration(),在内部调用trip.signalAll()唤醒所有trip.await()的线程。

如果被破坏了怎么恢复呢?来看下reset()方法,源码如下:

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

源码很简单,break之后重新生成新的实例,对应的会重新初始化count,在dowaitindex==0也调用了nextGeneration,所以说它是可以循环利用的。

与CountDonwLatch的区别

CountDownLatch减计数,CyclicBarrier加计数。

CountDownLatch是一次性的,CyclicBarrier可以重用。

CountDownLatch和CyclicBarrier都有让多个线程等待同步然后再开始下一步动作的意思,但是CountDownLatch的下一步的动作实施者是主线程,具有不可重复性;而CyclicBarrier的下一步动作实施者还是“其他线程”本身,具有往复多次实施动作的特点。

Semaphore

概述

Semaphore 一般译作 信号量,它也是一种线程同步工具,主要用于多个线程对共享资源进行并行操作的一种工具类。它代表了一种许可的概念,是否允许多线程对同一资源进行操作的许可,使用 Semaphore 可以控制并发访问资源的线程个数。

使用场景

Semaphore 的使用场景主要用于流量控制

比如数据库连接,同时使用的数据库连接会有数量限制,数据库连接不能超过一定的数量,当连接到达了限制数量后,后面的线程只能排队等前面的线程释放数据库连接后才能获得数据库连接。

比如停车场的场景中,一个停车场有有限数量的车位,同时能够容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。

案例

模拟一下停车场的业务场景:

在进入停车场之前会有一个提示牌,上面显示着停车位还有多少,当车位为 0 时,不能进入停车场,当车位不为 0 时,才会允许车辆进入停车场。所以停车场有几个关键因素:停车场车位的总容量,当一辆车进入时,停车场车位的总容量 - 1,当一辆车离开时,总容量 + 1,停车场车位不足时,车辆只能在停车场外等待。

public class SemaphoreDemo {

    private static Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("欢迎 " + Thread.currentThread().getName() + " 来到停车场");
                    // 判断是否允许停车
                    if (semaphore.availablePermits() == 0) {
                        System.out.println("车位不足,请耐心等待");
                    }
                    try {
                        // 尝试获取
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName() + " 进入停车场");
                        Thread.sleep(new Random().nextInt(10000));// 模拟车辆在停车场停留的时间
                        System.out.println(Thread.currentThread().getName() + " 驶出停车场");
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, i + "号车");
            thread.start();
        }
    }
}

Semaphore 的初始容量,也就是只有 10 个车位,我们用这 10 个车位来控制 100 辆车的流量,所以结果和我们预想的很相似,即大部分车都在等待状态。但是同时仍允许一些车驶入停车场,驶入停车场的车辆,就会 semaphore.acquire 占用一个车位,驶出停车场时,就会 semaphore.release 让出一个车位,让后面的车再次驶入。

原理

Semaphore内部有一个继承了AQS的同步器Sync,重写了tryAcquireShared方法。在这个方法里,会去尝试获取资源。

如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入AQS的等待队列。

Exchanger

概述

Exchanger类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据。一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。

案例

案例1:A同学和B同学交换各自收藏的大片。

public class ExchangerDemo {

    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> stringExchanger = new Exchanger<>();

        Thread studentA = new Thread(() -> {
            try {
                String dataA = "A同学收藏多年的大片";
                String dataB = stringExchanger.exchange(dataA);
                System.out.println("A同学得到了" + dataB);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println("这个时候A同学是阻塞的,在等待B同学的大片");
        Thread.sleep(1000);

        Thread studentB = new Thread(() -> {
            try {
                String dataB = "B同学收藏多年的大片";
                String dataA = stringExchanger.exchange(dataB);
                System.out.println("B同学得到了" + dataA);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        studentA.start();
        studentB.start();
    }
}

输出:

这个时候A同学是阻塞的,在等待B同学的大片
A同学得到了B同学收藏多年的大片
B同学得到了A同学收藏多年的大片

可以看到,当一个线程调用exchange方法后,它是处于阻塞状态的,只有当另一个线程也调用了exchange方法,它才会继续向下执行。

Exchanger类还有一个有超时参数的方法,如果在指定时间内没有另一个线程调用exchange,就会抛出一个超时异常。

public V exchange(V x, long timeout, TimeUnit unit)

案例2:A同学被放鸽子,交易失败。

public class ExchangerDemo {

    public static void main(String[] args) {
        Exchanger<String> stringExchanger = new Exchanger<>();
        Thread studentA = new Thread(() -> {
            String dataB = null;
            try {
                String dataA = "A同学收藏多年的大片";
                dataB = stringExchanger.exchange(dataA,5, TimeUnit.SECONDS);
                System.out.println("A同学得到了" + dataB);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                System.out.println("等待超时-TimeoutException");
            }
            System.out.println("A同学得到了:"+dataB);
        });

        studentA.start();
    }
}

输出:

等待超时-TimeoutException
A同学得到了:null

原理

Exchanger类底层关键的技术有:

  • 使用CAS自旋指令完成数据交换;
  • 使用LockSupport的park方法使交换线程进入休眠等待,使用LockSupport的unpark方法唤醒等待线程。
  • 此外还声明了一个Node对象用于存储交换数据。

Exchanger一般用于两个线程之间更方便地在内存中交换数据,因为其支持泛型,所以我们可以传输任何的数据,比如IO流或者IO缓存。根据JDK里面的注释的说法,可以总结为一下特性:

  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于遗传算法、流水线设计等场景。

需要注意的是,exchange是可以重复使用的。也就是说,两个线程可以使用Exchanger在内存中不断地再交换数据。

小结

本文配合一些应用场景介绍了JDK中提供的几个并发工具类,简单分析了一下使用原理及业务场景,工作中,一旦有对应的业务场景,可以试试这些工具类。

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们