summaryrefslogtreecommitdiff
path: root/pipewire-jack/src/pipewire-jack.c
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2020-04-30 12:57:56 +0200
committerWim Taymans <wtaymans@redhat.com>2020-04-30 12:57:56 +0200
commit30d5a247e1b6e9a6547d4c671788ab2b72e8cc6c (patch)
tree6f3d841832c55825b14ab469792c9aaaa8250d9f /pipewire-jack/src/pipewire-jack.c
parentacd9991bfc0050a778061bfccd5911dc3f0a5897 (diff)
jack: improve locking
Protect the global metadata with a lock because we update this from multiple clients. Avoid updating the metadata if it didn't change. Add a simple lock to protect the session objects, they could be accessed from the main thread or data thread. Use the simple lock in methods that just read. Use the new data loop invoke to make sure we sync the data update with the data thread. Stop the data loop when our position io is removed.
Diffstat (limited to 'pipewire-jack/src/pipewire-jack.c')
-rw-r--r--pipewire-jack/src/pipewire-jack.c163
1 files changed, 105 insertions, 58 deletions
diff --git a/pipewire-jack/src/pipewire-jack.c b/pipewire-jack/src/pipewire-jack.c
index 14906cb3..e698326f 100644
--- a/pipewire-jack/src/pipewire-jack.c
+++ b/pipewire-jack/src/pipewire-jack.c
@@ -86,7 +86,7 @@ struct port;
struct globals {
jack_thread_creator_t creator;
- jack_client_t *client;
+ pthread_mutex_t lock;
struct pw_array descriptions;
};
@@ -214,11 +214,13 @@ struct link {
};
struct context {
- struct pw_thread_loop *loop;
+ struct pw_thread_loop *loop; /* thread_lock protects all below */
struct pw_context *context;
- struct pw_map globals;
struct spa_list free_objects;
+
+ pthread_mutex_t lock; /* protects map and lists below, in addition to thread_lock */
+ struct pw_map globals;
struct spa_list ports;
struct spa_list nodes;
struct spa_list links;
@@ -317,7 +319,10 @@ struct client {
struct pw_node_activation *activation;
uint32_t xrun_count;
- struct spa_list target_links;
+ struct {
+ struct pw_node_activation *driver_activation;
+ struct spa_list target_links;
+ } rt;
unsigned int started:1;
unsigned int active:1;
@@ -374,7 +379,9 @@ static struct object * alloc_object(struct client *c)
static void free_object(struct client *c, struct object *o)
{
+ pthread_mutex_lock(&c->context.lock);
spa_list_remove(&o->link);
+ pthread_mutex_unlock(&c->context.lock);
spa_list_append(&c->context.free_objects, &o->link);
}
@@ -456,7 +463,6 @@ static struct port * alloc_port(struct client *c, enum spa_direction direction)
o->id = SPA_ID_INVALID;
o->port.node_id = c->node_id;
o->port.port_id = p->id;
- spa_list_append(&c->context.ports, &o->link);
p->valid = true;
p->zeroed = false;
@@ -466,6 +472,10 @@ static struct port * alloc_port(struct client *c, enum spa_direction direction)
spa_list_append(&c->ports[direction], &p->link);
+ pthread_mutex_lock(&c->context.lock);
+ spa_list_append(&c->context.ports, &o->link);
+ pthread_mutex_unlock(&c->context.lock);
+
return p;
}
@@ -687,8 +697,8 @@ do_remove_sources(struct spa_loop *loop,
static void unhandle_socket(struct client *c)
{
- pw_loop_invoke(c->loop->loop,
- do_remove_sources, 1, NULL, 0, true, c);
+ pw_data_loop_invoke(c->loop,
+ do_remove_sources, 1, NULL, 0, true, c);
}
static void reuse_buffer(struct client *c, struct mix *mix, uint32_t id)
@@ -986,7 +996,7 @@ static inline uint32_t cycle_run(struct client *c)
int fd = c->socket_source->fd;
struct spa_io_position *pos = c->position;
struct pw_node_activation *activation = c->activation;
- struct pw_node_activation *driver = c->driver_activation;
+ struct pw_node_activation *driver = c->rt.driver_activation;
/* this is blocking if nothing ready */
if (SPA_UNLIKELY(read(fd, &cmd, sizeof(cmd)) != sizeof(cmd))) {
@@ -1062,7 +1072,7 @@ static inline void signal_sync(struct client *c)
activation->finish_time = nsec;
cmd = 1;
- spa_list_for_each(l, &c->target_links, target_link) {
+ spa_list_for_each(l, &c->rt.target_links, target_link) {
struct pw_node_activation_state *state;
if (SPA_UNLIKELY(l->activation == NULL))
@@ -1087,7 +1097,7 @@ static inline void signal_sync(struct client *c)
static inline void cycle_signal(struct client *c, int status)
{
- struct pw_node_activation *driver = c->driver_activation;
+ struct pw_node_activation *driver = c->rt.driver_activation;
struct pw_node_activation *activation = c->activation;
if (SPA_LIKELY(status == 0)) {
@@ -1138,9 +1148,19 @@ on_rtsocket_condition(void *data, int fd, uint32_t mask)
}
}
-static void clear_link(struct client *c, struct link *link)
+static int
+do_clear_link(struct spa_loop *loop,
+ bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
+ struct link *link = user_data;
spa_list_remove(&link->target_link);
+ return 0;
+}
+
+static void clear_link(struct client *c, struct link *link)
+{
+ pw_data_loop_invoke(c->loop,
+ do_clear_link, 1, NULL, 0, true, link);
pw_memmap_free(link->mem);
close(link->signalfd);
spa_list_remove(&link->link);
@@ -1154,8 +1174,6 @@ static void clean_transport(struct client *c)
if (!c->has_transport)
return;
- pw_data_loop_stop(c->loop);
-
unhandle_socket(c);
spa_list_consume(l, &c->links, link)
@@ -1238,6 +1256,15 @@ static int install_timemaster(struct client *c)
return 0;
}
+static int
+do_update_driver_activation(struct spa_loop *loop,
+ bool async, uint32_t seq, const void *data, size_t size, void *user_data)
+{
+ struct client *c = user_data;
+ c->rt.driver_activation = c->driver_activation;
+ return 0;
+}
+
static int update_driver_activation(struct client *c)
{
struct link *link;
@@ -1245,7 +1272,9 @@ static int update_driver_activation(struct client *c)
link = find_activation(&c->links, c->driver_id);
c->driver_activation = link ? link->activation : NULL;
-
+ pw_data_loop_invoke(c->loop,
+ do_update_driver_activation, SPA_ID_INVALID, NULL, 0,
+ c->driver_activation == NULL, c);
install_timemaster(c);
return 0;
@@ -1283,12 +1312,13 @@ static int client_node_set_io(void *object,
switch (id) {
case SPA_IO_Position:
+ if (ptr == NULL)
+ pw_data_loop_stop(c->loop);
c->position = ptr;
c->driver_id = ptr ? c->position->clock.id : SPA_ID_INVALID;
update_driver_activation(c);
- if (ptr) {
+ if (ptr)
check_sample_rate(c);
- }
break;
default:
break;
@@ -1774,7 +1804,7 @@ do_activate_link(struct spa_loop *loop,
struct link *link = user_data;
struct client *c = link->client;
pw_log_trace("link %p activate", link);
- spa_list_append(&c->target_links, &link->target_link);
+ spa_list_append(&c->rt.target_links, &link->target_link);
return 0;
}
@@ -1829,7 +1859,7 @@ static int client_node_set_activation(void *object,
link->signalfd = signalfd;
spa_list_append(&c->links, &link->link);
- pw_loop_invoke(c->loop->loop,
+ pw_data_loop_invoke(c->loop,
do_activate_link, SPA_ID_INVALID, NULL, 0, false, link);
}
else {
@@ -1948,7 +1978,10 @@ static void registry_event_global(void *data, uint32_t id,
o->node.priority = pw_properties_parse_int(str);
pw_log_debug(NAME" %p: add node %d", c, id);
+
+ pthread_mutex_lock(&c->context.lock);
spa_list_append(&c->context.nodes, &o->link);
+ pthread_mutex_unlock(&c->context.lock);
}
else if (strcmp(type, PW_TYPE_INTERFACE_Port) == 0) {
const struct spa_dict_item *item;
@@ -2004,7 +2037,10 @@ static void registry_event_global(void *data, uint32_t id,
if (o == NULL)
goto exit;
+ pthread_mutex_lock(&c->context.lock);
spa_list_append(&c->context.ports, &o->link);
+ pthread_mutex_unlock(&c->context.lock);
+
ot = pw_map_lookup(&c->context.globals, node_id);
if (ot == NULL || ot->type != INTERFACE_Node)
goto exit_free;
@@ -2042,7 +2078,9 @@ static void registry_event_global(void *data, uint32_t id,
o = alloc_object(c);
object_type = INTERFACE_Link;
+ pthread_mutex_lock(&c->context.lock);
spa_list_append(&c->context.links, &o->link);
+ pthread_mutex_unlock(&c->context.lock);
if ((str = spa_dict_lookup(props, PW_KEY_LINK_OUTPUT_PORT)) == NULL)
goto exit_free;
@@ -2079,10 +2117,12 @@ static void registry_event_global(void *data, uint32_t id,
o->type = object_type;
o->id = id;
+ pthread_mutex_lock(&c->context.lock);
size = pw_map_get_size(&c->context.globals);
while (id > size)
pw_map_insert_at(&c->context.globals, size++, NULL);
pw_map_insert_at(&c->context.globals, id, o);
+ pthread_mutex_unlock(&c->context.lock);
pw_thread_loop_unlock(c->context.loop);
@@ -2192,6 +2232,7 @@ jack_client_t * jack_client_open (const char *client_name,
0);
client->allow_mlock = client->context.context->defaults.mem_allow_mlock;
spa_list_init(&client->context.free_objects);
+ pthread_mutex_init(&client->context.lock, NULL);
spa_list_init(&client->context.nodes);
spa_list_init(&client->context.ports);
spa_list_init(&client->context.links);
@@ -2215,7 +2256,7 @@ jack_client_t * jack_client_open (const char *client_name,
goto init_failed;
spa_list_init(&client->links);
- spa_list_init(&client->target_links);
+ spa_list_init(&client->rt.target_links);
client->buffer_frames = (uint32_t)-1;
client->sample_rate = (uint32_t)-1;
@@ -2301,8 +2342,6 @@ jack_client_t * jack_client_open (const char *client_name,
if (status)
*status = 0;
- globals.client = (jack_client_t *)client;
-
pw_log_debug(NAME" %p: new", client);
return (jack_client_t *)client;
@@ -2349,9 +2388,9 @@ int jack_client_close (jack_client_t *client)
res = jack_deactivate(client);
- pw_thread_loop_stop(c->context.loop);
-
pw_context_destroy(c->context.context);
+
+ pw_thread_loop_stop(c->context.loop);
pw_thread_loop_destroy(c->context.loop);
pw_log_debug(NAME" %p: free", client);
@@ -2383,19 +2422,23 @@ char *jack_get_uuid_for_client_name (jack_client_t *client,
{
struct client *c = (struct client *) client;
struct object *o;
+ char *uuid = NULL;
spa_return_val_if_fail(c != NULL, NULL);
spa_return_val_if_fail(client_name != NULL, NULL);
+ pthread_mutex_lock(&c->context.lock);
+
spa_list_for_each(o, &c->context.nodes, link) {
if (strcmp(o->node.name, client_name) == 0) {
- char *uuid = spa_aprintf( "%" PRIu64, (cuuid << 32) | o->id);
+ uuid = spa_aprintf( "%" PRIu64, (cuuid << 32) | o->id);
pw_log_debug(NAME" %p: name %s -> %s",
client, client_name, uuid);
- return uuid;
+ break;
}
}
- return NULL;
+ pthread_mutex_unlock(&c->context.lock);
+ return uuid;
}
SPA_EXPORT
@@ -2406,6 +2449,7 @@ char *jack_get_client_name_by_uuid (jack_client_t *client,
struct object *o;
jack_uuid_t uuid;
jack_uuid_t cuuid = 0x2;
+ char *name = NULL;
spa_return_val_if_fail(c != NULL, NULL);
spa_return_val_if_fail(client_uuid != NULL, NULL);
@@ -2413,14 +2457,17 @@ char *jack_get_client_name_by_uuid (jack_client_t *client,
if (jack_uuid_parse(client_uuid, &uuid) < 0)
return NULL;
+ pthread_mutex_lock(&c->context.lock);
spa_list_for_each(o, &c->context.nodes, link) {
if ((cuuid << 32 | o->id) == uuid) {
pw_log_debug(NAME" %p: uuid %s (%"PRIu64")-> %s",
client, client_uuid, uuid, o->node.name);
- return strdup(o->node.name);
+ name = strdup(o->node.name);
+ break;
}
}
- return NULL;
+ pthread_mutex_unlock(&c->context.lock);
+ return name;
}
SPA_EXPORT
@@ -2442,14 +2489,16 @@ static int do_activate(struct client *c)
{
int res;
- pw_data_loop_start(c->loop);
-
pw_thread_loop_lock(c->context.loop);
+
+ if ((res = pw_data_loop_start(c->loop)) < 0)
+ goto done;
+
pw_log_debug(NAME" %p: activate", c);
pw_client_node_set_active(c->node, true);
res = do_sync(c);
-
+done:
pw_thread_loop_unlock(c->context.loop);
return res;
}
@@ -2490,6 +2539,8 @@ int jack_deactivate (jack_client_t *client)
pw_thread_loop_lock(c->context.loop);
pw_log_debug(NAME" %p: deactivate", c);
+ pw_data_loop_stop(c->loop);
+
pw_client_node_set_active(c->node, false);
c->activation->pending_new_pos = false;
@@ -2499,12 +2550,11 @@ int jack_deactivate (jack_client_t *client)
pw_thread_loop_unlock(c->context.loop);
- pw_data_loop_stop(c->loop);
-
if (res < 0)
return res;
c->active = false;
+
return 0;
}
@@ -3250,13 +3300,13 @@ int jack_port_connected (const jack_port_t *port)
c = o->client;
- pw_thread_loop_lock(c->context.loop);
+ pthread_mutex_lock(&c->context.lock);
spa_list_for_each(l, &c->context.links, link) {
if (l->port_link.src == o->id ||
l->port_link.dst == o->id)
res++;
}
- pw_thread_loop_unlock(c->context.loop);
+ pthread_mutex_unlock(&c->context.lock);
return res;
}
@@ -3275,7 +3325,7 @@ int jack_port_connected_to (const jack_port_t *port,
c = o->client;
- pw_thread_loop_lock(c->context.loop);
+ pthread_mutex_lock(&c->context.lock);
p = find_port(c, port_name);
if (p == NULL)
@@ -3293,7 +3343,7 @@ int jack_port_connected_to (const jack_port_t *port,
res = 1;
exit:
- pw_thread_loop_unlock(c->context.loop);
+ pthread_mutex_lock(&c->context.lock);
return res;
}
@@ -3323,8 +3373,7 @@ const char ** jack_port_get_all_connections (const jack_client_t *client,
res = malloc(sizeof(char*) * (CONNECTION_NUM_FOR_PORT + 1));
- pw_thread_loop_lock(c->context.loop);
-
+ pthread_mutex_lock(&c->context.lock);
spa_list_for_each(l, &c->context.links, link) {
if (l->port_link.src == o->id)
p = pw_map_lookup(&c->context.globals, l->port_link.dst);
@@ -3340,7 +3389,7 @@ const char ** jack_port_get_all_connections (const jack_client_t *client,
if (count == CONNECTION_NUM_FOR_PORT)
break;
}
- pw_thread_loop_unlock(c->context.loop);
+ pthread_mutex_unlock(&c->context.lock);
if (count == 0) {
free(res);
@@ -3560,11 +3609,9 @@ int jack_port_request_monitor_by_name (jack_client_t *client,
spa_return_val_if_fail(c != NULL, -EINVAL);
spa_return_val_if_fail(port_name != NULL, -EINVAL);
- pw_thread_loop_lock(c->context.loop);
-
+ pthread_mutex_lock(&c->context.lock);
p = find_port(c, port_name);
-
- pw_thread_loop_unlock(c->context.loop);
+ pthread_mutex_unlock(&c->context.lock);
if (p == NULL) {
pw_log_error(NAME" %p: jack_port_request_monitor_by_name called"
@@ -3885,11 +3932,11 @@ const char ** jack_get_ports (jack_client_t *client,
if (type_name_pattern && type_name_pattern[0])
regcomp(&type_regex, type_name_pattern, REG_EXTENDED | REG_NOSUB);
- pw_thread_loop_lock(c->context.loop);
pw_log_debug(NAME" %p: ports id:%d name:%s type:%s flags:%08lx", c, id,
port_name_pattern, type_name_pattern, flags);
+ pthread_mutex_lock(&c->context.lock);
count = 0;
spa_list_for_each(o, &c->context.ports, link) {
pw_log_debug(NAME" %p: check port type:%d flags:%08lx name:%s", c,
@@ -3917,6 +3964,8 @@ const char ** jack_get_ports (jack_client_t *client,
c, o->port.name, o->port.priority, count);
tmp[count++] = o;
}
+ pthread_mutex_unlock(&c->context.lock);
+
if (count > 0) {
qsort(tmp, count, sizeof(struct object *), port_compare_func);
@@ -3928,8 +3977,6 @@ const char ** jack_get_ports (jack_client_t *client,
res = NULL;
}
- pw_thread_loop_unlock(c->context.loop);
-
if (port_name_pattern && port_name_pattern[0])
regfree(&port_regex);
if (type_name_pattern && type_name_pattern[0])
@@ -3946,11 +3993,9 @@ jack_port_t * jack_port_by_name (jack_client_t *client, const char *port_name)
spa_return_val_if_fail(c != NULL, NULL);
- pw_thread_loop_lock(c->context.loop);
-
+ pthread_mutex_lock(&c->context.lock);
res = find_port(c, port_name);
-
- pw_thread_loop_unlock(c->context.loop);
+ pthread_mutex_unlock(&c->context.lock);
return (jack_port_t *)res;
}
@@ -3964,7 +4009,7 @@ jack_port_t * jack_port_by_id (jack_client_t *client,
spa_return_val_if_fail(c != NULL, NULL);
- pw_thread_loop_lock(c->context.loop);
+ pthread_mutex_lock(&c->context.lock);
o = pw_map_lookup(&c->context.globals, port_id);
pw_log_debug(NAME" %p: port %d -> %p", c, port_id, o);
@@ -3975,7 +4020,7 @@ jack_port_t * jack_port_by_id (jack_client_t *client,
res = o;
exit:
- pw_thread_loop_unlock(c->context.loop);
+ pthread_mutex_unlock(&c->context.lock);
return (jack_port_t *)res;
}
@@ -4239,7 +4284,7 @@ jack_transport_state_t jack_transport_query (const jack_client_t *client,
spa_return_val_if_fail(c != NULL, JackTransportStopped);
- if (SPA_LIKELY((a = c->driver_activation) != NULL))
+ if (SPA_LIKELY((a = c->rt.driver_activation) != NULL))
jack_state = position_to_jack(a, pos);
else if (pos != NULL)
memset(pos, 0, sizeof(jack_position_t));
@@ -4258,7 +4303,7 @@ jack_nframes_t jack_get_current_transport_frame (const jack_client_t *client)
spa_return_val_if_fail(c != NULL, -EINVAL);
- if (SPA_UNLIKELY((a = c->driver_activation) == NULL))
+ if (SPA_UNLIKELY((a = c->rt.driver_activation) == NULL))
return -EIO;
pos = &a->position;
@@ -4284,7 +4329,7 @@ int jack_transport_reposition (jack_client_t *client,
spa_return_val_if_fail(c != NULL, -EINVAL);
- a = c->driver_activation;
+ a = c->rt.driver_activation;
na = c->activation;
if (!a || !na)
return -EIO;
@@ -4305,7 +4350,7 @@ int jack_transport_reposition (jack_client_t *client,
static void update_command(struct client *c, uint32_t command)
{
- struct pw_node_activation *a = c->driver_activation;
+ struct pw_node_activation *a = c->rt.driver_activation;
if (!a)
return;
ATOMIC_STORE(a->command, command);
@@ -4642,7 +4687,7 @@ int jack_get_video_image_size(jack_client_t *client, jack_image_size_t *size)
spa_return_val_if_fail(c != NULL, 0);
- a = c->driver_activation;
+ a = c->rt.driver_activation;
if (SPA_UNLIKELY(a == NULL))
a = c->activation;
if (SPA_UNLIKELY(a == NULL))
@@ -4663,4 +4708,6 @@ static void reg(void) __attribute__ ((constructor));
static void reg(void)
{
pw_init(NULL, NULL);
+ pthread_mutex_init(&globals.lock, NULL);
+ pw_array_init(&globals.descriptions, 16);
}