dllfix
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 #define DEBUG_HANDSHAKE GNUNET_NO
45
46 #define DEBUG_CORE_QUOTA GNUNET_NO
47
48 /**
49  * Receive and send buffer windows grow over time.  For
50  * how long can 'unused' bandwidth accumulate before we
51  * need to cap it?  (specified in seconds).
52  */
53 #define MAX_WINDOW_TIME_S (5 * 60)
54
55 /**
56  * How many messages do we queue up at most for optional
57  * notifications to a client?  (this can cause notifications
58  * about outgoing messages to be dropped).
59  */
60 #define MAX_NOTIFY_QUEUE 16
61
62 /**
63  * Minimum bandwidth (out) to assign to any connected peer.
64  * Should be rather low; values larger than DEFAULT_BW_IN_OUT make no
65  * sense.
66  */
67 #define MIN_BANDWIDTH_PER_PEER GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * How long do we delay messages to get larger packet sizes (CORKing)?
80  */
81 #define MAX_CORK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
82
83 /**
84  * What is the maximum delay for a SET_KEY message?
85  */
86 #define MAX_SET_KEY_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
87
88 /**
89  * What how long do we wait for SET_KEY confirmation initially?
90  */
91 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 3)
92
93 /**
94  * What is the maximum delay for a PING message?
95  */
96 #define MAX_PING_DELAY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 2)
97
98 /**
99  * What is the maximum delay for a PONG message?
100  */
101 #define MAX_PONG_DELAY GNUNET_TIME_relative_multiply (MAX_PING_DELAY, 2)
102
103 /**
104  * What is the minimum frequency for a PING message?
105  */
106 #define MIN_PING_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
107
108 /**
109  * How often do we recalculate bandwidth quotas?
110  */
111 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
112
113 /**
114  * What is the priority for a SET_KEY message?
115  */
116 #define SET_KEY_PRIORITY 0xFFFFFF
117
118 /**
119  * What is the priority for a PING message?
120  */
121 #define PING_PRIORITY 0xFFFFFF
122
123 /**
124  * What is the priority for a PONG message?
125  */
126 #define PONG_PRIORITY 0xFFFFFF
127
128 /**
129  * How many messages do we queue per peer at most?  Must be at
130  * least two.
131  */
132 #define MAX_PEER_QUEUE_SIZE 16
133
134 /**
135  * How many non-mandatory messages do we queue per client at most?
136  */
137 #define MAX_CLIENT_QUEUE_SIZE 32
138
139 /**
140  * What is the maximum age of a message for us to consider
141  * processing it?  Note that this looks at the timestamp used
142  * by the other peer, so clock skew between machines does
143  * come into play here.  So this should be picked high enough
144  * so that a little bit of clock skew does not prevent peers
145  * from connecting to us.
146  */
147 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
148
149 /**
150  * What is the maximum size for encrypted messages?  Note that this
151  * number imposes a clear limit on the maximum size of any message.
152  * Set to a value close to 64k but not so close that transports will
153  * have trouble with their headers.
154  */
155 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
156
157
158 /**
159  * State machine for our P2P encryption handshake.  Everyone starts in
160  * "DOWN", if we receive the other peer's key (other peer initiated)
161  * we start in state RECEIVED (since we will immediately send our
162  * own); otherwise we start in SENT.  If we get back a PONG from
163  * within either state, we move up to CONFIRMED (the PONG will always
164  * be sent back encrypted with the key we sent to the other peer).
165  */
166 enum PeerStateMachine
167 {
168   PEER_STATE_DOWN,
169   PEER_STATE_KEY_SENT,
170   PEER_STATE_KEY_RECEIVED,
171   PEER_STATE_KEY_CONFIRMED
172 };
173
174
175 /**
176  * Number of bytes (at the beginning) of "struct EncryptedMessage"
177  * that are NOT encrypted.
178  */
179 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t))
180
181
182 /**
183  * Encapsulation for encrypted messages exchanged between
184  * peers.  Followed by the actual encrypted data.
185  */
186 struct EncryptedMessage
187 {
188   /**
189    * Message type is either CORE_ENCRYPTED_MESSAGE.
190    */
191   struct GNUNET_MessageHeader header;
192
193   /**
194    * Random value used for IV generation.  ENCRYPTED_HEADER_SIZE must
195    * be set to the offset of the *next* field.
196    */
197   uint32_t iv_seed GNUNET_PACKED;
198
199   /**
200    * Hash of the plaintext (starting at 'sequence_number'), used to
201    * verify message integrity.  Everything after this hash (including
202    * this hash itself) will be encrypted.  
203    */
204   GNUNET_HashCode plaintext_hash;
205
206   /**
207    * Sequence number, in network byte order.  This field
208    * must be the first encrypted/decrypted field and the
209    * first byte that is hashed for the plaintext hash.
210    */
211   uint32_t sequence_number GNUNET_PACKED;
212
213   /**
214    * Desired bandwidth (how much we should send to this peer / how
215    * much is the sender willing to receive)?
216    */
217   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
218
219   /**
220    * Timestamp.  Used to prevent reply of ancient messages
221    * (recent messages are caught with the sequence number).
222    */
223   struct GNUNET_TIME_AbsoluteNBO timestamp;
224
225 };
226
227
228 /**
229  * We're sending an (encrypted) PING to the other peer to check if he
230  * can decrypt.  The other peer should respond with a PONG with the
231  * same content, except this time encrypted with the receiver's key.
232  */
233 struct PingMessage
234 {
235   /**
236    * Message type is CORE_PING.
237    */
238   struct GNUNET_MessageHeader header;
239
240   /**
241    * Random number chosen to make reply harder.
242    */
243   uint32_t challenge GNUNET_PACKED;
244
245   /**
246    * Intended target of the PING, used primarily to check
247    * that decryption actually worked.
248    */
249   struct GNUNET_PeerIdentity target;
250 };
251
252
253
254 /**
255  * Response to a PING.  Includes data from the original PING
256  * plus initial bandwidth quota information.
257  */
258 struct PongMessage
259 {
260   /**
261    * Message type is CORE_PONG.
262    */
263   struct GNUNET_MessageHeader header;
264
265   /**
266    * Random number proochosen to make reply harder.  Must be
267    * first field after header (this is where we start to encrypt!).
268    */
269   uint32_t challenge GNUNET_PACKED;
270
271   /**
272    * Must be zero.
273    */
274   uint32_t reserved GNUNET_PACKED;
275
276   /**
277    * Desired bandwidth (how much we should send to this
278    * peer / how much is the sender willing to receive).
279    */
280   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
281
282   /**
283    * Intended target of the PING, used primarily to check
284    * that decryption actually worked.
285    */
286   struct GNUNET_PeerIdentity target;
287 };
288
289
290 /**
291  * Message transmitted to set (or update) a session key.
292  */
293 struct SetKeyMessage
294 {
295
296   /**
297    * Message type is either CORE_SET_KEY.
298    */
299   struct GNUNET_MessageHeader header;
300
301   /**
302    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
303    */
304   int32_t sender_status GNUNET_PACKED;
305
306   /**
307    * Purpose of the signature, will be
308    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
309    */
310   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
311
312   /**
313    * At what time was this key created?
314    */
315   struct GNUNET_TIME_AbsoluteNBO creation_time;
316
317   /**
318    * The encrypted session key.
319    */
320   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
321
322   /**
323    * Who is the intended recipient?
324    */
325   struct GNUNET_PeerIdentity target;
326
327   /**
328    * Signature of the stuff above (starting at purpose).
329    */
330   struct GNUNET_CRYPTO_RsaSignature signature;
331
332 };
333
334
335 /**
336  * Message waiting for transmission. This struct
337  * is followed by the actual content of the message.
338  */
339 struct MessageEntry
340 {
341
342   /**
343    * We keep messages in a doubly linked list.
344    */
345   struct MessageEntry *next;
346
347   /**
348    * We keep messages in a doubly linked list.
349    */
350   struct MessageEntry *prev;
351
352   /**
353    * By when are we supposed to transmit this message?
354    */
355   struct GNUNET_TIME_Absolute deadline;
356
357   /**
358    * By when are we supposed to transmit this message (after
359    * giving slack)?
360    */
361   struct GNUNET_TIME_Absolute slack_deadline;
362
363   /**
364    * How important is this message to us?
365    */
366   unsigned int priority;
367
368   /**
369    * How long is the message? (number of bytes following
370    * the "struct MessageEntry", but not including the
371    * size of "struct MessageEntry" itself!)
372    */
373   uint16_t size;
374
375   /**
376    * Was this message selected for transmission in the
377    * current round? GNUNET_YES or GNUNET_NO.
378    */
379   int8_t do_transmit;
380
381   /**
382    * Did we give this message some slack (delayed sending) previously
383    * (and hence should not give it any more slack)? GNUNET_YES or
384    * GNUNET_NO.
385    */
386   int8_t got_slack;
387
388 };
389
390
391 struct Neighbour
392 {
393   /**
394    * We keep neighbours in a linked list (for now).
395    */
396   struct Neighbour *next;
397
398   /**
399    * Unencrypted messages destined for this peer.
400    */
401   struct MessageEntry *messages;
402
403   /**
404    * Head of the batched, encrypted message queue (already ordered,
405    * transmit starting with the head).
406    */
407   struct MessageEntry *encrypted_head;
408
409   /**
410    * Tail of the batched, encrypted message queue (already ordered,
411    * append new messages to tail)
412    */
413   struct MessageEntry *encrypted_tail;
414
415   /**
416    * Handle for pending requests for transmission to this peer
417    * with the transport service.  NULL if no request is pending.
418    */
419   struct GNUNET_TRANSPORT_TransmitHandle *th;
420
421   /**
422    * Public key of the neighbour, NULL if we don't have it yet.
423    */
424   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
425
426   /**
427    * We received a PING message before we got the "public_key"
428    * (or the SET_KEY).  We keep it here until we have a key
429    * to decrypt it.  NULL if no PING is pending.
430    */
431   struct PingMessage *pending_ping;
432
433   /**
434    * We received a PONG message before we got the "public_key"
435    * (or the SET_KEY).  We keep it here until we have a key
436    * to decrypt it.  NULL if no PONG is pending.
437    */
438   struct PongMessage *pending_pong;
439
440   /**
441    * Non-NULL if we are currently looking up HELLOs for this peer.
442    * for this peer.
443    */
444   struct GNUNET_PEERINFO_IteratorContext *pitr;
445
446   /**
447    * SetKeyMessage to transmit, NULL if we are not currently trying
448    * to send one.
449    */
450   struct SetKeyMessage *skm;
451
452   /**
453    * Identity of the neighbour.
454    */
455   struct GNUNET_PeerIdentity peer;
456
457   /**
458    * Key we use to encrypt our messages for the other peer
459    * (initialized by us when we do the handshake).
460    */
461   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
462
463   /**
464    * Key we use to decrypt messages from the other peer
465    * (given to us by the other peer during the handshake).
466    */
467   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
468
469   /**
470    * ID of task used for re-trying plaintext scheduling.
471    */
472   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
473
474   /**
475    * ID of task used for re-trying SET_KEY and PING message.
476    */
477   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
478
479   /**
480    * ID of task used for updating bandwidth quota for this neighbour.
481    */
482   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
483
484   /**
485    * ID of task used for sending keep-alive pings.
486    */
487   GNUNET_SCHEDULER_TaskIdentifier keep_alive_task;
488
489   /**
490    * ID of task used for cleaning up dead neighbour entries.
491    */
492   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
493
494   /**
495    * At what time did we generate our encryption key?
496    */
497   struct GNUNET_TIME_Absolute encrypt_key_created;
498
499   /**
500    * At what time did the other peer generate the decryption key?
501    */
502   struct GNUNET_TIME_Absolute decrypt_key_created;
503
504   /**
505    * At what time did we initially establish (as in, complete session
506    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
507    */
508   struct GNUNET_TIME_Absolute time_established;
509
510   /**
511    * At what time did we last receive an encrypted message from the
512    * other peer?  Should be zero if status != KEY_CONFIRMED.
513    */
514   struct GNUNET_TIME_Absolute last_activity;
515
516   /**
517    * Last latency observed from this peer.
518    */
519   struct GNUNET_TIME_Relative last_latency;
520
521   /**
522    * At what frequency are we currently re-trying SET_KEY messages?
523    */
524   struct GNUNET_TIME_Relative set_key_retry_frequency;
525
526   /**
527    * Tracking bandwidth for sending to this peer.
528    */
529   struct GNUNET_BANDWIDTH_Tracker available_send_window;
530
531   /**
532    * Tracking bandwidth for receiving from this peer.
533    */
534   struct GNUNET_BANDWIDTH_Tracker available_recv_window;
535
536   /**
537    * How valueable were the messages of this peer recently?
538    */
539   unsigned long long current_preference;
540
541   /**
542    * Bit map indicating which of the 32 sequence numbers before the last
543    * were received (good for accepting out-of-order packets and
544    * estimating reliability of the connection)
545    */
546   unsigned int last_packets_bitmap;
547
548   /**
549    * last sequence number received on this connection (highest)
550    */
551   uint32_t last_sequence_number_received;
552
553   /**
554    * last sequence number transmitted
555    */
556   uint32_t last_sequence_number_sent;
557
558   /**
559    * Available bandwidth in for this peer (current target).
560    */
561   struct GNUNET_BANDWIDTH_Value32NBO bw_in;    
562
563   /**
564    * Available bandwidth out for this peer (current target).
565    */
566   struct GNUNET_BANDWIDTH_Value32NBO bw_out;  
567
568   /**
569    * Internal bandwidth limit set for this peer (initially typically
570    * set to "-1").  Actual "bw_out" is MIN of
571    * "bpm_out_internal_limit" and "bw_out_external_limit".
572    */
573   struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
574
575   /**
576    * External bandwidth limit set for this peer by the
577    * peer that we are communicating with.  "bw_out" is MIN of
578    * "bw_out_internal_limit" and "bw_out_external_limit".
579    */
580   struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
581
582   /**
583    * What was our PING challenge number (for this peer)?
584    */
585   uint32_t ping_challenge;
586
587   /**
588    * What was the last distance to this peer as reported by the transports?
589    */
590   uint32_t last_distance;
591
592   /**
593    * What is our connection status?
594    */
595   enum PeerStateMachine status;
596
597   /**
598    * Are we currently connected to this neighbour?
599    */ 
600   int is_connected;
601
602 };
603
604
605 /**
606  * Data structure for each client connected to the core service.
607  */
608 struct Client
609 {
610   /**
611    * Clients are kept in a linked list.
612    */
613   struct Client *next;
614
615   /**
616    * Handle for the client with the server API.
617    */
618   struct GNUNET_SERVER_Client *client_handle;
619
620   /**
621    * Array of the types of messages this peer cares
622    * about (with "tcnt" entries).  Allocated as part
623    * of this client struct, do not free!
624    */
625   const uint16_t *types;
626
627   /**
628    * Options for messages this client cares about,
629    * see GNUNET_CORE_OPTION_ values.
630    */
631   uint32_t options;
632
633   /**
634    * Number of types of incoming messages this client
635    * specifically cares about.  Size of the "types" array.
636    */
637   unsigned int tcnt;
638
639 };
640
641
642 /**
643  * Our public key.
644  */
645 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
646
647 /**
648  * Our identity.
649  */
650 static struct GNUNET_PeerIdentity my_identity;
651
652 /**
653  * Our private key.
654  */
655 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
656
657 /**
658  * Our scheduler.
659  */
660 struct GNUNET_SCHEDULER_Handle *sched;
661
662 /**
663  * Our configuration.
664  */
665 const struct GNUNET_CONFIGURATION_Handle *cfg;
666
667 /**
668  * Our server.
669  */
670 static struct GNUNET_SERVER_Handle *server;
671
672 /**
673  * Transport service.
674  */
675 static struct GNUNET_TRANSPORT_Handle *transport;
676
677 /**
678  * Linked list of our clients.
679  */
680 static struct Client *clients;
681
682 /**
683  * Context for notifications we need to send to our clients.
684  */
685 static struct GNUNET_SERVER_NotificationContext *notifier;
686
687 /**
688  * We keep neighbours in a linked list (for now).
689  */
690 static struct Neighbour *neighbours;
691
692 /**
693  * Sum of all preferences among all neighbours.
694  */
695 static unsigned long long preference_sum;
696
697 /**
698  * Total number of neighbours we have.
699  */
700 static unsigned int neighbour_count;
701
702 /**
703  * How much inbound bandwidth are we supposed to be using per second?
704  * FIXME: this value is not used!
705  */
706 static unsigned long long bandwidth_target_in_bps;
707
708 /**
709  * How much outbound bandwidth are we supposed to be using per second?
710  */
711 static unsigned long long bandwidth_target_out_bps;
712
713
714
715 /**
716  * A preference value for a neighbour was update.  Update
717  * the preference sum accordingly.
718  *
719  * @param inc how much was a preference value increased?
720  */
721 static void
722 update_preference_sum (unsigned long long inc)
723 {
724   struct Neighbour *n;
725   unsigned long long os;
726
727   os = preference_sum;
728   preference_sum += inc;
729   if (preference_sum >= os)
730     return; /* done! */
731   /* overflow! compensate by cutting all values in half! */
732   preference_sum = 0;
733   n = neighbours;
734   while (n != NULL)
735     {
736       n->current_preference /= 2;
737       preference_sum += n->current_preference;
738       n = n->next;
739     }    
740 }
741
742
743 /**
744  * Find the entry for the given neighbour.
745  *
746  * @param peer identity of the neighbour
747  * @return NULL if we are not connected, otherwise the
748  *         neighbour's entry.
749  */
750 static struct Neighbour *
751 find_neighbour (const struct GNUNET_PeerIdentity *peer)
752 {
753   struct Neighbour *ret;
754
755   ret = neighbours;
756   while ((ret != NULL) &&
757          (0 != memcmp (&ret->peer,
758                        peer, sizeof (struct GNUNET_PeerIdentity))))
759     ret = ret->next;
760   return ret;
761 }
762
763
764 /**
765  * Send a message to one of our clients.
766  *
767  * @param client target for the message
768  * @param msg message to transmit
769  * @param can_drop could this message be dropped if the
770  *        client's queue is getting too large?
771  */
772 static void
773 send_to_client (struct Client *client,
774                 const struct GNUNET_MessageHeader *msg, 
775                 int can_drop)
776 {
777 #if DEBUG_CORE_CLIENT
778   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
779               "Preparing to send message of type %u to client.\n",
780               ntohs (msg->type));
781 #endif  
782   GNUNET_SERVER_notification_context_unicast (notifier,
783                                               client->client_handle,
784                                               msg,
785                                               can_drop);
786 }
787
788
789 /**
790  * Send a message to all of our current clients that have
791  * the right options set.
792  * 
793  * @param msg message to multicast
794  * @param can_drop can this message be discarded if the queue is too long
795  * @param options mask to use 
796  */
797 static void
798 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
799                      int can_drop,
800                      int options)
801 {
802   struct Client *c;
803
804   c = clients;
805   while (c != NULL)
806     {
807       if (0 != (c->options & options))
808         send_to_client (c, msg, can_drop);
809       c = c->next;
810     }
811 }
812
813
814 /**
815  * Handle CORE_INIT request.
816  */
817 static void
818 handle_client_init (void *cls,
819                     struct GNUNET_SERVER_Client *client,
820                     const struct GNUNET_MessageHeader *message)
821 {
822   const struct InitMessage *im;
823   struct InitReplyMessage irm;
824   struct Client *c;
825   uint16_t msize;
826   const uint16_t *types;
827   uint16_t *wtypes;
828   struct Neighbour *n;
829   struct ConnectNotifyMessage cnm;
830   unsigned int i;
831
832 #if DEBUG_CORE_CLIENT
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834               "Client connecting to core service with `%s' message\n",
835               "INIT");
836 #endif
837   /* check that we don't have an entry already */
838   c = clients;
839   while (c != NULL)
840     {
841       if (client == c->client_handle)
842         {
843           GNUNET_break (0);
844           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
845           return;
846         }
847       c = c->next;
848     }
849   msize = ntohs (message->size);
850   if (msize < sizeof (struct InitMessage))
851     {
852       GNUNET_break (0);
853       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
854       return;
855     }
856   GNUNET_SERVER_notification_context_add (notifier, client);
857   im = (const struct InitMessage *) message;
858   types = (const uint16_t *) &im[1];
859   msize -= sizeof (struct InitMessage);
860   c = GNUNET_malloc (sizeof (struct Client) + msize);
861   c->client_handle = client;
862   c->next = clients;
863   clients = c;
864   c->tcnt = msize / sizeof (uint16_t);
865   c->types = (const uint16_t *) &c[1];
866   wtypes = (uint16_t *) &c[1];
867   for (i=0;i<c->tcnt;i++)
868     wtypes[i] = ntohs (types[i]);
869   c->options = ntohl (im->options);
870 #if DEBUG_CORE_CLIENT
871   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872               "Client %p is interested in %u message types\n",
873               c,
874               c->tcnt);
875 #endif
876   /* send init reply message */
877   irm.header.size = htons (sizeof (struct InitReplyMessage));
878   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
879   irm.reserved = htonl (0);
880   memcpy (&irm.publicKey,
881           &my_public_key,
882           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
883 #if DEBUG_CORE_CLIENT
884   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885               "Sending `%s' message to client.\n", "INIT_REPLY");
886 #endif
887   send_to_client (c, &irm.header, GNUNET_NO);
888   /* notify new client about existing neighbours */
889   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
890   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
891   n = neighbours;
892   while (n != NULL)
893     {
894       if (n->status == PEER_STATE_KEY_CONFIRMED)
895         {
896 #if DEBUG_CORE_CLIENT
897           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
898                       "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
899 #endif
900           cnm.distance = htonl (n->last_distance);
901           cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
902           cnm.peer = n->peer;
903           send_to_client (c, &cnm.header, GNUNET_NO);
904         }
905       n = n->next;
906     }
907   GNUNET_SERVER_receive_done (client, GNUNET_OK);
908 }
909
910
911 /**
912  * A client disconnected, clean up.
913  *
914  * @param cls closure
915  * @param client identification of the client
916  */
917 static void
918 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
919 {
920   struct Client *pos;
921   struct Client *prev;
922
923   if (client == NULL)
924     return;
925 #if DEBUG_CORE_CLIENT
926   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
927               "Client %p has disconnected from core service.\n",
928               client);
929 #endif
930   prev = NULL;
931   pos = clients;
932   while (pos != NULL)
933     {
934       if (client == pos->client_handle)
935         {
936           if (prev == NULL)
937             clients = pos->next;
938           else
939             prev->next = pos->next;
940           GNUNET_free (pos);
941           return;
942         }
943       prev = pos;
944       pos = pos->next;
945     }
946   /* client never sent INIT */
947 }
948
949
950 /**
951  * Handle REQUEST_INFO request.
952  */
953 static void
954 handle_client_request_info (void *cls,
955                             struct GNUNET_SERVER_Client *client,
956                             const struct GNUNET_MessageHeader *message)
957 {
958   const struct RequestInfoMessage *rcm;
959   struct Neighbour *n;
960   struct ConfigurationInfoMessage cim;
961   int32_t want_reserv;
962   int32_t got_reserv;
963   unsigned long long old_preference;
964   struct GNUNET_SERVER_TransmitContext *tc;
965
966 #if DEBUG_CORE_CLIENT
967   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968               "Core service receives `%s' request.\n", "REQUEST_INFO");
969 #endif
970   rcm = (const struct RequestInfoMessage *) message;
971   n = find_neighbour (&rcm->peer);
972   memset (&cim, 0, sizeof (cim));
973   if (n != NULL) 
974     {
975       want_reserv = ntohl (rcm->reserve_inbound);
976       if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
977         {
978           n->bw_out_internal_limit = rcm->limit_outbound;
979           n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
980                                                   n->bw_out_external_limit);
981           GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
982                                                  n->bw_out);
983           GNUNET_TRANSPORT_set_quota (transport,
984                                       &n->peer,
985                                       n->bw_in,
986                                       n->bw_out,
987                                       GNUNET_TIME_UNIT_FOREVER_REL,
988                                       NULL, NULL); 
989         }
990       if (want_reserv < 0)
991         {
992           got_reserv = want_reserv;
993         }
994       else if (want_reserv > 0)
995         {
996           if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
997                                                   want_reserv).value == 0)
998             got_reserv = want_reserv;
999           else
1000             got_reserv = 0; /* all or nothing */
1001         }
1002       else
1003         got_reserv = 0;
1004       GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window,
1005                                         got_reserv);
1006       old_preference = n->current_preference;
1007       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1008       if (old_preference > n->current_preference) 
1009         {
1010           /* overflow; cap at maximum value */
1011           n->current_preference = (unsigned long long) -1;
1012         }
1013       update_preference_sum (n->current_preference - old_preference);
1014 #if DEBUG_CORE_QUOTA
1015       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1016                   "Received reservation request for %d bytes for peer `%4s', reserved %d bytes\n",
1017                   (int) want_reserv,
1018                   GNUNET_i2s (&rcm->peer),
1019                   (int) got_reserv);
1020 #endif
1021       cim.reserved_amount = htonl (got_reserv);
1022       cim.bw_in = n->bw_in;
1023       cim.bw_out = n->bw_out;
1024       cim.preference = n->current_preference;
1025     }
1026   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1027   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1028   cim.peer = rcm->peer;
1029
1030 #if DEBUG_CORE_CLIENT
1031   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1032               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1033 #endif
1034   tc = GNUNET_SERVER_transmit_context_create (client);
1035   GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
1036   GNUNET_SERVER_transmit_context_run (tc,
1037                                       GNUNET_TIME_UNIT_FOREVER_REL);
1038 }
1039
1040
1041 /**
1042  * Free the given entry for the neighbour (it has
1043  * already been removed from the list at this point).
1044  *
1045  * @param n neighbour to free
1046  */
1047 static void
1048 free_neighbour (struct Neighbour *n)
1049 {
1050   struct MessageEntry *m;
1051
1052   if (n->pitr != NULL)
1053     {
1054       GNUNET_PEERINFO_iterate_cancel (n->pitr);
1055       n->pitr = NULL;
1056     }
1057   if (n->skm != NULL)
1058     {
1059       GNUNET_free (n->skm);
1060       n->skm = NULL;
1061     }
1062   while (NULL != (m = n->messages))
1063     {
1064       n->messages = m->next;
1065       GNUNET_free (m);
1066     }
1067   while (NULL != (m = n->encrypted_head))
1068     {
1069       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1070                                    n->encrypted_tail,
1071                                    m);
1072       GNUNET_free (m);
1073     }
1074   if (NULL != n->th)
1075     {
1076       GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
1077       n->th = NULL;
1078     }
1079   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1080     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1081   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
1082     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
1083   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
1084     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
1085   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1086     GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1087   if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
1088     GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
1089   GNUNET_free_non_null (n->public_key);
1090   GNUNET_free_non_null (n->pending_ping);
1091   GNUNET_free_non_null (n->pending_pong);
1092   GNUNET_free (n);
1093 }
1094
1095
1096 /**
1097  * Check if we have encrypted messages for the specified neighbour
1098  * pending, and if so, check with the transport about sending them
1099  * out.
1100  *
1101  * @param n neighbour to check.
1102  */
1103 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1104
1105
1106 /**
1107  * Encrypt size bytes from in and write the result to out.  Use the
1108  * key for outbound traffic of the given neighbour.
1109  *
1110  * @param n neighbour we are sending to
1111  * @param iv initialization vector to use
1112  * @param in ciphertext
1113  * @param out plaintext
1114  * @param size size of in/out
1115  * @return GNUNET_OK on success
1116  */
1117 static int
1118 do_encrypt (struct Neighbour *n,
1119             const GNUNET_HashCode * iv,
1120             const void *in, void *out, size_t size)
1121 {
1122   if (size != (uint16_t) size)
1123     {
1124       GNUNET_break (0);
1125       return GNUNET_NO;
1126     }
1127   GNUNET_assert (size ==
1128                  GNUNET_CRYPTO_aes_encrypt (in,
1129                                             (uint16_t) size,
1130                                             &n->encrypt_key,
1131                                             (const struct
1132                                              GNUNET_CRYPTO_AesInitializationVector
1133                                              *) iv, out));
1134 #if DEBUG_CORE
1135   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136               "Encrypted %u bytes for `%4s' using key %u\n", size,
1137               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1138 #endif
1139   return GNUNET_OK;
1140 }
1141
1142
1143 /**
1144  * Consider freeing the given neighbour since we may not need
1145  * to keep it around anymore.
1146  *
1147  * @param n neighbour to consider discarding
1148  */
1149 static void
1150 consider_free_neighbour (struct Neighbour *n);
1151
1152
1153 /**
1154  * Task triggered when a neighbour entry is about to time out 
1155  * (and we should prevent this by sending a PING).
1156  *
1157  * @param cls the 'struct Neighbour'
1158  * @param tc scheduler context (not used)
1159  */
1160 static void
1161 send_keep_alive (void *cls,
1162                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1163 {
1164   struct Neighbour *n = cls;
1165   struct GNUNET_TIME_Relative retry;
1166   struct GNUNET_TIME_Relative left;
1167   struct MessageEntry *me;
1168   struct PingMessage pp;
1169   struct PingMessage *pm;
1170
1171   n->keep_alive_task = GNUNET_SCHEDULER_NO_TASK;
1172   /* send PING */
1173   me = GNUNET_malloc (sizeof (struct MessageEntry) +
1174                       sizeof (struct PingMessage));
1175   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
1176   me->priority = PING_PRIORITY;
1177   me->size = sizeof (struct PingMessage);
1178   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
1179                                      n->encrypted_tail,
1180                                      n->encrypted_tail,
1181                                      me);
1182   pm = (struct PingMessage *) &me[1];
1183   pm->header.size = htons (sizeof (struct PingMessage));
1184   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
1185   pp.challenge = htonl (n->ping_challenge);
1186   pp.target = n->peer;
1187 #if DEBUG_CORE
1188   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189               "Encrypting `%s' and `%s' messages for `%4s'.\n",
1190               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
1191   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
1193               "PING",
1194               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
1195 #endif
1196   do_encrypt (n,
1197               &n->peer.hashPubKey,
1198               &pp.challenge,
1199               &pm->challenge,
1200               sizeof (struct PingMessage) -
1201               sizeof (struct GNUNET_MessageHeader));
1202   process_encrypted_neighbour_queue (n);
1203   /* reschedule PING job */
1204   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1205                                                                        GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
1206   retry = GNUNET_TIME_relative_max (GNUNET_TIME_relative_divide (left, 2),
1207                                     MIN_PING_FREQUENCY);
1208   n->keep_alive_task 
1209     = GNUNET_SCHEDULER_add_delayed (sched, 
1210                                     retry,
1211                                     &send_keep_alive,
1212                                     n);
1213
1214 }
1215
1216
1217 /**
1218  * Task triggered when a neighbour entry might have gotten stale.
1219  *
1220  * @param cls the 'struct Neighbour'
1221  * @param tc scheduler context (not used)
1222  */
1223 static void
1224 consider_free_task (void *cls,
1225                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1226 {
1227   struct Neighbour *n = cls;
1228
1229   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
1230   consider_free_neighbour (n);
1231 }
1232
1233
1234 /**
1235  * Consider freeing the given neighbour since we may not need
1236  * to keep it around anymore.
1237  *
1238  * @param n neighbour to consider discarding
1239  */
1240 static void
1241 consider_free_neighbour (struct Neighbour *n)
1242
1243   struct Neighbour *pos;
1244   struct Neighbour *prev;
1245   struct GNUNET_TIME_Relative left;
1246
1247   if ( (n->th != NULL) ||
1248        (n->pitr != NULL) ||
1249        (n->status == PEER_STATE_KEY_CONFIRMED) ||
1250        (GNUNET_YES == n->is_connected) )
1251     return; /* no chance */
1252   
1253   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1254                                                                        GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
1255   if (left.value > 0)
1256     {
1257       if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1258         GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1259       n->dead_clean_task = GNUNET_SCHEDULER_add_delayed (sched,
1260                                                          left,
1261                                                          &consider_free_task,
1262                                                          n);
1263       return;
1264     }
1265   /* actually free the neighbour... */
1266   prev = NULL;
1267   pos = neighbours;
1268   while (pos != n)
1269     {
1270       prev = pos;
1271       pos = pos->next;
1272     }
1273   if (prev == NULL)
1274     neighbours = n->next;
1275   else
1276     prev->next = n->next;
1277   GNUNET_assert (neighbour_count > 0);
1278   neighbour_count--;
1279   free_neighbour (n);
1280 }
1281
1282
1283 /**
1284  * Function called when the transport service is ready to
1285  * receive an encrypted message for the respective peer
1286  *
1287  * @param cls neighbour to use message from
1288  * @param size number of bytes we can transmit
1289  * @param buf where to copy the message
1290  * @return number of bytes transmitted
1291  */
1292 static size_t
1293 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1294 {
1295   struct Neighbour *n = cls;
1296   struct MessageEntry *m;
1297   size_t ret;
1298   char *cbuf;
1299
1300   n->th = NULL;
1301   GNUNET_assert (NULL != (m = n->encrypted_head));
1302   GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1303                                n->encrypted_tail,
1304                                m);
1305   ret = 0;
1306   cbuf = buf;
1307   if (buf != NULL)
1308     {
1309       GNUNET_assert (size >= m->size);
1310       memcpy (cbuf, &m[1], m->size);
1311       ret = m->size;
1312       GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window,
1313                                         m->size);
1314 #if DEBUG_CORE
1315       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1316                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1317                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1318                   ret, GNUNET_i2s (&n->peer));
1319 #endif
1320       process_encrypted_neighbour_queue (n);
1321     }
1322   else
1323     {
1324 #if DEBUG_CORE
1325       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326                   "Transmission of message of type %u and size %u failed\n",
1327                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1328                   m->size);
1329 #endif
1330     }
1331   GNUNET_free (m);
1332   consider_free_neighbour (n);
1333   return ret;
1334 }
1335
1336
1337 /**
1338  * Check if we have plaintext messages for the specified neighbour
1339  * pending, and if so, consider batching and encrypting them (and
1340  * then trigger processing of the encrypted queue if needed).
1341  *
1342  * @param n neighbour to check.
1343  */
1344 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1345
1346
1347 /**
1348  * Check if we have encrypted messages for the specified neighbour
1349  * pending, and if so, check with the transport about sending them
1350  * out.
1351  *
1352  * @param n neighbour to check.
1353  */
1354 static void
1355 process_encrypted_neighbour_queue (struct Neighbour *n)
1356 {
1357   struct MessageEntry *m;
1358  
1359   if (n->th != NULL)
1360     return;  /* request already pending */
1361   m = n->encrypted_head;
1362   if (m == NULL)
1363     {
1364       /* encrypted queue empty, try plaintext instead */
1365       process_plaintext_neighbour_queue (n);
1366       return;
1367     }
1368 #if DEBUG_CORE
1369   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1370               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1371               m->size,
1372               GNUNET_i2s (&n->peer),
1373               GNUNET_TIME_absolute_get_remaining (m->deadline).
1374               value);
1375 #endif
1376   n->th =
1377     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1378                                             m->size,
1379                                             m->priority,
1380                                             GNUNET_TIME_absolute_get_remaining
1381                                             (m->deadline),
1382                                             &notify_encrypted_transmit_ready,
1383                                             n);
1384   if (n->th == NULL)
1385     {
1386       /* message request too large or duplicate request */
1387       GNUNET_break (0);
1388       /* discard encrypted message */
1389       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1390                                    n->encrypted_tail,
1391                                    m);
1392       GNUNET_free (m);
1393       process_encrypted_neighbour_queue (n);
1394     }
1395 }
1396
1397
1398 /**
1399  * Decrypt size bytes from in and write the result to out.  Use the
1400  * key for inbound traffic of the given neighbour.  This function does
1401  * NOT do any integrity-checks on the result.
1402  *
1403  * @param n neighbour we are receiving from
1404  * @param iv initialization vector to use
1405  * @param in ciphertext
1406  * @param out plaintext
1407  * @param size size of in/out
1408  * @return GNUNET_OK on success
1409  */
1410 static int
1411 do_decrypt (struct Neighbour *n,
1412             const GNUNET_HashCode * iv,
1413             const void *in, void *out, size_t size)
1414 {
1415   if (size != (uint16_t) size)
1416     {
1417       GNUNET_break (0);
1418       return GNUNET_NO;
1419     }
1420   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1421       (n->status != PEER_STATE_KEY_CONFIRMED))
1422     {
1423       GNUNET_break_op (0);
1424       return GNUNET_SYSERR;
1425     }
1426   if (size !=
1427       GNUNET_CRYPTO_aes_decrypt (in,
1428                                  (uint16_t) size,
1429                                  &n->decrypt_key,
1430                                  (const struct
1431                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1432                                  out))
1433     {
1434       GNUNET_break (0);
1435       return GNUNET_SYSERR;
1436     }
1437 #if DEBUG_CORE
1438   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1439               "Decrypted %u bytes from `%4s' using key %u\n",
1440               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1441 #endif
1442   return GNUNET_OK;
1443 }
1444
1445
1446 /**
1447  * Select messages for transmission.  This heuristic uses a combination
1448  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1449  * and priority-based discard (in case no feasible schedule exist) and
1450  * speculative optimization (defer any kind of transmission until
1451  * we either create a batch of significant size, 25% of max, or until
1452  * we are close to a deadline).  Furthermore, when scheduling the
1453  * heuristic also packs as many messages into the batch as possible,
1454  * starting with those with the earliest deadline.  Yes, this is fun.
1455  *
1456  * @param n neighbour to select messages from
1457  * @param size number of bytes to select for transmission
1458  * @param retry_time set to the time when we should try again
1459  *        (only valid if this function returns zero)
1460  * @return number of bytes selected, or 0 if we decided to
1461  *         defer scheduling overall; in that case, retry_time is set.
1462  */
1463 static size_t
1464 select_messages (struct Neighbour *n,
1465                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1466 {
1467   struct MessageEntry *pos;
1468   struct MessageEntry *min;
1469   struct MessageEntry *last;
1470   unsigned int min_prio;
1471   struct GNUNET_TIME_Absolute t;
1472   struct GNUNET_TIME_Absolute now;
1473   struct GNUNET_TIME_Relative delta;
1474   uint64_t avail;
1475   struct GNUNET_TIME_Relative slack;     /* how long could we wait before missing deadlines? */
1476   size_t off;
1477   uint64_t tsize;
1478   unsigned int queue_size;
1479   int discard_low_prio;
1480
1481   GNUNET_assert (NULL != n->messages);
1482   now = GNUNET_TIME_absolute_get ();
1483   /* last entry in linked list of messages processed */
1484   last = NULL;
1485   /* should we remove the entry with the lowest
1486      priority from consideration for scheduling at the
1487      end of the loop? */
1488   queue_size = 0;
1489   tsize = 0;
1490   pos = n->messages;
1491   while (pos != NULL)
1492     {
1493       queue_size++;
1494       tsize += pos->size;
1495       pos = pos->next;
1496     }
1497   discard_low_prio = GNUNET_YES;
1498   while (GNUNET_YES == discard_low_prio)
1499     {
1500       min = NULL;
1501       min_prio = -1;
1502       discard_low_prio = GNUNET_NO;
1503       /* calculate number of bytes available for transmission at time "t" */
1504       avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
1505       t = now;
1506       /* how many bytes have we (hypothetically) scheduled so far */
1507       off = 0;
1508       /* maximum time we can wait before transmitting anything
1509          and still make all of our deadlines */
1510       slack = MAX_CORK_DELAY;
1511       pos = n->messages;
1512       /* note that we use "*2" here because we want to look
1513          a bit further into the future; much more makes no
1514          sense since new message might be scheduled in the
1515          meantime... */
1516       while ((pos != NULL) && (off < size * 2))
1517         {         
1518           if (pos->do_transmit == GNUNET_YES)
1519             {
1520               /* already removed from consideration */
1521               pos = pos->next;
1522               continue;
1523             }
1524           if (discard_low_prio == GNUNET_NO)
1525             {
1526               delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
1527               if (delta.value > 0)
1528                 {
1529                   // FIXME: HUH? Check!
1530                   t = pos->deadline;
1531                   avail += GNUNET_BANDWIDTH_value_get_available_until (n->bw_out,
1532                                                                        delta);
1533                 }
1534               if (avail < pos->size)
1535                 {
1536                   // FIXME: HUH? Check!
1537                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1538                 }
1539               else
1540                 {
1541                   avail -= pos->size;
1542                   /* update slack, considering both its absolute deadline
1543                      and relative deadlines caused by other messages
1544                      with their respective load */
1545                   slack = GNUNET_TIME_relative_min (slack,
1546                                                     GNUNET_BANDWIDTH_value_get_delay_for (n->bw_out,
1547                                                                                           avail));
1548                   if (pos->deadline.value <= now.value) 
1549                     {
1550                       /* now or never */
1551                       slack = GNUNET_TIME_UNIT_ZERO;
1552                     }
1553                   else if (GNUNET_YES == pos->got_slack)
1554                     {
1555                       /* should be soon now! */
1556                       slack = GNUNET_TIME_relative_min (slack,
1557                                                         GNUNET_TIME_absolute_get_remaining (pos->slack_deadline));
1558                     }
1559                   else
1560                     {
1561                       slack =
1562                         GNUNET_TIME_relative_min (slack, 
1563                                                   GNUNET_TIME_absolute_get_difference (now, pos->deadline));
1564                       pos->got_slack = GNUNET_YES;
1565                       pos->slack_deadline = GNUNET_TIME_absolute_min (pos->deadline,
1566                                                                       GNUNET_TIME_relative_to_absolute (MAX_CORK_DELAY));
1567                     }
1568                 }
1569             }
1570           off += pos->size;
1571           t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check!
1572           if (pos->priority <= min_prio)
1573             {
1574               /* update min for discard */
1575               min_prio = pos->priority;
1576               min = pos;
1577             }
1578           pos = pos->next;
1579         }
1580       if (discard_low_prio)
1581         {
1582           GNUNET_assert (min != NULL);
1583           /* remove lowest-priority entry from consideration */
1584           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1585         }
1586       last = pos;
1587     }
1588   /* guard against sending "tiny" messages with large headers without
1589      urgent deadlines */
1590   if ( (slack.value > 0) && 
1591        (size > 4 * off) &&
1592        (queue_size <= MAX_PEER_QUEUE_SIZE - 2) )
1593     {
1594       /* less than 25% of message would be filled with deadlines still
1595          being met if we delay by one second or more; so just wait for
1596          more data; but do not wait longer than 1s (since we don't want
1597          to delay messages for a really long time either). */
1598       *retry_time = MAX_CORK_DELAY;
1599       /* reset do_transmit values for next time */
1600       while (pos != last)
1601         {
1602           pos->do_transmit = GNUNET_NO;   
1603           pos = pos->next;
1604         }
1605 #if DEBUG_CORE
1606       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1607                   "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
1608                   (unsigned long long) slack.value,
1609                   (unsigned int) off,
1610                   (unsigned int) size);
1611 #endif
1612       return 0;
1613     }
1614   /* select marked messages (up to size) for transmission */
1615   off = 0;
1616   pos = n->messages;
1617   while (pos != last)
1618     {
1619       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1620         {
1621           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1622           off += pos->size;
1623           size -= pos->size;
1624         }
1625       else
1626         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1627       pos = pos->next;
1628     }
1629 #if DEBUG_CORE
1630   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1631               "Selected %u/%u bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
1632               off, tsize,
1633               queue_size, MAX_PEER_QUEUE_SIZE,
1634               GNUNET_i2s (&n->peer));
1635 #endif
1636   return off;
1637 }
1638
1639
1640 /**
1641  * Batch multiple messages into a larger buffer.
1642  *
1643  * @param n neighbour to take messages from
1644  * @param buf target buffer
1645  * @param size size of buf
1646  * @param deadline set to transmission deadline for the result
1647  * @param retry_time set to the time when we should try again
1648  *        (only valid if this function returns zero)
1649  * @param priority set to the priority of the batch
1650  * @return number of bytes written to buf (can be zero)
1651  */
1652 static size_t
1653 batch_message (struct Neighbour *n,
1654                char *buf,
1655                size_t size,
1656                struct GNUNET_TIME_Absolute *deadline,
1657                struct GNUNET_TIME_Relative *retry_time,
1658                unsigned int *priority)
1659 {
1660   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1661   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
1662   struct MessageEntry *pos;
1663   struct MessageEntry *prev;
1664   struct MessageEntry *next;
1665   size_t ret;
1666   
1667   ret = 0;
1668   *priority = 0;
1669   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1670   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1671   if (0 == select_messages (n, size, retry_time))
1672     {
1673 #if DEBUG_CORE
1674       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1675                   "No messages selected, will try again in %llu ms\n",
1676                   retry_time->value);
1677 #endif
1678       return 0;
1679     }
1680   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
1681   ntm->distance = htonl (n->last_distance);
1682   ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
1683   ntm->peer = n->peer;
1684   
1685   pos = n->messages;
1686   prev = NULL;
1687   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1688     {
1689       next = pos->next;
1690       if (GNUNET_YES == pos->do_transmit)
1691         {
1692           GNUNET_assert (pos->size <= size);
1693           /* do notifications */
1694           /* FIXME: track if we have *any* client that wants
1695              full notifications and only do this if that is
1696              actually true */
1697           if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
1698             {
1699               memcpy (&ntm[1], &pos[1], pos->size);
1700               ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1701                                         sizeof (struct GNUNET_MessageHeader));
1702               send_to_all_clients (&ntm->header,
1703                                    GNUNET_YES,
1704                                    GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
1705             }
1706           else
1707             {
1708               /* message too large for 'full' notifications, we do at
1709                  least the 'hdr' type */
1710               memcpy (&ntm[1],
1711                       &pos[1],
1712                       sizeof (struct GNUNET_MessageHeader));
1713             }
1714           ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1715                                     pos->size);
1716           send_to_all_clients (&ntm->header,
1717                                GNUNET_YES,
1718                                GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);   
1719 #if DEBUG_HANDSHAKE
1720           fprintf (stderr,
1721                    "Encrypting message of type %u\n",
1722                    ntohs(((struct GNUNET_MessageHeader*)&pos[1])->type));
1723 #endif
1724           /* copy for encrypted transmission */
1725           memcpy (&buf[ret], &pos[1], pos->size);
1726           ret += pos->size;
1727           size -= pos->size;
1728           *priority += pos->priority;
1729 #if DEBUG_CORE
1730           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1731                       "Adding plaintext message of size %u with deadline %llu ms to batch\n",
1732                       pos->size,
1733                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1734 #endif
1735           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1736           GNUNET_free (pos);
1737           if (prev == NULL)
1738             n->messages = next;
1739           else
1740             prev->next = next;
1741         }
1742       else
1743         {
1744           prev = pos;
1745         }
1746       pos = next;
1747     }
1748 #if DEBUG_CORE
1749   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1750               "Deadline for message batch is %llu ms\n",
1751               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1752 #endif
1753   return ret;
1754 }
1755
1756
1757 /**
1758  * Remove messages with deadlines that have long expired from
1759  * the queue.
1760  *
1761  * @param n neighbour to inspect
1762  */
1763 static void
1764 discard_expired_messages (struct Neighbour *n)
1765 {
1766   struct MessageEntry *prev;
1767   struct MessageEntry *next;
1768   struct MessageEntry *pos;
1769   struct GNUNET_TIME_Absolute now;
1770   struct GNUNET_TIME_Relative delta;
1771
1772   now = GNUNET_TIME_absolute_get ();
1773   prev = NULL;
1774   pos = n->messages;
1775   while (pos != NULL) 
1776     {
1777       next = pos->next;
1778       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1779       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1780         {
1781 #if DEBUG_CORE
1782           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1783                       "Message is %llu ms past due, discarding.\n",
1784                       delta.value);
1785 #endif
1786           if (prev == NULL)
1787             n->messages = next;
1788           else
1789             prev->next = next;
1790           GNUNET_free (pos);
1791         }
1792       else
1793         prev = pos;
1794       pos = next;
1795     }
1796 }
1797
1798
1799 /**
1800  * Signature of the main function of a task.
1801  *
1802  * @param cls closure
1803  * @param tc context information (why was this task triggered now)
1804  */
1805 static void
1806 retry_plaintext_processing (void *cls,
1807                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1808 {
1809   struct Neighbour *n = cls;
1810
1811   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1812   process_plaintext_neighbour_queue (n);
1813 }
1814
1815
1816 /**
1817  * Send our key (and encrypted PING) to the other peer.
1818  *
1819  * @param n the other peer
1820  */
1821 static void send_key (struct Neighbour *n);
1822
1823 /**
1824  * Task that will retry "send_key" if our previous attempt failed
1825  * to yield a PONG.
1826  */
1827 static void
1828 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1829 {
1830   struct Neighbour *n = cls;
1831
1832 #if DEBUG_CORE
1833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1834               "Retrying key transmission to `%4s'\n",
1835               GNUNET_i2s (&n->peer));
1836 #endif
1837   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
1838   n->set_key_retry_frequency =
1839     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
1840   send_key (n);
1841 }
1842
1843
1844 /**
1845  * Check if we have plaintext messages for the specified neighbour
1846  * pending, and if so, consider batching and encrypting them (and
1847  * then trigger processing of the encrypted queue if needed).
1848  *
1849  * @param n neighbour to check.
1850  */
1851 static void
1852 process_plaintext_neighbour_queue (struct Neighbour *n)
1853 {
1854   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1855   size_t used;
1856   size_t esize;
1857   struct EncryptedMessage *em;  /* encrypted message */
1858   struct EncryptedMessage *ph;  /* plaintext header */
1859   struct MessageEntry *me;
1860   unsigned int priority;
1861   struct GNUNET_TIME_Absolute deadline;
1862   struct GNUNET_TIME_Relative retry_time;
1863   GNUNET_HashCode iv;
1864
1865   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1866     {
1867       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1868       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1869     }
1870   switch (n->status)
1871     {
1872     case PEER_STATE_DOWN:
1873       send_key (n);
1874 #if DEBUG_CORE
1875       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1876                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1877                   GNUNET_i2s(&n->peer));
1878 #endif
1879       return;
1880     case PEER_STATE_KEY_SENT:
1881       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
1882         n->retry_set_key_task
1883           = GNUNET_SCHEDULER_add_delayed (sched,
1884                                           n->set_key_retry_frequency,
1885                                           &set_key_retry_task, n);    
1886 #if DEBUG_CORE
1887       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1888                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1889                   GNUNET_i2s(&n->peer));
1890 #endif
1891       return;
1892     case PEER_STATE_KEY_RECEIVED:
1893       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)        
1894         n->retry_set_key_task
1895           = GNUNET_SCHEDULER_add_delayed (sched,
1896                                           n->set_key_retry_frequency,
1897                                           &set_key_retry_task, n);        
1898 #if DEBUG_CORE
1899       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1900                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1901                   GNUNET_i2s(&n->peer));
1902 #endif
1903       return;
1904     case PEER_STATE_KEY_CONFIRMED:
1905       /* ready to continue */
1906       break;
1907     }
1908   discard_expired_messages (n);
1909   if (n->messages == NULL)
1910     {
1911 #if DEBUG_CORE
1912       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1913                   "Plaintext message queue for `%4s' is empty.\n",
1914                   GNUNET_i2s(&n->peer));
1915 #endif
1916       return;                   /* no pending messages */
1917     }
1918   if (n->encrypted_head != NULL)
1919     {
1920 #if DEBUG_CORE
1921       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1922                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1923                   GNUNET_i2s(&n->peer));
1924 #endif
1925       return;                   /* wait for messages already encrypted to be
1926                                    processed first! */
1927     }
1928   ph = (struct EncryptedMessage *) pbuf;
1929   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1930   priority = 0;
1931   used = sizeof (struct EncryptedMessage);
1932   used += batch_message (n,
1933                          &pbuf[used],
1934                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1935                          &deadline, &retry_time, &priority);
1936   if (used == sizeof (struct EncryptedMessage))
1937     {
1938 #if DEBUG_CORE
1939       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1940                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1941                   GNUNET_i2s(&n->peer));
1942 #endif
1943       /* no messages selected for sending, try again later... */
1944       n->retry_plaintext_task =
1945         GNUNET_SCHEDULER_add_delayed (sched,
1946                                       retry_time,
1947                                       &retry_plaintext_processing, n);
1948       return;
1949     }
1950 #if DEBUG_CORE_QUOTA
1951   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1952               "Sending %u b/s as new limit to peer `%4s'\n",
1953               (unsigned int) ntohl (n->bw_in.value__),
1954               GNUNET_i2s (&n->peer));
1955 #endif
1956   ph->iv_seed = htonl (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1));
1957   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1958   ph->inbound_bw_limit = n->bw_in;
1959   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1960
1961   /* setup encryption message header */
1962   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1963   me->deadline = deadline;
1964   me->priority = priority;
1965   me->size = used;
1966   em = (struct EncryptedMessage *) &me[1];
1967   em->header.size = htons (used);
1968   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1969   em->iv_seed = ph->iv_seed;
1970   esize = used - ENCRYPTED_HEADER_SIZE;
1971   GNUNET_CRYPTO_hash (&ph->sequence_number,
1972                       esize - sizeof (GNUNET_HashCode), 
1973                       &ph->plaintext_hash);
1974   GNUNET_CRYPTO_hash (&ph->iv_seed, sizeof (uint32_t), &iv);
1975   /* encrypt */
1976 #if DEBUG_CORE
1977   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1978               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1979               esize,
1980               GNUNET_i2s(&n->peer),
1981               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1982 #endif
1983   GNUNET_assert (GNUNET_OK ==
1984                  do_encrypt (n,
1985                              &iv,
1986                              &ph->plaintext_hash,
1987                              &em->plaintext_hash, esize));
1988   /* append to transmission list */
1989   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
1990                                      n->encrypted_tail,
1991                                      n->encrypted_tail,
1992                                      me);
1993   process_encrypted_neighbour_queue (n);
1994 }
1995
1996
1997 /**
1998  * Function that recalculates the bandwidth quota for the
1999  * given neighbour and transmits it to the transport service.
2000  * 
2001  * @param cls neighbour for the quota update
2002  * @param tc context
2003  */
2004 static void
2005 neighbour_quota_update (void *cls,
2006                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2007
2008
2009 /**
2010  * Schedule the task that will recalculate the bandwidth
2011  * quota for this peer (and possibly force a disconnect of
2012  * idle peers by calculating a bandwidth of zero).
2013  */
2014 static void
2015 schedule_quota_update (struct Neighbour *n)
2016 {
2017   GNUNET_assert (n->quota_update_task ==
2018                  GNUNET_SCHEDULER_NO_TASK);
2019   n->quota_update_task
2020     = GNUNET_SCHEDULER_add_delayed (sched,
2021                                     QUOTA_UPDATE_FREQUENCY,
2022                                     &neighbour_quota_update,
2023                                     n);
2024 }
2025
2026
2027 /**
2028  * Initialize a new 'struct Neighbour'.
2029  *
2030  * @param pid ID of the new neighbour
2031  * @return handle for the new neighbour
2032  */
2033 static struct Neighbour *
2034 create_neighbour (const struct GNUNET_PeerIdentity *pid)
2035 {
2036   struct Neighbour *n;
2037   struct GNUNET_TIME_Absolute now;
2038
2039   n = GNUNET_malloc (sizeof (struct Neighbour));
2040   n->next = neighbours;
2041   neighbours = n;
2042   neighbour_count++;
2043   n->peer = *pid;
2044   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2045   now = GNUNET_TIME_absolute_get ();
2046   n->encrypt_key_created = now;
2047   n->last_activity = now;
2048   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2049   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2050   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2051   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init ((uint32_t) - 1);
2052   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2053   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2054                                                 (uint32_t) - 1);
2055   neighbour_quota_update (n, NULL);
2056   return n;
2057 }
2058
2059
2060 /**
2061  * Handle CORE_SEND request.
2062  *
2063  * @param cls unused
2064  * @param client the client issuing the request
2065  * @param message the "struct SendMessage"
2066  */
2067 static void
2068 handle_client_send (void *cls,
2069                     struct GNUNET_SERVER_Client *client,
2070                     const struct GNUNET_MessageHeader *message)
2071 {
2072   const struct SendMessage *sm;
2073   struct Neighbour *n;
2074   struct MessageEntry *prev;
2075   struct MessageEntry *pos;
2076   struct MessageEntry *e; 
2077   struct MessageEntry *min_prio_entry;
2078   struct MessageEntry *min_prio_prev;
2079   unsigned int min_prio;
2080   unsigned int queue_size;
2081   uint16_t msize;
2082
2083   msize = ntohs (message->size);
2084   if (msize <
2085       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
2086     {
2087       GNUNET_break (0);
2088       if (client != NULL)
2089         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2090       return;
2091     }
2092   sm = (const struct SendMessage *) message;
2093   msize -= sizeof (struct SendMessage);
2094   n = find_neighbour (&sm->peer);
2095   if (n == NULL)
2096     n = create_neighbour (&sm->peer);
2097 #if DEBUG_CORE
2098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2099               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
2100               "SEND",
2101               msize, 
2102               GNUNET_i2s (&sm->peer));
2103 #endif
2104   /* bound queue size */
2105   discard_expired_messages (n);
2106   min_prio = (unsigned int) -1;
2107   min_prio_entry = NULL;
2108   min_prio_prev = NULL;
2109   queue_size = 0;
2110   prev = NULL;
2111   pos = n->messages;
2112   while (pos != NULL) 
2113     {
2114       if (pos->priority < min_prio)
2115         {
2116           min_prio_entry = pos;
2117           min_prio_prev = prev;
2118           min_prio = pos->priority;
2119         }
2120       queue_size++;
2121       prev = pos;
2122       pos = pos->next;
2123     }
2124   if (queue_size >= MAX_PEER_QUEUE_SIZE)
2125     {
2126       /* queue full */
2127       if (ntohl(sm->priority) <= min_prio)
2128         {
2129           /* discard new entry */
2130 #if DEBUG_CORE
2131           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2132                       "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
2133                       queue_size,
2134                       MAX_PEER_QUEUE_SIZE,
2135                       msize,
2136                       ntohs (message->type));
2137 #endif
2138           if (client != NULL)
2139             GNUNET_SERVER_receive_done (client, GNUNET_OK);
2140           return;
2141         }
2142       /* discard "min_prio_entry" */
2143 #if DEBUG_CORE
2144       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2145                   "Queue full, discarding existing older request\n");
2146 #endif
2147       if (min_prio_prev == NULL)
2148         n->messages = min_prio_entry->next;
2149       else
2150         min_prio_prev->next = min_prio_entry->next;      
2151       GNUNET_free (min_prio_entry);     
2152     }
2153
2154 #if DEBUG_CORE
2155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2156               "Adding transmission request for `%4s' of size %u to queue\n",
2157               GNUNET_i2s (&sm->peer),
2158               msize);
2159 #endif  
2160   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
2161   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
2162   e->priority = ntohl (sm->priority);
2163   e->size = msize;
2164   memcpy (&e[1], &sm[1], msize);
2165
2166   /* insert, keep list sorted by deadline */
2167   prev = NULL;
2168   pos = n->messages;
2169   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
2170     {
2171       prev = pos;
2172       pos = pos->next;
2173     }
2174   if (prev == NULL)
2175     n->messages = e;
2176   else
2177     prev->next = e;
2178   e->next = pos;
2179
2180   /* consider scheduling now */
2181   process_plaintext_neighbour_queue (n);
2182   if (client != NULL)
2183     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2184 }
2185
2186
2187 /**
2188  * Function called when the transport service is ready to
2189  * receive a message.  Only resets 'n->th' to NULL.
2190  *
2191  * @param cls neighbour to use message from
2192  * @param size number of bytes we can transmit
2193  * @param buf where to copy the message
2194  * @return number of bytes transmitted
2195  */
2196 static size_t
2197 notify_transport_connect_done (void *cls, size_t size, void *buf)
2198 {
2199   struct Neighbour *n = cls;
2200
2201   n->th = NULL;
2202   if (buf == NULL)
2203     {
2204       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2205                   _("Failed to connect to `%4s': transport failed to connect\n"),
2206                   GNUNET_i2s (&n->peer));
2207       return 0;
2208     }
2209   send_key (n);
2210   return 0;
2211 }
2212
2213
2214 /**
2215  * Handle CORE_REQUEST_CONNECT request.
2216  *
2217  * @param cls unused
2218  * @param client the client issuing the request
2219  * @param message the "struct ConnectMessage"
2220  */
2221 static void
2222 handle_client_request_connect (void *cls,
2223                                struct GNUNET_SERVER_Client *client,
2224                                const struct GNUNET_MessageHeader *message)
2225 {
2226   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2227   struct Neighbour *n;
2228   struct GNUNET_TIME_Relative timeout;
2229
2230   if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2231     {
2232       GNUNET_break (0);
2233       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2234       return;
2235     }
2236   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2237   n = find_neighbour (&cm->peer);
2238   if (n == NULL)
2239     n = create_neighbour (&cm->peer);
2240   if ( (n->is_connected) ||
2241        (n->th != NULL) )
2242     return; /* already connected, or at least trying */
2243 #if DEBUG_CORE
2244   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2245               "Core received `%s' request for `%4s', will try to establish connection\n",
2246               "REQUEST_CONNECT",
2247               GNUNET_i2s (&cm->peer));
2248 #endif
2249   timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
2250   /* ask transport to connect to the peer */
2251   n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
2252                                                   &cm->peer,
2253                                                   sizeof (struct GNUNET_MessageHeader), 0,
2254                                                   timeout,
2255                                                   &notify_transport_connect_done,
2256                                                   n);
2257   GNUNET_break (NULL != n->th);
2258 }
2259
2260
2261 /**
2262  * List of handlers for the messages understood by this
2263  * service.
2264  */
2265 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2266   {&handle_client_init, NULL,
2267    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2268   {&handle_client_request_info, NULL,
2269    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2270    sizeof (struct RequestInfoMessage)},
2271   {&handle_client_send, NULL,
2272    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2273   {&handle_client_request_connect, NULL,
2274    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2275    sizeof (struct ConnectMessage)},
2276   {NULL, NULL, 0, 0}
2277 };
2278
2279
2280 /**
2281  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2282  * the neighbour's struct and retry send_key.  Or, if we did not get a
2283  * HELLO, just do nothing.
2284  *
2285  * @param cls the 'struct Neighbour' to retry sending the key for
2286  * @param peer the peer for which this is the HELLO
2287  * @param hello HELLO message of that peer
2288  * @param trust amount of trust we currently have in that peer
2289  */
2290 static void
2291 process_hello_retry_send_key (void *cls,
2292                               const struct GNUNET_PeerIdentity *peer,
2293                               const struct GNUNET_HELLO_Message *hello,
2294                               uint32_t trust)
2295 {
2296   struct Neighbour *n = cls;
2297
2298   if (peer == NULL)
2299     {
2300 #if DEBUG_CORE
2301       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2302                   "Entered `process_hello_retry_send_key' and `peer' is NULL!\n");
2303 #endif
2304       n->pitr = NULL;
2305       if (n->public_key != NULL)
2306         {
2307           send_key (n);
2308         }
2309       else
2310         {
2311           if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
2312             n->retry_set_key_task
2313               = GNUNET_SCHEDULER_add_delayed (sched,
2314                                               n->set_key_retry_frequency,
2315                                               &set_key_retry_task, n);
2316         }
2317       return;
2318     }
2319
2320 #if DEBUG_CORE
2321   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2322               "Entered `process_hello_retry_send_key' for peer `%4s'\n",
2323               GNUNET_i2s (peer));
2324 #endif
2325   if (n->public_key != NULL)
2326     {
2327 #if DEBUG_CORE
2328       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2329               "already have public key for peer %s!! (so why are we here?)\n",
2330               GNUNET_i2s (peer));
2331 #endif
2332       return;
2333     }
2334
2335 #if DEBUG_CORE
2336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2337               "Received new `%s' message for `%4s', initiating key exchange.\n",
2338               "HELLO",
2339               GNUNET_i2s (peer));
2340 #endif
2341   n->public_key =
2342     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2343   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2344     {
2345       GNUNET_free (n->public_key);
2346       n->public_key = NULL;
2347 #if DEBUG_CORE
2348   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2349               "GNUNET_HELLO_get_key returned awfully\n");
2350 #endif
2351       return;
2352     }
2353 }
2354
2355
2356 /**
2357  * Send our key (and encrypted PING) to the other peer.
2358  *
2359  * @param n the other peer
2360  */
2361 static void
2362 send_key (struct Neighbour *n)
2363 {
2364   struct SetKeyMessage *sm;
2365   struct MessageEntry *me;
2366   struct PingMessage pp;
2367   struct PingMessage *pm;
2368
2369   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2370        (n->pitr != NULL) )
2371     {
2372 #if DEBUG_CORE
2373       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2374                   "Key exchange in progress with `%4s'.\n",
2375                   GNUNET_i2s (&n->peer));
2376 #endif
2377       return; /* already in progress */
2378     }
2379
2380 #if DEBUG_CORE
2381   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2382               "Asked to perform key exchange with `%4s'.\n",
2383               GNUNET_i2s (&n->peer));
2384 #endif
2385   if (n->public_key == NULL)
2386     {
2387       /* lookup n's public key, then try again */
2388 #if DEBUG_CORE
2389       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2390                   "Lacking public key for `%4s', trying to obtain one (send_key).\n",
2391                   GNUNET_i2s (&n->peer));
2392 #endif
2393       GNUNET_assert (n->pitr == NULL);
2394       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2395                                          sched,
2396                                          &n->peer,
2397                                          0,
2398                                          GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20),
2399                                          &process_hello_retry_send_key, n);
2400       return;
2401     }
2402   /* first, set key message */
2403   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2404                       sizeof (struct SetKeyMessage));
2405   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2406   me->priority = SET_KEY_PRIORITY;
2407   me->size = sizeof (struct SetKeyMessage);
2408   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2409                                      n->encrypted_tail,
2410                                      n->encrypted_tail,
2411                                      me);
2412   sm = (struct SetKeyMessage *) &me[1];
2413   sm->header.size = htons (sizeof (struct SetKeyMessage));
2414   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2415   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2416                                         PEER_STATE_KEY_SENT : n->status));
2417   sm->purpose.size =
2418     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2419            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2420            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2421            sizeof (struct GNUNET_PeerIdentity));
2422   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2423   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2424   sm->target = n->peer;
2425   GNUNET_assert (GNUNET_OK ==
2426                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2427                                             sizeof (struct
2428                                                     GNUNET_CRYPTO_AesSessionKey),
2429                                             n->public_key,
2430                                             &sm->encrypted_key));
2431   GNUNET_assert (GNUNET_OK ==
2432                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2433                                          &sm->signature));
2434
2435   /* second, encrypted PING message */
2436   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2437                       sizeof (struct PingMessage));
2438   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2439   me->priority = PING_PRIORITY;
2440   me->size = sizeof (struct PingMessage);
2441   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2442                                      n->encrypted_tail,
2443                                      n->encrypted_tail,
2444                                      me);
2445   pm = (struct PingMessage *) &me[1];
2446   pm->header.size = htons (sizeof (struct PingMessage));
2447   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2448   pp.challenge = htonl (n->ping_challenge);
2449   pp.target = n->peer;
2450 #if DEBUG_CORE
2451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2452               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2453               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2455               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2456               "PING",
2457               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2458 #endif
2459   do_encrypt (n,
2460               &n->peer.hashPubKey,
2461               &pp.challenge,
2462               &pm->challenge,
2463               sizeof (struct PingMessage) -
2464               sizeof (struct GNUNET_MessageHeader));
2465   /* update status */
2466   switch (n->status)
2467     {
2468     case PEER_STATE_DOWN:
2469       n->status = PEER_STATE_KEY_SENT;
2470       break;
2471     case PEER_STATE_KEY_SENT:
2472       break;
2473     case PEER_STATE_KEY_RECEIVED:
2474       break;
2475     case PEER_STATE_KEY_CONFIRMED:
2476       break;
2477     default:
2478       GNUNET_break (0);
2479       break;
2480     }
2481 #if DEBUG_CORE
2482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2483               "Have %llu ms left for `%s' transmission.\n",
2484               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2485               "SET_KEY");
2486 #endif
2487   /* trigger queue processing */
2488   process_encrypted_neighbour_queue (n);
2489   if ( (n->status != PEER_STATE_KEY_CONFIRMED) &&
2490        (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task) )
2491     n->retry_set_key_task
2492       = GNUNET_SCHEDULER_add_delayed (sched,
2493                                       n->set_key_retry_frequency,
2494                                       &set_key_retry_task, n);    
2495 }
2496
2497
2498 /**
2499  * We received a SET_KEY message.  Validate and update
2500  * our key material and status.
2501  *
2502  * @param n the neighbour from which we received message m
2503  * @param m the set key message we received
2504  */
2505 static void
2506 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2507
2508
2509 /**
2510  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2511  * the neighbour's struct and retry handling the set_key message.  Or,
2512  * if we did not get a HELLO, just free the set key message.
2513  *
2514  * @param cls pointer to the set key message
2515  * @param peer the peer for which this is the HELLO
2516  * @param hello HELLO message of that peer
2517  * @param trust amount of trust we currently have in that peer
2518  */
2519 static void
2520 process_hello_retry_handle_set_key (void *cls,
2521                                     const struct GNUNET_PeerIdentity *peer,
2522                                     const struct GNUNET_HELLO_Message *hello,
2523                                     uint32_t trust)
2524 {
2525   struct Neighbour *n = cls;
2526   struct SetKeyMessage *sm = n->skm;
2527
2528   if (peer == NULL)
2529     {
2530       GNUNET_free (sm);
2531       n->skm = NULL;
2532       n->pitr = NULL;
2533       return;
2534     }
2535   if (n->public_key != NULL)
2536     return;                     /* multiple HELLOs match!? */
2537   n->public_key =
2538     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2539   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2540     {
2541       GNUNET_break_op (0);
2542       GNUNET_free (n->public_key);
2543       n->public_key = NULL;
2544       return;
2545     }
2546 #if DEBUG_CORE
2547   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2548               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2549               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2550 #endif
2551   handle_set_key (n, sm);
2552 }
2553
2554
2555 /**
2556  * We received a PING message.  Validate and transmit
2557  * PONG.
2558  *
2559  * @param n sender of the PING
2560  * @param m the encrypted PING message itself
2561  */
2562 static void
2563 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2564 {
2565   struct PingMessage t;
2566   struct PongMessage tx;
2567   struct PongMessage *tp;
2568   struct MessageEntry *me;
2569
2570 #if DEBUG_CORE
2571   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2572               "Core service receives `%s' request from `%4s'.\n",
2573               "PING", GNUNET_i2s (&n->peer));
2574 #endif
2575   if (GNUNET_OK !=
2576       do_decrypt (n,
2577                   &my_identity.hashPubKey,
2578                   &m->challenge,
2579                   &t.challenge,
2580                   sizeof (struct PingMessage) -
2581                   sizeof (struct GNUNET_MessageHeader)))
2582     return;
2583 #if DEBUG_CORE
2584   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2585               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2586               "PING",
2587               GNUNET_i2s (&t.target),
2588               ntohl (t.challenge), n->decrypt_key.crc32);
2589   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2590               "Target of `%s' request is `%4s'.\n",
2591               "PING", GNUNET_i2s (&t.target));
2592 #endif
2593   if (0 != memcmp (&t.target,
2594                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2595     {
2596       GNUNET_break_op (0);
2597       return;
2598     }
2599   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2600                       sizeof (struct PongMessage));
2601   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2602                                      n->encrypted_tail,
2603                                      n->encrypted_tail,
2604                                      me);
2605   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2606   me->priority = PONG_PRIORITY;
2607   me->size = sizeof (struct PongMessage);
2608   tx.reserved = htonl (0);
2609   tx.inbound_bw_limit = n->bw_in;
2610   tx.challenge = t.challenge;
2611   tx.target = t.target;
2612   tp = (struct PongMessage *) &me[1];
2613   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2614   tp->header.size = htons (sizeof (struct PongMessage));
2615   do_encrypt (n,
2616               &my_identity.hashPubKey,
2617               &tx.challenge,
2618               &tp->challenge,
2619               sizeof (struct PongMessage) -
2620               sizeof (struct GNUNET_MessageHeader));
2621 #if DEBUG_CORE
2622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2623               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2624               ntohl (t.challenge), n->encrypt_key.crc32);
2625 #endif
2626   /* trigger queue processing */
2627   process_encrypted_neighbour_queue (n);
2628 }
2629
2630
2631 /**
2632  * We received a PONG message.  Validate and update our status.
2633  *
2634  * @param n sender of the PONG
2635  * @param m the encrypted PONG message itself
2636  */
2637 static void
2638 handle_pong (struct Neighbour *n, 
2639              const struct PongMessage *m)
2640 {
2641   struct PongMessage t;
2642   struct ConnectNotifyMessage cnm;
2643
2644 #if DEBUG_CORE
2645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2646               "Core service receives `%s' request from `%4s'.\n",
2647               "PONG", GNUNET_i2s (&n->peer));
2648 #endif
2649   if (GNUNET_OK !=
2650       do_decrypt (n,
2651                   &n->peer.hashPubKey,
2652                   &m->challenge,
2653                   &t.challenge,
2654                   sizeof (struct PongMessage) -
2655                   sizeof (struct GNUNET_MessageHeader)))
2656     return;
2657   if (0 != ntohl (t.reserved))
2658     {
2659       GNUNET_break_op (0);
2660       return;
2661     }
2662 #if DEBUG_CORE
2663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2664               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2665               "PONG",
2666               GNUNET_i2s (&t.target),
2667               ntohl (t.challenge), n->decrypt_key.crc32);
2668 #endif
2669   if ((0 != memcmp (&t.target,
2670                     &n->peer,
2671                     sizeof (struct GNUNET_PeerIdentity))) ||
2672       (n->ping_challenge != ntohl (t.challenge)))
2673     {
2674       /* PONG malformed */
2675 #if DEBUG_CORE
2676       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2677                   "Received malformed `%s' wanted sender `%4s' with challenge %u\n",
2678                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2679       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2680                   "Received malformed `%s' received from `%4s' with challenge %u\n",
2681                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2682 #endif
2683       GNUNET_break_op (0);
2684       return;
2685     }
2686   switch (n->status)
2687     {
2688     case PEER_STATE_DOWN:
2689       GNUNET_break (0);         /* should be impossible */
2690       return;
2691     case PEER_STATE_KEY_SENT:
2692       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2693       return;
2694     case PEER_STATE_KEY_RECEIVED:
2695       n->status = PEER_STATE_KEY_CONFIRMED;
2696       if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
2697         {
2698           n->bw_out_external_limit = t.inbound_bw_limit;
2699           n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
2700                                                   n->bw_out_internal_limit);
2701           GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
2702                                                  n->bw_out);       
2703           GNUNET_TRANSPORT_set_quota (transport,
2704                                       &n->peer,
2705                                       n->bw_in,
2706                                       n->bw_out,
2707                                       GNUNET_TIME_UNIT_FOREVER_REL,
2708                                       NULL, NULL); 
2709         }
2710 #if DEBUG_CORE
2711       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2712                   "Confirmed key via `%s' message for peer `%4s'\n",
2713                   "PONG", GNUNET_i2s (&n->peer));
2714 #endif
2715       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2716         {
2717           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2718           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2719         }      
2720       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2721       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2722       cnm.distance = htonl (n->last_distance);
2723       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2724       cnm.peer = n->peer;
2725       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2726       process_encrypted_neighbour_queue (n);
2727       /* fall-through! */
2728     case PEER_STATE_KEY_CONFIRMED:
2729       n->last_activity = GNUNET_TIME_absolute_get ();
2730       if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
2731         GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
2732       n->keep_alive_task 
2733         = GNUNET_SCHEDULER_add_delayed (sched, 
2734                                         GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
2735                                         &send_keep_alive,
2736                                         n);
2737       break;
2738     default:
2739       GNUNET_break (0);
2740       break;
2741     }
2742 }
2743
2744
2745 /**
2746  * We received a SET_KEY message.  Validate and update
2747  * our key material and status.
2748  *
2749  * @param n the neighbour from which we received message m
2750  * @param m the set key message we received
2751  */
2752 static void
2753 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2754 {
2755   struct SetKeyMessage *m_cpy;
2756   struct GNUNET_TIME_Absolute t;
2757   struct GNUNET_CRYPTO_AesSessionKey k;
2758   struct PingMessage *ping;
2759   struct PongMessage *pong;
2760   enum PeerStateMachine sender_status;
2761
2762 #if DEBUG_CORE
2763   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2764               "Core service receives `%s' request from `%4s'.\n",
2765               "SET_KEY", GNUNET_i2s (&n->peer));
2766 #endif
2767   if (n->public_key == NULL)
2768     {
2769       if (n->pitr != NULL)
2770         {
2771 #if DEBUG_CORE
2772           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2773                       "Ignoring `%s' message due to lack of public key for peer (still trying to obtain one).\n",
2774                       "SET_KEY");
2775 #endif
2776           return;
2777         }
2778 #if DEBUG_CORE
2779       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2780                   "Lacking public key for peer, trying to obtain one (handle_set_key).\n");
2781 #endif
2782       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2783       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2784       /* lookup n's public key, then try again */
2785       GNUNET_assert (n->skm == NULL);
2786       n->skm = m_cpy;
2787       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2788                                          sched,
2789                                          &n->peer,
2790                                          0,
2791                                          GNUNET_TIME_UNIT_MINUTES,
2792                                          &process_hello_retry_handle_set_key, n);
2793       return;
2794     }
2795   if (0 != memcmp (&m->target,
2796                    &my_identity,
2797                    sizeof (struct GNUNET_PeerIdentity)))
2798     {
2799       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2800                   _("Received `%s' message that was for `%s', not for me.  Ignoring.\n"),
2801                   "SET_KEY",
2802                   GNUNET_i2s (&m->target));
2803       return;
2804     }
2805   if ((ntohl (m->purpose.size) !=
2806        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2807        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2808        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2809        sizeof (struct GNUNET_PeerIdentity)) ||
2810       (GNUNET_OK !=
2811        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2812                                  &m->purpose, &m->signature, n->public_key)))
2813     {
2814       /* invalid signature */
2815       GNUNET_break_op (0);
2816       return;
2817     }
2818   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2819   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2820        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2821       (t.value < n->decrypt_key_created.value))
2822     {
2823       /* this could rarely happen due to massive re-ordering of
2824          messages on the network level, but is most likely either
2825          a bug or some adversary messing with us.  Report. */
2826       GNUNET_break_op (0);
2827       return;
2828     }
2829 #if DEBUG_CORE
2830   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2831               "Decrypting key material.\n");
2832 #endif  
2833   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2834                                   &m->encrypted_key,
2835                                   &k,
2836                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2837        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2838       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2839     {
2840       /* failed to decrypt !? */
2841       GNUNET_break_op (0);
2842       return;
2843     }
2844
2845   n->decrypt_key = k;
2846   if (n->decrypt_key_created.value != t.value)
2847     {
2848       /* fresh key, reset sequence numbers */
2849       n->last_sequence_number_received = 0;
2850       n->last_packets_bitmap = 0;
2851       n->decrypt_key_created = t;
2852     }
2853   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2854   switch (n->status)
2855     {
2856     case PEER_STATE_DOWN:
2857       n->status = PEER_STATE_KEY_RECEIVED;
2858 #if DEBUG_CORE
2859       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2860                   "Responding to `%s' with my own key.\n", "SET_KEY");
2861 #endif
2862       send_key (n);
2863       break;
2864     case PEER_STATE_KEY_SENT:
2865     case PEER_STATE_KEY_RECEIVED:
2866       n->status = PEER_STATE_KEY_RECEIVED;
2867       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2868           (sender_status != PEER_STATE_KEY_CONFIRMED))
2869         {
2870 #if DEBUG_CORE
2871           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2872                       "Responding to `%s' with my own key (other peer has status %u).\n",
2873                       "SET_KEY", sender_status);
2874 #endif
2875           send_key (n);
2876         }
2877       break;
2878     case PEER_STATE_KEY_CONFIRMED:
2879       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2880           (sender_status != PEER_STATE_KEY_CONFIRMED))
2881         {         
2882 #if DEBUG_CORE
2883           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2884                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2885                       "SET_KEY", sender_status);
2886 #endif
2887           send_key (n);
2888         }
2889       break;
2890     default:
2891       GNUNET_break (0);
2892       break;
2893     }
2894   if (n->pending_ping != NULL)
2895     {
2896       ping = n->pending_ping;
2897       n->pending_ping = NULL;
2898       handle_ping (n, ping);
2899       GNUNET_free (ping);
2900     }
2901   if (n->pending_pong != NULL)
2902     {
2903       pong = n->pending_pong;
2904       n->pending_pong = NULL;
2905       handle_pong (n, pong);
2906       GNUNET_free (pong);
2907     }
2908 }
2909
2910
2911 /**
2912  * Send a P2P message to a client.
2913  *
2914  * @param sender who sent us the message?
2915  * @param client who should we give the message to?
2916  * @param m contains the message to transmit
2917  * @param msize number of bytes in buf to transmit
2918  */
2919 static void
2920 send_p2p_message_to_client (struct Neighbour *sender,
2921                             struct Client *client,
2922                             const void *m, size_t msize)
2923 {
2924   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2925   struct NotifyTrafficMessage *ntm;
2926
2927 #if DEBUG_CORE
2928   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2929               "Core service passes message from `%4s' of type %u to client.\n",
2930               GNUNET_i2s(&sender->peer),
2931               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2932 #endif
2933   ntm = (struct NotifyTrafficMessage *) buf;
2934   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2935   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2936   ntm->distance = htonl (sender->last_distance);
2937   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2938   ntm->peer = sender->peer;
2939   memcpy (&ntm[1], m, msize);
2940   send_to_client (client, &ntm->header, GNUNET_YES);
2941 }
2942
2943
2944 /**
2945  * Deliver P2P message to interested clients.
2946  *
2947  * @param sender who sent us the message?
2948  * @param m the message
2949  * @param msize size of the message (including header)
2950  */
2951 static void
2952 deliver_message (struct Neighbour *sender,
2953                  const struct GNUNET_MessageHeader *m, size_t msize)
2954 {
2955   struct Client *cpos;
2956   uint16_t type;
2957   unsigned int tpos;
2958   int deliver_full;
2959   int dropped;
2960
2961   type = ntohs (m->type);
2962 #if DEBUG_CORE
2963   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2964               "Received encapsulated message of type %u from `%4s'\n",
2965               type,
2966               GNUNET_i2s (&sender->peer));
2967 #endif
2968   dropped = GNUNET_YES;
2969   cpos = clients;
2970   while (cpos != NULL)
2971     {
2972       deliver_full = GNUNET_NO;
2973       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2974         deliver_full = GNUNET_YES;
2975       else
2976         {
2977           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2978             {
2979               if (type != cpos->types[tpos])
2980                 continue;
2981               deliver_full = GNUNET_YES;
2982               break;
2983             }
2984         }
2985       if (GNUNET_YES == deliver_full)
2986         {
2987           send_p2p_message_to_client (sender, cpos, m, msize);
2988           dropped = GNUNET_NO;
2989         }
2990       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2991         {
2992           send_p2p_message_to_client (sender, cpos, m,
2993                                       sizeof (struct GNUNET_MessageHeader));
2994         }
2995       cpos = cpos->next;
2996     }
2997   if (dropped == GNUNET_YES)
2998     {
2999 #if DEBUG_CORE
3000       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3001                   "Message of type %u from `%4s' not delivered to any client.\n",
3002                   type,
3003                   GNUNET_i2s (&sender->peer));
3004 #endif
3005       /* FIXME: stats... */
3006     }
3007 }
3008
3009
3010 /**
3011  * Align P2P message and then deliver to interested clients.
3012  *
3013  * @param sender who sent us the message?
3014  * @param buffer unaligned (!) buffer containing message
3015  * @param msize size of the message (including header)
3016  */
3017 static void
3018 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
3019 {
3020   char abuf[msize];
3021
3022   /* TODO: call to statistics? */
3023   memcpy (abuf, buffer, msize);
3024   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
3025 }
3026
3027
3028 /**
3029  * Deliver P2P messages to interested clients.
3030  *
3031  * @param sender who sent us the message?
3032  * @param buffer buffer containing messages, can be modified
3033  * @param buffer_size size of the buffer (overall)
3034  * @param offset offset where messages in the buffer start
3035  */
3036 static void
3037 deliver_messages (struct Neighbour *sender,
3038                   const char *buffer, size_t buffer_size, size_t offset)
3039 {
3040   struct GNUNET_MessageHeader *mhp;
3041   struct GNUNET_MessageHeader mh;
3042   uint16_t msize;
3043   int need_align;
3044
3045   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
3046     {
3047       if (0 != offset % sizeof (uint16_t))
3048         {
3049           /* outch, need to copy to access header */
3050           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
3051           mhp = &mh;
3052         }
3053       else
3054         {
3055           /* can access header directly */
3056           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
3057         }
3058       msize = ntohs (mhp->size);
3059       if (msize + offset > buffer_size)
3060         {
3061           /* malformed message, header says it is larger than what
3062              would fit into the overall buffer */
3063           GNUNET_break_op (0);
3064           break;
3065         }
3066 #if HAVE_UNALIGNED_64_ACCESS
3067       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
3068 #else
3069       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
3070 #endif
3071       if (GNUNET_YES == need_align)
3072         align_and_deliver (sender, &buffer[offset], msize);
3073       else
3074         deliver_message (sender,
3075                          (const struct GNUNET_MessageHeader *)
3076                          &buffer[offset], msize);
3077       offset += msize;
3078     }
3079 }
3080
3081
3082 /**
3083  * We received an encrypted message.  Decrypt, validate and
3084  * pass on to the appropriate clients.
3085  */
3086 static void
3087 handle_encrypted_message (struct Neighbour *n,
3088                           const struct EncryptedMessage *m)
3089 {
3090   size_t size = ntohs (m->header.size);
3091   char buf[size];
3092   struct EncryptedMessage *pt;  /* plaintext */
3093   GNUNET_HashCode ph;
3094   size_t off;
3095   uint32_t snum;
3096   struct GNUNET_TIME_Absolute t;
3097   GNUNET_HashCode iv;
3098
3099 #if DEBUG_CORE
3100   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3101               "Core service receives `%s' request from `%4s'.\n",
3102               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
3103 #endif  
3104   GNUNET_CRYPTO_hash (&m->iv_seed, sizeof (uint32_t), &iv);
3105   /* decrypt */
3106   if (GNUNET_OK !=
3107       do_decrypt (n,
3108                   &iv,
3109                   &m->plaintext_hash,
3110                   &buf[ENCRYPTED_HEADER_SIZE], 
3111                   size - ENCRYPTED_HEADER_SIZE))
3112     return;
3113   pt = (struct EncryptedMessage *) buf;
3114
3115   /* validate hash */
3116   GNUNET_CRYPTO_hash (&pt->sequence_number,
3117                       size - ENCRYPTED_HEADER_SIZE - sizeof (GNUNET_HashCode), &ph);
3118   if (0 != memcmp (&ph, 
3119                    &pt->plaintext_hash, 
3120                    sizeof (GNUNET_HashCode)))
3121     {
3122       /* checksum failed */
3123       GNUNET_break_op (0);
3124       return;
3125     }
3126
3127   /* validate sequence number */
3128   snum = ntohl (pt->sequence_number);
3129   if (n->last_sequence_number_received == snum)
3130     {
3131       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3132                   "Received duplicate message, ignoring.\n");
3133       /* duplicate, ignore */
3134       return;
3135     }
3136   if ((n->last_sequence_number_received > snum) &&
3137       (n->last_sequence_number_received - snum > 32))
3138     {
3139       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3140                   "Received ancient out of sequence message, ignoring.\n");
3141       /* ancient out of sequence, ignore */
3142       return;
3143     }
3144   if (n->last_sequence_number_received > snum)
3145     {
3146       unsigned int rotbit =
3147         1 << (n->last_sequence_number_received - snum - 1);
3148       if ((n->last_packets_bitmap & rotbit) != 0)
3149         {
3150           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3151                       "Received duplicate message, ignoring.\n");
3152           /* duplicate, ignore */
3153           return;
3154         }
3155       n->last_packets_bitmap |= rotbit;
3156     }
3157   if (n->last_sequence_number_received < snum)
3158     {
3159       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
3160       n->last_sequence_number_received = snum;
3161     }
3162
3163   /* check timestamp */
3164   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
3165   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
3166     {
3167       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3168                   _
3169                   ("Message received far too old (%llu ms). Content ignored.\n"),
3170                   GNUNET_TIME_absolute_get_duration (t).value);
3171       return;
3172     }
3173
3174   /* process decrypted message(s) */
3175   if (n->bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
3176     {
3177 #if DEBUG_CORE_SET_QUOTA
3178       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3179                   "Received %u b/s as new inbound limit for peer `%4s'\n",
3180                   (unsigned int) ntohl (pt->inbound_bw_limit.value__),
3181                   GNUNET_i2s (&n->peer));
3182 #endif
3183       n->bw_out_external_limit = pt->inbound_bw_limit;
3184       n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
3185                                               n->bw_out_internal_limit);
3186       GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
3187                                              n->bw_out);
3188       GNUNET_TRANSPORT_set_quota (transport,
3189                                   &n->peer,
3190                                   n->bw_in,
3191                                   n->bw_out,
3192                                   GNUNET_TIME_UNIT_FOREVER_REL,
3193                                   NULL, NULL); 
3194     }
3195   n->last_activity = GNUNET_TIME_absolute_get ();
3196   if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
3197     GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
3198   n->keep_alive_task 
3199     = GNUNET_SCHEDULER_add_delayed (sched, 
3200                                     GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
3201                                     &send_keep_alive,
3202                                     n);
3203   off = sizeof (struct EncryptedMessage);
3204   deliver_messages (n, buf, size, off);
3205 }
3206
3207
3208 /**
3209  * Function called by the transport for each received message.
3210  *
3211  * @param cls closure
3212  * @param peer (claimed) identity of the other peer
3213  * @param message the message
3214  * @param latency estimated latency for communicating with the
3215  *             given peer (round-trip)
3216  * @param distance in overlay hops, as given by transport plugin
3217  */
3218 static void
3219 handle_transport_receive (void *cls,
3220                           const struct GNUNET_PeerIdentity *peer,
3221                           const struct GNUNET_MessageHeader *message,
3222                           struct GNUNET_TIME_Relative latency,
3223                           unsigned int distance)
3224 {
3225   struct Neighbour *n;
3226   struct GNUNET_TIME_Absolute now;
3227   int up;
3228   uint16_t type;
3229   uint16_t size;
3230
3231 #if DEBUG_CORE
3232   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3233               "Received message of type %u from `%4s', demultiplexing.\n",
3234               ntohs (message->type), GNUNET_i2s (peer));
3235 #endif
3236   n = find_neighbour (peer);
3237   if (n == NULL)
3238     n = create_neighbour (peer);
3239   if (n == NULL)
3240     return;   
3241   n->last_latency = latency;
3242   n->last_distance = distance;
3243   up = (n->status == PEER_STATE_KEY_CONFIRMED);
3244   type = ntohs (message->type);
3245   size = ntohs (message->size);
3246 #if DEBUG_HANDSHAKE
3247   fprintf (stderr,
3248            "Received message of type %u from `%4s'\n",
3249            type,
3250            GNUNET_i2s (peer));
3251 #endif
3252   switch (type)
3253     {
3254     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
3255       if (size != sizeof (struct SetKeyMessage))
3256         {
3257           GNUNET_break_op (0);
3258           return;
3259         }
3260       handle_set_key (n, (const struct SetKeyMessage *) message);
3261       break;
3262     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
3263       if (size < sizeof (struct EncryptedMessage) +
3264           sizeof (struct GNUNET_MessageHeader))
3265         {
3266           GNUNET_break_op (0);
3267           return;
3268         }
3269       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3270           (n->status != PEER_STATE_KEY_CONFIRMED))
3271         {
3272           GNUNET_break_op (0);
3273           return;
3274         }
3275       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
3276       break;
3277     case GNUNET_MESSAGE_TYPE_CORE_PING:
3278       if (size != sizeof (struct PingMessage))
3279         {
3280           GNUNET_break_op (0);
3281           return;
3282         }
3283       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3284           (n->status != PEER_STATE_KEY_CONFIRMED))
3285         {
3286 #if DEBUG_CORE
3287           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3288                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3289                       "PING", GNUNET_i2s (&n->peer));
3290 #endif
3291           GNUNET_free_non_null (n->pending_ping);
3292           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
3293           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
3294           return;
3295         }
3296       handle_ping (n, (const struct PingMessage *) message);
3297       break;
3298     case GNUNET_MESSAGE_TYPE_CORE_PONG:
3299       if (size != sizeof (struct PongMessage))
3300         {
3301           GNUNET_break_op (0);
3302           return;
3303         }
3304       if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
3305            (n->status != PEER_STATE_KEY_CONFIRMED) )
3306         {
3307 #if DEBUG_CORE
3308           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3309                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3310                       "PONG", GNUNET_i2s (&n->peer));
3311 #endif
3312           GNUNET_free_non_null (n->pending_pong);
3313           n->pending_pong = GNUNET_malloc (sizeof (struct PongMessage));
3314           memcpy (n->pending_pong, message, sizeof (struct PongMessage));
3315           return;
3316         }
3317       handle_pong (n, (const struct PongMessage *) message);
3318       break;
3319     default:
3320       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3321                   _("Unsupported message of type %u received.\n"), type);
3322       return;
3323     }
3324   if (n->status == PEER_STATE_KEY_CONFIRMED)
3325     {
3326       now = GNUNET_TIME_absolute_get ();
3327       n->last_activity = now;
3328       if (!up)
3329         n->time_established = now;
3330       if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
3331         GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
3332       n->keep_alive_task 
3333         = GNUNET_SCHEDULER_add_delayed (sched, 
3334                                         GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
3335                                         &send_keep_alive,
3336                                         n);
3337     }
3338 }
3339
3340
3341 /**
3342  * Function that recalculates the bandwidth quota for the
3343  * given neighbour and transmits it to the transport service.
3344  * 
3345  * @param cls neighbour for the quota update
3346  * @param tc context
3347  */
3348 static void
3349 neighbour_quota_update (void *cls,
3350                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3351 {
3352   struct Neighbour *n = cls;
3353   struct GNUNET_BANDWIDTH_Value32NBO q_in;
3354   double pref_rel;
3355   double share;
3356   unsigned long long distributable;
3357   uint64_t need_per_peer;
3358   uint64_t need_per_second;
3359   
3360   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3361   /* calculate relative preference among all neighbours;
3362      divides by a bit more to avoid division by zero AND to
3363      account for possibility of new neighbours joining any time 
3364      AND to convert to double... */
3365   if (preference_sum == 0)
3366     {
3367       pref_rel = 1.0 / (double) neighbour_count;
3368     }
3369   else
3370     {
3371       pref_rel = n->current_preference / preference_sum;
3372     }
3373   need_per_peer = GNUNET_BANDWIDTH_value_get_available_until (MIN_BANDWIDTH_PER_PEER,
3374                                                               GNUNET_TIME_UNIT_SECONDS);  
3375   need_per_second = need_per_peer * neighbour_count;
3376   distributable = 0;
3377   if (bandwidth_target_out_bps > need_per_second)
3378     distributable = bandwidth_target_out_bps - need_per_second;
3379   share = distributable * pref_rel;
3380   if (share + need_per_peer > ( (uint32_t)-1))
3381     q_in = GNUNET_BANDWIDTH_value_init ((uint32_t) -1);
3382   else
3383     q_in = GNUNET_BANDWIDTH_value_init (need_per_peer + (uint32_t) share);
3384   /* check if we want to disconnect for good due to inactivity */
3385   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3386        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3387     {
3388 #if DEBUG_CORE
3389       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3390                   "Forcing disconnect of `%4s' due to inactivity (?).\n",
3391                   GNUNET_i2s (&n->peer));
3392 #endif
3393       q_in = GNUNET_BANDWIDTH_value_init (0); /* force disconnect */
3394     }
3395 #if DEBUG_CORE_QUOTA
3396   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3397               "Current quota for `%4s' is %u/%llu b/s in (old: %u b/s) / %u out (%u internal)\n",
3398               GNUNET_i2s (&n->peer),
3399               (unsigned int) ntohl (q_in.value__),
3400               bandwidth_target_out_bps,
3401               (unsigned int) ntohl (n->bw_in.value__),
3402               (unsigned int) ntohl (n->bw_out.value__),
3403               (unsigned int) ntohl (n->bw_out_internal_limit.value__));
3404 #endif
3405   if (n->bw_in.value__ != q_in.value__) 
3406     {
3407       n->bw_in = q_in;
3408       GNUNET_TRANSPORT_set_quota (transport,
3409                                   &n->peer,
3410                                   n->bw_in,
3411                                   n->bw_out,
3412                                   GNUNET_TIME_UNIT_FOREVER_REL,
3413                                   NULL, NULL);
3414     }
3415   schedule_quota_update (n);
3416 }
3417
3418
3419 /**
3420  * Function called by transport to notify us that
3421  * a peer connected to us (on the network level).
3422  *
3423  * @param cls closure
3424  * @param peer the peer that connected
3425  * @param latency current latency of the connection
3426  * @param distance in overlay hops, as given by transport plugin
3427  */
3428 static void
3429 handle_transport_notify_connect (void *cls,
3430                                  const struct GNUNET_PeerIdentity *peer,
3431                                  struct GNUNET_TIME_Relative latency,
3432                                  unsigned int distance)
3433 {
3434   struct Neighbour *n;
3435   struct ConnectNotifyMessage cnm;
3436
3437   n = find_neighbour (peer);
3438   if (n != NULL)
3439     {
3440       if (n->is_connected)
3441         {
3442           /* duplicate connect notification!? */
3443           GNUNET_break (0);
3444           return;
3445         }
3446     }
3447   else
3448     {
3449       n = create_neighbour (peer);
3450     }
3451   n->is_connected = GNUNET_YES;      
3452   n->last_latency = latency;
3453   n->last_distance = distance;
3454   GNUNET_BANDWIDTH_tracker_init (&n->available_send_window,
3455                                  n->bw_out,
3456                                  MAX_WINDOW_TIME_S);
3457   GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window,
3458                                  n->bw_in,
3459                                  MAX_WINDOW_TIME_S);  
3460 #if DEBUG_CORE
3461   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3462               "Received connection from `%4s'.\n",
3463               GNUNET_i2s (&n->peer));
3464 #endif
3465   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3466   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3467   cnm.distance = htonl (n->last_distance);
3468   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
3469   cnm.peer = *peer;
3470   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3471   GNUNET_TRANSPORT_set_quota (transport,
3472                               &n->peer,
3473                               n->bw_in,
3474                               n->bw_out,
3475                               GNUNET_TIME_UNIT_FOREVER_REL,
3476                               NULL, NULL);
3477   send_key (n); 
3478 }
3479
3480
3481 /**
3482  * Function called by transport telling us that a peer
3483  * disconnected.
3484  *
3485  * @param cls closure
3486  * @param peer the peer that disconnected
3487  */
3488 static void
3489 handle_transport_notify_disconnect (void *cls,
3490                                     const struct GNUNET_PeerIdentity *peer)
3491 {
3492   struct DisconnectNotifyMessage cnm;
3493   struct Neighbour *n;
3494
3495 #if DEBUG_CORE
3496   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3497               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3498 #endif
3499   n = find_neighbour (peer);
3500   if (n == NULL)
3501     {
3502       GNUNET_break (0);
3503       return;
3504     }
3505   GNUNET_break (n->is_connected);
3506   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3507   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3508   cnm.peer = *peer;
3509   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3510   n->is_connected = GNUNET_NO;
3511 }
3512
3513
3514 /**
3515  * Last task run during shutdown.  Disconnects us from
3516  * the transport.
3517  */
3518 static void
3519 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3520 {
3521   struct Neighbour *n;
3522   struct Client *c;
3523
3524 #if DEBUG_CORE
3525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3526               "Core service shutting down.\n");
3527 #endif
3528   GNUNET_assert (transport != NULL);
3529   GNUNET_TRANSPORT_disconnect (transport);
3530   transport = NULL;
3531   while (NULL != (n = neighbours))
3532     {
3533       neighbours = n->next;
3534       GNUNET_assert (neighbour_count > 0);
3535       neighbour_count--;
3536       free_neighbour (n);
3537     }
3538   GNUNET_SERVER_notification_context_destroy (notifier);
3539   notifier = NULL;
3540   while (NULL != (c = clients))
3541     handle_client_disconnect (NULL, c->client_handle);
3542   if (my_private_key != NULL)
3543     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3544 }
3545
3546
3547 /**
3548  * Initiate core service.
3549  *
3550  * @param cls closure
3551  * @param s scheduler to use
3552  * @param serv the initialized server
3553  * @param c configuration to use
3554  */
3555 static void
3556 run (void *cls,
3557      struct GNUNET_SCHEDULER_Handle *s,
3558      struct GNUNET_SERVER_Handle *serv,
3559      const struct GNUNET_CONFIGURATION_Handle *c)
3560 {
3561   char *keyfile;
3562
3563   sched = s;
3564   cfg = c;  
3565   /* parse configuration */
3566   if (
3567        (GNUNET_OK !=
3568         GNUNET_CONFIGURATION_get_value_number (c,
3569                                                "CORE",
3570                                                "TOTAL_QUOTA_IN",
3571                                                &bandwidth_target_in_bps)) ||
3572        (GNUNET_OK !=
3573         GNUNET_CONFIGURATION_get_value_number (c,
3574                                                "CORE",
3575                                                "TOTAL_QUOTA_OUT",
3576                                                &bandwidth_target_out_bps)) ||
3577        (GNUNET_OK !=
3578         GNUNET_CONFIGURATION_get_value_filename (c,
3579                                                  "GNUNETD",
3580                                                  "HOSTKEY", &keyfile)))
3581     {
3582       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3583                   _
3584                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3585       GNUNET_SCHEDULER_shutdown (s);
3586       return;
3587     }
3588   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3589   GNUNET_free (keyfile);
3590   if (my_private_key == NULL)
3591     {
3592       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3593                   _("Core service could not access hostkey.  Exiting.\n"));
3594       GNUNET_SCHEDULER_shutdown (s);
3595       return;
3596     }
3597   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3598   GNUNET_CRYPTO_hash (&my_public_key,
3599                       sizeof (my_public_key), &my_identity.hashPubKey);
3600   /* setup notification */
3601   server = serv;
3602   notifier = GNUNET_SERVER_notification_context_create (server, 
3603                                                         MAX_NOTIFY_QUEUE);
3604   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3605   /* setup transport connection */
3606   transport = GNUNET_TRANSPORT_connect (sched,
3607                                         cfg,
3608                                         NULL,
3609                                         &handle_transport_receive,
3610                                         &handle_transport_notify_connect,
3611                                         &handle_transport_notify_disconnect);
3612   GNUNET_assert (NULL != transport);
3613   GNUNET_SCHEDULER_add_delayed (sched,
3614                                 GNUNET_TIME_UNIT_FOREVER_REL,
3615                                 &cleaning_task, NULL);
3616   /* process client requests */
3617   GNUNET_SERVER_add_handlers (server, handlers);
3618   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3619               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3620 }
3621
3622
3623
3624 /**
3625  * The main function for the transport service.
3626  *
3627  * @param argc number of arguments from the command line
3628  * @param argv command line arguments
3629  * @return 0 ok, 1 on error
3630  */
3631 int
3632 main (int argc, char *const *argv)
3633 {
3634   return (GNUNET_OK ==
3635           GNUNET_SERVICE_run (argc,
3636                               argv,
3637                               "core",
3638                               GNUNET_SERVICE_OPTION_NONE,
3639                               &run, NULL)) ? 0 : 1;
3640 }
3641
3642 /* end of gnunet-service-core.c */