0
点赞
收藏
分享

微信扫一扫

深入学习java源码之ReadWriteLock.readLock()与ReadWriteLock.writeLock()


深入学习java源码之ReadWriteLock.readLock()与ReadWriteLock.writeLock()

假设你的程序中涉及到对一些共享资源的读和写操作,且写操作没有读操作那么频繁。在没有写操作的时候,两个线程同时读一个资源没有任何问题,所以应该允许多个线程能在同时读取共享资源。但是如果有一个线程想去写这些共享资源,就不应该再有其它线程对该资源进行读或写(译者注:也就是说:读-读能共存,读-写不能共存,写-写不能共存)。这就需要一个读/写锁来解决这个问题。

对于lock的读写锁,可以通过new ReentrantReadWriteLock()获取到一个读写锁。所谓读写锁,便是多线程之间读不互斥,读写互斥。读写锁是一种自旋锁,如果当前没有读者,也没有写者,那么写者可以立刻获得锁,否则它必须自旋在那里,直到没有任何写者或读者。如果当前没有写者,那么读者可以立即获得该读写锁,否则读者必须自旋在那里,直到写者释放该锁。

我们假设对写操作的请求比对读操作的请求更重要,就要提升写请求的优先级。此外,如果读操作发生的比较频繁,我们又没有提升写操作的优先级,那么就会产生“饥饿”现象。请求写操作的线程会一直阻塞,直到所有的读线程都从ReadWriteLock上解锁了。如果一直保证新线程的读操作权限,那么等待写操作的线程就会一直阻塞下去,结果就是发生“饥饿”。因此,只有当没有线程正在锁住ReadWriteLock进行写操作,且没有线程请求该锁准备执行写操作时,才能保证读操作继续。

两个线程在一起进行读操作

public class Main {  
public static void main(String[] args) {
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
final Main m = new Main();

new Thread(){
public void run() {
m.read(Thread.currentThread());
};
}.start();

new Thread(){
public void run() {
m.read(Thread.currentThread());
};
}.start();
}
public void read(Thread thread) {
readWriteLock.readLock().lock();
try {
long startTime = System.currentTimeMillis();
while(System.currentTimeMillis() - startTime <= 1) {
System.out.println(thread.getName()+"线程在进行读操作");
}
System.out.println(thread.getName()+"线程完成读操作");
} finally {
readWriteLock.unlock();
}
}
}

可以看到两个线程同时进行读操作,效率大大的提升了。但是要注意的是,如果一个线程获取了读锁,那么另外的线程想要获取写锁则需要等待释放;而如果一个线程已经获取了写锁,则另外的线程想获取读锁或写锁都需要等待写锁被释放。

 

当其它线程没有对共享资源进行读操作或者写操作时,某个线程就有可能获得该共享资源的写锁,进而对共享资源进行写操作。有多少线程请求了写锁以及以何种顺序请求写锁并不重要,除非你想保证写锁请求的公平性。

public class Main {
//静态内部类实现线程共享
static class Example{
//创建lock
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
//读操作
public void read(){
//获取读锁并上锁
lock.readLock().lock();
try{
System.out.println("读线程开始");
Thread.sleep(1000);
System.out.println("读线程结束");
}catch (Exception e){
e.printStackTrace();
}finally{
//解锁
lock.readLock().unlock();
}
}
//写操作
public void write(){
//获取写锁并上锁
lock.writeLock().lock();
try{
System.out.println("写线程开始");
Thread.sleep(1000);
System.out.println("写线程结束");
}catch (Exception e){
e.printStackTrace();
}finally {
//解锁
lock.writeLock().unlock();
}
}
}

public static void main(String[] args) {
final Example example = new Example();
new Thread(new Runnable() {
@Override
public void run() {
while (true){
example.read();
example.write();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while (true){
example.read();
example.write();
}
}
}).start();
}
}

运行结果:

深入学习java源码之ReadWriteLock.readLock()与ReadWriteLock.writeLock()_读写锁

根据结果可以发现,多线程下ReentrantReadWriteLock可以多线程同时读,但写的话同一时刻只有一个线程执行。

 

读写锁的应用场景,我们可以利用读写锁可是实现一个多线程下数据缓存的功能,具体实现思路如下:

class dataCatch{
Object data; //缓存的数据
public volatile Boolean isCatch = false; //是否有缓存
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //生成读写锁

public void process(){
lock.readLock().lock(); //先加读锁,此时数据不会被修改
//数据没有缓存
if(!isCatch){
lock.readLock().unlock(); //解读锁
lock.writeLock().lock(); //加写锁,此时数据不会被读到
/********
*
* 执行数据查询操作并赋值给data
*
********/
isCatch = true;
lock.readLock().lock(); //先加读锁后解写锁
lock.writeLock().lock();
}
/********
*
* 放回data数据给用户
*
********/
lock.readLock().unlock(); //解读锁
}
}

 

总结:

1.synchronized是Java的关键字,是内置特性,而Lock是一个接口,可以用它来实现同步。

2.synchronized同步的时候,其中一条线程用完会自动释放锁,而Lock需要手动释放,如果不手动释放,可能会造成死锁。

3.使用synchronized如果其中一个线程不释放锁,那么其他需要获取锁的线程会一直等待下去,知道使用完释放或者出现异常,而Lock可以使用可以响应中断的锁或者使用规定等待时间的锁

4.synchronized无法得知是否获取到锁,而Lcok可以做到。

5.用ReadWriteLock可以提高多个线程进行读操作的效率。

所以综上所述,在两种锁的选择上,当线程对于资源的竞争不激烈的时候,效率差不太多,但是当大量线程同时竞争的时候,Lock的性能会远高于synchronized。

 

java源码

A ​​ReadWriteLock​​维护一对关联的​​locks​​ ,一个用于只读操作,一个用于写入。 ​​read lock​​可以由多个阅读器线程同时进行,只要没有作者。 ​​write lock​​是独家的。

所有​​ReadWriteLock​​​实现必须保证的存储器同步效应​​writeLock​​​操作(如在指定​​Lock​​接口)也保持相对于所述相关联的​​readLock​​ 。 也就是说,一个线程成功获取读锁定将会看到在之前发布的写锁定所做的所有更新。

读写锁允许访问共享数据时的并发性高于互斥锁所允许的并发性。 它利用了这样一个事实:一次只有一个线程( 写入线程)可以修改共享数据,在许多情况下,任何数量的线程都可以同时读取数据(因此读取器线程)。 从理论上讲,通过使用读写锁允许的并发性增加将导致性能改进超过使用互斥锁。 实际上,并发性的增加只能在多处理器上完全实现,然后只有在共享数据的访问模式是合适的时才可以。

读写锁是否会提高使用互斥锁的性能取决于数据被读取的频率与被修改的频率相比,读取和写入操作的持续时间以及数据的争用 - 即是,将尝试同时读取或写入数据的线程数。 例如,最初填充数据的集合,然后经常被修改的频繁搜索(例如某种目录)是使用读写锁的理想候选。 然而,如果更新变得频繁,那么数据的大部分时间将被专门锁定,并且并发性增加很少。 此外,如果读取操作太短,则读写锁定实现(其本身比互斥锁更复杂)的开销可以支配执行成本,特别是因为许多读写锁定实现仍将序列化所有线程通过小部分代码。 最终,只有剖析和测量将确定使用读写锁是否适合您的应用程序。

虽然读写锁的基本操作是直接的,但是执行必须做出许多策略决策,这可能会影响给定应用程序中读写锁定的有效性。 这些政策的例子包括:

在评估应用程序的给定实现的适用性时,应考虑所有这些问题。

  • 在写入器释放写入锁定时,确定在读取器和写入器都在等待时是否授予读取锁定或写入锁定。 作家偏好是常见的,因为写作预计会很短,很少见。 读者喜好不常见,因为如果读者经常和长期的预期,写作可能导致漫长的延迟。 公平的或“按顺序”的实现也是可能的。
  • 确定在读卡器处于活动状态并且写入器正在等待时请求读取锁定的读取器是否被授予读取锁定。 读者的偏好可以无限期地拖延作者,而对作者的偏好可以减少并发的潜力。
  • 确定锁是否可重入:一个具有写锁的线程是否可以重新获取? 持有写锁可以获取读锁吗? 读锁本身是否可重入?
  • 写入锁可以降级到读锁,而不允许插入写者? 读锁可以升级到写锁,优先于其他等待读者或作者吗?

Modifier and Type

Method and Description

​Lock​

​readLock()​

返回用于阅读的锁。

​Lock​

​writeLock()​

返回用于写入的锁。

package java.util.concurrent.locks;

public interface ReadWriteLock {
Lock readLock();

Lock writeLock();
}

 

Modifier and Type

Method and Description

​protected Thread​

​getOwner()​

返回当前拥有写锁的线程,如果不拥有,则返回 ​​null​​ 。

​protected Collection<Thread>​

​getQueuedReaderThreads()​

返回一个包含可能正在等待获取读取锁的线程的集合。

​protected Collection<Thread>​

​getQueuedThreads()​

返回一个包含可能正在等待获取读取或写入锁定的线程的集合。

​protected Collection<Thread>​

​getQueuedWriterThreads()​

返回一个包含可能正在等待获取写入锁的线程的集合。

​int​

​getQueueLength()​

返回等待获取读取或写入锁定的线程数的估计。

​int​

​getReadHoldCount()​

查询当前线程对此锁的可重入读取保留数。

​int​

​getReadLockCount()​

查询为此锁持有的读取锁的数量。

​protected Collection<Thread>​

​getWaitingThreads(Condition​

返回包含可能在与写锁相关联的给定条件下等待的线程的集合。

​int​

​getWaitQueueLength(Condition​

返回与写入锁相关联的给定条件等待的线程数的估计。

​int​

​getWriteHoldCount()​

查询当前线程对此锁的可重入写入数量。

​boolean​

​hasQueuedThread(Thread​

查询给定线程是否等待获取读取或写入锁定。

​boolean​

​hasQueuedThreads()​

查询是否有任何线程正在等待获取读取或写入锁定。

​boolean​

​hasWaiters(Condition​

查询任何线程是否等待与写锁相关联的给定条件。

​boolean​

​isFair()​

如果此锁的公平设置为true,则返回 ​​true​​ 。

​boolean​

​isWriteLocked()​

查询写锁是否由任何线程持有。

​boolean​

​isWriteLockedByCurrentThread()​

查询写锁是否由当前线程持有。

​ReentrantReadWriteLock.ReadLock​

​readLock()​

返回用于阅读的锁。

​String​

​toString()​

返回一个标识此锁的字符串以及其锁定状态。

​ReentrantReadWriteLock.WriteLock​

​writeLock()​

返回用于写入的锁。

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Collection;

public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

public ReentrantReadWriteLock() {
this(false);
}

public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

private transient ThreadLocalHoldCounter readHolds;

private transient HoldCounter cachedHoldCounter;

private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}

abstract boolean readerShouldBlock();

abstract boolean writerShouldBlock();


protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}

protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}

final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}

protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

// Methods relayed to outer class

final ConditionObject newCondition() {
return new ConditionObject();
}

final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}

final int getReadLockCount() {
return sharedCount(getState());
}

final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}

final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;

Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;

HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;

int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}

private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}

final int getCount() { return getState(); }
}

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}

static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}

public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;

protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean tryLock() {
return sync.tryReadLock();
}

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void unlock() {
sync.releaseShared(1);
}

public Condition newCondition() {
throw new UnsupportedOperationException();
}

public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}

public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;

protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

public void lock() {
sync.acquire(1);
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public boolean tryLock( ) {
return sync.tryWriteLock();
}

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public void unlock() {
sync.release(1);
}

public Condition newCondition() {
return sync.newCondition();
}

public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}

public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}

public int getHoldCount() {
return sync.getWriteHoldCount();
}
}

// Instrumentation and status
public final boolean isFair() {
return sync instanceof FairSync;
}

protected Thread getOwner() {
return sync.getOwner();
}

public int getReadLockCount() {
return sync.getReadLockCount();
}

public boolean isWriteLocked() {
return sync.isWriteLocked();
}

public boolean isWriteLockedByCurrentThread() {
return sync.isHeldExclusively();
}

public int getWriteHoldCount() {
return sync.getWriteHoldCount();
}

public int getReadHoldCount() {
return sync.getReadHoldCount();
}

protected Collection<Thread> getQueuedWriterThreads() {
return sync.getExclusiveQueuedThreads();
}

protected Collection<Thread> getQueuedReaderThreads() {
return sync.getSharedQueuedThreads();
}

public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}

public final int getQueueLength() {
return sync.getQueueLength();
}

protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}

public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}

public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}

protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}

public String toString() {
int c = sync.getCount();
int w = Sync.exclusiveCount(c);
int r = Sync.sharedCount(c);

return super.toString() +
"[Write locks = " + w + ", Read locks = " + r + "]";
}

static final long getThreadId(Thread thread) {
return UNSAFE.getLongVolatile(thread, TID_OFFSET);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long TID_OFFSET;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
TID_OFFSET = UNSAFE.objectFieldOffset
(tk.getDeclaredField("tid"));
} catch (Exception e) {
throw new Error(e);
}
}

}

 

 

 

 

 

举报

相关推荐

0 条评论