addr.c 53.1 KB
Newer Older
1
// SPDX-License-Identifier: GPL-2.0
2
#include <linux/ceph/ceph_debug.h>
S
Sage Weil 已提交
3 4 5 6 7 8

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>	/* generic_writepages */
9
#include <linux/slab.h>
S
Sage Weil 已提交
10 11
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>
12
#include <linux/signal.h>
13
#include <linux/iversion.h>
14
#include <linux/ktime.h>
15
#include <linux/netfs.h>
S
Sage Weil 已提交
16 17

#include "super.h"
18
#include "mds_client.h"
19
#include "cache.h"
20
#include "metric.h"
21
#include <linux/ceph/osd_client.h>
22
#include <linux/ceph/striper.h>
S
Sage Weil 已提交
23 24 25 26 27 28 29 30 31 32 33 34

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
L
Lucas De Marchi 已提交
35
 * count dirty pages on the inode.  In the absence of snapshots,
S
Sage Weil 已提交
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and are writing the most recently dirtied
 * pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

Y
Yehuda Sadeh 已提交
60 61 62 63 64
/*
 * Writeback congestion thresholds, derived from the per-mount
 * congestion_kb option.  ON converts kilobytes to pages (shift by
 * PAGE_SHIFT-10); OFF backs off at 3/4 of the ON threshold to give
 * the congestion state some hysteresis.
 */
#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))

65 66 67
static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct page *page, void **_fsdata);

68 69 70 71 72 73
/*
 * Return the snap context stashed in page->private for a dirty page,
 * or NULL if the page carries no private data.
 */
static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	return PagePrivate(page) ? (void *)page->private : NULL;
}
S
Sage Weil 已提交
74 75 76 77 78 79 80 81 82 83 84 85

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static int ceph_set_page_dirty(struct page *page)
{
	struct address_space *mapping = page->mapping;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;

	if (PageDirty(page)) {
		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
		     mapping->host, page, page->index);
		/* an already-dirty page must carry its snap context ref */
		BUG_ON(!PagePrivate(page));
		return 0;
	}

	inode = mapping->host;
	ci = ceph_inode(inode);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
	if (__ceph_have_pending_cap_snap(ci)) {
		/*
		 * There is a pending capsnap: account this page against
		 * the newest capsnap's context rather than the head.
		 */
		struct ceph_cap_snap *capsnap =
				list_last_entry(&ci->i_cap_snaps,
						struct ceph_cap_snap,
						ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		/* no pending capsnap: the page dirties the live "head" */
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	/* the first dirty page takes an inode reference (dropped when
	 * the last wrbuffer ref is put) */
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     mapping->host, page, page->index,
	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Reference snap context in page->private.  Also set
	 * PagePrivate so that we get invalidatepage callback.
	 */
	BUG_ON(PagePrivate(page));
	attach_page_private(page, snapc);

	return __set_page_dirty_nobuffers(page);
}

/*
 * If we are truncating the full page (i.e. offset == 0), adjust the
 * dirty page counters appropriately.  Only called if there is private
 * data on the page.
 */
static void ceph_invalidatepage(struct page *page, unsigned int offset,
				unsigned int length)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;

	/* let any in-flight fscache activity on the page finish first */
	wait_on_page_fscache(page);

	inode = page->mapping->host;
	ci = ceph_inode(inode);

	/*
	 * Partial invalidation: the page stays dirty against its snap
	 * context, so leave the accounting and page->private alone.
	 */
	if (offset != 0 || length != thp_size(page)) {
		dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
		     inode, page, page->index, offset, length);
		return;
	}

	WARN_ON(!PageLocked(page));
	if (!PagePrivate(page))
		return;

	dout("%p invalidatepage %p idx %lu full dirty page\n",
	     inode, page, page->index);

	/*
	 * Full invalidation: drop the wrbuffer cap ref taken in
	 * ceph_set_page_dirty() and the page's snap context reference.
	 */
	snapc = detach_page_private(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);
}

J
Jeff Layton 已提交
167
/*
 * Try to release a page: only succeed when neither fscache nor a snap
 * context (page->private) still holds on to it.  Returns non-zero when
 * the page may be released.
 */
static int ceph_releasepage(struct page *page, gfp_t gfp)
{
	dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
	     page, page->index, PageDirty(page) ? "" : "not ");

	if (PageFsCache(page)) {
		const gfp_t required = __GFP_DIRECT_RECLAIM | __GFP_FS;

		/* can only wait for fscache if the allocation may block */
		if ((gfp & required) != required)
			return 0;
		wait_on_page_fscache(page);
	}
	return PagePrivate(page) ? 0 : 1;
}

180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
/*
 * Expand a netfs read request so it covers whole stripe units of the
 * file layout: pull the start back to a stripe-unit boundary and round
 * the length up to the next one.
 */
static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq)
{
	struct ceph_inode_info *ci = ceph_inode(rreq->mapping->host);
	struct ceph_file_layout *layout = &ci->i_layout;
	u64 su_index;
	u32 off_in_su;

	/* Move the start down to the previous stripe-unit boundary. */
	su_index = div_u64_rem(rreq->start, layout->stripe_unit, &off_in_su);
	rreq->start = su_index * layout->stripe_unit;
	rreq->len += off_in_su;

	/* Push the length up to the next stripe-unit boundary. */
	rreq->len = roundup(rreq->len, layout->stripe_unit);
}

/*
 * Clamp a netfs subrequest so that it does not cross the RADOS object
 * containing subreq->start, and never exceeds the mount's rsize.
 */
static bool ceph_netfs_clamp_length(struct netfs_read_subrequest *subreq)
{
	struct inode *inode = subreq->rreq->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	u64 obj_num, obj_off;
	u32 len_in_obj;

	/* Truncate the extent at the end of the current block */
	ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
				      &obj_num, &obj_off, &len_in_obj);
	subreq->len = min(len_in_obj, fsc->mount_options->rsize);
	return true;
}

/*
 * Completion callback for the OSD read issued by ceph_netfs_issue_op():
 * record metrics, normalize the result, and hand the outcome to netfs.
 */
static void finish_netfs_read(struct ceph_osd_request *req)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(req->r_inode);
	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
	struct netfs_read_subrequest *subreq = req->r_priv;
	int num_pages;
	int err = req->r_result;

	ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, osd_data->length, err);

	dout("%s: result %d subreq->len=%zu i_size=%lld\n", __func__, req->r_result,
	     subreq->len, i_size_read(req->r_inode));

	/* no object means success but no data */
	if (err == -ENOENT)
		err = 0;
	else if (err == -EBLOCKLISTED)
		fsc->blocklisted = true;

	/* short read: flag the unread tail so netfs clears it */
	if (err >= 0 && err < subreq->len)
		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);

	netfs_subreq_terminated(subreq, err, true);

	/* drop the page refs taken by iov_iter_get_pages_alloc() in
	 * ceph_netfs_issue_op() */
	num_pages = calc_pages_for(osd_data->alignment, osd_data->length);
	ceph_put_page_vector(osd_data->pages, num_pages, false);
	iput(req->r_inode);	/* matches ihold() in ceph_netfs_issue_op() */
}

/*
 * Issue a read for a netfs subrequest: build a single OSD read for the
 * (already clamped) byte range, backed by the pagecache pages of the
 * request's mapping, and let finish_netfs_read() complete it
 * asynchronously.  On any failure the subrequest is terminated with the
 * error.
 *
 * Fix: the debug message for a failed iov_iter_get_pages_alloc() call
 * previously misspelled the function name as "iov_ter_get_pages_alloc",
 * making the diagnostic ungreppable.
 */
static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
{
	struct netfs_read_request *rreq = subreq->rreq;
	struct inode *inode = rreq->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	struct ceph_vino vino = ceph_vino(inode);
	struct iov_iter iter;
	struct page **pages;
	size_t page_off;
	int err = 0;
	u64 len = subreq->len;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len,
			0, 1, CEPH_OSD_OP_READ,
			CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica,
			NULL, ci->i_truncate_seq, ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		req = NULL;	/* make ceph_osdc_put_request() below a no-op */
		goto out;
	}

	dout("%s: pos=%llu orig_len=%zu len=%llu\n", __func__, subreq->start, subreq->len, len);
	iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
	err = iov_iter_get_pages_alloc(&iter, &pages, len, &page_off);
	if (err < 0) {
		dout("%s: iov_iter_get_pages_alloc returned %d\n", __func__, err);
		goto out;
	}

	/* should always give us a page-aligned read */
	WARN_ON_ONCE(page_off);
	len = err;	/* may be shorter than requested */

	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
	req->r_callback = finish_netfs_read;
	req->r_priv = subreq;
	req->r_inode = inode;
	ihold(inode);	/* dropped in finish_netfs_read() or just below */

	err = ceph_osdc_start_request(req->r_osdc, req, false);
	if (err)
		iput(inode);
out:
	ceph_osdc_put_request(req);
	if (err)
		netfs_subreq_terminated(subreq, err, false);
	dout("%s: result %d\n", __func__, err);
}

/*
 * Per-request init hook for the netfs read ops table; ceph has no
 * per-request state to set up, so this is intentionally empty.
 */
static void ceph_init_rreq(struct netfs_read_request *rreq, struct file *file)
{
}

298 299 300 301 302 303 304 305 306 307
/*
 * netfs cleanup hook: release the cap references that ceph_readahead()
 * smuggled through the priv pointer, if it took any.
 */
static void ceph_readahead_cleanup(struct address_space *mapping, void *priv)
{
	int caps = (uintptr_t)priv;

	if (caps)
		ceph_put_cap_refs(ceph_inode(mapping->host), caps);
}

308
/*
 * Hooks handed to the netfs read helpers (netfs_readpage()/
 * netfs_readahead()); the actual I/O is issued by ceph_netfs_issue_op()
 * and completed in finish_netfs_read().
 */
static const struct netfs_read_request_ops ceph_netfs_read_ops = {
	.init_rreq		= ceph_init_rreq,
	.is_cache_enabled	= ceph_is_cache_enabled,
	.begin_cache_operation	= ceph_begin_cache_operation,
	.issue_op		= ceph_netfs_issue_op,
	.expand_readahead	= ceph_netfs_expand_readahead,
	.clamp_length		= ceph_netfs_clamp_length,
	.check_write_begin	= ceph_netfs_check_write_begin,
	.cleanup		= ceph_readahead_cleanup,
};

/* read a single page, without unlocking it. */
static int ceph_readpage(struct file *file, struct page *page)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_vino vino = ceph_vino(inode);
	u64 off = page_offset(page);
326
	u64 len = thp_size(page);
327 328 329 330 331 332 333 334 335 336

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		/*
		 * Uptodate inline data should have been added
		 * into page cache while getting Fcr caps.
		 */
		if (off == 0) {
			unlock_page(page);
			return -EINVAL;
		}
337
		zero_user_segment(page, 0, thp_size(page));
338 339 340 341 342 343 344 345 346 347 348
		SetPageUptodate(page);
		unlock_page(page);
		return 0;
	}

	dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
	     vino.ino, vino.snap, file, off, len, page, page->index);

	return netfs_readpage(file, page, &ceph_netfs_read_ops, NULL);
}

349
/*
 * Kick off readahead via the netfs helpers.  If the caller does not
 * already hold caps (no rw context), take Fc references here and pass
 * them through netfs's priv pointer so ceph_readahead_cleanup() can
 * drop them when the request finishes.
 */
static void ceph_readahead(struct readahead_control *ractl)
{
	struct inode *inode = file_inode(ractl->file);
	struct ceph_file_info *fi = ractl->file->private_data;
	struct ceph_rw_context *rw_ctx;
	int got = 0;
	int ret = 0;

	/* inline-data inodes are not read through netfs */
	if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
		return;

	rw_ctx = ceph_find_rw_context(fi);
	if (!rw_ctx) {
		/*
		 * readahead callers do not necessarily hold Fcb caps
		 * (e.g. fadvise, madvise).
		 */
		int want = CEPH_CAP_FILE_CACHE;

		ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got);
		if (ret < 0)
			dout("start_read %p, error getting cap\n", inode);
		else if (!(got & want))
			dout("start_read %p, no cache cap\n", inode);

		/* no caps available: silently skip readahead */
		if (ret <= 0)
			return;
	}
	/* got == 0 when a rw context already carries the caps */
	netfs_readahead(ractl, &ceph_netfs_read_ops, (void *)(uintptr_t)got);
}

380 381 382 383 384 385
/*
 * Per-writeback snapshot of inode size/truncate state, filled in by
 * get_oldest_context() from either a capsnap or the live inode.
 */
struct ceph_writeback_ctl
{
	loff_t i_size;		/* size to write back against */
	u64 truncate_size;
	u32 truncate_seq;
	bool size_stable;	/* true if from a settled (non-writing) capsnap */
	bool head_snapc;	/* true if writing the live "head" context */
};

S
Sage Weil 已提交
389 390 391 392
/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
		   struct ceph_snap_context *page_snapc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	/* capsnaps are sorted oldest-first; scan for the first with dirty
	 * pages, and keep going only to pick up ctl state for page_snapc */
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		if (!capsnap->dirty_pages)
			continue;

		/* get i_size, truncate_{seq,size} for page_snapc? */
		if (snapc && capsnap->context != page_snapc)
			continue;

		if (ctl) {
			if (capsnap->writing) {
				/* still being written; size not settled */
				ctl->i_size = i_size_read(inode);
				ctl->size_stable = false;
			} else {
				ctl->i_size = capsnap->size;
				ctl->size_stable = true;
			}
			ctl->truncate_size = capsnap->truncate_size;
			ctl->truncate_seq = capsnap->truncate_seq;
			ctl->head_snapc = false;
		}

		/* second pass (ctl for page_snapc) done - stop */
		if (snapc)
			break;

		snapc = ceph_get_snap_context(capsnap->context);
		/* keep scanning only if page_snapc is newer than snapc,
		 * so its ctl state can still be collected above */
		if (!page_snapc ||
		    page_snapc == snapc ||
		    page_snapc->seq > snapc->seq)
			break;
	}
	/* no dirty capsnap: fall back to the live "head" context */
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
		if (ctl) {
			ctl->i_size = i_size_read(inode);
			ctl->truncate_size = ci->i_truncate_size;
			ctl->truncate_seq = ci->i_truncate_seq;
			ctl->size_stable = false;
			ctl->head_snapc = true;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}

450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
/*
 * Compute how many bytes starting at @start on @page are actually
 * backed by data: bounded by the page itself and by either the live
 * i_size or, for a snapped page, the size recorded in its capsnap.
 */
static u64 get_writepages_data_length(struct inode *inode,
				      struct page *page, u64 start)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = page_snap_context(page);
	u64 page_end = page_offset(page) + thp_size(page);
	u64 end = i_size_read(inode);

	if (snapc != ci->i_head_snapc) {
		struct ceph_cap_snap *capsnap;
		bool found = false;

		/* snapped page: use the size frozen in its capsnap */
		spin_lock(&ci->i_ceph_lock);
		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->context != snapc)
				continue;
			if (!capsnap->writing)
				end = capsnap->size;
			found = true;
			break;
		}
		spin_unlock(&ci->i_ceph_lock);
		WARN_ON(!found);
	}
	end = min(end, page_end);
	return end > start ? end - start : 0;
}

S
Sage Weil 已提交
477 478 479
/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, mark the mapping for error, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct inode *inode = page->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	int err;
	loff_t len = thp_size(page);
	struct ceph_writeback_ctl ceph_wbc;
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;

	dout("writepage %p idx %lu\n", page, page->index);

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (!snapc) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		return 0;
	}
	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
	if (snapc->seq > oldest->seq) {
		/* page belongs to a newer snap; older snaps must be
		 * written out first, so leave it dirty for now */
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON(!(current->flags & PF_MEMALLOC));
		ceph_put_snap_context(oldest);
		redirty_page_for_writepage(wbc, page);
		return 0;
	}
	ceph_put_snap_context(oldest);

	/* is this a partial page at end of file? */
	if (page_off >= ceph_wbc.i_size) {
		/* page fully past EOF: drop it instead of writing */
		dout("%p page eof %llu\n", page, ceph_wbc.i_size);
		page->mapping->a_ops->invalidatepage(page, 0, thp_size(page));
		return 0;
	}

	/* trim the write to EOF */
	if (ceph_wbc.i_size < page_off + len)
		len = ceph_wbc.i_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%llu snapc %p seq %lld\n",
	     inode, page, page->index, page_off, len, snapc, snapc->seq);

	/* throttle: mark the bdi congested past the writeback threshold */
	if (atomic_long_inc_return(&fsc->writeback_count) >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);

	set_page_writeback(page);
	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), page_off, &len, 0, 1,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, snapc,
				    ceph_wbc.truncate_seq, ceph_wbc.truncate_size,
				    true);
	if (IS_ERR(req)) {
		redirty_page_for_writepage(wbc, page);
		end_page_writeback(page);
		return PTR_ERR(req);
	}

	/* it may be a short write due to an object boundary */
	WARN_ON_ONCE(len > thp_size(page));
	osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
	dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);

	req->r_mtime = inode->i_mtime;
	/* synchronous write: start the request and wait for it */
	err = ceph_osdc_start_request(osdc, req, true);
	if (!err)
		err = ceph_osdc_wait_request(osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);

	ceph_osdc_put_request(req);
	if (err == 0)
		err = len;

	if (err < 0) {
		struct writeback_control tmp_wbc;
		if (!wbc)
			wbc = &tmp_wbc;
		if (err == -ERESTARTSYS) {
			/* killed by SIGKILL */
			dout("writepage interrupted page %p\n", page);
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			/* NOTE: returns with the page still dirty and
			 * accounting untouched */
			return err;
		}
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		dout("writepage setting page/mapping error %d %p\n",
		     err, page);
		mapping_set_error(&inode->i_data, err);
		wbc->pages_skipped++;
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;  /* vfs expects us to return 0 */
	}
	/* page is clean now: drop its snap context ref and cap ref */
	oldest = detach_page_private(page);
	WARN_ON_ONCE(oldest != snapc);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);  /* page's reference */

	if (atomic_long_dec_return(&fsc->writeback_count) <
	    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
		clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);

	return err;
}

static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
597 598 599
	int err;
	struct inode *inode = page->mapping->host;
	BUG_ON(!inode);
600
	ihold(inode);
601
	err = writepage_nounlock(page, wbc);
602 603 604 605 606
	if (err == -ERESTARTSYS) {
		/* direct memory reclaimer was killed by SIGKILL. return 0
		 * to prevent caller from setting mapping/page error */
		err = 0;
	}
S
Sage Weil 已提交
607
	unlock_page(page);
608
	iput(inode);
S
Sage Weil 已提交
609 610 611 612 613 614 615 616 617
	return err;
}

/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */
static void writepages_finish(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_data *osd_data;
	struct page *page;
	int num_pages, total_pages = 0;
	int i, j;
	int rc = req->r_result;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	unsigned int len = 0;	/* total bytes covered, for metrics */
	bool remove_page;

	dout("writepages_finish %p rc %d\n", inode, rc);
	if (rc < 0) {
		mapping_set_error(mapping, rc);
		ceph_set_error_write(ci);
		if (rc == -EBLOCKLISTED)
			fsc->blocklisted = true;
	} else {
		ceph_clear_error_write(ci);
	}

	/*
	 * We lost the cache cap, need to truncate the page before
	 * it is unlocked, otherwise we'd truncate it later in the
	 * page truncation thread, possibly losing some data that
	 * raced its way in
	 */
	remove_page = !(ceph_caps_issued(ci) &
			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));

	/* clean all pages */
	for (i = 0; i < req->r_num_ops; i++) {
		/* ops after the WRITE ops carry no page data */
		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
			break;

		osd_data = osd_req_op_extent_osd_data(req, i);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
		len += osd_data->length;
		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		total_pages += num_pages;
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			if (atomic_long_dec_return(&fsc->writeback_count) <
			     CONGESTION_OFF_THRESH(
					fsc->mount_options->congestion_kb))
				clear_bdi_congested(inode_to_bdi(inode),
						    BLK_RW_ASYNC);

			/* drop the snap context ref stashed in ->private */
			ceph_put_snap_context(detach_page_private(page));
			end_page_writeback(page);
			dout("unlocking %p\n", page);

			if (remove_page)
				generic_error_remove_page(inode->i_mapping,
							  page);

			unlock_page(page);
		}
		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
		     inode, osd_data->length, rc >= 0 ? num_pages : 0);

		release_pages(osd_data->pages, num_pages);
	}

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, rc);

	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	/* the page array itself came from the wb mempool or kmalloc */
	osd_data = osd_req_op_extent_osd_data(req, 0);
	if (osd_data->pages_from_pool)
		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
	else
		kfree(osd_data->pages);
	ceph_osdc_put_request(req);
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
711 712
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
713
	pgoff_t index, start_index, end = -1;
714
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
S
Sage Weil 已提交
715 716
	struct pagevec pvec;
	int rc = 0;
F
Fabian Frederick 已提交
717
	unsigned int wsize = i_blocksize(inode);
S
Sage Weil 已提交
718
	struct ceph_osd_request *req = NULL;
719
	struct ceph_writeback_ctl ceph_wbc;
720
	bool should_loop, range_whole = false;
721
	bool done = false;
S
Sage Weil 已提交
722

Y
Yanhu Cao 已提交
723
	dout("writepages_start %p (mode=%s)\n", inode,
S
Sage Weil 已提交
724 725 726
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

727
	if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
728 729 730 731 732
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited(
				"writepage_start %p %lld forced umount\n",
				inode, ceph_ino(inode));
		}
733
		mapping_set_error(mapping, -EIO);
S
Sage Weil 已提交
734 735
		return -EIO; /* we're in a forced umount, don't write! */
	}
Y
Yan, Zheng 已提交
736
	if (fsc->mount_options->wsize < wsize)
737
		wsize = fsc->mount_options->wsize;
S
Sage Weil 已提交
738

739
	pagevec_init(&pvec);
S
Sage Weil 已提交
740

741
	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
742
	index = start_index;
S
Sage Weil 已提交
743 744 745

retry:
	/* find oldest snap context with dirty data */
746
	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
S
Sage Weil 已提交
747 748 749 750 751 752 753 754
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);
755

756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776
	should_loop = false;
	if (ceph_wbc.head_snapc && snapc != last_snapc) {
		/* where to start/end? */
		if (wbc->range_cyclic) {
			index = start_index;
			end = -1;
			if (index > 0)
				should_loop = true;
			dout(" cyclic, start at %lu\n", index);
		} else {
			index = wbc->range_start >> PAGE_SHIFT;
			end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				range_whole = true;
			dout(" not cyclic, %lu to %lu\n", index, end);
		}
	} else if (!ceph_wbc.head_snapc) {
		/* Do not respect wbc->range_{start,end}. Dirty pages
		 * in that range can be associated with newer snapc.
		 * They are not writeable until we write all dirty pages
		 * associated with 'snapc' get written */
777
		if (index > 0)
778 779
			should_loop = true;
		dout(" non-head snapc, range whole\n");
S
Sage Weil 已提交
780
	}
781 782

	ceph_put_snap_context(last_snapc);
S
Sage Weil 已提交
783 784
	last_snapc = snapc;

785
	while (!done && index <= end) {
Y
Yan, Zheng 已提交
786
		int num_ops = 0, op_idx;
787
		unsigned i, pvec_pages, max_pages, locked_pages = 0;
Y
Yan, Zheng 已提交
788
		struct page **pages = NULL, **data_pages;
S
Sage Weil 已提交
789
		struct page *page;
790
		pgoff_t strip_unit_end = 0;
Y
Yan, Zheng 已提交
791
		u64 offset = 0, len = 0;
792
		bool from_pool = false;
S
Sage Weil 已提交
793

794
		max_pages = wsize >> PAGE_SHIFT;
S
Sage Weil 已提交
795 796

get_more_pages:
797 798
		pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
						end, PAGECACHE_TAG_DIRTY);
J
Jan Kara 已提交
799
		dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
S
Sage Weil 已提交
800 801 802 803 804 805 806 807 808 809 810 811 812 813 814
		if (!pvec_pages && !locked_pages)
			break;
		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
			page = pvec.pages[i];
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
815
				continue;
S
Sage Weil 已提交
816
			}
817 818 819 820 821
			/* only if matching snap context */
			pgsnapc = page_snap_context(page);
			if (pgsnapc != snapc) {
				dout("page snapc %p %lld != oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
822 823 824 825
				if (!should_loop &&
				    !ceph_wbc.head_snapc &&
				    wbc->sync_mode != WB_SYNC_NONE)
					should_loop = true;
S
Sage Weil 已提交
826
				unlock_page(page);
827
				continue;
S
Sage Weil 已提交
828
			}
829 830 831
			if (page_offset(page) >= ceph_wbc.i_size) {
				dout("%p page eof %llu\n",
				     page, ceph_wbc.i_size);
832 833 834
				if ((ceph_wbc.size_stable ||
				    page_offset(page) >= i_size_read(inode)) &&
				    clear_page_dirty_for_io(page))
835
					mapping->a_ops->invalidatepage(page,
836
								0, thp_size(page));
837 838 839 840 841
				unlock_page(page);
				continue;
			}
			if (strip_unit_end && (page->index > strip_unit_end)) {
				dout("end of strip unit %p\n", page);
S
Sage Weil 已提交
842 843 844 845
				unlock_page(page);
				break;
			}
			if (PageWriteback(page)) {
846 847 848 849 850 851 852
				if (wbc->sync_mode == WB_SYNC_NONE) {
					dout("%p under writeback\n", page);
					unlock_page(page);
					continue;
				}
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
S
Sage Weil 已提交
853 854 855 856 857
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
858
				continue;
S
Sage Weil 已提交
859 860
			}

861 862 863
			/*
			 * We have something to write.  If this is
			 * the first locked page this time through,
Y
Yan, Zheng 已提交
864 865
			 * calculate max possible write size and
			 * allocate a page array
866
			 */
S
Sage Weil 已提交
867
			if (locked_pages == 0) {
Y
Yan, Zheng 已提交
868 869
				u64 objnum;
				u64 objoff;
870
				u32 xlen;
Y
Yan, Zheng 已提交
871

S
Sage Weil 已提交
872
				/* prepare async write request */
873
				offset = (u64)page_offset(page);
874 875 876 877 878
				ceph_calc_file_object_mapping(&ci->i_layout,
							      offset, wsize,
							      &objnum, &objoff,
							      &xlen);
				len = xlen;
879

Y
Yanhu Cao 已提交
880
				num_ops = 1;
Y
Yan, Zheng 已提交
881
				strip_unit_end = page->index +
882
					((len - 1) >> PAGE_SHIFT);
A
Alex Elder 已提交
883

Y
Yan, Zheng 已提交
884
				BUG_ON(pages);
A
Alex Elder 已提交
885
				max_pages = calc_pages_for(0, (u64)len);
886 887 888
				pages = kmalloc_array(max_pages,
						      sizeof(*pages),
						      GFP_NOFS);
A
Alex Elder 已提交
889
				if (!pages) {
890 891
					from_pool = true;
					pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
892
					BUG_ON(!pages);
A
Alex Elder 已提交
893
				}
Y
Yan, Zheng 已提交
894 895 896

				len = 0;
			} else if (page->index !=
897
				   (offset + len) >> PAGE_SHIFT) {
898 899
				if (num_ops >= (from_pool ?  CEPH_OSD_SLAB_OPS :
							     CEPH_OSD_MAX_OPS)) {
Y
Yan, Zheng 已提交
900 901 902 903 904 905 906 907
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}

				num_ops++;
				offset = (u64)page_offset(page);
				len = 0;
S
Sage Weil 已提交
908 909 910 911 912
			}

			/* note position of first page in pvec */
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);
Y
Yehuda Sadeh 已提交
913

Y
Yan, Zheng 已提交
914 915
			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
916
				    fsc->mount_options->congestion_kb)) {
917
				set_bdi_congested(inode_to_bdi(inode),
S
Sage Weil 已提交
918
						  BLK_RW_ASYNC);
Y
Yehuda Sadeh 已提交
919 920
			}

921 922 923 924

			pages[locked_pages++] = page;
			pvec.pages[i] = NULL;

925
			len += thp_size(page);
S
Sage Weil 已提交
926 927 928 929 930 931
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_pvec_pages;
		if (i) {
932 933 934 935 936 937 938 939 940 941
			unsigned j, n = 0;
			/* shift unused page to beginning of pvec */
			for (j = 0; j < pvec_pages; j++) {
				if (!pvec.pages[j])
					continue;
				if (n < j)
					pvec.pages[n] = pvec.pages[j];
				n++;
			}
			pvec.nr = n;
S
Sage Weil 已提交
942 943 944 945

			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
946
				pagevec_release(&pvec);
S
Sage Weil 已提交
947 948 949 950
				goto get_more_pages;
			}
		}

Y
Yan, Zheng 已提交
951
new_request:
952
		offset = page_offset(pages[0]);
Y
Yan, Zheng 已提交
953 954 955 956 957
		len = wsize;

		req = ceph_osdc_new_request(&fsc->client->osdc,
					&ci->i_layout, vino,
					offset, &len, 0, num_ops,
958 959 960
					CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
					snapc, ceph_wbc.truncate_seq,
					ceph_wbc.truncate_size, false);
Y
Yan, Zheng 已提交
961 962 963 964 965 966 967
		if (IS_ERR(req)) {
			req = ceph_osdc_new_request(&fsc->client->osdc,
						&ci->i_layout, vino,
						offset, &len, 0,
						min(num_ops,
						    CEPH_OSD_SLAB_OPS),
						CEPH_OSD_OP_WRITE,
968
						CEPH_OSD_FLAG_WRITE,
969 970
						snapc, ceph_wbc.truncate_seq,
						ceph_wbc.truncate_size, true);
Y
Yan, Zheng 已提交
971
			BUG_ON(IS_ERR(req));
Y
Yan, Zheng 已提交
972
		}
Y
Yan, Zheng 已提交
973
		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
974
			     thp_size(page) - offset);
Y
Yan, Zheng 已提交
975 976 977

		req->r_callback = writepages_finish;
		req->r_inode = inode;
S
Sage Weil 已提交
978

Y
Yan, Zheng 已提交
979 980 981 982 983 984 985
		/* Format the osd request message and submit the write */
		len = 0;
		data_pages = pages;
		op_idx = 0;
		for (i = 0; i < locked_pages; i++) {
			u64 cur_offset = page_offset(pages[i]);
			if (offset + len != cur_offset) {
Y
Yanhu Cao 已提交
986
				if (op_idx + 1 == req->r_num_ops)
Y
Yan, Zheng 已提交
987 988 989 990 991 992 993
					break;
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
				dout("writepages got pages at %llu~%llu\n",
				     offset, len);
				osd_req_op_extent_osd_data_pages(req, op_idx,
							data_pages, len, 0,
994
							from_pool, false);
Y
Yan, Zheng 已提交
995
				osd_req_op_extent_update(req, op_idx, len);
996

Y
Yan, Zheng 已提交
997 998 999 1000 1001 1002 1003
				len = 0;
				offset = cur_offset; 
				data_pages = pages + i;
				op_idx++;
			}

			set_page_writeback(pages[i]);
1004
			len += thp_size(page);
Y
Yan, Zheng 已提交
1005 1006
		}

1007 1008
		if (ceph_wbc.size_stable) {
			len = min(len, ceph_wbc.i_size - offset);
Y
Yan, Zheng 已提交
1009 1010 1011 1012
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * data length covers all locked pages */
1013
			u64 min_len = len + 1 - thp_size(page);
1014 1015
			len = get_writepages_data_length(inode, pages[i - 1],
							 offset);
Y
Yan, Zheng 已提交
1016 1017 1018
			len = max(len, min_len);
		}
		dout("writepages got pages at %llu~%llu\n", offset, len);
1019

Y
Yan, Zheng 已提交
1020
		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
1021
						 0, from_pool, false);
Y
Yan, Zheng 已提交
1022
		osd_req_op_extent_update(req, op_idx, len);
1023

Y
Yan, Zheng 已提交
1024 1025
		BUG_ON(op_idx + 1 != req->r_num_ops);

1026
		from_pool = false;
Y
Yan, Zheng 已提交
1027 1028 1029 1030 1031 1032 1033
		if (i < locked_pages) {
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			locked_pages -= i;

			/* allocate new pages array for next request */
			data_pages = pages;
1034 1035
			pages = kmalloc_array(locked_pages, sizeof(*pages),
					      GFP_NOFS);
Y
Yan, Zheng 已提交
1036
			if (!pages) {
1037 1038
				from_pool = true;
				pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
Y
Yan, Zheng 已提交
1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
				BUG_ON(!pages);
			}
			memcpy(pages, data_pages + i,
			       locked_pages * sizeof(*pages));
			memset(data_pages + i, 0,
			       locked_pages * sizeof(*pages));
		} else {
			BUG_ON(num_ops != req->r_num_ops);
			index = pages[i - 1]->index + 1;
			/* request message now owns the pages array */
			pages = NULL;
		}
1051

1052
		req->r_mtime = inode->i_mtime;
1053 1054
		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
		BUG_ON(rc);
S
Sage Weil 已提交
1055 1056
		req = NULL;

Y
Yan, Zheng 已提交
1057 1058 1059 1060
		wbc->nr_to_write -= i;
		if (pages)
			goto new_request;

1061 1062 1063 1064 1065 1066 1067
		/*
		 * We stop writing back only if we are not doing
		 * integrity sync. In case of integrity sync we have to
		 * keep going until we have written all the pages
		 * we tagged for writeback prior to entering this loop.
		 */
		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
1068
			done = true;
S
Sage Weil 已提交
1069 1070 1071 1072 1073 1074 1075 1076 1077 1078

release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
1079
		end = start_index - 1; /* OK even when start_index == 0 */
1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090

		/* to write dirty pages associated with next snapc,
		 * we need to wait until current writes complete */
		if (wbc->sync_mode != WB_SYNC_NONE &&
		    start_index == 0 && /* all dirty pages were checked */
		    !ceph_wbc.head_snapc) {
			struct page *page;
			unsigned i, nr;
			index = 0;
			while ((index <= end) &&
			       (nr = pagevec_lookup_tag(&pvec, mapping, &index,
1091
						PAGECACHE_TAG_WRITEBACK))) {
1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102
				for (i = 0; i < nr; i++) {
					page = pvec.pages[i];
					if (page_snap_context(page) != snapc)
						continue;
					wait_on_page_writeback(page);
				}
				pagevec_release(&pvec);
				cond_resched();
			}
		}

1103
		start_index = 0;
S
Sage Weil 已提交
1104 1105 1106 1107 1108 1109 1110 1111
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
1112
	ceph_osdc_put_request(req);
1113 1114
	ceph_put_snap_context(last_snapc);
	dout("writepages dend - startone, rc = %d\n", rc);
S
Sage Weil 已提交
1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125
	return rc;
}



/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
1126
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
1127 1128 1129 1130
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
S
Sage Weil 已提交
1131 1132
}

1133 1134 1135
/**
 * ceph_find_incompatible - find an incompatible context and return it
 * @page: page being dirtied
 *
 * We are only allowed to write into/dirty a page if the page is
 * clean, or already dirty within the same snap context. Returns a
 * conflicting context if there is one, NULL if there isn't, or a
 * negative error code on other errors.
 *
 * Must be called with page lock held.
 */
static struct ceph_snap_context *
ceph_find_incompatible(struct page *page)
{
	struct inode *inode = page->mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);

	/* a forced umount makes any further writing pointless */
	if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
		dout(" page %p forced umount\n", page);
		return ERR_PTR(-EIO);
	}

	for (;;) {
		struct ceph_snap_context *snapc, *oldest;

		wait_on_page_writeback(page);

		/* clean, or dirty in the head (current) snap context: OK */
		snapc = page_snap_context(page);
		if (!snapc || snapc == ci->i_head_snapc)
			break;

		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL, NULL);
		if (snapc->seq > oldest->seq) {
			/* not writeable -- return it for the caller to deal with */
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n", page, snapc);
			/* caller owns the returned reference */
			return ceph_get_snap_context(snapc);
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		dout(" page %p snapc %p not current, but oldest\n", page, snapc);
		if (clear_page_dirty_for_io(page)) {
			int r = writepage_nounlock(page, NULL);
			if (r < 0)
				return ERR_PTR(r);
		}
		/* loop: re-check the page's snap context after writeback */
	}
	return NULL;
}

1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213
/*
 * netfs write_begin hook: refuse to start a write while the page is
 * still dirty in an older, incompatible snap context.  If one is found,
 * drop the page, kick writeback, wait (killably) for the old context to
 * become writeable or written, and return -EAGAIN so the caller retries.
 */
static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct page *page, void **_fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;

	snapc = ceph_find_incompatible(page);
	if (snapc) {
		int r;

		/* page refs must be dropped before sleeping below */
		unlock_page(page);
		put_page(page);
		if (IS_ERR(snapc))
			return PTR_ERR(snapc);

		ceph_queue_writeback(inode);
		r = wait_event_killable(ci->i_cap_wq,
					context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
		/* r != 0 means the wait was interrupted by a fatal signal */
		return r == 0 ? -EAGAIN : r;
	}
	return 0;
}

1214 1215 1216 1217
/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    loff_t pos, unsigned len, unsigned flags,
			    struct page **pagep, void **fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct page *page = NULL;
	pgoff_t index = pos >> PAGE_SHIFT;
	int r;

	/*
	 * Uninlining should have already been done and everything updated, EXCEPT
	 * for inline_version sent to the MDS.
	 */
	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		page = grab_cache_page_write_begin(mapping, index, flags);
		if (!page)
			return -ENOMEM;

		/*
		 * The inline_version on a new inode is set to 1. If that's the
		 * case, then the page is brand new and isn't yet Uptodate.
		 */
		r = 0;
		if (index == 0 && ci->i_inline_version != 1) {
			if (!PageUptodate(page)) {
				WARN_ONCE(1, "ceph: write_begin called on still-inlined inode (inline_version %llu)!\n",
					  ci->i_inline_version);
				r = -EINVAL;
			}
			goto out;
		}
		/* brand-new first page: zero it and mark it uptodate */
		zero_user_segment(page, 0, thp_size(page));
		SetPageUptodate(page);
		goto out;
	}

	/* normal path: let netfs read in (or prepare) the page */
	r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &page, NULL,
			      &ceph_netfs_read_ops, NULL);
out:
	if (r == 0)
		wait_on_page_fscache(page);
	if (r < 0) {
		if (page)
			put_page(page);
	} else {
		/* on success the caller receives the page locked */
		WARN_ON_ONCE(!PageLocked(page));
		*pagep = page;
	}
	return r;
}

S
Sage Weil 已提交
1270 1271
/*
 * we don't do anything in here that simple_write_end doesn't do
 * except adjust dirty page accounting
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
			  loff_t pos, unsigned len, unsigned copied,
			  struct page *page, void *fsdata)
{
	struct inode *inode = file_inode(file);
	bool check_cap = false;

	dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
	     inode, page, (int)pos, (int)copied, (int)len);

	/* zero the stale part of the page if we did a short copy */
	if (!PageUptodate(page)) {
		if (copied < len) {
			/* short copy into a non-uptodate page: ask caller to retry */
			copied = 0;
			goto out;
		}
		SetPageUptodate(page);
	}

	/* did file size increase? */
	if (pos+copied > i_size_read(inode))
		check_cap = ceph_inode_set_size(inode, pos+copied);

	set_page_dirty(page);

out:
	unlock_page(page);
	put_page(page);

	/* size grew: maybe need to report the new max size to the MDS */
	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);

	return copied;
}

/*
 * we set .direct_IO to indicate direct io is supported, but since we
 * intercept O_DIRECT reads and writes early, this function should
 * never get called.
 */
static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter)
{
	WARN_ON(1);	/* reaching here indicates a VFS-path bug */
	return -EINVAL;
}

/*
 * Address-space operations for ceph regular-file mappings.
 * .direct_IO is a stub on purpose: O_DIRECT is intercepted earlier
 * in the read/write paths (see ceph_direct_io above).
 */
const struct address_space_operations ceph_aops = {
	.readpage = ceph_readpage,
	.readahead = ceph_readahead,
	.writepage = ceph_writepage,
	.writepages = ceph_writepages_start,
	.write_begin = ceph_write_begin,
	.write_end = ceph_write_end,
	.set_page_dirty = ceph_set_page_dirty,
	.invalidatepage = ceph_invalidatepage,
	.releasepage = ceph_releasepage,
	.direct_IO = ceph_direct_io,
};

/*
 * Block every signal except SIGKILL for the current task, saving the
 * previous mask in *oldset for ceph_restore_sigs().
 */
static void ceph_block_sigs(sigset_t *oldset)
{
	sigset_t mask;
	/* inverted SIGKILL mask == "all signals but SIGKILL" */
	siginitsetinv(&mask, sigmask(SIGKILL));
	sigprocmask(SIG_BLOCK, &mask, oldset);
}

/* Restore the signal mask previously saved by ceph_block_sigs(). */
static void ceph_restore_sigs(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}
S
Sage Weil 已提交
1344 1345 1346 1347

/*
 * vm ops
 */

/*
 * Read fault: acquire CEPH_CAP_FILE_RD (plus CACHE/LAZYIO as wanted)
 * and service the fault through filemap_fault().  If no cache caps
 * were granted and the inode still carries inline data, read the
 * inline data into page 0 here instead (-EAGAIN internal path).
 */
static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	/* only SIGKILL may interrupt us while waiting for caps */
	ceph_block_sigs(&oldset);

	dout("filemap_fault %p %llx.%llx %llu trying to get caps\n",
	     inode, ceph_vinop(inode), off);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_CACHE;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1, &got);
	if (err < 0)
		goto out_restore;

	dout("filemap_fault %p %llu got cap refs on %s\n",
	     inode, off, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
	    ci->i_inline_version == CEPH_INLINE_NONE) {
		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
		ceph_add_rw_context(fi, &rw_ctx);
		ret = filemap_fault(vmf);
		ceph_del_rw_context(fi, &rw_ctx);
		dout("filemap_fault %p %llu drop cap refs %s ret %x\n",
		     inode, off, ceph_cap_string(got), ret);
	} else
		err = -EAGAIN;	/* inline data and no cache caps: handle below */

	ceph_put_cap_refs(ci, got);

	if (err != -EAGAIN)
		goto out_restore;

	/* read inline data */
	if (off >= PAGE_SIZE) {
		/* does not support inline data > PAGE_SIZE */
		ret = VM_FAULT_SIGBUS;
	} else {
		struct address_space *mapping = inode->i_mapping;
		struct page *page = find_or_create_page(mapping, 0,
						mapping_gfp_constraint(mapping,
						~__GFP_FS));
		if (!page) {
			ret = VM_FAULT_OOM;
			goto out_inline;
		}
		err = __ceph_do_getattr(inode, page,
					 CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0 || off >= i_size_read(inode)) {
			unlock_page(page);
			put_page(page);
			ret = vmf_error(err);
			goto out_inline;
		}
		/* err is the number of inline bytes read; zero the rest */
		if (err < PAGE_SIZE)
			zero_user_segment(page, err, PAGE_SIZE);
		else
			flush_dcache_page(page);
		SetPageUptodate(page);
		vmf->page = page;
		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
out_inline:
		dout("filemap_fault %p %llu read inline data ret %x\n",
		     inode, off, ret);
	}
out_restore:
	ceph_restore_sigs(&oldset);
	if (err < 0)
		ret = vmf_error(err);

	return ret;
}
S
Sage Weil 已提交
1431

1432
/*
 * Write fault: make the faulting page writeable.  Uninlines any inline
 * data first, takes CEPH_CAP_FILE_BUFFER caps, then — under the page
 * lock — dirties the page once no older, incompatible snap context
 * still owns it (waiting killably for old contexts to flush if needed).
 */
static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct ceph_cap_flush *prealloc_cf;
	struct page *page = vmf->page;
	loff_t off = page_offset(page);
	loff_t size = i_size_read(inode);
	size_t len;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	/* preallocate so marking caps dirty later cannot fail on ENOMEM */
	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return VM_FAULT_OOM;

	sb_start_pagefault(inode->i_sb);
	ceph_block_sigs(&oldset);

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		struct page *locked_page = NULL;
		if (off == 0) {
			lock_page(page);
			locked_page = page;
		}
		err = ceph_uninline_data(vma->vm_file, locked_page);
		if (locked_page)
			unlock_page(locked_page);
		if (err < 0)
			goto out_free;
	}

	/* clamp the write length at EOF */
	if (off + thp_size(page) <= size)
		len = thp_size(page);
	else
		len = offset_in_thp(page, size);

	dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
	     inode, ceph_vinop(inode), off, len, size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got);
	if (err < 0)
		goto out_free;

	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
	     inode, off, len, ceph_cap_string(got));

	/* Update time before taking page lock */
	file_update_time(vma->vm_file);
	inode_inc_iversion_raw(inode);

	do {
		struct ceph_snap_context *snapc;

		lock_page(page);

		/* page may have been truncated away while we slept */
		if (page_mkwrite_check_truncate(page, inode) < 0) {
			unlock_page(page);
			ret = VM_FAULT_NOPAGE;
			break;
		}

		snapc = ceph_find_incompatible(page);
		if (!snapc) {
			/* success.  we'll keep the page locked. */
			set_page_dirty(page);
			ret = VM_FAULT_LOCKED;
			break;
		}

		unlock_page(page);

		if (IS_ERR(snapc)) {
			ret = VM_FAULT_SIGBUS;
			break;
		}

		/* old snap context: flush it and wait before retrying */
		ceph_queue_writeback(inode);
		err = wait_event_killable(ci->i_cap_wq,
				context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
	} while (err == 0);

	if (ret == VM_FAULT_LOCKED ||
	    ci->i_inline_version != CEPH_INLINE_NONE) {
		int dirty;
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
	     inode, off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs_async(ci, got);
out_free:
	ceph_restore_sigs(&oldset);
	sb_end_pagefault(inode->i_sb);
	ceph_free_cap_flush(prealloc_cf);
	if (err < 0)
		ret = vmf_error(err);
	return ret;
}

Y
Yan, Zheng 已提交
1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558
/*
 * Copy @len bytes of inline data into page 0 of the mapping (or into
 * @locked_page if the caller already holds it locked), zero-filling
 * the remainder of the page and marking it uptodate.
 */
void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
			   char	*data, size_t len)
{
	struct address_space *mapping = inode->i_mapping;
	struct page *page;

	if (locked_page) {
		page = locked_page;
	} else {
		if (i_size_read(inode) == 0)
			return;
		page = find_or_create_page(mapping, 0,
					   mapping_gfp_constraint(mapping,
					   ~__GFP_FS));
		if (!page)
			return;
		if (PageUptodate(page)) {
			/* someone else already filled it */
			unlock_page(page);
			put_page(page);
			return;
		}
	}

	dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
	     inode, ceph_vinop(inode), len, locked_page);

	if (len > 0) {
		void *kaddr = kmap_atomic(page);
		memcpy(kaddr, data, len);
		kunmap_atomic(kaddr);
	}

	/* only finalize pages we created; locked_page stays the caller's */
	if (page != locked_page) {
		if (len < PAGE_SIZE)
			zero_user_segment(page, len, PAGE_SIZE);
		else
			flush_dcache_page(page);

		SetPageUptodate(page);
		unlock_page(page);
		put_page(page);
	}
}

1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623
/*
 * Move an inode's inline data out to the first file object on the OSDs:
 * create the object, write the data, and record the inline_version in
 * xattrs (CMPXATTR guard + SETXATTR) so a racing uninline cannot clobber
 * newer data.  Returns 0 on success or when there was nothing to do.
 */
int ceph_uninline_data(struct file *filp, struct page *locked_page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	struct page *page = NULL;
	u64 len, inline_version;
	int err = 0;
	bool from_pagecache = false;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	dout("uninline_data %p %llx.%llx inline_version %llu\n",
	     inode, ceph_vinop(inode), inline_version);

	if (inline_version == 1 || /* initial version, no data */
	    inline_version == CEPH_INLINE_NONE)
		goto out;

	/* prefer the caller's page, then an uptodate pagecache page */
	if (locked_page) {
		page = locked_page;
		WARN_ON(!PageUptodate(page));
	} else if (ceph_caps_issued(ci) &
		   (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
		page = find_get_page(inode->i_mapping, 0);
		if (page) {
			if (PageUptodate(page)) {
				from_pagecache = true;
				lock_page(page);
			} else {
				put_page(page);
				page = NULL;
			}
		}
	}

	if (page) {
		len = i_size_read(inode);
		if (len > PAGE_SIZE)
			len = PAGE_SIZE;
	} else {
		/* no cached copy: fetch inline data from the MDS */
		page = __page_cache_alloc(GFP_NOFS);
		if (!page) {
			err = -ENOMEM;
			goto out;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0) {
			/* no inline data */
			if (err == -ENODATA)
				err = 0;
			goto out;
		}
		len = err;
	}

	/* step 1: make sure the object exists */
	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
				    NULL, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out;

	/* step 2: guarded write (3 ops: CMPXATTR, WRITE, SETXATTR) */
	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);

	{
		/* only proceed if our inline_version is newer than the stored one */
		__le64 xattr_buf = cpu_to_le64(inline_version);
		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put;
	}

	{
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);
		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put;
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);

out_put:
	ceph_osdc_put_request(req);
	/* ECANCELED: the CMPXATTR guard failed, someone else uninlined first */
	if (err == -ECANCELED)
		err = 0;
out:
	if (page && page != locked_page) {
		if (from_pagecache) {
			unlock_page(page);
			put_page(page);
		} else
			__free_pages(page, 0);
	}

	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
	     inode, ceph_vinop(inode), inline_version, err);
	return err;
}

1728
/* vm operations installed by ceph_mmap() */
static const struct vm_operations_struct ceph_vmops = {
	.fault		= ceph_filemap_fault,
	.page_mkwrite	= ceph_page_mkwrite,
};

/*
 * Set up a shared/private mapping of a ceph file.  Faulting requires
 * ->readpage, so refuse the mmap when the mapping cannot read pages.
 */
int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	if (!file->f_mapping->a_ops->readpage)
		return -ENOEXEC;

	file_accessed(file);
	vma->vm_ops = &ceph_vmops;
	return 0;
}
1743 1744 1745 1746 1747 1748

/* cached pool-permission bits, see __ceph_pool_perm_get() */
enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};

Y
Yan, Zheng 已提交
1749 1750
/*
 * Return this client's POOL_READ/POOL_WRITE permission on @pool with
 * namespace @pool_ns, probing the OSDs and caching the result on a
 * miss.  The cache is an rbtree in mdsc->pool_perm_tree protected by
 * mdsc->pool_perm_rwsem; read access is probed with an object STAT,
 * write access with an exclusive CREATE of a per-inode probe object.
 * Returns the permission bits (>= 0) or a negative error.
 */
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	/* fast path: look the pool up in the cache under the read lock */
	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
		     pool, (int)pool_ns->len, pool_ns->str);
	else
		dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);

	/* slow path: re-check under the write lock before probing */
	down_write(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	if (*p) {
		/* someone else cached it while we dropped the read lock */
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);

	wr_req->r_mtime = ci->vfs_inode.i_mtime;
	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);

	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	if (!err2)
		err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	/* STAT ok or object missing: both prove read access */
	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM) {
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out_unlock;
	}

	/* CREATE ok or object already there: both prove write access */
	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		if (err2 == -EBLOCKLISTED)
			fsc->blocklisted = true;
		err = err2;
		goto out_unlock;
	}

	pool_ns_len = pool_ns ? pool_ns->len : 0;
	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
		     pool, (int)pool_ns->len, pool_ns->str, err);
	else
		dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
	return err;
}

Y
Yan, Zheng 已提交
1925
int ceph_pool_perm_check(struct inode *inode, int need)
1926
{
Y
Yan, Zheng 已提交
1927
	struct ceph_inode_info *ci = ceph_inode(inode);
Y
Yan, Zheng 已提交
1928
	struct ceph_string *pool_ns;
Y
Yan, Zheng 已提交
1929
	s64 pool;
1930 1931
	int ret, flags;

1932 1933 1934 1935
	/* Only need to do this for regular files */
	if (!S_ISREG(inode->i_mode))
		return 0;

1936 1937 1938 1939 1940 1941 1942 1943 1944
	if (ci->i_vino.snap != CEPH_NOSNAP) {
		/*
		 * Pool permission check needs to write to the first object.
		 * But for snapshot, head of the first object may have alread
		 * been deleted. Skip check to avoid creating orphan object.
		 */
		return 0;
	}

Y
Yan, Zheng 已提交
1945
	if (ceph_test_mount_opt(ceph_inode_to_client(inode),
1946 1947 1948 1949 1950
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
1951
	pool = ci->i_layout.pool_id;
1952 1953 1954 1955
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
1956
			dout("ceph_pool_perm_check pool %lld no read perm\n",
1957 1958 1959 1960
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
1961
			dout("ceph_pool_perm_check pool %lld no write perm\n",
1962 1963 1964 1965 1966 1967
			     pool);
			return -EPERM;
		}
		return 0;
	}

Y
Yan, Zheng 已提交
1968 1969 1970
	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
	ceph_put_string(pool_ns);
1971 1972 1973 1974 1975 1976 1977 1978 1979 1980
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
Y
Yan, Zheng 已提交
1981 1982 1983
	if (pool == ci->i_layout.pool_id &&
	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
		ci->i_ceph_flags |= flags;
1984
        } else {
1985
		pool = ci->i_layout.pool_id;
1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}

void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}