diff options
author | Wim Taymans <wim.taymans@collabora.co.uk> | 2012-12-18 13:06:39 +0100 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2016-04-12 09:29:29 +0200 |
commit | 6bacd788c7f392be87baf16657b13abd8d60b7c5 (patch) | |
tree | 90872e0e9963b7227929d9cd6448cc62afc7e03e | |
parent | f05ea1e6a62e12d166b6a2f2d0e4b59018515d47 (diff) |
add burstcache objectburstcache
-rw-r--r-- | gst/tcp/Makefile.am | 2 | ||||
-rw-r--r-- | gst/tcp/gstburstcache.c | 1400 | ||||
-rw-r--r-- | gst/tcp/gstburstcache.h | 301 |
3 files changed, 1703 insertions, 0 deletions
diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am index 1ff06a38c..ef8ccb070 100644 --- a/gst/tcp/Makefile.am +++ b/gst/tcp/Makefile.am @@ -10,6 +10,7 @@ endif libgsttcp_la_SOURCES = \ gstsocketsrc.c \ gsttcpplugin.c \ + gstburstcache.c \ gsttcpclientsrc.c gsttcpclientsink.c \ $(multifdsink_SOURCES) \ gstmultihandlesink.c \ @@ -24,6 +25,7 @@ libgsttcp_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS) noinst_HEADERS = \ gstsocketsrc.h \ gsttcp.h \ + gstburstcache.h \ gsttcpclientsrc.h gsttcpclientsink.h \ gstmultifdsink.h \ gstmultisocketsink.h \ diff --git a/gst/tcp/gstburstcache.c b/gst/tcp/gstburstcache.c new file mode 100644 index 000000000..001e639e9 --- /dev/null +++ b/gst/tcp/gstburstcache.c @@ -0,0 +1,1400 @@ +/* GStreamer + * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> + * Copyright (C) <2012> Wim Taymans <wim.taymans@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:gstburstcache + * @short_description: caches and implements burst-on-connect of buffers + * + * GstBurstCache keeps a cache of buffers up to a configurable limit and replays + * this cache for newly added readers. This can be used to implement + * burst-on-connect for various network scenarios such as TCP or UDP. + * + * A new cache is created with gst_burst_cache_new(), which requires a size of + * the structure to hold the reader information. + * + * New buffers are put in the cache with gst_burst_cache_queue_buffer(). Old + * buffers will be removed from the cache. With gst_burst_cache_set_min_amount() + * one can control the amount of data in time/bytes/buffers to keep in the + * cache. + * + * New readers can be constructed with gst_burst_cache_reader_new(). This will + * allocate a new reader structure with the configured size when the cache was + * constructed. A callback needs to be provided that will be called when new + * data is available for the reader. + * + * The caller can configure the reader with gst_burst_cache_reader_set_burst(). + * The start_method property will define which buffer in the cahced buffers will + * be sent first to the reader. Readers can be sent the most recent buffer + * (which might not be decodable by the reader if it is not a keyframe), the + * next keyframe received in the cache (which can take some time depending on + * the keyframe rate), or the last received keyframe (which will cause a simple + * burst-on-connect). GstBurstCache will always keep at least one keyframe in + * its internal cache. + * + * There are additional values for the start_method to allow finer control over + * burst-on-connect behaviour. By selecting the 'burst' method a minimum burst + * size can be chosen, 'burst-keyframe' additionally requires that the burst + * begin with a keyframe, and 'burst-with-keyframe' attempts to burst beginning + * with a keyframe, but will prefer a minimum burst size even if it requires + * not starting with a keyframe. + * + * When a reader is created and configured, it can be added to the cache with + * gst_burst_cache_add_reader(). This will trigger the callback when new data is + * available for the reader. The reader should call gst_burst_cache_get_buffer() + * to retrieve the available buffer until the function returns + * GST_BURST_CACHE_RESULT_WAIT, in which case it should wait for the callback again. + * This makes it possible to implement a push or pull model for retrieving data + * from the cache. + * + * If the reader does not call gst_burst_cache_get_buffer() fast enough, it will + * start to lag. With gst_burst_cache_set_limits() you can configure how much a + * reader is allowed to lag. You can configure a soft limit and a hard limit in + * and format. + * + * A reader with a lag of at least soft-max enters the recovery procedure which + * is controlled with the recover property. A recover policy of NONE will do + * nothing, RESYNC_LATEST will send the most recently received buffer as the + * next buffer for the reader, RESYNC_SOFT_LIMIT positions the reader to the + * soft limit in the buffer queue and RESYNC_KEYFRAME positions the reader at + * the most recent keyframe in the buffer queue. + * + * When the reader is lagging more that the soft-limit, its recovery + * procedure will be started, which usually will make it drop buffers to catch + * up. When the hard limit is reached, the reader is removed from the cache. + * + * A reader can be removed from the cache with gst_burst_cache_remove_reader(). + * + * When a reader is in error, gst_burst_cache_error_reader() must be called, + * which will cause the reader to be removed from the cache. + * + * When a reader is freed, its GHook destroy function will be called with the + * GHook data. You can install a custom function and data to be notified. + * + * Last reviewed on 2012-11-09 (1.0.3) + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstburstcache.h" + +#define DEFAULT_LIMIT_FORMAT GST_FORMAT_BUFFERS +#define DEFAULT_LIMIT_MAX -1 +#define DEFAULT_LIMIT_SOFT_MAX -1 +#define DEFAULT_TIME_MIN -1 +#define DEFAULT_BYTES_MIN -1 +#define DEFAULT_BUFFERS_MIN -1 +#define DEFAULT_RECOVER GST_BURST_CACHE_RECOVER_NONE + +enum +{ + PROP_0, + PROP_LAST +}; + +GST_DEBUG_CATEGORY_STATIC (burstcache_debug); +#define GST_CAT_DEFAULT (burstcache_debug) + +#define gst_burst_cache_parent_class parent_class +G_DEFINE_TYPE (GstBurstCache, gst_burst_cache, G_TYPE_OBJECT); + +#define CACHE_LOCK_INIT(cache) (g_rec_mutex_init(&(cache)->lock)) +#define CACHE_LOCK_CLEAR(cache) (g_rec_mutex_clear(&(cache)->lock)) +#define CACHE_LOCK(cache) (g_rec_mutex_lock(&(cache)->lock)) +#define CACHE_UNLOCK(cache) (g_rec_mutex_unlock(&(cache)->lock)) + +static void gst_burst_cache_finalize (GObject * object); + +G_DEFINE_POINTER_TYPE (GstBurstCacheReader, gst_burst_cache_reader); + +static gint find_keyframe (GstBurstCache * cache, gint idx, gint direction); +#define find_next_keyframe(s,i) find_keyframe(s,i,1) +#define find_prev_keyframe(s,i) find_keyframe(s,i,-1) +static gboolean is_keyframe (GstBurstCache * cache, GstBuffer * buffer); + +static gint get_buffers_max (GstBurstCache * cache, GstFormat format, + gint64 max); +static gint gst_burst_cache_recover_reader (GstBurstCache * cache, + GstBurstCacheReader * reader); +static gboolean find_limits (GstBurstCache * cache, gint * min_idx, + gint bytes_min, gint buffers_min, gint64 time_min, gint * max_idx, + gint bytes_max, gint buffers_max, gint64 time_max); + +static void +gst_burst_cache_class_init (GstBurstCacheClass * klass) +{ + GObjectClass *gobject_class; + + gobject_class = (GObjectClass *) klass; + + gobject_class->finalize = gst_burst_cache_finalize; + + GST_DEBUG_CATEGORY_INIT (burstcache_debug, "burstcache", 0, "GstBurstCache"); +} + +static void +gst_burst_cache_init (GstBurstCache * this) +{ + CACHE_LOCK_INIT (this); + this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *)); + this->limit_format = DEFAULT_LIMIT_FORMAT; + this->limit_max = DEFAULT_LIMIT_MAX; + this->limit_soft_max = DEFAULT_LIMIT_SOFT_MAX; + this->time_min = DEFAULT_TIME_MIN; + this->bytes_min = DEFAULT_BYTES_MIN; + this->buffers_min = DEFAULT_BUFFERS_MIN; + this->recover = DEFAULT_RECOVER; +} + +static void +gst_burst_cache_finalize (GObject * object) +{ + GstBurstCache *this; + + this = GST_BURST_CACHE (object); + + g_hook_list_clear (&this->readers); + + g_array_free (this->bufqueue, TRUE); + CACHE_LOCK_CLEAR (this); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +/** + * gst_burst_cache_new: + * @reader_size: the size of the hook + * + * Make a new burst cache. @hook_size specifies the size of the data structure + * that is kep for each client and must be at least + * sizeof(GstBurstCacheReader). + * + * Returns: a new #GstBurstCache + */ +GstBurstCache * +gst_burst_cache_new (guint reader_size) +{ + GstBurstCache *cache; + + g_return_val_if_fail (reader_size >= sizeof (GstBurstCacheReader), NULL); + + cache = g_object_new (GST_TYPE_BURST_CACHE, NULL); + g_hook_list_init (&cache->readers, reader_size); + + return cache; +} + +/** + * gst_burst_cache_set_min_amount: + * @cache: a #GstBurstCache + * @bytes_min: minimum amount to cache in bytes + * @time_min: minimum amount to cache in time + * @buffers_min: minimum amount to cache in buffers + * + * Set the minimum amount of data that @cache should keep around. Values can be + * specified in bytes, time and buffers. A value of -1 sets unlimited caching. + */ +void +gst_burst_cache_set_min_amount (GstBurstCache * cache, gint bytes_min, + gint64 time_min, gint buffers_min) +{ + g_return_if_fail (GST_IS_BURST_CACHE (cache)); + + cache->bytes_min = bytes_min; + cache->time_min = time_min; + cache->buffers_min = buffers_min; +} + +/** + * gst_burst_cache_get_min_amount: + * @cache: a #GstBurstCache + * @bytes_min: (out) (allow-none): result in bytes + * @time_min: (out) (allow-none): result in time + * @buffers_min: (out) (allow-none): result in buffers + * + * Get the minimum amount of data that @cache keeps around. + */ +void +gst_burst_cache_get_min_amount (GstBurstCache * cache, gint * bytes_min, + gint64 * time_min, gint * buffers_min) +{ + g_return_if_fail (GST_IS_BURST_CACHE (cache)); + + if (bytes_min) + *bytes_min = cache->bytes_min; + if (time_min) + *time_min = cache->time_min; + if (buffers_min) + *buffers_min = cache->buffers_min; +} + +/** + * gst_burst_cache_set_limits: + * @cache: a #GstBurstCache + * @format: format of @max and @soft_max + * @max: maximum lag for a reader + * @soft_max: maximum lag for a reader before recovery is performed + * @recover: a #GstBurstCacheRecover + * + * Set the limits for readers. When a reader lags more than @soft_max behind the + * most recent buffer, the receovery procedure @recovery is started for the + * client. If the client lags up to @max, it will be removed from @cache. + */ +void +gst_burst_cache_set_limits (GstBurstCache * cache, GstFormat format, + gint64 max, gint64 soft_max, GstBurstCacheRecover recover) +{ + g_return_if_fail (GST_IS_BURST_CACHE (cache)); + + cache->limit_format = format; + cache->limit_max = max; + cache->limit_soft_max = soft_max; + cache->recover = recover; +} + +/** + * gst_burst_cache_get_limits: + * @cache: a #GstBurstCache + * @format: (out) (allow-none): result format of @max and @soft_max + * @max: (out) (allow-none): result maximum lag for a reader + * @soft_max: (out) (allow-none): result maximum lag for a reader before + * recovery is performed + * @recover: (out) (allow-none): result #GstBurstCacheRecover + * + * Get the reader limits. See gst_burst_cache_set_limits(). + */ +void +gst_burst_cache_get_limits (GstBurstCache * cache, GstFormat * format, + gint64 * max, gint64 * soft_max, GstBurstCacheRecover * recover) +{ + g_return_if_fail (GST_IS_BURST_CACHE (cache)); + + if (format) + *format = cache->limit_format; + if (max) + *max = cache->limit_max; + if (soft_max) + *soft_max = cache->limit_soft_max; + if (recover) + *recover = cache->recover; +} + +/** + * gst_burst_cache_reader_destroy: + * @reader: a #GstBurstCacheReader + * + * Cleanup the memory of @reader. + */ +void +gst_burst_cache_reader_destroy (GstBurstCacheReader * reader) +{ + if (reader->reason) + g_error_free (reader->reason); + if (reader->notify) + reader->notify (reader->user_data); +} + +/** + * gst_burst_cache_reader_new: + * @cache: a #GstBurstCache + * @callback: a #GstBurstCacheReaderCallback + * @user_data: user data + * @notify: a #GDestroyNotify for @user_data + * + * Make a new #GstBurstCacheReader. When data becomes available for the reader, + * @callback will be called with @user_data. + * + * Returns: a new #GstBurstCacheReader. + */ +GstBurstCacheReader * +gst_burst_cache_reader_new (GstBurstCache * cache, + GstBurstCacheReaderCallback callback, gpointer user_data, + GDestroyNotify notify) +{ + GstBurstCacheReader *reader; + + reader = (GstBurstCacheReader *) g_hook_alloc (&cache->readers); + + reader->hook.data = reader; + reader->hook.destroy = (GDestroyNotify) gst_burst_cache_reader_destroy; + + reader->bufpos = -1; + reader->draincount = -1; + reader->new_reader = TRUE; + reader->discont = FALSE; + + reader->callback = callback; + reader->user_data = user_data; + reader->notify = notify; + + reader->reason = NULL; + + reader->start_method = GST_BURST_CACHE_START_LATEST; + + reader->bytes_sent = 0; + reader->dropped_buffers = 0; + reader->avg_queue_size = 0; + reader->first_buffer_ts = GST_CLOCK_TIME_NONE; + reader->last_buffer_ts = GST_CLOCK_TIME_NONE; + + /* update start time */ + reader->add_time = g_get_real_time (); + reader->remove_time = 0; + /* set last activity time to add time */ + reader->last_activity_time = reader->add_time; + + return reader; +} + +/** + * gst_burst_cache_reader_set_burst: + * @reader: a #GstBurstCacheReader + * @start_method: a #GstBurstCacheStart + * @min_format: format of @min_value + * @min_value: minimum burst amount + * @max_format: format of @max_value + * @max_value: maximum burst amount + * + * Set the burst parameters for @reader. @start_method defines where to position + * the reader in the cache. At least @min_value of data and at most @max_value + * of data will be sent to the new client. + * + * Returns: %TRUE on success. + */ +gboolean +gst_burst_cache_reader_set_burst (GstBurstCacheReader * reader, + GstBurstCacheStart start_method, GstFormat min_format, guint64 min_value, + GstFormat max_format, guint64 max_value) +{ + /* do limits check if we can */ + if (min_format == max_format) { + if (max_value != -1 && min_value != -1 && max_value < min_value) + return FALSE; + } + + reader->start_method = start_method; + reader->min_format = min_format; + reader->min_value = min_value; + reader->max_format = max_format; + reader->max_value = max_value; + + return TRUE; +} + +static gboolean +is_keyframe (GstBurstCache * cache, GstBuffer * buffer) +{ + if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) { + return FALSE; + } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) { + return TRUE; + } + return FALSE; +} + +/* find the keyframe in the list of buffers starting the + * search from @idx. @direction as -1 will search backwards, + * 1 will search forwards. + * Returns: the index or -1 if there is no keyframe after idx. + */ +static gint +find_keyframe (GstBurstCache * cache, gint idx, gint direction) +{ + gint i, len, result; + + /* take length of queued buffers */ + len = cache->bufqueue->len; + + /* assume we don't find a keyframe */ + result = -1; + + /* then loop over all buffers to find the first keyframe */ + for (i = idx; i >= 0 && i < len; i += direction) { + GstBuffer *buf; + + buf = g_array_index (cache->bufqueue, GstBuffer *, i); + if (is_keyframe (cache, buf)) { + GST_LOG_OBJECT (cache, "found keyframe at %d from %d, direction %d", + i, idx, direction); + result = i; + break; + } + } + return result; +} + +/* Get the number of buffers from the buffer queue needed to satisfy + * the maximum max in the configured units. + * If units are not BUFFERS, and there are insufficient buffers in the + * queue to satify the limit, return len(queue) + 1 */ +static gint +get_buffers_max (GstBurstCache * cache, GstFormat format, gint64 max) +{ + switch (format) { + case GST_FORMAT_BUFFERS: + return max; + case GST_FORMAT_TIME: + { + GstBuffer *buf; + int i; + int len; + gint64 diff; + GstClockTime first = GST_CLOCK_TIME_NONE; + + len = cache->bufqueue->len; + + for (i = 0; i < len; i++) { + buf = g_array_index (cache->bufqueue, GstBuffer *, i); + if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { + if (first == -1) + first = GST_BUFFER_TIMESTAMP (buf); + + diff = first - GST_BUFFER_TIMESTAMP (buf); + + if (diff > max) + return i + 1; + } + } + return len + 1; + } + case GST_FORMAT_BYTES: + { + GstBuffer *buf; + int i; + int len; + gint acc = 0; + + len = cache->bufqueue->len; + + for (i = 0; i < len; i++) { + buf = g_array_index (cache->bufqueue, GstBuffer *, i); + acc += gst_buffer_get_size (buf); + + if (acc > max) + return i + 1; + } + return len + 1; + } + default: + return max; + } +} + +/* find the positions in the buffer queue where *_min and *_max + * is satisfied + */ +/* count the amount of data in the buffers and return the index + * that satifies the given limits. + * + * Returns: index @idx in the buffer queue so that the given limits are + * satisfied. TRUE if all the limits could be satisfied, FALSE if not + * enough data was in the queue. + * + * FIXME, this code might now work if any of the units is in buffers... + */ +static gboolean +find_limits (GstBurstCache * cache, + gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min, + gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max) +{ + GstClockTime first, time; + gint i, len, bytes; + gboolean result, max_hit; + + /* take length of queue */ + len = cache->bufqueue->len; + + /* this must hold */ + g_assert (len > 0); + + GST_LOG_OBJECT (cache, + "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT + ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min, + buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max, + GST_TIME_ARGS (time_max)); + + /* do the trivial buffer limit test */ + if (buffers_min != -1 && len < buffers_min) { + *min_idx = len - 1; + *max_idx = len - 1; + return FALSE; + } + + result = FALSE; + /* else count bytes and time */ + first = -1; + bytes = 0; + /* unset limits */ + *min_idx = -1; + *max_idx = -1; + max_hit = FALSE; + + i = 0; + /* loop through the buffers, when a limit is ok, mark it + * as -1, we have at least one buffer in the queue. */ + do { + GstBuffer *buf; + + /* if we checked all min limits, update result */ + if (bytes_min == -1 && time_min == -1 && *min_idx == -1) { + /* don't go below 0 */ + *min_idx = MAX (i - 1, 0); + } + /* if we reached one max limit break out */ + if (max_hit) { + /* i > 0 when we get here, we subtract one to get the position + * of the previous buffer. */ + *max_idx = i - 1; + /* we have valid complete result if we found a min_idx too */ + result = *min_idx != -1; + break; + } + buf = g_array_index (cache->bufqueue, GstBuffer *, i); + + bytes += gst_buffer_get_size (buf); + + /* take timestamp and save for the base first timestamp */ + if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) { + GST_LOG_OBJECT (cache, "Ts %" GST_TIME_FORMAT " on buffer", + GST_TIME_ARGS (time)); + if (first == -1) + first = time; + + /* increase max usage if we did not fill enough. Note that + * buffers are sorted from new to old, so the first timestamp is + * bigger than the next one. */ + if (time_min != -1 && first - time >= time_min) + time_min = -1; + if (time_max != -1 && first - time >= time_max) + max_hit = TRUE; + } else { + GST_LOG_OBJECT (cache, "No timestamp on buffer"); + } + /* time is OK or unknown, check and increase if not enough bytes */ + if (bytes_min != -1) { + if (bytes >= bytes_min) + bytes_min = -1; + } + if (bytes_max != -1) { + if (bytes >= bytes_max) { + max_hit = TRUE; + } + } + i++; + } + while (i < len); + + /* if we did not hit the max or min limit, set to buffer size */ + if (*max_idx == -1) + *max_idx = len - 1; + /* make sure min does not exceed max */ + if (*min_idx == -1) + *min_idx = *max_idx; + + return result; +} + +/* parse the unit/value pair and assign it to the result value of the + * right type, leave the other values untouched + * + * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise. + */ +static gboolean +assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers, + GstClockTime * time) +{ + gboolean res = TRUE; + + /* set only the limit of the given format to the given value */ + switch (format) { + case GST_FORMAT_BUFFERS: + *buffers = (gint) value; + break; + case GST_FORMAT_TIME: + *time = value; + break; + case GST_FORMAT_BYTES: + *bytes = (gint) value; + break; + case GST_FORMAT_UNDEFINED: + default: + res = FALSE; + break; + } + return res; +} + +/* count the index in the buffer queue to satisfy the given unit + * and value pair starting from buffer at index 0. + * + * Returns: TRUE if there was enough data in the queue to satisfy the + * burst values. @idx contains the index in the buffer that contains enough + * data to satisfy the limits or the last buffer in the queue when the + * function returns FALSE. + */ +static gboolean +count_burst_unit (GstBurstCache * cache, gint * min_idx, + GstFormat min_format, guint64 min_value, gint * max_idx, + GstFormat max_format, guint64 max_value) +{ + gint bytes_min = -1, buffers_min = -1; + gint bytes_max = -1, buffers_max = -1; + GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE; + + assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min); + assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max); + + return find_limits (cache, min_idx, bytes_min, buffers_min, time_min, + max_idx, bytes_max, buffers_max, time_max); +} + +/* decide where in the current buffer queue this new reader should start + * receiving buffers from. + * This function is called whenever a reader is added and has not yet + * received a buffer. + */ +static void +handle_new_reader (GstBurstCache * cache, GstBurstCacheReader * reader) +{ + gint position; + + GST_DEBUG_OBJECT (cache, + "%s new reader, deciding where to start in queue", reader->debug); + GST_DEBUG_OBJECT (cache, "queue is currently %d buffers long", + cache->bufqueue->len); + + switch (reader->start_method) { + case GST_BURST_CACHE_START_LATEST: + /* no syncing, we are happy with whatever the reader is going to get */ + position = reader->bufpos; + GST_DEBUG_OBJECT (cache, + "%s BURST_CACHE_START_LATEST, position %d", reader->debug, position); + break; + case GST_BURST_CACHE_START_NEXT_KEYFRAME: + { + /* if one of the new buffers (between reader->bufpos and 0) in the queue + * is a key frame, we can proceed, otherwise we need to keep waiting */ + GST_LOG_OBJECT (cache, + "%s new reader, bufpos %d, waiting for keyframe", + reader->debug, reader->bufpos); + + position = find_prev_keyframe (cache, reader->bufpos); + if (position != -1) { + GST_DEBUG_OBJECT (cache, + "%s BURST_CACHE_START_NEXT_KEYFRAME: position %d", reader->debug, + position); + break; + } + + /* reader is not on a keyframe, need to skip these buffers and + * wait some more */ + GST_LOG_OBJECT (cache, + "%s new reader, skipping buffer(s), no keyframe found", + reader->debug); + reader->bufpos = -1; + break; + } + case GST_BURST_CACHE_START_LATEST_KEYFRAME: + { + GST_DEBUG_OBJECT (cache, "%s BURST_CACHE_START_LATEST_KEYFRAME", + reader->debug); + + /* for new readers we initially scan the complete buffer queue for + * a keyframe when a buffer is added. If we don't find a keyframe, + * we need to wait for the next keyframe and so we change the reader's + * start method to GST_BURST_CACHE_START_NEXT_KEYFRAME. + */ + position = find_next_keyframe (cache, 0); + if (position != -1) { + GST_DEBUG_OBJECT (cache, + "%s BURST_CACHE_START_LATEST_KEYFRAME: position %d", reader->debug, + position); + break; + } + + GST_DEBUG_OBJECT (cache, + "%s BURST_CACHE_START_LATEST_KEYFRAME: no keyframe found, " + "switching to BURST_CACHE_START_NEXT_KEYFRAME", reader->debug); + /* throw reader to the waiting state */ + reader->bufpos = -1; + /* and make reader sync to next keyframe */ + reader->start_method = GST_BURST_CACHE_START_NEXT_KEYFRAME; + break; + } + case GST_BURST_CACHE_START_BURST: + { + gboolean ok; + gint max; + + /* move to the position where we satisfy the reader's burst + * parameters. If we could not satisfy the parameters because there + * is not enough data, we just send what we have (which is in position). + * We use the max value to limit the search + */ + ok = count_burst_unit (cache, &position, reader->min_format, + reader->min_value, &max, reader->max_format, reader->max_value); + GST_DEBUG_OBJECT (cache, + "%s BURST_CACHE_START_BURST: burst_unit returned %d, position %d", + reader->debug, ok, position); + + GST_LOG_OBJECT (cache, "min %d, max %d", position, max); + + /* we hit the max and it is below the min, use that then */ + if (max != -1 && max <= position) { + position = MAX (max - 1, 0); + GST_DEBUG_OBJECT (cache, + "%s BURST_CACHE_START_BURST: position above max, taken down to %d", + reader->debug, position); + } + break; + } + case GST_BURST_CACHE_START_BURST_KEYFRAME: + { + gint min_idx, max_idx; + gint next_keyframe, prev_keyframe; + + /* BURST_KEYFRAME: + * + * _always_ start sending a keyframe to the reader. We first search + * a keyframe between min/max limits. If there is none, we send it the + * last keyframe before min. If there is none, the behaviour is like + * NEXT_KEYFRAME. + */ + /* gather burst limits */ + count_burst_unit (cache, &min_idx, reader->min_format, + reader->min_value, &max_idx, reader->max_format, reader->max_value); + + GST_LOG_OBJECT (cache, "min %d, max %d", min_idx, max_idx); + + /* first find a keyframe after min_idx */ + next_keyframe = find_next_keyframe (cache, min_idx); + if (next_keyframe != -1 && next_keyframe < max_idx) { + /* we have a valid keyframe and it's below the max */ + GST_LOG_OBJECT (cache, "found keyframe in min/max limits"); + position = next_keyframe; + break; + } + + /* no valid keyframe, try to find one below min */ + prev_keyframe = find_prev_keyframe (cache, min_idx); + if (prev_keyframe != -1) { + GST_WARNING_OBJECT (cache, + "using keyframe below min in BURST_KEYFRAME start mode"); + position = prev_keyframe; + break; + } + + /* no prev keyframe or not enough data */ + GST_WARNING_OBJECT (cache, + "no prev keyframe found in BURST_KEYFRAME start mode, waiting for next"); + + /* throw reader to the waiting state */ + reader->bufpos = -1; + /* and make reader sync to next keyframe */ + reader->start_method = GST_BURST_CACHE_START_NEXT_KEYFRAME; + position = -1; + break; + } + case GST_BURST_CACHE_START_BURST_WITH_KEYFRAME: + { + gint min_idx, max_idx; + gint next_keyframe; + + /* BURST_WITH_KEYFRAME: + * + * try to start sending a keyframe to the reader. We first search + * a keyframe between min/max limits. If there is none, we send it the + * amount of data up 'till min. + */ + /* gather enough data to burst */ + count_burst_unit (cache, &min_idx, reader->min_format, + reader->min_value, &max_idx, reader->max_format, reader->max_value); + + GST_LOG_OBJECT (cache, "min %d, max %d", min_idx, max_idx); + + /* first find a keyframe after min_idx */ + next_keyframe = find_next_keyframe (cache, min_idx); + if (next_keyframe != -1 && next_keyframe < max_idx) { + /* we have a valid keyframe and it's below the max */ + GST_LOG_OBJECT (cache, "found keyframe in min/max limits"); + position = next_keyframe; + break; + } + + /* no keyframe, send data from min_idx */ + GST_WARNING_OBJECT (cache, "using min in BURST_WITH_KEYFRAME start mode"); + + /* make sure we don't go over the max limit */ + if (max_idx != -1 && max_idx <= min_idx) { + position = MAX (max_idx - 1, 0); + } else { + position = min_idx; + } + + break; + } + default: + g_warning ("unknown start method %d", reader->start_method); + position = reader->bufpos; + break; + } + + if (position >= 0) { + /* we got a valid spot in the queue */ + reader->new_reader = FALSE; + reader->bufpos = position; + /* signal that the reader is ready */ + reader->callback (cache, reader, reader->user_data); + } +} + +/** + * gst_burst_cache_add_reader: + * @cache: a #GstBurstCache + * @reader: a #GstBurstCacheReader + * + * Add @reader to @cache. + * + * Returns: %TRUE when @reader could be added + */ +gboolean +gst_burst_cache_add_reader (GstBurstCache * cache, GstBurstCacheReader * reader) +{ + g_return_val_if_fail (GST_IS_BURST_CACHE (cache), FALSE); + g_return_val_if_fail (reader != NULL, FALSE); + g_return_val_if_fail (reader->new_reader, FALSE); + + /* do limits check if we can */ + if (reader->min_format == reader->max_format) { + if (reader->max_value != -1 && reader->min_value != -1 && + reader->max_value < reader->min_value) + goto wrong_limits; + } + + CACHE_LOCK (cache); + handle_new_reader (cache, reader); + /* we can add the handle now */ + g_hook_prepend (&cache->readers, (GHook *) reader); + cache->readers_cookie++; + CACHE_UNLOCK (cache); + + return TRUE; + + /* errors */ +wrong_limits: + { + GST_WARNING_OBJECT (cache, + "%s wrong values min =%" G_GUINT64_FORMAT ", max=%" + G_GUINT64_FORMAT ", unit %d specified when adding reader", + reader->debug, reader->min_value, reader->max_value, + reader->min_format); + return FALSE; + } +} + +/* should be called with the readerslock held. */ +static void +gst_burst_cache_remove_reader_link (GstBurstCache * cache, + GstBurstCacheReader * reader, gboolean remove, GError * reason) +{ + if (!G_HOOK_IS_VALID (reader)) + goto was_removing; + + GST_DEBUG_OBJECT (cache, "%s removing reader %p: (%s)", + reader->debug, reader, reason ? reason->message : "Unknown reason"); + + /* set reader to invalid position while being removed */ + reader->bufpos = -1; + reader->reason = reason; + reader->remove_time = g_get_real_time (); + + cache->readers_cookie++; + if (remove) + g_hook_destroy_link (&cache->readers, (GHook *) reader); + + return; + + /* ERRORS */ +was_removing: + { + GST_WARNING_OBJECT (cache, "%s reader is already being removed", + reader->debug); + if (reason) + g_error_free (reason); + return; + } +} + +/** + * gst_burst_cache_remove_reader: + * @cache: a #GstBurstCache + * @reader: a #GstBurstCacheReader + * @drain: drain flag + * + * Remove @reader from @cache. When @drain is %TRUE all remaining data + * in the cache will be sent to the reader before it is removed. + * + * Returns: %TRUE on success + */ +gboolean +gst_burst_cache_remove_reader (GstBurstCache * cache, + GstBurstCacheReader * reader, gboolean drain) +{ + g_return_val_if_fail (GST_IS_BURST_CACHE (cache), FALSE); + g_return_val_if_fail (reader != NULL, FALSE); + + GST_DEBUG_OBJECT (cache, "%s removing reader", reader->debug); + + CACHE_LOCK (cache); + if (!G_HOOK_IS_VALID (reader)) + goto not_valid; + + if (drain) { + if (reader->draincount == -1) { + /* take the position of the reader as the number of buffers left to drain. + * If the reader was at position -1, we drain 0 buffers, 0 == drain 1 + * buffer, etc... This will mark reader as draining. We can not remove the + * reader right away because it might have some buffers to drain in its + * queue. */ + reader->draincount = reader->bufpos + 1; + } else { + GST_INFO_OBJECT (cache, "%s Reader already draining", reader->debug); + } + } else { + gst_burst_cache_remove_reader_link (cache, reader, TRUE, + g_error_new (0, GST_BURST_CACHE_ERROR_NONE, "User requested remove")); + } + CACHE_UNLOCK (cache); + + return TRUE; + + /* ERRORS */ +not_valid: + { + GST_WARNING_OBJECT (cache, "reader %s not found!", reader->debug); + CACHE_UNLOCK (cache); + return FALSE; + } +} + +/** + * gst_burst_cache_error_reader: + * @cache: a #GstBurstCache + * @reader: a #GstBurstCacheReader + * @error: (transfer full): a #GError + * + * Remove @reader from @cache and set the reason to @error. Ownership is taken + * of @error. + * + * Returns: %TRUE on success + */ +gboolean +gst_burst_cache_error_reader (GstBurstCache * cache, + GstBurstCacheReader * reader, GError * error) +{ + g_return_val_if_fail (GST_IS_BURST_CACHE (cache), FALSE); + g_return_val_if_fail (reader != NULL, FALSE); + + GST_DEBUG_OBJECT (cache, "%s error reader", reader->debug); + + CACHE_LOCK (cache); + if (!G_HOOK_IS_VALID (reader)) + goto not_valid; + + if (error == NULL) + error = g_error_new (0, GST_BURST_CACHE_ERROR_ERROR, "Unknown error"); + + GST_WARNING_OBJECT (cache, "%s reader %p error, removing: %s", + reader->debug, reader, error->message); + + gst_burst_cache_remove_reader_link (cache, reader, TRUE, error); + CACHE_UNLOCK (cache); + + return TRUE; + + /* ERRORS */ +not_valid: + { + GST_WARNING_OBJECT (cache, "reader %s not found!", reader->debug); + CACHE_UNLOCK (cache); + return FALSE; + } +} + +static gboolean +remove_hook (GstBurstCacheReader * reader, GstBurstCache * cache) +{ + gst_burst_cache_remove_reader_link (cache, reader, FALSE, + g_error_new (0, GST_BURST_CACHE_ERROR_NONE, "User requested clear")); + + /* FALSE to remove */ + return FALSE; +} + +/** + * gst_burst_cache_clear_readers: + * @cache: a #GstBurstCache + * + * Remove all readers from @cache. + */ +void +gst_burst_cache_clear_readers (GstBurstCache * cache) +{ + g_return_if_fail (GST_IS_BURST_CACHE (cache)); + + GST_DEBUG_OBJECT (cache, "clearing all readers"); + + CACHE_LOCK (cache); + g_hook_list_marshal_check (&cache->readers, TRUE, (GHookCheckMarshaller) + remove_hook, cache); + CACHE_UNLOCK (cache); +} + +/* calculate the new position for a reader after recovery. This function + * does not update the reader position but merely returns the required + * position. + */ +static gint +gst_burst_cache_recover_reader (GstBurstCache * cache, + GstBurstCacheReader * reader) +{ + gint newbufpos; + + GST_WARNING_OBJECT (cache, + "%s reader %p is lagging at %d, recover using policy %d", + reader->debug, reader, reader->bufpos, cache->recover); + + switch (cache->recover) { + case GST_BURST_CACHE_RECOVER_NONE: + /* do nothing, reader will catch up or get kicked out when it reaches + * the hard max */ + newbufpos = reader->bufpos; + break; + case GST_BURST_CACHE_RECOVER_RESYNC_LATEST: + /* move to beginning of queue */ + newbufpos = -1; + break; + case GST_BURST_CACHE_RECOVER_RESYNC_SOFT_LIMIT: + /* move to beginning of soft max */ + newbufpos = + get_buffers_max (cache, cache->limit_format, cache->limit_soft_max); + break; + case GST_BURST_CACHE_RECOVER_RESYNC_KEYFRAME: + /* find keyframe in buffers, we search backwards to find the + * closest keyframe relative to what this reader already received. */ + newbufpos = MIN (cache->bufqueue->len - 1, + get_buffers_max (cache, cache->limit_format, + cache->limit_soft_max) - 1); + + while (newbufpos >= 0) { + GstBuffer *buf; + + buf = g_array_index (cache->bufqueue, GstBuffer *, newbufpos); + if (is_keyframe (cache, buf)) { + /* found a buffer that is not a delta unit */ + break; + } + newbufpos--; + } + break; + default: + /* unknown recovery procedure */ + newbufpos = + get_buffers_max (cache, cache->limit_format, cache->limit_soft_max); + break; + } + return newbufpos; +} + +typedef struct +{ + GstBurstCache *cache; + GstBurstCacheClass *klass; + GstClockTime now; + gint max_buffer_usage; + gint max_buffers; + gint soft_max_buffers; +} QueueHookData; + +static gboolean +queue_hook (GstBurstCacheReader * reader, QueueHookData * data) +{ + GstBurstCache *cache = data->cache; + + /* move reader forwards */ + reader->bufpos++; + + GST_LOG_OBJECT (cache, "%s reader %p at position %d", + reader->debug, reader, reader->bufpos); + + /* check soft max if needed, recover reader */ + if (data->soft_max_buffers > 0 && reader->bufpos >= data->soft_max_buffers) { + gint newpos; + + newpos = gst_burst_cache_recover_reader (cache, reader); + if (newpos != reader->bufpos) { + reader->dropped_buffers += reader->bufpos - newpos; + reader->bufpos = newpos; + reader->discont = TRUE; + GST_INFO_OBJECT (cache, "%s reader %p position reset to %d", + reader->debug, reader, reader->bufpos); + } else { + GST_INFO_OBJECT (cache, + "%s reader %p not recovering position", reader->debug, reader); + } + } + + /* check hard max */ + if (data->max_buffers > 0 && reader->bufpos >= data->max_buffers) + goto hit_limit; + + /* check timeout */ + if (reader->timeout > 0 && data->now - reader->last_activity_time > + reader->timeout) + goto timeout; + + if (reader->new_reader) { + handle_new_reader (cache, reader); + } else if (reader->bufpos == 0) { + /* reader changed from -1 to 0, we can send data to this reader now. */ + reader->callback (cache, reader, reader->user_data); + } + + /* keep track of maximum buffer usage */ + if (reader->bufpos > data->max_buffer_usage) { + data->max_buffer_usage = reader->bufpos; + } + + return TRUE; + + /* ERRORS */ +hit_limit: + { + GST_WARNING_OBJECT (cache, "%s reader %p is too slow, removing", + reader->debug, reader); + gst_burst_cache_remove_reader_link (cache, reader, FALSE, + g_error_new (0, GST_BURST_CACHE_ERROR_SLOW, "Reader is too slow")); + /* remove reader */ + return FALSE; + } +timeout: + { + GST_WARNING_OBJECT (cache, "%s reader %p timeout, removing", + reader->debug, reader); + gst_burst_cache_remove_reader_link (cache, reader, FALSE, + g_error_new (0, GST_BURST_CACHE_ERROR_SLOW, "Reader timed out")); + /* remove reader */ + return FALSE; + } +} + +/** + * gst_burst_cache_queue_buffer: + * @cache: a #GstBurstCache + * @buffer: a #GstBuffer + * + * Queue @buffer in @cache. Older and unused buffers will be removed from + * @cache. + */ +void +gst_burst_cache_queue_buffer (GstBurstCache * cache, GstBuffer * buffer) +{ + gint queuelen; + gint i; + QueueHookData data; + + g_return_if_fail (GST_IS_BURST_CACHE (cache)); + g_return_if_fail (buffer != NULL); + + data.now = g_get_real_time (); + + data.klass = GST_BURST_CACHE_GET_CLASS (cache); + + CACHE_LOCK (cache); + /* add buffer to queue */ + g_array_prepend_val (cache->bufqueue, buffer); + queuelen = cache->bufqueue->len; + + data.cache = cache; + + if (cache->limit_max > 0) + data.max_buffers = + get_buffers_max (cache, cache->limit_format, cache->limit_max); + else + data.max_buffers = -1; + + if (cache->limit_soft_max > 0) + data.soft_max_buffers = + get_buffers_max (cache, cache->limit_format, cache->limit_soft_max); + else + data.soft_max_buffers = -1; + + GST_LOG_OBJECT (cache, "Using max %d, softmax %d", data.max_buffers, + data.soft_max_buffers); + + /* After adding the buffer, we update all reader positions in the queue. If + * a reader moves over the soft max, we start the recovery procedure for this + * slow reader. If it goes over the hard max, it is put into the slow list + * and removed. */ + data.max_buffer_usage = 0; + + g_hook_list_marshal_check (&cache->readers, TRUE, (GHookCheckMarshaller) + queue_hook, &data); + + /* make sure we respect bytes-min, buffers-min and time-min when they are set */ + { + gint usage, max; + + GST_LOG_OBJECT (cache, + "extending queue %d to respect time_min %" GST_TIME_FORMAT + ", bytes_min %d, buffers_min %d", data.max_buffer_usage, + GST_TIME_ARGS (cache->time_min), cache->bytes_min, cache->buffers_min); + + /* get index where the limits are ok, we don't really care if all limits + * are ok, we just queue as much as we need. We also don't compare against + * the max limits. */ + find_limits (cache, &usage, cache->bytes_min, cache->buffers_min, + cache->time_min, &max, -1, -1, -1); + + data.max_buffer_usage = MAX (data.max_buffer_usage, usage + 1); + GST_LOG_OBJECT (cache, "extended queue to %d", data.max_buffer_usage); + } + + /* now look for start points and make sure there is at least one + * keyframe point in the queue. */ + { + /* no point in searching beyond the queue length */ + gint limit = queuelen; + + /* no point in searching beyond the soft-max if any. */ + if (data.soft_max_buffers > 0) { + limit = MIN (limit, data.soft_max_buffers); + } + GST_LOG_OBJECT (cache, + "extending queue to include start point, now at %d, limit is %d", + data.max_buffer_usage, limit); + + for (i = 0; i < limit; i++) { + GstBuffer *buf; + + buf = g_array_index (cache->bufqueue, GstBuffer *, i); + if (is_keyframe (cache, buf)) { + /* found a sync frame, now extend the buffer usage to + * include at least this frame. */ + data.max_buffer_usage = MAX (data.max_buffer_usage, i); + break; + } + } + GST_LOG_OBJECT (cache, "max buffer usage is now %d", data.max_buffer_usage); + } + + GST_LOG_OBJECT (cache, "len %d, usage %d", queuelen, data.max_buffer_usage); + + /* nobody is referencing units after max_buffer_usage so we can + * remove them from the queue. We remove them in reverse order as + * this is the most optimal for GArray. */ + for (i = queuelen - 1; i > data.max_buffer_usage; i--) { + GstBuffer *old; + + /* queue exceeded max size */ + queuelen--; + old = g_array_index (cache->bufqueue, GstBuffer *, i); + cache->bufqueue = g_array_remove_index (cache->bufqueue, i); + + /* unref tail buffer */ + gst_buffer_unref (old); + } + /* save for stats */ + cache->buffers_queued = data.max_buffer_usage; + CACHE_UNLOCK (cache); +} + +/** + * gst_burst_cache_get_buffer: + * @cache: a #GstBurstCache + * @reader: a #GstBurstCacheReader + * @buffer: a #GstBuffer + * + * Get the next buffer for @reader in @cache. + * + * Returns: #GST_BURST_CACHE_RESULT_OK when a buffer is available. + * #GST_BURST_CACHE_RESULT_WAIT is returned when no buffers are available, the + * caller should wait for the callback signal before attempting to get a + * buffer again. #GST_BURST_CACHE_RESULT_EOS is returned when the client has + * received all buffers and is ready to be removed. + */ +GstBurstCacheResult +gst_burst_cache_get_buffer (GstBurstCache * cache, GstBurstCacheReader * reader, + GstBuffer ** buffer) +{ + GstBuffer *buf; + GstClockTime timestamp; + + g_return_val_if_fail (GST_IS_BURST_CACHE (cache), + GST_BURST_CACHE_RESULT_ERROR); + g_return_val_if_fail (reader != NULL, GST_BURST_CACHE_RESULT_ERROR); + g_return_val_if_fail (buffer != NULL, GST_BURST_CACHE_RESULT_ERROR); + + CACHE_LOCK (cache); + if (reader->bufpos == -1) + goto no_data_yet; + + /* we drained all remaining buffers, no need to get a new one */ + if (reader->draincount == 0) + goto drained; + + /* grab buffer */ + buf = g_array_index (cache->bufqueue, GstBuffer *, reader->bufpos); + reader->bufpos--; + + /* update stats */ + timestamp = GST_BUFFER_TIMESTAMP (buf); + if (reader->first_buffer_ts == GST_CLOCK_TIME_NONE) + reader->first_buffer_ts = timestamp; + if (timestamp != -1) + reader->last_buffer_ts = timestamp; + + /* decrease draincount */ + if (reader->draincount != -1) + reader->draincount--; + + GST_LOG_OBJECT (cache, "%s reader %p at position %d", + reader->debug, reader, reader->bufpos); + + *buffer = gst_buffer_ref (buf); + CACHE_UNLOCK (cache); + + return GST_BURST_CACHE_RESULT_OK; + + /* ERRORS */ +no_data_yet: + { + GST_DEBUG_OBJECT (cache, "%s no data available", reader->debug); + CACHE_UNLOCK (cache); + return GST_BURST_CACHE_RESULT_WAIT; + } +drained: + { + GST_DEBUG_OBJECT (cache, "%s drained", reader->debug); + CACHE_UNLOCK (cache); + return GST_BURST_CACHE_RESULT_EOS; + } +} diff --git a/gst/tcp/gstburstcache.h b/gst/tcp/gstburstcache.h new file mode 100644 index 000000000..7c2d85bd3 --- /dev/null +++ b/gst/tcp/gstburstcache.h @@ -0,0 +1,301 @@ +/* GStreamer + * Copyright (C) <2012> Wim Taymans <wim.taymans@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_BURST_CACHE_H__ +#define __GST_BURST_CACHE_H__ + +#include <gst/gst.h> + +G_BEGIN_DECLS + +#define GST_TYPE_BURST_CACHE \ + (gst_burst_cache_get_type()) +#define GST_BURST_CACHE(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_BURST_CACHE,GstBurstCache)) +#define GST_BURST_CACHE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_BURST_CACHE,GstBurstCacheClass)) +#define GST_IS_BURST_CACHE(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_BURST_CACHE)) +#define GST_IS_BURST_CACHE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BURST_CACHE)) +#define GST_BURST_CACHE_GET_CLASS(klass) \ + (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_BURST_CACHE, GstBurstCacheClass)) + +typedef struct _GstBurstCache GstBurstCache; +typedef struct _GstBurstCacheClass GstBurstCacheClass; +typedef struct _GstBurstCacheReader GstBurstCacheReader; + +/** + * GstBurstCacheRecover: + * @GST_BURST_CACHE_RECOVER_NONE : no recovering is done + * @GST_BURST_CACHE_RECOVER_RESYNC_LATEST : reader is moved to last buffer + * @GST_BURST_CACHE_RECOVER_RESYNC_SOFT_LIMIT: reader is moved to the soft limit + * @GST_BURST_CACHE_RECOVER_RESYNC_KEYFRAME : reader is moved to latest keyframe + * + * Possible values for the recovery procedure to use when a reader consumes + * data too slowly and has a backlog of more that soft-limit buffers. + */ +typedef enum +{ + GST_BURST_CACHE_RECOVER_NONE, + GST_BURST_CACHE_RECOVER_RESYNC_LATEST, + GST_BURST_CACHE_RECOVER_RESYNC_SOFT_LIMIT, + GST_BURST_CACHE_RECOVER_RESYNC_KEYFRAME +} GstBurstCacheRecover; + +/** + * GstBurstCacheStart: + * @GST_BURST_CACHE_START_LATEST : reader receives most recent buffer + * @GST_BURST_CACHE_START_NEXT_KEYFRAME : reader receives next keyframe + * @GST_BURST_CACHE_START_LATEST_KEYFRAME : reader receives latest keyframe (burst) + * @GST_BURST_CACHE_START_BURST : reader receives specific amount of data + * @GST_BURST_CACHE_START_BURST_KEYFRAME : reader receives specific amount of data + * starting from latest keyframe + * @GST_BURST_CACHE_START_BURST_WITH_KEYFRAME : reader receives specific amount of data from + * a keyframe, or if there is not enough data after + * the keyframe, starting before the keyframe + * + * This enum defines the selection of the first buffer that is sent + * to a new reader. + */ +typedef enum +{ + GST_BURST_CACHE_START_LATEST, + GST_BURST_CACHE_START_NEXT_KEYFRAME, + GST_BURST_CACHE_START_LATEST_KEYFRAME, + GST_BURST_CACHE_START_BURST, + GST_BURST_CACHE_START_BURST_KEYFRAME, + GST_BURST_CACHE_START_BURST_WITH_KEYFRAME +} GstBurstCacheStart; + +/** + * GstBurstCacheError: + * @GST_BURST_CACHE_ERROR_NONE : No error + * @GST_BURST_CACHE_ERROR_SLOW : reader is too slow + * @GST_BURST_CACHE_ERROR_ERROR : reader is in error + * @GST_BURST_CACHE_ERROR_DUPLICATE: same reader added twice + * + * Error codes used in the reason GError. + */ +typedef enum +{ + GST_BURST_CACHE_ERROR_NONE = 0, + GST_BURST_CACHE_ERROR_SLOW = 1, + GST_BURST_CACHE_ERROR_ERROR = 2, + GST_BURST_CACHE_ERROR_DUPLICATE = 3, +} GstBurstCacheError; + +/** + * GstBurstCacheResult: + * @GST_BURST_CACHE_RESULT_ERROR : An error occured + * @GST_BURST_CACHE_RESULT_OK : No error + * @GST_BURST_CACHE_RESULT_WAIT : Wait for more buffers + * @GST_BURST_CACHE_RESULT_EOS : No more buffers + * + * Error codes used in the reason GError. + */ +typedef enum +{ + GST_BURST_CACHE_RESULT_ERROR = 0, + GST_BURST_CACHE_RESULT_OK = 1, + GST_BURST_CACHE_RESULT_WAIT = 2, + GST_BURST_CACHE_RESULT_EOS = 3, +} GstBurstCacheResult; + +/** + * GstBurstCacheReaderCallback: + * @cache: a #GstBurstCache + * @reader: a #GstBurstCacheReader + * @user_data: user data given when creating @reader + * + * Called when @reader in @cache has data. You can get the new data with + * gst_burst_cache_get_buffer() from this callback or any other thread. + */ +typedef void (*GstBurstCacheReaderCallback) (GstBurstCache *cache, + GstBurstCacheReader *reader, + gpointer user_data); + +/** + * GstBurstCacheReader: + * @object: parent miniobject + * @bufpos: position of this reader in the global queue + * @draincount: the remaining number of buffers to drain or -1 if the + * reader is not draining. + * @new_reader: this is a new reader + * @discont: is the next buffer was discont + * @reason: the reason why the reader is being removed + * + * structure for a reader + */ +struct _GstBurstCacheReader { + GHook hook; + + gint bufpos; + gint draincount; + + GstBurstCacheReaderCallback callback; + gpointer user_data; + GDestroyNotify notify; + + gboolean new_reader; + gboolean discont; + + GError *reason; + + /* method to sync reader when connecting */ + GstBurstCacheStart start_method; + GstFormat min_format; + guint64 min_value; + GstFormat max_format; + guint64 max_value; + + /* stats */ + guint64 bytes_sent; + guint64 dropped_buffers; + guint64 avg_queue_size; + guint64 first_buffer_ts; + guint64 last_buffer_ts; + + guint64 add_time; + guint64 remove_time; + guint64 last_activity_time; + guint64 timeout; + + gchar debug[30]; /* a debug string used in debug calls to + identify the reader */ +}; + +/** + * GstBurstCache: + * @parent: parent GObject + * @lock: lock to protect @readers + * @bufqueue: global queue of buffers + * @readers: list of readers we are serving + * @readers_cookie: Cookie to detect changes to @readers + * @limit_format: the format of @limit_max and @@limit_soft_max + * @limit_max: max units to queue for a reader + * @limit_soft_max: max units a reader can lag before recovery starts + * @recover: how to recover a lagging reader + * @bytes_min: min number of bytes to queue + * @time_min: min time to queue + * @buffers_min: min number of buffers to queue + * @bytes_to_serve: how much bytes we must serve + * @bytes_served: how much bytes have we served + * @bytes_queued: number of queued bytes + * @time_queued: amount of queued time + * @buffers_queued: number of queued buffers + */ +struct _GstBurstCache { + GObject parent; + + /*< private >*/ + GRecMutex lock; + GArray *bufqueue; + /* the readers */ + GHookList readers; + guint readers_cookie; + + /* these values are used to check if a reader is reading fast + * enough and to control recovery */ + GstFormat limit_format; + gint64 limit_max; + gint64 limit_soft_max; + GstBurstCacheRecover recover; + + /* these values are used to control the amount of data + * kept in the queues. It allows readers to perform a burst + * on connect. */ + gint bytes_min; + gint64 time_min; + gint buffers_min; + + /* stats */ + gint bytes_queued; + gint64 time_queued; + gint buffers_queued; +}; + +/** + * GstBurstCacheClass: + * @parent_class: parent GObjectClass + * @reader_ready: called when a reader has a new buffer available + * + * The GstBurstCache class structure. + */ +struct _GstBurstCacheClass { + GObjectClass parent_class; +}; + +GType gst_burst_cache_get_type (void); +GType gst_burst_cache_reader_get_type (void); + +GstBurstCache * gst_burst_cache_new (guint reader_size); + +void gst_burst_cache_set_min_amount (GstBurstCache *cache, + gint bytes_min, + gint64 time_min, + gint buffers_min); +void gst_burst_cache_get_min_amount (GstBurstCache *cache, + gint *bytes_min, + gint64 *time_min, + gint *buffers_min); + +void gst_burst_cache_set_limits (GstBurstCache *cache, + GstFormat format, + gint64 max, + gint64 soft_max, + GstBurstCacheRecover recover); +void gst_burst_cache_get_limits (GstBurstCache *cache, + GstFormat *format, + gint64 *max, + gint64 *soft_max, + GstBurstCacheRecover *recover); + +void gst_burst_cache_queue_buffer (GstBurstCache *cache, + GstBuffer *buffer); + +GstBurstCacheReader * gst_burst_cache_reader_new (GstBurstCache *cache, + GstBurstCacheReaderCallback callback, + gpointer user_data, + GDestroyNotify notify); +gboolean gst_burst_cache_reader_set_burst (GstBurstCacheReader *reader, + GstBurstCacheStart start_method, + GstFormat min_format, guint64 min_value, + GstFormat max_format, guint64 max_value); +void gst_burst_cache_reader_destroy (GstBurstCacheReader *reader); + +gboolean gst_burst_cache_add_reader (GstBurstCache *cache, + GstBurstCacheReader *reader); +gboolean gst_burst_cache_remove_reader (GstBurstCache *cache, + GstBurstCacheReader *reader, + gboolean flush); +gboolean gst_burst_cache_error_reader (GstBurstCache *cache, + GstBurstCacheReader *reader, + GError *error); + +void gst_burst_cache_clear_readers (GstBurstCache * cache); + + +GstBurstCacheResult gst_burst_cache_get_buffer (GstBurstCache *cache, + GstBurstCacheReader *reader, + GstBuffer **buffer); + +G_END_DECLS + +#endif /* __GST_BURST_CACHE_H__ */ |