ReentrantReadWriteLock源码探究

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

ReentrantReadWriteLock源码探究

纳兰小依   2020-03-22 我要评论
`ReentrantReadWriteLock`实现了可重入的读锁和写锁,其中读锁是共享锁,写锁是互斥锁。与`ReentrantLock`类似,`ReentrantReadWriteLock`也提供了公平锁和非公平锁两种实现,以满足不同的场景。因此,实际在使用时,会涉及到读锁、写锁、公平锁、非公平锁四个不同的概念,这也使得`ReentrantReadWriteLock`更加复杂一些。 ## 1.核心字段与构造器 ``` private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } ``` `ReentrantReadWriteLock`通过`ReadLock`和`WriteLock`两个内部类分别实现读锁和写锁,并且默认的构造器使用非公平锁。`Sync`这个内部类同样继承了`AbstractQueuedSynchronizer(AQS)`,排队等候的逻辑都交由`AQS`实现,接下来分别看一下读锁和写锁的加锁逻辑。 ## 2.读锁的加锁逻辑 ``` public void lock() { sync.acquireShared(1); } //位于AQS public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } //位于Sync protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //如果写锁已经被占有,并且占有者不是当前线程,则返回-1,即写锁被其他线程占有时不能获取读锁 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); //如果读锁不需要阻塞,并且读锁的获取数量没有达到最大值(2^16-1),则尝试将读锁的持有数量加1(注意是在高16位加1), //如果加1操作能够成功,则表示当前线程成功获取读锁 //注意,公平读锁和非公平读锁的readerShouldBlock()方法逻辑是不一样的 //非公平读锁在等待队列第一个线程请求写锁时会返回true,其他情况都返回false //公平读锁会查看当前等待队列中是否有其他线程在等待 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //r == 0表示当前线程是第一个获取读锁的线程,将firstReader指向自己,并初始化firstReaderHoldCount字段 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //如果firstReader已经指向了自己,就将firstReaderHoldCount加1,表示当前线程作为第一个获取读锁的线程,共获取读锁的次数 firstReaderHoldCount++; } else { //cachedHoldCounter记录的是最后一个获取读锁的线程 //使用cachedHoldCounter可以节省在ThreadLocal中查找操作 HoldCounter rh = cachedHoldCounter; //如果cachedHoldCounter还没初始化,或者最后一个获取读锁的线程不是当前线程,就从ThreadLocal中查看当前线程对应的HoldCounter //注意,如果无法在ThreadLocal中查到当前线程的记录,那么就会新建一个HoldCounter加入ThreadLocalMap中,对应的count字段初始化为0 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //执行到这里,说明rh != null,并且当前线程是最后一个获取读锁的线程,此时更新ThreadLocalMap中的value值 else if (rh.count == 0) readHolds.set(rh); //将读锁的持有次数加1 rh.count++; } //获取读锁成功,则返回1 return 1; } return fullTryAcquireShared(current); } //读写锁使用state字段的低16位表示写锁,高16位表示读锁 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //获取读锁被持有的次数,数量由state的高16位表示 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //如果写锁被其他线程占有,或者当前获取读锁的线程需要阻塞,就返回-1,如果获取读锁成功则返回1,其他情况会继续自旋 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; //自旋 for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { //如果写锁已经被其他线程占有,则返回-1 if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { //获取最后一个获取锁的线程 rh = cachedHoldCounter; //如果当前线程不是最后一个获取锁的线程,则从ThreadLocalMap中取出HoldCount对象 if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); //当前线程没有获取读锁,就将ThreadLocalMap中存的HoldCount清理掉 //因为根据上面的代码逻辑,走到这里的时候,说明当前获取读锁的线程应该阻塞, //即无法获取读锁,这种情况下需要清理ThreadLocalMap中的记录 if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //读锁获取数量达到最大,抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //如果成功更新state的值,则表示读锁获取成功,否则会继续自旋 if (compareAndSetState(c, c + SHARED_UNIT)) { //如果当前线程是第一个持有读锁的线程,就设置firstReader字段 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) //从ThreadLocal中查找当前线程对应的HoldCounter对象 rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //更新当前线程持有锁的数量 rh.count++; //将cachedHoldCounter指向当前线程的HoldCounter对象 cachedHoldCounter = rh; // cache for release } return 1; } } } ``` `tryAcquireShared()`方法用到了`ThreadLocal`来记录当前线程获取的读锁数量,有兴趣的话可以参考[ThreadLocal源码探究 (JDK 1.8)](https://www.cnblogs.com/NaLanZiYi-LinEr/p/12430457.html)了解`ThreadLocal`的实现细节。公平锁和非公平锁以不同的方式实现了`readerShouldBlock()`方法,接下来分别讲解公平锁和非公平锁的实现。 - 非公平锁的`readerShouldBlock()`实现 ``` //由名字可以看出,这个方法主要是判断读锁是否应该阻塞 final boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ return apparentlyFirstQueuedIsExclusive(); } //如果等待队列的第一个线程请求互斥锁,则返回true,表示应该阻塞当前的读锁 //由readerShouldBlock()方法的注释了解到,非公平的读锁在发现等待队列头节点请求互斥锁时, //需要进行阻塞而不是抢锁,是为了避免读请求太多的情况下造成写锁线程饥饿 final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; //head != null表示等待队列已初始化, //h.next != null && !s.isShared()表示队列的头结点请求的是互斥锁 //s.thread != null表示头结点对应的线程还没有开始执行 return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } ``` 对于非公平锁的读锁来说,在发现等待队列中的第一个线程请求写锁时,会主动取消抢锁,是为了避免请求写锁的线程饥饿,这是与`ReentrantLock`中的非公平锁一个很大的不同。与非公平锁不同的是,公平锁会先查看队列中是否有其他线程在等待。 - 公平锁的`readerShouldBlock()`实现 ``` final boolean readerShouldBlock() { return hasQueuedPredecessors(); } public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //只要队列中有其他节点在等候,公平锁就要求其他线程排队等待 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } ``` 至此`tryAcquireShared()`方法的逻辑已经介绍完了,在该方法返回`-1`时,表示当前无法获取读锁,就会接着执行`doAcquireShared()`方法,来看看该方法的源码: ``` private void doAcquireShared(int arg) { //将当前线程构造成节点,放到等待队列的末尾 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //中断标志 boolean interrupted = false; for (;;) { //获取当前线程的前一个节点 final Node p = node.predecessor(); //只有在当前节点是队列中的第一个有效节点时,才会执行下面的语句 if (p == head) { //尝试获取读锁,获取成功则r>0,失败则r<0 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC //如果线程设置了中断标记,则将当前线程中断 if (interrupted) selfInterrupt(); failed = false; return; } } //当前线程不是队列第一个有效节点,或者获取读锁失败,就阻塞等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //线程中断之后,设置interrupted=true,之后代码逻辑会自旋一次,会在for循环的第一个条件语句中用到该字段 interrupted = true; } } finally { //如果线程被中断,或者出现异常时,failed=true,需要通过cancelAcquire()方法放弃获取锁 if (failed) cancelAcquire(node); } } ``` `doAcquireShared()`方法的逻辑与同在`AQS`中的另一个方法`doAcquireSharedInterruptibly()`非常相似,在[CountDownLatch源码探究 (JDK 1.8)](https://www.cnblogs.com/NaLanZiYi-LinEr/p/12391903.html)对`doAcquireSharedInterruptibly()`有详细的解释,包括`shouldParkAfterFailedAcquire()`和`parkAndCheckInterrupt()`则两个方法,有兴趣的话可以参考,本文不再讨论这些方法的细节。 ## 3.写锁的加锁逻辑 介绍完读锁的加锁逻辑之后,接下来看看写锁加锁的实现原理: ``` public void lock() { sync.acquire(1); } //加锁的代码与ReentrantLock一样,复用了AQS中的处理框架 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //ReentrantReadWriteLock的内部类Sync重写了tryAcquire()方法 protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) //①如果w = 0,但是c!=0,说明读锁已结被占有,直接返回false表示获取写锁失败 //②如果w != 0,说明写锁已经被占有,需要判断是不是当前线程占有写锁 if (w == 0 || current != getExclusiveOwnerThread()) return false; //判断写锁的持有次数有没有超限 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //更新持有的写锁数量,这里可以看出写锁是可以重入的 setState(c + acquires); return true; } //走到这里,说明c=0,即当前读锁和写锁都没有被占有,公平锁会先检查队列中有没有其他线程在等待锁,非公平锁不会阻塞 //如果writerShouldBlock()反复返回false,才会考虑设置state字段,设置成功表示成功获取写锁,否则返回false表示获取写锁失败 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //记录当前线程持有写锁 setExclusiveOwnerThread(current); return true; } ``` 公平锁和非公平锁同样都重写了`writerShouldBlock()`方法,非公平锁的实现非常简单,直接返回`false`,表示非公平的写锁不需要阻塞;公平锁会检查等待队列中是否有其他线程在等待获取锁,两种实现方式的源码如下: ``` //公平锁的实现 final boolean writerShouldBlock() { return hasQueuedPredecessors(); } //非公平锁的实现 final boolean writerShouldBlock() { return false; // writers can always barge } ``` ## 4.释放锁 - 读锁释放锁 ``` public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } //当state=0时表示读锁已完全释放,才会返回true,其他情况返回false protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //判断当前线程是否是第一个获取读锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; //firstReaderHoldCount=1,说明读锁只被当前线程占有1次,释放之后更新firstReader的值 if (firstReaderHoldCount == 1) firstReader = null; //如果当前线程多次持有读锁,则将计数减1 else firstReaderHoldCount--; } else { //执行到这里,说明当前线程不是第一个获取读锁的线程 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { //如果当前线程获取的读锁次数<=1,在释放锁的时候,需要清除ThreadLocal中的记录 readHolds.remove(); //没有持有读锁的线程释放读锁会报错 if (count <= 0) throw unmatchedUnlockException(); } //将当前线程读锁的重入次数减1 --rh.count; } for (;;) { int c = getState(); //更新读锁的值 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. //nextc=0,说明读锁已释放,返回true,否则返回false //由于读锁是共享锁,可以有多个线程同时获取读锁,只有最后一个持有读锁的线程完全释放读锁,才会返回true return nextc == 0; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //头结点是SIGNAL状态时,将其状态更新成0,该操作会一直自旋重试,直到修改成功,成功之后会唤醒后面的等待线程 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } //如果头结点不是SIGNAL状态,就自旋将其更新为PROPAGATE状态 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //h==head说明,从for循环开始到现在头结点没用发生变化 //注意:当线程释放锁的时候,会修改头结点 if (h == head) // loop if head changed break; } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //waitStatus>0只有CANCELLED状态,代表节点放弃获取锁 if (s == null || s.waitStatus > 0) { s = null; //从队列尾部开始向前查找,目的是寻找node节点后第一个非CANCELLED状态的节点,并将s指向该节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒s节点对应的线程 if (s != null) LockSupport.unpark(s.thread); } ``` 由于读锁是共享锁,并且是可重入锁,因此在最后一个持有读锁的线程最后一次释放读锁时,读锁才能真正被释放,此时才会通过`doReleaseShared()`方法唤醒队列中的等待线程。 - 写锁释放锁 ``` public void unlock() { sync.release(1); } public final boolean release(int arg) { //如果线程是释放写锁成功,则唤醒后面的等待线程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { //线程没有持有写锁不允许释放写锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; //释放写锁成功,则设置exclusiveOwnerThread=null,表示写锁目前没有被任何线程占有 if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } //判断当前线程是否持有写锁 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } ``` ## 5.其他介绍 - `HoldCounter` `HoldCounter`是用来记录线程持有读锁的数量,源码中使用`cachedHoldCounter`来记录最后一个获取读锁的是哪个线程,由于代码很简单,因此前文并未对其进行介绍,这里做一下简单讲解: ``` /** * A counter for per-thread read hold counts. * Maintained as a ThreadLocal; cached in cachedHoldCounter */ static final class HoldCounter { //记录线程持有读锁的次数 int count = 0; // Use id, not reference, to avoid garbage retention //记录线程id final long tid = getThreadId(Thread.currentThread()); } ``` 与`HoldCounter`配合使用的是`ThreadLocalHoldCounter`类,使用`readHolds`字段维持对该类的引用,下面是`ThreadLocalHoldCounter`的源码: ``` /** * ThreadLocal subclass. Easiest to explicitly define for sake * of deserialization mechanics. */ //继承了ThreadLocal static final class ThreadLocalHoldCounter extends ThreadLocal

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们