redo cond vars again, use sequence numbers
authorRich Felker <dalias@aerifal.cx>
Mon, 26 Sep 2011 04:25:13 +0000 (00:25 -0400)
committerRich Felker <dalias@aerifal.cx>
Mon, 26 Sep 2011 04:25:13 +0000 (00:25 -0400)
testing revealed that the old implementation, while correct, was
giving way too many spurious wakeups due to races changing the value
of the condition futex. in a test program with 5 threads receiving
broadcast signals, the number of returns from pthread_cond_wait was
roughly 3 times what it should have been (2 spurious wakeups for every
legitimate wakeup). moreover, the magnitude of this effect seems to
grow with the number of threads.

the old implementation may also have had some nasty race conditions
with reuse of the cond var with a new mutex.

the new implementation is based on incrementing a sequence number with
each signal event. this sequence number has nothing to do with the
number of threads intended to be woken; it's only used to provide a
value for the futex wait to avoid deadlock. in theory there is a
danger of race conditions due to the value wrapping around after 2^32
signals. it would be nice to eliminate that, if there's a way.

testing showed no spurious wakeups (though they are of course
possible) with the new implementation, as well as slightly improved
performance.

src/internal/pthread_impl.h
src/thread/pthread_cond_broadcast.c
src/thread/pthread_cond_signal.c
src/thread/pthread_cond_timedwait.c

index 4c743d2e32b3adaaee4bc1b8aac79976007c3cfa..63639ec250dd39bd0799dc24f35446e25f6cde2e 100644 (file)
@@ -65,11 +65,11 @@ struct __timer {
 #define _m_next __u.__p[4]
 #define _m_count __u.__i[5]
 #define _c_mutex __u.__p[0]
-#define _c_block __u.__i[2]
+#define _c_seq __u.__i[2]
 #define _c_waiters __u.__i[3]
 #define _c_clock __u.__i[4]
-#define _c_bcast __u.__i[5]
-#define _c_leavers __u.__i[6]
+#define _c_lock __u.__i[5]
+#define _c_lockwait __u.__i[6]
 #define _rw_lock __u.__i[0]
 #define _rw_waiters __u.__i[1]
 #define _b_inst __u.__p[0]
index 7e5ea91c507401525d4616fe0154b8fb01fe39b8..bf6de048c1f83a7ef708964deb42a1cc9d9c6419 100644 (file)
@@ -1,48 +1,39 @@
 #include "pthread_impl.h"
 
-static void unlock(pthread_cond_t *c)
-{
-       a_dec(&c->_c_bcast);
-       if (c->_c_leavers) __wake(&c->_c_bcast, -1, 0);
-}
-
 int pthread_cond_broadcast(pthread_cond_t *c)
 {
        pthread_mutex_t *m;
-       int w;
 
        if (!c->_c_waiters) return 0;
-       a_inc(&c->_c_bcast);
-       if (!c->_c_waiters) {
-               unlock(c);
-               return 0;
-       }
 
-       a_store(&c->_c_block, 0);
+       a_inc(&c->_c_seq);
 
-       m = c->_c_mutex;
-
-       /* If mutex ptr is not available, simply wake all waiters. */
-       if (m == (void *)-1) {
-               unlock(c);
-               __wake(&c->_c_block, -1, 0);
+       /* If cond var is process-shared, simply wake all waiters. */
+       if (c->_c_mutex == (void *)-1) {
+               __wake(&c->_c_seq, -1, 0);
                return 0;
        }
 
+       /* Block waiters from returning so we can use the mutex. */
+       while (a_swap(&c->_c_lock, 1))
+               __wait(&c->_c_lock, &c->_c_lockwait, 1, 1);
+       if (!c->_c_waiters)
+               goto out;
+       m = c->_c_mutex;
+
        /* Move waiter count to the mutex */
-       for (;;) {
-               w = c->_c_waiters;
-               a_fetch_add(&m->_m_waiters, w);
-               if (a_cas(&c->_c_waiters, w, 0) == w) break;
-               a_fetch_add(&m->_m_waiters, -w);
-       }
+       a_fetch_add(&m->_m_waiters, c->_c_waiters);
+       a_store(&c->_c_waiters, 0);
 
        /* Perform the futex requeue, waking one waiter unless we know
         * that the calling thread holds the mutex. */
-       __syscall(SYS_futex, &c->_c_block, FUTEX_REQUEUE,
+       __syscall(SYS_futex, &c->_c_seq, FUTEX_REQUEUE,
                !m->_m_type || (m->_m_lock&INT_MAX)!=pthread_self()->tid,
                INT_MAX, &m->_m_lock);
 
-       unlock(c);
+out:
+       a_store(&c->_c_lock, 0);
+       if (c->_c_lockwait) __wake(&c->_c_lock, 1, 0);
+
        return 0;
 }
index b26b8ce55e5507dc28b28e0f38595388295e3f7f..71bcdcd99332f802b64f2ef790e3bd930ba811ce 100644 (file)
@@ -2,7 +2,8 @@
 
 int pthread_cond_signal(pthread_cond_t *c)
 {
-       a_store(&c->_c_block, 0);
-       if (c->_c_waiters) __wake(&c->_c_block, 1, 0);
+       if (!c->_c_waiters) return 0;
+       a_inc(&c->_c_seq);
+       if (c->_c_waiters) __wake(&c->_c_seq, 1, 0);
        return 0;
 }
index 9616dd85005eba918abeeb95e82c7a26f8155c68..e9b5e2fcb82ffa4f76dd1efee77e5b2ca9d287db 100644 (file)
@@ -7,17 +7,22 @@ struct cm {
 
 static void unwait(pthread_cond_t *c, pthread_mutex_t *m)
 {
-       int w;
+       /* Removing a waiter is non-trivial if we could be using requeue
+        * based broadcast signals, due to mutex access issues, etc. */
 
-       /* Cannot leave waiting status if there are any live broadcasters
-        * which might be inspecting/using the mutex. */
-       while ((w=c->_c_bcast)) __wait(&c->_c_bcast, &c->_c_leavers, w, 0);
+       if (c->_c_mutex == (void *)-1) {
+               a_dec(&c->_c_waiters);
+               return;
+       }
 
-       /* If the waiter count is zero, it must be the case that the
-        * caller's count has been moved to the mutex due to bcast. */
-       do w = c->_c_waiters;
-       while (w && a_cas(&c->_c_waiters, w, w-1)!=w);
-       if (!w) a_dec(&m->_m_waiters);
+       while (a_swap(&c->_c_lock, 1))
+               __wait(&c->_c_lock, &c->_c_lockwait, 1, 1);
+
+       if (c->_c_waiters) c->_c_waiters--;
+       else a_dec(&m->_m_waiters);
+
+       a_store(&c->_c_lock, 0);
+       if (c->_c_lockwait) __wake(&c->_c_lock, 1, 1);
 }
 
 static void cleanup(void *p)
@@ -30,28 +35,35 @@ static void cleanup(void *p)
 int pthread_cond_timedwait(pthread_cond_t *c, pthread_mutex_t *m, const struct timespec *ts)
 {
        struct cm cm = { .c=c, .m=m };
-       int r, e, tid;
+       int r, e=0, seq;
 
        if (ts && ts->tv_nsec >= 1000000000UL)
                return EINVAL;
 
        pthread_testcancel();
 
-       if (c->_c_mutex != (void *)-1) c->_c_mutex = m;
+       if (c->_c_mutex == (void *)-1) {
+               a_inc(&c->_c_waiters);
+       } else {
+               c->_c_mutex = m;
+               while (a_swap(&c->_c_lock, 1))
+                       __wait(&c->_c_lock, &c->_c_lockwait, 1, 1);
+               c->_c_waiters++;
+               a_store(&c->_c_lock, 0);
+               if (c->_c_lockwait) __wake(&c->_c_lock, 1, 1);
+       }
 
-       a_inc(&c->_c_waiters);
-       c->_c_block = tid = pthread_self()->tid;
+       seq = c->_c_seq;
 
        if ((r=pthread_mutex_unlock(m))) return r;
 
-       do e = __timedwait(&c->_c_block, tid, c->_c_clock, ts, cleanup, &cm, 0);
-       while (c->_c_block == tid && (!e || e==EINTR));
+       do e = __timedwait(&c->_c_seq, seq, c->_c_clock, ts, cleanup, &cm, 0);
+       while (c->_c_seq == seq && (!e || e==EINTR));
        if (e == EINTR) e = 0;
 
        unwait(c, m);
 
        if ((r=pthread_mutex_lock(m))) return r;
 
-       pthread_testcancel();
        return e;
 }