summaryrefslogtreecommitdiff
path: root/gst
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@gmail.com>2007-06-05 16:00:33 +0000
committerWim Taymans <wim.taymans@gmail.com>2007-06-05 16:00:33 +0000
commit56e2a6b5163d3573a6f3bafd9de1b78a80ac1d86 (patch)
treea7f2fb6da1e5108827d348d6eabc9ba26ac2025b /gst
parent80c1e3d27cabc82204a22ce10fc1372ddd5d5c17 (diff)
gst/tcp/gstmultifdsink.*: Add support for remuve_flush.
Original commit message from CVS: * gst/tcp/gstmultifdsink.c: (gst_client_status_get_type), (gst_multi_fd_sink_class_init), (gst_multi_fd_sink_add_full), (gst_multi_fd_sink_remove_flush), (gst_multi_fd_sink_remove_client_link), (gst_multi_fd_sink_handle_client_write), (gst_multi_fd_sink_handle_clients): * gst/tcp/gstmultifdsink.h: Add support for remuve_flush.
Diffstat (limited to 'gst')
-rw-r--r--gst/tcp/gstmultifdsink.c84
-rw-r--r--gst/tcp/gstmultifdsink.h29
2 files changed, 97 insertions, 16 deletions
diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c
index c7d91333f..ad77c54f0 100644
--- a/gst/tcp/gstmultifdsink.c
+++ b/gst/tcp/gstmultifdsink.c
@@ -174,6 +174,7 @@ enum
SIGNAL_ADD,
SIGNAL_ADD_BURST,
SIGNAL_REMOVE,
+ SIGNAL_REMOVE_FLUSH,
SIGNAL_CLEAR,
SIGNAL_GET_STATS,
@@ -317,6 +318,7 @@ gst_client_status_get_type (void)
{GST_CLIENT_STATUS_SLOW, "Too slow", "slow"},
{GST_CLIENT_STATUS_ERROR, "Error", "error"},
{GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"},
+ {GST_CLIENT_STATUS_FLUSHING, "Flushing", "flushing"},
{0, NULL, NULL},
};
@@ -506,6 +508,18 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
/**
+ * GstMultiFdSink::remove_flush:
+ * @gstmultifdsink: the multifdsink element to emit this signal on
+ * @fd: the file descriptor to remove from multifdsink
+ *
+ * Remove the given open file descriptor from multifdsink after flushing all
+ * the pending data to the fd.
+ */
+ gst_multi_fd_sink_signals[SIGNAL_REMOVE_FLUSH] =
+ g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, remove_flush),
+ NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+ /**
* GstMultiFdSink::clear:
* @gstmultifdsink: the multifdsink element to emit this signal on
*
@@ -601,6 +615,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add);
klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full);
klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove);
+ klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_flush);
klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear);
klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats);
@@ -678,6 +693,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
client->fd.fd = fd;
client->status = GST_CLIENT_STATUS_OK;
client->bufpos = -1;
+ client->flushcount = -1;
client->bufoffset = 0;
client->sending = NULL;
client->bytes_sent = 0;
@@ -796,6 +812,40 @@ done:
CLIENTS_UNLOCK (sink);
}
+/* "remove-flush" signal implementation */
+void
+gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
+{
+ GList *clink;
+
+ GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd);
+
+ CLIENTS_LOCK (sink);
+ clink = g_hash_table_lookup (sink->fd_hash, &fd);
+ if (clink != NULL) {
+ GstTCPClient *client = (GstTCPClient *) clink->data;
+
+ if (client->status != GST_CLIENT_STATUS_OK) {
+ GST_INFO_OBJECT (sink,
+ "[fd %5d] Client already disconnecting with status %d",
+ fd, client->status);
+ goto done;
+ }
+
+ /* take the position of the client as the number of buffers left to flush.
+ * If the client was at position -1, we flush 0 buffers, 0 == flush 1
+ * buffer, etc... */
+ client->flushcount = client->bufpos + 1;
+ /* mark client as flushing. We can not remove the client right away because
+ * it might have some buffers to flush in the ->sending queue. */
+ client->status = GST_CLIENT_STATUS_FLUSHING;
+ } else {
+ GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
+ }
+done:
+ CLIENTS_UNLOCK (sink);
+}
+
/* can be called both through the signal (i.e. from any thread) or when
* stopping, after the writing thread has shut down */
void
@@ -948,9 +998,11 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
GST_WARNING_OBJECT (sink,
"[fd %5d] removing client %p because of error", fd, client);
break;
+ case GST_CLIENT_STATUS_FLUSHING:
default:
GST_WARNING_OBJECT (sink,
- "[fd %5d] removing client %p with invalid reason", fd, client);
+ "[fd %5d] removing client %p with invalid reason %d", fd, client,
+ client->status);
break;
}
@@ -1770,15 +1822,19 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
int fd = client->fd.fd;
gboolean more;
gboolean res;
+ gboolean flushing;
GstClockTime now;
GTimeVal nowtv;
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
+ flushing = client->status == GST_CLIENT_STATUS_FLUSHING;
+
/* when using GDP, first check if we have queued caps yet */
if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
- if (!client->caps_sent) {
+ /* don't need to do anything when the client is flushing */
+ if (!client->caps_sent && !flushing) {
GstPad *peer;
GstCaps *caps;
@@ -1818,6 +1874,10 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
/* client is too fast, remove from write queue until new buffer is
* available */
gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
+ /* if we flushed out all of the client buffers, we can stop */
+ if (client->flushcount == 0)
+ goto flushed;
+
return TRUE;
} else {
/* client can pick a buffer from the global queue */
@@ -1825,7 +1885,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
/* for new connections, we need to find a good spot in the
* bufqueue to start streaming from */
- if (client->new_connection) {
+ if (client->new_connection && !flushing) {
gint position = gst_multi_fd_sink_new_client (sink, client);
if (position >= 0) {
@@ -1839,9 +1899,18 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
}
}
+ /* we flushed all remaining buffers, no need to get a new one */
+ if (client->flushcount == 0)
+ goto flushed;
+
/* grab buffer */
buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
client->bufpos--;
+
+ /* decrease flushcount */
+ if (client->flushcount != -1)
+ client->flushcount--;
+
GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
fd, client, client->bufpos);
@@ -1912,6 +1981,12 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
return TRUE;
/* ERRORS */
+flushed:
+ {
+ GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd);
+ client->status = GST_CLIENT_STATUS_REMOVED;
+ return FALSE;
+ }
connection_reset:
{
GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd);
@@ -2311,7 +2386,8 @@ restart2:
client = (GstTCPClient *) clients->data;
next = g_list_next (clients);
- if (client->status != GST_CLIENT_STATUS_OK) {
+ if (client->status != GST_CLIENT_STATUS_FLUSHING
+ && client->status != GST_CLIENT_STATUS_OK) {
gst_multi_fd_sink_remove_client_link (sink, clients);
continue;
}
diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h
index d827eadd6..baf70d713 100644
--- a/gst/tcp/gstmultifdsink.h
+++ b/gst/tcp/gstmultifdsink.h
@@ -50,7 +50,7 @@ typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass;
typedef enum {
GST_MULTI_FD_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0),
- GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2),
+ GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2)
} GstMultiFdSinkFlags;
/**
@@ -68,7 +68,7 @@ typedef enum
GST_RECOVER_POLICY_NONE,
GST_RECOVER_POLICY_RESYNC_LATEST,
GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT,
- GST_RECOVER_POLICY_RESYNC_KEYFRAME,
+ GST_RECOVER_POLICY_RESYNC_KEYFRAME
} GstRecoverPolicy;
/**
@@ -93,7 +93,7 @@ typedef enum
GST_SYNC_METHOD_LATEST_KEYFRAME,
GST_SYNC_METHOD_BURST,
GST_SYNC_METHOD_BURST_KEYFRAME,
- GST_SYNC_METHOD_BURST_WITH_KEYFRAME,
+ GST_SYNC_METHOD_BURST_WITH_KEYFRAME
} GstSyncMethod;
/**
@@ -110,7 +110,7 @@ typedef enum
GST_UNIT_TYPE_UNDEFINED,
GST_UNIT_TYPE_BUFFERS,
GST_UNIT_TYPE_TIME,
- GST_UNIT_TYPE_BYTES,
+ GST_UNIT_TYPE_BYTES
} GstUnitType;
/**
@@ -121,6 +121,7 @@ typedef enum
* @GST_CLIENT_STATUS_SLOW : client is too slow
* @GST_CLIENT_STATUS_ERROR : client is in error
* @GST_CLIENT_STATUS_DUPLICATE: same client added twice
+ * @GST_CLIENT_STATUS_FLUSHING : client is flushing out the remaining buffers.
*
* This specifies the reason why a client was removed from
* multifdsink and is received in the "client-removed" signal.
@@ -133,6 +134,7 @@ typedef enum
GST_CLIENT_STATUS_SLOW = 3,
GST_CLIENT_STATUS_ERROR = 4,
GST_CLIENT_STATUS_DUPLICATE = 5,
+ GST_CLIENT_STATUS_FLUSHING = 6
} GstClientStatus;
/* structure for a client
@@ -141,6 +143,8 @@ typedef struct {
GstFD fd;
gint bufpos; /* position of this client in the global queue */
+ gint flushcount; /* the remaining number of buffers to flush out or -1 if the
+ client is not flushing. */
GstClientStatus status;
gboolean is_socket;
@@ -249,6 +253,7 @@ struct _GstMultiFdSinkClass {
GstUnitType format, guint64 value,
GstUnitType max_unit, guint64 max_value);
void (*remove) (GstMultiFdSink *sink, int fd);
+ void (*remove_flush) (GstMultiFdSink *sink, int fd);
void (*clear) (GstMultiFdSink *sink);
GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd);
@@ -266,14 +271,14 @@ struct _GstMultiFdSinkClass {
GType gst_multi_fd_sink_get_type (void);
-void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd);
-void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
- GstUnitType min_unit, guint64 min_value,
- GstUnitType max_unit, guint64 max_value);
-void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd);
-void gst_multi_fd_sink_clear (GstMultiFdSink *sink);
-GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd);
-
+void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd);
+void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
+ GstUnitType min_unit, guint64 min_value,
+ GstUnitType max_unit, guint64 max_value);
+void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd);
+void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd);
+void gst_multi_fd_sink_clear (GstMultiFdSink *sink);
+GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd);
G_END_DECLS