summaryrefslogtreecommitdiff
path: root/src/client/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/stream.c')
-rw-r--r--src/client/stream.c142
1 files changed, 97 insertions, 45 deletions
diff --git a/src/client/stream.c b/src/client/stream.c
index 020e336a..f66d705b 100644
--- a/src/client/stream.c
+++ b/src/client/stream.c
@@ -52,7 +52,7 @@ struct _PinosStreamPrivate
GSocket *socket;
GSource *socket_source;
- PinosBufferInfo info;
+ PinosStackBuffer buffer;
};
#define PINOS_STREAM_GET_PRIVATE(obj) \
@@ -238,6 +238,10 @@ pinos_stream_finalize (GObject * object)
g_clear_object (&priv->context);
g_free (priv->name);
+ if (priv->buffer.message)
+ g_object_unref (priv->buffer.message);
+ g_free (priv->buffer.data);
+
G_OBJECT_CLASS (pinos_stream_parent_class)->finalize (object);
}
@@ -731,8 +735,6 @@ pinos_stream_disconnect (PinosStream *stream)
return TRUE;
}
-#include <gst/wire-protocol.h>
-
static gboolean
on_socket_condition (GSocket *socket,
GIOCondition condition,
@@ -746,14 +748,24 @@ on_socket_condition (GSocket *socket,
{
gssize len;
GInputVector ivec;
- FDMessage msg;
+ PinosStackHeader *hdr;
GSocketControlMessage **messages = NULL;
gint num_messages = 0;
gint flags = 0;
+ gsize need;
GError *error = NULL;
- ivec.buffer = &msg;
- ivec.size = sizeof (msg);
+ need = sizeof (PinosStackHeader);
+
+ if (priv->buffer.allocated_size < need) {
+ priv->buffer.allocated_size = need;
+ priv->buffer.data = g_realloc (priv->buffer.data, need);
+ }
+
+ hdr = priv->buffer.data;
+
+ ivec.buffer = hdr;
+ ivec.size = sizeof (PinosStackHeader);
len = g_socket_receive_message (socket,
NULL,
@@ -765,21 +777,31 @@ on_socket_condition (GSocket *socket,
NULL,
&error);
- g_assert (len == sizeof (msg));
-
- if (priv->info.message)
- g_object_unref (priv->info.message);
+ g_assert (len == sizeof (PinosStackHeader));
if (num_messages == 0)
break;
- priv->info.flags = msg.flags;
- priv->info.seq = msg.seq;
- priv->info.pts = msg.pts;
- priv->info.dts_offset = msg.dts_offset;
- priv->info.offset = msg.offset;
- priv->info.size = msg.size;
- priv->info.message = messages[0];
+ need += hdr->length;
+
+ if (priv->buffer.allocated_size < need) {
+ priv->buffer.allocated_size = need;
+ hdr = priv->buffer.data = g_realloc (priv->buffer.data, need);
+ }
+ priv->buffer.size = need;
+
+ if (priv->buffer.message)
+ g_object_unref (priv->buffer.message);
+ priv->buffer.message = messages[0];
+
+ len = g_socket_receive (socket,
+ (gchar *)priv->buffer.data + sizeof (PinosStackHeader),
+ hdr->length,
+ NULL,
+ &error);
+ g_assert (len == hdr->length);
+
+ priv->buffer.magic = PSB_MAGIC;
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
break;
@@ -1058,85 +1080,115 @@ pinos_stream_stop (PinosStream *stream)
/**
* pinos_stream_capture_buffer:
* @stream: a #PinosStream
- * @info: a #PinosBufferInfo
+ * @buffer: a #PinosBuffer
*
* Capture the next buffer from @stream. This function should be called every
* time after the new-buffer callback has been emitted.
*
- * Returns: %TRUE when @info contains valid information
+ * Returns: %TRUE when @buffer contains valid information
*/
gboolean
-pinos_stream_capture_buffer (PinosStream *stream,
- PinosBufferInfo *info)
+pinos_stream_capture_buffer (PinosStream *stream,
+ PinosBuffer *buffer)
{
PinosStreamPrivate *priv;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
- g_return_val_if_fail (info != NULL, FALSE);
+ g_return_val_if_fail (buffer != NULL, FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
- *info = priv->info;
+ memcpy (buffer, &priv->buffer, sizeof (PinosStackBuffer));
+
+ priv->buffer.data = NULL;
+ priv->buffer.allocated_size = 0;
+ priv->buffer.size = 0;
+ priv->buffer.message = NULL;
return TRUE;
}
/**
+ * pinos_stream_release_buffer:
+ * @stream: a #PinosStream
+ * @buffer: a #PinosBuffer
+ *
+ * Release @buffer back to @stream. This function should be called whenever the
+ * buffer is processed.
+ */
+void
+pinos_stream_release_buffer (PinosStream *stream,
+ PinosBuffer *buffer)
+{
+ PinosStackBuffer *sb = (PinosStackBuffer *) buffer;
+ PinosStreamPrivate *priv;
+
+ g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
+ g_return_val_if_fail (is_valid_buffer (buffer), FALSE);
+
+ priv = stream->priv;
+
+ if (priv->buffer.data == NULL) {
+ priv->buffer.data = sb->data;
+ priv->buffer.allocated_size = sb->allocated_size;
+ priv->buffer.size = 0;
+ }
+ else
+ g_free (sb->data);
+
+ if (sb->message)
+ g_object_unref (sb->message);
+}
+
+/**
* pinos_stream_provide_buffer:
* @stream: a #PinosStream
- * @info: a #PinosBufferInfo
+ * @buffer: a #PinosBuffer
*
* Provide the next buffer from @stream. This function should be called every
* time a new frame becomes available.
*
- * Returns: %TRUE when @info was handled
+ * Returns: %TRUE when @buffer was handled
*/
gboolean
-pinos_stream_provide_buffer (PinosStream *stream,
- PinosBufferInfo *info)
+pinos_stream_provide_buffer (PinosStream *stream,
+ PinosBuffer *buffer)
{
PinosStreamPrivate *priv;
gssize len;
- GOutputVector ovec;
- FDMessage msg;
+ PinosStackBuffer *sb = (PinosStackBuffer *) buffer;
+ GOutputVector ovec[1];
gint flags = 0;
GError *error = NULL;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
- g_return_val_if_fail (info != NULL, FALSE);
+ g_return_val_if_fail (buffer != NULL, FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
- msg.flags = info->flags;
- msg.seq = info->seq;
- msg.pts = info->pts;
- msg.dts_offset = info->dts_offset;
- msg.offset = info->offset;
- msg.size = info->size;
-
- ovec.buffer = &msg;
- ovec.size = sizeof (msg);
+ ovec[0].buffer = sb->data;
+ ovec[0].size = sb->size;
len = g_socket_send_message (priv->socket,
NULL,
- &ovec,
+ ovec,
1,
- &info->message,
+ &sb->message,
1,
flags,
NULL,
&error);
- if (info->message) {
- g_object_unref (info->message);
- info->message = NULL;
+ if (sb->message) {
+ g_object_unref (sb->message);
+ sb->message = NULL;
}
if (len == -1)
goto send_error;
- g_assert (len == sizeof (msg));
+ g_assert (len == sb->size);
return TRUE;