飞道的博客

多线程与高并发学习笔记(二)

509人阅读  评论(0)

本文主要介绍一些Java中常用的同步工具类

ReentrantLock

可重入锁,可代替synchronized,它比synchronized更加的灵活,提供了更多的方法,但在使用上需要手动的加锁和释放锁;底层使用CAS来实现。使用synchronized锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放.

常用方法:
Boolean tryLock() 尝试获取锁,不会阻塞等待 还可加入参数自旋一定时间来获取锁
lockinterruptibly() 使用该方法获取锁可以让线程对interrupt() 做出反应。
在创建时可传递参数指定为公平锁(新线程来了是进入等待队列还是和已在队列中的线程一起抢锁)
具体的使用方法参考 Java API

private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();

await()  signalAll()

ReentrantLock还支持上面这种用法,newCondition(可以这么理解,每new一个Condition都相当于多一个等待队列 ,通过signal() 可以唤醒特定队列中等待的线程 );在生产者和消费者模式里,可以利用Condition这一特点,利用signalAll() 来只唤醒对应的生产者 或消费者队列里的线程。

CountDownLatch

表面翻译为倒数门栓,相当于一个同步计数器(倒数的);创建的时候指定计数器的值,每调用一次countDown() 方法 计数器的值减一,直到计数器值变为0 在CountDownLatch上等待的线程才会执行(其他线程做countDown() 等待的线程通过调用await()方法来等待);CountDownLatch是一次性的,计算器的值只能在构造方法中初始化一次。

private static void usingCountDownLatch() {
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);

        for(int i=0; i<threads.length; i++) {
            threads[i] = new Thread(()->{
                int result = 0;
                for(int j=0; j<10000; j++) result += j;
                latch.countDown();
            });
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("end latch");
    }

CyclicBarrier

翻译为循环障碍,可和CountDownLatch对比理解。它是循环的,可多次使用,是一个循环的障碍。

public static void main(String[] args) {
        //CyclicBarrier barrier = new CyclicBarrier(20);

        CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("通过"));

        /*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
            @Override
            public void run() {
                System.out.println("通过");
            }
        });*/
        for(int i=0; i<100; i++) {

                new Thread(()->{
                    try {
                        barrier.await();

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
        }
    }

ReadWriteLock

读写锁,为了提升效率,在读的时候加读锁(共享锁)多个线程可同时读,在写的时候加写锁(排它锁)一次只能有一个线程在写。
具体的使用如下代码

public class TestReadWriteLock{
 static Lock lock = new ReentrantLock();
    private static int value;

    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock readLock = readWriteLock.readLock();
    static Lock writeLock = readWriteLock.writeLock();

    public static void read(Lock lock) {
        try {
            lock.lock();
            Thread.sleep(1000);
            System.out.println("read over!");
            //模拟读取操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void write(Lock lock, int v) {
        try {
            lock.lock();
            Thread.sleep(1000);
            value = v;
            System.out.println("write over!");
            //模拟写操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        //Runnable readR = ()-> read(lock);
        Runnable readR = ()-> read(readLock);
        
        //Runnable writeR = ()->write(lock, new Random().nextInt());
        Runnable writeR = ()->write(writeLock, new Random().nextInt());

        for(int i=0; i<18; i++) new Thread(readR).start();
        for(int i=0; i<2; i++) new Thread(writeR).start();
    }
 }

Phaser

分阶段的同步工具, 适用于分阶段的任务执行上 第一阶段达成目标 然后执行第二阶段 第二阶段达成目标后 依次继续 ;使用上需要先继承Phaser 重写onAdvance()方法,有一个目标,只有当目标达成才会执行后续的阶段。
具体适用参考如下代码:

public class TestPhaser {
    static Random r = new Random();
    static MarriagePhaser phaser = new MarriagePhaser();

    static void milliSleep(int milli) {
        try {
            TimeUnit.MILLISECONDS.sleep(milli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
		// 本例中的目标 
        phaser.bulkRegister(5);

        for(int i=0; i<5; i++) {
            final int nameIndex = i;
            new Thread(()->{

                Person p = new Person("person " + nameIndex);
                p.arrive();
                //本例中没调用一次register多1 只有达到5才会执行后面的操作
                phaser.arriveAndAwaitAdvance();

                p.eat();
                phaser.arriveAndAwaitAdvance();

                p.leave();
                phaser.arriveAndAwaitAdvance();
            }).start();
        }

    }

    static class MarriagePhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
			//自定义的阶段,本例分3个阶段,第一次达到执行 case 0 依次类推
            switch (phase) {
                case 0:
                    System.out.println("所有人到齐了!");
                    return false;
                case 1:
                    System.out.println("所有人吃完了!");
                    return false;
                case 2:
                    System.out.println("所有人离开了!");
                    System.out.println("婚礼结束!");
                    return true;
                default:
                    return true;
            }
        }
    }

    static class Person {
        String name;

        public Person(String name) {
            this.name = name;
        }

        public void arrive() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 到达现场!\n", name);
        }

        public void eat() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 吃完!\n", name);
        }

        public void leave() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 离开!\n", name);
        }
    }
}

semaphore

信号量,相当于一个令牌,持令牌者可执行,令牌可申请,用完之后需要释放。
具体代码如下:

public class TestSemaphore {
    public static void main(String[] args) {    
    	//创建时初始化两个令牌,公平申请模式 默认非公平    
        Semaphore s = new Semaphore(2, true);
        //允许一个线程同时执行
        //Semaphore s = new Semaphore(1);

        new Thread(()->{
            try {
                s.acquire();

                System.out.println("T1 running...");
                Thread.sleep(200);
                System.out.println("T1 running...");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }).start();

        new Thread(()->{
            try {
            	//申请
                s.acquire();
                System.out.println("T2 running...");
                Thread.sleep(200);
                System.out.println("T2 running...");
				//释放
                s.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Exchanger

交换器,用于两个线程交换数据,可这样理解,Exchange相当于一个容器,一个线程调用exchange方法时会阻塞等待,当另一个线程也掉了exchange方法后,进入该容器交换二者数据,然后返回。
具体使用参考如下:

public class TestExchanger {

    static Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args) {
        new Thread(()->{
            String s = "data1";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);

        }, "t1").start();

        new Thread(()->{
            String s = "data2";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);

        }, "t2").start();
    }
}

LockSupport

该工具类可以指定特定线程停止与开始。
具体使用参考如下:

public class T13_TestLockSupport {
    public static void main(String[] args) {
        Thread t = new Thread(()->{
            for (int i = 0; i < 10; i++) {
                System.out.println(i);
                if(i == 5) {
                	//停止当前线程的执行
                    LockSupport.park();
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t.start();
		//刚开始就执行恢复执行,会使得线程不会停止,相当于作废停止
        LockSupport.unpark(t);

        /*
        try {
            TimeUnit.SECONDS.sleep(8);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after 8 senconds!");
        //让停止的t线程恢复执行
        LockSupport.unpark(t);
        */

    }
}

转载:https://blog.csdn.net/IT_0008/article/details/105811111
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场