summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2020-04-13 19:44:12 +0200
committerWim Taymans <wtaymans@redhat.com>2020-04-13 19:44:12 +0200
commit4fd43733c2f82ee56900f331c11d9a4588c2714c (patch)
tree2f3998be20d8cd27ad9c33b1d448441e4bae975c
parentfd00d16361d24718e59dbd90ad290e381500eb5c (diff)
pulse: more work on timings
The read_index should not include the delay to the device. Keep a separate lis of memory blocks filled by the app and give those to the stream when we can. This is because pulse can allocate an infinite amount of buffers but we must cycle between a fixed number. Use DYNAMIC_DATA to avoid memcpy. Use the right requested_bytes in the write_callback. This should be the tlength - the amount of bytes we already queued. _get_time() should include the sink latency.
-rw-r--r--pipewire-pulseaudio/src/internal.h27
-rw-r--r--pipewire-pulseaudio/src/stream.c255
2 files changed, 156 insertions, 126 deletions
diff --git a/pipewire-pulseaudio/src/internal.h b/pipewire-pulseaudio/src/internal.h
index 9fd56719..e13d7b5b 100644
--- a/pipewire-pulseaudio/src/internal.h
+++ b/pipewire-pulseaudio/src/internal.h
@@ -325,6 +325,14 @@ struct global *pa_context_find_global(pa_context *c, uint32_t id);
struct global *pa_context_find_global_by_name(pa_context *c, uint32_t mask, const char *name);
struct global *pa_context_find_linked(pa_context *c, uint32_t id);
+struct pa_mem {
+ struct spa_list link;
+ void *data;
+ size_t maxsize;
+ size_t size;
+ size_t offset;
+};
+
#define MAX_BUFFERS 64u
#define MASK_BUFFERS (MAX_BUFFERS-1)
@@ -357,7 +365,7 @@ struct pa_stream {
char *device_name;
pa_timing_info timing_info;
- int64_t ticks_base;
+ uint64_t ticks_base;
uint32_t direct_on_input;
@@ -389,17 +397,16 @@ struct pa_stream {
pa_stream_notify_cb_t buffer_attr_callback;
void *buffer_attr_userdata;
- struct pw_buffer *dequeued[MAX_BUFFERS];
- struct spa_ringbuffer dequeued_ring;
- size_t dequeued_size;
size_t maxsize;
- struct spa_list pending;
+ size_t maxblock;
+ size_t requested_bytes;
+
+ struct pa_mem *mem; /* current mem for playback */
+ struct spa_list free; /* free to fill */
+ struct spa_list ready; /* ready for playback */
+ size_t ready_bytes;
- struct pw_buffer *buffer;
- uint32_t buffer_index;
- void *buffer_data;
- uint32_t buffer_size;
- uint32_t buffer_offset;
+ struct pw_buffer *buffer; /* currently reading for capture */
uint32_t n_channel_volumes;
float channel_volumes[SPA_AUDIO_MAX_CHANNELS];
diff --git a/pipewire-pulseaudio/src/stream.c b/pipewire-pulseaudio/src/stream.c
index 3047a1a5..4ecce803 100644
--- a/pipewire-pulseaudio/src/stream.c
+++ b/pipewire-pulseaudio/src/stream.c
@@ -36,6 +36,7 @@
#define MIN_QUEUED 1
#define MAX_SIZE (4*1024*1024)
+#define BLOCK_SIZE (64*1024)
static const uint32_t audio_formats[] = {
[PA_SAMPLE_U8] = SPA_AUDIO_FORMAT_U8,
@@ -149,26 +150,6 @@ static inline pa_channel_position_t channel_id2pa(pa_stream *s, uint32_t id)
return PA_CHANNEL_POSITION_INVALID;
}
-static inline int dequeue_buffer(pa_stream *s)
-{
- struct pw_buffer *buf;
- uint32_t index;
-
- buf = pw_stream_dequeue_buffer(s->stream);
- if (buf == NULL)
- return -EPIPE;
-
- spa_ringbuffer_get_write_index(&s->dequeued_ring, &index);
- s->dequeued[index & MASK_BUFFERS] = buf;
- if (s->direction == PA_STREAM_PLAYBACK)
- s->dequeued_size += buf->buffer->datas[0].maxsize;
- else
- s->dequeued_size += buf->buffer->datas[0].chunk->size;
- spa_ringbuffer_write_update(&s->dequeued_ring, index + 1);
-
- return 0;
-}
-
static void dump_buffer_attr(pa_stream *s, pa_buffer_attr *attr)
{
pw_log_info("stream %p: maxlength: %u", s, attr->maxlength);
@@ -180,7 +161,7 @@ static void dump_buffer_attr(pa_stream *s, pa_buffer_attr *attr)
static void configure_buffers(pa_stream *s)
{
- s->buffer_attr.maxlength = s->maxsize;
+ s->buffer_attr.maxlength = MAX_SIZE;
if (s->buffer_attr.prebuf == (uint32_t)-1)
s->buffer_attr.prebuf = s->buffer_attr.minreq;
s->buffer_attr.fragsize = s->buffer_attr.minreq;
@@ -342,7 +323,7 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag
}
if (attr->maxlength == (uint32_t) -1)
- attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
+ attr->maxlength = MAX_SIZE; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
if (attr->tlength == (uint32_t) -1)
attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
@@ -432,11 +413,18 @@ static void stream_add_buffer(void *data, struct pw_buffer *buffer)
{
pa_stream *s = data;
s->maxsize += buffer->buffer->datas[0].maxsize;
+ s->maxblock = SPA_MIN(buffer->buffer->datas[0].maxsize, s->maxblock);
}
+
static void stream_remove_buffer(void *data, struct pw_buffer *buffer)
{
pa_stream *s = data;
+ struct pa_mem *m = buffer->user_data;
s->maxsize -= buffer->buffer->datas[0].maxsize;
+ s->maxblock = INT_MAX;
+ if (m != NULL)
+ spa_list_append(&s->free, &m->link);
+ buffer->user_data = NULL;
}
static void update_timing_info(pa_stream *s)
@@ -449,8 +437,6 @@ static void update_timing_info(pa_stream *s)
pw_stream_get_time(s->stream, &pwt);
s->timing_info_valid = false;
- pw_log_debug("stream %p: %"PRIu64" rate:%d", s, pwt.queued, pwt.rate.denom);
-
pa_timeval_store(&ti->timestamp, pwt.now / SPA_NSEC_PER_USEC);
ti->synchronized_clocks = true;
ti->transport_usec = 0;
@@ -459,9 +445,12 @@ static void update_timing_info(pa_stream *s)
ti->read_index_corrupt = false;
if (pwt.rate.denom > 0) {
- if (s->ticks_base == -1)
+ if (s->ticks_base == (uint64_t)-1)
s->ticks_base = pwt.ticks + pwt.delay;
- index = ((pwt.ticks + pwt.delay - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride;
+ if (pwt.ticks > s->ticks_base)
+ index = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride;
+ else
+ index = 0;
delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom;
s->have_time = true;
} else {
@@ -479,25 +468,58 @@ static void update_timing_info(pa_stream *s)
ti->configured_source_usec = 0;
ti->since_underrun = 0;
s->timing_info_valid = true;
+
+ pw_log_trace("stream %p: %"PRIu64" rate:%d delay:%"PRIi64, s, pwt.queued, pwt.rate.denom, delay);
+
+}
+
+static void push_output(pa_stream *s)
+{
+ struct pa_mem *m, *t, *old;
+ struct pw_buffer *buf;
+
+ spa_list_for_each_safe(m, t, &s->ready, link) {
+ buf = pw_stream_dequeue_buffer(s->stream);
+ if (buf == NULL)
+ break;
+
+ if ((old = buf->user_data) != NULL)
+ spa_list_append(&s->free, &old->link);
+
+ spa_list_remove(&m->link);
+ s->ready_bytes -= m->size;
+
+ buf->buffer->datas[0].maxsize = m->maxsize;
+ buf->buffer->datas[0].data = m->data;
+ buf->buffer->datas[0].chunk->offset = m->offset;
+ buf->buffer->datas[0].chunk->size = m->size;
+ buf->user_data = m;
+
+ pw_stream_queue_buffer(s->stream, buf);
+ }
}
static void stream_process(void *data)
{
pa_stream *s = data;
- while (dequeue_buffer(s) == 0);
-
- pw_log_trace("stream %p: %"PRIu64, s, s->dequeued_size);
- if (s->dequeued_size <= 0)
- return;
+ pw_log_trace("stream %p:", s);
+ update_timing_info(s);
if (s->direction == PA_STREAM_PLAYBACK) {
- if (s->write_callback)
- s->write_callback(s, s->dequeued_size, s->write_userdata);
+ if (s->ready_bytes < s->buffer_attr.tlength)
+ s->requested_bytes = s->buffer_attr.tlength - s->ready_bytes;
+ else
+ s->requested_bytes = 0;
+
+ if (s->write_callback && s->requested_bytes)
+ s->write_callback(s, s->requested_bytes, s->write_userdata);
+
+ push_output(s);
}
else {
if (s->read_callback)
- s->read_callback(s, s->dequeued_size, s->read_userdata);
+ s->read_callback(s, s->requested_bytes, s->read_userdata);
}
}
@@ -562,7 +584,8 @@ static pa_stream* stream_new(pa_context *c, const char *name,
s->refcount = 1;
s->context = c;
- spa_list_init(&s->pending);
+ spa_list_init(&s->free);
+ spa_list_init(&s->ready);
s->direction = PA_STREAM_NODIRECTION;
s->state = PA_STREAM_UNCONNECTED;
@@ -609,12 +632,12 @@ static pa_stream* stream_new(pa_context *c, const char *name,
s->buffer_attr.minreq = (uint32_t) -1;
s->buffer_attr.prebuf = (uint32_t) -1;
s->buffer_attr.fragsize = (uint32_t) -1;
+ s->maxblock = INT_MAX;
+ s->requested_bytes = s->buffer_attr.tlength;
s->device_index = PA_INVALID_INDEX;
s->device_name = NULL;
- spa_ringbuffer_init(&s->dequeued_ring);
-
spa_list_append(&c->streams, &s->link);
pa_stream_ref(s);
@@ -1061,48 +1084,38 @@ int pa_stream_disconnect(pa_stream *s)
return 0;
}
-int peek_buffer(pa_stream *s)
+struct pa_mem *get_mem(pa_stream *s, size_t len)
{
- int32_t avail;
- uint32_t index;
-
- if (s->buffer != NULL)
- return 0;
+ struct pa_mem *m;
+ if (s->mem != NULL)
+ return s->mem;
- if ((avail = spa_ringbuffer_get_read_index(&s->dequeued_ring, &index)) < MIN_QUEUED)
- return -EPIPE;
+ if (spa_list_is_empty(&s->free)) {
+ if (len > s->maxblock)
+ len = s->maxblock;
+ m = calloc(1, sizeof(struct pa_mem) + len);
+ if (m == NULL)
+ return NULL;
- s->buffer = s->dequeued[index & MASK_BUFFERS];
- s->buffer_index = index;
- s->buffer_data = s->buffer->buffer->datas[0].data;
- if (s->direction == PA_STREAM_RECORD) {
- s->buffer_size = s->buffer->buffer->datas[0].chunk->size;
- s->buffer_offset = s->buffer->buffer->datas[0].chunk->offset;
+ m->data = SPA_MEMBER(m, sizeof(struct pa_mem), void);
+ m->maxsize = len;
+ spa_list_append(&s->free, &m->link);
}
- else {
- s->buffer_size = s->buffer->buffer->datas[0].maxsize;
- }
- return 0;
+ m = spa_list_first(&s->free, struct pa_mem, link);
+ spa_list_remove(&m->link);
+ m->offset = 0;
+ m->size = 0;
+ return m;
}
-int queue_buffer(pa_stream *s)
+int release_mem(pa_stream *s)
{
- if (s->buffer == NULL)
- return 0;
-
- if (s->direction == PA_STREAM_PLAYBACK)
- s->dequeued_size -= s->buffer->buffer->datas[0].maxsize;
- else
- s->dequeued_size -= s->buffer->buffer->datas[0].chunk->size;
- spa_ringbuffer_read_update(&s->dequeued_ring, s->buffer_index + 1);
-
- s->buffer->size = s->buffer->buffer->datas[0].chunk->size;
- pw_log_trace("%p %"PRIu64"/%d", s->buffer, s->buffer->size,
- s->buffer->buffer->datas[0].chunk->offset);
-
- pw_stream_queue_buffer(s->stream, s->buffer);
- s->buffer = NULL;
- s->buffer_offset = 0;
+ if (s->mem == NULL)
+ return -EINVAL;
+ spa_list_append(&s->ready, &s->mem->link);
+ s->ready_bytes += s->mem->size;
+ s->mem = NULL;
+ push_output(s);
return 0;
}
@@ -1112,7 +1125,7 @@ int pa_stream_begin_write(
void **data,
size_t *nbytes)
{
- int res;
+ size_t max;
spa_assert(s);
spa_assert(s->refcount >= 1);
@@ -1123,17 +1136,17 @@ int pa_stream_begin_write(
PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
- pw_log_trace("peek buffer %p %zd %d %d", *data, *nbytes, s->buffer_size, s->buffer_offset);
-
- if ((res = peek_buffer(s)) < 0) {
+ s->mem = get_mem(s, *nbytes);
+ if (s->mem == NULL) {
*data = NULL;
*nbytes = 0;
- } else {
- size_t max = s->buffer_size - s->buffer_offset;
- *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void);
- *nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, max) : max;
+ return -errno;
}
- pw_log_trace("peek buffer %p %zd %p %d", *data, *nbytes, s->buffer, res);
+ max = s->mem->maxsize - s->mem->size;
+ *data = SPA_MEMBER(s->mem->data, s->mem->offset, void);
+ *nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, max) : max;
+
+ pw_log_trace("peek buffer %p %zd %p", *data, *nbytes, s->mem);
return 0;
}
@@ -1148,8 +1161,13 @@ int pa_stream_cancel_write(pa_stream *s)
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK ||
s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
- pw_log_debug("cancel %p %p %d", s->buffer, s->buffer_data, s->buffer_size);
- s->buffer = NULL;
+ if (s->mem == NULL)
+ return 0;
+
+ pw_log_trace("cancel %p %p %zd", s->mem, s->mem->data, s->mem->size);
+
+ spa_list_prepend(&s->free, &s->mem->link);
+ s->mem = NULL;
return 0;
}
@@ -1185,54 +1203,50 @@ int pa_stream_write_ext_free(pa_stream *s,
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK ||
(seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context,
- !s->buffer ||
- ((data >= s->buffer_data) &&
- ((const char*) data + nbytes <= (const char*) s->buffer_data + s->buffer_size)),
+ s->mem == NULL ||
+ ((data >= s->mem->data) &&
+ ((const char*) data + nbytes <= (const char*) s->mem->data + s->mem->maxsize)),
PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, nbytes % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID);
- if (s->buffer == NULL) {
+ pw_log_trace("stream %p: write %zd bytes", s, nbytes);
+
+ if (s->mem == NULL) {
void *dst;
const void *src = data;
size_t towrite = nbytes, dsize;
- pw_log_debug("stream %p: write %zd bytes", s, nbytes);
-
while (towrite > 0) {
dsize = towrite;
if (pa_stream_begin_write(s, &dst, &dsize) < 0 ||
dst == NULL || dsize == 0) {
- pw_log_debug("stream %p: out of buffers, wanted %zd bytes", s, nbytes);
+ pw_log_error("stream %p: out of buffers, wanted %zd bytes", s, nbytes);
break;
}
memcpy(dst, src, dsize);
- s->buffer_offset += dsize;
+ s->mem->size += dsize;
+
+ if (s->mem->size >= s->mem->maxsize || towrite == dsize)
+ release_mem(s);
- if (s->buffer_offset >= s->buffer_size || towrite == dsize) {
- s->buffer->buffer->datas[0].chunk->offset = 0;
- s->buffer->buffer->datas[0].chunk->size = s->buffer_offset;
- queue_buffer(s);
- }
towrite -= dsize;
src = SPA_MEMBER(src, dsize, void);
}
if (free_cb)
free_cb(free_cb_data);
-
- s->buffer = NULL;
}
else {
- s->buffer->buffer->datas[0].chunk->offset = SPA_PTRDIFF(data, s->buffer_data);
- s->buffer->buffer->datas[0].chunk->size = nbytes;
- queue_buffer(s);
+ s->mem->offset = SPA_PTRDIFF(data, s->mem->data);
+ s->mem->size = nbytes;
+ release_mem(s);
}
s->timing_info.write_index += nbytes;
- pw_log_debug("stream %p: written %zd bytes", s, nbytes);
+ pw_log_trace("stream %p: written %zd bytes", s, nbytes);
return 0;
}
@@ -1250,15 +1264,20 @@ int pa_stream_peek(pa_stream *s,
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
- if (peek_buffer(s) < 0) {
+ if (s->buffer == NULL)
+ s->buffer = pw_stream_dequeue_buffer(s->stream);
+ if (s->buffer == NULL) {
+ pw_log_error("stream %p: no buffer: %m", s);
*data = NULL;
*nbytes = 0;
- pw_log_debug("stream %p: no buffer", s);
return 0;
}
- *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void);
- *nbytes = s->buffer_size;
- pw_log_trace("stream %p: %p %zd %f", s, *data, *nbytes, *(float*)*data);
+
+ *data = SPA_MEMBER(s->buffer->buffer->datas[0].data,
+ s->buffer->buffer->datas[0].chunk->offset, void);
+ *nbytes = s->buffer->buffer->datas[0].chunk->size;
+
+ pw_log_trace("stream %p: %p %zd", s, *data, *nbytes);
return 0;
}
@@ -1274,7 +1293,8 @@ int pa_stream_drop(pa_stream *s)
PA_CHECK_VALIDITY(s->context, s->buffer, PA_ERR_BADSTATE);
pw_log_trace("stream %p", s);
- queue_buffer(s);
+ pw_stream_queue_buffer(s->stream, s->buffer);
+ s->buffer = NULL;
return 0;
}
@@ -1290,8 +1310,8 @@ size_t pa_stream_writable_size(PA_CONST pa_stream *s)
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD,
PA_ERR_BADSTATE, (size_t) -1);
- pw_log_trace("stream %p: %zd", s, s->dequeued_size);
- return s->dequeued_size;
+ pw_log_trace("stream %p: %zd", s, s->requested_bytes);
+ return s->requested_bytes;
}
SPA_EXPORT
@@ -1305,7 +1325,7 @@ size_t pa_stream_readable_size(PA_CONST pa_stream *s)
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD,
PA_ERR_BADSTATE, (size_t) -1);
- return s->dequeued_size;
+ return s->requested_bytes;
}
struct success_ack {
@@ -1657,7 +1677,7 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
{
pa_usec_t res;
struct timespec ts;
- uint64_t now, delay, read_time;
+ uint64_t now, delay, time;
pa_timing_info *i;
spa_assert(s);
@@ -1676,16 +1696,19 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
else
delay = 0;
- read_time = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec);
+ if (s->direction == PA_STREAM_PLAYBACK)
+ time = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec) + i->sink_usec;
+ else
+ time = pa_bytes_to_usec((uint64_t) i->write_index, &s->sample_spec) + i->source_usec;
- res = delay + read_time;
+ res = delay + time;
if (r_usec)
*r_usec = res;
- pw_log_trace("stream %p: now:%"PRIu64" delay:%"PRIu64" read_time:%"PRIu64
+ pw_log_trace("stream %p: now:%"PRIu64" delay:%"PRIu64" time:%"PRIu64
" write-index:%"PRIi64" read_index:%"PRIi64" diff:%"PRIi64" res:%"PRIu64,
- s, now, delay, read_time,
+ s, now, delay, time,
i->write_index, i->read_index,
i->write_index - i->read_index,
res);