提交 4a69a410 编写于 作者: C Chris Mason

Btrfs: Add ordered async work queues

Btrfs uses kernel threads to create async work queues for cpu intensive
operations such as checksumming and decompression.  These work well,
but they make it difficult to keep IO order intact.

A single writepages call from pdflush or fsync will turn into a number
of bios, and each bio is checksummed in parallel.  Once the checksum is
computed, the bio is sent down to the disk, and since we don't control
the order in which the parallel operations happen, they might go down to
the disk in almost any order.

The code deals with this somewhat by having deep work queues for a single
kernel thread, making it very likely that a single thread will process all
the bios for a single inode.

This patch introduces an explicitly ordered work queue.  As work structs
are placed into the queue they are put onto the tail of a list.  They have
three callbacks:

->func (cpu intensive processing here)
->ordered_func (order sensitive processing here)
->ordered_free (free the work struct, all processing is done)

Of these three callbacks, func does the cpu intensive
work, and when it completes the work struct is marked as done.

Every time a work struct completes, the list is checked to see if the head
is marked as done.  If so the ordered_func callback is used to do the
order sensitive processing and the ordered_free callback is used to do
any cleanup.  Then we loop back and check the head of the list again.

This patch also changes the checksumming code to use the ordered workqueues.
On a 4 drive array, it increases streaming writes from 280MB/s to 350MB/s.
Signed-off-by: Chris Mason <chris.mason@oracle.com>
上级 537fb067
...@@ -23,6 +23,10 @@ ...@@ -23,6 +23,10 @@
# include <linux/freezer.h> # include <linux/freezer.h>
#include "async-thread.h" #include "async-thread.h"
#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
/* /*
* container for the kthread task pointer and the list of pending work * container for the kthread task pointer and the list of pending work
* One of these is allocated per thread. * One of these is allocated per thread.
...@@ -88,6 +92,47 @@ static void check_busy_worker(struct btrfs_worker_thread *worker) ...@@ -88,6 +92,47 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
} }
} }
/*
 * Drain order-sensitive completions from the head of the workers'
 * order_list.
 *
 * Called after a work item's ->func has finished.  Marks @work as done,
 * then walks order_list from the head, invoking ->ordered_func and
 * ->ordered_free for every leading item whose ->func has completed.
 * This guarantees ordered_func/ordered_free run in queue order even
 * though the ->func callbacks ran in parallel on many threads.
 *
 * Returns 0 always (also when the queue is not an ordered queue and
 * nothing is done).
 */
static noinline int run_ordered_completions(struct btrfs_workers *workers,
struct btrfs_work *work)
{
unsigned long flags;
/* non-ordered queues have no completion ordering to enforce */
if (!workers->ordered)
return 0;
/* publish that this item's ->func has finished */
set_bit(WORK_DONE_BIT, &work->flags);
spin_lock_irqsave(&workers->lock, flags);
while(!list_empty(&workers->order_list)) {
/* from here on, 'work' is the head of the list, not the caller's item */
work = list_entry(workers->order_list.next,
struct btrfs_work, order_list);
/* head not done yet: later items must wait behind it */
if (!test_bit(WORK_DONE_BIT, &work->flags))
break;
/* we are going to call the ordered done function, but
 * we leave the work item on the list as a barrier so
 * that later work items that are done don't have their
 * functions called before this one returns
 */
if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
break;
/* drop the lock: ordered_func may sleep or submit IO */
spin_unlock_irqrestore(&workers->lock, flags);
work->ordered_func(work);
/* now take the lock again and call the freeing code */
spin_lock_irqsave(&workers->lock, flags);
/* unlink before free; removal also lifts the barrier for the next item */
list_del(&work->order_list);
/* NOTE(review): 'work' must not be touched after ordered_free */
work->ordered_free(work);
}
spin_unlock_irqrestore(&workers->lock, flags);
return 0;
}
/* /*
* main loop for servicing work items * main loop for servicing work items
*/ */
...@@ -102,7 +147,7 @@ static int worker_loop(void *arg) ...@@ -102,7 +147,7 @@ static int worker_loop(void *arg)
cur = worker->pending.next; cur = worker->pending.next;
work = list_entry(cur, struct btrfs_work, list); work = list_entry(cur, struct btrfs_work, list);
list_del(&work->list); list_del(&work->list);
clear_bit(0, &work->flags); clear_bit(WORK_QUEUED_BIT, &work->flags);
work->worker = worker; work->worker = worker;
spin_unlock_irq(&worker->lock); spin_unlock_irq(&worker->lock);
...@@ -110,8 +155,15 @@ static int worker_loop(void *arg) ...@@ -110,8 +155,15 @@ static int worker_loop(void *arg)
work->func(work); work->func(work);
atomic_dec(&worker->num_pending); atomic_dec(&worker->num_pending);
/*
* unless this is an ordered work queue,
* 'work' was probably freed by func above.
*/
run_ordered_completions(worker->workers, work);
spin_lock_irq(&worker->lock); spin_lock_irq(&worker->lock);
check_idle_worker(worker); check_idle_worker(worker);
} }
worker->working = 0; worker->working = 0;
if (freezing(current)) { if (freezing(current)) {
...@@ -154,10 +206,12 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max) ...@@ -154,10 +206,12 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
workers->num_workers = 0; workers->num_workers = 0;
INIT_LIST_HEAD(&workers->worker_list); INIT_LIST_HEAD(&workers->worker_list);
INIT_LIST_HEAD(&workers->idle_list); INIT_LIST_HEAD(&workers->idle_list);
INIT_LIST_HEAD(&workers->order_list);
spin_lock_init(&workers->lock); spin_lock_init(&workers->lock);
workers->max_workers = max; workers->max_workers = max;
workers->idle_thresh = 32; workers->idle_thresh = 32;
workers->name = name; workers->name = name;
workers->ordered = 0;
} }
/* /*
...@@ -296,7 +350,7 @@ int btrfs_requeue_work(struct btrfs_work *work) ...@@ -296,7 +350,7 @@ int btrfs_requeue_work(struct btrfs_work *work)
struct btrfs_worker_thread *worker = work->worker; struct btrfs_worker_thread *worker = work->worker;
unsigned long flags; unsigned long flags;
if (test_and_set_bit(0, &work->flags)) if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
goto out; goto out;
spin_lock_irqsave(&worker->lock, flags); spin_lock_irqsave(&worker->lock, flags);
...@@ -330,10 +384,17 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work) ...@@ -330,10 +384,17 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
int wake = 0; int wake = 0;
/* don't requeue something already on a list */ /* don't requeue something already on a list */
if (test_and_set_bit(0, &work->flags)) if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
goto out; goto out;
worker = find_worker(workers); worker = find_worker(workers);
if (workers->ordered) {
spin_lock_irqsave(&workers->lock, flags);
list_add_tail(&work->order_list, &workers->order_list);
spin_unlock_irqrestore(&workers->lock, flags);
} else {
INIT_LIST_HEAD(&work->order_list);
}
spin_lock_irqsave(&worker->lock, flags); spin_lock_irqsave(&worker->lock, flags);
atomic_inc(&worker->num_pending); atomic_inc(&worker->num_pending);
......
...@@ -37,10 +37,16 @@ struct btrfs_worker_thread; ...@@ -37,10 +37,16 @@ struct btrfs_worker_thread;
*/ */
struct btrfs_work { struct btrfs_work {
/* /*
* only func should be set to the function you want called * func should be set to the function you want called
* your work struct is passed as the only arg * your work struct is passed as the only arg
*
* ordered_func must be set for work sent to an ordered work queue,
* and it is called to complete a given work item in the same
* order they were sent to the queue.
*/ */
void (*func)(struct btrfs_work *work); void (*func)(struct btrfs_work *work);
void (*ordered_func)(struct btrfs_work *work);
void (*ordered_free)(struct btrfs_work *work);
/* /*
* flags should be set to zero. It is used to make sure the * flags should be set to zero. It is used to make sure the
...@@ -51,6 +57,7 @@ struct btrfs_work { ...@@ -51,6 +57,7 @@ struct btrfs_work {
/* don't touch these */ /* don't touch these */
struct btrfs_worker_thread *worker; struct btrfs_worker_thread *worker;
struct list_head list; struct list_head list;
struct list_head order_list;
}; };
struct btrfs_workers { struct btrfs_workers {
...@@ -63,6 +70,9 @@ struct btrfs_workers { ...@@ -63,6 +70,9 @@ struct btrfs_workers {
/* once a worker has this many requests or fewer, it is idle */ /* once a worker has this many requests or fewer, it is idle */
int idle_thresh; int idle_thresh;
/* force completions in the order they were queued */
int ordered;
/* list with all the work threads. The workers on the idle thread /* list with all the work threads. The workers on the idle thread
* may be actively servicing jobs, but they haven't yet hit the * may be actively servicing jobs, but they haven't yet hit the
* idle thresh limit above. * idle thresh limit above.
...@@ -70,6 +80,12 @@ struct btrfs_workers { ...@@ -70,6 +80,12 @@ struct btrfs_workers {
struct list_head worker_list; struct list_head worker_list;
struct list_head idle_list; struct list_head idle_list;
/*
* when operating in ordered mode, this maintains the list
* of work items waiting for completion
*/
struct list_head order_list;
/* lock for finding the next worker thread to queue on */ /* lock for finding the next worker thread to queue on */
spinlock_t lock; spinlock_t lock;
......
...@@ -80,7 +80,8 @@ struct async_submit_bio { ...@@ -80,7 +80,8 @@ struct async_submit_bio {
struct inode *inode; struct inode *inode;
struct bio *bio; struct bio *bio;
struct list_head list; struct list_head list;
extent_submit_bio_hook_t *submit_bio_hook; extent_submit_bio_hook_t *submit_bio_start;
extent_submit_bio_hook_t *submit_bio_done;
int rw; int rw;
int mirror_num; int mirror_num;
unsigned long bio_flags; unsigned long bio_flags;
...@@ -452,7 +453,18 @@ int btrfs_congested_async(struct btrfs_fs_info *info, int iodone) ...@@ -452,7 +453,18 @@ int btrfs_congested_async(struct btrfs_fs_info *info, int iodone)
btrfs_async_submit_limit(info); btrfs_async_submit_limit(info);
} }
static void run_one_async_submit(struct btrfs_work *work) static void run_one_async_start(struct btrfs_work *work)
{
struct btrfs_fs_info *fs_info;
struct async_submit_bio *async;
async = container_of(work, struct async_submit_bio, work);
fs_info = BTRFS_I(async->inode)->root->fs_info;
async->submit_bio_start(async->inode, async->rw, async->bio,
async->mirror_num, async->bio_flags);
}
static void run_one_async_done(struct btrfs_work *work)
{ {
struct btrfs_fs_info *fs_info; struct btrfs_fs_info *fs_info;
struct async_submit_bio *async; struct async_submit_bio *async;
...@@ -470,15 +482,23 @@ static void run_one_async_submit(struct btrfs_work *work) ...@@ -470,15 +482,23 @@ static void run_one_async_submit(struct btrfs_work *work)
waitqueue_active(&fs_info->async_submit_wait)) waitqueue_active(&fs_info->async_submit_wait))
wake_up(&fs_info->async_submit_wait); wake_up(&fs_info->async_submit_wait);
async->submit_bio_hook(async->inode, async->rw, async->bio, async->submit_bio_done(async->inode, async->rw, async->bio,
async->mirror_num, async->bio_flags); async->mirror_num, async->bio_flags);
}
static void run_one_async_free(struct btrfs_work *work)
{
struct async_submit_bio *async;
async = container_of(work, struct async_submit_bio, work);
kfree(async); kfree(async);
} }
int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode, int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
int rw, struct bio *bio, int mirror_num, int rw, struct bio *bio, int mirror_num,
unsigned long bio_flags, unsigned long bio_flags,
extent_submit_bio_hook_t *submit_bio_hook) extent_submit_bio_hook_t *submit_bio_start,
extent_submit_bio_hook_t *submit_bio_done)
{ {
struct async_submit_bio *async; struct async_submit_bio *async;
int limit = btrfs_async_submit_limit(fs_info); int limit = btrfs_async_submit_limit(fs_info);
...@@ -491,8 +511,13 @@ int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode, ...@@ -491,8 +511,13 @@ int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
async->rw = rw; async->rw = rw;
async->bio = bio; async->bio = bio;
async->mirror_num = mirror_num; async->mirror_num = mirror_num;
async->submit_bio_hook = submit_bio_hook; async->submit_bio_start = submit_bio_start;
async->work.func = run_one_async_submit; async->submit_bio_done = submit_bio_done;
async->work.func = run_one_async_start;
async->work.ordered_func = run_one_async_done;
async->work.ordered_free = run_one_async_free;
async->work.flags = 0; async->work.flags = 0;
async->bio_flags = bio_flags; async->bio_flags = bio_flags;
...@@ -533,29 +558,25 @@ static int btree_csum_one_bio(struct bio *bio) ...@@ -533,29 +558,25 @@ static int btree_csum_one_bio(struct bio *bio)
return 0; return 0;
} }
static int __btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, static int __btree_submit_bio_start(struct inode *inode, int rw,
int mirror_num, unsigned long bio_flags) struct bio *bio, int mirror_num,
unsigned long bio_flags)
{ {
struct btrfs_root *root = BTRFS_I(inode)->root;
int ret;
/* /*
* when we're called for a write, we're already in the async * when we're called for a write, we're already in the async
* submission context. Just jump into btrfs_map_bio * submission context. Just jump into btrfs_map_bio
*/ */
if (rw & (1 << BIO_RW)) { btree_csum_one_bio(bio);
btree_csum_one_bio(bio); return 0;
return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, }
mirror_num, 1);
}
static int __btree_submit_bio_done(struct inode *inode, int rw, struct bio *bio,
int mirror_num, unsigned long bio_flags)
{
/* /*
* called for a read, do the setup so that checksum validation * when we're called for a write, we're already in the async
* can happen in the async kernel threads * submission context. Just jump into btrfs_map_bio
*/ */
ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1);
BUG_ON(ret);
return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1); return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1);
} }
...@@ -567,11 +588,22 @@ static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, ...@@ -567,11 +588,22 @@ static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
* can happen in parallel across all CPUs * can happen in parallel across all CPUs
*/ */
if (!(rw & (1 << BIO_RW))) { if (!(rw & (1 << BIO_RW))) {
return __btree_submit_bio_hook(inode, rw, bio, mirror_num, 0); int ret;
/*
* called for a read, do the setup so that checksum validation
* can happen in the async kernel threads
*/
ret = btrfs_bio_wq_end_io(BTRFS_I(inode)->root->fs_info,
bio, 1);
BUG_ON(ret);
return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
mirror_num, 1);
} }
return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info, return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info,
inode, rw, bio, mirror_num, 0, inode, rw, bio, mirror_num, 0,
__btree_submit_bio_hook); __btree_submit_bio_start,
__btree_submit_bio_done);
} }
static int btree_writepage(struct page *page, struct writeback_control *wbc) static int btree_writepage(struct page *page, struct writeback_control *wbc)
...@@ -1534,7 +1566,8 @@ struct btrfs_root *open_ctree(struct super_block *sb, ...@@ -1534,7 +1566,8 @@ struct btrfs_root *open_ctree(struct super_block *sb,
* were sent by the writeback daemons, improving overall locality * were sent by the writeback daemons, improving overall locality
* of the IO going down the pipe. * of the IO going down the pipe.
*/ */
fs_info->workers.idle_thresh = 128; fs_info->workers.idle_thresh = 8;
fs_info->workers.ordered = 1;
btrfs_init_workers(&fs_info->fixup_workers, "fixup", 1); btrfs_init_workers(&fs_info->fixup_workers, "fixup", 1);
btrfs_init_workers(&fs_info->endio_workers, "endio", btrfs_init_workers(&fs_info->endio_workers, "endio",
......
...@@ -72,7 +72,9 @@ int btrfs_bio_wq_end_io(struct btrfs_fs_info *info, struct bio *bio, ...@@ -72,7 +72,9 @@ int btrfs_bio_wq_end_io(struct btrfs_fs_info *info, struct bio *bio,
int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode, int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
int rw, struct bio *bio, int mirror_num, int rw, struct bio *bio, int mirror_num,
unsigned long bio_flags, unsigned long bio_flags,
extent_submit_bio_hook_t *submit_bio_hook); extent_submit_bio_hook_t *submit_bio_start,
extent_submit_bio_hook_t *submit_bio_done);
int btrfs_congested_async(struct btrfs_fs_info *info, int iodone); int btrfs_congested_async(struct btrfs_fs_info *info, int iodone);
unsigned long btrfs_async_submit_limit(struct btrfs_fs_info *info); unsigned long btrfs_async_submit_limit(struct btrfs_fs_info *info);
int btrfs_write_tree_block(struct extent_buffer *buf); int btrfs_write_tree_block(struct extent_buffer *buf);
......
...@@ -881,7 +881,7 @@ int btrfs_merge_bio_hook(struct page *page, unsigned long offset, ...@@ -881,7 +881,7 @@ int btrfs_merge_bio_hook(struct page *page, unsigned long offset,
* At IO completion time the cums attached on the ordered extent record * At IO completion time the cums attached on the ordered extent record
* are inserted into the btree * are inserted into the btree
*/ */
int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, int __btrfs_submit_bio_start(struct inode *inode, int rw, struct bio *bio,
int mirror_num, unsigned long bio_flags) int mirror_num, unsigned long bio_flags)
{ {
struct btrfs_root *root = BTRFS_I(inode)->root; struct btrfs_root *root = BTRFS_I(inode)->root;
...@@ -889,7 +889,21 @@ int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, ...@@ -889,7 +889,21 @@ int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
ret = btrfs_csum_one_bio(root, inode, bio); ret = btrfs_csum_one_bio(root, inode, bio);
BUG_ON(ret); BUG_ON(ret);
return 0;
}
/*
* in order to insert checksums into the metadata in large chunks,
* we wait until bio submission time. All the pages in the bio are
* checksummed and sums are attached onto the ordered extent record.
*
* At IO completion time the cums attached on the ordered extent record
* are inserted into the btree
*/
int __btrfs_submit_bio_done(struct inode *inode, int rw, struct bio *bio,
int mirror_num, unsigned long bio_flags)
{
struct btrfs_root *root = BTRFS_I(inode)->root;
return btrfs_map_bio(root, rw, bio, mirror_num, 1); return btrfs_map_bio(root, rw, bio, mirror_num, 1);
} }
...@@ -922,7 +936,8 @@ int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, ...@@ -922,7 +936,8 @@ int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
/* we're doing a write, do the async checksumming */ /* we're doing a write, do the async checksumming */
return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info, return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info,
inode, rw, bio, mirror_num, inode, rw, bio, mirror_num,
bio_flags, __btrfs_submit_bio_hook); bio_flags, __btrfs_submit_bio_start,
__btrfs_submit_bio_done);
} }
mapit: mapit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册