阻塞队列——生产者消费者模型

阅读 76

2022-04-03

目录

🍉 阻塞队列

🍉阻塞队列的功能

  • 线程安全:阻塞队列是我们多线程中的案列之一,既然属于多线程,那么一定和线程安全有关系,阻塞队列主要的功能就是能保证线程安全
  • 产生阻塞效果
    1.若队列为空,尝试出队列,就会出现阻塞,阻塞到队列不为空为止
    2 .若队列为满,尝试如队列也会出现堵塞,阻塞到队列不为满为止.
    基于上述问题,我们就可以实现一个典型的生产者和消费者模型

🍉生产者消费者模型

我们举个场景🌰 :
过年的时候,大家家里都会包饺子,一家三口都会齐上阵,为了高效率的包,一般都会分工,假如👨擀饺子皮,👦和👩包饺子,这样效率会大大提高,👨擀完饺子皮之后会把饺子皮放在盖帘上,然后👦和👩将盖帘子上的饺子皮包成饺子,这就是一个典型的生产者消费者模型
生产者:👨
消费者:👦和👩
这里的盖帘子我们把他叫做交易场所

生产者和消费者模型是实际开发中非常有用的一种多线程开发手段,尤其是在服务器开发中
假设有两个服务器A,B A作为入口服务器作为直接接受用户的网络请求,B作为应用服务器,来给A提供一些数据

🍉生产者消费者模型的优点

  1. 优点一:能够让多个服务器之间充分的解耦合
    请添加图片描述
    何为解耦合?
    打个比方:如果不使用生产者消费者模型,就像上面的图所示,这样的话A,B两个服务器耦合性是非常高的,在开发A是,需要充分的了解到B的接口是什么样的,开发B的时候也需要知道A是怎么调用的,一旦把B换成C,A是需要进行非常大的改动的,而且如果B挂了A也顺带着挂了,显然在计算机中这样的代码是非常不可取的,耦合性太高,在日常开发中我也需要尽量做到高内聚,低耦合

如果我们引进生产者消费者模型,就解决了这个问题

如图请添加图片描述

A和B分别和阻塞队列进行交互,A不需要认识B,B也不需要认识A,队列是不会变的 B挂了,A也没啥影响,把B换成C,A也完全感知不到,这就达到了我们所说的低耦合的理念

  1. 优点二:能够对请求进行削峰填谷

请添加图片描述

如图,A作为入口服务器,计算量很轻,即使请求暴涨,A服务器也不会奔溃,但是B作为响应服务器,计算量很大,所需要的资源也很多,如果请求量暴涨,B就会挂掉
但是如果引进阻塞队列的话,就会有所改变
请添加图片描述
引进阻塞队列后就会起到削峰填谷的效果

  1. 削峰
  1. 填谷

以上就是削峰填谷,其实这样的功能主要是为了保护像B这样的响应/应用服务器,保证程序不会因为数据的暴涨而崩溃

在现实生活中也有这样的🌰 :三峡大坝,不懂得小伙伴可以去了解一下

🥝阻塞队列的实现

🥝 Java中标准库的阻塞队列

我们先根据Java中内置的阻塞队列,基于他来实现一个生产者消费者模型,再来实现自己创建的阻塞队列

 public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
        blockingQueue.put("hello");
        blockingQueue.take();
    }

这就是一个简单的阻塞队列

🥝自己实现一个阻塞队列

  1. 先实现一个简单的队列

下面是代码的实现

class MyBlockingQueue{
    private int[] array = new int[1_0000];
    private int head = 0;
    private int end = 0;
    private int size = 0;
     public void put(int value) throws InterruptedException {
        if(size == array.length){
            return;
        }
        array[end] = value;
        end++;
        //需要处理end下表到达数组末尾的情况
        if(end >= array.length){
            end = 0;
        }
        size++;
        
    }
     public Integer take() throws InterruptedException {
        if(size == 0){
           return null;
        }
        int ret = array[head];
        head++;
        if(head>=array.length){
            head=0;
        }
        size--;
        
        return ret;
    }

}
  1. 保证阻塞队列的安全
    从上面的代码我可以看出,takeput方法每一行代码都在对公共变量进行操作,所以我们直接对这两个方法进行加锁即可
class MyBlockingQueue{
    private int[] array = new int[1_0000];
    private int head = 0;
    private int end = 0;
    private int size = 0;
    synchronized public void put(int value) throws InterruptedException {
        if(size == array.length){
            
        }
        array[end] = value;
        end++;
        //需要处理end下表到达数组末尾的情况
        if(end >= array.length){
            end = 0;
        }
        size++;
        
    }
    synchronized public Integer take() throws InterruptedException {
        if(size == 0){
            
        }
        int ret = array[head];
        head++;
        if(head>=array.length){
            head=0;
        }
        size--;
        
        return ret;
    }

}
  1. 实现堵塞效果

关键使用wait和notify关键字

class MyBlockingQueue{
    private int[] array = new int[1_0000];
    private int head = 0;
    private int end = 0;
    private int size = 0;
    synchronized public void put(int value) throws InterruptedException {
        if(size == array.length){
            this.wait();
        }
        array[end] = value;
        end++;
        //需要处理end下表到达数组末尾的情况
        if(end >= array.length){
            end = 0;
        }
        size++;
        this.notify();
    }
    synchronized public Integer take() throws InterruptedException {
        if(size == 0){
            this.wait();
        }
        int ret = array[head];
        head++;
        if(head>=array.length){
            head=0;
        }
        size--;
        this.notify();
        return ret;
    }

}

这样的一个简单的堵塞队列就完成了
‘下面创建一个实例来感受一下这样的阻塞队列

🌰:`

public class TestDemo3 {
    private static MyBlockingQueue myBlockingQueue = new MyBlockingQueue();

    public static void main1(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
        blockingQueue.put("hello");
        blockingQueue.take();
    }
    public static void main(String[] args) {
        Thread producer = new Thread(()->{
            int num = 0;
            while(true){
                try {
                    System.out.println("生产了"+num);
                     myBlockingQueue.put(num);
                    num++;
                    //Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        });
        producer.start();
        Thread customer = new Thread(()->{
            while(true){
                try {
                    int num = myBlockingQueue.take();
                    System.out.println("消费了"+num);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        customer.start();

    }
}

运行结果请添加图片描述
以上就是阻塞队列的相关基础知识
👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋
👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋
👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋

精彩评论(0)

0 0 举报