感谢支持
我们一直在努力

Linux工作队列实现机制

工作项、工作队列和工作者线程

推后执行的任务叫做工作(work),描述它的数据结构为work_struct ,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct ,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events


工作队列(work queue)是另外一种将工作推后执行的形式。工作队列可以把工作推后,交由一个内核线程去执行—这个下半部分总是会在进程上下文执行,但由于是内核线程,其不能访问用户空间。最重要特点的就是工作队列允许重新调度甚至是睡眠。


通常,在工作队列和软中断/tasklet中作出选择非常容易。可使用以下规则
如果推后执行的任务需要睡眠,那么只能选择工作队列;
如果推后执行的任务需要延时指定的时间再触发,那么使用工作队列,因为其可以利用timer延时;
如果推后执行的任务需要在一个tick之内处理,则使用软中断或tasklet,因为其可以抢占普通进程和内核线程;
如果推后执行的任务对延迟的时间没有任何要求,则使用工作队列,此时通常为无关紧要的任务。


实际上,工作队列的本质就是将工作交给内核线程处理,因此其可以用内核线程替换。但是内核线程的创建和销毁对编程者的要求较高,而工作队列实现了内核线程的封装,不易出错,所以我们也推荐使用工作队列。

工作队列使用

相关文件:


kernel/include/linux/workqueue.h


Kernel/kernel/workqueue.c

工作队列的创建

要使用工作队列,需要先创建工作项,有两种方式:


1)静态创建:


DECLARE_WORK(name,function); 定义正常执行的工作项


DECLARE_DELAYED_WORK(name,function); 定义延后执行的工作项


2)动态创建,运行时创建:


通常在probe()函数中执行下面的操作来初始化工作项:


INIT_WORK(&work, new_ts_work);


INIT_DELAYED_WORK(&led_work,s0340_ledtime_scanf);


工作队列待执行的函数原型是:


typedef void(*work_func_t)(structwork_struct *work);


这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管该函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。


创建了工作项之后,在适当的时候可以通过下面的两种方式来提交工作项给工作者线程,通常我们使用的工作队列和工作者线程都是系统初始化时候默认创建的。

工作队列的调度运行

schedule_work(&work)


&work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行


schedule_delayed_work(&delay_work,delay);


&delay_work指向的delay_work直到delay指定的时钟节拍用完以后才会执行。


eg:


schedule_delayed_work(&kpd_backlight_work,msecs_to_jiffies(300));

默认工作队列和工作者线程创建过程

系统默认的工作队列名称是:keventd_wq,默认的工作者线程叫:events/n这里的n是处理器的编号,每个处理器对应一个线程。比如,单处理器的系统只有events/0这样一个线程。而双处理器的系统就会多一个events/1线程。


默认的工作者线程会从多个地方得到被推后的工作。许多内核驱动程序都把它们的下半部交给默认的工作者线程去做。除非一个驱动程序或者子系统必须建立一个属于它自己的内核线程,否则最好使用默认线程。不过并不存在什么东西能够阻止代码创建属于自己的工作者线程。如果你需要在工作者线程中执行大量的处理操作,这样做或许会带来好处。处理器密集型和性能要求严格的任务会因为拥有自己的工作者线程而获得好处。


默认的工作队列keventd_wq只有一个,但是其工作者线程在每一个cpu上都有。而标记为singlethread的工作者线程最存在于一个cpu上。


关于默认工作队列keventd_wq和工作者线程events/n的建立在文件Kernel/kernel/workqueue.c中实现。


Start_kernel()–>rest_init(),该函数中创建了两个内核线程kernel_initkthreadd,这两个线程都和本文描述的部分有关系,先说说kernel_init


kernel_init()–>do_basic_setup()–>init_workqueues(),该函数中创建了上面提到的默认工作队列和工作者线程。


init_workqueues()–>


–>hotcpu_notifier(workqueue_cpu_callback,0);


–>keventd_wq=create_workqueue(“events”);


注册的cpu通知链cpu_chain上的回调函数是workqueue_cpu_callback(),raw_notifier_call_chain()函数用来调用cpu_chain上的所有回调函数。


这里主要关注的是函数:create_workqueue(“events”);


@kernel/include/linux/workqueue.h


#define__create_workqueue(name,singlethread,freezeable,rt)/


__create_workqueue_key((name),(singlethread),(freezeable),(rt),/NULL,NULL)


#definecreate_workqueue(name)__create_workqueue((name),0,0,0)


#definecreate_rt_workqueue(name)__create_workqueue((name),0,0,1)


#definecreate_freezeable_workqueue(name)__create_workqueue((name),1,1,0)


#definecreate_singlethread_workqueue(name)__create_workqueue((name),1,0,0)


从宏__create_workqueue的参数可以看出,可以通过传递不同的参数:是否单cpu线程,是否可冻结,是否实时来创建不同类型的工作队列和工作者线程。


work_struct工作项结构体定义:@kernel/include/linux/workqueue.h


工作队列workqueue_struct结构体:@kernel/kernel/workqueue.c

分析1

关键函数__create_workqueue_key()分析:


struct workqueue_struct *__create_workqueue_key(const char *name,


int singlethread,


int freezeable,


int rt,


struct lock_class_key *key,


const char *lock_name)


{


struct workqueue_struct *wq;


struct cpu_workqueue_struct *cwq;


int err = 0, cpu;


wq = kzalloc(sizeof(*wq), GFP_KERNEL);


if (!wq)


return NULL;


wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);


if (!wq->cpu_wq) {


kfree(wq);


return NULL;


}


wq->name = name;


lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);


wq->singlethread = singlethread;


wq->freezeable = freezeable;


wq->rt = rt;


INIT_LIST_HEAD(&wq->list);


if (singlethread) {// 创建单模块线程


cwq = init_cpu_workqueue(wq, singlethread_cpu);  note -1


//初始化cpu_workqueue_struct结构体 cwq


// singlethread_cpu — the first cpu in a cpumask


err = create_workqueue_thread(cwq, singlethread_cpu);  note 0


// p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);  note 1


// trace_workqueue_creation(cwq->thread, cpu); note 2


start_workqueue_thread(cwq, -1);  // run this thread


} else {


cpu_maps_update_begin();


/*


 * We must place this wq on list even if the code below fails.


 * cpu_down(cpu) can remove cpu from cpu_populated_map before


 * destroy_workqueue() takes the lock, in that case we leak


 * cwq[cpu]->thread.


 */


spin_lock(&workqueue_lock);


list_add(&wq->list, &workqueues);


spin_unlock(&workqueue_lock);


/*


 * We must initialize cwqs for each possible cpu even if we


 * are going to call destroy_workqueue() finally. Otherwise


 * cpu_up() can hit the uninitialized cwq once we drop the


 * lock.


 */


for_each_possible_cpu(cpu) {// 为每个cpu都建立一个对应的线程


cwq = init_cpu_workqueue(wq, cpu);


if (err || !cpu_online(cpu))


continue;


err = create_workqueue_thread(cwq, cpu);


start_workqueue_thread(cwq, cpu);


}


cpu_maps_update_done();


}


if (err) {


destroy_workqueue(wq);


wq = NULL;


}


return wq;


}


Note -1: @ kernel/kernel/workqueue.c


static struct cpu_workqueue_struct *


init_cpu_workqueue(struct workqueue_struct *wq, int cpu)


{


struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);


cwq->wq = wq;


spin_lock_init(&cwq->lock);


INIT_LIST_HEAD(&cwq->worklist);// 初始化工作项列表使用时提交的工作项都是挂接在这个链表上的


init_waitqueue_head(&cwq->more_work);


// 初始化等待队列头


return cwq;


}


Note 0: @ kernel/kernel/workqueue.c


static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)


{


struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };


struct workqueue_struct *wq = cwq->wq;


const char *fmt = is_wq_single_threaded(wq) ? “%s” : “%s/%d”;


struct task_struct *p;


p = kthread_create(worker_threadcwq, fmt, wq->name, cpu);


// fmt – 线程命名格式; cpu — cpu number; cwq — 传递的参数


// 线程函数: worker_thread()


/*


 * Nobody can add the work_struct to this cwq,


 *if (caller is __create_workqueue)


 *nobody should see this wq


 *else // caller is CPU_UP_PREPARE


 *cpu is not on cpu_online_map


 * so we can abort safely.


 */


if (IS_ERR(p))


return PTR_ERR(p);


if (cwq->wq->rt)


sched_setscheduler_nocheck(p, SCHED_FIFO, ?m); 


// 是否需要设置实时属性


cwq->thread = p;


 // cpu_workqueue_struct.thread 中记录返回线程的task_struct结构体


trace_workqueue_creation(cwq->thread, cpu);


return 0;


}


Note 1 : @ kernel/kernel/kthread.c 这里会牵扯到内核线程的创建机制,可以扩展一下


static DEFINE_SPINLOCK(kthread_create_lock);


static LIST_HEAD(kthread_create_list);


struct kthread_create_info


{


/* Information passed to kthread() from kthreadd. */


int (*threadfn)(void *data);


void *data;


/* Result passed back to kthread_create() from kthreadd. */


struct task_struct *result;


struct completion done;


struct list_head list;


};


/**


 * kthread_create – create a kthread.


 * @threadfn: the function to run until signal_pending(current).


 * @data: data ptr for @threadfn.


 * @namefmt: printf-style name for the thread.


 *


 * Description: This helper function creates and names a kernel


 * thread.  The thread will be stopped: use wake_up_process() to start


 * it.  See also kthread_run(), kthread_create_on_cpu().


 *


 * When woken, the thread will run @threadfn() with @data as its


 * argument. @threadfn() can either call do_exit() directly if it is a


 * standalone thread for which noone will call kthread_stop(), or


 * return when ‘kthread_should_stop()’ is true (which means


 * kthread_stop() has been called).  The return value should be zero


 * or a negative error number; it will be passed to kthread_stop().


 *


 * Returns a task_struct or ERR_PTR(-ENOMEM).


 */


struct task_struct *kthread_create(int (*threadfn)(void *data),


   void *data,


   const char namefmt[],


   …)


{


struct kthread_create_info create;


create.threadfn = threadfn;


create.data = data;


init_completion(&create.done); // 初始化完成量


spin_lock(&kthread_create_lock);


list_add_tail(&create.list, &kthread_create_list); 


// 将新建的kthread挂接到全局的线程链表kthread_create_list


spin_unlock(&kthread_create_lock);


wake_up_process(kthreadd_task);


// kthreadd_task = find_task_by_pid_ns(pid, &init_pid_ns); @ kernel/init/main.c // rest_init()中初始化,该指针保存的是线程kthreaddtask_struct结构体指针。


// 唤醒线程kthreadd


wait_for_completion(&create.done);


// 等待完成量,我们这里转到kthreadd线程的执行函数中去看一下,这个完成量的唤醒应该是在kthreadd线程中做的,kthreadd线程应该是根据kthread_create_list上挂接的kthread_create_info结构体来创建特定线程。


// 这部分关于内核线程创建的机制请阅读分析文档:内核线程创建 目录中的相关文件和内核源码。这里不再详细分析。


// 新线程创建ok后,进入了睡眠,然后唤醒了对应的完成量create.done,这边继续执行


if (!IS_ERR(create.result)) { 


// create.result中保存的是新创建内核线程的task_struct结构体指针


struct sched_param param = { .sched_priority = 0 };


va_list args;


va_start(args, namefmt);


vsnprintf(create.result->comm, sizeof(create.result->comm),


  namefmt, args);// 设置当前线程的名字


  // 名字的格式来源于函数上层上层调用函数,这里是来源于工作队列创建函数create_workqueue_thread()中:@ kernel/kernel/workqueue.c


// const char *fmt = is_wq_single_threaded(wq) ? “%s” : “%s/%d”;


va_end(args);


/*


 * root may have changed our (kthreadd’s) priority or CPU mask.


 * The kernel thread should not inherit these properties.


 */


sched_setscheduler_nocheck(create.result, SCHED_NORMAL, ?m); 


// 调度策略设置


set_cpus_allowed_ptr(create.result, cpu_all_mask);


}


return create.result;// 返回的是新建内核线程的task_struct结构体指针


}


kthread_create()函数通过专门创建线程的内核线程kthreadd创建了公用线程kthread,而在该kthread线程函数中调用其参数传递进来的回调函数threadfn(),这个threadfn()函数就是我们调用kthread_create()函数时传递进来的第一个参数,第二个参数则是执行回调函数时的参数。该函数原形如下:


struct task_struct *kthread_create(int (*threadfn)(void *data),


   void *data,


   const char namefmt[],


   …)


调用示例:


kthread_create(worker_thread, cwq, fmt, wq->name, cpu);


这个线程创建ok之后,会在线程kthread中调用函数worker_thread(cwq);


worker_thread()函数如下,是每一个工作者线程的共用的线程函数。其实工作队列对应的数据结构是workqueue_struct,而该结构体中包含一个对应cpu的数据结构cpu_workqueue_struct,这个数据结构中包含了工作项链表worklist。而所有的工作者线程,只是名字不一样而已,所跑的线程函数都是一样:worker_thread


static int worker_thread(void *__cwq)


{


struct cpu_workqueue_struct *cwq = __cwq;


DEFINE_WAIT(wait);// 定义一个等待队列项wait 


// @ kernel/include/linux/wait.h


if (cwq->wq->freezeable)


set_freezable();// current->flags &= ~PF_NOFREEZE;


for (;;) {


prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);


// 可中断睡眠


// 准备进入睡眠等待,wait加入cwq->more_work等待队列头中,


// 设置非独占进程标志


// 和可中断睡眠标志 @ kernel/kernel/wait.c


if (!freezing(current) && 


    !kthread_should_stop() &&


    list_empty(&cwq->worklist))// 


    // 当前进程是非冻结状态,当前线程没停止,同时工作项列表为空


    // 的时候进入睡眠让出cpu


schedule();


finish_wait(&cwq->more_work, &wait);// 当前线程被唤醒后马上要做的事情


try_to_freeze();


if (kthread_should_stop())// 检查当前线程是否被要求stop


break;


run_workqueue(cwq);// 运行工作项中对应的函数


}


return 0;


}


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


在分析函数run_workqueue()之前,我们先来看一下,提交工作项的时候发生了什么事情。还是从函数int schedule_work(struct work_struct *work);开始说起吧!


@ kernel/kernel/workqueue.c


int schedule_work(struct work_struct *work)


{


return queue_work(keventd_wq, work);


}


keventd_wq工作队列是在函数init_workqueues()中创建的(参看前文),所有这里在提交工作项的时候就用上了。


int queue_work(struct workqueue_struct *wq, struct work_struct *work)


{


int ret;


ret = queue_work_on(get_cpu(), wq, work);


put_cpu();


return ret;


}


该函数将work工作项提交到当前做该项提交的cpu上的工作队列wq上,如果这个cpu被标记为die,那么可以提交到别的cpu上去执行。返回0,表示该项工作已经提交过,还没执行。非0表示提交成功。


int  queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)


{


int ret = 0;


if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {


BUG_ON(!list_empty(&work->entry));


__queue_work(wq_per_cpu(wq, cpu), work);


ret = 1;


}


return ret;


}


@ kernel/include/linux/workqueue.h


#define work_data_bits(work) ((unsigned long *)(&(work)->data))


work_struct结构体的第一个word中保留该标识,宏也在该结构体中定义。


test_and_set_bit(int nr, volatile void *addr)  *addr的第nr位设置为1,并返回它的原值。 

Linux内核的原子操作


工作项在初始化的时候会调用WORK_DATA_INIT()宏来将work_structdata域初始化成0,所有这里!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))结果为1


static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)


{


unsigned long flags;


spin_lock_irqsave(&cwq->lock, flags);


insert_work(cwq, work, &cwq->worklist);


spin_unlock_irqrestore(&cwq->lock, flags);


}


static void insert_work(struct cpu_workqueue_struct *cwq,


struct work_struct *work, struct list_head *head)


{


trace_workqueue_insertion(cwq->thread, work);


set_wq_data(work, cwq);// 设置work_structpending未决标志


/*


 * Ensure that we get the right work->data if we see the


 * result of list_add() below, see try_to_grab_pending().


 */


smp_wmb();// 多处理器的相关动作


list_add_tail(&work->entry, head);// 工作项加入链表


wake_up(&cwq->more_work);// 唤醒等待在该等待队列头上的所有等待队列项


}


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


static void run_workqueue(struct cpu_workqueue_struct *cwq)


{


spin_lock_irq(&cwq->lock);


while (!list_empty(&cwq->worklist)) {


struct work_struct *work = list_entry(cwq->worklist.next,


struct work_struct, entry);


work_func_t f = work->func;// 取出工作项函数


#ifdef CONFIG_LOCKDEP


/*


 * It is permissible to free the struct work_struct


 * from inside the function that is called from it,


 * this we need to take into account for lockdep too.


 * To avoid bogus “held lock freed” warnings as well


 * as problems when looking into work->lockdep_map,


 * make a copy and use that here.


 */


struct lockdep_map lockdep_map = work->lockdep_map;


#endif


trace_workqueue_execution(cwq->thread, work);


cwq->current_work = work;


list_del_init(cwq->worklist.next);// 从链表中删除工作项节点


spin_unlock_irq(&cwq->lock);


BUG_ON(get_wq_data(work) != cwq);


work_clear_pending(work);


lock_map_acquire(&cwq->wq->lockdep_map);


lock_map_acquire(&lockdep_map);


f(work);


// 执行对应的工作项函数,将work_struct结构体指针作为参数传递进去


lock_map_release(&lockdep_map);


lock_map_release(&cwq->wq->lockdep_map);


if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {


printk(KERN_ERR “BUG: workqueue leaked lock or atomic: “


“%s/0x%08x/%d/n”,


current->comm, preempt_count(),


       task_pid_nr(current));


printk(KERN_ERR ”    last function: “);


print_symbol(“%s/n”, (unsigned long)f);


debug_show_held_locks(current);


dump_stack();


}


spin_lock_irq(&cwq->lock);


cwq->current_work = NULL;


}


spin_unlock_irq(&cwq->lock);


}


我们在新建工作项的时候,需要将工作函数的参数设置成work_struct 结构体指针,例如:


static void sitronix_ts_work(struct work_struct *work)


INIT_WORK(&priv->work, sitronix_ts_work);


XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX


虽然以上内容是通过创建系统默认的工作队列keventd_wq和工作者线程events/n来分析了其创建过程,提交工作项过程和提交工作后唤醒工作者线程之后的所做的动作。


其实我们自己也可以使用这些接口来创建独立的工作队列和工作者线程来专门为特定的任务服务,例如在Androidlinux的睡眠和唤醒架构中就使用这种方式,@ kernel/kernel/power/wakelock.c


core_initcall(wakelocks_init)wakelocks_init()函数中有创建两个工作队列和其对于的工作者线程:


sys_sync_work_queue = create_singlethread_workqueue(“fs_sync“);


suspend_work_queue = create_singlethread_workqueue(“suspend“);


early suspend的时候调用:@ kernel/kernel/power/earlysuspend.c


static DECLARE_WORK(early_sys_sync_work, early_sys_sync);


queue_work(sys_sync_work_queue, &early_sys_sync_work);


static DECLARE_WORK(early_suspend_work, early_suspend);


queue_work(suspend_work_queue, &early_suspend_work);


suspend的时候调用:@ kernel/kernel/power/wakelock.c


static DECLARE_WORK(suspend_work, suspend);


queue_work(suspend_work_queue, &suspend_work);


下面来看一看延时执行的工作项是如何提交的,这里和上面共同的部分不讨论,只讨论如何实现的延时执行,其余部分是相同的。


delayed_work结构体的定义:@ kernel/include/linux/workqueue.h


struct delayed_work {


struct work_struct work;


struct timer_list timer;


// work_struct结构体进行了封装,添加了一个timer_list结构体


};


#define DECLARE_DELAYED_WORK(n, f)/


struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f)


#define __DELAYED_WORK_INITIALIZER(n, f) {/


.work = __WORK_INITIALIZER((n).work, (f)),/


.timer = TIMER_INITIALIZER(NULL, 0, 0),/


// 初始化work_struct结构体和前文方式一样,这里需要多初始化timer域。


@ kernel/include/linux/timer.h


#define TIMER_INITIALIZER(_function, _expires, _data) {/


.entry = { .prev = TIMER_ENTRY_STATIC },/


.function = (_function),/


.expires = (_expires),/


.data = (_data),/


.base = &boot_tvec_bases,/


__TIMER_LOCKDEP_MAP_INITIALIZER(/


__FILE__ “:” __stringify(__LINE__))/


}


通常情况下使用的定义一个定时器也是调用该宏来初始化:
#define DEFINE_TIMER(_name, _function, _expires, _data)
/


struct timer_list _name =/


TIMER_INITIALIZER(_function, _expires, _data)


提交一个延时执行的工作项使用函数:


int schedule_delayed_work(struct delayed_work *dwork,  unsigned long delay)


{


return queue_delayed_work(keventd_wq, dwork, delay);


// delay – 单位是jiffies,或者传递0的话,就是立即执行和schedule_work()一样了


     // @ kernel/kernel/timer.c文件中有实现一些time to jiffies的函数:


     // msecs_to_jiffied() 、 usecs_to_jiffies()


int queue_delayed_work(struct workqueue_struct *wq,


  struct delayed_work *dwork, unsigned long delay)


{


if (delay == 0)// 如果传递进来的delay0,那么走立即执行的通路


return queue_work(wq, &dwork->work);


return queue_delayed_work_on(-1, wq, dwork, delay);


}


int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,


struct delayed_work *dwork, unsigned long delay)


{


int ret = 0;


struct timer_list *timer = &dwork->timer;


struct work_struct *work = &dwork->work;


// test_and_set_bit()设置特定位并传回该位原来的值


// 如果未决位为0,设置pending未决位后返回0


if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {


BUG_ON(timer_pending(timer));


BUG_ON(!list_empty(&work->entry));


timer_stats_timer_set_start_info(&dwork->timer);


/* This stores cwq for the moment, for the timer_fn */


set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));


timer->expires = jiffies + delay;// 到时时间阀值


timer->data = (unsigned long)dwork;// 向定时执行函数传递的参数


timer->function = delayed_work_timer_fn;// 定时执行函数


if (unlikely(cpu >= 0))


add_timer_on(timer, cpu);


else


add_timer(timer);// 向系统添加一个timer


ret = 1;


}


return ret;


}


static void delayed_work_timer_fn(unsigned long __data)


{


struct delayed_work *dwork = (struct delayed_work *)__data;


struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);


struct workqueue_struct *wq = cwq->wq;


__queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);


}


看到函数__queue_work()是不是觉得很眼熟呢?没错,延时执行的工作项走的提交路线和正常提交工作项在该函数之前不一样,后面后市一样了。换句话说,提交延时工作项,只是延时提交了而已,并不是立即提交给工作者线程,让其工作者线程延时来执行


 


其余函数介绍:


void flush_workqueue(struct workqueue_struct *wq);


此函数刷新指定工作队列,他会一直等待,知道该工作队列中所有工作项都已完成。


void flush_scheduled_work(void)


和上面函数类似,只是刷新默认工作队列:keventd_wq


void flush_delayed_work(struct delayed_work *dwork)


等待一个delayed_work执行完。


int flush_work(struct work_struct *work)


等待一个work执行完。


如何取消提交的延时工作项?


cancel_work_sync(struct work_struct *work);


该函数取消已排在工作队列中的未决work,返回true。如果workcallback已经在运行了,那么该函数将会阻塞到其执行完毕。


static inline int __cancel_delayed_work(struct delayed_work *work)


{


int ret;


ret = del_timer(&work->timer);


if (ret)


work_clear_pending(&work->work);


return ret;


}


// if it returns 0 the timer function may be running and the queueing is in progress.


static inline int cancel_delayed_work(struct delayed_work *work)


{


int ret;


ret = del_timer_sync(&work->timer); // 阻塞直到定时函数执行完


if (ret)


work_clear_pending(&work->work);


return ret;


}


// 同上


三、工作队列新老版本比较


http://liaowb1234.blog.163.com/blog/static/77155547200911296838120/


这篇网文已有详细的说明,请参考。





博客分析2

工作队列(work queue)是另外一种将工作推后执行的形式,它和tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。

那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列。如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet






  1. 工作、工作队列和工作者线程


如前所述,我们把推后执行的任务叫做工作(work ),描述它的数据结构为work_struct ,这些工作以队列结构组织成工作队列(workqueue ),其数据结构为workqueue_struct,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events, 自己也可以创建自己的工作者线程。




  1. 表示工作的数据结构


工作用<linux/workqueue.h> 中定义的work_struct 结构表示:




structwork_struct{


        unsigned longpending; /* 这个工作正在等待处理吗?*/


        structlist_head entry; /* 连接所有工作的链表*/


        void(*func) (void*); /* 要执行的函数*/


        void*data; /* 传递给函数的参数*/


        void*wq_data; /* 内部使用*/


        struct timer_listtimer; /* 延迟的工作队列所用到的定时器*/


};




这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。


3. 创建推后的工作


要使用工作队列,首先要做的是创建一些需要推后完成的工作。可以通过DECLARE_WORK 在编译时静态地建该结构:


DECLARE_WORK(name, void (*func) (void *), void *data);


这样就会静态地创建一个名为name ,待执行函数为func ,参数为data work_struct 结构。


同样,也可以在运行时通过指针创建一个工作:


INIT_WORK(struct work_struct *work, woid(*func) (void*), void *data);


这会动态地初始化一个由work 指向的工作。


4. 工作队列中待执行的函数


工作队列待执行的函数原型是:


void work_handler(void *data)


这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管该函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。


5. 对工作进行调度


现在工作已经被创建,我们可以调度它了。想要把给定工作的待处理函数提交给缺省的events 工作线程,只需调用


schedule_work(&work)


work 马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。


有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,可以调度它在指定的时间执行:


schedule_delayed_work(&work, delay);


这时,&work 指向的work_struct 直到delay 指定的时钟节拍用完以后才会执行。


6. 工作队列的简单应用


#include<linux/module.h>
#include<linux/init.h>
#include<linux/workqueue.h>

staticstructworkqueue_struct *queue=NULL;
staticstructwork_struct work;

staticvoidwork_handler(structwork_struct *data)
{
        printk
(KERN_ALERT “workhandler function./n” );
}

staticint__init test_init(void)
{
        
queue=create_singlethread_workqueue(“helloworld”); /*创建一个单线程的工作队列*/
        
if(!queue)
                
gotoerr;

        INIT_WORK
(&work,work_handler);
        schedule_work
(&work);

        
return0;
err
:
        
return1;
}

staticvoid__exit test_exit(void)
{
        destroy_workqueue
(queue);
}
MODULE_LICENSE
(“GPL”);
module_init
(test_init);
module_exit
(test_exit);

赞(0) 打赏
转载请注明出处:服务器评测 » Linux工作队列实现机制
分享到: 更多 (0)

听说打赏我的人,都进福布斯排行榜啦!

支付宝扫一扫打赏

微信扫一扫打赏