/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>

/*
 * Structure fields follow one of the following exclusion rules.
 *
 * I: Set during initialization and read-only afterwards.
 *
 * L: cwq->lock protected.  Access with cwq->lock held.
 *
 * W: workqueue_lock protected.
 */

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).  The lower WORK_STRUCT_FLAG_BITS of
 * work_struct->data are used for flags and thus cwqs need to be
 * aligned at two's power of the number of flag bits.
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	struct list_head worklist;
	wait_queue_head_t more_work;
	struct work_struct *current_work;
	unsigned int		cpu;

	struct workqueue_struct *wq;		/* I: the owning workqueue */
	struct task_struct	*thread;
};

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	unsigned int		flags;		/* I: WQ_* flags */
	struct cpu_workqueue_struct *cpu_wq;	/* I: cwq's */
	struct list_head	list;		/* W: list of all workqueues */
	const char		*name;		/* I: workqueue name */
#ifdef CONFIG_LOCKDEP
	struct lockdep_map	lockdep_map;
#endif
};

#ifdef CONFIG_DEBUG_OBJECTS_WORK

static struct debug_obj_descr work_debug_descr;

/*
 * fixup_init is called when:
 * - an active object is initialized
 */
static int work_fixup_init(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_init(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

/*
 * fixup_activate is called when:
 * - an active object is activated
 * - an unknown object is activated (might be a statically initialized object)
 */
static int work_fixup_activate(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {

	case ODEBUG_STATE_NOTAVAILABLE:
		/*
		 * This is not really a fixup. The work struct was
		 * statically initialized. We just make sure that it
		 * is tracked in the object tracker.
		 */
		if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
			debug_object_init(work, &work_debug_descr);
			debug_object_activate(work, &work_debug_descr);
			return 0;
		}
		WARN_ON_ONCE(1);
		return 0;

	case ODEBUG_STATE_ACTIVE:
		WARN_ON(1);

	default:
		return 0;
	}
}

/*
 * fixup_free is called when:
 * - an active object is freed
 */
static int work_fixup_free(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_free(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

static struct debug_obj_descr work_debug_descr = {
	.name		= "work_struct",
	.fixup_init	= work_fixup_init,
	.fixup_activate	= work_fixup_activate,
	.fixup_free	= work_fixup_free,
};

static inline void debug_work_activate(struct work_struct *work)
{
	debug_object_activate(work, &work_debug_descr);
}

static inline void debug_work_deactivate(struct work_struct *work)
{
	debug_object_deactivate(work, &work_debug_descr);
}

void __init_work(struct work_struct *work, int onstack)
{
	if (onstack)
		debug_object_init_on_stack(work, &work_debug_descr);
	else
		debug_object_init(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(__init_work);

void destroy_work_on_stack(struct work_struct *work)
{
	debug_object_free(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(destroy_work_on_stack);

#else
static inline void debug_work_activate(struct work_struct *work) { }
static inline void debug_work_deactivate(struct work_struct *work) { }
#endif

/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu __read_mostly;

static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
					    struct workqueue_struct *wq)
{
	return per_cpu_ptr(wq->cpu_wq, cpu);
}

static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
					       struct workqueue_struct *wq)
{
	if (unlikely(wq->flags & WQ_SINGLE_THREAD))
		cpu = singlethread_cpu;
	return get_cwq(cpu, wq);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work,
			       struct cpu_workqueue_struct *cwq,
			       unsigned long extra_flags)
{
	BUG_ON(!work_pending(work));

	atomic_long_set(&work->data, (unsigned long)cwq | work_static(work) |
			WORK_STRUCT_PENDING | extra_flags);
}

/*
 * Clear WORK_STRUCT_PENDING and the workqueue on which it was queued.
 */
static inline void clear_wq_data(struct work_struct *work)
{
	atomic_long_set(&work->data, work_static(work));
}

static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
	return (void *)(atomic_long_read(&work->data) &
			WORK_STRUCT_WQ_DATA_MASK);
}

/**
 * insert_work - insert a work into cwq
 * @cwq: cwq @work belongs to
 * @work: work to insert
 * @head: insertion point
 * @extra_flags: extra WORK_STRUCT_* flags to set
 *
 * Insert @work into @cwq after @head.
 *
 * CONTEXT:
 * spin_lock_irq(cwq->lock).
 */
static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head,
			unsigned int extra_flags)
{
	/* we own @work, set data and link */
	set_wq_data(work, cwq, extra_flags);

	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();

	list_add_tail(&work->entry, head);
	wake_up(&cwq->more_work);
}

static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
			 struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
	unsigned long flags;

	debug_work_activate(work);
	spin_lock_irqsave(&cwq->lock, flags);
	BUG_ON(!list_empty(&work->entry));
	insert_work(cwq, work, &cwq->worklist, 0);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);

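/*
 * Example usage (illustrative sketch only; "my_wq", "my_work" and
 * "my_work_fn" are hypothetical caller-side names, not part of this file):
 *
 *	static void my_work_fn(struct work_struct *work)
 *	{
 *		pr_info("my_work_fn: running in process context\n");
 *	}
 *	static DECLARE_WORK(my_work, my_work_fn);
 *	static struct workqueue_struct *my_wq;
 *
 *	my_wq = create_workqueue("my_wq");
 *	queue_work(my_wq, &my_work);	// returns 0 if already pending
 */
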
/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 */
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
		__queue_work(cpu, wq, work);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work_on);

static void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);

	__queue_work(smp_processor_id(), cwq->wq, &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	if (delay == 0)
		return queue_work(wq, &dwork->work);

	return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		timer_stats_timer_set_start_info(&dwork->timer);

		/* This stores cwq for the moment, for the timer_fn */
		set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;

		if (unlikely(cpu >= 0))
			add_timer_on(timer, cpu);
		else
			add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

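/*
 * Example usage (illustrative sketch only; "my_wq", "my_dwork" and
 * "my_dwork_fn" are hypothetical caller-side names): arming a delayed
 * work item to run roughly one second after queueing.
 *
 *	static void my_dwork_fn(struct work_struct *work)
 *	{
 *		pr_info("ran about a second after queueing\n");
 *	}
 *	static DECLARE_DELAYED_WORK(my_dwork, my_dwork_fn);
 *
 *	queue_delayed_work(my_wq, &my_dwork, msecs_to_jiffies(1000));
 */
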
/**
 * process_one_work - process single work
 * @cwq: cwq to process work for
 * @work: work to process
 *
 * Process @work.  This function contains all the logics necessary to
 * process a single work including synchronization against and
 * interaction with other workers on the same cpu, queueing and
 * flushing.  As long as context requirement is met, any worker can
 * call this function to process a work.
 *
 * CONTEXT:
 * spin_lock_irq(cwq->lock) which is released and regrabbed.
 */
static void process_one_work(struct cpu_workqueue_struct *cwq,
			     struct work_struct *work)
{
	work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
	/*
	 * It is permissible to free the struct work_struct from
	 * inside the function that is called from it, this we need to
	 * take into account for lockdep too.  To avoid bogus "held
	 * lock freed" warnings as well as problems when looking into
	 * work->lockdep_map, make a copy and use that here.
	 */
	struct lockdep_map lockdep_map = work->lockdep_map;
#endif
	/* claim and process */
	debug_work_deactivate(work);
	cwq->current_work = work;
	list_del_init(&work->entry);

	spin_unlock_irq(&cwq->lock);

	BUG_ON(get_wq_data(work) != cwq);
	work_clear_pending(work);
	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_acquire(&lockdep_map);
	f(work);
	lock_map_release(&lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
		printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
		       "%s/0x%08x/%d\n",
		       current->comm, preempt_count(), task_pid_nr(current));
		printk(KERN_ERR "    last function: ");
		print_symbol("%s\n", (unsigned long)f);
		debug_show_held_locks(current);
		dump_stack();
	}

	spin_lock_irq(&cwq->lock);

	/* we're done with it, release */
	cwq->current_work = NULL;
}

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	spin_lock_irq(&cwq->lock);
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		process_one_work(cwq, work);
	}
	spin_unlock_irq(&cwq->lock);
}

/**
 * worker_thread - the worker thread function
 * @__cwq: cwq to serve
 *
 * The cwq worker thread function.
 */
static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);

	if (cwq->wq->flags & WQ_FREEZEABLE)
		set_freezable();

	for (;;) {
		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !kthread_should_stop() &&
		    list_empty(&cwq->worklist))
			schedule();
		finish_wait(&cwq->more_work, &wait);

		try_to_freeze();

		if (kthread_should_stop())
			break;

		if (unlikely(!cpumask_equal(&cwq->thread->cpus_allowed,
					    get_cpu_mask(cwq->cpu))))
			set_cpus_allowed_ptr(cwq->thread,
					     get_cpu_mask(cwq->cpu));
		run_workqueue(cwq);
	}

	return 0;
}

struct wq_barrier {
	struct work_struct	work;
	struct completion	done;
};

static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	complete(&barr->done);
}

/**
 * insert_wq_barrier - insert a barrier work
 * @cwq: cwq to insert barrier into
 * @barr: wq_barrier to insert
 * @head: insertion point
 *
 * Insert barrier @barr into @cwq before @head.
 *
 * CONTEXT:
 * spin_lock_irq(cwq->lock).
 */
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
			struct wq_barrier *barr, struct list_head *head)
{
	/*
	 * debugobject calls are safe here even with cwq->lock locked
	 * as we know for sure that this will not trigger any of the
	 * checks and call back into the fixup functions where we
	 * might deadlock.
	 */
	INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
	init_completion(&barr->done);

	debug_work_activate(&barr->work);
	insert_work(cwq, &barr->work, head, 0);
}

static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	int active = 0;
	struct wq_barrier barr;

	WARN_ON(cwq->thread == current);

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
		insert_wq_barrier(cwq, &barr, &cwq->worklist);
		active = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (active) {
		wait_for_completion(&barr.done);
		destroy_work_on_stack(&barr.work);
	}

	return active;
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all works which were queued on entry have been handled,
 * but we are not livelocked by new incoming ones.
 */
void flush_workqueue(struct workqueue_struct *wq)
{
	int cpu;

	might_sleep();
	lock_map_acquire(&wq->lockdep_map);
	lock_map_release(&wq->lockdep_map);
	for_each_possible_cpu(cpu)
		flush_cpu_workqueue(get_cwq(cpu, wq));
}
EXPORT_SYMBOL_GPL(flush_workqueue);

/**
 * flush_work - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns false if @work has already terminated.
 *
 * It is expected that, prior to calling flush_work(), the caller has
 * arranged for the work to not be requeued, otherwise it doesn't make
 * sense to use this function.
 */
int flush_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct list_head *prev;
	struct wq_barrier barr;

	might_sleep();
	cwq = get_wq_data(work);
	if (!cwq)
		return 0;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * See the comment near try_to_grab_pending()->smp_rmb().
		 * If it was re-queued under us we are not going to wait.
		 */
		smp_rmb();
		if (unlikely(cwq != get_wq_data(work)))
			goto already_gone;
		prev = &work->entry;
	} else {
		if (cwq->current_work != work)
			goto already_gone;
		prev = &cwq->worklist;
	}
	insert_wq_barrier(cwq, &barr, prev->next);

	spin_unlock_irq(&cwq->lock);
	wait_for_completion(&barr.done);
	destroy_work_on_stack(&barr.work);
	return 1;
already_gone:
	spin_unlock_irq(&cwq->lock);
	return 0;
}
EXPORT_SYMBOL_GPL(flush_work);
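/*
 * Example usage (illustrative sketch only; "my_work", "my_wq" and
 * "stop_my_requeue_sources" are hypothetical caller-side names): typical
 * teardown ordering once the caller has stopped requeueing the work.
 *
 *	stop_my_requeue_sources();	// hypothetical: no new queueing
 *	flush_work(&my_work);		// wait for a running instance
 *	// or, to drain everything queued on a whole workqueue:
 *	flush_workqueue(my_wq);
 */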

/*
 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
 * so this work can't be re-armed in any way.
 */
static int try_to_grab_pending(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	int ret = -1;

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
		return 0;

	/*
	 * The queueing is in progress, or it is already queued. Try to
	 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
	 */

	cwq = get_wq_data(work);
	if (!cwq)
		return ret;

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * This work is queued, but perhaps we locked the wrong cwq.
		 * In that case we must see the new value after rmb(), see
		 * insert_work()->wmb().
		 */
		smp_rmb();
		if (cwq == get_wq_data(work)) {
			debug_work_deactivate(work);
			list_del_init(&work->entry);
			ret = 1;
		}
	}
	spin_unlock_irq(&cwq->lock);

	return ret;
}

static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work)
{
	struct wq_barrier barr;
	int running = 0;

	spin_lock_irq(&cwq->lock);
	if (unlikely(cwq->current_work == work)) {
		insert_wq_barrier(cwq, &barr, cwq->worklist.next);
		running = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (unlikely(running)) {
		wait_for_completion(&barr.done);
		destroy_work_on_stack(&barr.work);
	}
}

static void wait_on_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	int cpu;

	might_sleep();

	lock_map_acquire(&work->lockdep_map);
	lock_map_release(&work->lockdep_map);

	cwq = get_wq_data(work);
	if (!cwq)
		return;

	wq = cwq->wq;

	for_each_possible_cpu(cpu)
		wait_on_cpu_work(get_cwq(cpu, wq), work);
}

static int __cancel_work_timer(struct work_struct *work,
				struct timer_list* timer)
{
	int ret;

	do {
		ret = (timer && likely(del_timer(timer)));
		if (!ret)
			ret = try_to_grab_pending(work);
		wait_on_work(work);
	} while (unlikely(ret < 0));

	clear_wq_data(work);
	return ret;
}

/**
 * cancel_work_sync - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns true if @work was pending.
 *
 * cancel_work_sync() will cancel the work if it is queued. If the work's
 * callback appears to be running, cancel_work_sync() will block until it
 * has completed.
 *
 * It is possible to use this function if the work re-queues itself. It can
 * cancel the work even if it migrates to another workqueue, however in that
 * case it only guarantees that work->func() has completed on the last queued
 * workqueue.
 *
 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
 * pending, otherwise it goes into a busy-wait loop until the timer expires.
 *
 * The caller must ensure that workqueue_struct on which this work was last
 * queued can't be destroyed before this function returns.
 */
int cancel_work_sync(struct work_struct *work)
{
	return __cancel_work_timer(work, NULL);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);

/**
 * cancel_delayed_work_sync - reliably kill off a delayed work.
 * @dwork: the delayed work struct
 *
 * Returns true if @dwork was pending.
 *
 * It is possible to use this function if @dwork rearms itself via queue_work()
 * or queue_delayed_work(). See also the comment for cancel_work_sync().
 */
int cancel_delayed_work_sync(struct delayed_work *dwork)
{
	return __cancel_work_timer(&dwork->work, &dwork->timer);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
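/*
 * Example usage (illustrative sketch only; "my_dwork" and "my_work" are
 * hypothetical caller-side names): shutting down a self-rearming delayed
 * work during module unload.
 *
 *	// kills the pending timer, waits for a running callback and
 *	// prevents it from rearming itself:
 *	cancel_delayed_work_sync(&my_dwork);
 *
 *	// plain (non-delayed) work items use cancel_work_sync() instead:
 *	cancel_work_sync(&my_work);
 */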

static struct workqueue_struct *keventd_wq __read_mostly;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * Returns zero if @work was already on the kernel-global workqueue and
 * non-zero otherwise.
 *
 * This puts a job in the kernel-global workqueue if it was not already
 * queued and leaves it in the same position on the kernel-global
 * workqueue otherwise.
 */
int schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
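/*
 * Example usage (illustrative sketch only; "my_irq_handler" and "my_work"
 * are hypothetical caller-side names): deferring work from an interrupt
 * handler to keventd, since the handler itself must not sleep.
 *
 *	static irqreturn_t my_irq_handler(int irq, void *dev_id)
 *	{
 *		// push the slow, sleeping part to process context
 *		schedule_work(&my_work);
 *		return IRQ_HANDLED;
 *	}
 */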

/*
 * schedule_work_on - put work task on a specific cpu
 * @cpu: cpu to put the work task on
 * @work: job to be done
 *
 * This puts a job on a specific cpu
 */
int schedule_work_on(int cpu, struct work_struct *work)
{
	return queue_work_on(cpu, keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work_on);

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int schedule_delayed_work(struct delayed_work *dwork,
					unsigned long delay)
{
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * flush_delayed_work - block until a dwork_struct's callback has terminated
 * @dwork: the delayed work which is to be flushed
 *
 * Any timeout is cancelled, and any pending work is run immediately.
 */
void flush_delayed_work(struct delayed_work *dwork)
{
	if (del_timer_sync(&dwork->timer)) {
		__queue_work(get_cpu(), get_wq_data(&dwork->work)->wq,
			     &dwork->work);
		put_cpu();
	}
	flush_work(&dwork->work);
}
EXPORT_SYMBOL(flush_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
			struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	int orig = -1;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	get_online_cpus();

	/*
	 * When running in keventd don't schedule a work item on
	 * itself.  Can just call directly because the work queue is
	 * already bound.  This also is faster.
	 */
	if (current_is_keventd())
		orig = raw_smp_processor_id();

	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		if (cpu != orig)
			schedule_work_on(cpu, work);
	}
	if (orig >= 0)
		func(per_cpu_ptr(works, orig));

	for_each_online_cpu(cpu)
		flush_work(per_cpu_ptr(works, cpu));

	put_online_cpus();
	free_percpu(works);
	return 0;
}
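/*
 * Example usage (illustrative sketch only; "my_percpu_sync" is a
 * hypothetical callback): run a function once on every online CPU and
 * wait for all of them to finish.
 *
 *	static void my_percpu_sync(struct work_struct *unused)
 *	{
 *		// runs on each online CPU in keventd context
 *	}
 *
 *	int err = schedule_on_each_cpu(my_percpu_sync);
 */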

/**
 * flush_scheduled_work - ensure that any scheduled work has run to completion.
 *
 * Forces execution of the kernel-global workqueue and blocks until its
 * completion.
 *
 * Think twice before calling this function!  It's very easy to get into
 * trouble if you don't take great care.  Either of the following situations
 * will lead to deadlock:
 *
 *	One of the work items currently on the workqueue needs to acquire
 *	a lock held by your code or its caller.
 *
 *	Your code is running in the context of a work routine.
 *
 * They will be detected by lockdep when they occur, but the first might not
 * occur very often.  It depends on what work items are on the workqueue and
 * what locks they need, which you have no control over.
 *
 * In most situations flushing the entire workqueue is overkill; you merely
 * need to know that a particular work item isn't queued and isn't running.
 * In such cases you should use cancel_delayed_work_sync() or
 * cancel_work_sync() instead.
 */
void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:		the function to execute
 * @ew:		guaranteed storage for the execute work structure (must
 *		be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
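/*
 * Example usage (illustrative sketch only; "struct my_dev" and
 * "my_release" are hypothetical caller-side names): freeing an object
 * from a path that may run in interrupt context.
 *
 *	struct my_dev {
 *		struct execute_work ew;
 *	};
 *
 *	static void my_release(struct work_struct *work)
 *	{
 *		struct my_dev *dev = container_of(work, struct my_dev, ew.work);
 *		kfree(dev);
 *	}
 *
 *	execute_in_process_context(my_release, &dev->ew);
 */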

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = get_cwq(cpu, keventd_wq);
	if (current == cwq->thread)
		ret = 1;

	return ret;

}

static struct cpu_workqueue_struct *alloc_cwqs(void)
{
	/*
	 * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
	 * Make sure that the alignment isn't lower than that of
	 * unsigned long long.
	 */
	const size_t size = sizeof(struct cpu_workqueue_struct);
	const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
				   __alignof__(unsigned long long));
	struct cpu_workqueue_struct *cwqs;
#ifndef CONFIG_SMP
	void *ptr;

	/*
	 * On UP, percpu allocator doesn't honor alignment parameter
	 * and simply uses arch-dependent default.  Allocate enough
	 * room to align cwq and put an extra pointer at the end
	 * pointing back to the originally allocated pointer which
	 * will be used for free.
	 *
	 * FIXME: This really belongs to UP percpu code.  Update UP
	 * percpu code to honor alignment and remove this ugliness.
	 */
	ptr = __alloc_percpu(size + align + sizeof(void *), 1);
	cwqs = PTR_ALIGN(ptr, align);
	*(void **)per_cpu_ptr(cwqs + 1, 0) = ptr;
#else
	/* On SMP, percpu allocator can do it itself */
	cwqs = __alloc_percpu(size, align);
#endif
	/* just in case, make sure it's actually aligned */
	BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align));
	return cwqs;
}

static void free_cwqs(struct cpu_workqueue_struct *cwqs)
{
#ifndef CONFIG_SMP
	/* on UP, the pointer to free is stored right after the cwq */
	if (cwqs)
		free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0));
#else
	free_percpu(cwqs);
#endif
}

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct workqueue_struct *wq = cwq->wq;
	struct task_struct *p;

	p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
	/*
	 * Nobody can add the work_struct to this cwq,
	 *	if (caller is __create_workqueue)
	 *		nobody should see this wq
	 *	else // caller is CPU_UP_PREPARE
	 *		cpu is not on cpu_online_map
	 * so we can abort safely.
	 */
	if (IS_ERR(p))
		return PTR_ERR(p);
	cwq->thread = p;

	return 0;
}

static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct task_struct *p = cwq->thread;

	if (p != NULL) {
		if (cpu >= 0)
			kthread_bind(p, cpu);
		wake_up_process(p);
	}
}

struct workqueue_struct *__create_workqueue_key(const char *name,
						unsigned int flags,
						struct lock_class_key *key,
						const char *lock_name)
{
	bool singlethread = flags & WQ_SINGLE_THREAD;
	struct workqueue_struct *wq;
	int err = 0, cpu;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		goto err;

	wq->cpu_wq = alloc_cwqs();
	if (!wq->cpu_wq)
		goto err;

	wq->flags = flags;
	wq->name = name;
	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
	INIT_LIST_HEAD(&wq->list);

	cpu_maps_update_begin();
	/*
	 * We must initialize cwqs for each possible cpu even if we
	 * are going to call destroy_workqueue() finally. Otherwise
	 * cpu_up() can hit the uninitialized cwq once we drop the
	 * lock.
	 */
	for_each_possible_cpu(cpu) {
		struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

		BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
		cwq->wq = wq;
		cwq->cpu = cpu;
		spin_lock_init(&cwq->lock);
		INIT_LIST_HEAD(&cwq->worklist);
		init_waitqueue_head(&cwq->more_work);

		if (err)
			continue;
		err = create_workqueue_thread(cwq, cpu);
		if (cpu_online(cpu) && !singlethread)
			start_workqueue_thread(cwq, cpu);
		else
			start_workqueue_thread(cwq, -1);
	}

	spin_lock(&workqueue_lock);
	list_add(&wq->list, &workqueues);
	spin_unlock(&workqueue_lock);

	cpu_maps_update_done();

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
err:
	if (wq) {
		free_cwqs(wq->cpu_wq);
		kfree(wq);
	}
	return NULL;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);
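/*
 * Example usage (illustrative sketch only; "my_wq" and "my_work" are
 * hypothetical caller-side names): lifecycle of a driver-private
 * workqueue built on __create_workqueue_key() via the create_workqueue()
 * wrapper macros.
 *
 *	my_wq = create_workqueue("my_wq");	// one thread per CPU
 *	if (!my_wq)
 *		return -ENOMEM;
 *	queue_work(my_wq, &my_work);
 *	...
 *	flush_workqueue(my_wq);			// optional: drain it
 *	destroy_workqueue(my_wq);		// flushes, stops threads, frees
 */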

static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
	/*
	 * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
	 * cpu_add_remove_lock protects cwq->thread.
	 */
	if (cwq->thread == NULL)
		return;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	flush_cpu_workqueue(cwq);
	/*
	 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
	 * a concurrent flush_workqueue() can insert a barrier after us.
	 * However, in that case run_workqueue() won't return and check
	 * kthread_should_stop() until it flushes all work_struct's.
	 * When ->worklist becomes empty it is safe to exit because no
	 * more work_structs can be queued on this cwq: flush_workqueue
	 * checks list_empty(), and a "normal" queue_work() can't use
	 * a dead CPU.
	 */
	kthread_stop(cwq->thread);
	cwq->thread = NULL;
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	int cpu;

	cpu_maps_update_begin();
	spin_lock(&workqueue_lock);
	list_del(&wq->list);
	spin_unlock(&workqueue_lock);
	cpu_maps_update_done();

	for_each_possible_cpu(cpu)
		cleanup_workqueue_thread(get_cwq(cpu, wq));

	free_cwqs(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
						unsigned long action,
						void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;

	action &= ~CPU_TASKS_FROZEN;

	list_for_each_entry(wq, &workqueues, list) {
		if (wq->flags & WQ_SINGLE_THREAD)
			continue;

		cwq = get_cwq(cpu, wq);

		switch (action) {
		case CPU_POST_DEAD:
			lock_map_acquire(&cwq->wq->lockdep_map);
			lock_map_release(&cwq->wq->lockdep_map);
			flush_cpu_workqueue(cwq);
			break;
		}
	}

	return notifier_from_errno(0);
}

#ifdef CONFIG_SMP

struct work_for_cpu {
	struct completion completion;
	long (*fn)(void *);
	void *arg;
	long ret;
};

static int do_work_for_cpu(void *_wfc)
{
	struct work_for_cpu *wfc = _wfc;
	wfc->ret = wfc->fn(wfc->arg);
	complete(&wfc->completion);
	return 0;
}

/**
 * work_on_cpu - run a function in user context on a particular cpu
 * @cpu: the cpu to run on
 * @fn: the function to run
 * @arg: the function arg
 *
 * This will return the value @fn returns.
 * It is up to the caller to ensure that the cpu doesn't go offline.
 * The caller must not hold any locks which would prevent @fn from completing.
 */
long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
{
	struct task_struct *sub_thread;
	struct work_for_cpu wfc = {
		.completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
		.fn = fn,
		.arg = arg,
	};

	sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
	if (IS_ERR(sub_thread))
		return PTR_ERR(sub_thread);
	kthread_bind(sub_thread, cpu);
	wake_up_process(sub_thread);
	wait_for_completion(&wfc.completion);
	return wfc.ret;
}
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */
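/*
 * Example usage of work_on_cpu() (illustrative sketch only, CONFIG_SMP;
 * "read_on_cpu2" is a hypothetical callback): run a function on a
 * specific CPU and collect its return value.
 *
 *	static long read_on_cpu2(void *arg)
 *	{
 *		// executes in a kthread bound to the requested CPU
 *		return 0;
 *	}
 *
 *	long ret = work_on_cpu(2, read_on_cpu2, NULL);
 */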

void __init init_workqueues(void)
{
	singlethread_cpu = cpumask_first(cpu_possible_mask);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}