7cb5869e9a0448fc76ca13a77ef20501cd62a535
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport.c
23  * @brief low-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - if we do not receive an ACK in response to our
28  *   HELLO, retransmit HELLO!
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_getopt_lib.h"
34 #include "gnunet_hello_lib.h"
35 #include "gnunet_os_lib.h"
36 #include "gnunet_peerinfo_service.h"
37 #include "gnunet_plugin_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_service_lib.h"
40 #include "gnunet_signatures.h"
41 #include "plugin_transport.h"
42 #include "transport.h"
43
44 /**
45  * How many messages can we have pending for a given client process
46  * before we start to drop incoming messages?  We typically should
47  * have only one client and so this would be the primary buffer for
48  * messages, so the number should be chosen rather generously.
49  *
50  * The expectation here is that most of the time the queue is large
51  * enough so that a drop is virtually never required.
52  */
53 #define MAX_PENDING 128
54
55 /**
56  * How often should we try to reconnect to a peer using a particular
57  * transport plugin before giving up?  Note that the plugin may be
58  * added back to the list after PLUGIN_RETRY_FREQUENCY expires.
59  */
60 #define MAX_CONNECT_RETRY 3
61
62 /**
63  * How often must a peer violate bandwidth quotas before we start
64  * to simply drop its messages?
65  */
66 #define QUOTA_VIOLATION_DROP_THRESHOLD 100
67
68 /**
69  * How long until a HELLO verification attempt should time out?
70  */
71 #define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_UNIT_MINUTES
72
73 /**
74  * How often do we re-add (cheaper) plugins to our list of plugins
75  * to try for a given connected peer?
76  */
77 #define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
78
79 /**
80  * After how long do we expire an address in a HELLO
81  * that we just validated?  This value is also used
82  * for our own addresses when we create a HELLO.
83  */
84 #define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12)
85
86
87 /**
88  * Entry in linked list of network addresses.
89  */
90 struct AddressList
91 {
92   /**
93    * This is a linked list.
94    */
95   struct AddressList *next;
96
97   /**
98    * The address, actually a pointer to the end
99    * of this struct.  Do not free!
100    */
101   void *addr;
102
103   /**
104    * How long until we auto-expire this address (unless it is
105    * re-confirmed by the transport)?
106    */
107   struct GNUNET_TIME_Absolute expires;
108
109   /**
110    * Length of addr.
111    */
112   size_t addrlen;
113
114 };
115
116
117 /**
118  * Entry in linked list of all of our plugins.
119  */
120 struct TransportPlugin
121 {
122
123   /**
124    * This is a linked list.
125    */
126   struct TransportPlugin *next;
127
128   /**
129    * API of the transport as returned by the plugin's
130    * initialization function.
131    */
132   struct GNUNET_TRANSPORT_PluginFunctions *api;
133
134   /**
135    * Short name for the plugin (i.e. "tcp").
136    */
137   char *short_name;
138
139   /**
140    * Name of the library (i.e. "gnunet_plugin_transport_tcp").
141    */
142   char *lib_name;
143
144   /**
145    * List of our known addresses for this transport.
146    */
147   struct AddressList *addresses;
148
149   /**
150    * Environment this transport service is using
151    * for this plugin.
152    */
153   struct GNUNET_TRANSPORT_PluginEnvironment env;
154
155   /**
156    * ID of task that is used to clean up expired addresses.
157    */
158   GNUNET_SCHEDULER_TaskIdentifier address_update_task;
159
160
161   /**
162    * Set to GNUNET_YES if we need to scrap the existing
163    * list of "addresses" and start fresh when we receive
164    * the next address update from a transport.  Set to
165    * GNUNET_NO if we should just add the new address
166    * to the list and wait for the commit call.
167    */
168   int rebuild;
169 };
170
171 struct NeighbourList;
172
173 /**
174  * For each neighbour we keep a list of messages
175  * that we still want to transmit to the neighbour.
176  */
177 struct MessageQueue
178 {
179
180   /**
181    * This is a linked list.
182    */
183   struct MessageQueue *next;
184
185   /**
186    * The message we want to transmit.
187    */
188   struct GNUNET_MessageHeader *message;
189
190   /**
191    * Client responsible for queueing the message;
192    * used to check that a client has not two messages
193    * pending for the same target.  Can be NULL.
194    */
195   struct TransportClient *client;
196
197   /**
198    * Neighbour this entry belongs to.
199    */
200   struct NeighbourList *neighbour;
201
202   /**
203    * Plugin that we used for the transmission.
204    * NULL until we scheduled a transmission.
205    */
206   struct TransportPlugin *plugin;
207
208   /**
209    * Internal message of the transport system that should not be
210    * included in the usual SEND-SEND_OK transmission confirmation
211    * traffic management scheme.  Typically, "internal_msg" will
212    * be set whenever "client" is NULL (but it is not strictly
213    * required).
214    */
215   int internal_msg;
216
217   /**
218    * How important is the message?
219    */
220   unsigned int priority;
221   
222 };
223
224
225 /**
226  * For a given Neighbour, which plugins are available
227  * to talk to this peer and what are their costs?
228  */
229 struct ReadyList
230 {
231
232   /**
233    * This is a linked list.
234    */
235   struct ReadyList *next;
236
237   /**
238    * Which of our transport plugins does this entry
239    * represent?
240    */
241   struct TransportPlugin *plugin;
242
243   /**
244    * Neighbour this entry belongs to.
245    */
246   struct NeighbourList *neighbour;
247
248   /**
249    * Opaque handle (specific to the plugin) for the
250    * connection to our target; can be NULL.
251    */
252   void *plugin_handle;
253
254   /**
255    * What was the last latency observed for this plugin
256    * and peer?  Invalid if connected is GNUNET_NO.
257    */
258   struct GNUNET_TIME_Relative latency;
259
260   /**
261    * If we did not successfully transmit a message to the
262    * given peer via this connection during the specified
263    * time, we should consider the connection to be dead.
264    * This is used in the case that a TCP transport simply
265    * stalls writing to the stream but does not formerly
266    * get a signal that the other peer died.
267    */
268   struct GNUNET_TIME_Absolute timeout;
269
270   /**
271    * Is this plugin currently connected?  The first time
272    * we transmit or send data to a peer via a particular
273    * plugin, we set this to GNUNET_YES.  If we later get
274    * an error (disconnect notification or transmission
275    * failure), we set it back to GNUNET_NO.  Each time the
276    * value is set to GNUNET_YES, we increment the
277    * "connect_attempts" counter.  If that one reaches a
278    * particular threshold, we consider the plugin to not
279    * be working properly at this time for the given peer
280    * and remove it from the eligible list.
281    */
282   int connected;
283
284   /**
285    * How often have we tried to connect using this plugin?
286    */
287   unsigned int connect_attempts;
288
289   /**
290    * Is this plugin ready to transmit to the specific
291    * target?  GNUNET_NO if not.  Initially, all plugins
292    * are marked ready.  If a transmission is in progress,
293    * "transmit_ready" is set to GNUNET_NO.
294    */
295   int transmit_ready;
296
297 };
298
299
300 /**
301  * Entry in linked list of all of our current neighbours.
302  */
303 struct NeighbourList
304 {
305
306   /**
307    * This is a linked list.
308    */
309   struct NeighbourList *next;
310
311   /**
312    * Which of our transports is connected to this peer
313    * and what is their status?
314    */
315   struct ReadyList *plugins;
316
317   /**
318    * List of messages we would like to send to this peer;
319    * must contain at most one message per client.
320    */
321   struct MessageQueue *messages;
322
323   /**
324    * Identity of this neighbour.
325    */
326   struct GNUNET_PeerIdentity id;
327
328   /**
329    * ID of task scheduled to run when this peer is about to
330    * time out (will free resources associated with the peer).
331    */
332   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
333
334   /**
335    * How long until we should consider this peer dead
336    * (if we don't receive another message in the
337    * meantime)?
338    */
339   struct GNUNET_TIME_Absolute peer_timeout;
340
341   /**
342    * At what time did we reset last_received last?
343    */
344   struct GNUNET_TIME_Absolute last_quota_update;
345
346   /**
347    * At what time should we try to again add plugins to
348    * our ready list?
349    */
350   struct GNUNET_TIME_Absolute retry_plugins_time;
351
352   /**
353    * How many bytes have we received since the "last_quota_update"
354    * timestamp?
355    */
356   uint64_t last_received;
357
358   /**
359    * Global quota for inbound traffic for the neighbour in bytes/ms.
360    */
361   uint32_t quota_in;
362
363   /**
364    * How often has the other peer (recently) violated the
365    * inbound traffic limit?  Incremented by 10 per violation,
366    * decremented by 1 per non-violation (for each
367    * time interval).
368    */
369   unsigned int quota_violation_count;
370
371   /**
372    * Have we seen an ACK from this neighbour in the past?
373    * (used to make up a fake ACK for clients connecting after
374    * the neighbour connected to us).
375    */
376   int saw_ack;
377
378 };
379
380
381 /**
382  * Linked list of messages to be transmitted to
383  * the client.  Each entry is followed by the
384  * actual message.
385  */
386 struct ClientMessageQueueEntry
387 {
388   /**
389    * This is a linked list.
390    */
391   struct ClientMessageQueueEntry *next;
392 };
393
394
395 /**
396  * Client connected to the transport service.
397  */
398 struct TransportClient
399 {
400
401   /**
402    * This is a linked list.
403    */
404   struct TransportClient *next;
405
406   /**
407    * Handle to the client.
408    */
409   struct GNUNET_SERVER_Client *client;
410
411   /**
412    * Linked list of messages yet to be transmitted to
413    * the client.
414    */
415   struct ClientMessageQueueEntry *message_queue_head;
416
417   /**
418    * Tail of linked list of messages yet to be transmitted to the
419    * client.
420    */
421   struct ClientMessageQueueEntry *message_queue_tail;
422
423   /**
424    * Is a call to "transmit_send_continuation" pending?  If so, we
425    * must not free this struct (even if the corresponding client
426    * disconnects) and instead only remove it from the linked list and
427    * set the "client" field to NULL.
428    */
429   int tcs_pending;
430
431   /**
432    * Length of the list of messages pending for this client.
433    */
434   unsigned int message_count;
435
436 };
437
438
439 /**
440  * For each HELLO, we may have to validate multiple addresses;
441  * each address gets its own request entry.
442  */
443 struct ValidationAddress
444 {
445   /**
446    * This is a linked list.
447    */
448   struct ValidationAddress *next;
449
450   /**
451    * Name of the transport.
452    */
453   char *transport_name;
454
455   /**
456    * When should this validated address expire?
457    */
458   struct GNUNET_TIME_Absolute expiration;
459
460   /**
461    * Length of the address we are validating.
462    */
463   size_t addr_len;
464
465   /**
466    * Challenge number we used.
467    */
468   uint32_t challenge;
469
470   /**
471    * Set to GNUNET_YES if the challenge was met,
472    * GNUNET_SYSERR if we know it failed, GNUNET_NO
473    * if we are waiting on a response.
474    */
475   int ok;
476 };
477
478
479 /**
480  * Entry in linked list of all HELLOs awaiting validation.
481  */
482 struct ValidationList
483 {
484
485   /**
486    * This is a linked list.
487    */
488   struct ValidationList *next;
489
490   /**
491    * Linked list with one entry per address from the HELLO
492    * that needs to be validated.
493    */
494   struct ValidationAddress *addresses;
495
496   /**
497    * The public key of the peer.
498    */
499   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
500
501   /**
502    * When does this record time-out? (assuming the
503    * challenge goes unanswered)
504    */
505   struct GNUNET_TIME_Absolute timeout;
506
507 };
508
509
510 /**
511  * HELLOs awaiting validation.
512  */
513 static struct ValidationList *pending_validations;
514
515 /**
516  * Our HELLO message.
517  */
518 static struct GNUNET_HELLO_Message *our_hello;
519
520 /**
521  * "version" of "our_hello".  Used to see if a given
522  * neighbour has already been sent the latest version
523  * of our HELLO message.
524  */
525 static unsigned int our_hello_version;
526
527 /**
528  * Our public key.
529  */
530 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
531
532 /**
533  * Our identity.
534  */
535 static struct GNUNET_PeerIdentity my_identity;
536
537 /**
538  * Our private key.
539  */
540 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
541
542 /**
543  * Our scheduler.
544  */
545 struct GNUNET_SCHEDULER_Handle *sched;
546
547 /**
548  * Our configuration.
549  */
550 struct GNUNET_CONFIGURATION_Handle *cfg;
551
552 /**
553  * Linked list of all clients to this service.
554  */
555 static struct TransportClient *clients;
556
557 /**
558  * All loaded plugins.
559  */
560 static struct TransportPlugin *plugins;
561
562 /**
563  * Our server.
564  */
565 static struct GNUNET_SERVER_Handle *server;
566
567 /**
568  * All known neighbours and their HELLOs.
569  */
570 static struct NeighbourList *neighbours;
571
572 /**
573  * Number of neighbours we'd like to have.
574  */
575 static uint32_t max_connect_per_transport;
576
577
578 /**
579  * Find an entry in the neighbour list for a particular peer.
580  *
581  * @return NULL if not found.
582  */
583 static struct NeighbourList *
584 find_neighbour (const struct GNUNET_PeerIdentity *key)
585 {
586   struct NeighbourList *head = neighbours;
587   while ((head != NULL) &&
588          (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
589     head = head->next;
590   return head;
591 }
592
593
594 /**
595  * Find an entry in the transport list for a particular transport.
596  *
597  * @return NULL if not found.
598  */
599 static struct TransportPlugin *
600 find_transport (const char *short_name)
601 {
602   struct TransportPlugin *head = plugins;
603   while ((head != NULL) && (0 != strcmp (short_name, head->short_name)))
604     head = head->next;
605   return head;
606 }
607
608
609 /**
610  * Update the quota values for the given neighbour now.
611  */
612 static void
613 update_quota (struct NeighbourList *n)
614 {
615   struct GNUNET_TIME_Relative delta;
616   uint64_t allowed;
617   uint64_t remaining;
618
619   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
620   if (delta.value < MIN_QUOTA_REFRESH_TIME)
621     return;                     /* not enough time passed for doing quota update */
622   allowed = delta.value * n->quota_in;
623   if (n->last_received < allowed)
624     {
625       remaining = allowed - n->last_received;
626       if (n->quota_in > 0)
627         remaining /= n->quota_in;
628       else
629         remaining = 0;
630       if (remaining > MAX_BANDWIDTH_CARRY)
631         remaining = MAX_BANDWIDTH_CARRY;
632       n->last_received = 0;
633       n->last_quota_update = GNUNET_TIME_absolute_get ();
634       n->last_quota_update.value -= remaining;
635       if (n->quota_violation_count > 0)
636         n->quota_violation_count--;
637     }
638   else
639     {
640       n->last_received -= allowed;
641       n->last_quota_update = GNUNET_TIME_absolute_get ();
642       if (n->last_received > allowed)
643         {
644           /* more than twice the allowed rate! */
645           n->quota_violation_count += 10;
646         }
647     }
648 }
649
650
651 /**
652  * Function called to notify a client about the socket
653  * being ready to queue more data.  "buf" will be
654  * NULL and "size" zero if the socket was closed for
655  * writing in the meantime.
656  *
657  * @param cls closure
658  * @param size number of bytes available in buf
659  * @param buf where the callee should write the message
660  * @return number of bytes written to buf
661  */
662 static size_t
663 transmit_to_client_callback (void *cls, size_t size, void *buf)
664 {
665   struct TransportClient *client = cls;
666   struct ClientMessageQueueEntry *q;
667   uint16_t msize;
668   size_t tsize;
669   const struct GNUNET_MessageHeader *msg;
670   struct GNUNET_NETWORK_TransmitHandle *th;
671   char *cbuf;
672
673   if (buf == NULL)
674     {
675       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676                   "Transmission to client failed, closing connection.\n");
677       /* fatal error with client, free message queue! */
678       while (NULL != (q = client->message_queue_head))
679         {
680           client->message_queue_head = q->next;
681           GNUNET_free (q);
682         }
683       client->message_queue_tail = NULL;
684       client->message_count = 0;
685       return 0;
686     }
687   cbuf = buf;
688   tsize = 0;
689   while (NULL != (q = client->message_queue_head))
690     {
691       msg = (const struct GNUNET_MessageHeader *) &q[1];
692       msize = ntohs (msg->size);
693       if (msize + tsize > size)
694         break;
695       client->message_queue_head = q->next;
696       if (q->next == NULL)
697         client->message_queue_tail = NULL;
698       memcpy (&cbuf[tsize], msg, msize);
699       tsize += msize;
700       GNUNET_free (q);
701       client->message_count--;
702     }
703   GNUNET_assert (tsize > 0);
704   if (NULL != q)
705     {
706       th = GNUNET_SERVER_notify_transmit_ready (client->client,
707                                                 msize,
708                                                 GNUNET_TIME_UNIT_FOREVER_REL,
709                                                 &transmit_to_client_callback,
710                                                 client);
711       GNUNET_assert (th != NULL);
712     }
713   return tsize;
714 }
715
716
717 /**
718  * Send the specified message to the specified client.  Since multiple
719  * messages may be pending for the same client at a time, this code
720  * makes sure that no message is lost.
721  *
722  * @param client client to transmit the message to
723  * @param msg the message to send
724  * @param may_drop can this message be dropped if the
725  *        message queue for this client is getting far too large?
726  */
727 static void
728 transmit_to_client (struct TransportClient *client,
729                     const struct GNUNET_MessageHeader *msg, int may_drop)
730 {
731   struct ClientMessageQueueEntry *q;
732   uint16_t msize;
733   struct GNUNET_NETWORK_TransmitHandle *th;
734
735   if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
736     {
737       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
738                   _
739                   ("Dropping message, have %u messages pending (%u is the soft limit)\n"),
740                   client->message_count, MAX_PENDING);
741       /* TODO: call to statistics... */
742       return;
743     }
744   client->message_count++;
745   msize = ntohs (msg->size);
746   q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
747   memcpy (&q[1], msg, msize);
748   /* append to message queue */
749   if (client->message_queue_tail == NULL)
750     {
751       client->message_queue_tail = q;
752     }
753   else
754     {
755       client->message_queue_tail->next = q;
756       client->message_queue_tail = q;
757     }
758   if (client->message_queue_head == NULL)
759     {
760       client->message_queue_head = q;
761       th = GNUNET_SERVER_notify_transmit_ready (client->client,
762                                                 msize,
763                                                 GNUNET_TIME_UNIT_FOREVER_REL,
764                                                 &transmit_to_client_callback,
765                                                 client);
766       GNUNET_assert (th != NULL);
767     }
768 }
769
770
771 /**
772  * Find alternative plugins for communication.
773  *
774  * @param neighbour for which neighbour should we try to find
775  *        more plugins?
776  */
777 static void
778 try_alternative_plugins (struct NeighbourList *neighbour)
779 {
780   struct ReadyList *rl;
781
782   if ((neighbour->plugins != NULL) &&
783       (neighbour->retry_plugins_time.value >
784        GNUNET_TIME_absolute_get ().value))
785     return;                     /* don't try right now */
786   neighbour->retry_plugins_time
787     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
788
789   rl = neighbour->plugins;
790   while (rl != NULL)
791     {
792       if (rl->connect_attempts > 0)
793         rl->connect_attempts--; /* amnesty */
794       rl = rl->next;
795     }
796
797 }
798
799
800 /**
801  * Check the ready list for the given neighbour and
802  * if a plugin is ready for transmission (and if we
803  * have a message), do so!
804  *
805  * @param neighbour target peer for which to check the plugins
806  */
807 static void try_transmission_to_peer (struct NeighbourList *neighbour);
808
809
810 /**
811  * The peer specified by the given neighbour has timed-out.  Update
812  * our state and do the necessary notifications.  Also notifies
813  * our clients that the neighbour is now officially gone.
814  *
815  * @param n the neighbour list entry for the peer
816  */
817 static void
818 disconnect_neighbour (struct NeighbourList *n);
819
820
821 /**
822  * Function called by the GNUNET_TRANSPORT_TransmitFunction
823  * upon "completion" of a send request.  This tells the API
824  * that it is now legal to send another message to the given
825  * peer.
826  *
827  * @param cls closure, identifies the entry on the
828  *            message queue that was transmitted and the
829  *            client responsible for queueing the message
830  * @param rl identifies plugin used for the transmission for
831  *           this neighbour; needs to be re-enabled for
832  *           future transmissions
833  * @param target the peer receiving the message
834  * @param result GNUNET_OK on success, if the transmission
835  *           failed, we should not tell the client to transmit
836  *           more messages
837  */
838 static void
839 transmit_send_continuation (void *cls,
840                             struct ReadyList *rl,
841                             const struct GNUNET_PeerIdentity *target,
842                             int result)
843 {
844   struct MessageQueue *mq = cls;
845   struct SendOkMessage send_ok_msg;
846   struct NeighbourList *n;
847
848   GNUNET_assert (mq != NULL);
849   n = mq->neighbour;
850   GNUNET_assert (n != NULL);
851   GNUNET_assert (0 ==
852                  memcmp (&n->id, target,
853                          sizeof (struct GNUNET_PeerIdentity)));
854   if (rl == NULL)
855     {
856       rl = n->plugins;
857       while ((rl != NULL) && (rl->plugin != mq->plugin))
858         rl = rl->next;
859       GNUNET_assert (rl != NULL);
860     }
861   if (result == GNUNET_OK)
862     {
863       rl->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
864     }
865   else
866     {
867       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868                   "Transmission to peer `%s' failed, marking connection as down.\n",
869                   GNUNET_i2s(target));
870       rl->connected = GNUNET_NO;
871     }
872   if (!mq->internal_msg)
873     rl->transmit_ready = GNUNET_YES;
874   if (mq->client != NULL)
875     {
876       send_ok_msg.header.size = htons (sizeof (send_ok_msg));
877       send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
878       send_ok_msg.success = htonl (result);
879       send_ok_msg.peer = n->id;
880       transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
881     }
882   GNUNET_free (mq->message);
883   GNUNET_free (mq);
884   /* one plugin just became ready again, try transmitting
885      another message (if available) */
886   if (result == GNUNET_OK)
887     try_transmission_to_peer (n);
888 }
889
890
891 /**
892  * Check the ready list for the given neighbour and
893  * if a plugin is ready for transmission (and if we
894  * have a message), do so!
895  */
896 static void
897 try_transmission_to_peer (struct NeighbourList *neighbour)
898 {
899   struct ReadyList *pos;
900   struct GNUNET_TIME_Relative min_latency;
901   struct ReadyList *rl;
902   struct MessageQueue *mq;
903   struct GNUNET_TIME_Absolute now;
904
905   if (neighbour->messages == NULL)
906     return;                     /* nothing to do */
907   try_alternative_plugins (neighbour);
908   min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
909   rl = NULL;
910   mq = neighbour->messages;
911   now = GNUNET_TIME_absolute_get ();
912   pos = neighbour->plugins;
913   while (pos != NULL)
914     {
915       /* set plugins that are inactive for a long time back to disconnected */
916       if ((pos->timeout.value < now.value) && (pos->connected == GNUNET_YES))
917         {
918 #if DEBUG_TRANSPORT
919           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920                       "Marking long-time inactive connection to `%4s' as down.\n",
921                       GNUNET_i2s (&neighbour->id));
922 #endif
923           pos->connected = GNUNET_NO;
924         }
925       if (((GNUNET_YES == pos->transmit_ready) ||
926            (mq->internal_msg)) &&
927           (pos->connect_attempts < MAX_CONNECT_RETRY) &&
928           ((rl == NULL) || (min_latency.value > pos->latency.value)))
929         {
930           rl = pos;
931           min_latency = pos->latency;
932         }
933       pos = pos->next;
934     }
935   if (rl == NULL)
936     {
937 #if DEBUG_TRANSPORT
938       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
939                   "No plugin ready to transmit message\n");
940 #endif
941       return;                   /* nobody ready */
942     }
943   if (GNUNET_NO == rl->connected)
944     {
945       rl->connect_attempts++;
946       rl->connected = GNUNET_YES;
947 #if DEBUG_TRANSPORT
948   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
949               "Establishing fresh connection with `%4s' via plugin `%s'\n",
950               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
951 #endif
952     }
953   neighbour->messages = mq->next;
954   mq->plugin = rl->plugin;
955   if (!mq->internal_msg)
956     rl->transmit_ready = GNUNET_NO;
957 #if DEBUG_TRANSPORT
958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959               "Giving message of type `%u' for `%4s' to plugin `%s'\n",
960               ntohs (mq->message->type),
961               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
962 #endif
963   rl->plugin_handle
964     = rl->plugin->api->send (rl->plugin->api->cls,
965                              rl->plugin_handle,
966                              rl,
967                              &neighbour->id,
968                              mq->priority,
969                              mq->message,
970                              GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
971                              &transmit_send_continuation, mq);
972 }
973
974
975 /**
976  * Send the specified message to the specified peer.
977  *
978  * @param client source of the transmission request (can be NULL)
979  * @param priority how important is the message
980  * @param msg message to send
981  * @param is_internal is this an internal message
982  * @param neighbour handle to the neighbour for transmission
983  */
984 static void
985 transmit_to_peer (struct TransportClient *client,
986                   unsigned int priority,
987                   const struct GNUNET_MessageHeader *msg,
988                   int is_internal, struct NeighbourList *neighbour)
989 {
990   struct MessageQueue *mq;
991   struct MessageQueue *mqe;
992   struct GNUNET_MessageHeader *m;
993
994 #if DEBUG_TRANSPORT
995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996               _("Sending message of type %u to peer `%4s'\n"),
997               ntohs (msg->type), GNUNET_i2s (&neighbour->id));
998 #endif
999   if (client != NULL)
1000     {
1001       /* check for duplicate submission */
1002       mq = neighbour->messages;
1003       while (NULL != mq)
1004         {
1005           if (mq->client == client)
1006             {
1007               /* client transmitted to same peer twice
1008                  before getting SendOk! */
1009               GNUNET_break (0);
1010               return;
1011             }
1012           mq = mq->next;
1013         }
1014     }
1015   mq = GNUNET_malloc (sizeof (struct MessageQueue));
1016   mq->client = client;
1017   m = GNUNET_malloc (ntohs (msg->size));
1018   memcpy (m, msg, ntohs (msg->size));
1019   mq->message = m;
1020   mq->neighbour = neighbour;
1021   mq->internal_msg = is_internal;
1022   mq->priority = priority;
1023
1024   /* find tail */
1025   mqe = neighbour->messages;
1026   if (mqe != NULL)
1027     while (mqe->next != NULL)
1028       mqe = mqe->next;
1029   if (mqe == NULL)
1030     {
1031       /* new head */
1032       neighbour->messages = mq;
1033       try_transmission_to_peer (neighbour);
1034     }
1035   else
1036     {
1037       /* append */
1038       mqe->next = mq;
1039     }
1040 }
1041
1042
1043 /**
1044  * FIXME: document.
1045  */
1046 struct GeneratorContext
1047 {
1048   struct TransportPlugin *plug_pos;
1049   struct AddressList *addr_pos;
1050   struct GNUNET_TIME_Absolute expiration;
1051 };
1052
1053
1054 /**
1055  * FIXME: document.
1056  */
1057 static size_t
1058 address_generator (void *cls, size_t max, void *buf)
1059 {
1060   struct GeneratorContext *gc = cls;
1061   size_t ret;
1062
1063   while ((gc->addr_pos == NULL) && (gc->plug_pos != NULL))
1064     {
1065       gc->plug_pos = gc->plug_pos->next;
1066       gc->addr_pos = (gc->plug_pos != NULL) ? gc->plug_pos->addresses : NULL;
1067     }
1068   if (NULL == gc->plug_pos)
1069     return 0;
1070   ret = GNUNET_HELLO_add_address (gc->plug_pos->short_name,
1071                                   gc->expiration,
1072                                   gc->addr_pos->addr,
1073                                   gc->addr_pos->addrlen, buf, max);
1074   gc->addr_pos = gc->addr_pos->next;
1075   return ret;
1076 }
1077
1078
1079 /**
1080  * Construct our HELLO message from all of the addresses of
1081  * all of the transports.
1082  */
1083 static void
1084 refresh_hello ()
1085 {
1086   struct GNUNET_HELLO_Message *hello;
1087   struct TransportClient *cpos;
1088   struct NeighbourList *npos;
1089   struct GeneratorContext gc;
1090
1091 #if DEBUG_TRANSPORT
1092   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1093               "Refreshing my `%s'\n",
1094               "HELLO");
1095 #endif
1096   gc.plug_pos = plugins;
1097   gc.addr_pos = plugins != NULL ? plugins->addresses : NULL;
1098   gc.expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1099   hello = GNUNET_HELLO_create (&my_public_key, &address_generator, &gc);
1100   cpos = clients;
1101   while (cpos != NULL)
1102     {
1103       transmit_to_client (cpos,
1104                           (const struct GNUNET_MessageHeader *) hello,
1105                           GNUNET_NO);
1106       cpos = cpos->next;
1107     }
1108
1109   GNUNET_free_non_null (our_hello);
1110   our_hello = hello;
1111   our_hello_version++;
1112   npos = neighbours;
1113   while (npos != NULL)
1114     {
1115 #if DEBUG_TRANSPORT
1116       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1117                   "Transmitting updated `%s' to neighbour `%4s'\n",
1118                   "HELLO",
1119                   GNUNET_i2s(&npos->id));
1120 #endif
1121       transmit_to_peer (NULL, 0,
1122                         (const struct GNUNET_MessageHeader *) our_hello,
1123                         GNUNET_YES, npos);
1124       npos = npos->next;
1125     }
1126 }
1127
1128
1129 /**
1130  * Task used to clean up expired addresses for a plugin.
1131  *
1132  * @param cls closure
1133  * @param tc context
1134  */
1135 static void
1136 expire_address_task (void *cls,
1137                      const struct GNUNET_SCHEDULER_TaskContext *tc);
1138
1139
1140 /**
1141  * Update the list of addresses for this plugin,
1142  * expiring those that are past their expiration date.
1143  *
1144  * @param plugin addresses of which plugin should be recomputed?
1145  * @param fresh set to GNUNET_YES if a new address was added
1146  *        and we need to regenerate the HELLO even if nobody
1147  *        expired
1148  */
1149 static void
1150 update_addresses (struct TransportPlugin *plugin, int fresh)
1151 {
1152   struct GNUNET_TIME_Relative min_remaining;
1153   struct GNUNET_TIME_Relative remaining;
1154   struct GNUNET_TIME_Absolute now;
1155   struct AddressList *pos;
1156   struct AddressList *prev;
1157   struct AddressList *next;
1158   int expired;
1159
1160   if (plugin->address_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1161     GNUNET_SCHEDULER_cancel (plugin->env.sched, plugin->address_update_task);
1162   plugin->address_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1163   now = GNUNET_TIME_absolute_get ();
1164   min_remaining = GNUNET_TIME_UNIT_FOREVER_REL;
1165   expired = GNUNET_NO;
1166   prev = NULL;
1167   pos = plugin->addresses;
1168   while (pos != NULL)
1169     {
1170       next = pos->next;
1171       if (pos->expires.value < now.value)
1172         {
1173           expired = GNUNET_YES;
1174           if (prev == NULL)
1175             plugin->addresses = pos->next;
1176           else
1177             prev->next = pos->next;
1178           GNUNET_free (pos);
1179         }
1180       else
1181         {
1182           remaining = GNUNET_TIME_absolute_get_remaining (pos->expires);
1183           if (remaining.value < min_remaining.value)
1184             min_remaining = remaining;
1185           prev = pos;
1186         }
1187       pos = next;
1188     }
1189
1190   if (expired || fresh)
1191     refresh_hello ();
1192   if (min_remaining.value < GNUNET_TIME_UNIT_FOREVER_REL.value)
1193     plugin->address_update_task
1194       = GNUNET_SCHEDULER_add_delayed (plugin->env.sched,
1195                                       GNUNET_NO,
1196                                       GNUNET_SCHEDULER_PRIORITY_IDLE,
1197                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1198                                       min_remaining,
1199                                       &expire_address_task, plugin);
1200
1201 }
1202
1203
1204 /**
1205  * Task used to clean up expired addresses for a plugin.
1206  *
1207  * @param cls closure
1208  * @param tc context
1209  */
1210 static void
1211 expire_address_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1212 {
1213   struct TransportPlugin *plugin = cls;
1214   plugin->address_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1215   update_addresses (plugin, GNUNET_NO);
1216 }
1217
1218
1219 /**
1220  * Function that must be called by each plugin to notify the
1221  * transport service about the addresses under which the transport
1222  * provided by the plugin can be reached.
1223  *
1224  * @param cls closure
1225  * @param name name of the transport that generated the address
1226  * @param addr one of the addresses of the host, NULL for the last address
1227  *        the specific address format depends on the transport
1228  * @param addrlen length of the address
1229  * @param expires when should this address automatically expire?
1230  */
1231 static void
1232 plugin_env_notify_address (void *cls,
1233                            const char *name,
1234                            const void *addr,
1235                            size_t addrlen,
1236                            struct GNUNET_TIME_Relative expires)
1237 {
1238   struct TransportPlugin *p = cls;
1239   struct AddressList *al;
1240   struct GNUNET_TIME_Absolute abex;
1241
1242   abex = GNUNET_TIME_relative_to_absolute (expires);
1243   GNUNET_assert (p == find_transport (name));
1244
1245   al = p->addresses;
1246   while (al != NULL)
1247     {
1248       if ((addrlen == al->addrlen) && (0 == memcmp (addr, &al[1], addrlen)))
1249         {
1250           if (al->expires.value < abex.value)
1251             al->expires = abex;
1252           return;
1253         }
1254       al = al->next;
1255     }
1256 #if DEBUG_TRANSPORT
1257   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258               "Plugin `%s' informs us about a new address `%s'\n", name,
1259               GNUNET_a2s(addr, addrlen));
1260 #endif
1261   al = GNUNET_malloc (sizeof (struct AddressList) + addrlen);
1262   al->addr = &al[1];
1263   al->next = p->addresses;
1264   p->addresses = al;
1265   al->expires = abex;
1266   al->addrlen = addrlen;
1267   memcpy (&al[1], addr, addrlen);
1268   update_addresses (p, GNUNET_YES);
1269 }
1270
1271
1272 /**
1273  * FIXME: document.
1274  */
1275 struct LookupHelloContext
1276 {
1277   GNUNET_TRANSPORT_AddressCallback iterator;
1278
1279   void *iterator_cls;
1280 };
1281
1282
1283 /**
1284  * FIXME: document.
1285  */
1286 static int
1287 lookup_address_callback (void *cls,
1288                          const char *tname,
1289                          struct GNUNET_TIME_Absolute expiration,
1290                          const void *addr, size_t addrlen)
1291 {
1292   struct LookupHelloContext *lhc = cls;
1293   lhc->iterator (lhc->iterator_cls, tname, addr, addrlen);
1294   return GNUNET_OK;
1295 }
1296
1297
1298 /**
1299  * FIXME: document.
1300  */
1301 static void
1302 lookup_hello_callback (void *cls,
1303                        const struct GNUNET_PeerIdentity *peer,
1304                        const struct GNUNET_HELLO_Message *h, uint32_t trust)
1305 {
1306   struct LookupHelloContext *lhc = cls;
1307
1308   if (peer == NULL)
1309     {
1310       lhc->iterator (lhc->iterator_cls, NULL, NULL, 0);
1311       GNUNET_free (lhc);
1312       return;
1313     }
1314   if (h == NULL)
1315     return;
1316   GNUNET_HELLO_iterate_addresses (h,
1317                                   GNUNET_NO, &lookup_address_callback, lhc);
1318 }
1319
1320
1321 /**
1322  * Function that allows a transport to query the known
1323  * network addresses for a given peer.
1324  *
1325  * @param cls closure
1326  * @param timeout after how long should we time out?
1327  * @param target which peer are we looking for?
1328  * @param iter function to call for each known address
1329  * @param iter_cls closure for iter
1330  */
1331 static void
1332 plugin_env_lookup_address (void *cls,
1333                            struct GNUNET_TIME_Relative timeout,
1334                            const struct GNUNET_PeerIdentity *target,
1335                            GNUNET_TRANSPORT_AddressCallback iter,
1336                            void *iter_cls)
1337 {
1338   struct LookupHelloContext *lhc;
1339
1340   lhc = GNUNET_malloc (sizeof (struct LookupHelloContext));
1341   lhc->iterator = iter;
1342   lhc->iterator_cls = iter_cls;
1343   GNUNET_PEERINFO_for_all (cfg,
1344                            sched,
1345                            target, 0, timeout, &lookup_hello_callback, &lhc);
1346 }
1347
1348
1349 /**
1350  * Notify all of our clients about a peer connecting.
1351  */
1352 static void
1353 notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
1354                         struct GNUNET_TIME_Relative latency)
1355 {
1356   struct ConnectInfoMessage cim;
1357   struct TransportClient *cpos;
1358
1359 #if DEBUG_TRANSPORT
1360   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1361               "Informing clients about peer `%4s' connecting to us\n",
1362               GNUNET_i2s (peer));
1363 #endif
1364   cim.header.size = htons (sizeof (struct ConnectInfoMessage));
1365   cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1366   cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60*1000));
1367   cim.latency = GNUNET_TIME_relative_hton (latency);
1368   memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
1369   cpos = clients;
1370   while (cpos != NULL)
1371     {
1372       transmit_to_client (cpos, &cim.header, GNUNET_NO);
1373       cpos = cpos->next;
1374     }
1375 }
1376
1377
1378 /**
1379  * Notify all of our clients about a peer disconnecting.
1380  */
1381 static void
1382 notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
1383 {
1384   struct DisconnectInfoMessage dim;
1385   struct TransportClient *cpos;
1386
1387 #if DEBUG_TRANSPORT
1388   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1389               "Informing clients about peer `%4s' disconnecting\n",
1390               GNUNET_i2s (peer));
1391 #endif
1392   dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
1393   dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1394   dim.reserved = htonl (0);
1395   memcpy (&dim.peer, peer, sizeof (struct GNUNET_PeerIdentity));
1396   cpos = clients;
1397   while (cpos != NULL)
1398     {
1399       transmit_to_client (cpos, &dim.header, GNUNET_NO);
1400       cpos = cpos->next;
1401     }
1402 }
1403
1404
1405 /**
1406  * Copy any validated addresses to buf.
1407  *
1408  * @return 0 once all addresses have been
1409  *         returned
1410  */
1411 static size_t
1412 list_validated_addresses (void *cls, size_t max, void *buf)
1413 {
1414   struct ValidationAddress **va = cls;
1415   size_t ret;
1416
1417   while ((NULL != *va) && ((*va)->ok != GNUNET_YES))
1418     *va = (*va)->next;
1419   if (NULL == *va)
1420     return 0;
1421   ret = GNUNET_HELLO_add_address ((*va)->transport_name,
1422                                   (*va)->expiration,
1423                                   &(*va)[1], (*va)->addr_len, buf, max);
1424   *va = (*va)->next;
1425   return ret;
1426 }
1427
1428
1429 /**
1430  * HELLO validation cleanup task.
1431  */
1432 static void
1433 cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1434 {
1435   struct ValidationAddress *va;
1436   struct ValidationList *pos;
1437   struct ValidationList *prev;
1438   struct GNUNET_TIME_Absolute now;
1439   struct GNUNET_HELLO_Message *hello;
1440   struct GNUNET_PeerIdentity pid;
1441   struct NeighbourList *n;
1442
1443 #if DEBUG_TRANSPORT
1444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1445               "HELLO validation cleanup background task running...\n");
1446 #endif
1447   now = GNUNET_TIME_absolute_get ();
1448   prev = NULL;
1449   pos = pending_validations;
1450   while (pos != NULL)
1451     {
1452       if (pos->timeout.value < now.value)
1453         {
1454           if (prev == NULL)
1455             pending_validations = pos->next;
1456           else
1457             prev->next = pos->next;
1458           va = pos->addresses;
1459           hello = GNUNET_HELLO_create (&pos->publicKey,
1460                                        &list_validated_addresses, &va);
1461           GNUNET_CRYPTO_hash (&pos->publicKey,
1462                               sizeof (struct
1463                                       GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1464                               &pid.hashPubKey);
1465 #if DEBUG_TRANSPORT
1466           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1467                       "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n",
1468                       "HELLO", GNUNET_i2s (&pid));
1469 #endif
1470           GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello);
1471           n = find_neighbour (&pid);
1472           if (NULL != n)
1473             try_transmission_to_peer (n);           
1474           GNUNET_free (hello);
1475           while (NULL != (va = pos->addresses))
1476             {
1477               pos->addresses = va->next;
1478               GNUNET_free (va->transport_name);
1479               GNUNET_free (va);
1480             }
1481           GNUNET_free (pos);
1482           if (prev == NULL)
1483             pos = pending_validations;
1484           else
1485             pos = prev->next;
1486           continue;
1487         }
1488       prev = pos;
1489       pos = pos->next;
1490     }
1491
1492   /* finally, reschedule cleanup if needed; list is
1493      ordered by timeout, so we need the last element... */
1494   pos = pending_validations;
1495   while ((pos != NULL) && (pos->next != NULL))
1496     pos = pos->next;
1497   if (NULL != pos)
1498     GNUNET_SCHEDULER_add_delayed (sched,
1499                                   GNUNET_NO,
1500                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
1501                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1502                                   GNUNET_TIME_absolute_get_remaining
1503                                   (pos->timeout), &cleanup_validation, NULL);
1504 }
1505
1506
1507
1508
1509 /**
1510  * Function that will be called if we receive a validation
1511  * of an address challenge that we transmitted to another
1512  * peer.  Note that the validation should only be considered
1513  * acceptable if the challenge matches AND if the sender
1514  * address is at least a plausible address for this peer
1515  * (otherwise we may be seeing a MiM attack).
1516  *
1517  * @param cls closure
1518  * @param name name of the transport that generated the address
1519  * @param peer who responded to our challenge
1520  * @param challenge the challenge number we presumably used
1521  * @param sender_addr string describing our sender address (as observed
1522  *         by the other peer in human-readable format)
1523  */
1524 static void
1525 plugin_env_notify_validation (void *cls,
1526                               const char *name,
1527                               const struct GNUNET_PeerIdentity *peer,
1528                               uint32_t challenge,
1529                               const char *sender_addr)
1530 {
1531   int all_done;
1532   int matched;
1533   struct ValidationList *pos;
1534   struct ValidationAddress *va;
1535   struct GNUNET_PeerIdentity id;
1536
1537   pos = pending_validations;
1538   while (pos != NULL)
1539     {
1540       GNUNET_CRYPTO_hash (&pos->publicKey,
1541                           sizeof (struct
1542                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1543                           &id.hashPubKey);
1544       if (0 ==
1545           memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity)))
1546         break;
1547       pos = pos->next;
1548     }
1549   if (pos == NULL)
1550     {
1551       /* TODO: call statistics (unmatched PONG) */
1552       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1553                   _
1554                   ("Received validation response but have no record of any validation request for `%4s'. Ignoring.\n"),
1555                   GNUNET_i2s(peer));
1556       return;
1557     }
1558   all_done = GNUNET_YES;
1559   matched = GNUNET_NO;
1560   va = pos->addresses;
1561   while (va != NULL)
1562     {
1563       if (va->challenge == challenge)
1564         {
1565 #if DEBUG_TRANSPORT
1566           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1567                       "Confirmed validity of peer address.\n");
1568 #endif
1569           GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
1570                       _("Another peer saw us using the address `%s' via `%s'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"),
1571                       sender_addr, 
1572                       name);
1573           va->ok = GNUNET_YES;
1574           va->expiration =
1575             GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1576           matched = GNUNET_YES;
1577         }        
1578       if (va->ok != GNUNET_YES)
1579         all_done = GNUNET_NO;
1580       va = va->next;
1581     }
1582   if (GNUNET_NO == matched)
1583     {
1584       /* TODO: call statistics (unmatched PONG) */
1585       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1586                   _
1587                   ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"),
1588                   "PONG", "PING");
1589     }
1590   if (GNUNET_YES == all_done)
1591     {
1592       pos->timeout.value = 0;
1593       GNUNET_SCHEDULER_add_delayed (sched,
1594                                     GNUNET_NO,
1595                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
1596                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1597                                     GNUNET_TIME_UNIT_ZERO,
1598                                     &cleanup_validation, NULL);
1599     }
1600 }
1601
1602
1603 struct CheckHelloValidatedContext
1604 {
1605   /**
1606    * Plugin for which we are validating.
1607    */
1608   struct TransportPlugin *plugin;
1609
1610   /**
1611    * Hello that we are validating.
1612    */
1613   struct GNUNET_HELLO_Message *hello;
1614
1615   /**
1616    * Validation list being build.
1617    */
1618   struct ValidationList *e;
1619
1620 };
1621
1622
1623 /**
1624  * Append the given address to the list of entries
1625  * that need to be validated.
1626  */
1627 static int
1628 run_validation (void *cls,
1629                 const char *tname,
1630                 struct GNUNET_TIME_Absolute expiration,
1631                 const void *addr, size_t addrlen)
1632 {
1633   struct ValidationList *e = cls;
1634   struct TransportPlugin *tp;
1635   struct ValidationAddress *va;
1636   struct GNUNET_PeerIdentity id;
1637
1638   tp = find_transport (tname);
1639   if (tp == NULL)
1640     {
1641       GNUNET_log (GNUNET_ERROR_TYPE_INFO |
1642                   GNUNET_ERROR_TYPE_BULK,
1643                   _
1644                   ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
1645                   tname);
1646       return GNUNET_OK;
1647     }
1648   GNUNET_CRYPTO_hash (&e->publicKey,
1649                       sizeof (struct
1650                               GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1651                       &id.hashPubKey);
1652   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1653               "Scheduling validation of address `%s' via `%s' for `%4s'\n",
1654               GNUNET_a2s(addr, addrlen),
1655               tname,
1656               GNUNET_i2s(&id));
1657
1658   va = GNUNET_malloc (sizeof (struct ValidationAddress) + addrlen);
1659   va->next = e->addresses;
1660   e->addresses = va;
1661   va->transport_name = GNUNET_strdup (tname);
1662   va->addr_len = addrlen;
1663   va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1664                                             (unsigned int) -1);
1665   memcpy (&va[1], addr, addrlen);
1666   return GNUNET_OK;
1667 }
1668
1669
1670 /**
1671  * Check if addresses in validated hello "h" overlap with
1672  * those in "chvc->hello" and update "chvc->hello" accordingly,
1673  * removing those addresses that have already been validated.
1674  */
1675 static void
1676 check_hello_validated (void *cls,
1677                        const struct GNUNET_PeerIdentity *peer,
1678                        const struct GNUNET_HELLO_Message *h, 
1679                        uint32_t trust)
1680 {
1681   struct CheckHelloValidatedContext *chvc = cls;
1682   struct ValidationAddress *va;
1683   struct TransportPlugin *tp;
1684   int first_call;
1685   struct GNUNET_PeerIdentity apeer;
1686
1687   first_call = GNUNET_NO;
1688   if (chvc->e == NULL)
1689     {
1690       first_call = GNUNET_YES;
1691       chvc->e = GNUNET_malloc (sizeof (struct ValidationList));
1692       GNUNET_assert (GNUNET_OK ==
1693                      GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello,
1694                                            &chvc->e->publicKey));
1695       chvc->e->timeout =
1696         GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT);
1697       chvc->e->next = pending_validations;
1698       pending_validations = chvc->e;
1699     }
1700   if (h != NULL)
1701     {
1702       GNUNET_HELLO_iterate_new_addresses (chvc->hello,
1703                                           h,
1704                                           GNUNET_TIME_absolute_get (),
1705                                           &run_validation, chvc->e);
1706     }
1707   else if (GNUNET_YES == first_call)
1708     {
1709       /* no existing HELLO, all addresses are new */
1710       GNUNET_HELLO_iterate_addresses (chvc->hello,
1711                                       GNUNET_NO, &run_validation, chvc->e);
1712     }
1713   if (h != NULL)
1714     return;                     /* wait for next call */
1715   /* finally, transmit validation attempts */
1716   GNUNET_assert (GNUNET_OK ==
1717                  GNUNET_HELLO_get_id (chvc->hello,
1718                                       &apeer));
1719   va = chvc->e->addresses;
1720   while (va != NULL)
1721     {
1722 #if DEBUG_TRANSPORT
1723       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1724                   "Establishing `%s' connection to validate `%s' of `%4s'\n",
1725                   va->transport_name,
1726                   "HELLO",
1727                   GNUNET_i2s (&apeer));
1728 #endif
1729       tp = find_transport (va->transport_name);
1730       GNUNET_assert (tp != NULL);
1731       if (GNUNET_OK !=
1732           tp->api->validate (tp->api->cls,
1733                              &apeer,
1734                              va->challenge,
1735                              HELLO_VERIFICATION_TIMEOUT,
1736                              &va[1],
1737                              va->addr_len))
1738         va->ok = GNUNET_SYSERR;
1739       va = va->next;
1740     }
1741   if (chvc->e->next == NULL)
1742     GNUNET_SCHEDULER_add_delayed (sched,
1743                                   GNUNET_NO,
1744                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
1745                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1746                                   GNUNET_TIME_absolute_get_remaining
1747                                   (chvc->e->timeout), &cleanup_validation,
1748                                   NULL);
1749   GNUNET_free (chvc);
1750 }
1751
1752
1753 /**
1754  * Process HELLO-message.
1755  *
1756  * @param plugin transport involved, may be NULL
1757  * @param message the actual message
1758  * @return GNUNET_OK if the HELLO was well-formed, GNUNET_SYSERR otherwise
1759  */
1760 static int
1761 process_hello (struct TransportPlugin *plugin,
1762                const struct GNUNET_MessageHeader *message)
1763 {
1764   struct ValidationList *e;
1765   uint16_t hsize;
1766   struct GNUNET_PeerIdentity target;
1767   const struct GNUNET_HELLO_Message *hello;
1768   struct CheckHelloValidatedContext *chvc;
1769   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
1770
1771   hsize = ntohs (message->size);
1772   if ((ntohs (message->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1773       (hsize < sizeof (struct GNUNET_MessageHeader)))
1774     {
1775       GNUNET_break (0);
1776       return GNUNET_SYSERR;
1777     }
1778   /* first, check if load is too high */
1779   if (GNUNET_OS_load_cpu_get (cfg) > 100)
1780     {
1781       /* TODO: call to stats? */
1782       return GNUNET_OK;
1783     }
1784   hello = (const struct GNUNET_HELLO_Message *) message;
1785   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, &publicKey))
1786     {
1787       GNUNET_break_op (0);
1788       return GNUNET_SYSERR;
1789     }
1790   GNUNET_CRYPTO_hash (&publicKey,
1791                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1792                       &target.hashPubKey);
1793 #if DEBUG_TRANSPORT
1794   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1795               "Processing `%s' message for `%4s'\n",
1796               "HELLO", GNUNET_i2s (&target));
1797 #endif
1798   /* check if a HELLO for this peer is already on the validation list */
1799   e = pending_validations;
1800   while (e != NULL)
1801     {
1802       if (0 == memcmp (&e->publicKey,
1803                        &publicKey,
1804                        sizeof (struct
1805                                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)))
1806         {
1807           /* TODO: call to stats? */
1808 #if DEBUG_TRANSPORT
1809           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1810                       "`%s' message for peer `%4s' is already pending; ignoring new message\n",
1811                       "HELLO", GNUNET_i2s (&target));
1812 #endif    
1813           return GNUNET_OK;
1814         }
1815       e = e->next;
1816     }
1817   chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize);
1818   chvc->plugin = plugin;
1819   chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1];
1820   memcpy (chvc->hello, hello, hsize);
1821   /* finally, check if HELLO was previously validated
1822      (continuation will then schedule actual validation) */
1823   GNUNET_PEERINFO_for_all (cfg,
1824                            sched,
1825                            &target,
1826                            0,
1827                            HELLO_VERIFICATION_TIMEOUT,
1828                            &check_hello_validated, chvc);
1829   return GNUNET_OK;
1830 }
1831
1832
1833 /**
1834  * The peer specified by the given neighbour has timed-out.  Update
1835  * our state and do the necessary notifications.  Also notifies
1836  * our clients that the neighbour is now officially gone.
1837  *
1838  * @param n the neighbour list entry for the peer
1839  */
1840 static void
1841 disconnect_neighbour (struct NeighbourList *n)
1842 {
1843   struct ReadyList *rpos;
1844   struct NeighbourList *npos;
1845   struct NeighbourList *nprev;
1846   struct MessageQueue *mq;
1847
1848 #if DEBUG_TRANSPORT
1849   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1850               "Disconnecting from `%4s'\n",
1851               GNUNET_i2s(&n->id));
1852 #endif
1853   /* remove n from neighbours list */
1854   nprev = NULL;
1855   npos = neighbours;
1856   while ((npos != NULL) && (npos != n))
1857     {
1858       nprev = npos;
1859       npos = npos->next;
1860     }
1861   GNUNET_assert (npos != NULL);
1862   if (nprev == NULL)
1863     neighbours = n->next;
1864   else
1865     nprev->next = n->next;
1866
1867   /* notify all clients about disconnect */
1868   notify_clients_disconnect (&n->id);
1869
1870   /* clean up all plugins, cancel connections & pending transmissions */
1871   while (NULL != (rpos = n->plugins))
1872     {
1873       n->plugins = rpos->next;
1874       GNUNET_assert (rpos->neighbour == n);
1875       rpos->plugin->api->cancel (rpos->plugin->api->cls,
1876                                  rpos->plugin_handle, rpos, &n->id);
1877       GNUNET_free (rpos);
1878     }
1879
1880   /* free all messages on the queue */
1881   while (NULL != (mq = n->messages))
1882     {
1883       n->messages = mq->next;
1884       GNUNET_assert (mq->neighbour == n);
1885       GNUNET_free (mq);
1886     }
1887
1888   /* finally, free n itself */
1889   GNUNET_free (n);
1890 }
1891
1892
1893 /**
1894  * Add an entry for each of our transport plugins
1895  * (that are able to send) to the list of plugins
1896  * for this neighbour.
1897  *
1898  * @param neighbour to initialize
1899  */
1900 static void
1901 add_plugins (struct NeighbourList *neighbour)
1902 {
1903   struct TransportPlugin *tp;
1904   struct ReadyList *rl;
1905
1906   neighbour->retry_plugins_time
1907     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
1908   tp = plugins;
1909   while (tp != NULL)
1910     {
1911       if (tp->api->send != NULL)
1912         {
1913           rl = GNUNET_malloc (sizeof (struct ReadyList));
1914           rl->next = neighbour->plugins;
1915           neighbour->plugins = rl;
1916           rl->plugin = tp;
1917           rl->neighbour = neighbour;
1918           rl->transmit_ready = GNUNET_YES;
1919         }
1920       tp = tp->next;
1921     }
1922 }
1923
1924
1925 static void
1926 neighbour_timeout_task (void *cls,
1927                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1928 {
1929   struct NeighbourList *n = cls;
1930
1931 #if DEBUG_TRANSPORT
1932   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1933               "Neighbour `%4s' has timed out!\n",
1934               GNUNET_i2s(&n->id));
1935 #endif
1936   n->timeout_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1937   disconnect_neighbour (n);
1938 }
1939
1940
1941 /**
1942  * Create a fresh entry in our neighbour list for the given peer.
1943  * Will try to transmit our current HELLO to the new neighbour.  Also
1944  * notifies our clients about the new "connection".
1945  *
1946  * @param peer the peer for which we create the entry
1947  * @return the new neighbour list entry
1948  */
1949 static struct NeighbourList *
1950 setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
1951 {
1952   struct NeighbourList *n;
1953
1954 #if DEBUG_TRANSPORT
1955   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1956               "Setting up new neighbour `%4s', sending our HELLO to introduce ourselves\n",
1957               GNUNET_i2s (peer));
1958 #endif
1959   GNUNET_assert (our_hello != NULL);
1960   n = GNUNET_malloc (sizeof (struct NeighbourList));
1961   n->next = neighbours;
1962   neighbours = n;
1963   n->id = *peer;
1964   n->last_quota_update = GNUNET_TIME_absolute_get ();
1965   n->peer_timeout =
1966     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
1967   n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
1968   add_plugins (n);
1969   n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
1970                                                   GNUNET_NO,
1971                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
1972                                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1973                                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1974                                                   &neighbour_timeout_task, n);
1975   transmit_to_peer (NULL, 0,
1976                     (const struct GNUNET_MessageHeader *) our_hello,
1977                     GNUNET_YES, n);
1978   notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
1979   return n;
1980 }
1981
1982
1983 /**
1984  * Function called by the plugin for each received message.
1985  * Update data volumes, possibly notify plugins about
1986  * reducing the rate at which they read from the socket
1987  * and generally forward to our receive callback.
1988  *
1989  * @param plugin_context value to pass to this plugin
1990  *        to respond to the given peer (use is optional,
1991  *        but may speed up processing)
1992  * @param service_context value passed to the transport-service
1993  *        to identify the neighbour; will be NULL on the first
1994  *        call for a given peer
1995  * @param latency estimated latency for communicating with the
1996  *             given peer
1997  * @param peer (claimed) identity of the other peer
1998  * @param message the message, NULL if peer was disconnected
1999  * @return the new service_context that the plugin should use
2000  *         for future receive calls for messages from this
2001  *         particular peer
2002  */
2003 static struct ReadyList *
2004 plugin_env_receive (void *cls,
2005                     void *plugin_context,
2006                     struct ReadyList *service_context,
2007                     struct GNUNET_TIME_Relative latency,
2008                     const struct GNUNET_PeerIdentity *peer,
2009                     const struct GNUNET_MessageHeader *message)
2010 {
2011   const struct GNUNET_MessageHeader ack = {
2012     htons (sizeof (struct GNUNET_MessageHeader)),
2013     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK)
2014   };
2015   struct TransportPlugin *plugin = cls;
2016   struct TransportClient *cpos;
2017   struct InboundMessage *im;
2018   uint16_t msize;
2019   struct NeighbourList *n;
2020
2021   if (service_context != NULL)
2022     {
2023       n = service_context->neighbour;
2024       GNUNET_assert (n != NULL);
2025     }
2026   else
2027     {
2028       n = find_neighbour (peer);
2029       if (n == NULL)
2030         {
2031           if (message == NULL)
2032             return NULL;        /* disconnect of peer already marked down */
2033           n = setup_new_neighbour (peer);
2034         }
2035       service_context = n->plugins;
2036       while ((service_context != NULL) && (plugin != service_context->plugin))
2037         service_context = service_context->next;
2038       GNUNET_assert ((plugin->api->send == NULL) ||
2039                      (service_context != NULL));
2040     }
2041   if (message == NULL)
2042     {
2043       if ((service_context != NULL) &&
2044           (service_context->plugin_handle == plugin_context))
2045         {
2046           service_context->connected = GNUNET_NO;
2047           service_context->plugin_handle = NULL;
2048         }
2049       /* TODO: call stats */
2050       disconnect_neighbour (n);
2051       return NULL;
2052     }
2053 #if DEBUG_TRANSPORT
2054   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2055               "Processing message of type `%u' received by plugin...\n",
2056               ntohs (message->type));
2057 #endif
2058   if (service_context != NULL)
2059     {
2060       if (service_context->connected == GNUNET_NO)
2061         {
2062           service_context->connected = GNUNET_YES;
2063           service_context->transmit_ready = GNUNET_YES;
2064           service_context->connect_attempts++;
2065         }
2066       service_context->timeout
2067         = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2068       service_context->plugin_handle = plugin_context;
2069       service_context->latency = latency;
2070     }
2071   /* update traffic received amount ... */
2072   msize = ntohs (message->size);
2073   n->last_received += msize;
2074   GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
2075   n->peer_timeout =
2076     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2077   n->timeout_task =
2078     GNUNET_SCHEDULER_add_delayed (sched, GNUNET_NO,
2079                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
2080                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2081                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2082                                   &neighbour_timeout_task, n);
2083   update_quota (n);
2084   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2085     {
2086       /* dropping message due to frequent inbound volume violations! */
2087       GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
2088                   GNUNET_ERROR_TYPE_BULK,
2089                   _
2090                   ("Dropping incoming message due to repeated bandwidth quota violations.\n"));
2091       /* TODO: call stats */
2092       GNUNET_assert (NULL != service_context->neighbour);
2093       return service_context;
2094     }
2095   switch (ntohs (message->type))
2096     {
2097     case GNUNET_MESSAGE_TYPE_HELLO:
2098 #if DEBUG_TRANSPORT
2099       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2100                   "Receiving `%s' message from `%4s'.\n", "HELLO",
2101                   GNUNET_i2s(peer));
2102 #endif
2103       process_hello (plugin, message);
2104 #if DEBUG_TRANSPORT
2105       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2106                   "Sending `%s' message to connecting peer `%4s'.\n", "ACK",
2107                   GNUNET_i2s(peer));
2108 #endif
2109       transmit_to_peer (NULL, 0, &ack, GNUNET_YES, n);
2110       break;
2111     case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
2112       n->saw_ack = GNUNET_YES;
2113       /* intentional fall-through! */
2114     default:
2115 #if DEBUG_TRANSPORT
2116       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2117                   "Received message of type %u from `%4s', sending to all clients.\n",
2118                   ntohs (message->type),
2119                   GNUNET_i2s(peer));
2120 #endif
2121       /* transmit message to all clients */
2122       im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
2123       im->header.size = htons (sizeof (struct InboundMessage) + msize);
2124       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2125       im->latency = GNUNET_TIME_relative_hton (latency);
2126       im->peer = *peer;
2127       memcpy (&im[1], message, msize);
2128
2129       cpos = clients;
2130       while (cpos != NULL)
2131         {
2132           transmit_to_client (cpos, &im->header, GNUNET_YES);
2133           cpos = cpos->next;
2134         }
2135       GNUNET_free (im);
2136     }
2137   GNUNET_assert (NULL != service_context->neighbour);
2138   return service_context;
2139 }
2140
2141
2142 /**
2143  * Handle START-message.  This is the first message sent to us
2144  * by any client which causes us to add it to our list.
2145  *
2146  * @param cls closure (always NULL)
2147  * @param client identification of the client
2148  * @param message the actual message
2149  */
2150 static void
2151 handle_start (void *cls,
2152               struct GNUNET_SERVER_Client *client,
2153               const struct GNUNET_MessageHeader *message)
2154 {
2155   struct TransportClient *c;
2156   struct ConnectInfoMessage cim;
2157   struct NeighbourList *n;
2158   struct InboundMessage *im;
2159   struct GNUNET_MessageHeader *ack;
2160
2161 #if DEBUG_TRANSPORT
2162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2163               "Received `%s' request from client\n", "START");
2164 #endif
2165   c = clients;
2166   while (c != NULL)
2167     {
2168       if (c->client == client)
2169         {
2170           /* client already on our list! */
2171           GNUNET_break (0);
2172           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2173           return;
2174         }
2175       c = c->next;
2176     }
2177   c = GNUNET_malloc (sizeof (struct TransportClient));
2178   c->next = clients;
2179   clients = c;
2180   c->client = client;
2181   if (our_hello != NULL)
2182     {
2183 #if DEBUG_TRANSPORT
2184       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2185                   "Sending our own `%s' to new client\n",
2186                   "HELLO");
2187 #endif
2188       transmit_to_client (c,
2189                           (const struct GNUNET_MessageHeader *) our_hello,
2190                           GNUNET_NO);
2191       /* tell new client about all existing connections */
2192       cim.header.size = htons (sizeof (struct ConnectInfoMessage));
2193       cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2194       cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
2195       cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2196       im = GNUNET_malloc (sizeof (struct InboundMessage) +
2197                           sizeof (struct GNUNET_MessageHeader));
2198       im->header.size = htons (sizeof (struct InboundMessage) +
2199                                sizeof (struct GNUNET_MessageHeader));
2200       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2201       im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2202       ack = (struct GNUNET_MessageHeader *) &im[1];
2203       ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2204       ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
2205       for (n = neighbours; n != NULL; n = n->next)
2206         {
2207           cim.id = n->id;
2208           transmit_to_client (c, &cim.header, GNUNET_NO);
2209           if (n->saw_ack)
2210             {
2211               im->peer = n->id;
2212               transmit_to_client (c, &im->header, GNUNET_NO);
2213             }
2214         }
2215       GNUNET_free (im);
2216     }
2217   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2218 }
2219
2220
2221 /**
2222  * Handle HELLO-message.
2223  *
2224  * @param cls closure (always NULL)
2225  * @param client identification of the client
2226  * @param message the actual message
2227  */
2228 static void
2229 handle_hello (void *cls,
2230               struct GNUNET_SERVER_Client *client,
2231               const struct GNUNET_MessageHeader *message)
2232 {
2233   int ret;
2234
2235 #if DEBUG_TRANSPORT
2236   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2237               "Received `%s' request from client\n", "HELLO");
2238 #endif
2239   ret = process_hello (NULL, message);
2240   GNUNET_SERVER_receive_done (client, ret);
2241 }
2242
2243
2244 /**
2245  * Handle SEND-message.
2246  *
2247  * @param cls closure (always NULL)
2248  * @param client identification of the client
2249  * @param message the actual message
2250  */
2251 static void
2252 handle_send (void *cls,
2253              struct GNUNET_SERVER_Client *client,
2254              const struct GNUNET_MessageHeader *message)
2255 {
2256   struct TransportClient *tc;
2257   struct NeighbourList *n;
2258   const struct OutboundMessage *obm;
2259   const struct GNUNET_MessageHeader *obmm;
2260   uint16_t size;
2261   uint16_t msize;
2262
2263   size = ntohs (message->size);
2264   if (size <
2265       sizeof (struct OutboundMessage) + sizeof (struct GNUNET_MessageHeader))
2266     {
2267       GNUNET_break (0);
2268       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2269       return;
2270     }
2271   obm = (const struct OutboundMessage *) message;
2272 #if DEBUG_TRANSPORT
2273   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2274               "Received `%s' request from client with target `%4s'\n",
2275               "SEND", GNUNET_i2s (&obm->peer));
2276 #endif
2277   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2278   msize = ntohs (obmm->size);
2279   if (size != msize + sizeof (struct OutboundMessage))
2280     {
2281       GNUNET_break (0);
2282       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2283       return;
2284     }
2285   n = find_neighbour (&obm->peer);
2286   if (n == NULL)
2287     n = setup_new_neighbour (&obm->peer);
2288   tc = clients;
2289   while ((tc != NULL) && (tc->client != client))
2290     tc = tc->next;
2291
2292 #if DEBUG_TRANSPORT
2293   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2294               "Client asked to transmit %u-byte message of type %u to `%4s'\n",
2295               ntohs (obmm->size),
2296               ntohs (obmm->type), GNUNET_i2s (&obm->peer));
2297 #endif
2298   transmit_to_peer (tc, ntohl(obm->priority), obmm, GNUNET_NO, n);
2299   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2300 }
2301
2302
2303 /**
2304  * Handle SET_QUOTA-message.
2305  *
2306  * @param cls closure (always NULL)
2307  * @param client identification of the client
2308  * @param message the actual message
2309  */
2310 static void
2311 handle_set_quota (void *cls,
2312                   struct GNUNET_SERVER_Client *client,
2313                   const struct GNUNET_MessageHeader *message)
2314 {
2315   const struct QuotaSetMessage *qsm =
2316     (const struct QuotaSetMessage *) message;
2317   struct NeighbourList *n;
2318   struct TransportPlugin *p;
2319   struct ReadyList *rl;
2320
2321 #if DEBUG_TRANSPORT
2322   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2323               "Received `%s' request from client for peer `%4s'\n",
2324               "SET_QUOTA", GNUNET_i2s (&qsm->peer));
2325 #endif
2326   n = find_neighbour (&qsm->peer);
2327   if (n == NULL)
2328     {
2329       GNUNET_SERVER_receive_done (client, GNUNET_OK);
2330       return;
2331     }
2332   update_quota (n);
2333   if (n->quota_in < ntohl (qsm->quota_in))
2334     n->last_quota_update = GNUNET_TIME_absolute_get ();
2335   n->quota_in = ntohl (qsm->quota_in);
2336   rl = n->plugins;
2337   while (rl != NULL)
2338     {
2339       p = rl->plugin;
2340       p->api->set_receive_quota (p->api->cls,
2341                                  &qsm->peer, ntohl (qsm->quota_in));
2342       rl = rl->next;
2343     }
2344   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2345 }
2346
2347
2348 /**
2349  * Handle TRY_CONNECT-message.
2350  *
2351  * @param cls closure (always NULL)
2352  * @param client identification of the client
2353  * @param message the actual message
2354  */
2355 static void
2356 handle_try_connect (void *cls,
2357                     struct GNUNET_SERVER_Client *client,
2358                     const struct GNUNET_MessageHeader *message)
2359 {
2360   const struct TryConnectMessage *tcm;
2361
2362   tcm = (const struct TryConnectMessage *) message;
2363 #if DEBUG_TRANSPORT
2364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2365               "Received `%s' request from client asking to connect to `%4s'\n",
2366               "TRY_CONNECT", GNUNET_i2s (&tcm->peer));
2367 #endif
2368   if (NULL == find_neighbour (&tcm->peer))
2369     setup_new_neighbour (&tcm->peer);
2370 #if DEBUG_TRANSPORT
2371   else
2372     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2373                 "Client asked to connect to `%4s', but connection already exists\n",
2374                 "TRY_CONNECT", 
2375                 GNUNET_i2s (&tcm->peer));
2376 #endif    
2377   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2378 }
2379
2380
2381 /**
2382  * List of handlers for the messages understood by this
2383  * service.
2384  */
2385 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2386   {&handle_start, NULL,
2387    GNUNET_MESSAGE_TYPE_TRANSPORT_START, 0},
2388   {&handle_hello, NULL,
2389    GNUNET_MESSAGE_TYPE_HELLO, 0},
2390   {&handle_send, NULL,
2391    GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
2392   {&handle_set_quota, NULL,
2393    GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
2394   {&handle_try_connect, NULL,
2395    GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
2396    sizeof (struct TryConnectMessage)},
2397   {NULL, NULL, 0, 0}
2398 };
2399
2400
2401 /**
2402  * Setup the environment for this plugin.
2403  */
2404 static void
2405 create_environment (struct TransportPlugin *plug)
2406 {
2407   plug->env.cfg = cfg;
2408   plug->env.sched = sched;
2409   plug->env.my_public_key = &my_public_key;
2410   plug->env.my_private_key = my_private_key;
2411   plug->env.my_identity = &my_identity;
2412   plug->env.cls = plug;
2413   plug->env.receive = &plugin_env_receive;
2414   plug->env.lookup = &plugin_env_lookup_address;
2415   plug->env.notify_address = &plugin_env_notify_address;
2416   plug->env.notify_validation = &plugin_env_notify_validation;
2417   plug->env.default_quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
2418   plug->env.max_connections = max_connect_per_transport;
2419 }
2420
2421
2422 /**
2423  * Start the specified transport (load the plugin).
2424  */
2425 static void
2426 start_transport (struct GNUNET_SERVER_Handle *server, const char *name)
2427 {
2428   struct TransportPlugin *plug;
2429   char *libname;
2430
2431   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2432               _("Loading `%s' transport plugin\n"), name);
2433   GNUNET_asprintf (&libname, "libgnunet_plugin_transport_%s", name);
2434   plug = GNUNET_malloc (sizeof (struct TransportPlugin));
2435   create_environment (plug);
2436   plug->short_name = GNUNET_strdup (name);
2437   plug->lib_name = libname;
2438   plug->next = plugins;
2439   plugins = plug;
2440   plug->api = GNUNET_PLUGIN_load (libname, &plug->env);
2441   if (plug->api == NULL)
2442     {
2443       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2444                   _("Failed to load transport plugin for `%s'\n"), name);
2445       GNUNET_free (plug->short_name);
2446       plugins = plug->next;
2447       GNUNET_free (libname);
2448       GNUNET_free (plug);
2449     }
2450 }
2451
2452
2453 /**
2454  * Called whenever a client is disconnected.  Frees our
2455  * resources associated with that client.
2456  *
2457  * @param cls closure
2458  * @param client identification of the client
2459  */
2460 static void
2461 client_disconnect_notification (void *cls,
2462                                 struct GNUNET_SERVER_Client *client)
2463 {
2464   struct TransportClient *pos;
2465   struct TransportClient *prev;
2466   struct ClientMessageQueueEntry *mqe;
2467
2468 #if DEBUG_TRANSPORT
2469   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2470               "Client disconnected, cleaning up.\n");
2471 #endif
2472   prev = NULL;
2473   pos = clients;
2474   while ((pos != NULL) && (pos->client != client))
2475     {
2476       prev = pos;
2477       pos = pos->next;
2478     }
2479   if (pos == NULL)
2480     return;
2481   while (NULL != (mqe = pos->message_queue_head))
2482     {
2483       pos->message_queue_head = mqe->next;
2484       GNUNET_free (mqe);
2485     }
2486   pos->message_queue_head = NULL;
2487   if (prev == NULL)
2488     clients = pos->next;
2489   else
2490     prev->next = pos->next;
2491   if (GNUNET_YES == pos->tcs_pending)
2492     {
2493       pos->client = NULL;
2494       return;
2495     }
2496   GNUNET_free (pos);
2497 }
2498
2499
2500 /**
2501  * Initiate transport service.
2502  *
2503  * @param cls closure
2504  * @param s scheduler to use
2505  * @param serv the initialized server
2506  * @param c configuration to use
2507  */
2508 static void
2509 run (void *cls,
2510      struct GNUNET_SCHEDULER_Handle *s,
2511      struct GNUNET_SERVER_Handle *serv, struct GNUNET_CONFIGURATION_Handle *c)
2512 {
2513   char *plugs;
2514   char *pos;
2515   int no_transports;
2516   unsigned long long tneigh;
2517   char *keyfile;
2518
2519   sched = s;
2520   cfg = c;
2521   /* parse configuration */
2522   if ((GNUNET_OK !=
2523        GNUNET_CONFIGURATION_get_value_number (c,
2524                                               "TRANSPORT",
2525                                               "NEIGHBOUR_LIMIT",
2526                                               &tneigh)) ||
2527       (GNUNET_OK !=
2528        GNUNET_CONFIGURATION_get_value_filename (c,
2529                                                 "GNUNETD",
2530                                                 "HOSTKEY", &keyfile)))
2531     {
2532       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2533                   _
2534                   ("Transport service is lacking key configuration settings.  Exiting.\n"));
2535       GNUNET_SCHEDULER_shutdown (s);
2536       return;
2537     }
2538   max_connect_per_transport = (uint32_t) tneigh;
2539   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
2540   GNUNET_free (keyfile);
2541   if (my_private_key == NULL)
2542     {
2543       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2544                   _
2545                   ("Transport service could not access hostkey.  Exiting.\n"));
2546       GNUNET_SCHEDULER_shutdown (s);
2547       return;
2548     }
2549   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
2550   GNUNET_CRYPTO_hash (&my_public_key,
2551                       sizeof (my_public_key), &my_identity.hashPubKey);
2552   /* setup notification */
2553   server = serv;
2554   GNUNET_SERVER_disconnect_notify (server,
2555                                    &client_disconnect_notification, NULL);
2556   /* load plugins... */
2557   no_transports = 1;
2558   if (GNUNET_OK ==
2559       GNUNET_CONFIGURATION_get_value_string (c,
2560                                              "TRANSPORT", "PLUGINS", &plugs))
2561     {
2562       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2563                   _("Starting transport plugins `%s'\n"), plugs);
2564       pos = strtok (plugs, " ");
2565       while (pos != NULL)
2566         {
2567           start_transport (server, pos);
2568           no_transports = 0;
2569           pos = strtok (NULL, " ");
2570         }
2571       GNUNET_free (plugs);
2572     }
2573   if (no_transports)
2574     refresh_hello ();
2575 #if DEBUG_TRANSPORT
2576   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2577               _("Transport service ready.\n"));
2578 #endif
2579   /* process client requests */
2580   GNUNET_SERVER_add_handlers (server, handlers);
2581 }
2582
2583
2584 /**
2585  * Function called when the service shuts
2586  * down.  Unloads our plugins.
2587  *
2588  * @param cls closure
2589  * @param cfg configuration to use
2590  */
2591 static void
2592 unload_plugins (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
2593 {
2594   struct TransportPlugin *plug;
2595   struct AddressList *al;
2596
2597 #if DEBUG_TRANSPORT
2598   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2599               "Transport service is unloading plugins...\n");
2600 #endif
2601   while (NULL != (plug = plugins))
2602     {
2603       plugins = plug->next;
2604       GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
2605       GNUNET_free (plug->lib_name);
2606       GNUNET_free (plug->short_name);
2607       while (NULL != (al = plug->addresses))
2608         {
2609           plug->addresses = al->next;
2610           GNUNET_free (al);
2611         }
2612       GNUNET_free (plug);
2613     }
2614   if (my_private_key != NULL)
2615     GNUNET_CRYPTO_rsa_key_free (my_private_key);
2616 }
2617
2618
2619 /**
2620  * The main function for the transport service.
2621  *
2622  * @param argc number of arguments from the command line
2623  * @param argv command line arguments
2624  * @return 0 ok, 1 on error
2625  */
2626 int
2627 main (int argc, char *const *argv)
2628 {
2629   return (GNUNET_OK ==
2630           GNUNET_SERVICE_run (argc,
2631                               argv,
2632                               "transport",
2633                               &run, NULL, &unload_plugins, NULL)) ? 0 : 1;
2634 }
2635
2636 /* end of gnunet-service-transport.c */