diff options
author | aliguori <aliguori@c046a42c-6fe2-441c-8c8c-71466251a162> | 2008-12-12 16:41:40 +0000 |
---|---|---|
committer | aliguori <aliguori@c046a42c-6fe2-441c-8c8c-71466251a162> | 2008-12-12 16:41:40 +0000 |
commit | 3c529d935923a70519557d420db1d5a09a65086a (patch) | |
tree | 347898a09209ea707e500da513515a72ae80377e /posix-aio-compat.c | |
parent | 8de24106355d25512a8578ac83dab0c7515575b0 (diff) |
Replace posix-aio with custom thread pool
glibc implements posix-aio as a thread pool and imposes a number of limitations.
1) it limits one request per file descriptor. We hack around this by dup()'ing
file descriptors which is hideously ugly
2) it's impossible to add new interfaces and we need a vectored read/write
operation to properly support a zero-copy API.
What has been suggested to me by glibc folks, is to implement whatever new
interfaces we want and then it can eventually be proposed for standardization.
This requires that we implement our own posix-aio implementation though.
This patch implements posix-aio using pthreads. It immediately eliminates the
need for fd pooling.
It performs at least as well as the current posix-aio code (in some
circumstances, even better).
Signed-off-by: Anthony Liguori <aliguori@us.ibm.com>
git-svn-id: svn://svn.savannah.nongnu.org/qemu/trunk@5996 c046a42c-6fe2-441c-8c8c-71466251a162
Diffstat (limited to 'posix-aio-compat.c')
-rw-r--r-- | posix-aio-compat.c | 202 |
1 file changed, 202 insertions, 0 deletions
diff --git a/posix-aio-compat.c b/posix-aio-compat.c new file mode 100644 index 000000000..232b511f9 --- /dev/null +++ b/posix-aio-compat.c @@ -0,0 +1,202 @@ +/* + * QEMU posix-aio emulation + * + * Copyright IBM, Corp. 2008 + * + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + */ + +#include <pthread.h> +#include <unistd.h> +#include <errno.h> +#include <sys/time.h> +#include "osdep.h" + +#include "posix-aio-compat.h" + +static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +static pthread_t thread_id; +static int max_threads = 64; +static int cur_threads = 0; +static int idle_threads = 0; +static TAILQ_HEAD(, qemu_paiocb) request_list; + +static void *aio_thread(void *unused) +{ + sigset_t set; + + /* block all signals */ + sigfillset(&set); + sigprocmask(SIG_BLOCK, &set, NULL); + + while (1) { + struct qemu_paiocb *aiocb; + size_t offset; + int ret = 0; + + pthread_mutex_lock(&lock); + + while (TAILQ_EMPTY(&request_list) && + !(ret == ETIMEDOUT)) { + struct timespec ts = { 0 }; + qemu_timeval tv; + + qemu_gettimeofday(&tv); + ts.tv_sec = tv.tv_sec + 10; + ret = pthread_cond_timedwait(&cond, &lock, &ts); + } + + if (ret == ETIMEDOUT) + break; + + aiocb = TAILQ_FIRST(&request_list); + TAILQ_REMOVE(&request_list, aiocb, node); + + offset = 0; + aiocb->active = 1; + + idle_threads--; + pthread_mutex_unlock(&lock); + + while (offset < aiocb->aio_nbytes) { + ssize_t len; + + if (aiocb->is_write) + len = pwrite(aiocb->aio_fildes, + (const char *)aiocb->aio_buf + offset, + aiocb->aio_nbytes - offset, + aiocb->aio_offset + offset); + else + len = pread(aiocb->aio_fildes, + (char *)aiocb->aio_buf + offset, + aiocb->aio_nbytes - offset, + aiocb->aio_offset + offset); + + if (len == -1 && errno == EINTR) + continue; + else if (len == -1) { + pthread_mutex_lock(&lock); + aiocb->ret 
= -errno; + pthread_mutex_unlock(&lock); + break; + } else if (len == 0) + break; + + offset += len; + + pthread_mutex_lock(&lock); + aiocb->ret = offset; + pthread_mutex_unlock(&lock); + } + + pthread_mutex_lock(&lock); + idle_threads++; + pthread_mutex_unlock(&lock); + + sigqueue(getpid(), + aiocb->aio_sigevent.sigev_signo, + aiocb->aio_sigevent.sigev_value); + } + + idle_threads--; + cur_threads--; + pthread_mutex_unlock(&lock); + + return NULL; +} + +static int spawn_thread(void) +{ + pthread_attr_t attr; + int ret; + + cur_threads++; + idle_threads++; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + ret = pthread_create(&thread_id, &attr, aio_thread, NULL); + pthread_attr_destroy(&attr); + + return ret; +} + +int qemu_paio_init(struct qemu_paioinit *aioinit) +{ + TAILQ_INIT(&request_list); + + return 0; +} + +static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write) +{ + aiocb->is_write = is_write; + aiocb->ret = -EINPROGRESS; + aiocb->active = 0; + pthread_mutex_lock(&lock); + if (idle_threads == 0 && cur_threads < max_threads) + spawn_thread(); + TAILQ_INSERT_TAIL(&request_list, aiocb, node); + pthread_mutex_unlock(&lock); + pthread_cond_broadcast(&cond); + + return 0; +} + +int qemu_paio_read(struct qemu_paiocb *aiocb) +{ + return qemu_paio_submit(aiocb, 0); +} + +int qemu_paio_write(struct qemu_paiocb *aiocb) +{ + return qemu_paio_submit(aiocb, 1); +} + +ssize_t qemu_paio_return(struct qemu_paiocb *aiocb) +{ + ssize_t ret; + + pthread_mutex_lock(&lock); + ret = aiocb->ret; + pthread_mutex_unlock(&lock); + + return ret; +} + +int qemu_paio_error(struct qemu_paiocb *aiocb) +{ + ssize_t ret = qemu_paio_return(aiocb); + + if (ret < 0) + ret = -ret; + else + ret = 0; + + return ret; +} + +int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb) +{ + int ret; + + pthread_mutex_lock(&lock); + if (!aiocb->active) { + TAILQ_REMOVE(&request_list, aiocb, node); + aiocb->ret = -ECANCELED; + ret = 
QEMU_PAIO_CANCELED; + } else if (aiocb->ret == -EINPROGRESS) + ret = QEMU_PAIO_NOTCANCELED; + else + ret = QEMU_PAIO_ALLDONE; + pthread_mutex_unlock(&lock); + + return ret; +} + |