drivers/md/dm-kcopyd.c

/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
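
/*
 * Illustrative usage sketch (not taken from an in-tree caller; the
 * device and callback names below are made up for the example).  A
 * client is created once, copies are submitted with dm_kcopyd_copy(),
 * and the notify function runs later from the kcopyd workqueue:
 *
 *    static void copy_done(int read_err, unsigned long write_err,
 *                          void *context)
 *    {
 *        // read_err is non-zero if the source read failed; write_err
 *        // has one bit set for each destination that failed.
 *    }
 *
 *    kc = dm_kcopyd_client_create();
 *    if (IS_ERR(kc))
 *        return PTR_ERR(kc);
 *
 *    from.bdev = src_bdev;         // struct dm_io_region, units are sectors
 *    from.sector = 0;
 *    from.count = nr_sectors;
 *    to = from;
 *    to.bdev = dst_bdev;
 *
 *    dm_kcopyd_copy(kc, &from, 1, &to, 0, copy_done, context);
 *    ...
 *    dm_kcopyd_client_destroy(kc); // waits for all outstanding jobs
 */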

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

#define SUB_JOB_SIZE 128
#define SPLIT_COUNT 8
#define MIN_JOBS 8
#define RESERVE_PAGES (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
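
/*
 * Worked example of the sizing above: SUB_JOB_SIZE is in sectors, so
 * with 512-byte sectors each sub job moves 64KiB, and on a 4KiB-page
 * system RESERVE_PAGES comes to 16 pages per client.  The macros above
 * remain authoritative for other page sizes.
 */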

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
    struct page_list *pages;
    unsigned nr_reserved_pages;
    unsigned nr_free_pages;

    struct dm_io_client *io_client;

    wait_queue_head_t destroyq;
    atomic_t nr_jobs;

    mempool_t *job_pool;

    struct workqueue_struct *kcopyd_wq;
    struct work_struct kcopyd_work;

/*
 * We maintain three lists of jobs:
 *
 * i) jobs waiting for pages
 * ii) jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
    spinlock_t job_lock;
    struct list_head complete_jobs;
    struct list_head io_jobs;
    struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static void wake(struct dm_kcopyd_client *kc)
{
    queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
    struct page_list *pl;

    pl = kmalloc(sizeof(*pl), gfp);
    if (!pl)
        return NULL;

    pl->page = alloc_page(gfp);
    if (!pl->page) {
        kfree(pl);
        return NULL;
    }

    return pl;
}

static void free_pl(struct page_list *pl)
{
    __free_page(pl->page);
    kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
    struct page_list *next;

    do {
        next = pl->next;

        if (kc->nr_free_pages >= kc->nr_reserved_pages)
            free_pl(pl);
        else {
            pl->next = kc->pages;
            kc->pages = pl;
            kc->nr_free_pages++;
        }

        pl = next;
    } while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
                unsigned int nr, struct page_list **pages)
{
    struct page_list *pl;

    *pages = NULL;

    do {
        pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY);
        if (unlikely(!pl)) {
            /* Use reserved pages */
            pl = kc->pages;
            if (unlikely(!pl))
                goto out_of_memory;
            kc->pages = pl->next;
            kc->nr_free_pages--;
        }
        pl->next = *pages;
        *pages = pl;
    } while (--nr);

    return 0;

out_of_memory:
    if (*pages)
        kcopyd_put_pages(kc, *pages);
    return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
    struct page_list *next;

    while (pl) {
        next = pl->next;
        free_pl(pl);
        pl = next;
    }
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
    unsigned i;
    struct page_list *pl = NULL, *next;

    for (i = 0; i < nr_pages; i++) {
        next = alloc_pl(GFP_KERNEL);
        if (!next) {
            if (pl)
                drop_pages(pl);
            return -ENOMEM;
        }
        next->next = pl;
        pl = next;
    }

    kc->nr_reserved_pages += nr_pages;
    kcopyd_put_pages(kc, pl);

    return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
    BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
    drop_pages(kc->pages);
    kc->pages = NULL;
    kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
    struct dm_kcopyd_client *kc;
    struct list_head list;
    unsigned long flags;

    /*
     * Error state of the job.
     */
    int read_err;
    unsigned long write_err;

    /*
     * Either READ or WRITE
     */
    int rw;
    struct dm_io_region source;

    /*
     * The destinations for the transfer.
     */
    unsigned int num_dests;
    struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

    struct page_list *pages;

    /*
     * Set this to ensure you are notified when the job has
     * completed. 'context' is for callback to use.
     */
    dm_kcopyd_notify_fn fn;
    void *context;

    /*
     * These fields are only used if the job has been split
     * into more manageable parts.
     */
    struct mutex lock;
    atomic_t sub_jobs;
    sector_t progress;

    struct kcopyd_job *master_job;
};

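/*
 * Each object in this cache is really an array of SPLIT_COUNT + 1
 * jobs: the master job followed by its sub jobs (see dm_kcopyd_copy()
 * and split_job() below), which is why dm_kcopyd_init() multiplies the
 * object size.
 */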
static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
    _job_cache = kmem_cache_create("kcopyd_job",
                sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
                __alignof__(struct kcopyd_job), 0, NULL);
    if (!_job_cache)
        return -ENOMEM;

    zero_page_list.next = &zero_page_list;
    zero_page_list.page = ZERO_PAGE(0);

    return 0;
}

void dm_kcopyd_exit(void)
{
    kmem_cache_destroy(_job_cache);
    _job_cache = NULL;
}

/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
                  struct dm_kcopyd_client *kc)
{
    struct kcopyd_job *job = NULL;
    unsigned long flags;

    spin_lock_irqsave(&kc->job_lock, flags);

    if (!list_empty(jobs)) {
        job = list_entry(jobs->next, struct kcopyd_job, list);
        list_del(&job->list);
    }
    spin_unlock_irqrestore(&kc->job_lock, flags);

    return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
    unsigned long flags;
    struct dm_kcopyd_client *kc = job->kc;

    spin_lock_irqsave(&kc->job_lock, flags);
    list_add_tail(&job->list, jobs);
    spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
    unsigned long flags;
    struct dm_kcopyd_client *kc = job->kc;

    spin_lock_irqsave(&kc->job_lock, flags);
    list_add(&job->list, jobs);
    spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 * 0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
    void *context = job->context;
    int read_err = job->read_err;
    unsigned long write_err = job->write_err;
    dm_kcopyd_notify_fn fn = job->fn;
    struct dm_kcopyd_client *kc = job->kc;

    if (job->pages && job->pages != &zero_page_list)
        kcopyd_put_pages(kc, job->pages);
    /*
     * If this is the master job, the sub jobs have already
     * completed so we can free everything.
     */
    if (job->master_job == job)
        mempool_free(job, kc->job_pool);
    fn(read_err, write_err, context);

    if (atomic_dec_and_test(&kc->nr_jobs))
        wake_up(&kc->destroyq);

    return 0;
}

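/*
 * dm-io completion callback.  A successfully finished READ is not yet
 * complete from kcopyd's point of view: the job is flipped to WRITE and
 * pushed back onto io_jobs so the data just read gets written out to
 * the destinations.  Only a finished WRITE (or an error that is not
 * being ignored) moves the job onto complete_jobs.
 */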
static void complete_io(unsigned long error, void *context)
{
    struct kcopyd_job *job = (struct kcopyd_job *) context;
    struct dm_kcopyd_client *kc = job->kc;

    if (error) {
        if (job->rw == WRITE)
            job->write_err |= error;
        else
            job->read_err = 1;

        if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
            push(&kc->complete_jobs, job);
            wake(kc);
            return;
        }
    }

    if (job->rw == WRITE)
        push(&kc->complete_jobs, job);

    else {
        job->rw = WRITE;
        push(&kc->io_jobs, job);
    }

    wake(kc);
}

/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
    int r;
    struct dm_io_request io_req = {
        .bi_rw = job->rw,
        .mem.type = DM_IO_PAGE_LIST,
        .mem.ptr.pl = job->pages,
        .mem.offset = 0,
        .notify.fn = complete_io,
        .notify.context = job,
        .client = job->kc->io_client,
    };

    if (job->rw == READ)
        r = dm_io(&io_req, 1, &job->source, NULL);
    else
        r = dm_io(&io_req, job->num_dests, job->dests, NULL);

    return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
    int r;
    unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

    r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
    if (!r) {
        /* this job is ready for io */
        push(&job->kc->io_jobs, job);
        return 0;
    }

    if (r == -ENOMEM)
        /* can't complete now */
        return 1;

    return r;
}

/*
 * Run through a list for as long as possible. Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
            int (*fn) (struct kcopyd_job *))
{
    struct kcopyd_job *job;
    int r, count = 0;

    while ((job = pop(jobs, kc))) {

        r = fn(job);

        if (r < 0) {
            /* error this rogue job */
            if (job->rw == WRITE)
                job->write_err = (unsigned long) -1L;
            else
                job->read_err = 1;
            push(&kc->complete_jobs, job);
            break;
        }

        if (r > 0) {
            /*
             * We couldn't service this job ATM, so
             * push this job back onto the list.
             */
            push_head(jobs, job);
            break;
        }

        count++;
    }

    return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
    struct dm_kcopyd_client *kc = container_of(work,
                    struct dm_kcopyd_client, kcopyd_work);
    struct blk_plug plug;

    /*
     * The order that these are called is *very* important.
     * complete jobs can free some pages for pages jobs.
     * Pages jobs when successful will jump onto the io jobs
     * list. io jobs call wake when they complete and it all
     * starts again.
     */
    blk_start_plug(&plug);
    process_jobs(&kc->complete_jobs, kc, run_complete_job);
    process_jobs(&kc->pages_jobs, kc, run_pages_job);
    process_jobs(&kc->io_jobs, kc, run_io_job);
    blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
    struct dm_kcopyd_client *kc = job->kc;
    atomic_inc(&kc->nr_jobs);
    if (unlikely(!job->source.count))
        push(&kc->complete_jobs, job);
    else if (job->pages == &zero_page_list)
        push(&kc->io_jobs, job);
    else
        push(&kc->pages_jobs, job);
    wake(kc);
}

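/*
 * Notify callback for each sub job of a split copy.  Errors are folded
 * into the master job and, while sectors remain, the just-finished sub
 * job is reused for the next chunk of up to SUB_JOB_SIZE sectors.  The
 * last sub job to finish queues the master job's completion on the
 * kcopyd thread.
 */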
static void segment_complete(int read_err, unsigned long write_err,
                 void *context)
{
    /* FIXME: tidy this function */
    sector_t progress = 0;
    sector_t count = 0;
    struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
    struct kcopyd_job *job = sub_job->master_job;
    struct dm_kcopyd_client *kc = job->kc;

    mutex_lock(&job->lock);

    /* update the error */
    if (read_err)
        job->read_err = 1;

    if (write_err)
        job->write_err |= write_err;

    /*
     * Only dispatch more work if there hasn't been an error.
     */
    if ((!job->read_err && !job->write_err) ||
        test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
        /* get the next chunk of work */
        progress = job->progress;
        count = job->source.count - progress;
        if (count) {
            if (count > SUB_JOB_SIZE)
                count = SUB_JOB_SIZE;

            job->progress += count;
        }
    }
    mutex_unlock(&job->lock);

    if (count) {
        int i;

        *sub_job = *job;
        sub_job->source.sector += progress;
        sub_job->source.count = count;

        for (i = 0; i < job->num_dests; i++) {
            sub_job->dests[i].sector += progress;
            sub_job->dests[i].count = count;
        }

        sub_job->fn = segment_complete;
        sub_job->context = sub_job;
        dispatch_job(sub_job);

    } else if (atomic_dec_and_test(&job->sub_jobs)) {

        /*
         * Queue the completion callback to the kcopyd thread.
         *
         * Some callers assume that all the completions are called
         * from a single thread and don't race with each other.
         *
         * We must not call the callback directly here because this
         * code may not be executing in the thread.
         */
        push(&kc->complete_jobs, job);
        wake(kc);
    }
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
    int i;

    atomic_inc(&master_job->kc->nr_jobs);

    atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
    for (i = 0; i < SPLIT_COUNT; i++) {
        master_job[i + 1].master_job = master_job;
        segment_complete(0, 0u, &master_job[i + 1]);
    }
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
           unsigned int num_dests, struct dm_io_region *dests,
           unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
    struct kcopyd_job *job;

    /*
     * Allocate an array of jobs consisting of one master job
     * followed by SPLIT_COUNT sub jobs.
     */
    job = mempool_alloc(kc->job_pool, GFP_NOIO);

    /*
     * set up for the read.
     */
    job->kc = kc;
    job->flags = flags;
    job->read_err = 0;
    job->write_err = 0;

    job->num_dests = num_dests;
    memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

    if (from) {
        job->source = *from;
        job->pages = NULL;
        job->rw = READ;
    } else {
        memset(&job->source, 0, sizeof job->source);
        job->source.count = job->dests[0].count;
        job->pages = &zero_page_list;
        job->rw = WRITE;
    }

    job->fn = fn;
    job->context = context;
    job->master_job = job;

    if (job->source.count <= SUB_JOB_SIZE)
        dispatch_job(job);
    else {
        mutex_init(&job->lock);
        job->progress = 0;
        split_job(job);
    }

    return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

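/*
 * Zeroing is a copy with no source: dm_kcopyd_copy() points the job at
 * the shared zero_page_list, so no pages are allocated and no read is
 * issued; the destinations are simply written from the zero page.
 */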
int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
           unsigned num_dests, struct dm_io_region *dests,
           unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
    return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
                 dm_kcopyd_notify_fn fn, void *context)
{
    struct kcopyd_job *job;

    job = mempool_alloc(kc->job_pool, GFP_NOIO);

    memset(job, 0, sizeof(struct kcopyd_job));
    job->kc = kc;
    job->fn = fn;
    job->context = context;
    job->master_job = job;

    atomic_inc(&kc->nr_jobs);

    return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
    struct kcopyd_job *job = j;
    struct dm_kcopyd_client *kc = job->kc;

    job->read_err = read_err;
    job->write_err = write_err;

    push(&kc->complete_jobs, job);
    wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);
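
/*
 * Illustrative use of the two-step callback interface above (names are
 * made up for the example): a caller allocates the callback job while
 * it may still sleep, and later fires it from completion context; the
 * notify function then runs from the kcopyd workqueue:
 *
 *    j = dm_kcopyd_prepare_callback(kc, my_fn, my_context);
 *    ...
 *    dm_kcopyd_do_callback(j, 0, 0);   // my_fn() runs from kcopyd
 */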

/*
 * Cancels a kcopyd job, eg. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
    /* FIXME: finish */
    return -1;
}
#endif /* 0 */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(void)
{
    int r = -ENOMEM;
    struct dm_kcopyd_client *kc;

    kc = kmalloc(sizeof(*kc), GFP_KERNEL);
    if (!kc)
        return ERR_PTR(-ENOMEM);

    spin_lock_init(&kc->job_lock);
    INIT_LIST_HEAD(&kc->complete_jobs);
    INIT_LIST_HEAD(&kc->io_jobs);
    INIT_LIST_HEAD(&kc->pages_jobs);

    kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
    if (!kc->job_pool)
        goto bad_slab;

    INIT_WORK(&kc->kcopyd_work, do_work);
    kc->kcopyd_wq = alloc_workqueue("kcopyd",
                    WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
    if (!kc->kcopyd_wq)
        goto bad_workqueue;

    kc->pages = NULL;
    kc->nr_reserved_pages = kc->nr_free_pages = 0;
    r = client_reserve_pages(kc, RESERVE_PAGES);
    if (r)
        goto bad_client_pages;

    kc->io_client = dm_io_client_create();
    if (IS_ERR(kc->io_client)) {
        r = PTR_ERR(kc->io_client);
        goto bad_io_client;
    }

    init_waitqueue_head(&kc->destroyq);
    atomic_set(&kc->nr_jobs, 0);

    return kc;

bad_io_client:
    client_free_pages(kc);
bad_client_pages:
    destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
    mempool_destroy(kc->job_pool);
bad_slab:
    kfree(kc);

    return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
    /* Wait for completion of all jobs submitted by this client. */
    wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

    BUG_ON(!list_empty(&kc->complete_jobs));
    BUG_ON(!list_empty(&kc->io_jobs));
    BUG_ON(!list_empty(&kc->pages_jobs));
    destroy_workqueue(kc->kcopyd_wq);
    dm_io_client_destroy(kc->io_client);
    client_free_pages(kc);
    mempool_destroy(kc->job_pool);
    kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
