fixes
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 #define DEBUG_HANDSHAKE GNUNET_NO
45
46 #define DEBUG_CORE_QUOTA GNUNET_NO
47
48 /**
49  * Receive and send buffer windows grow over time.  For
50  * how long can 'unused' bandwidth accumulate before we
51  * need to cap it?  (specified in seconds).
52  */
53 #define MAX_WINDOW_TIME_S (5 * 60)
54
55 /**
56  * How many messages do we queue up at most for optional
57  * notifications to a client?  (this can cause notifications
58  * about outgoing messages to be dropped).
59  */
60 #define MAX_NOTIFY_QUEUE 16
61
62 /**
63  * Minimum bandwidth (out) to assign to any connected peer.
64  * Should be rather low; values larger than DEFAULT_BW_IN_OUT make no
65  * sense.
66  */
67 #define MIN_BANDWIDTH_PER_PEER GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * How long do we delay messages to get larger packet sizes (CORKing)?
80  */
81 #define MAX_CORK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
82
83 /**
84  * What is the maximum delay for a SET_KEY message?
85  */
86 #define MAX_SET_KEY_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
87
88 /**
89  * What how long do we wait for SET_KEY confirmation initially?
90  */
91 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 3)
92
93 /**
94  * What is the maximum delay for a PING message?
95  */
96 #define MAX_PING_DELAY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 2)
97
98 /**
99  * What is the maximum delay for a PONG message?
100  */
101 #define MAX_PONG_DELAY GNUNET_TIME_relative_multiply (MAX_PING_DELAY, 2)
102
103 /**
104  * What is the minimum frequency for a PING message?
105  */
106 #define MIN_PING_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
107
108 /**
109  * How often do we recalculate bandwidth quotas?
110  */
111 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
112
113 /**
114  * What is the priority for a SET_KEY message?
115  */
116 #define SET_KEY_PRIORITY 0xFFFFFF
117
118 /**
119  * What is the priority for a PING message?
120  */
121 #define PING_PRIORITY 0xFFFFFF
122
123 /**
124  * What is the priority for a PONG message?
125  */
126 #define PONG_PRIORITY 0xFFFFFF
127
128 /**
129  * How many messages do we queue per peer at most?  Must be at
130  * least two.
131  */
132 #define MAX_PEER_QUEUE_SIZE 16
133
134 /**
135  * How many non-mandatory messages do we queue per client at most?
136  */
137 #define MAX_CLIENT_QUEUE_SIZE 32
138
139 /**
140  * What is the maximum age of a message for us to consider
141  * processing it?  Note that this looks at the timestamp used
142  * by the other peer, so clock skew between machines does
143  * come into play here.  So this should be picked high enough
144  * so that a little bit of clock skew does not prevent peers
145  * from connecting to us.
146  */
147 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
148
149 /**
150  * What is the maximum size for encrypted messages?  Note that this
151  * number imposes a clear limit on the maximum size of any message.
152  * Set to a value close to 64k but not so close that transports will
153  * have trouble with their headers.
154  */
155 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
156
157
158 /**
159  * State machine for our P2P encryption handshake.  Everyone starts in
160  * "DOWN", if we receive the other peer's key (other peer initiated)
161  * we start in state RECEIVED (since we will immediately send our
162  * own); otherwise we start in SENT.  If we get back a PONG from
163  * within either state, we move up to CONFIRMED (the PONG will always
164  * be sent back encrypted with the key we sent to the other peer).
165  */
166 enum PeerStateMachine
167 {
168   PEER_STATE_DOWN,
169   PEER_STATE_KEY_SENT,
170   PEER_STATE_KEY_RECEIVED,
171   PEER_STATE_KEY_CONFIRMED
172 };
173
174
175 /**
176  * Number of bytes (at the beginning) of "struct EncryptedMessage"
177  * that are NOT encrypted.
178  */
179 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t))
180
181
182 /**
183  * Encapsulation for encrypted messages exchanged between
184  * peers.  Followed by the actual encrypted data.
185  */
186 struct EncryptedMessage
187 {
188   /**
189    * Message type is either CORE_ENCRYPTED_MESSAGE.
190    */
191   struct GNUNET_MessageHeader header;
192
193   /**
194    * Random value used for IV generation.  ENCRYPTED_HEADER_SIZE must
195    * be set to the offset of the *next* field.
196    */
197   uint32_t iv_seed GNUNET_PACKED;
198
199   /**
200    * Hash of the plaintext (starting at 'sequence_number'), used to
201    * verify message integrity.  Everything after this hash (including
202    * this hash itself) will be encrypted.  
203    */
204   GNUNET_HashCode plaintext_hash;
205
206   /**
207    * Sequence number, in network byte order.  This field
208    * must be the first encrypted/decrypted field and the
209    * first byte that is hashed for the plaintext hash.
210    */
211   uint32_t sequence_number GNUNET_PACKED;
212
213   /**
214    * Desired bandwidth (how much we should send to this peer / how
215    * much is the sender willing to receive)?
216    */
217   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
218
219   /**
220    * Timestamp.  Used to prevent reply of ancient messages
221    * (recent messages are caught with the sequence number).
222    */
223   struct GNUNET_TIME_AbsoluteNBO timestamp;
224
225 };
226
227
228 /**
229  * We're sending an (encrypted) PING to the other peer to check if he
230  * can decrypt.  The other peer should respond with a PONG with the
231  * same content, except this time encrypted with the receiver's key.
232  */
233 struct PingMessage
234 {
235   /**
236    * Message type is CORE_PING.
237    */
238   struct GNUNET_MessageHeader header;
239
240   /**
241    * Random number chosen to make reply harder.
242    */
243   uint32_t challenge GNUNET_PACKED;
244
245   /**
246    * Intended target of the PING, used primarily to check
247    * that decryption actually worked.
248    */
249   struct GNUNET_PeerIdentity target;
250 };
251
252
253
254 /**
255  * Response to a PING.  Includes data from the original PING
256  * plus initial bandwidth quota information.
257  */
258 struct PongMessage
259 {
260   /**
261    * Message type is CORE_PONG.
262    */
263   struct GNUNET_MessageHeader header;
264
265   /**
266    * Random number proochosen to make reply harder.  Must be
267    * first field after header (this is where we start to encrypt!).
268    */
269   uint32_t challenge GNUNET_PACKED;
270
271   /**
272    * Must be zero.
273    */
274   uint32_t reserved GNUNET_PACKED;
275
276   /**
277    * Desired bandwidth (how much we should send to this
278    * peer / how much is the sender willing to receive).
279    */
280   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
281
282   /**
283    * Intended target of the PING, used primarily to check
284    * that decryption actually worked.
285    */
286   struct GNUNET_PeerIdentity target;
287 };
288
289
290 /**
291  * Message transmitted to set (or update) a session key.
292  */
293 struct SetKeyMessage
294 {
295
296   /**
297    * Message type is either CORE_SET_KEY.
298    */
299   struct GNUNET_MessageHeader header;
300
301   /**
302    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
303    */
304   int32_t sender_status GNUNET_PACKED;
305
306   /**
307    * Purpose of the signature, will be
308    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
309    */
310   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
311
312   /**
313    * At what time was this key created?
314    */
315   struct GNUNET_TIME_AbsoluteNBO creation_time;
316
317   /**
318    * The encrypted session key.
319    */
320   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
321
322   /**
323    * Who is the intended recipient?
324    */
325   struct GNUNET_PeerIdentity target;
326
327   /**
328    * Signature of the stuff above (starting at purpose).
329    */
330   struct GNUNET_CRYPTO_RsaSignature signature;
331
332 };
333
334
335 /**
336  * Message waiting for transmission. This struct
337  * is followed by the actual content of the message.
338  */
339 struct MessageEntry
340 {
341
342   /**
343    * We keep messages in a doubly linked list.
344    */
345   struct MessageEntry *next;
346
347   /**
348    * We keep messages in a doubly linked list.
349    */
350   struct MessageEntry *prev;
351
352   /**
353    * By when are we supposed to transmit this message?
354    */
355   struct GNUNET_TIME_Absolute deadline;
356
357   /**
358    * By when are we supposed to transmit this message (after
359    * giving slack)?
360    */
361   struct GNUNET_TIME_Absolute slack_deadline;
362
363   /**
364    * How important is this message to us?
365    */
366   unsigned int priority;
367
368   /**
369    * How long is the message? (number of bytes following
370    * the "struct MessageEntry", but not including the
371    * size of "struct MessageEntry" itself!)
372    */
373   uint16_t size;
374
375   /**
376    * Was this message selected for transmission in the
377    * current round? GNUNET_YES or GNUNET_NO.
378    */
379   int8_t do_transmit;
380
381   /**
382    * Did we give this message some slack (delayed sending) previously
383    * (and hence should not give it any more slack)? GNUNET_YES or
384    * GNUNET_NO.
385    */
386   int8_t got_slack;
387
388 };
389
390
391 struct Neighbour
392 {
393   /**
394    * We keep neighbours in a linked list (for now).
395    */
396   struct Neighbour *next;
397
398   /**
399    * Unencrypted messages destined for this peer.
400    */
401   struct MessageEntry *messages;
402
403   /**
404    * Head of the batched, encrypted message queue (already ordered,
405    * transmit starting with the head).
406    */
407   struct MessageEntry *encrypted_head;
408
409   /**
410    * Tail of the batched, encrypted message queue (already ordered,
411    * append new messages to tail)
412    */
413   struct MessageEntry *encrypted_tail;
414
415   /**
416    * Handle for pending requests for transmission to this peer
417    * with the transport service.  NULL if no request is pending.
418    */
419   struct GNUNET_TRANSPORT_TransmitHandle *th;
420
421   /**
422    * Public key of the neighbour, NULL if we don't have it yet.
423    */
424   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
425
426   /**
427    * We received a PING message before we got the "public_key"
428    * (or the SET_KEY).  We keep it here until we have a key
429    * to decrypt it.  NULL if no PING is pending.
430    */
431   struct PingMessage *pending_ping;
432
433   /**
434    * We received a PONG message before we got the "public_key"
435    * (or the SET_KEY).  We keep it here until we have a key
436    * to decrypt it.  NULL if no PONG is pending.
437    */
438   struct PongMessage *pending_pong;
439
440   /**
441    * Non-NULL if we are currently looking up HELLOs for this peer.
442    * for this peer.
443    */
444   struct GNUNET_PEERINFO_IteratorContext *pitr;
445
446   /**
447    * SetKeyMessage to transmit, NULL if we are not currently trying
448    * to send one.
449    */
450   struct SetKeyMessage *skm;
451
452   /**
453    * Identity of the neighbour.
454    */
455   struct GNUNET_PeerIdentity peer;
456
457   /**
458    * Key we use to encrypt our messages for the other peer
459    * (initialized by us when we do the handshake).
460    */
461   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
462
463   /**
464    * Key we use to decrypt messages from the other peer
465    * (given to us by the other peer during the handshake).
466    */
467   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
468
469   /**
470    * ID of task used for re-trying plaintext scheduling.
471    */
472   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
473
474   /**
475    * ID of task used for re-trying SET_KEY and PING message.
476    */
477   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
478
479   /**
480    * ID of task used for updating bandwidth quota for this neighbour.
481    */
482   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
483
484   /**
485    * ID of task used for sending keep-alive pings.
486    */
487   GNUNET_SCHEDULER_TaskIdentifier keep_alive_task;
488
489   /**
490    * ID of task used for cleaning up dead neighbour entries.
491    */
492   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
493
494   /**
495    * At what time did we generate our encryption key?
496    */
497   struct GNUNET_TIME_Absolute encrypt_key_created;
498
499   /**
500    * At what time did the other peer generate the decryption key?
501    */
502   struct GNUNET_TIME_Absolute decrypt_key_created;
503
504   /**
505    * At what time did we initially establish (as in, complete session
506    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
507    */
508   struct GNUNET_TIME_Absolute time_established;
509
510   /**
511    * At what time did we last receive an encrypted message from the
512    * other peer?  Should be zero if status != KEY_CONFIRMED.
513    */
514   struct GNUNET_TIME_Absolute last_activity;
515
516   /**
517    * Last latency observed from this peer.
518    */
519   struct GNUNET_TIME_Relative last_latency;
520
521   /**
522    * At what frequency are we currently re-trying SET_KEY messages?
523    */
524   struct GNUNET_TIME_Relative set_key_retry_frequency;
525
526   /**
527    * Tracking bandwidth for sending to this peer.
528    */
529   struct GNUNET_BANDWIDTH_Tracker available_send_window;
530
531   /**
532    * Tracking bandwidth for receiving from this peer.
533    */
534   struct GNUNET_BANDWIDTH_Tracker available_recv_window;
535
536   /**
537    * How valueable were the messages of this peer recently?
538    */
539   unsigned long long current_preference;
540
541   /**
542    * Bit map indicating which of the 32 sequence numbers before the last
543    * were received (good for accepting out-of-order packets and
544    * estimating reliability of the connection)
545    */
546   unsigned int last_packets_bitmap;
547
548   /**
549    * last sequence number received on this connection (highest)
550    */
551   uint32_t last_sequence_number_received;
552
553   /**
554    * last sequence number transmitted
555    */
556   uint32_t last_sequence_number_sent;
557
558   /**
559    * Available bandwidth in for this peer (current target).
560    */
561   struct GNUNET_BANDWIDTH_Value32NBO bw_in;    
562
563   /**
564    * Available bandwidth out for this peer (current target).
565    */
566   struct GNUNET_BANDWIDTH_Value32NBO bw_out;  
567
568   /**
569    * Internal bandwidth limit set for this peer (initially typically
570    * set to "-1").  Actual "bw_out" is MIN of
571    * "bpm_out_internal_limit" and "bw_out_external_limit".
572    */
573   struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
574
575   /**
576    * External bandwidth limit set for this peer by the
577    * peer that we are communicating with.  "bw_out" is MIN of
578    * "bw_out_internal_limit" and "bw_out_external_limit".
579    */
580   struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
581
582   /**
583    * What was our PING challenge number (for this peer)?
584    */
585   uint32_t ping_challenge;
586
587   /**
588    * What was the last distance to this peer as reported by the transports?
589    */
590   uint32_t last_distance;
591
592   /**
593    * What is our connection status?
594    */
595   enum PeerStateMachine status;
596
597   /**
598    * Are we currently connected to this neighbour?
599    */ 
600   int is_connected;
601
602 };
603
604
605 /**
606  * Data structure for each client connected to the core service.
607  */
608 struct Client
609 {
610   /**
611    * Clients are kept in a linked list.
612    */
613   struct Client *next;
614
615   /**
616    * Handle for the client with the server API.
617    */
618   struct GNUNET_SERVER_Client *client_handle;
619
620   /**
621    * Array of the types of messages this peer cares
622    * about (with "tcnt" entries).  Allocated as part
623    * of this client struct, do not free!
624    */
625   const uint16_t *types;
626
627   /**
628    * Options for messages this client cares about,
629    * see GNUNET_CORE_OPTION_ values.
630    */
631   uint32_t options;
632
633   /**
634    * Number of types of incoming messages this client
635    * specifically cares about.  Size of the "types" array.
636    */
637   unsigned int tcnt;
638
639 };
640
641
642 /**
643  * Our public key.
644  */
645 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
646
647 /**
648  * Our identity.
649  */
650 static struct GNUNET_PeerIdentity my_identity;
651
652 /**
653  * Our private key.
654  */
655 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
656
657 /**
658  * Our scheduler.
659  */
660 struct GNUNET_SCHEDULER_Handle *sched;
661
662 /**
663  * Our configuration.
664  */
665 const struct GNUNET_CONFIGURATION_Handle *cfg;
666
667 /**
668  * Our server.
669  */
670 static struct GNUNET_SERVER_Handle *server;
671
672 /**
673  * Transport service.
674  */
675 static struct GNUNET_TRANSPORT_Handle *transport;
676
677 /**
678  * Linked list of our clients.
679  */
680 static struct Client *clients;
681
682 /**
683  * Context for notifications we need to send to our clients.
684  */
685 static struct GNUNET_SERVER_NotificationContext *notifier;
686
687 /**
688  * We keep neighbours in a linked list (for now).
689  */
690 static struct Neighbour *neighbours;
691
692 /**
693  * Sum of all preferences among all neighbours.
694  */
695 static unsigned long long preference_sum;
696
697 /**
698  * Total number of neighbours we have.
699  */
700 static unsigned int neighbour_count;
701
702 /**
703  * How much inbound bandwidth are we supposed to be using per second?
704  * FIXME: this value is not used!
705  */
706 static unsigned long long bandwidth_target_in_bps;
707
708 /**
709  * How much outbound bandwidth are we supposed to be using per second?
710  */
711 static unsigned long long bandwidth_target_out_bps;
712
713
714
715 /**
716  * A preference value for a neighbour was update.  Update
717  * the preference sum accordingly.
718  *
719  * @param inc how much was a preference value increased?
720  */
721 static void
722 update_preference_sum (unsigned long long inc)
723 {
724   struct Neighbour *n;
725   unsigned long long os;
726
727   os = preference_sum;
728   preference_sum += inc;
729   if (preference_sum >= os)
730     return; /* done! */
731   /* overflow! compensate by cutting all values in half! */
732   preference_sum = 0;
733   n = neighbours;
734   while (n != NULL)
735     {
736       n->current_preference /= 2;
737       preference_sum += n->current_preference;
738       n = n->next;
739     }    
740 }
741
742
743 /**
744  * Find the entry for the given neighbour.
745  *
746  * @param peer identity of the neighbour
747  * @return NULL if we are not connected, otherwise the
748  *         neighbour's entry.
749  */
750 static struct Neighbour *
751 find_neighbour (const struct GNUNET_PeerIdentity *peer)
752 {
753   struct Neighbour *ret;
754
755   ret = neighbours;
756   while ((ret != NULL) &&
757          (0 != memcmp (&ret->peer,
758                        peer, sizeof (struct GNUNET_PeerIdentity))))
759     ret = ret->next;
760   return ret;
761 }
762
763
764 /**
765  * Send a message to one of our clients.
766  *
767  * @param client target for the message
768  * @param msg message to transmit
769  * @param can_drop could this message be dropped if the
770  *        client's queue is getting too large?
771  */
772 static void
773 send_to_client (struct Client *client,
774                 const struct GNUNET_MessageHeader *msg, 
775                 int can_drop)
776 {
777 #if DEBUG_CORE_CLIENT
778   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
779               "Preparing to send message of type %u to client.\n",
780               ntohs (msg->type));
781 #endif  
782   GNUNET_SERVER_notification_context_unicast (notifier,
783                                               client->client_handle,
784                                               msg,
785                                               can_drop);
786 }
787
788
789 /**
790  * Send a message to all of our current clients that have
791  * the right options set.
792  * 
793  * @param msg message to multicast
794  * @param can_drop can this message be discarded if the queue is too long
795  * @param options mask to use 
796  */
797 static void
798 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
799                      int can_drop,
800                      int options)
801 {
802   struct Client *c;
803
804   c = clients;
805   while (c != NULL)
806     {
807       if (0 != (c->options & options))
808         {
809 #if DEBUG_CORE_CLIENT
810           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811                       "Sending message of type %u to client.\n",
812                       ntohs (msg->type));
813 #endif
814           send_to_client (c, msg, can_drop);
815         }
816       c = c->next;
817     }
818 }
819
820
821 /**
822  * Handle CORE_INIT request.
823  */
824 static void
825 handle_client_init (void *cls,
826                     struct GNUNET_SERVER_Client *client,
827                     const struct GNUNET_MessageHeader *message)
828 {
829   const struct InitMessage *im;
830   struct InitReplyMessage irm;
831   struct Client *c;
832   uint16_t msize;
833   const uint16_t *types;
834   uint16_t *wtypes;
835   struct Neighbour *n;
836   struct ConnectNotifyMessage cnm;
837   unsigned int i;
838
839 #if DEBUG_CORE_CLIENT
840   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
841               "Client connecting to core service with `%s' message\n",
842               "INIT");
843 #endif
844   /* check that we don't have an entry already */
845   c = clients;
846   while (c != NULL)
847     {
848       if (client == c->client_handle)
849         {
850           GNUNET_break (0);
851           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
852           return;
853         }
854       c = c->next;
855     }
856   msize = ntohs (message->size);
857   if (msize < sizeof (struct InitMessage))
858     {
859       GNUNET_break (0);
860       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
861       return;
862     }
863   GNUNET_SERVER_notification_context_add (notifier, client);
864   im = (const struct InitMessage *) message;
865   types = (const uint16_t *) &im[1];
866   msize -= sizeof (struct InitMessage);
867   c = GNUNET_malloc (sizeof (struct Client) + msize);
868   c->client_handle = client;
869   c->next = clients;
870   clients = c;
871   c->tcnt = msize / sizeof (uint16_t);
872   c->types = (const uint16_t *) &c[1];
873   wtypes = (uint16_t *) &c[1];
874   for (i=0;i<c->tcnt;i++)
875     wtypes[i] = ntohs (types[i]);
876   c->options = ntohl (im->options);
877 #if DEBUG_CORE_CLIENT
878   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
879               "Client %p is interested in %u message types\n",
880               c,
881               c->tcnt);
882 #endif
883   /* send init reply message */
884   irm.header.size = htons (sizeof (struct InitReplyMessage));
885   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
886   irm.reserved = htonl (0);
887   memcpy (&irm.publicKey,
888           &my_public_key,
889           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
890 #if DEBUG_CORE_CLIENT
891   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
892               "Sending `%s' message to client.\n", "INIT_REPLY");
893 #endif
894   send_to_client (c, &irm.header, GNUNET_NO);
895   if (0 != (c->options & GNUNET_CORE_OPTION_SEND_CONNECT))
896     {
897       /* notify new client about existing neighbours */
898       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
899       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
900       n = neighbours;
901       while (n != NULL)
902         {
903           if (n->status == PEER_STATE_KEY_CONFIRMED)
904             {
905 #if DEBUG_CORE_CLIENT
906               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907                           "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
908 #endif
909               cnm.distance = htonl (n->last_distance);
910               cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
911               cnm.peer = n->peer;
912               send_to_client (c, &cnm.header, GNUNET_NO);
913             }
914           n = n->next;
915         }
916     }
917   GNUNET_SERVER_receive_done (client, GNUNET_OK);
918 }
919
920
921 /**
922  * A client disconnected, clean up.
923  *
924  * @param cls closure
925  * @param client identification of the client
926  */
927 static void
928 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
929 {
930   struct Client *pos;
931   struct Client *prev;
932
933   if (client == NULL)
934     return;
935 #if DEBUG_CORE_CLIENT
936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
937               "Client %p has disconnected from core service.\n",
938               client);
939 #endif
940   prev = NULL;
941   pos = clients;
942   while (pos != NULL)
943     {
944       if (client == pos->client_handle)
945         {
946           if (prev == NULL)
947             clients = pos->next;
948           else
949             prev->next = pos->next;
950           GNUNET_free (pos);
951           return;
952         }
953       prev = pos;
954       pos = pos->next;
955     }
956   /* client never sent INIT */
957 }
958
959
960 /**
961  * Handle REQUEST_INFO request.
962  */
963 static void
964 handle_client_request_info (void *cls,
965                             struct GNUNET_SERVER_Client *client,
966                             const struct GNUNET_MessageHeader *message)
967 {
968   const struct RequestInfoMessage *rcm;
969   struct Neighbour *n;
970   struct ConfigurationInfoMessage cim;
971   int32_t want_reserv;
972   int32_t got_reserv;
973   unsigned long long old_preference;
974   struct GNUNET_SERVER_TransmitContext *tc;
975
976 #if DEBUG_CORE_CLIENT
977   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
978               "Core service receives `%s' request.\n", "REQUEST_INFO");
979 #endif
980   rcm = (const struct RequestInfoMessage *) message;
981   n = find_neighbour (&rcm->peer);
982   memset (&cim, 0, sizeof (cim));
983   if (n != NULL) 
984     {
985       want_reserv = ntohl (rcm->reserve_inbound);
986       if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
987         {
988           n->bw_out_internal_limit = rcm->limit_outbound;
989           n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
990                                                   n->bw_out_external_limit);
991           GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
992                                                  n->bw_out);
993           GNUNET_TRANSPORT_set_quota (transport,
994                                       &n->peer,
995                                       n->bw_in,
996                                       n->bw_out,
997                                       GNUNET_TIME_UNIT_FOREVER_REL,
998                                       NULL, NULL); 
999         }
1000       if (want_reserv < 0)
1001         {
1002           got_reserv = want_reserv;
1003         }
1004       else if (want_reserv > 0)
1005         {
1006           if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
1007                                                   want_reserv).value == 0)
1008             got_reserv = want_reserv;
1009           else
1010             got_reserv = 0; /* all or nothing */
1011         }
1012       else
1013         got_reserv = 0;
1014       GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window,
1015                                         got_reserv);
1016       old_preference = n->current_preference;
1017       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1018       if (old_preference > n->current_preference) 
1019         {
1020           /* overflow; cap at maximum value */
1021           n->current_preference = (unsigned long long) -1;
1022         }
1023       update_preference_sum (n->current_preference - old_preference);
1024 #if DEBUG_CORE_QUOTA
1025       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026                   "Received reservation request for %d bytes for peer `%4s', reserved %d bytes\n",
1027                   (int) want_reserv,
1028                   GNUNET_i2s (&rcm->peer),
1029                   (int) got_reserv);
1030 #endif
1031       cim.reserved_amount = htonl (got_reserv);
1032       cim.bw_in = n->bw_in;
1033       cim.bw_out = n->bw_out;
1034       cim.preference = n->current_preference;
1035     }
1036   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1037   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1038   cim.peer = rcm->peer;
1039
1040 #if DEBUG_CORE_CLIENT
1041   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1042               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1043 #endif
1044   tc = GNUNET_SERVER_transmit_context_create (client);
1045   GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
1046   GNUNET_SERVER_transmit_context_run (tc,
1047                                       GNUNET_TIME_UNIT_FOREVER_REL);
1048 }
1049
1050
1051 /**
1052  * Free the given entry for the neighbour (it has
1053  * already been removed from the list at this point).
1054  *
1055  * @param n neighbour to free
1056  */
1057 static void
1058 free_neighbour (struct Neighbour *n)
1059 {
1060   struct MessageEntry *m;
1061
1062   if (n->pitr != NULL)
1063     {
1064       GNUNET_PEERINFO_iterate_cancel (n->pitr);
1065       n->pitr = NULL;
1066     }
1067   if (n->skm != NULL)
1068     {
1069       GNUNET_free (n->skm);
1070       n->skm = NULL;
1071     }
1072   while (NULL != (m = n->messages))
1073     {
1074       n->messages = m->next;
1075       GNUNET_free (m);
1076     }
1077   while (NULL != (m = n->encrypted_head))
1078     {
1079       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1080                                    n->encrypted_tail,
1081                                    m);
1082       GNUNET_free (m);
1083     }
1084   if (NULL != n->th)
1085     {
1086       GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
1087       n->th = NULL;
1088     }
1089   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1090     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1091   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
1092     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
1093   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
1094     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
1095   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1096     GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1097   if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
1098     GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
1099   GNUNET_free_non_null (n->public_key);
1100   GNUNET_free_non_null (n->pending_ping);
1101   GNUNET_free_non_null (n->pending_pong);
1102   GNUNET_free (n);
1103 }
1104
1105
1106 /**
1107  * Check if we have encrypted messages for the specified neighbour
1108  * pending, and if so, check with the transport about sending them
1109  * out.
1110  *
1111  * @param n neighbour to check.
1112  */
1113 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1114
1115
1116 /**
1117  * Encrypt size bytes from in and write the result to out.  Use the
1118  * key for outbound traffic of the given neighbour.
1119  *
1120  * @param n neighbour we are sending to
1121  * @param iv initialization vector to use
1122  * @param in ciphertext
1123  * @param out plaintext
1124  * @param size size of in/out
1125  * @return GNUNET_OK on success
1126  */
1127 static int
1128 do_encrypt (struct Neighbour *n,
1129             const GNUNET_HashCode * iv,
1130             const void *in, void *out, size_t size)
1131 {
1132   if (size != (uint16_t) size)
1133     {
1134       GNUNET_break (0);
1135       return GNUNET_NO;
1136     }
1137   GNUNET_assert (size ==
1138                  GNUNET_CRYPTO_aes_encrypt (in,
1139                                             (uint16_t) size,
1140                                             &n->encrypt_key,
1141                                             (const struct
1142                                              GNUNET_CRYPTO_AesInitializationVector
1143                                              *) iv, out));
1144 #if DEBUG_CORE
1145   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1146               "Encrypted %u bytes for `%4s' using key %u\n", size,
1147               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1148 #endif
1149   return GNUNET_OK;
1150 }
1151
1152
1153 /**
1154  * Consider freeing the given neighbour since we may not need
1155  * to keep it around anymore.
1156  *
1157  * @param n neighbour to consider discarding
1158  */
1159 static void
1160 consider_free_neighbour (struct Neighbour *n);
1161
1162
1163 /**
1164  * Task triggered when a neighbour entry is about to time out 
1165  * (and we should prevent this by sending a PING).
1166  *
1167  * @param cls the 'struct Neighbour'
1168  * @param tc scheduler context (not used)
1169  */
1170 static void
1171 send_keep_alive (void *cls,
1172                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1173 {
1174   struct Neighbour *n = cls;
1175   struct GNUNET_TIME_Relative retry;
1176   struct GNUNET_TIME_Relative left;
1177   struct MessageEntry *me;
1178   struct PingMessage pp;
1179   struct PingMessage *pm;
1180
1181   n->keep_alive_task = GNUNET_SCHEDULER_NO_TASK;
1182   /* send PING */
1183   me = GNUNET_malloc (sizeof (struct MessageEntry) +
1184                       sizeof (struct PingMessage));
1185   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
1186   me->priority = PING_PRIORITY;
1187   me->size = sizeof (struct PingMessage);
1188   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
1189                                      n->encrypted_tail,
1190                                      n->encrypted_tail,
1191                                      me);
1192   pm = (struct PingMessage *) &me[1];
1193   pm->header.size = htons (sizeof (struct PingMessage));
1194   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
1195   pp.challenge = htonl (n->ping_challenge);
1196   pp.target = n->peer;
1197 #if DEBUG_CORE
1198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1199               "Encrypting `%s' and `%s' messages for `%4s'.\n",
1200               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
1201   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1202               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
1203               "PING",
1204               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
1205 #endif
1206   do_encrypt (n,
1207               &n->peer.hashPubKey,
1208               &pp.challenge,
1209               &pm->challenge,
1210               sizeof (struct PingMessage) -
1211               sizeof (struct GNUNET_MessageHeader));
1212   process_encrypted_neighbour_queue (n);
1213   /* reschedule PING job */
1214   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1215                                                                        GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
1216   retry = GNUNET_TIME_relative_max (GNUNET_TIME_relative_divide (left, 2),
1217                                     MIN_PING_FREQUENCY);
1218   n->keep_alive_task 
1219     = GNUNET_SCHEDULER_add_delayed (sched, 
1220                                     retry,
1221                                     &send_keep_alive,
1222                                     n);
1223
1224 }
1225
1226
1227 /**
1228  * Task triggered when a neighbour entry might have gotten stale.
1229  *
1230  * @param cls the 'struct Neighbour'
1231  * @param tc scheduler context (not used)
1232  */
1233 static void
1234 consider_free_task (void *cls,
1235                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1236 {
1237   struct Neighbour *n = cls;
1238
1239   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
1240   consider_free_neighbour (n);
1241 }
1242
1243
1244 /**
1245  * Consider freeing the given neighbour since we may not need
1246  * to keep it around anymore.
1247  *
1248  * @param n neighbour to consider discarding
1249  */
1250 static void
1251 consider_free_neighbour (struct Neighbour *n)
1252
1253   struct Neighbour *pos;
1254   struct Neighbour *prev;
1255   struct GNUNET_TIME_Relative left;
1256
1257   if ( (n->th != NULL) ||
1258        (n->pitr != NULL) ||
1259        (n->status == PEER_STATE_KEY_CONFIRMED) ||
1260        (GNUNET_YES == n->is_connected) )
1261     return; /* no chance */
1262   
1263   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1264                                                                        GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
1265   if (left.value > 0)
1266     {
1267       if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1268         GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1269       n->dead_clean_task = GNUNET_SCHEDULER_add_delayed (sched,
1270                                                          left,
1271                                                          &consider_free_task,
1272                                                          n);
1273       return;
1274     }
1275   /* actually free the neighbour... */
1276   prev = NULL;
1277   pos = neighbours;
1278   while (pos != n)
1279     {
1280       prev = pos;
1281       pos = pos->next;
1282     }
1283   if (prev == NULL)
1284     neighbours = n->next;
1285   else
1286     prev->next = n->next;
1287   GNUNET_assert (neighbour_count > 0);
1288   neighbour_count--;
1289   free_neighbour (n);
1290 }
1291
1292
1293 /**
1294  * Function called when the transport service is ready to
1295  * receive an encrypted message for the respective peer
1296  *
1297  * @param cls neighbour to use message from
1298  * @param size number of bytes we can transmit
1299  * @param buf where to copy the message
1300  * @return number of bytes transmitted
1301  */
1302 static size_t
1303 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1304 {
1305   struct Neighbour *n = cls;
1306   struct MessageEntry *m;
1307   size_t ret;
1308   char *cbuf;
1309
1310   n->th = NULL;
1311   GNUNET_assert (NULL != (m = n->encrypted_head));
1312   GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1313                                n->encrypted_tail,
1314                                m);
1315   ret = 0;
1316   cbuf = buf;
1317   if (buf != NULL)
1318     {
1319       GNUNET_assert (size >= m->size);
1320       memcpy (cbuf, &m[1], m->size);
1321       ret = m->size;
1322       GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window,
1323                                         m->size);
1324 #if DEBUG_CORE
1325       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1327                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1328                   ret, GNUNET_i2s (&n->peer));
1329 #endif
1330       process_encrypted_neighbour_queue (n);
1331     }
1332   else
1333     {
1334 #if DEBUG_CORE
1335       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1336                   "Transmission of message of type %u and size %u failed\n",
1337                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1338                   m->size);
1339 #endif
1340     }
1341   GNUNET_free (m);
1342   consider_free_neighbour (n);
1343   return ret;
1344 }
1345
1346
1347 /**
1348  * Check if we have plaintext messages for the specified neighbour
1349  * pending, and if so, consider batching and encrypting them (and
1350  * then trigger processing of the encrypted queue if needed).
1351  *
1352  * @param n neighbour to check.
1353  */
1354 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1355
1356
1357 /**
1358  * Check if we have encrypted messages for the specified neighbour
1359  * pending, and if so, check with the transport about sending them
1360  * out.
1361  *
1362  * @param n neighbour to check.
1363  */
1364 static void
1365 process_encrypted_neighbour_queue (struct Neighbour *n)
1366 {
1367   struct MessageEntry *m;
1368  
1369   if (n->th != NULL)
1370     return;  /* request already pending */
1371   m = n->encrypted_head;
1372   if (m == NULL)
1373     {
1374       /* encrypted queue empty, try plaintext instead */
1375       process_plaintext_neighbour_queue (n);
1376       return;
1377     }
1378 #if DEBUG_CORE
1379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1380               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1381               m->size,
1382               GNUNET_i2s (&n->peer),
1383               GNUNET_TIME_absolute_get_remaining (m->deadline).
1384               value);
1385 #endif
1386   n->th =
1387     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1388                                             m->size,
1389                                             m->priority,
1390                                             GNUNET_TIME_absolute_get_remaining
1391                                             (m->deadline),
1392                                             &notify_encrypted_transmit_ready,
1393                                             n);
1394   if (n->th == NULL)
1395     {
1396       /* message request too large or duplicate request */
1397       GNUNET_break (0);
1398       /* discard encrypted message */
1399       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1400                                    n->encrypted_tail,
1401                                    m);
1402       GNUNET_free (m);
1403       process_encrypted_neighbour_queue (n);
1404     }
1405 }
1406
1407
1408 /**
1409  * Decrypt size bytes from in and write the result to out.  Use the
1410  * key for inbound traffic of the given neighbour.  This function does
1411  * NOT do any integrity-checks on the result.
1412  *
1413  * @param n neighbour we are receiving from
1414  * @param iv initialization vector to use
1415  * @param in ciphertext
1416  * @param out plaintext
1417  * @param size size of in/out
1418  * @return GNUNET_OK on success
1419  */
1420 static int
1421 do_decrypt (struct Neighbour *n,
1422             const GNUNET_HashCode * iv,
1423             const void *in, void *out, size_t size)
1424 {
1425   if (size != (uint16_t) size)
1426     {
1427       GNUNET_break (0);
1428       return GNUNET_NO;
1429     }
1430   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1431       (n->status != PEER_STATE_KEY_CONFIRMED))
1432     {
1433       GNUNET_break_op (0);
1434       return GNUNET_SYSERR;
1435     }
1436   if (size !=
1437       GNUNET_CRYPTO_aes_decrypt (in,
1438                                  (uint16_t) size,
1439                                  &n->decrypt_key,
1440                                  (const struct
1441                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1442                                  out))
1443     {
1444       GNUNET_break (0);
1445       return GNUNET_SYSERR;
1446     }
1447 #if DEBUG_CORE
1448   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1449               "Decrypted %u bytes from `%4s' using key %u\n",
1450               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1451 #endif
1452   return GNUNET_OK;
1453 }
1454
1455
1456 /**
1457  * Select messages for transmission.  This heuristic uses a combination
1458  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1459  * and priority-based discard (in case no feasible schedule exist) and
1460  * speculative optimization (defer any kind of transmission until
1461  * we either create a batch of significant size, 25% of max, or until
1462  * we are close to a deadline).  Furthermore, when scheduling the
1463  * heuristic also packs as many messages into the batch as possible,
1464  * starting with those with the earliest deadline.  Yes, this is fun.
1465  *
1466  * @param n neighbour to select messages from
1467  * @param size number of bytes to select for transmission
1468  * @param retry_time set to the time when we should try again
1469  *        (only valid if this function returns zero)
1470  * @return number of bytes selected, or 0 if we decided to
1471  *         defer scheduling overall; in that case, retry_time is set.
1472  */
1473 static size_t
1474 select_messages (struct Neighbour *n,
1475                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1476 {
1477   struct MessageEntry *pos;
1478   struct MessageEntry *min;
1479   struct MessageEntry *last;
1480   unsigned int min_prio;
1481   struct GNUNET_TIME_Absolute t;
1482   struct GNUNET_TIME_Absolute now;
1483   struct GNUNET_TIME_Relative delta;
1484   uint64_t avail;
1485   struct GNUNET_TIME_Relative slack;     /* how long could we wait before missing deadlines? */
1486   size_t off;
1487   uint64_t tsize;
1488   unsigned int queue_size;
1489   int discard_low_prio;
1490
1491   GNUNET_assert (NULL != n->messages);
1492   now = GNUNET_TIME_absolute_get ();
1493   /* last entry in linked list of messages processed */
1494   last = NULL;
1495   /* should we remove the entry with the lowest
1496      priority from consideration for scheduling at the
1497      end of the loop? */
1498   queue_size = 0;
1499   tsize = 0;
1500   pos = n->messages;
1501   while (pos != NULL)
1502     {
1503       queue_size++;
1504       tsize += pos->size;
1505       pos = pos->next;
1506     }
1507   discard_low_prio = GNUNET_YES;
1508   while (GNUNET_YES == discard_low_prio)
1509     {
1510       min = NULL;
1511       min_prio = -1;
1512       discard_low_prio = GNUNET_NO;
1513       /* calculate number of bytes available for transmission at time "t" */
1514       avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
1515       t = now;
1516       /* how many bytes have we (hypothetically) scheduled so far */
1517       off = 0;
1518       /* maximum time we can wait before transmitting anything
1519          and still make all of our deadlines */
1520       slack = MAX_CORK_DELAY;
1521       pos = n->messages;
1522       /* note that we use "*2" here because we want to look
1523          a bit further into the future; much more makes no
1524          sense since new message might be scheduled in the
1525          meantime... */
1526       while ((pos != NULL) && (off < size * 2))
1527         {         
1528           if (pos->do_transmit == GNUNET_YES)
1529             {
1530               /* already removed from consideration */
1531               pos = pos->next;
1532               continue;
1533             }
1534           if (discard_low_prio == GNUNET_NO)
1535             {
1536               delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
1537               if (delta.value > 0)
1538                 {
1539                   // FIXME: HUH? Check!
1540                   t = pos->deadline;
1541                   avail += GNUNET_BANDWIDTH_value_get_available_until (n->bw_out,
1542                                                                        delta);
1543                 }
1544               if (avail < pos->size)
1545                 {
1546                   // FIXME: HUH? Check!
1547                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1548                 }
1549               else
1550                 {
1551                   avail -= pos->size;
1552                   /* update slack, considering both its absolute deadline
1553                      and relative deadlines caused by other messages
1554                      with their respective load */
1555                   slack = GNUNET_TIME_relative_min (slack,
1556                                                     GNUNET_BANDWIDTH_value_get_delay_for (n->bw_out,
1557                                                                                           avail));
1558                   if (pos->deadline.value <= now.value) 
1559                     {
1560                       /* now or never */
1561                       slack = GNUNET_TIME_UNIT_ZERO;
1562                     }
1563                   else if (GNUNET_YES == pos->got_slack)
1564                     {
1565                       /* should be soon now! */
1566                       slack = GNUNET_TIME_relative_min (slack,
1567                                                         GNUNET_TIME_absolute_get_remaining (pos->slack_deadline));
1568                     }
1569                   else
1570                     {
1571                       slack =
1572                         GNUNET_TIME_relative_min (slack, 
1573                                                   GNUNET_TIME_absolute_get_difference (now, pos->deadline));
1574                       pos->got_slack = GNUNET_YES;
1575                       pos->slack_deadline = GNUNET_TIME_absolute_min (pos->deadline,
1576                                                                       GNUNET_TIME_relative_to_absolute (MAX_CORK_DELAY));
1577                     }
1578                 }
1579             }
1580           off += pos->size;
1581           t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check!
1582           if (pos->priority <= min_prio)
1583             {
1584               /* update min for discard */
1585               min_prio = pos->priority;
1586               min = pos;
1587             }
1588           pos = pos->next;
1589         }
1590       if (discard_low_prio)
1591         {
1592           GNUNET_assert (min != NULL);
1593           /* remove lowest-priority entry from consideration */
1594           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1595         }
1596       last = pos;
1597     }
1598   /* guard against sending "tiny" messages with large headers without
1599      urgent deadlines */
1600   if ( (slack.value > 0) && 
1601        (size > 4 * off) &&
1602        (queue_size <= MAX_PEER_QUEUE_SIZE - 2) )
1603     {
1604       /* less than 25% of message would be filled with deadlines still
1605          being met if we delay by one second or more; so just wait for
1606          more data; but do not wait longer than 1s (since we don't want
1607          to delay messages for a really long time either). */
1608       *retry_time = MAX_CORK_DELAY;
1609       /* reset do_transmit values for next time */
1610       while (pos != last)
1611         {
1612           pos->do_transmit = GNUNET_NO;   
1613           pos = pos->next;
1614         }
1615 #if DEBUG_CORE
1616       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617                   "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
1618                   (unsigned long long) slack.value,
1619                   (unsigned int) off,
1620                   (unsigned int) size);
1621 #endif
1622       return 0;
1623     }
1624   /* select marked messages (up to size) for transmission */
1625   off = 0;
1626   pos = n->messages;
1627   while (pos != last)
1628     {
1629       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1630         {
1631           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1632           off += pos->size;
1633           size -= pos->size;
1634         }
1635       else
1636         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1637       pos = pos->next;
1638     }
1639 #if DEBUG_CORE
1640   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1641               "Selected %u/%u bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
1642               off, tsize,
1643               queue_size, MAX_PEER_QUEUE_SIZE,
1644               GNUNET_i2s (&n->peer));
1645 #endif
1646   return off;
1647 }
1648
1649
1650 /**
1651  * Batch multiple messages into a larger buffer.
1652  *
1653  * @param n neighbour to take messages from
1654  * @param buf target buffer
1655  * @param size size of buf
1656  * @param deadline set to transmission deadline for the result
1657  * @param retry_time set to the time when we should try again
1658  *        (only valid if this function returns zero)
1659  * @param priority set to the priority of the batch
1660  * @return number of bytes written to buf (can be zero)
1661  */
1662 static size_t
1663 batch_message (struct Neighbour *n,
1664                char *buf,
1665                size_t size,
1666                struct GNUNET_TIME_Absolute *deadline,
1667                struct GNUNET_TIME_Relative *retry_time,
1668                unsigned int *priority)
1669 {
1670   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1671   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
1672   struct MessageEntry *pos;
1673   struct MessageEntry *prev;
1674   struct MessageEntry *next;
1675   size_t ret;
1676   
1677   ret = 0;
1678   *priority = 0;
1679   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1680   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1681   if (0 == select_messages (n, size, retry_time))
1682     {
1683 #if DEBUG_CORE
1684       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1685                   "No messages selected, will try again in %llu ms\n",
1686                   retry_time->value);
1687 #endif
1688       return 0;
1689     }
1690   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
1691   ntm->distance = htonl (n->last_distance);
1692   ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
1693   ntm->peer = n->peer;
1694   
1695   pos = n->messages;
1696   prev = NULL;
1697   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1698     {
1699       next = pos->next;
1700       if (GNUNET_YES == pos->do_transmit)
1701         {
1702           GNUNET_assert (pos->size <= size);
1703           /* do notifications */
1704           /* FIXME: track if we have *any* client that wants
1705              full notifications and only do this if that is
1706              actually true */
1707           if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
1708             {
1709               memcpy (&ntm[1], &pos[1], pos->size);
1710               ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1711                                         sizeof (struct GNUNET_MessageHeader));
1712               send_to_all_clients (&ntm->header,
1713                                    GNUNET_YES,
1714                                    GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
1715             }
1716           else
1717             {
1718               /* message too large for 'full' notifications, we do at
1719                  least the 'hdr' type */
1720               memcpy (&ntm[1],
1721                       &pos[1],
1722                       sizeof (struct GNUNET_MessageHeader));
1723             }
1724           ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1725                                     pos->size);
1726           send_to_all_clients (&ntm->header,
1727                                GNUNET_YES,
1728                                GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);   
1729 #if DEBUG_HANDSHAKE
1730           fprintf (stderr,
1731                    "Encrypting message of type %u\n",
1732                    ntohs(((struct GNUNET_MessageHeader*)&pos[1])->type));
1733 #endif
1734           /* copy for encrypted transmission */
1735           memcpy (&buf[ret], &pos[1], pos->size);
1736           ret += pos->size;
1737           size -= pos->size;
1738           *priority += pos->priority;
1739 #if DEBUG_CORE
1740           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1741                       "Adding plaintext message of size %u with deadline %llu ms to batch\n",
1742                       pos->size,
1743                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1744 #endif
1745           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1746           GNUNET_free (pos);
1747           if (prev == NULL)
1748             n->messages = next;
1749           else
1750             prev->next = next;
1751         }
1752       else
1753         {
1754           prev = pos;
1755         }
1756       pos = next;
1757     }
1758 #if DEBUG_CORE
1759   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1760               "Deadline for message batch is %llu ms\n",
1761               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1762 #endif
1763   return ret;
1764 }
1765
1766
1767 /**
1768  * Remove messages with deadlines that have long expired from
1769  * the queue.
1770  *
1771  * @param n neighbour to inspect
1772  */
1773 static void
1774 discard_expired_messages (struct Neighbour *n)
1775 {
1776   struct MessageEntry *prev;
1777   struct MessageEntry *next;
1778   struct MessageEntry *pos;
1779   struct GNUNET_TIME_Absolute now;
1780   struct GNUNET_TIME_Relative delta;
1781
1782   now = GNUNET_TIME_absolute_get ();
1783   prev = NULL;
1784   pos = n->messages;
1785   while (pos != NULL) 
1786     {
1787       next = pos->next;
1788       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1789       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1790         {
1791 #if DEBUG_CORE
1792           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1793                       "Message is %llu ms past due, discarding.\n",
1794                       delta.value);
1795 #endif
1796           if (prev == NULL)
1797             n->messages = next;
1798           else
1799             prev->next = next;
1800           GNUNET_free (pos);
1801         }
1802       else
1803         prev = pos;
1804       pos = next;
1805     }
1806 }
1807
1808
1809 /**
1810  * Signature of the main function of a task.
1811  *
1812  * @param cls closure
1813  * @param tc context information (why was this task triggered now)
1814  */
1815 static void
1816 retry_plaintext_processing (void *cls,
1817                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1818 {
1819   struct Neighbour *n = cls;
1820
1821   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1822   process_plaintext_neighbour_queue (n);
1823 }
1824
1825
1826 /**
1827  * Send our key (and encrypted PING) to the other peer.
1828  *
1829  * @param n the other peer
1830  */
1831 static void send_key (struct Neighbour *n);
1832
1833 /**
1834  * Task that will retry "send_key" if our previous attempt failed
1835  * to yield a PONG.
1836  */
1837 static void
1838 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1839 {
1840   struct Neighbour *n = cls;
1841
1842 #if DEBUG_CORE
1843   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1844               "Retrying key transmission to `%4s'\n",
1845               GNUNET_i2s (&n->peer));
1846 #endif
1847   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
1848   n->set_key_retry_frequency =
1849     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
1850   send_key (n);
1851 }
1852
1853
1854 /**
1855  * Check if we have plaintext messages for the specified neighbour
1856  * pending, and if so, consider batching and encrypting them (and
1857  * then trigger processing of the encrypted queue if needed).
1858  *
1859  * @param n neighbour to check.
1860  */
1861 static void
1862 process_plaintext_neighbour_queue (struct Neighbour *n)
1863 {
1864   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1865   size_t used;
1866   size_t esize;
1867   struct EncryptedMessage *em;  /* encrypted message */
1868   struct EncryptedMessage *ph;  /* plaintext header */
1869   struct MessageEntry *me;
1870   unsigned int priority;
1871   struct GNUNET_TIME_Absolute deadline;
1872   struct GNUNET_TIME_Relative retry_time;
1873   GNUNET_HashCode iv;
1874
1875   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1876     {
1877       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1878       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1879     }
1880   switch (n->status)
1881     {
1882     case PEER_STATE_DOWN:
1883       send_key (n);
1884 #if DEBUG_CORE
1885       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1886                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1887                   GNUNET_i2s(&n->peer));
1888 #endif
1889       return;
1890     case PEER_STATE_KEY_SENT:
1891       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
1892         n->retry_set_key_task
1893           = GNUNET_SCHEDULER_add_delayed (sched,
1894                                           n->set_key_retry_frequency,
1895                                           &set_key_retry_task, n);    
1896 #if DEBUG_CORE
1897       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1898                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1899                   GNUNET_i2s(&n->peer));
1900 #endif
1901       return;
1902     case PEER_STATE_KEY_RECEIVED:
1903       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)        
1904         n->retry_set_key_task
1905           = GNUNET_SCHEDULER_add_delayed (sched,
1906                                           n->set_key_retry_frequency,
1907                                           &set_key_retry_task, n);        
1908 #if DEBUG_CORE
1909       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1910                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1911                   GNUNET_i2s(&n->peer));
1912 #endif
1913       return;
1914     case PEER_STATE_KEY_CONFIRMED:
1915       /* ready to continue */
1916       break;
1917     }
1918   discard_expired_messages (n);
1919   if (n->messages == NULL)
1920     {
1921 #if DEBUG_CORE
1922       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1923                   "Plaintext message queue for `%4s' is empty.\n",
1924                   GNUNET_i2s(&n->peer));
1925 #endif
1926       return;                   /* no pending messages */
1927     }
1928   if (n->encrypted_head != NULL)
1929     {
1930 #if DEBUG_CORE
1931       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1932                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1933                   GNUNET_i2s(&n->peer));
1934 #endif
1935       return;                   /* wait for messages already encrypted to be
1936                                    processed first! */
1937     }
1938   ph = (struct EncryptedMessage *) pbuf;
1939   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1940   priority = 0;
1941   used = sizeof (struct EncryptedMessage);
1942   used += batch_message (n,
1943                          &pbuf[used],
1944                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1945                          &deadline, &retry_time, &priority);
1946   if (used == sizeof (struct EncryptedMessage))
1947     {
1948 #if DEBUG_CORE
1949       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1950                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1951                   GNUNET_i2s(&n->peer));
1952 #endif
1953       /* no messages selected for sending, try again later... */
1954       n->retry_plaintext_task =
1955         GNUNET_SCHEDULER_add_delayed (sched,
1956                                       retry_time,
1957                                       &retry_plaintext_processing, n);
1958       return;
1959     }
1960 #if DEBUG_CORE_QUOTA
1961   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1962               "Sending %u b/s as new limit to peer `%4s'\n",
1963               (unsigned int) ntohl (n->bw_in.value__),
1964               GNUNET_i2s (&n->peer));
1965 #endif
1966   ph->iv_seed = htonl (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1));
1967   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1968   ph->inbound_bw_limit = n->bw_in;
1969   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1970
1971   /* setup encryption message header */
1972   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1973   me->deadline = deadline;
1974   me->priority = priority;
1975   me->size = used;
1976   em = (struct EncryptedMessage *) &me[1];
1977   em->header.size = htons (used);
1978   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1979   em->iv_seed = ph->iv_seed;
1980   esize = used - ENCRYPTED_HEADER_SIZE;
1981   GNUNET_CRYPTO_hash (&ph->sequence_number,
1982                       esize - sizeof (GNUNET_HashCode), 
1983                       &ph->plaintext_hash);
1984   GNUNET_CRYPTO_hash (&ph->iv_seed, sizeof (uint32_t), &iv);
1985   /* encrypt */
1986 #if DEBUG_CORE
1987   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1988               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1989               esize,
1990               GNUNET_i2s(&n->peer),
1991               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1992 #endif
1993   GNUNET_assert (GNUNET_OK ==
1994                  do_encrypt (n,
1995                              &iv,
1996                              &ph->plaintext_hash,
1997                              &em->plaintext_hash, esize));
1998   /* append to transmission list */
1999   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2000                                      n->encrypted_tail,
2001                                      n->encrypted_tail,
2002                                      me);
2003   process_encrypted_neighbour_queue (n);
2004 }
2005
2006
2007 /**
2008  * Function that recalculates the bandwidth quota for the
2009  * given neighbour and transmits it to the transport service.
2010  * 
2011  * @param cls neighbour for the quota update
2012  * @param tc context
2013  */
2014 static void
2015 neighbour_quota_update (void *cls,
2016                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2017
2018
2019 /**
2020  * Schedule the task that will recalculate the bandwidth
2021  * quota for this peer (and possibly force a disconnect of
2022  * idle peers by calculating a bandwidth of zero).
2023  */
2024 static void
2025 schedule_quota_update (struct Neighbour *n)
2026 {
2027   GNUNET_assert (n->quota_update_task ==
2028                  GNUNET_SCHEDULER_NO_TASK);
2029   n->quota_update_task
2030     = GNUNET_SCHEDULER_add_delayed (sched,
2031                                     QUOTA_UPDATE_FREQUENCY,
2032                                     &neighbour_quota_update,
2033                                     n);
2034 }
2035
2036
2037 /**
2038  * Initialize a new 'struct Neighbour'.
2039  *
2040  * @param pid ID of the new neighbour
2041  * @return handle for the new neighbour
2042  */
2043 static struct Neighbour *
2044 create_neighbour (const struct GNUNET_PeerIdentity *pid)
2045 {
2046   struct Neighbour *n;
2047   struct GNUNET_TIME_Absolute now;
2048
2049   n = GNUNET_malloc (sizeof (struct Neighbour));
2050   n->next = neighbours;
2051   neighbours = n;
2052   neighbour_count++;
2053   n->peer = *pid;
2054   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2055   now = GNUNET_TIME_absolute_get ();
2056   n->encrypt_key_created = now;
2057   n->last_activity = now;
2058   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2059   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2060   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2061   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init ((uint32_t) - 1);
2062   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2063   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2064                                                 (uint32_t) - 1);
2065   neighbour_quota_update (n, NULL);
2066   return n;
2067 }
2068
2069
2070 /**
2071  * Handle CORE_SEND request.
2072  *
2073  * @param cls unused
2074  * @param client the client issuing the request
2075  * @param message the "struct SendMessage"
2076  */
2077 static void
2078 handle_client_send (void *cls,
2079                     struct GNUNET_SERVER_Client *client,
2080                     const struct GNUNET_MessageHeader *message)
2081 {
2082   const struct SendMessage *sm;
2083   struct Neighbour *n;
2084   struct MessageEntry *prev;
2085   struct MessageEntry *pos;
2086   struct MessageEntry *e; 
2087   struct MessageEntry *min_prio_entry;
2088   struct MessageEntry *min_prio_prev;
2089   unsigned int min_prio;
2090   unsigned int queue_size;
2091   uint16_t msize;
2092
2093   msize = ntohs (message->size);
2094   if (msize <
2095       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
2096     {
2097       GNUNET_break (0);
2098       if (client != NULL)
2099         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2100       return;
2101     }
2102   sm = (const struct SendMessage *) message;
2103   msize -= sizeof (struct SendMessage);
2104   n = find_neighbour (&sm->peer);
2105   if (n == NULL)
2106     n = create_neighbour (&sm->peer);
2107 #if DEBUG_CORE
2108   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2109               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
2110               "SEND",
2111               msize, 
2112               GNUNET_i2s (&sm->peer));
2113 #endif
2114   /* bound queue size */
2115   discard_expired_messages (n);
2116   min_prio = (unsigned int) -1;
2117   min_prio_entry = NULL;
2118   min_prio_prev = NULL;
2119   queue_size = 0;
2120   prev = NULL;
2121   pos = n->messages;
2122   while (pos != NULL) 
2123     {
2124       if (pos->priority < min_prio)
2125         {
2126           min_prio_entry = pos;
2127           min_prio_prev = prev;
2128           min_prio = pos->priority;
2129         }
2130       queue_size++;
2131       prev = pos;
2132       pos = pos->next;
2133     }
2134   if (queue_size >= MAX_PEER_QUEUE_SIZE)
2135     {
2136       /* queue full */
2137       if (ntohl(sm->priority) <= min_prio)
2138         {
2139           /* discard new entry */
2140 #if DEBUG_CORE
2141           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2142                       "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
2143                       queue_size,
2144                       MAX_PEER_QUEUE_SIZE,
2145                       msize,
2146                       ntohs (message->type));
2147 #endif
2148           if (client != NULL)
2149             GNUNET_SERVER_receive_done (client, GNUNET_OK);
2150           return;
2151         }
2152       /* discard "min_prio_entry" */
2153 #if DEBUG_CORE
2154       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2155                   "Queue full, discarding existing older request\n");
2156 #endif
2157       if (min_prio_prev == NULL)
2158         n->messages = min_prio_entry->next;
2159       else
2160         min_prio_prev->next = min_prio_entry->next;      
2161       GNUNET_free (min_prio_entry);     
2162     }
2163
2164 #if DEBUG_CORE
2165   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2166               "Adding transmission request for `%4s' of size %u to queue\n",
2167               GNUNET_i2s (&sm->peer),
2168               msize);
2169 #endif  
2170   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
2171   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
2172   e->priority = ntohl (sm->priority);
2173   e->size = msize;
2174   memcpy (&e[1], &sm[1], msize);
2175
2176   /* insert, keep list sorted by deadline */
2177   prev = NULL;
2178   pos = n->messages;
2179   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
2180     {
2181       prev = pos;
2182       pos = pos->next;
2183     }
2184   if (prev == NULL)
2185     n->messages = e;
2186   else
2187     prev->next = e;
2188   e->next = pos;
2189
2190   /* consider scheduling now */
2191   process_plaintext_neighbour_queue (n);
2192   if (client != NULL)
2193     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2194 }
2195
2196
2197 /**
2198  * Function called when the transport service is ready to
2199  * receive a message.  Only resets 'n->th' to NULL.
2200  *
2201  * @param cls neighbour to use message from
2202  * @param size number of bytes we can transmit
2203  * @param buf where to copy the message
2204  * @return number of bytes transmitted
2205  */
2206 static size_t
2207 notify_transport_connect_done (void *cls, size_t size, void *buf)
2208 {
2209   struct Neighbour *n = cls;
2210
2211   n->th = NULL;
2212   if (buf == NULL)
2213     {
2214       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2215                   _("Failed to connect to `%4s': transport failed to connect\n"),
2216                   GNUNET_i2s (&n->peer));
2217       return 0;
2218     }
2219   send_key (n);
2220   return 0;
2221 }
2222
2223
2224 /**
2225  * Handle CORE_REQUEST_CONNECT request.
2226  *
2227  * @param cls unused
2228  * @param client the client issuing the request
2229  * @param message the "struct ConnectMessage"
2230  */
2231 static void
2232 handle_client_request_connect (void *cls,
2233                                struct GNUNET_SERVER_Client *client,
2234                                const struct GNUNET_MessageHeader *message)
2235 {
2236   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2237   struct Neighbour *n;
2238   struct GNUNET_TIME_Relative timeout;
2239
2240   if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2241     {
2242       GNUNET_break (0);
2243       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2244       return;
2245     }
2246   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2247   n = find_neighbour (&cm->peer);
2248   if (n == NULL)
2249     n = create_neighbour (&cm->peer);
2250   if ( (n->is_connected) ||
2251        (n->th != NULL) )
2252     return; /* already connected, or at least trying */
2253 #if DEBUG_CORE
2254   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2255               "Core received `%s' request for `%4s', will try to establish connection\n",
2256               "REQUEST_CONNECT",
2257               GNUNET_i2s (&cm->peer));
2258 #endif
2259   timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
2260   /* ask transport to connect to the peer */
2261   n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
2262                                                   &cm->peer,
2263                                                   sizeof (struct GNUNET_MessageHeader), 0,
2264                                                   timeout,
2265                                                   &notify_transport_connect_done,
2266                                                   n);
2267   GNUNET_break (NULL != n->th);
2268 }
2269
2270
2271 /**
2272  * List of handlers for the messages understood by this
2273  * service.
2274  */
2275 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2276   {&handle_client_init, NULL,
2277    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2278   {&handle_client_request_info, NULL,
2279    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2280    sizeof (struct RequestInfoMessage)},
2281   {&handle_client_send, NULL,
2282    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2283   {&handle_client_request_connect, NULL,
2284    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2285    sizeof (struct ConnectMessage)},
2286   {NULL, NULL, 0, 0}
2287 };
2288
2289
2290 /**
2291  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2292  * the neighbour's struct and retry send_key.  Or, if we did not get a
2293  * HELLO, just do nothing.
2294  *
2295  * @param cls the 'struct Neighbour' to retry sending the key for
2296  * @param peer the peer for which this is the HELLO
2297  * @param hello HELLO message of that peer
2298  * @param trust amount of trust we currently have in that peer
2299  */
2300 static void
2301 process_hello_retry_send_key (void *cls,
2302                               const struct GNUNET_PeerIdentity *peer,
2303                               const struct GNUNET_HELLO_Message *hello,
2304                               uint32_t trust)
2305 {
2306   struct Neighbour *n = cls;
2307
2308   if (peer == NULL)
2309     {
2310 #if DEBUG_CORE
2311       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2312                   "Entered `process_hello_retry_send_key' and `peer' is NULL!\n");
2313 #endif
2314       n->pitr = NULL;
2315       if (n->public_key != NULL)
2316         {
2317           send_key (n);
2318         }
2319       else
2320         {
2321           if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
2322             n->retry_set_key_task
2323               = GNUNET_SCHEDULER_add_delayed (sched,
2324                                               n->set_key_retry_frequency,
2325                                               &set_key_retry_task, n);
2326         }
2327       return;
2328     }
2329
2330 #if DEBUG_CORE
2331   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2332               "Entered `process_hello_retry_send_key' for peer `%4s'\n",
2333               GNUNET_i2s (peer));
2334 #endif
2335   if (n->public_key != NULL)
2336     {
2337 #if DEBUG_CORE
2338       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2339               "already have public key for peer %s!! (so why are we here?)\n",
2340               GNUNET_i2s (peer));
2341 #endif
2342       return;
2343     }
2344
2345 #if DEBUG_CORE
2346   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2347               "Received new `%s' message for `%4s', initiating key exchange.\n",
2348               "HELLO",
2349               GNUNET_i2s (peer));
2350 #endif
2351   n->public_key =
2352     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2353   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2354     {
2355       GNUNET_free (n->public_key);
2356       n->public_key = NULL;
2357 #if DEBUG_CORE
2358   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2359               "GNUNET_HELLO_get_key returned awfully\n");
2360 #endif
2361       return;
2362     }
2363 }
2364
2365
2366 /**
2367  * Send our key (and encrypted PING) to the other peer.
2368  *
2369  * @param n the other peer
2370  */
2371 static void
2372 send_key (struct Neighbour *n)
2373 {
2374   struct SetKeyMessage *sm;
2375   struct MessageEntry *me;
2376   struct PingMessage pp;
2377   struct PingMessage *pm;
2378
2379   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2380        (n->pitr != NULL) )
2381     {
2382 #if DEBUG_CORE
2383       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2384                   "Key exchange in progress with `%4s'.\n",
2385                   GNUNET_i2s (&n->peer));
2386 #endif
2387       return; /* already in progress */
2388     }
2389
2390 #if DEBUG_CORE
2391   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2392               "Asked to perform key exchange with `%4s'.\n",
2393               GNUNET_i2s (&n->peer));
2394 #endif
2395   if (n->public_key == NULL)
2396     {
2397       /* lookup n's public key, then try again */
2398 #if DEBUG_CORE
2399       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2400                   "Lacking public key for `%4s', trying to obtain one (send_key).\n",
2401                   GNUNET_i2s (&n->peer));
2402 #endif
2403       GNUNET_assert (n->pitr == NULL);
2404       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2405                                          sched,
2406                                          &n->peer,
2407                                          0,
2408                                          GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20),
2409                                          &process_hello_retry_send_key, n);
2410       return;
2411     }
2412   /* first, set key message */
2413   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2414                       sizeof (struct SetKeyMessage));
2415   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2416   me->priority = SET_KEY_PRIORITY;
2417   me->size = sizeof (struct SetKeyMessage);
2418   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2419                                      n->encrypted_tail,
2420                                      n->encrypted_tail,
2421                                      me);
2422   sm = (struct SetKeyMessage *) &me[1];
2423   sm->header.size = htons (sizeof (struct SetKeyMessage));
2424   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2425   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2426                                         PEER_STATE_KEY_SENT : n->status));
2427   sm->purpose.size =
2428     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2429            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2430            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2431            sizeof (struct GNUNET_PeerIdentity));
2432   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2433   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2434   sm->target = n->peer;
2435   GNUNET_assert (GNUNET_OK ==
2436                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2437                                             sizeof (struct
2438                                                     GNUNET_CRYPTO_AesSessionKey),
2439                                             n->public_key,
2440                                             &sm->encrypted_key));
2441   GNUNET_assert (GNUNET_OK ==
2442                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2443                                          &sm->signature));
2444
2445   /* second, encrypted PING message */
2446   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2447                       sizeof (struct PingMessage));
2448   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2449   me->priority = PING_PRIORITY;
2450   me->size = sizeof (struct PingMessage);
2451   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2452                                      n->encrypted_tail,
2453                                      n->encrypted_tail,
2454                                      me);
2455   pm = (struct PingMessage *) &me[1];
2456   pm->header.size = htons (sizeof (struct PingMessage));
2457   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2458   pp.challenge = htonl (n->ping_challenge);
2459   pp.target = n->peer;
2460 #if DEBUG_CORE
2461   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2462               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2463               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2464   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2465               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2466               "PING",
2467               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2468 #endif
2469   do_encrypt (n,
2470               &n->peer.hashPubKey,
2471               &pp.challenge,
2472               &pm->challenge,
2473               sizeof (struct PingMessage) -
2474               sizeof (struct GNUNET_MessageHeader));
2475   /* update status */
2476   switch (n->status)
2477     {
2478     case PEER_STATE_DOWN:
2479       n->status = PEER_STATE_KEY_SENT;
2480       break;
2481     case PEER_STATE_KEY_SENT:
2482       break;
2483     case PEER_STATE_KEY_RECEIVED:
2484       break;
2485     case PEER_STATE_KEY_CONFIRMED:
2486       break;
2487     default:
2488       GNUNET_break (0);
2489       break;
2490     }
2491 #if DEBUG_CORE
2492   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2493               "Have %llu ms left for `%s' transmission.\n",
2494               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2495               "SET_KEY");
2496 #endif
2497   /* trigger queue processing */
2498   process_encrypted_neighbour_queue (n);
2499   if ( (n->status != PEER_STATE_KEY_CONFIRMED) &&
2500        (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task) )
2501     n->retry_set_key_task
2502       = GNUNET_SCHEDULER_add_delayed (sched,
2503                                       n->set_key_retry_frequency,
2504                                       &set_key_retry_task, n);    
2505 }
2506
2507
2508 /**
2509  * We received a SET_KEY message.  Validate and update
2510  * our key material and status.
2511  *
2512  * @param n the neighbour from which we received message m
2513  * @param m the set key message we received
2514  */
2515 static void
2516 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2517
2518
2519 /**
2520  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2521  * the neighbour's struct and retry handling the set_key message.  Or,
2522  * if we did not get a HELLO, just free the set key message.
2523  *
2524  * @param cls pointer to the set key message
2525  * @param peer the peer for which this is the HELLO
2526  * @param hello HELLO message of that peer
2527  * @param trust amount of trust we currently have in that peer
2528  */
2529 static void
2530 process_hello_retry_handle_set_key (void *cls,
2531                                     const struct GNUNET_PeerIdentity *peer,
2532                                     const struct GNUNET_HELLO_Message *hello,
2533                                     uint32_t trust)
2534 {
2535   struct Neighbour *n = cls;
2536   struct SetKeyMessage *sm = n->skm;
2537
2538   if (peer == NULL)
2539     {
2540       GNUNET_free (sm);
2541       n->skm = NULL;
2542       n->pitr = NULL;
2543       return;
2544     }
2545   if (n->public_key != NULL)
2546     return;                     /* multiple HELLOs match!? */
2547   n->public_key =
2548     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2549   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2550     {
2551       GNUNET_break_op (0);
2552       GNUNET_free (n->public_key);
2553       n->public_key = NULL;
2554       return;
2555     }
2556 #if DEBUG_CORE
2557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2558               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2559               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2560 #endif
2561   handle_set_key (n, sm);
2562 }
2563
2564
2565 /**
2566  * We received a PING message.  Validate and transmit
2567  * PONG.
2568  *
2569  * @param n sender of the PING
2570  * @param m the encrypted PING message itself
2571  */
2572 static void
2573 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2574 {
2575   struct PingMessage t;
2576   struct PongMessage tx;
2577   struct PongMessage *tp;
2578   struct MessageEntry *me;
2579
2580 #if DEBUG_CORE
2581   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2582               "Core service receives `%s' request from `%4s'.\n",
2583               "PING", GNUNET_i2s (&n->peer));
2584 #endif
2585   if (GNUNET_OK !=
2586       do_decrypt (n,
2587                   &my_identity.hashPubKey,
2588                   &m->challenge,
2589                   &t.challenge,
2590                   sizeof (struct PingMessage) -
2591                   sizeof (struct GNUNET_MessageHeader)))
2592     return;
2593 #if DEBUG_CORE
2594   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2595               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2596               "PING",
2597               GNUNET_i2s (&t.target),
2598               ntohl (t.challenge), n->decrypt_key.crc32);
2599   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2600               "Target of `%s' request is `%4s'.\n",
2601               "PING", GNUNET_i2s (&t.target));
2602 #endif
2603   if (0 != memcmp (&t.target,
2604                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2605     {
2606       GNUNET_break_op (0);
2607       return;
2608     }
2609   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2610                       sizeof (struct PongMessage));
2611   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2612                                      n->encrypted_tail,
2613                                      n->encrypted_tail,
2614                                      me);
2615   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2616   me->priority = PONG_PRIORITY;
2617   me->size = sizeof (struct PongMessage);
2618   tx.reserved = htonl (0);
2619   tx.inbound_bw_limit = n->bw_in;
2620   tx.challenge = t.challenge;
2621   tx.target = t.target;
2622   tp = (struct PongMessage *) &me[1];
2623   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2624   tp->header.size = htons (sizeof (struct PongMessage));
2625   do_encrypt (n,
2626               &my_identity.hashPubKey,
2627               &tx.challenge,
2628               &tp->challenge,
2629               sizeof (struct PongMessage) -
2630               sizeof (struct GNUNET_MessageHeader));
2631 #if DEBUG_CORE
2632   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2633               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2634               ntohl (t.challenge), n->encrypt_key.crc32);
2635 #endif
2636   /* trigger queue processing */
2637   process_encrypted_neighbour_queue (n);
2638 }
2639
2640
2641 /**
2642  * We received a PONG message.  Validate and update our status.
2643  *
2644  * @param n sender of the PONG
2645  * @param m the encrypted PONG message itself
2646  */
2647 static void
2648 handle_pong (struct Neighbour *n, 
2649              const struct PongMessage *m)
2650 {
2651   struct PongMessage t;
2652   struct ConnectNotifyMessage cnm;
2653
2654 #if DEBUG_CORE
2655   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2656               "Core service receives `%s' request from `%4s'.\n",
2657               "PONG", GNUNET_i2s (&n->peer));
2658 #endif
2659   if (GNUNET_OK !=
2660       do_decrypt (n,
2661                   &n->peer.hashPubKey,
2662                   &m->challenge,
2663                   &t.challenge,
2664                   sizeof (struct PongMessage) -
2665                   sizeof (struct GNUNET_MessageHeader)))
2666     return;
2667   if (0 != ntohl (t.reserved))
2668     {
2669       GNUNET_break_op (0);
2670       return;
2671     }
2672 #if DEBUG_CORE
2673   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2674               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2675               "PONG",
2676               GNUNET_i2s (&t.target),
2677               ntohl (t.challenge), n->decrypt_key.crc32);
2678 #endif
2679   if ((0 != memcmp (&t.target,
2680                     &n->peer,
2681                     sizeof (struct GNUNET_PeerIdentity))) ||
2682       (n->ping_challenge != ntohl (t.challenge)))
2683     {
2684       /* PONG malformed */
2685 #if DEBUG_CORE
2686       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2687                   "Received malformed `%s' wanted sender `%4s' with challenge %u\n",
2688                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2689       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2690                   "Received malformed `%s' received from `%4s' with challenge %u\n",
2691                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2692 #endif
2693       GNUNET_break_op (0);
2694       return;
2695     }
2696   switch (n->status)
2697     {
2698     case PEER_STATE_DOWN:
2699       GNUNET_break (0);         /* should be impossible */
2700       return;
2701     case PEER_STATE_KEY_SENT:
2702       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2703       return;
2704     case PEER_STATE_KEY_RECEIVED:
2705       n->status = PEER_STATE_KEY_CONFIRMED;
2706       if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
2707         {
2708           n->bw_out_external_limit = t.inbound_bw_limit;
2709           n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
2710                                                   n->bw_out_internal_limit);
2711           GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
2712                                                  n->bw_out);       
2713           GNUNET_TRANSPORT_set_quota (transport,
2714                                       &n->peer,
2715                                       n->bw_in,
2716                                       n->bw_out,
2717                                       GNUNET_TIME_UNIT_FOREVER_REL,
2718                                       NULL, NULL); 
2719         }
2720 #if DEBUG_CORE
2721       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2722                   "Confirmed key via `%s' message for peer `%4s'\n",
2723                   "PONG", GNUNET_i2s (&n->peer));
2724 #endif
2725       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2726         {
2727           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2728           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2729         }      
2730       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2731       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2732       cnm.distance = htonl (n->last_distance);
2733       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2734       cnm.peer = n->peer;
2735       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2736       process_encrypted_neighbour_queue (n);
2737       /* fall-through! */
2738     case PEER_STATE_KEY_CONFIRMED:
2739       n->last_activity = GNUNET_TIME_absolute_get ();
2740       if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
2741         GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
2742       n->keep_alive_task 
2743         = GNUNET_SCHEDULER_add_delayed (sched, 
2744                                         GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
2745                                         &send_keep_alive,
2746                                         n);
2747       break;
2748     default:
2749       GNUNET_break (0);
2750       break;
2751     }
2752 }
2753
2754
2755 /**
2756  * We received a SET_KEY message.  Validate and update
2757  * our key material and status.
2758  *
2759  * @param n the neighbour from which we received message m
2760  * @param m the set key message we received
2761  */
2762 static void
2763 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2764 {
2765   struct SetKeyMessage *m_cpy;
2766   struct GNUNET_TIME_Absolute t;
2767   struct GNUNET_CRYPTO_AesSessionKey k;
2768   struct PingMessage *ping;
2769   struct PongMessage *pong;
2770   enum PeerStateMachine sender_status;
2771
2772 #if DEBUG_CORE
2773   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2774               "Core service receives `%s' request from `%4s'.\n",
2775               "SET_KEY", GNUNET_i2s (&n->peer));
2776 #endif
2777   if (n->public_key == NULL)
2778     {
2779       if (n->pitr != NULL)
2780         {
2781 #if DEBUG_CORE
2782           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2783                       "Ignoring `%s' message due to lack of public key for peer (still trying to obtain one).\n",
2784                       "SET_KEY");
2785 #endif
2786           return;
2787         }
2788 #if DEBUG_CORE
2789       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2790                   "Lacking public key for peer, trying to obtain one (handle_set_key).\n");
2791 #endif
2792       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2793       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2794       /* lookup n's public key, then try again */
2795       GNUNET_assert (n->skm == NULL);
2796       n->skm = m_cpy;
2797       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2798                                          sched,
2799                                          &n->peer,
2800                                          0,
2801                                          GNUNET_TIME_UNIT_MINUTES,
2802                                          &process_hello_retry_handle_set_key, n);
2803       return;
2804     }
2805   if (0 != memcmp (&m->target,
2806                    &my_identity,
2807                    sizeof (struct GNUNET_PeerIdentity)))
2808     {
2809       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2810                   _("Received `%s' message that was for `%s', not for me.  Ignoring.\n"),
2811                   "SET_KEY",
2812                   GNUNET_i2s (&m->target));
2813       return;
2814     }
2815   if ((ntohl (m->purpose.size) !=
2816        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2817        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2818        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2819        sizeof (struct GNUNET_PeerIdentity)) ||
2820       (GNUNET_OK !=
2821        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2822                                  &m->purpose, &m->signature, n->public_key)))
2823     {
2824       /* invalid signature */
2825       GNUNET_break_op (0);
2826       return;
2827     }
2828   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2829   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2830        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2831       (t.value < n->decrypt_key_created.value))
2832     {
2833       /* this could rarely happen due to massive re-ordering of
2834          messages on the network level, but is most likely either
2835          a bug or some adversary messing with us.  Report. */
2836       GNUNET_break_op (0);
2837       return;
2838     }
2839 #if DEBUG_CORE
2840   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2841               "Decrypting key material.\n");
2842 #endif  
2843   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2844                                   &m->encrypted_key,
2845                                   &k,
2846                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2847        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2848       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2849     {
2850       /* failed to decrypt !? */
2851       GNUNET_break_op (0);
2852       return;
2853     }
2854
2855   n->decrypt_key = k;
2856   if (n->decrypt_key_created.value != t.value)
2857     {
2858       /* fresh key, reset sequence numbers */
2859       n->last_sequence_number_received = 0;
2860       n->last_packets_bitmap = 0;
2861       n->decrypt_key_created = t;
2862     }
2863   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2864   switch (n->status)
2865     {
2866     case PEER_STATE_DOWN:
2867       n->status = PEER_STATE_KEY_RECEIVED;
2868 #if DEBUG_CORE
2869       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2870                   "Responding to `%s' with my own key.\n", "SET_KEY");
2871 #endif
2872       send_key (n);
2873       break;
2874     case PEER_STATE_KEY_SENT:
2875     case PEER_STATE_KEY_RECEIVED:
2876       n->status = PEER_STATE_KEY_RECEIVED;
2877       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2878           (sender_status != PEER_STATE_KEY_CONFIRMED))
2879         {
2880 #if DEBUG_CORE
2881           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2882                       "Responding to `%s' with my own key (other peer has status %u).\n",
2883                       "SET_KEY", sender_status);
2884 #endif
2885           send_key (n);
2886         }
2887       break;
2888     case PEER_STATE_KEY_CONFIRMED:
2889       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2890           (sender_status != PEER_STATE_KEY_CONFIRMED))
2891         {         
2892 #if DEBUG_CORE
2893           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2894                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2895                       "SET_KEY", sender_status);
2896 #endif
2897           send_key (n);
2898         }
2899       break;
2900     default:
2901       GNUNET_break (0);
2902       break;
2903     }
2904   if (n->pending_ping != NULL)
2905     {
2906       ping = n->pending_ping;
2907       n->pending_ping = NULL;
2908       handle_ping (n, ping);
2909       GNUNET_free (ping);
2910     }
2911   if (n->pending_pong != NULL)
2912     {
2913       pong = n->pending_pong;
2914       n->pending_pong = NULL;
2915       handle_pong (n, pong);
2916       GNUNET_free (pong);
2917     }
2918 }
2919
2920
2921 /**
2922  * Send a P2P message to a client.
2923  *
2924  * @param sender who sent us the message?
2925  * @param client who should we give the message to?
2926  * @param m contains the message to transmit
2927  * @param msize number of bytes in buf to transmit
2928  */
2929 static void
2930 send_p2p_message_to_client (struct Neighbour *sender,
2931                             struct Client *client,
2932                             const void *m, size_t msize)
2933 {
2934   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2935   struct NotifyTrafficMessage *ntm;
2936
2937 #if DEBUG_CORE
2938   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2939               "Core service passes message from `%4s' of type %u to client.\n",
2940               GNUNET_i2s(&sender->peer),
2941               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2942 #endif
2943   ntm = (struct NotifyTrafficMessage *) buf;
2944   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2945   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2946   ntm->distance = htonl (sender->last_distance);
2947   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2948   ntm->peer = sender->peer;
2949   memcpy (&ntm[1], m, msize);
2950   send_to_client (client, &ntm->header, GNUNET_YES);
2951 }
2952
2953
2954 /**
2955  * Deliver P2P message to interested clients.
2956  *
2957  * @param sender who sent us the message?
2958  * @param m the message
2959  * @param msize size of the message (including header)
2960  */
2961 static void
2962 deliver_message (struct Neighbour *sender,
2963                  const struct GNUNET_MessageHeader *m, size_t msize)
2964 {
2965   struct Client *cpos;
2966   uint16_t type;
2967   unsigned int tpos;
2968   int deliver_full;
2969   int dropped;
2970
2971   type = ntohs (m->type);
2972 #if DEBUG_CORE
2973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2974               "Received encapsulated message of type %u from `%4s'\n",
2975               type,
2976               GNUNET_i2s (&sender->peer));
2977 #endif
2978   dropped = GNUNET_YES;
2979   cpos = clients;
2980   while (cpos != NULL)
2981     {
2982       deliver_full = GNUNET_NO;
2983       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2984         deliver_full = GNUNET_YES;
2985       else
2986         {
2987           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2988             {
2989               if (type != cpos->types[tpos])
2990                 continue;
2991               deliver_full = GNUNET_YES;
2992               break;
2993             }
2994         }
2995       if (GNUNET_YES == deliver_full)
2996         {
2997           send_p2p_message_to_client (sender, cpos, m, msize);
2998           dropped = GNUNET_NO;
2999         }
3000       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
3001         {
3002           send_p2p_message_to_client (sender, cpos, m,
3003                                       sizeof (struct GNUNET_MessageHeader));
3004         }
3005       cpos = cpos->next;
3006     }
3007   if (dropped == GNUNET_YES)
3008     {
3009 #if DEBUG_CORE
3010       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3011                   "Message of type %u from `%4s' not delivered to any client.\n",
3012                   type,
3013                   GNUNET_i2s (&sender->peer));
3014 #endif
3015       /* FIXME: stats... */
3016     }
3017 }
3018
3019
3020 /**
3021  * Align P2P message and then deliver to interested clients.
3022  *
3023  * @param sender who sent us the message?
3024  * @param buffer unaligned (!) buffer containing message
3025  * @param msize size of the message (including header)
3026  */
3027 static void
3028 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
3029 {
3030   char abuf[msize];
3031
3032   /* TODO: call to statistics? */
3033   memcpy (abuf, buffer, msize);
3034   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
3035 }
3036
3037
3038 /**
3039  * Deliver P2P messages to interested clients.
3040  *
3041  * @param sender who sent us the message?
3042  * @param buffer buffer containing messages, can be modified
3043  * @param buffer_size size of the buffer (overall)
3044  * @param offset offset where messages in the buffer start
3045  */
3046 static void
3047 deliver_messages (struct Neighbour *sender,
3048                   const char *buffer, size_t buffer_size, size_t offset)
3049 {
3050   struct GNUNET_MessageHeader *mhp;
3051   struct GNUNET_MessageHeader mh;
3052   uint16_t msize;
3053   int need_align;
3054
3055   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
3056     {
3057       if (0 != offset % sizeof (uint16_t))
3058         {
3059           /* outch, need to copy to access header */
3060           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
3061           mhp = &mh;
3062         }
3063       else
3064         {
3065           /* can access header directly */
3066           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
3067         }
3068       msize = ntohs (mhp->size);
3069       if (msize + offset > buffer_size)
3070         {
3071           /* malformed message, header says it is larger than what
3072              would fit into the overall buffer */
3073           GNUNET_break_op (0);
3074           break;
3075         }
3076 #if HAVE_UNALIGNED_64_ACCESS
3077       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
3078 #else
3079       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
3080 #endif
3081       if (GNUNET_YES == need_align)
3082         align_and_deliver (sender, &buffer[offset], msize);
3083       else
3084         deliver_message (sender,
3085                          (const struct GNUNET_MessageHeader *)
3086                          &buffer[offset], msize);
3087       offset += msize;
3088     }
3089 }
3090
3091
3092 /**
3093  * We received an encrypted message.  Decrypt, validate and
3094  * pass on to the appropriate clients.
3095  */
3096 static void
3097 handle_encrypted_message (struct Neighbour *n,
3098                           const struct EncryptedMessage *m)
3099 {
3100   size_t size = ntohs (m->header.size);
3101   char buf[size];
3102   struct EncryptedMessage *pt;  /* plaintext */
3103   GNUNET_HashCode ph;
3104   size_t off;
3105   uint32_t snum;
3106   struct GNUNET_TIME_Absolute t;
3107   GNUNET_HashCode iv;
3108
3109 #if DEBUG_CORE
3110   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3111               "Core service receives `%s' request from `%4s'.\n",
3112               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
3113 #endif  
3114   GNUNET_CRYPTO_hash (&m->iv_seed, sizeof (uint32_t), &iv);
3115   /* decrypt */
3116   if (GNUNET_OK !=
3117       do_decrypt (n,
3118                   &iv,
3119                   &m->plaintext_hash,
3120                   &buf[ENCRYPTED_HEADER_SIZE], 
3121                   size - ENCRYPTED_HEADER_SIZE))
3122     return;
3123   pt = (struct EncryptedMessage *) buf;
3124
3125   /* validate hash */
3126   GNUNET_CRYPTO_hash (&pt->sequence_number,
3127                       size - ENCRYPTED_HEADER_SIZE - sizeof (GNUNET_HashCode), &ph);
3128   if (0 != memcmp (&ph, 
3129                    &pt->plaintext_hash, 
3130                    sizeof (GNUNET_HashCode)))
3131     {
3132       /* checksum failed */
3133       GNUNET_break_op (0);
3134       return;
3135     }
3136
3137   /* validate sequence number */
3138   snum = ntohl (pt->sequence_number);
3139   if (n->last_sequence_number_received == snum)
3140     {
3141       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3142                   "Received duplicate message, ignoring.\n");
3143       /* duplicate, ignore */
3144       return;
3145     }
3146   if ((n->last_sequence_number_received > snum) &&
3147       (n->last_sequence_number_received - snum > 32))
3148     {
3149       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3150                   "Received ancient out of sequence message, ignoring.\n");
3151       /* ancient out of sequence, ignore */
3152       return;
3153     }
3154   if (n->last_sequence_number_received > snum)
3155     {
3156       unsigned int rotbit =
3157         1 << (n->last_sequence_number_received - snum - 1);
3158       if ((n->last_packets_bitmap & rotbit) != 0)
3159         {
3160           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3161                       "Received duplicate message, ignoring.\n");
3162           /* duplicate, ignore */
3163           return;
3164         }
3165       n->last_packets_bitmap |= rotbit;
3166     }
3167   if (n->last_sequence_number_received < snum)
3168     {
3169       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
3170       n->last_sequence_number_received = snum;
3171     }
3172
3173   /* check timestamp */
3174   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
3175   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
3176     {
3177       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3178                   _
3179                   ("Message received far too old (%llu ms). Content ignored.\n"),
3180                   GNUNET_TIME_absolute_get_duration (t).value);
3181       return;
3182     }
3183
3184   /* process decrypted message(s) */
3185   if (n->bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
3186     {
3187 #if DEBUG_CORE_SET_QUOTA
3188       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3189                   "Received %u b/s as new inbound limit for peer `%4s'\n",
3190                   (unsigned int) ntohl (pt->inbound_bw_limit.value__),
3191                   GNUNET_i2s (&n->peer));
3192 #endif
3193       n->bw_out_external_limit = pt->inbound_bw_limit;
3194       n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
3195                                               n->bw_out_internal_limit);
3196       GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
3197                                              n->bw_out);
3198       GNUNET_TRANSPORT_set_quota (transport,
3199                                   &n->peer,
3200                                   n->bw_in,
3201                                   n->bw_out,
3202                                   GNUNET_TIME_UNIT_FOREVER_REL,
3203                                   NULL, NULL); 
3204     }
3205   n->last_activity = GNUNET_TIME_absolute_get ();
3206   if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
3207     GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
3208   n->keep_alive_task 
3209     = GNUNET_SCHEDULER_add_delayed (sched, 
3210                                     GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
3211                                     &send_keep_alive,
3212                                     n);
3213   off = sizeof (struct EncryptedMessage);
3214   deliver_messages (n, buf, size, off);
3215 }
3216
3217
3218 /**
3219  * Function called by the transport for each received message.
3220  *
3221  * @param cls closure
3222  * @param peer (claimed) identity of the other peer
3223  * @param message the message
3224  * @param latency estimated latency for communicating with the
3225  *             given peer (round-trip)
3226  * @param distance in overlay hops, as given by transport plugin
3227  */
3228 static void
3229 handle_transport_receive (void *cls,
3230                           const struct GNUNET_PeerIdentity *peer,
3231                           const struct GNUNET_MessageHeader *message,
3232                           struct GNUNET_TIME_Relative latency,
3233                           unsigned int distance)
3234 {
3235   struct Neighbour *n;
3236   struct GNUNET_TIME_Absolute now;
3237   int up;
3238   uint16_t type;
3239   uint16_t size;
3240
3241 #if DEBUG_CORE
3242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3243               "Received message of type %u from `%4s', demultiplexing.\n",
3244               ntohs (message->type), GNUNET_i2s (peer));
3245 #endif
3246   n = find_neighbour (peer);
3247   if (n == NULL)
3248     n = create_neighbour (peer);
3249   if (n == NULL)
3250     return;   
3251   n->last_latency = latency;
3252   n->last_distance = distance;
3253   up = (n->status == PEER_STATE_KEY_CONFIRMED);
3254   type = ntohs (message->type);
3255   size = ntohs (message->size);
3256 #if DEBUG_HANDSHAKE
3257   fprintf (stderr,
3258            "Received message of type %u from `%4s'\n",
3259            type,
3260            GNUNET_i2s (peer));
3261 #endif
3262   switch (type)
3263     {
3264     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
3265       if (size != sizeof (struct SetKeyMessage))
3266         {
3267           GNUNET_break_op (0);
3268           return;
3269         }
3270       handle_set_key (n, (const struct SetKeyMessage *) message);
3271       break;
3272     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
3273       if (size < sizeof (struct EncryptedMessage) +
3274           sizeof (struct GNUNET_MessageHeader))
3275         {
3276           GNUNET_break_op (0);
3277           return;
3278         }
3279       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3280           (n->status != PEER_STATE_KEY_CONFIRMED))
3281         {
3282           GNUNET_break_op (0);
3283           return;
3284         }
3285       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
3286       break;
3287     case GNUNET_MESSAGE_TYPE_CORE_PING:
3288       if (size != sizeof (struct PingMessage))
3289         {
3290           GNUNET_break_op (0);
3291           return;
3292         }
3293       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3294           (n->status != PEER_STATE_KEY_CONFIRMED))
3295         {
3296 #if DEBUG_CORE
3297           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3298                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3299                       "PING", GNUNET_i2s (&n->peer));
3300 #endif
3301           GNUNET_free_non_null (n->pending_ping);
3302           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
3303           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
3304           return;
3305         }
3306       handle_ping (n, (const struct PingMessage *) message);
3307       break;
3308     case GNUNET_MESSAGE_TYPE_CORE_PONG:
3309       if (size != sizeof (struct PongMessage))
3310         {
3311           GNUNET_break_op (0);
3312           return;
3313         }
3314       if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
3315            (n->status != PEER_STATE_KEY_CONFIRMED) )
3316         {
3317 #if DEBUG_CORE
3318           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3319                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3320                       "PONG", GNUNET_i2s (&n->peer));
3321 #endif
3322           GNUNET_free_non_null (n->pending_pong);
3323           n->pending_pong = GNUNET_malloc (sizeof (struct PongMessage));
3324           memcpy (n->pending_pong, message, sizeof (struct PongMessage));
3325           return;
3326         }
3327       handle_pong (n, (const struct PongMessage *) message);
3328       break;
3329     default:
3330       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3331                   _("Unsupported message of type %u received.\n"), type);
3332       return;
3333     }
3334   if (n->status == PEER_STATE_KEY_CONFIRMED)
3335     {
3336       now = GNUNET_TIME_absolute_get ();
3337       n->last_activity = now;
3338       if (!up)
3339         n->time_established = now;
3340       if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
3341         GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
3342       n->keep_alive_task 
3343         = GNUNET_SCHEDULER_add_delayed (sched, 
3344                                         GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
3345                                         &send_keep_alive,
3346                                         n);
3347     }
3348 }
3349
3350
3351 /**
3352  * Function that recalculates the bandwidth quota for the
3353  * given neighbour and transmits it to the transport service.
3354  * 
3355  * @param cls neighbour for the quota update
3356  * @param tc context
3357  */
3358 static void
3359 neighbour_quota_update (void *cls,
3360                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3361 {
3362   struct Neighbour *n = cls;
3363   struct GNUNET_BANDWIDTH_Value32NBO q_in;
3364   double pref_rel;
3365   double share;
3366   unsigned long long distributable;
3367   uint64_t need_per_peer;
3368   uint64_t need_per_second;
3369   
3370   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3371   /* calculate relative preference among all neighbours;
3372      divides by a bit more to avoid division by zero AND to
3373      account for possibility of new neighbours joining any time 
3374      AND to convert to double... */
3375   if (preference_sum == 0)
3376     {
3377       pref_rel = 1.0 / (double) neighbour_count;
3378     }
3379   else
3380     {
3381       pref_rel = n->current_preference / preference_sum;
3382     }
3383   need_per_peer = GNUNET_BANDWIDTH_value_get_available_until (MIN_BANDWIDTH_PER_PEER,
3384                                                               GNUNET_TIME_UNIT_SECONDS);  
3385   need_per_second = need_per_peer * neighbour_count;
3386   distributable = 0;
3387   if (bandwidth_target_out_bps > need_per_second)
3388     distributable = bandwidth_target_out_bps - need_per_second;
3389   share = distributable * pref_rel;
3390   if (share + need_per_peer > ( (uint32_t)-1))
3391     q_in = GNUNET_BANDWIDTH_value_init ((uint32_t) -1);
3392   else
3393     q_in = GNUNET_BANDWIDTH_value_init (need_per_peer + (uint32_t) share);
3394   /* check if we want to disconnect for good due to inactivity */
3395   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3396        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3397     {
3398 #if DEBUG_CORE
3399       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3400                   "Forcing disconnect of `%4s' due to inactivity (?).\n",
3401                   GNUNET_i2s (&n->peer));
3402 #endif
3403       q_in = GNUNET_BANDWIDTH_value_init (0); /* force disconnect */
3404     }
3405 #if DEBUG_CORE_QUOTA
3406   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3407               "Current quota for `%4s' is %u/%llu b/s in (old: %u b/s) / %u out (%u internal)\n",
3408               GNUNET_i2s (&n->peer),
3409               (unsigned int) ntohl (q_in.value__),
3410               bandwidth_target_out_bps,
3411               (unsigned int) ntohl (n->bw_in.value__),
3412               (unsigned int) ntohl (n->bw_out.value__),
3413               (unsigned int) ntohl (n->bw_out_internal_limit.value__));
3414 #endif
3415   if (n->bw_in.value__ != q_in.value__) 
3416     {
3417       n->bw_in = q_in;
3418       GNUNET_TRANSPORT_set_quota (transport,
3419                                   &n->peer,
3420                                   n->bw_in,
3421                                   n->bw_out,
3422                                   GNUNET_TIME_UNIT_FOREVER_REL,
3423                                   NULL, NULL);
3424     }
3425   schedule_quota_update (n);
3426 }
3427
3428
3429 /**
3430  * Function called by transport to notify us that
3431  * a peer connected to us (on the network level).
3432  *
3433  * @param cls closure
3434  * @param peer the peer that connected
3435  * @param latency current latency of the connection
3436  * @param distance in overlay hops, as given by transport plugin
3437  */
3438 static void
3439 handle_transport_notify_connect (void *cls,
3440                                  const struct GNUNET_PeerIdentity *peer,
3441                                  struct GNUNET_TIME_Relative latency,
3442                                  unsigned int distance)
3443 {
3444   struct Neighbour *n;
3445   struct ConnectNotifyMessage cnm;
3446
3447   n = find_neighbour (peer);
3448   if (n != NULL)
3449     {
3450       if (n->is_connected)
3451         {
3452           /* duplicate connect notification!? */
3453           GNUNET_break (0);
3454           return;
3455         }
3456     }
3457   else
3458     {
3459       n = create_neighbour (peer);
3460     }
3461   n->is_connected = GNUNET_YES;      
3462   n->last_latency = latency;
3463   n->last_distance = distance;
3464   GNUNET_BANDWIDTH_tracker_init (&n->available_send_window,
3465                                  n->bw_out,
3466                                  MAX_WINDOW_TIME_S);
3467   GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window,
3468                                  n->bw_in,
3469                                  MAX_WINDOW_TIME_S);  
3470 #if DEBUG_CORE
3471   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3472               "Received connection from `%4s'.\n",
3473               GNUNET_i2s (&n->peer));
3474 #endif
3475   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3476   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3477   cnm.distance = htonl (n->last_distance);
3478   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
3479   cnm.peer = *peer;
3480   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3481   GNUNET_TRANSPORT_set_quota (transport,
3482                               &n->peer,
3483                               n->bw_in,
3484                               n->bw_out,
3485                               GNUNET_TIME_UNIT_FOREVER_REL,
3486                               NULL, NULL);
3487   send_key (n); 
3488 }
3489
3490
3491 /**
3492  * Function called by transport telling us that a peer
3493  * disconnected.
3494  *
3495  * @param cls closure
3496  * @param peer the peer that disconnected
3497  */
3498 static void
3499 handle_transport_notify_disconnect (void *cls,
3500                                     const struct GNUNET_PeerIdentity *peer)
3501 {
3502   struct DisconnectNotifyMessage cnm;
3503   struct Neighbour *n;
3504
3505 #if DEBUG_CORE
3506   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3507               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3508 #endif
3509   n = find_neighbour (peer);
3510   if (n == NULL)
3511     {
3512       GNUNET_break (0);
3513       return;
3514     }
3515   GNUNET_break (n->is_connected);
3516   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3517   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3518   cnm.peer = *peer;
3519   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3520   n->is_connected = GNUNET_NO;
3521 }
3522
3523
3524 /**
3525  * Last task run during shutdown.  Disconnects us from
3526  * the transport.
3527  */
3528 static void
3529 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3530 {
3531   struct Neighbour *n;
3532   struct Client *c;
3533
3534 #if DEBUG_CORE
3535   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3536               "Core service shutting down.\n");
3537 #endif
3538   GNUNET_assert (transport != NULL);
3539   GNUNET_TRANSPORT_disconnect (transport);
3540   transport = NULL;
3541   while (NULL != (n = neighbours))
3542     {
3543       neighbours = n->next;
3544       GNUNET_assert (neighbour_count > 0);
3545       neighbour_count--;
3546       free_neighbour (n);
3547     }
3548   GNUNET_SERVER_notification_context_destroy (notifier);
3549   notifier = NULL;
3550   while (NULL != (c = clients))
3551     handle_client_disconnect (NULL, c->client_handle);
3552   if (my_private_key != NULL)
3553     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3554 }
3555
3556
3557 /**
3558  * Initiate core service.
3559  *
3560  * @param cls closure
3561  * @param s scheduler to use
3562  * @param serv the initialized server
3563  * @param c configuration to use
3564  */
3565 static void
3566 run (void *cls,
3567      struct GNUNET_SCHEDULER_Handle *s,
3568      struct GNUNET_SERVER_Handle *serv,
3569      const struct GNUNET_CONFIGURATION_Handle *c)
3570 {
3571   char *keyfile;
3572
3573   sched = s;
3574   cfg = c;  
3575   /* parse configuration */
3576   if (
3577        (GNUNET_OK !=
3578         GNUNET_CONFIGURATION_get_value_number (c,
3579                                                "CORE",
3580                                                "TOTAL_QUOTA_IN",
3581                                                &bandwidth_target_in_bps)) ||
3582        (GNUNET_OK !=
3583         GNUNET_CONFIGURATION_get_value_number (c,
3584                                                "CORE",
3585                                                "TOTAL_QUOTA_OUT",
3586                                                &bandwidth_target_out_bps)) ||
3587        (GNUNET_OK !=
3588         GNUNET_CONFIGURATION_get_value_filename (c,
3589                                                  "GNUNETD",
3590                                                  "HOSTKEY", &keyfile)))
3591     {
3592       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3593                   _
3594                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3595       GNUNET_SCHEDULER_shutdown (s);
3596       return;
3597     }
3598   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3599   GNUNET_free (keyfile);
3600   if (my_private_key == NULL)
3601     {
3602       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3603                   _("Core service could not access hostkey.  Exiting.\n"));
3604       GNUNET_SCHEDULER_shutdown (s);
3605       return;
3606     }
3607   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3608   GNUNET_CRYPTO_hash (&my_public_key,
3609                       sizeof (my_public_key), &my_identity.hashPubKey);
3610   /* setup notification */
3611   server = serv;
3612   notifier = GNUNET_SERVER_notification_context_create (server, 
3613                                                         MAX_NOTIFY_QUEUE);
3614   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3615   /* setup transport connection */
3616   transport = GNUNET_TRANSPORT_connect (sched,
3617                                         cfg,
3618                                         NULL,
3619                                         &handle_transport_receive,
3620                                         &handle_transport_notify_connect,
3621                                         &handle_transport_notify_disconnect);
3622   GNUNET_assert (NULL != transport);
3623   GNUNET_SCHEDULER_add_delayed (sched,
3624                                 GNUNET_TIME_UNIT_FOREVER_REL,
3625                                 &cleaning_task, NULL);
3626   /* process client requests */
3627   GNUNET_SERVER_add_handlers (server, handlers);
3628   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3629               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3630 }
3631
3632
3633
3634 /**
3635  * The main function for the transport service.
3636  *
3637  * @param argc number of arguments from the command line
3638  * @param argv command line arguments
3639  * @return 0 ok, 1 on error
3640  */
3641 int
3642 main (int argc, char *const *argv)
3643 {
3644   return (GNUNET_OK ==
3645           GNUNET_SERVICE_run (argc,
3646                               argv,
3647                               "core",
3648                               GNUNET_SERVICE_OPTION_NONE,
3649                               &run, NULL)) ? 0 : 1;
3650 }
3651
3652 /* end of gnunet-service-core.c */