感谢支持
我们一直在努力

Linux内核分析之工作队列

可延迟函数和工作队列非常相似,但是他们的区别还是很大的。主要区别在于:可延迟函数运行在中断上下文中,而工作队列中的函数运行在进程上下文中。在中断上下文中不可能发生进程切换。可延迟函数和工作队列中的函数都不能访问进程的用户态地址空间。


涉及数据结构


  1. /* 

  2.  * The per-CPU workqueue (if single thread, we always use the first 

  3.  * possible cpu). 

  4.  */  

  5. struct cpu_workqueue_struct {  

  6.   

  7.     spinlock_t lock;/*保护该数据结构的自旋锁*/  

  8.   

  9.     struct list_head worklist;/*挂起链表的头结点*/  

  10.     /*等待队列,其中的工作者线程因等待跟多 

  11.     的工作而处于睡眠状态*/  

  12.     wait_queue_head_t more_work;  

  13.     /*等待队列,其中的进程由于等待工作队列 

  14.     被刷新而处于睡眠状态*/  

  15.     struct work_struct *current_work;  

  16.       

  17.     struct workqueue_struct *wq;  

  18.     struct task_struct *thread;/*指向结构中工作者线程的进程描述符指针*/  

  19. } ____cacheline_aligned;  

  20.   

  21. /* 

  22.  * The externally visible workqueue abstraction is an array of 

  23.  * per-CPU workqueues: 

  24.  */  

  25. struct workqueue_struct {  

  26.     struct cpu_workqueue_struct *cpu_wq;  

  27.     struct list_head list;  

  28.     const char *name;  

  29.     int singlethread;  

  30.     int freezeable;     /* Freeze threads during suspend */  

  31.     int rt;  

  32. #ifdef CONFIG_LOCKDEP   

  33.     struct lockdep_map lockdep_map;  

  34. #endif   

  35. };  

工作队列操作


创建


最终都会调用如下函数执行


  1. struct workqueue_struct *__create_workqueue_key(const char *name,  

  2.                         int singlethread,  

  3.                         int freezeable,  

  4.                         int rt,  

  5.                         struct lock_class_key *key,  

  6.                         const char *lock_name)  

  7. {  

  8.     struct workqueue_struct *wq;  

  9.     struct cpu_workqueue_struct *cwq;  

  10.     int err = 0, cpu;  

  11.     /*分配wq结构*/  

  12.     wq = kzalloc(sizeof(*wq), GFP_KERNEL);  

  13.     if (!wq)  

  14.         return NULL;  

  15.     /*分配cwq结构*/  

  16.     wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);  

  17.     if (!wq->cpu_wq) {  

  18.         kfree(wq);  

  19.         return NULL;  

  20.     }  

  21.   

  22.     wq->name = name;  

  23.     lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);  

  24.     wq->singlethread = singlethread;  

  25.     wq->freezeable = freezeable;  

  26.     wq->rt = rt;  

  27.     INIT_LIST_HEAD(&wq->list);  

  28.   

  29.     if (singlethread) {/*如果设置了单线程,只创建一个*/  

  30.         /*初始化cwq*/  

  31.         cwq = init_cpu_workqueue(wq, singlethread_cpu);  

  32.         /*创建内核线程*/  

  33.         err = create_workqueue_thread(cwq, singlethread_cpu);  

  34.         /*唤醒刚创建的内核线程*/  

  35.         start_workqueue_thread(cwq, -1);  

  36.     } else {/*反之,每个cpu创建一个线程*/  

  37.         cpu_maps_update_begin();  

  38.         /* 

  39.          * We must place this wq on list even if the code below fails. 

  40.          * cpu_down(cpu) can remove cpu from cpu_populated_map before 

  41.          * destroy_workqueue() takes the lock, in that case we leak 

  42.          * cwq[cpu]->thread. 

  43.          */  

  44.         spin_lock(&workqueue_lock);  

  45.         list_add(&wq->list, &workqueues);  

  46.         spin_unlock(&workqueue_lock);  

  47.         /* 

  48.          * We must initialize cwqs for each possible cpu even if we 

  49.          * are going to call destroy_workqueue() finally. Otherwise 

  50.          * cpu_up() can hit the uninitialized cwq once we drop the 

  51.          * lock. 

  52.          */  

  53.         for_each_possible_cpu(cpu) {/*对每个cpu*/  

  54.             cwq = init_cpu_workqueue(wq, cpu);  

  55.             if (err || !cpu_online(cpu))  

  56.                 continue;  

  57.             err = create_workqueue_thread(cwq, cpu);  

  58.             start_workqueue_thread(cwq, cpu);  

  59.         }  

  60.         cpu_maps_update_done();  

  61.     }  

  62.   

  63.     if (err) {  

  64.         destroy_workqueue(wq);  

  65.         wq = NULL;  

  66.     }  

  67.     return wq;  

  68. }  

可见,工作队列在创建时就唤醒创建的内核线程,下面我们看看他创建的内核线程


  1. static int worker_thread(void *__cwq)  

  2. {  

  3.     struct cpu_workqueue_struct *cwq = __cwq;  

  4.     DEFINE_WAIT(wait);  

  5.   

  6.     if (cwq->wq->freezeable)  

  7.         set_freezable();  

  8.   

  9.     for (;;) {  

  10.         prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);  

  11.         if (!freezing(current) &&  

  12.             !kthread_should_stop() &&  

  13.             list_empty(&cwq->worklist))  

  14.             schedule();  

  15.         finish_wait(&cwq->more_work, &wait);  

  16.   

  17.         try_to_freeze();  

  18.   

  19.         if (kthread_should_stop())  

  20.             break;  

  21.         /*执行工作队列*/  

  22.         run_workqueue(cwq);  

  23.     }  

  24.   

  25.     return 0;  

  26. }  

 


  1. static void run_workqueue(struct cpu_workqueue_struct *cwq)  

  2. {  

  3.     spin_lock_irq(&cwq->lock);  

  4.     while (!list_empty(&cwq->worklist)) {  

  5.         struct work_struct *work = list_entry(cwq->worklist.next,  

  6.                         struct work_struct, entry);  

  7.         work_func_t f = work->func;  

  8. #ifdef CONFIG_LOCKDEP   

  9.         /* 

  10.          * It is permissible to free the struct work_struct 

  11.          * from inside the function that is called from it, 

  12.          * this we need to take into account for lockdep too. 

  13.          * To avoid bogus “held lock freed” warnings as well 

  14.          * as problems when looking into work->lockdep_map, 

  15.          * make a copy and use that here. 

  16.          */  

  17.         struct lockdep_map lockdep_map = work->lockdep_map;  

  18. #endif   

  19.         trace_workqueue_execution(cwq->thread, work);  

  20.         cwq->current_work = work;  

  21.         list_del_init(cwq->worklist.next);  

  22.         spin_unlock_irq(&cwq->lock);  

  23.   

  24.         BUG_ON(get_wq_data(work) != cwq);  

  25.         work_clear_pending(work);  

  26.         lock_map_acquire(&cwq->wq->lockdep_map);  

  27.         lock_map_acquire(&lockdep_map);  

  28.         f(work);/*执行工作队列中实际的函数*/  

  29.         lock_map_release(&lockdep_map);  

  30.         lock_map_release(&cwq->wq->lockdep_map);  

  31.   

  32.         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {  

  33.             printk(KERN_ERR “BUG: workqueue leaked lock or atomic: “  

  34.                     “%s/0x%08x/%d\n”,  

  35.                     current->comm, preempt_count(),  

  36.                         task_pid_nr(current));  

  37.             printk(KERN_ERR ”    last function: “);  

  38.             print_symbol(“%s\n”, (unsigned long)f);  

  39.             debug_show_held_locks(current);  

  40.             dump_stack();  

  41.         }  

  42.   

  43.         spin_lock_irq(&cwq->lock);  

  44.         cwq->current_work = NULL;  

  45.     }  

  46.     spin_unlock_irq(&cwq->lock);  

  47. }  

可见,创建的内核线程是执行工作队列中的所有函数。
除了最重要的创建函数,内核提供了一系列函数对其操作和方便编程,在这里介绍一个插入队列的函数。


  1. /** 

  2.  * queue_work – queue work on a workqueue 

  3.  * @wq: workqueue to use 

  4.  * @work: work to queue 

  5.  * 

  6.  * Returns 0 if @work was already on a queue, non-zero otherwise. 

  7.  * 

  8.  * We queue the work to the CPU on which it was submitted, but if the CPU dies 

  9.  * it can be processed by another CPU. 

  10.  */  

  11. int queue_work(struct workqueue_struct *wq, struct work_struct *work)  

  12. {  

  13.     int ret;  

  14.   

  15.     ret = queue_work_on(get_cpu(), wq, work);  

  16.     put_cpu();  

  17.   

  18.     return ret;  

  19. }  

 


  1. /** 

  2.  * queue_work_on – queue work on specific cpu 

  3.  * @cpu: CPU number to execute work on 

  4.  * @wq: workqueue to use 

  5.  * @work: work to queue 

  6.  * 

  7.  * Returns 0 if @work was already on a queue, non-zero otherwise. 

  8.  * 

  9.  * We queue the work to a specific CPU, the caller must ensure it 

  10.  * can’t go away. 

  11.  */  

  12. int  

  13. queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)  

  14. {  

  15.     int ret = 0;  

  16.   

  17.     if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {  

  18.         BUG_ON(!list_empty(&work->entry));  

  19.         __queue_work(wq_per_cpu(wq, cpu), work);  

  20.         ret = 1;  

  21.     }  

  22.     return ret;  

  23. }  

最终调用insert_work函数


  1. static void insert_work(struct cpu_workqueue_struct *cwq,  

  2.             struct work_struct *work, struct list_head *head)  

  3. {  

  4.     trace_workqueue_insertion(cwq->thread, work);  

  5.   

  6.     set_wq_data(work, cwq);  

  7.     /* 

  8.      * Ensure that we get the right work->data if we see the 

  9.      * result of list_add() below, see try_to_grab_pending(). 

  10.      */  

  11.     smp_wmb();  

  12.     list_add_tail(&work->entry, head);  

  13.     wake_up(&cwq->more_work);  

  14. }  

可见,在队列插入的时候就实现了唤醒。其他的函数不一一说了,了解了他的实现原理,看懂不难。

赞(0) 打赏
转载请注明出处:服务器评测 » Linux内核分析之工作队列
分享到: 更多 (0)

听说打赏我的人,都进福布斯排行榜啦!

支付宝扫一扫打赏

微信扫一扫打赏