summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorg Chini <georg@chini.tk>2021-01-01 00:25:31 +0100
committerPulseAudio Marge Bot <pulseaudio-maintainers@lists.freedesktop.org>2021-11-03 18:37:31 +0000
commita275a0b8111071ac1937f0c0f3c1703e3831beeb (patch)
treeac85365ce974708f0b64569fcb18988a5087fb3f
parentda539ed33640ad7cf71bbe3bc6f136e0643bcc1e (diff)
sink-input: Change move logic
The introduction of the history queue makes it possible to implement moving of streams without involving the implementer. Instead of dropping all data from the render memblockq and requesting the implementer to rewrite the data, the render memblockq is now reconstructed from the history queue. Additionally, the render queue will be filled with silence matching the amount of audio that is left playing on the old sink to avoid playing the same audio twice. This patch slightly breaks moving for virtual sinks because they do not yet include the resampler delay in their latency reports. This will be fixed in a different patch set. Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/120>
-rw-r--r--src/pulsecore/sink-input.c111
-rw-r--r--src/pulsecore/sink-input.h11
-rw-r--r--src/pulsecore/sink.c106
-rw-r--r--src/pulsecore/sink.h5
4 files changed, 177 insertions, 56 deletions
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index 81d16bf0c..4b9740c6d 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -624,6 +624,11 @@ int pa_sink_input_new(
i->thread_info.underrun_for_sink = 0;
i->thread_info.playing_for = 0;
i->thread_info.direct_outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
+ i->thread_info.move_start_time = 0;
+ i->thread_info.resampler_delay_frames = 0;
+ i->thread_info.origin_sink_latency = 0;
+ i->thread_info.dont_rewrite = false;
+ i->origin_rewind_bytes = 0;
pa_assert_se(pa_idxset_put(core->sink_inputs, i, &i->index) == 0);
pa_assert_se(pa_idxset_put(i->sink->inputs, pa_sink_input_ref(i), NULL) == 0);
@@ -1201,6 +1206,9 @@ void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sam
pa_memblockq_rewind(i->thread_info.history_memblockq, sink_input_nbytes);
}
+ if (i->thread_info.dont_rewrite)
+ goto finish;
+
if (i->thread_info.rewrite_nbytes == (size_t) -1) {
/* We were asked to drop all buffered data, and rerequest new
@@ -1274,10 +1282,12 @@ void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sam
}
}
+finish:
if (!called)
if (i->process_rewind)
i->process_rewind(i, 0);
+ i->thread_info.dont_rewrite = false;
i->thread_info.rewrite_nbytes = 0;
i->thread_info.rewrite_flush = false;
i->thread_info.dont_rewind_render = false;
@@ -1895,6 +1905,11 @@ int pa_sink_input_start_move(pa_sink_input *i) {
pa_cvolume_remap(&i->volume_factor_sink, &i->sink->channel_map, &i->channel_map);
+ /* Calculate how much of the latency was rewound on the old sink */
+ i->origin_rewind_bytes = pa_sink_get_last_rewind(i->sink) / pa_frame_size(&i->sink->sample_spec);
+ i->origin_rewind_bytes = i->origin_rewind_bytes * i->sample_spec.rate / i->sink->sample_spec.rate;
+ i->origin_rewind_bytes *= pa_frame_size(&i->sample_spec);
+
i->sink = NULL;
i->sink_requested_by_application = false;
@@ -2045,6 +2060,89 @@ static void set_preferred_sink(pa_sink_input *i, const char *sink_name) {
pa_hook_fire(&i->core->hooks[PA_CORE_HOOK_SINK_INPUT_PREFERRED_SINK_CHANGED], i);
}
+/* Restores the render memblockq from the history memblockq during a move.
+ * Called from main context while the sink input is detached. */
+static void restore_render_memblockq(pa_sink_input *i) {
+ size_t block_size, to_push;
+ size_t latency_bytes = 0;
+ size_t bytes_on_origin_sink = 0;
+ size_t resampler_delay_bytes = 0;
+
+ /* Calculate how much of the latency was left on the old sink */
+ latency_bytes = pa_usec_to_bytes(i->thread_info.origin_sink_latency, &i->sample_spec);
+ if (latency_bytes > i->origin_rewind_bytes)
+ bytes_on_origin_sink = latency_bytes - i->origin_rewind_bytes;
+
+ /* Get resampler latency of old resampler */
+ resampler_delay_bytes = i->thread_info.resampler_delay_frames * pa_frame_size(&i->sample_spec);
+
+ /* Flush the render memblockq and reset the resampler */
+ pa_memblockq_flush_write(i->thread_info.render_memblockq, true);
+ if (i->thread_info.resampler)
+ pa_resampler_reset(i->thread_info.resampler);
+
+ /* Rewind the history queue */
+ if (i->origin_rewind_bytes + resampler_delay_bytes > 0)
+ pa_memblockq_rewind(i->thread_info.history_memblockq, i->origin_rewind_bytes + resampler_delay_bytes);
+
+ /* If something is left playing on the origin sink, add silence to the render memblockq */
+ if (bytes_on_origin_sink > 0) {
+ pa_memchunk chunk;;
+
+ chunk.length = pa_resampler_result(i->thread_info.resampler, bytes_on_origin_sink);
+ if (chunk.length > 0) {
+ chunk.memblock = pa_memblock_new(i->core->mempool, chunk.length);
+ chunk.index = 0;
+ pa_silence_memchunk(&chunk, &i->sink->sample_spec);
+ pa_memblockq_push(i->thread_info.render_memblockq, &chunk);
+ pa_memblock_unref(chunk.memblock);
+ }
+ }
+
+ /* Determine maximum block size */
+ if (i->thread_info.resampler)
+ block_size = pa_resampler_max_block_size(i->thread_info.resampler);
+ else
+ block_size = pa_frame_align(pa_mempool_block_size_max(i->core->mempool), &i->sample_spec);
+
+ /* Now push all the data in the history queue into the render memblockq */
+ to_push = pa_memblockq_get_length(i->thread_info.history_memblockq);
+ while (to_push > 0) {
+ pa_memchunk in_chunk, out_chunk;
+ size_t push_bytes;
+
+ push_bytes = block_size;
+ if (to_push < block_size)
+ push_bytes = to_push;
+
+ if (pa_memblockq_peek_fixed_size(i->thread_info.history_memblockq, push_bytes, &in_chunk) < 0) {
+ pa_log_warn("Could not restore memblockq during move");
+ break;
+ }
+
+ if (i->thread_info.resampler) {
+ pa_resampler_run(i->thread_info.resampler, &in_chunk, &out_chunk);
+ pa_memblock_unref(in_chunk.memblock);
+ } else
+ out_chunk = in_chunk;
+
+ if (out_chunk.length > 0) {
+ pa_memblockq_push(i->thread_info.render_memblockq, &out_chunk);
+ pa_memblock_unref(out_chunk.memblock);
+ }
+
+ pa_memblockq_drop(i->thread_info.history_memblockq, push_bytes);
+ to_push -= push_bytes;
+ }
+
+ /* No need to rewind the history queue here, it will be re-synchronized
+ * with the render queue during the next pa_sink_input_drop() call. */
+
+ /* Tell the sink input not to ask the implementer to rewrite during the
+ * the next rewind */
+ i->thread_info.dont_rewrite = true;
+}
+
/* Called from main context */
int pa_sink_input_finish_move(pa_sink_input *i, pa_sink *dest, bool save) {
struct volume_factor_entry *v;
@@ -2103,7 +2201,9 @@ int pa_sink_input_finish_move(pa_sink_input *i, pa_sink *dest, bool save) {
if (i->state == PA_SINK_INPUT_CORKED)
i->sink->n_corked++;
- pa_sink_input_update_resampler(i);
+ pa_sink_input_update_resampler(i, false);
+
+ restore_render_memblockq(i);
pa_sink_update_status(dest);
@@ -2114,6 +2214,9 @@ int pa_sink_input_finish_move(pa_sink_input *i, pa_sink *dest, bool save) {
pa_assert_se(pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_FINISH_MOVE, i, 0, NULL) == 0);
+ /* Reset move variable */
+ i->origin_rewind_bytes = 0;
+
pa_log_debug("Successfully moved sink input %i to %s.", i->index, dest->name);
/* Notify everyone */
@@ -2448,7 +2551,7 @@ finish:
/* Called from main context */
/* Updates the sink input's resampler with whatever the current sink requires
* -- useful when the underlying sink's sample spec might have changed */
-int pa_sink_input_update_resampler(pa_sink_input *i) {
+int pa_sink_input_update_resampler(pa_sink_input *i, bool flush_history) {
pa_resampler *new_resampler;
char *memblockq_name;
@@ -2485,6 +2588,9 @@ int pa_sink_input_update_resampler(pa_sink_input *i) {
} else
new_resampler = NULL;
+ if (flush_history)
+ pa_memblockq_flush_write(i->thread_info.history_memblockq, true);
+
if (new_resampler == i->thread_info.resampler)
return 0;
@@ -2494,7 +2600,6 @@ int pa_sink_input_update_resampler(pa_sink_input *i) {
i->thread_info.resampler = new_resampler;
pa_memblockq_free(i->thread_info.render_memblockq);
- pa_memblockq_flush_write(i->thread_info.history_memblockq, true);
memblockq_name = pa_sprintf_malloc("sink input render_memblockq [%u]", i->index);
i->thread_info.render_memblockq = pa_memblockq_new(
diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h
index fbb304bfe..654539937 100644
--- a/src/pulsecore/sink-input.h
+++ b/src/pulsecore/sink-input.h
@@ -231,6 +231,9 @@ struct pa_sink_input {
* mute status changes. Called from main context */
void (*mute_changed)(pa_sink_input *i); /* may be NULL */
+ /* Used to store the rewind amount of the origin sink during a move */
+ size_t origin_rewind_bytes; /* In sink input sample spec */
+
struct {
pa_sink_input_state_t state;
@@ -261,6 +264,12 @@ struct pa_sink_input {
/* The requested latency for the sink */
pa_usec_t requested_sink_latency;
+ /* Variables used during move */
+ pa_usec_t move_start_time;
+ pa_usec_t origin_sink_latency;
+ size_t resampler_delay_frames;
+ bool dont_rewrite;
+
pa_hashmap *direct_outputs;
} thread_info;
@@ -365,7 +374,7 @@ void pa_sink_input_request_rewind(pa_sink_input *i, size_t nbytes, bool rewrite,
void pa_sink_input_cork(pa_sink_input *i, bool b);
int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate);
-int pa_sink_input_update_resampler(pa_sink_input *i);
+int pa_sink_input_update_resampler(pa_sink_input *i, bool flush_history);
/* This returns the sink's fields converted into out sample type */
size_t pa_sink_input_get_max_rewind(pa_sink_input *i);
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 905e1db7b..01a94eda0 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -40,6 +40,7 @@
#include <pulsecore/namereg.h>
#include <pulsecore/core-util.h>
#include <pulsecore/sample-util.h>
+#include <pulsecore/stream-util.h>
#include <pulsecore/mix.h>
#include <pulsecore/core-subscribe.h>
#include <pulsecore/log.h>
@@ -338,6 +339,7 @@ pa_sink* pa_sink_new(
s->thread_info.soft_muted = s->muted;
s->thread_info.state = s->state;
s->thread_info.rewind_nbytes = 0;
+ s->thread_info.last_rewind_nbytes = 0;
s->thread_info.rewind_requested = false;
s->thread_info.max_rewind = 0;
s->thread_info.max_request = 0;
@@ -1073,6 +1075,9 @@ void pa_sink_process_rewind(pa_sink *s, size_t nbytes) {
pa_sink_volume_change_rewind(s, nbytes);
}
+ /* Save rewind value */
+ s->thread_info.last_rewind_nbytes = nbytes;
+
PA_HASHMAP_FOREACH(i, s->thread_info.inputs, state) {
pa_sink_input_assert_ref(i);
pa_sink_input_process_rewind(i, nbytes);
@@ -1557,13 +1562,26 @@ void pa_sink_reconfigure(pa_sink *s, pa_sample_spec *spec, bool passthrough) {
PA_IDXSET_FOREACH(i, s->inputs, idx) {
if (i->state == PA_SINK_INPUT_CORKED)
- pa_sink_input_update_resampler(i);
+ pa_sink_input_update_resampler(i, true);
}
pa_sink_suspend(s, false, PA_SUSPEND_INTERNAL);
}
/* Called from main thread */
+size_t pa_sink_get_last_rewind(pa_sink *s) {
+ size_t rewind_bytes;
+
+ pa_sink_assert_ref(s);
+ pa_assert_ctl_context();
+ pa_assert(PA_SINK_IS_LINKED(s->state));
+
+ pa_assert_se(pa_asyncmsgq_send(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_GET_LAST_REWIND, &rewind_bytes, 0, NULL) == 0);
+
+ return rewind_bytes;
+}
+
+/* Called from main thread */
pa_usec_t pa_sink_get_latency(pa_sink *s) {
int64_t usec = 0;
@@ -2669,59 +2687,21 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
pa_assert(!i->thread_info.sync_prev);
if (i->thread_info.state != PA_SINK_INPUT_CORKED) {
- pa_usec_t usec = 0;
- size_t sink_nbytes, total_nbytes;
/* The old sink probably has some audio from this
* stream in its buffer. We want to "take it back" as
* much as possible and play it to the new sink. We
* don't know at this point how much the old sink can
- * rewind. We have to pick something, and that
- * something is the full latency of the old sink here.
- * So we rewind the stream buffer by the sink latency
- * amount, which may be more than what we should
- * rewind. This can result in a chunk of audio being
- * played both to the old sink and the new sink.
- *
- * FIXME: Fix this code so that we don't have to make
- * guesses about how much the sink will actually be
- * able to rewind. If someone comes up with a solution
- * for this, something to note is that the part of the
- * latency that the old sink couldn't rewind should
- * ideally be compensated after the stream has moved
- * to the new sink by adding silence. The new sink
- * most likely can't start playing the moved stream
- * immediately, and that gap should be removed from
- * the "compensation silence" (at least at the time of
- * writing this, the move finish code will actually
- * already take care of dropping the new sink's
- * unrewindable latency, so taking into account the
- * unrewindable latency of the old sink is the only
- * problem).
- *
- * The render_memblockq contents are discarded,
- * because when the sink changes, the format of the
- * audio stored in the render_memblockq may change
- * too, making the stored audio invalid. FIXME:
- * However, the read and write indices are moved back
- * the same amount, so if they are not the same now,
- * they won't be the same after the rewind either. If
- * the write index of the render_memblockq is ahead of
- * the read index, then the render_memblockq will feed
- * the new sink some silence first, which it shouldn't
- * do. The write index should be flushed to be the
- * same as the read index. */
-
- /* Get the latency of the sink */
- usec = pa_sink_get_latency_within_thread(s, false);
- sink_nbytes = pa_usec_to_bytes(usec, &s->sample_spec);
- total_nbytes = sink_nbytes + pa_memblockq_get_length(i->thread_info.render_memblockq);
-
- if (total_nbytes > 0) {
- i->thread_info.rewrite_nbytes = i->thread_info.resampler ? pa_resampler_request(i->thread_info.resampler, total_nbytes) : total_nbytes;
- i->thread_info.rewrite_flush = true;
- pa_sink_input_process_rewind(i, sink_nbytes);
- }
+ * rewind, so we just save some values and reconstruct
+ * the render memblockq in finish_move(). */
+
+ /* Save some current values for restore_render_memblockq() */
+ i->thread_info.origin_sink_latency = pa_sink_get_latency_within_thread(s, false);
+ i->thread_info.move_start_time = pa_rtclock_now();
+ i->thread_info.resampler_delay_frames = 0;
+ if (i->thread_info.resampler)
+ /* Round down */
+ i->thread_info.resampler_delay_frames = pa_resampler_get_delay(i->thread_info.resampler, false);
}
pa_sink_input_detach(i);
@@ -2754,7 +2734,7 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
if (i->thread_info.state != PA_SINK_INPUT_CORKED) {
pa_usec_t usec = 0;
- size_t nbytes;
+ size_t nbytes, delay_bytes;
/* In the ideal case the new sink would start playing
* the stream immediately. That requires the sink to
@@ -2778,8 +2758,20 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
usec = pa_sink_get_latency_within_thread(s, false);
nbytes = pa_usec_to_bytes(usec, &s->sample_spec);
- if (nbytes > 0)
- pa_sink_input_drop(i, nbytes);
+ /* Calculate number of samples that have been played during the move */
+ delay_bytes = 0;
+ if (i->thread_info.move_start_time > 0) {
+ usec = pa_rtclock_now() - i->thread_info.move_start_time;
+ pa_log_debug("Move took %lu usec", usec);
+ delay_bytes = pa_usec_to_bytes(usec, &s->sample_spec);
+ }
+
+ /* max_rewind must be updated for the sink input because otherwise
+ * the data in the render memblockq will get lost */
+ pa_sink_input_update_max_rewind(i, nbytes);
+
+ if (nbytes + delay_bytes > 0)
+ pa_sink_input_drop(i, nbytes + delay_bytes);
pa_log_debug("Requesting rewind due to finished move");
pa_sink_request_rewind(s, nbytes);
@@ -2796,6 +2788,11 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
pa_sink_input_update_max_rewind(i, s->thread_info.max_rewind);
pa_sink_input_update_max_request(i, s->thread_info.max_request);
+ /* Reset move variables */
+ i->thread_info.move_start_time = 0;
+ i->thread_info.resampler_delay_frames = 0;
+ i->thread_info.origin_sink_latency = 0;
+
return o->process_msg(o, PA_SINK_MESSAGE_SET_SHARED_VOLUME, NULL, 0, NULL);
}
@@ -2942,6 +2939,11 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
*((size_t*) userdata) = s->thread_info.max_rewind;
return 0;
+ case PA_SINK_MESSAGE_GET_LAST_REWIND:
+
+ *((size_t*) userdata) = s->thread_info.last_rewind_nbytes;
+ return 0;
+
case PA_SINK_MESSAGE_GET_MAX_REQUEST:
*((size_t*) userdata) = s->thread_info.max_request;
diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h
index 87bfddd0b..9492b18dc 100644
--- a/src/pulsecore/sink.h
+++ b/src/pulsecore/sink.h
@@ -299,6 +299,9 @@ struct pa_sink {
size_t rewind_nbytes;
bool rewind_requested;
+ /* Size of last rewind */
+ size_t last_rewind_nbytes;
+
/* Both dynamic and fixed latencies will be clamped to this
* range. */
pa_usec_t min_latency; /* we won't go below this latency */
@@ -359,6 +362,7 @@ typedef enum pa_sink_message {
PA_SINK_MESSAGE_SET_MAX_REQUEST,
PA_SINK_MESSAGE_UPDATE_VOLUME_AND_MUTE,
PA_SINK_MESSAGE_SET_PORT_LATENCY_OFFSET,
+ PA_SINK_MESSAGE_GET_LAST_REWIND,
PA_SINK_MESSAGE_MAX
} pa_sink_message_t;
@@ -456,6 +460,7 @@ void pa_sink_get_latency_range(pa_sink *s, pa_usec_t *min_latency, pa_usec_t *ma
pa_usec_t pa_sink_get_fixed_latency(pa_sink *s);
size_t pa_sink_get_max_rewind(pa_sink *s);
+size_t pa_sink_get_last_rewind(pa_sink *s);
size_t pa_sink_get_max_request(pa_sink *s);
int pa_sink_update_status(pa_sink*s);