summaryrefslogtreecommitdiff
path: root/job.c
diff options
context:
space:
mode:
Diffstat (limited to 'job.c')
-rw-r--r--job.c137
1 files changed, 137 insertions, 0 deletions
diff --git a/job.c b/job.c
index c5a37fb8ef..78497fd6f5 100644
--- a/job.c
+++ b/job.c
@@ -60,6 +60,26 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
[JOB_VERB_DISMISS] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
+/* Right now, this mutex is only needed to synchronize accesses to job->busy
+ * and job->sleep_timer, such as concurrent calls to job_do_yield and
+ * job_enter. */
+static QemuMutex job_mutex;
+
+static void job_lock(void)
+{
+ qemu_mutex_lock(&job_mutex);
+}
+
+static void job_unlock(void)
+{
+ qemu_mutex_unlock(&job_mutex);
+}
+
+static void __attribute__((__constructor__)) job_init(void)
+{
+ qemu_mutex_init(&job_mutex);
+}
+
/* TODO Make static once the whole state machine is in job.c */
void job_state_transition(Job *job, JobStatus s1)
{
@@ -101,6 +121,16 @@ bool job_is_cancelled(Job *job)
return job->cancelled;
}
+bool job_started(Job *job)
+{
+ return job->co;
+}
+
+bool job_should_pause(Job *job)
+{
+ return job->pause_count > 0;
+}
+
Job *job_next(Job *job)
{
if (!job) {
@@ -143,6 +173,9 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
job->id = g_strdup(job_id);
job->refcnt = 1;
job->aio_context = ctx;
+ job->busy = false;
+ job->paused = true;
+ job->pause_count = 1;
job_state_transition(job, JOB_STATUS_CREATED);
@@ -172,6 +205,110 @@ void job_unref(Job *job)
}
}
+void job_enter_cond(Job *job, bool(*fn)(Job *job))
+{
+ if (!job_started(job)) {
+ return;
+ }
+ if (job->deferred_to_main_loop) {
+ return;
+ }
+
+ job_lock();
+ if (job->busy) {
+ job_unlock();
+ return;
+ }
+
+ if (fn && !fn(job)) {
+ job_unlock();
+ return;
+ }
+
+ assert(!job->deferred_to_main_loop);
+ timer_del(&job->sleep_timer);
+ job->busy = true;
+ job_unlock();
+ aio_co_wake(job->co);
+}
+
+/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
+ * Reentering the job coroutine with block_job_enter() before the timer has
+ * expired is allowed and cancels the timer.
+ *
+ * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
+ * called explicitly. */
+void coroutine_fn job_do_yield(Job *job, uint64_t ns)
+{
+ job_lock();
+ if (ns != -1) {
+ timer_mod(&job->sleep_timer, ns);
+ }
+ job->busy = false;
+ job_unlock();
+ qemu_coroutine_yield();
+
+ /* Set by job_enter_cond() before re-entering the coroutine. */
+ assert(job->busy);
+}
+
+void coroutine_fn job_pause_point(Job *job)
+{
+ assert(job && job_started(job));
+
+ if (!job_should_pause(job)) {
+ return;
+ }
+ if (job_is_cancelled(job)) {
+ return;
+ }
+
+ if (job->driver->pause) {
+ job->driver->pause(job);
+ }
+
+ if (job_should_pause(job) && !job_is_cancelled(job)) {
+ JobStatus status = job->status;
+ job_state_transition(job, status == JOB_STATUS_READY
+ ? JOB_STATUS_STANDBY
+ : JOB_STATUS_PAUSED);
+ job->paused = true;
+ job_do_yield(job, -1);
+ job->paused = false;
+ job_state_transition(job, status);
+ }
+
+ if (job->driver->resume) {
+ job->driver->resume(job);
+ }
+}
+
+/**
+ * All jobs must allow a pause point before entering their job proper. This
+ * ensures that jobs can be paused prior to being started, then resumed later.
+ */
+static void coroutine_fn job_co_entry(void *opaque)
+{
+ Job *job = opaque;
+
+ assert(job && job->driver && job->driver->start);
+ job_pause_point(job);
+ job->driver->start(job);
+}
+
+
+void job_start(Job *job)
+{
+ assert(job && !job_started(job) && job->paused &&
+ job->driver && job->driver->start);
+ job->co = qemu_coroutine_create(job_co_entry, job);
+ job->pause_count--;
+ job->busy = true;
+ job->paused = false;
+ job_state_transition(job, JOB_STATUS_RUNNING);
+ aio_co_enter(job->aio_context, job->co);
+}
+
typedef struct {
Job *job;
JobDeferToMainLoopFn *fn;