addr.c 56.2 KB
Newer Older
1
// SPDX-License-Identifier: GPL-2.0
2
#include <linux/ceph/ceph_debug.h>
S
Sage Weil 已提交
3 4 5 6 7 8

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>	/* generic_writepages */
9
#include <linux/slab.h>
S
Sage Weil 已提交
10 11
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>
12
#include <linux/signal.h>
13
#include <linux/iversion.h>
14
#include <linux/ktime.h>
S
Sage Weil 已提交
15 16

#include "super.h"
17
#include "mds_client.h"
18
#include "cache.h"
19
#include "metric.h"
20
#include <linux/ceph/osd_client.h>
21
#include <linux/ceph/striper.h>
S
Sage Weil 已提交
22 23 24 25 26 27 28 29 30 31 32 33

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty_pages. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and are writing the most recently dirtied
 * pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

Y
Yehuda Sadeh 已提交
59 60 61 62 63
/*
 * Writeback congestion thresholds, expressed in pages.
 *
 * congestion_kb is shifted right by (PAGE_SHIFT - 10), i.e. converted
 * from KiB to pages.  The "off" threshold is the "on" threshold minus a
 * quarter of it, which gives some hysteresis between setting and clearing
 * the congested state.
 *
 * The argument is parenthesized so that expressions containing operators
 * of lower precedence than ">>" expand correctly.
 */
#define CONGESTION_ON_THRESH(congestion_kb)				\
	((congestion_kb) >> (PAGE_SHIFT - 10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))

64 65 66 67 68 69
/*
 * Return the snap context stored in page->private, or NULL when the page
 * has no private data attached.
 */
static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	return PagePrivate(page) ? (void *)page->private : NULL;
}
S
Sage Weil 已提交
70 71 72 73 74 75 76 77 78 79 80

/*
 * Mark a page dirty and account for it.
 *
 * The dirty-page accounting is adjusted optimistically, on the assumption
 * that we do not race with invalidate.  If we do, it is readjusted.
 *
 * Returns 0 when the page was already dirty; otherwise the result of
 * __set_page_dirty_nobuffers() (or !TestSetPageDirty() for a page that
 * has no mapping).
 */
static int ceph_set_page_dirty(struct page *page)
{
	struct address_space *mapping = page->mapping;
	struct ceph_snap_context *snapc;
	struct ceph_inode_info *ci;
	struct inode *inode;
	int dirtied;

	if (unlikely(!mapping))
		return !TestSetPageDirty(page);

	inode = mapping->host;

	if (PageDirty(page)) {
		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
		     inode, page, page->index);
		BUG_ON(!PagePrivate(page));
		return 0;
	}

	ci = ceph_inode(inode);

	spin_lock(&ci->i_ceph_lock);
	/* the caller should be holding an Fw reference */
	BUG_ON(ci->i_wr_ref == 0);
	if (!__ceph_have_pending_cap_snap(ci)) {
		/* no pending capsnap: the page is charged to the head */
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		ci->i_wrbuffer_ref_head++;
	} else {
		/* charge the page to the most recent capsnap */
		struct ceph_cap_snap *capsnap =
			list_last_entry(&ci->i_cap_snaps,
					struct ceph_cap_snap, ci_item);

		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	}
	/* an inode reference is taken when the count leaves zero */
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	ci->i_wrbuffer_ref++;
	dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     inode, page, page->index,
	     ci->i_wrbuffer_ref - 1, ci->i_wrbuffer_ref_head - 1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Stash the snap context reference in page->private and set
	 * PagePrivate so that the invalidatepage callback is invoked.
	 */
	BUG_ON(PagePrivate(page));
	page->private = (unsigned long)snapc;
	SetPagePrivate(page);

	dirtied = __set_page_dirty_nobuffers(page);
	WARN_ON(!PageLocked(page));
	WARN_ON(!page->mapping);

	return dirtied;
}

/*
 * If we are truncating the full page (i.e. offset == 0), adjust the
 * dirty page counters appropriately.  Only called if there is private
 * data on the page.
 */
142 143
static void ceph_invalidatepage(struct page *page, unsigned int offset,
				unsigned int length)
S
Sage Weil 已提交
144
{
145
	struct inode *inode;
S
Sage Weil 已提交
146
	struct ceph_inode_info *ci;
147
	struct ceph_snap_context *snapc = page_snap_context(page);
S
Sage Weil 已提交
148

149
	inode = page->mapping->host;
150 151
	ci = ceph_inode(inode);

152
	if (offset != 0 || length != PAGE_SIZE) {
153 154 155 156
		dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
		     inode, page, page->index, offset, length);
		return;
	}
157

158 159
	ceph_invalidate_fscache_page(inode, page);

160
	WARN_ON(!PageLocked(page));
161 162 163
	if (!PagePrivate(page))
		return;

164 165 166 167 168 169 170
	dout("%p invalidatepage %p idx %lu full dirty page\n",
	     inode, page, page->index);

	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);
	page->private = 0;
	ClearPagePrivate(page);
S
Sage Weil 已提交
171 172 173 174
}

/*
 * Report whether a page may be released.  Returns 0 if fscache refuses to
 * release it; otherwise returns nonzero only when the page carries no
 * private data.
 */
static int ceph_releasepage(struct page *page, gfp_t g)
{
	dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
	     page, page->index, PageDirty(page) ? "" : "not ");

	/* Can we release the page from the cache? */
	if (ceph_release_fscache_page(page, g))
		return !PagePrivate(page);

	return 0;
}

185
/* read a single page, without unlocking it. */
Y
Yan, Zheng 已提交
186
static int ceph_do_readpage(struct file *filp, struct page *page)
S
Sage Weil 已提交
187
{
A
Al Viro 已提交
188
	struct inode *inode = file_inode(filp);
S
Sage Weil 已提交
189
	struct ceph_inode_info *ci = ceph_inode(inode);
190
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
191 192 193
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;
	struct ceph_vino vino = ceph_vino(inode);
S
Sage Weil 已提交
194
	int err = 0;
Y
Yan, Zheng 已提交
195
	u64 off = page_offset(page);
196
	u64 len = PAGE_SIZE;
S
Sage Weil 已提交
197

Y
Yan, Zheng 已提交
198
	if (off >= i_size_read(inode)) {
199
		zero_user_segment(page, 0, PAGE_SIZE);
Y
Yan, Zheng 已提交
200 201 202
		SetPageUptodate(page);
		return 0;
	}
203

204 205 206 207 208 209 210
	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		/*
		 * Uptodate inline data should have been added
		 * into page cache while getting Fcr caps.
		 */
		if (off == 0)
			return -EINVAL;
211
		zero_user_segment(page, 0, PAGE_SIZE);
212 213 214
		SetPageUptodate(page);
		return 0;
	}
Y
Yan, Zheng 已提交
215 216

	err = ceph_readpage_from_fscache(inode, page);
217
	if (err == 0)
Y
Yan, Zheng 已提交
218
		return -EINPROGRESS;
219

220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
	dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
	     vino.ino, vino.snap, filp, off, len, page, page->index);
	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 0, 1,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL,
				    ci->i_truncate_seq, ci->i_truncate_size,
				    false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);

	err = ceph_osdc_start_request(osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(osdc, req);

	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, err);

	ceph_osdc_put_request(req);
	dout("readpage result %d\n", err);

S
Sage Weil 已提交
241 242 243 244
	if (err == -ENOENT)
		err = 0;
	if (err < 0) {
		SetPageError(page);
245
		ceph_fscache_readpage_cancel(inode, page);
246 247
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
S
Sage Weil 已提交
248 249
		goto out;
	}
250
	if (err < PAGE_SIZE)
251
		/* zero fill remainder of page */
252
		zero_user_segment(page, err, PAGE_SIZE);
253 254
	else
		flush_dcache_page(page);
S
Sage Weil 已提交
255

256 257
	SetPageUptodate(page);
	ceph_readpage_to_fscache(inode, page);
258

S
Sage Weil 已提交
259 260 261 262 263 264
out:
	return err < 0 ? err : 0;
}

/*
 * Read one page via ceph_do_readpage().  The page is unlocked here unless
 * the helper returned -EINPROGRESS, in which case 0 is returned and the
 * page is left locked.
 */
static int ceph_readpage(struct file *filp, struct page *page)
{
	int r = ceph_do_readpage(filp, page);

	if (r == -EINPROGRESS)
		return 0;

	unlock_page(page);
	return r;
}

/*
S
Sage Weil 已提交
274
 * Finish an async read(ahead) op.
S
Sage Weil 已提交
275
 */
276
static void finish_read(struct ceph_osd_request *req)
S
Sage Weil 已提交
277
{
S
Sage Weil 已提交
278
	struct inode *inode = req->r_inode;
279
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
280
	struct ceph_osd_data *osd_data;
281 282
	int rc = req->r_result <= 0 ? req->r_result : 0;
	int bytes = req->r_result >= 0 ? req->r_result : 0;
283
	int num_pages;
S
Sage Weil 已提交
284
	int i;
S
Sage Weil 已提交
285

S
Sage Weil 已提交
286
	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
287 288
	if (rc == -EBLOCKLISTED)
		ceph_inode_to_client(inode)->blocklisted = true;
S
Sage Weil 已提交
289 290

	/* unlock all pages, zeroing any data we didn't read */
291
	osd_data = osd_req_op_extent_osd_data(req, 0);
292 293 294
	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
	num_pages = calc_pages_for((u64)osd_data->alignment,
					(u64)osd_data->length);
295
	for (i = 0; i < num_pages; i++) {
296
		struct page *page = osd_data->pages[i];
S
Sage Weil 已提交
297

298 299
		if (rc < 0 && rc != -ENOENT) {
			ceph_fscache_readpage_cancel(inode, page);
300
			goto unlock;
301
		}
302
		if (bytes < (int)PAGE_SIZE) {
S
Sage Weil 已提交
303 304
			/* zero (remainder of) page */
			int s = bytes < 0 ? 0 : bytes;
305
			zero_user_segment(page, s, PAGE_SIZE);
S
Sage Weil 已提交
306
		}
S
Sage Weil 已提交
307 308 309 310
 		dout("finish_read %p uptodate %p idx %lu\n", inode, page,
		     page->index);
		flush_dcache_page(page);
		SetPageUptodate(page);
311
		ceph_readpage_to_fscache(inode, page);
312
unlock:
S
Sage Weil 已提交
313
		unlock_page(page);
314 315
		put_page(page);
		bytes -= PAGE_SIZE;
S
Sage Weil 已提交
316
	}
317 318 319 320

	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, rc);

321
	kfree(osd_data->pages);
S
Sage Weil 已提交
322 323 324
}

/*
S
Sage Weil 已提交
325 326
 * start an async read(ahead) operation.  return nr_pages we submitted
 * a read for on success, or negative error code.
S
Sage Weil 已提交
327
 */
328 329
static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
		      struct list_head *page_list, int max)
S
Sage Weil 已提交
330
{
331 332
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
S
Sage Weil 已提交
333
	struct ceph_inode_info *ci = ceph_inode(inode);
334
	struct page *page = lru_to_page(page_list);
335
	struct ceph_vino vino;
S
Sage Weil 已提交
336 337
	struct ceph_osd_request *req;
	u64 off;
S
Sage Weil 已提交
338
	u64 len;
S
Sage Weil 已提交
339 340 341 342
	int i;
	struct page **pages;
	pgoff_t next_index;
	int nr_pages = 0;
343 344 345
	int got = 0;
	int ret = 0;

346
	if (!rw_ctx) {
347 348 349
		/* caller of readpages does not hold buffer and read caps
		 * (fadvise, madvise and readahead cases) */
		int want = CEPH_CAP_FILE_CACHE;
Y
Yan, Zheng 已提交
350 351
		ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want,
					true, &got);
352 353 354 355 356 357 358 359 360 361
		if (ret < 0) {
			dout("start_read %p, error getting cap\n", inode);
		} else if (!(got & want)) {
			dout("start_read %p, no cache cap\n", inode);
			ret = 0;
		}
		if (ret <= 0) {
			if (got)
				ceph_put_cap_refs(ci, got);
			while (!list_empty(page_list)) {
362
				page = lru_to_page(page_list);
363 364 365 366 367 368
				list_del(&page->lru);
				put_page(page);
			}
			return ret;
		}
	}
S
Sage Weil 已提交
369

370
	off = (u64) page_offset(page);
S
Sage Weil 已提交
371

S
Sage Weil 已提交
372 373 374 375 376 377 378
	/* count pages */
	next_index = page->index;
	list_for_each_entry_reverse(page, page_list, lru) {
		if (page->index != next_index)
			break;
		nr_pages++;
		next_index++;
379 380
		if (max && nr_pages == max)
			break;
S
Sage Weil 已提交
381
	}
382
	len = nr_pages << PAGE_SHIFT;
S
Sage Weil 已提交
383 384
	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
	     off, len);
385 386
	vino = ceph_vino(inode);
	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
387
				    0, 1, CEPH_OSD_OP_READ,
388
				    CEPH_OSD_FLAG_READ, NULL,
S
Sage Weil 已提交
389
				    ci->i_truncate_seq, ci->i_truncate_size,
390
				    false);
391 392 393 394
	if (IS_ERR(req)) {
		ret = PTR_ERR(req);
		goto out;
	}
S
Sage Weil 已提交
395

S
Sage Weil 已提交
396
	/* build page vector */
397
	nr_pages = calc_pages_for(0, len);
398
	pages = kmalloc_array(nr_pages, sizeof(*pages), GFP_KERNEL);
399 400 401 402
	if (!pages) {
		ret = -ENOMEM;
		goto out_put;
	}
S
Sage Weil 已提交
403 404 405
	for (i = 0; i < nr_pages; ++i) {
		page = list_entry(page_list->prev, struct page, lru);
		BUG_ON(PageLocked(page));
S
Sage Weil 已提交
406
		list_del(&page->lru);
407

S
Sage Weil 已提交
408 409 410
 		dout("start_read %p adding %p idx %lu\n", inode, page,
		     page->index);
		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
411
					  GFP_KERNEL)) {
412
			ceph_fscache_uncache_page(inode, page);
413
			put_page(page);
S
Sage Weil 已提交
414
			dout("start_read %p add_to_page_cache failed %p\n",
S
Sage Weil 已提交
415
			     inode, page);
S
Sage Weil 已提交
416
			nr_pages = i;
417 418
			if (nr_pages > 0) {
				len = nr_pages << PAGE_SHIFT;
419
				osd_req_op_extent_update(req, 0, len);
420 421
				break;
			}
S
Sage Weil 已提交
422
			goto out_pages;
S
Sage Weil 已提交
423
		}
S
Sage Weil 已提交
424
		pages[i] = page;
S
Sage Weil 已提交
425
	}
426
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
S
Sage Weil 已提交
427 428 429 430 431 432 433 434
	req->r_callback = finish_read;
	req->r_inode = inode;

	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto out_pages;
	ceph_osdc_put_request(req);
435 436 437 438 439 440

	/* After adding locked pages to page cache, the inode holds cache cap.
	 * So we can drop our cap refs. */
	if (got)
		ceph_put_cap_refs(ci, got);

S
Sage Weil 已提交
441 442 443
	return nr_pages;

out_pages:
444 445 446 447 448
	for (i = 0; i < nr_pages; ++i) {
		ceph_fscache_readpage_cancel(inode, pages[i]);
		unlock_page(pages[i]);
	}
	ceph_put_page_vector(pages, nr_pages, false);
449
out_put:
S
Sage Weil 已提交
450
	ceph_osdc_put_request(req);
451 452 453
out:
	if (got)
		ceph_put_cap_refs(ci, got);
S
Sage Weil 已提交
454 455
	return ret;
}
S
Sage Weil 已提交
456

S
Sage Weil 已提交
457 458 459 460 461 462 463 464

/*
 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 * the caller (VM) cleans them up.
 */
static int ceph_readpages(struct file *file, struct address_space *mapping,
			  struct list_head *page_list, unsigned nr_pages)
{
A
Al Viro 已提交
465
	struct inode *inode = file_inode(file);
466
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
467
	struct ceph_file_info *fi = file->private_data;
468
	struct ceph_rw_context *rw_ctx;
S
Sage Weil 已提交
469
	int rc = 0;
470 471
	int max = 0;

Y
Yan, Zheng 已提交
472 473 474
	if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
		return -EINVAL;

475 476 477 478 479 480
	rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
					 &nr_pages);

	if (rc == 0)
		goto out;

481
	rw_ctx = ceph_find_rw_context(fi);
482
	max = fsc->mount_options->rsize >> PAGE_SHIFT;
483 484
	dout("readpages %p file %p ctx %p nr_pages %d max %d\n",
	     inode, file, rw_ctx, nr_pages, max);
S
Sage Weil 已提交
485
	while (!list_empty(page_list)) {
486
		rc = start_read(inode, rw_ctx, page_list, max);
S
Sage Weil 已提交
487 488 489
		if (rc < 0)
			goto out;
	}
S
Sage Weil 已提交
490
out:
491 492
	ceph_fscache_readpages_cancel(inode, page_list);

S
Sage Weil 已提交
493
	dout("readpages %p file %p ret %d\n", inode, file, rc);
S
Sage Weil 已提交
494 495 496
	return rc;
}

497 498 499 500 501 502
/*
 * Writeback parameters for the snap context selected by
 * get_oldest_context().
 */
struct ceph_writeback_ctl
{
	/* capsnap->size, or the live inode size when not yet stable */
	loff_t i_size;
	/* truncate_size of the chosen capsnap, or of the inode for head */
	u64 truncate_size;
	/* truncate_seq of the chosen capsnap, or of the inode for head */
	u32 truncate_seq;
	/* true only for a capsnap that is not being written */
	bool size_stable;
	/* true when the head snap context was chosen */
	bool head_snapc;
};

S
Sage Weil 已提交
506 507 508 509
/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
510
static struct ceph_snap_context *
511 512
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
		   struct ceph_snap_context *page_snapc)
S
Sage Weil 已提交
513 514 515 516 517
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

518
	spin_lock(&ci->i_ceph_lock);
S
Sage Weil 已提交
519 520 521
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
522 523 524 525 526 527 528 529 530 531 532 533 534 535
		if (!capsnap->dirty_pages)
			continue;

		/* get i_size, truncate_{seq,size} for page_snapc? */
		if (snapc && capsnap->context != page_snapc)
			continue;

		if (ctl) {
			if (capsnap->writing) {
				ctl->i_size = i_size_read(inode);
				ctl->size_stable = false;
			} else {
				ctl->i_size = capsnap->size;
				ctl->size_stable = true;
536
			}
537 538
			ctl->truncate_size = capsnap->truncate_size;
			ctl->truncate_seq = capsnap->truncate_seq;
539
			ctl->head_snapc = false;
S
Sage Weil 已提交
540
		}
541 542 543 544 545 546 547 548 549

		if (snapc)
			break;

		snapc = ceph_get_snap_context(capsnap->context);
		if (!page_snapc ||
		    page_snapc == snapc ||
		    page_snapc->seq > snapc->seq)
			break;
S
Sage Weil 已提交
550
	}
551
	if (!snapc && ci->i_wrbuffer_ref_head) {
552
		snapc = ceph_get_snap_context(ci->i_head_snapc);
S
Sage Weil 已提交
553 554
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
555 556 557 558 559
		if (ctl) {
			ctl->i_size = i_size_read(inode);
			ctl->truncate_size = ci->i_truncate_size;
			ctl->truncate_seq = ci->i_truncate_seq;
			ctl->size_stable = false;
560
			ctl->head_snapc = true;
561
		}
S
Sage Weil 已提交
562
	}
563
	spin_unlock(&ci->i_ceph_lock);
S
Sage Weil 已提交
564 565 566
	return snapc;
}

567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593
/*
 * Compute how many bytes starting at @start should be written for @page.
 *
 * The end of the data is the inode size, or, for a page belonging to a
 * capsnap that is no longer being written, that capsnap's recorded size.
 * The end is clamped to the end of @page.  Returns 0 if @start is at or
 * beyond that end.
 */
static u64 get_writepages_data_length(struct inode *inode,
				      struct page *page, u64 start)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = page_snap_context(page);
	struct ceph_cap_snap *capsnap = NULL;
	u64 end = i_size_read(inode);
	u64 page_end;

	if (snapc != ci->i_head_snapc) {
		bool found = false;

		/* look up the capsnap this page was dirtied in */
		spin_lock(&ci->i_ceph_lock);
		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->context != snapc)
				continue;
			if (!capsnap->writing)
				end = capsnap->size;
			found = true;
			break;
		}
		spin_unlock(&ci->i_ceph_lock);
		WARN_ON(!found);
	}

	page_end = page_offset(page) + PAGE_SIZE;
	if (end > page_end)
		end = page_end;

	if (end > start)
		return end - start;
	return 0;
}

594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627
/*
 * Do a synchronous write on N pages.
 *
 * Builds a single OSD write request for @len bytes at @off, backed by
 * @pages, submits it and waits for completion.  Returns the number of
 * bytes covered by the request (which may be short due to an object
 * boundary) or a negative error code.
 */
static int ceph_sync_writepages(struct ceph_fs_client *fsc,
				struct ceph_vino vino,
				struct ceph_file_layout *layout,
				struct ceph_snap_context *snapc,
				u64 off, u64 len,
				u32 truncate_seq, u64 truncate_size,
				struct timespec64 *mtime,
				struct page **pages, int num_pages)
{
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	int page_align = off & ~PAGE_MASK;
	struct ceph_osd_request *req;
	int rc = 0;

	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    snapc, truncate_seq, truncate_size,
				    true);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short write due to an object boundary */
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
					 false, false);
	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);

	req->r_mtime = *mtime;
	rc = ceph_osdc_start_request(osdc, req, true);
	if (rc == 0)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, rc);

	ceph_osdc_put_request(req);

	/* success is reported as the byte count */
	if (!rc)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}

S
Sage Weil 已提交
638 639 640
/*
 * Write a single page, but leave the page locked.
 *
641
 * If we get a write error, mark the mapping for error, but still adjust the
S
Sage Weil 已提交
642 643 644 645 646 647
 * dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
648
	struct ceph_fs_client *fsc;
649
	struct ceph_snap_context *snapc, *oldest;
650
	loff_t page_off = page_offset(page);
Y
Yan, Zheng 已提交
651
	int err, len = PAGE_SIZE;
652
	struct ceph_writeback_ctl ceph_wbc;
S
Sage Weil 已提交
653 654 655 656 657

	dout("writepage %p idx %lu\n", page, page->index);

	inode = page->mapping->host;
	ci = ceph_inode(inode);
658
	fsc = ceph_inode_to_client(inode);
S
Sage Weil 已提交
659 660

	/* verify this is a writeable snap context */
661
	snapc = page_snap_context(page);
662
	if (!snapc) {
S
Sage Weil 已提交
663
		dout("writepage %p page %p not dirty?\n", inode, page);
Y
Yan, Zheng 已提交
664
		return 0;
S
Sage Weil 已提交
665
	}
666
	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
667
	if (snapc->seq > oldest->seq) {
S
Sage Weil 已提交
668
		dout("writepage %p page %p snapc %p not writeable - noop\n",
669
		     inode, page, snapc);
S
Sage Weil 已提交
670
		/* we should only noop if called by kswapd */
671
		WARN_ON(!(current->flags & PF_MEMALLOC));
672
		ceph_put_snap_context(oldest);
673
		redirty_page_for_writepage(wbc, page);
Y
Yan, Zheng 已提交
674
		return 0;
S
Sage Weil 已提交
675
	}
676
	ceph_put_snap_context(oldest);
S
Sage Weil 已提交
677 678

	/* is this a partial page at end of file? */
679 680
	if (page_off >= ceph_wbc.i_size) {
		dout("%p page eof %llu\n", page, ceph_wbc.i_size);
681
		page->mapping->a_ops->invalidatepage(page, 0, PAGE_SIZE);
Y
Yan, Zheng 已提交
682
		return 0;
683
	}
Y
Yan, Zheng 已提交
684

685 686
	if (ceph_wbc.i_size < page_off + len)
		len = ceph_wbc.i_size - page_off;
S
Sage Weil 已提交
687

688 689
	dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
	     inode, page, page->index, page_off, len, snapc, snapc->seq);
S
Sage Weil 已提交
690

691
	if (atomic_long_inc_return(&fsc->writeback_count) >
692
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
693
		set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
Y
Yehuda Sadeh 已提交
694

S
Sage Weil 已提交
695
	set_page_writeback(page);
696
	err = ceph_sync_writepages(fsc, ceph_vino(inode),
697 698 699
				   &ci->i_layout, snapc, page_off, len,
				   ceph_wbc.truncate_seq,
				   ceph_wbc.truncate_size,
700
				   &inode->i_mtime, &page, 1);
S
Sage Weil 已提交
701
	if (err < 0) {
702 703 704 705 706 707 708 709
		struct writeback_control tmp_wbc;
		if (!wbc)
			wbc = &tmp_wbc;
		if (err == -ERESTARTSYS) {
			/* killed by SIGKILL */
			dout("writepage interrupted page %p\n", page);
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
Y
Yan, Zheng 已提交
710
			return err;
711
		}
712 713
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
714 715
		dout("writepage setting page/mapping error %d %p\n",
		     err, page);
S
Sage Weil 已提交
716
		mapping_set_error(&inode->i_data, err);
717
		wbc->pages_skipped++;
S
Sage Weil 已提交
718 719 720 721 722 723 724 725
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;  /* vfs expects us to return 0 */
	}
	page->private = 0;
	ClearPagePrivate(page);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
726
	ceph_put_snap_context(snapc);  /* page's reference */
727 728 729 730 731

	if (atomic_long_dec_return(&fsc->writeback_count) <
	    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
		clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);

S
Sage Weil 已提交
732 733 734 735 736
	return err;
}

static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
737 738 739
	int err;
	struct inode *inode = page->mapping->host;
	BUG_ON(!inode);
740
	ihold(inode);
741
	err = writepage_nounlock(page, wbc);
742 743 744 745 746
	if (err == -ERESTARTSYS) {
		/* direct memory reclaimer was killed by SIGKILL. return 0
		 * to prevent caller from setting mapping/page error */
		err = 0;
	}
S
Sage Weil 已提交
747
	unlock_page(page);
748
	iput(inode);
S
Sage Weil 已提交
749 750 751 752 753 754 755 756 757
	return err;
}

/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */
758
static void writepages_finish(struct ceph_osd_request *req)
S
Sage Weil 已提交
759 760 761
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
762
	struct ceph_osd_data *osd_data;
S
Sage Weil 已提交
763
	struct page *page;
Y
Yan, Zheng 已提交
764 765 766
	int num_pages, total_pages = 0;
	int i, j;
	int rc = req->r_result;
S
Sage Weil 已提交
767 768
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
769
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
Y
Yan, Zheng 已提交
770
	bool remove_page;
S
Sage Weil 已提交
771

Y
Yan, Zheng 已提交
772
	dout("writepages_finish %p rc %d\n", inode, rc);
773
	if (rc < 0) {
S
Sage Weil 已提交
774
		mapping_set_error(mapping, rc);
775
		ceph_set_error_write(ci);
776 777
		if (rc == -EBLOCKLISTED)
			fsc->blocklisted = true;
778 779 780
	} else {
		ceph_clear_error_write(ci);
	}
Y
Yan, Zheng 已提交
781

782 783 784
	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, rc);

Y
Yan, Zheng 已提交
785 786 787 788 789 790 791 792
	/*
	 * We lost the cache cap, need to truncate the page before
	 * it is unlocked, otherwise we'd truncate it later in the
	 * page truncation thread, possibly losing some data that
	 * raced its way in
	 */
	remove_page = !(ceph_caps_issued(ci) &
			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
S
Sage Weil 已提交
793 794

	/* clean all pages */
Y
Yan, Zheng 已提交
795 796 797
	for (i = 0; i < req->r_num_ops; i++) {
		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
			break;
798

Y
Yan, Zheng 已提交
799 800 801 802 803 804 805 806 807 808 809 810 811
		osd_data = osd_req_op_extent_osd_data(req, i);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		total_pages += num_pages;
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			if (atomic_long_dec_return(&fsc->writeback_count) <
			     CONGESTION_OFF_THRESH(
					fsc->mount_options->congestion_kb))
812
				clear_bdi_congested(inode_to_bdi(inode),
Y
Yan, Zheng 已提交
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828
						    BLK_RW_ASYNC);

			ceph_put_snap_context(page_snap_context(page));
			page->private = 0;
			ClearPagePrivate(page);
			dout("unlocking %p\n", page);
			end_page_writeback(page);

			if (remove_page)
				generic_error_remove_page(inode->i_mapping,
							  page);

			unlock_page(page);
		}
		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
		     inode, osd_data->length, rc >= 0 ? num_pages : 0);
829

830
		release_pages(osd_data->pages, num_pages);
S
Sage Weil 已提交
831 832
	}

Y
Yan, Zheng 已提交
833 834 835
	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	osd_data = osd_req_op_extent_osd_data(req, 0);
836
	if (osd_data->pages_from_pool)
837
		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
S
Sage Weil 已提交
838
	else
839
		kfree(osd_data->pages);
S
Sage Weil 已提交
840 841 842 843 844 845 846 847 848 849 850
	ceph_osdc_put_request(req);
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
851 852
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
853
	pgoff_t index, start_index, end = -1;
854
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
S
Sage Weil 已提交
855 856
	struct pagevec pvec;
	int rc = 0;
F
Fabian Frederick 已提交
857
	unsigned int wsize = i_blocksize(inode);
S
Sage Weil 已提交
858
	struct ceph_osd_request *req = NULL;
859
	struct ceph_writeback_ctl ceph_wbc;
860
	bool should_loop, range_whole = false;
861
	bool done = false;
S
Sage Weil 已提交
862

Y
Yanhu Cao 已提交
863
	dout("writepages_start %p (mode=%s)\n", inode,
S
Sage Weil 已提交
864 865 866
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

867
	if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
868 869 870 871 872
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited(
				"writepage_start %p %lld forced umount\n",
				inode, ceph_ino(inode));
		}
873
		mapping_set_error(mapping, -EIO);
S
Sage Weil 已提交
874 875
		return -EIO; /* we're in a forced umount, don't write! */
	}
Y
Yan, Zheng 已提交
876
	if (fsc->mount_options->wsize < wsize)
877
		wsize = fsc->mount_options->wsize;
S
Sage Weil 已提交
878

879
	pagevec_init(&pvec);
S
Sage Weil 已提交
880

881
	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
882
	index = start_index;
S
Sage Weil 已提交
883 884 885

retry:
	/* find oldest snap context with dirty data */
886
	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
S
Sage Weil 已提交
887 888 889 890 891 892 893 894
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);
895

896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916
	should_loop = false;
	if (ceph_wbc.head_snapc && snapc != last_snapc) {
		/* where to start/end? */
		if (wbc->range_cyclic) {
			index = start_index;
			end = -1;
			if (index > 0)
				should_loop = true;
			dout(" cyclic, start at %lu\n", index);
		} else {
			index = wbc->range_start >> PAGE_SHIFT;
			end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				range_whole = true;
			dout(" not cyclic, %lu to %lu\n", index, end);
		}
	} else if (!ceph_wbc.head_snapc) {
		/* Do not respect wbc->range_{start,end}. Dirty pages
		 * in that range can be associated with newer snapc.
		 * They are not writeable until we write all dirty pages
		 * associated with 'snapc' get written */
917
		if (index > 0)
918 919
			should_loop = true;
		dout(" non-head snapc, range whole\n");
S
Sage Weil 已提交
920
	}
921 922

	ceph_put_snap_context(last_snapc);
S
Sage Weil 已提交
923 924
	last_snapc = snapc;

925
	while (!done && index <= end) {
Y
Yan, Zheng 已提交
926
		int num_ops = 0, op_idx;
927
		unsigned i, pvec_pages, max_pages, locked_pages = 0;
Y
Yan, Zheng 已提交
928
		struct page **pages = NULL, **data_pages;
S
Sage Weil 已提交
929
		struct page *page;
930
		pgoff_t strip_unit_end = 0;
Y
Yan, Zheng 已提交
931
		u64 offset = 0, len = 0;
932
		bool from_pool = false;
S
Sage Weil 已提交
933

934
		max_pages = wsize >> PAGE_SHIFT;
S
Sage Weil 已提交
935 936

get_more_pages:
937 938
		pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
						end, PAGECACHE_TAG_DIRTY);
J
Jan Kara 已提交
939
		dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
S
Sage Weil 已提交
940 941 942 943 944 945 946 947 948 949 950 951 952 953 954
		if (!pvec_pages && !locked_pages)
			break;
		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
			page = pvec.pages[i];
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
955
				continue;
S
Sage Weil 已提交
956
			}
957 958 959 960 961
			/* only if matching snap context */
			pgsnapc = page_snap_context(page);
			if (pgsnapc != snapc) {
				dout("page snapc %p %lld != oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
962 963 964 965
				if (!should_loop &&
				    !ceph_wbc.head_snapc &&
				    wbc->sync_mode != WB_SYNC_NONE)
					should_loop = true;
S
Sage Weil 已提交
966
				unlock_page(page);
967
				continue;
S
Sage Weil 已提交
968
			}
969 970 971
			if (page_offset(page) >= ceph_wbc.i_size) {
				dout("%p page eof %llu\n",
				     page, ceph_wbc.i_size);
972 973 974
				if ((ceph_wbc.size_stable ||
				    page_offset(page) >= i_size_read(inode)) &&
				    clear_page_dirty_for_io(page))
975 976 977 978 979 980 981
					mapping->a_ops->invalidatepage(page,
								0, PAGE_SIZE);
				unlock_page(page);
				continue;
			}
			if (strip_unit_end && (page->index > strip_unit_end)) {
				dout("end of strip unit %p\n", page);
S
Sage Weil 已提交
982 983 984 985
				unlock_page(page);
				break;
			}
			if (PageWriteback(page)) {
986 987 988 989 990 991 992
				if (wbc->sync_mode == WB_SYNC_NONE) {
					dout("%p under writeback\n", page);
					unlock_page(page);
					continue;
				}
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
S
Sage Weil 已提交
993 994 995 996 997
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
998
				continue;
S
Sage Weil 已提交
999 1000
			}

1001 1002 1003
			/*
			 * We have something to write.  If this is
			 * the first locked page this time through,
Y
Yan, Zheng 已提交
1004 1005
			 * calculate max possinle write size and
			 * allocate a page array
1006
			 */
S
Sage Weil 已提交
1007
			if (locked_pages == 0) {
Y
Yan, Zheng 已提交
1008 1009
				u64 objnum;
				u64 objoff;
1010
				u32 xlen;
Y
Yan, Zheng 已提交
1011

S
Sage Weil 已提交
1012
				/* prepare async write request */
1013
				offset = (u64)page_offset(page);
1014 1015 1016 1017 1018
				ceph_calc_file_object_mapping(&ci->i_layout,
							      offset, wsize,
							      &objnum, &objoff,
							      &xlen);
				len = xlen;
1019

Y
Yanhu Cao 已提交
1020
				num_ops = 1;
Y
Yan, Zheng 已提交
1021
				strip_unit_end = page->index +
1022
					((len - 1) >> PAGE_SHIFT);
A
Alex Elder 已提交
1023

Y
Yan, Zheng 已提交
1024
				BUG_ON(pages);
A
Alex Elder 已提交
1025
				max_pages = calc_pages_for(0, (u64)len);
1026 1027 1028
				pages = kmalloc_array(max_pages,
						      sizeof(*pages),
						      GFP_NOFS);
A
Alex Elder 已提交
1029
				if (!pages) {
1030 1031
					from_pool = true;
					pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
1032
					BUG_ON(!pages);
A
Alex Elder 已提交
1033
				}
Y
Yan, Zheng 已提交
1034 1035 1036

				len = 0;
			} else if (page->index !=
1037
				   (offset + len) >> PAGE_SHIFT) {
1038 1039
				if (num_ops >= (from_pool ?  CEPH_OSD_SLAB_OPS :
							     CEPH_OSD_MAX_OPS)) {
Y
Yan, Zheng 已提交
1040 1041 1042 1043 1044 1045 1046 1047
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}

				num_ops++;
				offset = (u64)page_offset(page);
				len = 0;
S
Sage Weil 已提交
1048 1049 1050 1051 1052
			}

			/* note position of first page in pvec */
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);
Y
Yehuda Sadeh 已提交
1053

Y
Yan, Zheng 已提交
1054 1055
			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
1056
				    fsc->mount_options->congestion_kb)) {
1057
				set_bdi_congested(inode_to_bdi(inode),
S
Sage Weil 已提交
1058
						  BLK_RW_ASYNC);
Y
Yehuda Sadeh 已提交
1059 1060
			}

1061 1062 1063 1064

			pages[locked_pages++] = page;
			pvec.pages[i] = NULL;

1065
			len += PAGE_SIZE;
S
Sage Weil 已提交
1066 1067 1068 1069 1070 1071
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_pvec_pages;
		if (i) {
1072 1073 1074 1075 1076 1077 1078 1079 1080 1081
			unsigned j, n = 0;
			/* shift unused page to beginning of pvec */
			for (j = 0; j < pvec_pages; j++) {
				if (!pvec.pages[j])
					continue;
				if (n < j)
					pvec.pages[n] = pvec.pages[j];
				n++;
			}
			pvec.nr = n;
S
Sage Weil 已提交
1082 1083 1084 1085

			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
1086
				pagevec_release(&pvec);
S
Sage Weil 已提交
1087 1088 1089 1090
				goto get_more_pages;
			}
		}

Y
Yan, Zheng 已提交
1091
new_request:
1092
		offset = page_offset(pages[0]);
Y
Yan, Zheng 已提交
1093 1094 1095 1096 1097
		len = wsize;

		req = ceph_osdc_new_request(&fsc->client->osdc,
					&ci->i_layout, vino,
					offset, &len, 0, num_ops,
1098 1099 1100
					CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
					snapc, ceph_wbc.truncate_seq,
					ceph_wbc.truncate_size, false);
Y
Yan, Zheng 已提交
1101 1102 1103 1104 1105 1106 1107
		if (IS_ERR(req)) {
			req = ceph_osdc_new_request(&fsc->client->osdc,
						&ci->i_layout, vino,
						offset, &len, 0,
						min(num_ops,
						    CEPH_OSD_SLAB_OPS),
						CEPH_OSD_OP_WRITE,
1108
						CEPH_OSD_FLAG_WRITE,
1109 1110
						snapc, ceph_wbc.truncate_seq,
						ceph_wbc.truncate_size, true);
Y
Yan, Zheng 已提交
1111
			BUG_ON(IS_ERR(req));
Y
Yan, Zheng 已提交
1112
		}
Y
Yan, Zheng 已提交
1113
		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
1114
			     PAGE_SIZE - offset);
Y
Yan, Zheng 已提交
1115 1116 1117

		req->r_callback = writepages_finish;
		req->r_inode = inode;
S
Sage Weil 已提交
1118

Y
Yan, Zheng 已提交
1119 1120 1121 1122 1123 1124 1125
		/* Format the osd request message and submit the write */
		len = 0;
		data_pages = pages;
		op_idx = 0;
		for (i = 0; i < locked_pages; i++) {
			u64 cur_offset = page_offset(pages[i]);
			if (offset + len != cur_offset) {
Y
Yanhu Cao 已提交
1126
				if (op_idx + 1 == req->r_num_ops)
Y
Yan, Zheng 已提交
1127 1128 1129 1130 1131 1132 1133
					break;
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
				dout("writepages got pages at %llu~%llu\n",
				     offset, len);
				osd_req_op_extent_osd_data_pages(req, op_idx,
							data_pages, len, 0,
1134
							from_pool, false);
Y
Yan, Zheng 已提交
1135
				osd_req_op_extent_update(req, op_idx, len);
1136

Y
Yan, Zheng 已提交
1137 1138 1139 1140 1141 1142 1143
				len = 0;
				offset = cur_offset; 
				data_pages = pages + i;
				op_idx++;
			}

			set_page_writeback(pages[i]);
1144
			len += PAGE_SIZE;
Y
Yan, Zheng 已提交
1145 1146
		}

1147 1148
		if (ceph_wbc.size_stable) {
			len = min(len, ceph_wbc.i_size - offset);
Y
Yan, Zheng 已提交
1149 1150 1151 1152
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * data length covers all locked pages */
1153
			u64 min_len = len + 1 - PAGE_SIZE;
1154 1155
			len = get_writepages_data_length(inode, pages[i - 1],
							 offset);
Y
Yan, Zheng 已提交
1156 1157 1158
			len = max(len, min_len);
		}
		dout("writepages got pages at %llu~%llu\n", offset, len);
1159

Y
Yan, Zheng 已提交
1160
		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
1161
						 0, from_pool, false);
Y
Yan, Zheng 已提交
1162
		osd_req_op_extent_update(req, op_idx, len);
1163

Y
Yan, Zheng 已提交
1164 1165
		BUG_ON(op_idx + 1 != req->r_num_ops);

1166
		from_pool = false;
Y
Yan, Zheng 已提交
1167 1168 1169 1170 1171 1172 1173
		if (i < locked_pages) {
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			locked_pages -= i;

			/* allocate new pages array for next request */
			data_pages = pages;
1174 1175
			pages = kmalloc_array(locked_pages, sizeof(*pages),
					      GFP_NOFS);
Y
Yan, Zheng 已提交
1176
			if (!pages) {
1177 1178
				from_pool = true;
				pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
Y
Yan, Zheng 已提交
1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190
				BUG_ON(!pages);
			}
			memcpy(pages, data_pages + i,
			       locked_pages * sizeof(*pages));
			memset(data_pages + i, 0,
			       locked_pages * sizeof(*pages));
		} else {
			BUG_ON(num_ops != req->r_num_ops);
			index = pages[i - 1]->index + 1;
			/* request message now owns the pages array */
			pages = NULL;
		}
1191

1192
		req->r_mtime = inode->i_mtime;
1193 1194
		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
		BUG_ON(rc);
S
Sage Weil 已提交
1195 1196
		req = NULL;

Y
Yan, Zheng 已提交
1197 1198 1199 1200
		wbc->nr_to_write -= i;
		if (pages)
			goto new_request;

1201 1202 1203 1204 1205 1206 1207
		/*
		 * We stop writing back only if we are not doing
		 * integrity sync. In case of integrity sync we have to
		 * keep going until we have written all the pages
		 * we tagged for writeback prior to entering this loop.
		 */
		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
1208
			done = true;
S
Sage Weil 已提交
1209 1210 1211 1212 1213 1214 1215 1216 1217 1218

release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
1219
		end = start_index - 1; /* OK even when start_index == 0 */
1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230

		/* to write dirty pages associated with next snapc,
		 * we need to wait until current writes complete */
		if (wbc->sync_mode != WB_SYNC_NONE &&
		    start_index == 0 && /* all dirty pages were checked */
		    !ceph_wbc.head_snapc) {
			struct page *page;
			unsigned i, nr;
			index = 0;
			while ((index <= end) &&
			       (nr = pagevec_lookup_tag(&pvec, mapping, &index,
1231
						PAGECACHE_TAG_WRITEBACK))) {
1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242
				for (i = 0; i < nr; i++) {
					page = pvec.pages[i];
					if (page_snap_context(page) != snapc)
						continue;
					wait_on_page_writeback(page);
				}
				pagevec_release(&pvec);
				cond_resched();
			}
		}

1243
		start_index = 0;
S
Sage Weil 已提交
1244 1245 1246 1247 1248 1249 1250 1251
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
1252
	ceph_osdc_put_request(req);
1253 1254
	ceph_put_snap_context(last_snapc);
	dout("writepages dend - startone, rc = %d\n", rc);
S
Sage Weil 已提交
1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265
	return rc;
}



/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
1266
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
1267 1268 1269 1270
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
S
Sage Weil 已提交
1271 1272
}

1273 1274 1275
/**
 * ceph_find_incompatible - find an incompatible context and return it
 * @page: page being dirtied
1276
 *
1277 1278 1279 1280 1281 1282
 * We are only allowed to write into/dirty a page if the page is
 * clean, or already dirty within the same snap context. Returns a
 * conflicting context if there is one, NULL if there isn't, or a
 * negative error code on other errors.
 *
 * Must be called with page lock held.
S
Sage Weil 已提交
1283
 */
1284
static struct ceph_snap_context *
1285
ceph_find_incompatible(struct page *page)
S
Sage Weil 已提交
1286
{
1287
	struct inode *inode = page->mapping->host;
1288
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
S
Sage Weil 已提交
1289 1290
	struct ceph_inode_info *ci = ceph_inode(inode);

1291
	if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1292
		dout(" page %p forced umount\n", page);
1293
		return ERR_PTR(-EIO);
1294 1295
	}

1296 1297 1298 1299 1300 1301 1302 1303
	for (;;) {
		struct ceph_snap_context *snapc, *oldest;

		wait_on_page_writeback(page);

		snapc = page_snap_context(page);
		if (!snapc || snapc == ci->i_head_snapc)
			break;
S
Sage Weil 已提交
1304 1305 1306 1307 1308

		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
1309
		oldest = get_oldest_context(inode, NULL, NULL);
1310
		if (snapc->seq > oldest->seq) {
1311
			/* not writeable -- return it for the caller to deal with */
1312
			ceph_put_snap_context(oldest);
1313 1314
			dout(" page %p snapc %p not current or oldest\n", page, snapc);
			return ceph_get_snap_context(snapc);
S
Sage Weil 已提交
1315
		}
1316
		ceph_put_snap_context(oldest);
S
Sage Weil 已提交
1317 1318

		/* yay, writeable, do it now (without dropping page lock) */
1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350
		dout(" page %p snapc %p not current, but oldest\n", page, snapc);
		if (clear_page_dirty_for_io(page)) {
			int r = writepage_nounlock(page, NULL);
			if (r < 0)
				return ERR_PTR(r);
		}
	}
	return NULL;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 *
 * called with page locked.
 * return success with page locked,
 * or any failure (incl -EAGAIN) with page unlocked.
 *
 * -EAGAIN asks the caller to grab the page again and retry.  If the
 * wait for an older snap context to be written back is interrupted by
 * a fatal signal, the error from wait_event_killable() is returned
 * instead so the caller stops retrying.
 */
static int ceph_update_writeable_page(struct file *file,
			    loff_t pos, unsigned len,
			    struct page *page)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;
	loff_t page_off = pos & PAGE_MASK;
	int pos_in_page = pos & ~PAGE_MASK;
	int end_in_page = pos_in_page + len;
	loff_t i_size;
	int r;

retry_locked:
	snapc = ceph_find_incompatible(page);
	if (snapc) {
		if (IS_ERR(snapc)) {
			r = PTR_ERR(snapc);
			goto fail_unlock;
		}
		/* dirty in an older, not yet writeable context: kick
		 * writeback and wait for it without holding the page lock */
		unlock_page(page);
		ceph_queue_writeback(inode);
		r = wait_event_killable(ci->i_cap_wq,
					context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
		/* propagate a fatal-signal interruption; retry otherwise */
		return r == 0 ? -EAGAIN : r;
	}

	if (PageUptodate(page)) {
		dout(" page %p already uptodate\n", page);
		return 0;
	}

	/* full page? */
	if (pos_in_page == 0 && len == PAGE_SIZE)
		return 0;

	/* past end of file? */
	i_size = i_size_read(inode);

	if (page_off >= i_size ||
	    (pos_in_page == 0 && (pos+len) >= i_size &&
	     end_in_page - pos_in_page != PAGE_SIZE)) {
		dout(" zeroing %p 0 - %d and %d - %d\n",
		     page, pos_in_page, end_in_page, (int)PAGE_SIZE);
		/* nothing to read: zero the parts the write won't cover */
		zero_user_segments(page,
				   0, pos_in_page,
				   end_in_page, PAGE_SIZE);
		return 0;
	}

	/* we need to read it. */
	r = ceph_do_readpage(file, page);
	if (r < 0) {
		if (r == -EINPROGRESS)
			return -EAGAIN;
		goto fail_unlock;
	}
	goto retry_locked;
fail_unlock:
	unlock_page(page);
	return r;
}

1401 1402 1403 1404 1405 1406 1407 1408
/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    loff_t pos, unsigned len, unsigned flags,
			    struct page **pagep, void **fsdata)
{
A
Al Viro 已提交
1409
	struct inode *inode = file_inode(file);
1410
	struct page *page;
1411
	pgoff_t index = pos >> PAGE_SHIFT;
S
Sage Weil 已提交
1412
	int r;
1413 1414

	do {
1415
		/* get a page */
1416
		page = grab_cache_page_write_begin(mapping, index, 0);
S
Sage Weil 已提交
1417 1418
		if (!page)
			return -ENOMEM;
1419 1420

		dout("write_begin file %p inode %p page %p %d~%d\n", file,
S
Sage Weil 已提交
1421
		     inode, page, (int)pos, (int)len);
1422 1423

		r = ceph_update_writeable_page(file, pos, len, page);
1424
		if (r < 0)
1425
			put_page(page);
1426 1427
		else
			*pagep = page;
1428 1429 1430 1431 1432
	} while (r == -EAGAIN);

	return r;
}

S
Sage Weil 已提交
1433 1434
/*
 * we don't do anything in here that simple_write_end doesn't do
1435
 * except adjust dirty page accounting
S
Sage Weil 已提交
1436 1437 1438 1439 1440
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
			  loff_t pos, unsigned len, unsigned copied,
			  struct page *page, void *fsdata)
{
A
Al Viro 已提交
1441
	struct inode *inode = file_inode(file);
1442
	bool check_cap = false;
S
Sage Weil 已提交
1443 1444 1445 1446 1447

	dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
	     inode, page, (int)pos, (int)copied, (int)len);

	/* zero the stale part of the page if we did a short copy */
A
Al Viro 已提交
1448 1449 1450 1451 1452 1453 1454
	if (!PageUptodate(page)) {
		if (copied < len) {
			copied = 0;
			goto out;
		}
		SetPageUptodate(page);
	}
S
Sage Weil 已提交
1455 1456

	/* did file size increase? */
1457
	if (pos+copied > i_size_read(inode))
S
Sage Weil 已提交
1458 1459 1460 1461
		check_cap = ceph_inode_set_size(inode, pos+copied);

	set_page_dirty(page);

A
Al Viro 已提交
1462
out:
S
Sage Weil 已提交
1463
	unlock_page(page);
1464
	put_page(page);
S
Sage Weil 已提交
1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476

	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);

	return copied;
}

/*
 * we set .direct_IO to indicate direct io is supported, but since we
 * intercept O_DIRECT reads and writes early, this function should
 * never get called.
 */
1477
static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter)
S
Sage Weil 已提交
1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495
{
	WARN_ON(1);
	return -EINVAL;
}

/* Address space operations table wiring up the ceph page cache handlers. */
const struct address_space_operations ceph_aops = {
	/* read side */
	.readpage	= ceph_readpage,
	.readpages	= ceph_readpages,
	/* write side */
	.writepage	= ceph_writepage,
	.writepages	= ceph_writepages_start,
	.write_begin	= ceph_write_begin,
	.write_end	= ceph_write_end,
	/* page state management */
	.set_page_dirty	= ceph_set_page_dirty,
	.invalidatepage	= ceph_invalidatepage,
	.releasepage	= ceph_releasepage,
	/* placeholder only; see ceph_direct_io() */
	.direct_IO	= ceph_direct_io,
};

1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506
static void ceph_block_sigs(sigset_t *oldset)
{
	sigset_t mask;
	siginitsetinv(&mask, sigmask(SIGKILL));
	sigprocmask(SIG_BLOCK, &mask, oldset);
}

/* Reinstate the signal mask previously saved by ceph_block_sigs(). */
static void ceph_restore_sigs(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}

/*
 * vm ops
 */
1511
static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
1512
{
1513
	struct vm_area_struct *vma = vmf->vma;
1514 1515 1516
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
1517
	struct page *pinned_page = NULL;
1518
	loff_t off = vmf->pgoff << PAGE_SHIFT;
1519
	int want, got, err;
1520
	sigset_t oldset;
1521
	vm_fault_t ret = VM_FAULT_SIGBUS;
1522 1523

	ceph_block_sigs(&oldset);
1524 1525

	dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
1526
	     inode, ceph_vinop(inode), off, (size_t)PAGE_SIZE);
1527 1528 1529 1530
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_CACHE;
1531 1532

	got = 0;
Y
Yan, Zheng 已提交
1533 1534
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1,
			    &got, &pinned_page);
1535
	if (err < 0)
1536
		goto out_restore;
1537

1538
	dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
1539
	     inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
1540

Y
Yan, Zheng 已提交
1541
	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
1542
	    ci->i_inline_version == CEPH_INLINE_NONE) {
1543 1544
		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
		ceph_add_rw_context(fi, &rw_ctx);
1545
		ret = filemap_fault(vmf);
1546
		ceph_del_rw_context(fi, &rw_ctx);
1547 1548 1549
		dout("filemap_fault %p %llu~%zd drop cap refs %s ret %x\n",
			inode, off, (size_t)PAGE_SIZE,
				ceph_cap_string(got), ret);
1550
	} else
1551
		err = -EAGAIN;
1552

1553
	if (pinned_page)
1554
		put_page(pinned_page);
1555 1556
	ceph_put_cap_refs(ci, got);

1557
	if (err != -EAGAIN)
1558
		goto out_restore;
Y
Yan, Zheng 已提交
1559 1560

	/* read inline data */
1561
	if (off >= PAGE_SIZE) {
Y
Yan, Zheng 已提交
1562 1563 1564 1565 1566
		/* does not support inline data > PAGE_SIZE */
		ret = VM_FAULT_SIGBUS;
	} else {
		struct address_space *mapping = inode->i_mapping;
		struct page *page = find_or_create_page(mapping, 0,
1567 1568
						mapping_gfp_constraint(mapping,
						~__GFP_FS));
Y
Yan, Zheng 已提交
1569 1570
		if (!page) {
			ret = VM_FAULT_OOM;
1571
			goto out_inline;
Y
Yan, Zheng 已提交
1572
		}
1573
		err = __ceph_do_getattr(inode, page,
Y
Yan, Zheng 已提交
1574
					 CEPH_STAT_CAP_INLINE_DATA, true);
1575
		if (err < 0 || off >= i_size_read(inode)) {
Y
Yan, Zheng 已提交
1576
			unlock_page(page);
1577
			put_page(page);
1578
			ret = vmf_error(err);
1579
			goto out_inline;
Y
Yan, Zheng 已提交
1580
		}
1581 1582
		if (err < PAGE_SIZE)
			zero_user_segment(page, err, PAGE_SIZE);
Y
Yan, Zheng 已提交
1583 1584 1585 1586 1587
		else
			flush_dcache_page(page);
		SetPageUptodate(page);
		vmf->page = page;
		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
1588
out_inline:
1589
		dout("filemap_fault %p %llu~%zd read inline data ret %x\n",
1590
		     inode, off, (size_t)PAGE_SIZE, ret);
Y
Yan, Zheng 已提交
1591
	}
1592 1593
out_restore:
	ceph_restore_sigs(&oldset);
1594 1595
	if (err < 0)
		ret = vmf_error(err);
1596

1597 1598
	return ret;
}
S
Sage Weil 已提交
1599 1600 1601 1602

/*
 * Reuse write_begin here for simplicity.
 */
1603
static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
S
Sage Weil 已提交
1604
{
1605
	struct vm_area_struct *vma = vmf->vma;
A
Al Viro 已提交
1606
	struct inode *inode = file_inode(vma->vm_file);
1607 1608
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
1609
	struct ceph_cap_flush *prealloc_cf;
1610
	struct page *page = vmf->page;
1611
	loff_t off = page_offset(page);
1612 1613
	loff_t size = i_size_read(inode);
	size_t len;
1614
	int want, got, err;
1615
	sigset_t oldset;
1616
	vm_fault_t ret = VM_FAULT_SIGBUS;
1617

1618 1619
	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
1620
		return VM_FAULT_OOM;
1621

1622
	sb_start_pagefault(inode->i_sb);
1623
	ceph_block_sigs(&oldset);
1624

1625 1626 1627 1628 1629 1630
	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		struct page *locked_page = NULL;
		if (off == 0) {
			lock_page(page);
			locked_page = page;
		}
1631
		err = ceph_uninline_data(vma->vm_file, locked_page);
1632 1633
		if (locked_page)
			unlock_page(locked_page);
1634
		if (err < 0)
1635
			goto out_free;
1636 1637
	}

1638 1639
	if (off + PAGE_SIZE <= size)
		len = PAGE_SIZE;
S
Sage Weil 已提交
1640
	else
1641
		len = size & ~PAGE_MASK;
S
Sage Weil 已提交
1642

1643 1644 1645 1646 1647 1648
	dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
	     inode, ceph_vinop(inode), off, len, size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;
1649 1650

	got = 0;
Y
Yan, Zheng 已提交
1651
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len,
1652
			    &got, NULL);
1653
	if (err < 0)
1654
		goto out_free;
1655

1656 1657 1658 1659 1660
	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
	     inode, off, len, ceph_cap_string(got));

	/* Update time before taking page lock */
	file_update_time(vma->vm_file);
1661
	inode_inc_iversion_raw(inode);
1662

1663
	do {
1664 1665
		struct ceph_snap_context *snapc;

1666
		lock_page(page);
1667

1668
		if (page_mkwrite_check_truncate(page, inode) < 0) {
1669 1670 1671 1672 1673
			unlock_page(page);
			ret = VM_FAULT_NOPAGE;
			break;
		}

1674 1675
		snapc = ceph_find_incompatible(page);
		if (!snapc) {
1676 1677 1678
			/* success.  we'll keep the page locked. */
			set_page_dirty(page);
			ret = VM_FAULT_LOCKED;
1679 1680 1681 1682 1683 1684 1685 1686
			break;
		}

		unlock_page(page);

		if (IS_ERR(snapc)) {
			ret = VM_FAULT_SIGBUS;
			break;
1687
		}
1688 1689 1690 1691 1692 1693

		ceph_queue_writeback(inode);
		err = wait_event_killable(ci->i_cap_wq,
				context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
	} while (err == 0);
1694

1695 1696
	if (ret == VM_FAULT_LOCKED ||
	    ci->i_inline_version != CEPH_INLINE_NONE) {
1697 1698
		int dirty;
		spin_lock(&ci->i_ceph_lock);
1699
		ci->i_inline_version = CEPH_INLINE_NONE;
1700 1701
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
1702 1703 1704 1705 1706
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

1707
	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
1708 1709
	     inode, off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs(ci, got);
1710
out_free:
1711
	ceph_restore_sigs(&oldset);
1712
	sb_end_pagefault(inode->i_sb);
1713
	ceph_free_cap_flush(prealloc_cf);
1714 1715
	if (err < 0)
		ret = vmf_error(err);
S
Sage Weil 已提交
1716 1717 1718
	return ret;
}

Y
Yan, Zheng 已提交
1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730
void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
			   char	*data, size_t len)
{
	struct address_space *mapping = inode->i_mapping;
	struct page *page;

	if (locked_page) {
		page = locked_page;
	} else {
		if (i_size_read(inode) == 0)
			return;
		page = find_or_create_page(mapping, 0,
1731 1732
					   mapping_gfp_constraint(mapping,
					   ~__GFP_FS));
Y
Yan, Zheng 已提交
1733 1734 1735 1736
		if (!page)
			return;
		if (PageUptodate(page)) {
			unlock_page(page);
1737
			put_page(page);
Y
Yan, Zheng 已提交
1738 1739 1740 1741
			return;
		}
	}

1742
	dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
Y
Yan, Zheng 已提交
1743 1744 1745 1746 1747 1748 1749 1750 1751
	     inode, ceph_vinop(inode), len, locked_page);

	if (len > 0) {
		void *kaddr = kmap_atomic(page);
		memcpy(kaddr, data, len);
		kunmap_atomic(kaddr);
	}

	if (page != locked_page) {
1752 1753
		if (len < PAGE_SIZE)
			zero_user_segment(page, len, PAGE_SIZE);
Y
Yan, Zheng 已提交
1754 1755 1756 1757 1758
		else
			flush_dcache_page(page);

		SetPageUptodate(page);
		unlock_page(page);
1759
		put_page(page);
Y
Yan, Zheng 已提交
1760 1761 1762
	}
}

1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795
int ceph_uninline_data(struct file *filp, struct page *locked_page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	struct page *page = NULL;
	u64 len, inline_version;
	int err = 0;
	bool from_pagecache = false;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	dout("uninline_data %p %llx.%llx inline_version %llu\n",
	     inode, ceph_vinop(inode), inline_version);

	if (inline_version == 1 || /* initial version, no data */
	    inline_version == CEPH_INLINE_NONE)
		goto out;

	if (locked_page) {
		page = locked_page;
		WARN_ON(!PageUptodate(page));
	} else if (ceph_caps_issued(ci) &
		   (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
		page = find_get_page(inode->i_mapping, 0);
		if (page) {
			if (PageUptodate(page)) {
				from_pagecache = true;
				lock_page(page);
			} else {
1796
				put_page(page);
1797 1798 1799 1800 1801 1802 1803
				page = NULL;
			}
		}
	}

	if (page) {
		len = i_size_read(inode);
1804 1805
		if (len > PAGE_SIZE)
			len = PAGE_SIZE;
1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824
	} else {
		page = __page_cache_alloc(GFP_NOFS);
		if (!page) {
			err = -ENOMEM;
			goto out;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0) {
			/* no inline data */
			if (err == -ENODATA)
				err = 0;
			goto out;
		}
		len = err;
	}

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
1825
				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
I
Ilya Dryomov 已提交
1826
				    NULL, 0, 0, false);
1827 1828 1829 1830 1831
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

1832
	req->r_mtime = inode->i_mtime;
1833 1834 1835 1836 1837 1838 1839 1840 1841
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
1842
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
I
Ilya Dryomov 已提交
1843 1844
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
1845 1846 1847 1848 1849 1850 1851
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);

Y
Yan, Zheng 已提交
1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872
	{
		__le64 xattr_buf = cpu_to_le64(inline_version);
		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put;
	}

	{
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);
		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put;
	}
1873

1874
	req->r_mtime = inode->i_mtime;
1875 1876 1877
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1878 1879 1880 1881

	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, err);

1882 1883 1884 1885 1886 1887 1888 1889
out_put:
	ceph_osdc_put_request(req);
	if (err == -ECANCELED)
		err = 0;
out:
	if (page && page != locked_page) {
		if (from_pagecache) {
			unlock_page(page);
1890
			put_page(page);
1891 1892 1893 1894 1895 1896 1897 1898 1899
		} else
			__free_pages(page, 0);
	}

	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
	     inode, ceph_vinop(inode), inline_version, err);
	return err;
}

1900
static const struct vm_operations_struct ceph_vmops = {
1901
	.fault		= ceph_filemap_fault,
S
Sage Weil 已提交
1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914
	.page_mkwrite	= ceph_page_mkwrite,
};

int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	struct address_space *mapping = file->f_mapping;

	if (!mapping->a_ops->readpage)
		return -ENOEXEC;
	file_accessed(file);
	vma->vm_ops = &ceph_vmops;
	return 0;
}
1915 1916 1917 1918 1919 1920

/*
 * Bits of the pool permission mask: returned by __ceph_pool_perm_get()
 * and stored in ceph_pool_perm.perm.
 */
enum {
	POOL_READ	= 1 << 0,
	POOL_WRITE	= 1 << 1,
};

Y
Yan, Zheng 已提交
1921 1922
/*
 * Find the cached permission entry for (@pool, @pool_ns) in
 * mdsc->pool_perm_tree.  Entries are ordered by pool id first and by
 * namespace string second.
 *
 * The caller must hold mdsc->pool_perm_rwsem.  Returns the entry, or
 * NULL on a miss.  On a miss, *@parentp and *@linkp (each optional) are
 * set to the parent node and the link slot where a new entry should be
 * attached; both are only valid while the lock is still held.
 */
static struct ceph_pool_perm *
__ceph_pool_perm_lookup(struct ceph_mds_client *mdsc, s64 pool,
			struct ceph_string *pool_ns,
			struct rb_node **parentp, struct rb_node ***linkp)
{
	struct rb_node **p = &mdsc->pool_perm_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_pool_perm *perm;
	int cmp;

	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);

		if (pool < perm->pool)
			cmp = -1;
		else if (pool > perm->pool)
			cmp = 1;
		else
			cmp = ceph_compare_string(pool_ns, perm->pool_ns,
						  perm->pool_ns_len);

		if (cmp < 0)
			p = &(*p)->rb_left;
		else if (cmp > 0)
			p = &(*p)->rb_right;
		else
			return perm;
	}

	if (parentp)
		*parentp = parent;
	if (linkp)
		*linkp = p;
	return NULL;
}

/*
 * Determine which of POOL_READ / POOL_WRITE this client has on
 * (@pool, @pool_ns), using the cache in mdsc->pool_perm_tree.
 *
 * On a cache miss, a STAT (read) and an exclusive CREATE (write) are
 * sent for the object "<ino>.00000000" in that pool.  -EPERM on either
 * clears the corresponding bit; 0/-ENOENT (read) and 0/-EEXIST (write)
 * set it; any other error aborts the probe.  The result is then added
 * to the cache.
 *
 * Returns a non-negative mask of POOL_READ | POOL_WRITE, or a negative
 * errno.
 */
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	/*
	 * Fast path under the read lock.  The hit/miss decision and the
	 * cached mask are captured while the lock is held.  The tree must
	 * not be inspected again after up_read(): a concurrent insert could
	 * fill the link slot where this search ended, and a miss would then
	 * be mistaken for a hit with an empty permission mask.
	 */
	down_read(&mdsc->pool_perm_rwsem);
	perm = __ceph_pool_perm_lookup(mdsc, pool, pool_ns, NULL, NULL);
	if (perm)
		have = perm->perm;
	up_read(&mdsc->pool_perm_rwsem);
	if (perm)
		goto out;

	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
		     pool, (int)pool_ns->len, pool_ns->str);
	else
		dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);

	/* Repeat the lookup under the write lock; someone may have raced us. */
	down_write(&mdsc->pool_perm_rwsem);
	perm = __ceph_pool_perm_lookup(mdsc, pool, pool_ns, &parent, &p);
	if (perm) {
		have = perm->perm;
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	/* the write probe targets the same object as the read probe */
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);

	wr_req->r_mtime = ci->vfs_inode.i_mtime;
	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);

	/* both probes are started before either one is waited on */
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	if (!err2)
		err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM) {
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out_unlock;
	}

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		if (err2 == -EBLOCKLISTED)
			fsc->blocklisted = true;
		err = err2;
		goto out_unlock;
	}

	/* the namespace string is stored NUL-terminated after the struct */
	pool_ns_len = pool_ns ? pool_ns->len : 0;
	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	/* parent/p were recorded under this same write lock and are still valid */
	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
		     pool, (int)pool_ns->len, pool_ns->str, err);
	else
		dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
	return err;
}

Y
Yan, Zheng 已提交
2097
int ceph_pool_perm_check(struct inode *inode, int need)
2098
{
Y
Yan, Zheng 已提交
2099
	struct ceph_inode_info *ci = ceph_inode(inode);
Y
Yan, Zheng 已提交
2100
	struct ceph_string *pool_ns;
Y
Yan, Zheng 已提交
2101
	s64 pool;
2102 2103
	int ret, flags;

2104 2105 2106 2107 2108 2109 2110 2111 2112
	if (ci->i_vino.snap != CEPH_NOSNAP) {
		/*
		 * Pool permission check needs to write to the first object.
		 * But for snapshot, head of the first object may have alread
		 * been deleted. Skip check to avoid creating orphan object.
		 */
		return 0;
	}

Y
Yan, Zheng 已提交
2113
	if (ceph_test_mount_opt(ceph_inode_to_client(inode),
2114 2115 2116 2117 2118
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
2119
	pool = ci->i_layout.pool_id;
2120 2121 2122 2123
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
2124
			dout("ceph_pool_perm_check pool %lld no read perm\n",
2125 2126 2127 2128
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
2129
			dout("ceph_pool_perm_check pool %lld no write perm\n",
2130 2131 2132 2133 2134 2135
			     pool);
			return -EPERM;
		}
		return 0;
	}

Y
Yan, Zheng 已提交
2136 2137 2138
	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
	ceph_put_string(pool_ns);
2139 2140 2141 2142 2143 2144 2145 2146 2147 2148
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
Y
Yan, Zheng 已提交
2149 2150 2151
	if (pool == ci->i_layout.pool_id &&
	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
		ci->i_ceph_flags |= flags;
2152
        } else {
2153
		pool = ci->i_layout.pool_id;
2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}

void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}