/*
 * pn_node: a GObject wrapping a single asynchronous TCP connection
 * (GSocketClient / GSocketConnection) with a read buffer, a write buffer
 * and "open" / "error" signals.
 */

#include "pn_node.h"
#include "pn_buffer.h"
/*
 * NOTE(review): the two system header names below were missing in the
 * reviewed text; they were chosen to cover the GIO and GObject APIs this
 * file uses -- confirm against the original include list.
 */
#include <gio/gio.h>
#include "pn_log.h"
#include <glib-object.h>

static void *parent_class;

enum pn_node_status {
	PN_NODE_STATUS_CLOSED,
	PN_NODE_STATUS_CONNECTING,
	PN_NODE_STATUS_OPEN,
};

struct pn_node_priv {
	char *hostname;                   /* copy of the host given to connect */
	int port;
	enum pn_node_status status;
	GSocketConnection *socket_conn;   /* owned; NULL when not connected */
	GError *error;                    /* owned; pending error, reported on close */
	struct pn_buffer *input_buffer;   /* owned; released in finalize */
	struct pn_buffer *output_buffer;  /* owned; non-NULL while a write is pending */
};

/* Error domain used for errors generated by this file. */
GQuark
pn_node_error_quark(void)
{
	static GQuark quark;

	if (quark == 0)
		quark = g_quark_from_static_string("pecan-node-error-quark");
	return quark;
}

/* Create a new node instance; release it with pn_node_free(). */
struct pn_node *
pn_node_new(void)
{
	return g_object_new(PN_NODE_TYPE, NULL);
}

/* Drop one reference on the node; NULL is accepted and ignored. */
void
pn_node_free(struct pn_node *node)
{
	if (!node)
		return;
	g_object_unref(node);
}

/* Start connecting to hostname:port through the class' connect method. */
void
pn_node_connect(struct pn_node *node, const char *hostname, int port)
{
	PN_NODE_GET_CLASS(node)->connect(node, hostname, port);
}

/* Close the node through the class' close method. */
void
pn_node_close(struct pn_node *node)
{
	PN_NODE_GET_CLASS(node)->close(node);
}

/*
 * Record an error on the node and close it.
 *
 * The node takes ownership of @error: close_impl() reports it through the
 * "error" signal and clears it.  An error that is still pending from an
 * earlier call is freed first so that it is not leaked.
 */
void
pn_node_set_error(struct pn_node *node, GError *error)
{
	struct pn_node_priv *priv = node->priv;

	if (priv->error != error) {
		g_clear_error(&priv->error);
		priv->error = error;
	}
	pn_node_close(node);
}

/* Queue count bytes from buffer for writing through the class' write method. */
void
pn_node_write(struct pn_node *node, const void *buffer, gsize count)
{
	PN_NODE_GET_CLASS(node)->write(node, buffer, count);
}

/* Hand received data to the class' parse method. */
void
pn_node_parse(struct pn_node *node, struct pn_buffer *buffer)
{
	PN_NODE_GET_CLASS(node)->parse(node, buffer);
}

/* pn_node stuff */

/* Default parser: does nothing. */
static void
parse_impl(struct pn_node *node, struct pn_buffer *buffer)
{
}

/*
 * Completion callback for an asynchronous read.
 *
 * Runs with one node reference owned by the read operation.  That
 * reference is carried over to the next read while the node stays open
 * and is released otherwise.
 */
static void
read_cb(GObject *source, GAsyncResult *result, gpointer user_data)
{
	struct pn_node *node = user_data;
	struct pn_node_priv *priv = node->priv;
	struct pn_buffer *b = priv->input_buffer;
	GError *error = NULL;
	gssize size;

	size = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error);

	/* gssize is not int; use the matching GLib format macro. */
	pn_debug(node, "size=%" G_GSSIZE_FORMAT, size);

	/* A zero-length read is reported as an error of our own. */
	if (size == 0)
		error = g_error_new_literal(PN_NODE_ERROR, 0, "Connection finished");

	if (error) {
		pn_node_set_error(node, error);
		g_object_unref(node);
		return;
	}

	b->len += size;

	pn_node_parse(node, b);

	/* The parser may have closed the node; only keep reading while open. */
	if (priv->status == PN_NODE_STATUS_OPEN) {
		g_input_stream_read_async(G_INPUT_STREAM(source), b->data, b->size,
					  G_PRIORITY_DEFAULT, NULL, read_cb, node);
	} else {
		g_object_unref(node);
	}
}

/*
 * Completion callback for an asynchronous write.
 *
 * Runs with one node reference owned by the write operation.  Partial
 * writes are continued from the current offset; the reference and the
 * output buffer are released once everything has been written.
 */
static void
write_cb(GObject *source, GAsyncResult *result, gpointer user_data)
{
	struct pn_node *node = user_data;
	struct pn_node_priv *priv = node->priv;
	struct pn_buffer *buffer;
	GError *error = NULL;
	gssize size;

	size = g_output_stream_write_finish(G_OUTPUT_STREAM(source), result, &error);

	/* gssize is not int; use the matching GLib format macro. */
	pn_debug(node, "size=%" G_GSSIZE_FORMAT, size);

	if (size == 0)
		error = g_error_new_literal(PN_NODE_ERROR, 0, "Connection finished");

	if (error) {
		/* The output buffer is left in place; finalize() releases it. */
		pn_node_set_error(node, error);
		g_object_unref(node);
		return;
	}

	buffer = priv->output_buffer;
	buffer->offset += size;

	if (buffer->offset < buffer->len) {
		/* Short write: send the remainder. */
		g_output_stream_write_async(G_OUTPUT_STREAM(source),
					    buffer->data + buffer->offset,
					    buffer->len - buffer->offset,
					    G_PRIORITY_DEFAULT, NULL, write_cb, node);
	} else {
		pn_buffer_free(buffer);
		priv->output_buffer = NULL;
		g_object_unref(node);
	}
}

/*
 * Default write method: start an asynchronous write of count bytes.
 *
 * NOTE(review): the caller's memory is wrapped with pn_buffer_new_use()
 * rather than copied, and that wrapper is later passed to
 * pn_buffer_free(); confirm who owns `buffer` and how long it must stay
 * valid.
 * NOTE(review): calling this while an earlier write is still pending
 * replaces priv->output_buffer without releasing the previous one.
 */
static void
write_impl(struct pn_node *node, const void *buffer, gsize count)
{
	struct pn_node_priv *priv = node->priv;
	GOutputStream *output;

	/* Without a connection there is no stream to write to. */
	if (!priv->socket_conn) {
		pn_warn(node, "write attempted without an open connection");
		return;
	}

	output = g_io_stream_get_output_stream(G_IO_STREAM(priv->socket_conn));

	priv->output_buffer = pn_buffer_new_use((void *)buffer, count);

	/* Reference owned by the pending write; released in write_cb(). */
	g_object_ref(node);
	g_output_stream_write_async(output, buffer, count,
				    G_PRIORITY_DEFAULT, NULL, write_cb, node);
}

/*
 * Completion callback for the asynchronous connect.
 *
 * Runs with the node reference taken in connect_impl(), which is
 * released on every path out of this function.
 */
static void
connect_cb(GObject *source, GAsyncResult *res, gpointer user_data)
{
	struct pn_node *node = user_data;
	struct pn_node_priv *priv = node->priv;
	struct pn_node_class *class;
	GSocketConnection *socket_conn;
	GInputStream *input;
	struct pn_buffer *b;
	GError *error = NULL;

	socket_conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source),
							     res, &error);

	if (priv->status != PN_NODE_STATUS_CONNECTING) {
		/*
		 * The node left the CONNECTING state while the attempt was in
		 * flight (e.g. it was closed): discard the result instead of
		 * reopening the node behind the caller's back.
		 * NOTE(review): overlapping connect attempts are not told
		 * apart here; that would need a GCancellable per attempt.
		 */
		if (socket_conn)
			g_object_unref(socket_conn);
		g_clear_error(&error);
		goto out;
	}

	if (!socket_conn) {
		pn_node_set_error(node, error);
		goto out;
	}

	priv->status = PN_NODE_STATUS_OPEN;
	priv->socket_conn = socket_conn;

	b = priv->input_buffer;
	input = g_io_stream_get_input_stream(G_IO_STREAM(socket_conn));

	/* Reference owned by the pending read; released in read_cb(). */
	g_object_ref(node);
	g_input_stream_read_async(input, b->data, b->size,
				  G_PRIORITY_DEFAULT, NULL, read_cb, node);

	/* "open" is registered with no parameters. */
	class = g_type_class_peek(PN_NODE_TYPE);
	g_signal_emit(G_OBJECT(node), class->open_sig, 0);

out:
	g_object_unref(node);
}

/*
 * Default connect method: close any current connection, remember the
 * target and start an asynchronous connect.
 */
static void
connect_impl(struct pn_node *node, const char *hostname, int port)
{
	struct pn_node_priv *priv = node->priv;
	GSocketClient *client;

	pn_node_close(node);

	priv->hostname = g_strdup(hostname);
	priv->port = port;
	priv->status = PN_NODE_STATUS_CONNECTING;

	client = g_socket_client_new();

	/*
	 * Keep the node alive until connect_cb() has run; without this the
	 * callback could be invoked on a node that was already freed.
	 */
	g_object_ref(node);
	g_socket_client_connect_to_host_async(client, hostname, port,
					      NULL, connect_cb, node);
	g_object_unref(client);
}

/*
 * Default close method: mark the node closed, report a pending error
 * through the "error" signal, and release the connection and hostname.
 * Does nothing when the node is already closed.
 */
static void
close_impl(struct pn_node *node)
{
	struct pn_node_priv *priv = node->priv;

	if (priv->status == PN_NODE_STATUS_CLOSED)
		return;

	priv->status = PN_NODE_STATUS_CLOSED;

	if (priv->error) {
		struct pn_node_class *class;

		class = g_type_class_peek(PN_NODE_TYPE);

		/*
		 * "error" is registered with a single pointer parameter.
		 * Handlers receive the address of the stored error; if it is
		 * still set after emission it is logged and freed here.
		 */
		g_signal_emit(G_OBJECT(node), class->error_sig, 0, &priv->error);

		if (priv->error) {
			pn_warn(node, "unhandled error: %s", priv->error->message);
			g_clear_error(&priv->error);
		}
	}

	if (priv->socket_conn) {
		g_object_unref(priv->socket_conn);
		priv->socket_conn = NULL;
	}

	g_free(priv->hostname);
	priv->hostname = NULL;
}

/* GObject stuff */

/* GObject dispose: close the node, then chain up. */
static void
dispose(GObject *obj)
{
	pn_node_close(PN_NODE(obj));
	G_OBJECT_CLASS(parent_class)->dispose(obj);
}

/* GObject finalize: release everything still owned by the private data. */
static void
finalize(GObject *obj)
{
	struct pn_node_priv *priv = PN_NODE(obj)->priv;

	/* An error stored after the node was already closed is never reported. */
	g_clear_error(&priv->error);

	/* A failed write leaves its buffer behind; release it here. */
	if (priv->output_buffer) {
		pn_buffer_free(priv->output_buffer);
		priv->output_buffer = NULL;
	}

	pn_buffer_free(priv->input_buffer);
	G_OBJECT_CLASS(parent_class)->finalize(obj);
}

/* Install the default methods, the GObject hooks and the two signals. */
static void
class_init(void *g_class, void *class_data)
{
	struct pn_node_class *node_class = PN_NODE_CLASS(g_class);
	GObjectClass *gobject_class = G_OBJECT_CLASS(g_class);

	node_class->connect = &connect_impl;
	node_class->close = &close_impl;
	node_class->write = &write_impl;
	node_class->parse = &parse_impl;

	gobject_class->dispose = dispose;
	gobject_class->finalize = finalize;

	parent_class = g_type_class_peek_parent(g_class);
	g_type_class_add_private(g_class, sizeof(struct pn_node_priv));

	node_class->open_sig = g_signal_new("open",
					    G_TYPE_FROM_CLASS(gobject_class),
					    G_SIGNAL_RUN_FIRST, 0, NULL, NULL,
					    g_cclosure_marshal_VOID__VOID,
					    G_TYPE_NONE, 0);

	node_class->error_sig = g_signal_new("error",
					     G_TYPE_FROM_CLASS(gobject_class),
					     G_SIGNAL_RUN_FIRST, 0, NULL, NULL,
					     g_cclosure_marshal_VOID__POINTER,
					     G_TYPE_NONE, 1, G_TYPE_POINTER);
}

/* Bind the private data and allocate the input buffer. */
static void
instance_init(GTypeInstance *instance, void *g_class)
{
	struct pn_node *self = PN_NODE(instance);
	struct pn_node_priv *priv;

	self->priv = priv = G_TYPE_INSTANCE_GET_PRIVATE(instance, PN_NODE_TYPE,
							struct pn_node_priv);

	priv->input_buffer = pn_buffer_new_and_alloc(0);
}

/* Register the "PnNode" type on first use and return its GType. */
GType
pn_node_get_type(void)
{
	static gsize init_type;

	if (g_once_init_enter(&init_type)) {
		GType type;
		GTypeInfo type_info = {
			.class_size = sizeof(struct pn_node_class),
			.class_init = class_init,
			.instance_size = sizeof(struct pn_node),
			.instance_init = instance_init,
		};

		type = g_type_register_static(G_TYPE_OBJECT, "PnNode", &type_info, 0);

		g_once_init_leave(&init_type, type);
	}

	return init_type;
}