【深度长文】聊一聊 Java AbstractQueuedSynchronizer 以及在 ReentrantLock 中的应用

news/2024/5/19 2:08:01 标签: java, 开发语言, 后端, 流程图

文章目录

  • AQS 原理简述
    • 内部数据结构
    • 公平锁 vs. 非公平锁
      • ReentrantLock 非公平锁
      • ReentrantLock 公平锁
  • AQS 源码分析
    • 加锁过程
      • addWaiter
      • acquireQueued
      • shouldParkAfterFailedAcquire
      • cancelAcquire
    • 解锁过程
      • unparkSuccessor

AbstractQueuedSynchronizer (AQS) 是 Java 并发包中,实现各种同步结构和部分其他组成单元( ThreadPoolExecutor.Worker) 的基础。

理论上,同步结构可以相互实现(例如使用 Semaphore 实现 mutex),但 直接用一个同步结构来实现另一个可能会导致实现上的复杂性和难以理解的代码 。为了解决这种复杂性,Doug Lea 设计了 AQS,AQS 作为一个抽象类,内部封装了同步状态的管理、线程的排队和等待、以及条件变量的支持等基础功能。

通过继承 AQS 并实现其部分方法,可以相对容易地构建各种同步结构,如互斥锁 ReentrantLock、读写锁 ReadWriteLockCountDownLatch ,同步结构的开发者只需要关注于同步机制的具体逻辑,而不是底层的线程调度和同步状态管理。

本文旨在介绍 AQS 抽象类以及 ReentrantLock 如何基于 AQS 实现公平、非公平两种模式的互斥锁。

AQS 原理简述

AQS 的核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态。如果共享资源被占用,就需要一定的 阻塞等待唤醒机制 来保证锁分配。

AQS 内部字段和方法有三个重要的组成:

  • 一个先入先出(FIFO) 的等待线程队列 CLH,等待获取锁的线程放入该队列中,实现多线程间的竞争和等待。

    CLH:Craig、Landin and Hagersten队列,单向链表实现。AQS 的队列是 CLH 变体的虚拟双向队列,AQS 将请求共享资源的线程封装为 Node 对象实现锁分配。

  • 共享资源的持有状态通过一个 volatile 的整数成员 state 同步,同时还提供了 setStategetState 方法。

    java">/* The synchronization state. */
    private volatile int state;
    
  • 各种基于 CAS 操作的基础方法,以及期望具体同步结构(AQS 实现类)去实现的 tryAcquire/tryRelease 方法。tryAcquire 操作,尝试非阻塞地获取资源的独占权tryRelease 操作,释放对某个资源的独占。
    ReentrantLock 的内部类 Sync 继承了 AQS,公平锁和非公平锁模式分别实现了 tryAcquire 方法。

等待线程队列是 AQS 机制的核心之一,它的示意图如下:

CLH队列示意图


内部数据结构

AQS CLH 等待队列中的节点 Node 的数据结构如下:

java">// 队列中的节点
static final class Node {
    // 标记当前节点等待状态为共享还是排他
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    
    // 当前节点已经取消(因为超时或中断)
    static final int CANCELLED =  1;
    // 后继节点被或将要被阻塞(因park), 当前节点在释放或取消时需要unpark后继
    static final int SIGNAL    = -1;
    // 当前节点位于条件变量等待队列, 不会被用于sync队列节点
    static final int CONDITION = -2;
    // waitStatus value to indicate the next acquireShared should unconditionally propagate
    static final int PROPAGATE = -3;

    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    ...
}

上述字段的含义如下:

方法和属性值含义
waitStatus当前节点在队列中的状态
thread节点包装的线程对象
prev指向前驱节点的指针
nextWaiter指向下一个处于 CONDITION 状态的节点(Condition Queue使用)
next指向后继节点的指针

详细解释下上述各个字段的用处:

  1. 队列节点 Node 包装 Thread 实例,控制信息由前驱节点 waitStatus 字段持有。例如:当前线程尝试获取锁失败后是否 park,需要查看前驱节点的waitStatus 是否为SIGNAL。
  2. 节点入队时,只需要将节点作为新的 tail 节点链接到链表中;出队时,将节点设置为新的 head。
  3. 使用prev指针处理可能的取消 (等待超时或中断);如果一个节点取消,它的后继节点需要重新链接到一个未被取消的前驱节点。(详见shouldParkAfterFailedAcquire方法)
  4. next 指针用于实现阻塞唤醒机制,unparkSuccessor(node) 方法先使用 node.next node 节点的第一个有效的后继节点,确定向哪个线程发送 unpark 信号。(详见 unparkSuccessor方法)

waitStatus 枚举值如下:

枚举含义
0当一个Node被初始化的时候的默认值
CANCELLED(1)表示线程获取锁的请求已经取消了
CONDITION(-2)节点在等待队列中,节点线程等待唤醒
PROPAGATE(-3)线程处在SHARED情况下,该字段才会使用
SIGNAL(-1)线程已经准备好,等待资源释放

独占锁的获取流程



共享锁的获取流程(参考ReadWriteLock):



公平锁 vs. 非公平锁

公平锁 指的是多个线程排队获取共享资源,获取锁的顺序按照线程请求锁的顺序来分配,线程队列中的第一个线程可以获得共享资源,其余线程等待资源释放。

公平锁的优点:

  • 公平:所有线程都能按照申请锁的顺序获得锁,避免了“饥饿”现象
  • 可预测性:可以预测任何线程获取锁的大致时间。

公平锁的缺点:

  • 效率较低:每次都需要检查和维护获取锁的顺序,增加了开销。
  • 吞吐量相比非公平锁更低:CPU唤醒阻塞线程的开销比非公平锁大。

非公平锁 指的是多个线程加锁时直接尝试获取锁,获取不到才会加入等待队列中。但如果加锁时成功获取锁,则无需进入等待队列。非公平锁可能出现后加锁的线程先于先加锁的线程获得锁:

优点:减少唤起线程的开销,提升整体吞吐率

缺点:等待队列中的线程可能获取不到锁,导致饥饿。


以奶茶店排队为例:

  • 公平锁模式:新同学 whr 到奶茶店领取奶茶,发现已经排了很长队,下一个领奶茶的是小王同学。具有公平观念的 whr 排到队伍尾部等待领取奶茶。
  • 非公平锁模式:左图中新同学 whr 领取奶茶,小王同学开了小差,whr直接插队领到了奶茶。右图中,小王同学坚守道德原则,不允许 whr 插队,whr 领取奶茶失败,排到队伍尾部等待领取。

在介绍 ReentrantLock 两种锁模式前,我先梳理下 ReentrantLock 和 AQS 之间的关系。

  • ReentrantLock 的内部抽象类 Sync 继承 AQS:

    java">abstract static class Sync extends AbstractQueuedSynchronizer {}
    
  • ReentrantLock 中的成员属性 sync 为 Sync 类型实例的引用,ReentrantLock 构造函数对该属性初始化:

    java">public class ReentrantLock implements Lock, java.io.Serializable {
    	private final Sync sync;
    	
    	// 默认为非公平锁, 系统吞吐率更高
        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
    	// 如果fair为true, 显式指定采用公平锁模式
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    }
    

    NonfairSyncFairSync 均为继承 Sync 抽象类的同步结构实现类,对应于非公平锁和公平锁。

  • ReentrantLock 使用者通常使用 lock API 获取锁,该方法会调用 Sync#lock() 方法,FairSync 和 NonFairSync 各自实现了 Sync#lock 抽象方法。
    注意到:非公平锁 lock 时,先使用 CAS 尝试更新 state,如果更新成功则说明成功获取锁,不考虑是否有其它线程在排队等待

    java">// ReentrantLock#lock
    public void lock() {
        sync.lock();
    }
    
    // FairSync#lock
    final void lock() {
        acquire(1);
    }
    
    // NonFairSync#lock
    final void lock() {
       if (compareAndSetState(0, 1))
           setExclusiveOwnerThread(Thread.currentThread());
       else
           acquire(1);
    }
    

    AQS 中实现了 acquire 方法,调用 tryAcquire 方法尝试非阻塞地获取锁。如果成功,acquire 方法返回;如果失败,则将当前线程包装为 Node 对象,加入到阻塞队列中。

    java">public final void acquire(int arg) {
    	// tryAcquire 非阻塞地获取锁
        if (!tryAcquire(arg) &&
        	// 非阻塞获取锁失败, 当前线程进入 CLH 队列中等待其它线程释放锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    tryAcquire 方法在 AQS 中的实现是抛出异常,它的真正实现交由继承了 AQS 的实现类。

介绍完 ReentrantLock 和 AQS 的 API 交互关系,下面我将结合源码介绍 FairSync、NonFairSync 对 tryAcquire 方法的实现。

ReentrantLock 非公平锁

这一章介绍 ReentrantLock 使用 AQS 实现的非公平锁 NonfairSync,下图为加锁流程图

加锁时调用 ReentrantLock#lock:

  • 调用 sync 属性的 lock 方法,sync 属性默认初始化为 NonfairSync 对象。
  • 如果 state 等于 0,NonfairSync#lock 先通过 CAS设置为 1;
    • 设置成功,将 exclusiveOwnerThread 设置为当前线程;
    • 如果失败,执行 AQS 的 acquire 方法排队等待锁资源的释放;
  • acquire 方法会先调用 tryAcquire 方法尝试获取锁,该方法由同步器实现,因此调用 NonfairSync#tryAcquire,也就是调用 nonfairTryAcquire 方法。
  • nonfairTryAcquire 方法中
    • 判断 state 为 0,尝试 CAS 更新为 1,表示占有锁,失败则返回 false;
    • 如果 state 不等于 0,说明锁已经被某个线程占用,检查 exclusiveOwnerThread 是否等于当前线程。若等于,则说明当前线程已经持有锁,更新 state 表示重入获取;若不为当前线程,返回 false。
java">// ReentrantLock.NonfairSync 非公平锁实现的lock()方法
// 无论等待队列中是否有等待线程, 直接使用 CAS 尝试获取锁
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

// ReentrantLock.NonfairSync 
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

// ReentrantLock.Sync
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    	// 当前锁是空闲状态, CAS 设置 state 为 1, 设置 exclusiveOwnerThread 为当前线程 
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
    	// 可重入锁的持有线程为当前线程, 增加持有计数, 允许再次获取锁 
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

可以看到,当ReentrantLock 是非公平锁时,在 lock 方法和 tryAcquire 方法中,新来的线程会直接尝试 CAS 设置 state 为 1,而不考虑 CLH 队列中是否有其它线程在等待。这就是买奶茶时的插队行为,系统并发度较高时,争用共享资源的线程很多,可能导致队列中的等待线程饥饿!


ReentrantLock 公平锁

下面代码是公平锁实现的 tryAcquire 方法:

java">protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
       // hasQueuedPredecessors 检查队列中是否存在排队的线程
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current); // 设置当前线程为锁占有线程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
    	// 当前线程已经持有锁, 可重入地获取锁, 增加重入计数
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

相比 非公平锁,公平锁 FairSync#tryAcquire 在判断 state 等于 0 后,先通过 hasQueuedPredecessors 判断是否已经有线程在等待队列中排队。如果返回 true,说明有线程已经在排队了,当前线程不能直接获取锁。


hasQueuedPredecessors 实现如下:

java">public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

来讨论下实现细节,该方法何时返回 true

return 语句中的条件出现的场景如下:

  • h != t
    • 情况一:已经有节点在排队,h 和 t 都不为 null;
    • 情况二:队列正在初始化,h 和 t 有一个为 null,但这种赋值顺序下,只可能 t 为 null。
  • s = h.next
    • s = h.next 不等于 null,对应情况一,队列中至少有两个不同的节点存在,除去 head 为虚节点,至少有一个线程正在排队等待锁释放。如果队列中第一个等待线程不是当前线程,说明队列中已经有其它线程正在等待,当前线程需要入队等待,保证公平锁策略。
    • s = h.next 等于 null,对应情况二,其它线程 B 正在初始化 CLH 队列,但还未完成【队列初始化并将线程节点入队】,当前线程 A 需要入队等待。

情况一的时序图如下

线程 A 调用 FairSync#tryAcquire 获取公平锁,执行 hasQueuedPredecessors() 检查等待队列中是否已经有线程排队。

线程 B 在线程 A 之前执行 tryAcquire,但是获取锁失败,执行 addWaiter 将自身加入到等待队列中。但是等待队列还未初始化(tail == null),因此使用 enq 方法入队,enq 会执行队列初始化,。(addWaiter 和 enq 方法实现在 AQS,我会在【源码分析】中详细介绍)


最终 h != t && (s = h.next) == null ,hasQueuedPredecessors 返回 true,而这个队列中的前驱节点就是 线程 B 的 Node 对象。



情况二时序图如下

等待队列中已经存在节点(h.next != null),且等待队列中第一个线程不是当前线程 s.thread != Thread.currentThread(),说明存在排队的线程,hasQueuedPredecessors 返回 true。


AQS 源码分析

本章节结合 AQS 源码,带大家更深入地了解调用 ReentrantLock 的 lock 和 unlock 方法时,底层的 AQS 完成了哪些工作。

加锁过程

ReentrantLock#lock 最终会调用 AQS 的 acquire 方法,我以该方法为入口介绍加锁流程:

java">public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

AQS acquire 方法会调用同步结构实现的 tryAcquire 方法非阻塞地获取锁,根据 ReentrantLock 不同的锁模式,执行不同的 tryAcquire。这一部分已经在【非公平锁 vs. 非公平锁】中完成详细介绍。


addWaiter

acquire 方法中,如果 tryAcquire 获取锁失败,则会执行 addWaiteraddWaiter当前线程包装为 Node 节点,节点模式为 Node.EXCLUSIVE,加入到 CLH 等待队列中,返回包含当前线程的 Node 对象。

  • 如果 tail 等于 null,说明队列还未初始化,使用 enq 方法先初始化队列,再将 Node 实例入队。
  • 如果 CAS 更新 tail 字段失败,也执行 enq 方法,在 死循环中确保节点入队成功
java">private Node addWaiter(Node mode) {
    // 锁模式 + 当前线程 新建 CLH 队列的节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        // 前驱指向 pred, 即原 tail 节点
        node.prev = pred;
        // CAS 设置 tail 属性, 设置成功才允许将 prev.next 设置为 node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 队列为空, 初始化队列; CAS 失败, 则死循环直到入队成功
    enq(node);
    return node;
}

如果走到 enq 方法,说明 tail 等于 null (等待队列为空) 或者 CAS 设置 tail 失败

java">private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // 入队前先初始化队列, CAS确保只有一个线程完成队列初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // CLH 队列已经初始化, 将 node 追加到队列尾部, CAS 失败则继续循环
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued

执行完 addWaiter 入队操作后,acquireQueued 方法让排队中的线程尝试获取锁,直到成功 或者 不再需要获取(中断)

java">final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // p==head: 当前线程是队列中第一个等待线程, 锁释放后唤醒了当前线程
            if (p == head && tryAcquire(arg)) {
                // tryAcquire返回true, 当前线程成功获取锁, 将node设置为头节点
                // node.prev node.thread 设置为 null
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // shouldParkAfterFailedAcquire 判断当前线程能否park(RUNNABLE 切换为 WAITING), 后面详细解释这个方法
            // 如果可以, 使用 LockSupport.park 挂起当前线程, unpark唤醒后, 返回中断标记
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果使用可中断的 API, parkAndCheckInterrupt返回true后抛出InterruptedException
        // 此时failed等于true, cancelAcquire 取消当前线程获取锁的尝试, node.ws更新为 CANCELLED
        if (failed)
            cancelAcquire(node);
    }
}

// 将Node实例设置为等待队列头节点, 
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

当前线程被包装为 node 对象加入队列后,如果 head 的后继为 node 节点,且当前线程 tryAcquire 返回 true。说明获取锁成功,等待队列的变化如下图所示:

如果 node 节点不为 head 的后继,说明队列中已经有线程在等待锁,应该 park 挂起当前线程,直到当前线程被中断或 unpark 唤醒。

这一过程的详细分析以及等待队列的变化将在 shouldParkAfterFailedAcquire 方法中介绍。现在大家只要了解 shouldParkAfterFailedAcquire 用于判断当前线程是否应该 park 挂起。如果该方法返回 true,则执行 parkAndCheckInterrupt 方法挂起当前线程:

java">private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

线程被唤醒后,从LockSupport.park(this)后继续执行,返回线程中断标记(中断标记会被清除)。

如果 parkAndCheckInterrupt 返回 true,则在当前线程获取锁后,acquireQueued 方法也返回 true:

java">public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

因为 Thread.interrupted 检查中断时清除了中断标记,因此这里调用 selfInterrupt 重新设置线程中断标记。综上所属:ReentrantLock#lock() 不会响应中断,只是记录中断记录。中断仅仅让等待线程在 acquireQueued 空转一次。


这里简单提一下,如果使用支持中断的 API:ReentrantLock#lockInterruptibly(),在 tryAcquire 返回 false 后,会调用 AQS 的 doAcquireInterruptibly 方法获取锁。

java">    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

该方法的流程与 acquireQueued 基本相同,差异主要在于线程被中断后的响应。如果 parkAndCheckInterrupt 返回 true,则会抛出InterruptedException,调用者可以感知到中断异常,而不需要显式检查中断标记。


shouldParkAfterFailedAcquire


shouldParkAfterFailedAcquire 代码详细注释如下,该方法判断当前线程能否执行后续的 parkAndInterrupted() 方法,park 挂起线程等待锁资源释放:

java">private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // ws == -1: 当前线程可以安心地挂起, 如果可获取锁, 前驱节点负责唤醒当前线程
        return true;
    if (ws > 0) {
        // 移除被取消的前驱节点, 再次尝试在 acquireQueued 的循环中获取锁
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 不使用 CAS 更新, 因为node包装的当前线程不可能被取消
        // 不存在其它线程将 pred.next 更新为其它节点的情况
        pred.next = node; 
    } else {
        // ws == 0 可能有如下两种情况: 
        // (1) pred 节点入队时ws一直为初始值 0
        // (2) 锁释放时执行了unparkSuccessor(pred), 但是当前线程没有成功获取锁
        //   设置前驱节点的 ws 为 -1, 等待下一次 unparkSuccessor(pred)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  • 只有 node 的前驱节点的 waitStatus 等于 SIGNAL,才能返回 true,当前线程才能安心地挂起。锁释放时,当前线程会被唤醒。
  • shouldParkAfterFailedAcquire 返回 false 后,进入 acquireQueued 方法的下一轮循环,当前线程会再次尝试获取锁 p == head && tryAcquire(arg)。返回 false 有如下两种情况:
    • 如果 node 的前驱节点已经取消(ws == 1),将取消节点移除等待队列,方法返回 false,再次尝试从 acquireQueued 方法获取锁。
    • 如果 node 的前驱节点 ws == 0,可能因为 pred 节点入队时 ws 一直保持初始值 0;或是 持有线程释放锁时执行了 unparkSuccessor(pred),唤醒了当前线程,但是当前线程在唤醒后还是没有成功获取锁。
      这两种情况下,都需要设置前驱节点的 ws 为 -1,等待下一次锁被释放时,执行 unparkSuccessor(pred)

下图为 node 未成功获取锁时,等待队列发生的变化:

  1. node 节点对应的线程 tryAcquire 未能获取锁,执行 acquireQueued 排队等待,此时 node 节点将作为新的 tail;
  2. node 线程执行 shouldParkAfterFailedAcquire 方法,发现它的两个前驱节点 node1、node2 的 waitStatus 为 CANCELLED(1),因此将它们从队列中移除,方法返回 false。
  3. node 线程再次尝试在 acquireQueued 方法的循环中获取锁,结果 tryAcquire 还是失败。再次进入 shouldParkAfterFailedAcquire 方法,发现 node 前驱节点 head 的 waitStatus 等于 0,因此通过 CAS 修改为 SIGNAL(-1),方法返回 -1。
  4. node 线程执行第三轮循环,终于 tryAcquire 返回 true,成功获取锁,node 成为头节点。

下面,我将分享下阅读源码过程中的一些疑问,以及自己的理解。

ws > 0 说明前驱节点状态为 CANCELLED,这种状态怎么产生的

当使用可中断的 API 时,例如lockInterruptibly,parkAndCheckInterrupt 检测到中断返回 true 时,会抛出 InterruptedException。finally 代码块执行时 failed 等于 true,执行 cancelAcquire 取消 node 获取锁的尝试,从等待队列中移除 node,这一过程中 node.waitStatus 会被修改为 CANCELLED

java">private void doAcquireInterruptibly(int arg)
    // ...
    boolean failed = true;
    try {
        for (;;) {
            //...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 当前线程被中断后, parkAndCheckInterrupt返回true
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

ws 等于 0 和 -1 分别是什么含义

如果节点 node 作为头节点,node.waitStatus 等于 0,说明:

  • node 没有后继节点(没有线程在它后面排队)。如果有新的线程加入队列,会在 shouldParkAfterFailedAcquire 方法将 node.waitStatus 修改为 SIGNAL。

  • 锁持有者释放锁资源时,已经通过 unparkSuccessor(node) 唤醒了阻塞队列中第一个等待线程,node.waitStatus 会修改为中间状态 0。

    java">private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // ...
    }
    

头节点 node.waitStatus 如果为 SIGNAL(-1),锁持有者在释放锁时(执行 AQS release 方法),需要唤醒阻塞队列中的第一个等待线程,即调用 unparkSuccessor(node)。如果 node.waitStatus 等于 0,则无需这样做。

java">    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

为什么 ws == 0,CAS 更新 ws 为 SIGNAL(-1) 后,方法不返回 true,执行 park 挂起当前线程

可能存在如下情况:

  1. 锁持有线程 owner_t 释放锁时,使用 unparkSuccessor ,该方法会将 head.ws 设置为 0,并唤醒队列第一个线程 curr_t。
  2. curr_t 被唤醒后,执行 tryAcquire 尝试获取锁,却因为 preempt_t 抢夺锁而获取失败。
  3. 争抢锁的线程 preempt_t 获取锁,快速执行完业务逻辑后,使用 unlock 释放锁,因为 head.ws 等于 0,不执行 unparkSuccessor 方法
  4. curr_t 因为获取锁失败,执行 shouldParkAfterFailedAcquire,检查到 head.ws 等于 0,于是 CAS 将 head.waitStatus 更新为 SIGNAL(-1)。

上述操作的时间序列如下表所示:

owner_tcurr_tpreempt_t
unlock-->unparkSuccessor
head.ws == 0
lock()
tryAcquire failed
unlock()
head.ws==0, no unparkSuccessor
shouldParkAfterFailedAcquire
compareAndSetWaitStatus(0 to -1)
return false;
空转 再次 tryAcquire 获取锁

如果 CAS 更新 pred.waitStatus 后,方法返回 true,当前线程将在本轮循环中执行 park 操作挂起。在这种情况下,锁处于空闲状态,curr_t 线程却阻塞在队列中等待唤醒而不是获取锁,有可能阻塞队列中的线程永远无法获取锁!

而返回 false 空转一次,curr_t 线程能确认锁当前是被其它线程占用的,才能安心地 park 挂起。

该场景地时序图如下:


cancelAcquire

节点因中断或超时,通过 cancelAcquire 方法设置状态为CANCLE。(ReentrantLock 中支持中断的API为lockInterruptibly)

java">private void doAcquireInterruptibly(int arg)
    // ...
    boolean failed = true;
    try {
        for (;;) {
            //...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 当前线程被中断后, parkAndCheckInterrupt返回true
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

cancelAcquire 方法的执行逻辑如下:

  1. 找到被取消节点node未被取消的前驱节点pred,然后设置node的状态为CANCELLED
  2. 如果node为链表尾部,通过CAS 将tail设置为前驱节点 pred。
  3. 如果 pred 不是头节点,维护 pred 节点的 next 指针,设置 pred.next=node.next
  4. pred为头节点,需要通过unparkSuccessor(node) 唤醒线程争抢锁。

代码详细注释如下:

java">private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // 通过 prev 指针跳过取消状态的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    // waitStatus为volatile字段, 赋值之后, 其余节点能跳过该节点
    // 如果在赋值之前, 不受其它线程的干扰
    node.waitStatus = Node.CANCELLED;

    // 如果node为尾部, 从队列中移除自身, 将node的前驱pred设置为尾部, pred.next设置为null 
    if (node == tail && compareAndSetTail(node, pred)) {
        // CAS, pred.next可能已经因新节点入队而改变
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 当前节点的前驱不是head && 
        // 1: 判断当前节点前驱节点 ws 是否为 SIGNAL 2: 如果不是但可以 CAS 更新为 SIGNAL(即prev没有被取消)
        // 1 或 2 为 true, 且当前节点线程不为 null, CAS 设置 prev.next 为 node.next
        // 如果当前节点线程为 null, 可能是prev被取消 或 prev线程获得了锁成为head
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            
            // 为什么仅修改pred.next? 
            // 1. unparkSuccessor(node) 优先通过node.next定位下一个未被取消的节点, 唤醒该节点争抢锁
            // 若node.next等于null或ws==0, 利用prev指针从后向前遍历, 效率会降低
            // 2. 不让 pred.next 引用被取消的 cancelled 节点, 有助于被取消节点的垃圾回收
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 兜底方案: 1(1)node的前驱为head 或 (2)pred节点几乎同时被取消; 唤醒最靠近node节点且未被取消的后继
            // 牺牲了部分性能, 因为可能要遍历等待队列, 并且唤醒线程
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

上述代码,将被取消的节点 node 非为了三类:node 为尾节点、node 为中间节点、node 为头节点的后继节点。

  • node 为尾部节点时,将 node 未被取消的前驱节点 pred 设置为 tail,更新 pred.next 为 null,过程如下:

  • node 为中间节点时,维护好前驱节点 pred.next,如果 node.next 有效,将 pred.next 设置为node.next;

  • node 为头节点的后继,此时需要 unpark 唤醒 node 的后继节点,尝试获取锁。


我们来讨论如下几个问题:

为什么 pred == head 情况下,一定要 unparkSuccessor(node)

pred 节点对应的线程获得锁,记为 owner_t;node 节点对应的线程记为 node_t。假设出现如下的指令序列:

owner_tnode_t
owner_t 线程释放锁,unparkSuccessor(head)
head.ws 设置为 0
找到后继第一个未被取消的节点 node
node_t 被中断,执行cancelAcquire(node)
LockSupport.unpark(node.thread)
node.waitStatus = Node.CANCELLED

结论:如果 node 节点取消后不执行 unparkSuccessor,可能 node 节点后的等待线程不会再被 unpark

因为锁持有线程 owner_t 释放时,执行unparkSuccessor 将 node_t 线程唤醒,并将 head.waitStatus 设置为 0。但与此同时node 线程因为中断被取消,并不会尝试获取锁。

此时锁被释放,但是等待队列中的线程却不会被唤醒,永远无法获取锁。

即使是非公平锁,另一个线程 t 抢占了锁,释放时执行 release 方法发现 head.waitStatus 等于 0,无需执行 unparkSuccessor,仍然无法唤醒等待队列中的线程。

java">public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 执行 waitStatus 需要 head.ws 不等于 0
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

为什么需要判断 pred.thread != null

java">if (pred != head &&
    ((ws = pred.waitStatus) == Node.SIGNAL ||
     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
    pred.thread != null) {
    //...
}

pred.thread 等于 null 有两种情况:

  • 情况一:pred 成为新的 head 节点,thread 被设置为 null;
  • 情况二:pred 也被取消了,thread 被设置为 null;

结论

  1. 发生情况一,pred 成为新的 head 节点,如果没有 pred.thread !=null,可能导致等待队列中的线程不会再被唤醒!
  2. 如果发生情况二 pred 被取消,没必要判断 thread != null,因为 是否唤醒后续线程的决定交由 pred 节点,node 不执行 unparkSuccessor 不会导致等待队列中的线程得不到唤醒。

假设出现如下序列,pred 节点的 waitStatus 属性等于 0,pred_t 成功获取锁,并将 pred 节点设置为头节点。

随后,pred_t 释放锁,因为检查到 head.ws 等于 0,不执行 unparkSuccessor。

在这之后,node_t 线程 CAS 更新 pred.ws 为 SIGNAL(-1);如果不检查 pred.thread,将不会有其他线程 unpark 等待队列中的线程。

pred_t 线程node 线程
pred != head
pred_t 线程执行 tryAcquire 返回 true (pred.ws == 0)
setHead(pred):
1. head属性设置为 pred
2. pred.thread设置为null
pred 立即释放锁,检查到 head.ws 等于0
不执行 unparkSuccessor
CAS 将 pred.ws 设置为 SIGNAL(-1)

解锁过程

ReentrantLock 通过 unlock 方法释放锁:

java">public void unlock() {
    sync.release(1);
}

本质是调用 AQS 的 release 方法,release 中调用 tryRelease 方法:

java">public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

ReentrantLock 内部类 Sync 实现了 tryRelease 方法,并不区分公平锁和非公平锁:

java">protected final boolean tryRelease(int releases) {
    // c 为剩余重入次数
    int c = getState() - releases;
    // 锁只能持有者线程才能释放, 当前线程若没有持有锁, 抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 锁重入次数扣减为 0, 可以释放锁资源, 独占锁的持有线程 exclusiveOwnerThread 设置为 null
    if (c == 0) {
        free = true;
        // 锁持有线程设置为 null
        setExclusiveOwnerThread(null);
    }
    // 更新state
    setState(c);
    return free;
}
  1. 检查当前线程是否为互斥锁的所有线程,如果不是,抛出 IllegalMonitorStateException 异常。
  2. 检查 state 锁持有次数,减去释放次数 releases 后是否等于 0。如果等于 0,说明锁完全释放,将持有线程设置为 null,此时锁可以被其它线程获取。
  3. setState 更新 state。

阅读代码时的问题

问题一:为什么 release 中判断 h != null && h.waitStatus != 0

  • h 如果为 null,说明锁释放后,等待队列还未初始化,没有线程在等待锁资源;
  • h != null && h.waitStatus == 0 ,说明 head 没有后继节点,或者后继节点没有 park,仍然在运行中,无需唤醒
  • h != null && h.waitStatus < 0,head 的后继节点处于 WAITING 状态,需要唤醒。

问题二:是否可以先 setState,再 setExclusiveOwnerThread不可以!

tryAcquire 获取锁时,会先检查 state 是否为 0,如果为 0,则 CAS 更新为 state 为 1,然后设置 exclusiveOwnerThread 为当前线程。

假设线程 A 执行 tryRelease 释放锁,线程 B 执行 FairSync#tryAcquire 尝试获取锁,可能出现如下序列:

线程 A线程 B
setState(0)
compareAndSetState(0, acquires)
setExclusiveOwnerThread(current)
setExclusiveOwnerThread(null)

线程 A 执行先执行 setState 将 state 设置为 0,线程 B 执行 tryAcquire 是 CAS 更新 state 为 1 成功。

随后,线程 B 将锁持有线程更新为 current 当前线程,结果随后线程 A setExclusiveOwnerThread(null) 操作将 锁持有线程 设置为 null,造成了同步状态的不一致!



unparkSuccessor

tryRelease 执行成功后,如果需要唤醒队列中等待锁的线程,将执行 unparkSuccessor。该方法用于唤醒 node 后第一个未取消的节点

java">private void unparkSuccessor(Node node) {
	// node.ws 更新为 0, 准备 unpark 后继节点
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
	// 先使用node.next指针找, 相比使用prev遍历, 效率更高
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // node.next被取消, 使用兜底方案
        // 从尾部节点开始找,利用prev指针向前遍历, 找到队列中第一个 ws<=0(未取消)的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

代码我已经添加了详细注释,但有几点我需要解释下:

  • 使用 prev 指针从后往前找未取消节点 (ws <= 0) 是安全的,因为 prev 的所有修改仅仅是 跳过已取消节点 (ws==1),不影响查找未取消的节点

  • 使用 next 指针从前向后遍历是不安全的,因为可能存在 node 的后继节点 succ 完成 cancelAcquire 方法的调用。cancelAcquire 方法最后设置 succ.next==succ。此时 succ.next 指针是不可靠的,因此只能走从后往前遍历的方式。

  • 为什么 next 指针遍历不安全,还会先访问一次 node.next?

    因为 node.next 指针一定有效,unparkSuccessor 会在如下场景中被调用:

    • (1) 在 AQS release方法中执行 unparkSuccessor(head) 用以唤醒队列中的头节点,头节点的 next 指针一定有效。
    • (2) node 节点取消时,node 对应的线程在 cancelAcquire(node) 方法中执行 unparkSuccessor(node),此时 node.next 指针还未修改,一定是有效的。

总结:next 指针用于加快唤醒等待线程的效率,不必遍历整个 CLH 队列;而 prev 指针将作为 next 指向节点被取消后的兜底方案,从队列尾部后向前遍历,确保一定能找到最近的未取消节点




创作过程耗时费力,但我乐在其中(钻研源码的过程和分享知识是让人快乐的事情),如果大家喜欢这种图文结合、代码详细注释的写作风格,就给我点一个免费的赞吧!


http://www.niftyadmin.cn/n/5436308.html

相关文章

第八章、设计模式

23种设计模式 创建型模式5种 工厂方法模式*&#xff08;*表示常考的&#xff09;Factory Method抽象工厂模式*Abstract Factory原型模式Prototype单例模式Singleton单例模式构建器模式*Builder 结构型模式7种 适配器模式*Adapter桥接模式Bridge组合模式Composite装饰模式*Decor…

爬虫逆向实战(35)-MyToken数据(MD5加盐)

一、数据接口分析 主页地址&#xff1a;MyToken 1、抓包 通过抓包可以发现数据接口是/ticker/currencyranklist 2、判断是否有加密参数 请求参数是否加密&#xff1f; 通过查看“载荷”模块可以发现有一个code参数 请求头是否加密&#xff1f; 无 响应是否加密&#xf…

SpringBoot ApplicationListener实现发布订阅模式

文章目录 前言一、Spring对JDK的扩展二、快速实现发布订阅模式 前言 发布订阅模式(Publish-Subscribe Pattern)通常又称观察者模式&#xff0c;它被广泛应用于事件驱动架构中。即一个事件的发布&#xff0c;该行为会通过同步或者异步的方式告知给订阅该事件的订阅者。JDK中提供…

Qt散文一

Qt的事件分为普通事件和系统事件&#xff0c;普通事件比如用户按下键盘&#xff0c;系统事件比如定时器事件。事件循环的开始是从main函数的QApplication&#xff0c;然后调用exec()开始的&#xff0c;在执行exec()函数之后&#xff0c;程序将进入事件循环来监听应用程序的事件…

【开源】SpringBoot框架开发智慧社区业务综合平台

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 业务类型模块2.2 基础业务模块2.3 预约业务模块2.4 反馈管理模块2.5 社区新闻模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 业务类型表3.2.2 基础业务表3.2.3 预约业务表3.2.4 反馈表3.2.5 社区新闻表 四、系统展…

蓝桥杯刷题 Day36 倒计时26天 纯练题的一天

[蓝桥杯 2022 省 B] 积木画 题目描述 小明最近迷上了积木画&#xff0c;有这么两种类型的积木&#xff0c;分别为 I 型&#xff08;大小为 2个单位面积) 和 L 型 (大小为 3 个单位面积): 同时&#xff0c;小明有一块面积大小为2N 的画布&#xff0c;画布由2N 个 11 区域构成。…

【Linux】Centos7安装Tomcat9

1.下载安装包 官网地址&#xff1a;https://tomcat.apache.org/download-90.cgi 网盘地址&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1V5I-imDqtGrBgvq6mXtMmg?pwd6666 提取码&#xff1a;6666 2.上传安装包到虚拟机 3.解压到/usr/local/ tar -zxvf apache-to…

使用Cloudflare来给wordpress网站图片自动压缩加速

首先打开Cloudflare Worker 创建一个服务名称随意&#xff0c;内容使用连接内的wordpress-worker.js内容覆盖原内容即可 https://github.com/Mecanik/cloudflare-image-resizing-worker 然后打开触发器然后添加路由&#xff0c;设置你的域名*.example.com/*注意使用通配符使域名…