/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 * Copyright (C) 2014 Fujitsu.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include <linux/workqueue.h>
#include "async-thread.h"

/* bits set in btrfs_work->flags to track a work item's lifecycle */
#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/* thresholding constants for the new workqueue implementation below */
#define NO_THRESHOLD (-1)
#define DFT_THRESHOLD (32)

36 37 38 39 40
/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
C
Chris Mason 已提交
41 42 43
	/* pool we belong to */
	struct btrfs_workers *workers;

44 45
	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
46
	struct list_head prio_pending;
47 48 49 50 51 52 53 54 55

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;
56

57 58 59
	/* reference counter for this struct */
	atomic_t refs;

60
	unsigned long sequence;
61 62 63 64 65 66

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;
C
Chris Mason 已提交
67 68 69

	/* are we currently idle */
	int idle;
70 71
};

/* forward declaration: creates one worker thread; see definition below */
static int __btrfs_start_workers(struct btrfs_workers *workers);
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue who never has more than one thread
 * where we scheduler thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
98
	__btrfs_start_workers(start->queue);
99 100 101
	kfree(start);
}

C
Chris Mason 已提交
102 103 104 105 106 107 108 109 110 111 112
/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;
113 114

		/* the list may be empty if the worker is just starting */
115 116
		if (!list_empty(&worker->worker_list) &&
		    !worker->workers->stopping) {
117 118 119
			list_move(&worker->worker_list,
				 &worker->workers->idle_list);
		}
C
Chris Mason 已提交
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;

		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		/* skip the move if the pool is shutting down */
		if (!list_empty(&worker->worker_list) &&
		    !worker->workers->stopping) {
			list_move_tail(&worker->worker_list,
				      &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

145 146 147
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
148
	struct worker_start *start;
149 150 151 152 153 154
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

155 156 157 158 159 160 161
	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return;

	start->work.func = start_new_worker_func;
	start->queue = workers;

162 163 164 165 166
	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
167 168
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
169 170
		goto out;

171
	workers->num_workers_starting += 1;
172
	spin_unlock_irqrestore(&workers->lock, flags);
173
	btrfs_queue_worker(workers->atomic_worker_start, &start->work);
174 175 176
	return;

out:
177
	kfree(start);
178 179 180
	spin_unlock_irqrestore(&workers->lock, flags);
}

181
static noinline void run_ordered_completions(struct btrfs_workers *workers,
C
Chris Mason 已提交
182 183 184
					    struct btrfs_work *work)
{
	if (!workers->ordered)
185
		return;
C
Chris Mason 已提交
186 187 188

	set_bit(WORK_DONE_BIT, &work->flags);

189
	spin_lock(&workers->order_lock);
C
Chris Mason 已提交
190

191 192 193 194 195 196 197 198 199 200
	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
C
Chris Mason 已提交
201 202 203 204 205 206 207 208 209 210 211
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

212
		spin_unlock(&workers->order_lock);
C
Chris Mason 已提交
213 214 215

		work->ordered_func(work);

216
		/* now take the lock again and drop our item from the list */
217
		spin_lock(&workers->order_lock);
C
Chris Mason 已提交
218
		list_del(&work->order_list);
219 220 221 222 223 224
		spin_unlock(&workers->order_lock);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
C
Chris Mason 已提交
225
		work->ordered_free(work);
226
		spin_lock(&workers->order_lock);
C
Chris Mason 已提交
227 228
	}

229
	spin_unlock(&workers->order_lock);
C
Chris Mason 已提交
230 231
}

232 233 234 235 236 237 238 239 240 241 242
static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

/*
 * retire an idle worker that has had no work for a while.  Returns 1 if
 * the worker was removed from the pool (caller's thread should exit),
 * 0 otherwise.  Never removes the last remaining worker.
 */
static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

263 264 265 266 267 268 269
static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

270
	if (!list_empty(prio_head)) {
271
		cur = prio_head->next;
272 273
		goto out;
	}
274 275 276 277 278

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

279
	if (!list_empty(head)) {
280 281
		cur = head->next;
		goto out;
282
	}
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

305 306 307 308 309 310
/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
311 312
	struct list_head head;
	struct list_head prio_head;
313
	struct btrfs_work *work;
314 315 316 317

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

318
	do {
319
again:
320
		while (1) {
321 322 323 324


			work = get_next_work(worker, &prio_head, &head);
			if (!work)
325 326
				break;

327
			list_del(&work->list);
C
Chris Mason 已提交
328
			clear_bit(WORK_QUEUED_BIT, &work->flags);
329 330 331 332 333 334

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
C
Chris Mason 已提交
335 336 337 338 339 340
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

341
			check_pending_worker_creates(worker);
342
			cond_resched();
343
		}
344 345 346 347

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

348
		if (freezing(current)) {
349 350
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
351
			try_to_freeze();
352 353
		} else {
			spin_unlock_irq(&worker->lock);
354 355 356 357 358 359 360
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump_in?
				 */
				smp_mb();
361 362
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
363 364 365 366 367 368 369 370 371 372 373 374
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
375 376
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
377 378
					continue;

A
Amit Gud 已提交
379 380 381
				if (kthread_should_stop())
					break;

382 383 384
				/* still no more work?, sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
385
				if (!list_empty(&worker->pending) ||
386 387
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
388
					set_current_state(TASK_RUNNING);
389 390
					goto again;
				}
391 392 393 394 395 396 397 398

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

399 400 401 402 403 404 405
				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
406
			}
407 408 409 410 411 412 413 414 415
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shutdown
 */
416
void btrfs_stop_workers(struct btrfs_workers *workers)
417 418 419
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
420
	int can_stop;
421

422
	spin_lock_irq(&workers->lock);
423
	workers->stopping = 1;
C
Chris Mason 已提交
424
	list_splice_init(&workers->idle_list, &workers->worker_list);
C
Chris Mason 已提交
425
	while (!list_empty(&workers->worker_list)) {
426 427 428
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);
429 430 431 432 433 434 435 436 437 438 439 440 441 442

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
443
	}
444
	spin_unlock_irq(&workers->lock);
445 446 447 448 449
}

/*
 * simple init on struct btrfs_workers
 */
450 451
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
452 453
{
	workers->num_workers = 0;
454
	workers->num_workers_starting = 0;
455
	INIT_LIST_HEAD(&workers->worker_list);
C
Chris Mason 已提交
456
	INIT_LIST_HEAD(&workers->idle_list);
C
Chris Mason 已提交
457
	INIT_LIST_HEAD(&workers->order_list);
458
	INIT_LIST_HEAD(&workers->prio_order_list);
459
	spin_lock_init(&workers->lock);
460
	spin_lock_init(&workers->order_lock);
461
	workers->max_workers = max;
462
	workers->idle_thresh = 32;
463
	workers->name = name;
C
Chris Mason 已提交
464
	workers->ordered = 0;
465
	workers->atomic_start_pending = 0;
466
	workers->atomic_worker_start = async_helper;
467
	workers->stopping = 0;
468 469 470 471 472 473
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
474
static int __btrfs_start_workers(struct btrfs_workers *workers)
475 476 477 478
{
	struct btrfs_worker_thread *worker;
	int ret = 0;

479 480 481 482 483
	worker = kzalloc(sizeof(*worker), GFP_NOFS);
	if (!worker) {
		ret = -ENOMEM;
		goto fail;
	}
484

485 486 487 488 489 490 491 492
	INIT_LIST_HEAD(&worker->pending);
	INIT_LIST_HEAD(&worker->prio_pending);
	INIT_LIST_HEAD(&worker->worker_list);
	spin_lock_init(&worker->lock);

	atomic_set(&worker->num_pending, 0);
	atomic_set(&worker->refs, 1);
	worker->workers = workers;
493 494 495
	worker->task = kthread_create(worker_loop, worker,
				      "btrfs-%s-%d", workers->name,
				      workers->num_workers + 1);
496 497 498
	if (IS_ERR(worker->task)) {
		ret = PTR_ERR(worker->task);
		goto fail;
499
	}
500

501
	spin_lock_irq(&workers->lock);
502 503
	if (workers->stopping) {
		spin_unlock_irq(&workers->lock);
504
		ret = -EINVAL;
505 506
		goto fail_kthread;
	}
507 508 509 510 511 512 513
	list_add_tail(&worker->worker_list, &workers->idle_list);
	worker->idle = 1;
	workers->num_workers++;
	workers->num_workers_starting--;
	WARN_ON(workers->num_workers_starting < 0);
	spin_unlock_irq(&workers->lock);

514
	wake_up_process(worker->task);
515
	return 0;
516 517 518

fail_kthread:
	kthread_stop(worker->task);
519
fail:
520
	kfree(worker);
521 522 523
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting--;
	spin_unlock_irq(&workers->lock);
524 525 526
	return ret;
}

527
int btrfs_start_workers(struct btrfs_workers *workers)
528 529
{
	spin_lock_irq(&workers->lock);
530
	workers->num_workers_starting++;
531
	spin_unlock_irq(&workers->lock);
532
	return __btrfs_start_workers(workers);
533 534
}

535 536 537 538 539 540 541 542 543
/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
544 545 546 547
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;
548 549

	/*
C
Chris Mason 已提交
550 551 552 553
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
554
	 */
C
Chris Mason 已提交
555 556
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
557 558
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
C
Chris Mason 已提交
559
		return worker;
560
	}
C
Chris Mason 已提交
561 562 563
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

564
	/*
C
Chris Mason 已提交
565
	 * if we pick a busy task, move the task to the end of the list.
C
Chris Mason 已提交
566 567 568
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
569
	 */
C
Chris Mason 已提交
570 571
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
572
	worker->sequence++;
C
Chris Mason 已提交
573

574
	if (worker->sequence % workers->idle_thresh == 0)
575
		list_move_tail(next, &workers->worker_list);
576 577 578
	return worker;
}

C
Chris Mason 已提交
579 580 581 582 583
/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
584 585 586 587
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
588
	struct list_head *fallback;
589
	int ret;
590 591

	spin_lock_irqsave(&workers->lock, flags);
592
again:
593 594 595
	worker = next_worker(workers);

	if (!worker) {
596 597
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
598 599 600 601
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
602
		} else {
603
			workers->num_workers_starting++;
604 605
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
606
			ret = __btrfs_start_workers(workers);
607
			spin_lock_irqsave(&workers->lock, flags);
608 609
			if (ret)
				goto fallback;
610 611 612
			goto again;
		}
	}
613
	goto found;
614 615 616 617 618 619 620 621 622 623 624 625 626 627

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
		  struct btrfs_worker_thread, worker_list);
628 629 630 631 632 633
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
634 635
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
636 637 638 639 640 641 642
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
643
void btrfs_requeue_work(struct btrfs_work *work)
644 645 646
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
647
	int wake = 0;
648

C
Chris Mason 已提交
649
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
650
		return;
651 652

	spin_lock_irqsave(&worker->lock, flags);
653 654 655 656
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
657
	atomic_inc(&worker->num_pending);
658 659 660 661 662

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
663
		spin_lock(&worker->workers->lock);
664 665
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
666
			      &worker->workers->worker_list);
667
		spin_unlock(&worker->workers->lock);
668
	}
669 670 671 672
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}
673

674 675
	if (wake)
		wake_up_process(worker->task);
676
	spin_unlock_irqrestore(&worker->lock, flags);
677 678
}

679 680 681 682 683
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

684 685 686
/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
687
void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
688 689 690 691 692 693
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
C
Chris Mason 已提交
694
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
695
		return;
696 697

	worker = find_worker(workers);
C
Chris Mason 已提交
698
	if (workers->ordered) {
699 700 701 702 703
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
704 705 706 707 708 709
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
710
		spin_unlock(&workers->order_lock);
C
Chris Mason 已提交
711 712 713
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}
714 715

	spin_lock_irqsave(&worker->lock, flags);
716

717 718 719 720
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
C
Chris Mason 已提交
721
	check_busy_worker(worker);
722 723 724 725 726 727 728 729 730 731 732

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
733
	spin_unlock_irqrestore(&worker->lock, flags);
734
}
735

736
struct __btrfs_workqueue_struct {
737 738 739 740 741 742
	struct workqueue_struct *normal_wq;
	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;
743 744 745 746 747 748 749 750

	/* Thresholding related variants */
	atomic_t pending;
	int max_active;
	int current_max;
	int thresh;
	unsigned int count;
	spinlock_t thres_lock;
751 752
};

/* pair of queues: 'high' only exists when created with WQ_HIGHPRI */
struct btrfs_workqueue_struct {
	struct __btrfs_workqueue_struct *normal;
	struct __btrfs_workqueue_struct *high;
};

static inline struct __btrfs_workqueue_struct
759
*__btrfs_alloc_workqueue(char *name, int flags, int max_active, int thresh)
760 761 762 763 764 765
{
	struct __btrfs_workqueue_struct *ret = kzalloc(sizeof(*ret), GFP_NOFS);

	if (unlikely(!ret))
		return NULL;

766 767 768 769 770 771 772 773 774 775 776 777 778
	ret->max_active = max_active;
	atomic_set(&ret->pending, 0);
	if (thresh == 0)
		thresh = DFT_THRESHOLD;
	/* For low threshold, disabling threshold is a better choice */
	if (thresh < DFT_THRESHOLD) {
		ret->current_max = max_active;
		ret->thresh = NO_THRESHOLD;
	} else {
		ret->current_max = 1;
		ret->thresh = thresh;
	}

779 780
	if (flags & WQ_HIGHPRI)
		ret->normal_wq = alloc_workqueue("%s-%s-high", flags,
781 782
						 ret->max_active,
						 "btrfs", name);
783 784
	else
		ret->normal_wq = alloc_workqueue("%s-%s", flags,
785 786
						 ret->max_active, "btrfs",
						 name);
787 788 789 790 791 792 793
	if (unlikely(!ret->normal_wq)) {
		kfree(ret);
		return NULL;
	}

	INIT_LIST_HEAD(&ret->ordered_list);
	spin_lock_init(&ret->list_lock);
794
	spin_lock_init(&ret->thres_lock);
795 796 797 798 799 800
	return ret;
}

/* forward declaration: used by btrfs_alloc_workqueue error path below */
static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue_struct *wq);

801 802
struct btrfs_workqueue_struct *btrfs_alloc_workqueue(char *name,
						     int flags,
803 804
						     int max_active,
						     int thresh)
805 806 807 808 809 810
{
	struct btrfs_workqueue_struct *ret = kzalloc(sizeof(*ret), GFP_NOFS);

	if (unlikely(!ret))
		return NULL;

811
	ret->normal = __btrfs_alloc_workqueue(name, flags & ~WQ_HIGHPRI,
812
					      max_active, thresh);
813
	if (unlikely(!ret->normal)) {
814 815 816 817
		kfree(ret);
		return NULL;
	}

818
	if (flags & WQ_HIGHPRI) {
819 820
		ret->high = __btrfs_alloc_workqueue(name, flags, max_active,
						    thresh);
821 822 823 824 825 826
		if (unlikely(!ret->high)) {
			__btrfs_destroy_workqueue(ret->normal);
			kfree(ret);
			return NULL;
		}
	}
827 828 829
	return ret;
}

830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889
/*
 * Hook for threshold which will be called in btrfs_queue_work.
 * This hook WILL be called in IRQ handler context,
 * so workqueue_set_max_active MUST NOT be called in this hook
 */
static inline void thresh_queue_hook(struct __btrfs_workqueue_struct *wq)
{
	if (wq->thresh == NO_THRESHOLD)
		return;
	atomic_inc(&wq->pending);
}

/*
 * Hook for threshold which will be called before executing the work,
 * This hook is called in kthread content.
 * So workqueue_set_max_active is called here.
 */
static inline void thresh_exec_hook(struct __btrfs_workqueue_struct *wq)
{
	int new_max_active;
	long pending;
	int need_change = 0;

	if (wq->thresh == NO_THRESHOLD)
		return;

	atomic_dec(&wq->pending);
	spin_lock(&wq->thres_lock);
	/*
	 * Use wq->count to limit the calling frequency of
	 * workqueue_set_max_active.
	 */
	wq->count++;
	wq->count %= (wq->thresh / 4);
	if (!wq->count)
		goto  out;
	new_max_active = wq->current_max;

	/*
	 * pending may be changed later, but it's OK since we really
	 * don't need it so accurate to calculate new_max_active.
	 */
	pending = atomic_read(&wq->pending);
	if (pending > wq->thresh)
		new_max_active++;
	if (pending < wq->thresh / 2)
		new_max_active--;
	new_max_active = clamp_val(new_max_active, 1, wq->max_active);
	if (new_max_active != wq->current_max)  {
		need_change = 1;
		wq->current_max = new_max_active;
	}
out:
	spin_unlock(&wq->thres_lock);

	if (need_change) {
		workqueue_set_max_active(wq->normal_wq, wq->current_max);
	}
}

890
static void run_ordered_work(struct __btrfs_workqueue_struct *wq)
891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work_struct *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_entry(list->next, struct btrfs_work_struct,
				  ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/*
		 * we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
		work->ordered_free(work);
	}
	spin_unlock_irqrestore(lock, flags);
}

/* trampoline run by the kernel workqueue: adjusts threshold, runs func */
static void normal_work_helper(struct work_struct *arg)
{
	struct btrfs_work_struct *work;
	struct __btrfs_workqueue_struct *wq;
	int need_order = 0;

	work = container_of(arg, struct btrfs_work_struct, normal_work);
	/*
	 * We should not touch things inside work in the following cases:
	 * 1) after work->func() if it has no ordered_free
	 *    Since the struct is freed in work->func().
	 * 2) after setting WORK_DONE_BIT
	 *    The work may be freed in other threads almost instantly.
	 * So we save the needed things here.
	 */
	if (work->ordered_func)
		need_order = 1;
	wq = work->wq;

	thresh_exec_hook(wq);
	work->func(work);
	if (need_order) {
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq);
	}
}

/* initialize a work item for the new workqueue implementation */
void btrfs_init_work(struct btrfs_work_struct *work,
		     void (*func)(struct btrfs_work_struct *),
		     void (*ordered_func)(struct btrfs_work_struct *),
		     void (*ordered_free)(struct btrfs_work_struct *))
{
	work->func = func;
	work->ordered_func = ordered_func;
	work->ordered_free = ordered_free;
	INIT_WORK(&work->normal_work, normal_work_helper);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}

971 972
static inline void __btrfs_queue_work(struct __btrfs_workqueue_struct *wq,
				      struct btrfs_work_struct *work)
973 974 975 976
{
	unsigned long flags;

	work->wq = wq;
977
	thresh_queue_hook(wq);
978 979 980 981 982 983 984 985
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	queue_work(wq->normal_wq, &work->normal_work);
}

986 987 988 989 990 991 992 993 994 995 996 997 998 999
void btrfs_queue_work(struct btrfs_workqueue_struct *wq,
		      struct btrfs_work_struct *work)
{
	struct __btrfs_workqueue_struct *dest_wq;

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags) && wq->high)
		dest_wq = wq->high;
	else
		dest_wq = wq->normal;
	__btrfs_queue_work(dest_wq, work);
}

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue_struct *wq)
1000 1001 1002 1003 1004
{
	destroy_workqueue(wq->normal_wq);
	kfree(wq);
}

1005 1006 1007 1008 1009 1010 1011 1012 1013
void btrfs_destroy_workqueue(struct btrfs_workqueue_struct *wq)
{
	if (!wq)
		return;
	if (wq->high)
		__btrfs_destroy_workqueue(wq->high);
	__btrfs_destroy_workqueue(wq->normal);
}

1014 1015
void btrfs_workqueue_set_max(struct btrfs_workqueue_struct *wq, int max)
{
1016
	wq->normal->max_active = max;
1017
	if (wq->high)
1018
		wq->high->max_active = max;
1019 1020 1021 1022 1023
}

/* mark a (not yet queued) new-style work item as high priority */
void btrfs_set_work_high_priority(struct btrfs_work_struct *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}