c106457d933c0425210127c26edfc65befcd625b
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport.c
23  * @brief low-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - if we do not receive an ACK in response to our
28  *   HELLO, retransmit HELLO!
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_getopt_lib.h"
34 #include "gnunet_hello_lib.h"
35 #include "gnunet_os_lib.h"
36 #include "gnunet_peerinfo_service.h"
37 #include "gnunet_plugin_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_service_lib.h"
40 #include "gnunet_signatures.h"
41 #include "plugin_transport.h"
42 #include "transport.h"
43
44 /**
45  * How many messages can we have pending for a given client process
46  * before we start to drop incoming messages?  We typically should
47  * have only one client and so this would be the primary buffer for
48  * messages, so the number should be chosen rather generously.
49  *
50  * The expectation here is that most of the time the queue is large
51  * enough so that a drop is virtually never required.
52  */
53 #define MAX_PENDING 128
54
55 /**
56  * How often should we try to reconnect to a peer using a particular
57  * transport plugin before giving up?  Note that the plugin may be
58  * added back to the list after PLUGIN_RETRY_FREQUENCY expires.
59  */
60 #define MAX_CONNECT_RETRY 3
61
62 /**
63  * How often must a peer violate bandwidth quotas before we start
64  * to simply drop its messages?
65  */
66 #define QUOTA_VIOLATION_DROP_THRESHOLD 100
67
68 /**
69  * How long until a HELLO verification attempt should time out?
70  * Must be rather small, otherwise a partially successful HELLO
71  * validation (some addresses working) might not be available
72  * before a client's request for a connection fails for good.
73  * Besides, if a single request to an address takes a long time,
74  * then the peer is unlikely worthwhile anyway.
75  */
76 #define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
77
78 /**
79  * How often do we re-add (cheaper) plugins to our list of plugins
80  * to try for a given connected peer?
81  */
82 #define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
83
84 /**
85  * After how long do we expire an address in a HELLO
86  * that we just validated?  This value is also used
87  * for our own addresses when we create a HELLO.
88  */
89 #define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12)
90
91
92 /**
93  * Entry in linked list of network addresses.
94  */
95 struct AddressList
96 {
97   /**
98    * This is a linked list.
99    */
100   struct AddressList *next;
101
102   /**
103    * The address, actually a pointer to the end
104    * of this struct.  Do not free!
105    */
106   void *addr;
107
108   /**
109    * How long until we auto-expire this address (unless it is
110    * re-confirmed by the transport)?
111    */
112   struct GNUNET_TIME_Absolute expires;
113
114   /**
115    * Length of addr.
116    */
117   size_t addrlen;
118
119 };
120
121
122 /**
123  * Entry in linked list of all of our plugins.
124  */
125 struct TransportPlugin
126 {
127
128   /**
129    * This is a linked list.
130    */
131   struct TransportPlugin *next;
132
133   /**
134    * API of the transport as returned by the plugin's
135    * initialization function.
136    */
137   struct GNUNET_TRANSPORT_PluginFunctions *api;
138
139   /**
140    * Short name for the plugin (i.e. "tcp").
141    */
142   char *short_name;
143
144   /**
145    * Name of the library (i.e. "gnunet_plugin_transport_tcp").
146    */
147   char *lib_name;
148
149   /**
150    * List of our known addresses for this transport.
151    */
152   struct AddressList *addresses;
153
154   /**
155    * Environment this transport service is using
156    * for this plugin.
157    */
158   struct GNUNET_TRANSPORT_PluginEnvironment env;
159
160   /**
161    * ID of task that is used to clean up expired addresses.
162    */
163   GNUNET_SCHEDULER_TaskIdentifier address_update_task;
164
165
166   /**
167    * Set to GNUNET_YES if we need to scrap the existing
168    * list of "addresses" and start fresh when we receive
169    * the next address update from a transport.  Set to
170    * GNUNET_NO if we should just add the new address
171    * to the list and wait for the commit call.
172    */
173   int rebuild;
174 };
175
176 struct NeighbourList;
177
178 /**
179  * For each neighbour we keep a list of messages
180  * that we still want to transmit to the neighbour.
181  */
182 struct MessageQueue
183 {
184
185   /**
186    * This is a linked list.
187    */
188   struct MessageQueue *next;
189
190   /**
191    * The message we want to transmit.
192    */
193   struct GNUNET_MessageHeader *message;
194
195   /**
196    * Client responsible for queueing the message;
197    * used to check that a client has not two messages
198    * pending for the same target.  Can be NULL.
199    */
200   struct TransportClient *client;
201
202   /**
203    * Neighbour this entry belongs to.
204    */
205   struct NeighbourList *neighbour;
206
207   /**
208    * Plugin that we used for the transmission.
209    * NULL until we scheduled a transmission.
210    */
211   struct TransportPlugin *plugin;
212
213   /**
214    * Internal message of the transport system that should not be
215    * included in the usual SEND-SEND_OK transmission confirmation
216    * traffic management scheme.  Typically, "internal_msg" will
217    * be set whenever "client" is NULL (but it is not strictly
218    * required).
219    */
220   int internal_msg;
221
222   /**
223    * How important is the message?
224    */
225   unsigned int priority;
226
227 };
228
229
230 /**
231  * For a given Neighbour, which plugins are available
232  * to talk to this peer and what are their costs?
233  */
234 struct ReadyList
235 {
236
237   /**
238    * This is a linked list.
239    */
240   struct ReadyList *next;
241
242   /**
243    * Which of our transport plugins does this entry
244    * represent?
245    */
246   struct TransportPlugin *plugin;
247
248   /**
249    * Neighbour this entry belongs to.
250    */
251   struct NeighbourList *neighbour;
252
253   /**
254    * What was the last latency observed for this plugin
255    * and peer?  Invalid if connected is GNUNET_NO.
256    */
257   struct GNUNET_TIME_Relative latency;
258
259   /**
260    * If we did not successfully transmit a message to the given peer
261    * via this connection during the specified time, we should consider
262    * the connection to be dead.  This is used in the case that a TCP
263    * transport simply stalls writing to the stream but does not
264    * formerly get a signal that the other peer died.
265    */
266   struct GNUNET_TIME_Absolute timeout;
267
268   /**
269    * Is this plugin currently connected?  The first time
270    * we transmit or send data to a peer via a particular
271    * plugin, we set this to GNUNET_YES.  If we later get
272    * an error (disconnect notification or transmission
273    * failure), we set it back to GNUNET_NO.  Each time the
274    * value is set to GNUNET_YES, we increment the
275    * "connect_attempts" counter.  If that one reaches a
276    * particular threshold, we consider the plugin to not
277    * be working properly at this time for the given peer
278    * and remove it from the eligible list.
279    */
280   int connected;
281
282   /**
283    * How often have we tried to connect using this plugin?
284    */
285   unsigned int connect_attempts;
286
287   /**
288    * Is this plugin ready to transmit to the specific target?
289    * GNUNET_NO if not.  Initially, all plugins are marked ready.  If a
290    * transmission is in progress, "transmit_ready" is set to
291    * GNUNET_NO.
292    */
293   int transmit_ready;
294
295 };
296
297
298 /**
299  * Entry in linked list of all of our current neighbours.
300  */
301 struct NeighbourList
302 {
303
304   /**
305    * This is a linked list.
306    */
307   struct NeighbourList *next;
308
309   /**
310    * Which of our transports is connected to this peer
311    * and what is their status?
312    */
313   struct ReadyList *plugins;
314
315   /**
316    * List of messages we would like to send to this peer;
317    * must contain at most one message per client.
318    */
319   struct MessageQueue *messages;
320
321   /**
322    * Identity of this neighbour.
323    */
324   struct GNUNET_PeerIdentity id;
325
326   /**
327    * ID of task scheduled to run when this peer is about to
328    * time out (will free resources associated with the peer).
329    */
330   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
331
332   /**
333    * How long until we should consider this peer dead
334    * (if we don't receive another message in the
335    * meantime)?
336    */
337   struct GNUNET_TIME_Absolute peer_timeout;
338
339   /**
340    * At what time did we reset last_received last?
341    */
342   struct GNUNET_TIME_Absolute last_quota_update;
343
344   /**
345    * At what time should we try to again add plugins to
346    * our ready list?
347    */
348   struct GNUNET_TIME_Absolute retry_plugins_time;
349
350   /**
351    * How many bytes have we received since the "last_quota_update"
352    * timestamp?
353    */
354   uint64_t last_received;
355
356   /**
357    * Global quota for inbound traffic for the neighbour in bytes/ms.
358    */
359   uint32_t quota_in;
360
361   /**
362    * How often has the other peer (recently) violated the
363    * inbound traffic limit?  Incremented by 10 per violation,
364    * decremented by 1 per non-violation (for each
365    * time interval).
366    */
367   unsigned int quota_violation_count;
368
369   /**
370    * Have we seen an ACK from this neighbour in the past?
371    * (used to make up a fake ACK for clients connecting after
372    * the neighbour connected to us).
373    */
374   int saw_ack;
375
376 };
377
378
379 /**
380  * Linked list of messages to be transmitted to
381  * the client.  Each entry is followed by the
382  * actual message.
383  */
384 struct ClientMessageQueueEntry
385 {
386   /**
387    * This is a linked list.
388    */
389   struct ClientMessageQueueEntry *next;
390 };
391
392
393 /**
394  * Client connected to the transport service.
395  */
396 struct TransportClient
397 {
398
399   /**
400    * This is a linked list.
401    */
402   struct TransportClient *next;
403
404   /**
405    * Handle to the client.
406    */
407   struct GNUNET_SERVER_Client *client;
408
409   /**
410    * Linked list of messages yet to be transmitted to
411    * the client.
412    */
413   struct ClientMessageQueueEntry *message_queue_head;
414
415   /**
416    * Tail of linked list of messages yet to be transmitted to the
417    * client.
418    */
419   struct ClientMessageQueueEntry *message_queue_tail;
420
421   /**
422    * Is a call to "transmit_send_continuation" pending?  If so, we
423    * must not free this struct (even if the corresponding client
424    * disconnects) and instead only remove it from the linked list and
425    * set the "client" field to NULL.
426    */
427   int tcs_pending;
428
429   /**
430    * Length of the list of messages pending for this client.
431    */
432   unsigned int message_count;
433
434 };
435
436
437 /**
438  * For each HELLO, we may have to validate multiple addresses;
439  * each address gets its own request entry.
440  */
441 struct ValidationAddress
442 {
443   /**
444    * This is a linked list.
445    */
446   struct ValidationAddress *next;
447
448   /**
449    * Name of the transport.
450    */
451   char *transport_name;
452
453   /**
454    * When should this validated address expire?
455    */
456   struct GNUNET_TIME_Absolute expiration;
457
458   /**
459    * Length of the address we are validating.
460    */
461   size_t addr_len;
462
463   /**
464    * Challenge number we used.
465    */
466   uint32_t challenge;
467
468   /**
469    * Set to GNUNET_YES if the challenge was met,
470    * GNUNET_SYSERR if we know it failed, GNUNET_NO
471    * if we are waiting on a response.
472    */
473   int ok;
474 };
475
476
477 /**
478  * Entry in linked list of all HELLOs awaiting validation.
479  */
480 struct ValidationList
481 {
482
483   /**
484    * This is a linked list.
485    */
486   struct ValidationList *next;
487
488   /**
489    * Linked list with one entry per address from the HELLO
490    * that needs to be validated.
491    */
492   struct ValidationAddress *addresses;
493
494   /**
495    * The public key of the peer.
496    */
497   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
498
499   /**
500    * When does this record time-out? (assuming the
501    * challenge goes unanswered)
502    */
503   struct GNUNET_TIME_Absolute timeout;
504
505 };
506
507
508 /**
509  * HELLOs awaiting validation.
510  */
511 static struct ValidationList *pending_validations;
512
513 /**
514  * Our HELLO message.
515  */
516 static struct GNUNET_HELLO_Message *our_hello;
517
518 /**
519  * "version" of "our_hello".  Used to see if a given
520  * neighbour has already been sent the latest version
521  * of our HELLO message.
522  */
523 static unsigned int our_hello_version;
524
525 /**
526  * Our public key.
527  */
528 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
529
530 /**
531  * Our identity.
532  */
533 static struct GNUNET_PeerIdentity my_identity;
534
535 /**
536  * Our private key.
537  */
538 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
539
540 /**
541  * Our scheduler.
542  */
543 struct GNUNET_SCHEDULER_Handle *sched;
544
545 /**
546  * Our configuration.
547  */
548 const struct GNUNET_CONFIGURATION_Handle *cfg;
549
550 /**
551  * Linked list of all clients to this service.
552  */
553 static struct TransportClient *clients;
554
555 /**
556  * All loaded plugins.
557  */
558 static struct TransportPlugin *plugins;
559
560 /**
561  * Our server.
562  */
563 static struct GNUNET_SERVER_Handle *server;
564
565 /**
566  * All known neighbours and their HELLOs.
567  */
568 static struct NeighbourList *neighbours;
569
570 /**
571  * Number of neighbours we'd like to have.
572  */
573 static uint32_t max_connect_per_transport;
574
575
576 /**
577  * Find an entry in the neighbour list for a particular peer.
578  *
579  * @return NULL if not found.
580  */
581 static struct NeighbourList *
582 find_neighbour (const struct GNUNET_PeerIdentity *key)
583 {
584   struct NeighbourList *head = neighbours;
585   while ((head != NULL) &&
586          (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
587     head = head->next;
588   return head;
589 }
590
591
592 /**
593  * Find an entry in the transport list for a particular transport.
594  *
595  * @return NULL if not found.
596  */
597 static struct TransportPlugin *
598 find_transport (const char *short_name)
599 {
600   struct TransportPlugin *head = plugins;
601   while ((head != NULL) && (0 != strcmp (short_name, head->short_name)))
602     head = head->next;
603   return head;
604 }
605
606
607 /**
608  * Update the quota values for the given neighbour now.
609  */
610 static void
611 update_quota (struct NeighbourList *n)
612 {
613   struct GNUNET_TIME_Relative delta;
614   uint64_t allowed;
615   uint64_t remaining;
616
617   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
618   if (delta.value < MIN_QUOTA_REFRESH_TIME)
619     return;                     /* not enough time passed for doing quota update */
620   allowed = delta.value * n->quota_in;
621   if (n->last_received < allowed)
622     {
623       remaining = allowed - n->last_received;
624       if (n->quota_in > 0)
625         remaining /= n->quota_in;
626       else
627         remaining = 0;
628       if (remaining > MAX_BANDWIDTH_CARRY)
629         remaining = MAX_BANDWIDTH_CARRY;
630       n->last_received = 0;
631       n->last_quota_update = GNUNET_TIME_absolute_get ();
632       n->last_quota_update.value -= remaining;
633       if (n->quota_violation_count > 0)
634         n->quota_violation_count--;
635     }
636   else
637     {
638       n->last_received -= allowed;
639       n->last_quota_update = GNUNET_TIME_absolute_get ();
640       if (n->last_received > allowed)
641         {
642           /* more than twice the allowed rate! */
643           n->quota_violation_count += 10;
644         }
645     }
646 }
647
648
649 /**
650  * Function called to notify a client about the socket
651  * being ready to queue more data.  "buf" will be
652  * NULL and "size" zero if the socket was closed for
653  * writing in the meantime.
654  *
655  * @param cls closure
656  * @param size number of bytes available in buf
657  * @param buf where the callee should write the message
658  * @return number of bytes written to buf
659  */
660 static size_t
661 transmit_to_client_callback (void *cls, size_t size, void *buf)
662 {
663   struct TransportClient *client = cls;
664   struct ClientMessageQueueEntry *q;
665   uint16_t msize;
666   size_t tsize;
667   const struct GNUNET_MessageHeader *msg;
668   struct GNUNET_CONNECTION_TransmitHandle *th;
669   char *cbuf;
670
671   if (buf == NULL)
672     {
673       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674                   "Transmission to client failed, closing connection.\n");
675       /* fatal error with client, free message queue! */
676       while (NULL != (q = client->message_queue_head))
677         {
678           client->message_queue_head = q->next;
679           GNUNET_free (q);
680         }
681       client->message_queue_tail = NULL;
682       client->message_count = 0;
683       return 0;
684     }
685   cbuf = buf;
686   tsize = 0;
687   while (NULL != (q = client->message_queue_head))
688     {
689       msg = (const struct GNUNET_MessageHeader *) &q[1];
690       msize = ntohs (msg->size);
691       if (msize + tsize > size)
692         break;
693 #if DEBUG_TRANSPORT
694       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
695                   "Transmitting message of type %u to client.\n",
696                   ntohs (msg->type));
697 #endif
698       client->message_queue_head = q->next;
699       if (q->next == NULL)
700         client->message_queue_tail = NULL;
701       memcpy (&cbuf[tsize], msg, msize);
702       tsize += msize;
703       GNUNET_free (q);
704       client->message_count--;
705     }
706   if (NULL != q)
707     {
708       GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
709       th = GNUNET_SERVER_notify_transmit_ready (client->client,
710                                                 msize,
711                                                 GNUNET_TIME_UNIT_FOREVER_REL,
712                                                 &transmit_to_client_callback,
713                                                 client);
714       GNUNET_assert (th != NULL);
715     }
716   return tsize;
717 }
718
719
720 /**
721  * Send the specified message to the specified client.  Since multiple
722  * messages may be pending for the same client at a time, this code
723  * makes sure that no message is lost.
724  *
725  * @param client client to transmit the message to
726  * @param msg the message to send
727  * @param may_drop can this message be dropped if the
728  *        message queue for this client is getting far too large?
729  */
730 static void
731 transmit_to_client (struct TransportClient *client,
732                     const struct GNUNET_MessageHeader *msg, int may_drop)
733 {
734   struct ClientMessageQueueEntry *q;
735   uint16_t msize;
736   struct GNUNET_CONNECTION_TransmitHandle *th;
737
738   if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
739     {
740       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
741                   _
742                   ("Dropping message, have %u messages pending (%u is the soft limit)\n"),
743                   client->message_count, MAX_PENDING);
744       /* TODO: call to statistics... */
745       return;
746     }
747   client->message_count++;
748   msize = ntohs (msg->size);
749   GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
750   q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
751   memcpy (&q[1], msg, msize);
752   /* append to message queue */
753   if (client->message_queue_tail == NULL)
754     {
755       client->message_queue_tail = q;
756     }
757   else
758     {
759       client->message_queue_tail->next = q;
760       client->message_queue_tail = q;
761     }
762   if (client->message_queue_head == NULL)
763     {
764       client->message_queue_head = q;
765       th = GNUNET_SERVER_notify_transmit_ready (client->client,
766                                                 msize,
767                                                 GNUNET_TIME_UNIT_FOREVER_REL,
768                                                 &transmit_to_client_callback,
769                                                 client);
770       GNUNET_assert (th != NULL);
771     }
772 }
773
774
775 /**
776  * Find alternative plugins for communication.
777  *
778  * @param neighbour for which neighbour should we try to find
779  *        more plugins?
780  */
781 static void
782 try_alternative_plugins (struct NeighbourList *neighbour)
783 {
784   struct ReadyList *rl;
785
786   if ((neighbour->plugins != NULL) &&
787       (neighbour->retry_plugins_time.value >
788        GNUNET_TIME_absolute_get ().value))
789     return;                     /* don't try right now */
790   neighbour->retry_plugins_time
791     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
792
793   rl = neighbour->plugins;
794   while (rl != NULL)
795     {
796       if (rl->connect_attempts > 0)
797         rl->connect_attempts--; /* amnesty */
798       rl = rl->next;
799     }
800
801 }
802
803
804 /**
805  * The peer specified by the given neighbour has timed-out or a plugin
806  * has disconnected.  We may either need to do nothing (other plugins
807  * still up), or trigger a full disconnect and clean up.  This
808  * function updates our state and do the necessary notifications.
809  * Also notifies our clients that the neighbour is now officially
810  * gone.
811  *
812  * @param n the neighbour list entry for the peer
813  * @param check should we just check if all plugins
814  *        disconnected or must we ask all plugins to
815  *        disconnect?
816  */
817 static void disconnect_neighbour (struct NeighbourList *n, int check);
818
819
820 /**
821  * Check the ready list for the given neighbour and
822  * if a plugin is ready for transmission (and if we
823  * have a message), do so!
824  *
825  * @param neighbour target peer for which to check the plugins
826  */
827 static void try_transmission_to_peer (struct NeighbourList *neighbour);
828
829
830 /**
831  * Function called by the GNUNET_TRANSPORT_TransmitFunction
832  * upon "completion" of a send request.  This tells the API
833  * that it is now legal to send another message to the given
834  * peer.
835  *
836  * @param cls closure, identifies the entry on the
837  *            message queue that was transmitted and the
838  *            client responsible for queueing the message
839  * @param target the peer receiving the message
840  * @param result GNUNET_OK on success, if the transmission
841  *           failed, we should not tell the client to transmit
842  *           more messages
843  */
844 static void
845 transmit_send_continuation (void *cls,
846                             const struct GNUNET_PeerIdentity *target,
847                             int result)
848 {
849   struct MessageQueue *mq = cls;
850   struct ReadyList *rl;
851   struct SendOkMessage send_ok_msg;
852   struct NeighbourList *n;
853
854   GNUNET_assert (mq != NULL);
855   n = mq->neighbour;
856   GNUNET_assert (n != NULL);
857   GNUNET_assert (0 ==
858                  memcmp (&n->id, target,
859                          sizeof (struct GNUNET_PeerIdentity)));
860   rl = n->plugins;
861   while ((rl != NULL) && (rl->plugin != mq->plugin))
862     rl = rl->next;
863   GNUNET_assert (rl != NULL);
864   if (result == GNUNET_OK)
865     {
866       rl->timeout =
867         GNUNET_TIME_relative_to_absolute
868         (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
869     }
870   else
871     {
872       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873                   "Transmission to peer `%s' failed, marking connection as down.\n",
874                   GNUNET_i2s (target));
875       rl->connected = GNUNET_NO;
876     }
877   if (!mq->internal_msg)
878     rl->transmit_ready = GNUNET_YES;
879   if (mq->client != NULL)
880     {
881       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
882                   "Notifying client %p about failed transission to peer `%4s'.\n",
883                   mq->client, GNUNET_i2s (target));
884       send_ok_msg.header.size = htons (sizeof (send_ok_msg));
885       send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
886       send_ok_msg.success = htonl (result);
887       send_ok_msg.peer = n->id;
888       transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
889     }
890   GNUNET_free (mq->message);
891   GNUNET_free (mq);
892   /* one plugin just became ready again, try transmitting
893      another message (if available) */
894   if (result == GNUNET_OK)
895     try_transmission_to_peer (n);
896   else
897     disconnect_neighbour (n, GNUNET_YES);
898 }
899
900
901 /**
902  * Check the ready list for the given neighbour and
903  * if a plugin is ready for transmission (and if we
904  * have a message), do so!
905  */
906 static void
907 try_transmission_to_peer (struct NeighbourList *neighbour)
908 {
909   struct ReadyList *pos;
910   struct GNUNET_TIME_Relative min_latency;
911   struct ReadyList *rl;
912   struct MessageQueue *mq;
913   struct GNUNET_TIME_Absolute now;
914
915   if (neighbour->messages == NULL)
916     return;                     /* nothing to do */
917   try_alternative_plugins (neighbour);
918   min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
919   rl = NULL;
920   mq = neighbour->messages;
921   now = GNUNET_TIME_absolute_get ();
922   pos = neighbour->plugins;
923   while (pos != NULL)
924     {
925       /* set plugins that are inactive for a long time back to disconnected */
926       if ((pos->timeout.value < now.value) && (pos->connected == GNUNET_YES))
927         {
928 #if DEBUG_TRANSPORT
929           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930                       "Marking long-time inactive connection to `%4s' as down.\n",
931                       GNUNET_i2s (&neighbour->id));
932 #endif
933           pos->connected = GNUNET_NO;
934         }
935       if (((GNUNET_YES == pos->transmit_ready) ||
936            (mq->internal_msg)) &&
937           (pos->connect_attempts < MAX_CONNECT_RETRY) &&
938           ((rl == NULL) || (min_latency.value > pos->latency.value)))
939         {
940           rl = pos;
941           min_latency = pos->latency;
942         }
943       pos = pos->next;
944     }
945   if (rl == NULL)
946     {
947 #if DEBUG_TRANSPORT
948       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
949                   "No plugin ready to transmit message\n");
950 #endif
951       return;                   /* nobody ready */
952     }
953   if (GNUNET_NO == rl->connected)
954     {
955       rl->connect_attempts++;
956       rl->connected = GNUNET_YES;
957 #if DEBUG_TRANSPORT
958       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959                   "Establishing fresh connection with `%4s' via plugin `%s'\n",
960                   GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
961 #endif
962     }
963   neighbour->messages = mq->next;
964   mq->plugin = rl->plugin;
965   if (!mq->internal_msg)
966     rl->transmit_ready = GNUNET_NO;
967 #if DEBUG_TRANSPORT
968   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969               "Giving message of type `%u' for `%4s' to plugin `%s'\n",
970               ntohs (mq->message->type),
971               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
972 #endif
973   rl->plugin->api->send (rl->plugin->api->cls,
974                          &neighbour->id,
975                          mq->priority,
976                          mq->message,
977                          GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
978                          &transmit_send_continuation, mq);
979 }
980
981
982 /**
983  * Send the specified message to the specified peer.
984  *
985  * @param client source of the transmission request (can be NULL)
986  * @param priority how important is the message
987  * @param msg message to send
988  * @param is_internal is this an internal message
989  * @param neighbour handle to the neighbour for transmission
990  */
991 static void
992 transmit_to_peer (struct TransportClient *client,
993                   unsigned int priority,
994                   const struct GNUNET_MessageHeader *msg,
995                   int is_internal, struct NeighbourList *neighbour)
996 {
997   struct MessageQueue *mq;
998   struct MessageQueue *mqe;
999   struct GNUNET_MessageHeader *m;
1000
1001 #if DEBUG_TRANSPORT
1002   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1003               _("Sending message of type %u to peer `%4s'\n"),
1004               ntohs (msg->type), GNUNET_i2s (&neighbour->id));
1005 #endif
1006   if (client != NULL)
1007     {
1008       /* check for duplicate submission */
1009       mq = neighbour->messages;
1010       while (NULL != mq)
1011         {
1012           if (mq->client == client)
1013             {
1014               /* client transmitted to same peer twice
1015                  before getting SendOk! */
1016               GNUNET_break (0);
1017               return;
1018             }
1019           mq = mq->next;
1020         }
1021     }
1022   mq = GNUNET_malloc (sizeof (struct MessageQueue));
1023   mq->client = client;
1024   m = GNUNET_malloc (ntohs (msg->size));
1025   memcpy (m, msg, ntohs (msg->size));
1026   mq->message = m;
1027   mq->neighbour = neighbour;
1028   mq->internal_msg = is_internal;
1029   mq->priority = priority;
1030
1031   /* find tail */
1032   mqe = neighbour->messages;
1033   if (mqe != NULL)
1034     while (mqe->next != NULL)
1035       mqe = mqe->next;
1036   if (mqe == NULL)
1037     {
1038       /* new head */
1039       neighbour->messages = mq;
1040       try_transmission_to_peer (neighbour);
1041     }
1042   else
1043     {
1044       /* append */
1045       mqe->next = mq;
1046     }
1047 }
1048
1049
1050 /**
1051  * FIXME: document.
1052  */
1053 struct GeneratorContext
1054 {
1055   struct TransportPlugin *plug_pos;
1056   struct AddressList *addr_pos;
1057   struct GNUNET_TIME_Absolute expiration;
1058 };
1059
1060
1061 /**
1062  * FIXME: document.
1063  */
1064 static size_t
1065 address_generator (void *cls, size_t max, void *buf)
1066 {
1067   struct GeneratorContext *gc = cls;
1068   size_t ret;
1069
1070   while ((gc->addr_pos == NULL) && (gc->plug_pos != NULL))
1071     {
1072       gc->plug_pos = gc->plug_pos->next;
1073       gc->addr_pos = (gc->plug_pos != NULL) ? gc->plug_pos->addresses : NULL;
1074     }
1075   if (NULL == gc->plug_pos)
1076     return 0;
1077   ret = GNUNET_HELLO_add_address (gc->plug_pos->short_name,
1078                                   gc->expiration,
1079                                   gc->addr_pos->addr,
1080                                   gc->addr_pos->addrlen, buf, max);
1081   gc->addr_pos = gc->addr_pos->next;
1082   return ret;
1083 }
1084
1085
1086 /**
1087  * Construct our HELLO message from all of the addresses of
1088  * all of the transports.
1089  */
1090 static void
1091 refresh_hello ()
1092 {
1093   struct GNUNET_HELLO_Message *hello;
1094   struct TransportClient *cpos;
1095   struct NeighbourList *npos;
1096   struct GeneratorContext gc;
1097
1098 #if DEBUG_TRANSPORT
1099   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1100               "Refreshing my `%s'\n", "HELLO");
1101 #endif
1102   gc.plug_pos = plugins;
1103   gc.addr_pos = plugins != NULL ? plugins->addresses : NULL;
1104   gc.expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1105   hello = GNUNET_HELLO_create (&my_public_key, &address_generator, &gc);
1106   cpos = clients;
1107   while (cpos != NULL)
1108     {
1109       transmit_to_client (cpos,
1110                           (const struct GNUNET_MessageHeader *) hello,
1111                           GNUNET_NO);
1112       cpos = cpos->next;
1113     }
1114
1115   GNUNET_free_non_null (our_hello);
1116   our_hello = hello;
1117   our_hello_version++;
1118   GNUNET_PEERINFO_add_peer (cfg, sched, &my_identity, our_hello);
1119   npos = neighbours;
1120   while (npos != NULL)
1121     {
1122 #if DEBUG_TRANSPORT
1123       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1124                   "Transmitting updated `%s' to neighbour `%4s'\n",
1125                   "HELLO", GNUNET_i2s (&npos->id));
1126 #endif
1127       transmit_to_peer (NULL, 0,
1128                         (const struct GNUNET_MessageHeader *) our_hello,
1129                         GNUNET_YES, npos);
1130       npos = npos->next;
1131     }
1132 }
1133
1134
1135 /**
1136  * Task used to clean up expired addresses for a plugin.
1137  *
1138  * @param cls closure
1139  * @param tc context
1140  */
1141 static void
1142 expire_address_task (void *cls,
1143                      const struct GNUNET_SCHEDULER_TaskContext *tc);
1144
1145
1146 /**
1147  * Update the list of addresses for this plugin,
1148  * expiring those that are past their expiration date.
1149  *
1150  * @param plugin addresses of which plugin should be recomputed?
1151  * @param fresh set to GNUNET_YES if a new address was added
1152  *        and we need to regenerate the HELLO even if nobody
1153  *        expired
1154  */
1155 static void
1156 update_addresses (struct TransportPlugin *plugin, int fresh)
1157 {
1158   struct GNUNET_TIME_Relative min_remaining;
1159   struct GNUNET_TIME_Relative remaining;
1160   struct GNUNET_TIME_Absolute now;
1161   struct AddressList *pos;
1162   struct AddressList *prev;
1163   struct AddressList *next;
1164   int expired;
1165
1166   if (plugin->address_update_task != GNUNET_SCHEDULER_NO_TASK)
1167     GNUNET_SCHEDULER_cancel (plugin->env.sched, plugin->address_update_task);
1168   plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK;
1169   now = GNUNET_TIME_absolute_get ();
1170   min_remaining = GNUNET_TIME_UNIT_FOREVER_REL;
1171   expired = GNUNET_NO;
1172   prev = NULL;
1173   pos = plugin->addresses;
1174   while (pos != NULL)
1175     {
1176       next = pos->next;
1177       if (pos->expires.value < now.value)
1178         {
1179           expired = GNUNET_YES;
1180           if (prev == NULL)
1181             plugin->addresses = pos->next;
1182           else
1183             prev->next = pos->next;
1184           GNUNET_free (pos);
1185         }
1186       else
1187         {
1188           remaining = GNUNET_TIME_absolute_get_remaining (pos->expires);
1189           if (remaining.value < min_remaining.value)
1190             min_remaining = remaining;
1191           prev = pos;
1192         }
1193       pos = next;
1194     }
1195
1196   if (expired || fresh)
1197     refresh_hello ();
1198   if (min_remaining.value < GNUNET_TIME_UNIT_FOREVER_REL.value)
1199     plugin->address_update_task
1200       = GNUNET_SCHEDULER_add_delayed (plugin->env.sched,
1201                                       min_remaining,
1202                                       &expire_address_task, plugin);
1203
1204 }
1205
1206
1207 /**
1208  * Task used to clean up expired addresses for a plugin.
1209  *
1210  * @param cls closure
1211  * @param tc context
1212  */
1213 static void
1214 expire_address_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1215 {
1216   struct TransportPlugin *plugin = cls;
1217   plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK;
1218   update_addresses (plugin, GNUNET_NO);
1219 }
1220
1221
1222 /**
1223  * Function that must be called by each plugin to notify the
1224  * transport service about the addresses under which the transport
1225  * provided by the plugin can be reached.
1226  *
1227  * @param cls closure
1228  * @param name name of the transport that generated the address
1229  * @param addr one of the addresses of the host, NULL for the last address
1230  *        the specific address format depends on the transport
1231  * @param addrlen length of the address
1232  * @param expires when should this address automatically expire?
1233  */
1234 static void
1235 plugin_env_notify_address (void *cls,
1236                            const char *name,
1237                            const void *addr,
1238                            size_t addrlen,
1239                            struct GNUNET_TIME_Relative expires)
1240 {
1241   struct TransportPlugin *p = cls;
1242   struct AddressList *al;
1243   struct GNUNET_TIME_Absolute abex;
1244
1245   abex = GNUNET_TIME_relative_to_absolute (expires);
1246   GNUNET_assert (p == find_transport (name));
1247
1248   al = p->addresses;
1249   while (al != NULL)
1250     {
1251       if ((addrlen == al->addrlen) && (0 == memcmp (addr, &al[1], addrlen)))
1252         {
1253           if (al->expires.value < abex.value)
1254             al->expires = abex;
1255           return;
1256         }
1257       al = al->next;
1258     }
1259 #if DEBUG_TRANSPORT
1260   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1261               "Plugin `%s' informs us about a new address `%s'\n", name,
1262               GNUNET_a2s (addr, addrlen));
1263 #endif
1264   al = GNUNET_malloc (sizeof (struct AddressList) + addrlen);
1265   al->addr = &al[1];
1266   al->next = p->addresses;
1267   p->addresses = al;
1268   al->expires = abex;
1269   al->addrlen = addrlen;
1270   memcpy (&al[1], addr, addrlen);
1271   update_addresses (p, GNUNET_YES);
1272 }
1273
1274
1275 /**
1276  * Notify all of our clients about a peer connecting.
1277  */
1278 static void
1279 notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
1280                         struct GNUNET_TIME_Relative latency)
1281 {
1282   struct ConnectInfoMessage cim;
1283   struct TransportClient *cpos;
1284
1285 #if DEBUG_TRANSPORT
1286   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1287               "Informing clients about peer `%4s' connecting to us\n",
1288               GNUNET_i2s (peer));
1289 #endif
1290   cim.header.size = htons (sizeof (struct ConnectInfoMessage));
1291   cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1292   cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
1293   cim.latency = GNUNET_TIME_relative_hton (latency);
1294   memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
1295   cpos = clients;
1296   while (cpos != NULL)
1297     {
1298       transmit_to_client (cpos, &cim.header, GNUNET_NO);
1299       cpos = cpos->next;
1300     }
1301 }
1302
1303
1304 /**
1305  * Notify all of our clients about a peer disconnecting.
1306  */
1307 static void
1308 notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
1309 {
1310   struct DisconnectInfoMessage dim;
1311   struct TransportClient *cpos;
1312
1313 #if DEBUG_TRANSPORT
1314   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1315               "Informing clients about peer `%4s' disconnecting\n",
1316               GNUNET_i2s (peer));
1317 #endif
1318   dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
1319   dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1320   dim.reserved = htonl (0);
1321   memcpy (&dim.peer, peer, sizeof (struct GNUNET_PeerIdentity));
1322   cpos = clients;
1323   while (cpos != NULL)
1324     {
1325       transmit_to_client (cpos, &dim.header, GNUNET_NO);
1326       cpos = cpos->next;
1327     }
1328 }
1329
1330
1331 /**
1332  * Copy any validated addresses to buf.
1333  *
1334  * @return 0 once all addresses have been
1335  *         returned
1336  */
1337 static size_t
1338 list_validated_addresses (void *cls, size_t max, void *buf)
1339 {
1340   struct ValidationAddress **va = cls;
1341   size_t ret;
1342
1343   while ((NULL != *va) && ((*va)->ok != GNUNET_YES))
1344     *va = (*va)->next;
1345   if (NULL == *va)
1346     return 0;
1347   ret = GNUNET_HELLO_add_address ((*va)->transport_name,
1348                                   (*va)->expiration,
1349                                   &(*va)[1], (*va)->addr_len, buf, max);
1350   *va = (*va)->next;
1351   return ret;
1352 }
1353
1354
1355 /**
1356  * HELLO validation cleanup task.
1357  */
1358 static void
1359 cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1360 {
1361   struct ValidationAddress *va;
1362   struct ValidationList *pos;
1363   struct ValidationList *prev;
1364   struct GNUNET_TIME_Absolute now;
1365   struct GNUNET_TIME_Absolute first;
1366   struct GNUNET_HELLO_Message *hello;
1367   struct GNUNET_PeerIdentity pid;
1368   struct NeighbourList *n;
1369
1370 #if DEBUG_TRANSPORT
1371   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1372               "HELLO validation cleanup background task running...\n");
1373 #endif
1374   now = GNUNET_TIME_absolute_get ();
1375   prev = NULL;
1376   pos = pending_validations;
1377   while (pos != NULL)
1378     {
1379       if (pos->timeout.value < now.value)
1380         {
1381           if (prev == NULL)
1382             pending_validations = pos->next;
1383           else
1384             prev->next = pos->next;
1385           va = pos->addresses;
1386           hello = GNUNET_HELLO_create (&pos->publicKey,
1387                                        &list_validated_addresses, &va);
1388           GNUNET_CRYPTO_hash (&pos->publicKey,
1389                               sizeof (struct
1390                                       GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1391                               &pid.hashPubKey);
1392 #if DEBUG_TRANSPORT
1393           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1394                       "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n",
1395                       "HELLO", GNUNET_i2s (&pid));
1396 #endif
1397           GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello);
1398           n = find_neighbour (&pid);
1399           if (NULL != n)
1400             try_transmission_to_peer (n);
1401           GNUNET_free (hello);
1402           while (NULL != (va = pos->addresses))
1403             {
1404               pos->addresses = va->next;
1405               GNUNET_free (va->transport_name);
1406               GNUNET_free (va);
1407             }
1408           GNUNET_free (pos);
1409           if (prev == NULL)
1410             pos = pending_validations;
1411           else
1412             pos = prev->next;
1413           continue;
1414         }
1415       prev = pos;
1416       pos = pos->next;
1417     }
1418
1419   /* finally, reschedule cleanup if needed; list is
1420      ordered by timeout, so we need the last element... */
1421   if (NULL != pending_validations)
1422     {
1423       first = pending_validations->timeout;
1424       pos = pending_validations;
1425       while (pos != NULL)
1426         {
1427           first = GNUNET_TIME_absolute_min (first, pos->timeout);
1428           pos = pos->next;
1429         }
1430       GNUNET_SCHEDULER_add_delayed (sched,
1431                                     GNUNET_TIME_absolute_get_remaining
1432                                     (first), &cleanup_validation, NULL);
1433     }
1434 }
1435
1436
1437
1438
1439 /**
1440  * Function that will be called if we receive a validation
1441  * of an address challenge that we transmitted to another
1442  * peer.  Note that the validation should only be considered
1443  * acceptable if the challenge matches AND if the sender
1444  * address is at least a plausible address for this peer
1445  * (otherwise we may be seeing a MiM attack).
1446  *
1447  * @param cls closure
1448  * @param name name of the transport that generated the address
1449  * @param peer who responded to our challenge
1450  * @param challenge the challenge number we presumably used
1451  * @param sender_addr string describing our sender address (as observed
1452  *         by the other peer in human-readable format)
1453  */
1454 static void
1455 plugin_env_notify_validation (void *cls,
1456                               const char *name,
1457                               const struct GNUNET_PeerIdentity *peer,
1458                               uint32_t challenge, const char *sender_addr)
1459 {
1460   unsigned int not_done;
1461   int matched;
1462   struct ValidationList *pos;
1463   struct ValidationAddress *va;
1464   struct GNUNET_PeerIdentity id;
1465
1466   pos = pending_validations;
1467   while (pos != NULL)
1468     {
1469       GNUNET_CRYPTO_hash (&pos->publicKey,
1470                           sizeof (struct
1471                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1472                           &id.hashPubKey);
1473       if (0 == memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity)))
1474         break;
1475       pos = pos->next;
1476     }
1477   if (pos == NULL)
1478     {
1479       /* TODO: call statistics (unmatched PONG) */
1480       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1481                   _
1482                   ("Received validation response but have no record of any validation request for `%4s'. Ignoring.\n"),
1483                   GNUNET_i2s (peer));
1484       return;
1485     }
1486   not_done = 0;
1487   matched = GNUNET_NO;
1488   va = pos->addresses;
1489   while (va != NULL)
1490     {
1491       if (va->challenge == challenge)
1492         {
1493 #if DEBUG_TRANSPORT
1494           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1495                       "Confirmed validity of address, peer `%4s' has address `%s'.\n",
1496                       GNUNET_i2s (peer),
1497                       GNUNET_a2s ((const struct sockaddr *) &va[1],
1498                                   va->addr_len));
1499 #endif
1500           GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
1501                       _
1502                       ("Another peer saw us using the address `%s' via `%s'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"),
1503                       sender_addr, name);
1504           va->ok = GNUNET_YES;
1505           va->expiration =
1506             GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1507           matched = GNUNET_YES;
1508         }
1509       if (va->ok != GNUNET_YES)
1510         not_done++;
1511       va = va->next;
1512     }
1513   if (GNUNET_NO == matched)
1514     {
1515       /* TODO: call statistics (unmatched PONG) */
1516       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1517                   _
1518                   ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"),
1519                   "PONG", "PING");
1520     }
1521   if (0 == not_done)
1522     {
1523 #if DEBUG_TRANSPORT
1524       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1525                   "All addresses validated, will now construct `%s' for `%4s'.\n",
1526                   "HELLO", GNUNET_i2s (peer));
1527 #endif
1528       pos->timeout.value = 0;
1529       GNUNET_SCHEDULER_add_with_priority (sched,
1530                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1531                                           &cleanup_validation, NULL);
1532     }
1533   else
1534     {
1535 #if DEBUG_TRANSPORT
1536       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1537                   "Still waiting for %u additional `%s' messages before constructing `%s' for `%4s'.\n",
1538                   not_done, "PONG", "HELLO", GNUNET_i2s (peer));
1539 #endif
1540     }
1541 }
1542
1543
1544 struct CheckHelloValidatedContext
1545 {
1546   /**
1547    * Plugin for which we are validating.
1548    */
1549   struct TransportPlugin *plugin;
1550
1551   /**
1552    * Hello that we are validating.
1553    */
1554   struct GNUNET_HELLO_Message *hello;
1555
1556   /**
1557    * Validation list being build.
1558    */
1559   struct ValidationList *e;
1560
1561   /**
1562    * Context for peerinfo iteration.
1563    * NULL after we are done processing peerinfo's information.
1564    */
1565   struct GNUNET_PEERINFO_IteratorContext *piter;
1566
1567 };
1568
1569
1570 /**
1571  * Append the given address to the list of entries
1572  * that need to be validated.
1573  */
1574 static int
1575 run_validation (void *cls,
1576                 const char *tname,
1577                 struct GNUNET_TIME_Absolute expiration,
1578                 const void *addr, size_t addrlen)
1579 {
1580   struct ValidationList *e = cls;
1581   struct TransportPlugin *tp;
1582   struct ValidationAddress *va;
1583   struct GNUNET_PeerIdentity id;
1584
1585   tp = find_transport (tname);
1586   if (tp == NULL)
1587     {
1588       GNUNET_log (GNUNET_ERROR_TYPE_INFO |
1589                   GNUNET_ERROR_TYPE_BULK,
1590                   _
1591                   ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
1592                   tname);
1593       return GNUNET_OK;
1594     }
1595   GNUNET_CRYPTO_hash (&e->publicKey,
1596                       sizeof (struct
1597                               GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1598                       &id.hashPubKey);
1599   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1600               "Scheduling validation of address `%s' via `%s' for `%4s'\n",
1601               GNUNET_a2s (addr, addrlen), tname, GNUNET_i2s (&id));
1602
1603   va = GNUNET_malloc (sizeof (struct ValidationAddress) + addrlen);
1604   va->next = e->addresses;
1605   e->addresses = va;
1606   va->transport_name = GNUNET_strdup (tname);
1607   va->addr_len = addrlen;
1608   va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1609                                             (unsigned int) -1);
1610   memcpy (&va[1], addr, addrlen);
1611   return GNUNET_OK;
1612 }
1613
1614
1615 /**
1616  * Check if addresses in validated hello "h" overlap with
1617  * those in "chvc->hello" and update "chvc->hello" accordingly,
1618  * removing those addresses that have already been validated.
1619  */
1620 static void
1621 check_hello_validated (void *cls,
1622                        const struct GNUNET_PeerIdentity *peer,
1623                        const struct GNUNET_HELLO_Message *h, uint32_t trust)
1624 {
1625   struct CheckHelloValidatedContext *chvc = cls;
1626   struct ValidationAddress *va;
1627   struct TransportPlugin *tp;
1628   int first_call;
1629   struct GNUNET_PeerIdentity apeer;
1630
1631   first_call = GNUNET_NO;
1632   if (chvc->e == NULL)
1633     {
1634       chvc->piter = NULL;
1635       first_call = GNUNET_YES;
1636       chvc->e = GNUNET_malloc (sizeof (struct ValidationList));
1637       GNUNET_assert (GNUNET_OK ==
1638                      GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello,
1639                                            &chvc->e->publicKey));
1640       chvc->e->timeout =
1641         GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT);
1642       chvc->e->next = pending_validations;
1643       pending_validations = chvc->e;
1644     }
1645   if (h != NULL)
1646     {
1647       GNUNET_HELLO_iterate_new_addresses (chvc->hello,
1648                                           h,
1649                                           GNUNET_TIME_absolute_get (),
1650                                           &run_validation, chvc->e);
1651     }
1652   else if (GNUNET_YES == first_call)
1653     {
1654       /* no existing HELLO, all addresses are new */
1655       GNUNET_HELLO_iterate_addresses (chvc->hello,
1656                                       GNUNET_NO, &run_validation, chvc->e);
1657     }
1658   if (h != NULL)
1659     return;                     /* wait for next call */
1660   /* finally, transmit validation attempts */
1661   GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_id (chvc->hello, &apeer));
1662 #if DEBUG_TRANSPORT
1663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1664               "Ready to validate addresses from `%s' message for peer `%4s'\n",
1665               "HELLO", GNUNET_i2s (&apeer));
1666 #endif
1667   va = chvc->e->addresses;
1668   while (va != NULL)
1669     {
1670 #if DEBUG_TRANSPORT
1671       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672                   "Establishing `%s' connection to validate `%s' address `%s' of `%4s'\n",
1673                   va->transport_name,
1674                   "HELLO",
1675                   GNUNET_a2s ((const struct sockaddr *) &va[1],
1676                               va->addr_len), GNUNET_i2s (&apeer));
1677 #endif
1678       tp = find_transport (va->transport_name);
1679       GNUNET_assert (tp != NULL);
1680       if (GNUNET_OK !=
1681           tp->api->validate (tp->api->cls,
1682                              &apeer,
1683                              va->challenge,
1684                              HELLO_VERIFICATION_TIMEOUT,
1685                              &va[1], va->addr_len))
1686         va->ok = GNUNET_SYSERR;
1687       va = va->next;
1688     }
1689   GNUNET_SCHEDULER_add_delayed (sched,
1690                                 GNUNET_TIME_absolute_get_remaining (chvc->
1691                                                                     e->timeout),
1692                                 &cleanup_validation, NULL);
1693   GNUNET_free (chvc);
1694 }
1695
1696
1697 /**
1698  * Process HELLO-message.
1699  *
1700  * @param plugin transport involved, may be NULL
1701  * @param message the actual message
1702  * @return GNUNET_OK if the HELLO was well-formed, GNUNET_SYSERR otherwise
1703  */
1704 static int
1705 process_hello (struct TransportPlugin *plugin,
1706                const struct GNUNET_MessageHeader *message)
1707 {
1708   struct ValidationList *e;
1709   uint16_t hsize;
1710   struct GNUNET_PeerIdentity target;
1711   const struct GNUNET_HELLO_Message *hello;
1712   struct CheckHelloValidatedContext *chvc;
1713   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
1714
1715   hsize = ntohs (message->size);
1716   if ((ntohs (message->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1717       (hsize < sizeof (struct GNUNET_MessageHeader)))
1718     {
1719       GNUNET_break (0);
1720       return GNUNET_SYSERR;
1721     }
1722   /* first, check if load is too high */
1723   if (GNUNET_OS_load_cpu_get (cfg) > 100)
1724     {
1725       /* TODO: call to stats? */
1726       return GNUNET_OK;
1727     }
1728   hello = (const struct GNUNET_HELLO_Message *) message;
1729   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, &publicKey))
1730     {
1731       GNUNET_break_op (0);
1732       return GNUNET_SYSERR;
1733     }
1734   GNUNET_CRYPTO_hash (&publicKey,
1735                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1736                       &target.hashPubKey);
1737 #if DEBUG_TRANSPORT
1738   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1739               "Processing `%s' message for `%4s'\n",
1740               "HELLO", GNUNET_i2s (&target));
1741 #endif
1742   /* check if a HELLO for this peer is already on the validation list */
1743   e = pending_validations;
1744   while (e != NULL)
1745     {
1746       if (0 == memcmp (&e->publicKey,
1747                        &publicKey,
1748                        sizeof (struct
1749                                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)))
1750         {
1751           /* TODO: call to stats? */
1752 #if DEBUG_TRANSPORT
1753           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1754                       "`%s' message for peer `%4s' is already pending; ignoring new message\n",
1755                       "HELLO", GNUNET_i2s (&target));
1756 #endif
1757           return GNUNET_OK;
1758         }
1759       e = e->next;
1760     }
1761   chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize);
1762   chvc->plugin = plugin;
1763   chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1];
1764   memcpy (chvc->hello, hello, hsize);
1765   /* finally, check if HELLO was previously validated
1766      (continuation will then schedule actual validation) */
1767   chvc->piter = GNUNET_PEERINFO_iterate (cfg,
1768                                          sched,
1769                                          &target,
1770                                          0,
1771                                          HELLO_VERIFICATION_TIMEOUT,
1772                                          &check_hello_validated, chvc);
1773   return GNUNET_OK;
1774 }
1775
1776
1777 /**
1778  * The peer specified by the given neighbour has timed-out or a plugin
1779  * has disconnected.  We may either need to do nothing (other plugins
1780  * still up), or trigger a full disconnect and clean up.  This
1781  * function updates our state and do the necessary notifications.
1782  * Also notifies our clients that the neighbour is now officially
1783  * gone.
1784  *
1785  * @param n the neighbour list entry for the peer
1786  * @param check should we just check if all plugins
1787  *        disconnected or must we ask all plugins to
1788  *        disconnect?
1789  */
1790 static void
1791 disconnect_neighbour (struct NeighbourList *n, int check)
1792 {
1793   struct ReadyList *rpos;
1794   struct NeighbourList *npos;
1795   struct NeighbourList *nprev;
1796   struct MessageQueue *mq;
1797
1798   if (GNUNET_YES == check)
1799     {
1800       rpos = n->plugins;
1801       while (NULL != rpos)
1802         {
1803           if (GNUNET_YES == rpos->connected)
1804             return;             /* still connected */
1805           rpos = rpos->next;
1806         }
1807     }
1808
1809 #if DEBUG_TRANSPORT
1810   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1811               "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id));
1812 #endif
1813   /* remove n from neighbours list */
1814   nprev = NULL;
1815   npos = neighbours;
1816   while ((npos != NULL) && (npos != n))
1817     {
1818       nprev = npos;
1819       npos = npos->next;
1820     }
1821   GNUNET_assert (npos != NULL);
1822   if (nprev == NULL)
1823     neighbours = n->next;
1824   else
1825     nprev->next = n->next;
1826
1827   /* notify all clients about disconnect */
1828   notify_clients_disconnect (&n->id);
1829
1830   /* clean up all plugins, cancel connections and pending transmissions */
1831   while (NULL != (rpos = n->plugins))
1832     {
1833       n->plugins = rpos->next;
1834       GNUNET_assert (rpos->neighbour == n);
1835       if (GNUNET_YES == rpos->connected)
1836         rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
1837       GNUNET_free (rpos);
1838     }
1839
1840   /* free all messages on the queue */
1841   while (NULL != (mq = n->messages))
1842     {
1843       n->messages = mq->next;
1844       GNUNET_assert (mq->neighbour == n);
1845       GNUNET_free (mq);
1846     }
1847   if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1848     GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
1849   /* finally, free n itself */
1850   GNUNET_free (n);
1851 }
1852
1853
1854 /**
1855  * Add an entry for each of our transport plugins
1856  * (that are able to send) to the list of plugins
1857  * for this neighbour.
1858  *
1859  * @param neighbour to initialize
1860  */
1861 static void
1862 add_plugins (struct NeighbourList *neighbour)
1863 {
1864   struct TransportPlugin *tp;
1865   struct ReadyList *rl;
1866
1867   neighbour->retry_plugins_time
1868     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
1869   tp = plugins;
1870   while (tp != NULL)
1871     {
1872       if (tp->api->send != NULL)
1873         {
1874           rl = GNUNET_malloc (sizeof (struct ReadyList));
1875           rl->next = neighbour->plugins;
1876           neighbour->plugins = rl;
1877           rl->plugin = tp;
1878           rl->neighbour = neighbour;
1879           rl->transmit_ready = GNUNET_YES;
1880         }
1881       tp = tp->next;
1882     }
1883 }
1884
1885
1886 static void
1887 neighbour_timeout_task (void *cls,
1888                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1889 {
1890   struct NeighbourList *n = cls;
1891
1892 #if DEBUG_TRANSPORT
1893   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1894               "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id));
1895 #endif
1896   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1897   disconnect_neighbour (n, GNUNET_NO);
1898 }
1899
1900
1901 /**
1902  * Create a fresh entry in our neighbour list for the given peer.
1903  * Will try to transmit our current HELLO to the new neighbour.  Also
1904  * notifies our clients about the new "connection".
1905  *
1906  * @param peer the peer for which we create the entry
1907  * @return the new neighbour list entry
1908  */
1909 static struct NeighbourList *
1910 setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
1911 {
1912   struct NeighbourList *n;
1913
1914 #if DEBUG_TRANSPORT
1915   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1916               "Setting up new neighbour `%4s', sending our HELLO to introduce ourselves\n",
1917               GNUNET_i2s (peer));
1918 #endif
1919   GNUNET_assert (our_hello != NULL);
1920   n = GNUNET_malloc (sizeof (struct NeighbourList));
1921   n->next = neighbours;
1922   neighbours = n;
1923   n->id = *peer;
1924   n->last_quota_update = GNUNET_TIME_absolute_get ();
1925   n->peer_timeout =
1926     GNUNET_TIME_relative_to_absolute
1927     (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
1928   n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
1929   add_plugins (n);
1930   n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
1931                                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1932                                                   &neighbour_timeout_task, n);
1933   transmit_to_peer (NULL, 0,
1934                     (const struct GNUNET_MessageHeader *) our_hello,
1935                     GNUNET_YES, n);
1936   notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
1937   return n;
1938 }
1939
1940
1941 /**
1942  * Function called by the plugin for each received message.
1943  * Update data volumes, possibly notify plugins about
1944  * reducing the rate at which they read from the socket
1945  * and generally forward to our receive callback.
1946  *
1947  * @param cls the "struct TransportPlugin *" we gave to the plugin
1948  * @param latency estimated latency for communicating with the
1949  *             given peer
1950  * @param peer (claimed) identity of the other peer
1951  * @param message the message, NULL if peer was disconnected
1952  * @return the new service_context that the plugin should use
1953  *         for future receive calls for messages from this
1954  *         particular peer
1955  */
1956 static void
1957 plugin_env_receive (void *cls,
1958                     struct GNUNET_TIME_Relative latency,
1959                     const struct GNUNET_PeerIdentity *peer,
1960                     const struct GNUNET_MessageHeader *message)
1961 {
1962   const struct GNUNET_MessageHeader ack = {
1963     htons (sizeof (struct GNUNET_MessageHeader)),
1964     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK)
1965   };
1966   struct ReadyList *service_context;
1967   struct TransportPlugin *plugin = cls;
1968   struct TransportClient *cpos;
1969   struct InboundMessage *im;
1970   uint16_t msize;
1971   struct NeighbourList *n;
1972
1973   n = find_neighbour (peer);
1974   if (n == NULL)
1975     {
1976       if (message == NULL)
1977         return;                 /* disconnect of peer already marked down */
1978       n = setup_new_neighbour (peer);
1979     }
1980   service_context = n->plugins;
1981   while ((service_context != NULL) && (plugin != service_context->plugin))
1982     service_context = service_context->next;
1983   GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
1984   if (message == NULL)
1985     {
1986 #if DEBUG_TRANSPORT
1987       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1988                   "Receive failed from `%4s', triggering disconnect\n",
1989                   GNUNET_i2s (&n->id));
1990 #endif
1991       /* TODO: call stats */
1992       if (service_context != NULL)
1993         service_context->connected = GNUNET_NO;
1994       disconnect_neighbour (n, GNUNET_YES);
1995       return;
1996     }
1997 #if DEBUG_TRANSPORT
1998   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1999               "Processing message of type `%u' received by plugin...\n",
2000               ntohs (message->type));
2001 #endif
2002   if (service_context != NULL)
2003     {
2004       if (service_context->connected == GNUNET_NO)
2005         {
2006           service_context->connected = GNUNET_YES;
2007           service_context->transmit_ready = GNUNET_YES;
2008           service_context->connect_attempts++;
2009         }
2010       service_context->timeout
2011         =
2012         GNUNET_TIME_relative_to_absolute
2013         (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2014       service_context->latency = latency;
2015     }
2016   /* update traffic received amount ... */
2017   msize = ntohs (message->size);
2018   n->last_received += msize;
2019   GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
2020   n->peer_timeout =
2021     GNUNET_TIME_relative_to_absolute
2022     (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2023   n->timeout_task =
2024     GNUNET_SCHEDULER_add_delayed (sched,
2025                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2026                                   &neighbour_timeout_task, n);
2027   update_quota (n);
2028   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2029     {
2030       /* dropping message due to frequent inbound volume violations! */
2031       GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
2032                   GNUNET_ERROR_TYPE_BULK,
2033                   _
2034                   ("Dropping incoming message due to repeated bandwidth quota violations.\n"));
2035       /* TODO: call stats */
2036       GNUNET_assert ((service_context == NULL) ||
2037                      (NULL != service_context->neighbour));
2038       return;
2039     }
2040   switch (ntohs (message->type))
2041     {
2042     case GNUNET_MESSAGE_TYPE_HELLO:
2043 #if DEBUG_TRANSPORT
2044       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2045                   "Receiving `%s' message from `%4s'.\n", "HELLO",
2046                   GNUNET_i2s (peer));
2047 #endif
2048       process_hello (plugin, message);
2049 #if DEBUG_TRANSPORT
2050       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2051                   "Sending `%s' message to connecting peer `%4s'.\n", "ACK",
2052                   GNUNET_i2s (peer));
2053 #endif
2054       transmit_to_peer (NULL, 0, &ack, GNUNET_YES, n);
2055       break;
2056     case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
2057       n->saw_ack = GNUNET_YES;
2058       /* intentional fall-through! */
2059     default:
2060 #if DEBUG_TRANSPORT
2061       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062                   "Received message of type %u from `%4s', sending to all clients.\n",
2063                   ntohs (message->type), GNUNET_i2s (peer));
2064 #endif
2065       /* transmit message to all clients */
2066       im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
2067       im->header.size = htons (sizeof (struct InboundMessage) + msize);
2068       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2069       im->latency = GNUNET_TIME_relative_hton (latency);
2070       im->peer = *peer;
2071       memcpy (&im[1], message, msize);
2072
2073       cpos = clients;
2074       while (cpos != NULL)
2075         {
2076           transmit_to_client (cpos, &im->header, GNUNET_YES);
2077           cpos = cpos->next;
2078         }
2079       GNUNET_free (im);
2080     }
2081   GNUNET_assert ((service_context == NULL) ||
2082                  (NULL != service_context->neighbour));
2083 }
2084
2085
2086 /**
2087  * Handle START-message.  This is the first message sent to us
2088  * by any client which causes us to add it to our list.
2089  *
2090  * @param cls closure (always NULL)
2091  * @param client identification of the client
2092  * @param message the actual message
2093  */
2094 static void
2095 handle_start (void *cls,
2096               struct GNUNET_SERVER_Client *client,
2097               const struct GNUNET_MessageHeader *message)
2098 {
2099   struct TransportClient *c;
2100   struct ConnectInfoMessage cim;
2101   struct NeighbourList *n;
2102   struct InboundMessage *im;
2103   struct GNUNET_MessageHeader *ack;
2104
2105 #if DEBUG_TRANSPORT
2106   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2107               "Received `%s' request from client\n", "START");
2108 #endif
2109   c = clients;
2110   while (c != NULL)
2111     {
2112       if (c->client == client)
2113         {
2114           /* client already on our list! */
2115           GNUNET_break (0);
2116           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2117           return;
2118         }
2119       c = c->next;
2120     }
2121   c = GNUNET_malloc (sizeof (struct TransportClient));
2122   c->next = clients;
2123   clients = c;
2124   c->client = client;
2125   if (our_hello != NULL)
2126     {
2127 #if DEBUG_TRANSPORT
2128       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2129                   "Sending our own `%s' to new client\n", "HELLO");
2130 #endif
2131       transmit_to_client (c,
2132                           (const struct GNUNET_MessageHeader *) our_hello,
2133                           GNUNET_NO);
2134       /* tell new client about all existing connections */
2135       cim.header.size = htons (sizeof (struct ConnectInfoMessage));
2136       cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2137       cim.quota_out =
2138         htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
2139       cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2140       im = GNUNET_malloc (sizeof (struct InboundMessage) +
2141                           sizeof (struct GNUNET_MessageHeader));
2142       im->header.size = htons (sizeof (struct InboundMessage) +
2143                                sizeof (struct GNUNET_MessageHeader));
2144       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2145       im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2146       ack = (struct GNUNET_MessageHeader *) &im[1];
2147       ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2148       ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
2149       for (n = neighbours; n != NULL; n = n->next)
2150         {
2151           cim.id = n->id;
2152           transmit_to_client (c, &cim.header, GNUNET_NO);
2153           if (n->saw_ack)
2154             {
2155               im->peer = n->id;
2156               transmit_to_client (c, &im->header, GNUNET_NO);
2157             }
2158         }
2159       GNUNET_free (im);
2160     }
2161   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2162 }
2163
2164
2165 /**
2166  * Handle HELLO-message.
2167  *
2168  * @param cls closure (always NULL)
2169  * @param client identification of the client
2170  * @param message the actual message
2171  */
2172 static void
2173 handle_hello (void *cls,
2174               struct GNUNET_SERVER_Client *client,
2175               const struct GNUNET_MessageHeader *message)
2176 {
2177   int ret;
2178
2179 #if DEBUG_TRANSPORT
2180   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2181               "Received `%s' request from client\n", "HELLO");
2182 #endif
2183   ret = process_hello (NULL, message);
2184   GNUNET_SERVER_receive_done (client, ret);
2185 }
2186
2187
2188 /**
2189  * Handle SEND-message.
2190  *
2191  * @param cls closure (always NULL)
2192  * @param client identification of the client
2193  * @param message the actual message
2194  */
2195 static void
2196 handle_send (void *cls,
2197              struct GNUNET_SERVER_Client *client,
2198              const struct GNUNET_MessageHeader *message)
2199 {
2200   struct TransportClient *tc;
2201   struct NeighbourList *n;
2202   const struct OutboundMessage *obm;
2203   const struct GNUNET_MessageHeader *obmm;
2204   uint16_t size;
2205   uint16_t msize;
2206
2207   size = ntohs (message->size);
2208   if (size <
2209       sizeof (struct OutboundMessage) + sizeof (struct GNUNET_MessageHeader))
2210     {
2211       GNUNET_break (0);
2212       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2213       return;
2214     }
2215   obm = (const struct OutboundMessage *) message;
2216 #if DEBUG_TRANSPORT
2217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2218               "Received `%s' request from client with target `%4s'\n",
2219               "SEND", GNUNET_i2s (&obm->peer));
2220 #endif
2221   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2222   msize = ntohs (obmm->size);
2223   if (size != msize + sizeof (struct OutboundMessage))
2224     {
2225       GNUNET_break (0);
2226       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2227       return;
2228     }
2229   n = find_neighbour (&obm->peer);
2230   if (n == NULL)
2231     n = setup_new_neighbour (&obm->peer);
2232   tc = clients;
2233   while ((tc != NULL) && (tc->client != client))
2234     tc = tc->next;
2235
2236 #if DEBUG_TRANSPORT
2237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2238               "Client asked to transmit %u-byte message of type %u to `%4s'\n",
2239               ntohs (obmm->size),
2240               ntohs (obmm->type), GNUNET_i2s (&obm->peer));
2241 #endif
2242   transmit_to_peer (tc, ntohl (obm->priority), obmm, GNUNET_NO, n);
2243   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2244 }
2245
2246
2247 /**
2248  * Handle SET_QUOTA-message.
2249  *
2250  * @param cls closure (always NULL)
2251  * @param client identification of the client
2252  * @param message the actual message
2253  */
2254 static void
2255 handle_set_quota (void *cls,
2256                   struct GNUNET_SERVER_Client *client,
2257                   const struct GNUNET_MessageHeader *message)
2258 {
2259   const struct QuotaSetMessage *qsm =
2260     (const struct QuotaSetMessage *) message;
2261   struct NeighbourList *n;
2262   struct TransportPlugin *p;
2263   struct ReadyList *rl;
2264
2265 #if DEBUG_TRANSPORT
2266   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2267               "Received `%s' request from client for peer `%4s'\n",
2268               "SET_QUOTA", GNUNET_i2s (&qsm->peer));
2269 #endif
2270   n = find_neighbour (&qsm->peer);
2271   if (n == NULL)
2272     {
2273       GNUNET_SERVER_receive_done (client, GNUNET_OK);
2274       return;
2275     }
2276   update_quota (n);
2277   if (n->quota_in < ntohl (qsm->quota_in))
2278     n->last_quota_update = GNUNET_TIME_absolute_get ();
2279   n->quota_in = ntohl (qsm->quota_in);
2280   rl = n->plugins;
2281   while (rl != NULL)
2282     {
2283       p = rl->plugin;
2284       p->api->set_receive_quota (p->api->cls,
2285                                  &qsm->peer, ntohl (qsm->quota_in));
2286       rl = rl->next;
2287     }
2288   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2289 }
2290
2291
2292 /**
2293  * Handle TRY_CONNECT-message.
2294  *
2295  * @param cls closure (always NULL)
2296  * @param client identification of the client
2297  * @param message the actual message
2298  */
2299 static void
2300 handle_try_connect (void *cls,
2301                     struct GNUNET_SERVER_Client *client,
2302                     const struct GNUNET_MessageHeader *message)
2303 {
2304   const struct TryConnectMessage *tcm;
2305
2306   tcm = (const struct TryConnectMessage *) message;
2307 #if DEBUG_TRANSPORT
2308   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2309               "Received `%s' request from client %p asking to connect to `%4s'\n",
2310               "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer));
2311 #endif
2312   if (NULL == find_neighbour (&tcm->peer))
2313     setup_new_neighbour (&tcm->peer);
2314 #if DEBUG_TRANSPORT
2315   else
2316     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2317                 "Client asked to connect to `%4s', but connection already exists\n",
2318                 "TRY_CONNECT", GNUNET_i2s (&tcm->peer));
2319 #endif
2320   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2321 }
2322
2323 static void
2324 transmit_address_to_client (void *cls, const char *address)
2325 {
2326   struct GNUNET_SERVER_TransmitContext *tc = cls;
2327   size_t slen;
2328
2329   if (NULL == address)
2330     slen = 0;
2331   else
2332     slen = strlen (address) + 1;
2333   GNUNET_SERVER_transmit_context_append (tc, address, slen,
2334                                          GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY);
2335   if (NULL == address)
2336     GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
2337 }
2338
2339 /**
2340  * Handle AddressLookup-message.
2341  *
2342  * @param cls closure (always NULL)
2343  * @param client identification of the client
2344  * @param message the actual message
2345  */
2346 static void
2347 handle_address_lookup (void *cls,
2348                        struct GNUNET_SERVER_Client *client,
2349                        const struct GNUNET_MessageHeader *message)
2350 {
2351   const struct AddressLookupMessage *alum;
2352   struct TransportPlugin *lsPlugin;
2353   const char *nameTransport;
2354   const char *address;
2355   uint16_t size;
2356   struct GNUNET_SERVER_TransmitContext *tc;
2357
2358   size = ntohs (message->size);
2359   if (size < sizeof (struct AddressLookupMessage))
2360     {
2361       GNUNET_break_op (0);
2362       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2363       return;
2364     }
2365   alum = (const struct AddressLookupMessage *) message;
2366   uint32_t addressLen = ntohl (alum->addrlen);
2367   if (size <= sizeof (struct AddressLookupMessage) + addressLen)
2368     {
2369       GNUNET_break_op (0);
2370       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2371       return;
2372     }
2373   address = (const char *) &alum[1];
2374   nameTransport = (const char *) &address[addressLen];
2375   if (nameTransport
2376       [size - sizeof (struct AddressLookupMessage) - addressLen - 1] != '\0')
2377     {
2378       GNUNET_break_op (0);
2379       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2380       return;
2381     }
2382   struct GNUNET_TIME_Absolute timeout =
2383     GNUNET_TIME_absolute_ntoh (alum->timeout);
2384   struct GNUNET_TIME_Relative rtimeout =
2385     GNUNET_TIME_absolute_get_remaining (timeout);
2386   lsPlugin = find_transport (nameTransport);
2387   if (NULL == lsPlugin)
2388     {
2389       tc = GNUNET_SERVER_transmit_context_create (client);
2390       GNUNET_SERVER_transmit_context_append (tc, NULL, 0,
2391                                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY);
2392       GNUNET_SERVER_transmit_context_run (tc, rtimeout);
2393       return;
2394     }
2395   tc = GNUNET_SERVER_transmit_context_create (client);
2396   lsPlugin->api->address_pretty_printer (cls, nameTransport,
2397                                          address, addressLen, GNUNET_YES,
2398                                          rtimeout,
2399                                          &transmit_address_to_client, tc);
2400 }
2401
2402 /**
2403  * List of handlers for the messages understood by this
2404  * service.
2405  */
2406 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2407   {&handle_start, NULL,
2408    GNUNET_MESSAGE_TYPE_TRANSPORT_START, 0},
2409   {&handle_hello, NULL,
2410    GNUNET_MESSAGE_TYPE_HELLO, 0},
2411   {&handle_send, NULL,
2412    GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
2413   {&handle_set_quota, NULL,
2414    GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
2415   {&handle_try_connect, NULL,
2416    GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
2417    sizeof (struct TryConnectMessage)},
2418   {&handle_address_lookup, NULL,
2419    GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP,
2420    0},
2421   {NULL, NULL, 0, 0}
2422 };
2423
2424
2425 /**
2426  * Setup the environment for this plugin.
2427  */
2428 static void
2429 create_environment (struct TransportPlugin *plug)
2430 {
2431   plug->env.cfg = cfg;
2432   plug->env.sched = sched;
2433   plug->env.my_public_key = &my_public_key;
2434   plug->env.my_private_key = my_private_key;
2435   plug->env.my_identity = &my_identity;
2436   plug->env.cls = plug;
2437   plug->env.receive = &plugin_env_receive;
2438   plug->env.notify_address = &plugin_env_notify_address;
2439   plug->env.notify_validation = &plugin_env_notify_validation;
2440   plug->env.default_quota_in =
2441     (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
2442   plug->env.max_connections = max_connect_per_transport;
2443 }
2444
2445
2446 /**
2447  * Start the specified transport (load the plugin).
2448  */
2449 static void
2450 start_transport (struct GNUNET_SERVER_Handle *server, const char *name)
2451 {
2452   struct TransportPlugin *plug;
2453   char *libname;
2454
2455   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2456               _("Loading `%s' transport plugin\n"), name);
2457   GNUNET_asprintf (&libname, "libgnunet_plugin_transport_%s", name);
2458   plug = GNUNET_malloc (sizeof (struct TransportPlugin));
2459   create_environment (plug);
2460   plug->short_name = GNUNET_strdup (name);
2461   plug->lib_name = libname;
2462   plug->next = plugins;
2463   plugins = plug;
2464   plug->api = GNUNET_PLUGIN_load (libname, &plug->env);
2465   if (plug->api == NULL)
2466     {
2467       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2468                   _("Failed to load transport plugin for `%s'\n"), name);
2469       GNUNET_free (plug->short_name);
2470       plugins = plug->next;
2471       GNUNET_free (libname);
2472       GNUNET_free (plug);
2473     }
2474 }
2475
2476
2477 /**
2478  * Called whenever a client is disconnected.  Frees our
2479  * resources associated with that client.
2480  *
2481  * @param cls closure
2482  * @param client identification of the client
2483  */
2484 static void
2485 client_disconnect_notification (void *cls,
2486                                 struct GNUNET_SERVER_Client *client)
2487 {
2488   struct TransportClient *pos;
2489   struct TransportClient *prev;
2490   struct ClientMessageQueueEntry *mqe;
2491
2492 #if DEBUG_TRANSPORT
2493   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2494               "Client disconnected, cleaning up.\n");
2495 #endif
2496   prev = NULL;
2497   pos = clients;
2498   while ((pos != NULL) && (pos->client != client))
2499     {
2500       prev = pos;
2501       pos = pos->next;
2502     }
2503   if (pos == NULL)
2504     return;
2505   while (NULL != (mqe = pos->message_queue_head))
2506     {
2507       pos->message_queue_head = mqe->next;
2508       GNUNET_free (mqe);
2509     }
2510   pos->message_queue_head = NULL;
2511   if (prev == NULL)
2512     clients = pos->next;
2513   else
2514     prev->next = pos->next;
2515   if (GNUNET_YES == pos->tcs_pending)
2516     {
2517       pos->client = NULL;
2518       return;
2519     }
2520   GNUNET_free (pos);
2521 }
2522
2523
2524 /**
2525  * Function called when the service shuts down.  Unloads our plugins.
2526  *
2527  * @param cls closure, unused
2528  * @param tc task context (unused)
2529  */
2530 static void
2531 unload_plugins (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2532 {
2533   struct TransportPlugin *plug;
2534   struct AddressList *al;
2535
2536 #if DEBUG_TRANSPORT
2537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2538               "Transport service is unloading plugins...\n");
2539 #endif
2540   while (NULL != (plug = plugins))
2541     {
2542       plugins = plug->next;
2543       GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
2544       GNUNET_free (plug->lib_name);
2545       GNUNET_free (plug->short_name);
2546       while (NULL != (al = plug->addresses))
2547         {
2548           plug->addresses = al->next;
2549           GNUNET_free (al);
2550         }
2551       GNUNET_free (plug);
2552     }
2553   if (my_private_key != NULL)
2554     GNUNET_CRYPTO_rsa_key_free (my_private_key);
2555   GNUNET_free_non_null (our_hello);
2556 }
2557
2558
2559 /**
2560  * Initiate transport service.
2561  *
2562  * @param cls closure
2563  * @param s scheduler to use
2564  * @param serv the initialized server
2565  * @param c configuration to use
2566  */
2567 static void
2568 run (void *cls,
2569      struct GNUNET_SCHEDULER_Handle *s,
2570      struct GNUNET_SERVER_Handle *serv,
2571      const struct GNUNET_CONFIGURATION_Handle *c)
2572 {
2573   char *plugs;
2574   char *pos;
2575   int no_transports;
2576   unsigned long long tneigh;
2577   char *keyfile;
2578
2579   sched = s;
2580   cfg = c;
2581   /* parse configuration */
2582   if ((GNUNET_OK !=
2583        GNUNET_CONFIGURATION_get_value_number (c,
2584                                               "TRANSPORT",
2585                                               "NEIGHBOUR_LIMIT",
2586                                               &tneigh)) ||
2587       (GNUNET_OK !=
2588        GNUNET_CONFIGURATION_get_value_filename (c,
2589                                                 "GNUNETD",
2590                                                 "HOSTKEY", &keyfile)))
2591     {
2592       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2593                   _
2594                   ("Transport service is lacking key configuration settings.  Exiting.\n"));
2595       GNUNET_SCHEDULER_shutdown (s);
2596       return;
2597     }
2598   max_connect_per_transport = (uint32_t) tneigh;
2599   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
2600   GNUNET_free (keyfile);
2601   if (my_private_key == NULL)
2602     {
2603       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2604                   _
2605                   ("Transport service could not access hostkey.  Exiting.\n"));
2606       GNUNET_SCHEDULER_shutdown (s);
2607       return;
2608     }
2609   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
2610   GNUNET_CRYPTO_hash (&my_public_key,
2611                       sizeof (my_public_key), &my_identity.hashPubKey);
2612   /* setup notification */
2613   server = serv;
2614   GNUNET_SERVER_disconnect_notify (server,
2615                                    &client_disconnect_notification, NULL);
2616   /* load plugins... */
2617   no_transports = 1;
2618   if (GNUNET_OK ==
2619       GNUNET_CONFIGURATION_get_value_string (c,
2620                                              "TRANSPORT", "PLUGINS", &plugs))
2621     {
2622       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2623                   _("Starting transport plugins `%s'\n"), plugs);
2624       pos = strtok (plugs, " ");
2625       while (pos != NULL)
2626         {
2627           start_transport (server, pos);
2628           no_transports = 0;
2629           pos = strtok (NULL, " ");
2630         }
2631       GNUNET_free (plugs);
2632     }
2633   GNUNET_SCHEDULER_add_delayed (sched,
2634                                 GNUNET_TIME_UNIT_FOREVER_REL,
2635                                 &unload_plugins, NULL);
2636   if (no_transports)
2637     refresh_hello ();
2638 #if DEBUG_TRANSPORT
2639   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Transport service ready.\n"));
2640 #endif
2641   /* process client requests */
2642   GNUNET_SERVER_add_handlers (server, handlers);
2643 }
2644
2645
2646 /**
2647  * The main function for the transport service.
2648  *
2649  * @param argc number of arguments from the command line
2650  * @param argv command line arguments
2651  * @return 0 ok, 1 on error
2652  */
2653 int
2654 main (int argc, char *const *argv)
2655 {
2656   return (GNUNET_OK ==
2657           GNUNET_SERVICE_run (argc,
2658                               argv,
2659                               "transport",
2660                               GNUNET_SERVICE_OPTION_NONE,
2661                               &run, NULL)) ? 0 : 1;
2662 }
2663
2664 /* end of gnunet-service-transport.c */