提交 cfb1e33e 编写于 作者: J Jeff Moyer 提交者: Jens Axboe

aio: implement request batching

Hi,

Some workloads issue batches of small I/O, and the performance is poor
due to the call to blk_run_address_space for every single iocb.  Nathan
Roberts pointed this out, and suggested that by deferring this call
until all I/Os in the iocb array are submitted to the block layer, we
can realize some impressive performance gains (up to 30% for sequential
4k reads in batches of 16).
Signed-off-by: NJeff Moyer <jmoyer@redhat.com>
Signed-off-by: NJens Axboe <jens.axboe@oracle.com>
上级 1af60fbd
......@@ -32,6 +32,9 @@
#include <linux/workqueue.h>
#include <linux/security.h>
#include <linux/eventfd.h>
#include <linux/blkdev.h>
#include <linux/mempool.h>
#include <linux/hash.h>
#include <asm/kmap_types.h>
#include <asm/uaccess.h>
......@@ -60,6 +63,14 @@ static DECLARE_WORK(fput_work, aio_fput_routine);
static DEFINE_SPINLOCK(fput_lock);
static LIST_HEAD(fput_head);
#define AIO_BATCH_HASH_BITS 3 /* allocated on-stack, so don't go crazy */
#define AIO_BATCH_HASH_SIZE (1 << AIO_BATCH_HASH_BITS)
struct aio_batch_entry {
struct hlist_node list;
struct address_space *mapping;
};
mempool_t *abe_pool;
static void aio_kick_handler(struct work_struct *);
static void aio_queue_work(struct kioctx *);
......@@ -73,6 +84,8 @@ static int __init aio_setup(void)
kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
aio_wq = create_workqueue("aio");
abe_pool = mempool_create_kmalloc_pool(1, sizeof(struct aio_batch_entry));
BUG_ON(!abe_pool);
pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
......@@ -1531,8 +1544,44 @@ static int aio_wake_function(wait_queue_t *wait, unsigned mode,
return 1;
}
static void aio_batch_add(struct address_space *mapping,
struct hlist_head *batch_hash)
{
struct aio_batch_entry *abe;
struct hlist_node *pos;
unsigned bucket;
bucket = hash_ptr(mapping, AIO_BATCH_HASH_BITS);
hlist_for_each_entry(abe, pos, &batch_hash[bucket], list) {
if (abe->mapping == mapping)
return;
}
abe = mempool_alloc(abe_pool, GFP_KERNEL);
BUG_ON(!igrab(mapping->host));
abe->mapping = mapping;
hlist_add_head(&abe->list, &batch_hash[bucket]);
return;
}
static void aio_batch_free(struct hlist_head *batch_hash)
{
struct aio_batch_entry *abe;
struct hlist_node *pos, *n;
int i;
for (i = 0; i < AIO_BATCH_HASH_SIZE; i++) {
hlist_for_each_entry_safe(abe, pos, n, &batch_hash[i], list) {
blk_run_address_space(abe->mapping);
iput(abe->mapping->host);
hlist_del(&abe->list);
mempool_free(abe, abe_pool);
}
}
}
static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
struct iocb *iocb)
struct iocb *iocb, struct hlist_head *batch_hash)
{
struct kiocb *req;
struct file *file;
......@@ -1608,6 +1657,12 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
;
}
spin_unlock_irq(&ctx->ctx_lock);
if (req->ki_opcode == IOCB_CMD_PREAD ||
req->ki_opcode == IOCB_CMD_PREADV ||
req->ki_opcode == IOCB_CMD_PWRITE ||
req->ki_opcode == IOCB_CMD_PWRITEV)
aio_batch_add(file->f_mapping, batch_hash);
aio_put_req(req); /* drop extra ref to req */
return 0;
......@@ -1635,6 +1690,7 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
struct kioctx *ctx;
long ret = 0;
int i;
struct hlist_head batch_hash[AIO_BATCH_HASH_SIZE] = { { 0, }, };
if (unlikely(nr < 0))
return -EINVAL;
......@@ -1666,10 +1722,11 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
break;
}
ret = io_submit_one(ctx, user_iocb, &tmp);
ret = io_submit_one(ctx, user_iocb, &tmp, batch_hash);
if (ret)
break;
}
aio_batch_free(batch_hash);
put_ioctx(ctx);
return i ? i : ret;
......
......@@ -1028,9 +1028,6 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
if (dio->bio)
dio_bio_submit(dio);
/* All IO is now issued, send it on its way */
blk_run_address_space(inode->i_mapping);
/*
* It is possible that, we return short IO due to end of file.
* In that case, we need to release all the pages we got hold on.
......@@ -1057,8 +1054,11 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
((rw & READ) || (dio->result == dio->size)))
ret = -EIOCBQUEUED;
if (ret != -EIOCBQUEUED)
if (ret != -EIOCBQUEUED) {
/* All IO is now issued, send it on its way */
blk_run_address_space(inode->i_mapping);
dio_await_completion(dio);
}
/*
* Sync will always be dropping the final ref and completing the
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册