Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / util / connection.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file util/connection.c
23  * @brief  TCP connection management
24  * @author Christian Grothoff
25  *
26  * This code is rather complex.  Only modify it if you
27  * 1) Have a NEW testcase showing that the new code
28  *    is needed and correct
29  * 2) All EXISTING testcases pass with the new code
30  * These rules should apply in general, but for this
31  * module they are VERY, VERY important.
32  */
33 #include "platform.h"
34 #include "gnunet_util_lib.h"
35 #include "gnunet_resolver_service.h"
36
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "util-connection", __VA_ARGS__)
39
40 #define LOG_STRERROR(kind,syscall) GNUNET_log_from_strerror (kind, "util-connection", syscall)
41
42
43 /**
44  * Transmission handle.  There can only be one for each connection.
45  */
46 struct GNUNET_CONNECTION_TransmitHandle
47 {
48
49   /**
50    * Function to call if the send buffer has notify_size
51    * bytes available.
52    */
53   GNUNET_CONNECTION_TransmitReadyNotify notify_ready;
54
55   /**
56    * Closure for notify_ready.
57    */
58   void *notify_ready_cls;
59
60   /**
61    * Our connection handle.
62    */
63   struct GNUNET_CONNECTION_Handle *connection;
64
65   /**
66    * Timeout for receiving (in absolute time).
67    */
68   struct GNUNET_TIME_Absolute transmit_timeout;
69
70   /**
71    * Task called on timeout.
72    */
73   struct GNUNET_SCHEDULER_Task * timeout_task;
74
75   /**
76    * At what number of bytes available in the
77    * write buffer should the notify method be called?
78    */
79   size_t notify_size;
80
81 };
82
83
84 /**
85  * During connect, we try multiple possible IP addresses
86  * to find out which one might work.
87  */
88 struct AddressProbe
89 {
90
91   /**
92    * This is a linked list.
93    */
94   struct AddressProbe *next;
95
96   /**
97    * This is a doubly-linked list.
98    */
99   struct AddressProbe *prev;
100
101   /**
102    * The address; do not free (allocated at the end of this struct).
103    */
104   const struct sockaddr *addr;
105
106   /**
107    * Underlying OS's socket.
108    */
109   struct GNUNET_NETWORK_Handle *sock;
110
111   /**
112    * Connection for which we are probing.
113    */
114   struct GNUNET_CONNECTION_Handle *connection;
115
116   /**
117    * Lenth of addr.
118    */
119   socklen_t addrlen;
120
121   /**
122    * Task waiting for the connection to finish connecting.
123    */
124   struct GNUNET_SCHEDULER_Task * task;
125 };
126
127
128 /**
129  * @brief handle for a network connection
130  */
131 struct GNUNET_CONNECTION_Handle
132 {
133
134   /**
135    * Configuration to use.
136    */
137   const struct GNUNET_CONFIGURATION_Handle *cfg;
138
139   /**
140    * Linked list of sockets we are currently trying out
141    * (during connect).
142    */
143   struct AddressProbe *ap_head;
144
145   /**
146    * Linked list of sockets we are currently trying out
147    * (during connect).
148    */
149   struct AddressProbe *ap_tail;
150
151   /**
152    * Network address of the other end-point, may be NULL.
153    */
154   struct sockaddr *addr;
155
156   /**
157    * Pointer to the hostname if connection was
158    * created using DNS lookup, otherwise NULL.
159    */
160   char *hostname;
161
162   /**
163    * Underlying OS's socket, set to NULL after fatal errors.
164    */
165   struct GNUNET_NETWORK_Handle *sock;
166
167   /**
168    * Function to call on data received, NULL if no receive is pending.
169    */
170   GNUNET_CONNECTION_Receiver receiver;
171
172   /**
173    * Closure for @e receiver.
174    */
175   void *receiver_cls;
176
177   /**
178    * Pointer to our write buffer.
179    */
180   char *write_buffer;
181
182   /**
183    * Current size of our @e write_buffer.
184    */
185   size_t write_buffer_size;
186
187   /**
188    * Current write-offset in @e write_buffer (where
189    * would we write next).
190    */
191   size_t write_buffer_off;
192
193   /**
194    * Current read-offset in @e write_buffer (how many
195    * bytes have already been sent).
196    */
197   size_t write_buffer_pos;
198
199   /**
200    * Length of @e addr.
201    */
202   socklen_t addrlen;
203
204   /**
205    * Read task that we may need to wait for.
206    */
207   struct GNUNET_SCHEDULER_Task *read_task;
208
209   /**
210    * Write task that we may need to wait for.
211    */
212   struct GNUNET_SCHEDULER_Task *write_task;
213
214   /**
215    * Handle to a pending DNS lookup request.
216    */
217   struct GNUNET_RESOLVER_RequestHandle *dns_active;
218
219   /**
220    * The handle we return for #GNUNET_CONNECTION_notify_transmit_ready().
221    */
222   struct GNUNET_CONNECTION_TransmitHandle nth;
223
224   /**
225    * Timeout for receiving (in absolute time).
226    */
227   struct GNUNET_TIME_Absolute receive_timeout;
228
229   /**
230    * Maximum number of bytes to read (for receiving).
231    */
232   size_t max;
233
234   /**
235    * Port to connect to.
236    */
237   uint16_t port;
238
239   /**
240    * When shutdown, do not ever actually close the socket, but
241    * free resources.  Only should ever be set if using program
242    * termination as a signal (because only then will the leaked
243    * socket be freed!)
244    */
245   int8_t persist;
246
247   /**
248    * Usually 0.  Set to 1 if this handle is in use, and should
249    * #GNUNET_CONNECTION_destroy() be called right now, the action needs
250    * to be deferred by setting it to -1.
251    */
252   int8_t destroy_later;
253
254   /**
255    * Handle to subsequent connection after proxy handshake completes,
256    */
257   struct GNUNET_CONNECTION_Handle *proxy_handshake;
258
259 };
260
261
262 /**
263  * Set the persist option on this connection handle.  Indicates
264  * that the underlying socket or fd should never really be closed.
265  * Used for indicating process death.
266  *
267  * @param connection the connection to set persistent
268  */
269 void
270 GNUNET_CONNECTION_persist_ (struct GNUNET_CONNECTION_Handle *connection)
271 {
272   connection->persist = GNUNET_YES;
273 }
274
275
276 /**
277  * Disable the "CORK" feature for communication with the given connection,
278  * forcing the OS to immediately flush the buffer on transmission
279  * instead of potentially buffering multiple messages.  Essentially
280  * reduces the OS send buffers to zero.
281  * Used to make sure that the last messages sent through the connection
282  * reach the other side before the process is terminated.
283  *
284  * @param connection the connection to make flushing and blocking
285  * @return #GNUNET_OK on success
286  */
287 int
288 GNUNET_CONNECTION_disable_corking (struct GNUNET_CONNECTION_Handle *connection)
289 {
290   return GNUNET_NETWORK_socket_disable_corking (connection->sock);
291 }
292
293
294 /**
295  * Create a connection handle by boxing an existing OS socket.  The OS
296  * socket should henceforth be no longer used directly.
297  * #GNUNET_connection_destroy() will close it.
298  *
299  * @param osSocket existing socket to box
300  * @return the boxed connection handle
301  */
302 struct GNUNET_CONNECTION_Handle *
303 GNUNET_CONNECTION_create_from_existing (struct GNUNET_NETWORK_Handle *osSocket)
304 {
305   struct GNUNET_CONNECTION_Handle *connection;
306
307   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
308   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
309   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
310   connection->sock = osSocket;
311   return connection;
312 }
313
314
315 /**
316  * Create a connection handle by accepting on a listen socket.  This
317  * function may block if the listen socket has no connection ready.
318  *
319  * @param access_cb function to use to check if access is allowed
320  * @param access_cb_cls closure for @a access_cb
321  * @param lsock listen socket
322  * @return the connection handle, NULL on error
323  */
324 struct GNUNET_CONNECTION_Handle *
325 GNUNET_CONNECTION_create_from_accept (GNUNET_CONNECTION_AccessCheck access_cb,
326                                       void *access_cb_cls,
327                                       struct GNUNET_NETWORK_Handle *lsock)
328 {
329   struct GNUNET_CONNECTION_Handle *connection;
330   char addr[128];
331   socklen_t addrlen;
332   struct GNUNET_NETWORK_Handle *sock;
333   int aret;
334   struct sockaddr_in *v4;
335   struct sockaddr_in6 *v6;
336   struct sockaddr *sa;
337   void *uaddr;
338 #ifdef SO_PEERCRED
339   struct ucred uc;
340   socklen_t olen;
341 #endif
342   struct GNUNET_CONNECTION_Credentials *gcp;
343 #if HAVE_GETPEEREID || defined(SO_PEERCRED) || HAVE_GETPEERUCRED
344   struct GNUNET_CONNECTION_Credentials gc;
345
346   gc.uid = 0;
347   gc.gid = 0;
348 #endif
349
350   addrlen = sizeof (addr);
351   sock =
352       GNUNET_NETWORK_socket_accept (lsock,
353                                     (struct sockaddr *) &addr,
354                                     &addrlen);
355   if (NULL == sock)
356   {
357     if (EAGAIN != errno)
358       LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING, "accept");
359     return NULL;
360   }
361   if ((addrlen > sizeof (addr)) || (addrlen < sizeof (sa_family_t)))
362   {
363     GNUNET_break (0);
364     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
365     return NULL;
366   }
367
368   sa = (struct sockaddr *) addr;
369   v6 = (struct sockaddr_in6 *) addr;
370   if ( (AF_INET6 == sa->sa_family) &&
371        (IN6_IS_ADDR_V4MAPPED (&v6->sin6_addr)) )
372   {
373     /* convert to V4 address */
374     v4 = GNUNET_new (struct sockaddr_in);
375     memset (v4, 0, sizeof (struct sockaddr_in));
376     v4->sin_family = AF_INET;
377 #if HAVE_SOCKADDR_IN_SIN_LEN
378     v4->sin_len = (u_char) sizeof (struct sockaddr_in);
379 #endif
380     GNUNET_memcpy (&v4->sin_addr,
381             &((char *) &v6->sin6_addr)[sizeof (struct in6_addr) -
382                                        sizeof (struct in_addr)],
383             sizeof (struct in_addr));
384     v4->sin_port = v6->sin6_port;
385     uaddr = v4;
386     addrlen = sizeof (struct sockaddr_in);
387   }
388   else
389   {
390     uaddr = GNUNET_malloc (addrlen);
391     GNUNET_memcpy (uaddr, addr, addrlen);
392   }
393   gcp = NULL;
394   if (AF_UNIX == sa->sa_family)
395   {
396 #if HAVE_GETPEEREID
397     /* most BSDs */
398     if (0 == getpeereid (GNUNET_NETWORK_get_fd (sock),
399                          &gc.uid,
400                          &gc.gid))
401       gcp = &gc;
402 #else
403 #ifdef SO_PEERCRED
404     /* largely traditional GNU/Linux */
405     olen = sizeof (uc);
406     if ( (0 ==
407           getsockopt (GNUNET_NETWORK_get_fd (sock),
408                       SOL_SOCKET,
409                       SO_PEERCRED,
410                       &uc,
411                       &olen)) &&
412          (olen == sizeof (uc)) )
413     {
414       gc.uid = uc.uid;
415       gc.gid = uc.gid;
416       gcp = &gc;
417     }
418 #else
419 #if HAVE_GETPEERUCRED
420     /* this is for Solaris 10 */
421     ucred_t *uc;
422
423     uc = NULL;
424     if (0 == getpeerucred (GNUNET_NETWORK_get_fd (sock), &uc))
425     {
426       gc.uid = ucred_geteuid (uc);
427       gc.gid = ucred_getegid (uc);
428       gcp = &gc;
429     }
430     ucred_free (uc);
431 #endif
432 #endif
433 #endif
434   }
435
436   if ( (NULL != access_cb) &&
437        (GNUNET_YES != (aret = access_cb (access_cb_cls,
438                                          gcp,
439                                          uaddr,
440                                          addrlen))) )
441   {
442     if (GNUNET_NO == aret)
443       LOG (GNUNET_ERROR_TYPE_INFO,
444            _("Access denied to `%s'\n"),
445            GNUNET_a2s (uaddr,
446                        addrlen));
447     GNUNET_break (GNUNET_OK ==
448                   GNUNET_NETWORK_socket_shutdown (sock,
449                                                   SHUT_RDWR));
450     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
451     GNUNET_free (uaddr);
452     return NULL;
453   }
454   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
455   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
456   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
457   connection->addr = uaddr;
458   connection->addrlen = addrlen;
459   connection->sock = sock;
460   LOG (GNUNET_ERROR_TYPE_INFO,
461        _("Accepting connection from `%s': %p\n"),
462        GNUNET_a2s (uaddr,
463                    addrlen),
464        connection);
465   return connection;
466 }
467
468
469 /**
470  * Obtain the network address of the other party.
471  *
472  * @param connection the client to get the address for
473  * @param addr where to store the address
474  * @param addrlen where to store the length of the @a addr
475  * @return #GNUNET_OK on success
476  */
477 int
478 GNUNET_CONNECTION_get_address (struct GNUNET_CONNECTION_Handle *connection,
479                                void **addr,
480                                size_t *addrlen)
481 {
482   if ((NULL == connection->addr) || (0 == connection->addrlen))
483     return GNUNET_NO;
484   *addr = GNUNET_malloc (connection->addrlen);
485   GNUNET_memcpy (*addr, connection->addr, connection->addrlen);
486   *addrlen = connection->addrlen;
487   return GNUNET_OK;
488 }
489
490
491 /**
492  * Tell the receiver callback that we had an IO error.
493  *
494  * @param connection connection to signal error
495  * @param errcode error code to send
496  */
497 static void
498 signal_receive_error (struct GNUNET_CONNECTION_Handle *connection,
499                       int errcode)
500 {
501   GNUNET_CONNECTION_Receiver receiver;
502
503   LOG (GNUNET_ERROR_TYPE_DEBUG,
504        "Receive encounters error (%s), connection closed (%p)\n",
505        STRERROR (errcode),
506        connection);
507   GNUNET_assert (NULL != (receiver = connection->receiver));
508   connection->receiver = NULL;
509   receiver (connection->receiver_cls,
510             NULL,
511             0,
512             connection->addr,
513             connection->addrlen,
514             errcode);
515 }
516
517
518 /**
519  * Tell the receiver callback that a timeout was reached.
520  *
521  * @param connection connection to signal for
522  */
523 static void
524 signal_receive_timeout (struct GNUNET_CONNECTION_Handle *connection)
525 {
526   GNUNET_CONNECTION_Receiver receiver;
527
528   LOG (GNUNET_ERROR_TYPE_DEBUG,
529        "Connection signals timeout to receiver (%p)!\n",
530        connection);
531   GNUNET_assert (NULL != (receiver = connection->receiver));
532   connection->receiver = NULL;
533   receiver (connection->receiver_cls, NULL, 0, NULL, 0, 0);
534 }
535
536
537 /**
538  * We failed to transmit data to the service, signal the error.
539  *
540  * @param connection handle that had trouble
541  * @param ecode error code (errno)
542  */
543 static void
544 signal_transmit_error (struct GNUNET_CONNECTION_Handle *connection,
545                        int ecode)
546 {
547   GNUNET_CONNECTION_TransmitReadyNotify notify;
548
549   LOG (GNUNET_ERROR_TYPE_DEBUG,
550        "Transmission encounterd error (%s), connection closed (%p)\n",
551        STRERROR (ecode),
552        connection);
553   if (NULL != connection->sock)
554   {
555     (void) GNUNET_NETWORK_socket_shutdown (connection->sock,
556                                            SHUT_RDWR);
557     GNUNET_break (GNUNET_OK ==
558                   GNUNET_NETWORK_socket_close (connection->sock));
559     connection->sock = NULL;
560     GNUNET_assert (NULL == connection->write_task);
561   }
562   if (NULL != connection->read_task)
563   {
564     /* send errors trigger read errors... */
565     GNUNET_SCHEDULER_cancel (connection->read_task);
566     connection->read_task = NULL;
567     signal_receive_timeout (connection);
568     return;
569   }
570   if (NULL == connection->nth.notify_ready)
571     return;                     /* nobody to tell about it */
572   notify = connection->nth.notify_ready;
573   connection->nth.notify_ready = NULL;
574   notify (connection->nth.notify_ready_cls,
575           0,
576           NULL);
577 }
578
579
580 /**
581  * We've failed for good to establish a connection (timeout or
582  * no more addresses to try).
583  *
584  * @param connection the connection we tried to establish
585  */
586 static void
587 connect_fail_continuation (struct GNUNET_CONNECTION_Handle *connection)
588 {
589   LOG (GNUNET_ERROR_TYPE_INFO,
590        "Failed to establish TCP connection to `%s:%u', no further addresses to try.\n",
591        connection->hostname,
592     connection->port);
593   GNUNET_break (NULL == connection->ap_head);
594   GNUNET_break (NULL == connection->ap_tail);
595   GNUNET_break (GNUNET_NO == connection->dns_active);
596   GNUNET_break (NULL == connection->sock);
597   GNUNET_assert (NULL == connection->write_task);
598   GNUNET_assert (NULL == connection->proxy_handshake);
599
600   /* signal errors for jobs that used to wait on the connection */
601   connection->destroy_later = 1;
602   if (NULL != connection->receiver)
603     signal_receive_error (connection,
604                           ECONNREFUSED);
605   if (NULL != connection->nth.notify_ready)
606   {
607     GNUNET_assert (NULL != connection->nth.timeout_task);
608     GNUNET_SCHEDULER_cancel (connection->nth.timeout_task);
609     connection->nth.timeout_task = NULL;
610     signal_transmit_error (connection,
611                            ECONNREFUSED);
612   }
613   if (-1 == connection->destroy_later)
614   {
615     /* do it now */
616     connection->destroy_later = 0;
617     GNUNET_CONNECTION_destroy (connection);
618     return;
619   }
620   connection->destroy_later = 0;
621 }
622
623
624 /**
625  * We are ready to transmit (or got a timeout).
626  *
627  * @param cls our connection handle
628  */
629 static void
630 transmit_ready (void *cls);
631
632
633 /**
634  * This function is called once we either timeout or have data ready
635  * to read.
636  *
637  * @param cls connection to read from
638  */
639 static void
640 receive_ready (void *cls);
641
642
643 /**
644  * We've succeeded in establishing a connection.
645  *
646  * @param connection the connection we tried to establish
647  */
648 static void
649 connect_success_continuation (struct GNUNET_CONNECTION_Handle *connection)
650 {
651   LOG (GNUNET_ERROR_TYPE_DEBUG,
652        "Connection to `%s' succeeded! (%p)\n",
653        GNUNET_a2s (connection->addr,
654                    connection->addrlen),
655        connection);
656   /* trigger jobs that waited for the connection */
657   if (NULL != connection->receiver)
658   {
659     LOG (GNUNET_ERROR_TYPE_DEBUG,
660          "Connection succeeded, starting with receiving data (%p)\n",
661          connection);
662     GNUNET_assert (NULL == connection->read_task);
663     connection->read_task =
664       GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining
665                                      (connection->receive_timeout),
666                                      connection->sock,
667                                      &receive_ready, connection);
668   }
669   if (NULL != connection->nth.notify_ready)
670   {
671     LOG (GNUNET_ERROR_TYPE_DEBUG,
672          "Connection succeeded, starting with sending data (%p)\n",
673          connection);
674     GNUNET_assert (connection->nth.timeout_task != NULL);
675     GNUNET_SCHEDULER_cancel (connection->nth.timeout_task);
676     connection->nth.timeout_task = NULL;
677     GNUNET_assert (connection->write_task == NULL);
678     connection->write_task =
679         GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_absolute_get_remaining
680                                         (connection->nth.transmit_timeout), connection->sock,
681                                         &transmit_ready, connection);
682   }
683 }
684
685
686 /**
687  * Scheduler let us know that we're either ready to write on the
688  * socket OR connect timed out.  Do the right thing.
689  *
690  * @param cls the `struct AddressProbe *` with the address that we are probing
691  */
692 static void
693 connect_probe_continuation (void *cls)
694 {
695   struct AddressProbe *ap = cls;
696   struct GNUNET_CONNECTION_Handle *connection = ap->connection;
697   const struct GNUNET_SCHEDULER_TaskContext *tc;
698   struct AddressProbe *pos;
699   int error;
700   socklen_t len;
701
702   GNUNET_assert (NULL != ap->sock);
703   GNUNET_CONTAINER_DLL_remove (connection->ap_head,
704                                connection->ap_tail,
705                                ap);
706   len = sizeof (error);
707   errno = 0;
708   error = 0;
709   tc = GNUNET_SCHEDULER_get_task_context ();
710   if ( (0 == (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) ||
711        (GNUNET_OK !=
712         GNUNET_NETWORK_socket_getsockopt (ap->sock,
713                                           SOL_SOCKET,
714                                           SO_ERROR,
715                                           &error,
716                                           &len)) ||
717        (0 != error) )
718   {
719     GNUNET_break (GNUNET_OK ==
720                   GNUNET_NETWORK_socket_close (ap->sock));
721     GNUNET_free (ap);
722     if ( (NULL == connection->ap_head) &&
723          (GNUNET_NO == connection->dns_active) &&
724          (NULL == connection->proxy_handshake) )
725       connect_fail_continuation (connection);
726     return;
727   }
728   GNUNET_assert (NULL == connection->sock);
729   connection->sock = ap->sock;
730   GNUNET_assert (NULL == connection->addr);
731   connection->addr = GNUNET_malloc (ap->addrlen);
732   GNUNET_memcpy (connection->addr, ap->addr, ap->addrlen);
733   connection->addrlen = ap->addrlen;
734   GNUNET_free (ap);
735   /* cancel all other attempts */
736   while (NULL != (pos = connection->ap_head))
737   {
738     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (pos->sock));
739     GNUNET_SCHEDULER_cancel (pos->task);
740     GNUNET_CONTAINER_DLL_remove (connection->ap_head,
741                                  connection->ap_tail,
742                                  pos);
743     GNUNET_free (pos);
744   }
745   connect_success_continuation (connection);
746 }
747
748
749 /**
750  * Try to establish a connection given the specified address.
751  * This function is called by the resolver once we have a DNS reply.
752  *
753  * @param cls our `struct GNUNET_CONNECTION_Handle *`
754  * @param addr address to try, NULL for "last call"
755  * @param addrlen length of @a addr
756  */
757 static void
758 try_connect_using_address (void *cls,
759                            const struct sockaddr *addr,
760                            socklen_t addrlen)
761 {
762   struct GNUNET_CONNECTION_Handle *connection = cls;
763   struct AddressProbe *ap;
764   struct GNUNET_TIME_Relative delay;
765
766   if (NULL == addr)
767   {
768     connection->dns_active = NULL;
769     if ((NULL == connection->ap_head) &&
770         (NULL == connection->sock) &&
771         (NULL == connection->proxy_handshake))
772       connect_fail_continuation (connection);
773     return;
774   }
775   if (NULL != connection->sock)
776     return;                     /* already connected */
777   GNUNET_assert (NULL == connection->addr);
778   /* try to connect */
779   LOG (GNUNET_ERROR_TYPE_DEBUG,
780        "Trying to connect using address `%s:%u/%s:%u'\n",
781        connection->hostname,
782        connection->port,
783        GNUNET_a2s (addr, addrlen),
784        connection->port);
785   ap = GNUNET_malloc (sizeof (struct AddressProbe) + addrlen);
786   ap->addr = (const struct sockaddr *) &ap[1];
787   GNUNET_memcpy (&ap[1], addr, addrlen);
788   ap->addrlen = addrlen;
789   ap->connection = connection;
790
791   switch (ap->addr->sa_family)
792   {
793   case AF_INET:
794     ((struct sockaddr_in *) ap->addr)->sin_port = htons (connection->port);
795     break;
796   case AF_INET6:
797     ((struct sockaddr_in6 *) ap->addr)->sin6_port = htons (connection->port);
798     break;
799   default:
800     GNUNET_break (0);
801     GNUNET_free (ap);
802     return;                     /* not supported by us */
803   }
804   ap->sock = GNUNET_NETWORK_socket_create (ap->addr->sa_family,
805                                            SOCK_STREAM, 0);
806   if (NULL == ap->sock)
807   {
808     GNUNET_free (ap);
809     return;                     /* not supported by OS */
810   }
811   LOG (GNUNET_ERROR_TYPE_INFO,
812        "Trying to connect to `%s' (%p)\n",
813        GNUNET_a2s (ap->addr, ap->addrlen),
814        connection);
815   if ((GNUNET_OK !=
816        GNUNET_NETWORK_socket_connect (ap->sock,
817                                       ap->addr,
818                                       ap->addrlen)) &&
819       (EINPROGRESS != errno))
820   {
821     /* maybe refused / unsupported address, try next */
822     LOG_STRERROR (GNUNET_ERROR_TYPE_INFO, "connect");
823     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (ap->sock));
824     GNUNET_free (ap);
825     return;
826   }
827   GNUNET_CONTAINER_DLL_insert (connection->ap_head, connection->ap_tail, ap);
828   delay = GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT;
829   if (NULL != connection->nth.notify_ready)
830     delay = GNUNET_TIME_relative_min (delay,
831                                       GNUNET_TIME_absolute_get_remaining (connection->nth.transmit_timeout));
832   if (NULL != connection->receiver)
833     delay = GNUNET_TIME_relative_min (delay,
834                                       GNUNET_TIME_absolute_get_remaining (connection->receive_timeout));
835   ap->task = GNUNET_SCHEDULER_add_write_net (delay,
836                                              ap->sock,
837                                              &connect_probe_continuation,
838                                              ap);
839 }
840
841
842 /**
843  * Create a connection handle by (asynchronously) connecting to a host.
844  * This function returns immediately, even if the connection has not
845  * yet been established.  This function only creates TCP connections.
846  *
847  * @param cfg configuration to use
848  * @param hostname name of the host to connect to
849  * @param port port to connect to
850  * @return the connection handle
851  */
852 struct GNUNET_CONNECTION_Handle *
853 GNUNET_CONNECTION_create_from_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
854                                        const char *hostname,
855                                        uint16_t port)
856 {
857   struct GNUNET_CONNECTION_Handle *connection;
858
859   GNUNET_assert (0 < strlen (hostname));        /* sanity check */
860   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
861   connection->cfg = cfg;
862   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
863   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
864   connection->port = port;
865   connection->hostname = GNUNET_strdup (hostname);
866   connection->dns_active =
867       GNUNET_RESOLVER_ip_get (connection->hostname,
868                               AF_UNSPEC,
869                               GNUNET_CONNECTION_CONNECT_RETRY_TIMEOUT,
870                               &try_connect_using_address,
871                               connection);
872   return connection;
873 }
874
875
876 /**
877  * Create a connection handle by connecting to a UNIX domain service.
878  * This function returns immediately, even if the connection has not
879  * yet been established.  This function only creates UNIX connections.
880  *
881  * @param cfg configuration to use
882  * @param unixpath path to connect to
883  * @return the connection handle, NULL on systems without UNIX support
884  */
885 struct GNUNET_CONNECTION_Handle *
886 GNUNET_CONNECTION_create_from_connect_to_unixpath (const struct GNUNET_CONFIGURATION_Handle *cfg,
887                                                    const char *unixpath)
888 {
889 #ifdef AF_UNIX
890   struct GNUNET_CONNECTION_Handle *connection;
891   struct sockaddr_un *un;
892
893   GNUNET_assert (0 < strlen (unixpath));        /* sanity check */
894   un = GNUNET_new (struct sockaddr_un);
895   un->sun_family = AF_UNIX;
896   strncpy (un->sun_path, unixpath, sizeof (un->sun_path) - 1);
897 #ifdef LINUX
898   {
899     int abstract;
900
901     abstract = GNUNET_CONFIGURATION_get_value_yesno (cfg,
902                                                      "TESTING",
903                                                      "USE_ABSTRACT_SOCKETS");
904     if (GNUNET_YES == abstract)
905       un->sun_path[0] = '\0';
906   }
907 #endif
908 #if HAVE_SOCKADDR_UN_SUN_LEN
909   un->sun_len = (u_char) sizeof (struct sockaddr_un);
910 #endif
911   connection = GNUNET_new (struct GNUNET_CONNECTION_Handle);
912   connection->cfg = cfg;
913   connection->write_buffer_size = GNUNET_SERVER_MIN_BUFFER_SIZE;
914   connection->write_buffer = GNUNET_malloc (connection->write_buffer_size);
915   connection->port = 0;
916   connection->hostname = NULL;
917   connection->addr = (struct sockaddr *) un;
918   connection->addrlen = sizeof (struct sockaddr_un);
919   connection->sock = GNUNET_NETWORK_socket_create (AF_UNIX,
920                                                    SOCK_STREAM,
921                                                    0);
922   if (NULL == connection->sock)
923   {
924     GNUNET_free (connection->addr);
925     GNUNET_free (connection->write_buffer);
926     GNUNET_free (connection);
927     return NULL;
928   }
929   if ( (GNUNET_OK !=
930         GNUNET_NETWORK_socket_connect (connection->sock,
931                                        connection->addr,
932                                        connection->addrlen)) &&
933        (EINPROGRESS != errno) )
934   {
935     /* Just return; we expect everything to work eventually so don't fail HARD */
936     GNUNET_break (GNUNET_OK ==
937                   GNUNET_NETWORK_socket_close (connection->sock));
938     connection->sock = NULL;
939     return connection;
940   }
941   connect_success_continuation (connection);
942   return connection;
943 #else
944   return NULL;
945 #endif
946 }
947
948
949 /**
950  * Create a connection handle by (asynchronously) connecting to a host.
951  * This function returns immediately, even if the connection has not
952  * yet been established.  This function only creates TCP connections.
953  *
954  * @param s socket to connect
955  * @param serv_addr server address
956  * @param addrlen length of @a serv_addr
957  * @return the connection handle
958  */
959 struct GNUNET_CONNECTION_Handle *
960 GNUNET_CONNECTION_connect_socket (struct GNUNET_NETWORK_Handle *s,
961                                   const struct sockaddr *serv_addr,
962                                   socklen_t addrlen)
963 {
964   struct GNUNET_CONNECTION_Handle *connection;
965
966   if ( (GNUNET_OK !=
967         GNUNET_NETWORK_socket_connect (s, serv_addr, addrlen)) &&
968        (EINPROGRESS != errno) )
969   {
970     /* maybe refused / unsupported address, try next */
971     LOG_STRERROR (GNUNET_ERROR_TYPE_DEBUG,
972                   "connect");
973     LOG (GNUNET_ERROR_TYPE_DEBUG,
974          "Attempt to connect to `%s' failed\n",
975          GNUNET_a2s (serv_addr,
976                      addrlen));
977     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (s));
978     return NULL;
979   }
980   connection = GNUNET_CONNECTION_create_from_existing (s);
981   connection->addr = GNUNET_malloc (addrlen);
982   GNUNET_memcpy (connection->addr, serv_addr, addrlen);
983   connection->addrlen = addrlen;
984   LOG (GNUNET_ERROR_TYPE_INFO,
985        "Trying to connect to `%s' (%p)\n",
986        GNUNET_a2s (serv_addr, addrlen),
987        connection);
988   return connection;
989 }
990
991
992 /**
993  * Create a connection handle by creating a socket and
994  * (asynchronously) connecting to a host.  This function returns
995  * immediately, even if the connection has not yet been established.
996  * This function only creates TCP connections.
997  *
998  * @param af_family address family to use
999  * @param serv_addr server address
1000  * @param addrlen length of @a serv_addr
1001  * @return the connection handle
1002  */
1003 struct GNUNET_CONNECTION_Handle *
1004 GNUNET_CONNECTION_create_from_sockaddr (int af_family,
1005                                         const struct sockaddr *serv_addr,
1006                                         socklen_t addrlen)
1007 {
1008   struct GNUNET_NETWORK_Handle *s;
1009
1010   s = GNUNET_NETWORK_socket_create (af_family, SOCK_STREAM, 0);
1011   if (NULL == s)
1012   {
1013     LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1014                   "socket");
1015     return NULL;
1016   }
1017   return GNUNET_CONNECTION_connect_socket (s,
1018                                            serv_addr,
1019                                            addrlen);
1020 }
1021
1022
1023 /**
1024  * Check if connection is valid (no fatal errors have happened so far).
1025  * Note that a connection that is still trying to connect is considered
1026  * valid.
1027  *
1028  * @param connection connection to check
1029  * @return #GNUNET_YES if valid, #GNUNET_NO otherwise
1030  */
1031 int
1032 GNUNET_CONNECTION_check (struct GNUNET_CONNECTION_Handle *connection)
1033 {
1034   if ((NULL != connection->ap_head) ||
1035       (NULL != connection->dns_active) ||
1036       (NULL != connection->proxy_handshake))
1037     return GNUNET_YES;          /* still trying to connect */
1038   if ( (0 != connection->destroy_later) ||
1039        (NULL == connection->sock) )
1040     return GNUNET_NO;
1041   return GNUNET_YES;
1042 }
1043
1044
1045 /**
1046  * Close the connection and free associated resources.  There must
1047  * not be any pending requests for reading or writing to the
1048  * connection at this time.
1049  *
1050  * @param connection connection to destroy
1051  */
1052 void
1053 GNUNET_CONNECTION_destroy (struct GNUNET_CONNECTION_Handle *connection)
1054 {
1055   struct AddressProbe *pos;
1056
1057   if (0 != connection->destroy_later)
1058   {
1059     connection->destroy_later = -1;
1060     return;
1061   }
1062   LOG (GNUNET_ERROR_TYPE_DEBUG,
1063        "Shutting down connection (%p)\n",
1064        connection);
1065   GNUNET_assert (NULL == connection->nth.notify_ready);
1066   GNUNET_assert (NULL == connection->receiver);
1067   if (NULL != connection->write_task)
1068   {
1069     GNUNET_SCHEDULER_cancel (connection->write_task);
1070     connection->write_task = NULL;
1071     connection->write_buffer_off = 0;
1072   }
1073   if (NULL != connection->read_task)
1074   {
1075     GNUNET_SCHEDULER_cancel (connection->read_task);
1076     connection->read_task = NULL;
1077   }
1078   if (NULL != connection->nth.timeout_task)
1079   {
1080     GNUNET_SCHEDULER_cancel (connection->nth.timeout_task);
1081     connection->nth.timeout_task = NULL;
1082   }
1083   connection->nth.notify_ready = NULL;
1084   if (NULL != connection->dns_active)
1085   {
1086     GNUNET_RESOLVER_request_cancel (connection->dns_active);
1087     connection->dns_active = NULL;
1088   }
1089   if (NULL != connection->proxy_handshake)
1090   {
1091     /* GNUNET_CONNECTION_destroy (connection->proxy_handshake); */
1092     connection->proxy_handshake->destroy_later = -1;
1093     connection->proxy_handshake = NULL;  /* Not leaked ??? */
1094   }
1095   while (NULL != (pos = connection->ap_head))
1096   {
1097     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (pos->sock));
1098     GNUNET_SCHEDULER_cancel (pos->task);
1099     GNUNET_CONTAINER_DLL_remove (connection->ap_head,
1100                                  connection->ap_tail,
1101                                  pos);
1102     GNUNET_free (pos);
1103   }
1104   if ( (NULL != connection->sock) &&
1105        (GNUNET_YES != connection->persist) )
1106   {
1107     if ((GNUNET_OK !=
1108          GNUNET_NETWORK_socket_shutdown (connection->sock,
1109                                          SHUT_RDWR)) &&
1110         (ENOTCONN != errno) &&
1111         (ECONNRESET != errno) )
1112       LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING,
1113                     "shutdown");
1114   }
1115   if (NULL != connection->sock)
1116   {
1117     if (GNUNET_YES != connection->persist)
1118     {
1119       GNUNET_break (GNUNET_OK ==
1120                     GNUNET_NETWORK_socket_close (connection->sock));
1121     }
1122     else
1123     {
1124       GNUNET_NETWORK_socket_free_memory_only_ (connection->sock); /* at least no memory leak (we deliberately
1125                                                                    * leak the socket in this special case) ... */
1126     }
1127   }
1128   GNUNET_free_non_null (connection->addr);
1129   GNUNET_free_non_null (connection->hostname);
1130   GNUNET_free (connection->write_buffer);
1131   GNUNET_free (connection);
1132 }
1133
1134
1135 /**
1136  * This function is called once we either timeout
1137  * or have data ready to read.
1138  *
1139  * @param cls connection to read from
1140  */
1141 static void
1142 receive_ready (void *cls)
1143 {
1144   struct GNUNET_CONNECTION_Handle *connection = cls;
1145   const struct GNUNET_SCHEDULER_TaskContext *tc;
1146   char buffer[connection->max];
1147   ssize_t ret;
1148   GNUNET_CONNECTION_Receiver receiver;
1149
1150   connection->read_task = NULL;
1151   tc = GNUNET_SCHEDULER_get_task_context ();
1152   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_TIMEOUT))
1153   {
1154     LOG (GNUNET_ERROR_TYPE_DEBUG,
1155          "Receive from `%s' encounters error: timeout (%s, %p)\n",
1156          GNUNET_a2s (connection->addr,
1157                      connection->addrlen),
1158          GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (connection->receive_timeout),
1159                                                  GNUNET_YES),
1160          connection);
1161     signal_receive_timeout (connection);
1162     return;
1163   }
1164   if (NULL == connection->sock)
1165   {
1166     /* connect failed for good */
1167     signal_receive_error (connection, ECONNREFUSED);
1168     return;
1169   }
1170   GNUNET_assert (GNUNET_NETWORK_fdset_isset (tc->read_ready,
1171                                              connection->sock));
1172 RETRY:
1173   ret = GNUNET_NETWORK_socket_recv (connection->sock,
1174                                     buffer,
1175                                     connection->max);
1176   if (-1 == ret)
1177   {
1178     if (EINTR == errno)
1179       goto RETRY;
1180     signal_receive_error (connection, errno);
1181     return;
1182   }
1183   LOG (GNUNET_ERROR_TYPE_DEBUG,
1184        "receive_ready read %u/%u bytes from `%s' (%p)!\n",
1185        (unsigned int) ret,
1186        connection->max,
1187        GNUNET_a2s (connection->addr,
1188                    connection->addrlen),
1189        connection);
1190   GNUNET_assert (NULL != (receiver = connection->receiver));
1191   connection->receiver = NULL;
1192   receiver (connection->receiver_cls,
1193             buffer,
1194             ret,
1195             connection->addr,
1196             connection->addrlen,
1197             0);
1198 }
1199
1200
1201 /**
1202  * Receive data from the given connection.  Note that this function
1203  * will call @a receiver asynchronously using the scheduler.  It will
1204  * "immediately" return.  Note that there MUST only be one active
1205  * receive call per connection at any given point in time (so do not
1206  * call receive again until the receiver callback has been invoked).
1207  *
1208  * @param connection connection handle
1209  * @param max maximum number of bytes to read
1210  * @param timeout maximum amount of time to wait
1211  * @param receiver function to call with received data
1212  * @param receiver_cls closure for @a receiver
1213  */
1214 void
1215 GNUNET_CONNECTION_receive (struct GNUNET_CONNECTION_Handle *connection,
1216                            size_t max,
1217                            struct GNUNET_TIME_Relative timeout,
1218                            GNUNET_CONNECTION_Receiver receiver,
1219                            void *receiver_cls)
1220 {
1221   GNUNET_assert ((NULL == connection->read_task) &&
1222                  (NULL == connection->receiver));
1223   GNUNET_assert (NULL != receiver);
1224   connection->receiver = receiver;
1225   connection->receiver_cls = receiver_cls;
1226   connection->receive_timeout = GNUNET_TIME_relative_to_absolute (timeout);
1227   connection->max = max;
1228   if (NULL != connection->sock)
1229   {
1230     connection->read_task =
1231       GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining
1232                                      (connection->receive_timeout),
1233                                      connection->sock,
1234                                      &receive_ready,
1235                                      connection);
1236     return;
1237   }
1238   if ((NULL == connection->dns_active) &&
1239       (NULL == connection->ap_head) &&
1240       (NULL == connection->proxy_handshake))
1241   {
1242     connection->receiver = NULL;
1243     receiver (receiver_cls,
1244               NULL, 0,
1245               NULL, 0,
1246               ETIMEDOUT);
1247     return;
1248   }
1249 }
1250
1251
1252 /**
1253  * Cancel receive job on the given connection.  Note that the
1254  * receiver callback must not have been called yet in order
1255  * for the cancellation to be valid.
1256  *
1257  * @param connection connection handle
1258  * @return closure of the original receiver callback closure
1259  */
1260 void *
1261 GNUNET_CONNECTION_receive_cancel (struct GNUNET_CONNECTION_Handle *connection)
1262 {
1263   if (NULL != connection->read_task)
1264   {
1265     GNUNET_assert (connection ==
1266                    GNUNET_SCHEDULER_cancel (connection->read_task));
1267     connection->read_task = NULL;
1268   }
1269   connection->receiver = NULL;
1270   return connection->receiver_cls;
1271 }
1272
1273
1274 /**
1275  * Try to call the transmit notify method (check if we do
1276  * have enough space available first)!
1277  *
1278  * @param connection connection for which we should do this processing
1279  * @return #GNUNET_YES if we were able to call notify
1280  */
1281 static int
1282 process_notify (struct GNUNET_CONNECTION_Handle *connection)
1283 {
1284   size_t used;
1285   size_t avail;
1286   size_t size;
1287   GNUNET_CONNECTION_TransmitReadyNotify notify;
1288
1289   LOG (GNUNET_ERROR_TYPE_DEBUG,
1290        "process_notify is running\n");
1291   GNUNET_assert (NULL == connection->write_task);
1292   if (NULL == (notify = connection->nth.notify_ready))
1293   {
1294     LOG (GNUNET_ERROR_TYPE_DEBUG,
1295          "No one to notify\n");
1296     return GNUNET_NO;
1297   }
1298   used = connection->write_buffer_off - connection->write_buffer_pos;
1299   avail = connection->write_buffer_size - used;
1300   size = connection->nth.notify_size;
1301   if (size > avail)
1302   {
1303     LOG (GNUNET_ERROR_TYPE_DEBUG,
1304          "Not enough buffer\n");
1305     return GNUNET_NO;
1306   }
1307   connection->nth.notify_ready = NULL;
1308   if (connection->write_buffer_size - connection->write_buffer_off < size)
1309   {
1310     /* need to compact */
1311     memmove (connection->write_buffer,
1312              &connection->write_buffer[connection->write_buffer_pos],
1313              used);
1314     connection->write_buffer_off -= connection->write_buffer_pos;
1315     connection->write_buffer_pos = 0;
1316   }
1317   avail = connection->write_buffer_size - connection->write_buffer_off;
1318   GNUNET_assert (avail >= size);
1319   size =
1320       notify (connection->nth.notify_ready_cls, avail,
1321               &connection->write_buffer[connection->write_buffer_off]);
1322   GNUNET_assert (size <= avail);
1323   if (0 != size)
1324     connection->write_buffer_off += size;
1325   return GNUNET_YES;
1326 }
1327
1328
1329 /**
1330  * Task invoked by the scheduler when a call to transmit
1331  * is timing out (we never got enough buffer space to call
1332  * the callback function before the specified timeout
1333  * expired).
1334  *
1335  * This task notifies the client about the timeout.
1336  *
1337  * @param cls the `struct GNUNET_CONNECTION_Handle`
1338  */
1339 static void
1340 transmit_timeout (void *cls)
1341 {
1342   struct GNUNET_CONNECTION_Handle *connection = cls;
1343   GNUNET_CONNECTION_TransmitReadyNotify notify;
1344
1345   connection->nth.timeout_task = NULL;
1346   LOG (GNUNET_ERROR_TYPE_DEBUG,
1347        "Transmit to `%s:%u/%s' fails, time out reached (%p).\n",
1348        connection->hostname,
1349        connection->port,
1350        GNUNET_a2s (connection->addr,
1351                    connection->addrlen),
1352        connection);
1353   notify = connection->nth.notify_ready;
1354   GNUNET_assert (NULL != notify);
1355   connection->nth.notify_ready = NULL;
1356   notify (connection->nth.notify_ready_cls,
1357           0,
1358           NULL);
1359 }
1360
1361
1362 /**
1363  * Task invoked by the scheduler when we failed to connect
1364  * at the time of being asked to transmit.
1365  *
1366  * This task notifies the client about the error.
1367  *
1368  * @param cls the `struct GNUNET_CONNECTION_Handle`
1369  */
1370 static void
1371 connect_error (void *cls)
1372 {
1373   struct GNUNET_CONNECTION_Handle *connection = cls;
1374   GNUNET_CONNECTION_TransmitReadyNotify notify;
1375
1376   LOG (GNUNET_ERROR_TYPE_DEBUG,
1377        "Transmission request of size %u fails (%s/%u), connection failed (%p).\n",
1378        connection->nth.notify_size,
1379        connection->hostname,
1380        connection->port,
1381        connection);
1382   connection->write_task = NULL;
1383   notify = connection->nth.notify_ready;
1384   connection->nth.notify_ready = NULL;
1385   notify (connection->nth.notify_ready_cls,
1386           0,
1387           NULL);
1388 }
1389
1390
1391 /**
1392  * We are ready to transmit (or got a timeout).
1393  *
1394  * @param cls our connection handle
1395  */
1396 static void
1397 transmit_ready (void *cls)
1398 {
1399   struct GNUNET_CONNECTION_Handle *connection = cls;
1400   GNUNET_CONNECTION_TransmitReadyNotify notify;
1401   const struct GNUNET_SCHEDULER_TaskContext *tc;
1402   ssize_t ret;
1403   size_t have;
1404
1405   LOG (GNUNET_ERROR_TYPE_DEBUG,
1406        "transmit_ready running (%p).\n",
1407        connection);
1408   GNUNET_assert (NULL != connection->write_task);
1409   connection->write_task = NULL;
1410   GNUNET_assert (NULL == connection->nth.timeout_task);
1411   tc = GNUNET_SCHEDULER_get_task_context ();
1412   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_TIMEOUT))
1413   {
1414     LOG (GNUNET_ERROR_TYPE_DEBUG,
1415          "Transmit to `%s' fails, time out reached (%p).\n",
1416          GNUNET_a2s (connection->addr,
1417                      connection->addrlen),
1418          connection);
1419     notify = connection->nth.notify_ready;
1420     GNUNET_assert (NULL != notify);
1421     connection->nth.notify_ready = NULL;
1422     notify (connection->nth.notify_ready_cls, 0, NULL);
1423     return;
1424   }
1425   GNUNET_assert (NULL != connection->sock);
1426   if (NULL == tc->write_ready)
1427   {
1428     /* special circumstances (in particular, PREREQ_DONE after
1429      * connect): not yet ready to write, but no "fatal" error either.
1430      * Hence retry.  */
1431     goto SCHEDULE_WRITE;
1432   }
1433   if (! GNUNET_NETWORK_fdset_isset (tc->write_ready,
1434                                     connection->sock))
1435   {
1436     GNUNET_assert (NULL == connection->write_task);
1437     /* special circumstances (in particular, shutdown): not yet ready
1438      * to write, but no "fatal" error either.  Hence retry.  */
1439     goto SCHEDULE_WRITE;
1440   }
1441   GNUNET_assert (connection->write_buffer_off >= connection->write_buffer_pos);
1442   if ((NULL != connection->nth.notify_ready) &&
1443       (connection->write_buffer_size < connection->nth.notify_size))
1444   {
1445     connection->write_buffer =
1446         GNUNET_realloc (connection->write_buffer, connection->nth.notify_size);
1447     connection->write_buffer_size = connection->nth.notify_size;
1448   }
1449   process_notify (connection);
1450   have = connection->write_buffer_off - connection->write_buffer_pos;
1451   if (0 == have)
1452   {
1453     /* no data ready for writing, terminate write loop */
1454     return;
1455   }
1456   GNUNET_assert (have <= connection->write_buffer_size);
1457   GNUNET_assert (have + connection->write_buffer_pos <= connection->write_buffer_size);
1458   GNUNET_assert (connection->write_buffer_pos <= connection->write_buffer_size);
1459 RETRY:
1460   ret =
1461       GNUNET_NETWORK_socket_send (connection->sock,
1462                                   &connection->write_buffer[connection->write_buffer_pos],
1463                                   have);
1464   if (-1 == ret)
1465   {
1466     if (EINTR == errno)
1467       goto RETRY;
1468     if (NULL != connection->write_task)
1469     {
1470       GNUNET_SCHEDULER_cancel (connection->write_task);
1471       connection->write_task = NULL;
1472     }
1473     signal_transmit_error (connection, errno);
1474     return;
1475   }
1476   LOG (GNUNET_ERROR_TYPE_DEBUG,
1477        "Connection transmitted %u/%u bytes to `%s' (%p)\n",
1478        (unsigned int) ret,
1479        have,
1480        GNUNET_a2s (connection->addr,
1481                    connection->addrlen),
1482        connection);
1483   connection->write_buffer_pos += ret;
1484   if (connection->write_buffer_pos == connection->write_buffer_off)
1485   {
1486     /* transmitted all pending data */
1487     connection->write_buffer_pos = 0;
1488     connection->write_buffer_off = 0;
1489   }
1490   if ( (0 == connection->write_buffer_off) &&
1491        (NULL == connection->nth.notify_ready) )
1492     return;                     /* all data sent! */
1493   /* not done writing, schedule more */
1494 SCHEDULE_WRITE:
1495   LOG (GNUNET_ERROR_TYPE_DEBUG,
1496        "Re-scheduling transmit_ready (more to do) (%p).\n",
1497        connection);
1498   have = connection->write_buffer_off - connection->write_buffer_pos;
1499   GNUNET_assert ( (NULL != connection->nth.notify_ready) ||
1500                   (have > 0) );
1501   if (NULL == connection->write_task)
1502     connection->write_task =
1503         GNUNET_SCHEDULER_add_write_net ((connection->nth.notify_ready ==
1504                                          NULL) ? GNUNET_TIME_UNIT_FOREVER_REL :
1505                                         GNUNET_TIME_absolute_get_remaining
1506                                         (connection->nth.transmit_timeout),
1507                                         connection->sock,
1508                                         &transmit_ready, connection);
1509 }
1510
1511
1512 /**
1513  * Ask the connection to call us once the specified number of bytes
1514  * are free in the transmission buffer.  Will never call the @a notify
1515  * callback in this task, but always first go into the scheduler.
1516  *
1517  * @param connection connection
1518  * @param size number of bytes to send
1519  * @param timeout after how long should we give up (and call
1520  *        @a notify with buf NULL and size 0)?
1521  * @param notify function to call
1522  * @param notify_cls closure for @a notify
1523  * @return non-NULL if the notify callback was queued,
1524  *         NULL if we are already going to notify someone else (busy)
1525  */
1526 struct GNUNET_CONNECTION_TransmitHandle *
1527 GNUNET_CONNECTION_notify_transmit_ready (struct GNUNET_CONNECTION_Handle *connection,
1528                                          size_t size,
1529                                          struct GNUNET_TIME_Relative timeout,
1530                                          GNUNET_CONNECTION_TransmitReadyNotify notify,
1531                                          void *notify_cls)
1532 {
1533   if (NULL != connection->nth.notify_ready)
1534   {
1535     GNUNET_assert (0);
1536     return NULL;
1537   }
1538   GNUNET_assert (NULL != notify);
1539   GNUNET_assert (size < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1540   GNUNET_assert (connection->write_buffer_off <= connection->write_buffer_size);
1541   GNUNET_assert (connection->write_buffer_pos <= connection->write_buffer_size);
1542   GNUNET_assert (connection->write_buffer_pos <= connection->write_buffer_off);
1543   connection->nth.notify_ready = notify;
1544   connection->nth.notify_ready_cls = notify_cls;
1545   connection->nth.connection = connection;
1546   connection->nth.notify_size = size;
1547   connection->nth.transmit_timeout = GNUNET_TIME_relative_to_absolute (timeout);
1548   GNUNET_assert (NULL == connection->nth.timeout_task);
1549   if ((NULL == connection->sock) &&
1550       (NULL == connection->ap_head) &&
1551       (NULL == connection->dns_active) &&
1552       (NULL == connection->proxy_handshake))
1553   {
1554     if (NULL != connection->write_task)
1555       GNUNET_SCHEDULER_cancel (connection->write_task);
1556     connection->write_task = GNUNET_SCHEDULER_add_now (&connect_error,
1557                                                        connection);
1558     return &connection->nth;
1559   }
1560   if (NULL != connection->write_task)
1561     return &connection->nth; /* previous transmission still in progress */
1562   if (NULL != connection->sock)
1563   {
1564     /* connected, try to transmit now */
1565     LOG (GNUNET_ERROR_TYPE_DEBUG,
1566          "Scheduling transmission (%p).\n",
1567          connection);
1568     connection->write_task =
1569         GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_absolute_get_remaining
1570                                         (connection->nth.transmit_timeout),
1571                                         connection->sock,
1572                                         &transmit_ready, connection);
1573     return &connection->nth;
1574   }
1575   /* not yet connected, wait for connection */
1576   LOG (GNUNET_ERROR_TYPE_DEBUG,
1577        "Need to wait to schedule transmission for connection, adding timeout task (%p).\n",
1578        connection);
1579   connection->nth.timeout_task =
1580     GNUNET_SCHEDULER_add_delayed (timeout,
1581                                   &transmit_timeout,
1582                                   connection);
1583   return &connection->nth;
1584 }
1585
1586
1587 /**
1588  * Cancel the specified transmission-ready notification.
1589  *
1590  * @param th notification to cancel
1591  */
1592 void
1593 GNUNET_CONNECTION_notify_transmit_ready_cancel (struct GNUNET_CONNECTION_TransmitHandle *th)
1594 {
1595   GNUNET_assert (NULL != th->notify_ready);
1596   th->notify_ready = NULL;
1597   if (NULL != th->timeout_task)
1598   {
1599     GNUNET_SCHEDULER_cancel (th->timeout_task);
1600     th->timeout_task = NULL;
1601   }
1602   if (NULL != th->connection->write_task)
1603   {
1604     GNUNET_SCHEDULER_cancel (th->connection->write_task);
1605     th->connection->write_task = NULL;
1606   }
1607 }
1608
1609
1610 /**
1611  * Create a connection to be proxied using a given connection.
1612  *
1613  * @param cph connection to proxy server
1614  * @return connection to be proxied
1615  */
1616 struct GNUNET_CONNECTION_Handle *
1617 GNUNET_CONNECTION_create_proxied_from_handshake (struct GNUNET_CONNECTION_Handle *cph)
1618 {
1619   struct GNUNET_CONNECTION_Handle *proxied = GNUNET_CONNECTION_create_from_existing (NULL);
1620
1621   proxied->proxy_handshake = cph;
1622   return proxied;
1623 }
1624
1625
1626 /**
1627  * Activate proxied connection and destroy initial proxy handshake connection.
1628  * There must not be any pending requests for reading or writing to the
1629  * proxy hadshake connection at this time.
1630  *
1631  * @param proxied connection connection to proxy server
1632  */
1633 void
1634 GNUNET_CONNECTION_acivate_proxied (struct GNUNET_CONNECTION_Handle *proxied)
1635 {
1636   struct GNUNET_CONNECTION_Handle *cph = proxied->proxy_handshake;
1637
1638   GNUNET_assert (NULL != cph);
1639   GNUNET_assert (NULL == proxied->sock);
1640   GNUNET_assert (NULL != cph->sock);
1641   proxied->sock = cph->sock;
1642   cph->sock = NULL;
1643   GNUNET_CONNECTION_destroy (cph);
1644   connect_success_continuation (proxied);
1645 }
1646
1647
1648 /* end of connection.c */