snap.c 27.2 KB
Newer Older
1
#include <linux/ceph/ceph_debug.h>
S
Sage Weil 已提交
2 3

#include <linux/sort.h>
4
#include <linux/slab.h>
S
Sage Weil 已提交
5 6

#include "super.h"
7 8 9
#include "mds_client.h"

#include <linux/ceph/decode.h>
S
Sage Weil 已提交
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72

/*
 * Snapshots in ceph are driven in large part by cooperation from the
 * client.  In contrast to local file systems or file servers that
 * implement snapshots at a single point in the system, ceph's
 * distributed access to storage requires clients to help decide
 * whether a write logically occurs before or after a recently created
 * snapshot.
 *
 * This provides a perfect instantanous client-wide snapshot.  Between
 * clients, however, snapshots may appear to be applied at slightly
 * different points in time, depending on delays in delivering the
 * snapshot notification.
 *
 * Snapshots are _not_ file system-wide.  Instead, each snapshot
 * applies to the subdirectory nested beneath some directory.  This
 * effectively divides the hierarchy into multiple "realms," where all
 * of the files contained by each realm share the same set of
 * snapshots.  An individual realm's snap set contains snapshots
 * explicitly created on that realm, as well as any snaps in its
 * parent's snap set _after_ the point at which the parent became it's
 * parent (due to, say, a rename).  Similarly, snaps from prior parents
 * during the time intervals during which they were the parent are included.
 *
 * The client is spared most of this detail, fortunately... it must only
 * maintains a hierarchy of realms reflecting the current parent/child
 * realm relationship, and for each realm has an explicit list of snaps
 * inherited from prior parents.
 *
 * A snap_realm struct is maintained for realms containing every inode
 * with an open cap in the system.  (The needed snap realm information is
 * provided by the MDS whenever a cap is issued, i.e., on open.)  A 'seq'
 * version number is used to ensure that as realm parameters change (new
 * snapshot, new parent, etc.) the client's realm hierarchy is updated.
 *
 * The realm hierarchy drives the generation of a 'snap context' for each
 * realm, which simply lists the resulting set of snaps for the realm.  This
 * is attached to any writes sent to OSDs.
 */
/*
 * Unfortunately error handling is a bit mixed here.  If we get a snap
 * update, but don't have enough memory to update our realm hierarchy,
 * it's not clear what we can do about it (besides complaining to the
 * console).
 */


/*
 * increase ref count for the realm
 *
 * caller must hold snap_rwsem for write.
 */
void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
			 struct ceph_snap_realm *realm)
{
	dout("get_realm %p %d -> %d\n", realm,
	     atomic_read(&realm->nref), atomic_read(&realm->nref)+1);
	/*
	 * since we _only_ increment realm refs or empty the empty
	 * list with snap_rwsem held, adjusting the empty list here is
	 * safe.  we do need to protect against concurrent empty list
	 * additions, however.
	 */
73
	if (atomic_inc_return(&realm->nref) == 1) {
S
Sage Weil 已提交
74 75 76 77 78 79
		spin_lock(&mdsc->snap_empty_lock);
		list_del_init(&realm->empty_item);
		spin_unlock(&mdsc->snap_empty_lock);
	}
}

S
Sage Weil 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
static void __insert_snap_realm(struct rb_root *root,
				struct ceph_snap_realm *new)
{
	struct rb_node **p = &root->rb_node;
	struct rb_node *parent = NULL;
	struct ceph_snap_realm *r = NULL;

	while (*p) {
		parent = *p;
		r = rb_entry(parent, struct ceph_snap_realm, node);
		if (new->ino < r->ino)
			p = &(*p)->rb_left;
		else if (new->ino > r->ino)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, root);
}

S
Sage Weil 已提交
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
/*
 * create and get the realm rooted at @ino and bump its ref count.
 *
 * caller must hold snap_rwsem for write.
 */
static struct ceph_snap_realm *ceph_create_snap_realm(
	struct ceph_mds_client *mdsc,
	u64 ino)
{
	struct ceph_snap_realm *realm;

	realm = kzalloc(sizeof(*realm), GFP_NOFS);
	if (!realm)
		return ERR_PTR(-ENOMEM);

117
	atomic_set(&realm->nref, 1);    /* for caller */
S
Sage Weil 已提交
118 119 120 121
	realm->ino = ino;
	INIT_LIST_HEAD(&realm->children);
	INIT_LIST_HEAD(&realm->child_item);
	INIT_LIST_HEAD(&realm->empty_item);
S
Sage Weil 已提交
122
	INIT_LIST_HEAD(&realm->dirty_item);
S
Sage Weil 已提交
123 124
	INIT_LIST_HEAD(&realm->inodes_with_caps);
	spin_lock_init(&realm->inodes_with_caps_lock);
S
Sage Weil 已提交
125
	__insert_snap_realm(&mdsc->snap_realms, realm);
S
Sage Weil 已提交
126 127 128 129 130
	dout("create_snap_realm %llx %p\n", realm->ino, realm);
	return realm;
}

/*
S
Sage Weil 已提交
131
 * lookup the realm rooted at @ino.
S
Sage Weil 已提交
132 133 134
 *
 * caller must hold snap_rwsem for write.
 */
135 136
static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
						   u64 ino)
S
Sage Weil 已提交
137
{
S
Sage Weil 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
	struct rb_node *n = mdsc->snap_realms.rb_node;
	struct ceph_snap_realm *r;

	while (n) {
		r = rb_entry(n, struct ceph_snap_realm, node);
		if (ino < r->ino)
			n = n->rb_left;
		else if (ino > r->ino)
			n = n->rb_right;
		else {
			dout("lookup_snap_realm %llx %p\n", r->ino, r);
			return r;
		}
	}
	return NULL;
S
Sage Weil 已提交
153 154
}

155 156 157 158 159 160 161 162 163 164
struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
					       u64 ino)
{
	struct ceph_snap_realm *r;
	r = __lookup_snap_realm(mdsc, ino);
	if (r)
		ceph_get_snap_realm(mdsc, r);
	return r;
}

S
Sage Weil 已提交
165 166 167 168 169 170 171 172 173 174 175
static void __put_snap_realm(struct ceph_mds_client *mdsc,
			     struct ceph_snap_realm *realm);

/*
 * called with snap_rwsem (write)
 */
static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
				 struct ceph_snap_realm *realm)
{
	dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);

S
Sage Weil 已提交
176
	rb_erase(&realm->node, &mdsc->snap_realms);
S
Sage Weil 已提交
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216

	if (realm->parent) {
		list_del_init(&realm->child_item);
		__put_snap_realm(mdsc, realm->parent);
	}

	kfree(realm->prior_parent_snaps);
	kfree(realm->snaps);
	ceph_put_snap_context(realm->cached_context);
	kfree(realm);
}

/*
 * caller holds snap_rwsem (write)
 */
static void __put_snap_realm(struct ceph_mds_client *mdsc,
			     struct ceph_snap_realm *realm)
{
	dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
	     atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
	if (atomic_dec_and_test(&realm->nref))
		__destroy_snap_realm(mdsc, realm);
}

/*
 * caller needn't hold any locks
 */
void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
			 struct ceph_snap_realm *realm)
{
	dout("put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
	     atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
	if (!atomic_dec_and_test(&realm->nref))
		return;

	if (down_write_trylock(&mdsc->snap_rwsem)) {
		__destroy_snap_realm(mdsc, realm);
		up_write(&mdsc->snap_rwsem);
	} else {
		spin_lock(&mdsc->snap_empty_lock);
217
		list_add(&realm->empty_item, &mdsc->snap_empty);
S
Sage Weil 已提交
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
		spin_unlock(&mdsc->snap_empty_lock);
	}
}

/*
 * Clean up any realms whose ref counts have dropped to zero.  Note
 * that this does not include realms who were created but not yet
 * used.
 *
 * Called under snap_rwsem (write)
 */
static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
{
	struct ceph_snap_realm *realm;

	spin_lock(&mdsc->snap_empty_lock);
	while (!list_empty(&mdsc->snap_empty)) {
		realm = list_first_entry(&mdsc->snap_empty,
				   struct ceph_snap_realm, empty_item);
		list_del(&realm->empty_item);
		spin_unlock(&mdsc->snap_empty_lock);
		__destroy_snap_realm(mdsc, realm);
		spin_lock(&mdsc->snap_empty_lock);
	}
	spin_unlock(&mdsc->snap_empty_lock);
}

void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc)
{
	down_write(&mdsc->snap_rwsem);
	__cleanup_empty_realms(mdsc);
	up_write(&mdsc->snap_rwsem);
}

/*
 * adjust the parent realm of a given @realm.  adjust child list, and parent
 * pointers, and ref counts appropriately.
 *
 * return true if parent was changed, 0 if unchanged, <0 on error.
 *
 * caller must hold snap_rwsem for write.
 */
static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
				    struct ceph_snap_realm *realm,
				    u64 parentino)
{
	struct ceph_snap_realm *parent;

	if (realm->parent_ino == parentino)
		return 0;

	parent = ceph_lookup_snap_realm(mdsc, parentino);
	if (!parent) {
		parent = ceph_create_snap_realm(mdsc, parentino);
		if (IS_ERR(parent))
			return PTR_ERR(parent);
	}
	dout("adjust_snap_realm_parent %llx %p: %llx %p -> %llx %p\n",
	     realm->ino, realm, realm->parent_ino, realm->parent,
	     parentino, parent);
	if (realm->parent) {
		list_del_init(&realm->child_item);
		ceph_put_snap_realm(mdsc, realm->parent);
	}
	realm->parent_ino = parentino;
	realm->parent = parent;
	list_add(&realm->child_item, &parent->children);
	return 1;
}


static int cmpu64_rev(const void *a, const void *b)
{
	if (*(u64 *)a < *(u64 *)b)
		return 1;
	if (*(u64 *)a > *(u64 *)b)
		return -1;
	return 0;
}

298

S
Sage Weil 已提交
299 300 301 302 303 304 305 306
/*
 * build the snap context for a given realm.
 */
static int build_snap_context(struct ceph_snap_realm *realm)
{
	struct ceph_snap_realm *parent = realm->parent;
	struct ceph_snap_context *snapc;
	int err = 0;
307
	u32 num = realm->num_prior_parent_snaps + realm->num_snaps;
S
Sage Weil 已提交
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327

	/*
	 * build parent context, if it hasn't been built.
	 * conservatively estimate that all parent snaps might be
	 * included by us.
	 */
	if (parent) {
		if (!parent->cached_context) {
			err = build_snap_context(parent);
			if (err)
				goto fail;
		}
		num += parent->cached_context->num_snaps;
	}

	/* do i actually need to update?  not if my context seq
	   matches realm seq, and my parents' does to.  (this works
	   because we rebuild_snap_realms() works _downward_ in
	   hierarchy after each update.) */
	if (realm->cached_context &&
S
Sage Weil 已提交
328
	    realm->cached_context->seq == realm->seq &&
S
Sage Weil 已提交
329
	    (!parent ||
S
Sage Weil 已提交
330
	     realm->cached_context->seq >= parent->cached_context->seq)) {
331
		dout("build_snap_context %llx %p: %p seq %lld (%u snaps)"
S
Sage Weil 已提交
332 333 334
		     " (unchanged)\n",
		     realm->ino, realm, realm->cached_context,
		     realm->cached_context->seq,
335
		     (unsigned int) realm->cached_context->num_snaps);
S
Sage Weil 已提交
336 337 338 339 340
		return 0;
	}

	/* alloc new snap context */
	err = -ENOMEM;
X
Xi Wang 已提交
341
	if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
S
Sage Weil 已提交
342
		goto fail;
343
	snapc = ceph_create_snap_context(num, GFP_NOFS);
S
Sage Weil 已提交
344 345 346 347 348 349 350
	if (!snapc)
		goto fail;

	/* build (reverse sorted) snap vector */
	num = 0;
	snapc->seq = realm->seq;
	if (parent) {
351 352
		u32 i;

L
Lucas De Marchi 已提交
353
		/* include any of parent's snaps occurring _after_ my
S
Sage Weil 已提交
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
		   parent became my parent */
		for (i = 0; i < parent->cached_context->num_snaps; i++)
			if (parent->cached_context->snaps[i] >=
			    realm->parent_since)
				snapc->snaps[num++] =
					parent->cached_context->snaps[i];
		if (parent->cached_context->seq > snapc->seq)
			snapc->seq = parent->cached_context->seq;
	}
	memcpy(snapc->snaps + num, realm->snaps,
	       sizeof(u64)*realm->num_snaps);
	num += realm->num_snaps;
	memcpy(snapc->snaps + num, realm->prior_parent_snaps,
	       sizeof(u64)*realm->num_prior_parent_snaps);
	num += realm->num_prior_parent_snaps;

	sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
	snapc->num_snaps = num;
372 373 374
	dout("build_snap_context %llx %p: %p seq %lld (%u snaps)\n",
	     realm->ino, realm, snapc, snapc->seq,
	     (unsigned int) snapc->num_snaps);
S
Sage Weil 已提交
375

376
	ceph_put_snap_context(realm->cached_context);
S
Sage Weil 已提交
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
	realm->cached_context = snapc;
	return 0;

fail:
	/*
	 * if we fail, clear old (incorrect) cached_context... hopefully
	 * we'll have better luck building it later
	 */
	if (realm->cached_context) {
		ceph_put_snap_context(realm->cached_context);
		realm->cached_context = NULL;
	}
	pr_err("build_snap_context %llx %p fail %d\n", realm->ino,
	       realm, err);
	return err;
}

/*
 * rebuild snap context for the given realm and all of its children.
 */
static void rebuild_snap_realms(struct ceph_snap_realm *realm)
{
	struct ceph_snap_realm *child;

	dout("rebuild_snap_realms %llx %p\n", realm->ino, realm);
	build_snap_context(realm);

	list_for_each_entry(child, &realm->children, child_item)
		rebuild_snap_realms(child);
}


/*
 * helper to allocate and decode an array of snapids.  free prior
 * instance, if any.
 */
413
static int dup_array(u64 **dst, __le64 *src, u32 num)
S
Sage Weil 已提交
414
{
415
	u32 i;
S
Sage Weil 已提交
416 417 418 419 420 421 422 423 424 425 426 427 428 429

	kfree(*dst);
	if (num) {
		*dst = kcalloc(num, sizeof(u64), GFP_NOFS);
		if (!*dst)
			return -ENOMEM;
		for (i = 0; i < num; i++)
			(*dst)[i] = get_unaligned_le64(src + i);
	} else {
		*dst = NULL;
	}
	return 0;
}

430 431 432 433 434 435 436 437
static bool has_new_snaps(struct ceph_snap_context *o,
			  struct ceph_snap_context *n)
{
	if (n->num_snaps == 0)
		return false;
	/* snaps are in descending order */
	return n->snaps[0] > o->seq;
}
S
Sage Weil 已提交
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452

/*
 * When a snapshot is applied, the size/mtime inode metadata is queued
 * in a ceph_cap_snap (one for each snapshot) until writeback
 * completes and the metadata can be flushed back to the MDS.
 *
 * However, if a (sync) write is currently in-progress when we apply
 * the snapshot, we have to wait until the write succeeds or fails
 * (and a final size/mtime is known).  In this case the
 * cap_snap->writing = 1, and is said to be "pending."  When the write
 * finishes, we __ceph_finish_cap_snap().
 *
 * Caller must hold snap_rwsem for read (i.e., the realm topology won't
 * change).
 */
453
void ceph_queue_cap_snap(struct ceph_inode_info *ci)
S
Sage Weil 已提交
454 455 456
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap_snap *capsnap;
457
	struct ceph_snap_context *old_snapc, *new_snapc;
458
	int used, dirty;
S
Sage Weil 已提交
459 460 461 462 463 464 465

	capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
	if (!capsnap) {
		pr_err("ENOMEM allocating ceph_cap_snap on %p\n", inode);
		return;
	}

466
	spin_lock(&ci->i_ceph_lock);
S
Sage Weil 已提交
467
	used = __ceph_caps_used(ci);
468
	dirty = __ceph_caps_dirty(ci);
469

470
	old_snapc = ci->i_head_snapc;
471
	new_snapc = ci->i_snap_realm->cached_context;
472

473 474 475 476 477 478 479 480
	/*
	 * If there is a write in progress, treat that as a dirty Fw,
	 * even though it hasn't completed yet; by the time we finish
	 * up this capsnap it will be.
	 */
	if (used & CEPH_CAP_FILE_WR)
		dirty |= CEPH_CAP_FILE_WR;

S
Sage Weil 已提交
481 482 483 484 485
	if (__ceph_have_pending_cap_snap(ci)) {
		/* there is no point in queuing multiple "pending" cap_snaps,
		   as no new writes are allowed to start when pending, so any
		   writes in progress now were started before the previous
		   cap_snap.  lucky us. */
486
		dout("queue_cap_snap %p already pending\n", inode);
487 488
		goto update_snapc;
	}
489 490
	if (ci->i_wrbuffer_ref_head == 0 &&
	    !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) {
491 492 493
		dout("queue_cap_snap %p nothing dirty|writing\n", inode);
		goto update_snapc;
	}
S
Sage Weil 已提交
494

495
	BUG_ON(!old_snapc);
Y
Yan, Zheng 已提交
496

497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519
	/*
	 * There is no need to send FLUSHSNAP message to MDS if there is
	 * no new snapshot. But when there is dirty pages or on-going
	 * writes, we still need to create cap_snap. cap_snap is needed
	 * by the write path and page writeback path.
	 *
	 * also see ceph_try_drop_cap_snap()
	 */
	if (has_new_snaps(old_snapc, new_snapc)) {
		if (dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))
			capsnap->need_flush = true;
	} else {
		if (!(used & CEPH_CAP_FILE_WR) &&
		    ci->i_wrbuffer_ref_head == 0) {
			dout("queue_cap_snap %p "
			     "no new_snap|dirty_page|writing\n", inode);
			goto update_snapc;
		}
	}

	dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n",
	     inode, capsnap, old_snapc, ceph_cap_string(dirty),
	     capsnap->need_flush ? "" : "no_flush");
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537
	ihold(inode);

	atomic_set(&capsnap->nref, 1);
	INIT_LIST_HEAD(&capsnap->ci_item);

	capsnap->follows = old_snapc->seq;
	capsnap->issued = __ceph_caps_issued(ci, NULL);
	capsnap->dirty = dirty;

	capsnap->mode = inode->i_mode;
	capsnap->uid = inode->i_uid;
	capsnap->gid = inode->i_gid;

	if (dirty & CEPH_CAP_XATTR_EXCL) {
		__ceph_build_xattrs_blob(ci);
		capsnap->xattr_blob =
			ceph_buffer_get(ci->i_xattrs.blob);
		capsnap->xattr_version = ci->i_xattrs.version;
S
Sage Weil 已提交
538
	} else {
539 540
		capsnap->xattr_blob = NULL;
		capsnap->xattr_version = 0;
S
Sage Weil 已提交
541 542
	}

543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562
	capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;

	/* dirty page count moved from _head to this cap_snap;
	   all subsequent writes page dirties occur _after_ this
	   snapshot. */
	capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
	ci->i_wrbuffer_ref_head = 0;
	capsnap->context = old_snapc;
	list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);

	if (used & CEPH_CAP_FILE_WR) {
		dout("queue_cap_snap %p cap_snap %p snapc %p"
		     " seq %llu used WR, now pending\n", inode,
		     capsnap, old_snapc, old_snapc->seq);
		capsnap->writing = 1;
	} else {
		/* note mtime, size NOW. */
		__ceph_finish_cap_snap(ci, capsnap);
	}
	capsnap = NULL;
563
	old_snapc = NULL;
564 565 566

update_snapc:
	if (ci->i_head_snapc) {
567 568
		ci->i_head_snapc = ceph_get_snap_context(new_snapc);
		dout(" new snapc is %p\n", new_snapc);
569
	}
570
	spin_unlock(&ci->i_ceph_lock);
571 572 573

	kfree(capsnap);
	ceph_put_snap_context(old_snapc);
S
Sage Weil 已提交
574 575 576 577 578 579 580 581
}

/*
 * Finalize the size, mtime for a cap_snap.. that is, settle on final values
 * to be used for the snapshot, to be flushed back to the mds.
 *
 * If capsnap can now be flushed, add to snap_flush list, and return 1.
 *
582
 * Caller must hold i_ceph_lock.
S
Sage Weil 已提交
583 584 585 586 587
 */
int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
			    struct ceph_cap_snap *capsnap)
{
	struct inode *inode = &ci->vfs_inode;
588
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
S
Sage Weil 已提交
589 590 591 592 593 594 595 596

	BUG_ON(capsnap->writing);
	capsnap->size = inode->i_size;
	capsnap->mtime = inode->i_mtime;
	capsnap->atime = inode->i_atime;
	capsnap->ctime = inode->i_ctime;
	capsnap->time_warp_seq = ci->i_time_warp_seq;
	if (capsnap->dirty_pages) {
597
		dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
S
Sage Weil 已提交
598 599
		     "still has %d dirty pages\n", inode, capsnap,
		     capsnap->context, capsnap->context->seq,
600 601
		     ceph_cap_string(capsnap->dirty), capsnap->size,
		     capsnap->dirty_pages);
S
Sage Weil 已提交
602 603
		return 0;
	}
604
	dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
S
Sage Weil 已提交
605
	     inode, capsnap, capsnap->context,
606 607
	     capsnap->context->seq, ceph_cap_string(capsnap->dirty),
	     capsnap->size);
S
Sage Weil 已提交
608 609 610 611 612 613 614

	spin_lock(&mdsc->snap_flush_lock);
	list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
	spin_unlock(&mdsc->snap_flush_lock);
	return 1;  /* caller may want to ceph_flush_snaps */
}

615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633
/*
 * Queue cap_snaps for snap writeback for this realm and its children.
 * Called under snap_rwsem, so realm topology won't change.
 */
static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
{
	struct ceph_inode_info *ci;
	struct inode *lastinode = NULL;
	struct ceph_snap_realm *child;

	dout("queue_realm_cap_snaps %p %llx inodes\n", realm, realm->ino);

	spin_lock(&realm->inodes_with_caps_lock);
	list_for_each_entry(ci, &realm->inodes_with_caps,
			    i_snap_realm_item) {
		struct inode *inode = igrab(&ci->vfs_inode);
		if (!inode)
			continue;
		spin_unlock(&realm->inodes_with_caps_lock);
634
		iput(lastinode);
635 636 637 638 639
		lastinode = inode;
		ceph_queue_cap_snap(ci);
		spin_lock(&realm->inodes_with_caps_lock);
	}
	spin_unlock(&realm->inodes_with_caps_lock);
640
	iput(lastinode);
641

S
Sage Weil 已提交
642 643 644 645 646 647
	list_for_each_entry(child, &realm->children, child_item) {
		dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n",
		     realm, realm->ino, child, child->ino);
		list_del_init(&child->dirty_item);
		list_add(&child->dirty_item, &realm->dirty_item);
	}
648

S
Sage Weil 已提交
649
	list_del_init(&realm->dirty_item);
650 651
	dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
}
S
Sage Weil 已提交
652 653 654 655 656 657 658 659 660

/*
 * Parse and apply a snapblob "snap trace" from the MDS.  This specifies
 * the snap realm parameters from a given realm and all of its ancestors,
 * up to the root.
 *
 * Caller must hold snap_rwsem for write.
 */
int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
661 662
			   void *p, void *e, bool deletion,
			   struct ceph_snap_realm **realm_ret)
S
Sage Weil 已提交
663 664 665 666
{
	struct ceph_mds_snap_realm *ri;    /* encoded */
	__le64 *snaps;                     /* encoded */
	__le64 *prior_parent_snaps;        /* encoded */
667 668
	struct ceph_snap_realm *realm = NULL;
	struct ceph_snap_realm *first_realm = NULL;
S
Sage Weil 已提交
669 670
	int invalidate = 0;
	int err = -ENOMEM;
S
Sage Weil 已提交
671
	LIST_HEAD(dirty_realms);
S
Sage Weil 已提交
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700

	dout("update_snap_trace deletion=%d\n", deletion);
more:
	ceph_decode_need(&p, e, sizeof(*ri), bad);
	ri = p;
	p += sizeof(*ri);
	ceph_decode_need(&p, e, sizeof(u64)*(le32_to_cpu(ri->num_snaps) +
			    le32_to_cpu(ri->num_prior_parent_snaps)), bad);
	snaps = p;
	p += sizeof(u64) * le32_to_cpu(ri->num_snaps);
	prior_parent_snaps = p;
	p += sizeof(u64) * le32_to_cpu(ri->num_prior_parent_snaps);

	realm = ceph_lookup_snap_realm(mdsc, le64_to_cpu(ri->ino));
	if (!realm) {
		realm = ceph_create_snap_realm(mdsc, le64_to_cpu(ri->ino));
		if (IS_ERR(realm)) {
			err = PTR_ERR(realm);
			goto fail;
		}
	}

	/* ensure the parent is correct */
	err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
	if (err < 0)
		goto fail;
	invalidate += err;

	if (le64_to_cpu(ri->seq) > realm->seq) {
S
Sage Weil 已提交
701 702
		dout("update_snap_trace updating %llx %p %lld -> %lld\n",
		     realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
S
Sage Weil 已提交
703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719
		/* update realm parameters, snap lists */
		realm->seq = le64_to_cpu(ri->seq);
		realm->created = le64_to_cpu(ri->created);
		realm->parent_since = le64_to_cpu(ri->parent_since);

		realm->num_snaps = le32_to_cpu(ri->num_snaps);
		err = dup_array(&realm->snaps, snaps, realm->num_snaps);
		if (err < 0)
			goto fail;

		realm->num_prior_parent_snaps =
			le32_to_cpu(ri->num_prior_parent_snaps);
		err = dup_array(&realm->prior_parent_snaps, prior_parent_snaps,
				realm->num_prior_parent_snaps);
		if (err < 0)
			goto fail;

S
Sage Weil 已提交
720 721
		/* queue realm for cap_snap creation */
		list_add(&realm->dirty_item, &dirty_realms);
722 723
		if (realm->seq > mdsc->last_snap_seq)
			mdsc->last_snap_seq = realm->seq;
S
Sage Weil 已提交
724

S
Sage Weil 已提交
725 726
		invalidate = 1;
	} else if (!realm->cached_context) {
S
Sage Weil 已提交
727 728
		dout("update_snap_trace %llx %p seq %lld new\n",
		     realm->ino, realm, realm->seq);
S
Sage Weil 已提交
729
		invalidate = 1;
S
Sage Weil 已提交
730 731 732
	} else {
		dout("update_snap_trace %llx %p seq %lld unchanged\n",
		     realm->ino, realm, realm->seq);
S
Sage Weil 已提交
733 734 735 736 737 738
	}

	dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
	     realm, invalidate, p, e);

	/* invalidate when we reach the _end_ (root) of the trace */
739
	if (invalidate && p >= e)
S
Sage Weil 已提交
740 741
		rebuild_snap_realms(realm);

742 743 744 745 746 747 748 749
	if (!first_realm)
		first_realm = realm;
	else
		ceph_put_snap_realm(mdsc, realm);

	if (p < e)
		goto more;

S
Sage Weil 已提交
750 751 752 753
	/*
	 * queue cap snaps _after_ we've built the new snap contexts,
	 * so that i_head_snapc can be set appropriately.
	 */
S
Sage Weil 已提交
754 755 756
	while (!list_empty(&dirty_realms)) {
		realm = list_first_entry(&dirty_realms, struct ceph_snap_realm,
					 dirty_item);
S
Sage Weil 已提交
757 758 759
		queue_realm_cap_snaps(realm);
	}

760 761 762 763 764
	if (realm_ret)
		*realm_ret = first_realm;
	else
		ceph_put_snap_realm(mdsc, first_realm);

S
Sage Weil 已提交
765 766 767 768 769 770
	__cleanup_empty_realms(mdsc);
	return 0;

bad:
	err = -EINVAL;
fail:
771 772 773 774
	if (realm && !IS_ERR(realm))
		ceph_put_snap_realm(mdsc, realm);
	if (first_realm)
		ceph_put_snap_realm(mdsc, first_realm);
S
Sage Weil 已提交
775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797
	pr_err("update_snap_trace error %d\n", err);
	return err;
}


/*
 * Send any cap_snaps that are queued for flush.  Try to carry
 * s_mutex across multiple snap flushes to avoid locking overhead.
 *
 * Caller holds no locks.
 */
static void flush_snaps(struct ceph_mds_client *mdsc)
{
	struct ceph_inode_info *ci;
	struct inode *inode;
	struct ceph_mds_session *session = NULL;

	dout("flush_snaps\n");
	spin_lock(&mdsc->snap_flush_lock);
	while (!list_empty(&mdsc->snap_flush_list)) {
		ci = list_first_entry(&mdsc->snap_flush_list,
				struct ceph_inode_info, i_snap_flush_item);
		inode = &ci->vfs_inode;
798
		ihold(inode);
S
Sage Weil 已提交
799
		spin_unlock(&mdsc->snap_flush_lock);
800
		spin_lock(&ci->i_ceph_lock);
801
		__ceph_flush_snaps(ci, &session);
802
		spin_unlock(&ci->i_ceph_lock);
S
Sage Weil 已提交
803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827
		iput(inode);
		spin_lock(&mdsc->snap_flush_lock);
	}
	spin_unlock(&mdsc->snap_flush_lock);

	if (session) {
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
	}
	dout("flush_snaps done\n");
}


/*
 * Handle a snap notification from the MDS.
 *
 * This can take two basic forms: the simplest is just a snap creation
 * or deletion notification on an existing realm.  This should update the
 * realm and its children.
 *
 * The more difficult case is realm creation, due to snap creation at a
 * new point in the file hierarchy, or due to a rename that moves a file or
 * directory into another realm.
 */
void ceph_handle_snap(struct ceph_mds_client *mdsc,
828
		      struct ceph_mds_session *session,
S
Sage Weil 已提交
829 830
		      struct ceph_msg *msg)
{
831
	struct super_block *sb = mdsc->fsc->sb;
832
	int mds = session->s_mds;
S
Sage Weil 已提交
833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900
	u64 split;
	int op;
	int trace_len;
	struct ceph_snap_realm *realm = NULL;
	void *p = msg->front.iov_base;
	void *e = p + msg->front.iov_len;
	struct ceph_mds_snap_head *h;
	int num_split_inos, num_split_realms;
	__le64 *split_inos = NULL, *split_realms = NULL;
	int i;
	int locked_rwsem = 0;

	/* decode */
	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	h = p;
	op = le32_to_cpu(h->op);
	split = le64_to_cpu(h->split);   /* non-zero if we are splitting an
					  * existing realm */
	num_split_inos = le32_to_cpu(h->num_split_inos);
	num_split_realms = le32_to_cpu(h->num_split_realms);
	trace_len = le32_to_cpu(h->trace_len);
	p += sizeof(*h);

	dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds,
	     ceph_snap_op_name(op), split, trace_len);

	mutex_lock(&session->s_mutex);
	session->s_seq++;
	mutex_unlock(&session->s_mutex);

	down_write(&mdsc->snap_rwsem);
	locked_rwsem = 1;

	if (op == CEPH_SNAP_OP_SPLIT) {
		struct ceph_mds_snap_realm *ri;

		/*
		 * A "split" breaks part of an existing realm off into
		 * a new realm.  The MDS provides a list of inodes
		 * (with caps) and child realms that belong to the new
		 * child.
		 */
		split_inos = p;
		p += sizeof(u64) * num_split_inos;
		split_realms = p;
		p += sizeof(u64) * num_split_realms;
		ceph_decode_need(&p, e, sizeof(*ri), bad);
		/* we will peek at realm info here, but will _not_
		 * advance p, as the realm update will occur below in
		 * ceph_update_snap_trace. */
		ri = p;

		realm = ceph_lookup_snap_realm(mdsc, split);
		if (!realm) {
			realm = ceph_create_snap_realm(mdsc, split);
			if (IS_ERR(realm))
				goto out;
		}

		dout("splitting snap_realm %llx %p\n", realm->ino, realm);
		for (i = 0; i < num_split_inos; i++) {
			struct ceph_vino vino = {
				.ino = le64_to_cpu(split_inos[i]),
				.snap = CEPH_NOSNAP,
			};
			struct inode *inode = ceph_find_inode(sb, vino);
			struct ceph_inode_info *ci;
S
Sage Weil 已提交
901
			struct ceph_snap_realm *oldrealm;
S
Sage Weil 已提交
902 903 904 905 906

			if (!inode)
				continue;
			ci = ceph_inode(inode);

907
			spin_lock(&ci->i_ceph_lock);
S
Sage Weil 已提交
908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926
			if (!ci->i_snap_realm)
				goto skip_inode;
			/*
			 * If this inode belongs to a realm that was
			 * created after our new realm, we experienced
			 * a race (due to another split notifications
			 * arriving from a different MDS).  So skip
			 * this inode.
			 */
			if (ci->i_snap_realm->created >
			    le64_to_cpu(ri->created)) {
				dout(" leaving %p in newer realm %llx %p\n",
				     inode, ci->i_snap_realm->ino,
				     ci->i_snap_realm);
				goto skip_inode;
			}
			dout(" will move %p to split realm %llx %p\n",
			     inode, realm->ino, realm);
			/*
S
Sage Weil 已提交
927
			 * Move the inode to the new realm
S
Sage Weil 已提交
928
			 */
929
			spin_lock(&realm->inodes_with_caps_lock);
S
Sage Weil 已提交
930
			list_del_init(&ci->i_snap_realm_item);
S
Sage Weil 已提交
931 932 933 934
			list_add(&ci->i_snap_realm_item,
				 &realm->inodes_with_caps);
			oldrealm = ci->i_snap_realm;
			ci->i_snap_realm = realm;
935
			spin_unlock(&realm->inodes_with_caps_lock);
936
			spin_unlock(&ci->i_ceph_lock);
S
Sage Weil 已提交
937

S
Sage Weil 已提交
938 939
			ceph_get_snap_realm(mdsc, realm);
			ceph_put_snap_realm(mdsc, oldrealm);
S
Sage Weil 已提交
940 941 942 943 944

			iput(inode);
			continue;

skip_inode:
945
			spin_unlock(&ci->i_ceph_lock);
S
Sage Weil 已提交
946 947 948 949 950 951
			iput(inode);
		}

		/* we may have taken some of the old realm's children. */
		for (i = 0; i < num_split_realms; i++) {
			struct ceph_snap_realm *child =
952
				__lookup_snap_realm(mdsc,
S
Sage Weil 已提交
953 954 955 956 957 958 959 960 961 962 963 964
					   le64_to_cpu(split_realms[i]));
			if (!child)
				continue;
			adjust_snap_realm_parent(mdsc, child, realm->ino);
		}
	}

	/*
	 * update using the provided snap trace. if we are deleting a
	 * snap, we can avoid queueing cap_snaps.
	 */
	ceph_update_snap_trace(mdsc, p, e,
965
			       op == CEPH_SNAP_OP_DESTROY, NULL);
S
Sage Weil 已提交
966

S
Sage Weil 已提交
967
	if (op == CEPH_SNAP_OP_SPLIT)
S
Sage Weil 已提交
968 969 970 971 972 973 974 975 976 977 978 979
		/* we took a reference when we created the realm, above */
		ceph_put_snap_realm(mdsc, realm);

	__cleanup_empty_realms(mdsc);

	up_write(&mdsc->snap_rwsem);

	flush_snaps(mdsc);
	return;

bad:
	pr_err("corrupt snap message from mds%d\n", mds);
980
	ceph_msg_dump(msg);
S
Sage Weil 已提交
981 982 983 984 985
out:
	if (locked_rwsem)
		up_write(&mdsc->snap_rwsem);
	return;
}