summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/design.txt24
-rw-r--r--src/Makefile.am2
-rw-r--r--src/client/buffer.c499
-rw-r--r--src/client/buffer.h132
-rw-r--r--src/client/pinos.h1
-rw-r--r--src/client/private.h20
-rw-r--r--src/client/stream.c142
-rw-r--r--src/client/stream.h18
-rw-r--r--src/gst/gstfddepay.c90
-rw-r--r--src/gst/gstfdpay.c59
-rw-r--r--src/gst/gstpinossink.c33
-rw-r--r--src/gst/gstpinossrc.c101
-rw-r--r--src/gst/gstpinossrc.h2
-rw-r--r--src/gst/wire-protocol.h45
14 files changed, 938 insertions, 230 deletions
diff --git a/doc/design.txt b/doc/design.txt
index 39adb70d..4db76349 100644
--- a/doc/design.txt
+++ b/doc/design.txt
@@ -55,29 +55,31 @@ Wire
Fixed header
-<flags> : 4 bytes
-<seq> : 4 bytes
-<pts> : 8 bytes
-<dts-offset> : 8 bytes
+<flags> : 4 bytes : buffer flags
+<seq> : 4 bytes : sequence number
+<pts> : 8 bytes : presentation time
+<dts-offset> : 8 bytes : dts-offset
+<length> : 8 bytes : total message length
Followed by 1 or more type-length-data sections
<type> : 1 byte
<length> : variable length, 7 bits, hight bit is continuation marker
-<data> : <length> bytes
+<data> : <length> bytes, see below for contents based on <type>
Types:
- 0: format change
+ 0: fd-payload section
+
+ <offset> : 8 bytes : offset
+ <size> : 8 bytes : size
+ <fd-index> : 4 bytes : index of fd
+
+ 1: format change
<format-id> : 1 byte : format id
<format> : 0-terminated : contains serialized format
- 1: fd-payload section
-
- <offset> : 8 bytes : offset
- <size> : 8 bytes : size
- <fd-index> : 1 byte : index of fd
2: property changes
diff --git a/src/Makefile.am b/src/Makefile.am
index 2e01b0ac..081d720b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -157,6 +157,7 @@ pinosgstsource = gst/gstfdpay.h gst/gstfdpay.c \
pinosinclude_HEADERS = \
client/pinos.h \
+ client/buffer.h \
client/context.h \
client/enumtypes.h \
client/introspect.h \
@@ -170,6 +171,7 @@ lib_LTLIBRARIES = \
# Public interface
libpinos_@PINOS_MAJORMINOR@_la_SOURCES = \
+ client/buffer.h client/buffer.c \
client/context.h client/context.c \
client/enumtypes.h client/enumtypes.c \
client/introspect.h client/introspect.c \
diff --git a/src/client/buffer.c b/src/client/buffer.c
new file mode 100644
index 00000000..0786d926
--- /dev/null
+++ b/src/client/buffer.c
@@ -0,0 +1,499 @@
+/* Pinos
+ * Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include <sys/socket.h>
+#include <string.h>
+
+#include <gio/gio.h>
+#include <gio/gunixfdmessage.h>
+
+#include "client/properties.h"
+#include "client/context.h"
+#include "client/buffer.h"
+#include "client/private.h"
+
+G_STATIC_ASSERT (sizeof (PinosStackBuffer) <= sizeof (PinosBuffer));
+
+void
+pinos_buffer_init_take_data (PinosBuffer *buffer,
+ gpointer data,
+ gsize size,
+ GSocketControlMessage *message)
+{
+ PinosStackBuffer *sb = PSB (buffer);
+
+ sb->magic = PSB_MAGIC;
+ sb->data = data;
+ sb->size = size;
+ sb->allocated_size = size;
+ sb->message = message;
+}
+
+void
+pinos_buffer_clear (PinosBuffer *buffer)
+{
+ PinosStackBuffer *sb = PSB (buffer);
+
+ g_return_val_if_fail (is_valid_buffer (buffer), -1);
+
+ g_free (sb->data);
+ sb->size = 0;
+ sb->allocated_size = 0;
+ g_clear_object (&sb->message);
+}
+
+const PinosBufferHeader *
+pinos_buffer_get_header (PinosBuffer *buffer, guint32 *version)
+{
+ PinosStackBuffer *sb = PSB (buffer);
+ PinosStackHeader *hdr;
+
+ g_return_val_if_fail (is_valid_buffer (buffer), NULL);
+
+ hdr = sb->data;
+
+ if (version)
+ *version = hdr->version;
+
+ return (const PinosBufferHeader *) &hdr->header;
+}
+
+/**
+ * pinos_buffer_get_fd:
+ * @buffer: a #PinosBuffer
+ * @index: an index
+ * @error: a #GError or %NULL
+ *
+ * Get the file descriptor at @index in @buffer.
+ *
+ * Returns: a file descriptor ar @index in @buffer. The file descriptor is
+ * duplicated using dup() and set as close-on-exec before being returned.
+ * You must call close() on it when you are done. -1 is returned on error and
+ * @error is set.
+ */
+int
+pinos_buffer_get_fd (PinosBuffer *buffer, gint index, GError **error)
+{
+ PinosStackBuffer *sb = PSB (buffer);
+ GUnixFDList *fds;
+
+ g_return_val_if_fail (is_valid_buffer (buffer), -1);
+ g_return_val_if_fail (sb->message != NULL, -1);
+
+ if (g_socket_control_message_get_msg_type (sb->message) != SCM_RIGHTS)
+ goto not_found;
+
+ fds = g_unix_fd_message_get_fd_list (G_UNIX_FD_MESSAGE (sb->message));
+ if (fds == NULL)
+ goto not_found;
+
+ if (g_unix_fd_list_get_length (fds) <= index)
+ goto not_found;
+
+ return g_unix_fd_list_get (fds, index, error);
+
+ /* ERRORS */
+not_found:
+ {
+ if (error)
+ *error = g_error_new (G_IO_ERROR,
+ G_IO_ERROR_NOT_FOUND,
+ "Buffer does not have any fd at index %d", index);
+ return -1;
+ }
+}
+
+/**
+ * pinos_buffer_get_socket_control_message:
+ * @buffer: a #PinosBuffer
+ *
+ * Get the #GSocketControlMessage of @buffer
+ *
+ * Returns: the #GSocketControlMessage it remains valid as long as @buffer
+ * is valid
+ */
+GSocketControlMessage *
+pinos_buffer_get_socket_control_message (PinosBuffer *buffer)
+{
+ PinosStackBuffer *sb = PSB (buffer);
+
+ g_return_val_if_fail (is_valid_buffer (buffer), NULL);
+
+ return sb->message;
+}
+
+/**
+ * pinos_buffer_get_size:
+ * @buffer: a #PinosBuffer
+ *
+ * Get the total size needed to store @buffer with pinos_buffer_store().
+ *
+ * Returns: the serialized size of @buffer.
+ */
+gsize
+pinos_buffer_get_size (PinosBuffer *buffer)
+{
+ PinosStackBuffer *sb = PSB (buffer);
+
+ g_return_val_if_fail (is_valid_buffer (buffer), 0);
+
+ return sizeof (PinosStackHeader) + sb->size;
+}
+
+/**
+ * pinos_buffer_store:
+ * @buffer: a #PinosBuffer
+ * @data: destination
+ *
+ * Store the contents of @buffer in @data. @data must be large enough, see
+ * pinos_buffer_get_size().
+ */
+void
+pinos_buffer_store (PinosBuffer *buffer,
+ gpointer data)
+{
+ PinosStackBuffer *sb = PSB (buffer);
+
+ g_return_val_if_fail (is_valid_buffer (buffer), 0);
+
+ memcpy (data, sb->data, sizeof (PinosStackHeader) + sb->size);
+}
+
+/**
+ * PinosPacketIter:
+ *
+ * #PinosPacketIter is an opaque data structure and can only be accessed
+ * using the following functions.
+ */
+struct stack_iter {
+ gsize magic;
+ guint32 version;
+ PinosStackBuffer *buffer;
+ gsize offset;
+
+ PinosPacketType type;
+ gsize size;
+ gpointer data;
+
+ guint item;
+};
+
+G_STATIC_ASSERT (sizeof (struct stack_iter) <= sizeof (PinosPacketIter));
+
+#define PPSI(i) ((struct stack_iter *) (i))
+#define PPSI_MAGIC ((gsize) 6739527471u)
+#define is_valid_iter(i) (i != NULL && \
+ PPSI(i)->magic == PPSI_MAGIC)
+
+/**
+ * pinos_packet_iter_init:
+ * @iter: a #PinosPacketIter
+ * @buffer: a #PinosBuffer
+ *
+ * Initialize @iter to iterate the packets in @buffer.
+ */
+void
+pinos_packet_iter_init_full (PinosPacketIter *iter,
+ PinosBuffer *buffer,
+ guint32 version)
+{
+ struct stack_iter *si = PPSI (iter);
+
+ g_return_if_fail (iter != NULL);
+ g_return_if_fail (is_valid_buffer (buffer));
+
+ si->magic = PPSI_MAGIC;
+ si->version = version;
+ si->buffer = PSB (buffer);
+ si->offset = 0;
+ si->type = PINOS_PACKET_TYPE_INVALID;
+ si->size = sizeof (PinosStackHeader);
+ si->data = NULL;
+ si->item = 0;
+}
+
+static gboolean
+read_length (guint8 * data, guint size, gsize * length, gsize * skip)
+{
+ gsize len, offset;
+ guint8 b;
+
+ /* start reading the length, we need this to skip to the data later */
+ len = offset = 0;
+ do {
+ if (offset >= size)
+ return FALSE;
+ b = data[offset++];
+ len = (len << 7) | (b & 0x7f);
+ } while (b & 0x80);
+
+ /* check remaining buffer size */
+ if (size - offset < len)
+ return FALSE;
+
+ *length = len;
+ *skip = offset;
+
+ return TRUE;
+}
+
+
+/**
+ * pinos_packet_iter_next:
+ * @iter: a #PinosPacketIter
+ *
+ * Move to the next packet in @iter.
+ *
+ * Returns: %TRUE if more packets are available.
+ */
+gboolean
+pinos_packet_iter_next (PinosPacketIter *iter)
+{
+ struct stack_iter *si = PPSI (iter);
+ gsize len, size, skip;
+ guint8 *data;
+
+ g_return_val_if_fail (is_valid_iter (iter), FALSE);
+
+ /* move to next packet */
+ si->offset += si->size;
+
+ /* now read packet */
+ data = si->buffer->data;
+ size = si->buffer->size;
+ if (si->offset >= size)
+ return FALSE;
+
+ data += si->offset;
+ size -= si->offset;
+
+ if (size < 1)
+ return FALSE;
+
+ si->type = *data;
+
+ data++;
+ size--;
+
+ if (!read_length (data, size, &len, &skip))
+ return FALSE;
+
+ si->size = len;
+ si->data = data + skip;
+ si->offset += 1 + skip;
+
+ return TRUE;
+}
+
+PinosPacketType
+pinos_packet_iter_get_type (PinosPacketIter *iter)
+{
+ struct stack_iter *si = PPSI (iter);
+
+ g_return_val_if_fail (is_valid_iter (iter), PINOS_PACKET_TYPE_INVALID);
+
+ return si->type;
+}
+
+gpointer
+pinos_packet_iter_get_data (PinosPacketIter *iter, gsize *size)
+{
+ struct stack_iter *si = PPSI (iter);
+
+ g_return_val_if_fail (is_valid_iter (iter), NULL);
+
+ if (size)
+ *size = si->size;
+
+ return si->data;
+}
+
+
+/**
+ * PinosPacketBuilder:
+ * @buffer: owner #PinosBuffer
+ */
+struct stack_builder {
+ gsize magic;
+
+ PinosStackHeader *sh;
+ PinosStackBuffer buf;
+
+ PinosPacketType type;
+ gsize offset;
+
+ guint n_sockets;
+};
+
+G_STATIC_ASSERT (sizeof (struct stack_builder) <= sizeof (PinosPacketBuilder));
+
+#define PPSB(b) ((struct stack_builder *) (b))
+#define PPSB_MAGIC ((gsize) 8103647428u)
+#define is_valid_builder(b) (b != NULL && \
+ PPSB(b)->magic == PPSB_MAGIC)
+
+
+void
+pinos_packet_builder_init_full (PinosPacketBuilder *builder,
+ guint32 version,
+ const PinosBufferHeader *header)
+{
+ struct stack_builder *sb = PPSB (builder);
+ PinosStackHeader *sh;
+
+ g_return_if_fail (builder != NULL);
+
+ sb->magic = PPSB_MAGIC;
+ sb->buf.allocated_size = sizeof (PinosStackHeader) + 128;
+ sb->buf.data = g_malloc (sb->buf.allocated_size);
+ sb->buf.size = sizeof (PinosStackHeader);
+ sb->buf.message = NULL;
+
+ sh = sb->sh = sb->buf.data;
+ sh->version = version;
+ sh->header = *header;
+ sh->length = 0;
+
+ sb->type = 0;
+ sb->offset = 0;
+}
+
+void
+pinos_packet_builder_end (PinosPacketBuilder *builder,
+ PinosBuffer *buffer)
+{
+ struct stack_builder *sb = PPSB (builder);
+ PinosStackBuffer *sbuf = PSB (buffer);
+
+ g_return_if_fail (is_valid_builder (builder));
+ g_return_if_fail (buffer != NULL);
+
+ sb->sh->length = sb->buf.size - sizeof (PinosStackHeader);
+
+ sbuf->magic = PSB_MAGIC;
+ sbuf->data = sb->buf.data;
+ sbuf->size = sb->buf.size;
+ sbuf->allocated_size = sb->buf.allocated_size;
+ sbuf->message = sb->buf.message;
+
+ sb->buf.data = NULL;
+ sb->buf.size = 0;
+ sb->buf.allocated_size = 0;
+ sb->buf.message = NULL;
+ sb->buf.magic = 0;
+}
+
+static gpointer
+builder_ensure_size (struct stack_builder *sb, gsize size)
+{
+ if (sb->buf.size + size > sb->buf.allocated_size) {
+ sb->buf.allocated_size = sb->buf.size + MAX (size, 1024);
+ sb->buf.data = g_realloc (sb->buf.data, sb->buf.allocated_size);
+ }
+ return (guint8 *) sb->buf.data + sb->buf.size;
+}
+
+static gpointer
+builder_add_packet (struct stack_builder *sb, PinosPacketType type, gsize size)
+{
+ guint8 *p;
+ guint plen;
+
+ plen = 1;
+ while (size >> (7 * plen))
+ plen++;
+
+ /* 1 for type, plen for size and size for payload */
+ p = builder_ensure_size (sb, 1 + plen + size);
+
+ sb->type = type;
+ sb->offset = sb->buf.size;
+ sb->buf.size += 1 + plen + size;
+
+ *p++ = type;
+ /* write length */
+ while (plen) {
+ plen--;
+ *p++ = ((plen > 0) ? 0x80 : 0) | ((size >> (7 * plen)) & 0x7f);
+ }
+ return p;
+}
+
+/* fd-payload packets */
+/**
+ * pinos_packet_iter_get_fd_payload:
+ * @iter: a #PinosPacketIter
+ * @payload: a #PinosPacketFDPayload
+ *
+ * Get the #PinosPacketFDPayload. @iter must be positioned on a packet of
+ * type #PINOS_PACKET_TYPE_FD_PAYLOAD
+ */
+void
+pinos_packet_iter_parse_fd_payload (PinosPacketIter *iter,
+ PinosPacketFDPayload *payload)
+{
+ struct stack_iter *si = PPSI (iter);
+
+ g_return_if_fail (is_valid_iter (iter));
+ g_return_if_fail (si->type == PINOS_PACKET_TYPE_FD_PAYLOAD);
+
+ *payload = *((PinosPacketFDPayload *) si->data);
+}
+
+/**
+ * pinos_packet_builder_add_fd_payload:
+ * @builder: a #PinosPacketBuilder
+ * @offset: an offset
+ * @size: a size
+ * @fd: a file descriptor
+ * @error: a #GError or %NULL
+ *
+ * Add a #PINOS_PACKET_TYPE_FD_PAYLOAD to @builder.
+ *
+ * Returns: %TRUE on success. When %FALSE is returned, @error contains more
+ * information.
+ */
+gboolean
+pinos_packet_builder_add_fd_payload (PinosPacketBuilder *builder,
+ gint64 offset, gint64 size, int fd,
+ GError **error)
+{
+ struct stack_builder *sb = PPSB (builder);
+ PinosPacketFDPayload *p;
+
+ g_return_if_fail (is_valid_builder (builder));
+ g_return_if_fail (size > 0);
+ g_return_if_fail (offset >= 0);
+ g_return_if_fail (fd != -1);
+
+ if (sb->buf.message == NULL) {
+ sb->buf.message = g_unix_fd_message_new ();
+ sb->n_sockets = 0;
+ }
+ if (!g_unix_fd_message_append_fd ((GUnixFDMessage*)sb->buf.message, fd, error))
+ return FALSE;
+
+ p = builder_add_packet (sb, PINOS_PACKET_TYPE_FD_PAYLOAD, sizeof (PinosPacketFDPayload));
+ p->offset = offset;
+ p->size = size;
+ p->fd_index = sb->n_sockets++;
+
+ return TRUE;
+}
+
diff --git a/src/client/buffer.h b/src/client/buffer.h
new file mode 100644
index 00000000..4750eee0
--- /dev/null
+++ b/src/client/buffer.h
@@ -0,0 +1,132 @@
+/* Pinos
+ * Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __PINOS_BUFFER_H__
+#define __PINOS_BUFFER_H__
+
+#include <glib-object.h>
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+typedef struct _PinosBuffer PinosBuffer;
+typedef struct _PinosBufferInfo PinosBufferInfo;
+typedef struct _PinosPacketIter PinosPacketIter;
+typedef struct _PinosPacketBuilder PinosPacketBuilder;
+
+#define PINOS_BUFFER_VERSION 0
+
+typedef struct {
+ guint32 flags;
+ guint32 seq;
+ gint64 pts;
+ gint64 dts_offset;
+} PinosBufferHeader;
+
+struct _PinosBuffer {
+ /*< private >*/
+ gsize x[16];
+};
+
+void pinos_buffer_init_take_data (PinosBuffer *buffer,
+ gpointer data,
+ gsize size,
+ GSocketControlMessage *message);
+
+void pinos_buffer_clear (PinosBuffer *buffer);
+
+const PinosBufferHeader *
+ pinos_buffer_get_header (PinosBuffer *buffer,
+ guint32 *version);
+int pinos_buffer_get_fd (PinosBuffer *buffer,
+ gint index,
+ GError **error);
+GSocketControlMessage *
+ pinos_buffer_get_socket_control_message (PinosBuffer *buffer);
+
+gsize pinos_buffer_get_size (PinosBuffer *buffer);
+void pinos_buffer_store (PinosBuffer *buffer,
+ gpointer data);
+
+
+typedef enum {
+ PINOS_PACKET_TYPE_INVALID = 0,
+
+ PINOS_PACKET_TYPE_FD_PAYLOAD = 1,
+ PINOS_PACKET_TYPE_FORMAT_CHANGE = 2,
+ PINOS_PACKET_TYPE_PROPERTY_CHANGE = 3,
+} PinosPacketType;
+
+
+/* iterating packets */
+struct _PinosPacketIter {
+ /*< private >*/
+ gsize x[16];
+};
+
+void pinos_packet_iter_init_full (PinosPacketIter *iter,
+ PinosBuffer *buffer,
+ guint32 version);
+#define pinos_packet_iter_init(i,b) pinos_packet_iter_init_full(i,b, PINOS_BUFFER_VERSION);
+
+gboolean pinos_packet_iter_next (PinosPacketIter *iter);
+
+PinosPacketType pinos_packet_iter_get_type (PinosPacketIter *iter);
+gpointer pinos_packet_iter_get_data (PinosPacketIter *iter, gsize *size);
+
+/**
+ * PinosPacketBuilder:
+ */
+struct _PinosPacketBuilder {
+ /*< private >*/
+ gsize x[16];
+};
+
+void pinos_packet_builder_init_full (PinosPacketBuilder *builder,
+ guint32 version,
+ const PinosBufferHeader *header);
+#define pinos_packet_builder_init(b,h) pinos_packet_builder_init_full(b, PINOS_BUFFER_VERSION,h);
+
+void pinos_packet_builder_end (PinosPacketBuilder *builder,
+ PinosBuffer *buffer);
+
+/* fd-payload packets */
+/**
+ * PinosPacketFDPayload:
+ * @fd_index: the index of the fd with the data
+ * @offset: the offset of the data
+ * @size: the size of the data
+ *
+ * A Packet that contains data in an fd at @fd_index at @offset and with
+ * @size.
+ */
+typedef struct {
+ gint32 fd_index;
+ gint64 offset;
+ gint64 size;
+} PinosPacketFDPayload;
+
+void pinos_packet_iter_parse_fd_payload (PinosPacketIter *iter,
+ PinosPacketFDPayload *payload);
+gboolean pinos_packet_builder_add_fd_payload (PinosPacketBuilder *builder,
+ gint64 offset, gint64 size, int fd,
+ GError **error);
+
+#endif /* __PINOS_BUFFER_H__ */
+
diff --git a/src/client/pinos.h b/src/client/pinos.h
index fb186daf..1a87e634 100644
--- a/src/client/pinos.h
+++ b/src/client/pinos.h
@@ -20,6 +20,7 @@
#ifndef __PINOS_H__
#define __PINOS_H__
+#include <client/buffer.h>
#include <client/context.h>
#include <client/introspect.h>
#include <client/mainloop.h>
diff --git a/src/client/private.h b/src/client/private.h
index 5b87409d..cf397cce 100644
--- a/src/client/private.h
+++ b/src/client/private.h
@@ -54,3 +54,23 @@ GDBusProxy * pinos_subscribe_get_proxy_finish (PinosSubscribe *subsc
GAsyncResult *res,
GError **error);
+
+typedef struct {
+ guint32 version;
+ PinosBufferHeader header;
+ guint32 length;
+} PinosStackHeader;
+
+typedef struct {
+ gsize allocated_size;
+ gsize size;
+ gpointer data;
+ GSocketControlMessage *message;
+ gsize magic;
+} PinosStackBuffer;
+
+#define PSB(b) ((PinosStackBuffer *) (b))
+#define PSB_MAGIC ((gsize) 5493683301u)
+#define is_valid_buffer(b) (b != NULL && \
+ PSB(b)->magic == PSB_MAGIC)
+
diff --git a/src/client/stream.c b/src/client/stream.c
index 020e336a..f66d705b 100644
--- a/src/client/stream.c
+++ b/src/client/stream.c
@@ -52,7 +52,7 @@ struct _PinosStreamPrivate
GSocket *socket;
GSource *socket_source;
- PinosBufferInfo info;
+ PinosStackBuffer buffer;
};
#define PINOS_STREAM_GET_PRIVATE(obj) \
@@ -238,6 +238,10 @@ pinos_stream_finalize (GObject * object)
g_clear_object (&priv->context);
g_free (priv->name);
+ if (priv->buffer.message)
+ g_object_unref (priv->buffer.message);
+ g_free (priv->buffer.data);
+
G_OBJECT_CLASS (pinos_stream_parent_class)->finalize (object);
}
@@ -731,8 +735,6 @@ pinos_stream_disconnect (PinosStream *stream)
return TRUE;
}
-#include <gst/wire-protocol.h>
-
static gboolean
on_socket_condition (GSocket *socket,
GIOCondition condition,
@@ -746,14 +748,24 @@ on_socket_condition (GSocket *socket,
{
gssize len;
GInputVector ivec;
- FDMessage msg;
+ PinosStackHeader *hdr;
GSocketControlMessage **messages = NULL;
gint num_messages = 0;
gint flags = 0;
+ gsize need;
GError *error = NULL;
- ivec.buffer = &msg;
- ivec.size = sizeof (msg);
+ need = sizeof (PinosStackHeader);
+
+ if (priv->buffer.allocated_size < need) {
+ priv->buffer.allocated_size = need;
+ priv->buffer.data = g_realloc (priv->buffer.data, need);
+ }
+
+ hdr = priv->buffer.data;
+
+ ivec.buffer = hdr;
+ ivec.size = sizeof (PinosStackHeader);
len = g_socket_receive_message (socket,
NULL,
@@ -765,21 +777,31 @@ on_socket_condition (GSocket *socket,
NULL,
&error);
- g_assert (len == sizeof (msg));
-
- if (priv->info.message)
- g_object_unref (priv->info.message);
+ g_assert (len == sizeof (PinosStackHeader));
if (num_messages == 0)
break;
- priv->info.flags = msg.flags;
- priv->info.seq = msg.seq;
- priv->info.pts = msg.pts;
- priv->info.dts_offset = msg.dts_offset;
- priv->info.offset = msg.offset;
- priv->info.size = msg.size;
- priv->info.message = messages[0];
+ need += hdr->length;
+
+ if (priv->buffer.allocated_size < need) {
+ priv->buffer.allocated_size = need;
+ hdr = priv->buffer.data = g_realloc (priv->buffer.data, need);
+ }
+ priv->buffer.size = need;
+
+ if (priv->buffer.message)
+ g_object_unref (priv->buffer.message);
+ priv->buffer.message = messages[0];
+
+ len = g_socket_receive (socket,
+ (gchar *)priv->buffer.data + sizeof (PinosStackHeader),
+ hdr->length,
+ NULL,
+ &error);
+ g_assert (len == hdr->length);
+
+ priv->buffer.magic = PSB_MAGIC;
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
break;
@@ -1058,85 +1080,115 @@ pinos_stream_stop (PinosStream *stream)
/**
* pinos_stream_capture_buffer:
* @stream: a #PinosStream
- * @info: a #PinosBufferInfo
+ * @buffer: a #PinosBuffer
*
* Capture the next buffer from @stream. This function should be called every
* time after the new-buffer callback has been emitted.
*
- * Returns: %TRUE when @info contains valid information
+ * Returns: %TRUE when @buffer contains valid information
*/
gboolean
-pinos_stream_capture_buffer (PinosStream *stream,
- PinosBufferInfo *info)
+pinos_stream_capture_buffer (PinosStream *stream,
+ PinosBuffer *buffer)
{
PinosStreamPrivate *priv;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
- g_return_val_if_fail (info != NULL, FALSE);
+ g_return_val_if_fail (buffer != NULL, FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
- *info = priv->info;
+ memcpy (buffer, &priv->buffer, sizeof (PinosStackBuffer));
+
+ priv->buffer.data = NULL;
+ priv->buffer.allocated_size = 0;
+ priv->buffer.size = 0;
+ priv->buffer.message = NULL;
return TRUE;
}
/**
+ * pinos_stream_release_buffer:
+ * @stream: a #PinosStream
+ * @buffer: a #PinosBuffer
+ *
+ * Release @buffer back to @stream. This function should be called whenever the
+ * buffer is processed.
+ */
+void
+pinos_stream_release_buffer (PinosStream *stream,
+ PinosBuffer *buffer)
+{
+ PinosStackBuffer *sb = (PinosStackBuffer *) buffer;
+ PinosStreamPrivate *priv;
+
+ g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
+ g_return_val_if_fail (is_valid_buffer (buffer), FALSE);
+
+ priv = stream->priv;
+
+ if (priv->buffer.data == NULL) {
+ priv->buffer.data = sb->data;
+ priv->buffer.allocated_size = sb->allocated_size;
+ priv->buffer.size = 0;
+ }
+ else
+ g_free (sb->data);
+
+ if (sb->message)
+ g_object_unref (sb->message);
+}
+
+/**
* pinos_stream_provide_buffer:
* @stream: a #PinosStream
- * @info: a #PinosBufferInfo
+ * @buffer: a #PinosBuffer
*
* Provide the next buffer from @stream. This function should be called every
* time a new frame becomes available.
*
- * Returns: %TRUE when @info was handled
+ * Returns: %TRUE when @buffer was handled
*/
gboolean
-pinos_stream_provide_buffer (PinosStream *stream,
- PinosBufferInfo *info)
+pinos_stream_provide_buffer (PinosStream *stream,
+ PinosBuffer *buffer)
{
PinosStreamPrivate *priv;
gssize len;
- GOutputVector ovec;
- FDMessage msg;
+ PinosStackBuffer *sb = (PinosStackBuffer *) buffer;
+ GOutputVector ovec[1];
gint flags = 0;
GError *error = NULL;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
- g_return_val_if_fail (info != NULL, FALSE);
+ g_return_val_if_fail (buffer != NULL, FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE);
- msg.flags = info->flags;
- msg.seq = info->seq;
- msg.pts = info->pts;
- msg.dts_offset = info->dts_offset;
- msg.offset = info->offset;
- msg.size = info->size;
-
- ovec.buffer = &msg;
- ovec.size = sizeof (msg);
+ ovec[0].buffer = sb->data;
+ ovec[0].size = sb->size;
len = g_socket_send_message (priv->socket,
NULL,
- &ovec,
+ ovec,
1,
- &info->message,
+ &sb->message,
1,
flags,
NULL,
&error);
- if (info->message) {
- g_object_unref (info->message);
- info->message = NULL;
+ if (sb->message) {
+ g_object_unref (sb->message);
+ sb->message = NULL;
}
if (len == -1)
goto send_error;
- g_assert (len == sizeof (msg));
+ g_assert (len == sb->size);
return TRUE;
diff --git a/src/client/stream.h b/src/client/stream.h
index 1799ae06..37a31808 100644
--- a/src/client/stream.h
+++ b/src/client/stream.h
@@ -22,6 +22,7 @@
#include <glib-object.h>
+#include <client/buffer.h>
#include <client/context.h>
G_BEGIN_DECLS
@@ -53,16 +54,6 @@ typedef enum {
PINOS_STREAM_FLAGS_NONE = 0,
} PinosStreamFlags;
-typedef struct {
- guint32 flags;
- guint32 seq;
- gint64 pts;
- gint64 dts_offset;
- guint64 offset;
- guint64 size;
- GSocketControlMessage *message;
-} PinosBufferInfo;
-
typedef enum {
PINOS_STREAM_MODE_SOCKET = 0,
PINOS_STREAM_MODE_BUFFER = 1,
@@ -114,9 +105,12 @@ gboolean pinos_stream_start (PinosStream *stream,
gboolean pinos_stream_stop (PinosStream *stream);
gboolean pinos_stream_capture_buffer (PinosStream *stream,
- PinosBufferInfo *info);
+ PinosBuffer *buffer);
+void pinos_stream_release_buffer (PinosStream *stream,
+ PinosBuffer *buffer);
+
gboolean pinos_stream_provide_buffer (PinosStream *stream,
- PinosBufferInfo *info);
+ PinosBuffer *buffer);
G_END_DECLS
diff --git a/src/gst/gstfddepay.c b/src/gst/gstfddepay.c
index 7e17f913..bc4f7057 100644
--- a/src/gst/gstfddepay.c
+++ b/src/gst/gstfddepay.c
@@ -35,7 +35,6 @@
#endif
#include "gstfddepay.h"
-#include "wire-protocol.h"
#include <gst/net/gstnetcontrolmessagemeta.h>
#include <gst/gst.h>
@@ -48,6 +47,7 @@
#include <sys/socket.h>
#include <unistd.h>
+#include <client/pinos.h>
GST_DEBUG_CATEGORY_STATIC (gst_fddepay_debug_category);
#define GST_CAT_DEFAULT gst_fddepay_debug_category
@@ -168,66 +168,66 @@ static GstFlowReturn
gst_fddepay_transform_ip (GstBaseTransform * trans, GstBuffer * buf)
{
GstFddepay *fddepay = GST_FDDEPAY (trans);
- FDMessage msg;
- GstMemory *fdmem = NULL;
+ PinosBuffer pbuf;
+ PinosPacketIter it;
GstNetControlMessageMeta * meta;
- int *fds = NULL;
- int fds_len = 0;
- int fd = -1;
+ GSocketControlMessage *msg = NULL;
+ const PinosBufferHeader *hdr;
+ gpointer data;
+ gsize size;
+ GError *err = NULL;
GST_DEBUG_OBJECT (fddepay, "transform_ip");
- if (gst_buffer_get_size (buf) != sizeof (msg)) {
- /* We're guaranteed that we can't `read` from a socket across an attached
- * file descriptor so we should get the data in chunks no bigger than
- * sizeof(FDMessage) */
- GST_WARNING_OBJECT (fddepay, "fddepay: Received wrong amount of data "
- "between fds.");
- goto error;
- }
-
- gst_buffer_extract (buf, 0, &msg, sizeof (msg));
+ gst_buffer_extract_dup (buf, 0, gst_buffer_get_size (buf), &data, &size);
meta = ((GstNetControlMessageMeta*) gst_buffer_get_meta (
buf, GST_NET_CONTROL_MESSAGE_META_API_TYPE));
- if (meta &&
- g_socket_control_message_get_msg_type (meta->message) == SCM_RIGHTS) {
- fds = g_unix_fd_message_steal_fds ((GUnixFDMessage*) meta->message,
- &fds_len);
+ if (meta) {
+ msg = meta->message;
+ gst_buffer_remove_meta (buf, (GstMeta *) meta);
meta = NULL;
}
- if (fds == NULL || fds_len != 1) {
- GST_WARNING_OBJECT (fddepay, "fddepay: Expect to receive 1 FD for each "
- "buffer, received %i", fds_len);
- goto error;
- }
- fd = fds[0];
- fcntl (fd, F_SETFD, FD_CLOEXEC);
- g_free (fds);
- fds = NULL;
-
- /* FIXME: Use stat to find out the size of the file, to make sure that the
- * size we've been told is the true size for safety and security. */
- fdmem = gst_fd_allocator_alloc (fddepay->fd_allocator, fd,
- msg.offset + msg.size, GST_FD_MEMORY_FLAG_NONE);
- gst_memory_resize (fdmem, msg.offset, msg.size);
+ pinos_buffer_init_take_data (&pbuf, data, size, msg);
gst_buffer_remove_all_memory (buf);
- gst_buffer_remove_meta (buf,
- gst_buffer_get_meta (buf, GST_NET_CONTROL_MESSAGE_META_API_TYPE));
- gst_buffer_append_memory (buf, fdmem);
- fdmem = NULL;
- GST_BUFFER_OFFSET (buf) = msg.seq;
+ pinos_packet_iter_init (&it, &pbuf);
+ while (pinos_packet_iter_next (&it)) {
+ switch (pinos_packet_iter_get_type (&it)) {
+ case PINOS_PACKET_TYPE_FD_PAYLOAD:
+ {
+ GstMemory *fdmem = NULL;
+ PinosPacketFDPayload p;
+ int fd;
+
+ pinos_packet_iter_parse_fd_payload (&it, &p);
+ fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &err);
+ if (fd == -1)
+ goto error;
+
+ fdmem = gst_fd_allocator_alloc (fddepay->fd_allocator, fd,
+ p.offset + p.size, GST_FD_MEMORY_FLAG_NONE);
+ gst_memory_resize (fdmem, p.offset, p.size);
+ gst_buffer_append_memory (buf, fdmem);
+ break;
+ }
+ default:
+ break;
+ }
+ }
+ hdr = pinos_buffer_get_header (&pbuf, NULL);
+ GST_BUFFER_OFFSET (buf) = hdr->seq;
return GST_FLOW_OK;
error:
- if (fds)
- g_free (fds);
- if (fd >= 0)
- close (fd);
- return GST_FLOW_ERROR;
+ {
+ GST_ELEMENT_ERROR (fddepay, RESOURCE, SETTINGS, (NULL),
+ ("can't get fd: %s", err->message));
+ g_clear_error (&err);
+ return GST_FLOW_ERROR;
+ }
}
diff --git a/src/gst/gstfdpay.c b/src/gst/gstfdpay.c
index 14fd3b1d..f7ba0ff0 100644
--- a/src/gst/gstfdpay.c
+++ b/src/gst/gstfdpay.c
@@ -53,7 +53,7 @@
#include <string.h>
#include <unistd.h>
-#include "wire-protocol.h"
+#include <client/pinos.h>
GST_DEBUG_CATEGORY_STATIC (gst_fdpay_debug_category);
#define GST_CAT_DEFAULT gst_fdpay_debug_category
@@ -191,7 +191,7 @@ gst_fdpay_transform_size (GstBaseTransform *trans, GstPadDirection direction,
return FALSE;
} else {
/* transform size going downstream */
- *othersize = sizeof (FDMessage);
+ *othersize = sizeof (PinosBuffer) + 30;
}
return TRUE;
@@ -241,40 +241,53 @@ gst_fdpay_transform (GstBaseTransform * trans, GstBuffer * inbuf,
GstMemory *fdmem = NULL;
GstMapInfo info;
GError *err = NULL;
- GSocketControlMessage *fdmsg = NULL;
- FDMessage msg = { 0, };
+ PinosBuffer pbuf;
+ PinosPacketBuilder builder;
+ PinosBufferHeader hdr;
+ gsize size;
GST_DEBUG_OBJECT (fdpay, "transform_ip");
fdmem = gst_fdpay_get_fd_memory (fdpay, inbuf);
- msg.flags = 0;
- msg.seq = GST_BUFFER_OFFSET (inbuf);
- msg.pts = GST_BUFFER_TIMESTAMP (inbuf) + GST_ELEMENT_CAST (trans)->base_time;
- msg.dts_offset = 0;
- msg.size = fdmem->size;
- msg.offset = fdmem->offset;
-
- fdmsg = g_unix_fd_message_new ();
- if (!g_unix_fd_message_append_fd ((GUnixFDMessage*) fdmsg,
- gst_fd_memory_get_fd (fdmem), &err)) {
+ hdr.flags = 0;
+ hdr.seq = GST_BUFFER_OFFSET (inbuf);
+ hdr.pts = GST_BUFFER_TIMESTAMP (inbuf) + GST_ELEMENT_CAST (trans)->base_time;
+ hdr.dts_offset = 0;
+
+ pinos_packet_builder_init (&builder, &hdr);
+ if (!pinos_packet_builder_add_fd_payload (&builder,
+ fdmem->offset,
+ fdmem->size,
+ gst_fd_memory_get_fd (fdmem),
+ &err))
goto append_fd_failed;
- }
+
+ pinos_packet_builder_end (&builder, &pbuf);
gst_memory_unref(fdmem);
fdmem = NULL;
- gst_buffer_add_net_control_message_meta (outbuf, fdmsg);
- g_clear_object (&fdmsg);
+ gst_buffer_add_net_control_message_meta (outbuf,
+ pinos_buffer_get_socket_control_message (&pbuf));
+
+ size = pinos_buffer_get_size (&pbuf);
gst_buffer_map (outbuf, &info, GST_MAP_WRITE);
- memcpy (info.data, &msg, sizeof (msg));
+ pinos_buffer_store (&pbuf, info.data);
gst_buffer_unmap (outbuf, &info);
+ gst_buffer_resize (outbuf, 0, size);
+
+ pinos_buffer_clear (&pbuf);
return GST_FLOW_OK;
+
+ /* ERRORS */
append_fd_failed:
- GST_WARNING_OBJECT (trans, "Appending fd failed: %s", err->message);
- gst_memory_unref(fdmem);
- g_clear_error (&err);
- g_clear_object (&fdmsg);
- return GST_FLOW_ERROR;
+ {
+ GST_WARNING_OBJECT (trans, "Appending fd failed: %s", err->message);
+ gst_memory_unref(fdmem);
+ g_clear_error (&err);
+
+ return GST_FLOW_ERROR;
+ }
}
diff --git a/src/gst/gstpinossink.c b/src/gst/gstpinossink.c
index 713a690a..cd80ce4f 100644
--- a/src/gst/gstpinossink.c
+++ b/src/gst/gstpinossink.c
@@ -329,10 +329,12 @@ static GstFlowReturn
gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
{
GstPinosSink *pinossink;
- PinosBufferInfo info;
- GSocketControlMessage *mesg;
+ PinosBuffer pbuf;
+ PinosPacketBuilder builder;
GstMemory *mem = NULL;
GstClockTime pts, dts, base;
+ PinosBufferHeader hdr;
+ gsize size;
pinossink = GST_PINOS_SINK (bsink);
@@ -348,14 +350,15 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
else if (!GST_CLOCK_TIME_IS_VALID (dts))
dts = pts;
- info.flags = 0;
- info.seq = GST_BUFFER_OFFSET (buffer);
- info.pts = GST_CLOCK_TIME_IS_VALID (pts) ? pts + base : base;
- info.dts_offset = GST_CLOCK_TIME_IS_VALID (dts) && GST_CLOCK_TIME_IS_VALID (pts) ? pts - dts : 0;
- info.offset = 0;
- info.size = gst_buffer_get_size (buffer);
+ hdr.flags = 0;
+ hdr.seq = GST_BUFFER_OFFSET (buffer);
+ hdr.pts = GST_CLOCK_TIME_IS_VALID (pts) ? pts + base : base;
+ hdr.dts_offset = GST_CLOCK_TIME_IS_VALID (dts) && GST_CLOCK_TIME_IS_VALID (pts) ? pts - dts : 0;
+
+ size = gst_buffer_get_size (buffer);
+
+ pinos_packet_builder_init (&builder, &hdr);
- mesg = g_unix_fd_message_new ();
if (gst_buffer_n_memory (buffer) == 1
&& gst_is_fd_memory (gst_buffer_peek_memory (buffer, 0))) {
mem = gst_buffer_get_memory (buffer, 0);
@@ -365,20 +368,22 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
GST_INFO_OBJECT (bsink, "Buffer cannot be payloaded without copying");
- mem = gst_allocator_alloc (pinossink->allocator, info.size, &params);
+ mem = gst_allocator_alloc (pinossink->allocator, size, &params);
if (!gst_memory_map (mem, &minfo, GST_MAP_WRITE))
goto map_error;
- gst_buffer_extract (buffer, 0, minfo.data, info.size);
+ gst_buffer_extract (buffer, 0, minfo.data, size);
gst_memory_unmap (mem, &minfo);
}
- g_unix_fd_message_append_fd ((GUnixFDMessage*)mesg, gst_fd_memory_get_fd (mem), NULL);
+
+ pinos_packet_builder_add_fd_payload (&builder, 0, size, gst_fd_memory_get_fd (mem), NULL);
gst_memory_unref (mem);
- info.message = mesg;
+
+ pinos_packet_builder_end (&builder, &pbuf);
pinos_main_loop_lock (pinossink->loop);
if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING)
goto streaming_error;
- pinos_stream_provide_buffer (pinossink->stream, &info);
+ pinos_stream_provide_buffer (pinossink->stream, &pbuf);
pinos_main_loop_unlock (pinossink->loop);
return GST_FLOW_OK;
diff --git a/src/gst/gstpinossrc.c b/src/gst/gstpinossrc.c
index ecd5bcaf..bccc5a13 100644
--- a/src/gst/gstpinossrc.c
+++ b/src/gst/gstpinossrc.c
@@ -251,9 +251,72 @@ on_new_buffer (GObject *gobject,
gpointer user_data)
{
GstPinosSrc *pinossrc = user_data;
+ PinosBuffer pbuf;
+ const PinosBufferHeader *hdr;
+ PinosPacketIter it;
+ GstBuffer *buf;
+ GError *error = NULL;
GST_LOG_OBJECT (pinossrc, "got new buffer");
+ pinos_stream_capture_buffer (pinossrc->stream, &pbuf);
+
+ buf = gst_buffer_new ();
+
+ hdr = pinos_buffer_get_header (&pbuf, NULL);
+
+ if (GST_CLOCK_TIME_IS_VALID (hdr->pts)) {
+ if (hdr->pts > GST_ELEMENT_CAST (pinossrc)->base_time)
+ GST_BUFFER_PTS (buf) = hdr->pts - GST_ELEMENT_CAST (pinossrc)->base_time;
+
+ if (GST_BUFFER_PTS (buf) + hdr->dts_offset > 0)
+ GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + hdr->dts_offset;
+ }
+ GST_BUFFER_OFFSET (buf) = hdr->seq;
+
+ pinos_packet_iter_init (&it, &pbuf);
+ while (pinos_packet_iter_next (&it)) {
+ switch (pinos_packet_iter_get_type (&it)) {
+ case PINOS_PACKET_TYPE_FD_PAYLOAD:
+ {
+ GstMemory *fdmem = NULL;
+ PinosPacketFDPayload p;
+ int fd;
+
+ GST_DEBUG ("got fd payload");
+ pinos_packet_iter_parse_fd_payload (&it, &p);
+ fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &error);
+ if (fd == -1)
+ goto no_fds;
+
+ fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, fd,
+ p.offset + p.size, GST_FD_MEMORY_FLAG_NONE);
+ gst_memory_resize (fdmem, p.offset, p.size);
+ gst_buffer_append_memory (buf, fdmem);
+ break;
+ }
+ default:
+ break;
+ }
+ }
+ if (pinossrc->current)
+ gst_buffer_unref (pinossrc->current);
+ pinossrc->current = buf;
+
+ pinos_stream_release_buffer (pinossrc->stream, &pbuf);
+
pinos_main_loop_signal (pinossrc->loop, FALSE);
+
+ return;
+
+ /* ERRORS */
+no_fds:
+ {
+ GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED,
+ ("buffer error: %s", error->message), (NULL));
+ pinos_main_loop_signal (pinossrc->loop, FALSE);
+ return;
+ }
+
}
static void
@@ -442,17 +505,12 @@ static GstFlowReturn
gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
{
GstPinosSrc *pinossrc;
- PinosBufferInfo info;
- gint *fds, n_fds;
- GstMemory *fdmem = NULL;
- GstBuffer *buf;
pinossrc = GST_PINOS_SRC (psrc);
if (!pinossrc->negotiated)
goto not_negotiated;
-again:
pinos_main_loop_lock (pinossrc->loop);
while (TRUE) {
PinosStreamState state;
@@ -466,39 +524,12 @@ again:
if (state != PINOS_STREAM_STATE_STREAMING)
goto streaming_stopped;
- GST_LOG_OBJECT (pinossrc, "start capture buffer");
- pinos_stream_capture_buffer (pinossrc->stream, &info);
- if (info.message != NULL) {
- GST_LOG_OBJECT (pinossrc, "no message, retry");
- break;
- }
+ break;
}
pinos_main_loop_unlock (pinossrc->loop);
- if (g_socket_control_message_get_msg_type (info.message) != SCM_RIGHTS)
- goto again;
-
- fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (info.message), &n_fds);
- if (n_fds < 1 || fds[0] < 0)
- goto again;
-
- fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, fds[0],
- info.offset + info.size, GST_FD_MEMORY_FLAG_NONE);
- gst_memory_resize (fdmem, info.offset, info.size);
-
- buf = gst_buffer_new ();
- gst_buffer_append_memory (buf, fdmem);
-
- if (GST_CLOCK_TIME_IS_VALID (info.pts)) {
- if (info.pts > GST_ELEMENT_CAST (pinossrc)->base_time)
- GST_BUFFER_PTS (buf) = info.pts - GST_ELEMENT_CAST (pinossrc)->base_time;
-
- if (GST_BUFFER_PTS (buf) + info.dts_offset > 0)
- GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + info.dts_offset;
- }
- GST_BUFFER_OFFSET (buf) = info.seq;
-
- *buffer = buf;
+ *buffer = pinossrc->current;
+ pinossrc->current = NULL;
return GST_FLOW_OK;
diff --git a/src/gst/gstpinossrc.h b/src/gst/gstpinossrc.h
index cc58d598..05aed1a9 100644
--- a/src/gst/gstpinossrc.h
+++ b/src/gst/gstpinossrc.h
@@ -62,6 +62,8 @@ struct _GstPinosSrc {
PinosContext *ctx;
PinosStream *stream;
GstAllocator *fd_allocator;
+
+ GstBuffer *current;
};
struct _GstPinosSrcClass {
diff --git a/src/gst/wire-protocol.h b/src/gst/wire-protocol.h
deleted file mode 100644
index cad4ae42..00000000
--- a/src/gst/wire-protocol.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/* GStreamer
- * Copyright (C) 2014 William Manley <will@williammanley.net>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifndef _GST_FDPAY_WIRE_PROTOCOL_H_
-#define _GST_FDPAY_WIRE_PROTOCOL_H_
-
-#include <stdint.h>
-
-/**
- * @flags: possible flags
- * @seq: a sequence number
- * @pts: a PTS or presentation timestamp
- * @dts_offset: an offset to @pts to get the DTS
- * @offset: offset in fd
- * @size: size of data in fd
- *
- * Almost the simplest possible FD passing protocol. Each message should have
- * a file-descriptor attached which should be mmapable. The data in the FD can
- * be found at offset and is size bytes long. */
-typedef struct {
- guint32 flags;
- guint32 seq;
- gint64 pts;
- gint64 dts_offset;
- guint64 offset;
- guint64 size;
-} FDMessage;
-
-#endif