summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2015-12-02 09:52:19 +0100
committerWim Taymans <wtaymans@redhat.com>2015-12-02 10:26:03 +0100
commit01f5ca3da85f89e38d8cf3be94becb3bd30c4708 (patch)
tree65d8bbd5d9cb438bf433810dd38cfc0ccacc7e07
parent2f3eb47a954ec0be764965b807182a63950cab4e (diff)
multisocketsink: keep on reading when we stop sending
When we stop sending because we need more data, still keep a GSource around to receive data from the clients. Also handle read and write in the same go.
-rw-r--r--gst/tcp/gstmultisocketsink.c72
-rw-r--r--gst/tcp/gstmultisocketsink.h1
2 files changed, 44 insertions, 29 deletions
diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c
index cf5a4a573..489cba289 100644
--- a/gst/tcp/gstmultisocketsink.c
+++ b/gst/tcp/gstmultisocketsink.c
@@ -181,6 +181,8 @@ static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
GstMultiHandleClient * mhclient);
static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
GstMultiHandleClient * mhclient);
+static void gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
+ GstSocketClient * client);
static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
handle, GIOCondition condition, GstMultiSocketSink * sink);
@@ -764,12 +766,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
if (mhclient->bufpos == -1) {
/* client is too fast, remove from write queue until new buffer is
* available */
- /* FIXME: specific */
- if (client->source) {
- g_source_destroy (client->source);
- g_source_unref (client->source);
- client->source = NULL;
- }
+ gst_multi_socket_sink_stop_sending (sink, client);
/* if we flushed out all of the client buffers, we can stop */
if (mhclient->flushcount == 0)
@@ -793,13 +790,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
mhclient->bufpos = position;
} else {
/* cannot send data to this client yet */
- /* FIXME: specific */
- if (client->source) {
- g_source_destroy (client->source);
- g_source_unref (client->source);
- client->source = NULL;
- }
-
+ gst_multi_socket_sink_stop_sending (sink, client);
return TRUE;
}
}
@@ -909,37 +900,58 @@ write_error:
}
static void
-gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
- GstMultiHandleClient * mhclient)
+ensure_condition (GstMultiSocketSink * sink, GstSocketClient * client,
+ GIOCondition condition)
{
- GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
- GstSocketClient *client = (GstSocketClient *) (mhclient);
+ GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
- if (!sink->main_context)
+ if (client->condition == condition)
return;
- if (!client->source) {
- client->source =
- g_socket_create_source (mhclient->handle.socket,
- G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable);
+ if (client->source) {
+ g_source_destroy (client->source);
+ g_source_unref (client->source);
+ }
+ if (condition && sink->main_context) {
+ client->source = g_socket_create_source (mhclient->handle.socket,
+ condition, sink->cancellable);
g_source_set_callback (client->source,
(GSourceFunc) gst_multi_socket_sink_socket_condition,
gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
g_source_attach (client->source, sink->main_context);
+ } else {
+ client->source = NULL;
+ condition = 0;
}
+ client->condition = condition;
+}
+
+static void
+gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
+ GstMultiHandleClient * mhclient)
+{
+ GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
+ GstSocketClient *client = (GstSocketClient *) (mhclient);
+
+ ensure_condition (sink, client,
+ G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
}
static void
gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
GstMultiHandleClient * mhclient)
{
+ GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
GstSocketClient *client = (GstSocketClient *) (mhclient);
- if (client->source) {
- g_source_destroy (client->source);
- g_source_unref (client->source);
- client->source = NULL;
- }
+ ensure_condition (sink, client, 0);
+}
+
+static void
+gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
+ GstSocketClient * client)
+{
+ ensure_condition (sink, client, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
}
/* Handle the clients. This is called when a socket becomes ready
@@ -987,14 +999,16 @@ gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
gst_multi_handle_sink_remove_client_link (mhsink, clink);
ret = FALSE;
goto done;
- } else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
+ }
+ if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
/* handle client read */
if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
gst_multi_handle_sink_remove_client_link (mhsink, clink);
ret = FALSE;
goto done;
}
- } else if ((condition & G_IO_OUT)) {
+ }
+ if ((condition & G_IO_OUT)) {
/* handle client write */
if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
gst_multi_handle_sink_remove_client_link (mhsink, clink);
diff --git a/gst/tcp/gstmultisocketsink.h b/gst/tcp/gstmultisocketsink.h
index e1d25ece2..29edf1b99 100644
--- a/gst/tcp/gstmultisocketsink.h
+++ b/gst/tcp/gstmultisocketsink.h
@@ -54,6 +54,7 @@ typedef struct {
GstMultiHandleClient client;
GSource *source;
+ GIOCondition condition;
} GstSocketClient;
/**