生产者消费者问题

阅读 92

2022-04-17

生产者消费者是极其经典的并发同步模型,描述了在共享固定大小缓冲区下,生产者生产一定量数据放入缓冲区,而消费者则从缓冲区消费取出一定量数据。

semaphore 实现

semaphore(信号量)是一个变量或者抽象数据类型,被用于控制在并发系统临界区问题多个线程对公共资源的访问。与自旋锁、栅栏一起作为同步手段。一个普通的信号量是单纯取决于程序员定义条件而改变的变量。

可以将信号量看作是特定资源的数量记录,并耦合调整该记录的安全操作。

生产者消费者的有限缓冲实际上就可以看作是特定资源,因此可以使用信号量来记录有限缓冲的数量。

以下是c++版本的实现,当然为了控制对buffer写操作时可能碰到的多个生产者消费者同时取放数据导致的写冲突,也在其中加了互斥锁

#include <thread>
#include <mutex>
#include <semaphore>

std::counting_semaphore<N> number_of_queueing_portions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manipulation;

void producer(){
    for(;;) {
        Portion portion = produce_next_portion();
        number_of_empty_positions.acquire();
        {
            std::lock_guard<std::mutex> g(buffer_manipulation);
            add_portion_to_buffer(portion);
        }
        number_of_queueing_portions.release();
    }
}

void consumer(){
    for(;;) {
        number_of_queueing_portions.acquire();
        Portion portion;
        {
            std::lock_guard<std::mutex> g(buffer_manipulation);
            portion = take_portion_from_buffer();
        }
        number_of_empty_positions.release();
        process_portion_taken(portion);
    }
}

int main(int argc, char const *argv[])
{
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}

monitor实现

monitor(管程)是允许线程具有互斥、等待(堵塞)某个条件为false的能力的抽象数据结构。还具有通知其他线程他们特定条件已经满足的机制,以及让他们暂时放弃独占访问,以便等待某些条件满足,然后重新获取独占访问并恢复他们的任务。

管程由互斥锁以及特定条件变量组成。条件变量本质上是等待特定条件的线程的容器

class Bounded_buffer{
    Portion buffer[N];
    unsigned head, tail;
    unsigned count;
    std::condition_variable nonempty, nonfull;
    std::mutex mtx;

public:
    void append(Portion x){
        std::unique_lock<std::mutex> lck(mtx);
        nonfull.wait(lck, [&]{return !(N==count);});
        assert(0<=count && count < N)
        buffer[tail++]=x;
        tail%=N;
        ++count;
        nonempty.notify_one();
    }

    Portion remove() {
        std::unique_lock<std::mutex> lck(mtx);
        nonempty.wait(lck,[&]{return !(0==count);});
        assert(0 < count && count <= N)
        Portion x = buffer[head++];
        head %= N;
        --count;
        nonfull.notify_one();
        return x;
    }

    Bounded_buffer() {
        head = 0; tail = 0; count = 0;
    }
}

channel实现

channel是一种通过消息传递实现进程间通信和同步的模型。消息可以通过通道发送,而另一个进程或线程能够接收通过它引用channel发送的消息,比如流。通道不同实现可以被缓冲,也可以不被缓冲,可以是异步或者同步的。

channel实现在golang中是极为优雅的

var element = 0

func produce() int {
	element++
	return element
}

func consume(e int) {
	// consume element
}

const (
	producerCount = 2
	consumerCount = 2
	bufferSize = 1
)

func main() {
	ch:=make(chan int, bufferSize)
	for i := 0; i < producerCount; i++ {
		go func() {
			ch<-produce()
		}()
	}

	for j := 0; j < consumerCount; j++ {
		go func() {
			consume(<-ch)
		}()
	}
  
  // assume main will wait other goroutines
}

无semaphore、monitors实现

在单生产者和消费者的时候,可以定义一个容量为b(b>=1)的buffer。使k为大于b的常数并为b的倍数,s、r为0到k-1之间的整数。

在初始化时s=r且buffer为空。生产者放入消息到buffer[s mod b],消费者取出消息buffer[r mod b]

当然还要考虑到在调度器切换时,可能一个线程读取了变量值,切换到第二个线程更改了该值,再切回来,那么第一个线程将使用旧值,而不是当前值,因此需要通过原子操作来解决这个问题

enum {
    N = 4
};
Message buffer[N];
std::atomic<unsigned> count{0};

void producer() {
    unsigned tail{0};
    for (;;) {
        Message msg = produceMessage();
        while (N == count);
        buffer[tail++] = msg;
        tail%=N;
        count.fetch_add(1,std::memory_order_relaxed);
    }
}

void consumer(){
    unsigned head {0};
    for(;;){
        while (count==0);
        Message msg = buffer[head++];
        head%=N;
        count.fetch_sub(1,std::memory_order_relaxed);
        consumeMessage(msg);
    }
}

Ref

  1. https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
  2. https://en.wikipedia.org/wiki/Monitor_(synchronization)
  3. https://en.wikipedia.org/wiki/Semaphore
  4. https://en.wikipedia.org/wiki/Channel_(programming)

精彩评论(0)

0 0 举报