async-thread.c 17.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
20
#include <linux/slab.h>
21 22
#include <linux/list.h>
#include <linux/spinlock.h>
23
#include <linux/freezer.h>
24 25
#include "async-thread.h"

C
Chris Mason 已提交
26 27 28
/* bit numbers used in btrfs_work->flags by the code in this file */
#define WORK_QUEUED_BIT		0	/* set when queued, cleared on dequeue */
#define WORK_DONE_BIT		1	/* set once the work's func has run */
#define WORK_ORDER_DONE_BIT	2	/* set before ordered_func is called */
#define WORK_HIGH_PRIO_BIT	3	/* route the work to the prio lists */
C
Chris Mason 已提交
30

31 32 33 34 35
/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
C
Chris Mason 已提交
36 37 38
	/* pool we belong to */
	struct btrfs_workers *workers;

39 40
	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
41
	struct list_head prio_pending;
42 43 44 45 46 47 48 49 50

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;
51

52 53 54
	/* reference counter for this struct */
	atomic_t refs;

55
	unsigned long sequence;
56 57 58 59 60 61

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;
C
Chris Mason 已提交
62 63 64

	/* are we currently idle */
	int idle;
65 66
};

67 68
/*
 * forward declaration: the definition lives further down in this file,
 * but start_new_worker_func() needs to call it first.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers);

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	/* work item queued on the helper queue */
	struct btrfs_work work;
	/* the pool that needs another worker thread */
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
93
	__btrfs_start_workers(start->queue);
94 95 96
	kfree(start);
}

C
Chris Mason 已提交
97 98 99 100 101 102 103 104 105 106 107
/*
 * Mark a thread idle and move it onto the pool's idle list once its
 * pending count has dropped below half of the pool's idle_thresh.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	unsigned long flags;

	/* already idle: nothing to do */
	if (worker->idle)
		return;

	/* still too much queued work to count as idle */
	if (atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh / 2)
		return;

	spin_lock_irqsave(&worker->workers->lock, flags);
	worker->idle = 1;

	/* the list may be empty if the worker is just starting */
	if (!list_empty(&worker->worker_list))
		list_move(&worker->worker_list, &worker->workers->idle_list);
	spin_unlock_irqrestore(&worker->workers->lock, flags);
}

/*
 * Clear a thread's idle mark and move it to the tail of the pool's
 * worker_list once its pending count has reached the pool's idle_thresh.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	unsigned long flags;

	/* not marked idle: nothing to undo */
	if (!worker->idle)
		return;

	/* not enough queued work yet to count as busy */
	if (atomic_read(&worker->num_pending) < worker->workers->idle_thresh)
		return;

	spin_lock_irqsave(&worker->workers->lock, flags);
	worker->idle = 0;

	/* only move the worker if it is currently linked on a list */
	if (!list_empty(&worker->worker_list))
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
	spin_unlock_irqrestore(&worker->workers->lock, flags);
}

138 139 140
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
141
	struct worker_start *start;
142 143 144 145 146 147
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

148 149 150 151 152 153 154
	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return;

	start->work.func = start_new_worker_func;
	start->queue = workers;

155 156 157 158 159
	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
160 161
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
162 163
		goto out;

164
	workers->num_workers_starting += 1;
165
	spin_unlock_irqrestore(&workers->lock, flags);
166
	btrfs_queue_worker(workers->atomic_worker_start, &start->work);
167 168 169
	return;

out:
170
	kfree(start);
171 172 173
	spin_unlock_irqrestore(&workers->lock, flags);
}

C
Chris Mason 已提交
174 175 176 177 178 179 180 181
static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

182
	spin_lock(&workers->order_lock);
C
Chris Mason 已提交
183

184 185 186 187 188 189 190 191 192 193
	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
C
Chris Mason 已提交
194 195 196 197 198 199 200 201 202 203 204
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

205
		spin_unlock(&workers->order_lock);
C
Chris Mason 已提交
206 207 208 209

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
210
		spin_lock(&workers->order_lock);
C
Chris Mason 已提交
211 212 213 214
		list_del(&work->order_list);
		work->ordered_free(work);
	}

215
	spin_unlock(&workers->order_lock);
C
Chris Mason 已提交
216 217 218
	return 0;
}

219 220 221 222 223 224 225 226 227 228 229
/* drop one reference on @worker and free it when that was the last one */
static void put_worker(struct btrfs_worker_thread *worker)
{
	if (!atomic_dec_and_test(&worker->refs))
		return;

	kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
230
	spin_lock(&worker->workers->lock);
231 232 233 234 235
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
236 237
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
238 239 240 241
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
242
	spin_unlock(&worker->workers->lock);
243 244 245 246 247 248 249
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
/*
 * Pick the next work item for @worker to run.
 *
 * @prio_head and @head are the caller's private lists of high priority
 * and regular work that was already pulled off the worker's shared
 * pending lists.  High priority work is always preferred.
 *
 * The private lists are used without locking as long as they hold
 * something and no new high priority work has shown up on
 * worker->prio_pending.  Otherwise both shared lists are spliced onto
 * the private ones under worker->lock and the choice is made again.
 *
 * The returned item is NOT removed from its list; the caller does that.
 * Returns NULL when there is no work at all.
 */
static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	/* NOTE(review): presumably orders this unlocked peek against queuers */
	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	/*
	 * only fall back to regular work when no high priority item is
	 * waiting on the private list; overwriting cur here would let
	 * regular work jump ahead of already-queued priority work
	 */
	if (!cur && !list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

291 292 293 294 295 296
/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
297 298
	struct list_head head;
	struct list_head prio_head;
299
	struct btrfs_work *work;
300 301 302 303

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

304
	do {
305
again:
306
		while (1) {
307 308 309 310


			work = get_next_work(worker, &prio_head, &head);
			if (!work)
311 312
				break;

313
			list_del(&work->list);
C
Chris Mason 已提交
314
			clear_bit(WORK_QUEUED_BIT, &work->flags);
315 316 317 318 319 320

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
C
Chris Mason 已提交
321 322 323 324 325 326
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

327
			check_pending_worker_creates(worker);
328
			cond_resched();
329
		}
330 331 332 333

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

334
		if (freezing(current)) {
335 336
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
337 338 339
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
340 341 342 343 344 345 346
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump_in?
				 */
				smp_mb();
347 348
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
349 350 351 352 353 354 355 356 357 358 359 360
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
361 362
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
363 364
					continue;

A
Amit Gud 已提交
365 366 367
				if (kthread_should_stop())
					break;

368 369 370
				/* still no more work?, sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
371
				if (!list_empty(&worker->pending) ||
372 373
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
374
					set_current_state(TASK_RUNNING);
375 376
					goto again;
				}
377 378 379 380 381 382 383 384

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

385 386 387 388 389 390 391
				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
392
			}
393 394 395 396 397 398 399 400 401 402 403 404 405
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shutdown
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
406
	int can_stop;
407

408
	spin_lock_irq(&workers->lock);
C
Chris Mason 已提交
409
	list_splice_init(&workers->idle_list, &workers->worker_list);
C
Chris Mason 已提交
410
	while (!list_empty(&workers->worker_list)) {
411 412 413
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);
414 415 416 417 418 419 420 421 422 423 424 425 426 427

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
428
	}
429
	spin_unlock_irq(&workers->lock);
430 431 432 433 434 435
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
436 437
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
438 439
{
	workers->num_workers = 0;
440
	workers->num_workers_starting = 0;
441
	INIT_LIST_HEAD(&workers->worker_list);
C
Chris Mason 已提交
442
	INIT_LIST_HEAD(&workers->idle_list);
C
Chris Mason 已提交
443
	INIT_LIST_HEAD(&workers->order_list);
444
	INIT_LIST_HEAD(&workers->prio_order_list);
445
	spin_lock_init(&workers->lock);
446
	spin_lock_init(&workers->order_lock);
447
	workers->max_workers = max;
448
	workers->idle_thresh = 32;
449
	workers->name = name;
C
Chris Mason 已提交
450
	workers->ordered = 0;
451
	workers->atomic_start_pending = 0;
452
	workers->atomic_worker_start = async_helper;
453 454 455 456 457 458
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
459
static int __btrfs_start_workers(struct btrfs_workers *workers)
460 461 462 463
{
	struct btrfs_worker_thread *worker;
	int ret = 0;

464 465 466 467 468
	worker = kzalloc(sizeof(*worker), GFP_NOFS);
	if (!worker) {
		ret = -ENOMEM;
		goto fail;
	}
469

470 471 472 473 474 475 476 477 478 479 480 481 482 483 484
	INIT_LIST_HEAD(&worker->pending);
	INIT_LIST_HEAD(&worker->prio_pending);
	INIT_LIST_HEAD(&worker->worker_list);
	spin_lock_init(&worker->lock);

	atomic_set(&worker->num_pending, 0);
	atomic_set(&worker->refs, 1);
	worker->workers = workers;
	worker->task = kthread_run(worker_loop, worker,
				   "btrfs-%s-%d", workers->name,
				   workers->num_workers + 1);
	if (IS_ERR(worker->task)) {
		ret = PTR_ERR(worker->task);
		kfree(worker);
		goto fail;
485
	}
486 487 488 489 490 491 492 493
	spin_lock_irq(&workers->lock);
	list_add_tail(&worker->worker_list, &workers->idle_list);
	worker->idle = 1;
	workers->num_workers++;
	workers->num_workers_starting--;
	WARN_ON(workers->num_workers_starting < 0);
	spin_unlock_irq(&workers->lock);

494 495
	return 0;
fail:
496 497 498
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting--;
	spin_unlock_irq(&workers->lock);
499 500 501
	return ret;
}

502
int btrfs_start_workers(struct btrfs_workers *workers)
503 504
{
	spin_lock_irq(&workers->lock);
505
	workers->num_workers_starting++;
506
	spin_unlock_irq(&workers->lock);
507
	return __btrfs_start_workers(workers);
508 509
}

510 511 512 513 514 515 516 517 518
/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
519 520 521 522
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;
523 524

	/*
C
Chris Mason 已提交
525 526 527 528
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
529
	 */
C
Chris Mason 已提交
530 531
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
532 533
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
C
Chris Mason 已提交
534
		return worker;
535
	}
C
Chris Mason 已提交
536 537 538
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

539
	/*
C
Chris Mason 已提交
540
	 * if we pick a busy task, move the task to the end of the list.
C
Chris Mason 已提交
541 542 543
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
544
	 */
C
Chris Mason 已提交
545 546
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
547
	worker->sequence++;
C
Chris Mason 已提交
548

549
	if (worker->sequence % workers->idle_thresh == 0)
550
		list_move_tail(next, &workers->worker_list);
551 552 553
	return worker;
}

C
Chris Mason 已提交
554 555 556 557 558
/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
559 560 561 562
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
563
	struct list_head *fallback;
564
	int ret;
565 566 567 568 569 570

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);

	if (!worker) {
571 572
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
573 574 575 576
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
577
		} else {
578
			workers->num_workers_starting++;
579 580
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
581 582 583
			ret = __btrfs_start_workers(workers);
			if (ret)
				goto fallback;
584 585 586
			goto again;
		}
	}
587
	goto found;
588 589 590 591 592 593 594 595 596 597 598 599 600 601

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
		  struct btrfs_worker_thread, worker_list);
602 603 604 605 606 607
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
608 609
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
610 611 612 613 614 615 616 617 618 619 620
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
621
	int wake = 0;
622

C
Chris Mason 已提交
623
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
624 625 626
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
627 628 629 630
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
631
	atomic_inc(&worker->num_pending);
632 633 634 635 636

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
637
		spin_lock(&worker->workers->lock);
638 639
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
640
			      &worker->workers->worker_list);
641
		spin_unlock(&worker->workers->lock);
642
	}
643 644 645 646
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}
647

648 649
	if (wake)
		wake_up_process(worker->task);
650
	spin_unlock_irqrestore(&worker->lock, flags);
651
out:
652

653 654 655
	return 0;
}

656 657 658 659 660
/*
 * Flag @work as high priority.  The queueing functions in this file test
 * this bit and put flagged work on the prio lists instead of the regular
 * ones.
 */
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

661 662 663
/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
664
void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
665 666 667 668 669 670
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
C
Chris Mason 已提交
671
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
672
		return;
673 674

	worker = find_worker(workers);
C
Chris Mason 已提交
675
	if (workers->ordered) {
676 677 678 679 680
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
681 682 683 684 685 686
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
687
		spin_unlock(&workers->order_lock);
C
Chris Mason 已提交
688 689 690
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}
691 692

	spin_lock_irqsave(&worker->lock, flags);
693

694 695 696 697
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
C
Chris Mason 已提交
698
	check_busy_worker(worker);
699 700 701 702 703 704 705 706 707 708 709

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
710
	spin_unlock_irqrestore(&worker->lock, flags);
711
}