summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2012-12-18 13:06:39 +0100
committerWim Taymans <wtaymans@redhat.com>2015-08-25 10:15:27 +0200
commitb68140530f3d6881b99e9f4c4924faa4910ac646 (patch)
tree3d9526d4f36d304c660377be9a37c4b5ad9e2754
parentdc744b05b0a6c54abcee89248af2ec2b17a191d7 (diff)
add burstcache objectmaster-fdo
-rw-r--r--gst/tcp/Makefile.am2
-rw-r--r--gst/tcp/gstburstcache.c1400
-rw-r--r--gst/tcp/gstburstcache.h301
3 files changed, 1703 insertions, 0 deletions
diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am
index 1ff06a38c..ef8ccb070 100644
--- a/gst/tcp/Makefile.am
+++ b/gst/tcp/Makefile.am
@@ -10,6 +10,7 @@ endif
libgsttcp_la_SOURCES = \
gstsocketsrc.c \
gsttcpplugin.c \
+ gstburstcache.c \
gsttcpclientsrc.c gsttcpclientsink.c \
$(multifdsink_SOURCES) \
gstmultihandlesink.c \
@@ -24,6 +25,7 @@ libgsttcp_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS)
noinst_HEADERS = \
gstsocketsrc.h \
gsttcp.h \
+ gstburstcache.h \
gsttcpclientsrc.h gsttcpclientsink.h \
gstmultifdsink.h \
gstmultisocketsink.h \
diff --git a/gst/tcp/gstburstcache.c b/gst/tcp/gstburstcache.c
new file mode 100644
index 000000000..001e639e9
--- /dev/null
+++ b/gst/tcp/gstburstcache.c
@@ -0,0 +1,1400 @@
+/* GStreamer
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ * Copyright (C) <2012> Wim Taymans <wim.taymans@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:gstburstcache
+ * @short_description: caches and implements burst-on-connect of buffers
+ *
+ * GstBurstCache keeps a cache of buffers up to a configurable limit and replays
+ * this cache for newly added readers. This can be used to implement
+ * burst-on-connect for various network scenarios such as TCP or UDP.
+ *
+ * A new cache is created with gst_burst_cache_new(), which requires a size of
+ * the structure to hold the reader information.
+ *
+ * New buffers are put in the cache with gst_burst_cache_queue_buffer(). Old
+ * buffers will be removed from the cache. With gst_burst_cache_set_min_amount()
+ * one can control the amount of data in time/bytes/buffers to keep in the
+ * cache.
+ *
+ * New readers can be constructed with gst_burst_cache_reader_new(). This will
+ * allocate a new reader structure with the configured size when the cache was
+ * constructed. A callback needs to be provided that will be called when new
+ * data is available for the reader.
+ *
+ * The caller can configure the reader with gst_burst_cache_reader_set_burst().
+ * The start_method property will define which buffer in the cahced buffers will
+ * be sent first to the reader. Readers can be sent the most recent buffer
+ * (which might not be decodable by the reader if it is not a keyframe), the
+ * next keyframe received in the cache (which can take some time depending on
+ * the keyframe rate), or the last received keyframe (which will cause a simple
+ * burst-on-connect). GstBurstCache will always keep at least one keyframe in
+ * its internal cache.
+ *
+ * There are additional values for the start_method to allow finer control over
+ * burst-on-connect behaviour. By selecting the 'burst' method a minimum burst
+ * size can be chosen, 'burst-keyframe' additionally requires that the burst
+ * begin with a keyframe, and 'burst-with-keyframe' attempts to burst beginning
+ * with a keyframe, but will prefer a minimum burst size even if it requires
+ * not starting with a keyframe.
+ *
+ * When a reader is created and configured, it can be added to the cache with
+ * gst_burst_cache_add_reader(). This will trigger the callback when new data is
+ * available for the reader. The reader should call gst_burst_cache_get_buffer()
+ * to retrieve the available buffer until the function returns
+ * GST_BURST_CACHE_RESULT_WAIT, in which case it should wait for the callback again.
+ * This makes it possible to implement a push or pull model for retrieving data
+ * from the cache.
+ *
+ * If the reader does not call gst_burst_cache_get_buffer() fast enough, it will
+ * start to lag. With gst_burst_cache_set_limits() you can configure how much a
+ * reader is allowed to lag. You can configure a soft limit and a hard limit in
+ * and format.
+ *
+ * A reader with a lag of at least soft-max enters the recovery procedure which
+ * is controlled with the recover property. A recover policy of NONE will do
+ * nothing, RESYNC_LATEST will send the most recently received buffer as the
+ * next buffer for the reader, RESYNC_SOFT_LIMIT positions the reader to the
+ * soft limit in the buffer queue and RESYNC_KEYFRAME positions the reader at
+ * the most recent keyframe in the buffer queue.
+ *
+ * When the reader is lagging more that the soft-limit, its recovery
+ * procedure will be started, which usually will make it drop buffers to catch
+ * up. When the hard limit is reached, the reader is removed from the cache.
+ *
+ * A reader can be removed from the cache with gst_burst_cache_remove_reader().
+ *
+ * When a reader is in error, gst_burst_cache_error_reader() must be called,
+ * which will cause the reader to be removed from the cache.
+ *
+ * When a reader is freed, its GHook destroy function will be called with the
+ * GHook data. You can install a custom function and data to be notified.
+ *
+ * Last reviewed on 2012-11-09 (1.0.3)
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstburstcache.h"
+
+#define DEFAULT_LIMIT_FORMAT GST_FORMAT_BUFFERS
+#define DEFAULT_LIMIT_MAX -1
+#define DEFAULT_LIMIT_SOFT_MAX -1
+#define DEFAULT_TIME_MIN -1
+#define DEFAULT_BYTES_MIN -1
+#define DEFAULT_BUFFERS_MIN -1
+#define DEFAULT_RECOVER GST_BURST_CACHE_RECOVER_NONE
+
+enum
+{
+ PROP_0,
+ PROP_LAST
+};
+
+GST_DEBUG_CATEGORY_STATIC (burstcache_debug);
+#define GST_CAT_DEFAULT (burstcache_debug)
+
+#define gst_burst_cache_parent_class parent_class
+G_DEFINE_TYPE (GstBurstCache, gst_burst_cache, G_TYPE_OBJECT);
+
+#define CACHE_LOCK_INIT(cache) (g_rec_mutex_init(&(cache)->lock))
+#define CACHE_LOCK_CLEAR(cache) (g_rec_mutex_clear(&(cache)->lock))
+#define CACHE_LOCK(cache) (g_rec_mutex_lock(&(cache)->lock))
+#define CACHE_UNLOCK(cache) (g_rec_mutex_unlock(&(cache)->lock))
+
+static void gst_burst_cache_finalize (GObject * object);
+
+G_DEFINE_POINTER_TYPE (GstBurstCacheReader, gst_burst_cache_reader);
+
+static gint find_keyframe (GstBurstCache * cache, gint idx, gint direction);
+#define find_next_keyframe(s,i) find_keyframe(s,i,1)
+#define find_prev_keyframe(s,i) find_keyframe(s,i,-1)
+static gboolean is_keyframe (GstBurstCache * cache, GstBuffer * buffer);
+
+static gint get_buffers_max (GstBurstCache * cache, GstFormat format,
+ gint64 max);
+static gint gst_burst_cache_recover_reader (GstBurstCache * cache,
+ GstBurstCacheReader * reader);
+static gboolean find_limits (GstBurstCache * cache, gint * min_idx,
+ gint bytes_min, gint buffers_min, gint64 time_min, gint * max_idx,
+ gint bytes_max, gint buffers_max, gint64 time_max);
+
+static void
+gst_burst_cache_class_init (GstBurstCacheClass * klass)
+{
+ GObjectClass *gobject_class;
+
+ gobject_class = (GObjectClass *) klass;
+
+ gobject_class->finalize = gst_burst_cache_finalize;
+
+ GST_DEBUG_CATEGORY_INIT (burstcache_debug, "burstcache", 0, "GstBurstCache");
+}
+
+static void
+gst_burst_cache_init (GstBurstCache * this)
+{
+ CACHE_LOCK_INIT (this);
+ this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
+ this->limit_format = DEFAULT_LIMIT_FORMAT;
+ this->limit_max = DEFAULT_LIMIT_MAX;
+ this->limit_soft_max = DEFAULT_LIMIT_SOFT_MAX;
+ this->time_min = DEFAULT_TIME_MIN;
+ this->bytes_min = DEFAULT_BYTES_MIN;
+ this->buffers_min = DEFAULT_BUFFERS_MIN;
+ this->recover = DEFAULT_RECOVER;
+}
+
+static void
+gst_burst_cache_finalize (GObject * object)
+{
+ GstBurstCache *this;
+
+ this = GST_BURST_CACHE (object);
+
+ g_hook_list_clear (&this->readers);
+
+ g_array_free (this->bufqueue, TRUE);
+ CACHE_LOCK_CLEAR (this);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+/**
+ * gst_burst_cache_new:
+ * @reader_size: the size of the hook
+ *
+ * Make a new burst cache. @hook_size specifies the size of the data structure
+ * that is kep for each client and must be at least
+ * sizeof(GstBurstCacheReader).
+ *
+ * Returns: a new #GstBurstCache
+ */
+GstBurstCache *
+gst_burst_cache_new (guint reader_size)
+{
+ GstBurstCache *cache;
+
+ g_return_val_if_fail (reader_size >= sizeof (GstBurstCacheReader), NULL);
+
+ cache = g_object_new (GST_TYPE_BURST_CACHE, NULL);
+ g_hook_list_init (&cache->readers, reader_size);
+
+ return cache;
+}
+
+/**
+ * gst_burst_cache_set_min_amount:
+ * @cache: a #GstBurstCache
+ * @bytes_min: minimum amount to cache in bytes
+ * @time_min: minimum amount to cache in time
+ * @buffers_min: minimum amount to cache in buffers
+ *
+ * Set the minimum amount of data that @cache should keep around. Values can be
+ * specified in bytes, time and buffers. A value of -1 sets unlimited caching.
+ */
+void
+gst_burst_cache_set_min_amount (GstBurstCache * cache, gint bytes_min,
+ gint64 time_min, gint buffers_min)
+{
+ g_return_if_fail (GST_IS_BURST_CACHE (cache));
+
+ cache->bytes_min = bytes_min;
+ cache->time_min = time_min;
+ cache->buffers_min = buffers_min;
+}
+
+/**
+ * gst_burst_cache_get_min_amount:
+ * @cache: a #GstBurstCache
+ * @bytes_min: (out) (allow-none): result in bytes
+ * @time_min: (out) (allow-none): result in time
+ * @buffers_min: (out) (allow-none): result in buffers
+ *
+ * Get the minimum amount of data that @cache keeps around.
+ */
+void
+gst_burst_cache_get_min_amount (GstBurstCache * cache, gint * bytes_min,
+ gint64 * time_min, gint * buffers_min)
+{
+ g_return_if_fail (GST_IS_BURST_CACHE (cache));
+
+ if (bytes_min)
+ *bytes_min = cache->bytes_min;
+ if (time_min)
+ *time_min = cache->time_min;
+ if (buffers_min)
+ *buffers_min = cache->buffers_min;
+}
+
+/**
+ * gst_burst_cache_set_limits:
+ * @cache: a #GstBurstCache
+ * @format: format of @max and @soft_max
+ * @max: maximum lag for a reader
+ * @soft_max: maximum lag for a reader before recovery is performed
+ * @recover: a #GstBurstCacheRecover
+ *
+ * Set the limits for readers. When a reader lags more than @soft_max behind the
+ * most recent buffer, the receovery procedure @recovery is started for the
+ * client. If the client lags up to @max, it will be removed from @cache.
+ */
+void
+gst_burst_cache_set_limits (GstBurstCache * cache, GstFormat format,
+ gint64 max, gint64 soft_max, GstBurstCacheRecover recover)
+{
+ g_return_if_fail (GST_IS_BURST_CACHE (cache));
+
+ cache->limit_format = format;
+ cache->limit_max = max;
+ cache->limit_soft_max = soft_max;
+ cache->recover = recover;
+}
+
+/**
+ * gst_burst_cache_get_limits:
+ * @cache: a #GstBurstCache
+ * @format: (out) (allow-none): result format of @max and @soft_max
+ * @max: (out) (allow-none): result maximum lag for a reader
+ * @soft_max: (out) (allow-none): result maximum lag for a reader before
+ * recovery is performed
+ * @recover: (out) (allow-none): result #GstBurstCacheRecover
+ *
+ * Get the reader limits. See gst_burst_cache_set_limits().
+ */
+void
+gst_burst_cache_get_limits (GstBurstCache * cache, GstFormat * format,
+ gint64 * max, gint64 * soft_max, GstBurstCacheRecover * recover)
+{
+ g_return_if_fail (GST_IS_BURST_CACHE (cache));
+
+ if (format)
+ *format = cache->limit_format;
+ if (max)
+ *max = cache->limit_max;
+ if (soft_max)
+ *soft_max = cache->limit_soft_max;
+ if (recover)
+ *recover = cache->recover;
+}
+
+/**
+ * gst_burst_cache_reader_destroy:
+ * @reader: a #GstBurstCacheReader
+ *
+ * Cleanup the memory of @reader.
+ */
+void
+gst_burst_cache_reader_destroy (GstBurstCacheReader * reader)
+{
+ if (reader->reason)
+ g_error_free (reader->reason);
+ if (reader->notify)
+ reader->notify (reader->user_data);
+}
+
+/**
+ * gst_burst_cache_reader_new:
+ * @cache: a #GstBurstCache
+ * @callback: a #GstBurstCacheReaderCallback
+ * @user_data: user data
+ * @notify: a #GDestroyNotify for @user_data
+ *
+ * Make a new #GstBurstCacheReader. When data becomes available for the reader,
+ * @callback will be called with @user_data.
+ *
+ * Returns: a new #GstBurstCacheReader.
+ */
+GstBurstCacheReader *
+gst_burst_cache_reader_new (GstBurstCache * cache,
+ GstBurstCacheReaderCallback callback, gpointer user_data,
+ GDestroyNotify notify)
+{
+ GstBurstCacheReader *reader;
+
+ reader = (GstBurstCacheReader *) g_hook_alloc (&cache->readers);
+
+ reader->hook.data = reader;
+ reader->hook.destroy = (GDestroyNotify) gst_burst_cache_reader_destroy;
+
+ reader->bufpos = -1;
+ reader->draincount = -1;
+ reader->new_reader = TRUE;
+ reader->discont = FALSE;
+
+ reader->callback = callback;
+ reader->user_data = user_data;
+ reader->notify = notify;
+
+ reader->reason = NULL;
+
+ reader->start_method = GST_BURST_CACHE_START_LATEST;
+
+ reader->bytes_sent = 0;
+ reader->dropped_buffers = 0;
+ reader->avg_queue_size = 0;
+ reader->first_buffer_ts = GST_CLOCK_TIME_NONE;
+ reader->last_buffer_ts = GST_CLOCK_TIME_NONE;
+
+ /* update start time */
+ reader->add_time = g_get_real_time ();
+ reader->remove_time = 0;
+ /* set last activity time to add time */
+ reader->last_activity_time = reader->add_time;
+
+ return reader;
+}
+
+/**
+ * gst_burst_cache_reader_set_burst:
+ * @reader: a #GstBurstCacheReader
+ * @start_method: a #GstBurstCacheStart
+ * @min_format: format of @min_value
+ * @min_value: minimum burst amount
+ * @max_format: format of @max_value
+ * @max_value: maximum burst amount
+ *
+ * Set the burst parameters for @reader. @start_method defines where to position
+ * the reader in the cache. At least @min_value of data and at most @max_value
+ * of data will be sent to the new client.
+ *
+ * Returns: %TRUE on success.
+ */
+gboolean
+gst_burst_cache_reader_set_burst (GstBurstCacheReader * reader,
+ GstBurstCacheStart start_method, GstFormat min_format, guint64 min_value,
+ GstFormat max_format, guint64 max_value)
+{
+ /* do limits check if we can */
+ if (min_format == max_format) {
+ if (max_value != -1 && min_value != -1 && max_value < min_value)
+ return FALSE;
+ }
+
+ reader->start_method = start_method;
+ reader->min_format = min_format;
+ reader->min_value = min_value;
+ reader->max_format = max_format;
+ reader->max_value = max_value;
+
+ return TRUE;
+}
+
+static gboolean
+is_keyframe (GstBurstCache * cache, GstBuffer * buffer)
+{
+ if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
+ return FALSE;
+ } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
+ return TRUE;
+ }
+ return FALSE;
+}
+
+/* find the keyframe in the list of buffers starting the
+ * search from @idx. @direction as -1 will search backwards,
+ * 1 will search forwards.
+ * Returns: the index or -1 if there is no keyframe after idx.
+ */
+static gint
+find_keyframe (GstBurstCache * cache, gint idx, gint direction)
+{
+ gint i, len, result;
+
+ /* take length of queued buffers */
+ len = cache->bufqueue->len;
+
+ /* assume we don't find a keyframe */
+ result = -1;
+
+ /* then loop over all buffers to find the first keyframe */
+ for (i = idx; i >= 0 && i < len; i += direction) {
+ GstBuffer *buf;
+
+ buf = g_array_index (cache->bufqueue, GstBuffer *, i);
+ if (is_keyframe (cache, buf)) {
+ GST_LOG_OBJECT (cache, "found keyframe at %d from %d, direction %d",
+ i, idx, direction);
+ result = i;
+ break;
+ }
+ }
+ return result;
+}
+
+/* Get the number of buffers from the buffer queue needed to satisfy
+ * the maximum max in the configured units.
+ * If units are not BUFFERS, and there are insufficient buffers in the
+ * queue to satify the limit, return len(queue) + 1 */
+static gint
+get_buffers_max (GstBurstCache * cache, GstFormat format, gint64 max)
+{
+ switch (format) {
+ case GST_FORMAT_BUFFERS:
+ return max;
+ case GST_FORMAT_TIME:
+ {
+ GstBuffer *buf;
+ int i;
+ int len;
+ gint64 diff;
+ GstClockTime first = GST_CLOCK_TIME_NONE;
+
+ len = cache->bufqueue->len;
+
+ for (i = 0; i < len; i++) {
+ buf = g_array_index (cache->bufqueue, GstBuffer *, i);
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+ if (first == -1)
+ first = GST_BUFFER_TIMESTAMP (buf);
+
+ diff = first - GST_BUFFER_TIMESTAMP (buf);
+
+ if (diff > max)
+ return i + 1;
+ }
+ }
+ return len + 1;
+ }
+ case GST_FORMAT_BYTES:
+ {
+ GstBuffer *buf;
+ int i;
+ int len;
+ gint acc = 0;
+
+ len = cache->bufqueue->len;
+
+ for (i = 0; i < len; i++) {
+ buf = g_array_index (cache->bufqueue, GstBuffer *, i);
+ acc += gst_buffer_get_size (buf);
+
+ if (acc > max)
+ return i + 1;
+ }
+ return len + 1;
+ }
+ default:
+ return max;
+ }
+}
+
+/* find the positions in the buffer queue where *_min and *_max
+ * is satisfied
+ */
+/* count the amount of data in the buffers and return the index
+ * that satifies the given limits.
+ *
+ * Returns: index @idx in the buffer queue so that the given limits are
+ * satisfied. TRUE if all the limits could be satisfied, FALSE if not
+ * enough data was in the queue.
+ *
+ * FIXME, this code might now work if any of the units is in buffers...
+ */
+static gboolean
+find_limits (GstBurstCache * cache,
+ gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min,
+ gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max)
+{
+ GstClockTime first, time;
+ gint i, len, bytes;
+ gboolean result, max_hit;
+
+ /* take length of queue */
+ len = cache->bufqueue->len;
+
+ /* this must hold */
+ g_assert (len > 0);
+
+ GST_LOG_OBJECT (cache,
+ "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT
+ ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min,
+ buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max,
+ GST_TIME_ARGS (time_max));
+
+ /* do the trivial buffer limit test */
+ if (buffers_min != -1 && len < buffers_min) {
+ *min_idx = len - 1;
+ *max_idx = len - 1;
+ return FALSE;
+ }
+
+ result = FALSE;
+ /* else count bytes and time */
+ first = -1;
+ bytes = 0;
+ /* unset limits */
+ *min_idx = -1;
+ *max_idx = -1;
+ max_hit = FALSE;
+
+ i = 0;
+ /* loop through the buffers, when a limit is ok, mark it
+ * as -1, we have at least one buffer in the queue. */
+ do {
+ GstBuffer *buf;
+
+ /* if we checked all min limits, update result */
+ if (bytes_min == -1 && time_min == -1 && *min_idx == -1) {
+ /* don't go below 0 */
+ *min_idx = MAX (i - 1, 0);
+ }
+ /* if we reached one max limit break out */
+ if (max_hit) {
+ /* i > 0 when we get here, we subtract one to get the position
+ * of the previous buffer. */
+ *max_idx = i - 1;
+ /* we have valid complete result if we found a min_idx too */
+ result = *min_idx != -1;
+ break;
+ }
+ buf = g_array_index (cache->bufqueue, GstBuffer *, i);
+
+ bytes += gst_buffer_get_size (buf);
+
+ /* take timestamp and save for the base first timestamp */
+ if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
+ GST_LOG_OBJECT (cache, "Ts %" GST_TIME_FORMAT " on buffer",
+ GST_TIME_ARGS (time));
+ if (first == -1)
+ first = time;
+
+ /* increase max usage if we did not fill enough. Note that
+ * buffers are sorted from new to old, so the first timestamp is
+ * bigger than the next one. */
+ if (time_min != -1 && first - time >= time_min)
+ time_min = -1;
+ if (time_max != -1 && first - time >= time_max)
+ max_hit = TRUE;
+ } else {
+ GST_LOG_OBJECT (cache, "No timestamp on buffer");
+ }
+ /* time is OK or unknown, check and increase if not enough bytes */
+ if (bytes_min != -1) {
+ if (bytes >= bytes_min)
+ bytes_min = -1;
+ }
+ if (bytes_max != -1) {
+ if (bytes >= bytes_max) {
+ max_hit = TRUE;
+ }
+ }
+ i++;
+ }
+ while (i < len);
+
+ /* if we did not hit the max or min limit, set to buffer size */
+ if (*max_idx == -1)
+ *max_idx = len - 1;
+ /* make sure min does not exceed max */
+ if (*min_idx == -1)
+ *min_idx = *max_idx;
+
+ return result;
+}
+
+/* parse the unit/value pair and assign it to the result value of the
+ * right type, leave the other values untouched
+ *
+ * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
+ */
+static gboolean
+assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
+ GstClockTime * time)
+{
+ gboolean res = TRUE;
+
+ /* set only the limit of the given format to the given value */
+ switch (format) {
+ case GST_FORMAT_BUFFERS:
+ *buffers = (gint) value;
+ break;
+ case GST_FORMAT_TIME:
+ *time = value;
+ break;
+ case GST_FORMAT_BYTES:
+ *bytes = (gint) value;
+ break;
+ case GST_FORMAT_UNDEFINED:
+ default:
+ res = FALSE;
+ break;
+ }
+ return res;
+}
+
+/* count the index in the buffer queue to satisfy the given unit
+ * and value pair starting from buffer at index 0.
+ *
+ * Returns: TRUE if there was enough data in the queue to satisfy the
+ * burst values. @idx contains the index in the buffer that contains enough
+ * data to satisfy the limits or the last buffer in the queue when the
+ * function returns FALSE.
+ */
+static gboolean
+count_burst_unit (GstBurstCache * cache, gint * min_idx,
+ GstFormat min_format, guint64 min_value, gint * max_idx,
+ GstFormat max_format, guint64 max_value)
+{
+ gint bytes_min = -1, buffers_min = -1;
+ gint bytes_max = -1, buffers_max = -1;
+ GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
+
+ assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
+ assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
+
+ return find_limits (cache, min_idx, bytes_min, buffers_min, time_min,
+ max_idx, bytes_max, buffers_max, time_max);
+}
+
+/* decide where in the current buffer queue this new reader should start
+ * receiving buffers from.
+ * This function is called whenever a reader is added and has not yet
+ * received a buffer.
+ */
+static void
+handle_new_reader (GstBurstCache * cache, GstBurstCacheReader * reader)
+{
+ gint position;
+
+ GST_DEBUG_OBJECT (cache,
+ "%s new reader, deciding where to start in queue", reader->debug);
+ GST_DEBUG_OBJECT (cache, "queue is currently %d buffers long",
+ cache->bufqueue->len);
+
+ switch (reader->start_method) {
+ case GST_BURST_CACHE_START_LATEST:
+ /* no syncing, we are happy with whatever the reader is going to get */
+ position = reader->bufpos;
+ GST_DEBUG_OBJECT (cache,
+ "%s BURST_CACHE_START_LATEST, position %d", reader->debug, position);
+ break;
+ case GST_BURST_CACHE_START_NEXT_KEYFRAME:
+ {
+ /* if one of the new buffers (between reader->bufpos and 0) in the queue
+ * is a key frame, we can proceed, otherwise we need to keep waiting */
+ GST_LOG_OBJECT (cache,
+ "%s new reader, bufpos %d, waiting for keyframe",
+ reader->debug, reader->bufpos);
+
+ position = find_prev_keyframe (cache, reader->bufpos);
+ if (position != -1) {
+ GST_DEBUG_OBJECT (cache,
+ "%s BURST_CACHE_START_NEXT_KEYFRAME: position %d", reader->debug,
+ position);
+ break;
+ }
+
+ /* reader is not on a keyframe, need to skip these buffers and
+ * wait some more */
+ GST_LOG_OBJECT (cache,
+ "%s new reader, skipping buffer(s), no keyframe found",
+ reader->debug);
+ reader->bufpos = -1;
+ break;
+ }
+ case GST_BURST_CACHE_START_LATEST_KEYFRAME:
+ {
+ GST_DEBUG_OBJECT (cache, "%s BURST_CACHE_START_LATEST_KEYFRAME",
+ reader->debug);
+
+ /* for new readers we initially scan the complete buffer queue for
+ * a keyframe when a buffer is added. If we don't find a keyframe,
+ * we need to wait for the next keyframe and so we change the reader's
+ * start method to GST_BURST_CACHE_START_NEXT_KEYFRAME.
+ */
+ position = find_next_keyframe (cache, 0);
+ if (position != -1) {
+ GST_DEBUG_OBJECT (cache,
+ "%s BURST_CACHE_START_LATEST_KEYFRAME: position %d", reader->debug,
+ position);
+ break;
+ }
+
+ GST_DEBUG_OBJECT (cache,
+ "%s BURST_CACHE_START_LATEST_KEYFRAME: no keyframe found, "
+ "switching to BURST_CACHE_START_NEXT_KEYFRAME", reader->debug);
+ /* throw reader to the waiting state */
+ reader->bufpos = -1;
+ /* and make reader sync to next keyframe */
+ reader->start_method = GST_BURST_CACHE_START_NEXT_KEYFRAME;
+ break;
+ }
+ case GST_BURST_CACHE_START_BURST:
+ {
+ gboolean ok;
+ gint max;
+
+ /* move to the position where we satisfy the reader's burst
+ * parameters. If we could not satisfy the parameters because there
+ * is not enough data, we just send what we have (which is in position).
+ * We use the max value to limit the search
+ */
+ ok = count_burst_unit (cache, &position, reader->min_format,
+ reader->min_value, &max, reader->max_format, reader->max_value);
+ GST_DEBUG_OBJECT (cache,
+ "%s BURST_CACHE_START_BURST: burst_unit returned %d, position %d",
+ reader->debug, ok, position);
+
+ GST_LOG_OBJECT (cache, "min %d, max %d", position, max);
+
+ /* we hit the max and it is below the min, use that then */
+ if (max != -1 && max <= position) {
+ position = MAX (max - 1, 0);
+ GST_DEBUG_OBJECT (cache,
+ "%s BURST_CACHE_START_BURST: position above max, taken down to %d",
+ reader->debug, position);
+ }
+ break;
+ }
+ case GST_BURST_CACHE_START_BURST_KEYFRAME:
+ {
+ gint min_idx, max_idx;
+ gint next_keyframe, prev_keyframe;
+
+ /* BURST_KEYFRAME:
+ *
+ * _always_ start sending a keyframe to the reader. We first search
+ * a keyframe between min/max limits. If there is none, we send it the
+ * last keyframe before min. If there is none, the behaviour is like
+ * NEXT_KEYFRAME.
+ */
+ /* gather burst limits */
+ count_burst_unit (cache, &min_idx, reader->min_format,
+ reader->min_value, &max_idx, reader->max_format, reader->max_value);
+
+ GST_LOG_OBJECT (cache, "min %d, max %d", min_idx, max_idx);
+
+ /* first find a keyframe after min_idx */
+ next_keyframe = find_next_keyframe (cache, min_idx);
+ if (next_keyframe != -1 && next_keyframe < max_idx) {
+ /* we have a valid keyframe and it's below the max */
+ GST_LOG_OBJECT (cache, "found keyframe in min/max limits");
+ position = next_keyframe;
+ break;
+ }
+
+ /* no valid keyframe, try to find one below min */
+ prev_keyframe = find_prev_keyframe (cache, min_idx);
+ if (prev_keyframe != -1) {
+ GST_WARNING_OBJECT (cache,
+ "using keyframe below min in BURST_KEYFRAME start mode");
+ position = prev_keyframe;
+ break;
+ }
+
+ /* no prev keyframe or not enough data */
+ GST_WARNING_OBJECT (cache,
+ "no prev keyframe found in BURST_KEYFRAME start mode, waiting for next");
+
+ /* throw reader to the waiting state */
+ reader->bufpos = -1;
+ /* and make reader sync to next keyframe */
+ reader->start_method = GST_BURST_CACHE_START_NEXT_KEYFRAME;
+ position = -1;
+ break;
+ }
+ case GST_BURST_CACHE_START_BURST_WITH_KEYFRAME:
+ {
+ gint min_idx, max_idx;
+ gint next_keyframe;
+
+ /* BURST_WITH_KEYFRAME:
+ *
+ * try to start sending a keyframe to the reader. We first search
+ * a keyframe between min/max limits. If there is none, we send it the
+ * amount of data up 'till min.
+ */
+ /* gather enough data to burst */
+ count_burst_unit (cache, &min_idx, reader->min_format,
+ reader->min_value, &max_idx, reader->max_format, reader->max_value);
+
+ GST_LOG_OBJECT (cache, "min %d, max %d", min_idx, max_idx);
+
+ /* first find a keyframe after min_idx */
+ next_keyframe = find_next_keyframe (cache, min_idx);
+ if (next_keyframe != -1 && next_keyframe < max_idx) {
+ /* we have a valid keyframe and it's below the max */
+ GST_LOG_OBJECT (cache, "found keyframe in min/max limits");
+ position = next_keyframe;
+ break;
+ }
+
+ /* no keyframe, send data from min_idx */
+ GST_WARNING_OBJECT (cache, "using min in BURST_WITH_KEYFRAME start mode");
+
+ /* make sure we don't go over the max limit */
+ if (max_idx != -1 && max_idx <= min_idx) {
+ position = MAX (max_idx - 1, 0);
+ } else {
+ position = min_idx;
+ }
+
+ break;
+ }
+ default:
+ g_warning ("unknown start method %d", reader->start_method);
+ position = reader->bufpos;
+ break;
+ }
+
+ if (position >= 0) {
+ /* we got a valid spot in the queue */
+ reader->new_reader = FALSE;
+ reader->bufpos = position;
+ /* signal that the reader is ready */
+ reader->callback (cache, reader, reader->user_data);
+ }
+}
+
+/**
+ * gst_burst_cache_add_reader:
+ * @cache: a #GstBurstCache
+ * @reader: a #GstBurstCacheReader
+ *
+ * Add @reader to @cache.
+ *
+ * Returns: %TRUE when @reader could be added
+ */
+gboolean
+gst_burst_cache_add_reader (GstBurstCache * cache, GstBurstCacheReader * reader)
+{
+ g_return_val_if_fail (GST_IS_BURST_CACHE (cache), FALSE);
+ g_return_val_if_fail (reader != NULL, FALSE);
+ g_return_val_if_fail (reader->new_reader, FALSE);
+
+ /* do limits check if we can */
+ if (reader->min_format == reader->max_format) {
+ if (reader->max_value != -1 && reader->min_value != -1 &&
+ reader->max_value < reader->min_value)
+ goto wrong_limits;
+ }
+
+ CACHE_LOCK (cache);
+ handle_new_reader (cache, reader);
+ /* we can add the handle now */
+ g_hook_prepend (&cache->readers, (GHook *) reader);
+ cache->readers_cookie++;
+ CACHE_UNLOCK (cache);
+
+ return TRUE;
+
+ /* errors */
+wrong_limits:
+ {
+ GST_WARNING_OBJECT (cache,
+ "%s wrong values min =%" G_GUINT64_FORMAT ", max=%"
+ G_GUINT64_FORMAT ", unit %d specified when adding reader",
+ reader->debug, reader->min_value, reader->max_value,
+ reader->min_format);
+ return FALSE;
+ }
+}
+
+/* should be called with the readerslock held. */
+static void
+gst_burst_cache_remove_reader_link (GstBurstCache * cache,
+ GstBurstCacheReader * reader, gboolean remove, GError * reason)
+{
+ if (!G_HOOK_IS_VALID (reader))
+ goto was_removing;
+
+ GST_DEBUG_OBJECT (cache, "%s removing reader %p: (%s)",
+ reader->debug, reader, reason ? reason->message : "Unknown reason");
+
+ /* set reader to invalid position while being removed */
+ reader->bufpos = -1;
+ reader->reason = reason;
+ reader->remove_time = g_get_real_time ();
+
+ cache->readers_cookie++;
+ if (remove)
+ g_hook_destroy_link (&cache->readers, (GHook *) reader);
+
+ return;
+
+ /* ERRORS */
+was_removing:
+ {
+ GST_WARNING_OBJECT (cache, "%s reader is already being removed",
+ reader->debug);
+ if (reason)
+ g_error_free (reason);
+ return;
+ }
+}
+
+/**
+ * gst_burst_cache_remove_reader:
+ * @cache: a #GstBurstCache
+ * @reader: a #GstBurstCacheReader
+ * @drain: drain flag
+ *
+ * Remove @reader from @cache. When @drain is %TRUE all remaining data
+ * in the cache will be sent to the reader before it is removed.
+ *
+ * Returns: %TRUE on success
+ */
+gboolean
+gst_burst_cache_remove_reader (GstBurstCache * cache,
+ GstBurstCacheReader * reader, gboolean drain)
+{
+ g_return_val_if_fail (GST_IS_BURST_CACHE (cache), FALSE);
+ g_return_val_if_fail (reader != NULL, FALSE);
+
+ GST_DEBUG_OBJECT (cache, "%s removing reader", reader->debug);
+
+ CACHE_LOCK (cache);
+ if (!G_HOOK_IS_VALID (reader))
+ goto not_valid;
+
+ if (drain) {
+ if (reader->draincount == -1) {
+ /* take the position of the reader as the number of buffers left to drain.
+ * If the reader was at position -1, we drain 0 buffers, 0 == drain 1
+ * buffer, etc... This will mark reader as draining. We can not remove the
+ * reader right away because it might have some buffers to drain in its
+ * queue. */
+ reader->draincount = reader->bufpos + 1;
+ } else {
+ GST_INFO_OBJECT (cache, "%s Reader already draining", reader->debug);
+ }
+ } else {
+ gst_burst_cache_remove_reader_link (cache, reader, TRUE,
+ g_error_new (0, GST_BURST_CACHE_ERROR_NONE, "User requested remove"));
+ }
+ CACHE_UNLOCK (cache);
+
+ return TRUE;
+
+ /* ERRORS */
+not_valid:
+ {
+ GST_WARNING_OBJECT (cache, "reader %s not found!", reader->debug);
+ CACHE_UNLOCK (cache);
+ return FALSE;
+ }
+}
+
+/**
+ * gst_burst_cache_error_reader:
+ * @cache: a #GstBurstCache
+ * @reader: a #GstBurstCacheReader
+ * @error: (transfer full): a #GError
+ *
+ * Remove @reader from @cache and set the reason to @error. Ownership is taken
+ * of @error.
+ *
+ * Returns: %TRUE on success
+ */
+gboolean
+gst_burst_cache_error_reader (GstBurstCache * cache,
+ GstBurstCacheReader * reader, GError * error)
+{
+ g_return_val_if_fail (GST_IS_BURST_CACHE (cache), FALSE);
+ g_return_val_if_fail (reader != NULL, FALSE);
+
+ GST_DEBUG_OBJECT (cache, "%s error reader", reader->debug);
+
+ CACHE_LOCK (cache);
+ if (!G_HOOK_IS_VALID (reader))
+ goto not_valid;
+
+ if (error == NULL)
+ error = g_error_new (0, GST_BURST_CACHE_ERROR_ERROR, "Unknown error");
+
+ GST_WARNING_OBJECT (cache, "%s reader %p error, removing: %s",
+ reader->debug, reader, error->message);
+
+ gst_burst_cache_remove_reader_link (cache, reader, TRUE, error);
+ CACHE_UNLOCK (cache);
+
+ return TRUE;
+
+ /* ERRORS */
+not_valid:
+ {
+ GST_WARNING_OBJECT (cache, "reader %s not found!", reader->debug);
+ CACHE_UNLOCK (cache);
+ return FALSE;
+ }
+}
+
+static gboolean
+remove_hook (GstBurstCacheReader * reader, GstBurstCache * cache)
+{
+ gst_burst_cache_remove_reader_link (cache, reader, FALSE,
+ g_error_new (0, GST_BURST_CACHE_ERROR_NONE, "User requested clear"));
+
+ /* FALSE to remove */
+ return FALSE;
+}
+
+/**
+ * gst_burst_cache_clear_readers:
+ * @cache: a #GstBurstCache
+ *
+ * Remove all readers from @cache.
+ */
+void
+gst_burst_cache_clear_readers (GstBurstCache * cache)
+{
+ g_return_if_fail (GST_IS_BURST_CACHE (cache));
+
+ GST_DEBUG_OBJECT (cache, "clearing all readers");
+
+ CACHE_LOCK (cache);
+ g_hook_list_marshal_check (&cache->readers, TRUE, (GHookCheckMarshaller)
+ remove_hook, cache);
+ CACHE_UNLOCK (cache);
+}
+
+/* calculate the new position for a reader after recovery. This function
+ * does not update the reader position but merely returns the required
+ * position.
+ */
+static gint
+gst_burst_cache_recover_reader (GstBurstCache * cache,
+ GstBurstCacheReader * reader)
+{
+ gint newbufpos;
+
+ GST_WARNING_OBJECT (cache,
+ "%s reader %p is lagging at %d, recover using policy %d",
+ reader->debug, reader, reader->bufpos, cache->recover);
+
+ switch (cache->recover) {
+ case GST_BURST_CACHE_RECOVER_NONE:
+ /* do nothing, reader will catch up or get kicked out when it reaches
+ * the hard max */
+ newbufpos = reader->bufpos;
+ break;
+ case GST_BURST_CACHE_RECOVER_RESYNC_LATEST:
+ /* move to beginning of queue */
+ newbufpos = -1;
+ break;
+ case GST_BURST_CACHE_RECOVER_RESYNC_SOFT_LIMIT:
+ /* move to beginning of soft max */
+ newbufpos =
+ get_buffers_max (cache, cache->limit_format, cache->limit_soft_max);
+ break;
+ case GST_BURST_CACHE_RECOVER_RESYNC_KEYFRAME:
+ /* find keyframe in buffers, we search backwards to find the
+ * closest keyframe relative to what this reader already received. */
+ newbufpos = MIN (cache->bufqueue->len - 1,
+ get_buffers_max (cache, cache->limit_format,
+ cache->limit_soft_max) - 1);
+
+ while (newbufpos >= 0) {
+ GstBuffer *buf;
+
+ buf = g_array_index (cache->bufqueue, GstBuffer *, newbufpos);
+ if (is_keyframe (cache, buf)) {
+ /* found a buffer that is not a delta unit */
+ break;
+ }
+ newbufpos--;
+ }
+ break;
+ default:
+ /* unknown recovery procedure */
+ newbufpos =
+ get_buffers_max (cache, cache->limit_format, cache->limit_soft_max);
+ break;
+ }
+ return newbufpos;
+}
+
+typedef struct
+{
+ GstBurstCache *cache;
+ GstBurstCacheClass *klass;
+ GstClockTime now;
+ gint max_buffer_usage;
+ gint max_buffers;
+ gint soft_max_buffers;
+} QueueHookData;
+
+static gboolean
+queue_hook (GstBurstCacheReader * reader, QueueHookData * data)
+{
+ GstBurstCache *cache = data->cache;
+
+ /* move reader forwards */
+ reader->bufpos++;
+
+ GST_LOG_OBJECT (cache, "%s reader %p at position %d",
+ reader->debug, reader, reader->bufpos);
+
+ /* check soft max if needed, recover reader */
+ if (data->soft_max_buffers > 0 && reader->bufpos >= data->soft_max_buffers) {
+ gint newpos;
+
+ newpos = gst_burst_cache_recover_reader (cache, reader);
+ if (newpos != reader->bufpos) {
+ reader->dropped_buffers += reader->bufpos - newpos;
+ reader->bufpos = newpos;
+ reader->discont = TRUE;
+ GST_INFO_OBJECT (cache, "%s reader %p position reset to %d",
+ reader->debug, reader, reader->bufpos);
+ } else {
+ GST_INFO_OBJECT (cache,
+ "%s reader %p not recovering position", reader->debug, reader);
+ }
+ }
+
+ /* check hard max */
+ if (data->max_buffers > 0 && reader->bufpos >= data->max_buffers)
+ goto hit_limit;
+
+ /* check timeout */
+ if (reader->timeout > 0 && data->now - reader->last_activity_time >
+ reader->timeout)
+ goto timeout;
+
+ if (reader->new_reader) {
+ handle_new_reader (cache, reader);
+ } else if (reader->bufpos == 0) {
+ /* reader changed from -1 to 0, we can send data to this reader now. */
+ reader->callback (cache, reader, reader->user_data);
+ }
+
+ /* keep track of maximum buffer usage */
+ if (reader->bufpos > data->max_buffer_usage) {
+ data->max_buffer_usage = reader->bufpos;
+ }
+
+ return TRUE;
+
+ /* ERRORS */
+hit_limit:
+ {
+ GST_WARNING_OBJECT (cache, "%s reader %p is too slow, removing",
+ reader->debug, reader);
+ gst_burst_cache_remove_reader_link (cache, reader, FALSE,
+ g_error_new (0, GST_BURST_CACHE_ERROR_SLOW, "Reader is too slow"));
+ /* remove reader */
+ return FALSE;
+ }
+timeout:
+ {
+ GST_WARNING_OBJECT (cache, "%s reader %p timeout, removing",
+ reader->debug, reader);
+ gst_burst_cache_remove_reader_link (cache, reader, FALSE,
+ g_error_new (0, GST_BURST_CACHE_ERROR_SLOW, "Reader timed out"));
+ /* remove reader */
+ return FALSE;
+ }
+}
+
+/**
+ * gst_burst_cache_queue_buffer:
+ * @cache: a #GstBurstCache
+ * @buffer: a #GstBuffer
+ *
+ * Queue @buffer in @cache. Older and unused buffers will be removed from
+ * @cache.
+ */
+void
+gst_burst_cache_queue_buffer (GstBurstCache * cache, GstBuffer * buffer)
+{
+ gint queuelen;
+ gint i;
+ QueueHookData data;
+
+ g_return_if_fail (GST_IS_BURST_CACHE (cache));
+ g_return_if_fail (buffer != NULL);
+
+ data.now = g_get_real_time ();
+
+ data.klass = GST_BURST_CACHE_GET_CLASS (cache);
+
+ CACHE_LOCK (cache);
+ /* add buffer to queue */
+ g_array_prepend_val (cache->bufqueue, buffer);
+ queuelen = cache->bufqueue->len;
+
+ data.cache = cache;
+
+ if (cache->limit_max > 0)
+ data.max_buffers =
+ get_buffers_max (cache, cache->limit_format, cache->limit_max);
+ else
+ data.max_buffers = -1;
+
+ if (cache->limit_soft_max > 0)
+ data.soft_max_buffers =
+ get_buffers_max (cache, cache->limit_format, cache->limit_soft_max);
+ else
+ data.soft_max_buffers = -1;
+
+ GST_LOG_OBJECT (cache, "Using max %d, softmax %d", data.max_buffers,
+ data.soft_max_buffers);
+
+ /* After adding the buffer, we update all reader positions in the queue. If
+ * a reader moves over the soft max, we start the recovery procedure for this
+ * slow reader. If it goes over the hard max, it is put into the slow list
+ * and removed. */
+ data.max_buffer_usage = 0;
+
+ g_hook_list_marshal_check (&cache->readers, TRUE, (GHookCheckMarshaller)
+ queue_hook, &data);
+
+ /* make sure we respect bytes-min, buffers-min and time-min when they are set */
+ {
+ gint usage, max;
+
+ GST_LOG_OBJECT (cache,
+ "extending queue %d to respect time_min %" GST_TIME_FORMAT
+ ", bytes_min %d, buffers_min %d", data.max_buffer_usage,
+ GST_TIME_ARGS (cache->time_min), cache->bytes_min, cache->buffers_min);
+
+ /* get index where the limits are ok, we don't really care if all limits
+ * are ok, we just queue as much as we need. We also don't compare against
+ * the max limits. */
+ find_limits (cache, &usage, cache->bytes_min, cache->buffers_min,
+ cache->time_min, &max, -1, -1, -1);
+
+ data.max_buffer_usage = MAX (data.max_buffer_usage, usage + 1);
+ GST_LOG_OBJECT (cache, "extended queue to %d", data.max_buffer_usage);
+ }
+
+ /* now look for start points and make sure there is at least one
+ * keyframe point in the queue. */
+ {
+ /* no point in searching beyond the queue length */
+ gint limit = queuelen;
+
+ /* no point in searching beyond the soft-max if any. */
+ if (data.soft_max_buffers > 0) {
+ limit = MIN (limit, data.soft_max_buffers);
+ }
+ GST_LOG_OBJECT (cache,
+ "extending queue to include start point, now at %d, limit is %d",
+ data.max_buffer_usage, limit);
+
+ for (i = 0; i < limit; i++) {
+ GstBuffer *buf;
+
+ buf = g_array_index (cache->bufqueue, GstBuffer *, i);
+ if (is_keyframe (cache, buf)) {
+ /* found a sync frame, now extend the buffer usage to
+ * include at least this frame. */
+ data.max_buffer_usage = MAX (data.max_buffer_usage, i);
+ break;
+ }
+ }
+ GST_LOG_OBJECT (cache, "max buffer usage is now %d", data.max_buffer_usage);
+ }
+
+ GST_LOG_OBJECT (cache, "len %d, usage %d", queuelen, data.max_buffer_usage);
+
+ /* nobody is referencing units after max_buffer_usage so we can
+ * remove them from the queue. We remove them in reverse order as
+ * this is the most optimal for GArray. */
+ for (i = queuelen - 1; i > data.max_buffer_usage; i--) {
+ GstBuffer *old;
+
+ /* queue exceeded max size */
+ queuelen--;
+ old = g_array_index (cache->bufqueue, GstBuffer *, i);
+ cache->bufqueue = g_array_remove_index (cache->bufqueue, i);
+
+ /* unref tail buffer */
+ gst_buffer_unref (old);
+ }
+ /* save for stats */
+ cache->buffers_queued = data.max_buffer_usage;
+ CACHE_UNLOCK (cache);
+}
+
+/**
+ * gst_burst_cache_get_buffer:
+ * @cache: a #GstBurstCache
+ * @reader: a #GstBurstCacheReader
+ * @buffer: a #GstBuffer
+ *
+ * Get the next buffer for @reader in @cache.
+ *
+ * Returns: #GST_BURST_CACHE_RESULT_OK when a buffer is available.
+ * #GST_BURST_CACHE_RESULT_WAIT is returned when no buffers are available, the
+ * caller should wait for the callback signal before attempting to get a
+ * buffer again. #GST_BURST_CACHE_RESULT_EOS is returned when the client has
+ * received all buffers and is ready to be removed.
+ */
+GstBurstCacheResult
+gst_burst_cache_get_buffer (GstBurstCache * cache, GstBurstCacheReader * reader,
+ GstBuffer ** buffer)
+{
+ GstBuffer *buf;
+ GstClockTime timestamp;
+
+ g_return_val_if_fail (GST_IS_BURST_CACHE (cache),
+ GST_BURST_CACHE_RESULT_ERROR);
+ g_return_val_if_fail (reader != NULL, GST_BURST_CACHE_RESULT_ERROR);
+ g_return_val_if_fail (buffer != NULL, GST_BURST_CACHE_RESULT_ERROR);
+
+ CACHE_LOCK (cache);
+ if (reader->bufpos == -1)
+ goto no_data_yet;
+
+ /* we drained all remaining buffers, no need to get a new one */
+ if (reader->draincount == 0)
+ goto drained;
+
+ /* grab buffer */
+ buf = g_array_index (cache->bufqueue, GstBuffer *, reader->bufpos);
+ reader->bufpos--;
+
+ /* update stats */
+ timestamp = GST_BUFFER_TIMESTAMP (buf);
+ if (reader->first_buffer_ts == GST_CLOCK_TIME_NONE)
+ reader->first_buffer_ts = timestamp;
+ if (timestamp != -1)
+ reader->last_buffer_ts = timestamp;
+
+ /* decrease draincount */
+ if (reader->draincount != -1)
+ reader->draincount--;
+
+ GST_LOG_OBJECT (cache, "%s reader %p at position %d",
+ reader->debug, reader, reader->bufpos);
+
+ *buffer = gst_buffer_ref (buf);
+ CACHE_UNLOCK (cache);
+
+ return GST_BURST_CACHE_RESULT_OK;
+
+ /* ERRORS */
+no_data_yet:
+ {
+ GST_DEBUG_OBJECT (cache, "%s no data available", reader->debug);
+ CACHE_UNLOCK (cache);
+ return GST_BURST_CACHE_RESULT_WAIT;
+ }
+drained:
+ {
+ GST_DEBUG_OBJECT (cache, "%s drained", reader->debug);
+ CACHE_UNLOCK (cache);
+ return GST_BURST_CACHE_RESULT_EOS;
+ }
+}
diff --git a/gst/tcp/gstburstcache.h b/gst/tcp/gstburstcache.h
new file mode 100644
index 000000000..7c2d85bd3
--- /dev/null
+++ b/gst/tcp/gstburstcache.h
@@ -0,0 +1,301 @@
+/* GStreamer
+ * Copyright (C) <2012> Wim Taymans <wim.taymans@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_BURST_CACHE_H__
+#define __GST_BURST_CACHE_H__
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_BURST_CACHE \
+ (gst_burst_cache_get_type())
+#define GST_BURST_CACHE(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_BURST_CACHE,GstBurstCache))
+#define GST_BURST_CACHE_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_BURST_CACHE,GstBurstCacheClass))
+#define GST_IS_BURST_CACHE(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_BURST_CACHE))
+#define GST_IS_BURST_CACHE_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BURST_CACHE))
+#define GST_BURST_CACHE_GET_CLASS(klass) \
+ (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_BURST_CACHE, GstBurstCacheClass))
+
+typedef struct _GstBurstCache GstBurstCache;
+typedef struct _GstBurstCacheClass GstBurstCacheClass;
+typedef struct _GstBurstCacheReader GstBurstCacheReader;
+
+/**
+ * GstBurstCacheRecover:
+ * @GST_BURST_CACHE_RECOVER_NONE : no recovering is done
+ * @GST_BURST_CACHE_RECOVER_RESYNC_LATEST : reader is moved to last buffer
+ * @GST_BURST_CACHE_RECOVER_RESYNC_SOFT_LIMIT: reader is moved to the soft limit
+ * @GST_BURST_CACHE_RECOVER_RESYNC_KEYFRAME : reader is moved to latest keyframe
+ *
+ * Possible values for the recovery procedure to use when a reader consumes
+ * data too slowly and has a backlog of more that soft-limit buffers.
+ */
+typedef enum
+{
+ GST_BURST_CACHE_RECOVER_NONE,
+ GST_BURST_CACHE_RECOVER_RESYNC_LATEST,
+ GST_BURST_CACHE_RECOVER_RESYNC_SOFT_LIMIT,
+ GST_BURST_CACHE_RECOVER_RESYNC_KEYFRAME
+} GstBurstCacheRecover;
+
+/**
+ * GstBurstCacheStart:
+ * @GST_BURST_CACHE_START_LATEST : reader receives most recent buffer
+ * @GST_BURST_CACHE_START_NEXT_KEYFRAME : reader receives next keyframe
+ * @GST_BURST_CACHE_START_LATEST_KEYFRAME : reader receives latest keyframe (burst)
+ * @GST_BURST_CACHE_START_BURST : reader receives specific amount of data
+ * @GST_BURST_CACHE_START_BURST_KEYFRAME : reader receives specific amount of data
+ * starting from latest keyframe
+ * @GST_BURST_CACHE_START_BURST_WITH_KEYFRAME : reader receives specific amount of data from
+ * a keyframe, or if there is not enough data after
+ * the keyframe, starting before the keyframe
+ *
+ * This enum defines the selection of the first buffer that is sent
+ * to a new reader.
+ */
+typedef enum
+{
+ GST_BURST_CACHE_START_LATEST,
+ GST_BURST_CACHE_START_NEXT_KEYFRAME,
+ GST_BURST_CACHE_START_LATEST_KEYFRAME,
+ GST_BURST_CACHE_START_BURST,
+ GST_BURST_CACHE_START_BURST_KEYFRAME,
+ GST_BURST_CACHE_START_BURST_WITH_KEYFRAME
+} GstBurstCacheStart;
+
+/**
+ * GstBurstCacheError:
+ * @GST_BURST_CACHE_ERROR_NONE : No error
+ * @GST_BURST_CACHE_ERROR_SLOW : reader is too slow
+ * @GST_BURST_CACHE_ERROR_ERROR : reader is in error
+ * @GST_BURST_CACHE_ERROR_DUPLICATE: same reader added twice
+ *
+ * Error codes used in the reason GError.
+ */
+typedef enum
+{
+ GST_BURST_CACHE_ERROR_NONE = 0,
+ GST_BURST_CACHE_ERROR_SLOW = 1,
+ GST_BURST_CACHE_ERROR_ERROR = 2,
+ GST_BURST_CACHE_ERROR_DUPLICATE = 3,
+} GstBurstCacheError;
+
+/**
+ * GstBurstCacheResult:
+ * @GST_BURST_CACHE_RESULT_ERROR : An error occured
+ * @GST_BURST_CACHE_RESULT_OK : No error
+ * @GST_BURST_CACHE_RESULT_WAIT : Wait for more buffers
+ * @GST_BURST_CACHE_RESULT_EOS : No more buffers
+ *
+ * Error codes used in the reason GError.
+ */
+typedef enum
+{
+ GST_BURST_CACHE_RESULT_ERROR = 0,
+ GST_BURST_CACHE_RESULT_OK = 1,
+ GST_BURST_CACHE_RESULT_WAIT = 2,
+ GST_BURST_CACHE_RESULT_EOS = 3,
+} GstBurstCacheResult;
+
+/**
+ * GstBurstCacheReaderCallback:
+ * @cache: a #GstBurstCache
+ * @reader: a #GstBurstCacheReader
+ * @user_data: user data given when creating @reader
+ *
+ * Called when @reader in @cache has data. You can get the new data with
+ * gst_burst_cache_get_buffer() from this callback or any other thread.
+ */
+typedef void (*GstBurstCacheReaderCallback) (GstBurstCache *cache,
+ GstBurstCacheReader *reader,
+ gpointer user_data);
+
+/**
+ * GstBurstCacheReader:
+ * @object: parent miniobject
+ * @bufpos: position of this reader in the global queue
+ * @draincount: the remaining number of buffers to drain or -1 if the
+ * reader is not draining.
+ * @new_reader: this is a new reader
+ * @discont: is the next buffer was discont
+ * @reason: the reason why the reader is being removed
+ *
+ * structure for a reader
+ */
+struct _GstBurstCacheReader {
+ GHook hook;
+
+ gint bufpos;
+ gint draincount;
+
+ GstBurstCacheReaderCallback callback;
+ gpointer user_data;
+ GDestroyNotify notify;
+
+ gboolean new_reader;
+ gboolean discont;
+
+ GError *reason;
+
+ /* method to sync reader when connecting */
+ GstBurstCacheStart start_method;
+ GstFormat min_format;
+ guint64 min_value;
+ GstFormat max_format;
+ guint64 max_value;
+
+ /* stats */
+ guint64 bytes_sent;
+ guint64 dropped_buffers;
+ guint64 avg_queue_size;
+ guint64 first_buffer_ts;
+ guint64 last_buffer_ts;
+
+ guint64 add_time;
+ guint64 remove_time;
+ guint64 last_activity_time;
+ guint64 timeout;
+
+ gchar debug[30]; /* a debug string used in debug calls to
+ identify the reader */
+};
+
+/**
+ * GstBurstCache:
+ * @parent: parent GObject
+ * @lock: lock to protect @readers
+ * @bufqueue: global queue of buffers
+ * @readers: list of readers we are serving
+ * @readers_cookie: Cookie to detect changes to @readers
+ * @limit_format: the format of @limit_max and @@limit_soft_max
+ * @limit_max: max units to queue for a reader
+ * @limit_soft_max: max units a reader can lag before recovery starts
+ * @recover: how to recover a lagging reader
+ * @bytes_min: min number of bytes to queue
+ * @time_min: min time to queue
+ * @buffers_min: min number of buffers to queue
+ * @bytes_to_serve: how much bytes we must serve
+ * @bytes_served: how much bytes have we served
+ * @bytes_queued: number of queued bytes
+ * @time_queued: amount of queued time
+ * @buffers_queued: number of queued buffers
+ */
+struct _GstBurstCache {
+ GObject parent;
+
+ /*< private >*/
+ GRecMutex lock;
+ GArray *bufqueue;
+ /* the readers */
+ GHookList readers;
+ guint readers_cookie;
+
+ /* these values are used to check if a reader is reading fast
+ * enough and to control recovery */
+ GstFormat limit_format;
+ gint64 limit_max;
+ gint64 limit_soft_max;
+ GstBurstCacheRecover recover;
+
+ /* these values are used to control the amount of data
+ * kept in the queues. It allows readers to perform a burst
+ * on connect. */
+ gint bytes_min;
+ gint64 time_min;
+ gint buffers_min;
+
+ /* stats */
+ gint bytes_queued;
+ gint64 time_queued;
+ gint buffers_queued;
+};
+
+/**
+ * GstBurstCacheClass:
+ * @parent_class: parent GObjectClass
+ * @reader_ready: called when a reader has a new buffer available
+ *
+ * The GstBurstCache class structure.
+ */
+struct _GstBurstCacheClass {
+ GObjectClass parent_class;
+};
+
+GType gst_burst_cache_get_type (void);
+GType gst_burst_cache_reader_get_type (void);
+
+GstBurstCache * gst_burst_cache_new (guint reader_size);
+
+void gst_burst_cache_set_min_amount (GstBurstCache *cache,
+ gint bytes_min,
+ gint64 time_min,
+ gint buffers_min);
+void gst_burst_cache_get_min_amount (GstBurstCache *cache,
+ gint *bytes_min,
+ gint64 *time_min,
+ gint *buffers_min);
+
+void gst_burst_cache_set_limits (GstBurstCache *cache,
+ GstFormat format,
+ gint64 max,
+ gint64 soft_max,
+ GstBurstCacheRecover recover);
+void gst_burst_cache_get_limits (GstBurstCache *cache,
+ GstFormat *format,
+ gint64 *max,
+ gint64 *soft_max,
+ GstBurstCacheRecover *recover);
+
+void gst_burst_cache_queue_buffer (GstBurstCache *cache,
+ GstBuffer *buffer);
+
+GstBurstCacheReader * gst_burst_cache_reader_new (GstBurstCache *cache,
+ GstBurstCacheReaderCallback callback,
+ gpointer user_data,
+ GDestroyNotify notify);
+gboolean gst_burst_cache_reader_set_burst (GstBurstCacheReader *reader,
+ GstBurstCacheStart start_method,
+ GstFormat min_format, guint64 min_value,
+ GstFormat max_format, guint64 max_value);
+void gst_burst_cache_reader_destroy (GstBurstCacheReader *reader);
+
+gboolean gst_burst_cache_add_reader (GstBurstCache *cache,
+ GstBurstCacheReader *reader);
+gboolean gst_burst_cache_remove_reader (GstBurstCache *cache,
+ GstBurstCacheReader *reader,
+ gboolean flush);
+gboolean gst_burst_cache_error_reader (GstBurstCache *cache,
+ GstBurstCacheReader *reader,
+ GError *error);
+
+void gst_burst_cache_clear_readers (GstBurstCache * cache);
+
+
+GstBurstCacheResult gst_burst_cache_get_buffer (GstBurstCache *cache,
+ GstBurstCacheReader *reader,
+ GstBuffer **buffer);
+
+G_END_DECLS
+
+#endif /* __GST_BURST_CACHE_H__ */