summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2019-02-20 17:51:05 +0100
committerWim Taymans <wtaymans@redhat.com>2019-02-21 09:40:12 +0100
commit7b12212eeb0f2f7d30b6f9a7b239fe967b1276c1 (patch)
tree0bb32b2004cce5f0180080d00357b7df451aad4d /src
parentb743518f78ac9a763a31864cefcd832d518bc21b (diff)
node: improve async handlingasync
Remove the done and error callbacks. The error callback is in an error message. The done callback is replace with spa_pending. Make enum_params take a callback and data for the results. This allows us to push the results one after another to the app and avoids ownership issues of the passed data. We can then extend this to handle the async case by doing a _wait call with a spa_pending+callback+data that will be called when the _enum_params returns and async result. Add a sync method. All methods can now return SPA_RESULT_IS_ASYNC return values and you can use spa_node_wait() to register a callback when they complete with optional extra parameters. This makes it easier to sync and handle the reply. Make helper methods to simulate the sync enum_params behaviour for sync nodes. Let the transport generate the sequence number for pw_resource_sync() and pw_proxy_sync(). That way we don't need to keep track of numbers ourselves and we can match the reply to the request easily.
Diffstat (limited to 'src')
-rw-r--r--src/examples/export-sink.c85
-rw-r--r--src/examples/export-source.c99
-rw-r--r--src/examples/local-v4l2.c70
-rw-r--r--src/examples/media-session.c20
-rw-r--r--src/extensions/protocol-native.h5
-rw-r--r--src/gst/gstpipewiredeviceprovider.c9
-rw-r--r--src/modules/module-audio-dsp/floatmix.c50
-rw-r--r--src/modules/module-client-node/client-node.c164
-rw-r--r--src/modules/module-client-node/client-node.h1
-rw-r--r--src/modules/module-client-node/client-stream.c124
-rw-r--r--src/modules/module-client-node/protocol-native.c32
-rw-r--r--src/modules/module-client-node/remote-node.c13
-rw-r--r--src/modules/module-protocol-native.c9
-rw-r--r--src/modules/module-protocol-native/connection.c25
-rw-r--r--src/modules/module-protocol-native/connection.h9
-rw-r--r--src/modules/module-protocol-native/protocol-native.c80
-rw-r--r--src/modules/spa/spa-node.c42
-rw-r--r--src/modules/spa/spa-node.h7
-rw-r--r--src/pipewire/core.c17
-rw-r--r--src/pipewire/interfaces.h6
-rw-r--r--src/pipewire/introspect.c2
-rw-r--r--src/pipewire/link.c82
-rw-r--r--src/pipewire/node.c88
-rw-r--r--src/pipewire/node.h6
-rw-r--r--src/pipewire/port.c9
-rw-r--r--src/pipewire/proxy.c8
-rw-r--r--src/pipewire/proxy.h4
-rw-r--r--src/pipewire/resource.c8
-rw-r--r--src/pipewire/resource.h4
-rw-r--r--src/pipewire/stream.c40
-rw-r--r--src/pipewire/utils.h14
-rw-r--r--src/pipewire/work-queue.c1
-rw-r--r--src/tools/pipewire-cli.c13
-rw-r--r--src/tools/pipewire-monitor.c7
34 files changed, 671 insertions, 482 deletions
diff --git a/src/examples/export-sink.c b/src/examples/export-sink.c
index 0f65dcc8..51d3e3a5 100644
--- a/src/examples/export-sink.c
+++ b/src/examples/export-sink.c
@@ -30,6 +30,7 @@
#include <spa/param/props.h>
#include <spa/node/io.h>
#include <spa/control/control.h>
+#include <spa/pod/filter.h>
#include <spa/debug/format.h>
#include <spa/debug/pod.h>
@@ -179,36 +180,24 @@ static int impl_port_set_io(struct spa_node *node,
return 0;
}
-static int port_enum_formats(struct spa_node *node,
- enum spa_direction direction, uint32_t port_id,
- uint32_t *index,
- const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
-{
- struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node);
- SDL_RendererInfo info;
-
- if (*index != 0)
- return 0;
-
- SDL_GetRendererInfo(d->renderer, &info);
- *result = sdl_build_formats(&info, builder);
-
- (*index)++;
-
- return 1;
-}
-
static int impl_port_enum_params(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node);
struct spa_pod *param;
+ struct spa_pod_builder b = { 0 };
+ uint8_t buffer[1024];
+ struct spa_result_node_enum_params result;
+ uint32_t count = 0;
+ int res;
+
+ result.next = start;
+
+ next:
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
switch (id) {
case SPA_PARAM_List:
@@ -219,28 +208,36 @@ static int impl_port_enum_params(struct spa_node *node,
SPA_PARAM_Meta,
SPA_PARAM_IO };
- if (*index < SPA_N_ELEMENTS(list))
- param = spa_pod_builder_add_object(builder,
+ if (result.next < SPA_N_ELEMENTS(list))
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamList, id,
- SPA_PARAM_LIST_id, SPA_POD_Id(list[*index]));
+ SPA_PARAM_LIST_id, SPA_POD_Id(list[result.next]));
else
return 0;
break;
}
case SPA_PARAM_EnumFormat:
- return port_enum_formats(node, direction, port_id, index, filter, result, builder);
+ {
+ SDL_RendererInfo info;
+
+ if (result.next != 0)
+ return 0;
+ SDL_GetRendererInfo(d->renderer, &info);
+ param = sdl_build_formats(&info, &b);
+ break;
+ }
case SPA_PARAM_Format:
- if (*index != 0 || d->format.format == 0)
+ if (result.next != 0 || d->format.format == 0)
return 0;
- param = spa_format_video_raw_build(builder, id, &d->format);
+ param = spa_format_video_raw_build(&b, id, &d->format);
break;
case SPA_PARAM_Buffers:
- if (*index != 0)
+ if (result.next != 0)
return 0;
- param = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 2, MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
@@ -250,15 +247,15 @@ static int impl_port_enum_params(struct spa_node *node,
break;
case SPA_PARAM_Meta:
- switch (*index) {
+ switch (result.next) {
case 0:
- param = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamMeta, id,
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header),
SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_header)));
break;
case 1:
- param = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamMeta, id,
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_VideoDamage),
SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_region)));
@@ -269,15 +266,15 @@ static int impl_port_enum_params(struct spa_node *node,
break;
case SPA_PARAM_IO:
- switch (*index) {
+ switch (result.next) {
case 0:
- param = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
break;
case 1:
- param = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Notify),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_sequence) + 1024));
@@ -289,12 +286,18 @@ static int impl_port_enum_params(struct spa_node *node,
default:
return -ENOENT;
}
+ result.next++;
- (*index)++;
+ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
+ goto next;
- *result = param;
+ if ((res = func(data, count, 1, &result)) != 0)
+ return res;
- return 1;
+ if (++count != num)
+ goto next;
+
+ return 0;
}
static int port_set_format(struct spa_node *node,
diff --git a/src/examples/export-source.c b/src/examples/export-source.c
index e033a530..3370aefe 100644
--- a/src/examples/export-source.c
+++ b/src/examples/export-source.c
@@ -30,6 +30,7 @@
#include <spa/param/audio/format-utils.h>
#include <spa/param/props.h>
#include <spa/node/io.h>
+#include <spa/pod/filter.h>
#include <spa/debug/format.h>
#include <pipewire/pipewire.h>
@@ -147,43 +148,24 @@ static int impl_port_set_io(struct spa_node *node, enum spa_direction direction,
return 0;
}
-static int port_enum_formats(struct spa_node *node,
- enum spa_direction direction, uint32_t port_id,
- uint32_t *index,
- const struct spa_pod *filter,
- struct spa_pod **param,
- struct spa_pod_builder *builder)
-{
- if (*index != 0)
- return 0;
-
- *param = spa_pod_builder_add_object(builder,
- SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat,
- SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_audio),
- SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_raw),
- SPA_FORMAT_AUDIO_format, SPA_POD_CHOICE_ENUM_Id(5,
- SPA_AUDIO_FORMAT_S16,
- SPA_AUDIO_FORMAT_S16P,
- SPA_AUDIO_FORMAT_S16,
- SPA_AUDIO_FORMAT_F32P,
- SPA_AUDIO_FORMAT_F32),
- SPA_FORMAT_AUDIO_channels, SPA_POD_CHOICE_RANGE_Int(2, 1, INT32_MAX),
- SPA_FORMAT_AUDIO_rate, SPA_POD_CHOICE_RANGE_Int(44100, 1, INT32_MAX));
-
- (*index)++;
-
- return 1;
-}
-
static int impl_port_enum_params(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node);
struct spa_pod *param;
+ struct spa_pod_builder b = { 0 };
+ uint8_t buffer[1024];
+ struct spa_result_node_enum_params result;
+ uint32_t count = 0;
+ int res;
+
+ result.next = start;
+
+ next:
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
switch (id) {
case SPA_PARAM_List:
@@ -194,30 +176,45 @@ static int impl_port_enum_params(struct spa_node *node,
SPA_PARAM_Meta,
SPA_PARAM_IO };
- if (*index < SPA_N_ELEMENTS(list))
- param = spa_pod_builder_add_object(builder,
+ if (result.next < SPA_N_ELEMENTS(list))
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamList, id,
- SPA_PARAM_LIST_id, SPA_POD_Id(list[*index]));
+ SPA_PARAM_LIST_id, SPA_POD_Id(list[result.next]));
else
return 0;
break;
}
case SPA_PARAM_EnumFormat:
- return port_enum_formats(node, direction, port_id, index, filter, result, builder);
+ if (result.next != 0)
+ return 0;
+
+ param = spa_pod_builder_add_object(&b,
+ SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat,
+ SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_audio),
+ SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_raw),
+ SPA_FORMAT_AUDIO_format, SPA_POD_CHOICE_ENUM_Id(5,
+ SPA_AUDIO_FORMAT_S16,
+ SPA_AUDIO_FORMAT_S16P,
+ SPA_AUDIO_FORMAT_S16,
+ SPA_AUDIO_FORMAT_F32P,
+ SPA_AUDIO_FORMAT_F32),
+ SPA_FORMAT_AUDIO_channels, SPA_POD_CHOICE_RANGE_Int(2, 1, INT32_MAX),
+ SPA_FORMAT_AUDIO_rate, SPA_POD_CHOICE_RANGE_Int(44100, 1, INT32_MAX));
+ break;
case SPA_PARAM_Format:
- if (*index != 0)
+ if (result.next != 0)
return 0;
if (d->format.format == 0)
return 0;
- param = spa_format_audio_raw_build(builder, id, &d->format);
+ param = spa_format_audio_raw_build(&b, id, &d->format);
break;
case SPA_PARAM_Buffers:
- if (*index > 0)
+ if (result.next > 0)
return 0;
- param = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(1, 1, 32),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
@@ -227,9 +224,9 @@ static int impl_port_enum_params(struct spa_node *node,
break;
case SPA_PARAM_Meta:
- switch (*index) {
+ switch (result.next) {
case 0:
- param = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamMeta, id,
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header),
SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_header)));
@@ -239,15 +236,15 @@ static int impl_port_enum_params(struct spa_node *node,
}
break;
case SPA_PARAM_IO:
- switch (*index) {
+ switch (result.next) {
case 0:
- param = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
break;
case 1:
- param = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Notify),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_sequence) + 1024));
@@ -260,10 +257,18 @@ static int impl_port_enum_params(struct spa_node *node,
return -ENOENT;
}
- (*index)++;
- *result = param;
+ result.next++;
+
+ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
+ goto next;
- return 1;
+ if ((res = func(data, count, 1, &result)) != 0)
+ return res;
+
+ if (++count != num)
+ goto next;
+
+ return 0;
}
static int port_set_format(struct spa_node *node,
diff --git a/src/examples/local-v4l2.c b/src/examples/local-v4l2.c
index c2c66d61..5452f7b8 100644
--- a/src/examples/local-v4l2.c
+++ b/src/examples/local-v4l2.c
@@ -33,6 +33,7 @@
#include <spa/param/video/format-utils.h>
#include <spa/param/props.h>
+#include <spa/pod/filter.h>
#include <spa/node/io.h>
#include <spa/debug/format.h>
@@ -121,45 +122,42 @@ static int impl_port_set_io(struct spa_node *node, enum spa_direction direction,
return 0;
}
-static int port_enum_formats(struct spa_node *node,
- enum spa_direction direction, uint32_t port_id,
- uint32_t *index,
- const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
-{
- struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node);
- SDL_RendererInfo info;
-
- if (*index != 0)
- return 0;
-
- SDL_GetRendererInfo(d->renderer, &info);
- *result = sdl_build_formats(&info, builder);
-
- (*index)++;
-
- return 1;
-}
-
static int impl_port_enum_params(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node);
+ struct spa_pod *param;
+ struct spa_pod_builder b = { 0 };
+ uint8_t buffer[1024];
+ struct spa_result_node_enum_params result;
+ uint32_t count = 0;
+ int res;
+
+ result.next = start;
+
+ next:
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
switch (id) {
case SPA_PARAM_EnumFormat:
- return port_enum_formats(node, direction, port_id, index, filter, result, builder);
+ {
+ SDL_RendererInfo info;
+
+ if (result.next > 0)
+ return 0;
+ SDL_GetRendererInfo(d->renderer, &info);
+ param = sdl_build_formats(&info, &b);
+ break;
+ }
case SPA_PARAM_Buffers:
- if (*index > 0)
+ if (result.next > 0)
return 0;
- *result = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, 32),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
@@ -169,10 +167,10 @@ static int impl_port_enum_params(struct spa_node *node,
break;
case SPA_PARAM_Meta:
- if (*index > 0)
+ if (result.next > 0)
return 0;
- *result = spa_pod_builder_add_object(builder,
+ param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamMeta, id,
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header),
SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_header)));
@@ -182,8 +180,18 @@ static int impl_port_enum_params(struct spa_node *node,
return -ENOENT;
}
- (*index)++;
- return 1;
+ result.next++;
+
+ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
+ goto next;
+
+ if ((res = func(data, count, 1, &result)) != 0)
+ return res;
+
+ if (++count != num)
+ goto next;
+
+ return 0;
}
static int port_set_format(struct spa_node *node, enum spa_direction direction, uint32_t port_id,
diff --git a/src/examples/media-session.c b/src/examples/media-session.c
index 8b0526f3..8937cf00 100644
--- a/src/examples/media-session.c
+++ b/src/examples/media-session.c
@@ -193,7 +193,7 @@ static void *find_object(struct impl *impl, uint32_t id)
static void schedule_rescan(struct impl *impl)
{
if (impl->core_proxy)
- pw_core_proxy_sync(impl->core_proxy, 0, ++impl->seq);
+ impl->seq = pw_core_proxy_sync(impl->core_proxy, 0);
}
static void remove_idle_timeout(struct session *sess)
@@ -574,7 +574,8 @@ handle_node(struct impl *impl, uint32_t id, uint32_t parent_id,
node->type = NODE_TYPE_DEVICE;
node->manager = sess;
- pw_log_debug(NAME" %p: new session for device node %d", impl, id);
+ pw_log_debug(NAME" %p: new session for device node %d %d", impl, id,
+ need_dsp);
}
return 1;
}
@@ -687,14 +688,15 @@ handle_port(struct impl *impl, uint32_t id, uint32_t parent_id, uint32_t type,
spa_list_append(&node->port_list, &port->l);
+ pw_log_debug(NAME" %p: new port %d for node %d type %d %08x", impl, id, parent_id,
+ node->type, port->flags);
+
if (node->type == NODE_TYPE_DEVICE) {
pw_port_proxy_enum_params((struct pw_port_proxy*)p,
SPA_PARAM_EnumFormat,
0, -1, NULL);
}
- pw_log_debug(NAME" %p: new port %d for node %d type %d %08x", impl, id, parent_id,
- node->type, port->flags);
return 0;
}
@@ -1240,12 +1242,16 @@ static void rescan_session(struct impl *impl, struct session *sess)
struct spa_pod *param;
const char *str;
- if (node->info->props == NULL)
+ if (node->info->props == NULL) {
+ pw_log_debug(NAME " %p: node %p has no properties", impl, node);
return;
+ }
if (node->media_type != SPA_MEDIA_TYPE_audio ||
- node->media_subtype != SPA_MEDIA_SUBTYPE_raw)
+ node->media_subtype != SPA_MEDIA_SUBTYPE_raw) {
+ pw_log_debug(NAME " %p: node %p has no media type", impl, node);
return;
+ }
info = node->format;
@@ -1290,6 +1296,7 @@ static void do_rescan(struct impl *impl)
struct node *node;
clock_gettime(CLOCK_MONOTONIC, &impl->now);
+ pw_log_debug("media-session %p: do rescan", impl);
spa_list_for_each(sess, &impl->session_list, l)
rescan_session(impl, sess);
@@ -1300,6 +1307,7 @@ static void do_rescan(struct impl *impl)
static int core_done(void *data, uint32_t id, uint32_t seq)
{
struct impl *impl = data;
+ pw_log_debug("media-session %p: sync %d %u/%u", impl, id, seq, impl->seq);
if (impl->seq == seq)
do_rescan(impl);
return 0;
diff --git a/src/extensions/protocol-native.h b/src/extensions/protocol-native.h
index 00756e5b..3a690662 100644
--- a/src/extensions/protocol-native.h
+++ b/src/extensions/protocol-native.h
@@ -46,7 +46,7 @@ struct pw_protocol_native_ext {
uint32_t version;
struct spa_pod_builder * (*begin_proxy) (struct pw_proxy *proxy,
- uint8_t opcode);
+ uint8_t opcode, int *res);
uint32_t (*add_proxy_fd) (struct pw_proxy *proxy, int fd);
int (*get_proxy_fd) (struct pw_proxy *proxy, uint32_t index);
@@ -55,14 +55,13 @@ struct pw_protocol_native_ext {
struct spa_pod_builder *builder);
struct spa_pod_builder * (*begin_resource) (struct pw_resource *resource,
- uint8_t opcode);
+ uint8_t opcode, int *res);
uint32_t (*add_resource_fd) (struct pw_resource *resource, int fd);
int (*get_resource_fd) (struct pw_resource *resource, uint32_t index);
int (*end_resource) (struct pw_resource *resource,
struct spa_pod_builder *builder);
-
};
#define pw_protocol_native_begin_proxy(p,...) pw_protocol_ext(pw_proxy_get_protocol(p),struct pw_protocol_native_ext,begin_proxy,p,__VA_ARGS__)
diff --git a/src/gst/gstpipewiredeviceprovider.c b/src/gst/gstpipewiredeviceprovider.c
index aa47b914..f43bda41 100644
--- a/src/gst/gstpipewiredeviceprovider.c
+++ b/src/gst/gstpipewiredeviceprovider.c
@@ -279,9 +279,8 @@ static void add_pending(GstPipeWireDeviceProvider *self, struct pending *p,
spa_list_append(&self->pending, &p->link);
p->callback = callback;
p->data = data;
- p->seq = ++self->seq;
pw_log_debug("add pending %d", p->seq);
- pw_core_proxy_sync(self->core_proxy, 0, p->seq);
+ self->seq = p->seq = pw_core_proxy_sync(self->core_proxy, 0);
}
static void remove_pending(struct pending *p)
@@ -559,7 +558,6 @@ gst_pipewire_device_provider_probe (GstDeviceProvider * provider)
spa_list_init(&data->ports);
spa_list_init(&self->pending);
- self->seq = 1;
pw_remote_add_listener(r, &listener, &remote_events, data);
if (pw_remote_connect (r) < 0)
@@ -591,7 +589,7 @@ gst_pipewire_device_provider_probe (GstDeviceProvider * provider)
data->registry = pw_core_proxy_get_registry(self->core_proxy,
PW_TYPE_INTERFACE_Registry, PW_VERSION_REGISTRY, 0);
pw_registry_proxy_add_listener(data->registry, &data->registry_listener, &registry_events, data);
- pw_core_proxy_sync(self->core_proxy, 0, ++self->seq);
+ self->seq = pw_core_proxy_sync(self->core_proxy, 0);
for (;;) {
if (pw_remote_get_state(r, NULL) <= 0)
@@ -624,7 +622,6 @@ gst_pipewire_device_provider_start (GstDeviceProvider * provider)
self->loop = pw_loop_new (NULL);
self->list_only = FALSE;
spa_list_init(&self->pending);
- self->seq = 1;
if (!(self->main_loop = pw_thread_loop_new (self->loop, "pipewire-device-monitor"))) {
GST_ERROR_OBJECT (self, "Could not create PipeWire mainloop");
@@ -681,7 +678,7 @@ gst_pipewire_device_provider_start (GstDeviceProvider * provider)
data->registry = self->registry;
pw_registry_proxy_add_listener(self->registry, &data->registry_listener, &registry_events, data);
- pw_core_proxy_sync(self->core_proxy, 0, ++self->seq);
+ self->seq = pw_core_proxy_sync(self->core_proxy, 0);
for (;;) {
if (self->end)
diff --git a/src/modules/module-audio-dsp/floatmix.c b/src/modules/module-audio-dsp/floatmix.c
index 750ae81c..fc35cf4e 100644
--- a/src/modules/module-audio-dsp/floatmix.c
+++ b/src/modules/module-audio-dsp/floatmix.c
@@ -122,10 +122,9 @@ struct impl {
#define GET_PORT(this,d,p) (d == SPA_DIRECTION_INPUT ? GET_IN_PORT(this,p) : GET_OUT_PORT(this,p))
static int impl_node_enum_params(struct spa_node *node,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **param,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
return -ENOTSUP;
}
@@ -274,13 +273,13 @@ impl_node_remove_port(struct spa_node *node, enum spa_direction direction, uint3
static int port_enum_formats(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
- uint32_t *index,
+ uint32_t index,
struct spa_pod **param,
struct spa_pod_builder *builder)
{
struct impl *this = SPA_CONTAINER_OF(node, struct impl, node);
- switch (*index) {
+ switch (index) {
case 0:
if (this->have_format) {
*param = spa_format_audio_raw_build(builder, SPA_PARAM_EnumFormat,
@@ -304,21 +303,22 @@ static int port_enum_formats(struct spa_node *node,
static int
impl_node_port_enum_params(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
struct impl *this;
struct port *port;
struct spa_pod *param;
struct spa_pod_builder b = { 0 };
uint8_t buffer[1024];
+ struct spa_result_node_enum_params result;
+ uint32_t count = 0;
int res;
spa_return_val_if_fail(node != NULL, -EINVAL);
- spa_return_val_if_fail(index != NULL, -EINVAL);
- spa_return_val_if_fail(builder != NULL, -EINVAL);
+ spa_return_val_if_fail(num != 0, -EINVAL);
+ spa_return_val_if_fail(func != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
@@ -326,6 +326,8 @@ impl_node_port_enum_params(struct spa_node *node,
port = GET_PORT(this, direction, port_id);
+ result.next = start;
+
next:
spa_pod_builder_init(&b, buffer, sizeof(buffer));
@@ -338,32 +340,32 @@ impl_node_port_enum_params(struct spa_node *node,
SPA_PARAM_Meta,
SPA_PARAM_IO };
- if (*index < SPA_N_ELEMENTS(list))
+ if (result.next < SPA_N_ELEMENTS(list))
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamList, id,
- SPA_PARAM_LIST_id, SPA_POD_Id(list[*index]));
+ SPA_PARAM_LIST_id, SPA_POD_Id(list[result.next]));
else
return 0;
break;
}
case SPA_PARAM_EnumFormat:
- if ((res = port_enum_formats(node, direction, port_id, index, &param, &b)) <= 0)
+ if ((res = port_enum_formats(node, direction, port_id, result.next, &param, &b)) <= 0)
return res;
break;
case SPA_PARAM_Format:
if (!port->have_format)
return -EIO;
- if (*index > 0)
+ if (result.next > 0)
return 0;
- param = spa_format_audio_raw_build(builder, id, &this->format.info.raw);
+ param = spa_format_audio_raw_build(&b, id, &this->format.info.raw);
break;
case SPA_PARAM_Buffers:
if (!port->have_format)
return -EIO;
- if (*index > 0)
+ if (result.next > 0)
return 0;
param = spa_pod_builder_add_object(&b,
@@ -382,7 +384,7 @@ impl_node_port_enum_params(struct spa_node *node,
if (!port->have_format)
return -EIO;
- switch (*index) {
+ switch (result.next) {
case 0:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamMeta, id,
@@ -395,7 +397,7 @@ impl_node_port_enum_params(struct spa_node *node,
break;
case SPA_PARAM_IO:
- switch (*index) {
+ switch (result.next) {
case 0:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
@@ -422,12 +424,18 @@ impl_node_port_enum_params(struct spa_node *node,
return -ENOENT;
}
- (*index)++;
+ result.next++;
- if (spa_pod_filter(builder, result, param, filter) < 0)
+ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
goto next;
- return 1;
+ if ((res = func(data, count, 1, &result)) != 0)
+ return res;
+
+ if (++count != num)
+ goto next;
+
+ return 0;
}
static int clear_buffers(struct impl *this, struct port *port)
diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c
index 58a6ed7d..c6bd316c 100644
--- a/src/modules/module-client-node/client-node.c
+++ b/src/modules/module-client-node/client-node.c
@@ -146,6 +146,8 @@ struct node {
uint32_t n_params;
struct spa_pod **params;
+
+ struct spa_list pending_list;
};
struct impl {
@@ -163,6 +165,7 @@ struct impl {
struct spa_hook resource_listener;
struct pw_array mems;
+ uint32_t init_seq;
int fds[2];
int other_fds[2];
@@ -352,34 +355,47 @@ static void mix_clear(struct node *this, struct mix *mix)
}
static int impl_node_enum_params(struct spa_node *node,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
struct node *this;
+ uint8_t buffer[1024];
+ struct spa_pod_builder b = { 0 };
+ struct spa_result_node_enum_params result;
+ uint32_t count = 0;
+ int res;
spa_return_val_if_fail(node != NULL, -EINVAL);
- spa_return_val_if_fail(index != NULL, -EINVAL);
- spa_return_val_if_fail(builder != NULL, -EINVAL);
+ spa_return_val_if_fail(num != 0, -EINVAL);
+ spa_return_val_if_fail(func != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
+ result.next = start;
+
while (true) {
struct spa_pod *param;
- if (*index >= this->n_params)
+ if (result.next >= this->n_params)
return 0;
- param = this->params[(*index)++];
+ param = this->params[result.next++];
if (param == NULL || !spa_pod_is_object_id(param, id))
continue;
- if (spa_pod_filter(builder, result, param, filter) == 0)
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
+ if (spa_pod_filter(&b, &result.param, param, filter) != 0)
+ continue;
+
+ if ((res = func(data, count, 1, &result)) != 0)
+ return res;
+
+ if (++count != num)
break;
}
- return 1;
+ return 0;
}
static int impl_node_set_param(struct spa_node *node, uint32_t id, uint32_t flags,
@@ -392,7 +408,7 @@ static int impl_node_set_param(struct spa_node *node, uint32_t id, uint32_t flag
this = SPA_CONTAINER_OF(node, struct node, node);
if (this->resource == NULL)
- return 0;
+ return -EIO;
return pw_client_node_resource_set_param(this->resource, id, flags, param);
}
@@ -410,9 +426,12 @@ static int impl_node_set_io(struct spa_node *node, uint32_t id, void *data, size
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
- if (this->resource == NULL)
+ if (impl->this.flags & 1)
return 0;
+ if (this->resource == NULL)
+ return -EIO;
+
if (data) {
if ((mem = pw_memblock_find(data)) == NULL)
return -EINVAL;
@@ -451,7 +470,7 @@ static int impl_node_send_command(struct spa_node *node, const struct spa_comman
this = SPA_CONTAINER_OF(node, struct node, node);
if (this->resource == NULL)
- return 0;
+ return -EIO;
pw_log_debug("client-node %p: send command %d", node, SPA_COMMAND_TYPE(command));
return pw_client_node_resource_command(this->resource, command);
@@ -460,9 +479,9 @@ static int impl_node_send_command(struct spa_node *node, const struct spa_comman
static void emit_port_info(struct node *this, struct port *port)
{
- if (this->callbacks && this->callbacks->port_info) {
- this->callbacks->port_info(this->callbacks_data, port->direction, port->id, &port->info);
- }
+ if (this->callbacks && this->callbacks->port_info)
+ this->callbacks->port_info(this->callbacks_data,
+ port->direction, port->id, &port->info);
}
static int
@@ -471,6 +490,7 @@ impl_node_set_callbacks(struct spa_node *node,
void *data)
{
struct node *this;
+ int res = 0;
uint32_t i;
spa_return_val_if_fail(node != NULL, -EINVAL);
@@ -479,6 +499,8 @@ impl_node_set_callbacks(struct spa_node *node,
this->callbacks = callbacks;
this->callbacks_data = data;
+ pw_log_debug("client-node %p: callbacks %p", this, callbacks);
+
for (i = 0; i < MAX_INPUTS; i++) {
if (this->in_ports[i])
emit_port_info(this, this->in_ports[i]);
@@ -487,17 +509,50 @@ impl_node_set_callbacks(struct spa_node *node,
if (this->out_ports[i])
emit_port_info(this, this->out_ports[i]);
}
- return 0;
+ if (callbacks && this->resource)
+ res = pw_resource_sync(this->resource);
+
+ return res;
}
static int
-impl_node_sync(struct spa_node *node, uint32_t seq)
+impl_node_sync(struct spa_node *node)
{
struct node *this;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
- pw_log_debug("client-node %p: sync %u", node, seq);
- return pw_resource_sync(this->resource, seq);
+ pw_log_debug("client-node %p: sync", node);
+ if (this->resource == NULL)
+ return -EIO;
+ return pw_resource_sync(this->resource);
+}
+
+static int
+impl_node_wait(struct spa_node *node, int res, struct spa_pending *pending,
+ spa_result_func_t func, void *data)
+{
+ struct node *this;
+ int seq;
+
+ spa_return_val_if_fail(node != NULL, -EINVAL);
+ spa_return_val_if_fail(func != NULL, -EINVAL);
+ spa_return_val_if_fail(pending != NULL, -EINVAL);
+
+ this = SPA_CONTAINER_OF(node, struct node, node);
+
+ pw_log_debug("client-node %p: wait %d %d", node, res, SPA_RESULT_ASYNC_SEQ(res));
+ if (this->resource == NULL)
+ return -EIO;
+
+ seq = pw_resource_sync(this->resource);
+
+ pending->seq = seq;
+ pending->res = res;
+ pending->func = func;
+ pending->data = data;
+ spa_list_append(&this->pending_list, &pending->link);
+
+ return seq;
}
static void
@@ -520,7 +575,7 @@ do_update_port(struct node *this,
port->params = realloc(port->params, port->n_params * sizeof(struct spa_pod *));
for (i = 0; i < port->n_params; i++) {
- port->params[i] = params[i] ? pw_spa_pod_copy(params[i]) : NULL;
+ port->params[i] = params[i] ? spa_pod_copy(params[i]) : NULL;
if (port->params[i] && spa_pod_is_object_id(port->params[i], SPA_PARAM_Format))
port->have_format = true;
@@ -608,17 +663,21 @@ impl_node_remove_port(struct spa_node *node, enum spa_direction direction, uint3
static int
impl_node_port_enum_params(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
struct node *this;
struct port *port;
+ uint8_t buffer[1024];
+ struct spa_pod_builder b = { 0 };
+ struct spa_result_node_enum_params result;
+ uint32_t count = 0;
+ int res;
spa_return_val_if_fail(node != NULL, -EINVAL);
- spa_return_val_if_fail(index != NULL, -EINVAL);
- spa_return_val_if_fail(builder != NULL, -EINVAL);
+ spa_return_val_if_fail(num != 0, -EINVAL);
+ spa_return_val_if_fail(func != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
@@ -629,21 +688,30 @@ impl_node_port_enum_params(struct spa_node *node,
pw_log_debug("client-node %p: port %d.%d", this,
direction, port_id);
+ result.next = start;
+
while (true) {
struct spa_pod *param;
- if (*index >= port->n_params)
+ if (result.next >= port->n_params)
return 0;
- param = port->params[(*index)++];
+ param = port->params[result.next++];
if (param == NULL || !spa_pod_is_object_id(param, id))
continue;
- if (spa_pod_filter(builder, result, param, filter) == 0)
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
+ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
+ continue;
+
+ if ((res = func(data, count, 1, &result)) != 0)
+ return res;
+
+ if (++count != num)
break;
}
- return 1;
+ return 0;
}
static int
@@ -661,7 +729,7 @@ impl_node_port_set_param(struct spa_node *node,
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
if (this->resource == NULL)
- return 0;
+ return -EIO;
pw_log_debug("node %p: port %d.%d add param %s %d", this,
direction, port_id,
@@ -692,7 +760,7 @@ static int do_port_set_io(struct impl *impl,
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
if (this->resource == NULL)
- return 0;
+ return -EIO;
port = GET_PORT(this, direction, port_id);
@@ -781,7 +849,7 @@ do_port_use_buffers(struct impl *impl,
mix->n_buffers = n_buffers;
if (this->resource == NULL)
- return 0;
+ return -EIO;
for (i = 0; i < n_buffers; i++) {
struct buffer *b = &mix->buffers[i];
@@ -963,7 +1031,7 @@ client_node_update(void *data,
this->params = realloc(this->params, this->n_params * sizeof(struct spa_pod *));
for (i = 0; i < this->n_params; i++)
- this->params[i] = params[i] ? pw_spa_pod_copy(params[i]) : NULL;
+ this->params[i] = params[i] ? spa_pod_copy(params[i]) : NULL;
}
if (change_mask & PW_CLIENT_NODE_UPDATE_PROPS) {
pw_node_update_properties(impl->this.node, props);
@@ -1068,6 +1136,7 @@ static const struct spa_node impl_node = {
NULL,
.set_callbacks = impl_node_set_callbacks,
.sync = impl_node_sync,
+ .wait = impl_node_wait,
.enum_params = impl_node_enum_params,
.set_param = impl_node_set_param,
.set_io = impl_node_set_io,
@@ -1109,6 +1178,7 @@ node_init(struct node *this,
}
this->node = impl_node;
+ spa_list_init(&this->pending_list);
init_ios(this->ios);
@@ -1174,18 +1244,24 @@ static void client_node_resource_error(void *data, int res, const char *message)
struct impl *impl = data;
struct node *this = &impl->node;
pw_log_error("client-node %p: error %d: %s", this, res, message);
- if (this->callbacks && this->callbacks->error)
- this->callbacks->error(this->callbacks_data, res, message);
}
static void client_node_resource_done(void *data, uint32_t seq)
{
struct impl *impl = data;
struct node *this = &impl->node;
+ struct spa_pending *p, *t;
pw_log_debug("client-node %p: done %d", this, seq);
- if (this->callbacks && this->callbacks->done)
- this->callbacks->done(this->callbacks_data, SPA_RESULT_ASYNC_SEQ(seq));
+
+ spa_list_for_each_safe(p, t, &this->pending_list, link) {
+ pw_log_debug("client-node %p: check %d", this, p->seq);
+ if (p->seq == (int) seq) {
+ pw_log_debug("client-node %p: found %d", this, p->res);
+ p->func(p->data, p->res, 0, NULL);
+ spa_list_remove(&p->link);
+ }
+ }
}
void pw_client_node_registered(struct pw_client_node *this, uint32_t node_id)
@@ -1318,14 +1394,13 @@ static const struct pw_port_implementation port_impl = {
static int
impl_mix_port_enum_params(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
struct port *port = SPA_CONTAINER_OF(node, struct port, mix_node);
return impl_node_port_enum_params(&port->node->node, direction, port->id,
- id, index, filter, result, builder);
+ id, start, num, filter, func, data);
}
static int
@@ -1597,6 +1672,8 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource,
support = pw_core_get_support(impl->core, &n_support);
node_init(&impl->node, NULL, support, n_support);
impl->node.impl = impl;
+ impl->node.resource = resource;
+ this->flags = do_register ? 0 : 1;
pw_map_init(&impl->io_map, 64, 64);
pw_array_init(&impl->mems, 64);
@@ -1610,7 +1687,6 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource,
pw_resource_get_client(this->resource),
parent,
name,
- PW_SPA_NODE_FLAG_ASYNC |
(do_register ? 0 : PW_SPA_NODE_FLAG_NO_REGISTER),
&impl->node.node,
NULL,
@@ -1619,6 +1695,7 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource,
goto error_no_node;
this->node->remote = true;
+ this->flags = 0;
spa_graph_node_set_callbacks(&this->node->rt.root, &root_impl, this);
@@ -1630,13 +1707,10 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource,
&client_node_methods,
impl);
- impl->node.resource = this->resource;
this->node->port_user_data_size = sizeof(struct port);
pw_node_add_listener(this->node, &impl->node_listener, &node_events, impl);
- pw_resource_sync(this->resource, 0);
-
return this;
error_no_node:
diff --git a/src/modules/module-client-node/client-node.h b/src/modules/module-client-node/client-node.h
index 1dcf4299..56047a73 100644
--- a/src/modules/module-client-node/client-node.h
+++ b/src/modules/module-client-node/client-node.h
@@ -41,6 +41,7 @@ struct pw_client_node {
struct pw_resource *resource;
struct pw_global *parent;
+ uint32_t flags;
};
struct pw_client_node *
diff --git a/src/modules/module-client-node/client-stream.c b/src/modules/module-client-node/client-stream.c
index 87b6e3de..c2f8a36e 100644
--- a/src/modules/module-client-node/client-stream.c
+++ b/src/modules/module-client-node/client-stream.c
@@ -34,6 +34,7 @@
#include <sys/eventfd.h>
#include <spa/node/node.h>
+#include <spa/node/utils.h>
#include <spa/buffer/alloc.h>
#include <spa/pod/parser.h>
#include <spa/pod/filter.h>
@@ -68,8 +69,6 @@ struct node {
const struct spa_node_callbacks *callbacks;
void *callbacks_data;
-
- uint32_t seq;
};
struct impl {
@@ -83,7 +82,6 @@ struct impl {
struct spa_hook node_listener;
struct spa_hook client_node_listener;
- struct spa_hook resource_listener;
enum spa_direction direction;
@@ -110,24 +108,28 @@ struct impl {
/** \endcond */
static int impl_node_enum_params(struct spa_node *node,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
struct node *this;
struct impl *impl;
struct spa_pod *param;
struct spa_pod_builder b = { 0 };
uint8_t buffer[1024];
+ struct spa_result_node_enum_params result;
+ uint32_t count = 0;
+ int res;
spa_return_val_if_fail(node != NULL, -EINVAL);
- spa_return_val_if_fail(index != NULL, -EINVAL);
- spa_return_val_if_fail(builder != NULL, -EINVAL);
+ spa_return_val_if_fail(num != 0, -EINVAL);
+ spa_return_val_if_fail(func != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
+ result.next = start;
+
next:
spa_pod_builder_init(&b, buffer, sizeof(buffer));
@@ -138,10 +140,10 @@ static int impl_node_enum_params(struct spa_node *node,
SPA_PARAM_EnumFormat,
SPA_PARAM_Format };
- if (*index < SPA_N_ELEMENTS(list))
+ if (result.next < SPA_N_ELEMENTS(list))
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamList, id,
- SPA_PARAM_LIST_id, SPA_POD_Id(list[*index]));
+ SPA_PARAM_LIST_id, SPA_POD_Id(list[result.next]));
else
return 0;
break;
@@ -149,8 +151,7 @@ static int impl_node_enum_params(struct spa_node *node,
case SPA_PARAM_Props:
if (impl->adapter != impl->cnode) {
return spa_node_enum_params(impl->adapter,
- id, index,
- filter, result, builder);
+ id, start, num, filter, func, data);
}
return 0;
@@ -158,18 +159,24 @@ static int impl_node_enum_params(struct spa_node *node,
case SPA_PARAM_Format:
return spa_node_port_enum_params(impl->cnode,
impl->direction, 0,
- id, index,
- filter, result, builder);
+ id, start, num,
+ filter, func, data);
default:
return -ENOENT;
}
- (*index)++;
+ result.next++;
- if (spa_pod_filter(builder, result, param, filter) < 0)
+ if (spa_pod_filter(&b, &result.param, param, filter) < 0)
goto next;
- return 1;
+ if ((res = func(data, count, 1, &result)) != 0)
+ return res;
+
+ if (++count != num)
+ goto next;
+
+ return 0;
}
static void try_link_controls(struct impl *impl)
@@ -323,11 +330,26 @@ impl_node_set_callbacks(struct spa_node *node,
if (this->callbacks && impl->adapter && impl->adapter != impl->cnode)
spa_node_set_callbacks(impl->adapter, &adapter_node_callbacks, impl);
- return 0;
+ return spa_node_sync(impl->cnode);
+}
+
+static int
+impl_node_sync(struct spa_node *node)
+{
+ struct node *this;
+ struct impl *impl;
+
+ spa_return_val_if_fail(node != NULL, -EINVAL);
+
+ this = SPA_CONTAINER_OF(node, struct node, node);
+ impl = this->impl;
+
+ return spa_node_sync(impl->cnode);
}
static int
-impl_node_sync(struct spa_node *node, uint32_t seq)
+impl_node_wait(struct spa_node *node, int seq, struct spa_pending *pending,
+ spa_result_func_t func, void *data)
{
struct node *this;
struct impl *impl;
@@ -337,7 +359,7 @@ impl_node_sync(struct spa_node *node, uint32_t seq)
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
- return spa_node_sync(impl->cnode, seq);
+ return spa_node_wait(impl->cnode, seq, pending, func, data);
}
static int
@@ -389,17 +411,16 @@ impl_node_remove_port(struct spa_node *node, enum spa_direction direction, uint3
static int
impl_node_port_enum_params(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
struct node *this;
struct impl *impl;
spa_return_val_if_fail(node != NULL, -EINVAL);
- spa_return_val_if_fail(index != NULL, -EINVAL);
- spa_return_val_if_fail(builder != NULL, -EINVAL);
+ spa_return_val_if_fail(num != 0, -EINVAL);
+ spa_return_val_if_fail(func != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
@@ -408,7 +429,7 @@ impl_node_port_enum_params(struct spa_node *node,
return -EINVAL;
return spa_node_port_enum_params(impl->adapter, direction, port_id, id,
- index, filter, result, builder);
+ start, num, filter, func, data);
}
static int debug_params(struct impl *impl, struct spa_node *node,
@@ -426,11 +447,11 @@ static int debug_params(struct impl *impl, struct spa_node *node,
state = 0;
while (true) {
spa_pod_builder_init(&b, buffer, sizeof(buffer));
- res = spa_node_port_enum_params(node,
+ res = spa_node_port_enum_params_sync(node,
direction, port_id,
id, &state,
NULL, &param, &b);
- if (res <= 0) {
+ if (res != 1) {
if (res < 0)
spa_log_error(this->log, " error: %s", spa_strerror(res));
break;
@@ -460,11 +481,11 @@ static int negotiate_format(struct impl *impl)
spa_log_debug(this->log, NAME "%p: negiotiate", impl);
state = 0;
- if ((res = spa_node_port_enum_params(impl->adapter_mix,
+ if ((res = spa_node_port_enum_params_sync(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
SPA_PARAM_EnumFormat, &state,
- NULL, &format, &b)) <= 0) {
+ NULL, &format, &b)) != 1) {
debug_params(impl, impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
@@ -473,10 +494,10 @@ static int negotiate_format(struct impl *impl)
}
state = 0;
- if ((res = spa_node_port_enum_params(impl->cnode,
+ if ((res = spa_node_port_enum_params_sync(impl->cnode,
impl->direction, 0,
SPA_PARAM_EnumFormat, &state,
- format, &format, &b)) <= 0) {
+ format, &format, &b)) != 1) {
debug_params(impl, impl->cnode, impl->direction, 0,
SPA_PARAM_EnumFormat, format);
return -ENOTSUP;
@@ -524,22 +545,22 @@ static int negotiate_buffers(struct impl *impl)
return 0;
state = 0;
- if ((res = spa_node_port_enum_params(impl->adapter_mix,
+ if ((res = spa_node_port_enum_params_sync(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
SPA_PARAM_Buffers, &state,
- param, &param, &b)) <= 0) {
+ param, &param, &b)) < 0) {
debug_params(impl, impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
SPA_PARAM_Buffers, param);
return -ENOTSUP;
}
- if (res == 0)
+ if (res != 1)
param = NULL;
state = 0;
- if ((res = spa_node_port_enum_params(impl->cnode,
+ if ((res = spa_node_port_enum_params_sync(impl->cnode,
impl->direction, 0,
SPA_PARAM_Buffers, &state,
param, &param, &b)) < 0) {
@@ -833,6 +854,7 @@ static const struct spa_node impl_node = {
SPA_VERSION_NODE,
.set_callbacks = impl_node_set_callbacks,
.sync = impl_node_sync,
+ .wait = impl_node_wait,
.enum_params = impl_node_enum_params,
.set_param = impl_node_set_param,
.set_io = impl_node_set_io,
@@ -861,10 +883,7 @@ node_init(struct node *this,
this->log = support[i].data;
}
this->node = impl_node;
-
- this->seq = 1;
-
- return SPA_RESULT_RETURN_ASYNC(this->seq++);
+ return 0;
}
static int do_port_info(void *data, struct pw_port *port)
@@ -957,10 +976,10 @@ static void client_node_initialized(void *data)
state = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
- if ((res = spa_node_port_enum_params(impl->cnode,
+ if ((res = spa_node_port_enum_params_sync(impl->cnode,
impl->direction, 0,
SPA_PARAM_EnumFormat, &state,
- NULL, &format, &b)) <= 0) {
+ NULL, &format, &b)) != 1) {
pw_log_warn("client-stream %p: no format given", &impl->this);
impl->adapter = impl->cnode;
impl->adapter_mix = impl->client_port->mix;
@@ -1049,15 +1068,6 @@ static void client_node_initialized(void *data)
items[0] = SPA_DICT_ITEM_INIT("media.class", media_class);
pw_node_update_properties(impl->this.node, &SPA_DICT_INIT(items, 1));
-
- pw_node_register(impl->this.node,
- pw_resource_get_client(impl->client_node->resource),
- impl->client_node->parent,
- NULL);
-
- pw_log_debug("client-stream %p: activating", &impl->this);
-
- pw_node_set_active(impl->this.node, true);
}
static void cleanup(struct impl *impl)
@@ -1087,15 +1097,6 @@ static void client_node_destroy(void *data)
cleanup(impl);
}
-static void client_node_async_complete(void *data, uint32_t seq, int res)
-{
- struct impl *impl = data;
- struct node *node = &impl->node;
-
- pw_log_debug("client-stream %p: async complete %d %d", &impl->this, seq, res);
- node->callbacks->done(node->callbacks_data, seq);
-}
-
static void client_node_active_changed(void *data, bool active)
{
struct impl *impl = data;
@@ -1118,7 +1119,6 @@ static const struct pw_node_events client_node_events = {
PW_VERSION_NODE_EVENTS,
.destroy = client_node_destroy,
.initialized = client_node_initialized,
- .async_complete = client_node_async_complete,
.active_changed = client_node_active_changed,
.info_changed = client_node_info_changed,
};
@@ -1225,7 +1225,7 @@ struct pw_client_stream *pw_client_stream_new(struct pw_resource *resource,
client,
parent,
name,
- PW_SPA_NODE_FLAG_ASYNC,
+ PW_SPA_NODE_FLAG_ACTIVATE,
&impl->node.node,
NULL,
properties, 0);
diff --git a/src/modules/module-client-node/protocol-native.c b/src/modules/module-client-node/protocol-native.c
index cb31ce81..3e25be25 100644
--- a/src/modules/module-client-node/protocol-native.c
+++ b/src/modules/module-client-node/protocol-native.c
@@ -61,7 +61,7 @@ client_node_marshal_update(void *object,
struct spa_pod_frame f;
uint32_t i;
- b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_PROXY_METHOD_UPDATE);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_PROXY_METHOD_UPDATE, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -93,7 +93,7 @@ client_node_marshal_port_update(void *object,
struct spa_pod_frame f[2];
uint32_t i, n_items;
- b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_PROXY_METHOD_PORT_UPDATE);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_PROXY_METHOD_PORT_UPDATE, NULL);
spa_pod_builder_push_struct(b, &f[0]);
spa_pod_builder_add(b,
@@ -134,7 +134,7 @@ static int client_node_marshal_set_active(void *object, bool active)
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_PROXY_METHOD_SET_ACTIVE);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_PROXY_METHOD_SET_ACTIVE, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Bool(active));
@@ -147,7 +147,7 @@ static int client_node_marshal_event_method(void *object, struct spa_event *even
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_PROXY_METHOD_EVENT);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_PROXY_METHOD_EVENT, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Pod(event));
@@ -472,7 +472,7 @@ client_node_marshal_add_mem(void *object,
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_ADD_MEM);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_ADD_MEM, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(mem_id),
@@ -488,7 +488,7 @@ static int client_node_marshal_transport(void *object, uint32_t node_id, int rea
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_TRANSPORT);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_TRANSPORT, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(node_id),
@@ -505,7 +505,7 @@ client_node_marshal_set_param(void *object, uint32_t id, uint32_t flags,
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_SET_PARAM);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_SET_PARAM, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Id(id),
@@ -520,7 +520,7 @@ static int client_node_marshal_event_event(void *object, const struct spa_event
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_EVENT);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_EVENT, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Pod(event));
@@ -534,7 +534,7 @@ client_node_marshal_command(void *object, const struct spa_command *command)
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_COMMAND);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_COMMAND, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Pod(command));
@@ -551,7 +551,7 @@ client_node_marshal_add_port(void *object,
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_ADD_PORT);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_ADD_PORT, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -570,7 +570,7 @@ client_node_marshal_remove_port(void *object,
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_REMOVE_PORT);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_REMOVE_PORT, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(direction),
@@ -590,7 +590,7 @@ client_node_marshal_port_set_param(void *object,
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_PARAM);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_PARAM, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(direction),
@@ -614,7 +614,7 @@ client_node_marshal_port_use_buffers(void *object,
struct spa_pod_frame f;
uint32_t i, j;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_PORT_USE_BUFFERS);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_PORT_USE_BUFFERS, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -668,7 +668,7 @@ client_node_marshal_port_set_io(void *object,
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_IO);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_IO, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(direction),
@@ -693,7 +693,7 @@ client_node_marshal_set_activation(void *object,
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_SET_ACTIVATION);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_SET_ACTIVATION, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(node_id),
@@ -715,7 +715,7 @@ client_node_marshal_set_io(void *object,
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_SET_IO);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_SET_IO, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Id(id),
SPA_POD_Int(memid),
diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c
index 92625b45..5a56244a 100644
--- a/src/modules/module-client-node/remote-node.c
+++ b/src/modules/module-client-node/remote-node.c
@@ -30,6 +30,7 @@
#include <sys/mman.h>
#include <spa/pod/parser.h>
+#include <spa/node/utils.h>
#include <spa/debug/types.h>
#include "pipewire/pipewire.h"
@@ -413,10 +414,10 @@ static int add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32_
struct spa_pod *param;
spa_pod_builder_init(&b, buf, sizeof(buf));
- if (spa_node_port_enum_params(port->node->node,
+ if (spa_node_port_enum_params_sync(port->node->node,
port->direction, port->port_id,
SPA_PARAM_List, &idx1,
- NULL, &param, &b) <= 0)
+ NULL, &param, &b) != 1)
break;
spa_pod_parse_object(param,
@@ -424,18 +425,18 @@ static int add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32_
SPA_PARAM_LIST_id, SPA_POD_Id(&id));
params = realloc(params, sizeof(struct spa_pod *) * (n_params + 1));
- params[n_params++] = pw_spa_pod_copy(param);
+ params[n_params++] = spa_pod_copy(param);
for (idx2 = 0;;) {
spa_pod_builder_init(&b, buf, sizeof(buf));
- if (spa_node_port_enum_params(port->node->node,
+ if (spa_node_port_enum_params_sync(port->node->node,
port->direction, port->port_id,
id, &idx2,
- NULL, &param, &b) <= 0)
+ NULL, &param, &b) != 1)
break;
params = realloc(params, sizeof(struct spa_pod *) * (n_params + 1));
- params[n_params++] = pw_spa_pod_copy(param);
+ params[n_params++] = spa_pod_copy(param);
}
}
}
diff --git a/src/modules/module-protocol-native.c b/src/modules/module-protocol-native.c
index 6c3e0e0c..de68ce6a 100644
--- a/src/modules/module-protocol-native.c
+++ b/src/modules/module-protocol-native.c
@@ -754,10 +754,10 @@ const static struct pw_protocol_implementaton protocol_impl = {
};
static struct spa_pod_builder *
-impl_ext_begin_proxy(struct pw_proxy *proxy, uint8_t opcode)
+impl_ext_begin_proxy(struct pw_proxy *proxy, uint8_t opcode, int *res)
{
struct client *impl = SPA_CONTAINER_OF(proxy->remote->conn, struct client, this);
- return pw_protocol_native_connection_begin_proxy(impl->connection, proxy, opcode);
+ return pw_protocol_native_connection_begin(impl->connection, proxy->id, opcode, res);
}
static uint32_t impl_ext_add_proxy_fd(struct pw_proxy *proxy, int fd)
@@ -780,10 +780,10 @@ static int impl_ext_end_proxy(struct pw_proxy *proxy,
}
static struct spa_pod_builder *
-impl_ext_begin_resource(struct pw_resource *resource, uint8_t opcode)
+impl_ext_begin_resource(struct pw_resource *resource, uint8_t opcode, int *res)
{
struct client_data *data = resource->client->user_data;
- return pw_protocol_native_connection_begin_resource(data->connection, resource, opcode);
+ return pw_protocol_native_connection_begin(data->connection, resource->id, opcode, res);
}
static uint32_t impl_ext_add_resource_fd(struct pw_resource *resource, int fd)
@@ -803,7 +803,6 @@ static int impl_ext_end_resource(struct pw_resource *resource,
struct client_data *data = resource->client->user_data;
return pw_protocol_native_connection_end(data->connection, builder);
}
-
const static struct pw_protocol_native_ext protocol_ext_impl = {
PW_VERSION_PROTOCOL_NATIVE_EXT,
impl_ext_begin_proxy,
diff --git a/src/modules/module-protocol-native/connection.c b/src/modules/module-protocol-native/connection.c
index 17af01cf..549c8e01 100644
--- a/src/modules/module-protocol-native/connection.c
+++ b/src/modules/module-protocol-native/connection.c
@@ -369,32 +369,17 @@ static const struct spa_pod_builder_callbacks builder_callbacks = {
};
struct spa_pod_builder *
-pw_protocol_native_connection_begin_resource(struct pw_protocol_native_connection *conn,
- struct pw_resource *resource,
- uint8_t opcode)
+pw_protocol_native_connection_begin(struct pw_protocol_native_connection *conn,
+ uint32_t id, uint8_t opcode, int *res)
{
struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
-
- impl->dest_id = resource->id;
- impl->opcode = opcode;
- impl->builder = SPA_POD_BUILDER_INIT(NULL, 0);
- impl->builder.callbacks = &builder_callbacks;
- impl->builder.callbacks_data = impl;
- return &impl->builder;
-}
-
-struct spa_pod_builder *
-pw_protocol_native_connection_begin_proxy(struct pw_protocol_native_connection *conn,
- struct pw_proxy *proxy,
- uint8_t opcode)
-{
- struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
-
- impl->dest_id = proxy->id;
+ impl->dest_id = id;
impl->opcode = opcode;
impl->builder = SPA_POD_BUILDER_INIT(NULL, 0);
impl->builder.callbacks = &builder_callbacks;
impl->builder.callbacks_data = impl;
+ if (res)
+ *res = SPA_RESULT_RETURN_ASYNC(impl->seq);
return &impl->builder;
}
diff --git a/src/modules/module-protocol-native/connection.h b/src/modules/module-protocol-native/connection.h
index 3f4a3bdb..7ff0566e 100644
--- a/src/modules/module-protocol-native/connection.h
+++ b/src/modules/module-protocol-native/connection.h
@@ -82,14 +82,9 @@ uint32_t pw_protocol_native_connection_add_fd(struct pw_protocol_native_connecti
int pw_protocol_native_connection_get_fd(struct pw_protocol_native_connection *conn, uint32_t index);
struct spa_pod_builder *
-pw_protocol_native_connection_begin_resource(struct pw_protocol_native_connection *conn,
- struct pw_resource *resource,
- uint8_t opcode);
+pw_protocol_native_connection_begin(struct pw_protocol_native_connection *conn,
+ uint32_t id, uint8_t opcode, int *res);
-struct spa_pod_builder *
-pw_protocol_native_connection_begin_proxy(struct pw_protocol_native_connection *conn,
- struct pw_proxy *proxy,
- uint8_t opcode);
int
pw_protocol_native_connection_end(struct pw_protocol_native_connection *conn,
struct spa_pod_builder *builder);
diff --git a/src/modules/module-protocol-native/protocol-native.c b/src/modules/module-protocol-native/protocol-native.c
index 26aed5bb..eac6d6c7 100644
--- a/src/modules/module-protocol-native/protocol-native.c
+++ b/src/modules/module-protocol-native/protocol-native.c
@@ -37,7 +37,7 @@ static int core_method_marshal_hello(void *object, uint32_t version)
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_HELLO);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_HELLO, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(version));
@@ -49,12 +49,13 @@ static int core_method_marshal_sync(void *object, uint32_t id, uint32_t seq)
{
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
+ int res;
- b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_SYNC);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_SYNC, &res);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
- SPA_POD_Int(seq));
+ SPA_POD_Int(res));
return pw_protocol_native_end_proxy(proxy, b);
}
@@ -64,7 +65,7 @@ static int core_method_marshal_done(void *object, uint32_t id, uint32_t seq)
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_DONE);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_DONE, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
@@ -78,7 +79,7 @@ static int core_method_marshal_error(void *object, uint32_t id, int res, const c
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_ERROR);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_ERROR, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
@@ -93,7 +94,7 @@ static int core_method_marshal_get_registry(void *object, uint32_t version, uint
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_GET_REGISTRY);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_GET_REGISTRY, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(version),
@@ -128,7 +129,7 @@ core_method_marshal_create_object(void *object,
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_CREATE_OBJECT);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_CREATE_OBJECT, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -149,7 +150,7 @@ core_method_marshal_destroy(void *object, uint32_t id)
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_DESTROY);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_DESTROY, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id));
@@ -263,7 +264,7 @@ static int core_event_marshal_info(void *object, const struct pw_core_info *info
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_INFO);
+ b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_INFO, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -286,7 +287,7 @@ static int core_event_marshal_done(void *object, uint32_t id, uint32_t seq)
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_DONE);
+ b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_DONE, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
@@ -299,12 +300,13 @@ static int core_event_marshal_sync(void *object, uint32_t id, uint32_t seq)
{
struct pw_resource *resource = object;
struct spa_pod_builder *b;
+ int res;
- b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_SYNC);
+ b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_SYNC, &res);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
- SPA_POD_Int(seq));
+ SPA_POD_Int(res));
return pw_protocol_native_end_resource(resource, b);
}
@@ -314,7 +316,7 @@ static int core_event_marshal_error(void *object, uint32_t id, int res, const ch
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_ERROR);
+ b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_ERROR, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
@@ -329,7 +331,7 @@ static int core_event_marshal_remove_id(void *object, uint32_t id)
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_REMOVE_ID);
+ b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_REMOVE_ID, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id));
@@ -475,7 +477,7 @@ static int registry_marshal_global(void *object, uint32_t id, uint32_t parent_id
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_resource(resource, PW_REGISTRY_PROXY_EVENT_GLOBAL);
+ b = pw_protocol_native_begin_resource(resource, PW_REGISTRY_PROXY_EVENT_GLOBAL, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -496,7 +498,7 @@ static int registry_marshal_global_remove(void *object, uint32_t id)
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_REGISTRY_PROXY_EVENT_GLOBAL_REMOVE);
+ b = pw_protocol_native_begin_resource(resource, PW_REGISTRY_PROXY_EVENT_GLOBAL_REMOVE, NULL);
spa_pod_builder_add_struct(b, SPA_POD_Int(id));
@@ -540,7 +542,7 @@ static int module_marshal_info(void *object, const struct pw_module_info *info)
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_resource(resource, PW_MODULE_PROXY_EVENT_INFO);
+ b = pw_protocol_native_begin_resource(resource, PW_MODULE_PROXY_EVENT_INFO, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -597,7 +599,7 @@ static int device_marshal_info(void *object, const struct pw_device_info *info)
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_resource(resource, PW_DEVICE_PROXY_EVENT_INFO);
+ b = pw_protocol_native_begin_resource(resource, PW_DEVICE_PROXY_EVENT_INFO, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -650,7 +652,7 @@ static int device_marshal_param(void *object, uint32_t id, uint32_t index, uint3
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_DEVICE_PROXY_EVENT_PARAM);
+ b = pw_protocol_native_begin_resource(resource, PW_DEVICE_PROXY_EVENT_PARAM, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Id(id),
@@ -685,7 +687,7 @@ static int device_marshal_enum_params(void *object, uint32_t id, uint32_t index,
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_DEVICE_PROXY_METHOD_ENUM_PARAMS);
+ b = pw_protocol_native_begin_proxy(proxy, PW_DEVICE_PROXY_METHOD_ENUM_PARAMS, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Id(id),
@@ -720,7 +722,7 @@ static int device_marshal_set_param(void *object, uint32_t id, uint32_t flags,
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_DEVICE_PROXY_METHOD_SET_PARAM);
+ b = pw_protocol_native_begin_proxy(proxy, PW_DEVICE_PROXY_METHOD_SET_PARAM, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Id(id),
@@ -752,7 +754,7 @@ static int factory_marshal_info(void *object, const struct pw_factory_info *info
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_resource(resource, PW_FACTORY_PROXY_EVENT_INFO);
+ b = pw_protocol_native_begin_resource(resource, PW_FACTORY_PROXY_EVENT_INFO, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -809,7 +811,7 @@ static int node_marshal_info(void *object, const struct pw_node_info *info)
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_resource(resource, PW_NODE_PROXY_EVENT_INFO);
+ b = pw_protocol_native_begin_resource(resource, PW_NODE_PROXY_EVENT_INFO, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -874,7 +876,7 @@ static int node_marshal_param(void *object, uint32_t id, uint32_t index, uint32_
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_NODE_PROXY_EVENT_PARAM);
+ b = pw_protocol_native_begin_resource(resource, PW_NODE_PROXY_EVENT_PARAM, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Id(id),
@@ -909,7 +911,7 @@ static int node_marshal_enum_params(void *object, uint32_t id, uint32_t index, u
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_NODE_PROXY_METHOD_ENUM_PARAMS);
+ b = pw_protocol_native_begin_proxy(proxy, PW_NODE_PROXY_METHOD_ENUM_PARAMS, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Id(id),
@@ -944,7 +946,7 @@ static int node_marshal_set_param(void *object, uint32_t id, uint32_t flags,
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_NODE_PROXY_METHOD_SET_PARAM);
+ b = pw_protocol_native_begin_proxy(proxy, PW_NODE_PROXY_METHOD_SET_PARAM, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Id(id),
@@ -975,7 +977,7 @@ static int node_marshal_send_command(void *object, const struct spa_command *com
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_NODE_PROXY_METHOD_SEND_COMMAND);
+ b = pw_protocol_native_begin_proxy(proxy, PW_NODE_PROXY_METHOD_SEND_COMMAND, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Pod(command));
return pw_protocol_native_end_proxy(proxy, b);
@@ -1001,7 +1003,7 @@ static int port_marshal_info(void *object, const struct pw_port_info *info)
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_resource(resource, PW_PORT_PROXY_EVENT_INFO);
+ b = pw_protocol_native_begin_resource(resource, PW_PORT_PROXY_EVENT_INFO, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -1054,7 +1056,7 @@ static int port_marshal_param(void *object, uint32_t id, uint32_t index, uint32_
struct pw_resource *resource = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_resource(resource, PW_PORT_PROXY_EVENT_PARAM);
+ b = pw_protocol_native_begin_resource(resource, PW_PORT_PROXY_EVENT_PARAM, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Id(id),
@@ -1089,7 +1091,7 @@ static int port_marshal_enum_params(void *object, uint32_t id, uint32_t index, u
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_PORT_PROXY_METHOD_ENUM_PARAMS);
+ b = pw_protocol_native_begin_proxy(proxy, PW_PORT_PROXY_METHOD_ENUM_PARAMS, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Id(id),
@@ -1124,7 +1126,7 @@ static int client_marshal_info(void *object, const struct pw_client_info *info)
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_PROXY_EVENT_INFO);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_PROXY_EVENT_INFO, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -1177,7 +1179,7 @@ static int client_marshal_permissions(void *object, uint32_t index, uint32_t n_p
struct spa_pod_frame f[2];
uint32_t i, n = 0;
- b = pw_protocol_native_begin_resource(resource, PW_CLIENT_PROXY_EVENT_PERMISSIONS);
+ b = pw_protocol_native_begin_resource(resource, PW_CLIENT_PROXY_EVENT_PERMISSIONS, NULL);
for (i = 0; i < n_permissions; i++) {
if (permissions[i].permissions != SPA_ID_INVALID)
@@ -1235,7 +1237,7 @@ static int client_marshal_error(void *object, uint32_t id, int res, const char *
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_PROXY_METHOD_ERROR);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_PROXY_METHOD_ERROR, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
SPA_POD_Int(res),
@@ -1265,7 +1267,7 @@ static int client_marshal_get_permissions(void *object, uint32_t index, uint32_t
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_PROXY_METHOD_GET_PERMISSIONS);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_PROXY_METHOD_GET_PERMISSIONS, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(index),
@@ -1280,7 +1282,7 @@ static int client_marshal_update_properties(void *object, const struct spa_dict
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_PROXY_METHOD_UPDATE_PROPERTIES);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_PROXY_METHOD_UPDATE_PROPERTIES, NULL);
spa_pod_builder_push_struct(b, &f);
push_dict(b, props);
@@ -1338,7 +1340,7 @@ static int client_marshal_update_permissions(void *object, uint32_t n_permission
struct spa_pod_frame f;
uint32_t i;
- b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_PROXY_METHOD_UPDATE_PERMISSIONS);
+ b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_PROXY_METHOD_UPDATE_PERMISSIONS, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_int(b, n_permissions);
@@ -1382,7 +1384,7 @@ static int link_marshal_info(void *object, const struct pw_link_info *info)
struct spa_pod_builder *b;
struct spa_pod_frame f;
- b = pw_protocol_native_begin_resource(resource, PW_LINK_PROXY_EVENT_INFO);
+ b = pw_protocol_native_begin_resource(resource, PW_LINK_PROXY_EVENT_INFO, NULL);
spa_pod_builder_push_struct(b, &f);
spa_pod_builder_add(b,
@@ -1497,7 +1499,7 @@ static int registry_marshal_bind(void *object, uint32_t id,
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_REGISTRY_PROXY_METHOD_BIND);
+ b = pw_protocol_native_begin_proxy(proxy, PW_REGISTRY_PROXY_METHOD_BIND, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
@@ -1513,7 +1515,7 @@ static int registry_marshal_destroy(void *object, uint32_t id)
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
- b = pw_protocol_native_begin_proxy(proxy, PW_REGISTRY_PROXY_METHOD_DESTROY);
+ b = pw_protocol_native_begin_proxy(proxy, PW_REGISTRY_PROXY_METHOD_DESTROY, NULL);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id));
return pw_protocol_native_end_proxy(proxy, b);
diff --git a/src/modules/spa/spa-node.c b/src/modules/spa/spa-node.c
index 8772f373..b25a86c6 100644
--- a/src/modules/spa/spa-node.c
+++ b/src/modules/spa/spa-node.c
@@ -31,6 +31,7 @@
#include <dlfcn.h>
#include <spa/node/node.h>
+#include <spa/node/utils.h>
#include <spa/param/props.h>
#include <spa/pod/iter.h>
#include <spa/debug/types.h>
@@ -48,7 +49,6 @@ struct impl {
struct pw_global *parent;
enum pw_spa_node_flags flags;
- bool async_init;
void *hnd;
struct spa_handle *handle;
@@ -57,8 +57,11 @@ struct impl {
char *factory_name;
struct spa_hook node_listener;
+ struct spa_pending init_pending;
void *user_data;
+
+ int async_init:1;
};
static void pw_spa_node_free(void *data)
@@ -79,7 +82,7 @@ static void pw_spa_node_free(void *data)
dlclose(impl->hnd);
}
-static void complete_init(struct impl *impl)
+static int complete_init(struct impl *impl)
{
struct pw_node *this = impl->this;
@@ -93,24 +96,20 @@ static void complete_init(struct impl *impl)
pw_node_register(this, impl->owner, impl->parent, NULL);
else
pw_node_initialized(this);
+ return 0;
}
-static void on_node_done(void *data, uint32_t seq, int res)
+static int on_init_done(void *data, int seq, int res, const void *result)
{
struct impl *impl = data;
struct pw_node *this = impl->this;
-
- if (impl->async_init) {
- complete_init(impl);
- impl->async_init = false;
- }
- pw_log_debug("spa-node %p: async complete event %d %d", this, seq, res);
+ pw_log_debug("spa-node %p: init complete event %d %d", this, seq, res);
+ return complete_init(impl);
}
static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS,
.free = pw_spa_node_free,
- .async_complete = on_node_done,
};
struct pw_node *
@@ -126,6 +125,7 @@ pw_spa_node_new(struct pw_core *core,
{
struct pw_node *this;
struct impl *impl;
+ int res;
this = pw_node_new(core, name, properties, sizeof(struct impl) + user_data_size);
if (this == NULL)
@@ -137,18 +137,27 @@ pw_spa_node_new(struct pw_core *core,
impl->parent = parent;
impl->node = node;
impl->flags = flags;
- impl->async_init = flags & PW_SPA_NODE_FLAG_ASYNC;
if (user_data_size > 0)
impl->user_data = SPA_MEMBER(impl, sizeof(struct impl), void);
pw_node_add_listener(this, &impl->node_listener, &node_events, impl);
- pw_node_set_implementation(this, impl->node);
+ res = pw_node_set_implementation(this, impl->node);
- if (!impl->async_init)
- complete_init(impl);
+ if (res < 0)
+ goto clean_node;
+ if (SPA_RESULT_IS_ASYNC(res)) {
+ spa_node_wait(impl->node, res, &impl->init_pending, on_init_done, impl);
+ } else {
+ complete_init(impl);
+ }
return this;
+
+ clean_node:
+ pw_node_destroy(this);
+ return NULL;
+
}
void *pw_spa_node_get_user_data(struct pw_node *node)
@@ -169,7 +178,8 @@ setup_props(struct pw_core *core, struct spa_node *spa_node, struct pw_propertie
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
const struct spa_pod_prop *prop = NULL;
- if ((res = spa_node_enum_params(spa_node, SPA_PARAM_Props, &index, NULL, &props, &b)) <= 0) {
+ if ((res = spa_node_enum_params_sync(spa_node,
+ SPA_PARAM_Props, &index, NULL, &props, &b)) != 1) {
pw_log_debug("spa_node_get_props failed: %d", res);
return res;
}
@@ -290,8 +300,6 @@ struct pw_node *pw_spa_node_load(struct pw_core *core,
pw_log_error("can't make factory instance: %d", res);
goto init_failed;
}
- if (SPA_RESULT_IS_ASYNC(res))
- flags |= PW_SPA_NODE_FLAG_ASYNC;
if ((res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_Node, &iface)) < 0) {
pw_log_error("can't get node interface %d", res);
diff --git a/src/modules/spa/spa-node.h b/src/modules/spa/spa-node.h
index 29c0bff9..92a122d7 100644
--- a/src/modules/spa/spa-node.h
+++ b/src/modules/spa/spa-node.h
@@ -35,10 +35,9 @@ extern "C" {
#endif
enum pw_spa_node_flags {
- PW_SPA_NODE_FLAG_ASYNC = (1 << 0),
- PW_SPA_NODE_FLAG_DISABLE = (1 << 1),
- PW_SPA_NODE_FLAG_ACTIVATE = (1 << 2),
- PW_SPA_NODE_FLAG_NO_REGISTER = (1 << 3),
+ PW_SPA_NODE_FLAG_DISABLE = (1 << 0),
+ PW_SPA_NODE_FLAG_ACTIVATE = (1 << 1),
+ PW_SPA_NODE_FLAG_NO_REGISTER = (1 << 2),
};
struct pw_node *
diff --git a/src/pipewire/core.c b/src/pipewire/core.c
index ec713c06..0d851152 100644
--- a/src/pipewire/core.c
+++ b/src/pipewire/core.c
@@ -29,6 +29,7 @@
#include <pipewire/log.h>
#include <spa/support/dbus.h>
+#include <spa/node/utils.h>
#include <spa/debug/format.h>
#include <spa/debug/types.h>
@@ -831,10 +832,10 @@ int pw_core_find_format(struct pw_core *core,
if (in_state == PW_PORT_STATE_CONFIGURE && out_state > PW_PORT_STATE_CONFIGURE) {
/* only input needs format */
- if ((res = spa_node_port_enum_params(output->node->node,
+ if ((res = spa_node_port_enum_params_sync(output->node->node,
output->direction, output->port_id,
SPA_PARAM_Format, &oidx,
- NULL, format, builder)) <= 0) {
+ NULL, format, builder)) != 1) {
if (res == 0)
res = -EBADF;
asprintf(error, "error get output format: %s", spa_strerror(res));
@@ -845,10 +846,10 @@ int pw_core_find_format(struct pw_core *core,
spa_debug_format(2, NULL, *format);
} else if (out_state >= PW_PORT_STATE_CONFIGURE && in_state > PW_PORT_STATE_CONFIGURE) {
/* only output needs format */
- if ((res = spa_node_port_enum_params(input->node->node,
+ if ((res = spa_node_port_enum_params_sync(input->node->node,
input->direction, input->port_id,
SPA_PARAM_Format, &iidx,
- NULL, format, builder)) <= 0) {
+ NULL, format, builder)) != 1) {
if (res == 0)
res = -EBADF;
asprintf(error, "error get input format: %s", spa_strerror(res));
@@ -865,10 +866,10 @@ int pw_core_find_format(struct pw_core *core,
/* both ports need a format */
pw_log_debug("core %p: do enum input %d", core, iidx);
spa_pod_builder_init(&fb, fbuf, sizeof(fbuf));
- if ((res = spa_node_port_enum_params(input->node->node,
+ if ((res = spa_node_port_enum_params_sync(input->node->node,
input->direction, input->port_id,
SPA_PARAM_EnumFormat, &iidx,
- NULL, &filter, &fb)) <= 0) {
+ NULL, &filter, &fb)) != 1) {
if (res == 0 && iidx == 0) {
asprintf(error, "error input enum formats: %s", spa_strerror(res));
goto error;
@@ -880,10 +881,10 @@ int pw_core_find_format(struct pw_core *core,
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
spa_debug_format(2, NULL, filter);
- if ((res = spa_node_port_enum_params(output->node->node,
+ if ((res = spa_node_port_enum_params_sync(output->node->node,
output->direction, output->port_id,
SPA_PARAM_EnumFormat, &oidx,
- filter, format, builder)) <= 0) {
+ filter, format, builder)) != 1) {
if (res == 0) {
oidx = 0;
goto again;
diff --git a/src/pipewire/interfaces.h b/src/pipewire/interfaces.h
index 69419045..6a5f6664 100644
--- a/src/pipewire/interfaces.h
+++ b/src/pipewire/interfaces.h
@@ -177,9 +177,9 @@ pw_core_proxy_hello(struct pw_core_proxy *core, uint32_t version)
}
static inline int
-pw_core_proxy_sync(struct pw_core_proxy *core, uint32_t id, uint32_t seq)
+pw_core_proxy_sync(struct pw_core_proxy *core, uint32_t id)
{
- return pw_proxy_do((struct pw_proxy*)core, struct pw_core_proxy_methods, sync, id, seq);
+ return pw_proxy_do((struct pw_proxy*)core, struct pw_core_proxy_methods, sync, id, 0);
}
static inline int
@@ -326,7 +326,7 @@ pw_core_proxy_add_listener(struct pw_core_proxy *core,
#define pw_core_resource_info(r,...) pw_resource_notify(r,struct pw_core_proxy_events,info,__VA_ARGS__)
#define pw_core_resource_done(r,...) pw_resource_notify(r,struct pw_core_proxy_events,done,__VA_ARGS__)
-#define pw_core_resource_sync(r,...) pw_resource_notify(r,struct pw_core_proxy_events,sync,__VA_ARGS__)
+#define pw_core_resource_sync(r,id) pw_resource_notify(r,struct pw_core_proxy_events,sync,id,0)
#define pw_core_resource_error(r,...) pw_resource_notify(r,struct pw_core_proxy_events,error,__VA_ARGS__)
#define pw_core_resource_remove_id(r,...) pw_resource_notify(r,struct pw_core_proxy_events,remove_id,__VA_ARGS__)
diff --git a/src/pipewire/introspect.c b/src/pipewire/introspect.c
index 32ece4c8..7e0e1f16 100644
--- a/src/pipewire/introspect.c
+++ b/src/pipewire/introspect.c
@@ -440,7 +440,7 @@ struct pw_link_info *pw_link_info_update(struct pw_link_info *info,
}
if (update->change_mask & PW_LINK_CHANGE_MASK_FORMAT) {
free(info->format);
- info->format = update->format ? pw_spa_pod_copy(update->format) : NULL;
+ info->format = update->format ? spa_pod_copy(update->format) : NULL;
}
if (update->change_mask & PW_LINK_CHANGE_MASK_PROPS) {
if (info->props)
diff --git a/src/pipewire/link.c b/src/pipewire/link.c
index d1b2fb16..b7b48b1c 100644
--- a/src/pipewire/link.c
+++ b/src/pipewire/link.c
@@ -27,6 +27,7 @@
#include <stdio.h>
#include <time.h>
+#include <spa/node/utils.h>
#include <spa/pod/parser.h>
#include <spa/pod/compare.h>
#include <spa/param/param.h>
@@ -149,6 +150,28 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state,
}
}
+static int complete_output(void *data, int seq, int res, const void *result)
+{
+ struct pw_link *this = data;
+ struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
+ struct pw_node *node = this->output->node;
+ seq = SPA_RESULT_ASYNC_SEQ(seq);
+ pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res);
+ pw_work_queue_complete(impl->work, node, seq, res);
+ return 0;
+}
+
+static int complete_input(void *data, int seq, int res, const void *result)
+{
+ struct pw_link *this = data;
+ struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
+ struct pw_node *node = this->output->node;
+ seq = SPA_RESULT_ASYNC_SEQ(seq);
+ pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res);
+ pw_work_queue_complete(impl->work, node, seq, res);
+ return 0;
+}
+
static void complete_ready(void *obj, void *data, int res, uint32_t id)
{
struct pw_port_mix *mix = data;
@@ -181,6 +204,19 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id)
}
}
+static void remove_pending(struct spa_pending *pending)
+{
+ free(pending);
+}
+
+static struct spa_pending *make_pending(struct impl *impl)
+{
+ struct spa_pending *pending;
+ pending = calloc(1, sizeof(struct spa_pending));
+ pending->removed = remove_pending;
+ return pending;
+}
+
static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_state)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
@@ -214,14 +250,14 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
in_state = input->state;
out_state = output->state;
- format = pw_spa_pod_copy(format);
+ format = spa_pod_copy(format);
spa_pod_fixate(format);
spa_pod_builder_init(&b, buffer, sizeof(buffer));
if (out_state > PW_PORT_STATE_CONFIGURE && output->node->info.state == PW_NODE_STATE_IDLE) {
index = 0;
- res = spa_node_port_enum_params(output->node->node,
+ res = spa_node_port_enum_params_sync(output->node->node,
output->direction, output->port_id,
SPA_PARAM_Format, &index,
NULL, &current, &b);
@@ -256,7 +292,7 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
}
if (in_state > PW_PORT_STATE_CONFIGURE && input->node->info.state == PW_NODE_STATE_IDLE) {
index = 0;
- res = spa_node_port_enum_params(input->node->node,
+ res = spa_node_port_enum_params_sync(input->node->node,
input->direction, input->port_id,
SPA_PARAM_Format, &index,
NULL, &current, &b);
@@ -299,7 +335,11 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
goto error;
}
if (SPA_RESULT_IS_ASYNC(res)) {
- spa_node_sync(output->node->node, res);
+ spa_node_wait(output->node->node, res,
+ make_pending(impl),
+ complete_output,
+ this);
+
pw_work_queue_add(impl->work, output->node, res, complete_ready,
&this->rt.out_mix);
}
@@ -314,7 +354,11 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
goto error;
}
if (SPA_RESULT_IS_ASYNC(res2)) {
- spa_node_sync(input->node->node, res2);
+ spa_node_wait(input->node->node, res2,
+ make_pending(impl),
+ complete_input,
+ this);
+
pw_work_queue_add(impl->work, input->node, res2, complete_ready,
&this->rt.in_mix);
if (res == 0)
@@ -453,12 +497,12 @@ param_filter(struct pw_link *this,
for (iidx = 0;;) {
spa_pod_builder_init(&ib, ibuf, sizeof(ibuf));
pw_log_debug("iparam %d", iidx);
- if ((res = spa_node_port_enum_params(in_port->node->node,
+ if ((res = spa_node_port_enum_params_sync(in_port->node->node,
in_port->direction, in_port->port_id,
id, &iidx, NULL, &iparam, &ib)) < 0)
break;
- if (res == 0) {
+ if (res != 1) {
if (num > 0)
break;
iparam = NULL;
@@ -469,9 +513,9 @@ param_filter(struct pw_link *this,
for (oidx = 0;;) {
pw_log_debug("oparam %d", oidx);
- if (spa_node_port_enum_params(out_port->node->node, out_port->direction,
+ if (spa_node_port_enum_params_sync(out_port->node->node, out_port->direction,
out_port->port_id, id, &oidx,
- iparam, &oparam, result) <= 0) {
+ iparam, &oparam, result) != 1) {
break;
}
@@ -680,7 +724,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
goto error;
}
if (SPA_RESULT_IS_ASYNC(res)) {
- spa_node_sync(output->node->node, res);
+ //spa_node_sync(output->node->node, res);
pw_work_queue_add(impl->work, output->node, res, complete_paused,
&this->rt.out_mix);
}
@@ -700,7 +744,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
goto error;
}
if (SPA_RESULT_IS_ASYNC(res)) {
- spa_node_sync(input->node->node, res);
+ //spa_node_sync(input->node->node, res);
pw_work_queue_add(impl->work, input->node, res, complete_paused,
&this->rt.in_mix);
}
@@ -723,7 +767,10 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
goto error;
}
if (SPA_RESULT_IS_ASYNC(res)) {
- spa_node_sync(output->node->node, res);
+ spa_node_wait(output->node->node, res,
+ make_pending(impl),
+ complete_output,
+ this);
pw_work_queue_add(impl->work, output->node, res, complete_paused,
&this->rt.out_mix);
}
@@ -743,7 +790,10 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
goto error;
}
if (SPA_RESULT_IS_ASYNC(res)) {
- spa_node_sync(input->node->node, res);
+ spa_node_wait(input->node->node, res,
+ make_pending(impl),
+ complete_input,
+ this);
pw_work_queue_add(impl->work, input->node, res, complete_paused,
&this->rt.in_mix);
}
@@ -870,6 +920,7 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id)
this, -EBUSY, (pw_work_func_t) check_states, this);
}
+#if 0
static void
input_node_async_complete(void *data, uint32_t seq, int res)
{
@@ -889,6 +940,7 @@ output_node_async_complete(void *data, uint32_t seq, int res)
pw_log_debug("link %p: node %p async complete %d %d", impl, node, seq, res);
pw_work_queue_complete(impl->work, node, seq, res);
}
+#endif
static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
{
@@ -1126,12 +1178,12 @@ static const struct pw_port_events output_port_events = {
static const struct pw_node_events input_node_events = {
PW_VERSION_NODE_EVENTS,
- .async_complete = input_node_async_complete,
+// .async_complete = input_node_async_complete,
};
static const struct pw_node_events output_node_events = {
PW_VERSION_NODE_EVENTS,
- .async_complete = output_node_async_complete,
+// .async_complete = output_node_async_complete,
};
static int find_driver(struct pw_link *this)
diff --git a/src/pipewire/node.c b/src/pipewire/node.c
index aa004fd9..a5d44a26 100644
--- a/src/pipewire/node.c
+++ b/src/pipewire/node.c
@@ -31,6 +31,7 @@
#include <sys/eventfd.h>
#include <spa/pod/parser.h>
+#include <spa/node/utils.h>
#include "pipewire/interfaces.h"
#include "pipewire/private.h"
@@ -61,6 +62,7 @@ struct impl {
uint32_t next_position;
int last_error;
+ struct spa_pending init_pending;
};
struct resource_data {
@@ -154,6 +156,7 @@ static int start_node(struct pw_node *this)
res = spa_node_send_command(this->node,
&SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Start));
+
if (res < 0)
pw_log_debug("node %p: start node error %s", this, spa_strerror(res));
@@ -834,35 +837,55 @@ static int node_port_info(void *data, enum spa_direction direction, uint32_t por
return 0;
}
-static int node_error(void *data, int res, const char *message)
-{
- struct pw_node *node = data;
- struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
+struct node_pending {
+ struct spa_pending pending;
+ struct impl *impl;
+ enum pw_node_state state;
+};
- pw_log_debug("node %p: error event %d: %s", node, res, message);
- impl->last_error = res;
- node_update_state(node, PW_NODE_STATE_ERROR, strdup(message));
- return 0;
+static void remove_pending(struct spa_pending *pending)
+{
+ free(pending);
}
-static int node_done(void *data, uint32_t seq)
+static struct node_pending *make_pending(struct impl *impl, enum pw_node_state state)
{
- struct pw_node *node = data;
- struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
+ struct node_pending *pending;
+ pending = calloc(1, sizeof(struct node_pending));
+ pending->pending.removed = remove_pending;
+ pending->impl = impl;
+ pending->state = state;
+ return pending;
+}
- pw_log_debug("node %p: done event %u", node, seq);
- pw_work_queue_complete(impl->work, node, seq, impl->last_error);
- pw_node_events_async_complete(node, seq, impl->last_error);
- impl->last_error = 0;
+static int node_complete(void *data, int seq, int res, const void *result)
+{
+ struct node_pending *pending = data;
+ struct impl *impl = pending->impl;
+ seq = SPA_RESULT_ASYNC_SEQ(seq);
+ pw_log_debug("node %p: done event %d %u", impl, res, pending->pending.seq);
+ impl->last_error = res;
+ pw_work_queue_complete(impl->work, &impl->this, seq, res);
return 0;
}
static int node_event(void *data, struct spa_event *event)
{
struct pw_node *node = data;
+ struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
pw_log_trace("node %p: event %d", node, SPA_EVENT_TYPE(event));
+
+ switch (SPA_NODE_EVENT_ID(event)) {
+ case SPA_NODE_EVENT_Error:
+ impl->last_error = -EFAULT;
+ node_update_state(node, PW_NODE_STATE_ERROR, strdup("error"));
+ break;
+ default:
+ break;
+ }
pw_node_events_event(node, event);
+
return 0;
}
@@ -903,8 +926,6 @@ static int node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)
static const struct spa_node_callbacks node_callbacks = {
SPA_VERSION_NODE_CALLBACKS,
- .done = node_done,
- .error = node_error,
.info = node_info,
.port_info = node_port_info,
.event = node_event,
@@ -912,14 +933,28 @@ static const struct spa_node_callbacks node_callbacks = {
.reuse_buffer = node_reuse_buffer,
};
+static int init_result(void *data, int seq, int res, const void *result)
+{
+ pw_log_debug("node %p: set_callbacks finished", data);
+ return 0;
+}
+
SPA_EXPORT
-void pw_node_set_implementation(struct pw_node *node,
- struct spa_node *spa_node)
+int pw_node_set_implementation(struct pw_node *node,
+ struct spa_node *spa_node)
{
+ int res;
+ struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
+
node->node = spa_node;
pw_log_debug("node %p: implementation %p", node, spa_node);
- spa_node_set_callbacks(node->node, &node_callbacks, node);
spa_graph_node_set_callbacks(&node->rt.node, &spa_graph_node_impl_default, spa_node);
+ res = spa_node_set_callbacks(node->node, &node_callbacks, node);
+
+ if (SPA_RESULT_IS_ASYNC(res)) {
+ pw_log_debug("node %p: init is async %d", node, res);
+ spa_node_wait(node->node, res, &impl->init_pending, init_result, node);
+ }
if (spa_node_set_io(node->node,
SPA_IO_Position,
@@ -935,6 +970,7 @@ void pw_node_set_implementation(struct pw_node *node,
pw_log_debug("node %p: set clock %p", node, &node->rt.activation->position.clock);
node->rt.clock = &node->rt.activation->position.clock;
}
+ return res;
}
SPA_EXPORT
@@ -1068,9 +1104,9 @@ int pw_node_for_each_param(struct pw_node *node,
spa_pod_builder_init(&b, buf, sizeof(buf));
idx = index;
- if ((res = spa_node_enum_params(node->node,
+ if ((res = spa_node_enum_params_sync(node->node,
param_id, &index,
- filter, &param, &b)) <= 0)
+ filter, &param, &b)) != 1)
break;
if ((res = callback(data, param_id, idx, index, param)) != 0)
@@ -1232,8 +1268,12 @@ int pw_node_set_state(struct pw_node *node, enum pw_node_state state)
if (SPA_RESULT_IS_ERROR(res))
return res;
- if (SPA_RESULT_IS_ASYNC(res))
- spa_node_sync(node->node, res);
+ if (SPA_RESULT_IS_ASYNC(res)) {
+ struct node_pending *pending = make_pending(impl, state);
+
+ spa_node_wait(node->node, res, &pending->pending,
+ node_complete, pending);
+ }
pw_work_queue_add(impl->work,
node, res, on_state_complete, SPA_INT_TO_PTR(state));
diff --git a/src/pipewire/node.h b/src/pipewire/node.h
index 81a25f38..9d489dcc 100644
--- a/src/pipewire/node.h
+++ b/src/pipewire/node.h
@@ -83,9 +83,6 @@ struct pw_node_events {
void (*state_changed) (void *data, enum pw_node_state old,
enum pw_node_state state, const char *error);
- /** an async operation completed on the node */
- void (*async_complete) (void *data, uint32_t seq, int res);
-
/** an event is emited */
void (*event) (void *data, const struct spa_event *event);
@@ -147,7 +144,8 @@ const struct pw_properties *pw_node_get_properties(struct pw_node *node);
int pw_node_update_properties(struct pw_node *node, const struct spa_dict *dict);
/** Set the node implementation */
-void pw_node_set_implementation(struct pw_node *node, struct spa_node *spa_node);
+int pw_node_set_implementation(struct pw_node *node, struct spa_node *spa_node);
+
/** Get the node implementation */
struct spa_node *pw_node_get_implementation(struct pw_node *node);
diff --git a/src/pipewire/port.c b/src/pipewire/port.c
index 8252ca03..b05df9c1 100644
--- a/src/pipewire/port.c
+++ b/src/pipewire/port.c
@@ -27,6 +27,7 @@
#include <errno.h>
#include <spa/pod/parser.h>
+#include <spa/node/utils.h>
#include <spa/debug/types.h>
#include "pipewire/pipewire.h"
@@ -463,6 +464,7 @@ static const struct pw_resource_events resource_events = {
static int reply_param(void *data, uint32_t id, uint32_t index, uint32_t next, struct spa_pod *param)
{
struct pw_resource *resource = data;
+ pw_log_debug("resource %p: reply param %d %d %d", resource, id, index, next);
pw_port_resource_param(resource, id, index, next, param);
return 0;
}
@@ -474,6 +476,7 @@ static int port_enum_params(void *object, uint32_t id, uint32_t index, uint32_t
struct resource_data *data = pw_resource_get_user_data(resource);
struct pw_port *port = data->port;
int res;
+ pw_log_debug("resource %p: enum params", resource);
if ((res = pw_port_for_each_param(port, id, index, num, filter,
reply_param, resource)) < 0)
@@ -758,15 +761,17 @@ int pw_port_for_each_param(struct pw_port *port,
for (count = 0; count < max; count++) {
spa_pod_builder_init(&b, buf, sizeof(buf));
idx = index;
- if ((res = spa_node_port_enum_params(node->node,
+ if ((res = spa_node_port_enum_params_sync(node->node,
port->direction, port->port_id,
param_id, &index,
- filter, &param, &b)) <= 0)
+ filter, &param, &b)) != 1)
break;
+ pw_log_debug("port %p: have param %d %u %u", port, param_id, idx, index);
if ((res = callback(data, param_id, idx, index, param)) != 0)
break;
}
+ pw_log_debug("port %p: res %d", port, res);
return res;
}
diff --git a/src/pipewire/proxy.c b/src/pipewire/proxy.c
index 2d311e50..b37d342a 100644
--- a/src/pipewire/proxy.c
+++ b/src/pipewire/proxy.c
@@ -147,11 +147,13 @@ void pw_proxy_destroy(struct pw_proxy *proxy)
}
SPA_EXPORT
-int pw_proxy_sync(struct pw_proxy *proxy, uint32_t seq)
+int pw_proxy_sync(struct pw_proxy *proxy)
{
int res = -EIO;
- if (proxy->remote->core_proxy != NULL)
- res = pw_core_proxy_sync(proxy->remote->core_proxy, proxy->id, seq);
+ if (proxy->remote->core_proxy != NULL) {
+ res = pw_core_proxy_sync(proxy->remote->core_proxy, proxy->id);
+ pw_log_debug("proxy %p: %u sync %u", proxy, proxy->id, res);
+ }
return res;
}
diff --git a/src/pipewire/proxy.h b/src/pipewire/proxy.h
index 3c6e9a23..f1088c02 100644
--- a/src/pipewire/proxy.h
+++ b/src/pipewire/proxy.h
@@ -151,8 +151,8 @@ uint32_t pw_proxy_get_id(struct pw_proxy *proxy);
struct pw_protocol *pw_proxy_get_protocol(struct pw_proxy *proxy);
/** Generate an sync method for a proxy. This will generate a done event
- * with the same \a seq. */
-int pw_proxy_sync(struct pw_proxy *proxy, uint32_t seq);
+ * with the same seq number of the reply. */
+int pw_proxy_sync(struct pw_proxy *proxy);
/** Generate an error for a proxy */
int pw_proxy_error(struct pw_proxy *proxy, int result, const char *error, ...);
diff --git a/src/pipewire/resource.c b/src/pipewire/resource.c
index 84e7b5b3..9ab34ef0 100644
--- a/src/pipewire/resource.c
+++ b/src/pipewire/resource.c
@@ -170,11 +170,13 @@ const struct pw_protocol_marshal *pw_resource_get_marshal(struct pw_resource *re
}
SPA_EXPORT
-int pw_resource_sync(struct pw_resource *resource, uint32_t seq)
+int pw_resource_sync(struct pw_resource *resource)
{
int res = -EIO;
- if (resource->client->core_resource != NULL)
- res = pw_core_resource_sync(resource->client->core_resource, resource->id, seq);
+ if (resource->client->core_resource != NULL) {
+ res = pw_core_resource_sync(resource->client->core_resource, resource->id);
+ pw_log_debug("resource %p: %u sync %u", resource, resource->id, res);
+ }
return res;
}
diff --git a/src/pipewire/resource.h b/src/pipewire/resource.h
index 43ded038..555565ed 100644
--- a/src/pipewire/resource.h
+++ b/src/pipewire/resource.h
@@ -123,8 +123,8 @@ void pw_resource_add_override(struct pw_resource *resource,
void *data);
/** Generate an sync method for a resource. This will generate a done event
- * with the same \a seq. */
-int pw_resource_sync(struct pw_resource *resource, uint32_t seq);
+ * with the same \a sequence number in the return value. */
+int pw_resource_sync(struct pw_resource *resource);
/** Generate an error for a resource */
int pw_resource_error(struct pw_resource *resource, int result, const char *error, ...);
diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c
index e7d7af3a..8ba00278 100644
--- a/src/pipewire/stream.c
+++ b/src/pipewire/stream.c
@@ -142,7 +142,7 @@ static struct param *add_param(struct pw_stream *stream,
struct param *p;
struct spa_pod *copy = NULL;
- if (param != NULL && (copy = pw_spa_pod_copy(param)) == NULL)
+ if (param != NULL && (copy = spa_pod_copy(param)) == NULL)
return NULL;
p = pw_array_add(&impl->params, sizeof(struct param));
@@ -378,49 +378,65 @@ static int impl_port_set_io(struct spa_node *node, enum spa_direction direction,
static int impl_port_enum_params(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
- uint32_t id, uint32_t *index,
+ uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
- struct spa_pod **result,
- struct spa_pod_builder *builder)
+ spa_result_func_t func, void *data)
{
struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node);
struct spa_pod *param;
uint32_t last_id = SPA_ID_INVALID;
uint32_t n_params = pw_array_get_len(&d->params, struct param);
+ struct spa_result_node_enum_params result;
+ uint8_t buffer[1024];
+ struct spa_pod_builder b = { 0 };
+ uint32_t count = 0;
+ int res;
+
+ result.next = start;
+
+ next:
+ spa_pod_builder_init(&b, buffer, sizeof(buffer));
while (true) {
- if (*index < n_params) {
- param = pw_array_get_unchecked(&d->params, *index, struct param)->param;
+ if (result.next < n_params) {
+ param = pw_array_get_unchecked(&d->params, result.next, struct param)->param;
}
else if (last_id != SPA_ID_INVALID)
- return 1;
+ break;
else
return 0;
- (*index)++;
+ result.next++;
if (id == SPA_PARAM_List) {
uint32_t new_id = ((struct spa_pod_object *) param)->body.id;
if (last_id == SPA_ID_INVALID){
- *result = spa_pod_builder_add_object(builder,
+ result.param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamList, id,
SPA_PARAM_LIST_id, SPA_POD_Id(new_id));
last_id = new_id;
}
else if (last_id != new_id) {
- (*index)--;
+ result.next--;
break;
}
} else {
if (param == NULL || !spa_pod_is_object_id(param, id))
continue;
- if (spa_pod_filter(builder, result, param, filter) == 0)
+ if (spa_pod_filter(&b, &result.param, param, filter) == 0)
break;
}
}
- return 1;
+
+ if ((res = func(data, count, 1, &result)) != 0)
+ return res;
+
+ if (++count != num)
+ goto next;
+
+ return 0;
}
static int port_set_format(struct spa_node *node,
diff --git a/src/pipewire/utils.h b/src/pipewire/utils.h
index ad542f4d..bdb50635 100644
--- a/src/pipewire/utils.h
+++ b/src/pipewire/utils.h
@@ -52,20 +52,6 @@ pw_free_strv(char **str);
char *
pw_strip(char *str, const char *whitespace);
-/** Copy a pod structure \memberof pw_utils */
-static inline struct spa_pod *
-pw_spa_pod_copy(const struct spa_pod *pod)
-{
- size_t size;
- struct spa_pod *c;
-
- size = SPA_POD_SIZE(pod);
- if ((c = (struct spa_pod *) malloc(size)) == NULL)
- return NULL;
-
- return (struct spa_pod *) memcpy(c, pod, size);
-}
-
#ifdef __cplusplus
} /* extern "C" */
#endif
diff --git a/src/pipewire/work-queue.c b/src/pipewire/work-queue.c
index 7b60046e..861b47be 100644
--- a/src/pipewire/work-queue.c
+++ b/src/pipewire/work-queue.c
@@ -27,6 +27,7 @@
#include <string.h>
#include <spa/utils/type.h>
+#include <spa/utils/result.h>
#include "pipewire/log.h"
#include "pipewire/work-queue.h"
diff --git a/src/tools/pipewire-cli.c b/src/tools/pipewire-cli.c
index 2bcbfb37..288e91dd 100644
--- a/src/tools/pipewire-cli.c
+++ b/src/tools/pipewire-cli.c
@@ -73,6 +73,7 @@ struct remote_data {
struct pw_remote *remote;
struct spa_hook remote_listener;
+ uint32_t prompt_pending;
struct pw_core_proxy *core_proxy;
struct spa_hook core_listener;
@@ -261,13 +262,9 @@ static int on_core_done(void *_data, uint32_t id, uint32_t seq)
{
struct remote_data *rd = _data;
- switch (seq) {
- case 1:
+ if (seq == rd->prompt_pending)
show_prompt(rd);
- break;
- default:
- break;
- }
+
return 0;
}
@@ -396,7 +393,7 @@ static void on_state_changed(void *_data, enum pw_remote_state old,
pw_registry_proxy_add_listener(rd->registry_proxy,
&rd->registry_listener,
&registry_events, rd);
- pw_core_proxy_sync(rd->core_proxy, 0, 1);
+ rd->prompt_pending = pw_core_proxy_sync(rd->core_proxy, 0);
break;
default:
@@ -1390,7 +1387,7 @@ static void do_input(void *data, int fd, enum spa_io mask)
if (d->current == NULL)
pw_main_loop_quit(d->loop);
else if (d->current->core_proxy)
- pw_core_proxy_sync(d->current->core_proxy, 0, 1);
+ d->current->prompt_pending = pw_core_proxy_sync(d->current->core_proxy, 0);
}
}
diff --git a/src/tools/pipewire-monitor.c b/src/tools/pipewire-monitor.c
index d1acb12c..3e5c2083 100644
--- a/src/tools/pipewire-monitor.c
+++ b/src/tools/pipewire-monitor.c
@@ -52,7 +52,6 @@ struct data {
struct pw_registry_proxy *registry_proxy;
struct spa_hook registry_listener;
- uint32_t seq;
struct spa_list pending_list;
};
@@ -81,8 +80,7 @@ static void add_pending(struct proxy_data *pd)
struct data *d = pd->data;
spa_list_append(&d->pending_list, &pd->pending_link);
- pd->pending_seq = ++d->seq;
- pw_core_proxy_sync(d->core_proxy, 0, pd->pending_seq);
+ pd->pending_seq = pw_core_proxy_sync(d->core_proxy, 0);
}
static void remove_pending(struct proxy_data *pd)
@@ -126,7 +124,7 @@ static int add_param(struct proxy_data *data, const struct spa_pod *param)
data->params = realloc(data->params, sizeof(struct spa_pod *) * data->n_params);
if (data->params == NULL)
return -ENOMEM;
- data->params[idx] = pw_spa_pod_copy(param);
+ data->params[idx] = spa_pod_copy(param);
return 0;
}
@@ -710,7 +708,6 @@ int main(int argc, char *argv[])
if (pw_remote_connect(data.remote) < 0)
return -1;
- data.seq = 1;
spa_list_init(&data.pending_list);
pw_main_loop_run(data.loop);