diff options
author | Wim Taymans <wtaymans@redhat.com> | 2017-10-27 18:04:00 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2017-10-27 18:04:00 +0200 |
commit | 61555ab3b5bd38163f903716d960d5c2cdc56d4c (patch) | |
tree | d2950e7aec877f3418282ce18b88ab2fbc5cc964 | |
parent | d594444059e05089ea31de23e377ae5ce79a1f6c (diff) |
client-node: Rework scheduling
Only send data to a client when it has sent a NEED_INPUT otherwise
recycle the buffers immediately.
Explicitly recycle buffers when the client is not going to do this.
-rw-r--r-- | src/modules/module-client-node/client-node.c | 63 |
1 files changed, 36 insertions, 27 deletions
diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index c65fa03b..e21296d6 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -115,6 +115,8 @@ struct proxy { struct impl { struct pw_client_node this; + bool client_reuse; + struct pw_core *core; struct pw_type *t; @@ -128,7 +130,7 @@ struct impl { int fds[2]; int other_fds[2]; - bool client_reuse; + uint32_t input_ready; bool out_pending; }; @@ -743,32 +745,39 @@ spa_proxy_node_port_send_command(struct spa_node *node, static int spa_proxy_node_process_input(struct spa_node *node) { - struct impl *impl; - struct proxy *this; - struct spa_graph_node *n; - struct spa_graph_port *p; - - this = SPA_CONTAINER_OF(node, struct proxy, node); - impl = this->impl; - n = &impl->this.node->rt.node; - - spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { - struct spa_port_io *io = p->io; + struct proxy *this = SPA_CONTAINER_OF(node, struct proxy, node); + struct impl *impl = this->impl; + struct spa_graph_node *n = &impl->this.node->rt.node; + bool client_reuse = impl->client_reuse; + struct spa_graph_port *p, *pp; + int res; - impl->transport->inputs[p->port_id] = *io; + if (impl->input_ready == 0) { + /* the client is not ready to receive our buffers, recycle them */ + pw_log_trace("node not ready, recycle buffers"); + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) + p->io->status = SPA_RESULT_NEED_BUFFER; + res = SPA_RESULT_NEED_BUFFER; + } + else { + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + struct spa_port_io *io = p->io; - pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id, - impl->transport->inputs[p->port_id].status, - impl->transport->inputs[p->port_id].buffer_id); + pw_log_trace("set io status to %d %d", io->status, io->buffer_id); + impl->transport->inputs[p->port_id] = *io; - if (impl->client_reuse) - io->buffer_id = SPA_ID_INVALID; - } - pw_client_node_transport_add_message(impl->transport, + /* explicitly recycle buffers when the client is not going to do it */ + if (!client_reuse && (pp = p->peer)) + spa_node_port_reuse_buffer(pp->node->implementation, pp->port_id, io->buffer_id); + } + pw_client_node_transport_add_message(impl->transport, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT)); - do_flush(this); + do_flush(this); - return SPA_RESULT_OK; + impl->input_ready--; + res = SPA_RESULT_OK; + } + return res; } static int spa_proxy_node_process_output(struct spa_node *node) @@ -783,7 +792,7 @@ static int spa_proxy_node_process_output(struct spa_node *node) n = &impl->this.node->rt.node; if (impl->out_pending) - return SPA_RESULT_OK; + goto done; impl->out_pending = true; @@ -797,6 +806,7 @@ static int spa_proxy_node_process_output(struct spa_node *node) impl->transport->outputs[p->port_id].buffer_id); } + done: pw_client_node_transport_add_message(impl->transport, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT)); do_flush(this); @@ -815,17 +825,16 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT) { spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { *p->io = impl->transport->outputs[p->port_id]; - pw_log_trace("%d %d", p->io->status, p->io->buffer_id); + pw_log_trace("have output %d %d", p->io->status, p->io->buffer_id); } impl->out_pending = false; this->callbacks->have_output(this->callbacks_data); } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) { spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { *p->io = impl->transport->inputs[p->port_id]; - if (impl->client_reuse) - p->io->buffer_id = SPA_ID_INVALID; - pw_log_trace("%d %d", p->io->status, p->io->buffer_id); + pw_log_trace("need input %d %d", p->io->status, p->io->buffer_id); } + impl->input_ready++; this->callbacks->need_input(this->callbacks_data); } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) { if (impl->client_reuse) { |