提交 a31ad380 编写于 作者: K Kent Overstreet 提交者: Linus Torvalds

aio: make aio_read_evt() more efficient, convert to hrtimers

Previously, aio_read_event() pulled a single completion off the
ringbuffer at a time, locking and unlocking each time.  Change it to
pull off as many events as it can at a time, and copy them directly to
userspace.

This also fixes a bug where if copying the event to userspace failed,
we'd lose the event.

Also convert it to wait_event_interruptible_hrtimeout(), which
simplifies it quite a bit.

[akpm@linux-foundation.org: coding-style fixes]
Signed-off-by: NKent Overstreet <koverstreet@google.com>
Cc: Zach Brown <zab@redhat.com>
Cc: Felipe Balbi <balbi@ti.com>
Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Cc: Mark Fasheh <mfasheh@suse.com>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Rusty Russell <rusty@rustcorp.com.au>
Cc: Jens Axboe <axboe@kernel.dk>
Cc: Asai Thambi S P <asamymuthupa@micron.com>
Cc: Selvan Mani <smani@micron.com>
Cc: Sam Bradshaw <sbradshaw@micron.com>
Cc: Jeff Moyer <jmoyer@redhat.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Reviewed-by: N"Theodore Ts'o" <tytso@mit.edu>
Signed-off-by: NAndrew Morton <akpm@linux-foundation.org>
Signed-off-by: NLinus Torvalds <torvalds@linux-foundation.org>
上级 774a08b3
...@@ -63,7 +63,7 @@ struct aio_ring_info { ...@@ -63,7 +63,7 @@ struct aio_ring_info {
unsigned long mmap_size; unsigned long mmap_size;
struct page **ring_pages; struct page **ring_pages;
spinlock_t ring_lock; struct mutex ring_lock;
long nr_pages; long nr_pages;
unsigned nr, tail; unsigned nr, tail;
...@@ -344,7 +344,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) ...@@ -344,7 +344,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
atomic_set(&ctx->users, 2); atomic_set(&ctx->users, 2);
atomic_set(&ctx->dead, 0); atomic_set(&ctx->dead, 0);
spin_lock_init(&ctx->ctx_lock); spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->ring_info.ring_lock); mutex_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait); init_waitqueue_head(&ctx->wait);
INIT_LIST_HEAD(&ctx->active_reqs); INIT_LIST_HEAD(&ctx->active_reqs);
...@@ -748,187 +748,127 @@ void aio_complete(struct kiocb *iocb, long res, long res2) ...@@ -748,187 +748,127 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
} }
EXPORT_SYMBOL(aio_complete); EXPORT_SYMBOL(aio_complete);
/* aio_read_evt /* aio_read_events
* Pull an event off of the ioctx's event ring. Returns the number of * Pull an event off of the ioctx's event ring. Returns the number of
* events fetched (0 or 1 ;-) * events fetched
* FIXME: make this use cmpxchg.
* TODO: make the ringbuffer user mmap()able (requires FIXME).
*/ */
static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent) static long aio_read_events_ring(struct kioctx *ctx,
struct io_event __user *event, long nr)
{ {
struct aio_ring_info *info = &ioctx->ring_info; struct aio_ring_info *info = &ctx->ring_info;
struct aio_ring *ring; struct aio_ring *ring;
unsigned long head; unsigned head, pos;
int ret = 0; long ret = 0;
int copy_ret;
mutex_lock(&info->ring_lock);
ring = kmap_atomic(info->ring_pages[0]); ring = kmap_atomic(info->ring_pages[0]);
pr_debug("h%u t%u m%u\n", ring->head, ring->tail, ring->nr); head = ring->head;
kunmap_atomic(ring);
pr_debug("h%u t%u m%u\n", head, info->tail, info->nr);
if (ring->head == ring->tail) if (head == info->tail)
goto out; goto out;
spin_lock(&info->ring_lock); while (ret < nr) {
long avail;
struct io_event *ev;
struct page *page;
head = ring->head % info->nr; avail = (head <= info->tail ? info->tail : info->nr) - head;
if (head != ring->tail) { if (head == info->tail)
struct io_event *evp = aio_ring_event(info, head); break;
*ent = *evp;
head = (head + 1) % info->nr; avail = min(avail, nr - ret);
smp_mb(); /* finish reading the event before updatng the head */ avail = min_t(long, avail, AIO_EVENTS_PER_PAGE -
ring->head = head; ((head + AIO_EVENTS_OFFSET) % AIO_EVENTS_PER_PAGE));
ret = 1;
put_aio_ring_event(evp); pos = head + AIO_EVENTS_OFFSET;
page = info->ring_pages[pos / AIO_EVENTS_PER_PAGE];
pos %= AIO_EVENTS_PER_PAGE;
ev = kmap(page);
copy_ret = copy_to_user(event + ret, ev + pos,
sizeof(*ev) * avail);
kunmap(page);
if (unlikely(copy_ret)) {
ret = -EFAULT;
goto out;
} }
spin_unlock(&info->ring_lock);
out: ret += avail;
head += avail;
head %= info->nr;
}
ring = kmap_atomic(info->ring_pages[0]);
ring->head = head;
kunmap_atomic(ring); kunmap_atomic(ring);
pr_debug("%d h%u t%u\n", ret, ring->head, ring->tail);
pr_debug("%li h%u t%u\n", ret, head, info->tail);
out:
mutex_unlock(&info->ring_lock);
return ret; return ret;
} }
struct aio_timeout { static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
struct timer_list timer; struct io_event __user *event, long *i)
int timed_out;
struct task_struct *p;
};
static void timeout_func(unsigned long data)
{ {
struct aio_timeout *to = (struct aio_timeout *)data; long ret = aio_read_events_ring(ctx, event + *i, nr - *i);
to->timed_out = 1; if (ret > 0)
wake_up_process(to->p); *i += ret;
}
static inline void init_timeout(struct aio_timeout *to) if (unlikely(atomic_read(&ctx->dead)))
{ ret = -EINVAL;
setup_timer_on_stack(&to->timer, timeout_func, (unsigned long) to);
to->timed_out = 0;
to->p = current;
}
static inline void set_timeout(long start_jiffies, struct aio_timeout *to, if (!*i)
const struct timespec *ts) *i = ret;
{
to->timer.expires = start_jiffies + timespec_to_jiffies(ts);
if (time_after(to->timer.expires, jiffies))
add_timer(&to->timer);
else
to->timed_out = 1;
}
static inline void clear_timeout(struct aio_timeout *to) return ret < 0 || *i >= min_nr;
{
del_singleshot_timer_sync(&to->timer);
} }
static int read_events(struct kioctx *ctx, static long read_events(struct kioctx *ctx, long min_nr, long nr,
long min_nr, long nr,
struct io_event __user *event, struct io_event __user *event,
struct timespec __user *timeout) struct timespec __user *timeout)
{ {
long start_jiffies = jiffies; ktime_t until = { .tv64 = KTIME_MAX };
struct task_struct *tsk = current; long ret = 0;
DECLARE_WAITQUEUE(wait, tsk);
int ret;
int i = 0;
struct io_event ent;
struct aio_timeout to;
/* needed to zero any padding within an entry (there shouldn't be
* any, but C is fun!
*/
memset(&ent, 0, sizeof(ent));
ret = 0;
while (likely(i < nr)) {
ret = aio_read_evt(ctx, &ent);
if (unlikely(ret <= 0))
break;
pr_debug("%Lx %Lx %Lx %Lx\n",
ent.data, ent.obj, ent.res, ent.res2);
/* Could we split the check in two? */
ret = -EFAULT;
if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
pr_debug("lost an event due to EFAULT.\n");
break;
}
ret = 0;
/* Good, event copied to userland, update counts. */
event ++;
i ++;
}
if (min_nr <= i)
return i;
if (ret)
return ret;
/* End fast path */
init_timeout(&to);
if (timeout) { if (timeout) {
struct timespec ts; struct timespec ts;
ret = -EFAULT;
if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
goto out;
set_timeout(start_jiffies, &to, &ts); if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
} return -EFAULT;
while (likely(i < nr)) { until = timespec_to_ktime(ts);
add_wait_queue_exclusive(&ctx->wait, &wait);
do {
set_task_state(tsk, TASK_INTERRUPTIBLE);
ret = aio_read_evt(ctx, &ent);
if (ret)
break;
if (min_nr <= i)
break;
if (unlikely(atomic_read(&ctx->dead))) {
ret = -EINVAL;
break;
} }
if (to.timed_out) /* Only check after read evt */
break;
/* Try to only show up in io wait if there are ops
* in flight */
if (atomic_read(&ctx->reqs_active))
io_schedule();
else
schedule();
if (signal_pending(tsk)) {
ret = -EINTR;
break;
}
/*ret = aio_read_evt(ctx, &ent);*/
} while (1) ;
set_task_state(tsk, TASK_RUNNING); /*
remove_wait_queue(&ctx->wait, &wait); * Note that aio_read_events() is being called as the conditional - i.e.
* we're calling it after prepare_to_wait() has set task state to
if (unlikely(ret <= 0)) * TASK_INTERRUPTIBLE.
break; *
* But aio_read_events() can block, and if it blocks it's going to flip
ret = -EFAULT; * the task state back to TASK_RUNNING.
if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) { *
pr_debug("lost an event due to EFAULT.\n"); * This should be ok, provided it doesn't flip the state back to
break; * TASK_RUNNING and return 0 too much - that causes us to spin. That
} * will only happen if the mutex_lock() call blocks, and we then find
* the ringbuffer empty. So in practice we should be ok, but it's
* something to be aware of when touching this code.
*/
wait_event_interruptible_hrtimeout(ctx->wait,
aio_read_events(ctx, min_nr, nr, event, &ret), until);
/* Good, event copied to userland, update counts. */ if (!ret && signal_pending(current))
event ++; ret = -EINTR;
i ++;
}
if (timeout) return ret;
clear_timeout(&to);
out:
destroy_timer_on_stack(&to.timer);
return i ? i : ret;
} }
/* sys_io_setup: /* sys_io_setup:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册