diff options
author | Josep Torra <n770galaxy@gmail.com> | 2013-09-20 17:09:52 +0200 |
---|---|---|
committer | Josep Torra <n770galaxy@gmail.com> | 2013-09-20 18:50:26 +0200 |
commit | aa21522afb6effb474e32e53ee04a831d8c18965 (patch) | |
tree | 14b60301415af9d48ad27646250c177d9b9a3a3c | |
parent | 995e3af89adcc550e00046116e3ac513b90ab141 (diff) |
examples: fix a race condition when seeking
Fixes a race condition that caused pipeline deadlock during seeks.
-rw-r--r-- | examples/egl/testegl.c | 119 |
1 files changed, 74 insertions, 45 deletions
diff --git a/examples/egl/testegl.c b/examples/egl/testegl.c index fa43d88..b20c91e 100644 --- a/examples/egl/testegl.c +++ b/examples/egl/testegl.c @@ -162,11 +162,13 @@ typedef struct /* GStreamer related resources */ GstElement *pipeline; + GstElement *vsink; GstEGLDisplay *gst_display; /* Interthread comunication */ GAsyncQueue *queue; - GMutex *lock; + GMutex *queue_lock; + GMutex *flow_lock; GCond *cond; gboolean flushing; GstMiniObject *popped_obj; @@ -954,14 +956,11 @@ update_image (APP_STATE_T * state, GstBuffer * buffer) { GstMemory *mem = gst_buffer_peek_memory (buffer, 0); - g_mutex_lock (state->lock); if (state->current_mem) { gst_memory_unref (state->current_mem); } state->current_mem = gst_memory_ref (mem); - g_mutex_unlock (state->lock); - TRACE_VC_MEMORY_ONCE_FOR_ID ("before glEGLImageTargetTexture2DOES", gid0); glBindTexture (GL_TEXTURE_2D, state->tex); @@ -969,8 +968,6 @@ update_image (APP_STATE_T * state, GstBuffer * buffer) gst_egl_image_memory_get_image (mem)); TRACE_VC_MEMORY_ONCE_FOR_ID ("after glEGLImageTargetTexture2DOES", gid1); - - render_scene (state); } static void @@ -978,7 +975,8 @@ init_intercom (APP_STATE_T * state) { state->queue = g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref); - state->lock = g_mutex_new (); + state->queue_lock = g_mutex_new (); + state->flow_lock = g_mutex_new (); state->cond = g_cond_new (); } @@ -990,8 +988,12 @@ terminate_intercom (APP_STATE_T * state) g_async_queue_unref (state->queue); } - if (state->lock) { - g_mutex_free (state->lock); + if (state->queue_lock) { + g_mutex_free (state->queue_lock); + } + + if (state->flow_lock) { + g_mutex_free (state->flow_lock); } if (state->cond) { @@ -1013,33 +1015,47 @@ flush_start (APP_STATE_T * state) { GstMiniObject *object = NULL; - g_mutex_lock (state->lock); + g_mutex_lock (state->queue_lock); state->flushing = TRUE; g_cond_broadcast (state->cond); - g_mutex_unlock (state->lock); + g_mutex_unlock (state->queue_lock); + g_mutex_lock (state->flow_lock); while ((object = g_async_queue_try_pop (state->queue))) { gst_mini_object_unref (object); } - + g_mutex_lock (state->queue_lock); flush_internal (state); + state->popped_obj = NULL; + g_mutex_unlock (state->queue_lock); + g_mutex_unlock (state->flow_lock); } static void flush_stop (APP_STATE_T * state) { - g_mutex_lock (state->lock); + GstMiniObject *object = NULL; + + g_mutex_lock (state->queue_lock); + while ((object = GST_MINI_OBJECT_CAST (g_async_queue_try_pop (state->queue)))) { + gst_mini_object_unref (object); + } + flush_internal (state); state->popped_obj = NULL; state->flushing = FALSE; - g_mutex_unlock (state->lock); + g_mutex_unlock (state->queue_lock); } static void pipeline_pause (APP_STATE_T * state) { - flush_start (state); gst_element_set_state (state->pipeline, GST_STATE_PAUSED); - flush_stop (state); +} + +static void +pipeline_play (APP_STATE_T * state) +{ + gst_element_set_state (state->pipeline, GST_STATE_PLAYING); } static gint64 @@ -1048,7 +1064,7 @@ pipeline_get_position (APP_STATE_T * state) gint64 position = -1; if (state->pipeline) { - gst_element_query_position (state->pipeline, GST_FORMAT_TIME, &position); + gst_element_query_position (state->vsink, GST_FORMAT_TIME, &position); } return position; @@ -1074,7 +1090,7 @@ pipeline_seek (APP_STATE_T * state, gint64 position) event = gst_event_new_seek (1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_SET, position, GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE); - if (!gst_element_send_event (state->pipeline, event)) { + if (!gst_element_send_event (state->vsink, event)) { g_print ("seek failed\n"); } } @@ -1084,32 +1100,33 @@ static gboolean handle_queued_objects (APP_STATE_T * state) { GstMiniObject *object = NULL; + gboolean done = FALSE; if (g_async_queue_length (state->queue) == 0) { return FALSE; } - while ((object = g_async_queue_try_pop (state->queue))) { + g_mutex_lock (state->queue_lock); + if (state->flushing) { + g_cond_broadcast (state->cond); + done = TRUE; + } + g_mutex_unlock (state->queue_lock); - g_mutex_lock (state->lock); - if (state->flushing) { - state->popped_obj = object; - gst_mini_object_unref (object); - g_cond_broadcast (state->cond); - g_mutex_unlock (state->lock); - continue; - } - g_mutex_unlock (state->lock); + while (!done && (object = g_async_queue_try_pop (state->queue))) { if (GST_IS_BUFFER (object)) { GstBuffer *buffer = GST_BUFFER_CAST (object); + g_mutex_lock (state->queue_lock); update_image (state, buffer); + render_scene (state); gst_buffer_unref (buffer); + g_mutex_unlock (state->queue_lock); } else if (GST_IS_QUERY (object)) { GstQuery *query = GST_QUERY_CAST (object); GstStructure *s = (GstStructure *) gst_query_get_structure (query); - g_mutex_lock (state->lock); + g_mutex_lock (state->queue_lock); if (gst_structure_has_name (s, "eglglessink-allocate-eglimage")) { GstBuffer *buffer; @@ -1139,7 +1156,7 @@ handle_queued_objects (APP_STATE_T * state) state->popped_obj = object; g_cond_broadcast (state->cond); - g_mutex_unlock (state->lock); + g_mutex_unlock (state->queue_lock); return TRUE; } else if (GST_IS_EVENT (object)) { @@ -1147,7 +1164,7 @@ handle_queued_objects (APP_STATE_T * state) g_print ("\nevent %p %s\n", event, gst_event_type_get_name (GST_EVENT_TYPE (event))); - g_mutex_lock (state->lock); + g_mutex_lock (state->queue_lock); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: flush_internal (state); @@ -1155,14 +1172,14 @@ handle_queued_objects (APP_STATE_T * state) default: break; } - g_mutex_unlock (state->lock); + g_mutex_unlock (state->queue_lock); gst_event_unref (event); } - g_mutex_lock (state->lock); + g_mutex_lock (state->queue_lock); state->popped_obj = object; g_cond_broadcast (state->cond); - g_mutex_unlock (state->lock); + g_mutex_unlock (state->queue_lock); } return FALSE; @@ -1171,24 +1188,29 @@ handle_queued_objects (APP_STATE_T * state) static gboolean queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous) { - g_mutex_lock (state->lock); + gboolean res = TRUE; + + g_mutex_lock (state->flow_lock); if (state->flushing) { - g_mutex_unlock (state->lock); gst_mini_object_unref (obj); - return FALSE; + res = FALSE; + goto beach; } g_async_queue_push (state->queue, obj); if (synchronous) { /* Waiting for object to be handled */ + g_mutex_lock (state->queue_lock); do { - g_cond_wait (state->cond, state->lock); + g_cond_wait (state->cond, state->queue_lock); } while (!state->flushing && state->popped_obj != obj); + g_mutex_unlock (state->queue_lock); } - g_mutex_unlock (state->lock); - return TRUE; +beach: + g_mutex_unlock (state->flow_lock); + return res; } static void @@ -1261,9 +1283,9 @@ query_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) return GST_PAD_PROBE_OK; } - g_mutex_lock (state->lock); + g_mutex_lock (state->queue_lock); pool = state->pool ? gst_object_ref (state->pool) : NULL; - g_mutex_unlock (state->lock); + g_mutex_unlock (state->queue_lock); if (pool) { GstCaps *pcaps; @@ -1373,6 +1395,7 @@ init_playbin_player (APP_STATE_T * state, const gchar * uri) "video-sink", vsink, "flags", GST_PLAY_FLAG_NATIVE_VIDEO | GST_PLAY_FLAG_AUDIO, NULL); + state->vsink = gst_object_ref (vsink); return TRUE; } @@ -1409,6 +1432,7 @@ init_parse_launch_player (APP_STATE_T * state, const gchar * spipeline) gst_pad_add_probe (gst_element_get_static_pad (vsink, "sink"), GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, query_cb, state, NULL); + state->vsink = gst_object_ref (vsink); return TRUE; } @@ -1502,7 +1526,7 @@ handle_keyboard (GIOChannel * source, GIOCondition cond, APP_STATE_T * state) pipeline_pause (state); break; case 'r': - gst_element_set_state (state->pipeline, GST_STATE_PLAYING); + pipeline_play (state); break; case 'l': report_position_duration (state); @@ -1555,10 +1579,10 @@ buffering_cb (GstBus * bus, GstMessage * msg, APP_STATE_T * state) gst_message_parse_buffering (msg, &percent); g_print ("Buffering %3d%%\r", percent); if (percent < 100) - gst_element_set_state (state->pipeline, GST_STATE_PAUSED); + pipeline_pause (state); else { g_print ("\n"); - gst_element_set_state (state->pipeline, GST_STATE_PLAYING); + pipeline_play (state); } } @@ -1774,6 +1798,11 @@ done: /* Release pipeline */ if (state->pipeline) { gst_element_set_state (state->pipeline, GST_STATE_NULL); + if (state->vsink) { + gst_object_unref (state->vsink); + state->vsink = NULL; + } + gst_object_unref (state->pipeline); } |