From 56e2a6b5163d3573a6f3bafd9de1b78a80ac1d86 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 5 Jun 2007 16:00:33 +0000 Subject: gst/tcp/gstmultifdsink.*: Add support for remuve_flush. Original commit message from CVS: * gst/tcp/gstmultifdsink.c: (gst_client_status_get_type), (gst_multi_fd_sink_class_init), (gst_multi_fd_sink_add_full), (gst_multi_fd_sink_remove_flush), (gst_multi_fd_sink_remove_client_link), (gst_multi_fd_sink_handle_client_write), (gst_multi_fd_sink_handle_clients): * gst/tcp/gstmultifdsink.h: Add support for remuve_flush. --- gst/tcp/gstmultifdsink.c | 84 +++++++++++++++++++++++++++++++++++++++++++++--- gst/tcp/gstmultifdsink.h | 29 ++++++++++------- 2 files changed, 97 insertions(+), 16 deletions(-) (limited to 'gst') diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index c7d91333f..ad77c54f0 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -174,6 +174,7 @@ enum SIGNAL_ADD, SIGNAL_ADD_BURST, SIGNAL_REMOVE, + SIGNAL_REMOVE_FLUSH, SIGNAL_CLEAR, SIGNAL_GET_STATS, @@ -317,6 +318,7 @@ gst_client_status_get_type (void) {GST_CLIENT_STATUS_SLOW, "Too slow", "slow"}, {GST_CLIENT_STATUS_ERROR, "Error", "error"}, {GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"}, + {GST_CLIENT_STATUS_FLUSHING, "Flushing", "flushing"}, {0, NULL, NULL}, }; @@ -505,6 +507,18 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, remove), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); + /** + * GstMultiFdSink::remove_flush: + * @gstmultifdsink: the multifdsink element to emit this signal on + * @fd: the file descriptor to remove from multifdsink + * + * Remove the given open file descriptor from multifdsink after flushing all + * the pending data to the fd. + */ + gst_multi_fd_sink_signals[SIGNAL_REMOVE_FLUSH] = + g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, remove_flush), + NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); /** * GstMultiFdSink::clear: * @gstmultifdsink: the multifdsink element to emit this signal on @@ -601,6 +615,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add); klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full); klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove); + klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_flush); klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear); klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats); @@ -678,6 +693,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, client->fd.fd = fd; client->status = GST_CLIENT_STATUS_OK; client->bufpos = -1; + client->flushcount = -1; client->bufoffset = 0; client->sending = NULL; client->bytes_sent = 0; @@ -796,6 +812,40 @@ done: CLIENTS_UNLOCK (sink); } +/* "remove-flush" signal implementation */ +void +gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd) +{ + GList *clink; + + GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd); + + CLIENTS_LOCK (sink); + clink = g_hash_table_lookup (sink->fd_hash, &fd); + if (clink != NULL) { + GstTCPClient *client = (GstTCPClient *) clink->data; + + if (client->status != GST_CLIENT_STATUS_OK) { + GST_INFO_OBJECT (sink, + "[fd %5d] Client already disconnecting with status %d", + fd, client->status); + goto done; + } + + /* take the position of the client as the number of buffers left to flush. + * If the client was at position -1, we flush 0 buffers, 0 == flush 1 + * buffer, etc... */ + client->flushcount = client->bufpos + 1; + /* mark client as flushing. We can not remove the client right away because + * it might have some buffers to flush in the ->sending queue. */ + client->status = GST_CLIENT_STATUS_FLUSHING; + } else { + GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd); + } +done: + CLIENTS_UNLOCK (sink); +} + /* can be called both through the signal (i.e. from any thread) or when * stopping, after the writing thread has shut down */ void @@ -948,9 +998,11 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p because of error", fd, client); break; + case GST_CLIENT_STATUS_FLUSHING: default: GST_WARNING_OBJECT (sink, - "[fd %5d] removing client %p with invalid reason", fd, client); + "[fd %5d] removing client %p with invalid reason %d", fd, client, + client->status); break; } @@ -1770,15 +1822,19 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, int fd = client->fd.fd; gboolean more; gboolean res; + gboolean flushing; GstClockTime now; GTimeVal nowtv; g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); + flushing = client->status == GST_CLIENT_STATUS_FLUSHING; + /* when using GDP, first check if we have queued caps yet */ if (sink->protocol == GST_TCP_PROTOCOL_GDP) { - if (!client->caps_sent) { + /* don't need to do anything when the client is flushing */ + if (!client->caps_sent && !flushing) { GstPad *peer; GstCaps *caps; @@ -1818,6 +1874,10 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* client is too fast, remove from write queue until new buffer is * available */ gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE); + /* if we flushed out all of the client buffers, we can stop */ + if (client->flushcount == 0) + goto flushed; + return TRUE; } else { /* client can pick a buffer from the global queue */ @@ -1825,7 +1885,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* for new connections, we need to find a good spot in the * bufqueue to start streaming from */ - if (client->new_connection) { + if (client->new_connection && !flushing) { gint position = gst_multi_fd_sink_new_client (sink, client); if (position >= 0) { @@ -1839,9 +1899,18 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, } } + /* we flushed all remaining buffers, no need to get a new one */ + if (client->flushcount == 0) + goto flushed; + /* grab buffer */ buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); client->bufpos--; + + /* decrease flushcount */ + if (client->flushcount != -1) + client->flushcount--; + GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", fd, client, client->bufpos); @@ -1912,6 +1981,12 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, return TRUE; /* ERRORS */ +flushed: + { + GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd); + client->status = GST_CLIENT_STATUS_REMOVED; + return FALSE; + } connection_reset: { GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd); @@ -2311,7 +2386,8 @@ restart2: client = (GstTCPClient *) clients->data; next = g_list_next (clients); - if (client->status != GST_CLIENT_STATUS_OK) { + if (client->status != GST_CLIENT_STATUS_FLUSHING + && client->status != GST_CLIENT_STATUS_OK) { gst_multi_fd_sink_remove_client_link (sink, clients); continue; } diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index d827eadd6..baf70d713 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -50,7 +50,7 @@ typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass; typedef enum { GST_MULTI_FD_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), - GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2), + GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) } GstMultiFdSinkFlags; /** @@ -68,7 +68,7 @@ typedef enum GST_RECOVER_POLICY_NONE, GST_RECOVER_POLICY_RESYNC_LATEST, GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT, - GST_RECOVER_POLICY_RESYNC_KEYFRAME, + GST_RECOVER_POLICY_RESYNC_KEYFRAME } GstRecoverPolicy; /** @@ -93,7 +93,7 @@ typedef enum GST_SYNC_METHOD_LATEST_KEYFRAME, GST_SYNC_METHOD_BURST, GST_SYNC_METHOD_BURST_KEYFRAME, - GST_SYNC_METHOD_BURST_WITH_KEYFRAME, + GST_SYNC_METHOD_BURST_WITH_KEYFRAME } GstSyncMethod; /** @@ -110,7 +110,7 @@ typedef enum GST_UNIT_TYPE_UNDEFINED, GST_UNIT_TYPE_BUFFERS, GST_UNIT_TYPE_TIME, - GST_UNIT_TYPE_BYTES, + GST_UNIT_TYPE_BYTES } GstUnitType; /** @@ -121,6 +121,7 @@ typedef enum * @GST_CLIENT_STATUS_SLOW : client is too slow * @GST_CLIENT_STATUS_ERROR : client is in error * @GST_CLIENT_STATUS_DUPLICATE: same client added twice + * @GST_CLIENT_STATUS_FLUSHING : client is flushing out the remaining buffers. * * This specifies the reason why a client was removed from * multifdsink and is received in the "client-removed" signal. @@ -133,6 +134,7 @@ typedef enum GST_CLIENT_STATUS_SLOW = 3, GST_CLIENT_STATUS_ERROR = 4, GST_CLIENT_STATUS_DUPLICATE = 5, + GST_CLIENT_STATUS_FLUSHING = 6 } GstClientStatus; /* structure for a client @@ -141,6 +143,8 @@ typedef struct { GstFD fd; gint bufpos; /* position of this client in the global queue */ + gint flushcount; /* the remaining number of buffers to flush out or -1 if the + client is not flushing. */ GstClientStatus status; gboolean is_socket; @@ -249,6 +253,7 @@ struct _GstMultiFdSinkClass { GstUnitType format, guint64 value, GstUnitType max_unit, guint64 max_value); void (*remove) (GstMultiFdSink *sink, int fd); + void (*remove_flush) (GstMultiFdSink *sink, int fd); void (*clear) (GstMultiFdSink *sink); GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd); @@ -266,14 +271,14 @@ struct _GstMultiFdSinkClass { GType gst_multi_fd_sink_get_type (void); -void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync, - GstUnitType min_unit, guint64 min_value, - GstUnitType max_unit, guint64 max_value); -void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_clear (GstMultiFdSink *sink); -GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd); - +void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd); +void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync, + GstUnitType min_unit, guint64 min_value, + GstUnitType max_unit, guint64 max_value); +void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd); +void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd); +void gst_multi_fd_sink_clear (GstMultiFdSink *sink); +GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd); G_END_DECLS -- cgit v1.2.3