fs/btrfs/async-thread.c

/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3
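
/*
 * How the flag bits above are used in this file: WORK_QUEUED_BIT is set
 * when an item is placed on a pending list and cleared when a worker pulls
 * it off, WORK_DONE_BIT is set once the item's func has run,
 * WORK_ORDER_DONE_BIT records that the ordered completion has been called
 * exactly once, and WORK_HIGH_PRIO_BIT routes the item to the high
 * priority lists.
 */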

/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
    /* pool we belong to */
    struct btrfs_workers *workers;

    /* list of struct btrfs_work that are waiting for service */
    struct list_head pending;
    struct list_head prio_pending;

    /* list of worker threads from struct btrfs_workers */
    struct list_head worker_list;

    /* kthread */
    struct task_struct *task;

    /* number of things on the pending list */
    atomic_t num_pending;

    /* reference counter for this struct */
    atomic_t refs;

    unsigned long sequence;

    /* protects the pending list. */
    spinlock_t lock;

    /* set to non-zero when this thread is already awake and kicking */
    int working;

    /* are we currently idle */
    int idle;
};

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time. It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations. This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
    struct btrfs_work work;
    struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
    struct worker_start *start;
    start = container_of(work, struct worker_start, work);
    btrfs_start_workers(start->queue, 1);
    kfree(start);
}

static int start_new_worker(struct btrfs_workers *queue)
{
    struct worker_start *start;
    int ret;

    start = kzalloc(sizeof(*start), GFP_NOFS);
    if (!start)
        return -ENOMEM;

    start->work.func = start_new_worker_func;
    start->queue = queue;
    ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
    if (ret)
        kfree(start);
    return ret;
}
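
/*
 * note: the allocation above uses GFP_NOFS so that queueing a thread-start
 * request cannot recurse back into filesystem reclaim while we may already
 * be running on behalf of dirty page cleaning (see the comment above
 * struct worker_start).
 */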

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
    if (!worker->idle && atomic_read(&worker->num_pending) <
        worker->workers->idle_thresh / 2) {
        unsigned long flags;
        spin_lock_irqsave(&worker->workers->lock, flags);
        worker->idle = 1;

        /* the list may be empty if the worker is just starting */
        if (!list_empty(&worker->worker_list)) {
            list_move(&worker->worker_list,
                 &worker->workers->idle_list);
        }
        spin_unlock_irqrestore(&worker->workers->lock, flags);
    }
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
    if (worker->idle && atomic_read(&worker->num_pending) >=
        worker->workers->idle_thresh) {
        unsigned long flags;
        spin_lock_irqsave(&worker->workers->lock, flags);
        worker->idle = 0;

        if (!list_empty(&worker->worker_list)) {
            list_move_tail(&worker->worker_list,
                      &worker->workers->worker_list);
        }
        spin_unlock_irqrestore(&worker->workers->lock, flags);
    }
}
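
/*
 * note: the two helpers above are deliberately asymmetric.  A worker is
 * marked busy as soon as its backlog reaches idle_thresh, but it only
 * moves back to the idle list once the backlog drops below idle_thresh / 2.
 * The gap gives some hysteresis so threads don't bounce between the two
 * lists on every queue and dequeue.
 */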

static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
    struct btrfs_workers *workers = worker->workers;
    unsigned long flags;

    rmb();
    if (!workers->atomic_start_pending)
        return;

    spin_lock_irqsave(&workers->lock, flags);
    if (!workers->atomic_start_pending)
        goto out;

    workers->atomic_start_pending = 0;
    if (workers->num_workers + workers->num_workers_starting >=
        workers->max_workers)
        goto out;

    workers->num_workers_starting += 1;
    spin_unlock_irqrestore(&workers->lock, flags);
    start_new_worker(workers);
    return;

out:
    spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
                        struct btrfs_work *work)
{
    if (!workers->ordered)
        return 0;

    set_bit(WORK_DONE_BIT, &work->flags);

    spin_lock(&workers->order_lock);

    while (1) {
        if (!list_empty(&workers->prio_order_list)) {
            work = list_entry(workers->prio_order_list.next,
                      struct btrfs_work, order_list);
        } else if (!list_empty(&workers->order_list)) {
            work = list_entry(workers->order_list.next,
                      struct btrfs_work, order_list);
        } else {
            break;
        }
        if (!test_bit(WORK_DONE_BIT, &work->flags))
            break;

        /* we are going to call the ordered done function, but
         * we leave the work item on the list as a barrier so
         * that later work items that are done don't have their
         * functions called before this one returns
         */
        if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
            break;

        spin_unlock(&workers->order_lock);

        work->ordered_func(work);

        /* now take the lock again and call the freeing code */
        spin_lock(&workers->order_lock);
        list_del(&work->order_list);
        work->ordered_free(work);
    }

    spin_unlock(&workers->order_lock);
    return 0;
}
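
/*
 * note on ordered queues: when workers->ordered is set, every item queued
 * on the pool must supply ordered_func and ordered_free in addition to
 * func; run_ordered_completions() calls both unconditionally, in
 * submission order, and leaves each item on the order list until its
 * ordered work is done so that it acts as a barrier for later items.
 */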

static void put_worker(struct btrfs_worker_thread *worker)
{
    if (atomic_dec_and_test(&worker->refs))
        kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
    int freeit = 0;

    spin_lock_irq(&worker->lock);
    spin_lock(&worker->workers->lock);
    if (worker->workers->num_workers > 1 &&
        worker->idle &&
        !worker->working &&
        !list_empty(&worker->worker_list) &&
        list_empty(&worker->prio_pending) &&
        list_empty(&worker->pending) &&
        atomic_read(&worker->num_pending) == 0) {
        freeit = 1;
        list_del_init(&worker->worker_list);
        worker->workers->num_workers--;
    }
    spin_unlock(&worker->workers->lock);
    spin_unlock_irq(&worker->lock);

    if (freeit)
        put_worker(worker);
    return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
                    struct list_head *prio_head,
                    struct list_head *head)
{
    struct btrfs_work *work = NULL;
    struct list_head *cur = NULL;

    if (!list_empty(prio_head))
        cur = prio_head->next;

    smp_mb();
    if (!list_empty(&worker->prio_pending))
        goto refill;

    if (!list_empty(head))
        cur = head->next;

    if (cur)
        goto out;

refill:
    spin_lock_irq(&worker->lock);
    list_splice_tail_init(&worker->prio_pending, prio_head);
    list_splice_tail_init(&worker->pending, head);

    if (!list_empty(prio_head))
        cur = prio_head->next;
    else if (!list_empty(head))
        cur = head->next;
    spin_unlock_irq(&worker->lock);

    if (!cur)
        goto out_fail;

out:
    work = list_entry(cur, struct btrfs_work, list);

out_fail:
    return work;
}
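
/*
 * note: prio_head and head above are local lists owned by worker_loop().
 * Whole batches are spliced onto them under worker->lock on the refill
 * path, so the common case of pulling the next item does not need to take
 * the lock at all.
 */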

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
    struct btrfs_worker_thread *worker = arg;
    struct list_head head;
    struct list_head prio_head;
    struct btrfs_work *work;

    INIT_LIST_HEAD(&head);
    INIT_LIST_HEAD(&prio_head);

    do {
again:
        while (1) {
            work = get_next_work(worker, &prio_head, &head);
            if (!work)
                break;

            list_del(&work->list);
            clear_bit(WORK_QUEUED_BIT, &work->flags);

            work->worker = worker;

            work->func(work);

            atomic_dec(&worker->num_pending);
            /*
             * unless this is an ordered work queue,
             * 'work' was probably freed by func above.
             */
            run_ordered_completions(worker->workers, work);

            check_pending_worker_creates(worker);
        }

        spin_lock_irq(&worker->lock);
        check_idle_worker(worker);

        if (freezing(current)) {
            worker->working = 0;
            spin_unlock_irq(&worker->lock);
            refrigerator();
        } else {
            spin_unlock_irq(&worker->lock);
            if (!kthread_should_stop()) {
                cpu_relax();
                /*
                 * we've dropped the lock, did someone else
                 * jump in?
                 */
                smp_mb();
                if (!list_empty(&worker->pending) ||
                    !list_empty(&worker->prio_pending))
                    continue;

                /*
                 * this short schedule allows more work to
                 * come in without the queue functions
                 * needing to go through wake_up_process()
                 *
                 * worker->working is still 1, so nobody
                 * is going to try and wake us up
                 */
                schedule_timeout(1);
                smp_mb();
                if (!list_empty(&worker->pending) ||
                    !list_empty(&worker->prio_pending))
                    continue;

                if (kthread_should_stop())
                    break;

                /* still no more work? sleep for real */
                spin_lock_irq(&worker->lock);
                set_current_state(TASK_INTERRUPTIBLE);
                if (!list_empty(&worker->pending) ||
                    !list_empty(&worker->prio_pending)) {
                    spin_unlock_irq(&worker->lock);
                    goto again;
                }

                /*
                 * this makes sure we get a wakeup when someone
                 * adds something new to the queue
                 */
                worker->working = 0;
                spin_unlock_irq(&worker->lock);

                if (!kthread_should_stop()) {
                    schedule_timeout(HZ * 120);
                    if (!worker->working &&
                        try_worker_shutdown(worker)) {
                        return 0;
                    }
                }
            }
            __set_current_state(TASK_RUNNING);
        }
    } while (!kthread_should_stop());
    return 0;
}
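
/*
 * summary of the idle path above: once the queues drain, the worker spins
 * briefly and then does a schedule_timeout(1) with worker->working still
 * set, so submissions arriving in that window are picked up without a
 * wake_up_process() call.  Only after that does it clear worker->working
 * and sleep interruptibly for up to 120 seconds, after which it may tear
 * itself down via try_worker_shutdown().
 */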

/*
 * this will wait for all the worker threads to shut down
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
    struct list_head *cur;
    struct btrfs_worker_thread *worker;
    int can_stop;

    spin_lock_irq(&workers->lock);
    list_splice_init(&workers->idle_list, &workers->worker_list);
    while (!list_empty(&workers->worker_list)) {
        cur = workers->worker_list.next;
        worker = list_entry(cur, struct btrfs_worker_thread,
                    worker_list);

        atomic_inc(&worker->refs);
        workers->num_workers -= 1;
        if (!list_empty(&worker->worker_list)) {
            list_del_init(&worker->worker_list);
            put_worker(worker);
            can_stop = 1;
        } else
            can_stop = 0;
        spin_unlock_irq(&workers->lock);
        if (can_stop)
            kthread_stop(worker->task);
        spin_lock_irq(&workers->lock);
        put_worker(worker);
    }
    spin_unlock_irq(&workers->lock);
    return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
            struct btrfs_workers *async_helper)
{
    workers->num_workers = 0;
    workers->num_workers_starting = 0;
    INIT_LIST_HEAD(&workers->worker_list);
    INIT_LIST_HEAD(&workers->idle_list);
    INIT_LIST_HEAD(&workers->order_list);
    INIT_LIST_HEAD(&workers->prio_order_list);
    spin_lock_init(&workers->lock);
    spin_lock_init(&workers->order_lock);
    workers->max_workers = max;
    workers->idle_thresh = 32;
    workers->name = name;
    workers->ordered = 0;
    workers->atomic_start_pending = 0;
    workers->atomic_worker_start = async_helper;
}
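
/*
 * Minimal usage sketch for this API (hypothetical caller; my_ctx, my_func
 * and "my-pool" are illustrative names, not part of btrfs).  The pattern
 * mirrors the worker_start helper above: embed a struct btrfs_work in a
 * context struct, set ->func, queue it, and recover the context with
 * container_of() inside the handler.
 *
 *    struct my_ctx {
 *        struct btrfs_work work;
 *        int value;
 *    };
 *
 *    static void my_func(struct btrfs_work *work)
 *    {
 *        struct my_ctx *ctx = container_of(work, struct my_ctx, work);
 *        ... do the actual processing, then free the context ...
 *        kfree(ctx);
 *    }
 *
 * and then, from some setup/teardown context:
 *
 *    struct btrfs_workers pool;
 *    struct my_ctx *ctx;
 *
 *    btrfs_init_workers(&pool, "my-pool", 8, NULL);
 *    btrfs_start_workers(&pool, 1);
 *
 *    ctx = kzalloc(sizeof(*ctx), GFP_NOFS);
 *    if (ctx) {
 *        ctx->work.func = my_func;
 *        btrfs_queue_worker(&pool, &ctx->work);
 *    }
 *
 *    btrfs_stop_workers(&pool);
 *
 * Passing NULL for the async_helper argument means find_worker() starts
 * new threads directly instead of deferring to a helper queue.
 */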

/*
 * starts new worker threads. This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers,
                 int num_workers)
{
    struct btrfs_worker_thread *worker;
    int ret = 0;
    int i;

    for (i = 0; i < num_workers; i++) {
        worker = kzalloc(sizeof(*worker), GFP_NOFS);
        if (!worker) {
            ret = -ENOMEM;
            goto fail;
        }

        INIT_LIST_HEAD(&worker->pending);
        INIT_LIST_HEAD(&worker->prio_pending);
        INIT_LIST_HEAD(&worker->worker_list);
        spin_lock_init(&worker->lock);

        atomic_set(&worker->num_pending, 0);
        atomic_set(&worker->refs, 1);
        worker->workers = workers;
        worker->task = kthread_run(worker_loop, worker,
                       "btrfs-%s-%d", workers->name,
                       workers->num_workers + i);
        if (IS_ERR(worker->task)) {
            ret = PTR_ERR(worker->task);
            kfree(worker);
            goto fail;
        }
        spin_lock_irq(&workers->lock);
        list_add_tail(&worker->worker_list, &workers->idle_list);
        worker->idle = 1;
        workers->num_workers++;
        workers->num_workers_starting--;
        WARN_ON(workers->num_workers_starting < 0);
        spin_unlock_irq(&workers->lock);
    }
    return 0;
fail:
    btrfs_stop_workers(workers);
    return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
    spin_lock_irq(&workers->lock);
    workers->num_workers_starting += num_workers;
    spin_unlock_irq(&workers->lock);
    return __btrfs_start_workers(workers, num_workers);
}
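
/*
 * note: num_workers_starting is bumped under the lock before the threads
 * actually exist, so next_worker() and find_worker() count in-flight
 * starts against max_workers when deciding whether yet another thread is
 * needed.
 */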

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now. This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
    struct btrfs_worker_thread *worker;
    struct list_head *next;
    int enforce_min;

    enforce_min = (workers->num_workers + workers->num_workers_starting) <
        workers->max_workers;

    /*
     * if we find an idle thread, don't move it to the end of the
     * idle list. This improves the chance that the next submission
     * will reuse the same thread, and maybe catch it while it is still
     * working
     */
    if (!list_empty(&workers->idle_list)) {
        next = workers->idle_list.next;
        worker = list_entry(next, struct btrfs_worker_thread,
                    worker_list);
        return worker;
    }
    if (enforce_min || list_empty(&workers->worker_list))
        return NULL;

    /*
     * if we pick a busy task, move the task to the end of the list.
     * hopefully this will keep things somewhat evenly balanced.
     * Do the move in batches based on the sequence number. This groups
     * requests submitted at roughly the same time onto the same worker.
     */
    next = workers->worker_list.next;
    worker = list_entry(next, struct btrfs_worker_thread, worker_list);
    worker->sequence++;

    if (worker->sequence % workers->idle_thresh == 0)
        list_move_tail(next, &workers->worker_list);
    return worker;
}

/*
 * selects a worker thread to take the next job. This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
    struct btrfs_worker_thread *worker;
    unsigned long flags;
    struct list_head *fallback;

again:
    spin_lock_irqsave(&workers->lock, flags);
    worker = next_worker(workers);

    if (!worker) {
        if (workers->num_workers + workers->num_workers_starting >=
            workers->max_workers) {
            goto fallback;
        } else if (workers->atomic_worker_start) {
            workers->atomic_start_pending = 1;
            goto fallback;
        } else {
            workers->num_workers_starting++;
            spin_unlock_irqrestore(&workers->lock, flags);
            /* we're below the limit, start another worker */
            __btrfs_start_workers(workers, 1);
            goto again;
        }
    }
    goto found;

fallback:
    fallback = NULL;
    /*
     * we have failed to find any workers, just
     * return the first one we can find.
     */
    if (!list_empty(&workers->worker_list))
        fallback = workers->worker_list.next;
    if (!list_empty(&workers->idle_list))
        fallback = workers->idle_list.next;
    BUG_ON(!fallback);
    worker = list_entry(fallback,
          struct btrfs_worker_thread, worker_list);
found:
    /*
     * this makes sure the worker doesn't exit before it is placed
     * onto a busy/idle list
     */
    atomic_inc(&worker->num_pending);
    spin_unlock_irqrestore(&workers->lock, flags);
    return worker;
}
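
/*
 * note on the paths above: when a pool was given an atomic_worker_start
 * helper, find_worker() never creates a thread itself.  It just sets
 * atomic_start_pending and falls back to an existing worker; the new
 * thread is started later from check_pending_worker_creates() via the
 * helper queue, which keeps queueing safe from contexts that must not
 * block on kthread_run().
 */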

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from. It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
    struct btrfs_worker_thread *worker = work->worker;
    unsigned long flags;
    int wake = 0;

    if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
        goto out;

    spin_lock_irqsave(&worker->lock, flags);
    if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
        list_add_tail(&work->list, &worker->prio_pending);
    else
        list_add_tail(&work->list, &worker->pending);
    atomic_inc(&worker->num_pending);

    /* by definition we're busy, take ourselves off the idle
     * list
     */
    if (worker->idle) {
        spin_lock(&worker->workers->lock);
        worker->idle = 0;
        list_move_tail(&worker->worker_list,
                  &worker->workers->worker_list);
        spin_unlock(&worker->workers->lock);
    }
    if (!worker->working) {
        wake = 1;
        worker->working = 1;
    }

    if (wake)
        wake_up_process(worker->task);
    spin_unlock_irqrestore(&worker->lock, flags);
out:

    return 0;
}

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
    set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
    struct btrfs_worker_thread *worker;
    unsigned long flags;
    int wake = 0;

    /* don't requeue something already on a list */
    if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
        goto out;

    worker = find_worker(workers);
    if (workers->ordered) {
        /*
         * you're not allowed to do ordered queues from an
         * interrupt handler
         */
        spin_lock(&workers->order_lock);
        if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
            list_add_tail(&work->order_list,
                      &workers->prio_order_list);
        } else {
            list_add_tail(&work->order_list, &workers->order_list);
        }
        spin_unlock(&workers->order_lock);
    } else {
        INIT_LIST_HEAD(&work->order_list);
    }

    spin_lock_irqsave(&worker->lock, flags);

    if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
        list_add_tail(&work->list, &worker->prio_pending);
    else
        list_add_tail(&work->list, &worker->pending);
    check_busy_worker(worker);

    /*
     * avoid calling into wake_up_process if this thread has already
     * been kicked
     */
    if (!worker->working)
        wake = 1;
    worker->working = 1;

    if (wake)
        wake_up_process(worker->task);
    spin_unlock_irqrestore(&worker->lock, flags);

out:
    return 0;
}
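
/*
 * Sketch of the two submission variants, continuing the hypothetical
 * my_ctx/pool example above (my_func, my_ordered_func and my_ordered_free
 * are illustrative names):
 *
 *    high priority:
 *        btrfs_set_work_high_prio(&ctx->work);
 *        btrfs_queue_worker(&pool, &ctx->work);
 *
 *    ordered completions (this file only reads workers->ordered; the pool
 *    owner is expected to set it, e.g. right after btrfs_init_workers(),
 *    and every item must then supply ordered_func and ordered_free in
 *    addition to func):
 *        ctx->work.func = my_func;
 *        ctx->work.ordered_func = my_ordered_func;
 *        ctx->work.ordered_free = my_ordered_free;
 *        btrfs_queue_worker(&pool, &ctx->work);
 */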