summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2019-04-17 15:23:11 +0200
committerWim Taymans <wtaymans@redhat.com>2019-04-17 15:23:11 +0200
commitbb3da2fcc198b32037689b8d2fd3c68ba1681314 (patch)
tree55b5d2bcdfa49519fe3e3d26edcabf0af4447c79
parent715594c6eff5c5d9a14201331310c65793069301 (diff)
remote-node: reuse activation logic in pw_node
Set the received eventfd as the node eventfd and let existing logic take care of scheduling the node.
-rw-r--r--src/gst/gstpipewiredeviceprovider.c4
-rw-r--r--src/modules/module-client-node/remote-node.c91
2 files changed, 14 insertions, 81 deletions
diff --git a/src/gst/gstpipewiredeviceprovider.c b/src/gst/gstpipewiredeviceprovider.c
index 7a96cffa..71c4ffa9 100644
--- a/src/gst/gstpipewiredeviceprovider.c
+++ b/src/gst/gstpipewiredeviceprovider.c
@@ -356,8 +356,8 @@ on_state_changed (void *data, enum pw_remote_state old, enum pw_remote_state sta
switch (state) {
case PW_REMOTE_STATE_CONNECTING:
- break;
case PW_REMOTE_STATE_UNCONNECTED:
+ break;
case PW_REMOTE_STATE_CONNECTED:
self->core_proxy = pw_remote_get_core_proxy(self->remote);
pw_core_proxy_add_listener(self->core_proxy, &rd->core_listener, &core_events, self);
@@ -399,7 +399,7 @@ static const struct pw_port_proxy_events port_events = {
static void node_event_info(void *data, const struct pw_node_info *info)
{
struct node_data *node_data = data;
- pw_log_debug("%p", node_data);
+ pw_log_debug("%p", node_data->proxy);
node_data->info = pw_node_info_update(node_data->info, info);
}
diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c
index 7e60a629..f69a6f36 100644
--- a/src/modules/module-client-node/remote-node.c
+++ b/src/modules/module-client-node/remote-node.c
@@ -92,7 +92,6 @@ struct node_data {
uint32_t remote_id;
int rtwritefd;
- struct spa_source *rtsocket_source;
struct mix mix_pool[MAX_MIX];
struct spa_list mix[2];
@@ -102,7 +101,8 @@ struct node_data {
struct pw_node *node;
struct spa_hook node_listener;
- bool do_free;
+ int do_free:1;
+ int have_transport:1;
struct pw_client_node_proxy *node_proxy;
struct spa_hook node_proxy_listener;
@@ -116,50 +116,6 @@ struct node_data {
/** \endcond */
-static int
-do_remove_source(struct spa_loop *loop,
- bool async, uint32_t seq, const void *data, size_t size, void *user_data)
-{
- struct node_data *d = user_data;
-
- if (d->rtsocket_source) {
- pw_loop_destroy_source(d->core->data_loop, d->rtsocket_source);
- d->rtsocket_source = NULL;
- }
- return 0;
-}
-
-
-static void unhandle_socket(struct node_data *data)
-{
- pw_loop_invoke(data->core->data_loop,
- do_remove_source, 1, NULL, 0, true, data);
-}
-
-static void
-on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
-{
- struct pw_proxy *proxy = user_data;
- struct node_data *data = proxy->user_data;
-
- if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
- pw_log_warn("got error");
- unhandle_socket(data);
- return;
- }
-
- if (mask & SPA_IO_IN) {
- uint64_t cmd;
-
- if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd) || cmd != 1)
- pw_log_warn("proxy %p: read %"PRIu64" failed %m", proxy, cmd);
-
- pw_log_trace("remote %p: process %p", data->remote, proxy);
-
- data->node->rt.target.signal(data->node->rt.target.data);
- }
-}
-
static struct link *find_activation(struct pw_array *links, uint32_t node_id)
{
struct link *l;
@@ -231,7 +187,7 @@ static void clear_mem(struct node_data *data, struct mem *m)
int fd;
struct mem *m2;
- pw_log_debug("remote %p: clear mem %d", data, m->id);
+ pw_log_debug("remote %p: clear mem %p %d %d", data, m, m->id, m->fd);
fd = m->fd;
m->fd = -1;
@@ -263,11 +219,9 @@ static void clean_transport(struct node_data *data)
struct mem *m;
struct link *l;
- if (data->rtsocket_source == NULL)
+ if (!data->have_transport)
return;
- unhandle_socket(data);
-
pw_array_for_each(m, &data->mems)
clear_mem(data, m);
pw_array_clear(&data->mems);
@@ -278,6 +232,7 @@ static void clean_transport(struct node_data *data)
close(data->rtwritefd);
data->remote_id = SPA_ID_INVALID;
+ data->have_transport = false;
}
static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id)
@@ -407,16 +362,16 @@ static int client_node_transport(void *object, uint32_t node_id,
clean_transport(data);
+ data->have_transport = true;
data->remote_id = node_id;
pw_log_debug("remote-node %p: create transport with fds %d %d for node %u",
proxy, readfd, writefd, node_id);
data->rtwritefd = writefd;
- data->rtsocket_source = pw_loop_add_io(remote->core->data_loop,
- readfd,
- SPA_IO_ERR | SPA_IO_HUP,
- true, on_rtsocket_condition, proxy);
+ close(data->node->source.fd);
+ data->node->source.fd = readfd;
+
if (data->node->active)
pw_client_node_proxy_set_active(data->node_proxy, true);
@@ -601,32 +556,16 @@ static int client_node_event(void *object, const struct spa_event *event)
return -ENOTSUP;
}
-static int
-do_pause_source(struct spa_loop *loop,
- bool async, uint32_t seq, const void *data, size_t size, void *user_data)
-{
- struct node_data *d = user_data;
- pw_loop_update_io(d->core->data_loop,
- d->rtsocket_source,
- SPA_IO_ERR | SPA_IO_HUP);
- return 0;
-}
-
static int client_node_command(void *object, const struct spa_command *command)
{
struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data;
- struct pw_remote *remote = proxy->remote;
int res;
switch (SPA_NODE_COMMAND_ID(command)) {
case SPA_NODE_COMMAND_Pause:
pw_log_debug("node %p: pause", proxy);
- if (data->rtsocket_source) {
- pw_loop_invoke(data->core->data_loop,
- do_pause_source, 1, NULL, 0, true, data);
- }
if ((res = pw_node_set_state(data->node, PW_NODE_STATE_IDLE)) < 0) {
pw_log_warn("node %p: pause failed", proxy);
pw_proxy_error(proxy, res, "pause failed");
@@ -640,11 +579,6 @@ static int client_node_command(void *object, const struct spa_command *command)
pw_log_warn("node %p: start failed", proxy);
pw_proxy_error(proxy, res, "start failed");
}
- else if (data->rtsocket_source) {
- pw_loop_update_io(remote->core->data_loop,
- data->rtsocket_source,
- SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP);
- }
break;
default:
pw_log_warn("unhandled node command %d", SPA_NODE_COMMAND_ID(command));
@@ -1021,6 +955,7 @@ client_node_set_activation(void *object,
if (data->remote_id == node_id) {
pw_log_debug("node %p: our activation %u: %u %u %u %p", node, node_id,
memid, offset, size, ptr);
+ close(signalfd);
return 0;
}
@@ -1125,14 +1060,12 @@ static void node_destroy(void *data)
static void node_free(void *data)
{
struct node_data *d = data;
- struct pw_remote *remote = d->remote;
struct pw_proxy *proxy = (struct pw_proxy*) d->node_proxy;
pw_log_debug("%p: free", d);
- if (remote->core_proxy)
- pw_core_proxy_destroy(remote->core_proxy, proxy);
- spa_hook_remove(&d->proxy_listener);
+ pw_proxy_destroy(d->proxy);
+ pw_proxy_destroy(proxy);
}
static void node_info_changed(void *data, const struct pw_node_info *info)