summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2020-09-07 14:36:25 +0200
committerWim Taymans <wtaymans@redhat.com>2020-09-07 14:36:25 +0200
commit71441565fd8dcc73949a3091151665813a591431 (patch)
tree40faebb7549c3f22b9f7a4f299158a6f03159c1c
parent185a3d4c361588ae3dae6bf28a1aec3390489852 (diff)
pulse: rework sync
Use a global sync that both syncs the globals and completes the operations. In the case of a card profile change, first the nodes are removed and added and then the Profile and Route info updated. We need to be able to bind to the new node and get the device.profile.id before we can find the active port of the node. See #279
-rw-r--r--pipewire-pulseaudio/src/context.c151
-rw-r--r--pipewire-pulseaudio/src/internal.h7
-rw-r--r--pipewire-pulseaudio/src/operation.c15
3 files changed, 98 insertions, 75 deletions
diff --git a/pipewire-pulseaudio/src/context.c b/pipewire-pulseaudio/src/context.c
index fc5241e0..22bd2694 100644
--- a/pipewire-pulseaudio/src/context.c
+++ b/pipewire-pulseaudio/src/context.c
@@ -270,16 +270,6 @@ static const char *str_efac(pa_subscription_event_type_t event)
return "invalid";
}
-static void global_sync(struct global *g)
-{
- pa_operation *o;
- pa_context *c = g->context;
-
- g->pending_seq = pw_proxy_sync(g->proxy, 0);
- spa_list_for_each(o, &c->operations, link)
- o->seq = g->pending_seq;
-}
-
static void emit_event(pa_context *c, struct global *g, pa_subscription_event_type_t event)
{
if (c->subscribe_callback && (c->subscribe_mask & g->mask)) {
@@ -301,6 +291,35 @@ static void emit_event(pa_context *c, struct global *g, pa_subscription_event_ty
}
}
+static void do_global_sync(struct global *g)
+{
+ pa_subscription_event_type_t event;
+
+ if (g->ginfo && g->ginfo->sync)
+ g->ginfo->sync(g);
+ if (g->init) {
+ if ((g->mask & (PA_SUBSCRIPTION_MASK_SINK_INPUT | PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT))) {
+ if (g->node_info.device_index == SPA_ID_INVALID ||
+ (g->stream && g->stream->state != PA_STREAM_READY))
+ return;
+ }
+ g->init = false;
+ event = PA_SUBSCRIPTION_EVENT_NEW;
+ } else {
+ event = PA_SUBSCRIPTION_EVENT_CHANGE;
+ }
+ pw_log_debug("emit because of pending");
+ emit_event(g->context, g, event);
+}
+
+
+static void global_sync(struct global *g)
+{
+ pa_context *c = g->context;
+ c->pending_seq = pw_core_sync(c->core, PW_ID_CORE, c->pending_seq);
+ g->sync = true;
+}
+
static struct param *add_param(struct spa_list *params, uint32_t id, const struct spa_pod *param)
{
struct param *p;
@@ -394,8 +413,11 @@ static void device_event_info(void *object, const struct pw_device_info *info)
remove_params(&g->card_info.ports, id);
g->card_info.n_ports = 0;
break;
- case SPA_PARAM_Profile:
case SPA_PARAM_Route:
+ remove_params(&g->card_info.routes, id);
+ g->card_info.n_routes = 0;
+ break;
+ case SPA_PARAM_Profile:
break;
default:
do_enum = false;
@@ -500,7 +522,6 @@ static void device_event_param(void *object, int seq,
const struct spa_pod *param)
{
struct global *g = object;
- pa_context *c = g->context;
switch (id) {
case SPA_PARAM_EnumProfile:
@@ -557,29 +578,21 @@ static void device_event_param(void *object, int seq,
{
uint32_t index, device;
enum spa_direction direction;
- struct spa_pod *props = NULL;
- struct global *ng;
if (spa_pod_parse_object(param,
SPA_TYPE_OBJECT_ParamRoute, NULL,
SPA_PARAM_ROUTE_index, SPA_POD_Int(&index),
SPA_PARAM_ROUTE_direction, SPA_POD_Id(&direction),
- SPA_PARAM_ROUTE_device, SPA_POD_Int(&device),
- SPA_PARAM_ROUTE_props, SPA_POD_OPT_Pod(&props)) < 0) {
+ SPA_PARAM_ROUTE_device, SPA_POD_Int(&device)) < 0) {
pw_log_warn("device %d: can't parse route", g->id);
return;
}
+ if (add_param(&g->card_info.routes, id, param))
+ g->card_info.n_routes++;
- pw_log_debug("device %d: active %s route %d", g->id,
+ pw_log_debug("device %d: active %s route %d device %d", g->id,
direction == SPA_DIRECTION_OUTPUT ? "output" : "input",
- index);
-
- ng = find_node_for_route(c, g, device);
- if (props && ng && ng->node_info.active_port != index) {
- ng->node_info.active_port = index;
- parse_props(ng, props, true);
- emit_event(c, ng, PA_SUBSCRIPTION_EVENT_CHANGE);
- }
+ index, device);
break;
}
default:
@@ -702,6 +715,7 @@ static void device_clear_ports(struct global *g)
static void device_sync_ports(struct global *g)
{
pa_card_info *i = &g->card_info.info;
+ pa_context *c = g->context;
uint32_t n_ports, j;
struct param *p;
@@ -741,6 +755,8 @@ static void device_sync_ports(struct global *g)
continue;
}
+ pw_log_debug("port %d: name:%s", j, name);
+
pi = i->ports[j] = &g->card_info.card_ports[j];
spa_zero(*pi);
pi->name = name;
@@ -800,6 +816,30 @@ static void device_sync_ports(struct global *g)
i->n_ports = j;
if (i->n_ports == 0)
i->ports = NULL;
+
+ spa_list_for_each(p, &g->card_info.routes, link) {
+ struct global *ng;
+ uint32_t index, device;
+ enum spa_direction direction;
+ struct spa_pod *props = NULL;
+
+ if (spa_pod_parse_object(p->param,
+ SPA_TYPE_OBJECT_ParamRoute, NULL,
+ SPA_PARAM_ROUTE_index, SPA_POD_Int(&index),
+ SPA_PARAM_ROUTE_direction, SPA_POD_Id(&direction),
+ SPA_PARAM_ROUTE_device, SPA_POD_Int(&device),
+ SPA_PARAM_ROUTE_props, SPA_POD_OPT_Pod(&props)) < 0) {
+ pw_log_warn("device %d: can't parse route", g->id);
+ continue;
+ }
+
+ ng = find_node_for_route(c, g, device);
+ if (props && ng && ng->node_info.active_port != index) {
+ ng->node_info.active_port = index;
+ parse_props(ng, props, true);
+ emit_event(c, ng, PA_SUBSCRIPTION_EVENT_CHANGE);
+ }
+ }
}
static void device_sync(struct global *g)
@@ -831,6 +871,7 @@ static void device_destroy(void *data)
device_clear_ports(global);
device_clear_profiles(global);
+ remove_params(&global->card_info.routes, SPA_ID_INVALID);
remove_params(&global->card_info.ports, SPA_ID_INVALID);
remove_params(&global->card_info.profiles, SPA_ID_INVALID);
@@ -1078,35 +1119,10 @@ static void proxy_destroy(void *data)
g->proxy = NULL;
}
-static void proxy_done(void *data, int seq)
-{
- struct global *g = data;
- pa_subscription_event_type_t event;
-
- if (g->pending_seq == seq) {
- if (g->ginfo && g->ginfo->sync)
- g->ginfo->sync(g);
- if (g->init) {
- if ((g->mask & (PA_SUBSCRIPTION_MASK_SINK_INPUT | PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT))) {
- if (g->node_info.device_index == SPA_ID_INVALID ||
- (g->stream && g->stream->state != PA_STREAM_READY))
- return;
- }
- g->init = false;
- event = PA_SUBSCRIPTION_EVENT_NEW;
- } else {
- event = PA_SUBSCRIPTION_EVENT_CHANGE;
- }
- pw_log_debug("emit because of pending");
- emit_event(g->context, g, event);
- }
-}
-
static const struct pw_proxy_events proxy_events = {
PW_VERSION_PROXY_EVENTS,
.removed = proxy_removed,
.destroy = proxy_destroy,
- .done = proxy_done,
};
static void update_link(pa_context *c, uint32_t src_node_id, uint32_t dst_node_id)
@@ -1153,6 +1169,7 @@ static int set_mask(pa_context *c, struct global *g)
ginfo = &device_info;
spa_list_init(&g->card_info.profiles);
spa_list_init(&g->card_info.ports);
+ spa_list_init(&g->card_info.routes);
} else if (strcmp(g->type, PW_TYPE_INTERFACE_Node) == 0) {
if (g->props == NULL)
return 0;
@@ -1347,19 +1364,6 @@ static const struct pw_registry_events registry_events =
.global_remove = registry_event_global_remove,
};
-static void complete_operations(pa_context *c, int seq)
-{
- pa_operation *o, *t;
- spa_list_for_each_safe(o, t, &c->operations, link) {
- if (o->seq != seq)
- continue;
- pa_operation_ref(o);
- if (o->callback)
- o->callback(o, o->userdata);
- pa_operation_unref(o);
- }
-}
-
static void core_info(void *data, const struct pw_core_info *info)
{
pa_context *c = data;
@@ -1394,8 +1398,25 @@ static void core_error(void *data, uint32_t id, int seq, int res, const char *me
static void core_done(void *data, uint32_t id, int seq)
{
pa_context *c = data;
- pw_log_debug("done id:%u seq:%d", id, seq);
- complete_operations(c, seq);
+ pa_operation *o, *t;
+ struct global *g;
+
+ pw_log_debug("done id:%u seq:%d/%d", id, seq, c->pending_seq);
+ if (c->pending_seq != seq)
+ return;
+
+ spa_list_for_each(g, &c->globals, link) {
+ if (g->sync) {
+ do_global_sync(g);
+ g->sync = false;
+ }
+ }
+ spa_list_for_each_safe(o, t, &c->operations, link) {
+ pa_operation_ref(o);
+ if (o->callback)
+ o->callback(o, o->userdata);
+ pa_operation_unref(o);
+ }
}
static const struct pw_core_events core_events = {
diff --git a/pipewire-pulseaudio/src/internal.h b/pipewire-pulseaudio/src/internal.h
index e67bcc59..f7a62f8f 100644
--- a/pipewire-pulseaudio/src/internal.h
+++ b/pipewire-pulseaudio/src/internal.h
@@ -256,8 +256,8 @@ struct global {
pa_subscription_event_type_t event;
int priority_driver;
- int pending_seq;
int init:1;
+ int sync:1;
void *info;
struct global_info *ginfo;
@@ -306,6 +306,8 @@ struct global {
uint32_t active_profile;
struct spa_list ports;
uint32_t n_ports;
+ struct spa_list routes;
+ uint32_t n_routes;
pa_card_info info;
pa_card_profile_info2 *card_profiles;
unsigned int pending_profiles:1;
@@ -371,6 +373,8 @@ struct pa_context {
int no_fail:1;
int disconnect:1;
+ int pending_seq;
+
struct global *metadata;
uint32_t default_sink;
uint32_t default_source;
@@ -485,7 +489,6 @@ struct pa_operation
pa_context *context;
pa_stream *stream;
- int seq;
pa_operation_state_t state;
pa_operation_cb_t callback;
diff --git a/pipewire-pulseaudio/src/operation.c b/pipewire-pulseaudio/src/operation.c
index 3b6f8d55..dd14a6c4 100644
--- a/pipewire-pulseaudio/src/operation.c
+++ b/pipewire-pulseaudio/src/operation.c
@@ -36,7 +36,6 @@ pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb
o->refcount = 1;
o->context = c;
o->stream = s ? pa_stream_ref(s) : NULL;
- o->seq = SPA_ID_INVALID;
o->state = PA_OPERATION_RUNNING;
o->callback = cb;
@@ -52,8 +51,8 @@ pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb
int pa_operation_sync(pa_operation *o)
{
pa_context *c = o->context;
- o->seq = pw_core_sync(c->core, PW_ID_CORE, 0);
- pw_log_debug("operation %p: sync seq:%d", o, o->seq);
+ c->pending_seq = pw_core_sync(c->core, PW_ID_CORE, 0);
+ pw_log_debug("operation %p: sync seq:%d", o, c->pending_seq);
return 0;
}
@@ -70,14 +69,14 @@ static void operation_free(pa_operation *o)
{
pa_assert(!o->context);
pa_assert(!o->stream);
- pw_log_debug("%p seq:%d", o, o->seq);
+ pw_log_debug("%p", o);
free(o);
}
static void operation_unlink(pa_operation *o) {
pa_assert(o);
- pw_log_debug("%p seq:%d", o, o->seq);
+ pw_log_debug("%p", o);
if (o->context) {
pa_assert(o->refcount >= 2);
@@ -100,7 +99,7 @@ void pa_operation_unref(pa_operation *o)
{
pa_assert(o);
pa_assert(o->refcount >= 1);
- pw_log_debug("%p seq:%d ref:%d", o, o->seq, o->refcount);
+ pw_log_debug("%p ref:%d", o, o->refcount);
if (--o->refcount == 0)
operation_free(o);
}
@@ -114,7 +113,7 @@ static void operation_set_state(pa_operation *o, pa_operation_state_t st) {
pa_operation_ref(o);
- pw_log_debug("new state %p seq:%d state:%d", o, o->seq, st);
+ pw_log_debug("new state %p state:%d", o, st);
o->state = st;
if (o->state_callback)
@@ -132,7 +131,7 @@ void pa_operation_cancel(pa_operation *o)
{
pa_assert(o);
pa_assert(o->refcount >= 1);
- pw_log_debug("%p seq:%d", o, o->seq);
+ pw_log_debug("%p", o);
operation_set_state(o, PA_OPERATION_CANCELED);
}