diff options
author | Wim Taymans <wtaymans@redhat.com> | 2016-09-22 08:55:30 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2016-09-22 09:08:44 +0200 |
commit | 68148188fa4588353ce4c2a392578a1eac8142a1 (patch) | |
tree | 3721c74d6b7551ef058971c629d9e53e5c88e067 | |
parent | 27acab7532e8870d6921c31c359ef8628e8f0008 (diff) |
Add support for async results
Add an async result code and an event to signal the completion.
Use async return values to signal completion of a method and potential
state change.
Add selected format to port update message.
Make it possible to parse into a custom copy of the command memory.
Remove state change events from the elements, we now just update the
state.
Implement async results in the proxy element
Add support for removing buffers in the client.
Fix up pinossink
Deal with async return in the links.
-rw-r--r-- | pinos/client/stream.c | 224 | ||||
-rw-r--r-- | pinos/client/stream.h | 3 | ||||
-rw-r--r-- | pinos/gst/gstpinospool.c | 2 | ||||
-rw-r--r-- | pinos/gst/gstpinossink.c | 27 | ||||
-rw-r--r-- | pinos/gst/gstpinossrc.c | 9 | ||||
-rw-r--r-- | pinos/server/link.c | 19 | ||||
-rw-r--r-- | pinos/server/node.c | 36 | ||||
-rw-r--r-- | spa/include/spa/control.h | 31 | ||||
-rw-r--r-- | spa/include/spa/defs.h | 11 | ||||
-rw-r--r-- | spa/include/spa/node-event.h | 9 | ||||
-rw-r--r-- | spa/include/spa/node.h | 16 | ||||
-rw-r--r-- | spa/lib/control.c | 48 | ||||
-rw-r--r-- | spa/lib/format.c | 40 | ||||
-rw-r--r-- | spa/plugins/alsa/alsa-sink.c | 12 | ||||
-rw-r--r-- | spa/plugins/alsa/alsa-source.c | 12 | ||||
-rw-r--r-- | spa/plugins/audiomixer/audiomixer.c | 30 | ||||
-rw-r--r-- | spa/plugins/audiotestsrc/audiotestsrc.c | 14 | ||||
-rw-r--r-- | spa/plugins/ffmpeg/ffmpeg-dec.c | 31 | ||||
-rw-r--r-- | spa/plugins/ffmpeg/ffmpeg-enc.c | 31 | ||||
-rw-r--r-- | spa/plugins/remote/proxy.c | 140 | ||||
-rw-r--r-- | spa/plugins/v4l2/v4l2-source.c | 12 | ||||
-rw-r--r-- | spa/plugins/v4l2/v4l2-utils.c | 1 | ||||
-rw-r--r-- | spa/plugins/videotestsrc/videotestsrc.c | 14 | ||||
-rw-r--r-- | spa/plugins/volume/volume.c | 30 | ||||
-rw-r--r-- | spa/plugins/xv/xv-sink.c | 30 |
25 files changed, 441 insertions, 391 deletions
diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 86efb38c..bc275bcd 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -39,6 +39,8 @@ #define MAX_BUFFER_SIZE 4096 #define MAX_FDS 32 +#define MAX_INPUTS 64 +#define MAX_OUTPUTS 64 typedef struct { bool cleanup; @@ -47,13 +49,6 @@ typedef struct { SpaBuffer *buf; } BufferId; -static void -clear_buffer_id (BufferId *id) -{ - spa_memory_unref (&id->buf->mem.mem); - id->buf = NULL; -} - struct _PinosStreamPrivate { PinosContext *context; @@ -68,9 +63,12 @@ struct _PinosStreamPrivate PinosDirection direction; gchar *path; + SpaNodeState node_state; GPtrArray *possible_formats; SpaFormat *format; SpaPortInfo port_info; + uint32_t port_id; + uint32_t pending_seq; PinosStreamFlags flags; @@ -127,6 +125,23 @@ enum static guint signals[LAST_SIGNAL] = { 0 }; static void +clear_buffers (PinosStream *stream) +{ + PinosStreamPrivate *priv = stream->priv; + guint i; + + for (i = 0; i < priv->buffer_ids->len; i++) { + BufferId *bid = &g_array_index (priv->buffer_ids, BufferId, i); + + g_signal_emit (stream, signals[SIGNAL_REMOVE_BUFFER], 0, bid->id); + spa_memory_unref (&bid->buf->mem.mem); + bid->buf = NULL; + } + g_array_set_size (priv->buffer_ids, 0); + priv->in_order = TRUE; +} + +static void pinos_stream_get_property (GObject *_object, guint prop_id, GValue *value, @@ -491,9 +506,9 @@ pinos_stream_init (PinosStream * stream) g_debug ("new stream %p", stream); priv->state = PINOS_STREAM_STATE_UNCONNECTED; + priv->node_state = SPA_NODE_STATE_INIT; priv->buffer_ids = g_array_sized_new (FALSE, FALSE, sizeof (BufferId), 64); - g_array_set_clear_func (priv->buffer_ids, (GDestroyNotify) clear_buffer_id); - priv->in_order = TRUE; + priv->pending_seq = SPA_ID_INVALID; } /** @@ -612,17 +627,30 @@ add_node_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang } static void +add_state_change (PinosStream *stream, SpaControlBuilder *builder, SpaNodeState state) +{ + PinosStreamPrivate *priv = stream->priv; + SpaControlCmdNodeStateChange sc; + + sc.state = priv->node_state = state; + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_STATE_CHANGE, &sc); +} + +static void add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t change_mask) { PinosStreamPrivate *priv = stream->priv; SpaControlCmdPortUpdate pu = { 0, };; - pu.port_id = 0; + pu.port_id = priv->port_id; pu.change_mask = change_mask; if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS) { pu.n_possible_formats = priv->possible_formats->len; pu.possible_formats = (SpaFormat **)priv->possible_formats->pdata; } + if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_FORMAT) { + pu.format = priv->format; + } pu.props = NULL; if (change_mask & SPA_CONTROL_CMD_PORT_UPDATE_INFO) pu.info = &priv->port_info; @@ -630,21 +658,6 @@ add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang } static void -add_state_change (PinosStream *stream, SpaControlBuilder *builder, SpaNodeState state) -{ - SpaControlCmdNodeEvent cne; - SpaNodeEvent ne; - SpaNodeEventStateChange sc; - - cne.event = ≠ - ne.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - ne.data = ≻ - ne.size = sizeof (sc); - sc.state = state; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); -} - -static void add_need_input (PinosStream *stream, SpaControlBuilder *builder, uint32_t port_id) { SpaControlCmdNodeEvent cne; @@ -694,6 +707,25 @@ add_request_clock_update (PinosStream *stream, SpaControlBuilder *builder) } static void +add_async_complete (PinosStream *stream, + SpaControlBuilder *builder, + uint32_t seq, + SpaResult res) +{ + SpaControlCmdNodeEvent cne; + SpaNodeEvent ne; + SpaNodeEventAsyncComplete ac; + + cne.event = ≠ + ne.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE; + ne.data = ∾ + ne.size = sizeof (ac); + ac.seq = seq; + ac.res = res; + spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); +} + +static void send_reuse_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) { PinosStreamPrivate *priv = stream->priv; @@ -751,6 +783,30 @@ send_process_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) spa_control_clear (&control); } +static void +do_node_init (PinosStream *stream) +{ + PinosStreamPrivate *priv = stream->priv; + SpaControlBuilder builder; + SpaControl control; + + control_builder_init (stream, &builder); + add_node_update (stream, &builder, SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS | + SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS); + + priv->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; + add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS | + SPA_CONTROL_CMD_PORT_UPDATE_INFO); + + add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); +} + static BufferId * find_buffer (PinosStream *stream, uint32_t id) { @@ -779,9 +835,9 @@ handle_node_event (PinosStream *stream, case SPA_NODE_EVENT_TYPE_INVALID: case SPA_NODE_EVENT_TYPE_PORT_ADDED: case SPA_NODE_EVENT_TYPE_PORT_REMOVED: - case SPA_NODE_EVENT_TYPE_STATE_CHANGE: case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: case SPA_NODE_EVENT_TYPE_NEED_INPUT: + case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: g_warning ("unhandled node event %d", event->type); break; @@ -790,7 +846,7 @@ handle_node_event (PinosStream *stream, SpaNodeEventReuseBuffer *p = event->data; BufferId *bid; - if (p->port_id != 0) + if (p->port_id != priv->port_id) break; if (priv->direction != PINOS_DIRECTION_OUTPUT) break; @@ -818,6 +874,7 @@ handle_node_event (PinosStream *stream, static gboolean handle_node_command (PinosStream *stream, + uint32_t seq, SpaNodeCommand *command) { PinosStreamPrivate *priv = stream->priv; @@ -834,6 +891,7 @@ handle_node_command (PinosStream *stream, control_builder_init (stream, &builder); add_state_change (stream, &builder, SPA_NODE_STATE_PAUSED); + add_async_complete (stream, &builder, seq, SPA_RESULT_OK); spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -852,8 +910,9 @@ handle_node_command (PinosStream *stream, g_debug ("stream %p: start", stream); control_builder_init (stream, &builder); if (priv->direction == PINOS_DIRECTION_INPUT) - add_need_input (stream, &builder, 0); + add_need_input (stream, &builder, priv->port_id); add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING); + add_async_complete (stream, &builder, seq, SPA_RESULT_OK); spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -867,8 +926,21 @@ handle_node_command (PinosStream *stream, case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: + { + SpaControlBuilder builder; + SpaControl control; + g_warning ("unhandled node command %d", command->type); + control_builder_init (stream, &builder); + add_async_complete (stream, &builder, seq, SPA_RESULT_NOT_IMPLEMENTED); + spa_control_builder_end (&builder, &control); + + if (spa_control_write (&control, priv->fd) < 0) + g_warning ("stream %p: error writing control", stream); + + spa_control_clear (&control); break; + } case SPA_NODE_COMMAND_CLOCK_UPDATE: { @@ -904,6 +976,7 @@ parse_control (PinosStream *stream, case SPA_CONTROL_CMD_PORT_UPDATE: case SPA_CONTROL_CMD_PORT_REMOVED: case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: + case SPA_CONTROL_CMD_NODE_STATE_CHANGE: g_warning ("got unexpected control %d", cmd); break; @@ -924,21 +997,8 @@ parse_control (PinosStream *stream, priv->format = p.format; spa_debug_format (p.format); + priv->pending_seq = p.seq; g_object_notify (G_OBJECT (stream), "format"); - - if (priv->port_info.n_params != 0) { - SpaControlBuilder builder; - SpaControl control; - - control_builder_init (stream, &builder); - add_state_change (stream, &builder, SPA_NODE_STATE_READY); - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); - } break; } case SPA_CONTROL_CMD_SET_PROPERTY: @@ -960,7 +1020,7 @@ parse_control (PinosStream *stream, mem = spa_memory_import (&p.mem); if (mem->fd == -1) { - g_debug ("add mem %d,%d, %d, %d", p.mem.pool_id, p.mem.id, fd, p.flags); + g_debug ("add mem %u:%u, fd %d, flags %d", p.mem.pool_id, p.mem.id, fd, p.flags); mem->flags = p.flags; mem->fd = fd; mem->ptr = NULL; @@ -991,13 +1051,15 @@ parse_control (PinosStream *stream, break; /* clear previous buffers */ - g_array_set_size (priv->buffer_ids, 0); + clear_buffers (stream); for (i = 0; i < p.n_buffers; i++) { bid.buf = p.buffers[i]; bid.cleanup = false; bid.id = bid.buf->id; - g_debug ("add buffer %d, %d, %zd, %zd", bid.id, bid.buf->mem.mem.id, bid.buf->mem.offset, bid.buf->mem.size); + g_debug ("add buffer %d: %u:%u, %zd-%zd", bid.id, + bid.buf->mem.mem.pool_id, bid.buf->mem.mem.id, + bid.buf->mem.offset, bid.buf->mem.size); if (bid.id != priv->buffer_ids->len) { g_warning ("unexpected id %u found, expected %u", bid.id, priv->buffer_ids->len); @@ -1013,6 +1075,7 @@ parse_control (PinosStream *stream, } else { add_state_change (stream, &builder, SPA_NODE_STATE_READY); } + add_async_complete (stream, &builder, p.seq, SPA_RESULT_OK); spa_control_builder_end (&builder, &control); if (spa_control_write (&control, priv->fd) < 0) @@ -1033,7 +1096,7 @@ parse_control (PinosStream *stream, g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p.buffer_id); - send_need_input (stream, 0); + send_need_input (stream, priv->port_id); break; } case SPA_CONTROL_CMD_NODE_EVENT: @@ -1053,7 +1116,7 @@ parse_control (PinosStream *stream, if (spa_control_iter_parse_cmd (&it, &p) < 0) break; - handle_node_command (stream, p.command); + handle_node_command (stream, p.seq, p.command); break; } @@ -1175,8 +1238,6 @@ on_node_proxy (GObject *source_object, PinosStream *stream = user_data; PinosStreamPrivate *priv = stream->priv; PinosContext *context = priv->context; - SpaControlBuilder builder; - SpaControl control; GError *error = NULL; @@ -1186,22 +1247,7 @@ on_node_proxy (GObject *source_object, if (priv->node == NULL) goto node_failed; - control_builder_init (stream, &builder); - add_node_update (stream, &builder, SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS | - SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS); - - priv->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; - add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS | - SPA_CONTROL_CMD_PORT_UPDATE_INFO); - - add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); - - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + do_node_init (stream); stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); g_object_unref (stream); @@ -1343,6 +1389,7 @@ pinos_stream_connect (PinosStream *stream, g_return_val_if_fail (pinos_stream_get_state (stream) == PINOS_STREAM_STATE_UNCONNECTED, FALSE); priv->direction = direction; + priv->port_id = direction == PINOS_DIRECTION_INPUT ? 0 : MAX_INPUTS; priv->mode = mode; g_free (priv->path); priv->path = g_strdup (port_path); @@ -1361,16 +1408,26 @@ pinos_stream_connect (PinosStream *stream, } /** - * pinos_stream_start_allocation: + * pinos_stream_finish_format: * @stream: a #PinosStream - * @props: a #PinosProperties + * @res: a #SpaResult + * @params: an array of pointers to #SpaAllocParam + * @n_params: number of elements in @params + * + * Complete the negotiation process with result code @res. + * + * This function should be called after notification of the format. + + * When @res indicates success, @params contain the parameters for the + * allocation state. * * Returns: %TRUE on success */ gboolean -pinos_stream_start_allocation (PinosStream *stream, - SpaAllocParam **params, - unsigned int n_params) +pinos_stream_finish_format (PinosStream *stream, + SpaResult res, + SpaAllocParam **params, + unsigned int n_params) { PinosStreamPrivate *priv; PinosContext *context; @@ -1379,24 +1436,31 @@ pinos_stream_start_allocation (PinosStream *stream, g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); priv = stream->priv; + g_return_val_if_fail (priv->pending_seq != SPA_ID_INVALID, FALSE); context = priv->context; g_return_val_if_fail (pinos_context_get_state (context) == PINOS_CONTEXT_STATE_CONNECTED, FALSE); - control_builder_init (stream, &builder); - priv->port_info.params = params; priv->port_info.n_params = n_params; - /* send update port status */ - add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_INFO); - - /* send state-change */ - if (priv->format) - add_state_change (stream, &builder, SPA_NODE_STATE_READY); + control_builder_init (stream, &builder); + if (SPA_RESULT_IS_OK (res)) { + add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_INFO | + SPA_CONTROL_CMD_PORT_UPDATE_FORMAT); + if (priv->format) { + add_state_change (stream, &builder, SPA_NODE_STATE_READY); + } else { + clear_buffers (stream); + add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); + } + } + add_async_complete (stream, &builder, priv->pending_seq, res); spa_control_builder_end (&builder, &control); + priv->pending_seq = SPA_ID_INVALID; + if (spa_control_write (&control, priv->fd) < 0) g_warning ("stream %p: error writing control", stream); @@ -1659,7 +1723,7 @@ pinos_stream_recycle_buffer (PinosStream *stream, priv = stream->priv; g_return_val_if_fail (priv->direction == PINOS_DIRECTION_INPUT, FALSE); - send_reuse_buffer (stream, 0, id); + send_reuse_buffer (stream, priv->port_id, id); return TRUE; } @@ -1715,7 +1779,7 @@ pinos_stream_send_buffer (PinosStream *stream, if ((bid = find_buffer (stream, id))) { bid->used = TRUE; - send_process_buffer (stream, 0, id); + send_process_buffer (stream, priv->port_id, id); return TRUE; } else { return FALSE; diff --git a/pinos/client/stream.h b/pinos/client/stream.h index 3e2481b3..1ac07ff2 100644 --- a/pinos/client/stream.h +++ b/pinos/client/stream.h @@ -108,7 +108,8 @@ gboolean pinos_stream_connect (PinosStream *stream, GPtrArray *possible_formats); gboolean pinos_stream_disconnect (PinosStream *stream); -gboolean pinos_stream_start_allocation (PinosStream *stream, +gboolean pinos_stream_finish_format (PinosStream *stream, + SpaResult res, SpaAllocParam **params, unsigned int n_params); diff --git a/pinos/gst/gstpinospool.c b/pinos/gst/gstpinospool.c index aa0aaa1a..2019bb11 100644 --- a/pinos/gst/gstpinospool.c +++ b/pinos/gst/gstpinospool.c @@ -124,7 +124,7 @@ do_start (GstBufferPool * pool) param_meta_enable.param.size = sizeof (SpaAllocParamMetaEnable); param_meta_enable.type = SPA_META_TYPE_HEADER; - pinos_stream_start_allocation (p->stream, port_params, 2); + pinos_stream_finish_format (p->stream, SPA_RESULT_OK, port_params, 2); return TRUE; } diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index 17b8baba..683a8959 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -481,6 +481,33 @@ on_format_notify (GObject *gobject, GParamSpec *pspec, gpointer user_data) { + GstPinosSink *pinossink = user_data; + GstStructure *config; + GstCaps *caps; + guint size; + guint min_buffers; + guint max_buffers; + SpaAllocParam *port_params[2]; + SpaAllocParamMetaEnable param_meta_enable; + SpaAllocParamBuffers param_buffers; + + config = gst_buffer_pool_get_config (GST_BUFFER_POOL (pinossink->pool)); + gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); + + port_params[0] = ¶m_buffers.param; + param_buffers.param.type = SPA_ALLOC_PARAM_TYPE_BUFFERS; + param_buffers.param.size = sizeof (SpaAllocParamBuffers); + param_buffers.minsize = size; + param_buffers.stride = 0; + param_buffers.min_buffers = min_buffers; + param_buffers.max_buffers = max_buffers; + param_buffers.align = 16; + port_params[1] = ¶m_meta_enable.param; + param_meta_enable.param.type = SPA_ALLOC_PARAM_TYPE_META_ENABLE; + param_meta_enable.param.size = sizeof (SpaAllocParamMetaEnable); + param_meta_enable.type = SPA_META_TYPE_HEADER; + + pinos_stream_finish_format (pinossink->stream, SPA_RESULT_OK, port_params, 2); } static gboolean diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index 6f8bd1da..c828b6b5 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -718,14 +718,19 @@ on_format_notify (GObject *gobject, GstPinosSrc *pinossrc = user_data; SpaFormat *format; GstCaps *caps; + gboolean res; g_object_get (gobject, "format", &format, NULL); caps = gst_caps_from_format (format); - gst_base_src_set_caps (GST_BASE_SRC (pinossrc), caps); + res = gst_base_src_set_caps (GST_BASE_SRC (pinossrc), caps); gst_caps_unref (caps); - pinos_stream_start_allocation (pinossrc->stream, NULL, 0); + if (res) { + pinos_stream_finish_format (pinossrc->stream, SPA_RESULT_OK, NULL, 0); + } else { + pinos_stream_finish_format (pinossrc->stream, SPA_RESULT_INVALID_MEDIA_TYPE, NULL, 0); + } } static gboolean diff --git a/pinos/server/link.c b/pinos/server/link.c index 9309d895..3ead648e 100644 --- a/pinos/server/link.c +++ b/pinos/server/link.c @@ -34,7 +34,7 @@ #define PINOS_LINK_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), PINOS_TYPE_LINK, PinosLinkPrivate)) -#define MAX_BUFFERS 64 +#define MAX_BUFFERS 16 struct _PinosLinkPrivate { @@ -498,7 +498,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) } g_debug ("allocated out_buffers %p from output port", priv->out_buffers); } - else if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { + if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { g_debug ("using out_buffers %p on input port", priv->out_buffers); if ((res = spa_node_port_use_buffers (this->input_node->node, this->input_port, priv->out_buffers, priv->n_out_buffers)) < 0) { @@ -563,6 +563,7 @@ check_states (PinosLink *this) SpaResult res; SpaNodeState in_state, out_state; +again: in_state = this->input_node->node->state; out_state = this->output_node->node->state; @@ -577,9 +578,21 @@ check_states (PinosLink *this) if ((res = do_start (this, in_state, out_state)) < 0) return res; + if (this->input_node->node->state != in_state) + goto again; + if (this->output_node->node->state != out_state) + goto again; + return SPA_RESULT_OK; } +static gboolean +do_check_states (PinosLink *this) +{ + check_states (this); + return G_SOURCE_REMOVE; +} + static void on_node_state_notify (GObject *obj, GParamSpec *pspec, @@ -588,7 +601,7 @@ on_node_state_notify (GObject *obj, PinosLink *this = user_data; g_debug ("link %p: node %p state change", this, obj); - check_states (this); + g_idle_add ((GSourceFunc) do_check_states, this); } static void diff --git a/pinos/server/node.c b/pinos/server/node.c index 04405920..f07dcd45 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -311,7 +311,6 @@ suspend_node (PinosNode *this) if ((res = spa_node_port_set_format (this->node, 0, 0, NULL)) < 0) g_warning ("error unset format output: %d", res); - } static void @@ -405,6 +404,23 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) PinosNodePrivate *priv = this->priv; switch (event->type) { + case SPA_NODE_EVENT_TYPE_INVALID: + case SPA_NODE_EVENT_TYPE_DRAINED: + case SPA_NODE_EVENT_TYPE_MARKER: + case SPA_NODE_EVENT_TYPE_ERROR: + case SPA_NODE_EVENT_TYPE_BUFFERING: + case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: + break; + + case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: + { + SpaNodeEventAsyncComplete *ac = event->data; + g_debug ("async complete %u %d", ac->seq, ac->res); + if (SPA_RESULT_IS_OK (ac->res)) + g_object_notify (G_OBJECT (this), "node-state"); + break; + } + case SPA_NODE_EVENT_TYPE_PORT_ADDED: { SpaNodeEventPortAdded *pa = event->data; @@ -432,17 +448,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) g_main_context_invoke (NULL, (GSourceFunc) do_signal_port_removed, data); break; } - case SPA_NODE_EVENT_TYPE_STATE_CHANGE: - { - SpaNodeEventStateChange *sc = event->data; - - g_debug ("node %p: update SPA state to %d", this, sc->state); - if (sc->state == SPA_NODE_STATE_CONFIGURE) { - update_port_ids (this, FALSE); - } - g_object_notify (G_OBJECT (this), "node-state"); - break; - } case SPA_NODE_EVENT_TYPE_ADD_POLL: { SpaPollItem *poll = event->data; @@ -499,10 +504,13 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) } case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: { + SpaNodeEventHaveOutput *ho = event->data; SpaPortOutputInfo oinfo[1] = { 0, }; SpaResult res; guint i; + oinfo[0].port_id = ho->port_id; + if ((res = spa_node_port_pull_output (node, 1, oinfo)) < 0) { g_warning ("node %p: got pull error %d, %d", this, res, oinfo[0].status); break; @@ -554,10 +562,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE: send_clock_update (this); break; - - default: - g_debug ("node %p: got event %d", this, event->type); - break; } } diff --git a/spa/include/spa/control.h b/spa/include/spa/control.h index 66470299..f5685016 100644 --- a/spa/include/spa/control.h +++ b/spa/include/spa/control.h @@ -35,6 +35,7 @@ typedef struct _SpaControlBuilder SpaControlBuilder; #include <spa/format.h> #include <spa/port.h> #include <spa/node.h> +#include <spa/memory.h> struct _SpaControl { size_t x[16]; @@ -59,8 +60,9 @@ typedef enum { SPA_CONTROL_CMD_NODE_UPDATE = 1, SPA_CONTROL_CMD_PORT_UPDATE = 2, SPA_CONTROL_CMD_PORT_REMOVED = 3, + SPA_CONTROL_CMD_NODE_STATE_CHANGE = 4, - SPA_CONTROL_CMD_PORT_STATUS_CHANGE = 4, + SPA_CONTROL_CMD_PORT_STATUS_CHANGE = 5, /* server to client */ SPA_CONTROL_CMD_ADD_PORT = 32, @@ -92,16 +94,17 @@ typedef struct { const SpaProps *props; } SpaControlCmdNodeUpdate; - /* SPA_CONTROL_CMD_PORT_UPDATE */ typedef struct { uint32_t port_id; #define SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS (1 << 0) -#define SPA_CONTROL_CMD_PORT_UPDATE_PROPS (1 << 1) -#define SPA_CONTROL_CMD_PORT_UPDATE_INFO (1 << 2) +#define SPA_CONTROL_CMD_PORT_UPDATE_FORMAT (1 << 1) +#define SPA_CONTROL_CMD_PORT_UPDATE_PROPS (1 << 2) +#define SPA_CONTROL_CMD_PORT_UPDATE_INFO (1 << 3) uint32_t change_mask; unsigned int n_possible_formats; SpaFormat **possible_formats; + SpaFormat *format; const SpaProps *props; const SpaPortInfo *info; } SpaControlCmdPortUpdate; @@ -113,25 +116,34 @@ typedef struct { /* SPA_CONTROL_CMD_PORT_STATUS_CHANGE */ +/* SPA_CONTROL_CMD_NODE_STATE_CHANGE */ +typedef struct { + SpaNodeState state; +} SpaControlCmdNodeStateChange; + /* SPA_CONTROL_CMD_ADD_PORT */ typedef struct { - uint32_t port_id; + uint32_t seq; + uint32_t port_id; } SpaControlCmdAddPort; /* SPA_CONTROL_CMD_REMOVE_PORT */ typedef struct { + uint32_t seq; uint32_t port_id; } SpaControlCmdRemovePort; /* SPA_CONTROL_CMD_SET_FORMAT */ typedef struct { + uint32_t seq; uint32_t port_id; SpaFormat *format; } SpaControlCmdSetFormat; /* SPA_CONTROL_CMD_SET_PROPERTY */ typedef struct { + uint32_t seq; uint32_t port_id; uint32_t id; size_t size; @@ -140,11 +152,13 @@ typedef struct { /* SPA_CONTROL_CMD_NODE_COMMAND */ typedef struct { + uint32_t seq; SpaNodeCommand *command; } SpaControlCmdNodeCommand; /* SPA_CONTROL_CMD_ADD_MEM */ typedef struct { + uint32_t seq; uint32_t port_id; SpaMemoryRef mem; uint32_t mem_type; @@ -155,12 +169,14 @@ typedef struct { /* SPA_CONTROL_CMD_REMOVE_MEM */ typedef struct { + uint32_t seq; uint32_t port_id; SpaMemoryRef mem; } SpaControlCmdRemoveMem; /* SPA_CONTROL_CMD_USE_BUFFERS */ typedef struct { + uint32_t seq; uint32_t port_id; unsigned int n_buffers; SpaBuffer **buffers; @@ -192,7 +208,10 @@ SpaResult spa_control_iter_end (SpaControlIter *iter); SpaControlCmd spa_control_iter_get_cmd (SpaControlIter *iter); void * spa_control_iter_get_data (SpaControlIter *iter, - size_t *size); + size_t *size); +SpaResult spa_control_iter_set_data (SpaControlIter *iter, + void *data, + size_t size); SpaResult spa_control_iter_parse_cmd (SpaControlIter *iter, void *command); diff --git a/spa/include/spa/defs.h b/spa/include/spa/defs.h index beb1bc16..5c635c62 100644 --- a/spa/include/spa/defs.h +++ b/spa/include/spa/defs.h @@ -30,6 +30,8 @@ extern "C" { #include <stddef.h> typedef enum { + SPA_RESULT_ASYNC = (1 << 30), + SPA_RESULT_MODIFIED = 1, SPA_RESULT_OK = 0, SPA_RESULT_ERROR = -1, SPA_RESULT_INACTIVE = -2, @@ -59,8 +61,17 @@ typedef enum { SPA_RESULT_NO_BUFFERS = -27, SPA_RESULT_INVALID_BUFFER_ID = -28, SPA_RESULT_WRONG_STATE = -29, + SPA_RESULT_ASYNC_BUSY = -30, } SpaResult; +#define SPA_RESULT_IS_OK(res) ((res) >= 0) +#define SPA_RESULT_IS_ERROR(res) ((res) < 0) +#define SPA_RESULT_IS_ASYNC(res) (((res) & SPA_RESULT_ASYNC) == SPA_RESULT_ASYNC) + +#define SPA_ASYNC_SEQ_MASK (SPA_RESULT_ASYNC - 1) +#define SPA_RESULT_ASYNC_SEQ(res) ((res) & SPA_ASYNC_SEQ_MASK) +#define SPA_RESULT_RETURN_ASYNC(seq) (SPA_RESULT_ASYNC | ((seq) & SPA_ASYNC_SEQ_MASK)) + typedef void (*SpaNotify) (void *data); #define SPA_N_ELEMENTS(arr) (sizeof (arr) / sizeof ((arr)[0])) diff --git a/spa/include/spa/node-event.h b/spa/include/spa/node-event.h index 486c64d7..6d176212 100644 --- a/spa/include/spa/node-event.h +++ b/spa/include/spa/node-event.h @@ -33,9 +33,9 @@ typedef struct _SpaNodeEvent SpaNodeEvent; /** * SpaEventType: * @SPA_NODE_EVENT_TYPE_INVALID: invalid event, should be ignored + * @SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: an async operation completed * @SPA_NODE_EVENT_TYPE_PORT_ADDED: a new port is added * @SPA_NODE_EVENT_TYPE_PORT_REMOVED: a port is removed - * @SPA_NODE_EVENT_TYPE_STATE_CHANGE: emited when the state changes * @SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: emited when an async node has output that can be pulled * @SPA_NODE_EVENT_TYPE_NEED_INPUT: emited when more data can be pushed to an async node * @SPA_NODE_EVENT_TYPE_REUSE_BUFFER: emited when a buffer can be reused @@ -50,9 +50,9 @@ typedef struct _SpaNodeEvent SpaNodeEvent; */ typedef enum { SPA_NODE_EVENT_TYPE_INVALID = 0, + SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE, SPA_NODE_EVENT_TYPE_PORT_ADDED, SPA_NODE_EVENT_TYPE_PORT_REMOVED, - SPA_NODE_EVENT_TYPE_STATE_CHANGE, SPA_NODE_EVENT_TYPE_HAVE_OUTPUT, SPA_NODE_EVENT_TYPE_NEED_INPUT, SPA_NODE_EVENT_TYPE_REUSE_BUFFER, @@ -74,6 +74,11 @@ struct _SpaNodeEvent { }; typedef struct { + uint32_t seq; + SpaResult res; +} SpaNodeEventAsyncComplete; + +typedef struct { uint32_t port_id; } SpaNodeEventPortAdded; diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h index 332d9310..c44d2dc7 100644 --- a/spa/include/spa/node.h +++ b/spa/include/spa/node.h @@ -215,10 +215,13 @@ struct _SpaNode { * * Send a command to @node. * + * Upon completion, a command might change the state of a node. + * * Returns: #SPA_RESULT_OK on success * #SPA_RESULT_INVALID_ARGUMENTS when node or command is %NULL * #SPA_RESULT_NOT_IMPLEMENTED when this node can't process commands * #SPA_RESULT_INVALID_COMMAND @command is an invalid command + * #SPA_RESULT_ASYNC @command is executed asynchronously */ SpaResult (*send_command) (SpaNode *node, SpaNodeCommand *command); @@ -339,14 +342,20 @@ struct _SpaNode { * * This function takes a copy of the format. * + * Upon completion, this function might change the state of a node to + * the READY state or to CONFIGURE when @format is NULL. + * * Returns: #SPA_RESULT_OK on success + * #SPA_RESULT_OK_RECHECK on success * #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL * #SPA_RESULT_INVALID_PORT when port_id is not valid * #SPA_RESULT_INVALID_MEDIA_TYPE when the media type is not valid * #SPA_RESULT_INVALID_FORMAT_PROPERTIES when one of the mandatory format - * properties is not specified. + * properties is not specified and #SPA_PORT_FORMAT_FLAG_FIXATE was + * not set in @flags. * #SPA_RESULT_WRONG_PROPERTY_TYPE when the type or size of a property * is not correct. + * #SPA_RESULT_ASYNC the function is executed asynchronously */ SpaResult (*port_set_format) (SpaNode *node, uint32_t port_id, @@ -401,7 +410,12 @@ struct _SpaNode { * Passing %NULL as @buffers will remove the reference that the port has * on the buffers. * + * Upon completion, this function might change the state of the + * node to PAUSED, when the node has enough buffers, or READY when + * @buffers are %NULL. + * * Returns: #SPA_RESULT_OK on success + * #SPA_RESULT_ASYNC the function is executed asynchronously */ SpaResult (*port_use_buffers) (SpaNode *node, uint32_t port_id, diff --git a/spa/lib/control.c b/spa/lib/control.c index 81aa463f..594754dc 100644 --- a/spa/lib/control.c +++ b/spa/lib/control.c @@ -418,6 +418,8 @@ iter_parse_port_update (struct stack_iter *si, SpaControlCmdPortUpdate *pu) pu->possible_formats[i] = parse_format (p, si->size, SPA_PTR_TO_INT (pu->possible_formats[i])); } + if (pu->format) + pu->format = parse_format (p, si->size, SPA_PTR_TO_INT (pu->format)); if (pu->props) pu->props = parse_props (p, SPA_PTR_TO_INT (pu->props)); @@ -485,6 +487,26 @@ iter_parse_node_command (struct stack_iter *si, SpaControlCmdNodeCommand *cmd) } SpaResult +spa_control_iter_set_data (SpaControlIter *iter, + void *data, + size_t size) +{ + struct stack_iter *si = SCSI (iter); + SpaResult res = SPA_RESULT_OK; + + if (!is_valid_iter (iter)) + return SPA_RESULT_INVALID_ARGUMENTS; + + if (si->size > size) + return SPA_RESULT_INVALID_ARGUMENTS; + + si->size = size; + si->data = data; + + return SPA_RESULT_OK; +} + +SpaResult spa_control_iter_parse_cmd (SpaControlIter *iter, void *command) { @@ -514,6 +536,12 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, fprintf (stderr, "implement iter of %d\n", si->cmd); break; + case SPA_CONTROL_CMD_NODE_STATE_CHANGE: + if (si->size < sizeof (SpaControlCmdNodeStateChange)) + return SPA_RESULT_ERROR; + memcpy (command, si->data, sizeof (SpaControlCmdNodeStateChange)); + break; + /* S -> C */ case SPA_CONTROL_CMD_ADD_PORT: if (si->size < sizeof (SpaControlCmdAddPort)) @@ -572,7 +600,6 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, return res; } - struct stack_builder { size_t magic; @@ -945,8 +972,8 @@ write_format (void *p, const SpaFormat *format) tf = p; tf->media_type = format->media_type; tf->media_subtype = format->media_subtype; - tf->mem.mem.pool_id = SPA_ID_INVALID; - tf->mem.mem.id = SPA_ID_INVALID; + tf->mem.mem.pool_id = 0; + tf->mem.mem.id = 0; tf->mem.offset = 0; tf->mem.size = 0; @@ -1025,6 +1052,7 @@ builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu) len += pu->n_possible_formats * sizeof (SpaFormat *); for (i = 0; i < pu->n_possible_formats; i++) len += calc_format_len (pu->possible_formats[i]); + len += calc_format_len (pu->format); len += calc_props_len (pu->props); if (pu->info) { len += sizeof (SpaPortInfo); @@ -1051,6 +1079,13 @@ builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu) bfa[i] = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); p = SPA_MEMBER (p, len, void); } + if (pu->format) { + len = write_format (p, pu->format); + d->format = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + p = SPA_MEMBER (p, len, void); + } else { + d->format = 0; + } if (pu->props) { len = write_props (p, pu->props, sizeof (SpaProps)); d->props = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); @@ -1066,6 +1101,7 @@ builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu) d->info = 0; } } + static void builder_add_set_format (struct stack_builder *sb, SpaControlCmdSetFormat *sf) { @@ -1218,6 +1254,11 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, p = builder_add_cmd (sb, cmd, 0); break; + case SPA_CONTROL_CMD_NODE_STATE_CHANGE: + p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdNodeStateChange)); + memcpy (p, command, sizeof (SpaControlCmdNodeStateChange)); + break; + /* S -> C */ case SPA_CONTROL_CMD_ADD_PORT: p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdAddPort)); @@ -1265,7 +1306,6 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, builder_add_node_command (sb, command); break; - default: case SPA_CONTROL_CMD_INVALID: return SPA_RESULT_INVALID_ARGUMENTS; } diff --git a/spa/lib/format.c b/spa/lib/format.c index 0b572b77..4223c118 100644 --- a/spa/lib/format.c +++ b/spa/lib/format.c @@ -36,50 +36,10 @@ spa_format_to_string (const SpaFormat *format, char **result) SpaResult spa_format_fixate (SpaFormat *format) { -#if 0 - unsigned int i, j; - SpaProps *props; - uint32_t mask; -#endif - if (format == NULL) return SPA_RESULT_INVALID_ARGUMENTS; format->props.unset_mask = 0; -#if 0 - props = &format->props; - mask = props->unset_mask; - - for (i = 0; i < props->n_prop_info; i++) { - if (mask & 1) { - const SpaPropInfo *pi = &props->prop_info[i]; - - switch (pi->range_type) { - case SPA_PROP_RANGE_TYPE_NONE: - break; - case SPA_PROP_RANGE_TYPE_MIN_MAX: - break; - case SPA_PROP_RANGE_TYPE_STEP: - break; - case SPA_PROP_RANGE_TYPE_ENUM: - { - for (j = 0; j < pi->n_range_values; j++) { - const SpaPropRangeInfo *ri = &pi->range_values[j]; - memcpy (SPA_MEMBER (props, pi->offset, void), ri->value, ri->size); - SPA_PROPS_INDEX_SET (props, i); - break; - } - break; - } - case SPA_PROP_RANGE_TYPE_FLAGS: - break; - default: - break; - } - } - mask >>= 1; - } -#endif return SPA_RESULT_OK; } diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 9cbe1b16..d799c8c4 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -45,19 +45,7 @@ reset_alsa_sink_props (SpaALSAProps *props) static void update_state (SpaALSASink *this, SpaNodeState state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - if (this->node.state == state) - return; - this->node.state = state; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); } static const uint32_t min_uint32 = 1; diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index 89383b57..5bd83f6b 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -32,19 +32,7 @@ typedef struct _SpaALSAState SpaALSASource; static void update_state (SpaALSASource *this, SpaNodeState state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - if (this->node.state == state) - return; - this->node.state = state; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); } static const char default_device[] = "hw:0"; diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index a905e6f1..f5340830 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -131,6 +131,12 @@ spa_audiomixer_node_set_props (SpaNode *node, return res; } +static void +update_state (SpaAudioMixer *this, SpaNodeState state) +{ + this->node.state = state; +} + static SpaResult spa_audiomixer_node_send_command (SpaNode *node, SpaNodeCommand *command) @@ -147,31 +153,11 @@ spa_audiomixer_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_STREAMING); break; case SPA_NODE_COMMAND_PAUSE: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_PAUSED); break; case SPA_NODE_COMMAND_FLUSH: diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index 1e1ec8e7..b53f1448 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -296,21 +296,7 @@ audiotestsrc_on_output (SpaPollNotifyData *data) static void update_state (SpaAudioTestSrc *this, SpaNodeState state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - if (this->node.state == state) - return; - this->node.state = state; - - if (this->event_cb) { - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); - } } static SpaResult diff --git a/spa/plugins/ffmpeg/ffmpeg-dec.c b/spa/plugins/ffmpeg/ffmpeg-dec.c index 7012fefc..5f0c0f6a 100644 --- a/spa/plugins/ffmpeg/ffmpeg-dec.c +++ b/spa/plugins/ffmpeg/ffmpeg-dec.c @@ -127,6 +127,12 @@ spa_ffmpeg_dec_node_set_props (SpaNode *node, return res; } +static void +update_state (SpaFFMpegDec *this, SpaNodeState state) +{ + this->node.state = state; +} + static SpaResult spa_ffmpeg_dec_node_send_command (SpaNode *node, SpaNodeCommand *command) @@ -143,30 +149,11 @@ spa_ffmpeg_dec_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_STREAMING); break; - case SPA_NODE_COMMAND_PAUSE: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + case SPA_NODE_COMMAND_PAUSE: + update_state (this, SPA_NODE_STATE_PAUSED); break; case SPA_NODE_COMMAND_FLUSH: diff --git a/spa/plugins/ffmpeg/ffmpeg-enc.c b/spa/plugins/ffmpeg/ffmpeg-enc.c index 2d97eb94..f683e55e 100644 --- a/spa/plugins/ffmpeg/ffmpeg-enc.c +++ b/spa/plugins/ffmpeg/ffmpeg-enc.c @@ -86,6 +86,12 @@ static const SpaPropInfo prop_info[] = { 0, }, }; +static void +update_state (SpaFFMpegEnc *this, SpaNodeState state) +{ + this->node.state = state; +} + static SpaResult spa_ffmpeg_enc_node_get_props (SpaNode *node, SpaProps **props) @@ -143,30 +149,11 @@ spa_ffmpeg_enc_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_STREAMING); break; - case SPA_NODE_COMMAND_PAUSE: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + case SPA_NODE_COMMAND_PAUSE: + update_state (this, SPA_NODE_STATE_PAUSED); break; case SPA_NODE_COMMAND_FLUSH: diff --git a/spa/plugins/remote/proxy.c b/spa/plugins/remote/proxy.c index ae4b26cf..577cfd9c 100644 --- a/spa/plugins/remote/proxy.c +++ b/spa/plugins/remote/proxy.c @@ -78,6 +78,8 @@ struct _SpaProxy { unsigned int max_outputs; unsigned int n_outputs; SpaProxyPort ports[MAX_PORTS]; + + uint32_t seq; }; enum { @@ -101,11 +103,12 @@ reset_proxy_props (SpaProxyProps *props) props->socketfd = -1; } -static void +static SpaResult update_poll (SpaProxy *this, int socketfd) { SpaNodeEvent event; SpaProxyProps *p; + SpaResult res = SPA_RESULT_OK; p = &this->props[1]; @@ -124,6 +127,21 @@ update_poll (SpaProxy *this, int socketfd) event.size = sizeof (this->poll); this->event_cb (&this->node, &event, this->user_data); } + return res; +} + +static void +send_async_complete (SpaProxy *this, uint32_t seq, SpaResult res) +{ + SpaNodeEvent event; + SpaNodeEventAsyncComplete ac; + + event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE; + event.data = ∾ + event.size = sizeof (ac); + ac.seq = seq; + ac.res = res; + this->event_cb (&this->node, &event, this->user_data); } static SpaResult @@ -169,7 +187,7 @@ spa_proxy_node_set_props (SpaNode *node, /* compare changes */ if (op->socketfd != np->socketfd) - update_poll (this, np->socketfd); + res = update_poll (this, np->socketfd); /* commit changes */ memcpy (op, np, sizeof (*np)); @@ -182,7 +200,7 @@ spa_proxy_node_send_command (SpaNode *node, SpaNodeCommand *command) { SpaProxy *this; - SpaResult res; + SpaResult res = SPA_RESULT_OK; if (node == NULL || node->handle == NULL || command == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -194,26 +212,10 @@ spa_proxy_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - { - SpaControlBuilder builder; - SpaControl control; - uint8_t buf[128]; - SpaControlCmdNodeCommand cnc; - - /* send start */ - spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); - cnc.command = command; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); - spa_control_builder_end (&builder, &control); - - if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) - fprintf (stderr, "proxy %p: error writing control %d\n", this, res); - - spa_control_clear (&control); - break; - } - case SPA_NODE_COMMAND_PAUSE: + case SPA_NODE_COMMAND_FLUSH: + case SPA_NODE_COMMAND_DRAIN: + case SPA_NODE_COMMAND_MARKER: { SpaControlBuilder builder; SpaControl control; @@ -222,6 +224,7 @@ spa_proxy_node_send_command (SpaNode *node, /* send start */ spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); + cnc.seq = this->seq++; cnc.command = command; spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); spa_control_builder_end (&builder, &control); @@ -230,14 +233,11 @@ spa_proxy_node_send_command (SpaNode *node, fprintf (stderr, "proxy %p: error writing control %d\n", this, res); spa_control_clear (&control); + + res = SPA_RESULT_RETURN_ASYNC (cnc.seq); break; } - case SPA_NODE_COMMAND_FLUSH: - case SPA_NODE_COMMAND_DRAIN: - case SPA_NODE_COMMAND_MARKER: - return SPA_RESULT_NOT_IMPLEMENTED; - case SPA_NODE_COMMAND_CLOCK_UPDATE: { SpaControlBuilder builder; @@ -258,7 +258,7 @@ spa_proxy_node_send_command (SpaNode *node, break; } } - return SPA_RESULT_OK; + return res; } static SpaResult @@ -502,7 +502,7 @@ spa_proxy_node_port_set_format (SpaNode *node, if (node == NULL || node->handle == NULL) return SPA_RESULT_INVALID_ARGUMENTS; - this = (SpaProxy *) node->handle; + this = (SpaProxy *) node->handle; if (!CHECK_PORT_ID (this, port_id)) return SPA_RESULT_INVALID_PORT; @@ -510,6 +510,7 @@ spa_proxy_node_port_set_format (SpaNode *node, port = &this->ports[port_id]; spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); + sf.seq = this->seq++; sf.port_id = port_id; sf.format = (SpaFormat *) format; spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_SET_FORMAT, &sf); @@ -520,13 +521,9 @@ spa_proxy_node_port_set_format (SpaNode *node, spa_control_clear (&control); - if (port->format) - spa_format_unref (port->format); - if (format) - spa_format_ref ((SpaFormat *) format); - port->format = (SpaFormat *)format; + port->format = format; - return SPA_RESULT_OK; + return SPA_RESULT_RETURN_ASYNC (sf.seq); } static SpaResult @@ -705,7 +702,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node, if (port->n_buffers == n_buffers && port->buffers == buffers) return SPA_RESULT_OK; - spa_control_builder_init_into (&builder, buf, sizeof (buf), fds, sizeof (fds)); + spa_control_builder_init_into (&builder, buf, sizeof (buf), fds, SPA_N_ELEMENTS (fds)); if (buffers == NULL || n_buffers == 0) { port->buffers = NULL; @@ -717,6 +714,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node, for (i = 0; i < port->n_buffers; i++) add_buffer_mem (this, &builder, port_id, port->buffers[i]); + ub.seq = this->seq++; ub.port_id = port_id; ub.n_buffers = port->n_buffers; ub.buffers = port->buffers; @@ -729,7 +727,7 @@ spa_proxy_node_port_use_buffers (SpaNode *node, spa_control_clear (&control); - return SPA_RESULT_OK; + return SPA_RESULT_RETURN_ASYNC (ub.seq); } static SpaResult @@ -885,6 +883,7 @@ spa_proxy_node_port_pull_output (SpaNode *node, for (i = 0; i < n_info; i++) { if (!CHECK_PORT_ID_OUT (this, info[i].port_id)) { + fprintf (stderr, "invalid port %u\n", info[i].port_id); info[i].status = SPA_RESULT_INVALID_PORT; have_error = true; continue; @@ -933,23 +932,11 @@ handle_node_event (SpaProxy *this, { switch (event->type) { case SPA_NODE_EVENT_TYPE_INVALID: - case SPA_NODE_EVENT_TYPE_PORT_ADDED: - case SPA_NODE_EVENT_TYPE_PORT_REMOVED: - this->event_cb (&this->node, event, this->user_data); break; - case SPA_NODE_EVENT_TYPE_STATE_CHANGE: - { - SpaNodeEventStateChange *sc = event->data; - - fprintf (stderr, "proxy %p: got state-change to %d\n", this, sc->state); - if (this->node.state != sc->state) { - this->node.state = sc->state; - this->event_cb (&this->node, event, this->user_data); - } - break; - } - + case SPA_NODE_EVENT_TYPE_PORT_ADDED: + case SPA_NODE_EVENT_TYPE_PORT_REMOVED: + case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: case SPA_NODE_EVENT_TYPE_NEED_INPUT: case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: @@ -980,6 +967,7 @@ parse_control (SpaProxy *this, SpaControlCmd cmd = spa_control_iter_get_cmd (&it); switch (cmd) { + case SPA_CONTROL_CMD_INVALID: case SPA_CONTROL_CMD_ADD_PORT: case SPA_CONTROL_CMD_REMOVE_PORT: case SPA_CONTROL_CMD_SET_FORMAT: @@ -999,10 +987,14 @@ parse_control (SpaProxy *this, if (spa_control_iter_parse_cmd (&it, &nu) < 0) break; - this->max_inputs = nu.max_input_ports; - this->max_outputs = nu.max_output_ports; - fprintf (stderr, "proxy %p: got node update %d, %u, %u\n", this, cmd, + if (nu.change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS) + this->max_inputs = nu.max_input_ports; + if (nu.change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS) + this->max_outputs = nu.max_output_ports; + + fprintf (stderr, "proxy %p: got node update %d, max_in %u, max_out %u\n", this, cmd, this->max_inputs, this->max_outputs); + break; } @@ -1010,13 +1002,24 @@ parse_control (SpaProxy *this, { SpaControlCmdPortUpdate pu; bool remove; + SpaMemory *mem; + void *data; + size_t size; + + data = spa_control_iter_get_data (&it, &size); + mem = spa_memory_alloc_size (SPA_MEMORY_POOL_LOCAL, data, size); + spa_control_iter_set_data (&it, spa_memory_ensure_ptr (mem), size); fprintf (stderr, "proxy %p: got port update %d\n", this, cmd); - if (spa_control_iter_parse_cmd (&it, &pu) < 0) + if (spa_control_iter_parse_cmd (&it, &pu) < 0) { + spa_memory_unref (&mem->mem); break; + } - if (pu.port_id >= MAX_PORTS) + if (pu.port_id >= MAX_PORTS) { + spa_memory_unref (&mem->mem); break; + } remove = (pu.change_mask == 0); @@ -1033,6 +1036,24 @@ parse_control (SpaProxy *this, fprintf (stderr, "proxy %p: command not implemented %d\n", this, cmd); break; } + + case SPA_CONTROL_CMD_NODE_STATE_CHANGE: + { + SpaControlCmdNodeStateChange sc; + SpaNodeState old = this->node.state; + + if (spa_control_iter_parse_cmd (&it, &sc) < 0) + break; + + this->node.state = sc.state; + if (old == SPA_NODE_STATE_INIT) + send_async_complete (this, 0, SPA_RESULT_OK); + + fprintf (stderr, "proxy %p: got node state change %d\n", this, this->node.state); + + break; + } + case SPA_CONTROL_CMD_ADD_MEM: break; case SPA_CONTROL_CMD_REMOVE_MEM: @@ -1065,9 +1086,6 @@ parse_control (SpaProxy *this, handle_node_event (this, cne.event); break; } - default: - fprintf (stderr, "proxy %p: command unhandled %d\n", this, cmd); - break; } } spa_control_iter_end (&it); @@ -1184,7 +1202,7 @@ proxy_init (const SpaHandleFactory *factory, this->poll.after_cb = proxy_on_fd_events; this->poll.user_data = this; - return SPA_RESULT_OK; + return SPA_RESULT_RETURN_ASYNC (this->seq++); } static const SpaInterfaceInfo proxy_interfaces[] = diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index 061fc635..ea7d6176 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -137,19 +137,7 @@ struct _SpaV4l2Source { static void update_state (SpaV4l2Source *this, SpaNodeState state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - if (this->node.state == state) - return; - this->node.state = state; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); } #include "v4l2-utils.c" diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 457e2cdc..efdff82d 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -111,6 +111,7 @@ spa_v4l2_clear_buffers (SpaV4l2Source *this) if (state->alloc_mem) spa_memory_unref (&state->alloc_mem->mem); + state->alloc_mem = NULL; state->have_buffers = false; return SPA_RESULT_OK; diff --git a/spa/plugins/videotestsrc/videotestsrc.c b/spa/plugins/videotestsrc/videotestsrc.c index fbeb4ef9..cae68705 100644 --- a/spa/plugins/videotestsrc/videotestsrc.c +++ b/spa/plugins/videotestsrc/videotestsrc.c @@ -244,21 +244,7 @@ videotestsrc_on_output (SpaPollNotifyData *data) static void update_state (SpaVideoTestSrc *this, SpaNodeState state) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - if (this->node.state == state) - return; - this->node.state = state; - - if (this->event_cb) { - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = state; - this->event_cb (&this->node, &event, this->user_data); - } } static SpaResult diff --git a/spa/plugins/volume/volume.c b/spa/plugins/volume/volume.c index 7bbe3d27..f18844b5 100644 --- a/spa/plugins/volume/volume.c +++ b/spa/plugins/volume/volume.c @@ -96,6 +96,12 @@ reset_volume_props (SpaVolumeProps *props) props->mute = default_mute; } +static void +update_state (SpaVolume *this, SpaNodeState state) +{ + this->node.state = state; +} + static SpaResult spa_volume_node_get_props (SpaNode *node, SpaProps **props) @@ -152,31 +158,11 @@ spa_volume_node_send_command (SpaNode *node, return SPA_RESULT_INVALID_COMMAND; case SPA_NODE_COMMAND_START: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_STREAMING); break; case SPA_NODE_COMMAND_PAUSE: - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_PAUSED); break; case SPA_NODE_COMMAND_FLUSH: diff --git a/spa/plugins/xv/xv-sink.c b/spa/plugins/xv/xv-sink.c index fe30dc13..4db0e44b 100644 --- a/spa/plugins/xv/xv-sink.c +++ b/spa/plugins/xv/xv-sink.c @@ -117,6 +117,12 @@ static const SpaPropInfo prop_info[] = NULL }, }; +static void +update_state (SpaXvSink *this, SpaNodeState state) +{ + this->node.state = state; +} + static SpaResult spa_xv_sink_node_get_props (SpaNode *node, SpaProps **props) @@ -176,32 +182,12 @@ spa_xv_sink_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_START: spa_xv_start (this); - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_STREAMING; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_STREAMING); break; case SPA_NODE_COMMAND_PAUSE: spa_xv_stop (this); - if (this->event_cb) { - SpaNodeEvent event; - SpaNodeEventStateChange sc; - - event.type = SPA_NODE_EVENT_TYPE_STATE_CHANGE; - event.data = ≻ - event.size = sizeof (sc); - sc.state = SPA_NODE_STATE_PAUSED; - - this->event_cb (node, &event, this->user_data); - } + update_state (this, SPA_NODE_STATE_PAUSED); break; case SPA_NODE_COMMAND_FLUSH: |