diff options
Diffstat (limited to 'src/client/stream.c')
-rw-r--r-- | src/client/stream.c | 142 |
1 files changed, 97 insertions, 45 deletions
diff --git a/src/client/stream.c b/src/client/stream.c index 020e336a..f66d705b 100644 --- a/src/client/stream.c +++ b/src/client/stream.c @@ -52,7 +52,7 @@ struct _PinosStreamPrivate GSocket *socket; GSource *socket_source; - PinosBufferInfo info; + PinosStackBuffer buffer; }; #define PINOS_STREAM_GET_PRIVATE(obj) \ @@ -238,6 +238,10 @@ pinos_stream_finalize (GObject * object) g_clear_object (&priv->context); g_free (priv->name); + if (priv->buffer.message) + g_object_unref (priv->buffer.message); + g_free (priv->buffer.data); + G_OBJECT_CLASS (pinos_stream_parent_class)->finalize (object); } @@ -731,8 +735,6 @@ pinos_stream_disconnect (PinosStream *stream) return TRUE; } -#include <gst/wire-protocol.h> - static gboolean on_socket_condition (GSocket *socket, GIOCondition condition, @@ -746,14 +748,24 @@ on_socket_condition (GSocket *socket, { gssize len; GInputVector ivec; - FDMessage msg; + PinosStackHeader *hdr; GSocketControlMessage **messages = NULL; gint num_messages = 0; gint flags = 0; + gsize need; GError *error = NULL; - ivec.buffer = &msg; - ivec.size = sizeof (msg); + need = sizeof (PinosStackHeader); + + if (priv->buffer.allocated_size < need) { + priv->buffer.allocated_size = need; + priv->buffer.data = g_realloc (priv->buffer.data, need); + } + + hdr = priv->buffer.data; + + ivec.buffer = hdr; + ivec.size = sizeof (PinosStackHeader); len = g_socket_receive_message (socket, NULL, @@ -765,21 +777,31 @@ on_socket_condition (GSocket *socket, NULL, &error); - g_assert (len == sizeof (msg)); - - if (priv->info.message) - g_object_unref (priv->info.message); + g_assert (len == sizeof (PinosStackHeader)); if (num_messages == 0) break; - priv->info.flags = msg.flags; - priv->info.seq = msg.seq; - priv->info.pts = msg.pts; - priv->info.dts_offset = msg.dts_offset; - priv->info.offset = msg.offset; - priv->info.size = msg.size; - priv->info.message = messages[0]; + need += hdr->length; + + if (priv->buffer.allocated_size < need) { + priv->buffer.allocated_size = need; + hdr = priv->buffer.data = g_realloc (priv->buffer.data, need); + } + priv->buffer.size = need; + + if (priv->buffer.message) + g_object_unref (priv->buffer.message); + priv->buffer.message = messages[0]; + + len = g_socket_receive (socket, + (gchar *)priv->buffer.data + sizeof (PinosStackHeader), + hdr->length, + NULL, + &error); + g_assert (len == hdr->length); + + priv->buffer.magic = PSB_MAGIC; g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL); break; @@ -1058,85 +1080,115 @@ pinos_stream_stop (PinosStream *stream) /** * pinos_stream_capture_buffer: * @stream: a #PinosStream - * @info: a #PinosBufferInfo + * @buffer: a #PinosBuffer * * Capture the next buffer from @stream. This function should be called every * time after the new-buffer callback has been emitted. * - * Returns: %TRUE when @info contains valid information + * Returns: %TRUE when @buffer contains valid information */ gboolean -pinos_stream_capture_buffer (PinosStream *stream, - PinosBufferInfo *info) +pinos_stream_capture_buffer (PinosStream *stream, + PinosBuffer *buffer) { PinosStreamPrivate *priv; g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); - g_return_val_if_fail (info != NULL, FALSE); + g_return_val_if_fail (buffer != NULL, FALSE); priv = stream->priv; g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE); - *info = priv->info; + memcpy (buffer, &priv->buffer, sizeof (PinosStackBuffer)); + + priv->buffer.data = NULL; + priv->buffer.allocated_size = 0; + priv->buffer.size = 0; + priv->buffer.message = NULL; return TRUE; } /** + * pinos_stream_release_buffer: + * @stream: a #PinosStream + * @buffer: a #PinosBuffer + * + * Release @buffer back to @stream. This function should be called whenever the + * buffer is processed. + */ +void +pinos_stream_release_buffer (PinosStream *stream, + PinosBuffer *buffer) +{ + PinosStackBuffer *sb = (PinosStackBuffer *) buffer; + PinosStreamPrivate *priv; + + g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); + g_return_val_if_fail (is_valid_buffer (buffer), FALSE); + + priv = stream->priv; + + if (priv->buffer.data == NULL) { + priv->buffer.data = sb->data; + priv->buffer.allocated_size = sb->allocated_size; + priv->buffer.size = 0; + } + else + g_free (sb->data); + + if (sb->message) + g_object_unref (sb->message); +} + +/** * pinos_stream_provide_buffer: * @stream: a #PinosStream - * @info: a #PinosBufferInfo + * @buffer: a #PinosBuffer * * Provide the next buffer from @stream. This function should be called every * time a new frame becomes available. * - * Returns: %TRUE when @info was handled + * Returns: %TRUE when @buffer was handled */ gboolean -pinos_stream_provide_buffer (PinosStream *stream, - PinosBufferInfo *info) +pinos_stream_provide_buffer (PinosStream *stream, + PinosBuffer *buffer) { PinosStreamPrivate *priv; gssize len; - GOutputVector ovec; - FDMessage msg; + PinosStackBuffer *sb = (PinosStackBuffer *) buffer; + GOutputVector ovec[1]; gint flags = 0; GError *error = NULL; g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); - g_return_val_if_fail (info != NULL, FALSE); + g_return_val_if_fail (buffer != NULL, FALSE); priv = stream->priv; g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE); - msg.flags = info->flags; - msg.seq = info->seq; - msg.pts = info->pts; - msg.dts_offset = info->dts_offset; - msg.offset = info->offset; - msg.size = info->size; - - ovec.buffer = &msg; - ovec.size = sizeof (msg); + ovec[0].buffer = sb->data; + ovec[0].size = sb->size; len = g_socket_send_message (priv->socket, NULL, - &ovec, + ovec, 1, - &info->message, + &sb->message, 1, flags, NULL, &error); - if (info->message) { - g_object_unref (info->message); - info->message = NULL; + if (sb->message) { + g_object_unref (sb->message); + sb->message = NULL; } if (len == -1) goto send_error; - g_assert (len == sizeof (msg)); + g_assert (len == sb->size); return TRUE; |