经典问题
生产者消费者问题关键点
①三类实体:生产者(多个线程)、消费者(多个线程)、仓库(也叫做资源,在生消问题中是一个封装的list)
生产者需要考虑仓库满问题(阻塞等待),消费者需要考虑仓库空问题(阻塞等待),仓库需要考虑互斥访问问题(阻塞等待)
【以及三者访问架构(线程通信问题访问架构):由仓库(资源)对象管理实际被访问的资源list,提供produce和consume方法给消费者线程和生产者线程使用,这两个方法互斥地访问list对象】
②仓库是互斥访问的,同时只能有一个线程(生产者或消费者)访问
③如何做到线程间的通信
生产者消费者问题实现方法
wait() / notify()方法
被锁住的对象是list(消费者线程和生产者线程互斥访问list)那么就要调用list.wait()和list.notify()进行线程间通信
import java.util.LinkedList;
//仓库(即资源)
class Storage
{
//仓库容量
private final int MAX_SIZE = 10;
//仓库内容
private LinkedList<Object> list = new LinkedList<>();
//生产一个产品添加进仓库
public void produce()
{
synchronized(list)
{
//如果仓库已满,就wait等待,放弃锁,等待notify唤醒重新获取锁(
//因为仓库是互斥访问的,这时候仓库是满的,生产者得等消费者先消费了再生产,所以生产者如果获取了锁就要主动放弃锁,等消费者notify()
while(list.size()+1>MAX_SIZE)
{
System.out.println("【生产者"+Thread.currentThread().getName()+"】仓库已满");
try
{
list.wait(); //必须由notify()唤醒
}catch (InterruptedException e){
e.printStackTrace();
}
}
//仓库未满,可以生产加入仓库了
list.add(new Object());
System.out.println("【生产者"+Thread.currentThread().getName()+"】生产一个产品,现库存"+list.size());
//放弃锁,并通知wait队列可以获取锁了(唤醒生产者)
list.notify();
}
}
//从仓库中消费一个产品
public void consume()
{
synchronized (list)
{
while (list.size()==0)
{
System.out.println("【消费者"+Thread.currentThread().getName()+"】仓库为空");
try
{
list.wait();
}catch (InterruptedException e){
e.printStackTrace();
}
}
list.remove();
System.out.println("【消费者"+Thread.currentThread().getName()+"】消费一个产品,现库存"+list.size());
list.notify();
}
}
}
//定义生产者任务
class Producer implements Runnable
{
//用于记录仓库的引用,来让生产者访问仓库
Storage storage;
public Producer(Storage storage)
{
this.storage = storage;
}
public void run()
{
storage.produce();
}
}
//定义消费者任务
class Consumer implements Runnable
{
Storage storage;
public Consumer(Storage storage)
{
this.storage = storage;
}
public void run()
{
storage.consume();
}
}
public class Main
{
public static void main(String[] args)
{
//建立仓库实体
Storage storage = new Storage();
//创建生产者消费者线程
Thread p1 = new Thread(new Producer(storage));
Thread p2 = new Thread(new Producer(storage));
Thread p3 = new Thread(new Producer(storage));
Thread c1 = new Thread(new Consumer(storage));
Thread c2 = new Thread(new Consumer(storage));
Thread c3 = new Thread(new Consumer(storage));
p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
c3.start();
}
}
await() / signal()方法(ReentrantLock的应用)
await() / signal()是属于ReentrantLock机制的Condition组件的方法,而不是Object的
一个ReentrantLock锁可以创建多个Condition,每个Condition都维护一个等待队列
修改仓库类,不使用重量级锁synchronized,改用ReentrantLock轻量级锁
仓库使用一把ReentrantLock锁,并同时维护两个Condition队列:full Condition队列 和 empty Condition队列,将生产者和消费者分开到两个Condition监视器中:full Condition为仓库满的监视器,维护了一个队列,该队列上的线程都是因为list已满而阻塞的生产者;empty Condition为仓库空的监视器,维护了一个队列,该队列上的线程都是因为list为空而阻塞的消费者(当然也可以只用1个Conditon)
多个Condition维持多个等待队列可以使得通知更准确,只通知指定队列的线程开始获取锁,并发效率高
class Storage
{
//仓库容量
private final int MAX_SIZE = 10;
//仓库内容
private LinkedList<Object> list = new LinkedList<>();
//创建ReentrantLock锁
private final ReentrantLock lock = new ReentrantLock();
//将生产者和消费者分开到两个Condition监视器中
//仓库满的监视器,维护了一个队列,该队列上的线程都是因为list已满而阻塞的生产者
private final Condition full = lock.newCondition();
//仓库空的监视器,维护了一个队列,该队列上的线程都是因为list为空而阻塞的消费者
private final Condition empty = lock.newCondition();
//生产一个产品添加进仓库
public void produce()
{
//不使用重量级锁synchronized,改用轻量级锁
lock.lock();
while(list.size()+1>MAX_SIZE)
{
System.out.println("【生产者"+Thread.currentThread().getName()+"】仓库已满");
try
{
full.await();
}catch (InterruptedException e){
e.printStackTrace();
}
}
list.add(new Object());
System.out.println("【生产者"+Thread.currentThread().getName()+"】生产一个产品,现库存"+list.size());
//通知empty condition队列上的所有线程
empty.signalAll();
lock.unlock();
}
//从仓库中消费一个产品
public void consume()
{
lock.lock();
while (list.size()==0)
{
System.out.println("【消费者"+Thread.currentThread().getName()+"】仓库为空");
try
{
empty.await();
}catch (InterruptedException e){
e.printStackTrace();
}
}
list.remove();
System.out.println("【消费者"+Thread.currentThread().getName()+"】消费一个产品,现库存"+list.size());
full.signalAll();
lock.unlock();
}
}
BlockingQueue阻塞队列方法
class Storage
{
private LinkedBlockingDeque<Object> list = new LinkedBlockingDeque<>(10);
public void produce()
{
try{
list.put(new Object()); //阻塞队列如果队满会阻塞住,并自动释放锁,通知其它等待线程,相当于帮我们自动完成了并发管理
System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size());
}catch (InterruptedException e){
e.printStackTrace();
}
}
public void consume()
{
try{
list.take();
System.out.println("【消费者" + Thread.currentThread().getName() + "】消费了一个产品,现库存" + list.size());
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
信号量
设计三个信号量
①互斥访问信号量:保证访问仓库的互斥性
②仓库空位信号量(空的格子即仓库还能放入的资源数量):维持生产者等待队列
③仓库满格信号量(满的格子即仓库已放入的资源数量):维持消费者等待队列(若无,则队空)
这里实际上可以结合前面的方法,三类实体各自采用不同的并发控制实现手段
注意这里的先后顺序,是先申请生产产品,再申请仓库访问互斥量(因为先判断能否生产仓库是否已满,没满才去申请访问仓库开始操作
class Storage
{
//仓库
LinkedList<Object> list = new LinkedList<Object>();
//访问仓库互斥信号量
Semaphore mutex = new Semaphore(1);
//空格信号量
Semaphore EmptyPositions = new Semaphore(10);
//满格信号量
Semaphore FullPositions = new Semaphore(0);
public void produce()
{
try{
//请求生产一个商品,那么空格-1(释放一个空格—)
EmptyPositions.acquire();
//注意这里的先后顺序,是先申请生产产品,再申请仓库访问互斥量(因为先判断能否生产仓库是否已满,没满才去申请访问仓库开始操作)
//申请仓库访问互斥量
mutex.acquire();
//生产操作
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size());
}catch (InterruptedException e){
e.printStackTrace();
}finally {
//生产操作完成,释放仓库访问互斥量
mutex.release();
//生产操作完成,满格+1(增加一个满格)
FullPositions.release();
}
}
public void consume()
{
try{
FullPositions.acquire();
mutex.acquire();
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName() + "】消费一个产品,现库存" + list.size());
}catch (InterruptedException e){
e.printStackTrace();
}finally {
mutex.release();
EmptyPositions.release();
}
}
}
管道
不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。
只适合两个线程间通信
暂略