/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value it is not
 * updated, and -EINVAL is returned.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
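/*
 * A minimal usage sketch (illustrative only, not part of the driver):
 * the pair above implements a saturating reference count.  A
 * hypothetical get helper built on it fails both when the count is
 * pinned at zero and when it would overflow past INT_MAX:
 *
 *	static bool example_ref_get(atomic_t *ref)
 *	{
 *		return atomic_inc_return_safe(ref) > 0;
 *	}
 *
 * The parent_ref counter later in this file is managed this way.
 */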
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64
#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
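/*
 * For example, with a 4-byte int this evaluates to
 * (5 * 4) / 2 + 1 = 11 characters, which covers the widest possible
 * value, "-2147483648" (ten digits plus a sign).
 */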
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
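/*
 * A minimal sketch of how these iterators are used (the real callers
 * appear throughout this file), e.g. summing the bytes transferred by
 * every object request belonging to an image request:
 *
 *	struct rbd_obj_request *obj_request;
 *	u64 xferred = 0;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */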
struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	struct list_head	rq_queue;	/* incoming rq queue */
	spinlock_t		lock;		/* queue, flags, open_count */
	struct work_struct	rq_work;

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
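/*
 * For example, loading the module with
 *
 *	modprobe rbd single_major=1
 *
 * opts in to the single-major number scheme; S_IRUGO makes the
 * parameter read-only via sysfs once the module is loaded.
 */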
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
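/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4, device id 1
 * maps to minor 16, and minors 16..31 (the base device plus up to 15
 * partitions) all map back to device id 1.
 */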
static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}
static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* Snapshots can't be written to */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others from opening this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}
	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}
out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}
static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};
/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}
/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to remove
 * the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
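/*
 * For example, given snapc->snaps[] = { 12, 8, 3 } (descending order),
 * looking up snapshot id 8 yields index 1, while looking up id 5 finds
 * nothing and yields BAD_SNAP_INDEX.
 */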
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;
		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;
		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;
		*snap_features = features;
	}
	return 0;
}
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}
static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */
	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		rbd_segment_name_free(name);
		name = NULL;
	}

	return name;
}
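/*
 * For example, with object_prefix "foo", the segment at index 3 is
 * named "foo.000000000003" for a format 1 image and
 * "foo.0000000000000003" for a format 2 image.
 */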
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
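/*
 * Worked example: the usual default obj_order of 22 gives 4 MiB
 * objects.  For an I/O starting at image offset 5 MiB with length
 * 2 MiB, rbd_segment_offset() returns 1 MiB (the offset within the
 * second object) and rbd_segment_length() leaves the 2 MiB length
 * unchanged, since 1 MiB + 2 MiB fits within the 4 MiB object.  A
 * 4 MiB request at the same offset would be clamped to 3 MiB.
 */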
static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
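/*
 * For example (with 4 KiB pages), zero_pages(pages, 3000, 5000) clears
 * bytes 3000..4095 of page 0 and bytes 0..903 of page 1; the end
 * offset is exclusive.
 */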
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone(bio_src, gfpmask);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}
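/*
 * For example, with 4 MiB objects and a parent overlap of 6 MiB,
 * round_up() yields 8 MiB, so requests landing in objects 0 and 1 are
 * both treated as overlapping the parent.
 */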
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	ceph_osdc_cancel_request(obj_request->osd_req);
}

/*
 * Wait for an object request to complete.  If interrupted, cancel the
 * underlying osd request.
 */
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	int ret;

	dout("%s %p\n", __func__, obj_request);

	ret = wait_for_completion_interruptible(&obj_request->completion);
	if (ret < 0) {
		dout("%s %p interrupted\n", __func__, obj_request);
		rbd_obj_request_end(obj_request);
		return ret;
	}

	dout("%s %p done\n", __func__, obj_request);
	return 0;
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

/*
 * Set the discard flag when the img_request is a discard request
 */
static void img_request_discard_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_DISCARD, &img_request->flags);
	smp_mb();
}

static bool img_request_discard_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static enum obj_operation_type
rbd_img_request_op_type(struct rbd_img_request *img_request)
{
	if (img_request_write_test(img_request))
		return OBJ_OP_WRITE;
	else if (img_request_discard_test(img_request))
		return OBJ_OP_DISCARD;
	else
		return OBJ_OP_READ;
}
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short discard.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	/* discarding a non-existent object is not a problem */
	if (obj_request->result == -ENOENT)
		obj_request->result = 0;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request_img_data_test(obj_request))
		rbd_osd_copyup_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);

	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
		/* fall through */
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_DELETE:
	case CEPH_OSD_OP_TRUNCATE:
	case CEPH_OSD_OP_ZERO:
		rbd_osd_discard_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
		rbd_osd_call_callback(obj_request);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
/*
 * Create an osd request.  A read request has one osd op (read).
 * A write request has either one (watch) or two (hint+write) osd ops.
 * (All rbd data writes are prefixed with an allocation hint op, but
 * technically osd watch is a write request, hence this distinction.)
 */
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					unsigned int num_ops,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request) &&
		(op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
		struct rbd_img_request *img_request = obj_request->img_request;
		if (op_type == OBJ_OP_WRITE) {
			rbd_assert(img_request_write_test(img_request));
		} else {
			rbd_assert(img_request_discard_test(img_request));
		}
		snapc = img_request->snapc;
	}

	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));

	/* Allocate and initialize the request, for the num_ops ops */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
					  GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);

	return osd_req;
}
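/*
 * For example, a plain image data write becomes a two-op request:
 * op 0 is the CEPH_OSD_OP_SETALLOCHINT prefix and op 1 the
 * CEPH_OSD_OP_WRITE itself, which is why rbd_osd_req_callback()
 * above checks r_ops[1] in its SETALLOCHINT case.
 */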
/*
 * Create a copyup osd request based on the information in the object
 * request supplied.  A copyup request has two or three osd ops, a
 * copyup method call, potentially a hint op, and a write or truncate
 * or zero op.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	int num_osd_ops = 3;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request) ||
			img_request_discard_test(img_request));

	if (img_request_discard_test(img_request))
		num_osd_ops = 2;

	/* Allocate and initialize the request, for all the ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
						false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_NOIO);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}
/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}
/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}
/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}
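/*
 * A minimal sketch of the intended get/put pairing (illustrative):
 * callers that may touch the parent image bracket that work so the
 * parent fields are only torn down once in-flight requests drain:
 *
 *	if (rbd_dev_parent_get(rbd_dev)) {
 *		... issue I/O against rbd_dev->parent ...
 *		rbd_dev_parent_put(rbd_dev);
 *	}
 */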
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (op_type == OBJ_OP_DISCARD) {
		img_request_discard_set(img_request);
		img_request->snapc = snapc;
	} else if (op_type == OBJ_OP_WRITE) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		obj_op_name(op_type), offset, length, img_request);

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request) ||
		img_request_discard_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}
static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
						length, OBJ_OP_READ, NULL);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}
static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}
2246 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2248 struct rbd_img_request *img_request;
2249 unsigned int xferred;
2253 rbd_assert(obj_request_img_data_test(obj_request));
2254 img_request = obj_request->img_request;
2256 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2257 xferred = (unsigned int)obj_request->xferred;
2258 result = obj_request->result;
2260 struct rbd_device *rbd_dev = img_request->rbd_dev;
2261 enum obj_operation_type op_type;
2263 if (img_request_discard_test(img_request))
2264 op_type = OBJ_OP_DISCARD;
2265 else if (img_request_write_test(img_request))
2266 op_type = OBJ_OP_WRITE;
2268 op_type = OBJ_OP_READ;
2270 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2271 obj_op_name(op_type), obj_request->length,
2272 obj_request->img_offset, obj_request->offset);
2273 rbd_warn(rbd_dev, " result %d xferred %x",
2274 result, xferred);
2275 if (!img_request->result)
2276 img_request->result = result;
2278 * Need to end I/O on the entire obj_request worth of
2279 * bytes in case of error.
2281 xferred = obj_request->length;
2282 }
2284 /* Image object requests don't own their page array */
2286 if (obj_request->type == OBJ_REQUEST_PAGES) {
2287 obj_request->pages = NULL;
2288 obj_request->page_count = 0;
2289 }
2291 if (img_request_child_test(img_request)) {
2292 rbd_assert(img_request->obj_request != NULL);
2293 more = obj_request->which < img_request->obj_request_count - 1;
2294 } else {
2295 rbd_assert(img_request->rq != NULL);
2296 more = blk_end_request(img_request->rq, result, xferred);
2297 }
2299 return more;
2302 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2304 struct rbd_img_request *img_request;
2305 u32 which = obj_request->which;
2308 rbd_assert(obj_request_img_data_test(obj_request));
2309 img_request = obj_request->img_request;
2311 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2312 rbd_assert(img_request != NULL);
2313 rbd_assert(img_request->obj_request_count > 0);
2314 rbd_assert(which != BAD_WHICH);
2315 rbd_assert(which < img_request->obj_request_count);
2317 spin_lock_irq(&img_request->completion_lock);
2318 if (which != img_request->next_completion)
2319 goto out;
2321 for_each_obj_request_from(img_request, obj_request) {
2323 rbd_assert(which < img_request->obj_request_count);
2325 if (!obj_request_done_test(obj_request))
2326 break;
2327 more = rbd_img_obj_end_request(obj_request);
2328 which++;
2329 }
2331 rbd_assert(more ^ (which == img_request->obj_request_count));
2332 img_request->next_completion = which;
2333 out:
2334 spin_unlock_irq(&img_request->completion_lock);
2335 rbd_img_request_put(img_request);
2337 if (!more)
2338 rbd_img_request_complete(img_request);
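/*
 * Informational note (added comment): object requests may finish in any
 * order, but the loop above ends them strictly in submission order.
 * next_completion only advances past a contiguous prefix of completed
 * requests; a request that completes out of turn just drops the lock
 * and is ended later, when its predecessors complete.
 */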
2342 * Add individual osd ops to the given ceph_osd_request and prepare
2343 * them for submission. num_ops is the current number of
2344 * osd operations already added to the object request.
2346 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2347 struct ceph_osd_request *osd_request,
2348 enum obj_operation_type op_type,
2349 unsigned int num_ops)
2351 struct rbd_img_request *img_request = obj_request->img_request;
2352 struct rbd_device *rbd_dev = img_request->rbd_dev;
2353 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2354 u64 offset = obj_request->offset;
2355 u64 length = obj_request->length;
2359 if (op_type == OBJ_OP_DISCARD) {
2360 if (!offset && length == object_size &&
2361 (!img_request_layered_test(img_request) ||
2362 !obj_request_overlaps_parent(obj_request))) {
2363 opcode = CEPH_OSD_OP_DELETE;
2364 } else if (offset + length == object_size) {
2365 opcode = CEPH_OSD_OP_TRUNCATE;
2367 down_read(&rbd_dev->header_rwsem);
2368 img_end = rbd_dev->header.image_size;
2369 up_read(&rbd_dev->header_rwsem);
2371 if (obj_request->img_offset + length == img_end)
2372 opcode = CEPH_OSD_OP_TRUNCATE;
2373 else
2374 opcode = CEPH_OSD_OP_ZERO;
2376 } else if (op_type == OBJ_OP_WRITE) {
2377 opcode = CEPH_OSD_OP_WRITE;
2378 osd_req_op_alloc_hint_init(osd_request, num_ops,
2379 object_size, object_size);
2382 opcode = CEPH_OSD_OP_READ;
2385 osd_req_op_extent_init(osd_request, num_ops, opcode, offset, length,
2386 0, 0);
2387 if (obj_request->type == OBJ_REQUEST_BIO)
2388 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2389 obj_request->bio_list, length);
2390 else if (obj_request->type == OBJ_REQUEST_PAGES)
2391 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2392 obj_request->pages, length,
2393 offset & ~PAGE_MASK, false, false);
2395 /* Discards are also writes */
2396 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2397 rbd_osd_req_format_write(obj_request);
2399 rbd_osd_req_format_read(obj_request);
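/*
 * Summary of the discard-to-opcode mapping above (informational
 * comment, derived from the code):
 *
 *   whole object, no parent data beneath it   -> CEPH_OSD_OP_DELETE
 *   range ends at the object (or image) end   -> CEPH_OSD_OP_TRUNCATE
 *   anything else                             -> CEPH_OSD_OP_ZERO
 */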
2403 * Split up an image request into one or more object requests, each
2404 * to a different object. The "type" parameter indicates whether
2405 * "data_desc" is the pointer to the head of a list of bio
2406 * structures, or the base of a page array. In either case this
2407 * function assumes data_desc describes memory sufficient to hold
2408 * all data described by the image request.
2410 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2411 enum obj_request_type type,
2412 void *data_desc)
2414 struct rbd_device *rbd_dev = img_request->rbd_dev;
2415 struct rbd_obj_request *obj_request = NULL;
2416 struct rbd_obj_request *next_obj_request;
2417 struct bio *bio_list = NULL;
2418 unsigned int bio_offset = 0;
2419 struct page **pages = NULL;
2420 enum obj_operation_type op_type;
2424 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2425 (int)type, data_desc);
2427 img_offset = img_request->offset;
2428 resid = img_request->length;
2429 rbd_assert(resid > 0);
2430 op_type = rbd_img_request_op_type(img_request);
2432 if (type == OBJ_REQUEST_BIO) {
2433 bio_list = data_desc;
2434 rbd_assert(img_offset ==
2435 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2436 } else if (type == OBJ_REQUEST_PAGES) {
2437 pages = data_desc;
2438 }
2440 while (resid) {
2441 struct ceph_osd_request *osd_req;
2442 const char *object_name;
2446 object_name = rbd_segment_name(rbd_dev, img_offset);
2447 if (!object_name)
2448 goto out_unwind;
2449 offset = rbd_segment_offset(rbd_dev, img_offset);
2450 length = rbd_segment_length(rbd_dev, img_offset, resid);
2451 obj_request = rbd_obj_request_create(object_name,
2452 offset, length, type);
2453 /* object request has its own copy of the object name */
2454 rbd_segment_name_free(object_name);
2455 if (!obj_request)
2456 goto out_unwind;
2459 * set obj_request->img_request before creating the
2460 * osd_request so that it gets the right snapc
2462 rbd_img_obj_request_add(img_request, obj_request);
2464 if (type == OBJ_REQUEST_BIO) {
2465 unsigned int clone_size;
2467 rbd_assert(length <= (u64)UINT_MAX);
2468 clone_size = (unsigned int)length;
2469 obj_request->bio_list =
2470 bio_chain_clone_range(&bio_list,
2471 &bio_offset,
2472 clone_size,
2473 GFP_ATOMIC);
2474 if (!obj_request->bio_list)
2475 goto out_unwind;
2476 } else if (type == OBJ_REQUEST_PAGES) {
2477 unsigned int page_count;
2479 obj_request->pages = pages;
2480 page_count = (u32)calc_pages_for(offset, length);
2481 obj_request->page_count = page_count;
2482 if ((offset + length) & ~PAGE_MASK)
2483 page_count--; /* more on last page */
2484 pages += page_count;
2485 }
2487 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2488 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2489 obj_request);
2490 if (!osd_req)
2491 goto out_unwind;
2493 obj_request->osd_req = osd_req;
2494 obj_request->callback = rbd_img_obj_callback;
2495 obj_request->img_offset = img_offset;
2497 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2499 rbd_img_request_get(img_request);
2501 img_offset += length;
2502 resid -= length;
2503 }
2505 return 0;
2507 out_unwind:
2508 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2509 rbd_img_obj_request_del(img_request, obj_request);
2511 return -ENOMEM;
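/*
 * Illustrative sketch (not part of the driver): how the loop above maps
 * an image-relative range onto objects, assuming the default 4 MiB
 * object size (obj_order == 22) and the rbd_segment_*() helpers defined
 * earlier in this file.
 */
#if 0	/* example only */
	u64 img_offset = 7ULL << 20;	/* a 1 MiB write at image offset 7 MiB */
	u64 resid = 1ULL << 20;
	/* lands in object #1 (7 MiB / 4 MiB), 3 MiB into that object: */
	offset = rbd_segment_offset(rbd_dev, img_offset);	  /* 3 MiB */
	length = rbd_segment_length(rbd_dev, img_offset, resid); /* 1 MiB */
#endif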
2515 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2517 struct rbd_img_request *img_request;
2518 struct rbd_device *rbd_dev;
2519 struct page **pages;
2522 dout("%s: obj %p\n", __func__, obj_request);
2524 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2525 obj_request->type == OBJ_REQUEST_NODATA);
2526 rbd_assert(obj_request_img_data_test(obj_request));
2527 img_request = obj_request->img_request;
2528 rbd_assert(img_request);
2530 rbd_dev = img_request->rbd_dev;
2531 rbd_assert(rbd_dev);
2533 pages = obj_request->copyup_pages;
2534 rbd_assert(pages != NULL);
2535 obj_request->copyup_pages = NULL;
2536 page_count = obj_request->copyup_page_count;
2537 rbd_assert(page_count);
2538 obj_request->copyup_page_count = 0;
2539 ceph_release_page_vector(pages, page_count);
2542 * We want the transfer count to reflect the size of the
2543 * original write request. There is no such thing as a
2544 * successful short write, so if the request was successful
2545 * we can just set it to the originally-requested length.
2547 if (!obj_request->result)
2548 obj_request->xferred = obj_request->length;
2550 obj_request_done_set(obj_request);
2554 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2556 struct rbd_obj_request *orig_request;
2557 struct ceph_osd_request *osd_req;
2558 struct ceph_osd_client *osdc;
2559 struct rbd_device *rbd_dev;
2560 struct page **pages;
2561 enum obj_operation_type op_type;
2566 rbd_assert(img_request_child_test(img_request));
2568 /* First get what we need from the image request */
2570 pages = img_request->copyup_pages;
2571 rbd_assert(pages != NULL);
2572 img_request->copyup_pages = NULL;
2573 page_count = img_request->copyup_page_count;
2574 rbd_assert(page_count);
2575 img_request->copyup_page_count = 0;
2577 orig_request = img_request->obj_request;
2578 rbd_assert(orig_request != NULL);
2579 rbd_assert(obj_request_type_valid(orig_request->type));
2580 img_result = img_request->result;
2581 parent_length = img_request->length;
2582 rbd_assert(parent_length == img_request->xferred);
2583 rbd_img_request_put(img_request);
2585 rbd_assert(orig_request->img_request);
2586 rbd_dev = orig_request->img_request->rbd_dev;
2587 rbd_assert(rbd_dev);
2590 * If the overlap has become 0 (most likely because the
2591 * image has been flattened) we need to free the pages
2592 * and re-submit the original write request.
2594 if (!rbd_dev->parent_overlap) {
2595 struct ceph_osd_client *osdc;
2597 ceph_release_page_vector(pages, page_count);
2598 osdc = &rbd_dev->rbd_client->client->osdc;
2599 img_result = rbd_obj_request_submit(osdc, orig_request);
2600 if (!img_result)
2601 return;
2602 goto out_err;
2603 }
2605 if (img_result)
2606 goto out_err;
2608 * The original osd request is of no use to us any more.
2609 * We need a new one that can hold the three ops in a copyup
2610 * request. Allocate the new copyup osd request for the
2611 * original request, and release the old one.
2613 img_result = -ENOMEM;
2614 osd_req = rbd_osd_req_create_copyup(orig_request);
2615 if (!osd_req)
2616 goto out_err;
2617 rbd_osd_req_destroy(orig_request->osd_req);
2618 orig_request->osd_req = osd_req;
2619 orig_request->copyup_pages = pages;
2620 orig_request->copyup_page_count = page_count;
2622 /* Initialize the copyup op */
2624 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2625 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2626 false, false);
2628 /* Add the other op(s) */
2630 op_type = rbd_img_request_op_type(orig_request->img_request);
2631 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2633 /* All set, send it off. */
2635 osdc = &rbd_dev->rbd_client->client->osdc;
2636 img_result = rbd_obj_request_submit(osdc, orig_request);
2637 if (!img_result)
2638 return;
2639 out_err:
2640 /* Record the error code and complete the request */
2642 orig_request->result = img_result;
2643 orig_request->xferred = 0;
2644 obj_request_done_set(orig_request);
2645 rbd_obj_request_complete(orig_request);
2649 * Read from the parent image the range of data that covers the
2650 * entire target of the given object request. This is used for
2651 * satisfying a layered image write request when the target of an
2652 * object request from the image request does not exist.
2654 * A page array big enough to hold the returned data is allocated
2655 * and supplied to rbd_img_request_fill() as the "data descriptor."
2656 * When the read completes, this page array will be transferred to
2657 * the original object request for the copyup operation.
2659 * If an error occurs, record it as the result of the original
2660 * object request and mark it done so it gets completed.
2662 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2664 struct rbd_img_request *img_request = NULL;
2665 struct rbd_img_request *parent_request = NULL;
2666 struct rbd_device *rbd_dev;
2669 struct page **pages = NULL;
2673 rbd_assert(obj_request_img_data_test(obj_request));
2674 rbd_assert(obj_request_type_valid(obj_request->type));
2676 img_request = obj_request->img_request;
2677 rbd_assert(img_request != NULL);
2678 rbd_dev = img_request->rbd_dev;
2679 rbd_assert(rbd_dev->parent != NULL);
2682 * Determine the byte range covered by the object in the
2683 * child image to which the original request was to be sent.
2685 img_offset = obj_request->img_offset - obj_request->offset;
2686 length = (u64)1 << rbd_dev->header.obj_order;
2689 * There is no defined parent data beyond the parent
2690 * overlap, so limit what we read at that boundary if
2691 * necessary.
2693 if (img_offset + length > rbd_dev->parent_overlap) {
2694 rbd_assert(img_offset < rbd_dev->parent_overlap);
2695 length = rbd_dev->parent_overlap - img_offset;
2699 * Allocate a page array big enough to receive the data read
2702 page_count = (u32)calc_pages_for(0, length);
2703 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2704 if (IS_ERR(pages)) {
2705 result = PTR_ERR(pages);
2706 pages = NULL;
2707 goto out_err;
2708 }
2711 parent_request = rbd_parent_request_create(obj_request,
2712 img_offset, length);
2713 if (!parent_request)
2714 goto out_err;
2716 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2717 if (result)
2718 goto out_err;
2719 parent_request->copyup_pages = pages;
2720 parent_request->copyup_page_count = page_count;
2722 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2723 result = rbd_img_request_submit(parent_request);
2724 if (!result)
2725 return 0;
2727 parent_request->copyup_pages = NULL;
2728 parent_request->copyup_page_count = 0;
2729 parent_request->obj_request = NULL;
2730 rbd_obj_request_put(obj_request);
2731 out_err:
2732 if (pages)
2733 ceph_release_page_vector(pages, page_count);
2734 if (parent_request)
2735 rbd_img_request_put(parent_request);
2736 obj_request->result = result;
2737 obj_request->xferred = 0;
2738 obj_request_done_set(obj_request);
2743 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2745 struct rbd_obj_request *orig_request;
2746 struct rbd_device *rbd_dev;
2749 rbd_assert(!obj_request_img_data_test(obj_request));
2752 * All we need from the object request is the original
2753 * request and the result of the STAT op. Grab those, then
2754 * we're done with the request.
2756 orig_request = obj_request->obj_request;
2757 obj_request->obj_request = NULL;
2758 rbd_obj_request_put(orig_request);
2759 rbd_assert(orig_request);
2760 rbd_assert(orig_request->img_request);
2762 result = obj_request->result;
2763 obj_request->result = 0;
2765 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2766 obj_request, orig_request, result,
2767 obj_request->xferred, obj_request->length);
2768 rbd_obj_request_put(obj_request);
2771 * If the overlap has become 0 (most likely because the
2772 * image has been flattened) we need to free the pages
2773 * and re-submit the original write request.
2775 rbd_dev = orig_request->img_request->rbd_dev;
2776 if (!rbd_dev->parent_overlap) {
2777 struct ceph_osd_client *osdc;
2779 osdc = &rbd_dev->rbd_client->client->osdc;
2780 result = rbd_obj_request_submit(osdc, orig_request);
2781 if (!result)
2782 return;
2783 }
2786 * Our only purpose here is to determine whether the object
2787 * exists, and we don't want to treat the non-existence as
2788 * an error. If something else comes back, transfer the
2789 * error to the original request and complete it now.
2791 if (result) {
2792 obj_request_existence_set(orig_request, true);
2793 } else if (result == -ENOENT) {
2794 obj_request_existence_set(orig_request, false);
2795 } else if (result) {
2796 orig_request->result = result;
2797 goto out;
2798 }
2801 * Resubmit the original request now that we have recorded
2802 * whether the target object exists.
2804 orig_request->result = rbd_img_obj_request_submit(orig_request);
2805 out:
2806 if (orig_request->result)
2807 rbd_obj_request_complete(orig_request);
2810 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2812 struct rbd_obj_request *stat_request;
2813 struct rbd_device *rbd_dev;
2814 struct ceph_osd_client *osdc;
2815 struct page **pages = NULL;
2821 * The response data for a STAT call consists of:
2822 * le64 length;
2823 * struct {
2824 * le32 tv_sec;
2825 * le32 tv_nsec;
2826 * } mtime;
2828 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2829 page_count = (u32)calc_pages_for(0, size);
2830 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2831 if (IS_ERR(pages))
2832 return PTR_ERR(pages);
2835 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2836 OBJ_REQUEST_PAGES);
2837 if (!stat_request)
2838 goto out;
2840 rbd_obj_request_get(obj_request);
2841 stat_request->obj_request = obj_request;
2842 stat_request->pages = pages;
2843 stat_request->page_count = page_count;
2845 rbd_assert(obj_request->img_request);
2846 rbd_dev = obj_request->img_request->rbd_dev;
2847 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2848 stat_request);
2849 if (!stat_request->osd_req)
2850 goto out;
2851 stat_request->callback = rbd_img_obj_exists_callback;
2853 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2854 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2855 false, false);
2856 rbd_osd_req_format_read(stat_request);
2858 osdc = &rbd_dev->rbd_client->client->osdc;
2859 ret = rbd_obj_request_submit(osdc, stat_request);
2860 out:
2861 if (ret)
2862 rbd_obj_request_put(obj_request);
2864 return ret;
2867 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2869 struct rbd_img_request *img_request;
2870 struct rbd_device *rbd_dev;
2872 rbd_assert(obj_request_img_data_test(obj_request));
2874 img_request = obj_request->img_request;
2875 rbd_assert(img_request);
2876 rbd_dev = img_request->rbd_dev;
2878 /* Reads */
2879 if (!img_request_write_test(img_request) &&
2880 !img_request_discard_test(img_request))
2881 return true;
2883 /* Non-layered writes */
2884 if (!img_request_layered_test(img_request))
2885 return true;
2888 * Layered writes outside of the parent overlap range don't
2889 * share any data with the parent.
2891 if (!obj_request_overlaps_parent(obj_request))
2892 return true;
2895 * Entire-object layered writes - we will overwrite whatever
2896 * parent data there is anyway.
2898 if (!obj_request->offset &&
2899 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2900 return true;
2903 * If the object is known to already exist, its parent data has
2904 * already been copied.
2906 if (obj_request_known_test(obj_request) &&
2907 obj_request_exists_test(obj_request))
2908 return true;
2910 return false;
2913 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2915 if (img_obj_request_simple(obj_request)) {
2916 struct rbd_device *rbd_dev;
2917 struct ceph_osd_client *osdc;
2919 rbd_dev = obj_request->img_request->rbd_dev;
2920 osdc = &rbd_dev->rbd_client->client->osdc;
2922 return rbd_obj_request_submit(osdc, obj_request);
2926 * It's a layered write. The target object might exist but
2927 * we may not know that yet. If we know it doesn't exist,
2928 * start by reading the data for the full target object from
2929 * the parent so we can use it for a copyup to the target.
2931 if (obj_request_known_test(obj_request))
2932 return rbd_img_obj_parent_read_full(obj_request);
2934 /* We don't know whether the target exists. Go find out. */
2936 return rbd_img_obj_exists_submit(obj_request);
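/*
 * Informational summary of the submit paths above (added comment,
 * derived from the surrounding code):
 *
 *   rbd_img_obj_request_submit()
 *     simple case        -> submit directly to the OSD
 *     existence unknown  -> rbd_img_obj_exists_submit() (STAT), then
 *                           resubmit from rbd_img_obj_exists_callback()
 *     known non-existent -> rbd_img_obj_parent_read_full(), whose
 *                           callback issues copyup + the original op(s)
 */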
2939 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2941 struct rbd_obj_request *obj_request;
2942 struct rbd_obj_request *next_obj_request;
2944 dout("%s: img %p\n", __func__, img_request);
2945 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2946 int ret;
2948 ret = rbd_img_obj_request_submit(obj_request);
2949 if (ret)
2950 return ret;
2951 }
2953 return 0;
2956 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2958 struct rbd_obj_request *obj_request;
2959 struct rbd_device *rbd_dev;
2964 rbd_assert(img_request_child_test(img_request));
2966 /* First get what we need from the image request and release it */
2968 obj_request = img_request->obj_request;
2969 img_xferred = img_request->xferred;
2970 img_result = img_request->result;
2971 rbd_img_request_put(img_request);
2974 * If the overlap has become 0 (most likely because the
2975 * image has been flattened) we need to re-submit the
2976 * original request.
2978 rbd_assert(obj_request);
2979 rbd_assert(obj_request->img_request);
2980 rbd_dev = obj_request->img_request->rbd_dev;
2981 if (!rbd_dev->parent_overlap) {
2982 struct ceph_osd_client *osdc;
2984 osdc = &rbd_dev->rbd_client->client->osdc;
2985 img_result = rbd_obj_request_submit(osdc, obj_request);
2986 if (!img_result)
2987 return;
2988 }
2990 obj_request->result = img_result;
2991 if (obj_request->result)
2992 goto out;
2995 * We need to zero anything beyond the parent overlap
2996 * boundary. Since rbd_img_obj_request_read_callback()
2997 * will zero anything beyond the end of a short read, an
2998 * easy way to do this is to pretend the data from the
2999 * parent came up short--ending at the overlap boundary.
3001 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
3002 obj_end = obj_request->img_offset + obj_request->length;
3003 if (obj_end > rbd_dev->parent_overlap) {
3004 u64 xferred = 0;
3006 if (obj_request->img_offset < rbd_dev->parent_overlap)
3007 xferred = rbd_dev->parent_overlap -
3008 obj_request->img_offset;
3010 obj_request->xferred = min(img_xferred, xferred);
3011 } else {
3012 obj_request->xferred = img_xferred;
3013 }
3014 out:
3015 rbd_img_obj_request_read_callback(obj_request);
3016 rbd_obj_request_complete(obj_request);
3019 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3021 struct rbd_img_request *img_request;
3024 rbd_assert(obj_request_img_data_test(obj_request));
3025 rbd_assert(obj_request->img_request != NULL);
3026 rbd_assert(obj_request->result == (s32) -ENOENT);
3027 rbd_assert(obj_request_type_valid(obj_request->type));
3029 /* rbd_read_finish(obj_request, obj_request->length); */
3030 img_request = rbd_parent_request_create(obj_request,
3031 obj_request->img_offset,
3032 obj_request->length);
3033 result = -ENOMEM;
3034 if (!img_request)
3035 goto out_err;
3037 if (obj_request->type == OBJ_REQUEST_BIO)
3038 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3039 obj_request->bio_list);
3041 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3042 obj_request->pages);
3043 if (result)
3044 goto out_err;
3046 img_request->callback = rbd_img_parent_read_callback;
3047 result = rbd_img_request_submit(img_request);
3048 if (result)
3049 goto out_err;
3051 return;
3052 out_err:
3053 if (img_request)
3054 rbd_img_request_put(img_request);
3055 obj_request->result = result;
3056 obj_request->xferred = 0;
3057 obj_request_done_set(obj_request);
3060 static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
3062 struct rbd_obj_request *obj_request;
3063 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3066 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
3067 OBJ_REQUEST_NODATA);
3068 if (!obj_request)
3069 return -ENOMEM;
3071 ret = -ENOMEM;
3072 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3074 if (!obj_request->osd_req)
3075 goto out;
3077 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
3078 notify_id, 0, 0);
3079 rbd_osd_req_format_read(obj_request);
3081 ret = rbd_obj_request_submit(osdc, obj_request);
3082 if (ret)
3083 goto out;
3084 ret = rbd_obj_request_wait(obj_request);
3085 out:
3086 rbd_obj_request_put(obj_request);
3088 return ret;
3091 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
3093 struct rbd_device *rbd_dev = (struct rbd_device *)data;
3099 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
3100 rbd_dev->header_name, (unsigned long long)notify_id,
3101 (unsigned int)opcode);
3104 * Until adequate refresh error handling is in place, there is
3105 * not much we can do here, except warn.
3107 * See http://tracker.ceph.com/issues/5040
3109 ret = rbd_dev_refresh(rbd_dev);
3110 if (ret)
3111 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3113 ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
3114 if (ret)
3115 rbd_warn(rbd_dev, "notify_ack ret %d", ret);
3119 * Send a (un)watch request and wait for the ack. On success, return
3120 * the request with a reference held; on error, return an ERR_PTR.
3122 static struct rbd_obj_request *rbd_obj_watch_request_helper(
3123 struct rbd_device *rbd_dev,
3124 bool watch)
3126 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3127 struct rbd_obj_request *obj_request;
3130 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
3131 OBJ_REQUEST_NODATA);
3132 if (!obj_request)
3133 return ERR_PTR(-ENOMEM);
3135 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
3136 obj_request);
3137 if (!obj_request->osd_req) {
3138 ret = -ENOMEM;
3139 goto out;
3140 }
3142 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
3143 rbd_dev->watch_event->cookie, 0, watch);
3144 rbd_osd_req_format_write(obj_request);
3146 if (watch)
3147 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
3149 ret = rbd_obj_request_submit(osdc, obj_request);
3150 if (ret)
3151 goto out;
3153 ret = rbd_obj_request_wait(obj_request);
3154 if (ret)
3155 goto out;
3157 ret = obj_request->result;
3158 if (ret) {
3159 if (watch)
3160 rbd_obj_request_end(obj_request);
3161 goto out;
3162 }
3164 return obj_request;
3166 out:
3167 rbd_obj_request_put(obj_request);
3168 return ERR_PTR(ret);
3172 * Initiate a watch request, synchronously.
3174 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
3176 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3177 struct rbd_obj_request *obj_request;
3180 rbd_assert(!rbd_dev->watch_event);
3181 rbd_assert(!rbd_dev->watch_request);
3183 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
3184 &rbd_dev->watch_event);
3185 if (ret < 0)
3186 return ret;
3188 obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
3189 if (IS_ERR(obj_request)) {
3190 ceph_osdc_cancel_event(rbd_dev->watch_event);
3191 rbd_dev->watch_event = NULL;
3192 return PTR_ERR(obj_request);
3196 * A watch request is set to linger, so the underlying osd
3197 * request won't go away until we unregister it. We retain
3198 * a pointer to the object request during that time (in
3199 * rbd_dev->watch_request), so we'll keep a reference to it.
3200 * We'll drop that reference after we've unregistered it in
3201 * rbd_dev_header_unwatch_sync().
3203 rbd_dev->watch_request = obj_request;
3205 return 0;
3209 * Tear down a watch request, synchronously.
3211 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
3213 struct rbd_obj_request *obj_request;
3215 rbd_assert(rbd_dev->watch_event);
3216 rbd_assert(rbd_dev->watch_request);
3218 rbd_obj_request_end(rbd_dev->watch_request);
3219 rbd_obj_request_put(rbd_dev->watch_request);
3220 rbd_dev->watch_request = NULL;
3222 obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
3223 if (!IS_ERR(obj_request))
3224 rbd_obj_request_put(obj_request);
3225 else
3226 rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
3227 PTR_ERR(obj_request));
3229 ceph_osdc_cancel_event(rbd_dev->watch_event);
3230 rbd_dev->watch_event = NULL;
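/*
 * Illustrative sketch (not part of the driver): the watch registration
 * pairing used while an image is mapped.  rbd_dev_header_watch_sync()
 * must be balanced by rbd_dev_header_unwatch_sync() before teardown.
 */
#if 0	/* example only; error handling elided */
	ret = rbd_dev_header_watch_sync(rbd_dev);
	if (ret)
		return ret;
	/* ... image mapped; header-change notifications arrive via */
	/* rbd_watch_cb(), which refreshes the header and acks ...  */
	rbd_dev_header_unwatch_sync(rbd_dev);
#endif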
3234 * Synchronous osd object method call. Returns the number of bytes
3235 * returned in the inbound buffer, or a negative error code.
3237 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3238 const char *object_name,
3239 const char *class_name,
3240 const char *method_name,
3241 const void *outbound,
3242 size_t outbound_size,
3243 void *inbound,
3244 size_t inbound_size)
3246 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3247 struct rbd_obj_request *obj_request;
3248 struct page **pages;
3253 * Method calls are ultimately read operations. The result
3254 * should be placed into the inbound buffer provided. They
3255 * also supply outbound data--parameters for the object
3256 * method. Currently if this is present it will be a
3257 * snapshot id.
3259 page_count = (u32)calc_pages_for(0, inbound_size);
3260 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3261 if (IS_ERR(pages))
3262 return PTR_ERR(pages);
3265 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
3266 OBJ_REQUEST_PAGES);
3267 if (!obj_request)
3268 goto out;
3270 obj_request->pages = pages;
3271 obj_request->page_count = page_count;
3273 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3274 obj_request);
3275 if (!obj_request->osd_req)
3276 goto out;
3278 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
3279 class_name, method_name);
3280 if (outbound_size) {
3281 struct ceph_pagelist *pagelist;
3283 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3284 if (!pagelist)
3285 goto out;
3287 ceph_pagelist_init(pagelist);
3288 ceph_pagelist_append(pagelist, outbound, outbound_size);
3289 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3290 pagelist);
3291 }
3292 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3293 obj_request->pages, inbound_size,
3294 0, false, false);
3295 rbd_osd_req_format_read(obj_request);
3297 ret = rbd_obj_request_submit(osdc, obj_request);
3298 if (ret)
3299 goto out;
3300 ret = rbd_obj_request_wait(obj_request);
3301 if (ret)
3302 goto out;
3304 ret = obj_request->result;
3305 if (ret)
3306 goto out;
3308 rbd_assert(obj_request->xferred < (u64)INT_MAX);
3309 ret = (int)obj_request->xferred;
3310 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3311 out:
3312 if (obj_request)
3313 rbd_obj_request_put(obj_request);
3314 else
3315 ceph_release_page_vector(pages, page_count);
3317 return ret;
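/*
 * Illustrative sketch (not part of the driver): a typical class-method
 * call, mirroring the "get_size" request issued later in this file.
 * The snapshot id is the outbound parameter; the reply lands in the
 * packed inbound buffer.
 */
#if 0	/* example only */
	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				&snapid, sizeof (snapid),
				&size_buf, sizeof (size_buf));
#endif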
3320 static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
3322 struct rbd_img_request *img_request;
3323 struct ceph_snap_context *snapc = NULL;
3324 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3325 u64 length = blk_rq_bytes(rq);
3326 enum obj_operation_type op_type;
3330 if (rq->cmd_flags & REQ_DISCARD)
3331 op_type = OBJ_OP_DISCARD;
3332 else if (rq->cmd_flags & REQ_WRITE)
3333 op_type = OBJ_OP_WRITE;
3335 op_type = OBJ_OP_READ;
3337 /* Ignore/skip any zero-length requests */
3339 if (!length) {
3340 dout("%s: zero-length request\n", __func__);
3341 result = 0;
3342 goto err_rq;
3343 }
3345 /* Only reads are allowed to a read-only device */
3347 if (op_type != OBJ_OP_READ) {
3348 if (rbd_dev->mapping.read_only) {
3349 result = -EROFS;
3350 goto err_rq;
3351 }
3352 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3353 }
3356 * Quit early if the mapped snapshot no longer exists. It's
3357 * still possible the snapshot will have disappeared by the
3358 * time our request arrives at the osd, but there's no sense in
3359 * sending it if we already know.
3361 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3362 dout("request for non-existent snapshot");
3363 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3364 result = -ENXIO;
3365 goto err_rq;
3366 }
3368 if (offset && length > U64_MAX - offset + 1) {
3369 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3370 length);
3371 result = -EINVAL;
3372 goto err_rq; /* Shouldn't happen */
3375 down_read(&rbd_dev->header_rwsem);
3376 mapping_size = rbd_dev->mapping.size;
3377 if (op_type != OBJ_OP_READ) {
3378 snapc = rbd_dev->header.snapc;
3379 ceph_get_snap_context(snapc);
3380 }
3381 up_read(&rbd_dev->header_rwsem);
3383 if (offset + length > mapping_size) {
3384 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3385 length, mapping_size);
3386 result = -EIO;
3387 goto err_rq;
3388 }
3390 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
3391 snapc);
3392 if (!img_request) {
3393 result = -ENOMEM;
3394 goto err_rq;
3395 }
3396 img_request->rq = rq;
3397 snapc = NULL; /* img_request consumes a ref */
3399 if (op_type == OBJ_OP_DISCARD)
3400 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
3401 NULL);
3402 else
3403 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3404 rq->bio);
3405 if (result)
3406 goto err_img_request;
3408 result = rbd_img_request_submit(img_request);
3409 if (result)
3410 goto err_img_request;
3412 return;
3414 err_img_request:
3415 rbd_img_request_put(img_request);
3416 err_rq:
3417 if (result)
3418 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3419 obj_op_name(op_type), length, offset, result);
3421 ceph_put_snap_context(snapc);
3422 blk_end_request_all(rq, result);
3425 static void rbd_request_workfn(struct work_struct *work)
3427 struct rbd_device *rbd_dev =
3428 container_of(work, struct rbd_device, rq_work);
3429 struct request *rq, *next;
3430 LIST_HEAD(requests);
3432 spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
3433 list_splice_init(&rbd_dev->rq_queue, &requests);
3434 spin_unlock_irq(&rbd_dev->lock);
3436 list_for_each_entry_safe(rq, next, &requests, queuelist) {
3437 list_del_init(&rq->queuelist);
3438 rbd_handle_request(rbd_dev, rq);
3443 * Called with q->queue_lock held and interrupts disabled, possibly on
3444 * the way to schedule(). Do not sleep here!
3446 static void rbd_request_fn(struct request_queue *q)
3448 struct rbd_device *rbd_dev = q->queuedata;
3452 rbd_assert(rbd_dev);
3454 while ((rq = blk_fetch_request(q))) {
3455 /* Ignore any non-FS requests that filter through. */
3456 if (rq->cmd_type != REQ_TYPE_FS) {
3457 dout("%s: non-fs request type %d\n", __func__,
3458 (int) rq->cmd_type);
3459 __blk_end_request_all(rq, 0);
3460 continue;
3461 }
3463 list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
3468 queue_work(rbd_wq, &rbd_dev->rq_work);
3472 * a queue callback. Makes sure that we don't create a bio that spans across
3473 * multiple osd objects. One exception would be single-page bios,
3474 * which we handle later in bio_chain_clone_range().
3476 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3477 struct bio_vec *bvec)
3479 struct rbd_device *rbd_dev = q->queuedata;
3480 sector_t sector_offset;
3481 sector_t sectors_per_obj;
3482 sector_t obj_sector_offset;
3486 * Find how far into its rbd object the partition-relative
3487 * bio start sector falls, as an offset relative to the enclosing
3488 * device.
3490 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3491 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3492 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3495 * Compute the number of bytes from that offset to the end
3496 * of the object. Account for what's already used by the bio.
3498 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3499 if (ret > bmd->bi_size)
3500 ret -= bmd->bi_size;
3501 else
3502 ret = 0;
3505 * Don't send back more than was asked for. And if the bio
3506 * was empty, let the whole thing through because: "Note
3507 * that a block device *must* allow a single page to be
3508 * added to an empty bio."
3510 rbd_assert(bvec->bv_len <= PAGE_SIZE);
3511 if (ret > (int) bvec->bv_len || !bmd->bi_size)
3512 ret = (int) bvec->bv_len;
3514 return ret;
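/*
 * Worked example for the clamp above (informational comment): with the
 * default 4 MiB objects, sectors_per_obj is 8192.  A bio whose mapped
 * start sector falls 8000 sectors into an object may grow by at most
 * (8192 - 8000) * 512 = 98304 bytes, minus what it already contains,
 * so it never crosses into the next object.
 */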
3517 static void rbd_free_disk(struct rbd_device *rbd_dev)
3519 struct gendisk *disk = rbd_dev->disk;
3521 if (!disk)
3522 return;
3524 rbd_dev->disk = NULL;
3525 if (disk->flags & GENHD_FL_UP) {
3526 del_gendisk(disk);
3527 if (disk->queue)
3528 blk_cleanup_queue(disk->queue);
3529 }
3530 put_disk(disk);
3533 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3534 const char *object_name,
3535 u64 offset, u64 length, void *buf)
3538 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3539 struct rbd_obj_request *obj_request;
3540 struct page **pages = NULL;
3545 page_count = (u32) calc_pages_for(offset, length);
3546 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3547 if (IS_ERR(pages))
3548 return PTR_ERR(pages);
3550 ret = -ENOMEM;
3551 obj_request = rbd_obj_request_create(object_name, offset, length,
3552 OBJ_REQUEST_PAGES);
3553 if (!obj_request)
3554 goto out;
3556 obj_request->pages = pages;
3557 obj_request->page_count = page_count;
3559 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3560 obj_request);
3561 if (!obj_request->osd_req)
3562 goto out;
3564 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3565 offset, length, 0, 0);
3566 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3567 obj_request->pages,
3568 obj_request->length,
3569 obj_request->offset & ~PAGE_MASK,
3570 false, false);
3571 rbd_osd_req_format_read(obj_request);
3573 ret = rbd_obj_request_submit(osdc, obj_request);
3574 if (ret)
3575 goto out;
3576 ret = rbd_obj_request_wait(obj_request);
3577 if (ret)
3578 goto out;
3580 ret = obj_request->result;
3581 if (ret)
3582 goto out;
3584 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3585 size = (size_t) obj_request->xferred;
3586 ceph_copy_from_page_vector(pages, buf, 0, size);
3587 rbd_assert(size <= (size_t)INT_MAX);
3588 ret = (int)size;
3589 out:
3590 if (obj_request)
3591 rbd_obj_request_put(obj_request);
3592 else
3593 ceph_release_page_vector(pages, page_count);
3595 return ret;
3599 * Read the complete header for the given rbd device. On successful
3600 * return, the rbd_dev->header field will contain up-to-date
3601 * information about the image.
3603 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3605 struct rbd_image_header_ondisk *ondisk = NULL;
3612 * The complete header will include an array of its 64-bit
3613 * snapshot ids, followed by the names of those snapshots as
3614 * a contiguous block of NUL-terminated strings. Note that
3615 * the number of snapshots could change by the time we read
3616 * it in, in which case we re-read it.
3618 do {
3619 size_t size;
3621 kfree(ondisk);
3623 size = sizeof (*ondisk);
3624 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3625 size += names_size;
3626 ondisk = kmalloc(size, GFP_KERNEL);
3627 if (!ondisk)
3628 return -ENOMEM;
3630 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3631 0, size, ondisk);
3632 if (ret < 0)
3633 goto out;
3634 if ((size_t)ret < size) {
3635 ret = -ENXIO;
3636 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3637 size, ret);
3638 goto out;
3639 }
3640 if (!rbd_dev_ondisk_valid(ondisk)) {
3641 ret = -ENXIO;
3642 rbd_warn(rbd_dev, "invalid header");
3643 goto out;
3644 }
3646 names_size = le64_to_cpu(ondisk->snap_names_len);
3647 want_count = snap_count;
3648 snap_count = le32_to_cpu(ondisk->snap_count);
3649 } while (snap_count != want_count);
3651 ret = rbd_header_from_disk(rbd_dev, ondisk);
3652 out:
3653 kfree(ondisk);
3655 return ret;
3659 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3660 * has disappeared from the (just updated) snapshot context.
3662 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3666 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3667 return;
3669 snap_id = rbd_dev->spec->snap_id;
3670 if (snap_id == CEPH_NOSNAP)
3671 return;
3673 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3674 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3677 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3683 * Don't hold the lock while doing disk operations,
3684 * or lock ordering will conflict with the bdev mutex via:
3685 * rbd_add() -> blkdev_get() -> rbd_open()
3687 spin_lock_irq(&rbd_dev->lock);
3688 removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3689 spin_unlock_irq(&rbd_dev->lock);
3691 * If the device is being removed, rbd_dev->disk has
3692 * been destroyed, so don't try to update its size.
3694 if (!removing) {
3695 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3696 dout("setting size to %llu sectors", (unsigned long long)size);
3697 set_capacity(rbd_dev->disk, size);
3698 revalidate_disk(rbd_dev->disk);
3699 }
3702 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3707 down_write(&rbd_dev->header_rwsem);
3708 mapping_size = rbd_dev->mapping.size;
3710 ret = rbd_dev_header_info(rbd_dev);
3711 if (ret)
3712 goto out;
3715 * If there is a parent, see if it has disappeared due to the
3716 * mapped image getting flattened.
3718 if (rbd_dev->parent) {
3719 ret = rbd_dev_v2_parent_info(rbd_dev);
3720 if (ret)
3721 goto out;
3722 }
3724 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3725 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3726 rbd_dev->mapping.size = rbd_dev->header.image_size;
3727 } else {
3728 /* validate mapped snapshot's EXISTS flag */
3729 rbd_exists_validate(rbd_dev);
3730 }
3731 out:
3732 up_write(&rbd_dev->header_rwsem);
3734 if (mapping_size != rbd_dev->mapping.size)
3735 rbd_dev_update_size(rbd_dev);
3737 return ret;
3740 static int rbd_init_disk(struct rbd_device *rbd_dev)
3742 struct gendisk *disk;
3743 struct request_queue *q;
3746 /* create gendisk info */
3747 disk = alloc_disk(single_major ?
3748 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3749 RBD_MINORS_PER_MAJOR);
3750 if (!disk)
3751 return -ENOMEM;
3753 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3754 rbd_dev->dev_id);
3755 disk->major = rbd_dev->major;
3756 disk->first_minor = rbd_dev->minor;
3757 if (single_major)
3758 disk->flags |= GENHD_FL_EXT_DEVT;
3759 disk->fops = &rbd_bd_ops;
3760 disk->private_data = rbd_dev;
3762 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3763 if (!q)
3764 goto out_disk;
3766 /* We use the default size, but let's be explicit about it. */
3767 blk_queue_physical_block_size(q, SECTOR_SIZE);
3769 /* set io sizes to object size */
3770 segment_size = rbd_obj_bytes(&rbd_dev->header);
3771 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3772 blk_queue_max_segment_size(q, segment_size);
3773 blk_queue_io_min(q, segment_size);
3774 blk_queue_io_opt(q, segment_size);
3776 /* enable the discard support */
3777 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
3778 q->limits.discard_granularity = segment_size;
3779 q->limits.discard_alignment = segment_size;
3780 q->limits.max_discard_sectors = segment_size / SECTOR_SIZE;
3781 q->limits.discard_zeroes_data = 1;
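	/*
	 * Informational note (added comment): discard granularity and
	 * alignment are set to the object size, so a discard covering a
	 * whole, aligned object can become an OSD DELETE or TRUNCATE in
	 * rbd_img_obj_request_fill() above.
	 */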
3783 blk_queue_merge_bvec(q, rbd_merge_bvec);
3784 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
3785 q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
3787 disk->queue = q;
3789 q->queuedata = rbd_dev;
3791 rbd_dev->disk = disk;
3793 return 0;
3794 out_disk:
3795 put_disk(disk);
3797 return -ENOMEM;
3804 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3806 return container_of(dev, struct rbd_device, dev);
3809 static ssize_t rbd_size_show(struct device *dev,
3810 struct device_attribute *attr, char *buf)
3812 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3814 return sprintf(buf, "%llu\n",
3815 (unsigned long long)rbd_dev->mapping.size);
3819 * Note this shows the features for whatever's mapped, which is not
3820 * necessarily the base image.
3822 static ssize_t rbd_features_show(struct device *dev,
3823 struct device_attribute *attr, char *buf)
3825 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3827 return sprintf(buf, "0x%016llx\n",
3828 (unsigned long long)rbd_dev->mapping.features);
3831 static ssize_t rbd_major_show(struct device *dev,
3832 struct device_attribute *attr, char *buf)
3834 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3836 if (rbd_dev->major)
3837 return sprintf(buf, "%d\n", rbd_dev->major);
3839 return sprintf(buf, "(none)\n");
3842 static ssize_t rbd_minor_show(struct device *dev,
3843 struct device_attribute *attr, char *buf)
3845 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3847 return sprintf(buf, "%d\n", rbd_dev->minor);
3850 static ssize_t rbd_client_id_show(struct device *dev,
3851 struct device_attribute *attr, char *buf)
3853 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3855 return sprintf(buf, "client%lld\n",
3856 ceph_client_id(rbd_dev->rbd_client->client));
3859 static ssize_t rbd_pool_show(struct device *dev,
3860 struct device_attribute *attr, char *buf)
3862 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3864 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3867 static ssize_t rbd_pool_id_show(struct device *dev,
3868 struct device_attribute *attr, char *buf)
3870 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3872 return sprintf(buf, "%llu\n",
3873 (unsigned long long) rbd_dev->spec->pool_id);
3876 static ssize_t rbd_name_show(struct device *dev,
3877 struct device_attribute *attr, char *buf)
3879 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3881 if (rbd_dev->spec->image_name)
3882 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3884 return sprintf(buf, "(unknown)\n");
3887 static ssize_t rbd_image_id_show(struct device *dev,
3888 struct device_attribute *attr, char *buf)
3890 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3892 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3896 * Shows the name of the currently-mapped snapshot (or
3897 * RBD_SNAP_HEAD_NAME for the base image).
3899 static ssize_t rbd_snap_show(struct device *dev,
3900 struct device_attribute *attr,
3901 char *buf)
3903 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3905 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3909 * For a v2 image, shows the chain of parent images, separated by empty
3910 * lines. For v1 images or if there is no parent, shows "(no parent
3911 * image)".
3913 static ssize_t rbd_parent_show(struct device *dev,
3914 struct device_attribute *attr,
3915 char *buf)
3917 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3920 if (!rbd_dev->parent)
3921 return sprintf(buf, "(no parent image)\n");
3923 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
3924 struct rbd_spec *spec = rbd_dev->parent_spec;
3926 count += sprintf(&buf[count], "%s"
3927 "pool_id %llu\npool_name %s\n"
3928 "image_id %s\nimage_name %s\n"
3929 "snap_id %llu\nsnap_name %s\n"
3931 !count ? "" : "\n", /* first? */
3932 spec->pool_id, spec->pool_name,
3933 spec->image_id, spec->image_name ?: "(unknown)",
3934 spec->snap_id, spec->snap_name,
3935 rbd_dev->parent_overlap);
3936 }
3938 return count;
3941 static ssize_t rbd_image_refresh(struct device *dev,
3942 struct device_attribute *attr,
3943 const char *buf,
3944 size_t size)
3946 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3949 ret = rbd_dev_refresh(rbd_dev);
3950 if (ret)
3951 return ret;
3953 return size;
3956 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3957 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3958 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3959 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
3960 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3961 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3962 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3963 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3964 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3965 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3966 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3967 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3969 static struct attribute *rbd_attrs[] = {
3970 &dev_attr_size.attr,
3971 &dev_attr_features.attr,
3972 &dev_attr_major.attr,
3973 &dev_attr_minor.attr,
3974 &dev_attr_client_id.attr,
3975 &dev_attr_pool.attr,
3976 &dev_attr_pool_id.attr,
3977 &dev_attr_name.attr,
3978 &dev_attr_image_id.attr,
3979 &dev_attr_current_snap.attr,
3980 &dev_attr_parent.attr,
3981 &dev_attr_refresh.attr,
3982 NULL
3983 };
3985 static struct attribute_group rbd_attr_group = {
3986 .attrs = rbd_attrs,
3987 };
3989 static const struct attribute_group *rbd_attr_groups[] = {
3990 &rbd_attr_group,
3991 NULL
3992 };
3994 static void rbd_sysfs_dev_release(struct device *dev)
3998 static struct device_type rbd_device_type = {
4000 .groups = rbd_attr_groups,
4001 .release = rbd_sysfs_dev_release,
4004 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4006 kref_get(&spec->kref);
4008 return spec;
4011 static void rbd_spec_free(struct kref *kref);
4012 static void rbd_spec_put(struct rbd_spec *spec)
4014 if (spec)
4015 kref_put(&spec->kref, rbd_spec_free);
4018 static struct rbd_spec *rbd_spec_alloc(void)
4020 struct rbd_spec *spec;
4022 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4023 if (!spec)
4024 return NULL;
4026 spec->pool_id = CEPH_NOPOOL;
4027 spec->snap_id = CEPH_NOSNAP;
4028 kref_init(&spec->kref);
4030 return spec;
4033 static void rbd_spec_free(struct kref *kref)
4035 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4037 kfree(spec->pool_name);
4038 kfree(spec->image_id);
4039 kfree(spec->image_name);
4040 kfree(spec->snap_name);
4041 kfree(spec);
4044 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4045 struct rbd_spec *spec)
4047 struct rbd_device *rbd_dev;
4049 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
4050 if (!rbd_dev)
4051 return NULL;
4053 spin_lock_init(&rbd_dev->lock);
4054 INIT_LIST_HEAD(&rbd_dev->rq_queue);
4055 INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
4057 atomic_set(&rbd_dev->parent_ref, 0);
4058 INIT_LIST_HEAD(&rbd_dev->node);
4059 init_rwsem(&rbd_dev->header_rwsem);
4061 rbd_dev->spec = spec;
4062 rbd_dev->rbd_client = rbdc;
4064 /* Initialize the layout used for all rbd requests */
4066 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
4067 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
4068 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
4069 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
4071 return rbd_dev;
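/*
 * Informational note (added comment): the fixed layout initialized in
 * rbd_dev_create() above (stripe unit == object size, stripe count 1)
 * is plain RBD striping; rbd_dev_v2_striping_info() below only accepts
 * the STRIPINGV2 feature when it matches these defaults.
 */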
4074 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4076 rbd_put_client(rbd_dev->rbd_client);
4077 rbd_spec_put(rbd_dev->spec);
4078 kfree(rbd_dev);
4082 * Get the size and object order for an image snapshot, or if
4083 * snap_id is CEPH_NOSNAP, gets this information for the base
4084 * image.
4086 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4087 u8 *order, u64 *snap_size)
4089 __le64 snapid = cpu_to_le64(snap_id);
4090 int ret;
4091 struct {
4092 u8 order;
4093 __le64 size;
4094 } __attribute__ ((packed)) size_buf = { 0 };
4096 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4097 "rbd", "get_size",
4098 &snapid, sizeof (snapid),
4099 &size_buf, sizeof (size_buf));
4100 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4101 if (ret < 0)
4102 return ret;
4103 if (ret < sizeof (size_buf))
4104 return -ERANGE;
4106 if (order) {
4107 *order = size_buf.order;
4108 dout(" order %u", (unsigned int)*order);
4109 }
4110 *snap_size = le64_to_cpu(size_buf.size);
4112 dout(" snap_id 0x%016llx snap_size = %llu\n",
4113 (unsigned long long)snap_id,
4114 (unsigned long long)*snap_size);
4116 return 0;
4119 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4121 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4122 &rbd_dev->header.obj_order,
4123 &rbd_dev->header.image_size);
4126 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4132 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4133 if (!reply_buf)
4134 return -ENOMEM;
4136 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4137 "rbd", "get_object_prefix", NULL, 0,
4138 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4139 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4140 if (ret < 0)
4141 goto out;
4143 p = reply_buf;
4144 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4145 p + ret, NULL, GFP_NOIO);
4146 ret = 0;
4148 if (IS_ERR(rbd_dev->header.object_prefix)) {
4149 ret = PTR_ERR(rbd_dev->header.object_prefix);
4150 rbd_dev->header.object_prefix = NULL;
4151 } else {
4152 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4153 }
4154 out:
4155 kfree(reply_buf);
4157 return ret;
4160 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4161 u64 *snap_features)
4163 __le64 snapid = cpu_to_le64(snap_id);
4164 struct {
4165 __le64 features;
4166 __le64 incompat;
4167 } __attribute__ ((packed)) features_buf = { 0 };
4171 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4172 "rbd", "get_features",
4173 &snapid, sizeof (snapid),
4174 &features_buf, sizeof (features_buf));
4175 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4176 if (ret < 0)
4177 return ret;
4178 if (ret < sizeof (features_buf))
4179 return -ERANGE;
4181 incompat = le64_to_cpu(features_buf.incompat);
4182 if (incompat & ~RBD_FEATURES_SUPPORTED)
4183 return -ENXIO;
4185 *snap_features = le64_to_cpu(features_buf.features);
4187 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4188 (unsigned long long)snap_id,
4189 (unsigned long long)*snap_features,
4190 (unsigned long long)le64_to_cpu(features_buf.incompat));
4192 return 0;
4195 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4197 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4198 &rbd_dev->header.features);
4201 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4203 struct rbd_spec *parent_spec;
4205 void *reply_buf = NULL;
4215 parent_spec = rbd_spec_alloc();
4216 if (!parent_spec)
4217 return -ENOMEM;
4219 size = sizeof (__le64) + /* pool_id */
4220 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
4221 sizeof (__le64) + /* snap_id */
4222 sizeof (__le64); /* overlap */
4223 reply_buf = kmalloc(size, GFP_KERNEL);
4224 if (!reply_buf) {
4225 ret = -ENOMEM;
4226 goto out_err;
4227 }
4229 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4230 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4231 "rbd", "get_parent",
4232 &snapid, sizeof (snapid),
4233 reply_buf, size);
4234 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4235 if (ret < 0)
4236 goto out_err;
4238 p = reply_buf;
4239 end = reply_buf + ret;
4240 ret = -ERANGE;
4241 ceph_decode_64_safe(&p, end, pool_id, out_err);
4242 if (pool_id == CEPH_NOPOOL) {
4244 * Either the parent never existed, or we have
4245 * record of it but the image got flattened so it no
4246 * longer has a parent. When the parent of a
4247 * layered image disappears we immediately set the
4248 * overlap to 0. The effect of this is that all new
4249 * requests will be treated as if the image had no
4250 * parent.
4252 if (rbd_dev->parent_overlap) {
4253 rbd_dev->parent_overlap = 0;
4254 rbd_dev_parent_put(rbd_dev);
4255 pr_info("%s: clone image has been flattened\n",
4256 rbd_dev->disk->disk_name);
4257 }
4259 goto out; /* No parent? No problem. */
4262 /* The ceph file layout needs to fit pool id in 32 bits */
4265 if (pool_id > (u64)U32_MAX) {
4266 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4267 (unsigned long long)pool_id, U32_MAX);
4268 ret = -EIO;
4269 goto out_err;
4270 }
4271 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4272 if (IS_ERR(image_id)) {
4273 ret = PTR_ERR(image_id);
4274 goto out_err;
4275 }
4276 ceph_decode_64_safe(&p, end, snap_id, out_err);
4277 ceph_decode_64_safe(&p, end, overlap, out_err);
4280 * The parent won't change (except when the clone is
4281 * flattened, already handled that). So we only need to
4282 * record the parent spec if we have not already done so.
4284 if (!rbd_dev->parent_spec) {
4285 parent_spec->pool_id = pool_id;
4286 parent_spec->image_id = image_id;
4287 parent_spec->snap_id = snap_id;
4288 rbd_dev->parent_spec = parent_spec;
4289 parent_spec = NULL; /* rbd_dev now owns this */
4290 } else {
4291 kfree(image_id);
4292 }
4295 * We always update the parent overlap. If it's zero we
4296 * treat it specially.
4298 rbd_dev->parent_overlap = overlap;
4299 if (!overlap) {
4301 /* A null parent_spec indicates it's the initial probe */
4303 if (parent_spec) {
4305 * The overlap has become zero, so the clone
4306 * must have been resized down to 0 at some
4307 * point. Treat this the same as a flatten.
4309 rbd_dev_parent_put(rbd_dev);
4310 pr_info("%s: clone image now standalone\n",
4311 rbd_dev->disk->disk_name);
4312 } else {
4314 * For the initial probe, if we find the
4315 * overlap is zero we just pretend there was
4316 * no parent image.
4318 rbd_warn(rbd_dev, "ignoring parent with overlap 0");
4319 }
4320 }
4321 ret = 0;
4322 out_err:
4323 kfree(reply_buf);
4325 rbd_spec_put(parent_spec);
4327 return ret;
4330 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4332 struct {
4333 __le64 stripe_unit;
4334 __le64 stripe_count;
4335 } __attribute__ ((packed)) striping_info_buf = { 0 };
4336 size_t size = sizeof (striping_info_buf);
4343 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4344 "rbd", "get_stripe_unit_count", NULL, 0,
4345 (char *)&striping_info_buf, size);
4346 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4347 if (ret < 0)
4348 return ret;
4349 if (ret < size)
4350 return -ERANGE;
4353 * We don't actually support the "fancy striping" feature
4354 * (STRIPINGV2) yet, but if the striping sizes are the
4355 * defaults the behavior is the same as before. So find
4356 * out, and only fail if the image has non-default values.
4359 obj_size = (u64)1 << rbd_dev->header.obj_order;
4360 p = &striping_info_buf;
4361 stripe_unit = ceph_decode_64(&p);
4362 if (stripe_unit != obj_size) {
4363 rbd_warn(rbd_dev, "unsupported stripe unit "
4364 "(got %llu want %llu)",
4365 stripe_unit, obj_size);
4366 return -EINVAL;
4367 }
4368 stripe_count = ceph_decode_64(&p);
4369 if (stripe_count != 1) {
4370 rbd_warn(rbd_dev, "unsupported stripe count "
4371 "(got %llu want 1)", stripe_count);
4374 rbd_dev->header.stripe_unit = stripe_unit;
4375 rbd_dev->header.stripe_count = stripe_count;
4377 return 0;
4380 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4382 size_t image_id_size;
4387 void *reply_buf = NULL;
4389 char *image_name = NULL;
4392 rbd_assert(!rbd_dev->spec->image_name);
4394 len = strlen(rbd_dev->spec->image_id);
4395 image_id_size = sizeof (__le32) + len;
4396 image_id = kmalloc(image_id_size, GFP_KERNEL);
4397 if (!image_id)
4398 return NULL;
4400 p = image_id;
4401 end = image_id + image_id_size;
4402 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4404 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4405 reply_buf = kmalloc(size, GFP_KERNEL);
4406 if (!reply_buf)
4407 goto out;
4409 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4410 "rbd", "dir_get_name",
4411 image_id, image_id_size,
4412 reply_buf, size);
4413 if (ret < 0)
4414 goto out;
4415 p = reply_buf;
4416 end = reply_buf + ret;
4418 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4419 if (IS_ERR(image_name))
4420 image_name = NULL;
4421 else
4422 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4423 out:
4424 kfree(reply_buf);
4425 kfree(image_id);
4427 return image_name;
4430 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4432 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4433 const char *snap_name;
4436 /* Skip over names until we find the one we are looking for */
4438 snap_name = rbd_dev->header.snap_names;
4439 while (which < snapc->num_snaps) {
4440 if (!strcmp(name, snap_name))
4441 return snapc->snaps[which];
4442 snap_name += strlen(snap_name) + 1;
4443 which++;
4444 }
4446 return CEPH_NOSNAP;
4448 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4450 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4455 for (which = 0; !found && which < snapc->num_snaps; which++) {
4456 const char *snap_name;
4458 snap_id = snapc->snaps[which];
4459 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4460 if (IS_ERR(snap_name)) {
4461 /* ignore no-longer existing snapshots */
4462 if (PTR_ERR(snap_name) == -ENOENT)
4463 continue;
4464 else
4465 break;
4466 }
4467 found = !strcmp(name, snap_name);
4468 kfree(snap_name);
4469 }
4470 return found ? snap_id : CEPH_NOSNAP;
4474 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4475 * no snapshot by that name is found, or if an error occurs.
4477 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4479 if (rbd_dev->image_format == 1)
4480 return rbd_v1_snap_id_by_name(rbd_dev, name);
4482 return rbd_v2_snap_id_by_name(rbd_dev, name);
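/*
 * Illustrative sketch (not part of the driver): resolving a
 * user-supplied snapshot name, as rbd_spec_fill_snap_id() below does.
 */
#if 0	/* example only; "mysnap" is a hypothetical snapshot name */
	u64 snap_id = rbd_snap_id_by_name(rbd_dev, "mysnap");
	if (snap_id == CEPH_NOSNAP)
		return -ENOENT;	/* no such snapshot (or lookup failed) */
#endif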
4486 * An image being mapped will have everything but the snap id.
4488 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4490 struct rbd_spec *spec = rbd_dev->spec;
4492 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4493 rbd_assert(spec->image_id && spec->image_name);
4494 rbd_assert(spec->snap_name);
4496 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4499 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4500 if (snap_id == CEPH_NOSNAP)
4501 return -ENOENT;
4503 spec->snap_id = snap_id;
4504 } else {
4505 spec->snap_id = CEPH_NOSNAP;
4506 }
4508 return 0;
4512 * A parent image will have all ids but none of the names.
4514 * All names in an rbd spec are dynamically allocated. It's OK if we
4515 * can't figure out the name for an image id.
4517 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
4519 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4520 struct rbd_spec *spec = rbd_dev->spec;
4521 const char *pool_name;
4522 const char *image_name;
4523 const char *snap_name;
4526 rbd_assert(spec->pool_id != CEPH_NOPOOL);
4527 rbd_assert(spec->image_id);
4528 rbd_assert(spec->snap_id != CEPH_NOSNAP);
4530 /* Get the pool name; we have to make our own copy of this */
4532 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4533 if (!pool_name) {
4534 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4535 return -EIO;
4536 }
4537 pool_name = kstrdup(pool_name, GFP_KERNEL);
4538 if (!pool_name)
4539 return -ENOMEM;
4541 /* Fetch the image name; tolerate failure here */
4543 image_name = rbd_dev_image_name(rbd_dev);
4544 if (!image_name)
4545 rbd_warn(rbd_dev, "unable to get image name");
4547 /* Fetch the snapshot name */
4549 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4550 if (IS_ERR(snap_name)) {
4551 ret = PTR_ERR(snap_name);
4552 goto out_err;
4553 }
4555 spec->pool_name = pool_name;
4556 spec->image_name = image_name;
4557 spec->snap_name = snap_name;
4559 return 0;
4561 out_err:
4562 kfree(image_name);
4563 kfree(pool_name);
4565 return ret;
4567 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4576 struct ceph_snap_context *snapc;
4580 * We'll need room for the seq value (maximum snapshot id),
4581 * snapshot count, and array of that many snapshot ids.
4582 * For now we have a fixed upper limit on the number we're
4583 * prepared to receive.
4585 size = sizeof (__le64) + sizeof (__le32) +
4586 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4587 reply_buf = kzalloc(size, GFP_KERNEL);
4588 if (!reply_buf)
4589 return -ENOMEM;
4591 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4592 "rbd", "get_snapcontext", NULL, 0,
4594 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4599 end = reply_buf + ret;
4601 ceph_decode_64_safe(&p, end, seq, out);
4602 ceph_decode_32_safe(&p, end, snap_count, out);
4605 * Make sure the reported number of snapshot ids wouldn't go
4606 * beyond the end of our buffer. But before checking that,
4607 * make sure the computed size of the snapshot context we
4608 * allocate is representable in a size_t.
4610 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4615 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4619 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4625 for (i = 0; i < snap_count; i++)
4626 snapc->snaps[i] = ceph_decode_64(&p);
4628 ceph_put_snap_context(rbd_dev->header.snapc);
4629 rbd_dev->header.snapc = snapc;
4631 dout(" snap context seq = %llu, snap_count = %u\n",
4632 (unsigned long long)seq, (unsigned int)snap_count);
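/*
 * For reference, a sketch of the encoded get_snapcontext reply that
 * the function above decodes.  The field widths follow directly from
 * the ceph_decode_*() calls; the layout is inferred from this code,
 * not from separate protocol documentation:
 *
 *      __le64  seq                     highest snapshot id in the context
 *      __le32  snap_count              number of snapshot ids that follow
 *      __le64  snaps[snap_count]       the snapshot ids themselves
 */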
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        size_t size;
        void *reply_buf, *p, *end;
        __le64 snapid;
        char *snap_name;
        int ret;

        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);

        snapid = cpu_to_le64(snap_id);
        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                &snapid, sizeof (snapid),
                                reply_buf, size);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0) {
                snap_name = ERR_PTR(ret);
                goto out;
        }

        p = reply_buf;
        end = reply_buf + ret;
        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(snap_name))
                goto out;

        dout("  snap_id 0x%016llx snap_name = %s\n",
                (unsigned long long)snap_id, snap_name);
out:
        kfree(reply_buf);

        return snap_name;
}
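/*
 * The reply parsed above is a ceph "encoded string": a __le32 byte
 * count followed by that many bytes of name data, with no trailing
 * NUL.  ceph_extract_encoded_string() bounds-checks against 'end' and
 * hands back a NUL-terminated kernel copy, which is why the buffer is
 * sized as sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN.  (Description
 * inferred from the decode helper used here.)
 */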
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
{
        bool first_time = rbd_dev->header.object_prefix == NULL;
        int ret;

        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret)
                return ret;

        if (first_time) {
                ret = rbd_dev_v2_header_onetime(rbd_dev);
                if (ret)
                        return ret;
        }

        ret = rbd_dev_v2_snap_context(rbd_dev);
        dout("rbd_dev_v2_snap_context returned %d\n", ret);

        return ret;
}
static int rbd_dev_header_info(struct rbd_device *rbd_dev)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_header_info(rbd_dev);

        return rbd_dev_v2_header_info(rbd_dev);
}
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
        struct device *dev;
        int ret;

        dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_device_release;
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);

        return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.
 */
static int rbd_dev_id_get(struct rbd_device *rbd_dev)
{
        int new_dev_id;

        new_dev_id = ida_simple_get(&rbd_dev_id_ida,
                                    0, minor_to_rbd_dev_id(1 << MINORBITS),
                                    GFP_KERNEL);
        if (new_dev_id < 0)
                return new_dev_id;

        rbd_dev->dev_id = new_dev_id;

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);

        dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);

        return 0;
}
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);
        spin_unlock(&rbd_dev_list_lock);

        ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);

        dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
}
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * These are the characters that produce nonzero for
         * isspace() in the "C" and "POSIX" locales.
         */
        const char *spaces = " \f\n\r\t\v";

        *buf += strspn(*buf, spaces);   /* Find start of token */

        return strcspn(*buf, spaces);   /* Return token length */
}
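/*
 * Illustrative sketch (hypothetical buffer contents, not compiled):
 * next_token() never copies anything; it only advances the scan
 * pointer past leading whitespace and reports the token length.
 */
#if 0
        const char *buf = "  rbd foo";
        size_t len;

        len = next_token(&buf);         /* buf -> "rbd foo", len == 3 */
#endif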
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t len;

        len = next_token(buf);
        if (len < token_size) {
                memcpy(token, *buf, len);
                *(token + len) = '\0';
        }
        *buf += len;

        return len;
}
/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
        char *dup;
        size_t len;

        len = next_token(buf);
        dup = kmemdup(*buf, len + 1, GFP_KERNEL);
        if (!dup)
                return NULL;
        *(dup + len) = '\0';
        *buf += len;

        if (lenp)
                *lenp = len;

        return dup;
}
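/*
 * Illustrative sketch (hypothetical input, not compiled): because
 * dup_token() advances *buf past the token it copied, successive
 * calls walk the whitespace-separated fields of an "add" request.
 */
#if 0
        const char *buf = "rbd foo";
        char *pool = dup_token(&buf, NULL);     /* pool = "rbd", buf -> " foo" */
        char *image = dup_token(&buf, NULL);    /* image = "foo", buf -> "" */
#endif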
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * storage:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_id>
 *      An optional snapshot id.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot id is
 *      provided.  Snapshot mappings are always read-only.
 */
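/*
 * Example (all values hypothetical): a mapping request for image "foo"
 * in pool "rbd", at snapshot "snap1", via one monitor, might be
 * written as
 *
 *      $ echo "1.2.3.4:6789 name=admin rbd foo snap1" > /sys/bus/rbd/add
 *
 * See Documentation/ABI/testing/sysfs-bus-rbd for the authoritative
 * interface description.
 */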
static int rbd_add_parse_args(const char *buf,
                                struct ceph_options **ceph_opts,
                                struct rbd_options **opts,
                                struct rbd_spec **rbd_spec)
{
        size_t len;
        char *options;
        const char *mon_addrs;
        char *snap_name;
        size_t mon_addrs_size;
        struct rbd_spec *spec = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct ceph_options *copts;
        int ret;

        /* The first four tokens are required */

        len = next_token(&buf);
        if (!len) {
                rbd_warn(NULL, "no monitor address(es) provided");
                return -EINVAL;
        }
        mon_addrs = buf;
        mon_addrs_size = len + 1;
        buf += len;

        ret = -EINVAL;
        options = dup_token(&buf, NULL);
        if (!options)
                return -ENOMEM;
        if (!*options) {
                rbd_warn(NULL, "no options provided");
                goto out_err;
        }

        spec = rbd_spec_alloc();
        if (!spec)
                goto out_mem;

        spec->pool_name = dup_token(&buf, NULL);
        if (!spec->pool_name)
                goto out_mem;
        if (!*spec->pool_name) {
                rbd_warn(NULL, "no pool name provided");
                goto out_err;
        }

        spec->image_name = dup_token(&buf, NULL);
        if (!spec->image_name)
                goto out_mem;
        if (!*spec->image_name) {
                rbd_warn(NULL, "no image name provided");
                goto out_err;
        }

        /*
         * Snapshot name is optional; default is to use "-"
         * (indicating the head/no snapshot).
         */
        len = next_token(&buf);
        if (!len) {
                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
                ret = -ENAMETOOLONG;
                goto out_err;
        }
        snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
        if (!snap_name)
                goto out_mem;
        *(snap_name + len) = '\0';
        spec->snap_name = snap_name;

        /* Initialize all rbd options to the defaults */

        rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
        if (!rbd_opts)
                goto out_mem;

        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        copts = ceph_parse_options(options, mon_addrs,
                                        mon_addrs + mon_addrs_size - 1,
                                        parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(copts)) {
                ret = PTR_ERR(copts);
                goto out_err;
        }
        kfree(options);

        *ceph_opts = copts;
        *opts = rbd_opts;
        *rbd_spec = spec;

        return 0;
out_mem:
        ret = -ENOMEM;
out_err:
        kfree(rbd_opts);
        rbd_spec_put(spec);
        kfree(options);

        return ret;
}
/*
 * Return pool id (>= 0) or a negative error code.
 */
static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
{
        unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
        u64 newest_epoch;
        int tries = 0;
        int ret;

again:
        ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
        if (ret == -ENOENT && tries++ < 1) {
                ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
                                               &newest_epoch);
                if (ret < 0)
                        return ret;

                if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
                        ceph_monc_request_next_osdmap(&rbdc->client->monc);
                        (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
                                                     newest_epoch, timeout);
                        goto again;
                } else {
                        /* the osdmap we have is new enough */
                        return -ENOENT;
                }
        }

        return ret;
}
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        char *image_id;

        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
         * need to fetch the image id again in this case.  We
         * do still need to set the image format though.
         */
        if (rbd_dev->spec->image_id) {
                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

                return 0;
        }

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /* If it doesn't exist we'll assume it's a format 1 image */

        ret = rbd_obj_method_sync(rbd_dev, object_name,
                                "rbd", "get_id", NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret == -ENOENT) {
                image_id = kstrdup("", GFP_KERNEL);
                ret = image_id ? 0 : -ENOMEM;
                if (!ret)
                        rbd_dev->image_format = 1;
        } else if (ret >= 0) {
                void *p = response;

                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
                ret = PTR_ERR_OR_ZERO(image_id);
                if (!ret)
                        rbd_dev->image_format = 2;
        }

        if (!ret) {
                rbd_dev->spec->image_id = image_id;
                dout("image_id is %s\n", image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}
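/*
 * Concretely (names and id made up for illustration): for a format 2
 * image named "foo", the id object is "rbd_id.foo" (RBD_ID_PREFIX
 * plus the image name), and its "get_id" method returns an encoded
 * string such as "10116b8b4567", which becomes spec->image_id.
 */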
/*
 * Undo whatever state changes are made by v1 or v2 header info
 * routines.
 */
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
        struct rbd_image_header *header;

        rbd_dev_parent_put(rbd_dev);

        /* Free dynamic fields from the header, then zero it out */

        header = &rbd_dev->header;
        ceph_put_snap_context(header->snapc);
        kfree(header->snap_sizes);
        kfree(header->snap_names);
        kfree(header->object_prefix);
        memset(header, 0, sizeof (*header));
}
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret)
                goto out_err;

        /*
         * Get and check the features for the image.  Currently the
         * features are assumed to never change.
         */
        ret = rbd_dev_v2_features(rbd_dev);
        if (ret)
                goto out_err;

        /* If the image supports fancy striping, get its parameters */

        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
                ret = rbd_dev_v2_striping_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }

        /* No support for crypto and compression type format 2 images */

        return 0;
out_err:
        rbd_dev->header.features = 0;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
        struct rbd_device *parent = NULL;
        int ret;

        if (!rbd_dev->parent_spec)
                return 0;

        parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
        if (!parent)
                return -ENOMEM;

        /*
         * Images related by parent/child relationships always share
         * rbd_client and spec/parent_spec, so bump their refcounts.
         */
        __rbd_get_client(rbd_dev->rbd_client);
        rbd_spec_get(rbd_dev->parent_spec);

        ret = rbd_dev_image_probe(parent, false);
        if (ret < 0)
                goto out_err;

        rbd_dev->parent = parent;
        atomic_set(&rbd_dev->parent_ref, 1);

        return 0;

out_err:
        rbd_dev_unparent(rbd_dev);
        rbd_dev_destroy(parent);

        return ret;
}
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
        int ret;

        /* Get an id and fill in device name. */

        ret = rbd_dev_id_get(rbd_dev);
        if (ret)
                return ret;

        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Record our major and minor device numbers. */

        if (!single_major) {
                ret = register_blkdev(0, rbd_dev->name);
                if (ret < 0)
                        goto err_out_id;
                rbd_dev->major = ret;
                rbd_dev->minor = 0;
        } else {
                rbd_dev->major = rbd_major;
                rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
        }

        /* Set up the blkdev mapping. */

        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                goto err_out_disk;

        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_mapping;

        /* Everything's ready.  Announce the disk to the world. */

        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

err_out_mapping:
        rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);

        return ret;
}
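/*
 * Worked example (assuming the rbd_dev_id_to_minor() mapping used
 * above): with single_major=Y, the device with dev id 2 is named
 * "rbd2" and gets minor 2 << RBD_SINGLE_MAJOR_PART_SHIFT == 32 on the
 * shared major, leaving minors 33-47 for its partitions.  Without
 * single_major, each device instead registers its own major and uses
 * minor 0.
 */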
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
        struct rbd_spec *spec = rbd_dev->spec;
        size_t size;

        /* Record the header object name for this rbd image. */

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        if (rbd_dev->image_format == 1)
                size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
        else
                size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;

        if (rbd_dev->image_format == 1)
                sprintf(rbd_dev->header_name, "%s%s",
                        spec->image_name, RBD_SUFFIX);
        else
                sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, spec->image_id);

        return 0;
}
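/*
 * Concretely (names and id made up for illustration): a format 1
 * image "foo" gets header object "foo.rbd" (RBD_SUFFIX appended),
 * while a format 2 image with id "10116b8b4567" gets header object
 * "rbd_header.10116b8b4567" (RBD_HEADER_PREFIX prepended).  The
 * prefix and suffix values are defined in rbd_types.h.
 */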
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
        rbd_dev_unprobe(rbd_dev);
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        rbd_dev_destroy(rbd_dev);
}
/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
{
        int ret;

        /*
         * Get the id from the image id object.  Unless there's an
         * error, rbd_dev->spec->image_id will be filled in with
         * a dynamically-allocated string, and rbd_dev->image_format
         * will be set to either 1 or 2.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;

        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
                goto err_out_format;

        if (mapping) {
                ret = rbd_dev_header_watch_sync(rbd_dev);
                if (ret)
                        goto out_header_name;
        }

        ret = rbd_dev_header_info(rbd_dev);
        if (ret)
                goto err_out_watch;

        /*
         * If this image is the one being mapped, we have pool name and
         * id, image name and id, and snap name - need to fill snap id.
         * Otherwise this is a parent image, identified by pool, image
         * and snap ids - need to fill in names for those ids.
         */
        if (mapping)
                ret = rbd_spec_fill_snap_id(rbd_dev);
        else
                ret = rbd_spec_fill_names(rbd_dev);
        if (ret)
                goto err_out_probe;

        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
                        goto err_out_probe;

                /*
                 * Need to warn users if this image is the one being
                 * mapped and has a parent.
                 */
                if (mapping && rbd_dev->parent_spec)
                        rbd_warn(rbd_dev,
                                 "WARNING: kernel layering is EXPERIMENTAL!");
        }

        ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
                goto err_out_probe;

        dout("discovered format %u image, header name is %s\n",
                rbd_dev->image_format, rbd_dev->header_name);
        return 0;

err_out_probe:
        rbd_dev_unprobe(rbd_dev);
err_out_watch:
        if (mapping)
                rbd_dev_header_unwatch_sync(rbd_dev);
out_header_name:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        return ret;
}
static ssize_t do_rbd_add(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        bool read_only;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;
        read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }

        /* pick the pool */
        rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;

        /* The ceph file layout needs to fit pool id in 32 bits */

        if (spec->pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "pool id too large (%llu > %u)",
                                (unsigned long long)spec->pool_id, U32_MAX);
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev)
                goto err_out_client;
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rc = rbd_dev_image_probe(rbd_dev, true);
        if (rc < 0)
                goto err_out_rbd_dev;

        /* If we are mapping a snapshot it must be marked read-only */

        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                read_only = true;
        rbd_dev->mapping.read_only = read_only;

        rc = rbd_dev_device_setup(rbd_dev);
        if (rc) {
                /*
                 * rbd_dev_header_unwatch_sync() can't be moved into
                 * rbd_dev_image_release() without refactoring, see
                 * commit 1f3ef78861ac.
                 */
                rbd_dev_header_unwatch_sync(rbd_dev);
                rbd_dev_image_release(rbd_dev);
                goto err_out_module;
        }

        return count;

err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);

        return (ssize_t)rc;
}
static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
{
        if (single_major)
                return -EINVAL;

        return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
                                    size_t count)
{
        return do_rbd_add(bus, buf, count);
}
static void rbd_dev_device_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        rbd_dev_mapping_clear(rbd_dev);
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev_id_put(rbd_dev);
}
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
        while (rbd_dev->parent) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
                first->parent = NULL;
                first->parent_overlap = 0;
                rbd_assert(first->parent_spec);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
        }
}
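/*
 * For example (hypothetical chain): if the mapped image has parent P1
 * and grandparent P2, the inner loop walks down to P2 first, so the
 * chain is torn down bottom-up: P2 is released, then P1, and finally
 * the mapped image's own parent linkage is cleared.
 */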
static ssize_t do_rbd_remove(struct bus_type *bus,
                             const char *buf,
                             size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct list_head *tmp;
        int dev_id;
        unsigned long ul;
        bool already = false;
        int ret;

        ret = kstrtoul(buf, 10, &ul);
        if (ret)
                return ret;

        /* convert to int; abort if we lost anything in the conversion */
        dev_id = (int)ul;
        if (dev_id != ul)
                return -EINVAL;

        ret = -ENOENT;
        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        ret = 0;
                        break;
                }
        }
        if (!ret) {
                spin_lock_irq(&rbd_dev->lock);
                if (rbd_dev->open_count)
                        ret = -EBUSY;
                else
                        already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
                                                        &rbd_dev->flags);
                spin_unlock_irq(&rbd_dev->lock);
        }
        spin_unlock(&rbd_dev_list_lock);
        if (ret < 0 || already)
                return ret;

        rbd_dev_header_unwatch_sync(rbd_dev);
        /*
         * flush remaining watch callbacks - these must be complete
         * before the osd_client is shutdown
         */
        dout("%s: flushing notifies", __func__);
        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);

        /*
         * Don't free anything from rbd_dev->disk until after all
         * notifies are completely processed.  Otherwise
         * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
         * in a potential use after free of rbd_dev->disk or rbd_dev.
         */
        rbd_bus_del_dev(rbd_dev);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);

        return count;
}
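/*
 * Example (hypothetical id): a device mapped as /dev/rbd0 is unmapped
 * by writing its dev id back to the bus:
 *
 *      $ echo 0 > /sys/bus/rbd/remove
 *
 * (or to /sys/bus/rbd/remove_single_major when single_major=Y).  See
 * Documentation/ABI/testing/sysfs-bus-rbd.
 */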
static ssize_t rbd_remove(struct bus_type *bus, const char *buf, size_t count)
{
        if (single_major)
                return -EINVAL;

        return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
                                       size_t count)
{
        return do_rbd_remove(bus, buf, count);
}
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
static int rbd_slab_init(void)
{
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = kmem_cache_create("rbd_img_request",
                                        sizeof (struct rbd_img_request),
                                        __alignof__(struct rbd_img_request),
                                        0, NULL);
        if (!rbd_img_request_cache)
                return -ENOMEM;

        rbd_assert(!rbd_obj_request_cache);
        rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
                                        sizeof (struct rbd_obj_request),
                                        __alignof__(struct rbd_obj_request),
                                        0, NULL);
        if (!rbd_obj_request_cache)
                goto out_err;

        rbd_assert(!rbd_segment_name_cache);
        rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
                                        CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
        if (rbd_segment_name_cache)
                return 0;
out_err:
        if (rbd_obj_request_cache) {
                kmem_cache_destroy(rbd_obj_request_cache);
                rbd_obj_request_cache = NULL;
        }

        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;

        return -ENOMEM;
}
static void rbd_slab_exit(void)
{
        rbd_assert(rbd_segment_name_cache);
        kmem_cache_destroy(rbd_segment_name_cache);
        rbd_segment_name_cache = NULL;

        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
}
static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");
                return -EINVAL;
        }

        rc = rbd_slab_init();
        if (rc)
                return rc;

        /*
         * The number of active work items is limited by the number of
         * rbd devices, so leave @max_active at default.
         */
        rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
        if (!rbd_wq) {
                rc = -ENOMEM;
                goto err_out_slab;
        }

        if (single_major) {
                rbd_major = register_blkdev(0, RBD_DRV_NAME);
                if (rbd_major < 0) {
                        rc = rbd_major;
                        goto err_out_wq;
                }
        }

        rc = rbd_sysfs_init();
        if (rc)
                goto err_out_blkdev;

        if (single_major)
                pr_info("loaded (major %d)\n", rbd_major);
        else
                pr_info("loaded\n");

        return 0;

err_out_blkdev:
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
        destroy_workqueue(rbd_wq);
err_out_slab:
        rbd_slab_exit();
        return rc;
}
static void __exit rbd_exit(void)
{
        ida_destroy(&rbd_dev_id_ida);
        rbd_sysfs_cleanup();
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
        destroy_workqueue(rbd_wq);
        rbd_slab_exit();
}
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");