飞道的博客

【JAVA学习笔记】多线程和并发编程

319人阅读  评论(0)

在家把前两天B站上的看过的JAVA核心技术【进阶】的多线程部分总结一下。

1 多进程和多线程简介

多进程概念

  • 当前的OS都是多任务的
  • 每个独立执行的任务就是一个进程
  • OS将CPU的时间划分为多个时间片
  • 在每个时间片里将CPU分配给某一个任务,时间片结束,CPU被自动回收,又分配给其他的任务去执行。在外部看,可能是所有任务同时在运行,但其实在单核CPU系统中,任务是串行的在CPU中运行。但如果是多核的话,多个任务才可以并行的去执行。

多进程的优点

  • 程序因为IO阻塞的时候,可以释放CPU,让CPU为其他程序服务。当有多个CPU时,可以为多个任务同时执行。
    -  CPU不再提高频率(频率过高,发热,摩尔定律已经失效),而是提高核数。
    -  多核和并行程序才是提高性能的唯一办法。

多进程的缺点

  • 太笨重,不好切换,不好管理。

多线程概念

  • 一个任务可以分为许多个子任务,可串/并行。
  • 每个子任务可以称为一个线程。
  • 如果一个子任务阻塞,程序可以将CPU调度到另外一个子任务中去运行,这样就可以将CPU保留在程序内部,而不会调度到其他程序中去,可以提高本程序获得CPU时间和利用率,减少来回切换线程上下文导致的开销

多线程 vs 多进程

  • 线程共享数据
  • 线程通讯更加高效
  • 线程更加轻量级,更加容易切换。
  • 线程比较容易管理

2 多线程的实现

多线程创建

  • 继承Thread类,实现run方法
  • 实现Runnable接口,实现run方法

  
  1. class TestThread0 extends Thread{
  2. public void run(){
  3. while( true){
  4. System.out.println( "testProcess is running");
  5. try {
  6. Thread.sleep( 2000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. }
  11. }
  12. }
  13. class TestThread2 implements Runnable{
  14. volatile int tickets= 100;
  15. @Override
  16. public void run() {
  17. while ( true){
  18. if(tickets> 0){
  19. System.out.println(Thread.currentThread().getName()+ "is selling ticket"+tickets);
  20. tickets--;
  21. }
  22. else break;
  23. }
  24. }
  25. }

多线程的启动

  • start方法,会自动以新进程调用run方法
  • 多个线程启动,其启动的先后顺序是随机
  • 线程无需关闭,只要其run方法执行结束后,自动关闭 
  • main函数(线程)可能早于新线程结束,整个程序并不终止
  • 整个程序终止是等所有的线程都终止(包括main函数线程)

Thread vs Runnable

  • Thread占据了父类的名额,不如Runnable方便
  • Runnable启动时需要Thread类的支持
  • Runnable 更容易实现多线程中资源共享 

3 多线程信息共享

线程类

  • 通过继承Thread或实现Runnable 
  • 通过start方法,调用run方法,run方法工作 
  • 线程run结束后,线程退出 

    粗粒太粗,线程之间缺乏交流

细粒度:线程之间需要有信息交流通讯

  • 通过共享变量达到信息共享
  • JDK原生库不支持发送消息(类似C++MPI并行库直接发送消息)

通过共享变量在多个线程中共享消息

  • static变量
  • 同一个Runnable类中的成员变量

示例:


  
  1. class TestThread2 implements Runnable{
  2. volatile int tickets= 100;
  3. @Override
  4. public void run() {
  5. while ( true){
  6. if(tickets> 0){
  7. System.out.println(Thread.currentThread().getName()+ "is selling ticket"+tickets);
  8. tickets--;
  9. }
  10. else break;
  11. }
  12. }
  13. }
  14. new Thread(testThread2).start();
  15. new Thread(testThread2).start();
  16. new Thread(testThread2).start();
  17. new Thread(testThread2).start();
  18. new Thread(testThread2).start();

但是会出现一个数据不一致的现象

多线程信息共享问题

  • 每一个线程都有属于自己独有的工作缓存(应该为寄存器资源),从主存中读取数据放进自己的工作缓存。
  • 关键步骤缺乏加锁限制
  • i++,并非原子性操作
    读取主存i(正本)到工作缓存(副本)中
    每个CPU执行(副本)i+1操作 
    CPU将结果写入到缓存(副本)中 
    数据从工作缓存(副本)刷到主存(正本)中

变量副本问题的解决办法

  • 采用volatile 关键字修饰变量
  • 保证不同线程对共享变量操作时的可见性 

  
  1. class TestThread4 extends Thread{
  2. //boolean flag=true; //子线程不会停止
  3. volatile boolean flag= true; //用volatile修饰的变量能及时在变量中通知变化
  4. int i= 0;
  5. public void run() {
  6. while (flag){
  7. i++;
  8. }
  9. System.out.println( "TestThread4 is exit");
  10. }
  11. }
  12. TestThread4 testThread4= new TestThread4();
  13. testThread4.start();
  14. Thread.sleep( 2000);
  15. testThread4.flag= false;
  16. System.out.println( "mainThread is exit");

关键步骤加锁限制

  • 互斥:某一个线程运行一个代码段(关键区),其他线程不能同时运行这个代码段
  • 同步:多个线程的运行,必须按照某一种规定的先后顺序来运行
  • 互斥是同步的一种特例

  
  1. class TestThread5 implements Runnable{
  2. private volatile int tickets= 100;
  3. @Override
  4. public void run() {
  5. while (tickets> 0){
  6. sale();
  7. try {
  8. Thread.sleep( 100);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }
  14. public synchronized void sale(){
  15. if(tickets> 0){
  16. System.out.println(Thread.currentThread().getName()+ " is selling tickets"+ " "+tickets);
  17. tickets--;
  18. }
  19. }
  20. }
  21. Thread t= new Thread( new TestThread5());
  22. t.start();

互斥的关键字是synchronized

  • synchronized代码块/函数,只能一个线程进入 
  • synchronized加大性能负担,但是使用简便 

4 多线程管理

线程类

  • 通过继承Thread或实现Runnable 
  • 通过start方法,调用run方法,run方法工作 
  • 线程run结束后,线程退出 

    粗粒太粗,线程之间缺乏同步

细粒度:线程之间有同步协作 

  • 等待
  • 通知/唤醒
  • 终止

线程状态

  • NEW刚创建(new)
  • Runnable就绪态(start)
  • Running运行中(run)
  • Block阻塞(sleep)
  • Terminated结束

线程阻塞/唤醒

  • sleep,时间一到,自己会醒来 
  • wait/notify/notifyAll,等待,需要别人来唤醒 
  • join,等待另外一个线程结束 
  • interrupt,向另外一个线程发送中断信号,该线程收到信号,会 触发InterruptedException(可解除阻塞),并进行下一步处理

线程被动地暂停和终止

  • 依靠别的线程来拯救自己
  • 没有及时释放资源

 线程主动暂停和终止 

  • 定期监测共享变量 
  • 如果需要暂停或者终止,先释放资源,再主动动作
  • 暂停:Thread.sleep(),休眠
  • 终止:run方法结束,线程终止

  
  1. class TestThread7 extends Thread{
  2. volatile boolean flag= true;
  3. public void run(){
  4. while (flag){
  5. System.out.println( "Thread7 is running");
  6. try {
  7. Thread.sleep( 1000);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. System.out.println( "Thread 7 is end");
  13. }
  14. }
  15. class TestThread6 extends Thread{
  16. public void run(){
  17. while(!interrupted()){
  18. System.out.println( "Thread6 is running");
  19. try {
  20. Thread.sleep( 1000);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. // break; //这里需要注意,当发生异常时要申明处理方式。如果这里不申明break,则该线程遇到异常后会一直循环下去
  24. }
  25. }
  26. System.out.println( "Thread6 is dead");
  27. }
  28. }
  29. TestThread6 testThread6= new TestThread6();
  30. TestThread7 testThread7= new TestThread7();
  31. testThread6.start();
  32. testThread7.start();
  33. Thread.sleep( 2000);
  34. testThread6.interrupt();
  35. testThread7.flag= false;

消费者生产者示例


  
  1. package com.mooc.Chapter5;
  2. import java.util.Random;
  3. /**
  4. * 多线程管理(1):
  5. * 线程阻塞/唤醒
  6. * sleep
  7. * wait/notify/notifyAll
  8. * join
  9. * interrupt
  10. * 例子:生产者消费者的例子
  11. */
  12. public class Thread4 {
  13. public static void main(String[] args) throws InterruptedException {
  14. Storage storage= new Storage();
  15. Producer producer1= new Producer(storage);
  16. Producer producer2= new Producer(storage);
  17. Consumer consumer1= new Consumer(storage);
  18. Consumer consumer2= new Consumer(storage);
  19. new Thread(producer1).setName( "生产者1");
  20. new Thread(producer2).setName( "生产者2");
  21. new Thread(consumer1).setName( "消费者1");
  22. new Thread(consumer2).setName( "消费者2");
  23. new Thread(producer1).start();
  24. new Thread(producer2).start();
  25. Thread.sleep( 1000);
  26. new Thread(consumer1).start();
  27. new Thread(consumer2).start();
  28. }
  29. }
  30. class Product{
  31. int id;
  32. String name;
  33. public Product(int id, String name) {
  34. this.id = id;
  35. this.name = name;
  36. }
  37. @Override
  38. public String toString() {
  39. return "(product"+id+ " "+name+ ")";
  40. }
  41. public int getId() {
  42. return id;
  43. }
  44. public void setId(int id) {
  45. this.id = id;
  46. }
  47. public String getName() {
  48. return name;
  49. }
  50. public void setName(String name) {
  51. this.name = name;
  52. }
  53. }
  54. //作为一个仓库,注意实例变量和相关的方法,以及初始化时的构造函数需要注意什么
  55. class Storage{
  56. //仓库容量
  57. private Product[] products= new Product[ 10];
  58. private int top= 0; //标记仓库库存数量
  59. //向仓库中存放东西(只准一个线程进来)
  60. public synchronized void push(Product product) throws InterruptedException {
  61. //考虑临界条件,若仓库已经存满了,则无法在存东西,需要线程等待。
  62. while(top==products.length){
  63. wait();
  64. System.out.println( "Producer wait");
  65. }
  66. //否则的话,存入东西,并库存加1,并唤醒其他等地线程
  67. products[top++]=product;
  68. System.out.println(Thread.currentThread().getName()+ "向仓库中生产了一个产品"+product);
  69. notifyAll(); //通知可能等待的消费者可以消费了
  70. System.out.println( "producer notifyAll");
  71. }
  72. //向仓库中取产品(只让一个线程进来)
  73. public synchronized Product pop() throws InterruptedException {
  74. //考虑临界条件
  75. while(top== 0){
  76. wait();
  77. System.out.println( "Consumer wait");
  78. }
  79. --top;
  80. Product p= new Product(products[top].getId(),products[top].getName());
  81. products[top]= null;
  82. System.out.println(Thread.currentThread().getName()+ "从仓库中取出来一个产品"+p);
  83. notifyAll();
  84. System.out.println( "consumer notifyAll");
  85. return p;
  86. }
  87. }
  88. class Producer implements Runnable{
  89. private Storage storage; //生产者需要仓库来进行生产吧
  90. //构造器进行初始化
  91. public Producer(Storage storage){
  92. this.storage=storage;
  93. }
  94. //生产流程:
  95. //若仓库不满则进行生产
  96. @Override
  97. public void run() {
  98. int j= 0;
  99. Random r= new Random();
  100. while (j< 10){
  101. j++;
  102. Product product= new Product(j, "电话"+r.nextInt( 100));
  103. try {
  104. storage.push(product);
  105. } catch (InterruptedException e) {
  106. e.printStackTrace();
  107. }
  108. }
  109. }
  110. }
  111. class Consumer implements Runnable{
  112. private Storage storage;
  113. public Consumer(Storage storage){
  114. this.storage=storage;
  115. }
  116. @Override
  117. public void run() {
  118. int i= 0;
  119. while(i< 10){
  120. i++;
  121. try {
  122. storage.pop();
  123. Thread.sleep( 100);
  124. } catch (InterruptedException e) {
  125. e.printStackTrace();
  126. }
  127. }
  128. }
  129. }

多线程死锁

  • 每个线程互相持有别人需要的锁(哲学家吃面问题) 
  • 预防死锁,对资源进行等级排序 

  
  1. class TestThread51 extends Thread{
  2. public void run(){
  3. synchronized (Thread5.i){
  4. try {
  5. Thread.sleep( 2000);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. synchronized (Thread5.j){
  10. System.out.println( "51 is running");
  11. }
  12. }
  13. }
  14. }
  15. class TestThread52 extends Thread{
  16. public void run(){
  17. synchronized (Thread5.j){
  18. try {
  19. Thread.sleep( 2000);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. synchronized (Thread5.i){
  24. System.out.println( "52 is running");
  25. }
  26. }
  27. }
  28. }
  29. public class Thread5 {
  30. public static Integer i= 1; //因为锁只能加在对象上,所以声明两个包装类对象
  31. public static Integer j= 2;
  32. public static void main(String[] args) throws InterruptedException {
  33. TestThread51 testThread51= new TestThread51();
  34. TestThread52 testThread52= new TestThread52();
  35. testThread51.start();
  36. testThread52.start();
  37. }
  38. }

守护(后台)线程

  • 普通线程的结束,是run方法运行结束 
  • 守护线程的结束,是run方法运行结束,或main函数结束 
  • 守护线程永远不要访问资源,如文件或数据库等 

  
  1. class DaemonThread extends Thread{
  2. public void run(){
  3. while( true){
  4. System.out.println( "DeamonThread is running");
  5. try {
  6. Thread.sleep( 1000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. }
  11. }
  12. }
  13. DaemonThread d= new DaemonThread();
  14. d.setDaemon( true); //守护进程标志
  15. d.start();
  16. Thread.sleep( 1000);
  17. System.out.println( "main Thread is end");
  18. //守护进程会随着main进程一起结束

课后练习

分6个线程,计算m到n的值(1到100000000)的总和,要求每个线程计算数字量只差不超过1


  
  1. public class GcdTest {
  2. public static void main(String[] args) {
  3. int flag= 100000000/ 6;
  4. long result= 0;
  5. for( int i= 0;i< 6;i++){
  6. new Thread( new Test(i*flag+ 1,flag*(i+ 1))).start();
  7. }
  8. for( int i= 0;i< 100000000;i++){
  9. result=result+i;
  10. }
  11. System.out.println(result);
  12. }
  13. }
  14. class Test implements Runnable{
  15. int startNum,endNum;
  16. long result;
  17. public Test(int startNum,int endNum){
  18. this.startNum=startNum;
  19. this.endNum=endNum;
  20. }
  21. @Override
  22. public void run() {
  23. for( int i=startNum;i<endNum;i++){
  24. result=result+i;
  25. }
  26. try {
  27. Thread.sleep( 1000);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. System.out.println(Thread.currentThread().getName()+ "count result:"+result);
  32. }
  33. }

5 JAVA并发框架Executor

并行计算

  • 业务:任务多,数据量大 
  • 串行 vs 并行
    -  串行编程简单,并行编程困难 
    -  单个计算核频率下降,计算核数增多,整体性能变高 
  • 并行困难(任务分配和执行过程高度耦合
    -  如何控制粒度,切割任
    -  如何分配任务给线程,监督线程执行过程
  • 并行模式
    -  主从模式(Master-Slave)
    -  Worker模式(Worker-Worker)
  • Java并发编程
    -  Thread/Runnable/Thread组管理
    -  Executor(本节重点)
    -  Fork-Join框架
  • 线程组ThreadGroup 
    -  线程的集合 –树形结构,大线程组可以包括小线程组
    -  可以通过enumerate方法遍历组内的线程,执行操作
    -  能够有效管理多个线程,但是管理效率低
    任务分配和执行过程高度耦合
    重复创建线程、关闭线程操作,无法重用线程
    
        
    1. package com.mooc.Chapter8;
    2. import java.util.Date;
    3. import java.util.Random;
    4. import java.util.concurrent.TimeUnit;
    5. /**
    6. * 线程组管理:
    7. *
    8. */
    9. public class Thread_group {
    10. public static void main(String[] args) {
    11. //创建线程组
    12. ThreadGroup threadGroup= new ThreadGroup( "Searcher");
    13. Result result= new Result();
    14. //创建一个任务,让十个线程来完成
    15. SearchTask searchTask= new SearchTask(result);
    16. for( int i= 0;i< 10;i++){
    17. Thread thread= new Thread(threadGroup,searchTask);
    18. thread.start();
    19. try {
    20. TimeUnit.SECONDS.sleep( 1);
    21. } catch (InterruptedException e) {
    22. e.printStackTrace();
    23. }
    24. }
    25. System.out.println( "华丽的分割线==================================");
    26. //查看线程组的消息
    27. System.out.println( "active 线程数量"+threadGroup.activeCount());
    28. System.out.println( "线程组信息明细\n");
    29. threadGroup.list();
    30. System.out.println( "华丽的分割线=======================");
    31. //遍历线程组
    32. Thread[] threads= new Thread[threadGroup.activeCount()];
    33. threadGroup.enumerate(threads);
    34. for( int i= 0;i<threadGroup.activeCount();i++){
    35. System.out.println( "Thread"+threads[i].getName()+threads[i].getState());
    36. }
    37. waitFinish(threadGroup);
    38. //中断线程组中所有线程
    39. threadGroup.interrupt();
    40. }
    41. public static void waitFinish(ThreadGroup threadGroup){
    42. while (threadGroup.activeCount()> 9){
    43. try {
    44. TimeUnit.SECONDS.sleep( 1);
    45. } catch (InterruptedException e) {
    46. e.printStackTrace();
    47. break;
    48. }
    49. }
    50. }
    51. }
    52. class Result{
    53. private String name;
    54. public String getName() {
    55. return name;
    56. }
    57. public void setName(String name) {
    58. this.name = name;
    59. }
    60. }
    61. class SearchTask implements Runnable{
    62. Result result= new Result();
    63. public SearchTask(Result result){
    64. this.result=result;
    65. }
    66. @Override
    67. public void run() {
    68. //显示当前线程的名字
    69. String name=Thread.currentThread().getName();
    70. System.out.println( "线程"+name+ "is running");
    71. try {
    72. doTask();
    73. result.setName(name);
    74. } catch (InterruptedException e) {
    75. System.out.println( "Thread"+name+ "被中断了");
    76. e.printStackTrace();
    77. return;
    78. }
    79. System.out.println( "Thread"+name+ "完成了");
    80. }
    81. private void doTask() throws InterruptedException {
    82. //工作一段时间
    83. Random random= new Random( ( new Date()).getTime());
    84. int value=( int)(random.nextDouble()* 100);
    85. System.out.println( "Thread"+Thread.currentThread().getName()+value);
    86. TimeUnit.SECONDS.sleep(value);
    87. }
    88. }

 Executor

  • 从JDK 5开始提供Executor FrameWork(java.util.concurrent.*) 
    -  分离任务的创建和执行者的创建
    -  线程重复利用(new线程代价很大)
  • 共享线程池
    -  预设好的多个Thread,可弹性增加
    -  多次执行很多很小的任务
    任务创建和执行过程解耦
    程序员无需关心线程池执行任务过程
  • 主要类:ExecutorService, ThreadPoolExecutor,Future (用于存放结果)
    – Executors.newCachedThreadPool/newFixedThreadPool创建线程池
    – ExecutorService线程池服务
    – Callable  具体的逻辑对象(线程类)
    – Future 返回结果
    
        
    1. package com.mooc.Chapter8;
    2. import java.util.Date;
    3. import java.util.concurrent.Executor;
    4. import java.util.concurrent.Executors;
    5. import java.util.concurrent.ThreadPoolExecutor;
    6. public class Executor1 {
    7. public static void main(String[] args) throws InterruptedException {
    8. Server server= new Server();
    9. for ( int i= 0;i< 100;i++){
    10. Task task= new Task( "Task"+i);
    11. Thread.sleep( 10);
    12. server.submitTask(task);
    13. }
    14. server.endServer();
    15. }
    16. }
    17. class Server{
    18. //创建一个线程池
    19. private ThreadPoolExecutor executor;
    20. public Server(){
    21. executor=(ThreadPoolExecutor) Executors.newCachedThreadPool(); //线程池中动态变化的线程
    22. //executor= (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    23. }
    24. //向线程池提交任务
    25. public void submitTask(Task task){
    26. executor.execute(task); //执行,无返回值
    27. System.out.printf( "server:Pool Size:%d\n",executor.getPoolSize());
    28. System.out.printf( "server:Active Count:%d\n",executor.getActiveCount());
    29. System.out.printf( "server:Completed Tasks:%d\n",executor.getCompletedTaskCount());
    30. }
    31. public void endServer(){
    32. executor.shutdown();
    33. }
    34. }
    35. class Task implements Runnable{
    36. private String name;
    37. public Task(String name){
    38. this.name=name;
    39. }
    40. @Override
    41. public void run() {
    42. try {
    43. Long duration=( long)(Math.random()* 1000);
    44. System.out.printf( "%s:Task %s:Doing a task durng %d seconds\n",Thread.currentThread().getName(),name,duration);
    45. Thread.sleep(duration);
    46. } catch (InterruptedException e) {
    47. e.printStackTrace();
    48. }
    49. System.out.printf( "%s:Task %s:Finished on:%s\n",Thread.currentThread().getName(),name, new Date());
    50. }
    51. }
  • 课后练习:用线程池完成计算1-1000的总和
    
        
    1. package com.mooc.Chapter8;
    2. import java.util.ArrayList;
    3. import java.util.List;
    4. import java.util.Random;
    5. import java.util.concurrent.*;
    6. public class Executor2 {
    7. public static void main(String[] args) {
    8. //创建线程池
    9. ThreadPoolExecutor executor= (ThreadPoolExecutor) Executors.newFixedThreadPool( 5);
    10. //创建线程执行结果返回容器
    11. //Future<Integer> 是实现callable接口的返回值接受类型;
    12. List<Future<Integer>> resultList= new ArrayList<>();
    13. //执行任务,并将线程的执行结果与resultList容器相联系,这个容器会有方法判断线程的值是否已经执行并返回过来了。
    14. for( int i= 0;i< 10;i++){
    15. SumTask sumTask= new SumTask(i* 100+ 1,(i+ 1)* 100);
    16. Future<Integer> result=executor.submit(sumTask);
    17. resultList.add(result);
    18. }
    19. do{
    20. System.out.printf( "main:已经完成多少个任务:%d\n",executor.getCompletedTaskCount());
    21. for ( int i= 0;i<resultList.size();i++){
    22. Future<Integer> result=resultList.get(i);
    23. System.out.printf( "main:Task %d:%s\n",i,result.isDone());
    24. }
    25. //每隔50毫秒,轮询等待10个任务结束,轮询所有任务的执行情况
    26. try {
    27. Thread.sleep( 50);
    28. } catch (InterruptedException e) {
    29. e.printStackTrace();
    30. }
    31. } while (executor.getCompletedTaskCount()<resultList.size());
    32. int total= 0;
    33. for( int i= 0;i<resultList.size();i++){
    34. Future<Integer> sum=resultList.get(i);
    35. Integer s= null;
    36. try {
    37. s=sum.get();
    38. total=total+s;
    39. } catch (InterruptedException e) {
    40. e.printStackTrace();
    41. } catch (ExecutionException e) {
    42. e.printStackTrace();
    43. }
    44. }
    45. System.out.println( "1-1000的总和为:"+total);
    46. }
    47. }
    48. //先定义任务类,索要完成的任务
    49. class SumTask implements Callable<Integer>{
    50. private int startNum,endNum;
    51. public SumTask(int startNum,int endNum){
    52. this.startNum=startNum;
    53. this.endNum=endNum;
    54. }
    55. @Override
    56. public Integer call() throws Exception {
    57. int result= 0;
    58. for( int i=startNum;i<=endNum;i++){
    59. result=result+i;
    60. }
    61. //再随机休眠一段时间
    62. Thread.sleep( new Random().nextInt( 1000));
    63. System.out.printf( "%s:%d\n",Thread.currentThread().getName(),result);
    64. return result;
    65. }
    66. }

     

Fork-Join框架

  •  Java 7 提供另一种并行框架:分解、治理、合并(分治编程) 
  • 适合用于整体任务量不好确定的场合(最小任务可确定)

  • 关键类
     -  ForkJoinPool 任务池
     -  RecursiveAction
     -  RecursiveTask
    
        
    1. package com.mooc.Chapter8;
    2. import java.util.concurrent.ExecutionException;
    3. import java.util.concurrent.ForkJoinPool;
    4. import java.util.concurrent.ForkJoinTask;
    5. import java.util.concurrent.RecursiveTask;
    6. public class ForkJoin {
    7. public static void main(String[] args) throws ExecutionException, InterruptedException {
    8. //创建线程池
    9. ForkJoinPool forkJoinPool= new ForkJoinPool();
    10. //ForkJoinPool forkJoinPool=new ForkJoinPool(5);
    11. //创建任务
    12. ForkTask forkTask= new ForkTask( 1, 1000000000);
    13. //提交任务,ForkJoinTask 保存返回值;
    14. ForkJoinTask<Long> result=forkJoinPool.submit(forkTask);
    15. //等待结果
    16. do{
    17. System.out.println( "Main:Thread Count: "+forkJoinPool.getActiveThreadCount());
    18. System.out.println( "Main:Paralelism: "+forkJoinPool.getParallelism());
    19. //每隔50毫秒轮询一次线程池中的执行情况
    20. try {
    21. Thread.sleep( 50);
    22. } catch (InterruptedException e) {
    23. e.printStackTrace();
    24. }
    25. } while (!forkTask.isDone());
    26. System.out.println(result.get().toString());
    27. }
    28. }
    29. class ForkTask extends RecursiveTask<Long>{
    30. private int start,end;
    31. public ForkTask(int start,int end){
    32. this.start=start;
    33. this.end=end;
    34. }
    35. public static int threadhold= 5;
    36. //将任务分解的递归
    37. @Override
    38. protected Long compute() {
    39. Long sum= 0L;
    40. boolean canCompute=(end-start)<=threadhold;
    41. //如果任务再阈值之内,直接执行
    42. if(canCompute){
    43. for( int i=start;i<=end;i++){
    44. sum=sum+i;
    45. }
    46. return sum;
    47. } else {
    48. //任务大于阈值,分解为两个任务
    49. int middle=(start+end)/ 2;
    50. ForkTask subTask1= new ForkTask(start,middle);
    51. ForkTask subTask2= new ForkTask(middle+ 1,end);
    52. //JDK自带的方法,相当于将两个子任务交给线程池去做了。
    53. invokeAll(subTask1,subTask2);
    54. //结果合并
    55. Long sum1=subTask1.join();
    56. //若没有完成,则线程阻塞
    57. Long sum2=subTask2.join();
    58. //结果合并
    59. sum=sum1+sum2;
    60. }
    61. return sum;
    62. }
    63. }

 6 并发数据结构

  • 常用的数据结构是线程不安全的 
    -  ArrayList, HashMap, HashSet非同步的 
    -  多个线程同时读写,可能会抛出异常或数据错误 
  • 传统Vector,Hashtable等同步集合性能过差 
  • 并发数据结构:数据添加和删除
    -  阻塞式集合:当集合为空或者满时,等待 
    -  非阻塞式集合:当集合为空或者满时,不等待,返回null或异常
  • List
    -  Vector  同步安全,写多读少 
    -  ArrayList 不安全 
    -  Collections.synchronizedList(List list) 基于synchronized,效率差 
    -  CopyOnWriteArrayList 读多写少,基于复制机制,非阻塞 
  • Set
  • Map
  • Quene & Deque

7 JAVA并发协作控制

线程协作

  •  Thread/Executor/Fork-Join 
    – 线程启动,运行,结束
    – 线程之间缺少协作
  •  synchronized 同步 
    – 限定只有一个线程才能进入关键区
    – 简单粗暴,性能损失有点大

Lock

  • Lock也可以实现同步的效果
    – 实现更复杂的临界区结构
    – tryLock方法可以预判锁是否空闲
    – 允许分离读写的操作,多个读,一个写
    – 性能更好 
  •  ReentrantLock类,可重入的互斥锁 
  • ReentrantReadWriteLock类,可重入的读写锁 
  •  lock和unlock函数
  • 编程实例:排队买奶茶,以及老板员工看账本
    
        
    1. package com.mooc.Chapter8;
    2. import java.util.concurrent.locks.ReentrantLock;
    3. import java.util.concurrent.locks.ReentrantReadWriteLock;
    4. public class LockExample {
    5. private static final ReentrantLock queneLock= new ReentrantLock(); //可重入锁
    6. private static final ReentrantReadWriteLock orderLock= new ReentrantReadWriteLock(); //可重入读写锁
    7. public static void main(String[] args) throws InterruptedException {
    8. //buyMilkTea();
    9. handleOrder();
    10. }
    11. //试图去买奶茶的方法
    12. public void tryToBuyMilkTea() throws InterruptedException {
    13. boolean flag= true; //用于进入while循环轮询队伍是否有人,当没人时好买完退出来。
    14. while (flag){
    15. //队列中是否有人
    16. if(queneLock.tryLock()){
    17. long thinkingTime=( long) (Math.random()* 500);
    18. Thread.sleep(thinkingTime);
    19. System.out.println(Thread.currentThread().getName()+ ": 来一杯珍珠奶茶");
    20. flag= false;
    21. queneLock.unlock();
    22. } else {
    23. System.out.println(Thread.currentThread().getName()+ ":再等等");
    24. }
    25. if (flag){
    26. Thread.sleep( 1000); //每个1秒钟来轮询一下
    27. }
    28. }
    29. }
    30. public static void buyMilkTea() throws InterruptedException {
    31. LockExample lockExample= new LockExample();
    32. int STUDENT_CNT= 10;
    33. Thread[] students= new Thread[STUDENT_CNT];
    34. for( int i= 0;i<STUDENT_CNT;i++){
    35. students[i]= new Thread( new Runnable() {
    36. @Override
    37. public void run() {
    38. try {
    39. long walkingTime= ( long) (Math.random()* 1000);
    40. Thread.sleep(walkingTime);
    41. lockExample.tryToBuyMilkTea(); //很重要,精髓
    42. } catch (InterruptedException e) {
    43. e.printStackTrace();
    44. }
    45. }
    46. });
    47. students[i].start();
    48. }
    49. //线程结束
    50. for( int i= 0;i<STUDENT_CNT;i++){
    51. students[i].join();
    52. }
    53. }
    54. public void addOrder() throws InterruptedException {
    55. //加写锁
    56. orderLock.writeLock().lock();
    57. long writingTime= ( long) (Math.random()* 1000);
    58. Thread.sleep(writingTime);
    59. System.out.println( "老板新加一笔订单");
    60. //释放锁
    61. orderLock.writeLock().unlock();
    62. }
    63. public void viewOrder() throws InterruptedException {
    64. orderLock.readLock().lock();
    65. long writingTime= ( long) (Math.random()* 500);
    66. Thread.sleep(writingTime);
    67. System.out.println(Thread.currentThread().getName()+ ": 正在查看订单本");
    68. orderLock.readLock().unlock();
    69. }
    70. public static void handleOrder(){
    71. LockExample lockExample= new LockExample();
    72. Thread boss= new Thread( new Runnable() {
    73. @Override
    74. public void run() {
    75. while( true){
    76. try {
    77. lockExample.addOrder();
    78. long waitingTime= ( long) (Math.random()* 1000);
    79. Thread.sleep(waitingTime);
    80. } catch (InterruptedException e) {
    81. e.printStackTrace();
    82. }
    83. }
    84. }
    85. });
    86. boss.start();
    87. int workersCnt= 3;
    88. Thread[] workers= new Thread[workersCnt];
    89. for( int i= 0;i<workersCnt;i++){
    90. workers[i]= new Thread( new Runnable() {
    91. @Override
    92. public void run() {
    93. while( true){
    94. try {
    95. lockExample.viewOrder();
    96. long waitingTime= ( long) (Math.random()* 500);
    97. Thread.sleep(waitingTime);
    98. } catch (InterruptedException e) {
    99. e.printStackTrace();
    100. }
    101. }
    102. }
    103. });
    104. workers[i].start();
    105. }
    106. }
    107. }

    信号量

  • 信号量:本质上是一个计数器 
  • 计数器大于0,可以使用,等于0不能使用 
  • 可以设置多个并发量,例如限制10个访问 
  •  Semaphore 
    – acquire获取
    – release释放 
  • 比Lock更进一步,可以控制多个同时访问关键区

示例:停车场10辆车5个车位模拟


  
  1. package com.mooc.Chapter8;
  2. import java.util.concurrent.Semaphore;
  3. public class SemaphoreExample {
  4. //信号量数目
  5. private final Semaphore placeSemaphore = new Semaphore( 5);
  6. public static void main(String[] args) throws InterruptedException {
  7. Thread[] cars= new Thread[ 10];
  8. SemaphoreExample semaphoreExample= new SemaphoreExample();
  9. for( int i= 0;i< 10;i++){
  10. cars[i]= new Thread( new Runnable() {
  11. @Override
  12. public void run() {
  13. try {
  14. //汽车随机到来,所以先过来休眠
  15. long randomTime= ( long) (Math.random()* 1000);
  16. Thread.sleep(randomTime);
  17. //在判断是否有车位
  18. if(semaphoreExample.parking()){
  19. long parkingTime= ( long) (Math.random()* 1000);
  20. Thread.sleep(parkingTime);
  21. System.out.println(Thread.currentThread().getName()+ ": 停车"+parkingTime+ "ms");
  22. semaphoreExample.leaving();
  23. }
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. });
  29. cars[i].start();
  30. }
  31. for( int i= 0;i< 10;i++){
  32. cars[i].join();
  33. }
  34. }
  35. public boolean parking() throws InterruptedException {
  36. boolean flag= true; //标记,用于轮询后退出
  37. while (flag){
  38. if (placeSemaphore.tryAcquire()){
  39. System.out.println(Thread.currentThread().getName()+ ":停车成功");
  40. return true; //这里直接退出了
  41. } else {
  42. System.out.println(Thread.currentThread().getName()+ ":没有空位,需要等待");
  43. Thread.sleep( 1000);
  44. }
  45. }
  46. return flag;
  47. }
  48. public void leaving(){
  49. placeSemaphore.release();
  50. System.out.println(Thread.currentThread().getName()+ ":开走了");
  51. }
  52. }

Latch

Barrier

Phaser

Exchanger


转载:https://blog.csdn.net/weixin_39966701/article/details/105103904
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场