diff options
author | Paolo Bonzini <pbonzini@redhat.com> | 2017-02-13 14:52:19 +0100 |
---|---|---|
committer | Stefan Hajnoczi <stefanha@redhat.com> | 2017-02-21 11:14:07 +0000 |
commit | 0c330a734b51c177ab8488932ac3b0c4d63a718a (patch) | |
tree | 1251fc380ca5313495d9a9c541460b3ac2ffb7e0 /util | |
parent | c2b38b277a7882a592f4f2ec955084b2b756daaa (diff) |
aio: introduce aio_co_schedule and aio_co_wake
aio_co_wake provides the infrastructure to start a coroutine on a "home"
AioContext. It will be used by CoMutex and CoQueue, so that coroutines
don't jump from one context to another when they go to sleep on a
mutex or waitqueue. However, it can also be used as a more efficient
alternative to one-shot bottom halves, and saves the effort of tracking
which AioContext a coroutine is running on.
aio_co_schedule is the part of aio_co_wake that starts a coroutine
on a remote AioContext, but it is also useful to implement e.g.
bdrv_set_aio_context callbacks.
The implementation of aio_co_schedule is based on a lock-free
multiple-producer, single-consumer queue. The multiple producers use
cmpxchg to add to a LIFO stack. The consumer (a per-AioContext bottom
half) grabs all items added so far, inverts the list to make it FIFO,
and goes through it one item at a time until it's empty. The data
structure was inspired by OSv, which uses it in the very code we'll
"port" to QEMU for the thread-safe CoMutex.
Most of the new code is really tests.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Fam Zheng <famz@redhat.com>
Message-id: 20170213135235.12274-3-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Diffstat (limited to 'util')
-rw-r--r-- | util/async.c | 65 | ||||
-rw-r--r-- | util/qemu-coroutine.c | 8 | ||||
-rw-r--r-- | util/trace-events | 4 |
3 files changed, 77 insertions, 0 deletions
diff --git a/util/async.c b/util/async.c index 1fd97e1f15..9cac702c5b 100644 --- a/util/async.c +++ b/util/async.c @@ -31,6 +31,8 @@ #include "qemu/main-loop.h" #include "qemu/atomic.h" #include "block/raw-aio.h" +#include "qemu/coroutine_int.h" +#include "trace.h" /***********************************************************/ /* bottom halves (can be seen as timers which expire ASAP) */ @@ -275,6 +277,9 @@ aio_ctx_finalize(GSource *source) } #endif + assert(QSLIST_EMPTY(&ctx->scheduled_coroutines)); + qemu_bh_delete(ctx->co_schedule_bh); + qemu_lockcnt_lock(&ctx->list_lock); assert(!qemu_lockcnt_count(&ctx->list_lock)); while (ctx->first_bh) { @@ -364,6 +369,28 @@ static bool event_notifier_poll(void *opaque) return atomic_read(&ctx->notified); } +static void co_schedule_bh_cb(void *opaque) +{ + AioContext *ctx = opaque; + QSLIST_HEAD(, Coroutine) straight, reversed; + + QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines); + QSLIST_INIT(&straight); + + while (!QSLIST_EMPTY(&reversed)) { + Coroutine *co = QSLIST_FIRST(&reversed); + QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next); + QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next); + } + + while (!QSLIST_EMPTY(&straight)) { + Coroutine *co = QSLIST_FIRST(&straight); + QSLIST_REMOVE_HEAD(&straight, co_scheduled_next); + trace_aio_co_schedule_bh_cb(ctx, co); + qemu_coroutine_enter(co); + } +} + AioContext *aio_context_new(Error **errp) { int ret; @@ -379,6 +406,10 @@ AioContext *aio_context_new(Error **errp) } g_source_set_can_recurse(&ctx->source, true); qemu_lockcnt_init(&ctx->list_lock); + + ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx); + QSLIST_INIT(&ctx->scheduled_coroutines); + aio_set_event_notifier(ctx, &ctx->notifier, false, (EventNotifierHandler *) @@ -402,6 +433,40 @@ fail: return NULL; } +void aio_co_schedule(AioContext *ctx, Coroutine *co) +{ + trace_aio_co_schedule(ctx, co); + QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines, + co, co_scheduled_next); + 
qemu_bh_schedule(ctx->co_schedule_bh); +} + +void aio_co_wake(struct Coroutine *co) +{ + AioContext *ctx; + + /* Read coroutine before co->ctx. Matches smp_wmb in + * qemu_coroutine_enter. + */ + smp_read_barrier_depends(); + ctx = atomic_read(&co->ctx); + + if (ctx != qemu_get_current_aio_context()) { + aio_co_schedule(ctx, co); + return; + } + + if (qemu_in_coroutine()) { + Coroutine *self = qemu_coroutine_self(); + assert(self != co); + QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next); + } else { + aio_context_acquire(ctx); + qemu_coroutine_enter(co); + aio_context_release(ctx); + } +} + void aio_context_ref(AioContext *ctx) { g_source_ref(&ctx->source); diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c index a5d2f6c0c3..415600dc30 100644 --- a/util/qemu-coroutine.c +++ b/util/qemu-coroutine.c @@ -19,6 +19,7 @@ #include "qemu/atomic.h" #include "qemu/coroutine.h" #include "qemu/coroutine_int.h" +#include "block/aio.h" enum { POOL_BATCH_SIZE = 64, @@ -114,6 +115,13 @@ void qemu_coroutine_enter(Coroutine *co) } co->caller = self; + co->ctx = qemu_get_current_aio_context(); + + /* Store co->ctx before anything that stores co. Matches + * barrier in aio_co_wake. 
+ */ + smp_wmb(); + ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER); qemu_co_queue_run_restart(co); diff --git a/util/trace-events b/util/trace-events index 1fa12f0491..53bd70c4cd 100644 --- a/util/trace-events +++ b/util/trace-events @@ -6,6 +6,10 @@ run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d" poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 +# util/async.c +aio_co_schedule(void *ctx, void *co) "ctx %p co %p" +aio_co_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p" + # util/thread-pool.c thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p" thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d" |