From b4de6f93aed733b8fc8d103e5ced69ebe7d659e6 Mon Sep 17 00:00:00 2001 From: Rich Felker Date: Fri, 9 Sep 2011 01:07:38 -0400 Subject: [PATCH] implement POSIX asynchronous io some features are not yet supported, and only minimal testing has been performed. should be considered experimental at this point. --- include/aio.h | 60 +++++++++++++++++ src/aio/aio_cancel.c | 16 +++++ src/aio/aio_error.c | 6 ++ src/aio/aio_fsync.c | 9 +++ src/aio/aio_readwrite.c | 104 +++++++++++++++++++++++++++++ src/aio/aio_return.c | 6 ++ src/aio/aio_suspend.c | 57 ++++++++++++++++ src/aio/lio_listio.c | 140 ++++++++++++++++++++++++++++++++++++++++ 8 files changed, 398 insertions(+) create mode 100644 include/aio.h create mode 100644 src/aio/aio_cancel.c create mode 100644 src/aio/aio_error.c create mode 100644 src/aio/aio_fsync.c create mode 100644 src/aio/aio_readwrite.c create mode 100644 src/aio/aio_return.c create mode 100644 src/aio/aio_suspend.c create mode 100644 src/aio/lio_listio.c diff --git a/include/aio.h b/include/aio.h new file mode 100644 index 00000000..893c0e9e --- /dev/null +++ b/include/aio.h @@ -0,0 +1,60 @@ +#ifndef _AIO_H +#define _AIO_H + +#ifdef __cplusplus +extern "C" { +#endif + +#if defined(_POSIX_SOURCE) || defined(_POSIX_C_SOURCE) \ + || defined(_XOPEN_SOURCE) || defined(_GNU_SOURCE) + +#include +#include + +#define __NEED_ssize_t +#define __NEED_off_t + +#include + +struct aiocb { + int aio_filedes, aio_lio_opcode, aio_reqprio; + volatile void *aio_buf; + size_t aio_nbytes; + struct sigevent aio_sigevent; + void *__td; + int __lock[2]; + int __err; + ssize_t __ret; + off_t aio_offset; + void *__next, *__prev; + char __dummy4[32-2*sizeof(void *)]; +}; + +#define AIO_CANCELED 0 +#define AIO_NOTCANCELED 1 +#define AIO_ALLDONE 2 + +#define LIO_READ 0 +#define LIO_WRITE 1 +#define LIO_NOP 2 + +#define LIO_WAIT 0 +#define LIO_NOWAIT 1 + +int aio_read(struct aiocb *); +int aio_write(struct aiocb *); +int aio_error(struct aiocb *); +ssize_t aio_return(struct aiocb *); +int aio_cancel(int, struct aiocb *); +int aio_suspend(struct aiocb *const [], int, const struct timespec *); +int aio_fsync(int, struct aiocb *); + +int lio_listio(int, struct aiocb *const [], int, struct sigevent *); + +#endif + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/aio/aio_cancel.c b/src/aio/aio_cancel.c new file mode 100644 index 00000000..5a753b1f --- /dev/null +++ b/src/aio/aio_cancel.c @@ -0,0 +1,16 @@ +#include +#include +#include + +int aio_cancel(int fd, struct aiocb *cb) +{ + if (!cb) { + /* FIXME: for correctness, we should return AIO_ALLDONE + * if there are no outstanding aio operations on this + * file descriptor, but that would require making aio + * much slower, and seems to have little advantage since + * we don't support cancellation anyway. */ + return AIO_NOTCANCELED; + } + return cb->__err==EINPROGRESS ? AIO_NOTCANCELED : AIO_ALLDONE; +} diff --git a/src/aio/aio_error.c b/src/aio/aio_error.c new file mode 100644 index 00000000..169a9a30 --- /dev/null +++ b/src/aio/aio_error.c @@ -0,0 +1,6 @@ +#include + +int aio_error(struct aiocb *cb) +{ + return cb->__err; +} diff --git a/src/aio/aio_fsync.c b/src/aio/aio_fsync.c new file mode 100644 index 00000000..0ac6ea87 --- /dev/null +++ b/src/aio/aio_fsync.c @@ -0,0 +1,9 @@ +#include +#include + +int aio_fsync(int op, struct aiocb *cb) +{ + /* FIXME: unsupported */ + errno = EINVAL; + return -1; +} diff --git a/src/aio/aio_readwrite.c b/src/aio/aio_readwrite.c new file mode 100644 index 00000000..27168f25 --- /dev/null +++ b/src/aio/aio_readwrite.c @@ -0,0 +1,104 @@ +#include +#include +#include "pthread_impl.h" + +static void dummy(void) +{ +} + +weak_alias(dummy, __aio_wake); + +static void notify_signal(struct sigevent *sev) +{ + siginfo_t si = { + .si_signo = sev->sigev_signo, + .si_value = sev->sigev_value, + .si_code = SI_ASYNCIO, + .si_pid = __pthread_self()->pid, + .si_uid = getuid() + }; + __syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si); +} + +static void *io_thread(void *p) +{ + struct aiocb *cb = p; + int fd = cb->aio_filedes; + void *buf = (void *)cb->aio_buf; + size_t len = cb->aio_nbytes; + off_t off = cb->aio_offset; + int op = cb->aio_lio_opcode; + struct sigevent sev = cb->aio_sigevent; + ssize_t ret; + + if (op == LIO_WRITE) { + if ( (fcntl(fd, F_GETFL) & O_APPEND) + ||((ret = pwrite(fd, buf, len, off))<0 && errno==ESPIPE) ) + ret = write(fd, buf, len); + } else if (op == LIO_READ) { + if ( (ret = pread(fd, buf, len, off))<0 && errno==ESPIPE ) + ret = read(fd, buf, len); + } else { + ret = 0; + } + cb->__ret = ret; + + if (ret < 0) a_store(&cb->__err, errno); + else a_store(&cb->__err, 0); + + __aio_wake(); + + switch (cb->aio_sigevent.sigev_notify) { + case SIGEV_SIGNAL: + notify_signal(&sev); + break; + case SIGEV_THREAD: + sev.sigev_notify_function(sev.sigev_value); + break; + } + + return 0; +} + +static int new_req(struct aiocb *cb) +{ + int ret = 0; + pthread_attr_t a; + sigset_t set; + pthread_t td; + + if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) { + if (cb->aio_sigevent.sigev_notify_attributes) + a = *cb->aio_sigevent.sigev_notify_attributes; + else + pthread_attr_init(&a); + } else { + pthread_attr_init(&a); + pthread_attr_setstacksize(&a, PAGE_SIZE); + pthread_attr_setguardsize(&a, 0); + } + pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED); + sigfillset(&set); + pthread_sigmask(SIG_BLOCK, &set, &set); + cb->__err = EINPROGRESS; + if (pthread_create(&td, &a, io_thread, cb)) { + errno = EAGAIN; + ret = -1; + } + pthread_sigmask(SIG_SETMASK, &set, 0); + cb->__td = td; + + return ret; +} + +ssize_t aio_read(struct aiocb *cb) +{ + cb->aio_lio_opcode = LIO_READ; + return new_req(cb); +} + +ssize_t aio_write(struct aiocb *cb) +{ + cb->aio_lio_opcode = LIO_WRITE; + return new_req(cb); +} diff --git a/src/aio/aio_return.c b/src/aio/aio_return.c new file mode 100644 index 00000000..df10bdbe --- /dev/null +++ b/src/aio/aio_return.c @@ -0,0 +1,6 @@ +#include + +ssize_t aio_return(struct aiocb *cb) +{ + return cb->__ret; +} diff --git a/src/aio/aio_suspend.c b/src/aio/aio_suspend.c new file mode 100644 index 00000000..cb2539e9 --- /dev/null +++ b/src/aio/aio_suspend.c @@ -0,0 +1,57 @@ +#include +#include +#include "pthread_impl.h" + +/* Due to the requirement that aio_suspend be async-signal-safe, we cannot + * use any locks, wait queues, etc. that would make it more efficient. The + * only obviously-correct algorithm is to generate a wakeup every time any + * aio operation finishes and have aio_suspend re-evaluate the completion + * status of each aiocb it was waiting on. */ + +static volatile int seq; + +void __aio_wake(void) +{ + a_inc(&seq); + __wake(&seq, -1, 1); +} + +int aio_suspend(struct aiocb *const cbs[], int cnt, const struct timespec *ts) +{ + int i, last, first=1, ret=0; + struct timespec at; + + if (cnt<0) { + errno = EINVAL; + return -1; + } + + for (;;) { + last = seq; + + for (i=0; i__err != EINPROGRESS) + return 0; + } + + if (first && ts) { + clock_gettime(CLOCK_MONOTONIC, &at); + at.tv_sec += ts->tv_sec; + if ((at.tv_nsec += ts->tv_nsec) >= 1000000000) { + at.tv_nsec -= 1000000000; + at.tv_sec++; + } + first = 0; + } + + ret = __timedwait(&seq, last, CLOCK_MONOTONIC, + ts ? &at : 0, 0, 0, 1); + + if (ret == ETIMEDOUT) ret = EAGAIN; + + if (ret) { + errno = ret; + return -1; + } + } +} diff --git a/src/aio/lio_listio.c b/src/aio/lio_listio.c new file mode 100644 index 00000000..8865029a --- /dev/null +++ b/src/aio/lio_listio.c @@ -0,0 +1,140 @@ +#include +#include +#include "pthread_impl.h" + +struct lio_state { + struct sigevent *sev; + int cnt; + struct aiocb *cbs[]; +}; + +static int lio_wait(struct lio_state *st) +{ + int i, err, got_err; + int cnt = st->cnt; + struct aiocb **cbs = st->cbs; + + for (;;) { + for (i=0; isigev_signo, + .si_value = sev->sigev_value, + .si_code = SI_ASYNCIO, + .si_pid = __pthread_self()->pid, + .si_uid = getuid() + }; + __syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si); +} + +static void *wait_thread(void *p) +{ + struct lio_state *st = p; + struct sigevent *sev = st->sev; + lio_wait(st); + free(st); + switch (sev->sigev_notify) { + case SIGEV_SIGNAL: + notify_signal(sev); + break; + case SIGEV_THREAD: + sev->sigev_notify_function(sev->sigev_value); + break; + } + return 0; +} + +int lio_listio(int mode, struct aiocb *const cbs[], int cnt, struct sigevent *sev) +{ + int i, ret; + struct lio_state *st=0; + + if (cnt < 0) { + errno = EINVAL; + return -1; + } + + if (mode == LIO_WAIT || (sev && sev->sigev_notify != SIGEV_NONE)) { + if (!(st = malloc(sizeof *st + cnt*sizeof *cbs))) { + errno = EAGAIN; + return -1; + } + st->cnt = cnt; + st->sev = sev; + memcpy(st->cbs, cbs, cnt*sizeof *cbs); + } + + for (i=0; iaio_lio_opcode) { + case LIO_READ: + ret = aio_read(cbs[i]); + break; + case LIO_WRITE: + ret = aio_write(cbs[i]); + break; + default: + continue; + } + if (ret) { + free(st); + errno = EAGAIN; + return -1; + } + } + + if (mode == LIO_WAIT) { + ret = lio_wait(st); + free(st); + return 0; + } + + if (st) { + pthread_attr_t a; + sigset_t set; + pthread_t td; + + if (sev->sigev_notify == SIGEV_THREAD) { + if (sev->sigev_notify_attributes) + a = *sev->sigev_notify_attributes; + else + pthread_attr_init(&a); + } else { + pthread_attr_init(&a); + pthread_attr_setstacksize(&a, PAGE_SIZE); + pthread_attr_setguardsize(&a, 0); + } + pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED); + sigfillset(&set); + pthread_sigmask(SIG_BLOCK, &set, &set); + if (pthread_create(&td, &a, wait_thread, st)) { + free(st); + errno = EAGAIN; + return -1; + } + pthread_sigmask(SIG_SETMASK, &set, 0); + } + + return 0; +} + -- 2.25.1