坚持看完相信你会有收获
可重入锁, 表示同一个线程可重复对某个资源进行上锁. 下面为了更好的理解可重入锁, 先来了解下不可重入锁.
简单写了示例帮助了解不可重入锁
// 在Java中, 可以很好的设计一种不可重入锁.
class NonReentrantLock {
private volatile boolean isLock;
private Thread ownerThread;
// 获取锁的方法
public synchronized void lock() {
// 判断锁的同时没有判断获取锁的线程, 导致线程重复获取锁会睡眠
// 关键的地方就是下面的while循环.
while (isLock) {
try {
// 让线程进入睡眠的方法.
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ownerThread = Thread.currentThread();
isLock = true;
}
// 释放锁的方法
public synchronized void unlock() {
if (ownerThread != null && ownerThread == Thread.currentThread()) {
isLock = false;
notifyAll();
} else {
throw new IllegalMonitorStateException();
}
}
}
}
上面的代码是简单写了一个不可重入锁. 当其他线程试图获取锁或已经获取锁的线程再次获取锁时, 会一直阻塞, 最后导致线程睡眠.
下面对上面的代码进行改造, 使其成为可重入锁.
private static class MyReentrantLock {
// 查看是否已经上锁.
private volatile boolean isLock;
private Thread ownerThread;
// 查看锁状态.
private volatile int state;
// 获取锁
public synchronized void lock() {
// 获取锁时, 通过判断已经占用锁的线程是否为当前线程
while (isLock && ownerThread != Thread.currentThread()) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ownerThread = Thread.currentThread();
isLock = true;
state++;
}
// 释放锁
public synchronized void unlock() {
if (ownerThread != null && ownerThread == Thread.currentThread()) {
isLock = false;
state--;
if (state == 0) {
notifyAll();
}
} else {
throw new IllegalMonitorStateException();
}
}
}
可以自己写几个线程, 对其进行测试, 同一个线程对其加锁, 并不会阻塞.
上面的可重入锁是一个粗糙的版本, 有很多需要优化的地方. 接下来逐步优化, 使其变得更强大.
第一步优化, 先把isLock
变量去掉. 因为该变量再加入state
变量之后就有点多余. 使用state变量足以表示锁的状态.
- state == 0 表示锁闲, 没有线程占用.
- state > 0 表示锁忙, 数字的大小表示加锁的次数.
于是删除isLock
成员变量, 并且修改lock方法和unlock方法.
private static class MyReentrantLock {
private Thread ownerThread;
// 锁状态.
private volatile int state;
// 获取锁
public synchronized void lock() {
// 通过state变量进行判断锁状态
while (state > 0 || ownerThread != Thread.currentThread()) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
thread = Thread.currentThread();
state++;
}
// 释放锁
public synchronized void unlock() {
// 通过ownerThread判断当前是否有线程占用
if (ownerThread != null && ownerThread == Thread.currentThread())
state--;
if (state == 0) {
notifyAll();
}
} else {
throw new IllegalMonitorStateException();
}
}
}
现在代码看起来清晰了, 但是不高效. 因为依赖synchronized
关键字保证并发抢锁的正确性. 关于 synchronized
为什么不高效,以及运行原理先不在这里展开述说了.
下一步是把synchronized
去掉, 换成一个更高效的方式来保证线程并发抢锁正确.
这里引入CAS(CompareAndSwap), 先对其简单介绍一下: 假设有一个变量money
, 其值为100, 现在A线程期望修改成200, B线程期望修改成300. 两个线程同时cas修改money
变量, 则操作系统能保证只有一个线程会成功, 另一个线程会失败. 成功的线程返回true, 失败的线程返回false.
为什么要引入CAS呢?
多个线程同时去抢锁的时候, 可以不让其进入睡眠. 减少线程上下文切换从而提高性能.
要想在java中使用cas操作, 需要通过Unsafe类, 使用Unsafe这里就不展开叙述了. 可以自行查看美团技术团队出品的文章.
假设现在已经有如下可用的CAS操作, 不用纠结它是怎么实现的, 只需要知道它能完成上面提到原子功能.
// compareAndSetState 方法签名如下
// 对state变量进行cas赋值操作
private final boolean compareAndSetState(int expect, int update);
尝试使用compareAndSetState
重写lock方法.
第一次可能写出是下面这个样子:
public void lock() {
for(;;) {
// 获取当前锁的状态
int currentState = state;
// 如果锁是空闲, 尝试cas设置锁状态
if (currentState == 0 && compareAndSetState(currentState, currentState + 1)) {
// 设置成功, 则保存获取锁的线程
ownerThread = Thread.currentThread();
return;
// 如果锁忙, 则判断是否当前线程获取锁
} else if (ownerThread == Thread.currentThread()) {
// 当前线程占用锁, 修改状态. 这里没有并发, 直接赋值就可以.
state++;
return;
}
// 不满足上面的条件,则会进入下一次for循环.
}
}
public void unlock() {
// 判断是否获取锁的线程
if (ownerThread != null && ownerThread == Thread.currentThread()) {
// 是获取锁的线程, 修改state状态
state--;
// 判断是否释放锁
if (state == 0) {
// 将上次保存的线程清除.
thread = null;
}
} else {
throw new IllegalMonitorStateException();
}
}
现在感觉一切很完美, 因为已经实现了主体功能, 但是有比较严重的问题.
lock
方法不让线程睡眠, 存在大量并发情况将使用很多CPU资源.- 在
unlock
方法中, 对thread修改的地方也有并发情况, 假设unlock
中已经调用了state--
, 并且操作完成之后state
等于0, 而另一个线程这时调用了lock
方法, 并且lock
方法成功了, 在lock
方法中设置的ownerThread
为调用lock
的线程. 而unlock
中会清除调用lock
的线程, 导致错误产生.
修复上面的第二个问题比较容易. 在lock
方法获取锁的时候, 判断占用线程是否为null, 并且可以给ownerThread
用volatile
修饰. 保证其可见性.
🤔思考: 不对
ownerThread
用volatile
修饰是否有问题?
欢迎评论讨论.
public void lock() {
for(;;) {
int currentState = state;
// 只需要将下面的判断代码修改下. 并且用volatile修饰ownerThread
if (ownerThread == null && currentState == 0 && compareAndSetState(currentState, currentState + 1)) {
ownerThread = Thread.currentThread();
return;
} else if (ownerThread == Thread.currentThread()) {
state++;
return;
}
}
}
现在已经解决了第二个问题, 下面解决第一个问题, 若想减少对CPU的浪费, 则需要让获取不到锁的线程进入睡眠状态, 等待占用锁的线程使用完成锁之后, 唤醒睡眠的线程.
因此问题发生了变化:
- 如何让线程进入睡眠, 并且唤醒
- 如何保存进入睡眠的线程
来看第一个问题, 如果让线程进入睡眠, 并且唤醒. 使用wait
和noitfy
并不可行, 具体原因可以自行分析.
那么是否可以使用sleep(long)
方法呢? 我觉得是可以的, sleep方法指定一个时间, 可以给其Long.MAX_VALUE
, 睡眠三亿年, 相当于永久睡眠了, 唤醒需要调用thread.interrupt()
方法.
为什么在AQS中没有使用
sleep(long)
方法呢?
首先使用sleep(long)
方法并不优雅, 并且interrupt()
方法需要在sleep(long)
方法调用之后调用才能唤醒. 若先调用interrupt
方法, 再调用sleep
方法, 则不会对线程进行唤醒(不会触发受检异常).
有一种使线程进入睡眠的方法是LockSupport#park()
, 它是基于Unsafe
实现的. LockSupport
非常简单易用, 使用起来比sleep(long)
更优雅. 可以理解为LockSupport
维护一个permit,其值最大为1, 调用park
将permit减一, 调用unpark
将permit加一. 开始每个线程permit为0, 调用park
先将permit减一, 之后判断permit是否小于0, 若小于0, 则进入睡眠, 等待其他线程调用unpark
将permit加一后, 将睡眠的线程唤醒. 需要注意多次连续调用unpark, permit也是1.
总体来说, LockSupport
提供的睡眠唤醒方法能够解决sleep(long)
方法存在的问题.
现在来说第二个问题, 答案比较简单, 直接用链表保存即可. 但是里面涉及到并发, 难点也在这里.
假设封装需要睡眠的线程为Node
, 里面有一个指向下一个Node
的引用, Node
中未来还可以保存一些状态. 这样就会形成一个等待的链表, 而比较难的地方就在于多线程并发抢锁的过程, 以及抢锁和释放锁过程的同时进行, 要在多线程的操作下维护该链表是需要思考的地方.
线程封装为链表中的节点.
private static class Node {
Thread thread;
Node next;
Node(Thread thread) {
this.thread = thread;
}
}
下面直接来看代码, 调用获取锁的lock
方法, 只要发现锁已经被占用, 则会加入到队列中. 等待被唤醒抢锁, 而线程被唤醒的顺序就是加入的顺序.
private static class MyReentrantLock {
private volatile int state;
private volatile Thread ownerThread;
private volatile Node head;
private volatile Node tail;
// 获取锁
public void lock() {
for (;;) {
int currentState = state;
if (currentState == 0 && compareAndSetStatus(currentState,currentState + 1)) {
ownerThread = Thread.currentThread();
return;
} else if(currentState > 0 && ownerThread == Thread.currentThread()) {
state++;
return;
} else {
// 加入链表中,并且睡眠等待唤醒
addQueue();
LockSupport.park();
}
}
}
// 添加到链表中
private void addQueue() {
Node node = new Node(Thread.currentThread());
// 队列还为空
for (;;) {
Node tail = this.tail;
if (tail == null) {
// 初始化队列
if (compareAndSetHead(node)) {
this.tail = node;
return;
}
} else {
// 添加到队尾
if (compareAndSetTail(tail, node)) {
tail.next = node;
return;
}
}
}
}
// 释放锁
public void unlock() {
if (ownerThread != Thread.currentThread()) {
throw new IllegalMonitorStateException();
}
if (--state == 0) {
// 唤醒下一个抢锁的线程.
ownerThread = null;
unparkSuccessor();
}
}
// 唤醒下一个线程
private void unparkSuccessor() {
Node head = this.head;
if (head != null) {
this.head = head;
LockSupport.unpark(head.thread);
}
}
// 内部对等待线程的数据抽象
private static class Node {
Thread thread;
Node next;
Node(Thread thread) {
this.thread = thread;
}
}
// 下面代码只看方法名字
// 封装只是为了进行CAS操作
private static Unsafe getUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
return null;
}
}
private static final Unsafe UNSAFE = getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
static {
try {
stateOffset = UNSAFE.objectFieldOffset(MyReentrantLock.class.getDeclaredField("state"));
headOffset = UNSAFE.objectFieldOffset(MyReentrantLock.class.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset(MyReentrantLock.class.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
}
private boolean compareAndSetStatus(int expect, int update) {
return UNSAFE.compareAndSwapInt(this, stateOffset, expect, update);
}
private boolean compareAndSetHead(Node update) {
return UNSAFE.compareAndSwapObject(this, headOffset, null, update);
}
private boolean compareAndSetTail(Node expect, Node update) {
return UNSAFE.compareAndSwapObject(this, tailOffset, expect, update);
}
}
上面代码还有一些问题
比如当释放锁的时候, 有个线程也来获取锁, 则链表中的等待线程怎么运行?
欢迎读者评论提出自己的想法(思考的过程,就是提高的过程), 以及如何可以更好提高该重入锁, 最后再去看AQS的源码. 这时候有一首诗特别适合 <<题西林壁>> 横看成岭侧成峰,远近高低各不同.
结束语
关于AQS, 还写了一篇文章,欢迎大家前去提提意见. AQS可以查看该文章.
创作不宜, 点个赞再走呗~ 可怜的目光~~~
文章总结:
- 线程睡眠的方式
- CAS操作的封装
- 并发情况操作链表(队列)
- 可重入锁的基本实现思路
转载:https://blog.csdn.net/weixin_37150792/article/details/105767456