线程池实现C++版本

阅读 74

2022-02-06

学习了c++,把线程池改成c++试试

开始

C++版本的线程池是根据C语言版本修改来的,为了熟悉一下C++的相关语法,所以就有了这篇笔记。记录一下修改的过程中遇到的问题。

本线程池应该是基于对象的,并不是面向对象

[线程池简单实现(基于C语言)](Linux C线程池简单实现 | Blog (ethereal14.github.io))

  • 也不知道会不会有内存泄露的问题,这些事以后再说

文件说明

文件名作用
threadpool.cpp线程池成员函数具体实现
threadpool.h线程池对象的定义、各种枚举类型
main.cpp测试用例
CMakeLists.txt使用cmake组织的工程

线程池对象设计

基本就是把原来的结构体改成现在的class,把变量定义在private里。把工作线程定义为静态成员函数

class threadpool
{
private:
    pthread_mutex_t m_lock;
    pthread_cond_t m_notify;
    pthread_t *m_threads;
    threadpool_task *m_queue;

    int m_thread_count;
    int m_queue_size;
    int m_head;
    int m_tail;
    int m_count;
    int m_shutdown;
    int m_started;

private:
    static void *threadpool_thread(void *arg);

public:
    threadpool(int thread_count, int queue_size, int flags);
    ~threadpool();

    int threadpool_add(threadpool *pool, void (*function)(void *), void *argument, int flag);
    int threadpoolexit(threadpool *pool, int flag);
};

任务队列对象

如上,仅仅把struct改为class

class threadpool_task
{
public:
    void (*function)(void *);
    void *argument;
    threadpool_task();
    ~threadpool_task();
};

各种枚举类型

  • 不知道能不能把这些枚举类型放在class里边,以后有空研究
typedef enum
{
    threadpool_invalid = -1,
    threadpool_lock_failure = -2,
    threadpool_queue_full = -3,
    threadpool_shutdown = -4,
    threadpool_thread_failure = -5
} threadpool_terror_t;

typedef enum
{
    immediate_shutdown = 1, //自动关闭
    graceful_shutdown = 2   //立即关闭
} threadpool_shutdown_t;

typedef enum
{
    threadpool_graceful = 1
} threadpool_destroy_flags_t;

线程池构造函数

  • 这里遇到个问题:构造函数是没有返回值的,所以不能把原来的创建线程池函数照搬过来。
    • 使用do while结构来进行构造
threadpool::threadpool(int thread_count = 32, int queue_size = 256, int flags = 0) : m_queue_size(queue_size)
{
    do
    {
        if (thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE)
        {
            std::cout << "argument eeror" << std::endl;
            break;
        }

        this->m_thread_count = 0;
        //this->m_queue_size = queue_size;
        this->m_head = this->m_tail = this->m_count = 0;
        this->m_shutdown = this->m_started = 0;

        this->m_threads = new pthread_t[thread_count];
        this->m_queue = new threadpool_task[queue_size];

        if ((pthread_mutex_init(&this->m_lock, nullptr) != 0) ||
            (pthread_cond_init(&this->m_notify, NULL) != 0) ||
            (this->m_threads == nullptr) || (this->m_queue == nullptr))
        {
            std::cout << "init eeror" << std::endl;
            break;
        }

        for (size_t i = 0; i < thread_count; i++)
        {
            if (pthread_create(&this->m_threads[i], nullptr, threadpool_thread, this) != 0)
            {
                std::cout << "create " << i << " thread error" << std::endl;
                break;
            }
            this->m_thread_count++;
            this->m_started++;
        }

    } while (0);
}

线程池析构函数

析构函数把一些资源释放了就行

threadpool::~threadpool()
{
    if (this == NULL || this->m_started > 0)
    {
        exit(-1);
    }
    if (this->m_threads)
    {
        delete[] this->m_threads;
        delete[] this->m_queue;

        pthread_mutex_lock(&(this->m_lock));
        pthread_mutex_destroy(&(this->m_lock));
        pthread_cond_destroy(&(this->m_notify));
    }
    //delete[] this;
}

以下函数基本没啥变化

添加任务函数

int threadpool::threadpool_add(threadpool *pool, void (*function)(void *), void *argument, int flag)
{
    int err = 0, next;

    if (pool == nullptr || function == nullptr)
        return threadpool_invalid;
    if (pthread_mutex_lock(&(pool->m_lock)) != 0)
    {
        return threadpool_lock_failure;
    }

    next = pool->m_tail + 1;

    next = (next == pool->m_queue_size) ? 0 : next;

    do
    {
        if (pool->m_count == pool->m_queue_size)
        {
            err = threadpool_queue_full;
            break;
        }
        pool->m_queue[pool->m_tail].function = function;
        pool->m_queue[pool->m_tail].argument = argument;

        pool->m_tail = next;
        pool->m_count++;

        if (pthread_cond_signal(&(pool->m_notify)) != 0)
        {
            err = threadpool_lock_failure;
            break;
        }

    } while (0);

    if (pthread_mutex_unlock(&(pool->m_lock)) != 0)
    {
        err = threadpool_lock_failure;
    }

    return err;
}

线程退出函数

int threadpool::threadpoolexit(threadpool *pool, int flag)
{
    int i, err = 0;

    if (pool == nullptr)
        return threadpool_invalid;

    if (pthread_mutex_lock(&(pool->m_lock)) != 0)
        return threadpool_lock_failure;

    do
    {
        if (pool->m_shutdown)
        {
            err = threadpool_shutdown;
            break;
        }
        pool->m_shutdown = (flag & threadpool_graceful) ? graceful_shutdown : immediate_shutdown;

        if ((pthread_cond_broadcast(&(pool->m_notify)) != 0) || (pthread_mutex_unlock(&(pool->m_lock)) != 0))
        {
            err = threadpool_lock_failure;
            break;
        }

        for (size_t i = 0; i < pool->m_thread_count; i++)
        {
            if (pthread_join(pool->m_threads[i], nullptr) != 0)
                err = threadpool_thread_failure;
        }

    } while (0);

    return err;
}

工作线程函数

void *threadpool::threadpool_thread(void *arg)
{
    threadpool *pool = static_cast<threadpool *>(arg);
    threadpool_task task;

    for (;;)
    {
        pthread_mutex_lock(&(pool->m_lock));
        while ((pool->m_count == 0) && (!pool->m_shutdown))
        {
            pthread_cond_wait(&(pool->m_notify), &(pool->m_lock));
        }
        if ((pool->m_shutdown == immediate_shutdown) || ((pool->m_shutdown == graceful_shutdown) && (pool->m_count == 0)))
        {
            break;
        }

        task.function = pool->m_queue[pool->m_head].function;
        task.argument = pool->m_queue[pool->m_head].argument;

        pool->m_head++;
        pool->m_head = (pool->m_head == pool->m_queue_size) ? 0 : pool->m_head;
        pool->m_count--;

        pthread_mutex_unlock(&(pool->m_lock));

        // (*(task.function))(task.argument);

        task.function(task.argument);
    }

    pool->m_started--;
    pthread_mutex_unlock(&(pool->m_lock));
    pthread_exit(nullptr);
    return nullptr;
}

最后的测试用例

#include "threadpool.h"
#include <iostream>
#include <pthread.h>
#include <assert.h>
#include <unistd.h>

#define THREAD 32
#define QUEUE 256

int tasks = 0, done = 0;
pthread_mutex_t lock;

void dumy_task(void *arg)
{
    usleep(10000);
    pthread_mutex_lock(&lock);

    done++;
    pthread_mutex_unlock(&lock);
}

int main(int, char **)
{
    threadpool *pool = new threadpool(THREAD, QUEUE, 0);

    pthread_mutex_init(&lock, nullptr);

    std::cout << "pool started with " << THREAD << " threads and queue size of " << QUEUE << std::endl;

    while ((pool->threadpool_add(pool, &dumy_task, nullptr, 0)) == 0)
    {
        pthread_mutex_lock(&lock);
        tasks++;
        pthread_mutex_unlock(&lock);
    }
    std::cout << "add " << tasks << " tasks" << std::endl;

    while ((tasks / 2) > done)
    {
        usleep(10000);
    }
    //pool->threadpoolexit(pool, 0) == 0;
    assert(pool->threadpoolexit(pool, 0) == 0);
    std::cout << "did " << done << " tasks" << std::endl;

    delete[] pool;

    return 0;
}

CMakeLists文件

cmake_minimum_required(VERSION 3.0.0)
project(threadpool_cpp VERSION 0.1.0)

include(CTest)
enable_testing()

add_executable(threadpool_cpp main.cpp threadpool.cpp threadpool.h)

find_package(Threads)
target_link_libraries(threadpool_cpp ${CMAKE_THREAD_LIBS_INIT})

set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
include(CPack)

测试效果图

在这里插入图片描述

精彩评论(0)

0 0 举报