diff options
-rw-r--r-- | doc/design.txt | 24 | ||||
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/client/buffer.c | 499 | ||||
-rw-r--r-- | src/client/buffer.h | 132 | ||||
-rw-r--r-- | src/client/pinos.h | 1 | ||||
-rw-r--r-- | src/client/private.h | 20 | ||||
-rw-r--r-- | src/client/stream.c | 142 | ||||
-rw-r--r-- | src/client/stream.h | 18 | ||||
-rw-r--r-- | src/gst/gstfddepay.c | 90 | ||||
-rw-r--r-- | src/gst/gstfdpay.c | 59 | ||||
-rw-r--r-- | src/gst/gstpinossink.c | 33 | ||||
-rw-r--r-- | src/gst/gstpinossrc.c | 101 | ||||
-rw-r--r-- | src/gst/gstpinossrc.h | 2 | ||||
-rw-r--r-- | src/gst/wire-protocol.h | 45 |
14 files changed, 938 insertions, 230 deletions
diff --git a/doc/design.txt b/doc/design.txt index 39adb70d..4db76349 100644 --- a/doc/design.txt +++ b/doc/design.txt @@ -55,29 +55,31 @@ Wire Fixed header -<flags> : 4 bytes -<seq> : 4 bytes -<pts> : 8 bytes -<dts-offset> : 8 bytes +<flags> : 4 bytes : buffer flags +<seq> : 4 bytes : sequence number +<pts> : 8 bytes : presentation time +<dts-offset> : 8 bytes : dts-offset +<length> : 8 bytes : total message length Followed by 1 or more type-length-data sections <type> : 1 byte <length> : variable length, 7 bits, hight bit is continuation marker -<data> : <length> bytes +<data> : <length> bytes, see below for contents based on <type> Types: - 0: format change + 0: fd-payload section + + <offset> : 8 bytes : offset + <size> : 8 bytes : size + <fd-index> : 4 bytes : index of fd + + 1: format change <format-id> : 1 byte : format id <format> : 0-terminated : contains serialized format - 1: fd-payload section - - <offset> : 8 bytes : offset - <size> : 8 bytes : size - <fd-index> : 1 byte : index of fd 2: property changes diff --git a/src/Makefile.am b/src/Makefile.am index 2e01b0ac..081d720b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -157,6 +157,7 @@ pinosgstsource = gst/gstfdpay.h gst/gstfdpay.c \ pinosinclude_HEADERS = \ client/pinos.h \ + client/buffer.h \ client/context.h \ client/enumtypes.h \ client/introspect.h \ @@ -170,6 +171,7 @@ lib_LTLIBRARIES = \ # Public interface libpinos_@PINOS_MAJORMINOR@_la_SOURCES = \ + client/buffer.h client/buffer.c \ client/context.h client/context.c \ client/enumtypes.h client/enumtypes.c \ client/introspect.h client/introspect.c \ diff --git a/src/client/buffer.c b/src/client/buffer.c new file mode 100644 index 00000000..0786d926 --- /dev/null +++ b/src/client/buffer.c @@ -0,0 +1,499 @@ +/* Pinos + * Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include <sys/socket.h> +#include <string.h> + +#include <gio/gio.h> +#include <gio/gunixfdmessage.h> + +#include "client/properties.h" +#include "client/context.h" +#include "client/buffer.h" +#include "client/private.h" + +G_STATIC_ASSERT (sizeof (PinosStackBuffer) <= sizeof (PinosBuffer)); + +void +pinos_buffer_init_take_data (PinosBuffer *buffer, + gpointer data, + gsize size, + GSocketControlMessage *message) +{ + PinosStackBuffer *sb = PSB (buffer); + + sb->magic = PSB_MAGIC; + sb->data = data; + sb->size = size; + sb->allocated_size = size; + sb->message = message; +} + +void +pinos_buffer_clear (PinosBuffer *buffer) +{ + PinosStackBuffer *sb = PSB (buffer); + + g_return_val_if_fail (is_valid_buffer (buffer), -1); + + g_free (sb->data); + sb->size = 0; + sb->allocated_size = 0; + g_clear_object (&sb->message); +} + +const PinosBufferHeader * +pinos_buffer_get_header (PinosBuffer *buffer, guint32 *version) +{ + PinosStackBuffer *sb = PSB (buffer); + PinosStackHeader *hdr; + + g_return_val_if_fail (is_valid_buffer (buffer), NULL); + + hdr = sb->data; + + if (version) + *version = hdr->version; + + return (const PinosBufferHeader *) &hdr->header; +} + +/** + * pinos_buffer_get_fd: + * @buffer: a #PinosBuffer + * @index: an index + * @error: a #GError or %NULL + * + * Get the file descriptor at @index in @buffer. + * + * Returns: a file descriptor ar @index in @buffer. The file descriptor is + * duplicated using dup() and set as close-on-exec before being returned. + * You must call close() on it when you are done. -1 is returned on error and + * @error is set. + */ +int +pinos_buffer_get_fd (PinosBuffer *buffer, gint index, GError **error) +{ + PinosStackBuffer *sb = PSB (buffer); + GUnixFDList *fds; + + g_return_val_if_fail (is_valid_buffer (buffer), -1); + g_return_val_if_fail (sb->message != NULL, -1); + + if (g_socket_control_message_get_msg_type (sb->message) != SCM_RIGHTS) + goto not_found; + + fds = g_unix_fd_message_get_fd_list (G_UNIX_FD_MESSAGE (sb->message)); + if (fds == NULL) + goto not_found; + + if (g_unix_fd_list_get_length (fds) <= index) + goto not_found; + + return g_unix_fd_list_get (fds, index, error); + + /* ERRORS */ +not_found: + { + if (error) + *error = g_error_new (G_IO_ERROR, + G_IO_ERROR_NOT_FOUND, + "Buffer does not have any fd at index %d", index); + return -1; + } +} + +/** + * pinos_buffer_get_socket_control_message: + * @buffer: a #PinosBuffer + * + * Get the #GSocketControlMessage of @buffer + * + * Returns: the #GSocketControlMessage it remains valid as long as @buffer + * is valid + */ +GSocketControlMessage * +pinos_buffer_get_socket_control_message (PinosBuffer *buffer) +{ + PinosStackBuffer *sb = PSB (buffer); + + g_return_val_if_fail (is_valid_buffer (buffer), NULL); + + return sb->message; +} + +/** + * pinos_buffer_get_size: + * @buffer: a #PinosBuffer + * + * Get the total size needed to store @buffer with pinos_buffer_store(). + * + * Returns: the serialized size of @buffer. + */ +gsize +pinos_buffer_get_size (PinosBuffer *buffer) +{ + PinosStackBuffer *sb = PSB (buffer); + + g_return_val_if_fail (is_valid_buffer (buffer), 0); + + return sizeof (PinosStackHeader) + sb->size; +} + +/** + * pinos_buffer_store: + * @buffer: a #PinosBuffer + * @data: destination + * + * Store the contents of @buffer in @data. @data must be large enough, see + * pinos_buffer_get_size(). + */ +void +pinos_buffer_store (PinosBuffer *buffer, + gpointer data) +{ + PinosStackBuffer *sb = PSB (buffer); + + g_return_val_if_fail (is_valid_buffer (buffer), 0); + + memcpy (data, sb->data, sizeof (PinosStackHeader) + sb->size); +} + +/** + * PinosPacketIter: + * + * #PinosPacketIter is an opaque data structure and can only be accessed + * using the following functions. + */ +struct stack_iter { + gsize magic; + guint32 version; + PinosStackBuffer *buffer; + gsize offset; + + PinosPacketType type; + gsize size; + gpointer data; + + guint item; +}; + +G_STATIC_ASSERT (sizeof (struct stack_iter) <= sizeof (PinosPacketIter)); + +#define PPSI(i) ((struct stack_iter *) (i)) +#define PPSI_MAGIC ((gsize) 6739527471u) +#define is_valid_iter(i) (i != NULL && \ + PPSI(i)->magic == PPSI_MAGIC) + +/** + * pinos_packet_iter_init: + * @iter: a #PinosPacketIter + * @buffer: a #PinosBuffer + * + * Initialize @iter to iterate the packets in @buffer. + */ +void +pinos_packet_iter_init_full (PinosPacketIter *iter, + PinosBuffer *buffer, + guint32 version) +{ + struct stack_iter *si = PPSI (iter); + + g_return_if_fail (iter != NULL); + g_return_if_fail (is_valid_buffer (buffer)); + + si->magic = PPSI_MAGIC; + si->version = version; + si->buffer = PSB (buffer); + si->offset = 0; + si->type = PINOS_PACKET_TYPE_INVALID; + si->size = sizeof (PinosStackHeader); + si->data = NULL; + si->item = 0; +} + +static gboolean +read_length (guint8 * data, guint size, gsize * length, gsize * skip) +{ + gsize len, offset; + guint8 b; + + /* start reading the length, we need this to skip to the data later */ + len = offset = 0; + do { + if (offset >= size) + return FALSE; + b = data[offset++]; + len = (len << 7) | (b & 0x7f); + } while (b & 0x80); + + /* check remaining buffer size */ + if (size - offset < len) + return FALSE; + + *length = len; + *skip = offset; + + return TRUE; +} + + +/** + * pinos_packet_iter_next: + * @iter: a #PinosPacketIter + * + * Move to the next packet in @iter. + * + * Returns: %TRUE if more packets are available. + */ +gboolean +pinos_packet_iter_next (PinosPacketIter *iter) +{ + struct stack_iter *si = PPSI (iter); + gsize len, size, skip; + guint8 *data; + + g_return_val_if_fail (is_valid_iter (iter), FALSE); + + /* move to next packet */ + si->offset += si->size; + + /* now read packet */ + data = si->buffer->data; + size = si->buffer->size; + if (si->offset >= size) + return FALSE; + + data += si->offset; + size -= si->offset; + + if (size < 1) + return FALSE; + + si->type = *data; + + data++; + size--; + + if (!read_length (data, size, &len, &skip)) + return FALSE; + + si->size = len; + si->data = data + skip; + si->offset += 1 + skip; + + return TRUE; +} + +PinosPacketType +pinos_packet_iter_get_type (PinosPacketIter *iter) +{ + struct stack_iter *si = PPSI (iter); + + g_return_val_if_fail (is_valid_iter (iter), PINOS_PACKET_TYPE_INVALID); + + return si->type; +} + +gpointer +pinos_packet_iter_get_data (PinosPacketIter *iter, gsize *size) +{ + struct stack_iter *si = PPSI (iter); + + g_return_val_if_fail (is_valid_iter (iter), NULL); + + if (size) + *size = si->size; + + return si->data; +} + + +/** + * PinosPacketBuilder: + * @buffer: owner #PinosBuffer + */ +struct stack_builder { + gsize magic; + + PinosStackHeader *sh; + PinosStackBuffer buf; + + PinosPacketType type; + gsize offset; + + guint n_sockets; +}; + +G_STATIC_ASSERT (sizeof (struct stack_builder) <= sizeof (PinosPacketBuilder)); + +#define PPSB(b) ((struct stack_builder *) (b)) +#define PPSB_MAGIC ((gsize) 8103647428u) +#define is_valid_builder(b) (b != NULL && \ + PPSB(b)->magic == PPSB_MAGIC) + + +void +pinos_packet_builder_init_full (PinosPacketBuilder *builder, + guint32 version, + const PinosBufferHeader *header) +{ + struct stack_builder *sb = PPSB (builder); + PinosStackHeader *sh; + + g_return_if_fail (builder != NULL); + + sb->magic = PPSB_MAGIC; + sb->buf.allocated_size = sizeof (PinosStackHeader) + 128; + sb->buf.data = g_malloc (sb->buf.allocated_size); + sb->buf.size = sizeof (PinosStackHeader); + sb->buf.message = NULL; + + sh = sb->sh = sb->buf.data; + sh->version = version; + sh->header = *header; + sh->length = 0; + + sb->type = 0; + sb->offset = 0; +} + +void +pinos_packet_builder_end (PinosPacketBuilder *builder, + PinosBuffer *buffer) +{ + struct stack_builder *sb = PPSB (builder); + PinosStackBuffer *sbuf = PSB (buffer); + + g_return_if_fail (is_valid_builder (builder)); + g_return_if_fail (buffer != NULL); + + sb->sh->length = sb->buf.size - sizeof (PinosStackHeader); + + sbuf->magic = PSB_MAGIC; + sbuf->data = sb->buf.data; + sbuf->size = sb->buf.size; + sbuf->allocated_size = sb->buf.allocated_size; + sbuf->message = sb->buf.message; + + sb->buf.data = NULL; + sb->buf.size = 0; + sb->buf.allocated_size = 0; + sb->buf.message = NULL; + sb->buf.magic = 0; +} + +static gpointer +builder_ensure_size (struct stack_builder *sb, gsize size) +{ + if (sb->buf.size + size > sb->buf.allocated_size) { + sb->buf.allocated_size = sb->buf.size + MAX (size, 1024); + sb->buf.data = g_realloc (sb->buf.data, sb->buf.allocated_size); + } + return (guint8 *) sb->buf.data + sb->buf.size; +} + +static gpointer +builder_add_packet (struct stack_builder *sb, PinosPacketType type, gsize size) +{ + guint8 *p; + guint plen; + + plen = 1; + while (size >> (7 * plen)) + plen++; + + /* 1 for type, plen for size and size for payload */ + p = builder_ensure_size (sb, 1 + plen + size); + + sb->type = type; + sb->offset = sb->buf.size; + sb->buf.size += 1 + plen + size; + + *p++ = type; + /* write length */ + while (plen) { + plen--; + *p++ = ((plen > 0) ? 0x80 : 0) | ((size >> (7 * plen)) & 0x7f); + } + return p; +} + +/* fd-payload packets */ +/** + * pinos_packet_iter_get_fd_payload: + * @iter: a #PinosPacketIter + * @payload: a #PinosPacketFDPayload + * + * Get the #PinosPacketFDPayload. @iter must be positioned on a packet of + * type #PINOS_PACKET_TYPE_FD_PAYLOAD + */ +void +pinos_packet_iter_parse_fd_payload (PinosPacketIter *iter, + PinosPacketFDPayload *payload) +{ + struct stack_iter *si = PPSI (iter); + + g_return_if_fail (is_valid_iter (iter)); + g_return_if_fail (si->type == PINOS_PACKET_TYPE_FD_PAYLOAD); + + *payload = *((PinosPacketFDPayload *) si->data); +} + +/** + * pinos_packet_builder_add_fd_payload: + * @builder: a #PinosPacketBuilder + * @offset: an offset + * @size: a size + * @fd: a file descriptor + * @error: a #GError or %NULL + * + * Add a #PINOS_PACKET_TYPE_FD_PAYLOAD to @builder. + * + * Returns: %TRUE on success. When %FALSE is returned, @error contains more + * information. + */ +gboolean +pinos_packet_builder_add_fd_payload (PinosPacketBuilder *builder, + gint64 offset, gint64 size, int fd, + GError **error) +{ + struct stack_builder *sb = PPSB (builder); + PinosPacketFDPayload *p; + + g_return_if_fail (is_valid_builder (builder)); + g_return_if_fail (size > 0); + g_return_if_fail (offset >= 0); + g_return_if_fail (fd != -1); + + if (sb->buf.message == NULL) { + sb->buf.message = g_unix_fd_message_new (); + sb->n_sockets = 0; + } + if (!g_unix_fd_message_append_fd ((GUnixFDMessage*)sb->buf.message, fd, error)) + return FALSE; + + p = builder_add_packet (sb, PINOS_PACKET_TYPE_FD_PAYLOAD, sizeof (PinosPacketFDPayload)); + p->offset = offset; + p->size = size; + p->fd_index = sb->n_sockets++; + + return TRUE; +} + diff --git a/src/client/buffer.h b/src/client/buffer.h new file mode 100644 index 00000000..4750eee0 --- /dev/null +++ b/src/client/buffer.h @@ -0,0 +1,132 @@ +/* Pinos + * Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __PINOS_BUFFER_H__ +#define __PINOS_BUFFER_H__ + +#include <glib-object.h> +#include <gio/gio.h> + +G_BEGIN_DECLS + +typedef struct _PinosBuffer PinosBuffer; +typedef struct _PinosBufferInfo PinosBufferInfo; +typedef struct _PinosPacketIter PinosPacketIter; +typedef struct _PinosPacketBuilder PinosPacketBuilder; + +#define PINOS_BUFFER_VERSION 0 + +typedef struct { + guint32 flags; + guint32 seq; + gint64 pts; + gint64 dts_offset; +} PinosBufferHeader; + +struct _PinosBuffer { + /*< private >*/ + gsize x[16]; +}; + +void pinos_buffer_init_take_data (PinosBuffer *buffer, + gpointer data, + gsize size, + GSocketControlMessage *message); + +void pinos_buffer_clear (PinosBuffer *buffer); + +const PinosBufferHeader * + pinos_buffer_get_header (PinosBuffer *buffer, + guint32 *version); +int pinos_buffer_get_fd (PinosBuffer *buffer, + gint index, + GError **error); +GSocketControlMessage * + pinos_buffer_get_socket_control_message (PinosBuffer *buffer); + +gsize pinos_buffer_get_size (PinosBuffer *buffer); +void pinos_buffer_store (PinosBuffer *buffer, + gpointer data); + + +typedef enum { + PINOS_PACKET_TYPE_INVALID = 0, + + PINOS_PACKET_TYPE_FD_PAYLOAD = 1, + PINOS_PACKET_TYPE_FORMAT_CHANGE = 2, + PINOS_PACKET_TYPE_PROPERTY_CHANGE = 3, +} PinosPacketType; + + +/* iterating packets */ +struct _PinosPacketIter { + /*< private >*/ + gsize x[16]; +}; + +void pinos_packet_iter_init_full (PinosPacketIter *iter, + PinosBuffer *buffer, + guint32 version); +#define pinos_packet_iter_init(i,b) pinos_packet_iter_init_full(i,b, PINOS_BUFFER_VERSION); + +gboolean pinos_packet_iter_next (PinosPacketIter *iter); + +PinosPacketType pinos_packet_iter_get_type (PinosPacketIter *iter); +gpointer pinos_packet_iter_get_data (PinosPacketIter *iter, gsize *size); + +/** + * PinosPacketBuilder: + */ +struct _PinosPacketBuilder { + /*< private >*/ + gsize x[16]; +}; + +void pinos_packet_builder_init_full (PinosPacketBuilder *builder, + guint32 version, + const PinosBufferHeader *header); +#define pinos_packet_builder_init(b,h) pinos_packet_builder_init_full(b, PINOS_BUFFER_VERSION,h); + +void pinos_packet_builder_end (PinosPacketBuilder *builder, + PinosBuffer *buffer); + +/* fd-payload packets */ +/** + * PinosPacketFDPayload: + * @fd_index: the index of the fd with the data + * @offset: the offset of the data + * @size: the size of the data + * + * A Packet that contains data in an fd at @fd_index at @offset and with + * @size. + */ +typedef struct { + gint32 fd_index; + gint64 offset; + gint64 size; +} PinosPacketFDPayload; + +void pinos_packet_iter_parse_fd_payload (PinosPacketIter *iter, + PinosPacketFDPayload *payload); +gboolean pinos_packet_builder_add_fd_payload (PinosPacketBuilder *builder, + gint64 offset, gint64 size, int fd, + GError **error); + +#endif /* __PINOS_BUFFER_H__ */ + diff --git a/src/client/pinos.h b/src/client/pinos.h index fb186daf..1a87e634 100644 --- a/src/client/pinos.h +++ b/src/client/pinos.h @@ -20,6 +20,7 @@ #ifndef __PINOS_H__ #define __PINOS_H__ +#include <client/buffer.h> #include <client/context.h> #include <client/introspect.h> #include <client/mainloop.h> diff --git a/src/client/private.h b/src/client/private.h index 5b87409d..cf397cce 100644 --- a/src/client/private.h +++ b/src/client/private.h @@ -54,3 +54,23 @@ GDBusProxy * pinos_subscribe_get_proxy_finish (PinosSubscribe *subsc GAsyncResult *res, GError **error); + +typedef struct { + guint32 version; + PinosBufferHeader header; + guint32 length; +} PinosStackHeader; + +typedef struct { + gsize allocated_size; + gsize size; + gpointer data; + GSocketControlMessage *message; + gsize magic; +} PinosStackBuffer; + +#define PSB(b) ((PinosStackBuffer *) (b)) +#define PSB_MAGIC ((gsize) 5493683301u) +#define is_valid_buffer(b) (b != NULL && \ + PSB(b)->magic == PSB_MAGIC) + diff --git a/src/client/stream.c b/src/client/stream.c index 020e336a..f66d705b 100644 --- a/src/client/stream.c +++ b/src/client/stream.c @@ -52,7 +52,7 @@ struct _PinosStreamPrivate GSocket *socket; GSource *socket_source; - PinosBufferInfo info; + PinosStackBuffer buffer; }; #define PINOS_STREAM_GET_PRIVATE(obj) \ @@ -238,6 +238,10 @@ pinos_stream_finalize (GObject * object) g_clear_object (&priv->context); g_free (priv->name); + if (priv->buffer.message) + g_object_unref (priv->buffer.message); + g_free (priv->buffer.data); + G_OBJECT_CLASS (pinos_stream_parent_class)->finalize (object); } @@ -731,8 +735,6 @@ pinos_stream_disconnect (PinosStream *stream) return TRUE; } -#include <gst/wire-protocol.h> - static gboolean on_socket_condition (GSocket *socket, GIOCondition condition, @@ -746,14 +748,24 @@ on_socket_condition (GSocket *socket, { gssize len; GInputVector ivec; - FDMessage msg; + PinosStackHeader *hdr; GSocketControlMessage **messages = NULL; gint num_messages = 0; gint flags = 0; + gsize need; GError *error = NULL; - ivec.buffer = &msg; - ivec.size = sizeof (msg); + need = sizeof (PinosStackHeader); + + if (priv->buffer.allocated_size < need) { + priv->buffer.allocated_size = need; + priv->buffer.data = g_realloc (priv->buffer.data, need); + } + + hdr = priv->buffer.data; + + ivec.buffer = hdr; + ivec.size = sizeof (PinosStackHeader); len = g_socket_receive_message (socket, NULL, @@ -765,21 +777,31 @@ on_socket_condition (GSocket *socket, NULL, &error); - g_assert (len == sizeof (msg)); - - if (priv->info.message) - g_object_unref (priv->info.message); + g_assert (len == sizeof (PinosStackHeader)); if (num_messages == 0) break; - priv->info.flags = msg.flags; - priv->info.seq = msg.seq; - priv->info.pts = msg.pts; - priv->info.dts_offset = msg.dts_offset; - priv->info.offset = msg.offset; - priv->info.size = msg.size; - priv->info.message = messages[0]; + need += hdr->length; + + if (priv->buffer.allocated_size < need) { + priv->buffer.allocated_size = need; + hdr = priv->buffer.data = g_realloc (priv->buffer.data, need); + } + priv->buffer.size = need; + + if (priv->buffer.message) + g_object_unref (priv->buffer.message); + priv->buffer.message = messages[0]; + + len = g_socket_receive (socket, + (gchar *)priv->buffer.data + sizeof (PinosStackHeader), + hdr->length, + NULL, + &error); + g_assert (len == hdr->length); + + priv->buffer.magic = PSB_MAGIC; g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL); break; @@ -1058,85 +1080,115 @@ pinos_stream_stop (PinosStream *stream) /** * pinos_stream_capture_buffer: * @stream: a #PinosStream - * @info: a #PinosBufferInfo + * @buffer: a #PinosBuffer * * Capture the next buffer from @stream. This function should be called every * time after the new-buffer callback has been emitted. * - * Returns: %TRUE when @info contains valid information + * Returns: %TRUE when @buffer contains valid information */ gboolean -pinos_stream_capture_buffer (PinosStream *stream, - PinosBufferInfo *info) +pinos_stream_capture_buffer (PinosStream *stream, + PinosBuffer *buffer) { PinosStreamPrivate *priv; g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); - g_return_val_if_fail (info != NULL, FALSE); + g_return_val_if_fail (buffer != NULL, FALSE); priv = stream->priv; g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE); - *info = priv->info; + memcpy (buffer, &priv->buffer, sizeof (PinosStackBuffer)); + + priv->buffer.data = NULL; + priv->buffer.allocated_size = 0; + priv->buffer.size = 0; + priv->buffer.message = NULL; return TRUE; } /** + * pinos_stream_release_buffer: + * @stream: a #PinosStream + * @buffer: a #PinosBuffer + * + * Release @buffer back to @stream. This function should be called whenever the + * buffer is processed. + */ +void +pinos_stream_release_buffer (PinosStream *stream, + PinosBuffer *buffer) +{ + PinosStackBuffer *sb = (PinosStackBuffer *) buffer; + PinosStreamPrivate *priv; + + g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); + g_return_val_if_fail (is_valid_buffer (buffer), FALSE); + + priv = stream->priv; + + if (priv->buffer.data == NULL) { + priv->buffer.data = sb->data; + priv->buffer.allocated_size = sb->allocated_size; + priv->buffer.size = 0; + } + else + g_free (sb->data); + + if (sb->message) + g_object_unref (sb->message); +} + +/** * pinos_stream_provide_buffer: * @stream: a #PinosStream - * @info: a #PinosBufferInfo + * @buffer: a #PinosBuffer * * Provide the next buffer from @stream. This function should be called every * time a new frame becomes available. * - * Returns: %TRUE when @info was handled + * Returns: %TRUE when @buffer was handled */ gboolean -pinos_stream_provide_buffer (PinosStream *stream, - PinosBufferInfo *info) +pinos_stream_provide_buffer (PinosStream *stream, + PinosBuffer *buffer) { PinosStreamPrivate *priv; gssize len; - GOutputVector ovec; - FDMessage msg; + PinosStackBuffer *sb = (PinosStackBuffer *) buffer; + GOutputVector ovec[1]; gint flags = 0; GError *error = NULL; g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); - g_return_val_if_fail (info != NULL, FALSE); + g_return_val_if_fail (buffer != NULL, FALSE); priv = stream->priv; g_return_val_if_fail (priv->state == PINOS_STREAM_STATE_STREAMING, FALSE); - msg.flags = info->flags; - msg.seq = info->seq; - msg.pts = info->pts; - msg.dts_offset = info->dts_offset; - msg.offset = info->offset; - msg.size = info->size; - - ovec.buffer = &msg; - ovec.size = sizeof (msg); + ovec[0].buffer = sb->data; + ovec[0].size = sb->size; len = g_socket_send_message (priv->socket, NULL, - &ovec, + ovec, 1, - &info->message, + &sb->message, 1, flags, NULL, &error); - if (info->message) { - g_object_unref (info->message); - info->message = NULL; + if (sb->message) { + g_object_unref (sb->message); + sb->message = NULL; } if (len == -1) goto send_error; - g_assert (len == sizeof (msg)); + g_assert (len == sb->size); return TRUE; diff --git a/src/client/stream.h b/src/client/stream.h index 1799ae06..37a31808 100644 --- a/src/client/stream.h +++ b/src/client/stream.h @@ -22,6 +22,7 @@ #include <glib-object.h> +#include <client/buffer.h> #include <client/context.h> G_BEGIN_DECLS @@ -53,16 +54,6 @@ typedef enum { PINOS_STREAM_FLAGS_NONE = 0, } PinosStreamFlags; -typedef struct { - guint32 flags; - guint32 seq; - gint64 pts; - gint64 dts_offset; - guint64 offset; - guint64 size; - GSocketControlMessage *message; -} PinosBufferInfo; - typedef enum { PINOS_STREAM_MODE_SOCKET = 0, PINOS_STREAM_MODE_BUFFER = 1, @@ -114,9 +105,12 @@ gboolean pinos_stream_start (PinosStream *stream, gboolean pinos_stream_stop (PinosStream *stream); gboolean pinos_stream_capture_buffer (PinosStream *stream, - PinosBufferInfo *info); + PinosBuffer *buffer); +void pinos_stream_release_buffer (PinosStream *stream, + PinosBuffer *buffer); + gboolean pinos_stream_provide_buffer (PinosStream *stream, - PinosBufferInfo *info); + PinosBuffer *buffer); G_END_DECLS diff --git a/src/gst/gstfddepay.c b/src/gst/gstfddepay.c index 7e17f913..bc4f7057 100644 --- a/src/gst/gstfddepay.c +++ b/src/gst/gstfddepay.c @@ -35,7 +35,6 @@ #endif #include "gstfddepay.h" -#include "wire-protocol.h" #include <gst/net/gstnetcontrolmessagemeta.h> #include <gst/gst.h> @@ -48,6 +47,7 @@ #include <sys/socket.h> #include <unistd.h> +#include <client/pinos.h> GST_DEBUG_CATEGORY_STATIC (gst_fddepay_debug_category); #define GST_CAT_DEFAULT gst_fddepay_debug_category @@ -168,66 +168,66 @@ static GstFlowReturn gst_fddepay_transform_ip (GstBaseTransform * trans, GstBuffer * buf) { GstFddepay *fddepay = GST_FDDEPAY (trans); - FDMessage msg; - GstMemory *fdmem = NULL; + PinosBuffer pbuf; + PinosPacketIter it; GstNetControlMessageMeta * meta; - int *fds = NULL; - int fds_len = 0; - int fd = -1; + GSocketControlMessage *msg = NULL; + const PinosBufferHeader *hdr; + gpointer data; + gsize size; + GError *err = NULL; GST_DEBUG_OBJECT (fddepay, "transform_ip"); - if (gst_buffer_get_size (buf) != sizeof (msg)) { - /* We're guaranteed that we can't `read` from a socket across an attached - * file descriptor so we should get the data in chunks no bigger than - * sizeof(FDMessage) */ - GST_WARNING_OBJECT (fddepay, "fddepay: Received wrong amount of data " - "between fds."); - goto error; - } - - gst_buffer_extract (buf, 0, &msg, sizeof (msg)); + gst_buffer_extract_dup (buf, 0, gst_buffer_get_size (buf), &data, &size); meta = ((GstNetControlMessageMeta*) gst_buffer_get_meta ( buf, GST_NET_CONTROL_MESSAGE_META_API_TYPE)); - if (meta && - g_socket_control_message_get_msg_type (meta->message) == SCM_RIGHTS) { - fds = g_unix_fd_message_steal_fds ((GUnixFDMessage*) meta->message, - &fds_len); + if (meta) { + msg = meta->message; + gst_buffer_remove_meta (buf, (GstMeta *) meta); meta = NULL; } - if (fds == NULL || fds_len != 1) { - GST_WARNING_OBJECT (fddepay, "fddepay: Expect to receive 1 FD for each " - "buffer, received %i", fds_len); - goto error; - } - fd = fds[0]; - fcntl (fd, F_SETFD, FD_CLOEXEC); - g_free (fds); - fds = NULL; - - /* FIXME: Use stat to find out the size of the file, to make sure that the - * size we've been told is the true size for safety and security. */ - fdmem = gst_fd_allocator_alloc (fddepay->fd_allocator, fd, - msg.offset + msg.size, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (fdmem, msg.offset, msg.size); + pinos_buffer_init_take_data (&pbuf, data, size, msg); gst_buffer_remove_all_memory (buf); - gst_buffer_remove_meta (buf, - gst_buffer_get_meta (buf, GST_NET_CONTROL_MESSAGE_META_API_TYPE)); - gst_buffer_append_memory (buf, fdmem); - fdmem = NULL; - GST_BUFFER_OFFSET (buf) = msg.seq; + pinos_packet_iter_init (&it, &pbuf); + while (pinos_packet_iter_next (&it)) { + switch (pinos_packet_iter_get_type (&it)) { + case PINOS_PACKET_TYPE_FD_PAYLOAD: + { + GstMemory *fdmem = NULL; + PinosPacketFDPayload p; + int fd; + + pinos_packet_iter_parse_fd_payload (&it, &p); + fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &err); + if (fd == -1) + goto error; + + fdmem = gst_fd_allocator_alloc (fddepay->fd_allocator, fd, + p.offset + p.size, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (fdmem, p.offset, p.size); + gst_buffer_append_memory (buf, fdmem); + break; + } + default: + break; + } + } + hdr = pinos_buffer_get_header (&pbuf, NULL); + GST_BUFFER_OFFSET (buf) = hdr->seq; return GST_FLOW_OK; error: - if (fds) - g_free (fds); - if (fd >= 0) - close (fd); - return GST_FLOW_ERROR; + { + GST_ELEMENT_ERROR (fddepay, RESOURCE, SETTINGS, (NULL), + ("can't get fd: %s", err->message)); + g_clear_error (&err); + return GST_FLOW_ERROR; + } } diff --git a/src/gst/gstfdpay.c b/src/gst/gstfdpay.c index 14fd3b1d..f7ba0ff0 100644 --- a/src/gst/gstfdpay.c +++ b/src/gst/gstfdpay.c @@ -53,7 +53,7 @@ #include <string.h> #include <unistd.h> -#include "wire-protocol.h" +#include <client/pinos.h> GST_DEBUG_CATEGORY_STATIC (gst_fdpay_debug_category); #define GST_CAT_DEFAULT gst_fdpay_debug_category @@ -191,7 +191,7 @@ gst_fdpay_transform_size (GstBaseTransform *trans, GstPadDirection direction, return FALSE; } else { /* transform size going downstream */ - *othersize = sizeof (FDMessage); + *othersize = sizeof (PinosBuffer) + 30; } return TRUE; @@ -241,40 +241,53 @@ gst_fdpay_transform (GstBaseTransform * trans, GstBuffer * inbuf, GstMemory *fdmem = NULL; GstMapInfo info; GError *err = NULL; - GSocketControlMessage *fdmsg = NULL; - FDMessage msg = { 0, }; + PinosBuffer pbuf; + PinosPacketBuilder builder; + PinosBufferHeader hdr; + gsize size; GST_DEBUG_OBJECT (fdpay, "transform_ip"); fdmem = gst_fdpay_get_fd_memory (fdpay, inbuf); - msg.flags = 0; - msg.seq = GST_BUFFER_OFFSET (inbuf); - msg.pts = GST_BUFFER_TIMESTAMP (inbuf) + GST_ELEMENT_CAST (trans)->base_time; - msg.dts_offset = 0; - msg.size = fdmem->size; - msg.offset = fdmem->offset; - - fdmsg = g_unix_fd_message_new (); - if (!g_unix_fd_message_append_fd ((GUnixFDMessage*) fdmsg, - gst_fd_memory_get_fd (fdmem), &err)) { + hdr.flags = 0; + hdr.seq = GST_BUFFER_OFFSET (inbuf); + hdr.pts = GST_BUFFER_TIMESTAMP (inbuf) + GST_ELEMENT_CAST (trans)->base_time; + hdr.dts_offset = 0; + + pinos_packet_builder_init (&builder, &hdr); + if (!pinos_packet_builder_add_fd_payload (&builder, + fdmem->offset, + fdmem->size, + gst_fd_memory_get_fd (fdmem), + &err)) goto append_fd_failed; - } + + pinos_packet_builder_end (&builder, &pbuf); gst_memory_unref(fdmem); fdmem = NULL; - gst_buffer_add_net_control_message_meta (outbuf, fdmsg); - g_clear_object (&fdmsg); + gst_buffer_add_net_control_message_meta (outbuf, + pinos_buffer_get_socket_control_message (&pbuf)); + + size = pinos_buffer_get_size (&pbuf); gst_buffer_map (outbuf, &info, GST_MAP_WRITE); - memcpy (info.data, &msg, sizeof (msg)); + pinos_buffer_store (&pbuf, info.data); gst_buffer_unmap (outbuf, &info); + gst_buffer_resize (outbuf, 0, size); + + pinos_buffer_clear (&pbuf); return GST_FLOW_OK; + + /* ERRORS */ append_fd_failed: - GST_WARNING_OBJECT (trans, "Appending fd failed: %s", err->message); - gst_memory_unref(fdmem); - g_clear_error (&err); - g_clear_object (&fdmsg); - return GST_FLOW_ERROR; + { + GST_WARNING_OBJECT (trans, "Appending fd failed: %s", err->message); + gst_memory_unref(fdmem); + g_clear_error (&err); + + return GST_FLOW_ERROR; + } } diff --git a/src/gst/gstpinossink.c b/src/gst/gstpinossink.c index 713a690a..cd80ce4f 100644 --- a/src/gst/gstpinossink.c +++ b/src/gst/gstpinossink.c @@ -329,10 +329,12 @@ static GstFlowReturn gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) { GstPinosSink *pinossink; - PinosBufferInfo info; - GSocketControlMessage *mesg; + PinosBuffer pbuf; + PinosPacketBuilder builder; GstMemory *mem = NULL; GstClockTime pts, dts, base; + PinosBufferHeader hdr; + gsize size; pinossink = GST_PINOS_SINK (bsink); @@ -348,14 +350,15 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) else if (!GST_CLOCK_TIME_IS_VALID (dts)) dts = pts; - info.flags = 0; - info.seq = GST_BUFFER_OFFSET (buffer); - info.pts = GST_CLOCK_TIME_IS_VALID (pts) ? pts + base : base; - info.dts_offset = GST_CLOCK_TIME_IS_VALID (dts) && GST_CLOCK_TIME_IS_VALID (pts) ? pts - dts : 0; - info.offset = 0; - info.size = gst_buffer_get_size (buffer); + hdr.flags = 0; + hdr.seq = GST_BUFFER_OFFSET (buffer); + hdr.pts = GST_CLOCK_TIME_IS_VALID (pts) ? pts + base : base; + hdr.dts_offset = GST_CLOCK_TIME_IS_VALID (dts) && GST_CLOCK_TIME_IS_VALID (pts) ? pts - dts : 0; + + size = gst_buffer_get_size (buffer); + + pinos_packet_builder_init (&builder, &hdr); - mesg = g_unix_fd_message_new (); if (gst_buffer_n_memory (buffer) == 1 && gst_is_fd_memory (gst_buffer_peek_memory (buffer, 0))) { mem = gst_buffer_get_memory (buffer, 0); @@ -365,20 +368,22 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) GST_INFO_OBJECT (bsink, "Buffer cannot be payloaded without copying"); - mem = gst_allocator_alloc (pinossink->allocator, info.size, ¶ms); + mem = gst_allocator_alloc (pinossink->allocator, size, ¶ms); if (!gst_memory_map (mem, &minfo, GST_MAP_WRITE)) goto map_error; - gst_buffer_extract (buffer, 0, minfo.data, info.size); + gst_buffer_extract (buffer, 0, minfo.data, size); gst_memory_unmap (mem, &minfo); } - g_unix_fd_message_append_fd ((GUnixFDMessage*)mesg, gst_fd_memory_get_fd (mem), NULL); + + pinos_packet_builder_add_fd_payload (&builder, 0, size, gst_fd_memory_get_fd (mem), NULL); gst_memory_unref (mem); - info.message = mesg; + + pinos_packet_builder_end (&builder, &pbuf); pinos_main_loop_lock (pinossink->loop); if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING) goto streaming_error; - pinos_stream_provide_buffer (pinossink->stream, &info); + pinos_stream_provide_buffer (pinossink->stream, &pbuf); pinos_main_loop_unlock (pinossink->loop); return GST_FLOW_OK; diff --git a/src/gst/gstpinossrc.c b/src/gst/gstpinossrc.c index ecd5bcaf..bccc5a13 100644 --- a/src/gst/gstpinossrc.c +++ b/src/gst/gstpinossrc.c @@ -251,9 +251,72 @@ on_new_buffer (GObject *gobject, gpointer user_data) { GstPinosSrc *pinossrc = user_data; + PinosBuffer pbuf; + const PinosBufferHeader *hdr; + PinosPacketIter it; + GstBuffer *buf; + GError *error = NULL; GST_LOG_OBJECT (pinossrc, "got new buffer"); + pinos_stream_capture_buffer (pinossrc->stream, &pbuf); + + buf = gst_buffer_new (); + + hdr = pinos_buffer_get_header (&pbuf, NULL); + + if (GST_CLOCK_TIME_IS_VALID (hdr->pts)) { + if (hdr->pts > GST_ELEMENT_CAST (pinossrc)->base_time) + GST_BUFFER_PTS (buf) = hdr->pts - GST_ELEMENT_CAST (pinossrc)->base_time; + + if (GST_BUFFER_PTS (buf) + hdr->dts_offset > 0) + GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + hdr->dts_offset; + } + GST_BUFFER_OFFSET (buf) = hdr->seq; + + pinos_packet_iter_init (&it, &pbuf); + while (pinos_packet_iter_next (&it)) { + switch (pinos_packet_iter_get_type (&it)) { + case PINOS_PACKET_TYPE_FD_PAYLOAD: + { + GstMemory *fdmem = NULL; + PinosPacketFDPayload p; + int fd; + + GST_DEBUG ("got fd payload"); + pinos_packet_iter_parse_fd_payload (&it, &p); + fd = pinos_buffer_get_fd (&pbuf, p.fd_index, &error); + if (fd == -1) + goto no_fds; + + fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, fd, + p.offset + p.size, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (fdmem, p.offset, p.size); + gst_buffer_append_memory (buf, fdmem); + break; + } + default: + break; + } + } + if (pinossrc->current) + gst_buffer_unref (pinossrc->current); + pinossrc->current = buf; + + pinos_stream_release_buffer (pinossrc->stream, &pbuf); + pinos_main_loop_signal (pinossrc->loop, FALSE); + + return; + + /* ERRORS */ +no_fds: + { + GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, + ("buffer error: %s", error->message), (NULL)); + pinos_main_loop_signal (pinossrc->loop, FALSE); + return; + } + } static void @@ -442,17 +505,12 @@ static GstFlowReturn gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer) { GstPinosSrc *pinossrc; - PinosBufferInfo info; - gint *fds, n_fds; - GstMemory *fdmem = NULL; - GstBuffer *buf; pinossrc = GST_PINOS_SRC (psrc); if (!pinossrc->negotiated) goto not_negotiated; -again: pinos_main_loop_lock (pinossrc->loop); while (TRUE) { PinosStreamState state; @@ -466,39 +524,12 @@ again: if (state != PINOS_STREAM_STATE_STREAMING) goto streaming_stopped; - GST_LOG_OBJECT (pinossrc, "start capture buffer"); - pinos_stream_capture_buffer (pinossrc->stream, &info); - if (info.message != NULL) { - GST_LOG_OBJECT (pinossrc, "no message, retry"); - break; - } + break; } pinos_main_loop_unlock (pinossrc->loop); - if (g_socket_control_message_get_msg_type (info.message) != SCM_RIGHTS) - goto again; - - fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (info.message), &n_fds); - if (n_fds < 1 || fds[0] < 0) - goto again; - - fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, fds[0], - info.offset + info.size, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (fdmem, info.offset, info.size); - - buf = gst_buffer_new (); - gst_buffer_append_memory (buf, fdmem); - - if (GST_CLOCK_TIME_IS_VALID (info.pts)) { - if (info.pts > GST_ELEMENT_CAST (pinossrc)->base_time) - GST_BUFFER_PTS (buf) = info.pts - GST_ELEMENT_CAST (pinossrc)->base_time; - - if (GST_BUFFER_PTS (buf) + info.dts_offset > 0) - GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + info.dts_offset; - } - GST_BUFFER_OFFSET (buf) = info.seq; - - *buffer = buf; + *buffer = pinossrc->current; + pinossrc->current = NULL; return GST_FLOW_OK; diff --git a/src/gst/gstpinossrc.h b/src/gst/gstpinossrc.h index cc58d598..05aed1a9 100644 --- a/src/gst/gstpinossrc.h +++ b/src/gst/gstpinossrc.h @@ -62,6 +62,8 @@ struct _GstPinosSrc { PinosContext *ctx; PinosStream *stream; GstAllocator *fd_allocator; + + GstBuffer *current; }; struct _GstPinosSrcClass { diff --git a/src/gst/wire-protocol.h b/src/gst/wire-protocol.h deleted file mode 100644 index cad4ae42..00000000 --- a/src/gst/wire-protocol.h +++ /dev/null @@ -1,45 +0,0 @@ -/* GStreamer - * Copyright (C) 2014 William Manley <will@williammanley.net> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - -#ifndef _GST_FDPAY_WIRE_PROTOCOL_H_ -#define _GST_FDPAY_WIRE_PROTOCOL_H_ - -#include <stdint.h> - -/** - * @flags: possible flags - * @seq: a sequence number - * @pts: a PTS or presentation timestamp - * @dts_offset: an offset to @pts to get the DTS - * @offset: offset in fd - * @size: size of data in fd - * - * Almost the simplest possible FD passing protocol. Each message should have - * a file-descriptor attached which should be mmapable. The data in the FD can - * be found at offset and is size bytes long. */ -typedef struct { - guint32 flags; - guint32 seq; - gint64 pts; - gint64 dts_offset; - guint64 offset; - guint64 size; -} FDMessage; - -#endif |