summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2014-04-02 15:17:11 +0200
committerWim Taymans <wtaymans@redhat.com>2014-04-02 15:28:13 +0200
commit429a8cbc201b002a8e3dfa9f0fcde957119690a1 (patch)
tree7deb241c46d310b2b92bc6a3767d0c8e59791790
parentdfcb4f3a622c3399dc846f281537fb8bec481c38 (diff)
combine: add support for DYNAMIC_LATENCYrhel-6.6-dynlat
Mark the sink as DYNAMIC_LATENCY and implement update_sink_latency_range on its sink-input to collect the combined latency range of all sinks. Let each input-sink notify us when their requested latency changes and use this value to configure a final latency when we don't have any upstream requests. Implement update_requested_latency on the sink to configure the final latency by combining the sink-input requested latencies and our output latencies. Configure this final latency on all out outputs. This makes us honour the client latency request. We don't need to call update_max_request() and update_fixed_latency() when adding and removing outputs, this is already done when we attach the sink-inputs. Also add more debug log.
-rw-r--r--src/modules/module-combine.c162
1 files changed, 139 insertions, 23 deletions
diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c
index cffb901b6..0388d7b98 100644
--- a/src/modules/module-combine.c
+++ b/src/modules/module-combine.c
@@ -106,6 +106,8 @@ struct output {
/* For coomunication of the stream parameters to the sink thread */
pa_atomic_t max_request;
pa_atomic_t requested_latency;
+ pa_atomic_t max_latency;
+ pa_atomic_t min_latency;
PA_LLIST_FIELDS(struct output);
};
@@ -124,6 +126,7 @@ struct userdata {
pa_bool_t automatic;
pa_bool_t auto_desc;
+ pa_usec_t requested_latency;
pa_hook_slot *sink_put_slot, *sink_unlink_slot, *sink_state_changed_slot;
@@ -149,11 +152,13 @@ enum {
SINK_MESSAGE_NEED,
SINK_MESSAGE_UPDATE_LATENCY,
SINK_MESSAGE_UPDATE_MAX_REQUEST,
- SINK_MESSAGE_UPDATE_REQUESTED_LATENCY
+ SINK_MESSAGE_UPDATE_REQUESTED_LATENCY,
+ SINK_MESSAGE_UPDATE_LATENCY_RANGE
};
enum {
SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
+ SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY
};
static void output_disable(struct output *o);
@@ -455,6 +460,7 @@ static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
return;
pa_atomic_store(&o->max_request, (int) nbytes);
+ pa_log_debug("Sink input update max request %lu", (unsigned long) nbytes);
pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
}
@@ -470,20 +476,48 @@ static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
c = pa_sink_get_requested_latency_within_thread(i->sink);
- if (c == (pa_usec_t) -1)
- c = i->sink->thread_info.max_latency;
-
if (pa_atomic_load(&o->requested_latency) == (int) c)
return;
pa_atomic_store(&o->requested_latency, (int) c);
+ pa_log_debug("Sink input update requested latency %0.2f", (double) c / PA_USEC_PER_MSEC);
pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
}
+/* Called from thread context */
+static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
+ struct output *o;
+ pa_usec_t min, max, fix;
+
+ pa_assert(i);
+
+ pa_sink_input_assert_ref(i);
+ pa_assert_se(o = i->userdata);
+
+ fix = i->sink->thread_info.fixed_latency;
+ if (fix > 0) {
+ min = fix;
+ max = fix;
+ } else {
+ min = i->sink->thread_info.min_latency;
+ max = i->sink->thread_info.max_latency;
+ }
+
+ if ((pa_atomic_load(&o->min_latency) == (int) min) &&
+ (pa_atomic_load(&o->max_latency) == (int) max))
+ return;
+
+ pa_atomic_store(&o->min_latency, (int) min);
+ pa_atomic_store(&o->max_latency, (int) max);
+ pa_log_debug("Sink input update latency range %lu %lu", (unsigned long) min, (unsigned long) max);
+ pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);
+}
+
/* Called from I/O thread context */
static void sink_input_attach_cb(pa_sink_input *i) {
struct output *o;
- pa_usec_t c;
+ pa_usec_t c, fix, min, max;
+ size_t nbytes;
pa_sink_input_assert_ref(i);
pa_assert_se(o = i->userdata);
@@ -503,12 +537,27 @@ static void sink_input_attach_cb(pa_sink_input *i) {
pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
- pa_atomic_store(&o->max_request, (int) pa_sink_input_get_max_request(i));
+ nbytes = pa_sink_input_get_max_request(i);
+ pa_atomic_store(&o->max_request, (int) nbytes);
+ pa_log_debug("attach max request %lu", (unsigned long) nbytes);
+
+ fix = i->sink->thread_info.fixed_latency;
+ if (fix > 0) {
+ min = max = fix;
+ } else {
+ min = i->sink->thread_info.min_latency;
+ max = i->sink->thread_info.max_latency;
+ }
+ pa_atomic_store(&o->min_latency, (int) min);
+ pa_atomic_store(&o->max_latency, (int) max);
+ pa_log_debug("attach latency range %lu %lu", (unsigned long) min, (unsigned long) max);
c = pa_sink_get_requested_latency_within_thread(i->sink);
- pa_atomic_store(&o->requested_latency, (int) (c == (pa_usec_t) -1 ? 0 : c));
+ pa_atomic_store(&o->requested_latency, (int) c);
+ pa_log_debug("attach requested latency %0.2f", (double) c / PA_USEC_PER_MSEC);
pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_MAX_REQUEST, NULL, 0, NULL, NULL);
+ pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_LATENCY_RANGE, NULL, 0, NULL, NULL);
pa_asyncmsgq_post(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_UPDATE_REQUESTED_LATENCY, NULL, 0, NULL, NULL);
}
@@ -565,6 +614,14 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
pa_memblockq_flush_write(o->memblockq, TRUE);
return 0;
+
+ case SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY: {
+ pa_usec_t latency = (pa_usec_t) offset;
+
+ pa_sink_input_set_requested_latency_within_thread(o->sink_input, latency);
+
+ return 0;
+ }
}
return pa_sink_input_process_msg(obj, code, data, offset, chunk);
@@ -653,31 +710,62 @@ static void update_max_request(struct userdata *u) {
if (max_request <= 0)
max_request = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
+ pa_log_debug("Sink update max request %lu", (unsigned long) max_request);
pa_sink_set_max_request_within_thread(u->sink, max_request);
}
/* Called from IO context */
-static void update_fixed_latency(struct userdata *u) {
- pa_usec_t fixed_latency = 0;
+static void update_requested_latency(struct userdata *u) {
+ size_t requested_latency = 0;
struct output *o;
pa_assert(u);
pa_sink_assert_io_context(u->sink);
- /* Collects the requested_latency values of all streams and sets
- * the largest one as fixed_latency locally */
+ /* Collects the requested_latency values of all streams and remember
+ * the largest one. We use this latency when the sink-inputs don't provide
+ * a latency for our sink */
PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
- pa_usec_t rl = (size_t) pa_atomic_load(&o->requested_latency);
+ pa_usec_t rl = (pa_usec_t) pa_atomic_load(&o->requested_latency);
+
+ if (rl > requested_latency)
+ requested_latency = rl;
+ }
+
+ if (requested_latency <= 0)
+ requested_latency = u->block_usec;
- if (rl > fixed_latency)
- fixed_latency = rl;
+ if (u->requested_latency != requested_latency) {
+ u->requested_latency = requested_latency;
+ pa_log_debug("requested latency %lu", (unsigned long) requested_latency);
}
+}
+
+/* Called from IO context */
+static void update_latency_range(struct userdata *u) {
+ pa_usec_t min_latency = 0, max_latency = (pa_usec_t) -1;
+ struct output *o;
+
+ pa_assert(u);
+ pa_sink_assert_io_context(u->sink);
- if (fixed_latency <= 0)
- fixed_latency = u->block_usec;
+ /* Collects the latency_range values of all streams and sets
+ * the max of min and min of max locally */
+ PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
+ pa_usec_t min = (size_t) pa_atomic_load(&o->min_latency);
+ pa_usec_t max = (size_t) pa_atomic_load(&o->max_latency);
- pa_sink_set_fixed_latency_within_thread(u->sink, fixed_latency);
+ if (min > min_latency)
+ min_latency = min;
+ if (max_latency == (pa_usec_t) -1 || max < max_latency)
+ max_latency = max;
+ }
+ if (max_latency < min_latency)
+ max_latency = min_latency;
+
+ pa_log_debug("Sink update latency range %lu %lu", min_latency, max_latency);
+ pa_sink_set_latency_range_within_thread(u->sink, min_latency, max_latency);
}
/* Called from thread context of the io thread */
@@ -717,6 +805,30 @@ static void output_remove_within_thread(struct output *o) {
}
}
+/* Called from sink I/O thread context */
+static void sink_update_requested_latency(pa_sink *s) {
+ struct userdata *u;
+ struct output *o;
+ pa_usec_t latency;
+
+ pa_sink_assert_ref(s);
+ pa_assert_se(u = s->userdata);
+
+ latency = pa_sink_get_requested_latency_within_thread(s);
+ if (latency == (pa_usec_t) -1)
+ latency = u->requested_latency;
+ if (latency == (pa_usec_t) -1)
+ return;
+
+ pa_log_debug("Sink update requested latency %0.2f", (double) latency / PA_USEC_PER_MSEC);
+
+ /* Just hand this one over to all sink_inputs */
+ PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
+ pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, latency, NULL, NULL);
+ }
+}
+
+
/* Called from thread context of the io thread */
static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata;
@@ -751,14 +863,10 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
case SINK_MESSAGE_ADD_OUTPUT:
output_add_within_thread(data);
- update_max_request(u);
- update_fixed_latency(u);
return 0;
case SINK_MESSAGE_REMOVE_OUTPUT:
output_remove_within_thread(data);
- update_max_request(u);
- update_fixed_latency(u);
return 0;
case SINK_MESSAGE_NEED:
@@ -784,8 +892,12 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
update_max_request(u);
break;
+ case SINK_MESSAGE_UPDATE_LATENCY_RANGE:
+ update_latency_range(u);
+ break;
+
case SINK_MESSAGE_UPDATE_REQUESTED_LATENCY:
- update_fixed_latency(u);
+ update_requested_latency(u);
break;
}
@@ -859,6 +971,7 @@ static int output_create_sink_input(struct output *o) {
o->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
o->sink_input->update_max_request = sink_input_update_max_request_cb;
o->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
+ o->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
o->sink_input->attach = sink_input_attach_cb;
o->sink_input->detach = sink_input_detach_cb;
o->sink_input->kill = sink_input_kill_cb;
@@ -893,6 +1006,7 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
&u->sink->silence);
pa_assert_se(pa_idxset_put(u->outputs, o, NULL) == 0);
+ pa_atomic_store(&o->requested_latency, -1);
update_description(u);
return o;
@@ -1150,6 +1264,7 @@ int pa__init(pa_module*m) {
slaves = pa_modargs_get_value(ma, "slaves", NULL);
u->automatic = !slaves;
+ u->requested_latency = (pa_usec_t) -1;
ss = m->core->default_sample_spec;
map = m->core->default_channel_map;
@@ -1234,7 +1349,7 @@ int pa__init(pa_module*m) {
pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Simultaneous Output");
}
- u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
+ u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
pa_sink_new_data_done(&data);
if (!u->sink) {
@@ -1244,6 +1359,7 @@ int pa__init(pa_module*m) {
u->sink->parent.process_msg = sink_process_msg;
u->sink->set_state = sink_set_state;
+ u->sink->update_requested_latency = sink_update_requested_latency;
u->sink->userdata = u;
pa_sink_set_rtpoll(u->sink, u->rtpoll);