From ea17e83e666b0104b43d04423cef193f92199c26 Mon Sep 17 00:00:00 2001 From: LRN Date: Sun, 14 Dec 2014 14:54:54 +0000 Subject: [PATCH] Grothoff's W32 select slist elimination and prettification --- src/include/gnunet_network_lib.h | 18 +- src/util/network.c | 911 +++++++++++++------------------ 2 files changed, 409 insertions(+), 520 deletions(-) diff --git a/src/include/gnunet_network_lib.h b/src/include/gnunet_network_lib.h index cc01a8133..4042f3f82 100644 --- a/src/include/gnunet_network_lib.h +++ b/src/include/gnunet_network_lib.h @@ -59,9 +59,23 @@ struct GNUNET_NETWORK_FDSet #ifdef WINDOWS /** - * Linked list of handles + * Array of file handles (from pipes) that are also in + * the FDSet. Needed as those cannot go into @e sds + * on W32. */ - struct GNUNET_CONTAINER_SList *handles; + const struct GNUNET_DISK_FileHandle **handles; + + /** + * Size of the @e handles array + */ + unsigned int handles_size; + + /** + * Number of @e handles slots in use. Always + * smaller than @e handles_size. + */ + unsigned int handles_pos; + #endif }; diff --git a/src/util/network.c b/src/util/network.c index 47fdb91ab..411e468ab 100644 --- a/src/util/network.c +++ b/src/util/network.c @@ -1039,7 +1039,7 @@ GNUNET_NETWORK_fdset_zero (struct GNUNET_NETWORK_FDSet *fds) FD_ZERO (&fds->sds); fds->nsds = 0; #ifdef MINGW - GNUNET_CONTAINER_slist_clear (fds->handles); + fds->handles_pos = 0; #endif } @@ -1092,25 +1092,33 @@ GNUNET_NETWORK_fdset_add (struct GNUNET_NETWORK_FDSet *dst, for (nfds = src->nsds; nfds >= 0; nfds--) if (FD_ISSET (nfds, &src->sds)) - - { FD_SET (nfds, &dst->sds); - if (nfds + 1 > dst->nsds) - dst->nsds = nfds + 1; - } + dst->nsds = GNUNET_MAX (dst->nsds, + src->nsds); #else /* This is MinGW32-specific implementation that relies on the code that * winsock2.h defines for FD_SET. Namely, it relies on FD_SET checking * that fd being added is not already in the set. * Also relies on us knowing what's inside fd_set (fd_count and fd_array). + * + * NOTE: I don't understand why the UNIX-logic wouldn't work + * for the first part here as well. -CG */ - int i; - for (i = 0; i < src->sds.fd_count; i++) - FD_SET (src->sds.fd_array[i], &dst->sds); - if (src->nsds > dst->nsds) - dst->nsds = src->nsds; + unsigned int i; - GNUNET_CONTAINER_slist_append (dst->handles, src->handles); + for (i = 0; i < src->sds.fd_count; i++) + FD_SET (src->sds.fd_array[i], + &dst->sds); + dst->nsds = GNUNET_MAX (src->nsds, + dst->nsds); + + /* also copy over `struct GNUNET_DISK_FileHandle` array */ + if (dst->handles_pos + src->handles_pos > dst->handles_size) + GNUNET_array_grow (dst->handles, + dst->handles_size, + ((dst->handles_pos + src->handles_pos) << 1)); + for (i = 0; i < src->handles_pos; i++) + dst->handles[dst->handles_pos++] = src->handles[i]; #endif } @@ -1129,8 +1137,14 @@ GNUNET_NETWORK_fdset_copy (struct GNUNET_NETWORK_FDSet *to, &to->sds); to->nsds = from->nsds; #ifdef MINGW - GNUNET_CONTAINER_slist_clear (to->handles); - GNUNET_CONTAINER_slist_append (to->handles, from->handles); + if (from->handles_pos > to->handles_size) + GNUNET_array_grow (to->handles, + to->handles_size, + from->handles_pos * 2); + memcpy (to->handles, + from->handles, + from->handles_pos * sizeof (struct GNUNET_NETWORK_Handle *)); + to->handles_pos = from->handles_pos; #endif } @@ -1237,10 +1251,11 @@ GNUNET_NETWORK_fdset_handle_set (struct GNUNET_NETWORK_FDSet *fds, const struct GNUNET_DISK_FileHandle *h) { #ifdef MINGW - GNUNET_CONTAINER_slist_add (fds->handles, - GNUNET_CONTAINER_SLIST_DISPOSITION_TRANSIENT, h, - sizeof (struct GNUNET_DISK_FileHandle)); - + if (fds->handles_pos == fds->handles_size) + GNUNET_array_grow (fds->handles, + fds->handles_size, + fds->handles_size * 2 + 2); + fds->handles[fds->handles_pos++] = h; #else int fd; @@ -1267,9 +1282,12 @@ GNUNET_NETWORK_fdset_handle_isset (const struct GNUNET_NETWORK_FDSet *fds, const struct GNUNET_DISK_FileHandle *h) { #ifdef MINGW - return GNUNET_CONTAINER_slist_contains (fds->handles, h, - sizeof (struct - GNUNET_DISK_FileHandle)); + unsigned int i; + + for (i=0;ihandles_pos;i++) + if (fds->handles[i] == h) + return GNUNET_YES; + return GNUNET_NO; #else return FD_ISSET (h->fd, &fds->sds); @@ -1277,6 +1295,28 @@ GNUNET_NETWORK_fdset_handle_isset (const struct GNUNET_NETWORK_FDSet *fds, } +#ifdef MINGW +/** + * Numerically compare pointers to sort them. + * Used to test for overlap in the arrays. + * + * @param p1 a pointer + * @param p2 a pointer + * @return -1, 0 or 1, if the p1 < p2, p1==p2 or p1 > p2. + */ +static int +ptr_cmp (const void *p1, + const void *p2) +{ + if (p1 == p2) + return 0; + if ((intptr_t) p1 < (intptr_t) p2) + return -1; + return 1; +} +#endif + + /** * Checks if two fd sets overlap * @@ -1304,49 +1344,47 @@ GNUNET_NETWORK_fdset_overlap (const struct GNUNET_NETWORK_FDSet *fds1, } return GNUNET_NO; #else - struct GNUNET_CONTAINER_SList_Iterator it; - struct GNUNET_DISK_FileHandle *h; - int i; - int j; + unsigned int i; + unsigned int j; /* This code is somewhat hacky, we are not supposed to know what's * inside of fd_set; also the O(n^2) is really bad... */ for (i = 0; i < fds1->sds.fd_count; i++) - { for (j = 0; j < fds2->sds.fd_count; j++) - { if (fds1->sds.fd_array[i] == fds2->sds.fd_array[j]) return GNUNET_YES; - } - } - it = GNUNET_CONTAINER_slist_begin (fds1->handles); - while (GNUNET_CONTAINER_slist_end (&it) != GNUNET_YES) - { -#if DEBUG_NETWORK - struct GNUNET_CONTAINER_SList_Iterator t; -#endif - h = (struct GNUNET_DISK_FileHandle *) GNUNET_CONTAINER_slist_get (&it, - NULL); -#if DEBUG_NETWORK - LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking that FD 0x%x is in another set:\n", - h->h); - for (t = GNUNET_CONTAINER_slist_begin (fds2->handles); - GNUNET_CONTAINER_slist_end (&t) != GNUNET_YES; - GNUNET_CONTAINER_slist_next (&t)) - { - struct GNUNET_DISK_FileHandle *fh; - fh = (struct GNUNET_DISK_FileHandle *) GNUNET_CONTAINER_slist_get (&t, - NULL); - LOG (GNUNET_ERROR_TYPE_DEBUG, "0x%x\n", fh->h); - } -#endif - if (GNUNET_CONTAINER_slist_contains - (fds2->handles, h, sizeof (struct GNUNET_DISK_FileHandle))) + /* take a short cut if possible */ + if ( (0 == fds1->handles_pos) || + (0 == fds2->handles_pos) ) + return GNUNET_NO; + + /* Sort file handles array to avoid quadratic complexity when + checking for overlap */ + qsort (fds1->handles, + fds1->handles_pos, + sizeof (void *), + &ptr_cmp); + qsort (fds2->handles, + fds2->handles_pos, + sizeof (void *), + &ptr_cmp); + i = 0; + j = 0; + while ( (i < fds1->handles_pos) && + (j < fds2->handles_pos) ) + { + switch (ptr_cmp (fds1->handles[i], + fds2->handles[j])) { + case -1: + i++; + break; + case 0: return GNUNET_YES; + case 1: + j++; } - GNUNET_CONTAINER_slist_next (&it); } return GNUNET_NO; #endif @@ -1364,9 +1402,6 @@ GNUNET_NETWORK_fdset_create () struct GNUNET_NETWORK_FDSet *fds; fds = GNUNET_new (struct GNUNET_NETWORK_FDSet); -#ifdef MINGW - fds->handles = GNUNET_CONTAINER_slist_create (); -#endif GNUNET_NETWORK_fdset_zero (fds); return fds; } @@ -1381,7 +1416,9 @@ void GNUNET_NETWORK_fdset_destroy (struct GNUNET_NETWORK_FDSet *fds) { #ifdef MINGW - GNUNET_CONTAINER_slist_destroy (fds->handles); + GNUNET_array_grow (fds->handles, + fds->handles_size, + 0); #endif GNUNET_free (fds); } @@ -1464,6 +1501,105 @@ _selector (LPVOID p) } return 0; } + + +static HANDLE hEventPipeWrite; + +static HANDLE hEventReadReady; + +static struct _select_params sp; + +static HANDLE select_thread; + +static HANDLE select_finished_event; + +static HANDLE select_standby_event; + +static SOCKET select_wakeup_socket = -1; + +static SOCKET select_send_socket = -1; + +static struct timeval select_timeout; + + +/** + * On W32, we actually use a thread to help with the + * event loop due to W32-API limitations. This function + * initializes that thread. + */ +static void +initialize_select_thread () +{ + SOCKET select_listening_socket = -1; + struct sockaddr_in s_in; + int alen; + int res; + unsigned long p; + + select_standby_event = CreateEvent (NULL, TRUE, FALSE, NULL); + select_finished_event = CreateEvent (NULL, TRUE, FALSE, NULL); + + select_wakeup_socket = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + + select_listening_socket = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + + p = 1; + res = ioctlsocket (select_wakeup_socket, FIONBIO, &p); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Select thread initialization: ioctlsocket() returns %d\n", + res); + + alen = sizeof (s_in); + s_in.sin_family = AF_INET; + s_in.sin_port = 0; + s_in.sin_addr.S_un.S_un_b.s_b1 = 127; + s_in.sin_addr.S_un.S_un_b.s_b2 = 0; + s_in.sin_addr.S_un.S_un_b.s_b3 = 0; + s_in.sin_addr.S_un.S_un_b.s_b4 = 1; + res = bind (select_listening_socket, + (const struct sockaddr *) &s_in, + sizeof (s_in)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Select thread initialization: bind() returns %d\n", + res); + + res = getsockname (select_listening_socket, + (struct sockaddr *) &s_in, + &alen); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Select thread initialization: getsockname() returns %d\n", + res); + + res = listen (select_listening_socket, + SOMAXCONN); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Select thread initialization: listen() returns %d\n", + res); + res = connect (select_wakeup_socket, + (const struct sockaddr *) &s_in, + sizeof (s_in)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Select thread initialization: connect() returns %d\n", + res); + + select_send_socket = accept (select_listening_socket, + (struct sockaddr *) &s_in, + &alen); + + closesocket (select_listening_socket); + + sp.wakeup = select_finished_event; + sp.standby = select_standby_event; + sp.wakeup_socket = select_wakeup_socket; + + select_thread = CreateThread (NULL, + 0, + _selector, + &sp, + 0, NULL); +} + + #endif @@ -1521,6 +1657,87 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, /* MINGW */ +/** + * Non-blocking test if a pipe is ready for reading. + * + * @param fh pipe handle + * @return #GNUNET_YES if the pipe is ready for reading + */ +static int +pipe_read_ready (struct GNUNET_DISK_FileHandle *fh) +{ + DWORD error; + BOOL bret; + DWORD waitstatus = 0; + + SetLastError (0); + bret = PeekNamedPipe (fh->h, NULL, 0, NULL, &waitstatus, NULL); + error = GetLastError (); + if (0 == bret) + { + /* TODO: either add more errors to this condition, or eliminate it + * entirely (failed to peek -> pipe is in serious trouble, should + * be selected as readable). + */ + if ( (error != ERROR_BROKEN_PIPE) && + (error != ERROR_INVALID_HANDLE) ) + return GNUNET_NO; + } + else if (waitstatus <= 0) + return GNUNET_NO; + return GNUNET_YES; +} + + +/** + * Non-blocking test if a pipe is having an IO exception. + * + * @param fh pipe handle + * @return #GNUNET_YES if the pipe is having an IO exception. + */ +static int +pipe_except_ready (struct GNUNET_DISK_FileHandle *fh) +{ + DWORD dwBytes; + + if (PeekNamedPipe (fh->h, NULL, 0, NULL, &dwBytes, NULL)) + return GNUNET_NO; + return GNUNET_YES; +} + + +/** + * Iterate over handles in fds, destructively rewrite the + * handles array contents of fds so that it starts with the + * handles that are ready, and update handles_pos accordingly. + * + * @param fds set of handles (usually pipes) to be checked for readiness + * @param except GNUNET_NO if fds should be checked for readiness to read, + * GNUNET_YES if fds should be checked for exceptions + * (there is no way to check for write-readiness - pipes are always write-ready) + * @return number of ready handles + */ +static int +check_handles_status (struct GNUNET_NETWORK_FDSet *fds, int except) +{ + struct GNUNET_DISK_FileHandle *fh; + unsigned int roff; + unsigned int woff; + int is_pipe; + + for (woff = 0, roff = 0; roff < fds->handles_pos; roff++) + { + fh = fds->handles[roff]; + is_pipe = fh->type == GNUNET_DISK_HANLDE_TYPE_PIPE; + if ((except && is_pipe && pipe_except_ready (fh)) || + (!except && (!is_pipe || pipe_read_ready (fh)))) + fds->handles[woff++] = fh; + } + fds->handles_pos = woff; + return woff; +} + + /** * Check if sockets or pipes meet certain conditions, version for W32. * @@ -1536,93 +1753,44 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, struct GNUNET_NETWORK_FDSet *efds, const struct GNUNET_TIME_Relative timeout) { - int nfds = 0; - int handles = 0; - int ex_handles = 0; - int read_handles = 0; - int write_handles = 0; - - int i = 0; - int retcode = 0; - uint64_t mcs_total = 0; - DWORD ms_rounded = 0; - + struct GNUNET_DISK_FileHandle *fh; + int nfds; + int handles; + unsigned int i; + int retcode; + uint64_t mcs_total; + DWORD ms_rounded; int nhandles = 0; - - static HANDLE hEventPipeWrite = 0; - static HANDLE hEventReadReady = 0; - - static struct _select_params sp; - static HANDLE select_thread = NULL; - static HANDLE select_finished_event = NULL; - static HANDLE select_standby_event = NULL; - static SOCKET select_wakeup_socket = -1; - static SOCKET select_send_socket = -1; - static struct timeval select_timeout; - - int readPipes = 0; - int writePipePos = 0; - + int read_pipes_off; HANDLE handle_array[FD_SETSIZE + 2]; - int returncode = -1; + int returncode; int returnedpos = 0; - - struct GNUNET_CONTAINER_SList *handles_read; - struct GNUNET_CONTAINER_SList *handles_write; - struct GNUNET_CONTAINER_SList *handles_except; - - int selectret = 0; - + int selectret; fd_set aread; fd_set awrite; fd_set aexcept; -#if DEBUG_NETWORK - fd_set bread; - fd_set bwrite; - fd_set bexcept; -#endif - - /* TODO: Make this growable */ - struct GNUNET_DISK_FileHandle *readArray[50]; - struct timeval tv; - + nfds = 0; + handles = 0; if (NULL != rfds) { - nfds = rfds->nsds; - handles += read_handles = GNUNET_CONTAINER_slist_count (rfds->handles); -#if DEBUG_NETWORK - { - struct GNUNET_CONTAINER_SList_Iterator t; - - for (t = GNUNET_CONTAINER_slist_begin (rfds->handles); - GNUNET_CONTAINER_slist_end (&t) != GNUNET_YES; - GNUNET_CONTAINER_slist_next (&t)) - { - struct GNUNET_DISK_FileHandle *fh; - - fh = (struct GNUNET_DISK_FileHandle *) GNUNET_CONTAINER_slist_get (&t, - NULL); - LOG (GNUNET_ERROR_TYPE_DEBUG, "FD 0x%x (0x%x) is SET in rfds\n", fh->h, - fh); - } - } -#endif + nfds = GNUNET_MAX (nfds, rfds->nsds); + handles += rfds->handles_pos; } if (NULL != wfds) { nfds = GNUNET_MAX (nfds, wfds->nsds); - handles += write_handles = GNUNET_CONTAINER_slist_count (wfds->handles); + handles += wfds->handles_pos; } if (NULL != efds) { nfds = GNUNET_MAX (nfds, efds->nsds); - handles += ex_handles = GNUNET_CONTAINER_slist_count (efds->handles); + handles += efds->handles_pos; } - if ((nfds == 0) && - (timeout.rel_value_us == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us) - && (handles == 0) ) + if ((0 == nfds) && + (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == timeout.rel_value_us) && + (0 == handles) ) { GNUNET_break (0); LOG (GNUNET_ERROR_TYPE_ERROR, @@ -1644,97 +1812,28 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, ms_rounded = 1; } /* select() may be used as a portable way to sleep */ - if (!(rfds || wfds || efds)) + if (! (rfds || wfds || efds)) { Sleep (ms_rounded); return 0; } if (NULL == select_thread) - { - SOCKET select_listening_socket = -1; - struct sockaddr_in s_in; - int alen; - int res; - unsigned long p; - - select_standby_event = CreateEvent (NULL, TRUE, FALSE, NULL); - select_finished_event = CreateEvent (NULL, TRUE, FALSE, NULL); - - select_wakeup_socket = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - - select_listening_socket = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - - p = 1; - res = ioctlsocket (select_wakeup_socket, FIONBIO, &p); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Select thread initialization: ioctlsocket() returns %d\n", res); - - alen = sizeof (s_in); - s_in.sin_family = AF_INET; - s_in.sin_port = 0; - s_in.sin_addr.S_un.S_un_b.s_b1 = 127; - s_in.sin_addr.S_un.S_un_b.s_b2 = 0; - s_in.sin_addr.S_un.S_un_b.s_b3 = 0; - s_in.sin_addr.S_un.S_un_b.s_b4 = 1; - res = bind (select_listening_socket, (const struct sockaddr *) &s_in, sizeof (s_in)); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Select thread initialization: bind() returns %d\n", res); - - res = getsockname (select_listening_socket, (struct sockaddr *) &s_in, &alen); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Select thread initialization: getsockname() returns %d\n", res); - - res = listen (select_listening_socket, SOMAXCONN); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Select thread initialization: listen() returns %d\n", res); + initialize_select_thread (); - res = connect (select_wakeup_socket, (const struct sockaddr *) &s_in, sizeof (s_in)); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Select thread initialization: connect() returns %d\n", res); - - select_send_socket = accept (select_listening_socket, (struct sockaddr *) &s_in, &alen); - - closesocket (select_listening_socket); - - sp.wakeup = select_finished_event; - sp.standby = select_standby_event; - sp.wakeup_socket = select_wakeup_socket; - - select_thread = CreateThread (NULL, 0, _selector, &sp, 0, NULL); - } - - - handles_read = GNUNET_CONTAINER_slist_create (); - handles_write = GNUNET_CONTAINER_slist_create (); - handles_except = GNUNET_CONTAINER_slist_create (); FD_ZERO (&aread); FD_ZERO (&awrite); FD_ZERO (&aexcept); -#if DEBUG_NETWORK - FD_ZERO (&bread); - FD_ZERO (&bwrite); - FD_ZERO (&bexcept); -#endif if (rfds) - { FD_COPY (&rfds->sds, &aread); -#if DEBUG_NETWORK - FD_COPY (&rfds->sds, &bread); -#endif - } if (wfds) - { FD_COPY (&wfds->sds, &awrite); -#if DEBUG_NETWORK - FD_COPY (&wfds->sds, &bwrite); -#endif - } if (efds) - { FD_COPY (&efds->sds, &aexcept); -#if DEBUG_NETWORK - FD_COPY (&efds->sds, &bexcept); -#endif - } - /* Start by doing a fast check on sockets and pipes (without waiting). It is cheap, and is sufficient most of the time. - By profiling we detected that to be true in 90% of the cases. + /* Start by doing a fast check on sockets and pipes (without + waiting). It is cheap, and is sufficient most of the time. By + profiling we detected that to be true in 90% of the cases. */ /* Do the select now */ @@ -1743,141 +1842,81 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, /* Copy all the writes to the except, so we can detect connect() errors */ for (i = 0; i < awrite.fd_count; i++) - FD_SET (awrite.fd_array[i], &aexcept); - if (aread.fd_count > 0 || awrite.fd_count > 0 || aexcept.fd_count > 0) - selectret = select (1, (rfds != NULL) ? &aread : NULL, - (wfds != NULL) ? &awrite : NULL, &aexcept, &select_timeout); + FD_SET (awrite.fd_array[i], + &aexcept); + if ( (aread.fd_count > 0) || + (awrite.fd_count > 0) || + (aexcept.fd_count > 0) ) + selectret = select (1, + (NULL != rfds) ? &aread : NULL, + (NULL != wfds) ? &awrite : NULL, + &aexcept, + &select_timeout); else selectret = 0; - if (selectret == -1) + if (-1 == selectret) { /* Throw an error early on, while we still have the context. */ - LOG (GNUNET_ERROR_TYPE_ERROR, "W32 select(%d, %d, %d) failed: %lu\n", - rfds ? aread.fd_count : 0, wfds ? awrite.fd_count : 0, aexcept.fd_count, GetLastError ()); + LOG (GNUNET_ERROR_TYPE_ERROR, + "W32 select(%d, %d, %d) failed: %lu\n", + rfds ? aread.fd_count : 0, + wfds ? awrite.fd_count : 0, + aexcept.fd_count, + GetLastError ()); GNUNET_abort (); } - /* Check aexcept, add its contents to awrite - This is technically wrong (aexcept might have its own descriptors), we should - have checked that descriptors were in awrite originally before re-adding them from - aexcept. Luckily, GNUnet never uses aexcept for anything, so this does not become a problem (yet). */ - for (i = 0; i < aexcept.fd_count; i++) - FD_SET (aexcept.fd_array[i], &awrite); + /* Check aexcept, if something is in there and we copied that + FD before to detect connect() errors, add it back to the + write set to report errors. */ + if (NULL != wfds) + for (i = 0; i < aexcept.fd_count; i++) + if (FD_ISSET (aexcept.fd_array[i], + &wfds->sds)) + FD_SET (aexcept.fd_array[i], + &awrite); + - /* If our select returned something or is a 0-timed request, then also check the pipes and get out of here! */ + /* If our select returned something or is a 0-timed request, then + also check the pipes and get out of here! */ /* Sadly, it means code duplication :( */ - if ((selectret > 0) || (mcs_total == 0)) + if ( (selectret > 0) || (0 == mcs_total) ) { - /* Read Pipes */ - if (rfds && read_handles) - { - struct GNUNET_CONTAINER_SList_Iterator i; - int c; - - for (c = 0, i = GNUNET_CONTAINER_slist_begin (rfds->handles); - GNUNET_CONTAINER_slist_end (&i) != GNUNET_YES; - GNUNET_CONTAINER_slist_next (&i), c++) - { - struct GNUNET_DISK_FileHandle *fh; - - fh = (struct GNUNET_DISK_FileHandle *) GNUNET_CONTAINER_slist_get (&i,NULL); - if (fh->type == GNUNET_DISK_HANLDE_TYPE_PIPE) - { - DWORD error; - BOOL bret; - - SetLastError (0); - DWORD waitstatus = 0; - bret = PeekNamedPipe (fh->h, NULL, 0, NULL, &waitstatus, NULL); - error = GetLastError (); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Peek at read pipe %d (0x%x) returned %d (%d bytes available) GLE %u\n", - c, fh->h, bret, waitstatus, error); - if (bret == 0) - { - /* TODO: either add more errors to this condition, or eliminate it - * entirely (failed to peek -> pipe is in serious trouble, should - * be selected as readable). - */ - if (error != ERROR_BROKEN_PIPE && error != ERROR_INVALID_HANDLE) - continue; - } - else if (waitstatus <= 0) - continue; - GNUNET_CONTAINER_slist_add (handles_read, GNUNET_CONTAINER_SLIST_DISPOSITION_TRANSIENT, - fh, sizeof (struct GNUNET_DISK_FileHandle)); - retcode++; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Added read Pipe 0x%x (0x%x)\n", - fh, fh->h); - } - else - { - GNUNET_CONTAINER_slist_add (handles_read, GNUNET_CONTAINER_SLIST_DISPOSITION_TRANSIENT, - fh, sizeof (struct GNUNET_DISK_FileHandle)); - retcode++; - } - } - } - if (wfds && write_handles) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding the write ready event to the array as %d\n", nhandles); - GNUNET_CONTAINER_slist_append (handles_write, wfds->handles); - retcode += write_handles; - } - if (efds && ex_handles) - { - struct GNUNET_CONTAINER_SList_Iterator i; + retcode = 0; - for (i = GNUNET_CONTAINER_slist_begin (efds->handles); - GNUNET_CONTAINER_slist_end (&i) != GNUNET_YES; - GNUNET_CONTAINER_slist_next (&i)) - { - struct GNUNET_DISK_FileHandle *fh; - DWORD dwBytes; + /* Read Pipes */ + if (rfds && (rfds->handles_pos > 0)) + retcode += check_handles_status (rfds, GNUNET_NO); - fh = (struct GNUNET_DISK_FileHandle *) GNUNET_CONTAINER_slist_get (&i, NULL); - if (fh->type == GNUNET_DISK_HANLDE_TYPE_PIPE) - { - if (PeekNamedPipe (fh->h, NULL, 0, NULL, &dwBytes, NULL)) - continue; - GNUNET_CONTAINER_slist_add (handles_except, GNUNET_CONTAINER_SLIST_DISPOSITION_TRANSIENT, - fh, sizeof (struct GNUNET_DISK_FileHandle)); - retcode++; - } - } - } + /* wfds handles remain untouched, on W32 + we pretend our pipes are "always" write-ready */ - /* Add our select() result.*/ - if (selectret >= 0) - retcode += selectret; + /* except pipes */ + if (efds && (efds->handles_pos > 0)) + retcode += check_handles_status (efds, GNUNET_YES); if (rfds) { GNUNET_NETWORK_fdset_zero (rfds); if (selectret != -1) GNUNET_NETWORK_fdset_copy_native (rfds, &aread, selectret); - GNUNET_CONTAINER_slist_append (rfds->handles, handles_read); } if (wfds) { GNUNET_NETWORK_fdset_zero (wfds); if (selectret != -1) GNUNET_NETWORK_fdset_copy_native (wfds, &awrite, selectret); - GNUNET_CONTAINER_slist_append (wfds->handles, handles_write); } if (efds) { GNUNET_NETWORK_fdset_zero (efds); if (selectret != -1) GNUNET_NETWORK_fdset_copy_native (efds, &aexcept, selectret); - GNUNET_CONTAINER_slist_append (efds->handles, handles_except); } - GNUNET_CONTAINER_slist_destroy (handles_read); - GNUNET_CONTAINER_slist_destroy (handles_write); - GNUNET_CONTAINER_slist_destroy (handles_except); - - if (selectret == -1) + if (-1 == selectret) return -1; + /* Add our select() FDs to the total return value */ + retcode += selectret; return retcode; } @@ -1885,134 +1924,69 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, on both sockets and pipes simultaneously */ /* Events for pipes */ - if (!hEventReadReady) + if (! hEventReadReady) hEventReadReady = CreateEvent (NULL, TRUE, TRUE, NULL); - if (!hEventPipeWrite) + if (! hEventPipeWrite) hEventPipeWrite = CreateEvent (NULL, TRUE, TRUE, NULL); - readPipes = 0; - writePipePos = -1; - retcode = 0; FD_ZERO (&aread); FD_ZERO (&awrite); FD_ZERO (&aexcept); -#if DEBUG_NETWORK - FD_ZERO (&bread); - FD_ZERO (&bwrite); - FD_ZERO (&bexcept); -#endif if (rfds) - { FD_COPY (&rfds->sds, &aread); -#if DEBUG_NETWORK - FD_COPY (&rfds->sds, &bread); -#endif - } if (wfds) - { FD_COPY (&wfds->sds, &awrite); -#if DEBUG_NETWORK - FD_COPY (&wfds->sds, &bwrite); -#endif - } if (efds) - { FD_COPY (&efds->sds, &aexcept); -#if DEBUG_NETWORK - FD_COPY (&efds->sds, &bexcept); -#endif - } /* We will first Add the PIPES to the events */ - /* Read Pipes */ - if (rfds && read_handles) + /* Track how far in `handle_array` the read pipes go, + so we may by-pass them quickly if none of them + are selected. */ + read_pipes_off = 0; + if (rfds && (rfds->handles_pos > 0)) { - struct GNUNET_CONTAINER_SList_Iterator i; - - for (i = GNUNET_CONTAINER_slist_begin (rfds->handles); - GNUNET_CONTAINER_slist_end (&i) != GNUNET_YES; - GNUNET_CONTAINER_slist_next (&i)) + for (i = 0; i handles_pos; i++) { - struct GNUNET_DISK_FileHandle *fh; - - fh = (struct GNUNET_DISK_FileHandle *) GNUNET_CONTAINER_slist_get (&i, - NULL); - if (fh->type == GNUNET_DISK_HANLDE_TYPE_PIPE) + fh = rfds->handles[i]; + if (fh->type != GNUNET_DISK_HANLDE_TYPE_PIPE) + continue; + /* Read zero bytes to check the status of the pipe */ + if (! ReadFile (fh->h, NULL, 0, NULL, fh->oOverlapRead)) { - /* Read zero bytes to check the status of the pipe */ - LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading 0 bytes from the pipe 0x%x\n", - fh->h); - if (!ReadFile (fh->h, NULL, 0, NULL, fh->oOverlapRead)) + DWORD error_code = GetLastError (); + + if (error_code == ERROR_IO_PENDING) { - DWORD error_code = GetLastError (); - - if (error_code == ERROR_IO_PENDING) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding the pipe's 0x%x overlapped event to the array as %d\n", - fh->h, nhandles); - handle_array[nhandles++] = fh->oOverlapRead->hEvent; - readArray[readPipes++] = fh; - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Read failed, adding the read ready event to the array as %d\n", nhandles); - handle_array[nhandles++] = hEventReadReady; - readArray[readPipes++] = fh; - } + /* add as unready */ + handle_array[nhandles++] = fh->oOverlapRead->hEvent; + read_pipes_off++; } else { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding the read ready event to the array as %d\n", nhandles); + /* add as ready */ handle_array[nhandles++] = hEventReadReady; - readArray[readPipes++] = fh; + read_pipes_off++; } } else { - GNUNET_CONTAINER_slist_add (handles_read, - GNUNET_CONTAINER_SLIST_DISPOSITION_TRANSIENT, - fh, sizeof (struct GNUNET_DISK_FileHandle)); + /* error also counts as ready */ + handle_array[nhandles++] = hEventReadReady; + read_pipes_off++; } } } - if (wfds && write_handles) + + if (wfds && (wfds->handles_pos > 0)) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding the write ready event to the array as %d\n", nhandles); + "Adding the write ready event to the array as %d\n", + nhandles); handle_array[nhandles++] = hEventPipeWrite; - writePipePos = nhandles; - } - if (efds && ex_handles) - { - struct GNUNET_CONTAINER_SList_Iterator i; - - for (i = GNUNET_CONTAINER_slist_begin (efds->handles); - GNUNET_CONTAINER_slist_end (&i) != GNUNET_YES; - GNUNET_CONTAINER_slist_next (&i)) - { - struct GNUNET_DISK_FileHandle *fh; - DWORD dwBytes; - - fh = (struct GNUNET_DISK_FileHandle *) GNUNET_CONTAINER_slist_get (&i, - NULL); - if (fh->type == GNUNET_DISK_HANLDE_TYPE_PIPE) - { - if (!PeekNamedPipe (fh->h, NULL, 0, NULL, &dwBytes, NULL)) - { - GNUNET_CONTAINER_slist_add (handles_except, - GNUNET_CONTAINER_SLIST_DISPOSITION_TRANSIENT, - fh, - sizeof (struct GNUNET_DISK_FileHandle)); - } - } - } } sp.status = 0; - if (nfds > 0) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -2122,147 +2096,48 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, "return pos is: %d\n", returnedpos); - if (nhandles && (returnedpos < nhandles)) - { - DWORD waitstatus; - - if (sp.status > 0) - retcode += sp.status; - - if ((writePipePos != -1) && (returnedpos < writePipePos)) - { - GNUNET_CONTAINER_slist_append (handles_write, wfds->handles); - retcode += write_handles; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Added write pipe\n"); - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "ReadPipes is: %d\n", - readPipes); - /* We have some pipes ready for read. */ - if (returnedpos < readPipes) - { - for (i = 0; i < readPipes; i++) - { - DWORD error; - BOOL bret; - - SetLastError (0); - waitstatus = 0; - bret = - PeekNamedPipe (readArray[i]->h, NULL, 0, NULL, &waitstatus, NULL); - error = GetLastError (); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Peek at read pipe %d (0x%x) returned %d (%d bytes available) GLE %u\n", - i, readArray[i]->h, bret, waitstatus, error); - if (bret == 0) - { - /* TODO: either add more errors to this condition, or eliminate it - * entirely (failed to peek -> pipe is in serious trouble, should - * be selected as readable). - */ - if (error != ERROR_BROKEN_PIPE && error != ERROR_INVALID_HANDLE) - continue; - } - else if (waitstatus <= 0) - continue; - GNUNET_CONTAINER_slist_add (handles_read, - GNUNET_CONTAINER_SLIST_DISPOSITION_TRANSIENT, - readArray[i], - sizeof (struct GNUNET_DISK_FileHandle)); - retcode++; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Added read Pipe 0x%x (0x%x)\n", - readArray[i], readArray[i]->h); - } - } - } - if (! nhandles || (returnedpos >= nhandles)) - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Returning from _select() with nothing!\n"); if (rfds) { - struct GNUNET_CONTAINER_SList_Iterator t; - - for (t = GNUNET_CONTAINER_slist_begin (rfds->handles); - GNUNET_CONTAINER_slist_end (&t) != GNUNET_YES; - GNUNET_CONTAINER_slist_next (&t)) + /* We queued a zero-long read on each pipe to check + * its state, now we must cancel these read operations. + * This must be done while rfds->handles_pos is still + * intact and matches the number of read handles that we + * got from the caller. + */ + for (i = 0; i < rfds->handles_pos; i++) { - struct GNUNET_DISK_FileHandle *fh; - - fh = (struct GNUNET_DISK_FileHandle *) GNUNET_CONTAINER_slist_get (&t, - NULL); - if (fh->type == GNUNET_DISK_HANLDE_TYPE_PIPE) - { + fh = rfds->handles[i]; + if (GNUNET_DISK_HANLDE_TYPE_PIPE == fh->type) CancelIo (fh->h); - } } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Zeroing rfds%s\n", (retcode != -1 && nhandles && (returnedpos < nhandles)) ? ", copying fdset" : ""); - GNUNET_NETWORK_fdset_zero (rfds); - if (retcode != -1 && nhandles && (returnedpos < nhandles)) + + /* We may have some pipes ready for reading. */ + if (returnedpos < read_pipes_off) + retcode += check_handles_status (rfds, GNUNET_NO); + else + rfds->handles_pos = 0; + + if (-1 != sp.status) GNUNET_NETWORK_fdset_copy_native (rfds, &aread, retcode); - GNUNET_CONTAINER_slist_append (rfds->handles, handles_read); } if (wfds) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Zeroing wfds%s\n", (retcode != -1 && nhandles && (returnedpos < nhandles)) ? ", copying fdset" : ""); - GNUNET_NETWORK_fdset_zero (wfds); - if (retcode != -1 && nhandles && (returnedpos < nhandles)) + retcode += wfds->handles_pos; + /* wfds handles remain untouched */ + if (-1 != sp.status) GNUNET_NETWORK_fdset_copy_native (wfds, &awrite, retcode); - GNUNET_CONTAINER_slist_append (wfds->handles, handles_write); } if (efds) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Zeroing efds%s\n", (retcode != -1 && nhandles && (returnedpos < nhandles)) ? ", copying fdset" : ""); - GNUNET_NETWORK_fdset_zero (efds); - if (retcode != -1 && nhandles && (returnedpos < nhandles)) + retcode += check_handles_status (rfds, GNUNET_YES); + if (-1 != sp.status) GNUNET_NETWORK_fdset_copy_native (efds, &aexcept, retcode); - GNUNET_CONTAINER_slist_append (efds->handles, handles_except); } - GNUNET_CONTAINER_slist_destroy (handles_read); - GNUNET_CONTAINER_slist_destroy (handles_write); - GNUNET_CONTAINER_slist_destroy (handles_except); -#if DEBUG_NETWORK - if (rfds) - { - struct GNUNET_CONTAINER_SList_Iterator t; - LOG (GNUNET_ERROR_TYPE_DEBUG, "rfds:\n"); - for (i = 0; i < rfds->sds.fd_count; i++) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "%d\n", rfds->sds.fd_array[i]); - } - for (t = GNUNET_CONTAINER_slist_begin (rfds->handles); - GNUNET_CONTAINER_slist_end (&t) != GNUNET_YES; - GNUNET_CONTAINER_slist_next (&t)) - { - struct GNUNET_DISK_FileHandle *fh; + if (sp.status > 0) + retcode += sp.status; - fh = (struct GNUNET_DISK_FileHandle *) GNUNET_CONTAINER_slist_get (&t, - NULL); - LOG (GNUNET_ERROR_TYPE_DEBUG, "%d\n", fh->h); - } - } - if (wfds) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "wfds:\n"); - for (i = 0; i < wfds->sds.fd_count; i++) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "%d\n", wfds->sds.fd_array[i]); - } - } - if (efds) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "efds:\n"); - for (i = 0; i < efds->sds.fd_count; i++) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "%d\n", efds->sds.fd_array[i]); - } - } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Returning %d or 0\n", retcode); -#endif - if (nhandles && (returnedpos < nhandles)) - return retcode; - return 0; + return retcode; } /* MINGW */ -- 2.25.1