summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2017-10-25 16:33:26 +0200
committerWim Taymans <wtaymans@redhat.com>2017-10-25 16:35:28 +0200
commitb89234513963180e5f92c6e932aa5a80f8a3c0be (patch)
tree4db8086149aa27e73d06a56294364917c59ee8a6
parent4f33a37ac60c5b2641f3abe9f79f5986b41df9d6 (diff)
Use transport area directlyscheduler
Use transport area directly instead of making a copy. Does not quite work because async push and async pull overwrite the status and buffer fields.
-rw-r--r--spa/include/spa/node.h4
-rw-r--r--spa/include/spa/pod-builder.h2
-rw-r--r--src/examples/export-spa.c2
-rw-r--r--src/modules/module-client-node/client-node.c187
-rw-r--r--src/pipewire/node.h2
-rw-r--r--src/pipewire/port.c3
6 files changed, 70 insertions, 130 deletions
diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h
index f4a547ba..126ea174 100644
--- a/spa/include/spa/node.h
+++ b/spa/include/spa/node.h
@@ -137,7 +137,7 @@ struct spa_node_callbacks {
/**
* struct spa_node:
*
- * A struct spa_node is a component that can comsume and produce buffers.
+ * A struct spa_node is a component that can consume and produce buffers.
*
*
*
@@ -585,7 +585,7 @@ struct spa_node {
* output from
*
* - set the status to SPA_RESULT_OK for the port you don't want more
- * buffer from.
+ * buffers from.
*
* Returns: #SPA_RESULT_OK on success or when the node is asynchronous
* #SPA_RESULT_NEED_BUFFER for synchronous nodes when input
diff --git a/spa/include/spa/pod-builder.h b/spa/include/spa/pod-builder.h
index 433b3a67..b5c9999f 100644
--- a/spa/include/spa/pod-builder.h
+++ b/spa/include/spa/pod-builder.h
@@ -101,7 +101,7 @@ spa_pod_builder_raw(struct spa_pod_builder *builder, const void *data, uint32_t
return ref;
}
-static void spa_pod_builder_pad(struct spa_pod_builder *builder, uint32_t size)
+static inline void spa_pod_builder_pad(struct spa_pod_builder *builder, uint32_t size)
{
uint64_t zeroes = 0;
size = SPA_ROUND_UP_N(size, 8) - size;
diff --git a/src/examples/export-spa.c b/src/examples/export-spa.c
index 4a0d9963..9ae770b7 100644
--- a/src/examples/export-spa.c
+++ b/src/examples/export-spa.c
@@ -148,8 +148,6 @@ int main(int argc, char *argv[])
pw_main_loop_run(data.loop);
- if (data.node)
- pw_node_destroy(data.node);
pw_core_destroy(data.core);
pw_main_loop_destroy(data.loop);
diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c
index ca51e700..d8870017 100644
--- a/src/modules/module-client-node/client-node.c
+++ b/src/modules/module-client-node/client-node.c
@@ -34,6 +34,7 @@
#include "pipewire/pipewire.h"
#include "pipewire/interfaces.h"
+#include "pipewire/private.h"
#include "pipewire/core.h"
#include "modules/spa/spa-node.h"
@@ -128,7 +129,6 @@ struct impl {
int other_fds[2];
bool client_reuse;
- bool out_pending;
};
/** \endcond */
@@ -746,99 +746,35 @@ spa_proxy_node_port_send_command(struct spa_node *node,
static int spa_proxy_node_process_input(struct spa_node *node)
{
- struct impl *impl;
- struct proxy *this;
- int i, res = SPA_RESULT_OK;
-
- if (node == NULL)
- return SPA_RESULT_INVALID_ARGUMENTS;
-
- this = SPA_CONTAINER_OF(node, struct proxy, node);
- impl = this->impl;
-
- for (i = 0; i < MAX_INPUTS; i++) {
- struct spa_port_io *io = this->in_ports[i].io;
-
- if (!io)
- continue;
-
- impl->transport->inputs[i] = *io;
- pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id,
- impl->transport->inputs[i].status,
- impl->transport->inputs[i].buffer_id);
+ struct proxy *this = SPA_CONTAINER_OF(node, struct proxy, node);
+ struct impl *impl = this->impl;
- if (impl->client_reuse)
- io->buffer_id = SPA_ID_INVALID;
- }
pw_client_node_transport_add_message(impl->transport,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT));
do_flush(this);
- return res;
+ return SPA_RESULT_OK;
}
static int spa_proxy_node_process_output(struct spa_node *node)
{
- struct proxy *this;
- struct impl *impl;
- int i, res = SPA_RESULT_OK;
-
- this = SPA_CONTAINER_OF(node, struct proxy, node);
- impl = this->impl;
-
- if (impl->out_pending)
- return res;
-
- impl->out_pending = true;
-
- for (i = 0; i < MAX_OUTPUTS; i++) {
- struct spa_port_io *io = this->out_ports[i].io;
-
- if (!io)
- continue;
-
- impl->transport->outputs[i] = *io;
- pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id,
- impl->transport->outputs[i].status,
- impl->transport->outputs[i].buffer_id);
- }
+ struct proxy *this = SPA_CONTAINER_OF(node, struct proxy, node);
+ struct impl *impl = this->impl;
pw_client_node_transport_add_message(impl->transport,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT));
do_flush(this);
- return res;
+ return SPA_RESULT_OK;
}
static int handle_node_message(struct proxy *this, struct pw_client_node_message *message)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, proxy);
- int i;
if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT) {
- for (i = 0; i < MAX_OUTPUTS; i++) {
- struct spa_port_io *io = this->out_ports[i].io;
-
- if (!io)
- continue;
-
- *io = impl->transport->outputs[i];
- pw_log_trace("%d %d", io->status, io->buffer_id);
- impl->out_pending = false;
- }
this->callbacks->have_output(this->callbacks_data);
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) {
- for (i = 0; i < MAX_INPUTS; i++) {
- struct spa_port_io *io = this->in_ports[i].io;
-
- if (!io)
- continue;
-
- *io = impl->transport->inputs[i];
- if (impl->client_reuse)
- io->buffer_id = SPA_ID_INVALID;
- pw_log_trace("%d %d", io->status, io->buffer_id);
- }
this->callbacks->need_input(this->callbacks_data);
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
if (impl->client_reuse) {
@@ -851,12 +787,26 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message
return SPA_RESULT_OK;
}
+static void setup_transport(struct impl *impl)
+{
+ uint32_t max_inputs = 0, max_outputs = 0, n_inputs = 0, n_outputs = 0;
+
+ spa_proxy_node_get_n_ports(&impl->proxy.node, &n_inputs, &max_inputs, &n_outputs, &max_outputs);
+
+ impl->transport = pw_client_node_transport_new(max_inputs, max_outputs);
+ impl->transport->area->n_input_ports = n_inputs;
+ impl->transport->area->n_output_ports = n_outputs;
+}
+
static void
client_node_done(void *data, int seq, int res)
{
struct impl *impl = data;
struct proxy *this = &impl->proxy;
+ if (seq == 0 && res == SPA_RESULT_OK)
+ setup_transport(impl);
+
this->callbacks->done(this->callbacks_data, seq, res);
}
@@ -1028,59 +978,6 @@ proxy_init(struct proxy *this,
return SPA_RESULT_RETURN_ASYNC(this->seq++);
}
-static int client_node_get_fds(struct pw_client_node *node, int *readfd, int *writefd)
-{
- struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
-
- if (impl->fds[0] == -1) {
-#if 0
- if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) !=
- 0)
- return SPA_RESULT_ERRNO;
-
- impl->proxy.data_source.fd = impl->fds[0];
- impl->proxy.writefd = impl->fds[0];
- impl->other_fds[0] = impl->fds[1];
- impl->other_fds[1] = impl->fds[1];
-#else
- impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
- impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
- impl->proxy.data_source.fd = impl->fds[0];
- impl->proxy.writefd = impl->fds[1];
- impl->other_fds[0] = impl->fds[1];
- impl->other_fds[1] = impl->fds[0];
-#endif
-
- spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source);
- pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd);
- }
- *readfd = impl->other_fds[0];
- *writefd = impl->other_fds[1];
-
- return SPA_RESULT_OK;
-}
-
-static void node_initialized(void *data)
-{
- struct impl *impl = data;
- struct pw_client_node *this = &impl->this;
- struct pw_node *node = this->node;
- int readfd, writefd;
- const struct pw_node_info *i = pw_node_get_info(node);
-
- if (this->resource == NULL)
- return;
-
- impl->transport = pw_client_node_transport_new(i->max_input_ports, i->max_output_ports);
- impl->transport->area->n_input_ports = i->n_input_ports;
- impl->transport->area->n_output_ports = i->n_output_ports;
-
- client_node_get_fds(this, &readfd, &writefd);
-
- pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)),
- readfd, writefd, impl->transport);
-}
-
static int proxy_clear(struct proxy *this)
{
uint32_t i;
@@ -1113,6 +1010,47 @@ static void client_node_resource_destroy(void *data)
pw_node_destroy(this->node);
}
+static void node_port_init(void *data, struct pw_port *port)
+{
+ struct impl *impl = data;
+
+ switch (port->direction) {
+ case PW_DIRECTION_INPUT:
+ port->rt.port.io = &impl->transport->inputs[port->port_id];
+ port->rt.mix_port.io = &impl->transport->inputs[port->port_id];
+ break;
+ case PW_DIRECTION_OUTPUT:
+ port->rt.port.io = &impl->transport->outputs[port->port_id];
+ port->rt.mix_port.io = &impl->transport->outputs[port->port_id];
+ break;
+ default:
+ break;
+ }
+}
+
+static void node_initialized(void *data)
+{
+ struct impl *impl = data;
+ struct pw_client_node *this = &impl->this;
+ struct pw_node *node = this->node;
+
+ if (this->resource == NULL)
+ return;
+
+ impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+ impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+ impl->proxy.data_source.fd = impl->fds[0];
+ impl->proxy.writefd = impl->fds[1];
+ impl->other_fds[0] = impl->fds[1];
+ impl->other_fds[1] = impl->fds[0];
+
+ spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source);
+ pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd);
+
+ pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)),
+ impl->other_fds[0], impl->other_fds[1], impl->transport);
+}
+
static void node_free(void *data)
{
struct impl *impl = data;
@@ -1135,6 +1073,7 @@ static void node_free(void *data)
static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS,
.free = node_free,
+ .port_init = node_port_init,
.initialized = node_initialized,
};
diff --git a/src/pipewire/node.h b/src/pipewire/node.h
index 5523cc84..f1f26676 100644
--- a/src/pipewire/node.h
+++ b/src/pipewire/node.h
@@ -59,6 +59,8 @@ struct pw_node_events {
/** the node is initialized */
void (*initialized) (void *data);
+ /* a port is about to be added to a node */
+ void (*port_init) (void *data, struct pw_port *port);
/** a port was added */
void (*port_added) (void *data, struct pw_port *port);
/** a port was removed */
diff --git a/src/pipewire/port.c b/src/pipewire/port.c
index a3b3409b..991138c1 100644
--- a/src/pipewire/port.c
+++ b/src/pipewire/port.c
@@ -286,8 +286,9 @@ bool pw_port_add(struct pw_port *port, struct pw_node *node)
node->info.n_output_ports++;
node->info.change_mask |= PW_NODE_CHANGE_MASK_OUTPUT_PORTS;
}
+ spa_hook_list_call(&node->listener_list, struct pw_node_events, port_init, port);
- spa_node_port_set_io(node->node, port->direction, port_id, &port->io);
+ spa_node_port_set_io(node->node, port->direction, port_id, port->rt.port.io);
port->rt.graph = node->rt.graph;
pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, port);