292f38039129306095367cf6ae6d1c0a80da96b9
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport.c
23  * @brief low-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - if we do not receive an ACK in response to our
28  *   HELLO, retransmit HELLO!
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_getopt_lib.h"
34 #include "gnunet_hello_lib.h"
35 #include "gnunet_os_lib.h"
36 #include "gnunet_peerinfo_service.h"
37 #include "gnunet_plugin_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_service_lib.h"
40 #include "gnunet_signatures.h"
41 #include "plugin_transport.h"
42 #include "transport.h"
43
44 /**
45  * How many messages can we have pending for a given client process
46  * before we start to drop incoming messages?  We typically should
47  * have only one client and so this would be the primary buffer for
48  * messages, so the number should be chosen rather generously.
49  *
50  * The expectation here is that most of the time the queue is large
51  * enough so that a drop is virtually never required.
52  */
53 #define MAX_PENDING 128
54
55 /**
56  * How often should we try to reconnect to a peer using a particular
57  * transport plugin before giving up?  Note that the plugin may be
58  * added back to the list after PLUGIN_RETRY_FREQUENCY expires.
59  */
60 #define MAX_CONNECT_RETRY 3
61
62 /**
63  * How often must a peer violate bandwidth quotas before we start
64  * to simply drop its messages?
65  */
66 #define QUOTA_VIOLATION_DROP_THRESHOLD 100
67
68 /**
69  * How long until a HELLO verification attempt should time out?
70  * Must be rather small, otherwise a partially successful HELLO
71  * validation (some addresses working) might not be available
72  * before a client's request for a connection fails for good.
73  * Besides, if a single request to an address takes a long time,
74  * then the peer is unlikely worthwhile anyway.
75  */
76 #define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
77
78 /**
79  * How often do we re-add (cheaper) plugins to our list of plugins
80  * to try for a given connected peer?
81  */
82 #define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
83
84 /**
85  * After how long do we expire an address in a HELLO
86  * that we just validated?  This value is also used
87  * for our own addresses when we create a HELLO.
88  */
89 #define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12)
90
91
92 /**
93  * Entry in linked list of network addresses.
94  */
95 struct AddressList
96 {
97   /**
98    * This is a linked list.
99    */
100   struct AddressList *next;
101
102   /**
103    * The address, actually a pointer to the end
104    * of this struct.  Do not free!
105    */
106   void *addr;
107
108   /**
109    * How long until we auto-expire this address (unless it is
110    * re-confirmed by the transport)?
111    */
112   struct GNUNET_TIME_Absolute expires;
113
114   /**
115    * Length of addr.
116    */
117   size_t addrlen;
118
119 };
120
121
122 /**
123  * Entry in linked list of all of our plugins.
124  */
125 struct TransportPlugin
126 {
127
128   /**
129    * This is a linked list.
130    */
131   struct TransportPlugin *next;
132
133   /**
134    * API of the transport as returned by the plugin's
135    * initialization function.
136    */
137   struct GNUNET_TRANSPORT_PluginFunctions *api;
138
139   /**
140    * Short name for the plugin (i.e. "tcp").
141    */
142   char *short_name;
143
144   /**
145    * Name of the library (i.e. "gnunet_plugin_transport_tcp").
146    */
147   char *lib_name;
148
149   /**
150    * List of our known addresses for this transport.
151    */
152   struct AddressList *addresses;
153
154   /**
155    * Environment this transport service is using
156    * for this plugin.
157    */
158   struct GNUNET_TRANSPORT_PluginEnvironment env;
159
160   /**
161    * ID of task that is used to clean up expired addresses.
162    */
163   GNUNET_SCHEDULER_TaskIdentifier address_update_task;
164
165
166   /**
167    * Set to GNUNET_YES if we need to scrap the existing
168    * list of "addresses" and start fresh when we receive
169    * the next address update from a transport.  Set to
170    * GNUNET_NO if we should just add the new address
171    * to the list and wait for the commit call.
172    */
173   int rebuild;
174 };
175
176 struct NeighbourList;
177
178 /**
179  * For each neighbour we keep a list of messages
180  * that we still want to transmit to the neighbour.
181  */
182 struct MessageQueue
183 {
184
185   /**
186    * This is a linked list.
187    */
188   struct MessageQueue *next;
189
190   /**
191    * The message we want to transmit.
192    */
193   struct GNUNET_MessageHeader *message;
194
195   /**
196    * Client responsible for queueing the message;
197    * used to check that a client has not two messages
198    * pending for the same target.  Can be NULL.
199    */
200   struct TransportClient *client;
201
202   /**
203    * Neighbour this entry belongs to.
204    */
205   struct NeighbourList *neighbour;
206
207   /**
208    * Plugin that we used for the transmission.
209    * NULL until we scheduled a transmission.
210    */
211   struct TransportPlugin *plugin;
212
213   /**
214    * Internal message of the transport system that should not be
215    * included in the usual SEND-SEND_OK transmission confirmation
216    * traffic management scheme.  Typically, "internal_msg" will
217    * be set whenever "client" is NULL (but it is not strictly
218    * required).
219    */
220   int internal_msg;
221
222   /**
223    * How important is the message?
224    */
225   unsigned int priority;
226   
227 };
228
229
230 /**
231  * For a given Neighbour, which plugins are available
232  * to talk to this peer and what are their costs?
233  */
234 struct ReadyList
235 {
236
237   /**
238    * This is a linked list.
239    */
240   struct ReadyList *next;
241
242   /**
243    * Which of our transport plugins does this entry
244    * represent?
245    */
246   struct TransportPlugin *plugin;
247
248   /**
249    * Neighbour this entry belongs to.
250    */
251   struct NeighbourList *neighbour;
252
253   /**
254    * What was the last latency observed for this plugin
255    * and peer?  Invalid if connected is GNUNET_NO.
256    */
257   struct GNUNET_TIME_Relative latency;
258
259   /**
260    * If we did not successfully transmit a message to the
261    * given peer via this connection during the specified
262    * time, we should consider the connection to be dead.
263    * This is used in the case that a TCP transport simply
264    * stalls writing to the stream but does not formerly
265    * get a signal that the other peer died.
266    */
267   struct GNUNET_TIME_Absolute timeout;
268
269   /**
270    * Is this plugin currently connected?  The first time
271    * we transmit or send data to a peer via a particular
272    * plugin, we set this to GNUNET_YES.  If we later get
273    * an error (disconnect notification or transmission
274    * failure), we set it back to GNUNET_NO.  Each time the
275    * value is set to GNUNET_YES, we increment the
276    * "connect_attempts" counter.  If that one reaches a
277    * particular threshold, we consider the plugin to not
278    * be working properly at this time for the given peer
279    * and remove it from the eligible list.
280    */
281   int connected;
282
283   /**
284    * How often have we tried to connect using this plugin?
285    */
286   unsigned int connect_attempts;
287
288   /**
289    * Is this plugin ready to transmit to the specific
290    * target?  GNUNET_NO if not.  Initially, all plugins
291    * are marked ready.  If a transmission is in progress,
292    * "transmit_ready" is set to GNUNET_NO.
293    */
294   int transmit_ready;
295
296 };
297
298
299 /**
300  * Entry in linked list of all of our current neighbours.
301  */
302 struct NeighbourList
303 {
304
305   /**
306    * This is a linked list.
307    */
308   struct NeighbourList *next;
309
310   /**
311    * Which of our transports is connected to this peer
312    * and what is their status?
313    */
314   struct ReadyList *plugins;
315
316   /**
317    * List of messages we would like to send to this peer;
318    * must contain at most one message per client.
319    */
320   struct MessageQueue *messages;
321
322   /**
323    * Identity of this neighbour.
324    */
325   struct GNUNET_PeerIdentity id;
326
327   /**
328    * ID of task scheduled to run when this peer is about to
329    * time out (will free resources associated with the peer).
330    */
331   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
332
333   /**
334    * How long until we should consider this peer dead
335    * (if we don't receive another message in the
336    * meantime)?
337    */
338   struct GNUNET_TIME_Absolute peer_timeout;
339
340   /**
341    * At what time did we reset last_received last?
342    */
343   struct GNUNET_TIME_Absolute last_quota_update;
344
345   /**
346    * At what time should we try to again add plugins to
347    * our ready list?
348    */
349   struct GNUNET_TIME_Absolute retry_plugins_time;
350
351   /**
352    * How many bytes have we received since the "last_quota_update"
353    * timestamp?
354    */
355   uint64_t last_received;
356
357   /**
358    * Global quota for inbound traffic for the neighbour in bytes/ms.
359    */
360   uint32_t quota_in;
361
362   /**
363    * How often has the other peer (recently) violated the
364    * inbound traffic limit?  Incremented by 10 per violation,
365    * decremented by 1 per non-violation (for each
366    * time interval).
367    */
368   unsigned int quota_violation_count;
369
370   /**
371    * Have we seen an ACK from this neighbour in the past?
372    * (used to make up a fake ACK for clients connecting after
373    * the neighbour connected to us).
374    */
375   int saw_ack;
376
377 };
378
379
380 /**
381  * Linked list of messages to be transmitted to
382  * the client.  Each entry is followed by the
383  * actual message.
384  */
385 struct ClientMessageQueueEntry
386 {
387   /**
388    * This is a linked list.
389    */
390   struct ClientMessageQueueEntry *next;
391 };
392
393
394 /**
395  * Client connected to the transport service.
396  */
397 struct TransportClient
398 {
399
400   /**
401    * This is a linked list.
402    */
403   struct TransportClient *next;
404
405   /**
406    * Handle to the client.
407    */
408   struct GNUNET_SERVER_Client *client;
409
410   /**
411    * Linked list of messages yet to be transmitted to
412    * the client.
413    */
414   struct ClientMessageQueueEntry *message_queue_head;
415
416   /**
417    * Tail of linked list of messages yet to be transmitted to the
418    * client.
419    */
420   struct ClientMessageQueueEntry *message_queue_tail;
421
422   /**
423    * Is a call to "transmit_send_continuation" pending?  If so, we
424    * must not free this struct (even if the corresponding client
425    * disconnects) and instead only remove it from the linked list and
426    * set the "client" field to NULL.
427    */
428   int tcs_pending;
429
430   /**
431    * Length of the list of messages pending for this client.
432    */
433   unsigned int message_count;
434
435 };
436
437
438 /**
439  * For each HELLO, we may have to validate multiple addresses;
440  * each address gets its own request entry.
441  */
442 struct ValidationAddress
443 {
444   /**
445    * This is a linked list.
446    */
447   struct ValidationAddress *next;
448
449   /**
450    * Name of the transport.
451    */
452   char *transport_name;
453
454   /**
455    * When should this validated address expire?
456    */
457   struct GNUNET_TIME_Absolute expiration;
458
459   /**
460    * Length of the address we are validating.
461    */
462   size_t addr_len;
463
464   /**
465    * Challenge number we used.
466    */
467   uint32_t challenge;
468
469   /**
470    * Set to GNUNET_YES if the challenge was met,
471    * GNUNET_SYSERR if we know it failed, GNUNET_NO
472    * if we are waiting on a response.
473    */
474   int ok;
475 };
476
477
478 /**
479  * Entry in linked list of all HELLOs awaiting validation.
480  */
481 struct ValidationList
482 {
483
484   /**
485    * This is a linked list.
486    */
487   struct ValidationList *next;
488
489   /**
490    * Linked list with one entry per address from the HELLO
491    * that needs to be validated.
492    */
493   struct ValidationAddress *addresses;
494
495   /**
496    * The public key of the peer.
497    */
498   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
499
500   /**
501    * When does this record time-out? (assuming the
502    * challenge goes unanswered)
503    */
504   struct GNUNET_TIME_Absolute timeout;
505
506 };
507
508
509 /**
510  * HELLOs awaiting validation.
511  */
512 static struct ValidationList *pending_validations;
513
514 /**
515  * Our HELLO message.
516  */
517 static struct GNUNET_HELLO_Message *our_hello;
518
519 /**
520  * "version" of "our_hello".  Used to see if a given
521  * neighbour has already been sent the latest version
522  * of our HELLO message.
523  */
524 static unsigned int our_hello_version;
525
526 /**
527  * Our public key.
528  */
529 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
530
531 /**
532  * Our identity.
533  */
534 static struct GNUNET_PeerIdentity my_identity;
535
536 /**
537  * Our private key.
538  */
539 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
540
541 /**
542  * Our scheduler.
543  */
544 struct GNUNET_SCHEDULER_Handle *sched;
545
546 /**
547  * Our configuration.
548  */
549 const struct GNUNET_CONFIGURATION_Handle *cfg;
550
551 /**
552  * Linked list of all clients to this service.
553  */
554 static struct TransportClient *clients;
555
556 /**
557  * All loaded plugins.
558  */
559 static struct TransportPlugin *plugins;
560
561 /**
562  * Our server.
563  */
564 static struct GNUNET_SERVER_Handle *server;
565
566 /**
567  * All known neighbours and their HELLOs.
568  */
569 static struct NeighbourList *neighbours;
570
571 /**
572  * Number of neighbours we'd like to have.
573  */
574 static uint32_t max_connect_per_transport;
575
576
577 /**
578  * Find an entry in the neighbour list for a particular peer.
579  *
580  * @return NULL if not found.
581  */
582 static struct NeighbourList *
583 find_neighbour (const struct GNUNET_PeerIdentity *key)
584 {
585   struct NeighbourList *head = neighbours;
586   while ((head != NULL) &&
587          (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
588     head = head->next;
589   return head;
590 }
591
592
593 /**
594  * Find an entry in the transport list for a particular transport.
595  *
596  * @return NULL if not found.
597  */
598 static struct TransportPlugin *
599 find_transport (const char *short_name)
600 {
601   struct TransportPlugin *head = plugins;
602   while ((head != NULL) && (0 != strcmp (short_name, head->short_name)))
603     head = head->next;
604   return head;
605 }
606
607
608 /**
609  * Update the quota values for the given neighbour now.
610  */
611 static void
612 update_quota (struct NeighbourList *n)
613 {
614   struct GNUNET_TIME_Relative delta;
615   uint64_t allowed;
616   uint64_t remaining;
617
618   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
619   if (delta.value < MIN_QUOTA_REFRESH_TIME)
620     return;                     /* not enough time passed for doing quota update */
621   allowed = delta.value * n->quota_in;
622   if (n->last_received < allowed)
623     {
624       remaining = allowed - n->last_received;
625       if (n->quota_in > 0)
626         remaining /= n->quota_in;
627       else
628         remaining = 0;
629       if (remaining > MAX_BANDWIDTH_CARRY)
630         remaining = MAX_BANDWIDTH_CARRY;
631       n->last_received = 0;
632       n->last_quota_update = GNUNET_TIME_absolute_get ();
633       n->last_quota_update.value -= remaining;
634       if (n->quota_violation_count > 0)
635         n->quota_violation_count--;
636     }
637   else
638     {
639       n->last_received -= allowed;
640       n->last_quota_update = GNUNET_TIME_absolute_get ();
641       if (n->last_received > allowed)
642         {
643           /* more than twice the allowed rate! */
644           n->quota_violation_count += 10;
645         }
646     }
647 }
648
649
650 /**
651  * Function called to notify a client about the socket
652  * being ready to queue more data.  "buf" will be
653  * NULL and "size" zero if the socket was closed for
654  * writing in the meantime.
655  *
656  * @param cls closure
657  * @param size number of bytes available in buf
658  * @param buf where the callee should write the message
659  * @return number of bytes written to buf
660  */
661 static size_t
662 transmit_to_client_callback (void *cls, size_t size, void *buf)
663 {
664   struct TransportClient *client = cls;
665   struct ClientMessageQueueEntry *q;
666   uint16_t msize;
667   size_t tsize;
668   const struct GNUNET_MessageHeader *msg;
669   struct GNUNET_CONNECTION_TransmitHandle *th;
670   char *cbuf;
671
672   if (buf == NULL)
673     {
674       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
675                   "Transmission to client failed, closing connection.\n");
676       /* fatal error with client, free message queue! */
677       while (NULL != (q = client->message_queue_head))
678         {
679           client->message_queue_head = q->next;
680           GNUNET_free (q);
681         }
682       client->message_queue_tail = NULL;
683       client->message_count = 0;
684       return 0;
685     }
686   cbuf = buf;
687   tsize = 0;
688   while (NULL != (q = client->message_queue_head))
689     {
690       msg = (const struct GNUNET_MessageHeader *) &q[1];
691       msize = ntohs (msg->size);
692       if (msize + tsize > size)
693         break;
694 #if DEBUG_TRANSPORT
695       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
696                   "Transmitting message of type %u to client.\n",
697                   ntohs (msg->type));
698 #endif
699       client->message_queue_head = q->next;
700       if (q->next == NULL)
701         client->message_queue_tail = NULL;
702       memcpy (&cbuf[tsize], msg, msize);
703       tsize += msize;
704       GNUNET_free (q);
705       client->message_count--;
706     }
707   if (NULL != q)
708     {
709       GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
710       th = GNUNET_SERVER_notify_transmit_ready (client->client,
711                                                 msize,
712                                                 GNUNET_TIME_UNIT_FOREVER_REL,
713                                                 &transmit_to_client_callback,
714                                                 client);
715       GNUNET_assert (th != NULL);
716     }
717   return tsize;
718 }
719
720
721 /**
722  * Send the specified message to the specified client.  Since multiple
723  * messages may be pending for the same client at a time, this code
724  * makes sure that no message is lost.
725  *
726  * @param client client to transmit the message to
727  * @param msg the message to send
728  * @param may_drop can this message be dropped if the
729  *        message queue for this client is getting far too large?
730  */
731 static void
732 transmit_to_client (struct TransportClient *client,
733                     const struct GNUNET_MessageHeader *msg, int may_drop)
734 {
735   struct ClientMessageQueueEntry *q;
736   uint16_t msize;
737   struct GNUNET_CONNECTION_TransmitHandle *th;
738
739   if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
740     {
741       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
742                   _
743                   ("Dropping message, have %u messages pending (%u is the soft limit)\n"),
744                   client->message_count, MAX_PENDING);
745       /* TODO: call to statistics... */
746       return;
747     }
748   client->message_count++;
749   msize = ntohs (msg->size);
750   GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
751   q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
752   memcpy (&q[1], msg, msize);
753   /* append to message queue */
754   if (client->message_queue_tail == NULL)
755     {
756       client->message_queue_tail = q;
757     }
758   else
759     {
760       client->message_queue_tail->next = q;
761       client->message_queue_tail = q;
762     }
763   if (client->message_queue_head == NULL)
764     {
765       client->message_queue_head = q;
766       th = GNUNET_SERVER_notify_transmit_ready (client->client,
767                                                 msize,
768                                                 GNUNET_TIME_UNIT_FOREVER_REL,
769                                                 &transmit_to_client_callback,
770                                                 client);
771       GNUNET_assert (th != NULL);
772     }
773 }
774
775
776 /**
777  * Find alternative plugins for communication.
778  *
779  * @param neighbour for which neighbour should we try to find
780  *        more plugins?
781  */
782 static void
783 try_alternative_plugins (struct NeighbourList *neighbour)
784 {
785   struct ReadyList *rl;
786
787   if ((neighbour->plugins != NULL) &&
788       (neighbour->retry_plugins_time.value >
789        GNUNET_TIME_absolute_get ().value))
790     return;                     /* don't try right now */
791   neighbour->retry_plugins_time
792     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
793
794   rl = neighbour->plugins;
795   while (rl != NULL)
796     {
797       if (rl->connect_attempts > 0)
798         rl->connect_attempts--; /* amnesty */
799       rl = rl->next;
800     }
801
802 }
803
804
805 /**
806  * The peer specified by the given neighbour has timed-out or a plugin
807  * has disconnected.  We may either need to do nothing (other plugins
808  * still up), or trigger a full disconnect and clean up.  This
809  * function updates our state and do the necessary notifications.
810  * Also notifies our clients that the neighbour is now officially
811  * gone.
812  *
813  * @param n the neighbour list entry for the peer
814  * @param check should we just check if all plugins
815  *        disconnected or must we ask all plugins to
816  *        disconnect?
817  */
818 static void
819 disconnect_neighbour (struct NeighbourList *n,
820                       int check);
821
822
823 /**
824  * Check the ready list for the given neighbour and
825  * if a plugin is ready for transmission (and if we
826  * have a message), do so!
827  *
828  * @param neighbour target peer for which to check the plugins
829  */
830 static void 
831 try_transmission_to_peer (struct NeighbourList *neighbour);
832
833
834 /**
835  * Function called by the GNUNET_TRANSPORT_TransmitFunction
836  * upon "completion" of a send request.  This tells the API
837  * that it is now legal to send another message to the given
838  * peer.
839  *
840  * @param cls closure, identifies the entry on the
841  *            message queue that was transmitted and the
842  *            client responsible for queueing the message
843  * @param rl identifies plugin used for the transmission for
844  *           this neighbour; needs to be re-enabled for
845  *           future transmissions
846  * @param target the peer receiving the message
847  * @param result GNUNET_OK on success, if the transmission
848  *           failed, we should not tell the client to transmit
849  *           more messages
850  */
851 static void
852 transmit_send_continuation (void *cls,
853                             struct ReadyList *rl,
854                             const struct GNUNET_PeerIdentity *target,
855                             int result)
856 {
857   struct MessageQueue *mq = cls;
858   struct SendOkMessage send_ok_msg;
859   struct NeighbourList *n;
860
861   GNUNET_assert (mq != NULL);
862   n = mq->neighbour;
863   GNUNET_assert (n != NULL);
864   GNUNET_assert (0 ==
865                  memcmp (&n->id, target,
866                          sizeof (struct GNUNET_PeerIdentity)));
867   if (rl == NULL)
868     {
869       rl = n->plugins;
870       while ((rl != NULL) && (rl->plugin != mq->plugin))
871         rl = rl->next;
872       GNUNET_assert (rl != NULL);
873     }
874   if (result == GNUNET_OK)
875     {
876       rl->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
877     }
878   else
879     {
880       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881                   "Transmission to peer `%s' failed, marking connection as down.\n",
882                   GNUNET_i2s(target));
883       rl->connected = GNUNET_NO;
884     }
885   if (!mq->internal_msg)
886     rl->transmit_ready = GNUNET_YES;
887   if (mq->client != NULL)
888     {
889       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890                   "Notifying client %p about failed transission to peer `%4s'.\n",
891                   mq->client,
892                   GNUNET_i2s(target));
893       send_ok_msg.header.size = htons (sizeof (send_ok_msg));
894       send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
895       send_ok_msg.success = htonl (result);
896       send_ok_msg.peer = n->id;
897       transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
898     }
899   GNUNET_free (mq->message);
900   GNUNET_free (mq);
901   /* one plugin just became ready again, try transmitting
902      another message (if available) */
903   if (result == GNUNET_OK)
904     try_transmission_to_peer (n);
905   else    
906     disconnect_neighbour (n, GNUNET_YES); 
907 }
908
909
910 /**
911  * Check the ready list for the given neighbour and
912  * if a plugin is ready for transmission (and if we
913  * have a message), do so!
914  */
915 static void
916 try_transmission_to_peer (struct NeighbourList *neighbour)
917 {
918   struct ReadyList *pos;
919   struct GNUNET_TIME_Relative min_latency;
920   struct ReadyList *rl;
921   struct MessageQueue *mq;
922   struct GNUNET_TIME_Absolute now;
923
924   if (neighbour->messages == NULL)
925     return;                     /* nothing to do */
926   try_alternative_plugins (neighbour);
927   min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
928   rl = NULL;
929   mq = neighbour->messages;
930   now = GNUNET_TIME_absolute_get ();
931   pos = neighbour->plugins;
932   while (pos != NULL)
933     {
934       /* set plugins that are inactive for a long time back to disconnected */
935       if ((pos->timeout.value < now.value) && (pos->connected == GNUNET_YES))
936         {
937 #if DEBUG_TRANSPORT
938           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
939                       "Marking long-time inactive connection to `%4s' as down.\n",
940                       GNUNET_i2s (&neighbour->id));
941 #endif
942           pos->connected = GNUNET_NO;
943         }
944       if (((GNUNET_YES == pos->transmit_ready) ||
945            (mq->internal_msg)) &&
946           (pos->connect_attempts < MAX_CONNECT_RETRY) &&
947           ((rl == NULL) || (min_latency.value > pos->latency.value)))
948         {
949           rl = pos;
950           min_latency = pos->latency;
951         }
952       pos = pos->next;
953     }
954   if (rl == NULL)
955     {
956 #if DEBUG_TRANSPORT
957       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958                   "No plugin ready to transmit message\n");
959 #endif
960       return;                   /* nobody ready */
961     }
962   if (GNUNET_NO == rl->connected)
963     {
964       rl->connect_attempts++;
965       rl->connected = GNUNET_YES;
966 #if DEBUG_TRANSPORT
967   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968               "Establishing fresh connection with `%4s' via plugin `%s'\n",
969               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
970 #endif
971     }
972   neighbour->messages = mq->next;
973   mq->plugin = rl->plugin;
974   if (!mq->internal_msg)
975     rl->transmit_ready = GNUNET_NO;
976 #if DEBUG_TRANSPORT
977   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
978               "Giving message of type `%u' for `%4s' to plugin `%s'\n",
979               ntohs (mq->message->type),
980               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
981 #endif
982   rl->plugin->api->send (rl->plugin->api->cls,
983                          rl,
984                          &neighbour->id,
985                          mq->priority,
986                          mq->message,
987                          GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
988                          &transmit_send_continuation, mq);
989 }
990
991
992 /**
993  * Send the specified message to the specified peer.
994  *
995  * @param client source of the transmission request (can be NULL)
996  * @param priority how important is the message
997  * @param msg message to send
998  * @param is_internal is this an internal message
999  * @param neighbour handle to the neighbour for transmission
1000  */
1001 static void
1002 transmit_to_peer (struct TransportClient *client,
1003                   unsigned int priority,
1004                   const struct GNUNET_MessageHeader *msg,
1005                   int is_internal, struct NeighbourList *neighbour)
1006 {
1007   struct MessageQueue *mq;
1008   struct MessageQueue *mqe;
1009   struct GNUNET_MessageHeader *m;
1010
1011 #if DEBUG_TRANSPORT
1012   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1013               _("Sending message of type %u to peer `%4s'\n"),
1014               ntohs (msg->type), GNUNET_i2s (&neighbour->id));
1015 #endif
1016   if (client != NULL)
1017     {
1018       /* check for duplicate submission */
1019       mq = neighbour->messages;
1020       while (NULL != mq)
1021         {
1022           if (mq->client == client)
1023             {
1024               /* client transmitted to same peer twice
1025                  before getting SendOk! */
1026               GNUNET_break (0);
1027               return;
1028             }
1029           mq = mq->next;
1030         }
1031     }
1032   mq = GNUNET_malloc (sizeof (struct MessageQueue));
1033   mq->client = client;
1034   m = GNUNET_malloc (ntohs (msg->size));
1035   memcpy (m, msg, ntohs (msg->size));
1036   mq->message = m;
1037   mq->neighbour = neighbour;
1038   mq->internal_msg = is_internal;
1039   mq->priority = priority;
1040
1041   /* find tail */
1042   mqe = neighbour->messages;
1043   if (mqe != NULL)
1044     while (mqe->next != NULL)
1045       mqe = mqe->next;
1046   if (mqe == NULL)
1047     {
1048       /* new head */
1049       neighbour->messages = mq;
1050       try_transmission_to_peer (neighbour);
1051     }
1052   else
1053     {
1054       /* append */
1055       mqe->next = mq;
1056     }
1057 }
1058
1059
1060 /**
1061  * FIXME: document.
1062  */
1063 struct GeneratorContext
1064 {
1065   struct TransportPlugin *plug_pos;
1066   struct AddressList *addr_pos;
1067   struct GNUNET_TIME_Absolute expiration;
1068 };
1069
1070
1071 /**
1072  * FIXME: document.
1073  */
1074 static size_t
1075 address_generator (void *cls, size_t max, void *buf)
1076 {
1077   struct GeneratorContext *gc = cls;
1078   size_t ret;
1079
1080   while ((gc->addr_pos == NULL) && (gc->plug_pos != NULL))
1081     {
1082       gc->plug_pos = gc->plug_pos->next;
1083       gc->addr_pos = (gc->plug_pos != NULL) ? gc->plug_pos->addresses : NULL;
1084     }
1085   if (NULL == gc->plug_pos)
1086     return 0;
1087   ret = GNUNET_HELLO_add_address (gc->plug_pos->short_name,
1088                                   gc->expiration,
1089                                   gc->addr_pos->addr,
1090                                   gc->addr_pos->addrlen, buf, max);
1091   gc->addr_pos = gc->addr_pos->next;
1092   return ret;
1093 }
1094
1095
1096 /**
1097  * Construct our HELLO message from all of the addresses of
1098  * all of the transports.
1099  */
1100 static void
1101 refresh_hello ()
1102 {
1103   struct GNUNET_HELLO_Message *hello;
1104   struct TransportClient *cpos;
1105   struct NeighbourList *npos;
1106   struct GeneratorContext gc;
1107
1108 #if DEBUG_TRANSPORT
1109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1110               "Refreshing my `%s'\n",
1111               "HELLO");
1112 #endif
1113   gc.plug_pos = plugins;
1114   gc.addr_pos = plugins != NULL ? plugins->addresses : NULL;
1115   gc.expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1116   hello = GNUNET_HELLO_create (&my_public_key, &address_generator, &gc);
1117   cpos = clients;
1118   while (cpos != NULL)
1119     {
1120       transmit_to_client (cpos,
1121                           (const struct GNUNET_MessageHeader *) hello,
1122                           GNUNET_NO);
1123       cpos = cpos->next;
1124     }
1125
1126   GNUNET_free_non_null (our_hello);
1127   our_hello = hello;
1128   our_hello_version++;
1129   GNUNET_PEERINFO_add_peer (cfg, sched, &my_identity, our_hello);
1130   npos = neighbours;
1131   while (npos != NULL)
1132     {
1133 #if DEBUG_TRANSPORT
1134       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1135                   "Transmitting updated `%s' to neighbour `%4s'\n",
1136                   "HELLO",
1137                   GNUNET_i2s(&npos->id));
1138 #endif
1139       transmit_to_peer (NULL, 0,
1140                         (const struct GNUNET_MessageHeader *) our_hello,
1141                         GNUNET_YES, npos);
1142       npos = npos->next;
1143     }
1144 }
1145
1146
1147 /**
1148  * Task used to clean up expired addresses for a plugin.
1149  *
1150  * @param cls closure
1151  * @param tc context
1152  */
1153 static void
1154 expire_address_task (void *cls,
1155                      const struct GNUNET_SCHEDULER_TaskContext *tc);
1156
1157
1158 /**
1159  * Update the list of addresses for this plugin,
1160  * expiring those that are past their expiration date.
1161  *
1162  * @param plugin addresses of which plugin should be recomputed?
1163  * @param fresh set to GNUNET_YES if a new address was added
1164  *        and we need to regenerate the HELLO even if nobody
1165  *        expired
1166  */
1167 static void
1168 update_addresses (struct TransportPlugin *plugin, int fresh)
1169 {
1170   struct GNUNET_TIME_Relative min_remaining;
1171   struct GNUNET_TIME_Relative remaining;
1172   struct GNUNET_TIME_Absolute now;
1173   struct AddressList *pos;
1174   struct AddressList *prev;
1175   struct AddressList *next;
1176   int expired;
1177
1178   if (plugin->address_update_task != GNUNET_SCHEDULER_NO_TASK)
1179     GNUNET_SCHEDULER_cancel (plugin->env.sched, plugin->address_update_task);
1180   plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK;
1181   now = GNUNET_TIME_absolute_get ();
1182   min_remaining = GNUNET_TIME_UNIT_FOREVER_REL;
1183   expired = GNUNET_NO;
1184   prev = NULL;
1185   pos = plugin->addresses;
1186   while (pos != NULL)
1187     {
1188       next = pos->next;
1189       if (pos->expires.value < now.value)
1190         {
1191           expired = GNUNET_YES;
1192           if (prev == NULL)
1193             plugin->addresses = pos->next;
1194           else
1195             prev->next = pos->next;
1196           GNUNET_free (pos);
1197         }
1198       else
1199         {
1200           remaining = GNUNET_TIME_absolute_get_remaining (pos->expires);
1201           if (remaining.value < min_remaining.value)
1202             min_remaining = remaining;
1203           prev = pos;
1204         }
1205       pos = next;
1206     }
1207
1208   if (expired || fresh)
1209     refresh_hello ();
1210   if (min_remaining.value < GNUNET_TIME_UNIT_FOREVER_REL.value)
1211     plugin->address_update_task
1212       = GNUNET_SCHEDULER_add_delayed (plugin->env.sched,
1213                                       min_remaining,
1214                                       &expire_address_task, plugin);
1215
1216 }
1217
1218
1219 /**
1220  * Task used to clean up expired addresses for a plugin.
1221  *
1222  * @param cls closure
1223  * @param tc context
1224  */
1225 static void
1226 expire_address_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1227 {
1228   struct TransportPlugin *plugin = cls;
1229   plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK;
1230   update_addresses (plugin, GNUNET_NO);
1231 }
1232
1233
1234 /**
1235  * Function that must be called by each plugin to notify the
1236  * transport service about the addresses under which the transport
1237  * provided by the plugin can be reached.
1238  *
1239  * @param cls closure
1240  * @param name name of the transport that generated the address
1241  * @param addr one of the addresses of the host, NULL for the last address
1242  *        the specific address format depends on the transport
1243  * @param addrlen length of the address
1244  * @param expires when should this address automatically expire?
1245  */
1246 static void
1247 plugin_env_notify_address (void *cls,
1248                            const char *name,
1249                            const void *addr,
1250                            size_t addrlen,
1251                            struct GNUNET_TIME_Relative expires)
1252 {
1253   struct TransportPlugin *p = cls;
1254   struct AddressList *al;
1255   struct GNUNET_TIME_Absolute abex;
1256
1257   abex = GNUNET_TIME_relative_to_absolute (expires);
1258   GNUNET_assert (p == find_transport (name));
1259
1260   al = p->addresses;
1261   while (al != NULL)
1262     {
1263       if ((addrlen == al->addrlen) && (0 == memcmp (addr, &al[1], addrlen)))
1264         {
1265           if (al->expires.value < abex.value)
1266             al->expires = abex;
1267           return;
1268         }
1269       al = al->next;
1270     }
1271 #if DEBUG_TRANSPORT
1272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273               "Plugin `%s' informs us about a new address `%s'\n", name,
1274               GNUNET_a2s(addr, addrlen));
1275 #endif
1276   al = GNUNET_malloc (sizeof (struct AddressList) + addrlen);
1277   al->addr = &al[1];
1278   al->next = p->addresses;
1279   p->addresses = al;
1280   al->expires = abex;
1281   al->addrlen = addrlen;
1282   memcpy (&al[1], addr, addrlen);
1283   update_addresses (p, GNUNET_YES);
1284 }
1285
1286
1287 /**
1288  * FIXME: document.
1289  */
1290 struct LookupHelloContext
1291 {
1292   GNUNET_TRANSPORT_AddressCallback iterator;
1293
1294   void *iterator_cls;
1295 };
1296
1297
1298 /**
1299  * FIXME: document.
1300  */
1301 static int
1302 lookup_address_callback (void *cls,
1303                          const char *tname,
1304                          struct GNUNET_TIME_Absolute expiration,
1305                          const void *addr, size_t addrlen)
1306 {
1307   struct LookupHelloContext *lhc = cls;
1308   lhc->iterator (lhc->iterator_cls, tname, addr, addrlen);
1309   return GNUNET_OK;
1310 }
1311
1312
1313 /**
1314  * FIXME: document.
1315  */
1316 static void
1317 lookup_hello_callback (void *cls,
1318                        const struct GNUNET_PeerIdentity *peer,
1319                        const struct GNUNET_HELLO_Message *h, uint32_t trust)
1320 {
1321   struct LookupHelloContext *lhc = cls;
1322
1323   if (peer == NULL)
1324     {
1325       lhc->iterator (lhc->iterator_cls, NULL, NULL, 0);
1326       GNUNET_free (lhc);
1327       return;
1328     }
1329   if (h == NULL)
1330     return;
1331   GNUNET_HELLO_iterate_addresses (h,
1332                                   GNUNET_NO, &lookup_address_callback, lhc);
1333 }
1334
1335
1336 /**
1337  * Function that allows a transport to query the known
1338  * network addresses for a given peer.
1339  *
1340  * @param cls closure
1341  * @param timeout after how long should we time out?
1342  * @param target which peer are we looking for?
1343  * @param iter function to call for each known address
1344  * @param iter_cls closure for iter
1345  */
1346 static void
1347 plugin_env_lookup_address (void *cls,
1348                            struct GNUNET_TIME_Relative timeout,
1349                            const struct GNUNET_PeerIdentity *target,
1350                            GNUNET_TRANSPORT_AddressCallback iter,
1351                            void *iter_cls)
1352 {
1353   struct LookupHelloContext *lhc;
1354
1355   lhc = GNUNET_malloc (sizeof (struct LookupHelloContext));
1356   lhc->iterator = iter;
1357   lhc->iterator_cls = iter_cls;
1358   GNUNET_PEERINFO_for_all (cfg,
1359                            sched,
1360                            target, 0, timeout, &lookup_hello_callback, &lhc);
1361 }
1362
1363
1364 /**
1365  * Notify all of our clients about a peer connecting.
1366  */
1367 static void
1368 notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
1369                         struct GNUNET_TIME_Relative latency)
1370 {
1371   struct ConnectInfoMessage cim;
1372   struct TransportClient *cpos;
1373
1374 #if DEBUG_TRANSPORT
1375   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1376               "Informing clients about peer `%4s' connecting to us\n",
1377               GNUNET_i2s (peer));
1378 #endif
1379   cim.header.size = htons (sizeof (struct ConnectInfoMessage));
1380   cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1381   cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60*1000));
1382   cim.latency = GNUNET_TIME_relative_hton (latency);
1383   memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
1384   cpos = clients;
1385   while (cpos != NULL)
1386     {
1387       transmit_to_client (cpos, &cim.header, GNUNET_NO);
1388       cpos = cpos->next;
1389     }
1390 }
1391
1392
1393 /**
1394  * Notify all of our clients about a peer disconnecting.
1395  */
1396 static void
1397 notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
1398 {
1399   struct DisconnectInfoMessage dim;
1400   struct TransportClient *cpos;
1401
1402 #if DEBUG_TRANSPORT
1403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1404               "Informing clients about peer `%4s' disconnecting\n",
1405               GNUNET_i2s (peer));
1406 #endif
1407   dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
1408   dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1409   dim.reserved = htonl (0);
1410   memcpy (&dim.peer, peer, sizeof (struct GNUNET_PeerIdentity));
1411   cpos = clients;
1412   while (cpos != NULL)
1413     {
1414       transmit_to_client (cpos, &dim.header, GNUNET_NO);
1415       cpos = cpos->next;
1416     }
1417 }
1418
1419
1420 /**
1421  * Copy any validated addresses to buf.
1422  *
1423  * @return 0 once all addresses have been
1424  *         returned
1425  */
1426 static size_t
1427 list_validated_addresses (void *cls, size_t max, void *buf)
1428 {
1429   struct ValidationAddress **va = cls;
1430   size_t ret;
1431
1432   while ((NULL != *va) && ((*va)->ok != GNUNET_YES))
1433     *va = (*va)->next;
1434   if (NULL == *va)
1435     return 0;
1436   ret = GNUNET_HELLO_add_address ((*va)->transport_name,
1437                                   (*va)->expiration,
1438                                   &(*va)[1], (*va)->addr_len, buf, max);
1439   *va = (*va)->next;
1440   return ret;
1441 }
1442
1443
1444 /**
1445  * HELLO validation cleanup task.
1446  */
1447 static void
1448 cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1449 {
1450   struct ValidationAddress *va;
1451   struct ValidationList *pos;
1452   struct ValidationList *prev;
1453   struct GNUNET_TIME_Absolute now;
1454   struct GNUNET_TIME_Absolute first;
1455   struct GNUNET_HELLO_Message *hello;
1456   struct GNUNET_PeerIdentity pid;
1457   struct NeighbourList *n;
1458
1459 #if DEBUG_TRANSPORT
1460   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1461               "HELLO validation cleanup background task running...\n");
1462 #endif
1463   now = GNUNET_TIME_absolute_get ();
1464   prev = NULL;
1465   pos = pending_validations;
1466   while (pos != NULL)
1467     {
1468       if (pos->timeout.value < now.value)
1469         {
1470           if (prev == NULL)
1471             pending_validations = pos->next;
1472           else
1473             prev->next = pos->next;
1474           va = pos->addresses;
1475           hello = GNUNET_HELLO_create (&pos->publicKey,
1476                                        &list_validated_addresses, &va);
1477           GNUNET_CRYPTO_hash (&pos->publicKey,
1478                               sizeof (struct
1479                                       GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1480                               &pid.hashPubKey);
1481 #if DEBUG_TRANSPORT
1482           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1483                       "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n",
1484                       "HELLO", GNUNET_i2s (&pid));
1485 #endif
1486           GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello);
1487           n = find_neighbour (&pid);
1488           if (NULL != n)
1489             try_transmission_to_peer (n);           
1490           GNUNET_free (hello);
1491           while (NULL != (va = pos->addresses))
1492             {
1493               pos->addresses = va->next;
1494               GNUNET_free (va->transport_name);
1495               GNUNET_free (va);
1496             }
1497           GNUNET_free (pos);
1498           if (prev == NULL)
1499             pos = pending_validations;
1500           else
1501             pos = prev->next;
1502           continue;
1503         }
1504       prev = pos;
1505       pos = pos->next;
1506     }
1507
1508   /* finally, reschedule cleanup if needed; list is
1509      ordered by timeout, so we need the last element... */
1510   if (NULL != pending_validations)
1511     {
1512       first = pending_validations->timeout;
1513       pos = pending_validations;
1514       while (pos != NULL) 
1515         {
1516           first = GNUNET_TIME_absolute_min (first, pos->timeout);
1517           pos = pos->next;
1518         }
1519       GNUNET_SCHEDULER_add_delayed (sched,
1520                                     GNUNET_TIME_absolute_get_remaining (first),
1521                                     &cleanup_validation, NULL);
1522     }
1523 }
1524
1525
1526
1527
1528 /**
1529  * Function that will be called if we receive a validation
1530  * of an address challenge that we transmitted to another
1531  * peer.  Note that the validation should only be considered
1532  * acceptable if the challenge matches AND if the sender
1533  * address is at least a plausible address for this peer
1534  * (otherwise we may be seeing a MiM attack).
1535  *
1536  * @param cls closure
1537  * @param name name of the transport that generated the address
1538  * @param peer who responded to our challenge
1539  * @param challenge the challenge number we presumably used
1540  * @param sender_addr string describing our sender address (as observed
1541  *         by the other peer in human-readable format)
1542  */
1543 static void
1544 plugin_env_notify_validation (void *cls,
1545                               const char *name,
1546                               const struct GNUNET_PeerIdentity *peer,
1547                               uint32_t challenge,
1548                               const char *sender_addr)
1549 {
1550   unsigned int not_done;
1551   int matched;
1552   struct ValidationList *pos;
1553   struct ValidationAddress *va;
1554   struct GNUNET_PeerIdentity id;
1555
1556   pos = pending_validations;
1557   while (pos != NULL)
1558     {
1559       GNUNET_CRYPTO_hash (&pos->publicKey,
1560                           sizeof (struct
1561                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1562                           &id.hashPubKey);
1563       if (0 ==
1564           memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity)))
1565         break;
1566       pos = pos->next;
1567     }
1568   if (pos == NULL)
1569     {
1570       /* TODO: call statistics (unmatched PONG) */
1571       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1572                   _
1573                   ("Received validation response but have no record of any validation request for `%4s'. Ignoring.\n"),
1574                   GNUNET_i2s(peer));
1575       return;
1576     }
1577   not_done = 0;
1578   matched = GNUNET_NO;
1579   va = pos->addresses;
1580   while (va != NULL)
1581     {
1582       if (va->challenge == challenge)
1583         {
1584 #if DEBUG_TRANSPORT
1585           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1586                       "Confirmed validity of address, peer `%4s' has address `%s'.\n",
1587                       GNUNET_i2s (peer),
1588                       GNUNET_a2s ((const struct sockaddr*) &va[1], 
1589                                   va->addr_len));
1590 #endif
1591           GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
1592                       _("Another peer saw us using the address `%s' via `%s'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"),
1593                       sender_addr, 
1594                       name);
1595           va->ok = GNUNET_YES;
1596           va->expiration =
1597             GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1598           matched = GNUNET_YES;
1599         }        
1600       if (va->ok != GNUNET_YES)
1601         not_done++;
1602       va = va->next;
1603     }
1604   if (GNUNET_NO == matched)
1605     {
1606       /* TODO: call statistics (unmatched PONG) */
1607       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1608                   _
1609                   ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"),
1610                   "PONG", "PING");
1611     }
1612   if (0 == not_done)
1613     {
1614 #if DEBUG_TRANSPORT
1615       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1616                   "All addresses validated, will now construct `%s' for `%4s'.\n",
1617                   "HELLO",
1618                   GNUNET_i2s (peer));
1619 #endif
1620       pos->timeout.value = 0;
1621       GNUNET_SCHEDULER_add_with_priority (sched,
1622                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1623                                           &cleanup_validation, NULL);
1624     }
1625   else
1626     {
1627 #if DEBUG_TRANSPORT
1628       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1629                   "Still waiting for %u additional `%s' messages before constructing `%s' for `%4s'.\n",
1630                   not_done,
1631                   "PONG",
1632                   "HELLO",
1633                   GNUNET_i2s (peer));
1634 #endif
1635     }
1636 }
1637
1638
1639 struct CheckHelloValidatedContext
1640 {
1641   /**
1642    * Plugin for which we are validating.
1643    */
1644   struct TransportPlugin *plugin;
1645
1646   /**
1647    * Hello that we are validating.
1648    */
1649   struct GNUNET_HELLO_Message *hello;
1650
1651   /**
1652    * Validation list being build.
1653    */
1654   struct ValidationList *e;
1655
1656 };
1657
1658
1659 /**
1660  * Append the given address to the list of entries
1661  * that need to be validated.
1662  */
1663 static int
1664 run_validation (void *cls,
1665                 const char *tname,
1666                 struct GNUNET_TIME_Absolute expiration,
1667                 const void *addr, size_t addrlen)
1668 {
1669   struct ValidationList *e = cls;
1670   struct TransportPlugin *tp;
1671   struct ValidationAddress *va;
1672   struct GNUNET_PeerIdentity id;
1673
1674   tp = find_transport (tname);
1675   if (tp == NULL)
1676     {
1677       GNUNET_log (GNUNET_ERROR_TYPE_INFO |
1678                   GNUNET_ERROR_TYPE_BULK,
1679                   _
1680                   ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
1681                   tname);
1682       return GNUNET_OK;
1683     }
1684   GNUNET_CRYPTO_hash (&e->publicKey,
1685                       sizeof (struct
1686                               GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1687                       &id.hashPubKey);
1688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1689               "Scheduling validation of address `%s' via `%s' for `%4s'\n",
1690               GNUNET_a2s(addr, addrlen),
1691               tname,
1692               GNUNET_i2s(&id));
1693
1694   va = GNUNET_malloc (sizeof (struct ValidationAddress) + addrlen);
1695   va->next = e->addresses;
1696   e->addresses = va;
1697   va->transport_name = GNUNET_strdup (tname);
1698   va->addr_len = addrlen;
1699   va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1700                                             (unsigned int) -1);
1701   memcpy (&va[1], addr, addrlen);
1702   return GNUNET_OK;
1703 }
1704
1705
1706 /**
1707  * Check if addresses in validated hello "h" overlap with
1708  * those in "chvc->hello" and update "chvc->hello" accordingly,
1709  * removing those addresses that have already been validated.
1710  */
1711 static void
1712 check_hello_validated (void *cls,
1713                        const struct GNUNET_PeerIdentity *peer,
1714                        const struct GNUNET_HELLO_Message *h, 
1715                        uint32_t trust)
1716 {
1717   struct CheckHelloValidatedContext *chvc = cls;
1718   struct ValidationAddress *va;
1719   struct TransportPlugin *tp;
1720   int first_call;
1721   struct GNUNET_PeerIdentity apeer;
1722
1723   first_call = GNUNET_NO;
1724   if (chvc->e == NULL)
1725     {
1726       first_call = GNUNET_YES;
1727       chvc->e = GNUNET_malloc (sizeof (struct ValidationList));
1728       GNUNET_assert (GNUNET_OK ==
1729                      GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello,
1730                                            &chvc->e->publicKey));
1731       chvc->e->timeout =
1732         GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT);
1733       chvc->e->next = pending_validations;
1734       pending_validations = chvc->e;
1735     }
1736   if (h != NULL)
1737     {
1738       GNUNET_HELLO_iterate_new_addresses (chvc->hello,
1739                                           h,
1740                                           GNUNET_TIME_absolute_get (),
1741                                           &run_validation, chvc->e);
1742     }
1743   else if (GNUNET_YES == first_call)
1744     {
1745       /* no existing HELLO, all addresses are new */
1746       GNUNET_HELLO_iterate_addresses (chvc->hello,
1747                                       GNUNET_NO, &run_validation, chvc->e);
1748     }
1749   if (h != NULL)
1750     return;                     /* wait for next call */
1751   /* finally, transmit validation attempts */
1752   GNUNET_assert (GNUNET_OK ==
1753                  GNUNET_HELLO_get_id (chvc->hello,
1754                                       &apeer));
1755 #if DEBUG_TRANSPORT
1756   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1757               "Ready to validate addresses from `%s' message for peer `%4s'\n",
1758               "HELLO", GNUNET_i2s (&apeer));
1759 #endif
1760   va = chvc->e->addresses;
1761   while (va != NULL)
1762     {
1763 #if DEBUG_TRANSPORT
1764       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765                   "Establishing `%s' connection to validate `%s' address `%s' of `%4s'\n",
1766                   va->transport_name,
1767                   "HELLO",
1768                   GNUNET_a2s ((const struct sockaddr*) &va[1], 
1769                               va->addr_len),
1770                   GNUNET_i2s (&apeer));
1771 #endif
1772       tp = find_transport (va->transport_name);
1773       GNUNET_assert (tp != NULL);
1774       if (GNUNET_OK !=
1775           tp->api->validate (tp->api->cls,
1776                              &apeer,
1777                              va->challenge,
1778                              HELLO_VERIFICATION_TIMEOUT,
1779                              &va[1],
1780                              va->addr_len))
1781         va->ok = GNUNET_SYSERR;
1782       va = va->next;
1783     }
1784   GNUNET_SCHEDULER_add_delayed (sched,
1785                                 GNUNET_TIME_absolute_get_remaining (chvc->e->timeout), 
1786                                 &cleanup_validation, NULL);
1787   GNUNET_free (chvc);
1788 }
1789
1790
1791 /**
1792  * Process HELLO-message.
1793  *
1794  * @param plugin transport involved, may be NULL
1795  * @param message the actual message
1796  * @return GNUNET_OK if the HELLO was well-formed, GNUNET_SYSERR otherwise
1797  */
1798 static int
1799 process_hello (struct TransportPlugin *plugin,
1800                const struct GNUNET_MessageHeader *message)
1801 {
1802   struct ValidationList *e;
1803   uint16_t hsize;
1804   struct GNUNET_PeerIdentity target;
1805   const struct GNUNET_HELLO_Message *hello;
1806   struct CheckHelloValidatedContext *chvc;
1807   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
1808
1809   hsize = ntohs (message->size);
1810   if ((ntohs (message->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1811       (hsize < sizeof (struct GNUNET_MessageHeader)))
1812     {
1813       GNUNET_break (0);
1814       return GNUNET_SYSERR;
1815     }
1816   /* first, check if load is too high */
1817   if (GNUNET_OS_load_cpu_get (cfg) > 100)
1818     {
1819       /* TODO: call to stats? */
1820       return GNUNET_OK;
1821     }
1822   hello = (const struct GNUNET_HELLO_Message *) message;
1823   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, &publicKey))
1824     {
1825       GNUNET_break_op (0);
1826       return GNUNET_SYSERR;
1827     }
1828   GNUNET_CRYPTO_hash (&publicKey,
1829                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1830                       &target.hashPubKey);
1831 #if DEBUG_TRANSPORT
1832   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1833               "Processing `%s' message for `%4s'\n",
1834               "HELLO", GNUNET_i2s (&target));
1835 #endif
1836   /* check if a HELLO for this peer is already on the validation list */
1837   e = pending_validations;
1838   while (e != NULL)
1839     {
1840       if (0 == memcmp (&e->publicKey,
1841                        &publicKey,
1842                        sizeof (struct
1843                                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)))
1844         {
1845           /* TODO: call to stats? */
1846 #if DEBUG_TRANSPORT
1847           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1848                       "`%s' message for peer `%4s' is already pending; ignoring new message\n",
1849                       "HELLO", GNUNET_i2s (&target));
1850 #endif    
1851           return GNUNET_OK;
1852         }
1853       e = e->next;
1854     }
1855   chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize);
1856   chvc->plugin = plugin;
1857   chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1];
1858   memcpy (chvc->hello, hello, hsize);
1859   /* finally, check if HELLO was previously validated
1860      (continuation will then schedule actual validation) */
1861   GNUNET_PEERINFO_for_all (cfg,
1862                            sched,
1863                            &target,
1864                            0,
1865                            HELLO_VERIFICATION_TIMEOUT,
1866                            &check_hello_validated, chvc);
1867   return GNUNET_OK;
1868 }
1869
1870
1871 /**
1872  * The peer specified by the given neighbour has timed-out or a plugin
1873  * has disconnected.  We may either need to do nothing (other plugins
1874  * still up), or trigger a full disconnect and clean up.  This
1875  * function updates our state and do the necessary notifications.
1876  * Also notifies our clients that the neighbour is now officially
1877  * gone.
1878  *
1879  * @param n the neighbour list entry for the peer
1880  * @param check should we just check if all plugins
1881  *        disconnected or must we ask all plugins to
1882  *        disconnect?
1883  */
1884 static void
1885 disconnect_neighbour (struct NeighbourList *n,
1886                       int check)
1887 {
1888   struct ReadyList *rpos;
1889   struct NeighbourList *npos;
1890   struct NeighbourList *nprev;
1891   struct MessageQueue *mq;
1892   
1893   if (GNUNET_YES == check)
1894     {
1895       rpos = n->plugins;
1896       while (NULL != rpos)
1897         {
1898           if (GNUNET_YES == rpos->connected)
1899             return; /* still connected */
1900           rpos = rpos->next;
1901         }
1902     }
1903
1904 #if DEBUG_TRANSPORT
1905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1906               "Disconnecting from `%4s'\n",
1907               GNUNET_i2s(&n->id));
1908 #endif
1909   /* remove n from neighbours list */
1910   nprev = NULL;
1911   npos = neighbours;
1912   while ((npos != NULL) && (npos != n))
1913     {
1914       nprev = npos;
1915       npos = npos->next;
1916     }
1917   GNUNET_assert (npos != NULL);
1918   if (nprev == NULL)
1919     neighbours = n->next;
1920   else
1921     nprev->next = n->next;
1922
1923   /* notify all clients about disconnect */
1924   notify_clients_disconnect (&n->id);
1925
1926   /* clean up all plugins, cancel connections and pending transmissions */
1927   while (NULL != (rpos = n->plugins))
1928     {
1929       n->plugins = rpos->next;
1930       GNUNET_assert (rpos->neighbour == n);
1931       if (GNUNET_YES == rpos->connected)
1932         rpos->plugin->api->cancel (rpos->plugin->api->cls,
1933                                    rpos,
1934                                    &n->id);
1935       GNUNET_free (rpos);
1936     }
1937
1938   /* free all messages on the queue */
1939   while (NULL != (mq = n->messages))
1940     {
1941       n->messages = mq->next;
1942       GNUNET_assert (mq->neighbour == n);
1943       GNUNET_free (mq);
1944     }
1945   if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1946     GNUNET_SCHEDULER_cancel (sched,
1947                              n->timeout_task);
1948   /* finally, free n itself */
1949   GNUNET_free (n);
1950 }
1951
1952
1953 /**
1954  * Add an entry for each of our transport plugins
1955  * (that are able to send) to the list of plugins
1956  * for this neighbour.
1957  *
1958  * @param neighbour to initialize
1959  */
1960 static void
1961 add_plugins (struct NeighbourList *neighbour)
1962 {
1963   struct TransportPlugin *tp;
1964   struct ReadyList *rl;
1965
1966   neighbour->retry_plugins_time
1967     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
1968   tp = plugins;
1969   while (tp != NULL)
1970     {
1971       if (tp->api->send != NULL)
1972         {
1973           rl = GNUNET_malloc (sizeof (struct ReadyList));
1974           rl->next = neighbour->plugins;
1975           neighbour->plugins = rl;
1976           rl->plugin = tp;
1977           rl->neighbour = neighbour;
1978           rl->transmit_ready = GNUNET_YES;
1979         }
1980       tp = tp->next;
1981     }
1982 }
1983
1984
1985 static void
1986 neighbour_timeout_task (void *cls,
1987                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1988 {
1989   struct NeighbourList *n = cls;
1990
1991 #if DEBUG_TRANSPORT
1992   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1993               "Neighbour `%4s' has timed out!\n",
1994               GNUNET_i2s(&n->id));
1995 #endif
1996   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1997   disconnect_neighbour (n, GNUNET_NO);
1998 }
1999
2000
2001 /**
2002  * Create a fresh entry in our neighbour list for the given peer.
2003  * Will try to transmit our current HELLO to the new neighbour.  Also
2004  * notifies our clients about the new "connection".
2005  *
2006  * @param peer the peer for which we create the entry
2007  * @return the new neighbour list entry
2008  */
2009 static struct NeighbourList *
2010 setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
2011 {
2012   struct NeighbourList *n;
2013
2014 #if DEBUG_TRANSPORT
2015   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2016               "Setting up new neighbour `%4s', sending our HELLO to introduce ourselves\n",
2017               GNUNET_i2s (peer));
2018 #endif
2019   GNUNET_assert (our_hello != NULL);
2020   n = GNUNET_malloc (sizeof (struct NeighbourList));
2021   n->next = neighbours;
2022   neighbours = n;
2023   n->id = *peer;
2024   n->last_quota_update = GNUNET_TIME_absolute_get ();
2025   n->peer_timeout =
2026     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2027   n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
2028   add_plugins (n);
2029   n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
2030                                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2031                                                   &neighbour_timeout_task, n);
2032   transmit_to_peer (NULL, 0,
2033                     (const struct GNUNET_MessageHeader *) our_hello,
2034                     GNUNET_YES, n);
2035   notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
2036   return n;
2037 }
2038
2039
2040 /**
2041  * Function called by the plugin for each received message.
2042  * Update data volumes, possibly notify plugins about
2043  * reducing the rate at which they read from the socket
2044  * and generally forward to our receive callback.
2045  *
2046  * @param cls the "struct TransportPlugin *" we gave to the plugin
2047  * @param service_context value passed to the transport-service
2048  *        to identify the neighbour; will be NULL on the first
2049  *        call for a given peer
2050  * @param latency estimated latency for communicating with the
2051  *             given peer
2052  * @param peer (claimed) identity of the other peer
2053  * @param message the message, NULL if peer was disconnected
2054  * @return the new service_context that the plugin should use
2055  *         for future receive calls for messages from this
2056  *         particular peer
2057  */
2058 static struct ReadyList *
2059 plugin_env_receive (void *cls,
2060                     struct ReadyList *service_context,
2061                     struct GNUNET_TIME_Relative latency,
2062                     const struct GNUNET_PeerIdentity *peer,
2063                     const struct GNUNET_MessageHeader *message)
2064 {
2065   const struct GNUNET_MessageHeader ack = {
2066     htons (sizeof (struct GNUNET_MessageHeader)),
2067     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK)
2068   };
2069   struct TransportPlugin *plugin = cls;
2070   struct TransportClient *cpos;
2071   struct InboundMessage *im;
2072   uint16_t msize;
2073   struct NeighbourList *n;
2074
2075   if (service_context != NULL)
2076     {
2077       n = service_context->neighbour;
2078       GNUNET_assert (n != NULL);
2079     }
2080   else
2081     {
2082       n = find_neighbour (peer);
2083       if (n == NULL)
2084         {
2085           if (message == NULL)
2086             return NULL;        /* disconnect of peer already marked down */
2087           n = setup_new_neighbour (peer);
2088         }
2089       service_context = n->plugins;
2090       while ((service_context != NULL) && (plugin != service_context->plugin))
2091         service_context = service_context->next;
2092       GNUNET_assert ((plugin->api->send == NULL) ||
2093                      (service_context != NULL));
2094     }
2095   if (message == NULL)
2096     {
2097 #if DEBUG_TRANSPORT
2098       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2099                   "Receive failed from `%4s', triggering disconnect\n",
2100                   GNUNET_i2s(&n->id));
2101 #endif
2102       /* TODO: call stats */
2103       if (service_context != NULL) 
2104         service_context->connected = GNUNET_NO;        
2105       disconnect_neighbour (n, GNUNET_YES);
2106       return NULL;
2107     }
2108 #if DEBUG_TRANSPORT
2109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2110               "Processing message of type `%u' received by plugin...\n",
2111               ntohs (message->type));
2112 #endif
2113   if (service_context != NULL)
2114     {
2115       if (service_context->connected == GNUNET_NO)
2116         {
2117           service_context->connected = GNUNET_YES;
2118           service_context->transmit_ready = GNUNET_YES;
2119           service_context->connect_attempts++;
2120         }
2121       service_context->timeout
2122         = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2123       service_context->latency = latency;
2124     }
2125   /* update traffic received amount ... */
2126   msize = ntohs (message->size);
2127   n->last_received += msize;
2128   GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
2129   n->peer_timeout =
2130     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2131   n->timeout_task =
2132     GNUNET_SCHEDULER_add_delayed (sched, 
2133                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2134                                   &neighbour_timeout_task, n);
2135   update_quota (n);
2136   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2137     {
2138       /* dropping message due to frequent inbound volume violations! */
2139       GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
2140                   GNUNET_ERROR_TYPE_BULK,
2141                   _
2142                   ("Dropping incoming message due to repeated bandwidth quota violations.\n"));
2143       /* TODO: call stats */
2144       GNUNET_assert ( (service_context == NULL) ||
2145                       (NULL != service_context->neighbour) );
2146       return service_context;
2147     }
2148   switch (ntohs (message->type))
2149     {
2150     case GNUNET_MESSAGE_TYPE_HELLO:
2151 #if DEBUG_TRANSPORT
2152       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2153                   "Receiving `%s' message from `%4s'.\n", "HELLO",
2154                   GNUNET_i2s(peer));
2155 #endif
2156       process_hello (plugin, message);
2157 #if DEBUG_TRANSPORT
2158       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2159                   "Sending `%s' message to connecting peer `%4s'.\n", "ACK",
2160                   GNUNET_i2s(peer));
2161 #endif
2162       transmit_to_peer (NULL, 0, &ack, GNUNET_YES, n);
2163       break;
2164     case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
2165       n->saw_ack = GNUNET_YES;
2166       /* intentional fall-through! */
2167     default:
2168 #if DEBUG_TRANSPORT
2169       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2170                   "Received message of type %u from `%4s', sending to all clients.\n",
2171                   ntohs (message->type),
2172                   GNUNET_i2s(peer));
2173 #endif
2174       /* transmit message to all clients */
2175       im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
2176       im->header.size = htons (sizeof (struct InboundMessage) + msize);
2177       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2178       im->latency = GNUNET_TIME_relative_hton (latency);
2179       im->peer = *peer;
2180       memcpy (&im[1], message, msize);
2181
2182       cpos = clients;
2183       while (cpos != NULL)
2184         {
2185           transmit_to_client (cpos, &im->header, GNUNET_YES);
2186           cpos = cpos->next;
2187         }
2188       GNUNET_free (im);
2189     }
2190   GNUNET_assert ( (service_context == NULL) ||
2191                   (NULL != service_context->neighbour) );
2192   return service_context;
2193 }
2194
2195
2196 /**
2197  * Handle START-message.  This is the first message sent to us
2198  * by any client which causes us to add it to our list.
2199  *
2200  * @param cls closure (always NULL)
2201  * @param client identification of the client
2202  * @param message the actual message
2203  */
2204 static void
2205 handle_start (void *cls,
2206               struct GNUNET_SERVER_Client *client,
2207               const struct GNUNET_MessageHeader *message)
2208 {
2209   struct TransportClient *c;
2210   struct ConnectInfoMessage cim;
2211   struct NeighbourList *n;
2212   struct InboundMessage *im;
2213   struct GNUNET_MessageHeader *ack;
2214
2215 #if DEBUG_TRANSPORT
2216   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2217               "Received `%s' request from client\n", "START");
2218 #endif
2219   c = clients;
2220   while (c != NULL)
2221     {
2222       if (c->client == client)
2223         {
2224           /* client already on our list! */
2225           GNUNET_break (0);
2226           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2227           return;
2228         }
2229       c = c->next;
2230     }
2231   c = GNUNET_malloc (sizeof (struct TransportClient));
2232   c->next = clients;
2233   clients = c;
2234   c->client = client;
2235   if (our_hello != NULL)
2236     {
2237 #if DEBUG_TRANSPORT
2238       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2239                   "Sending our own `%s' to new client\n",
2240                   "HELLO");
2241 #endif
2242       transmit_to_client (c,
2243                           (const struct GNUNET_MessageHeader *) our_hello,
2244                           GNUNET_NO);
2245       /* tell new client about all existing connections */
2246       cim.header.size = htons (sizeof (struct ConnectInfoMessage));
2247       cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2248       cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
2249       cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2250       im = GNUNET_malloc (sizeof (struct InboundMessage) +
2251                           sizeof (struct GNUNET_MessageHeader));
2252       im->header.size = htons (sizeof (struct InboundMessage) +
2253                                sizeof (struct GNUNET_MessageHeader));
2254       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2255       im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2256       ack = (struct GNUNET_MessageHeader *) &im[1];
2257       ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2258       ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
2259       for (n = neighbours; n != NULL; n = n->next)
2260         {
2261           cim.id = n->id;
2262           transmit_to_client (c, &cim.header, GNUNET_NO);
2263           if (n->saw_ack)
2264             {
2265               im->peer = n->id;
2266               transmit_to_client (c, &im->header, GNUNET_NO);
2267             }
2268         }
2269       GNUNET_free (im);
2270     }
2271   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2272 }
2273
2274
2275 /**
2276  * Handle HELLO-message.
2277  *
2278  * @param cls closure (always NULL)
2279  * @param client identification of the client
2280  * @param message the actual message
2281  */
2282 static void
2283 handle_hello (void *cls,
2284               struct GNUNET_SERVER_Client *client,
2285               const struct GNUNET_MessageHeader *message)
2286 {
2287   int ret;
2288
2289 #if DEBUG_TRANSPORT
2290   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2291               "Received `%s' request from client\n", "HELLO");
2292 #endif
2293   ret = process_hello (NULL, message);
2294   GNUNET_SERVER_receive_done (client, ret);
2295 }
2296
2297
2298 /**
2299  * Handle SEND-message.
2300  *
2301  * @param cls closure (always NULL)
2302  * @param client identification of the client
2303  * @param message the actual message
2304  */
2305 static void
2306 handle_send (void *cls,
2307              struct GNUNET_SERVER_Client *client,
2308              const struct GNUNET_MessageHeader *message)
2309 {
2310   struct TransportClient *tc;
2311   struct NeighbourList *n;
2312   const struct OutboundMessage *obm;
2313   const struct GNUNET_MessageHeader *obmm;
2314   uint16_t size;
2315   uint16_t msize;
2316
2317   size = ntohs (message->size);
2318   if (size <
2319       sizeof (struct OutboundMessage) + sizeof (struct GNUNET_MessageHeader))
2320     {
2321       GNUNET_break (0);
2322       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2323       return;
2324     }
2325   obm = (const struct OutboundMessage *) message;
2326 #if DEBUG_TRANSPORT
2327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2328               "Received `%s' request from client with target `%4s'\n",
2329               "SEND", GNUNET_i2s (&obm->peer));
2330 #endif
2331   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2332   msize = ntohs (obmm->size);
2333   if (size != msize + sizeof (struct OutboundMessage))
2334     {
2335       GNUNET_break (0);
2336       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2337       return;
2338     }
2339   n = find_neighbour (&obm->peer);
2340   if (n == NULL)
2341     n = setup_new_neighbour (&obm->peer);
2342   tc = clients;
2343   while ((tc != NULL) && (tc->client != client))
2344     tc = tc->next;
2345
2346 #if DEBUG_TRANSPORT
2347   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2348               "Client asked to transmit %u-byte message of type %u to `%4s'\n",
2349               ntohs (obmm->size),
2350               ntohs (obmm->type), GNUNET_i2s (&obm->peer));
2351 #endif
2352   transmit_to_peer (tc, ntohl(obm->priority), obmm, GNUNET_NO, n);
2353   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2354 }
2355
2356
2357 /**
2358  * Handle SET_QUOTA-message.
2359  *
2360  * @param cls closure (always NULL)
2361  * @param client identification of the client
2362  * @param message the actual message
2363  */
2364 static void
2365 handle_set_quota (void *cls,
2366                   struct GNUNET_SERVER_Client *client,
2367                   const struct GNUNET_MessageHeader *message)
2368 {
2369   const struct QuotaSetMessage *qsm =
2370     (const struct QuotaSetMessage *) message;
2371   struct NeighbourList *n;
2372   struct TransportPlugin *p;
2373   struct ReadyList *rl;
2374
2375 #if DEBUG_TRANSPORT
2376   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2377               "Received `%s' request from client for peer `%4s'\n",
2378               "SET_QUOTA", GNUNET_i2s (&qsm->peer));
2379 #endif
2380   n = find_neighbour (&qsm->peer);
2381   if (n == NULL)
2382     {
2383       GNUNET_SERVER_receive_done (client, GNUNET_OK);
2384       return;
2385     }
2386   update_quota (n);
2387   if (n->quota_in < ntohl (qsm->quota_in))
2388     n->last_quota_update = GNUNET_TIME_absolute_get ();
2389   n->quota_in = ntohl (qsm->quota_in);
2390   rl = n->plugins;
2391   while (rl != NULL)
2392     {
2393       p = rl->plugin;
2394       p->api->set_receive_quota (p->api->cls,
2395                                  &qsm->peer, ntohl (qsm->quota_in));
2396       rl = rl->next;
2397     }
2398   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2399 }
2400
2401
2402 /**
2403  * Handle TRY_CONNECT-message.
2404  *
2405  * @param cls closure (always NULL)
2406  * @param client identification of the client
2407  * @param message the actual message
2408  */
2409 static void
2410 handle_try_connect (void *cls,
2411                     struct GNUNET_SERVER_Client *client,
2412                     const struct GNUNET_MessageHeader *message)
2413 {
2414   const struct TryConnectMessage *tcm;
2415
2416   tcm = (const struct TryConnectMessage *) message;
2417 #if DEBUG_TRANSPORT
2418   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2419               "Received `%s' request from client %p asking to connect to `%4s'\n",
2420               "TRY_CONNECT",
2421               client,
2422               GNUNET_i2s (&tcm->peer));
2423 #endif
2424   if (NULL == find_neighbour (&tcm->peer))
2425     setup_new_neighbour (&tcm->peer);
2426 #if DEBUG_TRANSPORT
2427   else
2428     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2429                 "Client asked to connect to `%4s', but connection already exists\n",
2430                 "TRY_CONNECT", 
2431                 GNUNET_i2s (&tcm->peer));
2432 #endif    
2433   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2434 }
2435
2436 static void
2437 transmit_address_to_client (void *cls, const char *address)
2438 {
2439         struct GNUNET_SERVER_TransmitContext *tc = cls;
2440         size_t slen;
2441
2442         if (NULL == address)
2443                 slen = 0;
2444         else
2445                 slen = strlen (address) + 1;
2446         GNUNET_SERVER_transmit_context_append (tc, address, slen,
2447                                 GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY);
2448         if (NULL == address)
2449                 GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
2450 }
2451
2452 /**
2453  * Handle AddressLookup-message.
2454  *
2455  * @param cls closure (always NULL)
2456  * @param client identification of the client
2457  * @param message the actual message
2458  */
2459 static void
2460 handle_address_lookup (void *cls,
2461                       struct GNUNET_SERVER_Client *client,
2462                       const struct GNUNET_MessageHeader *message)
2463 {
2464         const struct AddressLookupMessage *alum;
2465         struct TransportPlugin *lsPlugin;
2466         const char *nameTransport;
2467         const char *address;
2468         uint16_t size;
2469         struct GNUNET_SERVER_TransmitContext *tc;
2470
2471         size = ntohs (message->size);
2472         if (size < sizeof(struct AddressLookupMessage))
2473         {
2474                 GNUNET_break_op (0);
2475                 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2476                 return;
2477         }
2478         alum = (const struct AddressLookupMessage *) message;
2479         uint32_t addressLen = ntohl(alum->addrlen);
2480         if (size <= sizeof(struct AddressLookupMessage) + addressLen)
2481         {
2482                 GNUNET_break_op (0);
2483                 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2484                 return;
2485         }
2486         address = (const char *)&alum[1];
2487         nameTransport = (const char*)&address[addressLen];
2488         if (nameTransport [size - sizeof (struct AddressLookupMessage) - addressLen -1] != '\0')
2489         {
2490                 GNUNET_break_op (0);
2491                 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2492                 return;
2493         }
2494         struct GNUNET_TIME_Absolute timeout= GNUNET_TIME_absolute_ntoh(alum->timeout);
2495         struct GNUNET_TIME_Relative rtimeout = GNUNET_TIME_absolute_get_remaining(timeout);
2496         lsPlugin = find_transport(nameTransport);
2497         if (NULL == lsPlugin)
2498         {
2499                 tc = GNUNET_SERVER_transmit_context_create (client);
2500                 GNUNET_SERVER_transmit_context_append (tc, NULL, 0, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY);
2501                 GNUNET_SERVER_transmit_context_run (tc, rtimeout);
2502                 return;
2503         }
2504         tc = GNUNET_SERVER_transmit_context_create (client);
2505         lsPlugin->api->address_pretty_printer(cls, nameTransport,
2506                                 address, addressLen, GNUNET_YES, rtimeout, &transmit_address_to_client, tc);
2507 }
2508
2509 /**
2510  * List of handlers for the messages understood by this
2511  * service.
2512  */
2513 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2514   {&handle_start, NULL,
2515    GNUNET_MESSAGE_TYPE_TRANSPORT_START, 0},
2516   {&handle_hello, NULL,
2517    GNUNET_MESSAGE_TYPE_HELLO, 0},
2518   {&handle_send, NULL,
2519    GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
2520   {&handle_set_quota, NULL,
2521    GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
2522   {&handle_try_connect, NULL,
2523    GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
2524    sizeof (struct TryConnectMessage)},
2525   {&handle_address_lookup, NULL,
2526    GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP,
2527    0 },
2528   {NULL, NULL, 0, 0}
2529 };
2530
2531
2532 /**
2533  * Setup the environment for this plugin.
2534  */
2535 static void
2536 create_environment (struct TransportPlugin *plug)
2537 {
2538   plug->env.cfg = cfg;
2539   plug->env.sched = sched;
2540   plug->env.my_public_key = &my_public_key;
2541   plug->env.my_private_key = my_private_key;
2542   plug->env.my_identity = &my_identity;
2543   plug->env.cls = plug;
2544   plug->env.receive = &plugin_env_receive;
2545   plug->env.lookup = &plugin_env_lookup_address;
2546   plug->env.notify_address = &plugin_env_notify_address;
2547   plug->env.notify_validation = &plugin_env_notify_validation;
2548   plug->env.default_quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
2549   plug->env.max_connections = max_connect_per_transport;
2550 }
2551
2552
2553 /**
2554  * Start the specified transport (load the plugin).
2555  */
2556 static void
2557 start_transport (struct GNUNET_SERVER_Handle *server, const char *name)
2558 {
2559   struct TransportPlugin *plug;
2560   char *libname;
2561
2562   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2563               _("Loading `%s' transport plugin\n"), name);
2564   GNUNET_asprintf (&libname, "libgnunet_plugin_transport_%s", name);
2565   plug = GNUNET_malloc (sizeof (struct TransportPlugin));
2566   create_environment (plug);
2567   plug->short_name = GNUNET_strdup (name);
2568   plug->lib_name = libname;
2569   plug->next = plugins;
2570   plugins = plug;
2571   plug->api = GNUNET_PLUGIN_load (libname, &plug->env);
2572   if (plug->api == NULL)
2573     {
2574       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2575                   _("Failed to load transport plugin for `%s'\n"), name);
2576       GNUNET_free (plug->short_name);
2577       plugins = plug->next;
2578       GNUNET_free (libname);
2579       GNUNET_free (plug);
2580     }
2581 }
2582
2583
2584 /**
2585  * Called whenever a client is disconnected.  Frees our
2586  * resources associated with that client.
2587  *
2588  * @param cls closure
2589  * @param client identification of the client
2590  */
2591 static void
2592 client_disconnect_notification (void *cls,
2593                                 struct GNUNET_SERVER_Client *client)
2594 {
2595   struct TransportClient *pos;
2596   struct TransportClient *prev;
2597   struct ClientMessageQueueEntry *mqe;
2598
2599 #if DEBUG_TRANSPORT
2600   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2601               "Client disconnected, cleaning up.\n");
2602 #endif
2603   prev = NULL;
2604   pos = clients;
2605   while ((pos != NULL) && (pos->client != client))
2606     {
2607       prev = pos;
2608       pos = pos->next;
2609     }
2610   if (pos == NULL)
2611     return;
2612   while (NULL != (mqe = pos->message_queue_head))
2613     {
2614       pos->message_queue_head = mqe->next;
2615       GNUNET_free (mqe);
2616     }
2617   pos->message_queue_head = NULL;
2618   if (prev == NULL)
2619     clients = pos->next;
2620   else
2621     prev->next = pos->next;
2622   if (GNUNET_YES == pos->tcs_pending)
2623     {
2624       pos->client = NULL;
2625       return;
2626     }
2627   GNUNET_free (pos);
2628 }
2629
2630
2631 /**
2632  * Function called when the service shuts down.  Unloads our plugins.
2633  *
2634  * @param cls closure, unused
2635  * @param tc task context (unused)
2636  */
2637 static void
2638 unload_plugins (void *cls, 
2639                 const struct GNUNET_SCHEDULER_TaskContext *tc)
2640 {
2641   struct TransportPlugin *plug;
2642   struct AddressList *al;
2643
2644 #if DEBUG_TRANSPORT
2645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2646               "Transport service is unloading plugins...\n");
2647 #endif
2648   while (NULL != (plug = plugins))
2649     {
2650       plugins = plug->next;
2651       GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
2652       GNUNET_free (plug->lib_name);
2653       GNUNET_free (plug->short_name);
2654       while (NULL != (al = plug->addresses))
2655         {
2656           plug->addresses = al->next;
2657           GNUNET_free (al);
2658         }
2659       GNUNET_free (plug);
2660     }
2661   if (my_private_key != NULL)
2662     GNUNET_CRYPTO_rsa_key_free (my_private_key);
2663   GNUNET_free_non_null (our_hello);
2664 }
2665
2666
2667 /**
2668  * Initiate transport service.
2669  *
2670  * @param cls closure
2671  * @param s scheduler to use
2672  * @param serv the initialized server
2673  * @param c configuration to use
2674  */
2675 static void
2676 run (void *cls,
2677      struct GNUNET_SCHEDULER_Handle *s,
2678      struct GNUNET_SERVER_Handle *serv, 
2679      const struct GNUNET_CONFIGURATION_Handle *c)
2680 {
2681   char *plugs;
2682   char *pos;
2683   int no_transports;
2684   unsigned long long tneigh;
2685   char *keyfile;
2686
2687   sched = s;
2688   cfg = c;
2689   /* parse configuration */
2690   if ((GNUNET_OK !=
2691        GNUNET_CONFIGURATION_get_value_number (c,
2692                                               "TRANSPORT",
2693                                               "NEIGHBOUR_LIMIT",
2694                                               &tneigh)) ||
2695       (GNUNET_OK !=
2696        GNUNET_CONFIGURATION_get_value_filename (c,
2697                                                 "GNUNETD",
2698                                                 "HOSTKEY", &keyfile)))
2699     {
2700       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2701                   _
2702                   ("Transport service is lacking key configuration settings.  Exiting.\n"));
2703       GNUNET_SCHEDULER_shutdown (s);
2704       return;
2705     }
2706   max_connect_per_transport = (uint32_t) tneigh;
2707   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
2708   GNUNET_free (keyfile);
2709   if (my_private_key == NULL)
2710     {
2711       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2712                   _
2713                   ("Transport service could not access hostkey.  Exiting.\n"));
2714       GNUNET_SCHEDULER_shutdown (s);
2715       return;
2716     }
2717   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
2718   GNUNET_CRYPTO_hash (&my_public_key,
2719                       sizeof (my_public_key), &my_identity.hashPubKey);
2720   /* setup notification */
2721   server = serv;
2722   GNUNET_SERVER_disconnect_notify (server,
2723                                    &client_disconnect_notification, NULL);
2724   /* load plugins... */
2725   no_transports = 1;
2726   if (GNUNET_OK ==
2727       GNUNET_CONFIGURATION_get_value_string (c,
2728                                              "TRANSPORT", "PLUGINS", &plugs))
2729     {
2730       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2731                   _("Starting transport plugins `%s'\n"), plugs);
2732       pos = strtok (plugs, " ");
2733       while (pos != NULL)
2734         {
2735           start_transport (server, pos);
2736           no_transports = 0;
2737           pos = strtok (NULL, " ");
2738         }
2739       GNUNET_free (plugs);
2740     }
2741   GNUNET_SCHEDULER_add_delayed (sched,
2742                                 GNUNET_TIME_UNIT_FOREVER_REL,
2743                                 &unload_plugins, NULL);
2744   if (no_transports)
2745     refresh_hello ();
2746 #if DEBUG_TRANSPORT
2747   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2748               _("Transport service ready.\n"));
2749 #endif
2750   /* process client requests */
2751   GNUNET_SERVER_add_handlers (server, handlers);
2752 }
2753
2754
2755 /**
2756  * The main function for the transport service.
2757  *
2758  * @param argc number of arguments from the command line
2759  * @param argv command line arguments
2760  * @return 0 ok, 1 on error
2761  */
2762 int
2763 main (int argc, char *const *argv)
2764 {
2765   return (GNUNET_OK ==
2766           GNUNET_SERVICE_run (argc,
2767                               argv,
2768                               "transport",
2769                               GNUNET_SERVICE_OPTION_NONE,
2770                               &run, NULL)) ? 0 : 1;
2771 }
2772
2773 /* end of gnunet-service-transport.c */