summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2019-07-26 15:53:26 +0200
committerWim Taymans <wtaymans@redhat.com>2019-07-26 16:17:39 +0200
commite76a7abceb7389365f0789d1f8853fd9b52fb5be (patch)
tree85c2d8c866332376681f31e8b08c4993b7d16b42
parentdeb6c52f76bf7598437ac69a8b8392adcb8ecf0d (diff)
Add support for client fd memory
Remove the node buffers reply again. We don't need it. Instead add a new method to the client-node to upload an array of buffer datas. This method is called after the client has allocated buffer mem. It will update the buffers on the server side with the client allocated memory. Wait for the async reply of use_buffers when doing alloc_buffers so that we can get the updated buffer mem before we continue. Let the link follow the states of the ports. Add some error code to the port error states. Add PW_STREAM_FLAG_ALLOC_BUFFERS flag to make the client alloc buffer memory.
-rw-r--r--spa/examples/example-control.c10
-rw-r--r--spa/examples/local-v4l2.c5
-rw-r--r--spa/include/spa/node/node.h13
-rw-r--r--spa/plugins/v4l2/v4l2-utils.c3
-rw-r--r--src/examples/export-sink.c3
-rw-r--r--src/examples/export-source.c3
-rw-r--r--src/examples/local-v4l2.c3
-rw-r--r--src/extensions/client-node.h22
-rw-r--r--src/modules/module-client-node/client-node.c55
-rw-r--r--src/modules/module-client-node/protocol-native.c109
-rw-r--r--src/modules/module-client-node/remote-node.c12
-rw-r--r--src/pipewire/link.c177
-rw-r--r--src/pipewire/node.c2
-rw-r--r--src/pipewire/port.c81
-rw-r--r--src/pipewire/port.h3
-rw-r--r--src/pipewire/private.h10
-rw-r--r--src/pipewire/stream.c4
-rw-r--r--src/pipewire/stream.h3
18 files changed, 373 insertions, 145 deletions
diff --git a/spa/examples/example-control.c b/spa/examples/example-control.c
index 1a118bfb..8132a978 100644
--- a/spa/examples/example-control.c
+++ b/spa/examples/example-control.c
@@ -394,12 +394,14 @@ static int negotiate_formats(struct data *data)
init_buffer(data, data->source_buffers, data->source_buffer, 1, BUFFER_SIZE);
if ((res =
- spa_node_port_use_buffers(data->sink, SPA_DIRECTION_INPUT, 0, 0, data->source_buffers,
- 1)) < 0)
+ spa_node_port_use_buffers(data->sink,
+ SPA_DIRECTION_INPUT, 0, 0,
+ data->source_buffers, 1)) < 0)
return res;
if ((res =
- spa_node_port_use_buffers(data->source, SPA_DIRECTION_OUTPUT, 0, 0, data->source_buffers,
- 1)) < 0)
+ spa_node_port_use_buffers(data->source,
+ SPA_DIRECTION_OUTPUT, 0, 0,
+ data->source_buffers, 1)) < 0)
return res;
return 0;
diff --git a/spa/examples/local-v4l2.c b/spa/examples/local-v4l2.c
index d123e2a1..2d21833f 100644
--- a/spa/examples/local-v4l2.c
+++ b/spa/examples/local-v4l2.c
@@ -390,8 +390,9 @@ static int negotiate_formats(struct data *data)
if ((res = sdl_alloc_buffers(data)) < 0)
return res;
- if ((res = spa_node_port_use_buffers(data->source, SPA_DIRECTION_OUTPUT, 0, 0, data->bp,
- data->n_buffers)) < 0) {
+ if ((res = spa_node_port_use_buffers(data->source,
+ SPA_DIRECTION_OUTPUT, 0, 0,
+ data->bp, data->n_buffers)) < 0) {
printf("can't allocate buffers: %s\n", spa_strerror(res));
return -1;
}
diff --git a/spa/include/spa/node/node.h b/spa/include/spa/node/node.h
index c642474f..61b6a4b7 100644
--- a/spa/include/spa/node/node.h
+++ b/spa/include/spa/node/node.h
@@ -113,7 +113,6 @@ struct spa_port_info {
#define SPA_RESULT_TYPE_NODE_ERROR 1
#define SPA_RESULT_TYPE_NODE_PARAMS 2
-#define SPA_RESULT_TYPE_NODE_BUFFERS 3
/** an error result */
struct spa_result_node_error {
@@ -128,12 +127,6 @@ struct spa_result_node_params {
struct spa_pod *param; /**< the result param */
};
-/** the result of use_buffers. */
-struct spa_result_node_buffers {
- uint32_t n_buffers;
- struct spa_buffer **buffers;
-};
-
#define SPA_NODE_EVENT_INFO 0
#define SPA_NODE_EVENT_PORT_INFO 1
#define SPA_NODE_EVENT_RESULT 2
@@ -536,13 +529,13 @@ struct spa_node_methods {
* Passing NULL as \a buffers will remove the reference that the port has
* on the buffers.
*
- * The function will emit the result event of type SPA_RESULT_TYPE_NODE_BUFFERS
- * with the final allocation of the buffers.
+ * When this function returns async, use the spa_node_sync operation to
+ * wait for completion.
*
* This function must be called from the main thread.
*
* \param object an object implementing the interface
- * \param direction an spa_direction
+ * \param direction a port direction
* \param port_id a port id
* \param flags extra flags
* \param buffers an array of buffer pointers
diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c
index 46c402ed..26aebc6e 100644
--- a/spa/plugins/v4l2/v4l2-utils.c
+++ b/spa/plugins/v4l2/v4l2-utils.c
@@ -1338,8 +1338,7 @@ static int spa_v4l2_use_buffers(struct impl *this, struct spa_buffer **buffers,
static int
mmap_init(struct impl *this,
- struct spa_buffer **buffers,
- uint32_t n_buffers)
+ struct spa_buffer **buffers, uint32_t n_buffers)
{
struct port *port = &this->out_ports[0];
struct spa_v4l2_device *dev = &port->dev;
diff --git a/src/examples/export-sink.c b/src/examples/export-sink.c
index 82815b43..e91d67db 100644
--- a/src/examples/export-sink.c
+++ b/src/examples/export-sink.c
@@ -339,7 +339,8 @@ static int impl_port_set_param(void *object,
return -ENOENT;
}
-static int impl_port_use_buffers(void *object, enum spa_direction direction, uint32_t port_id,
+static int impl_port_use_buffers(void *object,
+ enum spa_direction direction, uint32_t port_id,
uint32_t flags,
struct spa_buffer **buffers, uint32_t n_buffers)
{
diff --git a/src/examples/export-source.c b/src/examples/export-source.c
index a69babdf..6c7269c4 100644
--- a/src/examples/export-source.c
+++ b/src/examples/export-source.c
@@ -303,7 +303,8 @@ static int impl_port_set_param(void *object,
return -ENOENT;
}
-static int impl_port_use_buffers(void *object, enum spa_direction direction, uint32_t port_id,
+static int impl_port_use_buffers(void *object,
+ enum spa_direction direction, uint32_t port_id,
uint32_t flags,
struct spa_buffer **buffers, uint32_t n_buffers)
{
diff --git a/src/examples/local-v4l2.c b/src/examples/local-v4l2.c
index 1fd4161c..e020c872 100644
--- a/src/examples/local-v4l2.c
+++ b/src/examples/local-v4l2.c
@@ -242,7 +242,8 @@ static int impl_port_set_param(void *object,
return -ENOENT;
}
-static int impl_port_use_buffers(void *object, enum spa_direction direction, uint32_t port_id,
+static int impl_port_use_buffers(void *object,
+ enum spa_direction direction, uint32_t port_id,
uint32_t flags, struct spa_buffer **buffers, uint32_t n_buffers)
{
struct data *d = object;
diff --git a/src/extensions/client-node.h b/src/extensions/client-node.h
index ec0227fc..0857a666 100644
--- a/src/extensions/client-node.h
+++ b/src/extensions/client-node.h
@@ -208,7 +208,8 @@ struct pw_client_node_proxy_events {
#define PW_CLIENT_NODE_PROXY_METHOD_PORT_UPDATE 3
#define PW_CLIENT_NODE_PROXY_METHOD_SET_ACTIVE 4
#define PW_CLIENT_NODE_PROXY_METHOD_EVENT 5
-#define PW_CLIENT_NODE_PROXY_METHOD_NUM 6
+#define PW_CLIENT_NODE_PROXY_METHOD_PORT_BUFFERS 6
+#define PW_CLIENT_NODE_PROXY_METHOD_NUM 7
/** \ref pw_client_node methods */
struct pw_client_node_proxy_methods {
@@ -269,6 +270,16 @@ struct pw_client_node_proxy_methods {
* \param event the event to send
*/
int (*event) (void *object, const struct spa_event *event);
+
+ /**
+ * Send allocated buffers
+ */
+ int (*port_buffers) (void *object,
+ enum spa_direction direction,
+ uint32_t port_id,
+ uint32_t mix_id,
+ uint32_t n_buffers,
+ struct spa_buffer **buffers);
};
@@ -294,10 +305,11 @@ pw_client_node_proxy_get_node(struct pw_client_node_proxy *p, uint32_t version,
return res;
}
-#define pw_client_node_proxy_update(c,...) pw_client_node_proxy_method(c,update,0,__VA_ARGS__)
-#define pw_client_node_proxy_port_update(c,...) pw_client_node_proxy_method(c,port_update,0,__VA_ARGS__)
-#define pw_client_node_proxy_set_active(c,...) pw_client_node_proxy_method(c,set_active,0,__VA_ARGS__)
-#define pw_client_node_proxy_event(c,...) pw_client_node_proxy_method(c,event,0,__VA_ARGS__)
+#define pw_client_node_proxy_update(c,...) pw_client_node_proxy_method(c,update,0,__VA_ARGS__)
+#define pw_client_node_proxy_port_update(c,...) pw_client_node_proxy_method(c,port_update,0,__VA_ARGS__)
+#define pw_client_node_proxy_set_active(c,...) pw_client_node_proxy_method(c,set_active,0,__VA_ARGS__)
+#define pw_client_node_proxy_event(c,...) pw_client_node_proxy_method(c,event,0,__VA_ARGS__)
+#define pw_client_node_proxy_port_buffers(c,...) pw_client_node_proxy_method(c,port_buffers,0,__VA_ARGS__)
#ifdef __cplusplus
} /* extern "C" */
diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c
index ecbdec47..551d703a 100644
--- a/src/modules/module-client-node/client-node.c
+++ b/src/modules/module-client-node/client-node.c
@@ -786,6 +786,9 @@ do_port_use_buffers(struct impl *impl,
memcpy(&b->buffer.datas[j], d, sizeof(struct spa_data));
+ if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC)
+ continue;
+
if (d->type == SPA_DATA_DmaBuf ||
d->type == SPA_DATA_MemFd) {
uint32_t flags = PW_MEMBLOCK_FLAG_DONT_CLOSE;
@@ -974,6 +977,57 @@ static int client_node_event(void *data, const struct spa_event *event)
return 0;
}
+static int client_node_port_buffers(void *data,
+ enum spa_direction direction,
+ uint32_t port_id,
+ uint32_t mix_id,
+ uint32_t n_buffers,
+ struct spa_buffer **buffers)
+{
+ struct impl *impl = data;
+ struct node *this = &impl->node;
+ struct port *p;
+ struct mix *mix;
+ uint32_t i, j;
+
+ spa_log_debug(this->log, NAME " %p: %s port %d.%d buffers %p %u", impl,
+ direction == SPA_DIRECTION_INPUT ? "input" : "output",
+ port_id, mix_id, buffers, n_buffers);
+
+ spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
+
+ p = GET_PORT(this, direction, port_id);
+ if (!p->have_format)
+ return -EIO;
+
+ if ((mix = find_mix(p, mix_id)) == NULL || !mix->valid)
+ return -EINVAL;
+
+
+ for (i = 0; i < n_buffers; i++) {
+ struct spa_buffer *oldbuf, *newbuf;
+
+ oldbuf = mix->buffers[i].outbuf;
+ newbuf = buffers[i];
+
+ spa_log_debug(this->log, "buffer %d n_datas:%d", i, newbuf->n_datas);
+
+ if (oldbuf->n_datas != newbuf->n_datas)
+ return -EINVAL;
+
+ for (j = 0; j < newbuf->n_datas; j++) {
+ oldbuf->datas[j] = newbuf->datas[j];
+
+ spa_log_debug(this->log, " data %d type:%d fd:%d", j,
+ newbuf->datas[j].type,
+ (int) newbuf->datas[j].fd);
+ }
+ }
+ mix->n_buffers = n_buffers;
+
+ return 0;
+}
+
static struct pw_client_node_proxy_methods client_node_methods = {
PW_VERSION_CLIENT_NODE_PROXY_METHODS,
.get_node = client_node_get_node,
@@ -981,6 +1035,7 @@ static struct pw_client_node_proxy_methods client_node_methods = {
.port_update = client_node_port_update,
.set_active = client_node_set_active,
.event = client_node_event,
+ .port_buffers = client_node_port_buffers,
};
static void node_on_data_fd_events(struct spa_source *source)
diff --git a/src/modules/module-client-node/protocol-native.c b/src/modules/module-client-node/protocol-native.c
index b90264de..c511a8f1 100644
--- a/src/modules/module-client-node/protocol-native.c
+++ b/src/modules/module-client-node/protocol-native.c
@@ -262,6 +262,49 @@ static int client_node_marshal_event_method(void *object, const struct spa_event
return pw_protocol_native_end_proxy(proxy, b);
}
+static int
+client_node_marshal_port_buffers(void *object,
+ enum spa_direction direction,
+ uint32_t port_id,
+ uint32_t mix_id,
+ uint32_t n_buffers,
+ struct spa_buffer **buffers)
+{
+ struct pw_proxy *proxy = object;
+ struct spa_pod_builder *b;
+ struct spa_pod_frame f[2];
+ uint32_t i, j;
+
+ b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_PROXY_METHOD_PORT_BUFFERS, NULL);
+
+ spa_pod_builder_push_struct(b, &f[0]);
+ spa_pod_builder_add(b,
+ SPA_POD_Int(direction),
+ SPA_POD_Int(port_id),
+ SPA_POD_Int(mix_id),
+ SPA_POD_Int(n_buffers), NULL);
+
+ for (i = 0; i < n_buffers; i++) {
+ struct spa_buffer *buf = buffers[i];
+
+ spa_pod_builder_add(b,
+ SPA_POD_Int(buf->n_datas), NULL);
+
+ for (j = 0; j < buf->n_datas; j++) {
+ struct spa_data *d = &buf->datas[j];
+ spa_pod_builder_add(b,
+ SPA_POD_Id(d->type),
+ SPA_POD_Int(pw_protocol_native_add_proxy_fd(proxy, d->fd)),
+ SPA_POD_Int(d->flags),
+ SPA_POD_Int(d->mapoffset),
+ SPA_POD_Int(d->maxsize), NULL);
+ }
+ }
+ spa_pod_builder_pop(b, &f[0]);
+
+ return pw_protocol_native_end_proxy(proxy, b);
+}
+
static int client_node_demarshal_transport(void *object, const struct pw_protocol_native_message *msg)
{
struct pw_proxy *proxy = object;
@@ -687,11 +730,11 @@ client_node_marshal_port_use_buffers(void *object,
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
- SPA_POD_Int(direction),
- SPA_POD_Int(port_id),
- SPA_POD_Int(mix_id),
- SPA_POD_Int(flags),
- SPA_POD_Int(n_buffers), NULL);
+ SPA_POD_Int(direction),
+ SPA_POD_Int(port_id),
+ SPA_POD_Int(mix_id),
+ SPA_POD_Int(flags),
+ SPA_POD_Int(n_buffers), NULL);
for (i = 0; i < n_buffers; i++) {
struct spa_buffer *buf = buffers[i].buffer;
@@ -997,6 +1040,56 @@ static int client_node_demarshal_event_method(void *object, const struct pw_prot
return 0;
}
+static int client_node_demarshal_port_buffers(void *object, const struct pw_protocol_native_message *msg)
+{
+ struct pw_resource *resource = object;
+ struct spa_pod_parser prs;
+ struct spa_pod_frame f;
+ uint32_t i, j, direction, port_id, mix_id, n_buffers, data_id;
+ struct spa_buffer **buffers = NULL;
+
+ spa_pod_parser_init(&prs, msg->data, msg->size);
+ if (spa_pod_parser_push_struct(&prs, &f) < 0 ||
+ spa_pod_parser_get(&prs,
+ SPA_POD_Int(&direction),
+ SPA_POD_Int(&port_id),
+ SPA_POD_Int(&mix_id),
+ SPA_POD_Int(&n_buffers), NULL) < 0)
+ return -EINVAL;
+
+ buffers = alloca(sizeof(struct spa_buffer*) * n_buffers);
+ for (i = 0; i < n_buffers; i++) {
+ struct spa_buffer *buf = buffers[i] = alloca(sizeof(struct spa_buffer));
+
+ buf->n_metas = 0;
+ buf->metas = NULL;
+
+ if (spa_pod_parser_get(&prs,
+ SPA_POD_Int(&buf->n_datas), NULL) < 0)
+ return -EINVAL;
+
+ buf->datas = alloca(sizeof(struct spa_data) * buf->n_datas);
+ for (j = 0; j < buf->n_datas; j++) {
+ struct spa_data *d = &buf->datas[j];
+
+ if (spa_pod_parser_get(&prs,
+ SPA_POD_Id(&d->type),
+ SPA_POD_Int(&data_id),
+ SPA_POD_Int(&d->flags),
+ SPA_POD_Int(&d->mapoffset),
+ SPA_POD_Int(&d->maxsize), NULL) < 0)
+ return -EINVAL;
+
+ d->fd = pw_protocol_native_get_resource_fd(resource, data_id);
+ }
+ }
+
+ pw_resource_notify(resource, struct pw_client_node_proxy_methods, port_buffers, 0,
+ direction, port_id, mix_id, n_buffers, buffers);
+
+ return 0;
+}
+
static const struct pw_client_node_proxy_methods pw_protocol_native_client_node_method_marshal = {
PW_VERSION_CLIENT_NODE_PROXY_METHODS,
.add_listener = &client_node_marshal_add_listener,
@@ -1004,7 +1097,8 @@ static const struct pw_client_node_proxy_methods pw_protocol_native_client_node_
.update = &client_node_marshal_update,
.port_update = &client_node_marshal_port_update,
.set_active = &client_node_marshal_set_active,
- .event = &client_node_marshal_event_method
+ .event = &client_node_marshal_event_method,
+ .port_buffers = &client_node_marshal_port_buffers
};
static const struct pw_protocol_native_demarshal
@@ -1015,7 +1109,8 @@ pw_protocol_native_client_node_method_demarshal[PW_CLIENT_NODE_PROXY_METHOD_NUM]
[PW_CLIENT_NODE_PROXY_METHOD_UPDATE] = { &client_node_demarshal_update, 0 },
[PW_CLIENT_NODE_PROXY_METHOD_PORT_UPDATE] = { &client_node_demarshal_port_update, 0 },
[PW_CLIENT_NODE_PROXY_METHOD_SET_ACTIVE] = { &client_node_demarshal_set_active, 0 },
- [PW_CLIENT_NODE_PROXY_METHOD_EVENT] = { &client_node_demarshal_event_method, 0 }
+ [PW_CLIENT_NODE_PROXY_METHOD_EVENT] = { &client_node_demarshal_event_method, 0 },
+ [PW_CLIENT_NODE_PROXY_METHOD_PORT_BUFFERS] = { &client_node_demarshal_port_buffers, 0 }
};
static const struct pw_client_node_proxy_events pw_protocol_native_client_node_event_marshal = {
diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c
index a06f3034..f1920ef8 100644
--- a/src/modules/module-client-node/remote-node.c
+++ b/src/modules/module-client-node/remote-node.c
@@ -359,8 +359,7 @@ static int add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32_
pi.flags = port->spa_flags;
pi.rate = SPA_FRACTION(0, 1);
pi.props = &port->properties->dict;
- SPA_FLAG_UNSET(pi.flags,
- SPA_PORT_FLAG_CAN_ALLOC_BUFFERS | SPA_PORT_FLAG_DYNAMIC_DATA);
+ SPA_FLAG_UNSET(pi.flags, SPA_PORT_FLAG_DYNAMIC_DATA);
pi.n_params = port->info.n_params;
pi.params = port->info.params;
}
@@ -667,9 +666,15 @@ client_node_port_use_buffers(void *object,
bufs[i] = b;
}
- if ((res = pw_port_use_buffers(mix->port, mix->mix_id, flags, bufs, n_buffers)) < 0)
+ if ((res = pw_port_use_buffers(mix->port, mix_id, flags, bufs, n_buffers)) < 0)
goto error_exit_cleanup;
+ if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) {
+ pw_client_node_proxy_port_buffers(data->node_proxy,
+ direction, port_id, mix_id,
+ n_buffers,
+ bufs);
+ }
return res;
error_exit_cleanup:
@@ -968,7 +973,6 @@ static void node_active_changed(void *data, bool active)
pw_client_node_proxy_set_active(d->node_proxy, active);
}
-
static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS,
.destroy = node_destroy,
diff --git a/src/pipewire/link.c b/src/pipewire/link.c
index 657b0b15..90fa86bd 100644
--- a/src/pipewire/link.c
+++ b/src/pipewire/link.c
@@ -100,11 +100,23 @@ static void debug_link(struct pw_link *link)
in->idle_used_output_links);
}
+static void info_changed(struct pw_link *link)
+{
+ struct pw_resource *resource;
+
+ pw_link_emit_info_changed(link, &link->info);
+
+ if (link->global)
+ spa_list_for_each(resource, &link->global->resource_list, link)
+ pw_link_resource_info(resource, &link->info);
+
+ link->info.change_mask = 0;
+}
+
static void pw_link_update_state(struct pw_link *link, enum pw_link_state state, char *error)
{
enum pw_link_state old = link->info.state;
struct pw_node *in = link->input->node, *out = link->output->node;
- struct pw_resource *resource;
if (state == old)
return;
@@ -124,13 +136,7 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state,
pw_link_emit_state_changed(link, old, state, error);
link->info.change_mask |= PW_LINK_CHANGE_MASK_STATE;
- pw_link_emit_info_changed(link, &link->info);
-
- if (link->global)
- spa_list_for_each(resource, &link->global->resource_list, link)
- pw_link_resource_info(resource, &link->info);
-
- link->info.change_mask = 0;
+ info_changed(link);
debug_link(link);
@@ -159,16 +165,13 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id)
struct pw_port_mix *mix = obj == this->input->node ? &this->rt.in_mix : &this->rt.out_mix;
struct pw_port *port = mix->p;
+ pw_log_debug("port %p: complete READY: %s", port, spa_strerror(res));
+
if (SPA_RESULT_IS_OK(res)) {
- pw_port_update_state(port, PW_PORT_STATE_READY);
- pw_log_debug("port %p: state READY", port);
+ pw_port_update_state(port, PW_PORT_STATE_READY, NULL);
} else {
- pw_port_update_state(port, PW_PORT_STATE_ERROR);
- pw_log_warn("port %p: failed to go to READY", port);
+ pw_port_update_state(port, PW_PORT_STATE_ERROR, NULL);
}
- if (this->input->state >= PW_PORT_STATE_READY &&
- this->output->state >= PW_PORT_STATE_READY)
- pw_link_update_state(this, PW_LINK_STATE_ALLOCATING, NULL);
}
static void complete_paused(void *obj, void *data, int res, uint32_t id)
@@ -177,18 +180,13 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id)
struct pw_port_mix *mix = obj == this->input->node ? &this->rt.in_mix : &this->rt.out_mix;
struct pw_port *port = mix->p;
+ pw_log_debug("port %p: complete PAUSED: %s", port, spa_strerror(res));
+
if (SPA_RESULT_IS_OK(res)) {
- pw_port_update_state(port, PW_PORT_STATE_PAUSED);
- mix->have_buffers = true;
- pw_log_debug("port %p: state PAUSED", port);
+ pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL);
} else {
- pw_port_update_state(port, PW_PORT_STATE_ERROR);
- mix->have_buffers = false;
- pw_log_warn("port %p: failed to go to PAUSED", port);
+ pw_port_update_state(port, PW_PORT_STATE_ERROR, NULL);
}
- if (this->rt.in_mix.have_buffers && this->rt.out_mix.have_buffers)
- pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL);
-
}
static int do_negotiate(struct pw_link *this)
@@ -197,7 +195,6 @@ static int do_negotiate(struct pw_link *this)
int res = -EIO, res2;
struct spa_pod *format = NULL, *current;
char *error = NULL;
- struct pw_resource *resource;
bool changed = true;
struct pw_port *input, *output;
uint8_t buffer[4096];
@@ -347,14 +344,7 @@ static int do_negotiate(struct pw_link *this)
if (changed) {
this->info.change_mask |= PW_LINK_CHANGE_MASK_FORMAT;
-
- pw_link_emit_info_changed(this, &this->info);
-
- if (this->global)
- spa_list_for_each(resource, &this->global->resource_list, link)
- pw_link_resource_info(resource, &this->info);
-
- this->info.change_mask = 0;
+ info_changed(this);
}
pw_log_debug("link %p: result %d", this, res);
return res;
@@ -561,7 +551,7 @@ static int do_allocation(struct pw_link *this)
char *error = NULL;
struct pw_port *input, *output;
struct allocation allocation = { NULL, };
- bool in_use, out_use;
+ bool out_alloc = false;
if (this->info.state > PW_LINK_STATE_ALLOCATING)
return 0;
@@ -575,7 +565,6 @@ static int do_allocation(struct pw_link *this)
in_flags = input->spa_flags;
out_flags = output->spa_flags;
- in_use = out_use = true;
pw_log_debug("link %p: doing alloc buffers %p %p: in_flags:%08x out_flags:%08x",
this, output->node, input->node, in_flags, out_flags);
@@ -678,61 +667,61 @@ static int do_allocation(struct pw_link *this)
if (out_flags & SPA_PORT_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = pw_port_alloc_buffers(output,
- params, n_params,
allocation.buffers,
- &allocation.n_buffers)) < 0) {
+ allocation.n_buffers)) < 0) {
asprintf(&error, "error alloc output buffers: %d", res);
goto error;
}
out_res = res;
- out_use = false;
+ out_alloc = true;
move_allocation(&allocation, &output->allocation);
- pw_log_debug("link %p: allocated %d buffers %p from output port", this,
- allocation.n_buffers, allocation.buffers);
+ pw_log_debug("link %p: allocated %d buffers %p from output port: %s", this,
+ allocation.n_buffers, allocation.buffers, spa_strerror(out_res));
}
}
- if (out_use) {
+
+ if (!out_alloc) {
pw_log_debug("link %p: using %d buffers %p on output port", this,
allocation.n_buffers, allocation.buffers);
+
if ((res = pw_port_use_buffers(output,
- this->rt.out_mix.port.port_id,
- 0,
- allocation.buffers,
- allocation.n_buffers)) < 0) {
+ this->rt.out_mix.port.port_id, 0,
+ allocation.buffers,
+ allocation.n_buffers)) < 0) {
asprintf(&error, "link %p: error use output buffers: %s", this,
spa_strerror(res));
goto error;
}
out_res = res;
+
move_allocation(&allocation, &output->allocation);
}
- if (in_use) {
- pw_log_debug("link %p: using %d buffers %p on input port", this,
- allocation.n_buffers, allocation.buffers);
- if ((res = pw_port_use_buffers(input,
- this->rt.in_mix.port.port_id,
- 0,
- allocation.buffers,
- allocation.n_buffers)) < 0) {
- asprintf(&error, "link %p: error use input buffers: %s", this,
- spa_strerror(res));
- goto error;
- }
- in_res = res;
- } else {
- asprintf(&error, "no common buffer alloc found");
- res = -EIO;
+
+ pw_log_debug("link %p: using %d buffers %p on input port", this,
+ output->allocation.n_buffers, output->allocation.buffers);
+
+ if ((res = pw_port_use_buffers(input,
+ this->rt.in_mix.port.port_id,
+ 0,
+ output->allocation.buffers,
+ output->allocation.n_buffers)) < 0) {
+ asprintf(&error, "link %p: error use input buffers: %s", this,
+ spa_strerror(res));
goto error;
}
+ in_res = res;
if (SPA_RESULT_IS_ASYNC(out_res)) {
pw_work_queue_add(impl->work, output->node,
spa_node_sync(output->node->node, out_res),
complete_paused, this);
+ if (out_alloc)
+ return 0;
} else {
complete_paused(output->node, this, out_res, 0);
}
+
if (SPA_RESULT_IS_ASYNC(in_res)) {
pw_work_queue_add(impl->work, input->node,
spa_node_sync(input->node->node, in_res),
@@ -841,8 +830,8 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id)
}
if (PW_PORT_IS_CONTROL(output) && PW_PORT_IS_CONTROL(input)) {
- pw_port_update_state(input, PW_PORT_STATE_PAUSED);
- pw_port_update_state(output, PW_PORT_STATE_PAUSED);
+ pw_port_update_state(input, PW_PORT_STATE_PAUSED, NULL);
+ pw_port_update_state(output, PW_PORT_STATE_PAUSED, NULL);
pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL);
}
@@ -1091,34 +1080,80 @@ error_resource:
return -errno;
}
+static void port_state_changed(struct pw_link *this, struct pw_port *port, struct pw_port *other,
+ enum pw_port_state state, const char *error)
+{
+ switch (state) {
+ case PW_PORT_STATE_ERROR:
+ pw_link_update_state(this, PW_LINK_STATE_ERROR, strdup(error));
+ break;
+ case PW_PORT_STATE_INIT:
+ break;
+ case PW_PORT_STATE_CONFIGURE:
+ break;
+ case PW_PORT_STATE_READY:
+ if (other->state >= PW_PORT_STATE_READY)
+ pw_link_update_state(this, PW_LINK_STATE_ALLOCATING, NULL);
+ break;
+ case PW_PORT_STATE_PAUSED:
+ if (other->state >= PW_PORT_STATE_PAUSED)
+ pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL);
+ break;
+ }
+}
+
+static void input_port_state_changed(void *data, enum pw_port_state old,
+ enum pw_port_state state, const char *error)
+{
+ struct impl *impl = data;
+ struct pw_link *this = &impl->this;
+ port_state_changed(this, this->input, this->output, state, error);
+}
+
+static void output_port_state_changed(void *data, enum pw_port_state old,
+ enum pw_port_state state, const char *error)
+{
+ struct impl *impl = data;
+ struct pw_link *this = &impl->this;
+ port_state_changed(this, this->output, this->input, state, error);
+}
+
static const struct pw_port_events input_port_events = {
PW_VERSION_PORT_EVENTS,
+ .state_changed = input_port_state_changed,
.destroy = input_port_destroy,
};
static const struct pw_port_events output_port_events = {
PW_VERSION_PORT_EVENTS,
+ .state_changed = output_port_state_changed,
.destroy = output_port_destroy,
};
+static void node_result(struct impl *impl, struct pw_node *node,
+ int seq, int res, uint32_t type, const void *result)
+{
+ if (SPA_RESULT_IS_ASYNC(seq))
+ pw_work_queue_complete(impl->work, node, SPA_RESULT_ASYNC_SEQ(seq), res);
+}
+
static void input_node_result(void *data, int seq, int res, uint32_t type, const void *result)
{
struct impl *impl = data;
struct pw_node *node = impl->this.input->node;
- if (SPA_RESULT_IS_ASYNC(seq)) {
- pw_log_trace("link %p: input node %p result %d %d", impl, node, seq, res);
- pw_work_queue_complete(impl->work, node, SPA_RESULT_ASYNC_SEQ(seq), res);
- }
+ pw_log_debug("link %p: input node %p result seq:%d res:%d type:%u",
+ impl, node, seq, res, type);
+ node_result(impl, node, seq, res, type, result);
}
static void output_node_result(void *data, int seq, int res, uint32_t type, const void *result)
{
struct impl *impl = data;
struct pw_node *node = impl->this.output->node;
- if (SPA_RESULT_IS_ASYNC(seq)) {
- pw_log_trace("link %p: output node %p result %d %d", impl, node, seq, res);
- pw_work_queue_complete(impl->work, node, SPA_RESULT_ASYNC_SEQ(seq), res);
- }
+ pw_log_debug("link %p: output node %p result seq:%d res:%d type:%u",
+ impl, node, seq, res, type);
+
+ node_result(impl, node, seq, res, type, result);
}
static const struct pw_node_events input_node_events = {
diff --git a/src/pipewire/node.c b/src/pipewire/node.c
index 0a71a163..f8c81e11 100644
--- a/src/pipewire/node.c
+++ b/src/pipewire/node.c
@@ -1082,7 +1082,7 @@ static void node_result(void *data, int seq, int res, uint32_t type, const void
struct pw_node *node = data;
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
- pw_log_trace("node %p: result seq:%d res:%d", node, seq, res);
+ pw_log_trace("node %p: result seq:%d res:%d type:%u", node, seq, res, type);
impl->last_error = res;
if (SPA_RESULT_IS_ASYNC(seq))
diff --git a/src/pipewire/port.c b/src/pipewire/port.c
index 54e3d18f..d62c3c69 100644
--- a/src/pipewire/port.c
+++ b/src/pipewire/port.c
@@ -77,14 +77,37 @@ static void emit_info_changed(struct pw_port *port)
port->info.change_mask = 0;
}
-void pw_port_update_state(struct pw_port *port, enum pw_port_state state)
+static const char *port_state_as_string(enum pw_port_state state)
{
- if (port->state != state) {
+ switch (state) {
+ case PW_PORT_STATE_ERROR:
+ return "error";
+ case PW_PORT_STATE_INIT:
+ return "init";
+ case PW_PORT_STATE_CONFIGURE:
+ return "configure";
+ case PW_PORT_STATE_READY:
+ return "ready";
+ case PW_PORT_STATE_PAUSED:
+ return "paused";
+ }
+ return "invalid-state";
+}
+
+void pw_port_update_state(struct pw_port *port, enum pw_port_state state, char *error)
+{
+ enum pw_port_state old = port->state;
+
+ if (old != state) {
pw_log(state == PW_PORT_STATE_ERROR ?
SPA_LOG_LEVEL_ERROR : SPA_LOG_LEVEL_DEBUG,
- "port %p: state %d -> %d", port, port->state, state);
+ "port %p: state %s -> %s (%s)", port,
+ port_state_as_string(old), port_state_as_string(state), error);
+
port->state = state;
- pw_port_emit_state_changed(port, state);
+ free((void*)port->error);
+ port->error = error;
+ pw_port_emit_state_changed(port, old, state, error);
}
}
@@ -755,7 +778,7 @@ int pw_port_add(struct pw_port *port, struct pw_node *node)
pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, NULL, 0, false, port);
if (port->state <= PW_PORT_STATE_INIT)
- pw_port_update_state(port, PW_PORT_STATE_CONFIGURE);
+ pw_port_update_state(port, PW_PORT_STATE_CONFIGURE, NULL);
pw_node_emit_port_added(node, port);
@@ -838,6 +861,7 @@ void pw_port_destroy(struct pw_port *port)
pw_port_emit_free(port);
free_allocation(&port->allocation);
+ free((void*)port->error);
pw_map_clear(&port->mix_port_map);
@@ -1009,15 +1033,16 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
if (id == SPA_PARAM_Format) {
pw_log_debug("port %p: %d %p %d", port, port->state, param, res);
+
/* setting the format always destroys the negotiated buffers */
free_allocation(&port->allocation);
port->allocated = false;
if (param == NULL || res < 0) {
- pw_port_update_state(port, PW_PORT_STATE_CONFIGURE);
+ pw_port_update_state(port, PW_PORT_STATE_CONFIGURE, NULL);
}
else if (!SPA_RESULT_IS_ASYNC(res)) {
- pw_port_update_state(port, PW_PORT_STATE_READY);
+ pw_port_update_state(port, PW_PORT_STATE_READY, NULL);
}
}
return res;
@@ -1032,7 +1057,8 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags,
struct pw_port_mix *mix = NULL;
pw_log_debug("port %p: %d:%d.%d: %d buffers state:%d", port,
- port->direction, port->port_id, mix_id, n_buffers, port->state);
+ port->direction, port->port_id, mix_id,
+ n_buffers, port->state);
if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY)
return 0;
@@ -1040,10 +1066,7 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags,
if (n_buffers > 0 && port->state < PW_PORT_STATE_READY)
return -EIO;
- if ((mix = pw_map_lookup(&port->mix_port_map, mix_id)) == NULL)
- return -EIO;
-
- {
+ if ((mix = pw_map_lookup(&port->mix_port_map, mix_id)) != NULL) {
res = spa_node_port_use_buffers(port->mix,
mix->port.direction, mix->port.port_id, flags,
buffers, n_buffers);
@@ -1056,7 +1079,7 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags,
if (n_buffers == 0) {
if (port->n_mix == 1)
- pw_port_update_state(port, PW_PORT_STATE_READY);
+ pw_port_update_state(port, PW_PORT_STATE_READY, NULL);
}
if (port->state == PW_PORT_STATE_READY) {
if (!SPA_FLAG_CHECK(port->mix_flags, PW_PORT_MIX_FLAG_MIX_ONLY)) {
@@ -1072,21 +1095,20 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags,
port->allocated = false;
free_allocation(&port->allocation);
- res = pw_port_call_use_buffers(port, flags, buffers, n_buffers);
+ pw_port_call_use_buffers(port, flags, buffers, n_buffers);
}
- if (n_buffers > 0 && !SPA_RESULT_IS_ASYNC(res)) {
- pw_port_update_state(port, PW_PORT_STATE_PAUSED);
- }
+ if (n_buffers > 0 && !SPA_RESULT_IS_ASYNC(res))
+ pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL);
+
return res;
}
SPA_EXPORT
int pw_port_alloc_buffers(struct pw_port *port,
- struct spa_pod **params, uint32_t n_params,
- struct spa_buffer **buffers, uint32_t *n_buffers)
+ struct spa_buffer **buffers, uint32_t n_buffers)
{
- int res;
+ int res, res2;
struct pw_node *node = port->node;
if (port->state < PW_PORT_STATE_READY)
@@ -1095,37 +1117,38 @@ int pw_port_alloc_buffers(struct pw_port *port,
if ((res = spa_node_port_use_buffers(node->node,
port->direction, port->port_id,
SPA_NODE_BUFFERS_FLAG_ALLOC,
- buffers, *n_buffers)) < 0) {
+ buffers, n_buffers)) < 0) {
pw_log_error("port %p: %d alloc failed: %d (%s)", port, port->port_id,
res, spa_strerror(res));
}
if (res >= 0) {
- res = pw_port_call_use_buffers(port, SPA_NODE_BUFFERS_FLAG_ALLOC, buffers, *n_buffers);
- if (res < 0) {
+ res2 = pw_port_call_use_buffers(port,
+ SPA_NODE_BUFFERS_FLAG_ALLOC,
+ buffers, n_buffers);
+ if (res2 < 0) {
pw_log_error("port %p: %d implementation alloc failed: %d (%s)",
- port, port->port_id, res, spa_strerror(res));
+ port, port->port_id, res, spa_strerror(res2));
}
}
pw_log_debug("port %p: %d alloc %d buffers: %d (%s)", port,
- port->port_id, *n_buffers, res, spa_strerror(res));
+ port->port_id, n_buffers, res, spa_strerror(res));
free_allocation(&port->allocation);
if (res < 0) {
- *n_buffers = 0;
port->allocated = false;
} else {
port->allocated = true;
}
- if (*n_buffers == 0) {
+ if (n_buffers == 0) {
if (port->n_mix == 1)
- pw_port_update_state(port, PW_PORT_STATE_READY);
+ pw_port_update_state(port, PW_PORT_STATE_READY, NULL);
}
else if (!SPA_RESULT_IS_ASYNC(res)) {
- pw_port_update_state(port, PW_PORT_STATE_PAUSED);
+ pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL);
}
return res;
diff --git a/src/pipewire/port.h b/src/pipewire/port.h
index ffb18d82..9245e7b8 100644
--- a/src/pipewire/port.h
+++ b/src/pipewire/port.h
@@ -78,7 +78,8 @@ struct pw_port_events {
void (*link_removed) (void *data, struct pw_link *link);
/** the state of the port changed */
- void (*state_changed) (void *data, enum pw_port_state state);
+ void (*state_changed) (void *data, enum pw_port_state old,
+ enum pw_port_state state, const char *error);
/** a control was added to the port */
void (*control_added) (void *data, struct pw_control *control);
diff --git a/src/pipewire/private.h b/src/pipewire/private.h
index 0ccaa957..5eb4deea 100644
--- a/src/pipewire/private.h
+++ b/src/pipewire/private.h
@@ -467,7 +467,6 @@ struct pw_port_mix {
} port;
struct spa_io_buffers *io;
uint32_t id;
- int have_buffers;
};
struct pw_port_implementation {
@@ -498,7 +497,7 @@ struct pw_port_implementation {
#define pw_port_emit_info_changed(p,i) pw_port_emit(p, info_changed, 0, i)
#define pw_port_emit_link_added(p,l) pw_port_emit(p, link_added, 0, l)
#define pw_port_emit_link_removed(p,l) pw_port_emit(p, link_removed, 0, l)
-#define pw_port_emit_state_changed(p,s) pw_port_emit(p, state_changed, 0, s)
+#define pw_port_emit_state_changed(p,o,s,e) pw_port_emit(p, state_changed, 0, o, s, e)
#define pw_port_emit_control_added(p,c) pw_port_emit(p, control_added, 0, c)
#define pw_port_emit_control_removed(p,c) pw_port_emit(p, control_removed, 0, c)
@@ -523,6 +522,7 @@ struct pw_port {
uint32_t port_id; /**< port id */
enum pw_port_state state; /**< state of the port */
+ const char *error; /**< error state */
struct pw_properties *properties; /**< properties of the port */
struct pw_port_info info;
@@ -824,8 +824,7 @@ int pw_port_add(struct pw_port *port, struct pw_node *node);
int pw_port_init_mix(struct pw_port *port, struct pw_port_mix *mix);
int pw_port_release_mix(struct pw_port *port, struct pw_port_mix *mix);
-void pw_port_mix_update_state(struct pw_port *port, struct pw_port_mix *mix, enum pw_port_state state);
-void pw_port_update_state(struct pw_port *port, enum pw_port_state state);
+void pw_port_update_state(struct pw_port *port, enum pw_port_state state, char *error);
/** Unlink a port \memberof pw_port */
void pw_port_unlink(struct pw_port *port);
@@ -878,8 +877,7 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags,
/** Allocate memory for buffers on a port \memberof pw_port */
int pw_port_alloc_buffers(struct pw_port *port,
- struct spa_pod **params, uint32_t n_params,
- struct spa_buffer **buffers, uint32_t *n_buffers);
+ struct spa_buffer **buffers, uint32_t n_buffers);
/** Change the state of the node */
int pw_node_set_state(struct pw_node *node, enum pw_node_state state);
diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c
index 61fc3d94..68c56834 100644
--- a/src/pipewire/stream.c
+++ b/src/pipewire/stream.c
@@ -152,6 +152,7 @@ struct stream {
unsigned int disconnecting:1;
unsigned int free_data:1;
unsigned int subscribe:1;
+ unsigned int alloc_buffers:1;
};
static int get_param_index(uint32_t id)
@@ -386,6 +387,8 @@ static void emit_port_info(struct stream *d)
info = SPA_PORT_INFO_INIT();
info.change_mask |= SPA_PORT_CHANGE_MASK_FLAGS;
info.flags = 0;
+ if (d->alloc_buffers)
+ info.flags |= SPA_PORT_FLAG_CAN_ALLOC_BUFFERS;
info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
info.params = d->params;
info.n_params = 5;
@@ -1400,6 +1403,7 @@ pw_stream_connect(struct pw_stream *stream,
if (flags & PW_STREAM_FLAG_DONT_RECONNECT)
pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "1");
+ impl->alloc_buffers = SPA_FLAG_CHECK(flags, PW_STREAM_FLAG_ALLOC_BUFFERS);
pw_properties_setf(stream->properties, PW_KEY_MEDIA_CLASS, "Stream/%s/Audio",
direction == PW_DIRECTION_INPUT ? "Input" : "Output");
diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h
index c36d5f1e..63215be3 100644
--- a/src/pipewire/stream.h
+++ b/src/pipewire/stream.h
@@ -242,6 +242,9 @@ enum pw_stream_flags {
* device */
PW_STREAM_FLAG_DONT_RECONNECT = (1 << 7), /**< don't try to reconnect this stream
* when the sink/source is removed */
+ PW_STREAM_FLAG_ALLOC_BUFFERS = (1 << 8), /**< the application will allocate buffer
+ * memory. In the add_buffer event, the
+ * data of the buffer should be set */
};
/** Create a new unconneced \ref pw_stream \memberof pw_stream