d97cb15eb39ed70632868a11eca96c76cf921881
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport.c
23  * @brief low-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - if we do not receive an ACK in response to our
28  *   HELLO, retransmit HELLO!
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_getopt_lib.h"
34 #include "gnunet_hello_lib.h"
35 #include "gnunet_os_lib.h"
36 #include "gnunet_peerinfo_service.h"
37 #include "gnunet_plugin_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_service_lib.h"
40 #include "gnunet_signatures.h"
41 #include "plugin_transport.h"
42 #include "transport.h"
43
44 /**
45  * How many messages can we have pending for a given client process
46  * before we start to drop incoming messages?  We typically should
47  * have only one client and so this would be the primary buffer for
48  * messages, so the number should be chosen rather generously.
49  *
50  * The expectation here is that most of the time the queue is large
51  * enough so that a drop is virtually never required.
52  */
53 #define MAX_PENDING 128
54
55 /**
56  * How often should we try to reconnect to a peer using a particular
57  * transport plugin before giving up?  Note that the plugin may be
58  * added back to the list after PLUGIN_RETRY_FREQUENCY expires.
59  */
60 #define MAX_CONNECT_RETRY 3
61
62 /**
63  * How often must a peer violate bandwidth quotas before we start
64  * to simply drop its messages?
65  */
66 #define QUOTA_VIOLATION_DROP_THRESHOLD 100
67
68 /**
69  * How long until a HELLO verification attempt should time out?
70  */
71 #define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_UNIT_MINUTES
72
73 /**
74  * How often do we re-add (cheaper) plugins to our list of plugins
75  * to try for a given connected peer?
76  */
77 #define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
78
79 /**
80  * After how long do we expire an address in a HELLO
81  * that we just validated?  This value is also used
82  * for our own addresses when we create a HELLO.
83  */
84 #define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12)
85
86
87 /**
88  * Entry in linked list of network addresses.
89  */
90 struct AddressList
91 {
92   /**
93    * This is a linked list.
94    */
95   struct AddressList *next;
96
97   /**
98    * The address, actually a pointer to the end
99    * of this struct.  Do not free!
100    */
101   void *addr;
102
103   /**
104    * How long until we auto-expire this address (unless it is
105    * re-confirmed by the transport)?
106    */
107   struct GNUNET_TIME_Absolute expires;
108
109   /**
110    * Length of addr.
111    */
112   size_t addrlen;
113
114 };
115
116
117 /**
118  * Entry in linked list of all of our plugins.
119  */
120 struct TransportPlugin
121 {
122
123   /**
124    * This is a linked list.
125    */
126   struct TransportPlugin *next;
127
128   /**
129    * API of the transport as returned by the plugin's
130    * initialization function.
131    */
132   struct GNUNET_TRANSPORT_PluginFunctions *api;
133
134   /**
135    * Short name for the plugin (i.e. "tcp").
136    */
137   char *short_name;
138
139   /**
140    * Name of the library (i.e. "gnunet_plugin_transport_tcp").
141    */
142   char *lib_name;
143
144   /**
145    * List of our known addresses for this transport.
146    */
147   struct AddressList *addresses;
148
149   /**
150    * Environment this transport service is using
151    * for this plugin.
152    */
153   struct GNUNET_TRANSPORT_PluginEnvironment env;
154
155   /**
156    * ID of task that is used to clean up expired addresses.
157    */
158   GNUNET_SCHEDULER_TaskIdentifier address_update_task;
159
160
161   /**
162    * Set to GNUNET_YES if we need to scrap the existing
163    * list of "addresses" and start fresh when we receive
164    * the next address update from a transport.  Set to
165    * GNUNET_NO if we should just add the new address
166    * to the list and wait for the commit call.
167    */
168   int rebuild;
169 };
170
171 struct NeighbourList;
172
173 /**
174  * For each neighbour we keep a list of messages
175  * that we still want to transmit to the neighbour.
176  */
177 struct MessageQueue
178 {
179
180   /**
181    * This is a linked list.
182    */
183   struct MessageQueue *next;
184
185   /**
186    * The message we want to transmit.
187    */
188   struct GNUNET_MessageHeader *message;
189
190   /**
191    * Client responsible for queueing the message;
192    * used to check that a client has not two messages
193    * pending for the same target.  Can be NULL.
194    */
195   struct TransportClient *client;
196
197   /**
198    * Neighbour this entry belongs to.
199    */
200   struct NeighbourList *neighbour;
201
202   /**
203    * Plugin that we used for the transmission.
204    * NULL until we scheduled a transmission.
205    */
206   struct TransportPlugin *plugin;
207
208   /**
209    * Internal message of the transport system that should not be
210    * included in the usual SEND-SEND_OK transmission confirmation
211    * traffic management scheme.  Typically, "internal_msg" will
212    * be set whenever "client" is NULL (but it is not strictly
213    * required).
214    */
215   int internal_msg;
216
217   /**
218    * How important is the message?
219    */
220   unsigned int priority;
221   
222 };
223
224
225 /**
226  * For a given Neighbour, which plugins are available
227  * to talk to this peer and what are their costs?
228  */
229 struct ReadyList
230 {
231
232   /**
233    * This is a linked list.
234    */
235   struct ReadyList *next;
236
237   /**
238    * Which of our transport plugins does this entry
239    * represent?
240    */
241   struct TransportPlugin *plugin;
242
243   /**
244    * Neighbour this entry belongs to.
245    */
246   struct NeighbourList *neighbour;
247
248   /**
249    * Opaque handle (specific to the plugin) for the
250    * connection to our target; can be NULL.
251    */
252   void *plugin_handle;
253
254   /**
255    * What was the last latency observed for this plugin
256    * and peer?  Invalid if connected is GNUNET_NO.
257    */
258   struct GNUNET_TIME_Relative latency;
259
260   /**
261    * If we did not successfully transmit a message to the
262    * given peer via this connection during the specified
263    * time, we should consider the connection to be dead.
264    * This is used in the case that a TCP transport simply
265    * stalls writing to the stream but does not formerly
266    * get a signal that the other peer died.
267    */
268   struct GNUNET_TIME_Absolute timeout;
269
270   /**
271    * Is this plugin currently connected?  The first time
272    * we transmit or send data to a peer via a particular
273    * plugin, we set this to GNUNET_YES.  If we later get
274    * an error (disconnect notification or transmission
275    * failure), we set it back to GNUNET_NO.  Each time the
276    * value is set to GNUNET_YES, we increment the
277    * "connect_attempts" counter.  If that one reaches a
278    * particular threshold, we consider the plugin to not
279    * be working properly at this time for the given peer
280    * and remove it from the eligible list.
281    */
282   int connected;
283
284   /**
285    * How often have we tried to connect using this plugin?
286    */
287   unsigned int connect_attempts;
288
289   /**
290    * Is this plugin ready to transmit to the specific
291    * target?  GNUNET_NO if not.  Initially, all plugins
292    * are marked ready.  If a transmission is in progress,
293    * "transmit_ready" is set to GNUNET_NO.
294    */
295   int transmit_ready;
296
297 };
298
299
300 /**
301  * Entry in linked list of all of our current neighbours.
302  */
303 struct NeighbourList
304 {
305
306   /**
307    * This is a linked list.
308    */
309   struct NeighbourList *next;
310
311   /**
312    * Which of our transports is connected to this peer
313    * and what is their status?
314    */
315   struct ReadyList *plugins;
316
317   /**
318    * List of messages we would like to send to this peer;
319    * must contain at most one message per client.
320    */
321   struct MessageQueue *messages;
322
323   /**
324    * Identity of this neighbour.
325    */
326   struct GNUNET_PeerIdentity id;
327
328   /**
329    * ID of task scheduled to run when this peer is about to
330    * time out (will free resources associated with the peer).
331    */
332   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
333
334   /**
335    * How long until we should consider this peer dead
336    * (if we don't receive another message in the
337    * meantime)?
338    */
339   struct GNUNET_TIME_Absolute peer_timeout;
340
341   /**
342    * At what time did we reset last_received last?
343    */
344   struct GNUNET_TIME_Absolute last_quota_update;
345
346   /**
347    * At what time should we try to again add plugins to
348    * our ready list?
349    */
350   struct GNUNET_TIME_Absolute retry_plugins_time;
351
352   /**
353    * How many bytes have we received since the "last_quota_update"
354    * timestamp?
355    */
356   uint64_t last_received;
357
358   /**
359    * Global quota for inbound traffic for the neighbour in bytes/ms.
360    */
361   uint32_t quota_in;
362
363   /**
364    * How often has the other peer (recently) violated the
365    * inbound traffic limit?  Incremented by 10 per violation,
366    * decremented by 1 per non-violation (for each
367    * time interval).
368    */
369   unsigned int quota_violation_count;
370
371   /**
372    * Have we seen an ACK from this neighbour in the past?
373    * (used to make up a fake ACK for clients connecting after
374    * the neighbour connected to us).
375    */
376   int saw_ack;
377
378 };
379
380
381 /**
382  * Linked list of messages to be transmitted to
383  * the client.  Each entry is followed by the
384  * actual message.
385  */
386 struct ClientMessageQueueEntry
387 {
388   /**
389    * This is a linked list.
390    */
391   struct ClientMessageQueueEntry *next;
392 };
393
394
395 /**
396  * Client connected to the transport service.
397  */
398 struct TransportClient
399 {
400
401   /**
402    * This is a linked list.
403    */
404   struct TransportClient *next;
405
406   /**
407    * Handle to the client.
408    */
409   struct GNUNET_SERVER_Client *client;
410
411   /**
412    * Linked list of messages yet to be transmitted to
413    * the client.
414    */
415   struct ClientMessageQueueEntry *message_queue_head;
416
417   /**
418    * Tail of linked list of messages yet to be transmitted to the
419    * client.
420    */
421   struct ClientMessageQueueEntry *message_queue_tail;
422
423   /**
424    * Is a call to "transmit_send_continuation" pending?  If so, we
425    * must not free this struct (even if the corresponding client
426    * disconnects) and instead only remove it from the linked list and
427    * set the "client" field to NULL.
428    */
429   int tcs_pending;
430
431   /**
432    * Length of the list of messages pending for this client.
433    */
434   unsigned int message_count;
435
436 };
437
438
439 /**
440  * For each HELLO, we may have to validate multiple addresses;
441  * each address gets its own request entry.
442  */
443 struct ValidationAddress
444 {
445   /**
446    * This is a linked list.
447    */
448   struct ValidationAddress *next;
449
450   /**
451    * Name of the transport.
452    */
453   char *transport_name;
454
455   /**
456    * When should this validated address expire?
457    */
458   struct GNUNET_TIME_Absolute expiration;
459
460   /**
461    * Length of the address we are validating.
462    */
463   size_t addr_len;
464
465   /**
466    * Challenge number we used.
467    */
468   uint32_t challenge;
469
470   /**
471    * Set to GNUNET_YES if the challenge was met,
472    * GNUNET_SYSERR if we know it failed, GNUNET_NO
473    * if we are waiting on a response.
474    */
475   int ok;
476 };
477
478
479 /**
480  * Entry in linked list of all HELLOs awaiting validation.
481  */
482 struct ValidationList
483 {
484
485   /**
486    * This is a linked list.
487    */
488   struct ValidationList *next;
489
490   /**
491    * Linked list with one entry per address from the HELLO
492    * that needs to be validated.
493    */
494   struct ValidationAddress *addresses;
495
496   /**
497    * The public key of the peer.
498    */
499   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
500
501   /**
502    * When does this record time-out? (assuming the
503    * challenge goes unanswered)
504    */
505   struct GNUNET_TIME_Absolute timeout;
506
507 };
508
509
510 /**
511  * HELLOs awaiting validation.
512  */
513 static struct ValidationList *pending_validations;
514
515 /**
516  * Our HELLO message.
517  */
518 static struct GNUNET_HELLO_Message *our_hello;
519
520 /**
521  * "version" of "our_hello".  Used to see if a given
522  * neighbour has already been sent the latest version
523  * of our HELLO message.
524  */
525 static unsigned int our_hello_version;
526
527 /**
528  * Our public key.
529  */
530 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
531
532 /**
533  * Our identity.
534  */
535 static struct GNUNET_PeerIdentity my_identity;
536
537 /**
538  * Our private key.
539  */
540 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
541
542 /**
543  * Our scheduler.
544  */
545 struct GNUNET_SCHEDULER_Handle *sched;
546
547 /**
548  * Our configuration.
549  */
550 const struct GNUNET_CONFIGURATION_Handle *cfg;
551
552 /**
553  * Linked list of all clients to this service.
554  */
555 static struct TransportClient *clients;
556
557 /**
558  * All loaded plugins.
559  */
560 static struct TransportPlugin *plugins;
561
562 /**
563  * Our server.
564  */
565 static struct GNUNET_SERVER_Handle *server;
566
567 /**
568  * All known neighbours and their HELLOs.
569  */
570 static struct NeighbourList *neighbours;
571
572 /**
573  * Number of neighbours we'd like to have.
574  */
575 static uint32_t max_connect_per_transport;
576
577
578 /**
579  * Find an entry in the neighbour list for a particular peer.
580  *
581  * @return NULL if not found.
582  */
583 static struct NeighbourList *
584 find_neighbour (const struct GNUNET_PeerIdentity *key)
585 {
586   struct NeighbourList *head = neighbours;
587   while ((head != NULL) &&
588          (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
589     head = head->next;
590   return head;
591 }
592
593
594 /**
595  * Find an entry in the transport list for a particular transport.
596  *
597  * @return NULL if not found.
598  */
599 static struct TransportPlugin *
600 find_transport (const char *short_name)
601 {
602   struct TransportPlugin *head = plugins;
603   while ((head != NULL) && (0 != strcmp (short_name, head->short_name)))
604     head = head->next;
605   return head;
606 }
607
608
609 /**
610  * Update the quota values for the given neighbour now.
611  */
612 static void
613 update_quota (struct NeighbourList *n)
614 {
615   struct GNUNET_TIME_Relative delta;
616   uint64_t allowed;
617   uint64_t remaining;
618
619   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
620   if (delta.value < MIN_QUOTA_REFRESH_TIME)
621     return;                     /* not enough time passed for doing quota update */
622   allowed = delta.value * n->quota_in;
623   if (n->last_received < allowed)
624     {
625       remaining = allowed - n->last_received;
626       if (n->quota_in > 0)
627         remaining /= n->quota_in;
628       else
629         remaining = 0;
630       if (remaining > MAX_BANDWIDTH_CARRY)
631         remaining = MAX_BANDWIDTH_CARRY;
632       n->last_received = 0;
633       n->last_quota_update = GNUNET_TIME_absolute_get ();
634       n->last_quota_update.value -= remaining;
635       if (n->quota_violation_count > 0)
636         n->quota_violation_count--;
637     }
638   else
639     {
640       n->last_received -= allowed;
641       n->last_quota_update = GNUNET_TIME_absolute_get ();
642       if (n->last_received > allowed)
643         {
644           /* more than twice the allowed rate! */
645           n->quota_violation_count += 10;
646         }
647     }
648 }
649
650
651 /**
652  * Function called to notify a client about the socket
653  * being ready to queue more data.  "buf" will be
654  * NULL and "size" zero if the socket was closed for
655  * writing in the meantime.
656  *
657  * @param cls closure
658  * @param size number of bytes available in buf
659  * @param buf where the callee should write the message
660  * @return number of bytes written to buf
661  */
662 static size_t
663 transmit_to_client_callback (void *cls, size_t size, void *buf)
664 {
665   struct TransportClient *client = cls;
666   struct ClientMessageQueueEntry *q;
667   uint16_t msize;
668   size_t tsize;
669   const struct GNUNET_MessageHeader *msg;
670   struct GNUNET_NETWORK_TransmitHandle *th;
671   char *cbuf;
672
673   if (buf == NULL)
674     {
675       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676                   "Transmission to client failed, closing connection.\n");
677       /* fatal error with client, free message queue! */
678       while (NULL != (q = client->message_queue_head))
679         {
680           client->message_queue_head = q->next;
681           GNUNET_free (q);
682         }
683       client->message_queue_tail = NULL;
684       client->message_count = 0;
685       return 0;
686     }
687   cbuf = buf;
688   tsize = 0;
689   while (NULL != (q = client->message_queue_head))
690     {
691       msg = (const struct GNUNET_MessageHeader *) &q[1];
692       msize = ntohs (msg->size);
693       if (msize + tsize > size)
694         break;
695       client->message_queue_head = q->next;
696       if (q->next == NULL)
697         client->message_queue_tail = NULL;
698       memcpy (&cbuf[tsize], msg, msize);
699       tsize += msize;
700       GNUNET_free (q);
701       client->message_count--;
702     }
703   GNUNET_assert (tsize > 0);
704   if (NULL != q)
705     {
706       th = GNUNET_SERVER_notify_transmit_ready (client->client,
707                                                 msize,
708                                                 GNUNET_TIME_UNIT_FOREVER_REL,
709                                                 &transmit_to_client_callback,
710                                                 client);
711       GNUNET_assert (th != NULL);
712     }
713   return tsize;
714 }
715
716
717 /**
718  * Send the specified message to the specified client.  Since multiple
719  * messages may be pending for the same client at a time, this code
720  * makes sure that no message is lost.
721  *
722  * @param client client to transmit the message to
723  * @param msg the message to send
724  * @param may_drop can this message be dropped if the
725  *        message queue for this client is getting far too large?
726  */
727 static void
728 transmit_to_client (struct TransportClient *client,
729                     const struct GNUNET_MessageHeader *msg, int may_drop)
730 {
731   struct ClientMessageQueueEntry *q;
732   uint16_t msize;
733   struct GNUNET_NETWORK_TransmitHandle *th;
734
735   if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
736     {
737       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
738                   _
739                   ("Dropping message, have %u messages pending (%u is the soft limit)\n"),
740                   client->message_count, MAX_PENDING);
741       /* TODO: call to statistics... */
742       return;
743     }
744   client->message_count++;
745   msize = ntohs (msg->size);
746   q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
747   memcpy (&q[1], msg, msize);
748   /* append to message queue */
749   if (client->message_queue_tail == NULL)
750     {
751       client->message_queue_tail = q;
752     }
753   else
754     {
755       client->message_queue_tail->next = q;
756       client->message_queue_tail = q;
757     }
758   if (client->message_queue_head == NULL)
759     {
760       client->message_queue_head = q;
761       th = GNUNET_SERVER_notify_transmit_ready (client->client,
762                                                 msize,
763                                                 GNUNET_TIME_UNIT_FOREVER_REL,
764                                                 &transmit_to_client_callback,
765                                                 client);
766       GNUNET_assert (th != NULL);
767     }
768 }
769
770
771 /**
772  * Find alternative plugins for communication.
773  *
774  * @param neighbour for which neighbour should we try to find
775  *        more plugins?
776  */
777 static void
778 try_alternative_plugins (struct NeighbourList *neighbour)
779 {
780   struct ReadyList *rl;
781
782   if ((neighbour->plugins != NULL) &&
783       (neighbour->retry_plugins_time.value >
784        GNUNET_TIME_absolute_get ().value))
785     return;                     /* don't try right now */
786   neighbour->retry_plugins_time
787     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
788
789   rl = neighbour->plugins;
790   while (rl != NULL)
791     {
792       if (rl->connect_attempts > 0)
793         rl->connect_attempts--; /* amnesty */
794       rl = rl->next;
795     }
796
797 }
798
799
800 /**
801  * The peer specified by the given neighbour has timed-out or a plugin
802  * has disconnected.  We may either need to do nothing (other plugins
803  * still up), or trigger a full disconnect and clean up.  This
804  * function updates our state and do the necessary notifications.
805  * Also notifies our clients that the neighbour is now officially
806  * gone.
807  *
808  * @param n the neighbour list entry for the peer
809  * @param check should we just check if all plugins
810  *        disconnected or must we ask all plugins to
811  *        disconnect?
812  */
813 static void
814 disconnect_neighbour (struct NeighbourList *n,
815                       int check);
816
817
818 /**
819  * Check the ready list for the given neighbour and
820  * if a plugin is ready for transmission (and if we
821  * have a message), do so!
822  *
823  * @param neighbour target peer for which to check the plugins
824  */
825 static void 
826 try_transmission_to_peer (struct NeighbourList *neighbour);
827
828
829 /**
830  * Function called by the GNUNET_TRANSPORT_TransmitFunction
831  * upon "completion" of a send request.  This tells the API
832  * that it is now legal to send another message to the given
833  * peer.
834  *
835  * @param cls closure, identifies the entry on the
836  *            message queue that was transmitted and the
837  *            client responsible for queueing the message
838  * @param rl identifies plugin used for the transmission for
839  *           this neighbour; needs to be re-enabled for
840  *           future transmissions
841  * @param target the peer receiving the message
842  * @param result GNUNET_OK on success, if the transmission
843  *           failed, we should not tell the client to transmit
844  *           more messages
845  */
846 static void
847 transmit_send_continuation (void *cls,
848                             struct ReadyList *rl,
849                             const struct GNUNET_PeerIdentity *target,
850                             int result)
851 {
852   struct MessageQueue *mq = cls;
853   struct SendOkMessage send_ok_msg;
854   struct NeighbourList *n;
855
856   GNUNET_assert (mq != NULL);
857   n = mq->neighbour;
858   GNUNET_assert (n != NULL);
859   GNUNET_assert (0 ==
860                  memcmp (&n->id, target,
861                          sizeof (struct GNUNET_PeerIdentity)));
862   if (rl == NULL)
863     {
864       rl = n->plugins;
865       while ((rl != NULL) && (rl->plugin != mq->plugin))
866         rl = rl->next;
867       GNUNET_assert (rl != NULL);
868     }
869   if (result == GNUNET_OK)
870     {
871       rl->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
872     }
873   else
874     {
875       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
876                   "Transmission to peer `%s' failed, marking connection as down.\n",
877                   GNUNET_i2s(target));
878       rl->connected = GNUNET_NO;
879       rl->plugin_handle = NULL;
880     }
881   if (!mq->internal_msg)
882     rl->transmit_ready = GNUNET_YES;
883   if (mq->client != NULL)
884     {
885       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886                   "Notifying client %p about failed transission to peer `%4s'.\n",
887                   mq->client,
888                   GNUNET_i2s(target));
889       send_ok_msg.header.size = htons (sizeof (send_ok_msg));
890       send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
891       send_ok_msg.success = htonl (result);
892       send_ok_msg.peer = n->id;
893       transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
894     }
895   GNUNET_free (mq->message);
896   GNUNET_free (mq);
897   /* one plugin just became ready again, try transmitting
898      another message (if available) */
899   if (result == GNUNET_OK)
900     try_transmission_to_peer (n);
901   else    
902     disconnect_neighbour (n, GNUNET_YES); 
903 }
904
905
906 /**
907  * Check the ready list for the given neighbour and
908  * if a plugin is ready for transmission (and if we
909  * have a message), do so!
910  */
911 static void
912 try_transmission_to_peer (struct NeighbourList *neighbour)
913 {
914   struct ReadyList *pos;
915   struct GNUNET_TIME_Relative min_latency;
916   struct ReadyList *rl;
917   struct MessageQueue *mq;
918   struct GNUNET_TIME_Absolute now;
919
920   if (neighbour->messages == NULL)
921     return;                     /* nothing to do */
922   try_alternative_plugins (neighbour);
923   min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
924   rl = NULL;
925   mq = neighbour->messages;
926   now = GNUNET_TIME_absolute_get ();
927   pos = neighbour->plugins;
928   while (pos != NULL)
929     {
930       /* set plugins that are inactive for a long time back to disconnected */
931       if ((pos->timeout.value < now.value) && (pos->connected == GNUNET_YES))
932         {
933 #if DEBUG_TRANSPORT
934           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935                       "Marking long-time inactive connection to `%4s' as down.\n",
936                       GNUNET_i2s (&neighbour->id));
937 #endif
938           pos->connected = GNUNET_NO;
939         }
940       if (((GNUNET_YES == pos->transmit_ready) ||
941            (mq->internal_msg)) &&
942           (pos->connect_attempts < MAX_CONNECT_RETRY) &&
943           ((rl == NULL) || (min_latency.value > pos->latency.value)))
944         {
945           rl = pos;
946           min_latency = pos->latency;
947         }
948       pos = pos->next;
949     }
950   if (rl == NULL)
951     {
952 #if DEBUG_TRANSPORT
953       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
954                   "No plugin ready to transmit message\n");
955 #endif
956       return;                   /* nobody ready */
957     }
958   if (GNUNET_NO == rl->connected)
959     {
960       rl->connect_attempts++;
961       rl->connected = GNUNET_YES;
962 #if DEBUG_TRANSPORT
963   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
964               "Establishing fresh connection with `%4s' via plugin `%s'\n",
965               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
966 #endif
967     }
968   neighbour->messages = mq->next;
969   mq->plugin = rl->plugin;
970   if (!mq->internal_msg)
971     rl->transmit_ready = GNUNET_NO;
972 #if DEBUG_TRANSPORT
973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974               "Giving message of type `%u' for `%4s' to plugin `%s'\n",
975               ntohs (mq->message->type),
976               GNUNET_i2s (&neighbour->id), rl->plugin->short_name);
977 #endif
978   rl->plugin_handle
979     = rl->plugin->api->send (rl->plugin->api->cls,
980                              rl->plugin_handle,
981                              rl,
982                              &neighbour->id,
983                              mq->priority,
984                              mq->message,
985                              GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
986                              &transmit_send_continuation, mq);
987 }
988
989
990 /**
991  * Send the specified message to the specified peer.
992  *
993  * @param client source of the transmission request (can be NULL)
994  * @param priority how important is the message
995  * @param msg message to send
996  * @param is_internal is this an internal message
997  * @param neighbour handle to the neighbour for transmission
998  */
999 static void
1000 transmit_to_peer (struct TransportClient *client,
1001                   unsigned int priority,
1002                   const struct GNUNET_MessageHeader *msg,
1003                   int is_internal, struct NeighbourList *neighbour)
1004 {
1005   struct MessageQueue *mq;
1006   struct MessageQueue *mqe;
1007   struct GNUNET_MessageHeader *m;
1008
1009 #if DEBUG_TRANSPORT
1010   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1011               _("Sending message of type %u to peer `%4s'\n"),
1012               ntohs (msg->type), GNUNET_i2s (&neighbour->id));
1013 #endif
1014   if (client != NULL)
1015     {
1016       /* check for duplicate submission */
1017       mq = neighbour->messages;
1018       while (NULL != mq)
1019         {
1020           if (mq->client == client)
1021             {
1022               /* client transmitted to same peer twice
1023                  before getting SendOk! */
1024               GNUNET_break (0);
1025               return;
1026             }
1027           mq = mq->next;
1028         }
1029     }
1030   mq = GNUNET_malloc (sizeof (struct MessageQueue));
1031   mq->client = client;
1032   m = GNUNET_malloc (ntohs (msg->size));
1033   memcpy (m, msg, ntohs (msg->size));
1034   mq->message = m;
1035   mq->neighbour = neighbour;
1036   mq->internal_msg = is_internal;
1037   mq->priority = priority;
1038
1039   /* find tail */
1040   mqe = neighbour->messages;
1041   if (mqe != NULL)
1042     while (mqe->next != NULL)
1043       mqe = mqe->next;
1044   if (mqe == NULL)
1045     {
1046       /* new head */
1047       neighbour->messages = mq;
1048       try_transmission_to_peer (neighbour);
1049     }
1050   else
1051     {
1052       /* append */
1053       mqe->next = mq;
1054     }
1055 }
1056
1057
1058 /**
1059  * FIXME: document.
1060  */
1061 struct GeneratorContext
1062 {
1063   struct TransportPlugin *plug_pos;
1064   struct AddressList *addr_pos;
1065   struct GNUNET_TIME_Absolute expiration;
1066 };
1067
1068
1069 /**
1070  * FIXME: document.
1071  */
1072 static size_t
1073 address_generator (void *cls, size_t max, void *buf)
1074 {
1075   struct GeneratorContext *gc = cls;
1076   size_t ret;
1077
1078   while ((gc->addr_pos == NULL) && (gc->plug_pos != NULL))
1079     {
1080       gc->plug_pos = gc->plug_pos->next;
1081       gc->addr_pos = (gc->plug_pos != NULL) ? gc->plug_pos->addresses : NULL;
1082     }
1083   if (NULL == gc->plug_pos)
1084     return 0;
1085   ret = GNUNET_HELLO_add_address (gc->plug_pos->short_name,
1086                                   gc->expiration,
1087                                   gc->addr_pos->addr,
1088                                   gc->addr_pos->addrlen, buf, max);
1089   gc->addr_pos = gc->addr_pos->next;
1090   return ret;
1091 }
1092
1093
1094 /**
1095  * Construct our HELLO message from all of the addresses of
1096  * all of the transports.
1097  */
1098 static void
1099 refresh_hello ()
1100 {
1101   struct GNUNET_HELLO_Message *hello;
1102   struct TransportClient *cpos;
1103   struct NeighbourList *npos;
1104   struct GeneratorContext gc;
1105
1106 #if DEBUG_TRANSPORT
1107   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1108               "Refreshing my `%s'\n",
1109               "HELLO");
1110 #endif
1111   gc.plug_pos = plugins;
1112   gc.addr_pos = plugins != NULL ? plugins->addresses : NULL;
1113   gc.expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1114   hello = GNUNET_HELLO_create (&my_public_key, &address_generator, &gc);
1115   cpos = clients;
1116   while (cpos != NULL)
1117     {
1118       transmit_to_client (cpos,
1119                           (const struct GNUNET_MessageHeader *) hello,
1120                           GNUNET_NO);
1121       cpos = cpos->next;
1122     }
1123
1124   GNUNET_free_non_null (our_hello);
1125   our_hello = hello;
1126   our_hello_version++;
1127   npos = neighbours;
1128   while (npos != NULL)
1129     {
1130 #if DEBUG_TRANSPORT
1131       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1132                   "Transmitting updated `%s' to neighbour `%4s'\n",
1133                   "HELLO",
1134                   GNUNET_i2s(&npos->id));
1135 #endif
1136       transmit_to_peer (NULL, 0,
1137                         (const struct GNUNET_MessageHeader *) our_hello,
1138                         GNUNET_YES, npos);
1139       npos = npos->next;
1140     }
1141 }
1142
1143
1144 /**
1145  * Task used to clean up expired addresses for a plugin.
1146  *
1147  * @param cls closure
1148  * @param tc context
1149  */
1150 static void
1151 expire_address_task (void *cls,
1152                      const struct GNUNET_SCHEDULER_TaskContext *tc);
1153
1154
1155 /**
1156  * Update the list of addresses for this plugin,
1157  * expiring those that are past their expiration date.
1158  *
1159  * @param plugin addresses of which plugin should be recomputed?
1160  * @param fresh set to GNUNET_YES if a new address was added
1161  *        and we need to regenerate the HELLO even if nobody
1162  *        expired
1163  */
1164 static void
1165 update_addresses (struct TransportPlugin *plugin, int fresh)
1166 {
1167   struct GNUNET_TIME_Relative min_remaining;
1168   struct GNUNET_TIME_Relative remaining;
1169   struct GNUNET_TIME_Absolute now;
1170   struct AddressList *pos;
1171   struct AddressList *prev;
1172   struct AddressList *next;
1173   int expired;
1174
1175   if (plugin->address_update_task != GNUNET_SCHEDULER_NO_TASK)
1176     GNUNET_SCHEDULER_cancel (plugin->env.sched, plugin->address_update_task);
1177   plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK;
1178   now = GNUNET_TIME_absolute_get ();
1179   min_remaining = GNUNET_TIME_UNIT_FOREVER_REL;
1180   expired = GNUNET_NO;
1181   prev = NULL;
1182   pos = plugin->addresses;
1183   while (pos != NULL)
1184     {
1185       next = pos->next;
1186       if (pos->expires.value < now.value)
1187         {
1188           expired = GNUNET_YES;
1189           if (prev == NULL)
1190             plugin->addresses = pos->next;
1191           else
1192             prev->next = pos->next;
1193           GNUNET_free (pos);
1194         }
1195       else
1196         {
1197           remaining = GNUNET_TIME_absolute_get_remaining (pos->expires);
1198           if (remaining.value < min_remaining.value)
1199             min_remaining = remaining;
1200           prev = pos;
1201         }
1202       pos = next;
1203     }
1204
1205   if (expired || fresh)
1206     refresh_hello ();
1207   if (min_remaining.value < GNUNET_TIME_UNIT_FOREVER_REL.value)
1208     plugin->address_update_task
1209       = GNUNET_SCHEDULER_add_delayed (plugin->env.sched,
1210                                       GNUNET_NO,
1211                                       GNUNET_SCHEDULER_PRIORITY_IDLE,
1212                                       GNUNET_SCHEDULER_NO_TASK,
1213                                       min_remaining,
1214                                       &expire_address_task, plugin);
1215
1216 }
1217
1218
1219 /**
1220  * Task used to clean up expired addresses for a plugin.
1221  *
1222  * @param cls closure
1223  * @param tc context
1224  */
1225 static void
1226 expire_address_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1227 {
1228   struct TransportPlugin *plugin = cls;
1229   plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK;
1230   update_addresses (plugin, GNUNET_NO);
1231 }
1232
1233
1234 /**
1235  * Function that must be called by each plugin to notify the
1236  * transport service about the addresses under which the transport
1237  * provided by the plugin can be reached.
1238  *
1239  * @param cls closure
1240  * @param name name of the transport that generated the address
1241  * @param addr one of the addresses of the host, NULL for the last address
1242  *        the specific address format depends on the transport
1243  * @param addrlen length of the address
1244  * @param expires when should this address automatically expire?
1245  */
1246 static void
1247 plugin_env_notify_address (void *cls,
1248                            const char *name,
1249                            const void *addr,
1250                            size_t addrlen,
1251                            struct GNUNET_TIME_Relative expires)
1252 {
1253   struct TransportPlugin *p = cls;
1254   struct AddressList *al;
1255   struct GNUNET_TIME_Absolute abex;
1256
1257   abex = GNUNET_TIME_relative_to_absolute (expires);
1258   GNUNET_assert (p == find_transport (name));
1259
1260   al = p->addresses;
1261   while (al != NULL)
1262     {
1263       if ((addrlen == al->addrlen) && (0 == memcmp (addr, &al[1], addrlen)))
1264         {
1265           if (al->expires.value < abex.value)
1266             al->expires = abex;
1267           return;
1268         }
1269       al = al->next;
1270     }
1271 #if DEBUG_TRANSPORT
1272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273               "Plugin `%s' informs us about a new address `%s'\n", name,
1274               GNUNET_a2s(addr, addrlen));
1275 #endif
1276   al = GNUNET_malloc (sizeof (struct AddressList) + addrlen);
1277   al->addr = &al[1];
1278   al->next = p->addresses;
1279   p->addresses = al;
1280   al->expires = abex;
1281   al->addrlen = addrlen;
1282   memcpy (&al[1], addr, addrlen);
1283   update_addresses (p, GNUNET_YES);
1284 }
1285
1286
1287 /**
1288  * FIXME: document.
1289  */
1290 struct LookupHelloContext
1291 {
1292   GNUNET_TRANSPORT_AddressCallback iterator;
1293
1294   void *iterator_cls;
1295 };
1296
1297
1298 /**
1299  * FIXME: document.
1300  */
1301 static int
1302 lookup_address_callback (void *cls,
1303                          const char *tname,
1304                          struct GNUNET_TIME_Absolute expiration,
1305                          const void *addr, size_t addrlen)
1306 {
1307   struct LookupHelloContext *lhc = cls;
1308   lhc->iterator (lhc->iterator_cls, tname, addr, addrlen);
1309   return GNUNET_OK;
1310 }
1311
1312
1313 /**
1314  * FIXME: document.
1315  */
1316 static void
1317 lookup_hello_callback (void *cls,
1318                        const struct GNUNET_PeerIdentity *peer,
1319                        const struct GNUNET_HELLO_Message *h, uint32_t trust)
1320 {
1321   struct LookupHelloContext *lhc = cls;
1322
1323   if (peer == NULL)
1324     {
1325       lhc->iterator (lhc->iterator_cls, NULL, NULL, 0);
1326       GNUNET_free (lhc);
1327       return;
1328     }
1329   if (h == NULL)
1330     return;
1331   GNUNET_HELLO_iterate_addresses (h,
1332                                   GNUNET_NO, &lookup_address_callback, lhc);
1333 }
1334
1335
1336 /**
1337  * Function that allows a transport to query the known
1338  * network addresses for a given peer.
1339  *
1340  * @param cls closure
1341  * @param timeout after how long should we time out?
1342  * @param target which peer are we looking for?
1343  * @param iter function to call for each known address
1344  * @param iter_cls closure for iter
1345  */
1346 static void
1347 plugin_env_lookup_address (void *cls,
1348                            struct GNUNET_TIME_Relative timeout,
1349                            const struct GNUNET_PeerIdentity *target,
1350                            GNUNET_TRANSPORT_AddressCallback iter,
1351                            void *iter_cls)
1352 {
1353   struct LookupHelloContext *lhc;
1354
1355   lhc = GNUNET_malloc (sizeof (struct LookupHelloContext));
1356   lhc->iterator = iter;
1357   lhc->iterator_cls = iter_cls;
1358   GNUNET_PEERINFO_for_all (cfg,
1359                            sched,
1360                            target, 0, timeout, &lookup_hello_callback, &lhc);
1361 }
1362
1363
1364 /**
1365  * Notify all of our clients about a peer connecting.
1366  */
1367 static void
1368 notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
1369                         struct GNUNET_TIME_Relative latency)
1370 {
1371   struct ConnectInfoMessage cim;
1372   struct TransportClient *cpos;
1373
1374 #if DEBUG_TRANSPORT
1375   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1376               "Informing clients about peer `%4s' connecting to us\n",
1377               GNUNET_i2s (peer));
1378 #endif
1379   cim.header.size = htons (sizeof (struct ConnectInfoMessage));
1380   cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1381   cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60*1000));
1382   cim.latency = GNUNET_TIME_relative_hton (latency);
1383   memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
1384   cpos = clients;
1385   while (cpos != NULL)
1386     {
1387       transmit_to_client (cpos, &cim.header, GNUNET_NO);
1388       cpos = cpos->next;
1389     }
1390 }
1391
1392
1393 /**
1394  * Notify all of our clients about a peer disconnecting.
1395  */
1396 static void
1397 notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
1398 {
1399   struct DisconnectInfoMessage dim;
1400   struct TransportClient *cpos;
1401
1402 #if DEBUG_TRANSPORT
1403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1404               "Informing clients about peer `%4s' disconnecting\n",
1405               GNUNET_i2s (peer));
1406 #endif
1407   dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
1408   dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1409   dim.reserved = htonl (0);
1410   memcpy (&dim.peer, peer, sizeof (struct GNUNET_PeerIdentity));
1411   cpos = clients;
1412   while (cpos != NULL)
1413     {
1414       transmit_to_client (cpos, &dim.header, GNUNET_NO);
1415       cpos = cpos->next;
1416     }
1417 }
1418
1419
1420 /**
1421  * Copy any validated addresses to buf.
1422  *
1423  * @return 0 once all addresses have been
1424  *         returned
1425  */
1426 static size_t
1427 list_validated_addresses (void *cls, size_t max, void *buf)
1428 {
1429   struct ValidationAddress **va = cls;
1430   size_t ret;
1431
1432   while ((NULL != *va) && ((*va)->ok != GNUNET_YES))
1433     *va = (*va)->next;
1434   if (NULL == *va)
1435     return 0;
1436   ret = GNUNET_HELLO_add_address ((*va)->transport_name,
1437                                   (*va)->expiration,
1438                                   &(*va)[1], (*va)->addr_len, buf, max);
1439   *va = (*va)->next;
1440   return ret;
1441 }
1442
1443
1444 /**
1445  * HELLO validation cleanup task.
1446  */
1447 static void
1448 cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1449 {
1450   struct ValidationAddress *va;
1451   struct ValidationList *pos;
1452   struct ValidationList *prev;
1453   struct GNUNET_TIME_Absolute now;
1454   struct GNUNET_HELLO_Message *hello;
1455   struct GNUNET_PeerIdentity pid;
1456   struct NeighbourList *n;
1457
1458 #if DEBUG_TRANSPORT
1459   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1460               "HELLO validation cleanup background task running...\n");
1461 #endif
1462   now = GNUNET_TIME_absolute_get ();
1463   prev = NULL;
1464   pos = pending_validations;
1465   while (pos != NULL)
1466     {
1467       if (pos->timeout.value < now.value)
1468         {
1469           if (prev == NULL)
1470             pending_validations = pos->next;
1471           else
1472             prev->next = pos->next;
1473           va = pos->addresses;
1474           hello = GNUNET_HELLO_create (&pos->publicKey,
1475                                        &list_validated_addresses, &va);
1476           GNUNET_CRYPTO_hash (&pos->publicKey,
1477                               sizeof (struct
1478                                       GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1479                               &pid.hashPubKey);
1480 #if DEBUG_TRANSPORT
1481           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1482                       "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n",
1483                       "HELLO", GNUNET_i2s (&pid));
1484 #endif
1485           GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello);
1486           n = find_neighbour (&pid);
1487           if (NULL != n)
1488             try_transmission_to_peer (n);           
1489           GNUNET_free (hello);
1490           while (NULL != (va = pos->addresses))
1491             {
1492               pos->addresses = va->next;
1493               GNUNET_free (va->transport_name);
1494               GNUNET_free (va);
1495             }
1496           GNUNET_free (pos);
1497           if (prev == NULL)
1498             pos = pending_validations;
1499           else
1500             pos = prev->next;
1501           continue;
1502         }
1503       prev = pos;
1504       pos = pos->next;
1505     }
1506
1507   /* finally, reschedule cleanup if needed; list is
1508      ordered by timeout, so we need the last element... */
1509   pos = pending_validations;
1510   while ((pos != NULL) && (pos->next != NULL))
1511     pos = pos->next;
1512   if (NULL != pos)
1513     GNUNET_SCHEDULER_add_delayed (sched,
1514                                   GNUNET_NO,
1515                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
1516                                   GNUNET_SCHEDULER_NO_TASK,
1517                                   GNUNET_TIME_absolute_get_remaining
1518                                   (pos->timeout), &cleanup_validation, NULL);
1519 }
1520
1521
1522
1523
1524 /**
1525  * Function that will be called if we receive a validation
1526  * of an address challenge that we transmitted to another
1527  * peer.  Note that the validation should only be considered
1528  * acceptable if the challenge matches AND if the sender
1529  * address is at least a plausible address for this peer
1530  * (otherwise we may be seeing a MiM attack).
1531  *
1532  * @param cls closure
1533  * @param name name of the transport that generated the address
1534  * @param peer who responded to our challenge
1535  * @param challenge the challenge number we presumably used
1536  * @param sender_addr string describing our sender address (as observed
1537  *         by the other peer in human-readable format)
1538  */
1539 static void
1540 plugin_env_notify_validation (void *cls,
1541                               const char *name,
1542                               const struct GNUNET_PeerIdentity *peer,
1543                               uint32_t challenge,
1544                               const char *sender_addr)
1545 {
1546   int all_done;
1547   int matched;
1548   struct ValidationList *pos;
1549   struct ValidationAddress *va;
1550   struct GNUNET_PeerIdentity id;
1551
1552   pos = pending_validations;
1553   while (pos != NULL)
1554     {
1555       GNUNET_CRYPTO_hash (&pos->publicKey,
1556                           sizeof (struct
1557                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1558                           &id.hashPubKey);
1559       if (0 ==
1560           memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity)))
1561         break;
1562       pos = pos->next;
1563     }
1564   if (pos == NULL)
1565     {
1566       /* TODO: call statistics (unmatched PONG) */
1567       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1568                   _
1569                   ("Received validation response but have no record of any validation request for `%4s'. Ignoring.\n"),
1570                   GNUNET_i2s(peer));
1571       return;
1572     }
1573   all_done = GNUNET_YES;
1574   matched = GNUNET_NO;
1575   va = pos->addresses;
1576   while (va != NULL)
1577     {
1578       if (va->challenge == challenge)
1579         {
1580 #if DEBUG_TRANSPORT
1581           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1582                       "Confirmed validity of peer address.\n");
1583 #endif
1584           GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
1585                       _("Another peer saw us using the address `%s' via `%s'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"),
1586                       sender_addr, 
1587                       name);
1588           va->ok = GNUNET_YES;
1589           va->expiration =
1590             GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
1591           matched = GNUNET_YES;
1592         }        
1593       if (va->ok != GNUNET_YES)
1594         all_done = GNUNET_NO;
1595       va = va->next;
1596     }
1597   if (GNUNET_NO == matched)
1598     {
1599       /* TODO: call statistics (unmatched PONG) */
1600       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1601                   _
1602                   ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"),
1603                   "PONG", "PING");
1604     }
1605   if (GNUNET_YES == all_done)
1606     {
1607       pos->timeout.value = 0;
1608       GNUNET_SCHEDULER_add_delayed (sched,
1609                                     GNUNET_NO,
1610                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
1611                                     GNUNET_SCHEDULER_NO_TASK,
1612                                     GNUNET_TIME_UNIT_ZERO,
1613                                     &cleanup_validation, NULL);
1614     }
1615 }
1616
1617
1618 struct CheckHelloValidatedContext
1619 {
1620   /**
1621    * Plugin for which we are validating.
1622    */
1623   struct TransportPlugin *plugin;
1624
1625   /**
1626    * Hello that we are validating.
1627    */
1628   struct GNUNET_HELLO_Message *hello;
1629
1630   /**
1631    * Validation list being build.
1632    */
1633   struct ValidationList *e;
1634
1635 };
1636
1637
1638 /**
1639  * Append the given address to the list of entries
1640  * that need to be validated.
1641  */
1642 static int
1643 run_validation (void *cls,
1644                 const char *tname,
1645                 struct GNUNET_TIME_Absolute expiration,
1646                 const void *addr, size_t addrlen)
1647 {
1648   struct ValidationList *e = cls;
1649   struct TransportPlugin *tp;
1650   struct ValidationAddress *va;
1651   struct GNUNET_PeerIdentity id;
1652
1653   tp = find_transport (tname);
1654   if (tp == NULL)
1655     {
1656       GNUNET_log (GNUNET_ERROR_TYPE_INFO |
1657                   GNUNET_ERROR_TYPE_BULK,
1658                   _
1659                   ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
1660                   tname);
1661       return GNUNET_OK;
1662     }
1663   GNUNET_CRYPTO_hash (&e->publicKey,
1664                       sizeof (struct
1665                               GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1666                       &id.hashPubKey);
1667   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1668               "Scheduling validation of address `%s' via `%s' for `%4s'\n",
1669               GNUNET_a2s(addr, addrlen),
1670               tname,
1671               GNUNET_i2s(&id));
1672
1673   va = GNUNET_malloc (sizeof (struct ValidationAddress) + addrlen);
1674   va->next = e->addresses;
1675   e->addresses = va;
1676   va->transport_name = GNUNET_strdup (tname);
1677   va->addr_len = addrlen;
1678   va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1679                                             (unsigned int) -1);
1680   memcpy (&va[1], addr, addrlen);
1681   return GNUNET_OK;
1682 }
1683
1684
1685 /**
1686  * Check if addresses in validated hello "h" overlap with
1687  * those in "chvc->hello" and update "chvc->hello" accordingly,
1688  * removing those addresses that have already been validated.
1689  */
1690 static void
1691 check_hello_validated (void *cls,
1692                        const struct GNUNET_PeerIdentity *peer,
1693                        const struct GNUNET_HELLO_Message *h, 
1694                        uint32_t trust)
1695 {
1696   struct CheckHelloValidatedContext *chvc = cls;
1697   struct ValidationAddress *va;
1698   struct TransportPlugin *tp;
1699   int first_call;
1700   struct GNUNET_PeerIdentity apeer;
1701
1702   first_call = GNUNET_NO;
1703   if (chvc->e == NULL)
1704     {
1705       first_call = GNUNET_YES;
1706       chvc->e = GNUNET_malloc (sizeof (struct ValidationList));
1707       GNUNET_assert (GNUNET_OK ==
1708                      GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello,
1709                                            &chvc->e->publicKey));
1710       chvc->e->timeout =
1711         GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT);
1712       chvc->e->next = pending_validations;
1713       pending_validations = chvc->e;
1714     }
1715   if (h != NULL)
1716     {
1717       GNUNET_HELLO_iterate_new_addresses (chvc->hello,
1718                                           h,
1719                                           GNUNET_TIME_absolute_get (),
1720                                           &run_validation, chvc->e);
1721     }
1722   else if (GNUNET_YES == first_call)
1723     {
1724       /* no existing HELLO, all addresses are new */
1725       GNUNET_HELLO_iterate_addresses (chvc->hello,
1726                                       GNUNET_NO, &run_validation, chvc->e);
1727     }
1728   if (h != NULL)
1729     return;                     /* wait for next call */
1730   /* finally, transmit validation attempts */
1731   GNUNET_assert (GNUNET_OK ==
1732                  GNUNET_HELLO_get_id (chvc->hello,
1733                                       &apeer));
1734   va = chvc->e->addresses;
1735   while (va != NULL)
1736     {
1737 #if DEBUG_TRANSPORT
1738       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1739                   "Establishing `%s' connection to validate `%s' of `%4s'\n",
1740                   va->transport_name,
1741                   "HELLO",
1742                   GNUNET_i2s (&apeer));
1743 #endif
1744       tp = find_transport (va->transport_name);
1745       GNUNET_assert (tp != NULL);
1746       if (GNUNET_OK !=
1747           tp->api->validate (tp->api->cls,
1748                              &apeer,
1749                              va->challenge,
1750                              HELLO_VERIFICATION_TIMEOUT,
1751                              &va[1],
1752                              va->addr_len))
1753         va->ok = GNUNET_SYSERR;
1754       va = va->next;
1755     }
1756   if (chvc->e->next == NULL)
1757     GNUNET_SCHEDULER_add_delayed (sched,
1758                                   GNUNET_NO,
1759                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
1760                                   GNUNET_SCHEDULER_NO_TASK,
1761                                   GNUNET_TIME_absolute_get_remaining
1762                                   (chvc->e->timeout), &cleanup_validation,
1763                                   NULL);
1764   GNUNET_free (chvc);
1765 }
1766
1767
1768 /**
1769  * Process HELLO-message.
1770  *
1771  * @param plugin transport involved, may be NULL
1772  * @param message the actual message
1773  * @return GNUNET_OK if the HELLO was well-formed, GNUNET_SYSERR otherwise
1774  */
1775 static int
1776 process_hello (struct TransportPlugin *plugin,
1777                const struct GNUNET_MessageHeader *message)
1778 {
1779   struct ValidationList *e;
1780   uint16_t hsize;
1781   struct GNUNET_PeerIdentity target;
1782   const struct GNUNET_HELLO_Message *hello;
1783   struct CheckHelloValidatedContext *chvc;
1784   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
1785
1786   hsize = ntohs (message->size);
1787   if ((ntohs (message->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1788       (hsize < sizeof (struct GNUNET_MessageHeader)))
1789     {
1790       GNUNET_break (0);
1791       return GNUNET_SYSERR;
1792     }
1793   /* first, check if load is too high */
1794   if (GNUNET_OS_load_cpu_get (cfg) > 100)
1795     {
1796       /* TODO: call to stats? */
1797       return GNUNET_OK;
1798     }
1799   hello = (const struct GNUNET_HELLO_Message *) message;
1800   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, &publicKey))
1801     {
1802       GNUNET_break_op (0);
1803       return GNUNET_SYSERR;
1804     }
1805   GNUNET_CRYPTO_hash (&publicKey,
1806                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1807                       &target.hashPubKey);
1808 #if DEBUG_TRANSPORT
1809   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1810               "Processing `%s' message for `%4s'\n",
1811               "HELLO", GNUNET_i2s (&target));
1812 #endif
1813   /* check if a HELLO for this peer is already on the validation list */
1814   e = pending_validations;
1815   while (e != NULL)
1816     {
1817       if (0 == memcmp (&e->publicKey,
1818                        &publicKey,
1819                        sizeof (struct
1820                                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)))
1821         {
1822           /* TODO: call to stats? */
1823 #if DEBUG_TRANSPORT
1824           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1825                       "`%s' message for peer `%4s' is already pending; ignoring new message\n",
1826                       "HELLO", GNUNET_i2s (&target));
1827 #endif    
1828           return GNUNET_OK;
1829         }
1830       e = e->next;
1831     }
1832   chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize);
1833   chvc->plugin = plugin;
1834   chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1];
1835   memcpy (chvc->hello, hello, hsize);
1836   /* finally, check if HELLO was previously validated
1837      (continuation will then schedule actual validation) */
1838   GNUNET_PEERINFO_for_all (cfg,
1839                            sched,
1840                            &target,
1841                            0,
1842                            HELLO_VERIFICATION_TIMEOUT,
1843                            &check_hello_validated, chvc);
1844   return GNUNET_OK;
1845 }
1846
1847
1848 /**
1849  * The peer specified by the given neighbour has timed-out or a plugin
1850  * has disconnected.  We may either need to do nothing (other plugins
1851  * still up), or trigger a full disconnect and clean up.  This
1852  * function updates our state and do the necessary notifications.
1853  * Also notifies our clients that the neighbour is now officially
1854  * gone.
1855  *
1856  * @param n the neighbour list entry for the peer
1857  * @param check should we just check if all plugins
1858  *        disconnected or must we ask all plugins to
1859  *        disconnect?
1860  */
1861 static void
1862 disconnect_neighbour (struct NeighbourList *n,
1863                       int check)
1864 {
1865   struct ReadyList *rpos;
1866   struct NeighbourList *npos;
1867   struct NeighbourList *nprev;
1868   struct MessageQueue *mq;
1869   
1870   if (GNUNET_YES == check)
1871     {
1872       rpos = n->plugins;
1873       while (NULL != rpos)
1874         {
1875           if (GNUNET_YES == rpos->connected)
1876             return; /* still connected */
1877           rpos = rpos->next;
1878         }
1879     }
1880
1881 #if DEBUG_TRANSPORT
1882   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1883               "Disconnecting from `%4s'\n",
1884               GNUNET_i2s(&n->id));
1885 #endif
1886   /* remove n from neighbours list */
1887   nprev = NULL;
1888   npos = neighbours;
1889   while ((npos != NULL) && (npos != n))
1890     {
1891       nprev = npos;
1892       npos = npos->next;
1893     }
1894   GNUNET_assert (npos != NULL);
1895   if (nprev == NULL)
1896     neighbours = n->next;
1897   else
1898     nprev->next = n->next;
1899
1900   /* notify all clients about disconnect */
1901   notify_clients_disconnect (&n->id);
1902
1903   /* clean up all plugins, cancel connections and pending transmissions */
1904   while (NULL != (rpos = n->plugins))
1905     {
1906       n->plugins = rpos->next;
1907       GNUNET_assert (rpos->neighbour == n);
1908       if (GNUNET_YES == rpos->connected)
1909         rpos->plugin->api->cancel (rpos->plugin->api->cls,
1910                                    rpos->plugin_handle,
1911                                    rpos,
1912                                    &n->id);
1913       GNUNET_free (rpos);
1914     }
1915
1916   /* free all messages on the queue */
1917   while (NULL != (mq = n->messages))
1918     {
1919       n->messages = mq->next;
1920       GNUNET_assert (mq->neighbour == n);
1921       GNUNET_free (mq);
1922     }
1923   if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1924     GNUNET_SCHEDULER_cancel (sched,
1925                              n->timeout_task);
1926   /* finally, free n itself */
1927   GNUNET_free (n);
1928 }
1929
1930
1931 /**
1932  * Add an entry for each of our transport plugins
1933  * (that are able to send) to the list of plugins
1934  * for this neighbour.
1935  *
1936  * @param neighbour to initialize
1937  */
1938 static void
1939 add_plugins (struct NeighbourList *neighbour)
1940 {
1941   struct TransportPlugin *tp;
1942   struct ReadyList *rl;
1943
1944   neighbour->retry_plugins_time
1945     = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY);
1946   tp = plugins;
1947   while (tp != NULL)
1948     {
1949       if (tp->api->send != NULL)
1950         {
1951           rl = GNUNET_malloc (sizeof (struct ReadyList));
1952           rl->next = neighbour->plugins;
1953           neighbour->plugins = rl;
1954           rl->plugin = tp;
1955           rl->neighbour = neighbour;
1956           rl->transmit_ready = GNUNET_YES;
1957         }
1958       tp = tp->next;
1959     }
1960 }
1961
1962
1963 static void
1964 neighbour_timeout_task (void *cls,
1965                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1966 {
1967   struct NeighbourList *n = cls;
1968
1969 #if DEBUG_TRANSPORT
1970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1971               "Neighbour `%4s' has timed out!\n",
1972               GNUNET_i2s(&n->id));
1973 #endif
1974   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1975   disconnect_neighbour (n, GNUNET_NO);
1976 }
1977
1978
1979 /**
1980  * Create a fresh entry in our neighbour list for the given peer.
1981  * Will try to transmit our current HELLO to the new neighbour.  Also
1982  * notifies our clients about the new "connection".
1983  *
1984  * @param peer the peer for which we create the entry
1985  * @return the new neighbour list entry
1986  */
1987 static struct NeighbourList *
1988 setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
1989 {
1990   struct NeighbourList *n;
1991
1992 #if DEBUG_TRANSPORT
1993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1994               "Setting up new neighbour `%4s', sending our HELLO to introduce ourselves\n",
1995               GNUNET_i2s (peer));
1996 #endif
1997   GNUNET_assert (our_hello != NULL);
1998   n = GNUNET_malloc (sizeof (struct NeighbourList));
1999   n->next = neighbours;
2000   neighbours = n;
2001   n->id = *peer;
2002   n->last_quota_update = GNUNET_TIME_absolute_get ();
2003   n->peer_timeout =
2004     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2005   n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
2006   add_plugins (n);
2007   n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
2008                                                   GNUNET_NO,
2009                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
2010                                                   GNUNET_SCHEDULER_NO_TASK,
2011                                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2012                                                   &neighbour_timeout_task, n);
2013   transmit_to_peer (NULL, 0,
2014                     (const struct GNUNET_MessageHeader *) our_hello,
2015                     GNUNET_YES, n);
2016   notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
2017   return n;
2018 }
2019
2020
2021 /**
2022  * Function called by the plugin for each received message.
2023  * Update data volumes, possibly notify plugins about
2024  * reducing the rate at which they read from the socket
2025  * and generally forward to our receive callback.
2026  *
2027  * @param plugin_context value to pass to this plugin
2028  *        to respond to the given peer (use is optional,
2029  *        but may speed up processing)
2030  * @param service_context value passed to the transport-service
2031  *        to identify the neighbour; will be NULL on the first
2032  *        call for a given peer
2033  * @param latency estimated latency for communicating with the
2034  *             given peer
2035  * @param peer (claimed) identity of the other peer
2036  * @param message the message, NULL if peer was disconnected
2037  * @return the new service_context that the plugin should use
2038  *         for future receive calls for messages from this
2039  *         particular peer
2040  */
2041 static struct ReadyList *
2042 plugin_env_receive (void *cls,
2043                     void *plugin_context,
2044                     struct ReadyList *service_context,
2045                     struct GNUNET_TIME_Relative latency,
2046                     const struct GNUNET_PeerIdentity *peer,
2047                     const struct GNUNET_MessageHeader *message)
2048 {
2049   const struct GNUNET_MessageHeader ack = {
2050     htons (sizeof (struct GNUNET_MessageHeader)),
2051     htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK)
2052   };
2053   struct TransportPlugin *plugin = cls;
2054   struct TransportClient *cpos;
2055   struct InboundMessage *im;
2056   uint16_t msize;
2057   struct NeighbourList *n;
2058
2059   if (service_context != NULL)
2060     {
2061       n = service_context->neighbour;
2062       GNUNET_assert (n != NULL);
2063     }
2064   else
2065     {
2066       n = find_neighbour (peer);
2067       if (n == NULL)
2068         {
2069           if (message == NULL)
2070             return NULL;        /* disconnect of peer already marked down */
2071           n = setup_new_neighbour (peer);
2072         }
2073       service_context = n->plugins;
2074       while ((service_context != NULL) && (plugin != service_context->plugin))
2075         service_context = service_context->next;
2076       GNUNET_assert ((plugin->api->send == NULL) ||
2077                      (service_context != NULL));
2078     }
2079   if (message == NULL)
2080     {
2081 #if DEBUG_TRANSPORT
2082       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2083                   "Receive failed from `%4s', triggering disconnect\n",
2084                   GNUNET_i2s(&n->id));
2085 #endif
2086       /* TODO: call stats */
2087       if ((service_context != NULL) &&
2088           (service_context->plugin_handle == plugin_context))
2089         {
2090           service_context->connected = GNUNET_NO;
2091           service_context->plugin_handle = NULL;
2092         }
2093       disconnect_neighbour (n, GNUNET_YES);
2094       return NULL;
2095     }
2096 #if DEBUG_TRANSPORT
2097   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2098               "Processing message of type `%u' received by plugin...\n",
2099               ntohs (message->type));
2100 #endif
2101   if (service_context != NULL)
2102     {
2103       if (service_context->connected == GNUNET_NO)
2104         {
2105           service_context->connected = GNUNET_YES;
2106           service_context->transmit_ready = GNUNET_YES;
2107           service_context->connect_attempts++;
2108         }
2109       service_context->timeout
2110         = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2111       service_context->plugin_handle = plugin_context;
2112       service_context->latency = latency;
2113     }
2114   /* update traffic received amount ... */
2115   msize = ntohs (message->size);
2116   n->last_received += msize;
2117   GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
2118   n->peer_timeout =
2119     GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2120   n->timeout_task =
2121     GNUNET_SCHEDULER_add_delayed (sched, GNUNET_NO,
2122                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
2123                                   GNUNET_SCHEDULER_NO_TASK,
2124                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2125                                   &neighbour_timeout_task, n);
2126   update_quota (n);
2127   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2128     {
2129       /* dropping message due to frequent inbound volume violations! */
2130       GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
2131                   GNUNET_ERROR_TYPE_BULK,
2132                   _
2133                   ("Dropping incoming message due to repeated bandwidth quota violations.\n"));
2134       /* TODO: call stats */
2135       GNUNET_assert (NULL != service_context->neighbour);
2136       return service_context;
2137     }
2138   switch (ntohs (message->type))
2139     {
2140     case GNUNET_MESSAGE_TYPE_HELLO:
2141 #if DEBUG_TRANSPORT
2142       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2143                   "Receiving `%s' message from `%4s'.\n", "HELLO",
2144                   GNUNET_i2s(peer));
2145 #endif
2146       process_hello (plugin, message);
2147 #if DEBUG_TRANSPORT
2148       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2149                   "Sending `%s' message to connecting peer `%4s'.\n", "ACK",
2150                   GNUNET_i2s(peer));
2151 #endif
2152       transmit_to_peer (NULL, 0, &ack, GNUNET_YES, n);
2153       break;
2154     case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
2155       n->saw_ack = GNUNET_YES;
2156       /* intentional fall-through! */
2157     default:
2158 #if DEBUG_TRANSPORT
2159       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2160                   "Received message of type %u from `%4s', sending to all clients.\n",
2161                   ntohs (message->type),
2162                   GNUNET_i2s(peer));
2163 #endif
2164       /* transmit message to all clients */
2165       im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
2166       im->header.size = htons (sizeof (struct InboundMessage) + msize);
2167       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2168       im->latency = GNUNET_TIME_relative_hton (latency);
2169       im->peer = *peer;
2170       memcpy (&im[1], message, msize);
2171
2172       cpos = clients;
2173       while (cpos != NULL)
2174         {
2175           transmit_to_client (cpos, &im->header, GNUNET_YES);
2176           cpos = cpos->next;
2177         }
2178       GNUNET_free (im);
2179     }
2180   GNUNET_assert (NULL != service_context->neighbour);
2181   return service_context;
2182 }
2183
2184
2185 /**
2186  * Handle START-message.  This is the first message sent to us
2187  * by any client which causes us to add it to our list.
2188  *
2189  * @param cls closure (always NULL)
2190  * @param client identification of the client
2191  * @param message the actual message
2192  */
2193 static void
2194 handle_start (void *cls,
2195               struct GNUNET_SERVER_Client *client,
2196               const struct GNUNET_MessageHeader *message)
2197 {
2198   struct TransportClient *c;
2199   struct ConnectInfoMessage cim;
2200   struct NeighbourList *n;
2201   struct InboundMessage *im;
2202   struct GNUNET_MessageHeader *ack;
2203
2204 #if DEBUG_TRANSPORT
2205   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2206               "Received `%s' request from client\n", "START");
2207 #endif
2208   c = clients;
2209   while (c != NULL)
2210     {
2211       if (c->client == client)
2212         {
2213           /* client already on our list! */
2214           GNUNET_break (0);
2215           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2216           return;
2217         }
2218       c = c->next;
2219     }
2220   c = GNUNET_malloc (sizeof (struct TransportClient));
2221   c->next = clients;
2222   clients = c;
2223   c->client = client;
2224   if (our_hello != NULL)
2225     {
2226 #if DEBUG_TRANSPORT
2227       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2228                   "Sending our own `%s' to new client\n",
2229                   "HELLO");
2230 #endif
2231       transmit_to_client (c,
2232                           (const struct GNUNET_MessageHeader *) our_hello,
2233                           GNUNET_NO);
2234       /* tell new client about all existing connections */
2235       cim.header.size = htons (sizeof (struct ConnectInfoMessage));
2236       cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2237       cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
2238       cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2239       im = GNUNET_malloc (sizeof (struct InboundMessage) +
2240                           sizeof (struct GNUNET_MessageHeader));
2241       im->header.size = htons (sizeof (struct InboundMessage) +
2242                                sizeof (struct GNUNET_MessageHeader));
2243       im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2244       im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
2245       ack = (struct GNUNET_MessageHeader *) &im[1];
2246       ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2247       ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
2248       for (n = neighbours; n != NULL; n = n->next)
2249         {
2250           cim.id = n->id;
2251           transmit_to_client (c, &cim.header, GNUNET_NO);
2252           if (n->saw_ack)
2253             {
2254               im->peer = n->id;
2255               transmit_to_client (c, &im->header, GNUNET_NO);
2256             }
2257         }
2258       GNUNET_free (im);
2259     }
2260   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2261 }
2262
2263
2264 /**
2265  * Handle HELLO-message.
2266  *
2267  * @param cls closure (always NULL)
2268  * @param client identification of the client
2269  * @param message the actual message
2270  */
2271 static void
2272 handle_hello (void *cls,
2273               struct GNUNET_SERVER_Client *client,
2274               const struct GNUNET_MessageHeader *message)
2275 {
2276   int ret;
2277
2278 #if DEBUG_TRANSPORT
2279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2280               "Received `%s' request from client\n", "HELLO");
2281 #endif
2282   ret = process_hello (NULL, message);
2283   GNUNET_SERVER_receive_done (client, ret);
2284 }
2285
2286
2287 /**
2288  * Handle SEND-message.
2289  *
2290  * @param cls closure (always NULL)
2291  * @param client identification of the client
2292  * @param message the actual message
2293  */
2294 static void
2295 handle_send (void *cls,
2296              struct GNUNET_SERVER_Client *client,
2297              const struct GNUNET_MessageHeader *message)
2298 {
2299   struct TransportClient *tc;
2300   struct NeighbourList *n;
2301   const struct OutboundMessage *obm;
2302   const struct GNUNET_MessageHeader *obmm;
2303   uint16_t size;
2304   uint16_t msize;
2305
2306   size = ntohs (message->size);
2307   if (size <
2308       sizeof (struct OutboundMessage) + sizeof (struct GNUNET_MessageHeader))
2309     {
2310       GNUNET_break (0);
2311       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2312       return;
2313     }
2314   obm = (const struct OutboundMessage *) message;
2315 #if DEBUG_TRANSPORT
2316   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2317               "Received `%s' request from client with target `%4s'\n",
2318               "SEND", GNUNET_i2s (&obm->peer));
2319 #endif
2320   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2321   msize = ntohs (obmm->size);
2322   if (size != msize + sizeof (struct OutboundMessage))
2323     {
2324       GNUNET_break (0);
2325       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2326       return;
2327     }
2328   n = find_neighbour (&obm->peer);
2329   if (n == NULL)
2330     n = setup_new_neighbour (&obm->peer);
2331   tc = clients;
2332   while ((tc != NULL) && (tc->client != client))
2333     tc = tc->next;
2334
2335 #if DEBUG_TRANSPORT
2336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2337               "Client asked to transmit %u-byte message of type %u to `%4s'\n",
2338               ntohs (obmm->size),
2339               ntohs (obmm->type), GNUNET_i2s (&obm->peer));
2340 #endif
2341   transmit_to_peer (tc, ntohl(obm->priority), obmm, GNUNET_NO, n);
2342   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2343 }
2344
2345
2346 /**
2347  * Handle SET_QUOTA-message.
2348  *
2349  * @param cls closure (always NULL)
2350  * @param client identification of the client
2351  * @param message the actual message
2352  */
2353 static void
2354 handle_set_quota (void *cls,
2355                   struct GNUNET_SERVER_Client *client,
2356                   const struct GNUNET_MessageHeader *message)
2357 {
2358   const struct QuotaSetMessage *qsm =
2359     (const struct QuotaSetMessage *) message;
2360   struct NeighbourList *n;
2361   struct TransportPlugin *p;
2362   struct ReadyList *rl;
2363
2364 #if DEBUG_TRANSPORT
2365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2366               "Received `%s' request from client for peer `%4s'\n",
2367               "SET_QUOTA", GNUNET_i2s (&qsm->peer));
2368 #endif
2369   n = find_neighbour (&qsm->peer);
2370   if (n == NULL)
2371     {
2372       GNUNET_SERVER_receive_done (client, GNUNET_OK);
2373       return;
2374     }
2375   update_quota (n);
2376   if (n->quota_in < ntohl (qsm->quota_in))
2377     n->last_quota_update = GNUNET_TIME_absolute_get ();
2378   n->quota_in = ntohl (qsm->quota_in);
2379   rl = n->plugins;
2380   while (rl != NULL)
2381     {
2382       p = rl->plugin;
2383       p->api->set_receive_quota (p->api->cls,
2384                                  &qsm->peer, ntohl (qsm->quota_in));
2385       rl = rl->next;
2386     }
2387   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2388 }
2389
2390
2391 /**
2392  * Handle TRY_CONNECT-message.
2393  *
2394  * @param cls closure (always NULL)
2395  * @param client identification of the client
2396  * @param message the actual message
2397  */
2398 static void
2399 handle_try_connect (void *cls,
2400                     struct GNUNET_SERVER_Client *client,
2401                     const struct GNUNET_MessageHeader *message)
2402 {
2403   const struct TryConnectMessage *tcm;
2404
2405   tcm = (const struct TryConnectMessage *) message;
2406 #if DEBUG_TRANSPORT
2407   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2408               "Received `%s' request from client %p asking to connect to `%4s'\n",
2409               "TRY_CONNECT",
2410               client,
2411               GNUNET_i2s (&tcm->peer));
2412 #endif
2413   if (NULL == find_neighbour (&tcm->peer))
2414     setup_new_neighbour (&tcm->peer);
2415 #if DEBUG_TRANSPORT
2416   else
2417     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2418                 "Client asked to connect to `%4s', but connection already exists\n",
2419                 "TRY_CONNECT", 
2420                 GNUNET_i2s (&tcm->peer));
2421 #endif    
2422   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2423 }
2424
2425
2426 /**
2427  * List of handlers for the messages understood by this
2428  * service.
2429  */
2430 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2431   {&handle_start, NULL,
2432    GNUNET_MESSAGE_TYPE_TRANSPORT_START, 0},
2433   {&handle_hello, NULL,
2434    GNUNET_MESSAGE_TYPE_HELLO, 0},
2435   {&handle_send, NULL,
2436    GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
2437   {&handle_set_quota, NULL,
2438    GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
2439   {&handle_try_connect, NULL,
2440    GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
2441    sizeof (struct TryConnectMessage)},
2442   {NULL, NULL, 0, 0}
2443 };
2444
2445
2446 /**
2447  * Setup the environment for this plugin.
2448  */
2449 static void
2450 create_environment (struct TransportPlugin *plug)
2451 {
2452   plug->env.cfg = cfg;
2453   plug->env.sched = sched;
2454   plug->env.my_public_key = &my_public_key;
2455   plug->env.my_private_key = my_private_key;
2456   plug->env.my_identity = &my_identity;
2457   plug->env.cls = plug;
2458   plug->env.receive = &plugin_env_receive;
2459   plug->env.lookup = &plugin_env_lookup_address;
2460   plug->env.notify_address = &plugin_env_notify_address;
2461   plug->env.notify_validation = &plugin_env_notify_validation;
2462   plug->env.default_quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
2463   plug->env.max_connections = max_connect_per_transport;
2464 }
2465
2466
2467 /**
2468  * Start the specified transport (load the plugin).
2469  */
2470 static void
2471 start_transport (struct GNUNET_SERVER_Handle *server, const char *name)
2472 {
2473   struct TransportPlugin *plug;
2474   char *libname;
2475
2476   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2477               _("Loading `%s' transport plugin\n"), name);
2478   GNUNET_asprintf (&libname, "libgnunet_plugin_transport_%s", name);
2479   plug = GNUNET_malloc (sizeof (struct TransportPlugin));
2480   create_environment (plug);
2481   plug->short_name = GNUNET_strdup (name);
2482   plug->lib_name = libname;
2483   plug->next = plugins;
2484   plugins = plug;
2485   plug->api = GNUNET_PLUGIN_load (libname, &plug->env);
2486   if (plug->api == NULL)
2487     {
2488       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2489                   _("Failed to load transport plugin for `%s'\n"), name);
2490       GNUNET_free (plug->short_name);
2491       plugins = plug->next;
2492       GNUNET_free (libname);
2493       GNUNET_free (plug);
2494     }
2495 }
2496
2497
2498 /**
2499  * Called whenever a client is disconnected.  Frees our
2500  * resources associated with that client.
2501  *
2502  * @param cls closure
2503  * @param client identification of the client
2504  */
2505 static void
2506 client_disconnect_notification (void *cls,
2507                                 struct GNUNET_SERVER_Client *client)
2508 {
2509   struct TransportClient *pos;
2510   struct TransportClient *prev;
2511   struct ClientMessageQueueEntry *mqe;
2512
2513 #if DEBUG_TRANSPORT
2514   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2515               "Client disconnected, cleaning up.\n");
2516 #endif
2517   prev = NULL;
2518   pos = clients;
2519   while ((pos != NULL) && (pos->client != client))
2520     {
2521       prev = pos;
2522       pos = pos->next;
2523     }
2524   if (pos == NULL)
2525     return;
2526   while (NULL != (mqe = pos->message_queue_head))
2527     {
2528       pos->message_queue_head = mqe->next;
2529       GNUNET_free (mqe);
2530     }
2531   pos->message_queue_head = NULL;
2532   if (prev == NULL)
2533     clients = pos->next;
2534   else
2535     prev->next = pos->next;
2536   if (GNUNET_YES == pos->tcs_pending)
2537     {
2538       pos->client = NULL;
2539       return;
2540     }
2541   GNUNET_free (pos);
2542 }
2543
2544
2545 /**
2546  * Initiate transport service.
2547  *
2548  * @param cls closure
2549  * @param s scheduler to use
2550  * @param serv the initialized server
2551  * @param c configuration to use
2552  */
2553 static void
2554 run (void *cls,
2555      struct GNUNET_SCHEDULER_Handle *s,
2556      struct GNUNET_SERVER_Handle *serv, 
2557      const struct GNUNET_CONFIGURATION_Handle *c)
2558 {
2559   char *plugs;
2560   char *pos;
2561   int no_transports;
2562   unsigned long long tneigh;
2563   char *keyfile;
2564
2565   sched = s;
2566   cfg = c;
2567   /* parse configuration */
2568   if ((GNUNET_OK !=
2569        GNUNET_CONFIGURATION_get_value_number (c,
2570                                               "TRANSPORT",
2571                                               "NEIGHBOUR_LIMIT",
2572                                               &tneigh)) ||
2573       (GNUNET_OK !=
2574        GNUNET_CONFIGURATION_get_value_filename (c,
2575                                                 "GNUNETD",
2576                                                 "HOSTKEY", &keyfile)))
2577     {
2578       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2579                   _
2580                   ("Transport service is lacking key configuration settings.  Exiting.\n"));
2581       GNUNET_SCHEDULER_shutdown (s);
2582       return;
2583     }
2584   max_connect_per_transport = (uint32_t) tneigh;
2585   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
2586   GNUNET_free (keyfile);
2587   if (my_private_key == NULL)
2588     {
2589       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2590                   _
2591                   ("Transport service could not access hostkey.  Exiting.\n"));
2592       GNUNET_SCHEDULER_shutdown (s);
2593       return;
2594     }
2595   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
2596   GNUNET_CRYPTO_hash (&my_public_key,
2597                       sizeof (my_public_key), &my_identity.hashPubKey);
2598   /* setup notification */
2599   server = serv;
2600   GNUNET_SERVER_disconnect_notify (server,
2601                                    &client_disconnect_notification, NULL);
2602   /* load plugins... */
2603   no_transports = 1;
2604   if (GNUNET_OK ==
2605       GNUNET_CONFIGURATION_get_value_string (c,
2606                                              "TRANSPORT", "PLUGINS", &plugs))
2607     {
2608       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2609                   _("Starting transport plugins `%s'\n"), plugs);
2610       pos = strtok (plugs, " ");
2611       while (pos != NULL)
2612         {
2613           start_transport (server, pos);
2614           no_transports = 0;
2615           pos = strtok (NULL, " ");
2616         }
2617       GNUNET_free (plugs);
2618     }
2619   if (no_transports)
2620     refresh_hello ();
2621 #if DEBUG_TRANSPORT
2622   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2623               _("Transport service ready.\n"));
2624 #endif
2625   /* process client requests */
2626   GNUNET_SERVER_add_handlers (server, handlers);
2627 }
2628
2629
2630 /**
2631  * Function called when the service shuts
2632  * down.  Unloads our plugins.
2633  *
2634  * @param cls closure
2635  * @param cfg configuration to use
2636  */
2637 static void
2638 unload_plugins (void *cls, 
2639                 const struct GNUNET_CONFIGURATION_Handle *cfg)
2640 {
2641   struct TransportPlugin *plug;
2642   struct AddressList *al;
2643
2644 #if DEBUG_TRANSPORT
2645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2646               "Transport service is unloading plugins...\n");
2647 #endif
2648   while (NULL != (plug = plugins))
2649     {
2650       plugins = plug->next;
2651       GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
2652       GNUNET_free (plug->lib_name);
2653       GNUNET_free (plug->short_name);
2654       while (NULL != (al = plug->addresses))
2655         {
2656           plug->addresses = al->next;
2657           GNUNET_free (al);
2658         }
2659       GNUNET_free (plug);
2660     }
2661   if (my_private_key != NULL)
2662     GNUNET_CRYPTO_rsa_key_free (my_private_key);
2663   GNUNET_free_non_null (our_hello);
2664 }
2665
2666
2667 /**
2668  * The main function for the transport service.
2669  *
2670  * @param argc number of arguments from the command line
2671  * @param argv command line arguments
2672  * @return 0 ok, 1 on error
2673  */
2674 int
2675 main (int argc, char *const *argv)
2676 {
2677   return (GNUNET_OK ==
2678           GNUNET_SERVICE_run (argc,
2679                               argv,
2680                               "transport",
2681                               &run, NULL, &unload_plugins, NULL)) ? 0 : 1;
2682 }
2683
2684 /* end of gnunet-service-transport.c */