summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFelipe Contreras <felipe.contreras@gmail.com>2010-06-13 21:28:41 +0300
committerFelipe Contreras <felipe.contreras@gmail.com>2010-06-14 03:33:28 +0300
commitc09e7aa06f0ccfc1707c4cb6f463f53f776e6c29 (patch)
tree0a723f793f12404dbcffed8a9b3585faad28b993
parentf579d63c0eef97adc5960b27ff13427df59edd28 (diff)
node: add write implementation
Signed-off-by: Felipe Contreras <felipe.contreras@gmail.com>
-rw-r--r--pn_node.c73
-rw-r--r--pn_node.h3
2 files changed, 75 insertions, 1 deletions
diff --git a/pn_node.c b/pn_node.c
index 61c0050..8f9acff 100644
--- a/pn_node.c
+++ b/pn_node.c
@@ -1,4 +1,5 @@
#include "pn_node.h"
+#include "pn_buffer.h"
#include <gio/gio.h>
@@ -18,6 +19,7 @@ struct pn_node_priv {
enum pn_node_status status;
GSocketConnection *socket_conn;
GError *error;
+ struct pn_buffer *output_buffer;
};
struct pn_node *
@@ -56,9 +58,79 @@ pn_node_set_error(struct pn_node *node,
pn_node_close(node);
}
+void
+pn_node_write(struct pn_node *node,
+ const void *buffer,
+ gsize count)
+{
+ PN_NODE_GET_CLASS(node)->write(node, buffer, count);
+}
+
/* pn_node stuff */
static void
+write_cb(GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ struct pn_node *node = user_data;
+ struct pn_node_priv *priv = node->priv;
+ gssize size;
+ GError *error = NULL;
+ struct pn_buffer *buffer;
+
+ size = g_output_stream_write_finish(G_OUTPUT_STREAM(source),
+ result, &error);
+
+ pn_debug(node, "size=%i", size);
+
+ if (error)
+ goto nok;
+
+ buffer = priv->output_buffer;
+
+ buffer->offset += size;
+
+ if (buffer->offset < buffer->len) {
+ g_output_stream_write_async(G_OUTPUT_STREAM(source),
+ buffer->data + buffer->offset,
+ buffer->len - buffer->offset,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ write_cb,
+ node);
+ }
+ else {
+ pn_buffer_free(buffer);
+ priv->output_buffer = NULL;
+
+ g_object_unref(node);
+ }
+ return;
+
+nok:
+ pn_node_set_error(node, error);
+ g_object_unref(node);
+ return;
+}
+
+static void
+write_impl(struct pn_node *node,
+ const void *buffer,
+ gsize count)
+{
+ struct pn_node_priv *priv = node->priv;
+ GOutputStream *output;
+
+ output = g_io_stream_get_output_stream(G_IO_STREAM(priv->socket_conn));
+ priv->output_buffer = pn_buffer_new_use((void *)buffer, count);
+
+ g_object_ref(node);
+ g_output_stream_write_async(output, buffer, count,
+ G_PRIORITY_DEFAULT, NULL, write_cb, node);
+}
+
+static void
connect_cb(GObject *source,
GAsyncResult *res,
gpointer user_data)
@@ -159,6 +231,7 @@ class_init(void *g_class,
node_class->connect = &connect_impl;
node_class->close = &close_impl;
+ node_class->write = &write_impl;
gobject_class->dispose = dispose;
diff --git a/pn_node.h b/pn_node.h
index c243fd9..830d472 100644
--- a/pn_node.h
+++ b/pn_node.h
@@ -16,6 +16,7 @@ struct pn_node_class {
void (*connect) (struct pn_node *node, const char *hostname, int port);
void (*close) (struct pn_node *node);
+ void (*write) (struct pn_node *node, const void *buffer, gsize count);
guint open_sig;
guint error_sig;
@@ -31,7 +32,7 @@ void pn_node_free(struct pn_node *node);
void pn_node_connect(struct pn_node *node, const char *hostname, int port);
void pn_node_close(struct pn_node *node);
-void pn_node_set_error(struct pn_node *node, GError *error);
+void pn_node_write(struct pn_node *node, const void *buffer, gsize count);
GType pn_node_get_type(void);