summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--omx/gstomx_base_filter.c11
-rw-r--r--omx/gstomx_util.c6
-rw-r--r--omx/gstomx_util.h1
-rw-r--r--tests/standalone/core.c28
-rw-r--r--util/async_queue.c33
-rw-r--r--util/async_queue.h1
6 files changed, 68 insertions, 12 deletions
diff --git a/omx/gstomx_base_filter.c b/omx/gstomx_base_filter.c
index 074468a..a4ec715 100644
--- a/omx/gstomx_base_filter.c
+++ b/omx/gstomx_base_filter.c
@@ -646,8 +646,10 @@ pad_event (GstPad *pad,
break;
case GST_EVENT_FLUSH_START:
- g_omx_port_flush (self->in_port);
- g_omx_port_flush (self->out_port);
+ /* flush all buffers */
+ OMX_SendCommand (self->gomx->omx_handle, OMX_CommandFlush, OMX_ALL, NULL);
+
+ /* unlock loops */
g_omx_port_disable (self->in_port);
g_omx_port_disable (self->out_port);
@@ -712,9 +714,8 @@ activate_push (GstPad *pad,
if (self->initialized)
{
- /* taint buffers */
- g_omx_port_flush (self->in_port);
- g_omx_port_flush (self->out_port);
+ /* flush all buffers */
+ OMX_SendCommand (self->gomx->omx_handle, OMX_CommandFlush, OMX_ALL, NULL);
/* unlock loops */
g_omx_port_disable (self->in_port);
diff --git a/omx/gstomx_util.c b/omx/gstomx_util.c
index 6f3d3c5..f17b8cb 100644
--- a/omx/gstomx_util.c
+++ b/omx/gstomx_util.c
@@ -536,12 +536,6 @@ g_omx_port_disable (GOmxPort *port)
}
void
-g_omx_port_flush (GOmxPort *port)
-{
- /** @todo tain the buffers */
-}
-
-void
g_omx_port_finish (GOmxPort *port)
{
port->enabled = FALSE;
diff --git a/omx/gstomx_util.h b/omx/gstomx_util.h
index e4720f4..6147852 100644
--- a/omx/gstomx_util.h
+++ b/omx/gstomx_util.h
@@ -135,7 +135,6 @@ OMX_BUFFERHEADERTYPE *g_omx_port_request_buffer (GOmxPort *port);
void g_omx_port_release_buffer (GOmxPort *port, OMX_BUFFERHEADERTYPE *omx_buffer);
void g_omx_port_enable (GOmxPort *port);
void g_omx_port_disable (GOmxPort *port);
-void g_omx_port_flush (GOmxPort *port);
void g_omx_port_finish (GOmxPort *port);
GOmxSem *g_omx_sem_new (void);
diff --git a/tests/standalone/core.c b/tests/standalone/core.c
index 4e8080e..cfd1941 100644
--- a/tests/standalone/core.c
+++ b/tests/standalone/core.c
@@ -34,6 +34,7 @@ struct CompPrivate
OMX_PTR app_data;
CompPrivatePort *ports;
gboolean done;
+ GMutex *flush_mutex;
};
struct CompPrivatePort
@@ -145,6 +146,27 @@ comp_SendCommand (OMX_HANDLETYPE handle,
OMX_CommandStateSet, private->state, data);
}
break;
+ case OMX_CommandFlush:
+ {
+ g_mutex_lock (private->flush_mutex);
+ {
+ OMX_BUFFERHEADERTYPE *buffer;
+
+ while (buffer = async_queue_pop_forced (private->ports[0].queue))
+ {
+ private->callbacks->EmptyBufferDone (comp,
+ private->app_data, buffer);
+ }
+
+ while (buffer = async_queue_pop_forced (private->ports[1].queue))
+ {
+ private->callbacks->FillBufferDone (comp,
+ private->app_data, buffer);
+ }
+ }
+ g_mutex_unlock (private->flush_mutex);
+ }
+ break;
default:
/* printf ("command: %d\n", command); */
break;
@@ -222,6 +244,8 @@ foo_thread (gpointer cb_data)
out_buffer->nFlags = in_buffer->nFlags;
}
+ g_mutex_lock (private->flush_mutex);
+
private->callbacks->FillBufferDone (comp,
private->app_data, out_buffer);
if (in_buffer->nFilledLen == 0)
@@ -229,6 +253,8 @@ foo_thread (gpointer cb_data)
private->callbacks->EmptyBufferDone (comp,
private->app_data, in_buffer);
}
+
+ g_mutex_unlock (private->flush_mutex);
}
return NULL;
@@ -297,6 +323,7 @@ OMX_GetHandle (OMX_HANDLETYPE *handle,
private->callbacks = callbacks;
private->app_data = data;
private->ports = calloc (2, sizeof (CompPrivatePort));
+ private->flush_mutex = g_mutex_new ();
private->ports[0].queue = async_queue_new ();
private->ports[1].queue = async_queue_new ();
@@ -341,5 +368,6 @@ OMX_GetHandle (OMX_HANDLETYPE *handle,
OMX_ERRORTYPE
OMX_FreeHandle (OMX_HANDLETYPE handle)
{
+ /** @todo Free private structure? */
return OMX_ErrorNone;
}
diff --git a/util/async_queue.c b/util/async_queue.c
index fbfaaac..b83bfb8 100644
--- a/util/async_queue.c
+++ b/util/async_queue.c
@@ -99,6 +99,39 @@ leave:
return data;
}
+gpointer
+async_queue_pop_forced (AsyncQueue *queue)
+{
+ gpointer data = NULL;
+
+ g_mutex_lock (queue->mutex);
+
+ if (!queue->enabled)
+ {
+ /* g_warning ("not enabled!"); */
+ goto leave;
+ }
+
+ if (queue->tail)
+ {
+ GList *node = queue->tail;
+ data = node->data;
+
+ queue->tail = node->prev;
+ if (queue->tail)
+ queue->tail->next = NULL;
+ else
+ queue->head = NULL;
+ queue->length--;
+ g_list_free_1 (node);
+ }
+
+leave:
+ g_mutex_unlock (queue->mutex);
+
+ return data;
+}
+
void
async_queue_disable (AsyncQueue *queue)
{
diff --git a/util/async_queue.h b/util/async_queue.h
index 5e5cf91..5573022 100644
--- a/util/async_queue.h
+++ b/util/async_queue.h
@@ -38,6 +38,7 @@ AsyncQueue *async_queue_new (void);
void async_queue_free (AsyncQueue *queue);
void async_queue_push (AsyncQueue *queue, gpointer data);
gpointer async_queue_pop (AsyncQueue *queue);
+gpointer async_queue_pop_forced (AsyncQueue *queue);
void async_queue_disable (AsyncQueue *queue);
void async_queue_enable (AsyncQueue *queue);