飞道的博客

线程同步工具Condition详解

294人阅读  评论(0)

Condition可以代替Object监视器方法( wait , notify和notifyAll ),必须和Lock配合使用, Lock替换synchronized方法和语句的使用, Condition取代了对象监视器方法的使用。可以使某个线程挂起,直到其他线程唤醒,就像Object.wait()方法,一个Condition实例本质上绑定到一个锁。 要获得特定Condition实例,请使用lLock的newCondition()方法。


可以看出它是一个接口,所以其逻辑必须由实现类代替

获取Condition

 private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

关于ReentrantLock请移步并发编程之ReentrantLock解析

 * @return the Condition object
     */
    public Condition newCondition() {
        return sync.newCondition();
    }

//真正的实现类

   final ConditionObject newCondition() {
            return new ConditionObject();
        }

重要方法

/**
* 阻塞该线程,直到其他线程signal()方法或者signalAll()方法,亦或者被其他线程打断,会清楚中断状态
*
**/
 void await() throws InterruptedException;
/**
*  在await()方法上设置了超时时间而已
*
**/
 boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 唤醒阻塞的线程如果有多个线程阻塞,随机唤醒一个
*
**/
  void signal();
/**
* 唤醒所有阻塞的线程
*
**/
void signalAll();

案例分析
Condition在阻塞队列应用的比较多如ArrayBlockingQueue的put方法

 /**
     * 将指定的元素加入队列,如果队列满了发生阻塞
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            while (count == items.length)
            //当队列满了就阻塞
                notFull.await();
                //有可用空间就插入元素
            enqueue(e);
        } finally {
        //释放锁
            lock.unlock();
        }
    }

关于ArrayBlockingQueue请移步ArrayBlockingQueue详解

/**
 * 描述:     演示Condition的基本用法
 */
public class ConditionDemo1 {
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    void method1() throws InterruptedException {
        lock.lock();
        try{
            System.out.println("条件不满足,开始await");
            condition.await();
            System.out.println("条件满足了,开始执行后续的任务");
        }finally {
            lock.unlock();
        }
    }

    void method2() {
        lock.lock();
        try{
            System.out.println("准备工作完成,唤醒其他的线程");
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo1 conditionDemo1 = new ConditionDemo1();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    conditionDemo1.method2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        conditionDemo1.method1();
    }
}

/**
 * 描述:     演示用Condition实现生产者消费者模式
 */
public class ConditionDemo2 {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo2 conditionDemo2 = new ConditionDemo2();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待有空余");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}


转载:https://blog.csdn.net/qq_34800986/article/details/106145257
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场