Commit 209fb87a authored by Christoph Hellwig, committed by Alex Elder

xfs: simplify and speed up direct I/O completions

Our current handling of direct I/O completions is rather suboptimal,
because we defer it to a workqueue more often than needed, and we
perform a much too aggressive flush of the workqueue in case unwritten
extent conversions happen.
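
The cost being removed is easiest to see in the old xfs_finish_ioend(); the
sketch below is just the old side of the first hunk further down, restated
with comments for orientation, not new code:

STATIC void
xfs_finish_ioend(
        xfs_ioend_t             *ioend,
        int                     wait)
{
        if (atomic_dec_and_test(&ioend->io_remaining)) {
                struct workqueue_struct *wq;

                /* Every final put defers completion to a workqueue... */
                wq = (ioend->io_type == IO_UNWRITTEN) ?
                        xfsconvertd_workqueue : xfsdatad_workqueue;
                queue_work(wq, &ioend->io_work);
                /*
                 * ...and synchronous callers then flush the whole workqueue,
                 * waiting on every queued ioend rather than just their own.
                 */
                if (wait)
                        flush_workqueue(wq);
        }
}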

This patch changes direct I/O reads to not use a completion handler at all,
since there is no completion work to do for them, and to perform the unwritten
extent conversions in caller context for synchronous direct I/O.
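
For orientation, the new write-completion decision condenses to the fragment
below; this is restated from the xfs_end_io_direct_write() hunk further down
(reads now pass a NULL completion handler and never reach this code):

        if (private && size > 0)
                ioend->io_type = IO_UNWRITTEN;  /* range still needs conversion */

        if (is_async) {
                /*
                 * AIO completion runs in irq context, where we cannot start a
                 * conversion transaction, so queue the ioend to a workqueue;
                 * aio_complete() is delayed only while a conversion is pending.
                 */
                if (ioend->io_type == IO_UNWRITTEN) {
                        ioend->io_iocb = iocb;
                        ioend->io_result = ret;
                } else {
                        aio_complete(iocb, ret, 0);
                }
                xfs_finish_ioend(ioend);
        } else {
                /*
                 * Synchronous direct I/O: run the conversion right here in the
                 * caller's context instead of queueing and flushing a workqueue.
                 */
                xfs_finish_ioend_sync(ioend);
        }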

For a small-I/O-size direct I/O workload on a consumer-grade SSD, such as
the untar of a kernel tree inside qemu, this patch gives speedups of about
5%, getting us much closer to the speed of a native block device or a fully
allocated XFS file.
Signed-off-by: Christoph Hellwig <hch@lst.de>
Reviewed-by: Dave Chinner <dchinner@redhat.com>
Signed-off-by: Alex Elder <aelder@sgi.com>
Parent fb511f21
@@ -202,23 +202,17 @@ xfs_setfilesize(
 }
 
 /*
- * Schedule IO completion handling on a xfsdatad if this was
- * the final hold on this ioend. If we are asked to wait,
- * flush the workqueue.
+ * Schedule IO completion handling on the final put of an ioend.
  */
 STATIC void
 xfs_finish_ioend(
-        xfs_ioend_t             *ioend,
-        int                     wait)
+        struct xfs_ioend        *ioend)
 {
         if (atomic_dec_and_test(&ioend->io_remaining)) {
-                struct workqueue_struct *wq;
-
-                wq = (ioend->io_type == IO_UNWRITTEN) ?
-                        xfsconvertd_workqueue : xfsdatad_workqueue;
-                queue_work(wq, &ioend->io_work);
-                if (wait)
-                        flush_workqueue(wq);
+                if (ioend->io_type == IO_UNWRITTEN)
+                        queue_work(xfsconvertd_workqueue, &ioend->io_work);
+                else
+                        queue_work(xfsdatad_workqueue, &ioend->io_work);
         }
 }
@@ -262,7 +256,7 @@ xfs_end_io(
          */
         if (error == EAGAIN) {
                 atomic_inc(&ioend->io_remaining);
-                xfs_finish_ioend(ioend, 0);
+                xfs_finish_ioend(ioend);
                 /* ensure we don't spin on blocked ioends */
                 delay(1);
         } else {
@@ -272,6 +266,17 @@ xfs_end_io(
         }
 }
 
+/*
+ * Call IO completion handling in caller context on the final put of an ioend.
+ */
+STATIC void
+xfs_finish_ioend_sync(
+        struct xfs_ioend        *ioend)
+{
+        if (atomic_dec_and_test(&ioend->io_remaining))
+                xfs_end_io(&ioend->io_work);
+}
+
 /*
  * Allocate and initialise an IO completion structure.
  * We need to track unwritten extent write completion here initially.
@@ -353,7 +358,7 @@ xfs_end_bio(
         bio->bi_end_io = NULL;
         bio_put(bio);
 
-        xfs_finish_ioend(ioend, 0);
+        xfs_finish_ioend(ioend);
 }
 
 STATIC void
@@ -495,7 +500,7 @@ xfs_submit_ioend(
                 }
                 if (bio)
                         xfs_submit_ioend_bio(wbc, ioend, bio);
-                xfs_finish_ioend(ioend, 0);
+                xfs_finish_ioend(ioend);
         } while ((ioend = next) != NULL);
 }
@@ -1406,70 +1411,56 @@ xfs_get_blocks_direct(
         return __xfs_get_blocks(inode, iblock, bh_result, create, 1);
 }
 
+/*
+ * Complete a direct I/O write request.
+ *
+ * If the private argument is non-NULL __xfs_get_blocks signals us that we
+ * need to issue a transaction to convert the range from unwritten to written
+ * extents.  In case this is regular synchronous I/O we just call xfs_end_io
+ * to do this and we are done.  But in case this was a successfull AIO
+ * request this handler is called from interrupt context, from which we
+ * can't start transactions.  In that case offload the I/O completion to
+ * the workqueues we also use for buffered I/O completion.
+ */
 STATIC void
-xfs_end_io_direct(
-        struct kiocb    *iocb,
-        loff_t          offset,
-        ssize_t         size,
-        void            *private,
-        int             ret,
-        bool            is_async)
+xfs_end_io_direct_write(
+        struct kiocb            *iocb,
+        loff_t                  offset,
+        ssize_t                 size,
+        void                    *private,
+        int                     ret,
+        bool                    is_async)
 {
-        xfs_ioend_t     *ioend = iocb->private;
-        bool            complete_aio = is_async;
+        struct xfs_ioend        *ioend = iocb->private;
 
         /*
-         * Non-NULL private data means we need to issue a transaction to
-         * convert a range from unwritten to written extents.  This needs
-         * to happen from process context but aio+dio I/O completion
-         * happens from irq context so we need to defer it to a workqueue.
-         * This is not necessary for synchronous direct I/O, but we do
-         * it anyway to keep the code uniform and simpler.
-         *
-         * Well, if only it were that simple. Because synchronous direct I/O
-         * requires extent conversion to occur *before* we return to userspace,
-         * we have to wait for extent conversion to complete. Look at the
-         * iocb that has been passed to us to determine if this is AIO or
-         * not. If it is synchronous, tell xfs_finish_ioend() to kick the
-         * workqueue and wait for it to complete.
-         *
-         * The core direct I/O code might be changed to always call the
-         * completion handler in the future, in which case all this can
-         * go away.
+         * blockdev_direct_IO can return an error even after the I/O
+         * completion handler was called.  Thus we need to protect
+         * against double-freeing.
          */
+        iocb->private = NULL;
+
         ioend->io_offset = offset;
         ioend->io_size = size;
-        if (ioend->io_type == IO_READ) {
-                xfs_finish_ioend(ioend, 0);
-        } else if (private && size > 0) {
-                if (is_async) {
+        if (private && size > 0)
+                ioend->io_type = IO_UNWRITTEN;
+
+        if (is_async) {
+                /*
+                 * If we are converting an unwritten extent we need to delay
+                 * the AIO completion until after the unwrittent extent
+                 * conversion has completed, otherwise do it ASAP.
+                 */
+                if (ioend->io_type == IO_UNWRITTEN) {
                         ioend->io_iocb = iocb;
                         ioend->io_result = ret;
-                        complete_aio = false;
-                        xfs_finish_ioend(ioend, 0);
                 } else {
-                        xfs_finish_ioend(ioend, 1);
+                        aio_complete(iocb, ret, 0);
                 }
+                xfs_finish_ioend(ioend);
         } else {
-                /*
-                 * A direct I/O write ioend starts it's life in unwritten
-                 * state in case they map an unwritten extent. This write
-                 * didn't map an unwritten extent so switch it's completion
-                 * handler.
-                 */
-                ioend->io_type = IO_NEW;
-                xfs_finish_ioend(ioend, 0);
+                xfs_finish_ioend_sync(ioend);
         }
-
-        /*
-         * blockdev_direct_IO can return an error even after the I/O
-         * completion handler was called.  Thus we need to protect
-         * against double-freeing.
-         */
-        iocb->private = NULL;
-
-        if (complete_aio)
-                aio_complete(iocb, ret, 0);
 }
 
 STATIC ssize_t
@@ -1480,23 +1471,26 @@ xfs_vm_direct_IO(
         loff_t                  offset,
         unsigned long           nr_segs)
 {
-        struct file     *file = iocb->ki_filp;
-        struct inode    *inode = file->f_mapping->host;
-        struct block_device *bdev;
-        ssize_t         ret;
-
-        bdev = xfs_find_bdev_for_inode(inode);
-
-        iocb->private = xfs_alloc_ioend(inode, rw == WRITE ?
-                                        IO_UNWRITTEN : IO_READ);
-
-        ret = blockdev_direct_IO_no_locking(rw, iocb, inode, bdev, iov,
-                                            offset, nr_segs,
-                                            xfs_get_blocks_direct,
-                                            xfs_end_io_direct);
-
-        if (unlikely(ret != -EIOCBQUEUED && iocb->private))
-                xfs_destroy_ioend(iocb->private);
+        struct inode            *inode = iocb->ki_filp->f_mapping->host;
+        struct block_device     *bdev = xfs_find_bdev_for_inode(inode);
+        ssize_t                 ret;
+
+        if (rw & WRITE) {
+                iocb->private = xfs_alloc_ioend(inode, IO_NEW);
+
+                ret = blockdev_direct_IO_no_locking(rw, iocb, inode, bdev, iov,
+                                                    offset, nr_segs,
+                                                    xfs_get_blocks_direct,
+                                                    xfs_end_io_direct_write);
+                if (ret != -EIOCBQUEUED && iocb->private)
+                        xfs_destroy_ioend(iocb->private);
+        } else {
+                ret = blockdev_direct_IO_no_locking(rw, iocb, inode, bdev, iov,
+                                                    offset, nr_segs,
+                                                    xfs_get_blocks_direct,
+                                                    NULL);
+        }
+
         return ret;
 }