Root/drivers/block/rbd.c

1/*
2   rbd.c -- Export ceph rados objects as a Linux block device
3
4
5   based on drivers/block/osdblk.c:
6
7   Copyright 2009 Red Hat, Inc.
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16   GNU General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with this program; see the file COPYING. If not, write to
20   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24   For usage instructions, please refer to:
25
26                 Documentation/ABI/testing/sysfs-bus-rbd
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
34#include <linux/parser.h>
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
44/*
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
53#define RBD_DRV_NAME "rbd"
54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
57
58#define RBD_MAX_SNAP_NAME_LEN 32
59#define RBD_MAX_OPT_LEN 1024
60
61#define RBD_SNAP_HEAD_NAME "-"
62
63/*
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
68 */
69#define DEV_NAME_LEN 32
70#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
71
72#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74/*
75 * block device image metadata (in-memory version)
76 */
struct rbd_image_header {
    u64 image_size;    /* mapped image size, in bytes */
    char *object_prefix;    /* NUL-terminated prefix for data object names */
    __u8 obj_order;    /* log2 of the object (segment) size */
    __u8 crypt_type;    /* on-disk crypt type (from header options) */
    __u8 comp_type;    /* on-disk compression type (from header options) */
    struct ceph_snap_context *snapc;    /* snapshot ids for this image */
    size_t snap_names_len;    /* total bytes in the snap_names buffer */
    u32 total_snaps;    /* number of snapshots */

    /* packed run of NUL-terminated names, indexed in snapc order */
    char *snap_names;
    /* per-snapshot image sizes, same order as snapc->snaps[] */
    u64 *snap_sizes;

    u64 obj_version;    /* version of the header object last read */
};
92
struct rbd_options {
    int notify_timeout;    /* seconds; see RBD_NOTIFY_TIMEOUT_DEFAULT */
};
96
97/*
98 * an instance of the client. multiple devices may share an rbd client.
99 */
struct rbd_client {
    struct ceph_client *client;    /* underlying libceph client */
    struct rbd_options *rbd_opts;    /* owned; freed in rbd_client_release() */
    struct kref kref;    /* shared by all rbd devices using this client */
    struct list_head node;    /* entry on rbd_client_list */
};
106
107/*
108 * a request completion status
109 */
struct rbd_req_status {
    int done;    /* nonzero once this slot's request completed */
    int rc;    /* completion status of the request */
    u64 bytes;    /* bytes transferred */
};
115
116/*
117 * a collection of requests
118 */
struct rbd_req_coll {
    int total;    /* number of status slots allocated */
    int num_done;    /* slots completed in order so far */
    struct kref kref;    /* one ref per outstanding request */
    /* trailing variable-length array (old [0] idiom for flex member) */
    struct rbd_req_status status[0];
};
125
126/*
127 * a single io request
128 */
struct rbd_request {
    struct request *rq; /* blk layer request */
    struct bio *bio; /* cloned bio */
    struct page **pages; /* list of used pages */
    u64 len;    /* length of this request's extent, in bytes */
    int coll_index;    /* this request's slot in its collection */
    struct rbd_req_coll *coll;    /* collection, NULL for sync requests */
};
137
/* one snapshot of an image, also exposed as a sysfs device */
struct rbd_snap {
    struct device dev;    /* sysfs representation */
    const char *name;    /* snapshot name */
    u64 size;    /* image size at snapshot time, in bytes */
    struct list_head node;    /* entry on rbd_device->snaps */
    u64 id;    /* snapshot id */
};
145
146/*
147 * a single device
148 */
struct rbd_device {
    int dev_id; /* blkdev unique id */

    int major; /* blkdev assigned major */
    struct gendisk *disk; /* blkdev's gendisk and rq */
    struct request_queue *q;    /* request queue for the gendisk */

    struct rbd_client *rbd_client;    /* shared ceph client (refcounted) */

    char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

    spinlock_t lock; /* queue lock */

    struct rbd_image_header header;    /* in-memory image metadata */
    char *image_name;    /* rbd image name */
    size_t image_name_len;
    char *header_name;    /* name of the image's header object */
    char *pool_name;    /* pool holding the image */
    int pool_id;    /* numeric id of that pool */

    /* watch on the header object, for update notifications */
    struct ceph_osd_event *watch_event;
    struct ceph_osd_request *watch_request;

    /* protects updating the header */
    struct rw_semaphore header_rwsem;
    /* name of the snapshot this device reads from */
    char *snap_name;
    /* id of the snapshot this device reads from */
    u64 snap_id; /* current snapshot id */
    /* whether the snap_id this device reads from still exists */
    bool snap_exists;
    int read_only;    /* nonzero for snapshot mappings */

    struct list_head node;    /* entry on rbd_dev_list */

    /* list of snapshots */
    struct list_head snaps;

    /* sysfs related */
    struct device dev;
};
190
static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list); /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list); /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* forward declarations for snapshot/sysfs helpers defined later */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
                struct device_attribute *attr,
                const char *buf,
                size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

/* bus-level add/remove entry points, wired up via rbd_bus_attrs */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
               size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
              size_t count);
211
/* write-only bus attributes: /sys/bus/rbd/{add,remove} map and unmap devices */
static struct bus_attribute rbd_bus_attrs[] = {
    __ATTR(add, S_IWUSR, NULL, rbd_add),
    __ATTR(remove, S_IWUSR, NULL, rbd_remove),
    __ATTR_NULL
};
217
/* pseudo-bus that all rbd devices are registered on */
static struct bus_type rbd_bus_type = {
    .name = "rbd",
    .bus_attrs = rbd_bus_attrs,
};
222
/* the statically allocated root device owns nothing; release is a no-op */
static void rbd_root_dev_release(struct device *dev)
{
}
226
/* sysfs parent for all rbd devices */
static struct device rbd_root_dev = {
    .init_name = "rbd",
    .release = rbd_root_dev_release,
};
231
232
233static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234{
235    return get_device(&rbd_dev->dev);
236}
237
238static void rbd_put_dev(struct rbd_device *rbd_dev)
239{
240    put_device(&rbd_dev->dev);
241}
242
243static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244
245static int rbd_open(struct block_device *bdev, fmode_t mode)
246{
247    struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249    if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250        return -EROFS;
251
252    rbd_get_dev(rbd_dev);
253    set_device_ro(bdev, rbd_dev->read_only);
254
255    return 0;
256}
257
258static int rbd_release(struct gendisk *disk, fmode_t mode)
259{
260    struct rbd_device *rbd_dev = disk->private_data;
261
262    rbd_put_dev(rbd_dev);
263
264    return 0;
265}
266
/* block device operations; rbd needs only open/release */
static const struct block_device_operations rbd_bd_ops = {
    .owner = THIS_MODULE,
    .open = rbd_open,
    .release = rbd_release,
};
272
273/*
274 * Initialize an rbd client instance.
275 * We own *ceph_opts.
276 */
277static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278                        struct rbd_options *rbd_opts)
279{
280    struct rbd_client *rbdc;
281    int ret = -ENOMEM;
282
283    dout("rbd_client_create\n");
284    rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285    if (!rbdc)
286        goto out_opt;
287
288    kref_init(&rbdc->kref);
289    INIT_LIST_HEAD(&rbdc->node);
290
291    mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292
293    rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294    if (IS_ERR(rbdc->client))
295        goto out_mutex;
296    ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
297
298    ret = ceph_open_session(rbdc->client);
299    if (ret < 0)
300        goto out_err;
301
302    rbdc->rbd_opts = rbd_opts;
303
304    spin_lock(&rbd_client_list_lock);
305    list_add_tail(&rbdc->node, &rbd_client_list);
306    spin_unlock(&rbd_client_list_lock);
307
308    mutex_unlock(&ctl_mutex);
309
310    dout("rbd_client_create created %p\n", rbdc);
311    return rbdc;
312
313out_err:
314    ceph_destroy_client(rbdc->client);
315out_mutex:
316    mutex_unlock(&ctl_mutex);
317    kfree(rbdc);
318out_opt:
319    if (ceph_opts)
320        ceph_destroy_options(ceph_opts);
321    return ERR_PTR(ret);
322}
323
324/*
325 * Find a ceph client with specific addr and configuration.
326 */
327static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
328{
329    struct rbd_client *client_node;
330
331    if (ceph_opts->flags & CEPH_OPT_NOSHARE)
332        return NULL;
333
334    list_for_each_entry(client_node, &rbd_client_list, node)
335        if (!ceph_compare_options(ceph_opts, client_node->client))
336            return client_node;
337    return NULL;
338}
339
340/*
341 * mount options
342 */
/* token ids: ints sort before Opt_last_int, strings before Opt_last_string */
enum {
    Opt_notify_timeout,
    Opt_last_int,
    /* int args above */
    Opt_last_string,
    /* string args above */
};
350
/* mount option patterns recognized by parse_rbd_opts_token() */
static match_table_t rbd_opts_tokens = {
    {Opt_notify_timeout, "notify_timeout=%d"},
    /* int args above */
    /* string args above */
    {-1, NULL}
};
357
/*
 * Parse one rbd mount option (callback for ceph_parse_options()).
 * private points at the struct rbd_options being filled in.
 * Returns 0 on success, -EINVAL for an unknown token, or the
 * match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
    struct rbd_options *rbd_opts = private;
    substring_t argstr[MAX_OPT_ARGS];
    int token, intval, ret;

    token = match_token(c, rbd_opts_tokens, argstr);
    if (token < 0)
        return -EINVAL;

    /* token ids below Opt_last_int carry an integer argument */
    if (token < Opt_last_int) {
        ret = match_int(&argstr[0], &intval);
        if (ret < 0) {
            pr_err("bad mount option arg (not int) "
                   "at '%s'\n", c);
            return ret;
        }
        dout("got int token %d val %d\n", token, intval);
    } else if (token > Opt_last_int && token < Opt_last_string) {
        dout("got string token %d val %s\n", token,
             argstr[0].from);
    } else {
        dout("got token %d\n", token);
    }

    switch (token) {
    case Opt_notify_timeout:
        rbd_opts->notify_timeout = intval;
        break;
    default:
        /* every token match_token() can return must be handled above */
        BUG_ON(token);
    }
    return 0;
}
392
393/*
394 * Get a ceph client with specific addr and configuration, if one does
395 * not exist create it.
396 */
/*
 * Return a ceph client for the given monitor address and options,
 * sharing an existing compatible client when possible, else creating
 * a new one.  Returns the client or an ERR_PTR.
 *
 * Ownership: on the reuse path both the parsed ceph options and the
 * freshly allocated rbd options are discarded; on the create path
 * they are handed to rbd_client_create() (which frees rbd_opts only
 * on failure, via the IS_ERR branch below).
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
                     size_t mon_addr_len,
                     char *options)
{
    struct rbd_client *rbdc;
    struct ceph_options *ceph_opts;
    struct rbd_options *rbd_opts;

    rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
    if (!rbd_opts)
        return ERR_PTR(-ENOMEM);

    rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

    /* rbd-specific tokens are routed to parse_rbd_opts_token() */
    ceph_opts = ceph_parse_options(options, mon_addr,
                    mon_addr + mon_addr_len,
                    parse_rbd_opts_token, rbd_opts);
    if (IS_ERR(ceph_opts)) {
        kfree(rbd_opts);
        return ERR_CAST(ceph_opts);
    }

    spin_lock(&rbd_client_list_lock);
    rbdc = __rbd_client_find(ceph_opts);
    if (rbdc) {
        /* using an existing client */
        kref_get(&rbdc->kref);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);

        return rbdc;
    }
    spin_unlock(&rbd_client_list_lock);

    /* no match: build a new client (takes over ceph_opts) */
    rbdc = rbd_client_create(ceph_opts, rbd_opts);

    if (IS_ERR(rbdc))
        kfree(rbd_opts);

    return rbdc;
}
440
441/*
442 * Destroy ceph client
443 *
 * Acquires rbd_client_list_lock itself to unlink the client; callers
 * must not already hold it.
445 */
/*
 * kref release callback: unlink the client from the global list
 * (taking rbd_client_list_lock itself), then tear down the ceph
 * client and free the wrapper and its options.
 */
static void rbd_client_release(struct kref *kref)
{
    struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

    dout("rbd_release_client %p\n", rbdc);
    spin_lock(&rbd_client_list_lock);
    list_del(&rbdc->node);
    spin_unlock(&rbd_client_list_lock);

    ceph_destroy_client(rbdc->client);
    kfree(rbdc->rbd_opts);
    kfree(rbdc);
}
459
460/*
461 * Drop reference to ceph client node. If it's not referenced anymore, release
462 * it.
463 */
464static void rbd_put_client(struct rbd_device *rbd_dev)
465{
466    kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
467    rbd_dev->rbd_client = NULL;
468}
469
470/*
471 * Destroy requests collection
472 */
473static void rbd_coll_release(struct kref *kref)
474{
475    struct rbd_req_coll *coll =
476        container_of(kref, struct rbd_req_coll, kref);
477
478    dout("rbd_coll_release %p\n", coll);
479    kfree(coll);
480}
481
482static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
483{
484    return !memcmp(&ondisk->text,
485            RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
486}
487
488/*
489 * Create a new header structure, translate header format from the on-disk
490 * header.
491 */
/*
 * Translate an on-disk image header into the in-memory form,
 * allocating the snap context, name buffer, and size array.
 *
 * allocated_snaps is how many snapshot entries the caller read from
 * disk; the snapshot arrays are only populated when it matches the
 * header's own snap_count (otherwise the caller is expected to
 * re-read with the right count and call again).
 *
 * Returns 0 on success, -ENXIO for a bad header, -EINVAL for an
 * implausible snap count, or -ENOMEM.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                 struct rbd_image_header_ondisk *ondisk,
                 u32 allocated_snaps)
{
    u32 snap_count;

    if (!rbd_dev_ondisk_valid(ondisk))
        return -ENXIO;

    /* guard the snapc allocation size computation against overflow */
    snap_count = le32_to_cpu(ondisk->snap_count);
    if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
                 / sizeof (u64))
        return -EINVAL;
    header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
                snap_count * sizeof(u64),
                GFP_KERNEL);
    if (!header->snapc)
        return -ENOMEM;

    if (snap_count) {
        /* NOTE(review): snap_names_len comes straight from disk and
         * is passed unvalidated to kmalloc() — confirm upper bound
         * is enforced by the caller/on-disk format. */
        header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
        header->snap_names = kmalloc(header->snap_names_len,
                         GFP_KERNEL);
        if (!header->snap_names)
            goto err_snapc;
        header->snap_sizes = kmalloc(snap_count * sizeof(u64),
                         GFP_KERNEL);
        if (!header->snap_sizes)
            goto err_names;
    } else {
        WARN_ON(ondisk->snap_names_len);
        header->snap_names_len = 0;
        header->snap_names = NULL;
        header->snap_sizes = NULL;
    }

    /* +1 for the NUL terminator added below */
    header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
                    GFP_KERNEL);
    if (!header->object_prefix)
        goto err_sizes;

    memcpy(header->object_prefix, ondisk->block_name,
           sizeof(ondisk->block_name));
    header->object_prefix[sizeof (ondisk->block_name)] = '\0';

    header->image_size = le64_to_cpu(ondisk->image_size);
    header->obj_order = ondisk->options.order;
    header->crypt_type = ondisk->options.crypt_type;
    header->comp_type = ondisk->options.comp_type;

    atomic_set(&header->snapc->nref, 1);
    header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
    header->snapc->num_snaps = snap_count;
    header->total_snaps = snap_count;

    /* only fill the snapshot arrays when the caller read them all */
    if (snap_count && allocated_snaps == snap_count) {
        int i;

        for (i = 0; i < snap_count; i++) {
            header->snapc->snaps[i] =
                le64_to_cpu(ondisk->snaps[i].id);
            header->snap_sizes[i] =
                le64_to_cpu(ondisk->snaps[i].image_size);
        }

        /* copy snapshot names */
        memcpy(header->snap_names, &ondisk->snaps[snap_count],
            header->snap_names_len);
    }

    return 0;

err_sizes:
    kfree(header->snap_sizes);
    header->snap_sizes = NULL;
err_names:
    kfree(header->snap_names);
    header->snap_names = NULL;
err_snapc:
    kfree(header->snapc);
    header->snapc = NULL;

    return -ENOMEM;
}
576
577static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
578            u64 *seq, u64 *size)
579{
580    int i;
581    char *p = header->snap_names;
582
583    for (i = 0; i < header->total_snaps; i++) {
584        if (!strcmp(snap_name, p)) {
585
586            /* Found it. Pass back its id and/or size */
587
588            if (seq)
589                *seq = header->snapc->snaps[i];
590            if (size)
591                *size = header->snap_sizes[i];
592            return i;
593        }
594        p += strlen(p) + 1; /* Skip ahead to the next name */
595    }
596    return -ENOENT;
597}
598
/*
 * Configure the device for the snapshot named in rbd_dev->snap_name:
 * the head ("-") maps read-write at the current image size; a real
 * snapshot maps read-only at its recorded size.  Optionally reports
 * the mapped size via *size.  Takes header_rwsem for write.
 * Returns 0 on success or the snap_by_name() error.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
    int ret;

    down_write(&rbd_dev->header_rwsem);

    /* sizeof includes the NUL, so this matches the exact string "-" */
    if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
            sizeof (RBD_SNAP_HEAD_NAME))) {
        rbd_dev->snap_id = CEPH_NOSNAP;
        rbd_dev->snap_exists = false;
        rbd_dev->read_only = 0;
        if (size)
            *size = rbd_dev->header.image_size;
    } else {
        u64 snap_id = 0;

        ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
                    &snap_id, size);
        if (ret < 0)
            goto done;
        rbd_dev->snap_id = snap_id;
        rbd_dev->snap_exists = true;
        rbd_dev->read_only = 1;    /* snapshots are immutable */
    }

    ret = 0;
done:
    up_write(&rbd_dev->header_rwsem);
    return ret;
}
629
630static void rbd_header_free(struct rbd_image_header *header)
631{
632    kfree(header->object_prefix);
633    kfree(header->snap_sizes);
634    kfree(header->snap_names);
635    ceph_put_snap_context(header->snapc);
636}
637
638/*
639 * get the actual striped segment name, offset and length
640 */
641static u64 rbd_get_segment(struct rbd_image_header *header,
642               const char *object_prefix,
643               u64 ofs, u64 len,
644               char *seg_name, u64 *segofs)
645{
646    u64 seg = ofs >> header->obj_order;
647
648    if (seg_name)
649        snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
650             "%s.%012llx", object_prefix, seg);
651
652    ofs = ofs & ((1 << header->obj_order) - 1);
653    len = min_t(u64, len, (1 << header->obj_order) - ofs);
654
655    if (segofs)
656        *segofs = ofs;
657
658    return len;
659}
660
661static int rbd_get_num_segments(struct rbd_image_header *header,
662                u64 ofs, u64 len)
663{
664    u64 start_seg = ofs >> header->obj_order;
665    u64 end_seg = (ofs + len - 1) >> header->obj_order;
666    return end_seg - start_seg + 1;
667}
668
669/*
670 * returns the size of an object in the image
671 */
672static u64 rbd_obj_bytes(struct rbd_image_header *header)
673{
674    return 1 << header->obj_order;
675}
676
677/*
678 * bio helpers
679 */
680
681static void bio_chain_put(struct bio *chain)
682{
683    struct bio *tmp;
684
685    while (chain) {
686        tmp = chain;
687        chain = chain->bi_next;
688        bio_put(tmp);
689    }
690}
691
692/*
693 * zeros a bio chain, starting at specific offset
694 */
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain; once the running
 * byte position passes start_ofs, the remainder of each segment is
 * cleared (kmapped with irqs saved, since pages may be highmem).
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
    struct bio_vec *bv;
    unsigned long flags;
    void *buf;
    int i;
    int pos = 0;    /* byte offset of the current segment within the chain */

    while (chain) {
        bio_for_each_segment(bv, chain, i) {
            if (pos + bv->bv_len > start_ofs) {
                /* zero from start_ofs (or segment start) onward */
                int remainder = max(start_ofs - pos, 0);
                buf = bvec_kmap_irq(bv, &flags);
                memset(buf + remainder, 0,
                       bv->bv_len - remainder);
                bvec_kunmap_irq(buf, &flags);
            }
            pos += bv->bv_len;
        }

        chain = chain->bi_next;
    }
}
718
719/*
720 * bio_chain_clone - clone a chain of bios up to a certain length.
721 * might return a bio_pair that will need to be released.
722 */
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old until len bytes are covered.  A bio that
 * straddles the boundary is split with bio_split(); the far half is
 * left for the next call via *next and the resulting bio_pair is
 * stored in *bp (a previously returned pair is released on entry).
 * On success, *old points at the first unconsumed bio and the cloned
 * chain is returned; on allocation failure the partial clone chain
 * is dropped and NULL is returned.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                   struct bio_pair **bp,
                   int len, gfp_t gfpmask)
{
    struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
    int total = 0;

    /* release the split left over from the previous call, if any */
    if (*bp) {
        bio_pair_release(*bp);
        *bp = NULL;
    }

    while (old_chain && (total < len)) {
        tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
        if (!tmp)
            goto err_out;

        if (total + old_chain->bi_size > len) {
            struct bio_pair *bp;

            /*
             * this split can only happen with a single paged bio,
             * split_bio will BUG_ON if this is not the case
             */
            dout("bio_chain_clone split! total=%d remaining=%d"
                 "bi_size=%u\n",
                 total, len - total, old_chain->bi_size);

            /* split the bio. We'll release it either in the next
               call, or it will have to be released outside */
            bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
            if (!bp)
                goto err_out;

            __bio_clone(tmp, &bp->bio1);

            *next = &bp->bio2;
        } else {
            __bio_clone(tmp, old_chain);
            *next = old_chain->bi_next;
        }

        tmp->bi_bdev = NULL;
        /* NOTE(review): __GFP_WAIT is cleared only after the first
         * iteration's allocation — presumably to avoid blocking once
         * part of the chain is built; confirm intent. */
        gfpmask &= ~__GFP_WAIT;
        tmp->bi_next = NULL;

        if (!new_chain) {
            new_chain = tail = tmp;
        } else {
            tail->bi_next = tmp;
            tail = tmp;
        }
        old_chain = old_chain->bi_next;

        total += tmp->bi_size;
    }

    /* the caller must never ask for more bytes than the chain holds */
    BUG_ON(total < len);

    if (tail)
        tail->bi_next = NULL;

    *old = old_chain;

    return new_chain;

err_out:
    dout("bio_chain_clone with err\n");
    bio_chain_put(new_chain);
    return NULL;
}
794
795/*
796 * helpers for osd request op vectors.
797 */
798static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
799                    int opcode, u32 payload_len)
800{
801    struct ceph_osd_req_op *ops;
802
803    ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
804    if (!ops)
805        return NULL;
806
807    ops[0].op = opcode;
808
809    /*
810     * op extent offset and length will be set later on
811     * in calc_raw_layout()
812     */
813    ops[0].payload_len = payload_len;
814
815    return ops;
816}
817
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
    kfree(ops);
}
822
/*
 * Record completion of one slot of a request collection and complete
 * as much of the block-layer request as is now contiguously done.
 * With no collection the whole request is completed at once.  Slot
 * results are applied strictly in index order so byte accounting to
 * the block layer stays sequential; takes the queue lock for the
 * __blk_end_request() calls.  Each finished slot drops one collection
 * kref.
 */
static void rbd_coll_end_req_index(struct request *rq,
                   struct rbd_req_coll *coll,
                   int index,
                   int ret, u64 len)
{
    struct request_queue *q;
    int min, max, i;

    dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
         coll, index, ret, (unsigned long long) len);

    if (!rq)
        return;

    if (!coll) {
        /* no collection: complete the whole request directly */
        blk_end_request(rq, ret, len);
        return;
    }

    q = rq->q;

    spin_lock_irq(q->queue_lock);
    coll->status[index].done = 1;
    coll->status[index].rc = ret;
    coll->status[index].bytes = len;
    /* advance past the contiguous run of completed slots */
    max = min = coll->num_done;
    while (max < coll->total && coll->status[max].done)
        max++;

    for (i = min; i<max; i++) {
        __blk_end_request(rq, coll->status[i].rc,
                  coll->status[i].bytes);
        coll->num_done++;
        kref_put(&coll->kref, rbd_coll_release);
    }
    spin_unlock_irq(q->queue_lock);
}
860
/* Complete the collection slot (or whole request) for one rbd_request. */
static void rbd_coll_end_req(struct rbd_request *req,
                 int ret, u64 len)
{
    rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
866
867/*
868 * Send ceph osd request
869 */
/*
 * Send ceph osd request
 *
 * Builds and submits one osd request for the given object extent.
 * Data travels via bio (block I/O path) or pages (sync helpers).
 * With a callback (rbd_cb) the request completes asynchronously and
 * the callback owns the cleanup; without one we wait for completion
 * here and optionally report the reassert version via *ver.  When
 * linger_req is non-NULL the request is registered as lingering
 * (used for watches) and returned to the caller.
 */
static int rbd_do_request(struct request *rq,
              struct rbd_device *rbd_dev,
              struct ceph_snap_context *snapc,
              u64 snapid,
              const char *object_name, u64 ofs, u64 len,
              struct bio *bio,
              struct page **pages,
              int num_pages,
              int flags,
              struct ceph_osd_req_op *ops,
              struct rbd_req_coll *coll,
              int coll_index,
              void (*rbd_cb)(struct ceph_osd_request *req,
                     struct ceph_msg *msg),
              struct ceph_osd_request **linger_req,
              u64 *ver)
{
    struct ceph_osd_request *req;
    struct ceph_file_layout *layout;
    int ret;
    u64 bno;
    struct timespec mtime = CURRENT_TIME;
    struct rbd_request *req_data;
    struct ceph_osd_request_head *reqhead;
    struct ceph_osd_client *osdc;

    req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
    if (!req_data) {
        /* fail this slot so the collection can still make progress */
        if (coll)
            rbd_coll_end_req_index(rq, coll, coll_index,
                           -ENOMEM, len);
        return -ENOMEM;
    }

    if (coll) {
        req_data->coll = coll;
        req_data->coll_index = coll_index;
    }

    dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
        (unsigned long long) ofs, (unsigned long long) len);

    osdc = &rbd_dev->rbd_client->client->osdc;
    req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
                    false, GFP_NOIO, pages, bio);
    if (!req) {
        ret = -ENOMEM;
        goto done_pages;
    }

    req->r_callback = rbd_cb;

    req_data->rq = rq;
    req_data->bio = bio;
    req_data->pages = pages;
    req_data->len = len;

    req->r_priv = req_data;

    /* NOTE(review): snapid is written here as CEPH_NOSNAP, not the
     * snapid parameter — presumably ceph_calc_raw_layout() below
     * overwrites reqhead->snapid from its snapid argument; confirm
     * against the libceph implementation. */
    reqhead = req->r_request->front.iov_base;
    reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

    /* NOTE(review): strncpy() does not NUL-terminate when object_name
     * fills r_oid, and strlen() would then overrun — object names are
     * presumably always shorter than r_oid; confirm. */
    strncpy(req->r_oid, object_name, sizeof(req->r_oid));
    req->r_oid_len = strlen(req->r_oid);

    /* one object per "file": stripe unit == object size, count 1 */
    layout = &req->r_file_layout;
    memset(layout, 0, sizeof(*layout));
    layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
    layout->fl_stripe_count = cpu_to_le32(1);
    layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
    layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
    ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
                req, ops);

    ceph_osdc_build_request(req, ofs, &len,
                ops,
                snapc,
                &mtime,
                req->r_oid, req->r_oid_len);

    if (linger_req) {
        ceph_osdc_set_request_linger(osdc, req);
        *linger_req = req;
    }

    ret = ceph_osdc_start_request(osdc, req, false);
    if (ret < 0)
        goto done_err;

    /* synchronous path: no callback, wait here and clean up */
    if (!rbd_cb) {
        ret = ceph_osdc_wait_request(osdc, req);
        if (ver)
            *ver = le64_to_cpu(req->r_reassert_version.version);
        dout("reassert_ver=%llu\n",
            (unsigned long long)
                le64_to_cpu(req->r_reassert_version.version));
        ceph_osdc_put_request(req);
    }
    return ret;

done_err:
    bio_chain_put(req_data->bio);
    ceph_osdc_put_request(req);
done_pages:
    rbd_coll_end_req(req_data, ret, len);
    kfree(req_data);
    return ret;
}
978
979/*
980 * Ceph osd op callback
981 */
/*
 * Ceph osd op callback
 *
 * Completion handler for async requests started by rbd_do_op():
 * parses the reply, converts short/ENOENT reads into zero-filled
 * full-length successes (reading holes is not an error), completes
 * the request's collection slot, and releases the cloned bio chain,
 * the osd request, and the per-request bookkeeping.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
    struct rbd_request *req_data = req->r_priv;
    struct ceph_osd_reply_head *replyhead;
    struct ceph_osd_op *op;
    __s32 rc;
    u64 bytes;
    int read_op;

    /* parse reply */
    replyhead = msg->front.iov_base;
    WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
    op = (void *)(replyhead + 1);    /* first op follows the header */
    rc = le32_to_cpu(replyhead->result);
    bytes = le64_to_cpu(op->extent.length);
    read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

    dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
        (unsigned long long) bytes, read_op, (int) rc);

    if (rc == -ENOENT && read_op) {
        /* reading a nonexistent object: treat as all zeros */
        zero_bio_chain(req_data->bio, 0);
        rc = 0;
    } else if (rc == 0 && read_op && bytes < req_data->len) {
        /* short read: zero-fill the tail and report full length */
        zero_bio_chain(req_data->bio, bytes);
        bytes = req_data->len;
    }

    rbd_coll_end_req(req_data, rc, bytes);

    if (req_data->bio)
        bio_chain_put(req_data->bio);

    ceph_osdc_put_request(req);
    kfree(req_data);
}
1018
/* Minimal completion callback: just drop the osd request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
    ceph_osdc_put_request(req);
}
1023
1024/*
1025 * Do a synchronous ceph osd operation
1026 */
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector for the transfer, submits via
 * rbd_do_request() with no callback (so it waits for completion),
 * and for reads copies the result into buf.  Returns the number of
 * bytes transferred on success (rbd_do_request()'s result) or a
 * negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
               struct ceph_snap_context *snapc,
               u64 snapid,
               int flags,
               struct ceph_osd_req_op *ops,
               const char *object_name,
               u64 ofs, u64 len,
               char *buf,
               struct ceph_osd_request **linger_req,
               u64 *ver)
{
    int ret;
    struct page **pages;
    int num_pages;

    BUG_ON(ops == NULL);

    num_pages = calc_pages_for(ofs , len);
    pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
    if (IS_ERR(pages))
        return PTR_ERR(pages);

    /* NULL callback makes rbd_do_request() wait for completion */
    ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
              object_name, ofs, len, NULL,
              pages, num_pages,
              flags,
              ops,
              NULL, 0,
              NULL,
              linger_req, ver);
    if (ret < 0)
        goto done;

    /* copy back only what the osd actually returned (ret bytes) */
    if ((flags & CEPH_OSD_FLAG_READ) && buf)
        ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
    ceph_release_page_vector(pages, num_pages);
    return ret;
}
1067
1068/*
1069 * Do an asynchronous ceph osd operation
1070 */
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image extent [ofs, ofs + len) onto its segment object,
 * builds a single-op vector, and submits it via rbd_do_request()
 * with rbd_req_cb() as the completion callback.  The bio chain was
 * already clipped to segment boundaries by the caller, so the extent
 * must fit entirely in one segment.
 */
static int rbd_do_op(struct request *rq,
             struct rbd_device *rbd_dev,
             struct ceph_snap_context *snapc,
             u64 snapid,
             int opcode, int flags,
             u64 ofs, u64 len,
             struct bio *bio,
             struct rbd_req_coll *coll,
             int coll_index)
{
    char *seg_name;
    u64 seg_ofs;
    u64 seg_len;
    int ret;
    struct ceph_osd_req_op *ops;
    u32 payload_len;

    seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
    if (!seg_name)
        return -ENOMEM;

    /* translate image offset to segment object name/offset/length */
    seg_len = rbd_get_segment(&rbd_dev->header,
                  rbd_dev->header.object_prefix,
                  ofs, len,
                  seg_name, &seg_ofs);

    /* only writes carry a data payload in the op */
    payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

    ret = -ENOMEM;
    ops = rbd_create_rw_ops(1, opcode, payload_len);
    if (!ops)
        goto done;

    /* we've taken care of segment sizes earlier when we
       cloned the bios. We should never have a segment
       truncated at this point */
    BUG_ON(seg_len < len);

    ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
                 seg_name, seg_ofs, seg_len,
                 bio,
                 NULL, 0,
                 flags,
                 ops,
                 coll, coll_index,
                 rbd_req_cb, 0, NULL);

    rbd_destroy_ops(ops);
done:
    kfree(seg_name);
    return ret;
}
1123
1124/*
1125 * Request async osd write
1126 */
1127static int rbd_req_write(struct request *rq,
1128             struct rbd_device *rbd_dev,
1129             struct ceph_snap_context *snapc,
1130             u64 ofs, u64 len,
1131             struct bio *bio,
1132             struct rbd_req_coll *coll,
1133             int coll_index)
1134{
1135    return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1136             CEPH_OSD_OP_WRITE,
1137             CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1138             ofs, len, bio, coll, coll_index);
1139}
1140
1141/*
1142 * Request async osd read
1143 */
1144static int rbd_req_read(struct request *rq,
1145             struct rbd_device *rbd_dev,
1146             u64 snapid,
1147             u64 ofs, u64 len,
1148             struct bio *bio,
1149             struct rbd_req_coll *coll,
1150             int coll_index)
1151{
1152    return rbd_do_op(rq, rbd_dev, NULL,
1153             snapid,
1154             CEPH_OSD_OP_READ,
1155             CEPH_OSD_FLAG_READ,
1156             ofs, len, bio, coll, coll_index);
1157}
1158
1159/*
1160 * Request sync osd read
1161 */
1162static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1163              u64 snapid,
1164              const char *object_name,
1165              u64 ofs, u64 len,
1166              char *buf,
1167              u64 *ver)
1168{
1169    struct ceph_osd_req_op *ops;
1170    int ret;
1171
1172    ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1173    if (!ops)
1174        return -ENOMEM;
1175
1176    ret = rbd_req_sync_op(rbd_dev, NULL,
1177                   snapid,
1178                   CEPH_OSD_FLAG_READ,
1179                   ops, object_name, ofs, len, buf, NULL, ver);
1180    rbd_destroy_ops(ops);
1181
1182    return ret;
1183}
1184
1185/*
1186 * Request sync osd watch
1187 */
1188static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1189                   u64 ver,
1190                   u64 notify_id)
1191{
1192    struct ceph_osd_req_op *ops;
1193    int ret;
1194
1195    ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1196    if (!ops)
1197        return -ENOMEM;
1198
1199    ops[0].watch.ver = cpu_to_le64(ver);
1200    ops[0].watch.cookie = notify_id;
1201    ops[0].watch.flag = 0;
1202
1203    ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1204              rbd_dev->header_name, 0, 0, NULL,
1205              NULL, 0,
1206              CEPH_OSD_FLAG_READ,
1207              ops,
1208              NULL, 0,
1209              rbd_simple_req_cb, 0, NULL);
1210
1211    rbd_destroy_ops(ops);
1212    return ret;
1213}
1214
1215static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1216{
1217    struct rbd_device *rbd_dev = (struct rbd_device *)data;
1218    u64 hver;
1219    int rc;
1220
1221    if (!rbd_dev)
1222        return;
1223
1224    dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1225        rbd_dev->header_name, (unsigned long long) notify_id,
1226        (unsigned int) opcode);
1227    rc = rbd_refresh_header(rbd_dev, &hver);
1228    if (rc)
1229        pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1230               " update snaps: %d\n", rbd_dev->major, rc);
1231
1232    rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1233}
1234
1235/*
1236 * Request sync osd watch
1237 */
1238static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1239{
1240    struct ceph_osd_req_op *ops;
1241    struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1242    int ret;
1243
1244    ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1245    if (!ops)
1246        return -ENOMEM;
1247
1248    ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249                     (void *)rbd_dev, &rbd_dev->watch_event);
1250    if (ret < 0)
1251        goto fail;
1252
1253    ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1254    ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1255    ops[0].watch.flag = 1;
1256
1257    ret = rbd_req_sync_op(rbd_dev, NULL,
1258                  CEPH_NOSNAP,
1259                  CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1260                  ops,
1261                  rbd_dev->header_name,
1262                  0, 0, NULL,
1263                  &rbd_dev->watch_request, NULL);
1264
1265    if (ret < 0)
1266        goto fail_event;
1267
1268    rbd_destroy_ops(ops);
1269    return 0;
1270
1271fail_event:
1272    ceph_osdc_cancel_event(rbd_dev->watch_event);
1273    rbd_dev->watch_event = NULL;
1274fail:
1275    rbd_destroy_ops(ops);
1276    return ret;
1277}
1278
1279/*
1280 * Request sync osd unwatch
1281 */
1282static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1283{
1284    struct ceph_osd_req_op *ops;
1285    int ret;
1286
1287    ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1288    if (!ops)
1289        return -ENOMEM;
1290
1291    ops[0].watch.ver = 0;
1292    ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1293    ops[0].watch.flag = 0;
1294
1295    ret = rbd_req_sync_op(rbd_dev, NULL,
1296                  CEPH_NOSNAP,
1297                  CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1298                  ops,
1299                  rbd_dev->header_name,
1300                  0, 0, NULL, NULL, NULL);
1301
1302
1303    rbd_destroy_ops(ops);
1304    ceph_osdc_cancel_event(rbd_dev->watch_event);
1305    rbd_dev->watch_event = NULL;
1306    return ret;
1307}
1308
/* context handed to rbd_notify_cb via ceph_osdc_create_event() */
struct rbd_notify_info {
    struct rbd_device *rbd_dev;
};
1312
1313static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1314{
1315    struct rbd_device *rbd_dev = (struct rbd_device *)data;
1316    if (!rbd_dev)
1317        return;
1318
1319    dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1320            rbd_dev->header_name, (unsigned long long) notify_id,
1321            (unsigned int) opcode);
1322}
1323
1324/*
1325 * Request sync osd notify
1326 */
1327static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1328{
1329    struct ceph_osd_req_op *ops;
1330    struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1331    struct ceph_osd_event *event;
1332    struct rbd_notify_info info;
1333    int payload_len = sizeof(u32) + sizeof(u32);
1334    int ret;
1335
1336    ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1337    if (!ops)
1338        return -ENOMEM;
1339
1340    info.rbd_dev = rbd_dev;
1341
1342    ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1343                     (void *)&info, &event);
1344    if (ret < 0)
1345        goto fail;
1346
1347    ops[0].watch.ver = 1;
1348    ops[0].watch.flag = 1;
1349    ops[0].watch.cookie = event->cookie;
1350    ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1351    ops[0].watch.timeout = 12;
1352
1353    ret = rbd_req_sync_op(rbd_dev, NULL,
1354                   CEPH_NOSNAP,
1355                   CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1356                   ops,
1357                   rbd_dev->header_name,
1358                   0, 0, NULL, NULL, NULL);
1359    if (ret < 0)
1360        goto fail_event;
1361
1362    ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1363    dout("ceph_osdc_wait_event returned %d\n", ret);
1364    rbd_destroy_ops(ops);
1365    return 0;
1366
1367fail_event:
1368    ceph_osdc_cancel_event(event);
1369fail:
1370    rbd_destroy_ops(ops);
1371    return ret;
1372}
1373
1374/*
1375 * Request sync osd read
1376 */
/*
 * Synchronously invoke class_name.method_name on object_name with
 * the given input data, optionally returning the object version.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                 const char *object_name,
                 const char *class_name,
                 const char *method_name,
                 const char *data,
                 int len,
                 u64 *ver)
{
    struct ceph_osd_req_op *ops;
    int class_name_len = strlen(class_name);
    int method_name_len = strlen(method_name);
    int ret;

    /* payload carries class name, method name and input data */
    ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
                    class_name_len + method_name_len + len);
    if (!ops)
        return -ENOMEM;

    ops[0].cls.class_name = class_name;
    ops[0].cls.class_len = (__u8) class_name_len;  /* u8 on the wire */
    ops[0].cls.method_name = method_name;
    ops[0].cls.method_len = (__u8) method_name_len;
    ops[0].cls.argc = 0;
    ops[0].cls.indata = data;
    ops[0].cls.indata_len = len;

    ret = rbd_req_sync_op(rbd_dev, NULL,
                   CEPH_NOSNAP,
                   CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                   ops,
                   object_name, 0, 0, NULL, NULL, ver);

    rbd_destroy_ops(ops);

    dout("cls_exec returned %d\n", ret);
    return ret;
}
1414
1415static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1416{
1417    struct rbd_req_coll *coll =
1418            kzalloc(sizeof(struct rbd_req_coll) +
1419                    sizeof(struct rbd_req_status) * num_reqs,
1420                GFP_ATOMIC);
1421
1422    if (!coll)
1423        return NULL;
1424    coll->total = num_reqs;
1425    kref_init(&coll->kref);
1426    return coll;
1427}
1428
1429/*
1430 * block device queue callback
1431 */
1432static void rbd_rq_fn(struct request_queue *q)
1433{
1434    struct rbd_device *rbd_dev = q->queuedata;
1435    struct request *rq;
1436    struct bio_pair *bp = NULL;
1437
1438    while ((rq = blk_fetch_request(q))) {
1439        struct bio *bio;
1440        struct bio *rq_bio, *next_bio = NULL;
1441        bool do_write;
1442        unsigned int size;
1443        u64 op_size = 0;
1444        u64 ofs;
1445        int num_segs, cur_seg = 0;
1446        struct rbd_req_coll *coll;
1447        struct ceph_snap_context *snapc;
1448
1449        /* peek at request from block layer */
1450        if (!rq)
1451            break;
1452
1453        dout("fetched request\n");
1454
1455        /* filter out block requests we don't understand */
1456        if ((rq->cmd_type != REQ_TYPE_FS)) {
1457            __blk_end_request_all(rq, 0);
1458            continue;
1459        }
1460
1461        /* deduce our operation (read, write) */
1462        do_write = (rq_data_dir(rq) == WRITE);
1463
1464        size = blk_rq_bytes(rq);
1465        ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1466        rq_bio = rq->bio;
1467        if (do_write && rbd_dev->read_only) {
1468            __blk_end_request_all(rq, -EROFS);
1469            continue;
1470        }
1471
1472        spin_unlock_irq(q->queue_lock);
1473
1474        down_read(&rbd_dev->header_rwsem);
1475
1476        if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1477            up_read(&rbd_dev->header_rwsem);
1478            dout("request for non-existent snapshot");
1479            spin_lock_irq(q->queue_lock);
1480            __blk_end_request_all(rq, -ENXIO);
1481            continue;
1482        }
1483
1484        snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1485
1486        up_read(&rbd_dev->header_rwsem);
1487
1488        dout("%s 0x%x bytes at 0x%llx\n",
1489             do_write ? "write" : "read",
1490             size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1491
1492        num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1493        coll = rbd_alloc_coll(num_segs);
1494        if (!coll) {
1495            spin_lock_irq(q->queue_lock);
1496            __blk_end_request_all(rq, -ENOMEM);
1497            ceph_put_snap_context(snapc);
1498            continue;
1499        }
1500
1501        do {
1502            /* a bio clone to be passed down to OSD req */
1503            dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1504            op_size = rbd_get_segment(&rbd_dev->header,
1505                          rbd_dev->header.object_prefix,
1506                          ofs, size,
1507                          NULL, NULL);
1508            kref_get(&coll->kref);
1509            bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1510                          op_size, GFP_ATOMIC);
1511            if (!bio) {
1512                rbd_coll_end_req_index(rq, coll, cur_seg,
1513                               -ENOMEM, op_size);
1514                goto next_seg;
1515            }
1516
1517
1518            /* init OSD command: write or read */
1519            if (do_write)
1520                rbd_req_write(rq, rbd_dev,
1521                          snapc,
1522                          ofs,
1523                          op_size, bio,
1524                          coll, cur_seg);
1525            else
1526                rbd_req_read(rq, rbd_dev,
1527                         rbd_dev->snap_id,
1528                         ofs,
1529                         op_size, bio,
1530                         coll, cur_seg);
1531
1532next_seg:
1533            size -= op_size;
1534            ofs += op_size;
1535
1536            cur_seg++;
1537            rq_bio = next_bio;
1538        } while (size > 0);
1539        kref_put(&coll->kref, rbd_coll_release);
1540
1541        if (bp)
1542            bio_pair_release(bp);
1543        spin_lock_irq(q->queue_lock);
1544
1545        ceph_put_snap_context(snapc);
1546    }
1547}
1548
1549/*
1550 * a queue callback. Makes sure that we don't create a bio that spans across
1551 * multiple osd objects. One exception would be with a single page bios,
1552 * which we handle later at bio_chain_clone
1553 */
1554static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1555              struct bio_vec *bvec)
1556{
1557    struct rbd_device *rbd_dev = q->queuedata;
1558    unsigned int chunk_sectors;
1559    sector_t sector;
1560    unsigned int bio_sectors;
1561    int max;
1562
1563    chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1564    sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1565    bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1566
1567    max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1568                 + bio_sectors)) << SECTOR_SHIFT;
1569    if (max < 0)
1570        max = 0; /* bio_add cannot handle a negative return */
1571    if (max <= bvec->bv_len && bio_sectors == 0)
1572        return bvec->bv_len;
1573    return max;
1574}
1575
/*
 * Undo rbd_init_disk(): free the in-core header and tear down the
 * gendisk and its request queue.  Safe when no disk was set up.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
    struct gendisk *disk = rbd_dev->disk;

    if (!disk)
        return;

    rbd_header_free(&rbd_dev->header);

    /* only remove the disk if add_disk() actually ran */
    if (disk->flags & GENHD_FL_UP)
        del_gendisk(disk);
    if (disk->queue)
        blk_cleanup_queue(disk->queue);
    put_disk(disk);
}
1591
1592/*
1593 * reload the ondisk the header
1594 */
static int rbd_read_header(struct rbd_device *rbd_dev,
               struct rbd_image_header *header)
{
    ssize_t rc;
    struct rbd_image_header_ondisk *dh;
    u32 snap_count = 0;
    u64 ver;
    size_t len;

    /*
     * First reads the fixed-size header to determine the number
     * of snapshots, then re-reads it, along with all snapshot
     * records as well as their stored names.
     */
    len = sizeof (*dh);
    while (1) {
        dh = kmalloc(len, GFP_KERNEL);
        if (!dh)
            return -ENOMEM;

        rc = rbd_req_sync_read(rbd_dev,
                       CEPH_NOSNAP,
                       rbd_dev->header_name,
                       0, len,
                       (char *)dh, &ver);
        if (rc < 0)
            goto out_dh;

        rc = rbd_header_from_disk(header, dh, snap_count);
        if (rc < 0) {
            if (rc == -ENXIO)
                pr_warning("unrecognized header format"
                       " for image %s\n",
                       rbd_dev->image_name);
            goto out_dh;
        }

        /* done once the snapshot count stops changing under us */
        if (snap_count == header->total_snaps)
            break;

        /* snapshots changed between reads; retry with a buffer
         * sized for the newly observed snapshot count */
        snap_count = header->total_snaps;
        len = sizeof (*dh) +
            snap_count * sizeof(struct rbd_image_snap_ondisk) +
            header->snap_names_len;

        rbd_header_free(header);
        kfree(dh);
    }
    header->obj_version = ver;

out_dh:
    kfree(dh);
    return rc;
}
1649
1650/*
1651 * create a snapshot
1652 */
1653static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1654                   const char *snap_name,
1655                   gfp_t gfp_flags)
1656{
1657    int name_len = strlen(snap_name);
1658    u64 new_snapid;
1659    int ret;
1660    void *data, *p, *e;
1661    struct ceph_mon_client *monc;
1662
1663    /* we should create a snapshot only if we're pointing at the head */
1664    if (rbd_dev->snap_id != CEPH_NOSNAP)
1665        return -EINVAL;
1666
1667    monc = &rbd_dev->rbd_client->client->monc;
1668    ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1669    dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1670    if (ret < 0)
1671        return ret;
1672
1673    data = kmalloc(name_len + 16, gfp_flags);
1674    if (!data)
1675        return -ENOMEM;
1676
1677    p = data;
1678    e = data + name_len + 16;
1679
1680    ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1681    ceph_encode_64_safe(&p, e, new_snapid, bad);
1682
1683    ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1684                "rbd", "snap_add",
1685                data, p - data, NULL);
1686
1687    kfree(data);
1688
1689    return ret < 0 ? ret : 0;
1690bad:
1691    return -ERANGE;
1692}
1693
1694static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1695{
1696    struct rbd_snap *snap;
1697    struct rbd_snap *next;
1698
1699    list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1700        __rbd_remove_snap_dev(snap);
1701}
1702
1703/*
1704 * only read the first part of the ondisk header, without the snaps info
1705 */
1706static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1707{
1708    int ret;
1709    struct rbd_image_header h;
1710
1711    ret = rbd_read_header(rbd_dev, &h);
1712    if (ret < 0)
1713        return ret;
1714
1715    down_write(&rbd_dev->header_rwsem);
1716
1717    /* resized? */
1718    if (rbd_dev->snap_id == CEPH_NOSNAP) {
1719        sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1720
1721        dout("setting size to %llu sectors", (unsigned long long) size);
1722        set_capacity(rbd_dev->disk, size);
1723    }
1724
1725    /* rbd_dev->header.object_prefix shouldn't change */
1726    kfree(rbd_dev->header.snap_sizes);
1727    kfree(rbd_dev->header.snap_names);
1728    /* osd requests may still refer to snapc */
1729    ceph_put_snap_context(rbd_dev->header.snapc);
1730
1731    if (hver)
1732        *hver = h.obj_version;
1733    rbd_dev->header.obj_version = h.obj_version;
1734    rbd_dev->header.image_size = h.image_size;
1735    rbd_dev->header.total_snaps = h.total_snaps;
1736    rbd_dev->header.snapc = h.snapc;
1737    rbd_dev->header.snap_names = h.snap_names;
1738    rbd_dev->header.snap_names_len = h.snap_names_len;
1739    rbd_dev->header.snap_sizes = h.snap_sizes;
1740    /* Free the extra copy of the object prefix */
1741    WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1742    kfree(h.object_prefix);
1743
1744    ret = __rbd_init_snaps_header(rbd_dev);
1745
1746    up_write(&rbd_dev->header_rwsem);
1747
1748    return ret;
1749}
1750
/* serialized (ctl_mutex) wrapper around __rbd_refresh_header() */
static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
    int ret;

    mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
    ret = __rbd_refresh_header(rbd_dev, hver);
    mutex_unlock(&ctl_mutex);

    return ret;
}
1761
/*
 * Read the image header, build the snapshot list, select the mapped
 * snapshot, then allocate, configure and announce the gendisk and
 * its request queue.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
    struct gendisk *disk;
    struct request_queue *q;
    int rc;
    u64 segment_size;
    u64 total_size = 0;

    /* contact OSD, request size info about the object being mapped */
    rc = rbd_read_header(rbd_dev, &rbd_dev->header);
    if (rc)
        return rc;

    /* no need to lock here, as rbd_dev is not registered yet */
    rc = __rbd_init_snaps_header(rbd_dev);
    if (rc)
        return rc;

    /* pick the mapped snap (or head) and get its size */
    rc = rbd_header_set_snap(rbd_dev, &total_size);
    if (rc)
        return rc;

    /* create gendisk info */
    rc = -ENOMEM;
    disk = alloc_disk(RBD_MINORS_PER_MAJOR);
    if (!disk)
        goto out;

    snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
         rbd_dev->dev_id);
    disk->major = rbd_dev->major;
    disk->first_minor = 0;
    disk->fops = &rbd_bd_ops;
    disk->private_data = rbd_dev;

    /* init rq */
    rc = -ENOMEM;
    q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
    if (!q)
        goto out_disk;

    /* We use the default size, but let's be explicit about it. */
    blk_queue_physical_block_size(q, SECTOR_SIZE);

    /* set io sizes to object size */
    segment_size = rbd_obj_bytes(&rbd_dev->header);
    blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
    blk_queue_max_segment_size(q, segment_size);
    blk_queue_io_min(q, segment_size);
    blk_queue_io_opt(q, segment_size);

    blk_queue_merge_bvec(q, rbd_merge_bvec);
    disk->queue = q;

    q->queuedata = rbd_dev;

    rbd_dev->disk = disk;
    rbd_dev->q = q;

    /* finally, announce the disk to the world */
    set_capacity(disk, total_size / SECTOR_SIZE);
    add_disk(disk);

    pr_info("%s: added with size 0x%llx\n",
        disk->disk_name, (unsigned long long)total_size);
    return 0;

out_disk:
    put_disk(disk);
out:
    return rc;
}
1834
1835/*
1836  sysfs
1837*/
1838
/* map a sysfs struct device back to its owning rbd_device */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
    return container_of(dev, struct rbd_device, dev);
}
1843
1844static ssize_t rbd_size_show(struct device *dev,
1845                 struct device_attribute *attr, char *buf)
1846{
1847    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1848    sector_t size;
1849
1850    down_read(&rbd_dev->header_rwsem);
1851    size = get_capacity(rbd_dev->disk);
1852    up_read(&rbd_dev->header_rwsem);
1853
1854    return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1855}
1856
1857static ssize_t rbd_major_show(struct device *dev,
1858                  struct device_attribute *attr, char *buf)
1859{
1860    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861
1862    return sprintf(buf, "%d\n", rbd_dev->major);
1863}
1864
1865static ssize_t rbd_client_id_show(struct device *dev,
1866                  struct device_attribute *attr, char *buf)
1867{
1868    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1869
1870    return sprintf(buf, "client%lld\n",
1871            ceph_client_id(rbd_dev->rbd_client->client));
1872}
1873
1874static ssize_t rbd_pool_show(struct device *dev,
1875                 struct device_attribute *attr, char *buf)
1876{
1877    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878
1879    return sprintf(buf, "%s\n", rbd_dev->pool_name);
1880}
1881
1882static ssize_t rbd_pool_id_show(struct device *dev,
1883                 struct device_attribute *attr, char *buf)
1884{
1885    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1886
1887    return sprintf(buf, "%d\n", rbd_dev->pool_id);
1888}
1889
1890static ssize_t rbd_name_show(struct device *dev,
1891                 struct device_attribute *attr, char *buf)
1892{
1893    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1894
1895    return sprintf(buf, "%s\n", rbd_dev->image_name);
1896}
1897
1898static ssize_t rbd_snap_show(struct device *dev,
1899                 struct device_attribute *attr,
1900                 char *buf)
1901{
1902    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1903
1904    return sprintf(buf, "%s\n", rbd_dev->snap_name);
1905}
1906
1907static ssize_t rbd_image_refresh(struct device *dev,
1908                 struct device_attribute *attr,
1909                 const char *buf,
1910                 size_t size)
1911{
1912    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913    int ret;
1914
1915    ret = rbd_refresh_header(rbd_dev, NULL);
1916
1917    return ret < 0 ? ret : size;
1918}
1919
/* per-device sysfs attributes (see Documentation/ABI/testing/sysfs-bus-rbd) */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
    &dev_attr_size.attr,
    &dev_attr_major.attr,
    &dev_attr_client_id.attr,
    &dev_attr_pool.attr,
    &dev_attr_pool_id.attr,
    &dev_attr_name.attr,
    &dev_attr_current_snap.attr,
    &dev_attr_refresh.attr,
    &dev_attr_create_snap.attr,
    NULL
};

static struct attribute_group rbd_attr_group = {
    .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
    &rbd_attr_group,
    NULL
};

/*
 * No-op release.  NOTE(review): rbd_bus_add_dev() overrides
 * dev->release with rbd_dev_release, so this type-level release is
 * not expected to run for registered devices.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
    .name = "rbd",
    .groups = rbd_attr_groups,
    .release = rbd_sysfs_dev_release,
};
1961
1962
1963/*
1964  sysfs - snapshots
1965*/
1966
1967static ssize_t rbd_snap_size_show(struct device *dev,
1968                  struct device_attribute *attr,
1969                  char *buf)
1970{
1971    struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1972
1973    return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1974}
1975
1976static ssize_t rbd_snap_id_show(struct device *dev,
1977                struct device_attribute *attr,
1978                char *buf)
1979{
1980    struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1981
1982    return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1983}
1984
/* per-snapshot sysfs attributes */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
    &dev_attr_snap_size.attr,
    &dev_attr_snap_id.attr,
    NULL,
};

static struct attribute_group rbd_snap_attr_group = {
    .attrs = rbd_snap_attrs,
};
1997
1998static void rbd_snap_dev_release(struct device *dev)
1999{
2000    struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2001    kfree(snap->name);
2002    kfree(snap);
2003}
2004
static const struct attribute_group *rbd_snap_attr_groups[] = {
    &rbd_snap_attr_group,
    NULL
};

/* device_type for snapshot devices; release frees the rbd_snap */
static struct device_type rbd_snap_device_type = {
    .groups = rbd_snap_attr_groups,
    .release = rbd_snap_dev_release,
};
2014
/*
 * Unlink a snapshot from the device's snap list and unregister its
 * sysfs device; the final put frees it via rbd_snap_dev_release().
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
    list_del(&snap->node);
    device_unregister(&snap->dev);
}
2020
2021static int rbd_register_snap_dev(struct rbd_snap *snap,
2022                  struct device *parent)
2023{
2024    struct device *dev = &snap->dev;
2025    int ret;
2026
2027    dev->type = &rbd_snap_device_type;
2028    dev->parent = parent;
2029    dev->release = rbd_snap_dev_release;
2030    dev_set_name(dev, "snap_%s", snap->name);
2031    ret = device_register(dev);
2032
2033    return ret;
2034}
2035
/*
 * Allocate an rbd_snap for header snapshot slot i (named `name`) and,
 * if the parent device is already in sysfs, register it.  Returns the
 * new snap or an ERR_PTR on failure.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
                          int i, const char *name)
{
    struct rbd_snap *snap;
    int ret;

    snap = kzalloc(sizeof (*snap), GFP_KERNEL);
    if (!snap)
        return ERR_PTR(-ENOMEM);

    ret = -ENOMEM;
    snap->name = kstrdup(name, GFP_KERNEL);
    if (!snap->name)
        goto err;

    snap->size = rbd_dev->header.snap_sizes[i];
    snap->id = rbd_dev->header.snapc->snaps[i];
    /* before registration the device-probe path adds snaps itself */
    if (device_is_registered(&rbd_dev->dev)) {
        ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
        if (ret < 0)
            goto err;
    }

    return snap;

err:
    kfree(snap->name);
    kfree(snap);

    return ERR_PTR(ret);
}
2067
2068/*
2069 * search for the previous snap in a null delimited string list
2070 */
2071const char *rbd_prev_snap_name(const char *name, const char *start)
2072{
2073    if (name < start + 2)
2074        return NULL;
2075
2076    name -= 2;
2077    while (*name) {
2078        if (name == start)
2079            return start;
2080        name--;
2081    }
2082    return name + 1;
2083}
2084
2085/*
2086 * compare the old list of snapshots that we have to what's in the header
2087 * and update it accordingly. Note that the header holds the snapshots
2088 * in a reverse order (from newest to oldest) and we need to go from
2089 * older to new so that we don't get a duplicate snap name when
2090 * doing the process (e.g., removed snapshot and recreated a new
2091 * one with the same name.
2092 */
2093static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2094{
2095    const char *name, *first_name;
2096    int i = rbd_dev->header.total_snaps;
2097    struct rbd_snap *snap, *old_snap = NULL;
2098    struct list_head *p, *n;
2099
2100    first_name = rbd_dev->header.snap_names;
2101    name = first_name + rbd_dev->header.snap_names_len;
2102
2103    list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2104        u64 cur_id;
2105
2106        old_snap = list_entry(p, struct rbd_snap, node);
2107
2108        if (i)
2109            cur_id = rbd_dev->header.snapc->snaps[i - 1];
2110
2111        if (!i || old_snap->id < cur_id) {
2112            /*
2113             * old_snap->id was skipped, thus was
2114             * removed. If this rbd_dev is mapped to
2115             * the removed snapshot, record that it no
2116             * longer exists, to prevent further I/O.
2117             */
2118            if (rbd_dev->snap_id == old_snap->id)
2119                rbd_dev->snap_exists = false;
2120            __rbd_remove_snap_dev(old_snap);
2121            continue;
2122        }
2123        if (old_snap->id == cur_id) {
2124            /* we have this snapshot already */
2125            i--;
2126            name = rbd_prev_snap_name(name, first_name);
2127            continue;
2128        }
2129        for (; i > 0;
2130             i--, name = rbd_prev_snap_name(name, first_name)) {
2131            if (!name) {
2132                WARN_ON(1);
2133                return -EINVAL;
2134            }
2135            cur_id = rbd_dev->header.snapc->snaps[i];
2136            /* snapshot removal? handle it above */
2137            if (cur_id >= old_snap->id)
2138                break;
2139            /* a new snapshot */
2140            snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2141            if (IS_ERR(snap))
2142                return PTR_ERR(snap);
2143
2144            /* note that we add it backward so using n and not p */
2145            list_add(&snap->node, n);
2146            p = &snap->node;
2147        }
2148    }
2149    /* we're done going over the old snap list, just add what's left */
2150    for (; i > 0; i--) {
2151        name = rbd_prev_snap_name(name, first_name);
2152        if (!name) {
2153            WARN_ON(1);
2154            return -EINVAL;
2155        }
2156        snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2157        if (IS_ERR(snap))
2158            return PTR_ERR(snap);
2159        list_add(&snap->node, &rbd_dev->snaps);
2160    }
2161
2162    return 0;
2163}
2164
/*
 * Register the rbd device and all currently-known snapshots on the
 * rbd bus in sysfs.  Serialized by ctl_mutex.
 *
 * NOTE(review): if registering a snapshot fails part way through the
 * loop, previously registered snapshot devices are not unwound here.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
    int ret;
    struct device *dev;
    struct rbd_snap *snap;

    mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
    dev = &rbd_dev->dev;

    dev->bus = &rbd_bus_type;
    dev->type = &rbd_device_type;
    dev->parent = &rbd_root_dev;
    dev->release = rbd_dev_release;
    dev_set_name(dev, "%d", rbd_dev->dev_id);
    ret = device_register(dev);
    if (ret < 0)
        goto out;

    list_for_each_entry(snap, &rbd_dev->snaps, node) {
        ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
        if (ret < 0)
            break;
    }
out:
    mutex_unlock(&ctl_mutex);
    return ret;
}
2192
/*
 * Remove rbd_dev's struct device from the driver core.  Final
 * cleanup of the rbd_dev happens in rbd_dev_release(), which the
 * driver core invokes once the last reference to the device is
 * dropped (dev->release is set in rbd_bus_add_dev()).
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
    device_unregister(&rbd_dev->dev);
}
2197
2198static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2199{
2200    int ret, rc;
2201
2202    do {
2203        ret = rbd_req_sync_watch(rbd_dev);
2204        if (ret == -ERANGE) {
2205            rc = rbd_refresh_header(rbd_dev, NULL);
2206            if (rc < 0)
2207                return rc;
2208        }
2209    } while (ret == -ERANGE);
2210
2211    return ret;
2212}
2213
2214static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2215
2216/*
2217 * Get a unique rbd identifier for the given new rbd_dev, and add
2218 * the rbd_dev to the global list. The minimum rbd id is 1.
2219 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
    /* Ids start at 1; rbd_id_max tracks the highest id handed out. */
    rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);

    /* Publish the device on the global list under the list lock. */
    spin_lock(&rbd_dev_list_lock);
    list_add_tail(&rbd_dev->node, &rbd_dev_list);
    spin_unlock(&rbd_dev_list_lock);
}
2228
2229/*
2230 * Remove an rbd_dev from the global list, and record that its
2231 * identifier is no longer in use.
2232 */
2233static void rbd_id_put(struct rbd_device *rbd_dev)
2234{
2235    struct list_head *tmp;
2236    int rbd_id = rbd_dev->dev_id;
2237    int max_id;
2238
2239    BUG_ON(rbd_id < 1);
2240
2241    spin_lock(&rbd_dev_list_lock);
2242    list_del_init(&rbd_dev->node);
2243
2244    /*
2245     * If the id being "put" is not the current maximum, there
2246     * is nothing special we need to do.
2247     */
2248    if (rbd_id != atomic64_read(&rbd_id_max)) {
2249        spin_unlock(&rbd_dev_list_lock);
2250        return;
2251    }
2252
2253    /*
2254     * We need to update the current maximum id. Search the
2255     * list to find out what it is. We're more likely to find
2256     * the maximum at the end, so search the list backward.
2257     */
2258    max_id = 0;
2259    list_for_each_prev(tmp, &rbd_dev_list) {
2260        struct rbd_device *rbd_dev;
2261
2262        rbd_dev = list_entry(tmp, struct rbd_device, node);
2263        if (rbd_id > max_id)
2264            max_id = rbd_id;
2265    }
2266    spin_unlock(&rbd_dev_list_lock);
2267
2268    /*
2269     * The max id could have been updated by rbd_id_get(), in
2270     * which case it now accurately reflects the new maximum.
2271     * Be careful not to overwrite the maximum value in that
2272     * case.
2273     */
2274    atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2275}
2276
2277/*
2278 * Skips over white space at *buf, and updates *buf to point to the
2279 * first found non-space character (if any). Returns the length of
2280 * the token (string of non-white space characters) found. Note
2281 * that *buf must be terminated with '\0'.
2282 */
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
    /*
     * These are the characters that produce nonzero for
     * isspace() in the "C" and "POSIX" locales.
     */
    static const char spaces[] = " \f\n\r\t\v";

    /* Skip leading whitespace, then measure the token that follows. */
    *buf += strspn(*buf, spaces);

    return strcspn(*buf, spaces);
}
2295
2296/*
2297 * Finds the next token in *buf, and if the provided token buffer is
2298 * big enough, copies the found token into it. The result, if
2299 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2300 * must be terminated with '\0' on entry.
2301 *
2302 * Returns the length of the token found (not including the '\0').
2303 * Return value will be 0 if no token is found, and it will be >=
2304 * token_size if the token would not fit.
2305 *
2306 * The *buf pointer will be updated to point beyond the end of the
2307 * found token. Note that this occurs even if the token buffer is
2308 * too small to hold it.
2309 */
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
                char *token,
                size_t token_size)
{
    size_t len = next_token(buf);

    /* Copy only when the token plus its terminator fits. */
    if (len < token_size) {
        memcpy(token, *buf, len);
        token[len] = '\0';
    }
    *buf += len;

    return len;
}
2325
2326/*
2327 * Finds the next token in *buf, dynamically allocates a buffer big
2328 * enough to hold a copy of it, and copies the token into the new
2329 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2330 * that a duplicate buffer is created even for a zero-length token.
2331 *
2332 * Returns a pointer to the newly-allocated duplicate, or a null
2333 * pointer if memory for the duplicate was not available. If
2334 * the lenp argument is a non-null pointer, the length of the token
2335 * (not including the '\0') is returned in *lenp.
2336 *
2337 * If successful, the *buf pointer will be updated to point beyond
2338 * the end of the found token.
2339 *
2340 * Note: uses GFP_KERNEL for allocation.
2341 */
2342static inline char *dup_token(const char **buf, size_t *lenp)
2343{
2344    char *dup;
2345    size_t len;
2346
2347    len = next_token(buf);
2348    dup = kmalloc(len + 1, GFP_KERNEL);
2349    if (!dup)
2350        return NULL;
2351
2352    memcpy(dup, *buf, len);
2353    *(dup + len) = '\0';
2354    *buf += len;
2355
2356    if (lenp)
2357        *lenp = len;
2358
2359    return dup;
2360}
2361
2362/*
2363 * This fills in the pool_name, image_name, image_name_len, snap_name,
2364 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2365 * on the list of monitor addresses and other options provided via
2366 * /sys/bus/rbd/add.
2367 *
2368 * Note: rbd_dev is assumed to have been initially zero-filled.
2369 */
2370static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2371                  const char *buf,
2372                  const char **mon_addrs,
2373                  size_t *mon_addrs_size,
2374                  char *options,
2375                 size_t options_size)
2376{
2377    size_t len;
2378    int ret;
2379
2380    /* The first four tokens are required */
2381
2382    len = next_token(&buf);
2383    if (!len)
2384        return -EINVAL;
2385    *mon_addrs_size = len + 1;
2386    *mon_addrs = buf;
2387
2388    buf += len;
2389
2390    len = copy_token(&buf, options, options_size);
2391    if (!len || len >= options_size)
2392        return -EINVAL;
2393
2394    ret = -ENOMEM;
2395    rbd_dev->pool_name = dup_token(&buf, NULL);
2396    if (!rbd_dev->pool_name)
2397        goto out_err;
2398
2399    rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2400    if (!rbd_dev->image_name)
2401        goto out_err;
2402
2403    /* Create the name of the header object */
2404
2405    rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2406                        + sizeof (RBD_SUFFIX),
2407                    GFP_KERNEL);
2408    if (!rbd_dev->header_name)
2409        goto out_err;
2410    sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2411
2412    /*
2413     * The snapshot name is optional. If none is is supplied,
2414     * we use the default value.
2415     */
2416    rbd_dev->snap_name = dup_token(&buf, &len);
2417    if (!rbd_dev->snap_name)
2418        goto out_err;
2419    if (!len) {
2420        /* Replace the empty name with the default */
2421        kfree(rbd_dev->snap_name);
2422        rbd_dev->snap_name
2423            = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2424        if (!rbd_dev->snap_name)
2425            goto out_err;
2426
2427        memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2428            sizeof (RBD_SNAP_HEAD_NAME));
2429    }
2430
2431    return 0;
2432
2433out_err:
2434    kfree(rbd_dev->header_name);
2435    kfree(rbd_dev->image_name);
2436    kfree(rbd_dev->pool_name);
2437    rbd_dev->pool_name = NULL;
2438
2439    return ret;
2440}
2441
2442static ssize_t rbd_add(struct bus_type *bus,
2443               const char *buf,
2444               size_t count)
2445{
2446    char *options;
2447    struct rbd_device *rbd_dev = NULL;
2448    const char *mon_addrs = NULL;
2449    size_t mon_addrs_size = 0;
2450    struct ceph_osd_client *osdc;
2451    int rc = -ENOMEM;
2452
2453    if (!try_module_get(THIS_MODULE))
2454        return -ENODEV;
2455
2456    options = kmalloc(count, GFP_KERNEL);
2457    if (!options)
2458        goto err_nomem;
2459    rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2460    if (!rbd_dev)
2461        goto err_nomem;
2462
2463    /* static rbd_device initialization */
2464    spin_lock_init(&rbd_dev->lock);
2465    INIT_LIST_HEAD(&rbd_dev->node);
2466    INIT_LIST_HEAD(&rbd_dev->snaps);
2467    init_rwsem(&rbd_dev->header_rwsem);
2468
2469    /* generate unique id: find highest unique id, add one */
2470    rbd_id_get(rbd_dev);
2471
2472    /* Fill in the device name, now that we have its id. */
2473    BUILD_BUG_ON(DEV_NAME_LEN
2474            < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2475    sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2476
2477    /* parse add command */
2478    rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2479                options, count);
2480    if (rc)
2481        goto err_put_id;
2482
2483    rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2484                        options);
2485    if (IS_ERR(rbd_dev->rbd_client)) {
2486        rc = PTR_ERR(rbd_dev->rbd_client);
2487        goto err_put_id;
2488    }
2489
2490    /* pick the pool */
2491    osdc = &rbd_dev->rbd_client->client->osdc;
2492    rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2493    if (rc < 0)
2494        goto err_out_client;
2495    rbd_dev->pool_id = rc;
2496
2497    /* register our block device */
2498    rc = register_blkdev(0, rbd_dev->name);
2499    if (rc < 0)
2500        goto err_out_client;
2501    rbd_dev->major = rc;
2502
2503    rc = rbd_bus_add_dev(rbd_dev);
2504    if (rc)
2505        goto err_out_blkdev;
2506
2507    /*
2508     * At this point cleanup in the event of an error is the job
2509     * of the sysfs code (initiated by rbd_bus_del_dev()).
2510     *
2511     * Set up and announce blkdev mapping.
2512     */
2513    rc = rbd_init_disk(rbd_dev);
2514    if (rc)
2515        goto err_out_bus;
2516
2517    rc = rbd_init_watch_dev(rbd_dev);
2518    if (rc)
2519        goto err_out_bus;
2520
2521    return count;
2522
2523err_out_bus:
2524    /* this will also clean up rest of rbd_dev stuff */
2525
2526    rbd_bus_del_dev(rbd_dev);
2527    kfree(options);
2528    return rc;
2529
2530err_out_blkdev:
2531    unregister_blkdev(rbd_dev->major, rbd_dev->name);
2532err_out_client:
2533    rbd_put_client(rbd_dev);
2534err_put_id:
2535    if (rbd_dev->pool_name) {
2536        kfree(rbd_dev->snap_name);
2537        kfree(rbd_dev->header_name);
2538        kfree(rbd_dev->image_name);
2539        kfree(rbd_dev->pool_name);
2540    }
2541    rbd_id_put(rbd_dev);
2542err_nomem:
2543    kfree(rbd_dev);
2544    kfree(options);
2545
2546    dout("Error adding device %s\n", buf);
2547    module_put(THIS_MODULE);
2548
2549    return (ssize_t) rc;
2550}
2551
2552static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2553{
2554    struct list_head *tmp;
2555    struct rbd_device *rbd_dev;
2556
2557    spin_lock(&rbd_dev_list_lock);
2558    list_for_each(tmp, &rbd_dev_list) {
2559        rbd_dev = list_entry(tmp, struct rbd_device, node);
2560        if (rbd_dev->dev_id == dev_id) {
2561            spin_unlock(&rbd_dev_list_lock);
2562            return rbd_dev;
2563        }
2564    }
2565    spin_unlock(&rbd_dev_list_lock);
2566    return NULL;
2567}
2568
/*
 * Release callback for the rbd device's struct device (installed in
 * rbd_bus_add_dev()); the driver core calls it once the device's
 * last reference is gone.  Tears down the header watch, drops the
 * ceph client, frees the disk and block device, and finally frees
 * the rbd_dev itself and its id.
 */
static void rbd_dev_release(struct device *dev)
{
    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

    /* Stop the lingering watch request before dropping the client. */
    if (rbd_dev->watch_request) {
        struct ceph_client *client = rbd_dev->rbd_client->client;

        ceph_osdc_unregister_linger_request(&client->osdc,
                            rbd_dev->watch_request);
    }
    if (rbd_dev->watch_event)
        rbd_req_sync_unwatch(rbd_dev);

    rbd_put_client(rbd_dev);

    /* clean up and free blkdev */
    rbd_free_disk(rbd_dev);
    unregister_blkdev(rbd_dev->major, rbd_dev->name);

    /* done with the id, and with the rbd_dev */
    kfree(rbd_dev->snap_name);
    kfree(rbd_dev->header_name);
    kfree(rbd_dev->pool_name);
    kfree(rbd_dev->image_name);
    rbd_id_put(rbd_dev);
    kfree(rbd_dev);

    /* release module ref (taken in rbd_add()) */
    module_put(THIS_MODULE);
}
2599
2600static ssize_t rbd_remove(struct bus_type *bus,
2601              const char *buf,
2602              size_t count)
2603{
2604    struct rbd_device *rbd_dev = NULL;
2605    int target_id, rc;
2606    unsigned long ul;
2607    int ret = count;
2608
2609    rc = strict_strtoul(buf, 10, &ul);
2610    if (rc)
2611        return rc;
2612
2613    /* convert to int; abort if we lost anything in the conversion */
2614    target_id = (int) ul;
2615    if (target_id != ul)
2616        return -EINVAL;
2617
2618    mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2619
2620    rbd_dev = __rbd_get_dev(target_id);
2621    if (!rbd_dev) {
2622        ret = -ENOENT;
2623        goto done;
2624    }
2625
2626    __rbd_remove_all_snaps(rbd_dev);
2627    rbd_bus_del_dev(rbd_dev);
2628
2629done:
2630    mutex_unlock(&ctl_mutex);
2631    return ret;
2632}
2633
/*
 * sysfs attribute store handler: create a snapshot named by buf on
 * the mapped image, refresh our cached header, and notify watchers.
 */
static ssize_t rbd_snap_add(struct device *dev,
                struct device_attribute *attr,
                const char *buf,
                size_t count)
{
    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
    int ret;
    /* NOTE(review): snprintf() below writes at most count - 1 bytes
       of buf even though count + 1 were allocated, so the final
       input byte (presumably the trailing '\n' from echo) is
       dropped -- confirm that is the intent. */
    char *name = kmalloc(count + 1, GFP_KERNEL);
    if (!name)
        return -ENOMEM;

    snprintf(name, count, "%s", buf);

    mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

    ret = rbd_header_add_snap(rbd_dev,
                  name, GFP_KERNEL);
    if (ret < 0)
        goto err_unlock;

    ret = __rbd_refresh_header(rbd_dev, NULL);
    if (ret < 0)
        goto err_unlock;

    /* shouldn't hold ctl_mutex when notifying.. notify might
       trigger a watch callback that would need to get that mutex */
    mutex_unlock(&ctl_mutex);

    /* make a best effort, don't error if failed */
    rbd_req_sync_notify(rbd_dev);

    ret = count;
    kfree(name);
    return ret;

err_unlock:
    mutex_unlock(&ctl_mutex);
    kfree(name);
    return ret;
}
2674
2675/*
2676 * create control files in sysfs
2677 * /sys/bus/rbd/...
2678 */
2679static int rbd_sysfs_init(void)
2680{
2681    int ret;
2682
2683    ret = device_register(&rbd_root_dev);
2684    if (ret < 0)
2685        return ret;
2686
2687    ret = bus_register(&rbd_bus_type);
2688    if (ret < 0)
2689        device_unregister(&rbd_root_dev);
2690
2691    return ret;
2692}
2693
/*
 * Undo rbd_sysfs_init(): unregister the bus first, then the root
 * device it hangs off (reverse order of registration).
 */
static void rbd_sysfs_cleanup(void)
{
    bus_unregister(&rbd_bus_type);
    device_unregister(&rbd_root_dev);
}
2699
2700int __init rbd_init(void)
2701{
2702    int rc;
2703
2704    rc = rbd_sysfs_init();
2705    if (rc)
2706        return rc;
2707    pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2708    return 0;
2709}
2710
/*
 * Module exit point: remove the sysfs control files registered in
 * rbd_init().
 */
void __exit rbd_exit(void)
{
    rbd_sysfs_cleanup();
}
2715
2716module_init(rbd_init);
2717module_exit(rbd_exit);
2718
2719MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2720MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2721MODULE_DESCRIPTION("rados block device");
2722
2723/* following authorship retained from original osdblk.c */
2724MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2725
2726MODULE_LICENSE("GPL");
2727