【1】基本讲解与使用

① ReadWriteLock同Lock一样也是一个接口,提供了readLock和writeLock两种锁的操作机制,一个是只读的锁,一个是写锁。

读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的(排他的)。 每次只能有一个写线程,但是可以有多个线程并发地读数据。

所有读写锁的实现必须确保写操作对读操作的内存影响。换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容。

理论上,读写锁比互斥锁允许对于共享数据更大程度的并发。与互斥锁相比,读写锁是否能够提高性能取决于读写数据的频率、读取和写入操作的持续时间、以及读线程和写线程之间的竞争。

② 使用场景

假设你的程序中涉及到对一些共享资源的读和写操作,且写操作没有读操作那么频繁。

例如,最初填充有数据,然后很少修改的集合,同时频繁搜索(例如某种目录)是使用读写锁的理想候选项。

在没有写操作的时候,两个线程同时读一个资源没有任何问题,所以应该允许多个线程能在同时读取共享资源。但是如果有一个线程想去写这些共享资源,就不应该再有其它线程对该资源进行读或写。这就需要一个读/写锁来解决这个问题。

③ 互斥原则:

  • 读-读能共存,
  • 读-写不能共存,
  • 写-写不能共存。

④ ReadWriteLock 接口源码示例

public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*/
Lock readLock();

/**
* Returns the lock used for writing.
*/
Lock writeLock();
}

其实现类如下:
多线程并发之读写锁(ReentranReadWriteLock&ReadWriteLock)使用详解_可重入


⑤ 使用示例

实例代码如下:

public class TestReadWriteLock {

public static void main(String[] args){
ReadWriteLockDemo rwd = new ReadWriteLockDemo();
//启动100个读线程
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
rwd.get();
}
}).start();
}
//写线程
new Thread(new Runnable() {
@Override
public void run() {
rwd.set((int)(Math.random()*101));
}
},"Write").start();
}
}

class ReadWriteLockDemo{
//模拟共享资源--Number
private int number = 0;
// 实际实现类--ReentrantReadWriteLock,默认非公平模式
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

//读
public void get(){
//使用读锁
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+" : "+number);
}finally {
readWriteLock.readLock().unlock();
}
}
//写
public void set(int number){
readWriteLock.writeLock().lock();
try {
this.number = number;
System.out.println(Thread.currentThread().getName()+" : "+number);
}finally {
readWriteLock.writeLock().unlock();
}
}
}

测试结果如下图:
多线程并发之读写锁(ReentranReadWriteLock&ReadWriteLock)使用详解_可重入_02
首先启动读线程,此时number为0;然后某个时刻写线程修改了共享资源number数据,读线程再次读取最新值!


【2】ReentrantReadWriteLock源码分析

ReentrantReadWriteLock是ReadWriteLock接口的实现类–可重入的读写锁。

① ReentrantReadWriteLock拥有的特性

  • 1.1获取顺序(公平和非公平)

ReentrantReadWriteLock不会为锁定访问强加读或者写偏向顺序,但是它确实是支持可选的公平策略。

  • 非公平模式(默认)

构造为非公平策略(缺省值)时,读写锁的入口顺序未指定,这取决于可重入性约束。持续竞争的非公平锁可以无限期地延迟一个或多个读写器线程,但通常具有比公平锁更高的吞吐量。

  • 公平模式

当构造为公平策略时,线程使用近似的到达顺序策略(队列策略)争夺输入。当释放当前持有的锁时,要么最长等待的单个写入线程将被分配写锁,或者如果有一组读取线程等待的时间比所有等待的写入线程都长,那么该组读线程组将被分配读锁。

如果写入锁被占有,或者存在等待写入线程,则试图获取公平读取锁(非可重入)的线程将阻塞。直到当前等待写入线程中最老的线程获取并释放写入锁之后,该线程才会获取读取锁。当然,如果等待的写入线程放弃等待,剩下一个或多个读取器线程作为队列中最长的等待器而没有写锁,那么这些读取器将被分配读锁。

试图获得公平写锁(非可重入)的线程将阻塞,除非读锁和写锁都是空闲的(这意味着没有等待的线程)。(请注意,非阻塞 ReadLock#tryLock()和{@link WriteLock#tryLock()方法不遵守此公平策略设置,并且如果可能的话将立即获取锁,而不管等待的线程)。

  • 构造器源码如下:
//默认非公平模式
public ReentrantReadWriteLock() {
this(false);
}

//使用给定的策略创建ReentrantReadWriteLock,true--公平 false-nonfair
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

可以看到,默认的构造方法使用的是非公平模式,创建的Sync是NonfairSync对象,然后初始化读锁和写锁。一旦初始化后,ReadWriteLock接口中的两个方法就有返回值了,如下:

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

  • 1.2可重入

这个锁允许读线程和写线程以ReentrantLock的语法重新获取读写锁。在写入线程保持的所有写入锁被释放之前,不允许不可重入的读线程。

另外,写锁(写线程)可以获取读锁,但是不允许读锁(读线程)获取写锁。在其他应用程序中,当对在读锁下执行读取的方法或回调期间保持写锁时,可重入性可能非常有用。


  • 1.3锁降级

可重入特性还允许从写锁降级到读锁—通过获取写锁,然后获取读锁,然后释放写锁。但是,从读锁到写锁的升级是不可能的。

  • 1.4锁获取的中断

在读锁和写锁的获取过程中支持中断 。

  • 1.5支持Condition

Condition详解参考:Condition与Lock使用详解。

写锁提供了Condition实现,​​ReentrantLock.newCondition​​;读锁不支持Condition。

  • 1.6监控

该类支持确定锁是否持有或争用的方法。这些方法是为了监视系统状态而设计的,而不是用于同步控制。


② 特性使用实例

  • 2.1锁降级
class CachedData {
Object data;
//volatile修饰,保持内存可见性
volatile boolean cacheValid;
//可重入读写锁
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
//首先获取读锁
rwl.readLock().lock();
//发现没有缓存数据则放弃读锁,获取写锁
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
//进行锁降级
rwl.writeLock().unlock();
// Unlock write, still hold read
}
}

try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}}

  • 2.2集合使用场景

通常可以在集合使用场景中看到ReentrantReadWriteLock的身影。不过只有在集合比较大,读操作比写操作多,操作开销大于同步开销的时候才是值得的。

实例如下:

class RWDictionary {
//集合对象
private final Map<String, Data> m = new TreeMap<String, Data>();
//读写锁
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
//获取读锁
private final Lock r = rwl.readLock();
//获取写锁
private final Lock w = rwl.writeLock();

public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}}

【3】Sync、FairSync和NonfairSync

再次回顾ReentrantReadWriteLock构造方法:

public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

从上面可以看到,构造方法决定了Sync是FairSync还是NonfairSync。Sync继承了AbstractQueuedSynchronizer,而Sync是一个抽象类,NonfairSync和FairSync继承了Sync,并重写了其中的抽象方法。参考博文:队列同步器AQS-AbstractQueuedSynchronizer 原理分析。

① Sync的两个抽象方法

Sync中提供了很多方法,但是有两个方法是抽象的,子类必须实现。

// 如果当前线程在试图获取读取锁时由于线程等待策略而应该阻塞,则返回true。
abstract boolean readerShouldBlock();

//如果当前线程在试图获取写锁时由于线程等待策略而应该阻塞,则返回true。
abstract boolean writerShouldBlock();

writerShouldBlock和readerShouldBlock方法都表示当有别的线程也在尝试获取锁时,是否应该阻塞。


② FairSync类:

static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}

对于公平模式,hasQueuedPredecessors()方法表示前面是否有等待线程。一旦前面有等待线程,那么为了遵循公平,当前线程也就应该被挂起。


③ NonfairSync 类:

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}

从上面可以看到,非公平模式下,writerShouldBlock直接返回false,说明不需要阻塞,可以直接获取锁。

而readShouldBlock调用了apparentFirstQueuedIsExcluisve()方法。该方法在当前线程是写锁占用的线程时,返回true,否则返回false。即,如果当前有一个写线程正在写,那么该读线程应该阻塞。


④ Sync的几个成员变量和静态内部类

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;

/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/

static final int SHARED_SHIFT = 16;//共享移位 16
static final int SHARED_UNIT = (1 << SHARED_SHIFT);//65536
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//65535

//返回shared保持的计数-无符号右移
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//返回exclusive 保持的计数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

//用于每线程读取保持计数的计数器。作为一个ThreadLocal,缓存在cachedHoldCounter
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

⑤ 核心方法tryReadLock

//为读取执行tryLock,在两种模式下"驳船"行为都可用。这实际上与tryAcquireShared相同,只是没有调用readerShouldBlock。
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
//如果有写锁占用,返回false
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
//判断是否达到最大值
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//CAS算法更新
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}

⑥ 核心方法tryWriteLock

//为写入执行tryLock,在两种模式下"驳船"行为都可用。这实际上与tryAcquire 相同,只是没有调用writerShouldBlock。
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
//获取持有计数
int w = exclusiveCount(c);
// 0 表示没有获取到锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//已经达到最大值
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
//CAS出了问题
if (!compareAndSetState(c, c + 1))
return false;
//设置当前线程为拥有独占访问权的线程
setExclusiveOwnerThread(current);//AQS的setExclusiveOwnerThread
return true;
}

【4】ReadLock

读锁源码实例如下:

public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;

protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

//如果写锁没有被其他线程获取则获取读锁并立即返回。
// 如果写入锁由另一个线程持有,则当前线程出于线程调度目的而禁用,并处于休眠状态,直到获取了读锁。
public void lock() {
sync.acquireShared(1);//共享模式 使用AQS的acquireShared方法
}

//获取读锁除非当前线程被中断;
//如果写锁没有被其他线程获取则获取读锁并立即返回。
// 如果写入锁由另一个线程持有,则当前线程出于线程调度目的而禁用,并处于休眠状态,直到获取了读锁。
//如果读锁被当前线程获取,或者被别的线程中断了当前线程。在上述情况下,如果当前线程 在进入该方法时已经设置了中断状态或
//者在获取读锁时被中断,则抛出InterruptedException并将当前线程的中断状态清除。
//在该实现中,由于该方法是显式的中断点,因此相对于锁的正常或可重入获取,优先考虑对中断作出响应。
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);//同样调用AQS的acquireSharedInterruptibly
}

//在调用时写入锁不被另一个线程持有时才获取读锁,并立即返回true。
//即使将此锁设置为使用公平排序策略,如果锁可用,则调用{tryLock()}将立即获取读取锁,而不管其他线程当前是否正在等待读取锁。
//这种“驳船行为”在某些情况下是有用的,即使它破坏了公平性。
//如果希望遵守此锁的公平性设置,则使用{@link#tryLock(long,TimeUnit)tryLock(0,TimeUnit.SECONDS)}这几乎等效(它还检测中断)。
//如果写锁被其他线程获取,则立即返回false。
public boolean tryLock() {
return sync.tryReadLock();//Sync的tryReadLock方法
}

//如果写入锁在给定的等待时间内没有被其他线程持有,并且当前线程没有被中断,则获取读取锁。
//如果写锁没有被其他线程持有则获取读锁并返回true。
//如果该锁已被设置为使用公平排序策略,那么如果任何其他线程正在等待该锁,则当前线程不会获取可用的读锁。
//这与tryLock方法形成对比。
//如果想要一个Timed的tryLock在一个公平锁上,你可以使用如下两种方式结合在一起:
* if (lock.tryLock() ||
* lock.tryLock(timeout, unit)) {
* ...
* }}
//如果写锁被其他线程持有则当前线程由于线程调度目的进入睡眠直到下面事情之一发生:
* 读锁被当前线程获取;
* 别的线程中断了当前线程
* 指定的等待时间过去。
//如果获取了读锁则返回true。
//如果当前线程 在进入该方法时已经设置了中断状态或者在获取读锁时被中断,则抛出InterruptedException并将当前线程的中断状态清除。
//如果指定等待时间过去则返回false。如果时间小于或等于零,则该方法根本不会等待。
//在该实现中,由于该方法是显式的中断点,因此优先考虑响应中断,而不是正常或可重入地获取锁及报告等待时间的流逝。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));//AQS的tryAcquireSharedNanos
}

//试图释放锁,如果读取器的数量现在是零,那么写锁可以获取。
public void unlock() {
sync.releaseShared(1);//AQS的releaseShared
}

//ReadLocks不支持Condition ,会抛出UnsupportedOperationException
public Condition newCondition() {
throw new UnsupportedOperationException();
}

}

【5】WriteLock

WriteLock 源码实例如下:

public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;

protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}


//如果读锁和写锁均未被其他线程持有,则获取写锁并设置写锁计数为one。
//如果当前线程已经持有写锁,则保持计数递增1,并且方法立即返回。
//如果锁由另一个线程持有,则当前线程出于线程调度目的而禁用,并处于休眠状态,
//直到获取了写锁,此时将写锁保持计数设置为one。
public void lock() {
sync.acquire(1);//这里使用AQS的acquire方法
}
//获取写锁除非当前线程被中断。
//如果读锁和写锁均未被其他线程持有,则获取写锁并设置写锁计数为one。
//如果当前线程已经持有写锁,则保持计数递增1,并且方法立即返回。
如果锁由另一个线程持有,则当前线程出于线程调度目的而禁用,并处于休眠状态,直到下面事情发生:
* 当前线程获取了写锁;
* 别的线程中断了当前线程
//如果当前线程在进入这个方法时已经设置了它的中断状态或者当获取写锁时被中断则抛出InterruptedException并清空中断状态
//在该实现中,由于该方法是显式的中断点,因此相对于锁的正常或可重入获取,优先考虑对中断作出响应。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);//AQS的acquireInterruptibly
}

//如果没有被其他线程在调用该方法时获取写锁,则当前线程获取写锁。
//如果没有读锁或者写锁被其他线程持有,则当前线程获取写锁并返回true且设置写锁持有计数为one。
//即使锁被设置了公平策略,调用tryLock()方法将会立即获取锁(如果锁可用),不管当前是否有其他线程在等待写锁。
//这种“驳船行为”在某些情况下是有用的,即使它破坏了公平性。
//如果想保持锁的公平性,则尝试使用tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) 二者是等效的。
//如果当前线程已经持有了锁,则将持有计数+1并返回true。
//如果锁被其他线程持有则立即返回false。
public boolean tryLock( ) {
return sync.tryWriteLock();//Sync的tryWriteLock
}

//获取写锁,如果锁没有被其他线程持有在等待时间内并且当前线程没有被中断。
//如果没有读锁或者写锁被其他线程持有则获取写锁并且返回true,同时将写锁持有计数设置为one。
//如果锁被设置了公平策略,则可能获取不到可用的锁如果有其他的线程在等待这个写锁。
//这与tryLock()方法形成了对比。
//如果想要一个定时的tryLock可以使用如下方式结合:
* if (lock.tryLock() ||
* lock.tryLock(timeout, unit)) {
* ...
* }}</pre>
//如果当前线程已经持有锁,则将持久计数+1,然后返回true。
//如果锁被其他线程持有,则出于线程调度目的当前线程将会进入睡眠状态直到以下三件事情发生之一:
* 当前线程获取写锁;
* 别的线程中断了当前线程;
* 指定等待时间过去。
//如果当前线程获取到写锁则返回true并将写锁持有计数设置为one。
//如果当前线程在进入该方法时设置了中断状态或者在尝试获取写锁时被中断,则会抛出InterruptedException并将中断状态清空。
//如果指定等待时间过去,则返回false。如果time小于或者等于0,方法不会再等待。
//在该实现中,由于该方法是显式的中断点,因此优先考虑响应中断,而不是正常或可重入地获取锁及报告等待时间的流逝 。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));//AQS的tryAcquireNanos
}

//试图释放锁。如果当前线程是锁的持有者,则将持有计数减一。如果持有计数为0,则释放锁。
//如果当前线程不是锁的持有者,则抛出IllegalMonitorStateException。
public void unlock() {
sync.release(1);//AQS的release
}

//返回Lock实例使用的Condition实例-写锁支持,读锁不支持Condition。
//Condition实例支持类似于Object监视器方法如wait/notify/notifyAll的监视器方法,如Condition.await(),signal(),signalAll().
//当调用Condition方法但是写锁没有被持有则抛出IllegalMonitorStateException。
//(读取锁独立于写入锁保存,因此不会被检查或受到影响。然而,在当前线程还获取了读锁时,
//调用条件等待方法本质上总是一个错误,因为其他可能解锁的线程将无法获取写锁。)
//当Condition#await()方法被调用时,写锁被释放,并且在它们返回之前,重新获取写锁,并且锁保持计数恢复到调用方法时的状态。
//当一个线程在等待的时候被中断,则抛出InterruptedException并且将该线程的中断状态清空。
//等待线程以FIFO顺序被唤醒。
//对于从等待方法返回的线程,重新获取锁的顺序与最初获取锁的线程相同,在默认情况下,没有指定锁,
//但是对于公平锁,那些等待时间最长的线程更有利。
public Condition newCondition() {
return sync.newCondition();
}

public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}

//查询写锁是否被当前线程持有
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();//Sync的isHeldExclusively
}

//查询当前线程对这个写锁的保持次数。线程对每个未与解锁操作匹配的锁操作都持有锁
public int getHoldCount() {
return sync.getWriteHoldCount();//Sync的getWriteHoldCount
}
}

ReentrantReadWriteLock.Sync.getWriteHoldCount源码如下:

final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

waiting…