summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--fs/iomap/direct-io.c163
-rw-r--r--include/linux/fs.h35
-rw-r--r--io_uring/rw.c27
3 files changed, 180 insertions, 45 deletions
diff --git a/fs/iomap/direct-io.c b/fs/iomap/direct-io.c
index ea3b868c8355..bcd3f8cf5ea4 100644
--- a/fs/iomap/direct-io.c
+++ b/fs/iomap/direct-io.c
@@ -20,10 +20,12 @@
* Private flags for iomap_dio, must not overlap with the public ones in
* iomap.h:
*/
-#define IOMAP_DIO_WRITE_FUA (1 << 28)
-#define IOMAP_DIO_NEED_SYNC (1 << 29)
-#define IOMAP_DIO_WRITE (1 << 30)
-#define IOMAP_DIO_DIRTY (1 << 31)
+#define IOMAP_DIO_CALLER_COMP (1U << 26)
+#define IOMAP_DIO_INLINE_COMP (1U << 27)
+#define IOMAP_DIO_WRITE_THROUGH (1U << 28)
+#define IOMAP_DIO_NEED_SYNC (1U << 29)
+#define IOMAP_DIO_WRITE (1U << 30)
+#define IOMAP_DIO_DIRTY (1U << 31)
struct iomap_dio {
struct kiocb *iocb;
@@ -41,7 +43,6 @@ struct iomap_dio {
struct {
struct iov_iter *iter;
struct task_struct *waiter;
- struct bio *poll_bio;
} submit;
/* used for aio completion: */
@@ -63,12 +64,14 @@ static struct bio *iomap_dio_alloc_bio(const struct iomap_iter *iter,
static void iomap_dio_submit_bio(const struct iomap_iter *iter,
struct iomap_dio *dio, struct bio *bio, loff_t pos)
{
+ struct kiocb *iocb = dio->iocb;
+
atomic_inc(&dio->ref);
/* Sync dio can't be polled reliably */
- if ((dio->iocb->ki_flags & IOCB_HIPRI) && !is_sync_kiocb(dio->iocb)) {
- bio_set_polled(bio, dio->iocb);
- dio->submit.poll_bio = bio;
+ if ((iocb->ki_flags & IOCB_HIPRI) && !is_sync_kiocb(iocb)) {
+ bio_set_polled(bio, iocb);
+ WRITE_ONCE(iocb->private, bio);
}
if (dio->dops && dio->dops->submit_io)
@@ -130,6 +133,11 @@ ssize_t iomap_dio_complete(struct iomap_dio *dio)
}
EXPORT_SYMBOL_GPL(iomap_dio_complete);
+static ssize_t iomap_dio_deferred_complete(void *data)
+{
+ return iomap_dio_complete(data);
+}
+
static void iomap_dio_complete_work(struct work_struct *work)
{
struct iomap_dio *dio = container_of(work, struct iomap_dio, aio.work);
@@ -152,27 +160,69 @@ void iomap_dio_bio_end_io(struct bio *bio)
{
struct iomap_dio *dio = bio->bi_private;
bool should_dirty = (dio->flags & IOMAP_DIO_DIRTY);
+ struct kiocb *iocb = dio->iocb;
if (bio->bi_status)
iomap_dio_set_error(dio, blk_status_to_errno(bio->bi_status));
+ if (!atomic_dec_and_test(&dio->ref))
+ goto release_bio;
- if (atomic_dec_and_test(&dio->ref)) {
- if (dio->wait_for_completion) {
- struct task_struct *waiter = dio->submit.waiter;
- WRITE_ONCE(dio->submit.waiter, NULL);
- blk_wake_io_task(waiter);
- } else if (dio->flags & IOMAP_DIO_WRITE) {
- struct inode *inode = file_inode(dio->iocb->ki_filp);
-
- WRITE_ONCE(dio->iocb->private, NULL);
- INIT_WORK(&dio->aio.work, iomap_dio_complete_work);
- queue_work(inode->i_sb->s_dio_done_wq, &dio->aio.work);
- } else {
- WRITE_ONCE(dio->iocb->private, NULL);
- iomap_dio_complete_work(&dio->aio.work);
- }
+ /*
+ * Synchronous dio, task itself will handle any completion work
+ * that needs after IO. All we need to do is wake the task.
+ */
+ if (dio->wait_for_completion) {
+ struct task_struct *waiter = dio->submit.waiter;
+
+ WRITE_ONCE(dio->submit.waiter, NULL);
+ blk_wake_io_task(waiter);
+ goto release_bio;
+ }
+
+ /*
+ * Flagged with IOMAP_DIO_INLINE_COMP, we can complete it inline
+ */
+ if (dio->flags & IOMAP_DIO_INLINE_COMP) {
+ WRITE_ONCE(iocb->private, NULL);
+ iomap_dio_complete_work(&dio->aio.work);
+ goto release_bio;
+ }
+
+ /*
+ * If this dio is flagged with IOMAP_DIO_CALLER_COMP, then schedule
+ * our completion that way to avoid an async punt to a workqueue.
+ */
+ if (dio->flags & IOMAP_DIO_CALLER_COMP) {
+ /* only polled IO cares about private cleared */
+ iocb->private = dio;
+ iocb->dio_complete = iomap_dio_deferred_complete;
+
+ /*
+ * Invoke ->ki_complete() directly. We've assigned our
+ * dio_complete callback handler, and since the issuer set
+ * IOCB_DIO_CALLER_COMP, we know their ki_complete handler will
+ * notice ->dio_complete being set and will defer calling that
+ * handler until it can be done from a safe task context.
+ *
+ * Note that the 'res' being passed in here is not important
+ * for this case. The actual completion value of the request
+ * will be gotten from dio_complete when that is run by the
+ * issuer.
+ */
+ iocb->ki_complete(iocb, 0);
+ goto release_bio;
}
+ /*
+ * Async DIO completion that requires filesystem level completion work
+ * gets punted to a work queue to complete as the operation may require
+ * more IO to be issued to finalise filesystem metadata changes or
+ * guarantee data integrity.
+ */
+ INIT_WORK(&dio->aio.work, iomap_dio_complete_work);
+ queue_work(file_inode(iocb->ki_filp)->i_sb->s_dio_done_wq,
+ &dio->aio.work);
+release_bio:
if (should_dirty) {
bio_check_pages_dirty(bio);
} else {
@@ -203,7 +253,7 @@ static void iomap_dio_zero(const struct iomap_iter *iter, struct iomap_dio *dio,
/*
* Figure out the bio's operation flags from the dio request, the
* mapping, and whether or not we want FUA. Note that we can end up
- * clearing the WRITE_FUA flag in the dio request.
+ * clearing the WRITE_THROUGH flag in the dio request.
*/
static inline blk_opf_t iomap_dio_bio_opflags(struct iomap_dio *dio,
const struct iomap *iomap, bool use_fua)
@@ -217,7 +267,7 @@ static inline blk_opf_t iomap_dio_bio_opflags(struct iomap_dio *dio,
if (use_fua)
opflags |= REQ_FUA;
else
- dio->flags &= ~IOMAP_DIO_WRITE_FUA;
+ dio->flags &= ~IOMAP_DIO_WRITE_THROUGH;
return opflags;
}
@@ -257,12 +307,19 @@ static loff_t iomap_dio_bio_iter(const struct iomap_iter *iter,
* Use a FUA write if we need datasync semantics, this is a pure
* data IO that doesn't require any metadata updates (including
* after IO completion such as unwritten extent conversion) and
- * the underlying device supports FUA. This allows us to avoid
- * cache flushes on IO completion.
+ * the underlying device either supports FUA or doesn't have
+ * a volatile write cache. This allows us to avoid cache flushes
+ * on IO completion. If we can't use writethrough and need to
+ * sync, disable in-task completions as dio completion will
+ * need to call generic_write_sync() which will do a blocking
+ * fsync / cache flush call.
*/
if (!(iomap->flags & (IOMAP_F_SHARED|IOMAP_F_DIRTY)) &&
- (dio->flags & IOMAP_DIO_WRITE_FUA) && bdev_fua(iomap->bdev))
+ (dio->flags & IOMAP_DIO_WRITE_THROUGH) &&
+ (bdev_fua(iomap->bdev) || !bdev_write_cache(iomap->bdev)))
use_fua = true;
+ else if (dio->flags & IOMAP_DIO_NEED_SYNC)
+ dio->flags &= ~IOMAP_DIO_CALLER_COMP;
}
/*
@@ -277,10 +334,23 @@ static loff_t iomap_dio_bio_iter(const struct iomap_iter *iter,
goto out;
/*
- * We can only poll for single bio I/Os.
+ * We can only do deferred completion for pure overwrites that
+ * don't require additional IO at completion. This rules out
+ * writes that need zeroing or extent conversion, extend
+ * the file size, or issue journal IO or cache flushes
+ * during completion processing.
*/
if (need_zeroout ||
+ ((dio->flags & IOMAP_DIO_NEED_SYNC) && !use_fua) ||
((dio->flags & IOMAP_DIO_WRITE) && pos >= i_size_read(inode)))
+ dio->flags &= ~IOMAP_DIO_CALLER_COMP;
+
+ /*
+ * The rules for polled IO completions follow the guidelines as the
+ * ones we set for inline and deferred completions. If none of those
+ * are available for this IO, clear the polled flag.
+ */
+ if (!(dio->flags & (IOMAP_DIO_INLINE_COMP|IOMAP_DIO_CALLER_COMP)))
dio->iocb->ki_flags &= ~IOCB_HIPRI;
if (need_zeroout) {
@@ -505,12 +575,14 @@ __iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
dio->submit.iter = iter;
dio->submit.waiter = current;
- dio->submit.poll_bio = NULL;
if (iocb->ki_flags & IOCB_NOWAIT)
iomi.flags |= IOMAP_NOWAIT;
if (iov_iter_rw(iter) == READ) {
+ /* reads can always complete inline */
+ dio->flags |= IOMAP_DIO_INLINE_COMP;
+
if (iomi.pos >= dio->i_size)
goto out_free_dio;
@@ -524,6 +596,15 @@ __iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
iomi.flags |= IOMAP_WRITE;
dio->flags |= IOMAP_DIO_WRITE;
+ /*
+ * Flag as supporting deferred completions, if the issuer
+ * groks it. This can avoid a workqueue punt for writes.
+ * We may later clear this flag if we need to do other IO
+ * as part of this IO completion.
+ */
+ if (iocb->ki_flags & IOCB_DIO_CALLER_COMP)
+ dio->flags |= IOMAP_DIO_CALLER_COMP;
+
if (dio_flags & IOMAP_DIO_OVERWRITE_ONLY) {
ret = -EAGAIN;
if (iomi.pos >= dio->i_size ||
@@ -537,13 +618,16 @@ __iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
dio->flags |= IOMAP_DIO_NEED_SYNC;
/*
- * For datasync only writes, we optimistically try
- * using FUA for this IO. Any non-FUA write that
- * occurs will clear this flag, hence we know before
- * completion whether a cache flush is necessary.
+ * For datasync only writes, we optimistically try using
+ * WRITE_THROUGH for this IO. This flag requires either
+ * FUA writes through the device's write cache, or a
+ * normal write to a device without a volatile write
+ * cache. For the former, Any non-FUA write that occurs
+ * will clear this flag, hence we know before completion
+ * whether a cache flush is necessary.
*/
if (!(iocb->ki_flags & IOCB_SYNC))
- dio->flags |= IOMAP_DIO_WRITE_FUA;
+ dio->flags |= IOMAP_DIO_WRITE_THROUGH;
}
/*
@@ -605,14 +689,13 @@ __iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
iomap_dio_set_error(dio, ret);
/*
- * If all the writes we issued were FUA, we don't need to flush the
- * cache on IO completion. Clear the sync flag for this case.
+ * If all the writes we issued were already written through to the
+ * media, we don't need to flush the cache on IO completion. Clear the
+ * sync flag for this case.
*/
- if (dio->flags & IOMAP_DIO_WRITE_FUA)
+ if (dio->flags & IOMAP_DIO_WRITE_THROUGH)
dio->flags &= ~IOMAP_DIO_NEED_SYNC;
- WRITE_ONCE(iocb->private, dio->submit.poll_bio);
-
/*
* We are about to drop our additional submission reference, which
* might be the last reference to the dio. There are three different
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 6867512907d6..1e6dbe309d52 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -338,6 +338,20 @@ enum rw_hint {
#define IOCB_NOIO (1 << 20)
/* can use bio alloc cache */
#define IOCB_ALLOC_CACHE (1 << 21)
+/*
+ * IOCB_DIO_CALLER_COMP can be set by the iocb owner, to indicate that the
+ * iocb completion can be passed back to the owner for execution from a safe
+ * context rather than needing to be punted through a workqueue. If this
+ * flag is set, the bio completion handling may set iocb->dio_complete to a
+ * handler function and iocb->private to context information for that handler.
+ * The issuer should call the handler with that context information from task
+ * context to complete the processing of the iocb. Note that while this
+ * provides a task context for the dio_complete() callback, it should only be
+ * used on the completion side for non-IO generating completions. It's fine to
+ * call blocking functions from this callback, but they should not wait for
+ * unrelated IO (like cache flushing, new IO generation, etc).
+ */
+#define IOCB_DIO_CALLER_COMP (1 << 22)
/* for use in trace events */
#define TRACE_IOCB_STRINGS \
@@ -351,7 +365,8 @@ enum rw_hint {
{ IOCB_WRITE, "WRITE" }, \
{ IOCB_WAITQ, "WAITQ" }, \
{ IOCB_NOIO, "NOIO" }, \
- { IOCB_ALLOC_CACHE, "ALLOC_CACHE" }
+ { IOCB_ALLOC_CACHE, "ALLOC_CACHE" }, \
+ { IOCB_DIO_CALLER_COMP, "CALLER_COMP" }
struct kiocb {
struct file *ki_filp;
@@ -360,7 +375,23 @@ struct kiocb {
void *private;
int ki_flags;
u16 ki_ioprio; /* See linux/ioprio.h */
- struct wait_page_queue *ki_waitq; /* for async buffered IO */
+ union {
+ /*
+ * Only used for async buffered reads, where it denotes the
+ * page waitqueue associated with completing the read. Valid
+ * IFF IOCB_WAITQ is set.
+ */
+ struct wait_page_queue *ki_waitq;
+ /*
+ * Can be used for O_DIRECT IO, where the completion handling
+ * is punted back to the issuer of the IO. May only be set
+ * if IOCB_DIO_CALLER_COMP is set by the issuer, and the issuer
+ * must then check for presence of this handler when ki_complete
+ * is invoked. The data passed in to this handler must be
+ * assigned to ->private when dio_complete is assigned.
+ */
+ ssize_t (*dio_complete)(void *data);
+ };
};
static inline bool is_sync_kiocb(struct kiocb *kiocb)
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 1bce2208b65c..e2166a9a7eaf 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -105,6 +105,7 @@ int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
} else {
rw->kiocb.ki_ioprio = get_current_ioprio();
}
+ rw->kiocb.dio_complete = NULL;
rw->addr = READ_ONCE(sqe->addr);
rw->len = READ_ONCE(sqe->len);
@@ -285,6 +286,15 @@ static inline int io_fixup_rw_res(struct io_kiocb *req, long res)
void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts)
{
+ struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
+ struct kiocb *kiocb = &rw->kiocb;
+
+ if ((kiocb->ki_flags & IOCB_DIO_CALLER_COMP) && kiocb->dio_complete) {
+ long res = kiocb->dio_complete(rw->kiocb.private);
+
+ io_req_set_res(req, io_fixup_rw_res(req, res), 0);
+ }
+
io_req_io_end(req);
if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
@@ -300,9 +310,11 @@ static void io_complete_rw(struct kiocb *kiocb, long res)
struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
struct io_kiocb *req = cmd_to_io_kiocb(rw);
- if (__io_complete_rw_common(req, res))
- return;
- io_req_set_res(req, io_fixup_rw_res(req, res), 0);
+ if (!kiocb->dio_complete || !(kiocb->ki_flags & IOCB_DIO_CALLER_COMP)) {
+ if (__io_complete_rw_common(req, res))
+ return;
+ io_req_set_res(req, io_fixup_rw_res(req, res), 0);
+ }
req->io_task_work.func = io_req_rw_complete;
__io_req_task_work_add(req, IOU_F_TWQ_LAZY_WAKE);
}
@@ -916,6 +928,15 @@ int io_write(struct io_kiocb *req, unsigned int issue_flags)
}
kiocb->ki_flags |= IOCB_WRITE;
+ /*
+ * For non-polled IO, set IOCB_DIO_CALLER_COMP, stating that our handler
+ * groks deferring the completion to task context. This isn't
+ * necessary and useful for polled IO as that can always complete
+ * directly.
+ */
+ if (!(kiocb->ki_flags & IOCB_HIPRI))
+ kiocb->ki_flags |= IOCB_DIO_CALLER_COMP;
+
if (likely(req->file->f_op->write_iter))
ret2 = call_write_iter(req->file, kiocb, &s->iter);
else if (req->file->f_op->write)