summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2020-10-05 17:13:04 +0200
committerWim Taymans <wtaymans@redhat.com>2020-10-05 17:14:02 +0200
commita3db014a173dbaf97f28b9c0a7ab5670535ba23d (patch)
tree9322a27d3772c89da02234538ec1d2f743c145f2
parenteb3deac155a86910b48212537ee9e880214bed2d (diff)
pulse-bridge: implement timing and corkingpulse-protocol
-rw-r--r--src/examples/media-session/pulse-bridge.c306
1 files changed, 255 insertions, 51 deletions
diff --git a/src/examples/media-session/pulse-bridge.c b/src/examples/media-session/pulse-bridge.c
index 6cec47de..9292ae29 100644
--- a/src/examples/media-session/pulse-bridge.c
+++ b/src/examples/media-session/pulse-bridge.c
@@ -34,6 +34,7 @@
#include <time.h>
#include <sys/stat.h>
#include <sys/types.h>
+#include <sys/time.h>
#include <arpa/inet.h>
#if HAVE_PWD_H
#include <pwd.h>
@@ -190,27 +191,33 @@ enum sample_format {
SAMPLE_INVALID = -1
};
-static const uint32_t audio_formats[] = {
- [SAMPLE_U8] = SPA_AUDIO_FORMAT_U8,
- [SAMPLE_ALAW] = SPA_AUDIO_FORMAT_UNKNOWN,
- [SAMPLE_ULAW] = SPA_AUDIO_FORMAT_UNKNOWN,
- [SAMPLE_S16LE] = SPA_AUDIO_FORMAT_S16_LE,
- [SAMPLE_S16BE] = SPA_AUDIO_FORMAT_S16_BE,
- [SAMPLE_FLOAT32LE] = SPA_AUDIO_FORMAT_F32_LE,
- [SAMPLE_FLOAT32BE] = SPA_AUDIO_FORMAT_F32_BE,
- [SAMPLE_S32LE] = SPA_AUDIO_FORMAT_S32_LE,
- [SAMPLE_S32BE] = SPA_AUDIO_FORMAT_S32_BE,
- [SAMPLE_S24LE] = SPA_AUDIO_FORMAT_S24_LE,
- [SAMPLE_S24BE] = SPA_AUDIO_FORMAT_S24_BE,
- [SAMPLE_S24_32LE] = SPA_AUDIO_FORMAT_S24_32_LE,
- [SAMPLE_S24_32BE] = SPA_AUDIO_FORMAT_S24_32_BE,
+struct format {
+ uint32_t format;
+ const char *name;
+ uint32_t size;
+};
+
+static const struct format audio_formats[] = {
+ [SAMPLE_U8] = { SPA_AUDIO_FORMAT_U8, "u8", 1 },
+ [SAMPLE_ALAW] = { SPA_AUDIO_FORMAT_UNKNOWN, "alaw", 1 },
+ [SAMPLE_ULAW] = { SPA_AUDIO_FORMAT_UNKNOWN, "ulaw", 1 },
+ [SAMPLE_S16LE] = { SPA_AUDIO_FORMAT_S16_LE, "s16le", 2 },
+ [SAMPLE_S16BE] = { SPA_AUDIO_FORMAT_S16_BE, "s16be", 2 },
+ [SAMPLE_FLOAT32LE] = { SPA_AUDIO_FORMAT_F32_LE, "f32le", 4 },
+ [SAMPLE_FLOAT32BE] = { SPA_AUDIO_FORMAT_F32_BE, "f32be", 5 },
+ [SAMPLE_S32LE] = { SPA_AUDIO_FORMAT_S32_LE, "s32le", 4 },
+ [SAMPLE_S32BE] = { SPA_AUDIO_FORMAT_S32_BE, "s32be", 4 },
+ [SAMPLE_S24LE] = { SPA_AUDIO_FORMAT_S24_LE, "s24le", 3 },
+ [SAMPLE_S24BE] = { SPA_AUDIO_FORMAT_S24_BE, "s24be", 3 },
+ [SAMPLE_S24_32LE] = { SPA_AUDIO_FORMAT_S24_32_LE, "s24_32le", 4 },
+ [SAMPLE_S24_32BE] = { SPA_AUDIO_FORMAT_S24_32_BE, "s24_32be", 4 },
};
static inline uint32_t format_pa2id(enum sample_format format)
{
if (format < 0 || (size_t)format >= SPA_N_ELEMENTS(audio_formats))
return SPA_AUDIO_FORMAT_UNKNOWN;
- return audio_formats[format];
+ return audio_formats[format].format;
}
struct sample_spec {
@@ -219,6 +226,13 @@ struct sample_spec {
uint8_t channels;
};
+static inline uint32_t sample_spec_frame_size(const struct sample_spec *ss)
+{
+ if (ss->format < 0 || (size_t)ss->format >= SPA_N_ELEMENTS(audio_formats))
+ return SPA_AUDIO_FORMAT_UNKNOWN;
+ return audio_formats[ss->format].size * ss->channels;
+}
+
#define CHANNELS_MAX 64
struct channel_map {
@@ -279,12 +293,16 @@ struct stream {
struct spa_list blocks;
int64_t read_index;
int64_t write_index;
+ uint64_t playing_for;
struct sample_spec ss;
struct channel_map map;
struct buffer_attr attr;
+ uint32_t frame_size;
uint32_t drain_tag;
+ unsigned int corked:1;
+ unsigned int adjust_latency:1;
};
enum {
@@ -1077,6 +1095,11 @@ static void stream_flush(struct stream *stream)
struct block *block;
spa_list_consume(block, &stream->blocks, link)
block_free(block);
+ if (stream->direction == PW_DIRECTION_INPUT)
+ stream->read_index = stream->write_index;
+ else
+ stream->write_index = stream->read_index;
+ stream->playing_for = 0;
}
static void stream_free(struct stream *stream)
@@ -1090,12 +1113,62 @@ static void stream_free(struct stream *stream)
}
free(stream);
}
+static inline uint32_t queued_size(const struct stream *s, uint64_t elapsed)
+{
+ uint64_t queued;
+ queued = s->write_index - SPA_MIN(s->read_index, s->write_index);
+ queued -= SPA_MIN(queued, elapsed);
+ return queued;
+}
+
+static inline uint32_t target_queue(const struct stream *s)
+{
+ return s->attr.tlength;
+}
+
+static inline uint32_t wanted_size(const struct stream *s, uint32_t queued, uint32_t target)
+{
+ return target - SPA_MIN(queued, target);
+}
-static int send_request(struct stream *stream, uint32_t size)
+static inline uint32_t required_size(const struct stream *s)
+{
+ return s->attr.minreq;
+}
+
+static inline uint32_t writable_size(const struct stream *s, uint64_t elapsed)
+{
+ uint32_t queued, target, wanted, required;
+
+ queued = queued_size(s, elapsed);
+ target = target_queue(s);
+ wanted = wanted_size(s, queued, target);
+ required = required_size(s);
+
+ pw_log_trace("stream %p, queued:%u target:%u wanted:%u required:%u",
+ s, queued, target, wanted, required);
+
+ if (s->adjust_latency)
+ if (queued >= wanted)
+ wanted = 0;
+ if (wanted < required)
+ wanted = 0;
+
+ return wanted;
+}
+
+static int send_command_request(struct stream *stream)
{
struct client *client = stream->client;
uint8_t buffer[1024];
struct data msg;
+ uint32_t size;
+
+ size = writable_size(stream, 0);
+ if (size == 0)
+ return 0;
+
+ pw_log_trace(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size);
spa_zero(msg);
msg.data = buffer;
@@ -1120,6 +1193,8 @@ static int reply_simple_ack(struct client *client, uint32_t tag)
reply.data = buffer;
reply.length = sizeof(buffer);
+ pw_log_debug(NAME" %p: REPLY tag:%u", client, tag);
+
data_put(&reply,
TAG_U32, COMMAND_REPLY,
TAG_U32, tag,
@@ -1137,6 +1212,8 @@ static int reply_error(struct client *client, uint32_t tag, uint32_t error)
reply.data = buffer;
reply.length = sizeof(buffer);
+ pw_log_debug(NAME" %p: ERROR tag:%u error:%u", client, tag, error);
+
data_put(&reply,
TAG_U32, COMMAND_ERROR,
TAG_U32, tag,
@@ -1151,17 +1228,20 @@ static int reply_create_playback_stream(struct stream *stream)
struct client *client = stream->client;
uint8_t buffer[1024];
struct data reply;
+ uint32_t size;
spa_zero(reply);
reply.data = buffer;
reply.length = sizeof(buffer);
+ size = writable_size(stream, 0);
+
data_put(&reply,
TAG_U32, COMMAND_REPLY,
TAG_U32, stream->create_tag,
TAG_U32, stream->channel, /* stream index/channel */
TAG_U32, 0, /* sink_input/stream index */
- TAG_U32, 8192, /* missing/requested bytes */
+ TAG_U32, size, /* missing/requested bytes */
TAG_INVALID);
if (client->version >= 9) {
@@ -1280,7 +1360,11 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
if (id != SPA_PARAM_Format || param == NULL)
return;
+ stream->frame_size = sample_spec_frame_size(&stream->ss);
+
if (stream->create_tag != SPA_ID_INVALID) {
+ if (stream->corked)
+ pw_stream_set_active(stream->stream, false);
if (stream->direction == PW_DIRECTION_OUTPUT)
reply_create_playback_stream(stream);
else
@@ -1299,39 +1383,42 @@ static void stream_process(void *data)
pw_log_trace(NAME" %p: process", stream);
- if (spa_list_is_empty(&stream->blocks))
- return;
-
- block = spa_list_first(&stream->blocks, struct block, link);
-
- buffer = pw_stream_dequeue_buffer(stream->stream);
- if (buffer == NULL)
- return;
+ while (!spa_list_is_empty(&stream->blocks)) {
+ buffer = pw_stream_dequeue_buffer(stream->stream);
+ if (buffer == NULL)
+ break;
- buf = buffer->buffer;
- if ((p = buf->datas[0].data) == NULL)
- return;
+ buf = buffer->buffer;
+ if ((p = buf->datas[0].data) == NULL)
+ break;
- maxsize = buf->datas[0].maxsize;
- size = SPA_MIN(block->length, maxsize);
- pw_log_trace("process block %p %p", block, block->data);
- memcpy(p, block->data, size);
+ block = spa_list_first(&stream->blocks, struct block, link);
+ maxsize = buf->datas[0].maxsize;
+ size = SPA_MIN(block->length - block->offset, maxsize);
+ memcpy(p, block->data + block->offset, size);
- block_free(block);
+ pw_log_trace(NAME" %p: process block %p %d-%d/%d",
+ stream, block, block->offset, size, block->length);
- buf->datas[0].chunk->offset = 0;
- buf->datas[0].chunk->stride = 4;
- buf->datas[0].chunk->size = size;
+ stream->read_index += size;
+ stream->playing_for += size;
+ block->offset += size;
+ if (block->offset >= block->length)
+ block_free(block);
- pw_stream_queue_buffer(stream->stream, buffer);
+ buf->datas[0].chunk->offset = 0;
+ buf->datas[0].chunk->stride = stream->frame_size;
+ buf->datas[0].chunk->size = size;
- send_request(stream, maxsize);
+ pw_stream_queue_buffer(stream->stream, buffer);
+ }
+ send_command_request(stream);
}
static void stream_drained(void *data)
{
struct stream *stream = data;
- pw_log_info(NAME" %p: drain", stream);
+ pw_log_info(NAME" %p: drained channel:%u", stream, stream->channel);
reply_simple_ack(stream->client, stream->drain_tag);
}
@@ -1344,6 +1431,70 @@ static const struct pw_stream_events stream_events =
.drained = stream_drained,
};
+#define MAXLENGTH (4*1024*1024) /* 4MB */
+#define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
+#define DEFAULT_PROCESS_MSEC 20 /* 20ms */
+#define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
+
+static size_t usec_to_bytes_round_up(uint64_t usec, const struct sample_spec *ss)
+{
+ uint64_t u;
+ u = (uint64_t) usec * (uint64_t) ss->rate;
+ u = (u + 1000000UL - 1) / 1000000UL;
+ u *= sample_spec_frame_size(ss);
+ return (size_t) u;
+}
+
+static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr)
+{
+ size_t frame_size, max_prebuf;
+
+ frame_size = sample_spec_frame_size(&s->ss);
+
+ if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH)
+ attr->maxlength = MAXLENGTH;
+ if (attr->maxlength <= 0)
+ attr->maxlength = (uint32_t) frame_size;
+
+ if (attr->tlength == (uint32_t) -1)
+ attr->tlength = (uint32_t) usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*1000, &s->ss);
+
+ if (attr->tlength <= 0)
+ attr->tlength = (uint32_t) frame_size;
+ if (attr->tlength > attr->maxlength)
+ attr->tlength = attr->maxlength;
+
+ if (attr->minreq == (uint32_t) -1) {
+ uint32_t process = (uint32_t) usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*1000, &s->ss);
+ /* With low-latency, tlength/4 gives a decent default in all of traditional,
+ * adjust latency and early request modes. */
+ uint32_t m = attr->tlength / 4;
+ if (frame_size)
+ m -= m % frame_size;
+ attr->minreq = SPA_MIN(process, m);
+ }
+ if (attr->minreq <= 0)
+ attr->minreq = (uint32_t) frame_size;
+
+ if (attr->tlength < attr->minreq+frame_size)
+ attr->tlength = attr->minreq+(uint32_t) frame_size;
+
+
+ if (attr->minreq <= 0) {
+ attr->minreq = (uint32_t) frame_size;
+ attr->tlength += (uint32_t) frame_size*2;
+ }
+
+ if (attr->tlength <= attr->minreq)
+ attr->tlength = attr->minreq*2 + (uint32_t) frame_size;
+
+ max_prebuf = attr->tlength + (uint32_t)frame_size - attr->minreq;
+
+ if (attr->prebuf == (uint32_t) -1 ||
+ attr->prebuf > max_prebuf)
+ attr->prebuf = max_prebuf;
+}
+
static int do_create_playback_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d)
{
struct impl *impl = client->impl;
@@ -1382,8 +1533,6 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
- pw_log_info(NAME" %p: CREATE_PLAYBACK_STREAM", impl);
-
props = pw_properties_new(NULL, NULL);
if (client->version < 13) {
@@ -1411,6 +1560,8 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
TAG_INVALID)) < 0)
goto error;
+ pw_log_info(NAME" %p: CREATE_PLAYBACK_STREAM corked:%u", impl, corked);
+
if (client->version >= 12) {
if ((res = data_get(d,
TAG_BOOLEAN, &no_remap,
@@ -1487,6 +1638,8 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
}
stream->impl = impl;
stream->client = client;
+ stream->corked = corked;
+ stream->adjust_latency = adjust_latency;
stream->channel = pw_map_insert_new(&client->streams, stream);
spa_list_init(&stream->blocks);
@@ -1504,20 +1657,20 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
stream->create_tag = tag;
stream->ss = ss;
stream->map = map;
+
+ fix_playback_buffer_attr(stream, &attr);
stream->attr = attr;
info = SPA_AUDIO_INFO_RAW_INIT(
.format = format_pa2id(ss.format),
.channels = ss.channels,
.rate = ss.rate);
-
n_params = 0;
params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &info);
pw_stream_connect(stream->stream,
PW_DIRECTION_OUTPUT,
SPA_ID_INVALID,
- PW_STREAM_FLAG_INACTIVE |
PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_MAP_BUFFERS,
params, n_params);
@@ -1699,7 +1852,6 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint
pw_stream_connect(stream->stream,
PW_DIRECTION_INPUT,
SPA_ID_INVALID,
- PW_STREAM_FLAG_INACTIVE |
PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_MAP_BUFFERS,
params, n_params);
@@ -1736,7 +1888,7 @@ static int do_delete_stream(struct client *client, uint32_t command, uint32_t ta
return reply_simple_ack(client, tag);
}
-static int do_get_latency(struct client *client, uint32_t command, uint32_t tag, struct data *d)
+static int do_get_playback_latency(struct client *client, uint32_t command, uint32_t tag, struct data *d)
{
struct impl *impl = client->impl;
uint8_t buffer[1024];
@@ -1757,16 +1909,22 @@ static int do_get_latency(struct client *client, uint32_t command, uint32_t tag,
if (stream == NULL)
return -EINVAL;
+ pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64,
+ stream->read_index, stream->write_index,
+ stream->write_index - stream->read_index);
+
spa_zero(reply);
reply.data = buffer;
reply.length = sizeof(buffer);
+ gettimeofday(&now, NULL);
+
data_put(&reply,
TAG_U32, COMMAND_REPLY,
TAG_U32, tag,
- TAG_USEC, 0,
- TAG_USEC, 0,
- TAG_BOOLEAN, true,
+ TAG_USEC, 0, /* sink latency + queued samples */
+ TAG_USEC, 0, /* always 0 */
+ TAG_BOOLEAN, true, /* playing state */
TAG_TIMEVAL, &tv,
TAG_TIMEVAL, &now,
TAG_S64, stream->write_index,
@@ -1782,6 +1940,48 @@ static int do_get_latency(struct client *client, uint32_t command, uint32_t tag,
return send_data(client, &reply);
}
+static int do_get_record_latency(struct client *client, uint32_t command, uint32_t tag, struct data *d)
+{
+ struct impl *impl = client->impl;
+ uint8_t buffer[1024];
+ struct data reply;
+ uint32_t channel;
+ struct timeval tv, now;
+ struct stream *stream;
+ int res;
+
+ if ((res = data_get(d,
+ TAG_U32, &channel,
+ TAG_TIMEVAL, &tv,
+ TAG_INVALID)) < 0)
+ return res;
+
+ pw_log_debug(NAME" %p: %s channel:%u", impl, commands[command].name, channel);
+ stream = pw_map_lookup(&client->streams, channel);
+ if (stream == NULL)
+ return -EINVAL;
+
+ spa_zero(reply);
+ reply.data = buffer;
+ reply.length = sizeof(buffer);
+
+ gettimeofday(&now, NULL);
+
+ data_put(&reply,
+ TAG_U32, COMMAND_REPLY,
+ TAG_U32, tag,
+ TAG_USEC, 0, /* monitor latency */
+ TAG_USEC, 0, /* source latency + queued */
+ TAG_BOOLEAN, true, /* playing state */
+ TAG_TIMEVAL, &tv,
+ TAG_TIMEVAL, &now,
+ TAG_S64, stream->write_index,
+ TAG_S64, stream->read_index,
+ TAG_INVALID);
+
+ return send_data(client, &reply);
+}
+
static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d)
{
struct impl *impl = client->impl;
@@ -1798,11 +1998,13 @@ static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag,
pw_log_info(NAME" %p: %s channel:%u cork:%s",
impl, commands[command].name, channel, cork ? "yes" : "no");
+
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
return -EINVAL;
-// pw_stream_set_active(stream->stream, !cork);
+ pw_stream_set_active(stream->stream, !cork);
+ stream->corked = cork;
return reply_simple_ack(client, tag);
}
@@ -1830,6 +2032,7 @@ static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t comman
case COMMAND_FLUSH_RECORD_STREAM:
stream_flush(stream);
pw_stream_flush(stream->stream, false);
+ send_command_request(stream);
break;
case COMMAND_TRIGGER_PLAYBACK_STREAM:
case COMMAND_PREBUF_PLAYBACK_STREAM:
@@ -2413,7 +2616,7 @@ static const struct command commands[COMMAND_MAX] =
[COMMAND_LOOKUP_SOURCE] = { "LOOKUP_SOURCE", do_lookup, },
[COMMAND_DRAIN_PLAYBACK_STREAM] = { "DRAIN_PLAYBACK_STREAM", do_drain_stream, },
[COMMAND_STAT] = { "STAT", do_stat, },
- [COMMAND_GET_PLAYBACK_LATENCY] = { "GET_PLAYBACK_LATENCY", do_get_latency, },
+ [COMMAND_GET_PLAYBACK_LATENCY] = { "GET_PLAYBACK_LATENCY", do_get_playback_latency, },
[COMMAND_CREATE_UPLOAD_STREAM] = { "CREATE_UPLOAD_STREAM", do_error_access, },
[COMMAND_DELETE_UPLOAD_STREAM] = { "DELETE_UPLOAD_STREAM", do_error_access, },
[COMMAND_FINISH_UPLOAD_STREAM] = { "FINISH_UPLOAD_STREAM", do_error_access, },
@@ -2471,7 +2674,7 @@ static const struct command commands[COMMAND_MAX] =
[COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = { "GET_AUTOLOAD_INFO___OBSOLETE", do_error_access, },
[COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = { "GET_AUTOLOAD_INFO_LIST___OBSOLETE", do_error_access, },
- [COMMAND_GET_RECORD_LATENCY] = { "GET_RECORD_LATENCY", do_get_latency, },
+ [COMMAND_GET_RECORD_LATENCY] = { "GET_RECORD_LATENCY", do_get_record_latency, },
[COMMAND_CORK_RECORD_STREAM] = { "CORK_RECORD_STREAM", do_cork_stream, },
[COMMAND_FLUSH_RECORD_STREAM] = { "FLUSH_RECORD_STREAM", do_flush_trigger_prebuf_stream, },
@@ -2637,6 +2840,7 @@ static int handle_memblock(struct client *client)
pw_log_debug("new block %p %p", block, block->data);
client->data.data = NULL;
spa_list_append(&stream->blocks, &block->link);
+ stream->write_index += block->length;
return 0;
}