summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2020-09-23 15:15:14 +0200
committerWim Taymans <wtaymans@redhat.com>2020-09-23 15:15:14 +0200
commit24d5d783568145e3d9803098b196db5ffe20ffcb (patch)
treeda04b9be877095de346ab2c743743611aafb392a
parentc6358c4ed5ae11b6b6409efae1c2e951d797ca3b (diff)
pulse: use a ringbufferpulse-ring
-rw-r--r--pipewire-pulseaudio/src/internal.h6
-rw-r--r--pipewire-pulseaudio/src/stream.c164
2 files changed, 111 insertions, 59 deletions
diff --git a/pipewire-pulseaudio/src/internal.h b/pipewire-pulseaudio/src/internal.h
index 6b59edd2..e17c0530 100644
--- a/pipewire-pulseaudio/src/internal.h
+++ b/pipewire-pulseaudio/src/internal.h
@@ -473,12 +473,14 @@ struct pa_stream {
struct spa_list ready; /* ready for playback */
size_t ready_bytes;
- struct pw_buffer *buffer; /* currently reading for capture */
-
uint32_t n_channel_volumes;
float channel_volumes[SPA_AUDIO_MAX_CHANNELS];
bool mute;
pa_operation *drain;
+
+ struct spa_ringbuffer ring;
+ uint32_t ring_size;
+ void *ring_data;
};
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st);
diff --git a/pipewire-pulseaudio/src/stream.c b/pipewire-pulseaudio/src/stream.c
index 13778223..905f6ad1 100644
--- a/pipewire-pulseaudio/src/stream.c
+++ b/pipewire-pulseaudio/src/stream.c
@@ -34,7 +34,7 @@
#include "internal.h"
#define MIN_SAMPLES 8u
-#define MIN_BUFFERS 8u
+#define MIN_BUFFERS 2u
#define MAX_BUFFERS 64u
#define MAX_BUFFER_SAMPLES (8*1024u)
@@ -219,21 +219,24 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag
if (attr->tlength == (uint32_t) -1 || attr->tlength == 0)
attr->tlength = (uint32_t) pa_usec_to_bytes(2*PA_USEC_PER_SEC, &s->sample_spec);
attr->tlength = SPA_MIN(attr->tlength, attr->maxlength);
- attr->tlength = SPA_MAX(attr->tlength, MIN_SAMPLES * stride * MIN_BUFFERS);
+ attr->tlength -= attr->tlength % stride;
if (attr->minreq == (uint32_t) -1 || attr->minreq == 0)
attr->minreq = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec);
attr->minreq = SPA_MIN(attr->minreq, attr->tlength / MIN_BUFFERS);
+ attr->minreq -= attr->minreq % stride;
attr->minreq = SPA_MAX(attr->minreq, stride);
if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0)
attr->fragsize = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec);
attr->fragsize = SPA_MIN(attr->fragsize, attr->tlength / MIN_BUFFERS);
+ attr->fragsize -= attr->fragsize % stride;
attr->fragsize = SPA_MAX(attr->fragsize, stride);
if (attr->prebuf == (uint32_t) -1 || attr->prebuf == 0)
attr->prebuf = attr->tlength - attr->minreq;
attr->prebuf = SPA_MIN(attr->prebuf, attr->tlength - attr->minreq);
+ attr->prebuf -= attr->prebuf % stride;
attr->prebuf = SPA_MAX(attr->prebuf, stride);
dump_buffer_attr(s, attr);
@@ -262,6 +265,8 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
patch_buffer_attr(s, &s->buffer_attr, NULL);
+ s->ring_size = MAX_SIZE - MAX_SIZE % pa_frame_size(&s->sample_spec);
+
params[n_params++] = get_buffers_param(s, &s->buffer_attr, &b);
pw_stream_update_params(s->stream, params, n_params);
@@ -326,10 +331,11 @@ static void update_timing_info(pa_stream *s)
ti->read_index_corrupt = false;
if (pwt.rate.denom > 0) {
+ uint64_t ticks = pwt.ticks + pwt.delay;
if (!s->have_time)
- s->ticks_base = pwt.ticks + pwt.delay;
- if (pwt.ticks > s->ticks_base)
- pos = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride;
+ s->ticks_base = ticks;
+ if (ticks > s->ticks_base)
+ pos = ((ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride;
else
pos = 0;
delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom;
@@ -350,9 +356,9 @@ static void update_timing_info(pa_stream *s)
s->timing_info_valid = true;
s->queued_bytes = pwt.queued + s->ready_bytes;
- pw_log_trace("stream %p: %"PRIu64" rate:%d/%d ticks:%"PRIu64" pos:%"PRIu64" delay:%"PRIi64 " read:%"PRIu64
+ pw_log_trace("stream %p: %"PRIu64" rate:%d/%d ticks:%"PRIu64" pos:%"PRIu64" delay:%"PRIi64" read:%"PRIu64
" write:%"PRIu64" queued:%"PRIi64,
- s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, delay,
+ s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, pwt.delay,
ti->read_index, ti->write_index, ti->read_index - ti->write_index);
}
@@ -436,21 +442,49 @@ static void pull_input(pa_stream *s)
static void stream_process(void *data)
{
pa_stream *s = data;
+ struct pw_buffer *buf;
pw_log_trace("stream %p:", s);
update_timing_info(s);
if (s->direction == PA_STREAM_PLAYBACK) {
- uint64_t queued, writable, required;
+ uint32_t index, maxsize, len, filled, required;
+ int32_t avail;
+
+ avail = spa_ringbuffer_get_write_index(&s->ring, &index);
+ filled = SPA_CLAMP(avail, 0, (int32_t)s->ring_size);
+ filled -= SPA_MIN(s->queued_bytes, filled);
+ required = s->buffer_attr.minreq;
+ pw_log_trace("avail:%d filled:%d required:%d", avail, filled, required);
+
+ if (s->write_callback && s->state == PA_STREAM_READY && filled < required)
+ s->write_callback(s, required, s->write_userdata);
+
+ buf = pw_stream_dequeue_buffer(s->stream);
+ if (buf == NULL)
+ return;
+
+ avail = spa_ringbuffer_get_read_index(&s->ring, &index);
+ len = SPA_CLAMP(avail, 0, (int32_t)s->ring_size);
- queue_output(s);
+ maxsize = buf->buffer->datas[0].maxsize;
+ len = SPA_MIN(len, maxsize);
- queued = s->queued_bytes;
- writable = s->maxsize - SPA_MIN(queued, s->maxsize);
- required = SPA_MIN(s->maxblock, s->buffer_attr.minreq);
+ if (len > 0) {
+ spa_ringbuffer_read_data(&s->ring, s->ring_data, s->ring_size,
+ index % s->ring_size, buf->buffer->datas[0].data, len);
+ spa_ringbuffer_read_update(&s->ring, index + len);
+ } else {
+ len = maxsize;
+ memset(buf->buffer->datas[0].data, 0, len);
+ s->timing_info.since_underrun = 0;
+ }
- if (s->write_callback && s->state == PA_STREAM_READY && writable >= required)
- s->write_callback(s, writable, s->write_userdata);
+ buf->buffer->datas[0].chunk->offset = 0;
+ buf->buffer->datas[0].chunk->size = len;
+ buf->size = len;
+
+ pw_stream_queue_buffer(s->stream, buf);
}
else {
pull_input(s);
@@ -554,6 +588,10 @@ static pa_stream* stream_new(pa_context *c, const char *name,
s->device_index = PA_INVALID_INDEX;
s->device_name = NULL;
+ s->ring_size = MAX_SIZE;
+ s->ring_data = calloc(1, s->ring_size);
+ spa_ringbuffer_init(&s->ring);
+
spa_list_append(&c->streams, &s->link);
pa_stream_ref(s);
@@ -561,6 +599,7 @@ static pa_stream* stream_new(pa_context *c, const char *name,
}
SPA_EXPORT
+
pa_stream* pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss,
const pa_channel_map *map)
{
@@ -851,7 +890,8 @@ static int create_stream(pa_stream_direction_t direction,
pa_stream_set_state(s, PA_STREAM_CREATING);
fl = PW_STREAM_FLAG_AUTOCONNECT |
- PW_STREAM_FLAG_MAP_BUFFERS;
+ PW_STREAM_FLAG_MAP_BUFFERS |
+ PW_STREAM_FLAG_RT_PROCESS;
s->corked = SPA_FLAG_IS_SET(flags, PA_STREAM_START_CORKED);
@@ -954,7 +994,7 @@ static int create_stream(pa_stream_direction_t direction,
str = "Music";
stride = pa_frame_size(&s->sample_spec);
- sprintf(latency, "%u/%u", s->buffer_attr.minreq / stride, s->sample_spec.rate);
+ sprintf(latency, "%u/%u", s->buffer_attr.tlength / stride, s->sample_spec.rate);
n_items = 0;
items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_MEDIA_TYPE, "Audio");
@@ -1043,6 +1083,9 @@ int pa_stream_begin_write(
void **data,
size_t *nbytes)
{
+ int32_t filled;
+ uint32_t len, index;
+
spa_assert(s);
spa_assert(s->refcount >= 1);
@@ -1052,6 +1095,22 @@ int pa_stream_begin_write(
PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
+#if 0
+ free(s->write_buffer);
+
+ if (*nbytes == (size_t)-1)
+ *nbytes = 8192;
+
+ *nbytes -= *nbytes % pa_frame_size(&s->sample_spec);
+
+ p->write_buffer = malloc(*nbytes);
+ if (!p->write_buffer)
+ return -1;
+
+ *data = p->write_buffer;
+#endif
+
+#if 0
if (s->mem == NULL)
s->mem = alloc_mem(s, *nbytes);
if (s->mem == NULL) {
@@ -1062,8 +1121,15 @@ int pa_stream_begin_write(
s->mem->offset = s->mem->size = 0;
*data = s->mem->data;
*nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, s->mem->maxsize) : s->mem->maxsize;
-
- pw_log_trace("buffer %p %zd %p", *data, *nbytes, s->mem);
+#else
+ filled = spa_ringbuffer_get_write_index(&s->ring, &index);
+ len = s->ring_size - SPA_CLAMP(filled, 0, (int32_t)s->ring_size);
+ len = SPA_MIN(len, s->buffer_attr.tlength);
+ index %= s->ring_size;
+ *data = SPA_MEMBER(s->ring_data, index, void);
+ *nbytes = SPA_MIN(len, s->ring_size - index);
+#endif
+ pw_log_trace("buffer %p %zd len:%d index:%d", *data, *nbytes, len, index);
return 0;
}
@@ -1078,14 +1144,7 @@ int pa_stream_cancel_write(pa_stream *s)
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK ||
s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
- if (s->mem == NULL)
- return 0;
-
- pw_log_trace("cancel %p %p %zd", s->mem, s->mem->data, s->mem->size);
-
- spa_list_prepend(&s->free, &s->mem->link);
- s->mem = NULL;
-
+ pw_log_trace("cancel");
return 0;
}
@@ -1109,8 +1168,8 @@ int pa_stream_write_ext_free(pa_stream *s,
int64_t offset,
pa_seek_mode_t seek)
{
- const void *src = data;
- size_t towrite;
+ uint32_t index;
+ int32_t avail;
spa_assert(s);
spa_assert(s->refcount >= 1);
@@ -1129,41 +1188,27 @@ int pa_stream_write_ext_free(pa_stream *s,
PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, nbytes % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
- PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID);
pw_log_trace("stream %p: write %zd bytes", s, nbytes);
- towrite = nbytes;
- while (towrite > 0) {
- size_t dsize = towrite;
- if (s->mem == NULL) {
- void *dst;
- if (pa_stream_begin_write(s, &dst, &dsize) < 0 ||
- dst == NULL || dsize == 0) {
- pw_log_error("stream %p: out of buffers, wanted %zd bytes", s, nbytes);
- break;
- }
- memcpy(dst, src, dsize);
- src = SPA_MEMBER(src, dsize, void);
- } else {
- s->mem->offset = SPA_PTRDIFF(src, s->mem->data);
- }
- towrite -= dsize;
- s->mem->size = dsize;
- if (s->mem->size >= s->mem->maxsize || towrite == 0) {
- spa_list_append(&s->ready, &s->mem->link);
- s->ready_bytes += s->mem->size;
- s->mem = NULL;
- queue_output(s);
- }
- }
- if (free_cb)
- free_cb(free_cb_data);
+ avail = spa_ringbuffer_get_write_index(&s->ring, &index);
+ avail = s->ring_size - SPA_CLAMP(avail, 0, (int32_t)s->ring_size);
+ if (nbytes > (size_t)avail)
+ nbytes = avail;
+ if (nbytes > (size_t)s->buffer_attr.tlength)
+ nbytes = s->buffer_attr.tlength;
+
+ spa_ringbuffer_write_data(&s->ring, s->ring_data, s->ring_size,
+ index % s->ring_size, data, nbytes);
+ spa_ringbuffer_write_update(&s->ring, index + nbytes);
s->timing_info.write_index += nbytes;
s->timing_info.since_underrun += nbytes;
pw_log_trace("stream %p: written %zd bytes", s, nbytes);
+ if (free_cb)
+ free_cb(free_cb_data);
+
return 0;
}
@@ -1237,6 +1282,8 @@ size_t pa_stream_writable_size(PA_CONST pa_stream *s)
const pa_timing_info *i;
uint64_t now, then, queued, writable, elapsed, required;
struct timespec ts;
+ uint32_t index;
+ int32_t avail;
spa_assert(s);
spa_assert(s->refcount >= 1);
@@ -1260,8 +1307,10 @@ size_t pa_stream_writable_size(PA_CONST pa_stream *s)
queued = s->queued_bytes;
queued -= SPA_MIN(queued, elapsed);
- writable = s->maxsize - SPA_MIN(queued, s->maxsize);
- required = SPA_MIN(s->maxblock, s->buffer_attr.minreq);
+ avail = spa_ringbuffer_get_write_index((struct spa_ringbuffer*)&s->ring, &index);
+ writable = s->ring_size - SPA_CLAMP(avail, 0, (int32_t)s->ring_size);
+ writable -= SPA_MIN(queued, writable);
+ required = s->buffer_attr.minreq;
pw_log_debug("stream %p: %"PRIu64" minreq:%u maxblock:%zu", s,
writable, s->buffer_attr.minreq, s->maxblock);
@@ -1572,6 +1621,7 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use
s->ready_bytes = 0;
s->queued_bytes = 0;
s->timing_info.write_index = s->timing_info.read_index = 0;
+ spa_ringbuffer_init(&s->ring);
s->have_time = false;
pa_operation_sync(o);