// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (C) 2011 STRATO.  All rights reserved.
 */

#include <linux/sched.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>
#include <linux/blkdev.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include <linux/workqueue.h>
#include "ctree.h"
#include "volumes.h"
#include "disk-io.h"
#include "transaction.h"
#include "dev-replace.h"

#undef DEBUG

/*
 * This is the implementation for the generic read ahead framework.
 *
 * To trigger a readahead, btrfs_reada_add must be called. It will start a
 * read ahead for the given key range [key_start, key_end) on the given tree
 * root. The returned handle can either be used to wait on the readahead to
 * finish (btrfs_reada_wait), or to send it to the background
 * (btrfs_reada_detach).
 *
 * The read ahead works as follows:
 * On btrfs_reada_add, the root of the tree is inserted into a radix_tree.
 * reada_start_machine will then search for extents to prefetch and trigger
 * some reads. When a read finishes for a node, all contained node/leaf
 * pointers that lie in the given range will also be enqueued. The reads will
 * be triggered in sequential order, thus giving a big win over a naive
 * enumeration. It will also make use of multi-device layouts. Each disk
 * will have its own read pointer and all disks will be utilized in parallel.
 * Also, no two disks will read both sides of a mirror simultaneously, as this
 * would waste seeking capacity. Instead both disks will read different parts
 * of the filesystem.
 * Any number of readaheads can be started in parallel. The read order will be
 * determined globally, i.e. two parallel readaheads will normally finish
 * faster than the same two started one after another.
 */
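
/*
 * Minimal usage sketch (a hypothetical caller, not copied from in-tree code;
 * scrub is the main in-tree user). Given a valid struct btrfs_root *root,
 * this prefetches the whole tree and waits for the readahead to finish:
 *
 *	struct btrfs_key key_start = { 0 };
 *	struct btrfs_key key_end = {
 *		.objectid = (u64)-1, .type = (u8)-1, .offset = (u64)-1
 *	};
 *	struct reada_control *rc;
 *
 *	rc = btrfs_reada_add(root, &key_start, &key_end);
 *	if (!IS_ERR(rc))
 *		btrfs_reada_wait(rc);
 *
 * Use btrfs_reada_detach(rc) instead of btrfs_reada_wait(rc) to let the
 * readahead continue in the background.
 */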

#define MAX_IN_FLIGHT 6

struct reada_extctl {
	struct list_head	list;
	struct reada_control	*rc;
	u64			generation;
};

struct reada_extent {
	u64			logical;
	struct btrfs_key	top;
	struct list_head	extctl;
	int			refcnt;
	spinlock_t		lock;
	struct reada_zone	*zones[BTRFS_MAX_MIRRORS];
	int			nzones;
	int			scheduled;
};

struct reada_zone {
	u64			start;
	u64			end;
	u64			elems;
	struct list_head	list;
	spinlock_t		lock;
	int			locked;
	struct btrfs_device	*device;
	struct btrfs_device	*devs[BTRFS_MAX_MIRRORS]; /* full list, incl
							   * self */
	int			ndevs;
	struct kref		refcnt;
};

struct reada_machine_work {
	struct btrfs_work	work;
	struct btrfs_fs_info	*fs_info;
};

static void reada_extent_put(struct btrfs_fs_info *, struct reada_extent *);
static void reada_control_release(struct kref *kref);
static void reada_zone_release(struct kref *kref);
static void reada_start_machine(struct btrfs_fs_info *fs_info);
static void __reada_start_machine(struct btrfs_fs_info *fs_info);

static int reada_add_block(struct reada_control *rc, u64 logical,
			   struct btrfs_key *top, u64 generation);

/* recurses */
/* in case of err, eb might be NULL */
static void __readahead_hook(struct btrfs_fs_info *fs_info,
			     struct reada_extent *re, struct extent_buffer *eb,
			     int err)
{
	int nritems;
	int i;
	u64 bytenr;
	u64 generation;
	struct list_head list;

	spin_lock(&re->lock);
	/*
	 * just take the full list from the extent. afterwards we
	 * don't need the lock anymore
	 */
	list_replace_init(&re->extctl, &list);
	re->scheduled = 0;
	spin_unlock(&re->lock);

	/*
	 * this is the error case, the extent buffer has not been
	 * read correctly. We won't access anything from it and
	 * just cleanup our data structures. Effectively this will
	 * cut the branch below this node from read ahead.
	 */
	if (err)
		goto cleanup;

	/*
	 * FIXME: currently we just skip the content if this is a leaf,
	 * effectively ignoring it. As a next step we could trigger more
	 * readahead depending on the content, e.g. fetch the checksums for
	 * the extents in the leaf.
	 */
	if (!btrfs_header_level(eb))
		goto cleanup;

	nritems = btrfs_header_nritems(eb);
	generation = btrfs_header_generation(eb);
	for (i = 0; i < nritems; i++) {
		struct reada_extctl *rec;
		u64 n_gen;
		struct btrfs_key key;
		struct btrfs_key next_key;

		btrfs_node_key_to_cpu(eb, &key, i);
		if (i + 1 < nritems)
			btrfs_node_key_to_cpu(eb, &next_key, i + 1);
		else
			next_key = re->top;
		bytenr = btrfs_node_blockptr(eb, i);
		n_gen = btrfs_node_ptr_generation(eb, i);

		list_for_each_entry(rec, &list, list) {
			struct reada_control *rc = rec->rc;

			/*
			 * if the generation doesn't match, just ignore this
			 * extctl. This will probably cut off a branch from
			 * prefetch. Alternatively one could start a new (sub-)
			 * prefetch for this branch, starting again from root.
			 * FIXME: move the generation check out of this loop
			 */
#ifdef DEBUG
			if (rec->generation != generation) {
				btrfs_debug(fs_info,
					    "generation mismatch for (%llu,%d,%llu) %llu != %llu",
					    key.objectid, key.type, key.offset,
					    rec->generation, generation);
			}
#endif
			if (rec->generation == generation &&
			    btrfs_comp_cpu_keys(&key, &rc->key_end) < 0 &&
			    btrfs_comp_cpu_keys(&next_key, &rc->key_start) > 0)
				reada_add_block(rc, bytenr, &next_key, n_gen);
		}
	}

cleanup:
	/*
	 * free extctl records
	 */
	while (!list_empty(&list)) {
		struct reada_control *rc;
		struct reada_extctl *rec;

		rec = list_first_entry(&list, struct reada_extctl, list);
		list_del(&rec->list);
		rc = rec->rc;
		kfree(rec);

		kref_get(&rc->refcnt);
		if (atomic_dec_and_test(&rc->elems)) {
			kref_put(&rc->refcnt, reada_control_release);
			wake_up(&rc->wait);
		}
		kref_put(&rc->refcnt, reada_control_release);

		reada_extent_put(fs_info, re);	/* one ref for each entry */
	}

	return;
}

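/*
 * Called when the read of a tree block has finished (err == 0) or failed.
 * Looks up the matching reada_extent, feeds the result to __readahead_hook
 * and kicks the state machine so further reads get scheduled.
 */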
int btree_readahead_hook(struct extent_buffer *eb, int err)
{
	struct btrfs_fs_info *fs_info = eb->fs_info;
	int ret = 0;
	struct reada_extent *re;

	/* find extent */
	spin_lock(&fs_info->reada_lock);
	re = radix_tree_lookup(&fs_info->reada_tree,
			       eb->start >> PAGE_SHIFT);
	if (re)
		re->refcnt++;
	spin_unlock(&fs_info->reada_lock);
	if (!re) {
		ret = -1;
		goto start_machine;
	}

	__readahead_hook(fs_info, re, eb, err);
	reada_extent_put(fs_info, re);	/* our ref */

start_machine:
	reada_start_machine(fs_info);
	return ret;
}

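/*
 * Find the readahead zone on @dev that covers @logical, or create it from the
 * enclosing block group. Returns the zone with an additional reference held,
 * or NULL if no block group covers @logical or on allocation failure.
 */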
static struct reada_zone *reada_find_zone(struct btrfs_device *dev, u64 logical,
					  struct btrfs_bio *bbio)
{
	struct btrfs_fs_info *fs_info = dev->fs_info;
	int ret;
	struct reada_zone *zone;
	struct btrfs_block_group_cache *cache = NULL;
	u64 start;
	u64 end;
	int i;

	zone = NULL;
	spin_lock(&fs_info->reada_lock);
	ret = radix_tree_gang_lookup(&dev->reada_zones, (void **)&zone,
				     logical >> PAGE_SHIFT, 1);
	if (ret == 1 && logical >= zone->start && logical <= zone->end) {
		kref_get(&zone->refcnt);
		spin_unlock(&fs_info->reada_lock);
		return zone;
	}

	spin_unlock(&fs_info->reada_lock);

	cache = btrfs_lookup_block_group(fs_info, logical);
	if (!cache)
		return NULL;

	start = cache->key.objectid;
	end = start + cache->key.offset - 1;
	btrfs_put_block_group(cache);

	zone = kzalloc(sizeof(*zone), GFP_KERNEL);
	if (!zone)
		return NULL;

	ret = radix_tree_preload(GFP_KERNEL);
	if (ret) {
		kfree(zone);
		return NULL;
	}

	zone->start = start;
	zone->end = end;
	INIT_LIST_HEAD(&zone->list);
	spin_lock_init(&zone->lock);
	zone->locked = 0;
	kref_init(&zone->refcnt);
	zone->elems = 0;
	zone->device = dev; /* our device always sits at index 0 */
	for (i = 0; i < bbio->num_stripes; ++i) {
		/* bounds have already been checked */
		zone->devs[i] = bbio->stripes[i].dev;
	}
	zone->ndevs = bbio->num_stripes;

	spin_lock(&fs_info->reada_lock);
	ret = radix_tree_insert(&dev->reada_zones,
				(unsigned long)(zone->end >> PAGE_SHIFT),
				zone);

	if (ret == -EEXIST) {
		kfree(zone);
		ret = radix_tree_gang_lookup(&dev->reada_zones, (void **)&zone,
					     logical >> PAGE_SHIFT, 1);
		if (ret == 1 && logical >= zone->start && logical <= zone->end)
			kref_get(&zone->refcnt);
		else
			zone = NULL;
	}
	spin_unlock(&fs_info->reada_lock);
	radix_tree_preload_end();

	return zone;
}

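/*
 * Find or create the reada_extent for @logical and insert it into the global
 * and all per-device radix trees (all or nothing). The returned extent holds
 * one reference for the caller; if another task raced us and inserted the
 * extent first, that existing extent is returned instead.
 */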
static struct reada_extent *reada_find_extent(struct btrfs_fs_info *fs_info,
					      u64 logical,
					      struct btrfs_key *top)
{
	int ret;
	struct reada_extent *re = NULL;
	struct reada_extent *re_exist = NULL;
	struct btrfs_bio *bbio = NULL;
	struct btrfs_device *dev;
	struct btrfs_device *prev_dev;
	u64 length;
	int real_stripes;
	int nzones = 0;
	unsigned long index = logical >> PAGE_SHIFT;
	int dev_replace_is_ongoing;
	int have_zone = 0;

	spin_lock(&fs_info->reada_lock);
	re = radix_tree_lookup(&fs_info->reada_tree, index);
	if (re)
		re->refcnt++;
	spin_unlock(&fs_info->reada_lock);

	if (re)
		return re;

	re = kzalloc(sizeof(*re), GFP_KERNEL);
	if (!re)
		return NULL;

	re->logical = logical;
	re->top = *top;
	INIT_LIST_HEAD(&re->extctl);
	spin_lock_init(&re->lock);
	re->refcnt = 1;

	/*
	 * map block
	 */
	length = fs_info->nodesize;
	ret = btrfs_map_block(fs_info, BTRFS_MAP_GET_READ_MIRRORS, logical,
			&length, &bbio, 0);
	if (ret || !bbio || length < fs_info->nodesize)
		goto error;

	if (bbio->num_stripes > BTRFS_MAX_MIRRORS) {
		btrfs_err(fs_info,
			   "readahead: more than %d copies not supported",
			   BTRFS_MAX_MIRRORS);
		goto error;
	}

	real_stripes = bbio->num_stripes - bbio->num_tgtdevs;
	for (nzones = 0; nzones < real_stripes; ++nzones) {
		struct reada_zone *zone;

		dev = bbio->stripes[nzones].dev;

		/* cannot read ahead on missing device. */
		if (!dev->bdev)
			continue;

		zone = reada_find_zone(dev, logical, bbio);
		if (!zone)
			continue;

		re->zones[re->nzones++] = zone;
		spin_lock(&zone->lock);
		if (!zone->elems)
			kref_get(&zone->refcnt);
		++zone->elems;
		spin_unlock(&zone->lock);
		spin_lock(&fs_info->reada_lock);
		kref_put(&zone->refcnt, reada_zone_release);
		spin_unlock(&fs_info->reada_lock);
	}
	if (re->nzones == 0) {
		/* not a single zone found, error and out */
		goto error;
	}

	ret = radix_tree_preload(GFP_KERNEL);
	if (ret)
		goto error;

	/* insert extent in reada_tree + all per-device trees, all or nothing */
	btrfs_dev_replace_read_lock(&fs_info->dev_replace);
	spin_lock(&fs_info->reada_lock);
	ret = radix_tree_insert(&fs_info->reada_tree, index, re);
	if (ret == -EEXIST) {
		re_exist = radix_tree_lookup(&fs_info->reada_tree, index);
		re_exist->refcnt++;
		spin_unlock(&fs_info->reada_lock);
		btrfs_dev_replace_read_unlock(&fs_info->dev_replace);
		radix_tree_preload_end();
		goto error;
	}
	if (ret) {
		spin_unlock(&fs_info->reada_lock);
		btrfs_dev_replace_read_unlock(&fs_info->dev_replace);
		radix_tree_preload_end();
		goto error;
	}
	radix_tree_preload_end();
	prev_dev = NULL;
	dev_replace_is_ongoing = btrfs_dev_replace_is_ongoing(
			&fs_info->dev_replace);
	for (nzones = 0; nzones < re->nzones; ++nzones) {
		dev = re->zones[nzones]->device;

		if (dev == prev_dev) {
			/*
			 * in case of DUP, just add the first zone. As both
			 * are on the same device, there's nothing to gain
			 * from adding both.
			 * Also, it wouldn't work, as the tree is per device
			 * and adding would fail with EEXIST
			 */
			continue;
		}
		if (!dev->bdev)
			continue;

		if (dev_replace_is_ongoing &&
		    dev == fs_info->dev_replace.tgtdev) {
			/*
			 * as this device is selected for reading only as
			 * a last resort, skip it for read ahead.
			 */
			continue;
		}
		prev_dev = dev;
		ret = radix_tree_insert(&dev->reada_extents, index, re);
		if (ret) {
			while (--nzones >= 0) {
				dev = re->zones[nzones]->device;
				BUG_ON(dev == NULL);
				/* ignore whether the entry was inserted */
				radix_tree_delete(&dev->reada_extents, index);
			}
			radix_tree_delete(&fs_info->reada_tree, index);
			spin_unlock(&fs_info->reada_lock);
			btrfs_dev_replace_read_unlock(&fs_info->dev_replace);
			goto error;
		}
		have_zone = 1;
	}
	spin_unlock(&fs_info->reada_lock);
	btrfs_dev_replace_read_unlock(&fs_info->dev_replace);

	if (!have_zone)
		goto error;

	btrfs_put_bbio(bbio);
	return re;

error:
	for (nzones = 0; nzones < re->nzones; ++nzones) {
		struct reada_zone *zone;

		zone = re->zones[nzones];
		kref_get(&zone->refcnt);
		spin_lock(&zone->lock);
		--zone->elems;
		if (zone->elems == 0) {
			/*
			 * no fs_info->reada_lock needed, as this can't be
			 * the last ref
			 */
			kref_put(&zone->refcnt, reada_zone_release);
		}
		spin_unlock(&zone->lock);

		spin_lock(&fs_info->reada_lock);
		kref_put(&zone->refcnt, reada_zone_release);
		spin_unlock(&fs_info->reada_lock);
	}
	btrfs_put_bbio(bbio);
	kfree(re);
	return re_exist;
}

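/*
 * Drop one reference on @re. When the last reference goes away, the extent is
 * removed from the global and per-device radix trees, the zone element counts
 * are dropped and the extent is freed.
 */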
static void reada_extent_put(struct btrfs_fs_info *fs_info,
			     struct reada_extent *re)
{
	int i;
	unsigned long index = re->logical >> PAGE_SHIFT;

	spin_lock(&fs_info->reada_lock);
	if (--re->refcnt) {
		spin_unlock(&fs_info->reada_lock);
		return;
	}

	radix_tree_delete(&fs_info->reada_tree, index);
	for (i = 0; i < re->nzones; ++i) {
		struct reada_zone *zone = re->zones[i];

		radix_tree_delete(&zone->device->reada_extents, index);
	}

	spin_unlock(&fs_info->reada_lock);

	for (i = 0; i < re->nzones; ++i) {
		struct reada_zone *zone = re->zones[i];

		kref_get(&zone->refcnt);
		spin_lock(&zone->lock);
		--zone->elems;
		if (zone->elems == 0) {
			/*
			 * no fs_info->reada_lock needed, as this can't be
			 * the last ref
			 */
			kref_put(&zone->refcnt, reada_zone_release);
		}
		spin_unlock(&zone->lock);

		spin_lock(&fs_info->reada_lock);
		kref_put(&zone->refcnt, reada_zone_release);
		spin_unlock(&fs_info->reada_lock);
	}

	kfree(re);
}

static void reada_zone_release(struct kref *kref)
{
	struct reada_zone *zone = container_of(kref, struct reada_zone, refcnt);

	radix_tree_delete(&zone->device->reada_zones,
			  zone->end >> PAGE_SHIFT);

	kfree(zone);
}

static void reada_control_release(struct kref *kref)
{
	struct reada_control *rc = container_of(kref, struct reada_control,
						refcnt);

	kfree(rc);
}

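/*
 * Attach the readahead control @rc to the extent at @logical, creating the
 * extent (and its zones) if necessary. The extctl record added here is
 * consumed by __readahead_hook once the block has been read.
 */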
static int reada_add_block(struct reada_control *rc, u64 logical,
			   struct btrfs_key *top, u64 generation)
{
	struct btrfs_fs_info *fs_info = rc->fs_info;
	struct reada_extent *re;
	struct reada_extctl *rec;

	/* takes one ref */
	re = reada_find_extent(fs_info, logical, top);
	if (!re)
		return -1;

	rec = kzalloc(sizeof(*rec), GFP_KERNEL);
	if (!rec) {
		reada_extent_put(fs_info, re);
		return -ENOMEM;
	}

	rec->rc = rc;
	rec->generation = generation;
	atomic_inc(&rc->elems);

	spin_lock(&re->lock);
	list_add_tail(&rec->list, &re->extctl);
	spin_unlock(&re->lock);

	/* leave the ref on the extent */

	return 0;
}

/*
 * called with fs_info->reada_lock held
 */
static void reada_peer_zones_set_lock(struct reada_zone *zone, int lock)
{
	int i;
	unsigned long index = zone->end >> PAGE_SHIFT;

	for (i = 0; i < zone->ndevs; ++i) {
		struct reada_zone *peer;
		peer = radix_tree_lookup(&zone->devs[i]->reada_zones, index);
		if (peer && peer->device != zone->device)
			peer->locked = lock;
	}
}

/*
 * called with fs_info->reada_lock held
 */
static int reada_pick_zone(struct btrfs_device *dev)
{
	struct reada_zone *top_zone = NULL;
	struct reada_zone *top_locked_zone = NULL;
	u64 top_elems = 0;
	u64 top_locked_elems = 0;
	unsigned long index = 0;
	int ret;

	if (dev->reada_curr_zone) {
		reada_peer_zones_set_lock(dev->reada_curr_zone, 0);
		kref_put(&dev->reada_curr_zone->refcnt, reada_zone_release);
		dev->reada_curr_zone = NULL;
	}
	/* pick the zone with the most elements */
	while (1) {
		struct reada_zone *zone;

		ret = radix_tree_gang_lookup(&dev->reada_zones,
					     (void **)&zone, index, 1);
		if (ret == 0)
			break;
		index = (zone->end >> PAGE_SHIFT) + 1;
		if (zone->locked) {
			if (zone->elems > top_locked_elems) {
				top_locked_elems = zone->elems;
				top_locked_zone = zone;
			}
		} else {
			if (zone->elems > top_elems) {
				top_elems = zone->elems;
				top_zone = zone;
			}
		}
	}
	if (top_zone)
		dev->reada_curr_zone = top_zone;
	else if (top_locked_zone)
		dev->reada_curr_zone = top_locked_zone;
	else
		return 0;

	dev->reada_next = dev->reada_curr_zone->start;
	kref_get(&dev->reada_curr_zone->refcnt);
	reada_peer_zones_set_lock(dev->reada_curr_zone, 1);

	return 1;
}

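/*
 * Issue one readahead for @dev: pick (or advance) the current zone, take the
 * next extent from the device's radix tree and read it. Returns 1 if a read
 * was started, 0 if there was nothing to do.
 */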
static int reada_start_machine_dev(struct btrfs_device *dev)
{
	struct btrfs_fs_info *fs_info = dev->fs_info;
	struct reada_extent *re = NULL;
	int mirror_num = 0;
	struct extent_buffer *eb = NULL;
	u64 logical;
	int ret;
	int i;

	spin_lock(&fs_info->reada_lock);
	if (dev->reada_curr_zone == NULL) {
		ret = reada_pick_zone(dev);
		if (!ret) {
			spin_unlock(&fs_info->reada_lock);
			return 0;
		}
	}
	/*
	 * FIXME currently we issue the reads one extent at a time. If we have
	 * a contiguous block of extents, we could also coalesce them or use
	 * plugging to speed things up
	 */
	ret = radix_tree_gang_lookup(&dev->reada_extents, (void **)&re,
				     dev->reada_next >> PAGE_SHIFT, 1);
	if (ret == 0 || re->logical > dev->reada_curr_zone->end) {
		ret = reada_pick_zone(dev);
		if (!ret) {
			spin_unlock(&fs_info->reada_lock);
			return 0;
		}
		re = NULL;
		ret = radix_tree_gang_lookup(&dev->reada_extents, (void **)&re,
					dev->reada_next >> PAGE_SHIFT, 1);
	}
	if (ret == 0) {
		spin_unlock(&fs_info->reada_lock);
		return 0;
	}
	dev->reada_next = re->logical + fs_info->nodesize;
	re->refcnt++;

	spin_unlock(&fs_info->reada_lock);

	spin_lock(&re->lock);
	if (re->scheduled || list_empty(&re->extctl)) {
		spin_unlock(&re->lock);
		reada_extent_put(fs_info, re);
		return 0;
	}
	re->scheduled = 1;
	spin_unlock(&re->lock);

	/*
	 * find mirror num
	 */
	for (i = 0; i < re->nzones; ++i) {
		if (re->zones[i]->device == dev) {
			mirror_num = i + 1;
			break;
		}
	}
	logical = re->logical;

	atomic_inc(&dev->reada_in_flight);
	ret = reada_tree_block_flagged(fs_info, logical, mirror_num, &eb);
	if (ret)
		__readahead_hook(fs_info, re, NULL, ret);
	else if (eb)
		__readahead_hook(fs_info, re, eb, ret);

	if (eb)
		free_extent_buffer(eb);

	atomic_dec(&dev->reada_in_flight);
	reada_extent_put(fs_info, re);

	return 1;
}

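/*
 * Worker entry point: switch the task to the readahead I/O priority, run the
 * state machine and restore the previous priority afterwards.
 */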
static void reada_start_machine_worker(struct btrfs_work *work)
{
	struct reada_machine_work *rmw;
	struct btrfs_fs_info *fs_info;
	int old_ioprio;

	rmw = container_of(work, struct reada_machine_work, work);
	fs_info = rmw->fs_info;

	kfree(rmw);

	old_ioprio = IOPRIO_PRIO_VALUE(task_nice_ioclass(current),
				       task_nice_ioprio(current));
	set_task_ioprio(current, BTRFS_IOPRIO_READA);
	__reada_start_machine(fs_info);
	set_task_ioprio(current, old_ioprio);

	atomic_dec(&fs_info->reada_works_cnt);
}

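/*
 * Walk all devices and keep enqueueing readaheads until either nothing more
 * can be started or a batch limit is hit, then hand the rest over to worker
 * threads.
 */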
static void __reada_start_machine(struct btrfs_fs_info *fs_info)
{
	struct btrfs_device *device;
	struct btrfs_fs_devices *fs_devices = fs_info->fs_devices;
	u64 enqueued;
	u64 total = 0;
	int i;

	do {
		enqueued = 0;
		mutex_lock(&fs_devices->device_list_mutex);
		list_for_each_entry(device, &fs_devices->devices, dev_list) {
			if (atomic_read(&device->reada_in_flight) <
			    MAX_IN_FLIGHT)
				enqueued += reada_start_machine_dev(device);
		}
		mutex_unlock(&fs_devices->device_list_mutex);
		total += enqueued;
	} while (enqueued && total < 10000);

	if (enqueued == 0)
		return;

	/*
	 * If everything is already in the cache, this is effectively single
	 * threaded. To a) not hold the caller for too long and b) to utilize
	 * more cores, we broke the loop above after 10000 iterations and now
	 * enqueue to workers to finish it. This will distribute the load to
	 * the cores.
	 */
	for (i = 0; i < 2; ++i) {
		reada_start_machine(fs_info);
		if (atomic_read(&fs_info->reada_works_cnt) >
		    BTRFS_MAX_MIRRORS * 2)
			break;
	}
}

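/*
 * Kick the readahead state machine by queueing a worker; the actual work is
 * done in reada_start_machine_worker / __reada_start_machine.
 */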
static void reada_start_machine(struct btrfs_fs_info *fs_info)
{
	struct reada_machine_work *rmw;

	rmw = kzalloc(sizeof(*rmw), GFP_KERNEL);
	if (!rmw) {
		/* FIXME we cannot handle this properly right now */
		BUG();
	}
	btrfs_init_work(&rmw->work, btrfs_readahead_helper,
			reada_start_machine_worker, NULL, NULL);
	rmw->fs_info = fs_info;

	btrfs_queue_work(fs_info->readahead_workers, &rmw->work);
	atomic_inc(&fs_info->reada_works_cnt);
}

#ifdef DEBUG
static void dump_devs(struct btrfs_fs_info *fs_info, int all)
{
	struct btrfs_device *device;
	struct btrfs_fs_devices *fs_devices = fs_info->fs_devices;
	unsigned long index;
	int ret;
	int i;
	int j;
	int cnt;

	spin_lock(&fs_info->reada_lock);
	list_for_each_entry(device, &fs_devices->devices, dev_list) {
		btrfs_debug(fs_info, "dev %lld has %d in flight", device->devid,
			atomic_read(&device->reada_in_flight));
		index = 0;
		while (1) {
			struct reada_zone *zone;
			ret = radix_tree_gang_lookup(&device->reada_zones,
						     (void **)&zone, index, 1);
			if (ret == 0)
				break;
			pr_debug("  zone %llu-%llu elems %llu locked %d devs",
				    zone->start, zone->end, zone->elems,
				    zone->locked);
			for (j = 0; j < zone->ndevs; ++j) {
				pr_cont(" %lld",
					zone->devs[j]->devid);
			}
			if (device->reada_curr_zone == zone)
				pr_cont(" curr off %llu",
					device->reada_next - zone->start);
			pr_cont("\n");
			index = (zone->end >> PAGE_SHIFT) + 1;
		}
		cnt = 0;
		index = 0;
		while (all) {
			struct reada_extent *re = NULL;

			ret = radix_tree_gang_lookup(&device->reada_extents,
						     (void **)&re, index, 1);
			if (ret == 0)
				break;
			pr_debug("  re: logical %llu size %u empty %d scheduled %d",
				re->logical, fs_info->nodesize,
				list_empty(&re->extctl), re->scheduled);

			for (i = 0; i < re->nzones; ++i) {
				pr_cont(" zone %llu-%llu devs",
					re->zones[i]->start,
					re->zones[i]->end);
				for (j = 0; j < re->zones[i]->ndevs; ++j) {
					pr_cont(" %lld",
						re->zones[i]->devs[j]->devid);
				}
			}
			pr_cont("\n");
			index = (re->logical >> PAGE_SHIFT) + 1;
			if (++cnt > 15)
				break;
		}
	}

	index = 0;
	cnt = 0;
	while (all) {
		struct reada_extent *re = NULL;

		ret = radix_tree_gang_lookup(&fs_info->reada_tree, (void **)&re,
					     index, 1);
		if (ret == 0)
			break;
		if (!re->scheduled) {
			index = (re->logical >> PAGE_SHIFT) + 1;
			continue;
		}
		pr_debug("re: logical %llu size %u list empty %d scheduled %d",
			re->logical, fs_info->nodesize,
			list_empty(&re->extctl), re->scheduled);
		for (i = 0; i < re->nzones; ++i) {
			pr_cont(" zone %llu-%llu devs",
				re->zones[i]->start,
				re->zones[i]->end);
			for (j = 0; j < re->zones[i]->ndevs; ++j) {
				pr_cont(" %lld",
				       re->zones[i]->devs[j]->devid);
			}
		}
		pr_cont("\n");
		index = (re->logical >> PAGE_SHIFT) + 1;
	}
	spin_unlock(&fs_info->reada_lock);
}
#endif

/*
 * interface
 */
struct reada_control *btrfs_reada_add(struct btrfs_root *root,
			struct btrfs_key *key_start, struct btrfs_key *key_end)
{
	struct reada_control *rc;
	u64 start;
	u64 generation;
	int ret;
	struct extent_buffer *node;
	static struct btrfs_key max_key = {
		.objectid = (u64)-1,
		.type = (u8)-1,
		.offset = (u64)-1
	};

	rc = kzalloc(sizeof(*rc), GFP_KERNEL);
	if (!rc)
		return ERR_PTR(-ENOMEM);

	rc->fs_info = root->fs_info;
	rc->key_start = *key_start;
	rc->key_end = *key_end;
	atomic_set(&rc->elems, 0);
	init_waitqueue_head(&rc->wait);
	kref_init(&rc->refcnt);
	kref_get(&rc->refcnt); /* one ref for having elements */

	node = btrfs_root_node(root);
	start = node->start;
	generation = btrfs_header_generation(node);
	free_extent_buffer(node);

	ret = reada_add_block(rc, start, &max_key, generation);
	if (ret) {
		kfree(rc);
		return ERR_PTR(ret);
	}

	reada_start_machine(root->fs_info);

	return rc;
}

#ifdef DEBUG
int btrfs_reada_wait(void *handle)
{
	struct reada_control *rc = handle;
	struct btrfs_fs_info *fs_info = rc->fs_info;

	while (atomic_read(&rc->elems)) {
		if (!atomic_read(&fs_info->reada_works_cnt))
			reada_start_machine(fs_info);
		wait_event_timeout(rc->wait, atomic_read(&rc->elems) == 0,
				   5 * HZ);
		dump_devs(fs_info, atomic_read(&rc->elems) < 10 ? 1 : 0);
	}

	dump_devs(fs_info, atomic_read(&rc->elems) < 10 ? 1 : 0);

	kref_put(&rc->refcnt, reada_control_release);

	return 0;
}
#else
int btrfs_reada_wait(void *handle)
{
	struct reada_control *rc = handle;
	struct btrfs_fs_info *fs_info = rc->fs_info;

	while (atomic_read(&rc->elems)) {
		if (!atomic_read(&fs_info->reada_works_cnt))
			reada_start_machine(fs_info);
		wait_event_timeout(rc->wait, atomic_read(&rc->elems) == 0,
				   (HZ + 9) / 10);
	}

	kref_put(&rc->refcnt, reada_control_release);

	return 0;
}
#endif

void btrfs_reada_detach(void *handle)
{
	struct reada_control *rc = handle;

	kref_put(&rc->refcnt, reada_control_release);
}