飞道的博客

【Java 并发编程实战】使用 AQS 实现一个简单的互斥锁

334人阅读  评论(0)

使用 AQS 实现一个简单的互斥锁

AQS 是什么?

参考[2]。


  
  1. /**
  2. * Returns a collection containing threads that may be waiting to
  3. * acquire. Because the actual set of threads may change
  4. * dynamically while constructing this result, the returned
  5. * collection is only a best-effort estimate. The elements of the
  6. * returned collection are in no particular order. This method is
  7. * designed to facilitate construction of subclasses that provide
  8. * more extensive monitoring facilities.
  9. *
  10. * @return the collection of threads
  11. */
  12. public final Collection<Thread> getQueuedThreads() {
  13. ArrayList<Thread> list = new ArrayList<Thread>();
  14. for (Node p = tail; p != null; p = p.prev) {
  15. Thread t = p.thread;
  16. if (t != null)
  17. list.add(t);
  18. }
  19. return list;
  20. }
  21. /**
  22. * Returns a collection containing threads that may be waiting to
  23. * acquire in exclusive mode. This has the same properties
  24. * as {@link #getQueuedThreads} except that it only returns
  25. * those threads waiting due to an exclusive(独有的;排外的;专一的) acquire.
  26. *
  27. * @return the collection of threads
  28. */
  29. public final Collection<Thread> getExclusiveQueuedThreads() {
  30. ArrayList<Thread> list = new ArrayList<Thread>();
  31. for (Node p = tail; p != null; p = p.prev) {
  32. if (!p.isShared()) {
  33. Thread t = p.thread;
  34. if (t != null)
  35. list.add(t);
  36. }
  37. }
  38. return list;
  39. }

CAS自旋

CAS 是Compare And Swap的简称,具有单一变量的原子操作特性,对比成功后进行交换操作,他是乐观操作,期间会无限循环操作,直到对比成功,然后进行后续交互操作

CAS 包含了三个操作数据,内存位置V、预期值A、新预期值B,如果当前内存V存放的数据和A一样,就认为比较成功,然后把当前V所在的位置设置为B。


  
  1. if V==A ( Compare)
  2. then V=B (Swap)

因为会无限循环操作,所以可能导致CPU效率低下,而且运行中还会导致ABA问题,也就是A->B->A的问题,误以为此时数据未发生变化,其实中间已经发生变化。该问题在java中提供了类AtomicStampedReference解决该问题,先会查看当前引用值是否符合期望,ABA也会变成1A->2B->3A,这样很清楚的感知到发生变化了。

java.util.concurrent.locks static final class AbstractQueuedSynchronizer.Node extends Object

Wait queue node class.
The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks(自旋锁). We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A "status" field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend. So the currently released contender thread may need to rewait.
To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field.

Insertion into a CLH queue requires only a single atomic operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly, dequeuing involves only updating the "head". However, it takes a bit more work for nodes to determine who their successors are, in part to deal with possible cancellation due to timeouts and interrupts.
The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor. For explanation of similar mechanics in the case of spin locks, see the papers by Scott and Scherer at http://www.cs.rochester.edu/u/scott/synchronization/
We also use "next" links to implement blocking mechanics. The thread id for each node is kept in its own node, so a predecessor signals the next node to wake up by traversing next link to determine which thread it is. Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated "tail" when a node's successor appears to be null. (Or, said differently, the next-links are an optimization so that we don't usually need a backward scan.)
Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled node is ahead or behind us. This is dealt with by always unparking successors upon cancellation, allowing them to stabilize on a new predecessor, unless we can identify an uncancelled predecessor who will carry this responsibility.
CLH queues need a dummy header node to get started. But we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention.
Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on.
Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill Scherer and Michael Scott, along with members of JSR-166 expert group, for helpful ideas, discussions, and critiques on the design of this class. < 1.8 >


  
  1. /**
  2. * Creates and enqueues node for current thread and given mode.
  3. *
  4. * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  5. * @return the new node
  6. */
  7. private Node addWaiter(Node mode) {
  8. Node node = new Node(Thread.currentThread(), mode);
  9. // Try the fast path of enq; backup to full enq on failure
  10. Node pred = tail;
  11. if (pred != null) {
  12. node.prev = pred;
  13. if (compareAndSetTail(pred, node)) {
  14. pred.next = node;
  15. return node;
  16. }
  17. }
  18. enq(node);
  19. return node;
  20. }
  21. // Queuing utilities
  22. /**
  23. * The number of nanoseconds for which it is faster to spin
  24. * rather than to use timed park. A rough estimate suffices
  25. * to improve responsiveness with very short timeouts.
  26. */
  27. static final long spinForTimeoutThreshold = 1000L;
  28. /**
  29. * Inserts node into queue, initializing if necessary. See picture above.
  30. * @param node the node to insert
  31. * @return node's predecessor
  32. */
  33. private Node enq(final Node node) {
  34. for (;;) {
  35. Node t = tail;
  36. if (t == null) { // Must initialize
  37. if (compareAndSetHead( new Node()))
  38. tail = head;
  39. } else {
  40. node.prev = t;
  41. if (compareAndSetTail(t, node)) {
  42. t.next = node;
  43. return t;
  44. }
  45. }
  46. }
  47. }

简单互斥锁实现代码


  
  1. package com.light.sword
  2. import java.util.concurrent.locks.AbstractQueuedSynchronizer
  3. /**
  4. * @author: Jack
  5. * 2020-02-10 15:42
  6. * 使用 AQS 实现一个简单的互斥锁
  7. */
  8. class MutexDemo {
  9. private val sync = Sync()
  10. fun lock() {
  11. sync.acquire( 0)
  12. }
  13. fun unlock() {
  14. sync.release( 0)
  15. }
  16. class Sync : AbstractQueuedSynchronizer() {
  17. private val LOCKED = 1
  18. private val UNLOCKED = 0
  19. init {
  20. state = UNLOCKED
  21. }
  22. override fun tryAcquire(arg: Int): Boolean {
  23. return compareAndSetState(UNLOCKED, LOCKED)
  24. }
  25. override fun tryRelease(arg: Int): Boolean {
  26. state = UNLOCKED
  27. return true
  28. }
  29. }
  30. }

怎样使用互斥锁


  
  1. fun main() {
  2. for (i in 0. .10) {
  3. Thread {
  4. task(Thread.currentThread().name)
  5. }.start()
  6. }
  7. val mutex = MutexDemo()
  8. for (i in 0. .10) {
  9. Thread {
  10. mutex.lock()
  11. task(Thread.currentThread().name)
  12. mutex.unlock()
  13. }.start()
  14. }
  15. }
  16. fun task(threadName: String) {
  17. println( "-------------")
  18. println( "${System.currentTimeMillis()} : $threadName")
  19. Thread.sleep( 1000)
  20. }
  21. // 输出:
  22. -------------
  23. -------------
  24. 1581489100026 : Thread -1
  25. -------------
  26. -------------
  27. 1581489100026 : Thread -2
  28. -------------
  29. 1581489100026 : Thread -5
  30. -------------
  31. 1581489100026 : Thread -0
  32. -------------
  33. 1581489100026 : Thread -6
  34. -------------
  35. 1581489100027 : Thread -10
  36. -------------
  37. -------------
  38. 1581489100027 : Thread -9
  39. 1581489100026 : Thread -4
  40. -------------
  41. 1581489100027 : Thread -3
  42. 1581489100026 : Thread -8
  43. 1581489100027 : Thread -7
  44. -------------
  45. 1581489100027 : Thread -11
  46. -------------
  47. 1581489101028 : Thread -12
  48. -------------
  49. 1581489102029 : Thread -13
  50. -------------
  51. 1581489103030 : Thread -14
  52. -------------
  53. 1581489104031 : Thread -15
  54. -------------
  55. 1581489105032 : Thread -16
  56. -------------
  57. 1581489106034 : Thread -17
  58. -------------
  59. 1581489107036 : Thread -18
  60. -------------
  61. 1581489108039 : Thread -19
  62. -------------
  63. 1581489109042 : Thread -20
  64. -------------
  65. 1581489110046 : Thread -21

参考资料:

[1] https://www.jianshu.com/p/282bdb57e343

[2] Java并发编程实战: AQS 源码 史上最详尽图解+逐行注释:
https://blog.csdn.net/universsky2015/article/details/95813887


Kotlin 开发者社区

国内第一Kotlin 开发者社区公众号,主要分享、交流 Kotlin 编程语言、Spring Boot、Android、React.js/Node.js、函数式编程、编程思想等相关主题。

越是喧嚣的世界,越需要宁静的思考。


转载:https://blog.csdn.net/universsky2015/article/details/104292256
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场