// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 * Copyright (C) 2014 Fujitsu.  All rights reserved.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"
#include "ctree.h"

enum {
	/* Set once work->func() has finished running */
	WORK_DONE_BIT,
	/* Set once the ordered_func() of this work has been called */
	WORK_ORDER_DONE_BIT,
	/* Marks a work item for the high-priority queue */
	WORK_HIGH_PRIO_BIT,
};

#define NO_THRESHOLD (-1)
#define DFT_THRESHOLD (32)

struct __btrfs_workqueue {
	struct workqueue_struct *normal_wq;

	/* File system this workqueue services */
	struct btrfs_fs_info *fs_info;

	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;

	/* Thresholding related variables */
	atomic_t pending;

	/* Upper limit of concurrent workers */
	int limit_active;

	/* Current number of concurrent workers */
	int current_active;

	/* Threshold to change current_active */
	int thresh;
	unsigned int count;
	spinlock_t thres_lock;
};

struct btrfs_workqueue {
	struct __btrfs_workqueue *normal;
	struct __btrfs_workqueue *high;
};

static void normal_work_helper(struct btrfs_work *work);

#define BTRFS_WORK_HELPER(name)					\
noinline_for_stack void btrfs_##name(struct work_struct *arg)		\
{									\
	struct btrfs_work *work = container_of(arg, struct btrfs_work,	\
					       normal_work);		\
	normal_work_helper(work);					\
}

struct btrfs_fs_info *
btrfs_workqueue_owner(const struct __btrfs_workqueue *wq)
{
	return wq->fs_info;
}

struct btrfs_fs_info *
btrfs_work_owner(const struct btrfs_work *work)
{
	return work->wq->fs_info;
}

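/*
 * Report whether the normal queue of @wq looks congested, i.e. more than
 * twice its threshold of work items are pending.  Callers can use this as a
 * hint to back off and throttle further submissions.  Queues created with
 * thresh == NO_THRESHOLD always report not congested.
 */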
bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
{
	/*
	 * We could compare wq->normal->pending with num_online_cpus()
	 * to support the "thresh == NO_THRESHOLD" case as well, but that
	 * requires moving the atomic_inc/dec up into thresh_queue/exec_hook.
	 * Postpone it until someone actually needs that case.
	 */
	if (wq->normal->thresh == NO_THRESHOLD)
		return false;

	return atomic_read(&wq->normal->pending) > wq->normal->thresh * 2;
}

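/*
 * Each work type below gets its own thin wrapper around normal_work_helper(),
 * so that every type has a distinct work function address (useful for stack
 * traces and for the workqueue's non-reentrancy matching, which keys on the
 * work function; see the comment in run_ordered_work()).  As an example,
 * BTRFS_WORK_HELPER(endio_helper) expands to:
 *
 *	noinline_for_stack void btrfs_endio_helper(struct work_struct *arg)
 *	{
 *		struct btrfs_work *work = container_of(arg, struct btrfs_work,
 *						       normal_work);
 *		normal_work_helper(work);
 *	}
 */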
BTRFS_WORK_HELPER(worker_helper);
BTRFS_WORK_HELPER(delalloc_helper);
BTRFS_WORK_HELPER(flush_delalloc_helper);
BTRFS_WORK_HELPER(cache_helper);
BTRFS_WORK_HELPER(submit_helper);
BTRFS_WORK_HELPER(fixup_helper);
BTRFS_WORK_HELPER(endio_helper);
BTRFS_WORK_HELPER(endio_meta_helper);
BTRFS_WORK_HELPER(endio_meta_write_helper);
BTRFS_WORK_HELPER(endio_raid56_helper);
BTRFS_WORK_HELPER(endio_repair_helper);
BTRFS_WORK_HELPER(rmw_helper);
BTRFS_WORK_HELPER(endio_write_helper);
BTRFS_WORK_HELPER(freespace_write_helper);
BTRFS_WORK_HELPER(delayed_meta_helper);
BTRFS_WORK_HELPER(readahead_helper);
BTRFS_WORK_HELPER(qgroup_rescan_helper);
BTRFS_WORK_HELPER(extent_refs_helper);
BTRFS_WORK_HELPER(scrub_helper);
BTRFS_WORK_HELPER(scrubwrc_helper);
BTRFS_WORK_HELPER(scrubnc_helper);
BTRFS_WORK_HELPER(scrubparity_helper);

static struct __btrfs_workqueue *
__btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info, const char *name,
			unsigned int flags, int limit_active, int thresh)
{
	struct __btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);

	if (!ret)
		return NULL;

	ret->fs_info = fs_info;
	ret->limit_active = limit_active;
	atomic_set(&ret->pending, 0);
	if (thresh == 0)
		thresh = DFT_THRESHOLD;
	/* For low threshold, disabling threshold is a better choice */
	if (thresh < DFT_THRESHOLD) {
		ret->current_active = limit_active;
		ret->thresh = NO_THRESHOLD;
	} else {
		/*
		 * For threshold-able wq, let its concurrency grow on demand.
		 * Use minimal max_active at alloc time to reduce resource
		 * usage.
		 */
		ret->current_active = 1;
		ret->thresh = thresh;
	}

	if (flags & WQ_HIGHPRI)
		ret->normal_wq = alloc_workqueue("btrfs-%s-high", flags,
						 ret->current_active, name);
	else
		ret->normal_wq = alloc_workqueue("btrfs-%s", flags,
						 ret->current_active, name);
	if (!ret->normal_wq) {
		kfree(ret);
		return NULL;
	}

	INIT_LIST_HEAD(&ret->ordered_list);
	spin_lock_init(&ret->list_lock);
	spin_lock_init(&ret->thres_lock);
	trace_btrfs_workqueue_alloc(ret, name, flags & WQ_HIGHPRI);
	return ret;
}

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq);

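/*
 * Allocate a btrfs_workqueue.  The @flags are passed through to
 * alloc_workqueue(); when WQ_HIGHPRI is included, a second high-priority
 * internal queue is created in addition to the normal one, and
 * btrfs_queue_work() routes work items with WORK_HIGH_PRIO_BIT set to it.
 * A @thresh of 0 selects DFT_THRESHOLD, and anything below DFT_THRESHOLD
 * disables thresholding entirely.
 */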
struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
					      const char *name,
					      unsigned int flags,
					      int limit_active,
					      int thresh)
{
	struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);

	if (!ret)
		return NULL;

	ret->normal = __btrfs_alloc_workqueue(fs_info, name,
					      flags & ~WQ_HIGHPRI,
					      limit_active, thresh);
	if (!ret->normal) {
		kfree(ret);
		return NULL;
	}

	if (flags & WQ_HIGHPRI) {
		ret->high = __btrfs_alloc_workqueue(fs_info, name, flags,
						    limit_active, thresh);
		if (!ret->high) {
			__btrfs_destroy_workqueue(ret->normal);
			kfree(ret);
			return NULL;
		}
	}
	return ret;
}

/*
 * Hook for the threshold mechanism, called from btrfs_queue_work().
 * This hook WILL be called in IRQ handler context,
 * so workqueue_set_max_active() MUST NOT be called in this hook.
 */
static inline void thresh_queue_hook(struct __btrfs_workqueue *wq)
{
	if (wq->thresh == NO_THRESHOLD)
		return;
	atomic_inc(&wq->pending);
}

/*
 * Hook for the threshold mechanism, called before executing the work.
 * This hook is called in kthread context, so it is safe to call
 * workqueue_set_max_active() here.
 */
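/*
 * With the default threshold of 32, for example, max_active starts at 1, is
 * bumped by one while more than 32 items are pending, lowered by one once
 * fewer than 16 items are pending, and always clamped to [1, limit_active].
 */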
static inline void thresh_exec_hook(struct __btrfs_workqueue *wq)
{
	int new_current_active;
	long pending;
	int need_change = 0;

	if (wq->thresh == NO_THRESHOLD)
		return;

	atomic_dec(&wq->pending);
	spin_lock(&wq->thres_lock);
	/*
	 * Use wq->count to limit the calling frequency of
	 * workqueue_set_max_active.
	 */
	wq->count++;
	wq->count %= (wq->thresh / 4);
	if (!wq->count)
		goto out;
	new_current_active = wq->current_active;

	/*
	 * pending may change later, but that's OK: we don't need an exact
	 * value to calculate new_current_active.
	 */
	pending = atomic_read(&wq->pending);
	if (pending > wq->thresh)
		new_current_active++;
	if (pending < wq->thresh / 2)
		new_current_active--;
	new_current_active = clamp_val(new_current_active, 1, wq->limit_active);
	if (new_current_active != wq->current_active) {
		need_change = 1;
		wq->current_active = new_current_active;
	}
out:
	spin_unlock(&wq->thres_lock);

	if (need_change) {
		workqueue_set_max_active(wq->normal_wq, wq->current_active);
	}
}

static void run_ordered_work(struct __btrfs_workqueue *wq,
			     struct btrfs_work *self)
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;
	void *wtag;
	bool free_self = false;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_entry(list->next, struct btrfs_work,
				  ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/*
		 * we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		trace_btrfs_ordered_sched(work);
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		if (work == self) {
			/*
			 * This is the work item that the worker is currently
			 * executing.
			 *
			 * The kernel workqueue code guarantees non-reentrancy
			 * of work items. I.e., if a work item with the same
			 * address and work function is queued twice, the second
			 * execution is blocked until the first one finishes. A
			 * work item may be freed and recycled with the same
			 * work function; the workqueue code assumes that the
			 * original work item cannot depend on the recycled work
			 * item in that case (see find_worker_executing_work()).
			 *
			 * Note that the work of one Btrfs filesystem may depend
			 * on the work of another Btrfs filesystem via, e.g., a
			 * loop device. Therefore, we must not allow the current
			 * work item to be recycled until we are really done,
			 * otherwise we break the above assumption and can
			 * deadlock.
			 */
			free_self = true;
		} else {
			/*
			 * We don't want to call the ordered free functions with
			 * the lock held though. Save the work as tag for the
			 * trace event, because the callback could free the
			 * structure.
			 */
			wtag = work;
			work->ordered_free(work);
			trace_btrfs_all_work_done(wq->fs_info, wtag);
		}
	}
	spin_unlock_irqrestore(lock, flags);

	if (free_self) {
		wtag = self;
		self->ordered_free(self);
		trace_btrfs_all_work_done(wq->fs_info, wtag);
	}
}

static void normal_work_helper(struct btrfs_work *work)
{
	struct __btrfs_workqueue *wq;
	void *wtag;
	int need_order = 0;

	/*
	 * We should not touch things inside work in the following cases:
	 * 1) after work->func() if it has no ordered_free
	 *    Since the struct is freed in work->func().
	 * 2) after setting WORK_DONE_BIT
	 *    The work may be freed in other threads almost instantly.
	 * So we save the needed things here.
	 */
	if (work->ordered_func)
		need_order = 1;
	wq = work->wq;
	/* Safe for tracepoints in case work gets freed by the callback */
	wtag = work;

	trace_btrfs_work_sched(work);
	thresh_exec_hook(wq);
	work->func(work);
	if (need_order) {
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq, work);
	}
	if (!need_order)
		trace_btrfs_all_work_done(wq->fs_info, wtag);
}

void btrfs_init_work(struct btrfs_work *work, btrfs_work_func_t uniq_func,
		     btrfs_func_t func,
		     btrfs_func_t ordered_func,
		     btrfs_func_t ordered_free)
{
	work->func = func;
	work->ordered_func = ordered_func;
	work->ordered_free = ordered_free;
	INIT_WORK(&work->normal_work, uniq_func);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}

static inline void __btrfs_queue_work(struct __btrfs_workqueue *wq,
				      struct btrfs_work *work)
{
	unsigned long flags;

	work->wq = wq;
	thresh_queue_hook(wq);
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	trace_btrfs_work_queued(work);
	queue_work(wq->normal_wq, &work->normal_work);
}

void btrfs_queue_work(struct btrfs_workqueue *wq,
		      struct btrfs_work *work)
{
	struct __btrfs_workqueue *dest_wq;

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags) && wq->high)
		dest_wq = wq->high;
	else
		dest_wq = wq->normal;
	__btrfs_queue_work(dest_wq, work);
}
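
/*
 * Putting the public API together, a typical user looks roughly like the
 * sketch below (my_fs_info, my_item, my_func, my_ordered_func and
 * my_free_func are placeholders, not existing symbols):
 *
 *	struct btrfs_workqueue *wq;
 *
 *	wq = btrfs_alloc_workqueue(my_fs_info, "worker", WQ_HIGHPRI, 16, 0);
 *	btrfs_init_work(&my_item->work, btrfs_worker_helper, my_func,
 *			my_ordered_func, my_free_func);
 *	btrfs_set_work_high_priority(&my_item->work);
 *	btrfs_queue_work(wq, &my_item->work);
 *	...
 *	btrfs_destroy_workqueue(wq);
 */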

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq)
{
	destroy_workqueue(wq->normal_wq);
	trace_btrfs_workqueue_destroy(wq);
	kfree(wq);
}

void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
{
	if (!wq)
		return;
	if (wq->high)
		__btrfs_destroy_workqueue(wq->high);
	__btrfs_destroy_workqueue(wq->normal);
	kfree(wq);
}

void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
{
	if (!wq)
		return;
	wq->normal->limit_active = limit_active;
	if (wq->high)
		wq->high->limit_active = limit_active;
}

void btrfs_set_work_high_priority(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}