only notify when desired
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 #define DEBUG_HANDSHAKE GNUNET_NO
45
46 #define DEBUG_CORE_QUOTA GNUNET_NO
47
48 /**
49  * Receive and send buffer windows grow over time.  For
50  * how long can 'unused' bandwidth accumulate before we
51  * need to cap it?  (specified in seconds).
52  */
53 #define MAX_WINDOW_TIME_S (5 * 60)
54
55 /**
56  * How many messages do we queue up at most for optional
57  * notifications to a client?  (this can cause notifications
58  * about outgoing messages to be dropped).
59  */
60 #define MAX_NOTIFY_QUEUE 16
61
62 /**
63  * Minimum bandwidth (out) to assign to any connected peer.
64  * Should be rather low; values larger than DEFAULT_BW_IN_OUT make no
65  * sense.
66  */
67 #define MIN_BANDWIDTH_PER_PEER GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * How long do we delay messages to get larger packet sizes (CORKing)?
80  */
81 #define MAX_CORK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
82
83 /**
84  * What is the maximum delay for a SET_KEY message?
85  */
86 #define MAX_SET_KEY_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
87
88 /**
89  * What how long do we wait for SET_KEY confirmation initially?
90  */
91 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 3)
92
93 /**
94  * What is the maximum delay for a PING message?
95  */
96 #define MAX_PING_DELAY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 2)
97
98 /**
99  * What is the maximum delay for a PONG message?
100  */
101 #define MAX_PONG_DELAY GNUNET_TIME_relative_multiply (MAX_PING_DELAY, 2)
102
103 /**
104  * What is the minimum frequency for a PING message?
105  */
106 #define MIN_PING_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
107
108 /**
109  * How often do we recalculate bandwidth quotas?
110  */
111 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
112
113 /**
114  * What is the priority for a SET_KEY message?
115  */
116 #define SET_KEY_PRIORITY 0xFFFFFF
117
118 /**
119  * What is the priority for a PING message?
120  */
121 #define PING_PRIORITY 0xFFFFFF
122
123 /**
124  * What is the priority for a PONG message?
125  */
126 #define PONG_PRIORITY 0xFFFFFF
127
128 /**
129  * How many messages do we queue per peer at most?  Must be at
130  * least two.
131  */
132 #define MAX_PEER_QUEUE_SIZE 16
133
134 /**
135  * How many non-mandatory messages do we queue per client at most?
136  */
137 #define MAX_CLIENT_QUEUE_SIZE 32
138
139 /**
140  * What is the maximum age of a message for us to consider
141  * processing it?  Note that this looks at the timestamp used
142  * by the other peer, so clock skew between machines does
143  * come into play here.  So this should be picked high enough
144  * so that a little bit of clock skew does not prevent peers
145  * from connecting to us.
146  */
147 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
148
149 /**
150  * What is the maximum size for encrypted messages?  Note that this
151  * number imposes a clear limit on the maximum size of any message.
152  * Set to a value close to 64k but not so close that transports will
153  * have trouble with their headers.
154  */
155 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
156
157
158 /**
159  * State machine for our P2P encryption handshake.  Everyone starts in
160  * "DOWN", if we receive the other peer's key (other peer initiated)
161  * we start in state RECEIVED (since we will immediately send our
162  * own); otherwise we start in SENT.  If we get back a PONG from
163  * within either state, we move up to CONFIRMED (the PONG will always
164  * be sent back encrypted with the key we sent to the other peer).
165  */
166 enum PeerStateMachine
167 {
168   PEER_STATE_DOWN,
169   PEER_STATE_KEY_SENT,
170   PEER_STATE_KEY_RECEIVED,
171   PEER_STATE_KEY_CONFIRMED
172 };
173
174
175 /**
176  * Number of bytes (at the beginning) of "struct EncryptedMessage"
177  * that are NOT encrypted.
178  */
179 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t))
180
181
182 /**
183  * Encapsulation for encrypted messages exchanged between
184  * peers.  Followed by the actual encrypted data.
185  */
186 struct EncryptedMessage
187 {
188   /**
189    * Message type is either CORE_ENCRYPTED_MESSAGE.
190    */
191   struct GNUNET_MessageHeader header;
192
193   /**
194    * Random value used for IV generation.  ENCRYPTED_HEADER_SIZE must
195    * be set to the offset of the *next* field.
196    */
197   uint32_t iv_seed GNUNET_PACKED;
198
199   /**
200    * Hash of the plaintext (starting at 'sequence_number'), used to
201    * verify message integrity.  Everything after this hash (including
202    * this hash itself) will be encrypted.  
203    */
204   GNUNET_HashCode plaintext_hash;
205
206   /**
207    * Sequence number, in network byte order.  This field
208    * must be the first encrypted/decrypted field and the
209    * first byte that is hashed for the plaintext hash.
210    */
211   uint32_t sequence_number GNUNET_PACKED;
212
213   /**
214    * Desired bandwidth (how much we should send to this peer / how
215    * much is the sender willing to receive)?
216    */
217   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
218
219   /**
220    * Timestamp.  Used to prevent reply of ancient messages
221    * (recent messages are caught with the sequence number).
222    */
223   struct GNUNET_TIME_AbsoluteNBO timestamp;
224
225 };
226
227
228 /**
229  * We're sending an (encrypted) PING to the other peer to check if he
230  * can decrypt.  The other peer should respond with a PONG with the
231  * same content, except this time encrypted with the receiver's key.
232  */
233 struct PingMessage
234 {
235   /**
236    * Message type is CORE_PING.
237    */
238   struct GNUNET_MessageHeader header;
239
240   /**
241    * Random number chosen to make reply harder.
242    */
243   uint32_t challenge GNUNET_PACKED;
244
245   /**
246    * Intended target of the PING, used primarily to check
247    * that decryption actually worked.
248    */
249   struct GNUNET_PeerIdentity target;
250 };
251
252
253
254 /**
255  * Response to a PING.  Includes data from the original PING
256  * plus initial bandwidth quota information.
257  */
258 struct PongMessage
259 {
260   /**
261    * Message type is CORE_PONG.
262    */
263   struct GNUNET_MessageHeader header;
264
265   /**
266    * Random number proochosen to make reply harder.  Must be
267    * first field after header (this is where we start to encrypt!).
268    */
269   uint32_t challenge GNUNET_PACKED;
270
271   /**
272    * Must be zero.
273    */
274   uint32_t reserved GNUNET_PACKED;
275
276   /**
277    * Desired bandwidth (how much we should send to this
278    * peer / how much is the sender willing to receive).
279    */
280   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
281
282   /**
283    * Intended target of the PING, used primarily to check
284    * that decryption actually worked.
285    */
286   struct GNUNET_PeerIdentity target;
287 };
288
289
290 /**
291  * Message transmitted to set (or update) a session key.
292  */
293 struct SetKeyMessage
294 {
295
296   /**
297    * Message type is either CORE_SET_KEY.
298    */
299   struct GNUNET_MessageHeader header;
300
301   /**
302    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
303    */
304   int32_t sender_status GNUNET_PACKED;
305
306   /**
307    * Purpose of the signature, will be
308    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
309    */
310   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
311
312   /**
313    * At what time was this key created?
314    */
315   struct GNUNET_TIME_AbsoluteNBO creation_time;
316
317   /**
318    * The encrypted session key.
319    */
320   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
321
322   /**
323    * Who is the intended recipient?
324    */
325   struct GNUNET_PeerIdentity target;
326
327   /**
328    * Signature of the stuff above (starting at purpose).
329    */
330   struct GNUNET_CRYPTO_RsaSignature signature;
331
332 };
333
334
335 /**
336  * Message waiting for transmission. This struct
337  * is followed by the actual content of the message.
338  */
339 struct MessageEntry
340 {
341
342   /**
343    * We keep messages in a doubly linked list.
344    */
345   struct MessageEntry *next;
346
347   /**
348    * We keep messages in a doubly linked list.
349    */
350   struct MessageEntry *prev;
351
352   /**
353    * By when are we supposed to transmit this message?
354    */
355   struct GNUNET_TIME_Absolute deadline;
356
357   /**
358    * By when are we supposed to transmit this message (after
359    * giving slack)?
360    */
361   struct GNUNET_TIME_Absolute slack_deadline;
362
363   /**
364    * How important is this message to us?
365    */
366   unsigned int priority;
367
368   /**
369    * How long is the message? (number of bytes following
370    * the "struct MessageEntry", but not including the
371    * size of "struct MessageEntry" itself!)
372    */
373   uint16_t size;
374
375   /**
376    * Was this message selected for transmission in the
377    * current round? GNUNET_YES or GNUNET_NO.
378    */
379   int8_t do_transmit;
380
381   /**
382    * Did we give this message some slack (delayed sending) previously
383    * (and hence should not give it any more slack)? GNUNET_YES or
384    * GNUNET_NO.
385    */
386   int8_t got_slack;
387
388 };
389
390
391 struct Neighbour
392 {
393   /**
394    * We keep neighbours in a linked list (for now).
395    */
396   struct Neighbour *next;
397
398   /**
399    * Unencrypted messages destined for this peer.
400    */
401   struct MessageEntry *messages;
402
403   /**
404    * Head of the batched, encrypted message queue (already ordered,
405    * transmit starting with the head).
406    */
407   struct MessageEntry *encrypted_head;
408
409   /**
410    * Tail of the batched, encrypted message queue (already ordered,
411    * append new messages to tail)
412    */
413   struct MessageEntry *encrypted_tail;
414
415   /**
416    * Handle for pending requests for transmission to this peer
417    * with the transport service.  NULL if no request is pending.
418    */
419   struct GNUNET_TRANSPORT_TransmitHandle *th;
420
421   /**
422    * Public key of the neighbour, NULL if we don't have it yet.
423    */
424   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
425
426   /**
427    * We received a PING message before we got the "public_key"
428    * (or the SET_KEY).  We keep it here until we have a key
429    * to decrypt it.  NULL if no PING is pending.
430    */
431   struct PingMessage *pending_ping;
432
433   /**
434    * We received a PONG message before we got the "public_key"
435    * (or the SET_KEY).  We keep it here until we have a key
436    * to decrypt it.  NULL if no PONG is pending.
437    */
438   struct PongMessage *pending_pong;
439
440   /**
441    * Non-NULL if we are currently looking up HELLOs for this peer.
442    * for this peer.
443    */
444   struct GNUNET_PEERINFO_IteratorContext *pitr;
445
446   /**
447    * SetKeyMessage to transmit, NULL if we are not currently trying
448    * to send one.
449    */
450   struct SetKeyMessage *skm;
451
452   /**
453    * Identity of the neighbour.
454    */
455   struct GNUNET_PeerIdentity peer;
456
457   /**
458    * Key we use to encrypt our messages for the other peer
459    * (initialized by us when we do the handshake).
460    */
461   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
462
463   /**
464    * Key we use to decrypt messages from the other peer
465    * (given to us by the other peer during the handshake).
466    */
467   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
468
469   /**
470    * ID of task used for re-trying plaintext scheduling.
471    */
472   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
473
474   /**
475    * ID of task used for re-trying SET_KEY and PING message.
476    */
477   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
478
479   /**
480    * ID of task used for updating bandwidth quota for this neighbour.
481    */
482   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
483
484   /**
485    * ID of task used for sending keep-alive pings.
486    */
487   GNUNET_SCHEDULER_TaskIdentifier keep_alive_task;
488
489   /**
490    * ID of task used for cleaning up dead neighbour entries.
491    */
492   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
493
494   /**
495    * At what time did we generate our encryption key?
496    */
497   struct GNUNET_TIME_Absolute encrypt_key_created;
498
499   /**
500    * At what time did the other peer generate the decryption key?
501    */
502   struct GNUNET_TIME_Absolute decrypt_key_created;
503
504   /**
505    * At what time did we initially establish (as in, complete session
506    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
507    */
508   struct GNUNET_TIME_Absolute time_established;
509
510   /**
511    * At what time did we last receive an encrypted message from the
512    * other peer?  Should be zero if status != KEY_CONFIRMED.
513    */
514   struct GNUNET_TIME_Absolute last_activity;
515
516   /**
517    * Last latency observed from this peer.
518    */
519   struct GNUNET_TIME_Relative last_latency;
520
521   /**
522    * At what frequency are we currently re-trying SET_KEY messages?
523    */
524   struct GNUNET_TIME_Relative set_key_retry_frequency;
525
526   /**
527    * Tracking bandwidth for sending to this peer.
528    */
529   struct GNUNET_BANDWIDTH_Tracker available_send_window;
530
531   /**
532    * Tracking bandwidth for receiving from this peer.
533    */
534   struct GNUNET_BANDWIDTH_Tracker available_recv_window;
535
536   /**
537    * How valueable were the messages of this peer recently?
538    */
539   unsigned long long current_preference;
540
541   /**
542    * Bit map indicating which of the 32 sequence numbers before the last
543    * were received (good for accepting out-of-order packets and
544    * estimating reliability of the connection)
545    */
546   unsigned int last_packets_bitmap;
547
548   /**
549    * last sequence number received on this connection (highest)
550    */
551   uint32_t last_sequence_number_received;
552
553   /**
554    * last sequence number transmitted
555    */
556   uint32_t last_sequence_number_sent;
557
558   /**
559    * Available bandwidth in for this peer (current target).
560    */
561   struct GNUNET_BANDWIDTH_Value32NBO bw_in;    
562
563   /**
564    * Available bandwidth out for this peer (current target).
565    */
566   struct GNUNET_BANDWIDTH_Value32NBO bw_out;  
567
568   /**
569    * Internal bandwidth limit set for this peer (initially typically
570    * set to "-1").  Actual "bw_out" is MIN of
571    * "bpm_out_internal_limit" and "bw_out_external_limit".
572    */
573   struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
574
575   /**
576    * External bandwidth limit set for this peer by the
577    * peer that we are communicating with.  "bw_out" is MIN of
578    * "bw_out_internal_limit" and "bw_out_external_limit".
579    */
580   struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
581
582   /**
583    * What was our PING challenge number (for this peer)?
584    */
585   uint32_t ping_challenge;
586
587   /**
588    * What was the last distance to this peer as reported by the transports?
589    */
590   uint32_t last_distance;
591
592   /**
593    * What is our connection status?
594    */
595   enum PeerStateMachine status;
596
597   /**
598    * Are we currently connected to this neighbour?
599    */ 
600   int is_connected;
601
602 };
603
604
605 /**
606  * Data structure for each client connected to the core service.
607  */
608 struct Client
609 {
610   /**
611    * Clients are kept in a linked list.
612    */
613   struct Client *next;
614
615   /**
616    * Handle for the client with the server API.
617    */
618   struct GNUNET_SERVER_Client *client_handle;
619
620   /**
621    * Array of the types of messages this peer cares
622    * about (with "tcnt" entries).  Allocated as part
623    * of this client struct, do not free!
624    */
625   const uint16_t *types;
626
627   /**
628    * Options for messages this client cares about,
629    * see GNUNET_CORE_OPTION_ values.
630    */
631   uint32_t options;
632
633   /**
634    * Number of types of incoming messages this client
635    * specifically cares about.  Size of the "types" array.
636    */
637   unsigned int tcnt;
638
639 };
640
641
642 /**
643  * Our public key.
644  */
645 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
646
647 /**
648  * Our identity.
649  */
650 static struct GNUNET_PeerIdentity my_identity;
651
652 /**
653  * Our private key.
654  */
655 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
656
657 /**
658  * Our scheduler.
659  */
660 struct GNUNET_SCHEDULER_Handle *sched;
661
662 /**
663  * Our configuration.
664  */
665 const struct GNUNET_CONFIGURATION_Handle *cfg;
666
667 /**
668  * Our server.
669  */
670 static struct GNUNET_SERVER_Handle *server;
671
672 /**
673  * Transport service.
674  */
675 static struct GNUNET_TRANSPORT_Handle *transport;
676
677 /**
678  * Linked list of our clients.
679  */
680 static struct Client *clients;
681
682 /**
683  * Context for notifications we need to send to our clients.
684  */
685 static struct GNUNET_SERVER_NotificationContext *notifier;
686
687 /**
688  * We keep neighbours in a linked list (for now).
689  */
690 static struct Neighbour *neighbours;
691
692 /**
693  * Sum of all preferences among all neighbours.
694  */
695 static unsigned long long preference_sum;
696
697 /**
698  * Total number of neighbours we have.
699  */
700 static unsigned int neighbour_count;
701
702 /**
703  * How much inbound bandwidth are we supposed to be using per second?
704  * FIXME: this value is not used!
705  */
706 static unsigned long long bandwidth_target_in_bps;
707
708 /**
709  * How much outbound bandwidth are we supposed to be using per second?
710  */
711 static unsigned long long bandwidth_target_out_bps;
712
713
714
715 /**
716  * A preference value for a neighbour was update.  Update
717  * the preference sum accordingly.
718  *
719  * @param inc how much was a preference value increased?
720  */
721 static void
722 update_preference_sum (unsigned long long inc)
723 {
724   struct Neighbour *n;
725   unsigned long long os;
726
727   os = preference_sum;
728   preference_sum += inc;
729   if (preference_sum >= os)
730     return; /* done! */
731   /* overflow! compensate by cutting all values in half! */
732   preference_sum = 0;
733   n = neighbours;
734   while (n != NULL)
735     {
736       n->current_preference /= 2;
737       preference_sum += n->current_preference;
738       n = n->next;
739     }    
740 }
741
742
743 /**
744  * Find the entry for the given neighbour.
745  *
746  * @param peer identity of the neighbour
747  * @return NULL if we are not connected, otherwise the
748  *         neighbour's entry.
749  */
750 static struct Neighbour *
751 find_neighbour (const struct GNUNET_PeerIdentity *peer)
752 {
753   struct Neighbour *ret;
754
755   ret = neighbours;
756   while ((ret != NULL) &&
757          (0 != memcmp (&ret->peer,
758                        peer, sizeof (struct GNUNET_PeerIdentity))))
759     ret = ret->next;
760   return ret;
761 }
762
763
764 /**
765  * Send a message to one of our clients.
766  *
767  * @param client target for the message
768  * @param msg message to transmit
769  * @param can_drop could this message be dropped if the
770  *        client's queue is getting too large?
771  */
772 static void
773 send_to_client (struct Client *client,
774                 const struct GNUNET_MessageHeader *msg, 
775                 int can_drop)
776 {
777 #if DEBUG_CORE_CLIENT
778   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
779               "Preparing to send message of type %u to client.\n",
780               ntohs (msg->type));
781 #endif  
782   GNUNET_SERVER_notification_context_unicast (notifier,
783                                               client->client_handle,
784                                               msg,
785                                               can_drop);
786 }
787
788
789 /**
790  * Send a message to all of our current clients that have
791  * the right options set.
792  * 
793  * @param msg message to multicast
794  * @param can_drop can this message be discarded if the queue is too long
795  * @param options mask to use 
796  */
797 static void
798 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
799                      int can_drop,
800                      int options)
801 {
802   struct Client *c;
803
804   c = clients;
805   while (c != NULL)
806     {
807       if (0 != (c->options & options))
808         send_to_client (c, msg, can_drop);
809       c = c->next;
810     }
811 }
812
813
814 /**
815  * Handle CORE_INIT request.
816  */
817 static void
818 handle_client_init (void *cls,
819                     struct GNUNET_SERVER_Client *client,
820                     const struct GNUNET_MessageHeader *message)
821 {
822   const struct InitMessage *im;
823   struct InitReplyMessage irm;
824   struct Client *c;
825   uint16_t msize;
826   const uint16_t *types;
827   uint16_t *wtypes;
828   struct Neighbour *n;
829   struct ConnectNotifyMessage cnm;
830   unsigned int i;
831
832 #if DEBUG_CORE_CLIENT
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834               "Client connecting to core service with `%s' message\n",
835               "INIT");
836 #endif
837   /* check that we don't have an entry already */
838   c = clients;
839   while (c != NULL)
840     {
841       if (client == c->client_handle)
842         {
843           GNUNET_break (0);
844           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
845           return;
846         }
847       c = c->next;
848     }
849   msize = ntohs (message->size);
850   if (msize < sizeof (struct InitMessage))
851     {
852       GNUNET_break (0);
853       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
854       return;
855     }
856   GNUNET_SERVER_notification_context_add (notifier, client);
857   im = (const struct InitMessage *) message;
858   types = (const uint16_t *) &im[1];
859   msize -= sizeof (struct InitMessage);
860   c = GNUNET_malloc (sizeof (struct Client) + msize);
861   c->client_handle = client;
862   c->next = clients;
863   clients = c;
864   c->tcnt = msize / sizeof (uint16_t);
865   c->types = (const uint16_t *) &c[1];
866   wtypes = (uint16_t *) &c[1];
867   for (i=0;i<c->tcnt;i++)
868     wtypes[i] = ntohs (types[i]);
869   c->options = ntohl (im->options);
870 #if DEBUG_CORE_CLIENT
871   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872               "Client %p is interested in %u message types\n",
873               c,
874               c->tcnt);
875 #endif
876   /* send init reply message */
877   irm.header.size = htons (sizeof (struct InitReplyMessage));
878   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
879   irm.reserved = htonl (0);
880   memcpy (&irm.publicKey,
881           &my_public_key,
882           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
883 #if DEBUG_CORE_CLIENT
884   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885               "Sending `%s' message to client.\n", "INIT_REPLY");
886 #endif
887   send_to_client (c, &irm.header, GNUNET_NO);
888   if (c->options & GNUNET_CORE_OPTION_SEND_CONNECT)
889     {
890       /* notify new client about existing neighbours */
891       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
892       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
893       n = neighbours;
894       while (n != NULL)
895         {
896           if (n->status == PEER_STATE_KEY_CONFIRMED)
897             {
898 #if DEBUG_CORE_CLIENT
899               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900                           "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
901 #endif
902               cnm.distance = htonl (n->last_distance);
903               cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
904               cnm.peer = n->peer;
905               send_to_client (c, &cnm.header, GNUNET_NO);
906             }
907           n = n->next;
908         }
909     }
910   GNUNET_SERVER_receive_done (client, GNUNET_OK);
911 }
912
913
914 /**
915  * A client disconnected, clean up.
916  *
917  * @param cls closure
918  * @param client identification of the client
919  */
920 static void
921 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
922 {
923   struct Client *pos;
924   struct Client *prev;
925
926   if (client == NULL)
927     return;
928 #if DEBUG_CORE_CLIENT
929   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930               "Client %p has disconnected from core service.\n",
931               client);
932 #endif
933   prev = NULL;
934   pos = clients;
935   while (pos != NULL)
936     {
937       if (client == pos->client_handle)
938         {
939           if (prev == NULL)
940             clients = pos->next;
941           else
942             prev->next = pos->next;
943           GNUNET_free (pos);
944           return;
945         }
946       prev = pos;
947       pos = pos->next;
948     }
949   /* client never sent INIT */
950 }
951
952
953 /**
954  * Handle REQUEST_INFO request.
955  */
956 static void
957 handle_client_request_info (void *cls,
958                             struct GNUNET_SERVER_Client *client,
959                             const struct GNUNET_MessageHeader *message)
960 {
961   const struct RequestInfoMessage *rcm;
962   struct Neighbour *n;
963   struct ConfigurationInfoMessage cim;
964   int32_t want_reserv;
965   int32_t got_reserv;
966   unsigned long long old_preference;
967   struct GNUNET_SERVER_TransmitContext *tc;
968
969 #if DEBUG_CORE_CLIENT
970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971               "Core service receives `%s' request.\n", "REQUEST_INFO");
972 #endif
973   rcm = (const struct RequestInfoMessage *) message;
974   n = find_neighbour (&rcm->peer);
975   memset (&cim, 0, sizeof (cim));
976   if (n != NULL) 
977     {
978       want_reserv = ntohl (rcm->reserve_inbound);
979       if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
980         {
981           n->bw_out_internal_limit = rcm->limit_outbound;
982           n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
983                                                   n->bw_out_external_limit);
984           GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
985                                                  n->bw_out);
986           GNUNET_TRANSPORT_set_quota (transport,
987                                       &n->peer,
988                                       n->bw_in,
989                                       n->bw_out,
990                                       GNUNET_TIME_UNIT_FOREVER_REL,
991                                       NULL, NULL); 
992         }
993       if (want_reserv < 0)
994         {
995           got_reserv = want_reserv;
996         }
997       else if (want_reserv > 0)
998         {
999           if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
1000                                                   want_reserv).value == 0)
1001             got_reserv = want_reserv;
1002           else
1003             got_reserv = 0; /* all or nothing */
1004         }
1005       else
1006         got_reserv = 0;
1007       GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window,
1008                                         got_reserv);
1009       old_preference = n->current_preference;
1010       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1011       if (old_preference > n->current_preference) 
1012         {
1013           /* overflow; cap at maximum value */
1014           n->current_preference = (unsigned long long) -1;
1015         }
1016       update_preference_sum (n->current_preference - old_preference);
1017 #if DEBUG_CORE_QUOTA
1018       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1019                   "Received reservation request for %d bytes for peer `%4s', reserved %d bytes\n",
1020                   (int) want_reserv,
1021                   GNUNET_i2s (&rcm->peer),
1022                   (int) got_reserv);
1023 #endif
1024       cim.reserved_amount = htonl (got_reserv);
1025       cim.bw_in = n->bw_in;
1026       cim.bw_out = n->bw_out;
1027       cim.preference = n->current_preference;
1028     }
1029   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1030   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1031   cim.peer = rcm->peer;
1032
1033 #if DEBUG_CORE_CLIENT
1034   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1035               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1036 #endif
1037   tc = GNUNET_SERVER_transmit_context_create (client);
1038   GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
1039   GNUNET_SERVER_transmit_context_run (tc,
1040                                       GNUNET_TIME_UNIT_FOREVER_REL);
1041 }
1042
1043
1044 /**
1045  * Free the given entry for the neighbour (it has
1046  * already been removed from the list at this point).
1047  *
1048  * @param n neighbour to free
1049  */
1050 static void
1051 free_neighbour (struct Neighbour *n)
1052 {
1053   struct MessageEntry *m;
1054
1055   if (n->pitr != NULL)
1056     {
1057       GNUNET_PEERINFO_iterate_cancel (n->pitr);
1058       n->pitr = NULL;
1059     }
1060   if (n->skm != NULL)
1061     {
1062       GNUNET_free (n->skm);
1063       n->skm = NULL;
1064     }
1065   while (NULL != (m = n->messages))
1066     {
1067       n->messages = m->next;
1068       GNUNET_free (m);
1069     }
1070   while (NULL != (m = n->encrypted_head))
1071     {
1072       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1073                                    n->encrypted_tail,
1074                                    m);
1075       GNUNET_free (m);
1076     }
1077   if (NULL != n->th)
1078     {
1079       GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
1080       n->th = NULL;
1081     }
1082   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1083     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1084   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
1085     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
1086   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
1087     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
1088   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1089     GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1090   if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
1091     GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
1092   GNUNET_free_non_null (n->public_key);
1093   GNUNET_free_non_null (n->pending_ping);
1094   GNUNET_free_non_null (n->pending_pong);
1095   GNUNET_free (n);
1096 }
1097
1098
1099 /**
1100  * Check if we have encrypted messages for the specified neighbour
1101  * pending, and if so, check with the transport about sending them
1102  * out.
1103  *
1104  * @param n neighbour to check.
1105  */
1106 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1107
1108
1109 /**
1110  * Encrypt size bytes from in and write the result to out.  Use the
1111  * key for outbound traffic of the given neighbour.
1112  *
1113  * @param n neighbour we are sending to
1114  * @param iv initialization vector to use
1115  * @param in ciphertext
1116  * @param out plaintext
1117  * @param size size of in/out
1118  * @return GNUNET_OK on success
1119  */
1120 static int
1121 do_encrypt (struct Neighbour *n,
1122             const GNUNET_HashCode * iv,
1123             const void *in, void *out, size_t size)
1124 {
1125   if (size != (uint16_t) size)
1126     {
1127       GNUNET_break (0);
1128       return GNUNET_NO;
1129     }
1130   GNUNET_assert (size ==
1131                  GNUNET_CRYPTO_aes_encrypt (in,
1132                                             (uint16_t) size,
1133                                             &n->encrypt_key,
1134                                             (const struct
1135                                              GNUNET_CRYPTO_AesInitializationVector
1136                                              *) iv, out));
1137 #if DEBUG_CORE
1138   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139               "Encrypted %u bytes for `%4s' using key %u\n", size,
1140               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1141 #endif
1142   return GNUNET_OK;
1143 }
1144
1145
1146 /**
1147  * Consider freeing the given neighbour since we may not need
1148  * to keep it around anymore.
1149  *
1150  * @param n neighbour to consider discarding
1151  */
1152 static void
1153 consider_free_neighbour (struct Neighbour *n);
1154
1155
1156 /**
1157  * Task triggered when a neighbour entry is about to time out 
1158  * (and we should prevent this by sending a PING).
1159  *
1160  * @param cls the 'struct Neighbour'
1161  * @param tc scheduler context (not used)
1162  */
1163 static void
1164 send_keep_alive (void *cls,
1165                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1166 {
1167   struct Neighbour *n = cls;
1168   struct GNUNET_TIME_Relative retry;
1169   struct GNUNET_TIME_Relative left;
1170   struct MessageEntry *me;
1171   struct PingMessage pp;
1172   struct PingMessage *pm;
1173
1174   n->keep_alive_task = GNUNET_SCHEDULER_NO_TASK;
1175   /* send PING */
1176   me = GNUNET_malloc (sizeof (struct MessageEntry) +
1177                       sizeof (struct PingMessage));
1178   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
1179   me->priority = PING_PRIORITY;
1180   me->size = sizeof (struct PingMessage);
1181   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
1182                                      n->encrypted_tail,
1183                                      n->encrypted_tail,
1184                                      me);
1185   pm = (struct PingMessage *) &me[1];
1186   pm->header.size = htons (sizeof (struct PingMessage));
1187   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
1188   pp.challenge = htonl (n->ping_challenge);
1189   pp.target = n->peer;
1190 #if DEBUG_CORE
1191   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192               "Encrypting `%s' and `%s' messages for `%4s'.\n",
1193               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
1194   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1195               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
1196               "PING",
1197               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
1198 #endif
1199   do_encrypt (n,
1200               &n->peer.hashPubKey,
1201               &pp.challenge,
1202               &pm->challenge,
1203               sizeof (struct PingMessage) -
1204               sizeof (struct GNUNET_MessageHeader));
1205   process_encrypted_neighbour_queue (n);
1206   /* reschedule PING job */
1207   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1208                                                                        GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
1209   retry = GNUNET_TIME_relative_max (GNUNET_TIME_relative_divide (left, 2),
1210                                     MIN_PING_FREQUENCY);
1211   n->keep_alive_task 
1212     = GNUNET_SCHEDULER_add_delayed (sched, 
1213                                     retry,
1214                                     &send_keep_alive,
1215                                     n);
1216
1217 }
1218
1219
1220 /**
1221  * Task triggered when a neighbour entry might have gotten stale.
1222  *
1223  * @param cls the 'struct Neighbour'
1224  * @param tc scheduler context (not used)
1225  */
1226 static void
1227 consider_free_task (void *cls,
1228                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1229 {
1230   struct Neighbour *n = cls;
1231
1232   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
1233   consider_free_neighbour (n);
1234 }
1235
1236
1237 /**
1238  * Consider freeing the given neighbour since we may not need
1239  * to keep it around anymore.
1240  *
1241  * @param n neighbour to consider discarding
1242  */
1243 static void
1244 consider_free_neighbour (struct Neighbour *n)
1245
1246   struct Neighbour *pos;
1247   struct Neighbour *prev;
1248   struct GNUNET_TIME_Relative left;
1249
1250   if ( (n->th != NULL) ||
1251        (n->pitr != NULL) ||
1252        (n->status == PEER_STATE_KEY_CONFIRMED) ||
1253        (GNUNET_YES == n->is_connected) )
1254     return; /* no chance */
1255   
1256   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1257                                                                        GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
1258   if (left.value > 0)
1259     {
1260       if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1261         GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1262       n->dead_clean_task = GNUNET_SCHEDULER_add_delayed (sched,
1263                                                          left,
1264                                                          &consider_free_task,
1265                                                          n);
1266       return;
1267     }
1268   /* actually free the neighbour... */
1269   prev = NULL;
1270   pos = neighbours;
1271   while (pos != n)
1272     {
1273       prev = pos;
1274       pos = pos->next;
1275     }
1276   if (prev == NULL)
1277     neighbours = n->next;
1278   else
1279     prev->next = n->next;
1280   GNUNET_assert (neighbour_count > 0);
1281   neighbour_count--;
1282   free_neighbour (n);
1283 }
1284
1285
1286 /**
1287  * Function called when the transport service is ready to
1288  * receive an encrypted message for the respective peer
1289  *
1290  * @param cls neighbour to use message from
1291  * @param size number of bytes we can transmit
1292  * @param buf where to copy the message
1293  * @return number of bytes transmitted
1294  */
1295 static size_t
1296 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1297 {
1298   struct Neighbour *n = cls;
1299   struct MessageEntry *m;
1300   size_t ret;
1301   char *cbuf;
1302
1303   n->th = NULL;
1304   GNUNET_assert (NULL != (m = n->encrypted_head));
1305   GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1306                                n->encrypted_tail,
1307                                m);
1308   ret = 0;
1309   cbuf = buf;
1310   if (buf != NULL)
1311     {
1312       GNUNET_assert (size >= m->size);
1313       memcpy (cbuf, &m[1], m->size);
1314       ret = m->size;
1315       GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window,
1316                                         m->size);
1317 #if DEBUG_CORE
1318       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1320                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1321                   ret, GNUNET_i2s (&n->peer));
1322 #endif
1323       process_encrypted_neighbour_queue (n);
1324     }
1325   else
1326     {
1327 #if DEBUG_CORE
1328       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1329                   "Transmission of message of type %u and size %u failed\n",
1330                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1331                   m->size);
1332 #endif
1333     }
1334   GNUNET_free (m);
1335   consider_free_neighbour (n);
1336   return ret;
1337 }
1338
1339
1340 /**
1341  * Check if we have plaintext messages for the specified neighbour
1342  * pending, and if so, consider batching and encrypting them (and
1343  * then trigger processing of the encrypted queue if needed).
1344  *
1345  * @param n neighbour to check.
1346  */
1347 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1348
1349
1350 /**
1351  * Check if we have encrypted messages for the specified neighbour
1352  * pending, and if so, check with the transport about sending them
1353  * out.
1354  *
1355  * @param n neighbour to check.
1356  */
1357 static void
1358 process_encrypted_neighbour_queue (struct Neighbour *n)
1359 {
1360   struct MessageEntry *m;
1361  
1362   if (n->th != NULL)
1363     return;  /* request already pending */
1364   m = n->encrypted_head;
1365   if (m == NULL)
1366     {
1367       /* encrypted queue empty, try plaintext instead */
1368       process_plaintext_neighbour_queue (n);
1369       return;
1370     }
1371 #if DEBUG_CORE
1372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1374               m->size,
1375               GNUNET_i2s (&n->peer),
1376               GNUNET_TIME_absolute_get_remaining (m->deadline).
1377               value);
1378 #endif
1379   n->th =
1380     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1381                                             m->size,
1382                                             m->priority,
1383                                             GNUNET_TIME_absolute_get_remaining
1384                                             (m->deadline),
1385                                             &notify_encrypted_transmit_ready,
1386                                             n);
1387   if (n->th == NULL)
1388     {
1389       /* message request too large or duplicate request */
1390       GNUNET_break (0);
1391       /* discard encrypted message */
1392       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1393                                    n->encrypted_tail,
1394                                    m);
1395       GNUNET_free (m);
1396       process_encrypted_neighbour_queue (n);
1397     }
1398 }
1399
1400
1401 /**
1402  * Decrypt size bytes from in and write the result to out.  Use the
1403  * key for inbound traffic of the given neighbour.  This function does
1404  * NOT do any integrity-checks on the result.
1405  *
1406  * @param n neighbour we are receiving from
1407  * @param iv initialization vector to use
1408  * @param in ciphertext
1409  * @param out plaintext
1410  * @param size size of in/out
1411  * @return GNUNET_OK on success
1412  */
1413 static int
1414 do_decrypt (struct Neighbour *n,
1415             const GNUNET_HashCode * iv,
1416             const void *in, void *out, size_t size)
1417 {
1418   if (size != (uint16_t) size)
1419     {
1420       GNUNET_break (0);
1421       return GNUNET_NO;
1422     }
1423   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1424       (n->status != PEER_STATE_KEY_CONFIRMED))
1425     {
1426       GNUNET_break_op (0);
1427       return GNUNET_SYSERR;
1428     }
1429   if (size !=
1430       GNUNET_CRYPTO_aes_decrypt (in,
1431                                  (uint16_t) size,
1432                                  &n->decrypt_key,
1433                                  (const struct
1434                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1435                                  out))
1436     {
1437       GNUNET_break (0);
1438       return GNUNET_SYSERR;
1439     }
1440 #if DEBUG_CORE
1441   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1442               "Decrypted %u bytes from `%4s' using key %u\n",
1443               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1444 #endif
1445   return GNUNET_OK;
1446 }
1447
1448
1449 /**
1450  * Select messages for transmission.  This heuristic uses a combination
1451  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1452  * and priority-based discard (in case no feasible schedule exist) and
1453  * speculative optimization (defer any kind of transmission until
1454  * we either create a batch of significant size, 25% of max, or until
1455  * we are close to a deadline).  Furthermore, when scheduling the
1456  * heuristic also packs as many messages into the batch as possible,
1457  * starting with those with the earliest deadline.  Yes, this is fun.
1458  *
1459  * @param n neighbour to select messages from
1460  * @param size number of bytes to select for transmission
1461  * @param retry_time set to the time when we should try again
1462  *        (only valid if this function returns zero)
1463  * @return number of bytes selected, or 0 if we decided to
1464  *         defer scheduling overall; in that case, retry_time is set.
1465  */
1466 static size_t
1467 select_messages (struct Neighbour *n,
1468                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1469 {
1470   struct MessageEntry *pos;
1471   struct MessageEntry *min;
1472   struct MessageEntry *last;
1473   unsigned int min_prio;
1474   struct GNUNET_TIME_Absolute t;
1475   struct GNUNET_TIME_Absolute now;
1476   struct GNUNET_TIME_Relative delta;
1477   uint64_t avail;
1478   struct GNUNET_TIME_Relative slack;     /* how long could we wait before missing deadlines? */
1479   size_t off;
1480   uint64_t tsize;
1481   unsigned int queue_size;
1482   int discard_low_prio;
1483
1484   GNUNET_assert (NULL != n->messages);
1485   now = GNUNET_TIME_absolute_get ();
1486   /* last entry in linked list of messages processed */
1487   last = NULL;
1488   /* should we remove the entry with the lowest
1489      priority from consideration for scheduling at the
1490      end of the loop? */
1491   queue_size = 0;
1492   tsize = 0;
1493   pos = n->messages;
1494   while (pos != NULL)
1495     {
1496       queue_size++;
1497       tsize += pos->size;
1498       pos = pos->next;
1499     }
1500   discard_low_prio = GNUNET_YES;
1501   while (GNUNET_YES == discard_low_prio)
1502     {
1503       min = NULL;
1504       min_prio = -1;
1505       discard_low_prio = GNUNET_NO;
1506       /* calculate number of bytes available for transmission at time "t" */
1507       avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
1508       t = now;
1509       /* how many bytes have we (hypothetically) scheduled so far */
1510       off = 0;
1511       /* maximum time we can wait before transmitting anything
1512          and still make all of our deadlines */
1513       slack = MAX_CORK_DELAY;
1514       pos = n->messages;
1515       /* note that we use "*2" here because we want to look
1516          a bit further into the future; much more makes no
1517          sense since new message might be scheduled in the
1518          meantime... */
1519       while ((pos != NULL) && (off < size * 2))
1520         {         
1521           if (pos->do_transmit == GNUNET_YES)
1522             {
1523               /* already removed from consideration */
1524               pos = pos->next;
1525               continue;
1526             }
1527           if (discard_low_prio == GNUNET_NO)
1528             {
1529               delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
1530               if (delta.value > 0)
1531                 {
1532                   // FIXME: HUH? Check!
1533                   t = pos->deadline;
1534                   avail += GNUNET_BANDWIDTH_value_get_available_until (n->bw_out,
1535                                                                        delta);
1536                 }
1537               if (avail < pos->size)
1538                 {
1539                   // FIXME: HUH? Check!
1540                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1541                 }
1542               else
1543                 {
1544                   avail -= pos->size;
1545                   /* update slack, considering both its absolute deadline
1546                      and relative deadlines caused by other messages
1547                      with their respective load */
1548                   slack = GNUNET_TIME_relative_min (slack,
1549                                                     GNUNET_BANDWIDTH_value_get_delay_for (n->bw_out,
1550                                                                                           avail));
1551                   if (pos->deadline.value <= now.value) 
1552                     {
1553                       /* now or never */
1554                       slack = GNUNET_TIME_UNIT_ZERO;
1555                     }
1556                   else if (GNUNET_YES == pos->got_slack)
1557                     {
1558                       /* should be soon now! */
1559                       slack = GNUNET_TIME_relative_min (slack,
1560                                                         GNUNET_TIME_absolute_get_remaining (pos->slack_deadline));
1561                     }
1562                   else
1563                     {
1564                       slack =
1565                         GNUNET_TIME_relative_min (slack, 
1566                                                   GNUNET_TIME_absolute_get_difference (now, pos->deadline));
1567                       pos->got_slack = GNUNET_YES;
1568                       pos->slack_deadline = GNUNET_TIME_absolute_min (pos->deadline,
1569                                                                       GNUNET_TIME_relative_to_absolute (MAX_CORK_DELAY));
1570                     }
1571                 }
1572             }
1573           off += pos->size;
1574           t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check!
1575           if (pos->priority <= min_prio)
1576             {
1577               /* update min for discard */
1578               min_prio = pos->priority;
1579               min = pos;
1580             }
1581           pos = pos->next;
1582         }
1583       if (discard_low_prio)
1584         {
1585           GNUNET_assert (min != NULL);
1586           /* remove lowest-priority entry from consideration */
1587           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1588         }
1589       last = pos;
1590     }
1591   /* guard against sending "tiny" messages with large headers without
1592      urgent deadlines */
1593   if ( (slack.value > 0) && 
1594        (size > 4 * off) &&
1595        (queue_size <= MAX_PEER_QUEUE_SIZE - 2) )
1596     {
1597       /* less than 25% of message would be filled with deadlines still
1598          being met if we delay by one second or more; so just wait for
1599          more data; but do not wait longer than 1s (since we don't want
1600          to delay messages for a really long time either). */
1601       *retry_time = MAX_CORK_DELAY;
1602       /* reset do_transmit values for next time */
1603       while (pos != last)
1604         {
1605           pos->do_transmit = GNUNET_NO;   
1606           pos = pos->next;
1607         }
1608 #if DEBUG_CORE
1609       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1610                   "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
1611                   (unsigned long long) slack.value,
1612                   (unsigned int) off,
1613                   (unsigned int) size);
1614 #endif
1615       return 0;
1616     }
1617   /* select marked messages (up to size) for transmission */
1618   off = 0;
1619   pos = n->messages;
1620   while (pos != last)
1621     {
1622       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1623         {
1624           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1625           off += pos->size;
1626           size -= pos->size;
1627         }
1628       else
1629         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1630       pos = pos->next;
1631     }
1632 #if DEBUG_CORE
1633   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1634               "Selected %u/%u bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
1635               off, tsize,
1636               queue_size, MAX_PEER_QUEUE_SIZE,
1637               GNUNET_i2s (&n->peer));
1638 #endif
1639   return off;
1640 }
1641
1642
1643 /**
1644  * Batch multiple messages into a larger buffer.
1645  *
1646  * @param n neighbour to take messages from
1647  * @param buf target buffer
1648  * @param size size of buf
1649  * @param deadline set to transmission deadline for the result
1650  * @param retry_time set to the time when we should try again
1651  *        (only valid if this function returns zero)
1652  * @param priority set to the priority of the batch
1653  * @return number of bytes written to buf (can be zero)
1654  */
1655 static size_t
1656 batch_message (struct Neighbour *n,
1657                char *buf,
1658                size_t size,
1659                struct GNUNET_TIME_Absolute *deadline,
1660                struct GNUNET_TIME_Relative *retry_time,
1661                unsigned int *priority)
1662 {
1663   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1664   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
1665   struct MessageEntry *pos;
1666   struct MessageEntry *prev;
1667   struct MessageEntry *next;
1668   size_t ret;
1669   
1670   ret = 0;
1671   *priority = 0;
1672   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1673   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1674   if (0 == select_messages (n, size, retry_time))
1675     {
1676 #if DEBUG_CORE
1677       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1678                   "No messages selected, will try again in %llu ms\n",
1679                   retry_time->value);
1680 #endif
1681       return 0;
1682     }
1683   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
1684   ntm->distance = htonl (n->last_distance);
1685   ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
1686   ntm->peer = n->peer;
1687   
1688   pos = n->messages;
1689   prev = NULL;
1690   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1691     {
1692       next = pos->next;
1693       if (GNUNET_YES == pos->do_transmit)
1694         {
1695           GNUNET_assert (pos->size <= size);
1696           /* do notifications */
1697           /* FIXME: track if we have *any* client that wants
1698              full notifications and only do this if that is
1699              actually true */
1700           if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
1701             {
1702               memcpy (&ntm[1], &pos[1], pos->size);
1703               ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1704                                         sizeof (struct GNUNET_MessageHeader));
1705               send_to_all_clients (&ntm->header,
1706                                    GNUNET_YES,
1707                                    GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
1708             }
1709           else
1710             {
1711               /* message too large for 'full' notifications, we do at
1712                  least the 'hdr' type */
1713               memcpy (&ntm[1],
1714                       &pos[1],
1715                       sizeof (struct GNUNET_MessageHeader));
1716             }
1717           ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1718                                     pos->size);
1719           send_to_all_clients (&ntm->header,
1720                                GNUNET_YES,
1721                                GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);   
1722 #if DEBUG_HANDSHAKE
1723           fprintf (stderr,
1724                    "Encrypting message of type %u\n",
1725                    ntohs(((struct GNUNET_MessageHeader*)&pos[1])->type));
1726 #endif
1727           /* copy for encrypted transmission */
1728           memcpy (&buf[ret], &pos[1], pos->size);
1729           ret += pos->size;
1730           size -= pos->size;
1731           *priority += pos->priority;
1732 #if DEBUG_CORE
1733           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1734                       "Adding plaintext message of size %u with deadline %llu ms to batch\n",
1735                       pos->size,
1736                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1737 #endif
1738           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1739           GNUNET_free (pos);
1740           if (prev == NULL)
1741             n->messages = next;
1742           else
1743             prev->next = next;
1744         }
1745       else
1746         {
1747           prev = pos;
1748         }
1749       pos = next;
1750     }
1751 #if DEBUG_CORE
1752   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1753               "Deadline for message batch is %llu ms\n",
1754               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1755 #endif
1756   return ret;
1757 }
1758
1759
1760 /**
1761  * Remove messages with deadlines that have long expired from
1762  * the queue.
1763  *
1764  * @param n neighbour to inspect
1765  */
1766 static void
1767 discard_expired_messages (struct Neighbour *n)
1768 {
1769   struct MessageEntry *prev;
1770   struct MessageEntry *next;
1771   struct MessageEntry *pos;
1772   struct GNUNET_TIME_Absolute now;
1773   struct GNUNET_TIME_Relative delta;
1774
1775   now = GNUNET_TIME_absolute_get ();
1776   prev = NULL;
1777   pos = n->messages;
1778   while (pos != NULL) 
1779     {
1780       next = pos->next;
1781       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1782       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1783         {
1784 #if DEBUG_CORE
1785           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1786                       "Message is %llu ms past due, discarding.\n",
1787                       delta.value);
1788 #endif
1789           if (prev == NULL)
1790             n->messages = next;
1791           else
1792             prev->next = next;
1793           GNUNET_free (pos);
1794         }
1795       else
1796         prev = pos;
1797       pos = next;
1798     }
1799 }
1800
1801
1802 /**
1803  * Signature of the main function of a task.
1804  *
1805  * @param cls closure
1806  * @param tc context information (why was this task triggered now)
1807  */
1808 static void
1809 retry_plaintext_processing (void *cls,
1810                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1811 {
1812   struct Neighbour *n = cls;
1813
1814   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1815   process_plaintext_neighbour_queue (n);
1816 }
1817
1818
1819 /**
1820  * Send our key (and encrypted PING) to the other peer.
1821  *
1822  * @param n the other peer
1823  */
1824 static void send_key (struct Neighbour *n);
1825
1826 /**
1827  * Task that will retry "send_key" if our previous attempt failed
1828  * to yield a PONG.
1829  */
1830 static void
1831 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1832 {
1833   struct Neighbour *n = cls;
1834
1835 #if DEBUG_CORE
1836   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1837               "Retrying key transmission to `%4s'\n",
1838               GNUNET_i2s (&n->peer));
1839 #endif
1840   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
1841   n->set_key_retry_frequency =
1842     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
1843   send_key (n);
1844 }
1845
1846
1847 /**
1848  * Check if we have plaintext messages for the specified neighbour
1849  * pending, and if so, consider batching and encrypting them (and
1850  * then trigger processing of the encrypted queue if needed).
1851  *
1852  * @param n neighbour to check.
1853  */
1854 static void
1855 process_plaintext_neighbour_queue (struct Neighbour *n)
1856 {
1857   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1858   size_t used;
1859   size_t esize;
1860   struct EncryptedMessage *em;  /* encrypted message */
1861   struct EncryptedMessage *ph;  /* plaintext header */
1862   struct MessageEntry *me;
1863   unsigned int priority;
1864   struct GNUNET_TIME_Absolute deadline;
1865   struct GNUNET_TIME_Relative retry_time;
1866   GNUNET_HashCode iv;
1867
1868   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1869     {
1870       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1871       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1872     }
1873   switch (n->status)
1874     {
1875     case PEER_STATE_DOWN:
1876       send_key (n);
1877 #if DEBUG_CORE
1878       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1879                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1880                   GNUNET_i2s(&n->peer));
1881 #endif
1882       return;
1883     case PEER_STATE_KEY_SENT:
1884       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
1885         n->retry_set_key_task
1886           = GNUNET_SCHEDULER_add_delayed (sched,
1887                                           n->set_key_retry_frequency,
1888                                           &set_key_retry_task, n);    
1889 #if DEBUG_CORE
1890       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1891                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1892                   GNUNET_i2s(&n->peer));
1893 #endif
1894       return;
1895     case PEER_STATE_KEY_RECEIVED:
1896       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)        
1897         n->retry_set_key_task
1898           = GNUNET_SCHEDULER_add_delayed (sched,
1899                                           n->set_key_retry_frequency,
1900                                           &set_key_retry_task, n);        
1901 #if DEBUG_CORE
1902       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1903                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1904                   GNUNET_i2s(&n->peer));
1905 #endif
1906       return;
1907     case PEER_STATE_KEY_CONFIRMED:
1908       /* ready to continue */
1909       break;
1910     }
1911   discard_expired_messages (n);
1912   if (n->messages == NULL)
1913     {
1914 #if DEBUG_CORE
1915       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1916                   "Plaintext message queue for `%4s' is empty.\n",
1917                   GNUNET_i2s(&n->peer));
1918 #endif
1919       return;                   /* no pending messages */
1920     }
1921   if (n->encrypted_head != NULL)
1922     {
1923 #if DEBUG_CORE
1924       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1925                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1926                   GNUNET_i2s(&n->peer));
1927 #endif
1928       return;                   /* wait for messages already encrypted to be
1929                                    processed first! */
1930     }
1931   ph = (struct EncryptedMessage *) pbuf;
1932   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1933   priority = 0;
1934   used = sizeof (struct EncryptedMessage);
1935   used += batch_message (n,
1936                          &pbuf[used],
1937                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1938                          &deadline, &retry_time, &priority);
1939   if (used == sizeof (struct EncryptedMessage))
1940     {
1941 #if DEBUG_CORE
1942       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1943                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1944                   GNUNET_i2s(&n->peer));
1945 #endif
1946       /* no messages selected for sending, try again later... */
1947       n->retry_plaintext_task =
1948         GNUNET_SCHEDULER_add_delayed (sched,
1949                                       retry_time,
1950                                       &retry_plaintext_processing, n);
1951       return;
1952     }
1953 #if DEBUG_CORE_QUOTA
1954   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1955               "Sending %u b/s as new limit to peer `%4s'\n",
1956               (unsigned int) ntohl (n->bw_in.value__),
1957               GNUNET_i2s (&n->peer));
1958 #endif
1959   ph->iv_seed = htonl (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1));
1960   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1961   ph->inbound_bw_limit = n->bw_in;
1962   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1963
1964   /* setup encryption message header */
1965   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1966   me->deadline = deadline;
1967   me->priority = priority;
1968   me->size = used;
1969   em = (struct EncryptedMessage *) &me[1];
1970   em->header.size = htons (used);
1971   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1972   em->iv_seed = ph->iv_seed;
1973   esize = used - ENCRYPTED_HEADER_SIZE;
1974   GNUNET_CRYPTO_hash (&ph->sequence_number,
1975                       esize - sizeof (GNUNET_HashCode), 
1976                       &ph->plaintext_hash);
1977   GNUNET_CRYPTO_hash (&ph->iv_seed, sizeof (uint32_t), &iv);
1978   /* encrypt */
1979 #if DEBUG_CORE
1980   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1982               esize,
1983               GNUNET_i2s(&n->peer),
1984               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1985 #endif
1986   GNUNET_assert (GNUNET_OK ==
1987                  do_encrypt (n,
1988                              &iv,
1989                              &ph->plaintext_hash,
1990                              &em->plaintext_hash, esize));
1991   /* append to transmission list */
1992   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
1993                                      n->encrypted_tail,
1994                                      n->encrypted_tail,
1995                                      me);
1996   process_encrypted_neighbour_queue (n);
1997 }
1998
1999
2000 /**
2001  * Function that recalculates the bandwidth quota for the
2002  * given neighbour and transmits it to the transport service.
2003  * 
2004  * @param cls neighbour for the quota update
2005  * @param tc context
2006  */
2007 static void
2008 neighbour_quota_update (void *cls,
2009                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2010
2011
2012 /**
2013  * Schedule the task that will recalculate the bandwidth
2014  * quota for this peer (and possibly force a disconnect of
2015  * idle peers by calculating a bandwidth of zero).
2016  */
2017 static void
2018 schedule_quota_update (struct Neighbour *n)
2019 {
2020   GNUNET_assert (n->quota_update_task ==
2021                  GNUNET_SCHEDULER_NO_TASK);
2022   n->quota_update_task
2023     = GNUNET_SCHEDULER_add_delayed (sched,
2024                                     QUOTA_UPDATE_FREQUENCY,
2025                                     &neighbour_quota_update,
2026                                     n);
2027 }
2028
2029
2030 /**
2031  * Initialize a new 'struct Neighbour'.
2032  *
2033  * @param pid ID of the new neighbour
2034  * @return handle for the new neighbour
2035  */
2036 static struct Neighbour *
2037 create_neighbour (const struct GNUNET_PeerIdentity *pid)
2038 {
2039   struct Neighbour *n;
2040   struct GNUNET_TIME_Absolute now;
2041
2042   n = GNUNET_malloc (sizeof (struct Neighbour));
2043   n->next = neighbours;
2044   neighbours = n;
2045   neighbour_count++;
2046   n->peer = *pid;
2047   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2048   now = GNUNET_TIME_absolute_get ();
2049   n->encrypt_key_created = now;
2050   n->last_activity = now;
2051   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2052   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2053   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2054   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init ((uint32_t) - 1);
2055   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2056   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2057                                                 (uint32_t) - 1);
2058   neighbour_quota_update (n, NULL);
2059   return n;
2060 }
2061
2062
2063 /**
2064  * Handle CORE_SEND request.
2065  *
2066  * @param cls unused
2067  * @param client the client issuing the request
2068  * @param message the "struct SendMessage"
2069  */
2070 static void
2071 handle_client_send (void *cls,
2072                     struct GNUNET_SERVER_Client *client,
2073                     const struct GNUNET_MessageHeader *message)
2074 {
2075   const struct SendMessage *sm;
2076   struct Neighbour *n;
2077   struct MessageEntry *prev;
2078   struct MessageEntry *pos;
2079   struct MessageEntry *e; 
2080   struct MessageEntry *min_prio_entry;
2081   struct MessageEntry *min_prio_prev;
2082   unsigned int min_prio;
2083   unsigned int queue_size;
2084   uint16_t msize;
2085
2086   msize = ntohs (message->size);
2087   if (msize <
2088       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
2089     {
2090       GNUNET_break (0);
2091       if (client != NULL)
2092         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2093       return;
2094     }
2095   sm = (const struct SendMessage *) message;
2096   msize -= sizeof (struct SendMessage);
2097   n = find_neighbour (&sm->peer);
2098   if (n == NULL)
2099     n = create_neighbour (&sm->peer);
2100 #if DEBUG_CORE
2101   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2102               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
2103               "SEND",
2104               msize, 
2105               GNUNET_i2s (&sm->peer));
2106 #endif
2107   /* bound queue size */
2108   discard_expired_messages (n);
2109   min_prio = (unsigned int) -1;
2110   min_prio_entry = NULL;
2111   min_prio_prev = NULL;
2112   queue_size = 0;
2113   prev = NULL;
2114   pos = n->messages;
2115   while (pos != NULL) 
2116     {
2117       if (pos->priority < min_prio)
2118         {
2119           min_prio_entry = pos;
2120           min_prio_prev = prev;
2121           min_prio = pos->priority;
2122         }
2123       queue_size++;
2124       prev = pos;
2125       pos = pos->next;
2126     }
2127   if (queue_size >= MAX_PEER_QUEUE_SIZE)
2128     {
2129       /* queue full */
2130       if (ntohl(sm->priority) <= min_prio)
2131         {
2132           /* discard new entry */
2133 #if DEBUG_CORE
2134           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2135                       "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
2136                       queue_size,
2137                       MAX_PEER_QUEUE_SIZE,
2138                       msize,
2139                       ntohs (message->type));
2140 #endif
2141           if (client != NULL)
2142             GNUNET_SERVER_receive_done (client, GNUNET_OK);
2143           return;
2144         }
2145       /* discard "min_prio_entry" */
2146 #if DEBUG_CORE
2147       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2148                   "Queue full, discarding existing older request\n");
2149 #endif
2150       if (min_prio_prev == NULL)
2151         n->messages = min_prio_entry->next;
2152       else
2153         min_prio_prev->next = min_prio_entry->next;      
2154       GNUNET_free (min_prio_entry);     
2155     }
2156
2157 #if DEBUG_CORE
2158   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2159               "Adding transmission request for `%4s' of size %u to queue\n",
2160               GNUNET_i2s (&sm->peer),
2161               msize);
2162 #endif  
2163   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
2164   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
2165   e->priority = ntohl (sm->priority);
2166   e->size = msize;
2167   memcpy (&e[1], &sm[1], msize);
2168
2169   /* insert, keep list sorted by deadline */
2170   prev = NULL;
2171   pos = n->messages;
2172   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
2173     {
2174       prev = pos;
2175       pos = pos->next;
2176     }
2177   if (prev == NULL)
2178     n->messages = e;
2179   else
2180     prev->next = e;
2181   e->next = pos;
2182
2183   /* consider scheduling now */
2184   process_plaintext_neighbour_queue (n);
2185   if (client != NULL)
2186     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2187 }
2188
2189
2190 /**
2191  * Function called when the transport service is ready to
2192  * receive a message.  Only resets 'n->th' to NULL.
2193  *
2194  * @param cls neighbour to use message from
2195  * @param size number of bytes we can transmit
2196  * @param buf where to copy the message
2197  * @return number of bytes transmitted
2198  */
2199 static size_t
2200 notify_transport_connect_done (void *cls, size_t size, void *buf)
2201 {
2202   struct Neighbour *n = cls;
2203
2204   n->th = NULL;
2205   if (buf == NULL)
2206     {
2207       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2208                   _("Failed to connect to `%4s': transport failed to connect\n"),
2209                   GNUNET_i2s (&n->peer));
2210       return 0;
2211     }
2212   send_key (n);
2213   return 0;
2214 }
2215
2216
2217 /**
2218  * Handle CORE_REQUEST_CONNECT request.
2219  *
2220  * @param cls unused
2221  * @param client the client issuing the request
2222  * @param message the "struct ConnectMessage"
2223  */
2224 static void
2225 handle_client_request_connect (void *cls,
2226                                struct GNUNET_SERVER_Client *client,
2227                                const struct GNUNET_MessageHeader *message)
2228 {
2229   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2230   struct Neighbour *n;
2231   struct GNUNET_TIME_Relative timeout;
2232
2233   if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2234     {
2235       GNUNET_break (0);
2236       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2237       return;
2238     }
2239   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2240   n = find_neighbour (&cm->peer);
2241   if (n == NULL)
2242     n = create_neighbour (&cm->peer);
2243   if ( (n->is_connected) ||
2244        (n->th != NULL) )
2245     return; /* already connected, or at least trying */
2246 #if DEBUG_CORE
2247   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2248               "Core received `%s' request for `%4s', will try to establish connection\n",
2249               "REQUEST_CONNECT",
2250               GNUNET_i2s (&cm->peer));
2251 #endif
2252   timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
2253   /* ask transport to connect to the peer */
2254   n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
2255                                                   &cm->peer,
2256                                                   sizeof (struct GNUNET_MessageHeader), 0,
2257                                                   timeout,
2258                                                   &notify_transport_connect_done,
2259                                                   n);
2260   GNUNET_break (NULL != n->th);
2261 }
2262
2263
2264 /**
2265  * List of handlers for the messages understood by this
2266  * service.
2267  */
2268 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2269   {&handle_client_init, NULL,
2270    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2271   {&handle_client_request_info, NULL,
2272    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2273    sizeof (struct RequestInfoMessage)},
2274   {&handle_client_send, NULL,
2275    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2276   {&handle_client_request_connect, NULL,
2277    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2278    sizeof (struct ConnectMessage)},
2279   {NULL, NULL, 0, 0}
2280 };
2281
2282
2283 /**
2284  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2285  * the neighbour's struct and retry send_key.  Or, if we did not get a
2286  * HELLO, just do nothing.
2287  *
2288  * @param cls the 'struct Neighbour' to retry sending the key for
2289  * @param peer the peer for which this is the HELLO
2290  * @param hello HELLO message of that peer
2291  * @param trust amount of trust we currently have in that peer
2292  */
2293 static void
2294 process_hello_retry_send_key (void *cls,
2295                               const struct GNUNET_PeerIdentity *peer,
2296                               const struct GNUNET_HELLO_Message *hello,
2297                               uint32_t trust)
2298 {
2299   struct Neighbour *n = cls;
2300
2301   if (peer == NULL)
2302     {
2303 #if DEBUG_CORE
2304       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2305                   "Entered `process_hello_retry_send_key' and `peer' is NULL!\n");
2306 #endif
2307       n->pitr = NULL;
2308       if (n->public_key != NULL)
2309         {
2310           send_key (n);
2311         }
2312       else
2313         {
2314           if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
2315             n->retry_set_key_task
2316               = GNUNET_SCHEDULER_add_delayed (sched,
2317                                               n->set_key_retry_frequency,
2318                                               &set_key_retry_task, n);
2319         }
2320       return;
2321     }
2322
2323 #if DEBUG_CORE
2324   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325               "Entered `process_hello_retry_send_key' for peer `%4s'\n",
2326               GNUNET_i2s (peer));
2327 #endif
2328   if (n->public_key != NULL)
2329     {
2330 #if DEBUG_CORE
2331       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2332               "already have public key for peer %s!! (so why are we here?)\n",
2333               GNUNET_i2s (peer));
2334 #endif
2335       return;
2336     }
2337
2338 #if DEBUG_CORE
2339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2340               "Received new `%s' message for `%4s', initiating key exchange.\n",
2341               "HELLO",
2342               GNUNET_i2s (peer));
2343 #endif
2344   n->public_key =
2345     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2346   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2347     {
2348       GNUNET_free (n->public_key);
2349       n->public_key = NULL;
2350 #if DEBUG_CORE
2351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2352               "GNUNET_HELLO_get_key returned awfully\n");
2353 #endif
2354       return;
2355     }
2356 }
2357
2358
2359 /**
2360  * Send our key (and encrypted PING) to the other peer.
2361  *
2362  * @param n the other peer
2363  */
2364 static void
2365 send_key (struct Neighbour *n)
2366 {
2367   struct SetKeyMessage *sm;
2368   struct MessageEntry *me;
2369   struct PingMessage pp;
2370   struct PingMessage *pm;
2371
2372   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2373        (n->pitr != NULL) )
2374     {
2375 #if DEBUG_CORE
2376       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2377                   "Key exchange in progress with `%4s'.\n",
2378                   GNUNET_i2s (&n->peer));
2379 #endif
2380       return; /* already in progress */
2381     }
2382
2383 #if DEBUG_CORE
2384   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2385               "Asked to perform key exchange with `%4s'.\n",
2386               GNUNET_i2s (&n->peer));
2387 #endif
2388   if (n->public_key == NULL)
2389     {
2390       /* lookup n's public key, then try again */
2391 #if DEBUG_CORE
2392       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2393                   "Lacking public key for `%4s', trying to obtain one (send_key).\n",
2394                   GNUNET_i2s (&n->peer));
2395 #endif
2396       GNUNET_assert (n->pitr == NULL);
2397       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2398                                          sched,
2399                                          &n->peer,
2400                                          0,
2401                                          GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20),
2402                                          &process_hello_retry_send_key, n);
2403       return;
2404     }
2405   /* first, set key message */
2406   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2407                       sizeof (struct SetKeyMessage));
2408   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2409   me->priority = SET_KEY_PRIORITY;
2410   me->size = sizeof (struct SetKeyMessage);
2411   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2412                                      n->encrypted_tail,
2413                                      n->encrypted_tail,
2414                                      me);
2415   sm = (struct SetKeyMessage *) &me[1];
2416   sm->header.size = htons (sizeof (struct SetKeyMessage));
2417   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2418   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2419                                         PEER_STATE_KEY_SENT : n->status));
2420   sm->purpose.size =
2421     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2422            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2423            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2424            sizeof (struct GNUNET_PeerIdentity));
2425   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2426   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2427   sm->target = n->peer;
2428   GNUNET_assert (GNUNET_OK ==
2429                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2430                                             sizeof (struct
2431                                                     GNUNET_CRYPTO_AesSessionKey),
2432                                             n->public_key,
2433                                             &sm->encrypted_key));
2434   GNUNET_assert (GNUNET_OK ==
2435                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2436                                          &sm->signature));
2437
2438   /* second, encrypted PING message */
2439   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2440                       sizeof (struct PingMessage));
2441   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2442   me->priority = PING_PRIORITY;
2443   me->size = sizeof (struct PingMessage);
2444   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2445                                      n->encrypted_tail,
2446                                      n->encrypted_tail,
2447                                      me);
2448   pm = (struct PingMessage *) &me[1];
2449   pm->header.size = htons (sizeof (struct PingMessage));
2450   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2451   pp.challenge = htonl (n->ping_challenge);
2452   pp.target = n->peer;
2453 #if DEBUG_CORE
2454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2455               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2456               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2457   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2458               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2459               "PING",
2460               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2461 #endif
2462   do_encrypt (n,
2463               &n->peer.hashPubKey,
2464               &pp.challenge,
2465               &pm->challenge,
2466               sizeof (struct PingMessage) -
2467               sizeof (struct GNUNET_MessageHeader));
2468   /* update status */
2469   switch (n->status)
2470     {
2471     case PEER_STATE_DOWN:
2472       n->status = PEER_STATE_KEY_SENT;
2473       break;
2474     case PEER_STATE_KEY_SENT:
2475       break;
2476     case PEER_STATE_KEY_RECEIVED:
2477       break;
2478     case PEER_STATE_KEY_CONFIRMED:
2479       break;
2480     default:
2481       GNUNET_break (0);
2482       break;
2483     }
2484 #if DEBUG_CORE
2485   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2486               "Have %llu ms left for `%s' transmission.\n",
2487               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2488               "SET_KEY");
2489 #endif
2490   /* trigger queue processing */
2491   process_encrypted_neighbour_queue (n);
2492   if ( (n->status != PEER_STATE_KEY_CONFIRMED) &&
2493        (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task) )
2494     n->retry_set_key_task
2495       = GNUNET_SCHEDULER_add_delayed (sched,
2496                                       n->set_key_retry_frequency,
2497                                       &set_key_retry_task, n);    
2498 }
2499
2500
2501 /**
2502  * We received a SET_KEY message.  Validate and update
2503  * our key material and status.
2504  *
2505  * @param n the neighbour from which we received message m
2506  * @param m the set key message we received
2507  */
2508 static void
2509 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2510
2511
2512 /**
2513  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2514  * the neighbour's struct and retry handling the set_key message.  Or,
2515  * if we did not get a HELLO, just free the set key message.
2516  *
2517  * @param cls pointer to the set key message
2518  * @param peer the peer for which this is the HELLO
2519  * @param hello HELLO message of that peer
2520  * @param trust amount of trust we currently have in that peer
2521  */
2522 static void
2523 process_hello_retry_handle_set_key (void *cls,
2524                                     const struct GNUNET_PeerIdentity *peer,
2525                                     const struct GNUNET_HELLO_Message *hello,
2526                                     uint32_t trust)
2527 {
2528   struct Neighbour *n = cls;
2529   struct SetKeyMessage *sm = n->skm;
2530
2531   if (peer == NULL)
2532     {
2533       GNUNET_free (sm);
2534       n->skm = NULL;
2535       n->pitr = NULL;
2536       return;
2537     }
2538   if (n->public_key != NULL)
2539     return;                     /* multiple HELLOs match!? */
2540   n->public_key =
2541     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2542   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2543     {
2544       GNUNET_break_op (0);
2545       GNUNET_free (n->public_key);
2546       n->public_key = NULL;
2547       return;
2548     }
2549 #if DEBUG_CORE
2550   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2551               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2552               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2553 #endif
2554   handle_set_key (n, sm);
2555 }
2556
2557
2558 /**
2559  * We received a PING message.  Validate and transmit
2560  * PONG.
2561  *
2562  * @param n sender of the PING
2563  * @param m the encrypted PING message itself
2564  */
2565 static void
2566 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2567 {
2568   struct PingMessage t;
2569   struct PongMessage tx;
2570   struct PongMessage *tp;
2571   struct MessageEntry *me;
2572
2573 #if DEBUG_CORE
2574   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2575               "Core service receives `%s' request from `%4s'.\n",
2576               "PING", GNUNET_i2s (&n->peer));
2577 #endif
2578   if (GNUNET_OK !=
2579       do_decrypt (n,
2580                   &my_identity.hashPubKey,
2581                   &m->challenge,
2582                   &t.challenge,
2583                   sizeof (struct PingMessage) -
2584                   sizeof (struct GNUNET_MessageHeader)))
2585     return;
2586 #if DEBUG_CORE
2587   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2588               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2589               "PING",
2590               GNUNET_i2s (&t.target),
2591               ntohl (t.challenge), n->decrypt_key.crc32);
2592   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2593               "Target of `%s' request is `%4s'.\n",
2594               "PING", GNUNET_i2s (&t.target));
2595 #endif
2596   if (0 != memcmp (&t.target,
2597                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2598     {
2599       GNUNET_break_op (0);
2600       return;
2601     }
2602   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2603                       sizeof (struct PongMessage));
2604   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2605                                      n->encrypted_tail,
2606                                      n->encrypted_tail,
2607                                      me);
2608   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2609   me->priority = PONG_PRIORITY;
2610   me->size = sizeof (struct PongMessage);
2611   tx.reserved = htonl (0);
2612   tx.inbound_bw_limit = n->bw_in;
2613   tx.challenge = t.challenge;
2614   tx.target = t.target;
2615   tp = (struct PongMessage *) &me[1];
2616   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2617   tp->header.size = htons (sizeof (struct PongMessage));
2618   do_encrypt (n,
2619               &my_identity.hashPubKey,
2620               &tx.challenge,
2621               &tp->challenge,
2622               sizeof (struct PongMessage) -
2623               sizeof (struct GNUNET_MessageHeader));
2624 #if DEBUG_CORE
2625   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2626               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2627               ntohl (t.challenge), n->encrypt_key.crc32);
2628 #endif
2629   /* trigger queue processing */
2630   process_encrypted_neighbour_queue (n);
2631 }
2632
2633
2634 /**
2635  * We received a PONG message.  Validate and update our status.
2636  *
2637  * @param n sender of the PONG
2638  * @param m the encrypted PONG message itself
2639  */
2640 static void
2641 handle_pong (struct Neighbour *n, 
2642              const struct PongMessage *m)
2643 {
2644   struct PongMessage t;
2645   struct ConnectNotifyMessage cnm;
2646
2647 #if DEBUG_CORE
2648   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2649               "Core service receives `%s' request from `%4s'.\n",
2650               "PONG", GNUNET_i2s (&n->peer));
2651 #endif
2652   if (GNUNET_OK !=
2653       do_decrypt (n,
2654                   &n->peer.hashPubKey,
2655                   &m->challenge,
2656                   &t.challenge,
2657                   sizeof (struct PongMessage) -
2658                   sizeof (struct GNUNET_MessageHeader)))
2659     return;
2660   if (0 != ntohl (t.reserved))
2661     {
2662       GNUNET_break_op (0);
2663       return;
2664     }
2665 #if DEBUG_CORE
2666   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2667               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2668               "PONG",
2669               GNUNET_i2s (&t.target),
2670               ntohl (t.challenge), n->decrypt_key.crc32);
2671 #endif
2672   if ((0 != memcmp (&t.target,
2673                     &n->peer,
2674                     sizeof (struct GNUNET_PeerIdentity))) ||
2675       (n->ping_challenge != ntohl (t.challenge)))
2676     {
2677       /* PONG malformed */
2678 #if DEBUG_CORE
2679       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2680                   "Received malformed `%s' wanted sender `%4s' with challenge %u\n",
2681                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2682       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2683                   "Received malformed `%s' received from `%4s' with challenge %u\n",
2684                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2685 #endif
2686       GNUNET_break_op (0);
2687       return;
2688     }
2689   switch (n->status)
2690     {
2691     case PEER_STATE_DOWN:
2692       GNUNET_break (0);         /* should be impossible */
2693       return;
2694     case PEER_STATE_KEY_SENT:
2695       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2696       return;
2697     case PEER_STATE_KEY_RECEIVED:
2698       n->status = PEER_STATE_KEY_CONFIRMED;
2699       if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
2700         {
2701           n->bw_out_external_limit = t.inbound_bw_limit;
2702           n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
2703                                                   n->bw_out_internal_limit);
2704           GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
2705                                                  n->bw_out);       
2706           GNUNET_TRANSPORT_set_quota (transport,
2707                                       &n->peer,
2708                                       n->bw_in,
2709                                       n->bw_out,
2710                                       GNUNET_TIME_UNIT_FOREVER_REL,
2711                                       NULL, NULL); 
2712         }
2713 #if DEBUG_CORE
2714       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2715                   "Confirmed key via `%s' message for peer `%4s'\n",
2716                   "PONG", GNUNET_i2s (&n->peer));
2717 #endif
2718       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2719         {
2720           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2721           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2722         }      
2723       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2724       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2725       cnm.distance = htonl (n->last_distance);
2726       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2727       cnm.peer = n->peer;
2728       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2729       process_encrypted_neighbour_queue (n);
2730       /* fall-through! */
2731     case PEER_STATE_KEY_CONFIRMED:
2732       n->last_activity = GNUNET_TIME_absolute_get ();
2733       if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
2734         GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
2735       n->keep_alive_task 
2736         = GNUNET_SCHEDULER_add_delayed (sched, 
2737                                         GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
2738                                         &send_keep_alive,
2739                                         n);
2740       break;
2741     default:
2742       GNUNET_break (0);
2743       break;
2744     }
2745 }
2746
2747
2748 /**
2749  * We received a SET_KEY message.  Validate and update
2750  * our key material and status.
2751  *
2752  * @param n the neighbour from which we received message m
2753  * @param m the set key message we received
2754  */
2755 static void
2756 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2757 {
2758   struct SetKeyMessage *m_cpy;
2759   struct GNUNET_TIME_Absolute t;
2760   struct GNUNET_CRYPTO_AesSessionKey k;
2761   struct PingMessage *ping;
2762   struct PongMessage *pong;
2763   enum PeerStateMachine sender_status;
2764
2765 #if DEBUG_CORE
2766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2767               "Core service receives `%s' request from `%4s'.\n",
2768               "SET_KEY", GNUNET_i2s (&n->peer));
2769 #endif
2770   if (n->public_key == NULL)
2771     {
2772       if (n->pitr != NULL)
2773         {
2774 #if DEBUG_CORE
2775           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2776                       "Ignoring `%s' message due to lack of public key for peer (still trying to obtain one).\n",
2777                       "SET_KEY");
2778 #endif
2779           return;
2780         }
2781 #if DEBUG_CORE
2782       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2783                   "Lacking public key for peer, trying to obtain one (handle_set_key).\n");
2784 #endif
2785       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2786       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2787       /* lookup n's public key, then try again */
2788       GNUNET_assert (n->skm == NULL);
2789       n->skm = m_cpy;
2790       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2791                                          sched,
2792                                          &n->peer,
2793                                          0,
2794                                          GNUNET_TIME_UNIT_MINUTES,
2795                                          &process_hello_retry_handle_set_key, n);
2796       return;
2797     }
2798   if (0 != memcmp (&m->target,
2799                    &my_identity,
2800                    sizeof (struct GNUNET_PeerIdentity)))
2801     {
2802       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2803                   _("Received `%s' message that was for `%s', not for me.  Ignoring.\n"),
2804                   "SET_KEY",
2805                   GNUNET_i2s (&m->target));
2806       return;
2807     }
2808   if ((ntohl (m->purpose.size) !=
2809        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2810        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2811        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2812        sizeof (struct GNUNET_PeerIdentity)) ||
2813       (GNUNET_OK !=
2814        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2815                                  &m->purpose, &m->signature, n->public_key)))
2816     {
2817       /* invalid signature */
2818       GNUNET_break_op (0);
2819       return;
2820     }
2821   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2822   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2823        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2824       (t.value < n->decrypt_key_created.value))
2825     {
2826       /* this could rarely happen due to massive re-ordering of
2827          messages on the network level, but is most likely either
2828          a bug or some adversary messing with us.  Report. */
2829       GNUNET_break_op (0);
2830       return;
2831     }
2832 #if DEBUG_CORE
2833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2834               "Decrypting key material.\n");
2835 #endif  
2836   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2837                                   &m->encrypted_key,
2838                                   &k,
2839                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2840        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2841       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2842     {
2843       /* failed to decrypt !? */
2844       GNUNET_break_op (0);
2845       return;
2846     }
2847
2848   n->decrypt_key = k;
2849   if (n->decrypt_key_created.value != t.value)
2850     {
2851       /* fresh key, reset sequence numbers */
2852       n->last_sequence_number_received = 0;
2853       n->last_packets_bitmap = 0;
2854       n->decrypt_key_created = t;
2855     }
2856   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2857   switch (n->status)
2858     {
2859     case PEER_STATE_DOWN:
2860       n->status = PEER_STATE_KEY_RECEIVED;
2861 #if DEBUG_CORE
2862       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2863                   "Responding to `%s' with my own key.\n", "SET_KEY");
2864 #endif
2865       send_key (n);
2866       break;
2867     case PEER_STATE_KEY_SENT:
2868     case PEER_STATE_KEY_RECEIVED:
2869       n->status = PEER_STATE_KEY_RECEIVED;
2870       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2871           (sender_status != PEER_STATE_KEY_CONFIRMED))
2872         {
2873 #if DEBUG_CORE
2874           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2875                       "Responding to `%s' with my own key (other peer has status %u).\n",
2876                       "SET_KEY", sender_status);
2877 #endif
2878           send_key (n);
2879         }
2880       break;
2881     case PEER_STATE_KEY_CONFIRMED:
2882       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2883           (sender_status != PEER_STATE_KEY_CONFIRMED))
2884         {         
2885 #if DEBUG_CORE
2886           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2887                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2888                       "SET_KEY", sender_status);
2889 #endif
2890           send_key (n);
2891         }
2892       break;
2893     default:
2894       GNUNET_break (0);
2895       break;
2896     }
2897   if (n->pending_ping != NULL)
2898     {
2899       ping = n->pending_ping;
2900       n->pending_ping = NULL;
2901       handle_ping (n, ping);
2902       GNUNET_free (ping);
2903     }
2904   if (n->pending_pong != NULL)
2905     {
2906       pong = n->pending_pong;
2907       n->pending_pong = NULL;
2908       handle_pong (n, pong);
2909       GNUNET_free (pong);
2910     }
2911 }
2912
2913
2914 /**
2915  * Send a P2P message to a client.
2916  *
2917  * @param sender who sent us the message?
2918  * @param client who should we give the message to?
2919  * @param m contains the message to transmit
2920  * @param msize number of bytes in buf to transmit
2921  */
2922 static void
2923 send_p2p_message_to_client (struct Neighbour *sender,
2924                             struct Client *client,
2925                             const void *m, size_t msize)
2926 {
2927   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2928   struct NotifyTrafficMessage *ntm;
2929
2930 #if DEBUG_CORE
2931   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2932               "Core service passes message from `%4s' of type %u to client.\n",
2933               GNUNET_i2s(&sender->peer),
2934               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2935 #endif
2936   ntm = (struct NotifyTrafficMessage *) buf;
2937   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2938   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2939   ntm->distance = htonl (sender->last_distance);
2940   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2941   ntm->peer = sender->peer;
2942   memcpy (&ntm[1], m, msize);
2943   send_to_client (client, &ntm->header, GNUNET_YES);
2944 }
2945
2946
2947 /**
2948  * Deliver P2P message to interested clients.
2949  *
2950  * @param sender who sent us the message?
2951  * @param m the message
2952  * @param msize size of the message (including header)
2953  */
2954 static void
2955 deliver_message (struct Neighbour *sender,
2956                  const struct GNUNET_MessageHeader *m, size_t msize)
2957 {
2958   struct Client *cpos;
2959   uint16_t type;
2960   unsigned int tpos;
2961   int deliver_full;
2962   int dropped;
2963
2964   type = ntohs (m->type);
2965 #if DEBUG_CORE
2966   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2967               "Received encapsulated message of type %u from `%4s'\n",
2968               type,
2969               GNUNET_i2s (&sender->peer));
2970 #endif
2971   dropped = GNUNET_YES;
2972   cpos = clients;
2973   while (cpos != NULL)
2974     {
2975       deliver_full = GNUNET_NO;
2976       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2977         deliver_full = GNUNET_YES;
2978       else
2979         {
2980           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2981             {
2982               if (type != cpos->types[tpos])
2983                 continue;
2984               deliver_full = GNUNET_YES;
2985               break;
2986             }
2987         }
2988       if (GNUNET_YES == deliver_full)
2989         {
2990           send_p2p_message_to_client (sender, cpos, m, msize);
2991           dropped = GNUNET_NO;
2992         }
2993       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2994         {
2995           send_p2p_message_to_client (sender, cpos, m,
2996                                       sizeof (struct GNUNET_MessageHeader));
2997         }
2998       cpos = cpos->next;
2999     }
3000   if (dropped == GNUNET_YES)
3001     {
3002 #if DEBUG_CORE
3003       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3004                   "Message of type %u from `%4s' not delivered to any client.\n",
3005                   type,
3006                   GNUNET_i2s (&sender->peer));
3007 #endif
3008       /* FIXME: stats... */
3009     }
3010 }
3011
3012
3013 /**
3014  * Align P2P message and then deliver to interested clients.
3015  *
3016  * @param sender who sent us the message?
3017  * @param buffer unaligned (!) buffer containing message
3018  * @param msize size of the message (including header)
3019  */
3020 static void
3021 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
3022 {
3023   char abuf[msize];
3024
3025   /* TODO: call to statistics? */
3026   memcpy (abuf, buffer, msize);
3027   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
3028 }
3029
3030
3031 /**
3032  * Deliver P2P messages to interested clients.
3033  *
3034  * @param sender who sent us the message?
3035  * @param buffer buffer containing messages, can be modified
3036  * @param buffer_size size of the buffer (overall)
3037  * @param offset offset where messages in the buffer start
3038  */
3039 static void
3040 deliver_messages (struct Neighbour *sender,
3041                   const char *buffer, size_t buffer_size, size_t offset)
3042 {
3043   struct GNUNET_MessageHeader *mhp;
3044   struct GNUNET_MessageHeader mh;
3045   uint16_t msize;
3046   int need_align;
3047
3048   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
3049     {
3050       if (0 != offset % sizeof (uint16_t))
3051         {
3052           /* outch, need to copy to access header */
3053           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
3054           mhp = &mh;
3055         }
3056       else
3057         {
3058           /* can access header directly */
3059           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
3060         }
3061       msize = ntohs (mhp->size);
3062       if (msize + offset > buffer_size)
3063         {
3064           /* malformed message, header says it is larger than what
3065              would fit into the overall buffer */
3066           GNUNET_break_op (0);
3067           break;
3068         }
3069 #if HAVE_UNALIGNED_64_ACCESS
3070       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
3071 #else
3072       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
3073 #endif
3074       if (GNUNET_YES == need_align)
3075         align_and_deliver (sender, &buffer[offset], msize);
3076       else
3077         deliver_message (sender,
3078                          (const struct GNUNET_MessageHeader *)
3079                          &buffer[offset], msize);
3080       offset += msize;
3081     }
3082 }
3083
3084
3085 /**
3086  * We received an encrypted message.  Decrypt, validate and
3087  * pass on to the appropriate clients.
3088  */
3089 static void
3090 handle_encrypted_message (struct Neighbour *n,
3091                           const struct EncryptedMessage *m)
3092 {
3093   size_t size = ntohs (m->header.size);
3094   char buf[size];
3095   struct EncryptedMessage *pt;  /* plaintext */
3096   GNUNET_HashCode ph;
3097   size_t off;
3098   uint32_t snum;
3099   struct GNUNET_TIME_Absolute t;
3100   GNUNET_HashCode iv;
3101
3102 #if DEBUG_CORE
3103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3104               "Core service receives `%s' request from `%4s'.\n",
3105               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
3106 #endif  
3107   GNUNET_CRYPTO_hash (&m->iv_seed, sizeof (uint32_t), &iv);
3108   /* decrypt */
3109   if (GNUNET_OK !=
3110       do_decrypt (n,
3111                   &iv,
3112                   &m->plaintext_hash,
3113                   &buf[ENCRYPTED_HEADER_SIZE], 
3114                   size - ENCRYPTED_HEADER_SIZE))
3115     return;
3116   pt = (struct EncryptedMessage *) buf;
3117
3118   /* validate hash */
3119   GNUNET_CRYPTO_hash (&pt->sequence_number,
3120                       size - ENCRYPTED_HEADER_SIZE - sizeof (GNUNET_HashCode), &ph);
3121   if (0 != memcmp (&ph, 
3122                    &pt->plaintext_hash, 
3123                    sizeof (GNUNET_HashCode)))
3124     {
3125       /* checksum failed */
3126       GNUNET_break_op (0);
3127       return;
3128     }
3129
3130   /* validate sequence number */
3131   snum = ntohl (pt->sequence_number);
3132   if (n->last_sequence_number_received == snum)
3133     {
3134       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3135                   "Received duplicate message, ignoring.\n");
3136       /* duplicate, ignore */
3137       return;
3138     }
3139   if ((n->last_sequence_number_received > snum) &&
3140       (n->last_sequence_number_received - snum > 32))
3141     {
3142       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3143                   "Received ancient out of sequence message, ignoring.\n");
3144       /* ancient out of sequence, ignore */
3145       return;
3146     }
3147   if (n->last_sequence_number_received > snum)
3148     {
3149       unsigned int rotbit =
3150         1 << (n->last_sequence_number_received - snum - 1);
3151       if ((n->last_packets_bitmap & rotbit) != 0)
3152         {
3153           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3154                       "Received duplicate message, ignoring.\n");
3155           /* duplicate, ignore */
3156           return;
3157         }
3158       n->last_packets_bitmap |= rotbit;
3159     }
3160   if (n->last_sequence_number_received < snum)
3161     {
3162       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
3163       n->last_sequence_number_received = snum;
3164     }
3165
3166   /* check timestamp */
3167   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
3168   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
3169     {
3170       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3171                   _
3172                   ("Message received far too old (%llu ms). Content ignored.\n"),
3173                   GNUNET_TIME_absolute_get_duration (t).value);
3174       return;
3175     }
3176
3177   /* process decrypted message(s) */
3178   if (n->bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
3179     {
3180 #if DEBUG_CORE_SET_QUOTA
3181       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3182                   "Received %u b/s as new inbound limit for peer `%4s'\n",
3183                   (unsigned int) ntohl (pt->inbound_bw_limit.value__),
3184                   GNUNET_i2s (&n->peer));
3185 #endif
3186       n->bw_out_external_limit = pt->inbound_bw_limit;
3187       n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
3188                                               n->bw_out_internal_limit);
3189       GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
3190                                              n->bw_out);
3191       GNUNET_TRANSPORT_set_quota (transport,
3192                                   &n->peer,
3193                                   n->bw_in,
3194                                   n->bw_out,
3195                                   GNUNET_TIME_UNIT_FOREVER_REL,
3196                                   NULL, NULL); 
3197     }
3198   n->last_activity = GNUNET_TIME_absolute_get ();
3199   if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
3200     GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
3201   n->keep_alive_task 
3202     = GNUNET_SCHEDULER_add_delayed (sched, 
3203                                     GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
3204                                     &send_keep_alive,
3205                                     n);
3206   off = sizeof (struct EncryptedMessage);
3207   deliver_messages (n, buf, size, off);
3208 }
3209
3210
3211 /**
3212  * Function called by the transport for each received message.
3213  *
3214  * @param cls closure
3215  * @param peer (claimed) identity of the other peer
3216  * @param message the message
3217  * @param latency estimated latency for communicating with the
3218  *             given peer (round-trip)
3219  * @param distance in overlay hops, as given by transport plugin
3220  */
3221 static void
3222 handle_transport_receive (void *cls,
3223                           const struct GNUNET_PeerIdentity *peer,
3224                           const struct GNUNET_MessageHeader *message,
3225                           struct GNUNET_TIME_Relative latency,
3226                           unsigned int distance)
3227 {
3228   struct Neighbour *n;
3229   struct GNUNET_TIME_Absolute now;
3230   int up;
3231   uint16_t type;
3232   uint16_t size;
3233
3234 #if DEBUG_CORE
3235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3236               "Received message of type %u from `%4s', demultiplexing.\n",
3237               ntohs (message->type), GNUNET_i2s (peer));
3238 #endif
3239   n = find_neighbour (peer);
3240   if (n == NULL)
3241     n = create_neighbour (peer);
3242   if (n == NULL)
3243     return;   
3244   n->last_latency = latency;
3245   n->last_distance = distance;
3246   up = (n->status == PEER_STATE_KEY_CONFIRMED);
3247   type = ntohs (message->type);
3248   size = ntohs (message->size);
3249 #if DEBUG_HANDSHAKE
3250   fprintf (stderr,
3251            "Received message of type %u from `%4s'\n",
3252            type,
3253            GNUNET_i2s (peer));
3254 #endif
3255   switch (type)
3256     {
3257     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
3258       if (size != sizeof (struct SetKeyMessage))
3259         {
3260           GNUNET_break_op (0);
3261           return;
3262         }
3263       handle_set_key (n, (const struct SetKeyMessage *) message);
3264       break;
3265     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
3266       if (size < sizeof (struct EncryptedMessage) +
3267           sizeof (struct GNUNET_MessageHeader))
3268         {
3269           GNUNET_break_op (0);
3270           return;
3271         }
3272       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3273           (n->status != PEER_STATE_KEY_CONFIRMED))
3274         {
3275           GNUNET_break_op (0);
3276           return;
3277         }
3278       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
3279       break;
3280     case GNUNET_MESSAGE_TYPE_CORE_PING:
3281       if (size != sizeof (struct PingMessage))
3282         {
3283           GNUNET_break_op (0);
3284           return;
3285         }
3286       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3287           (n->status != PEER_STATE_KEY_CONFIRMED))
3288         {
3289 #if DEBUG_CORE
3290           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3291                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3292                       "PING", GNUNET_i2s (&n->peer));
3293 #endif
3294           GNUNET_free_non_null (n->pending_ping);
3295           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
3296           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
3297           return;
3298         }
3299       handle_ping (n, (const struct PingMessage *) message);
3300       break;
3301     case GNUNET_MESSAGE_TYPE_CORE_PONG:
3302       if (size != sizeof (struct PongMessage))
3303         {
3304           GNUNET_break_op (0);
3305           return;
3306         }
3307       if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
3308            (n->status != PEER_STATE_KEY_CONFIRMED) )
3309         {
3310 #if DEBUG_CORE
3311           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3312                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3313                       "PONG", GNUNET_i2s (&n->peer));
3314 #endif
3315           GNUNET_free_non_null (n->pending_pong);
3316           n->pending_pong = GNUNET_malloc (sizeof (struct PongMessage));
3317           memcpy (n->pending_pong, message, sizeof (struct PongMessage));
3318           return;
3319         }
3320       handle_pong (n, (const struct PongMessage *) message);
3321       break;
3322     default:
3323       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3324                   _("Unsupported message of type %u received.\n"), type);
3325       return;
3326     }
3327   if (n->status == PEER_STATE_KEY_CONFIRMED)
3328     {
3329       now = GNUNET_TIME_absolute_get ();
3330       n->last_activity = now;
3331       if (!up)
3332         n->time_established = now;
3333       if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
3334         GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
3335       n->keep_alive_task 
3336         = GNUNET_SCHEDULER_add_delayed (sched, 
3337                                         GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
3338                                         &send_keep_alive,
3339                                         n);
3340     }
3341 }
3342
3343
3344 /**
3345  * Function that recalculates the bandwidth quota for the
3346  * given neighbour and transmits it to the transport service.
3347  * 
3348  * @param cls neighbour for the quota update
3349  * @param tc context
3350  */
3351 static void
3352 neighbour_quota_update (void *cls,
3353                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3354 {
3355   struct Neighbour *n = cls;
3356   struct GNUNET_BANDWIDTH_Value32NBO q_in;
3357   double pref_rel;
3358   double share;
3359   unsigned long long distributable;
3360   uint64_t need_per_peer;
3361   uint64_t need_per_second;
3362   
3363   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3364   /* calculate relative preference among all neighbours;
3365      divides by a bit more to avoid division by zero AND to
3366      account for possibility of new neighbours joining any time 
3367      AND to convert to double... */
3368   if (preference_sum == 0)
3369     {
3370       pref_rel = 1.0 / (double) neighbour_count;
3371     }
3372   else
3373     {
3374       pref_rel = n->current_preference / preference_sum;
3375     }
3376   need_per_peer = GNUNET_BANDWIDTH_value_get_available_until (MIN_BANDWIDTH_PER_PEER,
3377                                                               GNUNET_TIME_UNIT_SECONDS);  
3378   need_per_second = need_per_peer * neighbour_count;
3379   distributable = 0;
3380   if (bandwidth_target_out_bps > need_per_second)
3381     distributable = bandwidth_target_out_bps - need_per_second;
3382   share = distributable * pref_rel;
3383   if (share + need_per_peer > ( (uint32_t)-1))
3384     q_in = GNUNET_BANDWIDTH_value_init ((uint32_t) -1);
3385   else
3386     q_in = GNUNET_BANDWIDTH_value_init (need_per_peer + (uint32_t) share);
3387   /* check if we want to disconnect for good due to inactivity */
3388   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3389        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3390     {
3391 #if DEBUG_CORE
3392       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3393                   "Forcing disconnect of `%4s' due to inactivity (?).\n",
3394                   GNUNET_i2s (&n->peer));
3395 #endif
3396       q_in = GNUNET_BANDWIDTH_value_init (0); /* force disconnect */
3397     }
3398 #if DEBUG_CORE_QUOTA
3399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3400               "Current quota for `%4s' is %u/%llu b/s in (old: %u b/s) / %u out (%u internal)\n",
3401               GNUNET_i2s (&n->peer),
3402               (unsigned int) ntohl (q_in.value__),
3403               bandwidth_target_out_bps,
3404               (unsigned int) ntohl (n->bw_in.value__),
3405               (unsigned int) ntohl (n->bw_out.value__),
3406               (unsigned int) ntohl (n->bw_out_internal_limit.value__));
3407 #endif
3408   if (n->bw_in.value__ != q_in.value__) 
3409     {
3410       n->bw_in = q_in;
3411       GNUNET_TRANSPORT_set_quota (transport,
3412                                   &n->peer,
3413                                   n->bw_in,
3414                                   n->bw_out,
3415                                   GNUNET_TIME_UNIT_FOREVER_REL,
3416                                   NULL, NULL);
3417     }
3418   schedule_quota_update (n);
3419 }
3420
3421
3422 /**
3423  * Function called by transport to notify us that
3424  * a peer connected to us (on the network level).
3425  *
3426  * @param cls closure
3427  * @param peer the peer that connected
3428  * @param latency current latency of the connection
3429  * @param distance in overlay hops, as given by transport plugin
3430  */
3431 static void
3432 handle_transport_notify_connect (void *cls,
3433                                  const struct GNUNET_PeerIdentity *peer,
3434                                  struct GNUNET_TIME_Relative latency,
3435                                  unsigned int distance)
3436 {
3437   struct Neighbour *n;
3438   struct ConnectNotifyMessage cnm;
3439
3440   n = find_neighbour (peer);
3441   if (n != NULL)
3442     {
3443       if (n->is_connected)
3444         {
3445           /* duplicate connect notification!? */
3446           GNUNET_break (0);
3447           return;
3448         }
3449     }
3450   else
3451     {
3452       n = create_neighbour (peer);
3453     }
3454   n->is_connected = GNUNET_YES;      
3455   n->last_latency = latency;
3456   n->last_distance = distance;
3457   GNUNET_BANDWIDTH_tracker_init (&n->available_send_window,
3458                                  n->bw_out,
3459                                  MAX_WINDOW_TIME_S);
3460   GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window,
3461                                  n->bw_in,
3462                                  MAX_WINDOW_TIME_S);  
3463 #if DEBUG_CORE
3464   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3465               "Received connection from `%4s'.\n",
3466               GNUNET_i2s (&n->peer));
3467 #endif
3468   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3469   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3470   cnm.distance = htonl (n->last_distance);
3471   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
3472   cnm.peer = *peer;
3473   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3474   GNUNET_TRANSPORT_set_quota (transport,
3475                               &n->peer,
3476                               n->bw_in,
3477                               n->bw_out,
3478                               GNUNET_TIME_UNIT_FOREVER_REL,
3479                               NULL, NULL);
3480   send_key (n); 
3481 }
3482
3483
3484 /**
3485  * Function called by transport telling us that a peer
3486  * disconnected.
3487  *
3488  * @param cls closure
3489  * @param peer the peer that disconnected
3490  */
3491 static void
3492 handle_transport_notify_disconnect (void *cls,
3493                                     const struct GNUNET_PeerIdentity *peer)
3494 {
3495   struct DisconnectNotifyMessage cnm;
3496   struct Neighbour *n;
3497
3498 #if DEBUG_CORE
3499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3500               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3501 #endif
3502   n = find_neighbour (peer);
3503   if (n == NULL)
3504     {
3505       GNUNET_break (0);
3506       return;
3507     }
3508   GNUNET_break (n->is_connected);
3509   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3510   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3511   cnm.peer = *peer;
3512   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3513   n->is_connected = GNUNET_NO;
3514 }
3515
3516
3517 /**
3518  * Last task run during shutdown.  Disconnects us from
3519  * the transport.
3520  */
3521 static void
3522 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3523 {
3524   struct Neighbour *n;
3525   struct Client *c;
3526
3527 #if DEBUG_CORE
3528   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3529               "Core service shutting down.\n");
3530 #endif
3531   GNUNET_assert (transport != NULL);
3532   GNUNET_TRANSPORT_disconnect (transport);
3533   transport = NULL;
3534   while (NULL != (n = neighbours))
3535     {
3536       neighbours = n->next;
3537       GNUNET_assert (neighbour_count > 0);
3538       neighbour_count--;
3539       free_neighbour (n);
3540     }
3541   GNUNET_SERVER_notification_context_destroy (notifier);
3542   notifier = NULL;
3543   while (NULL != (c = clients))
3544     handle_client_disconnect (NULL, c->client_handle);
3545   if (my_private_key != NULL)
3546     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3547 }
3548
3549
3550 /**
3551  * Initiate core service.
3552  *
3553  * @param cls closure
3554  * @param s scheduler to use
3555  * @param serv the initialized server
3556  * @param c configuration to use
3557  */
3558 static void
3559 run (void *cls,
3560      struct GNUNET_SCHEDULER_Handle *s,
3561      struct GNUNET_SERVER_Handle *serv,
3562      const struct GNUNET_CONFIGURATION_Handle *c)
3563 {
3564   char *keyfile;
3565
3566   sched = s;
3567   cfg = c;  
3568   /* parse configuration */
3569   if (
3570        (GNUNET_OK !=
3571         GNUNET_CONFIGURATION_get_value_number (c,
3572                                                "CORE",
3573                                                "TOTAL_QUOTA_IN",
3574                                                &bandwidth_target_in_bps)) ||
3575        (GNUNET_OK !=
3576         GNUNET_CONFIGURATION_get_value_number (c,
3577                                                "CORE",
3578                                                "TOTAL_QUOTA_OUT",
3579                                                &bandwidth_target_out_bps)) ||
3580        (GNUNET_OK !=
3581         GNUNET_CONFIGURATION_get_value_filename (c,
3582                                                  "GNUNETD",
3583                                                  "HOSTKEY", &keyfile)))
3584     {
3585       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3586                   _
3587                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3588       GNUNET_SCHEDULER_shutdown (s);
3589       return;
3590     }
3591   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3592   GNUNET_free (keyfile);
3593   if (my_private_key == NULL)
3594     {
3595       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3596                   _("Core service could not access hostkey.  Exiting.\n"));
3597       GNUNET_SCHEDULER_shutdown (s);
3598       return;
3599     }
3600   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3601   GNUNET_CRYPTO_hash (&my_public_key,
3602                       sizeof (my_public_key), &my_identity.hashPubKey);
3603   /* setup notification */
3604   server = serv;
3605   notifier = GNUNET_SERVER_notification_context_create (server, 
3606                                                         MAX_NOTIFY_QUEUE);
3607   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3608   /* setup transport connection */
3609   transport = GNUNET_TRANSPORT_connect (sched,
3610                                         cfg,
3611                                         NULL,
3612                                         &handle_transport_receive,
3613                                         &handle_transport_notify_connect,
3614                                         &handle_transport_notify_disconnect);
3615   GNUNET_assert (NULL != transport);
3616   GNUNET_SCHEDULER_add_delayed (sched,
3617                                 GNUNET_TIME_UNIT_FOREVER_REL,
3618                                 &cleaning_task, NULL);
3619   /* process client requests */
3620   GNUNET_SERVER_add_handlers (server, handlers);
3621   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3622               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3623 }
3624
3625
3626
3627 /**
3628  * The main function for the transport service.
3629  *
3630  * @param argc number of arguments from the command line
3631  * @param argv command line arguments
3632  * @return 0 ok, 1 on error
3633  */
3634 int
3635 main (int argc, char *const *argv)
3636 {
3637   return (GNUNET_OK ==
3638           GNUNET_SERVICE_run (argc,
3639                               argv,
3640                               "core",
3641                               GNUNET_SERVICE_OPTION_NONE,
3642                               &run, NULL)) ? 0 : 1;
3643 }
3644
3645 /* end of gnunet-service-core.c */