summaryrefslogtreecommitdiff
path: root/src/giopipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/giopipe.c')
-rw-r--r--src/giopipe.c484
1 files changed, 484 insertions, 0 deletions
diff --git a/src/giopipe.c b/src/giopipe.c
new file mode 100644
index 0000000..d91c4d9
--- /dev/null
+++ b/src/giopipe.c
@@ -0,0 +1,484 @@
+/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ Copyright (C) 2015 Red Hat, Inc.
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <string.h>
+#include <errno.h>
+
+#include "giopipe.h"
+
+#define TYPE_PIPE_INPUT_STREAM (pipe_input_stream_get_type ())
+#define PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream))
+#define PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
+#define IS_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM))
+#define IS_PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM))
+#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
+
+typedef struct _PipeInputStreamClass PipeInputStreamClass;
+typedef struct _PipeInputStream PipeInputStream;
+typedef struct _PipeOutputStream PipeOutputStream;
+
+struct _PipeInputStream
+{
+ GInputStream parent_instance;
+
+ PipeOutputStream *peer;
+ gssize read;
+
+ /* GIOstream:closed is protected against pending operations, so we
+ * use an additional close flag to cancel those when the peer is
+ * closing.
+ */
+ gboolean peer_closed;
+ GList *sources;
+};
+
+struct _PipeInputStreamClass
+{
+ GInputStreamClass parent_class;
+};
+
+#define TYPE_PIPE_OUTPUT_STREAM (pipe_output_stream_get_type ())
+#define PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream))
+#define PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
+#define IS_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM))
+#define IS_PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM))
+#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
+
+typedef struct _PipeOutputStreamClass PipeOutputStreamClass;
+
+struct _PipeOutputStream
+{
+ GOutputStream parent_instance;
+
+ PipeInputStream *peer;
+ const gchar *buffer;
+ gsize count;
+ gboolean peer_closed;
+ GList *sources;
+};
+
+struct _PipeOutputStreamClass
+{
+ GOutputStreamClass parent_class;
+};
+
+static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+static void pipe_input_stream_check_source (PipeInputStream *self);
+static void pipe_output_stream_check_source (PipeOutputStream *self);
+
+G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ pipe_input_stream_pollable_iface_init))
+
+static gssize
+pipe_input_stream_read (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ PipeInputStream *self = PIPE_INPUT_STREAM (stream);
+
+ g_return_val_if_fail(count > 0, -1);
+
+ if (g_input_stream_is_closed (stream) || self->peer_closed) {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+ "Stream is already closed");
+ return -1;
+ }
+
+ if (!self->peer->buffer) {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+ g_strerror(EAGAIN));
+ return -1;
+ }
+
+ count = MIN(self->peer->count, count);
+ memcpy(buffer, self->peer->buffer, count);
+ self->read = count;
+ self->peer->buffer = NULL;
+
+ //g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count);
+ /* schedule peer source */
+ pipe_output_stream_check_source(self->peer);
+
+ return count;
+}
+
+static GList *
+set_all_sources_ready (GList *sources)
+{
+ GList *it = sources;
+ while (it != NULL) {
+ GSource *s = it->data;
+ GList *next = it->next;
+
+ if (s == NULL || g_source_is_destroyed(s)) {
+ /* remove */
+ sources = g_list_delete_link(sources, it);
+ g_source_unref(s);
+ } else {
+ /* dispatch */
+ g_source_set_ready_time(s, 0);
+ }
+ it = next;
+ }
+ return sources;
+}
+
+static void
+pipe_input_stream_check_source (PipeInputStream *self)
+{
+ if (g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
+ self->sources = set_all_sources_ready(self->sources);
+}
+
+static gboolean
+pipe_input_stream_close (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ PipeInputStream *self;
+
+ self = PIPE_INPUT_STREAM(stream);
+
+ if (self->peer) {
+ /* ignore any pending errors */
+ self->peer->peer_closed = TRUE;
+ g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL);
+ pipe_output_stream_check_source(self->peer);
+ }
+
+ return TRUE;
+}
+
+static void
+pipe_input_stream_close_async (GInputStream *stream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data)
+{
+ GTask *task;
+
+ task = g_task_new (stream, cancellable, callback, data);
+
+ /* will always return TRUE */
+ pipe_input_stream_close (stream, cancellable, NULL);
+
+ g_task_return_boolean (task, TRUE);
+ g_object_unref (task);
+}
+
+static gboolean
+pipe_input_stream_close_finish (GInputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+
+ return g_task_propagate_boolean (G_TASK (result), error);
+}
+
+static void
+pipe_input_stream_init (PipeInputStream *self)
+{
+ self->read = -1;
+}
+
+static void
+pipe_input_stream_dispose(GObject *object)
+{
+ PipeInputStream *self;
+
+ self = PIPE_INPUT_STREAM(object);
+
+ if (self->peer) {
+ g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
+ self->peer = NULL;
+ }
+
+ g_list_free_full (self->sources, (GDestroyNotify) g_source_unref);
+ self->sources = NULL;
+
+ G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
+}
+
+static void
+pipe_input_stream_class_init (PipeInputStreamClass *klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
+
+ istream_class->read_fn = pipe_input_stream_read;
+ istream_class->close_fn = pipe_input_stream_close;
+ istream_class->close_async = pipe_input_stream_close_async;
+ istream_class->close_finish = pipe_input_stream_close_finish;
+
+ gobject_class->dispose = pipe_input_stream_dispose;
+}
+
+static gboolean
+pipe_input_stream_is_readable (GPollableInputStream *stream)
+{
+ PipeInputStream *self = PIPE_INPUT_STREAM (stream);
+ gboolean readable;
+
+ readable = (self->peer && self->peer->buffer && self->read == -1) || self->peer_closed;
+ //g_debug("readable %p %d", self->peer, readable);
+
+ return readable;
+}
+
+static GSource *
+pipe_input_stream_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable)
+{
+ PipeInputStream *self = PIPE_INPUT_STREAM(stream);
+ GSource *pollable_source;
+
+ pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
+ self->sources = g_list_prepend (self->sources, g_source_ref (pollable_source));
+
+ return pollable_source;
+}
+
+static void
+pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+ iface->is_readable = pipe_input_stream_is_readable;
+ iface->create_source = pipe_input_stream_create_source;
+}
+
+static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+ pipe_output_stream_pollable_iface_init))
+
+static gssize
+pipe_output_stream_write (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+ PipeInputStream *peer = self->peer;
+
+ //g_debug("write %p :%"G_GSIZE_FORMAT, stream, count);
+ if (g_output_stream_is_closed (stream) || self->peer_closed) {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+ "Stream is already closed");
+ return -1;
+ }
+
+ /* this abuses pollable stream, writing sync would likely lead to
+ crashes, since the buffer pointer would become invalid, a
+ generic solution would need a copy..
+ */
+ g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1);
+ self->buffer = buffer;
+ self->count = count;
+
+ pipe_input_stream_check_source(self->peer);
+
+ if (peer->read < 0) {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+ g_strerror (EAGAIN));
+ return -1;
+ }
+
+ g_assert(peer->read <= self->count);
+ count = peer->read;
+
+ self->buffer = NULL;
+ self->count = 0;
+ peer->read = -1;
+
+ return count;
+}
+
+static void
+pipe_output_stream_init (PipeOutputStream *stream)
+{
+}
+
+static void
+pipe_output_stream_dispose(GObject *object)
+{
+ PipeOutputStream *self;
+
+ self = PIPE_OUTPUT_STREAM(object);
+
+ if (self->peer) {
+ g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
+ self->peer = NULL;
+ }
+
+ g_list_free_full (self->sources, (GDestroyNotify) g_source_unref);
+ self->sources = NULL;
+
+ G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object);
+}
+
+static void
+pipe_output_stream_check_source (PipeOutputStream *self)
+{
+ if (g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self)))
+ self->sources = set_all_sources_ready(self->sources);
+}
+
+static gboolean
+pipe_output_stream_close (GOutputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ PipeOutputStream *self;
+
+ self = PIPE_OUTPUT_STREAM(stream);
+
+ if (self->peer) {
+ /* ignore any pending errors */
+ self->peer->peer_closed = TRUE;
+ g_input_stream_close(G_INPUT_STREAM(self->peer), cancellable, NULL);
+ pipe_input_stream_check_source(self->peer);
+ }
+
+ return TRUE;
+}
+
+static void
+pipe_output_stream_close_async (GOutputStream *stream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data)
+{
+ GTask *task;
+
+ task = g_task_new (stream, cancellable, callback, data);
+
+ /* will always return TRUE */
+ pipe_output_stream_close (stream, cancellable, NULL);
+
+ g_task_return_boolean (task, TRUE);
+ g_object_unref (task);
+}
+
+static gboolean
+pipe_output_stream_close_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+
+ return g_task_propagate_boolean (G_TASK (result), error);
+}
+
+
+static void
+pipe_output_stream_class_init (PipeOutputStreamClass *klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GOutputStreamClass *ostream_class = G_OUTPUT_STREAM_CLASS (klass);
+
+ ostream_class->write_fn = pipe_output_stream_write;
+ ostream_class->close_fn = pipe_output_stream_close;
+ ostream_class->close_async = pipe_output_stream_close_async;
+ ostream_class->close_finish = pipe_output_stream_close_finish;
+
+ gobject_class->dispose = pipe_output_stream_dispose;
+}
+
+static gboolean
+pipe_output_stream_is_writable (GPollableOutputStream *stream)
+{
+ PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+ gboolean writable;
+
+ writable = self->buffer == NULL || self->peer->read >= 0;
+ //g_debug("writable %p %d", self, writable);
+
+ return writable;
+}
+
+static GSource *
+pipe_output_stream_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable)
+{
+ PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
+ GSource *pollable_source;
+
+ pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
+ self->sources = g_list_prepend (self->sources, g_source_ref (pollable_source));
+
+ return pollable_source;
+}
+
+static void
+pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+ iface->is_writable = pipe_output_stream_is_writable;
+ iface->create_source = pipe_output_stream_create_source;
+}
+
+G_GNUC_INTERNAL void
+make_gio_pipe(GInputStream **input, GOutputStream **output)
+{
+ PipeInputStream *in;
+ PipeOutputStream *out;
+
+ g_return_if_fail(input != NULL && *input == NULL);
+ g_return_if_fail(output != NULL && *output == NULL);
+
+ in = g_object_new(TYPE_PIPE_INPUT_STREAM, NULL);
+ out = g_object_new(TYPE_PIPE_OUTPUT_STREAM, NULL);
+
+ out->peer = in;
+ g_object_add_weak_pointer(G_OBJECT(in), (gpointer*)&out->peer);
+
+ in->peer = out;
+ g_object_add_weak_pointer(G_OBJECT(out), (gpointer*)&in->peer);
+
+ *input = G_INPUT_STREAM(in);
+ *output = G_OUTPUT_STREAM(out);
+}
+
+G_GNUC_INTERNAL void
+spice_make_pipe(GIOStream **p1, GIOStream **p2)
+{
+ GInputStream *in1 = NULL, *in2 = NULL;
+ GOutputStream *out1 = NULL, *out2 = NULL;
+
+ g_return_if_fail(p1 != NULL);
+ g_return_if_fail(p2 != NULL);
+ g_return_if_fail(*p1 == NULL);
+ g_return_if_fail(*p2 == NULL);
+
+ make_gio_pipe(&in1, &out2);
+ make_gio_pipe(&in2, &out1);
+
+ *p1 = g_simple_io_stream_new(in1, out1);
+ *p2 = g_simple_io_stream_new(in2, out2);
+
+ g_object_unref(in1);
+ g_object_unref(in2);
+ g_object_unref(out1);
+ g_object_unref(out2);
+}