summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFelipe Contreras <felipe.contreras@gmail.com>2010-06-14 01:54:50 +0300
committerFelipe Contreras <felipe.contreras@gmail.com>2010-06-14 03:33:28 +0300
commit290427fa78f74fd5bec3410cf325a691eea016ac (patch)
treeb6f9c3cf3f9953a6cbf582e80076365f4b90c0d6
parent69abe0167a51b96a7caafa32edc66c3186be784a (diff)
node: add read implementation
Signed-off-by: Felipe Contreras <felipe.contreras@gmail.com>
-rw-r--r--pn_node.c58
1 files changed, 58 insertions, 0 deletions
diff --git a/pn_node.c b/pn_node.c
index 8f9acff..98bdc4b 100644
--- a/pn_node.c
+++ b/pn_node.c
@@ -5,6 +5,8 @@
#include "pn_log.h"
+#include <string.h>
+
static void *parent_class;
enum pn_node_status {
@@ -19,6 +21,7 @@ struct pn_node_priv {
enum pn_node_status status;
GSocketConnection *socket_conn;
GError *error;
+ struct pn_buffer *input_buffer;
struct pn_buffer *output_buffer;
};
@@ -69,6 +72,40 @@ pn_node_write(struct pn_node *node,
/* pn_node stuff */
static void
+read_cb(GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ struct pn_node *node = user_data;
+ struct pn_node_priv *priv = node->priv;
+ gssize size;
+ GError *error = NULL;
+ struct pn_buffer *b = priv->input_buffer;
+
+ size = g_input_stream_read_finish(G_INPUT_STREAM(source),
+ result, &error);
+
+ pn_debug(node, "size=%i", size);
+
+ if (error)
+ goto nok;
+
+ if (priv->status == PN_NODE_STATUS_OPEN) {
+ g_input_stream_read_async(G_INPUT_STREAM(source), b->data, b->size,
+ G_PRIORITY_DEFAULT, NULL, read_cb, node);
+ }
+ else {
+ g_object_unref(node);
+ }
+ return;
+
+nok:
+ pn_node_set_error(node, error);
+ g_object_unref(node);
+ return;
+}
+
+static void
write_cb(GObject *source,
GAsyncResult *result,
gpointer user_data)
@@ -145,9 +182,19 @@ connect_cb(GObject *source,
g_object_ref(node);
if (socket_conn) {
+ GInputStream *input;
+ struct pn_buffer *b = priv->input_buffer;
+
priv->status = PN_NODE_STATUS_OPEN;
priv->socket_conn = socket_conn;
+ input = g_io_stream_get_input_stream(G_IO_STREAM(socket_conn));
+
+ g_object_ref(node);
+ g_input_stream_read_async(input, b->data, b->size,
+ G_PRIORITY_DEFAULT, NULL,
+ read_cb, node);
+
struct pn_node_class *class;
class = g_type_class_peek(PN_NODE_TYPE);
g_signal_emit(G_OBJECT(node), class->open_sig, 0, node);
@@ -223,6 +270,15 @@ dispose(GObject *obj)
}
static void
+finalize(GObject *obj)
+{
+ struct pn_node_priv *priv = PN_NODE(obj)->priv;
+ pn_buffer_free(priv->input_buffer);
+
+ G_OBJECT_CLASS(parent_class)->finalize(obj);
+}
+
+static void
class_init(void *g_class,
void *class_data)
{
@@ -234,6 +290,7 @@ class_init(void *g_class,
node_class->write = &write_impl;
gobject_class->dispose = dispose;
+ gobject_class->finalize = finalize;
parent_class = g_type_class_peek_parent(g_class);
g_type_class_add_private(g_class, sizeof(struct pn_node_priv));
@@ -256,6 +313,7 @@ instance_init(GTypeInstance *instance,
struct pn_node_priv *priv;
self->priv = priv = G_TYPE_INSTANCE_GET_PRIVATE(instance, PN_NODE_TYPE, struct pn_node_priv);
+ priv->input_buffer = pn_buffer_new_and_alloc(0);
}
GType