small API change: do no longer pass rarely needed GNUNET_SCHEDULER_TaskContext to...
[oweals/gnunet.git] / src / util / connection.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file util/connection.c
23  * @brief  TCP connection management
24  * @author Christian Grothoff
25  *
26  * This code is rather complex.  Only modify it if you
27  * 1) Have a NEW testcase showing that the new code
28  *    is needed and correct
29  * 2) All EXISTING testcases pass with the new code
30  * These rules should apply in general, but for this
31  * module they are VERY, VERY important.
32  */
33 #include "platform.h"
34 #include "gnunet_util_lib.h"
35 #include "gnunet_resolver_service.h"
36
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__)
39
40 #define LOG_STRERROR(kind,syscall) GNUNET_log_from_strerror (kind, "util", syscall)
41
42
43 /**
44  * Transmission handle.  There can only be one for each connection.
45  */
46 struct GNUNET_CONNECTION_TransmitHandle
47 {
48
49   /**
50    * Function to call if the send buffer has notify_size
51    * bytes available.
52    */
53   GNUNET_CONNECTION_TransmitReadyNotify notify_ready;
54
55   /**
56    * Closure for notify_ready.
57    */
58   void *notify_ready_cls;
59
60   /**
61    * Our connection handle.
62    */
63   struct GNUNET_CONNECTION_Handle *connection;
64
65   /**
66    * Timeout for receiving (in absolute time).
67    */
68   struct GNUNET_TIME_Absolute transmit_timeout;
69
70   /**
71    * Task called on timeout.
72    */
73   struct GNUNET_SCHEDULER_Task * timeout_task;
74
75   /**
76    * At what number of bytes available in the
77    * write buffer should the notify method be called?
78    */
79   size_t notify_size;
80
81 };
82
83
84 /**
85  * During connect, we try multiple possible IP addresses
86  * to find out which one might work.
87  */
88 struct AddressProbe
89 {
90
91   /**
92    * This is a linked list.
93    */
94   struct AddressProbe *next;
95
96   /**
97    * This is a doubly-linked list.
98    */
99   struct AddressProbe *prev;
100
101   /**
102    * The address; do not free (allocated at the end of this struct).
103    */
104   const struct sockaddr *addr;
105
106   /**
107    * Underlying OS's socket.
108    */
109   struct GNUNET_NETWORK_Handle *sock;
110
111   /**
112    * Connection for which we are probing.
113    */
114   struct GNUNET_CONNECTION_Handle *connection;
115
116   /**
117    * Lenth of addr.
118    */
119   socklen_t addrlen;
120
121   /**
122    * Task waiting for the connection to finish connecting.
123    */
124   struct GNUNET_SCHEDULER_Task * task;
125 };
126
127
128 /**
129  * @brief handle for a network connection
130  */
131 struct GNUNET_CONNECTION_Handle
132 {
133
134   /**
135    * Configuration to use.
136    */
137   const struct GNUNET_CONFIGURATION_Handle *cfg;
138
139   /**
140    * Linked list of sockets we are currently trying out
141    * (during connect).
142    */
143   struct AddressProbe *ap_head;
144
145   /**
146    * Linked list of sockets we are currently trying out
147    * (during connect).
148    */
149   struct AddressProbe *ap_tail;
150
151   /**
152    * Network address of the other end-point, may be NULL.
153    */
154   struct sockaddr *addr;
155
156   /**
157    * Pointer to the hostname if connection was
158    * created using DNS lookup, otherwise NULL.
159    */
160   char *hostname;
161
162   /**
163    * Underlying OS's socket, set to NULL after fatal errors.
164    */
165   struct GNUNET_NETWORK_Handle *sock;
166
167   /**
168    * Function to call on data received, NULL if no receive is pending.
169    */
170   GNUNET_CONNECTION_Receiver receiver;
171
172   /**
173    * Closure for @e receiver.
174    */
175   void *receiver_cls;
176
177   /**
178    * Pointer to our write buffer.
179    */
180   char *write_buffer;
181
182   /**
183    * Current size of our @e write_buffer.
184    */
185   size_t write_buffer_size;
186
187   /**
188    * Current write-offset in @e write_buffer (where
189    * would we write next).
190    */
191   size_t write_buffer_off;
192
193   /**
194    * Current read-offset in @e write_buffer (how many
195    * bytes have already been sent).
196    */
197   size_t write_buffer_pos;
198
199   /**
200    * Length of @e addr.
201    */
202   socklen_t addrlen;
203
204   /**
205    * Read task that we may need to wait for.
206    */
207   struct GNUNET_SCHEDULER_Task *read_task;
208
209   /**
210    * Write task that we may need to wait for.
211    */
212   struct GNUNET_SCHEDULER_Task *write_task;
213
214   /**
215    * Handle to a pending DNS lookup request.
216    */
217   struct GNUNET_RESOLVER_RequestHandle *dns_active;
218
219   /**
220    * The handle we return for GNUNET_CONNECTION_notify_transmit_ready.
221    */
222   struct GNUNET_CONNECTION_TransmitHandle nth;
223
224   /**
225    * Timeout for receiving (in absolute time).
226    */
227   struct GNUNET_TIME_Absolute receive_timeout;
228
229   /**
230    * Maximum number of bytes to read (for receiving).
231    */
232   size_t max;
233
234   /**
235    * Port to connect to.
236    */
237   uint16_t port;
238
239   /**
240    * When shutdown, do not ever actually close the socket, but
241    * free resources.  Only should ever be set if using program
242    * termination as a signal (because only then will the leaked
243    * socket be freed!)
244    */
245   int8_t persist;
246
247   /**
248    * Usually 0.  Set to 1 if this handle is in use, and should
249    * #GNUNET_CONNECTION_destroy() be called right now, the action needs
250    * to be deferred by setting it to -1.
251    */
252   int8_t destroy_later;
253
254   /**
255    * Handle to subsequent connection after proxy handshake completes,
256    */
257   struct GNUNET_CONNECTION_Handle *proxy_handshake;
258
259 };
260
261
262 /**
263  * Set the persist option on this connection handle.  Indicates
264  * that the underlying socket or fd should never really be closed.
265  * Used for indicating process death.
266  *
267  * @param connection the connection to set persistent
268  */
269 void
270 GNUNET_CONNECTION_persist_ (struct GNUNET_CONNECTION_Handle *connection)
271 {
272   connection->persist = GNUNET_YES;
273 }
274
275
276 /**
277  * Disable the "CORK" feature for communication with the given connection,
278  * forcing the OS to immediately flush the buffer on transmission
279  * instead of potentially buffering multiple messages.  Essentially
280  * reduces the OS send buffers to zero.
281  * Used to make sure that the last messages sent through the connection
282  * reach the other side before the process is terminated.
283  *
284  * @param connection the connection to make flushing and blocking
285  * @return #GNUNET_OK on success
286  */
287 int
288 GNUNET_CONNECTION_disable_corking (struct GNUNET_CONNECTION_Handle *connection)
289 {
290   return GNUNET_NETWORK_socket_disable_corking (connection->sock);
291 }
292
293
294 /**
295  * Create a connection handle by boxing an existing OS socket.  The OS
296  * socket should henceforth be no longer used directly.
297  * #GNUNET_connection_destroy() will close it.
298  *
299  * @param osSocket existing socket to box
300  * @return the boxed connection handle
301  */
302 struct GNUNET_CONNECTION_Handle *
303 GNUNET_CONNECTION_create_from_existing (struct GNUNET_NETWORK_Handle *osSocket)
304 {
305   struct GNUNET_CONNECTION_Handle *connection;
306
307   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
308   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
309   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
310   connection->sock = osSocket;
311   return connection;
312 }
313
314
315 /**
316  * Create a connection handle by accepting on a listen socket.  This
317  * function may block if the listen socket has no connection ready.
318  *
319  * @param access_cb function to use to check if access is allowed
320  * @param access_cb_cls closure for @a access_cb
321  * @param lsock listen socket
322  * @return the connection handle, NULL on error
323  */
324 struct GNUNET_CONNECTION_Handle *
325 GNUNET_CONNECTION_create_from_accept (GNUNET_CONNECTION_AccessCheck access_cb,
326                                       void *access_cb_cls,
327                                       struct GNUNET_NETWORK_Handle *lsock)
328 {
329   struct GNUNET_CONNECTION_Handle *connection;
330   char addr[128];
331   socklen_t addrlen;
332   struct GNUNET_NETWORK_Handle *sock;
333   int aret;
334   struct sockaddr_in *v4;
335   struct sockaddr_in6 *v6;
336   struct sockaddr *sa;
337   void *uaddr;
338   struct GNUNET_CONNECTION_Credentials *gcp;
339   struct GNUNET_CONNECTION_Credentials gc;
340 #ifdef SO_PEERCRED
341   struct ucred uc;
342   socklen_t olen;
343 #endif
344
345   addrlen = sizeof (addr);
346   sock =
347       GNUNET_NETWORK_socket_accept (lsock, (struct sockaddr *) &addr, &addrlen);
348   if (NULL == sock)
349   {
350     if (EAGAIN != errno)
351       LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING, "accept");
352     return NULL;
353   }
354   if ((addrlen > sizeof (addr)) || (addrlen < sizeof (sa_family_t)))
355   {
356     GNUNET_break (0);
357     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
358     return NULL;
359   }
360
361   sa = (struct sockaddr *) addr;
362   v6 = (struct sockaddr_in6 *) addr;
363   if ((AF_INET6 == sa->sa_family) && (IN6_IS_ADDR_V4MAPPED (&v6->sin6_addr)))
364   {
365     /* convert to V4 address */
366     v4 = GNUNET_new (struct sockaddr_in);
367     memset (v4, 0, sizeof (struct sockaddr_in));
368     v4->sin_family = AF_INET;
369 #if HAVE_SOCKADDR_IN_SIN_LEN
370     v4->sin_len = (u_char) sizeof (struct sockaddr_in);
371 #endif
372     memcpy (&v4->sin_addr,
373             &((char *) &v6->sin6_addr)[sizeof (struct in6_addr) -
374                                        sizeof (struct in_addr)],
375             sizeof (struct in_addr));
376     v4->sin_port = v6->sin6_port;
377     uaddr = v4;
378     addrlen = sizeof (struct sockaddr_in);
379   }
380   else
381   {
382     uaddr = GNUNET_malloc (addrlen);
383     memcpy (uaddr, addr, addrlen);
384   }
385   gcp = NULL;
386   gc.uid = 0;
387   gc.gid = 0;
388   if (AF_UNIX == sa->sa_family)
389   {
390 #if HAVE_GETPEEREID
391     /* most BSDs */
392     if (0 == getpeereid (GNUNET_NETWORK_get_fd (sock), &gc.uid, &gc.gid))
393       gcp = &gc;
394 #else
395 #ifdef SO_PEERCRED
396     /* largely traditional GNU/Linux */
397     olen = sizeof (uc);
398     if ((0 ==
399          getsockopt (GNUNET_NETWORK_get_fd (sock), SOL_SOCKET, SO_PEERCRED, &uc,
400                      &olen)) && (olen == sizeof (uc)))
401     {
402       gc.uid = uc.uid;
403       gc.gid = uc.gid;
404       gcp = &gc;
405     }
406 #else
407 #if HAVE_GETPEERUCRED
408     /* this is for Solaris 10 */
409     ucred_t *uc;
410
411     uc = NULL;
412     if (0 == getpeerucred (GNUNET_NETWORK_get_fd (sock), &uc))
413     {
414       gc.uid = ucred_geteuid (uc);
415       gc.gid = ucred_getegid (uc);
416       gcp = &gc;
417     }
418     ucred_free (uc);
419 #endif
420 #endif
421 #endif
422   }
423
424   if ((NULL != access_cb) &&
425       (GNUNET_YES != (aret = access_cb (access_cb_cls, gcp, uaddr, addrlen))))
426   {
427     if (GNUNET_NO == aret)
428       LOG (GNUNET_ERROR_TYPE_INFO,
429            _("Access denied to `%s'\n"),
430            GNUNET_a2s (uaddr,
431                        addrlen));
432     GNUNET_break (GNUNET_OK ==
433                   GNUNET_NETWORK_socket_shutdown (sock,
434                                                   SHUT_RDWR));
435     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
436     GNUNET_free (uaddr);
437     return NULL;
438   }
439   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
440   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
441   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
442   connection->addr = uaddr;
443   connection->addrlen = addrlen;
444   connection->sock = sock;
445   LOG (GNUNET_ERROR_TYPE_INFO,
446        _("Accepting connection from `%s': %p\n"),
447        GNUNET_a2s (uaddr, addrlen), connection);
448   return connection;
449 }
450
451
452 /**
453  * Obtain the network address of the other party.
454  *
455  * @param connection the client to get the address for
456  * @param addr where to store the address
457  * @param addrlen where to store the length of the @a addr
458  * @return #GNUNET_OK on success
459  */
460 int
461 GNUNET_CONNECTION_get_address (struct GNUNET_CONNECTION_Handle *connection,
462                                void **addr,
463                                size_t *addrlen)
464 {
465   if ((NULL == connection->addr) || (0 == connection->addrlen))
466     return GNUNET_NO;
467   *addr = GNUNET_malloc (connection->addrlen);
468   memcpy (*addr, connection->addr, connection->addrlen);
469   *addrlen = connection->addrlen;
470   return GNUNET_OK;
471 }
472
473
474 /**
475  * Tell the receiver callback that we had an IO error.
476  *
477  * @param connection connection to signal error
478  * @param errcode error code to send
479  */
480 static void
481 signal_receive_error (struct GNUNET_CONNECTION_Handle *connection,
482                       int errcode)
483 {
484   GNUNET_CONNECTION_Receiver receiver;
485
486   LOG (GNUNET_ERROR_TYPE_DEBUG,
487        "Receive encounters error (%s), connection closed (%p)\n",
488        STRERROR (errcode),
489        connection);
490   GNUNET_assert (NULL != (receiver = connection->receiver));
491   connection->receiver = NULL;
492   receiver (connection->receiver_cls,
493             NULL,
494             0,
495             connection->addr,
496             connection->addrlen,
497             errcode);
498 }
499
500
501 /**
502  * Tell the receiver callback that a timeout was reached.
503  *
504  * @param connection connection to signal for
505  */
506 static void
507 signal_receive_timeout (struct GNUNET_CONNECTION_Handle *connection)
508 {
509   GNUNET_CONNECTION_Receiver receiver;
510
511   LOG (GNUNET_ERROR_TYPE_DEBUG,
512        "Connection signals timeout to receiver (%p)!\n",
513        connection);
514   GNUNET_assert (NULL != (receiver = connection->receiver));
515   connection->receiver = NULL;
516   receiver (connection->receiver_cls, NULL, 0, NULL, 0, 0);
517 }
518
519
520 /**
521  * We failed to transmit data to the service, signal the error.
522  *
523  * @param connection handle that had trouble
524  * @param ecode error code (errno)
525  */
526 static void
527 signal_transmit_error (struct GNUNET_CONNECTION_Handle *connection,
528                        int ecode)
529 {
530   GNUNET_CONNECTION_TransmitReadyNotify notify;
531
532   LOG (GNUNET_ERROR_TYPE_DEBUG,
533        "Transmission encounterd error (%s), connection closed (%p)\n",
534        STRERROR (ecode),
535        connection);
536   if (NULL != connection->sock)
537   {
538     (void) GNUNET_NETWORK_socket_shutdown (connection->sock,
539                                            SHUT_RDWR);
540     GNUNET_break (GNUNET_OK ==
541                   GNUNET_NETWORK_socket_close (connection->sock));
542     connection->sock = NULL;
543     GNUNET_assert (NULL == connection->write_task);
544   }
545   if (NULL != connection->read_task)
546   {
547     /* send errors trigger read errors... */
548     GNUNET_SCHEDULER_cancel (connection->read_task);
549     connection->read_task = NULL;
550     signal_receive_timeout (connection);
551     return;
552   }
553   if (NULL == connection->nth.notify_ready)
554     return;                     /* nobody to tell about it */
555   notify = connection->nth.notify_ready;
556   connection->nth.notify_ready = NULL;
557   notify (connection->nth.notify_ready_cls, 0, NULL);
558 }
559
560
561 /**
562  * We've failed for good to establish a connection (timeout or
563  * no more addresses to try).
564  *
565  * @param connection the connection we tried to establish
566  */
567 static void
568 connect_fail_continuation (struct GNUNET_CONNECTION_Handle *connection)
569 {
570   LOG (GNUNET_ERROR_TYPE_INFO,
571        "Failed to establish TCP connection to `%s:%u', no further addresses to try.\n",
572        connection->hostname,
573     connection->port);
574   GNUNET_break (NULL == connection->ap_head);
575   GNUNET_break (NULL == connection->ap_tail);
576   GNUNET_break (GNUNET_NO == connection->dns_active);
577   GNUNET_break (NULL == connection->sock);
578   GNUNET_assert (NULL == connection->write_task);
579   GNUNET_assert (NULL == connection->proxy_handshake);
580
581   /* signal errors for jobs that used to wait on the connection */
582   connection->destroy_later = 1;
583   if (NULL != connection->receiver)
584     signal_receive_error (connection,
585                           ECONNREFUSED);
586   if (NULL != connection->nth.notify_ready)
587   {
588     GNUNET_assert (NULL != connection->nth.timeout_task);
589     GNUNET_SCHEDULER_cancel (connection->nth.timeout_task);
590     connection->nth.timeout_task = NULL;
591     signal_transmit_error (connection,
592                            ECONNREFUSED);
593   }
594   if (-1 == connection->destroy_later)
595   {
596     /* do it now */
597     connection->destroy_later = 0;
598     GNUNET_CONNECTION_destroy (connection);
599     return;
600   }
601   connection->destroy_later = 0;
602 }
603
604
605 /**
606  * We are ready to transmit (or got a timeout).
607  *
608  * @param cls our connection handle
609  */
610 static void
611 transmit_ready (void *cls);
612
613
614 /**
615  * This function is called once we either timeout or have data ready
616  * to read.
617  *
618  * @param cls connection to read from
619  */
620 static void
621 receive_ready (void *cls);
622
623
624 /**
625  * We've succeeded in establishing a connection.
626  *
627  * @param connection the connection we tried to establish
628  */
629 static void
630 connect_success_continuation (struct GNUNET_CONNECTION_Handle *connection)
631 {
632   LOG (GNUNET_ERROR_TYPE_DEBUG,
633        "Connection to `%s' succeeded! (%p)\n",
634        GNUNET_a2s (connection->addr, connection->addrlen),
635        connection);
636   /* trigger jobs that waited for the connection */
637   if (NULL != connection->receiver)
638   {
639     LOG (GNUNET_ERROR_TYPE_DEBUG,
640          "Connection succeeded, starting with receiving data (%p)\n",
641          connection);
642     GNUNET_assert (NULL == connection->read_task);
643     connection->read_task =
644       GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining
645                                      (connection->receive_timeout), connection->sock,
646                                      &receive_ready, connection);
647   }
648   if (NULL != connection->nth.notify_ready)
649   {
650     LOG (GNUNET_ERROR_TYPE_DEBUG,
651          "Connection succeeded, starting with sending data (%p)\n",
652          connection);
653     GNUNET_assert (connection->nth.timeout_task != NULL);
654     GNUNET_SCHEDULER_cancel (connection->nth.timeout_task);
655     connection->nth.timeout_task = NULL;
656     GNUNET_assert (connection->write_task == NULL);
657     connection->write_task =
658         GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_absolute_get_remaining
659                                         (connection->nth.transmit_timeout), connection->sock,
660                                         &transmit_ready, connection);
661   }
662 }
663
664
665 /**
666  * Scheduler let us know that we're either ready to write on the
667  * socket OR connect timed out.  Do the right thing.
668  *
669  * @param cls the `struct AddressProbe *` with the address that we are probing
670  */
671 static void
672 connect_probe_continuation (void *cls)
673 {
674   struct AddressProbe *ap = cls;
675   struct GNUNET_CONNECTION_Handle *connection = ap->connection;
676   const struct GNUNET_SCHEDULER_TaskContext *tc;
677   struct AddressProbe *pos;
678   int error;
679   socklen_t len;
680
681   GNUNET_assert (NULL != ap->sock);
682   GNUNET_CONTAINER_DLL_remove (connection->ap_head, connection->ap_tail, ap);
683   len = sizeof (error);
684   errno = 0;
685   error = 0;
686   tc = GNUNET_SCHEDULER_get_task_context ();
687   if ((0 == (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) ||
688       (GNUNET_OK !=
689        GNUNET_NETWORK_socket_getsockopt (ap->sock, SOL_SOCKET, SO_ERROR, &error,
690                                          &len)) || (0 != error))
691   {
692     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (ap->sock));
693     GNUNET_free (ap);
694     if ((NULL == connection->ap_head) &&
695         (GNUNET_NO == connection->dns_active) &&
696         (NULL == connection->proxy_handshake))
697       connect_fail_continuation (connection);
698     return;
699   }
700   GNUNET_assert (NULL == connection->sock);
701   connection->sock = ap->sock;
702   GNUNET_assert (NULL == connection->addr);
703   connection->addr = GNUNET_malloc (ap->addrlen);
704   memcpy (connection->addr, ap->addr, ap->addrlen);
705   connection->addrlen = ap->addrlen;
706   GNUNET_free (ap);
707   /* cancel all other attempts */
708   while (NULL != (pos = connection->ap_head))
709   {
710     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (pos->sock));
711     GNUNET_SCHEDULER_cancel (pos->task);
712     GNUNET_CONTAINER_DLL_remove (connection->ap_head, connection->ap_tail, pos);
713     GNUNET_free (pos);
714   }
715   connect_success_continuation (connection);
716 }
717
718
719 /**
720  * Try to establish a connection given the specified address.
721  * This function is called by the resolver once we have a DNS reply.
722  *
723  * @param cls our `struct GNUNET_CONNECTION_Handle *`
724  * @param addr address to try, NULL for "last call"
725  * @param addrlen length of @a addr
726  */
727 static void
728 try_connect_using_address (void *cls,
729                            const struct sockaddr *addr,
730                            socklen_t addrlen)
731 {
732   struct GNUNET_CONNECTION_Handle *connection = cls;
733   struct AddressProbe *ap;
734   struct GNUNET_TIME_Relative delay;
735
736   if (NULL == addr)
737   {
738     connection->dns_active = NULL;
739     if ((NULL == connection->ap_head) &&
740         (NULL == connection->sock) &&
741         (NULL == connection->proxy_handshake))
742       connect_fail_continuation (connection);
743     return;
744   }
745   if (NULL != connection->sock)
746     return;                     /* already connected */
747   GNUNET_assert (NULL == connection->addr);
748   /* try to connect */
749   LOG (GNUNET_ERROR_TYPE_DEBUG,
750        "Trying to connect using address `%s:%u/%s:%u'\n",
751        connection->hostname,
752        connection->port,
753        GNUNET_a2s (addr, addrlen),
754        connection->port);
755   ap = GNUNET_malloc (sizeof (struct AddressProbe) + addrlen);
756   ap->addr = (const struct sockaddr *) &ap[1];
757   memcpy (&ap[1], addr, addrlen);
758   ap->addrlen = addrlen;
759   ap->connection = connection;
760
761   switch (ap->addr->sa_family)
762   {
763   case AF_INET:
764     ((struct sockaddr_in *) ap->addr)->sin_port = htons (connection->port);
765     break;
766   case AF_INET6:
767     ((struct sockaddr_in6 *) ap->addr)->sin6_port = htons (connection->port);
768     break;
769   default:
770     GNUNET_break (0);
771     GNUNET_free (ap);
772     return;                     /* not supported by us */
773   }
774   ap->sock = GNUNET_NETWORK_socket_create (ap->addr->sa_family, SOCK_STREAM, 0);
775   if (NULL == ap->sock)
776   {
777     GNUNET_free (ap);
778     return;                     /* not supported by OS */
779   }
780   LOG (GNUNET_ERROR_TYPE_INFO,
781        "Trying to connect to `%s' (%p)\n",
782        GNUNET_a2s (ap->addr, ap->addrlen),
783        connection);
784   if ((GNUNET_OK !=
785        GNUNET_NETWORK_socket_connect (ap->sock, ap->addr, ap->addrlen)) &&
786       (EINPROGRESS != errno))
787   {
788     /* maybe refused / unsupported address, try next */
789     LOG_STRERROR (GNUNET_ERROR_TYPE_INFO, "connect");
790     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (ap->sock));
791     GNUNET_free (ap);
792     return;
793   }
794   GNUNET_CONTAINER_DLL_insert (connection->ap_head, connection->ap_tail, ap);
795   delay = GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT;
796   if (NULL != connection->nth.notify_ready)
797     delay =
798         GNUNET_TIME_relative_min (delay,
799                                   GNUNET_TIME_absolute_get_remaining (connection->nth.transmit_timeout));
800   if (NULL != connection->receiver)
801     delay =
802         GNUNET_TIME_relative_min (delay,
803                                   GNUNET_TIME_absolute_get_remaining
804                                   (connection->receive_timeout));
805   ap->task =
806       GNUNET_SCHEDULER_add_write_net (delay, ap->sock,
807                                       &connect_probe_continuation, ap);
808 }
809
810
811 /**
812  * Create a connection handle by (asynchronously) connecting to a host.
813  * This function returns immediately, even if the connection has not
814  * yet been established.  This function only creates TCP connections.
815  *
816  * @param cfg configuration to use
817  * @param hostname name of the host to connect to
818  * @param port port to connect to
819  * @return the connection handle
820  */
821 struct GNUNET_CONNECTION_Handle *
822 GNUNET_CONNECTION_create_from_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
823                                        const char *hostname,
824                                        uint16_t port)
825 {
826   struct GNUNET_CONNECTION_Handle *connection;
827
828   GNUNET_assert (0 < strlen (hostname));        /* sanity check */
829   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
830   connection->cfg = cfg;
831   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
832   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
833   connection->port = port;
834   connection->hostname = GNUNET_strdup (hostname);
835   connection->dns_active =
836       GNUNET_RESOLVER_ip_get (connection->hostname, AF_UNSPEC,
837                               GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
838                               &try_connect_using_address, connection);
839   return connection;
840 }
841
842
843 /**
844  * Create a connection handle by connecting to a UNIX domain service.
845  * This function returns immediately, even if the connection has not
846  * yet been established.  This function only creates UNIX connections.
847  *
848  * @param cfg configuration to use
849  * @param unixpath path to connect to
850  * @return the connection handle, NULL on systems without UNIX support
851  */
852 struct GNUNET_CONNECTION_Handle *
853 GNUNET_CONNECTION_create_from_connect_to_unixpath (const struct GNUNET_CONFIGURATION_Handle *cfg,
854                                                    const char *unixpath)
855 {
856 #ifdef AF_UNIX
857   struct GNUNET_CONNECTION_Handle *connection;
858   struct sockaddr_un *un;
859
860   GNUNET_assert (0 < strlen (unixpath));        /* sanity check */
861   un = GNUNET_new (struct sockaddr_un);
862   un->sun_family = AF_UNIX;
863   strncpy (un->sun_path, unixpath, sizeof (un->sun_path) - 1);
864 #ifdef LINUX
865   {
866     int abstract;
867
868     abstract = GNUNET_CONFIGURATION_get_value_yesno (cfg, "TESTING",
869                                                      "USE_ABSTRACT_SOCKETS");
870     if (GNUNET_YES == abstract)
871       un->sun_path[0] = '\0';
872   }
873 #endif
874 #if HAVE_SOCKADDR_IN_SIN_LEN
875   un->sun_len = (u_char) sizeof (struct sockaddr_un);
876 #endif
877   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
878   connection->cfg = cfg;
879   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
880   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
881   connection->port = 0;
882   connection->hostname = NULL;
883   connection->addr = (struct sockaddr *) un;
884   connection->addrlen = sizeof (struct sockaddr_un);
885   connection->sock = GNUNET_NETWORK_socket_create (AF_UNIX, SOCK_STREAM, 0);
886   if (NULL == connection->sock)
887   {
888     GNUNET_free (connection->addr);
889     GNUNET_free (connection->write_buffer);
890     GNUNET_free (connection);
891     return NULL;
892   }
893   if ( (GNUNET_OK !=
894         GNUNET_NETWORK_socket_connect (connection->sock, connection->addr, connection->addrlen)) &&
895        (EINPROGRESS != errno) )
896   {
897     /* Just return; we expect everything to work eventually so don't fail HARD */
898     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (connection->sock));
899     connection->sock = NULL;
900     return connection;
901   }
902   connect_success_continuation (connection);
903   return connection;
904 #else
905   return NULL;
906 #endif
907 }
908
909
910 /**
911  * Create a connection handle by (asynchronously) connecting to a host.
912  * This function returns immediately, even if the connection has not
913  * yet been established.  This function only creates TCP connections.
914  *
915  * @param s socket to connect
916  * @param serv_addr server address
917  * @param addrlen length of @a serv_addr
918  * @return the connection handle
919  */
920 struct GNUNET_CONNECTION_Handle *
921 GNUNET_CONNECTION_connect_socket (struct GNUNET_NETWORK_Handle *s,
922                                   const struct sockaddr *serv_addr,
923                                   socklen_t addrlen)
924 {
925   struct GNUNET_CONNECTION_Handle *connection;
926
927   if ( (GNUNET_OK !=
928         GNUNET_NETWORK_socket_connect (s, serv_addr, addrlen)) &&
929        (EINPROGRESS != errno) )
930   {
931     /* maybe refused / unsupported address, try next */
932     LOG_STRERROR (GNUNET_ERROR_TYPE_DEBUG,
933                   "connect");
934     LOG (GNUNET_ERROR_TYPE_DEBUG,
935          "Attempt to connect to `%s' failed\n",
936          GNUNET_a2s (serv_addr,
937                      addrlen));
938     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (s));
939     return NULL;
940   }
941   connection = GNUNET_CONNECTION_create_from_existing (s);
942   connection->addr = GNUNET_malloc (addrlen);
943   memcpy (connection->addr, serv_addr, addrlen);
944   connection->addrlen = addrlen;
945   LOG (GNUNET_ERROR_TYPE_INFO,
946        "Trying to connect to `%s' (%p)\n",
947        GNUNET_a2s (serv_addr, addrlen),
948        connection);
949   return connection;
950 }
951
952
953 /**
954  * Create a connection handle by creating a socket and
955  * (asynchronously) connecting to a host.  This function returns
956  * immediately, even if the connection has not yet been established.
957  * This function only creates TCP connections.
958  *
959  * @param af_family address family to use
960  * @param serv_addr server address
961  * @param addrlen length of @a serv_addr
962  * @return the connection handle
963  */
964 struct GNUNET_CONNECTION_Handle *
965 GNUNET_CONNECTION_create_from_sockaddr (int af_family,
966                                         const struct sockaddr *serv_addr,
967                                         socklen_t addrlen)
968 {
969   struct GNUNET_NETWORK_Handle *s;
970
971   s = GNUNET_NETWORK_socket_create (af_family, SOCK_STREAM, 0);
972   if (NULL == s)
973   {
974     LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, "socket");
975     return NULL;
976   }
977   return GNUNET_CONNECTION_connect_socket (s, serv_addr, addrlen);
978 }
979
980
981 /**
982  * Check if connection is valid (no fatal errors have happened so far).
983  * Note that a connection that is still trying to connect is considered
984  * valid.
985  *
986  * @param connection connection to check
987  * @return #GNUNET_YES if valid, #GNUNET_NO otherwise
988  */
989 int
990 GNUNET_CONNECTION_check (struct GNUNET_CONNECTION_Handle *connection)
991 {
992   if ((NULL != connection->ap_head) ||
993       (NULL != connection->dns_active) ||
994       (NULL != connection->proxy_handshake))
995     return GNUNET_YES;          /* still trying to connect */
996   if ( (0 != connection->destroy_later) ||
997        (NULL == connection->sock) )
998     return GNUNET_NO;
999   return GNUNET_YES;
1000 }
1001
1002
1003 /**
1004  * Close the connection and free associated resources.  There must
1005  * not be any pending requests for reading or writing to the
1006  * connection at this time.
1007  *
1008  * @param connection connection to destroy
1009  */
1010 void
1011 GNUNET_CONNECTION_destroy (struct GNUNET_CONNECTION_Handle *connection)
1012 {
1013   struct AddressProbe *pos;
1014
1015   if (0 != connection->destroy_later)
1016   {
1017     connection->destroy_later = -1;
1018     return;
1019   }
1020   LOG (GNUNET_ERROR_TYPE_DEBUG,
1021        "Shutting down connection (%p)\n",
1022        connection);
1023   GNUNET_assert (NULL == connection->nth.notify_ready);
1024   GNUNET_assert (NULL == connection->receiver);
1025   if (NULL != connection->write_task)
1026   {
1027     GNUNET_SCHEDULER_cancel (connection->write_task);
1028     connection->write_task = NULL;
1029     connection->write_buffer_off = 0;
1030   }
1031   if (NULL != connection->read_task)
1032   {
1033     GNUNET_SCHEDULER_cancel (connection->read_task);
1034     connection->read_task = NULL;
1035   }
1036   if (NULL != connection->nth.timeout_task)
1037   {
1038     GNUNET_SCHEDULER_cancel (connection->nth.timeout_task);
1039     connection->nth.timeout_task = NULL;
1040   }
1041   connection->nth.notify_ready = NULL;
1042   if (NULL != connection->dns_active)
1043   {
1044     GNUNET_RESOLVER_request_cancel (connection->dns_active);
1045     connection->dns_active = NULL;
1046   }
1047   if (NULL != connection->proxy_handshake)
1048   {
1049     /* GNUNET_CONNECTION_destroy (connection->proxy_handshake); */
1050     connection->proxy_handshake->destroy_later = -1;
1051     connection->proxy_handshake = NULL;  /* Not leaked ??? */
1052   }
1053   while (NULL != (pos = connection->ap_head))
1054   {
1055     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (pos->sock));
1056     GNUNET_SCHEDULER_cancel (pos->task);
1057     GNUNET_CONTAINER_DLL_remove (connection->ap_head, connection->ap_tail, pos);
1058     GNUNET_free (pos);
1059   }
1060   if ( (NULL != connection->sock) &&
1061        (GNUNET_YES != connection->persist) )
1062   {
1063     if ((GNUNET_OK !=
1064          GNUNET_NETWORK_socket_shutdown (connection->sock,
1065                                          SHUT_RDWR)) &&
1066         (ENOTCONN != errno) &&
1067         (ECONNRESET != errno) )
1068       LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING,
1069                     "shutdown");
1070   }
1071   if (NULL != connection->sock)
1072   {
1073     if (GNUNET_YES != connection->persist)
1074     {
1075       GNUNET_break (GNUNET_OK ==
1076                     GNUNET_NETWORK_socket_close (connection->sock));
1077     }
1078     else
1079     {
1080       GNUNET_NETWORK_socket_free_memory_only_ (connection->sock); /* at least no memory leak (we deliberately
1081                                                                    * leak the socket in this special case) ... */
1082     }
1083   }
1084   GNUNET_free_non_null (connection->addr);
1085   GNUNET_free_non_null (connection->hostname);
1086   GNUNET_free (connection->write_buffer);
1087   GNUNET_free (connection);
1088 }
1089
1090
1091 /**
1092  * This function is called once we either timeout
1093  * or have data ready to read.
1094  *
1095  * @param cls connection to read from
1096  */
1097 static void
1098 receive_ready (void *cls)
1099 {
1100   struct GNUNET_CONNECTION_Handle *connection = cls;
1101   const struct GNUNET_SCHEDULER_TaskContext *tc;
1102   char buffer[connection->max];
1103   ssize_t ret;
1104   GNUNET_CONNECTION_Receiver receiver;
1105
1106   connection->read_task = NULL;
1107   tc = GNUNET_SCHEDULER_get_task_context ();
1108   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1109   {
1110     /* ignore shutdown request, go again immediately */
1111     connection->read_task =
1112         GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining
1113                                        (connection->receive_timeout), connection->sock,
1114                                        &receive_ready, connection);
1115     return;
1116   }
1117   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_TIMEOUT))
1118   {
1119     LOG (GNUNET_ERROR_TYPE_DEBUG,
1120          "Receive from `%s' encounters error: timeout (%s, %p)\n",
1121          GNUNET_a2s (connection->addr, connection->addrlen),
1122          GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (connection->receive_timeout), GNUNET_YES),
1123          connection);
1124     signal_receive_timeout (connection);
1125     return;
1126   }
1127   if (NULL == connection->sock)
1128   {
1129     /* connect failed for good */
1130     signal_receive_error (connection, ECONNREFUSED);
1131     return;
1132   }
1133   GNUNET_assert (GNUNET_NETWORK_fdset_isset (tc->read_ready, connection->sock));
1134 RETRY:
1135   ret = GNUNET_NETWORK_socket_recv (connection->sock,
1136                                     buffer,
1137                                     connection->max);
1138   if (-1 == ret)
1139   {
1140     if (EINTR == errno)
1141       goto RETRY;
1142     signal_receive_error (connection, errno);
1143     return;
1144   }
1145   LOG (GNUNET_ERROR_TYPE_DEBUG,
1146        "receive_ready read %u/%u bytes from `%s' (%p)!\n",
1147        (unsigned int) ret,
1148        connection->max,
1149        GNUNET_a2s (connection->addr,
1150                    connection->addrlen),
1151        connection);
1152   GNUNET_assert (NULL != (receiver = connection->receiver));
1153   connection->receiver = NULL;
1154   receiver (connection->receiver_cls,
1155             buffer,
1156             ret,
1157             connection->addr,
1158             connection->addrlen,
1159             0);
1160 }
1161
1162
1163 /**
1164  * Receive data from the given connection.  Note that this function will
1165  * call @a receiver asynchronously using the scheduler.  It will
1166  * "immediately" return.  Note that there MUST only be one active
1167  * receive call per connection at any given point in time (so do not
1168  * call receive again until the receiver callback has been invoked).
1169  *
1170  * @param connection connection handle
1171  * @param max maximum number of bytes to read
1172  * @param timeout maximum amount of time to wait
1173  * @param receiver function to call with received data
1174  * @param receiver_cls closure for @a receiver
1175  */
1176 void
1177 GNUNET_CONNECTION_receive (struct GNUNET_CONNECTION_Handle *connection,
1178                            size_t max,
1179                            struct GNUNET_TIME_Relative timeout,
1180                            GNUNET_CONNECTION_Receiver receiver,
1181                            void *receiver_cls)
1182 {
1183   GNUNET_assert ((NULL == connection->read_task) &&
1184                  (NULL == connection->receiver));
1185   GNUNET_assert (NULL != receiver);
1186   connection->receiver = receiver;
1187   connection->receiver_cls = receiver_cls;
1188   connection->receive_timeout = GNUNET_TIME_relative_to_absolute (timeout);
1189   connection->max = max;
1190   if (NULL != connection->sock)
1191   {
1192     connection->read_task =
1193       GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining
1194                                      (connection->receive_timeout),
1195                                      connection->sock,
1196                                      &receive_ready,
1197                                      connection);
1198     return;
1199   }
1200   if ((NULL == connection->dns_active) &&
1201       (NULL == connection->ap_head) &&
1202       (NULL == connection->proxy_handshake))
1203   {
1204     connection->receiver = NULL;
1205     receiver (receiver_cls, NULL, 0, NULL, 0, ETIMEDOUT);
1206     return;
1207   }
1208 }
1209
1210
1211 /**
1212  * Cancel receive job on the given connection.  Note that the
1213  * receiver callback must not have been called yet in order
1214  * for the cancellation to be valid.
1215  *
1216  * @param connection connection handle
1217  * @return closure of the original receiver callback closure
1218  */
1219 void *
1220 GNUNET_CONNECTION_receive_cancel (struct GNUNET_CONNECTION_Handle *connection)
1221 {
1222   if (NULL != connection->read_task)
1223   {
1224     GNUNET_assert (connection ==
1225                    GNUNET_SCHEDULER_cancel (connection->read_task));
1226     connection->read_task = NULL;
1227   }
1228   connection->receiver = NULL;
1229   return connection->receiver_cls;
1230 }
1231
1232
1233 /**
1234  * Try to call the transmit notify method (check if we do
1235  * have enough space available first)!
1236  *
1237  * @param connection connection for which we should do this processing
1238  * @return #GNUNET_YES if we were able to call notify
1239  */
1240 static int
1241 process_notify (struct GNUNET_CONNECTION_Handle *connection)
1242 {
1243   size_t used;
1244   size_t avail;
1245   size_t size;
1246   GNUNET_CONNECTION_TransmitReadyNotify notify;
1247
1248   LOG (GNUNET_ERROR_TYPE_DEBUG,
1249        "process_notify is running\n");
1250   GNUNET_assert (NULL == connection->write_task);
1251   if (NULL == (notify = connection->nth.notify_ready))
1252   {
1253     LOG (GNUNET_ERROR_TYPE_DEBUG,
1254          "No one to notify\n");
1255     return GNUNET_NO;
1256   }
1257   used = connection->write_buffer_off - connection->write_buffer_pos;
1258   avail = connection->write_buffer_size - used;
1259   size = connection->nth.notify_size;
1260   if (size > avail)
1261   {
1262     LOG (GNUNET_ERROR_TYPE_DEBUG,
1263          "Not enough buffer\n");
1264     return GNUNET_NO;
1265   }
1266   connection->nth.notify_ready = NULL;
1267   if (connection->write_buffer_size - connection->write_buffer_off < size)
1268   {
1269     /* need to compact */
1270     memmove (connection->write_buffer,
1271              &connection->write_buffer[connection->write_buffer_pos],
1272              used);
1273     connection->write_buffer_off -= connection->write_buffer_pos;
1274     connection->write_buffer_pos = 0;
1275   }
1276   avail = connection->write_buffer_size - connection->write_buffer_off;
1277   GNUNET_assert (avail >= size);
1278   size =
1279       notify (connection->nth.notify_ready_cls, avail,
1280               &connection->write_buffer[connection->write_buffer_off]);
1281   GNUNET_assert (size <= avail);
1282   if (0 != size)
1283     connection->write_buffer_off += size;
1284   return GNUNET_YES;
1285 }
1286
1287
1288 /**
1289  * Task invoked by the scheduler when a call to transmit
1290  * is timing out (we never got enough buffer space to call
1291  * the callback function before the specified timeout
1292  * expired).
1293  *
1294  * This task notifies the client about the timeout.
1295  *
1296  * @param cls the `struct GNUNET_CONNECTION_Handle`
1297  */
1298 static void
1299 transmit_timeout (void *cls)
1300 {
1301   struct GNUNET_CONNECTION_Handle *connection = cls;
1302   GNUNET_CONNECTION_TransmitReadyNotify notify;
1303
1304   connection->nth.timeout_task = NULL;
1305   LOG (GNUNET_ERROR_TYPE_DEBUG,
1306        "Transmit to `%s:%u/%s' fails, time out reached (%p).\n",
1307        connection->hostname,
1308        connection->port,
1309        GNUNET_a2s (connection->addr,
1310                    connection->addrlen),
1311        connection);
1312   notify = connection->nth.notify_ready;
1313   GNUNET_assert (NULL != notify);
1314   connection->nth.notify_ready = NULL;
1315   notify (connection->nth.notify_ready_cls, 0, NULL);
1316 }
1317
1318
1319 /**
1320  * Task invoked by the scheduler when we failed to connect
1321  * at the time of being asked to transmit.
1322  *
1323  * This task notifies the client about the error.
1324  *
1325  * @param cls the `struct GNUNET_CONNECTION_Handle`
1326  */
1327 static void
1328 connect_error (void *cls)
1329 {
1330   struct GNUNET_CONNECTION_Handle *connection = cls;
1331   GNUNET_CONNECTION_TransmitReadyNotify notify;
1332
1333   LOG (GNUNET_ERROR_TYPE_DEBUG,
1334        "Transmission request of size %u fails (%s/%u), connection failed (%p).\n",
1335        connection->nth.notify_size,
1336        connection->hostname,
1337        connection->port,
1338        connection);
1339   connection->write_task = NULL;
1340   notify = connection->nth.notify_ready;
1341   connection->nth.notify_ready = NULL;
1342   notify (connection->nth.notify_ready_cls, 0, NULL);
1343 }
1344
1345
1346 /**
1347  * We are ready to transmit (or got a timeout).
1348  *
1349  * @param cls our connection handle
1350  */
1351 static void
1352 transmit_ready (void *cls)
1353 {
1354   struct GNUNET_CONNECTION_Handle *connection = cls;
1355   GNUNET_CONNECTION_TransmitReadyNotify notify;
1356   const struct GNUNET_SCHEDULER_TaskContext *tc;
1357   ssize_t ret;
1358   size_t have;
1359
1360   LOG (GNUNET_ERROR_TYPE_DEBUG,
1361        "transmit_ready running (%p).\n",
1362        connection);
1363   GNUNET_assert (NULL != connection->write_task);
1364   connection->write_task = NULL;
1365   GNUNET_assert (NULL == connection->nth.timeout_task);
1366   tc = GNUNET_SCHEDULER_get_task_context ();
1367   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1368   {
1369     if (NULL != connection->sock)
1370       goto SCHEDULE_WRITE;      /* ignore shutdown, go again immediately */
1371     LOG (GNUNET_ERROR_TYPE_DEBUG,
1372          "Transmit to `%s' fails, shutdown happened (%p).\n",
1373          GNUNET_a2s (connection->addr, connection->addrlen), connection);
1374     notify = connection->nth.notify_ready;
1375     if (NULL != notify)
1376     {
1377       connection->nth.notify_ready = NULL;
1378       notify (connection->nth.notify_ready_cls, 0, NULL);
1379     }
1380     return;
1381   }
1382   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_TIMEOUT))
1383   {
1384     LOG (GNUNET_ERROR_TYPE_DEBUG,
1385          "Transmit to `%s' fails, time out reached (%p).\n",
1386          GNUNET_a2s (connection->addr,
1387                      connection->addrlen),
1388          connection);
1389     notify = connection->nth.notify_ready;
1390     GNUNET_assert (NULL != notify);
1391     connection->nth.notify_ready = NULL;
1392     notify (connection->nth.notify_ready_cls, 0, NULL);
1393     return;
1394   }
1395   GNUNET_assert (NULL != connection->sock);
1396   if (NULL == tc->write_ready)
1397   {
1398     /* special circumstances (in particular, PREREQ_DONE after
1399      * connect): not yet ready to write, but no "fatal" error either.
1400      * Hence retry.  */
1401     goto SCHEDULE_WRITE;
1402   }
1403   if (!GNUNET_NETWORK_fdset_isset (tc->write_ready, connection->sock))
1404   {
1405     GNUNET_assert (NULL == connection->write_task);
1406     /* special circumstances (in particular, shutdown): not yet ready
1407      * to write, but no "fatal" error either.  Hence retry.  */
1408     goto SCHEDULE_WRITE;
1409   }
1410   GNUNET_assert (connection->write_buffer_off >= connection->write_buffer_pos);
1411   if ((NULL != connection->nth.notify_ready) &&
1412       (connection->write_buffer_size < connection->nth.notify_size))
1413   {
1414     connection->write_buffer =
1415         GNUNET_realloc (connection->write_buffer, connection->nth.notify_size);
1416     connection->write_buffer_size = connection->nth.notify_size;
1417   }
1418   process_notify (connection);
1419   have = connection->write_buffer_off - connection->write_buffer_pos;
1420   if (0 == have)
1421   {
1422     /* no data ready for writing, terminate write loop */
1423     return;
1424   }
1425   GNUNET_assert (have <= connection->write_buffer_size);
1426   GNUNET_assert (have + connection->write_buffer_pos <= connection->write_buffer_size);
1427   GNUNET_assert (connection->write_buffer_pos <= connection->write_buffer_size);
1428 RETRY:
1429   ret =
1430       GNUNET_NETWORK_socket_send (connection->sock,
1431                                   &connection->write_buffer[connection->write_buffer_pos],
1432                                   have);
1433   if (-1 == ret)
1434   {
1435     if (EINTR == errno)
1436       goto RETRY;
1437     if (NULL != connection->write_task)
1438     {
1439       GNUNET_SCHEDULER_cancel (connection->write_task);
1440       connection->write_task = NULL;
1441     }
1442     signal_transmit_error (connection, errno);
1443     return;
1444   }
1445   LOG (GNUNET_ERROR_TYPE_DEBUG,
1446        "Connection transmitted %u/%u bytes to `%s' (%p)\n",
1447        (unsigned int) ret, have, GNUNET_a2s (connection->addr, connection->addrlen), connection);
1448   connection->write_buffer_pos += ret;
1449   if (connection->write_buffer_pos == connection->write_buffer_off)
1450   {
1451     /* transmitted all pending data */
1452     connection->write_buffer_pos = 0;
1453     connection->write_buffer_off = 0;
1454   }
1455   if ((0 == connection->write_buffer_off) && (NULL == connection->nth.notify_ready))
1456     return;                     /* all data sent! */
1457   /* not done writing, schedule more */
1458 SCHEDULE_WRITE:
1459   LOG (GNUNET_ERROR_TYPE_DEBUG,
1460        "Re-scheduling transmit_ready (more to do) (%p).\n", connection);
1461   have = connection->write_buffer_off - connection->write_buffer_pos;
1462   GNUNET_assert ((NULL != connection->nth.notify_ready) || (have > 0));
1463   if (NULL == connection->write_task)
1464     connection->write_task =
1465         GNUNET_SCHEDULER_add_write_net ((connection->nth.notify_ready ==
1466                                          NULL) ? GNUNET_TIME_UNIT_FOREVER_REL :
1467                                         GNUNET_TIME_absolute_get_remaining
1468                                         (connection->nth.transmit_timeout),
1469                                         connection->sock, &transmit_ready, connection);
1470 }
1471
1472
1473 /**
1474  * Ask the connection to call us once the specified number of bytes
1475  * are free in the transmission buffer.  Will never call the @a notify
1476  * callback in this task, but always first go into the scheduler.
1477  *
1478  * @param connection connection
1479  * @param size number of bytes to send
1480  * @param timeout after how long should we give up (and call
1481  *        @a notify with buf NULL and size 0)?
1482  * @param notify function to call
1483  * @param notify_cls closure for @a notify
1484  * @return non-NULL if the notify callback was queued,
1485  *         NULL if we are already going to notify someone else (busy)
1486  */
1487 struct GNUNET_CONNECTION_TransmitHandle *
1488 GNUNET_CONNECTION_notify_transmit_ready (struct GNUNET_CONNECTION_Handle *connection,
1489                                          size_t size,
1490                                          struct GNUNET_TIME_Relative timeout,
1491                                          GNUNET_CONNECTION_TransmitReadyNotify
1492                                          notify, void *notify_cls)
1493 {
1494   if (NULL != connection->nth.notify_ready)
1495   {
1496     GNUNET_assert (0);
1497     return NULL;
1498   }
1499   GNUNET_assert (NULL != notify);
1500   GNUNET_assert (size < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1501   GNUNET_assert (connection->write_buffer_off <= connection->write_buffer_size);
1502   GNUNET_assert (connection->write_buffer_pos <= connection->write_buffer_size);
1503   GNUNET_assert (connection->write_buffer_pos <= connection->write_buffer_off);
1504   connection->nth.notify_ready = notify;
1505   connection->nth.notify_ready_cls = notify_cls;
1506   connection->nth.connection = connection;
1507   connection->nth.notify_size = size;
1508   connection->nth.transmit_timeout = GNUNET_TIME_relative_to_absolute (timeout);
1509   GNUNET_assert (NULL == connection->nth.timeout_task);
1510   if ((NULL == connection->sock) &&
1511       (NULL == connection->ap_head) &&
1512       (NULL == connection->dns_active) &&
1513       (NULL == connection->proxy_handshake))
1514   {
1515     if (NULL != connection->write_task)
1516       GNUNET_SCHEDULER_cancel (connection->write_task);
1517     connection->write_task = GNUNET_SCHEDULER_add_now (&connect_error,
1518                                                        connection);
1519     return &connection->nth;
1520   }
1521   if (NULL != connection->write_task)
1522     return &connection->nth; /* previous transmission still in progress */
1523   if (NULL != connection->sock)
1524   {
1525     /* connected, try to transmit now */
1526     LOG (GNUNET_ERROR_TYPE_DEBUG,
1527          "Scheduling transmission (%p).\n",
1528          connection);
1529     connection->write_task =
1530         GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_absolute_get_remaining
1531                                         (connection->nth.transmit_timeout),
1532                                         connection->sock, &transmit_ready, connection);
1533     return &connection->nth;
1534   }
1535   /* not yet connected, wait for connection */
1536   LOG (GNUNET_ERROR_TYPE_DEBUG,
1537        "Need to wait to schedule transmission for connection, adding timeout task (%p).\n",
1538        connection);
1539   connection->nth.timeout_task =
1540     GNUNET_SCHEDULER_add_delayed (timeout,
1541                                   &transmit_timeout, connection);
1542   return &connection->nth;
1543 }
1544
1545
1546 /**
1547  * Cancel the specified transmission-ready notification.
1548  *
1549  * @param th notification to cancel
1550  */
1551 void
1552 GNUNET_CONNECTION_notify_transmit_ready_cancel (struct GNUNET_CONNECTION_TransmitHandle *th)
1553 {
1554   GNUNET_assert (NULL != th->notify_ready);
1555   th->notify_ready = NULL;
1556   if (NULL != th->timeout_task)
1557   {
1558     GNUNET_SCHEDULER_cancel (th->timeout_task);
1559     th->timeout_task = NULL;
1560   }
1561   if (NULL != th->connection->write_task)
1562   {
1563     GNUNET_SCHEDULER_cancel (th->connection->write_task);
1564     th->connection->write_task = NULL;
1565   }
1566 }
1567
1568
1569 /**
1570  * Create a connection to be proxied using a given connection.
1571  *
1572  * @param cph connection to proxy server
1573  * @return connection to be proxied
1574  */
1575 struct GNUNET_CONNECTION_Handle *
1576 GNUNET_CONNECTION_create_proxied_from_handshake (struct GNUNET_CONNECTION_Handle *cph)
1577 {
1578   struct GNUNET_CONNECTION_Handle * proxied = GNUNET_CONNECTION_create_from_existing(NULL);
1579   proxied->proxy_handshake = cph;
1580   return proxied;
1581 }
1582
1583
1584 /**
1585  * Activate proxied connection and destroy initial proxy handshake connection.
1586  * There must not be any pending requests for reading or writing to the
1587  * proxy hadshake connection at this time.
1588  *
1589  * @param proxied connection connection to proxy server
1590  */
1591 void
1592 GNUNET_CONNECTION_acivate_proxied (struct GNUNET_CONNECTION_Handle *proxied)
1593 {
1594   struct GNUNET_CONNECTION_Handle *cph = proxied->proxy_handshake;
1595   GNUNET_assert (NULL != cph);
1596   GNUNET_assert (NULL == proxied->sock);
1597   GNUNET_assert (NULL != cph->sock);
1598   proxied->sock=cph->sock;
1599   cph->sock=NULL;
1600   GNUNET_CONNECTION_destroy (cph);
1601   connect_success_continuation (proxied);
1602 }
1603
1604
1605 /* end of connection.c */