Thread pool is a form of multi-threaded processing, mostly used on high-concurrency servers. It can reasonably and effectively utilize thread resources on high-concurrency servers; threads and processes are used to handle various branch sub-functions. Our usual operations are: receive message ==> message classification ==> thread creation ==> pass message to child thread ==> thread separation ==> execute task in child thread ==> exit when task ends; [[317565]] For most small LAN communications, the above method is sufficient to meet the needs; but when our communication range is expanded to wide area networks or large LAN communications, we will face a large number of messages frequently requesting the server; in this case, creating and destroying threads has become a luxury, especially for embedded servers, the rational use of memory resources should be guaranteed; Therefore, thread pool technology came into being; thread pool allows a thread to be reused multiple times, and the message processing inside the reused thread can be different each time, saving the cost of creation and destruction without having to open a thread for each request; Structure explanation: The thread pool is an abstract concept, which consists of a task queue, a bunch of threads, and a manager thread.
We will use the above figure as an example to implement a basic thread pool. Next, we will explain it in parts; the order of explanation is: 1. Overall structure of thread pool 2. Thread array 3. Task queue 4. Manager thread 5. Example of using thread pool interface 1. Overall structure of thread pool Here we explain the logical structure of the thread pool; look at the code below, the threadpool_t structure contains thread pool status information, task queue information and mutex locks in multi-threaded operations; the task structure contains a function pointer that can hold a variety of different task functions, and a void* parameter passed into the task function; Note: When using, you need to load your message classification processing function into the task's (*function); then place it in the task queue and notify the idle thread; Thread pool status information: describes the basic information of the current thread pool, such as whether it is enabled, the minimum number of threads, the maximum number of threads, the number of live threads, the number of busy threads, the number of threads to be destroyed, etc. Task queue information: describes the basic information of the current task queue, such as the maximum number of tasks, queue not full condition variables, queue not empty condition variables, etc. Multi-threaded mutex lock: ensure that only one thread takes tasks from the task queue and modifies the task queue information and thread pool information at the same time; Function pointer: In the message packing stage, the classified message processing function is placed in (*function); Void* type parameter: used to pass the information required by the message processing function; - /*Task*/
- typedef struct {
- void *(* function )(void *);
- void *arg;
- }threadpool_task_t;
- /*Thread pool management*/
- struct threadpool_t{
- pthread_mutex_t lock; /* Lock the entire structure */
- pthread_mutex_t thread_counter; /* Lock used when using the number of busy threads */
- pthread_cond_t queue_not_full; /* Condition variable, task queue is not full */
- pthread_cond_t queue_not_empty; /* The task queue is not empty */
- pthread_t *threads; /* Stores the thread's tid, which actually manages the thread array */
- pthread_t admin_tid; /* Administrator thread tid */
- threadpool_task_t *task_queue; /* Task queue */
- /*Thread pool information*/
- int min_thr_num; /* Minimum number of threads in the thread pool */
- int max_thr_num; /* Maximum number of threads in the thread pool */
- int live_thr_num; /* Number of threads alive in the thread pool */
- int busy_thr_num; /* Busy thread, working thread */
- int wait_exit_thr_num; /* Number of threads to be destroyed */
- /*Task queue information*/
- int queue_front; /* queue head */
- int queue_rear; /* tail of the queue */
- int queue_size;
- /* Number of existing tasks */
- int queue_max_size; /* The maximum number of tasks that the queue can accommodate */
- /*Thread pool status*/
- int shutdown; /* true means shutdown */
- };
- **/*Create thread pool*/**
- threadpool_t *
- threadpool_create( int min_thr_num, int max_thr_num, int queue_max_size)
- { /* minimum number of threads, maximum number of threads, maximum number of tasks*/
- int i;
- threadpool_t *pool = NULL ;
- do
- {
- /* Thread pool space creation */
- if ((pool=(threadpool_t *)malloc(sizeof(threadpool_t))) == NULL )
- {
- printf( "malloc threadpool false; \n" );
- break;
- }
- /*Initialize information*/
- pool->min_thr_num = min_thr_num;
- pool->max_thr_num = max_thr_num;
- pool->busy_thr_num = 0;
- pool->live_thr_num = min_thr_num;
- pool->wait_exit_thr_num = 0;
- pool->queue_front = 0;
- pool->queue_rear = 0;
- pool->queue_size = 0;
- pool->queue_max_size = queue_max_size;
- pool->shutdown = false ;
- /* According to the maximum number of threads, make space for the working thread array and clear it to 0 */
- pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
- if (pool->threads == NULL )
- {
- printf( "malloc threads false;\n" );
- break;
- }
- memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
- /* Queue open space */
- pool->task_queue =
- (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
- if (pool->task_queue == NULL )
- {
- printf( "malloc task queue false;\n" );
- break;
- }
- /* Initialize mutex and condition variable */
- if ( pthread_mutex_init(&(pool->lock), NULL ) != 0 ||
- pthread_mutex_init(&(pool->thread_counter), NULL ) !=0 ||
- pthread_cond_init(&(pool->queue_not_empty), NULL ) !=0 ||
- pthread_cond_init(&(pool->queue_not_full), NULL ) !=0)
- {
- printf( "init lock or cond false;\n" );
- break;
- }
- /* Start min_thr_num worker threads */
- for (i=0; i<min_thr_num; i++)
- {
- /* pool points to the current thread pool. The threadpool_thread function will be explained later. */
- pthread_create(&(pool->threads[i]), NULL , threadpool_thread, (void *)pool);
- printf( "start thread 0x%x... \n" , (unsigned int )pool->threads[i]);
- }
- /* The admin_thread function will be explained later */
- pthread_create(&(pool->admin_tid), NULL , admin_thread, (void *)pool);
- return pool;
- } while(0);
- /* Release the pool space */
- threadpool_free(pool);
- return NULL ;
- }
2. Thread Array The thread array is actually a space for storing a bunch of thread tids when the thread pool is initialized. It logically forms a pool in which threads created in advance are placed. This space contains working threads, threads waiting to work (idle threads), threads waiting to be destroyed, and declared but not initialized thread spaces.
- /*Worker thread*/
- void *
- threadpool_thread(void *threadpool)
- {
- threadpool_t *pool = (threadpool_t *)threadpool;
- threadpool_task_t task;
- while ( true )
- {
- pthread_mutex_lock(&(pool->lock));
- /* If there is no task, it will be blocked on "task queue is not empty", and if there is a task, it will jump out */
- while ((pool->queue_size == 0) && (!pool->shutdown))
- {
- printf( "thread 0x%x is waiting \n" , (unsigned int )pthread_self());
- pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
- /* Determine whether to clear the thread, suicide function */
- if (pool->wait_exit_thr_num > 0)
- {
- pool->wait_exit_thr_num
- /* Determine whether the number of threads in the thread pool is greater than the minimum number of threads, if so, terminate the current thread */
- if (pool->live_thr_num > pool->min_thr_num)
- {
- printf( "thread 0x%x is exiting \n" , (unsigned int )pthread_self());
- pool->live_thr_num
- pthread_mutex_unlock(&(pool->lock));
- pthread_exit( NULL ); //End thread
- }
- }
- }
- /* Thread pool switch status */
- if (pool->shutdown) //Close the thread pool
- {
- pthread_mutex_unlock(&(pool->lock));
- printf( "thread 0x%x is exiting \n" , (unsigned int )pthread_self());
- pthread_exit( NULL ); //The thread ends itself
- }
- //Otherwise the thread can take out the task
- task.function = pool->task_queue[pool->queue_front] .function ; //Dequeue operation
- task.arg = pool->task_queue[pool->queue_front].arg;
- pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; //Ring structure
- pool->queue_size
- //Notify that new tasks can be added
- pthread_cond_broadcast(&(pool->queue_not_full));
- //Release thread lock
- pthread_mutex_unlock(&(pool->lock));
- //Execute the task just taken out
- printf( "thread 0x%x start working \n" , (unsigned int )pthread_self());
- pthread_mutex_lock(&(pool->thread_counter)); //Lock the busy thread variable
- pool->busy_thr_num++;
- pthread_mutex_unlock(&(pool->thread_counter));
- (*(task. function ))(task.arg); //Execute task
- //End of task processing
- printf( "thread 0x%x end working \n" , (unsigned int )pthread_self());
- pthread_mutex_lock(&(pool->thread_counter));
- pool->busy_thr_num
- pthread_mutex_unlock(&(pool->thread_counter));
- }
- pthread_exit( NULL );
- }
3. Task Queue The existence of task queue is similar to that of thread array. When the thread pool is initialized, space is allocated according to the maximum number of tasks passed in. When the server receives requests, the messages are classified and packaged into tasks, and the tasks are put into the task queue and the idle threads are notified to take them. The difference is that the task queue has a clear order of priority, first in first out. The threads in the thread array are in a competitive relationship to get the mutex lock and fight for tasks.
- /*Add a task to the thread pool's task queue*/
- int
- threadpool_add_task(threadpool_t *pool, void *(* function )(void *arg), void *arg)
- {
- pthread_mutex_lock(&(pool->lock));
- /*If the queue is full, call wait to block*/
- while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
- pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
- /*If the thread pool is closed*/
- if (pool->shutdown)
- {
- pthread_mutex_unlock(&(pool->lock));
- return -1;
- }
- /*Clear the parameter arg of the callback function of the working thread*/
- if (pool->task_queue[pool->queue_rear].arg != NULL )
- {
- free (pool->task_queue[pool->queue_rear].arg);
- pool->task_queue[pool->queue_rear].arg = NULL ;
- }
- /*Add task to task queue*/
- pool->task_queue[pool->queue_rear]. function = function ;
- pool->task_queue[pool->queue_rear].arg = arg;
- pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* logical ring */
- pool->queue_size++;
- /*After adding the task, the queue is no longer empty, and a thread in the thread pool is awakened*/
- pthread_cond_signal(&(pool->queue_not_empty));
- pthread_mutex_unlock(&(pool->lock));
- return 0;
- }
4. Manager Thread As the manager of the thread pool, the main functions of this thread include: checking the survival status and working status of threads in the thread pool; dynamically adding or deleting threads according to the current request status of the server to ensure that the number of threads in the thread pool is maintained at a reasonable and efficient balance; In the final analysis, it is a separate thread that checks periodically and adds and removes threads based on our balance-maintaining algorithm; - /*Manage threads*/
- void *
- admin_thread(void *threadpool)
- {
- int i;
- threadpool_t *pool = (threadpool_t *)threadpool;
- while (!pool->shutdown)
- {
- printf( "admin ------------------\n" );
- sleep(DEFAULT_TIME); /*Manage after a while*/
- pthread_mutex_lock(&(pool->lock)); /*lock*/
- int queue_size = pool->queue_size; /*Number of tasks*/
- int live_thr_num = pool->live_thr_num; /*Number of live threads*/
- pthread_mutex_unlock(&(pool->lock)); /*unlock*/
- pthread_mutex_lock(&(pool->thread_counter));
- int busy_thr_num = pool->busy_thr_num; /*Number of busy threads*/
- pthread_mutex_unlock(&(pool->thread_counter));
- printf( "admin busy live -%d--%d-\n" , busy_thr_num, live_thr_num);
- /*Create a new thread. The actual number of tasks is greater than the minimum number of tasks waiting, and the number of surviving threads is less than the maximum number of threads*/
- if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num)
- {
- printf( "admin add-----------\n" );
- pthread_mutex_lock(&(pool->lock));
- int add =0;
- /*Add DEFAULT_THREAD_NUM threads at a time*/
- for (i=0; i<pool->max_thr_num && add <DEFAULT_THREAD_NUM
- && pool->live_thr_num < pool->max_thr_num; i++)
- {
- if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
- {
- pthread_create(&(pool->threads[i]), NULL , threadpool_thread, (void *)pool);
- add ++;
- pool->live_thr_num++;
- printf( "new thread -----------------------\n" );
- }
- }
- pthread_mutex_unlock(&(pool->lock));
- }
- /*Destroy redundant threads. Busy threads x2 are less than surviving threads, and the number of surviving threads is greater than the minimum number of threads*/
- if ((busy_thr_num*2) < live_thr_num && live_thr_num > pool->min_thr_num)
- {
- // printf( "admin busy --%d--%d----\n" , busy_thr_num, live_thr_num);
- /*Destroy DEFAULT_THREAD_NUM threads at a time*/
- pthread_mutex_lock(&(pool->lock));
- pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
- pthread_mutex_unlock(&(pool->lock));
- for (i=0; i<DEFAULT_THREAD_NUM; i++)
- {
- //Notify the idle thread to commit suicide
- pthread_cond_signal(&(pool->queue_not_empty));
- printf( "admin cler --\n" );
- }
- }
- }
- return NULL ;
- /*Is the thread alive*/
- int
- is_thread_alive(pthread_t tid)
- {
- int kill_rc = pthread_kill(tid, 0); //Send signal 0 to test whether it is alive
- if (kill_rc == ESRCH) //thread does not exist
- {
- return false ;
- }
- return true ;
- }
5. Release - /*Release thread pool*/
- int
- threadpool_free(threadpool_t *pool)
- {
- if (pool == NULL )
- return -1;
- if (pool->task_queue)
- free (pool->task_queue);
- if (pool->threads)
- {
- free (pool->threads);
- pthread_mutex_lock(&(pool->lock)); /*Lock first and then destroy*/
- pthread_mutex_destroy(&(pool->lock));
- pthread_mutex_lock(&(pool->thread_counter));
- pthread_mutex_destroy(&(pool->thread_counter));
- pthread_cond_destroy(&(pool->queue_not_empty));
- pthread_cond_destroy(&(pool->queue_not_full));
- }
- free (pool);
- pool = NULL ;
- return 0;
- }
- /*Destroy thread pool*/
- int
- threadpool_destroy(threadpool_t *pool)
- {
- int i;
- if (pool == NULL )
- {
- return -1;
- }
- pool->shutdown = true ;
- /*Destroy the manager thread*/
- pthread_join(pool->admin_tid, NULL );
- //Notify all threads to commit suicide (while taking on their own tasks)
- for (i=0; i<pool->live_thr_num; i++)
- {
- pthread_cond_broadcast(&(pool->queue_not_empty));
- }
- /*Wait for the thread to end. First pthread_exit and then wait for it to end*/
- for (i=0; i<pool->live_thr_num; i++)
- {
- pthread_join(pool->threads[i], NULL );
- }
- threadpool_free(pool);
- return 0;
- }
6. Interface - /* Thread pool initialization, its manager thread and worker thread will be started */
- threadpool_t *thp = threadpool_create(10, 100, 100);
- printf( "threadpool init ... ... \n" );
- /* Add after receiving the task */
- threadpool_add_task(thp, do_work, (void *)p);
- // ... ...
- /* Destroy */
- threadpool_destroy(thp);
|