Commit e0dd880a authored by Linus Torvalds

Merge branch 'for-4.2' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq

Pull workqueue updates from Tejun Heo:
 "Most of the changes are around implementing and fixing fallouts from
  sysfs and internal interface to limit the CPUs available to all
  unbound workqueues to help isolating CPUs.  It needs more work as
  ordered workqueues can roam unrestricted but still is a significant
  improvement"

* 'for-4.2' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq:
  workqueue: fix typos in comments
  workqueue: move flush_scheduled_work() to workqueue.h
  workqueue: remove the lock from wq_sysfs_prep_attrs()
  workqueue: remove the declaration of copy_workqueue_attrs()
  workqueue: ensure attrs changes are properly synchronized
  workqueue: separate out and refactor the locking of applying attrs
  workqueue: simplify wq_update_unbound_numa()
  workqueue: wq_pool_mutex protects the attrs-installation
  workqueue: fix a typo
  workqueue: function name in the comment differs from the real function name
  workqueue: fix trivial typo in Documentation/workqueue.txt
  workqueue: Allow modifying low level unbound workqueue cpumask
  workqueue: Create low-level unbound workqueues cpumask
  workqueue: split apply_workqueue_attrs() into 3 stages
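
For orientation, a minimal sketch of how the in-kernel unbound-attrs API that this series reworks is typically used. It is not part of this merge: the workqueue name "example_unbound", the function name and the nice value are invented for illustration, and in this tree alloc_workqueue_attrs() still takes a gfp_t argument.

/* Illustrative sketch only -- not from this commit. */
#include <linux/workqueue.h>

static struct workqueue_struct *example_wq;     /* hypothetical */

static int example_setup_wq(void)
{
        struct workqueue_attrs *attrs;
        int ret;

        example_wq = alloc_workqueue("example_unbound", WQ_UNBOUND, 0);
        if (!example_wq)
                return -ENOMEM;

        attrs = alloc_workqueue_attrs(GFP_KERNEL);
        if (!attrs) {
                destroy_workqueue(example_wq);
                return -ENOMEM;
        }

        attrs->nice = -5;                       /* arbitrary example value */
        cpumask_copy(attrs->cpumask, cpu_online_mask);

        /*
         * After this series the effective cpumask is additionally clamped
         * by the global wq_unbound_cpumask before pwqs are created.
         */
        ret = apply_workqueue_attrs(example_wq, attrs);

        free_workqueue_attrs(attrs);
        return ret;
}
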
--- a/Documentation/workqueue.txt
+++ b/Documentation/workqueue.txt
@@ -365,7 +365,7 @@ root 5674 0.0 0.0 0 0 ? S 12:13 0:00 [kworker/1:0]
 If kworkers are going crazy (using too much cpu), there are two types
 of possible problems:
 
-	1. Something beeing scheduled in rapid succession
+	1. Something being scheduled in rapid succession
 	2. A single work item that consumes lots of cpu cycles
 
 The first one can be tracked using tracing:
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -424,6 +424,7 @@ struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask);
 void free_workqueue_attrs(struct workqueue_attrs *attrs);
 int apply_workqueue_attrs(struct workqueue_struct *wq,
                           const struct workqueue_attrs *attrs);
+int workqueue_set_unbound_cpumask(cpumask_var_t cpumask);
 
 extern bool queue_work_on(int cpu, struct workqueue_struct *wq,
                         struct work_struct *work);
@@ -434,7 +435,6 @@ extern bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
 
 extern void flush_workqueue(struct workqueue_struct *wq);
 extern void drain_workqueue(struct workqueue_struct *wq);
-extern void flush_scheduled_work(void);
 
 extern int schedule_on_each_cpu(work_func_t func);
@@ -530,6 +530,35 @@ static inline bool schedule_work(struct work_struct *work)
         return queue_work(system_wq, work);
 }
 
+/**
+ * flush_scheduled_work - ensure that any scheduled work has run to completion.
+ *
+ * Forces execution of the kernel-global workqueue and blocks until its
+ * completion.
+ *
+ * Think twice before calling this function! It's very easy to get into
+ * trouble if you don't take great care. Either of the following situations
+ * will lead to deadlock:
+ *
+ *	One of the work items currently on the workqueue needs to acquire
+ *	a lock held by your code or its caller.
+ *
+ *	Your code is running in the context of a work routine.
+ *
+ * They will be detected by lockdep when they occur, but the first might not
+ * occur very often. It depends on what work items are on the workqueue and
+ * what locks they need, which you have no control over.
+ *
+ * In most situations flushing the entire workqueue is overkill; you merely
+ * need to know that a particular work item isn't queued and isn't running.
+ * In such cases you should use cancel_delayed_work_sync() or
+ * cancel_work_sync() instead.
+ */
+static inline void flush_scheduled_work(void)
+{
+        flush_workqueue(system_wq);
+}
+
 /**
  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
  * @cpu: cpu to use
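
The header comment above recommends cancel_work_sync()/cancel_delayed_work_sync() over flushing the whole system workqueue. A minimal sketch of what that looks like for a single work item; it is not part of this merge and the names (example_fn, example_work) are invented:

/* Illustrative sketch only -- prefer cancelling the specific item. */
#include <linux/workqueue.h>

static void example_fn(struct work_struct *work)
{
        /* ... do some deferred work ... */
}

static DECLARE_WORK(example_work, example_fn);

static void example_start(void)
{
        schedule_work(&example_work);           /* queue on system_wq */
}

static void example_teardown(void)
{
        /*
         * Waits for example_work only, without flushing every item on
         * system_wq, so it cannot deadlock on unrelated work items.
         */
        cancel_work_sync(&example_work);
}
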
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -127,6 +127,11 @@ enum {
  *
  * PR: wq_pool_mutex protected for writes. Sched-RCU protected for reads.
  *
+ * PW: wq_pool_mutex and wq->mutex protected for writes. Either for reads.
+ *
+ * PWR: wq_pool_mutex and wq->mutex protected for writes. Either or
+ *      sched-RCU for reads.
+ *
  * WQ: wq->mutex protected.
  *
  * WR: wq->mutex protected for writes. Sched-RCU protected for reads.
@@ -247,8 +252,8 @@ struct workqueue_struct {
         int                     nr_drainers;    /* WQ: drain in progress */
         int                     saved_max_active; /* WQ: saved pwq max_active */
 
-        struct workqueue_attrs  *unbound_attrs; /* WQ: only for unbound wqs */
-        struct pool_workqueue   *dfl_pwq;       /* WQ: only for unbound wqs */
+        struct workqueue_attrs  *unbound_attrs; /* PW: only for unbound wqs */
+        struct pool_workqueue   *dfl_pwq;       /* PW: only for unbound wqs */
 
 #ifdef CONFIG_SYSFS
         struct wq_device        *wq_dev;        /* I: for sysfs interface */
@@ -268,7 +273,7 @@ struct workqueue_struct {
         /* hot fields used during command issue, aligned to cacheline */
         unsigned int            flags ____cacheline_aligned; /* WQ: WQ_* flags */
         struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
-        struct pool_workqueue __rcu *numa_pwq_tbl[]; /* FR: unbound pwqs indexed by node */
+        struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */
 };
 
 static struct kmem_cache *pwq_cache;
@@ -299,6 +304,8 @@ static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
 static LIST_HEAD(workqueues);           /* PR: list of all workqueues */
 static bool workqueue_freezing;         /* PL: have wqs started freezing? */
 
+static cpumask_var_t wq_unbound_cpumask; /* PL: low level cpumask for all unbound wqs */
+
 /* the per-cpu worker pools */
 static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
                                      cpu_worker_pools);
@@ -330,8 +337,6 @@ struct workqueue_struct *system_freezable_power_efficient_wq __read_mostly;
 EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
 
 static int worker_thread(void *__worker);
-static void copy_workqueue_attrs(struct workqueue_attrs *to,
-                                 const struct workqueue_attrs *from);
 static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
 
 #define CREATE_TRACE_POINTS
@@ -347,6 +352,12 @@ static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
                            lockdep_is_held(&wq->mutex),                 \
                            "sched RCU or wq->mutex should be held")
 
+#define assert_rcu_or_wq_mutex_or_pool_mutex(wq)                         \
+        rcu_lockdep_assert(rcu_read_lock_sched_held() ||                 \
+                           lockdep_is_held(&wq->mutex) ||                \
+                           lockdep_is_held(&wq_pool_mutex),              \
+                           "sched RCU, wq->mutex or wq_pool_mutex should be held")
+
 #define for_each_cpu_worker_pool(pool, cpu)                              \
         for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];                \
              (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
@@ -551,7 +562,8 @@ static int worker_pool_assign_id(struct worker_pool *pool)
  * @wq: the target workqueue
  * @node: the node ID
  *
- * This must be called either with pwq_lock held or sched RCU read locked.
+ * This must be called with any of wq_pool_mutex, wq->mutex or sched RCU
+ * read locked.
  * If the pwq needs to be used beyond the locking in effect, the caller is
  * responsible for guaranteeing that the pwq stays online.
  *
@@ -560,7 +572,7 @@ static int worker_pool_assign_id(struct worker_pool *pool)
 static struct pool_workqueue *unbound_pwq_by_node(struct workqueue_struct *wq,
                                                   int node)
 {
-        assert_rcu_or_wq_mutex(wq);
+        assert_rcu_or_wq_mutex_or_pool_mutex(wq);
         return rcu_dereference_raw(wq->numa_pwq_tbl[node]);
 }
 
@@ -976,7 +988,7 @@ static struct worker *find_worker_executing_work(struct worker_pool *pool,
  * move_linked_works - move linked works to a list
  * @work: start of series of works to be scheduled
  * @head: target list to append @work to
- * @nextp: out paramter for nested worklist walking
+ * @nextp: out parameter for nested worklist walking
  *
  * Schedule linked works starting from @work to @head. Work series to
  * be scheduled starts at @work and includes any consecutive work with
@@ -2616,7 +2628,7 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
  * Wait until the workqueue becomes empty. While draining is in progress,
  * only chain queueing is allowed. IOW, only currently pending or running
  * work items on @wq can queue further work items on it. @wq is flushed
- * repeatedly until it becomes empty. The number of flushing is detemined
+ * repeatedly until it becomes empty. The number of flushing is determined
  * by the depth of chaining and should be relatively short. Whine if it
  * takes too long.
  */
@@ -2946,36 +2958,6 @@ int schedule_on_each_cpu(work_func_t func)
         return 0;
 }
 
-/**
- * flush_scheduled_work - ensure that any scheduled work has run to completion.
- *
- * Forces execution of the kernel-global workqueue and blocks until its
- * completion.
- *
- * Think twice before calling this function! It's very easy to get into
- * trouble if you don't take great care. Either of the following situations
- * will lead to deadlock:
- *
- *	One of the work items currently on the workqueue needs to acquire
- *	a lock held by your code or its caller.
- *
- *	Your code is running in the context of a work routine.
- *
- * They will be detected by lockdep when they occur, but the first might not
- * occur very often. It depends on what work items are on the workqueue and
- * what locks they need, which you have no control over.
- *
- * In most situations flushing the entire workqueue is overkill; you merely
- * need to know that a particular work item isn't queued and isn't running.
- * In such cases you should use cancel_delayed_work_sync() or
- * cancel_work_sync() instead.
- */
-void flush_scheduled_work(void)
-{
-        flush_workqueue(system_wq);
-}
-EXPORT_SYMBOL(flush_scheduled_work);
-
 /**
  * execute_in_process_context - reliably execute the routine with user context
  * @fn: the function to execute
@@ -3081,7 +3063,7 @@ static bool wqattrs_equal(const struct workqueue_attrs *a,
  * init_worker_pool - initialize a newly zalloc'd worker_pool
  * @pool: worker_pool to initialize
  *
- * Initiailize a newly zalloc'd @pool. It also allocates @pool->attrs.
+ * Initialize a newly zalloc'd @pool. It also allocates @pool->attrs.
  *
  * Return: 0 on success, -errno on failure. Even on failure, all fields
  * inside @pool proper are initialized and put_unbound_pool() can be called
@@ -3425,20 +3407,9 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
         return pwq;
 }
 
-/* undo alloc_unbound_pwq(), used only in the error path */
-static void free_unbound_pwq(struct pool_workqueue *pwq)
-{
-        lockdep_assert_held(&wq_pool_mutex);
-
-        if (pwq) {
-                put_unbound_pool(pwq->pool);
-                kmem_cache_free(pwq_cache, pwq);
-        }
-}
-
 /**
- * wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node
- * @attrs: the wq_attrs of interest
+ * wq_calc_node_cpumask - calculate a wq_attrs' cpumask for the specified node
+ * @attrs: the wq_attrs of the default pwq of the target workqueue
  * @node: the target NUMA node
  * @cpu_going_down: if >= 0, the CPU to consider as offline
  * @cpumask: outarg, the resulting cpumask
@@ -3488,6 +3459,7 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
 {
         struct pool_workqueue *old_pwq;
 
+        lockdep_assert_held(&wq_pool_mutex);
         lockdep_assert_held(&wq->mutex);
 
         /* link_pwq() can handle duplicate calls */
@@ -3498,46 +3470,59 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
         return old_pwq;
 }
 
-/**
- * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
- * @wq: the target workqueue
- * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
- *
- * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
- * machines, this function maps a separate pwq to each NUMA node with
- * possibles CPUs in @attrs->cpumask so that work items are affine to the
- * NUMA node it was issued on. Older pwqs are released as in-flight work
- * items finish. Note that a work item which repeatedly requeues itself
- * back-to-back will stay on its current pwq.
- *
- * Performs GFP_KERNEL allocations.
- *
- * Return: 0 on success and -errno on failure.
- */
-int apply_workqueue_attrs(struct workqueue_struct *wq,
-                          const struct workqueue_attrs *attrs)
+/* context to store the prepared attrs & pwqs before applying */
+struct apply_wqattrs_ctx {
+        struct workqueue_struct *wq;            /* target workqueue */
+        struct workqueue_attrs  *attrs;         /* attrs to apply */
+        struct list_head        list;           /* queued for batching commit */
+        struct pool_workqueue   *dfl_pwq;
+        struct pool_workqueue   *pwq_tbl[];
+};
+
+/* free the resources after success or abort */
+static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
+{
+        if (ctx) {
+                int node;
+
+                for_each_node(node)
+                        put_pwq_unlocked(ctx->pwq_tbl[node]);
+                put_pwq_unlocked(ctx->dfl_pwq);
+
+                free_workqueue_attrs(ctx->attrs);
+
+                kfree(ctx);
+        }
+}
+
+/* allocate the attrs and pwqs for later installation */
+static struct apply_wqattrs_ctx *
+apply_wqattrs_prepare(struct workqueue_struct *wq,
+                      const struct workqueue_attrs *attrs)
 {
+        struct apply_wqattrs_ctx *ctx;
         struct workqueue_attrs *new_attrs, *tmp_attrs;
-        struct pool_workqueue **pwq_tbl, *dfl_pwq;
-        int node, ret;
+        int node;
 
-        /* only unbound workqueues can change attributes */
-        if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
-                return -EINVAL;
+        lockdep_assert_held(&wq_pool_mutex);
 
-        /* creating multiple pwqs breaks ordering guarantee */
-        if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
-                return -EINVAL;
+        ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
+                      GFP_KERNEL);
 
-        pwq_tbl = kzalloc(nr_node_ids * sizeof(pwq_tbl[0]), GFP_KERNEL);
         new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
         tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
-        if (!pwq_tbl || !new_attrs || !tmp_attrs)
-                goto enomem;
+        if (!ctx || !new_attrs || !tmp_attrs)
+                goto out_free;
 
-        /* make a copy of @attrs and sanitize it */
+        /*
+         * Calculate the attrs of the default pwq.
+         * If the user configured cpumask doesn't overlap with the
+         * wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
+         */
         copy_workqueue_attrs(new_attrs, attrs);
-        cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
+        cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
+        if (unlikely(cpumask_empty(new_attrs->cpumask)))
+                cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask);
 
         /*
          * We may create multiple pwqs with differing cpumasks. Make a
@@ -3546,76 +3531,130 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
          */
         copy_workqueue_attrs(tmp_attrs, new_attrs);
 
-        /*
-         * CPUs should stay stable across pwq creations and installations.
-         * Pin CPUs, determine the target cpumask for each node and create
-         * pwqs accordingly.
-         */
-        get_online_cpus();
-
-        mutex_lock(&wq_pool_mutex);
-
         /*
          * If something goes wrong during CPU up/down, we'll fall back to
          * the default pwq covering whole @attrs->cpumask. Always create
          * it even if we don't use it immediately.
          */
-        dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
-        if (!dfl_pwq)
-                goto enomem_pwq;
+        ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+        if (!ctx->dfl_pwq)
+                goto out_free;
 
         for_each_node(node) {
-                if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
-                        pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
-                        if (!pwq_tbl[node])
-                                goto enomem_pwq;
+                if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
+                        ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
+                        if (!ctx->pwq_tbl[node])
+                                goto out_free;
                 } else {
-                        dfl_pwq->refcnt++;
-                        pwq_tbl[node] = dfl_pwq;
+                        ctx->dfl_pwq->refcnt++;
+                        ctx->pwq_tbl[node] = ctx->dfl_pwq;
                 }
         }
 
-        mutex_unlock(&wq_pool_mutex);
+        /* save the user configured attrs and sanitize it. */
+        copy_workqueue_attrs(new_attrs, attrs);
+        cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
+        ctx->attrs = new_attrs;
+
+        ctx->wq = wq;
+        free_workqueue_attrs(tmp_attrs);
+        return ctx;
+
+out_free:
+        free_workqueue_attrs(tmp_attrs);
+        free_workqueue_attrs(new_attrs);
+        apply_wqattrs_cleanup(ctx);
+        return NULL;
+}
+
+/* set attrs and install prepared pwqs, @ctx points to old pwqs on return */
+static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
+{
+        int node;
 
         /* all pwqs have been created successfully, let's install'em */
-        mutex_lock(&wq->mutex);
+        mutex_lock(&ctx->wq->mutex);
 
-        copy_workqueue_attrs(wq->unbound_attrs, new_attrs);
+        copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
 
         /* save the previous pwq and install the new one */
         for_each_node(node)
-                pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
+                ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
+                                                          ctx->pwq_tbl[node]);
 
         /* @dfl_pwq might not have been used, ensure it's linked */
-        link_pwq(dfl_pwq);
-        swap(wq->dfl_pwq, dfl_pwq);
+        link_pwq(ctx->dfl_pwq);
+        swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
 
-        mutex_unlock(&wq->mutex);
+        mutex_unlock(&ctx->wq->mutex);
+}
 
-        /* put the old pwqs */
-        for_each_node(node)
-                put_pwq_unlocked(pwq_tbl[node]);
-        put_pwq_unlocked(dfl_pwq);
+static void apply_wqattrs_lock(void)
+{
+        /* CPUs should stay stable across pwq creations and installations */
+        get_online_cpus();
+        mutex_lock(&wq_pool_mutex);
+}
 
+static void apply_wqattrs_unlock(void)
+{
+        mutex_unlock(&wq_pool_mutex);
         put_online_cpus();
-        ret = 0;
-        /* fall through */
-out_free:
-        free_workqueue_attrs(tmp_attrs);
-        free_workqueue_attrs(new_attrs);
-        kfree(pwq_tbl);
+}
+
+static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
+                                        const struct workqueue_attrs *attrs)
+{
+        struct apply_wqattrs_ctx *ctx;
+        int ret = -ENOMEM;
+
+        /* only unbound workqueues can change attributes */
+        if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
+                return -EINVAL;
+
+        /* creating multiple pwqs breaks ordering guarantee */
+        if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
+                return -EINVAL;
+
+        ctx = apply_wqattrs_prepare(wq, attrs);
+
+        /* the ctx has been prepared successfully, let's commit it */
+        if (ctx) {
+                apply_wqattrs_commit(ctx);
+                ret = 0;
+        }
+
+        apply_wqattrs_cleanup(ctx);
+
         return ret;
 
-enomem_pwq:
-        free_unbound_pwq(dfl_pwq);
-        for_each_node(node)
-                if (pwq_tbl && pwq_tbl[node] != dfl_pwq)
-                        free_unbound_pwq(pwq_tbl[node]);
-        mutex_unlock(&wq_pool_mutex);
-        put_online_cpus();
-enomem:
-        ret = -ENOMEM;
-        goto out_free;
+}
+
+/**
+ * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
+ * @wq: the target workqueue
+ * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
+ *
+ * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
+ * machines, this function maps a separate pwq to each NUMA node with
+ * possibles CPUs in @attrs->cpumask so that work items are affine to the
+ * NUMA node it was issued on. Older pwqs are released as in-flight work
+ * items finish. Note that a work item which repeatedly requeues itself
+ * back-to-back will stay on its current pwq.
+ *
+ * Performs GFP_KERNEL allocations.
+ *
+ * Return: 0 on success and -errno on failure.
+ */
+int apply_workqueue_attrs(struct workqueue_struct *wq,
+                          const struct workqueue_attrs *attrs)
+{
+        int ret;
+
+        apply_wqattrs_lock();
+        ret = apply_workqueue_attrs_locked(wq, attrs);
+        apply_wqattrs_unlock();
+
+        return ret;
 }
/** /**
...@@ -3651,7 +3690,8 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu, ...@@ -3651,7 +3690,8 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
lockdep_assert_held(&wq_pool_mutex); lockdep_assert_held(&wq_pool_mutex);
if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND)) if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND) ||
wq->unbound_attrs->no_numa)
return; return;
/* /*
...@@ -3662,48 +3702,37 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu, ...@@ -3662,48 +3702,37 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
target_attrs = wq_update_unbound_numa_attrs_buf; target_attrs = wq_update_unbound_numa_attrs_buf;
cpumask = target_attrs->cpumask; cpumask = target_attrs->cpumask;
mutex_lock(&wq->mutex);
if (wq->unbound_attrs->no_numa)
goto out_unlock;
copy_workqueue_attrs(target_attrs, wq->unbound_attrs); copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
pwq = unbound_pwq_by_node(wq, node); pwq = unbound_pwq_by_node(wq, node);
/* /*
* Let's determine what needs to be done. If the target cpumask is * Let's determine what needs to be done. If the target cpumask is
* different from wq's, we need to compare it to @pwq's and create * different from the default pwq's, we need to compare it to @pwq's
* a new one if they don't match. If the target cpumask equals * and create a new one if they don't match. If the target cpumask
* wq's, the default pwq should be used. * equals the default pwq's, the default pwq should be used.
*/ */
if (wq_calc_node_cpumask(wq->unbound_attrs, node, cpu_off, cpumask)) { if (wq_calc_node_cpumask(wq->dfl_pwq->pool->attrs, node, cpu_off, cpumask)) {
if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask)) if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
goto out_unlock; return;
} else { } else {
goto use_dfl_pwq; goto use_dfl_pwq;
} }
mutex_unlock(&wq->mutex);
/* create a new pwq */ /* create a new pwq */
pwq = alloc_unbound_pwq(wq, target_attrs); pwq = alloc_unbound_pwq(wq, target_attrs);
if (!pwq) { if (!pwq) {
pr_warn("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n", pr_warn("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n",
wq->name); wq->name);
mutex_lock(&wq->mutex);
goto use_dfl_pwq; goto use_dfl_pwq;
} }
/* /* Install the new pwq. */
* Install the new pwq. As this function is called only from CPU
* hotplug callbacks and applying a new attrs is wrapped with
* get/put_online_cpus(), @wq->unbound_attrs couldn't have changed
* inbetween.
*/
mutex_lock(&wq->mutex); mutex_lock(&wq->mutex);
old_pwq = numa_pwq_tbl_install(wq, node, pwq); old_pwq = numa_pwq_tbl_install(wq, node, pwq);
goto out_unlock; goto out_unlock;
use_dfl_pwq: use_dfl_pwq:
mutex_lock(&wq->mutex);
spin_lock_irq(&wq->dfl_pwq->pool->lock); spin_lock_irq(&wq->dfl_pwq->pool->lock);
get_pwq(wq->dfl_pwq); get_pwq(wq->dfl_pwq);
spin_unlock_irq(&wq->dfl_pwq->pool->lock); spin_unlock_irq(&wq->dfl_pwq->pool->lock);
...@@ -4385,7 +4414,7 @@ static void rebind_workers(struct worker_pool *pool) ...@@ -4385,7 +4414,7 @@ static void rebind_workers(struct worker_pool *pool)
/* /*
* Restore CPU affinity of all workers. As all idle workers should * Restore CPU affinity of all workers. As all idle workers should
* be on the run-queue of the associated CPU before any local * be on the run-queue of the associated CPU before any local
* wake-ups for concurrency management happen, restore CPU affinty * wake-ups for concurrency management happen, restore CPU affinity
* of all workers first and then clear UNBOUND. As we're called * of all workers first and then clear UNBOUND. As we're called
* from CPU_ONLINE, the following shouldn't fail. * from CPU_ONLINE, the following shouldn't fail.
*/ */
@@ -4698,6 +4727,82 @@ void thaw_workqueues(void)
 }
 #endif /* CONFIG_FREEZER */
 
+static int workqueue_apply_unbound_cpumask(void)
+{
+        LIST_HEAD(ctxs);
+        int ret = 0;
+        struct workqueue_struct *wq;
+        struct apply_wqattrs_ctx *ctx, *n;
+
+        lockdep_assert_held(&wq_pool_mutex);
+
+        list_for_each_entry(wq, &workqueues, list) {
+                if (!(wq->flags & WQ_UNBOUND))
+                        continue;
+                /* creating multiple pwqs breaks ordering guarantee */
+                if (wq->flags & __WQ_ORDERED)
+                        continue;
+
+                ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs);
+                if (!ctx) {
+                        ret = -ENOMEM;
+                        break;
+                }
+
+                list_add_tail(&ctx->list, &ctxs);
+        }
+
+        list_for_each_entry_safe(ctx, n, &ctxs, list) {
+                if (!ret)
+                        apply_wqattrs_commit(ctx);
+                apply_wqattrs_cleanup(ctx);
+        }
+
+        return ret;
+}
+
+/**
+ * workqueue_set_unbound_cpumask - Set the low-level unbound cpumask
+ * @cpumask: the cpumask to set
+ *
+ * The low-level workqueues cpumask is a global cpumask that limits
+ * the affinity of all unbound workqueues. This function check the @cpumask
+ * and apply it to all unbound workqueues and updates all pwqs of them.
+ *
+ * Retun:	0	- Success
+ *		-EINVAL	- Invalid @cpumask
+ *		-ENOMEM	- Failed to allocate memory for attrs or pwqs.
+ */
+int workqueue_set_unbound_cpumask(cpumask_var_t cpumask)
+{
+        int ret = -EINVAL;
+        cpumask_var_t saved_cpumask;
+
+        if (!zalloc_cpumask_var(&saved_cpumask, GFP_KERNEL))
+                return -ENOMEM;
+
+        cpumask_and(cpumask, cpumask, cpu_possible_mask);
+        if (!cpumask_empty(cpumask)) {
+                apply_wqattrs_lock();
+
+                /* save the old wq_unbound_cpumask. */
+                cpumask_copy(saved_cpumask, wq_unbound_cpumask);
+
+                /* update wq_unbound_cpumask at first and apply it to wqs. */
+                cpumask_copy(wq_unbound_cpumask, cpumask);
+                ret = workqueue_apply_unbound_cpumask();
+
+                /* restore the wq_unbound_cpumask when failed. */
+                if (ret < 0)
+                        cpumask_copy(wq_unbound_cpumask, saved_cpumask);
+
+                apply_wqattrs_unlock();
+        }
+
+        free_cpumask_var(saved_cpumask);
+        return ret;
+}
+
 #ifdef CONFIG_SYSFS
 /*
  * Workqueues with WQ_SYSFS flag set is visible to userland via
@@ -4802,13 +4907,13 @@ static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
 {
         struct workqueue_attrs *attrs;
 
+        lockdep_assert_held(&wq_pool_mutex);
+
         attrs = alloc_workqueue_attrs(GFP_KERNEL);
         if (!attrs)
                 return NULL;
 
-        mutex_lock(&wq->mutex);
         copy_workqueue_attrs(attrs, wq->unbound_attrs);
-        mutex_unlock(&wq->mutex);
         return attrs;
 }
@@ -4817,18 +4922,22 @@ static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
 {
         struct workqueue_struct *wq = dev_to_wq(dev);
         struct workqueue_attrs *attrs;
-        int ret;
+        int ret = -ENOMEM;
+
+        apply_wqattrs_lock();
 
         attrs = wq_sysfs_prep_attrs(wq);
         if (!attrs)
-                return -ENOMEM;
+                goto out_unlock;
 
         if (sscanf(buf, "%d", &attrs->nice) == 1 &&
             attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE)
-                ret = apply_workqueue_attrs(wq, attrs);
+                ret = apply_workqueue_attrs_locked(wq, attrs);
         else
                 ret = -EINVAL;
 
+out_unlock:
+        apply_wqattrs_unlock();
         free_workqueue_attrs(attrs);
         return ret ?: count;
 }
@@ -4852,16 +4961,20 @@ static ssize_t wq_cpumask_store(struct device *dev,
 {
         struct workqueue_struct *wq = dev_to_wq(dev);
         struct workqueue_attrs *attrs;
-        int ret;
+        int ret = -ENOMEM;
+
+        apply_wqattrs_lock();
 
         attrs = wq_sysfs_prep_attrs(wq);
         if (!attrs)
-                return -ENOMEM;
+                goto out_unlock;
 
         ret = cpumask_parse(buf, attrs->cpumask);
         if (!ret)
-                ret = apply_workqueue_attrs(wq, attrs);
+                ret = apply_workqueue_attrs_locked(wq, attrs);
 
+out_unlock:
+        apply_wqattrs_unlock();
         free_workqueue_attrs(attrs);
         return ret ?: count;
 }
@@ -4885,18 +4998,22 @@ static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
 {
         struct workqueue_struct *wq = dev_to_wq(dev);
         struct workqueue_attrs *attrs;
-        int v, ret;
+        int v, ret = -ENOMEM;
+
+        apply_wqattrs_lock();
 
         attrs = wq_sysfs_prep_attrs(wq);
         if (!attrs)
-                return -ENOMEM;
+                goto out_unlock;
 
         ret = -EINVAL;
         if (sscanf(buf, "%d", &v) == 1) {
                 attrs->no_numa = !v;
-                ret = apply_workqueue_attrs(wq, attrs);
+                ret = apply_workqueue_attrs_locked(wq, attrs);
         }
 
+out_unlock:
+        apply_wqattrs_unlock();
         free_workqueue_attrs(attrs);
         return ret ?: count;
 }
@@ -4914,9 +5031,49 @@ static struct bus_type wq_subsys = {
         .dev_groups                     = wq_sysfs_groups,
 };
 
+static ssize_t wq_unbound_cpumask_show(struct device *dev,
+                struct device_attribute *attr, char *buf)
+{
+        int written;
+
+        mutex_lock(&wq_pool_mutex);
+        written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
+                            cpumask_pr_args(wq_unbound_cpumask));
+        mutex_unlock(&wq_pool_mutex);
+
+        return written;
+}
+
+static ssize_t wq_unbound_cpumask_store(struct device *dev,
+                struct device_attribute *attr, const char *buf, size_t count)
+{
+        cpumask_var_t cpumask;
+        int ret;
+
+        if (!zalloc_cpumask_var(&cpumask, GFP_KERNEL))
+                return -ENOMEM;
+
+        ret = cpumask_parse(buf, cpumask);
+        if (!ret)
+                ret = workqueue_set_unbound_cpumask(cpumask);
+
+        free_cpumask_var(cpumask);
+        return ret ? ret : count;
+}
+
+static struct device_attribute wq_sysfs_cpumask_attr =
+        __ATTR(cpumask, 0644, wq_unbound_cpumask_show,
+               wq_unbound_cpumask_store);
+
 static int __init wq_sysfs_init(void)
 {
-        return subsys_virtual_register(&wq_subsys, NULL);
+        int err;
+
+        err = subsys_virtual_register(&wq_subsys, NULL);
+        if (err)
+                return err;
+
+        return device_create_file(wq_subsys.dev_root, &wq_sysfs_cpumask_attr);
 }
 core_initcall(wq_sysfs_init);
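
The attribute registered above exposes the global unbound cpumask to userland. A minimal userspace sketch of writing it follows; the sysfs path is inferred from the registration above (a virtual "workqueue" subsystem device), and the file takes a hex cpumask rather than a CPU list because the store handler uses cpumask_parse().

/* Hedged userspace sketch -- path inferred, not taken from this commit. */
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
        const char *path = "/sys/devices/virtual/workqueue/cpumask";
        const char *mask = "f\n";       /* hex mask: confine unbound wqs to CPUs 0-3 */
        int fd = open(path, O_WRONLY);

        if (fd < 0) {
                perror("open");
                return 1;
        }
        if (write(fd, mask, strlen(mask)) < 0)
                perror("write");        /* fails with EINVAL if the mask has no possible CPU */
        close(fd);
        return 0;
}
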
@@ -4948,7 +5105,7 @@ int workqueue_sysfs_register(struct workqueue_struct *wq)
         int ret;
 
         /*
-         * Adjusting max_active or creating new pwqs by applyting
+         * Adjusting max_active or creating new pwqs by applying
          * attributes breaks ordering guarantee. Disallow exposing ordered
          * workqueues.
          */
@@ -5064,6 +5221,9 @@ static int __init init_workqueues(void)
 
         WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
 
+        BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
+        cpumask_copy(wq_unbound_cpumask, cpu_possible_mask);
+
         pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
 
         cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);