diff options
Diffstat (limited to 'src/pulsecore/thread-mq.c')
-rw-r--r-- | src/pulsecore/thread-mq.c | 35 |
1 files changed, 26 insertions, 9 deletions
diff --git a/src/pulsecore/thread-mq.c b/src/pulsecore/thread-mq.c index 9b879425..7e39c577 100644 --- a/src/pulsecore/thread-mq.c +++ b/src/pulsecore/thread-mq.c @@ -43,15 +43,15 @@ PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq); -static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { +static void asyncmsgq_read_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { pa_thread_mq *q = userdata; pa_asyncmsgq *aq; - pa_assert(pa_asyncmsgq_get_fd(q->outq) == fd); + pa_assert(pa_asyncmsgq_read_fd(q->outq) == fd); pa_assert(events == PA_IO_EVENT_INPUT); pa_asyncmsgq_ref(aq = q->outq); - pa_asyncmsgq_after_poll(aq); + pa_asyncmsgq_write_after_poll(aq); for (;;) { pa_msgobject *object; @@ -68,14 +68,24 @@ static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_even pa_asyncmsgq_done(aq, ret); } - if (pa_asyncmsgq_before_poll(aq) == 0) + if (pa_asyncmsgq_read_before_poll(aq) == 0) break; } pa_asyncmsgq_unref(aq); } -void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) { +static void asyncmsgq_write_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { + pa_thread_mq *q = userdata; + + pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd); + pa_assert(events == PA_IO_EVENT_INPUT); + + pa_asyncmsgq_write_after_poll(q->inq); + pa_asyncmsgq_write_before_poll(q->inq); +} + +void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) { pa_assert(q); pa_assert(mainloop); @@ -83,15 +93,22 @@ void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) { pa_assert_se(q->inq = pa_asyncmsgq_new(0)); pa_assert_se(q->outq = pa_asyncmsgq_new(0)); - pa_assert_se(pa_asyncmsgq_before_poll(q->outq) == 0); - pa_assert_se(q->io_event = mainloop->io_new(mainloop, pa_asyncmsgq_get_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_cb, q)); + pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0); + pa_assert_se(q->read_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q)); + + pa_asyncmsgq_write_before_poll(q->inq); + pa_assert_se(q->write_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_cb, q)); + + pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq); + pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq); } void pa_thread_mq_done(pa_thread_mq *q) { pa_assert(q); - q->mainloop->io_free(q->io_event); - q->io_event = NULL; + q->mainloop->io_free(q->read_event); + q->mainloop->io_free(q->write_event); + q->read_event = q->write_event = NULL; pa_asyncmsgq_unref(q->inq); pa_asyncmsgq_unref(q->outq); |