summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2015-12-10 12:18:04 +0100
committerWim Taymans <wtaymans@redhat.com>2015-12-10 12:44:42 +0100
commit9c2bcd7b768d77a79fe6e4b9b864ec60427d1bef (patch)
tree6727209e454e7467b13caa098937898aae4b37f9
parent9aaaa26ff3b4ff6c1ddc3eb43705a8635951d8ee (diff)
multisocketsink: add GstNetworkMessage event
Add a property and logic to send a GstNetworkMessage event containing the message that was received from a client. This can be used to implement simply bidirectional communication.
-rw-r--r--gst/tcp/gstmultisocketsink.c94
-rw-r--r--gst/tcp/gstmultisocketsink.h1
2 files changed, 83 insertions, 12 deletions
diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c
index b9f22e000..f9acc13f1 100644
--- a/gst/tcp/gstmultisocketsink.c
+++ b/gst/tcp/gstmultisocketsink.c
@@ -137,11 +137,13 @@ enum
};
#define DEFAULT_SEND_DISPATCHED FALSE
+#define DEFAULT_SEND_MESSAGES FALSE
enum
{
PROP_0,
PROP_SEND_DISPATCHED,
+ PROP_SEND_MESSAGES,
PROP_LAST
};
@@ -223,11 +225,42 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
gobject_class->get_property = gst_multi_socket_sink_get_property;
gobject_class->finalize = gst_multi_socket_sink_finalize;
+ /**
+ * GstMultiSocketSink:send-dispatched:
+ *
+ * Sends a GstNetworkMessageDispatched event upstream whenever a buffer
+ * is sent to a client.
+ * The event is a CUSTOM event name GstNetworkMessageDispatched and
+ * contains:
+ *
+ * "object" G_TYPE_OBJECT : the object identifying the client
+ * "buffer" GST_TYPE_BUFFER : the buffer sent to the client
+ *
+ * Since: 1.8.0
+ */
g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED,
g_param_spec_boolean ("send-dispatched", "Send Dispatched",
"If GstNetworkMessageDispatched events should be pushed",
DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
+ * GstMultiSocketSink:send-messages:
+ *
+ * Sends a GstNetworkMessage event upstream whenever a buffer
+ * is received from a client.
+ * The event is a CUSTOM event name GstNetworkMessage and contains:
+ *
+ * "object" G_TYPE_OBJECT : the object identifying the client
+ * "buffer" GST_TYPE_BUFFER : the buffer with data received from the
+ * client
+ *
+ * Since: 1.8.0
+ */
+ g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
+ g_param_spec_boolean ("send-messages", "Send Messages",
+ "If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
* GstMultiSocketSink::add:
* @gstmultisocketsink: the multisocketsink element to emit this signal on
* @socket: the socket to add to multisocketsink
@@ -416,6 +449,7 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this)
this->cancellable = g_cancellable_new ();
this->send_dispatched = DEFAULT_SEND_DISPATCHED;
+ this->send_messages = DEFAULT_SEND_MESSAGES;
}
static void
@@ -569,38 +603,49 @@ static gboolean
gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
GstSocketClient * client)
{
- gboolean ret;
- gchar dummy[256];
+ gboolean ret, do_event;
+ gchar dummy[256], *mem, *omem;
gssize nread;
GError *err = NULL;
gboolean first = TRUE;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
+ gssize navail, maxmem;
GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
ret = TRUE;
+ navail = g_socket_get_available_bytes (mhclient->handle.socket);
+ if (navail <= 0)
+ return TRUE;
+
+ /* only collect the data in a buffer when we need to send it with an event */
+ do_event = sink->send_messages;
+ if (do_event) {
+ omem = mem = g_malloc (navail);
+ maxmem = navail;
+ } else {
+ mem = dummy;
+ maxmem = sizeof (dummy);
+ }
+
/* just Read 'n' Drop, could also just drop the client as it's not supposed
* to write to us except for closing the socket, I guess it's because we
* like to listen to our customers. */
- do {
- gssize navail;
-
+ while (navail > 0) {
GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
- navail = g_socket_get_available_bytes (mhclient->handle.socket);
- if (navail <= 0)
- break;
-
nread =
- g_socket_receive (mhclient->handle.socket, dummy, MIN (navail,
- sizeof (dummy)), sink->cancellable, &err);
+ g_socket_receive (mhclient->handle.socket, mem, MIN (navail,
+ maxmem), sink->cancellable, &err);
+
if (first && nread == 0) {
/* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_CLOSED;
ret = FALSE;
+ break;
} else if (nread < 0) {
GST_WARNING_OBJECT (sink, "%s could not read: %s",
mhclient->debug, err->message);
@@ -608,10 +653,29 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
ret = FALSE;
break;
}
+ navail -= nread;
+ if (do_event)
+ mem += nread;
first = FALSE;
- } while (nread > 0);
+ }
g_clear_error (&err);
+ if (do_event) {
+ if (ret) {
+ GstBuffer *buf;
+ GstEvent *ev;
+
+ buf = gst_buffer_new_wrapped (omem, maxmem);
+ ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
+ gst_structure_new ("GstNetworkMessage",
+ "object", G_TYPE_OBJECT, mhclient->handle.socket,
+ "buffer", GST_TYPE_BUFFER, buf, NULL));
+ gst_buffer_unref (buf);
+
+ gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev);
+ } else
+ g_free (omem);
+ }
return ret;
}
@@ -1114,6 +1178,9 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
case PROP_SEND_DISPATCHED:
sink->send_dispatched = g_value_get_boolean (value);
break;
+ case PROP_SEND_MESSAGES:
+ sink->send_messages = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -1130,6 +1197,9 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
case PROP_SEND_DISPATCHED:
g_value_set_boolean (value, sink->send_dispatched);
break;
+ case PROP_SEND_MESSAGES:
+ g_value_set_boolean (value, sink->send_messages);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
diff --git a/gst/tcp/gstmultisocketsink.h b/gst/tcp/gstmultisocketsink.h
index c68c9abc2..c89844d93 100644
--- a/gst/tcp/gstmultisocketsink.h
+++ b/gst/tcp/gstmultisocketsink.h
@@ -68,6 +68,7 @@ struct _GstMultiSocketSink {
/*< private >*/
GMainContext *main_context;
GCancellable *cancellable;
+ gboolean send_messages;
gboolean send_dispatched;
};