/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* Copyright (C) 2013 Red Hat, Inc. This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, see . */ #include "config.h" #include #include "vmcstream.h" #include "spice-channel-priv.h" #include "gio-coroutine.h" struct _SpiceVmcInputStream { GInputStream parent_instance; GTask *task; struct coroutine *coroutine; SpiceChannel *channel; gboolean all; guint8 *buffer; gsize count; gsize pos; gulong cancel_id; }; struct _SpiceVmcInputStreamClass { GInputStreamClass parent_class; }; static gssize spice_vmc_input_stream_read (GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error); static void spice_vmc_input_stream_read_async (GInputStream *stream, void *buffer, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data); static gssize spice_vmc_input_stream_read_finish (GInputStream *stream, GAsyncResult *result, GError **error); static gssize spice_vmc_input_stream_skip (GInputStream *stream, gsize count, GCancellable *cancellable, GError **error); static gboolean spice_vmc_input_stream_close (GInputStream *stream, GCancellable *cancellable, GError **error); G_DEFINE_TYPE(SpiceVmcInputStream, spice_vmc_input_stream, G_TYPE_INPUT_STREAM) static void spice_vmc_input_stream_class_init(SpiceVmcInputStreamClass *klass) { GInputStreamClass *istream_class; istream_class = G_INPUT_STREAM_CLASS(klass); istream_class->read_fn = spice_vmc_input_stream_read; istream_class->read_async = spice_vmc_input_stream_read_async; istream_class->read_finish = spice_vmc_input_stream_read_finish; istream_class->skip = spice_vmc_input_stream_skip; istream_class->close_fn = spice_vmc_input_stream_close; } static void spice_vmc_input_stream_init(SpiceVmcInputStream *self) { } static SpiceVmcInputStream * spice_vmc_input_stream_new(void) { SpiceVmcInputStream *self; self = g_object_new(SPICE_TYPE_VMC_INPUT_STREAM, NULL); return self; } typedef struct _complete_in_idle_cb_data { GTask *task; gssize pos; } complete_in_idle_cb_data; static gboolean complete_in_idle_cb(gpointer user_data) { complete_in_idle_cb_data *data = user_data; g_task_return_int(data->task, data->pos); g_object_unref (data->task); g_free (data); return FALSE; } /* coroutine */ /* * Feed a SpiceVmc stream with new data from a coroutine * * The other end will be waiting on read_async() until data is fed * here. */ G_GNUC_INTERNAL void spice_vmc_input_stream_co_data(SpiceVmcInputStream *self, const gpointer d, gsize size) { guint8 *data = d; g_return_if_fail(SPICE_IS_VMC_INPUT_STREAM(self)); g_return_if_fail(self->coroutine == NULL); self->coroutine = coroutine_self(); while (size > 0) { complete_in_idle_cb_data *cb_data; SPICE_DEBUG("spicevmc co_data %p", self->task); if (!self->task) coroutine_yield(NULL); g_return_if_fail(self->task != NULL); gsize min = MIN(self->count, size); memcpy(self->buffer, data, min); size -= min; data += min; SPICE_DEBUG("spicevmc co_data complete: %" G_GSIZE_FORMAT "/%" G_GSIZE_FORMAT, min, self->count); self->pos += min; self->buffer += min; if (self->all && min > 0 && self->pos != self->count) continue; /* Let's deal with the task complete in idle by ourselves, as GTask * heuristic only makes sense in a non-coroutine case. */ cb_data = g_new(complete_in_idle_cb_data , 1); cb_data->task = g_object_ref(self->task); cb_data->pos = self->pos; g_idle_add(complete_in_idle_cb, cb_data); g_clear_object(&self->task); } self->coroutine = NULL; } static void read_cancelled(GCancellable *cancellable, gpointer user_data) { SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(user_data); SPICE_DEBUG("read cancelled, %p", self->task); g_task_return_new_error(self->task, G_IO_ERROR, G_IO_ERROR_CANCELLED, "read cancelled"); /* With GTask, we don't need to disconnect GCancellable when task is * cancelled within cancellable callback as it could lead to deadlocks * e.g: https://bugzilla.gnome.org/show_bug.cgi?id=705395 */ g_clear_object(&self->task); } G_GNUC_INTERNAL void spice_vmc_input_stream_read_all_async(GInputStream *stream, void *buffer, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data) { SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream); GTask *task; /* no concurrent read permitted by ginputstream */ g_return_if_fail(self->task == NULL); self->all = TRUE; self->buffer = buffer; self->count = count; self->pos = 0; task = g_task_new(self, cancellable, callback, user_data); self->task = task; if (cancellable) self->cancel_id = g_cancellable_connect(cancellable, G_CALLBACK(read_cancelled), self, NULL); if (self->coroutine) coroutine_yieldto(self->coroutine, NULL); } G_GNUC_INTERNAL gssize spice_vmc_input_stream_read_all_finish(GInputStream *stream, GAsyncResult *result, GError **error) { GTask *task = G_TASK(result); SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream); GCancellable *cancel; g_return_val_if_fail(g_task_is_valid(task, self), -1); cancel = g_task_get_cancellable(task); if (!g_cancellable_is_cancelled(cancel)) { g_cancellable_disconnect(cancel, self->cancel_id); self->cancel_id = 0; } return g_task_propagate_int(task, error); } static void spice_vmc_input_stream_read_async(GInputStream *stream, void *buffer, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data) { SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream); GTask *task; /* no concurrent read permitted by ginputstream */ g_return_if_fail(self->task == NULL); self->all = FALSE; self->buffer = buffer; self->count = count; self->pos = 0; task = g_task_new(self, cancellable, callback, user_data); self->task = task; if (cancellable) self->cancel_id = g_cancellable_connect(cancellable, G_CALLBACK(read_cancelled), self, NULL); if (self->coroutine) coroutine_yieldto(self->coroutine, NULL); } static gssize spice_vmc_input_stream_read_finish(GInputStream *stream, GAsyncResult *result, GError **error) { GTask *task = G_TASK(result); SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream); GCancellable *cancel; g_return_val_if_fail(g_task_is_valid(task, self), -1); cancel = g_task_get_cancellable(task); if (!g_cancellable_is_cancelled(cancel)) { g_cancellable_disconnect(cancel, self->cancel_id); self->cancel_id = 0; } return g_task_propagate_int(task, error); } static gssize spice_vmc_input_stream_read(GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error) { g_return_val_if_reached(-1); } static gssize spice_vmc_input_stream_skip(GInputStream *stream, gsize count, GCancellable *cancellable, GError **error) { g_return_val_if_reached(-1); } static gboolean spice_vmc_input_stream_close(GInputStream *stream, GCancellable *cancellable, GError **error) { SPICE_DEBUG("fake close"); return TRUE; } /* OUTPUT */ struct _SpiceVmcOutputStream { GOutputStream parent_instance; SpiceChannel *channel; /* weak */ }; struct _SpiceVmcOutputStreamClass { GOutputStreamClass parent_class; }; static gssize spice_vmc_output_stream_write_fn (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error); static gssize spice_vmc_output_stream_write_finish (GOutputStream *stream, GAsyncResult *result, GError **error); static void spice_vmc_output_stream_write_async (GOutputStream *stream, const void *buffer, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data); G_DEFINE_TYPE(SpiceVmcOutputStream, spice_vmc_output_stream, G_TYPE_OUTPUT_STREAM) static void spice_vmc_output_stream_class_init(SpiceVmcOutputStreamClass *klass) { GOutputStreamClass *ostream_class; ostream_class = G_OUTPUT_STREAM_CLASS(klass); ostream_class->write_fn = spice_vmc_output_stream_write_fn; ostream_class->write_async = spice_vmc_output_stream_write_async; ostream_class->write_finish = spice_vmc_output_stream_write_finish; } static void spice_vmc_output_stream_init(SpiceVmcOutputStream *self) { } static SpiceVmcOutputStream * spice_vmc_output_stream_new(SpiceChannel *channel) { SpiceVmcOutputStream *self; self = g_object_new(SPICE_TYPE_VMC_OUTPUT_STREAM, NULL); self->channel = channel; return self; } static gssize spice_vmc_output_stream_write_fn(GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error) { SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream); SpiceMsgOut *msg_out; msg_out = spice_msg_out_new(SPICE_CHANNEL(self->channel), SPICE_MSGC_SPICEVMC_DATA); spice_marshaller_add(msg_out->marshaller, buffer, count); spice_msg_out_send(msg_out); return count; } static gssize spice_vmc_output_stream_write_finish(GOutputStream *stream, GAsyncResult *simple, GError **error) { SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream); GAsyncResult *res = g_task_propagate_pointer(G_TASK(simple), error); gssize bytes_written; SPICE_DEBUG("spicevmc write finish"); bytes_written = spice_vmc_write_finish(self->channel, res, error); g_object_unref(res); return bytes_written; } static void write_cb(GObject *source_object, GAsyncResult *res, gpointer user_data) { GTask *task = user_data; g_task_return_pointer(task, g_object_ref(res), g_object_unref); g_object_unref(task); } static void spice_vmc_output_stream_write_async(GOutputStream *stream, const void *buffer, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data) { SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream); GTask *task; SPICE_DEBUG("spicevmc write async"); /* an AsyncResult to forward async op to channel */ task = g_task_new(self, cancellable, callback, user_data); spice_vmc_write_async(self->channel, buffer, count, cancellable, write_cb, task); } /* STREAM */ struct _SpiceVmcStream { GIOStream parent_instance; SpiceChannel *channel; /* weak */ SpiceVmcInputStream *in; SpiceVmcOutputStream *out; }; struct _SpiceVmcStreamClass { GIOStreamClass parent_class; }; static void spice_vmc_stream_finalize (GObject *object); static GInputStream * spice_vmc_stream_get_input_stream (GIOStream *stream); static GOutputStream * spice_vmc_stream_get_output_stream (GIOStream *stream); G_DEFINE_TYPE(SpiceVmcStream, spice_vmc_stream, G_TYPE_IO_STREAM) static void spice_vmc_stream_class_init(SpiceVmcStreamClass *klass) { GObjectClass *object_class; GIOStreamClass *iostream_class; object_class = G_OBJECT_CLASS(klass); object_class->finalize = spice_vmc_stream_finalize; iostream_class = G_IO_STREAM_CLASS(klass); iostream_class->get_input_stream = spice_vmc_stream_get_input_stream; iostream_class->get_output_stream = spice_vmc_stream_get_output_stream; } static void spice_vmc_stream_finalize(GObject *object) { SpiceVmcStream *self = SPICE_VMC_STREAM(object); g_clear_object(&self->in); g_clear_object(&self->out); G_OBJECT_CLASS(spice_vmc_stream_parent_class)->finalize(object); } static void spice_vmc_stream_init(SpiceVmcStream *self) { } G_GNUC_INTERNAL SpiceVmcStream * spice_vmc_stream_new(SpiceChannel *channel) { SpiceVmcStream *self; self = g_object_new(SPICE_TYPE_VMC_STREAM, NULL); self->channel = channel; return self; } static GInputStream * spice_vmc_stream_get_input_stream(GIOStream *stream) { SpiceVmcStream *self = SPICE_VMC_STREAM(stream); if (!self->in) self->in = spice_vmc_input_stream_new(); return G_INPUT_STREAM(self->in); } static GOutputStream * spice_vmc_stream_get_output_stream(GIOStream *stream) { SpiceVmcStream *self = SPICE_VMC_STREAM(stream); if (!self->out) self->out = spice_vmc_output_stream_new(self->channel); return G_OUTPUT_STREAM(self->out); }