From bb3da2fcc198b32037689b8d2fd3c68ba1681314 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 17 Apr 2019 15:23:11 +0200 Subject: remote-node: reuse activation logic in pw_node Set the received eventfd as the node eventfd and let existing logic take care of scheduling the node. --- src/gst/gstpipewiredeviceprovider.c | 4 +- src/modules/module-client-node/remote-node.c | 91 ++++------------------------ 2 files changed, 14 insertions(+), 81 deletions(-) diff --git a/src/gst/gstpipewiredeviceprovider.c b/src/gst/gstpipewiredeviceprovider.c index 7a96cffa..71c4ffa9 100644 --- a/src/gst/gstpipewiredeviceprovider.c +++ b/src/gst/gstpipewiredeviceprovider.c @@ -356,8 +356,8 @@ on_state_changed (void *data, enum pw_remote_state old, enum pw_remote_state sta switch (state) { case PW_REMOTE_STATE_CONNECTING: - break; case PW_REMOTE_STATE_UNCONNECTED: + break; case PW_REMOTE_STATE_CONNECTED: self->core_proxy = pw_remote_get_core_proxy(self->remote); pw_core_proxy_add_listener(self->core_proxy, &rd->core_listener, &core_events, self); @@ -399,7 +399,7 @@ static const struct pw_port_proxy_events port_events = { static void node_event_info(void *data, const struct pw_node_info *info) { struct node_data *node_data = data; - pw_log_debug("%p", node_data); + pw_log_debug("%p", node_data->proxy); node_data->info = pw_node_info_update(node_data->info, info); } diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c index 7e60a629..f69a6f36 100644 --- a/src/modules/module-client-node/remote-node.c +++ b/src/modules/module-client-node/remote-node.c @@ -92,7 +92,6 @@ struct node_data { uint32_t remote_id; int rtwritefd; - struct spa_source *rtsocket_source; struct mix mix_pool[MAX_MIX]; struct spa_list mix[2]; @@ -102,7 +101,8 @@ struct node_data { struct pw_node *node; struct spa_hook node_listener; - bool do_free; + int do_free:1; + int have_transport:1; struct pw_client_node_proxy *node_proxy; struct spa_hook node_proxy_listener; @@ -116,50 +116,6 @@ struct node_data { /** \endcond */ -static int -do_remove_source(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct node_data *d = user_data; - - if (d->rtsocket_source) { - pw_loop_destroy_source(d->core->data_loop, d->rtsocket_source); - d->rtsocket_source = NULL; - } - return 0; -} - - -static void unhandle_socket(struct node_data *data) -{ - pw_loop_invoke(data->core->data_loop, - do_remove_source, 1, NULL, 0, true, data); -} - -static void -on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) -{ - struct pw_proxy *proxy = user_data; - struct node_data *data = proxy->user_data; - - if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { - pw_log_warn("got error"); - unhandle_socket(data); - return; - } - - if (mask & SPA_IO_IN) { - uint64_t cmd; - - if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd) || cmd != 1) - pw_log_warn("proxy %p: read %"PRIu64" failed %m", proxy, cmd); - - pw_log_trace("remote %p: process %p", data->remote, proxy); - - data->node->rt.target.signal(data->node->rt.target.data); - } -} - static struct link *find_activation(struct pw_array *links, uint32_t node_id) { struct link *l; @@ -231,7 +187,7 @@ static void clear_mem(struct node_data *data, struct mem *m) int fd; struct mem *m2; - pw_log_debug("remote %p: clear mem %d", data, m->id); + pw_log_debug("remote %p: clear mem %p %d %d", data, m, m->id, m->fd); fd = m->fd; m->fd = -1; @@ -263,11 +219,9 @@ static void clean_transport(struct node_data *data) struct mem *m; struct link *l; - if (data->rtsocket_source == NULL) + if (!data->have_transport) return; - unhandle_socket(data); - pw_array_for_each(m, &data->mems) clear_mem(data, m); pw_array_clear(&data->mems); @@ -278,6 +232,7 @@ static void clean_transport(struct node_data *data) close(data->rtwritefd); data->remote_id = SPA_ID_INVALID; + data->have_transport = false; } static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id) @@ -407,16 +362,16 @@ static int client_node_transport(void *object, uint32_t node_id, clean_transport(data); + data->have_transport = true; data->remote_id = node_id; pw_log_debug("remote-node %p: create transport with fds %d %d for node %u", proxy, readfd, writefd, node_id); data->rtwritefd = writefd; - data->rtsocket_source = pw_loop_add_io(remote->core->data_loop, - readfd, - SPA_IO_ERR | SPA_IO_HUP, - true, on_rtsocket_condition, proxy); + close(data->node->source.fd); + data->node->source.fd = readfd; + if (data->node->active) pw_client_node_proxy_set_active(data->node_proxy, true); @@ -601,32 +556,16 @@ static int client_node_event(void *object, const struct spa_event *event) return -ENOTSUP; } -static int -do_pause_source(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct node_data *d = user_data; - pw_loop_update_io(d->core->data_loop, - d->rtsocket_source, - SPA_IO_ERR | SPA_IO_HUP); - return 0; -} - static int client_node_command(void *object, const struct spa_command *command) { struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; - struct pw_remote *remote = proxy->remote; int res; switch (SPA_NODE_COMMAND_ID(command)) { case SPA_NODE_COMMAND_Pause: pw_log_debug("node %p: pause", proxy); - if (data->rtsocket_source) { - pw_loop_invoke(data->core->data_loop, - do_pause_source, 1, NULL, 0, true, data); - } if ((res = pw_node_set_state(data->node, PW_NODE_STATE_IDLE)) < 0) { pw_log_warn("node %p: pause failed", proxy); pw_proxy_error(proxy, res, "pause failed"); @@ -640,11 +579,6 @@ static int client_node_command(void *object, const struct spa_command *command) pw_log_warn("node %p: start failed", proxy); pw_proxy_error(proxy, res, "start failed"); } - else if (data->rtsocket_source) { - pw_loop_update_io(remote->core->data_loop, - data->rtsocket_source, - SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); - } break; default: pw_log_warn("unhandled node command %d", SPA_NODE_COMMAND_ID(command)); @@ -1021,6 +955,7 @@ client_node_set_activation(void *object, if (data->remote_id == node_id) { pw_log_debug("node %p: our activation %u: %u %u %u %p", node, node_id, memid, offset, size, ptr); + close(signalfd); return 0; } @@ -1125,14 +1060,12 @@ static void node_destroy(void *data) static void node_free(void *data) { struct node_data *d = data; - struct pw_remote *remote = d->remote; struct pw_proxy *proxy = (struct pw_proxy*) d->node_proxy; pw_log_debug("%p: free", d); - if (remote->core_proxy) - pw_core_proxy_destroy(remote->core_proxy, proxy); - spa_hook_remove(&d->proxy_listener); + pw_proxy_destroy(d->proxy); + pw_proxy_destroy(proxy); } static void node_info_changed(void *data, const struct pw_node_info *info) -- cgit v1.2.3