summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2019-05-14 18:10:18 +0200
committerWim Taymans <wtaymans@redhat.com>2019-05-14 18:10:18 +0200
commitf6ace8a000a4ecbdafc59dc2af6ef27629266dad (patch)
tree747f06237135de606b5b9f55b2562f3f03d8f972
parent903cbeb7c1c758e11c5e8cf6cec8eb9f3a1f2328 (diff)
a2dp: add events and implementation
Use hooks for the implementation of the transport Add events to the transport Handle the destroy event of the transport to release the reference in source and sink.
-rw-r--r--spa/plugins/bluez5/a2dp-sink.c20
-rw-r--r--spa/plugins/bluez5/a2dp-source.c59
-rw-r--r--spa/plugins/bluez5/bluez5-monitor.c54
-rw-r--r--spa/plugins/bluez5/defs.h44
4 files changed, 142 insertions, 35 deletions
diff --git a/spa/plugins/bluez5/a2dp-sink.c b/spa/plugins/bluez5/a2dp-sink.c
index 3235f7e1..3fa4ae56 100644
--- a/spa/plugins/bluez5/a2dp-sink.c
+++ b/spa/plugins/bluez5/a2dp-sink.c
@@ -103,6 +103,7 @@ struct impl {
struct props props;
struct spa_bt_transport *transport;
+ struct spa_hook transport_listener;
struct port port;
@@ -796,7 +797,7 @@ static int do_start(struct impl *this)
spa_log_debug(this->log, "a2dp-sink %p: start slaved:%d", this, this->slaved);
- if ((res = this->transport->acquire(this->transport, false)) < 0)
+ if ((res = spa_bt_transport_acquire(this->transport, false)) < 0)
return res;
init_sbc(this);
@@ -879,7 +880,7 @@ static int do_stop(struct impl *this)
this->started = false;
- res = this->transport->release(this->transport);
+ res = spa_bt_transport_release(this->transport);
return res;
}
@@ -1372,6 +1373,18 @@ static const struct spa_node impl_node = {
.process = impl_node_process,
};
+static void transport_destroy(void *data)
+{
+ struct impl *this = data;
+ spa_log_debug(this->log, "transport %p destroy", this->transport);
+ this->transport = NULL;
+}
+
+static const struct spa_bt_transport_events transport_events = {
+ SPA_VERSION_BT_TRANSPORT_EVENTS,
+ .destroy = transport_destroy,
+};
+
static int impl_get_interface(struct spa_handle *handle, uint32_t type, void **interface)
{
struct impl *this;
@@ -1474,6 +1487,9 @@ impl_init(const struct spa_handle_factory *factory,
spa_log_error(this->log, "a transport is needed");
return -EINVAL;
}
+ spa_bt_transport_add_listener(this->transport,
+ &this->transport_listener, &transport_events, this);
+
this->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
return 0;
diff --git a/spa/plugins/bluez5/a2dp-source.c b/spa/plugins/bluez5/a2dp-source.c
index 7303ccbe..906a59d4 100644
--- a/spa/plugins/bluez5/a2dp-source.c
+++ b/spa/plugins/bluez5/a2dp-source.c
@@ -78,6 +78,7 @@ struct impl {
struct props props;
struct spa_bt_transport *transport;
+ struct spa_hook transport_listener;
bool have_format;
struct spa_audio_info current_format;
@@ -257,14 +258,15 @@ static void decode_sbc_data(struct impl *this, uint8_t *src, size_t src_size)
uint8_t *dest;
size_t decoded, dest_size, written;
- /* skip the header */
- src += header_size;
- src_size -= header_size;
- if (src_size <= 0) {
+ if (src_size <= header_size) {
spa_log_error(this->log, "not valid header found. dropping data...");
return;
}
+ /* skip the header */
+ src += header_size;
+ src_size -= header_size;
+
/* check if we have a new buffer */
if (spa_list_is_empty(&this->free)) {
spa_log_warn(this->log, "no more buffers available, dropping data...");
@@ -277,7 +279,7 @@ static void decode_sbc_data(struct impl *this, uint8_t *src, size_t src_size)
/* remove the the buffer from the list */
spa_list_remove(&buffer->link);
- /* ppdate the outstanding flag */
+ /* update the outstanding flag */
buffer->outstanding = false;
/* set the header */
@@ -293,14 +295,16 @@ static void decode_sbc_data(struct impl *this, uint8_t *src, size_t src_size)
dest_size = data[0].maxsize;
/* decode the source data */
- spa_log_debug(this->log, "decoding data for buffer_id=%d", buffer->id);
+ spa_log_debug(this->log, "decoding data for buffer_id=%d %zd %zd",
+ buffer->id, src_size, dest_size);
+
while (src_size > 0 && dest_size > 0) {
decoded = sbc_decode(&this->sbc,
src, src_size,
dest, dest_size, &written);
if (decoded <= 0) {
- printf ("Decoding error. Exiting...\n");
- exit(-1);
+ spa_log_error(this->log, "Decoding error. (%zd)", decoded);
+ return;
}
/* update source and dest pointers */
@@ -344,10 +348,14 @@ static void a2dp_on_ready_read(struct spa_source *source)
again:
/* read data from socket */
- spa_log_debug(this->log, "reading socket data");
size_read = read(this->transport->fd, this->buffer_read, buffer_size);
- if (size_read < 0) {
- /* retry if interrumpted */
+ spa_log_debug(this->log, "read socket data %zd/%zd", size_read, buffer_size);
+
+ if (size_read == 0) {
+ goto stop;
+ }
+ else if (size_read < 0) {
+ /* retry if interrupted */
if (errno == EINTR)
goto again;
@@ -381,9 +389,12 @@ static int do_start(struct impl *this)
if (this->started)
return 0;
+ if (this->transport == NULL)
+ return -EIO;
+
spa_log_debug(this->log, "a2dp-source %p: start", this);
- if ((res = this->transport->acquire(this->transport, false)) < 0)
+ if ((res = spa_bt_transport_acquire(this->transport, false)) < 0)
return res;
sbc_init_a2dp(&this->sbc, 0, this->transport->configuration,
@@ -444,7 +455,10 @@ static int do_stop(struct impl *this)
this->started = false;
- res = this->transport->release(this->transport);
+ if (this->transport)
+ res = spa_bt_transport_release(this->transport);
+ else
+ res = 0;
sbc_finish(&this->sbc);
@@ -589,6 +603,9 @@ impl_node_port_enum_params(struct spa_node *node, int seq,
if (result.index > 0)
return 0;
+ if (this->transport == NULL)
+ return -EIO;
+
switch (this->transport->codec) {
case A2DP_CODEC_SBC:
{
@@ -926,6 +943,18 @@ static const struct spa_node impl_node = {
.process = impl_node_process,
};
+static void transport_destroy(void *data)
+{
+ struct impl *this = data;
+ spa_log_debug(this->log, "transport %p destroy", this->transport);
+ this->transport = NULL;
+}
+
+static const struct spa_bt_transport_events transport_events = {
+ SPA_VERSION_BT_TRANSPORT_EVENTS,
+ .destroy = transport_destroy,
+};
+
static int impl_get_interface(struct spa_handle *handle, uint32_t type, void **interface)
{
struct impl *this;
@@ -1038,6 +1067,8 @@ impl_init(const struct spa_handle_factory *factory,
spa_log_error(this->log, "codec != SBC not yet supported");
return -EINVAL;
}
+ spa_bt_transport_add_listener(this->transport,
+ &this->transport_listener, &transport_events, this);
return 0;
}
@@ -1079,4 +1110,4 @@ struct spa_handle_factory spa_a2dp_source_factory = {
impl_get_size,
impl_init,
impl_enum_interface_info,
-}; \ No newline at end of file
+};
diff --git a/spa/plugins/bluez5/bluez5-monitor.c b/spa/plugins/bluez5/bluez5-monitor.c
index a9986ee9..84fddf5a 100644
--- a/spa/plugins/bluez5/bluez5-monitor.c
+++ b/spa/plugins/bluez5/bluez5-monitor.c
@@ -816,6 +816,7 @@ static struct spa_bt_transport *transport_create(struct spa_bt_monitor *monitor,
t->path = path;
t->fd = -1;
t->user_data = SPA_MEMBER(t, sizeof(struct spa_bt_transport), void);
+ spa_hook_list_init(&t->listener_list);
spa_list_append(&monitor->transport_list, &t->link);
@@ -824,8 +825,11 @@ static struct spa_bt_transport *transport_create(struct spa_bt_monitor *monitor,
static void transport_free(struct spa_bt_transport *transport)
{
- if (transport->destroy)
- transport->destroy(transport);
+
+ spa_bt_transport_emit_destroy(transport);
+
+ spa_bt_transport_destroy(transport);
+
spa_list_remove(&transport->link);
if (transport->device) {
transport->device->connected_profiles &= ~transport->profile;
@@ -923,8 +927,9 @@ static int transport_update_props(struct spa_bt_transport *transport,
return 0;
}
-static int transport_acquire(struct spa_bt_transport *transport, bool optional)
+static int transport_acquire(void *data, bool optional)
{
+ struct spa_bt_transport *transport = data;
struct spa_bt_monitor *monitor = transport->monitor;
DBusMessage *m, *r;
DBusError err;
@@ -977,16 +982,17 @@ static int transport_acquire(struct spa_bt_transport *transport, bool optional)
ret = -EIO;
goto finish;
}
- spa_log_debug(monitor->log, "transport %p: %s, fd %d MTU %d:%d", transport, method,
- transport->fd, transport->read_mtu, transport->write_mtu);
+ spa_log_debug(monitor->log, "transport %p: %s %s, fd %d MTU %d:%d", transport, method,
+ transport->path, transport->fd, transport->read_mtu, transport->write_mtu);
finish:
dbus_message_unref(r);
return ret;
}
-static int transport_release(struct spa_bt_transport *transport)
+static int transport_release(void *data)
{
+ struct spa_bt_transport *transport = data;
struct spa_bt_monitor *monitor = transport->monitor;
DBusMessage *m, *r;
DBusError err;
@@ -994,7 +1000,8 @@ static int transport_release(struct spa_bt_transport *transport)
if (transport->fd < 0)
return 0;
- spa_log_debug(monitor->log, "transport %p: release", transport);
+ spa_log_debug(monitor->log, "transport %p: Release %s",
+ transport, transport->path);
close(transport->fd);
transport->fd = -1;
@@ -1026,6 +1033,12 @@ static int transport_release(struct spa_bt_transport *transport)
return 0;
}
+static const struct spa_bt_transport_implementation transport_impl = {
+ SPA_VERSION_BT_TRANSPORT_IMPLEMENTATION,
+ .acquire = transport_acquire,
+ .release = transport_release,
+};
+
static DBusHandlerResult endpoint_set_configuration(DBusConnection *conn,
const char *path, DBusMessage *m, void *userdata)
{
@@ -1054,8 +1067,7 @@ static DBusHandlerResult endpoint_set_configuration(DBusConnection *conn,
if (transport == NULL)
return DBUS_HANDLER_RESULT_NEED_MEMORY;
- transport->acquire = transport_acquire;
- transport->release = transport_release;
+ spa_bt_transport_set_implementation(transport, &transport_impl, transport);
}
transport_update_props(transport, &it[1], NULL);
@@ -1100,6 +1112,10 @@ static DBusHandlerResult endpoint_clear_configuration(DBusConnection *conn, DBus
if (transport != NULL) {
struct spa_bt_device *device = transport->device;
+
+ spa_log_debug(monitor->log, "transport %p: free %s",
+ transport, transport->path);
+
transport_free(transport);
if (device != NULL)
check_profiles(device);
@@ -1495,8 +1511,9 @@ fail_close:
}
-static int sco_acquire_cb(struct spa_bt_transport *t, bool optional)
+static int sco_acquire_cb(void *data, bool optional)
{
+ struct spa_bt_transport *t = data;
struct spa_bt_monitor *monitor = t->monitor;
int sock;
socklen_t len;
@@ -1531,8 +1548,9 @@ fail:
return -1;
}
-static int sco_release_cb(struct spa_bt_transport *t)
+static int sco_release_cb(void *data)
{
+ struct spa_bt_transport *t = data;
struct spa_bt_monitor *monitor = t->monitor;
spa_log_info(monitor->log, "Transport %s released", t->path);
/* device will close the SCO socket for us */
@@ -1614,8 +1632,9 @@ fail_close:
return -1;
}
-static int sco_destroy_cb(struct spa_bt_transport *trans)
+static int sco_destroy_cb(void *data)
{
+ struct spa_bt_transport *trans = data;
struct transport_data *td = trans->user_data;
if (td->sco.data) {
@@ -1633,6 +1652,13 @@ static int sco_destroy_cb(struct spa_bt_transport *trans)
return 0;
}
+static const struct spa_bt_transport_implementation sco_transport_impl = {
+ SPA_VERSION_BT_TRANSPORT_IMPLEMENTATION,
+ .acquire = sco_acquire_cb,
+ .release = sco_release_cb,
+ .destroy = sco_destroy_cb,
+};
+
static DBusHandlerResult profile_new_connection(DBusConnection *conn, DBusMessage *m, void *userdata)
{
struct spa_bt_monitor *monitor = userdata;
@@ -1681,10 +1707,8 @@ static DBusHandlerResult profile_new_connection(DBusConnection *conn, DBusMessag
asprintf(&pathfd, "%s/fd%d", path, fd);
t = transport_create(monitor, pathfd, sizeof(struct transport_data));
+ spa_bt_transport_set_implementation(t, &sco_transport_impl, t);
- t->acquire = sco_acquire_cb;
- t->release = sco_release_cb;
- t->destroy = sco_destroy_cb;
t->device = d;
spa_list_append(&t->device->transport_list, &t->device_link);
t->profile = profile;
diff --git a/spa/plugins/bluez5/defs.h b/spa/plugins/bluez5/defs.h
index 6d439d83..66b1eb8d 100644
--- a/spa/plugins/bluez5/defs.h
+++ b/spa/plugins/bluez5/defs.h
@@ -29,6 +29,8 @@
extern "C" {
#endif
+#include <spa/utils/hook.h>
+
#define BLUEZ_SERVICE "org.bluez"
#define BLUEZ_PROFILE_MANAGER_INTERFACE BLUEZ_SERVICE ".ProfileManager1"
#define BLUEZ_PROFILE_INTERFACE BLUEZ_SERVICE ".Profile1"
@@ -180,6 +182,22 @@ enum spa_bt_transport_state {
SPA_BT_TRANSPORT_STATE_ACTIVE,
};
+struct spa_bt_transport_events {
+#define SPA_VERSION_BT_TRANSPORT_EVENTS 0
+ uint32_t version;
+
+ void (*destroy) (void *data);
+};
+
+struct spa_bt_transport_implementation {
+#define SPA_VERSION_BT_TRANSPORT_IMPLEMENTATION 0
+ uint32_t version;
+
+ int (*acquire) (void *data, bool optional);
+ int (*release) (void *data);
+ int (*destroy) (void *data);
+};
+
struct spa_bt_transport {
struct spa_list link;
struct spa_bt_monitor *monitor;
@@ -198,12 +216,30 @@ struct spa_bt_transport {
uint16_t write_mtu;
void *user_data;
- int (*acquire) (struct spa_bt_transport *trans, bool optional);
+ struct spa_hook_list listener_list;
+ struct spa_hook impl;
+};
- int (*release) (struct spa_bt_transport *trans);
+#define spa_bt_transport_emit(t,m,v,...) spa_hook_list_call(&(t)->listener_list, \
+ struct spa_bt_transport_events, m, v, ##__VA_ARGS__)
+#define spa_bt_transport_emit_destroy(t) spa_bt_transport_emit(t, destroy, 0)
- int (*destroy) (struct spa_bt_transport *trans);
-};
+#define spa_bt_transport_add_listener(t,listener,events,data) \
+ spa_hook_list_append(&(t)->listener_list, listener, events, data)
+
+#define spa_bt_transport_set_implementation(t,_impl,_data) \
+ (t)->impl = SPA_HOOK_INIT(_impl, _data)
+
+#define spa_bt_transport_impl(t,m,v,...) \
+({ \
+ int res = 0; \
+ spa_hook_call_res(&(t)->impl, struct spa_bt_transport_implementation, res, m, v, ##__VA_ARGS__); \
+ res; \
+})
+
+#define spa_bt_transport_acquire(t,o) spa_bt_transport_impl(t, acquire, 0, o)
+#define spa_bt_transport_release(t) spa_bt_transport_impl(t, release, 0)
+#define spa_bt_transport_destroy(t) spa_bt_transport_impl(t, destroy, 0)
static inline enum spa_bt_transport_state spa_bt_transport_state_from_string(const char *value)
{