生产者消费者问题

  2019-6-16 


经典问题

生产者消费者问题关键点

①三类实体:生产者(多个线程)、消费者(多个线程)、仓库(也叫做资源,在生消问题中是一个封装的list)

生产者需要考虑仓库满问题(阻塞等待),消费者需要考虑仓库空问题(阻塞等待),仓库需要考虑互斥访问问题(阻塞等待)

【以及三者访问架构(线程通信问题访问架构):由仓库(资源)对象管理实际被访问的资源list,提供produce和consume方法给消费者线程和生产者线程使用,这两个方法互斥地访问list对象】

②仓库是互斥访问的,同时只能有一个线程(生产者或消费者)访问

③如何做到线程间的通信

生产者消费者问题实现方法

wait() / notify()方法

被锁住的对象是list(消费者线程和生产者线程互斥访问list)那么就要调用list.wait()和list.notify()进行线程间通信

import java.util.LinkedList;

//仓库(即资源)
class Storage
{
    //仓库容量
    private final int MAX_SIZE = 10;
    //仓库内容
    private LinkedList<Object> list = new LinkedList<>();

    //生产一个产品添加进仓库
    public void produce()
    {
        synchronized(list)
        {
            //如果仓库已满,就wait等待,放弃锁,等待notify唤醒重新获取锁(
            //因为仓库是互斥访问的,这时候仓库是满的,生产者得等消费者先消费了再生产,所以生产者如果获取了锁就要主动放弃锁,等消费者notify()
            while(list.size()+1>MAX_SIZE)
            {
                System.out.println("【生产者"+Thread.currentThread().getName()+"】仓库已满");
                try
                {
                    list.wait();  //必须由notify()唤醒
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            //仓库未满,可以生产加入仓库了
            list.add(new Object());
            System.out.println("【生产者"+Thread.currentThread().getName()+"】生产一个产品,现库存"+list.size());
            //放弃锁,并通知wait队列可以获取锁了(唤醒生产者)
            list.notify();
        }
    }

    //从仓库中消费一个产品
    public void consume()
    {
        synchronized (list)
        {
            while (list.size()==0)
            {
                System.out.println("【消费者"+Thread.currentThread().getName()+"】仓库为空");
                try
                {
                    list.wait();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            list.remove();
            System.out.println("【消费者"+Thread.currentThread().getName()+"】消费一个产品,现库存"+list.size());
            list.notify();
        }
    }
}

//定义生产者任务
class Producer implements Runnable
{
    //用于记录仓库的引用,来让生产者访问仓库
    Storage storage;
    public Producer(Storage storage)
    {
        this.storage = storage;
    }
    public void run()
    {
        storage.produce();
    }
}

//定义消费者任务
class Consumer implements Runnable
{
    Storage storage;
    public Consumer(Storage storage)
    {
        this.storage = storage;
    }
    public void run()
    {
        storage.consume();
    }
}

public class Main
{
    public static void main(String[] args)
    {
        //建立仓库实体
        Storage storage = new Storage();
        //创建生产者消费者线程
        Thread p1 = new Thread(new Producer(storage));
        Thread p2 = new Thread(new Producer(storage));
        Thread p3 = new Thread(new Producer(storage));

        Thread c1 = new Thread(new Consumer(storage));
        Thread c2 = new Thread(new Consumer(storage));
        Thread c3 = new Thread(new Consumer(storage));

        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}

await() / signal()方法(ReentrantLock的应用)

await() / signal()是属于ReentrantLock机制的Condition组件的方法,而不是Object的

一个ReentrantLock锁可以创建多个Condition,每个Condition都维护一个等待队列

修改仓库类,不使用重量级锁synchronized,改用ReentrantLock轻量级锁

仓库使用一把ReentrantLock锁,并同时维护两个Condition队列:full Condition队列 和 empty Condition队列,将生产者和消费者分开到两个Condition监视器中:full Condition为仓库满的监视器,维护了一个队列,该队列上的线程都是因为list已满而阻塞的生产者;empty Condition为仓库空的监视器,维护了一个队列,该队列上的线程都是因为list为空而阻塞的消费者(当然也可以只用1个Conditon)

多个Condition维持多个等待队列可以使得通知更准确,只通知指定队列的线程开始获取锁,并发效率高

class Storage
{
    //仓库容量
    private final int MAX_SIZE = 10;
    //仓库内容
    private LinkedList<Object> list = new LinkedList<>();

    //创建ReentrantLock锁
    private final ReentrantLock lock = new ReentrantLock();

    //将生产者和消费者分开到两个Condition监视器中
    //仓库满的监视器,维护了一个队列,该队列上的线程都是因为list已满而阻塞的生产者
    private final Condition full = lock.newCondition();
    //仓库空的监视器,维护了一个队列,该队列上的线程都是因为list为空而阻塞的消费者
    private final Condition empty = lock.newCondition();

    //生产一个产品添加进仓库
    public void produce()
    {
        //不使用重量级锁synchronized,改用轻量级锁
        lock.lock();
        while(list.size()+1>MAX_SIZE)
        {
            System.out.println("【生产者"+Thread.currentThread().getName()+"】仓库已满");
            try
            {
                full.await();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        list.add(new Object());
        System.out.println("【生产者"+Thread.currentThread().getName()+"】生产一个产品,现库存"+list.size());

        //通知empty condition队列上的所有线程
        empty.signalAll();
        lock.unlock();
    }


    //从仓库中消费一个产品
    public void consume()
    {
        lock.lock();
        while (list.size()==0)
        {
            System.out.println("【消费者"+Thread.currentThread().getName()+"】仓库为空");
            try
            {
                empty.await();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        list.remove();
        System.out.println("【消费者"+Thread.currentThread().getName()+"】消费一个产品,现库存"+list.size());
        full.signalAll();
        lock.unlock();
    }
}

BlockingQueue阻塞队列方法

class Storage
{
    private LinkedBlockingDeque<Object> list = new LinkedBlockingDeque<>(10);
    public void produce()
    {
        try{
            list.put(new Object());   //阻塞队列如果队满会阻塞住,并自动释放锁,通知其它等待线程,相当于帮我们自动完成了并发管理
            System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size());
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    public void consume()
    {
        try{
            list.take();
            System.out.println("【消费者" + Thread.currentThread().getName() + "】消费了一个产品,现库存" + list.size());
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

信号量

设计三个信号量

①互斥访问信号量:保证访问仓库的互斥性

②仓库空位信号量(空的格子即仓库还能放入的资源数量):维持生产者等待队列

③仓库满格信号量(满的格子即仓库已放入的资源数量):维持消费者等待队列(若无,则队空)

这里实际上可以结合前面的方法,三类实体各自采用不同的并发控制实现手段

注意这里的先后顺序,是先申请生产产品,再申请仓库访问互斥量(因为先判断能否生产仓库是否已满,没满才去申请访问仓库开始操作

class Storage
{
    //仓库
    LinkedList<Object> list = new LinkedList<Object>();
    //访问仓库互斥信号量
    Semaphore mutex = new Semaphore(1);
    //空格信号量
    Semaphore EmptyPositions = new Semaphore(10);
    //满格信号量
    Semaphore FullPositions = new Semaphore(0);

    public void produce()
    {
        try{
            //请求生产一个商品,那么空格-1(释放一个空格—)
            EmptyPositions.acquire();

            //注意这里的先后顺序,是先申请生产产品,再申请仓库访问互斥量(因为先判断能否生产仓库是否已满,没满才去申请访问仓库开始操作)
            //申请仓库访问互斥量
            mutex.acquire();
            //生产操作
            list.add(new Object());
            System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size());
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            //生产操作完成,释放仓库访问互斥量
            mutex.release();
            //生产操作完成,满格+1(增加一个满格)
            FullPositions.release();
        }
    }

    public void consume()
    {
        try{
            FullPositions.acquire();
            mutex.acquire();
            list.remove();
            System.out.println("【消费者" + Thread.currentThread().getName() + "】消费一个产品,现库存" + list.size());
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            mutex.release();
            EmptyPositions.release();
        }
    }
}

管道

不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。

只适合两个线程间通信

暂略

参考:Java多种方式解决生产者消费者问题


且听风吟