2 This file is part of GNUnet.
3 (C) 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file util/connection.c
23 * @brief TCP connection management
24 * @author Christian Grothoff
26 * This code is rather complex. Only modify it if you
27 * 1) Have a NEW testcase showing that the new code
28 * is needed and correct
29 * 2) All EXISTING testcases pass with the new code
30 * These rules should apply in general, but for this
31 * module they are VERY, VERY important.
34 * - can we merge receive_ready and receive_again?
35 * - can we integrate the nth.timeout_task with the write_task's timeout?
39 #include "gnunet_common.h"
40 #include "gnunet_connection_lib.h"
41 #include "gnunet_scheduler_lib.h"
43 #define DEBUG_CONNECTION GNUNET_NO
46 * List of address families to give as hints to
47 * getaddrinfo, in reverse order of preference.
/* Address families handed to getaddrinfo() as hints.  try_lookup() indexes
   this array with a pre-decremented af_fam_offset, and
   GNUNET_CONNECTION_create_from_connect() starts that offset at the element
   count, so the entries are consumed from the last one (AF_UNSPEC) towards
   the first one (AF_INET). */
49 static int address_families[] =
50 { AF_INET, AF_INET6, AF_UNSPEC };
/* State of one pending "tell me when I may transmit" request.  One instance
   is embedded in every connection handle as its nth member.
   NOTE(review): the text in front of transmit_timeout says "receiving", but
   the field is set by GNUNET_CONNECTION_notify_transmit_ready and passed as
   the remaining time when the write task is scheduled, so it is the
   transmit deadline.
   NOTE(review): the last comment describes the notify_size member that
   process_notify reads; its declaration line is not part of this extract. */
52 struct GNUNET_CONNECTION_TransmitHandle
56 * Function to call if the send buffer has notify_size
59 GNUNET_CONNECTION_TransmitReadyNotify notify_ready;
62 * Closure for notify_ready.
64 void *notify_ready_cls;
69 struct GNUNET_CONNECTION_Handle *sh;
72 * Timeout for receiving (in absolute time).
74 struct GNUNET_TIME_Absolute transmit_timeout;
77 * Task called on timeout.
79 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
82 * At what number of bytes available in the
83 * write buffer should the notify method be called?
90 * @brief handle for a network socket
/* Per-connection state: the scheduler, the progress of the DNS lookup and
   connect attempts, the peer address, the write buffer bookkeeping
   (size / write offset / read position), the connect, read and write
   scheduler tasks, the embedded transmit-notify request (nth) and the
   pending receive request.
   NOTE(review): several members that are described below and used by the
   functions of this file (ai, hostname, write_buffer, addrlen,
   af_fam_offset, port, receiver_cls, max) have no declaration line in this
   extract.
   NOTE(review): "bytes have already been send" should read "sent". */
92 struct GNUNET_CONNECTION_Handle
96 * Scheduler that was used for the connect task.
98 struct GNUNET_SCHEDULER_Handle *sched;
101 * Address information for connect (may be NULL).
106 * Index for the next struct addrinfo for connect attempts (may be NULL)
108 struct addrinfo *ai_pos;
111 * Network address of the other end-point, may be NULL.
113 struct sockaddr *addr;
116 * Pointer to the hostname if socket was
117 * created using DNS lookup, otherwise NULL.
122 * Pointer to our write buffer.
127 * Size of our write buffer.
129 size_t write_buffer_size;
132 * Current write-offset in write buffer (where
133 * would we write next).
135 size_t write_buffer_off;
138 * Current read-offset in write buffer (how many
139 * bytes have already been send).
141 size_t write_buffer_pos;
149 * Offset in our address family list
155 * Connect task that we may need to wait for.
157 GNUNET_SCHEDULER_TaskIdentifier connect_task;
160 * Read task that we may need to wait for.
162 GNUNET_SCHEDULER_TaskIdentifier read_task;
165 * Write task that we may need to wait for.
167 GNUNET_SCHEDULER_TaskIdentifier write_task;
170 * The handle we return for GNUNET_CONNECTION_notify_transmit_ready.
172 struct GNUNET_CONNECTION_TransmitHandle nth;
175 * Underlying OS's socket, set to NULL after fatal errors.
177 struct GNUNET_NETWORK_Handle *sock;
180 * Port to connect to.
185 * Function to call on data received, NULL
186 * if no receive is pending.
188 GNUNET_CONNECTION_Receiver receiver;
191 * Closure for receiver.
196 * Timeout for receiving (in absolute time).
198 struct GNUNET_TIME_Absolute receive_timeout;
201 * Maximum number of bytes to read
210 * Create a socket handle by boxing an existing OS socket. The OS
211 * socket should henceforth be no longer used directly.
212 * GNUNET_CONNECTION_destroy will close it.
214 * @param sched scheduler to use
215 * @param osSocket existing socket to box
216 * @param maxbuf maximum write buffer size for the socket (use
217 * 0 for sockets that need no write buffers, such as listen sockets)
218 * @return the boxed socket handle
/* Wraps an already opened socket in a newly allocated connection handle.
   Handle and write buffer come from a single allocation of
   sizeof (handle) + maxbuf bytes. */
220 struct GNUNET_CONNECTION_Handle *
221 GNUNET_CONNECTION_create_from_existing (struct GNUNET_SCHEDULER_Handle
222 *sched, struct GNUNET_NETWORK_Handle *osSocket,
225 struct GNUNET_CONNECTION_Handle *ret;
226 ret = GNUNET_malloc (sizeof (struct GNUNET_CONNECTION_Handle) + maxbuf);
/* the write buffer is the maxbuf bytes directly behind the handle */
227 ret->write_buffer = (char *) &ret[1];
228 ret->write_buffer_size = maxbuf;
229 ret->sock = osSocket;
236 * Create a socket handle by accepting on a listen socket. This
237 * function may block if the listen socket has no connection ready.
239 * @param sched scheduler to use
240 * @param access function to use to check if access is allowed
241 * @param access_cls closure for access
242 * @param lsock listen socket
243 * @param maxbuf maximum write buffer size for the socket (use
244 * 0 for sockets that need no write buffers, such as listen sockets)
245 * @return the socket handle, NULL on error
/* Accepts one connection on lsock and wraps it in a new handle.
   A peer address that is an IPv4-mapped IPv6 address is rewritten as a
   plain sockaddr_in before it is used.  If an access callback is given and
   it returns anything other than GNUNET_YES, the accepted socket is shut
   down and closed; the "Access denied" message is logged only when the
   callback returned exactly GNUNET_NO. */
247 struct GNUNET_CONNECTION_Handle *
248 GNUNET_CONNECTION_create_from_accept (struct GNUNET_SCHEDULER_Handle
250 GNUNET_CONNECTION_AccessCheck access,
251 void *access_cls, struct GNUNET_NETWORK_Handle *lsock,
254 struct GNUNET_CONNECTION_Handle *ret;
257 struct GNUNET_NETWORK_Handle *sock;
259 struct sockaddr_in *v4;
260 struct sockaddr_in6 *v6;
/* addrlen is in/out: capacity of addr going in, real length coming out */
264 addrlen = sizeof (addr);
265 sock = GNUNET_NETWORK_socket_accept (lsock, (struct sockaddr *) &addr, &addrlen);
268 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "accept");
/* failing to mark the socket inheritable is only logged, not fatal here */
272 if (GNUNET_OK != GNUNET_NETWORK_socket_set_inheritable (sock))
273 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
/* accept reported an address longer than our buffer: drop the connection */
276 if (addrlen > sizeof (addr))
279 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
283 sa = (struct sockaddr *) addr;
284 v6 = (struct sockaddr_in6 *) addr;
285 if ((sa->sa_family == AF_INET6) && (IN6_IS_ADDR_V4MAPPED (&v6->sin6_addr)))
287 /* convert to V4 address */
288 v4 = GNUNET_malloc (sizeof (struct sockaddr_in));
289 memset (v4, 0, sizeof (struct sockaddr_in));
290 v4->sin_family = AF_INET;
/* the IPv4 address is the trailing sizeof (struct in_addr) bytes of the
   IPv6 address */
291 memcpy (&v4->sin_addr,
292 &((char *) &v6->sin6_addr)[sizeof (struct in6_addr) -
293 sizeof (struct in_addr)],
294 sizeof (struct in_addr));
295 v4->sin_port = v6->sin6_port;
297 addrlen = sizeof (struct sockaddr_in);
/* not a mapped address: keep a heap copy of exactly what accept returned */
301 uaddr = GNUNET_malloc (addrlen);
302 memcpy (uaddr, addr, addrlen);
305 if ((access != NULL) &&
306 (GNUNET_YES != (aret = access (access_cls, uaddr, addrlen))))
308 if (aret == GNUNET_NO)
309 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
310 _("Access denied to `%s'\n"),
311 GNUNET_a2s(uaddr, addrlen));
312 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_shutdown (sock, SHUT_RDWR));
313 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
318 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
319 _("Accepting connection from `%s'\n"),
320 GNUNET_a2s(uaddr, addrlen));
/* same single-allocation layout as GNUNET_CONNECTION_create_from_existing */
322 ret = GNUNET_malloc (sizeof (struct GNUNET_CONNECTION_Handle) + maxbuf);
323 ret->write_buffer = (char *) &ret[1];
324 ret->write_buffer_size = maxbuf;
326 ret->addrlen = addrlen;
333 * Obtain the network address of the other party.
335 * @param sock the client to get the address for
336 * @param addr where to store the address
337 * @param addrlen where to store the length of the address
338 * @return GNUNET_OK on success
/* Copies the stored peer address into a buffer newly allocated with
   GNUNET_malloc and reports it through *addr and *addrlen.  This function
   does not free that buffer.  The guard below covers the case that no
   address is stored; what is returned in that case is on a line that is
   not part of this extract. */
341 GNUNET_CONNECTION_get_address (struct GNUNET_CONNECTION_Handle *sock,
342 void **addr, size_t * addrlen)
344 if ((sock->addr == NULL) || (sock->addrlen == 0))
346 *addr = GNUNET_malloc (sock->addrlen);
347 memcpy (*addr, sock->addr, sock->addrlen);
348 *addrlen = sock->addrlen;
353 * Perform a DNS lookup for the hostname associated
354 * with the current socket, iterating over the address
355 * families as specified in the "address_families" array.
/* Resolves sock->hostname, one address family per loop iteration, until
   either a candidate address is available (ai_pos != NULL) or all entries
   of address_families have been used up (af_fam_offset reaches 0). */
358 try_lookup (struct GNUNET_CONNECTION_Handle *sock)
360 struct addrinfo hints;
363 while ( (sock->ai_pos == NULL) &&
364 (sock->af_fam_offset > 0) )
/* release the result list of the previous family before asking again */
366 if (sock->ai != NULL)
367 freeaddrinfo (sock->ai);
368 memset (&hints, 0, sizeof (hints));
/* pre-decrement: the array is consumed from its last element downwards */
369 hints.ai_family = address_families[--sock->af_fam_offset];
370 hints.ai_socktype = SOCK_STREAM;
/* no service name is passed; try_connect writes the port into the
   returned addresses itself */
371 if (0 != (ec = getaddrinfo (sock->hostname, NULL, &hints, &sock->ai)))
373 GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
374 "`%s' failed for hostname `%s': %s\n",
375 "getaddrinfo", sock->hostname, gai_strerror (ec));
/* NOTE(review): how sock->ai is left after a failed lookup is on lines
   that are not part of this extract; it must not keep pointing at the list
   freed above - confirm. */
378 sock->ai_pos = sock->ai;
384 * Initiate asynchronous TCP connect request.
386 * @param sock what socket to connect
387 * @return GNUNET_SYSERR on error (no more addresses to try)
/* Starts a non-blocking connect to the next candidate address.
   Any previously stored peer address is released first.  Candidates for
   which socket creation or connect fails are skipped by advancing ai_pos.
   On success the candidate address is copied into sock->addr / addrlen and
   ai_pos is advanced so that a later retry continues with the following
   candidate.  Returns GNUNET_SYSERR when no candidate is left or when the
   new socket cannot be switched to non-blocking mode. */
390 try_connect (struct GNUNET_CONNECTION_Handle *sock)
392 struct GNUNET_NETWORK_Handle *s;
394 if (sock->addr != NULL)
396 GNUNET_free (sock->addr);
/* out of candidates: presumably try_lookup is called on the line that is
   missing between the two checks below - TODO confirm */
402 if (sock->ai_pos == NULL)
404 if (sock->ai_pos == NULL)
406 /* no more addresses to try, fatal! */
407 return GNUNET_SYSERR;
/* the lookup was done without a service name, so the port has to be
   written into the candidate address here (the assigned value is on lines
   that are not part of this extract) */
409 switch (sock->ai_pos->ai_family)
412 ((struct sockaddr_in *) sock->ai_pos->ai_addr)->sin_port =
416 ((struct sockaddr_in6 *) sock->ai_pos->ai_addr)->sin6_port =
420 sock->ai_pos = sock->ai_pos->ai_next;
423 s = GNUNET_NETWORK_socket_socket (sock->ai_pos->ai_family, SOCK_STREAM, 0);
426 /* maybe unsupported address family, try next */
427 GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO, "socket");
428 sock->ai_pos = sock->ai_pos->ai_next;
/* failing to mark the socket inheritable is only logged, not fatal here */
432 if (GNUNET_OK != GNUNET_NETWORK_socket_set_inheritable (s))
433 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
434 "GNUNET_NETWORK_socket_set_inheritable");
436 if (GNUNET_SYSERR == GNUNET_NETWORK_socket_set_blocking (s, GNUNET_NO))
438 /* we'll treat this one as fatal */
439 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (s));
440 return GNUNET_SYSERR;
443 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
444 _("Trying to connect to `%s'\n"),
445 GNUNET_a2s(sock->ai_pos->ai_addr,
446 sock->ai_pos->ai_addrlen));
/* EINPROGRESS is the expected outcome of connect on a non-blocking socket
   and is therefore not treated as a failure */
448 if ((GNUNET_OK != GNUNET_NETWORK_socket_connect (s,
449 sock->ai_pos->ai_addr,
450 sock->ai_pos->ai_addrlen)) && (errno != EINPROGRESS))
452 /* maybe refused / unsupported address, try next */
453 GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO, "connect");
454 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (s));
455 sock->ai_pos = sock->ai_pos->ai_next;
460 /* got one! copy address information! */
461 sock->addrlen = sock->ai_pos->ai_addrlen;
462 sock->addr = GNUNET_malloc (sock->addrlen);
463 memcpy (sock->addr, sock->ai_pos->ai_addr, sock->addrlen);
464 sock->ai_pos = sock->ai_pos->ai_next;
471 * Scheduler let us know that we're either ready to
472 * write on the socket OR connect timed out. Do the
/* Scheduler callback that decides whether the pending non-blocking connect
   worked.  Success requires that the task ran for write-readiness AND that
   SO_ERROR reads back as zero.  On failure the socket is closed and
   try_connect is asked for the next candidate: if none is left the
   addrinfo list is freed, otherwise this callback is scheduled again with
   GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT.  On success the addrinfo list is
   freed as well, since no further candidates are needed. */
476 connect_continuation (void *cls,
477 const struct GNUNET_SCHEDULER_TaskContext *tc)
479 struct GNUNET_CONNECTION_Handle *sock = cls;
483 /* nobody needs to wait for us anymore... */
484 sock->connect_task = GNUNET_SCHEDULER_NO_TASK;
485 /* Note: write-ready does NOT mean connect succeeded,
486 we need to use getsockopt to be sure */
487 len = sizeof (error);
/* NOTE(review): the condition below also requires errno == 0.  Successful
   calls do not reset errno, so this is only meaningful if errno is cleared
   beforehand; whether that happens is on lines that are not part of this
   extract - confirm. */
490 if ((0 == (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) ||
491 (GNUNET_OK != GNUNET_NETWORK_socket_getsockopt (sock->sock, SOL_SOCKET, SO_ERROR, &error, &len)) ||
492 (error != 0) || (errno != 0))
495 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
496 "Failed to establish TCP connection to `%s'\n",
497 GNUNET_a2s(sock->addr, sock->addrlen));
499 /* connect failed / timed out */
500 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock->sock));
502 if (GNUNET_SYSERR == try_connect (sock))
504 /* failed for good */
506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507 "Failed to establish TCP connection, no further addresses to try.\n");
509 /* connect failed / timed out */
510 GNUNET_break (sock->ai_pos == NULL);
511 freeaddrinfo (sock->ai);
/* another candidate is being tried: wait for its outcome */
515 sock->connect_task = GNUNET_SCHEDULER_add_write_net (tc->sched, GNUNET_NO, /* abort on shutdown */
516 GNUNET_SCHEDULER_PRIORITY_KEEP,
517 GNUNET_SCHEDULER_NO_TASK,
518 GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
520 &connect_continuation,
524 /* connect succeeded! clean up "ai" */
526 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
527 "Connection to `%s' succeeded!\n",
528 GNUNET_a2s(sock->addr, sock->addrlen));
530 freeaddrinfo (sock->ai);
537 * Create a socket handle by (asynchronously) connecting to a host.
538 * This function returns immediately, even if the connection has not
539 * yet been established. This function only creates TCP connections.
541 * @param sched scheduler to use
542 * @param hostname name of the host to connect to
543 * @param port port to connect to
544 * @param maxbuf maximum write buffer size for the socket (use
545 * 0 for sockets that need no write buffers, such as listen sockets)
546 * @return the socket handle
/* Allocates a handle for hostname:port and starts the first connect
   attempt right away via try_connect.  If that already fails for good, the
   addrinfo list and the hostname copy are released.  Otherwise
   connect_continuation is scheduled to run when the socket becomes
   writable or GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT expires. */
548 struct GNUNET_CONNECTION_Handle *
549 GNUNET_CONNECTION_create_from_connect (struct GNUNET_SCHEDULER_Handle
550 *sched, const char *hostname,
551 uint16_t port, size_t maxbuf)
553 struct GNUNET_CONNECTION_Handle *ret;
555 ret = GNUNET_malloc (sizeof (struct GNUNET_CONNECTION_Handle) + maxbuf);
/* the write buffer is the maxbuf bytes directly behind the handle */
558 ret->write_buffer = (char *) &ret[1];
559 ret->write_buffer_size = maxbuf;
/* start one past the last family; try_lookup pre-decrements before use */
561 ret->af_fam_offset = sizeof (address_families) / sizeof(address_families[0]);
562 ret->hostname = GNUNET_strdup (hostname);
563 if (GNUNET_SYSERR == try_connect (ret))
566 freeaddrinfo (ret->ai);
567 GNUNET_free (ret->hostname);
571 ret->connect_task = GNUNET_SCHEDULER_add_write_net (sched, GNUNET_NO, /* abort on shutdown */
572 GNUNET_SCHEDULER_PRIORITY_KEEP,
573 GNUNET_SCHEDULER_NO_TASK,
574 GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
576 &connect_continuation, ret);
583 * Create a socket handle by (asynchronously) connecting to a host.
584 * This function returns immediately, even if the connection has not
585 * yet been established. This function only creates TCP connections.
587 * @param sched scheduler to use
588 * @param af_family address family to use
589 * @param serv_addr server address
590 * @param addrlen length of server address
591 * @param maxbuf maximum write buffer size for the socket (use
592 * 0 for sockets that need no write buffers, such as listen sockets)
593 * @return the socket handle
/* Creates a socket of the given family, switches it to non-blocking mode,
   starts a connect to serv_addr and wraps the socket with
   GNUNET_CONNECTION_create_from_existing.  A private copy of serv_addr is
   stored in the returned handle.  Only a single address is attempted here;
   there is no candidate list as in the hostname based variant. */
595 struct GNUNET_CONNECTION_Handle *
596 GNUNET_CONNECTION_create_from_sockaddr (struct GNUNET_SCHEDULER_Handle
597 *sched, int af_family,
598 const struct sockaddr *serv_addr,
599 socklen_t addrlen, size_t maxbuf)
601 struct GNUNET_NETWORK_Handle *s;
602 struct GNUNET_CONNECTION_Handle *ret;
604 s = GNUNET_NETWORK_socket_socket (af_family, SOCK_STREAM, 0);
607 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING |
608 GNUNET_ERROR_TYPE_BULK, "socket");
/* NOTE(review): s is a struct GNUNET_NETWORK_Handle pointer, yet it is
   passed to fcntl(), which takes an int file descriptor.  try_connect and
   GNUNET_CONNECTION_create_from_accept call
   GNUNET_NETWORK_socket_set_inheritable on the handle at this step instead;
   this call looks like a leftover that should be brought in line. */
614 if (0 != fcntl (s, F_SETFD, fcntl (s, F_GETFD) | FD_CLOEXEC))
615 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
619 if (GNUNET_SYSERR == GNUNET_NETWORK_socket_set_blocking (s, GNUNET_NO))
621 /* we'll treat this one as fatal */
622 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (s));
626 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
627 _("Trying to connect to `%s'\n"),
628 GNUNET_a2s(serv_addr, addrlen));
/* EINPROGRESS is the expected outcome of connect on a non-blocking socket */
630 if ((GNUNET_OK != GNUNET_NETWORK_socket_connect (s, serv_addr, addrlen)) && (errno != EINPROGRESS))
/* NOTE(review): "try next" in the comment below does not apply in this
   function; no other address is attempted in the visible code. */
632 /* maybe refused / unsupported address, try next */
633 GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO, "connect");
634 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (s));
637 ret = GNUNET_CONNECTION_create_from_existing (sched, s, maxbuf);
/* keep our own copy; the caller's serv_addr is not referenced afterwards */
638 ret->addr = GNUNET_malloc (addrlen);
639 memcpy (ret->addr, serv_addr, addrlen);
640 ret->addrlen = addrlen;
646 * Check if socket is valid (no fatal errors have happened so far).
647 * Note that a socket that is still trying to connect is considered
650 * @param sock socket to check
651 * @return GNUNET_YES if valid, GNUNET_NO otherwise
/* Reports whether the connection is usable or may still become usable.
   A non-NULL addrinfo list means connect attempts are still in progress
   (both connect_continuation outcomes free it), which counts as valid;
   otherwise validity depends on whether an OS socket is present. */
654 GNUNET_CONNECTION_check (struct GNUNET_CONNECTION_Handle *sock)
656 if (sock->ai != NULL)
657 return GNUNET_YES; /* still trying to connect */
658 return (sock->sock == NULL) ? GNUNET_NO : GNUNET_YES;
663 * Scheduler let us know that the connect task is finished (or was
664 * cancelled due to shutdown). Now really clean up.
/* Deferred teardown of a connection.  The function re-queues itself while
   a write task or a read task is still registered and only proceeds once
   both are gone.  It then tells a pending transmit-notify callback about
   the failure by calling it with (0, NULL), cancels that request's timeout
   task, closes the socket and releases the peer address, the addrinfo list
   and the hostname. */
667 destroy_continuation (void *cls,
668 const struct GNUNET_SCHEDULER_TaskContext *tc)
670 struct GNUNET_CONNECTION_Handle *sock = cls;
671 GNUNET_CONNECTION_TransmitReadyNotify notify;
/* a write is still pending: run again later (the remaining arguments of
   the call are on lines that are not part of this extract; presumably the
   write task is the prerequisite - TODO confirm) */
673 if (sock->write_task != GNUNET_SCHEDULER_NO_TASK)
675 GNUNET_SCHEDULER_add_after (sock->sched,
677 GNUNET_SCHEDULER_PRIORITY_KEEP,
679 &destroy_continuation, sock);
682 if (sock->sock != NULL)
685 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutting down socket.\n");
687 GNUNET_NETWORK_socket_shutdown (sock->sock, SHUT_RDWR);
/* same deferral for a pending read task */
689 if (sock->read_task != GNUNET_SCHEDULER_NO_TASK)
691 GNUNET_SCHEDULER_add_after (sock->sched,
693 GNUNET_SCHEDULER_PRIORITY_KEEP,
695 &destroy_continuation, sock);
698 if (NULL != (notify = sock->nth.notify_ready))
/* clear the slot before calling out so the request counts as finished
   while the callback runs */
700 sock->nth.notify_ready = NULL;
701 notify (sock->nth.notify_ready_cls, 0, NULL);
702 if (sock->nth.timeout_task != GNUNET_SCHEDULER_NO_TASK)
704 GNUNET_SCHEDULER_cancel (sock->sched, sock->nth.timeout_task);
705 sock->nth.timeout_task = GNUNET_SCHEDULER_NO_TASK;
708 if (sock->sock != NULL)
709 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock->sock));
710 GNUNET_free_non_null (sock->addr);
711 if (sock->ai != NULL)
712 freeaddrinfo (sock->ai);
713 GNUNET_free_non_null (sock->hostname);
719 * Close the socket and free associated resources. Pending
720 * transmissions are simply dropped. A pending receive call will be
721 * called with an error code of "EPIPE".
723 * @param sock socket to destroy
/* Public entry point for closing a connection.  The actual cleanup is not
   done here: destroy_continuation is handed to the scheduler.  If nothing
   has been written into the write buffer, the remaining connect candidates
   are dropped first by clearing ai_pos. */
726 GNUNET_CONNECTION_destroy (struct GNUNET_CONNECTION_Handle *sock)
728 if (sock->write_buffer_off == 0)
729 sock->ai_pos = NULL; /* if we're still trying to connect and have
730 no message pending, stop trying! */
731 GNUNET_assert (sock->sched != NULL);
732 GNUNET_SCHEDULER_add_after (sock->sched,
734 GNUNET_SCHEDULER_PRIORITY_KEEP,
736 &destroy_continuation, sock);
740 * Tell the receiver callback that a timeout was reached.
/* Reports a receive timeout to the registered receiver: the callback gets a
   NULL buffer, zero bytes, no peer address and error code 0.  A receiver
   must be registered, otherwise the assertion fails. */
743 signal_timeout (struct GNUNET_CONNECTION_Handle *sh)
745 GNUNET_CONNECTION_Receiver receiver;
748 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
749 "Network signals time out to receiver!\n");
751 GNUNET_assert (NULL != (receiver = sh->receiver));
753 receiver (sh->receiver_cls, NULL, 0, NULL, 0, 0);
758 * Tell the receiver callback that we had an IO error.
/* Reports an I/O error to the registered receiver: the callback gets a NULL
   buffer, zero bytes, the stored peer address and errcode.  A receiver must
   be registered, otherwise the assertion fails. */
761 signal_error (struct GNUNET_CONNECTION_Handle *sh, int errcode)
763 GNUNET_CONNECTION_Receiver receiver;
764 GNUNET_assert (NULL != (receiver = sh->receiver));
766 receiver (sh->receiver_cls, NULL, 0, sh->addr, sh->addrlen, errcode);
771 * This function is called once we either timeout
772 * or have data ready to read.
/* Scheduler callback for a pending receive.  Checks, in this order: the
   receive deadline / a timeout or shutdown reason, a missing socket
   (reported as ECONNREFUSED), and finally reads up to sh->max bytes and
   passes them to the receiver together with the peer address.  A failed
   read is reported through signal_error with the current errno. */
775 receive_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
777 struct GNUNET_CONNECTION_Handle *sh = cls;
778 struct GNUNET_TIME_Absolute now;
/* NOTE(review): variable-length array on the stack, sized by the value the
   caller of GNUNET_CONNECTION_receive asked for; a large max may exhaust
   the stack. */
779 char buffer[sh->max];
781 GNUNET_CONNECTION_Receiver receiver;
/* this task is running now, so it is no longer pending */
783 sh->read_task = GNUNET_SCHEDULER_NO_TASK;
784 now = GNUNET_TIME_absolute_get ();
785 if ((now.value > sh->receive_timeout.value) ||
786 (0 != (tc->reason & GNUNET_SCHEDULER_REASON_TIMEOUT)) ||
787 (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)))
790 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791 "Receive encounters error: time out...\n");
796 if (sh->sock == NULL)
798 /* connect failed for good */
800 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801 "Receive encounters error, socket closed...\n");
803 signal_error (sh, ECONNREFUSED);
/* neither timeout nor shutdown: the task must have run for readability */
806 GNUNET_assert (GNUNET_NETWORK_fdset_isset (tc->read_ready, sh->sock));
808 ret = GNUNET_NETWORK_socket_recv (sh->sock, buffer, sh->max,
821 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822 "Error receiving: %s\n", STRERROR (errno));
824 signal_error (sh, errno);
828 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829 "receive_ready read %u/%u bytes from `%s'!\n",
832 GNUNET_a2s(sh->addr, sh->addrlen));
834 GNUNET_assert (NULL != (receiver = sh->receiver));
836 receiver (sh->receiver_cls, buffer, ret, sh->addr, sh->addrlen, 0);
841 * This function is called after establishing a connection either has
842 * succeeded or timed out. Note that it is possible that the attempt
843 * timed out and that we're immediately retrying. If we are retrying,
844 * we need to wait again (or timeout); if we succeeded, we need to
845 * wait for data (or timeout).
/* Decides how a pending receive proceeds.  Without a socket and without a
   running connect task the receive fails with ECONNREFUSED.  Past the
   receive deadline, or on shutdown, it is treated as timed out.  While a
   connect task is still registered this function is queued again through
   GNUNET_SCHEDULER_add_after; once connected, receive_ready is scheduled to
   run when the socket becomes readable or the remaining receive timeout
   expires. */
848 receive_again (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
850 struct GNUNET_CONNECTION_Handle *sh = cls;
851 struct GNUNET_TIME_Absolute now;
853 sh->read_task = GNUNET_SCHEDULER_NO_TASK;
854 if ((sh->sock == NULL) &&
855 (sh->connect_task == GNUNET_SCHEDULER_NO_TASK))
857 /* not connected and no longer trying */
859 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
860 "Receive encounters error, socket closed...\n");
862 signal_error (sh, ECONNREFUSED);
865 now = GNUNET_TIME_absolute_get ();
866 if ((now.value > sh->receive_timeout.value) ||
867 (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)))
870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871 "Receive encounters error: time out...\n");
876 if (sh->connect_task != GNUNET_SCHEDULER_NO_TASK)
878 /* connect was retried */
879 sh->read_task = GNUNET_SCHEDULER_add_after (tc->sched,
881 GNUNET_SCHEDULER_PRIORITY_KEEP,
886 /* connect succeeded, wait for data! */
887 sh->read_task = GNUNET_SCHEDULER_add_read_net (tc->sched,
889 GNUNET_SCHEDULER_PRIORITY_KEEP,
891 GNUNET_TIME_absolute_get_remaining
892 (sh->receive_timeout),
893 sh->sock, &receive_ready,
899 * Receive data from the given socket. Note that this function will
900 * call "receiver" asynchronously using the scheduler. It will
901 * "immediately" return. Note that there MUST only be one active
902 * receive call per socket at any given point in time (so do not
903 * call receive again until the receiver callback has been invoked).
905 * @param sched scheduler to use
906 * @param sock socket handle
907 * @param max maximum number of bytes to read
908 * @param timeout maximum amount of time to wait (use -1 for "forever")
909 * @param receiver function to call with received data
910 * @param receiver_cls closure for receiver
911 * @return scheduler task ID used for receiving, GNUNET_SCHEDULER_NO_TASK on error
/* Registers a receive request on the connection.  Only one request may be
   outstanding: the assertion requires that no read task and no receiver are
   currently set.  The relative timeout is converted into an absolute
   deadline, then receive_again is called directly (not through the
   scheduler) with a locally built task context to schedule the read.
   Returns whatever task receive_again left in sock->read_task. */
913 GNUNET_SCHEDULER_TaskIdentifier
914 GNUNET_CONNECTION_receive (struct GNUNET_CONNECTION_Handle *sock,
916 struct GNUNET_TIME_Relative timeout,
917 GNUNET_CONNECTION_Receiver receiver, void *receiver_cls)
919 struct GNUNET_SCHEDULER_TaskContext tc;
921 GNUNET_assert ((sock->read_task == GNUNET_SCHEDULER_NO_TASK) &&
922 (sock->receiver == NULL));
923 sock->receiver = receiver;
924 sock->receiver_cls = receiver_cls;
925 sock->receive_timeout = GNUNET_TIME_relative_to_absolute (timeout);
/* synthetic context: only sched and reason are filled in, the fd sets stay
   zeroed */
927 memset (&tc, 0, sizeof (tc));
928 tc.sched = sock->sched;
929 tc.reason = GNUNET_SCHEDULER_REASON_PREREQ_DONE;
930 receive_again (sock, &tc);
931 return sock->read_task;
936 * Cancel receive job on the given socket. Note that the
937 * receiver callback must not have been called yet in order
938 * for the cancellation to be valid.
940 * @param sock socket handle
941 * @param task task identifier returned from the receive call
942 * @return closure of the original receiver callback
/* Cancels the outstanding receive request.  task must be the identifier
   currently stored in sock->read_task, and the cancelled scheduler task
   must carry this connection as its closure; both are asserted.  Clears the
   read task and the receiver and returns the receiver closure. */
945 GNUNET_CONNECTION_receive_cancel (struct GNUNET_CONNECTION_Handle *sock,
946 GNUNET_SCHEDULER_TaskIdentifier task)
948 GNUNET_assert (sock->read_task == task);
/* NOTE(review): the cancel call is inside the assertion macro; if that
   macro were ever compiled out, the cancel would be skipped too - confirm
   GNUNET_assert always evaluates its argument. */
949 GNUNET_assert (sock == GNUNET_SCHEDULER_cancel (sock->sched, task));
950 sock->read_task = GNUNET_SCHEDULER_NO_TASK;
951 sock->receiver = NULL;
952 return sock->receiver_cls;
957 * Try to call the transmit notify method (check if we do
958 * have enough space available first)!
960 * @param sock socket for which we should do this processing
961 * @return GNUNET_YES if we were able to call notify
/* Calls the pending transmit-notify callback if the write buffer has at
   least notify_size bytes free.  Must only be called while no write task is
   registered (asserted).  Before the call the request is marked as done and
   its timeout task is cancelled; the buffer is compacted when the free
   space behind the write offset alone is too small.  The number of bytes
   the callback reports as written is added to the write offset. */
964 process_notify (struct GNUNET_CONNECTION_Handle *sock)
969 GNUNET_CONNECTION_TransmitReadyNotify notify;
971 GNUNET_assert (sock->write_task == GNUNET_SCHEDULER_NO_TASK);
972 if (NULL == (notify = sock->nth.notify_ready))
/* used = bytes queued but not yet sent; avail = free bytes overall */
974 used = sock->write_buffer_off - sock->write_buffer_pos;
975 avail = sock->write_buffer_size - used;
976 size = sock->nth.notify_size;
977 if (sock->nth.notify_size > avail)
979 sock->nth.notify_ready = NULL;
980 if (sock->nth.timeout_task != GNUNET_SCHEDULER_NO_TASK)
982 GNUNET_SCHEDULER_cancel (sock->sched, sock->nth.timeout_task);
983 sock->nth.timeout_task = GNUNET_SCHEDULER_NO_TASK;
985 if (sock->write_buffer_size - sock->write_buffer_off < size)
987 /* need to compact */
/* memmove, not memcpy: source and destination may overlap */
988 memmove (sock->write_buffer,
989 &sock->write_buffer[sock->write_buffer_pos], used);
990 sock->write_buffer_off -= sock->write_buffer_pos;
991 sock->write_buffer_pos = 0;
993 GNUNET_assert (sock->write_buffer_size - sock->write_buffer_off >= size);
/* the callback is offered all free space behind the write offset, which
   may be more than the notify_size it asked for */
994 size = notify (sock->nth.notify_ready_cls,
995 sock->write_buffer_size - sock->write_buffer_off,
996 &sock->write_buffer[sock->write_buffer_off]);
/* NOTE(review): the value returned by the callback is not checked against
   the space that was offered before it is added to the offset. */
997 sock->write_buffer_off += size;
1003 * Task invoked by the scheduler when a call to transmit
1004 * is timing out (we never got enough buffer space to call
1005 * the callback function before the specified timeout
1008 * This task notifies the client about the timeout.
/* Tells the pending transmit-notify callback that the request failed by
   calling it with (0, NULL), after clearing the request slot.  The visible
   lines do not use tc; transmit_error relies on that and passes NULL. */
1011 transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1013 struct GNUNET_CONNECTION_Handle *sock = cls;
1014 GNUNET_CONNECTION_TransmitReadyNotify notify;
1016 #if DEBUG_CONNECTION
1017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmit fails, time out reached.\n");
/* clear the slot first so that the callback may queue a new request */
1019 notify = sock->nth.notify_ready;
1020 sock->nth.notify_ready = NULL;
1021 notify (sock->nth.notify_ready_cls, 0, NULL);
/* Reports a transmission failure to the pending transmit-notify callback,
   if there is one.  The request's timeout task is cancelled first so it
   cannot fire afterwards; the notification itself reuses transmit_timeout,
   called directly with a NULL task context. */
1026 transmit_error (struct GNUNET_CONNECTION_Handle *sock)
1028 if (sock->nth.notify_ready == NULL)
1029 return; /* nobody to tell about it */
1030 if (sock->nth.timeout_task != GNUNET_SCHEDULER_NO_TASK)
1032 GNUNET_SCHEDULER_cancel (sock->sched, sock->nth.timeout_task);
1033 sock->nth.timeout_task = GNUNET_SCHEDULER_NO_TASK;
1035 transmit_timeout (sock, NULL);
1040 * See if we are now connected. If not, wait longer for
1041 * connect to succeed. If connected, we should be able
1042 * to write now as well, unless we timed out.
/* Scheduler callback that drives the write side of a connection.
   While a connect task is still registered it queues itself again with a
   zero delay.  Without a socket, or on a timeout that was neither a
   finished prerequisite nor accompanied by write-readiness, the socket is
   shut down and closed and the pending request is failed through
   transmit_error.  Otherwise it lets process_notify fill the buffer, sends
   the queued bytes, advances the read position and, unless everything has
   been sent and no request is pending, schedules itself again. */
1045 transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1047 struct GNUNET_CONNECTION_Handle *sock = cls;
1051 GNUNET_assert (sock->write_task != GNUNET_SCHEDULER_NO_TASK);
/* this task is running now, so it is no longer pending */
1052 sock->write_task = GNUNET_SCHEDULER_NO_TASK;
1053 if (sock->connect_task != GNUNET_SCHEDULER_NO_TASK)
1055 /* still waiting for connect */
1056 GNUNET_assert (sock->write_task ==
1057 GNUNET_SCHEDULER_NO_TASK);
1059 GNUNET_SCHEDULER_add_delayed (tc->sched, GNUNET_NO,
1060 GNUNET_SCHEDULER_PRIORITY_KEEP,
1062 GNUNET_TIME_UNIT_ZERO, &transmit_ready,
1066 if ( (sock->sock == NULL) ||
1067 ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_TIMEOUT)) &&
1068 (0 == (tc->reason & GNUNET_SCHEDULER_REASON_PREREQ_DONE)) &&
1069 (!GNUNET_NETWORK_fdset_isset (tc->write_ready, sock->sock))) )
1071 #if DEBUG_CONNECTION
1072 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1073 _("Could not satisfy pending transmission request, socket closed or connect failed.\n"));
1075 if (NULL != sock->sock)
1077 GNUNET_NETWORK_socket_shutdown (sock->sock, SHUT_RDWR);
1078 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock->sock));
1081 transmit_error (sock);
1082 return; /* connect failed for good, we're finished */
/* NOTE(review): tc->write_ready is tested for NULL only here, while the
   condition above already passes it to GNUNET_NETWORK_fdset_isset -
   confirm that function accepts a NULL set. */
1084 if ((tc->write_ready == NULL) || (!GNUNET_NETWORK_fdset_isset (tc->write_ready, sock->sock)))
1086 /* special circumstances (in particular,
1087 PREREQ_DONE after connect): not yet ready to write,
1088 but no "fatal" error either. Hence retry. */
1089 goto SCHEDULE_WRITE;
1091 GNUNET_assert (sock->write_buffer_off >= sock->write_buffer_pos);
/* give the pending request, if any, the chance to queue its data first */
1092 process_notify (sock);
1093 have = sock->write_buffer_off - sock->write_buffer_pos;
1096 /* no data ready for writing, terminate write loop */
1100 ret = GNUNET_NETWORK_socket_send (sock->sock,
1101 &sock->write_buffer[sock->write_buffer_pos],
1105 MSG_DONTWAIT | MSG_NOSIGNAL
1114 #if DEBUG_CONNECTION
1115 GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, "send");
1117 GNUNET_NETWORK_socket_shutdown (sock->sock, SHUT_RDWR);
1118 GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock->sock));
1120 transmit_error (sock);
1123 #if DEBUG_CONNECTION
1124 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125 "transmit_ready transmitted %u/%u bytes to `%s'\n",
1128 GNUNET_a2s(sock->addr, sock->addrlen));
1130 sock->write_buffer_pos += ret;
1131 if (sock->write_buffer_pos == sock->write_buffer_off)
1133 /* transmitted all pending data */
/* rewind both offsets so the whole buffer is free again */
1134 sock->write_buffer_pos = 0;
1135 sock->write_buffer_off = 0;
1137 if ((sock->write_buffer_off == 0) && (NULL == sock->nth.notify_ready))
1138 return; /* all data sent! */
1139 /* not done writing, schedule more */
1141 if (sock->write_task == GNUNET_SCHEDULER_NO_TASK)
1143 GNUNET_SCHEDULER_add_write_net (tc->sched,
1145 GNUNET_SCHEDULER_PRIORITY_KEEP,
1146 GNUNET_SCHEDULER_NO_TASK,
1147 GNUNET_TIME_absolute_get_remaining (sock->nth.transmit_timeout),
1148 sock->sock, &transmit_ready, sock);
1153 * Ask the socket to call us once the specified number of bytes
1154 * are free in the transmission buffer. May call the notify
1155 * method immediately if enough space is available.
1157 * @param sock socket
1158 * @param size number of bytes to send
1159 * @param timeout after how long should we give up (and call
1160 * notify with buf NULL and size 0)?
1161 * @param notify function to call
1162 * @param notify_cls closure for notify
1163 * @return non-NULL if the notify callback was queued,
1164 * NULL if we are already going to notify someone else (busy)
/* Registers a request to be called back once size bytes fit into the write
   buffer.  Only one request can be pending per connection; the check on
   nth.notify_ready detects the busy case.  notify must not be NULL and size
   must not exceed the write buffer size (both asserted).  If the connection
   has neither a socket nor a running connect task, notify is called at once
   with (0, NULL).  Otherwise the request is stored in the embedded nth
   record, a timeout task is scheduled for it, and a write task is started
   if none is registered yet. */
1166 struct GNUNET_CONNECTION_TransmitHandle *
1167 GNUNET_CONNECTION_notify_transmit_ready (struct GNUNET_CONNECTION_Handle
1169 struct GNUNET_TIME_Relative timeout,
1170 GNUNET_CONNECTION_TransmitReadyNotify
1171 notify, void *notify_cls)
1173 if (sock->nth.notify_ready != NULL)
1175 GNUNET_assert (notify != NULL);
1176 GNUNET_assert (sock->write_buffer_size >= size);
1178 if ((sock->sock == NULL) &&
1179 (sock->connect_task == GNUNET_SCHEDULER_NO_TASK))
1181 #if DEBUG_CONNECTION
1182 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1183 "Transmission request of size %u fails, connection failed.\n",
1186 notify (notify_cls, 0, NULL);
/* sanity checks on the buffer bookkeeping: 0 <= pos <= off <= size */
1189 GNUNET_assert (sock->write_buffer_off <= sock->write_buffer_size);
1190 GNUNET_assert (sock->write_buffer_pos <= sock->write_buffer_size);
1191 GNUNET_assert (sock->write_buffer_pos <= sock->write_buffer_off);
1192 sock->nth.notify_ready = notify;
1193 sock->nth.notify_ready_cls = notify_cls;
1194 sock->nth.sh = sock;
1195 sock->nth.notify_size = size;
1196 sock->nth.transmit_timeout = GNUNET_TIME_relative_to_absolute (timeout);
1197 sock->nth.timeout_task = GNUNET_SCHEDULER_add_delayed (sock->sched,
1199 GNUNET_SCHEDULER_PRIORITY_KEEP,
1200 GNUNET_SCHEDULER_NO_TASK,
1204 if (sock->write_task == GNUNET_SCHEDULER_NO_TASK)
/* already connected: wait for the socket to become writable; otherwise
   poll through transmit_ready with a zero delay until the connect task is
   gone */
1206 if (sock->connect_task == GNUNET_SCHEDULER_NO_TASK)
1207 sock->write_task = GNUNET_SCHEDULER_add_write_net (sock->sched,
1209 GNUNET_SCHEDULER_PRIORITY_KEEP,
1210 GNUNET_SCHEDULER_NO_TASK,
1211 GNUNET_TIME_absolute_get_remaining (sock->nth.transmit_timeout),
1213 &transmit_ready, sock);
1215 sock->write_task = GNUNET_SCHEDULER_add_delayed (sock->sched,
1217 GNUNET_SCHEDULER_PRIORITY_KEEP,
1219 GNUNET_TIME_UNIT_ZERO,
1220 &transmit_ready, sock);
1227 * Cancel the specified transmission-ready
/* Withdraws a pending transmit-notify request: cancels its timeout task and
   clears the callback so that process_notify finds nothing to call.  The
   request must still be pending (asserted).
   NOTE(review): unlike the other places that cancel nth.timeout_task, this
   one does not first compare it against GNUNET_SCHEDULER_NO_TASK. */
1231 GNUNET_CONNECTION_notify_transmit_ready_cancel (struct
1232 GNUNET_CONNECTION_TransmitHandle *h)
1234 GNUNET_assert (h->notify_ready != NULL);
1235 GNUNET_SCHEDULER_cancel (h->sh->sched, h->timeout_task);
1236 h->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1237 h->notify_ready = NULL;
1241 #if 0 /* keep Emacsens' auto-indent happy */
1248 /* end of connection.c */