summaryrefslogtreecommitdiff
path: root/salut/lib/gibber/gibber-bytestream-direct.c
diff options
context:
space:
mode:
Diffstat (limited to 'salut/lib/gibber/gibber-bytestream-direct.c')
-rw-r--r--salut/lib/gibber/gibber-bytestream-direct.c639
1 files changed, 639 insertions, 0 deletions
diff --git a/salut/lib/gibber/gibber-bytestream-direct.c b/salut/lib/gibber/gibber-bytestream-direct.c
new file mode 100644
index 000000000..2537d32c9
--- /dev/null
+++ b/salut/lib/gibber/gibber-bytestream-direct.c
@@ -0,0 +1,639 @@
+/*
+ * gibber-bytestream-direct.c - Source for GibberBytestreamDirect
+ * Copyright (C) 2008 Collabora Ltd.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "gibber-bytestream-direct.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+#include <glib.h>
+
+#include <wocky/wocky-xmpp-error.h>
+
+#include "gibber-sockets.h"
+#include "gibber-linklocal-transport.h"
+#include "gibber-util.h"
+
+#define DEBUG_FLAG DEBUG_BYTESTREAM
+#include "gibber-debug.h"
+
+static void
+bytestream_iface_init (gpointer g_iface, gpointer iface_data);
+
+G_DEFINE_TYPE_WITH_CODE (GibberBytestreamDirect, gibber_bytestream_direct,
+ G_TYPE_OBJECT,
+ G_IMPLEMENT_INTERFACE (GIBBER_TYPE_BYTESTREAM_IFACE,
+ bytestream_iface_init));
+
+/* properties */
+enum
+{
+ PROP_ADDRESSES = 1,
+ PROP_SELF_ID,
+ PROP_PEER_ID,
+ PROP_STREAM_ID,
+ PROP_STATE,
+
+ /* relevent only on recipient side to connect to the initiator */
+ PROP_PORT,
+
+ PROP_PROTOCOL,
+ LAST_PROPERTY
+};
+
+typedef struct _GibberBytestreamDirectPrivate GibberBytestreamDirectPrivate;
+struct _GibberBytestreamDirectPrivate
+{
+ /* A list of struct sockaddr_storage to try to connect to, if
+ * this GibberBytestreamDirect object is on the initiator side.
+ * GibberBytestreamDirect own theses structures and must free them. */
+ GArray *addresses;
+
+ gchar *self_id;
+ gchar *peer_id;
+ gchar *stream_id;
+ GibberBytestreamState state;
+
+ guint portnum;
+
+ /* Are we the recipient of this bytestream?
+ * If not we are the sender */
+ gboolean recipient;
+ GibberTransport *transport;
+ gboolean write_blocked;
+ gboolean read_blocked;
+
+ GibberBytestreamDirectCheckAddrFunc check_addr_func;
+ gpointer check_addr_func_data;
+
+ gboolean dispose_has_run;
+};
+
+#define GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE(obj) \
+ ((GibberBytestreamDirectPrivate *) obj->priv)
+
+static void
+gibber_bytestream_direct_init (GibberBytestreamDirect *self)
+{
+ GibberBytestreamDirectPrivate *priv = G_TYPE_INSTANCE_GET_PRIVATE (self,
+ GIBBER_TYPE_BYTESTREAM_DIRECT, GibberBytestreamDirectPrivate);
+
+ self->priv = priv;
+}
+
+static void
+gibber_bytestream_direct_dispose (GObject *object)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (object);
+ GibberBytestreamDirectPrivate *priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE
+ (self);
+
+ if (priv->dispose_has_run)
+ return;
+ priv->dispose_has_run = TRUE;
+
+ if (priv->state != GIBBER_BYTESTREAM_STATE_CLOSED)
+ {
+ gibber_bytestream_iface_close (GIBBER_BYTESTREAM_IFACE (self), NULL);
+ }
+
+ if (priv->transport != NULL)
+ {
+ g_object_unref (priv->transport);
+ priv->transport = NULL;
+ }
+
+ G_OBJECT_CLASS (gibber_bytestream_direct_parent_class)->dispose (object);
+}
+
+static void
+gibber_bytestream_direct_finalize (GObject *object)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (object);
+ GibberBytestreamDirectPrivate *priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE
+ (self);
+
+ g_free (priv->stream_id);
+ g_free (priv->self_id);
+ g_free (priv->peer_id);
+
+ if (priv->addresses != NULL)
+ g_array_unref (priv->addresses);
+
+ G_OBJECT_CLASS (gibber_bytestream_direct_parent_class)->finalize (object);
+}
+
+static void
+gibber_bytestream_direct_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (object);
+ GibberBytestreamDirectPrivate *priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE
+ (self);
+
+ switch (property_id)
+ {
+ case PROP_ADDRESSES:
+ g_value_set_pointer (value, priv->addresses);
+ break;
+ case PROP_SELF_ID:
+ g_value_set_string (value, priv->self_id);
+ break;
+ case PROP_PEER_ID:
+ g_value_set_string (value, priv->peer_id);
+ break;
+ case PROP_STREAM_ID:
+ g_value_set_string (value, priv->stream_id);
+ break;
+ case PROP_STATE:
+ g_value_set_uint (value, priv->state);
+ break;
+ case PROP_PORT:
+ g_value_set_uint (value, priv->portnum);
+ break;
+ case PROP_PROTOCOL:
+ /* this property is not used because direct bytestream are not
+ * negociated on XMPP using SI */
+ g_value_set_string (value, (const gchar *)"");
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+gibber_bytestream_direct_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (object);
+ GibberBytestreamDirectPrivate *priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE
+ (self);
+
+ switch (property_id)
+ {
+ case PROP_ADDRESSES:
+ priv->addresses = g_value_get_pointer (value);
+ break;
+ case PROP_SELF_ID:
+ g_free (priv->self_id);
+ priv->self_id = g_value_dup_string (value);
+ break;
+ case PROP_PEER_ID:
+ g_free (priv->peer_id);
+ priv->peer_id = g_value_dup_string (value);
+ break;
+ case PROP_STREAM_ID:
+ g_free (priv->stream_id);
+ priv->stream_id = g_value_dup_string (value);
+ break;
+ case PROP_STATE:
+ if (priv->state != g_value_get_uint (value))
+ {
+ priv->state = g_value_get_uint (value);
+ g_signal_emit_by_name (object, "state-changed", priv->state);
+ }
+ break;
+ case PROP_PORT:
+ priv->portnum = g_value_get_uint (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static GObject *
+gibber_bytestream_direct_constructor (GType type,
+ guint n_props,
+ GObjectConstructParam *props)
+{
+ GObject *obj;
+ GibberBytestreamDirectPrivate *priv;
+
+ obj = G_OBJECT_CLASS (gibber_bytestream_direct_parent_class)->
+ constructor (type, n_props, props);
+
+ priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (GIBBER_BYTESTREAM_DIRECT (obj));
+
+ g_assert (priv->self_id != NULL);
+ g_assert (priv->peer_id != NULL);
+
+ return obj;
+}
+
+static void
+gibber_bytestream_direct_class_init (
+ GibberBytestreamDirectClass *gibber_bytestream_direct_class)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS (gibber_bytestream_direct_class);
+ GParamSpec *param_spec;
+
+ g_type_class_add_private (gibber_bytestream_direct_class,
+ sizeof (GibberBytestreamDirectPrivate));
+
+ object_class->dispose = gibber_bytestream_direct_dispose;
+ object_class->finalize = gibber_bytestream_direct_finalize;
+
+ object_class->get_property = gibber_bytestream_direct_get_property;
+ object_class->set_property = gibber_bytestream_direct_set_property;
+ object_class->constructor = gibber_bytestream_direct_constructor;
+
+ g_object_class_override_property (object_class, PROP_SELF_ID,
+ "self-id");
+ g_object_class_override_property (object_class, PROP_PEER_ID,
+ "peer-id");
+ g_object_class_override_property (object_class, PROP_STREAM_ID,
+ "stream-id");
+ g_object_class_override_property (object_class, PROP_STATE,
+ "state");
+ g_object_class_override_property (object_class, PROP_PROTOCOL,
+ "protocol");
+
+ param_spec = g_param_spec_pointer (
+ "addresses",
+ "Array of addresses",
+ "GArray of struct sockaddr_storage used to find the IP address to "
+ "connect in this bytestream if it's a private one",
+ G_PARAM_CONSTRUCT_ONLY |
+ G_PARAM_READWRITE |
+ G_PARAM_STATIC_NAME |
+ G_PARAM_STATIC_NICK |
+ G_PARAM_STATIC_BLURB);
+ g_object_class_install_property (object_class, PROP_ADDRESSES,
+ param_spec);
+
+ param_spec = g_param_spec_uint (
+ "port",
+ "port",
+ "port for the recipient to connect on the initiator",
+ 0,
+ G_MAXUINT32,
+ 0,
+ G_PARAM_CONSTRUCT_ONLY |
+ G_PARAM_READWRITE |
+ G_PARAM_STATIC_NAME |
+ G_PARAM_STATIC_NICK |
+ G_PARAM_STATIC_BLURB);
+ g_object_class_install_property (object_class, PROP_PORT,
+ param_spec);
+}
+
+void
+gibber_bytestream_direct_set_check_addr_func (
+ GibberBytestreamDirect *self,
+ GibberBytestreamDirectCheckAddrFunc func,
+ gpointer user_data)
+{
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ priv->check_addr_func = func;
+ priv->check_addr_func_data = user_data;
+}
+
+static void
+transport_handler (GibberTransport *transport,
+ GibberBuffer *data,
+ gpointer user_data)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (user_data);
+ GString *buffer;
+
+ DEBUG ("GibberBytestreamDirect emit DATA_RECEIVED.");
+
+ buffer = g_string_new_len ((const gchar *) data->data, data->length);
+
+ g_signal_emit_by_name (G_OBJECT (self), "data-received", NULL, buffer);
+
+ g_string_free (buffer, TRUE);
+}
+
+static void
+transport_connected_cb (GibberTransport *transport,
+ GibberBytestreamDirect *self)
+{
+ DEBUG ("transport connected. Bytestream is now open");
+ g_object_set (self, "state", GIBBER_BYTESTREAM_STATE_OPEN,
+ NULL);
+}
+
+static void
+transport_disconnected_cb (GibberTransport *transport,
+ GibberBytestreamDirect *self)
+{
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->state == GIBBER_BYTESTREAM_STATE_CLOSED)
+ return;
+
+ DEBUG ("transport disconnected. close the bytestream");
+
+ if (priv->state == GIBBER_BYTESTREAM_STATE_ACCEPTED)
+ {
+ /* Connection to host failed */
+ GError e = { WOCKY_XMPP_ERROR, WOCKY_XMPP_ERROR_ITEM_NOT_FOUND,
+ "connection failed" };
+
+ gibber_bytestream_iface_close (GIBBER_BYTESTREAM_IFACE (self), &e);
+ }
+ else
+ {
+ gibber_bytestream_iface_close (GIBBER_BYTESTREAM_IFACE (self), NULL);
+ }
+}
+
+static void
+change_write_blocked_state (GibberBytestreamDirect *self,
+ gboolean blocked)
+{
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->write_blocked == blocked)
+ return;
+
+ priv->write_blocked = blocked;
+ g_signal_emit_by_name (self, "write-blocked", blocked);
+}
+
+static void
+bytestream_closed (GibberBytestreamDirect *self)
+{
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->transport != NULL)
+ {
+ g_signal_handlers_disconnect_matched (priv->transport,
+ G_SIGNAL_MATCH_DATA, 0, 0, NULL, NULL, self);
+ gibber_transport_disconnect (priv->transport);
+ g_object_unref (priv->transport);
+ priv->transport = NULL;
+ }
+
+ g_object_set (self, "state", GIBBER_BYTESTREAM_STATE_CLOSED, NULL);
+}
+
+static void
+transport_buffer_empty_cb (GibberTransport *transport,
+ GibberBytestreamDirect *self)
+{
+ GibberBytestreamDirectPrivate *priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE
+ (self);
+
+ if (priv->state == GIBBER_BYTESTREAM_STATE_CLOSING)
+ {
+ DEBUG ("buffer is now empty. Bytestream can be closed");
+ bytestream_closed (self);
+ }
+ else if (priv->write_blocked)
+ {
+ DEBUG ("buffer is empty, unblock write to the bytestream");
+ change_write_blocked_state (self, FALSE);
+ }
+}
+
+static void
+set_transport (GibberBytestreamDirect *self,
+ GibberTransport *transport)
+{
+ GibberBytestreamDirectPrivate *priv = GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE
+ (self);
+
+ g_assert (priv->transport == NULL);
+
+ priv->transport = g_object_ref (transport);
+ gibber_transport_set_handler (transport, transport_handler, self);
+
+ /* The transport will already be connected if it is created from
+ * GibberListener. In this case, set the bytestream to open, otherwise
+ * it will be done in transport_connected_cb. */
+ if (gibber_transport_get_state (transport) == GIBBER_TRANSPORT_CONNECTED)
+ {
+ g_object_set (self, "state", GIBBER_BYTESTREAM_STATE_OPEN,
+ NULL);
+ }
+
+ g_signal_connect (transport, "connected",
+ G_CALLBACK (transport_connected_cb), self);
+ g_signal_connect (transport, "disconnected",
+ G_CALLBACK (transport_disconnected_cb), self);
+ g_signal_connect (priv->transport, "buffer-empty",
+ G_CALLBACK (transport_buffer_empty_cb), self);
+}
+
+gboolean
+gibber_bytestream_direct_accept_socket (GibberBytestreamIface *bytestream,
+ GibberTransport *transport)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->state != GIBBER_BYTESTREAM_STATE_LOCAL_PENDING)
+ {
+ DEBUG ("bytestream is not is the initiating state (state %d)",
+ priv->state);
+ return FALSE;
+ }
+
+ set_transport (self, transport);
+
+ return TRUE;
+}
+
+static void
+gibber_bytestream_direct_block_reading (GibberBytestreamIface *bytestream,
+ gboolean block)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->read_blocked == block)
+ return;
+
+ priv->read_blocked = block;
+
+ DEBUG ("%s the transport bytestream", block ? "block": "unblock");
+ gibber_transport_block_receiving (priv->transport, block);
+}
+
+/*
+ * gibber_bytestream_direct_send
+ *
+ * Implements gibber_bytestream_iface_send on GibberBytestreamIface
+ */
+static gboolean
+gibber_bytestream_direct_send (GibberBytestreamIface *bytestream,
+ guint len,
+ const gchar *str)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+ GError *error = NULL;
+
+ if (priv->state != GIBBER_BYTESTREAM_STATE_OPEN)
+ {
+ DEBUG ("can't send data through a not open bytestream (state: %d)",
+ priv->state);
+ return FALSE;
+ }
+
+ if (priv->write_blocked)
+ {
+ DEBUG ("sending data while the bytestream was blocked");
+ }
+
+ DEBUG ("send %u bytes through bytestream", len);
+ if (!gibber_transport_send (priv->transport, (const guint8 *) str, len,
+ &error))
+ {
+ DEBUG ("sending failed: %s", error->message);
+ g_error_free (error);
+
+ gibber_bytestream_iface_close (GIBBER_BYTESTREAM_IFACE (self), NULL);
+ return FALSE;
+ }
+
+ if (!gibber_transport_buffer_is_empty (priv->transport))
+ {
+ /* We >don't want to send more data while the buffer isn't empty */
+ DEBUG ("buffer isn't empty. Block write to the bytestream");
+ change_write_blocked_state (self, TRUE);
+ }
+
+ return TRUE;
+}
+
+
+/*
+ * gibber_bytestream_direct_accept
+ *
+ * Implements gibber_bytestream_iface_accept on GibberBytestreamIface
+ */
+static void
+gibber_bytestream_direct_accept (GibberBytestreamIface *bytestream,
+ GibberBytestreamAugmentSiAcceptReply func,
+ gpointer user_data)
+{
+ /* nothing to do: GibberBytestreamDirect don't use Stream Initiation, so it
+ * does not have to send any SI iq stanza to accept the bytestream */
+}
+
+/*
+ * gibber_bytestream_direct_close
+ *
+ * Implements gibber_bytestream_iface_close on GibberBytestreamIface
+ */
+static void
+gibber_bytestream_direct_close (GibberBytestreamIface *bytestream,
+ GError *error)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+
+ if (priv->state == GIBBER_BYTESTREAM_STATE_CLOSED)
+ /* bytestream already closed, do nothing */
+ return;
+
+ g_object_set (self, "state", GIBBER_BYTESTREAM_STATE_CLOSING, NULL);
+ if (priv->transport != NULL &&
+ !gibber_transport_buffer_is_empty (priv->transport))
+ {
+ DEBUG ("Wait transport buffer is empty before close the bytestream");
+ }
+ else
+ {
+ DEBUG ("Transport buffer is empty, we can close the bytestream");
+ bytestream_closed (self);
+ }
+}
+
+/*
+ * gibber_bytestream_direct_initiate
+ * connect to the remote end
+ *
+ * Implements gibber_bytestream_iface_initiate on GibberBytestreamIface
+ */
+static gboolean
+gibber_bytestream_direct_initiate (GibberBytestreamIface *bytestream)
+{
+ GibberBytestreamDirect *self = GIBBER_BYTESTREAM_DIRECT (bytestream);
+ GibberLLTransport *ll_transport;
+ /* never cast addr but type-punning to avoid strict-aliasing issues
+ * (see -fstrict-aliasing in man gcc) */
+ union {
+ struct sockaddr_storage storage;
+ struct sockaddr_in6 in6;
+ } addr;
+ GibberBytestreamDirectPrivate *priv =
+ GIBBER_BYTESTREAM_DIRECT_GET_PRIVATE (self);
+ guint i;
+ gboolean ret;
+
+ g_assert (priv->addresses != NULL);
+ if (priv->addresses->len < 1)
+ {
+ GError e = { WOCKY_XMPP_ERROR, WOCKY_XMPP_ERROR_ITEM_NOT_FOUND,
+ "Unsable get socket address for this contact" };
+ DEBUG ("Could not get socket address for this contact" );
+ gibber_bytestream_iface_close (GIBBER_BYTESTREAM_IFACE (self), &e);
+ return FALSE;
+ }
+
+ ll_transport = gibber_ll_transport_new ();
+ set_transport (self, GIBBER_TRANSPORT (ll_transport));
+ g_object_unref (ll_transport);
+
+ ret = FALSE;
+ for (i = 0 ; i < priv->addresses->len && !ret ; i++)
+ {
+ addr.storage = g_array_index (priv->addresses, struct sockaddr_storage,
+ i);
+ addr.in6.sin6_port = g_htons ((guint16) priv->portnum);
+
+ ret = gibber_ll_transport_open_sockaddr (ll_transport, &addr.storage,
+ NULL);
+ }
+
+ return ret;
+}
+
+static void
+bytestream_iface_init (gpointer g_iface,
+ gpointer iface_data)
+{
+ GibberBytestreamIfaceClass *klass = (GibberBytestreamIfaceClass *) g_iface;
+
+ klass->initiate = gibber_bytestream_direct_initiate;
+ klass->send = gibber_bytestream_direct_send;
+ klass->close = gibber_bytestream_direct_close;
+ klass->accept = gibber_bytestream_direct_accept;
+ klass->block_reading = gibber_bytestream_direct_block_reading;
+}