0
点赞
收藏
分享

微信扫一扫

【C/C++】C语言多线程开发 —— 线程池

菜菜捞捞 2022-03-14 阅读 44

H i , a l l   I   a m   c o m i n g ! Hi, all\ I\ am\ coming! Hi,all I am coming!

重新找到了半夜写 C S D N CSDN CSDN 的感觉

C语言到了进阶路上之后, 就需要面对多线程开发了

以下是我在 八 股 八股 的过程中, 准备的多线程的优点

  • 提高 CPU 的利用率
  • 线程的创建开销远比进程要小很多, 所以切换时保存上下文的速度会快很多

同时多线程的缺点

  • 稳定性低, 不可靠, 某个线程的瘫痪、可能影响整个进程
  • 安全性要求高, 由于对同一个进程的资源进行访问, 那么必然会涉及到锁的开销
  • 缓存行对齐 (局部性原理)

众所周知, 对于每一个进程来说32位操作系统虚拟地址 3 G, 每个线程创建需要 8MB, 32位系统上理论上可以创建 3G/8MB 个线程, 但实际上远比这小很多(一个进程又不是全是线程在支撑, 还有进程所包含的资源)

如果此时假设需要输出 1000000000000 个数字, 如果用一个线程可能执行需要很久, 两个时间就会缩短一些, 同样的越多的线程时间肯定会越短, 但是并不是线程越多越好, 理论上来说, 除了创建线程的数量有上限以外, 同时还包括锁的开销, 多个线程竞争资源的时候开销很大

所以需要一个线程池, 线程池的作用:

  • 起到缓冲作用, 在线程 与 任务分配之间起到缓冲作用
  • 提升复用性, 减少线程创建的开销

具体代码如下


#include <pthread.h>
#include <string.h>
#include <memory.h>
#include <stdio.h>
#include <stdlib.h> 
// 增加
#define LIST_ADD(item, list) do {\
	item -> prev = NULL;	\
	item -> next = list;	\
	if (list) {				\
		list -> prev = item;\
	}						\
	list = item;			\
} while(0)

// 删除
#define LIST_DELETE(item, list) do {\
	if (item -> prev) {\
		item -> prev -> next = item -> next;\
	}\
	if (item -> next) {\
		item -> next -> prev = item ->prev;\
	}\
	if (item == list) {\
		list = item -> next;\
	}\
	item -> prev = item -> next = nullptr;\
} while(0)

struct THREAD_POOL_WOKER {

	pthread_t thread;
	
	struct THREAD_MANAGER *pool;
	
	int terminate; // 停止标识符

	struct THREAD_POOL_WOKER* next;
	struct THREAD_POOL_WOKER* prev;
};

struct THREAD_POOL_JOB {
	void (*func)(pthread_t &thread, void *user_data);
	void* user_data;
	
	struct THREAD_POOL_JOB* next;
	struct THREAD_POOL_JOB* prev;
};

struct THREAD_MANAGER {
	struct THREAD_POOL_WOKER* workers;
	struct THREAD_POOL_JOB* jobs;
	pthread_cond_t jobs_cond;
	pthread_mutex_t jobs_mutex;	
};

static void* callback(void* arg) {

	struct THREAD_POOL_WOKER *worker = (struct THREAD_POOL_WOKER*)arg;

	while (1) {
		// enter
		pthread_mutex_lock(&worker->pool->jobs_mutex);

		while (worker->pool->jobs == nullptr) {
			if (worker->terminate) {
				break;
			}
			pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mutex);
		}

		if (worker->terminate) {
			pthread_mutex_unlock(&worker->pool->jobs_mutex);
			break;
		}

		struct THREAD_POOL_JOB *job = worker->pool->jobs;

		LIST_DELETE(job, worker->pool->jobs);

		pthread_mutex_unlock(&worker->pool->jobs_mutex);

		job->func(worker->thread, job->user_data);
	}

	free(worker);

	pthread_exit(NULL);

}

int THreadPoolCreate(struct THREAD_MANAGER *pool, int workNumbers) {
	if (workNumbers < 1) {
		workNumbers = 1;
	}
	if (pool == NULL) {
		return -1;
	}

	memset(pool, 0, sizeof(THREAD_MANAGER));

	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
	memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));


	pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
	memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));
 
	for (int i = 0; i < workNumbers; i++) {
		struct THREAD_POOL_WOKER *worker = (struct THREAD_POOL_WOKER*)malloc(sizeof(THREAD_POOL_WOKER));
		if (worker == NULL) {
			printf("details : malloc error\n");
			return -1;
		}
		memset(worker, 0, sizeof(struct THREAD_POOL_WOKER));

		int ret = pthread_create(&worker->thread, NULL, callback, worker);

		if (ret) {
			printf("details : thread error\n");
			return -2;
		}

		// work 
		worker->pool = pool;

		LIST_ADD(worker, pool->workers);
	}

	return 0;

}

void THreadPoolDestory(struct THREAD_MANAGER* pool) {
	struct THREAD_POOL_WOKER *work = nullptr;

	for (work = pool->workers; work != nullptr; work = work->next) {
		work->terminate = 1;
	}

	pthread_mutex_lock(&pool->jobs_mutex);
	pthread_cond_broadcast(&pool->jobs_cond);
	pthread_mutex_unlock(&pool->jobs_mutex);
}


void THreadPoolPushJob(struct THREAD_MANAGER* pool, struct THREAD_POOL_JOB* job) {
	
	pthread_mutex_lock(&pool->jobs_mutex);
	// 临界区
	LIST_ADD(job, pool->jobs);
	
	pthread_cond_signal(&pool->jobs_cond);

	pthread_mutex_unlock(&pool->jobs_mutex);
}

typedef THREAD_MANAGER THREAD_POOL;


void write(pthread_t &threadID ,void *user_data) {
	int* x = (int*)user_data;
	printf("thread id is : %u print(%d)\n", threadID, *x);
}

int a[20];

int main() {

	THREAD_POOL* threadPool = new THREAD_POOL();

	// 10个线程
	int rev = THreadPoolCreate(threadPool, 10);

	if (rev) {
		//创建失败
		exit(0);
	}

	for (int i = 0; i < 20; i++) {
		
		a[i] = i;

		THREAD_POOL_JOB* job = new THREAD_POOL_JOB();
		job->func = &write;
		job->user_data = (void*)(a+i);
		THreadPoolPushJob(threadPool, job);
	}



	return 0;
}

运行截图:

在这里插入图片描述

举报

相关推荐

0 条评论