diff options
author | Wim Taymans <wtaymans@redhat.com> | 2017-10-25 16:33:26 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2017-10-25 16:35:28 +0200 |
commit | b89234513963180e5f92c6e932aa5a80f8a3c0be (patch) | |
tree | 4db8086149aa27e73d06a56294364917c59ee8a6 | |
parent | 4f33a37ac60c5b2641f3abe9f79f5986b41df9d6 (diff) |
Use transport area directlyscheduler
Use transport area directly instead of making a copy. Does not quite
work because async push and async pull overwrite the status and
buffer fields.
-rw-r--r-- | spa/include/spa/node.h | 4 | ||||
-rw-r--r-- | spa/include/spa/pod-builder.h | 2 | ||||
-rw-r--r-- | src/examples/export-spa.c | 2 | ||||
-rw-r--r-- | src/modules/module-client-node/client-node.c | 187 | ||||
-rw-r--r-- | src/pipewire/node.h | 2 | ||||
-rw-r--r-- | src/pipewire/port.c | 3 |
6 files changed, 70 insertions, 130 deletions
diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h index f4a547ba..126ea174 100644 --- a/spa/include/spa/node.h +++ b/spa/include/spa/node.h @@ -137,7 +137,7 @@ struct spa_node_callbacks { /** * struct spa_node: * - * A struct spa_node is a component that can comsume and produce buffers. + * A struct spa_node is a component that can consume and produce buffers. * * * @@ -585,7 +585,7 @@ struct spa_node { * output from * * - set the status to SPA_RESULT_OK for the port you don't want more - * buffer from. + * buffers from. * * Returns: #SPA_RESULT_OK on success or when the node is asynchronous * #SPA_RESULT_NEED_BUFFER for synchronous nodes when input diff --git a/spa/include/spa/pod-builder.h b/spa/include/spa/pod-builder.h index 433b3a67..b5c9999f 100644 --- a/spa/include/spa/pod-builder.h +++ b/spa/include/spa/pod-builder.h @@ -101,7 +101,7 @@ spa_pod_builder_raw(struct spa_pod_builder *builder, const void *data, uint32_t return ref; } -static void spa_pod_builder_pad(struct spa_pod_builder *builder, uint32_t size) +static inline void spa_pod_builder_pad(struct spa_pod_builder *builder, uint32_t size) { uint64_t zeroes = 0; size = SPA_ROUND_UP_N(size, 8) - size; diff --git a/src/examples/export-spa.c b/src/examples/export-spa.c index 4a0d9963..9ae770b7 100644 --- a/src/examples/export-spa.c +++ b/src/examples/export-spa.c @@ -148,8 +148,6 @@ int main(int argc, char *argv[]) pw_main_loop_run(data.loop); - if (data.node) - pw_node_destroy(data.node); pw_core_destroy(data.core); pw_main_loop_destroy(data.loop); diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index ca51e700..d8870017 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -34,6 +34,7 @@ #include "pipewire/pipewire.h" #include "pipewire/interfaces.h" +#include "pipewire/private.h" #include "pipewire/core.h" #include "modules/spa/spa-node.h" @@ -128,7 +129,6 @@ struct impl { int other_fds[2]; bool client_reuse; - bool out_pending; }; /** \endcond */ @@ -746,99 +746,35 @@ spa_proxy_node_port_send_command(struct spa_node *node, static int spa_proxy_node_process_input(struct spa_node *node) { - struct impl *impl; - struct proxy *this; - int i, res = SPA_RESULT_OK; - - if (node == NULL) - return SPA_RESULT_INVALID_ARGUMENTS; - - this = SPA_CONTAINER_OF(node, struct proxy, node); - impl = this->impl; - - for (i = 0; i < MAX_INPUTS; i++) { - struct spa_port_io *io = this->in_ports[i].io; - - if (!io) - continue; - - impl->transport->inputs[i] = *io; - pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id, - impl->transport->inputs[i].status, - impl->transport->inputs[i].buffer_id); + struct proxy *this = SPA_CONTAINER_OF(node, struct proxy, node); + struct impl *impl = this->impl; - if (impl->client_reuse) - io->buffer_id = SPA_ID_INVALID; - } pw_client_node_transport_add_message(impl->transport, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT)); do_flush(this); - return res; + return SPA_RESULT_OK; } static int spa_proxy_node_process_output(struct spa_node *node) { - struct proxy *this; - struct impl *impl; - int i, res = SPA_RESULT_OK; - - this = SPA_CONTAINER_OF(node, struct proxy, node); - impl = this->impl; - - if (impl->out_pending) - return res; - - impl->out_pending = true; - - for (i = 0; i < MAX_OUTPUTS; i++) { - struct spa_port_io *io = this->out_ports[i].io; - - if (!io) - continue; - - impl->transport->outputs[i] = *io; - pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id, - impl->transport->outputs[i].status, - impl->transport->outputs[i].buffer_id); - } + struct proxy *this = SPA_CONTAINER_OF(node, struct proxy, node); + struct impl *impl = this->impl; pw_client_node_transport_add_message(impl->transport, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT)); do_flush(this); - return res; + return SPA_RESULT_OK; } static int handle_node_message(struct proxy *this, struct pw_client_node_message *message) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, proxy); - int i; if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT) { - for (i = 0; i < MAX_OUTPUTS; i++) { - struct spa_port_io *io = this->out_ports[i].io; - - if (!io) - continue; - - *io = impl->transport->outputs[i]; - pw_log_trace("%d %d", io->status, io->buffer_id); - impl->out_pending = false; - } this->callbacks->have_output(this->callbacks_data); } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) { - for (i = 0; i < MAX_INPUTS; i++) { - struct spa_port_io *io = this->in_ports[i].io; - - if (!io) - continue; - - *io = impl->transport->inputs[i]; - if (impl->client_reuse) - io->buffer_id = SPA_ID_INVALID; - pw_log_trace("%d %d", io->status, io->buffer_id); - } this->callbacks->need_input(this->callbacks_data); } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) { if (impl->client_reuse) { @@ -851,12 +787,26 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message return SPA_RESULT_OK; } +static void setup_transport(struct impl *impl) +{ + uint32_t max_inputs = 0, max_outputs = 0, n_inputs = 0, n_outputs = 0; + + spa_proxy_node_get_n_ports(&impl->proxy.node, &n_inputs, &max_inputs, &n_outputs, &max_outputs); + + impl->transport = pw_client_node_transport_new(max_inputs, max_outputs); + impl->transport->area->n_input_ports = n_inputs; + impl->transport->area->n_output_ports = n_outputs; +} + static void client_node_done(void *data, int seq, int res) { struct impl *impl = data; struct proxy *this = &impl->proxy; + if (seq == 0 && res == SPA_RESULT_OK) + setup_transport(impl); + this->callbacks->done(this->callbacks_data, seq, res); } @@ -1028,59 +978,6 @@ proxy_init(struct proxy *this, return SPA_RESULT_RETURN_ASYNC(this->seq++); } -static int client_node_get_fds(struct pw_client_node *node, int *readfd, int *writefd) -{ - struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - - if (impl->fds[0] == -1) { -#if 0 - if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) != - 0) - return SPA_RESULT_ERRNO; - - impl->proxy.data_source.fd = impl->fds[0]; - impl->proxy.writefd = impl->fds[0]; - impl->other_fds[0] = impl->fds[1]; - impl->other_fds[1] = impl->fds[1]; -#else - impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - impl->proxy.data_source.fd = impl->fds[0]; - impl->proxy.writefd = impl->fds[1]; - impl->other_fds[0] = impl->fds[1]; - impl->other_fds[1] = impl->fds[0]; -#endif - - spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source); - pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd); - } - *readfd = impl->other_fds[0]; - *writefd = impl->other_fds[1]; - - return SPA_RESULT_OK; -} - -static void node_initialized(void *data) -{ - struct impl *impl = data; - struct pw_client_node *this = &impl->this; - struct pw_node *node = this->node; - int readfd, writefd; - const struct pw_node_info *i = pw_node_get_info(node); - - if (this->resource == NULL) - return; - - impl->transport = pw_client_node_transport_new(i->max_input_ports, i->max_output_ports); - impl->transport->area->n_input_ports = i->n_input_ports; - impl->transport->area->n_output_ports = i->n_output_ports; - - client_node_get_fds(this, &readfd, &writefd); - - pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)), - readfd, writefd, impl->transport); -} - static int proxy_clear(struct proxy *this) { uint32_t i; @@ -1113,6 +1010,47 @@ static void client_node_resource_destroy(void *data) pw_node_destroy(this->node); } +static void node_port_init(void *data, struct pw_port *port) +{ + struct impl *impl = data; + + switch (port->direction) { + case PW_DIRECTION_INPUT: + port->rt.port.io = &impl->transport->inputs[port->port_id]; + port->rt.mix_port.io = &impl->transport->inputs[port->port_id]; + break; + case PW_DIRECTION_OUTPUT: + port->rt.port.io = &impl->transport->outputs[port->port_id]; + port->rt.mix_port.io = &impl->transport->outputs[port->port_id]; + break; + default: + break; + } +} + +static void node_initialized(void *data) +{ + struct impl *impl = data; + struct pw_client_node *this = &impl->this; + struct pw_node *node = this->node; + + if (this->resource == NULL) + return; + + impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->proxy.data_source.fd = impl->fds[0]; + impl->proxy.writefd = impl->fds[1]; + impl->other_fds[0] = impl->fds[1]; + impl->other_fds[1] = impl->fds[0]; + + spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source); + pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd); + + pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)), + impl->other_fds[0], impl->other_fds[1], impl->transport); +} + static void node_free(void *data) { struct impl *impl = data; @@ -1135,6 +1073,7 @@ static void node_free(void *data) static const struct pw_node_events node_events = { PW_VERSION_NODE_EVENTS, .free = node_free, + .port_init = node_port_init, .initialized = node_initialized, }; diff --git a/src/pipewire/node.h b/src/pipewire/node.h index 5523cc84..f1f26676 100644 --- a/src/pipewire/node.h +++ b/src/pipewire/node.h @@ -59,6 +59,8 @@ struct pw_node_events { /** the node is initialized */ void (*initialized) (void *data); + /* a port is about to be added to a node */ + void (*port_init) (void *data, struct pw_port *port); /** a port was added */ void (*port_added) (void *data, struct pw_port *port); /** a port was removed */ diff --git a/src/pipewire/port.c b/src/pipewire/port.c index a3b3409b..991138c1 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -286,8 +286,9 @@ bool pw_port_add(struct pw_port *port, struct pw_node *node) node->info.n_output_ports++; node->info.change_mask |= PW_NODE_CHANGE_MASK_OUTPUT_PORTS; } + spa_hook_list_call(&node->listener_list, struct pw_node_events, port_init, port); - spa_node_port_set_io(node->node, port->direction, port_id, &port->io); + spa_node_port_set_io(node->node, port->direction, port_id, port->rt.port.io); port->rt.graph = node->rt.graph; pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, port); |