-rw-r--r--  fs/io_uring.c  281
1 files changed, 229 insertions, 52 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 85b914bd823c..5d99376d2369 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -94,6 +94,16 @@ struct io_mapped_ubuf {
unsigned int nr_bvecs;
};
+struct async_list {
+ spinlock_t lock;
+ atomic_t cnt;
+ struct list_head list;
+
+ struct file *file;
+ off_t io_end;
+ size_t io_pages;
+};
+
struct io_ring_ctx {
struct {
struct percpu_ref refs;
@@ -164,6 +174,8 @@ struct io_ring_ctx {
struct list_head cancel_list;
} ____cacheline_aligned_in_smp;
+ struct async_list pending_async[2];
+
#if defined(CONFIG_UNIX)
struct socket *ring_sock;
#endif
@@ -201,6 +213,7 @@ struct io_kiocb {
#define REQ_F_FORCE_NONBLOCK 1 /* inline submission attempt */
#define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */
#define REQ_F_FIXED_FILE 4 /* ctx owns file */
+#define REQ_F_SEQ_PREV 8 /* sequential with previous */
u64 user_data;
u64 error;
@@ -257,6 +270,7 @@ static void io_ring_ctx_ref_free(struct percpu_ref *ref)
static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
{
struct io_ring_ctx *ctx;
+ int i;
ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
if (!ctx)
@@ -272,6 +286,11 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
init_completion(&ctx->ctx_done);
mutex_init(&ctx->uring_lock);
init_waitqueue_head(&ctx->wait);
+ for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
+ spin_lock_init(&ctx->pending_async[i].lock);
+ INIT_LIST_HEAD(&ctx->pending_async[i].list);
+ atomic_set(&ctx->pending_async[i].cnt, 0);
+ }
spin_lock_init(&ctx->completion_lock);
INIT_LIST_HEAD(&ctx->poll_list);
INIT_LIST_HEAD(&ctx->cancel_list);
@@ -885,6 +904,47 @@ static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
}
+/*
+ * Make a note of the last file/offset/direction we punted to async
+ * context. We'll use this information to see if we can piggy back a
+ * sequential request onto the previous one, if it still hasn't been
+ * completed by the async worker.
+ */
+static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
+{
+ struct async_list *async_list = &req->ctx->pending_async[rw];
+ struct kiocb *kiocb = &req->rw;
+ struct file *filp = kiocb->ki_filp;
+ off_t io_end = kiocb->ki_pos + len;
+
+ if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
+ unsigned long max_pages;
+
+ /* Use 8x RA size as a decent limiter for both reads/writes */
+ max_pages = filp->f_ra.ra_pages;
+ if (!max_pages)
+ max_pages = VM_MAX_READAHEAD >> (PAGE_SHIFT - 10);
+ max_pages *= 8;
+
+ /* If max pages are exceeded, reset the state */
+ len >>= PAGE_SHIFT;
+ if (async_list->io_pages + len <= max_pages) {
+ req->flags |= REQ_F_SEQ_PREV;
+ async_list->io_pages += len;
+ } else {
+ io_end = 0;
+ async_list->io_pages = 0;
+ }
+ }
+
+ /* New file? Reset state. */
+ if (async_list->file != filp) {
+ async_list->io_pages = 0;
+ async_list->file = filp;
+ }
+ async_list->io_end = io_end;
+}
+
static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
bool force_nonblock, struct io_submit_state *state)
{
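
To make the bookkeeping added above easier to follow, here is a stand-alone user-space sketch of the same sequence-detection logic. It is an illustration only, not part of the patch: it assumes 4 KiB pages and the default 128 KiB readahead window, and tracks a plain fd where the kernel tracks a struct file.

/* Illustrative user-space model of io_async_list_note(); simplified types,
 * assumed 4 KiB pages and a 128 KiB readahead window. Not kernel code. */
#include <stdbool.h>
#include <stdio.h>

#define PAGE_SHIFT 12
#define RA_PAGES   (128 * 1024 >> PAGE_SHIFT)  /* assumed readahead size */

struct seq_state {
    int  fd;        /* stands in for async_list->file   */
    long io_end;    /* stands in for async_list->io_end */
    long io_pages;  /* pages accumulated on this stream */
};

/* Returns true if (fd, pos, len) continues the previously punted stream
 * and stays under the 8x readahead budget, mirroring REQ_F_SEQ_PREV. */
static bool note_sequential(struct seq_state *st, int fd, long pos, long len)
{
    long io_end = pos + len;
    bool seq = false;

    if (fd == st->fd && pos == st->io_end) {
        long max_pages = 8 * RA_PAGES;

        len >>= PAGE_SHIFT;
        if (st->io_pages + len <= max_pages) {
            seq = true;
            st->io_pages += len;
        } else {
            io_end = 0;
            st->io_pages = 0;
        }
    }

    if (st->fd != fd) {
        st->io_pages = 0;
        st->fd = fd;
    }
    st->io_end = io_end;
    return seq;
}

int main(void)
{
    struct seq_state st = { .fd = -1 };

    /* Two back-to-back 64 KiB reads on fd 3: the second is sequential. */
    printf("%d\n", note_sequential(&st, 3, 0,      65536)); /* 0 */
    printf("%d\n", note_sequential(&st, 3, 65536,  65536)); /* 1 */
    /* A read on a different fd resets the state. */
    printf("%d\n", note_sequential(&st, 4, 131072, 65536)); /* 0 */
    return 0;
}
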
@@ -892,6 +952,7 @@ static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
struct kiocb *kiocb = &req->rw;
struct iov_iter iter;
struct file *file;
+ size_t iov_count;
ssize_t ret;
ret = io_prep_rw(req, s, force_nonblock, state);
@@ -910,16 +971,24 @@ static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
if (ret)
goto out_fput;
- ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_iter_count(&iter));
+ iov_count = iov_iter_count(&iter);
+ ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
if (!ret) {
ssize_t ret2;
/* Catch -EAGAIN return for forced non-blocking submission */
ret2 = call_read_iter(file, kiocb, &iter);
- if (!force_nonblock || ret2 != -EAGAIN)
+ if (!force_nonblock || ret2 != -EAGAIN) {
io_rw_done(kiocb, ret2);
- else
+ } else {
+ /*
+ * If ->needs_lock is true, we're already in async
+ * context.
+ */
+ if (!s->needs_lock)
+ io_async_list_note(READ, req, iov_count);
ret = -EAGAIN;
+ }
}
kfree(iovec);
out_fput:
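
The read path above now tries the buffered read non-blocking under force_nonblock and, on -EAGAIN, records the sequential hint before punting. A loose user-space analogue of that try-then-fall-back pattern, shown only as an illustration, is preadv2() with RWF_NOWAIT on kernels that support it; "example.dat" is a placeholder file name.

/* User-space analogue of the try-nonblocking-then-punt pattern used above.
 * Illustration only; "example.dat" is a placeholder name. */
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/uio.h>
#include <unistd.h>

int main(void)
{
    char buf[4096];
    struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
    int fd = open("example.dat", O_RDONLY);
    ssize_t ret;

    if (fd < 0)
        return 1;

    /* First attempt: don't block if the pages aren't cached yet. */
    ret = preadv2(fd, &iov, 1, 0, RWF_NOWAIT);
    if (ret < 0 && errno == EAGAIN) {
        /* "Punt": fall back to a blocking read, much as the kernel falls
         * back to the async workqueue for the io_uring request. */
        ret = preadv(fd, &iov, 1, 0);
    }
    printf("read %zd bytes\n", ret);
    close(fd);
    return ret < 0;
}
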
@@ -936,14 +1005,12 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
struct kiocb *kiocb = &req->rw;
struct iov_iter iter;
struct file *file;
+ size_t iov_count;
ssize_t ret;
ret = io_prep_rw(req, s, force_nonblock, state);
if (ret)
return ret;
- /* Hold on to the file for -EAGAIN */
- if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT))
- return -EAGAIN;
ret = -EBADF;
file = kiocb->ki_filp;
@@ -957,8 +1024,17 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
if (ret)
goto out_fput;
- ret = rw_verify_area(WRITE, file, &kiocb->ki_pos,
- iov_iter_count(&iter));
+ iov_count = iov_iter_count(&iter);
+
+ ret = -EAGAIN;
+ if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
+ /* If ->needs_lock is true, we're already in async context. */
+ if (!s->needs_lock)
+ io_async_list_note(WRITE, req, iov_count);
+ goto out_free;
+ }
+
+ ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
if (!ret) {
/*
* Open-code file_start_write here to grab freeze protection,
@@ -976,9 +1052,11 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
kiocb->ki_flags |= IOCB_WRITE;
io_rw_done(kiocb, call_write_iter(file, kiocb, &iter));
}
+out_free:
kfree(iovec);
out_fput:
- if (unlikely(ret))
+ /* Hold on to the file for -EAGAIN */
+ if (unlikely(ret && ret != -EAGAIN))
io_fput(req);
return ret;
}
@@ -1376,6 +1454,21 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
return 0;
}
+static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
+ const struct io_uring_sqe *sqe)
+{
+ switch (sqe->opcode) {
+ case IORING_OP_READV:
+ case IORING_OP_READ_FIXED:
+ return &ctx->pending_async[READ];
+ case IORING_OP_WRITEV:
+ case IORING_OP_WRITE_FIXED:
+ return &ctx->pending_async[WRITE];
+ default:
+ return NULL;
+ }
+}
+
static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
{
u8 opcode = READ_ONCE(sqe->opcode);
@@ -1387,61 +1480,138 @@ static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
static void io_sq_wq_submit_work(struct work_struct *work)
{
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
- struct sqe_submit *s = &req->submit;
- const struct io_uring_sqe *sqe = s->sqe;
struct io_ring_ctx *ctx = req->ctx;
+ struct mm_struct *cur_mm = NULL;
+ struct async_list *async_list;
+ LIST_HEAD(req_list);
mm_segment_t old_fs;
- bool needs_user;
int ret;
- /* Ensure we clear previously set forced non-block flag */
- req->flags &= ~REQ_F_FORCE_NONBLOCK;
- req->rw.ki_flags &= ~IOCB_NOWAIT;
+ async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
+restart:
+ do {
+ struct sqe_submit *s = &req->submit;
+ const struct io_uring_sqe *sqe = s->sqe;
+
+ /* Ensure we clear previously set forced non-block flag */
+ req->flags &= ~REQ_F_FORCE_NONBLOCK;
+ req->rw.ki_flags &= ~IOCB_NOWAIT;
+
+ ret = 0;
+ if (io_sqe_needs_user(sqe) && !cur_mm) {
+ if (!mmget_not_zero(ctx->sqo_mm)) {
+ ret = -EFAULT;
+ } else {
+ cur_mm = ctx->sqo_mm;
+ use_mm(cur_mm);
+ old_fs = get_fs();
+ set_fs(USER_DS);
+ }
+ }
+
+ if (!ret) {
+ s->has_user = cur_mm != NULL;
+ s->needs_lock = true;
+ do {
+ ret = __io_submit_sqe(ctx, req, s, false, NULL);
+ /*
+ * We can get EAGAIN for polled IO even though
+ * we're forcing a sync submission from here,
+ * since we can't wait for request slots on the
+ * block side.
+ */
+ if (ret != -EAGAIN)
+ break;
+ cond_resched();
+ } while (1);
+ }
+ if (ret) {
+ io_cqring_add_event(ctx, sqe->user_data, ret, 0);
+ io_free_req(req);
+ }
- s->needs_lock = true;
- s->has_user = false;
+ /* async context always uses a copy of the sqe */
+ kfree(sqe);
+
+ if (!async_list)
+ break;
+ if (!list_empty(&req_list)) {
+ req = list_first_entry(&req_list, struct io_kiocb,
+ list);
+ list_del(&req->list);
+ continue;
+ }
+ if (list_empty(&async_list->list))
+ break;
+
+ req = NULL;
+ spin_lock(&async_list->lock);
+ if (list_empty(&async_list->list)) {
+ spin_unlock(&async_list->lock);
+ break;
+ }
+ list_splice_init(&async_list->list, &req_list);
+ spin_unlock(&async_list->lock);
+
+ req = list_first_entry(&req_list, struct io_kiocb, list);
+ list_del(&req->list);
+ } while (req);
/*
- * If we're doing IO to fixed buffers, we don't need to get/set
- * user context
+ * Rare case of racing with a submitter. If we find the count has
+ * dropped to zero AND we have pending work items, then restart
+ * the processing. This is a tiny race window.
*/
- needs_user = io_sqe_needs_user(s->sqe);
- if (needs_user) {
- if (!mmget_not_zero(ctx->sqo_mm)) {
- ret = -EFAULT;
- goto err;
+ if (async_list) {
+ ret = atomic_dec_return(&async_list->cnt);
+ while (!ret && !list_empty(&async_list->list)) {
+ spin_lock(&async_list->lock);
+ atomic_inc(&async_list->cnt);
+ list_splice_init(&async_list->list, &req_list);
+ spin_unlock(&async_list->lock);
+
+ if (!list_empty(&req_list)) {
+ req = list_first_entry(&req_list,
+ struct io_kiocb, list);
+ list_del(&req->list);
+ goto restart;
+ }
+ ret = atomic_dec_return(&async_list->cnt);
}
- use_mm(ctx->sqo_mm);
- old_fs = get_fs();
- set_fs(USER_DS);
- s->has_user = true;
}
- do {
- ret = __io_submit_sqe(ctx, req, s, false, NULL);
- /*
- * We can get EAGAIN for polled IO even though we're forcing
- * a sync submission from here, since we can't wait for
- * request slots on the block side.
- */
- if (ret != -EAGAIN)
- break;
- cond_resched();
- } while (1);
-
- if (needs_user) {
+ if (cur_mm) {
set_fs(old_fs);
- unuse_mm(ctx->sqo_mm);
- mmput(ctx->sqo_mm);
- }
-err:
- if (ret) {
- io_cqring_add_event(ctx, sqe->user_data, ret, 0);
- io_free_req(req);
+ unuse_mm(cur_mm);
+ mmput(cur_mm);
}
+}
- /* async context always use a copy of the sqe */
- kfree(sqe);
+/*
+ * See if we can piggy back onto previously submitted work that is still
+ * running. We currently only allow this if the new request is sequential
+ * to the previous one we punted.
+ */
+static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
+{
+ bool ret = false;
+
+ if (!list)
+ return false;
+ if (!(req->flags & REQ_F_SEQ_PREV))
+ return false;
+ if (!atomic_read(&list->cnt))
+ return false;
+
+ ret = true;
+ spin_lock(&list->lock);
+ list_add_tail(&req->list, &list->list);
+ if (!atomic_read(&list->cnt)) {
+ list_del_init(&req->list);
+ ret = false;
+ }
+ spin_unlock(&list->lock);
+ return ret;
}
static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
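
The submitter/worker handoff above closes a small race: the submitter enqueues and then re-checks the worker count, while the exiting worker drops its count and then re-checks the list. The following user-space sketch shows that protocol with C11 atomics and a mutex; names are invented for the sketch and the REQ_F_SEQ_PREV check is omitted for brevity.

/* Sketch of the handoff protocol between io_add_to_prev_work() and the tail
 * of io_sq_wq_submit_work(), using a singly linked list in user space. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct work { struct work *next; };

struct worklist {
    pthread_mutex_t lock;
    atomic_int      cnt;    /* number of active workers */
    struct work    *head;   /* pending piggy-backed work */
};

/* Submitter side: enqueue first, then re-check cnt; if the worker exited
 * in between, take the item back and report failure. */
static bool add_to_prev_work(struct worklist *wl, struct work *w)
{
    bool queued = true;

    if (atomic_load(&wl->cnt) == 0)
        return false;            /* no worker running: queue normally */

    pthread_mutex_lock(&wl->lock);
    w->next = wl->head;
    wl->head = w;
    if (atomic_load(&wl->cnt) == 0) {
        wl->head = w->next;      /* worker exited: undo and fall back */
        queued = false;
    }
    pthread_mutex_unlock(&wl->lock);
    return queued;
}

/* Worker side: drop our count, but if work sneaked in meanwhile, re-take
 * the count, grab the work and keep running. */
static struct work *worker_exit(struct worklist *wl)
{
    struct work *more = NULL;
    int ret = atomic_fetch_sub(&wl->cnt, 1) - 1;

    while (ret == 0 && wl->head != NULL) {
        pthread_mutex_lock(&wl->lock);
        atomic_fetch_add(&wl->cnt, 1);
        more = wl->head;
        wl->head = NULL;
        pthread_mutex_unlock(&wl->lock);

        if (more)
            return more;         /* keep running with the new work */
        ret = atomic_fetch_sub(&wl->cnt, 1) - 1;
    }
    return NULL;
}

int main(void)
{
    struct worklist wl = { .lock = PTHREAD_MUTEX_INITIALIZER };
    struct work w1, w2;

    /* No worker running: the submitter must queue a fresh work item. */
    printf("%d\n", add_to_prev_work(&wl, &w1));   /* 0 */

    /* One worker running: piggy-back, then the exiting worker picks it up. */
    atomic_store(&wl.cnt, 1);
    printf("%d\n", add_to_prev_work(&wl, &w2));   /* 1 */
    printf("%d\n", worker_exit(&wl) == &w2);      /* 1 */
    return 0;
}
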
@@ -1466,12 +1636,19 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
if (sqe_copy) {
+ struct async_list *list;
+
memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
s->sqe = sqe_copy;
memcpy(&req->submit, s, sizeof(*s));
- INIT_WORK(&req->work, io_sq_wq_submit_work);
- queue_work(ctx->sqo_wq, &req->work);
+ list = io_async_list_from_sqe(ctx, s->sqe);
+ if (!io_add_to_prev_work(list, req)) {
+ if (list)
+ atomic_inc(&list->cnt);
+ INIT_WORK(&req->work, io_sq_wq_submit_work);
+ queue_work(ctx->sqo_wq, &req->work);
+ }
ret = 0;
}
}
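
For context, the workload this patch targets is a stream of sequential buffered reads or writes submitted through io_uring, where uncached data forces each request to be punted with -EAGAIN. A minimal example using the liburing helper library (assumed available; "data.bin" is a placeholder name) that would generate such back-to-back punted, sequential requests:

/* Sequential buffered reads via io_uring; with this patch, requests that get
 * punted to async context can be drained by a single workqueue item.
 * Illustration only: assumes liburing is installed. */
#include <fcntl.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/uio.h>
#include <unistd.h>

#define CHUNK   65536
#define NCHUNKS 8

int main(void)
{
    struct io_uring ring;
    struct io_uring_cqe *cqe;
    struct iovec iov[NCHUNKS];
    int fd, i;

    fd = open("data.bin", O_RDONLY);   /* buffered: no O_DIRECT */
    if (fd < 0 || io_uring_queue_init(NCHUNKS, &ring, 0) < 0)
        return 1;

    /* Queue NCHUNKS sequential 64 KiB reads; sequential offsets are what
     * lets punted requests piggy-back onto the previous work item. */
    for (i = 0; i < NCHUNKS; i++) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);

        iov[i].iov_base = malloc(CHUNK);
        iov[i].iov_len = CHUNK;
        io_uring_prep_readv(sqe, fd, &iov[i], 1, (off_t)i * CHUNK);
    }
    io_uring_submit(&ring);

    for (i = 0; i < NCHUNKS; i++) {
        if (io_uring_wait_cqe(&ring, &cqe) < 0)
            break;
        printf("read %d: %d bytes\n", i, cqe->res);
        io_uring_cqe_seen(&ring, cqe);
    }

    io_uring_queue_exit(&ring);
    close(fd);
    return 0;
}
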