条件变量是线程同步的另一种方式,实际上,条件变量是信号量的底层实现,这也就意味着,使用条件变量可以拥有更大的自由度,同时也就需要更加小心的进行同步操作。条件变量使用的条件本身是需要使用互斥量进行保护的,线程在改变条件状态之前必须首先锁住互斥量,其他线程在获得互斥量之前不会察觉到这种改变,因为互斥量必须在锁定之后才能计算条件。
模型
#include<pthread.h>
pthread_t cond //准备条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //初始化静态的条件变量
pthread_cond_init() //初始化一个动态的条件变量
pthread_cond_wait() //等待条件变量变为真
pthread_cond_timedwait() //等待条件变量变为真,等待有时间限制。
pthread_cond_signal() //至少唤醒一个等待该条件的线程
pthread_cond_broadcast() //唤醒等待该条件的所有线程
pthread_cond_destroy() //销毁一个条件变量
pthread_cond_init()
//初始化一个动态的条件变量
//成功返回0,失败返回error number
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
#include<pthread.h>
pthread_t cond //准备条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //初始化静态的条件变量
pthread_cond_init() //初始化一个动态的条件变量
pthread_cond_wait() //等待条件变量变为真
pthread_cond_timedwait() //等待条件变量变为真,等待有时间限制。
pthread_cond_signal() //至少唤醒一个等待该条件的线程
pthread_cond_broadcast() //唤醒等待该条件的所有线程
pthread_cond_destroy() //销毁一个条件变量
//初始化一个动态的条件变量
//成功返回0,失败返回error number
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
cond:条件变量指针,这里使用了restrict关键字
attr:条件变量属性指针,默认属性赋NULL
pthread_cond_wait() / pthread_cond_timedwait()
//等待条件变量为真。收到pthread_cond_broadcast()或pthread_cond_signal()就唤醒
//成功返回0,失败返回error number
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
//等待条件变量为真。收到pthread_cond_broadcast()或pthread_cond_signal()就唤醒
//成功返回0,失败返回error number
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
条件变量的使在多线程的程序中,因为各个线程都可以访问大部分的进程资源,所以我们为了保证公有资源的使用是可以控制的,在一个线程开始使用公有资源之前要尝试获取互斥锁,在使用完毕之后要释放互斥锁,这样就能保证每个公共资源在一个时刻都只能被一个线程使用,在一定程度上得到了控制,但这并不能解决同步的问题,考虑如下两个线程:
Q=create_queue();
pthread_t mutex
//线程A,入队
while(1){
lock(mutex);
in_queue(Q);
unlock(mutex);
}
//线程B,出队
while(1){
lock(mutex);
out_queue(Q);
unlock(mutex);
}
上述代码可以实现两个线程的互斥,即同一时刻只有一个线程在使用公有资源-队列。但如果线程B获取了锁,但队列中是空的,它的out_queue()也就是没有意义的,所以我们这里更需要一种方法将两个线程进行同步:只有当队列中有数据的时候才进行出队。
我们设计这样一种逻辑:
//线程B,出队
while(1){
lock(mutex);
if(队列是空,线程不应该执行){
释放锁;
continue;
}
out_queue(Q);
unlock(mutex);
}
这个程序就解决了上述的问题,即便线程B抢到了互斥锁,但是如果队列是空的,他就释放锁让两个线程重新抢锁,希望这次线程A能抢到并往里放一个数据。
但这个逻辑还有一个问题,就是多线程并发的问题,很有可能发生的一种情况是:线程B抢到了锁,发现没有数据,释放锁->线程A立即抢到了锁并往里放了一个数据->线程B执行continue,显然,这种情况下是不应该continue的,因为线程B想要的条件在释放锁之后立即就被满足了,它错过了条件。
So,我们想一种反过来的逻辑:
//线程B,出队
while(1){
lock(mutex);
if(队列是空,线程不应该执行){
continue;
释放锁;
}
out_queue(Q);
unlock(mutex);
}
显然这种方法有个致命的问题:一旦continue了,线程B自己获得锁就没有被释放,这样线程A不可能抢到锁,而B继续加锁就会形成死锁!
Finaly,我们希望看到一个函数fcn,如果条件不满足,能同时释放锁+停止执行线程,如果条件满足,自己当时获得的锁还在
//线程B,出队
while(1){
lock(mutex);
fcn(当前线程不应该执行,mutex) //if(当前线程不应该执行){释放锁“同时” continue;}
out_queue(Q);
unlock(mutex);
}
OK,这个就是pthread_cond_wait()的原理了,只不过它把continue变成了”休眠”这种由OS负责的操作,可以大大的节约资源。
当然,线程执行的条件是很难当作参数传入一个函数的,POSIX多线程的模型使用系统提供的”条件变量”+”我们自己定义的具体条件” 来确定一个线程是否应该执行接下来的内容。”条件变量”只有真和假,所以一种典型的多线程同步的结构如下
//线程B,出队
while(1){
lock(mutex);
while(条件不满足)
pthread_cond_wait(cond,mutex)
//获得互斥锁可以同时保护while里的条件和cond的判断,二者总是用一把锁保护,并一同释放
//cond为假,就休眠同时释放锁,等待被cond为真唤醒,把自己获得的锁拿回来
//拿回自己的锁再检查线程执行条件,条件不满足继续循环,直到条件满足跳出循环
//这个函数是带着"线程的执行条件为真"+"cond为真"走出循环的
//这个函数返回后cond被重新设置为0
out_queue(Q);
unlock(mutex);
}
pthread_cond_braoadcast()/pthread_cond_signal()
//使条件变量为真并唤醒wait中的线程,前者唤醒所有wait的,后者唤醒一个
//成功返回0,失败返回error number
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
pthread_cond_destroy()
//销毁条件变量
//成功返回0,失败返回error number
int pthread_cond_destroy(pthread_cond_t *cond);
例子-线程池
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread.h
#ifndef __THREAD_H__
#define __THREAD_H__
#define THREAD_NUM 3
#define TASK_NUM 100
typedef struct{//每个节点的封装格式,fcn是用户自定义函数,arg是用户自定义函数参数指针,保证通用性声明为void
void* (*fcn)(void* arg);
void* arg;
}task_t;
typedef struct{ //用户自定义函数的参数结构体
int x;
}argfcn_t;
//#define LQ_DATA_T task_t*
#define LQ_DATA_T task_t
#endif //__THREAD_H__
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//lqueue.c
#include"thread.h"
#include"lqueue.h"
#include<stdlib.h>
...
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread_pool.c
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include"thread.h"
#include"lqueue.h"
//互斥量和条件变量
pthread_mutex_t lock;
pthread_cond_t cond;
//全局的表
lqueue_t* Q;
//每个线程的任务,必须是这个形式的函数指针
void* do_task(void* p){
task_t data;
int ret=0;
while(1){
pthread_mutex_lock(&lock);
while(is_empty_lqueue(Q)){ //大家收到广播,因为延迟,可能醒了好几个,要判断一下是不是自己
pthread_cond_wait(&cond,&lock); //先抢到锁再醒
}
ret=out_lqueue(Q,&data);
pthread_mutex_unlock(&lock);
data.fcn(data.arg);
}
}
//创建线程池
void create_pool(void){
//初始化队列
Q=create_lqueue();
//初始化互斥量
pthread_mutex_init(&lock,NULL);
//初始化条件变量
pthread_cond_init(&cond,NULL);
int i=THREAD_NUM;
pthread_t tid[THREAD_NUM];
while(i--)
pthread_create(&tid[i],NULL,do_task,NULL);
}
//准备函数
void* fcn(void* parg){ //用户自定义的需要线程执行的函数
argfcn_t* i=(argfcn_t*)parg;
printf("this is task1\n");
printf("task1:%d\n",i->x);
}
//添加任务
void pool_add_task(void*(*pfcn)(void*parg),void*arg){
task_t task;
task.fcn=pfcn;
task.arg=arg;
in_lqueue(Q,task);
pthread_cond_signal(&cond); //添加了一个任务,用signal更好
}
int main(int argc, const char *argv[])
{
//创建线程池
create_pool();
//准备参数
argfcn_t argfcn;
argfcn.x=5;
//添加任务
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pause();
return 0;
}
//使条件变量为真并唤醒wait中的线程,前者唤醒所有wait的,后者唤醒一个
//成功返回0,失败返回error number
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
//销毁条件变量
//成功返回0,失败返回error number
int pthread_cond_destroy(pthread_cond_t *cond);
例子-线程池
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread.h
#ifndef __THREAD_H__
#define __THREAD_H__
#define THREAD_NUM 3
#define TASK_NUM 100
typedef struct{//每个节点的封装格式,fcn是用户自定义函数,arg是用户自定义函数参数指针,保证通用性声明为void
void* (*fcn)(void* arg);
void* arg;
}task_t;
typedef struct{ //用户自定义函数的参数结构体
int x;
}argfcn_t;
//#define LQ_DATA_T task_t*
#define LQ_DATA_T task_t
#endif //__THREAD_H__
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//lqueue.c
#include"thread.h"
#include"lqueue.h"
#include<stdlib.h>
...
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread_pool.c
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include"thread.h"
#include"lqueue.h"
//互斥量和条件变量
pthread_mutex_t lock;
pthread_cond_t cond;
//全局的表
lqueue_t* Q;
//每个线程的任务,必须是这个形式的函数指针
void* do_task(void* p){
task_t data;
int ret=0;
while(1){
pthread_mutex_lock(&lock);
while(is_empty_lqueue(Q)){ //大家收到广播,因为延迟,可能醒了好几个,要判断一下是不是自己
pthread_cond_wait(&cond,&lock); //先抢到锁再醒
}
ret=out_lqueue(Q,&data);
pthread_mutex_unlock(&lock);
data.fcn(data.arg);
}
}
//创建线程池
void create_pool(void){
//初始化队列
Q=create_lqueue();
//初始化互斥量
pthread_mutex_init(&lock,NULL);
//初始化条件变量
pthread_cond_init(&cond,NULL);
int i=THREAD_NUM;
pthread_t tid[THREAD_NUM];
while(i--)
pthread_create(&tid[i],NULL,do_task,NULL);
}
//准备函数
void* fcn(void* parg){ //用户自定义的需要线程执行的函数
argfcn_t* i=(argfcn_t*)parg;
printf("this is task1\n");
printf("task1:%d\n",i->x);
}
//添加任务
void pool_add_task(void*(*pfcn)(void*parg),void*arg){
task_t task;
task.fcn=pfcn;
task.arg=arg;
in_lqueue(Q,task);
pthread_cond_signal(&cond); //添加了一个任务,用signal更好
}
int main(int argc, const char *argv[])
{
//创建线程池
create_pool();
//准备参数
argfcn_t argfcn;
argfcn.x=5;
//添加任务
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pause();
return 0;
}
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread.h
#ifndef __THREAD_H__
#define __THREAD_H__
#define THREAD_NUM 3
#define TASK_NUM 100
typedef struct{//每个节点的封装格式,fcn是用户自定义函数,arg是用户自定义函数参数指针,保证通用性声明为void
void* (*fcn)(void* arg);
void* arg;
}task_t;
typedef struct{ //用户自定义函数的参数结构体
int x;
}argfcn_t;
//#define LQ_DATA_T task_t*
#define LQ_DATA_T task_t
#endif //__THREAD_H__
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//lqueue.c
#include"thread.h"
#include"lqueue.h"
#include<stdlib.h>
...
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread_pool.c
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include"thread.h"
#include"lqueue.h"
//互斥量和条件变量
pthread_mutex_t lock;
pthread_cond_t cond;
//全局的表
lqueue_t* Q;
//每个线程的任务,必须是这个形式的函数指针
void* do_task(void* p){
task_t data;
int ret=0;
while(1){
pthread_mutex_lock(&lock);
while(is_empty_lqueue(Q)){ //大家收到广播,因为延迟,可能醒了好几个,要判断一下是不是自己
pthread_cond_wait(&cond,&lock); //先抢到锁再醒
}
ret=out_lqueue(Q,&data);
pthread_mutex_unlock(&lock);
data.fcn(data.arg);
}
}
//创建线程池
void create_pool(void){
//初始化队列
Q=create_lqueue();
//初始化互斥量
pthread_mutex_init(&lock,NULL);
//初始化条件变量
pthread_cond_init(&cond,NULL);
int i=THREAD_NUM;
pthread_t tid[THREAD_NUM];
while(i--)
pthread_create(&tid[i],NULL,do_task,NULL);
}
//准备函数
void* fcn(void* parg){ //用户自定义的需要线程执行的函数
argfcn_t* i=(argfcn_t*)parg;
printf("this is task1\n");
printf("task1:%d\n",i->x);
}
//添加任务
void pool_add_task(void*(*pfcn)(void*parg),void*arg){
task_t task;
task.fcn=pfcn;
task.arg=arg;
in_lqueue(Q,task);
pthread_cond_signal(&cond); //添加了一个任务,用signal更好
}
int main(int argc, const char *argv[])
{
//创建线程池
create_pool();
//准备参数
argfcn_t argfcn;
argfcn.x=5;
//添加任务
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pause();
return 0;
}
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-11/136664.htm