在家把前两天B站上的看过的JAVA核心技术【进阶】的多线程部分总结一下。
1 多进程和多线程简介
多进程概念
- 当前的OS都是多任务的
- 每个独立执行的任务就是一个进程
- OS将CPU的时间划分为多个时间片
- 在每个时间片里将CPU分配给某一个任务,时间片结束,CPU被自动回收,又分配给其他的任务去执行。在外部看,可能是所有任务同时在运行,但其实在单核CPU系统中,任务是串行的在CPU中运行。但如果是多核的话,多个任务才可以并行的去执行。
多进程的优点
- 程序因为IO阻塞的时候,可以释放CPU,让CPU为其他程序服务。当有多个CPU时,可以为多个任务同时执行。
- CPU不再提高频率(频率过高,发热,摩尔定律已经失效),而是提高核数。
- 多核和并行程序才是提高性能的唯一办法。
多进程的缺点
- 太笨重,不好切换,不好管理。
多线程概念
- 一个任务可以分为许多个子任务,可串/并行。
- 每个子任务可以称为一个线程。
- 如果一个子任务阻塞,程序可以将CPU调度到另外一个子任务中去运行,这样就可以将CPU保留在程序内部,而不会调度到其他程序中去,可以提高本程序获得CPU时间和利用率,减少来回切换线程上下文导致的开销。
多线程 vs 多进程
- 线程共享数据
- 线程通讯更加高效
- 线程更加轻量级,更加容易切换。
- 线程比较容易管理
2 多线程的实现
多线程创建
- 继承Thread类,实现run方法
- 实现Runnable接口,实现run方法
-
class TestThread0 extends Thread{
-
public void run(){
-
while(
true){
-
System.out.println(
"testProcess is running");
-
try {
-
Thread.sleep(
2000);
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
}
-
-
class TestThread2 implements Runnable{
-
-
volatile
int tickets=
100;
-
@Override
-
public void run() {
-
while (
true){
-
-
if(tickets>
0){
-
System.out.println(Thread.currentThread().getName()+
"is selling ticket"+tickets);
-
tickets--;
-
}
-
else
break;
-
}
-
}
-
}
多线程的启动
- start方法,会自动以新进程调用run方法
- 多个线程启动,其启动的先后顺序是随机
- 线程无需关闭,只要其run方法执行结束后,自动关闭
- main函数(线程)可能早于新线程结束,整个程序并不终止
- 整个程序终止是等所有的线程都终止(包括main函数线程)
Thread vs Runnable
- Thread占据了父类的名额,不如Runnable方便
- Runnable启动时需要Thread类的支持
- Runnable 更容易实现多线程中资源共享
3 多线程信息共享
线程类
- 通过继承Thread或实现Runnable
- 通过start方法,调用run方法,run方法工作
- 线程run结束后,线程退出
粗粒太粗,线程之间缺乏交流
细粒度:线程之间需要有信息交流通讯
- 通过共享变量达到信息共享
- JDK原生库不支持发送消息(类似C++MPI并行库直接发送消息)
通过共享变量在多个线程中共享消息
- static变量
- 同一个Runnable类中的成员变量
示例:
-
class TestThread2 implements Runnable{
-
-
volatile
int tickets=
100;
-
@Override
-
public void run() {
-
while (
true){
-
-
if(tickets>
0){
-
System.out.println(Thread.currentThread().getName()+
"is selling ticket"+tickets);
-
tickets--;
-
}
-
else
break;
-
}
-
}
-
}
-
new Thread(testThread2).start();
-
new Thread(testThread2).start();
-
new Thread(testThread2).start();
-
new Thread(testThread2).start();
-
new Thread(testThread2).start();
但是会出现一个数据不一致的现象
多线程信息共享问题
- 每一个线程都有属于自己独有的工作缓存(应该为寄存器资源),从主存中读取数据放进自己的工作缓存。
- 关键步骤缺乏加锁限制
- i++,并非原子性操作
读取主存i(正本)到工作缓存(副本)中
每个CPU执行(副本)i+1操作
CPU将结果写入到缓存(副本)中
数据从工作缓存(副本)刷到主存(正本)中
变量副本问题的解决办法
- 采用volatile 关键字修饰变量
- 保证不同线程对共享变量操作时的可见性
-
class TestThread4 extends Thread{
-
-
//boolean flag=true; //子线程不会停止
-
volatile
boolean flag=
true;
//用volatile修饰的变量能及时在变量中通知变化
-
int i=
0;
-
-
public void run() {
-
while (flag){
-
i++;
-
}
-
System.out.println(
"TestThread4 is exit");
-
}
-
}
-
-
TestThread4 testThread4=
new TestThread4();
-
testThread4.start();
-
Thread.sleep(
2000);
-
testThread4.flag=
false;
-
System.out.println(
"mainThread is exit");
关键步骤加锁限制
- 互斥:某一个线程运行一个代码段(关键区),其他线程不能同时运行这个代码段
- 同步:多个线程的运行,必须按照某一种规定的先后顺序来运行
- 互斥是同步的一种特例
-
class TestThread5 implements Runnable{
-
-
private
volatile
int tickets=
100;
-
-
@Override
-
public void run() {
-
while (tickets>
0){
-
sale();
-
try {
-
Thread.sleep(
100);
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
-
public synchronized void sale(){
-
if(tickets>
0){
-
System.out.println(Thread.currentThread().getName()+
" is selling tickets"+
" "+tickets);
-
tickets--;
-
}
-
}
-
}
-
-
Thread t=
new Thread(
new TestThread5());
-
t.start();
互斥的关键字是synchronized
- synchronized代码块/函数,只能一个线程进入
- synchronized加大性能负担,但是使用简便
4 多线程管理
线程类
- 通过继承Thread或实现Runnable
- 通过start方法,调用run方法,run方法工作
- 线程run结束后,线程退出
粗粒太粗,线程之间缺乏同步
细粒度:线程之间有同步协作
- 等待
- 通知/唤醒
- 终止
线程状态
- NEW刚创建(new)
- Runnable就绪态(start)
- Running运行中(run)
- Block阻塞(sleep)
- Terminated结束
线程阻塞/唤醒
- sleep,时间一到,自己会醒来
- wait/notify/notifyAll,等待,需要别人来唤醒
- join,等待另外一个线程结束
- interrupt,向另外一个线程发送中断信号,该线程收到信号,会 触发InterruptedException(可解除阻塞),并进行下一步处理
线程被动地暂停和终止
- 依靠别的线程来拯救自己
- 没有及时释放资源
线程主动暂停和终止
- 定期监测共享变量
- 如果需要暂停或者终止,先释放资源,再主动动作
- 暂停:Thread.sleep(),休眠
- 终止:run方法结束,线程终止
-
class TestThread7 extends Thread{
-
volatile
boolean flag=
true;
-
public void run(){
-
while (flag){
-
System.out.println(
"Thread7 is running");
-
-
try {
-
Thread.sleep(
1000);
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
System.out.println(
"Thread 7 is end");
-
}
-
}
-
-
class TestThread6 extends Thread{
-
public void run(){
-
while(!interrupted()){
-
System.out.println(
"Thread6 is running");
-
-
try {
-
Thread.sleep(
1000);
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
// break; //这里需要注意,当发生异常时要申明处理方式。如果这里不申明break,则该线程遇到异常后会一直循环下去
-
}
-
}
-
-
System.out.println(
"Thread6 is dead");
-
}
-
}
-
-
TestThread6 testThread6=
new TestThread6();
-
TestThread7 testThread7=
new TestThread7();
-
-
testThread6.start();
-
testThread7.start();
-
-
Thread.sleep(
2000);
-
testThread6.interrupt();
-
testThread7.flag=
false;
消费者生产者示例
-
package com.mooc.Chapter5;
-
-
-
import java.util.Random;
-
-
/**
-
* 多线程管理(1):
-
* 线程阻塞/唤醒
-
* sleep
-
* wait/notify/notifyAll
-
* join
-
* interrupt
-
* 例子:生产者消费者的例子
-
*/
-
public
class Thread4 {
-
public static void main(String[] args) throws InterruptedException {
-
Storage storage=
new Storage();
-
Producer producer1=
new Producer(storage);
-
Producer producer2=
new Producer(storage);
-
Consumer consumer1=
new Consumer(storage);
-
Consumer consumer2=
new Consumer(storage);
-
new Thread(producer1).setName(
"生产者1");
-
new Thread(producer2).setName(
"生产者2");
-
new Thread(consumer1).setName(
"消费者1");
-
new Thread(consumer2).setName(
"消费者2");
-
-
new Thread(producer1).start();
-
new Thread(producer2).start();
-
Thread.sleep(
1000);
-
new Thread(consumer1).start();
-
new Thread(consumer2).start();
-
}
-
}
-
-
class Product{
-
int id;
-
String name;
-
-
public Product(int id, String name) {
-
this.id = id;
-
this.name = name;
-
}
-
-
@Override
-
public String toString() {
-
return
"(product"+id+
" "+name+
")";
-
}
-
-
public int getId() {
-
return id;
-
}
-
-
public void setId(int id) {
-
this.id = id;
-
}
-
-
public String getName() {
-
return name;
-
}
-
-
public void setName(String name) {
-
this.name = name;
-
}
-
-
}
-
-
//作为一个仓库,注意实例变量和相关的方法,以及初始化时的构造函数需要注意什么
-
class Storage{
-
//仓库容量
-
private Product[] products=
new Product[
10];
-
private
int top=
0;
//标记仓库库存数量
-
-
-
//向仓库中存放东西(只准一个线程进来)
-
public synchronized void push(Product product) throws InterruptedException {
-
//考虑临界条件,若仓库已经存满了,则无法在存东西,需要线程等待。
-
while(top==products.length){
-
wait();
-
System.out.println(
"Producer wait");
-
}
-
//否则的话,存入东西,并库存加1,并唤醒其他等地线程
-
products[top++]=product;
-
System.out.println(Thread.currentThread().getName()+
"向仓库中生产了一个产品"+product);
-
notifyAll();
//通知可能等待的消费者可以消费了
-
System.out.println(
"producer notifyAll");
-
}
-
-
//向仓库中取产品(只让一个线程进来)
-
public synchronized Product pop() throws InterruptedException {
-
//考虑临界条件
-
while(top==
0){
-
wait();
-
System.out.println(
"Consumer wait");
-
}
-
-
--top;
-
Product p=
new Product(products[top].getId(),products[top].getName());
-
products[top]=
null;
-
System.out.println(Thread.currentThread().getName()+
"从仓库中取出来一个产品"+p);
-
notifyAll();
-
System.out.println(
"consumer notifyAll");
-
return p;
-
}
-
}
-
-
class Producer implements Runnable{
-
private Storage storage;
//生产者需要仓库来进行生产吧
-
//构造器进行初始化
-
public Producer(Storage storage){
-
this.storage=storage;
-
}
-
//生产流程:
-
//若仓库不满则进行生产
-
@Override
-
public void run() {
-
int j=
0;
-
Random r=
new Random();
-
while (j<
10){
-
j++;
-
Product product=
new Product(j,
"电话"+r.nextInt(
100));
-
try {
-
storage.push(product);
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
}
-
-
class Consumer implements Runnable{
-
-
private Storage storage;
-
public Consumer(Storage storage){
-
this.storage=storage;
-
}
-
-
@Override
-
public void run() {
-
int i=
0;
-
while(i<
10){
-
i++;
-
try {
-
storage.pop();
-
Thread.sleep(
100);
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
}
多线程死锁
- 每个线程互相持有别人需要的锁(哲学家吃面问题)
- 预防死锁,对资源进行等级排序
-
class TestThread51 extends Thread{
-
public void run(){
-
synchronized (Thread5.i){
-
try {
-
Thread.sleep(
2000);
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
synchronized (Thread5.j){
-
System.out.println(
"51 is running");
-
}
-
}
-
}
-
}
-
-
class TestThread52 extends Thread{
-
public void run(){
-
synchronized (Thread5.j){
-
try {
-
Thread.sleep(
2000);
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
synchronized (Thread5.i){
-
System.out.println(
"52 is running");
-
-
}
-
}
-
}
-
}
-
-
public
class Thread5 {
-
-
public
static Integer i=
1;
//因为锁只能加在对象上,所以声明两个包装类对象
-
public
static Integer j=
2;
-
public static void main(String[] args) throws InterruptedException {
-
-
TestThread51 testThread51=
new TestThread51();
-
TestThread52 testThread52=
new TestThread52();
-
testThread51.start();
-
testThread52.start();
-
}
-
}
守护(后台)线程
- 普通线程的结束,是run方法运行结束
- 守护线程的结束,是run方法运行结束,或main函数结束
- 守护线程永远不要访问资源,如文件或数据库等
-
class DaemonThread extends Thread{
-
-
public void run(){
-
while(
true){
-
System.out.println(
"DeamonThread is running");
-
try {
-
Thread.sleep(
1000);
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
}
-
-
DaemonThread d=
new DaemonThread();
-
d.setDaemon(
true);
//守护进程标志
-
d.start();
-
Thread.sleep(
1000);
-
-
System.out.println(
"main Thread is end");
-
//守护进程会随着main进程一起结束
课后练习
分6个线程,计算m到n的值(1到100000000)的总和,要求每个线程计算数字量只差不超过1
-
public
class GcdTest {
-
-
-
public static void main(String[] args) {
-
int flag=
100000000/
6;
-
long result=
0;
-
-
for(
int i=
0;i<
6;i++){
-
new Thread(
new Test(i*flag+
1,flag*(i+
1))).start();
-
}
-
for(
int i=
0;i<
100000000;i++){
-
result=result+i;
-
}
-
System.out.println(result);
-
-
}
-
}
-
-
class Test implements Runnable{
-
-
int startNum,endNum;
-
long result;
-
public Test(int startNum,int endNum){
-
this.startNum=startNum;
-
this.endNum=endNum;
-
}
-
@Override
-
public void run() {
-
for(
int i=startNum;i<endNum;i++){
-
result=result+i;
-
}
-
try {
-
Thread.sleep(
1000);
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
System.out.println(Thread.currentThread().getName()+
"count result:"+result);
-
}
-
}
5 JAVA并发框架Executor
并行计算
- 业务:任务多,数据量大
- 串行 vs 并行
- 串行编程简单,并行编程困难
- 单个计算核频率下降,计算核数增多,整体性能变高 - 并行困难(任务分配和执行过程高度耦合)
- 如何控制粒度,切割任
- 如何分配任务给线程,监督线程执行过程 - 并行模式
- 主从模式(Master-Slave)
- Worker模式(Worker-Worker) - Java并发编程
- Thread/Runnable/Thread组管理
- Executor(本节重点)
- Fork-Join框架 - 线程组ThreadGroup
- 线程的集合 –树形结构,大线程组可以包括小线程组
- 可以通过enumerate方法遍历组内的线程,执行操作
- 能够有效管理多个线程,但是管理效率低
- 任务分配和执行过程高度耦合
- 重复创建线程、关闭线程操作,无法重用线程-
package com.mooc.Chapter8;
-
-
-
import java.util.Date;
-
import java.util.Random;
-
import java.util.concurrent.TimeUnit;
-
-
/**
-
* 线程组管理:
-
*
-
*/
-
public class Thread_group {
-
-
-
public static void main(String[] args) {
-
//创建线程组
-
ThreadGroup threadGroup= new ThreadGroup( "Searcher");
-
Result result= new Result();
-
//创建一个任务,让十个线程来完成
-
SearchTask searchTask= new SearchTask(result);
-
for( int i= 0;i< 10;i++){
-
Thread thread= new Thread(threadGroup,searchTask);
-
thread.start();
-
try {
-
TimeUnit.SECONDS.sleep( 1);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
System.out.println( "华丽的分割线==================================");
-
-
//查看线程组的消息
-
System.out.println( "active 线程数量"+threadGroup.activeCount());
-
System.out.println( "线程组信息明细\n");
-
threadGroup.list();
-
System.out.println( "华丽的分割线=======================");
-
-
//遍历线程组
-
Thread[] threads= new Thread[threadGroup.activeCount()];
-
threadGroup.enumerate(threads);
-
for( int i= 0;i<threadGroup.activeCount();i++){
-
System.out.println( "Thread"+threads[i].getName()+threads[i].getState());
-
}
-
-
waitFinish(threadGroup);
-
-
//中断线程组中所有线程
-
threadGroup.interrupt();
-
}
-
public static void waitFinish(ThreadGroup threadGroup){
-
while (threadGroup.activeCount()> 9){
-
try {
-
TimeUnit.SECONDS.sleep( 1);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
break;
-
}
-
}
-
}
-
}
-
-
class Result{
-
private String name;
-
-
public String getName() {
-
return name;
-
}
-
-
public void setName(String name) {
-
this.name = name;
-
}
-
}
-
-
class SearchTask implements Runnable{
-
-
Result result= new Result();
-
public SearchTask(Result result){
-
this.result=result;
-
}
-
@Override
-
public void run() {
-
//显示当前线程的名字
-
String name=Thread.currentThread().getName();
-
System.out.println( "线程"+name+ "is running");
-
try {
-
doTask();
-
result.setName(name);
-
} catch (InterruptedException e) {
-
System.out.println( "Thread"+name+ "被中断了");
-
e.printStackTrace();
-
return;
-
}
-
System.out.println( "Thread"+name+ "完成了");
-
}
-
private void doTask() throws InterruptedException {
-
//工作一段时间
-
Random random= new Random( ( new Date()).getTime());
-
int value=( int)(random.nextDouble()* 100);
-
System.out.println( "Thread"+Thread.currentThread().getName()+value);
-
TimeUnit.SECONDS.sleep(value);
-
}
-
}
-
Executor
- 从JDK 5开始提供Executor FrameWork(java.util.concurrent.*)
- 分离任务的创建和执行者的创建
- 线程重复利用(new线程代价很大) - 共享线程池
- 预设好的多个Thread,可弹性增加
- 多次执行很多很小的任务
- 任务创建和执行过程解耦
- 程序员无需关心线程池执行任务过程 - 主要类:ExecutorService, ThreadPoolExecutor,Future (用于存放结果)
– Executors.newCachedThreadPool/newFixedThreadPool创建线程池
– ExecutorService线程池服务
– Callable 具体的逻辑对象(线程类)
– Future 返回结果-
package com.mooc.Chapter8;
-
-
-
import java.util.Date;
-
import java.util.concurrent.Executor;
-
import java.util.concurrent.Executors;
-
import java.util.concurrent.ThreadPoolExecutor;
-
-
public class Executor1 {
-
public static void main(String[] args) throws InterruptedException {
-
Server server= new Server();
-
for ( int i= 0;i< 100;i++){
-
Task task= new Task( "Task"+i);
-
Thread.sleep( 10);
-
server.submitTask(task);
-
}
-
server.endServer();
-
}
-
}
-
-
class Server{
-
//创建一个线程池
-
private ThreadPoolExecutor executor;
-
public Server(){
-
executor=(ThreadPoolExecutor) Executors.newCachedThreadPool(); //线程池中动态变化的线程
-
//executor= (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
-
}
-
//向线程池提交任务
-
public void submitTask(Task task){
-
executor.execute(task); //执行,无返回值
-
-
System.out.printf( "server:Pool Size:%d\n",executor.getPoolSize());
-
System.out.printf( "server:Active Count:%d\n",executor.getActiveCount());
-
System.out.printf( "server:Completed Tasks:%d\n",executor.getCompletedTaskCount());
-
}
-
-
public void endServer(){
-
executor.shutdown();
-
}
-
}
-
-
class Task implements Runnable{
-
-
private String name;
-
public Task(String name){
-
this.name=name;
-
}
-
@Override
-
public void run() {
-
try {
-
Long duration=( long)(Math.random()* 1000);
-
System.out.printf( "%s:Task %s:Doing a task durng %d seconds\n",Thread.currentThread().getName(),name,duration);
-
Thread.sleep(duration);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
System.out.printf( "%s:Task %s:Finished on:%s\n",Thread.currentThread().getName(),name, new Date());
-
}
-
}
-
- 课后练习:用线程池完成计算1-1000的总和
-
package com.mooc.Chapter8;
-
-
import java.util.ArrayList;
-
import java.util.List;
-
import java.util.Random;
-
import java.util.concurrent.*;
-
-
public class Executor2 {
-
public static void main(String[] args) {
-
//创建线程池
-
ThreadPoolExecutor executor= (ThreadPoolExecutor) Executors.newFixedThreadPool( 5);
-
//创建线程执行结果返回容器
-
//Future<Integer> 是实现callable接口的返回值接受类型;
-
List<Future<Integer>> resultList= new ArrayList<>();
-
//执行任务,并将线程的执行结果与resultList容器相联系,这个容器会有方法判断线程的值是否已经执行并返回过来了。
-
for( int i= 0;i< 10;i++){
-
SumTask sumTask= new SumTask(i* 100+ 1,(i+ 1)* 100);
-
Future<Integer> result=executor.submit(sumTask);
-
resultList.add(result);
-
}
-
do{
-
System.out.printf( "main:已经完成多少个任务:%d\n",executor.getCompletedTaskCount());
-
for ( int i= 0;i<resultList.size();i++){
-
Future<Integer> result=resultList.get(i);
-
System.out.printf( "main:Task %d:%s\n",i,result.isDone());
-
}
-
//每隔50毫秒,轮询等待10个任务结束,轮询所有任务的执行情况
-
try {
-
Thread.sleep( 50);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
} while (executor.getCompletedTaskCount()<resultList.size());
-
-
int total= 0;
-
for( int i= 0;i<resultList.size();i++){
-
Future<Integer> sum=resultList.get(i);
-
Integer s= null;
-
try {
-
s=sum.get();
-
total=total+s;
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ExecutionException e) {
-
e.printStackTrace();
-
}
-
}
-
System.out.println( "1-1000的总和为:"+total);
-
}
-
}
-
//先定义任务类,索要完成的任务
-
class SumTask implements Callable<Integer>{
-
-
private int startNum,endNum;
-
-
-
public SumTask(int startNum,int endNum){
-
this.startNum=startNum;
-
this.endNum=endNum;
-
}
-
@Override
-
public Integer call() throws Exception {
-
int result= 0;
-
for( int i=startNum;i<=endNum;i++){
-
result=result+i;
-
}
-
//再随机休眠一段时间
-
Thread.sleep( new Random().nextInt( 1000));
-
System.out.printf( "%s:%d\n",Thread.currentThread().getName(),result);
-
return result;
-
}
-
}
-
Fork-Join框架
- Java 7 提供另一种并行框架:分解、治理、合并(分治编程)
- 适合用于整体任务量不好确定的场合(最小任务可确定)
- 关键类
- ForkJoinPool 任务池
- RecursiveAction
- RecursiveTask-
package com.mooc.Chapter8;
-
-
import java.util.concurrent.ExecutionException;
-
import java.util.concurrent.ForkJoinPool;
-
import java.util.concurrent.ForkJoinTask;
-
import java.util.concurrent.RecursiveTask;
-
-
public class ForkJoin {
-
public static void main(String[] args) throws ExecutionException, InterruptedException {
-
//创建线程池
-
ForkJoinPool forkJoinPool= new ForkJoinPool();
-
//ForkJoinPool forkJoinPool=new ForkJoinPool(5);
-
//创建任务
-
ForkTask forkTask= new ForkTask( 1, 1000000000);
-
//提交任务,ForkJoinTask 保存返回值;
-
ForkJoinTask<Long> result=forkJoinPool.submit(forkTask);
-
//等待结果
-
do{
-
System.out.println( "Main:Thread Count: "+forkJoinPool.getActiveThreadCount());
-
System.out.println( "Main:Paralelism: "+forkJoinPool.getParallelism());
-
-
//每隔50毫秒轮询一次线程池中的执行情况
-
try {
-
Thread.sleep( 50);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
} while (!forkTask.isDone());
-
System.out.println(result.get().toString());
-
}
-
}
-
-
class ForkTask extends RecursiveTask<Long>{
-
-
private int start,end;
-
public ForkTask(int start,int end){
-
this.start=start;
-
this.end=end;
-
}
-
public static int threadhold= 5;
-
//将任务分解的递归
-
@Override
-
protected Long compute() {
-
Long sum= 0L;
-
boolean canCompute=(end-start)<=threadhold;
-
//如果任务再阈值之内,直接执行
-
if(canCompute){
-
for( int i=start;i<=end;i++){
-
sum=sum+i;
-
}
-
return sum;
-
} else {
-
//任务大于阈值,分解为两个任务
-
int middle=(start+end)/ 2;
-
ForkTask subTask1= new ForkTask(start,middle);
-
ForkTask subTask2= new ForkTask(middle+ 1,end);
-
//JDK自带的方法,相当于将两个子任务交给线程池去做了。
-
invokeAll(subTask1,subTask2);
-
//结果合并
-
Long sum1=subTask1.join();
-
//若没有完成,则线程阻塞
-
Long sum2=subTask2.join();
-
//结果合并
-
sum=sum1+sum2;
-
}
-
return sum;
-
}
-
}
-
6 并发数据结构
- 常用的数据结构是线程不安全的
- ArrayList, HashMap, HashSet非同步的
- 多个线程同时读写,可能会抛出异常或数据错误 - 传统Vector,Hashtable等同步集合性能过差
- 并发数据结构:数据添加和删除
- 阻塞式集合:当集合为空或者满时,等待
- 非阻塞式集合:当集合为空或者满时,不等待,返回null或异常 - List
- Vector 同步安全,写多读少
- ArrayList 不安全
- Collections.synchronizedList(List list) 基于synchronized,效率差
- CopyOnWriteArrayList 读多写少,基于复制机制,非阻塞 - Set
- Map
- Quene & Deque
7 JAVA并发协作控制
线程协作
- Thread/Executor/Fork-Join
– 线程启动,运行,结束
– 线程之间缺少协作 - synchronized 同步
– 限定只有一个线程才能进入关键区
– 简单粗暴,性能损失有点大
Lock
- Lock也可以实现同步的效果
– 实现更复杂的临界区结构
– tryLock方法可以预判锁是否空闲
– 允许分离读写的操作,多个读,一个写
– 性能更好 - ReentrantLock类,可重入的互斥锁
- ReentrantReadWriteLock类,可重入的读写锁
- lock和unlock函数
- 编程实例:排队买奶茶,以及老板员工看账本
-
package com.mooc.Chapter8;
-
-
import java.util.concurrent.locks.ReentrantLock;
-
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-
public class LockExample {
-
-
private static final ReentrantLock queneLock= new ReentrantLock(); //可重入锁
-
private static final ReentrantReadWriteLock orderLock= new ReentrantReadWriteLock(); //可重入读写锁
-
public static void main(String[] args) throws InterruptedException {
-
//buyMilkTea();
-
handleOrder();
-
}
-
-
//试图去买奶茶的方法
-
public void tryToBuyMilkTea() throws InterruptedException {
-
boolean flag= true; //用于进入while循环轮询队伍是否有人,当没人时好买完退出来。
-
while (flag){
-
//队列中是否有人
-
if(queneLock.tryLock()){
-
long thinkingTime=( long) (Math.random()* 500);
-
Thread.sleep(thinkingTime);
-
System.out.println(Thread.currentThread().getName()+ ": 来一杯珍珠奶茶");
-
flag= false;
-
queneLock.unlock();
-
} else {
-
System.out.println(Thread.currentThread().getName()+ ":再等等");
-
}
-
if (flag){
-
Thread.sleep( 1000); //每个1秒钟来轮询一下
-
}
-
}
-
}
-
-
public static void buyMilkTea() throws InterruptedException {
-
LockExample lockExample= new LockExample();
-
int STUDENT_CNT= 10;
-
-
Thread[] students= new Thread[STUDENT_CNT];
-
for( int i= 0;i<STUDENT_CNT;i++){
-
students[i]= new Thread( new Runnable() {
-
@Override
-
public void run() {
-
-
try {
-
long walkingTime= ( long) (Math.random()* 1000);
-
Thread.sleep(walkingTime);
-
lockExample.tryToBuyMilkTea(); //很重要,精髓
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
});
-
students[i].start();
-
}
-
//线程结束
-
for( int i= 0;i<STUDENT_CNT;i++){
-
students[i].join();
-
}
-
}
-
-
public void addOrder() throws InterruptedException {
-
//加写锁
-
orderLock.writeLock().lock();
-
long writingTime= ( long) (Math.random()* 1000);
-
Thread.sleep(writingTime);
-
System.out.println( "老板新加一笔订单");
-
//释放锁
-
orderLock.writeLock().unlock();
-
}
-
-
public void viewOrder() throws InterruptedException {
-
orderLock.readLock().lock();
-
long writingTime= ( long) (Math.random()* 500);
-
Thread.sleep(writingTime);
-
System.out.println(Thread.currentThread().getName()+ ": 正在查看订单本");
-
orderLock.readLock().unlock();
-
}
-
-
public static void handleOrder(){
-
LockExample lockExample= new LockExample();
-
-
Thread boss= new Thread( new Runnable() {
-
@Override
-
public void run() {
-
while( true){
-
try {
-
lockExample.addOrder();
-
long waitingTime= ( long) (Math.random()* 1000);
-
Thread.sleep(waitingTime);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
});
-
boss.start();
-
-
int workersCnt= 3;
-
Thread[] workers= new Thread[workersCnt];
-
for( int i= 0;i<workersCnt;i++){
-
workers[i]= new Thread( new Runnable() {
-
@Override
-
public void run() {
-
while( true){
-
try {
-
lockExample.viewOrder();
-
long waitingTime= ( long) (Math.random()* 500);
-
Thread.sleep(waitingTime);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
});
-
workers[i].start();
-
}
-
}
-
}
信号量
-
- 信号量:本质上是一个计数器
- 计数器大于0,可以使用,等于0不能使用
- 可以设置多个并发量,例如限制10个访问
- Semaphore
– acquire获取
– release释放 - 比Lock更进一步,可以控制多个同时访问关键区
示例:停车场10辆车5个车位模拟
-
package com.mooc.Chapter8;
-
-
import java.util.concurrent.Semaphore;
-
-
public
class SemaphoreExample {
-
-
//信号量数目
-
private
final Semaphore placeSemaphore =
new Semaphore(
5);
-
-
public static void main(String[] args) throws InterruptedException {
-
Thread[] cars=
new Thread[
10];
-
SemaphoreExample semaphoreExample=
new SemaphoreExample();
-
for(
int i=
0;i<
10;i++){
-
-
cars[i]=
new Thread(
new Runnable() {
-
@Override
-
public void run() {
-
try {
-
//汽车随机到来,所以先过来休眠
-
long randomTime= (
long) (Math.random()*
1000);
-
Thread.sleep(randomTime);
-
//在判断是否有车位
-
if(semaphoreExample.parking()){
-
long parkingTime= (
long) (Math.random()*
1000);
-
Thread.sleep(parkingTime);
-
System.out.println(Thread.currentThread().getName()+
": 停车"+parkingTime+
"ms");
-
semaphoreExample.leaving();
-
}
-
}
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
});
-
cars[i].start();
-
}
-
-
for(
int i=
0;i<
10;i++){
-
cars[i].join();
-
}
-
}
-
-
public boolean parking() throws InterruptedException {
-
-
boolean flag=
true;
//标记,用于轮询后退出
-
while (flag){
-
if (placeSemaphore.tryAcquire()){
-
System.out.println(Thread.currentThread().getName()+
":停车成功");
-
return
true;
//这里直接退出了
-
}
else {
-
System.out.println(Thread.currentThread().getName()+
":没有空位,需要等待");
-
Thread.sleep(
1000);
-
}
-
}
-
return flag;
-
}
-
-
public void leaving(){
-
placeSemaphore.release();
-
System.out.println(Thread.currentThread().getName()+
":开走了");
-
}
-
-
}
Latch
Barrier
Phaser
Exchanger
转载:https://blog.csdn.net/weixin_39966701/article/details/105103904
查看评论