/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

static int __btrfs_start_workers(struct btrfs_workers *workers);

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
	__btrfs_start_workers(start->queue);
	kfree(start);
}

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list) &&
		    !worker->workers->stopping) {
			list_move(&worker->worker_list,
				 &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list) &&
		    !worker->workers->stopping) {
			list_move_tail(&worker->worker_list,
				      &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

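/*
 * called from worker context after each work item: if find_worker() set
 * atomic_start_pending because the pool wants another thread, queue a
 * worker_start on the async helper so the new kthread is created there
 * rather than from the queueing path.
 */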
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	struct worker_start *start;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return;

	start->work.func = start_new_worker_func;
	start->queue = workers;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	btrfs_queue_worker(workers->atomic_worker_start, &start->work);
	return;

out:
	kfree(start);
	spin_unlock_irqrestore(&workers->lock, flags);
}

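/*
 * for an ordered queue, run each item's ordered_func and ordered_free in
 * the order the items were queued, and only once an item's regular func
 * has finished (WORK_DONE_BIT).  We stop at the first item that is not
 * done yet, so later completions never pass earlier ones.
 */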
static noinline void run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	if (!workers->ordered)
		return;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		spin_unlock(&workers->order_lock);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
		work->ordered_free(work);
		spin_lock(&workers->order_lock);
	}

	spin_unlock(&workers->order_lock);
}

static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

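/*
 * called after a worker has sat idle for a long time.  Frees the worker
 * if it is still idle with nothing queued and the pool keeps at least one
 * other thread; returns 1 when the calling thread should exit.
 */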
static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

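/*
 * grab the next item for a worker to run, always preferring high priority
 * work.  The worker's pending lists are spliced onto the caller's local
 * prio_head/head lists under worker->lock only when the local lists run
 * dry (or new prio work arrives), so most iterations avoid the lock.
 */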
static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
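	/* local lists refilled by get_next_work() from the pending lists */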
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
			cond_resched();
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			try_to_freeze();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					set_current_state(TASK_RUNNING);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
void btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	workers->stopping = 1;
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
}

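/*
 * Typical usage, as a rough sketch (the names below are only
 * illustrative, not taken from this file):
 *
 *	btrfs_init_workers(&workers, "worker", max_threads, async_helper);
 *	btrfs_start_workers(&workers);
 *	work->func = my_work_func;
 *	btrfs_queue_worker(&workers, work);
 *	...
 *	btrfs_stop_workers(&workers);
 */
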
/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
	workers->stopping = 0;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;

	worker = kzalloc(sizeof(*worker), GFP_NOFS);
	if (!worker) {
		ret = -ENOMEM;
		goto fail;
	}

	INIT_LIST_HEAD(&worker->pending);
	INIT_LIST_HEAD(&worker->prio_pending);
	INIT_LIST_HEAD(&worker->worker_list);
	spin_lock_init(&worker->lock);

	atomic_set(&worker->num_pending, 0);
	atomic_set(&worker->refs, 1);
	worker->workers = workers;
	worker->task = kthread_create(worker_loop, worker,
				      "btrfs-%s-%d", workers->name,
				      workers->num_workers + 1);
	if (IS_ERR(worker->task)) {
		ret = PTR_ERR(worker->task);
		goto fail;
	}

	spin_lock_irq(&workers->lock);
	if (workers->stopping) {
		spin_unlock_irq(&workers->lock);
		ret = -EINVAL;
		goto fail_kthread;
	}
	list_add_tail(&worker->worker_list, &workers->idle_list);
	worker->idle = 1;
	workers->num_workers++;
	workers->num_workers_starting--;
	WARN_ON(workers->num_workers_starting < 0);
	spin_unlock_irq(&workers->lock);

	wake_up_process(worker->task);
	return 0;

fail_kthread:
	kthread_stop(worker->task);
fail:
	kfree(worker);
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting--;
	spin_unlock_irq(&workers->lock);
	return ret;
}

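/*
 * public wrapper: account for the thread we are about to create;
 * __btrfs_start_workers() drops num_workers_starting again once the
 * worker is registered or creation fails.
 */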
int btrfs_start_workers(struct btrfs_workers *workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting++;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;
	int ret;

	spin_lock_irqsave(&workers->lock, flags);
again:
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			ret = __btrfs_start_workers(workers);
			spin_lock_irqsave(&workers->lock, flags);
			if (ret)
				goto fallback;
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find an idle worker and can't start a new one
	 * right now, so just return the first worker we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
		  struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
void btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			      &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}

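/*
 * mark a work item as high priority; set this before queueing so the item
 * lands on the prio lists
 */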
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}