diff options
author | Wim Taymans <wtaymans@redhat.com> | 2020-10-02 16:19:53 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2020-10-05 17:14:02 +0200 |
commit | ec0483d520225f00f22b432f92f9c540ff4e247b (patch) | |
tree | 581abf02c2da2a68d821e5e518505e7361766ff8 | |
parent | 815361ac1e0ceaca2ffc7951cebbda4aa08c8835 (diff) |
pulse-bridge: implement enough to play flatpak Spotify
-rw-r--r-- | src/examples/media-session/pulse-bridge.c | 1282 |
1 files changed, 1209 insertions, 73 deletions
diff --git a/src/examples/media-session/pulse-bridge.c b/src/examples/media-session/pulse-bridge.c index 1b897978..84ed8a42 100644 --- a/src/examples/media-session/pulse-bridge.c +++ b/src/examples/media-session/pulse-bridge.c @@ -40,6 +40,11 @@ #endif #include <spa/utils/result.h> +#include <spa/debug/dict.h> +#include <spa/debug/mem.h> +#include <spa/param/audio/raw.h> +#include <spa/pod/pod.h> +#include <spa/param/audio/format-utils.h> #include "pipewire/pipewire.h" @@ -58,11 +63,19 @@ #define FRAME_SIZE_MAX_ALLOW (1024*1024*16) +#define PROTOCOL_FLAG_MASK 0xffff0000u +#define PROTOCOL_VERSION_MASK 0x0000ffffu +#define PROTOCOL_VERSION 34 + +#define NATIVE_COOKIE_LENGTH 256 +#define MAX_TAG_SIZE (64*1024) + struct impl { struct sm_media_session *session; struct spa_hook listener; struct pw_loop *loop; + struct pw_context *context; struct spa_source *source; struct spa_list clients; @@ -110,6 +123,12 @@ struct client { struct spa_source *source; + uint32_t version; + + struct pw_properties *props; + + struct pw_core *core; + uint32_t index; struct descriptor desc; @@ -117,6 +136,122 @@ struct client { #define TYPE_MEMBLOCK 1 uint32_t type; struct data data; + + struct pw_map streams; +}; + +enum sample_format { + SAMPLE_U8, + SAMPLE_ALAW, + SAMPLE_ULAW, + SAMPLE_S16LE, + SAMPLE_S16BE, + SAMPLE_FLOAT32LE, + SAMPLE_FLOAT32BE, + SAMPLE_S32LE, + SAMPLE_S32BE, + SAMPLE_S24LE, + SAMPLE_S24BE, + SAMPLE_S24_32LE, + SAMPLE_S24_32BE, + SAMPLE_MAX, + SAMPLE_INVALID = -1 +}; + +static const uint32_t audio_formats[] = { + [SAMPLE_U8] = SPA_AUDIO_FORMAT_U8, + [SAMPLE_ALAW] = SPA_AUDIO_FORMAT_UNKNOWN, + [SAMPLE_ULAW] = SPA_AUDIO_FORMAT_UNKNOWN, + [SAMPLE_S16LE] = SPA_AUDIO_FORMAT_S16_LE, + [SAMPLE_S16BE] = SPA_AUDIO_FORMAT_S16_BE, + [SAMPLE_FLOAT32LE] = SPA_AUDIO_FORMAT_F32_LE, + [SAMPLE_FLOAT32BE] = SPA_AUDIO_FORMAT_F32_BE, + [SAMPLE_S32LE] = SPA_AUDIO_FORMAT_S32_LE, + [SAMPLE_S32BE] = SPA_AUDIO_FORMAT_S32_BE, + [SAMPLE_S24LE] = SPA_AUDIO_FORMAT_S24_LE, + [SAMPLE_S24BE] = SPA_AUDIO_FORMAT_S24_BE, + [SAMPLE_S24_32LE] = SPA_AUDIO_FORMAT_S24_32_LE, + [SAMPLE_S24_32BE] = SPA_AUDIO_FORMAT_S24_32_BE, +}; + +static inline uint32_t format_pa2id(enum sample_format format) +{ + if (format < 0 || (size_t)format >= SPA_N_ELEMENTS(audio_formats)) + return SPA_AUDIO_FORMAT_UNKNOWN; + return audio_formats[format]; +} + +struct sample_spec { + enum sample_format format; + uint32_t rate; + uint8_t channels; +}; + +#define CHANNELS_MAX 64 + +struct channel_map { + uint8_t channels; + uint32_t map[CHANNELS_MAX]; +}; + +struct cvolume { + uint8_t channels; + float values[CHANNELS_MAX]; +}; + +struct buffer_attr { + uint32_t maxlength; + uint32_t tlength; + uint32_t prebuf; + uint32_t minreq; + uint32_t fragsize; +}; + +enum encoding { + ENCODING_ANY, + ENCODING_PCM, + ENCODING_AC3_IEC61937, + ENCODING_EAC3_IEC61937, + ENCODING_MPEG_IEC61937, + ENCODING_DTS_IEC61937, + ENCODING_MPEG2_AAC_IEC61937, + ENCODING_TRUEHD_IEC61937, + ENCODING_DTSHD_IEC61937, + ENCODING_MAX, + NCODING_INVALID = -1, +}; + +struct format_info { + enum encoding encoding; + struct pw_properties *props; +}; + +struct block { + struct spa_list link; + uint8_t *data; + uint32_t length; + uint32_t offset; +}; + +struct stream { + uint32_t create_tag; + uint32_t channel; /* index in map */ + + struct impl *impl; + struct client *client; + + struct pw_stream *stream; + struct spa_hook stream_listener; + + struct spa_list blocks; + int64_t read_index; + int64_t write_index; + + struct sample_spec ss; + struct channel_map map; + struct buffer_attr attr; + + uint32_t drain_tag; }; enum { @@ -281,76 +416,493 @@ struct command { int (*run) (struct client *client, uint32_t command, uint32_t tag, struct data *d); }; -static int data_readtype(struct data *d, uint8_t type) +static int data_get(struct data *d, ...); + +static int read_u8(struct data *d, uint8_t *val) { if (d->offset + 1 > d->length) return -ENOSPC; - if (d->data[d->offset] != type) - return -EINVAL; + *val = d->data[d->offset]; d->offset++; return 0; } -static int data_writetype(struct data *d, uint8_t type) +static int read_u32(struct data *d, uint32_t *val) { - if (d->offset + 1 > d->length) + if (d->offset + 4 > d->length) return -ENOSPC; - d->data[d->offset] = type; - d->offset++; + memcpy(val, &d->data[d->offset], 4); + *val = ntohl(*val); + d->offset += 4; + return 0; +} +static int read_u64(struct data *d, uint64_t *val) +{ + uint32_t tmp; + int res; + if ((res = read_u32(d, &tmp)) < 0) + return res; + *val = ((uint64_t)tmp) << 32; + if ((res = read_u32(d, &tmp)) < 0) + return res; + *val |= tmp; return 0; } -static int data_writeu8(struct data *d, uint8_t val) +static int read_sample_spec(struct data *d, struct sample_spec *ss) { - if (d->offset + 1 > d->length) - return -ENOSPC; - d->data[d->offset] = val; - d->offset++; + int res; + uint8_t tmp; + if ((res = read_u8(d, &tmp)) < 0) + return res; + ss->format = tmp; + if ((res = read_u8(d, &ss->channels)) < 0) + return res; + return read_u32(d, &ss->rate); +} + +static int read_props(struct data *d, struct pw_properties *props) +{ + int res; + + while (true) { + char *key; + void *data; + uint32_t length; + + if ((res = data_get(d, + TAG_STRING, &key, + TAG_INVALID)) < 0) + return res; + + if (key == NULL) + break; + + if ((res = data_get(d, + TAG_U32, &length, + TAG_INVALID)) < 0) + return res; + if (length > MAX_TAG_SIZE) + return -EINVAL; + + if ((res = data_get(d, + TAG_ARBITRARY, &data, length, + TAG_INVALID)) < 0) + return res; + + pw_log_debug("%s %s", key, (char*)data); + pw_properties_set(props, key, data); + } return 0; } -static int data_readu8(struct data *d, uint8_t *val) +static int read_arbitrary(struct data *d, const void **val, size_t length) { - if (d->offset + 1 > d->length) + uint32_t len; + int res; + if ((res = read_u32(d, &len)) < 0) + return res; + if (len != length) + return -EINVAL; + if (d->offset + length > d->length) return -ENOSPC; - *val = d->data[d->offset]; - d->offset++; + *val = d->data + d->offset; + d->offset += length; return 0; } -static int data_readu32(struct data *d, uint32_t *val) +static int read_string(struct data *d, char **str) { - if (d->offset + 4 > d->length) - return -ENOSPC; - memcpy(val, &d->data[d->offset], 4); - *val = ntohl(*val); - d->offset += 4; + uint32_t n, maxlen = d->length - d->offset; + n = strnlen(d->data + d->offset, maxlen); + if (n == maxlen) + return -EINVAL; + *str = d->data + d->offset; + d->offset += n + 1; return 0; } -static int data_writeu32(struct data *d, uint32_t val) +static int read_timeval(struct data *d, struct timeval *tv) +{ + int res; + uint32_t tmp; + + if ((res = read_u32(d, &tmp)) < 0) + return res; + tv->tv_sec = tmp; + if ((res = read_u32(d, &tmp)) < 0) + return res; + tv->tv_usec = tmp; + return 0; +} + +static int read_channel_map(struct data *d, struct channel_map *map) +{ + int res; + uint8_t i, tmp; + + if ((res = read_u8(d, &map->channels)) < 0) + return res; + if (map->channels > CHANNELS_MAX) + return -EINVAL; + for (i = 0; i < map->channels; i ++) { + if ((res = read_u8(d, &tmp)) < 0) + return res; + map->map[i] = tmp; + } + return 0; +} +static int read_volume(struct data *d, float *vol) +{ + int res; + uint32_t v; + if ((res = read_u32(d, &v)) < 0) + return res; + *vol = ((float)v) / 0x10000U; + return 0; +} + +static int read_cvolume(struct data *d, struct cvolume *vol) +{ + int res; + uint8_t i; + + if ((res = read_u8(d, &vol->channels)) < 0) + return res; + if (vol->channels > CHANNELS_MAX) + return -EINVAL; + for (i = 0; i < vol->channels; i ++) { + if ((res = read_volume(d, &vol->values[i])) < 0) + return res; + } + return 0; +} + +static int read_format_info(struct data *d, struct format_info *info) +{ + int res; + uint8_t tag, encoding; + + if ((res = read_u8(d, &tag)) < 0) + return res; + if (tag != TAG_U8) + return -EPROTO; + if ((res = read_u8(d, &encoding)) < 0) + return res; + info->encoding = encoding; + + if ((res = read_u8(d, &tag)) < 0) + return res; + if (tag != TAG_PROPLIST) + return -EPROTO; + + info->props = pw_properties_new(NULL, NULL); + if (info->props == NULL) + return -errno; + return read_props(d, info->props); +} + +static int data_get(struct data *d, ...) +{ + va_list va; + int res; + + va_start(va, d); + + while (true) { + int tag = va_arg(va, int); + uint8_t dtag; + if (tag == TAG_INVALID) + break; + + if ((res = read_u8(d, &dtag)) < 0) + return res; + + switch (dtag) { + case TAG_STRING: + if (tag != TAG_STRING) + return -EINVAL; + if ((res = read_string(d, va_arg(va, char**))) < 0) + return res; + break; + case TAG_STRING_NULL: + if (tag != TAG_STRING) + return -EINVAL; + *va_arg(va, char**) = NULL; + break; + case TAG_U8: + if (dtag != tag) + return -EINVAL; + if ((res = read_u8(d, va_arg(va, uint8_t*))) < 0) + return res; + break; + case TAG_U32: + if (dtag != tag) + return -EINVAL; + if ((res = read_u32(d, va_arg(va, uint32_t*))) < 0) + return res; + break; + case TAG_S64: + case TAG_U64: + case TAG_USEC: + if (dtag != tag) + return -EINVAL; + if ((res = read_u64(d, va_arg(va, uint64_t*))) < 0) + return res; + break; + case TAG_SAMPLE_SPEC: + if (dtag != tag) + return -EINVAL; + if ((res = read_sample_spec(d, va_arg(va, struct sample_spec*))) < 0) + return res; + break; + case TAG_ARBITRARY: + { + const void **val = va_arg(va, const void**); + size_t len = va_arg(va, size_t); + if (dtag != tag) + return -EINVAL; + if ((res = read_arbitrary(d, val, len)) < 0) + return res; + break; + } + case TAG_BOOLEAN_TRUE: + if (tag != TAG_BOOLEAN) + return -EINVAL; + *va_arg(va, bool*) = true; + break; + case TAG_BOOLEAN_FALSE: + if (tag != TAG_BOOLEAN) + return -EINVAL; + *va_arg(va, bool*) = false; + break; + case TAG_TIMEVAL: + if (dtag != tag) + return -EINVAL; + if ((res = read_timeval(d, va_arg(va, struct timeval*))) < 0) + return res; + break; + case TAG_CHANNEL_MAP: + if (dtag != tag) + return -EINVAL; + if ((res = read_channel_map(d, va_arg(va, struct channel_map*))) < 0) + return res; + break; + case TAG_CVOLUME: + if (dtag != tag) + return -EINVAL; + if ((res = read_cvolume(d, va_arg(va, struct cvolume*))) < 0) + return res; + break; + case TAG_PROPLIST: + if (dtag != tag) + return -EINVAL; + if ((res = read_props(d, va_arg(va, struct pw_properties*))) < 0) + return res; + break; + case TAG_VOLUME: + if (dtag != tag) + return -EINVAL; + if ((res = read_volume(d, va_arg(va, float*))) < 0) + return res; + break; + case TAG_FORMAT_INFO: + if (dtag != tag) + return -EINVAL; + if ((res = read_format_info(d, va_arg(va, struct format_info*))) < 0) + return res; + break; + } + } + va_end(va); + + return 0; +} + +static void write_8(struct data *d, uint8_t val) +{ + if (d->offset < d->length) + d->data[d->offset] = val; + d->offset++; +} + +static void write_32(struct data *d, uint32_t val) { - if (d->offset + 4 > d->length) - return -ENOSPC; val = htonl(val); - memcpy(d->data + d->offset, &val, 4); + if (d->offset + 4 <= d->length) + memcpy(d->data + d->offset, &val, 4); d->offset += 4; - return 0; } -static int data_getu32(struct data *d, uint32_t *val) +static void write_string(struct data *d, const char *s) +{ + write_8(d, s ? TAG_STRING : TAG_STRING_NULL); + if (s != NULL) { + int len = strlen(s) + 1; + if (d->offset + len <= d->length) + strcpy(&d->data[d->offset], s); + d->offset += len; + } +} +static void write_u8(struct data *d, uint8_t val) +{ + write_8(d, TAG_U8); + write_8(d, val); +} + +static void write_u32(struct data *d, uint32_t val) +{ + write_8(d, TAG_U32); + write_32(d, val); +} + +static void write_64(struct data *d, uint8_t tag, uint64_t val) +{ + write_8(d, tag); + write_32(d, val >> 32); + write_32(d, val); +} + +static void write_sample_spec(struct data *d, struct sample_spec *ss) +{ + write_8(d, TAG_SAMPLE_SPEC); + write_8(d, ss->format); + write_8(d, ss->channels); + write_32(d, ss->rate); +} + +static void write_arbitrary(struct data *d, const void *p, size_t length) +{ + write_8(d, TAG_ARBITRARY); + write_32(d, length); + if (length > 0 && d->offset + length <= d->length) + memcpy(d->data + d->offset, p, length); + d->offset += length; +} + +static void write_boolean(struct data *d, bool val) +{ + write_8(d, val ? TAG_BOOLEAN_TRUE : TAG_BOOLEAN_FALSE); +} + +static void write_timeval(struct data *d, struct timeval *tv) { - if (data_readtype(d, TAG_U32) < 0) - return -1; - return data_readu32(d, val); + write_8(d, TAG_TIMEVAL); + write_32(d, tv->tv_sec); + write_32(d, tv->tv_usec); } -static int data_putu32(struct data *d, uint32_t val) + +static void write_channel_map(struct data *d, struct channel_map *map) { - if (data_writetype(d, TAG_U32) < 0) - return -1; - return data_writeu32(d, val); + uint8_t i; + write_8(d, TAG_CHANNEL_MAP); + write_8(d, map->channels); + for (i = 0; i < map->channels; i ++) + write_8(d, map->map[i]); } +static void write_volume(struct data *d, float vol) +{ + write_8(d, TAG_VOLUME); + write_32(d, vol * 0x10000U); +} + +static void write_cvolume(struct data *d, struct cvolume *cvol) +{ + uint8_t i; + write_8(d, TAG_CVOLUME); + write_8(d, cvol->channels); + for (i = 0; i < cvol->channels; i ++) + write_32(d, cvol->values[i] * 0x10000U); +} + +static void write_props(struct data *d, struct pw_properties *props) +{ + const struct spa_dict_item *it; + write_8(d, TAG_PROPLIST); + if (props != NULL) { + spa_dict_for_each(it, &props->dict) { + int l = strlen(it->value); + write_string(d, it->key); + write_u32(d, l); + write_arbitrary(d, it->value, l); + } + } + write_string(d, NULL); +} + +static void write_format_info(struct data *d, struct format_info *info) +{ + write_8(d, TAG_FORMAT_INFO); + write_u8(d, (uint8_t) info->encoding); + write_props(d, info->props); +} + +static int data_put(struct data *d, ...) +{ + va_list va; + + va_start(va, d); + + while (true) { + int tag = va_arg(va, int); + if (tag == TAG_INVALID) + break; + + switch (tag) { + case TAG_STRING: + write_string(d, va_arg(va, const char *)); + break; + case TAG_U8: + write_u8(d, (uint8_t)va_arg(va, int)); + break; + case TAG_U32: + write_u32(d, (uint32_t)va_arg(va, uint32_t)); + break; + case TAG_S64: + case TAG_U64: + case TAG_USEC: + write_64(d, tag, va_arg(va, uint64_t)); + break; + case TAG_SAMPLE_SPEC: + write_sample_spec(d, va_arg(va, struct sample_spec*)); + break; + case TAG_ARBITRARY: + { + const void *p = va_arg(va, const void*); + size_t length = va_arg(va, size_t); + write_arbitrary(d, p, length); + break; + } + case TAG_BOOLEAN: + write_boolean(d, va_arg(va, int)); + break; + case TAG_TIMEVAL: + write_timeval(d, va_arg(va, struct timeval*)); + break; + case TAG_CHANNEL_MAP: + write_channel_map(d, va_arg(va, struct channel_map*)); + break; + case TAG_CVOLUME: + write_cvolume(d, va_arg(va, struct cvolume*)); + break; + case TAG_PROPLIST: + write_props(d, va_arg(va, struct pw_properties*)); + break; + case TAG_VOLUME: + write_volume(d, va_arg(va, double)); + break; + case TAG_FORMAT_INFO: + write_format_info(d, va_arg(va, struct format_info*)); + break; + } + } + va_end(va); + + return 0; +} + + static int send_data(struct client *client, struct data *d) { struct descriptor desc; @@ -370,16 +922,36 @@ static int do_command_auth(struct client *client, uint32_t command, uint32_t tag struct impl *impl = client->impl; uint8_t buffer[1024]; struct data reply; + uint32_t version; + const void *cookie; + int res; + + if ((res = data_get(d, + TAG_U32, &version, + TAG_ARBITRARY, &cookie, NATIVE_COOKIE_LENGTH, + TAG_INVALID)) < 0) { + return res; + } + + if (version < 8) + return -EPROTO; + + if ((version & PROTOCOL_VERSION_MASK) >= 13) + version &= PROTOCOL_VERSION_MASK; + + client->version = version; spa_zero(reply); reply.data = buffer; reply.length = sizeof(buffer); - pw_log_info(NAME" %p: AUTH", impl); + pw_log_info(NAME" %p: AUTH version:%d", impl, version); - data_putu32(&reply, COMMAND_REPLY); - data_putu32(&reply, tag); - data_putu32(&reply, 34); + data_put(&reply, + TAG_U32, COMMAND_REPLY, + TAG_U32, tag, + TAG_U32, PROTOCOL_VERSION, + TAG_INVALID); return send_data(client, &reply); } @@ -389,17 +961,45 @@ static int do_set_client_name(struct client *client, uint32_t command, uint32_t struct impl *impl = client->impl; uint8_t buffer[1024]; struct data reply; + const char *name = NULL; + struct pw_properties *props; + int res; + + props = pw_properties_new(NULL, NULL); + + if (client->version < 13) { + if ((res = data_get(d, + TAG_STRING, &name, + TAG_INVALID)) < 0) + return res; + } else { + if ((res = data_get(d, + TAG_PROPLIST, props, + TAG_INVALID)) < 0) + return res; + } + if (name) + pw_properties_set(props, "application.name", name); + + pw_log_info(NAME" %p: SET_CLIENT_NAME %s", impl, + pw_properties_get(props, "application.name")); + + pw_properties_free(props); spa_zero(reply); reply.data = buffer; reply.length = sizeof(buffer); - pw_log_info(NAME" %p: SET_CLIENT_NAME", impl); - - data_putu32(&reply, COMMAND_REPLY); - data_putu32(&reply, tag); - data_putu32(&reply, 0); + data_put(&reply, + TAG_U32, COMMAND_REPLY, + TAG_U32, tag, + TAG_INVALID); + if (client->version >= 13) { + data_put(&reply, + TAG_U32, 0, /* client index */ + TAG_INVALID); + } return send_data(client, &reply); } @@ -408,17 +1008,502 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s struct impl *impl = client->impl; uint8_t buffer[1024]; struct data reply; + uint32_t mask; + int res; + + if ((res = data_get(d, + TAG_U32, &mask, + TAG_INVALID)) < 0) + return res; + + pw_log_info(NAME" %p: SUBSCRIBE mask:%08x", impl, mask); + + spa_zero(reply); + reply.data = buffer; + reply.length = sizeof(buffer); + + data_put(&reply, + TAG_U32, COMMAND_REPLY, + TAG_U32, tag, + TAG_INVALID); + + return send_data(client, &reply); +} + +static void stream_free(struct stream *stream) +{ + struct client *client = stream->client; + pw_map_remove(&client->streams, stream->channel); + if (stream->stream) { + spa_hook_remove(&stream->stream_listener); + pw_stream_destroy(stream->stream); + } + free(stream); +} + +static int send_request(struct stream *stream, uint32_t size) +{ + struct client *client = stream->client; + uint8_t buffer[1024]; + struct data msg; + + spa_zero(msg); + msg.data = buffer; + msg.length = sizeof(buffer); + + data_put(&msg, + TAG_U32, COMMAND_REQUEST, + TAG_U32, -1, + TAG_U32, stream->channel, + TAG_U32, size, + TAG_INVALID); + + return send_data(client, &msg); +} + +static int reply_simple_ack(struct stream *stream, uint32_t tag) +{ + struct client *client = stream->client; + uint8_t buffer[1024]; + struct data reply; + + spa_zero(reply); + reply.data = buffer; + reply.length = sizeof(buffer); + + data_put(&reply, + TAG_U32, COMMAND_REPLY, + TAG_U32, tag, + TAG_INVALID); + + return send_data(client, &reply); +} + +static int reply_error(struct stream *stream, uint32_t tag, uint32_t error) +{ + struct client *client = stream->client; + uint8_t buffer[1024]; + struct data reply; + + spa_zero(reply); + reply.data = buffer; + reply.length = sizeof(buffer); + + data_put(&reply, + TAG_U32, COMMAND_ERROR, + TAG_U32, tag, + TAG_U32, error, + TAG_INVALID); + + return send_data(client, &reply); +} + +static int reply_create_playback_stream(struct stream *stream) +{ + struct impl *impl = stream->impl; + struct client *client = stream->client; + uint8_t buffer[1024]; + struct data reply; spa_zero(reply); reply.data = buffer; reply.length = sizeof(buffer); - pw_log_info(NAME" %p: SUBSCRIBE", impl); + data_put(&reply, + TAG_U32, COMMAND_REPLY, + TAG_U32, stream->create_tag, + TAG_U32, stream->channel, /* stream index/channel */ + TAG_U32, 0, /* sink_input/stream index */ + TAG_U32, 8192, /* missing/requested bytes */ + TAG_INVALID); + + if (client->version >= 9) { + data_put(&reply, + TAG_U32, stream->attr.maxlength, + TAG_U32, stream->attr.tlength, + TAG_U32, stream->attr.prebuf, + TAG_U32, stream->attr.minreq, + TAG_INVALID); + } + if (client->version >= 12) { + data_put(&reply, + TAG_SAMPLE_SPEC, &stream->ss, + TAG_CHANNEL_MAP, &stream->map, + TAG_U32, 0, /* sink index */ + TAG_STRING, "sink", /* sink name */ + TAG_BOOLEAN, false, /* sink suspended state */ + TAG_INVALID); + } + if (client->version >= 13) { + data_put(&reply, + TAG_USEC, 0ULL, /* sink configured latency */ + TAG_INVALID); + } + if (client->version >= 21) { + struct format_info info; + spa_zero(info); + info.encoding = ENCODING_PCM; + data_put(&reply, + TAG_FORMAT_INFO, &info, /* sink_input format */ + TAG_INVALID); + } - data_putu32(&reply, COMMAND_REPLY); - data_putu32(&reply, tag); + stream->create_tag = SPA_ID_INVALID; return send_data(client, &reply); + +} + +static void stream_state_changed(void *data, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct stream *stream = data; + + switch (state) { + case PW_STREAM_STATE_ERROR: + reply_error(stream, 0, 0); + break; + case PW_STREAM_STATE_UNCONNECTED: + reply_error(stream, 0, 0); + break; + case PW_STREAM_STATE_CONNECTING: + break; + case PW_STREAM_STATE_PAUSED: + break; + case PW_STREAM_STATE_STREAMING: + break; + } +} + +static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param) +{ + struct stream *stream = data; + + if (id != SPA_PARAM_Format || param == NULL) + return; + + if (stream->create_tag != SPA_ID_INVALID) + reply_create_playback_stream(stream); +} + +static void stream_process(void *data) +{ + struct stream *stream = data; + struct block *block; + struct pw_buffer *buffer; + struct spa_buffer *buf; + uint32_t size, maxsize; + void *p; + + pw_log_trace(NAME" %p: process", stream); + + if (spa_list_is_empty(&stream->blocks)) + return; + + block = spa_list_first(&stream->blocks, struct block, link); + + buffer = pw_stream_dequeue_buffer(stream->stream); + if (buffer == NULL) + return; + + buf = buffer->buffer; + if ((p = buf->datas[0].data) == NULL) + return; + + maxsize = buf->datas[0].maxsize; + size = SPA_MIN(block->length, maxsize); + pw_log_trace("process block %p %p", block, block->data); + memcpy(p, block->data, size); + + spa_list_remove(&block->link); + free(block->data); + free(block); + + buf->datas[0].chunk->offset = 0; + buf->datas[0].chunk->stride = 4; + buf->datas[0].chunk->size = size; + + pw_stream_queue_buffer(stream->stream, buffer); + + send_request(stream, maxsize); +} + +static void stream_drained(void *data) +{ + struct stream *stream = data; + pw_log_info(NAME" %p: drain", stream); + reply_simple_ack(stream, stream->drain_tag); +} + +static const struct pw_stream_events stream_events = +{ + PW_VERSION_STREAM_EVENTS, + .state_changed = stream_state_changed, + .param_changed = stream_param_changed, + .process = stream_process, + .drained = stream_drained, +}; + + + +static int do_create_playback_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d) +{ + struct impl *impl = client->impl; + const char *name = NULL; + int res; + struct sample_spec ss; + struct channel_map map; + uint32_t sink_index, syncid; + const char *sink_name; + struct buffer_attr attr; + bool corked = false, + no_remap = false, + no_remix = false, + fix_format = false, + fix_rate = false, + fix_channels = false, + no_move = false, + variable_rate = false, + muted = false, + adjust_latency = false, + early_requests = false, + dont_inhibit_auto_suspend = false, + volume_set = true, + muted_set = false, + fail_on_suspend = false, + relative_volume = false, + passthrough = false; + struct cvolume volume; + struct pw_properties *props = NULL; + uint8_t n_formats = 0; + struct format_info *formats = NULL; + struct stream *stream; + struct spa_audio_info_raw info; + uint32_t n_params; + const struct spa_pod *params[1]; + uint8_t buffer[4096]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + + pw_log_info(NAME" %p: CREATE_PLAYBACK_STREAM", impl); + + props = pw_properties_new(NULL, NULL); + + if (client->version < 13) { + if ((res = data_get(d, + TAG_STRING, &name, + TAG_INVALID)) < 0) + goto error; + if (name == NULL) { + res = -EPROTO; + goto error; + } + } + if ((res = data_get(d, + TAG_SAMPLE_SPEC, &ss, + TAG_CHANNEL_MAP, &map, + TAG_U32, &sink_index, + TAG_STRING, &sink_name, + TAG_U32, &attr.maxlength, + TAG_BOOLEAN, &corked, + TAG_U32, &attr.tlength, + TAG_U32, &attr.prebuf, + TAG_U32, &attr.minreq, + TAG_U32, &syncid, + TAG_CVOLUME, &volume, + TAG_INVALID)) < 0) + goto error; + + if (client->version >= 12) { + if ((res = data_get(d, + TAG_BOOLEAN, &no_remap, + TAG_BOOLEAN, &no_remix, + TAG_BOOLEAN, &fix_format, + TAG_BOOLEAN, &fix_rate, + TAG_BOOLEAN, &fix_channels, + TAG_BOOLEAN, &no_move, + TAG_BOOLEAN, &variable_rate, + TAG_INVALID)) < 0) + goto error; + } + if (client->version >= 13) { + if ((res = data_get(d, + TAG_BOOLEAN, &muted, + TAG_BOOLEAN, &adjust_latency, + TAG_PROPLIST, props, + TAG_INVALID)) < 0) + goto error; + } + if (client->version >= 14) { + if ((res = data_get(d, + TAG_BOOLEAN, &volume_set, + TAG_BOOLEAN, &early_requests, + TAG_INVALID)) < 0) + goto error; + } + if (client->version >= 15) { + if ((res = data_get(d, + TAG_BOOLEAN, &muted_set, + TAG_BOOLEAN, &dont_inhibit_auto_suspend, + TAG_BOOLEAN, &fail_on_suspend, + TAG_INVALID)) < 0) + goto error; + } + if (client->version >= 17) { + if ((res = data_get(d, + TAG_BOOLEAN, &relative_volume, + TAG_INVALID)) < 0) + goto error; + } + if (client->version >= 18) { + if ((res = data_get(d, + TAG_BOOLEAN, &passthrough, + TAG_INVALID)) < 0) + goto error; + } + if (client->version >= 21) { + if ((res = data_get(d, + TAG_U8, &n_formats, + TAG_INVALID)) < 0) + goto error; + + if (n_formats) { + uint8_t i; + formats = calloc(n_formats, sizeof(struct format_info)); + for (i = 0; i < n_formats; i++) { + if ((res = data_get(d, + TAG_FORMAT_INFO, &formats[i], + TAG_INVALID)) < 0) + goto error; + } + } + } + if (d->offset != d->length) { + res = -EPROTO; + goto error; + } + + stream = calloc(1, sizeof(struct stream)); + if (stream == NULL) { + res = -errno; + goto error; + } + stream->impl = impl; + stream->client = client; + stream->channel = pw_map_insert_new(&client->streams, stream); + spa_list_init(&stream->blocks); + + stream->stream = pw_stream_new(client->core, name, props); + props = NULL; + if (stream->stream == NULL) { + res = -errno; + goto error; + } + pw_stream_add_listener(stream->stream, + &stream->stream_listener, + &stream_events, stream); + + stream->create_tag = tag; + stream->ss = ss; + stream->map = map; + stream->attr = attr; + + info = SPA_AUDIO_INFO_RAW_INIT( + .format = format_pa2id(ss.format), + .channels = ss.channels, + .rate = ss.rate); + + n_params = 0; + params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &info); + + pw_stream_connect(stream->stream, + PW_DIRECTION_OUTPUT, + SPA_ID_INVALID, + PW_STREAM_FLAG_INACTIVE | + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS, + params, n_params); + + return 0; + +error: + if (props) + pw_properties_free(props); + if (stream) + stream_free(stream); + return res; +} + +static int do_get_playback_latency(struct client *client, uint32_t command, uint32_t tag, struct data *d) +{ + struct impl *impl = client->impl; + uint8_t buffer[1024]; + struct data reply; + uint32_t idx; + struct timeval tv, now; + struct stream *stream; + int res; + + if ((res = data_get(d, + TAG_U32, &idx, + TAG_TIMEVAL, &tv, + TAG_INVALID)) < 0) + return res; + + pw_log_info(NAME" %p: GET_PLAYBACK_LATENCY idx:%u", impl, idx); + stream = pw_map_lookup(&client->streams, idx); + if (stream == NULL) + return -EINVAL; + + spa_zero(reply); + reply.data = buffer; + reply.length = sizeof(buffer); + + data_put(&reply, + TAG_U32, COMMAND_REPLY, + TAG_U32, tag, + TAG_USEC, 0, + TAG_USEC, 0, + TAG_BOOLEAN, true, + TAG_TIMEVAL, &tv, + TAG_TIMEVAL, &now, + TAG_S64, stream->write_index, + TAG_S64, stream->read_index, + TAG_INVALID); + + if (client->version >= 13) { + data_put(&reply, + TAG_U64, 0, /* underrun_for */ + TAG_U64, 0, /* playing_for */ + TAG_INVALID); + } + return send_data(client, &reply); +} + +static int do_cork_playback_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d) +{ + struct impl *impl = client->impl; + uint8_t buffer[1024]; + struct data reply; + uint32_t idx; + bool cork; + struct stream *stream; + int res; + + if ((res = data_get(d, + TAG_U32, &idx, + TAG_BOOLEAN, &cork, + TAG_INVALID)) < 0) + return res; + + pw_log_info(NAME" %p: CORK_PLAYBACK_STREAM idx:%u cork:%s", + impl, idx, cork ? "yes" : "no"); + stream = pw_map_lookup(&client->streams, idx); + if (stream == NULL) + return -EINVAL; + +// pw_stream_set_active(stream->stream, !cork); + + reply_simple_ack(stream, tag); } static const struct command commands[COMMAND_MAX] = @@ -426,12 +1511,22 @@ static const struct command commands[COMMAND_MAX] = [COMMAND_AUTH] = { do_command_auth, }, [COMMAND_SET_CLIENT_NAME] = { do_set_client_name, }, [COMMAND_SUBSCRIBE] = { do_subscribe, }, + [COMMAND_CREATE_PLAYBACK_STREAM] = { do_create_playback_stream, }, + [COMMAND_GET_PLAYBACK_LATENCY] = { do_get_playback_latency, }, + [COMMAND_CORK_PLAYBACK_STREAM] = { do_cork_playback_stream, }, }; static void client_free(struct client *client) { struct impl *impl = client->impl; + + pw_log_info(NAME" %p: client %p free", impl, client); spa_list_remove(&client->link); + pw_map_clear(&client->streams); + if (client->core) + pw_core_disconnect(client->core); + if (client->props) + pw_properties_free(client->props); if (client->source) pw_loop_destroy_source(impl->loop, client->source); free(client); @@ -444,21 +1539,25 @@ static int handle_packet(struct client *client) uint32_t command, tag; struct data *d = &client->data; - if (data_getu32(d, &command) < 0 || - data_getu32(d, &tag) < 0) { + if (data_get(d, + TAG_U32, &command, + TAG_U32, &tag, + TAG_INVALID) < 0) { res = -EPROTO; goto finish; } - pw_log_info(NAME" %p: Received packet command %u tag %u", + pw_log_debug(NAME" %p: Received packet command %u tag %u", impl, command, tag); if (command >= COMMAND_MAX || commands[command].run == NULL) { + pw_log_error(NAME" %p: command %d not implemented", + impl, command); res = -ENOTSUP; goto finish; } - commands[command].run(client, command, tag, d); + res = commands[command].run(client, command, tag, d); finish: return res; @@ -467,8 +1566,31 @@ finish: static int handle_memblock(struct client *client) { struct impl *impl = client->impl; - pw_log_info(NAME" %p: Received memblock of size: %u", - impl, client->data.length); + int64_t offset; + uint32_t channel, flags; + struct stream *stream; + struct block *block; + + channel = ntohl(client->desc.channel); + offset = (int64_t) ( + (((uint64_t) ntohl(client->desc.offset_hi)) << 32) | + (((uint64_t) ntohl(client->desc.offset_lo)))); + flags = ntohl(client->desc.flags) & FLAG_SEEKMASK, + + pw_log_debug(NAME" %p: Received memblock channel:%d size:%u", + impl, channel, client->data.length); + + stream = pw_map_lookup(&client->streams, channel); + if (stream == NULL) + return -EINVAL; + + block = calloc(1, sizeof(struct block)); + block->data = client->data.data; + block->length = client->data.length; + pw_log_debug("new block %p %p", block, block->data); + client->data.data = NULL; + spa_list_append(&stream->blocks, &block->link); + return 0; } @@ -494,14 +1616,12 @@ static int do_read(struct client *client) size = client->data.length - idx; } while (true) { - pw_log_info(NAME" %p: read %zd", impl, size); if ((r = recv(client->source->fd, data, size, 0)) < 0) { if (errno == EINTR) continue; res = -errno; goto error; } - pw_log_info(NAME" %p: got %zd", impl, r); client->index += r; break; } @@ -537,7 +1657,7 @@ static int do_read(struct client *client) client->data.data = calloc(1, length); client->data.length = length; client->data.offset = 0; - } else if (client->index >= ntohl(client->desc.length) + sizeof(client->desc)) { + } else if (client->index >= client->data.length + sizeof(client->desc)) { switch (client->type) { case TYPE_PACKET: res = handle_packet(client); @@ -573,10 +1693,10 @@ on_client_data(void *data, int fd, uint32_t mask) goto error; } if (mask & SPA_IO_OUT) { - pw_log_info(NAME" %p: can write", impl); + pw_log_trace(NAME" %p: can write", impl); } if (mask & SPA_IO_IN) { - pw_log_info(NAME" %p: can read", impl); + pw_log_trace(NAME" %p: can read", impl); if ((res = do_read(client)) < 0) goto error; } @@ -600,29 +1720,44 @@ on_connect(void *data, int fd, uint32_t mask) int client_fd; struct client *client; + client = calloc(1, sizeof(struct client)); + if (client == NULL) + goto error; + + client->impl = impl; + spa_list_append(&impl->clients, &client->link); + pw_map_init(&client->streams, 16, 16); + + client->props = pw_properties_new( + PW_KEY_CLIENT_API, "pipewire-pulse", + NULL); + if (client->props == NULL) + goto error; + length = sizeof(name); client_fd = accept4(fd, (struct sockaddr *) &name, &length, SOCK_CLOEXEC); - if (client_fd < 0) { - pw_log_error(NAME" %p: failed to accept: %m", impl); - return; - } - pw_log_info(NAME": client connection: %d", fd); + if (client_fd < 0) + goto error; + + pw_log_info(NAME": client %p fd:%d", client, client_fd); - client = calloc(1, sizeof(struct client)); - if (client == NULL) { - pw_log_error(NAME" %p: failed to create client: %m", impl); - goto error_close; - } - client->impl = impl; client->source = pw_loop_add_io(impl->loop, client_fd, SPA_IO_ERR | SPA_IO_HUP | SPA_IO_IN, true, on_client_data, client); - spa_list_append(&impl->clients, &client->link); - return; + if (client->source == NULL) + goto error; -error_close: - close(client_fd); + client->core = pw_context_connect(impl->context, + pw_properties_copy(client->props), 0); + if (client->core == NULL) + goto error; + + return; +error: + pw_log_error(NAME" %p: failed to create client: %m", impl); + if (client) + client_free(client); return; } @@ -736,6 +1871,7 @@ int sm_pulse_bridge_start(struct sm_media_session *session) impl->session = session; impl->loop = session->loop; + impl->context = session->context; spa_list_init(&impl->clients); sm_media_session_add_listener(impl->session, |