diff options
author | Wim Taymans <wtaymans@redhat.com> | 2015-12-02 09:52:19 +0100 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2015-12-02 10:26:03 +0100 |
commit | 01f5ca3da85f89e38d8cf3be94becb3bd30c4708 (patch) | |
tree | 65d8bbd5d9cb438bf433810dd38cfc0ccacc7e07 | |
parent | 2f3eb47a954ec0be764965b807182a63950cab4e (diff) |
multisocketsink: keep on reading when we stop sending
When we stop sending because we need more data, still keep a GSource
around to receive data from the clients.
Also handle read and write in the same go.
-rw-r--r-- | gst/tcp/gstmultisocketsink.c | 72 | ||||
-rw-r--r-- | gst/tcp/gstmultisocketsink.h | 1 |
2 files changed, 44 insertions, 29 deletions
diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c index cf5a4a573..489cba289 100644 --- a/gst/tcp/gstmultisocketsink.c +++ b/gst/tcp/gstmultisocketsink.c @@ -181,6 +181,8 @@ static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink, GstMultiHandleClient * mhclient); static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink, GstMultiHandleClient * mhclient); +static void gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink, + GstSocketClient * client); static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle, GIOCondition condition, GstMultiSocketSink * sink); @@ -764,12 +766,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, if (mhclient->bufpos == -1) { /* client is too fast, remove from write queue until new buffer is * available */ - /* FIXME: specific */ - if (client->source) { - g_source_destroy (client->source); - g_source_unref (client->source); - client->source = NULL; - } + gst_multi_socket_sink_stop_sending (sink, client); /* if we flushed out all of the client buffers, we can stop */ if (mhclient->flushcount == 0) @@ -793,13 +790,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, mhclient->bufpos = position; } else { /* cannot send data to this client yet */ - /* FIXME: specific */ - if (client->source) { - g_source_destroy (client->source); - g_source_unref (client->source); - client->source = NULL; - } - + gst_multi_socket_sink_stop_sending (sink, client); return TRUE; } } @@ -909,37 +900,58 @@ write_error: } static void -gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink, - GstMultiHandleClient * mhclient) +ensure_condition (GstMultiSocketSink * sink, GstSocketClient * client, + GIOCondition condition) { - GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink); - GstSocketClient *client = (GstSocketClient *) (mhclient); + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - if (!sink->main_context) + if (client->condition == condition) return; - if (!client->source) { - client->source = - g_socket_create_source (mhclient->handle.socket, - G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable); + if (client->source) { + g_source_destroy (client->source); + g_source_unref (client->source); + } + if (condition && sink->main_context) { + client->source = g_socket_create_source (mhclient->handle.socket, + condition, sink->cancellable); g_source_set_callback (client->source, (GSourceFunc) gst_multi_socket_sink_socket_condition, gst_object_ref (sink), (GDestroyNotify) gst_object_unref); g_source_attach (client->source, sink->main_context); + } else { + client->source = NULL; + condition = 0; } + client->condition = condition; +} + +static void +gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink, + GstMultiHandleClient * mhclient) +{ + GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink); + GstSocketClient *client = (GstSocketClient *) (mhclient); + + ensure_condition (sink, client, + G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP); } static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink, GstMultiHandleClient * mhclient) { + GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink); GstSocketClient *client = (GstSocketClient *) (mhclient); - if (client->source) { - g_source_destroy (client->source); - g_source_unref (client->source); - client->source = NULL; - } + ensure_condition (sink, client, 0); +} + +static void +gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink, + GstSocketClient * client) +{ + ensure_condition (sink, client, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); } /* Handle the clients. This is called when a socket becomes ready @@ -987,14 +999,16 @@ gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle, gst_multi_handle_sink_remove_client_link (mhsink, clink); ret = FALSE; goto done; - } else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) { + } + if ((condition & G_IO_IN) || (condition & G_IO_PRI)) { /* handle client read */ if (!gst_multi_socket_sink_handle_client_read (sink, client)) { gst_multi_handle_sink_remove_client_link (mhsink, clink); ret = FALSE; goto done; } - } else if ((condition & G_IO_OUT)) { + } + if ((condition & G_IO_OUT)) { /* handle client write */ if (!gst_multi_socket_sink_handle_client_write (sink, client)) { gst_multi_handle_sink_remove_client_link (mhsink, clink); diff --git a/gst/tcp/gstmultisocketsink.h b/gst/tcp/gstmultisocketsink.h index e1d25ece2..29edf1b99 100644 --- a/gst/tcp/gstmultisocketsink.h +++ b/gst/tcp/gstmultisocketsink.h @@ -54,6 +54,7 @@ typedef struct { GstMultiHandleClient client; GSource *source; + GIOCondition condition; } GstSocketClient; /** |