summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/daemon/main.c2
-rw-r--r--src/modules/module-alsa-sink.c2
-rw-r--r--src/modules/module-alsa-source.c2
-rw-r--r--src/modules/module-combine.c3
-rw-r--r--src/modules/module-jack-source.c2
-rw-r--r--src/modules/module-oss-mmap.c6
-rw-r--r--src/modules/module-oss.c4
-rw-r--r--src/modules/module-pipe-source.c2
-rw-r--r--src/modules/module-sine.c2
-rw-r--r--src/modules/module-tunnel.c2
-rw-r--r--src/modules/rtp/module-rtp-recv.c11
-rw-r--r--src/modules/rtp/module-rtp-send.c3
-rw-r--r--src/modules/rtp/rtp.c4
-rw-r--r--src/modules/rtp/rtp.h2
-rw-r--r--src/pulse/context.c8
-rw-r--r--src/pulse/internal.h2
-rw-r--r--src/pulse/stream.c7
-rw-r--r--src/pulsecore/cli-command.c40
-rw-r--r--src/pulsecore/core-scache.c4
-rw-r--r--src/pulsecore/core.c6
-rw-r--r--src/pulsecore/core.h4
-rw-r--r--src/pulsecore/mcalign.c6
-rw-r--r--src/pulsecore/mcalign.h2
-rw-r--r--src/pulsecore/memblock.c737
-rw-r--r--src/pulsecore/memblock.h108
-rw-r--r--src/pulsecore/memblockq.c7
-rw-r--r--src/pulsecore/memblockq.h3
-rw-r--r--src/pulsecore/memchunk.c9
-rw-r--r--src/pulsecore/memchunk.h2
-rw-r--r--src/pulsecore/protocol-esound.c10
-rw-r--r--src/pulsecore/protocol-native.c25
-rw-r--r--src/pulsecore/protocol-simple.c8
-rw-r--r--src/pulsecore/pstream.c396
-rw-r--r--src/pulsecore/pstream.h4
-rw-r--r--src/pulsecore/resampler.c21
-rw-r--r--src/pulsecore/resampler.h12
-rw-r--r--src/pulsecore/sample-util.c7
-rw-r--r--src/pulsecore/sample-util.h2
-rw-r--r--src/pulsecore/sink-input.c12
-rw-r--r--src/pulsecore/sink.c6
-rw-r--r--src/pulsecore/sound-file-stream.c2
-rw-r--r--src/pulsecore/sound-file.c4
-rw-r--r--src/pulsecore/sound-file.h2
-rw-r--r--src/pulsecore/source-output.c4
-rw-r--r--src/pulsecore/source.c2
45 files changed, 1216 insertions, 293 deletions
diff --git a/src/daemon/main.c b/src/daemon/main.c
index 38d465f80..aada0ad79 100644
--- a/src/daemon/main.c
+++ b/src/daemon/main.c
@@ -559,7 +559,7 @@ int main(int argc, char *argv[]) {
mainloop = pa_mainloop_new();
assert(mainloop);
- c = pa_core_new(pa_mainloop_get_api(mainloop));
+ c = pa_core_new(pa_mainloop_get_api(mainloop), 1);
assert(c);
c->is_system_instance = !!conf->system_instance;
diff --git a/src/modules/module-alsa-sink.c b/src/modules/module-alsa-sink.c
index 8da3d2360..0cebd50f1 100644
--- a/src/modules/module-alsa-sink.c
+++ b/src/modules/module-alsa-sink.c
@@ -492,7 +492,7 @@ int pa__init(pa_core *c, pa_module*m) {
pa_log_info(__FILE__": using %u fragments of size %lu bytes.", periods, (long unsigned)u->fragment_size);
- u->silence.memblock = pa_memblock_new(u->silence.length = u->fragment_size, c->memblock_stat);
+ u->silence.memblock = pa_memblock_new(c->mempool, u->silence.length = u->fragment_size);
assert(u->silence.memblock);
pa_silence_memblock(u->silence.memblock, &ss);
u->silence.index = 0;
diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c
index 4a8678c9c..c3979df17 100644
--- a/src/modules/module-alsa-source.c
+++ b/src/modules/module-alsa-source.c
@@ -151,7 +151,7 @@ static void do_read(struct userdata *u) {
size_t l;
if (!u->memchunk.memblock) {
- u->memchunk.memblock = pa_memblock_new(u->memchunk.length = u->fragment_size, u->source->core->memblock_stat);
+ u->memchunk.memblock = pa_memblock_new(u->source->core->mempool, u->memchunk.length = u->fragment_size);
u->memchunk.index = 0;
}
diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c
index 008fe6e74..5243975b5 100644
--- a/src/modules/module-combine.c
+++ b/src/modules/module-combine.c
@@ -235,8 +235,7 @@ static struct output *output_new(struct userdata *u, pa_sink *sink, int resample
pa_frame_size(&u->sink->sample_spec),
1,
0,
- NULL,
- sink->core->memblock_stat);
+ NULL);
snprintf(t, sizeof(t), "%s: output #%u", u->sink->name, u->n_outputs+1);
diff --git a/src/modules/module-jack-source.c b/src/modules/module-jack-source.c
index 583f3b8e9..8e659198d 100644
--- a/src/modules/module-jack-source.c
+++ b/src/modules/module-jack-source.c
@@ -137,7 +137,7 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_
fs = pa_frame_size(&u->source->sample_spec);
- chunk.memblock = pa_memblock_new(chunk.length = u->frames_posted * fs, u->core->memblock_stat);
+ chunk.memblock = pa_memblock_new(u->core->mempool, chunk.length = u->frames_posted * fs);
chunk.index = 0;
for (frame_idx = 0; frame_idx < u->frames_posted; frame_idx ++) {
diff --git a/src/modules/module-oss-mmap.c b/src/modules/module-oss-mmap.c
index c783a2f1f..75ab9a9e9 100644
--- a/src/modules/module-oss-mmap.c
+++ b/src/modules/module-oss-mmap.c
@@ -162,10 +162,10 @@ static void out_fill_memblocks(struct userdata *u, unsigned n) {
chunk.memblock = u->out_memblocks[u->out_current] =
pa_memblock_new_fixed(
+ u->core->mempool,
(uint8_t*) u->out_mmap+u->out_fragment_size*u->out_current,
u->out_fragment_size,
- 1,
- u->core->memblock_stat);
+ 1);
assert(chunk.memblock);
chunk.length = chunk.memblock->length;
chunk.index = 0;
@@ -210,7 +210,7 @@ static void in_post_memblocks(struct userdata *u, unsigned n) {
pa_memchunk chunk;
if (!u->in_memblocks[u->in_current]) {
- chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed((uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1, u->core->memblock_stat);
+ chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed(u->core->mempool, (uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1);
chunk.length = chunk.memblock->length;
chunk.index = 0;
diff --git a/src/modules/module-oss.c b/src/modules/module-oss.c
index ce11ee025..b9b80e725 100644
--- a/src/modules/module-oss.c
+++ b/src/modules/module-oss.c
@@ -217,7 +217,7 @@ static void do_read(struct userdata *u) {
}
do {
- memchunk.memblock = pa_memblock_new(l, u->core->memblock_stat);
+ memchunk.memblock = pa_memblock_new(u->core->mempool, l);
assert(memchunk.memblock);
if ((r = pa_iochannel_read(u->io, memchunk.memblock->data, memchunk.memblock->length)) < 0) {
pa_memblock_unref(memchunk.memblock);
@@ -503,7 +503,7 @@ int pa__init(pa_core *c, pa_module*m) {
u->out_fragment_size = out_frag_size;
u->in_fragment_size = in_frag_size;
- u->silence.memblock = pa_memblock_new(u->silence.length = u->out_fragment_size, u->core->memblock_stat);
+ u->silence.memblock = pa_memblock_new(u->core->mempool, u->silence.length = u->out_fragment_size);
assert(u->silence.memblock);
pa_silence_memblock(u->silence.memblock, &ss);
u->silence.index = 0;
diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c
index 5caa60a3e..43a8dab55 100644
--- a/src/modules/module-pipe-source.c
+++ b/src/modules/module-pipe-source.c
@@ -91,7 +91,7 @@ static void do_read(struct userdata *u) {
pa_module_set_used(u->module, pa_idxset_size(u->source->outputs));
if (!u->chunk.memblock) {
- u->chunk.memblock = pa_memblock_new(1024, u->core->memblock_stat);
+ u->chunk.memblock = pa_memblock_new(u->core->mempool, PIPE_BUF);
u->chunk.index = chunk.length = 0;
}
diff --git a/src/modules/module-sine.c b/src/modules/module-sine.c
index 5ceddce08..89c3c6093 100644
--- a/src/modules/module-sine.c
+++ b/src/modules/module-sine.c
@@ -139,7 +139,7 @@ int pa__init(pa_core *c, pa_module*m) {
goto fail;
}
- u->memblock = pa_memblock_new(pa_bytes_per_second(&ss), c->memblock_stat);
+ u->memblock = pa_memblock_new(c->mempool, pa_bytes_per_second(&ss));
calc_sine(u->memblock->data, u->memblock->length, frequency);
snprintf(t, sizeof(t), "Sine Generator at %u Hz", frequency);
diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c
index 9bb11c094..53bffd3b1 100644
--- a/src/modules/module-tunnel.c
+++ b/src/modules/module-tunnel.c
@@ -651,7 +651,7 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
return;
}
- u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->memblock_stat);
+ u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c
index df6f8c118..5d3f3e279 100644
--- a/src/modules/rtp/module-rtp-recv.c
+++ b/src/modules/rtp/module-rtp-recv.c
@@ -150,7 +150,7 @@ static void rtp_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event
assert(fd == s->rtp_context.fd);
assert(flags == PA_IO_EVENT_INPUT);
- if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->memblock_stat) < 0)
+ if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->mempool) < 0)
return;
if (s->sdp_info.payload != s->rtp_context.payload) {
@@ -312,10 +312,10 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
s->sink_input->kill = sink_input_kill;
s->sink_input->get_latency = sink_input_get_latency;
- silence = pa_silence_memblock_new(&s->sink_input->sample_spec,
+ silence = pa_silence_memblock_new(s->userdata->core->mempool,
+ &s->sink_input->sample_spec,
(pa_bytes_per_second(&s->sink_input->sample_spec)/128/pa_frame_size(&s->sink_input->sample_spec))*
- pa_frame_size(&s->sink_input->sample_spec),
- s->userdata->core->memblock_stat);
+ pa_frame_size(&s->sink_input->sample_spec));
s->memblockq = pa_memblockq_new(
0,
@@ -324,8 +324,7 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
pa_frame_size(&s->sink_input->sample_spec),
pa_bytes_per_second(&s->sink_input->sample_spec)/10+1,
0,
- silence,
- u->core->memblock_stat);
+ silence);
pa_memblock_unref(silence);
diff --git a/src/modules/rtp/module-rtp-send.c b/src/modules/rtp/module-rtp-send.c
index 759aa8197..1b85c840b 100644
--- a/src/modules/rtp/module-rtp-send.c
+++ b/src/modules/rtp/module-rtp-send.c
@@ -297,8 +297,7 @@ int pa__init(pa_core *c, pa_module*m) {
pa_frame_size(&ss),
1,
0,
- NULL,
- c->memblock_stat);
+ NULL);
u->mtu = mtu;
diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c
index ee037d421..8e77c60a6 100644
--- a/src/modules/rtp/rtp.c
+++ b/src/modules/rtp/rtp.c
@@ -149,7 +149,7 @@ pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame
return c;
}
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st) {
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
int size;
struct msghdr m;
struct iovec iov;
@@ -170,7 +170,7 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st) {
if (!size)
return 0;
- chunk->memblock = pa_memblock_new(size, st);
+ chunk->memblock = pa_memblock_new(pool, size);
iov.iov_base = chunk->memblock->data;
iov.iov_len = size;
diff --git a/src/modules/rtp/rtp.h b/src/modules/rtp/rtp.h
index 35fbbd357..123602b23 100644
--- a/src/modules/rtp/rtp.h
+++ b/src/modules/rtp/rtp.h
@@ -41,7 +41,7 @@ pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssr
int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q);
pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size);
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st);
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool);
void pa_rtp_context_destroy(pa_rtp_context *c);
diff --git a/src/pulse/context.c b/src/pulse/context.c
index 34f517f0b..b35305424 100644
--- a/src/pulse/context.c
+++ b/src/pulse/context.c
@@ -128,7 +128,7 @@ pa_context *pa_context_new(pa_mainloop_api *mainloop, const char *name) {
c->subscribe_callback = NULL;
c->subscribe_userdata = NULL;
- c->memblock_stat = pa_memblock_stat_new();
+ c->mempool = pa_mempool_new(1);
c->local = -1;
c->server_list = NULL;
c->server = NULL;
@@ -177,7 +177,7 @@ static void context_free(pa_context *c) {
if (c->playback_streams)
pa_dynarray_free(c->playback_streams, NULL, NULL);
- pa_memblock_stat_unref(c->memblock_stat);
+ pa_mempool_free(c->mempool);
if (c->conf)
pa_client_conf_free(c->conf);
@@ -407,7 +407,9 @@ static void setup_context(pa_context *c, pa_iochannel *io) {
pa_context_ref(c);
assert(!c->pstream);
- c->pstream = pa_pstream_new(c->mainloop, io, c->memblock_stat);
+ c->pstream = pa_pstream_new(c->mainloop, io, c->mempool);
+
+ pa_pstream_use_shm(c->pstream, 1);
pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
diff --git a/src/pulse/internal.h b/src/pulse/internal.h
index 96028d832..afcfaeff5 100644
--- a/src/pulse/internal.h
+++ b/src/pulse/internal.h
@@ -69,7 +69,7 @@ struct pa_context {
pa_context_subscribe_cb_t subscribe_callback;
void *subscribe_userdata;
- pa_memblock_stat *memblock_stat;
+ pa_mempool *mempool;
int local;
int do_autospawn;
diff --git a/src/pulse/stream.c b/src/pulse/stream.c
index 677df009d..180cd096d 100644
--- a/src/pulse/stream.c
+++ b/src/pulse/stream.c
@@ -437,8 +437,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
pa_frame_size(&s->sample_spec),
1,
0,
- NULL,
- s->context->memblock_stat);
+ NULL);
}
s->channel_valid = 1;
@@ -604,9 +603,9 @@ int pa_stream_write(
return 0;
if (free_cb)
- chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat);
+ chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) data, length, free_cb, 1);
else {
- chunk.memblock = pa_memblock_new(length, s->context->memblock_stat);
+ chunk.memblock = pa_memblock_new(s->context->mempool, length);
memcpy(chunk.memblock->data, data, length);
}
diff --git a/src/pulsecore/cli-command.c b/src/pulsecore/cli-command.c
index f74258d3c..811b96d2a 100644
--- a/src/pulsecore/cli-command.c
+++ b/src/pulsecore/cli-command.c
@@ -100,6 +100,7 @@ static int pa_cli_command_dump(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int
static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
static int pa_cli_command_move_source_output(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
+static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
/* A method table for all available commands */
@@ -144,6 +145,7 @@ static const struct command commands[] = {
{ "list-props", pa_cli_command_list_props, NULL, 1},
{ "move-sink-input", pa_cli_command_move_sink_input, "Move sink input to another sink (args: index, sink)", 3},
{ "move-source-output", pa_cli_command_move_source_output, "Move source output to another source (args: index, source)", 3},
+ { "vacuum", pa_cli_command_vacuum, NULL, 1},
{ NULL, NULL, NULL, 0 }
};
@@ -239,23 +241,32 @@ static int pa_cli_command_source_outputs(pa_core *c, pa_tokenizer *t, pa_strbuf
static int pa_cli_command_stat(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_GCC_UNUSED int *fail) {
char s[256];
+ const pa_mempool_stat *stat;
assert(c && t);
- pa_bytes_snprint(s, sizeof(s), c->memblock_stat->total_size);
+ stat = pa_mempool_get_stat(c->mempool);
+
pa_strbuf_printf(buf, "Memory blocks currently allocated: %u, size: %s.\n",
- c->memblock_stat->total,
- s);
+ stat->n_allocated,
+ pa_bytes_snprint(s, sizeof(s), stat->allocated_size));
- pa_bytes_snprint(s, sizeof(s), c->memblock_stat->allocated_size);
pa_strbuf_printf(buf, "Memory blocks allocated during the whole lifetime: %u, size: %s.\n",
- c->memblock_stat->allocated,
- s);
+ stat->n_accumulated,
+ pa_bytes_snprint(s, sizeof(s), stat->accumulated_size));
+
+ pa_strbuf_printf(buf, "Memory blocks imported from other processes: %u, size: %s.\n",
+ stat->n_imported,
+ pa_bytes_snprint(s, sizeof(s), stat->imported_size));
- pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c));
- pa_strbuf_printf(buf, "Total sample cache size: %s.\n", s);
+ pa_strbuf_printf(buf, "Memory blocks exported to other processes: %u, size: %s.\n",
+ stat->n_exported,
+ pa_bytes_snprint(s, sizeof(s), stat->exported_size));
- pa_sample_spec_snprint(s, sizeof(s), &c->default_sample_spec);
- pa_strbuf_printf(buf, "Default sample spec: %s\n", s);
+ pa_strbuf_printf(buf, "Total sample cache size: %s.\n",
+ pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c)));
+
+ pa_strbuf_printf(buf, "Default sample spec: %s\n",
+ pa_sample_spec_snprint(s, sizeof(s), &c->default_sample_spec));
pa_strbuf_printf(buf, "Default sink name: %s\n"
"Default source name: %s\n",
@@ -731,6 +742,15 @@ static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf
return 0;
}
+static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
+ assert(c);
+ assert(t);
+
+ pa_mempool_vacuum(c->mempool);
+
+ return 0;
+}
+
static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
const char *n, *k;
pa_sink_input *si;
diff --git a/src/pulsecore/core-scache.c b/src/pulsecore/core-scache.c
index 377dd569f..ca2408fe7 100644
--- a/src/pulsecore/core-scache.c
+++ b/src/pulsecore/core-scache.c
@@ -176,7 +176,7 @@ int pa_scache_add_file(pa_core *c, const char *name, const char *filename, uint3
filename = buf;
#endif
- if (pa_sound_file_load(filename, &ss, &map, &chunk, c->memblock_stat) < 0)
+ if (pa_sound_file_load(c->mempool, filename, &ss, &map, &chunk) < 0)
return -1;
r = pa_scache_add_item(c, name, &ss, &map, &chunk, idx);
@@ -261,7 +261,7 @@ int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t
return -1;
if (e->lazy && !e->memchunk.memblock) {
- if (pa_sound_file_load(e->filename, &e->sample_spec, &e->channel_map, &e->memchunk, c->memblock_stat) < 0)
+ if (pa_sound_file_load(c->mempool, e->filename, &e->sample_spec, &e->channel_map, &e->memchunk) < 0)
return -1;
pa_subscription_post(c, PA_SUBSCRIPTION_EVENT_SAMPLE_CACHE|PA_SUBSCRIPTION_EVENT_CHANGE, e->index);
diff --git a/src/pulsecore/core.c b/src/pulsecore/core.c
index 7f2f0f60d..5fdeab569 100644
--- a/src/pulsecore/core.c
+++ b/src/pulsecore/core.c
@@ -44,7 +44,7 @@
#include "core.h"
-pa_core* pa_core_new(pa_mainloop_api *m) {
+pa_core* pa_core_new(pa_mainloop_api *m, int shared) {
pa_core* c;
c = pa_xnew(pa_core, 1);
@@ -78,7 +78,7 @@ pa_core* pa_core_new(pa_mainloop_api *m) {
PA_LLIST_HEAD_INIT(pa_subscription_event, c->subscription_event_queue);
c->subscription_event_last = NULL;
- c->memblock_stat = pa_memblock_stat_new();
+ c->mempool = pa_mempool_new(shared);
c->disallow_module_loading = 0;
@@ -139,7 +139,7 @@ void pa_core_free(pa_core *c) {
pa_xfree(c->default_source_name);
pa_xfree(c->default_sink_name);
- pa_memblock_stat_unref(c->memblock_stat);
+ pa_mempool_free(c->mempool);
pa_property_cleanup(c);
diff --git a/src/pulsecore/core.h b/src/pulsecore/core.h
index f9fa386e8..3a34d297a 100644
--- a/src/pulsecore/core.h
+++ b/src/pulsecore/core.h
@@ -67,7 +67,7 @@ struct pa_core {
PA_LLIST_HEAD(pa_subscription_event, subscription_event_queue);
pa_subscription_event *subscription_event_last;
- pa_memblock_stat *memblock_stat;
+ pa_mempool *mempool;
int disallow_module_loading, running_as_daemon;
int exit_idle_time, module_idle_time, scache_idle_time;
@@ -88,7 +88,7 @@ struct pa_core {
hook_source_disconnect;
};
-pa_core* pa_core_new(pa_mainloop_api *m);
+pa_core* pa_core_new(pa_mainloop_api *m, int shared);
void pa_core_free(pa_core*c);
/* Check whether noone is connected to this core */
diff --git a/src/pulsecore/mcalign.c b/src/pulsecore/mcalign.c
index 8283a7a0b..9ede610d7 100644
--- a/src/pulsecore/mcalign.c
+++ b/src/pulsecore/mcalign.c
@@ -35,10 +35,9 @@
struct pa_mcalign {
size_t base;
pa_memchunk leftover, current;
- pa_memblock_stat *memblock_stat;
};
-pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) {
+pa_mcalign *pa_mcalign_new(size_t base) {
pa_mcalign *m;
assert(base);
@@ -47,7 +46,6 @@ pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) {
m->base = base;
pa_memchunk_reset(&m->leftover);
pa_memchunk_reset(&m->current);
- m->memblock_stat = s;
return m;
}
@@ -100,7 +98,7 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {
l = c->length;
/* Can we use the current block? */
- pa_memchunk_make_writable(&m->leftover, m->memblock_stat, m->base);
+ pa_memchunk_make_writable(&m->leftover, m->base);
memcpy((uint8_t*) m->leftover.memblock->data + m->leftover.index + m->leftover.length, (uint8_t*) c->memblock->data + c->index, l);
m->leftover.length += l;
diff --git a/src/pulsecore/mcalign.h b/src/pulsecore/mcalign.h
index 80e374996..94e99e212 100644
--- a/src/pulsecore/mcalign.h
+++ b/src/pulsecore/mcalign.h
@@ -63,7 +63,7 @@
typedef struct pa_mcalign pa_mcalign;
-pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s);
+pa_mcalign *pa_mcalign_new(size_t base);
void pa_mcalign_free(pa_mcalign *m);
/* Push a new memchunk into the aligner. The caller of this routine
diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 36de17fb3..4ce1b7c1a 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -27,86 +27,271 @@
#include <stdlib.h>
#include <assert.h>
#include <string.h>
+#include <unistd.h>
#include <pulse/xmalloc.h>
+#include <pulsecore/shm.h>
+#include <pulsecore/log.h>
+#include <pulsecore/hashmap.h>
+
#include "memblock.h"
-static void stat_add(pa_memblock*m, pa_memblock_stat *s) {
- assert(m);
+#define PA_MEMPOOL_SLOTS_MAX 128
+#define PA_MEMPOOL_SLOT_SIZE (16*1024)
- if (!s) {
- m->stat = NULL;
- return;
- }
+#define PA_MEMEXPORT_SLOTS_MAX 128
+
+#define PA_MEMIMPORT_SLOTS_MAX 128
+#define PA_MEMIMPORT_SEGMENTS_MAX 16
+
+struct pa_memimport_segment {
+ pa_memimport *import;
+ pa_shm memory;
+ unsigned n_blocks;
+};
+
+struct pa_memimport {
+ pa_mempool *pool;
+ pa_hashmap *segments;
+ pa_hashmap *blocks;
+
+ /* Called whenever an imported memory block is no longer
+ * needed. */
+ pa_memimport_release_cb_t release_cb;
+ void *userdata;
+
+ PA_LLIST_FIELDS(pa_memimport);
+};
+
+struct memexport_slot {
+ PA_LLIST_FIELDS(struct memexport_slot);
+ pa_memblock *block;
+};
+
+struct pa_memexport {
+ pa_mempool *pool;
+
+ struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];
+ PA_LLIST_HEAD(struct memexport_slot, free_slots);
+ PA_LLIST_HEAD(struct memexport_slot, used_slots);
+ unsigned n_init;
+
+ /* Called whenever a client from which we imported a memory block
+ which we in turn exported to another client dies and we need to
+ revoke the memory block accordingly */
+ pa_memexport_revoke_cb_t revoke_cb;
+ void *userdata;
+
+ PA_LLIST_FIELDS(pa_memexport);
+};
+
+struct mempool_slot {
+ PA_LLIST_FIELDS(struct mempool_slot);
+ /* the actual data follows immediately hereafter */
+};
- m->stat = pa_memblock_stat_ref(s);
- s->total++;
- s->allocated++;
- s->total_size += m->length;
- s->allocated_size += m->length;
+struct pa_mempool {
+ pa_shm memory;
+ size_t block_size;
+ unsigned n_blocks, n_init;
+
+ PA_LLIST_HEAD(pa_memimport, imports);
+ PA_LLIST_HEAD(pa_memexport, exports);
+
+ /* A list of free slots that may be reused */
+ PA_LLIST_HEAD(struct mempool_slot, free_slots);
+ PA_LLIST_HEAD(struct mempool_slot, used_slots);
+
+ pa_mempool_stat stat;
+};
+
+static void segment_detach(pa_memimport_segment *seg);
+
+static void stat_add(pa_memblock*b) {
+ assert(b);
+ assert(b->pool);
+
+ b->pool->stat.n_allocated ++;
+ b->pool->stat.n_accumulated ++;
+ b->pool->stat.allocated_size += b->length;
+ b->pool->stat.accumulated_size += b->length;
+
+ if (b->type == PA_MEMBLOCK_IMPORTED) {
+ b->pool->stat.n_imported++;
+ b->pool->stat.imported_size += b->length;
+ }
}
-static void stat_remove(pa_memblock *m) {
- assert(m);
+static void stat_remove(pa_memblock *b) {
+ assert(b);
+ assert(b->pool);
- if (!m->stat)
- return;
+ assert(b->pool->stat.n_allocated > 0);
+ assert(b->pool->stat.allocated_size >= b->length);
+
+ b->pool->stat.n_allocated --;
+ b->pool->stat.allocated_size -= b->length;
+
+ if (b->type == PA_MEMBLOCK_IMPORTED) {
+ assert(b->pool->stat.n_imported > 0);
+ assert(b->pool->stat.imported_size >= b->length);
+
+ b->pool->stat.n_imported --;
+ b->pool->stat.imported_size -= b->length;
+ }
+}
- m->stat->total--;
- m->stat->total_size -= m->length;
+static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);
+
+pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
+ pa_memblock *b;
- pa_memblock_stat_unref(m->stat);
- m->stat = NULL;
+ assert(p);
+ assert(length > 0);
+
+ if (!(b = pa_memblock_new_pool(p, length)))
+ b = memblock_new_appended(p, length);
+
+ return b;
}
-pa_memblock *pa_memblock_new(size_t length, pa_memblock_stat*s) {
- pa_memblock *b = pa_xmalloc(sizeof(pa_memblock)+length);
+static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
+ pa_memblock *b;
+
+ assert(p);
+ assert(length > 0);
+
+ b = pa_xmalloc(sizeof(pa_memblock) + length);
b->type = PA_MEMBLOCK_APPENDED;
+ b->read_only = 0;
b->ref = 1;
b->length = length;
- b->data = b+1;
- b->free_cb = NULL;
- b->read_only = 0;
- stat_add(b, s);
+ b->data = (uint8_t*) b + sizeof(pa_memblock);
+ b->pool = p;
+
+ stat_add(b);
return b;
}
-pa_memblock *pa_memblock_new_dynamic(void *d, size_t length, pa_memblock_stat*s) {
- pa_memblock *b = pa_xmalloc(sizeof(pa_memblock));
- b->type = PA_MEMBLOCK_DYNAMIC;
- b->ref = 1;
+static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
+ struct mempool_slot *slot;
+ assert(p);
+
+ if (p->free_slots) {
+ slot = p->free_slots;
+ PA_LLIST_REMOVE(struct mempool_slot, p->free_slots, slot);
+ } else if (p->n_init < p->n_blocks)
+ slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * p->n_init++));
+ else {
+ pa_log_debug(__FILE__": Pool full");
+ p->stat.n_pool_full++;
+ return NULL;
+ }
+
+ PA_LLIST_PREPEND(struct mempool_slot, p->used_slots, slot);
+ return slot;
+}
+
+static void* mempool_slot_data(struct mempool_slot *slot) {
+ assert(slot);
+
+ return (uint8_t*) slot + sizeof(struct mempool_slot);
+}
+
+static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
+ assert(p);
+ assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
+ assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);
+
+ return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
+}
+
+static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
+ unsigned idx;
+
+ if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
+ return NULL;
+
+ return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
+}
+
+pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
+ pa_memblock *b = NULL;
+ struct mempool_slot *slot;
+
+ assert(p);
+ assert(length > 0);
+
+ if (p->block_size - sizeof(struct mempool_slot) >= sizeof(pa_memblock) + length) {
+
+ if (!(slot = mempool_allocate_slot(p)))
+ return NULL;
+
+ b = mempool_slot_data(slot);
+ b->type = PA_MEMBLOCK_POOL;
+ b->data = (uint8_t*) b + sizeof(pa_memblock);
+
+ } else if (p->block_size - sizeof(struct mempool_slot) >= length) {
+
+ if (!(slot = mempool_allocate_slot(p)))
+ return NULL;
+
+ b = pa_xnew(pa_memblock, 1);
+ b->type = PA_MEMBLOCK_POOL_EXTERNAL;
+ b->data = mempool_slot_data(slot);
+ } else {
+ pa_log_debug(__FILE__": Memory block to large for pool: %u > %u", length, p->block_size - sizeof(struct mempool_slot));
+ p->stat.n_too_large_for_pool++;
+ return NULL;
+ }
+
b->length = length;
- b->data = d;
- b->free_cb = NULL;
b->read_only = 0;
- stat_add(b, s);
+ b->ref = 1;
+ b->pool = p;
+
+ stat_add(b);
return b;
}
-pa_memblock *pa_memblock_new_fixed(void *d, size_t length, int read_only, pa_memblock_stat*s) {
- pa_memblock *b = pa_xmalloc(sizeof(pa_memblock));
+pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
+ pa_memblock *b;
+
+ assert(p);
+ assert(d);
+ assert(length > 0);
+
+ b = pa_xnew(pa_memblock, 1);
b->type = PA_MEMBLOCK_FIXED;
+ b->read_only = read_only;
b->ref = 1;
b->length = length;
b->data = d;
- b->free_cb = NULL;
- b->read_only = read_only;
- stat_add(b, s);
+ b->pool = p;
+
+ stat_add(b);
return b;
}
-pa_memblock *pa_memblock_new_user(void *d, size_t length, void (*free_cb)(void *p), int read_only, pa_memblock_stat*s) {
+pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
pa_memblock *b;
- assert(d && length && free_cb);
- b = pa_xmalloc(sizeof(pa_memblock));
+
+ assert(p);
+ assert(d);
+ assert(length > 0);
+ assert(free_cb);
+
+ b = pa_xnew(pa_memblock, 1);
b->type = PA_MEMBLOCK_USER;
+ b->read_only = read_only;
b->ref = 1;
b->length = length;
b->data = d;
- b->free_cb = free_cb;
- b->read_only = read_only;
- stat_add(b, s);
+ b->per_type.user.free_cb = free_cb;
+ b->pool = p;
+
+ stat_add(b);
return b;
}
@@ -122,52 +307,458 @@ void pa_memblock_unref(pa_memblock*b) {
assert(b);
assert(b->ref >= 1);
- if ((--(b->ref)) == 0) {
- stat_remove(b);
+ if ((--(b->ref)) > 0)
+ return;
+
+ stat_remove(b);
+
+ switch (b->type) {
+ case PA_MEMBLOCK_USER :
+ assert(b->per_type.user.free_cb);
+ b->per_type.user.free_cb(b->data);
+
+ /* Fall through */
+
+ case PA_MEMBLOCK_FIXED:
+ case PA_MEMBLOCK_APPENDED :
+ pa_xfree(b);
+ break;
+
+ case PA_MEMBLOCK_IMPORTED : {
+ pa_memimport_segment *segment;
+
+ segment = b->per_type.imported.segment;
+ assert(segment);
+ assert(segment->import);
+
+ pa_hashmap_remove(segment->import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
+ segment->import->release_cb(segment->import, b->per_type.imported.id, segment->import->userdata);
+
+ if (-- segment->n_blocks <= 0)
+ segment_detach(segment);
+
+ pa_xfree(b);
+ break;
+ }
- if (b->type == PA_MEMBLOCK_USER) {
- assert(b->free_cb);
- b->free_cb(b->data);
- } else if (b->type == PA_MEMBLOCK_DYNAMIC)
- pa_xfree(b->data);
+ case PA_MEMBLOCK_POOL_EXTERNAL:
+ case PA_MEMBLOCK_POOL: {
+ struct mempool_slot *slot;
- pa_xfree(b);
+ slot = mempool_slot_by_ptr(b->pool, b->data);
+ assert(slot);
+
+ PA_LLIST_REMOVE(struct mempool_slot, b->pool->used_slots, slot);
+ PA_LLIST_PREPEND(struct mempool_slot, b->pool->free_slots, slot);
+
+ if (b->type == PA_MEMBLOCK_POOL_EXTERNAL)
+ pa_xfree(b);
+ }
}
}
+static void memblock_make_local(pa_memblock *b) {
+ assert(b);
+
+ if (b->length <= b->pool->block_size - sizeof(struct mempool_slot)) {
+ struct mempool_slot *slot;
+
+ if ((slot = mempool_allocate_slot(b->pool))) {
+ void *new_data;
+ /* We can move it into a local pool, perfect! */
+
+ b->type = PA_MEMBLOCK_POOL_EXTERNAL;
+ b->read_only = 0;
+
+ new_data = mempool_slot_data(slot);
+ memcpy(new_data, b->data, b->length);
+ b->data = new_data;
+ return;
+ }
+ }
+
+ /* Humm, not enough space in the pool, so lets allocate the memory with malloc() */
+ b->type = PA_MEMBLOCK_USER;
+ b->per_type.user.free_cb = pa_xfree;
+ b->read_only = 0;
+ b->data = pa_xmemdup(b->data, b->length);
+}
+
void pa_memblock_unref_fixed(pa_memblock *b) {
- assert(b && b->ref >= 1 && b->type == PA_MEMBLOCK_FIXED);
+ assert(b);
+ assert(b->ref >= 1);
+ assert(b->type == PA_MEMBLOCK_FIXED);
- if (b->ref == 1)
- pa_memblock_unref(b);
- else {
- b->data = pa_xmemdup(b->data, b->length);
- b->type = PA_MEMBLOCK_DYNAMIC;
- b->ref--;
+ if (b->ref > 1)
+ memblock_make_local(b);
+
+ pa_memblock_unref(b);
+}
+
+static void memblock_replace_import(pa_memblock *b) {
+ pa_memimport_segment *seg;
+
+ assert(b);
+ assert(b->type == PA_MEMBLOCK_IMPORTED);
+
+ assert(b->pool->stat.n_imported > 0);
+ assert(b->pool->stat.imported_size >= b->length);
+ b->pool->stat.n_imported --;
+ b->pool->stat.imported_size -= b->length;
+
+ seg = b->per_type.imported.segment;
+ assert(seg);
+ assert(seg->import);
+
+ pa_hashmap_remove(
+ seg->import->blocks,
+ PA_UINT32_TO_PTR(b->per_type.imported.id));
+
+ memblock_make_local(b);
+
+ if (-- seg->n_blocks <= 0)
+ segment_detach(seg);
+}
+
+pa_mempool* pa_mempool_new(int shared) {
+ size_t ps;
+ pa_mempool *p;
+
+ p = pa_xnew(pa_mempool, 1);
+
+ ps = (size_t) sysconf(_SC_PAGESIZE);
+
+ p->block_size = (PA_MEMPOOL_SLOT_SIZE/ps)*ps;
+
+ if (p->block_size < ps)
+ p->block_size = ps;
+
+ p->n_blocks = PA_MEMPOOL_SLOTS_MAX;
+
+ assert(p->block_size > sizeof(struct mempool_slot));
+
+ if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) {
+ pa_xfree(p);
+ return NULL;
+ }
+
+ p->n_init = 0;
+
+ PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
+ PA_LLIST_HEAD_INIT(pa_memexport, p->exports);
+ PA_LLIST_HEAD_INIT(struct mempool_slot, p->free_slots);
+ PA_LLIST_HEAD_INIT(struct mempool_slot, p->used_slots);
+
+ memset(&p->stat, 0, sizeof(p->stat));
+
+ return p;
+}
+
+void pa_mempool_free(pa_mempool *p) {
+ assert(p);
+
+ while (p->imports)
+ pa_memimport_free(p->imports);
+
+ while (p->exports)
+ pa_memexport_free(p->exports);
+
+ if (p->stat.n_allocated > 0)
+ pa_log_warn(__FILE__": WARNING! Memory pool destroyed but not all memory blocks freed!");
+
+ pa_shm_free(&p->memory);
+ pa_xfree(p);
+}
+
+const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
+ assert(p);
+
+ return &p->stat;
+}
+
+void pa_mempool_vacuum(pa_mempool *p) {
+ struct mempool_slot *slot;
+
+ assert(p);
+
+ for (slot = p->free_slots; slot; slot = slot->next) {
+ pa_shm_punch(&p->memory, (uint8_t*) slot + sizeof(struct mempool_slot) - (uint8_t*) p->memory.ptr, p->block_size - sizeof(struct mempool_slot));
}
}
-pa_memblock_stat* pa_memblock_stat_new(void) {
- pa_memblock_stat *s;
+int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
+ assert(p);
- s = pa_xmalloc(sizeof(pa_memblock_stat));
- s->ref = 1;
- s->total = s->total_size = s->allocated = s->allocated_size = 0;
+ if (!p->memory.shared)
+ return -1;
- return s;
+ *id = p->memory.id;
+
+ return 0;
+}
+
+/* For recieving blocks from other nodes */
+pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
+ pa_memimport *i;
+
+ assert(p);
+ assert(cb);
+
+ i = pa_xnew(pa_memimport, 1);
+ i->pool = p;
+ i->segments = pa_hashmap_new(NULL, NULL);
+ i->blocks = pa_hashmap_new(NULL, NULL);
+ i->release_cb = cb;
+ i->userdata = userdata;
+
+ PA_LLIST_PREPEND(pa_memimport, p->imports, i);
+ return i;
}
-void pa_memblock_stat_unref(pa_memblock_stat *s) {
- assert(s && s->ref >= 1);
+static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
+
+static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
+ pa_memimport_segment* seg;
- if (!(--(s->ref))) {
- assert(!s->total);
- pa_xfree(s);
+ if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
+ return NULL;
+
+ seg = pa_xnew(pa_memimport_segment, 1);
+
+ if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
+ pa_xfree(seg);
+ return NULL;
}
+
+ seg->import = i;
+ seg->n_blocks = 0;
+
+ pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
+ return seg;
+}
+
+static void segment_detach(pa_memimport_segment *seg) {
+ assert(seg);
+
+ pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
+ pa_shm_free(&seg->memory);
+ pa_xfree(seg);
+}
+
+void pa_memimport_free(pa_memimport *i) {
+ pa_memexport *e;
+ pa_memblock *b;
+
+ assert(i);
+
+ /* If we've exported this block further we need to revoke that export */
+ for (e = i->pool->exports; e; e = e->next)
+ memexport_revoke_blocks(e, i);
+
+ while ((b = pa_hashmap_get_first(i->blocks)))
+ memblock_replace_import(b);
+
+ assert(pa_hashmap_size(i->segments) == 0);
+
+ pa_hashmap_free(i->blocks, NULL, NULL);
+ pa_hashmap_free(i->segments, NULL, NULL);
+
+ PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);
+ pa_xfree(i);
}
-pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s) {
- assert(s);
- s->ref++;
- return s;
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
+ pa_memblock *b;
+ pa_memimport_segment *seg;
+
+ assert(i);
+
+ if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
+ return NULL;
+
+ if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
+ if (!(seg = segment_attach(i, shm_id)))
+ return NULL;
+
+ if (offset+size > seg->memory.size)
+ return NULL;
+
+ b = pa_xnew(pa_memblock, 1);
+ b->type = PA_MEMBLOCK_IMPORTED;
+ b->read_only = 1;
+ b->ref = 1;
+ b->length = size;
+ b->data = (uint8_t*) seg->memory.ptr + offset;
+ b->pool = i->pool;
+ b->per_type.imported.id = block_id;
+ b->per_type.imported.segment = seg;
+
+ pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);
+
+ seg->n_blocks++;
+
+ stat_add(b);
+
+ return b;
+}
+
+int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
+ pa_memblock *b;
+ assert(i);
+
+ if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id))))
+ return -1;
+
+ memblock_replace_import(b);
+ return 0;
+}
+
+/* For sending blocks to other nodes */
+pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
+ pa_memexport *e;
+
+ assert(p);
+ assert(cb);
+
+ if (!p->memory.shared)
+ return NULL;
+
+ e = pa_xnew(pa_memexport, 1);
+ e->pool = p;
+ PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
+ PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
+ e->n_init = 0;
+ e->revoke_cb = cb;
+ e->userdata = userdata;
+
+ PA_LLIST_PREPEND(pa_memexport, p->exports, e);
+ return e;
+}
+
+void pa_memexport_free(pa_memexport *e) {
+ assert(e);
+
+ while (e->used_slots)
+ pa_memexport_process_release(e, e->used_slots - e->slots);
+
+ PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
+ pa_xfree(e);
+}
+
+int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
+ assert(e);
+
+ if (id >= e->n_init)
+ return -1;
+
+ if (!e->slots[id].block)
+ return -1;
+
+/* pa_log("Processing release for %u", id); */
+
+ assert(e->pool->stat.n_exported > 0);
+ assert(e->pool->stat.exported_size >= e->slots[id].block->length);
+
+ e->pool->stat.n_exported --;
+ e->pool->stat.exported_size -= e->slots[id].block->length;
+
+ pa_memblock_unref(e->slots[id].block);
+ e->slots[id].block = NULL;
+
+ PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
+ PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);
+
+ return 0;
+}
+
+static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
+ struct memexport_slot *slot, *next;
+ assert(e);
+ assert(i);
+
+ for (slot = e->used_slots; slot; slot = next) {
+ uint32_t idx;
+ next = slot->next;
+
+ if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
+ slot->block->per_type.imported.segment->import != i)
+ continue;
+
+ idx = slot - e->slots;
+ e->revoke_cb(e, idx, e->userdata);
+ pa_memexport_process_release(e, idx);
+ }
+}
+
+static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
+ pa_memblock *n;
+
+ assert(p);
+ assert(b);
+
+ if (b->type == PA_MEMBLOCK_IMPORTED ||
+ b->type == PA_MEMBLOCK_POOL ||
+ b->type == PA_MEMBLOCK_POOL_EXTERNAL) {
+ assert(b->pool == p);
+ return pa_memblock_ref(b);
+ }
+
+ if (!(n = pa_memblock_new_pool(p, b->length)))
+ return NULL;
+
+ memcpy(n->data, b->data, b->length);
+ return n;
+}
+
+int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
+ pa_shm *memory;
+ struct memexport_slot *slot;
+
+ assert(e);
+ assert(b);
+ assert(block_id);
+ assert(shm_id);
+ assert(offset);
+ assert(size);
+ assert(b->pool == e->pool);
+
+ if (!(b = memblock_shared_copy(e->pool, b)))
+ return -1;
+
+ if (e->free_slots) {
+ slot = e->free_slots;
+ PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
+ } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX) {
+ slot = &e->slots[e->n_init++];
+ } else {
+ pa_memblock_unref(b);
+ return -1;
+ }
+
+ PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
+ slot->block = b;
+ *block_id = slot - e->slots;
+
+/* pa_log("Got block id %u", *block_id); */
+
+ if (b->type == PA_MEMBLOCK_IMPORTED) {
+ assert(b->per_type.imported.segment);
+ memory = &b->per_type.imported.segment->memory;
+ } else {
+ assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
+ assert(b->pool);
+ memory = &b->pool->memory;
+ }
+
+ assert(b->data >= memory->ptr);
+ assert((uint8_t*) b->data + b->length <= (uint8_t*) memory->ptr + memory->size);
+
+ *shm_id = memory->id;
+ *offset = (uint8_t*) b->data - (uint8_t*) memory->ptr;
+ *size = b->length;
+
+ e->pool->stat.n_exported ++;
+ e->pool->stat.exported_size += b->length;
+
+ return 0;
}
diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h
index 04a0b55b1..e63e1e0fe 100644
--- a/src/pulsecore/memblock.h
+++ b/src/pulsecore/memblock.h
@@ -1,5 +1,5 @@
-#ifndef foomemblockhfoo
-#define foomemblockhfoo
+#ifndef foopulsememblockhfoo
+#define foopulsememblockhfoo
/* $Id$ */
@@ -25,6 +25,8 @@
#include <sys/types.h>
#include <inttypes.h>
+#include <pulsecore/llist.h>
+
/* A pa_memblock is a reference counted memory block. PulseAudio
* passed references to pa_memblocks around instead of copying
* data. See pa_memchunk for a structure that describes parts of
@@ -32,43 +34,72 @@
/* The type of memory this block points to */
typedef enum pa_memblock_type {
- PA_MEMBLOCK_FIXED, /* data is a pointer to fixed memory that needs not to be freed */
- PA_MEMBLOCK_APPENDED, /* The most common kind: the data is appended to the memory block */
- PA_MEMBLOCK_DYNAMIC, /* data is a pointer to some memory allocated with pa_xmalloc() */
- PA_MEMBLOCK_USER /* User supplied memory, to be freed with free_cb */
+ PA_MEMBLOCK_POOL, /* Memory is part of the memory pool */
+ PA_MEMBLOCK_POOL_EXTERNAL, /* Data memory is part of the memory pool but the pa_memblock structure itself not */
+ PA_MEMBLOCK_APPENDED, /* the data is appended to the memory block */
+ PA_MEMBLOCK_USER, /* User supplied memory, to be freed with free_cb */
+ PA_MEMBLOCK_FIXED, /* data is a pointer to fixed memory that needs not to be freed */
+ PA_MEMBLOCK_IMPORTED, /* Memory is imported from another process via shm */
} pa_memblock_type_t;
-/* A structure of keeping memory block statistics */
-/* Maintains statistics about memory blocks */
-typedef struct pa_memblock_stat {
- int ref;
- unsigned total;
- unsigned total_size;
- unsigned allocated;
- unsigned allocated_size;
-} pa_memblock_stat;
-
-typedef struct pa_memblock {
+typedef struct pa_memblock pa_memblock;
+typedef struct pa_mempool pa_mempool;
+typedef struct pa_mempool_stat pa_mempool_stat;
+typedef struct pa_memimport_segment pa_memimport_segment;
+typedef struct pa_memimport pa_memimport;
+typedef struct pa_memexport pa_memexport;
+
+typedef void (*pa_memimport_release_cb_t)(pa_memimport *i, uint32_t block_id, void *userdata);
+typedef void (*pa_memexport_revoke_cb_t)(pa_memexport *e, uint32_t block_id, void *userdata);
+
+struct pa_memblock {
pa_memblock_type_t type;
- unsigned ref; /* the reference counter */
int read_only; /* boolean */
+ unsigned ref; /* the reference counter */
size_t length;
void *data;
- void (*free_cb)(void *p); /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
- pa_memblock_stat *stat;
-} pa_memblock;
+ pa_mempool *pool;
-/* Allocate a new memory block of type PA_MEMBLOCK_APPENDED */
-pa_memblock *pa_memblock_new(size_t length, pa_memblock_stat*s);
+ union {
+ struct {
+ void (*free_cb)(void *p); /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
+ } user;
+
+ struct {
+ uint32_t id;
+ pa_memimport_segment *segment;
+ } imported;
+ } per_type;
+};
-/* Allocate a new memory block of type PA_MEMBLOCK_DYNAMIC. The pointer data is to be maintained be the memory block */
-pa_memblock *pa_memblock_new_dynamic(void *data, size_t length, pa_memblock_stat*s);
+struct pa_mempool_stat {
+ unsigned n_allocated;
+ unsigned n_accumulated;
+ unsigned n_imported;
+ unsigned n_exported;
+ size_t allocated_size;
+ size_t accumulated_size;
+ size_t imported_size;
+ size_t exported_size;
-/* Allocate a new memory block of type PA_MEMBLOCK_FIXED */
-pa_memblock *pa_memblock_new_fixed(void *data, size_t length, int read_only, pa_memblock_stat*s);
+ unsigned n_too_large_for_pool;
+ unsigned n_pool_full;
+};
+
+/* Allocate a new memory block of type PA_MEMBLOCK_MEMPOOL or PA_MEMBLOCK_APPENDED, depending on the size */
+pa_memblock *pa_memblock_new(pa_mempool *, size_t length);
+
+/* Allocate a new memory block of type PA_MEMBLOCK_MEMPOOL. If the requested size is too large, return NULL */
+pa_memblock *pa_memblock_new_pool(pa_mempool *, size_t length);
/* Allocate a new memory block of type PA_MEMBLOCK_USER */
-pa_memblock *pa_memblock_new_user(void *data, size_t length, void (*free_cb)(void *p), int read_only, pa_memblock_stat*s);
+pa_memblock *pa_memblock_new_user(pa_mempool *, void *data, size_t length, void (*free_cb)(void *p), int read_only);
+
+/* A special case of pa_memblock_new_user: take a memory buffer previously allocated with pa_xmalloc() */
+#define pa_memblock_new_malloced(p,data,length) pa_memblock_new_user(p, data, length, pa_xfree, 0)
+
+/* Allocate a new memory block of type PA_MEMBLOCK_FIXED */
+pa_memblock *pa_memblock_new_fixed(pa_mempool *, void *data, size_t length, int read_only);
void pa_memblock_unref(pa_memblock*b);
pa_memblock* pa_memblock_ref(pa_memblock*b);
@@ -79,8 +110,23 @@ references to the memory. This causes the memory to be copied and
converted into a PA_MEMBLOCK_DYNAMIC type memory block */
void pa_memblock_unref_fixed(pa_memblock*b);
-pa_memblock_stat* pa_memblock_stat_new(void);
-void pa_memblock_stat_unref(pa_memblock_stat *s);
-pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s);
+/* The memory block manager */
+pa_mempool* pa_mempool_new(int shared);
+void pa_mempool_free(pa_mempool *p);
+const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p);
+void pa_mempool_vacuum(pa_mempool *p);
+int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id);
+
+/* For recieving blocks from other nodes */
+pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
+void pa_memimport_free(pa_memimport *i);
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size);
+int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
+
+/* For sending blocks to other nodes */
+pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata);
+void pa_memexport_free(pa_memexport *e);
+int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size);
+int pa_memexport_process_release(pa_memexport *e, uint32_t id);
#endif
diff --git a/src/pulsecore/memblockq.c b/src/pulsecore/memblockq.c
index 822bd66cd..2fd38850e 100644
--- a/src/pulsecore/memblockq.c
+++ b/src/pulsecore/memblockq.c
@@ -49,7 +49,6 @@ struct pa_memblockq {
size_t maxlength, tlength, base, prebuf, minreq;
int64_t read_index, write_index;
enum { PREBUF, RUNNING } state;
- pa_memblock_stat *memblock_stat;
pa_memblock *silence;
pa_mcalign *mcalign;
};
@@ -61,8 +60,7 @@ pa_memblockq* pa_memblockq_new(
size_t base,
size_t prebuf,
size_t minreq,
- pa_memblock *silence,
- pa_memblock_stat *s) {
+ pa_memblock *silence) {
pa_memblockq* bq;
@@ -75,7 +73,6 @@ pa_memblockq* pa_memblockq_new(
bq->base = base;
bq->read_index = bq->write_index = idx;
- bq->memblock_stat = s;
pa_log_debug(__FILE__": memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu",
(unsigned long)maxlength, (unsigned long)tlength, (unsigned long)base, (unsigned long)prebuf, (unsigned long)minreq);
@@ -586,7 +583,7 @@ int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) {
return pa_memblockq_push(bq, chunk);
if (!bq->mcalign)
- bq->mcalign = pa_mcalign_new(bq->base, bq->memblock_stat);
+ bq->mcalign = pa_mcalign_new(bq->base);
if (!can_push(bq, pa_mcalign_csize(bq->mcalign, chunk->length)))
return -1;
diff --git a/src/pulsecore/memblockq.h b/src/pulsecore/memblockq.h
index c35b62ddd..4d701a80e 100644
--- a/src/pulsecore/memblockq.h
+++ b/src/pulsecore/memblockq.h
@@ -69,8 +69,7 @@ pa_memblockq* pa_memblockq_new(
size_t base,
size_t prebuf,
size_t minreq,
- pa_memblock *silence,
- pa_memblock_stat *s);
+ pa_memblock *silence);
void pa_memblockq_free(pa_memblockq*bq);
diff --git a/src/pulsecore/memchunk.c b/src/pulsecore/memchunk.c
index abfc2cab7..bcf0ce043 100644
--- a/src/pulsecore/memchunk.c
+++ b/src/pulsecore/memchunk.c
@@ -32,10 +32,13 @@
#include "memchunk.h"
-void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min) {
+void pa_memchunk_make_writable(pa_memchunk *c, size_t min) {
pa_memblock *n;
size_t l;
- assert(c && c->memblock && c->memblock->ref >= 1);
+
+ assert(c);
+ assert(c->memblock);
+ assert(c->memblock->ref >= 1);
if (c->memblock->ref == 1 && !c->memblock->read_only && c->memblock->length >= c->index+min)
return;
@@ -44,7 +47,7 @@ void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min)
if (l < min)
l = min;
- n = pa_memblock_new(l, s);
+ n = pa_memblock_new(c->memblock->pool, l);
memcpy(n->data, (uint8_t*) c->memblock->data + c->index, c->length);
pa_memblock_unref(c->memblock);
c->memblock = n;
diff --git a/src/pulsecore/memchunk.h b/src/pulsecore/memchunk.h
index 1b26c2e64..b8ce62495 100644
--- a/src/pulsecore/memchunk.h
+++ b/src/pulsecore/memchunk.h
@@ -36,7 +36,7 @@ typedef struct pa_memchunk {
/* Make a memchunk writable, i.e. make sure that the caller may have
* exclusive access to the memblock and it is not read_only. If needed
* the memblock in the structure is replaced by a copy. */
-void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min);
+void pa_memchunk_make_writable(pa_memchunk *c, size_t min);
/* Invalidate a memchunk. This does not free the cotaining memblock,
* but sets all members to zero. */
diff --git a/src/pulsecore/protocol-esound.c b/src/pulsecore/protocol-esound.c
index f1a827bce..2fadeca30 100644
--- a/src/pulsecore/protocol-esound.c
+++ b/src/pulsecore/protocol-esound.c
@@ -377,8 +377,7 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t
pa_frame_size(&ss),
(size_t) -1,
l/PLAYBACK_BUFFER_FRAGMENTS,
- NULL,
- c->protocol->core->memblock_stat);
+ NULL);
pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2);
c->playback.fragment_size = l/10;
@@ -469,8 +468,7 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co
pa_frame_size(&ss),
1,
0,
- NULL,
- c->protocol->core->memblock_stat);
+ NULL);
pa_iochannel_socket_set_sndbuf(c->io, l/RECORD_BUFFER_FRAGMENTS*2);
c->source_output->push = source_output_push_cb;
@@ -722,7 +720,7 @@ static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_
CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
assert(!c->scache.memchunk.memblock);
- c->scache.memchunk.memblock = pa_memblock_new(sc_length, c->protocol->core->memblock_stat);
+ c->scache.memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, sc_length);
c->scache.memchunk.index = 0;
c->scache.memchunk.length = sc_length;
c->scache.sample_spec = ss;
@@ -941,7 +939,7 @@ static int do_read(struct connection *c) {
}
if (!c->playback.current_memblock) {
- c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
+ c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
c->playback.memblock_index = 0;
}
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index 0b79892c5..2c9b3566c 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -348,8 +348,7 @@ static struct record_stream* record_stream_new(
base = pa_frame_size(ss),
1,
0,
- NULL,
- c->protocol->core->memblock_stat);
+ NULL);
assert(s->memblockq);
s->fragment_size = (fragment_size/base)*base;
@@ -448,7 +447,7 @@ static struct playback_stream* playback_stream_new(
start_index = 0;
}
- silence = pa_silence_memblock_new(ss, 0, c->protocol->core->memblock_stat);
+ silence = pa_silence_memblock_new(c->protocol->core->mempool, ss, 0);
s->memblockq = pa_memblockq_new(
start_index,
@@ -457,8 +456,7 @@ static struct playback_stream* playback_stream_new(
pa_frame_size(ss),
prebuf,
minreq,
- silence,
- c->protocol->core->memblock_stat);
+ silence);
pa_memblock_unref(silence);
@@ -1076,6 +1074,7 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC
static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
struct connection *c = userdata;
pa_tagstruct *reply;
+ const pa_mempool_stat *stat;
assert(c && t);
if (!pa_tagstruct_eof(t)) {
@@ -1085,11 +1084,13 @@ static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
+ stat = pa_mempool_get_stat(c->protocol->core->mempool);
+
reply = reply_new(tag);
- pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total);
- pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total_size);
- pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated);
- pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated_size);
+ pa_tagstruct_putu32(reply, stat->n_allocated);
+ pa_tagstruct_putu32(reply, stat->allocated_size);
+ pa_tagstruct_putu32(reply, stat->n_accumulated);
+ pa_tagstruct_putu32(reply, stat->accumulated_size);
pa_tagstruct_putu32(reply, pa_scache_total_size(c->protocol->core));
pa_pstream_send_tagstruct(c->pstream, reply);
}
@@ -2256,7 +2257,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
pa_memblock_ref(u->memchunk.memblock);
u->length = 0;
} else {
- u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
+ u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
u->memchunk.index = u->memchunk.length = 0;
}
}
@@ -2349,9 +2350,11 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
c->client->userdata = c;
c->client->owner = p->module;
- c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->memblock_stat);
+ c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
assert(c->pstream);
+ pa_pstream_use_shm(c->pstream, 1);
+
pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c
index 3705986d5..924ee29ec 100644
--- a/src/pulsecore/protocol-simple.c
+++ b/src/pulsecore/protocol-simple.c
@@ -128,7 +128,7 @@ static int do_read(struct connection *c) {
}
if (!c->playback.current_memblock) {
- c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
+ c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
c->playback.memblock_index = 0;
}
@@ -369,8 +369,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
pa_frame_size(&p->sample_spec),
(size_t) -1,
l/PLAYBACK_BUFFER_FRAGMENTS,
- NULL,
- p->core->memblock_stat);
+ NULL);
assert(c->input_memblockq);
pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
c->playback.fragment_size = l/10;
@@ -406,8 +405,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
pa_frame_size(&p->sample_spec),
1,
0,
- NULL,
- p->core->memblock_stat);
+ NULL);
pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
pa_source_notify(c->source_output->source);
}
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 7096d65af..421f5de9c 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -49,28 +49,45 @@
#include "pstream.h"
+/* We piggyback information if audio data blocks are stored in SHM on the seek mode */
+#define PA_FLAG_SHMDATA 0x80000000LU
+#define PA_FLAG_SHMRELEASE 0x40000000LU
+#define PA_FLAG_SHMREVOKE 0xC0000000LU
+#define PA_FLAG_SHMMASK 0xFF000000LU
+#define PA_FLAG_SEEKMASK 0x000000FFLU
+
+/* The sequence descriptor header consists of 5 32bit integers: */
enum {
PA_PSTREAM_DESCRIPTOR_LENGTH,
PA_PSTREAM_DESCRIPTOR_CHANNEL,
PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
- PA_PSTREAM_DESCRIPTOR_SEEK,
+ PA_PSTREAM_DESCRIPTOR_FLAGS,
PA_PSTREAM_DESCRIPTOR_MAX
};
+/* If we have an SHM block, this info follows the descriptor */
+enum {
+ PA_PSTREAM_SHM_BLOCKID,
+ PA_PSTREAM_SHM_SHMID,
+ PA_PSTREAM_SHM_INDEX,
+ PA_PSTREAM_SHM_LENGTH,
+ PA_PSTREAM_SHM_MAX
+};
+
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
#define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
struct item_info {
- enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type;
+ enum {
+ PA_PSTREAM_ITEM_PACKET,
+ PA_PSTREAM_ITEM_MEMBLOCK,
+ PA_PSTREAM_ITEM_SHMRELEASE,
+ PA_PSTREAM_ITEM_SHMREVOKE
+ } type;
- /* memblock info */
- pa_memchunk chunk;
- uint32_t channel;
- int64_t offset;
- pa_seek_mode_t seek_mode;
/* packet info */
pa_packet *packet;
@@ -78,6 +95,15 @@ struct item_info {
int with_creds;
pa_creds creds;
#endif
+
+ /* memblock info */
+ pa_memchunk chunk;
+ uint32_t channel;
+ int64_t offset;
+ pa_seek_mode_t seek_mode;
+
+ /* release/revoke info */
+ uint32_t block_id;
};
struct pa_pstream {
@@ -91,20 +117,26 @@ struct pa_pstream {
int dead;
struct {
- struct item_info* current;
pa_pstream_descriptor descriptor;
+ struct item_info* current;
+ uint32_t shm_info[PA_PSTREAM_SHM_MAX];
void *data;
size_t index;
} write;
struct {
+ pa_pstream_descriptor descriptor;
pa_memblock *memblock;
pa_packet *packet;
- pa_pstream_descriptor descriptor;
+ uint32_t shm_info[PA_PSTREAM_SHM_MAX];
void *data;
size_t index;
} read;
+ int use_shm;
+ pa_memimport *import;
+ pa_memexport *export;
+
pa_pstream_packet_cb_t recieve_packet_callback;
void *recieve_packet_callback_userdata;
@@ -117,7 +149,7 @@ struct pa_pstream {
pa_pstream_notify_cb_t die_callback;
void *die_callback_userdata;
- pa_memblock_stat *memblock_stat;
+ pa_mempool *mempool;
#ifdef HAVE_CREDS
pa_creds read_creds, write_creds;
@@ -178,16 +210,19 @@ static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata)
do_something(p);
}
-pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s) {
+static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
+
+pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
pa_pstream *p;
+
+ assert(m);
assert(io);
+ assert(pool);
p = pa_xnew(pa_pstream, 1);
-
p->ref = 1;
p->io = io;
pa_iochannel_set_callback(io, io_callback, p);
-
p->dead = 0;
p->mainloop = m;
@@ -199,24 +234,24 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_sta
p->write.current = NULL;
p->write.index = 0;
-
p->read.memblock = NULL;
p->read.packet = NULL;
p->read.index = 0;
p->recieve_packet_callback = NULL;
p->recieve_packet_callback_userdata = NULL;
-
p->recieve_memblock_callback = NULL;
p->recieve_memblock_callback_userdata = NULL;
-
p->drain_callback = NULL;
p->drain_callback_userdata = NULL;
-
p->die_callback = NULL;
p->die_callback_userdata = NULL;
- p->memblock_stat = s;
+ p->mempool = pool;
+
+ p->use_shm = 0;
+ p->export = NULL;
+ p->import = NULL;
pa_iochannel_socket_set_rcvbuf(io, 1024*8);
pa_iochannel_socket_set_sndbuf(io, 1024*8);
@@ -235,8 +270,7 @@ static void item_free(void *item, PA_GCC_UNUSED void *p) {
if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
assert(i->chunk.memblock);
pa_memblock_unref(i->chunk.memblock);
- } else {
- assert(i->type == PA_PSTREAM_ITEM_PACKET);
+ } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
assert(i->packet);
pa_packet_unref(i->packet);
}
@@ -265,16 +299,18 @@ static void pstream_free(pa_pstream *p) {
void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
struct item_info *i;
- assert(p && packet && p->ref >= 1);
+
+ assert(p);
+ assert(p->ref >= 1);
+ assert(packet);
if (p->dead)
return;
-/* pa_log(__FILE__": push-packet %p", packet); */
-
i = pa_xnew(struct item_info, 1);
i->type = PA_PSTREAM_ITEM_PACKET;
i->packet = pa_packet_ref(packet);
+
#ifdef HAVE_CREDS
if ((i->with_creds = !!creds))
i->creds = *creds;
@@ -286,13 +322,15 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre
void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
struct item_info *i;
- assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1);
+
+ assert(p);
+ assert(p->ref >= 1);
+ assert(channel != (uint32_t) -1);
+ assert(chunk);
if (p->dead)
return;
-
-/* pa_log(__FILE__": push-memblock %p", chunk); */
-
+
i = pa_xnew(struct item_info, 1);
i->type = PA_PSTREAM_ITEM_MEMBLOCK;
i->chunk = *chunk;
@@ -309,6 +347,52 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
p->mainloop->defer_enable(p->defer_event, 1);
}
+static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
+ struct item_info *item;
+ pa_pstream *p = userdata;
+
+ assert(p);
+ assert(p->ref >= 1);
+
+ if (p->dead)
+ return;
+
+/* pa_log(__FILE__": Releasing block %u", block_id); */
+
+ item = pa_xnew(struct item_info, 1);
+ item->type = PA_PSTREAM_ITEM_SHMRELEASE;
+ item->block_id = block_id;
+#ifdef HAVE_CREDS
+ item->with_creds = 0;
+#endif
+
+ pa_queue_push(p->send_queue, item);
+ p->mainloop->defer_enable(p->defer_event, 1);
+}
+
+static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
+ struct item_info *item;
+ pa_pstream *p = userdata;
+
+ assert(p);
+ assert(p->ref >= 1);
+
+ if (p->dead)
+ return;
+
+/* pa_log(__FILE__": Revoking block %u", block_id); */
+
+ item = pa_xnew(struct item_info, 1);
+ item->type = PA_PSTREAM_ITEM_SHMREVOKE;
+ item->block_id = block_id;
+#ifdef HAVE_CREDS
+ item->with_creds = 0;
+#endif
+
+ pa_queue_push(p->send_queue, item);
+ p->mainloop->defer_enable(p->defer_event, 1);
+}
+
static void prepare_next_write_item(pa_pstream *p) {
assert(p);
@@ -316,27 +400,77 @@ static void prepare_next_write_item(pa_pstream *p) {
return;
p->write.index = 0;
+ p->write.data = NULL;
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
- /*pa_log(__FILE__": pop-packet %p", p->write.current->packet);*/
assert(p->write.current->packet);
p->write.data = p->write.current->packet->data;
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = 0;
+ } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
+
+ } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
} else {
- assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
- p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
+ uint32_t flags;
+ int send_payload = 1;
+
+ assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
+ assert(p->write.current->chunk.memblock);
+
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = htonl(p->write.current->seek_mode);
+
+ flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
+
+ if (p->use_shm) {
+ uint32_t block_id, shm_id;
+ size_t offset, length;
+
+ assert(p->export);
+
+ if (pa_memexport_put(p->export,
+ p->write.current->chunk.memblock,
+ &block_id,
+ &shm_id,
+ &offset,
+ &length) >= 0) {
+
+ flags |= PA_FLAG_SHMDATA;
+ send_payload = 0;
+
+ p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
+ p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
+ p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
+ p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
+ p->write.data = p->write.shm_info;
+ }
+/* else */
+/* pa_log_warn(__FILE__": Failed to export memory block."); */
+ }
+
+ if (send_payload) {
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
+ p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
+ }
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
}
#ifdef HAVE_CREDS
@@ -344,7 +478,6 @@ static void prepare_next_write_item(pa_pstream *p) {
p->write_creds = p->write.current->creds;
#endif
-
}
static int do_write(pa_pstream *p) {
@@ -359,16 +492,18 @@ static int do_write(pa_pstream *p) {
if (!p->write.current)
return 0;
- assert(p->write.data);
-
if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
d = (uint8_t*) p->write.descriptor + p->write.index;
l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
} else {
+ assert(p->write.data);
+
d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
}
+ assert(l > 0);
+
#ifdef HAVE_CREDS
if (p->send_creds_now) {
@@ -384,7 +519,7 @@ static int do_write(pa_pstream *p) {
p->write.index += r;
- if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
+ if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
assert(p->write.current);
item_free(p->write.current, (void *) 1);
p->write.current = NULL;
@@ -428,27 +563,87 @@ static int do_read(pa_pstream *p) {
p->read.index += r;
if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
+ uint32_t flags, length, channel;
/* Reading of frame descriptor complete */
- /* Frame size too large */
- if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) {
- pa_log_warn(__FILE__": Frame size too large: %lu > %lu", (unsigned long) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), (unsigned long) FRAME_SIZE_MAX);
+ flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+
+ if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
+ pa_log_warn(__FILE__": Recieved SHM frame on a socket where SHM is disabled.");
+ return -1;
+ }
+
+ if (flags == PA_FLAG_SHMRELEASE) {
+
+ /* This is a SHM memblock release frame with no payload */
+
+/* pa_log(__FILE__": Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+
+ assert(p->export);
+ pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+
+ goto frame_done;
+
+ } else if (flags == PA_FLAG_SHMREVOKE) {
+
+ /* This is a SHM memblock revoke frame with no payload */
+
+/* pa_log(__FILE__": Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+
+ assert(p->import);
+ pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+
+ goto frame_done;
+ }
+
+ length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
+
+ if (length > FRAME_SIZE_MAX) {
+ pa_log_warn(__FILE__": Recieved invalid frame size : %lu", (unsigned long) length);
return -1;
}
assert(!p->read.packet && !p->read.memblock);
- if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
+ channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
+
+ if (channel == (uint32_t) -1) {
+
+ if (flags != 0) {
+ pa_log_warn(__FILE__": Received packet frame with invalid flags value.");
+ return -1;
+ }
+
/* Frame is a packet frame */
- p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]));
+ p->read.packet = pa_packet_new(length);
p->read.data = p->read.packet->data;
+
} else {
- /* Frame is a memblock frame */
- p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat);
- p->read.data = p->read.memblock->data;
- if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]) > PA_SEEK_RELATIVE_END) {
- pa_log_warn(__FILE__": Invalid seek mode");
+ if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
+ pa_log_warn(__FILE__": Received memblock frame with invalid seek mode.");
+ return -1;
+ }
+
+ if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
+
+ if (length != sizeof(p->read.shm_info)) {
+ pa_log_warn(__FILE__": Recieved SHM memblock frame with Invalid frame length.");
+ return -1;
+ }
+
+ /* Frame is a memblock frame referencing an SHM memblock */
+ p->read.data = p->read.shm_info;
+
+ } else if ((flags & PA_FLAG_SHMMASK) == 0) {
+
+ /* Frame is a memblock frame */
+
+ p->read.memblock = pa_memblock_new(p->mempool, length);
+ p->read.data = p->read.memblock->data;
+ } else {
+
+ pa_log_warn(__FILE__": Recieved memblock frame with invalid flags value.");
return -1;
}
}
@@ -456,7 +651,9 @@ static int do_read(pa_pstream *p) {
} else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
/* Frame payload available */
- if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
+ if (p->read.memblock && p->recieve_memblock_callback) {
+
+ /* Is this memblock data? Than pass it to the user */
l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
if (l > 0) {
@@ -477,13 +674,13 @@ static int do_read(pa_pstream *p) {
p,
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
offset,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]),
+ ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
&chunk,
p->recieve_memblock_callback_userdata);
}
/* Drop seek info for following callbacks */
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] =
+ p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
}
@@ -491,13 +688,13 @@ static int do_read(pa_pstream *p) {
/* Frame complete */
if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
+
if (p->read.memblock) {
- assert(!p->read.packet);
-
+
+ /* This was a memblock frame. We can unref the memblock now */
pa_memblock_unref(p->read.memblock);
- p->read.memblock = NULL;
- } else {
- assert(p->read.packet);
+
+ } else if (p->read.packet) {
if (p->recieve_packet_callback)
#ifdef HAVE_CREDS
@@ -507,17 +704,63 @@ static int do_read(pa_pstream *p) {
#endif
pa_packet_unref(p->read.packet);
- p->read.packet = NULL;
+ } else {
+ pa_memblock *b;
+
+ assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
+
+ assert(p->import);
+
+ if (!(b = pa_memimport_get(p->import,
+ ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
+ ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
+ ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
+ ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
+
+ pa_log_warn(__FILE__": Failed to import memory block.");
+ return -1;
+ }
+
+ if (p->recieve_memblock_callback) {
+ int64_t offset;
+ pa_memchunk chunk;
+
+ chunk.memblock = b;
+ chunk.index = 0;
+ chunk.length = b->length;
+
+ offset = (int64_t) (
+ (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+ (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+
+ p->recieve_memblock_callback(
+ p,
+ ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+ offset,
+ ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+ &chunk,
+ p->recieve_memblock_callback_userdata);
+ }
+
+ pa_memblock_unref(b);
}
- p->read.index = 0;
-#ifdef HAVE_CREDS
- p->read_creds_valid = 0;
-#endif
+ goto frame_done;
}
}
- return 0;
+ return 0;
+
+frame_done:
+ p->read.memblock = NULL;
+ p->read.packet = NULL;
+ p->read.index = 0;
+
+#ifdef HAVE_CREDS
+ p->read_creds_valid = 0;
+#endif
+
+ return 0;
}
void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
@@ -583,6 +826,16 @@ void pa_pstream_close(pa_pstream *p) {
p->dead = 1;
+ if (p->import) {
+ pa_memimport_free(p->import);
+ p->import = NULL;
+ }
+
+ if (p->export) {
+ pa_memexport_free(p->export);
+ p->export = NULL;
+ }
+
if (p->io) {
pa_iochannel_free(p->io);
p->io = NULL;
@@ -597,4 +850,19 @@ void pa_pstream_close(pa_pstream *p) {
p->drain_callback = NULL;
p->recieve_packet_callback = NULL;
p->recieve_memblock_callback = NULL;
+
+
+}
+
+void pa_pstream_use_shm(pa_pstream *p, int enable) {
+ assert(p);
+ assert(p->ref >= 1);
+
+ p->use_shm = enable;
+
+ if (!p->import)
+ p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
+
+ if (!p->export)
+ p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
}
diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h
index 789e40bc2..26bb7699c 100644
--- a/src/pulsecore/pstream.h
+++ b/src/pulsecore/pstream.h
@@ -39,7 +39,7 @@ typedef void (*pa_pstream_packet_cb_t)(pa_pstream *p, pa_packet *packet, const p
typedef void (*pa_pstream_memblock_cb_t)(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata);
typedef void (*pa_pstream_notify_cb_t)(pa_pstream *p, void *userdata);
-pa_pstream* pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s);
+pa_pstream* pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *p);
void pa_pstream_unref(pa_pstream*p);
pa_pstream* pa_pstream_ref(pa_pstream*p);
@@ -54,6 +54,8 @@ void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void
int pa_pstream_is_pending(pa_pstream *p);
+void pa_pstream_use_shm(pa_pstream *p, int enable);
+
void pa_pstream_close(pa_pstream *p);
#endif
diff --git a/src/pulsecore/resampler.c b/src/pulsecore/resampler.c
index 23cdf3811..742267146 100644
--- a/src/pulsecore/resampler.c
+++ b/src/pulsecore/resampler.c
@@ -42,7 +42,7 @@ struct pa_resampler {
pa_sample_spec i_ss, o_ss;
pa_channel_map i_cm, o_cm;
size_t i_fz, o_fz;
- pa_memblock_stat *memblock_stat;
+ pa_mempool *mempool;
void (*impl_free)(pa_resampler *r);
void (*impl_update_input_rate)(pa_resampler *r, uint32_t rate);
@@ -71,15 +71,16 @@ static int libsamplerate_init(pa_resampler*r);
static int trivial_init(pa_resampler*r);
pa_resampler* pa_resampler_new(
- const pa_sample_spec *a,
- const pa_channel_map *am,
- const pa_sample_spec *b,
- const pa_channel_map *bm,
- pa_memblock_stat *s,
- pa_resample_method_t resample_method) {
+ pa_mempool *pool,
+ const pa_sample_spec *a,
+ const pa_channel_map *am,
+ const pa_sample_spec *b,
+ const pa_channel_map *bm,
+ pa_resample_method_t resample_method) {
pa_resampler *r = NULL;
+ assert(pool);
assert(a);
assert(b);
assert(pa_sample_spec_valid(a));
@@ -88,7 +89,7 @@ pa_resampler* pa_resampler_new(
r = pa_xnew(pa_resampler, 1);
r->impl_data = NULL;
- r->memblock_stat = s;
+ r->mempool = pool;
r->resample_method = resample_method;
r->impl_free = NULL;
@@ -450,7 +451,7 @@ static void libsamplerate_run(pa_resampler *r, const pa_memchunk *in, pa_memchun
assert(p);
/* Take the existing buffer and make it a memblock */
- out->memblock = pa_memblock_new_dynamic(*p, out->length, r->memblock_stat);
+ out->memblock = pa_memblock_new_malloced(r->mempool, *p, out->length);
*p = NULL;
}
} else {
@@ -549,7 +550,7 @@ static void trivial_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out
l = ((((n_frames+1) * r->o_ss.rate) / r->i_ss.rate) + 1) * fz;
out->index = 0;
- out->memblock = pa_memblock_new(l, r->memblock_stat);
+ out->memblock = pa_memblock_new(r->mempool, l);
for (o_index = 0;; o_index++, u->o_counter++) {
unsigned j;
diff --git a/src/pulsecore/resampler.h b/src/pulsecore/resampler.h
index c1199e5ce..327e24a29 100644
--- a/src/pulsecore/resampler.h
+++ b/src/pulsecore/resampler.h
@@ -43,12 +43,12 @@ typedef enum pa_resample_method {
} pa_resample_method_t;
pa_resampler* pa_resampler_new(
- const pa_sample_spec *a,
- const pa_channel_map *am,
- const pa_sample_spec *b,
- const pa_channel_map *bm,
- pa_memblock_stat *s,
- pa_resample_method_t resample_method);
+ pa_mempool *pool,
+ const pa_sample_spec *a,
+ const pa_channel_map *am,
+ const pa_sample_spec *b,
+ const pa_channel_map *bm,
+ pa_resample_method_t resample_method);
void pa_resampler_free(pa_resampler *r);
diff --git a/src/pulsecore/sample-util.c b/src/pulsecore/sample-util.c
index 638f80670..7f5d8a020 100644
--- a/src/pulsecore/sample-util.c
+++ b/src/pulsecore/sample-util.c
@@ -35,13 +35,14 @@
#include "sample-util.h"
#include "endianmacros.h"
-pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s) {
+pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length) {
+ assert(pool);
assert(spec);
if (length == 0)
- length = pa_bytes_per_second(spec)/10; /* 100 ms */
+ length = pa_bytes_per_second(spec)/20; /* 50 ms */
- return pa_silence_memblock(pa_memblock_new(length, s), spec);
+ return pa_silence_memblock(pa_memblock_new(pool, length), spec);
}
pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec) {
diff --git a/src/pulsecore/sample-util.h b/src/pulsecore/sample-util.h
index 3ebb7e2ea..6b7707925 100644
--- a/src/pulsecore/sample-util.h
+++ b/src/pulsecore/sample-util.h
@@ -28,7 +28,7 @@
#include <pulsecore/memchunk.h>
pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec);
-pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s);
+pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length);
void pa_silence_memchunk(pa_memchunk *c, const pa_sample_spec *spec);
void pa_silence_memory(void *p, size_t length, const pa_sample_spec *spec);
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index b3fabad3a..b5ba9df19 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -136,9 +136,9 @@ pa_sink_input* pa_sink_input_new(
!pa_channel_map_equal(&data->channel_map, &data->sink->channel_map))
if (!(resampler = pa_resampler_new(
+ core->mempool,
&data->sample_spec, &data->channel_map,
&data->sink->sample_spec, &data->sink->channel_map,
- core->memblock_stat,
data->resample_method))) {
pa_log_warn(__FILE__": Unsupported resampling operation.");
return NULL;
@@ -299,7 +299,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
* while until the old sink has drained its playback buffer */
if (!i->silence_memblock)
- i->silence_memblock = pa_silence_memblock_new(&i->sink->sample_spec, SILENCE_BUFFER_LENGTH, i->sink->core->memblock_stat);
+ i->silence_memblock = pa_silence_memblock_new(i->sink->core->mempool, &i->sink->sample_spec, SILENCE_BUFFER_LENGTH);
chunk->memblock = pa_memblock_ref(i->silence_memblock);
chunk->index = 0;
@@ -338,7 +338,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
/* It might be necessary to adjust the volume here */
if (do_volume_adj_here && !volume_is_norm) {
- pa_memchunk_make_writable(&tchunk, i->sink->core->memblock_stat, 0);
+ pa_memchunk_make_writable(&tchunk, 0);
pa_volume_memchunk(&tchunk, &i->sample_spec, &i->volume);
}
@@ -540,9 +540,9 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
/* Okey, we need a new resampler for the new sink */
if (!(new_resampler = pa_resampler_new(
+ dest->core->mempool,
&i->sample_spec, &i->channel_map,
&dest->sample_spec, &dest->channel_map,
- dest->core->memblock_stat,
i->resample_method))) {
pa_log_warn(__FILE__": Unsupported resampling operation.");
return -1;
@@ -553,7 +553,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
pa_usec_t old_latency, new_latency;
pa_usec_t silence_usec = 0;
- buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL, NULL);
+ buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL);
/* Let's do a little bit of Voodoo for compensating latency
* differences */
@@ -599,7 +599,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
chunk.length = n;
if (!volume_is_norm) {
- pa_memchunk_make_writable(&chunk, origin->core->memblock_stat, 0);
+ pa_memchunk_make_writable(&chunk, 0);
pa_volume_memchunk(&chunk, &origin->sample_spec, &volume);
}
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 557d5efcb..aacb89fd5 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -298,14 +298,14 @@ int pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
pa_sw_cvolume_multiply(&volume, &s->sw_volume, &info[0].volume);
if (s->sw_muted || !pa_cvolume_is_norm(&volume)) {
- pa_memchunk_make_writable(result, s->core->memblock_stat, 0);
+ pa_memchunk_make_writable(result, 0);
if (s->sw_muted)
pa_silence_memchunk(result, &s->sample_spec);
else
pa_volume_memchunk(result, &s->sample_spec, &volume);
}
} else {
- result->memblock = pa_memblock_new(length, s->core->memblock_stat);
+ result->memblock = pa_memblock_new(s->core->mempool, length);
assert(result->memblock);
/* pa_log("mixing %i", n); */
@@ -429,7 +429,7 @@ void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result) {
/*** This needs optimization ***/
- result->memblock = pa_memblock_new(result->length = length, s->core->memblock_stat);
+ result->memblock = pa_memblock_new(s->core->mempool, result->length = length);
result->index = 0;
pa_sink_render_into_full(s, result);
diff --git a/src/pulsecore/sound-file-stream.c b/src/pulsecore/sound-file-stream.c
index 6063b93e3..6782f50eb 100644
--- a/src/pulsecore/sound-file-stream.c
+++ b/src/pulsecore/sound-file-stream.c
@@ -75,7 +75,7 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
uint32_t fs = pa_frame_size(&i->sample_spec);
sf_count_t n;
- u->memchunk.memblock = pa_memblock_new(BUF_SIZE, i->sink->core->memblock_stat);
+ u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE);
u->memchunk.index = 0;
if (u->readf_function) {
diff --git a/src/pulsecore/sound-file.c b/src/pulsecore/sound-file.c
index d11d4b9d9..256cce434 100644
--- a/src/pulsecore/sound-file.c
+++ b/src/pulsecore/sound-file.c
@@ -34,7 +34,7 @@
#include "sound-file.h"
#include "core-scache.h"
-int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk, pa_memblock_stat *s) {
+int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk) {
SNDFILE*sf = NULL;
SF_INFO sfinfo;
int ret = -1;
@@ -92,7 +92,7 @@ int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *ma
goto finish;
}
- chunk->memblock = pa_memblock_new(l, s);
+ chunk->memblock = pa_memblock_new(pool, l);
assert(chunk->memblock);
chunk->index = 0;
chunk->length = l;
diff --git a/src/pulsecore/sound-file.h b/src/pulsecore/sound-file.h
index 0b81d97e1..7e3c82eab 100644
--- a/src/pulsecore/sound-file.h
+++ b/src/pulsecore/sound-file.h
@@ -26,7 +26,7 @@
#include <pulse/channelmap.h>
#include <pulsecore/memchunk.h>
-int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk, pa_memblock_stat *s);
+int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk);
int pa_sound_file_too_big_to_cache(const char *fname);
diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c
index 7371474f1..f9d66f6d1 100644
--- a/src/pulsecore/source-output.c
+++ b/src/pulsecore/source-output.c
@@ -115,9 +115,9 @@ pa_source_output* pa_source_output_new(
if (!pa_sample_spec_equal(&data->sample_spec, &data->source->sample_spec) ||
!pa_channel_map_equal(&data->channel_map, &data->source->channel_map))
if (!(resampler = pa_resampler_new(
+ core->mempool,
&data->source->sample_spec, &data->source->channel_map,
&data->sample_spec, &data->channel_map,
- core->memblock_stat,
data->resample_method))) {
pa_log_warn(__FILE__": Unsupported resampling operation.");
return NULL;
@@ -330,9 +330,9 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) {
/* Okey, we need a new resampler for the new sink */
if (!(new_resampler = pa_resampler_new(
+ dest->core->mempool,
&dest->sample_spec, &dest->channel_map,
&o->sample_spec, &o->channel_map,
- dest->core->memblock_stat,
o->resample_method))) {
pa_log_warn(__FILE__": Unsupported resampling operation.");
return -1;
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index 0d55da448..cb5b10302 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -211,7 +211,7 @@ void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
pa_memchunk vchunk = *chunk;
pa_memblock_ref(vchunk.memblock);
- pa_memchunk_make_writable(&vchunk, s->core->memblock_stat, 0);
+ pa_memchunk_make_writable(&vchunk, 0);
if (s->sw_muted)
pa_silence_memchunk(&vchunk, &s->sample_spec);
else