fixing corking
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 #define DEBUG_HANDSHAKE GNUNET_NO
45
46 #define DEBUG_CORE_QUOTA GNUNET_YES
47
48 /**
49  * Receive and send buffer windows grow over time.  For
50  * how long can 'unused' bandwidth accumulate before we
51  * need to cap it?  (specified in seconds).
52  */
53 #define MAX_WINDOW_TIME_S (5 * 60)
54
55 /**
56  * How many messages do we queue up at most for optional
57  * notifications to a client?  (this can cause notifications
58  * about outgoing messages to be dropped).
59  */
60 #define MAX_NOTIFY_QUEUE 16
61
62 /**
63  * Minimum bandwidth (out) to assign to any connected peer.
64  * Should be rather low; values larger than DEFAULT_BW_IN_OUT make no
65  * sense.
66  */
67 #define MIN_BANDWIDTH_PER_PEER GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * How long do we delay messages to get larger packet sizes (CORKing)?
80  */
81 #define MAX_CORK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
82
83 /**
84  * What is the maximum delay for a SET_KEY message?
85  */
86 #define MAX_SET_KEY_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
87
88 /**
89  * What how long do we wait for SET_KEY confirmation initially?
90  */
91 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 3)
92
93 /**
94  * What is the maximum delay for a PING message?
95  */
96 #define MAX_PING_DELAY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 2)
97
98 /**
99  * What is the maximum delay for a PONG message?
100  */
101 #define MAX_PONG_DELAY GNUNET_TIME_relative_multiply (MAX_PING_DELAY, 2)
102
103 /**
104  * How often do we recalculate bandwidth quotas?
105  */
106 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
107
108 /**
109  * What is the priority for a SET_KEY message?
110  */
111 #define SET_KEY_PRIORITY 0xFFFFFF
112
113 /**
114  * What is the priority for a PING message?
115  */
116 #define PING_PRIORITY 0xFFFFFF
117
118 /**
119  * What is the priority for a PONG message?
120  */
121 #define PONG_PRIORITY 0xFFFFFF
122
123 /**
124  * How many messages do we queue per peer at most?  Must be at
125  * least two.
126  */
127 #define MAX_PEER_QUEUE_SIZE 16
128
129 /**
130  * How many non-mandatory messages do we queue per client at most?
131  */
132 #define MAX_CLIENT_QUEUE_SIZE 32
133
134 /**
135  * What is the maximum age of a message for us to consider
136  * processing it?  Note that this looks at the timestamp used
137  * by the other peer, so clock skew between machines does
138  * come into play here.  So this should be picked high enough
139  * so that a little bit of clock skew does not prevent peers
140  * from connecting to us.
141  */
142 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
143
144 /**
145  * What is the maximum size for encrypted messages?  Note that this
146  * number imposes a clear limit on the maximum size of any message.
147  * Set to a value close to 64k but not so close that transports will
148  * have trouble with their headers.
149  */
150 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
151
152
153 /**
154  * State machine for our P2P encryption handshake.  Everyone starts in
155  * "DOWN", if we receive the other peer's key (other peer initiated)
156  * we start in state RECEIVED (since we will immediately send our
157  * own); otherwise we start in SENT.  If we get back a PONG from
158  * within either state, we move up to CONFIRMED (the PONG will always
159  * be sent back encrypted with the key we sent to the other peer).
160  */
161 enum PeerStateMachine
162 {
163   PEER_STATE_DOWN,
164   PEER_STATE_KEY_SENT,
165   PEER_STATE_KEY_RECEIVED,
166   PEER_STATE_KEY_CONFIRMED
167 };
168
169
170 /**
171  * Number of bytes (at the beginning) of "struct EncryptedMessage"
172  * that are NOT encrypted.
173  */
174 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t))
175
176
177 /**
178  * Encapsulation for encrypted messages exchanged between
179  * peers.  Followed by the actual encrypted data.
180  */
181 struct EncryptedMessage
182 {
183   /**
184    * Message type is either CORE_ENCRYPTED_MESSAGE.
185    */
186   struct GNUNET_MessageHeader header;
187
188   /**
189    * Random value used for IV generation.  ENCRYPTED_HEADER_SIZE must
190    * be set to the offset of the *next* field.
191    */
192   uint32_t iv_seed GNUNET_PACKED;
193
194   /**
195    * Hash of the plaintext (starting at 'sequence_number'), used to
196    * verify message integrity.  Everything after this hash (including
197    * this hash itself) will be encrypted.  
198    */
199   GNUNET_HashCode plaintext_hash;
200
201   /**
202    * Sequence number, in network byte order.  This field
203    * must be the first encrypted/decrypted field and the
204    * first byte that is hashed for the plaintext hash.
205    */
206   uint32_t sequence_number GNUNET_PACKED;
207
208   /**
209    * Desired bandwidth (how much we should send to this peer / how
210    * much is the sender willing to receive)?
211    */
212   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
213
214   /**
215    * Timestamp.  Used to prevent reply of ancient messages
216    * (recent messages are caught with the sequence number).
217    */
218   struct GNUNET_TIME_AbsoluteNBO timestamp;
219
220 };
221
222
223 /**
224  * We're sending an (encrypted) PING to the other peer to check if he
225  * can decrypt.  The other peer should respond with a PONG with the
226  * same content, except this time encrypted with the receiver's key.
227  */
228 struct PingMessage
229 {
230   /**
231    * Message type is CORE_PING.
232    */
233   struct GNUNET_MessageHeader header;
234
235   /**
236    * Random number chosen to make reply harder.
237    */
238   uint32_t challenge GNUNET_PACKED;
239
240   /**
241    * Intended target of the PING, used primarily to check
242    * that decryption actually worked.
243    */
244   struct GNUNET_PeerIdentity target;
245 };
246
247
248
249 /**
250  * Response to a PING.  Includes data from the original PING
251  * plus initial bandwidth quota information.
252  */
253 struct PongMessage
254 {
255   /**
256    * Message type is CORE_PONG.
257    */
258   struct GNUNET_MessageHeader header;
259
260   /**
261    * Random number proochosen to make reply harder.  Must be
262    * first field after header (this is where we start to encrypt!).
263    */
264   uint32_t challenge GNUNET_PACKED;
265
266   /**
267    * Must be zero.
268    */
269   uint32_t reserved GNUNET_PACKED;
270
271   /**
272    * Desired bandwidth (how much we should send to this
273    * peer / how much is the sender willing to receive).
274    */
275   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
276
277   /**
278    * Intended target of the PING, used primarily to check
279    * that decryption actually worked.
280    */
281   struct GNUNET_PeerIdentity target;
282 };
283
284
285 /**
286  * Message transmitted to set (or update) a session key.
287  */
288 struct SetKeyMessage
289 {
290
291   /**
292    * Message type is either CORE_SET_KEY.
293    */
294   struct GNUNET_MessageHeader header;
295
296   /**
297    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
298    */
299   int32_t sender_status GNUNET_PACKED;
300
301   /**
302    * Purpose of the signature, will be
303    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
304    */
305   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
306
307   /**
308    * At what time was this key created?
309    */
310   struct GNUNET_TIME_AbsoluteNBO creation_time;
311
312   /**
313    * The encrypted session key.
314    */
315   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
316
317   /**
318    * Who is the intended recipient?
319    */
320   struct GNUNET_PeerIdentity target;
321
322   /**
323    * Signature of the stuff above (starting at purpose).
324    */
325   struct GNUNET_CRYPTO_RsaSignature signature;
326
327 };
328
329
330 /**
331  * Message waiting for transmission. This struct
332  * is followed by the actual content of the message.
333  */
334 struct MessageEntry
335 {
336
337   /**
338    * We keep messages in a doubly linked list.
339    */
340   struct MessageEntry *next;
341
342   /**
343    * We keep messages in a doubly linked list.
344    */
345   struct MessageEntry *prev;
346
347   /**
348    * By when are we supposed to transmit this message?
349    */
350   struct GNUNET_TIME_Absolute deadline;
351
352   /**
353    * By when are we supposed to transmit this message (after
354    * giving slack)?
355    */
356   struct GNUNET_TIME_Absolute slack_deadline;
357
358   /**
359    * How important is this message to us?
360    */
361   unsigned int priority;
362
363   /**
364    * How long is the message? (number of bytes following
365    * the "struct MessageEntry", but not including the
366    * size of "struct MessageEntry" itself!)
367    */
368   uint16_t size;
369
370   /**
371    * Was this message selected for transmission in the
372    * current round? GNUNET_YES or GNUNET_NO.
373    */
374   int8_t do_transmit;
375
376   /**
377    * Did we give this message some slack (delayed sending) previously
378    * (and hence should not give it any more slack)? GNUNET_YES or
379    * GNUNET_NO.
380    */
381   int8_t got_slack;
382
383 };
384
385
386 struct Neighbour
387 {
388   /**
389    * We keep neighbours in a linked list (for now).
390    */
391   struct Neighbour *next;
392
393   /**
394    * Unencrypted messages destined for this peer.
395    */
396   struct MessageEntry *messages;
397
398   /**
399    * Head of the batched, encrypted message queue (already ordered,
400    * transmit starting with the head).
401    */
402   struct MessageEntry *encrypted_head;
403
404   /**
405    * Tail of the batched, encrypted message queue (already ordered,
406    * append new messages to tail)
407    */
408   struct MessageEntry *encrypted_tail;
409
410   /**
411    * Handle for pending requests for transmission to this peer
412    * with the transport service.  NULL if no request is pending.
413    */
414   struct GNUNET_TRANSPORT_TransmitHandle *th;
415
416   /**
417    * Public key of the neighbour, NULL if we don't have it yet.
418    */
419   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
420
421   /**
422    * We received a PING message before we got the "public_key"
423    * (or the SET_KEY).  We keep it here until we have a key
424    * to decrypt it.  NULL if no PING is pending.
425    */
426   struct PingMessage *pending_ping;
427
428   /**
429    * We received a PONG message before we got the "public_key"
430    * (or the SET_KEY).  We keep it here until we have a key
431    * to decrypt it.  NULL if no PONG is pending.
432    */
433   struct PongMessage *pending_pong;
434
435   /**
436    * Non-NULL if we are currently looking up HELLOs for this peer.
437    * for this peer.
438    */
439   struct GNUNET_PEERINFO_IteratorContext *pitr;
440
441   /**
442    * SetKeyMessage to transmit, NULL if we are not currently trying
443    * to send one.
444    */
445   struct SetKeyMessage *skm;
446
447   /**
448    * Identity of the neighbour.
449    */
450   struct GNUNET_PeerIdentity peer;
451
452   /**
453    * Key we use to encrypt our messages for the other peer
454    * (initialized by us when we do the handshake).
455    */
456   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
457
458   /**
459    * Key we use to decrypt messages from the other peer
460    * (given to us by the other peer during the handshake).
461    */
462   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
463
464   /**
465    * ID of task used for re-trying plaintext scheduling.
466    */
467   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
468
469   /**
470    * ID of task used for re-trying SET_KEY and PING message.
471    */
472   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
473
474   /**
475    * ID of task used for updating bandwidth quota for this neighbour.
476    */
477   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
478
479   /**
480    * ID of task used for cleaning up dead neighbour entries.
481    */
482   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
483
484   /**
485    * At what time did we generate our encryption key?
486    */
487   struct GNUNET_TIME_Absolute encrypt_key_created;
488
489   /**
490    * At what time did the other peer generate the decryption key?
491    */
492   struct GNUNET_TIME_Absolute decrypt_key_created;
493
494   /**
495    * At what time did we initially establish (as in, complete session
496    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
497    */
498   struct GNUNET_TIME_Absolute time_established;
499
500   /**
501    * At what time did we last receive an encrypted message from the
502    * other peer?  Should be zero if status != KEY_CONFIRMED.
503    */
504   struct GNUNET_TIME_Absolute last_activity;
505
506   /**
507    * Last latency observed from this peer.
508    */
509   struct GNUNET_TIME_Relative last_latency;
510
511   /**
512    * At what frequency are we currently re-trying SET_KEY messages?
513    */
514   struct GNUNET_TIME_Relative set_key_retry_frequency;
515
516   /**
517    * Tracking bandwidth for sending to this peer.
518    */
519   struct GNUNET_BANDWIDTH_Tracker available_send_window;
520
521   /**
522    * Tracking bandwidth for receiving from this peer.
523    */
524   struct GNUNET_BANDWIDTH_Tracker available_recv_window;
525
526   /**
527    * How valueable were the messages of this peer recently?
528    */
529   unsigned long long current_preference;
530
531   /**
532    * Bit map indicating which of the 32 sequence numbers before the last
533    * were received (good for accepting out-of-order packets and
534    * estimating reliability of the connection)
535    */
536   unsigned int last_packets_bitmap;
537
538   /**
539    * last sequence number received on this connection (highest)
540    */
541   uint32_t last_sequence_number_received;
542
543   /**
544    * last sequence number transmitted
545    */
546   uint32_t last_sequence_number_sent;
547
548   /**
549    * Available bandwidth in for this peer (current target).
550    */
551   struct GNUNET_BANDWIDTH_Value32NBO bw_in;    
552
553   /**
554    * Available bandwidth out for this peer (current target).
555    */
556   struct GNUNET_BANDWIDTH_Value32NBO bw_out;  
557
558   /**
559    * Internal bandwidth limit set for this peer (initially typically
560    * set to "-1").  Actual "bw_out" is MIN of
561    * "bpm_out_internal_limit" and "bw_out_external_limit".
562    */
563   struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
564
565   /**
566    * External bandwidth limit set for this peer by the
567    * peer that we are communicating with.  "bw_out" is MIN of
568    * "bw_out_internal_limit" and "bw_out_external_limit".
569    */
570   struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
571
572   /**
573    * What was our PING challenge number (for this peer)?
574    */
575   uint32_t ping_challenge;
576
577   /**
578    * What was the last distance to this peer as reported by the transports?
579    */
580   uint32_t last_distance;
581
582   /**
583    * What is our connection status?
584    */
585   enum PeerStateMachine status;
586
587   /**
588    * Are we currently connected to this neighbour?
589    */ 
590   int is_connected;
591
592 };
593
594
595 /**
596  * Data structure for each client connected to the core service.
597  */
598 struct Client
599 {
600   /**
601    * Clients are kept in a linked list.
602    */
603   struct Client *next;
604
605   /**
606    * Handle for the client with the server API.
607    */
608   struct GNUNET_SERVER_Client *client_handle;
609
610   /**
611    * Array of the types of messages this peer cares
612    * about (with "tcnt" entries).  Allocated as part
613    * of this client struct, do not free!
614    */
615   const uint16_t *types;
616
617   /**
618    * Options for messages this client cares about,
619    * see GNUNET_CORE_OPTION_ values.
620    */
621   uint32_t options;
622
623   /**
624    * Number of types of incoming messages this client
625    * specifically cares about.  Size of the "types" array.
626    */
627   unsigned int tcnt;
628
629 };
630
631
632 /**
633  * Our public key.
634  */
635 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
636
637 /**
638  * Our identity.
639  */
640 static struct GNUNET_PeerIdentity my_identity;
641
642 /**
643  * Our private key.
644  */
645 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
646
647 /**
648  * Our scheduler.
649  */
650 struct GNUNET_SCHEDULER_Handle *sched;
651
652 /**
653  * Our configuration.
654  */
655 const struct GNUNET_CONFIGURATION_Handle *cfg;
656
657 /**
658  * Our server.
659  */
660 static struct GNUNET_SERVER_Handle *server;
661
662 /**
663  * Transport service.
664  */
665 static struct GNUNET_TRANSPORT_Handle *transport;
666
667 /**
668  * Linked list of our clients.
669  */
670 static struct Client *clients;
671
672 /**
673  * Context for notifications we need to send to our clients.
674  */
675 static struct GNUNET_SERVER_NotificationContext *notifier;
676
677 /**
678  * We keep neighbours in a linked list (for now).
679  */
680 static struct Neighbour *neighbours;
681
682 /**
683  * Sum of all preferences among all neighbours.
684  */
685 static unsigned long long preference_sum;
686
687 /**
688  * Total number of neighbours we have.
689  */
690 static unsigned int neighbour_count;
691
692 /**
693  * How much inbound bandwidth are we supposed to be using per second?
694  * FIXME: this value is not used!
695  */
696 static unsigned long long bandwidth_target_in_bps;
697
698 /**
699  * How much outbound bandwidth are we supposed to be using per second?
700  */
701 static unsigned long long bandwidth_target_out_bps;
702
703
704
705 /**
706  * A preference value for a neighbour was update.  Update
707  * the preference sum accordingly.
708  *
709  * @param inc how much was a preference value increased?
710  */
711 static void
712 update_preference_sum (unsigned long long inc)
713 {
714   struct Neighbour *n;
715   unsigned long long os;
716
717   os = preference_sum;
718   preference_sum += inc;
719   if (preference_sum >= os)
720     return; /* done! */
721   /* overflow! compensate by cutting all values in half! */
722   preference_sum = 0;
723   n = neighbours;
724   while (n != NULL)
725     {
726       n->current_preference /= 2;
727       preference_sum += n->current_preference;
728       n = n->next;
729     }    
730 }
731
732
733 /**
734  * Find the entry for the given neighbour.
735  *
736  * @param peer identity of the neighbour
737  * @return NULL if we are not connected, otherwise the
738  *         neighbour's entry.
739  */
740 static struct Neighbour *
741 find_neighbour (const struct GNUNET_PeerIdentity *peer)
742 {
743   struct Neighbour *ret;
744
745   ret = neighbours;
746   while ((ret != NULL) &&
747          (0 != memcmp (&ret->peer,
748                        peer, sizeof (struct GNUNET_PeerIdentity))))
749     ret = ret->next;
750   return ret;
751 }
752
753
754 /**
755  * Send a message to one of our clients.
756  *
757  * @param client target for the message
758  * @param msg message to transmit
759  * @param can_drop could this message be dropped if the
760  *        client's queue is getting too large?
761  */
762 static void
763 send_to_client (struct Client *client,
764                 const struct GNUNET_MessageHeader *msg, 
765                 int can_drop)
766 {
767 #if DEBUG_CORE_CLIENT
768   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
769               "Preparing to send message of type %u to client.\n",
770               ntohs (msg->type));
771 #endif  
772   GNUNET_SERVER_notification_context_unicast (notifier,
773                                               client->client_handle,
774                                               msg,
775                                               can_drop);
776 }
777
778
779 /**
780  * Send a message to all of our current clients that have
781  * the right options set.
782  * 
783  * @param msg message to multicast
784  * @param can_drop can this message be discarded if the queue is too long
785  * @param options mask to use 
786  */
787 static void
788 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
789                      int can_drop,
790                      int options)
791 {
792   struct Client *c;
793
794   c = clients;
795   while (c != NULL)
796     {
797       if (0 != (c->options & options))
798         send_to_client (c, msg, can_drop);
799       c = c->next;
800     }
801 }
802
803
804 /**
805  * Handle CORE_INIT request.
806  */
807 static void
808 handle_client_init (void *cls,
809                     struct GNUNET_SERVER_Client *client,
810                     const struct GNUNET_MessageHeader *message)
811 {
812   const struct InitMessage *im;
813   struct InitReplyMessage irm;
814   struct Client *c;
815   uint16_t msize;
816   const uint16_t *types;
817   uint16_t *wtypes;
818   struct Neighbour *n;
819   struct ConnectNotifyMessage cnm;
820   unsigned int i;
821
822 #if DEBUG_CORE_CLIENT
823   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
824               "Client connecting to core service with `%s' message\n",
825               "INIT");
826 #endif
827   /* check that we don't have an entry already */
828   c = clients;
829   while (c != NULL)
830     {
831       if (client == c->client_handle)
832         {
833           GNUNET_break (0);
834           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
835           return;
836         }
837       c = c->next;
838     }
839   msize = ntohs (message->size);
840   if (msize < sizeof (struct InitMessage))
841     {
842       GNUNET_break (0);
843       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
844       return;
845     }
846   GNUNET_SERVER_notification_context_add (notifier, client);
847   im = (const struct InitMessage *) message;
848   types = (const uint16_t *) &im[1];
849   msize -= sizeof (struct InitMessage);
850   c = GNUNET_malloc (sizeof (struct Client) + msize);
851   c->client_handle = client;
852   c->next = clients;
853   clients = c;
854   c->tcnt = msize / sizeof (uint16_t);
855   c->types = (const uint16_t *) &c[1];
856   wtypes = (uint16_t *) &c[1];
857   for (i=0;i<c->tcnt;i++)
858     wtypes[i] = ntohs (types[i]);
859   c->options = ntohl (im->options);
860 #if DEBUG_CORE_CLIENT
861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862               "Client %p is interested in %u message types\n",
863               c,
864               c->tcnt);
865 #endif
866   /* send init reply message */
867   irm.header.size = htons (sizeof (struct InitReplyMessage));
868   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
869   irm.reserved = htonl (0);
870   memcpy (&irm.publicKey,
871           &my_public_key,
872           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
873 #if DEBUG_CORE_CLIENT
874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
875               "Sending `%s' message to client.\n", "INIT_REPLY");
876 #endif
877   send_to_client (c, &irm.header, GNUNET_NO);
878   /* notify new client about existing neighbours */
879   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
880   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
881   n = neighbours;
882   while (n != NULL)
883     {
884       if (n->status == PEER_STATE_KEY_CONFIRMED)
885         {
886 #if DEBUG_CORE_CLIENT
887           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
888                       "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
889 #endif
890           cnm.distance = htonl (n->last_distance);
891           cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
892           cnm.peer = n->peer;
893           send_to_client (c, &cnm.header, GNUNET_NO);
894         }
895       n = n->next;
896     }
897   GNUNET_SERVER_receive_done (client, GNUNET_OK);
898 }
899
900
901 /**
902  * A client disconnected, clean up.
903  *
904  * @param cls closure
905  * @param client identification of the client
906  */
907 static void
908 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
909 {
910   struct Client *pos;
911   struct Client *prev;
912
913   if (client == NULL)
914     return;
915 #if DEBUG_CORE_CLIENT
916   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
917               "Client %p has disconnected from core service.\n",
918               client);
919 #endif
920   prev = NULL;
921   pos = clients;
922   while (pos != NULL)
923     {
924       if (client == pos->client_handle)
925         {
926           if (prev == NULL)
927             clients = pos->next;
928           else
929             prev->next = pos->next;
930           GNUNET_free (pos);
931           return;
932         }
933       prev = pos;
934       pos = pos->next;
935     }
936   /* client never sent INIT */
937 }
938
939
940 /**
941  * Handle REQUEST_INFO request.
942  */
943 static void
944 handle_client_request_info (void *cls,
945                             struct GNUNET_SERVER_Client *client,
946                             const struct GNUNET_MessageHeader *message)
947 {
948   const struct RequestInfoMessage *rcm;
949   struct Neighbour *n;
950   struct ConfigurationInfoMessage cim;
951   int32_t want_reserv;
952   int32_t got_reserv;
953   unsigned long long old_preference;
954   struct GNUNET_SERVER_TransmitContext *tc;
955
956 #if DEBUG_CORE_CLIENT
957   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958               "Core service receives `%s' request.\n", "REQUEST_INFO");
959 #endif
960   rcm = (const struct RequestInfoMessage *) message;
961   n = find_neighbour (&rcm->peer);
962   memset (&cim, 0, sizeof (cim));
963   if (n != NULL) 
964     {
965       want_reserv = ntohl (rcm->reserve_inbound);
966       n->bw_out_internal_limit = rcm->limit_outbound;
967       n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
968                                               n->bw_out_external_limit);
969       GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
970                                              n->bw_out);
971       if (want_reserv < 0)
972         {
973           got_reserv = want_reserv;
974         }
975       else if (want_reserv > 0)
976         {
977           if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
978                                                   want_reserv).value == 0)
979             got_reserv = want_reserv;
980           else
981             got_reserv = 0; /* all or nothing */
982         }
983       else
984         got_reserv = 0;
985       GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window,
986                                         got_reserv);
987       old_preference = n->current_preference;
988       n->current_preference += GNUNET_ntohll(rcm->preference_change);
989       if (old_preference > n->current_preference) 
990         {
991           /* overflow; cap at maximum value */
992           n->current_preference = (unsigned long long) -1;
993         }
994       update_preference_sum (n->current_preference - old_preference);
995 #if DEBUG_CORE_QUOTA
996       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997                   "Received reservation request for %d bytes for peer `%4s', reserved %d bytes\n",
998                   (int) want_reserv,
999                   GNUNET_i2s (&rcm->peer),
1000                   (int) got_reserv);
1001 #endif
1002       cim.reserved_amount = htonl (got_reserv);
1003       cim.bw_in = n->bw_in;
1004       cim.bw_out = n->bw_out;
1005       cim.preference = n->current_preference;
1006     }
1007   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1008   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1009   cim.peer = rcm->peer;
1010
1011 #if DEBUG_CORE_CLIENT
1012   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1013               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1014 #endif
1015   tc = GNUNET_SERVER_transmit_context_create (client);
1016   GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
1017   GNUNET_SERVER_transmit_context_run (tc,
1018                                       GNUNET_TIME_UNIT_FOREVER_REL);
1019 }
1020
1021
1022 /**
1023  * Free the given entry for the neighbour (it has
1024  * already been removed from the list at this point).
1025  *
1026  * @param n neighbour to free
1027  */
1028 static void
1029 free_neighbour (struct Neighbour *n)
1030 {
1031   struct MessageEntry *m;
1032
1033   if (n->pitr != NULL)
1034     {
1035       GNUNET_PEERINFO_iterate_cancel (n->pitr);
1036       n->pitr = NULL;
1037     }
1038   if (n->skm != NULL)
1039     {
1040       GNUNET_free (n->skm);
1041       n->skm = NULL;
1042     }
1043   while (NULL != (m = n->messages))
1044     {
1045       n->messages = m->next;
1046       GNUNET_free (m);
1047     }
1048   while (NULL != (m = n->encrypted_head))
1049     {
1050       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1051                                    n->encrypted_tail,
1052                                    m);
1053       GNUNET_free (m);
1054     }
1055   if (NULL != n->th)
1056     {
1057       GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
1058       n->th = NULL;
1059     }
1060   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1061     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1062   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
1063     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
1064   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
1065     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
1066   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1067     GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1068   GNUNET_free_non_null (n->public_key);
1069   GNUNET_free_non_null (n->pending_ping);
1070   GNUNET_free_non_null (n->pending_pong);
1071   GNUNET_free (n);
1072 }
1073
1074
1075 /**
1076  * Consider freeing the given neighbour since we may not need
1077  * to keep it around anymore.
1078  *
1079  * @param n neighbour to consider discarding
1080  */
1081 static void
1082 consider_free_neighbour (struct Neighbour *n);
1083
1084
1085 /**
1086  * Task triggered when a neighbour entry might have gotten stale.
1087  *
1088  * @param cls the 'struct Neighbour'
1089  * @param tc scheduler context (not used)
1090  */
1091 static void
1092 consider_free_task (void *cls,
1093                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1094 {
1095   struct Neighbour *n = cls;
1096   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
1097   consider_free_neighbour (n);
1098 }
1099
1100
1101 /**
1102  * Consider freeing the given neighbour since we may not need
1103  * to keep it around anymore.
1104  *
1105  * @param n neighbour to consider discarding
1106  */
1107 static void
1108 consider_free_neighbour (struct Neighbour *n)
1109
1110   struct Neighbour *pos;
1111   struct Neighbour *prev;
1112   struct GNUNET_TIME_Relative left;
1113
1114   if ( (n->th != NULL) ||
1115        (n->pitr != NULL) ||
1116        (n->status == PEER_STATE_KEY_CONFIRMED) ||
1117        (GNUNET_YES == n->is_connected) )
1118     return; /* no chance */
1119   
1120   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1121                                                                        MAX_PONG_DELAY));
1122   if (left.value > 0)
1123     {
1124       if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1125         GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1126       n->dead_clean_task = GNUNET_SCHEDULER_add_delayed (sched,
1127                                                          left,
1128                                                          &consider_free_task,
1129                                                          n);
1130       return;
1131     }
1132   /* actually free the neighbour... */
1133   prev = NULL;
1134   pos = neighbours;
1135   while (pos != n)
1136     {
1137       prev = pos;
1138       pos = pos->next;
1139     }
1140   if (prev == NULL)
1141     neighbours = n->next;
1142   else
1143     prev->next = n->next;
1144   GNUNET_assert (neighbour_count > 0);
1145   neighbour_count--;
1146   free_neighbour (n);
1147 }
1148
1149
1150 /**
1151  * Check if we have encrypted messages for the specified neighbour
1152  * pending, and if so, check with the transport about sending them
1153  * out.
1154  *
1155  * @param n neighbour to check.
1156  */
1157 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1158
1159
1160 /**
1161  * Function called when the transport service is ready to
1162  * receive an encrypted message for the respective peer
1163  *
1164  * @param cls neighbour to use message from
1165  * @param size number of bytes we can transmit
1166  * @param buf where to copy the message
1167  * @return number of bytes transmitted
1168  */
1169 static size_t
1170 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1171 {
1172   struct Neighbour *n = cls;
1173   struct MessageEntry *m;
1174   size_t ret;
1175   char *cbuf;
1176
1177   n->th = NULL;
1178   GNUNET_assert (NULL != (m = n->encrypted_head));
1179   GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1180                                n->encrypted_tail,
1181                                m);
1182   ret = 0;
1183   cbuf = buf;
1184   if (buf != NULL)
1185     {
1186       GNUNET_assert (size >= m->size);
1187       memcpy (cbuf, &m[1], m->size);
1188       ret = m->size;
1189       GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window,
1190                                         m->size);
1191 #if DEBUG_CORE
1192       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1193                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1194                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1195                   ret, GNUNET_i2s (&n->peer));
1196 #endif
1197       process_encrypted_neighbour_queue (n);
1198     }
1199   else
1200     {
1201 #if DEBUG_CORE
1202       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1203                   "Transmission of message of type %u and size %u failed\n",
1204                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1205                   m->size);
1206 #endif
1207     }
1208   GNUNET_free (m);
1209   consider_free_neighbour (n);
1210   return ret;
1211 }
1212
1213
1214 /**
1215  * Check if we have plaintext messages for the specified neighbour
1216  * pending, and if so, consider batching and encrypting them (and
1217  * then trigger processing of the encrypted queue if needed).
1218  *
1219  * @param n neighbour to check.
1220  */
1221 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1222
1223
1224 /**
1225  * Check if we have encrypted messages for the specified neighbour
1226  * pending, and if so, check with the transport about sending them
1227  * out.
1228  *
1229  * @param n neighbour to check.
1230  */
1231 static void
1232 process_encrypted_neighbour_queue (struct Neighbour *n)
1233 {
1234   struct MessageEntry *m;
1235  
1236   if (n->th != NULL)
1237     return;  /* request already pending */
1238   m = n->encrypted_head;
1239   if (m == NULL)
1240     {
1241       /* encrypted queue empty, try plaintext instead */
1242       process_plaintext_neighbour_queue (n);
1243       return;
1244     }
1245 #if DEBUG_CORE
1246   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1247               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1248               m->size,
1249               GNUNET_i2s (&n->peer),
1250               GNUNET_TIME_absolute_get_remaining (m->deadline).
1251               value);
1252 #endif
1253   n->th =
1254     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1255                                             m->size,
1256                                             m->priority,
1257                                             GNUNET_TIME_absolute_get_remaining
1258                                             (m->deadline),
1259                                             &notify_encrypted_transmit_ready,
1260                                             n);
1261   if (n->th == NULL)
1262     {
1263       /* message request too large or duplicate request */
1264       GNUNET_break (0);
1265       /* discard encrypted message */
1266       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1267                                    n->encrypted_tail,
1268                                    m);
1269       GNUNET_free (m);
1270       process_encrypted_neighbour_queue (n);
1271     }
1272 }
1273
1274
1275 /**
1276  * Decrypt size bytes from in and write the result to out.  Use the
1277  * key for inbound traffic of the given neighbour.  This function does
1278  * NOT do any integrity-checks on the result.
1279  *
1280  * @param n neighbour we are receiving from
1281  * @param iv initialization vector to use
1282  * @param in ciphertext
1283  * @param out plaintext
1284  * @param size size of in/out
1285  * @return GNUNET_OK on success
1286  */
1287 static int
1288 do_decrypt (struct Neighbour *n,
1289             const GNUNET_HashCode * iv,
1290             const void *in, void *out, size_t size)
1291 {
1292   if (size != (uint16_t) size)
1293     {
1294       GNUNET_break (0);
1295       return GNUNET_NO;
1296     }
1297   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1298       (n->status != PEER_STATE_KEY_CONFIRMED))
1299     {
1300       GNUNET_break_op (0);
1301       return GNUNET_SYSERR;
1302     }
1303   if (size !=
1304       GNUNET_CRYPTO_aes_decrypt (in,
1305                                  (uint16_t) size,
1306                                  &n->decrypt_key,
1307                                  (const struct
1308                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1309                                  out))
1310     {
1311       GNUNET_break (0);
1312       return GNUNET_SYSERR;
1313     }
1314 #if DEBUG_CORE
1315   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1316               "Decrypted %u bytes from `%4s' using key %u\n",
1317               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1318 #endif
1319   return GNUNET_OK;
1320 }
1321
1322
1323 /**
1324  * Encrypt size bytes from in and write the result to out.  Use the
1325  * key for outbound traffic of the given neighbour.
1326  *
1327  * @param n neighbour we are sending to
1328  * @param iv initialization vector to use
1329  * @param in ciphertext
1330  * @param out plaintext
1331  * @param size size of in/out
1332  * @return GNUNET_OK on success
1333  */
1334 static int
1335 do_encrypt (struct Neighbour *n,
1336             const GNUNET_HashCode * iv,
1337             const void *in, void *out, size_t size)
1338 {
1339   if (size != (uint16_t) size)
1340     {
1341       GNUNET_break (0);
1342       return GNUNET_NO;
1343     }
1344   GNUNET_assert (size ==
1345                  GNUNET_CRYPTO_aes_encrypt (in,
1346                                             (uint16_t) size,
1347                                             &n->encrypt_key,
1348                                             (const struct
1349                                              GNUNET_CRYPTO_AesInitializationVector
1350                                              *) iv, out));
1351 #if DEBUG_CORE
1352   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1353               "Encrypted %u bytes for `%4s' using key %u\n", size,
1354               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1355 #endif
1356   return GNUNET_OK;
1357 }
1358
1359
1360 /**
1361  * Select messages for transmission.  This heuristic uses a combination
1362  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1363  * and priority-based discard (in case no feasible schedule exist) and
1364  * speculative optimization (defer any kind of transmission until
1365  * we either create a batch of significant size, 25% of max, or until
1366  * we are close to a deadline).  Furthermore, when scheduling the
1367  * heuristic also packs as many messages into the batch as possible,
1368  * starting with those with the earliest deadline.  Yes, this is fun.
1369  *
1370  * @param n neighbour to select messages from
1371  * @param size number of bytes to select for transmission
1372  * @param retry_time set to the time when we should try again
1373  *        (only valid if this function returns zero)
1374  * @return number of bytes selected, or 0 if we decided to
1375  *         defer scheduling overall; in that case, retry_time is set.
1376  */
1377 static size_t
1378 select_messages (struct Neighbour *n,
1379                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1380 {
1381   struct MessageEntry *pos;
1382   struct MessageEntry *min;
1383   struct MessageEntry *last;
1384   unsigned int min_prio;
1385   struct GNUNET_TIME_Absolute t;
1386   struct GNUNET_TIME_Absolute now;
1387   struct GNUNET_TIME_Relative delta;
1388   uint64_t avail;
1389   struct GNUNET_TIME_Relative slack;     /* how long could we wait before missing deadlines? */
1390   size_t off;
1391   uint64_t tsize;
1392   unsigned int queue_size;
1393   int discard_low_prio;
1394
1395   GNUNET_assert (NULL != n->messages);
1396   now = GNUNET_TIME_absolute_get ();
1397   /* last entry in linked list of messages processed */
1398   last = NULL;
1399   /* should we remove the entry with the lowest
1400      priority from consideration for scheduling at the
1401      end of the loop? */
1402   queue_size = 0;
1403   tsize = 0;
1404   pos = n->messages;
1405   while (pos != NULL)
1406     {
1407       queue_size++;
1408       tsize += pos->size;
1409       pos = pos->next;
1410     }
1411   discard_low_prio = GNUNET_YES;
1412   while (GNUNET_YES == discard_low_prio)
1413     {
1414       min = NULL;
1415       min_prio = -1;
1416       discard_low_prio = GNUNET_NO;
1417       /* calculate number of bytes available for transmission at time "t" */
1418       avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
1419       t = now;
1420       /* how many bytes have we (hypothetically) scheduled so far */
1421       off = 0;
1422       /* maximum time we can wait before transmitting anything
1423          and still make all of our deadlines */
1424       slack = MAX_CORK_DELAY;
1425       pos = n->messages;
1426       /* note that we use "*2" here because we want to look
1427          a bit further into the future; much more makes no
1428          sense since new message might be scheduled in the
1429          meantime... */
1430       while ((pos != NULL) && (off < size * 2))
1431         {         
1432           if (pos->do_transmit == GNUNET_YES)
1433             {
1434               /* already removed from consideration */
1435               pos = pos->next;
1436               continue;
1437             }
1438           if (discard_low_prio == GNUNET_NO)
1439             {
1440               delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
1441               if (delta.value > 0)
1442                 {
1443                   // FIXME: HUH? Check!
1444                   t = pos->deadline;
1445                   avail += GNUNET_BANDWIDTH_value_get_available_until (n->bw_out,
1446                                                                        delta);
1447                 }
1448               if (avail < pos->size)
1449                 {
1450                   // FIXME: HUH? Check!
1451                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1452                 }
1453               else
1454                 {
1455                   avail -= pos->size;
1456                   /* update slack, considering both its absolute deadline
1457                      and relative deadlines caused by other messages
1458                      with their respective load */
1459                   slack = GNUNET_TIME_relative_min (slack,
1460                                                     GNUNET_BANDWIDTH_value_get_delay_for (n->bw_out,
1461                                                                                           avail));
1462                   if (pos->deadline.value <= now.value) 
1463                     {
1464                       /* now or never */
1465                       slack = GNUNET_TIME_UNIT_ZERO;
1466                     }
1467                   else if (GNUNET_YES == pos->got_slack)
1468                     {
1469                       /* should be soon now! */
1470                       slack = GNUNET_TIME_relative_min (slack,
1471                                                         GNUNET_TIME_absolute_get_remaining (pos->slack_deadline));
1472                     }
1473                   else
1474                     {
1475                       slack =
1476                         GNUNET_TIME_relative_min (slack, 
1477                                                   GNUNET_TIME_absolute_get_difference (now, pos->deadline));
1478                       pos->got_slack = GNUNET_YES;
1479                       pos->slack_deadline = GNUNET_TIME_absolute_min (pos->deadline,
1480                                                                       GNUNET_TIME_relative_to_absolute (MAX_CORK_DELAY));
1481                     }
1482                 }
1483             }
1484           off += pos->size;
1485           t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check!
1486           if (pos->priority <= min_prio)
1487             {
1488               /* update min for discard */
1489               min_prio = pos->priority;
1490               min = pos;
1491             }
1492           pos = pos->next;
1493         }
1494       if (discard_low_prio)
1495         {
1496           GNUNET_assert (min != NULL);
1497           /* remove lowest-priority entry from consideration */
1498           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1499         }
1500       last = pos;
1501     }
1502   /* guard against sending "tiny" messages with large headers without
1503      urgent deadlines */
1504   if ( (slack.value > 0) && 
1505        (size > 4 * off) &&
1506        (queue_size <= MAX_PEER_QUEUE_SIZE - 2) )
1507     {
1508       /* less than 25% of message would be filled with deadlines still
1509          being met if we delay by one second or more; so just wait for
1510          more data; but do not wait longer than 1s (since we don't want
1511          to delay messages for a really long time either). */
1512       *retry_time = MAX_CORK_DELAY;
1513       /* reset do_transmit values for next time */
1514       while (pos != last)
1515         {
1516           pos->do_transmit = GNUNET_NO;   
1517           pos = pos->next;
1518         }
1519 #if DEBUG_CORE
1520       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1521                   "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
1522                   (unsigned long long) slack.value,
1523                   (unsigned int) off,
1524                   (unsigned int) size);
1525 #endif
1526       return 0;
1527     }
1528   /* select marked messages (up to size) for transmission */
1529   off = 0;
1530   pos = n->messages;
1531   while (pos != last)
1532     {
1533       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1534         {
1535           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1536           off += pos->size;
1537           size -= pos->size;
1538         }
1539       else
1540         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1541       pos = pos->next;
1542     }
1543 #if DEBUG_CORE
1544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1545               "Selected %u/%u bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
1546               off, tsize,
1547               queue_size, MAX_PEER_QUEUE_SIZE,
1548               GNUNET_i2s (&n->peer));
1549 #endif
1550   return off;
1551 }
1552
1553
1554 /**
1555  * Batch multiple messages into a larger buffer.
1556  *
1557  * @param n neighbour to take messages from
1558  * @param buf target buffer
1559  * @param size size of buf
1560  * @param deadline set to transmission deadline for the result
1561  * @param retry_time set to the time when we should try again
1562  *        (only valid if this function returns zero)
1563  * @param priority set to the priority of the batch
1564  * @return number of bytes written to buf (can be zero)
1565  */
1566 static size_t
1567 batch_message (struct Neighbour *n,
1568                char *buf,
1569                size_t size,
1570                struct GNUNET_TIME_Absolute *deadline,
1571                struct GNUNET_TIME_Relative *retry_time,
1572                unsigned int *priority)
1573 {
1574   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1575   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
1576   struct MessageEntry *pos;
1577   struct MessageEntry *prev;
1578   struct MessageEntry *next;
1579   size_t ret;
1580   
1581   ret = 0;
1582   *priority = 0;
1583   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1584   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1585   if (0 == select_messages (n, size, retry_time))
1586     {
1587       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1588                   "No messages selected, will try again in %llu ms\n",
1589                   retry_time->value);
1590       return 0;
1591     }
1592   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
1593   ntm->distance = htonl (n->last_distance);
1594   ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
1595   ntm->peer = n->peer;
1596   
1597   pos = n->messages;
1598   prev = NULL;
1599   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1600     {
1601       next = pos->next;
1602       if (GNUNET_YES == pos->do_transmit)
1603         {
1604           GNUNET_assert (pos->size <= size);
1605           /* do notifications */
1606           /* FIXME: track if we have *any* client that wants
1607              full notifications and only do this if that is
1608              actually true */
1609           if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
1610             {
1611               memcpy (&ntm[1], &pos[1], pos->size);
1612               ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1613                                         sizeof (struct GNUNET_MessageHeader));
1614               send_to_all_clients (&ntm->header,
1615                                    GNUNET_YES,
1616                                    GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
1617             }
1618           else
1619             {
1620               /* message too large for 'full' notifications, we do at
1621                  least the 'hdr' type */
1622               memcpy (&ntm[1],
1623                       &pos[1],
1624                       sizeof (struct GNUNET_MessageHeader));
1625             }
1626           ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1627                                     pos->size);
1628           send_to_all_clients (&ntm->header,
1629                                GNUNET_YES,
1630                                GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);   
1631 #if DEBUG_HANDSHAKE
1632           fprintf (stderr,
1633                    "Encrypting message of type %u\n",
1634                    ntohs(((struct GNUNET_MessageHeader*)&pos[1])->type));
1635 #endif
1636           /* copy for encrypted transmission */
1637           memcpy (&buf[ret], &pos[1], pos->size);
1638           ret += pos->size;
1639           size -= pos->size;
1640           *priority += pos->priority;
1641 #if DEBUG_CORE
1642           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1643                       "Adding plaintext message with deadline %llu ms to batch\n",
1644                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1645 #endif
1646           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1647           GNUNET_free (pos);
1648           if (prev == NULL)
1649             n->messages = next;
1650           else
1651             prev->next = next;
1652         }
1653       else
1654         {
1655           prev = pos;
1656         }
1657       pos = next;
1658     }
1659 #if DEBUG_CORE
1660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1661               "Deadline for message batch is %llu ms\n",
1662               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1663 #endif
1664   return ret;
1665 }
1666
1667
1668 /**
1669  * Remove messages with deadlines that have long expired from
1670  * the queue.
1671  *
1672  * @param n neighbour to inspect
1673  */
1674 static void
1675 discard_expired_messages (struct Neighbour *n)
1676 {
1677   struct MessageEntry *prev;
1678   struct MessageEntry *next;
1679   struct MessageEntry *pos;
1680   struct GNUNET_TIME_Absolute now;
1681   struct GNUNET_TIME_Relative delta;
1682
1683   now = GNUNET_TIME_absolute_get ();
1684   prev = NULL;
1685   pos = n->messages;
1686   while (pos != NULL) 
1687     {
1688       next = pos->next;
1689       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1690       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1691         {
1692 #if DEBUG_CORE
1693           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1694                       "Message is %llu ms past due, discarding.\n",
1695                       delta.value);
1696 #endif
1697           if (prev == NULL)
1698             n->messages = next;
1699           else
1700             prev->next = next;
1701           GNUNET_free (pos);
1702         }
1703       else
1704         prev = pos;
1705       pos = next;
1706     }
1707 }
1708
1709
1710 /**
1711  * Signature of the main function of a task.
1712  *
1713  * @param cls closure
1714  * @param tc context information (why was this task triggered now)
1715  */
1716 static void
1717 retry_plaintext_processing (void *cls,
1718                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1719 {
1720   struct Neighbour *n = cls;
1721
1722   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1723   process_plaintext_neighbour_queue (n);
1724 }
1725
1726
1727 /**
1728  * Send our key (and encrypted PING) to the other peer.
1729  *
1730  * @param n the other peer
1731  */
1732 static void send_key (struct Neighbour *n);
1733
1734 /**
1735  * Task that will retry "send_key" if our previous attempt failed
1736  * to yield a PONG.
1737  */
1738 static void
1739 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1740 {
1741   struct Neighbour *n = cls;
1742
1743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1744               "Retrying key transmission to `%4s'\n",
1745               GNUNET_i2s (&n->peer));
1746   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
1747   n->set_key_retry_frequency =
1748     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
1749   send_key (n);
1750 }
1751
1752
1753 /**
1754  * Check if we have plaintext messages for the specified neighbour
1755  * pending, and if so, consider batching and encrypting them (and
1756  * then trigger processing of the encrypted queue if needed).
1757  *
1758  * @param n neighbour to check.
1759  */
1760 static void
1761 process_plaintext_neighbour_queue (struct Neighbour *n)
1762 {
1763   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1764   size_t used;
1765   size_t esize;
1766   struct EncryptedMessage *em;  /* encrypted message */
1767   struct EncryptedMessage *ph;  /* plaintext header */
1768   struct MessageEntry *me;
1769   unsigned int priority;
1770   struct GNUNET_TIME_Absolute deadline;
1771   struct GNUNET_TIME_Relative retry_time;
1772   GNUNET_HashCode iv;
1773
1774   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1775     {
1776       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1777       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1778     }
1779   switch (n->status)
1780     {
1781     case PEER_STATE_DOWN:
1782       send_key (n);
1783 #if DEBUG_CORE
1784       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1785                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1786                   GNUNET_i2s(&n->peer));
1787 #endif
1788       return;
1789     case PEER_STATE_KEY_SENT:
1790       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
1791         n->retry_set_key_task
1792           = GNUNET_SCHEDULER_add_delayed (sched,
1793                                           n->set_key_retry_frequency,
1794                                           &set_key_retry_task, n);    
1795 #if DEBUG_CORE
1796       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1797                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1798                   GNUNET_i2s(&n->peer));
1799 #endif
1800       return;
1801     case PEER_STATE_KEY_RECEIVED:
1802       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)        
1803         n->retry_set_key_task
1804           = GNUNET_SCHEDULER_add_delayed (sched,
1805                                           n->set_key_retry_frequency,
1806                                           &set_key_retry_task, n);        
1807 #if DEBUG_CORE
1808       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1809                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1810                   GNUNET_i2s(&n->peer));
1811 #endif
1812       return;
1813     case PEER_STATE_KEY_CONFIRMED:
1814       /* ready to continue */
1815       break;
1816     }
1817   discard_expired_messages (n);
1818   if (n->messages == NULL)
1819     {
1820 #if DEBUG_CORE
1821       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1822                   "Plaintext message queue for `%4s' is empty.\n",
1823                   GNUNET_i2s(&n->peer));
1824 #endif
1825       return;                   /* no pending messages */
1826     }
1827   if (n->encrypted_head != NULL)
1828     {
1829 #if DEBUG_CORE
1830       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1831                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1832                   GNUNET_i2s(&n->peer));
1833 #endif
1834       return;                   /* wait for messages already encrypted to be
1835                                    processed first! */
1836     }
1837   ph = (struct EncryptedMessage *) pbuf;
1838   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1839   priority = 0;
1840   used = sizeof (struct EncryptedMessage);
1841   used += batch_message (n,
1842                          &pbuf[used],
1843                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1844                          &deadline, &retry_time, &priority);
1845   if (used == sizeof (struct EncryptedMessage))
1846     {
1847 #if DEBUG_CORE
1848       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1849                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1850                   GNUNET_i2s(&n->peer));
1851 #endif
1852       /* no messages selected for sending, try again later... */
1853       n->retry_plaintext_task =
1854         GNUNET_SCHEDULER_add_delayed (sched,
1855                                       retry_time,
1856                                       &retry_plaintext_processing, n);
1857       return;
1858     }
1859 #if DEBUG_CORE_QUOTA
1860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1861               "Sending %u b/s as new limit to peer `%4s'\n",
1862               (unsigned int) ntohl (n->bw_in.value__),
1863               GNUNET_i2s (&n->peer));
1864 #endif
1865   ph->iv_seed = htonl (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1));
1866   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1867   ph->inbound_bw_limit = n->bw_in;
1868   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1869
1870   /* setup encryption message header */
1871   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1872   me->deadline = deadline;
1873   me->priority = priority;
1874   me->size = used;
1875   em = (struct EncryptedMessage *) &me[1];
1876   em->header.size = htons (used);
1877   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1878   em->iv_seed = ph->iv_seed;
1879   esize = used - ENCRYPTED_HEADER_SIZE;
1880   GNUNET_CRYPTO_hash (&ph->sequence_number,
1881                       esize - sizeof (GNUNET_HashCode), 
1882                       &ph->plaintext_hash);
1883   GNUNET_CRYPTO_hash (&ph->iv_seed, sizeof (uint32_t), &iv);
1884   /* encrypt */
1885 #if DEBUG_CORE
1886   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1887               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1888               esize,
1889               GNUNET_i2s(&n->peer),
1890               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1891 #endif
1892   GNUNET_assert (GNUNET_OK ==
1893                  do_encrypt (n,
1894                              &iv,
1895                              &ph->plaintext_hash,
1896                              &em->plaintext_hash, esize));
1897   /* append to transmission list */
1898   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
1899                                      n->encrypted_tail,
1900                                      n->encrypted_tail,
1901                                      me);
1902   process_encrypted_neighbour_queue (n);
1903 }
1904
1905
1906 /**
1907  * Function that recalculates the bandwidth quota for the
1908  * given neighbour and transmits it to the transport service.
1909  * 
1910  * @param cls neighbour for the quota update
1911  * @param tc context
1912  */
1913 static void
1914 neighbour_quota_update (void *cls,
1915                         const struct GNUNET_SCHEDULER_TaskContext *tc);
1916
1917
1918 /**
1919  * Schedule the task that will recalculate the bandwidth
1920  * quota for this peer (and possibly force a disconnect of
1921  * idle peers by calculating a bandwidth of zero).
1922  */
1923 static void
1924 schedule_quota_update (struct Neighbour *n)
1925 {
1926   GNUNET_assert (n->quota_update_task ==
1927                  GNUNET_SCHEDULER_NO_TASK);
1928   n->quota_update_task
1929     = GNUNET_SCHEDULER_add_delayed (sched,
1930                                     QUOTA_UPDATE_FREQUENCY,
1931                                     &neighbour_quota_update,
1932                                     n);
1933 }
1934
1935
1936 /**
1937  * Initialize a new 'struct Neighbour'.
1938  *
1939  * @param pid ID of the new neighbour
1940  * @return handle for the new neighbour
1941  */
1942 static struct Neighbour *
1943 create_neighbour (const struct GNUNET_PeerIdentity *pid)
1944 {
1945   struct Neighbour *n;
1946   struct GNUNET_TIME_Absolute now;
1947
1948   n = GNUNET_malloc (sizeof (struct Neighbour));
1949   n->next = neighbours;
1950   neighbours = n;
1951   neighbour_count++;
1952   n->peer = *pid;
1953   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
1954   now = GNUNET_TIME_absolute_get ();
1955   n->encrypt_key_created = now;
1956   n->last_activity = now;
1957   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
1958   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1959   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1960   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init ((uint32_t) - 1);
1961   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1962   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1963                                                 (uint32_t) - 1);
1964   neighbour_quota_update (n, NULL);
1965   return n;
1966 }
1967
1968
1969 /**
1970  * Handle CORE_SEND request.
1971  *
1972  * @param cls unused
1973  * @param client the client issuing the request
1974  * @param message the "struct SendMessage"
1975  */
1976 static void
1977 handle_client_send (void *cls,
1978                     struct GNUNET_SERVER_Client *client,
1979                     const struct GNUNET_MessageHeader *message)
1980 {
1981   const struct SendMessage *sm;
1982   struct Neighbour *n;
1983   struct MessageEntry *prev;
1984   struct MessageEntry *pos;
1985   struct MessageEntry *e; 
1986   struct MessageEntry *min_prio_entry;
1987   struct MessageEntry *min_prio_prev;
1988   unsigned int min_prio;
1989   unsigned int queue_size;
1990   uint16_t msize;
1991
1992   msize = ntohs (message->size);
1993   if (msize <
1994       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1995     {
1996       GNUNET_break (0);
1997       if (client != NULL)
1998         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1999       return;
2000     }
2001   sm = (const struct SendMessage *) message;
2002   msize -= sizeof (struct SendMessage);
2003   n = find_neighbour (&sm->peer);
2004   if (n == NULL)
2005     n = create_neighbour (&sm->peer);
2006 #if DEBUG_CORE
2007   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2008               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
2009               "SEND",
2010               msize, 
2011               GNUNET_i2s (&sm->peer));
2012 #endif
2013   /* bound queue size */
2014   discard_expired_messages (n);
2015   min_prio = (unsigned int) -1;
2016   min_prio_entry = NULL;
2017   min_prio_prev = NULL;
2018   queue_size = 0;
2019   prev = NULL;
2020   pos = n->messages;
2021   while (pos != NULL) 
2022     {
2023       if (pos->priority < min_prio)
2024         {
2025           min_prio_entry = pos;
2026           min_prio_prev = prev;
2027           min_prio = pos->priority;
2028         }
2029       queue_size++;
2030       prev = pos;
2031       pos = pos->next;
2032     }
2033   if (queue_size >= MAX_PEER_QUEUE_SIZE)
2034     {
2035       /* queue full */
2036       if (ntohl(sm->priority) <= min_prio)
2037         {
2038           /* discard new entry */
2039 #if DEBUG_CORE
2040           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2041                       "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
2042                       queue_size,
2043                       MAX_PEER_QUEUE_SIZE,
2044                       msize,
2045                       ntohs (message->type));
2046 #endif
2047           if (client != NULL)
2048             GNUNET_SERVER_receive_done (client, GNUNET_OK);
2049           return;
2050         }
2051       /* discard "min_prio_entry" */
2052 #if DEBUG_CORE
2053       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2054                   "Queue full, discarding existing older request\n");
2055 #endif
2056       if (min_prio_prev == NULL)
2057         n->messages = min_prio_entry->next;
2058       else
2059         min_prio_prev->next = min_prio_entry->next;      
2060       GNUNET_free (min_prio_entry);     
2061     }
2062
2063 #if DEBUG_CORE
2064   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2065               "Adding transmission request for `%4s' of size %u to queue\n",
2066               GNUNET_i2s (&sm->peer),
2067               msize);
2068 #endif  
2069   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
2070   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
2071   e->priority = ntohl (sm->priority);
2072   e->size = msize;
2073   memcpy (&e[1], &sm[1], msize);
2074
2075   /* insert, keep list sorted by deadline */
2076   prev = NULL;
2077   pos = n->messages;
2078   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
2079     {
2080       prev = pos;
2081       pos = pos->next;
2082     }
2083   if (prev == NULL)
2084     n->messages = e;
2085   else
2086     prev->next = e;
2087   e->next = pos;
2088
2089   /* consider scheduling now */
2090   process_plaintext_neighbour_queue (n);
2091   if (client != NULL)
2092     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2093 }
2094
2095
2096 /**
2097  * Function called when the transport service is ready to
2098  * receive a message.  Only resets 'n->th' to NULL.
2099  *
2100  * @param cls neighbour to use message from
2101  * @param size number of bytes we can transmit
2102  * @param buf where to copy the message
2103  * @return number of bytes transmitted
2104  */
2105 static size_t
2106 notify_transport_connect_done (void *cls, size_t size, void *buf)
2107 {
2108   struct Neighbour *n = cls;
2109
2110   n->th = NULL;
2111   if (buf == NULL)
2112     {
2113       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2114                   _("Failed to connect to `%4s': transport failed to connect\n"),
2115                   GNUNET_i2s (&n->peer));
2116       return 0;
2117     }
2118   send_key (n);
2119   return 0;
2120 }
2121
2122
2123 /**
2124  * Handle CORE_REQUEST_CONNECT request.
2125  *
2126  * @param cls unused
2127  * @param client the client issuing the request
2128  * @param message the "struct ConnectMessage"
2129  */
2130 static void
2131 handle_client_request_connect (void *cls,
2132                                struct GNUNET_SERVER_Client *client,
2133                                const struct GNUNET_MessageHeader *message)
2134 {
2135   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2136   struct Neighbour *n;
2137   struct GNUNET_TIME_Relative timeout;
2138
2139   if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2140     {
2141       GNUNET_break (0);
2142       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2143       return;
2144     }
2145   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2146   n = find_neighbour (&cm->peer);
2147   if (n == NULL)
2148     n = create_neighbour (&cm->peer);
2149   if ( (n->is_connected) ||
2150        (n->th != NULL) )
2151     return; /* already connected, or at least trying */
2152 #if DEBUG_CORE
2153   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2154               "Core received `%s' request for `%4s', will try to establish connection\n",
2155               "REQUEST_CONNECT",
2156               GNUNET_i2s (&cm->peer));
2157 #endif
2158   timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
2159   /* ask transport to connect to the peer */
2160   n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
2161                                                   &cm->peer,
2162                                                   sizeof (struct GNUNET_MessageHeader), 0,
2163                                                   timeout,
2164                                                   &notify_transport_connect_done,
2165                                                   n);
2166   GNUNET_break (NULL != n->th);
2167 }
2168
2169
2170 /**
2171  * List of handlers for the messages understood by this
2172  * service.
2173  */
2174 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2175   {&handle_client_init, NULL,
2176    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2177   {&handle_client_request_info, NULL,
2178    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2179    sizeof (struct RequestInfoMessage)},
2180   {&handle_client_send, NULL,
2181    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2182   {&handle_client_request_connect, NULL,
2183    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2184    sizeof (struct ConnectMessage)},
2185   {NULL, NULL, 0, 0}
2186 };
2187
2188
2189 /**
2190  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2191  * the neighbour's struct and retry send_key.  Or, if we did not get a
2192  * HELLO, just do nothing.
2193  *
2194  * @param cls the 'struct Neighbour' to retry sending the key for
2195  * @param peer the peer for which this is the HELLO
2196  * @param hello HELLO message of that peer
2197  * @param trust amount of trust we currently have in that peer
2198  */
2199 static void
2200 process_hello_retry_send_key (void *cls,
2201                               const struct GNUNET_PeerIdentity *peer,
2202                               const struct GNUNET_HELLO_Message *hello,
2203                               uint32_t trust)
2204 {
2205   struct Neighbour *n = cls;
2206
2207   if (peer == NULL)
2208     {
2209 #if DEBUG_CORE
2210       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2211                   "Entered `process_hello_retry_send_key' and `peer' is NULL!\n");
2212 #endif
2213       n->pitr = NULL;
2214       if (n->public_key != NULL)
2215         {
2216           send_key (n);
2217         }
2218       else
2219         {
2220           if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
2221             n->retry_set_key_task
2222               = GNUNET_SCHEDULER_add_delayed (sched,
2223                                               n->set_key_retry_frequency,
2224                                               &set_key_retry_task, n);
2225         }
2226       return;
2227     }
2228
2229 #if DEBUG_CORE
2230   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2231               "Entered `process_hello_retry_send_key' for peer `%4s'\n",
2232               GNUNET_i2s (peer));
2233 #endif
2234   if (n->public_key != NULL)
2235     {
2236 #if DEBUG_CORE
2237       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2238               "already have public key for peer %s!! (so why are we here?)\n",
2239               GNUNET_i2s (peer));
2240 #endif
2241       return;
2242     }
2243
2244 #if DEBUG_CORE
2245   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2246               "Received new `%s' message for `%4s', initiating key exchange.\n",
2247               "HELLO",
2248               GNUNET_i2s (peer));
2249 #endif
2250   n->public_key =
2251     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2252   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2253     {
2254       GNUNET_free (n->public_key);
2255       n->public_key = NULL;
2256 #if DEBUG_CORE
2257   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2258               "GNUNET_HELLO_get_key returned awfully\n");
2259 #endif
2260       return;
2261     }
2262 }
2263
2264
2265 /**
2266  * Send our key (and encrypted PING) to the other peer.
2267  *
2268  * @param n the other peer
2269  */
2270 static void
2271 send_key (struct Neighbour *n)
2272 {
2273   struct SetKeyMessage *sm;
2274   struct MessageEntry *me;
2275   struct PingMessage pp;
2276   struct PingMessage *pm;
2277
2278   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2279        (n->pitr != NULL) )
2280     {
2281 #if DEBUG_CORE
2282       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2283                   "Key exchange in progress with `%4s'.\n",
2284                   GNUNET_i2s (&n->peer));
2285 #endif
2286       return; /* already in progress */
2287     }
2288
2289 #if DEBUG_CORE
2290   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2291               "Asked to perform key exchange with `%4s'.\n",
2292               GNUNET_i2s (&n->peer));
2293 #endif
2294   if (n->public_key == NULL)
2295     {
2296       /* lookup n's public key, then try again */
2297 #if DEBUG_CORE
2298       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2299                   "Lacking public key for `%4s', trying to obtain one (send_key).\n",
2300                   GNUNET_i2s (&n->peer));
2301 #endif
2302       GNUNET_assert (n->pitr == NULL);
2303       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2304                                          sched,
2305                                          &n->peer,
2306                                          0,
2307                                          GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20),
2308                                          &process_hello_retry_send_key, n);
2309       return;
2310     }
2311   /* first, set key message */
2312   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2313                       sizeof (struct SetKeyMessage));
2314   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2315   me->priority = SET_KEY_PRIORITY;
2316   me->size = sizeof (struct SetKeyMessage);
2317   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2318                                      n->encrypted_tail,
2319                                      n->encrypted_tail,
2320                                      me);
2321   sm = (struct SetKeyMessage *) &me[1];
2322   sm->header.size = htons (sizeof (struct SetKeyMessage));
2323   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2324   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2325                                         PEER_STATE_KEY_SENT : n->status));
2326   sm->purpose.size =
2327     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2328            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2329            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2330            sizeof (struct GNUNET_PeerIdentity));
2331   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2332   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2333   sm->target = n->peer;
2334   GNUNET_assert (GNUNET_OK ==
2335                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2336                                             sizeof (struct
2337                                                     GNUNET_CRYPTO_AesSessionKey),
2338                                             n->public_key,
2339                                             &sm->encrypted_key));
2340   GNUNET_assert (GNUNET_OK ==
2341                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2342                                          &sm->signature));
2343
2344   /* second, encrypted PING message */
2345   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2346                       sizeof (struct PingMessage));
2347   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2348   me->priority = PING_PRIORITY;
2349   me->size = sizeof (struct PingMessage);
2350   n->encrypted_tail->next = me;
2351   n->encrypted_tail = me;
2352   pm = (struct PingMessage *) &me[1];
2353   pm->header.size = htons (sizeof (struct PingMessage));
2354   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2355   pp.challenge = htonl (n->ping_challenge);
2356   pp.target = n->peer;
2357 #if DEBUG_CORE
2358   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2359               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2360               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2361   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2362               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2363               "PING",
2364               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2365 #endif
2366   do_encrypt (n,
2367               &n->peer.hashPubKey,
2368               &pp.challenge,
2369               &pm->challenge,
2370               sizeof (struct PingMessage) -
2371               sizeof (struct GNUNET_MessageHeader));
2372   /* update status */
2373   switch (n->status)
2374     {
2375     case PEER_STATE_DOWN:
2376       n->status = PEER_STATE_KEY_SENT;
2377       break;
2378     case PEER_STATE_KEY_SENT:
2379       break;
2380     case PEER_STATE_KEY_RECEIVED:
2381       break;
2382     case PEER_STATE_KEY_CONFIRMED:
2383       break;
2384     default:
2385       GNUNET_break (0);
2386       break;
2387     }
2388 #if DEBUG_CORE
2389   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2390               "Have %llu ms left for `%s' transmission.\n",
2391               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2392               "SET_KEY");
2393 #endif
2394   /* trigger queue processing */
2395   process_encrypted_neighbour_queue (n);
2396   if ( (n->status != PEER_STATE_KEY_CONFIRMED) &&
2397        (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task) )
2398     n->retry_set_key_task
2399       = GNUNET_SCHEDULER_add_delayed (sched,
2400                                       n->set_key_retry_frequency,
2401                                       &set_key_retry_task, n);    
2402 }
2403
2404
2405 /**
2406  * We received a SET_KEY message.  Validate and update
2407  * our key material and status.
2408  *
2409  * @param n the neighbour from which we received message m
2410  * @param m the set key message we received
2411  */
2412 static void
2413 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2414
2415
2416 /**
2417  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2418  * the neighbour's struct and retry handling the set_key message.  Or,
2419  * if we did not get a HELLO, just free the set key message.
2420  *
2421  * @param cls pointer to the set key message
2422  * @param peer the peer for which this is the HELLO
2423  * @param hello HELLO message of that peer
2424  * @param trust amount of trust we currently have in that peer
2425  */
2426 static void
2427 process_hello_retry_handle_set_key (void *cls,
2428                                     const struct GNUNET_PeerIdentity *peer,
2429                                     const struct GNUNET_HELLO_Message *hello,
2430                                     uint32_t trust)
2431 {
2432   struct Neighbour *n = cls;
2433   struct SetKeyMessage *sm = n->skm;
2434
2435   if (peer == NULL)
2436     {
2437       GNUNET_free (sm);
2438       n->skm = NULL;
2439       n->pitr = NULL;
2440       return;
2441     }
2442   if (n->public_key != NULL)
2443     return;                     /* multiple HELLOs match!? */
2444   n->public_key =
2445     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2446   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2447     {
2448       GNUNET_break_op (0);
2449       GNUNET_free (n->public_key);
2450       n->public_key = NULL;
2451       return;
2452     }
2453 #if DEBUG_CORE
2454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2455               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2456               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2457 #endif
2458   handle_set_key (n, sm);
2459 }
2460
2461
2462 /**
2463  * We received a PING message.  Validate and transmit
2464  * PONG.
2465  *
2466  * @param n sender of the PING
2467  * @param m the encrypted PING message itself
2468  */
2469 static void
2470 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2471 {
2472   struct PingMessage t;
2473   struct PongMessage tx;
2474   struct PongMessage *tp;
2475   struct MessageEntry *me;
2476
2477 #if DEBUG_CORE
2478   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2479               "Core service receives `%s' request from `%4s'.\n",
2480               "PING", GNUNET_i2s (&n->peer));
2481 #endif
2482   if (GNUNET_OK !=
2483       do_decrypt (n,
2484                   &my_identity.hashPubKey,
2485                   &m->challenge,
2486                   &t.challenge,
2487                   sizeof (struct PingMessage) -
2488                   sizeof (struct GNUNET_MessageHeader)))
2489     return;
2490 #if DEBUG_CORE
2491   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2492               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2493               "PING",
2494               GNUNET_i2s (&t.target),
2495               ntohl (t.challenge), n->decrypt_key.crc32);
2496   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2497               "Target of `%s' request is `%4s'.\n",
2498               "PING", GNUNET_i2s (&t.target));
2499 #endif
2500   if (0 != memcmp (&t.target,
2501                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2502     {
2503       GNUNET_break_op (0);
2504       return;
2505     }
2506   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2507                       sizeof (struct PongMessage));
2508   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2509                                      n->encrypted_tail,
2510                                      n->encrypted_tail,
2511                                      me);
2512   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2513   me->priority = PONG_PRIORITY;
2514   me->size = sizeof (struct PongMessage);
2515   tx.reserved = htonl (0);
2516   tx.inbound_bw_limit = n->bw_in;
2517   tx.challenge = t.challenge;
2518   tx.target = t.target;
2519   tp = (struct PongMessage *) &me[1];
2520   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2521   tp->header.size = htons (sizeof (struct PongMessage));
2522   do_encrypt (n,
2523               &my_identity.hashPubKey,
2524               &tx.challenge,
2525               &tp->challenge,
2526               sizeof (struct PongMessage) -
2527               sizeof (struct GNUNET_MessageHeader));
2528 #if DEBUG_CORE
2529   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2530               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2531               ntohl (t.challenge), n->encrypt_key.crc32);
2532 #endif
2533   /* trigger queue processing */
2534   process_encrypted_neighbour_queue (n);
2535 }
2536
2537
2538 /**
2539  * We received a PONG message.  Validate and update our status.
2540  *
2541  * @param n sender of the PONG
2542  * @param m the encrypted PONG message itself
2543  */
2544 static void
2545 handle_pong (struct Neighbour *n, 
2546              const struct PongMessage *m)
2547 {
2548   struct PongMessage t;
2549   struct ConnectNotifyMessage cnm;
2550
2551 #if DEBUG_CORE
2552   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2553               "Core service receives `%s' request from `%4s'.\n",
2554               "PONG", GNUNET_i2s (&n->peer));
2555 #endif
2556   if (GNUNET_OK !=
2557       do_decrypt (n,
2558                   &n->peer.hashPubKey,
2559                   &m->challenge,
2560                   &t.challenge,
2561                   sizeof (struct PongMessage) -
2562                   sizeof (struct GNUNET_MessageHeader)))
2563     return;
2564   if (0 != ntohl (t.reserved))
2565     {
2566       GNUNET_break_op (0);
2567       return;
2568     }
2569 #if DEBUG_CORE
2570   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2571               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2572               "PONG",
2573               GNUNET_i2s (&t.target),
2574               ntohl (t.challenge), n->decrypt_key.crc32);
2575 #endif
2576   if ((0 != memcmp (&t.target,
2577                     &n->peer,
2578                     sizeof (struct GNUNET_PeerIdentity))) ||
2579       (n->ping_challenge != ntohl (t.challenge)))
2580     {
2581       /* PONG malformed */
2582 #if DEBUG_CORE
2583       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2584                   "Received malformed `%s' wanted sender `%4s' with challenge %u\n",
2585                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2586       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2587                   "Received malformed `%s' received from `%4s' with challenge %u\n",
2588                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2589 #endif
2590       GNUNET_break_op (0);
2591       return;
2592     }
2593   switch (n->status)
2594     {
2595     case PEER_STATE_DOWN:
2596       GNUNET_break (0);         /* should be impossible */
2597       return;
2598     case PEER_STATE_KEY_SENT:
2599       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2600       return;
2601     case PEER_STATE_KEY_RECEIVED:
2602       n->status = PEER_STATE_KEY_CONFIRMED;
2603       n->bw_out_external_limit = t.inbound_bw_limit;
2604       n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
2605                                               n->bw_out_internal_limit);
2606       GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
2607                                              n->bw_out);
2608 #if DEBUG_CORE
2609       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2610                   "Confirmed key via `%s' message for peer `%4s'\n",
2611                   "PONG", GNUNET_i2s (&n->peer));
2612 #endif
2613       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2614         {
2615           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2616           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2617         }      
2618       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2619       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2620       cnm.distance = htonl (n->last_distance);
2621       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2622       cnm.peer = n->peer;
2623       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2624       process_encrypted_neighbour_queue (n);
2625       break;
2626     case PEER_STATE_KEY_CONFIRMED:
2627       /* duplicate PONG? */
2628       break;
2629     default:
2630       GNUNET_break (0);
2631       break;
2632     }
2633 }
2634
2635
2636 /**
2637  * We received a SET_KEY message.  Validate and update
2638  * our key material and status.
2639  *
2640  * @param n the neighbour from which we received message m
2641  * @param m the set key message we received
2642  */
2643 static void
2644 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2645 {
2646   struct SetKeyMessage *m_cpy;
2647   struct GNUNET_TIME_Absolute t;
2648   struct GNUNET_CRYPTO_AesSessionKey k;
2649   struct PingMessage *ping;
2650   struct PongMessage *pong;
2651   enum PeerStateMachine sender_status;
2652
2653 #if DEBUG_CORE
2654   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2655               "Core service receives `%s' request from `%4s'.\n",
2656               "SET_KEY", GNUNET_i2s (&n->peer));
2657 #endif
2658   if (n->public_key == NULL)
2659     {
2660       if (n->pitr != NULL)
2661         {
2662 #if DEBUG_CORE
2663           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2664                       "Ignoring `%s' message due to lack of public key for peer (still trying to obtain one).\n",
2665                       "SET_KEY");
2666 #endif
2667           return;
2668         }
2669 #if DEBUG_CORE
2670       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2671                   "Lacking public key for peer, trying to obtain one (handle_set_key).\n");
2672 #endif
2673       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2674       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2675       /* lookup n's public key, then try again */
2676       GNUNET_assert (n->skm == NULL);
2677       n->skm = m_cpy;
2678       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2679                                          sched,
2680                                          &n->peer,
2681                                          0,
2682                                          GNUNET_TIME_UNIT_MINUTES,
2683                                          &process_hello_retry_handle_set_key, n);
2684       return;
2685     }
2686   if (0 != memcmp (&m->target,
2687                    &my_identity,
2688                    sizeof (struct GNUNET_PeerIdentity)))
2689     {
2690       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2691                   _("Received `%s' message that was not for me.  Ignoring.\n"),
2692                   "SET_KEY");
2693       return;
2694     }
2695   if ((ntohl (m->purpose.size) !=
2696        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2697        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2698        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2699        sizeof (struct GNUNET_PeerIdentity)) ||
2700       (GNUNET_OK !=
2701        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2702                                  &m->purpose, &m->signature, n->public_key)))
2703     {
2704       /* invalid signature */
2705       GNUNET_break_op (0);
2706       return;
2707     }
2708   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2709   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2710        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2711       (t.value < n->decrypt_key_created.value))
2712     {
2713       /* this could rarely happen due to massive re-ordering of
2714          messages on the network level, but is most likely either
2715          a bug or some adversary messing with us.  Report. */
2716       GNUNET_break_op (0);
2717       return;
2718     }
2719 #if DEBUG_CORE
2720   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2721 #endif  
2722   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2723                                   &m->encrypted_key,
2724                                   &k,
2725                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2726        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2727       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2728     {
2729       /* failed to decrypt !? */
2730       GNUNET_break_op (0);
2731       return;
2732     }
2733
2734   n->decrypt_key = k;
2735   if (n->decrypt_key_created.value != t.value)
2736     {
2737       /* fresh key, reset sequence numbers */
2738       n->last_sequence_number_received = 0;
2739       n->last_packets_bitmap = 0;
2740       n->decrypt_key_created = t;
2741     }
2742   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2743   switch (n->status)
2744     {
2745     case PEER_STATE_DOWN:
2746       n->status = PEER_STATE_KEY_RECEIVED;
2747 #if DEBUG_CORE
2748       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2749                   "Responding to `%s' with my own key.\n", "SET_KEY");
2750 #endif
2751       send_key (n);
2752       break;
2753     case PEER_STATE_KEY_SENT:
2754     case PEER_STATE_KEY_RECEIVED:
2755       n->status = PEER_STATE_KEY_RECEIVED;
2756       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2757           (sender_status != PEER_STATE_KEY_CONFIRMED))
2758         {
2759 #if DEBUG_CORE
2760           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2761                       "Responding to `%s' with my own key (other peer has status %u).\n",
2762                       "SET_KEY", sender_status);
2763 #endif
2764           send_key (n);
2765         }
2766       break;
2767     case PEER_STATE_KEY_CONFIRMED:
2768       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2769           (sender_status != PEER_STATE_KEY_CONFIRMED))
2770         {         
2771 #if DEBUG_CORE
2772           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2773                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2774                       "SET_KEY", sender_status);
2775 #endif
2776           send_key (n);
2777         }
2778       break;
2779     default:
2780       GNUNET_break (0);
2781       break;
2782     }
2783   if (n->pending_ping != NULL)
2784     {
2785       ping = n->pending_ping;
2786       n->pending_ping = NULL;
2787       handle_ping (n, ping);
2788       GNUNET_free (ping);
2789     }
2790   if (n->pending_pong != NULL)
2791     {
2792       pong = n->pending_pong;
2793       n->pending_pong = NULL;
2794       handle_pong (n, pong);
2795       GNUNET_free (pong);
2796     }
2797 }
2798
2799
2800 /**
2801  * Send a P2P message to a client.
2802  *
2803  * @param sender who sent us the message?
2804  * @param client who should we give the message to?
2805  * @param m contains the message to transmit
2806  * @param msize number of bytes in buf to transmit
2807  */
2808 static void
2809 send_p2p_message_to_client (struct Neighbour *sender,
2810                             struct Client *client,
2811                             const void *m, size_t msize)
2812 {
2813   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2814   struct NotifyTrafficMessage *ntm;
2815
2816 #if DEBUG_CORE
2817   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2818               "Core service passes message from `%4s' of type %u to client.\n",
2819               GNUNET_i2s(&sender->peer),
2820               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2821 #endif
2822   ntm = (struct NotifyTrafficMessage *) buf;
2823   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2824   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2825   ntm->distance = htonl (sender->last_distance);
2826   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2827   ntm->peer = sender->peer;
2828   memcpy (&ntm[1], m, msize);
2829   send_to_client (client, &ntm->header, GNUNET_YES);
2830 }
2831
2832
2833 /**
2834  * Deliver P2P message to interested clients.
2835  *
2836  * @param sender who sent us the message?
2837  * @param m the message
2838  * @param msize size of the message (including header)
2839  */
2840 static void
2841 deliver_message (struct Neighbour *sender,
2842                  const struct GNUNET_MessageHeader *m, size_t msize)
2843 {
2844   struct Client *cpos;
2845   uint16_t type;
2846   unsigned int tpos;
2847   int deliver_full;
2848   int dropped;
2849
2850   type = ntohs (m->type);
2851 #if DEBUG_CORE
2852   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2853               "Received encapsulated message of type %u from `%4s'\n",
2854               type,
2855               GNUNET_i2s (&sender->peer));
2856 #endif
2857   dropped = GNUNET_YES;
2858   cpos = clients;
2859   while (cpos != NULL)
2860     {
2861       deliver_full = GNUNET_NO;
2862       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2863         deliver_full = GNUNET_YES;
2864       else
2865         {
2866           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2867             {
2868               if (type != cpos->types[tpos])
2869                 continue;
2870               deliver_full = GNUNET_YES;
2871               break;
2872             }
2873         }
2874       if (GNUNET_YES == deliver_full)
2875         {
2876           send_p2p_message_to_client (sender, cpos, m, msize);
2877           dropped = GNUNET_NO;
2878         }
2879       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2880         {
2881           send_p2p_message_to_client (sender, cpos, m,
2882                                       sizeof (struct GNUNET_MessageHeader));
2883         }
2884       cpos = cpos->next;
2885     }
2886   if (dropped == GNUNET_YES)
2887     {
2888 #if DEBUG_CORE
2889       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2890                   "Message of type %u from `%4s' not delivered to any client.\n",
2891                   type,
2892                   GNUNET_i2s (&sender->peer));
2893 #endif
2894       /* FIXME: stats... */
2895     }
2896 }
2897
2898
2899 /**
2900  * Align P2P message and then deliver to interested clients.
2901  *
2902  * @param sender who sent us the message?
2903  * @param buffer unaligned (!) buffer containing message
2904  * @param msize size of the message (including header)
2905  */
2906 static void
2907 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2908 {
2909   char abuf[msize];
2910
2911   /* TODO: call to statistics? */
2912   memcpy (abuf, buffer, msize);
2913   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2914 }
2915
2916
2917 /**
2918  * Deliver P2P messages to interested clients.
2919  *
2920  * @param sender who sent us the message?
2921  * @param buffer buffer containing messages, can be modified
2922  * @param buffer_size size of the buffer (overall)
2923  * @param offset offset where messages in the buffer start
2924  */
2925 static void
2926 deliver_messages (struct Neighbour *sender,
2927                   const char *buffer, size_t buffer_size, size_t offset)
2928 {
2929   struct GNUNET_MessageHeader *mhp;
2930   struct GNUNET_MessageHeader mh;
2931   uint16_t msize;
2932   int need_align;
2933
2934   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2935     {
2936       if (0 != offset % sizeof (uint16_t))
2937         {
2938           /* outch, need to copy to access header */
2939           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2940           mhp = &mh;
2941         }
2942       else
2943         {
2944           /* can access header directly */
2945           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2946         }
2947       msize = ntohs (mhp->size);
2948       if (msize + offset > buffer_size)
2949         {
2950           /* malformed message, header says it is larger than what
2951              would fit into the overall buffer */
2952           GNUNET_break_op (0);
2953           break;
2954         }
2955 #if HAVE_UNALIGNED_64_ACCESS
2956       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2957 #else
2958       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2959 #endif
2960       if (GNUNET_YES == need_align)
2961         align_and_deliver (sender, &buffer[offset], msize);
2962       else
2963         deliver_message (sender,
2964                          (const struct GNUNET_MessageHeader *)
2965                          &buffer[offset], msize);
2966       offset += msize;
2967     }
2968 }
2969
2970
2971 /**
2972  * We received an encrypted message.  Decrypt, validate and
2973  * pass on to the appropriate clients.
2974  */
2975 static void
2976 handle_encrypted_message (struct Neighbour *n,
2977                           const struct EncryptedMessage *m)
2978 {
2979   size_t size = ntohs (m->header.size);
2980   char buf[size];
2981   struct EncryptedMessage *pt;  /* plaintext */
2982   GNUNET_HashCode ph;
2983   size_t off;
2984   uint32_t snum;
2985   struct GNUNET_TIME_Absolute t;
2986   GNUNET_HashCode iv;
2987
2988 #if DEBUG_CORE
2989   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2990               "Core service receives `%s' request from `%4s'.\n",
2991               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2992 #endif  
2993   GNUNET_CRYPTO_hash (&m->iv_seed, sizeof (uint32_t), &iv);
2994   /* decrypt */
2995   if (GNUNET_OK !=
2996       do_decrypt (n,
2997                   &iv,
2998                   &m->plaintext_hash,
2999                   &buf[ENCRYPTED_HEADER_SIZE], 
3000                   size - ENCRYPTED_HEADER_SIZE))
3001     return;
3002   pt = (struct EncryptedMessage *) buf;
3003
3004   /* validate hash */
3005   GNUNET_CRYPTO_hash (&pt->sequence_number,
3006                       size - ENCRYPTED_HEADER_SIZE - sizeof (GNUNET_HashCode), &ph);
3007   if (0 != memcmp (&ph, 
3008                    &pt->plaintext_hash, 
3009                    sizeof (GNUNET_HashCode)))
3010     {
3011       /* checksum failed */
3012       GNUNET_break_op (0);
3013       return;
3014     }
3015
3016   /* validate sequence number */
3017   snum = ntohl (pt->sequence_number);
3018   if (n->last_sequence_number_received == snum)
3019     {
3020       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3021                   "Received duplicate message, ignoring.\n");
3022       /* duplicate, ignore */
3023       return;
3024     }
3025   if ((n->last_sequence_number_received > snum) &&
3026       (n->last_sequence_number_received - snum > 32))
3027     {
3028       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3029                   "Received ancient out of sequence message, ignoring.\n");
3030       /* ancient out of sequence, ignore */
3031       return;
3032     }
3033   if (n->last_sequence_number_received > snum)
3034     {
3035       unsigned int rotbit =
3036         1 << (n->last_sequence_number_received - snum - 1);
3037       if ((n->last_packets_bitmap & rotbit) != 0)
3038         {
3039           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3040                       "Received duplicate message, ignoring.\n");
3041           /* duplicate, ignore */
3042           return;
3043         }
3044       n->last_packets_bitmap |= rotbit;
3045     }
3046   if (n->last_sequence_number_received < snum)
3047     {
3048       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
3049       n->last_sequence_number_received = snum;
3050     }
3051
3052   /* check timestamp */
3053   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
3054   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
3055     {
3056       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3057                   _
3058                   ("Message received far too old (%llu ms). Content ignored.\n"),
3059                   GNUNET_TIME_absolute_get_duration (t).value);
3060       return;
3061     }
3062
3063   /* process decrypted message(s) */
3064 #if DEBUG_CORE_QUOTA
3065   if (n->bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
3066     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3067                 "Received %u b/s as new inbound limit for peer `%4s'\n",
3068                 (unsigned int) ntohl (pt->inbound_bw_limit.value__),
3069                 GNUNET_i2s (&n->peer));
3070 #endif
3071   n->bw_out_external_limit = pt->inbound_bw_limit;
3072   n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
3073                                           n->bw_out_internal_limit);
3074   GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
3075                                          n->bw_out);
3076   n->last_activity = GNUNET_TIME_absolute_get ();
3077   off = sizeof (struct EncryptedMessage);
3078   deliver_messages (n, buf, size, off);
3079 }
3080
3081
3082 /**
3083  * Function called by the transport for each received message.
3084  *
3085  * @param cls closure
3086  * @param peer (claimed) identity of the other peer
3087  * @param message the message
3088  * @param latency estimated latency for communicating with the
3089  *             given peer (round-trip)
3090  * @param distance in overlay hops, as given by transport plugin
3091  */
3092 static void
3093 handle_transport_receive (void *cls,
3094                           const struct GNUNET_PeerIdentity *peer,
3095                           const struct GNUNET_MessageHeader *message,
3096                           struct GNUNET_TIME_Relative latency,
3097                           unsigned int distance)
3098 {
3099   struct Neighbour *n;
3100   struct GNUNET_TIME_Absolute now;
3101   int up;
3102   uint16_t type;
3103   uint16_t size;
3104
3105 #if DEBUG_CORE
3106   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3107               "Received message of type %u from `%4s', demultiplexing.\n",
3108               ntohs (message->type), GNUNET_i2s (peer));
3109 #endif
3110   n = find_neighbour (peer);
3111   if (n == NULL)
3112     n = create_neighbour (peer);
3113   if (n == NULL)
3114     return;   
3115   n->last_latency = latency;
3116   n->last_distance = distance;
3117   up = (n->status == PEER_STATE_KEY_CONFIRMED);
3118   type = ntohs (message->type);
3119   size = ntohs (message->size);
3120 #if DEBUG_HANDSHAKE
3121   fprintf (stderr,
3122            "Received message of type %u from `%4s'\n",
3123            type,
3124            GNUNET_i2s (peer));
3125 #endif
3126   switch (type)
3127     {
3128     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
3129       if (size != sizeof (struct SetKeyMessage))
3130         {
3131           GNUNET_break_op (0);
3132           return;
3133         }
3134       handle_set_key (n, (const struct SetKeyMessage *) message);
3135       break;
3136     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
3137       if (size < sizeof (struct EncryptedMessage) +
3138           sizeof (struct GNUNET_MessageHeader))
3139         {
3140           GNUNET_break_op (0);
3141           return;
3142         }
3143       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3144           (n->status != PEER_STATE_KEY_CONFIRMED))
3145         {
3146           GNUNET_break_op (0);
3147           return;
3148         }
3149       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
3150       break;
3151     case GNUNET_MESSAGE_TYPE_CORE_PING:
3152       if (size != sizeof (struct PingMessage))
3153         {
3154           GNUNET_break_op (0);
3155           return;
3156         }
3157       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3158           (n->status != PEER_STATE_KEY_CONFIRMED))
3159         {
3160 #if DEBUG_CORE
3161           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3162                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3163                       "PING", GNUNET_i2s (&n->peer));
3164 #endif
3165           GNUNET_free_non_null (n->pending_ping);
3166           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
3167           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
3168           return;
3169         }
3170       handle_ping (n, (const struct PingMessage *) message);
3171       break;
3172     case GNUNET_MESSAGE_TYPE_CORE_PONG:
3173       if (size != sizeof (struct PongMessage))
3174         {
3175           GNUNET_break_op (0);
3176           return;
3177         }
3178       if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
3179            (n->status != PEER_STATE_KEY_CONFIRMED) )
3180         {
3181 #if DEBUG_CORE
3182           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3183                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3184                       "PONG", GNUNET_i2s (&n->peer));
3185 #endif
3186           GNUNET_free_non_null (n->pending_pong);
3187           n->pending_pong = GNUNET_malloc (sizeof (struct PongMessage));
3188           memcpy (n->pending_pong, message, sizeof (struct PongMessage));
3189           return;
3190         }
3191       handle_pong (n, (const struct PongMessage *) message);
3192       break;
3193     default:
3194       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3195                   _("Unsupported message of type %u received.\n"), type);
3196       return;
3197     }
3198   if (n->status == PEER_STATE_KEY_CONFIRMED)
3199     {
3200       now = GNUNET_TIME_absolute_get ();
3201       n->last_activity = now;
3202       if (!up)
3203         n->time_established = now;
3204     }
3205 }
3206
3207
3208 /**
3209  * Function that recalculates the bandwidth quota for the
3210  * given neighbour and transmits it to the transport service.
3211  * 
3212  * @param cls neighbour for the quota update
3213  * @param tc context
3214  */
3215 static void
3216 neighbour_quota_update (void *cls,
3217                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3218 {
3219   struct Neighbour *n = cls;
3220   struct GNUNET_BANDWIDTH_Value32NBO q_in;
3221   double pref_rel;
3222   double share;
3223   unsigned long long distributable;
3224   uint64_t need_per_peer;
3225   uint64_t need_per_second;
3226   
3227   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3228   /* calculate relative preference among all neighbours;
3229      divides by a bit more to avoid division by zero AND to
3230      account for possibility of new neighbours joining any time 
3231      AND to convert to double... */
3232   if (preference_sum == 0)
3233     {
3234       pref_rel = 1.0 / (double) neighbour_count;
3235     }
3236   else
3237     {
3238       pref_rel = n->current_preference / preference_sum;
3239     }
3240   need_per_peer = GNUNET_BANDWIDTH_value_get_available_until (MIN_BANDWIDTH_PER_PEER,
3241                                                               GNUNET_TIME_UNIT_SECONDS);  
3242   need_per_second = need_per_peer * neighbour_count;
3243   distributable = 0;
3244   if (bandwidth_target_out_bps > need_per_second)
3245     distributable = bandwidth_target_out_bps - need_per_second;
3246   share = distributable * pref_rel;
3247   if (share + need_per_peer > ( (uint32_t)-1))
3248     q_in = GNUNET_BANDWIDTH_value_init ((uint32_t) -1);
3249   else
3250     q_in = GNUNET_BANDWIDTH_value_init (need_per_peer + (uint32_t) share);
3251   /* check if we want to disconnect for good due to inactivity */
3252   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3253        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3254     {
3255 #if DEBUG_CORE
3256       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3257                   "Forcing disconnect of `%4s' due to inactivity (?).\n",
3258                   GNUNET_i2s (&n->peer));
3259 #endif
3260       q_in = GNUNET_BANDWIDTH_value_init (0); /* force disconnect */
3261     }
3262 #if DEBUG_CORE_QUOTA
3263   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3264               "Current quota for `%4s' is %u/%llu b/s in (old: %u b/s) / %u out (%u internal)\n",
3265               GNUNET_i2s (&n->peer),
3266               (unsigned int) ntohl (q_in.value__),
3267               bandwidth_target_out_bps,
3268               (unsigned int) ntohl (n->bw_in.value__),
3269               (unsigned int) ntohl (n->bw_out.value__),
3270               (unsigned int) ntohl (n->bw_out_internal_limit.value__));
3271 #endif
3272   if (n->bw_in.value__ != q_in.value__) 
3273     {
3274       n->bw_in = q_in;
3275       GNUNET_TRANSPORT_set_quota (transport,
3276                                   &n->peer,
3277                                   n->bw_in,
3278                                   n->bw_out,
3279                                   GNUNET_TIME_UNIT_FOREVER_REL,
3280                                   NULL, NULL);
3281     }
3282   schedule_quota_update (n);
3283 }
3284
3285
3286 /**
3287  * Function called by transport to notify us that
3288  * a peer connected to us (on the network level).
3289  *
3290  * @param cls closure
3291  * @param peer the peer that connected
3292  * @param latency current latency of the connection
3293  * @param distance in overlay hops, as given by transport plugin
3294  */
3295 static void
3296 handle_transport_notify_connect (void *cls,
3297                                  const struct GNUNET_PeerIdentity *peer,
3298                                  struct GNUNET_TIME_Relative latency,
3299                                  unsigned int distance)
3300 {
3301   struct Neighbour *n;
3302   struct GNUNET_TIME_Absolute now;
3303   struct ConnectNotifyMessage cnm;
3304
3305   n = find_neighbour (peer);
3306   if (n != NULL)
3307     {
3308       if (n->is_connected)
3309         {
3310           /* duplicate connect notification!? */
3311           GNUNET_break (0);
3312           return;
3313         }
3314     }
3315   else
3316     {
3317       n = create_neighbour (peer);
3318     }
3319   now = GNUNET_TIME_absolute_get ();
3320   n->is_connected = GNUNET_YES;      
3321   n->last_latency = latency;
3322   n->last_distance = distance;
3323   GNUNET_BANDWIDTH_tracker_init (&n->available_send_window,
3324                                  n->bw_out,
3325                                  MAX_WINDOW_TIME_S);
3326   GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window,
3327                                  n->bw_in,
3328                                  MAX_WINDOW_TIME_S);  
3329 #if DEBUG_CORE
3330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3331               "Received connection from `%4s'.\n",
3332               GNUNET_i2s (&n->peer));
3333 #endif
3334   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3335   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3336   cnm.distance = htonl (n->last_distance);
3337   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
3338   cnm.peer = *peer;
3339   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3340   send_key (n);
3341 }
3342
3343
3344 /**
3345  * Function called by transport telling us that a peer
3346  * disconnected.
3347  *
3348  * @param cls closure
3349  * @param peer the peer that disconnected
3350  */
3351 static void
3352 handle_transport_notify_disconnect (void *cls,
3353                                     const struct GNUNET_PeerIdentity *peer)
3354 {
3355   struct DisconnectNotifyMessage cnm;
3356   struct Neighbour *n;
3357
3358 #if DEBUG_CORE
3359   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3360               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3361 #endif
3362   n = find_neighbour (peer);
3363   if (n == NULL)
3364     {
3365       GNUNET_break (0);
3366       return;
3367     }
3368   GNUNET_break (n->is_connected);
3369   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3370   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3371   cnm.peer = *peer;
3372   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3373   n->is_connected = GNUNET_NO;
3374 }
3375
3376
3377 /**
3378  * Last task run during shutdown.  Disconnects us from
3379  * the transport.
3380  */
3381 static void
3382 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3383 {
3384   struct Neighbour *n;
3385   struct Client *c;
3386
3387 #if DEBUG_CORE
3388   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3389               "Core service shutting down.\n");
3390 #endif
3391   GNUNET_assert (transport != NULL);
3392   GNUNET_TRANSPORT_disconnect (transport);
3393   transport = NULL;
3394   while (NULL != (n = neighbours))
3395     {
3396       neighbours = n->next;
3397       GNUNET_assert (neighbour_count > 0);
3398       neighbour_count--;
3399       free_neighbour (n);
3400     }
3401   GNUNET_SERVER_notification_context_destroy (notifier);
3402   notifier = NULL;
3403   while (NULL != (c = clients))
3404     handle_client_disconnect (NULL, c->client_handle);
3405   if (my_private_key != NULL)
3406     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3407 }
3408
3409
3410 /**
3411  * Initiate core service.
3412  *
3413  * @param cls closure
3414  * @param s scheduler to use
3415  * @param serv the initialized server
3416  * @param c configuration to use
3417  */
3418 static void
3419 run (void *cls,
3420      struct GNUNET_SCHEDULER_Handle *s,
3421      struct GNUNET_SERVER_Handle *serv,
3422      const struct GNUNET_CONFIGURATION_Handle *c)
3423 {
3424   char *keyfile;
3425
3426   sched = s;
3427   cfg = c;  
3428   /* parse configuration */
3429   if (
3430        (GNUNET_OK !=
3431         GNUNET_CONFIGURATION_get_value_number (c,
3432                                                "CORE",
3433                                                "TOTAL_QUOTA_IN",
3434                                                &bandwidth_target_in_bps)) ||
3435        (GNUNET_OK !=
3436         GNUNET_CONFIGURATION_get_value_number (c,
3437                                                "CORE",
3438                                                "TOTAL_QUOTA_OUT",
3439                                                &bandwidth_target_out_bps)) ||
3440        (GNUNET_OK !=
3441         GNUNET_CONFIGURATION_get_value_filename (c,
3442                                                  "GNUNETD",
3443                                                  "HOSTKEY", &keyfile)))
3444     {
3445       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3446                   _
3447                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3448       GNUNET_SCHEDULER_shutdown (s);
3449       return;
3450     }
3451   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3452   GNUNET_free (keyfile);
3453   if (my_private_key == NULL)
3454     {
3455       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3456                   _("Core service could not access hostkey.  Exiting.\n"));
3457       GNUNET_SCHEDULER_shutdown (s);
3458       return;
3459     }
3460   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3461   GNUNET_CRYPTO_hash (&my_public_key,
3462                       sizeof (my_public_key), &my_identity.hashPubKey);
3463   /* setup notification */
3464   server = serv;
3465   notifier = GNUNET_SERVER_notification_context_create (server, 
3466                                                         MAX_NOTIFY_QUEUE);
3467   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3468   /* setup transport connection */
3469   transport = GNUNET_TRANSPORT_connect (sched,
3470                                         cfg,
3471                                         NULL,
3472                                         &handle_transport_receive,
3473                                         &handle_transport_notify_connect,
3474                                         &handle_transport_notify_disconnect);
3475   GNUNET_assert (NULL != transport);
3476   GNUNET_SCHEDULER_add_delayed (sched,
3477                                 GNUNET_TIME_UNIT_FOREVER_REL,
3478                                 &cleaning_task, NULL);
3479   /* process client requests */
3480   GNUNET_SERVER_add_handlers (server, handlers);
3481   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3482               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3483 }
3484
3485
3486
3487 /**
3488  * The main function for the transport service.
3489  *
3490  * @param argc number of arguments from the command line
3491  * @param argv command line arguments
3492  * @return 0 ok, 1 on error
3493  */
3494 int
3495 main (int argc, char *const *argv)
3496 {
3497   return (GNUNET_OK ==
3498           GNUNET_SERVICE_run (argc,
3499                               argv,
3500                               "core",
3501                               GNUNET_SERVICE_OPTION_NONE,
3502                               &run, NULL)) ? 0 : 1;
3503 }
3504
3505 /* end of gnunet-service-core.c */