Detailed explanation of the thread pool principle and how to implement the thread pool in C language

A thread pool is a form of multi-threaded processing used mostly on high-concurrency servers, where it makes efficient use of thread resources. Threads and processes are used to handle a server's separate sub-functions. Without a pool, the usual flow is: receive message ==> classify message ==> create thread ==> pass message to child thread ==> detach thread ==> execute task in child thread ==> exit when the task ends;
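
The per-message flow described above can be sketched as follows. This is only an illustration, not code from the article: message_t, handle_message, dispatch_message and wait_for_handled are hypothetical names, and the "processing" is a placeholder comment.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical message type, used only for this illustration. */
typedef struct {
    int type;      /* result of message classification */
    int payload;   /* data carried by the message */
} message_t;

static pthread_mutex_t handled_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  handled_cond = PTHREAD_COND_INITIALIZER;
static int handled_count = 0;   /* messages fully processed so far */

/* Child thread: execute the task, then exit when the task ends. */
static void *handle_message(void *arg)
{
    message_t *msg = arg;
    /* ... real processing selected by msg->type would go here ... */
    free(msg);   /* the child owns its copy of the message */

    pthread_mutex_lock(&handled_lock);
    handled_count++;
    pthread_cond_signal(&handled_cond);
    pthread_mutex_unlock(&handled_lock);
    return NULL;
}

/* One thread is created, and later destroyed, for every message. */
int dispatch_message(int type, int payload)
{
    message_t *msg = malloc(sizeof *msg);
    if (msg == NULL)
        return -1;
    msg->type = type;
    msg->payload = payload;

    pthread_t tid;
    if (pthread_create(&tid, NULL, handle_message, msg) != 0) {
        free(msg);
        return -1;
    }
    pthread_detach(tid);   /* "detach thread": nobody will join it */
    return 0;
}

/* Block until at least n messages have been handled. */
void wait_for_handled(int n)
{
    pthread_mutex_lock(&handled_lock);
    while (handled_count < n)
        pthread_cond_wait(&handled_cond, &handled_lock);
    pthread_mutex_unlock(&handled_lock);
}
```

Calling dispatch_message() once per incoming message and then wait_for_handled() from main shows the cost structure: every message pays for one pthread_create and one thread teardown.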

For most small LAN applications this approach is sufficient. Once the service is exposed to a wide area network or a large LAN, however, the server faces a large number of frequent requests; at that point, repeatedly creating and destroying threads becomes too expensive, especially on embedded servers, where memory must be used sparingly;

This is the problem thread pools solve: a thread is reused many times, and the message it processes can be different each time, so the server avoids the cost of creating and destroying a thread for every request;

Structure explanation:

A thread pool is an abstract concept made up of a task queue, a group of worker threads, and a manager thread.

Using the structure in the figure above as our example, we will implement a basic thread pool and explain it part by part, in this order: 1. Overall structure of the thread pool 2. Thread array 3. Task queue 4. Manager thread 5. Release 6. Example of using the thread pool interface

1. Overall structure of thread pool

This part covers the logical structure of the thread pool. In the code below, the threadpool_t structure holds the pool's status information, the task queue information, and the mutexes used for multi-threaded access; the task structure holds a function pointer that can refer to any task function, plus a void* argument that is passed to that function;

Note: to use the pool, store your message-handling function (chosen by message classification) in the task's (*function), then put the task in the task queue and notify an idle thread;

Thread pool status information: describes the current state of the pool, such as whether it is shut down, the minimum and maximum number of threads, the number of live threads, the number of busy threads, and the number of threads waiting to be destroyed.

Task queue information: describes the current state of the task queue, such as the maximum number of tasks and the "queue not full" and "queue not empty" condition variables.

Multi-threaded mutex lock: ensures that only one thread at a time takes a task from the queue or modifies the task queue and thread pool information;

Function pointer: when a message is packaged into a task, the handler chosen by classification is stored in (*function);

void* parameter: carries the data the message-handling function needs;
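
To make the function pointer and void* pairing concrete, here is a small sketch that needs no threads. The task structure has the same shape as the article's; handle_login, handle_chat, make_task and run_task are hypothetical names introduced only for this example.

```c
#include <assert.h>
#include <stddef.h>

/* Same shape as the article's task structure. */
typedef struct {
    void *(*function)(void *);   /* message-handling function */
    void *arg;                   /* data passed to that function */
} threadpool_task_t;

/* Hypothetical handlers for two message categories. */
static void *handle_login(void *arg)
{
    int *user_id = arg;
    *user_id += 1000;   /* stand-in for real processing */
    return arg;
}

static void *handle_chat(void *arg)
{
    int *length = arg;
    *length *= 2;       /* stand-in for real processing */
    return arg;
}

/* Message classification: pick the handler and pack it with its data. */
threadpool_task_t make_task(int msg_type, int *data)
{
    threadpool_task_t task;
    task.function = (msg_type == 0) ? handle_login : handle_chat;
    task.arg = data;
    return task;
}

/* What a worker does once it has taken a task out of the queue. */
void *run_task(const threadpool_task_t *task)
{
    return task->function(task->arg);
}
```

Because every handler shares the signature void *(*)(void *), the queue can hold tasks of any category without knowing what they do; only the handler knows how to interpret arg.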

    #include <errno.h>
    #include <pthread.h>
    #include <signal.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>

    /* Tuning constants used by the manager thread. The article never shows
       their definitions, so the values below are placeholders. */
    #define DEFAULT_TIME 1         /* seconds between manager checks */
    #define MIN_WAIT_TASK_NUM 10   /* queued tasks that trigger growth */
    #define DEFAULT_THREAD_NUM 10  /* threads added or removed per step */

    /* Task */
    typedef struct {
        void *(*function)(void *); /* Task function */
        void *arg;                 /* Argument passed to the task function */
    } threadpool_task_t;

    /* Thread pool management */
    typedef struct threadpool_t {
        pthread_mutex_t lock;           /* Locks the entire structure */
        pthread_mutex_t thread_counter; /* Protects busy_thr_num */
        pthread_cond_t queue_not_full;  /* Condition variable: task queue is not full */
        pthread_cond_t queue_not_empty; /* Condition variable: task queue is not empty */
        pthread_t *threads;             /* Thread array: tids of the worker threads */
        pthread_t admin_tid;            /* Manager thread tid */
        threadpool_task_t *task_queue;  /* Task queue */
        /* Thread pool information */
        int min_thr_num;                /* Minimum number of threads in the pool */
        int max_thr_num;                /* Maximum number of threads in the pool */
        int live_thr_num;               /* Number of threads alive in the pool */
        int busy_thr_num;               /* Number of busy (working) threads */
        int wait_exit_thr_num;          /* Number of threads to be destroyed */
        /* Task queue information */
        int queue_front;                /* Head of the queue */
        int queue_rear;                 /* Tail of the queue */
        int queue_size;                 /* Number of tasks currently queued */
        int queue_max_size;             /* Maximum number of tasks the queue can hold */
        /* Thread pool status */
        int shutdown;                   /* true means shut down */
    } threadpool_t;

    /* Functions defined in the sections that follow */
    void *threadpool_thread(void *threadpool);
    void *admin_thread(void *threadpool);
    int is_thread_alive(pthread_t tid);
    int threadpool_free(threadpool_t *pool);

    /* Create thread pool */
    threadpool_t *
    threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
    { /* minimum number of threads, maximum number of threads, maximum number of tasks */
        int i;
        threadpool_t *pool = NULL;
        do
        {
            /* Allocate the pool. calloc zeroes it, so threadpool_free sees
               NULL pointers if a later step fails */
            if ((pool = (threadpool_t *)calloc(1, sizeof(threadpool_t))) == NULL)
            {
                printf("malloc threadpool failed;\n");
                break;
            }
            /* Initialize information */
            pool->min_thr_num = min_thr_num;
            pool->max_thr_num = max_thr_num;
            pool->busy_thr_num = 0;
            pool->live_thr_num = min_thr_num;
            pool->wait_exit_thr_num = 0;
            pool->queue_front = 0;
            pool->queue_rear = 0;
            pool->queue_size = 0;
            pool->queue_max_size = queue_max_size;
            pool->shutdown = false;
            /* Initialize mutexes and condition variables. This is done before
               the arrays are allocated so that threadpool_free never destroys
               a mutex that was not initialized */
            if (pthread_mutex_init(&(pool->lock), NULL) != 0 ||
                pthread_mutex_init(&(pool->thread_counter), NULL) != 0 ||
                pthread_cond_init(&(pool->queue_not_empty), NULL) != 0 ||
                pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
            {
                printf("init lock or cond failed;\n");
                break;
            }
            /* Allocate the worker thread array for the maximum number of threads, cleared to 0 */
            pool->threads = (pthread_t *)calloc(max_thr_num, sizeof(pthread_t));
            if (pool->threads == NULL)
            {
                printf("malloc threads failed;\n");
                break;
            }
            /* Allocate the task queue, also cleared to 0 */
            pool->task_queue =
                (threadpool_task_t *)calloc(queue_max_size, sizeof(threadpool_task_t));
            if (pool->task_queue == NULL)
            {
                printf("malloc task queue failed;\n");
                break;
            }
            /* Start min_thr_num worker threads */
            for (i = 0; i < min_thr_num; i++)
            {
                /* pool points to the current thread pool. The threadpool_thread function is explained later. */
                pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
                printf("start thread 0x%lx... \n", (unsigned long)pool->threads[i]);
            }
            /* The admin_thread function is explained later */
            pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool);
            return pool;
        } while (0);
        /* Initialization failed: release whatever was allocated */
        threadpool_free(pool);
        return NULL;
    }

2. Thread Array

The thread array is the space allocated at pool initialization for storing thread tids. Logically it forms the pool that holds the threads created in advance. Its slots hold working threads, idle threads waiting for work, threads waiting to be destroyed, and slots that have been allocated but not yet assigned a thread.


    /* Worker thread */
    void *
    threadpool_thread(void *threadpool)
    {
        threadpool_t *pool = (threadpool_t *)threadpool;
        threadpool_task_t task;
        while (true)
        {
            pthread_mutex_lock(&(pool->lock));
            /* With no task queued, block on "task queue is not empty"; leave the loop once a task arrives */
            while ((pool->queue_size == 0) && (!pool->shutdown))
            {
                printf("thread 0x%lx is waiting \n", (unsigned long)pthread_self());
                pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
                /* Has the manager asked idle threads to exit? */
                if (pool->wait_exit_thr_num > 0)
                {
                    pool->wait_exit_thr_num--;
                    /* Exit only while the pool holds more than the minimum number of threads */
                    if (pool->live_thr_num > pool->min_thr_num)
                    {
                        printf("thread 0x%lx is exiting \n", (unsigned long)pthread_self());
                        pool->live_thr_num--;
                        pthread_mutex_unlock(&(pool->lock));
                        pthread_exit(NULL); /* End this thread */
                    }
                }
            }
            /* The pool is shutting down: release the lock and exit */
            if (pool->shutdown)
            {
                pthread_mutex_unlock(&(pool->lock));
                printf("thread 0x%lx is exiting \n", (unsigned long)pthread_self());
                pthread_exit(NULL); /* The thread ends itself */
            }
            /* Otherwise take a task out of the queue (dequeue) */
            task.function = pool->task_queue[pool->queue_front].function;
            task.arg = pool->task_queue[pool->queue_front].arg;
            pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* Ring structure */
            pool->queue_size--;
            /* Notify producers that new tasks can be added */
            pthread_cond_broadcast(&(pool->queue_not_full));
            /* Release the pool lock before running the task */
            pthread_mutex_unlock(&(pool->lock));
            /* Execute the task just taken out */
            printf("thread 0x%lx start working \n", (unsigned long)pthread_self());
            pthread_mutex_lock(&(pool->thread_counter)); /* Lock the busy thread counter */
            pool->busy_thr_num++;
            pthread_mutex_unlock(&(pool->thread_counter));
            (*(task.function))(task.arg); /* Execute task */
            /* End of task processing */
            printf("thread 0x%lx end working \n", (unsigned long)pthread_self());
            pthread_mutex_lock(&(pool->thread_counter));
            pool->busy_thr_num--;
            pthread_mutex_unlock(&(pool->thread_counter));
        }
        return NULL; /* Not reached */
    }
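
The core of the worker loop is the condition-variable wait wrapped in a while loop. The sketch below isolates that pattern with two consumer threads and a plain counter instead of a task queue. All names here (pending, done, consumer, run_wait_demo) are hypothetical, error checks are omitted for brevity, and unlike the article's worker these consumers drain the remaining items before they exit.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  not_empty = PTHREAD_COND_INITIALIZER;
static int  pending = 0;     /* items waiting to be consumed */
static bool done = false;    /* set when no more items will arrive */
static int  consumed = 0;    /* items taken by the consumers */

static void *consumer(void *unused)
{
    (void)unused;
    for (;;) {
        pthread_mutex_lock(&lock);
        /* Re-check the predicate after every wakeup: pthread_cond_wait may
         * return spuriously, or another thread may have taken the item. */
        while (pending == 0 && !done)
            pthread_cond_wait(&not_empty, &lock);
        if (pending == 0 && done) {
            pthread_mutex_unlock(&lock);
            return NULL;
        }
        pending--;
        consumed++;
        pthread_mutex_unlock(&lock);
    }
}

/* Produce n items, let two consumers drain them, return how many were taken. */
int run_wait_demo(int n)
{
    pthread_t tids[2];

    pending = 0;
    done = false;
    consumed = 0;
    for (int i = 0; i < 2; i++)
        pthread_create(&tids[i], NULL, consumer, NULL);

    for (int i = 0; i < n; i++) {
        pthread_mutex_lock(&lock);
        pending++;
        pthread_cond_signal(&not_empty);   /* wake one waiting consumer */
        pthread_mutex_unlock(&lock);
    }

    pthread_mutex_lock(&lock);
    done = true;
    pthread_cond_broadcast(&not_empty);    /* wake everyone so they can exit */
    pthread_mutex_unlock(&lock);

    for (int i = 0; i < 2; i++)
        pthread_join(tids[i], NULL);
    return consumed;
}
```

Calling run_wait_demo(20) from main returns once both consumers have exited. The predicate is always tested while holding the mutex, which is what keeps a wakeup from being lost between the test and the wait.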

3. Task Queue

The task queue is set up much like the thread array: when the pool is initialized, space is allocated for the maximum number of tasks passed in. When the server receives requests, it classifies the messages, packages them into tasks, puts the tasks in the queue, and notifies idle threads to take them. The difference is that the task queue has a strict order, first in, first out, whereas the threads in the thread array compete with one another for the mutex and for tasks.


    /* Add a task to the thread pool's task queue */
    int
    threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg)
    {
        pthread_mutex_lock(&(pool->lock));
        /* If the queue is full, block until a worker takes a task */
        while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
            pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
        /* If the thread pool has been shut down */
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&(pool->lock));
            return -1;
        }
        /* The slot's previous arg is not freed here: the task that received it
           may still be running when the ring wraps around, so ownership of arg
           stays with the caller and the task function */
        /* Add the task to the task queue */
        pool->task_queue[pool->queue_rear].function = function;
        pool->task_queue[pool->queue_rear].arg = arg;
        pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* Logical ring */
        pool->queue_size++;
        /* The queue is no longer empty, so wake one thread in the pool */
        pthread_cond_signal(&(pool->queue_not_empty));
        pthread_mutex_unlock(&(pool->lock));
        return 0;
    }
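
The ring arithmetic used for queue_front and queue_rear can be checked in isolation. The following sketch is a single-threaded queue of ints with a deliberately tiny capacity; ring_queue_t, ring_init, ring_push and ring_pop are hypothetical names, and the locking and condition variables of the real task queue are left out.

```c
#include <assert.h>
#include <stdbool.h>

#define QUEUE_MAX_SIZE 4   /* small capacity so wrap-around is easy to see */

typedef struct {
    int items[QUEUE_MAX_SIZE];
    int front;   /* index of the next item to dequeue */
    int rear;    /* index where the next item will be stored */
    int size;    /* number of items currently queued */
} ring_queue_t;

void ring_init(ring_queue_t *q)
{
    q->front = 0;
    q->rear = 0;
    q->size = 0;
}

/* Returns false when the queue is full. */
bool ring_push(ring_queue_t *q, int value)
{
    if (q->size == QUEUE_MAX_SIZE)
        return false;
    q->items[q->rear] = value;
    q->rear = (q->rear + 1) % QUEUE_MAX_SIZE;   /* wrap back to slot 0 */
    q->size++;
    return true;
}

/* Returns false when the queue is empty. */
bool ring_pop(ring_queue_t *q, int *value)
{
    if (q->size == 0)
        return false;
    *value = q->items[q->front];
    q->front = (q->front + 1) % QUEUE_MAX_SIZE;
    q->size--;
    return true;
}
```

Keeping a separate size counter, as the article's pool does, is what distinguishes a full queue from an empty one: in both cases front equals rear.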

4. Manager Thread

The manager thread's main jobs are to check how many threads in the pool are alive and how many are working, and to add or remove threads dynamically according to the server's current load, so that the number of threads stays at a reasonable and efficient level;

In short, it is a separate thread that wakes up periodically and adds or removes threads according to our balancing algorithm;

    /* Is the thread alive? */
    int
    is_thread_alive(pthread_t tid)
    {
        /* Signal 0 sends nothing and only performs error checking. POSIX
           defines this only for thread IDs that are still valid, so treat
           the result as a best-effort check */
        int kill_rc = pthread_kill(tid, 0);
        if (kill_rc == ESRCH) /* Thread does not exist */
        {
            return false;
        }
        return true;
    }

    /* Manager thread */
    void *
    admin_thread(void *threadpool)
    {
        int i;
        threadpool_t *pool = (threadpool_t *)threadpool;
        while (!pool->shutdown)
        {
            printf("admin ------------------\n");
            sleep(DEFAULT_TIME); /* Check the pool at intervals */
            pthread_mutex_lock(&(pool->lock));
            int queue_size = pool->queue_size;     /* Number of tasks */
            int live_thr_num = pool->live_thr_num; /* Number of live threads */
            pthread_mutex_unlock(&(pool->lock));
            pthread_mutex_lock(&(pool->thread_counter));
            int busy_thr_num = pool->busy_thr_num; /* Number of busy threads */
            pthread_mutex_unlock(&(pool->thread_counter));
            printf("admin busy live -%d--%d-\n", busy_thr_num, live_thr_num);
            /* Create new threads when at least MIN_WAIT_TASK_NUM tasks are waiting
               and the number of live threads is below the maximum */
            if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num)
            {
                printf("admin add-----------\n");
                pthread_mutex_lock(&(pool->lock));
                int add = 0;
                /* Add up to DEFAULT_THREAD_NUM threads at a time */
                for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_NUM
                            && pool->live_thr_num < pool->max_thr_num; i++)
                {
                    /* Use a slot that is empty or whose thread is gone. Comparing
                       pthread_t with 0 assumes an integer type, as on Linux */
                    if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
                    {
                        pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
                        add++;
                        pool->live_thr_num++;
                        printf("new thread -----------------------\n");
                    }
                }
                pthread_mutex_unlock(&(pool->lock));
            }
            /* Destroy surplus threads when busy threads x2 is less than live threads
               and the number of live threads is above the minimum */
            if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num)
            {
                /* Ask DEFAULT_THREAD_NUM threads to exit at a time */
                pthread_mutex_lock(&(pool->lock));
                pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
                pthread_mutex_unlock(&(pool->lock));
                for (i = 0; i < DEFAULT_THREAD_NUM; i++)
                {
                    /* Wake an idle thread so that it exits by itself */
                    pthread_cond_signal(&(pool->queue_not_empty));
                    printf("admin clear --\n");
                }
            }
        }
        return NULL;
    }
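
The balancing rule inside the manager loop can be pulled out into a pure function, which makes the thresholds easy to reason about and test. This is a sketch under assumptions: pool_action_t and pool_decide are hypothetical names, the value of MIN_WAIT_TASK_NUM is assumed because the article never shows its definition, and growth is given priority here, whereas the article's loop evaluates both conditions in the same pass.

```c
#include <assert.h>

/* Assumed value; the article uses MIN_WAIT_TASK_NUM without defining it. */
#define MIN_WAIT_TASK_NUM 10

typedef enum { POOL_KEEP, POOL_GROW, POOL_SHRINK } pool_action_t;

/* Decide what the manager should do from one snapshot of the counters. */
pool_action_t pool_decide(int queue_size, int busy, int live,
                          int min_thr, int max_thr)
{
    /* Enough tasks are waiting and there is still room for more threads. */
    if (queue_size >= MIN_WAIT_TASK_NUM && live < max_thr)
        return POOL_GROW;
    /* Fewer than half of the live threads are busy, and the pool is
     * above its minimum size. */
    if (busy * 2 < live && live > min_thr)
        return POOL_SHRINK;
    return POOL_KEEP;
}
```

For example, with min_thr = 10 and max_thr = 100, a snapshot of 15 queued tasks and 10 live threads asks for growth, while 2 busy threads out of 20 live ones with an empty queue asks for shrinking.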

5. Release

    /* Release thread pool */
    int
    threadpool_free(threadpool_t *pool)
    {
        if (pool == NULL)
            return -1;
        if (pool->task_queue)
            free(pool->task_queue);
        if (pool->threads)
        {
            free(pool->threads);
            /* Destroy the mutexes while they are unlocked; destroying a
               locked mutex is undefined behavior */
            pthread_mutex_destroy(&(pool->lock));
            pthread_mutex_destroy(&(pool->thread_counter));
            pthread_cond_destroy(&(pool->queue_not_empty));
            pthread_cond_destroy(&(pool->queue_not_full));
        }
        free(pool);
        return 0;
    }

    /* Destroy thread pool */
    int
    threadpool_destroy(threadpool_t *pool)
    {
        int i;
        if (pool == NULL)
        {
            return -1;
        }
        /* Set the flag under the lock so that no worker misses it */
        pthread_mutex_lock(&(pool->lock));
        pool->shutdown = true;
        pthread_mutex_unlock(&(pool->lock));
        /* Wait for the manager thread to finish */
        pthread_join(pool->admin_tid, NULL);
        /* Wake every waiting worker, and any blocked producer, so that they
           see the shutdown flag; one broadcast per condition is enough */
        pthread_cond_broadcast(&(pool->queue_not_empty));
        pthread_cond_broadcast(&(pool->queue_not_full));
        /* Wait for the workers to end. A live worker can sit in any slot of
           the array, so check every slot that was ever used */
        for (i = 0; i < pool->max_thr_num; i++)
        {
            if (pool->threads[i] != 0)
                pthread_join(pool->threads[i], NULL);
        }
        threadpool_free(pool);
        return 0;
    }

6. Interface

    /* Initialize the thread pool; this starts its manager thread and worker threads */
    threadpool_t *thp = threadpool_create(10, 100, 100);
    printf("threadpool init ... ... \n");
    /* After receiving a message, add it as a task. do_work is your
       message-handling function and p is the argument it receives */
    threadpool_add_task(thp, do_work, (void *)p);
    // ... ...
    /* Destroy */
    threadpool_destroy(thp);
