diff options
author | Wim Taymans <wtaymans@redhat.com> | 2020-09-23 15:15:14 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2020-09-23 15:15:14 +0200 |
commit | 24d5d783568145e3d9803098b196db5ffe20ffcb (patch) | |
tree | da04b9be877095de346ab2c743743611aafb392a | |
parent | c6358c4ed5ae11b6b6409efae1c2e951d797ca3b (diff) |
pulse: use a ringbufferpulse-ring
-rw-r--r-- | pipewire-pulseaudio/src/internal.h | 6 | ||||
-rw-r--r-- | pipewire-pulseaudio/src/stream.c | 164 |
2 files changed, 111 insertions, 59 deletions
diff --git a/pipewire-pulseaudio/src/internal.h b/pipewire-pulseaudio/src/internal.h index 6b59edd2..e17c0530 100644 --- a/pipewire-pulseaudio/src/internal.h +++ b/pipewire-pulseaudio/src/internal.h @@ -473,12 +473,14 @@ struct pa_stream { struct spa_list ready; /* ready for playback */ size_t ready_bytes; - struct pw_buffer *buffer; /* currently reading for capture */ - uint32_t n_channel_volumes; float channel_volumes[SPA_AUDIO_MAX_CHANNELS]; bool mute; pa_operation *drain; + + struct spa_ringbuffer ring; + uint32_t ring_size; + void *ring_data; }; void pa_stream_set_state(pa_stream *s, pa_stream_state_t st); diff --git a/pipewire-pulseaudio/src/stream.c b/pipewire-pulseaudio/src/stream.c index 13778223..905f6ad1 100644 --- a/pipewire-pulseaudio/src/stream.c +++ b/pipewire-pulseaudio/src/stream.c @@ -34,7 +34,7 @@ #include "internal.h" #define MIN_SAMPLES 8u -#define MIN_BUFFERS 8u +#define MIN_BUFFERS 2u #define MAX_BUFFERS 64u #define MAX_BUFFER_SAMPLES (8*1024u) @@ -219,21 +219,24 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag if (attr->tlength == (uint32_t) -1 || attr->tlength == 0) attr->tlength = (uint32_t) pa_usec_to_bytes(2*PA_USEC_PER_SEC, &s->sample_spec); attr->tlength = SPA_MIN(attr->tlength, attr->maxlength); - attr->tlength = SPA_MAX(attr->tlength, MIN_SAMPLES * stride * MIN_BUFFERS); + attr->tlength -= attr->tlength % stride; if (attr->minreq == (uint32_t) -1 || attr->minreq == 0) attr->minreq = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec); attr->minreq = SPA_MIN(attr->minreq, attr->tlength / MIN_BUFFERS); + attr->minreq -= attr->minreq % stride; attr->minreq = SPA_MAX(attr->minreq, stride); if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0) attr->fragsize = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec); attr->fragsize = SPA_MIN(attr->fragsize, attr->tlength / MIN_BUFFERS); + attr->fragsize -= attr->fragsize % stride; attr->fragsize = SPA_MAX(attr->fragsize, stride); if (attr->prebuf == (uint32_t) -1 || attr->prebuf == 0) attr->prebuf = attr->tlength - attr->minreq; attr->prebuf = SPA_MIN(attr->prebuf, attr->tlength - attr->minreq); + attr->prebuf -= attr->prebuf % stride; attr->prebuf = SPA_MAX(attr->prebuf, stride); dump_buffer_attr(s, attr); @@ -262,6 +265,8 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * patch_buffer_attr(s, &s->buffer_attr, NULL); + s->ring_size = MAX_SIZE - MAX_SIZE % pa_frame_size(&s->sample_spec); + params[n_params++] = get_buffers_param(s, &s->buffer_attr, &b); pw_stream_update_params(s->stream, params, n_params); @@ -326,10 +331,11 @@ static void update_timing_info(pa_stream *s) ti->read_index_corrupt = false; if (pwt.rate.denom > 0) { + uint64_t ticks = pwt.ticks + pwt.delay; if (!s->have_time) - s->ticks_base = pwt.ticks + pwt.delay; - if (pwt.ticks > s->ticks_base) - pos = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; + s->ticks_base = ticks; + if (ticks > s->ticks_base) + pos = ((ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; else pos = 0; delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; @@ -350,9 +356,9 @@ static void update_timing_info(pa_stream *s) s->timing_info_valid = true; s->queued_bytes = pwt.queued + s->ready_bytes; - pw_log_trace("stream %p: %"PRIu64" rate:%d/%d ticks:%"PRIu64" pos:%"PRIu64" delay:%"PRIi64 " read:%"PRIu64 + pw_log_trace("stream %p: %"PRIu64" rate:%d/%d ticks:%"PRIu64" pos:%"PRIu64" delay:%"PRIi64" read:%"PRIu64 " write:%"PRIu64" queued:%"PRIi64, - s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, delay, + s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, pwt.delay, ti->read_index, ti->write_index, ti->read_index - ti->write_index); } @@ -436,21 +442,49 @@ static void pull_input(pa_stream *s) static void stream_process(void *data) { pa_stream *s = data; + struct pw_buffer *buf; pw_log_trace("stream %p:", s); update_timing_info(s); if (s->direction == PA_STREAM_PLAYBACK) { - uint64_t queued, writable, required; + uint32_t index, maxsize, len, filled, required; + int32_t avail; + + avail = spa_ringbuffer_get_write_index(&s->ring, &index); + filled = SPA_CLAMP(avail, 0, (int32_t)s->ring_size); + filled -= SPA_MIN(s->queued_bytes, filled); + required = s->buffer_attr.minreq; + pw_log_trace("avail:%d filled:%d required:%d", avail, filled, required); + + if (s->write_callback && s->state == PA_STREAM_READY && filled < required) + s->write_callback(s, required, s->write_userdata); + + buf = pw_stream_dequeue_buffer(s->stream); + if (buf == NULL) + return; + + avail = spa_ringbuffer_get_read_index(&s->ring, &index); + len = SPA_CLAMP(avail, 0, (int32_t)s->ring_size); - queue_output(s); + maxsize = buf->buffer->datas[0].maxsize; + len = SPA_MIN(len, maxsize); - queued = s->queued_bytes; - writable = s->maxsize - SPA_MIN(queued, s->maxsize); - required = SPA_MIN(s->maxblock, s->buffer_attr.minreq); + if (len > 0) { + spa_ringbuffer_read_data(&s->ring, s->ring_data, s->ring_size, + index % s->ring_size, buf->buffer->datas[0].data, len); + spa_ringbuffer_read_update(&s->ring, index + len); + } else { + len = maxsize; + memset(buf->buffer->datas[0].data, 0, len); + s->timing_info.since_underrun = 0; + } - if (s->write_callback && s->state == PA_STREAM_READY && writable >= required) - s->write_callback(s, writable, s->write_userdata); + buf->buffer->datas[0].chunk->offset = 0; + buf->buffer->datas[0].chunk->size = len; + buf->size = len; + + pw_stream_queue_buffer(s->stream, buf); } else { pull_input(s); @@ -554,6 +588,10 @@ static pa_stream* stream_new(pa_context *c, const char *name, s->device_index = PA_INVALID_INDEX; s->device_name = NULL; + s->ring_size = MAX_SIZE; + s->ring_data = calloc(1, s->ring_size); + spa_ringbuffer_init(&s->ring); + spa_list_append(&c->streams, &s->link); pa_stream_ref(s); @@ -561,6 +599,7 @@ static pa_stream* stream_new(pa_context *c, const char *name, } SPA_EXPORT + pa_stream* pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) { @@ -851,7 +890,8 @@ static int create_stream(pa_stream_direction_t direction, pa_stream_set_state(s, PA_STREAM_CREATING); fl = PW_STREAM_FLAG_AUTOCONNECT | - PW_STREAM_FLAG_MAP_BUFFERS; + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS; s->corked = SPA_FLAG_IS_SET(flags, PA_STREAM_START_CORKED); @@ -954,7 +994,7 @@ static int create_stream(pa_stream_direction_t direction, str = "Music"; stride = pa_frame_size(&s->sample_spec); - sprintf(latency, "%u/%u", s->buffer_attr.minreq / stride, s->sample_spec.rate); + sprintf(latency, "%u/%u", s->buffer_attr.tlength / stride, s->sample_spec.rate); n_items = 0; items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_MEDIA_TYPE, "Audio"); @@ -1043,6 +1083,9 @@ int pa_stream_begin_write( void **data, size_t *nbytes) { + int32_t filled; + uint32_t len, index; + spa_assert(s); spa_assert(s->refcount >= 1); @@ -1052,6 +1095,22 @@ int pa_stream_begin_write( PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); +#if 0 + free(s->write_buffer); + + if (*nbytes == (size_t)-1) + *nbytes = 8192; + + *nbytes -= *nbytes % pa_frame_size(&s->sample_spec); + + p->write_buffer = malloc(*nbytes); + if (!p->write_buffer) + return -1; + + *data = p->write_buffer; +#endif + +#if 0 if (s->mem == NULL) s->mem = alloc_mem(s, *nbytes); if (s->mem == NULL) { @@ -1062,8 +1121,15 @@ int pa_stream_begin_write( s->mem->offset = s->mem->size = 0; *data = s->mem->data; *nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, s->mem->maxsize) : s->mem->maxsize; - - pw_log_trace("buffer %p %zd %p", *data, *nbytes, s->mem); +#else + filled = spa_ringbuffer_get_write_index(&s->ring, &index); + len = s->ring_size - SPA_CLAMP(filled, 0, (int32_t)s->ring_size); + len = SPA_MIN(len, s->buffer_attr.tlength); + index %= s->ring_size; + *data = SPA_MEMBER(s->ring_data, index, void); + *nbytes = SPA_MIN(len, s->ring_size - index); +#endif + pw_log_trace("buffer %p %zd len:%d index:%d", *data, *nbytes, len, index); return 0; } @@ -1078,14 +1144,7 @@ int pa_stream_cancel_write(pa_stream *s) PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - if (s->mem == NULL) - return 0; - - pw_log_trace("cancel %p %p %zd", s->mem, s->mem->data, s->mem->size); - - spa_list_prepend(&s->free, &s->mem->link); - s->mem = NULL; - + pw_log_trace("cancel"); return 0; } @@ -1109,8 +1168,8 @@ int pa_stream_write_ext_free(pa_stream *s, int64_t offset, pa_seek_mode_t seek) { - const void *src = data; - size_t towrite; + uint32_t index; + int32_t avail; spa_assert(s); spa_assert(s->refcount >= 1); @@ -1129,41 +1188,27 @@ int pa_stream_write_ext_free(pa_stream *s, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID); - PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID); pw_log_trace("stream %p: write %zd bytes", s, nbytes); - towrite = nbytes; - while (towrite > 0) { - size_t dsize = towrite; - if (s->mem == NULL) { - void *dst; - if (pa_stream_begin_write(s, &dst, &dsize) < 0 || - dst == NULL || dsize == 0) { - pw_log_error("stream %p: out of buffers, wanted %zd bytes", s, nbytes); - break; - } - memcpy(dst, src, dsize); - src = SPA_MEMBER(src, dsize, void); - } else { - s->mem->offset = SPA_PTRDIFF(src, s->mem->data); - } - towrite -= dsize; - s->mem->size = dsize; - if (s->mem->size >= s->mem->maxsize || towrite == 0) { - spa_list_append(&s->ready, &s->mem->link); - s->ready_bytes += s->mem->size; - s->mem = NULL; - queue_output(s); - } - } - if (free_cb) - free_cb(free_cb_data); + avail = spa_ringbuffer_get_write_index(&s->ring, &index); + avail = s->ring_size - SPA_CLAMP(avail, 0, (int32_t)s->ring_size); + if (nbytes > (size_t)avail) + nbytes = avail; + if (nbytes > (size_t)s->buffer_attr.tlength) + nbytes = s->buffer_attr.tlength; + + spa_ringbuffer_write_data(&s->ring, s->ring_data, s->ring_size, + index % s->ring_size, data, nbytes); + spa_ringbuffer_write_update(&s->ring, index + nbytes); s->timing_info.write_index += nbytes; s->timing_info.since_underrun += nbytes; pw_log_trace("stream %p: written %zd bytes", s, nbytes); + if (free_cb) + free_cb(free_cb_data); + return 0; } @@ -1237,6 +1282,8 @@ size_t pa_stream_writable_size(PA_CONST pa_stream *s) const pa_timing_info *i; uint64_t now, then, queued, writable, elapsed, required; struct timespec ts; + uint32_t index; + int32_t avail; spa_assert(s); spa_assert(s->refcount >= 1); @@ -1260,8 +1307,10 @@ size_t pa_stream_writable_size(PA_CONST pa_stream *s) queued = s->queued_bytes; queued -= SPA_MIN(queued, elapsed); - writable = s->maxsize - SPA_MIN(queued, s->maxsize); - required = SPA_MIN(s->maxblock, s->buffer_attr.minreq); + avail = spa_ringbuffer_get_write_index((struct spa_ringbuffer*)&s->ring, &index); + writable = s->ring_size - SPA_CLAMP(avail, 0, (int32_t)s->ring_size); + writable -= SPA_MIN(queued, writable); + required = s->buffer_attr.minreq; pw_log_debug("stream %p: %"PRIu64" minreq:%u maxblock:%zu", s, writable, s->buffer_attr.minreq, s->maxblock); @@ -1572,6 +1621,7 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use s->ready_bytes = 0; s->queued_bytes = 0; s->timing_info.write_index = s->timing_info.read_index = 0; + spa_ringbuffer_init(&s->ring); s->have_time = false; pa_operation_sync(o); |