/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	struct list_head worklist;
	wait_queue_head_t more_work;
	struct work_struct *current_work;

	struct workqueue_struct *wq;
	struct task_struct *thread;
	int should_stop;

	int run_depth;		/* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	const char *name;
	struct list_head list; 	/* Empty if single thread */
	int freezeable;		/* Freeze threads during suspend */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_MUTEX(workqueue_mutex);
static LIST_HEAD(workqueues);

static int singlethread_cpu __read_mostly;
/* optimization, we could use cpu_possible_map */
static cpumask_t cpu_populated_map __read_mostly;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
	return list_empty(&wq->list);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work, void *wq)
{
	unsigned long new;

	BUG_ON(!work_pending(work));

	new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING);
	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
	atomic_long_set(&work->data, new);
}

static inline void *get_wq_data(struct work_struct *work)
{
	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

static void insert_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work, int tail)
{
	set_wq_data(work, cwq);
	if (tail)
		list_add_tail(&work->entry, &cwq->worklist);
	else
		list_add(&work->entry, &cwq->worklist);
	wake_up(&cwq->more_work);
}

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	insert_work(cwq, work, 1);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0, cpu = get_cpu();

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		if (unlikely(is_single_threaded(wq)))
			cpu = singlethread_cpu;
		BUG_ON(!list_empty(&work->entry));
		__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
		ret = 1;
	}
	put_cpu();
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);
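
/*
 * Illustrative sketch of typical queue_work() usage from driver code; the
 * example_* names below are hypothetical:
 *
 *	static struct workqueue_struct *example_wq;	// e.g. create_workqueue("example")
 *
 *	static void example_handler(struct work_struct *work)
 *	{
 *		// runs later in process context and may sleep
 *	}
 *	static DECLARE_WORK(example_work, example_handler);
 *
 *	// from almost any context, including interrupt handlers:
 *	if (!queue_work(example_wq, &example_work))
 *		;	// was already pending; the handler will run only once
 */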

void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct workqueue_struct *wq = get_wq_data(&dwork->work);
	int cpu = smp_processor_id();

	if (unlikely(is_single_threaded(wq)))
		cpu = singlethread_cpu;

	__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	timer_stats_timer_set_start_info(timer);
	if (delay == 0)
		return queue_work(wq, work);

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		set_wq_data(work, wq);
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;
		add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
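
/*
 * Illustrative sketch (hypothetical example_* names): arm a delayed work so
 * that the handler runs roughly one second from now, in process context:
 *
 *	static void example_timeout(struct work_struct *work)
 *	{
 *		struct delayed_work *dwork =
 *			container_of(work, struct delayed_work, work);
 *		// ... handle the timeout ...
 *	}
 *	static DECLARE_DELAYED_WORK(example_dwork, example_timeout);
 *
 *	queue_delayed_work(example_wq, &example_dwork, HZ);
 */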

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		set_wq_data(work, wq);
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;
		add_timer_on(timer, cpu);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	spin_lock_irq(&cwq->lock);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* morton gets to eat his hat */
		printk("%s: recursion depth exceeded: %d\n",
			__FUNCTION__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;

		cwq->current_work = work;
		list_del_init(cwq->worklist.next);
		spin_unlock_irq(&cwq->lock);

		BUG_ON(get_wq_data(work) != cwq);
		if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
			work_release(work);
		f(work);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
				       	current->pid);
			printk(KERN_ERR "    last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
		}

		spin_lock_irq(&cwq->lock);
		cwq->current_work = NULL;
	}
	cwq->run_depth--;
	spin_unlock_irq(&cwq->lock);
}

/*
 * NOTE: the caller must not touch *cwq if this func returns true
 */
static int cwq_should_stop(struct cpu_workqueue_struct *cwq)
{
	int should_stop = cwq->should_stop;

	if (unlikely(should_stop)) {
		spin_lock_irq(&cwq->lock);
		should_stop = cwq->should_stop && list_empty(&cwq->worklist);
		if (should_stop)
			cwq->thread = NULL;
		spin_unlock_irq(&cwq->lock);
	}

	return should_stop;
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);
	struct k_sigaction sa;
	sigset_t blocked;

	if (!cwq->wq->freezeable)
		current->flags |= PF_NOFREEZE;

	set_user_nice(current, -5);

	/* Block and flush all signals */
	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	/*
	 * We inherited MPOL_INTERLEAVE from the booting kernel.
	 * Set MPOL_DEFAULT to ensure node local allocations.
	 */
	numa_default_policy();

	/* SIG_IGN makes children autoreap: see do_notify_parent(). */
	sa.sa.sa_handler = SIG_IGN;
	sa.sa.sa_flags = 0;
	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

	for (;;) {
		if (cwq->wq->freezeable)
			try_to_freeze();

		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!cwq->should_stop && list_empty(&cwq->worklist))
			schedule();
		finish_wait(&cwq->more_work, &wait);

		if (cwq_should_stop(cwq))
			break;

		run_workqueue(cwq);
	}

	return 0;
}

struct wq_barrier {
	struct work_struct	work;
	struct completion	done;
};

static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	complete(&barr->done);
}

static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
					struct wq_barrier *barr, int tail)
{
	INIT_WORK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

	init_completion(&barr->done);

	insert_work(cwq, &barr->work, tail);
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
	} else {
		struct wq_barrier barr;
		int active = 0;

		spin_lock_irq(&cwq->lock);
		if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
			insert_wq_barrier(cwq, &barr, 1);
			active = 1;
		}
		spin_unlock_irq(&cwq->lock);

		if (active)
			wait_for_completion(&barr.done);
	}
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all works which were queued on entry have been handled,
 * but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
	might_sleep();

	if (is_single_threaded(wq))
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
	else {
		int cpu;

		for_each_cpu_mask(cpu, cpu_populated_map)
			flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
	}
}
EXPORT_SYMBOL_GPL(flush_workqueue);
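
/*
 * Sketch of the usual teardown ordering around flush_workqueue(); the
 * example_* names are hypothetical.  Stop new submissions first, then
 * flush, then free whatever the handlers touch:
 *
 *	example_dev->shutting_down = 1;		// no further queue_work() calls
 *	flush_workqueue(example_wq);		// wait for already-queued work
 *	kfree(example_dev);			// now safe to free
 */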

static void wait_on_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work)
{
	struct wq_barrier barr;
	int running = 0;

	spin_lock_irq(&cwq->lock);
	if (unlikely(cwq->current_work == work)) {
		insert_wq_barrier(cwq, &barr, 0);
		running = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (unlikely(running))
		wait_for_completion(&barr.done);
}

/**
 * flush_work - block until a work_struct's callback has terminated
 * @wq: the workqueue on which the work is queued
 * @work: the work which is to be flushed
 *
 * flush_work() will attempt to cancel the work if it is queued.  If the work's
 * callback appears to be running, flush_work() will block until it has
 * completed.
 *
 * flush_work() is designed to be used when the caller is tearing down data
 * structures which the callback function operates upon.  It is expected that,
 * prior to calling flush_work(), the caller has arranged for the work to not
 * be requeued.
 */
void flush_work(struct workqueue_struct *wq, struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;

	might_sleep();

	cwq = get_wq_data(work);
	/* Was it ever queued ? */
	if (!cwq)
		return;

	/*
	 * This work can't be re-queued, no need to re-check that
	 * get_wq_data() is still the same when we take cwq->lock.
	 */
	spin_lock_irq(&cwq->lock);
	list_del_init(&work->entry);
	work_release(work);
	spin_unlock_irq(&cwq->lock);

	if (is_single_threaded(wq))
		wait_on_work(per_cpu_ptr(wq->cpu_wq, singlethread_cpu), work);
	else {
		int cpu;

		for_each_cpu_mask(cpu, cpu_populated_map)
			wait_on_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
	}
}
EXPORT_SYMBOL_GPL(flush_work);
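
/*
 * Sketch of flush_work() during teardown (hypothetical example_* names).
 * As documented above, the caller must already have made sure the work
 * cannot be requeued:
 *
 *	example_disable_requeueing(example_dev);
 *	flush_work(example_wq, &example_dev->work);
 *	kfree(example_dev);
 */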


static struct workqueue_struct *keventd_wq;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int fastcall schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
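
/*
 * Sketch (hypothetical example_* names): deferring a small task to the
 * shared keventd workqueue instead of creating a private one:
 *
 *	static void example_update(struct work_struct *work)
 *	{
 *		// keep this short; the "events" queue is shared system-wide
 *	}
 *	static DECLARE_WORK(example_update_work, example_update);
 *
 *	schedule_work(&example_update_work);
 */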

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int fastcall schedule_delayed_work(struct delayed_work *dwork,
					unsigned long delay)
{
	timer_stats_timer_set_start_info(&dwork->timer);
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
			struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * Appears to be racy against CPU hotplug.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	preempt_disable();		/* CPU hotplug */
	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
		__queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
	}
	preempt_enable();
	flush_workqueue(keventd_wq);
	free_percpu(works);
	return 0;
}

void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

void flush_work_keventd(struct work_struct *work)
{
	flush_work(keventd_wq, work);
}
EXPORT_SYMBOL(flush_work_keventd);

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
				       struct delayed_work *dwork)
{
	while (!cancel_delayed_work(dwork))
		flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd work whose handler rearms the delayed work.
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_work(struct delayed_work *dwork)
{
	cancel_rearming_delayed_workqueue(keventd_wq, dwork);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);
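
/*
 * Sketch (hypothetical example_* names) of a self-rearming delayed work and
 * its shutdown.  A bare cancel_delayed_work() could lose the race against
 * the handler rearming itself, which is why the cancel/flush loop above is
 * needed:
 *
 *	static void example_poll(struct work_struct *work);
 *	static DECLARE_DELAYED_WORK(example_poll_work, example_poll);
 *
 *	static void example_poll(struct work_struct *work)
 *	{
 *		// ... periodic work ...
 *		schedule_delayed_work(&example_poll_work, HZ);	// rearm
 *	}
 *
 *	// at shutdown time:
 *	cancel_rearming_delayed_work(&example_poll_work);
 */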

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:		the function to execute
 * @ew:		guaranteed storage for the execute work structure (must
 *		be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
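
/*
 * Sketch of execute_in_process_context() usage (hypothetical example_*
 * names).  The execute_work storage must remain valid until the function
 * has run, so it is typically embedded in a longer-lived object rather
 * than placed on the stack:
 *
 *	struct example_dev {
 *		struct execute_work release_work;
 *		// ...
 *	};
 *
 *	static void example_release(struct work_struct *work)
 *	{
 *		// may sleep: always reached in process context
 *	}
 *
 *	// safe from interrupt or process context:
 *	execute_in_process_context(example_release, &dev->release_work);
 */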

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = smp_processor_id();	/* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;

}

static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	cwq->wq = wq;
	spin_lock_init(&cwq->lock);
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);

	return cwq;
}

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct workqueue_struct *wq = cwq->wq;
	const char *fmt = is_single_threaded(wq) ? "%s" : "%s/%d";
	struct task_struct *p;

	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
	/*
	 * Nobody can add the work_struct to this cwq,
	 *	if (caller is __create_workqueue)
	 *		nobody should see this wq
	 *	else // caller is CPU_UP_PREPARE
	 *		cpu is not on cpu_online_map
	 * so we can abort safely.
	 */
	if (IS_ERR(p))
		return PTR_ERR(p);

	cwq->thread = p;
	cwq->should_stop = 0;
	if (!is_single_threaded(wq))
		kthread_bind(p, cpu);

	if (is_single_threaded(wq) || cpu_online(cpu))
		wake_up_process(p);

	return 0;
}

struct workqueue_struct *__create_workqueue(const char *name,
					    int singlethread, int freezeable)
{
	struct workqueue_struct *wq;
	struct cpu_workqueue_struct *cwq;
	int err = 0, cpu;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	wq->freezeable = freezeable;

	if (singlethread) {
		INIT_LIST_HEAD(&wq->list);
		cwq = init_cpu_workqueue(wq, singlethread_cpu);
		err = create_workqueue_thread(cwq, singlethread_cpu);
	} else {
		mutex_lock(&workqueue_mutex);
		list_add(&wq->list, &workqueues);

		for_each_possible_cpu(cpu) {
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
		}
		mutex_unlock(&workqueue_mutex);
	}

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue);

static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct wq_barrier barr;
	int alive = 0;

	spin_lock_irq(&cwq->lock);
	if (cwq->thread != NULL) {
		insert_wq_barrier(cwq, &barr, 1);
		cwq->should_stop = 1;
		alive = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (alive) {
		wait_for_completion(&barr.done);

		while (unlikely(cwq->thread != NULL))
			cpu_relax();
		/*
		 * Wait until cwq->thread unlocks cwq->lock,
		 * it won't touch *cwq after that.
		 */
		smp_rmb();
		spin_unlock_wait(&cwq->lock);
	}
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	struct cpu_workqueue_struct *cwq;

	if (is_single_threaded(wq)) {
		cwq = per_cpu_ptr(wq->cpu_wq, singlethread_cpu);
		cleanup_workqueue_thread(cwq, singlethread_cpu);
	} else {
		int cpu;

		mutex_lock(&workqueue_mutex);
		list_del(&wq->list);
		mutex_unlock(&workqueue_mutex);

		for_each_cpu_mask(cpu, cpu_populated_map) {
			cwq = per_cpu_ptr(wq->cpu_wq, cpu);
			cleanup_workqueue_thread(cwq, cpu);
		}
	}

	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
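
/*
 * Sketch of the create/destroy pairing (hypothetical example names).
 * Callers normally reach __create_workqueue() through the
 * create_workqueue()/create_singlethread_workqueue() wrappers:
 *
 *	example_wq = create_singlethread_workqueue("example");
 *	if (!example_wq)
 *		return -ENOMEM;
 *	// ... queue_work(example_wq, ...) from elsewhere ...
 *	destroy_workqueue(example_wq);	// pending work is run first
 */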

static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
						unsigned long action,
						void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;

	switch (action) {
	case CPU_LOCK_ACQUIRE:
		mutex_lock(&workqueue_mutex);
		return NOTIFY_OK;

	case CPU_LOCK_RELEASE:
		mutex_unlock(&workqueue_mutex);
		return NOTIFY_OK;

	case CPU_UP_PREPARE:
		cpu_set(cpu, cpu_populated_map);
	}

	list_for_each_entry(wq, &workqueues, list) {
		cwq = per_cpu_ptr(wq->cpu_wq, cpu);

		switch (action) {
		case CPU_UP_PREPARE:
			if (!create_workqueue_thread(cwq, cpu))
				break;
			printk(KERN_ERR "workqueue for %i failed\n", cpu);
			return NOTIFY_BAD;

		case CPU_ONLINE:
			wake_up_process(cwq->thread);
			break;

		case CPU_UP_CANCELED:
			if (cwq->thread)
				wake_up_process(cwq->thread);
		case CPU_DEAD:
			cleanup_workqueue_thread(cwq, cpu);
			break;
		}
	}

	return NOTIFY_OK;
}

void init_workqueues(void)
{
	cpu_populated_map = cpu_online_map;
	singlethread_cpu = first_cpu(cpu_possible_map);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}