/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>

/*
 * Structure fields follow one of the following exclusion rules.
 *
 * I: Set during initialization and read-only afterwards.
 *
 * L: cwq->lock protected.  Access with cwq->lock held.
 *
 * W: workqueue_lock protected.
 */

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	struct list_head worklist;
	wait_queue_head_t more_work;
	struct work_struct *current_work;

	struct workqueue_struct *wq;		/* I: the owning workqueue */
	struct task_struct	*thread;
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;	/* I: cwq's */
	struct list_head	list;		/* W: list of all workqueues */
	const char		*name;		/* I: workqueue name */
	int singlethread;
	int freezeable;		/* Freeze threads during suspend */
#ifdef CONFIG_LOCKDEP
	struct lockdep_map	lockdep_map;
#endif
};

#ifdef CONFIG_DEBUG_OBJECTS_WORK

static struct debug_obj_descr work_debug_descr;

/*
 * fixup_init is called when:
 * - an active object is initialized
 */
static int work_fixup_init(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_init(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

/*
 * fixup_activate is called when:
 * - an active object is activated
 * - an unknown object is activated (might be a statically initialized object)
 */
static int work_fixup_activate(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {

	case ODEBUG_STATE_NOTAVAILABLE:
		/*
		 * This is not really a fixup. The work struct was
		 * statically initialized. We just make sure that it
		 * is tracked in the object tracker.
		 */
		if (test_bit(WORK_STRUCT_STATIC, work_data_bits(work))) {
			debug_object_init(work, &work_debug_descr);
			debug_object_activate(work, &work_debug_descr);
			return 0;
		}
		WARN_ON_ONCE(1);
		return 0;

	case ODEBUG_STATE_ACTIVE:
		WARN_ON(1);

	default:
		return 0;
	}
}

/*
 * fixup_free is called when:
 * - an active object is freed
 */
static int work_fixup_free(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_free(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

static struct debug_obj_descr work_debug_descr = {
	.name		= "work_struct",
	.fixup_init	= work_fixup_init,
	.fixup_activate	= work_fixup_activate,
	.fixup_free	= work_fixup_free,
};

static inline void debug_work_activate(struct work_struct *work)
{
	debug_object_activate(work, &work_debug_descr);
}

static inline void debug_work_deactivate(struct work_struct *work)
{
	debug_object_deactivate(work, &work_debug_descr);
}

void __init_work(struct work_struct *work, int onstack)
{
	if (onstack)
		debug_object_init_on_stack(work, &work_debug_descr);
	else
		debug_object_init(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(__init_work);

void destroy_work_on_stack(struct work_struct *work)
{
	debug_object_free(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(destroy_work_on_stack);

#else
static inline void debug_work_activate(struct work_struct *work) { }
static inline void debug_work_deactivate(struct work_struct *work) { }
#endif
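
/*
 * Illustrative sketch (the barrier_work/my_barrier_fn names are
 * hypothetical): for a work_struct that lives on the stack, the _ON_STACK
 * initializer pairs with destroy_work_on_stack() so the object tracker
 * above stays consistent; the wq_barrier code later in this file is the
 * in-tree example of the pattern.
 *
 *	struct work_struct barrier_work;
 *
 *	INIT_WORK_ON_STACK(&barrier_work, my_barrier_fn);
 *	schedule_work(&barrier_work);
 *	flush_work(&barrier_work);		// must complete before return
 *	destroy_work_on_stack(&barrier_work);
 */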

/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu __read_mostly;
static const struct cpumask *cpu_singlethread_map __read_mostly;
/*
 * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
 * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
 * which comes in between can't use for_each_online_cpu(). We could
 * use cpu_possible_map, the cpumask below is more a documentation
 * than optimization.
 */
static cpumask_var_t cpu_populated_map __read_mostly;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_wq_single_threaded(struct workqueue_struct *wq)
{
	return wq->singlethread;
}

static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
{
	return is_wq_single_threaded(wq)
		? cpu_singlethread_map : cpu_populated_map;
}

static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
					    struct workqueue_struct *wq)
{
	if (unlikely(is_wq_single_threaded(wq)))
		cpu = singlethread_cpu;
	return per_cpu_ptr(wq->cpu_wq, cpu);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work,
			       struct cpu_workqueue_struct *cwq,
			       unsigned long extra_flags)
{
	BUG_ON(!work_pending(work));

	atomic_long_set(&work->data, (unsigned long)cwq | work_static(work) |
			(1UL << WORK_STRUCT_PENDING) | extra_flags);
}

/*
 * Clear WORK_STRUCT_PENDING and the workqueue on which it was queued.
 */
static inline void clear_wq_data(struct work_struct *work)
{
	atomic_long_set(&work->data, work_static(work));
}

static inline
struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}
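
/*
 * Rough sketch of the encoding used above (informal, not authoritative):
 * work->data packs the cwq pointer and the WORK_STRUCT_* flag bits into a
 * single atomic_long.  Because cpu_workqueue_struct is cacheline aligned,
 * the low bits of its address are zero and are free to hold the flags;
 * WORK_STRUCT_WQ_DATA_MASK strips them off again on the way out:
 *
 *	data = (unsigned long)cwq | flags;			// set_wq_data()
 *	cwq  = (void *)(data & WORK_STRUCT_WQ_DATA_MASK);	// get_wq_data()
 */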

/**
 * insert_work - insert a work into cwq
 * @cwq: cwq @work belongs to
 * @work: work to insert
 * @head: insertion point
 * @extra_flags: extra WORK_STRUCT_* flags to set
 *
 * Insert @work into @cwq after @head.
 *
 * CONTEXT:
 * spin_lock_irq(cwq->lock).
 */
static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head,
			unsigned int extra_flags)
{
	trace_workqueue_insertion(cwq->thread, work);

	/* we own @work, set data and link */
	set_wq_data(work, cwq, extra_flags);

	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();

	list_add_tail(&work->entry, head);
	wake_up(&cwq->more_work);
}

static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
			 struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
	unsigned long flags;

	debug_work_activate(work);
	spin_lock_irqsave(&cwq->lock, flags);
	BUG_ON(!list_empty(&work->entry));
	insert_work(cwq, work, &cwq->worklist, 0);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);
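
/*
 * Illustrative usage sketch; the my_dev/my_event_fn names are hypothetical.
 * A caller typically embeds a work_struct in its own state, initializes it
 * once, and then queues it (queue_work() is safe from atomic context):
 *
 *	struct my_dev {
 *		struct workqueue_struct *wq;
 *		struct work_struct event_work;
 *	};
 *
 *	static void my_event_fn(struct work_struct *work)
 *	{
 *		struct my_dev *d = container_of(work, struct my_dev,
 *						event_work);
 *		// runs in the worker thread: process context, may sleep
 *	}
 *
 *	// setup:	INIT_WORK(&d->event_work, my_event_fn);
 *	// later:	queue_work(d->wq, &d->event_work);
 */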

/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 */
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		__queue_work(cpu, wq, work);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work_on);

static void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);

	__queue_work(smp_processor_id(), cwq->wq, &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	if (delay == 0)
		return queue_work(wq, &dwork->work);

	return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		timer_stats_timer_set_start_info(&dwork->timer);

		/* This stores cwq for the moment, for the timer_fn */
		set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;

		if (unlikely(cpu >= 0))
			add_timer_on(timer, cpu);
		else
			add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
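
/*
 * Illustrative sketch (my_wq/my_poll_fn are hypothetical): a delayed_work
 * bundles a work_struct with a timer, so one declaration covers both:
 *
 *	static void my_poll_fn(struct work_struct *work);
 *	static DECLARE_DELAYED_WORK(my_poll_work, my_poll_fn);
 *
 *	// run my_poll_fn() on my_wq roughly one second from now:
 *	queue_delayed_work(my_wq, &my_poll_work, HZ);
 *
 *	// or pin the execution to a particular CPU:
 *	queue_delayed_work_on(2, my_wq, &my_poll_work, HZ);
 */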

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	spin_lock_irq(&cwq->lock);
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
		/*
		 * It is permissible to free the struct work_struct
		 * from inside the function that is called from it,
		 * this we need to take into account for lockdep too.
		 * To avoid bogus "held lock freed" warnings as well
		 * as problems when looking into work->lockdep_map,
		 * make a copy and use that here.
		 */
		struct lockdep_map lockdep_map = work->lockdep_map;
#endif
		trace_workqueue_execution(cwq->thread, work);
		debug_work_deactivate(work);
		cwq->current_work = work;
		list_del_init(cwq->worklist.next);
		spin_unlock_irq(&cwq->lock);

		BUG_ON(get_wq_data(work) != cwq);
		work_clear_pending(work);
		lock_map_acquire(&cwq->wq->lockdep_map);
		lock_map_acquire(&lockdep_map);
		f(work);
		lock_map_release(&lockdep_map);
		lock_map_release(&cwq->wq->lockdep_map);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
					task_pid_nr(current));
			printk(KERN_ERR "    last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
		}

		spin_lock_irq(&cwq->lock);
		cwq->current_work = NULL;
	}
	spin_unlock_irq(&cwq->lock);
}

/**
 * worker_thread - the worker thread function
 * @__cwq: cwq to serve
 *
 * The cwq worker thread function.
 */
static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);

	if (cwq->wq->freezeable)
		set_freezable();

	for (;;) {
		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !kthread_should_stop() &&
		    list_empty(&cwq->worklist))
			schedule();
		finish_wait(&cwq->more_work, &wait);

		try_to_freeze();

		if (kthread_should_stop())
			break;

		run_workqueue(cwq);
	}

	return 0;
}

struct wq_barrier {
	struct work_struct	work;
	struct completion	done;
};

static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	complete(&barr->done);
}

/**
 * insert_wq_barrier - insert a barrier work
 * @cwq: cwq to insert barrier into
 * @barr: wq_barrier to insert
 * @head: insertion point
 *
 * Insert barrier @barr into @cwq before @head.
 *
 * CONTEXT:
 * spin_lock_irq(cwq->lock).
 */
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
			struct wq_barrier *barr, struct list_head *head)
{
	/*
	 * debugobject calls are safe here even with cwq->lock locked
	 * as we know for sure that this will not trigger any of the
	 * checks and call back into the fixup functions where we
	 * might deadlock.
	 */
	INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
	init_completion(&barr->done);

	debug_work_activate(&barr->work);
	insert_work(cwq, &barr->work, head, 0);
}

static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	int active = 0;
	struct wq_barrier barr;

	WARN_ON(cwq->thread == current);

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
		insert_wq_barrier(cwq, &barr, &cwq->worklist);
		active = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (active) {
		wait_for_completion(&barr.done);
		destroy_work_on_stack(&barr.work);
	}

	return active;
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all works which were queued on entry have been handled,
 * but we are not livelocked by new incoming ones.
 */
void flush_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	might_sleep();
	lock_map_acquire(&wq->lockdep_map);
	lock_map_release(&wq->lockdep_map);
	for_each_cpu(cpu, cpu_map)
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);

/**
 * flush_work - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns false if @work has already terminated.
 *
 * It is expected that, prior to calling flush_work(), the caller has
 * arranged for the work to not be requeued, otherwise it doesn't make
 * sense to use this function.
 */
int flush_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct list_head *prev;
	struct wq_barrier barr;

	might_sleep();
	cwq = get_wq_data(work);
	if (!cwq)
		return 0;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * See the comment near try_to_grab_pending()->smp_rmb().
		 * If it was re-queued under us we are not going to wait.
		 */
		smp_rmb();
		if (unlikely(cwq != get_wq_data(work)))
			goto already_gone;
		prev = &work->entry;
	} else {
		if (cwq->current_work != work)
			goto already_gone;
		prev = &cwq->worklist;
	}
	insert_wq_barrier(cwq, &barr, prev->next);

	spin_unlock_irq(&cwq->lock);
	wait_for_completion(&barr.done);
	destroy_work_on_stack(&barr.work);
	return 1;
already_gone:
	spin_unlock_irq(&cwq->lock);
	return 0;
}
EXPORT_SYMBOL_GPL(flush_work);
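
/*
 * Illustrative sketch (hypothetical names): a caller that has already
 * stopped requeueing can use flush_work() to wait for the last execution
 * before tearing down data the callback touches:
 *
 *	d->stopping = 1;		// no further queue_work() calls
 *	flush_work(&d->event_work);	// wait for a running callback, if any
 *	kfree(d->buffer);		// now safe
 */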

/*
 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
 * so this work can't be re-armed in any way.
 */
static int try_to_grab_pending(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	int ret = -1;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
		return 0;

	/*
	 * The queueing is in progress, or it is already queued. Try to
	 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
	 */

	cwq = get_wq_data(work);
	if (!cwq)
		return ret;

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * This work is queued, but perhaps we locked the wrong cwq.
		 * In that case we must see the new value after rmb(), see
		 * insert_work()->wmb().
		 */
		smp_rmb();
		if (cwq == get_wq_data(work)) {
			debug_work_deactivate(work);
			list_del_init(&work->entry);
			ret = 1;
		}
	}
	spin_unlock_irq(&cwq->lock);

	return ret;
}

static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work)
{
	struct wq_barrier barr;
	int running = 0;

	spin_lock_irq(&cwq->lock);
	if (unlikely(cwq->current_work == work)) {
		insert_wq_barrier(cwq, &barr, cwq->worklist.next);
		running = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (unlikely(running)) {
		wait_for_completion(&barr.done);
		destroy_work_on_stack(&barr.work);
	}
}

static void wait_on_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	const struct cpumask *cpu_map;
	int cpu;

	might_sleep();

	lock_map_acquire(&work->lockdep_map);
	lock_map_release(&work->lockdep_map);

	cwq = get_wq_data(work);
	if (!cwq)
		return;

	wq = cwq->wq;
	cpu_map = wq_cpu_map(wq);

	for_each_cpu(cpu, cpu_map)
		wait_on_cpu_work(get_cwq(cpu, wq), work);
}

static int __cancel_work_timer(struct work_struct *work,
				struct timer_list* timer)
{
	int ret;

	do {
		ret = (timer && likely(del_timer(timer)));
		if (!ret)
			ret = try_to_grab_pending(work);
		wait_on_work(work);
	} while (unlikely(ret < 0));

	clear_wq_data(work);
	return ret;
}

/**
 * cancel_work_sync - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns true if @work was pending.
 *
 * cancel_work_sync() will cancel the work if it is queued. If the work's
 * callback appears to be running, cancel_work_sync() will block until it
 * has completed.
 *
 * It is possible to use this function if the work re-queues itself. It can
 * cancel the work even if it migrates to another workqueue, however in that
 * case it only guarantees that work->func() has completed on the last queued
 * workqueue.
 *
 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
 * pending, otherwise it goes into a busy-wait loop until the timer expires.
 *
 * The caller must ensure that the workqueue_struct on which this work was
 * last queued can't be destroyed before this function returns.
 */
int cancel_work_sync(struct work_struct *work)
{
	return __cancel_work_timer(work, NULL);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);

/**
 * cancel_delayed_work_sync - reliably kill off a delayed work.
 * @dwork: the delayed work struct
 *
 * Returns true if @dwork was pending.
 *
 * It is possible to use this function if @dwork rearms itself via queue_work()
 * or queue_delayed_work(). See also the comment for cancel_work_sync().
 */
int cancel_delayed_work_sync(struct delayed_work *dwork)
{
	return __cancel_work_timer(&dwork->work, &dwork->timer);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
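
/*
 * Illustrative teardown sketch (hypothetical names): in a module exit or
 * device remove path the cancel variants are usually preferable to plain
 * flushing, since they also remove a not-yet-started item from the queue:
 *
 *	cancel_delayed_work_sync(&d->poll_work);	// kills timer + work
 *	cancel_work_sync(&d->event_work);
 *	destroy_workqueue(d->wq);	// nothing may queue on d->wq after this
 */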

static struct workqueue_struct *keventd_wq __read_mostly;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * Returns zero if @work was already on the kernel-global workqueue and
 * non-zero otherwise.
 *
 * This puts a job in the kernel-global workqueue if it was not already
 * queued and leaves it in the same position on the kernel-global
 * workqueue otherwise.
 */
int schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
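
/*
 * Illustrative sketch (hypothetical names): schedule_work() is the common
 * way to push the sleeping part of an interrupt handler onto keventd:
 *
 *	static irqreturn_t my_irq_handler(int irq, void *dev_id)
 *	{
 *		struct my_dev *d = dev_id;
 *
 *		// can't sleep here; defer the heavy lifting to process context
 *		schedule_work(&d->irq_work);
 *		return IRQ_HANDLED;
 *	}
 */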

/*
 * schedule_work_on - put work task on a specific cpu
 * @cpu: cpu to put the work task on
 * @work: job to be done
 *
 * This puts a job on a specific cpu
 */
int schedule_work_on(int cpu, struct work_struct *work)
{
	return queue_work_on(cpu, keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work_on);

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int schedule_delayed_work(struct delayed_work *dwork,
					unsigned long delay)
{
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);
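
/*
 * Illustrative sketch (hypothetical names): a work function that rearms
 * itself gives a simple periodic poll on the kernel-global workqueue:
 *
 *	static void my_poll(struct work_struct *work)
 *	{
 *		struct my_dev *d = container_of(work, struct my_dev,
 *						poll_work.work);
 *
 *		my_check_status(d);
 *		schedule_delayed_work(&d->poll_work, HZ / 10);
 *	}
 */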

/**
 * flush_delayed_work - block until a delayed_work's callback has terminated
 * @dwork: the delayed work which is to be flushed
 *
 * Any timeout is cancelled, and any pending work is run immediately.
 */
void flush_delayed_work(struct delayed_work *dwork)
{
	if (del_timer_sync(&dwork->timer)) {
		__queue_work(get_cpu(), get_wq_data(&dwork->work)->wq,
			     &dwork->work);
		put_cpu();
	}
	flush_work(&dwork->work);
}
EXPORT_SYMBOL(flush_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
			struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	int orig = -1;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	get_online_cpus();

	/*
	 * When running in keventd don't schedule a work item on
	 * itself.  Can just call directly because the work queue is
	 * already bound.  This also is faster.
	 */
	if (current_is_keventd())
		orig = raw_smp_processor_id();

	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		if (cpu != orig)
			schedule_work_on(cpu, work);
	}
	if (orig >= 0)
		func(per_cpu_ptr(works, orig));

	for_each_online_cpu(cpu)
		flush_work(per_cpu_ptr(works, cpu));

	put_online_cpus();
	free_percpu(works);
	return 0;
}
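
/*
 * Illustrative sketch (my_flush_local_caches is hypothetical): the handler
 * has the usual work_func_t signature and runs once per online CPU; the
 * call returns only after every invocation has completed:
 *
 *	static void my_flush_local_caches(struct work_struct *unused)
 *	{
 *		// runs in keventd context on each online CPU
 *	}
 *	...
 *	err = schedule_on_each_cpu(my_flush_local_caches);
 */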

/**
 * flush_scheduled_work - ensure that any scheduled work has run to completion.
 *
 * Forces execution of the kernel-global workqueue and blocks until its
 * completion.
 *
 * Think twice before calling this function!  It's very easy to get into
 * trouble if you don't take great care.  Either of the following situations
 * will lead to deadlock:
 *
 *	One of the work items currently on the workqueue needs to acquire
 *	a lock held by your code or its caller.
 *
 *	Your code is running in the context of a work routine.
 *
 * They will be detected by lockdep when they occur, but the first might not
 * occur very often.  It depends on what work items are on the workqueue and
 * what locks they need, which you have no control over.
 *
 * In most situations flushing the entire workqueue is overkill; you merely
 * need to know that a particular work item isn't queued and isn't running.
 * In such cases you should use cancel_delayed_work_sync() or
 * cancel_work_sync() instead.
 */
void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:		the function to execute
 * @ew:		guaranteed storage for the execute work structure (must
 *		be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
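
/*
 * Illustrative sketch (hypothetical names): the execute_work storage must
 * outlive a possibly deferred call, so it is normally embedded in the
 * object being operated on rather than placed on the caller's stack:
 *
 *	struct my_obj {
 *		struct execute_work release_work;
 *		...
 *	};
 *
 *	// my_obj_release() has the work_func_t signature; callable from
 *	// either interrupt or process context:
 *	execute_in_process_context(my_obj_release, &obj->release_work);
 */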

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;

}

static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	cwq->wq = wq;
	spin_lock_init(&cwq->lock);
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);

	return cwq;
}

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct workqueue_struct *wq = cwq->wq;
	const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
	struct task_struct *p;

	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
	/*
	 * Nobody can add the work_struct to this cwq,
	 *	if (caller is __create_workqueue)
	 *		nobody should see this wq
	 *	else // caller is CPU_UP_PREPARE
	 *		cpu is not on cpu_online_map
	 * so we can abort safely.
	 */
	if (IS_ERR(p))
		return PTR_ERR(p);
	cwq->thread = p;

	trace_workqueue_creation(cwq->thread, cpu);

	return 0;
}

static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct task_struct *p = cwq->thread;

	if (p != NULL) {
		if (cpu >= 0)
			kthread_bind(p, cpu);
		wake_up_process(p);
	}
}

struct workqueue_struct *__create_workqueue_key(const char *name,
						int singlethread,
						int freezeable,
						struct lock_class_key *key,
						const char *lock_name)
{
	struct workqueue_struct *wq;
	struct cpu_workqueue_struct *cwq;
	int err = 0, cpu;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		goto err;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq)
		goto err;

	wq->name = name;
	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
	wq->singlethread = singlethread;
	wq->freezeable = freezeable;
	INIT_LIST_HEAD(&wq->list);

	if (singlethread) {
		cwq = init_cpu_workqueue(wq, singlethread_cpu);
		err = create_workqueue_thread(cwq, singlethread_cpu);
		start_workqueue_thread(cwq, -1);
	} else {
		cpu_maps_update_begin();
		/*
		 * We must place this wq on list even if the code below fails.
		 * cpu_down(cpu) can remove cpu from cpu_populated_map before
		 * destroy_workqueue() takes the lock, in that case we leak
		 * cwq[cpu]->thread.
		 */
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);
		spin_unlock(&workqueue_lock);
		/*
		 * We must initialize cwqs for each possible cpu even if we
		 * are going to call destroy_workqueue() finally. Otherwise
		 * cpu_up() can hit the uninitialized cwq once we drop the
		 * lock.
		 */
		for_each_possible_cpu(cpu) {
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
			start_workqueue_thread(cwq, cpu);
		}
		cpu_maps_update_done();
	}

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
err:
	if (wq) {
		free_percpu(wq->cpu_wq);
		kfree(wq);
	}
	return NULL;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);
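
/*
 * Illustrative sketch: drivers normally reach __create_workqueue_key()
 * through the create_workqueue()/create_singlethread_workqueue() wrappers
 * (the "my_driver" naming below is hypothetical):
 *
 *	struct workqueue_struct *wq;
 *
 *	wq = create_singlethread_workqueue("my_driver");
 *	if (!wq)
 *		return -ENOMEM;
 *	queue_work(wq, &d->event_work);
 *	...
 *	destroy_workqueue(wq);
 */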

static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
	/*
	 * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
	 * cpu_add_remove_lock protects cwq->thread.
	 */
	if (cwq->thread == NULL)
		return;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	flush_cpu_workqueue(cwq);
	/*
	 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
	 * a concurrent flush_workqueue() can insert a barrier after us.
	 * However, in that case run_workqueue() won't return and check
	 * kthread_should_stop() until it flushes all work_struct's.
	 * When ->worklist becomes empty it is safe to exit because no
	 * more work_structs can be queued on this cwq: flush_workqueue
	 * checks list_empty(), and a "normal" queue_work() can't use
	 * a dead CPU.
	 */
	trace_workqueue_destruction(cwq->thread);
	kthread_stop(cwq->thread);
	cwq->thread = NULL;
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	cpu_maps_update_begin();
	spin_lock(&workqueue_lock);
	list_del(&wq->list);
	spin_unlock(&workqueue_lock);

	for_each_cpu(cpu, cpu_map)
		cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
	cpu_maps_update_done();

	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
						unsigned long action,
						void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	int err = 0;

	action &= ~CPU_TASKS_FROZEN;

	switch (action) {
	case CPU_UP_PREPARE:
		cpumask_set_cpu(cpu, cpu_populated_map);
	}
undo:
	list_for_each_entry(wq, &workqueues, list) {
		cwq = per_cpu_ptr(wq->cpu_wq, cpu);

		switch (action) {
		case CPU_UP_PREPARE:
			err = create_workqueue_thread(cwq, cpu);
			if (!err)
				break;
			printk(KERN_ERR "workqueue [%s] for %i failed\n",
				wq->name, cpu);
			action = CPU_UP_CANCELED;
			err = -ENOMEM;
			goto undo;

		case CPU_ONLINE:
			start_workqueue_thread(cwq, cpu);
			break;

		case CPU_UP_CANCELED:
			start_workqueue_thread(cwq, -1);
		case CPU_POST_DEAD:
			cleanup_workqueue_thread(cwq);
			break;
		}
	}

	switch (action) {
	case CPU_UP_CANCELED:
	case CPU_POST_DEAD:
		cpumask_clear_cpu(cpu, cpu_populated_map);
	}

	return notifier_from_errno(err);
}

#ifdef CONFIG_SMP

struct work_for_cpu {
	struct completion completion;
	long (*fn)(void *);
	void *arg;
	long ret;
};

static int do_work_for_cpu(void *_wfc)
{
	struct work_for_cpu *wfc = _wfc;
	wfc->ret = wfc->fn(wfc->arg);
	complete(&wfc->completion);
	return 0;
}

/**
 * work_on_cpu - run a function in user context on a particular cpu
 * @cpu: the cpu to run on
 * @fn: the function to run
 * @arg: the function arg
 *
 * This will return the value @fn returns.
 * It is up to the caller to ensure that the cpu doesn't go offline.
 * The caller must not hold any locks which would prevent @fn from completing.
 */
long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
{
	struct task_struct *sub_thread;
	struct work_for_cpu wfc = {
		.completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
		.fn = fn,
		.arg = arg,
	};

	sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
	if (IS_ERR(sub_thread))
		return PTR_ERR(sub_thread);
	kthread_bind(sub_thread, cpu);
	wake_up_process(sub_thread);
	wait_for_completion(&wfc.completion);
	return wfc.ret;
}
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */
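
/*
 * Illustrative sketch (hypothetical names): work_on_cpu() suits the case
 * where a function must run on one specific CPU but the caller does not
 * want to migrate there itself:
 *
 *	static long my_read_local_state(void *arg)
 *	{
 *		struct my_state *s = arg;
 *
 *		return my_collect(s);	// runs on the target CPU
 *	}
 *	...
 *	ret = work_on_cpu(cpu, my_read_local_state, &state);
 */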

void __init init_workqueues(void)
{
	alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);

	cpumask_copy(cpu_populated_map, cpu_online_mask);
	singlethread_cpu = cpumask_first(cpu_possible_mask);
	cpu_singlethread_map = cpumask_of(singlethread_cpu);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}