-signal connection failure to receive even if receive is triggered after failure...
[oweals/gnunet.git] / src / util / connection.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file util/connection.c
23  * @brief  TCP connection management
24  * @author Christian Grothoff
25  *
26  * This code is rather complex.  Only modify it if you
27  * 1) Have a NEW testcase showing that the new code
28  *    is needed and correct
29  * 2) All EXISTING testcases pass with the new code
30  * These rules should apply in general, but for this
31  * module they are VERY, VERY important.
32  */
33 #include "platform.h"
34 #include "gnunet_util_lib.h"
35 #include "gnunet_resolver_service.h"
36
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__)
39
40 #define LOG_STRERROR(kind,syscall) GNUNET_log_from_strerror (kind, "util", syscall)
41
42
43 /**
44  * Transmission handle.  There can only be one for each connection.
45  */
46 struct GNUNET_CONNECTION_TransmitHandle
47 {
48
49   /**
50    * Function to call if the send buffer has notify_size
51    * bytes available.
52    */
53   GNUNET_CONNECTION_TransmitReadyNotify notify_ready;
54
55   /**
56    * Closure for notify_ready.
57    */
58   void *notify_ready_cls;
59
60   /**
61    * Our connection handle.
62    */
63   struct GNUNET_CONNECTION_Handle *connection;
64
65   /**
66    * Timeout for receiving (in absolute time).
67    */
68   struct GNUNET_TIME_Absolute transmit_timeout;
69
70   /**
71    * Task called on timeout.
72    */
73   struct GNUNET_SCHEDULER_Task * timeout_task;
74
75   /**
76    * At what number of bytes available in the
77    * write buffer should the notify method be called?
78    */
79   size_t notify_size;
80
81 };
82
83
84 /**
85  * During connect, we try multiple possible IP addresses
86  * to find out which one might work.
87  */
88 struct AddressProbe
89 {
90
91   /**
92    * This is a linked list.
93    */
94   struct AddressProbe *next;
95
96   /**
97    * This is a doubly-linked list.
98    */
99   struct AddressProbe *prev;
100
101   /**
102    * The address; do not free (allocated at the end of this struct).
103    */
104   const struct sockaddr *addr;
105
106   /**
107    * Underlying OS's socket.
108    */
109   struct GNUNET_NETWORK_Handle *sock;
110
111   /**
112    * Connection for which we are probing.
113    */
114   struct GNUNET_CONNECTION_Handle *connection;
115
116   /**
117    * Lenth of addr.
118    */
119   socklen_t addrlen;
120
121   /**
122    * Task waiting for the connection to finish connecting.
123    */
124   struct GNUNET_SCHEDULER_Task * task;
125 };
126
127
128 /**
129  * @brief handle for a network connection
130  */
131 struct GNUNET_CONNECTION_Handle
132 {
133
134   /**
135    * Configuration to use.
136    */
137   const struct GNUNET_CONFIGURATION_Handle *cfg;
138
139   /**
140    * Linked list of sockets we are currently trying out
141    * (during connect).
142    */
143   struct AddressProbe *ap_head;
144
145   /**
146    * Linked list of sockets we are currently trying out
147    * (during connect).
148    */
149   struct AddressProbe *ap_tail;
150
151   /**
152    * Network address of the other end-point, may be NULL.
153    */
154   struct sockaddr *addr;
155
156   /**
157    * Pointer to the hostname if connection was
158    * created using DNS lookup, otherwise NULL.
159    */
160   char *hostname;
161
162   /**
163    * Underlying OS's socket, set to NULL after fatal errors.
164    */
165   struct GNUNET_NETWORK_Handle *sock;
166
167   /**
168    * Function to call on data received, NULL if no receive is pending.
169    */
170   GNUNET_CONNECTION_Receiver receiver;
171
172   /**
173    * Closure for receiver.
174    */
175   void *receiver_cls;
176
177   /**
178    * Pointer to our write buffer.
179    */
180   char *write_buffer;
181
182   /**
183    * Current size of our write buffer.
184    */
185   size_t write_buffer_size;
186
187   /**
188    * Current write-offset in write buffer (where
189    * would we write next).
190    */
191   size_t write_buffer_off;
192
193   /**
194    * Current read-offset in write buffer (how many
195    * bytes have already been sent).
196    */
197   size_t write_buffer_pos;
198
199   /**
200    * Length of addr.
201    */
202   socklen_t addrlen;
203
204   /**
205    * Read task that we may need to wait for.
206    */
207   struct GNUNET_SCHEDULER_Task *read_task;
208
209   /**
210    * Write task that we may need to wait for.
211    */
212   struct GNUNET_SCHEDULER_Task *write_task;
213
214   /**
215    * Handle to a pending DNS lookup request.
216    */
217   struct GNUNET_RESOLVER_RequestHandle *dns_active;
218
219   /**
220    * The handle we return for GNUNET_CONNECTION_notify_transmit_ready.
221    */
222   struct GNUNET_CONNECTION_TransmitHandle nth;
223
224   /**
225    * Timeout for receiving (in absolute time).
226    */
227   struct GNUNET_TIME_Absolute receive_timeout;
228
229   /**
230    * Maximum number of bytes to read (for receiving).
231    */
232   size_t max;
233
234   /**
235    * Port to connect to.
236    */
237   uint16_t port;
238
239   /**
240    * When shutdown, do not ever actually close the socket, but
241    * free resources.  Only should ever be set if using program
242    * termination as a signal (because only then will the leaked
243    * socket be freed!)
244    */
245   int8_t persist;
246
247   /**
248    * Usually 0.  Set to 1 if this handle is in used and should
249    * 'GNUNET_CONNECTION_destroy' be called right now, the action needs
250    * to be deferred by setting it to -1.
251    */
252   int8_t destroy_later;
253
254 };
255
256
257 /**
258  * Set the persist option on this connection handle.  Indicates
259  * that the underlying socket or fd should never really be closed.
260  * Used for indicating process death.
261  *
262  * @param connection the connection to set persistent
263  */
264 void
265 GNUNET_CONNECTION_persist_ (struct GNUNET_CONNECTION_Handle *connection)
266 {
267   connection->persist = GNUNET_YES;
268 }
269
270
271 /**
272  * Disable the "CORK" feature for communication with the given connection,
273  * forcing the OS to immediately flush the buffer on transmission
274  * instead of potentially buffering multiple messages.  Essentially
275  * reduces the OS send buffers to zero.
276  * Used to make sure that the last messages sent through the connection
277  * reach the other side before the process is terminated.
278  *
279  * @param connection the connection to make flushing and blocking
280  * @return GNUNET_OK on success
281  */
282 int
283 GNUNET_CONNECTION_disable_corking (struct GNUNET_CONNECTION_Handle *connection)
284 {
285   return GNUNET_NETWORK_socket_disable_corking (connection->sock);
286 }
287
288
289 /**
290  * Create a connection handle by boxing an existing OS socket.  The OS
291  * socket should henceforth be no longer used directly.
292  * GNUNET_connection_destroy will close it.
293  *
294  * @param osSocket existing socket to box
295  * @return the boxed connection handle
296  */
297 struct GNUNET_CONNECTION_Handle *
298 GNUNET_CONNECTION_create_from_existing (struct GNUNET_NETWORK_Handle *osSocket)
299 {
300   struct GNUNET_CONNECTION_Handle *connection;
301
302   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
303   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
304   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
305   connection->sock = osSocket;
306   return connection;
307 }
308
309
310 /**
311  * Create a connection handle by accepting on a listen socket.  This
312  * function may block if the listen socket has no connection ready.
313  *
314  * @param access_cb function to use to check if access is allowed
315  * @param access_cb_cls closure for @a access_cb
316  * @param lsock listen socket
317  * @return the connection handle, NULL on error
318  */
319 struct GNUNET_CONNECTION_Handle *
320 GNUNET_CONNECTION_create_from_accept (GNUNET_CONNECTION_AccessCheck access_cb,
321                                       void *access_cb_cls,
322                                       struct GNUNET_NETWORK_Handle *lsock)
323 {
324   struct GNUNET_CONNECTION_Handle *connection;
325   char addr[128];
326   socklen_t addrlen;
327   struct GNUNET_NETWORK_Handle *sock;
328   int aret;
329   struct sockaddr_in *v4;
330   struct sockaddr_in6 *v6;
331   struct sockaddr *sa;
332   void *uaddr;
333   struct GNUNET_CONNECTION_Credentials *gcp;
334   struct GNUNET_CONNECTION_Credentials gc;
335 #ifdef SO_PEERCRED
336   struct ucred uc;
337   socklen_t olen;
338 #endif
339
340   addrlen = sizeof (addr);
341   sock =
342       GNUNET_NETWORK_socket_accept (lsock, (struct sockaddr *) &addr, &addrlen);
343   if (NULL == sock)
344   {
345     LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING, "accept");
346     return NULL;
347   }
348   if ((addrlen > sizeof (addr)) || (addrlen < sizeof (sa_family_t)))
349   {
350     GNUNET_break (0);
351     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
352     return NULL;
353   }
354
355   sa = (struct sockaddr *) addr;
356   v6 = (struct sockaddr_in6 *) addr;
357   if ((AF_INET6 == sa->sa_family) && (IN6_IS_ADDR_V4MAPPED (&v6->sin6_addr)))
358   {
359     /* convert to V4 address */
360     v4 = GNUNET_new (struct sockaddr_in);
361     memset (v4, 0, sizeof (struct sockaddr_in));
362     v4->sin_family = AF_INET;
363 #if HAVE_SOCKADDR_IN_SIN_LEN
364     v4->sin_len = (u_char) sizeof (struct sockaddr_in);
365 #endif
366     memcpy (&v4->sin_addr,
367             &((char *) &v6->sin6_addr)[sizeof (struct in6_addr) -
368                                        sizeof (struct in_addr)],
369             sizeof (struct in_addr));
370     v4->sin_port = v6->sin6_port;
371     uaddr = v4;
372     addrlen = sizeof (struct sockaddr_in);
373   }
374   else
375   {
376     uaddr = GNUNET_malloc (addrlen);
377     memcpy (uaddr, addr, addrlen);
378   }
379   gcp = NULL;
380   gc.uid = 0;
381   gc.gid = 0;
382   if (AF_UNIX == sa->sa_family)
383   {
384 #if HAVE_GETPEEREID
385     /* most BSDs */
386     if (0 == getpeereid (GNUNET_NETWORK_get_fd (sock), &gc.uid, &gc.gid))
387       gcp = &gc;
388 #else
389 #ifdef SO_PEERCRED
390     /* largely traditional GNU/Linux */
391     olen = sizeof (uc);
392     if ((0 ==
393          getsockopt (GNUNET_NETWORK_get_fd (sock), SOL_SOCKET, SO_PEERCRED, &uc,
394                      &olen)) && (olen == sizeof (uc)))
395     {
396       gc.uid = uc.uid;
397       gc.gid = uc.gid;
398       gcp = &gc;
399     }
400 #else
401 #if HAVE_GETPEERUCRED
402     /* this is for Solaris 10 */
403     ucred_t *uc;
404
405     uc = NULL;
406     if (0 == getpeerucred (GNUNET_NETWORK_get_fd (sock), &uc))
407     {
408       gc.uid = ucred_geteuid (uc);
409       gc.gid = ucred_getegid (uc);
410       gcp = &gc;
411     }
412     ucred_free (uc);
413 #endif
414 #endif
415 #endif
416   }
417
418   if ((NULL != access_cb) &&
419       (GNUNET_YES != (aret = access_cb (access_cb_cls, gcp, uaddr, addrlen))))
420   {
421     if (GNUNET_NO == aret)
422       LOG (GNUNET_ERROR_TYPE_INFO,
423            _("Access denied to `%s'\n"),
424            GNUNET_a2s (uaddr, addrlen));
425     GNUNET_break (GNUNET_OK ==
426                   GNUNET_NETWORK_socket_shutdown (sock, SHUT_RDWR));
427     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
428     GNUNET_free (uaddr);
429     return NULL;
430   }
431   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
432   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
433   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
434   connection->addr = uaddr;
435   connection->addrlen = addrlen;
436   connection->sock = sock;
437   LOG (GNUNET_ERROR_TYPE_INFO,
438        _("Accepting connection from `%s': %p\n"),
439        GNUNET_a2s (uaddr, addrlen), connection);
440   return connection;
441 }
442
443
444 /**
445  * Obtain the network address of the other party.
446  *
447  * @param connection the client to get the address for
448  * @param addr where to store the address
449  * @param addrlen where to store the length of the address
450  * @return GNUNET_OK on success
451  */
452 int
453 GNUNET_CONNECTION_get_address (struct GNUNET_CONNECTION_Handle *connection,
454                                void **addr, size_t * addrlen)
455 {
456   if ((NULL == connection->addr) || (0 == connection->addrlen))
457     return GNUNET_NO;
458   *addr = GNUNET_malloc (connection->addrlen);
459   memcpy (*addr, connection->addr, connection->addrlen);
460   *addrlen = connection->addrlen;
461   return GNUNET_OK;
462 }
463
464
465 /**
466  * Tell the receiver callback that we had an IO error.
467  *
468  * @param connection connection to signal error
469  * @param errcode error code to send
470  */
471 static void
472 signal_receive_error (struct GNUNET_CONNECTION_Handle *connection,
473                       int errcode)
474 {
475   GNUNET_CONNECTION_Receiver receiver;
476
477   LOG (GNUNET_ERROR_TYPE_DEBUG,
478        "Receive encounters error (%s), connection closed (%p)\n",
479        STRERROR (errcode),
480        connection);
481   GNUNET_assert (NULL != (receiver = connection->receiver));
482   connection->receiver = NULL;
483   receiver (connection->receiver_cls,
484             NULL,
485             0,
486             connection->addr,
487             connection->addrlen,
488             errcode);
489 }
490
491
492 /**
493  * Tell the receiver callback that a timeout was reached.
494  *
495  * @param connection connection to signal for
496  */
497 static void
498 signal_receive_timeout (struct GNUNET_CONNECTION_Handle *connection)
499 {
500   GNUNET_CONNECTION_Receiver receiver;
501
502   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection signals timeout to receiver (%p)!\n",
503        connection);
504   GNUNET_assert (NULL != (receiver = connection->receiver));
505   connection->receiver = NULL;
506   receiver (connection->receiver_cls, NULL, 0, NULL, 0, 0);
507 }
508
509
510 /**
511  * We failed to transmit data to the service, signal the error.
512  *
513  * @param connection handle that had trouble
514  * @param ecode error code (errno)
515  */
516 static void
517 signal_transmit_error (struct GNUNET_CONNECTION_Handle *connection,
518                        int ecode)
519 {
520   GNUNET_CONNECTION_TransmitReadyNotify notify;
521
522   LOG (GNUNET_ERROR_TYPE_DEBUG,
523        "Transmission encounterd error (%s), connection closed (%p)\n",
524        STRERROR (ecode),
525        connection);
526   if (NULL != connection->sock)
527   {
528     (void) GNUNET_NETWORK_socket_shutdown (connection->sock, SHUT_RDWR);
529     GNUNET_break (GNUNET_OK ==
530                   GNUNET_NETWORK_socket_close (connection->sock));
531     connection->sock = NULL;
532     GNUNET_assert (NULL == connection->write_task);
533   }
534   if (NULL != connection->read_task)
535   {
536     /* send errors trigger read errors... */
537     GNUNET_SCHEDULER_cancel (connection->read_task);
538     connection->read_task = NULL;
539     signal_receive_timeout (connection);
540     return;
541   }
542   if (NULL == connection->nth.notify_ready)
543     return;                     /* nobody to tell about it */
544   notify = connection->nth.notify_ready;
545   connection->nth.notify_ready = NULL;
546   notify (connection->nth.notify_ready_cls, 0, NULL);
547 }
548
549
550 /**
551  * We've failed for good to establish a connection (timeout or
552  * no more addresses to try).
553  *
554  * @param connection the connection we tried to establish
555  */
556 static void
557 connect_fail_continuation (struct GNUNET_CONNECTION_Handle *connection)
558 {
559   LOG (GNUNET_ERROR_TYPE_INFO,
560        _("Failed to establish TCP connection to `%s:%u', no further addresses to try.\n"),
561        connection->hostname, connection->port);
562   GNUNET_break (NULL == connection->ap_head);
563   GNUNET_break (NULL == connection->ap_tail);
564   GNUNET_break (GNUNET_NO == connection->dns_active);
565   GNUNET_break (NULL == connection->sock);
566   GNUNET_assert (NULL == connection->write_task);
567
568   /* signal errors for jobs that used to wait on the connection */
569   connection->destroy_later = 1;
570   if (NULL != connection->receiver)
571     signal_receive_error (connection,
572                           ECONNREFUSED);
573   if (NULL != connection->nth.notify_ready)
574   {
575     GNUNET_assert (NULL != connection->nth.timeout_task);
576     GNUNET_SCHEDULER_cancel (connection->nth.timeout_task);
577     connection->nth.timeout_task = NULL;
578     signal_transmit_error (connection,
579                            ECONNREFUSED);
580   }
581   if (-1 == connection->destroy_later)
582   {
583     /* do it now */
584     connection->destroy_later = 0;
585     GNUNET_CONNECTION_destroy (connection);
586     return;
587   }
588   connection->destroy_later = 0;
589 }
590
591
592 /**
593  * We are ready to transmit (or got a timeout).
594  *
595  * @param cls our connection handle
596  * @param tc task context describing why we are here
597  */
598 static void
599 transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
600
601
602 /**
603  * This function is called once we either timeout or have data ready
604  * to read.
605  *
606  * @param cls connection to read from
607  * @param tc scheduler context
608  */
609 static void
610 receive_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
611
612
613 /**
614  * We've succeeded in establishing a connection.
615  *
616  * @param connection the connection we tried to establish
617  */
618 static void
619 connect_success_continuation (struct GNUNET_CONNECTION_Handle *connection)
620 {
621   LOG (GNUNET_ERROR_TYPE_DEBUG,
622        "Connection to `%s' succeeded! (%p)\n",
623        GNUNET_a2s (connection->addr, connection->addrlen), connection);
624   /* trigger jobs that waited for the connection */
625   if (NULL != connection->receiver)
626   {
627     LOG (GNUNET_ERROR_TYPE_DEBUG,
628          "Connection succeeded, starting with receiving data (%p)\n",
629          connection);
630     GNUNET_assert (NULL == connection->read_task);
631     connection->read_task =
632       GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining
633                                      (connection->receive_timeout), connection->sock,
634                                      &receive_ready, connection);
635   }
636   if (NULL != connection->nth.notify_ready)
637   {
638     LOG (GNUNET_ERROR_TYPE_DEBUG,
639          "Connection succeeded, starting with sending data (%p)\n",
640          connection);
641     GNUNET_assert (connection->nth.timeout_task != NULL);
642     GNUNET_SCHEDULER_cancel (connection->nth.timeout_task);
643     connection->nth.timeout_task = NULL;
644     GNUNET_assert (connection->write_task == NULL);
645     connection->write_task =
646         GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_absolute_get_remaining
647                                         (connection->nth.transmit_timeout), connection->sock,
648                                         &transmit_ready, connection);
649   }
650 }
651
652
653 /**
654  * Scheduler let us know that we're either ready to write on the
655  * socket OR connect timed out.  Do the right thing.
656  *
657  * @param cls the "struct AddressProbe*" with the address that we are probing
658  * @param tc success or failure info about the connect attempt.
659  */
660 static void
661 connect_probe_continuation (void *cls,
662                             const struct GNUNET_SCHEDULER_TaskContext *tc)
663 {
664   struct AddressProbe *ap = cls;
665   struct GNUNET_CONNECTION_Handle *connection = ap->connection;
666   struct AddressProbe *pos;
667   int error;
668   socklen_t len;
669
670   GNUNET_assert (NULL != ap->sock);
671   GNUNET_CONTAINER_DLL_remove (connection->ap_head, connection->ap_tail, ap);
672   len = sizeof (error);
673   errno = 0;
674   error = 0;
675   if ((0 == (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) ||
676       (GNUNET_OK !=
677        GNUNET_NETWORK_socket_getsockopt (ap->sock, SOL_SOCKET, SO_ERROR, &error,
678                                          &len)) || (0 != error))
679   {
680     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (ap->sock));
681     GNUNET_free (ap);
682     if ((NULL == connection->ap_head) && (GNUNET_NO == connection->dns_active))
683       connect_fail_continuation (connection);
684     return;
685   }
686   GNUNET_assert (NULL == connection->sock);
687   connection->sock = ap->sock;
688   GNUNET_assert (NULL == connection->addr);
689   connection->addr = GNUNET_malloc (ap->addrlen);
690   memcpy (connection->addr, ap->addr, ap->addrlen);
691   connection->addrlen = ap->addrlen;
692   GNUNET_free (ap);
693   /* cancel all other attempts */
694   while (NULL != (pos = connection->ap_head))
695   {
696     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (pos->sock));
697     GNUNET_SCHEDULER_cancel (pos->task);
698     GNUNET_CONTAINER_DLL_remove (connection->ap_head, connection->ap_tail, pos);
699     GNUNET_free (pos);
700   }
701   connect_success_continuation (connection);
702 }
703
704
705 /**
706  * Try to establish a connection given the specified address.
707  * This function is called by the resolver once we have a DNS reply.
708  *
709  * @param cls our "struct GNUNET_CONNECTION_Handle *"
710  * @param addr address to try, NULL for "last call"
711  * @param addrlen length of addr
712  */
713 static void
714 try_connect_using_address (void *cls, const struct sockaddr *addr,
715                            socklen_t addrlen)
716 {
717   struct GNUNET_CONNECTION_Handle *connection = cls;
718   struct AddressProbe *ap;
719   struct GNUNET_TIME_Relative delay;
720
721   if (NULL == addr)
722   {
723     connection->dns_active = NULL;
724     if ((NULL == connection->ap_head) && (NULL == connection->sock))
725       connect_fail_continuation (connection);
726     return;
727   }
728   if (NULL != connection->sock)
729     return;                     /* already connected */
730   GNUNET_assert (NULL == connection->addr);
731   /* try to connect */
732   LOG (GNUNET_ERROR_TYPE_DEBUG,
733        "Trying to connect using address `%s:%u/%s:%u'\n", connection->hostname, connection->port,
734        GNUNET_a2s (addr, addrlen), connection->port);
735   ap = GNUNET_malloc (sizeof (struct AddressProbe) + addrlen);
736   ap->addr = (const struct sockaddr *) &ap[1];
737   memcpy (&ap[1], addr, addrlen);
738   ap->addrlen = addrlen;
739   ap->connection = connection;
740
741   switch (ap->addr->sa_family)
742   {
743   case AF_INET:
744     ((struct sockaddr_in *) ap->addr)->sin_port = htons (connection->port);
745     break;
746   case AF_INET6:
747     ((struct sockaddr_in6 *) ap->addr)->sin6_port = htons (connection->port);
748     break;
749   default:
750     GNUNET_break (0);
751     GNUNET_free (ap);
752     return;                     /* not supported by us */
753   }
754   ap->sock = GNUNET_NETWORK_socket_create (ap->addr->sa_family, SOCK_STREAM, 0);
755   if (NULL == ap->sock)
756   {
757     GNUNET_free (ap);
758     return;                     /* not supported by OS */
759   }
760   LOG (GNUNET_ERROR_TYPE_INFO, _("Trying to connect to `%s' (%p)\n"),
761        GNUNET_a2s (ap->addr, ap->addrlen), connection);
762   if ((GNUNET_OK !=
763        GNUNET_NETWORK_socket_connect (ap->sock, ap->addr, ap->addrlen)) &&
764       (EINPROGRESS != errno))
765   {
766     /* maybe refused / unsupported address, try next */
767     LOG_STRERROR (GNUNET_ERROR_TYPE_INFO, "connect");
768     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (ap->sock));
769     GNUNET_free (ap);
770     return;
771   }
772   GNUNET_CONTAINER_DLL_insert (connection->ap_head, connection->ap_tail, ap);
773   delay = GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT;
774   if (NULL != connection->nth.notify_ready)
775     delay =
776         GNUNET_TIME_relative_min (delay,
777                                   GNUNET_TIME_absolute_get_remaining (connection->
778                                                                       nth.transmit_timeout));
779   if (NULL != connection->receiver)
780     delay =
781         GNUNET_TIME_relative_min (delay,
782                                   GNUNET_TIME_absolute_get_remaining
783                                   (connection->receive_timeout));
784   ap->task =
785       GNUNET_SCHEDULER_add_write_net (delay, ap->sock,
786                                       &connect_probe_continuation, ap);
787 }
788
789
790 /**
791  * Create a connection handle by (asynchronously) connecting to a host.
792  * This function returns immediately, even if the connection has not
793  * yet been established.  This function only creates TCP connections.
794  *
795  * @param cfg configuration to use
796  * @param hostname name of the host to connect to
797  * @param port port to connect to
798  * @return the connection handle
799  */
800 struct GNUNET_CONNECTION_Handle *
801 GNUNET_CONNECTION_create_from_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
802                                        const char *hostname,
803                                        uint16_t port)
804 {
805   struct GNUNET_CONNECTION_Handle *connection;
806
807   GNUNET_assert (0 < strlen (hostname));        /* sanity check */
808   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
809   connection->cfg = cfg;
810   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
811   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
812   connection->port = port;
813   connection->hostname = GNUNET_strdup (hostname);
814   connection->dns_active =
815       GNUNET_RESOLVER_ip_get (connection->hostname, AF_UNSPEC,
816                               GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
817                               &try_connect_using_address, connection);
818   return connection;
819 }
820
821
822 /**
823  * Create a connection handle by connecting to a UNIX domain service.
824  * This function returns immediately, even if the connection has not
825  * yet been established.  This function only creates UNIX connections.
826  *
827  * @param cfg configuration to use
828  * @param unixpath path to connect to
829  * @return the connection handle, NULL on systems without UNIX support
830  */
831 struct GNUNET_CONNECTION_Handle *
832 GNUNET_CONNECTION_create_from_connect_to_unixpath (const struct
833                                                    GNUNET_CONFIGURATION_Handle
834                                                    *cfg, const char *unixpath)
835 {
836 #ifdef AF_UNIX
837   struct GNUNET_CONNECTION_Handle *connection;
838   struct sockaddr_un *un;
839
840   GNUNET_assert (0 < strlen (unixpath));        /* sanity check */
841   un = GNUNET_new (struct sockaddr_un);
842   un->sun_family = AF_UNIX;
843   strncpy (un->sun_path, unixpath, sizeof (un->sun_path) - 1);
844 #ifdef LINUX
845   {
846     int abstract;
847
848     abstract = GNUNET_CONFIGURATION_get_value_yesno (cfg, "TESTING",
849                                                      "USE_ABSTRACT_SOCKETS");
850     if (GNUNET_YES == abstract)
851       un->sun_path[0] = '\0';
852   }
853 #endif
854 #if HAVE_SOCKADDR_IN_SIN_LEN
855   un->sun_len = (u_char) sizeof (struct sockaddr_un);
856 #endif
857   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
858   connection->cfg = cfg;
859   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
860   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
861   connection->port = 0;
862   connection->hostname = NULL;
863   connection->addr = (struct sockaddr *) un;
864   connection->addrlen = sizeof (struct sockaddr_un);
865   connection->sock = GNUNET_NETWORK_socket_create (AF_UNIX, SOCK_STREAM, 0);
866   if (NULL == connection->sock)
867   {
868     GNUNET_free (connection->addr);
869     GNUNET_free (connection->write_buffer);
870     GNUNET_free (connection);
871     return NULL;
872   }
873   if ( (GNUNET_OK !=
874         GNUNET_NETWORK_socket_connect (connection->sock, connection->addr, connection->addrlen)) &&
875        (EINPROGRESS != errno) )
876   {
877     /* Just return; we expect everything to work eventually so don't fail HARD */
878     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (connection->sock));
879     connection->sock = NULL;
880     return connection;
881   }
882   connect_success_continuation (connection);
883   return connection;
884 #else
885   return NULL;
886 #endif
887 }
888
889
890 /**
891  * Create a connection handle by (asynchronously) connecting to a host.
892  * This function returns immediately, even if the connection has not
893  * yet been established.  This function only creates TCP connections.
894  *
895  * @param s socket to connect
896  * @param serv_addr server address
897  * @param addrlen length of server address
898  * @return the connection handle
899  */
900 struct GNUNET_CONNECTION_Handle *
901 GNUNET_CONNECTION_connect_socket (struct GNUNET_NETWORK_Handle *s,
902                                   const struct sockaddr *serv_addr,
903                                   socklen_t addrlen)
904 {
905   struct GNUNET_CONNECTION_Handle *connection;
906
907   if ((GNUNET_OK != GNUNET_NETWORK_socket_connect (s, serv_addr, addrlen)) &&
908       (EINPROGRESS != errno))
909   {
910     /* maybe refused / unsupported address, try next */
911     LOG_STRERROR (GNUNET_ERROR_TYPE_INFO, "connect");
912     LOG (GNUNET_ERROR_TYPE_INFO,
913          _("Attempt to connect to `%s' failed\n"),
914          GNUNET_a2s (serv_addr, addrlen));
915     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (s));
916     return NULL;
917   }
918   connection = GNUNET_CONNECTION_create_from_existing (s);
919   connection->addr = GNUNET_malloc (addrlen);
920   memcpy (connection->addr, serv_addr, addrlen);
921   connection->addrlen = addrlen;
922   LOG (GNUNET_ERROR_TYPE_INFO,
923        _("Trying to connect to `%s' (%p)\n"),
924        GNUNET_a2s (serv_addr, addrlen), connection);
925   return connection;
926 }
927
928
929 /**
930  * Create a connection handle by creating a socket and
931  * (asynchronously) connecting to a host.  This function returns
932  * immediately, even if the connection has not yet been established.
933  * This function only creates TCP connections.
934  *
935  * @param af_family address family to use
936  * @param serv_addr server address
937  * @param addrlen length of @a serv_addr
938  * @return the connection handle
939  */
940 struct GNUNET_CONNECTION_Handle *
941 GNUNET_CONNECTION_create_from_sockaddr (int af_family,
942                                         const struct sockaddr *serv_addr,
943                                         socklen_t addrlen)
944 {
945   struct GNUNET_NETWORK_Handle *s;
946
947   s = GNUNET_NETWORK_socket_create (af_family, SOCK_STREAM, 0);
948   if (NULL == s)
949   {
950     LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, "socket");
951     return NULL;
952   }
953   return GNUNET_CONNECTION_connect_socket (s, serv_addr, addrlen);
954 }
955
956
957 /**
958  * Check if connection is valid (no fatal errors have happened so far).
959  * Note that a connection that is still trying to connect is considered
960  * valid.
961  *
962  * @param connection connection to check
963  * @return #GNUNET_YES if valid, #GNUNET_NO otherwise
964  */
965 int
966 GNUNET_CONNECTION_check (struct GNUNET_CONNECTION_Handle *connection)
967 {
968   if ((NULL != connection->ap_head) || (NULL != connection->dns_active))
969     return GNUNET_YES;          /* still trying to connect */
970   return (NULL == connection->sock) ? GNUNET_NO : GNUNET_YES;
971 }
972
973
974 /**
975  * Close the connection and free associated resources.  There must
976  * not be any pending requests for reading or writing to the
977  * connection at this time.
978  *
979  * @param connection connection to destroy
980  */
981 void
982 GNUNET_CONNECTION_destroy (struct GNUNET_CONNECTION_Handle *connection)
983 {
984   struct AddressProbe *pos;
985
986   if (0 != connection->destroy_later)
987   {
988     connection->destroy_later = -1;
989     return;
990   }
991   LOG (GNUNET_ERROR_TYPE_DEBUG,
992        "Shutting down connection (%p)\n",
993        connection);
994   GNUNET_assert (NULL == connection->nth.notify_ready);
995   GNUNET_assert (NULL == connection->receiver);
996   if (NULL != connection->write_task)
997   {
998     GNUNET_SCHEDULER_cancel (connection->write_task);
999     connection->write_task = NULL;
1000     connection->write_buffer_off = 0;
1001   }
1002   if (NULL != connection->read_task)
1003   {
1004     GNUNET_SCHEDULER_cancel (connection->read_task);
1005     connection->read_task = NULL;
1006   }
1007   if (NULL != connection->nth.timeout_task)
1008   {
1009     GNUNET_SCHEDULER_cancel (connection->nth.timeout_task);
1010     connection->nth.timeout_task = NULL;
1011   }
1012   connection->nth.notify_ready = NULL;
1013   if (NULL != connection->dns_active)
1014   {
1015     GNUNET_RESOLVER_request_cancel (connection->dns_active);
1016     connection->dns_active = NULL;
1017   }
1018   while (NULL != (pos = connection->ap_head))
1019   {
1020     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (pos->sock));
1021     GNUNET_SCHEDULER_cancel (pos->task);
1022     GNUNET_CONTAINER_DLL_remove (connection->ap_head, connection->ap_tail, pos);
1023     GNUNET_free (pos);
1024   }
1025   if ( (NULL != connection->sock) &&
1026        (GNUNET_YES != connection->persist) )
1027   {
1028     if ((GNUNET_OK !=
1029          GNUNET_NETWORK_socket_shutdown (connection->sock,
1030                                          SHUT_RDWR)) &&
1031         (ENOTCONN != errno) &&
1032         (ECONNRESET != errno) )
1033       LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING,
1034                     "shutdown");
1035   }
1036   if (NULL != connection->sock)
1037   {
1038     if (GNUNET_YES != connection->persist)
1039     {
1040       GNUNET_break (GNUNET_OK ==
1041                     GNUNET_NETWORK_socket_close (connection->sock));
1042     }
1043     else
1044     {
1045       GNUNET_NETWORK_socket_free_memory_only_ (connection->sock); /* at least no memory leak (we deliberately
1046                                                                    * leak the socket in this special case) ... */
1047     }
1048   }
1049   GNUNET_free_non_null (connection->addr);
1050   GNUNET_free_non_null (connection->hostname);
1051   GNUNET_free (connection->write_buffer);
1052   GNUNET_free (connection);
1053 }
1054
1055
1056 /**
1057  * This function is called once we either timeout
1058  * or have data ready to read.
1059  *
1060  * @param cls connection to read from
1061  * @param tc scheduler context
1062  */
1063 static void
1064 receive_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1065 {
1066   struct GNUNET_CONNECTION_Handle *connection = cls;
1067   char buffer[connection->max];
1068   ssize_t ret;
1069   GNUNET_CONNECTION_Receiver receiver;
1070
1071   connection->read_task = NULL;
1072   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1073   {
1074     /* ignore shutdown request, go again immediately */
1075     connection->read_task =
1076         GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining
1077                                        (connection->receive_timeout), connection->sock,
1078                                        &receive_ready, connection);
1079     return;
1080   }
1081   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_TIMEOUT))
1082   {
1083     LOG (GNUNET_ERROR_TYPE_DEBUG,
1084          "Receive from `%s' encounters error: timeout (%s, %p)\n",
1085          GNUNET_a2s (connection->addr, connection->addrlen),
1086          GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (connection->receive_timeout), GNUNET_YES),
1087          connection);
1088     signal_receive_timeout (connection);
1089     return;
1090   }
1091   if (NULL == connection->sock)
1092   {
1093     /* connect failed for good */
1094     signal_receive_error (connection, ECONNREFUSED);
1095     return;
1096   }
1097   GNUNET_assert (GNUNET_NETWORK_fdset_isset (tc->read_ready, connection->sock));
1098 RETRY:
1099   ret = GNUNET_NETWORK_socket_recv (connection->sock,
1100                                     buffer,
1101                                     connection->max);
1102   if (-1 == ret)
1103   {
1104     if (EINTR == errno)
1105       goto RETRY;
1106     signal_receive_error (connection, errno);
1107     return;
1108   }
1109   LOG (GNUNET_ERROR_TYPE_DEBUG,
1110        "receive_ready read %u/%u bytes from `%s' (%p)!\n",
1111        (unsigned int) ret,
1112        connection->max,
1113        GNUNET_a2s (connection->addr,
1114                    connection->addrlen),
1115        connection);
1116   GNUNET_assert (NULL != (receiver = connection->receiver));
1117   connection->receiver = NULL;
1118   receiver (connection->receiver_cls,
1119             buffer,
1120             ret,
1121             connection->addr,
1122             connection->addrlen,
1123             0);
1124 }
1125
1126
1127 /**
1128  * Receive data from the given connection.  Note that this function will
1129  * call "receiver" asynchronously using the scheduler.  It will
1130  * "immediately" return.  Note that there MUST only be one active
1131  * receive call per connection at any given point in time (so do not
1132  * call receive again until the receiver callback has been invoked).
1133  *
1134  * @param connection connection handle
1135  * @param max maximum number of bytes to read
1136  * @param timeout maximum amount of time to wait
1137  * @param receiver function to call with received data
1138  * @param receiver_cls closure for receiver
1139  */
1140 void
1141 GNUNET_CONNECTION_receive (struct GNUNET_CONNECTION_Handle *connection, size_t max,
1142                            struct GNUNET_TIME_Relative timeout,
1143                            GNUNET_CONNECTION_Receiver receiver,
1144                            void *receiver_cls)
1145 {
1146   GNUNET_assert ((NULL == connection->read_task) &&
1147                  (NULL == connection->receiver));
1148   GNUNET_assert (NULL != receiver);
1149   connection->receiver = receiver;
1150   connection->receiver_cls = receiver_cls;
1151   connection->receive_timeout = GNUNET_TIME_relative_to_absolute (timeout);
1152   connection->max = max;
1153   if (NULL != connection->sock)
1154   {
1155     connection->read_task =
1156       GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining
1157                                      (connection->receive_timeout), connection->sock,
1158                                      &receive_ready, connection);
1159     return;
1160   }
1161   if ((NULL == connection->dns_active) && (NULL == connection->ap_head))
1162   {
1163     connection->receiver = NULL;
1164     receiver (receiver_cls, NULL, 0, NULL, 0, ETIMEDOUT);
1165     return;
1166   }
1167 }
1168
1169
1170 /**
1171  * Cancel receive job on the given connection.  Note that the
1172  * receiver callback must not have been called yet in order
1173  * for the cancellation to be valid.
1174  *
1175  * @param connection connection handle
1176  * @return closure of the original receiver callback closure
1177  */
1178 void *
1179 GNUNET_CONNECTION_receive_cancel (struct GNUNET_CONNECTION_Handle *connection)
1180 {
1181   if (NULL != connection->read_task)
1182   {
1183     GNUNET_assert (connection == GNUNET_SCHEDULER_cancel (connection->read_task));
1184     connection->read_task = NULL;
1185   }
1186   connection->receiver = NULL;
1187   return connection->receiver_cls;
1188 }
1189
1190
1191 /**
1192  * Try to call the transmit notify method (check if we do
1193  * have enough space available first)!
1194  *
1195  * @param connection connection for which we should do this processing
1196  * @return GNUNET_YES if we were able to call notify
1197  */
1198 static int
1199 process_notify (struct GNUNET_CONNECTION_Handle *connection)
1200 {
1201   size_t used;
1202   size_t avail;
1203   size_t size;
1204   GNUNET_CONNECTION_TransmitReadyNotify notify;
1205
1206   LOG (GNUNET_ERROR_TYPE_DEBUG, "process_notify is running\n");
1207
1208   GNUNET_assert (NULL == connection->write_task);
1209   if (NULL == (notify = connection->nth.notify_ready))
1210   {
1211     LOG (GNUNET_ERROR_TYPE_DEBUG, "Noone to notify\n");
1212     return GNUNET_NO;
1213   }
1214   used = connection->write_buffer_off - connection->write_buffer_pos;
1215   avail = connection->write_buffer_size - used;
1216   size = connection->nth.notify_size;
1217   if (size > avail)
1218   {
1219     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not enough buffer\n");
1220     return GNUNET_NO;
1221   }
1222   connection->nth.notify_ready = NULL;
1223   if (connection->write_buffer_size - connection->write_buffer_off < size)
1224   {
1225     /* need to compact */
1226     memmove (connection->write_buffer, &connection->write_buffer[connection->write_buffer_pos],
1227              used);
1228     connection->write_buffer_off -= connection->write_buffer_pos;
1229     connection->write_buffer_pos = 0;
1230   }
1231   avail = connection->write_buffer_size - connection->write_buffer_off;
1232   GNUNET_assert (avail >= size);
1233   size =
1234       notify (connection->nth.notify_ready_cls, avail,
1235               &connection->write_buffer[connection->write_buffer_off]);
1236   GNUNET_assert (size <= avail);
1237   if (0 != size)
1238     connection->write_buffer_off += size;
1239   return GNUNET_YES;
1240 }
1241
1242
1243 /**
1244  * Task invoked by the scheduler when a call to transmit
1245  * is timing out (we never got enough buffer space to call
1246  * the callback function before the specified timeout
1247  * expired).
1248  *
1249  * This task notifies the client about the timeout.
1250  *
1251  * @param cls the 'struct GNUNET_CONNECTION_Handle'
1252  * @param tc scheduler context
1253  */
1254 static void
1255 transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1256 {
1257   struct GNUNET_CONNECTION_Handle *connection = cls;
1258   GNUNET_CONNECTION_TransmitReadyNotify notify;
1259
1260   connection->nth.timeout_task = NULL;
1261   LOG (GNUNET_ERROR_TYPE_DEBUG,
1262        "Transmit to `%s:%u/%s' fails, time out reached (%p).\n",
1263        connection->hostname,
1264        connection->port, GNUNET_a2s (connection->addr, connection->addrlen), connection);
1265   notify = connection->nth.notify_ready;
1266   GNUNET_assert (NULL != notify);
1267   connection->nth.notify_ready = NULL;
1268   notify (connection->nth.notify_ready_cls, 0, NULL);
1269 }
1270
1271
1272 /**
1273  * Task invoked by the scheduler when we failed to connect
1274  * at the time of being asked to transmit.
1275  *
1276  * This task notifies the client about the error.
1277  *
1278  * @param cls the 'struct GNUNET_CONNECTION_Handle'
1279  * @param tc scheduler context
1280  */
1281 static void
1282 connect_error (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1283 {
1284   struct GNUNET_CONNECTION_Handle *connection = cls;
1285   GNUNET_CONNECTION_TransmitReadyNotify notify;
1286
1287   LOG (GNUNET_ERROR_TYPE_DEBUG,
1288        "Transmission request of size %u fails (%s/%u), connection failed (%p).\n",
1289        connection->nth.notify_size, connection->hostname, connection->port, connection);
1290   connection->write_task = NULL;
1291   notify = connection->nth.notify_ready;
1292   connection->nth.notify_ready = NULL;
1293   notify (connection->nth.notify_ready_cls, 0, NULL);
1294 }
1295
1296
1297 /**
1298  * We are ready to transmit (or got a timeout).
1299  *
1300  * @param cls our connection handle
1301  * @param tc task context describing why we are here
1302  */
1303 static void
1304 transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1305 {
1306   struct GNUNET_CONNECTION_Handle *connection = cls;
1307   GNUNET_CONNECTION_TransmitReadyNotify notify;
1308   ssize_t ret;
1309   size_t have;
1310
1311   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_ready running (%p).\n", connection);
1312   GNUNET_assert (NULL != connection->write_task);
1313   connection->write_task = NULL;
1314   GNUNET_assert (NULL == connection->nth.timeout_task);
1315   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1316   {
1317     if (NULL != connection->sock)
1318       goto SCHEDULE_WRITE;      /* ignore shutdown, go again immediately */
1319     LOG (GNUNET_ERROR_TYPE_DEBUG,
1320          "Transmit to `%s' fails, shutdown happened (%p).\n",
1321          GNUNET_a2s (connection->addr, connection->addrlen), connection);
1322     notify = connection->nth.notify_ready;
1323     if (NULL != notify)
1324     {
1325       connection->nth.notify_ready = NULL;
1326       notify (connection->nth.notify_ready_cls, 0, NULL);
1327     }
1328     return;
1329   }
1330   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_TIMEOUT))
1331   {
1332     LOG (GNUNET_ERROR_TYPE_DEBUG,
1333          "Transmit to `%s' fails, time out reached (%p).\n",
1334          GNUNET_a2s (connection->addr, connection->addrlen), connection);
1335     notify = connection->nth.notify_ready;
1336     GNUNET_assert (NULL != notify);
1337     connection->nth.notify_ready = NULL;
1338     notify (connection->nth.notify_ready_cls, 0, NULL);
1339     return;
1340   }
1341   GNUNET_assert (NULL != connection->sock);
1342   if (NULL == tc->write_ready)
1343   {
1344     /* special circumstances (in particular, PREREQ_DONE after
1345      * connect): not yet ready to write, but no "fatal" error either.
1346      * Hence retry.  */
1347     goto SCHEDULE_WRITE;
1348   }
1349   if (!GNUNET_NETWORK_fdset_isset (tc->write_ready, connection->sock))
1350   {
1351     GNUNET_assert (NULL == connection->write_task);
1352     /* special circumstances (in particular, shutdown): not yet ready
1353      * to write, but no "fatal" error either.  Hence retry.  */
1354     goto SCHEDULE_WRITE;
1355   }
1356   GNUNET_assert (connection->write_buffer_off >= connection->write_buffer_pos);
1357   if ((NULL != connection->nth.notify_ready) &&
1358       (connection->write_buffer_size < connection->nth.notify_size))
1359   {
1360     connection->write_buffer =
1361         GNUNET_realloc (connection->write_buffer, connection->nth.notify_size);
1362     connection->write_buffer_size = connection->nth.notify_size;
1363   }
1364   process_notify (connection);
1365   have = connection->write_buffer_off - connection->write_buffer_pos;
1366   if (0 == have)
1367   {
1368     /* no data ready for writing, terminate write loop */
1369     return;
1370   }
1371   GNUNET_assert (have <= connection->write_buffer_size);
1372   GNUNET_assert (have + connection->write_buffer_pos <= connection->write_buffer_size);
1373   GNUNET_assert (connection->write_buffer_pos <= connection->write_buffer_size);
1374 RETRY:
1375   ret =
1376       GNUNET_NETWORK_socket_send (connection->sock,
1377                                   &connection->write_buffer[connection->write_buffer_pos],
1378                                   have);
1379   if (-1 == ret)
1380   {
1381     if (EINTR == errno)
1382       goto RETRY;
1383     if (NULL != connection->write_task)
1384     {
1385       GNUNET_SCHEDULER_cancel (connection->write_task);
1386       connection->write_task = NULL;
1387     }
1388     signal_transmit_error (connection, errno);
1389     return;
1390   }
1391   LOG (GNUNET_ERROR_TYPE_DEBUG,
1392        "Connection transmitted %u/%u bytes to `%s' (%p)\n",
1393        (unsigned int) ret, have, GNUNET_a2s (connection->addr, connection->addrlen), connection);
1394   connection->write_buffer_pos += ret;
1395   if (connection->write_buffer_pos == connection->write_buffer_off)
1396   {
1397     /* transmitted all pending data */
1398     connection->write_buffer_pos = 0;
1399     connection->write_buffer_off = 0;
1400   }
1401   if ((0 == connection->write_buffer_off) && (NULL == connection->nth.notify_ready))
1402     return;                     /* all data sent! */
1403   /* not done writing, schedule more */
1404 SCHEDULE_WRITE:
1405   LOG (GNUNET_ERROR_TYPE_DEBUG,
1406        "Re-scheduling transmit_ready (more to do) (%p).\n", connection);
1407   have = connection->write_buffer_off - connection->write_buffer_pos;
1408   GNUNET_assert ((NULL != connection->nth.notify_ready) || (have > 0));
1409   if (NULL == connection->write_task)
1410     connection->write_task =
1411         GNUNET_SCHEDULER_add_write_net ((connection->nth.notify_ready ==
1412                                          NULL) ? GNUNET_TIME_UNIT_FOREVER_REL :
1413                                         GNUNET_TIME_absolute_get_remaining
1414                                         (connection->nth.transmit_timeout),
1415                                         connection->sock, &transmit_ready, connection);
1416 }
1417
1418
1419 /**
1420  * Ask the connection to call us once the specified number of bytes
1421  * are free in the transmission buffer.  Will never call the @a notify
1422  * callback in this task, but always first go into the scheduler.
1423  *
1424  * @param connection connection
1425  * @param size number of bytes to send
1426  * @param timeout after how long should we give up (and call
1427  *        notify with buf NULL and size 0)?
1428  * @param notify function to call
1429  * @param notify_cls closure for @a notify
1430  * @return non-NULL if the notify callback was queued,
1431  *         NULL if we are already going to notify someone else (busy)
1432  */
1433 struct GNUNET_CONNECTION_TransmitHandle *
1434 GNUNET_CONNECTION_notify_transmit_ready (struct GNUNET_CONNECTION_Handle *connection,
1435                                          size_t size,
1436                                          struct GNUNET_TIME_Relative timeout,
1437                                          GNUNET_CONNECTION_TransmitReadyNotify
1438                                          notify, void *notify_cls)
1439 {
1440   if (NULL != connection->nth.notify_ready)
1441   {
1442     GNUNET_assert (0);
1443     return NULL;
1444   }
1445   GNUNET_assert (NULL != notify);
1446   GNUNET_assert (size < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1447   GNUNET_assert (connection->write_buffer_off <= connection->write_buffer_size);
1448   GNUNET_assert (connection->write_buffer_pos <= connection->write_buffer_size);
1449   GNUNET_assert (connection->write_buffer_pos <= connection->write_buffer_off);
1450   connection->nth.notify_ready = notify;
1451   connection->nth.notify_ready_cls = notify_cls;
1452   connection->nth.connection = connection;
1453   connection->nth.notify_size = size;
1454   connection->nth.transmit_timeout = GNUNET_TIME_relative_to_absolute (timeout);
1455   GNUNET_assert (NULL == connection->nth.timeout_task);
1456   if ((NULL == connection->sock) &&
1457       (NULL == connection->ap_head) &&
1458       (NULL == connection->dns_active))
1459   {
1460     if (NULL != connection->write_task)
1461       GNUNET_SCHEDULER_cancel (connection->write_task);
1462     connection->write_task = GNUNET_SCHEDULER_add_now (&connect_error,
1463                                                        connection);
1464     return &connection->nth;
1465   }
1466   if (NULL != connection->write_task)
1467     return &connection->nth; /* previous transmission still in progress */
1468   if (NULL != connection->sock)
1469   {
1470     /* connected, try to transmit now */
1471     LOG (GNUNET_ERROR_TYPE_DEBUG,
1472          "Scheduling transmission (%p).\n",
1473          connection);
1474     connection->write_task =
1475         GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_absolute_get_remaining
1476                                         (connection->nth.transmit_timeout),
1477                                         connection->sock, &transmit_ready, connection);
1478     return &connection->nth;
1479   }
1480   /* not yet connected, wait for connection */
1481   LOG (GNUNET_ERROR_TYPE_DEBUG,
1482        "Need to wait to schedule transmission for connection, adding timeout task (%p).\n",
1483        connection);
1484   connection->nth.timeout_task =
1485     GNUNET_SCHEDULER_add_delayed (timeout,
1486                                   &transmit_timeout, connection);
1487   return &connection->nth;
1488 }
1489
1490
1491 /**
1492  * Cancel the specified transmission-ready notification.
1493  *
1494  * @param th notification to cancel
1495  */
1496 void
1497 GNUNET_CONNECTION_notify_transmit_ready_cancel (struct
1498                                                 GNUNET_CONNECTION_TransmitHandle
1499                                                 *th)
1500 {
1501   GNUNET_assert (NULL != th->notify_ready);
1502   th->notify_ready = NULL;
1503   if (NULL != th->timeout_task)
1504   {
1505     GNUNET_SCHEDULER_cancel (th->timeout_task);
1506     th->timeout_task = NULL;
1507   }
1508   if (NULL != th->connection->write_task)
1509   {
1510     GNUNET_SCHEDULER_cancel (th->connection->write_task);
1511     th->connection->write_task = NULL;
1512   }
1513 }
1514
1515 /* end of connection.c */