/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 * Copyright (C) 2014 Fujitsu.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include <linux/workqueue.h>
#include "async-thread.h"

#define WORK_DONE_BIT 0
#define WORK_ORDER_DONE_BIT 1
#define WORK_HIGH_PRIO_BIT 2

#define NO_THRESHOLD (-1)
#define DFT_THRESHOLD (32)

struct __btrfs_workqueue {
	struct workqueue_struct *normal_wq;
	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;

	/* Thresholding related variables */
	atomic_t pending;
	int max_active;
	int current_max;
	int thresh;
	unsigned int count;
	spinlock_t thres_lock;
};

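/*
 * The exported workqueue pair: ->normal is always allocated, ->high only
 * when WQ_HIGHPRI is requested.  btrfs_queue_work() routes work items
 * with WORK_HIGH_PRIO_BIT set to ->high when it exists.
 */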
struct btrfs_workqueue {
	struct __btrfs_workqueue *normal;
	struct __btrfs_workqueue *high;
};

static inline struct __btrfs_workqueue
*__btrfs_alloc_workqueue(char *name, int flags, int max_active, int thresh)
{
	struct __btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_NOFS);

	if (unlikely(!ret))
		return NULL;

	ret->max_active = max_active;
	atomic_set(&ret->pending, 0);
	if (thresh == 0)
		thresh = DFT_THRESHOLD;
	/* For low threshold, disabling threshold is a better choice */
	if (thresh < DFT_THRESHOLD) {
		ret->current_max = max_active;
		ret->thresh = NO_THRESHOLD;
	} else {
		ret->current_max = 1;
		ret->thresh = thresh;
	}

	if (flags & WQ_HIGHPRI)
		ret->normal_wq = alloc_workqueue("%s-%s-high", flags,
						 ret->max_active,
						 "btrfs", name);
	else
		ret->normal_wq = alloc_workqueue("%s-%s", flags,
						 ret->max_active, "btrfs",
						 name);
	if (unlikely(!ret->normal_wq)) {
		kfree(ret);
		return NULL;
	}

	INIT_LIST_HEAD(&ret->ordered_list);
	spin_lock_init(&ret->list_lock);
	spin_lock_init(&ret->thres_lock);
	return ret;
}

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq);

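/*
 * btrfs_alloc_workqueue - allocate a btrfs_workqueue
 * @name:	suffix of the workqueue name ("btrfs-<name>", plus
 *		"btrfs-<name>-high" when WQ_HIGHPRI is set)
 * @flags:	workqueue flags passed on to alloc_workqueue()
 * @max_active:	upper bound on the number of concurrently running works
 * @thresh:	0 selects DFT_THRESHOLD; a value below DFT_THRESHOLD
 *		disables thresholding so max_active is used directly;
 *		otherwise max_active is adjusted at runtime based on the
 *		number of pending works
 *
 * Returns the new workqueue, or NULL on allocation failure.
 */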
struct btrfs_workqueue *btrfs_alloc_workqueue(char *name,
					      int flags,
					      int max_active,
					      int thresh)
{
	struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_NOFS);

	if (unlikely(!ret))
		return NULL;

	ret->normal = __btrfs_alloc_workqueue(name, flags & ~WQ_HIGHPRI,
					      max_active, thresh);
	if (unlikely(!ret->normal)) {
		kfree(ret);
		return NULL;
	}

	if (flags & WQ_HIGHPRI) {
		ret->high = __btrfs_alloc_workqueue(name, flags, max_active,
						    thresh);
		if (unlikely(!ret->high)) {
			__btrfs_destroy_workqueue(ret->normal);
			kfree(ret);
			return NULL;
		}
	}
	return ret;
}

/*
 * Hook for the thresholding mechanism, called from btrfs_queue_work().
 * This hook WILL be called in IRQ handler context,
 * so workqueue_set_max_active() MUST NOT be called from it.
 */
static inline void thresh_queue_hook(struct __btrfs_workqueue *wq)
{
	if (wq->thresh == NO_THRESHOLD)
		return;
	atomic_inc(&wq->pending);
}

/*
 * Hook for the thresholding mechanism, called before executing the work.
 * This hook runs in kthread context, so workqueue_set_max_active()
 * is called from here.
 */
static inline void thresh_exec_hook(struct __btrfs_workqueue *wq)
{
	int new_max_active;
	long pending;
	int need_change = 0;

	if (wq->thresh == NO_THRESHOLD)
		return;

	atomic_dec(&wq->pending);
	spin_lock(&wq->thres_lock);
	/*
	 * Use wq->count to limit the calling frequency of
	 * workqueue_set_max_active.
	 */
	wq->count++;
	wq->count %= (wq->thresh / 4);
	if (!wq->count)
		goto  out;
	new_max_active = wq->current_max;

	/*
	 * pending may change after this read, but that's fine: we don't
	 * need an exact value to calculate new_max_active.
	 */
	pending = atomic_read(&wq->pending);
	if (pending > wq->thresh)
		new_max_active++;
	if (pending < wq->thresh / 2)
		new_max_active--;
	new_max_active = clamp_val(new_max_active, 1, wq->max_active);
	if (new_max_active != wq->current_max)  {
		need_change = 1;
		wq->current_max = new_max_active;
	}
out:
	spin_unlock(&wq->thres_lock);

	if (need_change) {
		workqueue_set_max_active(wq->normal_wq, wq->current_max);
	}
}

static void run_ordered_work(struct __btrfs_workqueue *wq)
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_entry(list->next, struct btrfs_work,
				  ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/*
		 * we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
		work->ordered_free(work);
	}
	spin_unlock_irqrestore(lock, flags);
}

static void normal_work_helper(struct work_struct *arg)
{
	struct btrfs_work *work;
	struct __btrfs_workqueue *wq;
	int need_order = 0;

	work = container_of(arg, struct btrfs_work, normal_work);
	/*
	 * We should not touch things inside work in the following cases:
	 * 1) after work->func(), if it has no ordered_free, since the
	 *    struct is freed in work->func();
	 * 2) after setting WORK_DONE_BIT, since the work may be freed by
	 *    other threads almost instantly.
	 * So we save the needed things here.
	 */
	if (work->ordered_func)
		need_order = 1;
	wq = work->wq;

	thresh_exec_hook(wq);
	work->func(work);
	if (need_order) {
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq);
	}
}

void btrfs_init_work(struct btrfs_work *work,
		     void (*func)(struct btrfs_work *),
		     void (*ordered_func)(struct btrfs_work *),
		     void (*ordered_free)(struct btrfs_work *))
{
	work->func = func;
	work->ordered_func = ordered_func;
	work->ordered_free = ordered_free;
	INIT_WORK(&work->normal_work, normal_work_helper);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}

static inline void __btrfs_queue_work(struct __btrfs_workqueue *wq,
				      struct btrfs_work *work)
{
	unsigned long flags;

	work->wq = wq;
	thresh_queue_hook(wq);
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	queue_work(wq->normal_wq, &work->normal_work);
}

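/*
 * Typical usage, as an illustrative sketch (the my_* names are
 * hypothetical, not part of this file): the caller embeds a
 * struct btrfs_work in its own context and sets up the callbacks
 * with btrfs_init_work() before queueing:
 *
 *	btrfs_init_work(&ctx->work, my_func, my_ordered_func, my_free);
 *	btrfs_queue_work(workers, &ctx->work);
 *
 * my_func() runs with normal workqueue concurrency; when ordered_func
 * and ordered_free are set, they run strictly in queueing order once
 * my_func() has finished.
 */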
void btrfs_queue_work(struct btrfs_workqueue *wq,
		      struct btrfs_work *work)
{
	struct __btrfs_workqueue *dest_wq;

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags) && wq->high)
		dest_wq = wq->high;
	else
		dest_wq = wq->normal;
	__btrfs_queue_work(dest_wq, work);
}

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq)
{
	destroy_workqueue(wq->normal_wq);
	kfree(wq);
}

void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
{
	if (!wq)
		return;
	if (wq->high)
		__btrfs_destroy_workqueue(wq->high);
	__btrfs_destroy_workqueue(wq->normal);
}

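/*
 * Only the stored upper bound is updated here; for thresholded queues,
 * thresh_exec_hook() re-clamps current_max against the new value on its
 * next adjustment and then calls workqueue_set_max_active().
 */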
void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int max)
{
	wq->normal->max_active = max;
	if (wq->high)
		wq->high->max_active = max;
}

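/*
 * Must be called before btrfs_queue_work(); the flag is only checked at
 * queue time when choosing between the normal and high workqueues.
 */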
void btrfs_set_work_high_priority(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}