小言_互联网的博客

Java并发编程应该掌握的并发工具类,快来看看你掌握了哪些?

318人阅读  评论(0)

1、JUC是什么?

JUC是java并发包java.util.concurrent的缩写,就是这么简单明了~~~

在我们JDK的并发包中,提供了几个非常有用的并发工具类,比如:CountDownLatch 闭锁、CyclicBarrier 同步屏障、Semaphore 信号量,在线程之间交换数据的一种方式 Exchanger,赶紧操练起来。

2、CountDownLatch 闭锁

这个CountDownLatch可以说是Join()方法的升级版,join用于让当前执行的线程,等待调用join的线程执行结束,然后在接着往下执行。


  
  1. /**
  2. * @Auther: IT贱男
  3. * @Date: 2020/5/8 14:16
  4. * @Description: join
  5. */
  6. public class JoinTest {
  7. static class JoinThread extends Thread {
  8. @Override
  9. public void run() {
  10. System.out.println( "JoinThread thread running");
  11. try {
  12. Thread.sleep( 2000L);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. System.out.println( "JoinThread thread over");
  17. }
  18. }
  19. public static void main(String[] args) {
  20. System.out.println( "main is running");
  21. JoinThread joinThread = new JoinThread();
  22. joinThread.start();
  23. try {
  24. // 让主线线程等待joinThread执行完成,才继续往下执行
  25. joinThread.join();
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. System.out.println( "main is over");
  30. }
  31. }

执行结果如下:

main is running
JoinThread thread running
JoinThread thread over
main is over

从代码可以,在main方法中调用了JoinThread的join方法,mian这个主线程,就必须等到JoinThread这个线程执行完毕才会接着执行。 join的原理其实就是不停的去检查join的线程是否还存活,如果join线程存活则让当前线程永远等待。一直到join线程执行完了,线程的this.notifyAll()就会被调用,调用notifyAll()方法是在JVM里面进行实现的。

join部分源码如下,其中wait(0)就是表示永远等待下去,isAlive()检查是否存活。


  
  1. while (isAlive()) {
  2. wait( 0);
  3. }

在回过头来看CountDownLatch,在JDK1.5之后的并发包中提供的CountDownLatch也可以实现join的功能,它允许一个或者多个线程等待其他线程完成操作。

我们先来看看它的构造函数:


  
  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException( "count < 0");
  3. this.sync = new Sync(count);
  4. }

CountDownLatch的构造函数接受一个int类型的参数作为计数器。假设现在启动一个任务,需要开启5个线程去完成初始化工作,一直到初始化的工作全部做完了,业务线程和主线才接着执行后续代码,所以这里构造函数中我们就需要传5进去。

常用的方法如如下:


  
  1. public void await() throws InterruptedException {
  2. sync.acquireSharedInterruptibly( 1);
  3. }

  
  1. public void countDown() {
  2. sync.releaseShared( 1);
  3. }

当我们调用CountDownLatch的await()方法时,该线程会被阻塞,一直到计数器变成0才被唤醒。

countDown()方法调用一次,计数器就会减1,一直到计数器被减到0时,调用await()的线程才可以继续执行。

注意:计数器必须大于等于0,只是如果等于0的时候,计数器就是0,线程调用await()方法也不会进入阻塞状态。

代码演示:现在启动一个任务,需要等待5个线程全部初始化完毕之后,主线程和业务线程才可以执行执行。


  
  1. /**
  2. * @Auther: IT贱男
  3. * @Date: 2020/4/29 16:44
  4. * @Description: Show CountDownLatch test
  5. */
  6. public class CountDownLatchTest {
  7. // 一个线程也可以进行减多次
  8. static CountDownLatch latch = new CountDownLatch( 5);
  9. /**
  10. * 初始化线程
  11. */
  12. static class InitTask implements Runnable {
  13. @Override
  14. public void run() {
  15. try {
  16. Thread.sleep( 1000L);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. System.out.println(Thread.currentThread().getId() + "InitTask thread init ");
  21. // 计数器 -1
  22. latch.countDown();
  23. }
  24. }
  25. /**
  26. * 业务线程
  27. */
  28. static class BusinessTask implements Runnable {
  29. @Override
  30. public void run() {
  31. try {
  32. // 等待初始化线程完成
  33. latch.await();
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. System.out.println( "Business thread start work");
  38. }
  39. }
  40. public static void main(String[] args) {
  41. // 初始化5个线程
  42. for ( int i = 0; i < 5; i++) {
  43. Thread thread = new Thread( new InitTask());
  44. thread.start();
  45. }
  46. // 启动业务线程,必须要等到初始化线程5个全部完成,才开始执行业务代码
  47. new Thread( new BusinessTask()).start();
  48. try {
  49. // 主线程也要等待初始化线程完成,才接着往下走
  50. latch.await();
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54. // 必须要等到初始化线程5个全部完成,才会走到这里
  55. System.out.println( "Main thread is start work");
  56. }
  57. }

  执行结果如下:

13InitTask thread init 
10InitTask thread init 
14InitTask thread init 
11InitTask thread init 
12InitTask thread init 
Main thread is start work
Business thread start work

CountDownLatch还有一个await带参数的方法,如果初始化线程执行特别慢,或者因为异常了没有正常调用countDown()方法,我们也不可能让其他线程一直等着,所以可以使用另外一个带指定时间的await方法,该方法等待特定的时间以后,就不会再阻塞当前线程。


  
  1. public boolean await(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. return sync.tryAcquireSharedNanos( 1, unit.toNanos(timeout));
  4. }

 

3、CyclicBarrier 同步屏障

它要做的事情就是让一组线程达到一个同步点(也可以说屏障),等到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截点线程才会继续运行,并且可以循环使用。

举个例子,森林冰火双人游戏不知道小伙伴有没有玩过,必须要两个玩家全部到达门时,门(屏障)才会开。

CyclicBarrier提供了两种构造函数:

第一种构造函数:


  
  1. public CyclicBarrier(int parties) {
  2. this(parties, null);
  3. }

默认的构造函数就是CyclicBarrier(int parites),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier 我已经到达了屏障,当然当前线程被阻塞。 一直到指定数量的线程被拦截,才能放行。

调用await(),相当于计数器加1,一直达到等于parties数量,则屏障打开。


  
  1. /**
  2. * @Auther: IT贱男
  3. * @Date: 2020/4/29 17:17
  4. * @Description: Show CyclicBarrier test
  5. */
  6. public class CyclicBarrierTest {
  7. // 由线程本身控制是否完成
  8. static CyclicBarrier barrier = new CyclicBarrier( 2);
  9. // 初始化线程
  10. static class InitTask extends Thread {
  11. @Override
  12. public void run() {
  13. try {
  14. // 随机睡眠
  15. Thread.sleep( new Random().nextInt( 2000));
  16. System.out.println(Thread.currentThread().getName() + "到达终点");
  17. barrier.await();
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. // 必须冰妹妹,和火哥哥两个玩家全部执行完await()方法,才算成功
  22. System.out.println( "恭喜" + Thread.currentThread().getName() + "闯关成功");
  23. }
  24. }
  25. public static void main(String[] args) {
  26. InitTask initMM = new InitTask();
  27. initMM.setName( "冰妹妹");
  28. InitTask initGG = new InitTask();
  29. initGG.setName( "火哥哥");
  30. initGG.start();
  31. initMM.start();
  32. }
  33. }

 执行结果如下:

冰妹妹到达终点
       火哥哥到达终点
       恭喜火哥哥闯关成功
       恭喜冰妹妹闯关成功

由结果可见,必须要冰妹妹、火哥哥全部达到终点,双方才能被放行。

被放行的线程可能是冰妹妹优先执行、也有可能是火哥哥优先执行,都是由CPU决定的。

如果把new CyclicBarrier(2) 修改为 new CyclicBarrier(3) ,则先到达屏障的线程会一直处于等待状态,因为没有第三个线程调用await()方法,所以之前达到屏障的线程都不会继续执行。

第二种构造函数:


  
  1. public CyclicBarrier(int parties, Runnable barrierAction) {
  2. if (parties <= 0) throw new IllegalArgumentException();
  3. this.parties = parties;
  4. this.count = parties;
  5. this.barrierCommand = barrierAction;
  6. }

CyclicBarrier()还提供了更高级的构造函数,在全部线程都到达屏障的时候,优先执行barrierAction,方便处理更复杂的业务场景。 假设冰妹妹、火哥哥闯关成功了,需要优先加载下一个关卡资源,来提高用户体验。


  
  1. /**
  2. * @Auther: IT贱男
  3. * @Date: 2020/4/29 17:17
  4. * @Description: Show CyclicBarrier test
  5. */
  6. public class CyclicBarrierTest {
  7. // 控制一组线程同时完全的情况下,代码接着执行,有线程本身控制是否完成
  8. static CyclicBarrier barrier = new CyclicBarrier( 2, new BusinessTask());
  9. // 业务线程
  10. static class BusinessTask implements Runnable {
  11. @Override
  12. public void run() {
  13. System.out.println( "开始加载下一关资源");
  14. }
  15. }
  16. // 代码省略
  17. }

执行结果如下:

火哥哥到达终点
       冰妹妹到达终点
       开始加载下一关资源
       恭喜冰妹妹闯关成功
       恭喜火哥哥闯关成功

循环使用:

这个森林冰火小游戏课时有很多关卡的,每一个关卡都是需要两个玩家同时到达屏障才算闯关成功。这里CyclicBarrier也是可以重复使用的,假设闯完第一关之后,继续闯第二关。

同步屏障计数器初始化还是2,但是我们开启了4个线程来执行,结果依然是正确,所以能够体现CyclicBarrier是可以循环使用的。


  
  1. // 控制一组线程同时完全的情况下,代码接着执行,有线程本身控制是否完成
  2. static CyclicBarrier barrier = new CyclicBarrier( 2, new BusinessTask());
  3. // 其他代码保持一致
  4. public static void main(String[] args) throws InterruptedException {
  5. System.out.println( "开始闯关:关卡一");
  6. InitTask initMM = new InitTask();
  7. initMM.setName( "冰妹妹");
  8. InitTask initGG = new InitTask();
  9. initGG.setName( "火哥哥");
  10. initGG.start();
  11. initMM.start();
  12. // 关卡一完成之后,火哥哥内急去了一趟洗手间,睡眠5秒
  13. Thread.sleep( 5000L);
  14. System.out.println( "开始闯关:关卡二");
  15. InitTask initMM2 = new InitTask();
  16. initMM2.setName( "冰妹妹");
  17. InitTask initGG2 = new InitTask();
  18. initGG2.setName( "火哥哥");
  19. initMM2.start();
  20. initGG2.start();
  21. }

执行结果如下:

开始闯关:关卡一
       冰妹妹到达终点
       火哥哥到达终点
       开始加载下一关资源
       恭喜火哥哥闯关成功
       恭喜冰妹妹闯关成功
       开始闯关:关卡二
       冰妹妹到达终点
       火哥哥到达终点
       开始加载下一关资源
       恭喜火哥哥闯关成功
       恭喜冰妹妹闯关成功

CountDownLatch 和 CyclicBarrier 区别

1、CountDownLatch 是依赖其他线程完成了某些操作来控制,而CliclicBarrier是有线程本身控制。 仔细想想CountDownLatch是等待其他五个初始化线程全部完成之后,主线程和业务现场才开始继续执行,是不是依赖其他线程控制。再想想冰妹妹和火哥哥,是必须要冰妹妹和火哥哥亲自完成到达屏障,屏障才会打开。

2、CountDownLatch每执行countDown()方法,计数器 - 1,而 CyclicBarrier 调用await()方法,计数器 + 1

3、CountDownLatch只可以使用一次,而CyclicBarrier可以重复使用。

4、CyclicBarrier还有一个很高级的构造函数,可以指定一个线程先进行操作。

4、Semaphore 信号量

用来控制同时访问特定资源的线程数量,它通过协调各个线程,来保证特定资源能够被合理使用。

Semaphore可以用于做流量控制,比如数据库连接,假设数据库同时连接的数量只能有5个,首先进来了5个线程拿到了许可证可以获得数据库连接,后进来的线程只能等待前面线程释放了,才可以拿到连接。

来看看Semaphore的构造函数:


  
  1. public Semaphore(int permits) {
  2. sync = new NonfairSync(permits);
  3. }

  
  1. public Semaphore(int permits, boolean fair) {
  2. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  3. }

permits参数表示可用的许可证数量,比如Semaphore(5)  就表示运行5个线程能够同时获得资源。

fair 是控制是否公平锁,默认是非公平,传true则是公平锁。

大概解释下什么是公平和非公平,比如早餐排队买包子,大家都老老实实排队购买,这就是公平的。 这个时候来了个抠脚大汉,直接插入到队伍前面去买包子去了,这个时候就是不公平的体现了。

Semaphore 使用也很简单,通过Semaphore的acquire()方法拿到一个许可证,使用完成之后调用release()方法,来释放许可证。


  
  1. /**
  2. * @Auther: IT贱男
  3. * @Date: 2020/4/30 14:50
  4. * @Description: 信号量测试
  5. */
  6. public class SemaphoreTest {
  7. private static Semaphore semaphore = new Semaphore( 5);
  8. // 业务线程
  9. static class BusinessTask extends Thread {
  10. @Override
  11. public void run() {
  12. try {
  13. // 拿获取资格,拿不到就等着
  14. semaphore.acquire();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. try {
  19. System.out.println(Thread.currentThread().getName() + "拿到数据库连接");
  20. // 随机睡眠
  21. Thread.sleep( new Random().nextInt( 2000));
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. // 释放资源
  26. semaphore.release();
  27. }
  28. }
  29. public static void main(String[] args) {
  30. // 启动10个线程来获取资源
  31. for ( int i = 0; i < 10; i++) {
  32. BusinessTask task = new BusinessTask();
  33. task.start();
  34. }
  35. }
  36. }

运行的结果,并不是10个线程一开始就能获取到资源,而是先前5个获取到,后五个等待资源被释放了,才进行获取。

Semaphore还提供了一些其他方法:

 availablePermits() :返回此Semaphore对象中当前可用的许可数。

getQueueLength():返回正等待获取许可证的线程数。

hasQueuedThreads(): 是否有线程正在等待获取许可证。 

等等.........

5、Exchanger 交换

用于线程之间协作的工具类,它提供了一个同步点,在这个同步点,两个线程可以交换彼此的数据。

这里注意只能是两个线程,当其中一个线程执行了exchange()方法,它会一直等待第二个线程也执行到exchange()方法的时候,两个线程就可以交换数据。


  
  1. /**
  2. * @Auther: IT贱男
  3. * @Date: 2020/4/30 15:22
  4. * @Description: 线程数据交换
  5. */
  6. public class ExchangeTest {
  7. private static Exchanger<String> exchanger = new Exchanger<>();
  8. static class TaskA implements Runnable {
  9. @Override
  10. public void run() {
  11. String str = "我是线程A";
  12. try {
  13. System.out.println( "TaskA --- 原始值 = " + str);
  14. String exchange = exchanger.exchange(str);
  15. System.out.println( "TaskA --- 交换后的值 = " + exchange);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. static class TaskB implements Runnable {
  22. @Override
  23. public void run() {
  24. String str = "我是线程B";
  25. try {
  26. System.out.println( "TaskB --- 原始值 = " + str);
  27. String exchange = exchanger.exchange(str);
  28. System.out.println( "TaskB --- 交换后的值 = " + exchange);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. public static void main(String[] args) {
  35. Thread thread = new Thread( new TaskA());
  36. thread.start();
  37. Thread thread1 = new Thread( new TaskB());
  38. thread1.start();
  39. }
  40. }

执行结果如下:

TaskA --- 原始值 = 我是线程A
        TaskB --- 原始值 = 我是线程B
        TaskB --- 交换后的值 = 我是线程A
        TaskA --- 交换后的值 = 我是线程B

可见,TaskA的值交换后,变成了 “我是线程B”,TaskB也是如此。

如果两个线程有一个没有执行到exchange()方法,则会一直等待,但是又不想等太长时间。可以使用

public V exchange(V x, long timeout, TimeUnit unit)方法,来设置最大的等待时长。

6、本章总结

本章介绍了JDK中提供的几个并发工具类,了解这四种工具类,如果遇到合适的业务场景可以不妨试试这些。

后续小编还会持续更新并发一系列相关技术点,比如并发容器、并发框架、锁、Java内存模型等等,从简单到深入,从长发飘飘到所剩无几。

看完不来个赞么?~~~~


转载:https://blog.csdn.net/weixin_38111957/article/details/105994854
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场