diff options
Diffstat (limited to 'src/pulsecore/protocol-esound.c')
-rw-r--r-- | src/pulsecore/protocol-esound.c | 183 |
1 files changed, 112 insertions, 71 deletions
diff --git a/src/pulsecore/protocol-esound.c b/src/pulsecore/protocol-esound.c index f963f2ad..492dc9fa 100644 --- a/src/pulsecore/protocol-esound.c +++ b/src/pulsecore/protocol-esound.c @@ -70,10 +70,12 @@ #define PLAYBACK_BUFFER_SECONDS (.25) #define PLAYBACK_BUFFER_FRAGMENTS (10) #define RECORD_BUFFER_SECONDS (5) -#define RECORD_BUFFER_FRAGMENTS (100) #define MAX_CACHE_SAMPLE_SIZE (2048000) +#define DEFAULT_SINK_LATENCY (150*PA_USEC_PER_MSEC) +#define DEFAULT_SOURCE_LATENCY (150*PA_USEC_PER_MSEC) + #define SCACHE_PREFIX "esound." /* This is heavily based on esound's code */ @@ -102,8 +104,9 @@ typedef struct connection { struct { pa_memblock *current_memblock; - size_t memblock_index, fragment_size; + size_t memblock_index; pa_atomic_t missing; + pa_bool_t underrun; } playback; struct { @@ -122,7 +125,7 @@ static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject); struct pa_protocol_esound { pa_module *module; pa_core *core; - int public; + pa_bool_t public; pa_socket_server *server; pa_idxset *connections; @@ -149,8 +152,9 @@ typedef struct proto_handler { const char *description; } esd_proto_handler_info_t; -static void sink_input_drop_cb(pa_sink_input *i, size_t length); -static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk); +static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk); +static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes); +static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes); static void sink_input_kill_cb(pa_sink_input *i); static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk); static pa_usec_t source_output_get_latency_cb(pa_source_output *o); @@ -398,8 +402,7 @@ static int esd_proto_stream_play(connection *c, PA_GCC_UNUSED esd_proto_t reques CHECK_VALIDITY(sink, "No such sink: %s", c->protocol->sink_name); } - strncpy(name, data, sizeof(name)); - name[sizeof(name)-1] = 0; + pa_strlcpy(name, data, sizeof(name)); utf8_name = pa_utf8_filter(name); pa_client_set_name(c->client, utf8_name); @@ -410,34 +413,39 @@ static int esd_proto_stream_play(connection *c, PA_GCC_UNUSED esd_proto_t reques pa_assert(!c->sink_input && !c->input_memblockq); pa_sink_input_new_data_init(&sdata); - sdata.sink = sink; sdata.driver = __FILE__; - sdata.name = c->client->name; - pa_sink_input_new_data_set_sample_spec(&sdata, &ss); sdata.module = c->protocol->module; sdata.client = c->client; + sdata.sink = sink; + pa_proplist_update(sdata.proplist, PA_UPDATE_MERGE, c->client->proplist); + pa_sink_input_new_data_set_sample_spec(&sdata, &ss); c->sink_input = pa_sink_input_new(c->protocol->core, &sdata, 0); + pa_sink_input_new_data_done(&sdata); + CHECK_VALIDITY(c->sink_input, "Failed to create sink input."); l = (size_t) (pa_bytes_per_second(&ss)*PLAYBACK_BUFFER_SECONDS); c->input_memblockq = pa_memblockq_new( 0, l, - 0, + l, pa_frame_size(&ss), (size_t) -1, l/PLAYBACK_BUFFER_FRAGMENTS, + 0, NULL); - pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2); - c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS; + pa_iochannel_socket_set_rcvbuf(c->io, l); c->sink_input->parent.process_msg = sink_input_process_msg; - c->sink_input->peek = sink_input_peek_cb; - c->sink_input->drop = sink_input_drop_cb; + c->sink_input->pop = sink_input_pop_cb; + c->sink_input->process_rewind = sink_input_process_rewind_cb; + c->sink_input->update_max_rewind = sink_input_update_max_rewind_cb; c->sink_input->kill = sink_input_kill_cb; c->sink_input->userdata = c; + pa_sink_input_set_requested_latency(c->sink_input, DEFAULT_SINK_LATENCY); + c->state = ESD_STREAMING_DATA; c->protocol->n_player++; @@ -497,8 +505,7 @@ static int esd_proto_stream_record(connection *c, esd_proto_t request, const voi } } - strncpy(name, data, sizeof(name)); - name[sizeof(name)-1] = 0; + pa_strlcpy(name, data, sizeof(name)); utf8_name = pa_utf8_filter(name); pa_client_set_name(c->client, utf8_name); @@ -509,32 +516,37 @@ static int esd_proto_stream_record(connection *c, esd_proto_t request, const voi pa_assert(!c->output_memblockq && !c->source_output); pa_source_output_new_data_init(&sdata); - sdata.source = source; sdata.driver = __FILE__; - sdata.name = c->client->name; - pa_source_output_new_data_set_sample_spec(&sdata, &ss); sdata.module = c->protocol->module; sdata.client = c->client; + sdata.source = source; + pa_proplist_update(sdata.proplist, PA_UPDATE_MERGE, c->client->proplist); + pa_source_output_new_data_set_sample_spec(&sdata, &ss); + + c->source_output = pa_source_output_new(c->protocol->core, &sdata, 0); + pa_source_output_new_data_done(&sdata); - c->source_output = pa_source_output_new(c->protocol->core, &sdata, 9); - CHECK_VALIDITY(c->source_output, "Failed to create source_output."); + CHECK_VALIDITY(c->source_output, "Failed to create source output."); l = (size_t) (pa_bytes_per_second(&ss)*RECORD_BUFFER_SECONDS); c->output_memblockq = pa_memblockq_new( 0, l, - 0, + l, pa_frame_size(&ss), 1, 0, + 0, NULL); - pa_iochannel_socket_set_sndbuf(c->io, l/RECORD_BUFFER_FRAGMENTS*2); + pa_iochannel_socket_set_sndbuf(c->io, l); c->source_output->push = source_output_push_cb; c->source_output->kill = source_output_kill_cb; c->source_output->get_latency = source_output_get_latency_cb; c->source_output->userdata = c; + pa_source_output_set_requested_latency(c->source_output, DEFAULT_SOURCE_LATENCY); + c->state = ESD_STREAMING_DATA; c->protocol->n_player++; @@ -638,8 +650,8 @@ static int esd_proto_all_info(connection *c, esd_proto_t request, const void *da memset(name, 0, ESD_NAME_MAX); /* don't leak old data */ if (conn->original_name) strncpy(name, conn->original_name, ESD_NAME_MAX); - else if (conn->client && conn->client->name) - strncpy(name, conn->client->name, ESD_NAME_MAX); + else if (conn->client && pa_proplist_gets(conn->client->proplist, PA_PROP_APPLICATION_NAME)) + strncpy(name, pa_proplist_gets(conn->client->proplist, PA_PROP_APPLICATION_NAME), ESD_NAME_MAX); connection_write(c, name, ESD_NAME_MAX); /* rate */ @@ -785,8 +797,7 @@ static int esd_proto_sample_cache(connection *c, PA_GCC_UNUSED esd_proto_t reque CHECK_VALIDITY(sc_length <= MAX_CACHE_SAMPLE_SIZE, "Sample too large (%d bytes).", (int)sc_length); strcpy(name, SCACHE_PREFIX); - strncpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX); - name[sizeof(name)-1] = 0; + pa_strlcpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX); CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name."); @@ -800,7 +811,7 @@ static int esd_proto_sample_cache(connection *c, PA_GCC_UNUSED esd_proto_t reque c->state = ESD_CACHING_SAMPLE; - pa_scache_add_item(c->protocol->core, c->scache.name, NULL, NULL, NULL, &idx); + pa_scache_add_item(c->protocol->core, c->scache.name, NULL, NULL, NULL, c->client->proplist, &idx); idx += 1; connection_write(c, &idx, sizeof(uint32_t)); @@ -818,8 +829,7 @@ static int esd_proto_sample_get_id(connection *c, PA_GCC_UNUSED esd_proto_t requ pa_assert(length == ESD_NAME_MAX); strcpy(name, SCACHE_PREFIX); - strncpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX); - name[sizeof(name)-1] = 0; + pa_strlcpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX); CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name."); @@ -851,7 +861,7 @@ static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, con pa_sink *sink; if ((sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, 1))) - if (pa_scache_play_item(c->protocol->core, name, sink, PA_VOLUME_NORM) >= 0) + if (pa_scache_play_item(c->protocol->core, name, sink, PA_VOLUME_NORM, c->client->proplist, NULL) >= 0) ok = idx + 1; } else { pa_assert(request == ESD_PROTO_SAMPLE_FREE); @@ -992,7 +1002,7 @@ static int do_read(connection *c) { uint32_t idx; c->scache.memchunk.index = 0; - pa_scache_add_item(c->protocol->core, c->scache.name, &c->scache.sample_spec, NULL, &c->scache.memchunk, &idx); + pa_scache_add_item(c->protocol->core, c->scache.name, &c->scache.sample_spec, NULL, &c->scache.memchunk, c->client->proplist, &idx); pa_memblock_unref(c->scache.memchunk.memblock); c->scache.memchunk.memblock = NULL; @@ -1012,6 +1022,7 @@ static int do_read(connection *c) { ssize_t r; size_t l; void *p; + size_t space; pa_assert(c->input_memblockq); @@ -1020,21 +1031,26 @@ static int do_read(connection *c) { if (!(l = pa_atomic_load(&c->playback.missing))) return 0; - if (l > c->playback.fragment_size) - l = c->playback.fragment_size; + if (c->playback.current_memblock) { + + space = pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index; - if (c->playback.current_memblock) - if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) { + if (space <= 0) { pa_memblock_unref(c->playback.current_memblock); c->playback.current_memblock = NULL; - c->playback.memblock_index = 0; } + } if (!c->playback.current_memblock) { - pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2)); + pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, 0)); c->playback.memblock_index = 0; + + space = pa_memblock_get_length(c->playback.current_memblock); } + if (l > space) + l = space; + p = pa_memblock_acquire(c->playback.current_memblock); r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l); pa_memblock_release(c->playback.current_memblock); @@ -1122,12 +1138,11 @@ static void do_work(connection *c) { if (c->dead) return; - if (pa_iochannel_is_readable(c->io)) { + if (pa_iochannel_is_readable(c->io)) if (do_read(c) < 0) goto fail; - } - if (c->state == ESD_STREAMING_DATA && c->source_output && pa_iochannel_is_hungup(c->io)) + if (c->state == ESD_STREAMING_DATA && !c->sink_input && pa_iochannel_is_hungup(c->io)) /* In case we are in capture mode we will never call read() * on the socket, hence we need to detect the hangup manually * here, instead of simply waiting for read() to return 0. */ @@ -1212,15 +1227,19 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int /* New data from the main loop */ pa_memblockq_push_align(c->input_memblockq, chunk); + if (pa_memblockq_is_readable(c->input_memblockq) && c->playback.underrun) { + pa_log_debug("Requesting rewind due to end of underrun."); + pa_sink_input_request_rewind(c->sink_input, 0, FALSE, TRUE); + } + /* pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */ return 0; } - case SINK_INPUT_MESSAGE_DISABLE_PREBUF: { + case SINK_INPUT_MESSAGE_DISABLE_PREBUF: pa_memblockq_prebuf_disable(c->input_memblockq); return 0; - } case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { pa_usec_t *r = userdata; @@ -1237,41 +1256,62 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int } /* Called from thread context */ -static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { +static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { connection*c; - int r; - pa_assert(i); + pa_sink_input_assert_ref(i); c = CONNECTION(i->userdata); connection_assert_ref(c); pa_assert(chunk); - if ((r = pa_memblockq_peek(c->input_memblockq, chunk)) < 0 && c->dead) - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); + if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { + + c->playback.underrun = TRUE; + + if (c->dead && pa_sink_input_safe_to_remove(i)) + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); + + return -1; + } else { + size_t m; - return r; + c->playback.underrun = FALSE; + + pa_memblockq_drop(c->input_memblockq, chunk->length); + m = pa_memblockq_pop_missing(c->input_memblockq); + + if (m > 0) + if (pa_atomic_add(&c->playback.missing, m) <= 0) + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); + + return 0; + } } /* Called from thread context */ -static void sink_input_drop_cb(pa_sink_input *i, size_t length) { - connection*c; - size_t old, new; +static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { + connection *c; - pa_assert(i); + pa_sink_input_assert_ref(i); c = CONNECTION(i->userdata); connection_assert_ref(c); - pa_assert(length); - /* pa_log("DROP"); */ + /* If we are in an underrun, then we don't rewind */ + if (i->thread_info.underrun_for > 0) + return; - old = pa_memblockq_missing(c->input_memblockq); - pa_memblockq_drop(c->input_memblockq, length); - new = pa_memblockq_missing(c->input_memblockq); + pa_memblockq_rewind(c->input_memblockq, nbytes); +} - if (new > old) { - if (pa_atomic_add(&c->playback.missing, new - old) <= 0) - pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); - } +/* Called from thread context */ +static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) { + connection *c; + + pa_sink_input_assert_ref(i); + c = CONNECTION(i->userdata); + connection_assert_ref(c); + + pa_memblockq_set_maxrewind(c->input_memblockq, nbytes); } static void sink_input_kill_cb(pa_sink_input *i) { @@ -1286,7 +1326,7 @@ static void sink_input_kill_cb(pa_sink_input *i) { static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { connection *c; - pa_assert(o); + pa_source_output_assert_ref(o); c = CONNECTION(o->userdata); pa_assert(c); pa_assert(chunk); @@ -1303,7 +1343,7 @@ static void source_output_kill_cb(pa_source_output *o) { static pa_usec_t source_output_get_latency_cb(pa_source_output *o) { connection*c; - pa_assert(o); + pa_source_output_assert_ref(o); c = CONNECTION(o->userdata); pa_assert(c); @@ -1349,7 +1389,8 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname)); pa_snprintf(cname, sizeof(cname), "EsounD client (%s)", pname); c->client = pa_client_new(p->core, __FILE__, cname); - c->client->owner = p->module; + pa_proplist_sets(c->client->proplist, "esound-protocol.peer", pname); + c->client->module = p->module; c->client->kill = client_kill_cb; c->client->userdata = c; @@ -1374,11 +1415,10 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->playback.current_memblock = NULL; c->playback.memblock_index = 0; - c->playback.fragment_size = 0; + c->playback.underrun = TRUE; pa_atomic_store(&c->playback.missing, 0); - c->scache.memchunk.length = c->scache.memchunk.index = 0; - c->scache.memchunk.memblock = NULL; + pa_memchunk_reset(&c->scache.memchunk); c->scache.name = NULL; c->original_name = NULL; @@ -1436,7 +1476,7 @@ pa_protocol_esound* pa_protocol_esound_new(pa_core*core, pa_socket_server *serve p->core = core; p->module = m; p->public = public; - p->server = server; + p->server = pa_socket_server_ref(server); pa_socket_server_set_callback(p->server, on_connection, p); p->connections = pa_idxset_new(NULL, NULL); @@ -1459,7 +1499,8 @@ void pa_protocol_esound_free(pa_protocol_esound *p) { connection_unlink(c); pa_idxset_free(p->connections, NULL, NULL); - pa_socket_server_unref(p->server); + if (p->server) + pa_socket_server_unref(p->server); if (p->auth_ip_acl) pa_ip_acl_free(p->auth_ip_acl); |