diff options
author | Wim Taymans <wtaymans@redhat.com> | 2020-10-05 17:13:04 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2020-10-05 17:14:02 +0200 |
commit | a3db014a173dbaf97f28b9c0a7ab5670535ba23d (patch) | |
tree | 9322a27d3772c89da02234538ec1d2f743c145f2 | |
parent | eb3deac155a86910b48212537ee9e880214bed2d (diff) |
pulse-bridge: implement timing and corkingpulse-protocol
-rw-r--r-- | src/examples/media-session/pulse-bridge.c | 306 |
1 files changed, 255 insertions, 51 deletions
diff --git a/src/examples/media-session/pulse-bridge.c b/src/examples/media-session/pulse-bridge.c index 6cec47de..9292ae29 100644 --- a/src/examples/media-session/pulse-bridge.c +++ b/src/examples/media-session/pulse-bridge.c @@ -34,6 +34,7 @@ #include <time.h> #include <sys/stat.h> #include <sys/types.h> +#include <sys/time.h> #include <arpa/inet.h> #if HAVE_PWD_H #include <pwd.h> @@ -190,27 +191,33 @@ enum sample_format { SAMPLE_INVALID = -1 }; -static const uint32_t audio_formats[] = { - [SAMPLE_U8] = SPA_AUDIO_FORMAT_U8, - [SAMPLE_ALAW] = SPA_AUDIO_FORMAT_UNKNOWN, - [SAMPLE_ULAW] = SPA_AUDIO_FORMAT_UNKNOWN, - [SAMPLE_S16LE] = SPA_AUDIO_FORMAT_S16_LE, - [SAMPLE_S16BE] = SPA_AUDIO_FORMAT_S16_BE, - [SAMPLE_FLOAT32LE] = SPA_AUDIO_FORMAT_F32_LE, - [SAMPLE_FLOAT32BE] = SPA_AUDIO_FORMAT_F32_BE, - [SAMPLE_S32LE] = SPA_AUDIO_FORMAT_S32_LE, - [SAMPLE_S32BE] = SPA_AUDIO_FORMAT_S32_BE, - [SAMPLE_S24LE] = SPA_AUDIO_FORMAT_S24_LE, - [SAMPLE_S24BE] = SPA_AUDIO_FORMAT_S24_BE, - [SAMPLE_S24_32LE] = SPA_AUDIO_FORMAT_S24_32_LE, - [SAMPLE_S24_32BE] = SPA_AUDIO_FORMAT_S24_32_BE, +struct format { + uint32_t format; + const char *name; + uint32_t size; +}; + +static const struct format audio_formats[] = { + [SAMPLE_U8] = { SPA_AUDIO_FORMAT_U8, "u8", 1 }, + [SAMPLE_ALAW] = { SPA_AUDIO_FORMAT_UNKNOWN, "alaw", 1 }, + [SAMPLE_ULAW] = { SPA_AUDIO_FORMAT_UNKNOWN, "ulaw", 1 }, + [SAMPLE_S16LE] = { SPA_AUDIO_FORMAT_S16_LE, "s16le", 2 }, + [SAMPLE_S16BE] = { SPA_AUDIO_FORMAT_S16_BE, "s16be", 2 }, + [SAMPLE_FLOAT32LE] = { SPA_AUDIO_FORMAT_F32_LE, "f32le", 4 }, + [SAMPLE_FLOAT32BE] = { SPA_AUDIO_FORMAT_F32_BE, "f32be", 5 }, + [SAMPLE_S32LE] = { SPA_AUDIO_FORMAT_S32_LE, "s32le", 4 }, + [SAMPLE_S32BE] = { SPA_AUDIO_FORMAT_S32_BE, "s32be", 4 }, + [SAMPLE_S24LE] = { SPA_AUDIO_FORMAT_S24_LE, "s24le", 3 }, + [SAMPLE_S24BE] = { SPA_AUDIO_FORMAT_S24_BE, "s24be", 3 }, + [SAMPLE_S24_32LE] = { SPA_AUDIO_FORMAT_S24_32_LE, "s24_32le", 4 }, + [SAMPLE_S24_32BE] = { SPA_AUDIO_FORMAT_S24_32_BE, "s24_32be", 4 }, }; static inline uint32_t format_pa2id(enum sample_format format) { if (format < 0 || (size_t)format >= SPA_N_ELEMENTS(audio_formats)) return SPA_AUDIO_FORMAT_UNKNOWN; - return audio_formats[format]; + return audio_formats[format].format; } struct sample_spec { @@ -219,6 +226,13 @@ struct sample_spec { uint8_t channels; }; +static inline uint32_t sample_spec_frame_size(const struct sample_spec *ss) +{ + if (ss->format < 0 || (size_t)ss->format >= SPA_N_ELEMENTS(audio_formats)) + return SPA_AUDIO_FORMAT_UNKNOWN; + return audio_formats[ss->format].size * ss->channels; +} + #define CHANNELS_MAX 64 struct channel_map { @@ -279,12 +293,16 @@ struct stream { struct spa_list blocks; int64_t read_index; int64_t write_index; + uint64_t playing_for; struct sample_spec ss; struct channel_map map; struct buffer_attr attr; + uint32_t frame_size; uint32_t drain_tag; + unsigned int corked:1; + unsigned int adjust_latency:1; }; enum { @@ -1077,6 +1095,11 @@ static void stream_flush(struct stream *stream) struct block *block; spa_list_consume(block, &stream->blocks, link) block_free(block); + if (stream->direction == PW_DIRECTION_INPUT) + stream->read_index = stream->write_index; + else + stream->write_index = stream->read_index; + stream->playing_for = 0; } static void stream_free(struct stream *stream) @@ -1090,12 +1113,62 @@ static void stream_free(struct stream *stream) } free(stream); } +static inline uint32_t queued_size(const struct stream *s, uint64_t elapsed) +{ + uint64_t queued; + queued = s->write_index - SPA_MIN(s->read_index, s->write_index); + queued -= SPA_MIN(queued, elapsed); + return queued; +} + +static inline uint32_t target_queue(const struct stream *s) +{ + return s->attr.tlength; +} + +static inline uint32_t wanted_size(const struct stream *s, uint32_t queued, uint32_t target) +{ + return target - SPA_MIN(queued, target); +} -static int send_request(struct stream *stream, uint32_t size) +static inline uint32_t required_size(const struct stream *s) +{ + return s->attr.minreq; +} + +static inline uint32_t writable_size(const struct stream *s, uint64_t elapsed) +{ + uint32_t queued, target, wanted, required; + + queued = queued_size(s, elapsed); + target = target_queue(s); + wanted = wanted_size(s, queued, target); + required = required_size(s); + + pw_log_trace("stream %p, queued:%u target:%u wanted:%u required:%u", + s, queued, target, wanted, required); + + if (s->adjust_latency) + if (queued >= wanted) + wanted = 0; + if (wanted < required) + wanted = 0; + + return wanted; +} + +static int send_command_request(struct stream *stream) { struct client *client = stream->client; uint8_t buffer[1024]; struct data msg; + uint32_t size; + + size = writable_size(stream, 0); + if (size == 0) + return 0; + + pw_log_trace(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size); spa_zero(msg); msg.data = buffer; @@ -1120,6 +1193,8 @@ static int reply_simple_ack(struct client *client, uint32_t tag) reply.data = buffer; reply.length = sizeof(buffer); + pw_log_debug(NAME" %p: REPLY tag:%u", client, tag); + data_put(&reply, TAG_U32, COMMAND_REPLY, TAG_U32, tag, @@ -1137,6 +1212,8 @@ static int reply_error(struct client *client, uint32_t tag, uint32_t error) reply.data = buffer; reply.length = sizeof(buffer); + pw_log_debug(NAME" %p: ERROR tag:%u error:%u", client, tag, error); + data_put(&reply, TAG_U32, COMMAND_ERROR, TAG_U32, tag, @@ -1151,17 +1228,20 @@ static int reply_create_playback_stream(struct stream *stream) struct client *client = stream->client; uint8_t buffer[1024]; struct data reply; + uint32_t size; spa_zero(reply); reply.data = buffer; reply.length = sizeof(buffer); + size = writable_size(stream, 0); + data_put(&reply, TAG_U32, COMMAND_REPLY, TAG_U32, stream->create_tag, TAG_U32, stream->channel, /* stream index/channel */ TAG_U32, 0, /* sink_input/stream index */ - TAG_U32, 8192, /* missing/requested bytes */ + TAG_U32, size, /* missing/requested bytes */ TAG_INVALID); if (client->version >= 9) { @@ -1280,7 +1360,11 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * if (id != SPA_PARAM_Format || param == NULL) return; + stream->frame_size = sample_spec_frame_size(&stream->ss); + if (stream->create_tag != SPA_ID_INVALID) { + if (stream->corked) + pw_stream_set_active(stream->stream, false); if (stream->direction == PW_DIRECTION_OUTPUT) reply_create_playback_stream(stream); else @@ -1299,39 +1383,42 @@ static void stream_process(void *data) pw_log_trace(NAME" %p: process", stream); - if (spa_list_is_empty(&stream->blocks)) - return; - - block = spa_list_first(&stream->blocks, struct block, link); - - buffer = pw_stream_dequeue_buffer(stream->stream); - if (buffer == NULL) - return; + while (!spa_list_is_empty(&stream->blocks)) { + buffer = pw_stream_dequeue_buffer(stream->stream); + if (buffer == NULL) + break; - buf = buffer->buffer; - if ((p = buf->datas[0].data) == NULL) - return; + buf = buffer->buffer; + if ((p = buf->datas[0].data) == NULL) + break; - maxsize = buf->datas[0].maxsize; - size = SPA_MIN(block->length, maxsize); - pw_log_trace("process block %p %p", block, block->data); - memcpy(p, block->data, size); + block = spa_list_first(&stream->blocks, struct block, link); + maxsize = buf->datas[0].maxsize; + size = SPA_MIN(block->length - block->offset, maxsize); + memcpy(p, block->data + block->offset, size); - block_free(block); + pw_log_trace(NAME" %p: process block %p %d-%d/%d", + stream, block, block->offset, size, block->length); - buf->datas[0].chunk->offset = 0; - buf->datas[0].chunk->stride = 4; - buf->datas[0].chunk->size = size; + stream->read_index += size; + stream->playing_for += size; + block->offset += size; + if (block->offset >= block->length) + block_free(block); - pw_stream_queue_buffer(stream->stream, buffer); + buf->datas[0].chunk->offset = 0; + buf->datas[0].chunk->stride = stream->frame_size; + buf->datas[0].chunk->size = size; - send_request(stream, maxsize); + pw_stream_queue_buffer(stream->stream, buffer); + } + send_command_request(stream); } static void stream_drained(void *data) { struct stream *stream = data; - pw_log_info(NAME" %p: drain", stream); + pw_log_info(NAME" %p: drained channel:%u", stream, stream->channel); reply_simple_ack(stream->client, stream->drain_tag); } @@ -1344,6 +1431,70 @@ static const struct pw_stream_events stream_events = .drained = stream_drained, }; +#define MAXLENGTH (4*1024*1024) /* 4MB */ +#define DEFAULT_TLENGTH_MSEC 2000 /* 2s */ +#define DEFAULT_PROCESS_MSEC 20 /* 20ms */ +#define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC + +static size_t usec_to_bytes_round_up(uint64_t usec, const struct sample_spec *ss) +{ + uint64_t u; + u = (uint64_t) usec * (uint64_t) ss->rate; + u = (u + 1000000UL - 1) / 1000000UL; + u *= sample_spec_frame_size(ss); + return (size_t) u; +} + +static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr) +{ + size_t frame_size, max_prebuf; + + frame_size = sample_spec_frame_size(&s->ss); + + if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH) + attr->maxlength = MAXLENGTH; + if (attr->maxlength <= 0) + attr->maxlength = (uint32_t) frame_size; + + if (attr->tlength == (uint32_t) -1) + attr->tlength = (uint32_t) usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*1000, &s->ss); + + if (attr->tlength <= 0) + attr->tlength = (uint32_t) frame_size; + if (attr->tlength > attr->maxlength) + attr->tlength = attr->maxlength; + + if (attr->minreq == (uint32_t) -1) { + uint32_t process = (uint32_t) usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*1000, &s->ss); + /* With low-latency, tlength/4 gives a decent default in all of traditional, + * adjust latency and early request modes. */ + uint32_t m = attr->tlength / 4; + if (frame_size) + m -= m % frame_size; + attr->minreq = SPA_MIN(process, m); + } + if (attr->minreq <= 0) + attr->minreq = (uint32_t) frame_size; + + if (attr->tlength < attr->minreq+frame_size) + attr->tlength = attr->minreq+(uint32_t) frame_size; + + + if (attr->minreq <= 0) { + attr->minreq = (uint32_t) frame_size; + attr->tlength += (uint32_t) frame_size*2; + } + + if (attr->tlength <= attr->minreq) + attr->tlength = attr->minreq*2 + (uint32_t) frame_size; + + max_prebuf = attr->tlength + (uint32_t)frame_size - attr->minreq; + + if (attr->prebuf == (uint32_t) -1 || + attr->prebuf > max_prebuf) + attr->prebuf = max_prebuf; +} + static int do_create_playback_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d) { struct impl *impl = client->impl; @@ -1382,8 +1533,6 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui uint8_t buffer[4096]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); - pw_log_info(NAME" %p: CREATE_PLAYBACK_STREAM", impl); - props = pw_properties_new(NULL, NULL); if (client->version < 13) { @@ -1411,6 +1560,8 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui TAG_INVALID)) < 0) goto error; + pw_log_info(NAME" %p: CREATE_PLAYBACK_STREAM corked:%u", impl, corked); + if (client->version >= 12) { if ((res = data_get(d, TAG_BOOLEAN, &no_remap, @@ -1487,6 +1638,8 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui } stream->impl = impl; stream->client = client; + stream->corked = corked; + stream->adjust_latency = adjust_latency; stream->channel = pw_map_insert_new(&client->streams, stream); spa_list_init(&stream->blocks); @@ -1504,20 +1657,20 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui stream->create_tag = tag; stream->ss = ss; stream->map = map; + + fix_playback_buffer_attr(stream, &attr); stream->attr = attr; info = SPA_AUDIO_INFO_RAW_INIT( .format = format_pa2id(ss.format), .channels = ss.channels, .rate = ss.rate); - n_params = 0; params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &info); pw_stream_connect(stream->stream, PW_DIRECTION_OUTPUT, SPA_ID_INVALID, - PW_STREAM_FLAG_INACTIVE | PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS, params, n_params); @@ -1699,7 +1852,6 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint pw_stream_connect(stream->stream, PW_DIRECTION_INPUT, SPA_ID_INVALID, - PW_STREAM_FLAG_INACTIVE | PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS, params, n_params); @@ -1736,7 +1888,7 @@ static int do_delete_stream(struct client *client, uint32_t command, uint32_t ta return reply_simple_ack(client, tag); } -static int do_get_latency(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_get_playback_latency(struct client *client, uint32_t command, uint32_t tag, struct data *d) { struct impl *impl = client->impl; uint8_t buffer[1024]; @@ -1757,16 +1909,22 @@ static int do_get_latency(struct client *client, uint32_t command, uint32_t tag, if (stream == NULL) return -EINVAL; + pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64, + stream->read_index, stream->write_index, + stream->write_index - stream->read_index); + spa_zero(reply); reply.data = buffer; reply.length = sizeof(buffer); + gettimeofday(&now, NULL); + data_put(&reply, TAG_U32, COMMAND_REPLY, TAG_U32, tag, - TAG_USEC, 0, - TAG_USEC, 0, - TAG_BOOLEAN, true, + TAG_USEC, 0, /* sink latency + queued samples */ + TAG_USEC, 0, /* always 0 */ + TAG_BOOLEAN, true, /* playing state */ TAG_TIMEVAL, &tv, TAG_TIMEVAL, &now, TAG_S64, stream->write_index, @@ -1782,6 +1940,48 @@ static int do_get_latency(struct client *client, uint32_t command, uint32_t tag, return send_data(client, &reply); } +static int do_get_record_latency(struct client *client, uint32_t command, uint32_t tag, struct data *d) +{ + struct impl *impl = client->impl; + uint8_t buffer[1024]; + struct data reply; + uint32_t channel; + struct timeval tv, now; + struct stream *stream; + int res; + + if ((res = data_get(d, + TAG_U32, &channel, + TAG_TIMEVAL, &tv, + TAG_INVALID)) < 0) + return res; + + pw_log_debug(NAME" %p: %s channel:%u", impl, commands[command].name, channel); + stream = pw_map_lookup(&client->streams, channel); + if (stream == NULL) + return -EINVAL; + + spa_zero(reply); + reply.data = buffer; + reply.length = sizeof(buffer); + + gettimeofday(&now, NULL); + + data_put(&reply, + TAG_U32, COMMAND_REPLY, + TAG_U32, tag, + TAG_USEC, 0, /* monitor latency */ + TAG_USEC, 0, /* source latency + queued */ + TAG_BOOLEAN, true, /* playing state */ + TAG_TIMEVAL, &tv, + TAG_TIMEVAL, &now, + TAG_S64, stream->write_index, + TAG_S64, stream->read_index, + TAG_INVALID); + + return send_data(client, &reply); +} + static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d) { struct impl *impl = client->impl; @@ -1798,11 +1998,13 @@ static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, pw_log_info(NAME" %p: %s channel:%u cork:%s", impl, commands[command].name, channel, cork ? "yes" : "no"); + stream = pw_map_lookup(&client->streams, channel); if (stream == NULL) return -EINVAL; -// pw_stream_set_active(stream->stream, !cork); + pw_stream_set_active(stream->stream, !cork); + stream->corked = cork; return reply_simple_ack(client, tag); } @@ -1830,6 +2032,7 @@ static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t comman case COMMAND_FLUSH_RECORD_STREAM: stream_flush(stream); pw_stream_flush(stream->stream, false); + send_command_request(stream); break; case COMMAND_TRIGGER_PLAYBACK_STREAM: case COMMAND_PREBUF_PLAYBACK_STREAM: @@ -2413,7 +2616,7 @@ static const struct command commands[COMMAND_MAX] = [COMMAND_LOOKUP_SOURCE] = { "LOOKUP_SOURCE", do_lookup, }, [COMMAND_DRAIN_PLAYBACK_STREAM] = { "DRAIN_PLAYBACK_STREAM", do_drain_stream, }, [COMMAND_STAT] = { "STAT", do_stat, }, - [COMMAND_GET_PLAYBACK_LATENCY] = { "GET_PLAYBACK_LATENCY", do_get_latency, }, + [COMMAND_GET_PLAYBACK_LATENCY] = { "GET_PLAYBACK_LATENCY", do_get_playback_latency, }, [COMMAND_CREATE_UPLOAD_STREAM] = { "CREATE_UPLOAD_STREAM", do_error_access, }, [COMMAND_DELETE_UPLOAD_STREAM] = { "DELETE_UPLOAD_STREAM", do_error_access, }, [COMMAND_FINISH_UPLOAD_STREAM] = { "FINISH_UPLOAD_STREAM", do_error_access, }, @@ -2471,7 +2674,7 @@ static const struct command commands[COMMAND_MAX] = [COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = { "GET_AUTOLOAD_INFO___OBSOLETE", do_error_access, }, [COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = { "GET_AUTOLOAD_INFO_LIST___OBSOLETE", do_error_access, }, - [COMMAND_GET_RECORD_LATENCY] = { "GET_RECORD_LATENCY", do_get_latency, }, + [COMMAND_GET_RECORD_LATENCY] = { "GET_RECORD_LATENCY", do_get_record_latency, }, [COMMAND_CORK_RECORD_STREAM] = { "CORK_RECORD_STREAM", do_cork_stream, }, [COMMAND_FLUSH_RECORD_STREAM] = { "FLUSH_RECORD_STREAM", do_flush_trigger_prebuf_stream, }, @@ -2637,6 +2840,7 @@ static int handle_memblock(struct client *client) pw_log_debug("new block %p %p", block, block->data); client->data.data = NULL; spa_list_append(&stream->blocks, &block->link); + stream->write_index += block->length; return 0; } |