Java并发编程--并发工具的使用和原理

时间:2023-11-21 10:34:53  热度:0°C

Condition

我们在使用synchronized的时候,经常会用到wait/notify来实现线程间的通信,在J/U/C中也提供了锁的实现机制,那在J/U/C中是否也提供了类似的线程通信的工具呢?Condition就是J/U/C提供的一个多线程协调通信的工具类,可以让某些线程一起等待某个条件,只有满足条件时,才会被唤醒。

Condition的基本使用我们先写两个class,ConditionDemoWait 和ConditionDemoNotify/

public class ConditionDemoWait implements Runnable { private Lock lock/ private Condition condition/ public ConditionDemoWait(Lock lock/ Condition condition) { this/lock = lock/ this/condition = condition/ } @Override public void run() { System/out/println( begin ConditionDemoWait )/ try { lock/lock()/ condition/await()/ System/out/println( end ConditionDemoWait )/ } catch (Exception e) { e/printStackTrace()/ } finally { lock/unlock()/ } }}
public class ConditionDemoNotify implements Runnable { private Lock lock/ private Condition condition/ public ConditionDemoNotify(Lock lock/ Condition condition) { this/lock = lock/ this/condition = condition/ } @Override public void run() { System/out/println( begin ConditionDemoNotify )/ try { lock/lock()/ condition/signal()/ System/out/println( end ConditionDemoNotify )/ } catch (Exception e) { e/printStackTrace()/ } finally { lock/unlock()/ } }}

测试代码:

public class ConditionDemo { public static void main(String[] args) throws InterruptedException { Lock lock = new ReentrantLock()/ Condition condition = lock/newCondition()/ new Thread(new ConditionDemoWait(lock/ condition))/start()/ TimeUnit/SECONDS/sleep(2)/ new Thread(new ConditionDemoNotify(lock/ condition))/start()/ }}

运行结果:

通过上面这个例子简单实现了wait和notify的功能,当调用await方法后,当前线程会释放锁并等待,而其他线程调用了condition的signal或者signalAll方法通知被阻塞的线程,然后自己执行unlock释放锁,被唤醒的线程获得之前释放的锁后继续执行,最后释放锁。所以Condition中两个最重要的方法:await和signal/signalAll:await:把当前线程阻塞挂起signal/signalAll:唤醒阻塞的线程

Condition源码分析调用Condition方法的时候需要获得锁,所以意味着会存在一个AQS同步队列,上面的例子中,如果两个线程同时运行的话,AQS队列可能是下面这种情况:

这个时候,如果Thread A调用了condition/await()方法,它做了什么事情呢?

调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变成等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。

public final void await() throws InterruptedException { if (Thread/interrupted()) // 表示await允许被中断 throw new InterruptedException()/ Node node = addConditionWaiter()/ // 创建一个新节点,节点状态为Condition int savedState = fullyRelease(node)/ // 释放当前的锁,得到锁的状态 int interruptMode = 0/ while (!isOnSyncQueue(node)) { LockSupport/park(this)/ if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break/ } if (acquireQueued(node/ savedState) &/&/ interruptMode != THROW_IE) interruptMode = REINTERRUPT/ if (node/nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters()/ if (interruptMode != 0) reportInterruptAfterWait(interruptMode)/}

addConditionWaiter方法这个方法的主要作用是把当前线程封装成一个Node,添加到等待队列中去;这里的等待队列不再是双向链表,而是单项链表

private Node addConditionWaiter() { Node t = lastWaiter/ // If lastWaiter is cancelled/ clean out/ if (t != null &/&/ t/waitStatus != Node/CONDITION) { unlinkCancelledWaiters()/ t = lastWaiter/ } Node node = new Node(Thread/currentThread()/ Node/CONDITION)/ if (t == null) firstWaiter = node/ else t/nextWaiter = node/ lastWaiter = node/ return node/}

执行完addConditionWaiter方法后,就会产生下面这样一个等待队列:

接下来会执行fullyRelease方法,为什么叫fullyRelease呢?就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。

final int fullyRelease(Node node) { boolean failed = true/ try { int savedState = getState()/ // 获得重入锁的次数 if (release(savedState)) { // 释放锁并唤醒下一个同步队列中的线程 failed = false/ return savedState/ } else { throw new IllegalMonitorStateException()/ } } finally { if (failed) node/waitStatus = Node/CANCELLED/ }}

此时,同步队列会触发锁的竞争,Thread B获得锁。

接下来会通过isOnSyncQueue(node)方法判断Thread A是否在同步队列中,返回false表示不在;返回true表示在同步队列中。如果不在AQS同步队列中,说明当前节点没有被唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其它线程调用signal唤醒;如果在AQS同步队列中,意味着它需要去竞争同步锁去获得程序的执行权限。

为什么要做这个判断呢?因为在Condition等待队列中的节点会重新加入到AQS同步队列中去竞争锁,也就是当调用signal的时候,会把当前在等待队列中的节点从Condition等待队列转移到同步队列。基于上面例子现在的逻辑结构,如何判断Thread A这个节点是否存在于AQS队列中呢?

  • 如果Thread A的waitStatus是CONDITION,说明它一定在Condition队列,不在AQS队列中;因为AQS队列中节点的状态一定不能是CONDITION

  • 如果node/prev为空,说明不存在于AQS队列中,因为prev=null在AQS队列中只有一种可能,就是它是head节点,但是head节点意味着它是获得锁的节点

  • 如果node/next不等于空,说明一定在AQS队列中,因为只有AQS队列才会存在prev和next的关系

  • findNodeFromTail表示从tail节点往前扫描AQS队列,一旦发现AQS中的节点和当前节点相等,说明节点一定存在于AQS队列中

    final boolean isOnSyncQueue(Node node) { if (node/waitStatus == Node/CONDITION || node/prev == null) return false/ if (node/next != null) // If has successor/ it must be on queue return true/ return findNodeFromTail(node)/}private boolean findNodeFromTail(Node node) { Node t = tail/ for (//) { if (t == node) return true/ if (t == null) return false/ t = t/prev/ }}

    因为Thread A不在AQS队列中,所以isOnSyncQueue()方法返回false,接下来会执行LockSupport/park(this)将Thread A挂起并释放锁。

    Condition/signal当Thread A释放锁之后,Thread B会获得锁;在上面的例子中,thread B会执行condition/signal()方法去唤醒在等待队列中的节点:

    public final void signal() { if (!isHeldExclusively()) // 判断当前线程是否获得了锁,如果没有则抛exception throw new IllegalMonitorStateException()/ Node first = firstWaiter/ // 拿到Condition队列中的第一个节点 if (first != null) doSignal(first)/}protected final boolean isHeldExclusively() { // While we must in general read state before owner/ // we don t need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread/currentThread()/}private void doSignal(Node first) { do { // 从Condition队列中删除第一个节点 if ( (firstWaiter = first/nextWaiter) == null) lastWaiter = null/ first/nextWaiter = null/ } while (!transferForSignal(first) &/&/ (first = firstWaiter) != null)/}final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node/ Node/CONDITION/ 0)) // 通过cas更新节点状态为0, 如果更新失败,只有一种可能就是节点被Cancel了 return false/ Node p = enq(node)/ // 调用enq方法,把当前节点加入到AQS同步队列,并返回当前节点的上一个节点;在本例子中也就是原tail节点 int ws = p/waitStatus/ if (ws >/ 0 || !compareAndSetWaitStatus(p/ ws/ Node/SIGNAL)) // 如果上一个节点的状态被取消了,或者尝试设置上一个节点的状态为SIGNAL失败,则唤醒节点上的线程 LockSupport/unpark(node/thread)/ return true/}

    由上面代码可以看出,当Thread B调用了signal方法的时候,会获取等待队列中的第一个节点,并且调用doSignal方法;在doSignal方法中会把第一个节点从等待队列中删除,并通过transferForSignal方法将该节点转移到AQS同步队列中。

    执行完doSignal方法后,会把Condition队列中的节点转移到AQS队列中去,逻辑结构图如下;这个时候会判断Thread A的prev节点也就是head节点的waitStatus,如果大于0或者设置SIGNAL失败,表示节点被设置成了CANCELLED状态,这时候需要唤醒Thread A,否则就基于AQS队列的机制唤醒,也就是等到Thread B释放锁之后来唤醒Thread A/

    前面分析await方法的时候,thread A会在调用LockSupport/park()方法的时候被阻塞,而通过signal方法被唤醒之后会继续回到上次执行的逻辑中执行checkInterruptWhileWaiting(node)方法去判断Thread A在Condition队列被阻塞的过程中,是否被其它线程触发过中断请求:

    public final void await() throws InterruptedException { if (Thread/interrupted()) throw new InterruptedException()/ Node node = addConditionWaiter()/ int savedState = fullyRelease(node)/ int interruptMode = 0/ while (!isOnSyncQueue(node)) { LockSupport/park(this)/ if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break/ } if (acquireQueued(node/ savedState) &/&/ interruptMode != THROW_IE) interruptMode = REINTERRUPT/ if (node/nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters()/ if (interruptMode != 0) reportInterruptAfterWait(interruptMode)/}private int checkInterruptWhileWaiting(Node node) { return Thread/interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE / REINTERRUPT) / 0/}final boolean transferAfterCancelledWait(Node node) { // 使用CAS修改节点状态,如果还能修改成功,则说明线程被中断时,signal方法还没有被调用 // 注意:线程被唤醒,并不一定是在Java层面执行了LockSupport/unpark方法,也可能是调用了线程的interrupt()方法,这个方***更新中断标识,并唤醒处于阻塞状态的线程 if (compareAndSetWaitStatus(node/ Node/CONDITION/ 0)) { enq(node)/ // 如果CAS成功,则调用enq方法将当前node加入到AQS队列 return true/ } // 如果CAS失败,则判断当前节点是否已经在AQS队列中,如果不在则让给其它线程执行;当Node被触发了signal时,node就会被加入到AQS队列中 while (!isOnSyncQueue(node)) Thread/yield()/ return false/}

    如果当前线程被中断,则调用transferAfterCancelledWait方法判断后续处理是应该抛出InterruptedException还是重新中断。

    这里需要注意的是如果第一次CAS失败了,则不能判断当前线程是先进行了中断还是先进行了Signal方法的调用;可能是先执行了signal然后中断;也可能是先中断然后执行signal;这时需要做的就是等待当前线程的node被添加到AQS队列后,也就是enq方法返回后,返回false告诉checkInterruptWhileWaiting方法返回REINTERRUPT,后续进行重新中断。

    简单来说,这个方法的返回值代表当前线程是否在park的时候被中断唤醒,如果为true表示中断在signal调用之前,signal还未执行,那么这个时候会根据await的语义,在await时遇到中断需要抛出interruptedException,返回true就是告诉checkInterruptWhileWaiting方法返回THROW_IE。如果返回false,表示signal方法已经执行过了,只需要重新响应中断即可。

    接下来会调用acquireQueued方法,这个方法是让当前被唤醒的节点Thread A去抢占同步锁,并要恢复到原本的重入次数状态。调用完这个方法后,将AQS队列中的head节点的status设置成SIGNAL,AQS队列的状态如下:

    Condition总结线程awaitThread先通过lock/lock方法获得锁后,调用condition/await()方法进入等待队列,而另一个线程signalThread通过lock/lock方法获得锁后调用了condition/signal方法,使得线程awaitThread能够有机会移入到同步队列,当其他线程释放lock后使得线程awaitThread能够有机会获得锁,从而使得awaitThread能够从await方法中退出并执行后续操作;如果awaitThread获取锁失败,则会直接进入到同步队列。

    ***

    J/U/C中提供了几个比较常用的并发工具类,比如CountDownLatch、Semaphore、CyclicBarrier。接下来我们会了解一下这些常用的API

    CountDownLatchCountDownLatch是一个同步工具类,它允许一个或者多个线程一直等待,直到其它线程的操作执行完毕再执行。从名字可以解读到CountDown是倒数的意思,类似我们倒计时的概念。CountDownLatch提供了两个方法:一个是countDown,一个是await;CountDownLatch初始化的时候需要传入一个整数,在这个整数倒数到0之前,调用了await方法的线程都必须要等待,然后通过countDown方法来倒数。

    public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(3)/ new Thread(() ->/ { System/out/println(Thread/currentThread()/getName() + - 执行中 )/ countDownLatch/countDown()/ System/out/println(Thread/currentThread()/getName() + - 执行完毕 )/ }/ t1 )/start()/ new Thread(() ->/ { System/out/println(Thread/currentThread()/getName() + - 执行中 )/ countDownLatch/countDown()/ System/out/println(Thread/currentThread()/getName() + - 执行完毕 )/ }/ t2 )/start()/ new Thread(() ->/ { System/out/println(Thread/currentThread()/getName() + - 执行中 )/ countDownLatch/countDown()/ System/out/println(Thread/currentThread()/getName() + - 执行完毕 )/ }/ t3 )/start()/ countDownLatch/await()/ System/out/println( 所有线程执行完毕 )/ }}

    运行结果:

    上面的例子从代码上看,有点类似join的功能,但是比join更加灵活。CountDownLatch构造函数会接收一个int类型的参数作为计数器的初始值,当调用countDown方法的时候,这个计数器就会减一。使用CountDownLatch可以模拟高并发场景:

    public class CountDownLatchDemo extends Thread { static CountDownLatch countDownLatch = new CountDownLatch(1)/ @Override public void run() { try { countDownLatch/await()/ } catch (Exception e) { e/printStackTrace()/ } System/out/println( Thread/ + Thread/currentThread()/getName())/ } public static void main(String[] args) throws InterruptedException { for (int i = 0/ i </ 1000/ i++) { new CountDownLatchDemo()/start()/ } System/out/println( 所有线程阻塞中//////// )/ TimeUnit/SECONDS/sleep(5)/ System/out/println( 线程阻塞开关打开 )/ countDownLatch/countDown()/ }}

    运行结果:

    总的来说,凡是涉及到需要指定某个任务在执行之前,要等到某个前置任务执行完毕后才执行的场景,都可以使用到CountDownLatch。

    CountDownLatch源码分析对于CountDownLatch我们只需要关注两个方法,一个是countDown()方法,另一个是await()方法。countDown()方法每次调用都会将state的值减一,直到state的值为0;而await方法是一个阻塞方法,当state=0的时候await方法才会返回。await方法可以被多个线程调用,所有调用了await方法的线程都阻塞在AQS的阻塞队列中,等待条件满足时,将线程一个一个从队列中唤醒。

    CountDownLatch也用到了AQS,在CountDownLatch内部写了一个Sync并且集成了AQS这个抽象类,重写了AQS中的共享锁方法。当调用了CountDownLatch/await方法的时候,代码如下:

    public void await() throws InterruptedException { sync/acquireSharedInterruptibly(1)/}public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread/interrupted()) throw new InterruptedException()/ if (tryAcquireShared(arg) </ 0) doAcquireSharedInterruptibly(arg)/}protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 / -1/}

    通过上面的代码可以看到,首先要判断当前线程是否获得了共享锁,如果state不等于0, 说明当前线程要加入到共享锁队列中。doAcquireSharedInterruptibly方法的代码如下:

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node/SHARED)/ boolean failed = true/ try { for (//) { final Node p = node/predecessor()/ if (p == head) { int r = tryAcquireShared(arg)/ // 由于state != 0,所以返回值是-1 if (r >/= 0) { setHeadAndPropagate(node/ r)/ p/next = null/ // help GC failed = false/ return/ } } if (shouldParkAfterFailedAcquire(p/ node) &/&/ parkAndCheckInterrupt()) throw new InterruptedException()/ } } finally { if (failed) cancelAcquire(node)/ }}
  • addWaiter方法创建了一个SHARED模式的节点并且加入到AQS的队列中

  • 由于这个时候state不为0,所以肯定会去执行if (shouldParkAfterFailedAcquire(p/ node) &/&/ parkAndCheckInterrupt())这个判断,在partAndCheckInterrput方法中挂起线程

  • 这个时候所有线程都调用了await方法,由于state的值现在还不为0,所以这些线程都会加入到AQS队列中,并且都处于阻塞状态:

    当其他线程调用CountDownLatch/countDown方法的时候,我们看看它做了什么事情:

    public void countDown() { sync/releaseShared(1)/}public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared()/ return true/ } return false/}protected boolean tryReleaseShared(int releases) { // Decrement count/ signal when transition to zero for (//) { int c = getState()/ if (c == 0) return false/ int nextc = c-1/ if (compareAndSetState(c/ nextc)) return nextc == 0/ }}private void doReleaseShared() { for (//) { Node h = head/ if (h != null &/&/ h != tail) { int ws = h/waitStatus/ if (ws == Node/SIGNAL) { if (!compareAndSetWaitStatus(h/ Node/SIGNAL/ 0)) continue/ // loop to recheck cases unparkSuccessor(h)/ } // 这个CAS失败的场景是:执行到这里的时候,恰好有一个节点入队,入队会将这个ws设置为-1 else if (ws == 0 &/&/ !compareAndSetWaitStatus(h/ 0/ Node/PROPAGATE)) continue/ // loop on failed CAS } if (h == head) // loop if head changed break/ }}private void unparkSuccessor(Node node) { int ws = node/waitStatus/ if (ws </ 0) compareAndSetWaitStatus(node/ ws/ 0)/ Node s = node/next/ if (s == null || s/waitStatus >/ 0) { s = null/ for (Node t = tail/ t != null &/&/ t != node/ t = t/prev) if (t/waitStatus </= 0) s = t/ } if (s != null) LockSupport/unpark(s/thread)/}

    由上面代码可以看出,tryReleaseShared方法使用自旋的方式去实现state减一,当state为0 的时候,调用doReleaseShared方法唤醒处于await状态下的线程。

    共享锁的释放和独占锁的释放有一定的差别,在doReleaseShared方法中,先判断头结点的状态是否为SIGNAL,如果是,将状态改成0;修改成功之后调用unparkSuccessor方法唤醒头结点的下一个节点(在我们的例子中是 thread 1)。


    一旦Thread 1被唤醒,代码又会回到doAcquireSharedInterruptibly方法中来执行,如果满足state=0,会执行setHeadAndPropagate方法;

    private void setHeadAndPropagate(Node node/ int propagate) { Node h = head/ // Record old head for check below setHead(node)/ if (propagate >/ 0 || h == null || h/waitStatus </ 0 || (h = head) == null || h/waitStatus </ 0) { Node s = node/next/ if (s == null || s/isShared()) doReleaseShared()/ }}

    这个方法的主要作用是把被唤醒的节点设置成head节点,然后继续唤醒该节点的下一个节点 thread 2。一次循环,thread 2会唤醒thread 3…

    SemaphoreSemaphore也就是我们常说的信号灯,semaphore可以控制同时访问的线程个数,通过acquire获取一个许可,如果没有就等待,通过release释放一个许可,有点类似限流的作用。

    叫做信号灯的原因也和它的用处相关,比如m某商场就有5个停车位,如果这个时候来了10辆车,必须要等前面有空的车位才能进入。

    public class SemaphoreDemoTest { public static void main(String[] args) { Semaphore semaphore = new Semaphore(5)/ for (int i = 0/ i </ 10/ i++) { new Car(i/ semaphore)/start()/ } } static class Car extends Thread { private int num/ private Semaphore semaphore/ public Car(int num/ Semaphore semaphore) { this/num = num/ this/semaphore = semaphore/ } public void run() { try { semaphore/acquire()/ System/out/println( 第 + num + 占用一个车位 )/ TimeUnit/SECONDS/sleep(2)/ System/out/println( 第 + num + 辆车开走了 )/ semaphore/release()/ } catch (Exception e) { e/printStackTrace()/ } } }}

    运行结果:

    第0占用一个车位第2占用一个车位第3占用一个车位第4占用一个车位第1占用一个车位第1辆车开走了第0辆车开走了第4辆车开走了第3辆车开走了第2辆车开走了第8占用一个车位第6占用一个车位第5占用一个车位第7占用一个车位第9占用一个车位第8辆车开走了第7辆车开走了第6辆车开走了第5辆车开走了第9辆车开走了

    Semaphore源码分析从Semaphore的功能来看,我们可以猜测到它的底层实现y一定是基于AQS的共享锁,因为需要实现多个线程共享一个令牌池。创建Semaphore实例的时候,需要一个参数permits,这个基本上可以确定是设置给AQS的stated的,然后每个线程调用acquire的时候,执行state=state-1,release的时候执行state=state+1;当然acquire的时候,如果state=0,说明没有资源了需要等待其他线程release。

    Semaphore分公平策略和非公平策略,区别就在于是不是会先判断是否有线程再排队,然后才进行CAS减操作

    static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L/ NonfairSync(int permits) { super(permits)/ } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires)/ }}static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L/ FairSync(int permits) { super(permits)/ } protected int tryAcquireShared(int acquires) { for (//) { if (hasQueuedPredecessors()) return -1/ int available = getState()/ int remaining = available - acquires/ if (remaining </ 0 || compareAndSetState(available/ remaining)) return remaining/ } }}

    CyclicBarrierCyclicBarrier的字面意思是可循环使用的屏障,它要做的事情是让一组线程达到一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障***的线程才会继续工作。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障***的线程数,每个线程调用await方法告诉CyclicBarrier当前线程已经到达了屏障,然后当前线程被阻塞。

    使用线程当存在需要所有的子任务都完成时,才执行主任务,这个时候j就可以选择使用CyclicBarrier

    使用案例:

    public class DataImportThread extends Thread { private CyclicBarrier cyclicBarrier/ private String path/ public DataImportThread(CyclicBarrier cyclicBarrier/ String path) { this/cyclicBarrier = cyclicBarrier/ this/path = path/ } public void run() { System/out/println( 开始导入 + path + 位置的数据 )/ try { cyclicBarrier/await()/ } catch (Exception e) { e/printStackTrace()/ } }}public class CyclicBarrierDemo extends Thread { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3/ new CyclicBarrierDemo())/ new Thread(new DataImportThread(cyclicBarrier/ file1 ))/start()/ new Thread(new DataImportThread(cyclicBarrier/ file2 ))/start()/ new Thread(new DataImportThread(cyclicBarrier/ file3 ))/start()/ } public void run() { System/out/println( 开始分析数据 )/ }}

    运行结果:

    注意:

  • 对于指定计数值parties,若由于某种原因,没有足够的线程调用CyclicBarrier的await方法,则所有调用await的线程都会被阻塞

  • CyclicBarrier也可以调用await(timeout/ unit),设置超时时间,在设定时间内,如果没有足够的线程,则接触阻塞状态继续工作

  • 通过reset重置计数,会使得进入awaitd的线程c出现BrokenBarrierException

  • 如果采用的是CyclicBarrier(int parties/ Runnable barrierAction)构造方法,执行barrierAction操作的是最后一个到达的线程。

  • 免责声明:
    1. 《Java并发编程--并发工具的使用和原理》内容来源于互联网,版权归原著者或相关公司所有。
    2. 若《65667893文库网》收录的文本内容侵犯了您的权益或隐私,请立即通知我们删除。