0
点赞
收藏
分享

微信扫一扫

一起来写web server 05 -- 多线程进阶版本

ITWYY 2021-09-27 阅读 46

这个版本的web server比第4版稍微做了一点改进,那就是由主线程统一接收连接,然后连接的处理由子线程来完成.因此,这里就引入了条件变量以及同步互斥的问题.

同步机制

muduo库中有一个关于同步机制的封装,我这里就直接采用了.我这里来介绍一下这个封装吧.

下面是Conditon这个类的代码:

class Condition : noncopyable
{
    private:
        MutexLock& mutex_; /* 之前的锁的一个引用 */
        pthread_cond_t pcond_; /* 系统定义的条件变量的类型 */
        ... ...
}

这个类的构造函数用于初始化同步变量:

explicit Condition(MutexLock& mutex)
        : mutex_(mutex)
    {
        pthread_cond_init(&pcond_, NULL); /* 初始化同步变量 */
    }

析构函数就销毁掉同步变量:

~Condition()
    {
        pthread_cond_destroy(&pcond_); /* 销毁条件变量 */
    }

等待某个条件:

void wait()
    {
        MutexLock::UnassignGuard ug(mutex_);
        pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()); /* 等待Mutex */
    }

通知单个线程:

void notify()
    {
        pthread_cond_signal(&pcond_); /* 唤醒一个线程 */
    }
  1. 必须与mutex一起使用,该布尔表达式的读写需受此mutex保护.
  2. mutex已经上锁的时候才能调用wait().
  3. 把判断布尔条件和wait()放到while循环中.
    写成代码是这个样子的:
MutexLock mutex;
Condition cond(mutex);
std::deque<int> queue;

int dequeue() {
    MutexLockGuard lock(mutex); /* 加锁 */
    while (queue.empty()) {
        cond.wait(); 
    }
    assert(!queue.empty());
    int top = queue.front();
    queue.pop_front();
    return top;
}
  1. 不一定要在mutex已经上锁的情况下调用signal(理论上).
  2. signal之前一般要修改布尔表达式.
  3. 修改布尔表达式通常要用mutex保护.
  4. 注意区分signalbroadcast:"broadcast"通常用于表明状态变化,而signal表示资源可用.
    写成代码是:
void enqueue(int x) 
{
    MutexLockGuard lock(mutex); // 加锁
    queue.push_back(x);
    cond.signal(); // 可以移出临界区之外
}

以上引自linux多线程服务端编程.

我来谈一下我的理解:

cond中之所以需要mutex,是因为在执行到

while (condition) {
 cond.wait();
}

时,需要将cond中持有的mutex解锁.一旦接收到signal,它需要重新抢夺这个mutex,抢到了,才能从wait函数中返回.

为什么cond.wait()要放入while循环中呢?一方面是因为spurious wakeup,之所以会有这个东西,是速度的考量,一般来说,即使没有spurious wakeup,你也要这么写代码,举个栗子.

在生产者消费者模型之中,消费者1获得锁,发现queue为空,wait,消费者2获得锁,发现queue为空,wait,生产者3获得锁,将生产的产品放入queue,调用signal,并且释放了mutex,t1,t2被唤醒,可以预见的是,这两者只会有一个获得锁,消费完这个产品,然后另一个获得锁,发现为空,还是得继续等待,这就是while的由来,当然,至于signal为什么会唤醒多个线程,man手册上就是这么说的.

我们的代码

```cpp
/*-
* 线程池的加强版本.主要是主线程统一接收连接,其余都是工作者线程,这里的布局非常类似于一个生产者.
* 多个消费者.
*/

#define MAXNCLI 100

MutexLock mutex; /* 全局的锁 */
Condition cond(mutex); /* 全局的条件变量 */
int clifd[MAXNCLI], iget, iput;

int main(int argc, char *argv[])
{
    int listenfd = Open_listenfd(8080); /* 8080号端口监听 */
    signal(SIGPIPE, SIG_IGN);
    pthread_t tids[10];
    void* thread_main(void *);

    for (int i = 0; i < 10; ++i) {
        int *arg = (int *)Malloc(sizeof(int));
        *arg = i;
        Pthread_create(&tids[i], NULL, thread_main, (void *)arg);
    }
    struct sockaddr cliaddr; /* 用于存储对方的ip信息 */
    socklen_t clilen;
    for (; ; ) {
        int connfd = Accept(listenfd, &cliaddr, &clilen);
        {
            MutexLockGuard lock(mutex); /* 加锁 */
            clifd[iput] = connfd; /* 涉及到对共享变量的修改,要加锁 */
            if (++iput == MAXNCLI) iput = 0;
            if (iput == iget) unix_error("clifd is not big enough!\n");
        }
        cond.notify(); /* 通知一个线程有数据啦! */
    }
    return 0;
}

线程的代码是这样的:

void*
thread_main(void *arg)
{
    int connfd;
    printf("thread %d starting\n", *(int *)arg);
    Free(arg);
    for ( ; ;) {
        {
            MutexLockGuard lock(mutex); /* 加锁 */
            while (iget == iput) { /* 没有新的连接到来 */
                /*-
                * 代码必须用while循环来等待条件变量,原因是spurious wakeup
                */
                cond.wait(); /* 这一步会原子地unlock mutex并进入等待,wait执行完毕会自动重新加锁 */
            }
            connfd = clifd[iget]; /* 获得连接套接字 */
            if (++iget == MAXNCLI) iget = 0;
        }
        doit(connfd);
        close(connfd);
    }
}

总结

这个版本在原来的版本上增加了同步互斥操作,在某种程度上增加了难度.

具体代码还是看这里吧!:https://github.com/lishuhuakai/Spweb

举报

相关推荐

0 条评论