diff options
-rw-r--r-- | omx/gstomx_base_filter.c | 11 | ||||
-rw-r--r-- | omx/gstomx_util.c | 6 | ||||
-rw-r--r-- | omx/gstomx_util.h | 1 | ||||
-rw-r--r-- | tests/standalone/core.c | 28 | ||||
-rw-r--r-- | util/async_queue.c | 33 | ||||
-rw-r--r-- | util/async_queue.h | 1 |
6 files changed, 68 insertions, 12 deletions
diff --git a/omx/gstomx_base_filter.c b/omx/gstomx_base_filter.c index 074468a..a4ec715 100644 --- a/omx/gstomx_base_filter.c +++ b/omx/gstomx_base_filter.c @@ -646,8 +646,10 @@ pad_event (GstPad *pad, break; case GST_EVENT_FLUSH_START: - g_omx_port_flush (self->in_port); - g_omx_port_flush (self->out_port); + /* flush all buffers */ + OMX_SendCommand (self->gomx->omx_handle, OMX_CommandFlush, OMX_ALL, NULL); + + /* unlock loops */ g_omx_port_disable (self->in_port); g_omx_port_disable (self->out_port); @@ -712,9 +714,8 @@ activate_push (GstPad *pad, if (self->initialized) { - /* taint buffers */ - g_omx_port_flush (self->in_port); - g_omx_port_flush (self->out_port); + /* flush all buffers */ + OMX_SendCommand (self->gomx->omx_handle, OMX_CommandFlush, OMX_ALL, NULL); /* unlock loops */ g_omx_port_disable (self->in_port); diff --git a/omx/gstomx_util.c b/omx/gstomx_util.c index 6f3d3c5..f17b8cb 100644 --- a/omx/gstomx_util.c +++ b/omx/gstomx_util.c @@ -536,12 +536,6 @@ g_omx_port_disable (GOmxPort *port) } void -g_omx_port_flush (GOmxPort *port) -{ - /** @todo tain the buffers */ -} - -void g_omx_port_finish (GOmxPort *port) { port->enabled = FALSE; diff --git a/omx/gstomx_util.h b/omx/gstomx_util.h index e4720f4..6147852 100644 --- a/omx/gstomx_util.h +++ b/omx/gstomx_util.h @@ -135,7 +135,6 @@ OMX_BUFFERHEADERTYPE *g_omx_port_request_buffer (GOmxPort *port); void g_omx_port_release_buffer (GOmxPort *port, OMX_BUFFERHEADERTYPE *omx_buffer); void g_omx_port_enable (GOmxPort *port); void g_omx_port_disable (GOmxPort *port); -void g_omx_port_flush (GOmxPort *port); void g_omx_port_finish (GOmxPort *port); GOmxSem *g_omx_sem_new (void); diff --git a/tests/standalone/core.c b/tests/standalone/core.c index 4e8080e..cfd1941 100644 --- a/tests/standalone/core.c +++ b/tests/standalone/core.c @@ -34,6 +34,7 @@ struct CompPrivate OMX_PTR app_data; CompPrivatePort *ports; gboolean done; + GMutex *flush_mutex; }; struct CompPrivatePort @@ -145,6 +146,27 @@ comp_SendCommand (OMX_HANDLETYPE handle, OMX_CommandStateSet, private->state, data); } break; + case OMX_CommandFlush: + { + g_mutex_lock (private->flush_mutex); + { + OMX_BUFFERHEADERTYPE *buffer; + + while (buffer = async_queue_pop_forced (private->ports[0].queue)) + { + private->callbacks->EmptyBufferDone (comp, + private->app_data, buffer); + } + + while (buffer = async_queue_pop_forced (private->ports[1].queue)) + { + private->callbacks->FillBufferDone (comp, + private->app_data, buffer); + } + } + g_mutex_unlock (private->flush_mutex); + } + break; default: /* printf ("command: %d\n", command); */ break; @@ -222,6 +244,8 @@ foo_thread (gpointer cb_data) out_buffer->nFlags = in_buffer->nFlags; } + g_mutex_lock (private->flush_mutex); + private->callbacks->FillBufferDone (comp, private->app_data, out_buffer); if (in_buffer->nFilledLen == 0) @@ -229,6 +253,8 @@ foo_thread (gpointer cb_data) private->callbacks->EmptyBufferDone (comp, private->app_data, in_buffer); } + + g_mutex_unlock (private->flush_mutex); } return NULL; @@ -297,6 +323,7 @@ OMX_GetHandle (OMX_HANDLETYPE *handle, private->callbacks = callbacks; private->app_data = data; private->ports = calloc (2, sizeof (CompPrivatePort)); + private->flush_mutex = g_mutex_new (); private->ports[0].queue = async_queue_new (); private->ports[1].queue = async_queue_new (); @@ -341,5 +368,6 @@ OMX_GetHandle (OMX_HANDLETYPE *handle, OMX_ERRORTYPE OMX_FreeHandle (OMX_HANDLETYPE handle) { + /** @todo Free private structure? */ return OMX_ErrorNone; } diff --git a/util/async_queue.c b/util/async_queue.c index fbfaaac..b83bfb8 100644 --- a/util/async_queue.c +++ b/util/async_queue.c @@ -99,6 +99,39 @@ leave: return data; } +gpointer +async_queue_pop_forced (AsyncQueue *queue) +{ + gpointer data = NULL; + + g_mutex_lock (queue->mutex); + + if (!queue->enabled) + { + /* g_warning ("not enabled!"); */ + goto leave; + } + + if (queue->tail) + { + GList *node = queue->tail; + data = node->data; + + queue->tail = node->prev; + if (queue->tail) + queue->tail->next = NULL; + else + queue->head = NULL; + queue->length--; + g_list_free_1 (node); + } + +leave: + g_mutex_unlock (queue->mutex); + + return data; +} + void async_queue_disable (AsyncQueue *queue) { diff --git a/util/async_queue.h b/util/async_queue.h index 5e5cf91..5573022 100644 --- a/util/async_queue.h +++ b/util/async_queue.h @@ -38,6 +38,7 @@ AsyncQueue *async_queue_new (void); void async_queue_free (AsyncQueue *queue); void async_queue_push (AsyncQueue *queue, gpointer data); gpointer async_queue_pop (AsyncQueue *queue); +gpointer async_queue_pop_forced (AsyncQueue *queue); void async_queue_disable (AsyncQueue *queue); void async_queue_enable (AsyncQueue *queue); |