小言_互联网的博客

JUC并发编程学习(四)-生产者与消费者

346人阅读  评论(0)

生产者和消费者

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer
problem),是一个多线程同步问题的经典案例。
简单说明,就是一个类用于生产,一个类用于消费。生产者生产一定的数据放入管存区,然后重复此操作,同时,消费者消费缓存区的数据。生产者和消费者之间必须保持同步 。要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

解决方法(思路)

采用某种机制保护生产者和消费者之间的同步。有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。

在生产者和消费者之间建立一个管道。管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

解决问题的核心
   保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。

Java能实现的几种方法

1.wait() / notify()方法—传统
2.await() / signal()方法—新版
3.BlockingQueue阻塞队列方法
4.信号量
5.管道

接下来重点介绍,1,2方法

生产者和消费者 synchroinzed 版 (wait/notify()方法)

题目:现在两个线程,操作一个初始值为0的变量 一个线程 + 1, 一个线程 -1。判断什么时候+1,什么时候-1
交替10 次

package com.juc.study.lockdemo;


/**
* @ClassName:
* @PackageName: com.juc.study.lockdemo
* @author: youjp
* @create: 2020-04-14 15:25
* @description: TDOO 生产者消费模型:判断、干活、通知
* @Version: 1.0
*/
public class A {
    public static void main(String[] args) {


        Data data = new Data();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "A").start();


        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "B").start();

    }


}


//属性、方法
class Data {


    private int num = 0;


    //+1操作
    public synchronized void increment() {
        //判断
        if (num > 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num++;
        System.out.println(Thread.currentThread().getName() + "线程加操作\t" + num);
        //通知
        this.notify();
    }


    //-1 操作
    public synchronized void decrement() {


        //判断
        if (num == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num--;
        System.out.println(Thread.currentThread().getName() + "线程减操作\t" + num);
        this.notify();
    }
}

控制台输出:

使用了 synchronized关键字,当前只有2个操作是没存在什么问题。再加多加几个线程试试。

问题升级:防止虚假唤醒,4个线程,两个加,两个减

package com.juc.study.lockdemo;


/**
* @ClassName:
* @PackageName: com.juc.study.lockdemo
* @author: youjp
* @create: 2020-04-14 15:25
* @description: TDOO 生产者消费模型:判断、干活、通知
* @Version: 1.0
*/
public class A {
    public static void main(String[] args) {


        Data data = new Data();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "A").start();


        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "B").start();


        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "D").start();




    }


}


//属性、方法
class Data {


    private int num = 0;


    //+1操作
    public synchronized void increment() {
        //判断
        if (num > 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num++;
        System.out.println(Thread.currentThread().getName() + "线程加操作\t" + num);
        //通知
        this.notify();
    }


    //-1 操作
    public synchronized void decrement() {


        //判断
        if (num == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num--;
        System.out.println(Thread.currentThread().getName() + "线程减操作\t" + num);
        this.notify();
    }


}

然后发现出现了下面这样的问题。这是虚假唤醒的问题。

虚假唤醒:FalseNotify,线程竞争监视器导致的虚假唤醒。
虚假唤醒原因:A线程由wait被notify后,状态由WAITING变成BLOCKED状态,来竞争监视器,但是另外一个线程B也处于BLOCKED状态,它也会来竞争监视器,这是,CPU是没办法控制到底是谁先拿到监视器。
如果不是wait的A线程先拿到监视器,那当wait的A线程拿到监视器的时候,共享的值已经改变了。

查看了一下JDK文档,有做这样的说明
原因是我们写的判断语句使用的是if,这样的写法引发了虚假唤醒的问题。改为while就没有这样的问题。

package com.juc.study.lockdemo;


/**
* @ClassName:
* @PackageName: com.juc.study.lockdemo
* @author: youjp
* @create: 2020-04-14 15:25
* @description: TDOO 生产者消费模型:判断、干活、通知
* @Version: 1.0
*/
public class A {
    public static void main(String[] args) {


        Data data = new Data();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "A").start();


        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "B").start();


        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "D").start();




    }


}


//属性、方法
class Data {


    private int num = 0;


    //+1操作
    public synchronized void increment() {
        //判断
       while (num > 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num++;
        System.out.println(Thread.currentThread().getName() + "线程加操作\t" + num);
        //通知
        this.notify();
    }


    //-1 操作
    public synchronized void decrement() {


        //判断
       while (num == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //干活
        num--;
        System.out.println(Thread.currentThread().getName() + "线程减操作\t" + num);
        this.notify();
    }


}

生产者和消费者-新版JUC写法

查看jdk1.8文档,可看到 使用了Lock替换synchronized方法和语句的使用, Condition取代了对象监视器方法的使用。

手写生产者消费者问题:100 分写法

package com.juc.study.lockdemo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @ClassName:
* @PackageName: com.juc.study.lockdemo
* @author: youjp
* @create: 2020-04-14 15:25
* @description: TDOO 生产者消费模型:判断、干活、通知  //新版写法 
* @Version: 1.0
*/
public class B {
    public static void main(String[] args) {


        Data2 data = new Data2();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "A").start();


        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "B").start();


        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "D").start();

    }

}

//属性、方法
class Data2 {


    private int num = 0;


    //定义锁
    private Lock lock=new ReentrantLock();
    private Condition condition=lock.newCondition();


    //+1操作
    public  void increment() {

        //加锁
        lock.lock();

        try {
            //判断
            while (num > 0) {
                 condition.await(); //等待
             }
            //干活
            num++;
            System.out.println(Thread.currentThread().getName() + "线程加操作\t" + num);
            condition.signalAll();//通知


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //解锁
            lock.unlock();
        }
    }


    //-1 操作
    public  void decrement() {


        //加锁
        lock.lock();
        try {
            //判断
            while (num == 0) {
                condition.await();//等待
             }
            //干活
            num--;
            System.out.println(Thread.currentThread().getName() + "线程减操作\t" + num);
            condition.signalAll();//通知
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//    解锁
        }
    }
}

以上4个进程,唤醒是随机的,即进程的执行时随机的,不是有序进行的,那怎么能让进程有序执行呢。接下来我们讨论进程间如何精确通知访问。

控制线程精确通知顺序访问 Condition

Lock提供了一个接口Condition,通过Lock类对象获取Condition实现类对象。通过Condition,可以指定唤醒哪个进程。

condition最大的特点是可以实现精确顺序通知线程的使用.
一个Condition实例本质上绑定到一个锁。 要获得特定Condition实例的Condition实例,请使用其newCondition()方法。

例如,假设我们有一个有限的缓冲区,它支持put和take方法。 如果在一个空的缓冲区尝试一个take ,则线程将阻塞直到一个项目可用; 如果put试图在一个完整的缓冲区,那么线程将阻塞,直到空间变得可用。 我们希望在单独的等待集中等待put线程和take线程,以便我们可以在缓冲区中的项目或空间可用的时候使用仅通知单个线程的优化。 这可以使用两个Condition实例来实现。

  class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock(); try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally { lock.unlock(); }
   }

   public Object take() throws InterruptedException {
     lock.lock(); try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally { lock.unlock(); }
   }
 } 

实例
使用num进行数字变换,1,2,3分别代表A,B,C。 当num=1时,A打印3次,打印完通知B线程打印10次,B线程打印结束通知C线程。

package com.juc.study.lockdemo;


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
* @ClassName:
* @PackageName: com.juc.study.lockdemo
* @author: youjp
* @create: 2020-04-15 14:19
* @description:    TODO 精确线程调用A->B->C->A
* @Version: 1.0
*/
public class C {
    public static void main(String[] args) {
        Data3 da = new Data3();
        new Thread(() -> {
            for (int i = 0; i <10 ; i++) {
                da.print3();
            }
        }, "A").start();


        new Thread(() -> {
            for (int i = 0; i <10 ; i++) {
                da.print10();
            }
        }, "B").start();


        new Thread(() -> {
            for (int i = 0; i <10 ; i++) {
                da.print5();
            }
        }, "C").start();
    }
}


class Data3 {


    private int num = 1;  //1:A  2:B 3:C ,1表示A,2表示B...
    //锁
    private Lock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition(); ////3个判断,交替执行 A--B-- C--A
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();




    /**
     * A :打印3次后,通知B
     */
    public void print3() {
        lock.lock();
        //获取锁
        try {
            while (num != 1) {
                conditionA.await();
            }
            //干活
            for (int i = 0; i < 3; i++) {
                System.out.println(Thread.currentThread().getName() + "\t打印次数:" + i);
            }
            num = 2;
            //第一个线程通知第2个线程,第2个通知第3个。。
            conditionB.signal();


        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//解锁
        }
    }




    /**
     * B :打印10次后,通知C
     */
    public void print10() {
        //获取锁
        lock.lock();
        try {
            while (num != 2) {
                conditionB.await();//等待
            }
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + "\t打印次数:" + i);
            }
            num = 3;
            conditionC.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


    /**
     * C:打印5次后,通知A
     */
    public void print5() {
        lock.lock();


        try {
            while (num != 3) {
                conditionC.await();
            }
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName() + "\t打印次数:" + i);
            }
            num = 1;
            conditionA.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

打印结果:

其他实现方法参考:
https://blog.csdn.net/zgj12138/article/details/74012263?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-1&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-1


转载:https://blog.csdn.net/qq_36654629/article/details/105932461
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场