diff options
author | Tim-Philipp Müller <tim@centricular.com> | 2017-12-02 15:12:25 +0000 |
---|---|---|
committer | Tim-Philipp Müller <tim@centricular.com> | 2017-12-02 15:27:22 +0000 |
commit | 39664da1f07bda3032ea867fc695450b689b9d8f (patch) | |
tree | 0011b1eeff5ebc45b8743f786089887ecdd9796f | |
parent | 9d812d7619e917b123e506a5f7da24e2dc4dc064 (diff) | |
parent | 79d0239e2fe92ea39a6abc0ecdccc92034a64ce6 (diff) |
Move GstAggregator from -bad to core
Merge branch 'aggregator-move'
https://bugzilla.gnome.org/show_bug.cgi?id=739010
-rw-r--r-- | libs/gst/base/gstaggregator.c | 3022 | ||||
-rw-r--r-- | libs/gst/base/gstaggregator.h | 336 | ||||
-rw-r--r-- | tests/check/libs/aggregator.c | 1276 |
3 files changed, 4634 insertions, 0 deletions
diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c new file mode 100644 index 000000000..9dddf837c --- /dev/null +++ b/libs/gst/base/gstaggregator.c @@ -0,0 +1,3022 @@ +/* GStreamer aggregator base class + * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com> + * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org> + * + * gstaggregator.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +/** + * SECTION: gstaggregator + * @title: GstAggregator + * @short_description: manages a set of pads with the purpose of + * aggregating their buffers. + * @see_also: gstcollectpads for historical reasons. + * + * Manages a set of pads with the purpose of aggregating their buffers. + * Control is given to the subclass when all pads have data. + * + * * Base class for mixers and muxers. Subclasses should at least implement + * the #GstAggregatorClass.aggregate() virtual method. + * + * * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a + * #GstPadQueryFunction to queue all serialized data packets per sink pad. + * Subclasses should not overwrite those, but instead implement + * #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as + * needed. + * + * * When data is queued on all pads, the aggregate vmethod is called. + * + * * One can peek at the data on any given GstAggregatorPad with the + * gst_aggregator_pad_get_buffer () method, and take ownership of it + * with the gst_aggregator_pad_steal_buffer () method. When a buffer + * has been taken with steal_buffer (), a new buffer can be queued + * on that pad. + * + * * If the subclass wishes to push a buffer downstream in its aggregate + * implementation, it should do so through the + * gst_aggregator_finish_buffer () method. This method will take care + * of sending and ordering mandatory events such as stream start, caps + * and segment. + * + * * Same goes for EOS events, which should not be pushed directly by the + * subclass, it should instead return GST_FLOW_EOS in its aggregate + * implementation. + * + * * Note that the aggregator logic regarding gap event handling is to turn + * these into gap buffers with matching PTS and duration. It will also + * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE + * to ease their identification and subsequent processing. + * + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <string.h> /* strlen */ + +#include "gstaggregator.h" + +typedef enum +{ + GST_AGGREGATOR_START_TIME_SELECTION_ZERO, + GST_AGGREGATOR_START_TIME_SELECTION_FIRST, + GST_AGGREGATOR_START_TIME_SELECTION_SET +} GstAggregatorStartTimeSelection; + +static GType +gst_aggregator_start_time_selection_get_type (void) +{ + static GType gtype = 0; + + if (gtype == 0) { + static const GEnumValue values[] = { + {GST_AGGREGATOR_START_TIME_SELECTION_ZERO, + "Start at 0 running time (default)", "zero"}, + {GST_AGGREGATOR_START_TIME_SELECTION_FIRST, + "Start at first observed input running time", "first"}, + {GST_AGGREGATOR_START_TIME_SELECTION_SET, + "Set start time with start-time property", "set"}, + {0, NULL, NULL} + }; + + gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values); + } + return gtype; +} + +/* Might become API */ +static void gst_aggregator_merge_tags (GstAggregator * aggregator, + const GstTagList * tags, GstTagMergeMode mode); +static void gst_aggregator_set_latency_property (GstAggregator * agg, + GstClockTime latency); +static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); + +static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); + +GST_DEBUG_CATEGORY_STATIC (aggregator_debug); +#define GST_CAT_DEFAULT aggregator_debug + +/* Locking order, locks in this element must always be taken in this order + * + * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad) + * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad) + * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad) + * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST + * standard element object lock -> GST_OBJECT_LOCK(agg) + * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad) + * standard src pad object lock -> GST_OBJECT_LOCK(srcpad) + * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad) + */ + +/* GstAggregatorPad definitions */ +#define PAD_LOCK(pad) G_STMT_START { \ + GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \ + g_thread_self()); \ + g_mutex_lock(&pad->priv->lock); \ + GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define PAD_UNLOCK(pad) G_STMT_START { \ + GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p", \ + g_thread_self()); \ + g_mutex_unlock(&pad->priv->lock); \ + GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p", \ + g_thread_self()); \ + } G_STMT_END + + +#define PAD_WAIT_EVENT(pad) G_STMT_START { \ + GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p", \ + g_thread_self()); \ + g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond), \ + (&((GstAggregatorPad*)pad)->priv->lock)); \ + GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define PAD_BROADCAST_EVENT(pad) G_STMT_START { \ + GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p", \ + g_thread_self()); \ + g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \ + } G_STMT_END + + +#define PAD_FLUSH_LOCK(pad) G_STMT_START { \ + GST_TRACE_OBJECT (pad, "Taking lock from thread %p", \ + g_thread_self()); \ + g_mutex_lock(&pad->priv->flush_lock); \ + GST_TRACE_OBJECT (pad, "Took lock from thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define PAD_FLUSH_UNLOCK(pad) G_STMT_START { \ + GST_TRACE_OBJECT (pad, "Releasing lock from thread %p", \ + g_thread_self()); \ + g_mutex_unlock(&pad->priv->flush_lock); \ + GST_TRACE_OBJECT (pad, "Release lock from thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define SRC_LOCK(self) G_STMT_START { \ + GST_TRACE_OBJECT (self, "Taking src lock from thread %p", \ + g_thread_self()); \ + g_mutex_lock(&self->priv->src_lock); \ + GST_TRACE_OBJECT (self, "Took src lock from thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define SRC_UNLOCK(self) G_STMT_START { \ + GST_TRACE_OBJECT (self, "Releasing src lock from thread %p", \ + g_thread_self()); \ + g_mutex_unlock(&self->priv->src_lock); \ + GST_TRACE_OBJECT (self, "Released src lock from thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define SRC_WAIT(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Waiting for src on thread %p", \ + g_thread_self()); \ + g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock)); \ + GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define SRC_BROADCAST(self) G_STMT_START { \ + GST_LOG_OBJECT (self, "Signaling src from thread %p", \ + g_thread_self()); \ + if (self->priv->aggregate_id) \ + gst_clock_id_unschedule (self->priv->aggregate_id); \ + g_cond_broadcast(&(self->priv->src_cond)); \ + } G_STMT_END + +struct _GstAggregatorPadPrivate +{ + /* Following fields are protected by the PAD_LOCK */ + GstFlowReturn flow_return; + gboolean pending_flush_start; + gboolean pending_flush_stop; + + gboolean first_buffer; + + GQueue data; /* buffers, events and queries */ + GstBuffer *clipped_buffer; + guint num_buffers; + + /* used to track fill state of queues, only used with live-src and when + * latency property is set to > 0 */ + GstClockTime head_position; + GstClockTime tail_position; + GstClockTime head_time; /* running time */ + GstClockTime tail_time; + GstClockTime time_level; /* how much head is ahead of tail */ + GstSegment head_segment; /* segment before the queue */ + + gboolean negotiated; + + gboolean eos; + + GMutex lock; + GCond event_cond; + /* This lock prevents a flush start processing happening while + * the chain function is also happening. + */ + GMutex flush_lock; +}; + +/* Must be called with PAD_LOCK held */ +static void +gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad) +{ + aggpad->priv->eos = FALSE; + aggpad->priv->flow_return = GST_FLOW_OK; + GST_OBJECT_LOCK (aggpad); + gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED); + GST_OBJECT_UNLOCK (aggpad); + aggpad->priv->head_position = GST_CLOCK_TIME_NONE; + aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; + aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + aggpad->priv->tail_time = GST_CLOCK_TIME_NONE; + aggpad->priv->time_level = 0; + aggpad->priv->first_buffer = TRUE; +} + +static gboolean +gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) +{ + GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); + + PAD_LOCK (aggpad); + gst_aggregator_pad_reset_unlocked (aggpad); + PAD_UNLOCK (aggpad); + + if (klass->flush) + return klass->flush (aggpad, agg); + + return TRUE; +} + +/************************************* + * GstAggregator implementation * + *************************************/ +static GstElementClass *aggregator_parent_class = NULL; + +/* All members are protected by the object lock unless otherwise noted */ + +struct _GstAggregatorPrivate +{ + gint max_padserial; + + /* Our state is >= PAUSED */ + gboolean running; /* protected by src_lock */ + + /* seqnum from seek or segment, + * to be applied to synthetic segment/eos events */ + gint seqnum; + gboolean send_stream_start; /* protected by srcpad stream lock */ + gboolean send_segment; + gboolean flush_seeking; + gboolean pending_flush_start; + gboolean send_eos; /* protected by srcpad stream lock */ + + GstCaps *srccaps; /* protected by the srcpad stream lock */ + + GstTagList *tags; + gboolean tags_changed; + + gboolean peer_latency_live; /* protected by src_lock */ + GstClockTime peer_latency_min; /* protected by src_lock */ + GstClockTime peer_latency_max; /* protected by src_lock */ + gboolean has_peer_latency; /* protected by src_lock */ + + GstClockTime sub_latency_min; /* protected by src_lock */ + GstClockTime sub_latency_max; /* protected by src_lock */ + + /* aggregate */ + GstClockID aggregate_id; /* protected by src_lock */ + GMutex src_lock; + GCond src_cond; + + gboolean first_buffer; /* protected by object lock */ + GstAggregatorStartTimeSelection start_time_selection; + GstClockTime start_time; + + /* protected by the object lock */ + GstQuery *allocation_query; + GstAllocator *allocator; + GstBufferPool *pool; + GstAllocationParams allocation_params; + + /* properties */ + gint64 latency; /* protected by both src_lock and all pad locks */ +}; + +/* Seek event forwarding helper */ +typedef struct +{ + /* parameters */ + GstEvent *event; + gboolean flush; + gboolean only_to_active_pads; + + /* results */ + gboolean result; + gboolean one_actually_seeked; +} EventData; + +#define DEFAULT_LATENCY 0 +#define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO +#define DEFAULT_START_TIME (-1) + +enum +{ + PROP_0, + PROP_LATENCY, + PROP_START_TIME_SELECTION, + PROP_START_TIME, + PROP_LAST +}; + +static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, + GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); + +static gboolean +gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) +{ + return (g_queue_peek_tail (&pad->priv->data) == NULL && + pad->priv->clipped_buffer == NULL); +} + +static gboolean +gst_aggregator_check_pads_ready (GstAggregator * self) +{ + GstAggregatorPad *pad; + GList *l, *sinkpads; + gboolean have_buffer = TRUE; + gboolean have_event_or_query = FALSE; + + GST_LOG_OBJECT (self, "checking pads"); + + GST_OBJECT_LOCK (self); + + sinkpads = GST_ELEMENT_CAST (self)->sinkpads; + if (sinkpads == NULL) + goto no_sinkpads; + + for (l = sinkpads; l != NULL; l = l->next) { + pad = l->data; + + PAD_LOCK (pad); + + if (pad->priv->num_buffers == 0) { + if (!gst_aggregator_pad_queue_is_empty (pad)) + have_event_or_query = TRUE; + if (!pad->priv->eos) { + have_buffer = FALSE; + + /* If not live we need data on all pads, so leave the loop */ + if (!self->priv->peer_latency_live) { + PAD_UNLOCK (pad); + goto pad_not_ready; + } + } + } else if (self->priv->peer_latency_live) { + /* In live mode, having a single pad with buffers is enough to + * generate a start time from it. In non-live mode all pads need + * to have a buffer + */ + self->priv->first_buffer = FALSE; + } + + PAD_UNLOCK (pad); + } + + if (!have_buffer && !have_event_or_query) + goto pad_not_ready; + + if (have_buffer) + self->priv->first_buffer = FALSE; + + GST_OBJECT_UNLOCK (self); + GST_LOG_OBJECT (self, "pads are ready"); + return TRUE; + +no_sinkpads: + { + GST_LOG_OBJECT (self, "pads not ready: no sink pads"); + GST_OBJECT_UNLOCK (self); + return FALSE; + } +pad_not_ready: + { + if (have_event_or_query) + GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet," + " but waking up for serialized event"); + else + GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet"); + GST_OBJECT_UNLOCK (self); + return have_event_or_query; + } +} + +static void +gst_aggregator_reset_flow_values (GstAggregator * self) +{ + GST_OBJECT_LOCK (self); + self->priv->send_stream_start = TRUE; + self->priv->send_segment = TRUE; + gst_segment_init (&self->segment, GST_FORMAT_TIME); + self->priv->first_buffer = TRUE; + GST_OBJECT_UNLOCK (self); +} + +static inline void +gst_aggregator_push_mandatory_events (GstAggregator * self) +{ + GstAggregatorPrivate *priv = self->priv; + GstEvent *segment = NULL; + GstEvent *tags = NULL; + + if (self->priv->send_stream_start) { + gchar s_id[32]; + + GST_INFO_OBJECT (self, "pushing stream start"); + /* stream-start (FIXME: create id based on input ids) */ + g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ()); + if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) { + GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed"); + } + self->priv->send_stream_start = FALSE; + } + + if (self->priv->srccaps) { + + GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT, + self->priv->srccaps); + if (!gst_pad_push_event (self->srcpad, + gst_event_new_caps (self->priv->srccaps))) { + GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed"); + } + gst_caps_unref (self->priv->srccaps); + self->priv->srccaps = NULL; + } + + GST_OBJECT_LOCK (self); + if (self->priv->send_segment && !self->priv->flush_seeking) { + segment = gst_event_new_segment (&self->segment); + + if (!self->priv->seqnum) + /* This code-path is in preparation to be able to run without a source + * connected. Then we won't have a seq-num from a segment event. */ + self->priv->seqnum = gst_event_get_seqnum (segment); + else + gst_event_set_seqnum (segment, self->priv->seqnum); + self->priv->send_segment = FALSE; + + GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment); + } + + if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) { + tags = gst_event_new_tag (gst_tag_list_ref (priv->tags)); + priv->tags_changed = FALSE; + } + GST_OBJECT_UNLOCK (self); + + if (segment) + gst_pad_push_event (self->srcpad, segment); + if (tags) + gst_pad_push_event (self->srcpad, tags); + +} + +/** + * gst_aggregator_set_src_caps: + * @self: The #GstAggregator + * @caps: The #GstCaps to set on the src pad. + * + * Sets the caps to be used on the src pad. + */ +void +gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps) +{ + GST_PAD_STREAM_LOCK (self->srcpad); + gst_caps_replace (&self->priv->srccaps, caps); + gst_aggregator_push_mandatory_events (self); + GST_PAD_STREAM_UNLOCK (self->srcpad); +} + +/** + * gst_aggregator_finish_buffer: + * @self: The #GstAggregator + * @buffer: (transfer full): the #GstBuffer to push. + * + * This method will push the provided output buffer downstream. If needed, + * mandatory events such as stream-start, caps, and segment events will be + * sent before pushing the buffer. + */ +GstFlowReturn +gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) +{ + gst_aggregator_push_mandatory_events (self); + + GST_OBJECT_LOCK (self); + if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) { + GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer); + GST_OBJECT_UNLOCK (self); + return gst_pad_push (self->srcpad, buffer); + } else { + GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)", + self->priv->flush_seeking, gst_pad_is_active (self->srcpad)); + GST_OBJECT_UNLOCK (self); + gst_buffer_unref (buffer); + return GST_FLOW_OK; + } +} + +static void +gst_aggregator_push_eos (GstAggregator * self) +{ + GstEvent *event; + gst_aggregator_push_mandatory_events (self); + + event = gst_event_new_eos (); + + GST_OBJECT_LOCK (self); + self->priv->send_eos = FALSE; + gst_event_set_seqnum (event, self->priv->seqnum); + GST_OBJECT_UNLOCK (self); + + gst_pad_push_event (self->srcpad, event); +} + +static GstClockTime +gst_aggregator_get_next_time (GstAggregator * self) +{ + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); + + if (klass->get_next_time) + return klass->get_next_time (self); + + return GST_CLOCK_TIME_NONE; +} + +static gboolean +gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) +{ + GstClockTime latency; + GstClockTime start; + gboolean res; + + *timeout = FALSE; + + SRC_LOCK (self); + + latency = gst_aggregator_get_latency_unlocked (self); + + if (gst_aggregator_check_pads_ready (self)) { + GST_DEBUG_OBJECT (self, "all pads have data"); + SRC_UNLOCK (self); + + return TRUE; + } + + /* Before waiting, check if we're actually still running */ + if (!self->priv->running || !self->priv->send_eos) { + SRC_UNLOCK (self); + + return FALSE; + } + + start = gst_aggregator_get_next_time (self); + + /* If we're not live, or if we use the running time + * of the first buffer as start time, we wait until + * all pads have buffers. + * Otherwise (i.e. if we are live!), we wait on the clock + * and if a pad does not have a buffer in time we ignore + * that pad. + */ + GST_OBJECT_LOCK (self); + if (!GST_CLOCK_TIME_IS_VALID (latency) || + !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) || + !GST_CLOCK_TIME_IS_VALID (start) || + (self->priv->first_buffer + && self->priv->start_time_selection == + GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) { + /* We wake up here when something happened, and below + * then check if we're ready now. If we return FALSE, + * we will be directly called again. + */ + GST_OBJECT_UNLOCK (self); + SRC_WAIT (self); + } else { + GstClockTime base_time, time; + GstClock *clock; + GstClockReturn status; + GstClockTimeDiff jitter; + + GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (start)); + + base_time = GST_ELEMENT_CAST (self)->base_time; + clock = gst_object_ref (GST_ELEMENT_CLOCK (self)); + GST_OBJECT_UNLOCK (self); + + time = base_time + start; + time += latency; + + GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %" + GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT + " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")", + GST_TIME_ARGS (time), + GST_TIME_ARGS (base_time), + GST_TIME_ARGS (start), GST_TIME_ARGS (latency), + GST_TIME_ARGS (gst_clock_get_time (clock))); + + self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time); + gst_object_unref (clock); + SRC_UNLOCK (self); + + jitter = 0; + status = gst_clock_id_wait (self->priv->aggregate_id, &jitter); + + SRC_LOCK (self); + if (self->priv->aggregate_id) { + gst_clock_id_unref (self->priv->aggregate_id); + self->priv->aggregate_id = NULL; + } + + GST_DEBUG_OBJECT (self, + "clock returned %d (jitter: %" GST_STIME_FORMAT ")", + status, GST_STIME_ARGS (jitter)); + + /* we timed out */ + if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) { + SRC_UNLOCK (self); + *timeout = TRUE; + return TRUE; + } + } + + res = gst_aggregator_check_pads_ready (self); + SRC_UNLOCK (self); + + return res; +} + +static gboolean +gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, + gpointer user_data) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); + GstAggregator *aggregator = GST_AGGREGATOR_CAST (self); + GstEvent *event = NULL; + GstQuery *query = NULL; + GstAggregatorClass *klass = NULL; + gboolean *processed_event = user_data; + + do { + event = NULL; + query = NULL; + + PAD_LOCK (pad); + if (pad->priv->clipped_buffer == NULL && + !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))) + event = gst_event_ref (g_queue_peek_tail (&pad->priv->data)); + if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) + query = g_queue_peek_tail (&pad->priv->data); + } + PAD_UNLOCK (pad); + if (event || query) { + gboolean ret; + + if (processed_event) + *processed_event = TRUE; + if (klass == NULL) + klass = GST_AGGREGATOR_GET_CLASS (self); + + if (event) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); + gst_event_ref (event); + ret = klass->sink_event (aggregator, pad, event); + + PAD_LOCK (pad); + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) + pad->priv->negotiated = ret; + if (g_queue_peek_tail (&pad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&pad->priv->data)); + gst_event_unref (event); + } else if (query) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query); + ret = klass->sink_query (aggregator, pad, query); + + PAD_LOCK (pad); + if (g_queue_peek_tail (&pad->priv->data) == query) { + GstStructure *s; + + s = gst_query_writable_structure (query); + gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret, + NULL); + g_queue_pop_tail (&pad->priv->data); + } + } + + PAD_BROADCAST_EVENT (pad); + PAD_UNLOCK (pad); + } + } while (event || query); + + return TRUE; +} + +static void +gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, + GstFlowReturn flow_return, gboolean full) +{ + GList *item; + + PAD_LOCK (aggpad); + if (flow_return == GST_FLOW_NOT_LINKED) + aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return); + else + aggpad->priv->flow_return = flow_return; + + item = g_queue_peek_head_link (&aggpad->priv->data); + while (item) { + GList *next = item->next; + + /* In partial flush, we do like the pad, we get rid of non-sticky events + * and EOS/SEGMENT. + */ + if (full || GST_IS_BUFFER (item->data) || + GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || + GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || + !GST_EVENT_IS_STICKY (item->data)) { + if (!GST_IS_QUERY (item->data)) + gst_mini_object_unref (item->data); + g_queue_delete_link (&aggpad->priv->data, item); + } + item = next; + } + aggpad->priv->num_buffers = 0; + gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL); + + PAD_BROADCAST_EVENT (aggpad); + PAD_UNLOCK (aggpad); +} + +static GstFlowReturn +gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps, + GstCaps ** ret) +{ + *ret = gst_caps_ref (caps); + + return GST_FLOW_OK; +} + +static GstCaps * +gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps) +{ + caps = gst_caps_fixate (caps); + + return caps; +} + +static gboolean +gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps) +{ + return TRUE; +} + + +/* takes ownership of the pool, allocator and query */ +static gboolean +gst_aggregator_set_allocation (GstAggregator * self, + GstBufferPool * pool, GstAllocator * allocator, + GstAllocationParams * params, GstQuery * query) +{ + GstAllocator *oldalloc; + GstBufferPool *oldpool; + GstQuery *oldquery; + + GST_DEBUG ("storing allocation query"); + + GST_OBJECT_LOCK (self); + oldpool = self->priv->pool; + self->priv->pool = pool; + + oldalloc = self->priv->allocator; + self->priv->allocator = allocator; + + oldquery = self->priv->allocation_query; + self->priv->allocation_query = query; + + if (params) + self->priv->allocation_params = *params; + else + gst_allocation_params_init (&self->priv->allocation_params); + GST_OBJECT_UNLOCK (self); + + if (oldpool) { + GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool); + gst_buffer_pool_set_active (oldpool, FALSE); + gst_object_unref (oldpool); + } + if (oldalloc) { + gst_object_unref (oldalloc); + } + if (oldquery) { + gst_query_unref (oldquery); + } + return TRUE; +} + + +static gboolean +gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query) +{ + GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self); + + if (aggclass->decide_allocation) + if (!aggclass->decide_allocation (self, query)) + return FALSE; + + return TRUE; +} + +static gboolean +gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps) +{ + GstQuery *query; + gboolean result = TRUE; + GstBufferPool *pool = NULL; + GstAllocator *allocator; + GstAllocationParams params; + + /* find a pool for the negotiated caps now */ + GST_DEBUG_OBJECT (self, "doing allocation query"); + query = gst_query_new_allocation (caps, TRUE); + if (!gst_pad_peer_query (self->srcpad, query)) { + /* not a problem, just debug a little */ + GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed"); + } + + GST_DEBUG_OBJECT (self, "calling decide_allocation"); + result = gst_aggregator_decide_allocation (self, query); + + GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result, + query); + + if (!result) + goto no_decide_allocation; + + /* we got configuration from our peer or the decide_allocation method, + * parse them */ + if (gst_query_get_n_allocation_params (query) > 0) { + gst_query_parse_nth_allocation_param (query, 0, &allocator, ¶ms); + } else { + allocator = NULL; + gst_allocation_params_init (¶ms); + } + + if (gst_query_get_n_allocation_pools (query) > 0) + gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL); + + /* now store */ + result = + gst_aggregator_set_allocation (self, pool, allocator, ¶ms, query); + + return result; + + /* Errors */ +no_decide_allocation: + { + GST_WARNING_OBJECT (self, "Failed to decide allocation"); + gst_query_unref (query); + + return result; + } + +} + +/* WITH SRC_LOCK held */ +static GstFlowReturn +gst_aggregator_update_src_caps (GstAggregator * self) +{ + GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self); + GstCaps *downstream_caps, *template_caps, *caps = NULL; + GstFlowReturn ret = GST_FLOW_OK; + + template_caps = gst_pad_get_pad_template_caps (self->srcpad); + downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps); + + if (gst_caps_is_empty (downstream_caps)) { + GST_INFO_OBJECT (self, "Downstream caps (%" + GST_PTR_FORMAT ") not compatible with pad template caps (%" + GST_PTR_FORMAT ")", downstream_caps, template_caps); + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + + g_assert (agg_klass->update_src_caps); + GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT, + downstream_caps); + ret = agg_klass->update_src_caps (self, downstream_caps, &caps); + if (ret < GST_FLOW_OK) { + GST_WARNING_OBJECT (self, "Subclass failed to update provided caps"); + goto done; + } + if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) { + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps); + +#ifdef GST_ENABLE_EXTRA_CHECKS + if (!gst_caps_is_subset (caps, template_caps)) { + GstCaps *intersection; + + GST_ERROR_OBJECT (self, + "update_src_caps returned caps %" GST_PTR_FORMAT + " which are not a real subset of the template caps %" + GST_PTR_FORMAT, caps, template_caps); + g_warning ("%s: update_src_caps returned caps which are not a real " + "subset of the filter caps", GST_ELEMENT_NAME (self)); + + intersection = + gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST); + gst_caps_unref (caps); + caps = intersection; + } +#endif + + if (gst_caps_is_any (caps)) { + goto done; + } + + if (!gst_caps_is_fixed (caps)) { + g_assert (agg_klass->fixate_src_caps); + + GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps); + if (!(caps = agg_klass->fixate_src_caps (self, caps))) { + GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps"); + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps); + } + + if (agg_klass->negotiated_src_caps) { + if (!agg_klass->negotiated_src_caps (self, caps)) { + GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps"); + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + } + + gst_aggregator_set_src_caps (self, caps); + + if (!gst_aggregator_do_allocation (self, caps)) { + GST_WARNING_OBJECT (self, "Allocation negotiation failed"); + ret = GST_FLOW_NOT_NEGOTIATED; + } + +done: + gst_caps_unref (downstream_caps); + gst_caps_unref (template_caps); + + if (caps) + gst_caps_unref (caps); + + return ret; +} + +static void +gst_aggregator_aggregate_func (GstAggregator * self) +{ + GstAggregatorPrivate *priv = self->priv; + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); + gboolean timeout = FALSE; + + if (self->priv->running == FALSE) { + GST_DEBUG_OBJECT (self, "Not running anymore"); + return; + } + + GST_LOG_OBJECT (self, "Checking aggregate"); + while (priv->send_eos && priv->running) { + GstFlowReturn flow_return = GST_FLOW_OK; + gboolean processed_event = FALSE; + + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_do_events_and_queries, NULL); + + /* Ensure we have buffers ready (either in clipped_buffer or at the head of + * the queue */ + if (!gst_aggregator_wait_and_check (self, &timeout)) + continue; + + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_do_events_and_queries, &processed_event); + + if (processed_event) + continue; + + if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) { + flow_return = gst_aggregator_update_src_caps (self); + if (flow_return != GST_FLOW_OK) + gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self)); + } + + if (timeout || flow_return >= GST_FLOW_OK) { + GST_TRACE_OBJECT (self, "Actually aggregating!"); + flow_return = klass->aggregate (self, timeout); + } + + if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA) + continue; + + GST_OBJECT_LOCK (self); + if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) { + /* We don't want to set the pads to flushing, but we want to + * stop the thread, so just break here */ + GST_OBJECT_UNLOCK (self); + break; + } + GST_OBJECT_UNLOCK (self); + + if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) { + gst_aggregator_push_eos (self); + } + + GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); + + if (flow_return != GST_FLOW_OK) { + GList *item; + + GST_OBJECT_LOCK (self); + for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + + gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE); + } + GST_OBJECT_UNLOCK (self); + break; + } + } + + /* Pause the task here, the only ways to get here are: + * 1) We're stopping, in which case the task is stopped anyway + * 2) We got a flow error above, in which case it might take + * some time to forward the flow return upstream and we + * would otherwise call the task function over and over + * again without doing anything + */ + gst_pad_pause_task (self->srcpad); +} + +static gboolean +gst_aggregator_start (GstAggregator * self) +{ + GstAggregatorClass *klass; + gboolean result; + + self->priv->send_stream_start = TRUE; + self->priv->send_segment = TRUE; + self->priv->send_eos = TRUE; + self->priv->srccaps = NULL; + + gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL); + + klass = GST_AGGREGATOR_GET_CLASS (self); + + if (klass->start) + result = klass->start (self); + else + result = TRUE; + + return result; +} + +static gboolean +_check_pending_flush_stop (GstAggregatorPad * pad) +{ + gboolean res; + + PAD_LOCK (pad); + res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start); + PAD_UNLOCK (pad); + + return res; +} + +static gboolean +gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start) +{ + gboolean res = TRUE; + + GST_INFO_OBJECT (self, "%s srcpad task", + flush_start ? "Pausing" : "Stopping"); + + SRC_LOCK (self); + self->priv->running = FALSE; + SRC_BROADCAST (self); + SRC_UNLOCK (self); + + if (flush_start) { + res = gst_pad_push_event (self->srcpad, flush_start); + } + + gst_pad_stop_task (self->srcpad); + + return res; +} + +static void +gst_aggregator_start_srcpad_task (GstAggregator * self) +{ + GST_INFO_OBJECT (self, "Starting srcpad task"); + + self->priv->running = TRUE; + gst_pad_start_task (GST_PAD (self->srcpad), + (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL); +} + +static GstFlowReturn +gst_aggregator_flush (GstAggregator * self) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstAggregatorPrivate *priv = self->priv; + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); + + GST_DEBUG_OBJECT (self, "Flushing everything"); + GST_OBJECT_LOCK (self); + priv->send_segment = TRUE; + priv->flush_seeking = FALSE; + priv->tags_changed = FALSE; + GST_OBJECT_UNLOCK (self); + if (klass->flush) + ret = klass->flush (self); + + return ret; +} + + +/* Called with GstAggregator's object lock held */ + +static gboolean +gst_aggregator_all_flush_stop_received_locked (GstAggregator * self) +{ + GList *tmp; + GstAggregatorPad *tmppad; + + for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) { + tmppad = (GstAggregatorPad *) tmp->data; + + if (_check_pending_flush_stop (tmppad) == FALSE) { + GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i", + tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop); + return FALSE; + } + } + + return TRUE; +} + +static void +gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, + GstEvent * event) +{ + GstAggregatorPrivate *priv = self->priv; + GstAggregatorPadPrivate *padpriv = aggpad->priv; + + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE); + + PAD_FLUSH_LOCK (aggpad); + PAD_LOCK (aggpad); + if (padpriv->pending_flush_start) { + GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now"); + + padpriv->pending_flush_start = FALSE; + padpriv->pending_flush_stop = TRUE; + } + PAD_UNLOCK (aggpad); + + GST_OBJECT_LOCK (self); + if (priv->flush_seeking) { + /* If flush_seeking we forward the first FLUSH_START */ + if (priv->pending_flush_start) { + priv->pending_flush_start = FALSE; + GST_OBJECT_UNLOCK (self); + + GST_INFO_OBJECT (self, "Flushing, pausing srcpad task"); + gst_aggregator_stop_srcpad_task (self, event); + + GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking"); + GST_PAD_STREAM_LOCK (self->srcpad); + GST_LOG_OBJECT (self, "GOT STREAM_LOCK"); + event = NULL; + } else { + GST_OBJECT_UNLOCK (self); + gst_event_unref (event); + } + } else { + GST_OBJECT_UNLOCK (self); + gst_event_unref (event); + } + PAD_FLUSH_UNLOCK (aggpad); +} + +/* Must be called with the the PAD_LOCK held */ +static void +update_time_level (GstAggregatorPad * aggpad, gboolean head) +{ + GstAggregatorPadPrivate *priv = aggpad->priv; + + if (head) { + if (GST_CLOCK_TIME_IS_VALID (priv->head_position) && + priv->head_segment.format == GST_FORMAT_TIME) + priv->head_time = gst_segment_to_running_time (&priv->head_segment, + GST_FORMAT_TIME, priv->head_position); + else + priv->head_time = GST_CLOCK_TIME_NONE; + + if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time)) + priv->tail_time = priv->head_time; + } else { + if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) && + aggpad->segment.format == GST_FORMAT_TIME) + priv->tail_time = gst_segment_to_running_time (&aggpad->segment, + GST_FORMAT_TIME, priv->tail_position); + else + priv->tail_time = priv->head_time; + } + + if (priv->head_time == GST_CLOCK_TIME_NONE || + priv->tail_time == GST_CLOCK_TIME_NONE) { + priv->time_level = 0; + return; + } + + if (priv->tail_time > priv->head_time) + priv->time_level = 0; + else + priv->time_level = priv->head_time - priv->tail_time; +} + + +/* GstAggregator vmethods default implementations */ +static gboolean +gst_aggregator_default_sink_event (GstAggregator * self, + GstAggregatorPad * aggpad, GstEvent * event) +{ + gboolean res = TRUE; + GstPad *pad = GST_PAD (aggpad); + GstAggregatorPrivate *priv = self->priv; + + GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + { + gst_aggregator_flush_start (self, aggpad, event); + /* We forward only in one case: right after flush_seeking */ + event = NULL; + goto eat; + } + case GST_EVENT_FLUSH_STOP: + { + gst_aggregator_pad_flush (aggpad, self); + GST_OBJECT_LOCK (self); + if (priv->flush_seeking) { + g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE); + if (gst_aggregator_all_flush_stop_received_locked (self)) { + GST_OBJECT_UNLOCK (self); + /* That means we received FLUSH_STOP/FLUSH_STOP on + * all sinkpads -- Seeking is Done... sending FLUSH_STOP */ + gst_aggregator_flush (self); + gst_pad_push_event (self->srcpad, event); + event = NULL; + SRC_LOCK (self); + priv->send_eos = TRUE; + SRC_BROADCAST (self); + SRC_UNLOCK (self); + + GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); + GST_PAD_STREAM_UNLOCK (self->srcpad); + gst_aggregator_start_srcpad_task (self); + } else { + GST_OBJECT_UNLOCK (self); + } + } else { + GST_OBJECT_UNLOCK (self); + } + + /* We never forward the event */ + goto eat; + } + case GST_EVENT_EOS: + { + SRC_LOCK (self); + PAD_LOCK (aggpad); + g_assert (aggpad->priv->num_buffers == 0); + aggpad->priv->eos = TRUE; + PAD_UNLOCK (aggpad); + SRC_BROADCAST (self); + SRC_UNLOCK (self); + goto eat; + } + case GST_EVENT_SEGMENT: + { + PAD_LOCK (aggpad); + GST_OBJECT_LOCK (aggpad); + gst_event_copy_segment (event, &aggpad->segment); + /* We've got a new segment, tail_position is now meaningless + * and may interfere with the time_level calculation + */ + aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; + update_time_level (aggpad, FALSE); + GST_OBJECT_UNLOCK (aggpad); + PAD_UNLOCK (aggpad); + + GST_OBJECT_LOCK (self); + self->priv->seqnum = gst_event_get_seqnum (event); + GST_OBJECT_UNLOCK (self); + goto eat; + } + case GST_EVENT_STREAM_START: + { + goto eat; + } + case GST_EVENT_GAP: + { + GstClockTime pts, endpts; + GstClockTime duration; + GstBuffer *gapbuf; + + gst_event_parse_gap (event, &pts, &duration); + gapbuf = gst_buffer_new (); + + if (GST_CLOCK_TIME_IS_VALID (duration)) + endpts = pts + duration; + else + endpts = GST_CLOCK_TIME_NONE; + + GST_OBJECT_LOCK (aggpad); + res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts, + &pts, &endpts); + GST_OBJECT_UNLOCK (aggpad); + + if (!res) { + GST_WARNING_OBJECT (self, "GAP event outside segment, dropping"); + goto eat; + } + + if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts)) + duration = endpts - pts; + else + duration = GST_CLOCK_TIME_NONE; + + GST_BUFFER_PTS (gapbuf) = pts; + GST_BUFFER_DURATION (gapbuf) = duration; + GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); + GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); + + /* Remove GAP event so we can replace it with the buffer */ + if (g_queue_peek_tail (&aggpad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&aggpad->priv->data)); + + if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != + GST_FLOW_OK) { + GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); + res = FALSE; + } + + goto eat; + } + case GST_EVENT_TAG: + { + GstTagList *tags; + + gst_event_parse_tag (event, &tags); + + if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) { + gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE); + gst_event_unref (event); + event = NULL; + goto eat; + } + break; + } + default: + { + break; + } + } + + GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event); + return gst_pad_event_default (pad, GST_OBJECT (self), event); + +eat: + GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event); + if (event) + gst_event_unref (event); + + return res; +} + +static gboolean +gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); + GstAggregator *agg = GST_AGGREGATOR_CAST (self); + + gst_aggregator_pad_flush (pad, agg); + + PAD_LOCK (pad); + pad->priv->flow_return = GST_FLOW_FLUSHING; + pad->priv->negotiated = FALSE; + PAD_BROADCAST_EVENT (pad); + PAD_UNLOCK (pad); + + return TRUE; +} + +static gboolean +gst_aggregator_stop (GstAggregator * agg) +{ + GstAggregatorClass *klass; + gboolean result; + + gst_aggregator_reset_flow_values (agg); + + /* Application needs to make sure no pads are added while it shuts us down */ + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg), + gst_aggregator_stop_pad, NULL); + + klass = GST_AGGREGATOR_GET_CLASS (agg); + + if (klass->stop) + result = klass->stop (agg); + else + result = TRUE; + + agg->priv->has_peer_latency = FALSE; + agg->priv->peer_latency_live = FALSE; + agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0; + + if (agg->priv->tags) + gst_tag_list_unref (agg->priv->tags); + agg->priv->tags = NULL; + + gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL); + + return result; +} + +/* GstElement vmethods implementations */ +static GstStateChangeReturn +gst_aggregator_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstAggregator *self = GST_AGGREGATOR (element); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + if (!gst_aggregator_start (self)) + goto error_start; + break; + default: + break; + } + + if ((ret = + GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element, + transition)) == GST_STATE_CHANGE_FAILURE) + goto failure; + + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + if (!gst_aggregator_stop (self)) { + /* What to do in this case? Error out? */ + GST_ERROR_OBJECT (self, "Subclass failed to stop."); + } + break; + default: + break; + } + + return ret; + +/* ERRORS */ +failure: + { + GST_ERROR_OBJECT (element, "parent failed state change"); + return ret; + } +error_start: + { + GST_ERROR_OBJECT (element, "Subclass failed to start"); + return GST_STATE_CHANGE_FAILURE; + } +} + +static void +gst_aggregator_release_pad (GstElement * element, GstPad * pad) +{ + GstAggregator *self = GST_AGGREGATOR (element); + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + GST_INFO_OBJECT (pad, "Removing pad"); + + SRC_LOCK (self); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); + gst_element_remove_pad (element, pad); + + self->priv->has_peer_latency = FALSE; + SRC_BROADCAST (self); + SRC_UNLOCK (self); +} + +static GstAggregatorPad * +gst_aggregator_default_create_new_pad (GstAggregator * self, + GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) +{ + GstAggregatorPad *agg_pad; + GstAggregatorPrivate *priv = self->priv; + gint serial = 0; + gchar *name = NULL; + GType pad_type = + GST_PAD_TEMPLATE_GTYPE (templ) == + G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ); + + if (templ->direction != GST_PAD_SINK) + goto not_sink; + + if (templ->presence != GST_PAD_REQUEST) + goto not_request; + + GST_OBJECT_LOCK (self); + if (req_name == NULL || strlen (req_name) < 6 + || !g_str_has_prefix (req_name, "sink_")) { + /* no name given when requesting the pad, use next available int */ + serial = ++priv->max_padserial; + } else { + /* parse serial number from requested padname */ + serial = g_ascii_strtoull (&req_name[5], NULL, 10); + if (serial > priv->max_padserial) + priv->max_padserial = serial; + } + + name = g_strdup_printf ("sink_%u", serial); + agg_pad = g_object_new (pad_type, + "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); + g_free (name); + + GST_OBJECT_UNLOCK (self); + + return agg_pad; + + /* errors */ +not_sink: + { + GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad"); + return NULL; + } +not_request: + { + GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad"); + return NULL; + } +} + +static GstPad * +gst_aggregator_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) +{ + GstAggregator *self; + GstAggregatorPad *agg_pad; + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element); + GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv; + + self = GST_AGGREGATOR (element); + + agg_pad = klass->create_new_pad (self, templ, req_name, caps); + if (!agg_pad) { + GST_ERROR_OBJECT (element, "Couldn't create new pad"); + return NULL; + } + + GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad)); + + if (priv->running) + gst_pad_set_active (GST_PAD (agg_pad), TRUE); + + /* add the pad to the element */ + gst_element_add_pad (element, GST_PAD (agg_pad)); + + return GST_PAD (agg_pad); +} + +/* Must be called with SRC_LOCK held */ + +static gboolean +gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) +{ + gboolean query_ret, live; + GstClockTime our_latency, min, max; + + query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query); + + if (!query_ret) { + GST_WARNING_OBJECT (self, "Latency query failed"); + return FALSE; + } + + gst_query_parse_latency (query, &live, &min, &max); + + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) { + GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT + ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min)); + return FALSE; + } + + if (min > max && GST_CLOCK_TIME_IS_VALID (max)) { + GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL), + ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %" + GST_TIME_FORMAT ". Add queues or other buffering elements.", + GST_TIME_ARGS (max), GST_TIME_ARGS (min))); + return FALSE; + } + + our_latency = self->priv->latency; + + self->priv->peer_latency_live = live; + self->priv->peer_latency_min = min; + self->priv->peer_latency_max = max; + self->priv->has_peer_latency = TRUE; + + /* add our own */ + min += our_latency; + min += self->priv->sub_latency_min; + if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) + && GST_CLOCK_TIME_IS_VALID (max)) + max += self->priv->sub_latency_max + our_latency; + else + max = GST_CLOCK_TIME_NONE; + + SRC_BROADCAST (self); + + GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT + " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max); + + gst_query_set_latency (query, live, min, max); + + return query_ret; +} + +/* + * MUST be called with the src_lock held. + * + * See gst_aggregator_get_latency() for doc + */ +static GstClockTime +gst_aggregator_get_latency_unlocked (GstAggregator * self) +{ + GstClockTime latency; + + g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0); + + if (!self->priv->has_peer_latency) { + GstQuery *query = gst_query_new_latency (); + gboolean ret; + + ret = gst_aggregator_query_latency_unlocked (self, query); + gst_query_unref (query); + if (!ret) + return GST_CLOCK_TIME_NONE; + } + + if (!self->priv->has_peer_latency || !self->priv->peer_latency_live) + return GST_CLOCK_TIME_NONE; + + /* latency_min is never GST_CLOCK_TIME_NONE by construction */ + latency = self->priv->peer_latency_min; + + /* add our own */ + latency += self->priv->latency; + latency += self->priv->sub_latency_min; + + return latency; +} + +/** + * gst_aggregator_get_latency: + * @self: a #GstAggregator + * + * Retrieves the latency values reported by @self in response to the latency + * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element + * will not wait for the clock. + * + * Typically only called by subclasses. + * + * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync + */ +GstClockTime +gst_aggregator_get_latency (GstAggregator * self) +{ + GstClockTime ret; + + SRC_LOCK (self); + ret = gst_aggregator_get_latency_unlocked (self); + SRC_UNLOCK (self); + + return ret; +} + +static gboolean +gst_aggregator_send_event (GstElement * element, GstEvent * event) +{ + GstAggregator *self = GST_AGGREGATOR (element); + + GST_STATE_LOCK (element); + if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK && + GST_STATE (element) < GST_STATE_PAUSED) { + gdouble rate; + GstFormat fmt; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + + gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type, + &start, &stop_type, &stop); + + GST_OBJECT_LOCK (self); + gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, + stop_type, stop, NULL); + self->priv->seqnum = gst_event_get_seqnum (event); + self->priv->first_buffer = FALSE; + GST_OBJECT_UNLOCK (self); + + GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event); + } + GST_STATE_UNLOCK (element); + + + return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element, + event); +} + +static gboolean +gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query) +{ + gboolean res = TRUE; + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_SEEKING: + { + GstFormat format; + + /* don't pass it along as some (file)sink might claim it does + * whereas with a collectpads in between that will not likely work */ + gst_query_parse_seeking (query, &format, NULL, NULL, NULL); + gst_query_set_seeking (query, format, FALSE, 0, -1); + res = TRUE; + + break; + } + case GST_QUERY_LATENCY: + SRC_LOCK (self); + res = gst_aggregator_query_latency_unlocked (self, query); + SRC_UNLOCK (self); + break; + default: + return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query); + } + + return res; +} + +static gboolean +gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) +{ + EventData *evdata = user_data; + gboolean ret = TRUE; + GstPad *peer = gst_pad_get_peer (pad); + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + if (peer) { + if (evdata->only_to_active_pads && aggpad->priv->first_buffer) { + GST_DEBUG_OBJECT (pad, "not sending event to inactive pad"); + ret = TRUE; + } else { + ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); + GST_DEBUG_OBJECT (pad, "return of event push is %d", ret); + gst_object_unref (peer); + } + } + + if (ret == FALSE) { + if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) { + GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME); + + GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event); + + if (gst_pad_query (peer, seeking)) { + gboolean seekable; + + gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL); + + if (seekable == FALSE) { + GST_INFO_OBJECT (pad, + "Source not seekable, We failed but it does not matter!"); + + ret = TRUE; + } + } else { + GST_ERROR_OBJECT (pad, "Query seeking FAILED"); + } + + gst_query_unref (seeking); + } + + if (evdata->flush) { + PAD_LOCK (aggpad); + aggpad->priv->pending_flush_start = FALSE; + aggpad->priv->pending_flush_stop = FALSE; + PAD_UNLOCK (aggpad); + } + } else { + evdata->one_actually_seeked = TRUE; + } + + evdata->result &= ret; + + /* Always send to all pads */ + return FALSE; +} + +static void +gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, + EventData * evdata) +{ + evdata->result = TRUE; + evdata->one_actually_seeked = FALSE; + + /* We first need to set all pads as flushing in a first pass + * as flush_start flush_stop is sometimes sent synchronously + * while we send the seek event */ + if (evdata->flush) { + GList *l; + + GST_OBJECT_LOCK (self); + for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) { + GstAggregatorPad *pad = l->data; + + PAD_LOCK (pad); + pad->priv->pending_flush_start = TRUE; + pad->priv->pending_flush_stop = FALSE; + PAD_UNLOCK (pad); + } + GST_OBJECT_UNLOCK (self); + } + + gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata); + + gst_event_unref (evdata->event); +} + +static gboolean +gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) +{ + gdouble rate; + GstFormat fmt; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + gboolean flush; + EventData evdata = { 0, }; + GstAggregatorPrivate *priv = self->priv; + + gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type, + &start, &stop_type, &stop); + + GST_INFO_OBJECT (self, "starting SEEK"); + + flush = flags & GST_SEEK_FLAG_FLUSH; + + GST_OBJECT_LOCK (self); + if (flush) { + priv->pending_flush_start = TRUE; + priv->flush_seeking = TRUE; + } + + gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, + stop_type, stop, NULL); + + /* Seeking sets a position */ + self->priv->first_buffer = FALSE; + GST_OBJECT_UNLOCK (self); + + /* forward the seek upstream */ + evdata.event = event; + evdata.flush = flush; + evdata.only_to_active_pads = FALSE; + gst_aggregator_forward_event_to_all_sinkpads (self, &evdata); + event = NULL; + + if (!evdata.result || !evdata.one_actually_seeked) { + GST_OBJECT_LOCK (self); + priv->flush_seeking = FALSE; + priv->pending_flush_start = FALSE; + GST_OBJECT_UNLOCK (self); + } + + GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result); + + return evdata.result; +} + +static gboolean +gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event) +{ + EventData evdata = { 0, }; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK: + /* _do_seek() unrefs the event. */ + return gst_aggregator_do_seek (self, event); + case GST_EVENT_NAVIGATION: + /* navigation is rather pointless. */ + gst_event_unref (event); + return FALSE; + default: + break; + } + + /* Don't forward QOS events to pads that had no active buffer yet. Otherwise + * they will receive a QOS event that has earliest_time=0 (because we can't + * have negative timestamps), and consider their buffer as too late */ + evdata.event = event; + evdata.flush = FALSE; + evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS; + gst_aggregator_forward_event_to_all_sinkpads (self, &evdata); + return evdata.result; +} + +static gboolean +gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); + + return klass->src_event (GST_AGGREGATOR (parent), event); +} + +static gboolean +gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent, + GstQuery * query) +{ + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); + + return klass->src_query (GST_AGGREGATOR (parent), query); +} + +static gboolean +gst_aggregator_src_pad_activate_mode_func (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active) +{ + GstAggregator *self = GST_AGGREGATOR (parent); + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); + + if (klass->src_activate) { + if (klass->src_activate (self, mode, active) == FALSE) { + return FALSE; + } + } + + if (active == TRUE) { + switch (mode) { + case GST_PAD_MODE_PUSH: + { + GST_INFO_OBJECT (pad, "Activating pad!"); + gst_aggregator_start_srcpad_task (self); + return TRUE; + } + default: + { + GST_ERROR_OBJECT (pad, "Only supported mode is PUSH"); + return FALSE; + } + } + } + + /* deactivating */ + GST_INFO_OBJECT (self, "Deactivating srcpad"); + gst_aggregator_stop_srcpad_task (self, FALSE); + + return TRUE; +} + +static gboolean +gst_aggregator_default_sink_query (GstAggregator * self, + GstAggregatorPad * aggpad, GstQuery * query) +{ + GstPad *pad = GST_PAD (aggpad); + + if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) { + GstQuery *decide_query = NULL; + GstAggregatorClass *agg_class; + gboolean ret; + + GST_OBJECT_LOCK (self); + PAD_LOCK (aggpad); + if (G_UNLIKELY (!aggpad->priv->negotiated)) { + GST_DEBUG_OBJECT (self, + "not negotiated yet, can't answer ALLOCATION query"); + PAD_UNLOCK (aggpad); + GST_OBJECT_UNLOCK (self); + + return FALSE; + } + + if ((decide_query = self->priv->allocation_query)) + gst_query_ref (decide_query); + PAD_UNLOCK (aggpad); + GST_OBJECT_UNLOCK (self); + + GST_DEBUG_OBJECT (self, + "calling propose allocation with query %" GST_PTR_FORMAT, decide_query); + + agg_class = GST_AGGREGATOR_GET_CLASS (self); + + /* pass the query to the propose_allocation vmethod if any */ + if (agg_class->propose_allocation) + ret = agg_class->propose_allocation (self, aggpad, decide_query, query); + else + ret = FALSE; + + if (decide_query) + gst_query_unref (decide_query); + + GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query); + return ret; + } + + return gst_pad_query_default (pad, GST_OBJECT (self), query); +} + +static void +gst_aggregator_finalize (GObject * object) +{ + GstAggregator *self = (GstAggregator *) object; + + g_mutex_clear (&self->priv->src_lock); + g_cond_clear (&self->priv->src_cond); + + G_OBJECT_CLASS (aggregator_parent_class)->finalize (object); +} + +/* + * gst_aggregator_set_latency_property: + * @agg: a #GstAggregator + * @latency: the new latency value (in nanoseconds). + * + * Sets the new latency value to @latency. This value is used to limit the + * amount of time a pad waits for data to appear before considering the pad + * as unresponsive. + */ +static void +gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency) +{ + gboolean changed; + + g_return_if_fail (GST_IS_AGGREGATOR (self)); + g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency)); + + SRC_LOCK (self); + changed = (self->priv->latency != latency); + + if (changed) { + GList *item; + + GST_OBJECT_LOCK (self); + /* First lock all the pads */ + for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + PAD_LOCK (aggpad); + } + + self->priv->latency = latency; + + SRC_BROADCAST (self); + + /* Now wake up the pads */ + for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + PAD_BROADCAST_EVENT (aggpad); + PAD_UNLOCK (aggpad); + } + GST_OBJECT_UNLOCK (self); + } + + SRC_UNLOCK (self); + + if (changed) + gst_element_post_message (GST_ELEMENT_CAST (self), + gst_message_new_latency (GST_OBJECT_CAST (self))); +} + +/* + * gst_aggregator_get_latency_property: + * @agg: a #GstAggregator + * + * Gets the latency value. See gst_aggregator_set_latency for + * more details. + * + * Returns: The time in nanoseconds to wait for data to arrive on a sink pad + * before a pad is deemed unresponsive. A value of -1 means an + * unlimited time. + */ +static GstClockTime +gst_aggregator_get_latency_property (GstAggregator * agg) +{ + GstClockTime res; + + g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE); + + GST_OBJECT_LOCK (agg); + res = agg->priv->latency; + GST_OBJECT_UNLOCK (agg); + + return res; +} + +static void +gst_aggregator_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstAggregator *agg = GST_AGGREGATOR (object); + + switch (prop_id) { + case PROP_LATENCY: + gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value)); + break; + case PROP_START_TIME_SELECTION: + agg->priv->start_time_selection = g_value_get_enum (value); + break; + case PROP_START_TIME: + agg->priv->start_time = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_aggregator_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstAggregator *agg = GST_AGGREGATOR (object); + + switch (prop_id) { + case PROP_LATENCY: + g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg)); + break; + case PROP_START_TIME_SELECTION: + g_value_set_enum (value, agg->priv->start_time_selection); + break; + case PROP_START_TIME: + g_value_set_uint64 (value, agg->priv->start_time); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +/* GObject vmethods implementations */ +static void +gst_aggregator_class_init (GstAggregatorClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + GstElementClass *gstelement_class = (GstElementClass *) klass; + + aggregator_parent_class = g_type_class_peek_parent (klass); + g_type_class_add_private (klass, sizeof (GstAggregatorPrivate)); + + GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator", + GST_DEBUG_FG_MAGENTA, "GstAggregator"); + + klass->sink_event = gst_aggregator_default_sink_event; + klass->sink_query = gst_aggregator_default_sink_query; + + klass->src_event = gst_aggregator_default_src_event; + klass->src_query = gst_aggregator_default_src_query; + + klass->create_new_pad = gst_aggregator_default_create_new_pad; + klass->update_src_caps = gst_aggregator_default_update_src_caps; + klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps; + klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps; + + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad); + gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_aggregator_release_pad); + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_aggregator_change_state); + + gobject_class->set_property = gst_aggregator_set_property; + gobject_class->get_property = gst_aggregator_get_property; + gobject_class->finalize = gst_aggregator_finalize; + + g_object_class_install_property (gobject_class, PROP_LATENCY, + g_param_spec_uint64 ("latency", "Buffer latency", + "Additional latency in live mode to allow upstream " + "to take longer to produce buffers for the current " + "position (in nanoseconds)", 0, G_MAXUINT64, + DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION, + g_param_spec_enum ("start-time-selection", "Start Time Selection", + "Decides which start time is output", + gst_aggregator_start_time_selection_get_type (), + DEFAULT_START_TIME_SELECTION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_START_TIME, + g_param_spec_uint64 ("start-time", "Start Time", + "Start time to use if start-time-selection=set", 0, + G_MAXUINT64, + DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries); +} + +static void +gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) +{ + GstPadTemplate *pad_template; + GstAggregatorPrivate *priv; + + g_return_if_fail (klass->aggregate != NULL); + + self->priv = + G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR, + GstAggregatorPrivate); + + priv = self->priv; + + pad_template = + gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src"); + g_return_if_fail (pad_template != NULL); + + priv->max_padserial = -1; + priv->tags_changed = FALSE; + + self->priv->peer_latency_live = FALSE; + self->priv->peer_latency_min = self->priv->sub_latency_min = 0; + self->priv->peer_latency_max = self->priv->sub_latency_max = 0; + self->priv->has_peer_latency = FALSE; + gst_aggregator_reset_flow_values (self); + + self->srcpad = gst_pad_new_from_template (pad_template, "src"); + + gst_pad_set_event_function (self->srcpad, + GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func)); + gst_pad_set_query_function (self->srcpad, + GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func)); + gst_pad_set_activatemode_function (self->srcpad, + GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func)); + + gst_element_add_pad (GST_ELEMENT (self), self->srcpad); + + self->priv->latency = DEFAULT_LATENCY; + self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION; + self->priv->start_time = DEFAULT_START_TIME; + + g_mutex_init (&self->priv->src_lock); + g_cond_init (&self->priv->src_cond); +} + +/* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init + * method to get to the padtemplates */ +GType +gst_aggregator_get_type (void) +{ + static volatile gsize type = 0; + + if (g_once_init_enter (&type)) { + GType _type; + static const GTypeInfo info = { + sizeof (GstAggregatorClass), + NULL, + NULL, + (GClassInitFunc) gst_aggregator_class_init, + NULL, + NULL, + sizeof (GstAggregator), + 0, + (GInstanceInitFunc) gst_aggregator_init, + }; + + _type = g_type_register_static (GST_TYPE_ELEMENT, + "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT); + g_once_init_leave (&type, _type); + } + return type; +} + +/* Must be called with SRC lock and PAD lock held */ +static gboolean +gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) +{ + /* Empty queue always has space */ + if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL) + return TRUE; + + /* We also want at least two buffers, one is being processed and one is ready + * for the next iteration when we operate in live mode. */ + if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2) + return TRUE; + + /* zero latency, if there is a buffer, it's full */ + if (self->priv->latency == 0) + return FALSE; + + /* Allow no more buffers than the latency */ + return (aggpad->priv->time_level <= self->priv->latency); +} + +/* Must be called with the PAD_LOCK held */ +static void +apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) +{ + GstClockTime timestamp; + + if (GST_BUFFER_DTS_IS_VALID (buffer)) + timestamp = GST_BUFFER_DTS (buffer); + else + timestamp = GST_BUFFER_PTS (buffer); + + if (timestamp == GST_CLOCK_TIME_NONE) { + if (head) + timestamp = aggpad->priv->head_position; + else + timestamp = aggpad->priv->tail_position; + } + + /* add duration */ + if (GST_BUFFER_DURATION_IS_VALID (buffer)) + timestamp += GST_BUFFER_DURATION (buffer); + + if (head) + aggpad->priv->head_position = timestamp; + else + aggpad->priv->tail_position = timestamp; + + update_time_level (aggpad, head); +} + +/* + * Can be called either from the sinkpad's chain function or from the srcpad's + * thread in the case of a buffer synthetized from a GAP event. + * Because of this second case, FLUSH_LOCK can't be used here. + */ + +static GstFlowReturn +gst_aggregator_pad_chain_internal (GstAggregator * self, + GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) +{ + GstFlowReturn flow_return; + GstClockTime buf_pts; + + GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); + + PAD_LOCK (aggpad); + flow_return = aggpad->priv->flow_return; + if (flow_return != GST_FLOW_OK) + goto flushing; + + PAD_UNLOCK (aggpad); + + buf_pts = GST_BUFFER_PTS (buffer); + + for (;;) { + SRC_LOCK (self); + GST_OBJECT_LOCK (self); + PAD_LOCK (aggpad); + + if (aggpad->priv->first_buffer) { + self->priv->has_peer_latency = FALSE; + aggpad->priv->first_buffer = FALSE; + } + + if ((gst_aggregator_pad_has_space (self, aggpad) || !head) + && aggpad->priv->flow_return == GST_FLOW_OK) { + if (head) + g_queue_push_head (&aggpad->priv->data, buffer); + else + g_queue_push_tail (&aggpad->priv->data, buffer); + apply_buffer (aggpad, buffer, head); + aggpad->priv->num_buffers++; + buffer = NULL; + SRC_BROADCAST (self); + break; + } + + flow_return = aggpad->priv->flow_return; + if (flow_return != GST_FLOW_OK) { + GST_OBJECT_UNLOCK (self); + SRC_UNLOCK (self); + goto flushing; + } + GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); + GST_OBJECT_UNLOCK (self); + SRC_UNLOCK (self); + PAD_WAIT_EVENT (aggpad); + + PAD_UNLOCK (aggpad); + } + + if (self->priv->first_buffer) { + GstClockTime start_time; + + switch (self->priv->start_time_selection) { + case GST_AGGREGATOR_START_TIME_SELECTION_ZERO: + default: + start_time = 0; + break; + case GST_AGGREGATOR_START_TIME_SELECTION_FIRST: + GST_OBJECT_LOCK (aggpad); + if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) { + start_time = buf_pts; + if (start_time != -1) { + start_time = MAX (start_time, aggpad->priv->head_segment.start); + start_time = + gst_segment_to_running_time (&aggpad->priv->head_segment, + GST_FORMAT_TIME, start_time); + } + } else { + start_time = 0; + GST_WARNING_OBJECT (aggpad, + "Ignoring request of selecting the first start time " + "as the segment is a %s segment instead of a time segment", + gst_format_get_name (aggpad->segment.format)); + } + GST_OBJECT_UNLOCK (aggpad); + break; + case GST_AGGREGATOR_START_TIME_SELECTION_SET: + start_time = self->priv->start_time; + if (start_time == -1) + start_time = 0; + break; + } + + if (start_time != -1) { + if (self->segment.position == -1) + self->segment.position = start_time; + else + self->segment.position = MIN (start_time, self->segment.position); + + GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT, + GST_TIME_ARGS (start_time)); + } + } + + PAD_UNLOCK (aggpad); + GST_OBJECT_UNLOCK (self); + SRC_UNLOCK (self); + + GST_DEBUG_OBJECT (aggpad, "Done chaining"); + + return flow_return; + +flushing: + PAD_UNLOCK (aggpad); + + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", + gst_flow_get_name (flow_return)); + if (buffer) + gst_buffer_unref (buffer); + + return flow_return; +} + +static GstFlowReturn +gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) +{ + GstFlowReturn ret; + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + PAD_FLUSH_LOCK (aggpad); + + ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), + aggpad, buffer, TRUE); + + PAD_FLUSH_UNLOCK (aggpad); + + return ret; +} + +static gboolean +gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, + GstQuery * query) +{ + GstAggregator *self = GST_AGGREGATOR (parent); + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + if (GST_QUERY_IS_SERIALIZED (query)) { + GstStructure *s; + gboolean ret = FALSE; + + SRC_LOCK (self); + PAD_LOCK (aggpad); + + if (aggpad->priv->flow_return != GST_FLOW_OK) { + SRC_UNLOCK (self); + goto flushing; + } + + g_queue_push_head (&aggpad->priv->data, query); + SRC_BROADCAST (self); + SRC_UNLOCK (self); + + while (!gst_aggregator_pad_queue_is_empty (aggpad) + && aggpad->priv->flow_return == GST_FLOW_OK) { + GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); + PAD_WAIT_EVENT (aggpad); + } + + s = gst_query_writable_structure (query); + if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) + gst_structure_remove_field (s, "gst-aggregator-retval"); + else + g_queue_remove (&aggpad->priv->data, query); + + if (aggpad->priv->flow_return != GST_FLOW_OK) + goto flushing; + + PAD_UNLOCK (aggpad); + + return ret; + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); + + return klass->sink_query (self, aggpad, query); + } + +flushing: + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", + gst_flow_get_name (aggpad->priv->flow_return)); + PAD_UNLOCK (aggpad); + + return FALSE; +} + +/* Queue serialized events and let the others go through directly. + * The queued events with be handled from the src-pad task in + * gst_aggregator_do_events_and_queries(). + */ +static GstFlowReturn +gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstAggregator *self = GST_AGGREGATOR (parent); + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + if (GST_EVENT_IS_SERIALIZED (event) + && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { + SRC_LOCK (self); + PAD_LOCK (aggpad); + + if (aggpad->priv->flow_return != GST_FLOW_OK) + goto flushing; + + if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { + GST_OBJECT_LOCK (aggpad); + gst_event_copy_segment (event, &aggpad->priv->head_segment); + aggpad->priv->head_position = aggpad->priv->head_segment.position; + update_time_level (aggpad, TRUE); + GST_OBJECT_UNLOCK (aggpad); + } + + GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event); + g_queue_push_head (&aggpad->priv->data, event); + SRC_BROADCAST (self); + PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); + + if (!klass->sink_event (self, aggpad, event)) { + /* Copied from GstPad to convert boolean to a GstFlowReturn in + * the event handling func */ + ret = GST_FLOW_ERROR; + } + } + + return ret; + +flushing: + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", + gst_flow_get_name (aggpad->priv->flow_return)); + PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); + if (GST_EVENT_IS_STICKY (event)) + gst_pad_store_sticky_event (pad, event); + gst_event_unref (event); + + return aggpad->priv->flow_return; +} + +static gboolean +gst_aggregator_pad_activate_mode_func (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active) +{ + GstAggregator *self = GST_AGGREGATOR (parent); + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + if (active == FALSE) { + SRC_LOCK (self); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); + SRC_BROADCAST (self); + SRC_UNLOCK (self); + } else { + PAD_LOCK (aggpad); + aggpad->priv->flow_return = GST_FLOW_OK; + PAD_BROADCAST_EVENT (aggpad); + PAD_UNLOCK (aggpad); + } + + return TRUE; +} + +/*********************************** + * GstAggregatorPad implementation * + ************************************/ +G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); + +static void +gst_aggregator_pad_constructed (GObject * object) +{ + GstPad *pad = GST_PAD (object); + + gst_pad_set_chain_function (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain)); + gst_pad_set_event_full_function_full (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL); + gst_pad_set_query_function (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func)); + gst_pad_set_activatemode_function (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func)); +} + +static void +gst_aggregator_pad_finalize (GObject * object) +{ + GstAggregatorPad *pad = (GstAggregatorPad *) object; + + g_cond_clear (&pad->priv->event_cond); + g_mutex_clear (&pad->priv->flush_lock); + g_mutex_clear (&pad->priv->lock); + + G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object); +} + +static void +gst_aggregator_pad_dispose (GObject * object) +{ + GstAggregatorPad *pad = (GstAggregatorPad *) object; + + gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE); + + G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object); +} + +static void +gst_aggregator_pad_class_init (GstAggregatorPadClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + + g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate)); + + gobject_class->constructed = gst_aggregator_pad_constructed; + gobject_class->finalize = gst_aggregator_pad_finalize; + gobject_class->dispose = gst_aggregator_pad_dispose; +} + +static void +gst_aggregator_pad_init (GstAggregatorPad * pad) +{ + pad->priv = + G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, + GstAggregatorPadPrivate); + + g_queue_init (&pad->priv->data); + g_cond_init (&pad->priv->event_cond); + + g_mutex_init (&pad->priv->flush_lock); + g_mutex_init (&pad->priv->lock); + + gst_aggregator_pad_reset_unlocked (pad); + pad->priv->negotiated = FALSE; +} + +/* Must be called with the PAD_LOCK held */ +static void +gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) +{ + pad->priv->num_buffers--; + GST_TRACE_OBJECT (pad, "Consuming buffer"); + PAD_BROADCAST_EVENT (pad); +} + +/* Must be called with the PAD_LOCK held */ +static void +gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) +{ + GstAggregator *self = NULL; + GstAggregatorClass *aggclass = NULL; + GstBuffer *buffer = NULL; + + while (pad->priv->clipped_buffer == NULL && + GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + buffer = g_queue_pop_tail (&pad->priv->data); + + apply_buffer (pad, buffer, FALSE); + + /* We only take the parent here so that it's not taken if the buffer is + * already clipped or if the queue is empty. + */ + if (self == NULL) { + self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad))); + if (self == NULL) { + gst_buffer_unref (buffer); + return; + } + + aggclass = GST_AGGREGATOR_GET_CLASS (self); + } + + if (aggclass->clip) { + GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer); + + buffer = aggclass->clip (self, pad, buffer); + + if (buffer == NULL) { + gst_aggregator_pad_buffer_consumed (pad); + GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); + } + } + + pad->priv->clipped_buffer = buffer; + } + + if (self) + gst_object_unref (self); +} + +/** + * gst_aggregator_pad_steal_buffer: + * @pad: the pad to get buffer from + * + * Steal the ref to the buffer currently queued in @pad. + * + * Returns: (transfer full): The buffer in @pad or NULL if no buffer was + * queued. You should unref the buffer after usage. + */ +GstBuffer * +gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) +{ + GstBuffer *buffer; + + PAD_LOCK (pad); + + gst_aggregator_pad_clip_buffer_unlocked (pad); + + buffer = pad->priv->clipped_buffer; + + if (buffer) { + pad->priv->clipped_buffer = NULL; + gst_aggregator_pad_buffer_consumed (pad); + GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); + } + + PAD_UNLOCK (pad); + + return buffer; +} + +/** + * gst_aggregator_pad_drop_buffer: + * @pad: the pad where to drop any pending buffer + * + * Drop the buffer currently queued in @pad. + * + * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not. + */ +gboolean +gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) +{ + GstBuffer *buf; + + buf = gst_aggregator_pad_steal_buffer (pad); + + if (buf == NULL) + return FALSE; + + gst_buffer_unref (buf); + return TRUE; +} + +/** + * gst_aggregator_pad_get_buffer: + * @pad: the pad to get buffer from + * + * Returns: (transfer full): A reference to the buffer in @pad or + * NULL if no buffer was queued. You should unref the buffer after + * usage. + */ +GstBuffer * +gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) +{ + GstBuffer *buffer; + + PAD_LOCK (pad); + + gst_aggregator_pad_clip_buffer_unlocked (pad); + + if (pad->priv->clipped_buffer) { + buffer = gst_buffer_ref (pad->priv->clipped_buffer); + } else { + buffer = NULL; + } + PAD_UNLOCK (pad); + + return buffer; +} + +/** + * gst_aggregator_pad_is_eos: + * @pad: an aggregator pad + * + * Returns: %TRUE if the pad is EOS, otherwise %FALSE. + */ +gboolean +gst_aggregator_pad_is_eos (GstAggregatorPad * pad) +{ + gboolean is_eos; + + PAD_LOCK (pad); + is_eos = pad->priv->eos; + PAD_UNLOCK (pad); + + return is_eos; +} + +/** + * gst_aggregator_merge_tags: + * @self: a #GstAggregator + * @tags: a #GstTagList to merge + * @mode: the #GstTagMergeMode to use + * + * Adds tags to so-called pending tags, which will be processed + * before pushing out data downstream. + * + * Note that this is provided for convenience, and the subclass is + * not required to use this and can still do tag handling on its own. + * + * MT safe. + */ +void +gst_aggregator_merge_tags (GstAggregator * self, + const GstTagList * tags, GstTagMergeMode mode) +{ + GstTagList *otags; + + g_return_if_fail (GST_IS_AGGREGATOR (self)); + g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags)); + + /* FIXME Check if we can use OBJECT lock here! */ + GST_OBJECT_LOCK (self); + if (tags) + GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags); + otags = self->priv->tags; + self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode); + if (otags) + gst_tag_list_unref (otags); + self->priv->tags_changed = TRUE; + GST_OBJECT_UNLOCK (self); +} + +/** + * gst_aggregator_set_latency: + * @self: a #GstAggregator + * @min_latency: minimum latency + * @max_latency: maximum latency + * + * Lets #GstAggregator sub-classes tell the baseclass what their internal + * latency is. Will also post a LATENCY message on the bus so the pipeline + * can reconfigure its global latency. + */ +void +gst_aggregator_set_latency (GstAggregator * self, + GstClockTime min_latency, GstClockTime max_latency) +{ + gboolean changed = FALSE; + + g_return_if_fail (GST_IS_AGGREGATOR (self)); + g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency)); + g_return_if_fail (max_latency >= min_latency); + + SRC_LOCK (self); + if (self->priv->sub_latency_min != min_latency) { + self->priv->sub_latency_min = min_latency; + changed = TRUE; + } + if (self->priv->sub_latency_max != max_latency) { + self->priv->sub_latency_max = max_latency; + changed = TRUE; + } + + if (changed) + SRC_BROADCAST (self); + SRC_UNLOCK (self); + + if (changed) { + gst_element_post_message (GST_ELEMENT_CAST (self), + gst_message_new_latency (GST_OBJECT_CAST (self))); + } +} + +/** + * gst_aggregator_get_buffer_pool: + * @self: a #GstAggregator + * + * Returns: (transfer full): the instance of the #GstBufferPool used + * by @trans; free it after use it + */ +GstBufferPool * +gst_aggregator_get_buffer_pool (GstAggregator * self) +{ + GstBufferPool *pool; + + g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL); + + GST_OBJECT_LOCK (self); + pool = self->priv->pool; + if (pool) + gst_object_ref (pool); + GST_OBJECT_UNLOCK (self); + + return pool; +} + +/** + * gst_aggregator_get_allocator: + * @self: a #GstAggregator + * @allocator: (out) (allow-none) (transfer full): the #GstAllocator + * used + * @params: (out) (allow-none) (transfer full): the + * #GstAllocationParams of @allocator + * + * Lets #GstAggregator sub-classes get the memory @allocator + * acquired by the base class and its @params. + * + * Unref the @allocator after use it. + */ +void +gst_aggregator_get_allocator (GstAggregator * self, + GstAllocator ** allocator, GstAllocationParams * params) +{ + g_return_if_fail (GST_IS_AGGREGATOR (self)); + + if (allocator) + *allocator = self->priv->allocator ? + gst_object_ref (self->priv->allocator) : NULL; + + if (params) + *params = self->priv->allocation_params; +} diff --git a/libs/gst/base/gstaggregator.h b/libs/gst/base/gstaggregator.h new file mode 100644 index 000000000..fc2d6aeae --- /dev/null +++ b/libs/gst/base/gstaggregator.h @@ -0,0 +1,336 @@ +/* GStreamer aggregator base class + * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@oencreed.com> + * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_AGGREGATOR_H__ +#define __GST_AGGREGATOR_H__ + +#ifndef GST_USE_UNSTABLE_API +#warning "The Base library from gst-plugins-bad is unstable API and may change in future." +#warning "You can define GST_USE_UNSTABLE_API to avoid this warning." +#endif + +#include <gst/gst.h> + +G_BEGIN_DECLS + +/************************** + * GstAggregator Structs * + *************************/ + +typedef struct _GstAggregator GstAggregator; +typedef struct _GstAggregatorPrivate GstAggregatorPrivate; +typedef struct _GstAggregatorClass GstAggregatorClass; + +/************************ + * GstAggregatorPad API * + ***********************/ + +#define GST_TYPE_AGGREGATOR_PAD (gst_aggregator_pad_get_type()) +#define GST_AGGREGATOR_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_AGGREGATOR_PAD, GstAggregatorPad)) +#define GST_AGGREGATOR_PAD_CAST(obj) ((GstAggregatorPad *)(obj)) +#define GST_AGGREGATOR_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadClass)) +#define GST_AGGREGATOR_PAD_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadClass)) +#define GST_IS_AGGREGATOR_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_AGGREGATOR_PAD)) +#define GST_IS_AGGREGATOR_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_AGGREGATOR_PAD)) + +/**************************** + * GstAggregatorPad Structs * + ***************************/ + +typedef struct _GstAggregatorPad GstAggregatorPad; +typedef struct _GstAggregatorPadClass GstAggregatorPadClass; +typedef struct _GstAggregatorPadPrivate GstAggregatorPadPrivate; + +/** + * GstAggregatorPad: + * @segment: last segment received. + * + * The implementation the GstPad to use with #GstAggregator + */ +struct _GstAggregatorPad +{ + GstPad parent; + + /* Protected by the OBJECT_LOCK */ + GstSegment segment; + + /* < Private > */ + GstAggregatorPadPrivate * priv; + + gpointer _gst_reserved[GST_PADDING]; +}; + +/** + * GstAggregatorPadClass: + * @flush: Optional + * Called when the pad has received a flush stop, this is the place + * to flush any information specific to the pad, it allows for individual + * pads to be flushed while others might not be. + * + */ +struct _GstAggregatorPadClass +{ + GstPadClass parent_class; + + GstFlowReturn (*flush) (GstAggregatorPad * aggpad, GstAggregator * aggregator); + + /*< private >*/ + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +GST_EXPORT +GType gst_aggregator_pad_get_type (void); + +/**************************** + * GstAggregatorPad methods * + ***************************/ + +GST_EXPORT +GstBuffer * gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad); + +GST_EXPORT +GstBuffer * gst_aggregator_pad_get_buffer (GstAggregatorPad * pad); + +GST_EXPORT +gboolean gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad); + +GST_EXPORT +gboolean gst_aggregator_pad_is_eos (GstAggregatorPad * pad); + +/********************* + * GstAggregator API * + ********************/ + +#define GST_TYPE_AGGREGATOR (gst_aggregator_get_type()) +#define GST_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_AGGREGATOR,GstAggregator)) +#define GST_AGGREGATOR_CAST(obj) ((GstAggregator *)(obj)) +#define GST_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_AGGREGATOR,GstAggregatorClass)) +#define GST_AGGREGATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_AGGREGATOR,GstAggregatorClass)) +#define GST_IS_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_AGGREGATOR)) +#define GST_IS_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_AGGREGATOR)) + +#define GST_AGGREGATOR_FLOW_NEED_DATA GST_FLOW_CUSTOM_ERROR + +/** + * GstAggregator: + * @srcpad: the aggregator's source pad + * @segment: the output segment + * + * Aggregator base class object structure. + */ +struct _GstAggregator +{ + GstElement parent; + + GstPad * srcpad; + + /* Only access with the object lock held */ + GstSegment segment; + + /*< private >*/ + GstAggregatorPrivate * priv; + + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +/** + * GstAggregatorClass: + * @flush: Optional. + * Called after a successful flushing seek, once all the flush + * stops have been received. Flush pad-specific data in + * #GstAggregatorPad->flush. + * @clip: Optional. + * Called when a buffer is received on a sink pad, the task of + * clipping it and translating it to the current segment falls + * on the subclass. The function should use the segment of data + * and the negotiated media type on the pad to perform + * clipping of input buffer. This function takes ownership of + * buf and should output a buffer or return NULL in + * if the buffer should be dropped. + * @sink_event: Optional. + * Called when an event is received on a sink pad, the subclass + * should always chain up. + * @sink_query: Optional. + * Called when a query is received on a sink pad, the subclass + * should always chain up. + * @src_event: Optional. + * Called when an event is received on the src pad, the subclass + * should always chain up. + * @src_query: Optional. + * Called when a query is received on the src pad, the subclass + * should always chain up. + * @src_activate: Optional. + * Called when the src pad is activated, it will start/stop its + * pad task right after that call. + * @aggregate: Mandatory. + * Called when buffers are queued on all sinkpads. Classes + * should iterate the GstElement->sinkpads and peek or steal + * buffers from the #GstAggregatorPads. If the subclass returns + * GST_FLOW_EOS, sending of the eos event will be taken care + * of. Once / if a buffer has been constructed from the + * aggregated buffers, the subclass should call _finish_buffer. + * @stop: Optional. + * Called when the element goes from PAUSED to READY. + * The subclass should free all resources and reset its state. + * @start: Optional. + * Called when the element goes from READY to PAUSED. + * The subclass should get ready to process + * aggregated buffers. + * @get_next_time: Optional. + * Called when the element needs to know the running time of the next + * rendered buffer for live pipelines. This causes deadline + * based aggregation to occur. Defaults to returning + * GST_CLOCK_TIME_NONE causing the element to wait for buffers + * on all sink pads before aggregating. + * @update_src_caps: Lets subclasses update the #GstCaps representing + * the src pad caps before usage. The result should end up + * in @ret. Return %GST_AGGREGATOR_FLOW_NEED_DATA to indicate that the + * element needs more information (caps, a buffer, etc) to + * choose the correct caps. Should return ANY caps if the + * stream has not caps at all. + * @fixate_src_caps: Optional. + * Fixate and return the src pad caps provided. The function takes + * ownership of @caps and returns a fixated version of + * @caps. @caps is not guaranteed to be writable. + * @negotiated_src_caps: Optional. + * Notifies subclasses what caps format has been negotiated + * @decide_allocation: Optional. + * Allows the subclass to influence the allocation choices. + * Setup the allocation parameters for allocating output + * buffers. The passed in query contains the result of the + * downstream allocation query. + * + * The aggregator base class will handle in a thread-safe way all manners of + * concurrent flushes, seeks, pad additions and removals, leaving to the + * subclass the responsibility of clipping buffers, and aggregating buffers in + * the way the implementor sees fit. + * + * It will also take care of event ordering (stream-start, segment, eos). + * + * Basically, a simple implementation will override @aggregate, and call + * _finish_buffer from inside that function. + */ +struct _GstAggregatorClass { + GstElementClass parent_class; + + GstFlowReturn (*flush) (GstAggregator * aggregator); + + GstBuffer * (*clip) (GstAggregator * aggregator, + GstAggregatorPad * aggregator_pad, + GstBuffer * buf); + + /* sinkpads virtual methods */ + gboolean (*sink_event) (GstAggregator * aggregator, + GstAggregatorPad * aggregator_pad, + GstEvent * event); + + gboolean (*sink_query) (GstAggregator * aggregator, + GstAggregatorPad * aggregator_pad, + GstQuery * query); + + /* srcpad virtual methods */ + gboolean (*src_event) (GstAggregator * aggregator, + GstEvent * event); + + gboolean (*src_query) (GstAggregator * aggregator, + GstQuery * query); + + gboolean (*src_activate) (GstAggregator * aggregator, + GstPadMode mode, + gboolean active); + + GstFlowReturn (*aggregate) (GstAggregator * aggregator, + gboolean timeout); + + gboolean (*stop) (GstAggregator * aggregator); + + gboolean (*start) (GstAggregator * aggregator); + + GstClockTime (*get_next_time) (GstAggregator * aggregator); + + GstAggregatorPad * (*create_new_pad) (GstAggregator * self, + GstPadTemplate * templ, + const gchar * req_name, + const GstCaps * caps); + GstFlowReturn (*update_src_caps) (GstAggregator * self, + GstCaps * caps, + GstCaps ** ret); + GstCaps * (*fixate_src_caps) (GstAggregator * self, + GstCaps * caps); + gboolean (*negotiated_src_caps) (GstAggregator * self, + GstCaps * caps); + gboolean (*decide_allocation) (GstAggregator * self, + GstQuery * query); + gboolean (*propose_allocation) (GstAggregator * self, + GstAggregatorPad * pad, + GstQuery * decide_query, + GstQuery * query); + /*< private >*/ + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +/************************************ + * GstAggregator convenience macros * + ***********************************/ + +/** + * GST_AGGREGATOR_SRC_PAD: + * @agg: a #GstAggregator + * + * Convenience macro to access the source pad of #GstAggregator + * + * Since: 1.6 + */ +#define GST_AGGREGATOR_SRC_PAD(agg) (((GstAggregator *)(agg))->srcpad) + +/************************* + * GstAggregator methods * + ************************/ + +GST_EXPORT +GstFlowReturn gst_aggregator_finish_buffer (GstAggregator * self, + GstBuffer * buffer); + +GST_EXPORT +void gst_aggregator_set_src_caps (GstAggregator * self, + GstCaps * caps); + +GST_EXPORT +void gst_aggregator_set_latency (GstAggregator * self, + GstClockTime min_latency, + GstClockTime max_latency); + +GST_EXPORT +GType gst_aggregator_get_type(void); + +GST_EXPORT +GstClockTime gst_aggregator_get_latency (GstAggregator * self); + +GST_EXPORT +GstBufferPool * gst_aggregator_get_buffer_pool (GstAggregator * self); + +GST_EXPORT +void gst_aggregator_get_allocator (GstAggregator * self, + GstAllocator ** allocator, + GstAllocationParams * params); + +G_END_DECLS + +#endif /* __GST_AGGREGATOR_H__ */ diff --git a/tests/check/libs/aggregator.c b/tests/check/libs/aggregator.c new file mode 100644 index 000000000..06ef40311 --- /dev/null +++ b/tests/check/libs/aggregator.c @@ -0,0 +1,1276 @@ +/* + * aggregator.c - GstAggregator testsuite + * Copyright (C) 2006 Alessandro Decina <alessandro.d@gmail.com> + * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@oencreed.com> + * Copyright (C) 2014 Thibault Saunier <tsaunier@opencreed.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <gst/check/gstcheck.h> +#include <gst/base/gstaggregator.h> + +/* dummy aggregator based element */ + +#define GST_TYPE_TEST_AGGREGATOR (gst_test_aggregator_get_type ()) +#define GST_TEST_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEST_AGGREGATOR, GstTestAggregator)) +#define GST_TEST_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEST_AGGREGATOR, GstTestAggregatorClass)) +#define GST_TEST_AGGREGATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_TEST_AGGREGATOR, GstTestAggregatorClass)) + +#define fail_error_message(msg) \ + G_STMT_START { \ + GError *error; \ + gst_message_parse_error(msg, &error, NULL); \ + fail_unless(FALSE, "Error Message from %s : %s", \ + GST_OBJECT_NAME (GST_MESSAGE_SRC(msg)), error->message); \ + g_error_free (error); \ + } G_STMT_END; + +typedef struct _GstTestAggregator GstTestAggregator; +typedef struct _GstTestAggregatorClass GstTestAggregatorClass; + +static GType gst_test_aggregator_get_type (void); + +#define BUFFER_DURATION 100000000 /* 10 frames per second */ +#define TEST_GAP_PTS 0 +#define TEST_GAP_DURATION (5 * GST_SECOND) + +struct _GstTestAggregator +{ + GstAggregator parent; + + guint64 timestamp; + gboolean gap_expected; +}; + +struct _GstTestAggregatorClass +{ + GstAggregatorClass parent_class; +}; + +static GstFlowReturn +gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout) +{ + GstIterator *iter; + gboolean all_eos = TRUE; + GstTestAggregator *testagg; + GstBuffer *buf; + + gboolean done_iterating = FALSE; + + testagg = GST_TEST_AGGREGATOR (aggregator); + + iter = gst_element_iterate_sink_pads (GST_ELEMENT (testagg)); + while (!done_iterating) { + GValue value = { 0, }; + GstAggregatorPad *pad; + + switch (gst_iterator_next (iter, &value)) { + case GST_ITERATOR_OK: + pad = g_value_get_object (&value); + + if (gst_aggregator_pad_is_eos (pad) == FALSE) + all_eos = FALSE; + + if (testagg->gap_expected == TRUE) { + buf = gst_aggregator_pad_get_buffer (pad); + fail_unless (buf); + fail_unless (GST_BUFFER_PTS (buf) == TEST_GAP_PTS); + fail_unless (GST_BUFFER_DURATION (buf) == TEST_GAP_DURATION); + fail_unless (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_GAP)); + fail_unless (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DROPPABLE)); + gst_buffer_unref (buf); + testagg->gap_expected = FALSE; + } + + gst_aggregator_pad_drop_buffer (pad); + + g_value_reset (&value); + break; + case GST_ITERATOR_RESYNC: + gst_iterator_resync (iter); + break; + case GST_ITERATOR_ERROR: + GST_WARNING_OBJECT (testagg, "Sinkpads iteration error"); + done_iterating = TRUE; + break; + case GST_ITERATOR_DONE: + done_iterating = TRUE; + break; + } + } + gst_iterator_free (iter); + + if (all_eos == TRUE) { + GST_INFO_OBJECT (testagg, "no data available, must be EOS"); + gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ()); + return GST_FLOW_EOS; + } + + buf = gst_buffer_new (); + GST_BUFFER_TIMESTAMP (buf) = testagg->timestamp; + GST_BUFFER_DURATION (buf) = BUFFER_DURATION; + testagg->timestamp += BUFFER_DURATION; + + gst_aggregator_finish_buffer (aggregator, buf); + + /* We just check finish_frame return FLOW_OK */ + return GST_FLOW_OK; +} + +#define gst_test_aggregator_parent_class parent_class +G_DEFINE_TYPE (GstTestAggregator, gst_test_aggregator, GST_TYPE_AGGREGATOR); + +static void +gst_test_aggregator_class_init (GstTestAggregatorClass * klass) +{ + GstElementClass *gstelement_class = (GstElementClass *) klass; + GstAggregatorClass *base_aggregator_class = (GstAggregatorClass *) klass; + + static GstStaticPadTemplate _src_template = + GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + + static GstStaticPadTemplate _sink_template = + GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST, + GST_STATIC_CAPS_ANY); + + gst_element_class_add_static_pad_template (gstelement_class, &_src_template); + + gst_element_class_add_static_pad_template (gstelement_class, &_sink_template); + + gst_element_class_set_static_metadata (gstelement_class, "Aggregator", + "Testing", "Combine N buffers", "Stefan Sauer <ensonic@users.sf.net>"); + + base_aggregator_class->aggregate = + GST_DEBUG_FUNCPTR (gst_test_aggregator_aggregate); +} + +static void +gst_test_aggregator_init (GstTestAggregator * self) +{ + GstAggregator *agg = GST_AGGREGATOR (self); + gst_segment_init (&agg->segment, GST_FORMAT_TIME); + self->timestamp = 0; + self->gap_expected = FALSE; +} + +static gboolean +gst_test_aggregator_plugin_init (GstPlugin * plugin) +{ + return gst_element_register (plugin, "testaggregator", GST_RANK_NONE, + GST_TYPE_TEST_AGGREGATOR); +} + +static gboolean +gst_test_aggregator_plugin_register (void) +{ + return gst_plugin_register_static (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "testaggregator", + "Combine buffers", + gst_test_aggregator_plugin_init, + VERSION, GST_LICENSE, PACKAGE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN); +} + +/* test helpers */ + +typedef struct +{ + GQueue *queue; + GstElement *aggregator; + GstPad *sinkpad, *srcpad; + GstFlowReturn expected_result; + + /* ------------------ + * ----------- --------|-- | + * | srcpad | -- | sinkpad | aggregator | + * ----------- --------|-- | + * ------------------ + * This is for 1 Chain, we can have several + */ +} ChainData; + +typedef struct +{ + GMainLoop *ml; + GstPad *srcpad, /* srcpad of the GstAggregator */ + *sinkpad; /* fake sinkpad to which GstAggregator.srcpad is linked */ + guint timeout_id; + GstElement *aggregator; + + /* -----------------| + * | ---------- ----------- + * | aggregator | srcpad | -- | sinkpad | + * | ---------- ----------- + * -----------------| + */ + + gint flush_start_events, flush_stop_events; +} TestData; + +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +static void +start_flow (ChainData * chain_data) +{ + GstSegment segment; + GstCaps *caps; + + gst_pad_push_event (chain_data->srcpad, gst_event_new_stream_start ("test")); + + caps = gst_caps_new_empty_simple ("foo/x-bar"); + gst_pad_push_event (chain_data->srcpad, gst_event_new_caps (caps)); + gst_caps_unref (caps); + + gst_segment_init (&segment, GST_FORMAT_TIME); + gst_pad_push_event (chain_data->srcpad, gst_event_new_segment (&segment)); +} + +static gpointer +push_data (gpointer user_data) +{ + ChainData *chain_data = (ChainData *) user_data; + GstTestAggregator *aggregator = (GstTestAggregator *) chain_data->aggregator; + GstPad *sinkpad = chain_data->sinkpad; + GstPad *srcpad = chain_data->srcpad; + gpointer data; + + start_flow (chain_data); + + while ((data = g_queue_pop_head (chain_data->queue))) { + GST_DEBUG_OBJECT (sinkpad, "Pushing %" GST_PTR_FORMAT, data); + + /* switch on the data type and push */ + if (GST_IS_BUFFER (data)) { + GstFlowReturn flow = gst_pad_push (srcpad, GST_BUFFER_CAST (data)); + fail_unless (flow == chain_data->expected_result, + "got flow %s instead of %s on %s:%s", gst_flow_get_name (flow), + gst_flow_get_name (chain_data->expected_result), + GST_DEBUG_PAD_NAME (sinkpad)); + } else if (GST_IS_EVENT (data)) { + switch (GST_EVENT_TYPE (data)) { + case GST_EVENT_GAP: + aggregator->gap_expected = TRUE; + break; + default: + break; + } + fail_unless (gst_pad_push_event (srcpad, GST_EVENT_CAST (data))); + } else if (GST_IS_QUERY (data)) { + /* we don't care whether the query actualy got handled */ + gst_pad_peer_query (srcpad, GST_QUERY_CAST (data)); + gst_query_unref (GST_QUERY_CAST (data)); + } else { + GST_WARNING_OBJECT (sinkpad, "bad queue entry: %" GST_PTR_FORMAT, data); + } + } + GST_DEBUG_OBJECT (sinkpad, "All data from queue sent"); + + return NULL; +} + +static gboolean +_aggregate_timeout (GMainLoop * ml) +{ + g_main_loop_quit (ml); + + fail_unless ("No buffer found on aggregator.srcpad -> TIMEOUT" == NULL); + + return FALSE; +} + +static gboolean +_quit (GMainLoop * ml) +{ + GST_DEBUG ("QUITING ML"); + g_main_loop_quit (ml); + + return G_SOURCE_REMOVE; +} + +static GstPadProbeReturn +_aggregated_cb (GstPad * pad, GstPadProbeInfo * info, GMainLoop * ml) +{ + GST_DEBUG ("Received data %" GST_PTR_FORMAT, info->data); + GST_DEBUG ("Should quit ML"); + g_idle_add ((GSourceFunc) _quit, ml); + + return GST_PAD_PROBE_REMOVE; +} + +static GstPadProbeReturn +_downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, TestData * test) +{ + GST_DEBUG ("PROBING "); + if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) { + if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) == + GST_EVENT_FLUSH_START) { + + g_atomic_int_inc (&test->flush_start_events); + GST_DEBUG ("==========> FLUSH: %i", test->flush_start_events); + } else if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) == + GST_EVENT_FLUSH_STOP) + g_atomic_int_inc (&test->flush_stop_events); + } + return GST_PAD_PROBE_OK; +} + +/* + * Not thread safe, will create a new ChainData which contains + * an activated src pad linked to a requested sink pad of @agg, and + * a newly allocated buffer ready to be pushed. Caller needs to + * clear with _chain_data_clear after. + */ +static void +_chain_data_init (ChainData * data, GstElement * agg, ...) +{ + static gint num_src_pads = 0; + gchar *pad_name = g_strdup_printf ("src%d", num_src_pads); + va_list var_args; + gpointer d; + + num_src_pads += 1; + + data->srcpad = gst_pad_new_from_static_template (&srctemplate, pad_name); + g_free (pad_name); + gst_pad_set_active (data->srcpad, TRUE); + data->aggregator = agg; + data->sinkpad = gst_element_get_request_pad (agg, "sink_%u"); + fail_unless (GST_IS_PAD (data->sinkpad)); + fail_unless (gst_pad_link (data->srcpad, data->sinkpad) == GST_PAD_LINK_OK); + + /* add data items */ + data->queue = g_queue_new (); + va_start (var_args, agg); + while (TRUE) { + if (!(d = va_arg (var_args, gpointer))) + break; + g_queue_push_tail (data->queue, d); + GST_DEBUG_OBJECT (data->sinkpad, "Adding to queue: %" GST_PTR_FORMAT, d); + } + va_end (var_args); +} + +static void +_chain_data_clear (ChainData * chain_data) +{ + gpointer data; + + while ((data = g_queue_pop_head (chain_data->queue))) { + /* switch on the data type and free */ + if (GST_IS_BUFFER (data)) { + gst_buffer_unref (GST_BUFFER_CAST (data)); + } else if (GST_IS_EVENT (data)) { + gst_event_unref (GST_EVENT_CAST (data)); + } else if (GST_IS_QUERY (data)) { + gst_query_unref (GST_QUERY_CAST (data)); + } else { + GST_WARNING_OBJECT (chain_data->sinkpad, "bad queue entry: %" + GST_PTR_FORMAT, data); + } + } + g_queue_free (chain_data->queue); + + if (chain_data->srcpad) + gst_object_unref (chain_data->srcpad); + if (chain_data->sinkpad) + gst_object_unref (chain_data->sinkpad); +} + +static GstFlowReturn +_test_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) +{ + /* accept any buffers */ + gst_buffer_unref (buffer); + return GST_FLOW_OK; +} + +static void +_test_data_init (TestData * test, gboolean needs_flushing) +{ + test->aggregator = gst_element_factory_make ("testaggregator", NULL); + gst_element_set_state (test->aggregator, GST_STATE_PLAYING); + test->ml = g_main_loop_new (NULL, TRUE); + test->srcpad = GST_AGGREGATOR (test->aggregator)->srcpad; + + GST_DEBUG_OBJECT (test->srcpad, "Init test data for srcpad"); + + if (needs_flushing) { + static gint num_sink_pads = 0; + gchar *pad_name = g_strdup_printf ("sink%d", num_sink_pads); + + num_sink_pads += 1; + test->sinkpad = gst_pad_new_from_static_template (&sinktemplate, pad_name); + gst_pad_set_chain_function (test->sinkpad, _test_chain); + gst_pad_set_active (test->sinkpad, TRUE); + g_free (pad_name); + fail_unless (gst_pad_link (test->srcpad, test->sinkpad) == GST_PAD_LINK_OK); + gst_pad_add_probe (test->srcpad, GST_PAD_PROBE_TYPE_EVENT_FLUSH, + (GstPadProbeCallback) _downstream_probe_cb, test, NULL); + } else { + gst_pad_add_probe (test->srcpad, GST_PAD_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) _aggregated_cb, test->ml, NULL); + } + + test->timeout_id = + g_timeout_add (1000, (GSourceFunc) _aggregate_timeout, test->ml); +} + +static void +_test_data_clear (TestData * test) +{ + gst_element_set_state (test->aggregator, GST_STATE_NULL); + gst_object_unref (test->aggregator); + + if (test->sinkpad) + gst_object_unref (test->sinkpad); + + g_main_loop_unref (test->ml); +} + +/* tests */ + +GST_START_TEST (test_aggregate) +{ + GThread *thread1, *thread2; + ChainData data1 = { 0, }; + ChainData data2 = { 0, }; + TestData test = { 0, }; + + _test_data_init (&test, FALSE); + _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL); + _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL); + + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); + + g_main_loop_run (test.ml); + g_source_remove (test.timeout_id); + + /* these will return immediately as when the data is popped the threads are + * unlocked and will terminate */ + g_thread_join (thread1); + g_thread_join (thread2); + + _chain_data_clear (&data1); + _chain_data_clear (&data2); + _test_data_clear (&test); +} + +GST_END_TEST; + +GST_START_TEST (test_aggregate_eos) +{ + GThread *thread1, *thread2; + ChainData data1 = { 0, }; + ChainData data2 = { 0, }; + TestData test = { 0, }; + + _test_data_init (&test, FALSE); + _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL); + _chain_data_init (&data2, test.aggregator, gst_event_new_eos (), NULL); + + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); + + g_main_loop_run (test.ml); + g_source_remove (test.timeout_id); + + /* these will return immediately as when the data is popped the threads are + * unlocked and will terminate */ + g_thread_join (thread1); + g_thread_join (thread2); + + _chain_data_clear (&data1); + _chain_data_clear (&data2); + _test_data_clear (&test); +} + +GST_END_TEST; + +GST_START_TEST (test_aggregate_gap) +{ + GThread *thread; + ChainData data = { 0, }; + TestData test = { 0, }; + + _test_data_init (&test, FALSE); + _chain_data_init (&data, test.aggregator, + gst_event_new_gap (TEST_GAP_PTS, TEST_GAP_DURATION), NULL); + + thread = g_thread_try_new ("gst-check", push_data, &data, NULL); + + g_main_loop_run (test.ml); + g_source_remove (test.timeout_id); + + /* these will return immediately as when the data is popped the threads are + * unlocked and will terminate */ + g_thread_join (thread); + + _chain_data_clear (&data); + _test_data_clear (&test); +} + +GST_END_TEST; + +GST_START_TEST (test_aggregate_handle_events) +{ + GThread *thread1, *thread2; + ChainData data1 = { 0, }; + ChainData data2 = { 0, }; + TestData test = { 0, }; + + _test_data_init (&test, FALSE); + _chain_data_init (&data1, test.aggregator, + gst_event_new_tag (gst_tag_list_new_empty ()), gst_buffer_new (), NULL); + _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL); + + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); + + g_main_loop_run (test.ml); + g_source_remove (test.timeout_id); + + /* these will return immediately as when the data is popped the threads are + * unlocked and will terminate */ + g_thread_join (thread1); + g_thread_join (thread2); + + _chain_data_clear (&data1); + _chain_data_clear (&data2); + _test_data_clear (&test); +} + +GST_END_TEST; + +GST_START_TEST (test_aggregate_handle_queries) +{ + GThread *thread1, *thread2; + ChainData data1 = { 0, }; + ChainData data2 = { 0, }; + TestData test = { 0, }; + + _test_data_init (&test, FALSE); + _chain_data_init (&data1, test.aggregator, + gst_query_new_allocation (gst_caps_new_empty_simple ("foo/x-bar"), FALSE), + gst_buffer_new (), NULL); + _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL); + + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); + + g_main_loop_run (test.ml); + g_source_remove (test.timeout_id); + + /* these will return immediately as when the data is popped the threads are + * unlocked and will terminate */ + g_thread_join (thread1); + g_thread_join (thread2); + + _chain_data_clear (&data1); + _chain_data_clear (&data2); + _test_data_clear (&test); +} + +GST_END_TEST; + +#define NUM_BUFFERS 3 +static void +handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count) +{ + *count = *count + 1; + GST_DEBUG ("HANDOFF: %i", *count); +} + +/* Test a linear pipeline using aggregator */ +GST_START_TEST (test_linear_pipeline) +{ + GstBus *bus; + GstMessage *msg; + GstElement *pipeline, *src, *agg, *sink; + gint count = 0; + + pipeline = gst_pipeline_new ("pipeline"); + src = gst_check_setup_element ("fakesrc"); + g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4, + NULL); + agg = gst_check_setup_element ("testaggregator"); + sink = gst_check_setup_element ("fakesink"); + g_object_set (sink, "signal-handoffs", TRUE, NULL); + g_signal_connect (sink, "handoff", (GCallback) handoff, &count); + + fail_unless (gst_bin_add (GST_BIN (pipeline), src)); + fail_unless (gst_bin_add (GST_BIN (pipeline), agg)); + fail_unless (gst_bin_add (GST_BIN (pipeline), sink)); + fail_unless (gst_element_link (src, agg)); + fail_unless (gst_element_link (agg, sink)); + + bus = gst_element_get_bus (pipeline); + fail_if (bus == NULL); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); + fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS); + gst_message_unref (msg); + + fail_unless_equals_int (count, NUM_BUFFERS); + + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (bus); + gst_object_unref (pipeline); +} + +GST_END_TEST; + +GST_START_TEST (test_two_src_pipeline) +{ + GstBus *bus; + GstMessage *msg; + GstElement *pipeline, *src, *src1, *agg, *sink; + gint count = 0; + + pipeline = gst_pipeline_new ("pipeline"); + src = gst_element_factory_make ("fakesrc", NULL); + g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4, + NULL); + + src1 = gst_element_factory_make ("fakesrc", NULL); + g_object_set (src1, "num-buffers", NUM_BUFFERS + 1, "sizetype", 2, "sizemax", + 4, NULL); + + agg = gst_check_setup_element ("testaggregator"); + sink = gst_check_setup_element ("fakesink"); + g_object_set (sink, "signal-handoffs", TRUE, NULL); + g_signal_connect (sink, "handoff", (GCallback) handoff, &count); + + fail_unless (gst_bin_add (GST_BIN (pipeline), src)); + fail_unless (gst_bin_add (GST_BIN (pipeline), src1)); + fail_unless (gst_bin_add (GST_BIN (pipeline), agg)); + fail_unless (gst_bin_add (GST_BIN (pipeline), sink)); + fail_unless (gst_element_link (src, agg)); + fail_unless (gst_element_link (src1, agg)); + fail_unless (gst_element_link (agg, sink)); + + bus = gst_element_get_bus (pipeline); + fail_if (bus == NULL); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); + fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS); + gst_message_unref (msg); + + fail_unless_equals_int (count, NUM_BUFFERS + 1); + + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (bus); + gst_object_unref (pipeline); +} + +GST_END_TEST; + +static GstPadProbeReturn +_drop_buffer_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + gint wait; + + if (GST_IS_BUFFER (info->data)) { + wait = GPOINTER_TO_INT (user_data); + if (wait > 0) + g_usleep (wait / 1000); + return GST_PAD_PROBE_DROP; + } + + return GST_PAD_PROBE_PASS; +} + +#define TIMEOUT_NUM_BUFFERS 20 +static void +_test_timeout (gint buffer_wait) +{ + GstBus *bus; + GstMessage *msg; + GstElement *pipeline, *src, *src1, *agg, *sink; + GstPad *src1pad; + gint count = 0; + + pipeline = gst_pipeline_new ("pipeline"); + src = gst_element_factory_make ("fakesrc", NULL); + g_object_set (src, "num-buffers", TIMEOUT_NUM_BUFFERS, "sizetype", 2, + "sizemax", 4, "is-live", TRUE, "datarate", 4000, NULL); + + src1 = gst_element_factory_make ("fakesrc", NULL); + g_object_set (src1, "num-buffers", TIMEOUT_NUM_BUFFERS, "sizetype", 2, + "sizemax", 4, "is-live", TRUE, "datarate", 4000, NULL); + + agg = gst_check_setup_element ("testaggregator"); + g_object_set (agg, "latency", GST_USECOND, NULL); + sink = gst_check_setup_element ("fakesink"); + g_object_set (sink, "signal-handoffs", TRUE, NULL); + g_signal_connect (sink, "handoff", (GCallback) handoff, &count); + + fail_unless (gst_bin_add (GST_BIN (pipeline), src)); + fail_unless (gst_bin_add (GST_BIN (pipeline), src1)); + fail_unless (gst_bin_add (GST_BIN (pipeline), agg)); + fail_unless (gst_bin_add (GST_BIN (pipeline), sink)); + + src1pad = gst_element_get_static_pad (src1, "src"); + fail_if (src1pad == NULL); + gst_pad_add_probe (src1pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, + (GstPadProbeCallback) _drop_buffer_probe_cb, + GINT_TO_POINTER (buffer_wait), NULL); + + fail_unless (gst_element_link (src, agg)); + fail_unless (gst_element_link (src1, agg)); + fail_unless (gst_element_link (agg, sink)); + + bus = gst_element_get_bus (pipeline); + fail_if (bus == NULL); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); + fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS); + gst_message_unref (msg); + + /* cannot rely on the exact number of buffers as the timeout may produce + * more buffers with the unsynchronized _aggregate() implementation in + * testaggregator */ + fail_if (count < TIMEOUT_NUM_BUFFERS); + + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (src1pad); + gst_object_unref (bus); + gst_object_unref (pipeline); +} + +GST_START_TEST (test_timeout_pipeline) +{ + _test_timeout (0); +} + +GST_END_TEST; + +GST_START_TEST (test_timeout_pipeline_with_wait) +{ + _test_timeout (1000000 /* 1 ms */ ); +} + +GST_END_TEST; + +GST_START_TEST (test_flushing_seek) +{ + GstEvent *event; + GThread *thread1, *thread2; + ChainData data1 = { 0, }; + ChainData data2 = { 0, }; + TestData test = { 0, }; + GstBuffer *buf; + + _test_data_init (&test, TRUE); + + /* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the + * new flushing seek logic is triggered. On the first FLUSH_START call the + * buffers queued in collectpads should get flushed. Only one FLUSH_START and + * one FLUSH_STOP should be forwarded downstream. + */ + _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL); + + buf = gst_buffer_new (); + GST_BUFFER_TIMESTAMP (buf) = 0; + _chain_data_init (&data2, test.aggregator, buf, NULL); + + gst_segment_init (&GST_AGGREGATOR (test.aggregator)->segment, + GST_FORMAT_TIME); + + /* now do a successful flushing seek */ + event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, + GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND); + fail_unless (gst_pad_send_event (test.srcpad, event)); + + /* flushing starts once one of the upstream elements sends the first + * FLUSH_START */ + fail_unless_equals_int (test.flush_start_events, 0); + fail_unless_equals_int (test.flush_stop_events, 0); + + /* send a first FLUSH_START on agg:sink_0, will be sent downstream */ + GST_DEBUG_OBJECT (data2.sinkpad, "send flush_start"); + fail_unless (gst_pad_push_event (data2.srcpad, gst_event_new_flush_start ())); + fail_unless_equals_int (test.flush_start_events, 1); + fail_unless_equals_int (test.flush_stop_events, 0); + + /* expect this buffer to be flushed */ + data2.expected_result = GST_FLOW_FLUSHING; + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); + + /* this should send not additional flush_start */ + GST_DEBUG_OBJECT (data1.sinkpad, "send flush_start"); + fail_unless (gst_pad_push_event (data1.srcpad, gst_event_new_flush_start ())); + fail_unless_equals_int (test.flush_start_events, 1); + fail_unless_equals_int (test.flush_stop_events, 0); + + /* the first FLUSH_STOP is not forwarded downstream */ + GST_DEBUG_OBJECT (data1.srcpad, "send flush_stop"); + fail_unless (gst_pad_push_event (data1.srcpad, + gst_event_new_flush_stop (TRUE))); + fail_unless_equals_int (test.flush_start_events, 1); + fail_unless_equals_int (test.flush_stop_events, 0); + + /* at this point even the other pad agg:sink_1 should be flushing so thread2 + * should have stopped */ + g_thread_join (thread2); + + /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify + * that flushing completes once all the pads have been flushed */ + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + + /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is + * sent downstream */ + GST_DEBUG_OBJECT (data2.srcpad, "send flush_stop"); + gst_pad_push_event (data2.srcpad, gst_event_new_flush_stop (TRUE)); + + /* and the last FLUSH_STOP is forwarded downstream */ + fail_unless_equals_int (test.flush_stop_events, 1); + + /* Check collected */ + gst_pad_add_probe (test.srcpad, GST_PAD_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) _aggregated_cb, test.ml, NULL); + + g_queue_push_tail (data2.queue, gst_event_new_eos ()); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); + + g_main_loop_run (test.ml); + g_source_remove (test.timeout_id); + + fail_unless_equals_int (test.flush_stop_events, 1); + + /* these will return immediately as at this point the threads have been + * unlocked and are finished */ + g_thread_join (thread1); + g_thread_join (thread2); + + _chain_data_clear (&data1); + _chain_data_clear (&data2); + _test_data_clear (&test); + +} + +GST_END_TEST; + +static void +infinite_seek (guint num_srcs, guint num_seeks, gboolean is_live) +{ + GstBus *bus; + GstMessage *message; + GstElement *pipeline, *src, *agg, *sink; + gint count = 0, i; + gboolean seek_res, carry_on = TRUE; + + pipeline = gst_pipeline_new ("pipeline"); + + agg = gst_check_setup_element ("testaggregator"); + sink = gst_check_setup_element ("fakesink"); + + if (is_live) + g_object_set (agg, "latency", GST_MSECOND, NULL); + + fail_unless (gst_bin_add (GST_BIN (pipeline), agg)); + fail_unless (gst_bin_add (GST_BIN (pipeline), sink)); + fail_unless (gst_element_link (agg, sink)); + + for (i = 0; i < num_srcs; i++) { + src = gst_element_factory_make ("fakesrc", NULL); + g_object_set (src, "sizetype", 2, "sizemax", 4, + "format", GST_FORMAT_TIME, "datarate", 1000, NULL); + if (is_live) + g_object_set (src, "is-live", TRUE, NULL); + fail_unless (gst_bin_add (GST_BIN (pipeline), src)); + fail_unless (gst_element_link (src, agg)); + } + + bus = gst_element_get_bus (pipeline); + fail_if (bus == NULL); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + while (count < num_seeks && carry_on) { + message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 10); + if (message) { + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_EOS: + { + /* we should check if we really finished here */ + GST_WARNING ("Got an EOS"); + carry_on = FALSE; + break; + } + case GST_MESSAGE_STATE_CHANGED: + { + GstState new; + + if (GST_MESSAGE_SRC (message) == GST_OBJECT (pipeline)) { + gst_message_parse_state_changed (message, NULL, &new, NULL); + + if (new != GST_STATE_PLAYING) + break; + + GST_INFO ("Seeking (num: %i)", count); + seek_res = + gst_element_seek_simple (sink, GST_FORMAT_TIME, + GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, 0); + GST_INFO ("seek result is : %d", seek_res); + fail_unless (seek_res != 0); + count++; + } + + break; + } + case GST_MESSAGE_ERROR: + GST_ERROR ("Error on the bus: %" GST_PTR_FORMAT, message); + carry_on = FALSE; + fail_error_message (message); + break; + default: + break; + } + gst_message_unref (message); + } + } + + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (bus); + gst_object_unref (pipeline); +} + +GST_START_TEST (test_infinite_seek) +{ + infinite_seek (2, 500, FALSE); +} + +GST_END_TEST; + +GST_START_TEST (test_infinite_seek_50_src) +{ + infinite_seek (50, 100, FALSE); +} + +GST_END_TEST; + +GST_START_TEST (test_infinite_seek_50_src_live) +{ + infinite_seek (50, 100, TRUE); +} + +GST_END_TEST; + +typedef struct +{ + GstElement *agg, *src, *pipeline; + GCond *cond; + GMutex *lock; +} RemoveElementData; + +static GstPadProbeReturn +pad_probe_cb (GstPad * pad, GstPadProbeInfo * info, RemoveElementData * data) +{ + GstPad *peer; + + GST_INFO_OBJECT (pad, "Removing pad"); + + peer = gst_pad_get_peer (pad); + gst_pad_unlink (pad, peer); + gst_element_release_request_pad (data->agg, peer); + fail_unless (gst_bin_remove (GST_BIN (data->pipeline), data->src)); + gst_object_unref (peer); + + g_mutex_lock (data->lock); + g_cond_broadcast (data->cond); + g_mutex_unlock (data->lock); + + return GST_PAD_PROBE_OK; +} + +GST_START_TEST (test_add_remove) +{ + /* Used to notify that we removed the pad from */ + GCond cond; + GMutex lock; + GstBus *bus; + GstState state; + GstMessage *message; + gboolean carry_on = TRUE; + guint num_iterations = 100; + GstPad *pad; + GstElement *pipeline, *src, *src1 = NULL, *agg, *sink; + gint count = 0; + + g_mutex_init (&lock); + g_cond_init (&cond); + + pipeline = gst_pipeline_new ("pipeline"); + + agg = gst_check_setup_element ("testaggregator"); + sink = gst_check_setup_element ("fakesink"); + + fail_unless (gst_bin_add (GST_BIN (pipeline), agg)); + fail_unless (gst_bin_add (GST_BIN (pipeline), sink)); + fail_unless (gst_element_link (agg, sink)); + + bus = gst_element_get_bus (pipeline); + while (count < num_iterations) { + + src = gst_element_factory_make ("fakesrc", NULL); + g_object_set (src, "num-buffers", 100000, "sizetype", 2, "sizemax", 4, + "format", GST_FORMAT_TIME, "datarate", 1000, NULL); + gst_element_set_locked_state (src, TRUE); + fail_unless (gst_bin_add (GST_BIN (pipeline), src)); + fail_unless (gst_element_link (src, agg)); + gst_element_set_locked_state (src, FALSE); + fail_unless (gst_element_sync_state_with_parent (src)); + + if (count == 0) + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + /* Now make sure the seek happend */ + carry_on = TRUE; + do { + message = gst_bus_timed_pop (bus, -1); + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_EOS: + { + /* we should check if we really finished here */ + GST_WARNING ("Got an EOS"); + carry_on = FALSE; + break; + } + case GST_MESSAGE_STATE_CHANGED: + { + if (GST_MESSAGE_SRC (message) == GST_OBJECT (pipeline)) { + gst_message_parse_state_changed (message, NULL, &state, NULL); + + if (state == GST_STATE_PLAYING) { + RemoveElementData data; + + carry_on = FALSE; + if (count == 0) { + GST_DEBUG ("First run, not removing any element yet"); + + break; + } + + data.src = gst_object_ref (src1); + data.agg = agg; + data.lock = &lock; + data.cond = &cond; + data.pipeline = pipeline; + pad = gst_element_get_static_pad (data.src, "src"); + + g_mutex_lock (&lock); + gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, + (GstPadProbeCallback) pad_probe_cb, &data, NULL); + GST_INFO ("Waiting for %" GST_PTR_FORMAT " %s", pad, + gst_element_state_get_name (GST_STATE (data.src))); + g_cond_wait (&cond, &lock); + g_mutex_unlock (&lock); + gst_object_unref (pad); + + /* We can not set state from the streaming thread so we + * need to make sure that the source has been removed + * before setting its state to NULL */ + gst_element_set_state (data.src, GST_STATE_NULL); + + gst_object_unref (data.src); + } + } + + break; + } + case GST_MESSAGE_ERROR: + { + GST_ERROR ("Error on the bus: %" GST_PTR_FORMAT, message); + carry_on = FALSE; + fail_error_message (message); + break; + } + default: + break; + } + + gst_message_unref (message); + } while (carry_on); + + GST_INFO ("Seeking"); + fail_unless (gst_element_seek_simple (pipeline, GST_FORMAT_TIME, + GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, 0)); + + count++; + src1 = src; + } + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (bus); + gst_object_unref (pipeline); + g_mutex_clear (&lock); + g_cond_clear (&cond); +} + +GST_END_TEST; + +GST_START_TEST (test_change_state_intensive) +{ + GstBus *bus; + GstMessage *message; + GstElement *pipeline, *src, *agg, *sink; + gint i, state_i = 0, num_srcs = 3; + gboolean carry_on = TRUE, ready = FALSE; + GstStateChangeReturn state_return; + GstState wanted_state, wanted_states[] = { + GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PAUSED, GST_STATE_READY, + GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PAUSED, GST_STATE_READY, + GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PAUSED, GST_STATE_READY, + GST_STATE_PAUSED, GST_STATE_READY, GST_STATE_PAUSED, GST_STATE_READY, + GST_STATE_PAUSED, GST_STATE_READY, GST_STATE_PAUSED, GST_STATE_NULL, + GST_STATE_PAUSED, GST_STATE_NULL, GST_STATE_PAUSED, GST_STATE_NULL, + GST_STATE_PAUSED, GST_STATE_NULL, GST_STATE_PAUSED, GST_STATE_NULL, + GST_STATE_PAUSED, GST_STATE_NULL, GST_STATE_PLAYING, GST_STATE_NULL, + GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PLAYING, GST_STATE_NULL, + GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PLAYING, GST_STATE_NULL, + GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PLAYING, GST_STATE_NULL, + }; + + pipeline = gst_pipeline_new ("pipeline"); + + agg = gst_check_setup_element ("testaggregator"); + sink = gst_check_setup_element ("fakesink"); + + fail_unless (gst_bin_add (GST_BIN (pipeline), agg)); + fail_unless (gst_bin_add (GST_BIN (pipeline), sink)); + fail_unless (gst_element_link (agg, sink)); + + for (i = 0; i < num_srcs; i++) { + src = gst_element_factory_make ("fakesrc", NULL); + g_object_set (src, "sizetype", 2, "sizemax", 4, NULL); + fail_unless (gst_bin_add (GST_BIN (pipeline), src)); + fail_unless (gst_element_link (src, agg)); + } + + bus = gst_element_get_bus (pipeline); + fail_if (bus == NULL); + + wanted_state = wanted_states[state_i++]; + state_return = gst_element_set_state (pipeline, wanted_state); + + while (state_i < G_N_ELEMENTS (wanted_states) && carry_on) { + if (state_return == GST_STATE_CHANGE_SUCCESS && ready) { + wanted_state = wanted_states[state_i++]; + fail_unless (gst_element_set_state (pipeline, wanted_state), + GST_STATE_CHANGE_SUCCESS); + GST_INFO ("Wanted state: %s", gst_element_state_get_name (wanted_state)); + } + + message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 10); + if (message) { + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_EOS: + { + /* we should check if we really finished here */ + GST_WARNING ("Got an EOS"); + carry_on = FALSE; + break; + } + case GST_MESSAGE_STATE_CHANGED: + { + GstState new; + + if (GST_MESSAGE_SRC (message) == GST_OBJECT (pipeline)) { + gst_message_parse_state_changed (message, NULL, &new, NULL); + + if (new != wanted_state) { + ready = FALSE; + break; + } + + GST_DEBUG ("State %s reached", + gst_element_state_get_name (wanted_state)); + wanted_state = wanted_states[state_i++]; + GST_DEBUG ("Wanted state: %s", + gst_element_state_get_name (wanted_state)); + state_return = gst_element_set_state (pipeline, wanted_state); + fail_unless (state_return == GST_STATE_CHANGE_SUCCESS || + state_return == GST_STATE_CHANGE_ASYNC); + ready = TRUE; + } + + break; + } + case GST_MESSAGE_ERROR: + GST_ERROR ("Error on the bus: %" GST_PTR_FORMAT, message); + carry_on = FALSE; + break; + default: + break; + } + gst_message_unref (message); + } + } + + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (bus); + gst_object_unref (pipeline); +} + +GST_END_TEST; + +static Suite * +gst_aggregator_suite (void) +{ + Suite *suite; + TCase *general; + + gst_test_aggregator_plugin_register (); + + suite = suite_create ("GstAggregator"); + + general = tcase_create ("general"); + suite_add_tcase (suite, general); + tcase_add_test (general, test_aggregate); + tcase_add_test (general, test_aggregate_eos); + tcase_add_test (general, test_aggregate_gap); + tcase_add_test (general, test_aggregate_handle_events); + tcase_add_test (general, test_aggregate_handle_queries); + tcase_add_test (general, test_flushing_seek); + tcase_add_test (general, test_infinite_seek); + tcase_add_test (general, test_infinite_seek_50_src); + tcase_add_test (general, test_infinite_seek_50_src_live); + tcase_add_test (general, test_linear_pipeline); + tcase_add_test (general, test_two_src_pipeline); + tcase_add_test (general, test_timeout_pipeline); + tcase_add_test (general, test_timeout_pipeline_with_wait); + tcase_add_test (general, test_add_remove); + tcase_add_test (general, test_change_state_intensive); + + return suite; +} + +GST_CHECK_MAIN (gst_aggregator); |