diff options
author | Wim Taymans <wtaymans@redhat.com> | 2014-04-02 15:17:11 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2014-04-02 15:28:13 +0200 |
commit | 429a8cbc201b002a8e3dfa9f0fcde957119690a1 (patch) | |
tree | 7deb241c46d310b2b92bc6a3767d0c8e59791790 | |
parent | dfcb4f3a622c3399dc846f281537fb8bec481c38 (diff) |
combine: add support for DYNAMIC_LATENCYrhel-6.6-dynlat
Mark the sink as DYNAMIC_LATENCY and implement update_sink_latency_range
on its sink-input to collect the combined latency range of all sinks.
Let each input-sink notify us when their requested latency changes and
use this value to configure a final latency when we don't have any
upstream requests.
Implement update_requested_latency on the sink to configure the final
latency by combining the sink-input requested latencies and our
output latencies. Configure this final latency on all out outputs.
This makes us honour the client latency request.
We don't need to call update_max_request() and update_fixed_latency()
when adding and removing outputs, this is already done when we attach
the sink-inputs.
Also add more debug log.
-rw-r--r-- | src/modules/module-combine.c | 162 |
1 files changed, 139 insertions, 23 deletions
diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c index cffb901b6..0388d7b98 100644 --- a/src/modules/module-combine.c +++ b/src/modules/module-combine.c @@ -106,6 +106,8 @@ struct output { /* For coomunication of the stream parameters to the sink thread */ pa_atomic_t max_request; pa_atomic_t requested_latency; + pa_atomic_t max_latency; + pa_atomic_t min_latency; PA_LLIST_FIELDS(struct output); }; @@ -124,6 +126,7 @@ struct userdata { pa_bool_t automatic; pa_bool_t auto_desc; + pa_usec_t requested_latency; pa_hook_slot *sink_put_slot, *sink_unlink_slot, *sink_state_changed_slot; @@ -149,11 +152,13 @@ enum { SINK_MESSAGE_NEED, SINK_MESSAGE_UPDATE_LATENCY, SINK_MESSAGE_UPDATE_MAX_REQUEST, - SINK_MESSAGE_UPDATE_REQUESTED_LATENCY + SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, + SINK_MESSAGE_UPDATE_LATENCY_RANGE }; enum { SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX, + SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY }; static void output_disable(struct output *o); @@ -455,6 +460,7 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) { return; pa_atomic_store(&o->max_request, (int) nbytes); + pa_log_debug("Sink input update max request %lu", (unsigned long) nbytes); pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL); } @@ -470,20 +476,48 @@ static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) { c = pa_sink_get_requested_latency_within_thread(i->sink); - if (c == (pa_usec_t) -1) - c = i->sink->thread_info.max_latency; - if (pa_atomic_load(&o->requested_latency) == (int) c) return; pa_atomic_store(&o->requested_latency, (int) c); + pa_log_debug("Sink input update requested latency %0.2f", (double) c / PA_USEC_PER_MSEC); pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL); } +/* Called from thread context */ +static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) { + struct output *o; + pa_usec_t min, max, fix; + + pa_assert(i); + + pa_sink_input_assert_ref(i); + pa_assert_se(o = i->userdata); + + fix = i->sink->thread_info.fixed_latency; + if (fix > 0) { + min = fix; + max = fix; + } else { + min = i->sink->thread_info.min_latency; + max = i->sink->thread_info.max_latency; + } + + if ((pa_atomic_load(&o->min_latency) == (int) min) && + (pa_atomic_load(&o->max_latency) == (int) max)) + return; + + pa_atomic_store(&o->min_latency, (int) min); + pa_atomic_store(&o->max_latency, (int) max); + pa_log_debug("Sink input update latency range %lu %lu", (unsigned long) min, (unsigned long) max); + pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL); +} + /* Called from I/O thread context */ static void sink_input_attach_cb(pa_sink_input *i) { struct output *o; - pa_usec_t c; + pa_usec_t c, fix, min, max; + size_t nbytes; pa_sink_input_assert_ref(i); pa_assert_se(o = i->userdata); @@ -503,12 +537,27 @@ static void sink_input_attach_cb(pa_sink_input *i) { pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE); - pa_atomic_store(&o->max_request, (int) pa_sink_input_get_max_request(i)); + nbytes = pa_sink_input_get_max_request(i); + pa_atomic_store(&o->max_request, (int) nbytes); + pa_log_debug("attach max request %lu", (unsigned long) nbytes); + + fix = i->sink->thread_info.fixed_latency; + if (fix > 0) { + min = max = fix; + } else { + min = i->sink->thread_info.min_latency; + max = i->sink->thread_info.max_latency; + } + pa_atomic_store(&o->min_latency, (int) min); + pa_atomic_store(&o->max_latency, (int) max); + pa_log_debug("attach latency range %lu %lu", (unsigned long) min, (unsigned long) max); c = pa_sink_get_requested_latency_within_thread(i->sink); - pa_atomic_store(&o->requested_latency, (int) (c == (pa_usec_t) -1 ? 0 : c)); + pa_atomic_store(&o->requested_latency, (int) c); + pa_log_debug("attach requested latency %0.2f", (double) c / PA_USEC_PER_MSEC); pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL); + pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL); pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL); } @@ -565,6 +614,14 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64 pa_memblockq_flush_write(o->memblockq, TRUE); return 0; + + case SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY: { + pa_usec_t latency = (pa_usec_t) offset; + + pa_sink_input_set_requested_latency_within_thread(o->sink_input, latency); + + return 0; + } } return pa_sink_input_process_msg(obj, code, data, offset, chunk); @@ -653,31 +710,62 @@ static void update_max_request(struct userdata *u) { if (max_request <= 0) max_request = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec); + pa_log_debug("Sink update max request %lu", (unsigned long) max_request); pa_sink_set_max_request_within_thread(u->sink, max_request); } /* Called from IO context */ -static void update_fixed_latency(struct userdata *u) { - pa_usec_t fixed_latency = 0; +static void update_requested_latency(struct userdata *u) { + size_t requested_latency = 0; struct output *o; pa_assert(u); pa_sink_assert_io_context(u->sink); - /* Collects the requested_latency values of all streams and sets - * the largest one as fixed_latency locally */ + /* Collects the requested_latency values of all streams and remember + * the largest one. We use this latency when the sink-inputs don't provide + * a latency for our sink */ PA_LLIST_FOREACH(o, u->thread_info.active_outputs) { - pa_usec_t rl = (size_t) pa_atomic_load(&o->requested_latency); + pa_usec_t rl = (pa_usec_t) pa_atomic_load(&o->requested_latency); + + if (rl > requested_latency) + requested_latency = rl; + } + + if (requested_latency <= 0) + requested_latency = u->block_usec; - if (rl > fixed_latency) - fixed_latency = rl; + if (u->requested_latency != requested_latency) { + u->requested_latency = requested_latency; + pa_log_debug("requested latency %lu", (unsigned long) requested_latency); } +} + +/* Called from IO context */ +static void update_latency_range(struct userdata *u) { + pa_usec_t min_latency = 0, max_latency = (pa_usec_t) -1; + struct output *o; + + pa_assert(u); + pa_sink_assert_io_context(u->sink); - if (fixed_latency <= 0) - fixed_latency = u->block_usec; + /* Collects the latency_range values of all streams and sets + * the max of min and min of max locally */ + PA_LLIST_FOREACH(o, u->thread_info.active_outputs) { + pa_usec_t min = (size_t) pa_atomic_load(&o->min_latency); + pa_usec_t max = (size_t) pa_atomic_load(&o->max_latency); - pa_sink_set_fixed_latency_within_thread(u->sink, fixed_latency); + if (min > min_latency) + min_latency = min; + if (max_latency == (pa_usec_t) -1 || max < max_latency) + max_latency = max; + } + if (max_latency < min_latency) + max_latency = min_latency; + + pa_log_debug("Sink update latency range %lu %lu", min_latency, max_latency); + pa_sink_set_latency_range_within_thread(u->sink, min_latency, max_latency); } /* Called from thread context of the io thread */ @@ -717,6 +805,30 @@ static void output_remove_within_thread(struct output *o) { } } +/* Called from sink I/O thread context */ +static void sink_update_requested_latency(pa_sink *s) { + struct userdata *u; + struct output *o; + pa_usec_t latency; + + pa_sink_assert_ref(s); + pa_assert_se(u = s->userdata); + + latency = pa_sink_get_requested_latency_within_thread(s); + if (latency == (pa_usec_t) -1) + latency = u->requested_latency; + if (latency == (pa_usec_t) -1) + return; + + pa_log_debug("Sink update requested latency %0.2f", (double) latency / PA_USEC_PER_MSEC); + + /* Just hand this one over to all sink_inputs */ + PA_LLIST_FOREACH(o, u->thread_info.active_outputs) { + pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, latency, NULL, NULL); + } +} + + /* Called from thread context of the io thread */ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK(o)->userdata; @@ -751,14 +863,10 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse case SINK_MESSAGE_ADD_OUTPUT: output_add_within_thread(data); - update_max_request(u); - update_fixed_latency(u); return 0; case SINK_MESSAGE_REMOVE_OUTPUT: output_remove_within_thread(data); - update_max_request(u); - update_fixed_latency(u); return 0; case SINK_MESSAGE_NEED: @@ -784,8 +892,12 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse update_max_request(u); break; + case SINK_MESSAGE_UPDATE_LATENCY_RANGE: + update_latency_range(u); + break; + case SINK_MESSAGE_UPDATE_REQUESTED_LATENCY: - update_fixed_latency(u); + update_requested_latency(u); break; } @@ -859,6 +971,7 @@ static int output_create_sink_input(struct output *o) { o->sink_input->update_max_rewind = sink_input_update_max_rewind_cb; o->sink_input->update_max_request = sink_input_update_max_request_cb; o->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb; + o->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb; o->sink_input->attach = sink_input_attach_cb; o->sink_input->detach = sink_input_detach_cb; o->sink_input->kill = sink_input_kill_cb; @@ -893,6 +1006,7 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) { &u->sink->silence); pa_assert_se(pa_idxset_put(u->outputs, o, NULL) == 0); + pa_atomic_store(&o->requested_latency, -1); update_description(u); return o; @@ -1150,6 +1264,7 @@ int pa__init(pa_module*m) { slaves = pa_modargs_get_value(ma, "slaves", NULL); u->automatic = !slaves; + u->requested_latency = (pa_usec_t) -1; ss = m->core->default_sample_spec; map = m->core->default_channel_map; @@ -1234,7 +1349,7 @@ int pa__init(pa_module*m) { pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Simultaneous Output"); } - u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY); + u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY); pa_sink_new_data_done(&data); if (!u->sink) { @@ -1244,6 +1359,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg; u->sink->set_state = sink_set_state; + u->sink->update_requested_latency = sink_update_requested_latency; u->sink->userdata = u; pa_sink_set_rtpoll(u->sink, u->rtpoll); |