/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
Copyright (C) 2010 Red Hat, Inc.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, see .
*/
#include "config.h"
#include "spice-pulse.h"
#include "spice-common.h"
#include "spice-session-priv.h"
#include "spice-channel-priv.h"
#include "spice-util-priv.h"
#include
#include
#include
struct async_task {
SpicePulse *pulse;
SpiceMainChannel *main_channel;
GTask *gtask;
GAsyncReadyCallback callback;
gpointer user_data;
gboolean is_playback;
pa_operation *pa_op;
gulong cancel_id;
};
struct stream {
pa_sample_spec spec;
pa_stream *stream;
int state;
pa_operation *uncork_op;
pa_operation *cork_op;
gboolean started;
guint num_underflow;
gboolean info_updated;
gchar *name;
pa_ext_stream_restore_info info;
};
struct _SpicePulsePrivate {
SpiceChannel *pchannel;
SpiceChannel *rchannel;
pa_glib_mainloop *mainloop;
pa_context *context;
int state;
struct stream playback;
struct stream record;
guint last_delay;
guint target_delay;
struct async_task *pending_restore_task;
GList *results;
};
G_DEFINE_TYPE_WITH_PRIVATE(SpicePulse, spice_pulse, SPICE_TYPE_AUDIO)
static const char *stream_state_names[] = {
[ PA_STREAM_UNCONNECTED ] = "unconnected",
[ PA_STREAM_CREATING ] = "creating",
[ PA_STREAM_READY ] = "ready",
[ PA_STREAM_FAILED ] = "failed",
[ PA_STREAM_TERMINATED ] = "terminated",
};
static const char *context_state_names[] = {
[ PA_CONTEXT_UNCONNECTED ] = "unconnected",
[ PA_CONTEXT_CONNECTING ] = "connecting",
[ PA_CONTEXT_AUTHORIZING ] = "authorizing",
[ PA_CONTEXT_SETTING_NAME ] = "setting_name",
[ PA_CONTEXT_READY ] = "ready",
[ PA_CONTEXT_FAILED ] = "failed",
[ PA_CONTEXT_TERMINATED ] = "terminated",
};
#define STATE_NAME(array, state) \
((state < G_N_ELEMENTS(array)) ? array[state] : NULL)
static void stream_stop(SpicePulse *pulse, struct stream *s);
static gboolean connect_channel(SpiceAudio *audio, SpiceChannel *channel);
static void channel_weak_notified(gpointer data, GObject *where_the_object_was);
static void spice_pulse_get_playback_volume_info_async(SpiceAudio *audio, GCancellable *cancellable,
SpiceMainChannel *main_channel, GAsyncReadyCallback callback, gpointer user_data);
static gboolean spice_pulse_get_playback_volume_info_finish(SpiceAudio *audio, GAsyncResult *res,
gboolean *mute, guint8 *nchannels, guint16 **volume, GError **error);
static void spice_pulse_get_record_volume_info_async(SpiceAudio *audio, GCancellable *cancellable,
SpiceMainChannel *main_channel, GAsyncReadyCallback callback, gpointer user_data);
static gboolean spice_pulse_get_record_volume_info_finish(SpiceAudio *audio,GAsyncResult *res,
gboolean *mute, guint8 *nchannels, guint16 **volume, GError **error);
static void stream_restore_read_cb(pa_context *context,
const pa_ext_stream_restore_info *info, int eol, void *userdata);
static void spice_pulse_complete_async_task(struct async_task *task, const gchar *err_msg);
static void spice_pulse_complete_all_async_tasks(SpicePulse *pulse, const gchar *err_msg);
static void spice_pulse_finalize(GObject *obj)
{
SpicePulse *pulse = SPICE_PULSE(obj);
SpicePulsePrivate *p;
p = pulse->priv;
if (p->context != NULL)
pa_context_unref(p->context);
if (p->mainloop != NULL)
pa_glib_mainloop_free(p->mainloop);
G_OBJECT_CLASS(spice_pulse_parent_class)->finalize(obj);
}
static void spice_pulse_dispose(GObject *obj)
{
SpicePulse *pulse = SPICE_PULSE(obj);
SpicePulsePrivate *p;
SPICE_DEBUG("%s", __FUNCTION__);
p = pulse->priv;
g_clear_pointer(&p->playback.uncork_op, pa_operation_unref);
g_clear_pointer(&p->playback.cork_op, pa_operation_unref);
g_clear_pointer(&p->record.uncork_op, pa_operation_unref);
g_clear_pointer(&p->record.cork_op, pa_operation_unref);
if (p->results != NULL)
spice_pulse_complete_all_async_tasks(pulse, "PulseAudio is being dispose");
g_clear_pointer(&p->playback.name, g_free);
g_clear_pointer(&p->record.name, g_free);
if (p->pchannel)
g_object_weak_unref(G_OBJECT(p->pchannel), channel_weak_notified, pulse);
p->pchannel = NULL;
if (p->rchannel)
g_object_weak_unref(G_OBJECT(p->rchannel), channel_weak_notified, pulse);
p->rchannel = NULL;
G_OBJECT_CLASS(spice_pulse_parent_class)->dispose(obj);
}
static void spice_pulse_init(SpicePulse *pulse)
{
pulse->priv = spice_pulse_get_instance_private(pulse);
}
static void spice_pulse_class_init(SpicePulseClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
SpiceAudioClass *audio_class = SPICE_AUDIO_CLASS(klass);
audio_class->connect_channel = connect_channel;
audio_class->get_playback_volume_info_async = spice_pulse_get_playback_volume_info_async;
audio_class->get_playback_volume_info_finish = spice_pulse_get_playback_volume_info_finish;
audio_class->get_record_volume_info_async = spice_pulse_get_record_volume_info_async;
audio_class->get_record_volume_info_finish = spice_pulse_get_record_volume_info_finish;
gobject_class->finalize = spice_pulse_finalize;
gobject_class->dispose = spice_pulse_dispose;
}
/* ------------------------------------------------------------------ */
static void pulse_uncork_cb(pa_stream *pastream, int success, void *data)
{
struct stream *s = data;
if (!success)
g_warning("pulseaudio uncork operation failed");
g_clear_pointer(&s->uncork_op, pa_operation_unref);
}
static void stream_uncork(SpicePulse *pulse, struct stream *s)
{
SpicePulsePrivate *p = pulse->priv;
pa_operation *o = NULL;
g_return_if_fail(s->stream);
if (s->cork_op) {
pa_operation_cancel(s->cork_op);
g_clear_pointer(&s->cork_op, pa_operation_unref);
}
if (pa_stream_is_corked(s->stream) && !s->uncork_op) {
if (!(o = pa_stream_cork(s->stream, 0, pulse_uncork_cb, s))) {
g_warning("pa_stream_uncork() failed: %s",
pa_strerror(pa_context_errno(p->context)));
}
s->uncork_op = o;
}
}
static void pulse_flush_cb(pa_stream *pastream, int success, void *data)
{
struct stream *s = data;
if (!success)
g_warning("pulseaudio flush operation failed");
g_clear_pointer(&s->cork_op, pa_operation_unref);
}
static void pulse_cork_flush_cb(pa_stream *pastream, int success, void *data)
{
struct stream *s = data;
if (!success)
g_warning("pulseaudio cork operation failed");
pa_operation_unref(s->cork_op);
if (!(s->cork_op = pa_stream_flush(s->stream, pulse_flush_cb, s))) {
g_warning("pa_stream_flush() failed");
}
}
static void pulse_cork_cb(pa_stream *pastream, int success, void *data)
{
struct stream *s = data;
SPICE_DEBUG("%s: cork started", __FUNCTION__);
if (!success)
g_warning("pulseaudio cork operation failed");
g_clear_pointer(&s->cork_op, pa_operation_unref);
}
static void stream_cork(SpicePulse *pulse, struct stream *s, gboolean with_flush)
{
SpicePulsePrivate *p = pulse->priv;
pa_operation *o = NULL;
if (s->uncork_op) {
pa_operation_cancel(s->uncork_op);
g_clear_pointer(&s->uncork_op, pa_operation_unref);
}
if (!pa_stream_is_corked(s->stream) && !s->cork_op) {
if (!(o = pa_stream_cork(s->stream, 1,
with_flush ? pulse_cork_flush_cb :
pulse_cork_cb,
s))) {
g_warning("pa_stream_cork() failed: %s",
pa_strerror(pa_context_errno(p->context)));
}
s->cork_op = o;
}
}
static void stream_stop(SpicePulse *pulse, struct stream *s)
{
SpicePulsePrivate *p = pulse->priv;
if (pa_stream_disconnect(s->stream) < 0) {
g_warning("pa_stream_disconnect() failed: %s",
pa_strerror(pa_context_errno(p->context)));
}
g_clear_pointer(&s->stream, pa_stream_unref);
}
static void stream_state_callback(pa_stream *s, void *userdata)
{
SpicePulse *pulse = userdata;
SpicePulsePrivate *p;
p = pulse->priv;
g_return_if_fail(p != NULL);
g_return_if_fail(s != NULL);
switch (pa_stream_get_state(s)) {
case PA_STREAM_CREATING:
case PA_STREAM_TERMINATED:
case PA_STREAM_READY:
break;
case PA_STREAM_FAILED:
default:
g_warning("Stream error: %s", pa_strerror(pa_context_errno(pa_stream_get_context(s))));
}
}
static void stream_underflow_cb(pa_stream *s, void *userdata)
{
SpicePulse *pulse = userdata;
SpicePulsePrivate *p;
SPICE_DEBUG("PA stream underflow!!");
p = pulse->priv;
g_return_if_fail(p != NULL);
p->playback.num_underflow++;
#ifdef PULSE_ADJUST_LATENCY
const pa_buffer_attr *buffer_attr;
pa_buffer_attr new_buffer_attr;
pa_operation *op;
buffer_attr = pa_stream_get_buffer_attr(s);
g_return_if_fail(buffer_attr != NULL);
new_buffer_attr = *buffer_attr;
new_buffer_attr.tlength *= 2;
new_buffer_attr.minreq *= 2;
op = pa_stream_set_buffer_attr(s, &new_buffer_attr, NULL, NULL);
pa_operation_unref(op);
#endif
}
static void stream_update_latency_callback(pa_stream *s, void *userdata)
{
SpicePulse *pulse = userdata;
pa_usec_t usec;
int negative = 0;
SpicePulsePrivate *p;
p = pulse->priv;
g_return_if_fail(s != NULL);
g_return_if_fail(p != NULL);
if (!p->playback.stream || !p->playback.started)
return;
if (pa_stream_get_latency(s, &usec, &negative) < 0) {
g_warning("Failed to get latency: %s", pa_strerror(pa_context_errno(p->context)));
return;
}
g_return_if_fail(negative == FALSE);
p->last_delay = usec / PA_USEC_PER_MSEC;
spice_playback_channel_set_delay(SPICE_PLAYBACK_CHANNEL(p->pchannel), usec / 1000);
if (pa_stream_is_corked(p->playback.stream)) {
if (p->last_delay >= p->target_delay) {
SPICE_DEBUG("%s: uncork playback. delay %u target %u", __FUNCTION__, p->last_delay, p->target_delay);
stream_uncork(pulse, &p->playback);
} else {
SPICE_DEBUG("%s: still corked. delay %u target %u", __FUNCTION__, p->last_delay, p->target_delay);
}
}
}
static void create_playback(SpicePulse *pulse)
{
SpicePulsePrivate *p = pulse->priv;
pa_stream_flags_t flags;
pa_buffer_attr buffer_attr = { 0, };
g_return_if_fail(p != NULL);
g_return_if_fail(p->context != NULL);
g_return_if_fail(p->playback.stream == NULL);
g_return_if_fail(pa_context_get_state(p->context) == PA_CONTEXT_READY);
p->playback.state = PA_STREAM_READY;
p->playback.stream = pa_stream_new(p->context, "playback",
&p->playback.spec, NULL);
pa_stream_set_state_callback(p->playback.stream, stream_state_callback, pulse);
pa_stream_set_underflow_callback(p->playback.stream, stream_underflow_cb, pulse);
pa_stream_set_latency_update_callback(p->playback.stream, stream_update_latency_callback, pulse);
buffer_attr.maxlength = -1;
buffer_attr.tlength = pa_usec_to_bytes(p->target_delay * PA_USEC_PER_MSEC, &p->playback.spec);
buffer_attr.prebuf = -1;
buffer_attr.minreq = -1;
flags = PA_STREAM_ADJUST_LATENCY | PA_STREAM_AUTO_TIMING_UPDATE;
if (pa_stream_connect_playback(p->playback.stream,
NULL, &buffer_attr, flags, NULL, NULL) < 0) {
g_warning("pa_stream_connect_playback() failed: %s",
pa_strerror(pa_context_errno(p->context)));
}
}
static void playback_start(SpicePlaybackChannel *channel, gint format, gint channels,
gint frequency, gpointer data)
{
SpicePulse *pulse = data;
SpicePulsePrivate *p = pulse->priv;
pa_context_state_t state;
guint latency;
g_return_if_fail(p != NULL);
p->playback.started = TRUE;
p->playback.num_underflow = 0;
g_object_get(p->pchannel, "min-latency", &latency, NULL);
if (p->playback.stream &&
(p->playback.spec.rate != frequency ||
p->playback.spec.channels != channels ||
p->target_delay != latency)) {
stream_stop(pulse, &p->playback);
}
g_return_if_fail(format == SPICE_AUDIO_FMT_S16);
p->playback.spec.format = PA_SAMPLE_S16LE;
p->playback.spec.rate = frequency;
p->playback.spec.channels = channels;
p->target_delay = latency;
p->last_delay = 0;
state = pa_context_get_state(p->context);
switch (state) {
case PA_CONTEXT_READY:
if (p->state != state) {
SPICE_DEBUG("%s: pulse context ready", __FUNCTION__);
}
if (p->playback.stream == NULL) {
create_playback(pulse);
} else
stream_uncork(pulse, &p->playback);
break;
default:
if (p->state != state) {
SPICE_DEBUG("%s: pulse context not ready (%s)",
__FUNCTION__, STATE_NAME(context_state_names, state));
}
break;
}
p->state = state;
}
static void playback_data(SpicePlaybackChannel *channel,
gpointer *audio, gint size,
gpointer data)
{
SpicePulse *pulse = data;
SpicePulsePrivate *p = pulse->priv;
pa_stream_state_t state;
if (!p->playback.stream)
return;
state = pa_stream_get_state(p->playback.stream);
switch (state) {
case PA_STREAM_CREATING:
SPICE_DEBUG("stream creating, dropping data");
break;
case PA_STREAM_READY:
if (p->playback.state != state) {
SPICE_DEBUG("%s: pulse playback stream ready", __FUNCTION__);
}
if (pa_stream_write(p->playback.stream, audio, size, NULL, 0, PA_SEEK_RELATIVE) < 0) {
g_warning("pa_stream_write() failed: %s",
pa_strerror(pa_context_errno(p->context)));
}
break;
default:
if (p->playback.state != state) {
SPICE_DEBUG("%s: pulse playback stream not ready (%s)",
__FUNCTION__, STATE_NAME(stream_state_names, state));
}
break;
}
p->playback.state = state;
}
static void playback_stop(SpicePulse *pulse)
{
SpicePulsePrivate *p = pulse->priv;
SPICE_DEBUG("%s: #underflow %u", __FUNCTION__, p->playback.num_underflow);
p->playback.started = FALSE;
if (!p->playback.stream)
return;
stream_cork(pulse, &p->playback, TRUE);
}
static void stream_read_callback(pa_stream *s, size_t length, void *data)
{
SpicePulse *pulse = data;
SpicePulsePrivate *p = pulse->priv;
g_return_if_fail(p != NULL);
while (pa_stream_readable_size(s) > 0) {
const void *snddata;
if (pa_stream_peek(s, &snddata, &length) < 0) {
g_warning("pa_stream_peek() failed: %s",
pa_strerror(pa_context_errno(p->context)));
return;
}
g_return_if_fail(snddata);
g_return_if_fail(length > 0);
if (p->rchannel != NULL)
spice_record_channel_send_data(SPICE_RECORD_CHANNEL(p->rchannel),
/* FIXME: server side doesn't care about ts?
what is the unit? ms apparently */
(gpointer)snddata, length, 0);
if (pa_stream_drop(s) < 0) {
g_warning("pa_stream_drop() failed: %s",
pa_strerror(pa_context_errno(p->context)));
return;
}
}
}
static void create_record(SpicePulse *pulse)
{
SpicePulsePrivate *p = pulse->priv;
pa_buffer_attr buffer_attr = { 0, };
pa_stream_flags_t flags;
g_return_if_fail(p != NULL);
g_return_if_fail(p->context != NULL);
g_return_if_fail(p->record.stream == NULL);
g_return_if_fail(pa_context_get_state(p->context) == PA_CONTEXT_READY);
p->record.state = PA_STREAM_READY;
p->record.stream = pa_stream_new(p->context, "record",
&p->record.spec, NULL);
pa_stream_set_read_callback(p->record.stream, stream_read_callback, pulse);
pa_stream_set_state_callback(p->record.stream, stream_state_callback, pulse);
/* FIXME: we might want customizable latency */
buffer_attr.maxlength = -1;
buffer_attr.prebuf = -1;
buffer_attr.fragsize = buffer_attr.tlength = pa_usec_to_bytes(20 * PA_USEC_PER_MSEC, &p->record.spec);
buffer_attr.minreq = (uint32_t) -1;
flags = PA_STREAM_ADJUST_LATENCY;
if (pa_stream_connect_record(p->record.stream, NULL, &buffer_attr, flags) < 0) {
g_warning("pa_stream_connect_record() failed: %s",
pa_strerror(pa_context_errno(p->context)));
}
}
static void record_start(SpiceRecordChannel *channel, gint format, gint channels,
gint frequency, gpointer data)
{
SpicePulse *pulse = data;
SpicePulsePrivate *p = pulse->priv;
pa_context_state_t state;
p->record.started = TRUE;
if (p->record.stream &&
(p->record.spec.rate != frequency ||
p->record.spec.channels != channels)) {
stream_stop(pulse, &p->record);
}
g_return_if_fail(format == SPICE_AUDIO_FMT_S16);
p->record.spec.format = PA_SAMPLE_S16LE;
p->record.spec.rate = frequency;
p->record.spec.channels = channels;
state = pa_context_get_state(p->context);
switch (state) {
case PA_CONTEXT_READY:
if (p->state != state) {
SPICE_DEBUG("%s: pulse context ready", __FUNCTION__);
}
if (p->record.stream == NULL) {
create_record(pulse);
} else
stream_uncork(pulse, &p->record);
break;
default:
if (p->state != state) {
g_warning("%s: pulse context not ready (%s)",
__FUNCTION__, STATE_NAME(context_state_names, state));
}
break;
}
p->state = state;
}
static void record_stop(SpicePulse *pulse)
{
SpicePulsePrivate *p = pulse->priv;
SPICE_DEBUG("%s", __FUNCTION__);
p->record.started = FALSE;
if (!p->record.stream)
return;
stream_stop(pulse, &p->record);
}
static void playback_volume_changed(GObject *object, GParamSpec *pspec, gpointer data)
{
SpicePulse *pulse = data;
SpicePulsePrivate *p = pulse->priv;
guint16 *volume;
guint nchannels;
pa_operation *op;
pa_cvolume v;
guint i;
g_object_get(object,
"volume", &volume,
"nchannels", &nchannels,
NULL);
pa_cvolume_init(&v);
v.channels = p->playback.spec.channels;
for (i = 0; i < nchannels; ++i) {
v.values[i] = (PA_VOLUME_NORM - PA_VOLUME_MUTED) * volume[i] / G_MAXUINT16;
SPICE_DEBUG("playback volume changed %u", v.values[i]);
}
if (!p->playback.stream ||
pa_stream_get_index(p->playback.stream) == PA_INVALID_INDEX)
return;
op = pa_context_set_sink_input_volume(p->context,
pa_stream_get_index(p->playback.stream),
&v, NULL, NULL);
if (!op)
g_warning("set_sink_input_volume() failed: %s",
pa_strerror(pa_context_errno(p->context)));
else
pa_operation_unref(op);
}
static void playback_mute_changed(GObject *object, GParamSpec *pspec, gpointer data)
{
SpicePulse *pulse = data;
SpicePulsePrivate *p = pulse->priv;
gboolean mute;
pa_operation *op;
g_object_get(object, "mute", &mute, NULL);
SPICE_DEBUG("playback mute changed %d", mute);
if (!p->playback.stream ||
pa_stream_get_index(p->playback.stream) == PA_INVALID_INDEX)
return;
op = pa_context_set_sink_input_mute(p->context,
pa_stream_get_index(p->playback.stream),
mute, NULL, NULL);
if (!op)
g_warning("set_sink_input_mute() failed: %s",
pa_strerror(pa_context_errno(p->context)));
else
pa_operation_unref(op);
}
static void playback_min_latency_changed(GObject *object, GParamSpec *pspec, gpointer data)
{
SpicePulse *pulse = data;
SpicePulsePrivate *p = pulse->priv;
guint min_latency;
g_object_get(object, "min-latency", &min_latency, NULL);
p->target_delay = min_latency;
if (p->last_delay < p->target_delay) {
SPICE_DEBUG("%s: corking", __FUNCTION__);
if (p->playback.stream)
stream_cork(pulse, &p->playback, FALSE);
} else {
SPICE_DEBUG("%s: not corking. The current delay satisfies the requirement", __FUNCTION__);
}
}
static void record_mute_changed(GObject *object, GParamSpec *pspec, gpointer data)
{
SpicePulse *pulse = data;
SpicePulsePrivate *p = pulse->priv;
gboolean mute;
pa_operation *op;
g_object_get(object, "mute", &mute, NULL);
SPICE_DEBUG("record mute changed %d", mute);
if (!p->record.stream ||
pa_stream_get_device_index(p->record.stream) == PA_INVALID_INDEX)
return;
#if PA_CHECK_VERSION(1,0,0)
op = pa_context_set_source_output_mute(p->context,
pa_stream_get_index(p->record.stream),
#else
op = pa_context_set_source_mute_by_index(p->context,
pa_stream_get_device_index(p->record.stream),
#endif
mute, NULL, NULL);
if (!op)
g_warning("set_source_output_mute() failed: %s",
pa_strerror(pa_context_errno(p->context)));
else
pa_operation_unref(op);
}
static void record_volume_changed(GObject *object, GParamSpec *pspec, gpointer data)
{
SpicePulse *pulse = data;
SpicePulsePrivate *p = pulse->priv;
guint16 *volume;
guint nchannels;
pa_operation *op;
pa_cvolume v;
guint i;
g_object_get(object,
"volume", &volume,
"nchannels", &nchannels,
NULL);
pa_cvolume_init(&v);
v.channels = p->record.spec.channels;
for (i = 0; i < nchannels; ++i) {
v.values[i] = (PA_VOLUME_NORM - PA_VOLUME_MUTED) * volume[i] / G_MAXUINT16;
SPICE_DEBUG("record volume changed %u", v.values[i]);
}
if (!p->record.stream ||
pa_stream_get_device_index(p->record.stream) == PA_INVALID_INDEX)
return;
#if PA_CHECK_VERSION(1,0,0)
op = pa_context_set_source_output_volume(p->context,
pa_stream_get_index(p->record.stream),
#else
op = pa_context_set_source_volume_by_index(p->context,
pa_stream_get_device_index(p->record.stream),
#endif
&v, NULL, NULL);
if (!op)
g_warning("set_source_output_volume() failed: %s",
pa_strerror(pa_context_errno(p->context)));
else
pa_operation_unref(op);
}
static void
channel_weak_notified(gpointer data,
GObject *where_the_object_was)
{
SpicePulse *pulse = SPICE_PULSE(data);
SpicePulsePrivate *p = pulse->priv;
if (where_the_object_was == (GObject *)p->pchannel) {
SPICE_DEBUG("playback closed");
playback_stop(pulse);
p->pchannel = NULL;
} else if (where_the_object_was == (GObject *)p->rchannel) {
SPICE_DEBUG("record closed");
record_stop(pulse);
p->rchannel = NULL;
}
}
static gboolean connect_channel(SpiceAudio *audio, SpiceChannel *channel)
{
SpicePulse *pulse = SPICE_PULSE(audio);
SpicePulsePrivate *p = pulse->priv;
if (SPICE_IS_PLAYBACK_CHANNEL(channel)) {
g_return_val_if_fail(p->pchannel == NULL, FALSE);
p->pchannel = channel;
g_object_weak_ref(G_OBJECT(p->pchannel), channel_weak_notified, audio);
spice_g_signal_connect_object(channel, "playback-start",
G_CALLBACK(playback_start), pulse, 0);
spice_g_signal_connect_object(channel, "playback-data",
G_CALLBACK(playback_data), pulse, 0);
spice_g_signal_connect_object(channel, "playback-stop",
G_CALLBACK(playback_stop), pulse, G_CONNECT_SWAPPED);
spice_g_signal_connect_object(channel, "notify::volume",
G_CALLBACK(playback_volume_changed), pulse, 0);
spice_g_signal_connect_object(channel, "notify::mute",
G_CALLBACK(playback_mute_changed), pulse, 0);
spice_g_signal_connect_object(channel, "notify::min-latency",
G_CALLBACK(playback_min_latency_changed), pulse, 0);
return TRUE;
}
if (SPICE_IS_RECORD_CHANNEL(channel)) {
g_return_val_if_fail(p->rchannel == NULL, FALSE);
p->rchannel = channel;
g_object_weak_ref(G_OBJECT(p->rchannel), channel_weak_notified, audio);
spice_g_signal_connect_object(channel, "record-start",
G_CALLBACK(record_start), pulse, 0);
spice_g_signal_connect_object(channel, "record-stop",
G_CALLBACK(record_stop), pulse, G_CONNECT_SWAPPED);
spice_g_signal_connect_object(channel, "notify::volume",
G_CALLBACK(record_volume_changed), pulse, 0);
spice_g_signal_connect_object(channel, "notify::mute",
G_CALLBACK(record_mute_changed), pulse, 0);
return TRUE;
}
return FALSE;
}
static void context_state_callback(pa_context *c, void *userdata)
{
SpicePulse *pulse = userdata;
SpicePulsePrivate *p;
p = pulse->priv;
g_return_if_fail(p != NULL);
g_return_if_fail(c != NULL);
switch (pa_context_get_state(c)) {
case PA_CONTEXT_CONNECTING:
case PA_CONTEXT_AUTHORIZING:
case PA_CONTEXT_SETTING_NAME:
case PA_CONTEXT_UNCONNECTED:
break;
case PA_CONTEXT_READY: {
if (!p->record.stream && p->record.started)
create_record(SPICE_PULSE(userdata));
if (!p->playback.stream && p->playback.started)
create_playback(SPICE_PULSE(userdata));
if (p->pending_restore_task != NULL &&
p->pending_restore_task->pa_op == NULL) {
pa_operation *op = pa_ext_stream_restore_read(p->context,
stream_restore_read_cb,
pulse);
if (!op)
goto context_fail;
p->pending_restore_task->pa_op = op;
}
break;
}
case PA_CONTEXT_FAILED:
g_warning("PulseAudio context failed %s",
pa_strerror(pa_context_errno(p->context)));
goto context_fail;
case PA_CONTEXT_TERMINATED:
default:
SPICE_DEBUG("PulseAudio context terminated");
goto context_fail;
}
return;
context_fail:
if (p->pending_restore_task != NULL) {
const gchar *errmsg = pa_strerror(pa_context_errno(p->context));
errmsg = (errmsg != NULL) ? errmsg : "PulseAudio context terminated";
spice_pulse_complete_all_async_tasks(pulse, errmsg);
}
}
SpicePulse *spice_pulse_new(SpiceSession *session, GMainContext *context,
const char *name)
{
SpicePulse *pulse;
SpicePulsePrivate *p;
pulse = g_object_new(SPICE_TYPE_PULSE,
"session", session,
"main-context", context,
NULL);
p = pulse->priv;
p->mainloop = pa_glib_mainloop_new(context);
p->state = PA_CONTEXT_READY;
p->context = pa_context_new(pa_glib_mainloop_get_api(p->mainloop), name);
pa_context_set_state_callback(p->context, context_state_callback, pulse);
if (pa_context_connect(p->context, NULL, 0, NULL) < 0) {
g_warning("pa_context_connect() failed: %s",
pa_strerror(pa_context_errno(p->context)));
goto error;
}
p->playback.name = g_strconcat("sink-input-by-application-name:",
g_get_application_name(), NULL);
p->record.name = g_strconcat("source-output-by-application-name:",
g_get_application_name(), NULL);
return pulse;
error:
g_object_unref(pulse);
return NULL;
}
static gboolean free_async_task(gpointer user_data)
{
struct async_task *task = user_data;
if (task == NULL)
return G_SOURCE_REMOVE;
if (task->pa_op != NULL) {
pa_operation_cancel(task->pa_op);
g_clear_pointer(&task->pa_op, pa_operation_unref);
}
if (task->pulse) {
if (task->pulse->priv->pending_restore_task == task) {
task->pulse->priv->pending_restore_task = NULL;
}
g_object_unref(task->pulse);
}
if (task->main_channel)
g_object_unref(task->main_channel);
if (task->pa_op != NULL)
pa_operation_unref(task->pa_op);
g_cancellable_disconnect(g_task_get_cancellable(task->gtask), task->cancel_id);
if (task->gtask)
g_object_unref(task->gtask);
g_free(task);
return G_SOURCE_REMOVE;
}
static void cancel_task(GCancellable *cancellable, gpointer user_data)
{
struct async_task *task = user_data;
g_return_if_fail(task != NULL);
free_async_task(task);
}
static void complete_task(SpicePulse *pulse, struct async_task *task, const gchar *err_msg)
{
SpicePulsePrivate *p = pulse->priv;
/* If we do have any err_msg, we failed */
if (err_msg != NULL) {
g_task_return_new_error(task->gtask,
SPICE_CLIENT_ERROR,
SPICE_CLIENT_ERROR_FAILED,
"restore-info failed: %s",
err_msg);
/* Volume-info does not change if stream is not found */
} else if ((task->is_playback == TRUE && p->playback.info_updated == FALSE) ||
(task->is_playback == FALSE && p->record.info_updated == FALSE)) {
g_task_return_new_error(task->gtask,
SPICE_CLIENT_ERROR,
SPICE_CLIENT_ERROR_FAILED,
"Stream not found by pulse");
} else {
g_task_return_boolean(task->gtask, TRUE);
}
}
static void spice_pulse_complete_async_task(struct async_task *task, const gchar *err_msg)
{
SpicePulsePrivate *p;
g_return_if_fail(task != NULL);
p = task->pulse->priv;
complete_task(task->pulse, task, err_msg);
if (p->results != NULL) {
p->results = g_list_remove(p->results, task);
SPICE_DEBUG("Number of async task is %u", g_list_length(p->results));
}
free_async_task(task);
}
static void spice_pulse_complete_all_async_tasks(SpicePulse *pulse, const gchar *err_msg)
{
SpicePulsePrivate *p;
GList *it;
g_return_if_fail(pulse != NULL);
p = pulse->priv;
/* Complete all tasks in list */
for(it = p->results; it != NULL; it = it->next) {
struct async_task *task = it->data;
complete_task(pulse, task, err_msg);
free_async_task(task);
}
g_clear_pointer(&p->results, g_list_free);
SPICE_DEBUG("All async tasks completed");
}
static void stream_restore_read_cb(pa_context *context,
const pa_ext_stream_restore_info *info,
int eol,
void *userdata)
{
SpicePulsePrivate *p = SPICE_PULSE(userdata)->priv;
struct stream *pstream = NULL;
if (eol ||
(p->playback.info_updated == TRUE &&
p->record.info_updated == TRUE)) {
/* We only have one pa_operation running the stream-restore-info
* which retrieves volume-info from both Playback and Record channels;
* We can complete all async tasks now that this operation ended.
* (or we already have the volume-info we want)
* Note: the following function cancel the current pa_operation */
spice_pulse_complete_all_async_tasks(SPICE_PULSE(userdata), NULL);
return;
}
if (g_strcmp0(info->name, p->playback.name) == 0) {
pstream = &p->playback;
} else if (g_strcmp0(info->name, p->record.name) == 0) {
pstream = &p->record;
} else {
/* This is not the stream you are looking for. */
return;
}
if (info->channel_map.channels == 0) {
SPICE_DEBUG("Number of channels stored is zero. Ignore. (%s)", info->name);
return;
}
pstream->info_updated = TRUE;
pstream->info.name = pstream->name;
pstream->info.mute = info->mute;
pstream->info.channel_map = info->channel_map;
pstream->info.volume = info->volume;
}
#if PA_CHECK_VERSION(1,0,0)
static void source_output_info_cb(pa_context *context,
const pa_source_output_info *info,
int eol,
void *userdata)
#else
static void source_info_cb(pa_context *context,
const pa_source_info *info,
int eol,
void *userdata)
#endif
{
struct async_task *task = userdata;
SpicePulsePrivate *p = task->pulse->priv;
struct stream *pstream = &p->record;
if (eol) {
spice_pulse_complete_async_task(task, NULL);
return;
}
pstream->info_updated = TRUE;
pstream->info.name = pstream->name;
pstream->info.mute = info->mute;
pstream->info.channel_map = info->channel_map;
pstream->info.volume = info->volume;
}
static void sink_input_info_cb(pa_context *context,
const pa_sink_input_info *info,
int eol,
void *userdata)
{
struct async_task *task = userdata;
SpicePulsePrivate *p = task->pulse->priv;
struct stream *pstream = &p->playback;
if (eol) {
spice_pulse_complete_async_task(task, NULL);
return;
}
pstream->info_updated = TRUE;
pstream->info.name = pstream->name;
pstream->info.mute = info->mute;
pstream->info.channel_map = info->channel_map;
pstream->info.volume = info->volume;
}
/* to avoid code duplication */
static void pulse_stream_restore_info_async(gboolean is_playback,
SpiceAudio *audio,
GCancellable *cancellable,
SpiceMainChannel *main_channel,
GAsyncReadyCallback callback,
gpointer user_data)
{
SpicePulsePrivate *p = SPICE_PULSE(audio)->priv;
GTask *gtask;
struct async_task *task = g_malloc0(sizeof(struct async_task));
pa_operation *op = NULL;
gtask = g_task_new(audio, cancellable, callback, user_data);
task->gtask = gtask;
task->pulse = g_object_ref(audio);
task->callback = callback;
task->user_data = user_data;
task->is_playback = is_playback;
task->main_channel = g_object_ref(main_channel);
task->pa_op = NULL;
if (cancellable)
task->cancel_id = g_cancellable_connect(cancellable, G_CALLBACK(cancel_task), task, NULL);
/* If Playback/Record stream is created we use pulse API to get volume-info
* from those streams directly. If the stream is not created, retrieve last
* volume/mute values from Pulse database using the application name;
* If we already have retrieved volume-info from Pulse database then it is
* safe to return the volume-info we already have in info */
if (is_playback == TRUE &&
p->playback.stream != NULL &&
pa_stream_get_index(p->playback.stream) != PA_INVALID_INDEX) {
SPICE_DEBUG("Playback stream is created - get-sink-input-info");
p->playback.info_updated = FALSE;
op = pa_context_get_sink_input_info(p->context,
pa_stream_get_index(p->playback.stream),
sink_input_info_cb,
task);
if (!op)
goto fail;
task->pa_op = op;
} else if (is_playback == FALSE &&
p->record.stream != NULL &&
pa_stream_get_index(p->record.stream) != PA_INVALID_INDEX) {
SPICE_DEBUG("Record stream is created - get-source-output-info");
p->record.info_updated = FALSE;
#if PA_CHECK_VERSION(1,0,0)
op = pa_context_get_source_output_info(p->context,
pa_stream_get_index(p->record.stream),
source_output_info_cb,
task);
#else
op = pa_context_get_source_info_by_index(p->context,
pa_stream_get_device_index(p->record.stream),
source_info_cb,
task);
#endif
if (!op)
goto fail;
task->pa_op = op;
} else {
if (p->playback.info.name != NULL ||
p->record.info.name != NULL) {
/* If the pstream->info.name is set then we already have updated
* volume information. We can complete the request now */
SPICE_DEBUG("Return the volume-information we already have");
spice_pulse_complete_async_task(task, NULL);
return;
}
if (p->results == NULL) {
SPICE_DEBUG("Streams are not created - ext-stream-restore");
p->playback.info_updated = FALSE;
p->record.info_updated = FALSE;
if (pa_context_get_state(p->context) == PA_CONTEXT_READY) {
/* Restore value from pulse db */
op = pa_ext_stream_restore_read(p->context, stream_restore_read_cb, audio);
if (!op)
goto fail;
task->pa_op = op;
} else {
/* It is possible that we want to get volume-info before the
* context is in READY state. In this case, we wait for the
* context state change to READY. */
p->pending_restore_task = task;
}
}
}
p->results = g_list_append(p->results, task);
SPICE_DEBUG ("Number of async task is %u", g_list_length(p->results));
return;
fail:
if (!op) {
g_task_report_new_error(audio,
callback,
user_data,
pulse_stream_restore_info_async,
SPICE_CLIENT_ERROR,
SPICE_CLIENT_ERROR_FAILED,
"Volume-Info failed: %s",
pa_strerror(pa_context_errno(p->context)));
free_async_task(task);
}
}
/* to avoid code duplication */
static gboolean pulse_stream_restore_info_finish(gboolean is_playback,
SpiceAudio *audio,
GAsyncResult *res,
gboolean *mute,
guint8 *nchannels,
guint16 **volume,
GError **error)
{
SpicePulsePrivate *p = SPICE_PULSE(audio)->priv;
struct stream *pstream = (is_playback) ? &p->playback : &p->record;
GTask *task = G_TASK(res);
g_return_val_if_fail(g_task_is_valid(task, G_OBJECT(audio)), FALSE);
if (g_task_had_error(task)) {
/* set out args that should have new alloc'ed memory to NULL */
if (volume != NULL) {
*volume = NULL;
}
return g_task_propagate_boolean(task, error);
}
if (mute != NULL) {
*mute = (pstream->info.mute) ? TRUE : FALSE;
}
if (nchannels != NULL) {
*nchannels = pstream->info.channel_map.channels;
}
if (volume != NULL) {
gint i;
*volume = g_new(guint16, pstream->info.channel_map.channels);
for (i = 0; i < pstream->info.channel_map.channels; i++) {
(*volume)[i] = MIN(pstream->info.volume.values[i], G_MAXUINT16);
SPICE_DEBUG("(%s) volume at channel %d is %u",
(is_playback) ? "playback" : "record", i, (*volume)[i]);
}
}
return g_task_propagate_boolean(task, error);
}
static void spice_pulse_get_playback_volume_info_async(SpiceAudio *audio,
GCancellable *cancellable,
SpiceMainChannel *main_channel,
GAsyncReadyCallback callback,
gpointer user_data)
{
pulse_stream_restore_info_async(TRUE, audio, cancellable, main_channel, callback, user_data);
}
static gboolean spice_pulse_get_playback_volume_info_finish(SpiceAudio *audio,
GAsyncResult *res,
gboolean *mute,
guint8 *nchannels,
guint16 **volume,
GError **error)
{
return pulse_stream_restore_info_finish(TRUE, audio, res, mute,
nchannels, volume, error);
}
static void spice_pulse_get_record_volume_info_async(SpiceAudio *audio,
GCancellable *cancellable,
SpiceMainChannel *main_channel,
GAsyncReadyCallback callback,
gpointer user_data)
{
pulse_stream_restore_info_async(FALSE, audio, cancellable, main_channel, callback, user_data);
}
static gboolean spice_pulse_get_record_volume_info_finish(SpiceAudio *audio,
GAsyncResult *res,
gboolean *mute,
guint8 *nchannels,
guint16 **volume,
GError **error)
{
return pulse_stream_restore_info_finish(FALSE, audio, res, mute,
nchannels, volume, error);
}