kernel/slow-work.c

/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
 *
 * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public Licence
 * as published by the Free Software Foundation; either version
 * 2 of the Licence, or (at your option) any later version.
 *
 * See Documentation/slow-work.txt
 */

#include <linux/module.h>
#include <linux/slow-work.h>
#include <linux/kthread.h>
#include <linux/freezer.h>
#include <linux/wait.h>
#include <linux/debugfs.h>
#include "slow-work.h"

static void slow_work_cull_timeout(unsigned long);
static void slow_work_oom_timeout(unsigned long);

#ifdef CONFIG_SYSCTL
static int slow_work_min_threads_sysctl(struct ctl_table *, int,
                    void __user *, size_t *, loff_t *);

static int slow_work_max_threads_sysctl(struct ctl_table *, int,
                    void __user *, size_t *, loff_t *);
#endif

/*
 * The pool of threads has at least min threads in it as long as someone is
 * using the facility, and may have as many as max.
 *
 * A portion of the pool may be processing very slow operations.
 */
static unsigned slow_work_min_threads = 2;
static unsigned slow_work_max_threads = 4;
static unsigned vslow_work_proportion = 50; /* % of threads that may process
                         * very slow work */

#ifdef CONFIG_SYSCTL
static const int slow_work_min_min_threads = 2;
static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
static const int slow_work_min_vslow = 1;
static const int slow_work_max_vslow = 99;

ctl_table slow_work_sysctls[] = {
    {
        .ctl_name = CTL_UNNUMBERED,
        .procname = "min-threads",
        .data = &slow_work_min_threads,
        .maxlen = sizeof(unsigned),
        .mode = 0644,
        .proc_handler = slow_work_min_threads_sysctl,
        .extra1 = (void *) &slow_work_min_min_threads,
        .extra2 = &slow_work_max_threads,
    },
    {
        .ctl_name = CTL_UNNUMBERED,
        .procname = "max-threads",
        .data = &slow_work_max_threads,
        .maxlen = sizeof(unsigned),
        .mode = 0644,
        .proc_handler = slow_work_max_threads_sysctl,
        .extra1 = &slow_work_min_threads,
        .extra2 = (void *) &slow_work_max_max_threads,
    },
    {
        .ctl_name = CTL_UNNUMBERED,
        .procname = "vslow-percentage",
        .data = &vslow_work_proportion,
        .maxlen = sizeof(unsigned),
        .mode = 0644,
        .proc_handler = &proc_dointvec_minmax,
        .extra1 = (void *) &slow_work_min_vslow,
        .extra2 = (void *) &slow_work_max_vslow,
    },
    { .ctl_name = 0 }
};
#endif
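
/*
 * Editor's illustrative note (not part of the original file): this table is
 * registered outside this file; assuming the usual hookup under a
 * "slow-work" directory of the kernel sysctls, the pool could be tuned at
 * runtime along these lines:
 *
 *	echo 8  > /proc/sys/kernel/slow-work/max-threads
 *	echo 25 > /proc/sys/kernel/slow-work/vslow-percentage
 *
 * Per the extra1/extra2 bounds above, min-threads may not drop below 2,
 * max-threads is capped at SLOW_WORK_THREAD_LIMIT, and vslow-percentage
 * must lie in the range 1..99.
 */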

/*
 * The active state of the thread pool
 */
static atomic_t slow_work_thread_count;
static atomic_t vslow_work_executing_count;

static bool slow_work_may_not_start_new_thread;
static bool slow_work_cull; /* cull a thread due to lack of activity */
static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0);
static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
static struct slow_work slow_work_new_thread; /* new thread starter */

/*
 * slow work ID allocation (use slow_work_queue_lock)
 */
static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);

/*
 * Unregistration tracking to prevent put_ref() from disappearing during module
 * unload
 */
#ifdef CONFIG_MODULES
static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
static struct module *slow_work_unreg_module;
static struct slow_work *slow_work_unreg_work_item;
static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
static DEFINE_MUTEX(slow_work_unreg_sync_lock);

static void slow_work_set_thread_processing(int id, struct slow_work *work)
{
    if (work)
        slow_work_thread_processing[id] = work->owner;
}
static void slow_work_done_thread_processing(int id, struct slow_work *work)
{
    struct module *module = slow_work_thread_processing[id];

    slow_work_thread_processing[id] = NULL;
    smp_mb();
    if (slow_work_unreg_work_item == work ||
        slow_work_unreg_module == module)
        wake_up_all(&slow_work_unreg_wq);
}
static void slow_work_clear_thread_processing(int id)
{
    slow_work_thread_processing[id] = NULL;
}
#else
static void slow_work_set_thread_processing(int id, struct slow_work *work) {}
static void slow_work_done_thread_processing(int id, struct slow_work *work) {}
static void slow_work_clear_thread_processing(int id) {}
#endif

/*
 * Data for tracking currently executing items for indication through debugfs
 */
#ifdef CONFIG_SLOW_WORK_DEBUG
struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
DEFINE_RWLOCK(slow_work_execs_lock);
#endif

/*
 * The queues of work items and the lock governing access to them. These are
 * shared between all the CPUs. It doesn't make sense to have per-CPU queues
 * as the number of threads bears no relation to the number of CPUs.
 *
 * There are two queues of work items: one for slow work items, and one for
 * very slow work items.
 */
LIST_HEAD(slow_work_queue);
LIST_HEAD(vslow_work_queue);
DEFINE_SPINLOCK(slow_work_queue_lock);

/*
 * The following are two wait queues that get pinged when a work item is placed
 * on an empty queue. These allow work items that are hogging a thread by
 * sleeping in a way that could be deferred to yield their thread and enqueue
 * themselves.
 */
static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);

/*
 * The thread controls. A variable used to signal to the threads that they
 * should exit when the queue is empty, a waitqueue used by the threads to wait
 * for signals, and a completion set by the last thread to exit.
 */
static bool slow_work_threads_should_exit;
static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq);
static DECLARE_COMPLETION(slow_work_last_thread_exited);

/*
 * The number of users of the thread pool and its lock. Whilst this is zero we
 * have no threads hanging around, and when this reaches zero, we wait for all
 * active or queued work items to complete and kill all the threads we do have.
 */
static int slow_work_user_count;
static DEFINE_MUTEX(slow_work_user_lock);

static inline int slow_work_get_ref(struct slow_work *work)
{
    if (work->ops->get_ref)
        return work->ops->get_ref(work);

    return 0;
}

static inline void slow_work_put_ref(struct slow_work *work)
{
    if (work->ops->put_ref)
        work->ops->put_ref(work);
}

/*
 * Calculate the maximum number of active threads in the pool that are
 * permitted to process very slow work items.
 *
 * The answer is rounded up to at least 1, but may not equal or exceed the
 * maximum number of the threads in the pool. This means we always have at
 * least one thread that can process slow work items, and we always have at
 * least one thread that won't get tied up doing so.
 */
static unsigned slow_work_calc_vsmax(void)
{
    unsigned vsmax;

    vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion;
    vsmax /= 100;
    vsmax = max(vsmax, 1U);
    return min(vsmax, slow_work_max_threads - 1);
}
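
/*
 * Editor's illustrative note (not part of the original file): with the
 * default tunables (vslow_work_proportion = 50, slow_work_max_threads = 4)
 * and four running threads, the calculation above gives
 *
 *	vsmax = max(4 * 50 / 100, 1) = 2, capped at 4 - 1 = 3,
 *
 * so at most two of the four threads may be tied up executing very slow
 * items at any one time.
 */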

/*
 * Attempt to execute stuff queued on a slow thread. Return true if we managed
 * it, false if there was nothing to do.
 */
static noinline bool slow_work_execute(int id)
{
    struct slow_work *work = NULL;
    unsigned vsmax;
    bool very_slow;

    vsmax = slow_work_calc_vsmax();

    /* see if we can schedule a new thread to be started if we're not
     * keeping up with the work */
    if (!waitqueue_active(&slow_work_thread_wq) &&
        (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) &&
        atomic_read(&slow_work_thread_count) < slow_work_max_threads &&
        !slow_work_may_not_start_new_thread)
        slow_work_enqueue(&slow_work_new_thread);

    /* find something to execute */
    spin_lock_irq(&slow_work_queue_lock);
    if (!list_empty(&vslow_work_queue) &&
        atomic_read(&vslow_work_executing_count) < vsmax) {
        work = list_entry(vslow_work_queue.next,
                  struct slow_work, link);
        if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
            BUG();
        list_del_init(&work->link);
        atomic_inc(&vslow_work_executing_count);
        very_slow = true;
    } else if (!list_empty(&slow_work_queue)) {
        work = list_entry(slow_work_queue.next,
                  struct slow_work, link);
        if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
            BUG();
        list_del_init(&work->link);
        very_slow = false;
    } else {
        very_slow = false; /* avoid the compiler warning */
    }

    slow_work_set_thread_processing(id, work);
    if (work) {
        slow_work_mark_time(work);
        slow_work_begin_exec(id, work);
    }

    spin_unlock_irq(&slow_work_queue_lock);

    if (!work)
        return false;

    if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
        BUG();

    /* don't execute if the work is in the process of being cancelled */
    if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
        work->ops->execute(work);

    if (very_slow)
        atomic_dec(&vslow_work_executing_count);
    clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);

    /* wake up anyone waiting for this work to be complete */
    wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);

    slow_work_end_exec(id, work);

    /* if someone tried to enqueue the item whilst we were executing it,
     * then it'll be left unenqueued to avoid multiple threads trying to
     * execute it simultaneously
     *
     * there is, however, a race between us testing the pending flag and
     * getting the spinlock, and between the enqueuer setting the pending
     * flag and getting the spinlock, so we use a deferral bit to tell us
     * if the enqueuer got there first
     */
    if (test_bit(SLOW_WORK_PENDING, &work->flags)) {
        spin_lock_irq(&slow_work_queue_lock);

        if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) &&
            test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags))
            goto auto_requeue;

        spin_unlock_irq(&slow_work_queue_lock);
    }

    /* sort out the race between module unloading and put_ref() */
    slow_work_put_ref(work);
    slow_work_done_thread_processing(id, work);

    return true;

auto_requeue:
    /* we must complete the enqueue operation
     * - we transfer our ref on the item back to the appropriate queue
     * - don't wake another thread up as we're awake already
     */
    slow_work_mark_time(work);
    if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
        list_add_tail(&work->link, &vslow_work_queue);
    else
        list_add_tail(&work->link, &slow_work_queue);
    spin_unlock_irq(&slow_work_queue_lock);
    slow_work_clear_thread_processing(id);
    return true;
}

/**
 * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
 * @work: The work item under execution that wants to sleep
 * @_timeout: Scheduler sleep timeout
 *
 * Allow a requeueable work item to sleep on a slow-work processor thread until
 * that thread is needed to do some other work or the sleep is interrupted by
 * some other event.
 *
 * The caller must set up a wake up event before calling this and must have set
 * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
 * condition before calling this function as no test is made here.
 *
 * False is returned if there is nothing on the queue; true is returned if the
 * work item should be requeued.
 */
bool slow_work_sleep_till_thread_needed(struct slow_work *work,
                    signed long *_timeout)
{
    wait_queue_head_t *wfo_wq;
    struct list_head *queue;

    DEFINE_WAIT(wait);

    if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
        wfo_wq = &vslow_work_queue_waits_for_occupation;
        queue = &vslow_work_queue;
    } else {
        wfo_wq = &slow_work_queue_waits_for_occupation;
        queue = &slow_work_queue;
    }

    if (!list_empty(queue))
        return true;

    add_wait_queue_exclusive(wfo_wq, &wait);
    if (list_empty(queue))
        *_timeout = schedule_timeout(*_timeout);
    finish_wait(wfo_wq, &wait);

    return !list_empty(queue);
}
EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);
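
/*
 * Editor's illustrative sketch (not part of the original file): how a
 * requeueable work item's ->execute() routine might yield its thread while
 * waiting for an event, following the contract documented above.  The names
 * my_event_wq, my_event_happened() and obj are hypothetical.
 *
 *	signed long timeout = 5 * HZ;
 *	DEFINE_WAIT(wait);
 *
 *	prepare_to_wait(&my_event_wq, &wait, TASK_UNINTERRUPTIBLE);
 *	if (!my_event_happened(obj) &&
 *	    slow_work_sleep_till_thread_needed(&obj->work, &timeout)) {
 *		// another item is waiting for a thread: arrange for this
 *		// item to be re-enqueued and return from ->execute() early
 *	}
 *	finish_wait(&my_event_wq, &wait);
 */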

/**
 * slow_work_enqueue - Schedule a slow work item for processing
 * @work: The work item to queue
 *
 * Schedule a slow work item for processing. If the item is already undergoing
 * execution, this guarantees not to re-enter the execution routine until the
 * first execution finishes.
 *
 * The item is pinned by this function as it retains a reference to it, managed
 * through the item operations. The item is unpinned once it has been
 * executed.
 *
 * An item may hog the thread that is running it for a relatively large amount
 * of time, sufficient, for example, to perform several lookup, mkdir, create
 * and setxattr operations. It may sleep on I/O and may sleep to obtain locks.
 *
 * Conversely, if a number of items are awaiting processing, it may take some
 * time before any given item is given attention. The number of threads in the
 * pool may be increased to deal with demand, but only up to a limit.
 *
 * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
 * the very slow queue, from which only a portion of the threads will be
 * allowed to pick items to execute. This ensures that very slow items won't
 * overly block ones that are just ordinarily slow.
 *
 * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if an attempt is
 * made to queue cancelled work).
 */
int slow_work_enqueue(struct slow_work *work)
{
    wait_queue_head_t *wfo_wq;
    struct list_head *queue;
    unsigned long flags;
    int ret;

    if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
        return -ECANCELED;

    BUG_ON(slow_work_user_count <= 0);
    BUG_ON(!work);
    BUG_ON(!work->ops);

    /* when honouring an enqueue request, we only promise that we will run
     * the work function in the future; we do not promise to run it once
     * per enqueue request
     *
     * we use the PENDING bit to merge together repeat requests without
     * having to disable IRQs and take the spinlock, whilst still
     * maintaining our promise
     */
    if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
        if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
            wfo_wq = &vslow_work_queue_waits_for_occupation;
            queue = &vslow_work_queue;
        } else {
            wfo_wq = &slow_work_queue_waits_for_occupation;
            queue = &slow_work_queue;
        }

        spin_lock_irqsave(&slow_work_queue_lock, flags);

        if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
            goto cancelled;

        /* we promise that we will not attempt to execute the work
         * function in more than one thread simultaneously
         *
         * this, however, leaves us with a problem if we're asked to
         * enqueue the work whilst someone is executing the work
         * function as simply queueing the work immediately means that
         * another thread may try executing it whilst it is already
         * under execution
         *
         * to deal with this, we set the ENQ_DEFERRED bit instead of
         * enqueueing, and the thread currently executing the work
         * function will enqueue the work item when the work function
         * returns and it has cleared the EXECUTING bit
         */
        if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
            set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
        } else {
            ret = slow_work_get_ref(work);
            if (ret < 0)
                goto failed;
            slow_work_mark_time(work);
            list_add_tail(&work->link, queue);
            wake_up(&slow_work_thread_wq);

            /* if someone who could be requeued is sleeping on a
             * thread, then ask them to yield their thread */
            if (work->link.prev == queue)
                wake_up(wfo_wq);
        }

        spin_unlock_irqrestore(&slow_work_queue_lock, flags);
    }
    return 0;

cancelled:
    ret = -ECANCELED;
failed:
    spin_unlock_irqrestore(&slow_work_queue_lock, flags);
    return ret;
}
EXPORT_SYMBOL(slow_work_enqueue);
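
/*
 * Editor's illustrative sketch (not part of the original file): a minimal
 * client of this API, loosely following Documentation/slow-work.txt.  The
 * struct my_object, the my_* names and the kmalloc'd-object lifetime are
 * hypothetical; only slow_work_init(), slow_work_enqueue() and the
 * slow_work_ops members shown are part of the facility itself.
 *
 *	struct my_object {
 *		atomic_t		usage;
 *		struct slow_work	work;
 *	};
 *
 *	static int my_get_ref(struct slow_work *work)
 *	{
 *		atomic_inc(&container_of(work, struct my_object, work)->usage);
 *		return 0;
 *	}
 *
 *	static void my_put_ref(struct slow_work *work)
 *	{
 *		struct my_object *obj =
 *			container_of(work, struct my_object, work);
 *
 *		if (atomic_dec_and_test(&obj->usage))
 *			kfree(obj);
 *	}
 *
 *	static void my_execute(struct slow_work *work)
 *	{
 *		// the slow operation goes here; it may sleep
 *	}
 *
 *	static const struct slow_work_ops my_ops = {
 *		.owner	 = THIS_MODULE,
 *		.get_ref = my_get_ref,
 *		.put_ref = my_put_ref,
 *		.execute = my_execute,
 *	};
 *
 *	// with obj allocated and its usage count initialised to 1:
 *	slow_work_init(&obj->work, &my_ops);
 *	if (slow_work_enqueue(&obj->work) < 0)
 *		// not queued: drop our reference as appropriate
 *		;
 */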

static int slow_work_wait(void *word)
{
    schedule();
    return 0;
}

/**
 * slow_work_cancel - Cancel a slow work item
 * @work: The work item to cancel
 *
 * This function will cancel a previously enqueued work item. If we cannot
 * cancel the work item, it is guaranteed to have run when this function
 * returns.
 */
void slow_work_cancel(struct slow_work *work)
{
    bool wait = true, put = false;

    set_bit(SLOW_WORK_CANCELLING, &work->flags);
    smp_mb();

    /* if the work item is a delayed work item with an active timer, we
     * need to wait for the timer to finish _before_ getting the spinlock,
     * lest we deadlock against the timer routine
     *
     * the timer routine will leave DELAYED set if it notices the
     * CANCELLING flag in time
     */
    if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
        struct delayed_slow_work *dwork =
            container_of(work, struct delayed_slow_work, work);
        del_timer_sync(&dwork->timer);
    }

    spin_lock_irq(&slow_work_queue_lock);

    if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
        /* the timer routine aborted or never happened, so we are left
         * holding the timer's reference on the item and should just
         * drop the pending flag and wait for any ongoing execution to
         * finish */
        struct delayed_slow_work *dwork =
            container_of(work, struct delayed_slow_work, work);

        BUG_ON(timer_pending(&dwork->timer));
        BUG_ON(!list_empty(&work->link));

        clear_bit(SLOW_WORK_DELAYED, &work->flags);
        put = true;
        clear_bit(SLOW_WORK_PENDING, &work->flags);

    } else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
           !list_empty(&work->link)) {
        /* the link in the pending queue holds a reference on the item
         * that we will need to release */
        list_del_init(&work->link);
        wait = false;
        put = true;
        clear_bit(SLOW_WORK_PENDING, &work->flags);

    } else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
        /* the executor is holding our only reference on the item, so
         * we merely need to wait for it to finish executing */
        clear_bit(SLOW_WORK_PENDING, &work->flags);
    }

    spin_unlock_irq(&slow_work_queue_lock);

    /* the EXECUTING flag is set by the executor whilst the spinlock is held
     * and before the item is dequeued - so assuming the above doesn't
     * actually dequeue it, simply waiting for the EXECUTING flag to be
     * released here should be sufficient */
    if (wait)
        wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
                TASK_UNINTERRUPTIBLE);

    clear_bit(SLOW_WORK_CANCELLING, &work->flags);
    if (put)
        slow_work_put_ref(work);
}
EXPORT_SYMBOL(slow_work_cancel);
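
/*
 * Editor's illustrative note (not part of the original file): a client that
 * is about to tear down an object embedding a slow work item would typically
 * cancel the item before dropping its own reference, so that the item can
 * neither be queued nor executing by the time the object is freed (obj and
 * my_object_put() are hypothetical):
 *
 *	slow_work_cancel(&obj->work);
 *	my_object_put(obj);
 */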

/*
 * Handle expiry of the delay timer, indicating that a delayed slow work item
 * should now be queued if not cancelled
 */
static void delayed_slow_work_timer(unsigned long data)
{
    wait_queue_head_t *wfo_wq;
    struct list_head *queue;
    struct slow_work *work = (struct slow_work *) data;
    unsigned long flags;
    bool queued = false, put = false, first = false;

    if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
        wfo_wq = &vslow_work_queue_waits_for_occupation;
        queue = &vslow_work_queue;
    } else {
        wfo_wq = &slow_work_queue_waits_for_occupation;
        queue = &slow_work_queue;
    }

    spin_lock_irqsave(&slow_work_queue_lock, flags);
    if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
        clear_bit(SLOW_WORK_DELAYED, &work->flags);

        if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
            /* we discard the reference the timer was holding in
             * favour of the one the executor holds */
            set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
            put = true;
        } else {
            slow_work_mark_time(work);
            list_add_tail(&work->link, queue);
            queued = true;
            if (work->link.prev == queue)
                first = true;
        }
    }

    spin_unlock_irqrestore(&slow_work_queue_lock, flags);
    if (put)
        slow_work_put_ref(work);
    if (first)
        wake_up(wfo_wq);
    if (queued)
        wake_up(&slow_work_thread_wq);
}

/**
 * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
 * @dwork: The delayed work item to queue
 * @delay: When to start executing the work, in jiffies from now
 *
 * This is similar to slow_work_enqueue(), but it adds a delay before the work
 * is actually queued for processing.
 *
 * The item can have delayed processing requested on it whilst it is being
 * executed. The delay will begin immediately, and if it expires before the
 * item finishes executing, the item will be placed back on the queue when it
 * has finished executing.
 */
int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
                  unsigned long delay)
{
    struct slow_work *work = &dwork->work;
    unsigned long flags;
    int ret;

    if (delay == 0)
        return slow_work_enqueue(&dwork->work);

    BUG_ON(slow_work_user_count <= 0);
    BUG_ON(!work);
    BUG_ON(!work->ops);

    if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
        return -ECANCELED;

    if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
        spin_lock_irqsave(&slow_work_queue_lock, flags);

        if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
            goto cancelled;

        /* the timer holds a reference whilst it is pending */
        ret = work->ops->get_ref(work);
        if (ret < 0)
            goto cant_get_ref;

        if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
            BUG();
        dwork->timer.expires = jiffies + delay;
        dwork->timer.data = (unsigned long) work;
        dwork->timer.function = delayed_slow_work_timer;
        add_timer(&dwork->timer);

        spin_unlock_irqrestore(&slow_work_queue_lock, flags);
    }

    return 0;

cancelled:
    ret = -ECANCELED;
cant_get_ref:
    spin_unlock_irqrestore(&slow_work_queue_lock, flags);
    return ret;
}
EXPORT_SYMBOL(delayed_slow_work_enqueue);
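
/*
 * Editor's illustrative sketch (not part of the original file): queueing a
 * delayed item, reusing the hypothetical my_ops from the earlier sketch and
 * assuming the delayed_slow_work_init() helper declared in
 * <linux/slow-work.h>:
 *
 *	struct delayed_slow_work dwork;
 *
 *	delayed_slow_work_init(&dwork, &my_ops);
 *	ret = delayed_slow_work_enqueue(&dwork, 5 * HZ);
 *	// ret is 0 on success, -ECANCELED if the item is being cancelled,
 *	// or the error returned by my_ops.get_ref()
 */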

/*
 * Schedule a cull of the thread pool at some time in the near future
 */
static void slow_work_schedule_cull(void)
{
    mod_timer(&slow_work_cull_timer,
          round_jiffies(jiffies + SLOW_WORK_CULL_TIMEOUT));
}

/*
 * Worker thread culling algorithm
 */
static bool slow_work_cull_thread(void)
{
    unsigned long flags;
    bool do_cull = false;

    spin_lock_irqsave(&slow_work_queue_lock, flags);

    if (slow_work_cull) {
        slow_work_cull = false;

        if (list_empty(&slow_work_queue) &&
            list_empty(&vslow_work_queue) &&
            atomic_read(&slow_work_thread_count) >
            slow_work_min_threads) {
            slow_work_schedule_cull();
            do_cull = true;
        }
    }

    spin_unlock_irqrestore(&slow_work_queue_lock, flags);
    return do_cull;
}

/*
 * Determine if there is slow work available for dispatch
 */
static inline bool slow_work_available(int vsmax)
{
    return !list_empty(&slow_work_queue) ||
        (!list_empty(&vslow_work_queue) &&
         atomic_read(&vslow_work_executing_count) < vsmax);
}

/*
 * Worker thread dispatcher
 */
static int slow_work_thread(void *_data)
{
    int vsmax, id;

    DEFINE_WAIT(wait);

    set_freezable();
    set_user_nice(current, -5);

    /* allocate ourselves an ID */
    spin_lock_irq(&slow_work_queue_lock);
    id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
    BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
    __set_bit(id, slow_work_ids);
    slow_work_set_thread_pid(id, current->pid);
    spin_unlock_irq(&slow_work_queue_lock);

    sprintf(current->comm, "kslowd%03u", id);

    for (;;) {
        vsmax = vslow_work_proportion;
        vsmax *= atomic_read(&slow_work_thread_count);
        vsmax /= 100;

        prepare_to_wait_exclusive(&slow_work_thread_wq, &wait,
                      TASK_INTERRUPTIBLE);
        if (!freezing(current) &&
            !slow_work_threads_should_exit &&
            !slow_work_available(vsmax) &&
            !slow_work_cull)
            schedule();
        finish_wait(&slow_work_thread_wq, &wait);

        try_to_freeze();

        vsmax = vslow_work_proportion;
        vsmax *= atomic_read(&slow_work_thread_count);
        vsmax /= 100;

        if (slow_work_available(vsmax) && slow_work_execute(id)) {
            cond_resched();
            if (list_empty(&slow_work_queue) &&
                list_empty(&vslow_work_queue) &&
                atomic_read(&slow_work_thread_count) >
                slow_work_min_threads)
                slow_work_schedule_cull();
            continue;
        }

        if (slow_work_threads_should_exit)
            break;

        if (slow_work_cull && slow_work_cull_thread())
            break;
    }

    spin_lock_irq(&slow_work_queue_lock);
    slow_work_set_thread_pid(id, 0);
    __clear_bit(id, slow_work_ids);
    spin_unlock_irq(&slow_work_queue_lock);

    if (atomic_dec_and_test(&slow_work_thread_count))
        complete_and_exit(&slow_work_last_thread_exited, 0);
    return 0;
}

/*
 * Handle thread cull timer expiration
 */
static void slow_work_cull_timeout(unsigned long data)
{
    slow_work_cull = true;
    wake_up(&slow_work_thread_wq);
}

/*
 * Start a new slow work thread
 */
static void slow_work_new_thread_execute(struct slow_work *work)
{
    struct task_struct *p;

    if (slow_work_threads_should_exit)
        return;

    if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads)
        return;

    if (!mutex_trylock(&slow_work_user_lock))
        return;

    slow_work_may_not_start_new_thread = true;
    atomic_inc(&slow_work_thread_count);
    p = kthread_run(slow_work_thread, NULL, "kslowd");
    if (IS_ERR(p)) {
        printk(KERN_DEBUG "Slow work thread pool: OOM\n");
        if (atomic_dec_and_test(&slow_work_thread_count))
            BUG(); /* we're running on a slow work thread... */
        mod_timer(&slow_work_oom_timer,
              round_jiffies(jiffies + SLOW_WORK_OOM_TIMEOUT));
    } else {
        /* ratelimit the starting of new threads */
        mod_timer(&slow_work_oom_timer, jiffies + 1);
    }

    mutex_unlock(&slow_work_user_lock);
}

static const struct slow_work_ops slow_work_new_thread_ops = {
    .owner = THIS_MODULE,
    .execute = slow_work_new_thread_execute,
#ifdef CONFIG_SLOW_WORK_DEBUG
    .desc = slow_work_new_thread_desc,
#endif
};

/*
 * post-OOM new thread start suppression expiration
 */
static void slow_work_oom_timeout(unsigned long data)
{
    slow_work_may_not_start_new_thread = false;
}

#ifdef CONFIG_SYSCTL
/*
 * Handle adjustment of the minimum number of threads
 */
static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
                    void __user *buffer,
                    size_t *lenp, loff_t *ppos)
{
    int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
    int n;

    if (ret == 0) {
        mutex_lock(&slow_work_user_lock);
        if (slow_work_user_count > 0) {
            /* see if we need to start or stop threads */
            n = atomic_read(&slow_work_thread_count) -
                slow_work_min_threads;

            if (n < 0 && !slow_work_may_not_start_new_thread)
                slow_work_enqueue(&slow_work_new_thread);
            else if (n > 0)
                slow_work_schedule_cull();
        }
        mutex_unlock(&slow_work_user_lock);
    }

    return ret;
}

/*
 * Handle adjustment of the maximum number of threads
 */
static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
                    void __user *buffer,
                    size_t *lenp, loff_t *ppos)
{
    int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
    int n;

    if (ret == 0) {
        mutex_lock(&slow_work_user_lock);
        if (slow_work_user_count > 0) {
            /* see if we need to stop threads */
            n = slow_work_max_threads -
                atomic_read(&slow_work_thread_count);

            if (n < 0)
                slow_work_schedule_cull();
        }
        mutex_unlock(&slow_work_user_lock);
    }

    return ret;
}
#endif /* CONFIG_SYSCTL */

/**
 * slow_work_register_user - Register a user of the facility
 * @module: The module about to make use of the facility
 *
 * Register a user of the facility, starting up the initial threads if there
 * aren't any other users at this point. This will return 0 if successful, or
 * an error if not.
 */
int slow_work_register_user(struct module *module)
{
    struct task_struct *p;
    int loop;

    mutex_lock(&slow_work_user_lock);

    if (slow_work_user_count == 0) {
        printk(KERN_NOTICE "Slow work thread pool: Starting up\n");
        init_completion(&slow_work_last_thread_exited);

        slow_work_threads_should_exit = false;
        slow_work_init(&slow_work_new_thread,
                   &slow_work_new_thread_ops);
        slow_work_may_not_start_new_thread = false;
        slow_work_cull = false;

        /* start the minimum number of threads */
        for (loop = 0; loop < slow_work_min_threads; loop++) {
            atomic_inc(&slow_work_thread_count);
            p = kthread_run(slow_work_thread, NULL, "kslowd");
            if (IS_ERR(p))
                goto error;
        }
        printk(KERN_NOTICE "Slow work thread pool: Ready\n");
    }

    slow_work_user_count++;
    mutex_unlock(&slow_work_user_lock);
    return 0;

error:
    if (atomic_dec_and_test(&slow_work_thread_count))
        complete(&slow_work_last_thread_exited);
    if (loop > 0) {
        printk(KERN_ERR "Slow work thread pool:"
               " Aborting startup on ENOMEM\n");
        slow_work_threads_should_exit = true;
        wake_up_all(&slow_work_thread_wq);
        wait_for_completion(&slow_work_last_thread_exited);
        printk(KERN_ERR "Slow work thread pool: Aborted\n");
    }
    mutex_unlock(&slow_work_user_lock);
    return PTR_ERR(p);
}
EXPORT_SYMBOL(slow_work_register_user);
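
/*
 * Editor's illustrative sketch (not part of the original file): a module
 * would typically register as a user of the facility in its init routine and
 * unregister in its exit routine, bracketing any slow_work_enqueue() calls
 * it makes (my_module_init/my_module_exit are hypothetical):
 *
 *	static int __init my_module_init(void)
 *	{
 *		int ret = slow_work_register_user(THIS_MODULE);
 *		if (ret < 0)
 *			return ret;
 *		// ... queue slow work items as needed ...
 *		return 0;
 *	}
 *
 *	static void __exit my_module_exit(void)
 *	{
 *		// waits for this module's outstanding items, then drops the
 *		// pool's user count
 *		slow_work_unregister_user(THIS_MODULE);
 *	}
 */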

/*
 * wait for all outstanding items from the calling module to complete
 * - note that more items may be queued whilst we're waiting
 */
static void slow_work_wait_for_items(struct module *module)
{
#ifdef CONFIG_MODULES
    DECLARE_WAITQUEUE(myself, current);
    struct slow_work *work;
    int loop;

    mutex_lock(&slow_work_unreg_sync_lock);
    add_wait_queue(&slow_work_unreg_wq, &myself);

    for (;;) {
        spin_lock_irq(&slow_work_queue_lock);

        /* first of all, we wait for the last queued item in each list
         * to be processed */
        list_for_each_entry_reverse(work, &vslow_work_queue, link) {
            if (work->owner == module) {
                set_current_state(TASK_UNINTERRUPTIBLE);
                slow_work_unreg_work_item = work;
                goto do_wait;
            }
        }
        list_for_each_entry_reverse(work, &slow_work_queue, link) {
            if (work->owner == module) {
                set_current_state(TASK_UNINTERRUPTIBLE);
                slow_work_unreg_work_item = work;
                goto do_wait;
            }
        }

        /* then we wait for the items being processed to finish */
        slow_work_unreg_module = module;
        smp_mb();
        for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
            if (slow_work_thread_processing[loop] == module)
                goto do_wait;
        }
        spin_unlock_irq(&slow_work_queue_lock);
        break; /* okay, we're done */

    do_wait:
        spin_unlock_irq(&slow_work_queue_lock);
        schedule();
        slow_work_unreg_work_item = NULL;
        slow_work_unreg_module = NULL;
    }

    remove_wait_queue(&slow_work_unreg_wq, &myself);
    mutex_unlock(&slow_work_unreg_sync_lock);
#endif /* CONFIG_MODULES */
}

/**
 * slow_work_unregister_user - Unregister a user of the facility
 * @module: The module whose items should be cleared
 *
 * Unregister a user of the facility, killing all the threads if this was the
 * last one.
 *
 * This waits for all the work items belonging to the nominated module to go
 * away before proceeding.
 */
void slow_work_unregister_user(struct module *module)
{
    /* first of all, wait for all outstanding items from the calling module
     * to complete */
    if (module)
        slow_work_wait_for_items(module);

    /* then we can actually go about shutting down the facility if need
     * be */
    mutex_lock(&slow_work_user_lock);

    BUG_ON(slow_work_user_count <= 0);

    slow_work_user_count--;
    if (slow_work_user_count == 0) {
        printk(KERN_NOTICE "Slow work thread pool: Shutting down\n");
        slow_work_threads_should_exit = true;
        del_timer_sync(&slow_work_cull_timer);
        del_timer_sync(&slow_work_oom_timer);
        wake_up_all(&slow_work_thread_wq);
        wait_for_completion(&slow_work_last_thread_exited);
        printk(KERN_NOTICE "Slow work thread pool:"
               " Shut down complete\n");
    }

    mutex_unlock(&slow_work_user_lock);
}
EXPORT_SYMBOL(slow_work_unregister_user);

/*
 * Initialise the slow work facility
 */
static int __init init_slow_work(void)
{
    unsigned nr_cpus = num_possible_cpus();

    if (slow_work_max_threads < nr_cpus)
        slow_work_max_threads = nr_cpus;
#ifdef CONFIG_SYSCTL
    if (slow_work_max_max_threads < nr_cpus * 2)
        slow_work_max_max_threads = nr_cpus * 2;
#endif
#ifdef CONFIG_SLOW_WORK_DEBUG
    {
        struct dentry *dbdir;

        dbdir = debugfs_create_dir("slow_work", NULL);
        if (dbdir && !IS_ERR(dbdir))
            debugfs_create_file("runqueue", S_IFREG | 0400, dbdir,
                        NULL, &slow_work_runqueue_fops);
    }
#endif
    return 0;
}

subsys_initcall(init_slow_work);
