From bf3c0179736ca876b87abe997312ad2ece34221b Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 7 Jun 2012 07:05:01 +0000 Subject: [PATCH] LRN: new select wrapper for W32 which avoids busy-waiting --- src/util/network.c | 339 ++++++++++++++++++++++----------------------- 1 file changed, 164 insertions(+), 175 deletions(-) diff --git a/src/util/network.c b/src/util/network.c index 972f93849..ed7e8be30 100644 --- a/src/util/network.c +++ b/src/util/network.c @@ -1087,13 +1087,47 @@ GNUNET_NETWORK_fdset_destroy (struct GNUNET_NETWORK_FDSet *fds) GNUNET_free (fds); } +#if MINGW +struct _select_params +{ + fd_set *r; + fd_set *w; + fd_set *e; + struct timeval *tv; + HANDLE wakeup; + HANDLE standby; + SOCKET wakeup_socket; + int status; +}; + +static DWORD WINAPI +_selector (LPVOID p) +{ + struct _select_params *sp = p; + int i; + while (1) + { + WaitForSingleObject (sp->standby, INFINITE); + ResetEvent (sp->standby); + sp->status = select (1, sp->r, sp->w, sp->e, sp->tv); + if (FD_ISSET (sp->wakeup_socket, sp->r)) + { + FD_CLR (sp->wakeup_socket, sp->r); + sp->status -= 1; + } + SetEvent (sp->wakeup); + } + return 0; +} +#endif + /** - * Check if sockets meet certain conditions - * @param rfds set of sockets to be checked for readability - * @param wfds set of sockets to be checked for writability - * @param efds set of sockets to be checked for exceptions + * Check if sockets or pipes meet certain conditions + * @param rfds set of sockets or pipes to be checked for readability + * @param wfds set of sockets or pipes to be checked for writability + * @param efds set of sockets or pipes to be checked for exceptions * @param timeout relative value when to return - * @return number of selected sockets, GNUNET_SYSERR on error + * @return number of selected sockets or pipes, GNUNET_SYSERR on error */ int GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, @@ -1112,22 +1146,24 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, int retcode = 0; DWORD ms_total = 0; - int nsock = 0; int nhandles = 0; - int nSockEvents = 0; - static HANDLE hEventRead = 0; - static HANDLE hEventWrite = 0; - static HANDLE hEventException = 0; static HANDLE hEventPipeWrite = 0; static HANDLE hEventReadReady = 0; + static struct _select_params sp; + static HANDLE select_thread = NULL; + static HANDLE select_finished_event = NULL; + static HANDLE select_standby_event = NULL; + static SOCKET select_wakeup_socket = -1; + static SOCKET select_send_socket = -1; + static struct timeval select_timeout; + int readPipes = 0; int writePipePos = 0; HANDLE handle_array[FD_SETSIZE + 2]; int returncode = -1; - DWORD newretcode = 0; int returnedpos = 0; struct GNUNET_CONTAINER_SList *handles_read; @@ -1226,23 +1262,53 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, return 0; } - /* Events for sockets */ - if (!hEventRead) - hEventRead = CreateEvent (NULL, TRUE, FALSE, NULL); - else - ResetEvent (hEventRead); + if (select_thread == NULL) + { + SOCKET select_listening_socket = -1; + struct sockaddr_in s_in; + int alen; + int res; + unsigned long p; + + select_standby_event = CreateEvent (NULL, TRUE, FALSE, NULL); + select_finished_event = CreateEvent (NULL, TRUE, FALSE, NULL); + + select_wakeup_socket = WSASocket (AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); + + select_listening_socket = WSASocket (AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); + + p = 1; + res = ioctlsocket (select_wakeup_socket, FIONBIO, &p); + + alen = sizeof (s_in); + s_in.sin_family = AF_INET; + s_in.sin_port = 0; + s_in.sin_addr.S_un.S_un_b.s_b1 = 127; + s_in.sin_addr.S_un.S_un_b.s_b2 = 0; + s_in.sin_addr.S_un.S_un_b.s_b3 = 0; + s_in.sin_addr.S_un.S_un_b.s_b4 = 1; + res = bind (select_listening_socket, (const struct sockaddr *) &s_in, sizeof (s_in)); + + res = getsockname (select_listening_socket, (struct sockaddr *) &s_in, &alen); + + res = listen (select_listening_socket, SOMAXCONN); + + res = connect (select_wakeup_socket, (const struct sockaddr *) &s_in, sizeof (s_in)); + + select_send_socket = accept (select_listening_socket, (struct sockaddr *) &s_in, &alen); + + closesocket (select_listening_socket); + + sp.wakeup = select_finished_event; + sp.standby = select_standby_event; + sp.wakeup_socket = select_wakeup_socket; + + select_thread = CreateThread (NULL, 0, _selector, &sp, 0, NULL); + } + + /* Events for pipes */ if (!hEventReadReady) hEventReadReady = CreateEvent (NULL, TRUE, TRUE, NULL); - if (!hEventWrite) - hEventWrite = CreateEvent (NULL, TRUE, FALSE, NULL); - else - ResetEvent (hEventWrite); - if (!hEventException) - hEventException = CreateEvent (NULL, TRUE, FALSE, NULL); - else - ResetEvent (hEventException); - - /* Event for pipes */ if (!hEventPipeWrite) hEventPipeWrite = CreateEvent (NULL, TRUE, TRUE, NULL); readPipes = 0; @@ -1363,103 +1429,91 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, GNUNET_CONTAINER_SLIST_DISPOSITION_TRANSIENT, fh, sizeof (struct GNUNET_DISK_FileHandle)); - newretcode++; } } } } + + sp.status = 0; + if (nfds > 0) { - if (rfds) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding the socket read event to the array as %d\n", nhandles); - handle_array[nhandles++] = hEventRead; - nSockEvents++; - for (i = 0; i < rfds->sds.fd_count; i++) - { - WSAEventSelect (rfds->sds.fd_array[i], hEventRead, - FD_ACCEPT | FD_READ | FD_CLOSE); - nsock++; - } - } - if (wfds) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Adding the socket event to the array as %d\n", nhandles); + handle_array[nhandles++] = select_finished_event; + if (timeout.rel_value == GNUNET_TIME_UNIT_FOREVER_REL.rel_value) + sp.tv = NULL; + else { - int wakeup = 0; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding the socket write event to the array as %d\n", nhandles); - handle_array[nhandles++] = hEventWrite; - nSockEvents++; - for (i = 0; i < wfds->sds.fd_count; i++) - { - DWORD error; - int status; - - status = send (wfds->sds.fd_array[i], NULL, 0, 0); - error = GetLastError (); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "pre-send to the socket %d returned %d (%u)\n", i, status, error); - if (status == 0 || (error != WSAEWOULDBLOCK && error != WSAENOTCONN)) - wakeup = 1; - WSAEventSelect (wfds->sds.fd_array[i], hEventWrite, - FD_WRITE | FD_CONNECT | FD_CLOSE); - nsock++; - } - if (wakeup) - SetEvent (hEventWrite); + select_timeout.tv_sec = timeout.rel_value / GNUNET_TIME_UNIT_SECONDS.rel_value; + select_timeout.tv_usec = 1000 * (timeout.rel_value - + (select_timeout.tv_sec * GNUNET_TIME_UNIT_SECONDS.rel_value)); + sp.tv = &select_timeout; } - if (efds) + FD_SET (select_wakeup_socket, &aread); + sp.r = &aread; + sp.w = &awrite; + sp.e = &aexcept; + /* Failed connections cause sockets to be set in errorfds on W32, + * but on POSIX it should set them in writefds. + * First copy all awrite sockets to aexcept, later we'll + * check aexcept and set its contents in awrite as well + * Sockets are also set in errorfds when OOB data is available, + * but we don't use OOB data. + */ + for (i = 0; i < awrite.fd_count; i++) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding the socket error event to the array as %d\n", nhandles); - handle_array[nhandles++] = hEventException; - nSockEvents++; - for (i = 0; i < efds->sds.fd_count; i++) - { - WSAEventSelect (efds->sds.fd_array[i], hEventException, - FD_OOB | FD_CLOSE); - nsock++; - } + if (awrite.fd_array[i] != 0 && awrite.fd_array[i] != -1) + FD_SET (awrite.fd_array[i], &aexcept); } + ResetEvent (select_finished_event); + SetEvent (select_standby_event); } handle_array[nhandles] = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Number nfds: %d, handles: %d, return code: %u will wait: %d ms\n", - nfds, nhandles, newretcode, ms_total); + LOG (GNUNET_ERROR_TYPE_DEBUG, "nfds: %d, handles: %d, will wait: %d ms\n", + nfds, nhandles, ms_total); if (nhandles) returncode = WaitForMultipleObjects (nhandles, handle_array, FALSE, ms_total); LOG (GNUNET_ERROR_TYPE_DEBUG, "WaitForMultipleObjects Returned : %d\n", returncode); + if (nfds > 0) + { + /* Don't wake up select-thread when delay is 0, it should return immediately + * and wake up by itself. + */ + if (ms_total != 0) + i = send (select_send_socket, (const char *) &returnedpos, 1, 0); + i = (int) WaitForSingleObject (select_finished_event, INFINITE); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished waiting for the select thread: %d %d\n", i, sp.status); + if (ms_total != 0) + { + do + { + i = recv (select_wakeup_socket, (char *) &returnedpos, 1, 0); + } while (i == 1); + } + /* Check aexcept, add its contents to awrite */ + for (i = 0; i < aexcept.fd_count; i++) + { + if (aexcept.fd_array[i] != 0 && aexcept.fd_array[i] != -1) + FD_SET (aexcept.fd_array[i], &awrite); + } + } + returnedpos = returncode - WAIT_OBJECT_0; LOG (GNUNET_ERROR_TYPE_DEBUG, "return pos is : %d\n", returnedpos); - /* FIXME: THIS LINE IS WRONG !! We should add to handles only handles that fired the events, not all ! */ - /* - * if(rfds) - * GNUNET_CONTAINER_slist_append (handles_read, rfds->handles); - */ if (nhandles && (returnedpos < nhandles)) { DWORD waitstatus; - /* Do the select */ - if (nfds) - { - struct timeval tvslice; - - tvslice.tv_sec = 0; - tvslice.tv_usec = 0; - retcode = select (nfds, &aread, &awrite, &aexcept, &tvslice); - if (retcode == -1) - retcode = 0; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Select retcode : %d\n", retcode); - } - /* FIXME: <= writePipePos? Really? */ - if ((writePipePos != -1) && (returnedpos <= writePipePos)) + if (sp.status > 0) + retcode += sp.status; + + if ((writePipePos != -1) && (returnedpos < writePipePos)) { GNUNET_CONTAINER_slist_append (handles_write, wfds->handles); retcode += write_handles; @@ -1467,24 +1521,8 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, } LOG (GNUNET_ERROR_TYPE_DEBUG, "ReadPipes is : %d\n", readPipes); /* We have some pipes ready for read. */ - /* FIXME: it is supposed to work !! Only choose the Pipes who fired the event, but it is not working */ - if (returnedpos < readPipes) { - /* - * for (i = 0; i < readPipes; i++) - * { - * waitstatus = WaitForSingleObject (handle_array[i], 0); - * LOG (GNUNET_ERROR_TYPE_DEBUG, "Read pipe %d wait status is : %d\n", i, waitstatus); - * if (waitstatus != WAIT_OBJECT_0) - * continue; - * GNUNET_CONTAINER_slist_add (handles_read, - * GNUNET_CONTAINER_SLIST_DISPOSITION_TRANSIENT, - * readArray[i], sizeof (struct GNUNET_DISK_FileHandle)); - * retcode++; - * LOG (GNUNET_ERROR_TYPE_DEBUG, "Added read Pipe\n"); - * } - */ for (i = 0; i < readPipes; i++) { DWORD error; @@ -1514,34 +1552,6 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, readArray[i], readArray[i]->h); } } - waitstatus = WaitForSingleObject (hEventWrite, 0); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Wait for the write event returned %d\n", - waitstatus); - if (waitstatus == WAIT_OBJECT_0) - { - for (i = 0; i < wfds->sds.fd_count; i++) - { - DWORD error; - int status; - int so_error = 0; - int sizeof_so_error = sizeof (so_error); - int gso_result = - getsockopt (wfds->sds.fd_array[i], SOL_SOCKET, SO_ERROR, - (char *) &so_error, &sizeof_so_error); - - status = send (wfds->sds.fd_array[i], NULL, 0, 0); - error = GetLastError (); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "send to the socket %d returned %d (%u)\n", i, status, error); - if (status == 0 || (error != WSAEWOULDBLOCK && error != WSAENOTCONN) || - (status == -1 && gso_result == 0 && error == WSAENOTCONN && - so_error == WSAECONNREFUSED)) - { - FD_SET (wfds->sds.fd_array[i], &awrite); - retcode += 1; - } - } - } } if (!nhandles || (returnedpos >= nhandles)) LOG (GNUNET_ERROR_TYPE_DEBUG, "Returning from _select() with nothing!\n"); @@ -1549,11 +1559,6 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, { struct GNUNET_CONTAINER_SList_Iterator t; - for (i = 0; i < rfds->sds.fd_count; i++) - { - WSAEventSelect (rfds->sds.fd_array[i], hEventRead, 0); - nsock++; - } for (t = GNUNET_CONTAINER_slist_begin (rfds->handles); GNUNET_CONTAINER_slist_end (&t) != GNUNET_YES; GNUNET_CONTAINER_slist_next (&t)) @@ -1567,7 +1572,7 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, CancelIo (fh->h); } } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Zeroing rfds\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Zeroing rfds%s\n", (retcode != -1 && nhandles && (returnedpos < nhandles)) ? ", copying fdset" : ""); GNUNET_NETWORK_fdset_zero (rfds); if (retcode != -1 && nhandles && (returnedpos < nhandles)) GNUNET_NETWORK_fdset_copy_native (rfds, &aread, retcode); @@ -1575,12 +1580,7 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, } if (wfds) { - for (i = 0; i < wfds->sds.fd_count; i++) - { - WSAEventSelect (wfds->sds.fd_array[i], hEventWrite, 0); - nsock++; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Zeroing wfds\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Zeroing wfds%s\n", (retcode != -1 && nhandles && (returnedpos < nhandles)) ? ", copying fdset" : ""); GNUNET_NETWORK_fdset_zero (wfds); if (retcode != -1 && nhandles && (returnedpos < nhandles)) GNUNET_NETWORK_fdset_copy_native (wfds, &awrite, retcode); @@ -1588,12 +1588,7 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, } if (efds) { - for (i = 0; i < efds->sds.fd_count; i++) - { - WSAEventSelect (efds->sds.fd_array[i], hEventException, 0); - nsock++; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Zeroing efds\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Zeroing efds%s\n", (retcode != -1 && nhandles && (returnedpos < nhandles)) ? ", copying fdset" : ""); GNUNET_NETWORK_fdset_zero (efds); if (retcode != -1 && nhandles && (returnedpos < nhandles)) GNUNET_NETWORK_fdset_copy_native (efds, &aexcept, retcode); @@ -1607,12 +1602,10 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, { struct GNUNET_CONTAINER_SList_Iterator t; - for (i = 0; i < bread.fd_count; i++) + LOG (GNUNET_ERROR_TYPE_DEBUG, "rfds:\n"); + for (i = 0; i < rfds->sds.fd_count; i++) { - if (bread.fd_array[i] != 0) - LOG (GNUNET_ERROR_TYPE_DEBUG, "FD 0x%x is %s in rfds\n", - bread.fd_array[i], - (SAFE_FD_ISSET (bread.fd_array[i], rfds)) ? "SET" : "NOT SET"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "%d\n", rfds->sds.fd_array[i]); } for (t = GNUNET_CONTAINER_slist_begin (rfds->handles); GNUNET_CONTAINER_slist_end (&t) != GNUNET_YES; @@ -1622,27 +1615,23 @@ GNUNET_NETWORK_socket_select (struct GNUNET_NETWORK_FDSet *rfds, fh = (struct GNUNET_DISK_FileHandle *) GNUNET_CONTAINER_slist_get (&t, NULL); - LOG (GNUNET_ERROR_TYPE_DEBUG, "FD 0x%x is SET in rfds\n", fh->h); + LOG (GNUNET_ERROR_TYPE_DEBUG, "%d\n", fh->h); } } if (wfds) { - for (i = 0; i < bwrite.fd_count; i++) + LOG (GNUNET_ERROR_TYPE_DEBUG, "wfds:\n"); + for (i = 0; i < wfds->sds.fd_count; i++) { - if (bwrite.fd_array[i] != 0) - LOG (GNUNET_ERROR_TYPE_DEBUG, "FD 0x%x is %s in wfds\n", - bwrite.fd_array[i], - (SAFE_FD_ISSET (bwrite.fd_array[i], rfds)) ? "SET" : "NOT SET"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "%d\n", wfds->sds.fd_array[i]); } } if (efds) { - for (i = 0; i < bexcept.fd_count; i++) + LOG (GNUNET_ERROR_TYPE_DEBUG, "efds:\n"); + for (i = 0; i < efds->sds.fd_count; i++) { - if (bexcept.fd_array[i] != 0) - LOG (GNUNET_ERROR_TYPE_DEBUG, "FD 0x%x is %s in efds\n", - bexcept.fd_array[i], - (SAFE_FD_ISSET (bexcept.fd_array[i], rfds)) ? "SET" : "NOT SET"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "%d\n", efds->sds.fd_array[i]); } } LOG (GNUNET_ERROR_TYPE_DEBUG, "Returning %d or 0\n", retcode); -- 2.25.1