diff options
45 files changed, 1216 insertions, 293 deletions
diff --git a/src/daemon/main.c b/src/daemon/main.c index 38d465f80..aada0ad79 100644 --- a/src/daemon/main.c +++ b/src/daemon/main.c @@ -559,7 +559,7 @@ int main(int argc, char *argv[]) { mainloop = pa_mainloop_new(); assert(mainloop); - c = pa_core_new(pa_mainloop_get_api(mainloop)); + c = pa_core_new(pa_mainloop_get_api(mainloop), 1); assert(c); c->is_system_instance = !!conf->system_instance; diff --git a/src/modules/module-alsa-sink.c b/src/modules/module-alsa-sink.c index 8da3d2360..0cebd50f1 100644 --- a/src/modules/module-alsa-sink.c +++ b/src/modules/module-alsa-sink.c @@ -492,7 +492,7 @@ int pa__init(pa_core *c, pa_module*m) { pa_log_info(__FILE__": using %u fragments of size %lu bytes.", periods, (long unsigned)u->fragment_size); - u->silence.memblock = pa_memblock_new(u->silence.length = u->fragment_size, c->memblock_stat); + u->silence.memblock = pa_memblock_new(c->mempool, u->silence.length = u->fragment_size); assert(u->silence.memblock); pa_silence_memblock(u->silence.memblock, &ss); u->silence.index = 0; diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c index 4a8678c9c..c3979df17 100644 --- a/src/modules/module-alsa-source.c +++ b/src/modules/module-alsa-source.c @@ -151,7 +151,7 @@ static void do_read(struct userdata *u) { size_t l; if (!u->memchunk.memblock) { - u->memchunk.memblock = pa_memblock_new(u->memchunk.length = u->fragment_size, u->source->core->memblock_stat); + u->memchunk.memblock = pa_memblock_new(u->source->core->mempool, u->memchunk.length = u->fragment_size); u->memchunk.index = 0; } diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c index 008fe6e74..5243975b5 100644 --- a/src/modules/module-combine.c +++ b/src/modules/module-combine.c @@ -235,8 +235,7 @@ static struct output *output_new(struct userdata *u, pa_sink *sink, int resample pa_frame_size(&u->sink->sample_spec), 1, 0, - NULL, - sink->core->memblock_stat); + NULL); snprintf(t, sizeof(t), "%s: output #%u", u->sink->name, u->n_outputs+1); diff --git a/src/modules/module-jack-source.c b/src/modules/module-jack-source.c index 583f3b8e9..8e659198d 100644 --- a/src/modules/module-jack-source.c +++ b/src/modules/module-jack-source.c @@ -137,7 +137,7 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_ fs = pa_frame_size(&u->source->sample_spec); - chunk.memblock = pa_memblock_new(chunk.length = u->frames_posted * fs, u->core->memblock_stat); + chunk.memblock = pa_memblock_new(u->core->mempool, chunk.length = u->frames_posted * fs); chunk.index = 0; for (frame_idx = 0; frame_idx < u->frames_posted; frame_idx ++) { diff --git a/src/modules/module-oss-mmap.c b/src/modules/module-oss-mmap.c index c783a2f1f..75ab9a9e9 100644 --- a/src/modules/module-oss-mmap.c +++ b/src/modules/module-oss-mmap.c @@ -162,10 +162,10 @@ static void out_fill_memblocks(struct userdata *u, unsigned n) { chunk.memblock = u->out_memblocks[u->out_current] = pa_memblock_new_fixed( + u->core->mempool, (uint8_t*) u->out_mmap+u->out_fragment_size*u->out_current, u->out_fragment_size, - 1, - u->core->memblock_stat); + 1); assert(chunk.memblock); chunk.length = chunk.memblock->length; chunk.index = 0; @@ -210,7 +210,7 @@ static void in_post_memblocks(struct userdata *u, unsigned n) { pa_memchunk chunk; if (!u->in_memblocks[u->in_current]) { - chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed((uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1, u->core->memblock_stat); + chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed(u->core->mempool, (uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1); chunk.length = chunk.memblock->length; chunk.index = 0; diff --git a/src/modules/module-oss.c b/src/modules/module-oss.c index ce11ee025..b9b80e725 100644 --- a/src/modules/module-oss.c +++ b/src/modules/module-oss.c @@ -217,7 +217,7 @@ static void do_read(struct userdata *u) { } do { - memchunk.memblock = pa_memblock_new(l, u->core->memblock_stat); + memchunk.memblock = pa_memblock_new(u->core->mempool, l); assert(memchunk.memblock); if ((r = pa_iochannel_read(u->io, memchunk.memblock->data, memchunk.memblock->length)) < 0) { pa_memblock_unref(memchunk.memblock); @@ -503,7 +503,7 @@ int pa__init(pa_core *c, pa_module*m) { u->out_fragment_size = out_frag_size; u->in_fragment_size = in_frag_size; - u->silence.memblock = pa_memblock_new(u->silence.length = u->out_fragment_size, u->core->memblock_stat); + u->silence.memblock = pa_memblock_new(u->core->mempool, u->silence.length = u->out_fragment_size); assert(u->silence.memblock); pa_silence_memblock(u->silence.memblock, &ss); u->silence.index = 0; diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c index 5caa60a3e..43a8dab55 100644 --- a/src/modules/module-pipe-source.c +++ b/src/modules/module-pipe-source.c @@ -91,7 +91,7 @@ static void do_read(struct userdata *u) { pa_module_set_used(u->module, pa_idxset_size(u->source->outputs)); if (!u->chunk.memblock) { - u->chunk.memblock = pa_memblock_new(1024, u->core->memblock_stat); + u->chunk.memblock = pa_memblock_new(u->core->mempool, PIPE_BUF); u->chunk.index = chunk.length = 0; } diff --git a/src/modules/module-sine.c b/src/modules/module-sine.c index 5ceddce08..89c3c6093 100644 --- a/src/modules/module-sine.c +++ b/src/modules/module-sine.c @@ -139,7 +139,7 @@ int pa__init(pa_core *c, pa_module*m) { goto fail; } - u->memblock = pa_memblock_new(pa_bytes_per_second(&ss), c->memblock_stat); + u->memblock = pa_memblock_new(c->mempool, pa_bytes_per_second(&ss)); calc_sine(u->memblock->data, u->memblock->length, frequency); snprintf(t, sizeof(t), "Sine Generator at %u Hz", frequency); diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c index 9bb11c094..53bffd3b1 100644 --- a/src/modules/module-tunnel.c +++ b/src/modules/module-tunnel.c @@ -651,7 +651,7 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata return; } - u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->memblock_stat); + u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool); u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX); pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u); diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c index df6f8c118..5d3f3e279 100644 --- a/src/modules/rtp/module-rtp-recv.c +++ b/src/modules/rtp/module-rtp-recv.c @@ -150,7 +150,7 @@ static void rtp_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event assert(fd == s->rtp_context.fd); assert(flags == PA_IO_EVENT_INPUT); - if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->memblock_stat) < 0) + if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->mempool) < 0) return; if (s->sdp_info.payload != s->rtp_context.payload) { @@ -312,10 +312,10 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in s->sink_input->kill = sink_input_kill; s->sink_input->get_latency = sink_input_get_latency; - silence = pa_silence_memblock_new(&s->sink_input->sample_spec, + silence = pa_silence_memblock_new(s->userdata->core->mempool, + &s->sink_input->sample_spec, (pa_bytes_per_second(&s->sink_input->sample_spec)/128/pa_frame_size(&s->sink_input->sample_spec))* - pa_frame_size(&s->sink_input->sample_spec), - s->userdata->core->memblock_stat); + pa_frame_size(&s->sink_input->sample_spec)); s->memblockq = pa_memblockq_new( 0, @@ -324,8 +324,7 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in pa_frame_size(&s->sink_input->sample_spec), pa_bytes_per_second(&s->sink_input->sample_spec)/10+1, 0, - silence, - u->core->memblock_stat); + silence); pa_memblock_unref(silence); diff --git a/src/modules/rtp/module-rtp-send.c b/src/modules/rtp/module-rtp-send.c index 759aa8197..1b85c840b 100644 --- a/src/modules/rtp/module-rtp-send.c +++ b/src/modules/rtp/module-rtp-send.c @@ -297,8 +297,7 @@ int pa__init(pa_core *c, pa_module*m) { pa_frame_size(&ss), 1, 0, - NULL, - c->memblock_stat); + NULL); u->mtu = mtu; diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c index ee037d421..8e77c60a6 100644 --- a/src/modules/rtp/rtp.c +++ b/src/modules/rtp/rtp.c @@ -149,7 +149,7 @@ pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame return c; } -int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st) { +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) { int size; struct msghdr m; struct iovec iov; @@ -170,7 +170,7 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st) { if (!size) return 0; - chunk->memblock = pa_memblock_new(size, st); + chunk->memblock = pa_memblock_new(pool, size); iov.iov_base = chunk->memblock->data; iov.iov_len = size; diff --git a/src/modules/rtp/rtp.h b/src/modules/rtp/rtp.h index 35fbbd357..123602b23 100644 --- a/src/modules/rtp/rtp.h +++ b/src/modules/rtp/rtp.h @@ -41,7 +41,7 @@ pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssr int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q); pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size); -int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st); +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool); void pa_rtp_context_destroy(pa_rtp_context *c); diff --git a/src/pulse/context.c b/src/pulse/context.c index 34f517f0b..b35305424 100644 --- a/src/pulse/context.c +++ b/src/pulse/context.c @@ -128,7 +128,7 @@ pa_context *pa_context_new(pa_mainloop_api *mainloop, const char *name) { c->subscribe_callback = NULL; c->subscribe_userdata = NULL; - c->memblock_stat = pa_memblock_stat_new(); + c->mempool = pa_mempool_new(1); c->local = -1; c->server_list = NULL; c->server = NULL; @@ -177,7 +177,7 @@ static void context_free(pa_context *c) { if (c->playback_streams) pa_dynarray_free(c->playback_streams, NULL, NULL); - pa_memblock_stat_unref(c->memblock_stat); + pa_mempool_free(c->mempool); if (c->conf) pa_client_conf_free(c->conf); @@ -407,7 +407,9 @@ static void setup_context(pa_context *c, pa_iochannel *io) { pa_context_ref(c); assert(!c->pstream); - c->pstream = pa_pstream_new(c->mainloop, io, c->memblock_stat); + c->pstream = pa_pstream_new(c->mainloop, io, c->mempool); + + pa_pstream_use_shm(c->pstream, 1); pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c); pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c); diff --git a/src/pulse/internal.h b/src/pulse/internal.h index 96028d832..afcfaeff5 100644 --- a/src/pulse/internal.h +++ b/src/pulse/internal.h @@ -69,7 +69,7 @@ struct pa_context { pa_context_subscribe_cb_t subscribe_callback; void *subscribe_userdata; - pa_memblock_stat *memblock_stat; + pa_mempool *mempool; int local; int do_autospawn; diff --git a/src/pulse/stream.c b/src/pulse/stream.c index 677df009d..180cd096d 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -437,8 +437,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED pa_frame_size(&s->sample_spec), 1, 0, - NULL, - s->context->memblock_stat); + NULL); } s->channel_valid = 1; @@ -604,9 +603,9 @@ int pa_stream_write( return 0; if (free_cb) - chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat); + chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) data, length, free_cb, 1); else { - chunk.memblock = pa_memblock_new(length, s->context->memblock_stat); + chunk.memblock = pa_memblock_new(s->context->mempool, length); memcpy(chunk.memblock->data, data, length); } diff --git a/src/pulsecore/cli-command.c b/src/pulsecore/cli-command.c index f74258d3c..811b96d2a 100644 --- a/src/pulsecore/cli-command.c +++ b/src/pulsecore/cli-command.c @@ -100,6 +100,7 @@ static int pa_cli_command_dump(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail); static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail); static int pa_cli_command_move_source_output(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail); +static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail); /* A method table for all available commands */ @@ -144,6 +145,7 @@ static const struct command commands[] = { { "list-props", pa_cli_command_list_props, NULL, 1}, { "move-sink-input", pa_cli_command_move_sink_input, "Move sink input to another sink (args: index, sink)", 3}, { "move-source-output", pa_cli_command_move_source_output, "Move source output to another source (args: index, source)", 3}, + { "vacuum", pa_cli_command_vacuum, NULL, 1}, { NULL, NULL, NULL, 0 } }; @@ -239,23 +241,32 @@ static int pa_cli_command_source_outputs(pa_core *c, pa_tokenizer *t, pa_strbuf static int pa_cli_command_stat(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_GCC_UNUSED int *fail) { char s[256]; + const pa_mempool_stat *stat; assert(c && t); - pa_bytes_snprint(s, sizeof(s), c->memblock_stat->total_size); + stat = pa_mempool_get_stat(c->mempool); + pa_strbuf_printf(buf, "Memory blocks currently allocated: %u, size: %s.\n", - c->memblock_stat->total, - s); + stat->n_allocated, + pa_bytes_snprint(s, sizeof(s), stat->allocated_size)); - pa_bytes_snprint(s, sizeof(s), c->memblock_stat->allocated_size); pa_strbuf_printf(buf, "Memory blocks allocated during the whole lifetime: %u, size: %s.\n", - c->memblock_stat->allocated, - s); + stat->n_accumulated, + pa_bytes_snprint(s, sizeof(s), stat->accumulated_size)); + + pa_strbuf_printf(buf, "Memory blocks imported from other processes: %u, size: %s.\n", + stat->n_imported, + pa_bytes_snprint(s, sizeof(s), stat->imported_size)); - pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c)); - pa_strbuf_printf(buf, "Total sample cache size: %s.\n", s); + pa_strbuf_printf(buf, "Memory blocks exported to other processes: %u, size: %s.\n", + stat->n_exported, + pa_bytes_snprint(s, sizeof(s), stat->exported_size)); - pa_sample_spec_snprint(s, sizeof(s), &c->default_sample_spec); - pa_strbuf_printf(buf, "Default sample spec: %s\n", s); + pa_strbuf_printf(buf, "Total sample cache size: %s.\n", + pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c))); + + pa_strbuf_printf(buf, "Default sample spec: %s\n", + pa_sample_spec_snprint(s, sizeof(s), &c->default_sample_spec)); pa_strbuf_printf(buf, "Default sink name: %s\n" "Default source name: %s\n", @@ -731,6 +742,15 @@ static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf return 0; } +static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) { + assert(c); + assert(t); + + pa_mempool_vacuum(c->mempool); + + return 0; +} + static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) { const char *n, *k; pa_sink_input *si; diff --git a/src/pulsecore/core-scache.c b/src/pulsecore/core-scache.c index 377dd569f..ca2408fe7 100644 --- a/src/pulsecore/core-scache.c +++ b/src/pulsecore/core-scache.c @@ -176,7 +176,7 @@ int pa_scache_add_file(pa_core *c, const char *name, const char *filename, uint3 filename = buf; #endif - if (pa_sound_file_load(filename, &ss, &map, &chunk, c->memblock_stat) < 0) + if (pa_sound_file_load(c->mempool, filename, &ss, &map, &chunk) < 0) return -1; r = pa_scache_add_item(c, name, &ss, &map, &chunk, idx); @@ -261,7 +261,7 @@ int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t return -1; if (e->lazy && !e->memchunk.memblock) { - if (pa_sound_file_load(e->filename, &e->sample_spec, &e->channel_map, &e->memchunk, c->memblock_stat) < 0) + if (pa_sound_file_load(c->mempool, e->filename, &e->sample_spec, &e->channel_map, &e->memchunk) < 0) return -1; pa_subscription_post(c, PA_SUBSCRIPTION_EVENT_SAMPLE_CACHE|PA_SUBSCRIPTION_EVENT_CHANGE, e->index); diff --git a/src/pulsecore/core.c b/src/pulsecore/core.c index 7f2f0f60d..5fdeab569 100644 --- a/src/pulsecore/core.c +++ b/src/pulsecore/core.c @@ -44,7 +44,7 @@ #include "core.h" -pa_core* pa_core_new(pa_mainloop_api *m) { +pa_core* pa_core_new(pa_mainloop_api *m, int shared) { pa_core* c; c = pa_xnew(pa_core, 1); @@ -78,7 +78,7 @@ pa_core* pa_core_new(pa_mainloop_api *m) { PA_LLIST_HEAD_INIT(pa_subscription_event, c->subscription_event_queue); c->subscription_event_last = NULL; - c->memblock_stat = pa_memblock_stat_new(); + c->mempool = pa_mempool_new(shared); c->disallow_module_loading = 0; @@ -139,7 +139,7 @@ void pa_core_free(pa_core *c) { pa_xfree(c->default_source_name); pa_xfree(c->default_sink_name); - pa_memblock_stat_unref(c->memblock_stat); + pa_mempool_free(c->mempool); pa_property_cleanup(c); diff --git a/src/pulsecore/core.h b/src/pulsecore/core.h index f9fa386e8..3a34d297a 100644 --- a/src/pulsecore/core.h +++ b/src/pulsecore/core.h @@ -67,7 +67,7 @@ struct pa_core { PA_LLIST_HEAD(pa_subscription_event, subscription_event_queue); pa_subscription_event *subscription_event_last; - pa_memblock_stat *memblock_stat; + pa_mempool *mempool; int disallow_module_loading, running_as_daemon; int exit_idle_time, module_idle_time, scache_idle_time; @@ -88,7 +88,7 @@ struct pa_core { hook_source_disconnect; }; -pa_core* pa_core_new(pa_mainloop_api *m); +pa_core* pa_core_new(pa_mainloop_api *m, int shared); void pa_core_free(pa_core*c); /* Check whether noone is connected to this core */ diff --git a/src/pulsecore/mcalign.c b/src/pulsecore/mcalign.c index 8283a7a0b..9ede610d7 100644 --- a/src/pulsecore/mcalign.c +++ b/src/pulsecore/mcalign.c @@ -35,10 +35,9 @@ struct pa_mcalign { size_t base; pa_memchunk leftover, current; - pa_memblock_stat *memblock_stat; }; -pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) { +pa_mcalign *pa_mcalign_new(size_t base) { pa_mcalign *m; assert(base); @@ -47,7 +46,6 @@ pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) { m->base = base; pa_memchunk_reset(&m->leftover); pa_memchunk_reset(&m->current); - m->memblock_stat = s; return m; } @@ -100,7 +98,7 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) { l = c->length; /* Can we use the current block? */ - pa_memchunk_make_writable(&m->leftover, m->memblock_stat, m->base); + pa_memchunk_make_writable(&m->leftover, m->base); memcpy((uint8_t*) m->leftover.memblock->data + m->leftover.index + m->leftover.length, (uint8_t*) c->memblock->data + c->index, l); m->leftover.length += l; diff --git a/src/pulsecore/mcalign.h b/src/pulsecore/mcalign.h index 80e374996..94e99e212 100644 --- a/src/pulsecore/mcalign.h +++ b/src/pulsecore/mcalign.h @@ -63,7 +63,7 @@ typedef struct pa_mcalign pa_mcalign; -pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s); +pa_mcalign *pa_mcalign_new(size_t base); void pa_mcalign_free(pa_mcalign *m); /* Push a new memchunk into the aligner. The caller of this routine diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c index 36de17fb3..4ce1b7c1a 100644 --- a/src/pulsecore/memblock.c +++ b/src/pulsecore/memblock.c @@ -27,86 +27,271 @@ #include <stdlib.h> #include <assert.h> #include <string.h> +#include <unistd.h> #include <pulse/xmalloc.h> +#include <pulsecore/shm.h> +#include <pulsecore/log.h> +#include <pulsecore/hashmap.h> + #include "memblock.h" -static void stat_add(pa_memblock*m, pa_memblock_stat *s) { - assert(m); +#define PA_MEMPOOL_SLOTS_MAX 128 +#define PA_MEMPOOL_SLOT_SIZE (16*1024) - if (!s) { - m->stat = NULL; - return; - } +#define PA_MEMEXPORT_SLOTS_MAX 128 + +#define PA_MEMIMPORT_SLOTS_MAX 128 +#define PA_MEMIMPORT_SEGMENTS_MAX 16 + +struct pa_memimport_segment { + pa_memimport *import; + pa_shm memory; + unsigned n_blocks; +}; + +struct pa_memimport { + pa_mempool *pool; + pa_hashmap *segments; + pa_hashmap *blocks; + + /* Called whenever an imported memory block is no longer + * needed. */ + pa_memimport_release_cb_t release_cb; + void *userdata; + + PA_LLIST_FIELDS(pa_memimport); +}; + +struct memexport_slot { + PA_LLIST_FIELDS(struct memexport_slot); + pa_memblock *block; +}; + +struct pa_memexport { + pa_mempool *pool; + + struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX]; + PA_LLIST_HEAD(struct memexport_slot, free_slots); + PA_LLIST_HEAD(struct memexport_slot, used_slots); + unsigned n_init; + + /* Called whenever a client from which we imported a memory block + which we in turn exported to another client dies and we need to + revoke the memory block accordingly */ + pa_memexport_revoke_cb_t revoke_cb; + void *userdata; + + PA_LLIST_FIELDS(pa_memexport); +}; + +struct mempool_slot { + PA_LLIST_FIELDS(struct mempool_slot); + /* the actual data follows immediately hereafter */ +}; - m->stat = pa_memblock_stat_ref(s); - s->total++; - s->allocated++; - s->total_size += m->length; - s->allocated_size += m->length; +struct pa_mempool { + pa_shm memory; + size_t block_size; + unsigned n_blocks, n_init; + + PA_LLIST_HEAD(pa_memimport, imports); + PA_LLIST_HEAD(pa_memexport, exports); + + /* A list of free slots that may be reused */ + PA_LLIST_HEAD(struct mempool_slot, free_slots); + PA_LLIST_HEAD(struct mempool_slot, used_slots); + + pa_mempool_stat stat; +}; + +static void segment_detach(pa_memimport_segment *seg); + +static void stat_add(pa_memblock*b) { + assert(b); + assert(b->pool); + + b->pool->stat.n_allocated ++; + b->pool->stat.n_accumulated ++; + b->pool->stat.allocated_size += b->length; + b->pool->stat.accumulated_size += b->length; + + if (b->type == PA_MEMBLOCK_IMPORTED) { + b->pool->stat.n_imported++; + b->pool->stat.imported_size += b->length; + } } -static void stat_remove(pa_memblock *m) { - assert(m); +static void stat_remove(pa_memblock *b) { + assert(b); + assert(b->pool); - if (!m->stat) - return; + assert(b->pool->stat.n_allocated > 0); + assert(b->pool->stat.allocated_size >= b->length); + + b->pool->stat.n_allocated --; + b->pool->stat.allocated_size -= b->length; + + if (b->type == PA_MEMBLOCK_IMPORTED) { + assert(b->pool->stat.n_imported > 0); + assert(b->pool->stat.imported_size >= b->length); + + b->pool->stat.n_imported --; + b->pool->stat.imported_size -= b->length; + } +} - m->stat->total--; - m->stat->total_size -= m->length; +static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length); + +pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) { + pa_memblock *b; - pa_memblock_stat_unref(m->stat); - m->stat = NULL; + assert(p); + assert(length > 0); + + if (!(b = pa_memblock_new_pool(p, length))) + b = memblock_new_appended(p, length); + + return b; } -pa_memblock *pa_memblock_new(size_t length, pa_memblock_stat*s) { - pa_memblock *b = pa_xmalloc(sizeof(pa_memblock)+length); +static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) { + pa_memblock *b; + + assert(p); + assert(length > 0); + + b = pa_xmalloc(sizeof(pa_memblock) + length); b->type = PA_MEMBLOCK_APPENDED; + b->read_only = 0; b->ref = 1; b->length = length; - b->data = b+1; - b->free_cb = NULL; - b->read_only = 0; - stat_add(b, s); + b->data = (uint8_t*) b + sizeof(pa_memblock); + b->pool = p; + + stat_add(b); return b; } -pa_memblock *pa_memblock_new_dynamic(void *d, size_t length, pa_memblock_stat*s) { - pa_memblock *b = pa_xmalloc(sizeof(pa_memblock)); - b->type = PA_MEMBLOCK_DYNAMIC; - b->ref = 1; +static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) { + struct mempool_slot *slot; + assert(p); + + if (p->free_slots) { + slot = p->free_slots; + PA_LLIST_REMOVE(struct mempool_slot, p->free_slots, slot); + } else if (p->n_init < p->n_blocks) + slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * p->n_init++)); + else { + pa_log_debug(__FILE__": Pool full"); + p->stat.n_pool_full++; + return NULL; + } + + PA_LLIST_PREPEND(struct mempool_slot, p->used_slots, slot); + return slot; +} + +static void* mempool_slot_data(struct mempool_slot *slot) { + assert(slot); + + return (uint8_t*) slot + sizeof(struct mempool_slot); +} + +static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) { + assert(p); + assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr); + assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size); + + return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size; +} + +static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) { + unsigned idx; + + if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1) + return NULL; + + return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size)); +} + +pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) { + pa_memblock *b = NULL; + struct mempool_slot *slot; + + assert(p); + assert(length > 0); + + if (p->block_size - sizeof(struct mempool_slot) >= sizeof(pa_memblock) + length) { + + if (!(slot = mempool_allocate_slot(p))) + return NULL; + + b = mempool_slot_data(slot); + b->type = PA_MEMBLOCK_POOL; + b->data = (uint8_t*) b + sizeof(pa_memblock); + + } else if (p->block_size - sizeof(struct mempool_slot) >= length) { + + if (!(slot = mempool_allocate_slot(p))) + return NULL; + + b = pa_xnew(pa_memblock, 1); + b->type = PA_MEMBLOCK_POOL_EXTERNAL; + b->data = mempool_slot_data(slot); + } else { + pa_log_debug(__FILE__": Memory block to large for pool: %u > %u", length, p->block_size - sizeof(struct mempool_slot)); + p->stat.n_too_large_for_pool++; + return NULL; + } + b->length = length; - b->data = d; - b->free_cb = NULL; b->read_only = 0; - stat_add(b, s); + b->ref = 1; + b->pool = p; + + stat_add(b); return b; } -pa_memblock *pa_memblock_new_fixed(void *d, size_t length, int read_only, pa_memblock_stat*s) { - pa_memblock *b = pa_xmalloc(sizeof(pa_memblock)); +pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) { + pa_memblock *b; + + assert(p); + assert(d); + assert(length > 0); + + b = pa_xnew(pa_memblock, 1); b->type = PA_MEMBLOCK_FIXED; + b->read_only = read_only; b->ref = 1; b->length = length; b->data = d; - b->free_cb = NULL; - b->read_only = read_only; - stat_add(b, s); + b->pool = p; + + stat_add(b); return b; } -pa_memblock *pa_memblock_new_user(void *d, size_t length, void (*free_cb)(void *p), int read_only, pa_memblock_stat*s) { +pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) { pa_memblock *b; - assert(d && length && free_cb); - b = pa_xmalloc(sizeof(pa_memblock)); + + assert(p); + assert(d); + assert(length > 0); + assert(free_cb); + + b = pa_xnew(pa_memblock, 1); b->type = PA_MEMBLOCK_USER; + b->read_only = read_only; b->ref = 1; b->length = length; b->data = d; - b->free_cb = free_cb; - b->read_only = read_only; - stat_add(b, s); + b->per_type.user.free_cb = free_cb; + b->pool = p; + + stat_add(b); return b; } @@ -122,52 +307,458 @@ void pa_memblock_unref(pa_memblock*b) { assert(b); assert(b->ref >= 1); - if ((--(b->ref)) == 0) { - stat_remove(b); + if ((--(b->ref)) > 0) + return; + + stat_remove(b); + + switch (b->type) { + case PA_MEMBLOCK_USER : + assert(b->per_type.user.free_cb); + b->per_type.user.free_cb(b->data); + + /* Fall through */ + + case PA_MEMBLOCK_FIXED: + case PA_MEMBLOCK_APPENDED : + pa_xfree(b); + break; + + case PA_MEMBLOCK_IMPORTED : { + pa_memimport_segment *segment; + + segment = b->per_type.imported.segment; + assert(segment); + assert(segment->import); + + pa_hashmap_remove(segment->import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id)); + segment->import->release_cb(segment->import, b->per_type.imported.id, segment->import->userdata); + + if (-- segment->n_blocks <= 0) + segment_detach(segment); + + pa_xfree(b); + break; + } - if (b->type == PA_MEMBLOCK_USER) { - assert(b->free_cb); - b->free_cb(b->data); - } else if (b->type == PA_MEMBLOCK_DYNAMIC) - pa_xfree(b->data); + case PA_MEMBLOCK_POOL_EXTERNAL: + case PA_MEMBLOCK_POOL: { + struct mempool_slot *slot; - pa_xfree(b); + slot = mempool_slot_by_ptr(b->pool, b->data); + assert(slot); + + PA_LLIST_REMOVE(struct mempool_slot, b->pool->used_slots, slot); + PA_LLIST_PREPEND(struct mempool_slot, b->pool->free_slots, slot); + + if (b->type == PA_MEMBLOCK_POOL_EXTERNAL) + pa_xfree(b); + } } } +static void memblock_make_local(pa_memblock *b) { + assert(b); + + if (b->length <= b->pool->block_size - sizeof(struct mempool_slot)) { + struct mempool_slot *slot; + + if ((slot = mempool_allocate_slot(b->pool))) { + void *new_data; + /* We can move it into a local pool, perfect! */ + + b->type = PA_MEMBLOCK_POOL_EXTERNAL; + b->read_only = 0; + + new_data = mempool_slot_data(slot); + memcpy(new_data, b->data, b->length); + b->data = new_data; + return; + } + } + + /* Humm, not enough space in the pool, so lets allocate the memory with malloc() */ + b->type = PA_MEMBLOCK_USER; + b->per_type.user.free_cb = pa_xfree; + b->read_only = 0; + b->data = pa_xmemdup(b->data, b->length); +} + void pa_memblock_unref_fixed(pa_memblock *b) { - assert(b && b->ref >= 1 && b->type == PA_MEMBLOCK_FIXED); + assert(b); + assert(b->ref >= 1); + assert(b->type == PA_MEMBLOCK_FIXED); - if (b->ref == 1) - pa_memblock_unref(b); - else { - b->data = pa_xmemdup(b->data, b->length); - b->type = PA_MEMBLOCK_DYNAMIC; - b->ref--; + if (b->ref > 1) + memblock_make_local(b); + + pa_memblock_unref(b); +} + +static void memblock_replace_import(pa_memblock *b) { + pa_memimport_segment *seg; + + assert(b); + assert(b->type == PA_MEMBLOCK_IMPORTED); + + assert(b->pool->stat.n_imported > 0); + assert(b->pool->stat.imported_size >= b->length); + b->pool->stat.n_imported --; + b->pool->stat.imported_size -= b->length; + + seg = b->per_type.imported.segment; + assert(seg); + assert(seg->import); + + pa_hashmap_remove( + seg->import->blocks, + PA_UINT32_TO_PTR(b->per_type.imported.id)); + + memblock_make_local(b); + + if (-- seg->n_blocks <= 0) + segment_detach(seg); +} + +pa_mempool* pa_mempool_new(int shared) { + size_t ps; + pa_mempool *p; + + p = pa_xnew(pa_mempool, 1); + + ps = (size_t) sysconf(_SC_PAGESIZE); + + p->block_size = (PA_MEMPOOL_SLOT_SIZE/ps)*ps; + + if (p->block_size < ps) + p->block_size = ps; + + p->n_blocks = PA_MEMPOOL_SLOTS_MAX; + + assert(p->block_size > sizeof(struct mempool_slot)); + + if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) { + pa_xfree(p); + return NULL; + } + + p->n_init = 0; + + PA_LLIST_HEAD_INIT(pa_memimport, p->imports); + PA_LLIST_HEAD_INIT(pa_memexport, p->exports); + PA_LLIST_HEAD_INIT(struct mempool_slot, p->free_slots); + PA_LLIST_HEAD_INIT(struct mempool_slot, p->used_slots); + + memset(&p->stat, 0, sizeof(p->stat)); + + return p; +} + +void pa_mempool_free(pa_mempool *p) { + assert(p); + + while (p->imports) + pa_memimport_free(p->imports); + + while (p->exports) + pa_memexport_free(p->exports); + + if (p->stat.n_allocated > 0) + pa_log_warn(__FILE__": WARNING! Memory pool destroyed but not all memory blocks freed!"); + + pa_shm_free(&p->memory); + pa_xfree(p); +} + +const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) { + assert(p); + + return &p->stat; +} + +void pa_mempool_vacuum(pa_mempool *p) { + struct mempool_slot *slot; + + assert(p); + + for (slot = p->free_slots; slot; slot = slot->next) { + pa_shm_punch(&p->memory, (uint8_t*) slot + sizeof(struct mempool_slot) - (uint8_t*) p->memory.ptr, p->block_size - sizeof(struct mempool_slot)); } } -pa_memblock_stat* pa_memblock_stat_new(void) { - pa_memblock_stat *s; +int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) { + assert(p); - s = pa_xmalloc(sizeof(pa_memblock_stat)); - s->ref = 1; - s->total = s->total_size = s->allocated = s->allocated_size = 0; + if (!p->memory.shared) + return -1; - return s; + *id = p->memory.id; + + return 0; +} + +/* For recieving blocks from other nodes */ +pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) { + pa_memimport *i; + + assert(p); + assert(cb); + + i = pa_xnew(pa_memimport, 1); + i->pool = p; + i->segments = pa_hashmap_new(NULL, NULL); + i->blocks = pa_hashmap_new(NULL, NULL); + i->release_cb = cb; + i->userdata = userdata; + + PA_LLIST_PREPEND(pa_memimport, p->imports, i); + return i; } -void pa_memblock_stat_unref(pa_memblock_stat *s) { - assert(s && s->ref >= 1); +static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i); + +static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) { + pa_memimport_segment* seg; - if (!(--(s->ref))) { - assert(!s->total); - pa_xfree(s); + if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX) + return NULL; + + seg = pa_xnew(pa_memimport_segment, 1); + + if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) { + pa_xfree(seg); + return NULL; } + + seg->import = i; + seg->n_blocks = 0; + + pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg); + return seg; +} + +static void segment_detach(pa_memimport_segment *seg) { + assert(seg); + + pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id)); + pa_shm_free(&seg->memory); + pa_xfree(seg); +} + +void pa_memimport_free(pa_memimport *i) { + pa_memexport *e; + pa_memblock *b; + + assert(i); + + /* If we've exported this block further we need to revoke that export */ + for (e = i->pool->exports; e; e = e->next) + memexport_revoke_blocks(e, i); + + while ((b = pa_hashmap_get_first(i->blocks))) + memblock_replace_import(b); + + assert(pa_hashmap_size(i->segments) == 0); + + pa_hashmap_free(i->blocks, NULL, NULL); + pa_hashmap_free(i->segments, NULL, NULL); + + PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i); + pa_xfree(i); } -pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s) { - assert(s); - s->ref++; - return s; +pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) { + pa_memblock *b; + pa_memimport_segment *seg; + + assert(i); + + if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX) + return NULL; + + if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id)))) + if (!(seg = segment_attach(i, shm_id))) + return NULL; + + if (offset+size > seg->memory.size) + return NULL; + + b = pa_xnew(pa_memblock, 1); + b->type = PA_MEMBLOCK_IMPORTED; + b->read_only = 1; + b->ref = 1; + b->length = size; + b->data = (uint8_t*) seg->memory.ptr + offset; + b->pool = i->pool; + b->per_type.imported.id = block_id; + b->per_type.imported.segment = seg; + + pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b); + + seg->n_blocks++; + + stat_add(b); + + return b; +} + +int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) { + pa_memblock *b; + assert(i); + + if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id)))) + return -1; + + memblock_replace_import(b); + return 0; +} + +/* For sending blocks to other nodes */ +pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) { + pa_memexport *e; + + assert(p); + assert(cb); + + if (!p->memory.shared) + return NULL; + + e = pa_xnew(pa_memexport, 1); + e->pool = p; + PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots); + PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots); + e->n_init = 0; + e->revoke_cb = cb; + e->userdata = userdata; + + PA_LLIST_PREPEND(pa_memexport, p->exports, e); + return e; +} + +void pa_memexport_free(pa_memexport *e) { + assert(e); + + while (e->used_slots) + pa_memexport_process_release(e, e->used_slots - e->slots); + + PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e); + pa_xfree(e); +} + +int pa_memexport_process_release(pa_memexport *e, uint32_t id) { + assert(e); + + if (id >= e->n_init) + return -1; + + if (!e->slots[id].block) + return -1; + +/* pa_log("Processing release for %u", id); */ + + assert(e->pool->stat.n_exported > 0); + assert(e->pool->stat.exported_size >= e->slots[id].block->length); + + e->pool->stat.n_exported --; + e->pool->stat.exported_size -= e->slots[id].block->length; + + pa_memblock_unref(e->slots[id].block); + e->slots[id].block = NULL; + + PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]); + PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]); + + return 0; +} + +static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) { + struct memexport_slot *slot, *next; + assert(e); + assert(i); + + for (slot = e->used_slots; slot; slot = next) { + uint32_t idx; + next = slot->next; + + if (slot->block->type != PA_MEMBLOCK_IMPORTED || + slot->block->per_type.imported.segment->import != i) + continue; + + idx = slot - e->slots; + e->revoke_cb(e, idx, e->userdata); + pa_memexport_process_release(e, idx); + } +} + +static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) { + pa_memblock *n; + + assert(p); + assert(b); + + if (b->type == PA_MEMBLOCK_IMPORTED || + b->type == PA_MEMBLOCK_POOL || + b->type == PA_MEMBLOCK_POOL_EXTERNAL) { + assert(b->pool == p); + return pa_memblock_ref(b); + } + + if (!(n = pa_memblock_new_pool(p, b->length))) + return NULL; + + memcpy(n->data, b->data, b->length); + return n; +} + +int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) { + pa_shm *memory; + struct memexport_slot *slot; + + assert(e); + assert(b); + assert(block_id); + assert(shm_id); + assert(offset); + assert(size); + assert(b->pool == e->pool); + + if (!(b = memblock_shared_copy(e->pool, b))) + return -1; + + if (e->free_slots) { + slot = e->free_slots; + PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot); + } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX) { + slot = &e->slots[e->n_init++]; + } else { + pa_memblock_unref(b); + return -1; + } + + PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot); + slot->block = b; + *block_id = slot - e->slots; + +/* pa_log("Got block id %u", *block_id); */ + + if (b->type == PA_MEMBLOCK_IMPORTED) { + assert(b->per_type.imported.segment); + memory = &b->per_type.imported.segment->memory; + } else { + assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL); + assert(b->pool); + memory = &b->pool->memory; + } + + assert(b->data >= memory->ptr); + assert((uint8_t*) b->data + b->length <= (uint8_t*) memory->ptr + memory->size); + + *shm_id = memory->id; + *offset = (uint8_t*) b->data - (uint8_t*) memory->ptr; + *size = b->length; + + e->pool->stat.n_exported ++; + e->pool->stat.exported_size += b->length; + + return 0; } diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h index 04a0b55b1..e63e1e0fe 100644 --- a/src/pulsecore/memblock.h +++ b/src/pulsecore/memblock.h @@ -1,5 +1,5 @@ -#ifndef foomemblockhfoo -#define foomemblockhfoo +#ifndef foopulsememblockhfoo +#define foopulsememblockhfoo /* $Id$ */ @@ -25,6 +25,8 @@ #include <sys/types.h> #include <inttypes.h> +#include <pulsecore/llist.h> + /* A pa_memblock is a reference counted memory block. PulseAudio * passed references to pa_memblocks around instead of copying * data. See pa_memchunk for a structure that describes parts of @@ -32,43 +34,72 @@ /* The type of memory this block points to */ typedef enum pa_memblock_type { - PA_MEMBLOCK_FIXED, /* data is a pointer to fixed memory that needs not to be freed */ - PA_MEMBLOCK_APPENDED, /* The most common kind: the data is appended to the memory block */ - PA_MEMBLOCK_DYNAMIC, /* data is a pointer to some memory allocated with pa_xmalloc() */ - PA_MEMBLOCK_USER /* User supplied memory, to be freed with free_cb */ + PA_MEMBLOCK_POOL, /* Memory is part of the memory pool */ + PA_MEMBLOCK_POOL_EXTERNAL, /* Data memory is part of the memory pool but the pa_memblock structure itself not */ + PA_MEMBLOCK_APPENDED, /* the data is appended to the memory block */ + PA_MEMBLOCK_USER, /* User supplied memory, to be freed with free_cb */ + PA_MEMBLOCK_FIXED, /* data is a pointer to fixed memory that needs not to be freed */ + PA_MEMBLOCK_IMPORTED, /* Memory is imported from another process via shm */ } pa_memblock_type_t; -/* A structure of keeping memory block statistics */ -/* Maintains statistics about memory blocks */ -typedef struct pa_memblock_stat { - int ref; - unsigned total; - unsigned total_size; - unsigned allocated; - unsigned allocated_size; -} pa_memblock_stat; - -typedef struct pa_memblock { +typedef struct pa_memblock pa_memblock; +typedef struct pa_mempool pa_mempool; +typedef struct pa_mempool_stat pa_mempool_stat; +typedef struct pa_memimport_segment pa_memimport_segment; +typedef struct pa_memimport pa_memimport; +typedef struct pa_memexport pa_memexport; + +typedef void (*pa_memimport_release_cb_t)(pa_memimport *i, uint32_t block_id, void *userdata); +typedef void (*pa_memexport_revoke_cb_t)(pa_memexport *e, uint32_t block_id, void *userdata); + +struct pa_memblock { pa_memblock_type_t type; - unsigned ref; /* the reference counter */ int read_only; /* boolean */ + unsigned ref; /* the reference counter */ size_t length; void *data; - void (*free_cb)(void *p); /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */ - pa_memblock_stat *stat; -} pa_memblock; + pa_mempool *pool; -/* Allocate a new memory block of type PA_MEMBLOCK_APPENDED */ -pa_memblock *pa_memblock_new(size_t length, pa_memblock_stat*s); + union { + struct { + void (*free_cb)(void *p); /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */ + } user; + + struct { + uint32_t id; + pa_memimport_segment *segment; + } imported; + } per_type; +}; -/* Allocate a new memory block of type PA_MEMBLOCK_DYNAMIC. The pointer data is to be maintained be the memory block */ -pa_memblock *pa_memblock_new_dynamic(void *data, size_t length, pa_memblock_stat*s); +struct pa_mempool_stat { + unsigned n_allocated; + unsigned n_accumulated; + unsigned n_imported; + unsigned n_exported; + size_t allocated_size; + size_t accumulated_size; + size_t imported_size; + size_t exported_size; -/* Allocate a new memory block of type PA_MEMBLOCK_FIXED */ -pa_memblock *pa_memblock_new_fixed(void *data, size_t length, int read_only, pa_memblock_stat*s); + unsigned n_too_large_for_pool; + unsigned n_pool_full; +}; + +/* Allocate a new memory block of type PA_MEMBLOCK_MEMPOOL or PA_MEMBLOCK_APPENDED, depending on the size */ +pa_memblock *pa_memblock_new(pa_mempool *, size_t length); + +/* Allocate a new memory block of type PA_MEMBLOCK_MEMPOOL. If the requested size is too large, return NULL */ +pa_memblock *pa_memblock_new_pool(pa_mempool *, size_t length); /* Allocate a new memory block of type PA_MEMBLOCK_USER */ -pa_memblock *pa_memblock_new_user(void *data, size_t length, void (*free_cb)(void *p), int read_only, pa_memblock_stat*s); +pa_memblock *pa_memblock_new_user(pa_mempool *, void *data, size_t length, void (*free_cb)(void *p), int read_only); + +/* A special case of pa_memblock_new_user: take a memory buffer previously allocated with pa_xmalloc() */ +#define pa_memblock_new_malloced(p,data,length) pa_memblock_new_user(p, data, length, pa_xfree, 0) + +/* Allocate a new memory block of type PA_MEMBLOCK_FIXED */ +pa_memblock *pa_memblock_new_fixed(pa_mempool *, void *data, size_t length, int read_only); void pa_memblock_unref(pa_memblock*b); pa_memblock* pa_memblock_ref(pa_memblock*b); @@ -79,8 +110,23 @@ references to the memory. This causes the memory to be copied and converted into a PA_MEMBLOCK_DYNAMIC type memory block */ void pa_memblock_unref_fixed(pa_memblock*b); -pa_memblock_stat* pa_memblock_stat_new(void); -void pa_memblock_stat_unref(pa_memblock_stat *s); -pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s); +/* The memory block manager */ +pa_mempool* pa_mempool_new(int shared); +void pa_mempool_free(pa_mempool *p); +const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p); +void pa_mempool_vacuum(pa_mempool *p); +int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id); + +/* For recieving blocks from other nodes */ +pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata); +void pa_memimport_free(pa_memimport *i); +pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size); +int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id); + +/* For sending blocks to other nodes */ +pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata); +void pa_memexport_free(pa_memexport *e); +int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size); +int pa_memexport_process_release(pa_memexport *e, uint32_t id); #endif diff --git a/src/pulsecore/memblockq.c b/src/pulsecore/memblockq.c index 822bd66cd..2fd38850e 100644 --- a/src/pulsecore/memblockq.c +++ b/src/pulsecore/memblockq.c @@ -49,7 +49,6 @@ struct pa_memblockq { size_t maxlength, tlength, base, prebuf, minreq; int64_t read_index, write_index; enum { PREBUF, RUNNING } state; - pa_memblock_stat *memblock_stat; pa_memblock *silence; pa_mcalign *mcalign; }; @@ -61,8 +60,7 @@ pa_memblockq* pa_memblockq_new( size_t base, size_t prebuf, size_t minreq, - pa_memblock *silence, - pa_memblock_stat *s) { + pa_memblock *silence) { pa_memblockq* bq; @@ -75,7 +73,6 @@ pa_memblockq* pa_memblockq_new( bq->base = base; bq->read_index = bq->write_index = idx; - bq->memblock_stat = s; pa_log_debug(__FILE__": memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu", (unsigned long)maxlength, (unsigned long)tlength, (unsigned long)base, (unsigned long)prebuf, (unsigned long)minreq); @@ -586,7 +583,7 @@ int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) { return pa_memblockq_push(bq, chunk); if (!bq->mcalign) - bq->mcalign = pa_mcalign_new(bq->base, bq->memblock_stat); + bq->mcalign = pa_mcalign_new(bq->base); if (!can_push(bq, pa_mcalign_csize(bq->mcalign, chunk->length))) return -1; diff --git a/src/pulsecore/memblockq.h b/src/pulsecore/memblockq.h index c35b62ddd..4d701a80e 100644 --- a/src/pulsecore/memblockq.h +++ b/src/pulsecore/memblockq.h @@ -69,8 +69,7 @@ pa_memblockq* pa_memblockq_new( size_t base, size_t prebuf, size_t minreq, - pa_memblock *silence, - pa_memblock_stat *s); + pa_memblock *silence); void pa_memblockq_free(pa_memblockq*bq); diff --git a/src/pulsecore/memchunk.c b/src/pulsecore/memchunk.c index abfc2cab7..bcf0ce043 100644 --- a/src/pulsecore/memchunk.c +++ b/src/pulsecore/memchunk.c @@ -32,10 +32,13 @@ #include "memchunk.h" -void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min) { +void pa_memchunk_make_writable(pa_memchunk *c, size_t min) { pa_memblock *n; size_t l; - assert(c && c->memblock && c->memblock->ref >= 1); + + assert(c); + assert(c->memblock); + assert(c->memblock->ref >= 1); if (c->memblock->ref == 1 && !c->memblock->read_only && c->memblock->length >= c->index+min) return; @@ -44,7 +47,7 @@ void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min) if (l < min) l = min; - n = pa_memblock_new(l, s); + n = pa_memblock_new(c->memblock->pool, l); memcpy(n->data, (uint8_t*) c->memblock->data + c->index, c->length); pa_memblock_unref(c->memblock); c->memblock = n; diff --git a/src/pulsecore/memchunk.h b/src/pulsecore/memchunk.h index 1b26c2e64..b8ce62495 100644 --- a/src/pulsecore/memchunk.h +++ b/src/pulsecore/memchunk.h @@ -36,7 +36,7 @@ typedef struct pa_memchunk { /* Make a memchunk writable, i.e. make sure that the caller may have * exclusive access to the memblock and it is not read_only. If needed * the memblock in the structure is replaced by a copy. */ -void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min); +void pa_memchunk_make_writable(pa_memchunk *c, size_t min); /* Invalidate a memchunk. This does not free the cotaining memblock, * but sets all members to zero. */ diff --git a/src/pulsecore/protocol-esound.c b/src/pulsecore/protocol-esound.c index f1a827bce..2fadeca30 100644 --- a/src/pulsecore/protocol-esound.c +++ b/src/pulsecore/protocol-esound.c @@ -377,8 +377,7 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t pa_frame_size(&ss), (size_t) -1, l/PLAYBACK_BUFFER_FRAGMENTS, - NULL, - c->protocol->core->memblock_stat); + NULL); pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2); c->playback.fragment_size = l/10; @@ -469,8 +468,7 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co pa_frame_size(&ss), 1, 0, - NULL, - c->protocol->core->memblock_stat); + NULL); pa_iochannel_socket_set_sndbuf(c->io, l/RECORD_BUFFER_FRAGMENTS*2); c->source_output->push = source_output_push_cb; @@ -722,7 +720,7 @@ static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_ CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name."); assert(!c->scache.memchunk.memblock); - c->scache.memchunk.memblock = pa_memblock_new(sc_length, c->protocol->core->memblock_stat); + c->scache.memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, sc_length); c->scache.memchunk.index = 0; c->scache.memchunk.length = sc_length; c->scache.sample_spec = ss; @@ -941,7 +939,7 @@ static int do_read(struct connection *c) { } if (!c->playback.current_memblock) { - c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat); + c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); c->playback.memblock_index = 0; } diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c index 0b79892c5..2c9b3566c 100644 --- a/src/pulsecore/protocol-native.c +++ b/src/pulsecore/protocol-native.c @@ -348,8 +348,7 @@ static struct record_stream* record_stream_new( base = pa_frame_size(ss), 1, 0, - NULL, - c->protocol->core->memblock_stat); + NULL); assert(s->memblockq); s->fragment_size = (fragment_size/base)*base; @@ -448,7 +447,7 @@ static struct playback_stream* playback_stream_new( start_index = 0; } - silence = pa_silence_memblock_new(ss, 0, c->protocol->core->memblock_stat); + silence = pa_silence_memblock_new(c->protocol->core->mempool, ss, 0); s->memblockq = pa_memblockq_new( start_index, @@ -457,8 +456,7 @@ static struct playback_stream* playback_stream_new( pa_frame_size(ss), prebuf, minreq, - silence, - c->protocol->core->memblock_stat); + silence); pa_memblock_unref(silence); @@ -1076,6 +1074,7 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { struct connection *c = userdata; pa_tagstruct *reply; + const pa_mempool_stat *stat; assert(c && t); if (!pa_tagstruct_eof(t)) { @@ -1085,11 +1084,13 @@ static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS); + stat = pa_mempool_get_stat(c->protocol->core->mempool); + reply = reply_new(tag); - pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total); - pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total_size); - pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated); - pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated_size); + pa_tagstruct_putu32(reply, stat->n_allocated); + pa_tagstruct_putu32(reply, stat->allocated_size); + pa_tagstruct_putu32(reply, stat->n_accumulated); + pa_tagstruct_putu32(reply, stat->accumulated_size); pa_tagstruct_putu32(reply, pa_scache_total_size(c->protocol->core)); pa_pstream_send_tagstruct(c->pstream, reply); } @@ -2256,7 +2257,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o pa_memblock_ref(u->memchunk.memblock); u->length = 0; } else { - u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat); + u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length); u->memchunk.index = u->memchunk.length = 0; } } @@ -2349,9 +2350,11 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo c->client->userdata = c; c->client->owner = p->module; - c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->memblock_stat); + c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool); assert(c->pstream); + pa_pstream_use_shm(c->pstream, 1); + pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c); pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c); pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c); diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 3705986d5..924ee29ec 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -128,7 +128,7 @@ static int do_read(struct connection *c) { } if (!c->playback.current_memblock) { - c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat); + c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); c->playback.memblock_index = 0; } @@ -369,8 +369,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_frame_size(&p->sample_spec), (size_t) -1, l/PLAYBACK_BUFFER_FRAGMENTS, - NULL, - p->core->memblock_stat); + NULL); assert(c->input_memblockq); pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); c->playback.fragment_size = l/10; @@ -406,8 +405,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_frame_size(&p->sample_spec), 1, 0, - NULL, - p->core->memblock_stat); + NULL); pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); pa_source_notify(c->source_output->source); } diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 7096d65af..421f5de9c 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -49,28 +49,45 @@ #include "pstream.h" +/* We piggyback information if audio data blocks are stored in SHM on the seek mode */ +#define PA_FLAG_SHMDATA 0x80000000LU +#define PA_FLAG_SHMRELEASE 0x40000000LU +#define PA_FLAG_SHMREVOKE 0xC0000000LU +#define PA_FLAG_SHMMASK 0xFF000000LU +#define PA_FLAG_SEEKMASK 0x000000FFLU + +/* The sequence descriptor header consists of 5 32bit integers: */ enum { PA_PSTREAM_DESCRIPTOR_LENGTH, PA_PSTREAM_DESCRIPTOR_CHANNEL, PA_PSTREAM_DESCRIPTOR_OFFSET_HI, PA_PSTREAM_DESCRIPTOR_OFFSET_LO, - PA_PSTREAM_DESCRIPTOR_SEEK, + PA_PSTREAM_DESCRIPTOR_FLAGS, PA_PSTREAM_DESCRIPTOR_MAX }; +/* If we have an SHM block, this info follows the descriptor */ +enum { + PA_PSTREAM_SHM_BLOCKID, + PA_PSTREAM_SHM_SHMID, + PA_PSTREAM_SHM_INDEX, + PA_PSTREAM_SHM_LENGTH, + PA_PSTREAM_SHM_MAX +}; + typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX]; #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t)) #define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */ struct item_info { - enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type; + enum { + PA_PSTREAM_ITEM_PACKET, + PA_PSTREAM_ITEM_MEMBLOCK, + PA_PSTREAM_ITEM_SHMRELEASE, + PA_PSTREAM_ITEM_SHMREVOKE + } type; - /* memblock info */ - pa_memchunk chunk; - uint32_t channel; - int64_t offset; - pa_seek_mode_t seek_mode; /* packet info */ pa_packet *packet; @@ -78,6 +95,15 @@ struct item_info { int with_creds; pa_creds creds; #endif + + /* memblock info */ + pa_memchunk chunk; + uint32_t channel; + int64_t offset; + pa_seek_mode_t seek_mode; + + /* release/revoke info */ + uint32_t block_id; }; struct pa_pstream { @@ -91,20 +117,26 @@ struct pa_pstream { int dead; struct { - struct item_info* current; pa_pstream_descriptor descriptor; + struct item_info* current; + uint32_t shm_info[PA_PSTREAM_SHM_MAX]; void *data; size_t index; } write; struct { + pa_pstream_descriptor descriptor; pa_memblock *memblock; pa_packet *packet; - pa_pstream_descriptor descriptor; + uint32_t shm_info[PA_PSTREAM_SHM_MAX]; void *data; size_t index; } read; + int use_shm; + pa_memimport *import; + pa_memexport *export; + pa_pstream_packet_cb_t recieve_packet_callback; void *recieve_packet_callback_userdata; @@ -117,7 +149,7 @@ struct pa_pstream { pa_pstream_notify_cb_t die_callback; void *die_callback_userdata; - pa_memblock_stat *memblock_stat; + pa_mempool *mempool; #ifdef HAVE_CREDS pa_creds read_creds, write_creds; @@ -178,16 +210,19 @@ static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) do_something(p); } -pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s) { +static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata); + +pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) { pa_pstream *p; + + assert(m); assert(io); + assert(pool); p = pa_xnew(pa_pstream, 1); - p->ref = 1; p->io = io; pa_iochannel_set_callback(io, io_callback, p); - p->dead = 0; p->mainloop = m; @@ -199,24 +234,24 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_sta p->write.current = NULL; p->write.index = 0; - p->read.memblock = NULL; p->read.packet = NULL; p->read.index = 0; p->recieve_packet_callback = NULL; p->recieve_packet_callback_userdata = NULL; - p->recieve_memblock_callback = NULL; p->recieve_memblock_callback_userdata = NULL; - p->drain_callback = NULL; p->drain_callback_userdata = NULL; - p->die_callback = NULL; p->die_callback_userdata = NULL; - p->memblock_stat = s; + p->mempool = pool; + + p->use_shm = 0; + p->export = NULL; + p->import = NULL; pa_iochannel_socket_set_rcvbuf(io, 1024*8); pa_iochannel_socket_set_sndbuf(io, 1024*8); @@ -235,8 +270,7 @@ static void item_free(void *item, PA_GCC_UNUSED void *p) { if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) { assert(i->chunk.memblock); pa_memblock_unref(i->chunk.memblock); - } else { - assert(i->type == PA_PSTREAM_ITEM_PACKET); + } else if (i->type == PA_PSTREAM_ITEM_PACKET) { assert(i->packet); pa_packet_unref(i->packet); } @@ -265,16 +299,18 @@ static void pstream_free(pa_pstream *p) { void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) { struct item_info *i; - assert(p && packet && p->ref >= 1); + + assert(p); + assert(p->ref >= 1); + assert(packet); if (p->dead) return; -/* pa_log(__FILE__": push-packet %p", packet); */ - i = pa_xnew(struct item_info, 1); i->type = PA_PSTREAM_ITEM_PACKET; i->packet = pa_packet_ref(packet); + #ifdef HAVE_CREDS if ((i->with_creds = !!creds)) i->creds = *creds; @@ -286,13 +322,15 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) { struct item_info *i; - assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1); + + assert(p); + assert(p->ref >= 1); + assert(channel != (uint32_t) -1); + assert(chunk); if (p->dead) return; - -/* pa_log(__FILE__": push-memblock %p", chunk); */ - + i = pa_xnew(struct item_info, 1); i->type = PA_PSTREAM_ITEM_MEMBLOCK; i->chunk = *chunk; @@ -309,6 +347,52 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa p->mainloop->defer_enable(p->defer_event, 1); } +static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) { + struct item_info *item; + pa_pstream *p = userdata; + + assert(p); + assert(p->ref >= 1); + + if (p->dead) + return; + +/* pa_log(__FILE__": Releasing block %u", block_id); */ + + item = pa_xnew(struct item_info, 1); + item->type = PA_PSTREAM_ITEM_SHMRELEASE; + item->block_id = block_id; +#ifdef HAVE_CREDS + item->with_creds = 0; +#endif + + pa_queue_push(p->send_queue, item); + p->mainloop->defer_enable(p->defer_event, 1); +} + +static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) { + struct item_info *item; + pa_pstream *p = userdata; + + assert(p); + assert(p->ref >= 1); + + if (p->dead) + return; + +/* pa_log(__FILE__": Revoking block %u", block_id); */ + + item = pa_xnew(struct item_info, 1); + item->type = PA_PSTREAM_ITEM_SHMREVOKE; + item->block_id = block_id; +#ifdef HAVE_CREDS + item->with_creds = 0; +#endif + + pa_queue_push(p->send_queue, item); + p->mainloop->defer_enable(p->defer_event, 1); +} + static void prepare_next_write_item(pa_pstream *p) { assert(p); @@ -316,27 +400,77 @@ static void prepare_next_write_item(pa_pstream *p) { return; p->write.index = 0; + p->write.data = NULL; + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0; + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1); + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0; + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0; if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) { - /*pa_log(__FILE__": pop-packet %p", p->write.current->packet);*/ assert(p->write.current->packet); p->write.data = p->write.current->packet->data; p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length); - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1); - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0; - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = 0; + } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) { + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE); + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id); + + } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) { + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE); + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id); } else { - assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock); - p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index; - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length); + uint32_t flags; + int send_payload = 1; + + assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK); + assert(p->write.current->chunk.memblock); + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel); p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32)); p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset)); - p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = htonl(p->write.current->seek_mode); + + flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK; + + if (p->use_shm) { + uint32_t block_id, shm_id; + size_t offset, length; + + assert(p->export); + + if (pa_memexport_put(p->export, + p->write.current->chunk.memblock, + &block_id, + &shm_id, + &offset, + &length) >= 0) { + + flags |= PA_FLAG_SHMDATA; + send_payload = 0; + + p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id); + p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id); + p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index)); + p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length); + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info)); + p->write.data = p->write.shm_info; + } +/* else */ +/* pa_log_warn(__FILE__": Failed to export memory block."); */ + } + + if (send_payload) { + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length); + p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index; + } + + p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags); } #ifdef HAVE_CREDS @@ -344,7 +478,6 @@ static void prepare_next_write_item(pa_pstream *p) { p->write_creds = p->write.current->creds; #endif - } static int do_write(pa_pstream *p) { @@ -359,16 +492,18 @@ static int do_write(pa_pstream *p) { if (!p->write.current) return 0; - assert(p->write.data); - if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) { d = (uint8_t*) p->write.descriptor + p->write.index; l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index; } else { + assert(p->write.data); + d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE; l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE); } + assert(l > 0); + #ifdef HAVE_CREDS if (p->send_creds_now) { @@ -384,7 +519,7 @@ static int do_write(pa_pstream *p) { p->write.index += r; - if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) { + if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) { assert(p->write.current); item_free(p->write.current, (void *) 1); p->write.current = NULL; @@ -428,27 +563,87 @@ static int do_read(pa_pstream *p) { p->read.index += r; if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) { + uint32_t flags, length, channel; /* Reading of frame descriptor complete */ - /* Frame size too large */ - if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) { - pa_log_warn(__FILE__": Frame size too large: %lu > %lu", (unsigned long) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), (unsigned long) FRAME_SIZE_MAX); + flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]); + + if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) { + pa_log_warn(__FILE__": Recieved SHM frame on a socket where SHM is disabled."); + return -1; + } + + if (flags == PA_FLAG_SHMRELEASE) { + + /* This is a SHM memblock release frame with no payload */ + +/* pa_log(__FILE__": Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ + + assert(p->export); + pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); + + goto frame_done; + + } else if (flags == PA_FLAG_SHMREVOKE) { + + /* This is a SHM memblock revoke frame with no payload */ + +/* pa_log(__FILE__": Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */ + + assert(p->import); + pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); + + goto frame_done; + } + + length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]); + + if (length > FRAME_SIZE_MAX) { + pa_log_warn(__FILE__": Recieved invalid frame size : %lu", (unsigned long) length); return -1; } assert(!p->read.packet && !p->read.memblock); - if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) { + channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]); + + if (channel == (uint32_t) -1) { + + if (flags != 0) { + pa_log_warn(__FILE__": Received packet frame with invalid flags value."); + return -1; + } + /* Frame is a packet frame */ - p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])); + p->read.packet = pa_packet_new(length); p->read.data = p->read.packet->data; + } else { - /* Frame is a memblock frame */ - p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat); - p->read.data = p->read.memblock->data; - if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]) > PA_SEEK_RELATIVE_END) { - pa_log_warn(__FILE__": Invalid seek mode"); + if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) { + pa_log_warn(__FILE__": Received memblock frame with invalid seek mode."); + return -1; + } + + if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) { + + if (length != sizeof(p->read.shm_info)) { + pa_log_warn(__FILE__": Recieved SHM memblock frame with Invalid frame length."); + return -1; + } + + /* Frame is a memblock frame referencing an SHM memblock */ + p->read.data = p->read.shm_info; + + } else if ((flags & PA_FLAG_SHMMASK) == 0) { + + /* Frame is a memblock frame */ + + p->read.memblock = pa_memblock_new(p->mempool, length); + p->read.data = p->read.memblock->data; + } else { + + pa_log_warn(__FILE__": Recieved memblock frame with invalid flags value."); return -1; } } @@ -456,7 +651,9 @@ static int do_read(pa_pstream *p) { } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) { /* Frame payload available */ - if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */ + if (p->read.memblock && p->recieve_memblock_callback) { + + /* Is this memblock data? Than pass it to the user */ l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r; if (l > 0) { @@ -477,13 +674,13 @@ static int do_read(pa_pstream *p) { p, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), offset, - ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]), + ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, &chunk, p->recieve_memblock_callback_userdata); } /* Drop seek info for following callbacks */ - p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = + p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0; } @@ -491,13 +688,13 @@ static int do_read(pa_pstream *p) { /* Frame complete */ if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) { + if (p->read.memblock) { - assert(!p->read.packet); - + + /* This was a memblock frame. We can unref the memblock now */ pa_memblock_unref(p->read.memblock); - p->read.memblock = NULL; - } else { - assert(p->read.packet); + + } else if (p->read.packet) { if (p->recieve_packet_callback) #ifdef HAVE_CREDS @@ -507,17 +704,63 @@ static int do_read(pa_pstream *p) { #endif pa_packet_unref(p->read.packet); - p->read.packet = NULL; + } else { + pa_memblock *b; + + assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA); + + assert(p->import); + + if (!(b = pa_memimport_get(p->import, + ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]), + ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]), + ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]), + ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) { + + pa_log_warn(__FILE__": Failed to import memory block."); + return -1; + } + + if (p->recieve_memblock_callback) { + int64_t offset; + pa_memchunk chunk; + + chunk.memblock = b; + chunk.index = 0; + chunk.length = b->length; + + offset = (int64_t) ( + (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | + (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO])))); + + p->recieve_memblock_callback( + p, + ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), + offset, + ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK, + &chunk, + p->recieve_memblock_callback_userdata); + } + + pa_memblock_unref(b); } - p->read.index = 0; -#ifdef HAVE_CREDS - p->read_creds_valid = 0; -#endif + goto frame_done; } } - return 0; + return 0; + +frame_done: + p->read.memblock = NULL; + p->read.packet = NULL; + p->read.index = 0; + +#ifdef HAVE_CREDS + p->read_creds_valid = 0; +#endif + + return 0; } void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) { @@ -583,6 +826,16 @@ void pa_pstream_close(pa_pstream *p) { p->dead = 1; + if (p->import) { + pa_memimport_free(p->import); + p->import = NULL; + } + + if (p->export) { + pa_memexport_free(p->export); + p->export = NULL; + } + if (p->io) { pa_iochannel_free(p->io); p->io = NULL; @@ -597,4 +850,19 @@ void pa_pstream_close(pa_pstream *p) { p->drain_callback = NULL; p->recieve_packet_callback = NULL; p->recieve_memblock_callback = NULL; + + +} + +void pa_pstream_use_shm(pa_pstream *p, int enable) { + assert(p); + assert(p->ref >= 1); + + p->use_shm = enable; + + if (!p->import) + p->import = pa_memimport_new(p->mempool, memimport_release_cb, p); + + if (!p->export) + p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p); } diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h index 789e40bc2..26bb7699c 100644 --- a/src/pulsecore/pstream.h +++ b/src/pulsecore/pstream.h @@ -39,7 +39,7 @@ typedef void (*pa_pstream_packet_cb_t)(pa_pstream *p, pa_packet *packet, const p typedef void (*pa_pstream_memblock_cb_t)(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata); typedef void (*pa_pstream_notify_cb_t)(pa_pstream *p, void *userdata); -pa_pstream* pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s); +pa_pstream* pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *p); void pa_pstream_unref(pa_pstream*p); pa_pstream* pa_pstream_ref(pa_pstream*p); @@ -54,6 +54,8 @@ void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void int pa_pstream_is_pending(pa_pstream *p); +void pa_pstream_use_shm(pa_pstream *p, int enable); + void pa_pstream_close(pa_pstream *p); #endif diff --git a/src/pulsecore/resampler.c b/src/pulsecore/resampler.c index 23cdf3811..742267146 100644 --- a/src/pulsecore/resampler.c +++ b/src/pulsecore/resampler.c @@ -42,7 +42,7 @@ struct pa_resampler { pa_sample_spec i_ss, o_ss; pa_channel_map i_cm, o_cm; size_t i_fz, o_fz; - pa_memblock_stat *memblock_stat; + pa_mempool *mempool; void (*impl_free)(pa_resampler *r); void (*impl_update_input_rate)(pa_resampler *r, uint32_t rate); @@ -71,15 +71,16 @@ static int libsamplerate_init(pa_resampler*r); static int trivial_init(pa_resampler*r); pa_resampler* pa_resampler_new( - const pa_sample_spec *a, - const pa_channel_map *am, - const pa_sample_spec *b, - const pa_channel_map *bm, - pa_memblock_stat *s, - pa_resample_method_t resample_method) { + pa_mempool *pool, + const pa_sample_spec *a, + const pa_channel_map *am, + const pa_sample_spec *b, + const pa_channel_map *bm, + pa_resample_method_t resample_method) { pa_resampler *r = NULL; + assert(pool); assert(a); assert(b); assert(pa_sample_spec_valid(a)); @@ -88,7 +89,7 @@ pa_resampler* pa_resampler_new( r = pa_xnew(pa_resampler, 1); r->impl_data = NULL; - r->memblock_stat = s; + r->mempool = pool; r->resample_method = resample_method; r->impl_free = NULL; @@ -450,7 +451,7 @@ static void libsamplerate_run(pa_resampler *r, const pa_memchunk *in, pa_memchun assert(p); /* Take the existing buffer and make it a memblock */ - out->memblock = pa_memblock_new_dynamic(*p, out->length, r->memblock_stat); + out->memblock = pa_memblock_new_malloced(r->mempool, *p, out->length); *p = NULL; } } else { @@ -549,7 +550,7 @@ static void trivial_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out l = ((((n_frames+1) * r->o_ss.rate) / r->i_ss.rate) + 1) * fz; out->index = 0; - out->memblock = pa_memblock_new(l, r->memblock_stat); + out->memblock = pa_memblock_new(r->mempool, l); for (o_index = 0;; o_index++, u->o_counter++) { unsigned j; diff --git a/src/pulsecore/resampler.h b/src/pulsecore/resampler.h index c1199e5ce..327e24a29 100644 --- a/src/pulsecore/resampler.h +++ b/src/pulsecore/resampler.h @@ -43,12 +43,12 @@ typedef enum pa_resample_method { } pa_resample_method_t; pa_resampler* pa_resampler_new( - const pa_sample_spec *a, - const pa_channel_map *am, - const pa_sample_spec *b, - const pa_channel_map *bm, - pa_memblock_stat *s, - pa_resample_method_t resample_method); + pa_mempool *pool, + const pa_sample_spec *a, + const pa_channel_map *am, + const pa_sample_spec *b, + const pa_channel_map *bm, + pa_resample_method_t resample_method); void pa_resampler_free(pa_resampler *r); diff --git a/src/pulsecore/sample-util.c b/src/pulsecore/sample-util.c index 638f80670..7f5d8a020 100644 --- a/src/pulsecore/sample-util.c +++ b/src/pulsecore/sample-util.c @@ -35,13 +35,14 @@ #include "sample-util.h" #include "endianmacros.h" -pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s) { +pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length) { + assert(pool); assert(spec); if (length == 0) - length = pa_bytes_per_second(spec)/10; /* 100 ms */ + length = pa_bytes_per_second(spec)/20; /* 50 ms */ - return pa_silence_memblock(pa_memblock_new(length, s), spec); + return pa_silence_memblock(pa_memblock_new(pool, length), spec); } pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec) { diff --git a/src/pulsecore/sample-util.h b/src/pulsecore/sample-util.h index 3ebb7e2ea..6b7707925 100644 --- a/src/pulsecore/sample-util.h +++ b/src/pulsecore/sample-util.h @@ -28,7 +28,7 @@ #include <pulsecore/memchunk.h> pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec); -pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s); +pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length); void pa_silence_memchunk(pa_memchunk *c, const pa_sample_spec *spec); void pa_silence_memory(void *p, size_t length, const pa_sample_spec *spec); diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index b3fabad3a..b5ba9df19 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -136,9 +136,9 @@ pa_sink_input* pa_sink_input_new( !pa_channel_map_equal(&data->channel_map, &data->sink->channel_map)) if (!(resampler = pa_resampler_new( + core->mempool, &data->sample_spec, &data->channel_map, &data->sink->sample_spec, &data->sink->channel_map, - core->memblock_stat, data->resample_method))) { pa_log_warn(__FILE__": Unsupported resampling operation."); return NULL; @@ -299,7 +299,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) * while until the old sink has drained its playback buffer */ if (!i->silence_memblock) - i->silence_memblock = pa_silence_memblock_new(&i->sink->sample_spec, SILENCE_BUFFER_LENGTH, i->sink->core->memblock_stat); + i->silence_memblock = pa_silence_memblock_new(i->sink->core->mempool, &i->sink->sample_spec, SILENCE_BUFFER_LENGTH); chunk->memblock = pa_memblock_ref(i->silence_memblock); chunk->index = 0; @@ -338,7 +338,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) /* It might be necessary to adjust the volume here */ if (do_volume_adj_here && !volume_is_norm) { - pa_memchunk_make_writable(&tchunk, i->sink->core->memblock_stat, 0); + pa_memchunk_make_writable(&tchunk, 0); pa_volume_memchunk(&tchunk, &i->sample_spec, &i->volume); } @@ -540,9 +540,9 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { /* Okey, we need a new resampler for the new sink */ if (!(new_resampler = pa_resampler_new( + dest->core->mempool, &i->sample_spec, &i->channel_map, &dest->sample_spec, &dest->channel_map, - dest->core->memblock_stat, i->resample_method))) { pa_log_warn(__FILE__": Unsupported resampling operation."); return -1; @@ -553,7 +553,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { pa_usec_t old_latency, new_latency; pa_usec_t silence_usec = 0; - buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL, NULL); + buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL); /* Let's do a little bit of Voodoo for compensating latency * differences */ @@ -599,7 +599,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) { chunk.length = n; if (!volume_is_norm) { - pa_memchunk_make_writable(&chunk, origin->core->memblock_stat, 0); + pa_memchunk_make_writable(&chunk, 0); pa_volume_memchunk(&chunk, &origin->sample_spec, &volume); } diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 557d5efcb..aacb89fd5 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -298,14 +298,14 @@ int pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) { pa_sw_cvolume_multiply(&volume, &s->sw_volume, &info[0].volume); if (s->sw_muted || !pa_cvolume_is_norm(&volume)) { - pa_memchunk_make_writable(result, s->core->memblock_stat, 0); + pa_memchunk_make_writable(result, 0); if (s->sw_muted) pa_silence_memchunk(result, &s->sample_spec); else pa_volume_memchunk(result, &s->sample_spec, &volume); } } else { - result->memblock = pa_memblock_new(length, s->core->memblock_stat); + result->memblock = pa_memblock_new(s->core->mempool, length); assert(result->memblock); /* pa_log("mixing %i", n); */ @@ -429,7 +429,7 @@ void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result) { /*** This needs optimization ***/ - result->memblock = pa_memblock_new(result->length = length, s->core->memblock_stat); + result->memblock = pa_memblock_new(s->core->mempool, result->length = length); result->index = 0; pa_sink_render_into_full(s, result); diff --git a/src/pulsecore/sound-file-stream.c b/src/pulsecore/sound-file-stream.c index 6063b93e3..6782f50eb 100644 --- a/src/pulsecore/sound-file-stream.c +++ b/src/pulsecore/sound-file-stream.c @@ -75,7 +75,7 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) { uint32_t fs = pa_frame_size(&i->sample_spec); sf_count_t n; - u->memchunk.memblock = pa_memblock_new(BUF_SIZE, i->sink->core->memblock_stat); + u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE); u->memchunk.index = 0; if (u->readf_function) { diff --git a/src/pulsecore/sound-file.c b/src/pulsecore/sound-file.c index d11d4b9d9..256cce434 100644 --- a/src/pulsecore/sound-file.c +++ b/src/pulsecore/sound-file.c @@ -34,7 +34,7 @@ #include "sound-file.h" #include "core-scache.h" -int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk, pa_memblock_stat *s) { +int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk) { SNDFILE*sf = NULL; SF_INFO sfinfo; int ret = -1; @@ -92,7 +92,7 @@ int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *ma goto finish; } - chunk->memblock = pa_memblock_new(l, s); + chunk->memblock = pa_memblock_new(pool, l); assert(chunk->memblock); chunk->index = 0; chunk->length = l; diff --git a/src/pulsecore/sound-file.h b/src/pulsecore/sound-file.h index 0b81d97e1..7e3c82eab 100644 --- a/src/pulsecore/sound-file.h +++ b/src/pulsecore/sound-file.h @@ -26,7 +26,7 @@ #include <pulse/channelmap.h> #include <pulsecore/memchunk.h> -int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk, pa_memblock_stat *s); +int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk); int pa_sound_file_too_big_to_cache(const char *fname); diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c index 7371474f1..f9d66f6d1 100644 --- a/src/pulsecore/source-output.c +++ b/src/pulsecore/source-output.c @@ -115,9 +115,9 @@ pa_source_output* pa_source_output_new( if (!pa_sample_spec_equal(&data->sample_spec, &data->source->sample_spec) || !pa_channel_map_equal(&data->channel_map, &data->source->channel_map)) if (!(resampler = pa_resampler_new( + core->mempool, &data->source->sample_spec, &data->source->channel_map, &data->sample_spec, &data->channel_map, - core->memblock_stat, data->resample_method))) { pa_log_warn(__FILE__": Unsupported resampling operation."); return NULL; @@ -330,9 +330,9 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) { /* Okey, we need a new resampler for the new sink */ if (!(new_resampler = pa_resampler_new( + dest->core->mempool, &dest->sample_spec, &dest->channel_map, &o->sample_spec, &o->channel_map, - dest->core->memblock_stat, o->resample_method))) { pa_log_warn(__FILE__": Unsupported resampling operation."); return -1; diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index 0d55da448..cb5b10302 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -211,7 +211,7 @@ void pa_source_post(pa_source*s, const pa_memchunk *chunk) { pa_memchunk vchunk = *chunk; pa_memblock_ref(vchunk.memblock); - pa_memchunk_make_writable(&vchunk, s->core->memblock_stat, 0); + pa_memchunk_make_writable(&vchunk, 0); if (s->sw_muted) pa_silence_memchunk(&vchunk, &s->sample_spec); else |