ad4aec8ca255fe32f750a15cf0f2a8d544981c03
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport.c
23  * @brief low-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - if we do not receive an ACK in response to our
28  *   HELLO, retransmit HELLO!
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_getopt_lib.h"
34 #include "gnunet_hello_lib.h"
35 #include "gnunet_os_lib.h"
36 #include "gnunet_peerinfo_service.h"
37 #include "gnunet_plugin_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_service_lib.h"
40 #include "gnunet_signatures.h"
41 #include "plugin_transport.h"
42 #include "transport.h"
43
44 /**
45  * How many messages can we have pending for a given client process
46  * before we start to drop incoming messages?  We typically should
47  * have only one client and so this would be the primary buffer for
48  * messages, so the number should be chosen rather generously.
49  *
50  * The expectation here is that most of the time the queue is large
51  * enough so that a drop is virtually never required.
52  */
53 #define MAX_PENDING 128
54
55 /**
56  * How often should we try to reconnect to a peer using a particular
57  * transport plugin before giving up?  Note that the plugin may be
58  * added back to the list after PLUGIN_RETRY_FREQUENCY expires.
59  */
60 #define MAX_CONNECT_RETRY 3
61
62 /**
63  * How often must a peer violate bandwidth quotas before we start
64  * to simply drop its messages?
65  */
66 #define QUOTA_VIOLATION_DROP_THRESHOLD 100
67
68 /**
69  * How long until a HELLO verification attempt should time out?
70  */
71 #define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_UNIT_MINUTES
72
73 /**
74  * How often do we re-add (cheaper) plugins to our list of plugins
75  * to try for a given connected peer?
76  */
77 #define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
78
79 /**
80  * After how long do we expire an address in a HELLO
81  * that we just validated?  This value is also used
82  * for our own addresses when we create a HELLO.
83  */
84 #define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12)
85
86
87 /**
88  * Entry in linked list of network addresses.
89  */
90 struct AddressList
91 {
92   /**
93    * This is a linked list.
94    */
95   struct AddressList *next;
96
97   /**
98    * The address, actually a pointer to the end
99    * of this struct.  Do not free!
100    */
101   void *addr;
102
103   /**
104    * How long until we auto-expire this address (unless it is
105    * re-confirmed by the transport)?
106    */
107   struct GNUNET_TIME_Absolute expires;
108
109   /**
110    * Length of addr.
111    */
112   size_t addrlen;
113
114 };
115
116
117 /**
118  * Entry in linked list of all of our plugins.
119  */
120 struct TransportPlugin
121 {
122
123   /**
124    * This is a linked list.
125    */
126   struct TransportPlugin *next;
127
128   /**
129    * API of the transport as returned by the plugin's
130    * initialization function.
131    */
132   struct GNUNET_TRANSPORT_PluginFunctions *api;
133
134   /**
135    * Short name for the plugin (i.e. "tcp").
136    */
137   char *short_name;
138
139   /**
140    * Name of the library (i.e. "gnunet_plugin_transport_tcp").
141    */
142   char *lib_name;
143
144   /**
145    * List of our known addresses for this transport.
146    */
147   struct AddressList *addresses;
148
149   /**
150    * Environment this transport service is using
151    * for this plugin.
152    */
153   struct GNUNET_TRANSPORT_PluginEnvironment env;
154
155   /**
156    * ID of task that is used to clean up expired addresses.
157    */
158   GNUNET_SCHEDULER_TaskIdentifier address_update_task;
159
160
161   /**
162    * Set to GNUNET_YES if we need to scrap the existing
163    * list of "addresses" and start fresh when we receive
164    * the next address update from a transport.  Set to
165    * GNUNET_NO if we should just add the new address
166    * to the list and wait for the commit call.
167    */
168   int rebuild;
169 };
170
171 struct NeighbourList;
172
173 /**
174  * For each neighbour we keep a list of messages
175  * that we still want to transmit to the neighbour.
176  */
177 struct MessageQueue
178 {
179
180   /**
181    * This is a linked list.
182    */
183   struct MessageQueue *next;
184
185   /**
186    * The message we want to transmit.
187    */
188   struct GNUNET_MessageHeader *message;
189
190   /**
191    * Client responsible for queueing the message;
192    * used to check that a client has not two messages
193    * pending for the same target.  Can be NULL.
194    */
195   struct TransportClient *client;
196
197   /**
198    * Neighbour this entry belongs to.
199    */
200   struct NeighbourList *neighbour;
201
202   /**
203    * Plugin that we used for the transmission.
204    * NULL until we scheduled a transmission.
205    */
206   struct TransportPlugin *plugin;
207
208   /**
209    * Internal message of the transport system that should not be
210    * included in the usual SEND-SEND_OK transmission confirmation
211    * traffic management scheme.  Typically, "internal_msg" will
212    * be set whenever "client" is NULL (but it is not strictly
213    * required).
214    */
215   int internal_msg;
216
217   /**
218    * How important is the message?
219    */
220   unsigned int priority;
221   
222 };
223
224
225 /**
226  * For a given Neighbour, which plugins are available
227  * to talk to this peer and what are their costs?
228  */
229 struct ReadyList
230 {
231
232   /**
233    * This is a linked list.
234    */
235   struct ReadyList *next;
236
237   /**
238    * Which of our transport plugins does this entry
239    * represent?
240    */
241   struct TransportPlugin *plugin;
242
243   /**
244    * Neighbour this entry belongs to.
245    */
246   struct NeighbourList *neighbour;
247
248   /**
249    * Opaque handle (specific to the plugin) for the
250    * connection to our target; can be NULL.
251    */
252   void *plugin_handle;
253
254   /**
255    * What was the last latency observed for this plugin
256    * and peer?  Invalid if connected is GNUNET_NO.
257    */
258   struct GNUNET_TIME_Relative latency;
259
260   /**
261    * If we did not successfully transmit a message to the
262    * given peer via this connection during the specified
263    * time, we should consider the connection to be dead.
264    * This is used in the case that a TCP transport simply
265    * stalls writing to the stream but does not formerly
266    * get a signal that the other peer died.
267    */
268   struct GNUNET_TIME_Absolute timeout;
269
270   /**
271    * Is this plugin currently connected?  The first time
272    * we transmit or send data to a peer via a particular
273    * plugin, we set this to GNUNET_YES.  If we later get
274    * an error (disconnect notification or transmission
275    * failure), we set it back to GNUNET_NO.  Each time the
276    * value is set to GNUNET_YES, we increment the
277    * "connect_attempts" counter.  If that one reaches a
278    * particular threshold, we consider the plugin to not
279    * be working properly at this time for the given peer
280    * and remove it from the eligible list.
281    */
282   int connected;
283
284   /**
285    * How often have we tried to connect using this plugin?
286    */
287   unsigned int connect_attempts;
288
289   /**
290    * Is this plugin ready to transmit to the specific
291    * target?  GNUNET_NO if not.  Initially, all plugins
292    * are marked ready.  If a transmission is in progress,
293    * "transmit_ready" is set to GNUNET_NO.
294    */
295   int transmit_ready;
296
297 };
298
299
300 /**
301  * Entry in linked list of all of our current neighbours.
302  */
303 struct NeighbourList
304 {
305
306   /**
307    * This is a linked list.
308    */
309   struct NeighbourList *next;
310
311   /**
312    * Which of our transports is connected to this peer
313    * and what is their status?
314    */
315   struct ReadyList *plugins;
316
317   /**
318    * List of messages we would like to send to this peer;
319    * must contain at most one message per client.
320    */
321   struct MessageQueue *messages;
322
323   /**
324    * Identity of this neighbour.
325    */
326   struct GNUNET_PeerIdentity id;
327
328   /**
329    * ID of task scheduled to run when this peer is about to
330    * time out (will free resources associated with the peer).
331    */
332   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
333
334   /**
335    * How long until we should consider this peer dead
336    * (if we don't receive another message in the
337    * meantime)?
338    */
339   struct GNUNET_TIME_Absolute peer_timeout;
340
341   /**
342    * At what time did we reset last_received last?
343    */
344   struct GNUNET_TIME_Absolute last_quota_update;
345
346   /**
347    * At what time should we try to again add plugins to
348    * our ready list?
349    */
350   struct GNUNET_TIME_Absolute retry_plugins_time;
351
352   /**
353    * How many bytes have we received since the "last_quota_update"
354    * timestamp?
355    */
356   uint64_t last_received;
357
358   /**
359    * Global quota for inbound traffic for the neighbour in bytes/ms.
360    */
361   uint32_t quota_in;
362
363   /**
364    * How often has the other peer (recently) violated the
365    * inbound traffic limit?  Incremented by 10 per violation,
366    * decremented by 1 per non-violation (for each
367    * time interval).
368    */
369   unsigned int quota_violation_count;
370
371   /**
372    * Have we seen an ACK from this neighbour in the past?
373    * (used to make up a fake ACK for clients connecting after
374    * the neighbour connected to us).
375    */
376   int saw_ack;
377
378 };
379
380
381 /**
382  * Linked list of messages to be transmitted to
383  * the client.  Each entry is followed by the
384  * actual message.
385  */
386 struct ClientMessageQueueEntry
387 {
388   /**
389    * This is a linked list.
390    */
391   struct ClientMessageQueueEntry *next;
392 };
393
394
395 /**
396  * Client connected to the transport service.
397  */
398 struct TransportClient
399 {
400
401   /**
402    * This is a linked list.
403    */
404   struct TransportClient *next;
405
406   /**
407    * Handle to the client.
408    */
409   struct GNUNET_SERVER_Client *client;
410
411   /**
412    * Linked list of messages yet to be transmitted to
413    * the client.
414    */
415   struct ClientMessageQueueEntry *message_queue_head;
416
417   /**
418    * Tail of linked list of messages yet to be transmitted to the
419    * client.
420    */
421   struct ClientMessageQueueEntry *message_queue_tail;
422
423   /**
424    * Is a call to "transmit_send_continuation" pending?  If so, we
425    * must not free this struct (even if the corresponding client
426    * disconnects) and instead only remove it from the linked list and
427    * set the "client" field to NULL.
428    */
429   int tcs_pending;
430
431   /**
432    * Length of the list of messages pending for this client.
433    */
434   unsigned int message_count;
435
436 };
437
438
439 /**
440  * For each HELLO, we may have to validate multiple addresses;
441  * each address gets its own request entry.
442  */
443 struct ValidationAddress
444 {
445   /**
446    * This is a linked list.
447    */
448   struct ValidationAddress *next;
449
450   /**
451    * Name of the transport.
452    */
453   char *transport_name;
454
455   /**
456    * When should this validated address expire?
457    */
458   struct GNUNET_TIME_Absolute expiration;
459
460   /**
461    * Length of the address we are validating.
462    */
463   size_t addr_len;
464
465   /**
466    * Challenge number we used.
467    */
468   uint32_t challenge;
469
470   /**
471    * Set to GNUNET_YES if the challenge was met,
472    * GNUNET_SYSERR if we know it failed, GNUNET_NO
473    * if we are waiting on a response.
474    */
475   int ok;
476 };
477
478
479 /**
480  * Entry in linked list of all HELLOs awaiting validation.
481  */
482 struct ValidationList
483 {
484
485   /**
486    * This is a linked list.
487    */
488   struct ValidationList *next;
489
490   /**
491    * Linked list with one entry per address from the HELLO
492    * that needs to be validated.
493    */
494   struct ValidationAddress *addresses;
495
496   /**
497    * The public key of the peer.
498    */
499   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
500
501   /**
502    * When does this record time-out? (assuming the
503    * challenge goes unanswered)
504    */
505   struct GNUNET_TIME_Absolute timeout;
506
507 };
508
509
510 /**
511  * HELLOs awaiting validation.
512  */
513 static struct ValidationList *pending_validations;
514
515 /**
516  * Our HELLO message.
517  */
518 static struct GNUNET_HELLO_Message *our_hello;
519
520 /**
521  * "version" of "our_hello".  Used to see if a given
522  * neighbour has already been sent the latest version
523  * of our HELLO message.
524  */
525 static unsigned int our_hello_version;
526
527 /**
528  * Our public key.
529  */
530 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
531
532 /**
533  * Our identity.
534  */
535 static struct GNUNET_PeerIdentity my_identity;
536
537 /**
538  * Our private key.
539  */
540 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
541
542 /**
543  * Our scheduler.
544  */
545 struct GNUNET_SCHEDULER_Handle *sched;
546
547 /**
548  * Our configuration.
549  */
550 struct GNUNET_CONFIGURATION_Handle *cfg;
551
552 /**
553  * Linked list of all clients to this service.
554  */
555 static struct TransportClient *clients;
556
557 /**
558  * All loaded plugins.
559  */
560 static struct TransportPlugin *plugins;
561
562 /**
563  * Our server.
564  */
565 static struct GNUNET_SERVER_Handle *server;
566
567 /**
568  * All known neighbours and their HELLOs.
569  */
570 static struct NeighbourList *neighbours;
571
572 /**
573  * Number of neighbours we'd like to have.
574  */
575 static uint32_t max_connect_per_transport;
576
577
578 /**
579  * Find an entry in the neighbour list for a particular peer.
580  *
581  * @return NULL if not found.
582  */
583 static struct NeighbourList *
584 find_neighbour (const struct GNUNET_PeerIdentity *key)
585 {
586   struct NeighbourList *head = neighbours;
587   while ((head != NULL) &&
588          (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
589     head = head->next;
590   return head;
591 }
592
593
594 /**
595  * Find an entry in the transport list for a particular transport.
596  *
597  * @return NULL if not found.
598  */
599 static struct TransportPlugin *
600 find_transport (const char *short_name)
601 {
602   struct TransportPlugin *head = plugins;
603   while ((head != NULL) && (0 != strcmp (short_name, head->short_name)))
604     head = head->next;
605   return head;
606 }
607
608
609 /**
610  * Update the quota values for the given neighbour now.
611  */
612 static void
613 update_quota (struct NeighbourList *n)
614 {
615   struct GNUNET_TIME_Relative delta;
616   uint64_t allowed;
617   uint64_t remaining;
618
619   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
620   if (delta.value < MIN_QUOTA_REFRESH_TIME)
621     return;                     /* not enough time passed for doing quota update */
622   allowed = delta.value * n->quota_in;
623   if (n->last_received < allowed)
624     {
625       remaining = allowed - n->last_received;
626       if (n->quota_in > 0)
627         remaining /= n->quota_in;
628       else
629         remaining = 0;
630       if (remaining > MAX_BANDWIDTH_CARRY)
631         remaining = MAX_BANDWIDTH_CARRY;
632       n->last_received = 0;
633       n->last_quota_update = GNUNET_TIME_absolute_get ();
634       n->last_quota_update.value -= remaining;
635       if (n->quota_violation_count > 0)
636         n->quota_violation_count--;
637     }
638   else
639     {
640       n->last_received -= allowed;
641       n->last_quota_update = GNUNET_TIME_absolute_get ();
642       if (n->last_received > allowed)
643         {
644           /* more than twice the allowed rate! */
645           n->quota_violation_count += 10;
646         }
647     }
648 }
649
650
651 /**
652  * Function called to notify a client about the socket
653  * being ready to queue more data.  "buf" will be
654  * NULL and "size" zero if the socket was closed for
655  * writing in the meantime.
656  *
657  * @param cls closure
658  * @param size number of bytes available in buf
659  * @param buf where the callee should write the message
660  * @return number of bytes written to buf
661  */
662 static size_t
663 transmit_to_client_callback (void *cls, size_t size, void *buf)
664 {
665   struct TransportClient *client = cls;
666   struct ClientMessageQueueEntry *q;
667   uint16_t msize;
668   size_t tsize;
669   const struct GNUNET_MessageHeader *msg;
670   struct GNUNET_NETWORK_TransmitHandle *th;
671   char *cbuf;
672
673   if (buf == NULL)
674     {
675       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676                   "Transmission to client failed, closing connection.\n");
677       /* fatal error with client, free message queue! */
678       while (NULL != (q = client->message_queue_head))
679         {
680           client->message_queue_head = q->next;
681           GNUNET_free (q);
682         }
683       client->message_queue_tail = NULL;
684       client->message_count = 0;
685       return 0;
686     }
687   cbuf = buf;
688   tsize = 0;
689   while (NULL != (q = client->message_queue_head))
690     {
691       msg = (const struct GNUNET_MessageHeader *) &q[1];
692       msize = ntohs (msg->size);
693       if (msize + tsize > size)
694         break;
695       client->message_queue_head = q->next;
696       if (q->next == NULL)
697         client->message_queue_tail = NULL;
698       memcpy (&cbuf[tsize], msg, msize);
699       tsize += msize;
700       GNUNET_free (q);
701       client->message_count--;
702     }
703   GNUNET_assert (tsize > 0);
704   if (NULL != q)
705     {
706       th = GNUNET_SERVER_notify_transmit_ready (client->client,
707                                                 msize,
708                                                 GNUNET_TIME_UNIT_FOREVER_REL,
709                                                 &transmit_to_client_callback,
710                                                 client);
711       GNUNET_assert (th != NULL);
712     }
713   return tsize;
714 }
715
716
717 /**
718  * Send the specified message to the specified client.  Since multiple
719  * messages may be pending for the same client at a time, this code
720  * makes sure that no message is lost.
721  *
722  * @param client client to transmit the message to
723  * @param msg the message to send
724  * @param may_drop can this message be dropped if the
725  *        message queue for this client is getting far too large?
726  */
727 static void
728 transmit_to_client (struct TransportClient *client,
729                     const struct GNUNET_MessageHeader *msg, int may_drop)
730 {
731   struct ClientMessageQueueEntry *q;
732   uint16_t msize;
733   struct GNUNET_NETWORK_TransmitHandle *th;
734
735   if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
736     {
737       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
738                   _
739                   ("Dropping message, have %u messages pending (%u is the soft limit)\n"),
740                   client->message_count, MAX_PENDING);
741       /* TODO: call to statistics... */
742       return;
743     }
744   client->message_count++;
745   msize = ntohs (msg->size);
746   q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
747   memcpy (&q[1], msg, msize);
748   /* append to message queue */
749   if (client->message_queue_tail == NULL)
750     {
751       client->message_queue_tail = q;
752     }
753   else
754     {
755       client->message_queue_tail->next = q;
756       client->message_queue_tail = q;
757     }
758   if (client->message_queue_head == NULL)
759     {
760       client->message_queue_head = q;
761       th = GNUNET_SERVER_notify_transmit_ready (client->client,
762                                                 msize,
763                                                 GNUNET_TIME_UNIT_FOREVER_REL,
764                                                 &transmit_to_client_callback,
765                                                 client);
766       GNUNET_assert (th != NULL);
767     }
768 }
769
770
771 /**
772  * Find alternative plugins for communication.
773  *
774  * @param neighbour for which neighbour should we try to find
775  *        more plugins?
776  */
777 static void
778 try_alternative_plugins (struct NeighbourList *neighbour)
779 {
780   struct ReadyList *rl;
781
782   if ((neighbour->plugins != NULL) &&
783       (neighbour->retry_plugins_time.value >
784        GNUNET_TIME_absolute_get ().value))
785     return;                     /* don't try right now */
786   neighbour->retry_plugins_time
787     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
788
789   rl = neighbour->plugins;
790   while (rl != NULL)
791     {
792       if (rl->connect_attempts > 0)
793         rl->connect_attempts--; /* amnesty */
794       rl = rl->next;
795     }
796
797 }
798
799
800 /**
801  * Check the ready list for the given neighbour and
802  * if a plugin is ready for transmission (and if we
803  * have a message), do so!
804  *
805  * @param neighbour target peer for which to check the plugins
806  */
807 static void try_transmission_to_peer (struct NeighbourList *neighbour);
808
809
810 /**
811  * The peer specified by the given neighbour has timed-out.  Update
812  * our state and do the necessary notifications.  Also notifies
813  * our clients that the neighbour is now officially gone.
814  *
815  * @param n the neighbour list entry for the peer
816  */
817 static void
818 disconnect_neighbour (struct NeighbourList *n);
819
820
821 /**
822  * Function called by the GNUNET_TRANSPORT_TransmitFunction
823  * upon "completion" of a send request.  This tells the API
824  * that it is now legal to send another message to the given
825  * peer.
826  *
827  * @param cls closure, identifies the entry on the
828  *            message queue that was transmitted and the
829  *            client responsible for queueing the message
830  * @param rl identifies plugin used for the transmission for
831  *           this neighbour; needs to be re-enabled for
832  *           future transmissions
833  * @param target the peer receiving the message
834  * @param result GNUNET_OK on success, if the transmission
835  *           failed, we should not tell the client to transmit
836  *           more messages
837  */
838 static void
839 transmit_send_continuation (void *cls,
840                             struct ReadyList *rl,
841                             const struct GNUNET_PeerIdentity *target,
842                             int result)
843 {
844   struct MessageQueue *mq = cls;
845   struct SendOkMessage send_ok_msg;
846   struct NeighbourList *n;
847
848   GNUNET_assert (mq != NULL);
849   n = mq->neighbour;
850   GNUNET_assert (n != NULL);
851   GNUNET_assert (0 ==
852                  memcmp (&n->id, target,
853                          sizeof (struct GNUNET_PeerIdentity)));
854   if (rl == NULL)
855     {
856       rl = n->plugins;
857       while ((rl != NULL) && (rl->plugin != mq->plugin))
858         rl = rl->next;
859       GNUNET_assert (rl != NULL);
860     }
861   if (result == GNUNET_OK)
862     {
863       rl->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
864     }
865   else
866     {
867       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868                   "Transmission to peer `%s' failed, marking connection as down.\n",
869                   GNUNET_i2s(target));
870       rl->connected = GNUNET_NO;
871     }
872   if (!mq->internal_msg)
873     rl->transmit_ready = GNUNET_YES;
874   if (mq->client != NULL)
875     {
876       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
877                   "Notifying client %p about failed transission to peer `%4s'.\n",
878                   mq->client,
879                   GNUNET_i2s(target));
880       send_ok_msg.header.size = htons (sizeof (send_ok_msg));
881       send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
882       send_ok_msg.success = htonl (result);
883       send_ok_msg.peer = n->id;
884       transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
885     }
886   GNUNET_free (mq->message);
887   GNUNET_free (mq);
888   /* one plugin just became ready again, try transmitting
889      another message (if available) */
890   if (result == GNUNET_OK)
891     try_transmission_to_peer (n);
892   else
893     disconnect_neighbour (n); 
894 }
895
896
897 /**
898  * Check the ready list for the given neighbour and
899  * if a plugin is ready for transmission (and if we
900  * have a message), do so!
901  */
902 static void
903 try_transmission_to_peer (struct NeighbourList *neighbour)
904 {
905   struct ReadyList *pos;
906   struct GNUNET_TIME_Relative min_latency;
907   struct ReadyList *rl;
908   struct MessageQueue *mq;
909   struct GNUNET_TIME_Absolute now;
910
911   if (neighbour->messages == NULL)
912     return;                     /* nothing to do */
913   try_alternative_plugins (neighbour);
914   min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
915   rl = NULL;
916   mq = neighbour->messages;
917   now = GNUNET_TIME_absolute_get ();
918   pos = neighbour->plugins;
919   while (pos != NULL)
920     {
921       /* set plugins that are inactive for a long time back to disconnected */
922       if ((pos->timeout.value < now.value) && (pos->connected == GNUNET_YES))
923         {
924 #if DEBUG_TRANSPORT
925           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926                       "Marking long-time inactive connection to `%4s' as down.\n",
927                       GNUNET_i2s (&neighbour->id));
928 #endif
929           pos->connected = GNUNET_NO;
930         }
931       if (((GNUNET_YES == pos->transmit_ready) ||
932            (mq->internal_msg)) &&
933           (pos->connect_attempts < MAX_CONNECT_RETRY) &&
934           ((rl == NULL) || (min_latency.value > pos->latency.value)))
935         {
936           rl = pos;
937           min_latency = pos->latency;
938         }
939       pos = pos->next;
940     }
941   if (rl == NULL)
942     {
943 #if DEBUG_TRANSPORT
944       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
945                   "No plugin ready to transmit message\n");
946 #endif
947       return;                   /* nobody ready */
948     }
949   if (GNUNET_NO == rl->connected)
950     {
951       rl->connect_attempts++;
952       rl->connected = GNUNET_YES;
953 #if DEBUG_TRANSPORT
954   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
955               "Establishing fresh connection with `%4s' via plugin `%s'\n",
956               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
957 #endif
958     }
959   neighbour->messages = mq->next;
960   mq->plugin = rl->plugin;
961   if (!mq->internal_msg)
962     rl->transmit_ready = GNUNET_NO;
963 #if DEBUG_TRANSPORT
964   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
965               "Giving message of type `%u' for `%4s' to plugin `%s'\n",
966               ntohs (mq->message->type),
967               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
968 #endif
969   rl->plugin_handle
970     = rl->plugin->api->send (rl->plugin->api->cls,
971                              rl->plugin_handle,
972                              rl,
973                              &neighbour->id,
974                              mq->priority,
975                              mq->message,
976                              GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
977                              &transmit_send_continuation, mq);
978 }
979
980
981 /**
982  * Send the specified message to the specified peer.
983  *
984  * @param client source of the transmission request (can be NULL)
985  * @param priority how important is the message
986  * @param msg message to send
987  * @param is_internal is this an internal message
988  * @param neighbour handle to the neighbour for transmission
989  */
990 static void
991 transmit_to_peer (struct TransportClient *client,
992                   unsigned int priority,
993                   const struct GNUNET_MessageHeader *msg,
994                   int is_internal, struct NeighbourList *neighbour)
995 {
996   struct MessageQueue *mq;
997   struct MessageQueue *mqe;
998   struct GNUNET_MessageHeader *m;
999
1000 #if DEBUG_TRANSPORT
1001   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002               _("Sending message of type %u to peer `%4s'\n"),
1003               ntohs (msg->type), GNUNET_i2s (&neighbour->id));
1004 #endif
1005   if (client != NULL)
1006     {
1007       /* check for duplicate submission */
1008       mq = neighbour->messages;
1009       while (NULL != mq)
1010         {
1011           if (mq->client == client)
1012             {
1013               /* client transmitted to same peer twice
1014                  before getting SendOk! */
1015               GNUNET_break (0);
1016               return;
1017             }
1018           mq = mq->next;
1019         }
1020     }
1021   mq = GNUNET_malloc (sizeof (struct MessageQueue));
1022   mq->client = client;
1023   m = GNUNET_malloc (ntohs (msg->size));
1024   memcpy (m, msg, ntohs (msg->size));
1025   mq->message = m;
1026   mq->neighbour = neighbour;
1027   mq->internal_msg = is_internal;
1028   mq->priority = priority;
1029
1030   /* find tail */
1031   mqe = neighbour->messages;
1032   if (mqe != NULL)
1033     while (mqe->next != NULL)
1034       mqe = mqe->next;
1035   if (mqe == NULL)
1036     {
1037       /* new head */
1038       neighbour->messages = mq;
1039       try_transmission_to_peer (neighbour);
1040     }
1041   else
1042     {
1043       /* append */
1044       mqe->next = mq;
1045     }
1046 }
1047
1048
1049 /**
1050  * FIXME: document.
1051  */
1052 struct GeneratorContext
1053 {
1054   struct TransportPlugin *plug_pos;
1055   struct AddressList *addr_pos;
1056   struct GNUNET_TIME_Absolute expiration;
1057 };
1058
1059
1060 /**
1061  * FIXME: document.
1062  */
1063 static size_t
1064 address_generator (void *cls, size_t max, void *buf)
1065 {
1066   struct GeneratorContext *gc = cls;
1067   size_t ret;
1068
1069   while ((gc->addr_pos == NULL) && (gc->plug_pos != NULL))
1070     {
1071       gc->plug_pos = gc->plug_pos->next;
1072       gc->addr_pos = (gc->plug_pos != NULL) ? gc->plug_pos->addresses : NULL;
1073     }
1074   if (NULL == gc->plug_pos)
1075     return 0;
1076   ret = GNUNET_HELLO_add_address (gc->plug_pos->short_name,
1077                                   gc->expiration,
1078                                   gc->addr_pos->addr,
1079                                   gc->addr_pos->addrlen, buf, max);
1080   gc->addr_pos = gc->addr_pos->next;
1081   return ret;
1082 }
1083
1084
1085 /**
1086  * Construct our HELLO message from all of the addresses of
1087  * all of the transports.
1088  */
1089 static void
1090 refresh_hello ()
1091 {
1092   struct GNUNET_HELLO_Message *hello;
1093   struct TransportClient *cpos;
1094   struct NeighbourList *npos;
1095   struct GeneratorContext gc;
1096
1097 #if DEBUG_TRANSPORT
1098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1099               "Refreshing my `%s'\n",
1100               "HELLO");
1101 #endif
1102   gc.plug_pos = plugins;
1103   gc.addr_pos = plugins != NULL ? plugins->addresses : NULL;
1104   gc.expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1105   hello = GNUNET_HELLO_create (&my_public_key, &address_generator, &gc);
1106   cpos = clients;
1107   while (cpos != NULL)
1108     {
1109       transmit_to_client (cpos,
1110                           (const struct GNUNET_MessageHeader *) hello,
1111                           GNUNET_NO);
1112       cpos = cpos->next;
1113     }
1114
1115   GNUNET_free_non_null (our_hello);
1116   our_hello = hello;
1117   our_hello_version++;
1118   npos = neighbours;
1119   while (npos != NULL)
1120     {
1121 #if DEBUG_TRANSPORT
1122       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1123                   "Transmitting updated `%s' to neighbour `%4s'\n",
1124                   "HELLO",
1125                   GNUNET_i2s(&npos->id));
1126 #endif
1127       transmit_to_peer (NULL, 0,
1128                         (const struct GNUNET_MessageHeader *) our_hello,
1129                         GNUNET_YES, npos);
1130       npos = npos->next;
1131     }
1132 }
1133
1134
1135 /**
1136  * Task used to clean up expired addresses for a plugin.
1137  *
1138  * @param cls closure
1139  * @param tc context
1140  */
1141 static void
1142 expire_address_task (void *cls,
1143                      const struct GNUNET_SCHEDULER_TaskContext *tc);
1144
1145
1146 /**
1147  * Update the list of addresses for this plugin,
1148  * expiring those that are past their expiration date.
1149  *
1150  * @param plugin addresses of which plugin should be recomputed?
1151  * @param fresh set to GNUNET_YES if a new address was added
1152  *        and we need to regenerate the HELLO even if nobody
1153  *        expired
1154  */
1155 static void
1156 update_addresses (struct TransportPlugin *plugin, int fresh)
1157 {
1158   struct GNUNET_TIME_Relative min_remaining;
1159   struct GNUNET_TIME_Relative remaining;
1160   struct GNUNET_TIME_Absolute now;
1161   struct AddressList *pos;
1162   struct AddressList *prev;
1163   struct AddressList *next;
1164   int expired;
1165
1166   if (plugin->address_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1167     GNUNET_SCHEDULER_cancel (plugin->env.sched, plugin->address_update_task);
1168   plugin->address_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1169   now = GNUNET_TIME_absolute_get ();
1170   min_remaining = GNUNET_TIME_UNIT_FOREVER_REL;
1171   expired = GNUNET_NO;
1172   prev = NULL;
1173   pos = plugin->addresses;
1174   while (pos != NULL)
1175     {
1176       next = pos->next;
1177       if (pos->expires.value < now.value)
1178         {
1179           expired = GNUNET_YES;
1180           if (prev == NULL)
1181             plugin->addresses = pos->next;
1182           else
1183             prev->next = pos->next;
1184           GNUNET_free (pos);
1185         }
1186       else
1187         {
1188           remaining = GNUNET_TIME_absolute_get_remaining (pos->expires);
1189           if (remaining.value < min_remaining.value)
1190             min_remaining = remaining;
1191           prev = pos;
1192         }
1193       pos = next;
1194     }
1195
1196   if (expired || fresh)
1197     refresh_hello ();
1198   if (min_remaining.value < GNUNET_TIME_UNIT_FOREVER_REL.value)
1199     plugin->address_update_task
1200       = GNUNET_SCHEDULER_add_delayed (plugin->env.sched,
1201                                       GNUNET_NO,
1202                                       GNUNET_SCHEDULER_PRIORITY_IDLE,
1203                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1204                                       min_remaining,
1205                                       &expire_address_task, plugin);
1206
1207 }
1208
1209
1210 /**
1211  * Task used to clean up expired addresses for a plugin.
1212  *
1213  * @param cls closure
1214  * @param tc context
1215  */
1216 static void
1217 expire_address_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1218 {
1219   struct TransportPlugin *plugin = cls;
1220   plugin->address_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1221   update_addresses (plugin, GNUNET_NO);
1222 }
1223
1224
1225 /**
1226  * Function that must be called by each plugin to notify the
1227  * transport service about the addresses under which the transport
1228  * provided by the plugin can be reached.
1229  *
1230  * @param cls closure
1231  * @param name name of the transport that generated the address
1232  * @param addr one of the addresses of the host, NULL for the last address
1233  *        the specific address format depends on the transport
1234  * @param addrlen length of the address
1235  * @param expires when should this address automatically expire?
1236  */
1237 static void
1238 plugin_env_notify_address (void *cls,
1239                            const char *name,
1240                            const void *addr,
1241                            size_t addrlen,
1242                            struct GNUNET_TIME_Relative expires)
1243 {
1244   struct TransportPlugin *p = cls;
1245   struct AddressList *al;
1246   struct GNUNET_TIME_Absolute abex;
1247
1248   abex = GNUNET_TIME_relative_to_absolute (expires);
1249   GNUNET_assert (p == find_transport (name));
1250
1251   al = p->addresses;
1252   while (al != NULL)
1253     {
1254       if ((addrlen == al->addrlen) && (0 == memcmp (addr, &al[1], addrlen)))
1255         {
1256           if (al->expires.value < abex.value)
1257             al->expires = abex;
1258           return;
1259         }
1260       al = al->next;
1261     }
1262 #if DEBUG_TRANSPORT
1263   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1264               "Plugin `%s' informs us about a new address `%s'\n", name,
1265               GNUNET_a2s(addr, addrlen));
1266 #endif
1267   al = GNUNET_malloc (sizeof (struct AddressList) + addrlen);
1268   al->addr = &al[1];
1269   al->next = p->addresses;
1270   p->addresses = al;
1271   al->expires = abex;
1272   al->addrlen = addrlen;
1273   memcpy (&al[1], addr, addrlen);
1274   update_addresses (p, GNUNET_YES);
1275 }
1276
1277
1278 /**
1279  * FIXME: document.
1280  */
1281 struct LookupHelloContext
1282 {
1283   GNUNET_TRANSPORT_AddressCallback iterator;
1284
1285   void *iterator_cls;
1286 };
1287
1288
1289 /**
1290  * FIXME: document.
1291  */
1292 static int
1293 lookup_address_callback (void *cls,
1294                          const char *tname,
1295                          struct GNUNET_TIME_Absolute expiration,
1296                          const void *addr, size_t addrlen)
1297 {
1298   struct LookupHelloContext *lhc = cls;
1299   lhc->iterator (lhc->iterator_cls, tname, addr, addrlen);
1300   return GNUNET_OK;
1301 }
1302
1303
1304 /**
1305  * FIXME: document.
1306  */
1307 static void
1308 lookup_hello_callback (void *cls,
1309                        const struct GNUNET_PeerIdentity *peer,
1310                        const struct GNUNET_HELLO_Message *h, uint32_t trust)
1311 {
1312   struct LookupHelloContext *lhc = cls;
1313
1314   if (peer == NULL)
1315     {
1316       lhc->iterator (lhc->iterator_cls, NULL, NULL, 0);
1317       GNUNET_free (lhc);
1318       return;
1319     }
1320   if (h == NULL)
1321     return;
1322   GNUNET_HELLO_iterate_addresses (h,
1323                                   GNUNET_NO, &lookup_address_callback, lhc);
1324 }
1325
1326
1327 /**
1328  * Function that allows a transport to query the known
1329  * network addresses for a given peer.
1330  *
1331  * @param cls closure
1332  * @param timeout after how long should we time out?
1333  * @param target which peer are we looking for?
1334  * @param iter function to call for each known address
1335  * @param iter_cls closure for iter
1336  */
1337 static void
1338 plugin_env_lookup_address (void *cls,
1339                            struct GNUNET_TIME_Relative timeout,
1340                            const struct GNUNET_PeerIdentity *target,
1341                            GNUNET_TRANSPORT_AddressCallback iter,
1342                            void *iter_cls)
1343 {
1344   struct LookupHelloContext *lhc;
1345
1346   lhc = GNUNET_malloc (sizeof (struct LookupHelloContext));
1347   lhc->iterator = iter;
1348   lhc->iterator_cls = iter_cls;
1349   GNUNET_PEERINFO_for_all (cfg,
1350                            sched,
1351                            target, 0, timeout, &lookup_hello_callback, &lhc);
1352 }
1353
1354
1355 /**
1356  * Notify all of our clients about a peer connecting.
1357  */
1358 static void
1359 notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
1360                         struct GNUNET_TIME_Relative latency)
1361 {
1362   struct ConnectInfoMessage cim;
1363   struct TransportClient *cpos;
1364
1365 #if DEBUG_TRANSPORT
1366   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1367               "Informing clients about peer `%4s' connecting to us\n",
1368               GNUNET_i2s (peer));
1369 #endif
1370   cim.header.size = htons (sizeof (struct ConnectInfoMessage));
1371   cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1372   cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60*1000));
1373   cim.latency = GNUNET_TIME_relative_hton (latency);
1374   memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
1375   cpos = clients;
1376   while (cpos != NULL)
1377     {
1378       transmit_to_client (cpos, &cim.header, GNUNET_NO);
1379       cpos = cpos->next;
1380     }
1381 }
1382
1383
1384 /**
1385  * Notify all of our clients about a peer disconnecting.
1386  */
1387 static void
1388 notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
1389 {
1390   struct DisconnectInfoMessage dim;
1391   struct TransportClient *cpos;
1392
1393 #if DEBUG_TRANSPORT
1394   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1395               "Informing clients about peer `%4s' disconnecting\n",
1396               GNUNET_i2s (peer));
1397 #endif
1398   dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
1399   dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1400   dim.reserved = htonl (0);
1401   memcpy (&dim.peer, peer, sizeof (struct GNUNET_PeerIdentity));
1402   cpos = clients;
1403   while (cpos != NULL)
1404     {
1405       transmit_to_client (cpos, &dim.header, GNUNET_NO);
1406       cpos = cpos->next;
1407     }
1408 }
1409
1410
1411 /**
1412  * Copy any validated addresses to buf.
1413  *
1414  * @return 0 once all addresses have been
1415  *         returned
1416  */
1417 static size_t
1418 list_validated_addresses (void *cls, size_t max, void *buf)
1419 {
1420   struct ValidationAddress **va = cls;
1421   size_t ret;
1422
1423   while ((NULL != *va) && ((*va)->ok != GNUNET_YES))
1424     *va = (*va)->next;
1425   if (NULL == *va)
1426     return 0;
1427   ret = GNUNET_HELLO_add_address ((*va)->transport_name,
1428                                   (*va)->expiration,
1429                                   &(*va)[1], (*va)->addr_len, buf, max);
1430   *va = (*va)->next;
1431   return ret;
1432 }
1433
1434
1435 /**
1436  * HELLO validation cleanup task.
1437  */
1438 static void
1439 cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1440 {
1441   struct ValidationAddress *va;
1442   struct ValidationList *pos;
1443   struct ValidationList *prev;
1444   struct GNUNET_TIME_Absolute now;
1445   struct GNUNET_HELLO_Message *hello;
1446   struct GNUNET_PeerIdentity pid;
1447   struct NeighbourList *n;
1448
1449 #if DEBUG_TRANSPORT
1450   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1451               "HELLO validation cleanup background task running...\n");
1452 #endif
1453   now = GNUNET_TIME_absolute_get ();
1454   prev = NULL;
1455   pos = pending_validations;
1456   while (pos != NULL)
1457     {
1458       if (pos->timeout.value < now.value)
1459         {
1460           if (prev == NULL)
1461             pending_validations = pos->next;
1462           else
1463             prev->next = pos->next;
1464           va = pos->addresses;
1465           hello = GNUNET_HELLO_create (&pos->publicKey,
1466                                        &list_validated_addresses, &va);
1467           GNUNET_CRYPTO_hash (&pos->publicKey,
1468                               sizeof (struct
1469                                       GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1470                               &pid.hashPubKey);
1471 #if DEBUG_TRANSPORT
1472           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1473                       "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n",
1474                       "HELLO", GNUNET_i2s (&pid));
1475 #endif
1476           GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello);
1477           n = find_neighbour (&pid);
1478           if (NULL != n)
1479             try_transmission_to_peer (n);           
1480           GNUNET_free (hello);
1481           while (NULL != (va = pos->addresses))
1482             {
1483               pos->addresses = va->next;
1484               GNUNET_free (va->transport_name);
1485               GNUNET_free (va);
1486             }
1487           GNUNET_free (pos);
1488           if (prev == NULL)
1489             pos = pending_validations;
1490           else
1491             pos = prev->next;
1492           continue;
1493         }
1494       prev = pos;
1495       pos = pos->next;
1496     }
1497
1498   /* finally, reschedule cleanup if needed; list is
1499      ordered by timeout, so we need the last element... */
1500   pos = pending_validations;
1501   while ((pos != NULL) && (pos->next != NULL))
1502     pos = pos->next;
1503   if (NULL != pos)
1504     GNUNET_SCHEDULER_add_delayed (sched,
1505                                   GNUNET_NO,
1506                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
1507                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1508                                   GNUNET_TIME_absolute_get_remaining
1509                                   (pos->timeout), &cleanup_validation, NULL);
1510 }
1511
1512
1513
1514
1515 /**
1516  * Function that will be called if we receive a validation
1517  * of an address challenge that we transmitted to another
1518  * peer.  Note that the validation should only be considered
1519  * acceptable if the challenge matches AND if the sender
1520  * address is at least a plausible address for this peer
1521  * (otherwise we may be seeing a MiM attack).
1522  *
1523  * @param cls closure
1524  * @param name name of the transport that generated the address
1525  * @param peer who responded to our challenge
1526  * @param challenge the challenge number we presumably used
1527  * @param sender_addr string describing our sender address (as observed
1528  *         by the other peer in human-readable format)
1529  */
1530 static void
1531 plugin_env_notify_validation (void *cls,
1532                               const char *name,
1533                               const struct GNUNET_PeerIdentity *peer,
1534                               uint32_t challenge,
1535                               const char *sender_addr)
1536 {
1537   int all_done;
1538   int matched;
1539   struct ValidationList *pos;
1540   struct ValidationAddress *va;
1541   struct GNUNET_PeerIdentity id;
1542
1543   pos = pending_validations;
1544   while (pos != NULL)
1545     {
1546       GNUNET_CRYPTO_hash (&pos->publicKey,
1547                           sizeof (struct
1548                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1549                           &id.hashPubKey);
1550       if (0 ==
1551           memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity)))
1552         break;
1553       pos = pos->next;
1554     }
1555   if (pos == NULL)
1556     {
1557       /* TODO: call statistics (unmatched PONG) */
1558       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1559                   _
1560                   ("Received validation response but have no record of any validation request for `%4s'. Ignoring.\n"),
1561                   GNUNET_i2s(peer));
1562       return;
1563     }
1564   all_done = GNUNET_YES;
1565   matched = GNUNET_NO;
1566   va = pos->addresses;
1567   while (va != NULL)
1568     {
1569       if (va->challenge == challenge)
1570         {
1571 #if DEBUG_TRANSPORT
1572           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1573                       "Confirmed validity of peer address.\n");
1574 #endif
1575           GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
1576                       _("Another peer saw us using the address `%s' via `%s'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"),
1577                       sender_addr, 
1578                       name);
1579           va->ok = GNUNET_YES;
1580           va->expiration =
1581             GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1582           matched = GNUNET_YES;
1583         }        
1584       if (va->ok != GNUNET_YES)
1585         all_done = GNUNET_NO;
1586       va = va->next;
1587     }
1588   if (GNUNET_NO == matched)
1589     {
1590       /* TODO: call statistics (unmatched PONG) */
1591       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1592                   _
1593                   ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"),
1594                   "PONG", "PING");
1595     }
1596   if (GNUNET_YES == all_done)
1597     {
1598       pos->timeout.value = 0;
1599       GNUNET_SCHEDULER_add_delayed (sched,
1600                                     GNUNET_NO,
1601                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
1602                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1603                                     GNUNET_TIME_UNIT_ZERO,
1604                                     &cleanup_validation, NULL);
1605     }
1606 }
1607
1608
1609 struct CheckHelloValidatedContext
1610 {
1611   /**
1612    * Plugin for which we are validating.
1613    */
1614   struct TransportPlugin *plugin;
1615
1616   /**
1617    * Hello that we are validating.
1618    */
1619   struct GNUNET_HELLO_Message *hello;
1620
1621   /**
1622    * Validation list being build.
1623    */
1624   struct ValidationList *e;
1625
1626 };
1627
1628
1629 /**
1630  * Append the given address to the list of entries
1631  * that need to be validated.
1632  */
1633 static int
1634 run_validation (void *cls,
1635                 const char *tname,
1636                 struct GNUNET_TIME_Absolute expiration,
1637                 const void *addr, size_t addrlen)
1638 {
1639   struct ValidationList *e = cls;
1640   struct TransportPlugin *tp;
1641   struct ValidationAddress *va;
1642   struct GNUNET_PeerIdentity id;
1643
1644   tp = find_transport (tname);
1645   if (tp == NULL)
1646     {
1647       GNUNET_log (GNUNET_ERROR_TYPE_INFO |
1648                   GNUNET_ERROR_TYPE_BULK,
1649                   _
1650                   ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
1651                   tname);
1652       return GNUNET_OK;
1653     }
1654   GNUNET_CRYPTO_hash (&e->publicKey,
1655                       sizeof (struct
1656                               GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1657                       &id.hashPubKey);
1658   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659               "Scheduling validation of address `%s' via `%s' for `%4s'\n",
1660               GNUNET_a2s(addr, addrlen),
1661               tname,
1662               GNUNET_i2s(&id));
1663
1664   va = GNUNET_malloc (sizeof (struct ValidationAddress) + addrlen);
1665   va->next = e->addresses;
1666   e->addresses = va;
1667   va->transport_name = GNUNET_strdup (tname);
1668   va->addr_len = addrlen;
1669   va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1670                                             (unsigned int) -1);
1671   memcpy (&va[1], addr, addrlen);
1672   return GNUNET_OK;
1673 }
1674
1675
1676 /**
1677  * Check if addresses in validated hello "h" overlap with
1678  * those in "chvc->hello" and update "chvc->hello" accordingly,
1679  * removing those addresses that have already been validated.
1680  */
1681 static void
1682 check_hello_validated (void *cls,
1683                        const struct GNUNET_PeerIdentity *peer,
1684                        const struct GNUNET_HELLO_Message *h, 
1685                        uint32_t trust)
1686 {
1687   struct CheckHelloValidatedContext *chvc = cls;
1688   struct ValidationAddress *va;
1689   struct TransportPlugin *tp;
1690   int first_call;
1691   struct GNUNET_PeerIdentity apeer;
1692
1693   first_call = GNUNET_NO;
1694   if (chvc->e == NULL)
1695     {
1696       first_call = GNUNET_YES;
1697       chvc->e = GNUNET_malloc (sizeof (struct ValidationList));
1698       GNUNET_assert (GNUNET_OK ==
1699                      GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello,
1700                                            &chvc->e->publicKey));
1701       chvc->e->timeout =
1702         GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT);
1703       chvc->e->next = pending_validations;
1704       pending_validations = chvc->e;
1705     }
1706   if (h != NULL)
1707     {
1708       GNUNET_HELLO_iterate_new_addresses (chvc->hello,
1709                                           h,
1710                                           GNUNET_TIME_absolute_get (),
1711                                           &run_validation, chvc->e);
1712     }
1713   else if (GNUNET_YES == first_call)
1714     {
1715       /* no existing HELLO, all addresses are new */
1716       GNUNET_HELLO_iterate_addresses (chvc->hello,
1717                                       GNUNET_NO, &run_validation, chvc->e);
1718     }
1719   if (h != NULL)
1720     return;                     /* wait for next call */
1721   /* finally, transmit validation attempts */
1722   GNUNET_assert (GNUNET_OK ==
1723                  GNUNET_HELLO_get_id (chvc->hello,
1724                                       &apeer));
1725   va = chvc->e->addresses;
1726   while (va != NULL)
1727     {
1728 #if DEBUG_TRANSPORT
1729       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1730                   "Establishing `%s' connection to validate `%s' of `%4s'\n",
1731                   va->transport_name,
1732                   "HELLO",
1733                   GNUNET_i2s (&apeer));
1734 #endif
1735       tp = find_transport (va->transport_name);
1736       GNUNET_assert (tp != NULL);
1737       if (GNUNET_OK !=
1738           tp->api->validate (tp->api->cls,
1739                              &apeer,
1740                              va->challenge,
1741                              HELLO_VERIFICATION_TIMEOUT,
1742                              &va[1],
1743                              va->addr_len))
1744         va->ok = GNUNET_SYSERR;
1745       va = va->next;
1746     }
1747   if (chvc->e->next == NULL)
1748     GNUNET_SCHEDULER_add_delayed (sched,
1749                                   GNUNET_NO,
1750                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
1751                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1752                                   GNUNET_TIME_absolute_get_remaining
1753                                   (chvc->e->timeout), &cleanup_validation,
1754                                   NULL);
1755   GNUNET_free (chvc);
1756 }
1757
1758
1759 /**
1760  * Process HELLO-message.
1761  *
1762  * @param plugin transport involved, may be NULL
1763  * @param message the actual message
1764  * @return GNUNET_OK if the HELLO was well-formed, GNUNET_SYSERR otherwise
1765  */
1766 static int
1767 process_hello (struct TransportPlugin *plugin,
1768                const struct GNUNET_MessageHeader *message)
1769 {
1770   struct ValidationList *e;
1771   uint16_t hsize;
1772   struct GNUNET_PeerIdentity target;
1773   const struct GNUNET_HELLO_Message *hello;
1774   struct CheckHelloValidatedContext *chvc;
1775   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
1776
1777   hsize = ntohs (message->size);
1778   if ((ntohs (message->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1779       (hsize < sizeof (struct GNUNET_MessageHeader)))
1780     {
1781       GNUNET_break (0);
1782       return GNUNET_SYSERR;
1783     }
1784   /* first, check if load is too high */
1785   if (GNUNET_OS_load_cpu_get (cfg) > 100)
1786     {
1787       /* TODO: call to stats? */
1788       return GNUNET_OK;
1789     }
1790   hello = (const struct GNUNET_HELLO_Message *) message;
1791   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, &publicKey))
1792     {
1793       GNUNET_break_op (0);
1794       return GNUNET_SYSERR;
1795     }
1796   GNUNET_CRYPTO_hash (&publicKey,
1797                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1798                       &target.hashPubKey);
1799 #if DEBUG_TRANSPORT
1800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1801               "Processing `%s' message for `%4s'\n",
1802               "HELLO", GNUNET_i2s (&target));
1803 #endif
1804   /* check if a HELLO for this peer is already on the validation list */
1805   e = pending_validations;
1806   while (e != NULL)
1807     {
1808       if (0 == memcmp (&e->publicKey,
1809                        &publicKey,
1810                        sizeof (struct
1811                                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)))
1812         {
1813           /* TODO: call to stats? */
1814 #if DEBUG_TRANSPORT
1815           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1816                       "`%s' message for peer `%4s' is already pending; ignoring new message\n",
1817                       "HELLO", GNUNET_i2s (&target));
1818 #endif    
1819           return GNUNET_OK;
1820         }
1821       e = e->next;
1822     }
1823   chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize);
1824   chvc->plugin = plugin;
1825   chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1];
1826   memcpy (chvc->hello, hello, hsize);
1827   /* finally, check if HELLO was previously validated
1828      (continuation will then schedule actual validation) */
1829   GNUNET_PEERINFO_for_all (cfg,
1830                            sched,
1831                            &target,
1832                            0,
1833                            HELLO_VERIFICATION_TIMEOUT,
1834                            &check_hello_validated, chvc);
1835   return GNUNET_OK;
1836 }
1837
1838
1839 /**
1840  * The peer specified by the given neighbour has timed-out.  Update
1841  * our state and do the necessary notifications.  Also notifies
1842  * our clients that the neighbour is now officially gone.
1843  *
1844  * @param n the neighbour list entry for the peer
1845  */
1846 static void
1847 disconnect_neighbour (struct NeighbourList *n)
1848 {
1849   struct ReadyList *rpos;
1850   struct NeighbourList *npos;
1851   struct NeighbourList *nprev;
1852   struct MessageQueue *mq;
1853
1854 #if DEBUG_TRANSPORT
1855   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1856               "Disconnecting from `%4s'\n",
1857               GNUNET_i2s(&n->id));
1858 #endif
1859   /* remove n from neighbours list */
1860   nprev = NULL;
1861   npos = neighbours;
1862   while ((npos != NULL) && (npos != n))
1863     {
1864       nprev = npos;
1865       npos = npos->next;
1866     }
1867   GNUNET_assert (npos != NULL);
1868   if (nprev == NULL)
1869     neighbours = n->next;
1870   else
1871     nprev->next = n->next;
1872
1873   /* notify all clients about disconnect */
1874   notify_clients_disconnect (&n->id);
1875
1876   /* clean up all plugins, cancel connections and pending transmissions */
1877   while (NULL != (rpos = n->plugins))
1878     {
1879       n->plugins = rpos->next;
1880       GNUNET_assert (rpos->neighbour == n);
1881       rpos->plugin->api->cancel (rpos->plugin->api->cls,
1882                                  rpos->plugin_handle,
1883                                  rpos,
1884                                  &n->id);
1885       GNUNET_free (rpos);
1886     }
1887
1888   /* free all messages on the queue */
1889   while (NULL != (mq = n->messages))
1890     {
1891       n->messages = mq->next;
1892       GNUNET_assert (mq->neighbour == n);
1893       GNUNET_free (mq);
1894     }
1895
1896   /* finally, free n itself */
1897   GNUNET_free (n);
1898 }
1899
1900
1901 /**
1902  * Add an entry for each of our transport plugins
1903  * (that are able to send) to the list of plugins
1904  * for this neighbour.
1905  *
1906  * @param neighbour to initialize
1907  */
1908 static void
1909 add_plugins (struct NeighbourList *neighbour)
1910 {
1911   struct TransportPlugin *tp;
1912   struct ReadyList *rl;
1913
1914   neighbour->retry_plugins_time
1915     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
1916   tp = plugins;
1917   while (tp != NULL)
1918     {
1919       if (tp->api->send != NULL)
1920         {
1921           rl = GNUNET_malloc (sizeof (struct ReadyList));
1922           rl->next = neighbour->plugins;
1923           neighbour->plugins = rl;
1924           rl->plugin = tp;
1925           rl->neighbour = neighbour;
1926           rl->transmit_ready = GNUNET_YES;
1927         }
1928       tp = tp->next;
1929     }
1930 }
1931
1932
1933 static void
1934 neighbour_timeout_task (void *cls,
1935                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1936 {
1937   struct NeighbourList *n = cls;
1938
1939 #if DEBUG_TRANSPORT
1940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1941               "Neighbour `%4s' has timed out!\n",
1942               GNUNET_i2s(&n->id));
1943 #endif
1944   n->timeout_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1945   disconnect_neighbour (n);
1946 }
1947
1948
1949 /**
1950  * Create a fresh entry in our neighbour list for the given peer.
1951  * Will try to transmit our current HELLO to the new neighbour.  Also
1952  * notifies our clients about the new "connection".
1953  *
1954  * @param peer the peer for which we create the entry
1955  * @return the new neighbour list entry
1956  */
1957 static struct NeighbourList *
1958 setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
1959 {
1960   struct NeighbourList *n;
1961
1962 #if DEBUG_TRANSPORT
1963   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1964               "Setting up new neighbour `%4s', sending our HELLO to introduce ourselves\n",
1965               GNUNET_i2s (peer));
1966 #endif
1967   GNUNET_assert (our_hello != NULL);
1968   n = GNUNET_malloc (sizeof (struct NeighbourList));
1969   n->next = neighbours;
1970   neighbours = n;
1971   n->id = *peer;
1972   n->last_quota_update = GNUNET_TIME_absolute_get ();
1973   n->peer_timeout =
1974     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
1975   n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
1976   add_plugins (n);
1977   n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
1978                                                   GNUNET_NO,
1979                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
1980                                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1981                                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1982                                                   &neighbour_timeout_task, n);
1983   transmit_to_peer (NULL, 0,
1984                     (const struct GNUNET_MessageHeader *) our_hello,
1985                     GNUNET_YES, n);
1986   notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
1987   return n;
1988 }
1989
1990
1991 /**
1992  * Function called by the plugin for each received message.
1993  * Update data volumes, possibly notify plugins about
1994  * reducing the rate at which they read from the socket
1995  * and generally forward to our receive callback.
1996  *
1997  * @param plugin_context value to pass to this plugin
1998  *        to respond to the given peer (use is optional,
1999  *        but may speed up processing)
2000  * @param service_context value passed to the transport-service
2001  *        to identify the neighbour; will be NULL on the first
2002  *        call for a given peer
2003  * @param latency estimated latency for communicating with the
2004  *             given peer
2005  * @param peer (claimed) identity of the other peer
2006  * @param message the message, NULL if peer was disconnected
2007  * @return the new service_context that the plugin should use
2008  *         for future receive calls for messages from this
2009  *         particular peer
2010  */
2011 static struct ReadyList *
2012 plugin_env_receive (void *cls,
2013                     void *plugin_context,
2014                     struct ReadyList *service_context,
2015                     struct GNUNET_TIME_Relative latency,
2016                     const struct GNUNET_PeerIdentity *peer,
2017                     const struct GNUNET_MessageHeader *message)
2018 {
2019   const struct GNUNET_MessageHeader ack = {
2020     htons (sizeof (struct GNUNET_MessageHeader)),
2021     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK)
2022   };
2023   struct TransportPlugin *plugin = cls;
2024   struct TransportClient *cpos;
2025   struct InboundMessage *im;
2026   uint16_t msize;
2027   struct NeighbourList *n;
2028
2029   if (service_context != NULL)
2030     {
2031       n = service_context->neighbour;
2032       GNUNET_assert (n != NULL);
2033     }
2034   else
2035     {
2036       n = find_neighbour (peer);
2037       if (n == NULL)
2038         {
2039           if (message == NULL)
2040             return NULL;        /* disconnect of peer already marked down */
2041           n = setup_new_neighbour (peer);
2042         }
2043       service_context = n->plugins;
2044       while ((service_context != NULL) && (plugin != service_context->plugin))
2045         service_context = service_context->next;
2046       GNUNET_assert ((plugin->api->send == NULL) ||
2047                      (service_context != NULL));
2048     }
2049   if (message == NULL)
2050     {
2051 #if DEBUG_TRANSPORT
2052       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2053                   "Receive failed from `%4s', triggering disconnect\n",
2054                   GNUNET_i2s(&n->id));
2055 #endif
2056       /* TODO: call stats */
2057       disconnect_neighbour (n);
2058       if ((service_context != NULL) &&
2059           (service_context->plugin_handle == plugin_context))
2060         {
2061           service_context->connected = GNUNET_NO;
2062           service_context->plugin_handle = NULL;
2063         }
2064       return NULL;
2065     }
2066 #if DEBUG_TRANSPORT
2067   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2068               "Processing message of type `%u' received by plugin...\n",
2069               ntohs (message->type));
2070 #endif
2071   if (service_context != NULL)
2072     {
2073       if (service_context->connected == GNUNET_NO)
2074         {
2075           service_context->connected = GNUNET_YES;
2076           service_context->transmit_ready = GNUNET_YES;
2077           service_context->connect_attempts++;
2078         }
2079       service_context->timeout
2080         = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2081       service_context->plugin_handle = plugin_context;
2082       service_context->latency = latency;
2083     }
2084   /* update traffic received amount ... */
2085   msize = ntohs (message->size);
2086   n->last_received += msize;
2087   GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
2088   n->peer_timeout =
2089     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2090   n->timeout_task =
2091     GNUNET_SCHEDULER_add_delayed (sched, GNUNET_NO,
2092                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
2093                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2094                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2095                                   &neighbour_timeout_task, n);
2096   update_quota (n);
2097   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2098     {
2099       /* dropping message due to frequent inbound volume violations! */
2100       GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
2101                   GNUNET_ERROR_TYPE_BULK,
2102                   _
2103                   ("Dropping incoming message due to repeated bandwidth quota violations.\n"));
2104       /* TODO: call stats */
2105       GNUNET_assert (NULL != service_context->neighbour);
2106       return service_context;
2107     }
2108   switch (ntohs (message->type))
2109     {
2110     case GNUNET_MESSAGE_TYPE_HELLO:
2111 #if DEBUG_TRANSPORT
2112       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2113                   "Receiving `%s' message from `%4s'.\n", "HELLO",
2114                   GNUNET_i2s(peer));
2115 #endif
2116       process_hello (plugin, message);
2117 #if DEBUG_TRANSPORT
2118       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2119                   "Sending `%s' message to connecting peer `%4s'.\n", "ACK",
2120                   GNUNET_i2s(peer));
2121 #endif
2122       transmit_to_peer (NULL, 0, &ack, GNUNET_YES, n);
2123       break;
2124     case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
2125       n->saw_ack = GNUNET_YES;
2126       /* intentional fall-through! */
2127     default:
2128 #if DEBUG_TRANSPORT
2129       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2130                   "Received message of type %u from `%4s', sending to all clients.\n",
2131                   ntohs (message->type),
2132                   GNUNET_i2s(peer));
2133 #endif
2134       /* transmit message to all clients */
2135       im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
2136       im->header.size = htons (sizeof (struct InboundMessage) + msize);
2137       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2138       im->latency = GNUNET_TIME_relative_hton (latency);
2139       im->peer = *peer;
2140       memcpy (&im[1], message, msize);
2141
2142       cpos = clients;
2143       while (cpos != NULL)
2144         {
2145           transmit_to_client (cpos, &im->header, GNUNET_YES);
2146           cpos = cpos->next;
2147         }
2148       GNUNET_free (im);
2149     }
2150   GNUNET_assert (NULL != service_context->neighbour);
2151   return service_context;
2152 }
2153
2154
2155 /**
2156  * Handle START-message.  This is the first message sent to us
2157  * by any client which causes us to add it to our list.
2158  *
2159  * @param cls closure (always NULL)
2160  * @param client identification of the client
2161  * @param message the actual message
2162  */
2163 static void
2164 handle_start (void *cls,
2165               struct GNUNET_SERVER_Client *client,
2166               const struct GNUNET_MessageHeader *message)
2167 {
2168   struct TransportClient *c;
2169   struct ConnectInfoMessage cim;
2170   struct NeighbourList *n;
2171   struct InboundMessage *im;
2172   struct GNUNET_MessageHeader *ack;
2173
2174 #if DEBUG_TRANSPORT
2175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2176               "Received `%s' request from client\n", "START");
2177 #endif
2178   c = clients;
2179   while (c != NULL)
2180     {
2181       if (c->client == client)
2182         {
2183           /* client already on our list! */
2184           GNUNET_break (0);
2185           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2186           return;
2187         }
2188       c = c->next;
2189     }
2190   c = GNUNET_malloc (sizeof (struct TransportClient));
2191   c->next = clients;
2192   clients = c;
2193   c->client = client;
2194   if (our_hello != NULL)
2195     {
2196 #if DEBUG_TRANSPORT
2197       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2198                   "Sending our own `%s' to new client\n",
2199                   "HELLO");
2200 #endif
2201       transmit_to_client (c,
2202                           (const struct GNUNET_MessageHeader *) our_hello,
2203                           GNUNET_NO);
2204       /* tell new client about all existing connections */
2205       cim.header.size = htons (sizeof (struct ConnectInfoMessage));
2206       cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2207       cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
2208       cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2209       im = GNUNET_malloc (sizeof (struct InboundMessage) +
2210                           sizeof (struct GNUNET_MessageHeader));
2211       im->header.size = htons (sizeof (struct InboundMessage) +
2212                                sizeof (struct GNUNET_MessageHeader));
2213       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2214       im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2215       ack = (struct GNUNET_MessageHeader *) &im[1];
2216       ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2217       ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
2218       for (n = neighbours; n != NULL; n = n->next)
2219         {
2220           cim.id = n->id;
2221           transmit_to_client (c, &cim.header, GNUNET_NO);
2222           if (n->saw_ack)
2223             {
2224               im->peer = n->id;
2225               transmit_to_client (c, &im->header, GNUNET_NO);
2226             }
2227         }
2228       GNUNET_free (im);
2229     }
2230   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2231 }
2232
2233
2234 /**
2235  * Handle HELLO-message.
2236  *
2237  * @param cls closure (always NULL)
2238  * @param client identification of the client
2239  * @param message the actual message
2240  */
2241 static void
2242 handle_hello (void *cls,
2243               struct GNUNET_SERVER_Client *client,
2244               const struct GNUNET_MessageHeader *message)
2245 {
2246   int ret;
2247
2248 #if DEBUG_TRANSPORT
2249   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2250               "Received `%s' request from client\n", "HELLO");
2251 #endif
2252   ret = process_hello (NULL, message);
2253   GNUNET_SERVER_receive_done (client, ret);
2254 }
2255
2256
2257 /**
2258  * Handle SEND-message.
2259  *
2260  * @param cls closure (always NULL)
2261  * @param client identification of the client
2262  * @param message the actual message
2263  */
2264 static void
2265 handle_send (void *cls,
2266              struct GNUNET_SERVER_Client *client,
2267              const struct GNUNET_MessageHeader *message)
2268 {
2269   struct TransportClient *tc;
2270   struct NeighbourList *n;
2271   const struct OutboundMessage *obm;
2272   const struct GNUNET_MessageHeader *obmm;
2273   uint16_t size;
2274   uint16_t msize;
2275
2276   size = ntohs (message->size);
2277   if (size <
2278       sizeof (struct OutboundMessage) + sizeof (struct GNUNET_MessageHeader))
2279     {
2280       GNUNET_break (0);
2281       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2282       return;
2283     }
2284   obm = (const struct OutboundMessage *) message;
2285 #if DEBUG_TRANSPORT
2286   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2287               "Received `%s' request from client with target `%4s'\n",
2288               "SEND", GNUNET_i2s (&obm->peer));
2289 #endif
2290   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2291   msize = ntohs (obmm->size);
2292   if (size != msize + sizeof (struct OutboundMessage))
2293     {
2294       GNUNET_break (0);
2295       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2296       return;
2297     }
2298   n = find_neighbour (&obm->peer);
2299   if (n == NULL)
2300     n = setup_new_neighbour (&obm->peer);
2301   tc = clients;
2302   while ((tc != NULL) && (tc->client != client))
2303     tc = tc->next;
2304
2305 #if DEBUG_TRANSPORT
2306   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2307               "Client asked to transmit %u-byte message of type %u to `%4s'\n",
2308               ntohs (obmm->size),
2309               ntohs (obmm->type), GNUNET_i2s (&obm->peer));
2310 #endif
2311   transmit_to_peer (tc, ntohl(obm->priority), obmm, GNUNET_NO, n);
2312   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2313 }
2314
2315
2316 /**
2317  * Handle SET_QUOTA-message.
2318  *
2319  * @param cls closure (always NULL)
2320  * @param client identification of the client
2321  * @param message the actual message
2322  */
2323 static void
2324 handle_set_quota (void *cls,
2325                   struct GNUNET_SERVER_Client *client,
2326                   const struct GNUNET_MessageHeader *message)
2327 {
2328   const struct QuotaSetMessage *qsm =
2329     (const struct QuotaSetMessage *) message;
2330   struct NeighbourList *n;
2331   struct TransportPlugin *p;
2332   struct ReadyList *rl;
2333
2334 #if DEBUG_TRANSPORT
2335   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2336               "Received `%s' request from client for peer `%4s'\n",
2337               "SET_QUOTA", GNUNET_i2s (&qsm->peer));
2338 #endif
2339   n = find_neighbour (&qsm->peer);
2340   if (n == NULL)
2341     {
2342       GNUNET_SERVER_receive_done (client, GNUNET_OK);
2343       return;
2344     }
2345   update_quota (n);
2346   if (n->quota_in < ntohl (qsm->quota_in))
2347     n->last_quota_update = GNUNET_TIME_absolute_get ();
2348   n->quota_in = ntohl (qsm->quota_in);
2349   rl = n->plugins;
2350   while (rl != NULL)
2351     {
2352       p = rl->plugin;
2353       p->api->set_receive_quota (p->api->cls,
2354                                  &qsm->peer, ntohl (qsm->quota_in));
2355       rl = rl->next;
2356     }
2357   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2358 }
2359
2360
2361 /**
2362  * Handle TRY_CONNECT-message.
2363  *
2364  * @param cls closure (always NULL)
2365  * @param client identification of the client
2366  * @param message the actual message
2367  */
2368 static void
2369 handle_try_connect (void *cls,
2370                     struct GNUNET_SERVER_Client *client,
2371                     const struct GNUNET_MessageHeader *message)
2372 {
2373   const struct TryConnectMessage *tcm;
2374
2375   tcm = (const struct TryConnectMessage *) message;
2376 #if DEBUG_TRANSPORT
2377   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2378               "Received `%s' request from client %p asking to connect to `%4s'\n",
2379               "TRY_CONNECT",
2380               client,
2381               GNUNET_i2s (&tcm->peer));
2382 #endif
2383   if (NULL == find_neighbour (&tcm->peer))
2384     setup_new_neighbour (&tcm->peer);
2385 #if DEBUG_TRANSPORT
2386   else
2387     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2388                 "Client asked to connect to `%4s', but connection already exists\n",
2389                 "TRY_CONNECT", 
2390                 GNUNET_i2s (&tcm->peer));
2391 #endif    
2392   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2393 }
2394
2395
2396 /**
2397  * List of handlers for the messages understood by this
2398  * service.
2399  */
2400 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2401   {&handle_start, NULL,
2402    GNUNET_MESSAGE_TYPE_TRANSPORT_START, 0},
2403   {&handle_hello, NULL,
2404    GNUNET_MESSAGE_TYPE_HELLO, 0},
2405   {&handle_send, NULL,
2406    GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
2407   {&handle_set_quota, NULL,
2408    GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
2409   {&handle_try_connect, NULL,
2410    GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
2411    sizeof (struct TryConnectMessage)},
2412   {NULL, NULL, 0, 0}
2413 };
2414
2415
2416 /**
2417  * Setup the environment for this plugin.
2418  */
2419 static void
2420 create_environment (struct TransportPlugin *plug)
2421 {
2422   plug->env.cfg = cfg;
2423   plug->env.sched = sched;
2424   plug->env.my_public_key = &my_public_key;
2425   plug->env.my_private_key = my_private_key;
2426   plug->env.my_identity = &my_identity;
2427   plug->env.cls = plug;
2428   plug->env.receive = &plugin_env_receive;
2429   plug->env.lookup = &plugin_env_lookup_address;
2430   plug->env.notify_address = &plugin_env_notify_address;
2431   plug->env.notify_validation = &plugin_env_notify_validation;
2432   plug->env.default_quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
2433   plug->env.max_connections = max_connect_per_transport;
2434 }
2435
2436
2437 /**
2438  * Start the specified transport (load the plugin).
2439  */
2440 static void
2441 start_transport (struct GNUNET_SERVER_Handle *server, const char *name)
2442 {
2443   struct TransportPlugin *plug;
2444   char *libname;
2445
2446   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2447               _("Loading `%s' transport plugin\n"), name);
2448   GNUNET_asprintf (&libname, "libgnunet_plugin_transport_%s", name);
2449   plug = GNUNET_malloc (sizeof (struct TransportPlugin));
2450   create_environment (plug);
2451   plug->short_name = GNUNET_strdup (name);
2452   plug->lib_name = libname;
2453   plug->next = plugins;
2454   plugins = plug;
2455   plug->api = GNUNET_PLUGIN_load (libname, &plug->env);
2456   if (plug->api == NULL)
2457     {
2458       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2459                   _("Failed to load transport plugin for `%s'\n"), name);
2460       GNUNET_free (plug->short_name);
2461       plugins = plug->next;
2462       GNUNET_free (libname);
2463       GNUNET_free (plug);
2464     }
2465 }
2466
2467
2468 /**
2469  * Called whenever a client is disconnected.  Frees our
2470  * resources associated with that client.
2471  *
2472  * @param cls closure
2473  * @param client identification of the client
2474  */
2475 static void
2476 client_disconnect_notification (void *cls,
2477                                 struct GNUNET_SERVER_Client *client)
2478 {
2479   struct TransportClient *pos;
2480   struct TransportClient *prev;
2481   struct ClientMessageQueueEntry *mqe;
2482
2483 #if DEBUG_TRANSPORT
2484   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2485               "Client disconnected, cleaning up.\n");
2486 #endif
2487   prev = NULL;
2488   pos = clients;
2489   while ((pos != NULL) && (pos->client != client))
2490     {
2491       prev = pos;
2492       pos = pos->next;
2493     }
2494   if (pos == NULL)
2495     return;
2496   while (NULL != (mqe = pos->message_queue_head))
2497     {
2498       pos->message_queue_head = mqe->next;
2499       GNUNET_free (mqe);
2500     }
2501   pos->message_queue_head = NULL;
2502   if (prev == NULL)
2503     clients = pos->next;
2504   else
2505     prev->next = pos->next;
2506   if (GNUNET_YES == pos->tcs_pending)
2507     {
2508       pos->client = NULL;
2509       return;
2510     }
2511   GNUNET_free (pos);
2512 }
2513
2514
2515 /**
2516  * Initiate transport service.
2517  *
2518  * @param cls closure
2519  * @param s scheduler to use
2520  * @param serv the initialized server
2521  * @param c configuration to use
2522  */
2523 static void
2524 run (void *cls,
2525      struct GNUNET_SCHEDULER_Handle *s,
2526      struct GNUNET_SERVER_Handle *serv, struct GNUNET_CONFIGURATION_Handle *c)
2527 {
2528   char *plugs;
2529   char *pos;
2530   int no_transports;
2531   unsigned long long tneigh;
2532   char *keyfile;
2533
2534   sched = s;
2535   cfg = c;
2536   /* parse configuration */
2537   if ((GNUNET_OK !=
2538        GNUNET_CONFIGURATION_get_value_number (c,
2539                                               "TRANSPORT",
2540                                               "NEIGHBOUR_LIMIT",
2541                                               &tneigh)) ||
2542       (GNUNET_OK !=
2543        GNUNET_CONFIGURATION_get_value_filename (c,
2544                                                 "GNUNETD",
2545                                                 "HOSTKEY", &keyfile)))
2546     {
2547       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2548                   _
2549                   ("Transport service is lacking key configuration settings.  Exiting.\n"));
2550       GNUNET_SCHEDULER_shutdown (s);
2551       return;
2552     }
2553   max_connect_per_transport = (uint32_t) tneigh;
2554   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
2555   GNUNET_free (keyfile);
2556   if (my_private_key == NULL)
2557     {
2558       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2559                   _
2560                   ("Transport service could not access hostkey.  Exiting.\n"));
2561       GNUNET_SCHEDULER_shutdown (s);
2562       return;
2563     }
2564   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
2565   GNUNET_CRYPTO_hash (&my_public_key,
2566                       sizeof (my_public_key), &my_identity.hashPubKey);
2567   /* setup notification */
2568   server = serv;
2569   GNUNET_SERVER_disconnect_notify (server,
2570                                    &client_disconnect_notification, NULL);
2571   /* load plugins... */
2572   no_transports = 1;
2573   if (GNUNET_OK ==
2574       GNUNET_CONFIGURATION_get_value_string (c,
2575                                              "TRANSPORT", "PLUGINS", &plugs))
2576     {
2577       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2578                   _("Starting transport plugins `%s'\n"), plugs);
2579       pos = strtok (plugs, " ");
2580       while (pos != NULL)
2581         {
2582           start_transport (server, pos);
2583           no_transports = 0;
2584           pos = strtok (NULL, " ");
2585         }
2586       GNUNET_free (plugs);
2587     }
2588   if (no_transports)
2589     refresh_hello ();
2590 #if DEBUG_TRANSPORT
2591   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2592               _("Transport service ready.\n"));
2593 #endif
2594   /* process client requests */
2595   GNUNET_SERVER_add_handlers (server, handlers);
2596 }
2597
2598
2599 /**
2600  * Function called when the service shuts
2601  * down.  Unloads our plugins.
2602  *
2603  * @param cls closure
2604  * @param cfg configuration to use
2605  */
2606 static void
2607 unload_plugins (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
2608 {
2609   struct TransportPlugin *plug;
2610   struct AddressList *al;
2611
2612 #if DEBUG_TRANSPORT
2613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2614               "Transport service is unloading plugins...\n");
2615 #endif
2616   while (NULL != (plug = plugins))
2617     {
2618       plugins = plug->next;
2619       GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
2620       GNUNET_free (plug->lib_name);
2621       GNUNET_free (plug->short_name);
2622       while (NULL != (al = plug->addresses))
2623         {
2624           plug->addresses = al->next;
2625           GNUNET_free (al);
2626         }
2627       GNUNET_free (plug);
2628     }
2629   if (my_private_key != NULL)
2630     GNUNET_CRYPTO_rsa_key_free (my_private_key);
2631 }
2632
2633
2634 /**
2635  * The main function for the transport service.
2636  *
2637  * @param argc number of arguments from the command line
2638  * @param argv command line arguments
2639  * @return 0 ok, 1 on error
2640  */
2641 int
2642 main (int argc, char *const *argv)
2643 {
2644   return (GNUNET_OK ==
2645           GNUNET_SERVICE_run (argc,
2646                               argv,
2647                               "transport",
2648                               &run, NULL, &unload_plugins, NULL)) ? 0 : 1;
2649 }
2650
2651 /* end of gnunet-service-transport.c */