0
点赞
收藏
分享

微信扫一扫

Java基础-并发编程-生产者与消费者

Java工程师知识树 / Java基础


问题描述

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。

生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。

生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

Java实现生产者与消费者的几种方式

  1. synchronized + Object.wait() / Object.notifyAll()方法
  2. Lock + Condition.await() / Condition.signalAll()方法
  3. BlockingQueue阻塞队列方法
  4. Semaphore信号量

synchronized + Object.wait() / Object.notifyAll()方法

package com.study.thread.procon;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 生产与消费的仓库
* 生产者与消费者模式synchronized实现
*/

public class Storage {

private static final int MAX = 10;//仓库储量
private List<Object> dataList = new LinkedList<>();//仓库产品
//生产者
public void producer() {
while (true) {//持续生产
synchronized (this) {
if (dataList.size() >= MAX) {
System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
+ "】无需再生产");
try {
wait();//不生产了
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
notifyAll();//唤醒全部等待线程
}
}
}
}
//消费者
public void consumer() {
while (true) {//持续消费
synchronized (this) {
if (dataList.size() <= 0) {
System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
+ "】不能再继续消费");
try {
wait();//不消费了
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.remove(0);
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
notifyAll();//唤醒全部等待线程
}
}
}
}

public static void main(String[] args) {
Storage storage = new Storage();
new Thread(storage::producer).start();
new Thread(storage::consumer).start();
}
}

Lock + Condition.await() / Condition.signalAll()方法

package com.study.thread.procon;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 生产与消费的仓库
* 生产者与消费者模式 Lock 实现
* 还可以使用 tryLock与isHeldByCurrentThread组合使用
*/

public class Storage1 {

private static final int MAX = 10;//仓库储量
private List<Object> dataList = new LinkedList<>();//仓库产品
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void producer() {
while (true) {
try {
lock.lock();
if (dataList.size() >= MAX) {
System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
+ "】无需再生产");
try {
condition.await();//不生产了
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();//唤醒全部等待线程
}
}finally {
lock.unlock();
}
}
}

public void consumer() {
while (true) {
try {
lock.lock();
if (dataList.size() <= 0) {
System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
+ "】不能再继续消费");
try {
condition.await();//不消费了
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.remove(0);
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();//唤醒全部等待线程
}
}finally {
lock.unlock();
}
}
}


public static void main(String[] args) {
Storage storage = new Storage();
for (int i = 0; i < 4; i++) {
new Thread(storage::producer).start();
}
for (int i = 0; i < 4; i++) {
new Thread(storage::consumer).start();
}
}
}

BlockingQueue阻塞队列方法

阻塞队列:
当阻塞队列为空时,从阻塞队列中取数据的操作会被阻塞。
当阻塞队列为满时,往阻塞队列中添加数据的操作会被阻塞。

阻塞队列名称 说明
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列。
DelayQueue 一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue 一个不存储元素的阻塞队列。
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。
package com.study.thread.procon;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* 生产与消费的仓库
* 生产者与消费者模式 BlockingQueue 实现
*/

public class Storage2 {

private static final int MAX = 10;//仓库储量
private BlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<Object>();//仓库产品
//生产者
public void producer() {
while (true) {
if (blockingQueue.size() >= MAX) {
System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
+ "】无需再生产");
} else {
try {
TimeUnit.SECONDS.sleep(1);
blockingQueue.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + blockingQueue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消费者
public void consumer() {
while (true) {
if (blockingQueue.size() <= 0) {
System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
+ "】不能再继续消费");
} else {
try {
TimeUnit.SECONDS.sleep(1);
blockingQueue.remove(0);
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + blockingQueue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
Storage storage = new Storage();
for (int i = 0; i < 4; i++) {
new Thread(storage::producer).start();
}
for (int i = 0; i < 4; i++) {
new Thread(storage::consumer).start();
}
}
}

Semaphore信号量

package com.study.thread.procon;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
* 生产与消费的仓库
* 生产者与消费者模式synchronized实现
*/

public class Storage4 {

private static final int MAX = 10;//仓库储量
private List<Object> dataList = new LinkedList<>();//仓库产品
//创建一个用Static和volatile关键字修饰的信号量实例,传参为0,表示初始计数为0
private static volatile Semaphore semaphore = new Semaphore(0);

//生产者
public void producer() {
while (true) {//持续生产
if (semaphore.availablePermits() >= MAX) {
System.out.println("仓库储量已满,【生产者" + Thread.currentThread().getName()
+ "】无需再生产");
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.add(new Object());
//释放一个许可,并将其返还给信号量,也就是将 permits+1
semaphore.release();
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

//消费者
public void consumer() {
while (true) {//持续消费
if (semaphore.availablePermits() <= 0) {
System.out.println("仓库储量已空,【消费者" + Thread.currentThread().getName()
+ "】不能再继续消费");
} else {
try {
TimeUnit.SECONDS.sleep(1);
dataList.remove(0);
semaphore.release(-1);
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + dataList.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
Storage storage = new Storage();
for (int i = 0; i < 40; i++) {
new Thread(storage::producer).start();
}
for (int i = 0; i < 40; i++) {
new Thread(storage::consumer).start();
}
}
}
举报

相关推荐

0 条评论