博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
14、详解java同步工具类CountDownLatch
阅读量:4035 次
发布时间:2019-05-24

本文共 8034 字,大约阅读时间需要 26 分钟。

这篇文章主要讲解java中一个比较常用的同步工具类CountDownLatch,不管是在工作还是面试中都比较常见。我们将通过案例来进行讲解分析。

一、定义

CountDownLatch的作用很简单,就是一个或者一组线程在开始执行操作之前,必须要等到其他线程执行完才可以。我们举一个例子来说明,在考试的时候,老师必须要等到所有人交了试卷才可以走。此时老师就相当于等待线程,而学生就好比是执行的线程。

注意:java中还有一个同步工具类叫做CyclicBarrier,他的作用和CountDownLatch类似。同样是等待其他线程都完成了,才可以进行下一步操作,我们再举一个例子,在打王者的时候,在开局前所有人都必须要加载到100%才可以进入。否则所有玩家都相互等待。

我们看一下区别:

**CountDownLatch: 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。 **

CyclicBarrier : N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。

关键点其实就在于那N个线程

(1)CountDownLatch里面N个线程就是学生,学生做完了试卷就可以走了,不用等待其他的学生是否完成

(2)CyclicBarrier 里面N个线程就是所有的游戏玩家,一个游戏玩家加载到100%还不可以,必须要等到其他的游戏玩家都加载到100%才可以开局

现在应该理解CountDownLatch的含义了吧,下面我们使用一个代码案例来解释。

二、使用

我们使用学生考试的案例来进行演示:

public class CountDownLatchTest {
static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) {
System.out.println("全班同学开始考试:一共两个学生"); new Thread(() -> {
System.out.println("第一个学生交卷,countDownLatch减1"); countDownLatch.countDown(); }).start(); new Thread(() -> {
System.out.println("第二个学生交卷,countDownLatch减1"); countDownLatch.countDown(); }).start(); try {
countDownLatch.await(); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("老师清点试卷,在此之前,只要一个学生没交," + "countDownLatch不为0,不能离开考场"); }}

在上面,我们定义了一个CountDownLatch,并设置其值为2。有两个学生使用两个线程来表示,然后依次执行。最后老师线程(main线程)在学生线程都执行完了才可以执行。我们来运行一边看看结果。

在这里插入图片描述

现在我们应该能体会到其用法了吧。在上面我们的等待线程时老师(main线程)。

下面我们对这个countDownLatch分析一下。为什么具有上面的特点。

三、原理

在上面我们看到,CountDownLatch主要使用countDown方法进行减1操作,使用await方法进行等到操作。我们进入到源码中看看。本源码基于jdk1.8。特在此说明。

1、countDown原理

/**     * Decrements the count of the latch, releasing all waiting threads if     * the count reaches zero.     *     * 

If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * *

If the current count equals zero then nothing happens. */ public void countDown() {

sync.releaseShared(1); }

英语不好的人看起来真的是一脸懵逼,不过信号上面的英语还都是简单的英语,大致意思是这样的:CountDownLatch里面保存了一个count值,通过减1操作,直到为0时候,等待线程才可以执行。而且通过源码也可以看到这个countDown方法其实是通过sync调用releaseShared(1)来完成的。

OK。到了这一步我们可能会纳闷,sync是个什么鬼,releaseShared方法又是如何实现的。我们不妨接着看源码,在CountDownLatch的开头我们找到了答案,原来这个sync在这里定义了。

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L; Sync(int count) {
setState(count); } int getCount() {
return getState(); } protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero for (;;) {
int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }

在这里我们发现继承了AbstractQueuedSynchronizer(AQS)。AQS的其中一个作用就是维护线程状态和获取释放锁。在这里也就是说CountDownLatch使用AQS机制维护锁状态。而releaseShared(1)方法就是释放了一个共享锁。

现在理解了吧,底层使用AQS机制调用releaseShared方法释放一个锁资源。那么等待的方法是如何实现的呢?

2、await原理

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }

这俩方法都是让线程等待,第一个没有实现限制,第二个有时间限制,我们一个一个来看。

(1)await()

await()底层主要是acquireSharedInterruptibly方法实现的,继续跟进去看看。

public final void acquireSharedInterruptibly(int arg)            throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }

这里面有两个if语句,首先第一个判断是否被中断,如果被中断了,那就抛出中断异常。然后判断当前是否还有线程未执行,如果有那就,那就执行doAcquireSharedInterruptibly方法继续等待。

//这是AQS里面的方法//arg在这里调用的是1,表示countDown是否减少到了0//如果到0了,那说明满足了要求,返回1,不再等待//如果没有达到0,说明还有线程未执行,必须要等到所有的线程//执行结束才可以,返回-1,此时小于0,执行doAcquireSharedInterruptiblyprotected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();}

上面函数的意思已经在注释里面了,下面我们就来看看这个doAcquireSharedInterruptibly是如何实现的。

private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {
final Node node = addWaiter(Node.SHARED); boolean failed = true; try {
for (;;) {
final Node p = node.predecessor(); if (p == head) {
int r = tryAcquireShared(arg); if (r >= 0) {
setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally {
if (failed) cancelAcquire(node); } }

这块的代码比较长,不过大致意思我可以描述一下,他会用一个一个的节点将线程串起来 等达到条件后再一个一个的唤醒。核心就是第三行的addWaiter函数。我们可以再跟进去看看吧。

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {
node.prev = pred; if (compareAndSetTail(pred, node)) {
pred.next = node; return node; } } enq(node); return node; }

你会发现这里面也使用了CAS机制。而且就是使用链表穿起来的。

(2) await(long timeout, TimeUnit unit)

这个方法的意思是等待指定的时间,如果还有线程没执行完,那就接着执行。就好比考完试了,还有同学没交试卷,此时因为到时间了。不管三七二十一也不管剩下的同学是否提交,直接就走了。其底层是通过Sync的tryAcquireSharedNanos方法实现的,我们接着进入到源码中看看。

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)            throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }

在这里皮球又一次被踢走了,真正实现的其实就是doAcquireSharedNanos方法,tryAcquireShared方法主要是判断是否当前满足wait的条件。我们接着看。

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)            throws InterruptedException {
if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try {
for (;;) {
final Node p = node.predecessor(); if (p == head) {
int r = tryAcquireShared(arg); if (r >= 0) {
setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally {
if (failed) cancelAcquire(node); } }

上面的代码看似长,最核心的就是for循环里面的,最主要的意思就是如果当前还有线程未执行而且过了超时时间,那就直接执行等待线程就好了,不再等了。也就是我在指定的时间内你没执行完我等着你,要是超了这个时间点我就不管了。

对于CountDownLatch来说原理主要还是通过源码来认识。不过CountDownLatch看起来虽然很好用,也有很多不足之处,比如说CountDownLatch是一次性的 , 计数器的值只能在构造方法中初始化一次 , 之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后 , 它不能再次被使用。

心的就是for循环里面的,最主要的意思就是如果当前还有线程未执行而且过了超时时间,那就直接执行等待线程就好了,不再等了。也就是我在指定的时间内你没执行完我等着你,要是超了这个时间点我就不管了。

对于CountDownLatch来说原理主要还是通过源码来认识。不过CountDownLatch看起来虽然很好用,也有很多不足之处,比如说CountDownLatch是一次性的 , 计数器的值只能在构造方法中初始化一次 , 之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后 , 它不能再次被使用。

OK。对其介绍就先到这里吧。

在这里插入图片描述

转载地址:http://cibdi.baihongyu.com/

你可能感兴趣的文章
YUV420只绘制Y通道
查看>>
yuv420 还原为RGB图像
查看>>
LED恒流驱动芯片
查看>>
驱动TFT要SDRAM做为显示缓存
查看>>
使用file查看可执行文件的平台性,x86 or arm ?
查看>>
qt5 everywhere 编译summary
查看>>
qt5 everywhere编译完成后,找不到qmake
查看>>
arm-linux开机读取硬件时钟,设置系统时钟。
查看>>
交叉编译在x86上调试好的qt程序
查看>>
qt 创建异形窗体
查看>>
可重入函数与不可重入函数
查看>>
简单Linux C线程池
查看>>
内存池
查看>>
输入设备节点自动生成
查看>>
opencv test code-1
查看>>
eclipse 导入先前存在的项目
查看>>
GNU hello代码分析
查看>>
Qt继电器控制板代码
查看>>
wpa_supplicant控制脚本
查看>>
rfkill: WLAN hard blocked
查看>>