diff options
author | Felipe Contreras <felipe.contreras@gmail.com> | 2010-06-13 21:28:41 +0300 |
---|---|---|
committer | Felipe Contreras <felipe.contreras@gmail.com> | 2010-06-14 03:33:28 +0300 |
commit | c09e7aa06f0ccfc1707c4cb6f463f53f776e6c29 (patch) | |
tree | 0a723f793f12404dbcffed8a9b3585faad28b993 | |
parent | f579d63c0eef97adc5960b27ff13427df59edd28 (diff) |
node: add write implementation
Signed-off-by: Felipe Contreras <felipe.contreras@gmail.com>
-rw-r--r-- | pn_node.c | 73 | ||||
-rw-r--r-- | pn_node.h | 3 |
2 files changed, 75 insertions, 1 deletions
@@ -1,4 +1,5 @@ #include "pn_node.h" +#include "pn_buffer.h" #include <gio/gio.h> @@ -18,6 +19,7 @@ struct pn_node_priv { enum pn_node_status status; GSocketConnection *socket_conn; GError *error; + struct pn_buffer *output_buffer; }; struct pn_node * @@ -56,9 +58,79 @@ pn_node_set_error(struct pn_node *node, pn_node_close(node); } +void +pn_node_write(struct pn_node *node, + const void *buffer, + gsize count) +{ + PN_NODE_GET_CLASS(node)->write(node, buffer, count); +} + /* pn_node stuff */ static void +write_cb(GObject *source, + GAsyncResult *result, + gpointer user_data) +{ + struct pn_node *node = user_data; + struct pn_node_priv *priv = node->priv; + gssize size; + GError *error = NULL; + struct pn_buffer *buffer; + + size = g_output_stream_write_finish(G_OUTPUT_STREAM(source), + result, &error); + + pn_debug(node, "size=%i", size); + + if (error) + goto nok; + + buffer = priv->output_buffer; + + buffer->offset += size; + + if (buffer->offset < buffer->len) { + g_output_stream_write_async(G_OUTPUT_STREAM(source), + buffer->data + buffer->offset, + buffer->len - buffer->offset, + G_PRIORITY_DEFAULT, + NULL, + write_cb, + node); + } + else { + pn_buffer_free(buffer); + priv->output_buffer = NULL; + + g_object_unref(node); + } + return; + +nok: + pn_node_set_error(node, error); + g_object_unref(node); + return; +} + +static void +write_impl(struct pn_node *node, + const void *buffer, + gsize count) +{ + struct pn_node_priv *priv = node->priv; + GOutputStream *output; + + output = g_io_stream_get_output_stream(G_IO_STREAM(priv->socket_conn)); + priv->output_buffer = pn_buffer_new_use((void *)buffer, count); + + g_object_ref(node); + g_output_stream_write_async(output, buffer, count, + G_PRIORITY_DEFAULT, NULL, write_cb, node); +} + +static void connect_cb(GObject *source, GAsyncResult *res, gpointer user_data) @@ -159,6 +231,7 @@ class_init(void *g_class, node_class->connect = &connect_impl; node_class->close = &close_impl; + node_class->write = &write_impl; gobject_class->dispose = dispose; @@ -16,6 +16,7 @@ struct pn_node_class { void (*connect) (struct pn_node *node, const char *hostname, int port); void (*close) (struct pn_node *node); + void (*write) (struct pn_node *node, const void *buffer, gsize count); guint open_sig; guint error_sig; @@ -31,7 +32,7 @@ void pn_node_free(struct pn_node *node); void pn_node_connect(struct pn_node *node, const char *hostname, int port); void pn_node_close(struct pn_node *node); -void pn_node_set_error(struct pn_node *node, GError *error); +void pn_node_write(struct pn_node *node, const void *buffer, gsize count); GType pn_node_get_type(void); |