bugfixes and redesigning scheduler API
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport.c
23  * @brief low-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - if we do not receive an ACK in response to our
28  *   HELLO, retransmit HELLO!
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_getopt_lib.h"
34 #include "gnunet_hello_lib.h"
35 #include "gnunet_os_lib.h"
36 #include "gnunet_peerinfo_service.h"
37 #include "gnunet_plugin_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_service_lib.h"
40 #include "gnunet_signatures.h"
41 #include "plugin_transport.h"
42 #include "transport.h"
43
44 /**
45  * How many messages can we have pending for a given client process
46  * before we start to drop incoming messages?  We typically should
47  * have only one client and so this would be the primary buffer for
48  * messages, so the number should be chosen rather generously.
49  *
50  * The expectation here is that most of the time the queue is large
51  * enough so that a drop is virtually never required.
52  */
53 #define MAX_PENDING 128
54
55 /**
56  * How often should we try to reconnect to a peer using a particular
57  * transport plugin before giving up?  Note that the plugin may be
58  * added back to the list after PLUGIN_RETRY_FREQUENCY expires.
59  */
60 #define MAX_CONNECT_RETRY 3
61
62 /**
63  * How often must a peer violate bandwidth quotas before we start
64  * to simply drop its messages?
65  */
66 #define QUOTA_VIOLATION_DROP_THRESHOLD 100
67
68 /**
69  * How long until a HELLO verification attempt should time out?
70  * Must be rather small, otherwise a partially successful HELLO
71  * validation (some addresses working) might not be available
72  * before a client's request for a connection fails for good.
73  * Besides, if a single request to an address takes a long time,
74  * then the peer is unlikely worthwhile anyway.
75  */
76 #define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
77
78 /**
79  * How often do we re-add (cheaper) plugins to our list of plugins
80  * to try for a given connected peer?
81  */
82 #define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
83
84 /**
85  * After how long do we expire an address in a HELLO
86  * that we just validated?  This value is also used
87  * for our own addresses when we create a HELLO.
88  */
89 #define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12)
90
91
92 /**
93  * Entry in linked list of network addresses.
94  */
95 struct AddressList
96 {
97   /**
98    * This is a linked list.
99    */
100   struct AddressList *next;
101
102   /**
103    * The address, actually a pointer to the end
104    * of this struct.  Do not free!
105    */
106   void *addr;
107
108   /**
109    * How long until we auto-expire this address (unless it is
110    * re-confirmed by the transport)?
111    */
112   struct GNUNET_TIME_Absolute expires;
113
114   /**
115    * Length of addr.
116    */
117   size_t addrlen;
118
119 };
120
121
122 /**
123  * Entry in linked list of all of our plugins.
124  */
125 struct TransportPlugin
126 {
127
128   /**
129    * This is a linked list.
130    */
131   struct TransportPlugin *next;
132
133   /**
134    * API of the transport as returned by the plugin's
135    * initialization function.
136    */
137   struct GNUNET_TRANSPORT_PluginFunctions *api;
138
139   /**
140    * Short name for the plugin (i.e. "tcp").
141    */
142   char *short_name;
143
144   /**
145    * Name of the library (i.e. "gnunet_plugin_transport_tcp").
146    */
147   char *lib_name;
148
149   /**
150    * List of our known addresses for this transport.
151    */
152   struct AddressList *addresses;
153
154   /**
155    * Environment this transport service is using
156    * for this plugin.
157    */
158   struct GNUNET_TRANSPORT_PluginEnvironment env;
159
160   /**
161    * ID of task that is used to clean up expired addresses.
162    */
163   GNUNET_SCHEDULER_TaskIdentifier address_update_task;
164
165
166   /**
167    * Set to GNUNET_YES if we need to scrap the existing
168    * list of "addresses" and start fresh when we receive
169    * the next address update from a transport.  Set to
170    * GNUNET_NO if we should just add the new address
171    * to the list and wait for the commit call.
172    */
173   int rebuild;
174 };
175
176 struct NeighbourList;
177
178 /**
179  * For each neighbour we keep a list of messages
180  * that we still want to transmit to the neighbour.
181  */
182 struct MessageQueue
183 {
184
185   /**
186    * This is a linked list.
187    */
188   struct MessageQueue *next;
189
190   /**
191    * The message we want to transmit.
192    */
193   struct GNUNET_MessageHeader *message;
194
195   /**
196    * Client responsible for queueing the message;
197    * used to check that a client has not two messages
198    * pending for the same target.  Can be NULL.
199    */
200   struct TransportClient *client;
201
202   /**
203    * Neighbour this entry belongs to.
204    */
205   struct NeighbourList *neighbour;
206
207   /**
208    * Plugin that we used for the transmission.
209    * NULL until we scheduled a transmission.
210    */
211   struct TransportPlugin *plugin;
212
213   /**
214    * Internal message of the transport system that should not be
215    * included in the usual SEND-SEND_OK transmission confirmation
216    * traffic management scheme.  Typically, "internal_msg" will
217    * be set whenever "client" is NULL (but it is not strictly
218    * required).
219    */
220   int internal_msg;
221
222   /**
223    * How important is the message?
224    */
225   unsigned int priority;
226   
227 };
228
229
230 /**
231  * For a given Neighbour, which plugins are available
232  * to talk to this peer and what are their costs?
233  */
234 struct ReadyList
235 {
236
237   /**
238    * This is a linked list.
239    */
240   struct ReadyList *next;
241
242   /**
243    * Which of our transport plugins does this entry
244    * represent?
245    */
246   struct TransportPlugin *plugin;
247
248   /**
249    * Neighbour this entry belongs to.
250    */
251   struct NeighbourList *neighbour;
252
253   /**
254    * Opaque handle (specific to the plugin) for the
255    * connection to our target; can be NULL.
256    */
257   void *plugin_handle;
258
259   /**
260    * What was the last latency observed for this plugin
261    * and peer?  Invalid if connected is GNUNET_NO.
262    */
263   struct GNUNET_TIME_Relative latency;
264
265   /**
266    * If we did not successfully transmit a message to the
267    * given peer via this connection during the specified
268    * time, we should consider the connection to be dead.
269    * This is used in the case that a TCP transport simply
270    * stalls writing to the stream but does not formerly
271    * get a signal that the other peer died.
272    */
273   struct GNUNET_TIME_Absolute timeout;
274
275   /**
276    * Is this plugin currently connected?  The first time
277    * we transmit or send data to a peer via a particular
278    * plugin, we set this to GNUNET_YES.  If we later get
279    * an error (disconnect notification or transmission
280    * failure), we set it back to GNUNET_NO.  Each time the
281    * value is set to GNUNET_YES, we increment the
282    * "connect_attempts" counter.  If that one reaches a
283    * particular threshold, we consider the plugin to not
284    * be working properly at this time for the given peer
285    * and remove it from the eligible list.
286    */
287   int connected;
288
289   /**
290    * How often have we tried to connect using this plugin?
291    */
292   unsigned int connect_attempts;
293
294   /**
295    * Is this plugin ready to transmit to the specific
296    * target?  GNUNET_NO if not.  Initially, all plugins
297    * are marked ready.  If a transmission is in progress,
298    * "transmit_ready" is set to GNUNET_NO.
299    */
300   int transmit_ready;
301
302 };
303
304
305 /**
306  * Entry in linked list of all of our current neighbours.
307  */
308 struct NeighbourList
309 {
310
311   /**
312    * This is a linked list.
313    */
314   struct NeighbourList *next;
315
316   /**
317    * Which of our transports is connected to this peer
318    * and what is their status?
319    */
320   struct ReadyList *plugins;
321
322   /**
323    * List of messages we would like to send to this peer;
324    * must contain at most one message per client.
325    */
326   struct MessageQueue *messages;
327
328   /**
329    * Identity of this neighbour.
330    */
331   struct GNUNET_PeerIdentity id;
332
333   /**
334    * ID of task scheduled to run when this peer is about to
335    * time out (will free resources associated with the peer).
336    */
337   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
338
339   /**
340    * How long until we should consider this peer dead
341    * (if we don't receive another message in the
342    * meantime)?
343    */
344   struct GNUNET_TIME_Absolute peer_timeout;
345
346   /**
347    * At what time did we reset last_received last?
348    */
349   struct GNUNET_TIME_Absolute last_quota_update;
350
351   /**
352    * At what time should we try to again add plugins to
353    * our ready list?
354    */
355   struct GNUNET_TIME_Absolute retry_plugins_time;
356
357   /**
358    * How many bytes have we received since the "last_quota_update"
359    * timestamp?
360    */
361   uint64_t last_received;
362
363   /**
364    * Global quota for inbound traffic for the neighbour in bytes/ms.
365    */
366   uint32_t quota_in;
367
368   /**
369    * How often has the other peer (recently) violated the
370    * inbound traffic limit?  Incremented by 10 per violation,
371    * decremented by 1 per non-violation (for each
372    * time interval).
373    */
374   unsigned int quota_violation_count;
375
376   /**
377    * Have we seen an ACK from this neighbour in the past?
378    * (used to make up a fake ACK for clients connecting after
379    * the neighbour connected to us).
380    */
381   int saw_ack;
382
383 };
384
385
386 /**
387  * Linked list of messages to be transmitted to
388  * the client.  Each entry is followed by the
389  * actual message.
390  */
391 struct ClientMessageQueueEntry
392 {
393   /**
394    * This is a linked list.
395    */
396   struct ClientMessageQueueEntry *next;
397 };
398
399
400 /**
401  * Client connected to the transport service.
402  */
403 struct TransportClient
404 {
405
406   /**
407    * This is a linked list.
408    */
409   struct TransportClient *next;
410
411   /**
412    * Handle to the client.
413    */
414   struct GNUNET_SERVER_Client *client;
415
416   /**
417    * Linked list of messages yet to be transmitted to
418    * the client.
419    */
420   struct ClientMessageQueueEntry *message_queue_head;
421
422   /**
423    * Tail of linked list of messages yet to be transmitted to the
424    * client.
425    */
426   struct ClientMessageQueueEntry *message_queue_tail;
427
428   /**
429    * Is a call to "transmit_send_continuation" pending?  If so, we
430    * must not free this struct (even if the corresponding client
431    * disconnects) and instead only remove it from the linked list and
432    * set the "client" field to NULL.
433    */
434   int tcs_pending;
435
436   /**
437    * Length of the list of messages pending for this client.
438    */
439   unsigned int message_count;
440
441 };
442
443
444 /**
445  * For each HELLO, we may have to validate multiple addresses;
446  * each address gets its own request entry.
447  */
448 struct ValidationAddress
449 {
450   /**
451    * This is a linked list.
452    */
453   struct ValidationAddress *next;
454
455   /**
456    * Name of the transport.
457    */
458   char *transport_name;
459
460   /**
461    * When should this validated address expire?
462    */
463   struct GNUNET_TIME_Absolute expiration;
464
465   /**
466    * Length of the address we are validating.
467    */
468   size_t addr_len;
469
470   /**
471    * Challenge number we used.
472    */
473   uint32_t challenge;
474
475   /**
476    * Set to GNUNET_YES if the challenge was met,
477    * GNUNET_SYSERR if we know it failed, GNUNET_NO
478    * if we are waiting on a response.
479    */
480   int ok;
481 };
482
483
484 /**
485  * Entry in linked list of all HELLOs awaiting validation.
486  */
487 struct ValidationList
488 {
489
490   /**
491    * This is a linked list.
492    */
493   struct ValidationList *next;
494
495   /**
496    * Linked list with one entry per address from the HELLO
497    * that needs to be validated.
498    */
499   struct ValidationAddress *addresses;
500
501   /**
502    * The public key of the peer.
503    */
504   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
505
506   /**
507    * When does this record time-out? (assuming the
508    * challenge goes unanswered)
509    */
510   struct GNUNET_TIME_Absolute timeout;
511
512 };
513
514
515 /**
516  * HELLOs awaiting validation.
517  */
518 static struct ValidationList *pending_validations;
519
520 /**
521  * Our HELLO message.
522  */
523 static struct GNUNET_HELLO_Message *our_hello;
524
525 /**
526  * "version" of "our_hello".  Used to see if a given
527  * neighbour has already been sent the latest version
528  * of our HELLO message.
529  */
530 static unsigned int our_hello_version;
531
532 /**
533  * Our public key.
534  */
535 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
536
537 /**
538  * Our identity.
539  */
540 static struct GNUNET_PeerIdentity my_identity;
541
542 /**
543  * Our private key.
544  */
545 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
546
547 /**
548  * Our scheduler.
549  */
550 struct GNUNET_SCHEDULER_Handle *sched;
551
552 /**
553  * Our configuration.
554  */
555 const struct GNUNET_CONFIGURATION_Handle *cfg;
556
557 /**
558  * Linked list of all clients to this service.
559  */
560 static struct TransportClient *clients;
561
562 /**
563  * All loaded plugins.
564  */
565 static struct TransportPlugin *plugins;
566
567 /**
568  * Our server.
569  */
570 static struct GNUNET_SERVER_Handle *server;
571
572 /**
573  * All known neighbours and their HELLOs.
574  */
575 static struct NeighbourList *neighbours;
576
577 /**
578  * Number of neighbours we'd like to have.
579  */
580 static uint32_t max_connect_per_transport;
581
582
583 /**
584  * Find an entry in the neighbour list for a particular peer.
585  *
586  * @return NULL if not found.
587  */
588 static struct NeighbourList *
589 find_neighbour (const struct GNUNET_PeerIdentity *key)
590 {
591   struct NeighbourList *head = neighbours;
592   while ((head != NULL) &&
593          (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
594     head = head->next;
595   return head;
596 }
597
598
599 /**
600  * Find an entry in the transport list for a particular transport.
601  *
602  * @return NULL if not found.
603  */
604 static struct TransportPlugin *
605 find_transport (const char *short_name)
606 {
607   struct TransportPlugin *head = plugins;
608   while ((head != NULL) && (0 != strcmp (short_name, head->short_name)))
609     head = head->next;
610   return head;
611 }
612
613
614 /**
615  * Update the quota values for the given neighbour now.
616  */
617 static void
618 update_quota (struct NeighbourList *n)
619 {
620   struct GNUNET_TIME_Relative delta;
621   uint64_t allowed;
622   uint64_t remaining;
623
624   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
625   if (delta.value < MIN_QUOTA_REFRESH_TIME)
626     return;                     /* not enough time passed for doing quota update */
627   allowed = delta.value * n->quota_in;
628   if (n->last_received < allowed)
629     {
630       remaining = allowed - n->last_received;
631       if (n->quota_in > 0)
632         remaining /= n->quota_in;
633       else
634         remaining = 0;
635       if (remaining > MAX_BANDWIDTH_CARRY)
636         remaining = MAX_BANDWIDTH_CARRY;
637       n->last_received = 0;
638       n->last_quota_update = GNUNET_TIME_absolute_get ();
639       n->last_quota_update.value -= remaining;
640       if (n->quota_violation_count > 0)
641         n->quota_violation_count--;
642     }
643   else
644     {
645       n->last_received -= allowed;
646       n->last_quota_update = GNUNET_TIME_absolute_get ();
647       if (n->last_received > allowed)
648         {
649           /* more than twice the allowed rate! */
650           n->quota_violation_count += 10;
651         }
652     }
653 }
654
655
656 /**
657  * Function called to notify a client about the socket
658  * being ready to queue more data.  "buf" will be
659  * NULL and "size" zero if the socket was closed for
660  * writing in the meantime.
661  *
662  * @param cls closure
663  * @param size number of bytes available in buf
664  * @param buf where the callee should write the message
665  * @return number of bytes written to buf
666  */
667 static size_t
668 transmit_to_client_callback (void *cls, size_t size, void *buf)
669 {
670   struct TransportClient *client = cls;
671   struct ClientMessageQueueEntry *q;
672   uint16_t msize;
673   size_t tsize;
674   const struct GNUNET_MessageHeader *msg;
675   struct GNUNET_CONNECTION_TransmitHandle *th;
676   char *cbuf;
677
678   if (buf == NULL)
679     {
680       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
681                   "Transmission to client failed, closing connection.\n");
682       /* fatal error with client, free message queue! */
683       while (NULL != (q = client->message_queue_head))
684         {
685           client->message_queue_head = q->next;
686           GNUNET_free (q);
687         }
688       client->message_queue_tail = NULL;
689       client->message_count = 0;
690       return 0;
691     }
692   cbuf = buf;
693   tsize = 0;
694   while (NULL != (q = client->message_queue_head))
695     {
696       msg = (const struct GNUNET_MessageHeader *) &q[1];
697       msize = ntohs (msg->size);
698       if (msize + tsize > size)
699         break;
700 #if DEBUG_TRANSPORT
701       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702                   "Transmitting message of type %u to client.\n",
703                   ntohs (msg->type));
704 #endif
705       client->message_queue_head = q->next;
706       if (q->next == NULL)
707         client->message_queue_tail = NULL;
708       memcpy (&cbuf[tsize], msg, msize);
709       tsize += msize;
710       GNUNET_free (q);
711       client->message_count--;
712     }
713   if (NULL != q)
714     {
715       GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
716       th = GNUNET_SERVER_notify_transmit_ready (client->client,
717                                                 msize,
718                                                 GNUNET_TIME_UNIT_FOREVER_REL,
719                                                 &transmit_to_client_callback,
720                                                 client);
721       GNUNET_assert (th != NULL);
722     }
723   return tsize;
724 }
725
726
727 /**
728  * Send the specified message to the specified client.  Since multiple
729  * messages may be pending for the same client at a time, this code
730  * makes sure that no message is lost.
731  *
732  * @param client client to transmit the message to
733  * @param msg the message to send
734  * @param may_drop can this message be dropped if the
735  *        message queue for this client is getting far too large?
736  */
737 static void
738 transmit_to_client (struct TransportClient *client,
739                     const struct GNUNET_MessageHeader *msg, int may_drop)
740 {
741   struct ClientMessageQueueEntry *q;
742   uint16_t msize;
743   struct GNUNET_CONNECTION_TransmitHandle *th;
744
745   if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
746     {
747       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
748                   _
749                   ("Dropping message, have %u messages pending (%u is the soft limit)\n"),
750                   client->message_count, MAX_PENDING);
751       /* TODO: call to statistics... */
752       return;
753     }
754   client->message_count++;
755   msize = ntohs (msg->size);
756   GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
757   q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
758   memcpy (&q[1], msg, msize);
759   /* append to message queue */
760   if (client->message_queue_tail == NULL)
761     {
762       client->message_queue_tail = q;
763     }
764   else
765     {
766       client->message_queue_tail->next = q;
767       client->message_queue_tail = q;
768     }
769   if (client->message_queue_head == NULL)
770     {
771       client->message_queue_head = q;
772       th = GNUNET_SERVER_notify_transmit_ready (client->client,
773                                                 msize,
774                                                 GNUNET_TIME_UNIT_FOREVER_REL,
775                                                 &transmit_to_client_callback,
776                                                 client);
777       GNUNET_assert (th != NULL);
778     }
779 }
780
781
782 /**
783  * Find alternative plugins for communication.
784  *
785  * @param neighbour for which neighbour should we try to find
786  *        more plugins?
787  */
788 static void
789 try_alternative_plugins (struct NeighbourList *neighbour)
790 {
791   struct ReadyList *rl;
792
793   if ((neighbour->plugins != NULL) &&
794       (neighbour->retry_plugins_time.value >
795        GNUNET_TIME_absolute_get ().value))
796     return;                     /* don't try right now */
797   neighbour->retry_plugins_time
798     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
799
800   rl = neighbour->plugins;
801   while (rl != NULL)
802     {
803       if (rl->connect_attempts > 0)
804         rl->connect_attempts--; /* amnesty */
805       rl = rl->next;
806     }
807
808 }
809
810
811 /**
812  * The peer specified by the given neighbour has timed-out or a plugin
813  * has disconnected.  We may either need to do nothing (other plugins
814  * still up), or trigger a full disconnect and clean up.  This
815  * function updates our state and do the necessary notifications.
816  * Also notifies our clients that the neighbour is now officially
817  * gone.
818  *
819  * @param n the neighbour list entry for the peer
820  * @param check should we just check if all plugins
821  *        disconnected or must we ask all plugins to
822  *        disconnect?
823  */
824 static void
825 disconnect_neighbour (struct NeighbourList *n,
826                       int check);
827
828
829 /**
830  * Check the ready list for the given neighbour and
831  * if a plugin is ready for transmission (and if we
832  * have a message), do so!
833  *
834  * @param neighbour target peer for which to check the plugins
835  */
836 static void 
837 try_transmission_to_peer (struct NeighbourList *neighbour);
838
839
840 /**
841  * Function called by the GNUNET_TRANSPORT_TransmitFunction
842  * upon "completion" of a send request.  This tells the API
843  * that it is now legal to send another message to the given
844  * peer.
845  *
846  * @param cls closure, identifies the entry on the
847  *            message queue that was transmitted and the
848  *            client responsible for queueing the message
849  * @param rl identifies plugin used for the transmission for
850  *           this neighbour; needs to be re-enabled for
851  *           future transmissions
852  * @param target the peer receiving the message
853  * @param result GNUNET_OK on success, if the transmission
854  *           failed, we should not tell the client to transmit
855  *           more messages
856  */
857 static void
858 transmit_send_continuation (void *cls,
859                             struct ReadyList *rl,
860                             const struct GNUNET_PeerIdentity *target,
861                             int result)
862 {
863   struct MessageQueue *mq = cls;
864   struct SendOkMessage send_ok_msg;
865   struct NeighbourList *n;
866
867   GNUNET_assert (mq != NULL);
868   n = mq->neighbour;
869   GNUNET_assert (n != NULL);
870   GNUNET_assert (0 ==
871                  memcmp (&n->id, target,
872                          sizeof (struct GNUNET_PeerIdentity)));
873   if (rl == NULL)
874     {
875       rl = n->plugins;
876       while ((rl != NULL) && (rl->plugin != mq->plugin))
877         rl = rl->next;
878       GNUNET_assert (rl != NULL);
879     }
880   if (result == GNUNET_OK)
881     {
882       rl->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
883     }
884   else
885     {
886       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
887                   "Transmission to peer `%s' failed, marking connection as down.\n",
888                   GNUNET_i2s(target));
889       rl->connected = GNUNET_NO;
890       rl->plugin_handle = NULL;
891     }
892   if (!mq->internal_msg)
893     rl->transmit_ready = GNUNET_YES;
894   if (mq->client != NULL)
895     {
896       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
897                   "Notifying client %p about failed transission to peer `%4s'.\n",
898                   mq->client,
899                   GNUNET_i2s(target));
900       send_ok_msg.header.size = htons (sizeof (send_ok_msg));
901       send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
902       send_ok_msg.success = htonl (result);
903       send_ok_msg.peer = n->id;
904       transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
905     }
906   GNUNET_free (mq->message);
907   GNUNET_free (mq);
908   /* one plugin just became ready again, try transmitting
909      another message (if available) */
910   if (result == GNUNET_OK)
911     try_transmission_to_peer (n);
912   else    
913     disconnect_neighbour (n, GNUNET_YES); 
914 }
915
916
917 /**
918  * Check the ready list for the given neighbour and
919  * if a plugin is ready for transmission (and if we
920  * have a message), do so!
921  */
922 static void
923 try_transmission_to_peer (struct NeighbourList *neighbour)
924 {
925   struct ReadyList *pos;
926   struct GNUNET_TIME_Relative min_latency;
927   struct ReadyList *rl;
928   struct MessageQueue *mq;
929   struct GNUNET_TIME_Absolute now;
930
931   if (neighbour->messages == NULL)
932     return;                     /* nothing to do */
933   try_alternative_plugins (neighbour);
934   min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
935   rl = NULL;
936   mq = neighbour->messages;
937   now = GNUNET_TIME_absolute_get ();
938   pos = neighbour->plugins;
939   while (pos != NULL)
940     {
941       /* set plugins that are inactive for a long time back to disconnected */
942       if ((pos->timeout.value < now.value) && (pos->connected == GNUNET_YES))
943         {
944 #if DEBUG_TRANSPORT
945           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
946                       "Marking long-time inactive connection to `%4s' as down.\n",
947                       GNUNET_i2s (&neighbour->id));
948 #endif
949           pos->connected = GNUNET_NO;
950         }
951       if (((GNUNET_YES == pos->transmit_ready) ||
952            (mq->internal_msg)) &&
953           (pos->connect_attempts < MAX_CONNECT_RETRY) &&
954           ((rl == NULL) || (min_latency.value > pos->latency.value)))
955         {
956           rl = pos;
957           min_latency = pos->latency;
958         }
959       pos = pos->next;
960     }
961   if (rl == NULL)
962     {
963 #if DEBUG_TRANSPORT
964       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
965                   "No plugin ready to transmit message\n");
966 #endif
967       return;                   /* nobody ready */
968     }
969   if (GNUNET_NO == rl->connected)
970     {
971       rl->connect_attempts++;
972       rl->connected = GNUNET_YES;
973 #if DEBUG_TRANSPORT
974   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975               "Establishing fresh connection with `%4s' via plugin `%s'\n",
976               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
977 #endif
978     }
979   neighbour->messages = mq->next;
980   mq->plugin = rl->plugin;
981   if (!mq->internal_msg)
982     rl->transmit_ready = GNUNET_NO;
983 #if DEBUG_TRANSPORT
984   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985               "Giving message of type `%u' for `%4s' to plugin `%s'\n",
986               ntohs (mq->message->type),
987               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
988 #endif
989   rl->plugin_handle
990     = rl->plugin->api->send (rl->plugin->api->cls,
991                              rl->plugin_handle,
992                              rl,
993                              &neighbour->id,
994                              mq->priority,
995                              mq->message,
996                              GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
997                              &transmit_send_continuation, mq);
998 }
999
1000
1001 /**
1002  * Send the specified message to the specified peer.
1003  *
1004  * @param client source of the transmission request (can be NULL)
1005  * @param priority how important is the message
1006  * @param msg message to send
1007  * @param is_internal is this an internal message
1008  * @param neighbour handle to the neighbour for transmission
1009  */
1010 static void
1011 transmit_to_peer (struct TransportClient *client,
1012                   unsigned int priority,
1013                   const struct GNUNET_MessageHeader *msg,
1014                   int is_internal, struct NeighbourList *neighbour)
1015 {
1016   struct MessageQueue *mq;
1017   struct MessageQueue *mqe;
1018   struct GNUNET_MessageHeader *m;
1019
1020 #if DEBUG_TRANSPORT
1021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022               _("Sending message of type %u to peer `%4s'\n"),
1023               ntohs (msg->type), GNUNET_i2s (&neighbour->id));
1024 #endif
1025   if (client != NULL)
1026     {
1027       /* check for duplicate submission */
1028       mq = neighbour->messages;
1029       while (NULL != mq)
1030         {
1031           if (mq->client == client)
1032             {
1033               /* client transmitted to same peer twice
1034                  before getting SendOk! */
1035               GNUNET_break (0);
1036               return;
1037             }
1038           mq = mq->next;
1039         }
1040     }
1041   mq = GNUNET_malloc (sizeof (struct MessageQueue));
1042   mq->client = client;
1043   m = GNUNET_malloc (ntohs (msg->size));
1044   memcpy (m, msg, ntohs (msg->size));
1045   mq->message = m;
1046   mq->neighbour = neighbour;
1047   mq->internal_msg = is_internal;
1048   mq->priority = priority;
1049
1050   /* find tail */
1051   mqe = neighbour->messages;
1052   if (mqe != NULL)
1053     while (mqe->next != NULL)
1054       mqe = mqe->next;
1055   if (mqe == NULL)
1056     {
1057       /* new head */
1058       neighbour->messages = mq;
1059       try_transmission_to_peer (neighbour);
1060     }
1061   else
1062     {
1063       /* append */
1064       mqe->next = mq;
1065     }
1066 }
1067
1068
1069 /**
1070  * FIXME: document.
1071  */
1072 struct GeneratorContext
1073 {
1074   struct TransportPlugin *plug_pos;
1075   struct AddressList *addr_pos;
1076   struct GNUNET_TIME_Absolute expiration;
1077 };
1078
1079
1080 /**
1081  * FIXME: document.
1082  */
1083 static size_t
1084 address_generator (void *cls, size_t max, void *buf)
1085 {
1086   struct GeneratorContext *gc = cls;
1087   size_t ret;
1088
1089   while ((gc->addr_pos == NULL) && (gc->plug_pos != NULL))
1090     {
1091       gc->plug_pos = gc->plug_pos->next;
1092       gc->addr_pos = (gc->plug_pos != NULL) ? gc->plug_pos->addresses : NULL;
1093     }
1094   if (NULL == gc->plug_pos)
1095     return 0;
1096   ret = GNUNET_HELLO_add_address (gc->plug_pos->short_name,
1097                                   gc->expiration,
1098                                   gc->addr_pos->addr,
1099                                   gc->addr_pos->addrlen, buf, max);
1100   gc->addr_pos = gc->addr_pos->next;
1101   return ret;
1102 }
1103
1104
1105 /**
1106  * Construct our HELLO message from all of the addresses of
1107  * all of the transports.
1108  */
1109 static void
1110 refresh_hello ()
1111 {
1112   struct GNUNET_HELLO_Message *hello;
1113   struct TransportClient *cpos;
1114   struct NeighbourList *npos;
1115   struct GeneratorContext gc;
1116
1117 #if DEBUG_TRANSPORT
1118   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1119               "Refreshing my `%s'\n",
1120               "HELLO");
1121 #endif
1122   gc.plug_pos = plugins;
1123   gc.addr_pos = plugins != NULL ? plugins->addresses : NULL;
1124   gc.expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1125   hello = GNUNET_HELLO_create (&my_public_key, &address_generator, &gc);
1126   cpos = clients;
1127   while (cpos != NULL)
1128     {
1129       transmit_to_client (cpos,
1130                           (const struct GNUNET_MessageHeader *) hello,
1131                           GNUNET_NO);
1132       cpos = cpos->next;
1133     }
1134
1135   GNUNET_free_non_null (our_hello);
1136   our_hello = hello;
1137   our_hello_version++;
1138   npos = neighbours;
1139   while (npos != NULL)
1140     {
1141 #if DEBUG_TRANSPORT
1142       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1143                   "Transmitting updated `%s' to neighbour `%4s'\n",
1144                   "HELLO",
1145                   GNUNET_i2s(&npos->id));
1146 #endif
1147       transmit_to_peer (NULL, 0,
1148                         (const struct GNUNET_MessageHeader *) our_hello,
1149                         GNUNET_YES, npos);
1150       npos = npos->next;
1151     }
1152 }
1153
1154
1155 /**
1156  * Task used to clean up expired addresses for a plugin.
1157  *
1158  * @param cls closure
1159  * @param tc context
1160  */
1161 static void
1162 expire_address_task (void *cls,
1163                      const struct GNUNET_SCHEDULER_TaskContext *tc);
1164
1165
1166 /**
1167  * Update the list of addresses for this plugin,
1168  * expiring those that are past their expiration date.
1169  *
1170  * @param plugin addresses of which plugin should be recomputed?
1171  * @param fresh set to GNUNET_YES if a new address was added
1172  *        and we need to regenerate the HELLO even if nobody
1173  *        expired
1174  */
1175 static void
1176 update_addresses (struct TransportPlugin *plugin, int fresh)
1177 {
1178   struct GNUNET_TIME_Relative min_remaining;
1179   struct GNUNET_TIME_Relative remaining;
1180   struct GNUNET_TIME_Absolute now;
1181   struct AddressList *pos;
1182   struct AddressList *prev;
1183   struct AddressList *next;
1184   int expired;
1185
1186   if (plugin->address_update_task != GNUNET_SCHEDULER_NO_TASK)
1187     GNUNET_SCHEDULER_cancel (plugin->env.sched, plugin->address_update_task);
1188   plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK;
1189   now = GNUNET_TIME_absolute_get ();
1190   min_remaining = GNUNET_TIME_UNIT_FOREVER_REL;
1191   expired = GNUNET_NO;
1192   prev = NULL;
1193   pos = plugin->addresses;
1194   while (pos != NULL)
1195     {
1196       next = pos->next;
1197       if (pos->expires.value < now.value)
1198         {
1199           expired = GNUNET_YES;
1200           if (prev == NULL)
1201             plugin->addresses = pos->next;
1202           else
1203             prev->next = pos->next;
1204           GNUNET_free (pos);
1205         }
1206       else
1207         {
1208           remaining = GNUNET_TIME_absolute_get_remaining (pos->expires);
1209           if (remaining.value < min_remaining.value)
1210             min_remaining = remaining;
1211           prev = pos;
1212         }
1213       pos = next;
1214     }
1215
1216   if (expired || fresh)
1217     refresh_hello ();
1218   if (min_remaining.value < GNUNET_TIME_UNIT_FOREVER_REL.value)
1219     plugin->address_update_task
1220       = GNUNET_SCHEDULER_add_delayed (plugin->env.sched,
1221                                       min_remaining,
1222                                       &expire_address_task, plugin);
1223
1224 }
1225
1226
1227 /**
1228  * Task used to clean up expired addresses for a plugin.
1229  *
1230  * @param cls closure
1231  * @param tc context
1232  */
1233 static void
1234 expire_address_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1235 {
1236   struct TransportPlugin *plugin = cls;
1237   plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK;
1238   update_addresses (plugin, GNUNET_NO);
1239 }
1240
1241
1242 /**
1243  * Function that must be called by each plugin to notify the
1244  * transport service about the addresses under which the transport
1245  * provided by the plugin can be reached.
1246  *
1247  * @param cls closure
1248  * @param name name of the transport that generated the address
1249  * @param addr one of the addresses of the host, NULL for the last address
1250  *        the specific address format depends on the transport
1251  * @param addrlen length of the address
1252  * @param expires when should this address automatically expire?
1253  */
1254 static void
1255 plugin_env_notify_address (void *cls,
1256                            const char *name,
1257                            const void *addr,
1258                            size_t addrlen,
1259                            struct GNUNET_TIME_Relative expires)
1260 {
1261   struct TransportPlugin *p = cls;
1262   struct AddressList *al;
1263   struct GNUNET_TIME_Absolute abex;
1264
1265   abex = GNUNET_TIME_relative_to_absolute (expires);
1266   GNUNET_assert (p == find_transport (name));
1267
1268   al = p->addresses;
1269   while (al != NULL)
1270     {
1271       if ((addrlen == al->addrlen) && (0 == memcmp (addr, &al[1], addrlen)))
1272         {
1273           if (al->expires.value < abex.value)
1274             al->expires = abex;
1275           return;
1276         }
1277       al = al->next;
1278     }
1279 #if DEBUG_TRANSPORT
1280   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1281               "Plugin `%s' informs us about a new address `%s'\n", name,
1282               GNUNET_a2s(addr, addrlen));
1283 #endif
1284   al = GNUNET_malloc (sizeof (struct AddressList) + addrlen);
1285   al->addr = &al[1];
1286   al->next = p->addresses;
1287   p->addresses = al;
1288   al->expires = abex;
1289   al->addrlen = addrlen;
1290   memcpy (&al[1], addr, addrlen);
1291   update_addresses (p, GNUNET_YES);
1292 }
1293
1294
1295 /**
1296  * FIXME: document.
1297  */
1298 struct LookupHelloContext
1299 {
1300   GNUNET_TRANSPORT_AddressCallback iterator;
1301
1302   void *iterator_cls;
1303 };
1304
1305
1306 /**
1307  * FIXME: document.
1308  */
1309 static int
1310 lookup_address_callback (void *cls,
1311                          const char *tname,
1312                          struct GNUNET_TIME_Absolute expiration,
1313                          const void *addr, size_t addrlen)
1314 {
1315   struct LookupHelloContext *lhc = cls;
1316   lhc->iterator (lhc->iterator_cls, tname, addr, addrlen);
1317   return GNUNET_OK;
1318 }
1319
1320
1321 /**
1322  * FIXME: document.
1323  */
1324 static void
1325 lookup_hello_callback (void *cls,
1326                        const struct GNUNET_PeerIdentity *peer,
1327                        const struct GNUNET_HELLO_Message *h, uint32_t trust)
1328 {
1329   struct LookupHelloContext *lhc = cls;
1330
1331   if (peer == NULL)
1332     {
1333       lhc->iterator (lhc->iterator_cls, NULL, NULL, 0);
1334       GNUNET_free (lhc);
1335       return;
1336     }
1337   if (h == NULL)
1338     return;
1339   GNUNET_HELLO_iterate_addresses (h,
1340                                   GNUNET_NO, &lookup_address_callback, lhc);
1341 }
1342
1343
1344 /**
1345  * Function that allows a transport to query the known
1346  * network addresses for a given peer.
1347  *
1348  * @param cls closure
1349  * @param timeout after how long should we time out?
1350  * @param target which peer are we looking for?
1351  * @param iter function to call for each known address
1352  * @param iter_cls closure for iter
1353  */
1354 static void
1355 plugin_env_lookup_address (void *cls,
1356                            struct GNUNET_TIME_Relative timeout,
1357                            const struct GNUNET_PeerIdentity *target,
1358                            GNUNET_TRANSPORT_AddressCallback iter,
1359                            void *iter_cls)
1360 {
1361   struct LookupHelloContext *lhc;
1362
1363   lhc = GNUNET_malloc (sizeof (struct LookupHelloContext));
1364   lhc->iterator = iter;
1365   lhc->iterator_cls = iter_cls;
1366   GNUNET_PEERINFO_for_all (cfg,
1367                            sched,
1368                            target, 0, timeout, &lookup_hello_callback, &lhc);
1369 }
1370
1371
1372 /**
1373  * Notify all of our clients about a peer connecting.
1374  */
1375 static void
1376 notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
1377                         struct GNUNET_TIME_Relative latency)
1378 {
1379   struct ConnectInfoMessage cim;
1380   struct TransportClient *cpos;
1381
1382 #if DEBUG_TRANSPORT
1383   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1384               "Informing clients about peer `%4s' connecting to us\n",
1385               GNUNET_i2s (peer));
1386 #endif
1387   cim.header.size = htons (sizeof (struct ConnectInfoMessage));
1388   cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1389   cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60*1000));
1390   cim.latency = GNUNET_TIME_relative_hton (latency);
1391   memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
1392   cpos = clients;
1393   while (cpos != NULL)
1394     {
1395       transmit_to_client (cpos, &cim.header, GNUNET_NO);
1396       cpos = cpos->next;
1397     }
1398 }
1399
1400
1401 /**
1402  * Notify all of our clients about a peer disconnecting.
1403  */
1404 static void
1405 notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
1406 {
1407   struct DisconnectInfoMessage dim;
1408   struct TransportClient *cpos;
1409
1410 #if DEBUG_TRANSPORT
1411   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1412               "Informing clients about peer `%4s' disconnecting\n",
1413               GNUNET_i2s (peer));
1414 #endif
1415   dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
1416   dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1417   dim.reserved = htonl (0);
1418   memcpy (&dim.peer, peer, sizeof (struct GNUNET_PeerIdentity));
1419   cpos = clients;
1420   while (cpos != NULL)
1421     {
1422       transmit_to_client (cpos, &dim.header, GNUNET_NO);
1423       cpos = cpos->next;
1424     }
1425 }
1426
1427
1428 /**
1429  * Copy any validated addresses to buf.
1430  *
1431  * @return 0 once all addresses have been
1432  *         returned
1433  */
1434 static size_t
1435 list_validated_addresses (void *cls, size_t max, void *buf)
1436 {
1437   struct ValidationAddress **va = cls;
1438   size_t ret;
1439
1440   while ((NULL != *va) && ((*va)->ok != GNUNET_YES))
1441     *va = (*va)->next;
1442   if (NULL == *va)
1443     return 0;
1444   ret = GNUNET_HELLO_add_address ((*va)->transport_name,
1445                                   (*va)->expiration,
1446                                   &(*va)[1], (*va)->addr_len, buf, max);
1447   *va = (*va)->next;
1448   return ret;
1449 }
1450
1451
1452 /**
1453  * HELLO validation cleanup task.
1454  */
1455 static void
1456 cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1457 {
1458   struct ValidationAddress *va;
1459   struct ValidationList *pos;
1460   struct ValidationList *prev;
1461   struct GNUNET_TIME_Absolute now;
1462   struct GNUNET_TIME_Absolute first;
1463   struct GNUNET_HELLO_Message *hello;
1464   struct GNUNET_PeerIdentity pid;
1465   struct NeighbourList *n;
1466
1467 #if DEBUG_TRANSPORT
1468   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1469               "HELLO validation cleanup background task running...\n");
1470 #endif
1471   now = GNUNET_TIME_absolute_get ();
1472   prev = NULL;
1473   pos = pending_validations;
1474   while (pos != NULL)
1475     {
1476       if (pos->timeout.value < now.value)
1477         {
1478           if (prev == NULL)
1479             pending_validations = pos->next;
1480           else
1481             prev->next = pos->next;
1482           va = pos->addresses;
1483           hello = GNUNET_HELLO_create (&pos->publicKey,
1484                                        &list_validated_addresses, &va);
1485           GNUNET_CRYPTO_hash (&pos->publicKey,
1486                               sizeof (struct
1487                                       GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1488                               &pid.hashPubKey);
1489 #if DEBUG_TRANSPORT
1490           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1491                       "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n",
1492                       "HELLO", GNUNET_i2s (&pid));
1493 #endif
1494           GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello);
1495           n = find_neighbour (&pid);
1496           if (NULL != n)
1497             try_transmission_to_peer (n);           
1498           GNUNET_free (hello);
1499           while (NULL != (va = pos->addresses))
1500             {
1501               pos->addresses = va->next;
1502               GNUNET_free (va->transport_name);
1503               GNUNET_free (va);
1504             }
1505           GNUNET_free (pos);
1506           if (prev == NULL)
1507             pos = pending_validations;
1508           else
1509             pos = prev->next;
1510           continue;
1511         }
1512       prev = pos;
1513       pos = pos->next;
1514     }
1515
1516   /* finally, reschedule cleanup if needed; list is
1517      ordered by timeout, so we need the last element... */
1518   if (NULL != pending_validations)
1519     {
1520       first = pending_validations->timeout;
1521       pos = pending_validations;
1522       while (pos != NULL) 
1523         {
1524           first = GNUNET_TIME_absolute_min (first, pos->timeout);
1525           pos = pos->next;
1526         }
1527       GNUNET_SCHEDULER_add_delayed (sched,
1528                                     GNUNET_TIME_absolute_get_remaining (first),
1529                                     &cleanup_validation, NULL);
1530     }
1531 }
1532
1533
1534
1535
1536 /**
1537  * Function that will be called if we receive a validation
1538  * of an address challenge that we transmitted to another
1539  * peer.  Note that the validation should only be considered
1540  * acceptable if the challenge matches AND if the sender
1541  * address is at least a plausible address for this peer
1542  * (otherwise we may be seeing a MiM attack).
1543  *
1544  * @param cls closure
1545  * @param name name of the transport that generated the address
1546  * @param peer who responded to our challenge
1547  * @param challenge the challenge number we presumably used
1548  * @param sender_addr string describing our sender address (as observed
1549  *         by the other peer in human-readable format)
1550  */
1551 static void
1552 plugin_env_notify_validation (void *cls,
1553                               const char *name,
1554                               const struct GNUNET_PeerIdentity *peer,
1555                               uint32_t challenge,
1556                               const char *sender_addr)
1557 {
1558   unsigned int not_done;
1559   int matched;
1560   struct ValidationList *pos;
1561   struct ValidationAddress *va;
1562   struct GNUNET_PeerIdentity id;
1563
1564   pos = pending_validations;
1565   while (pos != NULL)
1566     {
1567       GNUNET_CRYPTO_hash (&pos->publicKey,
1568                           sizeof (struct
1569                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1570                           &id.hashPubKey);
1571       if (0 ==
1572           memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity)))
1573         break;
1574       pos = pos->next;
1575     }
1576   if (pos == NULL)
1577     {
1578       /* TODO: call statistics (unmatched PONG) */
1579       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1580                   _
1581                   ("Received validation response but have no record of any validation request for `%4s'. Ignoring.\n"),
1582                   GNUNET_i2s(peer));
1583       return;
1584     }
1585   not_done = 0;
1586   matched = GNUNET_NO;
1587   va = pos->addresses;
1588   while (va != NULL)
1589     {
1590       if (va->challenge == challenge)
1591         {
1592 #if DEBUG_TRANSPORT
1593           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1594                       "Confirmed validity of address, peer `%4s' has address `%s'.\n",
1595                       GNUNET_i2s (peer),
1596                       GNUNET_a2s ((const struct sockaddr*) &va[1], 
1597                                   va->addr_len));
1598 #endif
1599           GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
1600                       _("Another peer saw us using the address `%s' via `%s'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"),
1601                       sender_addr, 
1602                       name);
1603           va->ok = GNUNET_YES;
1604           va->expiration =
1605             GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1606           matched = GNUNET_YES;
1607         }        
1608       if (va->ok != GNUNET_YES)
1609         not_done++;
1610       va = va->next;
1611     }
1612   if (GNUNET_NO == matched)
1613     {
1614       /* TODO: call statistics (unmatched PONG) */
1615       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1616                   _
1617                   ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"),
1618                   "PONG", "PING");
1619     }
1620   if (0 == not_done)
1621     {
1622 #if DEBUG_TRANSPORT
1623       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1624                   "All addresses validated, will now construct `%s' for `%4s'.\n",
1625                   "HELLO",
1626                   GNUNET_i2s (peer));
1627 #endif
1628       pos->timeout.value = 0;
1629       GNUNET_SCHEDULER_add_with_priority (sched,
1630                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1631                                           &cleanup_validation, NULL);
1632     }
1633   else
1634     {
1635 #if DEBUG_TRANSPORT
1636       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1637                   "Still waiting for %u additional `%s' messages before constructing `%s' for `%4s'.\n",
1638                   not_done,
1639                   "PONG",
1640                   "HELLO",
1641                   GNUNET_i2s (peer));
1642 #endif
1643     }
1644 }
1645
1646
1647 struct CheckHelloValidatedContext
1648 {
1649   /**
1650    * Plugin for which we are validating.
1651    */
1652   struct TransportPlugin *plugin;
1653
1654   /**
1655    * Hello that we are validating.
1656    */
1657   struct GNUNET_HELLO_Message *hello;
1658
1659   /**
1660    * Validation list being build.
1661    */
1662   struct ValidationList *e;
1663
1664 };
1665
1666
1667 /**
1668  * Append the given address to the list of entries
1669  * that need to be validated.
1670  */
1671 static int
1672 run_validation (void *cls,
1673                 const char *tname,
1674                 struct GNUNET_TIME_Absolute expiration,
1675                 const void *addr, size_t addrlen)
1676 {
1677   struct ValidationList *e = cls;
1678   struct TransportPlugin *tp;
1679   struct ValidationAddress *va;
1680   struct GNUNET_PeerIdentity id;
1681
1682   tp = find_transport (tname);
1683   if (tp == NULL)
1684     {
1685       GNUNET_log (GNUNET_ERROR_TYPE_INFO |
1686                   GNUNET_ERROR_TYPE_BULK,
1687                   _
1688                   ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
1689                   tname);
1690       return GNUNET_OK;
1691     }
1692   GNUNET_CRYPTO_hash (&e->publicKey,
1693                       sizeof (struct
1694                               GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1695                       &id.hashPubKey);
1696   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697               "Scheduling validation of address `%s' via `%s' for `%4s'\n",
1698               GNUNET_a2s(addr, addrlen),
1699               tname,
1700               GNUNET_i2s(&id));
1701
1702   va = GNUNET_malloc (sizeof (struct ValidationAddress) + addrlen);
1703   va->next = e->addresses;
1704   e->addresses = va;
1705   va->transport_name = GNUNET_strdup (tname);
1706   va->addr_len = addrlen;
1707   va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1708                                             (unsigned int) -1);
1709   memcpy (&va[1], addr, addrlen);
1710   return GNUNET_OK;
1711 }
1712
1713
1714 /**
1715  * Check if addresses in validated hello "h" overlap with
1716  * those in "chvc->hello" and update "chvc->hello" accordingly,
1717  * removing those addresses that have already been validated.
1718  */
1719 static void
1720 check_hello_validated (void *cls,
1721                        const struct GNUNET_PeerIdentity *peer,
1722                        const struct GNUNET_HELLO_Message *h, 
1723                        uint32_t trust)
1724 {
1725   struct CheckHelloValidatedContext *chvc = cls;
1726   struct ValidationAddress *va;
1727   struct TransportPlugin *tp;
1728   int first_call;
1729   struct GNUNET_PeerIdentity apeer;
1730
1731   first_call = GNUNET_NO;
1732   if (chvc->e == NULL)
1733     {
1734       first_call = GNUNET_YES;
1735       chvc->e = GNUNET_malloc (sizeof (struct ValidationList));
1736       GNUNET_assert (GNUNET_OK ==
1737                      GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello,
1738                                            &chvc->e->publicKey));
1739       chvc->e->timeout =
1740         GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT);
1741       chvc->e->next = pending_validations;
1742       pending_validations = chvc->e;
1743     }
1744   if (h != NULL)
1745     {
1746       GNUNET_HELLO_iterate_new_addresses (chvc->hello,
1747                                           h,
1748                                           GNUNET_TIME_absolute_get (),
1749                                           &run_validation, chvc->e);
1750     }
1751   else if (GNUNET_YES == first_call)
1752     {
1753       /* no existing HELLO, all addresses are new */
1754       GNUNET_HELLO_iterate_addresses (chvc->hello,
1755                                       GNUNET_NO, &run_validation, chvc->e);
1756     }
1757   if (h != NULL)
1758     return;                     /* wait for next call */
1759   /* finally, transmit validation attempts */
1760   GNUNET_assert (GNUNET_OK ==
1761                  GNUNET_HELLO_get_id (chvc->hello,
1762                                       &apeer));
1763 #if DEBUG_TRANSPORT
1764   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765               "Ready to validate addresses from `%s' message for peer `%4s'\n",
1766               "HELLO", GNUNET_i2s (&apeer));
1767 #endif
1768   va = chvc->e->addresses;
1769   while (va != NULL)
1770     {
1771 #if DEBUG_TRANSPORT
1772       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1773                   "Establishing `%s' connection to validate `%s' address `%s' of `%4s'\n",
1774                   va->transport_name,
1775                   "HELLO",
1776                   GNUNET_a2s ((const struct sockaddr*) &va[1], 
1777                               va->addr_len),
1778                   GNUNET_i2s (&apeer));
1779 #endif
1780       tp = find_transport (va->transport_name);
1781       GNUNET_assert (tp != NULL);
1782       if (GNUNET_OK !=
1783           tp->api->validate (tp->api->cls,
1784                              &apeer,
1785                              va->challenge,
1786                              HELLO_VERIFICATION_TIMEOUT,
1787                              &va[1],
1788                              va->addr_len))
1789         va->ok = GNUNET_SYSERR;
1790       va = va->next;
1791     }
1792   GNUNET_SCHEDULER_add_delayed (sched,
1793                                 GNUNET_TIME_absolute_get_remaining (chvc->e->timeout), 
1794                                 &cleanup_validation, NULL);
1795   GNUNET_free (chvc);
1796 }
1797
1798
1799 /**
1800  * Process HELLO-message.
1801  *
1802  * @param plugin transport involved, may be NULL
1803  * @param message the actual message
1804  * @return GNUNET_OK if the HELLO was well-formed, GNUNET_SYSERR otherwise
1805  */
1806 static int
1807 process_hello (struct TransportPlugin *plugin,
1808                const struct GNUNET_MessageHeader *message)
1809 {
1810   struct ValidationList *e;
1811   uint16_t hsize;
1812   struct GNUNET_PeerIdentity target;
1813   const struct GNUNET_HELLO_Message *hello;
1814   struct CheckHelloValidatedContext *chvc;
1815   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
1816
1817   hsize = ntohs (message->size);
1818   if ((ntohs (message->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1819       (hsize < sizeof (struct GNUNET_MessageHeader)))
1820     {
1821       GNUNET_break (0);
1822       return GNUNET_SYSERR;
1823     }
1824   /* first, check if load is too high */
1825   if (GNUNET_OS_load_cpu_get (cfg) > 100)
1826     {
1827       /* TODO: call to stats? */
1828       return GNUNET_OK;
1829     }
1830   hello = (const struct GNUNET_HELLO_Message *) message;
1831   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, &publicKey))
1832     {
1833       GNUNET_break_op (0);
1834       return GNUNET_SYSERR;
1835     }
1836   GNUNET_CRYPTO_hash (&publicKey,
1837                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1838                       &target.hashPubKey);
1839 #if DEBUG_TRANSPORT
1840   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1841               "Processing `%s' message for `%4s'\n",
1842               "HELLO", GNUNET_i2s (&target));
1843 #endif
1844   /* check if a HELLO for this peer is already on the validation list */
1845   e = pending_validations;
1846   while (e != NULL)
1847     {
1848       if (0 == memcmp (&e->publicKey,
1849                        &publicKey,
1850                        sizeof (struct
1851                                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)))
1852         {
1853           /* TODO: call to stats? */
1854 #if DEBUG_TRANSPORT
1855           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1856                       "`%s' message for peer `%4s' is already pending; ignoring new message\n",
1857                       "HELLO", GNUNET_i2s (&target));
1858 #endif    
1859           return GNUNET_OK;
1860         }
1861       e = e->next;
1862     }
1863   chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize);
1864   chvc->plugin = plugin;
1865   chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1];
1866   memcpy (chvc->hello, hello, hsize);
1867   /* finally, check if HELLO was previously validated
1868      (continuation will then schedule actual validation) */
1869   GNUNET_PEERINFO_for_all (cfg,
1870                            sched,
1871                            &target,
1872                            0,
1873                            HELLO_VERIFICATION_TIMEOUT,
1874                            &check_hello_validated, chvc);
1875   return GNUNET_OK;
1876 }
1877
1878
1879 /**
1880  * The peer specified by the given neighbour has timed-out or a plugin
1881  * has disconnected.  We may either need to do nothing (other plugins
1882  * still up), or trigger a full disconnect and clean up.  This
1883  * function updates our state and do the necessary notifications.
1884  * Also notifies our clients that the neighbour is now officially
1885  * gone.
1886  *
1887  * @param n the neighbour list entry for the peer
1888  * @param check should we just check if all plugins
1889  *        disconnected or must we ask all plugins to
1890  *        disconnect?
1891  */
1892 static void
1893 disconnect_neighbour (struct NeighbourList *n,
1894                       int check)
1895 {
1896   struct ReadyList *rpos;
1897   struct NeighbourList *npos;
1898   struct NeighbourList *nprev;
1899   struct MessageQueue *mq;
1900   
1901   if (GNUNET_YES == check)
1902     {
1903       rpos = n->plugins;
1904       while (NULL != rpos)
1905         {
1906           if (GNUNET_YES == rpos->connected)
1907             return; /* still connected */
1908           rpos = rpos->next;
1909         }
1910     }
1911
1912 #if DEBUG_TRANSPORT
1913   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1914               "Disconnecting from `%4s'\n",
1915               GNUNET_i2s(&n->id));
1916 #endif
1917   /* remove n from neighbours list */
1918   nprev = NULL;
1919   npos = neighbours;
1920   while ((npos != NULL) && (npos != n))
1921     {
1922       nprev = npos;
1923       npos = npos->next;
1924     }
1925   GNUNET_assert (npos != NULL);
1926   if (nprev == NULL)
1927     neighbours = n->next;
1928   else
1929     nprev->next = n->next;
1930
1931   /* notify all clients about disconnect */
1932   notify_clients_disconnect (&n->id);
1933
1934   /* clean up all plugins, cancel connections and pending transmissions */
1935   while (NULL != (rpos = n->plugins))
1936     {
1937       n->plugins = rpos->next;
1938       GNUNET_assert (rpos->neighbour == n);
1939       if (GNUNET_YES == rpos->connected)
1940         rpos->plugin->api->cancel (rpos->plugin->api->cls,
1941                                    rpos->plugin_handle,
1942                                    rpos,
1943                                    &n->id);
1944       GNUNET_free (rpos);
1945     }
1946
1947   /* free all messages on the queue */
1948   while (NULL != (mq = n->messages))
1949     {
1950       n->messages = mq->next;
1951       GNUNET_assert (mq->neighbour == n);
1952       GNUNET_free (mq);
1953     }
1954   if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1955     GNUNET_SCHEDULER_cancel (sched,
1956                              n->timeout_task);
1957   /* finally, free n itself */
1958   GNUNET_free (n);
1959 }
1960
1961
1962 /**
1963  * Add an entry for each of our transport plugins
1964  * (that are able to send) to the list of plugins
1965  * for this neighbour.
1966  *
1967  * @param neighbour to initialize
1968  */
1969 static void
1970 add_plugins (struct NeighbourList *neighbour)
1971 {
1972   struct TransportPlugin *tp;
1973   struct ReadyList *rl;
1974
1975   neighbour->retry_plugins_time
1976     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
1977   tp = plugins;
1978   while (tp != NULL)
1979     {
1980       if (tp->api->send != NULL)
1981         {
1982           rl = GNUNET_malloc (sizeof (struct ReadyList));
1983           rl->next = neighbour->plugins;
1984           neighbour->plugins = rl;
1985           rl->plugin = tp;
1986           rl->neighbour = neighbour;
1987           rl->transmit_ready = GNUNET_YES;
1988         }
1989       tp = tp->next;
1990     }
1991 }
1992
1993
1994 static void
1995 neighbour_timeout_task (void *cls,
1996                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1997 {
1998   struct NeighbourList *n = cls;
1999
2000 #if DEBUG_TRANSPORT
2001   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2002               "Neighbour `%4s' has timed out!\n",
2003               GNUNET_i2s(&n->id));
2004 #endif
2005   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
2006   disconnect_neighbour (n, GNUNET_NO);
2007 }
2008
2009
2010 /**
2011  * Create a fresh entry in our neighbour list for the given peer.
2012  * Will try to transmit our current HELLO to the new neighbour.  Also
2013  * notifies our clients about the new "connection".
2014  *
2015  * @param peer the peer for which we create the entry
2016  * @return the new neighbour list entry
2017  */
2018 static struct NeighbourList *
2019 setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
2020 {
2021   struct NeighbourList *n;
2022
2023 #if DEBUG_TRANSPORT
2024   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2025               "Setting up new neighbour `%4s', sending our HELLO to introduce ourselves\n",
2026               GNUNET_i2s (peer));
2027 #endif
2028   GNUNET_assert (our_hello != NULL);
2029   n = GNUNET_malloc (sizeof (struct NeighbourList));
2030   n->next = neighbours;
2031   neighbours = n;
2032   n->id = *peer;
2033   n->last_quota_update = GNUNET_TIME_absolute_get ();
2034   n->peer_timeout =
2035     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2036   n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
2037   add_plugins (n);
2038   n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
2039                                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2040                                                   &neighbour_timeout_task, n);
2041   transmit_to_peer (NULL, 0,
2042                     (const struct GNUNET_MessageHeader *) our_hello,
2043                     GNUNET_YES, n);
2044   notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
2045   return n;
2046 }
2047
2048
2049 /**
2050  * Function called by the plugin for each received message.
2051  * Update data volumes, possibly notify plugins about
2052  * reducing the rate at which they read from the socket
2053  * and generally forward to our receive callback.
2054  *
2055  * @param cls the "struct TransportPlugin *" we gave to the plugin
2056  * @param plugin_context value to pass to this plugin
2057  *        to respond to the given peer (use is optional,
2058  *        but may speed up processing)
2059  * @param service_context value passed to the transport-service
2060  *        to identify the neighbour; will be NULL on the first
2061  *        call for a given peer
2062  * @param latency estimated latency for communicating with the
2063  *             given peer
2064  * @param peer (claimed) identity of the other peer
2065  * @param message the message, NULL if peer was disconnected
2066  * @return the new service_context that the plugin should use
2067  *         for future receive calls for messages from this
2068  *         particular peer
2069  */
2070 static struct ReadyList *
2071 plugin_env_receive (void *cls,
2072                     void *plugin_context,
2073                     struct ReadyList *service_context,
2074                     struct GNUNET_TIME_Relative latency,
2075                     const struct GNUNET_PeerIdentity *peer,
2076                     const struct GNUNET_MessageHeader *message)
2077 {
2078   const struct GNUNET_MessageHeader ack = {
2079     htons (sizeof (struct GNUNET_MessageHeader)),
2080     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK)
2081   };
2082   struct TransportPlugin *plugin = cls;
2083   struct TransportClient *cpos;
2084   struct InboundMessage *im;
2085   uint16_t msize;
2086   struct NeighbourList *n;
2087
2088   if (service_context != NULL)
2089     {
2090       n = service_context->neighbour;
2091       GNUNET_assert (n != NULL);
2092     }
2093   else
2094     {
2095       n = find_neighbour (peer);
2096       if (n == NULL)
2097         {
2098           if (message == NULL)
2099             return NULL;        /* disconnect of peer already marked down */
2100           n = setup_new_neighbour (peer);
2101         }
2102       service_context = n->plugins;
2103       while ((service_context != NULL) && (plugin != service_context->plugin))
2104         service_context = service_context->next;
2105       GNUNET_assert ((plugin->api->send == NULL) ||
2106                      (service_context != NULL));
2107     }
2108   if (message == NULL)
2109     {
2110 #if DEBUG_TRANSPORT
2111       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2112                   "Receive failed from `%4s', triggering disconnect\n",
2113                   GNUNET_i2s(&n->id));
2114 #endif
2115       /* TODO: call stats */
2116       if ((service_context != NULL) &&
2117           (service_context->plugin_handle == plugin_context))
2118         {
2119           service_context->connected = GNUNET_NO;
2120           service_context->plugin_handle = NULL;
2121         }
2122       disconnect_neighbour (n, GNUNET_YES);
2123       return NULL;
2124     }
2125 #if DEBUG_TRANSPORT
2126   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2127               "Processing message of type `%u' received by plugin...\n",
2128               ntohs (message->type));
2129 #endif
2130   if (service_context != NULL)
2131     {
2132       if (service_context->connected == GNUNET_NO)
2133         {
2134           service_context->connected = GNUNET_YES;
2135           service_context->transmit_ready = GNUNET_YES;
2136           service_context->connect_attempts++;
2137         }
2138       service_context->timeout
2139         = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2140       service_context->plugin_handle = plugin_context;
2141       service_context->latency = latency;
2142     }
2143   /* update traffic received amount ... */
2144   msize = ntohs (message->size);
2145   n->last_received += msize;
2146   GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
2147   n->peer_timeout =
2148     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2149   n->timeout_task =
2150     GNUNET_SCHEDULER_add_delayed (sched, 
2151                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2152                                   &neighbour_timeout_task, n);
2153   update_quota (n);
2154   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2155     {
2156       /* dropping message due to frequent inbound volume violations! */
2157       GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
2158                   GNUNET_ERROR_TYPE_BULK,
2159                   _
2160                   ("Dropping incoming message due to repeated bandwidth quota violations.\n"));
2161       /* TODO: call stats */
2162       GNUNET_assert ( (service_context == NULL) ||
2163                       (NULL != service_context->neighbour) );
2164       return service_context;
2165     }
2166   switch (ntohs (message->type))
2167     {
2168     case GNUNET_MESSAGE_TYPE_HELLO:
2169 #if DEBUG_TRANSPORT
2170       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2171                   "Receiving `%s' message from `%4s'.\n", "HELLO",
2172                   GNUNET_i2s(peer));
2173 #endif
2174       process_hello (plugin, message);
2175 #if DEBUG_TRANSPORT
2176       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2177                   "Sending `%s' message to connecting peer `%4s'.\n", "ACK",
2178                   GNUNET_i2s(peer));
2179 #endif
2180       transmit_to_peer (NULL, 0, &ack, GNUNET_YES, n);
2181       break;
2182     case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
2183       n->saw_ack = GNUNET_YES;
2184       /* intentional fall-through! */
2185     default:
2186 #if DEBUG_TRANSPORT
2187       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2188                   "Received message of type %u from `%4s', sending to all clients.\n",
2189                   ntohs (message->type),
2190                   GNUNET_i2s(peer));
2191 #endif
2192       /* transmit message to all clients */
2193       im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
2194       im->header.size = htons (sizeof (struct InboundMessage) + msize);
2195       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2196       im->latency = GNUNET_TIME_relative_hton (latency);
2197       im->peer = *peer;
2198       memcpy (&im[1], message, msize);
2199
2200       cpos = clients;
2201       while (cpos != NULL)
2202         {
2203           transmit_to_client (cpos, &im->header, GNUNET_YES);
2204           cpos = cpos->next;
2205         }
2206       GNUNET_free (im);
2207     }
2208   GNUNET_assert ( (service_context == NULL) ||
2209                   (NULL != service_context->neighbour) );
2210   return service_context;
2211 }
2212
2213
2214 /**
2215  * Handle START-message.  This is the first message sent to us
2216  * by any client which causes us to add it to our list.
2217  *
2218  * @param cls closure (always NULL)
2219  * @param client identification of the client
2220  * @param message the actual message
2221  */
2222 static void
2223 handle_start (void *cls,
2224               struct GNUNET_SERVER_Client *client,
2225               const struct GNUNET_MessageHeader *message)
2226 {
2227   struct TransportClient *c;
2228   struct ConnectInfoMessage cim;
2229   struct NeighbourList *n;
2230   struct InboundMessage *im;
2231   struct GNUNET_MessageHeader *ack;
2232
2233 #if DEBUG_TRANSPORT
2234   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2235               "Received `%s' request from client\n", "START");
2236 #endif
2237   c = clients;
2238   while (c != NULL)
2239     {
2240       if (c->client == client)
2241         {
2242           /* client already on our list! */
2243           GNUNET_break (0);
2244           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2245           return;
2246         }
2247       c = c->next;
2248     }
2249   c = GNUNET_malloc (sizeof (struct TransportClient));
2250   c->next = clients;
2251   clients = c;
2252   c->client = client;
2253   if (our_hello != NULL)
2254     {
2255 #if DEBUG_TRANSPORT
2256       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2257                   "Sending our own `%s' to new client\n",
2258                   "HELLO");
2259 #endif
2260       transmit_to_client (c,
2261                           (const struct GNUNET_MessageHeader *) our_hello,
2262                           GNUNET_NO);
2263       /* tell new client about all existing connections */
2264       cim.header.size = htons (sizeof (struct ConnectInfoMessage));
2265       cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2266       cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
2267       cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2268       im = GNUNET_malloc (sizeof (struct InboundMessage) +
2269                           sizeof (struct GNUNET_MessageHeader));
2270       im->header.size = htons (sizeof (struct InboundMessage) +
2271                                sizeof (struct GNUNET_MessageHeader));
2272       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2273       im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2274       ack = (struct GNUNET_MessageHeader *) &im[1];
2275       ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2276       ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
2277       for (n = neighbours; n != NULL; n = n->next)
2278         {
2279           cim.id = n->id;
2280           transmit_to_client (c, &cim.header, GNUNET_NO);
2281           if (n->saw_ack)
2282             {
2283               im->peer = n->id;
2284               transmit_to_client (c, &im->header, GNUNET_NO);
2285             }
2286         }
2287       GNUNET_free (im);
2288     }
2289   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2290 }
2291
2292
2293 /**
2294  * Handle HELLO-message.
2295  *
2296  * @param cls closure (always NULL)
2297  * @param client identification of the client
2298  * @param message the actual message
2299  */
2300 static void
2301 handle_hello (void *cls,
2302               struct GNUNET_SERVER_Client *client,
2303               const struct GNUNET_MessageHeader *message)
2304 {
2305   int ret;
2306
2307 #if DEBUG_TRANSPORT
2308   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2309               "Received `%s' request from client\n", "HELLO");
2310 #endif
2311   ret = process_hello (NULL, message);
2312   GNUNET_SERVER_receive_done (client, ret);
2313 }
2314
2315
2316 /**
2317  * Handle SEND-message.
2318  *
2319  * @param cls closure (always NULL)
2320  * @param client identification of the client
2321  * @param message the actual message
2322  */
2323 static void
2324 handle_send (void *cls,
2325              struct GNUNET_SERVER_Client *client,
2326              const struct GNUNET_MessageHeader *message)
2327 {
2328   struct TransportClient *tc;
2329   struct NeighbourList *n;
2330   const struct OutboundMessage *obm;
2331   const struct GNUNET_MessageHeader *obmm;
2332   uint16_t size;
2333   uint16_t msize;
2334
2335   size = ntohs (message->size);
2336   if (size <
2337       sizeof (struct OutboundMessage) + sizeof (struct GNUNET_MessageHeader))
2338     {
2339       GNUNET_break (0);
2340       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2341       return;
2342     }
2343   obm = (const struct OutboundMessage *) message;
2344 #if DEBUG_TRANSPORT
2345   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2346               "Received `%s' request from client with target `%4s'\n",
2347               "SEND", GNUNET_i2s (&obm->peer));
2348 #endif
2349   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2350   msize = ntohs (obmm->size);
2351   if (size != msize + sizeof (struct OutboundMessage))
2352     {
2353       GNUNET_break (0);
2354       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2355       return;
2356     }
2357   n = find_neighbour (&obm->peer);
2358   if (n == NULL)
2359     n = setup_new_neighbour (&obm->peer);
2360   tc = clients;
2361   while ((tc != NULL) && (tc->client != client))
2362     tc = tc->next;
2363
2364 #if DEBUG_TRANSPORT
2365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2366               "Client asked to transmit %u-byte message of type %u to `%4s'\n",
2367               ntohs (obmm->size),
2368               ntohs (obmm->type), GNUNET_i2s (&obm->peer));
2369 #endif
2370   transmit_to_peer (tc, ntohl(obm->priority), obmm, GNUNET_NO, n);
2371   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2372 }
2373
2374
2375 /**
2376  * Handle SET_QUOTA-message.
2377  *
2378  * @param cls closure (always NULL)
2379  * @param client identification of the client
2380  * @param message the actual message
2381  */
2382 static void
2383 handle_set_quota (void *cls,
2384                   struct GNUNET_SERVER_Client *client,
2385                   const struct GNUNET_MessageHeader *message)
2386 {
2387   const struct QuotaSetMessage *qsm =
2388     (const struct QuotaSetMessage *) message;
2389   struct NeighbourList *n;
2390   struct TransportPlugin *p;
2391   struct ReadyList *rl;
2392
2393 #if DEBUG_TRANSPORT
2394   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2395               "Received `%s' request from client for peer `%4s'\n",
2396               "SET_QUOTA", GNUNET_i2s (&qsm->peer));
2397 #endif
2398   n = find_neighbour (&qsm->peer);
2399   if (n == NULL)
2400     {
2401       GNUNET_SERVER_receive_done (client, GNUNET_OK);
2402       return;
2403     }
2404   update_quota (n);
2405   if (n->quota_in < ntohl (qsm->quota_in))
2406     n->last_quota_update = GNUNET_TIME_absolute_get ();
2407   n->quota_in = ntohl (qsm->quota_in);
2408   rl = n->plugins;
2409   while (rl != NULL)
2410     {
2411       p = rl->plugin;
2412       p->api->set_receive_quota (p->api->cls,
2413                                  &qsm->peer, ntohl (qsm->quota_in));
2414       rl = rl->next;
2415     }
2416   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2417 }
2418
2419
2420 /**
2421  * Handle TRY_CONNECT-message.
2422  *
2423  * @param cls closure (always NULL)
2424  * @param client identification of the client
2425  * @param message the actual message
2426  */
2427 static void
2428 handle_try_connect (void *cls,
2429                     struct GNUNET_SERVER_Client *client,
2430                     const struct GNUNET_MessageHeader *message)
2431 {
2432   const struct TryConnectMessage *tcm;
2433
2434   tcm = (const struct TryConnectMessage *) message;
2435 #if DEBUG_TRANSPORT
2436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2437               "Received `%s' request from client %p asking to connect to `%4s'\n",
2438               "TRY_CONNECT",
2439               client,
2440               GNUNET_i2s (&tcm->peer));
2441 #endif
2442   if (NULL == find_neighbour (&tcm->peer))
2443     setup_new_neighbour (&tcm->peer);
2444 #if DEBUG_TRANSPORT
2445   else
2446     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2447                 "Client asked to connect to `%4s', but connection already exists\n",
2448                 "TRY_CONNECT", 
2449                 GNUNET_i2s (&tcm->peer));
2450 #endif    
2451   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2452 }
2453
2454
2455 /**
2456  * List of handlers for the messages understood by this
2457  * service.
2458  */
2459 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2460   {&handle_start, NULL,
2461    GNUNET_MESSAGE_TYPE_TRANSPORT_START, 0},
2462   {&handle_hello, NULL,
2463    GNUNET_MESSAGE_TYPE_HELLO, 0},
2464   {&handle_send, NULL,
2465    GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
2466   {&handle_set_quota, NULL,
2467    GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
2468   {&handle_try_connect, NULL,
2469    GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
2470    sizeof (struct TryConnectMessage)},
2471   {NULL, NULL, 0, 0}
2472 };
2473
2474
2475 /**
2476  * Setup the environment for this plugin.
2477  */
2478 static void
2479 create_environment (struct TransportPlugin *plug)
2480 {
2481   plug->env.cfg = cfg;
2482   plug->env.sched = sched;
2483   plug->env.my_public_key = &my_public_key;
2484   plug->env.my_private_key = my_private_key;
2485   plug->env.my_identity = &my_identity;
2486   plug->env.cls = plug;
2487   plug->env.receive = &plugin_env_receive;
2488   plug->env.lookup = &plugin_env_lookup_address;
2489   plug->env.notify_address = &plugin_env_notify_address;
2490   plug->env.notify_validation = &plugin_env_notify_validation;
2491   plug->env.default_quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
2492   plug->env.max_connections = max_connect_per_transport;
2493 }
2494
2495
2496 /**
2497  * Start the specified transport (load the plugin).
2498  */
2499 static void
2500 start_transport (struct GNUNET_SERVER_Handle *server, const char *name)
2501 {
2502   struct TransportPlugin *plug;
2503   char *libname;
2504
2505   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2506               _("Loading `%s' transport plugin\n"), name);
2507   GNUNET_asprintf (&libname, "libgnunet_plugin_transport_%s", name);
2508   plug = GNUNET_malloc (sizeof (struct TransportPlugin));
2509   create_environment (plug);
2510   plug->short_name = GNUNET_strdup (name);
2511   plug->lib_name = libname;
2512   plug->next = plugins;
2513   plugins = plug;
2514   plug->api = GNUNET_PLUGIN_load (libname, &plug->env);
2515   if (plug->api == NULL)
2516     {
2517       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2518                   _("Failed to load transport plugin for `%s'\n"), name);
2519       GNUNET_free (plug->short_name);
2520       plugins = plug->next;
2521       GNUNET_free (libname);
2522       GNUNET_free (plug);
2523     }
2524 }
2525
2526
2527 /**
2528  * Called whenever a client is disconnected.  Frees our
2529  * resources associated with that client.
2530  *
2531  * @param cls closure
2532  * @param client identification of the client
2533  */
2534 static void
2535 client_disconnect_notification (void *cls,
2536                                 struct GNUNET_SERVER_Client *client)
2537 {
2538   struct TransportClient *pos;
2539   struct TransportClient *prev;
2540   struct ClientMessageQueueEntry *mqe;
2541
2542 #if DEBUG_TRANSPORT
2543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2544               "Client disconnected, cleaning up.\n");
2545 #endif
2546   prev = NULL;
2547   pos = clients;
2548   while ((pos != NULL) && (pos->client != client))
2549     {
2550       prev = pos;
2551       pos = pos->next;
2552     }
2553   if (pos == NULL)
2554     return;
2555   while (NULL != (mqe = pos->message_queue_head))
2556     {
2557       pos->message_queue_head = mqe->next;
2558       GNUNET_free (mqe);
2559     }
2560   pos->message_queue_head = NULL;
2561   if (prev == NULL)
2562     clients = pos->next;
2563   else
2564     prev->next = pos->next;
2565   if (GNUNET_YES == pos->tcs_pending)
2566     {
2567       pos->client = NULL;
2568       return;
2569     }
2570   GNUNET_free (pos);
2571 }
2572
2573
2574 /**
2575  * Function called when the service shuts down.  Unloads our plugins.
2576  *
2577  * @param cls closure, unused
2578  * @param tc task context (unused)
2579  */
2580 static void
2581 unload_plugins (void *cls, 
2582                 const struct GNUNET_SCHEDULER_TaskContext *tc)
2583 {
2584   struct TransportPlugin *plug;
2585   struct AddressList *al;
2586
2587 #if DEBUG_TRANSPORT
2588   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2589               "Transport service is unloading plugins...\n");
2590 #endif
2591   while (NULL != (plug = plugins))
2592     {
2593       plugins = plug->next;
2594       GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
2595       GNUNET_free (plug->lib_name);
2596       GNUNET_free (plug->short_name);
2597       while (NULL != (al = plug->addresses))
2598         {
2599           plug->addresses = al->next;
2600           GNUNET_free (al);
2601         }
2602       GNUNET_free (plug);
2603     }
2604   if (my_private_key != NULL)
2605     GNUNET_CRYPTO_rsa_key_free (my_private_key);
2606   GNUNET_free_non_null (our_hello);
2607 }
2608
2609
2610 /**
2611  * Initiate transport service.
2612  *
2613  * @param cls closure
2614  * @param s scheduler to use
2615  * @param serv the initialized server
2616  * @param c configuration to use
2617  */
2618 static void
2619 run (void *cls,
2620      struct GNUNET_SCHEDULER_Handle *s,
2621      struct GNUNET_SERVER_Handle *serv, 
2622      const struct GNUNET_CONFIGURATION_Handle *c)
2623 {
2624   char *plugs;
2625   char *pos;
2626   int no_transports;
2627   unsigned long long tneigh;
2628   char *keyfile;
2629
2630   sched = s;
2631   cfg = c;
2632   /* parse configuration */
2633   if ((GNUNET_OK !=
2634        GNUNET_CONFIGURATION_get_value_number (c,
2635                                               "TRANSPORT",
2636                                               "NEIGHBOUR_LIMIT",
2637                                               &tneigh)) ||
2638       (GNUNET_OK !=
2639        GNUNET_CONFIGURATION_get_value_filename (c,
2640                                                 "GNUNETD",
2641                                                 "HOSTKEY", &keyfile)))
2642     {
2643       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2644                   _
2645                   ("Transport service is lacking key configuration settings.  Exiting.\n"));
2646       GNUNET_SCHEDULER_shutdown (s);
2647       return;
2648     }
2649   max_connect_per_transport = (uint32_t) tneigh;
2650   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
2651   GNUNET_free (keyfile);
2652   if (my_private_key == NULL)
2653     {
2654       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2655                   _
2656                   ("Transport service could not access hostkey.  Exiting.\n"));
2657       GNUNET_SCHEDULER_shutdown (s);
2658       return;
2659     }
2660   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
2661   GNUNET_CRYPTO_hash (&my_public_key,
2662                       sizeof (my_public_key), &my_identity.hashPubKey);
2663   /* setup notification */
2664   server = serv;
2665   GNUNET_SERVER_disconnect_notify (server,
2666                                    &client_disconnect_notification, NULL);
2667   /* load plugins... */
2668   no_transports = 1;
2669   if (GNUNET_OK ==
2670       GNUNET_CONFIGURATION_get_value_string (c,
2671                                              "TRANSPORT", "PLUGINS", &plugs))
2672     {
2673       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2674                   _("Starting transport plugins `%s'\n"), plugs);
2675       pos = strtok (plugs, " ");
2676       while (pos != NULL)
2677         {
2678           start_transport (server, pos);
2679           no_transports = 0;
2680           pos = strtok (NULL, " ");
2681         }
2682       GNUNET_free (plugs);
2683     }
2684   GNUNET_SCHEDULER_add_delayed (sched,
2685                                 GNUNET_TIME_UNIT_FOREVER_REL,
2686                                 &unload_plugins, NULL);
2687   if (no_transports)
2688     refresh_hello ();
2689 #if DEBUG_TRANSPORT
2690   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2691               _("Transport service ready.\n"));
2692 #endif
2693   /* process client requests */
2694   GNUNET_SERVER_add_handlers (server, handlers);
2695 }
2696
2697
2698 /**
2699  * The main function for the transport service.
2700  *
2701  * @param argc number of arguments from the command line
2702  * @param argv command line arguments
2703  * @return 0 ok, 1 on error
2704  */
2705 int
2706 main (int argc, char *const *argv)
2707 {
2708   return (GNUNET_OK ==
2709           GNUNET_SERVICE_run (argc,
2710                               argv,
2711                               "transport",
2712                               &run, NULL)) ? 0 : 1;
2713 }
2714
2715 /* end of gnunet-service-transport.c */