/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	struct list_head worklist;
	wait_queue_head_t more_work;
	struct work_struct *current_work;

	struct workqueue_struct *wq;
	struct task_struct *thread;
	int should_stop;

	int run_depth;		/* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	struct list_head list;
	const char *name;
	int singlethread;
	int freezeable;		/* Freeze threads during suspend */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_MUTEX(workqueue_mutex);
static LIST_HEAD(workqueues);

static int singlethread_cpu __read_mostly;
static cpumask_t cpu_singlethread_map __read_mostly;
/* optimization, we could use cpu_possible_map */
static cpumask_t cpu_populated_map __read_mostly;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
	return wq->singlethread;
}

static const cpumask_t *wq_cpu_map(struct workqueue_struct *wq)
{
	return is_single_threaded(wq)
		? &cpu_singlethread_map : &cpu_populated_map;
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work, void *wq)
{
	unsigned long new;

	BUG_ON(!work_pending(work));

	new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING);
	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
	atomic_long_set(&work->data, new);
}

static inline void *get_wq_data(struct work_struct *work)
{
	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

static void insert_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work, int tail)
{
	set_wq_data(work, cwq);
	if (tail)
		list_add_tail(&work->entry, &cwq->worklist);
	else
		list_add(&work->entry, &cwq->worklist);
	wake_up(&cwq->more_work);
}

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	insert_work(cwq, work, 1);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0, cpu = get_cpu();

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		if (unlikely(is_single_threaded(wq)))
			cpu = singlethread_cpu;
		BUG_ON(!list_empty(&work->entry));
		__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
		ret = 1;
	}
	put_cpu();
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);
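
/*
 * Illustrative sketch, not part of the original workqueue.c: typical
 * driver-side use of queue_work().  The names my_wq, my_dev, my_dev_work,
 * my_dev_init and my_dev_kick are hypothetical, and my_wq is assumed to
 * come from create_workqueue().  Compiled out on purpose.
 */
#if 0
struct my_dev {
	struct work_struct	work;
	/* ... data the work handler operates on ... */
};

static struct workqueue_struct *my_wq;	/* assumed: create_workqueue("my_wq") */

static void my_dev_work(struct work_struct *work)
{
	struct my_dev *dev = container_of(work, struct my_dev, work);

	/* runs in process context in my_wq's worker thread; may sleep */
}

static void my_dev_init(struct my_dev *dev)
{
	INIT_WORK(&dev->work, my_dev_work);
}

static void my_dev_kick(struct my_dev *dev)
{
	/* returns 0 if the work was already pending, non-zero otherwise */
	queue_work(my_wq, &dev->work);
}
#endif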

void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct workqueue_struct *wq = get_wq_data(&dwork->work);
	int cpu = smp_processor_id();

	if (unlikely(is_single_threaded(wq)))
		cpu = singlethread_cpu;

	__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	timer_stats_timer_set_start_info(timer);
	if (delay == 0)
		return queue_work(wq, work);

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		set_wq_data(work, wq);
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;
		add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
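
/*
 * Illustrative sketch, not part of the original workqueue.c: a self-rearming
 * poller built on queue_delayed_work().  my_wq, my_poll, my_poll_fn and
 * my_poll_start are hypothetical names; INIT_DELAYED_WORK() comes from
 * <linux/workqueue.h>.  Compiled out on purpose.
 */
#if 0
static struct workqueue_struct *my_wq;
static struct delayed_work my_poll;

static void my_poll_fn(struct work_struct *work)
{
	/* do the periodic job, then rearm for roughly one second later */
	queue_delayed_work(my_wq, &my_poll, HZ);
}

static void my_poll_start(void)
{
	INIT_DELAYED_WORK(&my_poll, my_poll_fn);
	queue_delayed_work(my_wq, &my_poll, HZ);
}
#endif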

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		set_wq_data(work, wq);
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;
		add_timer_on(timer, cpu);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	spin_lock_irq(&cwq->lock);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* morton gets to eat his hat */
		printk("%s: recursion depth exceeded: %d\n",
			__FUNCTION__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;

		cwq->current_work = work;
		list_del_init(cwq->worklist.next);
		spin_unlock_irq(&cwq->lock);

		BUG_ON(get_wq_data(work) != cwq);
		if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
			work_release(work);
		f(work);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
					current->pid);
			printk(KERN_ERR "    last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
		}

		spin_lock_irq(&cwq->lock);
		cwq->current_work = NULL;
	}
	cwq->run_depth--;
	spin_unlock_irq(&cwq->lock);
}

/*
 * NOTE: the caller must not touch *cwq if this func returns true
 */
static int cwq_should_stop(struct cpu_workqueue_struct *cwq)
{
	int should_stop = cwq->should_stop;

	if (unlikely(should_stop)) {
		spin_lock_irq(&cwq->lock);
		should_stop = cwq->should_stop && list_empty(&cwq->worklist);
		if (should_stop)
			cwq->thread = NULL;
		spin_unlock_irq(&cwq->lock);
	}

	return should_stop;
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);
	struct k_sigaction sa;
	sigset_t blocked;

	if (!cwq->wq->freezeable)
		current->flags |= PF_NOFREEZE;

	set_user_nice(current, -5);

	/* Block and flush all signals */
	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	/*
	 * We inherited MPOL_INTERLEAVE from the booting kernel.
	 * Set MPOL_DEFAULT to ensure node local allocations.
	 */
	numa_default_policy();

	/* SIG_IGN makes children autoreap: see do_notify_parent(). */
	sa.sa.sa_handler = SIG_IGN;
	sa.sa.sa_flags = 0;
	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

	for (;;) {
		if (cwq->wq->freezeable)
			try_to_freeze();

		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!cwq->should_stop && list_empty(&cwq->worklist))
			schedule();
		finish_wait(&cwq->more_work, &wait);

		if (cwq_should_stop(cwq))
			break;

		run_workqueue(cwq);
	}

	return 0;
}

struct wq_barrier {
	struct work_struct	work;
	struct completion	done;
};

static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	complete(&barr->done);
}

static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
					struct wq_barrier *barr, int tail)
{
	INIT_WORK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

	init_completion(&barr->done);

	insert_work(cwq, &barr->work, tail);
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
	} else {
		struct wq_barrier barr;
		int active = 0;

		spin_lock_irq(&cwq->lock);
		if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
			insert_wq_barrier(cwq, &barr, 1);
			active = 1;
		}
		spin_unlock_irq(&cwq->lock);

		if (active)
			wait_for_completion(&barr.done);
	}
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all works which were queued on entry have been handled,
 * but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
	const cpumask_t *cpu_map = wq_cpu_map(wq);
	int cpu;

	might_sleep();
	for_each_cpu_mask(cpu, *cpu_map)
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);
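
/*
 * Illustrative sketch, not part of the original workqueue.c: the shutdown
 * pattern described in the comment above.  my_wq and my_teardown are
 * hypothetical; the caller must ensure nothing requeues work after the
 * flush.  Compiled out on purpose.
 */
#if 0
static struct workqueue_struct *my_wq;

static void my_teardown(void)
{
	/* wait for every work item queued so far to finish ... */
	flush_workqueue(my_wq);
	/* ... after which the data those work items used may be freed */
	destroy_workqueue(my_wq);
}
#endif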
L
Linus Torvalds 已提交
412

O
Oleg Nesterov 已提交
413 414 415 416 417 418 419 420
static void wait_on_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work)
{
	struct wq_barrier barr;
	int running = 0;

	spin_lock_irq(&cwq->lock);
	if (unlikely(cwq->current_work == work)) {
421
		insert_wq_barrier(cwq, &barr, 0);
O
Oleg Nesterov 已提交
422 423 424 425
		running = 1;
	}
	spin_unlock_irq(&cwq->lock);

426
	if (unlikely(running))
O
Oleg Nesterov 已提交
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
		wait_for_completion(&barr.done);
}

/**
 * flush_work - block until a work_struct's callback has terminated
 * @wq: the workqueue on which the work is queued
 * @work: the work which is to be flushed
 *
 * flush_work() will attempt to cancel the work if it is queued.  If the work's
 * callback appears to be running, flush_work() will block until it has
 * completed.
 *
 * flush_work() is designed to be used when the caller is tearing down data
 * structures which the callback function operates upon.  It is expected that,
 * prior to calling flush_work(), the caller has arranged for the work to not
 * be requeued.
 */
void flush_work(struct workqueue_struct *wq, struct work_struct *work)
{
	const cpumask_t *cpu_map = wq_cpu_map(wq);
	struct cpu_workqueue_struct *cwq;
	int cpu;

	might_sleep();

	cwq = get_wq_data(work);
	/* Was it ever queued ? */
	if (!cwq)
		return;

	/*
	 * This work can't be re-queued, no need to re-check that
	 * get_wq_data() is still the same when we take cwq->lock.
	 */
	spin_lock_irq(&cwq->lock);
	list_del_init(&work->entry);
	work_release(work);
	spin_unlock_irq(&cwq->lock);

	for_each_cpu_mask(cpu, *cpu_map)
		wait_on_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}
EXPORT_SYMBOL_GPL(flush_work);
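
/*
 * Illustrative sketch, not part of the original workqueue.c: flush_work()
 * used in the teardown role the comment above describes.  my_dev and
 * my_dev_release are hypothetical, and the caller is assumed to have
 * already prevented re-queueing.  Compiled out on purpose.
 */
#if 0
struct my_dev {
	struct work_struct	work;
	/* ... data the work handler touches ... */
};

static void my_dev_release(struct workqueue_struct *my_wq, struct my_dev *dev)
{
	/* cancel the work if still queued, or wait for a running callback */
	flush_work(my_wq, &dev->work);
	kfree(dev);
}
#endif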

static struct workqueue_struct *keventd_wq;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int fastcall schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
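
/*
 * Illustrative sketch, not part of the original workqueue.c: deferring a
 * small job to the shared keventd ("events") workqueue instead of creating
 * a private one.  my_async_work, my_async_fn and my_irq_bottom_half are
 * hypothetical names.  Compiled out on purpose.
 */
#if 0
static void my_async_fn(struct work_struct *work)
{
	/* short job; shares the events/N worker threads with other users */
}
static DECLARE_WORK(my_async_work, my_async_fn);

static void my_irq_bottom_half(void)
{
	/* queueing is safe from atomic context; the handler runs later */
	schedule_work(&my_async_work);
}
#endif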

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int fastcall schedule_delayed_work(struct delayed_work *dwork,
					unsigned long delay)
{
	timer_stats_timer_set_start_info(&dwork->timer);
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
			struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * Appears to be racy against CPU hotplug.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	preempt_disable();		/* CPU hotplug */
	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
		__queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
	}
	preempt_enable();
	flush_workqueue(keventd_wq);
	free_percpu(works);
	return 0;
}
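
/*
 * Illustrative sketch, not part of the original workqueue.c: running a
 * function once on every online CPU via schedule_on_each_cpu().  The names
 * my_percpu_sync and my_sync_all_cpus are hypothetical.  Compiled out on
 * purpose.
 */
#if 0
static void my_percpu_sync(struct work_struct *unused)
{
	/* executed in keventd's process context, once per online CPU */
}

static int my_sync_all_cpus(void)
{
	/* blocks until the callback has completed on every CPU */
	return schedule_on_each_cpu(my_percpu_sync);
}
#endif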

void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

void flush_work_keventd(struct work_struct *work)
{
	flush_work(keventd_wq, work);
}
EXPORT_SYMBOL(flush_work_keventd);

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
				       struct delayed_work *dwork)
{
	/* Was it ever queued ? */
	if (!get_wq_data(&dwork->work))
		return;

	while (!cancel_delayed_work(dwork))
		flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd work whose handler rearms the delayed work.
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_work(struct delayed_work *dwork)
{
	cancel_rearming_delayed_workqueue(keventd_wq, dwork);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);
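
/*
 * Illustrative sketch, not part of the original workqueue.c: stopping a
 * delayed work whose handler rearms itself, as in the queue_delayed_work()
 * sketch earlier.  my_wq, my_poll and my_poll_stop are hypothetical names.
 * Compiled out on purpose.
 */
#if 0
static struct workqueue_struct *my_wq;
static struct delayed_work my_poll;

static void my_poll_stop(void)
{
	/*
	 * Repeatedly cancels the timer and flushes my_wq until neither a
	 * pending timer nor a rearming handler invocation remains.  For
	 * delayed work on keventd use cancel_rearming_delayed_work() instead.
	 */
	cancel_rearming_delayed_workqueue(my_wq, &my_poll);
}
#endif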

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:		the function to execute
 * @ew:		guaranteed storage for the execute work structure (must
 *		be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
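
/*
 * Illustrative sketch, not part of the original workqueue.c: using
 * execute_in_process_context() for a release path that may be entered from
 * interrupt context.  my_obj, my_cleanup and my_obj_put_last are
 * hypothetical; the execute_work storage is embedded in the object so it
 * stays valid until the callback has run.  Compiled out on purpose.
 */
#if 0
struct my_obj {
	struct execute_work	ew;
	/* ... */
};

static void my_cleanup(struct work_struct *work)
{
	struct my_obj *obj = container_of(work, struct my_obj, ew.work);

	kfree(obj);
}

static void my_obj_put_last(struct my_obj *obj)
{
	/* runs my_cleanup() immediately if possible, else defers to keventd */
	execute_in_process_context(my_cleanup, &obj->ew);
}
#endif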

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = smp_processor_id();	/* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;

}

static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	cwq->wq = wq;
	spin_lock_init(&cwq->lock);
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);

	return cwq;
}

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct workqueue_struct *wq = cwq->wq;
	const char *fmt = is_single_threaded(wq) ? "%s" : "%s/%d";
	struct task_struct *p;

	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
	/*
	 * Nobody can add the work_struct to this cwq,
	 *	if (caller is __create_workqueue)
	 *		nobody should see this wq
	 *	else // caller is CPU_UP_PREPARE
	 *		cpu is not on cpu_online_map
	 * so we can abort safely.
	 */
	if (IS_ERR(p))
		return PTR_ERR(p);

	cwq->thread = p;
	cwq->should_stop = 0;
	if (!is_single_threaded(wq))
		kthread_bind(p, cpu);

	if (is_single_threaded(wq) || cpu_online(cpu))
		wake_up_process(p);

	return 0;
}

struct workqueue_struct *__create_workqueue(const char *name,
					    int singlethread, int freezeable)
{
	struct workqueue_struct *wq;
	struct cpu_workqueue_struct *cwq;
	int err = 0, cpu;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	wq->singlethread = singlethread;
	wq->freezeable = freezeable;
	INIT_LIST_HEAD(&wq->list);

	if (singlethread) {
		cwq = init_cpu_workqueue(wq, singlethread_cpu);
		err = create_workqueue_thread(cwq, singlethread_cpu);
	} else {
		mutex_lock(&workqueue_mutex);
		list_add(&wq->list, &workqueues);

		for_each_possible_cpu(cpu) {
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
		}
		mutex_unlock(&workqueue_mutex);
	}

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue);
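
/*
 * Illustrative sketch, not part of the original workqueue.c: callers
 * normally reach __create_workqueue() through the create_workqueue() and
 * create_singlethread_workqueue() wrappers in <linux/workqueue.h>.  The
 * names my_wq, my_module_init and my_module_exit are hypothetical.
 * Compiled out on purpose.
 */
#if 0
static struct workqueue_struct *my_wq;

static int my_module_init(void)
{
	my_wq = create_singlethread_workqueue("my_wq");	/* one thread total */
	/* or: my_wq = create_workqueue("my_wq");  -- one thread per CPU */
	if (!my_wq)
		return -ENOMEM;
	return 0;
}

static void my_module_exit(void)
{
	/* pending work is run to completion before the threads are stopped */
	destroy_workqueue(my_wq);
}
#endif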

static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct wq_barrier barr;
	int alive = 0;

	spin_lock_irq(&cwq->lock);
	if (cwq->thread != NULL) {
		insert_wq_barrier(cwq, &barr, 1);
		cwq->should_stop = 1;
		alive = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (alive) {
		wait_for_completion(&barr.done);

		while (unlikely(cwq->thread != NULL))
			cpu_relax();
		/*
		 * Wait until cwq->thread unlocks cwq->lock,
		 * it won't touch *cwq after that.
		 */
		smp_rmb();
		spin_unlock_wait(&cwq->lock);
	}
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	const cpumask_t *cpu_map = wq_cpu_map(wq);
	struct cpu_workqueue_struct *cwq;
	int cpu;

	mutex_lock(&workqueue_mutex);
	list_del(&wq->list);
	mutex_unlock(&workqueue_mutex);

	for_each_cpu_mask(cpu, *cpu_map) {
		cwq = per_cpu_ptr(wq->cpu_wq, cpu);
		cleanup_workqueue_thread(cwq, cpu);
	}

	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
						unsigned long action,
						void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;

	switch (action) {
	case CPU_LOCK_ACQUIRE:
		mutex_lock(&workqueue_mutex);
		return NOTIFY_OK;

	case CPU_LOCK_RELEASE:
		mutex_unlock(&workqueue_mutex);
		return NOTIFY_OK;

	case CPU_UP_PREPARE:
		cpu_set(cpu, cpu_populated_map);
	}

	list_for_each_entry(wq, &workqueues, list) {
		cwq = per_cpu_ptr(wq->cpu_wq, cpu);

		switch (action) {
		case CPU_UP_PREPARE:
			if (!create_workqueue_thread(cwq, cpu))
				break;
			printk(KERN_ERR "workqueue for %i failed\n", cpu);
			return NOTIFY_BAD;

		case CPU_ONLINE:
			wake_up_process(cwq->thread);
			break;

		case CPU_UP_CANCELED:
			if (cwq->thread)
				wake_up_process(cwq->thread);
		case CPU_DEAD:
			cleanup_workqueue_thread(cwq, cpu);
			break;
		}
	}

	return NOTIFY_OK;
}

void __init init_workqueues(void)
{
	cpu_populated_map = cpu_online_map;
	singlethread_cpu = first_cpu(cpu_possible_map);
	cpu_singlethread_map = cpumask_of_cpu(singlethread_cpu);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}