今天因为工作需要,需要帮同事用C语言(不是C++)写一个生产者消费者的任务队列工具库,考虑到不能使用任何第三库和C++的任何特性,所以我将任务队列做成一个链表,生产者在队列尾部加入任务,消费者在队列头部取出任务。很快就写好了,代码如下:
/**
* 线程池工具, ctrip_thread_pool.h
* zhangyl 2018.03.23
*/
#ifndef __CTRIP_THREAD_POOL_H__
#define __CTRIP_THREAD_POOL_H__
#include <pthread.h>
#ifndef NULL
#define NULL 0
#endif
#define PUBLIC
PUBLIC struct ctrip_task
{
struct ctrip_task* pNext;
int value;
};
struct ctrip_thread_info
{
//线程退出标志
int thread_running;
int thread_num;
int tasknum;
struct ctrip_task* tasks;
pthread_t* threadid;
pthread_mutex_t mutex;
pthread_cond_t cond;
};
/* 初始化线程池线程数目
* @param thread_num 线程数目, 默认为8个
*/
PUBLIC void ctrip_init_thread_pool(int thread_num);
/* 销毁线程池
*/
PUBLIC void ctrip_destroy_thread_pool();
/**向任务池中增加一个任务
* @param t 需要增加的任务
*/
PUBLIC void ctrip_thread_pool_add_task(struct ctrip_task* t);
/**从任务池中取出一个任务
* @return 返回得到的任务
*/
struct ctrip_task* ctrip_thread_pool_retrieve_task();
/**执行任务池中的任务
* @param t 需要执行的任务
*/
PUBLIC void ctrip_thread_pool_do_task(struct ctrip_task* t);
/**线程函数
* @param thread_param 线程参数
*/
void* ctrip_thread_routine(void* thread_param);
#endif //!__CTRIP_THREAD_POOL_H__
/**
* 线程池工具, ctrip_thread_pool.c
* zhangyl 2018.03.23
*/
#include "ctrip_thread_pool.h"
#include <stdio.h>
#include <stdlib.h>
struct ctrip_thread_info g_threadinfo;
int thread_running = 0;
void ctrip_init_thread_pool(int thread_num)
{
if (thread_num <= 0)
thread_num = 5;
pthread_mutex_init(&g_threadinfo.mutex, NULL);
pthread_cond_init(&g_threadinfo.cond, NULL);
g_threadinfo.thread_num = thread_num;
g_threadinfo.thread_running = 1;
g_threadinfo.tasknum = 0;
g_threadinfo.tasks = NULL;
thread_running = 1;
g_threadinfo.threadid = (pthread_t*)malloc(sizeof(pthread_t) * thread_num);
int i;
for (i = 0; i < thread_num; ++i)
{
pthread_create(&g_threadinfo.threadid[i], NULL, ctrip_thread_routine, NULL);
}
}
void ctrip_destroy_thread_pool()
{
g_threadinfo.thread_running = 0;
thread_running = 0;
pthread_cond_broadcast(&g_threadinfo.cond);
int i;
for (i = 0; i < g_threadinfo.thread_num; ++i)
{
pthread_join(g_threadinfo.threadid[i], NULL);
}
free(g_threadinfo.threadid);
pthread_mutex_destroy(&g_threadinfo.mutex);
pthread_cond_destroy(&g_threadinfo.cond);
}
void ctrip_thread_pool_add_task(struct ctrip_task* t)
{
if (t == NULL)
return;
pthread_mutex_lock(&g_threadinfo.mutex);
struct ctrip_task* head = g_threadinfo.tasks;
if (head == NULL)
g_threadinfo.tasks = t;
else
{
while (head->pNext != NULL)
{
head = head->pNext;
}
head->pNext = t;
}
++g_threadinfo.tasknum;
//当有变化后,使用signal通知wait函数
pthread_cond_signal(&g_threadinfo.cond);
pthread_mutex_unlock(&g_threadinfo.mutex);
}
struct ctrip_task* ctrip_thread_pool_retrieve_task()
{
struct ctrip_task* head = g_threadinfo.tasks;
if (head != NULL)
{
g_threadinfo.tasks = head->pNext;
--g_threadinfo.tasknum;
printf("retrieve a task, task value is [%d]\n", head->value);
return head;
}
printf("no task\n");
return NULL;
}
void* ctrip_thread_routine(void* thread_param)
{
printf("thread NO.%d start.\n", (int)pthread_self());
while (thread_running/*g_threadinfo.thread_running*/)
{
struct ctrip_task* current = NULL;
pthread_mutex_lock(&g_threadinfo.mutex);
while (g_threadinfo.tasknum <= 0)
{
//如果获得了互斥锁,但是条件不合适的话,wait会释放锁,不往下执行。
//当变化后,条件合适,将直接获得锁。
pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex);
if (!g_threadinfo.thread_running)
break;
current = ctrip_thread_pool_retrieve_task();
if (current != NULL)
break;
}// end inner-while-loop
pthread_mutex_unlock(&g_threadinfo.mutex);
ctrip_thread_pool_do_task(current);
}// end outer-while-loop
printf("thread NO.%d exit.\n", (int)pthread_self());
}
void ctrip_thread_pool_do_task(struct ctrip_task* t)
{
if (t == NULL)
return;
//TODO: do your work here
printf("task value is [%d]\n", t->value);
//TODO:如果t需要释放,记得在这里释放
}
测试代码如下:
// ctrip_thread_pool.cpp : Defines the entry point for the console application.
//
//#include "stdafx.h"
#include "ctrip_thread_pool.h"
#include <stdlib.h>
#include <unistd.h>
int main(int argc, char* argv[])
{
ctrip_init_thread_pool(5);
struct ctrip_task* task = NULL;
int i;
for (i = 0; i < 100; ++i)
{
task = (struct ctrip_task*)malloc(sizeof(struct ctrip_task));
task->value = i + 1;
task->pNext = NULL;
printf("add task, task value [%d]\n", task->value);
ctrip_thread_pool_add_task(task);
}
sleep(10);
ctrip_destroy_thread_pool();
return 0;
}
代码很快就写好了,但是每次程序只能执行前几个加到任务池子里面的任务,导致池子有不少任务积累在池子里面。甚是奇怪,我也看了半天才看出结果。聪明的你,能看出上述代码为啥只能执行加到池子里面的前几个任务?先不要看答案,自己想一会儿。
linux条件变量是做后台开发必须熟练掌握的基础知识,而条件变量使用存在以下几个非常让人迷惑的地方,讲解如下
第一、必须要结合一个互斥体一起使用。使用流程如下:
pthread_mutex_lock(&g_threadinfo.mutex)
pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex);
pthread_mutex_unlock(&g_threadinfo.mutex);
上面的代码,我们分为一二三步,当条件不满足是pthread_cond_wait会挂起线程,但是不知道你有没有注意到,如果在第二步挂起线程的话,第一步的mutex已经被上锁,谁来解锁?mutex的使用原则是谁上锁谁解锁,所以不可能在其他线程来给这个mutex解锁,但是这个线程已经挂起了,这就死锁了。所以pthread_cond_wait在挂起之前,额外做的一个事情就是给绑定的mutex解锁。反过来,如果条件满足,pthread_cond_wait不挂起线程,pthread_cond_wait将什么也不做,这样就接着走pthread_mutex_unlock解锁的流程。而在这个加锁和解锁之间的代码就是我们操作受保护资源的地方。
第二,不知道你有没有注意到pthread_cond_wait是放在一个while循环里面的:
pthread_mutex_lock(&g_threadinfo.mutex);
while (g_threadinfo.tasknum <= 0)
{
//如果获得了互斥锁,但是条件不合适的话,wait会释放锁,不往下执行。
//当变化后,条件合适,将直接获得锁。
pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex);
if (!g_threadinfo.thread_running)
break;
current = ctrip_thread_pool_retrieve_task();
if (current != NULL)
break;
}// end inner-while-loop
pthread_mutex_unlock(&g_threadinfo.mutex);
注意,我说的是内层的while循环,不是外层的。pthread_cond_wait一定要放在一个while循环里面吗?一定要的。这里有一个非常重要的关于条件变量的基础知识,叫条件变量的虚假唤醒(spurious wakeup),那啥叫条件变量的虚假唤醒呢?假设pthread_cond_wait不放在这个while循环里面,正常情况下,pthread_cond_wait因为条件不满足,挂起线程。然后,外部条件满足以后,调用pthread_cond_signal或pthread_cond_broadcast来唤醒挂起的线程。这没啥问题。但是条件变量可能在某些情况下也被唤醒,这个时候pthread_cond_wait处继续往下执行,但是这个时候,条件并不满足(比如任务队列中仍然为空)。这种唤醒我们叫“虚假唤醒”。为了避免虚假唤醒时,做无意义的动作,我们将pthread_cond_wait放到while循环条件中,这样即使被虚假唤醒了,由于while条件(比如任务队列是否为空,资源数量是否大于0)仍然为true,导致线程进行继续挂起。有人说条件变量是最不可能用错的线程之间同步技术,我却觉得这是最容易使用错误的线程之间同步技术。
上述代码存在的问题是,只考虑了任务队列开始为空,生产者后来添加了任务,条件变量被唤醒,然后消费者取任务执行的逻辑。假如一开始池中就有任务呢?这个原因导致,只有开始的几个添加到任务队列中任务被执行。因为一旦任务队列不为空。内层while循环条件将不再满足,导致消费者线程不再从任务队列中取任务消费。正确的代码如下:
/**
* 线程池工具, ctrip_thread_pool.c(修正后的代码)
* zhangyl 2018.03.23
*/
#include "ctrip_thread_pool.h"
#include <stdio.h>
#include <stdlib.h>
struct ctrip_thread_info g_threadinfo;
void ctrip_init_thread_pool(int thread_num)
{
if (thread_num <= 0)
thread_num = 5;
pthread_mutex_init(&g_threadinfo.mutex, NULL);
pthread_cond_init(&g_threadinfo.cond, NULL);
g_threadinfo.thread_num = thread_num;
g_threadinfo.thread_running = 1;
g_threadinfo.tasknum = 0;
g_threadinfo.tasks = NULL;
g_threadinfo.threadid = (pthread_t*)malloc(sizeof(pthread_t) * thread_num);
int i;
for (i = 0; i < thread_num; ++i)
{
pthread_create(&g_threadinfo.threadid[i], NULL, ctrip_thread_routine, NULL);
}
}
void ctrip_destroy_thread_pool()
{
g_threadinfo.thread_running = 0;
pthread_cond_broadcast(&g_threadinfo.cond);
int i;
for (i = 0; i < g_threadinfo.thread_num; ++i)
{
pthread_join(g_threadinfo.threadid[i], NULL);
}
free(g_threadinfo.threadid);
pthread_mutex_destroy(&g_threadinfo.mutex);
pthread_cond_destroy(&g_threadinfo.cond);
}
void ctrip_thread_pool_add_task(struct ctrip_task* t)
{
if (t == NULL)
return;
pthread_mutex_lock(&g_threadinfo.mutex);
struct ctrip_task* head = g_threadinfo.tasks;
if (head == NULL)
g_threadinfo.tasks = t;
else
{
while (head->pNext != NULL)
{
head = head->pNext;
}
head->pNext = t;
}
++g_threadinfo.tasknum;
//当有变化后,使用signal通知wait函数
pthread_cond_signal(&g_threadinfo.cond);
pthread_mutex_unlock(&g_threadinfo.mutex);
}
struct ctrip_task* ctrip_thread_pool_retrieve_task()
{
struct ctrip_task* head = g_threadinfo.tasks;
if (head != NULL)
{
g_threadinfo.tasks = head->pNext;
--g_threadinfo.tasknum;
printf("retrieve a task, task value is [%d]\n", head->value);
return head;
}
printf("no task\n");
return NULL;
}
void* ctrip_thread_routine(void* thread_param)
{
printf("thread NO.%d start.\n", (int)pthread_self());
while (g_threadinfo.thread_running)
{
struct ctrip_task* current = NULL;
pthread_mutex_lock(&g_threadinfo.mutex);
while (g_threadinfo.tasknum <= 0)
{
//如果获得了互斥锁,但是条件不合适的话,wait会释放锁,不往下执行。
//当变化后,条件合适,将直接获得锁。
pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex);
if (!g_threadinfo.thread_running)
break;
}// end inner-while-loop
current = ctrip_thread_pool_retrieve_task();
pthread_mutex_unlock(&g_threadinfo.mutex);
ctrip_thread_pool_do_task(current);
}// end outer-while-loop
printf("thread NO.%d exit.\n", (int)pthread_self());
}
void ctrip_thread_pool_do_task(struct ctrip_task* t)
{
if (t == NULL)
return;
//TODO: do your work here
printf("task value is [%d]\n", t->value);
//TODO:如果t需要释放,记得在这里释放
}
ok,不知道你有没有看明白呀?