summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2017-06-30 19:32:11 +0200
committerWim Taymans <wtaymans@redhat.com>2017-06-30 19:36:39 +0200
commitd2f877912aabda296ea16fb12953ad39209b36ab (patch)
treee3a776e1caf9a71cc1de94c3b35840713b87974c
parent7297c188397c5868747ef01fd3fe7c4249938f78 (diff)
Use graph to schedule things
Make real spa_graph nodes and ports and schedule those. This makes it possible to add explicit tee and mixers in the real graph. Rework the way we add and remove ports and nodes from the graph. Remove confusing pw_port_link and merge core with pw_link_new() Move scheduling in separate files, add some more graph-schedulers.
-rw-r--r--pipewire/modules/module-autolink.c4
-rw-r--r--pipewire/modules/module-client-node/client-node.c5
-rw-r--r--pipewire/modules/module-mixer.c2
-rw-r--r--pipewire/server/core.c3
-rw-r--r--pipewire/server/core.h6
-rw-r--r--pipewire/server/link.c181
-rw-r--r--pipewire/server/link.h15
-rw-r--r--pipewire/server/node.c124
-rw-r--r--pipewire/server/node.h6
-rw-r--r--pipewire/server/port.c266
-rw-r--r--pipewire/server/port.h20
-rw-r--r--spa/include/spa/graph-scheduler1.h137
-rw-r--r--spa/include/spa/graph-scheduler2.h153
-rw-r--r--spa/include/spa/graph-scheduler3.h173
-rw-r--r--spa/include/spa/graph.h127
-rw-r--r--spa/include/spa/list.h8
-rw-r--r--spa/tests/test-graph.c13
-rw-r--r--spa/tests/test-mixer.c15
-rw-r--r--spa/tests/test-perf.c15
19 files changed, 845 insertions, 428 deletions
diff --git a/pipewire/modules/module-autolink.c b/pipewire/modules/module-autolink.c
index f4cb6132..02746a2d 100644
--- a/pipewire/modules/module-autolink.c
+++ b/pipewire/modules/module-autolink.c
@@ -159,9 +159,9 @@ static void try_link_port(struct pw_node *node, struct pw_port *port, struct nod
goto error;
if (port->direction == PW_DIRECTION_OUTPUT)
- link = pw_port_link(port, target, NULL, NULL, &error);
+ link = pw_link_new(impl->core, port, target, NULL, NULL, &error);
else
- link = pw_port_link(target, port, NULL, NULL, &error);
+ link = pw_link_new(impl->core, target, port, NULL, NULL, &error);
if (link == NULL)
goto error;
diff --git a/pipewire/modules/module-client-node/client-node.c b/pipewire/modules/module-client-node/client-node.c
index f049054f..2d8f4da3 100644
--- a/pipewire/modules/module-client-node/client-node.c
+++ b/pipewire/modules/module-client-node/client-node.c
@@ -782,7 +782,10 @@ static int spa_proxy_node_process_input(struct spa_node *node)
}
send_have_output(this);
- return SPA_RESULT_OK;
+ if (this->callbacks->need_input)
+ return SPA_RESULT_OK;
+ else
+ return SPA_RESULT_NEED_BUFFER;
}
static int spa_proxy_node_process_output(struct spa_node *node)
diff --git a/pipewire/modules/module-mixer.c b/pipewire/modules/module-mixer.c
index 716d0491..ff133f84 100644
--- a/pipewire/modules/module-mixer.c
+++ b/pipewire/modules/module-mixer.c
@@ -157,7 +157,7 @@ static struct impl *module_new(struct pw_core *core, struct pw_properties *prope
if (op == NULL)
continue;
- pw_port_link(op, ip, NULL, NULL, &error);
+ pw_link_new(core, op, ip, NULL, NULL, &error);
}
return impl;
}
diff --git a/pipewire/server/core.c b/pipewire/server/core.c
index 2aed3baf..9b8c981a 100644
--- a/pipewire/server/core.c
+++ b/pipewire/server/core.c
@@ -291,6 +291,9 @@ struct pw_core *pw_core_new(struct pw_main_loop *main_loop, struct pw_properties
pw_type_init(&this->type);
pw_map_init(&this->objects, 128, 32);
+ spa_graph_init(&this->rt.graph);
+ spa_graph_scheduler_init(&this->rt.sched, &this->rt.graph);
+
spa_debug_set_type_map(this->type.map);
impl->support[0] = SPA_SUPPORT_INIT(SPA_TYPE__TypeMap, this->type.map);
diff --git a/pipewire/server/core.h b/pipewire/server/core.h
index cd9a076c..01950ca9 100644
--- a/pipewire/server/core.h
+++ b/pipewire/server/core.h
@@ -25,6 +25,7 @@ extern "C" {
#endif
#include <spa/log.h>
+#include <spa/graph-scheduler3.h>
struct pw_global;
@@ -180,6 +181,11 @@ struct pw_core {
/** Emited when a global is removed */
PW_SIGNAL(global_removed, (struct pw_listener *listener,
struct pw_core *core, struct pw_global *global));
+
+ struct {
+ struct spa_graph_scheduler sched;
+ struct spa_graph graph;
+ } rt;
};
struct pw_core *
diff --git a/pipewire/server/link.c b/pipewire/server/link.c
index 5f62d7c5..3f085570 100644
--- a/pipewire/server/link.c
+++ b/pipewire/server/link.c
@@ -758,21 +758,59 @@ on_output_async_complete_notify(struct pw_listener *listener,
pw_work_queue_complete(impl->work, node, seq, res);
}
+static int
+do_remove_input(struct spa_loop *loop,
+ bool async, uint32_t seq, size_t size, void *data, void *user_data)
+{
+ struct pw_link *this = user_data;
+ struct pw_port *port = ((struct pw_port **) data)[0];
+ spa_graph_port_remove(port->rt.graph, &this->rt.in_port);
+ return SPA_RESULT_OK;
+}
+
+static void input_remove(struct pw_link *this, struct pw_port *port)
+{
+ struct impl *impl = (struct impl *) this;
+
+ pw_log_debug("link %p: remove input port %p", this, port);
+ pw_signal_remove(&impl->input_port_destroy);
+ pw_signal_remove(&impl->input_async_complete);
+ pw_loop_invoke(port->node->data_loop->loop,
+ do_remove_input, 1, sizeof(struct pw_port*), &port, true, this);
+}
+
+static int
+do_remove_output(struct spa_loop *loop,
+ bool async, uint32_t seq, size_t size, void *data, void *user_data)
+{
+ struct pw_link *this = user_data;
+ struct pw_port *port = ((struct pw_port **) data)[0];
+ spa_graph_port_remove(port->rt.graph, &this->rt.out_port);
+ return SPA_RESULT_OK;
+}
+
+static void output_remove(struct pw_link *this, struct pw_port *port)
+{
+ struct impl *impl = (struct impl *) this;
+
+ pw_log_debug("link %p: remove output port %p", this, port);
+ pw_signal_remove(&impl->output_port_destroy);
+ pw_signal_remove(&impl->output_async_complete);
+ pw_loop_invoke(port->node->data_loop->loop,
+ do_remove_output, 1, sizeof(struct pw_port*), &port, true, this);
+}
+
static void on_port_destroy(struct pw_link *this, struct pw_port *port)
{
struct impl *impl = (struct impl *) this;
struct pw_port *other;
if (port == this->input) {
- pw_log_debug("link %p: input port destroyed %p", this, port);
- pw_signal_remove(&impl->input_port_destroy);
- pw_signal_remove(&impl->input_async_complete);
+ input_remove(this, port);
this->input = NULL;
other = this->output;
} else if (port == this->output) {
- pw_log_debug("link %p: output port destroyed %p", this, port);
- pw_signal_remove(&impl->output_port_destroy);
- pw_signal_remove(&impl->output_async_complete);
+ output_remove(this, port);
this->output = NULL;
other = this->input;
} else
@@ -784,6 +822,7 @@ static void on_port_destroy(struct pw_link *this, struct pw_port *port)
pw_log_debug("link %p: clear input allocated buffers on port %p", this, other);
pw_port_use_buffers(other, NULL, 0);
+ impl->buffer_owner = NULL;
}
pw_signal_emit(&this->port_unlinked, this, port);
@@ -863,18 +902,54 @@ link_bind_func(struct pw_global *global, struct pw_client *client, uint32_t vers
return SPA_RESULT_NO_MEMORY;
}
+static int
+do_add_link(struct spa_loop *loop,
+ bool async, uint32_t seq, size_t size, void *data, void *user_data)
+{
+ struct pw_link *this = user_data;
+ struct pw_port *port = ((struct pw_port **) data)[0];
+
+ if (port->direction == PW_DIRECTION_OUTPUT) {
+ spa_graph_port_add(port->rt.graph,
+ &port->rt.mix_node,
+ &this->rt.out_port,
+ PW_DIRECTION_OUTPUT,
+ this->rt.out_port.port_id,
+ 0,
+ &this->io);
+ } else {
+ spa_graph_port_add(port->rt.graph,
+ &port->rt.mix_node,
+ &this->rt.in_port,
+ PW_DIRECTION_INPUT,
+ this->rt.in_port.port_id,
+ 0,
+ &this->io);
+ }
+
+ return SPA_RESULT_OK;
+}
+
struct pw_link *pw_link_new(struct pw_core *core,
struct pw_port *output,
struct pw_port *input,
struct spa_format *format_filter,
- struct pw_properties *properties)
+ struct pw_properties *properties,
+ char **error)
{
struct impl *impl;
struct pw_link *this;
+ struct pw_node *input_node, *output_node;
+
+ if (output == input)
+ goto same_ports;
+
+ if (pw_link_find(output, input))
+ goto link_exists;
impl = calloc(1, sizeof(struct impl));
if (impl == NULL)
- return NULL;
+ goto no_mem;
this = &impl->this;
pw_log_debug("link %p: new", this);
@@ -888,6 +963,9 @@ struct pw_link *pw_link_new(struct pw_core *core,
this->input = input;
this->output = output;
+ input_node = input->node;
+ output_node = output->node;
+
spa_list_init(&this->resource_list);
pw_signal_init(&this->port_unlinked);
pw_signal_init(&this->state_changed);
@@ -895,21 +973,33 @@ struct pw_link *pw_link_new(struct pw_core *core,
impl->format_filter = format_filter;
- pw_signal_add(&this->input->destroy_signal,
+ pw_signal_add(&input->destroy_signal,
&impl->input_port_destroy, on_input_port_destroy);
- pw_signal_add(&this->input->node->async_complete,
+ pw_signal_add(&input_node->async_complete,
&impl->input_async_complete, on_input_async_complete_notify);
- pw_signal_add(&this->output->destroy_signal,
+ pw_signal_add(&output->destroy_signal,
&impl->output_port_destroy, on_output_port_destroy);
- pw_signal_add(&this->output->node->async_complete,
+ pw_signal_add(&output_node->async_complete,
&impl->output_async_complete, on_output_async_complete_notify);
pw_log_debug("link %p: constructed %p:%d -> %p:%d", impl,
- this->output->node, this->output->port_id,
- this->input->node, this->input->port_id);
+ output_node, output->port_id, input_node, input->port_id);
+
+ input_node->live = output_node->live;
+ if (output_node->clock)
+ input_node->clock = output_node->clock;
+
+ pw_log_debug("link %p: output node %p clock %p, live %d", this, output_node, output_node->clock,
+ output_node->live);
+
+ spa_list_insert(output->links.prev, &this->output_link);
+ spa_list_insert(input->links.prev, &this->input_link);
+
+ output_node->n_used_output_links++;
+ input_node->n_used_input_links++;
spa_list_insert(core->link_list.prev, &this->link);
@@ -922,7 +1012,26 @@ struct pw_link *pw_link_new(struct pw_core *core,
this->info.input_port_id = input ? input->port_id : -1;
this->info.format = NULL;
+ spa_graph_port_link(output_node->rt.sched->graph, &this->rt.out_port, &this->rt.in_port);
+
+ pw_loop_invoke(output_node->data_loop->loop,
+ do_add_link,
+ SPA_ID_INVALID, sizeof(struct pw_port *), &output, false, this);
+ pw_loop_invoke(input_node->data_loop->loop,
+ do_add_link,
+ SPA_ID_INVALID, sizeof(struct pw_port *), &input, false, this);
+
return this;
+
+ same_ports:
+ asprintf(error, "can't link the same ports");
+ return NULL;
+ link_exists:
+ asprintf(error, "link already exists");
+ return NULL;
+ no_mem:
+ asprintf(error, "no memory");
+ return NULL;
}
static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
@@ -933,22 +1042,6 @@ static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
pw_port_use_buffers(port, NULL, 0);
}
-static int
-do_link_remove(struct spa_loop *loop,
- bool async, uint32_t seq, size_t size, void *data, void *user_data)
-{
- struct pw_link *this = user_data;
-
- if (this->rt.input) {
- spa_list_remove(&this->rt.input_link);
- this->rt.input = NULL;
- }
- if (this->rt.output) {
- spa_list_remove(&this->rt.output_link);
- this->rt.output = NULL;
- }
- return SPA_RESULT_OK;
-}
void pw_link_destroy(struct pw_link *link)
{
@@ -965,21 +1058,8 @@ void pw_link_destroy(struct pw_link *link)
pw_resource_destroy(resource);
if (link->input) {
- pw_signal_remove(&impl->input_port_destroy);
- pw_signal_remove(&impl->input_async_complete);
-
- pw_loop_invoke(link->input->node->data_loop->loop,
- do_link_remove, 1, 0, NULL, true, link);
- }
- if (link->output) {
- pw_signal_remove(&impl->output_port_destroy);
- pw_signal_remove(&impl->output_async_complete);
-
- pw_loop_invoke(link->output->node->data_loop->loop,
- do_link_remove, 2, 0, NULL, true, link);
- }
+ input_remove(link, link->input);
- if (link->input) {
spa_list_remove(&link->input_link);
link->input->node->n_used_input_links--;
@@ -993,6 +1073,8 @@ void pw_link_destroy(struct pw_link *link)
link->input = NULL;
}
if (link->output) {
+ output_remove(link, link->output);
+
spa_list_remove(&link->output_link);
link->output->node->n_used_output_links--;
@@ -1016,3 +1098,14 @@ void pw_link_destroy(struct pw_link *link)
free(impl);
}
+
+struct pw_link *pw_link_find(struct pw_port *output_port, struct pw_port *input_port)
+{
+ struct pw_link *pl;
+
+ spa_list_for_each(pl, &output_port->links, output_link) {
+ if (pl->input == input_port)
+ return pl;
+ }
+ return NULL;
+}
diff --git a/pipewire/server/link.h b/pipewire/server/link.h
index d9be699d..f55d2258 100644
--- a/pipewire/server/link.h
+++ b/pipewire/server/link.h
@@ -68,19 +68,20 @@ struct pw_link {
struct spa_list resource_list; /**< list of bound resources */
+ struct spa_port_io io; /**< link io area */
+
struct pw_port *output; /**< output port */
struct spa_list output_link; /**< link in output port links */
struct pw_port *input; /**< input port */
struct spa_list input_link; /**< link in input port links */
+
/** Emited when the port is unlinked */
PW_SIGNAL(port_unlinked, (struct pw_listener *listener,
struct pw_link *link, struct pw_port *port));
struct {
- struct pw_port *input;
- struct pw_port *output;
- struct spa_list input_link;
- struct spa_list output_link;
+ struct spa_graph_port out_port;
+ struct spa_graph_port in_port;
} rt;
};
@@ -92,11 +93,15 @@ pw_link_new(struct pw_core *core, /**< the core object */
struct pw_port *output, /**< an output port */
struct pw_port *input, /**< an input port */
struct spa_format *format_filter, /**< an optional format filter */
- struct pw_properties *properties /**< extra properties */);
+ struct pw_properties *properties /**< extra properties */,
+ char **error /**< error string */);
/** Destroy a link \memberof pw_link */
void pw_link_destroy(struct pw_link *link);
+/** Find the link between 2 ports \memberof pw_link */
+struct pw_link * pw_link_find(struct pw_port *output, struct pw_port *input);
+
/** Activate a link \memberof pw_link
* Starts the negotiation of formats and buffers on \a link and then
* starts data streaming */
diff --git a/pipewire/server/node.c b/pipewire/server/node.c
index f69cd8f8..a2daafa9 100644
--- a/pipewire/server/node.c
+++ b/pipewire/server/node.c
@@ -21,6 +21,8 @@
#include <stdlib.h>
#include <errno.h>
+#include <spa/graph-scheduler3.h>
+
#include "pipewire/client/pipewire.h"
#include "pipewire/client/interfaces.h"
@@ -237,63 +239,6 @@ static void send_clock_update(struct pw_node *this)
pw_log_debug("got error %d", res);
}
-static int do_pull(struct pw_node *this)
-{
- int res = SPA_RESULT_OK;
- struct pw_port *inport;
- bool have_output = false;
-
- spa_list_for_each(inport, &this->input_ports, link) {
- struct pw_link *link;
- struct pw_port *outport;
- struct spa_port_io *pi;
- struct spa_port_io *po;
-
- pi = &inport->io;
- pw_log_trace("node %p: need input port %d, %d %d", this,
- inport->port_id, pi->buffer_id, pi->status);
-
- if (pi->status != SPA_RESULT_NEED_BUFFER)
- continue;
-
- spa_list_for_each(link, &inport->rt.links, rt.input_link) {
- if (link->rt.input == NULL || link->rt.output == NULL)
- continue;
-
- outport = link->rt.output;
- po = &outport->io;
-
- /* pull */
- *po = *pi;
- pi->buffer_id = SPA_ID_INVALID;
-
- pw_log_trace("node %p: process output %p %d", outport->node, po,
- po->buffer_id);
-
- res = spa_node_process_output(outport->node->node);
-
- if (res == SPA_RESULT_NEED_BUFFER) {
- res = do_pull(outport->node);
- pw_log_trace("node %p: pull return %d", outport->node, res);
- }
- if (res == SPA_RESULT_HAVE_BUFFER) {
- *pi = *po;
- pw_log_trace("node %p: have output %d %d", this, pi->status,
- pi->buffer_id);
- have_output = true;
- } else if (res < 0) {
- pw_log_warn("node %p: got process output %d", outport->node, res);
- }
-
- }
- }
- if (have_output) {
- pw_log_trace("node %p: doing process input", this);
- res = spa_node_process_input(this->node);
- }
- return res;
-}
-
static void on_node_done(struct spa_node *node, int seq, int res, void *user_data)
{
struct impl *impl = user_data;
@@ -319,49 +264,24 @@ static void on_node_need_input(struct spa_node *node, void *user_data)
struct impl *impl = user_data;
struct pw_node *this = &impl->this;
- do_pull(this);
+ spa_graph_scheduler_pull(this->rt.sched, &this->rt.node);
+ while (spa_graph_scheduler_iterate(this->rt.sched));
}
static void on_node_have_output(struct spa_node *node, void *user_data)
{
struct impl *impl = user_data;
struct pw_node *this = &impl->this;
- int res;
- struct pw_port *outport;
-
- spa_list_for_each(outport, &this->output_ports, link) {
- struct pw_link *link;
- struct spa_port_io *po;
-
- po = &outport->io;
- if (po->buffer_id == SPA_ID_INVALID)
- continue;
-
- pw_log_trace("node %p: have output %d", this, po->buffer_id);
-
- spa_list_for_each(link, &outport->rt.links, rt.output_link) {
- struct pw_port *inport;
-
- if (link->rt.input == NULL || link->rt.output == NULL)
- continue;
-
- inport = link->rt.input;
- inport->io = *po;
- pw_log_trace("node %p: do process input %d", this, po->buffer_id);
-
- if ((res = spa_node_process_input(inport->node->node)) < 0)
- pw_log_warn("node %p: got process input %d", inport->node, res);
-
- }
- po->status = SPA_RESULT_NEED_BUFFER;
- }
- res = spa_node_process_output(this->node);
+ spa_graph_scheduler_push(this->rt.sched, &this->rt.node);
+ while (spa_graph_scheduler_iterate(this->rt.sched));
}
static void
on_node_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id, void *user_data)
{
+
+#if 0
struct impl *impl = user_data;
struct pw_node *this = &impl->this;
struct pw_port *inport;
@@ -380,6 +300,7 @@ on_node_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id
outport->io.buffer_id = buffer_id;
}
}
+#endif
}
static void node_unbind_func(void *data)
@@ -476,11 +397,17 @@ static void init_complete(struct pw_node *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
+ spa_graph_node_add(this->rt.sched->graph,
+ &this->rt.node,
+ spa_graph_scheduler_default,
+ this->node);
+
update_port_ids(this);
pw_log_debug("node %p: init completed", this);
impl->async_init = false;
spa_list_insert(this->core->node_list.prev, &this->link);
+
pw_core_add_global(this->core,
this->owner,
this->core->type.node, 0, this, node_bind_func, &this->global);
@@ -529,6 +456,8 @@ struct pw_node *pw_node_new(struct pw_core *core,
this->clock = clock;
this->data_loop = core->data_loop;
+ this->rt.sched = &core->rt.sched;
+
spa_list_init(&this->resource_list);
if (spa_node_set_callbacks(this->node, &node_callbacks, impl) < 0)
@@ -585,26 +514,11 @@ do_node_remove(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_node *this = user_data;
- struct pw_port *port, *tmp;
pause_node(this);
- spa_list_for_each_safe(port, tmp, &this->input_ports, link) {
- struct pw_link *link, *tlink;
- spa_list_for_each_safe(link, tlink, &port->rt.links, rt.input_link) {
- pw_port_pause_rt(link->rt.input);
- spa_list_remove(&link->rt.input_link);
- link->rt.input = NULL;
- }
- }
- spa_list_for_each_safe(port, tmp, &this->output_ports, link) {
- struct pw_link *link, *tlink;
- spa_list_for_each_safe(link, tlink, &port->rt.links, rt.output_link) {
- pw_port_pause_rt(link->rt.output);
- spa_list_remove(&link->rt.output_link);
- link->rt.output = NULL;
- }
- }
+ spa_graph_node_remove(this->rt.sched->graph, &this->rt.node);
+
return SPA_RESULT_OK;
}
diff --git a/pipewire/server/node.h b/pipewire/server/node.h
index c8ca0552..88b916e3 100644
--- a/pipewire/server/node.h
+++ b/pipewire/server/node.h
@@ -106,6 +106,12 @@ struct pw_node {
struct pw_node *node, uint32_t seq, int res));
struct pw_data_loop *data_loop; /**< the data loop for this node */
+
+ struct {
+ struct spa_graph_scheduler *sched;
+ struct spa_graph_node node;
+ } rt;
+
};
/** Create a new node \memberof pw_node */
diff --git a/pipewire/server/port.c b/pipewire/server/port.c
index cc7a2320..bfa0b3e6 100644
--- a/pipewire/server/port.c
+++ b/pipewire/server/port.c
@@ -33,6 +33,96 @@ struct impl {
};
/** \endcond */
+
+static int schedule_tee(struct spa_graph_node *node)
+{
+ int res;
+ struct pw_port *this = node->user_data;
+ struct spa_graph_port *p;
+ struct spa_port_io *io = this->rt.mix_port.io;
+
+ if (node->action == SPA_GRAPH_ACTION_IN) {
+ if (spa_list_is_empty(&node->ports[SPA_DIRECTION_OUTPUT])) {
+ io->status = SPA_RESULT_NEED_BUFFER;
+ res = SPA_RESULT_NEED_BUFFER;
+ }
+ else {
+ spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link)
+ *p->io = *io;
+ io->status = SPA_RESULT_OK;
+ io->buffer_id = SPA_ID_INVALID;
+ res = SPA_RESULT_HAVE_BUFFER;
+ }
+ }
+ else if (node->action == SPA_GRAPH_ACTION_OUT) {
+ spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link)
+ *io = *p->io;
+ io->status = SPA_RESULT_NEED_BUFFER;
+ res = SPA_RESULT_NEED_BUFFER;
+ }
+ else
+ res = SPA_RESULT_ERROR;
+
+ return res;
+}
+
+static int schedule_mix(struct spa_graph_node *node)
+{
+ int res;
+ struct pw_port *this = node->user_data;
+ struct spa_graph_port *p;
+ struct spa_port_io *io = this->rt.mix_port.io;
+
+ if (node->action == SPA_GRAPH_ACTION_IN) {
+ spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
+ *io = *p->io;
+ p->io->status = SPA_RESULT_OK;
+ p->io->buffer_id = SPA_ID_INVALID;
+ }
+ res = SPA_RESULT_HAVE_BUFFER;
+ }
+ else if (node->action == SPA_GRAPH_ACTION_OUT) {
+ io->status = SPA_RESULT_NEED_BUFFER;
+ spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link)
+ *p->io = *io;
+ res = SPA_RESULT_NEED_BUFFER;
+ }
+ else
+ res = SPA_RESULT_ERROR;
+
+ return res;
+}
+
+static int do_add_port(struct spa_loop *loop,
+ bool async, uint32_t seq, size_t size, void *data, void *user_data)
+{
+ struct pw_port *this = user_data;
+
+ spa_graph_port_add(this->rt.graph,
+ &this->node->rt.node,
+ &this->rt.port,
+ this->direction,
+ this->port_id,
+ 0,
+ &this->io);
+ spa_graph_node_add(this->rt.graph,
+ &this->rt.mix_node,
+ this->direction == PW_DIRECTION_INPUT ? schedule_mix : schedule_tee,
+ this);
+ spa_graph_port_add(this->rt.graph,
+ &this->rt.mix_node,
+ &this->rt.mix_port,
+ pw_direction_reverse(this->direction),
+ 0,
+ 0,
+ &this->io);
+ spa_graph_port_link(this->rt.graph,
+ &this->rt.port,
+ &this->rt.mix_port);
+
+ return SPA_RESULT_OK;
+}
+
struct pw_port *pw_port_new(struct pw_node *node, enum pw_direction direction, uint32_t port_id)
{
struct impl *impl;
@@ -51,18 +141,46 @@ struct pw_port *pw_port_new(struct pw_node *node, enum pw_direction direction, u
this->io.buffer_id = SPA_ID_INVALID;
spa_list_init(&this->links);
- spa_list_init(&this->rt.links);
+
pw_signal_init(&this->destroy_signal);
+ this->rt.graph = node->rt.sched->graph;
+
+ pw_loop_invoke(node->data_loop->loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, this);
+
return this;
}
+static int do_remove_port(struct spa_loop *loop,
+ bool async, uint32_t seq, size_t size, void *data, void *user_data)
+{
+ struct pw_port *this = user_data;
+ struct spa_graph_port *p;
+
+ spa_graph_port_unlink(this->rt.graph,
+ &this->rt.port);
+ spa_graph_port_remove(this->rt.graph,
+ &this->rt.port);
+
+ spa_list_for_each(p, &this->rt.mix_node.ports[this->direction], link)
+ spa_graph_port_remove(this->rt.graph, p);
+
+ spa_graph_port_remove(this->rt.graph,
+ &this->rt.mix_port);
+ spa_graph_node_remove(this->rt.graph,
+ &this->rt.mix_node);
+
+ return SPA_RESULT_OK;
+}
+
void pw_port_destroy(struct pw_port *port)
{
pw_log_debug("port %p: destroy", port);
pw_signal_emit(&port->destroy_signal, port);
+ pw_loop_invoke(port->node->data_loop->loop, do_remove_port, SPA_ID_INVALID, 0, NULL, true, port);
+
spa_list_remove(&port->link);
free(port);
@@ -76,95 +194,6 @@ static void port_update_state(struct pw_port *port, enum pw_port_state state)
}
}
-static int
-do_add_link(struct spa_loop *loop,
- bool async, uint32_t seq, size_t size, void *data, void *user_data)
-{
- struct pw_port *this = user_data;
- struct pw_link *link = ((struct pw_link **) data)[0];
-
- if (this->direction == PW_DIRECTION_INPUT) {
- spa_list_insert(this->rt.links.prev, &link->rt.input_link);
- link->rt.input = this;
- } else {
- spa_list_insert(this->rt.links.prev, &link->rt.output_link);
- link->rt.output = this;
- }
-
- return SPA_RESULT_OK;
-}
-
-static struct pw_link *find_link(struct pw_port *output_port, struct pw_port *input_port)
-{
- struct pw_link *pl;
-
- spa_list_for_each(pl, &output_port->links, output_link) {
- if (pl->input == input_port)
- return pl;
- }
- return NULL;
-}
-
-struct pw_link *pw_port_link(struct pw_port *output_port,
- struct pw_port *input_port,
- struct spa_format *format_filter,
- struct pw_properties *properties,
- char **error)
-{
- struct pw_node *input_node, *output_node;
- struct pw_link *link;
-
- output_node = output_port->node;
- input_node = input_port->node;
-
- pw_log_debug("port link %p:%u -> %p:%u", output_node, output_port->port_id, input_node,
- input_port->port_id);
-
- if (output_node == input_node)
- goto same_node;
-
- if (!spa_list_is_empty(&input_port->links))
- goto was_linked;
-
- link = find_link(output_port, input_port);
-
- if (link == NULL) {
- input_node->live = output_node->live;
- if (output_node->clock)
- input_node->clock = output_node->clock;
- pw_log_debug("node %p: clock %p, live %d", output_node, output_node->clock,
- output_node->live);
-
- link = pw_link_new(output_node->core,
- output_port, input_port, format_filter, properties);
- if (link == NULL)
- goto no_mem;
-
- spa_list_insert(output_port->links.prev, &link->output_link);
- spa_list_insert(input_port->links.prev, &link->input_link);
-
- output_node->n_used_output_links++;
- input_node->n_used_input_links++;
-
- pw_loop_invoke(output_node->data_loop->loop,
- do_add_link,
- SPA_ID_INVALID, sizeof(struct pw_link *), &link, false, output_port);
- pw_loop_invoke(input_node->data_loop->loop,
- do_add_link,
- SPA_ID_INVALID, sizeof(struct pw_link *), &link, false, input_port);
- }
- return link;
-
- same_node:
- asprintf(error, "can't link a node to itself");
- return NULL;
- was_linked:
- asprintf(error, "input port was already linked");
- return NULL;
- no_mem:
- return NULL;
-}
-
int pw_port_pause_rt(struct pw_port *port)
{
int res;
@@ -182,63 +211,6 @@ int pw_port_pause_rt(struct pw_port *port)
}
static int
-do_remove_link(struct spa_loop *loop,
- bool async, uint32_t seq, size_t size, void *data, void *user_data)
-{
- struct pw_port *port = user_data;
- struct pw_link *link = ((struct pw_link **) data)[0];
-
- if (port->direction == PW_DIRECTION_INPUT) {
- pw_port_pause_rt(link->rt.input);
- spa_list_remove(&link->rt.input_link);
- link->rt.input = NULL;
- } else {
- pw_port_pause_rt(link->rt.output);
- spa_list_remove(&link->rt.output_link);
- link->rt.output = NULL;
- }
- return SPA_RESULT_OK;
-}
-
-int pw_port_unlink(struct pw_port *port, struct pw_link *link)
-{
- int res;
- struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
- struct pw_node *node = port->node;
-
- pw_log_debug("port %p: start unlink %p", port, link);
-
- res = pw_loop_invoke(node->data_loop->loop,
- do_remove_link, impl->seq++, sizeof(struct pw_link *), &link, true, port);
-
- if (port->state > PW_PORT_STATE_PAUSED)
- port_update_state (port, PW_PORT_STATE_PAUSED);
-
- pw_log_debug("port %p: finish unlink", port);
- if (port->direction == PW_DIRECTION_OUTPUT) {
- if (link->output) {
- spa_list_remove(&link->output_link);
- node->n_used_output_links--;
- link->output = NULL;
- }
- } else {
- if (link->input) {
- spa_list_remove(&link->input_link);
- node->n_used_input_links--;
- link->input = NULL;
- }
- }
-
- if (!port->allocated)
- pw_port_use_buffers(port, NULL, 0);
-
- if (node->n_used_output_links == 0 && node->n_used_input_links == 0)
- pw_node_update_state(node, PW_NODE_STATE_IDLE, NULL);
-
- return res;
-}
-
-static int
do_port_pause(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
diff --git a/pipewire/server/port.h b/pipewire/server/port.h
index bcd0babe..c3184db9 100644
--- a/pipewire/server/port.h
+++ b/pipewire/server/port.h
@@ -75,8 +75,10 @@ struct pw_port {
void *multiplex; /**< optional port buffer mix/split */
struct {
- struct spa_list links; /**< list of \ref pw_link only accessed from the
- * data thread */
+ struct spa_graph *graph;
+ struct spa_graph_port port;
+ struct spa_graph_port mix_port;
+ struct spa_graph_node mix_node;
} rt; /**< data only accessed from the data thread */
};
@@ -88,20 +90,6 @@ pw_port_new(struct pw_node *node, enum pw_direction direction, uint32_t port_id)
/** Destroy a port \memberof pw_port */
void pw_port_destroy(struct pw_port *port);
-/** Link two ports with an optional filter \memberof pw_port
- * \return a newly allocated \ref pw_link or NULL and \a error is set.
- *
- * If the ports were already linked, the existing link will be returned. */
-struct pw_link *
-pw_port_link(struct pw_port *output_port, /**< output port */
- struct pw_port *input_port, /**< input port */
- struct spa_format *format_filter, /**< optional filter */
- struct pw_properties *properties, /**< extra properties */
- char **error /**< result error message or NULL */);
-
-/** Unlink a port \memberof pw_port */
-int pw_port_unlink(struct pw_port *port, struct pw_link *link);
-
/** Set a format on a port \memberof pw_port */
int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format);
diff --git a/spa/include/spa/graph-scheduler1.h b/spa/include/spa/graph-scheduler1.h
new file mode 100644
index 00000000..6d12714c
--- /dev/null
+++ b/spa/include/spa/graph-scheduler1.h
@@ -0,0 +1,137 @@
+/* Simple Plugin API
+ * Copyright (C) 2017 Wim Taymans <wim.taymans@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __SPA_GRAPH_SCHEDULER_H__
+#define __SPA_GRAPH_SCHEDULER_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <spa/graph.h>
+
+struct spa_graph_scheduler {
+ struct spa_graph *graph;
+ struct spa_list pending;
+ struct spa_graph_node *node;
+};
+
+static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
+ struct spa_graph *graph)
+{
+ sched->graph = graph;
+ spa_list_init(&sched->pending);
+ sched->node = NULL;
+}
+
+static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
+{
+ int res;
+ struct spa_node *n = node->user_data;
+
+ if (node->action == SPA_GRAPH_ACTION_IN)
+ res = spa_node_process_input(n);
+ else if (node->action == SPA_GRAPH_ACTION_OUT)
+ res = spa_node_process_output(n);
+ else
+ res = SPA_RESULT_ERROR;
+
+ return res;
+}
+
+static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched)
+{
+ bool res;
+ struct spa_graph *graph = sched->graph;
+ struct spa_graph_port *p;
+ struct spa_graph_node *n;
+
+ res = !spa_list_is_empty(&graph->ready);
+ if (res) {
+ n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link);
+
+ spa_list_remove(&n->ready_link);
+ n->ready_link.next = NULL;
+
+ debug("node %p action %d state %d\n", n, n->action, n->state);
+
+ switch (n->action) {
+ case SPA_GRAPH_ACTION_IN:
+ case SPA_GRAPH_ACTION_OUT:
+ n->state = n->schedule(n);
+ debug("node %p scheduled action %d state %d\n", n, n->action, n->state);
+ if (n->action == SPA_GRAPH_ACTION_IN && n == sched->node)
+ break;
+ n->action = SPA_GRAPH_ACTION_CHECK;
+ spa_list_insert(graph->ready.prev, &n->ready_link);
+ break;
+
+ case SPA_GRAPH_ACTION_CHECK:
+ if (n->state == SPA_RESULT_NEED_BUFFER) {
+ n->ready_in = 0;
+ spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
+ struct spa_graph_node *pn = p->peer->node;
+ if (p->io->status == SPA_RESULT_NEED_BUFFER) {
+ if (pn != sched->node
+ || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
+ pn->action = SPA_GRAPH_ACTION_OUT;
+ spa_list_insert(graph->ready.prev,
+ &pn->ready_link);
+ }
+ } else if (p->io->status == SPA_RESULT_OK)
+ n->ready_in++;
+ }
+ } else if (n->state == SPA_RESULT_HAVE_BUFFER) {
+ spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
+ spa_graph_port_check(graph, p->peer);
+ }
+ break;
+
+ default:
+ break;
+ }
+ res = !spa_list_is_empty(&graph->ready);
+ }
+ return res;
+}
+
+static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
+{
+ debug("node %p start pull\n", node);
+ node->action = SPA_GRAPH_ACTION_CHECK;
+ node->state = SPA_RESULT_NEED_BUFFER;
+ sched->node = node;
+ if (node->ready_link.next == NULL)
+ spa_list_insert(sched->graph->ready.prev, &node->ready_link);
+}
+
+static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
+{
+ debug("node %p start push\n", node);
+ node->action = SPA_GRAPH_ACTION_OUT;
+ sched->node = node;
+ if (node->ready_link.next == NULL)
+ spa_list_insert(sched->graph->ready.prev, &node->ready_link);
+}
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#endif /* __SPA_GRAPH_SCHEDULER_H__ */
diff --git a/spa/include/spa/graph-scheduler2.h b/spa/include/spa/graph-scheduler2.h
new file mode 100644
index 00000000..4615bc50
--- /dev/null
+++ b/spa/include/spa/graph-scheduler2.h
@@ -0,0 +1,153 @@
+/* Simple Plugin API
+ * Copyright (C) 2017 Wim Taymans <wim.taymans@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __SPA_GRAPH_SCHEDULER_H__
+#define __SPA_GRAPH_SCHEDULER_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <spa/graph.h>
+
+static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
+{
+ int res;
+ struct spa_node *n = node->user_data;
+
+ if (node->action == SPA_GRAPH_ACTION_IN)
+ res = spa_node_process_input(n);
+ else if (node->action == SPA_GRAPH_ACTION_OUT)
+ res = spa_node_process_output(n);
+ else
+ res = SPA_RESULT_ERROR;
+
+ return res;
+}
+
+static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph)
+{
+ bool empty;
+ struct spa_graph_port *p;
+ struct spa_graph_node *n;
+ int iter = 1;
+ uint32_t action;
+
+next:
+ empty = spa_list_is_empty(&graph->ready);
+ if (empty && !spa_list_is_empty(&graph->pending)) {
+ debug("copy pending\n");
+ spa_list_insert_list(&graph->ready, &graph->pending);
+ spa_list_init(&graph->pending);
+ empty = false;
+ }
+ if (iter-- == 0 || empty)
+ return !empty;
+
+ n = spa_list_first(&graph->ready, struct spa_graph_node, ready_link);
+ spa_list_remove(&n->ready_link);
+ n->ready_link.next = NULL;
+
+ action = n->action;
+
+ debug("node %p action %d, state %d\n", n, action, n->state);
+
+ switch (action) {
+ case SPA_GRAPH_ACTION_IN:
+ case SPA_GRAPH_ACTION_OUT:
+ case SPA_GRAPH_ACTION_END:
+ if (action == SPA_GRAPH_ACTION_END)
+ n->action = SPA_GRAPH_ACTION_OUT;
+
+ n->state = n->schedule(n);
+ debug("node %p schedule %d res %d\n", n, action, n->state);
+
+ if (action == SPA_GRAPH_ACTION_IN && n == graph->node)
+ break;
+
+ if (action != SPA_GRAPH_ACTION_END) {
+ debug("node %p add ready for CHECK\n", n);
+ n->action = SPA_GRAPH_ACTION_CHECK;
+ spa_list_insert(graph->ready.prev, &n->ready_link);
+ }
+ else {
+ spa_graph_node_update(graph, n);
+ }
+ break;
+
+ case SPA_GRAPH_ACTION_CHECK:
+ if (n->state == SPA_RESULT_NEED_BUFFER) {
+ n->ready_in = 0;
+ spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
+ struct spa_graph_node *pn = p->peer->node;
+ if (p->io->status == SPA_RESULT_NEED_BUFFER) {
+ if (pn != graph->node
+ || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
+ pn->action = SPA_GRAPH_ACTION_OUT;
+ debug("node %p add ready OUT\n", n);
+ spa_list_insert(graph->ready.prev,
+ &pn->ready_link);
+ }
+ } else if (p->io->status == SPA_RESULT_OK)
+ n->ready_in++;
+ }
+ }
+ else if (n->state == SPA_RESULT_HAVE_BUFFER) {
+ spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
+ spa_graph_port_check(graph, p->peer);
+
+ debug("node %p add pending\n", n);
+ n->action = SPA_GRAPH_ACTION_END;
+ spa_list_insert(&graph->pending, &n->ready_link);
+ }
+ else if (n->state == SPA_RESULT_OK) {
+ spa_graph_node_update(graph, n);
+ }
+ break;
+
+ default:
+ break;
+ }
+ goto next;
+}
+
+static inline void spa_graph_scheduler_pull(struct spa_graph *graph, struct spa_graph_node *node)
+{
+ node->action = SPA_GRAPH_ACTION_CHECK;
+ node->state = SPA_RESULT_NEED_BUFFER;
+ graph->node = node;
+ debug("node %p start pull\n", node);
+ if (node->ready_link.next == NULL)
+ spa_list_insert(graph->ready.prev, &node->ready_link);
+}
+
+static inline void spa_graph_scheduler_push(struct spa_graph *graph, struct spa_graph_node *node)
+{
+ node->action = SPA_GRAPH_ACTION_OUT;
+ graph->node = node;
+ debug("node %p start push\n", node);
+ if (node->ready_link.next == NULL)
+ spa_list_insert(graph->ready.prev, &node->ready_link);
+}
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#endif /* __SPA_GRAPH_SCHEDULER_H__ */
diff --git a/spa/include/spa/graph-scheduler3.h b/spa/include/spa/graph-scheduler3.h
new file mode 100644
index 00000000..405c51bc
--- /dev/null
+++ b/spa/include/spa/graph-scheduler3.h
@@ -0,0 +1,173 @@
+/* Simple Plugin API
+ * Copyright (C) 2017 Wim Taymans <wim.taymans@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __SPA_GRAPH_SCHEDULER_H__
+#define __SPA_GRAPH_SCHEDULER_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <spa/graph.h>
+
+struct spa_graph_scheduler {
+ struct spa_graph *graph;
+ struct spa_list pending;
+ struct spa_graph_node *node;
+};
+
+static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
+ struct spa_graph *graph)
+{
+ sched->graph = graph;
+ spa_list_init(&sched->pending);
+ sched->node = NULL;
+}
+
+static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
+{
+ int res;
+ struct spa_node *n = node->user_data;
+
+ if (node->action == SPA_GRAPH_ACTION_IN)
+ res = spa_node_process_input(n);
+ else if (node->action == SPA_GRAPH_ACTION_OUT)
+ res = spa_node_process_output(n);
+ else
+ res = SPA_RESULT_ERROR;
+
+ return res;
+}
+
+static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
+{
+ struct spa_graph_port *p;
+ struct spa_graph_node *n, *t;
+ struct spa_list ready;
+
+ debug("node %p start pull\n", node);
+
+ spa_list_init(&ready);
+
+ node->ready_in = 0;
+ spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
+ struct spa_graph_port *pport = p->peer;
+ struct spa_graph_node *pnode = pport->node;
+ debug("node %p peer %p io %d\n", node, pnode, pport->io->status);
+ if (pport->io->status == SPA_RESULT_NEED_BUFFER) {
+ spa_list_insert(ready.prev, &pnode->ready_link);
+ }
+ else if (pport->io->status == SPA_RESULT_OK && !(pnode->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
+ node->ready_in++;
+ }
+
+ spa_list_for_each_safe(n, t, &ready, ready_link) {
+ n->action = SPA_GRAPH_ACTION_OUT;
+ n->state = n->schedule(n);
+ debug("peer %p scheduled %d %d\n", n, n->action, n->state);
+ if (n->state == SPA_RESULT_NEED_BUFFER)
+ spa_graph_scheduler_pull(sched, n);
+ else {
+ spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
+ if (p->io->status == SPA_RESULT_HAVE_BUFFER)
+ node->ready_in++;
+ }
+ }
+ spa_list_remove(&n->ready_link);
+ n->ready_link.next = NULL;
+ }
+
+ debug("node %p %d %d\n", node, node->ready_in, node->required_in);
+
+ if (node->required_in > 0 && node->ready_in == node->required_in) {
+ node->action = SPA_GRAPH_ACTION_IN;
+ node->state = node->schedule(node);
+ debug("node %p scheduled %d %d\n", node, node->action, node->state);
+ if (node->state == SPA_RESULT_HAVE_BUFFER) {
+ spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
+ if (p->io->status == SPA_RESULT_HAVE_BUFFER)
+ p->peer->node->ready_in++;
+ }
+ }
+ }
+}
+
+static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched)
+{
+ return false;
+}
+
+
+static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
+{
+ struct spa_graph_port *p;
+ struct spa_graph_node *n, *t;
+ struct spa_list ready;
+
+ debug("node %p start push\n", node);
+
+ spa_list_init(&ready);
+
+ spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
+ struct spa_graph_port *pport = p->peer;
+ struct spa_graph_node *pnode = pport->node;
+ if (pport->io->status == SPA_RESULT_HAVE_BUFFER)
+ pnode->ready_in++;
+
+ debug("node %p peer %p io %d %d %d\n", node, pnode, pport->io->status,
+ pnode->ready_in, pnode->required_in);
+
+ if (pnode->required_in > 0 && pnode->ready_in == pnode->required_in)
+ spa_list_insert(ready.prev, &pnode->ready_link);
+ }
+
+ spa_list_for_each_safe(n, t, &ready, ready_link) {
+ n->action = SPA_GRAPH_ACTION_IN;
+ n->state = n->schedule(n);
+ debug("peer %p scheduled %d %d\n", n, n->action, n->state);
+ if (n->state == SPA_RESULT_HAVE_BUFFER)
+ spa_graph_scheduler_push(sched, n);
+ else {
+ n->ready_in = 0;
+ spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
+ if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
+ node->ready_in++;
+ }
+ }
+ spa_list_remove(&n->ready_link);
+ n->ready_link.next = NULL;
+ }
+
+ node->action = SPA_GRAPH_ACTION_OUT;
+ node->state = node->schedule(node);
+ debug("node %p scheduled %d %d\n", node, node->action, node->state);
+ if (node->state == SPA_RESULT_NEED_BUFFER) {
+ node->ready_in = 0;
+ spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
+ if (p->io->status == SPA_RESULT_OK && !(n->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
+ p->peer->node->ready_in++;
+ }
+ }
+}
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#endif /* __SPA_GRAPH_SCHEDULER_H__ */
diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h
index b6555661..4a8e9749 100644
--- a/spa/include/spa/graph.h
+++ b/spa/include/spa/graph.h
@@ -26,6 +26,13 @@ extern "C" {
#include <spa/defs.h>
#include <spa/list.h>
+#include <spa/node.h>
+
+#if 0
+#define debug(...) printf(__VA_ARGS__)
+#else
+#define debug(...)
+#endif
struct spa_graph;
struct spa_graph_node;
@@ -34,7 +41,6 @@ struct spa_graph_port;
struct spa_graph {
struct spa_list nodes;
struct spa_list ready;
- struct spa_graph_node *node;
};
typedef int (*spa_graph_node_func_t) (struct spa_graph_node * node);
@@ -73,20 +79,6 @@ static inline void spa_graph_init(struct spa_graph *graph)
spa_list_init(&graph->ready);
}
-static inline int spa_graph_node_schedule_default(struct spa_graph_node *node)
-{
- int res;
- struct spa_node *n = node->user_data;
-
- if (node->action == SPA_GRAPH_ACTION_IN)
- res = spa_node_process_input(n);
- else if (node->action == SPA_GRAPH_ACTION_OUT)
- res = spa_node_process_output(n);
- else
- res = SPA_RESULT_ERROR;
- return res;
-}
-
static inline void
spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node,
spa_graph_node_func_t schedule, void *user_data)
@@ -94,12 +86,14 @@ spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node,
spa_list_init(&node->ports[SPA_DIRECTION_INPUT]);
spa_list_init(&node->ports[SPA_DIRECTION_OUTPUT]);
node->flags = 0;
- node->state = SPA_RESULT_OK;
+ node->state = SPA_RESULT_NEED_BUFFER;
node->action = SPA_GRAPH_ACTION_OUT;
node->schedule = schedule;
node->user_data = user_data;
+ node->ready_link.next = NULL;
spa_list_insert(graph->nodes.prev, &node->link);
node->max_in = node->required_in = node->ready_in = 0;
+ debug("node %p add\n", node);
}
static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_graph_port *port)
@@ -109,6 +103,8 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap
if (port->io->status == SPA_RESULT_HAVE_BUFFER)
node->ready_in++;
+ debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready_in, node->required_in);
+
if (node->required_in > 0 && node->ready_in == node->required_in) {
node->action = SPA_GRAPH_ACTION_IN;
if (node->ready_link.next == NULL)
@@ -119,6 +115,17 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap
}
}
+static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_graph_node *node) {
+ struct spa_graph_port *p;
+
+ node->ready_in = 0;
+ spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
+ if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
+ node->ready_in++;
+ }
+ debug("node %p update %d ready\n", node, node->ready_in);
+}
+
static inline void
spa_graph_port_add(struct spa_graph *graph,
struct spa_graph_node *node,
@@ -128,13 +135,13 @@ spa_graph_port_add(struct spa_graph *graph,
uint32_t flags,
struct spa_port_io *io)
{
+ debug("port %p add %d to node %p \n", port, direction, node);
port->node = node;
port->direction = direction;
port->port_id = port_id;
port->flags = flags;
port->io = io;
- port->peer = NULL;
- spa_list_insert(node->ports[port->direction].prev, &port->link);
+ spa_list_insert(node->ports[direction].prev, &port->link);
node->max_in++;
if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT)
node->required_in++;
@@ -143,96 +150,36 @@ spa_graph_port_add(struct spa_graph *graph,
static inline void spa_graph_node_remove(struct spa_graph *graph, struct spa_graph_node *node)
{
+ debug("node %p remove\n", node);
spa_list_remove(&node->link);
+ if (node->ready_link.next)
+ spa_list_remove(&node->ready_link);
}
static inline void spa_graph_port_remove(struct spa_graph *graph, struct spa_graph_port *port)
{
+ debug("port %p remove\n", port);
spa_list_remove(&port->link);
+ if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && port->direction == SPA_DIRECTION_INPUT)
+ port->node->required_in--;
}
static inline void
spa_graph_port_link(struct spa_graph *graph, struct spa_graph_port *out, struct spa_graph_port *in)
{
+ debug("port %p link to %p \n", out, in);
out->peer = in;
in->peer = out;
}
static inline void
-spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *out,
- struct spa_graph_port *in)
+spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *port)
{
- out->peer = NULL;
- in->peer = NULL;
-}
-
-static inline bool spa_graph_node_iterate(struct spa_graph *graph)
-{
- bool res;
- struct spa_graph_port *p;
-
- res = !spa_list_is_empty(&graph->ready);
- if (res) {
- struct spa_graph_node *n =
- spa_list_first(&graph->ready, struct spa_graph_node, ready_link);
-
- spa_list_remove(&n->ready_link);
- n->ready_link.next = NULL;
-
- switch (n->action) {
- case SPA_GRAPH_ACTION_IN:
- case SPA_GRAPH_ACTION_OUT:
- n->state = n->schedule(n);
- if (n->action == SPA_GRAPH_ACTION_IN && n == graph->node)
- break;
- n->action = SPA_GRAPH_ACTION_CHECK;
- spa_list_insert(graph->ready.prev, &n->ready_link);
- break;
-
- case SPA_GRAPH_ACTION_CHECK:
- if (n->state == SPA_RESULT_NEED_BUFFER) {
- n->ready_in = 0;
- spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
- struct spa_graph_node *pn = p->peer->node;
- if (p->io->status == SPA_RESULT_NEED_BUFFER) {
- if (pn != graph->node
- || pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
- pn->action = SPA_GRAPH_ACTION_OUT;
- spa_list_insert(graph->ready.prev,
- &pn->ready_link);
- }
- } else if (p->io->status == SPA_RESULT_OK)
- n->ready_in++;
- }
- } else if (n->state == SPA_RESULT_HAVE_BUFFER) {
- spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
- spa_graph_port_check(graph, p->peer);
- }
- break;
-
- default:
- break;
- }
- res = !spa_list_is_empty(&graph->ready);
+ debug("port %p unlink from %p \n", port, port->peer);
+ if (port->peer) {
+ port->peer->peer = NULL;
+ port->peer = NULL;
}
- return res;
-}
-
-static inline void spa_graph_node_pull(struct spa_graph *graph, struct spa_graph_node *node)
-{
- node->action = SPA_GRAPH_ACTION_CHECK;
- node->state = SPA_RESULT_NEED_BUFFER;
- graph->node = node;
- if (node->ready_link.next == NULL)
- spa_list_insert(graph->ready.prev, &node->ready_link);
-}
-
-static inline void spa_graph_node_push(struct spa_graph *graph, struct spa_graph_node *node)
-{
- node->action = SPA_GRAPH_ACTION_OUT;
- graph->node = node;
- if (node->ready_link.next == NULL)
- spa_list_insert(graph->ready.prev, &node->ready_link);
}
#ifdef __cplusplus
diff --git a/spa/include/spa/list.h b/spa/include/spa/list.h
index 4e6c7023..492bb484 100644
--- a/spa/include/spa/list.h
+++ b/spa/include/spa/list.h
@@ -45,6 +45,14 @@ static inline void spa_list_insert(struct spa_list *list, struct spa_list *elem)
elem->next->prev = elem;
}
+static inline void spa_list_insert_list(struct spa_list *list, struct spa_list *other)
+{
+ other->next->prev = list;
+ other->prev->next = list->next;
+ list->next->prev = other->prev;
+ list->next = other->next;
+}
+
static inline void spa_list_remove(struct spa_list *elem)
{
elem->prev->next = elem->next;
diff --git a/spa/tests/test-graph.c b/spa/tests/test-graph.c
index 404ea0eb..f215da86 100644
--- a/spa/tests/test-graph.c
+++ b/spa/tests/test-graph.c
@@ -36,6 +36,7 @@
#include <spa/format-utils.h>
#include <spa/format-builder.h>
#include <spa/graph.h>
+#include <spa/graph-scheduler1.h>
static SPA_TYPE_MAP_IMPL(default_map, 4096);
static SPA_LOG_IMPL(default_log);
@@ -97,6 +98,7 @@ struct data {
uint32_t n_support;
struct spa_graph graph;
+ struct spa_graph_scheduler sched;
struct spa_graph_node source_node;
struct spa_graph_port source_out;
struct spa_graph_port volume_in;
@@ -229,9 +231,9 @@ static void on_sink_need_input(struct spa_node *node, void *user_data)
{
struct data *data = user_data;
- spa_graph_node_pull(&data->graph, &data->sink_node);
+ spa_graph_scheduler_pull(&data->sched, &data->sink_node);
- while (spa_graph_node_iterate(&data->graph));
+ while (spa_graph_scheduler_iterate(&data->sched));
}
static void
@@ -338,12 +340,12 @@ static int make_nodes(struct data *data, const char *device)
spa_node_port_set_io(data->volume, SPA_DIRECTION_OUTPUT, 0, &data->volume_sink_io[0]);
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]);
- spa_graph_node_add(&data->graph, &data->source_node, spa_graph_node_schedule_default,
+ spa_graph_node_add(&data->graph, &data->source_node, spa_graph_scheduler_default,
data->source);
spa_graph_port_add(&data->graph, &data->source_node, &data->source_out,
SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]);
- spa_graph_node_add(&data->graph, &data->volume_node, spa_graph_node_schedule_default,
+ spa_graph_node_add(&data->graph, &data->volume_node, spa_graph_scheduler_default,
data->volume);
spa_graph_port_add(&data->graph, &data->volume_node, &data->volume_in, SPA_DIRECTION_INPUT,
0, 0, &data->source_volume_io[0]);
@@ -353,7 +355,7 @@ static int make_nodes(struct data *data, const char *device)
spa_graph_port_add(&data->graph, &data->volume_node,
&data->volume_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->volume_sink_io[0]);
- spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default,
+ spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default,
data->sink);
spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0,
0, &data->volume_sink_io[0]);
@@ -528,6 +530,7 @@ int main(int argc, char *argv[])
spa_graph_init(&data.graph);
+ spa_graph_scheduler_init(&data.sched, &data.graph);
data.map = &default_map.map;
data.log = &default_log.log;
diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c
index 99b614f0..136ec4a6 100644
--- a/spa/tests/test-mixer.c
+++ b/spa/tests/test-mixer.c
@@ -31,6 +31,7 @@
#include <spa/log-impl.h>
#include <spa/loop.h>
#include <spa/graph.h>
+#include <spa/graph-scheduler1.h>
#include <spa/type-map.h>
#include <spa/type-map-impl.h>
#include <spa/audio/format-utils.h>
@@ -99,6 +100,7 @@ struct data {
uint32_t n_support;
struct spa_graph graph;
+ struct spa_graph_scheduler sched;
struct spa_graph_node source1_node;
struct spa_graph_port source1_out;
struct spa_graph_node source2_node;
@@ -240,8 +242,8 @@ static void on_sink_need_input(struct spa_node *node, void *user_data)
{
struct data *data = user_data;
#ifdef USE_GRAPH
- spa_graph_node_pull(&data->graph, &data->sink_node);
- while (spa_graph_node_iterate(&data->graph));
+ spa_graph_scheduler_pull(&data->sched, &data->sink_node);
+ while (spa_graph_scheduler_iterate(&data->sched));
#else
int res;
@@ -416,17 +418,17 @@ static int make_nodes(struct data *data, const char *device)
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]);
#ifdef USE_GRAPH
- spa_graph_node_add(&data->graph, &data->source1_node, spa_graph_node_schedule_default,
+ spa_graph_node_add(&data->graph, &data->source1_node, spa_graph_scheduler_default,
data->source1);
spa_graph_port_add(&data->graph, &data->source1_node, &data->source1_out,
SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]);
- spa_graph_node_add(&data->graph, &data->source2_node, spa_graph_node_schedule_default,
+ spa_graph_node_add(&data->graph, &data->source2_node, spa_graph_scheduler_default,
data->source2);
spa_graph_port_add(&data->graph, &data->source2_node, &data->source2_out,
SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]);
- spa_graph_node_add(&data->graph, &data->mix_node, spa_graph_node_schedule_default,
+ spa_graph_node_add(&data->graph, &data->mix_node, spa_graph_scheduler_default,
data->mix);
spa_graph_port_add(&data->graph, &data->mix_node, &data->mix_in[0], SPA_DIRECTION_INPUT,
data->mix_ports[0], 0, &data->source1_mix_io[0]);
@@ -439,7 +441,7 @@ static int make_nodes(struct data *data, const char *device)
spa_graph_port_add(&data->graph, &data->mix_node,
&data->mix_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->mix_sink_io[0]);
- spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default,
+ spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default,
data->sink);
spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0,
0, &data->mix_sink_io[0]);
@@ -646,6 +648,7 @@ int main(int argc, char *argv[])
data.data_loop.invoke = do_invoke;
spa_graph_init(&data.graph);
+ spa_graph_scheduler_init(&data.sched, &data.graph);
if ((str = getenv("SPA_DEBUG")))
data.log->level = atoi(str);
diff --git a/spa/tests/test-perf.c b/spa/tests/test-perf.c
index eb22eff7..e2b0ee02 100644
--- a/spa/tests/test-perf.c
+++ b/spa/tests/test-perf.c
@@ -34,6 +34,7 @@
#include <spa/format-utils.h>
#include <spa/format-builder.h>
#include <spa/graph.h>
+#include <spa/graph-scheduler1.h>
#define MODE_SYNC_PUSH (1<<0)
#define MODE_SYNC_PULL (1<<1)
@@ -102,6 +103,7 @@ struct data {
int iterations;
struct spa_graph graph;
+ struct spa_graph_scheduler sched;
struct spa_graph_node source_node;
struct spa_graph_port source_out;
struct spa_graph_port sink_in;
@@ -223,8 +225,8 @@ static void on_sink_pull(struct data *data)
spa_node_process_output(data->source);
spa_node_process_input(data->sink);
} else {
- spa_graph_node_pull(&data->graph, &data->sink_node);
- while (spa_graph_node_iterate(&data->graph));
+ spa_graph_scheduler_pull(&data->sched, &data->sink_node);
+ while (spa_graph_scheduler_iterate(&data->sched));
}
}
@@ -235,8 +237,8 @@ static void on_source_push(struct data *data)
spa_node_process_output(data->source);
spa_node_process_input(data->sink);
} else {
- spa_graph_node_push(&data->graph, &data->source_node);
- while (spa_graph_node_iterate(&data->graph));
+ spa_graph_scheduler_push(&data->sched, &data->source_node);
+ while (spa_graph_scheduler_iterate(&data->sched));
}
}
@@ -365,12 +367,12 @@ static int make_nodes(struct data *data)
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->source_sink_io[0]);
spa_graph_node_add(&data->graph, &data->source_node,
- spa_graph_node_schedule_default, data->source);
+ spa_graph_scheduler_default, data->source);
data->source_node.flags = (data->mode & MODE_ASYNC_PUSH) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0;
spa_graph_port_add(&data->graph, &data->source_node,
&data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_sink_io[0]);
- spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_node_schedule_default,
+ spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default,
data->sink);
data->sink_node.flags = (data->mode & MODE_ASYNC_PULL) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0;
spa_graph_port_add(&data->graph, &data->sink_node,
@@ -526,6 +528,7 @@ int main(int argc, char *argv[])
const char *str;
spa_graph_init(&data.graph);
+ spa_graph_scheduler_init(&data.sched, &data.graph);
data.map = &default_map.map;
data.log = &default_log.log;