@TOC
生产者消费者模型
什么是阻塞队列
阻塞队列 与 普通队列 的特性一样的:先进先出。
但是呢,相比于普通队列,阻塞队列也有着一些其它方面的功能!!!
-
线程安全
-
产生阻塞效果
2.1 如果队列为空,尝试出队一个元素,就会触发阻塞,一直阻塞到队列不为空为止。
2.2 如果队列为满,尝试入队一个元素,就会触发阻塞,一直阻塞到队列不为满为止。基于上述特性,就可以实现“生产者消费者模型”。生产者消费者模型 是日常开发中,处理多线程问题的一个典型方式。
举个例子:
过年,吃饺子
既然吃饺子,就需要包饺子这件事。
而包出一个完美的饺子这件事很麻烦。
【和面,擀饺子皮,包饺子,煮/蒸。大概是这么一个流程,其中细节是非常多的】
如果数量非常多,就需要多人分工进行协作。
其中 和面 和 煮饺子 不太好进行分工。【一般和面是一个人负责,煮饺子也是一个人】
和面这件事,一坨面一起和。没有说拆成两个部分来和面的。那样口感就不一样了。
煮饺子,那就更简单了,一个人拿着勺子,等到煮熟了,直接捞起来就行了。
擀饺子皮 和 包饺子 就比较好分工了。
毕竟面皮是一张一张擀出来了,饺子也是一个一个包的。
我们主要考虑擀面皮 和 包饺子的过程。假设 现有 A、B、C 三个人一起来擀饺子皮 + 包饺子。
协作方式1:
A、B、C 分别每个人都是先擀一张皮,然后再包一个饺子。
这种方式肯定是有效率的,毕竟三个人一起擀面皮和包饺子。肯定是比一个人要快的。
但是这种方式存在一定的问题,锁冲突比较激烈.
注意!擀饺子皮,需要一个重要的道具 “擀面杖”

问题就出在这里!擀面杖这个东西,一般只会买一个。
那么,如果此时A、B、C 三个都来擀面皮,故 三个人中,只能有一个人可以使用擀面杖,同时其他两个人,就需要等待,等待这个人使用完,让出来。然后,另外两个人就会出现竞争。
所以这个时候就会出现一系列的阻塞等待。
协作方式2:
A专门负责擀饺子皮,B和C专门负责包饺子。
因为 擀饺子皮的人,现在只有一个人
所以没有人跟他抢擀面杖(也就不会有锁的竞争了,同时也不会有阻塞等待的情况发生)
此时,A就是饺子皮的生产者,要不断的生成一个些饺子皮。
B和C就是饺子皮的消费者,他们需要不断的 使用/消耗 饺子皮。
这种就是生产者消费者模型。
在这个模型中,既有生产者负责生产数据,消费者负责使用数据。
那么,生产者 和 消费者之间,需要有一个“桥梁” 来去进行沟通交互。
我们将 “桥梁” 称其为 “交易场所”。
放在 饺子 事件中,“交易场所” 就相当于 用来放饺子的那个 盖板。
A将生产出来的饺子皮放在盖板上,B、C消耗的饺子皮,要从盖板上面拿。
得有这样的一个空间来存放饺子皮,得有这样的一个空间来存储需要使用的数据。
这就是“交易场所”。
阻塞队列 就可以作为 生产者消费者模型 中的 “交易场所”。生产者消费者模型,是实际开发中非常有用的一种多线程开发手段!!!尤其是在服务器开发的场景中。
假设:
有两个服务器A、B
- A 作为入口服务器直接接收用户的网络请求。
- B 作为应用服务器,来给A提供一些数据。
情况一: 直接发送

阻塞队列在开发中运用
情况二: 使用阻塞队列

JAVA 中阻塞队列的主要用法
Java中内置阻塞队列,讲一下常用的入队和出队方法。
-
入队put (阻塞) offer(无阻塞) 出队take(阻塞) poll(无阻塞)

模拟实现阻塞队列
-
先实现一个
普通队列 - 加上
线程安全 - 加上
阻塞功能
因此 阻塞队列 是可以基于链表,也可以基于数组来实现
但是基于数组来实现阻塞队列更简单,所以写一个数组版本的阻塞队列
数组实现队列的重点就在于 循环队列

所以
- >
先实现一个循环队列
class MyBlockingQueue{
// 保存数据的本体
private int[] data = new int[1000];
// 有效元素个数
private int usedSize;
// 队头下标位置
private int head;
// 队尾下标位置
private int rear;
// 入队列
public void put(int value){
if(usedSize == this.data.length){
// 如果队列满了,暂时先返回。
return;
}
data[rear++] = value;
//处理 rear 到达数组末尾的情况。
if(rear >= data.length){
rear = 0;
}
usedSize++;// 入队成功,元素个数加一。
}
// 出队列
public Integer take(){
if(usedSize == 0){
// 如果队列为空,就返回一个 非法值
return null;
}
int tmp = data[head];
head++;
if(head == data.length){
head = 0;
}
usedSize--;
return tmp;
}
}

- >
让队列在支持线程安全保证多线程环境下,调用这里的put 和 take 是没有问题的。
使用加锁操作 synchronized

实现阻塞
关键要点:使用 wait 和 notify机制。

注意:
如果这里有三个线程都是使用的同一个 锁对象, notify 是 不可能实现精准唤醒 指定 wait 的。
notify 只能唤醒随机的一个等待的线程,不能做到精准。
如果想要精准,就必须使用不同的 锁对象。
想精准唤醒 t1,就必须专门为它创建一个锁对象 locker1,让t1 调用 locker1.wait。再对其进行 locker1.notify 才能唤醒
想精准唤醒 t2,就必须专门为它创建一个锁对象 locker2,让t2 调用 locker2.wait。再对其进行 locker2.notify 才能唤醒.
这样才能达到精准唤醒的效果。
代码:
class MyBlockingQueue{
// 保存数据的本体
private int[] data = new int[1000];
// 有效元素个数
private int usedSize;
// 队头下标位置
private int head;
// 队尾下标位置
private int rear;
private Object locker = new Object();// 专门的锁对象
// 入队列
public void put(int value) throws InterruptedException {
synchronized(locker){
if(usedSize == this.data.length){
// 如果队列满了,暂时先返回。
//return;
locker.wait();
}
data[rear++] = value;
//处理 rear 到达数组末尾的情况。
if(rear >= data.length){
rear = 0;
}
usedSize++;// 入队成功,元素个数加一。
locker.notify();
}
}
// 出队列
public Integer take() throws InterruptedException {
synchronized(locker){
if(usedSize == 0){
// 如果队列为空,就返回一个 非法值
// return null;
locker.wait();
}
int tmp = data[head];
head++;
if(head == data.length){
head = 0;
}
usedSize--;
// 在 take成功之后,唤醒put中的等待。
locker.notify();
return tmp;
}
}
}
利用 阻塞队列 来构造一个 生产者和消费者模型
我通过构造两个线程,来实现一个简易的消费者生产者模型。
public class Test22 {
private static MyBlockingQueue queue = new MyBlockingQueue();
public static void main(String[] args) {
// 实现一个 生产者消费者模型
Thread producer = new Thread(()->{
int num = 0;
while (true){
try {
System.out.println("生产了" + num);
queue.put(num);
num++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
Thread customer = new Thread(()->{
while (true){
try {
int num = queue.take();
System.out.println("消费了"+num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
}
当前的场景中,只有一个消费者 和 一个 生产者。
如果多个生产者 和 消费者,那我们就多创建线程就行了。
为了更好看到效果,我们在给这这个程序中的“生产者”加上一个sleep。
让它生产的慢一些,此时消费者就只能跟生产的步伐走。
生产者生成一个,消费者就消费一个。

下面我们来看一下执行效果。【生产的速度 没有消费速度快】

我们再来将sleep代码的位置换到 消费者 代码中。
此时就是消费速度 没有生产速度快。
来看下面的效果

完整的生产者消费者模型 + 阻塞队列
下面这个程序,是 生产速度非常,消费速度很慢。
取决于你给谁加上sleep
class MyBlockingQueue{
// 保存数据的本体
private int[] data = new int[1000];
// 有效元素个数
private int usedSize;
// 队头下标位置
private int head;
// 队尾下标位置
private int rear;
private Object locker = new Object();// 专门的锁对象
// 入队列
public void put(int value) throws InterruptedException {
synchronized(locker){
if(usedSize == this.data.length){
// 如果队列满了,暂时先返回。
//return;
locker.wait();
}
data[rear++] = value;
//处理 rear 到达数组末尾的情况。
if(rear >= data.length){
rear = 0;
}
usedSize++;// 入队成功,元素个数加一。
locker.notify();
}
}
// 出队列
public Integer take() throws InterruptedException {
synchronized(locker){
if(usedSize == 0){
// 如果队列为空,就返回一个 非法值
// return null;
locker.wait();
}
int tmp = data[head];
head++;
if(head == data.length){
head = 0;
}
usedSize--;
// 在 take成功之后,唤醒put中的等待。
locker.notify();
return tmp;
}
}
}
public class Test22 {
private static MyBlockingQueue queue = new MyBlockingQueue();
public static void main(String[] args) {
// 实现一个 生产者消费者模型
Thread producer = new Thread(()->{
int num = 0;
while (true){
try {
System.out.println("生产了" + num);
queue.put(num);
num++;
// Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
Thread customer = new Thread(()->{
while (true){
try {
int num = queue.take();
System.out.println("消费了"+num);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
}









