/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
C
Chris Mason 已提交
35 36 37
	/* pool we belong to */
	struct btrfs_workers *workers;

38 39
	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
40
	struct list_head prio_pending;
41 42 43 44 45 46 47 48 49

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;
50

51 52 53
	/* reference counter for this struct */
	atomic_t refs;

54
	unsigned long sequence;
55 56 57 58 59 60

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;
C
Chris Mason 已提交
61 62 63

	/* are we currently idle */
	int idle;
64 65
};

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;
77 78 79 80 81 82

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list)) {
			list_move(&worker->worker_list,
				 &worker->workers->idle_list);
		}
C
Chris Mason 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;
98 99 100 101 102

		if (!list_empty(&worker->worker_list)) {
			list_move_tail(&worker->worker_list,
				      &worker->workers->worker_list);
		}
C
Chris Mason 已提交
103 104 105 106
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers >= workers->max_workers)
		goto out;

	spin_unlock_irqrestore(&workers->lock, flags);
	btrfs_start_workers(workers, 1);
	return;

out:
	spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

140
	spin_lock(&workers->order_lock);
C
Chris Mason 已提交
141

142 143 144 145 146 147 148 149 150 151
	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
C
Chris Mason 已提交
152 153 154 155 156 157 158 159 160 161 162
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

163
		spin_unlock(&workers->order_lock);
C
Chris Mason 已提交
164 165 166 167

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
168
		spin_lock(&workers->order_lock);
C
Chris Mason 已提交
169 170 171 172
		list_del(&work->order_list);
		work->ordered_free(work);
	}

173
	spin_unlock(&workers->order_lock);
C
Chris Mason 已提交
174 175 176
	return 0;
}

static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock_irq(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending)) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock_irq(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if(!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
254 255
	struct list_head head;
	struct list_head prio_head;
256
	struct btrfs_work *work;
257 258 259 260

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

261
	do {
262
again:
263
		while (1) {
264 265 266 267


			work = get_next_work(worker, &prio_head, &head);
			if (!work)
268 269
				break;

270
			list_del(&work->list);
C
Chris Mason 已提交
271
			clear_bit(WORK_QUEUED_BIT, &work->flags);
272 273 274 275 276 277

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
C
Chris Mason 已提交
278 279 280 281 282 283
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

284 285
			check_pending_worker_creates(worker);

286
		}
287 288 289 290

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

291
		if (freezing(current)) {
292 293
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
294 295 296
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
297 298 299 300 301 302 303
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump_in?
				 */
				smp_mb();
304 305
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
306 307 308 309 310 311 312 313 314 315 316 317
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
318 319
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
320 321
					continue;

A
Amit Gud 已提交
322 323 324
				if (kthread_should_stop())
					break;

325 326 327
				/* still no more work?, sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
328
				if (!list_empty(&worker->pending) ||
329 330 331 332
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					goto again;
				}
333 334 335 336 337 338 339 340

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

341 342 343 344 345 346 347
				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
348
			}
349 350 351 352 353 354 355 356 357 358 359 360 361
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shutdown
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
362
	int can_stop;
363

364
	spin_lock_irq(&workers->lock);
C
Chris Mason 已提交
365
	list_splice_init(&workers->idle_list, &workers->worker_list);
C
Chris Mason 已提交
366
	while (!list_empty(&workers->worker_list)) {
367 368 369
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);
370 371 372 373 374 375 376 377 378 379 380 381 382 383

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
384
	}
385
	spin_unlock_irq(&workers->lock);
386 387 388 389 390 391
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
392
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
393 394 395
{
	workers->num_workers = 0;
	INIT_LIST_HEAD(&workers->worker_list);
C
Chris Mason 已提交
396
	INIT_LIST_HEAD(&workers->idle_list);
C
Chris Mason 已提交
397
	INIT_LIST_HEAD(&workers->order_list);
398
	INIT_LIST_HEAD(&workers->prio_order_list);
399
	spin_lock_init(&workers->lock);
400
	spin_lock_init(&workers->order_lock);
401
	workers->max_workers = max;
402
	workers->idle_thresh = 32;
403
	workers->name = name;
C
Chris Mason 已提交
404
	workers->ordered = 0;
405 406
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = 0;
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
427
		INIT_LIST_HEAD(&worker->prio_pending);
428 429
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);
430

431
		atomic_set(&worker->num_pending, 0);
432
		atomic_set(&worker->refs, 1);
433
		worker->workers = workers;
434 435 436
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
437 438
		if (IS_ERR(worker->task)) {
			ret = PTR_ERR(worker->task);
439
			kfree(worker);
440 441 442
			goto fail;
		}
		spin_lock_irq(&workers->lock);
C
Chris Mason 已提交
443
		list_add_tail(&worker->worker_list, &workers->idle_list);
444
		worker->idle = 1;
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
		workers->num_workers++;
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min = workers->num_workers < workers->max_workers;

	/*
C
Chris Mason 已提交
466 467 468 469
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
470
	 */
C
Chris Mason 已提交
471 472
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
473 474
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
C
Chris Mason 已提交
475
		return worker;
476
	}
C
Chris Mason 已提交
477 478 479
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

480
	/*
C
Chris Mason 已提交
481
	 * if we pick a busy task, move the task to the end of the list.
C
Chris Mason 已提交
482 483 484
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
485
	 */
C
Chris Mason 已提交
486 487
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
488 489
	atomic_inc(&worker->num_pending);
	worker->sequence++;
C
Chris Mason 已提交
490

491
	if (worker->sequence % workers->idle_thresh == 0)
492
		list_move_tail(next, &workers->worker_list);
493 494 495
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
501 502 503 504
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
505
	struct list_head *fallback;
506 507 508 509 510 511 512

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers >= workers->max_workers) {
513 514 515 516
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
517 518 519 520 521 522 523
		} else {
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			btrfs_start_workers(workers, 1);
			goto again;
		}
	}
524
	spin_unlock_irqrestore(&workers->lock, flags);
525
	return worker;
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
		  struct btrfs_worker_thread, worker_list);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
542 543 544 545 546 547 548 549 550 551 552
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
553
	int wake = 0;
554

C
Chris Mason 已提交
555
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
556 557 558
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
559 560 561 562
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
563
	atomic_inc(&worker->num_pending);
564 565 566 567 568

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
569
		spin_lock(&worker->workers->lock);
570 571 572
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
573
		spin_unlock(&worker->workers->lock);
574
	}
575 576 577 578
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}
579

580 581
	if (wake)
		wake_up_process(worker->task);
582
	spin_unlock_irqrestore(&worker->lock, flags);
583
out:
584

585 586 587
	return 0;
}

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
C
Chris Mason 已提交
603
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
604 605 606
		goto out;

	worker = find_worker(workers);
C
Chris Mason 已提交
607
	if (workers->ordered) {
608 609 610 611 612
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
613 614 615 616 617 618
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
619
		spin_unlock(&workers->order_lock);
C
Chris Mason 已提交
620 621 622
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}
623 624

	spin_lock_irqsave(&worker->lock, flags);
625

626 627 628 629
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
630
	atomic_inc(&worker->num_pending);
C
Chris Mason 已提交
631
	check_busy_worker(worker);
632 633 634 635 636 637 638 639 640 641 642

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
643 644
	spin_unlock_irqrestore(&worker->lock, flags);

645 646 647
out:
	return 0;
}