From 71441565fd8dcc73949a3091151665813a591431 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 7 Sep 2020 14:36:25 +0200 Subject: pulse: rework sync Use a global sync that both syncs the globals and completes the operations. In the case of a card profile change, first the nodes are removed and added and then the Profile and Route info updated. We need to be able to bind to the new node and get the device.profile.id before we can find the active port of the node. See #279 --- pipewire-pulseaudio/src/context.c | 151 ++++++++++++++++++++---------------- pipewire-pulseaudio/src/internal.h | 7 +- pipewire-pulseaudio/src/operation.c | 15 ++-- 3 files changed, 98 insertions(+), 75 deletions(-) diff --git a/pipewire-pulseaudio/src/context.c b/pipewire-pulseaudio/src/context.c index fc5241e0..22bd2694 100644 --- a/pipewire-pulseaudio/src/context.c +++ b/pipewire-pulseaudio/src/context.c @@ -270,16 +270,6 @@ static const char *str_efac(pa_subscription_event_type_t event) return "invalid"; } -static void global_sync(struct global *g) -{ - pa_operation *o; - pa_context *c = g->context; - - g->pending_seq = pw_proxy_sync(g->proxy, 0); - spa_list_for_each(o, &c->operations, link) - o->seq = g->pending_seq; -} - static void emit_event(pa_context *c, struct global *g, pa_subscription_event_type_t event) { if (c->subscribe_callback && (c->subscribe_mask & g->mask)) { @@ -301,6 +291,35 @@ static void emit_event(pa_context *c, struct global *g, pa_subscription_event_ty } } +static void do_global_sync(struct global *g) +{ + pa_subscription_event_type_t event; + + if (g->ginfo && g->ginfo->sync) + g->ginfo->sync(g); + if (g->init) { + if ((g->mask & (PA_SUBSCRIPTION_MASK_SINK_INPUT | PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT))) { + if (g->node_info.device_index == SPA_ID_INVALID || + (g->stream && g->stream->state != PA_STREAM_READY)) + return; + } + g->init = false; + event = PA_SUBSCRIPTION_EVENT_NEW; + } else { + event = PA_SUBSCRIPTION_EVENT_CHANGE; + } + pw_log_debug("emit because of pending"); + emit_event(g->context, g, event); +} + + +static void global_sync(struct global *g) +{ + pa_context *c = g->context; + c->pending_seq = pw_core_sync(c->core, PW_ID_CORE, c->pending_seq); + g->sync = true; +} + static struct param *add_param(struct spa_list *params, uint32_t id, const struct spa_pod *param) { struct param *p; @@ -394,8 +413,11 @@ static void device_event_info(void *object, const struct pw_device_info *info) remove_params(&g->card_info.ports, id); g->card_info.n_ports = 0; break; - case SPA_PARAM_Profile: case SPA_PARAM_Route: + remove_params(&g->card_info.routes, id); + g->card_info.n_routes = 0; + break; + case SPA_PARAM_Profile: break; default: do_enum = false; @@ -500,7 +522,6 @@ static void device_event_param(void *object, int seq, const struct spa_pod *param) { struct global *g = object; - pa_context *c = g->context; switch (id) { case SPA_PARAM_EnumProfile: @@ -557,29 +578,21 @@ static void device_event_param(void *object, int seq, { uint32_t index, device; enum spa_direction direction; - struct spa_pod *props = NULL; - struct global *ng; if (spa_pod_parse_object(param, SPA_TYPE_OBJECT_ParamRoute, NULL, SPA_PARAM_ROUTE_index, SPA_POD_Int(&index), SPA_PARAM_ROUTE_direction, SPA_POD_Id(&direction), - SPA_PARAM_ROUTE_device, SPA_POD_Int(&device), - SPA_PARAM_ROUTE_props, SPA_POD_OPT_Pod(&props)) < 0) { + SPA_PARAM_ROUTE_device, SPA_POD_Int(&device)) < 0) { pw_log_warn("device %d: can't parse route", g->id); return; } + if (add_param(&g->card_info.routes, id, param)) + g->card_info.n_routes++; - pw_log_debug("device %d: active %s route %d", g->id, + pw_log_debug("device %d: active %s route %d device %d", g->id, direction == SPA_DIRECTION_OUTPUT ? "output" : "input", - index); - - ng = find_node_for_route(c, g, device); - if (props && ng && ng->node_info.active_port != index) { - ng->node_info.active_port = index; - parse_props(ng, props, true); - emit_event(c, ng, PA_SUBSCRIPTION_EVENT_CHANGE); - } + index, device); break; } default: @@ -702,6 +715,7 @@ static void device_clear_ports(struct global *g) static void device_sync_ports(struct global *g) { pa_card_info *i = &g->card_info.info; + pa_context *c = g->context; uint32_t n_ports, j; struct param *p; @@ -741,6 +755,8 @@ static void device_sync_ports(struct global *g) continue; } + pw_log_debug("port %d: name:%s", j, name); + pi = i->ports[j] = &g->card_info.card_ports[j]; spa_zero(*pi); pi->name = name; @@ -800,6 +816,30 @@ static void device_sync_ports(struct global *g) i->n_ports = j; if (i->n_ports == 0) i->ports = NULL; + + spa_list_for_each(p, &g->card_info.routes, link) { + struct global *ng; + uint32_t index, device; + enum spa_direction direction; + struct spa_pod *props = NULL; + + if (spa_pod_parse_object(p->param, + SPA_TYPE_OBJECT_ParamRoute, NULL, + SPA_PARAM_ROUTE_index, SPA_POD_Int(&index), + SPA_PARAM_ROUTE_direction, SPA_POD_Id(&direction), + SPA_PARAM_ROUTE_device, SPA_POD_Int(&device), + SPA_PARAM_ROUTE_props, SPA_POD_OPT_Pod(&props)) < 0) { + pw_log_warn("device %d: can't parse route", g->id); + continue; + } + + ng = find_node_for_route(c, g, device); + if (props && ng && ng->node_info.active_port != index) { + ng->node_info.active_port = index; + parse_props(ng, props, true); + emit_event(c, ng, PA_SUBSCRIPTION_EVENT_CHANGE); + } + } } static void device_sync(struct global *g) @@ -831,6 +871,7 @@ static void device_destroy(void *data) device_clear_ports(global); device_clear_profiles(global); + remove_params(&global->card_info.routes, SPA_ID_INVALID); remove_params(&global->card_info.ports, SPA_ID_INVALID); remove_params(&global->card_info.profiles, SPA_ID_INVALID); @@ -1078,35 +1119,10 @@ static void proxy_destroy(void *data) g->proxy = NULL; } -static void proxy_done(void *data, int seq) -{ - struct global *g = data; - pa_subscription_event_type_t event; - - if (g->pending_seq == seq) { - if (g->ginfo && g->ginfo->sync) - g->ginfo->sync(g); - if (g->init) { - if ((g->mask & (PA_SUBSCRIPTION_MASK_SINK_INPUT | PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT))) { - if (g->node_info.device_index == SPA_ID_INVALID || - (g->stream && g->stream->state != PA_STREAM_READY)) - return; - } - g->init = false; - event = PA_SUBSCRIPTION_EVENT_NEW; - } else { - event = PA_SUBSCRIPTION_EVENT_CHANGE; - } - pw_log_debug("emit because of pending"); - emit_event(g->context, g, event); - } -} - static const struct pw_proxy_events proxy_events = { PW_VERSION_PROXY_EVENTS, .removed = proxy_removed, .destroy = proxy_destroy, - .done = proxy_done, }; static void update_link(pa_context *c, uint32_t src_node_id, uint32_t dst_node_id) @@ -1153,6 +1169,7 @@ static int set_mask(pa_context *c, struct global *g) ginfo = &device_info; spa_list_init(&g->card_info.profiles); spa_list_init(&g->card_info.ports); + spa_list_init(&g->card_info.routes); } else if (strcmp(g->type, PW_TYPE_INTERFACE_Node) == 0) { if (g->props == NULL) return 0; @@ -1347,19 +1364,6 @@ static const struct pw_registry_events registry_events = .global_remove = registry_event_global_remove, }; -static void complete_operations(pa_context *c, int seq) -{ - pa_operation *o, *t; - spa_list_for_each_safe(o, t, &c->operations, link) { - if (o->seq != seq) - continue; - pa_operation_ref(o); - if (o->callback) - o->callback(o, o->userdata); - pa_operation_unref(o); - } -} - static void core_info(void *data, const struct pw_core_info *info) { pa_context *c = data; @@ -1394,8 +1398,25 @@ static void core_error(void *data, uint32_t id, int seq, int res, const char *me static void core_done(void *data, uint32_t id, int seq) { pa_context *c = data; - pw_log_debug("done id:%u seq:%d", id, seq); - complete_operations(c, seq); + pa_operation *o, *t; + struct global *g; + + pw_log_debug("done id:%u seq:%d/%d", id, seq, c->pending_seq); + if (c->pending_seq != seq) + return; + + spa_list_for_each(g, &c->globals, link) { + if (g->sync) { + do_global_sync(g); + g->sync = false; + } + } + spa_list_for_each_safe(o, t, &c->operations, link) { + pa_operation_ref(o); + if (o->callback) + o->callback(o, o->userdata); + pa_operation_unref(o); + } } static const struct pw_core_events core_events = { diff --git a/pipewire-pulseaudio/src/internal.h b/pipewire-pulseaudio/src/internal.h index e67bcc59..f7a62f8f 100644 --- a/pipewire-pulseaudio/src/internal.h +++ b/pipewire-pulseaudio/src/internal.h @@ -256,8 +256,8 @@ struct global { pa_subscription_event_type_t event; int priority_driver; - int pending_seq; int init:1; + int sync:1; void *info; struct global_info *ginfo; @@ -306,6 +306,8 @@ struct global { uint32_t active_profile; struct spa_list ports; uint32_t n_ports; + struct spa_list routes; + uint32_t n_routes; pa_card_info info; pa_card_profile_info2 *card_profiles; unsigned int pending_profiles:1; @@ -371,6 +373,8 @@ struct pa_context { int no_fail:1; int disconnect:1; + int pending_seq; + struct global *metadata; uint32_t default_sink; uint32_t default_source; @@ -485,7 +489,6 @@ struct pa_operation pa_context *context; pa_stream *stream; - int seq; pa_operation_state_t state; pa_operation_cb_t callback; diff --git a/pipewire-pulseaudio/src/operation.c b/pipewire-pulseaudio/src/operation.c index 3b6f8d55..dd14a6c4 100644 --- a/pipewire-pulseaudio/src/operation.c +++ b/pipewire-pulseaudio/src/operation.c @@ -36,7 +36,6 @@ pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb o->refcount = 1; o->context = c; o->stream = s ? pa_stream_ref(s) : NULL; - o->seq = SPA_ID_INVALID; o->state = PA_OPERATION_RUNNING; o->callback = cb; @@ -52,8 +51,8 @@ pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb int pa_operation_sync(pa_operation *o) { pa_context *c = o->context; - o->seq = pw_core_sync(c->core, PW_ID_CORE, 0); - pw_log_debug("operation %p: sync seq:%d", o, o->seq); + c->pending_seq = pw_core_sync(c->core, PW_ID_CORE, 0); + pw_log_debug("operation %p: sync seq:%d", o, c->pending_seq); return 0; } @@ -70,14 +69,14 @@ static void operation_free(pa_operation *o) { pa_assert(!o->context); pa_assert(!o->stream); - pw_log_debug("%p seq:%d", o, o->seq); + pw_log_debug("%p", o); free(o); } static void operation_unlink(pa_operation *o) { pa_assert(o); - pw_log_debug("%p seq:%d", o, o->seq); + pw_log_debug("%p", o); if (o->context) { pa_assert(o->refcount >= 2); @@ -100,7 +99,7 @@ void pa_operation_unref(pa_operation *o) { pa_assert(o); pa_assert(o->refcount >= 1); - pw_log_debug("%p seq:%d ref:%d", o, o->seq, o->refcount); + pw_log_debug("%p ref:%d", o, o->refcount); if (--o->refcount == 0) operation_free(o); } @@ -114,7 +113,7 @@ static void operation_set_state(pa_operation *o, pa_operation_state_t st) { pa_operation_ref(o); - pw_log_debug("new state %p seq:%d state:%d", o, o->seq, st); + pw_log_debug("new state %p state:%d", o, st); o->state = st; if (o->state_callback) @@ -132,7 +131,7 @@ void pa_operation_cancel(pa_operation *o) { pa_assert(o); pa_assert(o->refcount >= 1); - pw_log_debug("%p seq:%d", o, o->seq); + pw_log_debug("%p", o); operation_set_state(o, PA_OPERATION_CANCELED); } -- cgit v1.2.3