假设完成一项任务需要的时间=创建线程时间T1+线程执行任务时间T2+销毁线程时间T3,如果T1+T3的时间远大于T2,通常就可以考虑采取线程池来提高服务器的性能
thread pool就是线程的一种使用模式,一个线程池中维护着多个线程等待接收管理者分配的可并发执行的任务。
将待处理的任务抽象成task结构:
typedef struct task { void* (*run)(void* args); // abstract a job function that need to run void* arg; // argument of the run function struct task* next; // point to the next task in task queue} task_t;threadpool中用first和last指针指向首尾两个任务task结构体保证每个task都能指向任务队列中下一个tasktypedef struct task { void* (*run)(void* args); // abstract a job function that need to run void* arg; // argument of the run function struct task* next; // point to the next task in task queue} task_t;typedef struct threadpool { condition_t ready; // condition & mutex task_t* first; // fist task in task queue task_t* last; // last task in task queue int counter; // total task number int idle; // idle task number int max_threads; // max task number int quit; // the quit flag} threadpool_t;设计了condition_t类来实现安全并发:
typedef struct condition { /** * 互斥锁 */ pthread_mutex_t pmutex; /** * 条件变量 */ pthread_cond_t pcond;} condition_t;提供对应的接口:
/** * 初始化 */int condition_init(condition_t* cond);/** * 加锁 */int condition_lock(condition_t* cond);/** * 解锁 */int condition_unlock(condition_t* cond);/** * 条件等待 * * pthread_cond_wait(cond, mutex)的功能有3个: * 1) 调用者线程首先释放mutex * 2) 然后阻塞, 等待被别的线程唤醒 * 3) 当调用者线程被唤醒后,调用者线程会再次获取mutex */int condition_wait(condition_t* cond);/** * 计时等待 */int condition_timedwait(condition_t* cond, const timespec* abstime);/** * 激活一个等待该条件的线程 * * 1) 作用: 发送一个信号给另外一个处于阻塞等待状态的线程, 使其脱离阻塞状态继续执行 * 2) 如果没有线程处在阻塞状态, 那么pthread_cond_signal也会成功返回, 所以需要判断下idle thread的数量 * 3) 最多只会给一个线程发信号,不会有「惊群现象」 * 4) 首先根据线程优先级的高低确定发送给哪个线程信号, 如果优先级相同则优先发给等待最久的线程 * 5) 重点: pthread_cond_wait必须放在lock和unlock之间, 因为他要根据共享变量的状态决定是否要等待; 但是pthread_cond_signal既可以放在lock和unlock之间,也可以放在lock和unlock之后 */int condition_signal(condition_t *cond);/** * 唤醒所有等待线程 */int condition_broadcast(condition_t *cond);/** * 销毁 */int condition_destroy(condition_t *cond);仅仅是初始化了condition和mutex,还有一些线程池的属性。但是任务队列是空的,而且此时也一个线程都没有。
// initialize the thread poolvoid threadpool_init(threadpool_t* pool, int threads_num) { int n_status = condition_init(&pool ->ready); if (n_status == 0) { printf("Info: initialize the thread pool successfully!\n"); } else { printf("Error: initialize the thread pool failed, status:%d\n", n_status); } pool->first = NULL; pool->last = NULL; pool->counter = 0; pool->idle = 0; pool->max_threads = threads_num; pool->quit = 0;}首先构建task结构体,然后将其加入任务队列。
// add a task to thread poolvoid threadpool_add_task(threadpool_t* pool, void* (*run)(void *arg), void* arg) { // create a task task_t* new_task = reinterpret_cast<task_t*>(malloc(sizeof(task_t))); new_task->run = run; new_task->arg = arg; new_task->next = NULL; // lock the condition condition_lock(&pool->ready); // add the task to task queue if (pool->first == NULL) { pool->first = new_task; } else { // else add to the last task pool->last->next = new_task; } pool->last = new_task; /* * after you add a task to task queue, you need to allocate it to a thread: * (1)if idle thread num > 0: awake a idle thread * (2)if idle thread num = 0 & thread num does not reach maximum: create a new thread to run the task */ if (pool->idle > 0) { // awake a thread that wait for longest time condition_signal(&pool->ready); } else if (pool->counter < pool->max_threads) { // define a tid to get the thread identifier that we are going to create pthread_t tid; /* * pthread_create(): * (1)thread identifier * (2)set the pthread attribute * (3)the function that thread is going to run * (4)the args of run func * * A realistic limit of thread num is 200 to 400 threads * https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/ptcrea.htm */ pthread_create(&tid, NULL, thread_routine, pool); pool->counter++; } else { // when (idle == 0 & counter = max_threads), then wait printf("Warning: no idle thread, please wait...\n"); } condition_unlock(&pool->ready);}// when task queue is empty, then block 2 second to get the new task// If timeout, then destroy the threadwhile (pool->first == NULL && !pool->quit) { printf("Info: thread %ld is waiting for a task\n", (u_int64_t)pthread_self()); // get the system time clock_gettime(CLOCK_REALTIME, &abs_name); abs_name.tv_sec += 2; int status; status = condition_timedwait(&pool->ready, &abs_name); // block for 2 second if (status == ETIMEDOUT) { printf("Info: thread %ld wait timed out\n", (u_int64_t)pthread_self()); timeout = true; break; }}...// if visit task queue timeout(means no task in queue), quit destory the threadif (timeout) { pool->counter--; condition_unlock(&pool->ready); break; // destroy the thread}// when the thread run the task, we should unlock the thread poolif (pool->first != NULL) { // get the task from task queue task_t* t = pool->first; pool->first = t->next; // unlock the thread pool to make other threads visit task queue condition_unlock(&pool->ready); // run the task run func t->run(t->arg); free(t); // lock condition_lock(&pool->ready);}// when task queue is clean and quit flag is 1, then destroy the threadif (pool->quit && pool->first == NULL) { pool->counter--; // 若线程池中线程数为0,通知等待线程(主线程)全部任务已经完成 if (pool->counter == 0) { condition_signal(&pool->ready); } condition_unlock(&pool->ready); break; // destroy the thread}condition.h:
#ifndef CONDITION_H_#define CONDITION_H_#include <pthread.h>#include <cstdio>typedef struct condition { /** * 互斥锁 */ pthread_mutex_t pmutex; /** * 条件变量 */ pthread_cond_t pcond;} condition_t;/** * 初始化 */int condition_init(condition_t* cond);/** * 加锁 */int condition_lock(condition_t* cond);/** * 解锁 */int condition_unlock(condition_t* cond);/** * 条件等待 * * pthread_cond_wait(cond, mutex)的功能有3个: * 1) 调用者线程首先释放mutex * 2) 然后阻塞, 等待被别的线程唤醒 * 3) 当调用者线程被唤醒后,调用者线程会再次获取mutex */int condition_wait(condition_t* cond);/** * 计时等待 */int condition_timedwait(condition_t* cond, const timespec* abstime);/** * 激活一个等待该条件的线程 * * 1) 作用: 发送一个信号给另外一个处于阻塞等待状态的线程, 使其脱离阻塞状态继续执行 * 2) 如果没有线程处在阻塞状态, 那么pthread_cond_signal也会成功返回, 所以需要判断下idle thread的数量 * 3) 最多只会给一个线程发信号,不会有「惊群现象」 * 4) 首先根据线程优先级的高低确定发送给哪个线程信号, 如果优先级相同则优先发给等待最久的线程 * 5) 重点: pthread_cond_wait必须放在lock和unlock之间, 因为他要根据共享变量的状态决定是否要等待; 但是pthread_cond_signal既可以放在lock和unlock之间,也可以放在lock和unlock之后 */int condition_signal(condition_t *cond);/** * 唤醒所有等待线程 */int condition_broadcast(condition_t *cond);/** * 销毁 */int condition_destroy(condition_t *cond);#endif // CONDITION_H_condition.cpp:
#include "condition.h"// 初始化int condition_init(condition_t* cond) { int status; status = pthread_mutex_init(&cond->pmutex, NULL); if (status != 0) { printf("Error: pthread_mutex_init failed, return value:%d\n", status); return status; } status = pthread_cond_init(&cond->pcond, NULL); if (status != 0) { printf("Error: pthread_cond_init failed, return value:%d\n", status); return status; } return 0;}// 加锁int condition_lock(condition_t* cond) { return pthread_mutex_lock(&cond->pmutex);}// 解锁int condition_unlock(condition_t* cond) { return pthread_mutex_unlock(&cond->pmutex);}// 条件等待int condition_wait(condition_t* cond) { return pthread_cond_wait(&cond->pcond, &cond->pmutex);}// 计时等待int condition_timedwait(condition_t* cond, const timespec* abstime) { return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);}// 激活一个等待该条件的线程int condition_signal(condition_t *cond) { return pthread_cond_signal(&cond->pcond);}// 唤醒所有等待线程int condition_broadcast(condition_t *cond) { return pthread_cond_broadcast(&cond->pcond);}// 销毁int condition_destroy(condition_t *cond) { int status; status = pthread_mutex_destroy(&cond->pmutex); if (status != 0) { return status; } status = pthread_cond_destroy(&cond->pcond); if (status != 0) { return status; } return 0;}threadpool.h:
#ifndef THREAD_POLL_H_#define THREAD_POLL_H_#include "condition.h"typedef struct task { void* (*run)(void* args); // abstract a job function that need to run void* arg; // argument of the run function struct task* next; // point to the next task in task queue} task_t;typedef struct threadpool { condition_t ready; // condition & mutex task_t* first; // fist task in task queue task_t* last; // last task in task queue int counter; // total task number int idle; // idle task number int max_threads; // max task number int quit; // the quit flag} threadpool_t;/** * initialize threadpool */ void threadpool_init(threadpool_t* pool, int threads_num);/** * add a task to threadpool */void threadpool_add_task(threadpool_t* pool, void* (*run)(void *args), void* arg);/** * destroy threadpool */void threadpool_destroy(threadpool_t* pool);#endif // THREAD_POLL_H_Threadpool.cpp:
#include <pthread.h>#include <cstdlib>#include <cstdio>#include <cerrno>#include <ctime>#include "threadpool.h"void *thread_routine(void *arg);// initialize the thread poolvoid threadpool_init(threadpool_t* pool, int threads_num) { int n_status = condition_init(&pool ->ready); if (n_status == 0) { printf("Info: initialize the thread pool successfully!\n"); } else { printf("Error: initialize the thread pool failed, status:%d\n", n_status); } pool->first = NULL; pool->last = NULL; pool->counter = 0; pool->idle = 0; pool->max_threads = threads_num; pool->quit = 0;}// add a task to thread poolvoid threadpool_add_task(threadpool_t* pool, void* (*run)(void *arg), void* arg) { // create a task task_t* new_task = reinterpret_cast<task_t*>(malloc(sizeof(task_t))); new_task->run = run; new_task->arg = arg; new_task->next = NULL; // lock the condition condition_lock(&pool->ready); // add the task to task queue if (pool->first == NULL) { pool->first = new_task; } else { // else add to the last task pool->last->next = new_task; } pool->last = new_task; /* * after you add a task to task queue, you need to allocate it to a thread: * (1)if idle thread num > 0: awake a idle thread * (2)if idle thread num = 0 & thread num does not reach maximum: create a new thread to run the task */ if (pool->idle > 0) { // awake a thread that wait for longest time condition_signal(&pool->ready); } else if (pool->counter < pool->max_threads) { // define a tid to get the thread identifier that we are going to create pthread_t tid; /* * pthread_create(): * (1)thread identifier * (2)set the pthread attribute * (3)the function that thread is going to run * (4)the args of run func * * A realistic limit of thread num is 200 to 400 threads * https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/ptcrea.htm */ pthread_create(&tid, NULL, thread_routine, pool); pool->counter++; } else { // when (idle == 0 & counter = max_threads), then wait printf("Warning: no idle thread, please wait...\n"); } condition_unlock(&pool->ready);}// create a thread to run the task run func// and the void *arg means the arg passed by pthread_create: poolvoid *thread_routine(void *arg) { struct timespec abs_name; bool timeout; printf("Info: create thread, and the thread id is: %ld\n", (u_int64_t)pthread_self()); threadpool_t *pool = reinterpret_cast<threadpool_t *>(arg); // keep visiting the task queue while (true) { timeout = false; condition_lock(&pool->ready); pool->idle++; // when task queue is empty, then block 2 second to get the new task // If timeout, then destroy the thread while (pool->first == NULL && !pool->quit) { printf("Info: thread %ld is waiting for a task\n", (u_int64_t)pthread_self()); // get the system time clock_gettime(CLOCK_REALTIME, &abs_name); abs_name.tv_sec += 2; int status; status = condition_timedwait(&pool->ready, &abs_name); // block for 2 second if (status == ETIMEDOUT) { printf("Info: thread %ld wait timed out\n", (u_int64_t)pthread_self()); timeout = true; break; } } pool->idle--; // when the thread run the task, we should unlock the thread pool if (pool->first != NULL) { // get the task from task queue task_t* t = pool->first; pool->first = t->next; // unlock the thread pool to make other threads visit task queue condition_unlock(&pool->ready); // run the task run func t->run(t->arg); free(t); // lock condition_lock(&pool->ready); } // when task queue is clean and quit flag is 1, then destroy the thread if (pool->quit && pool->first == NULL) { pool->counter--; // 若线程池中线程数为0,通知等待线程(主线程)全部任务已经完成 if (pool->counter == 0) { condition_signal(&pool->ready); } condition_unlock(&pool->ready); break; // destroy the thread } // if visit task queue timeout(means no task in queue), quit destory the thread if (timeout) { pool->counter--; condition_unlock(&pool->ready); break; // destroy the thread } condition_unlock(&pool->ready); } // if break, destroy the thread printf("Info: thread %ld quit\n", (u_int64_t)pthread_self()); return NULL;}/* * destroy a thread pool: * 1) awake all the idle thread * 2) wait for the running thread to finish */void threadpool_destroy(threadpool_t *pool) { if (pool->quit) { return; } condition_lock(&pool->ready); pool->quit = 1; if (pool->counter > 0) { if (pool->idle > 0) { condition_broadcast(&pool->ready); } while (pool->counter > 0) { condition_wait(&pool->ready); } } condition_unlock(&pool->ready); condition_destroy(&pool->ready);}test.cpp:
#include <unistd.h>#include <stdlib.h>#include <stdio.h>#include "threadpool.h"#define THREADPOOL_MAX_NUM 30void* mytask(void *arg) { printf("Info: thread %ld is working on task %d\n", (u_int64_t)pthread_self(), *reinterpret_cast<int*>(arg)); sleep(1); free(arg); return NULL;}int main(int argc, char* argv[]) { threadpool_t pool; threadpool_init(&pool, THREADPOOL_MAX_NUM); // add task to task queue for (int i=0; i < 100; i++) { int *arg = reinterpret_cast<int *>(malloc(sizeof(int))); *arg = i; threadpool_add_task(&pool, mytask, arg); } threadpool_destroy(&pool); return 0;}编译运行:
$g++ -g test.cpp threadpool.cpp condition.cpp -o test -std=c++11 -lpthread$./testInfo: initialize the thread pool successfully!Info: create thread, and the thread id is: 139898193295104Info: create thread, and the thread id is: 139898176509696Info: thread 139898176509696 is working on task 0Info: create thread, and the thread id is: 139898168116992Info: create thread, and the thread id is: 139898184902400Info: create thread, and the thread id is: 139898134546176Info: create thread, and the thread id is: 139898126153472Info: create thread, and the thread id is: 139898117760768Info: thread 139898117760768 is working on task 1Info: create thread, and the thread id is: 139898100975360Info: create thread, and the thread id is: 139898092582656Info: create thread, and the thread id is: 139898084189952Info: create thread, and the thread id is: 139898159724288Info: create thread, and the thread id is: 139898109368064Info: create thread, and the thread id is: 139898067404544Info: create thread, and the thread id is: 139898059011840Info: create thread, and the thread id is: 139898050619136Info: create thread, and the thread id is: 139898042226432Info: create thread, and the thread id is: 139898033833728Info: create thread, and the thread id is: 139898025441024Info: create thread, and the thread id is: 139898017048320Info: create thread, and the thread id is: 139898008655616Info: create thread, and the thread id is: 139898075797248Info: create thread, and the thread id is: 139898000262912Info: create thread, and the thread id is: 139898142938880Info: create thread, and the thread id is: 139898151331584Info: thread 139898159724288 is working on task 2Info: thread 139898151331584 is working on task 3Info: create thread, and the thread id is: 139897991870208Info: create thread, and the thread id is: 139897966692096Info: create thread, and the thread id is: 139897958299392Warning: no idle thread, please wait...Warning: no idle thread, please wait...Info: create thread, and the thread id is: 139897949906688Info: create thread, and the thread id is: 139897983477504Info: create thread, and the thread id is: 139897975084800Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Info: thread 139898067404544 is working on task 4Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Info: thread 139898168116992 is working on task 5Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Warning: no idle thread, please wait...Info: thread 139898142938880 is working on task 6Warning: no idle thread, please wait...Info: thread 139898042226432 is working on task 7Warning: no idle thread, please wait...Info: thread 139897949906688 is working on task 8Info: thread 139898184902400 is working on task 11Info: thread 139898134546176 is working on task 13Info: thread 139898017048320 is working on task 14Info: thread 139898008655616 is working on task 16Info: thread 139898193295104 is working on task 18Info: thread 139898000262912 is working on task 20Info: thread 139898100975360 is working on task 21Info: thread 139897983477504 is working on task 9Info: thread 139897975084800 is working on task 10Info: thread 139898092582656 is working on task 26Info: thread 139898050619136 is working on task 24Info: thread 139897991870208 is working on task 28Info: thread 139898025441024 is working on task 12Info: thread 139898084189952 is working on task 15Info: thread 139898109368064 is working on task 17Info: thread 139897966692096 is working on task 25Info: thread 139898075797248 is working on task 19Info: thread 139898059011840 is working on task 22Info: thread 139897958299392 is working on task 27Info: thread 139898033833728 is working on task 29Info: thread 139898126153472 is working on task 23Info: thread 139898176509696 is working on task 30Info: thread 139898117760768 is working on task 31Info: thread 139898159724288 is working on task 32Info: thread 139898151331584 is working on task 33Info: thread 139898067404544 is working on task 34Info: thread 139898168116992 is working on task 35Info: thread 139898142938880 is working on task 36Info: thread 139898042226432 is working on task 37Info: thread 139897949906688 is working on task 38Info: thread 139898184902400 is working on task 39Info: thread 139898000262912 is working on task 40Info: thread 139898017048320 is working on task 41Info: thread 139898008655616 is working on task 42Info: thread 139898134546176 is working on task 43Info: thread 139898193295104 is working on task 44Info: thread 139898050619136 is working on task 49Info: thread 139897991870208 is working on task 50Info: thread 139898025441024 is working on task 51Info: thread 139898084189952 is working on task 52Info: thread 139898109368064 is working on task 53Info: thread 139898075797248 is working on task 54Info: thread 139897975084800 is working on task 48Info: thread 139898100975360 is working on task 45Info: thread 139898033833728 is working on task 57Info: thread 139897983477504 is working on task 47Info: thread 139897966692096 is working on task 55Info: thread 139897958299392 is working on task 56Info: thread 139898092582656 is working on task 46Info: thread 139898126153472 is working on task 58Info: thread 139898059011840 is working on task 59Info: thread 139898176509696 is working on task 60Info: thread 139898117760768 is working on task 61Info: thread 139898159724288 is working on task 62Info: thread 139898151331584 is working on task 63Info: thread 139898067404544 is working on task 64Info: thread 139898168116992 is working on task 65Info: thread 139898142938880 is working on task 66Info: thread 139898042226432 is working on task 67Info: thread 139897949906688 is working on task 69Info: thread 139898184902400 is working on task 68Info: thread 139898000262912 is working on task 70Info: thread 139898008655616 is working on task 71Info: thread 139898017048320 is working on task 72Info: thread 139898050619136 is working on task 73Info: thread 139898134546176 is working on task 74Info: thread 139898109368064 is working on task 78Info: thread 139898100975360 is working on task 80Info: thread 139897975084800 is working on task 82Info: thread 139898075797248 is working on task 81Info: thread 139897966692096 is working on task 85Info: thread 139897958299392 is working on task 86Info: thread 139898126153472 is working on task 88Info: thread 139898059011840 is working on task 89Info: thread 139898092582656 is working on task 87Info: thread 139898025441024 is working on task 76Info: thread 139898084189952 is working on task 77Info: thread 139898193295104 is working on task 75Info: thread 139897991870208 is working on task 79Info: thread 139898033833728 is working on task 83Info: thread 139897983477504 is working on task 84Info: thread 139898176509696 is working on task 90Info: thread 139898117760768 is working on task 91Info: thread 139898159724288 is working on task 92Info: thread 139898151331584 is working on task 93Info: thread 139898067404544 is working on task 94Info: thread 139898168116992 is working on task 95Info: thread 139898142938880 is working on task 96Info: thread 139898042226432 is working on task 97Info: thread 139897949906688 is working on task 98Info: thread 139898184902400 is working on task 99Info: thread 139898000262912 quitInfo: thread 139898008655616 quitInfo: thread 139898017048320 quitInfo: thread 139898050619136 quitInfo: thread 139898134546176 quitInfo: thread 139898109368064 quitInfo: thread 139898100975360 quitInfo: thread 139897966692096 quitInfo: thread 139898126153472 quitInfo: thread 139897975084800 quitInfo: thread 139898075797248 quitInfo: thread 139897958299392 quitInfo: thread 139898059011840 quitInfo: thread 139898092582656 quitInfo: thread 139898025441024 quitInfo: thread 139898084189952 quitInfo: thread 139898193295104 quitInfo: thread 139897991870208 quitInfo: thread 139898033833728 quitInfo: thread 139897983477504 quitInfo: thread 139898176509696 quitInfo: thread 139898117760768 quitInfo: thread 139898159724288 quitInfo: thread 139898151331584 quitInfo: thread 139898067404544 quitInfo: thread 139898168116992 quitInfo: thread 139898142938880 quitInfo: thread 139898042226432 quitInfo: thread 139897949906688 quitInfo: thread 139898184902400 quitthreadpool.h:
#ifndef THREAD_POOL_H#define THREAD_POOL_H#include <stdio.h>#include <pthread.h>#include <functional>#include <vector>#include <queue>class ThreadPool { public: typedef void *(WrokerFunc)(void* arg); struct Task { WrokerFunc* run; void* arg; }; explicit ThreadPool(int thread_num); ~ThreadPool(); void addTask(WrokerFunc* func, void* arg); private: std::queue<Task*> task_queue_; std::vector<pthread_t> thread_list_; bool is_running_; // note: is_running_不用原子变量或者锁操作可能存在卡死问题 pthread_mutex_t mutex_; pthread_cond_t condition_; static void* thread_routine(void* pool_ptr); void thread_worker();};// =========================implementation=========================inline ThreadPool::ThreadPool(int thread_num) : is_running_(true) { pthread_mutex_init(&mutex_, NULL); pthread_cond_init(&condition_, NULL); for (int i = 0; i < thread_num; i++) { pthread_t pid; pthread_create(&pid, NULL, thread_routine, this); thread_list_.push_back(pid); }}inline ThreadPool::~ThreadPool() { pthread_mutex_lock(&mutex_); is_running_ = false; pthread_mutex_unlock(&mutex_); pthread_cond_broadcast(&condition_); // wakeup all threads that block to get task for (int i = 0; i < thread_list_.size(); i++) { pthread_join(thread_list_[i], NULL); } pthread_cond_destroy(&condition_); pthread_mutex_destroy(&mutex_);}inline void ThreadPool::addTask(WrokerFunc* func, void* arg) { Task* task = new Task(); task->run = func; task->arg = arg; pthread_mutex_lock(&mutex_); task_queue_.push(task); pthread_mutex_unlock(&mutex_); pthread_cond_signal(&condition_);}inline void* ThreadPool::thread_routine(void* pool_ptr) { ThreadPool* pool = static_cast<ThreadPool*>(pool_ptr); pool->thread_worker();}inline void ThreadPool::thread_worker() { Task* task = NULL; while (true) { pthread_mutex_lock(&mutex_); if (!is_running_) { pthread_mutex_unlock(&mutex_); break; } if (task_queue_.empty()) { pthread_cond_wait(&condition_, &mutex_); // 获取不到任务时阻塞, 直到有新的任务入队 if (task_queue_.empty()) { pthread_mutex_unlock(&mutex_); continue; } } task = task_queue_.front(); task_queue_.pop(); pthread_mutex_unlock(&mutex_); (*(task->run))(task->arg); delete task; task = NULL; } // 线程池终止时(is_running_ = false)确保任务队列为空后退出 while (true) { pthread_mutex_lock(&mutex_); if (task_queue_.empty()) { pthread_mutex_unlock(&mutex_); break; } task = task_queue_.front(); task_queue_.pop(); pthread_mutex_unlock(&mutex_); delete task; task = NULL; } printf("Info: thread[%lu] exit\n", pthread_self());}#endif // THREAD_POOL_H测试代码threadpool_test.cpp:
#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include <pthread.h>#include <vector>#include "threadpool.h"void* MyTaskFunc(void* arg) { int* i = static_cast<int*>(arg); printf("[MyTaskFunc]: thread[%lu] is working on %d\n", pthread_self(), *i); return NULL;}int main() { ThreadPool pool(10); for (int i = 0; i < 100; i++) { int* arg = new int(i); pool.addTask(&MyTaskFunc, arg); } return 0;}编译运行:
$g++ -g threadpool_test.cpp -o threadpool_test -lpthread$./threadpool_test [MyTaskFunc]: thread[140224777099008] is working on 0[MyTaskFunc]: thread[140224793884416] is working on 8[MyTaskFunc]: thread[140224844240640] is working on 2Info: thread[140224844240640] exit[MyTaskFunc]: thread[140224827455232] is working on 4Info: thread[140224827455232] exit[MyTaskFunc]: thread[140224810669824] is working on 6Info: thread[140224810669824] exit[MyTaskFunc]: thread[140224777099008] is working on 10Info: thread[140224777099008] exit[MyTaskFunc]: thread[140224852633344] is working on 1Info: thread[140224852633344] exit[MyTaskFunc]: thread[140224835847936] is working on 3Info: thread[140224835847936] exit[MyTaskFunc]: thread[140224802277120] is working on 7Info: thread[140224802277120] exitInfo: thread[140224793884416] exit[MyTaskFunc]: thread[140224819062528] is working on 5Info: thread[140224819062528] exit[MyTaskFunc]: thread[140224785491712] is working on 9Info: thread[140224785491712] exit上述的线程池无法很好地支持同步任务,因此我们基于信号量实现了SyncTaskQueue。
sync_task_queue.h:
#ifndef SYNC_TASK_QUEUE_H#define SYNC_TASK_QUEUE_H#include <semaphore.h>#include <vector>#include "threadpool.h"class SyncTaskQueue { public: struct SyncTask { ThreadPool::Task task; sem_t* sem; }; explicit SyncTaskQueue(ThreadPool* pool_ptr); ~SyncTaskQueue(); void addTask(ThreadPool::WrokerFunc* func, void* sync_task_ptr); void wait(); private: ThreadPool* threadpool_; sem_t sem_; int sync_task_num_; // 务必保证单线程读写, 否则需要加锁 static ThreadPool::WrokerFunc workerFuncWrapper;};inline SyncTaskQueue::SyncTaskQueue(ThreadPool* pool_ptr) : threadpool_(pool_ptr), sync_task_num_(0) { sem_init(&sem_, 0, 0);}inline SyncTaskQueue::~SyncTaskQueue() { // make sure that all task has been finished before destory if (sync_task_num_ > 0) { wait(); } sem_destroy(&sem_);}inline void SyncTaskQueue::addTask(ThreadPool::WrokerFunc* func, void* arg) { sync_task_num_++; // wrapper the worker function with sem SyncTask* sync_task = new SyncTask(); sync_task->sem = &(this->sem_); sync_task->task.run = func; sync_task->task.arg = arg; threadpool_->addTask(&workerFuncWrapper, sync_task);}inline void SyncTaskQueue::wait() { while (sync_task_num_) { int sem_value = 0; sem_wait(&sem_); sync_task_num_--; }}inline void* SyncTaskQueue::workerFuncWrapper(void* sync_task_ptr) { SyncTask* sync_task = static_cast<SyncTask*>(sync_task_ptr); (*(sync_task->task.run))(sync_task->task.arg); sem_post(sync_task->sem); delete sync_task;}#endif // SYNC_TASK_QUEUE_H测试文件sync_task_queue_test.cpp:
#include <unistd.h>#include "sync_task_queue.h"void* MyTaskFunc(void* arg) { int i = *static_cast<int*>(arg); printf("[MyTaskFunc]: thread[%lu] is working on %d\n", pthread_self(), i); sleep(2); return NULL;}int main() { ThreadPool threadpool(20); SyncTaskQueue sync_task_queue(&threadpool); for (int i = 0; i < 15; i++) { int* arg = new int(i); sync_task_queue.addTask(&MyTaskFunc, arg); } printf("====================================wait for result===================================\n"); sync_task_queue.wait();}编译运行:
$g++ -g sync_task_queue_test.cpp -o sync_task_queue_test -lpthread$./sync_task_queue_test [MyTaskFunc]: thread[140349199148800] is working on 0[MyTaskFunc]: thread[140349266290432] is working on 12[MyTaskFunc]: thread[140349358610176] is working on 2[MyTaskFunc]: thread[140349341824768] is working on 3[MyTaskFunc]: thread[140349333432064] is working on 4[MyTaskFunc]: thread[140349325039360] is working on 5[MyTaskFunc]: thread[140349308253952] is working on 6[MyTaskFunc]: thread[140349299861248] is working on 8[MyTaskFunc]: thread[140349316646656] is working on 7[MyTaskFunc]: thread[140349283075840] is working on 9[MyTaskFunc]: thread[140349291468544] is working on 10[MyTaskFunc]: thread[140349274683136] is working on 11[MyTaskFunc]: thread[140349257897728] is working on 13[MyTaskFunc]: thread[140349249505024] is working on 14[MyTaskFunc]: thread[140349350217472] is working on 1====================================wait for result===================================Info: thread[140349232719616] exitInfo: thread[140349241112320] exitInfo: thread[140349266290432] exitInfo: thread[140349207541504] exitInfo: thread[140349299861248] exitInfo: thread[140349283075840] exitInfo: thread[140349291468544] exitInfo: thread[140349249505024] exitInfo: thread[140349274683136] exitInfo: thread[140349333432064] exitInfo: thread[140349325039360] exitInfo: thread[140349308253952] exitInfo: thread[140349224326912] exitInfo: thread[140349316646656] exitInfo: thread[140349199148800] exitInfo: thread[140349350217472] exitInfo: thread[140349257897728] exitInfo: thread[140349215934208] exitInfo: thread[140349358610176] exitInfo: thread[140349341824768] exit传统C++线程池仅能接受特殊的Task(执行函数需要满足特殊的格式),使用C++11特性的线程池可以更好地支持任意类型参数的Task。
threadpool.h:
参考自:https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h
另外一种实现:https://github.com/mtrebi/thread-pool/blob/master/include/ThreadPool.h
#ifndef THREAD_POOL_H#define THREAD_POOL_H#include <vector>#include <queue>#include <memory>#include <thread>#include <mutex>#include <condition_variable>#include <future>#include <utility>#include <functional>#include <stdexcept>class ThreadPool { public: explicit ThreadPool(size_t); template<typename F, typename... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>; ~ThreadPool(); private: // need to keep track of threads so we can join them std::vector<std::thread> workers_; // the task queue std::queue<std::function<void()>> tasks_; // synchronization std::mutex queue_mutex_; std::condition_variable condition_; bool stop_;};// the constructor just launches some amount of workersinline ThreadPool::ThreadPool(size_t threads) : stop_(false) { for (size_t i = 0; i < threads; ++i) { workers_.emplace_back( [this] { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex_); this->condition_.wait(lock, [this] { return this->stop_ || !this->tasks_.empty(); }); if (this->stop_ && this->tasks_.empty()) { return; } task = std::move(this->tasks_.front()); this->tasks_.pop(); } task(); } }); }}// add new work item to the pooltemplate<class F, class... Args>auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared< std::packaged_task<return_type()>> ( std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex_); // don't allow enqueueing after stopping the pool if (stop_) { throw std::runtime_error("enqueue on stopped ThreadPool"); } tasks_.emplace([task](){ (*task)(); }); } condition_.notify_one(); return res;}// the destructor joins all threadsinline ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex_); stop_ = true; } condition_.notify_all(); for (std::thread &worker : workers_) { worker.join(); }}#endif // THREAD_POOL_Htest.cpp:
#include <unistd.h>#include "threadpool.h"void mytask(int i) { printf("Info: thread %ld is working on task %d\n", (u_int64_t)pthread_self(), i); sleep(1); return;}int main() { ThreadPool threadpool(20); for (int i = 0; i < 100; ++i) { threadpool.enqueue(mytask, i); } return 0;}编译运行:
$g++ -g test.cpp -o test -std=c++11 -lpthread$./testInfo: thread 139679726323456 is working on task 1Info: thread 139679709538048 is working on task 3Info: thread 139679717930752 is working on task 2Info: thread 139679734716160 is working on task 0Info: thread 139679600432896 is working on task 15Info: thread 139679684359936 is working on task 6Info: thread 139679617218304 is working on task 14Info: thread 139679634003712 is working on task 12Info: thread 139679667574528 is working on task 8Info: thread 139679625611008 is working on task 13Info: thread 139679575254784 is working on task 19Info: thread 139679659181824 is working on task 9Info: thread 139679701145344 is working on task 4Info: thread 139679692752640 is working on task 5Info: thread 139679592040192 is working on task 17Info: thread 139679583647488 is working on task 18Info: thread 139679675967232 is working on task 7Info: thread 139679642396416 is working on task 11Info: thread 139679608825600 is working on task 16Info: thread 139679650789120 is working on task 10Info: thread 139679684359936 is working on task 21Info: thread 139679617218304 is working on task 24Info: thread 139679600432896 is working on task 28Info: thread 139679709538048 is working on task 23Info: thread 139679659181824 is working on task 30Info: thread 139679692752640 is working on task 32Info: thread 139679583647488 is working on task 34Info: thread 139679608825600 is working on task 35Info: thread 139679592040192 is working on task 33Info: thread 139679634003712 is working on task 25Info: thread 139679625611008 is working on task 29Info: thread 139679726323456 is working on task 20Info: thread 139679701145344 is working on task 31Info: thread 139679650789120 is working on task 36Info: thread 139679667574528 is working on task 27Info: thread 139679575254784 is working on task 37Info: thread 139679734716160 is working on task 26Info: thread 139679675967232 is working on task 38Info: thread 139679717930752 is working on task 22Info: thread 139679642396416 is working on task 39Info: thread 139679684359936 is working on task 40Info: thread 139679692752640 is working on task 45Info: thread 139679625611008 is working on task 51Info: thread 139679583647488 is working on task 43Info: thread 139679659181824 is working on task 44Info: thread 139679575254784 is working on task 55Info: thread 139679592040192 is working on task 47Info: thread 139679617218304 is working on task 41Info: thread 139679717930752 is working on task 57Info: thread 139679726323456 is working on task 49Info: thread 139679634003712 is working on task 50Info: thread 139679650789120 is working on task 52Info: thread 139679675967232 is working on task 59Info: thread 139679667574528 is working on task 54Info: thread 139679608825600 is working on task 46Info: thread 139679734716160 is working on task 56Info: thread 139679600432896 is working on task 48Info: thread 139679642396416 is working on task 58Info: thread 139679709538048 is working on task 42Info: thread 139679701145344 is working on task 53Info: thread 139679684359936 is working on task 60Info: thread 139679625611008 is working on task 62Info: thread 139679692752640 is working on task 61Info: thread 139679583647488 is working on task 63Info: thread 139679659181824 is working on task 64Info: thread 139679575254784 is working on task 65Info: thread 139679592040192 is working on task 66Info: thread 139679617218304 is working on task 67Info: thread 139679717930752 is working on task 68Info: thread 139679726323456 is working on task 69Info: thread 139679650789120 is working on task 71Info: thread 139679634003712 is working on task 70Info: thread 139679675967232 is working on task 72Info: thread 139679667574528 is working on task 73Info: thread 139679608825600 is working on task 74Info: thread 139679734716160 is working on task 75Info: thread 139679642396416 is working on task 77Info: thread 139679709538048 is working on task 78Info: thread 139679701145344 is working on task 79Info: thread 139679600432896 is working on task 76Info: thread 139679684359936 is working on task 80Info: thread 139679625611008 is working on task 81Info: thread 139679583647488 is working on task 83Info: thread 139679692752640 is working on task 82Info: thread 139679659181824 is working on task 84Info: thread 139679575254784 is working on task 85Info: thread 139679717930752 is working on task 88Info: thread 139679617218304 is working on task 87Info: thread 139679592040192 is working on task 86Info: thread 139679634003712 is working on task 91Info: thread 139679650789120 is working on task 90Info: thread 139679667574528 is working on task 93Info: thread 139679675967232 is working on task 92Info: thread 139679608825600 is working on task 94Info: thread 139679726323456 is working on task 89Info: thread 139679734716160 is working on task 95Info: thread 139679709538048 is working on task 96Info: thread 139679642396416 is working on task 97Info: thread 139679701145344 is working on task 98Info: thread 139679600432896 is working on task 99创建一个ThreadPool的全局变量,将所有需要异步执行的任务丢到该线程池中即可:
#include <unistd.h>#include "threadpool.h"// 全局异步线程池ThreadPool g_threadpool2(20);int main() { // 执行异步任务 g_threadpool2.enqueue( [] { sleep(1); printf("async task done\n"); }); return 0;}编译运行:
$g++ -g test.cpp -o test -std=c++11 -lpthread$./test async task done创建一个ThreadPool的全局变量并添加同步任务,通过std::future的wait()方法阻塞等待同步结果,也可以使用get()方法获取到函数返回值。
#include <unistd.h>#include <memory>#include "threadpool.h"// 全局异步线程池ThreadPool g_threadpool2(20);int main() { // 创建同步任务 auto res = g_threadpool2.enqueue( [] { sleep(1); printf("sync task done\n"); }); // 阻塞等待同步结果 res.wait(); return 0;}编译运行:
$g++ -g test.cpp -o test -std=c++11 -lpthread$./test sync task done创建一个临时ThreadPool,利用其析构函数完成并发同步任务:
需要注意的是,这种用法已经脱离了线程池的初衷(避免处理短时间任务时创建与销毁线程的代价),它的主要用途是实现「多线程并发」,常用于并发多个IO请求并等待同步结果。
考虑这个场景:代码中仅在某种特殊场景(极少触发)下需要并发请求多个http链接,一方面我们不希望这些请求影响到进程的业务线程池,另一方面我们又不想单独为这个场景创建一个全局线程池使其大部分时间都在空跑。
2.3这种用法解决了我们「临时创建线程+执行并行任务+销毁线程」的局部并发问题,避免我们直接在用户代码处直接创建线程。
#include <unistd.h>#include <memory>#include "threadpool.h"int main() { // 创建并发度为5的局部线程池 std::shared_ptr<ThreadPool> threadpool = std::make_shared<ThreadPool>(5); // 创建30个异步任务 for (int i = 0; i < 30; i++) { threadpool->enqueue( [i] { sleep(1); printf("Info: thread %ld is working on task %d\n", (u_int64_t)pthread_self(), i); }); } // 阻塞直至获取同步结果 threadpool.reset(); return 0;}编译运行:
$g++ -g test.cpp -o test -std=c++11 -lpthread$./test Info: thread 139811129124608 is working on task 4Info: thread 139811145910016 is working on task 2Info: thread 139811137517312 is working on task 3Info: thread 139811162695424 is working on task 0Info: thread 139811154302720 is working on task 1Info: thread 139811129124608 is working on task 5Info: thread 139811137517312 is working on task 7Info: thread 139811145910016 is working on task 6Info: thread 139811162695424 is working on task 8Info: thread 139811154302720 is working on task 9Info: thread 139811129124608 is working on task 10Info: thread 139811137517312 is working on task 11Info: thread 139811162695424 is working on task 13Info: thread 139811154302720 is working on task 14Info: thread 139811145910016 is working on task 12Info: thread 139811129124608 is working on task 15Info: thread 139811137517312 is working on task 18Info: thread 139811145910016 is working on task 19Info: thread 139811162695424 is working on task 16Info: thread 139811154302720 is working on task 17Info: thread 139811129124608 is working on task 21Info: thread 139811162695424 is working on task 23Info: thread 139811154302720 is working on task 24Info: thread 139811145910016 is working on task 22Info: thread 139811137517312 is working on task 20Info: thread 139811162695424 is working on task 25Info: thread 139811154302720 is working on task 26Info: thread 139811129124608 is working on task 27Info: thread 139811137517312 is working on task 29Info: thread 139811145910016 is working on task 28[1] https://www.cnblogs.com/ailumiyana/p/10016965.html
[2] https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h
[3] https://blog.csdn.net/qq_34771252/article/details/90319537
[4] https://github.com/lizhenghn123/zl_threadpool/tree/master/ThreadPoolCpp03