nodeb
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 #define DEBUG_HANDSHAKE GNUNET_NO
45
46 #define DEBUG_CORE_QUOTA GNUNET_NO
47
48 /**
49  * Receive and send buffer windows grow over time.  For
50  * how long can 'unused' bandwidth accumulate before we
51  * need to cap it?  (specified in seconds).
52  */
53 #define MAX_WINDOW_TIME_S (5 * 60)
54
55 /**
56  * How many messages do we queue up at most for optional
57  * notifications to a client?  (this can cause notifications
58  * about outgoing messages to be dropped).
59  */
60 #define MAX_NOTIFY_QUEUE 16
61
62 /**
63  * Minimum bandwidth (out) to assign to any connected peer.
64  * Should be rather low; values larger than DEFAULT_BW_IN_OUT make no
65  * sense.
66  */
67 #define MIN_BANDWIDTH_PER_PEER GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * How long do we delay messages to get larger packet sizes (CORKing)?
80  */
81 #define MAX_CORK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
82
83 /**
84  * What is the maximum delay for a SET_KEY message?
85  */
86 #define MAX_SET_KEY_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
87
88 /**
89  * What how long do we wait for SET_KEY confirmation initially?
90  */
91 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 3)
92
93 /**
94  * What is the maximum delay for a PING message?
95  */
96 #define MAX_PING_DELAY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 2)
97
98 /**
99  * What is the maximum delay for a PONG message?
100  */
101 #define MAX_PONG_DELAY GNUNET_TIME_relative_multiply (MAX_PING_DELAY, 2)
102
103 /**
104  * How often do we recalculate bandwidth quotas?
105  */
106 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
107
108 /**
109  * What is the priority for a SET_KEY message?
110  */
111 #define SET_KEY_PRIORITY 0xFFFFFF
112
113 /**
114  * What is the priority for a PING message?
115  */
116 #define PING_PRIORITY 0xFFFFFF
117
118 /**
119  * What is the priority for a PONG message?
120  */
121 #define PONG_PRIORITY 0xFFFFFF
122
123 /**
124  * How many messages do we queue per peer at most?  Must be at
125  * least two.
126  */
127 #define MAX_PEER_QUEUE_SIZE 16
128
129 /**
130  * How many non-mandatory messages do we queue per client at most?
131  */
132 #define MAX_CLIENT_QUEUE_SIZE 32
133
134 /**
135  * What is the maximum age of a message for us to consider
136  * processing it?  Note that this looks at the timestamp used
137  * by the other peer, so clock skew between machines does
138  * come into play here.  So this should be picked high enough
139  * so that a little bit of clock skew does not prevent peers
140  * from connecting to us.
141  */
142 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
143
144 /**
145  * What is the maximum size for encrypted messages?  Note that this
146  * number imposes a clear limit on the maximum size of any message.
147  * Set to a value close to 64k but not so close that transports will
148  * have trouble with their headers.
149  */
150 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
151
152
153 /**
154  * State machine for our P2P encryption handshake.  Everyone starts in
155  * "DOWN", if we receive the other peer's key (other peer initiated)
156  * we start in state RECEIVED (since we will immediately send our
157  * own); otherwise we start in SENT.  If we get back a PONG from
158  * within either state, we move up to CONFIRMED (the PONG will always
159  * be sent back encrypted with the key we sent to the other peer).
160  */
161 enum PeerStateMachine
162 {
163   PEER_STATE_DOWN,
164   PEER_STATE_KEY_SENT,
165   PEER_STATE_KEY_RECEIVED,
166   PEER_STATE_KEY_CONFIRMED
167 };
168
169
170 /**
171  * Number of bytes (at the beginning) of "struct EncryptedMessage"
172  * that are NOT encrypted.
173  */
174 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t))
175
176
177 /**
178  * Encapsulation for encrypted messages exchanged between
179  * peers.  Followed by the actual encrypted data.
180  */
181 struct EncryptedMessage
182 {
183   /**
184    * Message type is either CORE_ENCRYPTED_MESSAGE.
185    */
186   struct GNUNET_MessageHeader header;
187
188   /**
189    * Random value used for IV generation.  ENCRYPTED_HEADER_SIZE must
190    * be set to the offset of the *next* field.
191    */
192   uint32_t iv_seed GNUNET_PACKED;
193
194   /**
195    * Hash of the plaintext (starting at 'sequence_number'), used to
196    * verify message integrity.  Everything after this hash (including
197    * this hash itself) will be encrypted.  
198    */
199   GNUNET_HashCode plaintext_hash;
200
201   /**
202    * Sequence number, in network byte order.  This field
203    * must be the first encrypted/decrypted field and the
204    * first byte that is hashed for the plaintext hash.
205    */
206   uint32_t sequence_number GNUNET_PACKED;
207
208   /**
209    * Desired bandwidth (how much we should send to this peer / how
210    * much is the sender willing to receive)?
211    */
212   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
213
214   /**
215    * Timestamp.  Used to prevent reply of ancient messages
216    * (recent messages are caught with the sequence number).
217    */
218   struct GNUNET_TIME_AbsoluteNBO timestamp;
219
220 };
221
222
223 /**
224  * We're sending an (encrypted) PING to the other peer to check if he
225  * can decrypt.  The other peer should respond with a PONG with the
226  * same content, except this time encrypted with the receiver's key.
227  */
228 struct PingMessage
229 {
230   /**
231    * Message type is CORE_PING.
232    */
233   struct GNUNET_MessageHeader header;
234
235   /**
236    * Random number chosen to make reply harder.
237    */
238   uint32_t challenge GNUNET_PACKED;
239
240   /**
241    * Intended target of the PING, used primarily to check
242    * that decryption actually worked.
243    */
244   struct GNUNET_PeerIdentity target;
245 };
246
247
248
249 /**
250  * Response to a PING.  Includes data from the original PING
251  * plus initial bandwidth quota information.
252  */
253 struct PongMessage
254 {
255   /**
256    * Message type is CORE_PONG.
257    */
258   struct GNUNET_MessageHeader header;
259
260   /**
261    * Random number proochosen to make reply harder.  Must be
262    * first field after header (this is where we start to encrypt!).
263    */
264   uint32_t challenge GNUNET_PACKED;
265
266   /**
267    * Must be zero.
268    */
269   uint32_t reserved GNUNET_PACKED;
270
271   /**
272    * Desired bandwidth (how much we should send to this
273    * peer / how much is the sender willing to receive).
274    */
275   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
276
277   /**
278    * Intended target of the PING, used primarily to check
279    * that decryption actually worked.
280    */
281   struct GNUNET_PeerIdentity target;
282 };
283
284
285 /**
286  * Message transmitted to set (or update) a session key.
287  */
288 struct SetKeyMessage
289 {
290
291   /**
292    * Message type is either CORE_SET_KEY.
293    */
294   struct GNUNET_MessageHeader header;
295
296   /**
297    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
298    */
299   int32_t sender_status GNUNET_PACKED;
300
301   /**
302    * Purpose of the signature, will be
303    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
304    */
305   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
306
307   /**
308    * At what time was this key created?
309    */
310   struct GNUNET_TIME_AbsoluteNBO creation_time;
311
312   /**
313    * The encrypted session key.
314    */
315   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
316
317   /**
318    * Who is the intended recipient?
319    */
320   struct GNUNET_PeerIdentity target;
321
322   /**
323    * Signature of the stuff above (starting at purpose).
324    */
325   struct GNUNET_CRYPTO_RsaSignature signature;
326
327 };
328
329
330 /**
331  * Message waiting for transmission. This struct
332  * is followed by the actual content of the message.
333  */
334 struct MessageEntry
335 {
336
337   /**
338    * We keep messages in a doubly linked list.
339    */
340   struct MessageEntry *next;
341
342   /**
343    * We keep messages in a doubly linked list.
344    */
345   struct MessageEntry *prev;
346
347   /**
348    * By when are we supposed to transmit this message?
349    */
350   struct GNUNET_TIME_Absolute deadline;
351
352   /**
353    * By when are we supposed to transmit this message (after
354    * giving slack)?
355    */
356   struct GNUNET_TIME_Absolute slack_deadline;
357
358   /**
359    * How important is this message to us?
360    */
361   unsigned int priority;
362
363   /**
364    * How long is the message? (number of bytes following
365    * the "struct MessageEntry", but not including the
366    * size of "struct MessageEntry" itself!)
367    */
368   uint16_t size;
369
370   /**
371    * Was this message selected for transmission in the
372    * current round? GNUNET_YES or GNUNET_NO.
373    */
374   int8_t do_transmit;
375
376   /**
377    * Did we give this message some slack (delayed sending) previously
378    * (and hence should not give it any more slack)? GNUNET_YES or
379    * GNUNET_NO.
380    */
381   int8_t got_slack;
382
383 };
384
385
386 struct Neighbour
387 {
388   /**
389    * We keep neighbours in a linked list (for now).
390    */
391   struct Neighbour *next;
392
393   /**
394    * Unencrypted messages destined for this peer.
395    */
396   struct MessageEntry *messages;
397
398   /**
399    * Head of the batched, encrypted message queue (already ordered,
400    * transmit starting with the head).
401    */
402   struct MessageEntry *encrypted_head;
403
404   /**
405    * Tail of the batched, encrypted message queue (already ordered,
406    * append new messages to tail)
407    */
408   struct MessageEntry *encrypted_tail;
409
410   /**
411    * Handle for pending requests for transmission to this peer
412    * with the transport service.  NULL if no request is pending.
413    */
414   struct GNUNET_TRANSPORT_TransmitHandle *th;
415
416   /**
417    * Public key of the neighbour, NULL if we don't have it yet.
418    */
419   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
420
421   /**
422    * We received a PING message before we got the "public_key"
423    * (or the SET_KEY).  We keep it here until we have a key
424    * to decrypt it.  NULL if no PING is pending.
425    */
426   struct PingMessage *pending_ping;
427
428   /**
429    * We received a PONG message before we got the "public_key"
430    * (or the SET_KEY).  We keep it here until we have a key
431    * to decrypt it.  NULL if no PONG is pending.
432    */
433   struct PongMessage *pending_pong;
434
435   /**
436    * Non-NULL if we are currently looking up HELLOs for this peer.
437    * for this peer.
438    */
439   struct GNUNET_PEERINFO_IteratorContext *pitr;
440
441   /**
442    * SetKeyMessage to transmit, NULL if we are not currently trying
443    * to send one.
444    */
445   struct SetKeyMessage *skm;
446
447   /**
448    * Identity of the neighbour.
449    */
450   struct GNUNET_PeerIdentity peer;
451
452   /**
453    * Key we use to encrypt our messages for the other peer
454    * (initialized by us when we do the handshake).
455    */
456   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
457
458   /**
459    * Key we use to decrypt messages from the other peer
460    * (given to us by the other peer during the handshake).
461    */
462   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
463
464   /**
465    * ID of task used for re-trying plaintext scheduling.
466    */
467   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
468
469   /**
470    * ID of task used for re-trying SET_KEY and PING message.
471    */
472   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
473
474   /**
475    * ID of task used for updating bandwidth quota for this neighbour.
476    */
477   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
478
479   /**
480    * ID of task used for cleaning up dead neighbour entries.
481    */
482   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
483
484   /**
485    * At what time did we generate our encryption key?
486    */
487   struct GNUNET_TIME_Absolute encrypt_key_created;
488
489   /**
490    * At what time did the other peer generate the decryption key?
491    */
492   struct GNUNET_TIME_Absolute decrypt_key_created;
493
494   /**
495    * At what time did we initially establish (as in, complete session
496    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
497    */
498   struct GNUNET_TIME_Absolute time_established;
499
500   /**
501    * At what time did we last receive an encrypted message from the
502    * other peer?  Should be zero if status != KEY_CONFIRMED.
503    */
504   struct GNUNET_TIME_Absolute last_activity;
505
506   /**
507    * Last latency observed from this peer.
508    */
509   struct GNUNET_TIME_Relative last_latency;
510
511   /**
512    * At what frequency are we currently re-trying SET_KEY messages?
513    */
514   struct GNUNET_TIME_Relative set_key_retry_frequency;
515
516   /**
517    * Tracking bandwidth for sending to this peer.
518    */
519   struct GNUNET_BANDWIDTH_Tracker available_send_window;
520
521   /**
522    * Tracking bandwidth for receiving from this peer.
523    */
524   struct GNUNET_BANDWIDTH_Tracker available_recv_window;
525
526   /**
527    * How valueable were the messages of this peer recently?
528    */
529   unsigned long long current_preference;
530
531   /**
532    * Bit map indicating which of the 32 sequence numbers before the last
533    * were received (good for accepting out-of-order packets and
534    * estimating reliability of the connection)
535    */
536   unsigned int last_packets_bitmap;
537
538   /**
539    * last sequence number received on this connection (highest)
540    */
541   uint32_t last_sequence_number_received;
542
543   /**
544    * last sequence number transmitted
545    */
546   uint32_t last_sequence_number_sent;
547
548   /**
549    * Available bandwidth in for this peer (current target).
550    */
551   struct GNUNET_BANDWIDTH_Value32NBO bw_in;    
552
553   /**
554    * Available bandwidth out for this peer (current target).
555    */
556   struct GNUNET_BANDWIDTH_Value32NBO bw_out;  
557
558   /**
559    * Internal bandwidth limit set for this peer (initially typically
560    * set to "-1").  Actual "bw_out" is MIN of
561    * "bpm_out_internal_limit" and "bw_out_external_limit".
562    */
563   struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
564
565   /**
566    * External bandwidth limit set for this peer by the
567    * peer that we are communicating with.  "bw_out" is MIN of
568    * "bw_out_internal_limit" and "bw_out_external_limit".
569    */
570   struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
571
572   /**
573    * What was our PING challenge number (for this peer)?
574    */
575   uint32_t ping_challenge;
576
577   /**
578    * What was the last distance to this peer as reported by the transports?
579    */
580   uint32_t last_distance;
581
582   /**
583    * What is our connection status?
584    */
585   enum PeerStateMachine status;
586
587   /**
588    * Are we currently connected to this neighbour?
589    */ 
590   int is_connected;
591
592 };
593
594
595 /**
596  * Data structure for each client connected to the core service.
597  */
598 struct Client
599 {
600   /**
601    * Clients are kept in a linked list.
602    */
603   struct Client *next;
604
605   /**
606    * Handle for the client with the server API.
607    */
608   struct GNUNET_SERVER_Client *client_handle;
609
610   /**
611    * Array of the types of messages this peer cares
612    * about (with "tcnt" entries).  Allocated as part
613    * of this client struct, do not free!
614    */
615   const uint16_t *types;
616
617   /**
618    * Options for messages this client cares about,
619    * see GNUNET_CORE_OPTION_ values.
620    */
621   uint32_t options;
622
623   /**
624    * Number of types of incoming messages this client
625    * specifically cares about.  Size of the "types" array.
626    */
627   unsigned int tcnt;
628
629 };
630
631
632 /**
633  * Our public key.
634  */
635 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
636
637 /**
638  * Our identity.
639  */
640 static struct GNUNET_PeerIdentity my_identity;
641
642 /**
643  * Our private key.
644  */
645 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
646
647 /**
648  * Our scheduler.
649  */
650 struct GNUNET_SCHEDULER_Handle *sched;
651
652 /**
653  * Our configuration.
654  */
655 const struct GNUNET_CONFIGURATION_Handle *cfg;
656
657 /**
658  * Our server.
659  */
660 static struct GNUNET_SERVER_Handle *server;
661
662 /**
663  * Transport service.
664  */
665 static struct GNUNET_TRANSPORT_Handle *transport;
666
667 /**
668  * Linked list of our clients.
669  */
670 static struct Client *clients;
671
672 /**
673  * Context for notifications we need to send to our clients.
674  */
675 static struct GNUNET_SERVER_NotificationContext *notifier;
676
677 /**
678  * We keep neighbours in a linked list (for now).
679  */
680 static struct Neighbour *neighbours;
681
682 /**
683  * Sum of all preferences among all neighbours.
684  */
685 static unsigned long long preference_sum;
686
687 /**
688  * Total number of neighbours we have.
689  */
690 static unsigned int neighbour_count;
691
692 /**
693  * How much inbound bandwidth are we supposed to be using per second?
694  * FIXME: this value is not used!
695  */
696 static unsigned long long bandwidth_target_in_bps;
697
698 /**
699  * How much outbound bandwidth are we supposed to be using per second?
700  */
701 static unsigned long long bandwidth_target_out_bps;
702
703
704
705 /**
706  * A preference value for a neighbour was update.  Update
707  * the preference sum accordingly.
708  *
709  * @param inc how much was a preference value increased?
710  */
711 static void
712 update_preference_sum (unsigned long long inc)
713 {
714   struct Neighbour *n;
715   unsigned long long os;
716
717   os = preference_sum;
718   preference_sum += inc;
719   if (preference_sum >= os)
720     return; /* done! */
721   /* overflow! compensate by cutting all values in half! */
722   preference_sum = 0;
723   n = neighbours;
724   while (n != NULL)
725     {
726       n->current_preference /= 2;
727       preference_sum += n->current_preference;
728       n = n->next;
729     }    
730 }
731
732
733 /**
734  * Find the entry for the given neighbour.
735  *
736  * @param peer identity of the neighbour
737  * @return NULL if we are not connected, otherwise the
738  *         neighbour's entry.
739  */
740 static struct Neighbour *
741 find_neighbour (const struct GNUNET_PeerIdentity *peer)
742 {
743   struct Neighbour *ret;
744
745   ret = neighbours;
746   while ((ret != NULL) &&
747          (0 != memcmp (&ret->peer,
748                        peer, sizeof (struct GNUNET_PeerIdentity))))
749     ret = ret->next;
750   return ret;
751 }
752
753
754 /**
755  * Send a message to one of our clients.
756  *
757  * @param client target for the message
758  * @param msg message to transmit
759  * @param can_drop could this message be dropped if the
760  *        client's queue is getting too large?
761  */
762 static void
763 send_to_client (struct Client *client,
764                 const struct GNUNET_MessageHeader *msg, 
765                 int can_drop)
766 {
767 #if DEBUG_CORE_CLIENT
768   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
769               "Preparing to send message of type %u to client.\n",
770               ntohs (msg->type));
771 #endif  
772   GNUNET_SERVER_notification_context_unicast (notifier,
773                                               client->client_handle,
774                                               msg,
775                                               can_drop);
776 }
777
778
779 /**
780  * Send a message to all of our current clients that have
781  * the right options set.
782  * 
783  * @param msg message to multicast
784  * @param can_drop can this message be discarded if the queue is too long
785  * @param options mask to use 
786  */
787 static void
788 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
789                      int can_drop,
790                      int options)
791 {
792   struct Client *c;
793
794   c = clients;
795   while (c != NULL)
796     {
797       if (0 != (c->options & options))
798         send_to_client (c, msg, can_drop);
799       c = c->next;
800     }
801 }
802
803
804 /**
805  * Handle CORE_INIT request.
806  */
807 static void
808 handle_client_init (void *cls,
809                     struct GNUNET_SERVER_Client *client,
810                     const struct GNUNET_MessageHeader *message)
811 {
812   const struct InitMessage *im;
813   struct InitReplyMessage irm;
814   struct Client *c;
815   uint16_t msize;
816   const uint16_t *types;
817   uint16_t *wtypes;
818   struct Neighbour *n;
819   struct ConnectNotifyMessage cnm;
820   unsigned int i;
821
822 #if DEBUG_CORE_CLIENT
823   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
824               "Client connecting to core service with `%s' message\n",
825               "INIT");
826 #endif
827   /* check that we don't have an entry already */
828   c = clients;
829   while (c != NULL)
830     {
831       if (client == c->client_handle)
832         {
833           GNUNET_break (0);
834           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
835           return;
836         }
837       c = c->next;
838     }
839   msize = ntohs (message->size);
840   if (msize < sizeof (struct InitMessage))
841     {
842       GNUNET_break (0);
843       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
844       return;
845     }
846   GNUNET_SERVER_notification_context_add (notifier, client);
847   im = (const struct InitMessage *) message;
848   types = (const uint16_t *) &im[1];
849   msize -= sizeof (struct InitMessage);
850   c = GNUNET_malloc (sizeof (struct Client) + msize);
851   c->client_handle = client;
852   c->next = clients;
853   clients = c;
854   c->tcnt = msize / sizeof (uint16_t);
855   c->types = (const uint16_t *) &c[1];
856   wtypes = (uint16_t *) &c[1];
857   for (i=0;i<c->tcnt;i++)
858     wtypes[i] = ntohs (types[i]);
859   c->options = ntohl (im->options);
860 #if DEBUG_CORE_CLIENT
861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862               "Client %p is interested in %u message types\n",
863               c,
864               c->tcnt);
865 #endif
866   /* send init reply message */
867   irm.header.size = htons (sizeof (struct InitReplyMessage));
868   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
869   irm.reserved = htonl (0);
870   memcpy (&irm.publicKey,
871           &my_public_key,
872           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
873 #if DEBUG_CORE_CLIENT
874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
875               "Sending `%s' message to client.\n", "INIT_REPLY");
876 #endif
877   send_to_client (c, &irm.header, GNUNET_NO);
878   /* notify new client about existing neighbours */
879   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
880   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
881   n = neighbours;
882   while (n != NULL)
883     {
884       if (n->status == PEER_STATE_KEY_CONFIRMED)
885         {
886 #if DEBUG_CORE_CLIENT
887           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
888                       "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
889 #endif
890           cnm.distance = htonl (n->last_distance);
891           cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
892           cnm.peer = n->peer;
893           send_to_client (c, &cnm.header, GNUNET_NO);
894         }
895       n = n->next;
896     }
897   GNUNET_SERVER_receive_done (client, GNUNET_OK);
898 }
899
900
901 /**
902  * A client disconnected, clean up.
903  *
904  * @param cls closure
905  * @param client identification of the client
906  */
907 static void
908 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
909 {
910   struct Client *pos;
911   struct Client *prev;
912
913   if (client == NULL)
914     return;
915 #if DEBUG_CORE_CLIENT
916   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
917               "Client %p has disconnected from core service.\n",
918               client);
919 #endif
920   prev = NULL;
921   pos = clients;
922   while (pos != NULL)
923     {
924       if (client == pos->client_handle)
925         {
926           if (prev == NULL)
927             clients = pos->next;
928           else
929             prev->next = pos->next;
930           GNUNET_free (pos);
931           return;
932         }
933       prev = pos;
934       pos = pos->next;
935     }
936   /* client never sent INIT */
937 }
938
939
940 /**
941  * Handle REQUEST_INFO request.
942  */
943 static void
944 handle_client_request_info (void *cls,
945                             struct GNUNET_SERVER_Client *client,
946                             const struct GNUNET_MessageHeader *message)
947 {
948   const struct RequestInfoMessage *rcm;
949   struct Neighbour *n;
950   struct ConfigurationInfoMessage cim;
951   int32_t want_reserv;
952   int32_t got_reserv;
953   unsigned long long old_preference;
954   struct GNUNET_SERVER_TransmitContext *tc;
955
956 #if DEBUG_CORE_CLIENT
957   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958               "Core service receives `%s' request.\n", "REQUEST_INFO");
959 #endif
960   rcm = (const struct RequestInfoMessage *) message;
961   n = find_neighbour (&rcm->peer);
962   memset (&cim, 0, sizeof (cim));
963   if (n != NULL) 
964     {
965       want_reserv = ntohl (rcm->reserve_inbound);
966       if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
967         {
968           n->bw_out_internal_limit = rcm->limit_outbound;
969           n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
970                                                   n->bw_out_external_limit);
971           GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
972                                                  n->bw_out);
973           GNUNET_TRANSPORT_set_quota (transport,
974                                       &n->peer,
975                                       n->bw_in,
976                                       n->bw_out,
977                                       GNUNET_TIME_UNIT_FOREVER_REL,
978                                       NULL, NULL); 
979         }
980       if (want_reserv < 0)
981         {
982           got_reserv = want_reserv;
983         }
984       else if (want_reserv > 0)
985         {
986           if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
987                                                   want_reserv).value == 0)
988             got_reserv = want_reserv;
989           else
990             got_reserv = 0; /* all or nothing */
991         }
992       else
993         got_reserv = 0;
994       GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window,
995                                         got_reserv);
996       old_preference = n->current_preference;
997       n->current_preference += GNUNET_ntohll(rcm->preference_change);
998       if (old_preference > n->current_preference) 
999         {
1000           /* overflow; cap at maximum value */
1001           n->current_preference = (unsigned long long) -1;
1002         }
1003       update_preference_sum (n->current_preference - old_preference);
1004 #if DEBUG_CORE_QUOTA
1005       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006                   "Received reservation request for %d bytes for peer `%4s', reserved %d bytes\n",
1007                   (int) want_reserv,
1008                   GNUNET_i2s (&rcm->peer),
1009                   (int) got_reserv);
1010 #endif
1011       cim.reserved_amount = htonl (got_reserv);
1012       cim.bw_in = n->bw_in;
1013       cim.bw_out = n->bw_out;
1014       cim.preference = n->current_preference;
1015     }
1016   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1017   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1018   cim.peer = rcm->peer;
1019
1020 #if DEBUG_CORE_CLIENT
1021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1023 #endif
1024   tc = GNUNET_SERVER_transmit_context_create (client);
1025   GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
1026   GNUNET_SERVER_transmit_context_run (tc,
1027                                       GNUNET_TIME_UNIT_FOREVER_REL);
1028 }
1029
1030
1031 /**
1032  * Free the given entry for the neighbour (it has
1033  * already been removed from the list at this point).
1034  *
1035  * @param n neighbour to free
1036  */
1037 static void
1038 free_neighbour (struct Neighbour *n)
1039 {
1040   struct MessageEntry *m;
1041
1042   if (n->pitr != NULL)
1043     {
1044       GNUNET_PEERINFO_iterate_cancel (n->pitr);
1045       n->pitr = NULL;
1046     }
1047   if (n->skm != NULL)
1048     {
1049       GNUNET_free (n->skm);
1050       n->skm = NULL;
1051     }
1052   while (NULL != (m = n->messages))
1053     {
1054       n->messages = m->next;
1055       GNUNET_free (m);
1056     }
1057   while (NULL != (m = n->encrypted_head))
1058     {
1059       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1060                                    n->encrypted_tail,
1061                                    m);
1062       GNUNET_free (m);
1063     }
1064   if (NULL != n->th)
1065     {
1066       GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
1067       n->th = NULL;
1068     }
1069   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1070     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1071   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
1072     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
1073   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
1074     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
1075   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1076     GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1077   GNUNET_free_non_null (n->public_key);
1078   GNUNET_free_non_null (n->pending_ping);
1079   GNUNET_free_non_null (n->pending_pong);
1080   GNUNET_free (n);
1081 }
1082
1083
1084 /**
1085  * Consider freeing the given neighbour since we may not need
1086  * to keep it around anymore.
1087  *
1088  * @param n neighbour to consider discarding
1089  */
1090 static void
1091 consider_free_neighbour (struct Neighbour *n);
1092
1093
1094 /**
1095  * Task triggered when a neighbour entry might have gotten stale.
1096  *
1097  * @param cls the 'struct Neighbour'
1098  * @param tc scheduler context (not used)
1099  */
1100 static void
1101 consider_free_task (void *cls,
1102                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1103 {
1104   struct Neighbour *n = cls;
1105   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
1106   consider_free_neighbour (n);
1107 }
1108
1109
1110 /**
1111  * Consider freeing the given neighbour since we may not need
1112  * to keep it around anymore.
1113  *
1114  * @param n neighbour to consider discarding
1115  */
1116 static void
1117 consider_free_neighbour (struct Neighbour *n)
1118
1119   struct Neighbour *pos;
1120   struct Neighbour *prev;
1121   struct GNUNET_TIME_Relative left;
1122
1123   if ( (n->th != NULL) ||
1124        (n->pitr != NULL) ||
1125        (n->status == PEER_STATE_KEY_CONFIRMED) ||
1126        (GNUNET_YES == n->is_connected) )
1127     return; /* no chance */
1128   
1129   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1130                                                                        MAX_PONG_DELAY));
1131   if (left.value > 0)
1132     {
1133       if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1134         GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1135       n->dead_clean_task = GNUNET_SCHEDULER_add_delayed (sched,
1136                                                          left,
1137                                                          &consider_free_task,
1138                                                          n);
1139       return;
1140     }
1141   /* actually free the neighbour... */
1142   prev = NULL;
1143   pos = neighbours;
1144   while (pos != n)
1145     {
1146       prev = pos;
1147       pos = pos->next;
1148     }
1149   if (prev == NULL)
1150     neighbours = n->next;
1151   else
1152     prev->next = n->next;
1153   GNUNET_assert (neighbour_count > 0);
1154   neighbour_count--;
1155   free_neighbour (n);
1156 }
1157
1158
1159 /**
1160  * Check if we have encrypted messages for the specified neighbour
1161  * pending, and if so, check with the transport about sending them
1162  * out.
1163  *
1164  * @param n neighbour to check.
1165  */
1166 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1167
1168
1169 /**
1170  * Function called when the transport service is ready to
1171  * receive an encrypted message for the respective peer
1172  *
1173  * @param cls neighbour to use message from
1174  * @param size number of bytes we can transmit
1175  * @param buf where to copy the message
1176  * @return number of bytes transmitted
1177  */
1178 static size_t
1179 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1180 {
1181   struct Neighbour *n = cls;
1182   struct MessageEntry *m;
1183   size_t ret;
1184   char *cbuf;
1185
1186   n->th = NULL;
1187   GNUNET_assert (NULL != (m = n->encrypted_head));
1188   GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1189                                n->encrypted_tail,
1190                                m);
1191   ret = 0;
1192   cbuf = buf;
1193   if (buf != NULL)
1194     {
1195       GNUNET_assert (size >= m->size);
1196       memcpy (cbuf, &m[1], m->size);
1197       ret = m->size;
1198       GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window,
1199                                         m->size);
1200 #if DEBUG_CORE
1201       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1202                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1203                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1204                   ret, GNUNET_i2s (&n->peer));
1205 #endif
1206       process_encrypted_neighbour_queue (n);
1207     }
1208   else
1209     {
1210 #if DEBUG_CORE
1211       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212                   "Transmission of message of type %u and size %u failed\n",
1213                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1214                   m->size);
1215 #endif
1216     }
1217   GNUNET_free (m);
1218   consider_free_neighbour (n);
1219   return ret;
1220 }
1221
1222
1223 /**
1224  * Check if we have plaintext messages for the specified neighbour
1225  * pending, and if so, consider batching and encrypting them (and
1226  * then trigger processing of the encrypted queue if needed).
1227  *
1228  * @param n neighbour to check.
1229  */
1230 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1231
1232
1233 /**
1234  * Check if we have encrypted messages for the specified neighbour
1235  * pending, and if so, check with the transport about sending them
1236  * out.
1237  *
1238  * @param n neighbour to check.
1239  */
1240 static void
1241 process_encrypted_neighbour_queue (struct Neighbour *n)
1242 {
1243   struct MessageEntry *m;
1244  
1245   if (n->th != NULL)
1246     return;  /* request already pending */
1247   m = n->encrypted_head;
1248   if (m == NULL)
1249     {
1250       /* encrypted queue empty, try plaintext instead */
1251       process_plaintext_neighbour_queue (n);
1252       return;
1253     }
1254 #if DEBUG_CORE
1255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1256               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1257               m->size,
1258               GNUNET_i2s (&n->peer),
1259               GNUNET_TIME_absolute_get_remaining (m->deadline).
1260               value);
1261 #endif
1262   n->th =
1263     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1264                                             m->size,
1265                                             m->priority,
1266                                             GNUNET_TIME_absolute_get_remaining
1267                                             (m->deadline),
1268                                             &notify_encrypted_transmit_ready,
1269                                             n);
1270   if (n->th == NULL)
1271     {
1272       /* message request too large or duplicate request */
1273       GNUNET_break (0);
1274       /* discard encrypted message */
1275       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1276                                    n->encrypted_tail,
1277                                    m);
1278       GNUNET_free (m);
1279       process_encrypted_neighbour_queue (n);
1280     }
1281 }
1282
1283
1284 /**
1285  * Decrypt size bytes from in and write the result to out.  Use the
1286  * key for inbound traffic of the given neighbour.  This function does
1287  * NOT do any integrity-checks on the result.
1288  *
1289  * @param n neighbour we are receiving from
1290  * @param iv initialization vector to use
1291  * @param in ciphertext
1292  * @param out plaintext
1293  * @param size size of in/out
1294  * @return GNUNET_OK on success
1295  */
1296 static int
1297 do_decrypt (struct Neighbour *n,
1298             const GNUNET_HashCode * iv,
1299             const void *in, void *out, size_t size)
1300 {
1301   if (size != (uint16_t) size)
1302     {
1303       GNUNET_break (0);
1304       return GNUNET_NO;
1305     }
1306   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1307       (n->status != PEER_STATE_KEY_CONFIRMED))
1308     {
1309       GNUNET_break_op (0);
1310       return GNUNET_SYSERR;
1311     }
1312   if (size !=
1313       GNUNET_CRYPTO_aes_decrypt (in,
1314                                  (uint16_t) size,
1315                                  &n->decrypt_key,
1316                                  (const struct
1317                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1318                                  out))
1319     {
1320       GNUNET_break (0);
1321       return GNUNET_SYSERR;
1322     }
1323 #if DEBUG_CORE
1324   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1325               "Decrypted %u bytes from `%4s' using key %u\n",
1326               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1327 #endif
1328   return GNUNET_OK;
1329 }
1330
1331
1332 /**
1333  * Encrypt size bytes from in and write the result to out.  Use the
1334  * key for outbound traffic of the given neighbour.
1335  *
1336  * @param n neighbour we are sending to
1337  * @param iv initialization vector to use
1338  * @param in ciphertext
1339  * @param out plaintext
1340  * @param size size of in/out
1341  * @return GNUNET_OK on success
1342  */
1343 static int
1344 do_encrypt (struct Neighbour *n,
1345             const GNUNET_HashCode * iv,
1346             const void *in, void *out, size_t size)
1347 {
1348   if (size != (uint16_t) size)
1349     {
1350       GNUNET_break (0);
1351       return GNUNET_NO;
1352     }
1353   GNUNET_assert (size ==
1354                  GNUNET_CRYPTO_aes_encrypt (in,
1355                                             (uint16_t) size,
1356                                             &n->encrypt_key,
1357                                             (const struct
1358                                              GNUNET_CRYPTO_AesInitializationVector
1359                                              *) iv, out));
1360 #if DEBUG_CORE
1361   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1362               "Encrypted %u bytes for `%4s' using key %u\n", size,
1363               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1364 #endif
1365   return GNUNET_OK;
1366 }
1367
1368
1369 /**
1370  * Select messages for transmission.  This heuristic uses a combination
1371  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1372  * and priority-based discard (in case no feasible schedule exist) and
1373  * speculative optimization (defer any kind of transmission until
1374  * we either create a batch of significant size, 25% of max, or until
1375  * we are close to a deadline).  Furthermore, when scheduling the
1376  * heuristic also packs as many messages into the batch as possible,
1377  * starting with those with the earliest deadline.  Yes, this is fun.
1378  *
1379  * @param n neighbour to select messages from
1380  * @param size number of bytes to select for transmission
1381  * @param retry_time set to the time when we should try again
1382  *        (only valid if this function returns zero)
1383  * @return number of bytes selected, or 0 if we decided to
1384  *         defer scheduling overall; in that case, retry_time is set.
1385  */
1386 static size_t
1387 select_messages (struct Neighbour *n,
1388                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1389 {
1390   struct MessageEntry *pos;
1391   struct MessageEntry *min;
1392   struct MessageEntry *last;
1393   unsigned int min_prio;
1394   struct GNUNET_TIME_Absolute t;
1395   struct GNUNET_TIME_Absolute now;
1396   struct GNUNET_TIME_Relative delta;
1397   uint64_t avail;
1398   struct GNUNET_TIME_Relative slack;     /* how long could we wait before missing deadlines? */
1399   size_t off;
1400   uint64_t tsize;
1401   unsigned int queue_size;
1402   int discard_low_prio;
1403
1404   GNUNET_assert (NULL != n->messages);
1405   now = GNUNET_TIME_absolute_get ();
1406   /* last entry in linked list of messages processed */
1407   last = NULL;
1408   /* should we remove the entry with the lowest
1409      priority from consideration for scheduling at the
1410      end of the loop? */
1411   queue_size = 0;
1412   tsize = 0;
1413   pos = n->messages;
1414   while (pos != NULL)
1415     {
1416       queue_size++;
1417       tsize += pos->size;
1418       pos = pos->next;
1419     }
1420   discard_low_prio = GNUNET_YES;
1421   while (GNUNET_YES == discard_low_prio)
1422     {
1423       min = NULL;
1424       min_prio = -1;
1425       discard_low_prio = GNUNET_NO;
1426       /* calculate number of bytes available for transmission at time "t" */
1427       avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
1428       t = now;
1429       /* how many bytes have we (hypothetically) scheduled so far */
1430       off = 0;
1431       /* maximum time we can wait before transmitting anything
1432          and still make all of our deadlines */
1433       slack = MAX_CORK_DELAY;
1434       pos = n->messages;
1435       /* note that we use "*2" here because we want to look
1436          a bit further into the future; much more makes no
1437          sense since new message might be scheduled in the
1438          meantime... */
1439       while ((pos != NULL) && (off < size * 2))
1440         {         
1441           if (pos->do_transmit == GNUNET_YES)
1442             {
1443               /* already removed from consideration */
1444               pos = pos->next;
1445               continue;
1446             }
1447           if (discard_low_prio == GNUNET_NO)
1448             {
1449               delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
1450               if (delta.value > 0)
1451                 {
1452                   // FIXME: HUH? Check!
1453                   t = pos->deadline;
1454                   avail += GNUNET_BANDWIDTH_value_get_available_until (n->bw_out,
1455                                                                        delta);
1456                 }
1457               if (avail < pos->size)
1458                 {
1459                   // FIXME: HUH? Check!
1460                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1461                 }
1462               else
1463                 {
1464                   avail -= pos->size;
1465                   /* update slack, considering both its absolute deadline
1466                      and relative deadlines caused by other messages
1467                      with their respective load */
1468                   slack = GNUNET_TIME_relative_min (slack,
1469                                                     GNUNET_BANDWIDTH_value_get_delay_for (n->bw_out,
1470                                                                                           avail));
1471                   if (pos->deadline.value <= now.value) 
1472                     {
1473                       /* now or never */
1474                       slack = GNUNET_TIME_UNIT_ZERO;
1475                     }
1476                   else if (GNUNET_YES == pos->got_slack)
1477                     {
1478                       /* should be soon now! */
1479                       slack = GNUNET_TIME_relative_min (slack,
1480                                                         GNUNET_TIME_absolute_get_remaining (pos->slack_deadline));
1481                     }
1482                   else
1483                     {
1484                       slack =
1485                         GNUNET_TIME_relative_min (slack, 
1486                                                   GNUNET_TIME_absolute_get_difference (now, pos->deadline));
1487                       pos->got_slack = GNUNET_YES;
1488                       pos->slack_deadline = GNUNET_TIME_absolute_min (pos->deadline,
1489                                                                       GNUNET_TIME_relative_to_absolute (MAX_CORK_DELAY));
1490                     }
1491                 }
1492             }
1493           off += pos->size;
1494           t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check!
1495           if (pos->priority <= min_prio)
1496             {
1497               /* update min for discard */
1498               min_prio = pos->priority;
1499               min = pos;
1500             }
1501           pos = pos->next;
1502         }
1503       if (discard_low_prio)
1504         {
1505           GNUNET_assert (min != NULL);
1506           /* remove lowest-priority entry from consideration */
1507           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1508         }
1509       last = pos;
1510     }
1511   /* guard against sending "tiny" messages with large headers without
1512      urgent deadlines */
1513   if ( (slack.value > 0) && 
1514        (size > 4 * off) &&
1515        (queue_size <= MAX_PEER_QUEUE_SIZE - 2) )
1516     {
1517       /* less than 25% of message would be filled with deadlines still
1518          being met if we delay by one second or more; so just wait for
1519          more data; but do not wait longer than 1s (since we don't want
1520          to delay messages for a really long time either). */
1521       *retry_time = MAX_CORK_DELAY;
1522       /* reset do_transmit values for next time */
1523       while (pos != last)
1524         {
1525           pos->do_transmit = GNUNET_NO;   
1526           pos = pos->next;
1527         }
1528 #if DEBUG_CORE
1529       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1530                   "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
1531                   (unsigned long long) slack.value,
1532                   (unsigned int) off,
1533                   (unsigned int) size);
1534 #endif
1535       return 0;
1536     }
1537   /* select marked messages (up to size) for transmission */
1538   off = 0;
1539   pos = n->messages;
1540   while (pos != last)
1541     {
1542       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1543         {
1544           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1545           off += pos->size;
1546           size -= pos->size;
1547         }
1548       else
1549         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1550       pos = pos->next;
1551     }
1552 #if DEBUG_CORE
1553   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1554               "Selected %u/%u bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
1555               off, tsize,
1556               queue_size, MAX_PEER_QUEUE_SIZE,
1557               GNUNET_i2s (&n->peer));
1558 #endif
1559   return off;
1560 }
1561
1562
1563 /**
1564  * Batch multiple messages into a larger buffer.
1565  *
1566  * @param n neighbour to take messages from
1567  * @param buf target buffer
1568  * @param size size of buf
1569  * @param deadline set to transmission deadline for the result
1570  * @param retry_time set to the time when we should try again
1571  *        (only valid if this function returns zero)
1572  * @param priority set to the priority of the batch
1573  * @return number of bytes written to buf (can be zero)
1574  */
1575 static size_t
1576 batch_message (struct Neighbour *n,
1577                char *buf,
1578                size_t size,
1579                struct GNUNET_TIME_Absolute *deadline,
1580                struct GNUNET_TIME_Relative *retry_time,
1581                unsigned int *priority)
1582 {
1583   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1584   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
1585   struct MessageEntry *pos;
1586   struct MessageEntry *prev;
1587   struct MessageEntry *next;
1588   size_t ret;
1589   
1590   ret = 0;
1591   *priority = 0;
1592   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1593   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1594   if (0 == select_messages (n, size, retry_time))
1595     {
1596       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1597                   "No messages selected, will try again in %llu ms\n",
1598                   retry_time->value);
1599       return 0;
1600     }
1601   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
1602   ntm->distance = htonl (n->last_distance);
1603   ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
1604   ntm->peer = n->peer;
1605   
1606   pos = n->messages;
1607   prev = NULL;
1608   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1609     {
1610       next = pos->next;
1611       if (GNUNET_YES == pos->do_transmit)
1612         {
1613           GNUNET_assert (pos->size <= size);
1614           /* do notifications */
1615           /* FIXME: track if we have *any* client that wants
1616              full notifications and only do this if that is
1617              actually true */
1618           if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
1619             {
1620               memcpy (&ntm[1], &pos[1], pos->size);
1621               ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1622                                         sizeof (struct GNUNET_MessageHeader));
1623               send_to_all_clients (&ntm->header,
1624                                    GNUNET_YES,
1625                                    GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
1626             }
1627           else
1628             {
1629               /* message too large for 'full' notifications, we do at
1630                  least the 'hdr' type */
1631               memcpy (&ntm[1],
1632                       &pos[1],
1633                       sizeof (struct GNUNET_MessageHeader));
1634             }
1635           ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1636                                     pos->size);
1637           send_to_all_clients (&ntm->header,
1638                                GNUNET_YES,
1639                                GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);   
1640 #if DEBUG_HANDSHAKE
1641           fprintf (stderr,
1642                    "Encrypting message of type %u\n",
1643                    ntohs(((struct GNUNET_MessageHeader*)&pos[1])->type));
1644 #endif
1645           /* copy for encrypted transmission */
1646           memcpy (&buf[ret], &pos[1], pos->size);
1647           ret += pos->size;
1648           size -= pos->size;
1649           *priority += pos->priority;
1650 #if DEBUG_CORE
1651           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1652                       "Adding plaintext message of size %u with deadline %llu ms to batch\n",
1653                       pos->size,
1654                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1655 #endif
1656           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1657           GNUNET_free (pos);
1658           if (prev == NULL)
1659             n->messages = next;
1660           else
1661             prev->next = next;
1662         }
1663       else
1664         {
1665           prev = pos;
1666         }
1667       pos = next;
1668     }
1669 #if DEBUG_CORE
1670   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1671               "Deadline for message batch is %llu ms\n",
1672               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1673 #endif
1674   return ret;
1675 }
1676
1677
1678 /**
1679  * Remove messages with deadlines that have long expired from
1680  * the queue.
1681  *
1682  * @param n neighbour to inspect
1683  */
1684 static void
1685 discard_expired_messages (struct Neighbour *n)
1686 {
1687   struct MessageEntry *prev;
1688   struct MessageEntry *next;
1689   struct MessageEntry *pos;
1690   struct GNUNET_TIME_Absolute now;
1691   struct GNUNET_TIME_Relative delta;
1692
1693   now = GNUNET_TIME_absolute_get ();
1694   prev = NULL;
1695   pos = n->messages;
1696   while (pos != NULL) 
1697     {
1698       next = pos->next;
1699       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1700       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1701         {
1702 #if DEBUG_CORE
1703           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1704                       "Message is %llu ms past due, discarding.\n",
1705                       delta.value);
1706 #endif
1707           if (prev == NULL)
1708             n->messages = next;
1709           else
1710             prev->next = next;
1711           GNUNET_free (pos);
1712         }
1713       else
1714         prev = pos;
1715       pos = next;
1716     }
1717 }
1718
1719
1720 /**
1721  * Signature of the main function of a task.
1722  *
1723  * @param cls closure
1724  * @param tc context information (why was this task triggered now)
1725  */
1726 static void
1727 retry_plaintext_processing (void *cls,
1728                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1729 {
1730   struct Neighbour *n = cls;
1731
1732   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1733   process_plaintext_neighbour_queue (n);
1734 }
1735
1736
1737 /**
1738  * Send our key (and encrypted PING) to the other peer.
1739  *
1740  * @param n the other peer
1741  */
1742 static void send_key (struct Neighbour *n);
1743
1744 /**
1745  * Task that will retry "send_key" if our previous attempt failed
1746  * to yield a PONG.
1747  */
1748 static void
1749 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1750 {
1751   struct Neighbour *n = cls;
1752
1753 #if DEBUG_CORE
1754   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1755               "Retrying key transmission to `%4s'\n",
1756               GNUNET_i2s (&n->peer));
1757 #endif
1758   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
1759   n->set_key_retry_frequency =
1760     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
1761   send_key (n);
1762 }
1763
1764
1765 /**
1766  * Check if we have plaintext messages for the specified neighbour
1767  * pending, and if so, consider batching and encrypting them (and
1768  * then trigger processing of the encrypted queue if needed).
1769  *
1770  * @param n neighbour to check.
1771  */
1772 static void
1773 process_plaintext_neighbour_queue (struct Neighbour *n)
1774 {
1775   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1776   size_t used;
1777   size_t esize;
1778   struct EncryptedMessage *em;  /* encrypted message */
1779   struct EncryptedMessage *ph;  /* plaintext header */
1780   struct MessageEntry *me;
1781   unsigned int priority;
1782   struct GNUNET_TIME_Absolute deadline;
1783   struct GNUNET_TIME_Relative retry_time;
1784   GNUNET_HashCode iv;
1785
1786   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1787     {
1788       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1789       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1790     }
1791   switch (n->status)
1792     {
1793     case PEER_STATE_DOWN:
1794       send_key (n);
1795 #if DEBUG_CORE
1796       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1797                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1798                   GNUNET_i2s(&n->peer));
1799 #endif
1800       return;
1801     case PEER_STATE_KEY_SENT:
1802       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
1803         n->retry_set_key_task
1804           = GNUNET_SCHEDULER_add_delayed (sched,
1805                                           n->set_key_retry_frequency,
1806                                           &set_key_retry_task, n);    
1807 #if DEBUG_CORE
1808       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1809                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1810                   GNUNET_i2s(&n->peer));
1811 #endif
1812       return;
1813     case PEER_STATE_KEY_RECEIVED:
1814       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)        
1815         n->retry_set_key_task
1816           = GNUNET_SCHEDULER_add_delayed (sched,
1817                                           n->set_key_retry_frequency,
1818                                           &set_key_retry_task, n);        
1819 #if DEBUG_CORE
1820       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1821                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1822                   GNUNET_i2s(&n->peer));
1823 #endif
1824       return;
1825     case PEER_STATE_KEY_CONFIRMED:
1826       /* ready to continue */
1827       break;
1828     }
1829   discard_expired_messages (n);
1830   if (n->messages == NULL)
1831     {
1832 #if DEBUG_CORE
1833       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1834                   "Plaintext message queue for `%4s' is empty.\n",
1835                   GNUNET_i2s(&n->peer));
1836 #endif
1837       return;                   /* no pending messages */
1838     }
1839   if (n->encrypted_head != NULL)
1840     {
1841 #if DEBUG_CORE
1842       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1843                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1844                   GNUNET_i2s(&n->peer));
1845 #endif
1846       return;                   /* wait for messages already encrypted to be
1847                                    processed first! */
1848     }
1849   ph = (struct EncryptedMessage *) pbuf;
1850   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1851   priority = 0;
1852   used = sizeof (struct EncryptedMessage);
1853   used += batch_message (n,
1854                          &pbuf[used],
1855                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1856                          &deadline, &retry_time, &priority);
1857   if (used == sizeof (struct EncryptedMessage))
1858     {
1859 #if DEBUG_CORE
1860       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1861                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1862                   GNUNET_i2s(&n->peer));
1863 #endif
1864       /* no messages selected for sending, try again later... */
1865       n->retry_plaintext_task =
1866         GNUNET_SCHEDULER_add_delayed (sched,
1867                                       retry_time,
1868                                       &retry_plaintext_processing, n);
1869       return;
1870     }
1871 #if DEBUG_CORE_QUOTA
1872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1873               "Sending %u b/s as new limit to peer `%4s'\n",
1874               (unsigned int) ntohl (n->bw_in.value__),
1875               GNUNET_i2s (&n->peer));
1876 #endif
1877   ph->iv_seed = htonl (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1));
1878   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1879   ph->inbound_bw_limit = n->bw_in;
1880   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1881
1882   /* setup encryption message header */
1883   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1884   me->deadline = deadline;
1885   me->priority = priority;
1886   me->size = used;
1887   em = (struct EncryptedMessage *) &me[1];
1888   em->header.size = htons (used);
1889   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1890   em->iv_seed = ph->iv_seed;
1891   esize = used - ENCRYPTED_HEADER_SIZE;
1892   GNUNET_CRYPTO_hash (&ph->sequence_number,
1893                       esize - sizeof (GNUNET_HashCode), 
1894                       &ph->plaintext_hash);
1895   GNUNET_CRYPTO_hash (&ph->iv_seed, sizeof (uint32_t), &iv);
1896   /* encrypt */
1897 #if DEBUG_CORE
1898   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1899               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1900               esize,
1901               GNUNET_i2s(&n->peer),
1902               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1903 #endif
1904   GNUNET_assert (GNUNET_OK ==
1905                  do_encrypt (n,
1906                              &iv,
1907                              &ph->plaintext_hash,
1908                              &em->plaintext_hash, esize));
1909   /* append to transmission list */
1910   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
1911                                      n->encrypted_tail,
1912                                      n->encrypted_tail,
1913                                      me);
1914   process_encrypted_neighbour_queue (n);
1915 }
1916
1917
1918 /**
1919  * Function that recalculates the bandwidth quota for the
1920  * given neighbour and transmits it to the transport service.
1921  * 
1922  * @param cls neighbour for the quota update
1923  * @param tc context
1924  */
1925 static void
1926 neighbour_quota_update (void *cls,
1927                         const struct GNUNET_SCHEDULER_TaskContext *tc);
1928
1929
1930 /**
1931  * Schedule the task that will recalculate the bandwidth
1932  * quota for this peer (and possibly force a disconnect of
1933  * idle peers by calculating a bandwidth of zero).
1934  */
1935 static void
1936 schedule_quota_update (struct Neighbour *n)
1937 {
1938   GNUNET_assert (n->quota_update_task ==
1939                  GNUNET_SCHEDULER_NO_TASK);
1940   n->quota_update_task
1941     = GNUNET_SCHEDULER_add_delayed (sched,
1942                                     QUOTA_UPDATE_FREQUENCY,
1943                                     &neighbour_quota_update,
1944                                     n);
1945 }
1946
1947
1948 /**
1949  * Initialize a new 'struct Neighbour'.
1950  *
1951  * @param pid ID of the new neighbour
1952  * @return handle for the new neighbour
1953  */
1954 static struct Neighbour *
1955 create_neighbour (const struct GNUNET_PeerIdentity *pid)
1956 {
1957   struct Neighbour *n;
1958   struct GNUNET_TIME_Absolute now;
1959
1960   n = GNUNET_malloc (sizeof (struct Neighbour));
1961   n->next = neighbours;
1962   neighbours = n;
1963   neighbour_count++;
1964   n->peer = *pid;
1965   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
1966   now = GNUNET_TIME_absolute_get ();
1967   n->encrypt_key_created = now;
1968   n->last_activity = now;
1969   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
1970   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1971   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1972   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init ((uint32_t) - 1);
1973   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1974   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1975                                                 (uint32_t) - 1);
1976   neighbour_quota_update (n, NULL);
1977   return n;
1978 }
1979
1980
1981 /**
1982  * Handle CORE_SEND request.
1983  *
1984  * @param cls unused
1985  * @param client the client issuing the request
1986  * @param message the "struct SendMessage"
1987  */
1988 static void
1989 handle_client_send (void *cls,
1990                     struct GNUNET_SERVER_Client *client,
1991                     const struct GNUNET_MessageHeader *message)
1992 {
1993   const struct SendMessage *sm;
1994   struct Neighbour *n;
1995   struct MessageEntry *prev;
1996   struct MessageEntry *pos;
1997   struct MessageEntry *e; 
1998   struct MessageEntry *min_prio_entry;
1999   struct MessageEntry *min_prio_prev;
2000   unsigned int min_prio;
2001   unsigned int queue_size;
2002   uint16_t msize;
2003
2004   msize = ntohs (message->size);
2005   if (msize <
2006       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
2007     {
2008       GNUNET_break (0);
2009       if (client != NULL)
2010         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2011       return;
2012     }
2013   sm = (const struct SendMessage *) message;
2014   msize -= sizeof (struct SendMessage);
2015   n = find_neighbour (&sm->peer);
2016   if (n == NULL)
2017     n = create_neighbour (&sm->peer);
2018 #if DEBUG_CORE
2019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2020               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
2021               "SEND",
2022               msize, 
2023               GNUNET_i2s (&sm->peer));
2024 #endif
2025   /* bound queue size */
2026   discard_expired_messages (n);
2027   min_prio = (unsigned int) -1;
2028   min_prio_entry = NULL;
2029   min_prio_prev = NULL;
2030   queue_size = 0;
2031   prev = NULL;
2032   pos = n->messages;
2033   while (pos != NULL) 
2034     {
2035       if (pos->priority < min_prio)
2036         {
2037           min_prio_entry = pos;
2038           min_prio_prev = prev;
2039           min_prio = pos->priority;
2040         }
2041       queue_size++;
2042       prev = pos;
2043       pos = pos->next;
2044     }
2045   if (queue_size >= MAX_PEER_QUEUE_SIZE)
2046     {
2047       /* queue full */
2048       if (ntohl(sm->priority) <= min_prio)
2049         {
2050           /* discard new entry */
2051 #if DEBUG_CORE
2052           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2053                       "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
2054                       queue_size,
2055                       MAX_PEER_QUEUE_SIZE,
2056                       msize,
2057                       ntohs (message->type));
2058 #endif
2059           if (client != NULL)
2060             GNUNET_SERVER_receive_done (client, GNUNET_OK);
2061           return;
2062         }
2063       /* discard "min_prio_entry" */
2064 #if DEBUG_CORE
2065       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2066                   "Queue full, discarding existing older request\n");
2067 #endif
2068       if (min_prio_prev == NULL)
2069         n->messages = min_prio_entry->next;
2070       else
2071         min_prio_prev->next = min_prio_entry->next;      
2072       GNUNET_free (min_prio_entry);     
2073     }
2074
2075 #if DEBUG_CORE
2076   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2077               "Adding transmission request for `%4s' of size %u to queue\n",
2078               GNUNET_i2s (&sm->peer),
2079               msize);
2080 #endif  
2081   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
2082   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
2083   e->priority = ntohl (sm->priority);
2084   e->size = msize;
2085   memcpy (&e[1], &sm[1], msize);
2086
2087   /* insert, keep list sorted by deadline */
2088   prev = NULL;
2089   pos = n->messages;
2090   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
2091     {
2092       prev = pos;
2093       pos = pos->next;
2094     }
2095   if (prev == NULL)
2096     n->messages = e;
2097   else
2098     prev->next = e;
2099   e->next = pos;
2100
2101   /* consider scheduling now */
2102   process_plaintext_neighbour_queue (n);
2103   if (client != NULL)
2104     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2105 }
2106
2107
2108 /**
2109  * Function called when the transport service is ready to
2110  * receive a message.  Only resets 'n->th' to NULL.
2111  *
2112  * @param cls neighbour to use message from
2113  * @param size number of bytes we can transmit
2114  * @param buf where to copy the message
2115  * @return number of bytes transmitted
2116  */
2117 static size_t
2118 notify_transport_connect_done (void *cls, size_t size, void *buf)
2119 {
2120   struct Neighbour *n = cls;
2121
2122   n->th = NULL;
2123   if (buf == NULL)
2124     {
2125       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2126                   _("Failed to connect to `%4s': transport failed to connect\n"),
2127                   GNUNET_i2s (&n->peer));
2128       return 0;
2129     }
2130   send_key (n);
2131   return 0;
2132 }
2133
2134
2135 /**
2136  * Handle CORE_REQUEST_CONNECT request.
2137  *
2138  * @param cls unused
2139  * @param client the client issuing the request
2140  * @param message the "struct ConnectMessage"
2141  */
2142 static void
2143 handle_client_request_connect (void *cls,
2144                                struct GNUNET_SERVER_Client *client,
2145                                const struct GNUNET_MessageHeader *message)
2146 {
2147   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2148   struct Neighbour *n;
2149   struct GNUNET_TIME_Relative timeout;
2150
2151   if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2152     {
2153       GNUNET_break (0);
2154       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2155       return;
2156     }
2157   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2158   n = find_neighbour (&cm->peer);
2159   if (n == NULL)
2160     n = create_neighbour (&cm->peer);
2161   if ( (n->is_connected) ||
2162        (n->th != NULL) )
2163     return; /* already connected, or at least trying */
2164 #if DEBUG_CORE
2165   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2166               "Core received `%s' request for `%4s', will try to establish connection\n",
2167               "REQUEST_CONNECT",
2168               GNUNET_i2s (&cm->peer));
2169 #endif
2170   timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
2171   /* ask transport to connect to the peer */
2172   n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
2173                                                   &cm->peer,
2174                                                   sizeof (struct GNUNET_MessageHeader), 0,
2175                                                   timeout,
2176                                                   &notify_transport_connect_done,
2177                                                   n);
2178   GNUNET_break (NULL != n->th);
2179 }
2180
2181
2182 /**
2183  * List of handlers for the messages understood by this
2184  * service.
2185  */
2186 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2187   {&handle_client_init, NULL,
2188    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2189   {&handle_client_request_info, NULL,
2190    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2191    sizeof (struct RequestInfoMessage)},
2192   {&handle_client_send, NULL,
2193    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2194   {&handle_client_request_connect, NULL,
2195    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2196    sizeof (struct ConnectMessage)},
2197   {NULL, NULL, 0, 0}
2198 };
2199
2200
2201 /**
2202  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2203  * the neighbour's struct and retry send_key.  Or, if we did not get a
2204  * HELLO, just do nothing.
2205  *
2206  * @param cls the 'struct Neighbour' to retry sending the key for
2207  * @param peer the peer for which this is the HELLO
2208  * @param hello HELLO message of that peer
2209  * @param trust amount of trust we currently have in that peer
2210  */
2211 static void
2212 process_hello_retry_send_key (void *cls,
2213                               const struct GNUNET_PeerIdentity *peer,
2214                               const struct GNUNET_HELLO_Message *hello,
2215                               uint32_t trust)
2216 {
2217   struct Neighbour *n = cls;
2218
2219   if (peer == NULL)
2220     {
2221 #if DEBUG_CORE
2222       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2223                   "Entered `process_hello_retry_send_key' and `peer' is NULL!\n");
2224 #endif
2225       n->pitr = NULL;
2226       if (n->public_key != NULL)
2227         {
2228           send_key (n);
2229         }
2230       else
2231         {
2232           if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
2233             n->retry_set_key_task
2234               = GNUNET_SCHEDULER_add_delayed (sched,
2235                                               n->set_key_retry_frequency,
2236                                               &set_key_retry_task, n);
2237         }
2238       return;
2239     }
2240
2241 #if DEBUG_CORE
2242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2243               "Entered `process_hello_retry_send_key' for peer `%4s'\n",
2244               GNUNET_i2s (peer));
2245 #endif
2246   if (n->public_key != NULL)
2247     {
2248 #if DEBUG_CORE
2249       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2250               "already have public key for peer %s!! (so why are we here?)\n",
2251               GNUNET_i2s (peer));
2252 #endif
2253       return;
2254     }
2255
2256 #if DEBUG_CORE
2257   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2258               "Received new `%s' message for `%4s', initiating key exchange.\n",
2259               "HELLO",
2260               GNUNET_i2s (peer));
2261 #endif
2262   n->public_key =
2263     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2264   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2265     {
2266       GNUNET_free (n->public_key);
2267       n->public_key = NULL;
2268 #if DEBUG_CORE
2269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2270               "GNUNET_HELLO_get_key returned awfully\n");
2271 #endif
2272       return;
2273     }
2274 }
2275
2276
2277 /**
2278  * Send our key (and encrypted PING) to the other peer.
2279  *
2280  * @param n the other peer
2281  */
2282 static void
2283 send_key (struct Neighbour *n)
2284 {
2285   struct SetKeyMessage *sm;
2286   struct MessageEntry *me;
2287   struct PingMessage pp;
2288   struct PingMessage *pm;
2289
2290   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2291        (n->pitr != NULL) )
2292     {
2293 #if DEBUG_CORE
2294       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2295                   "Key exchange in progress with `%4s'.\n",
2296                   GNUNET_i2s (&n->peer));
2297 #endif
2298       return; /* already in progress */
2299     }
2300
2301 #if DEBUG_CORE
2302   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2303               "Asked to perform key exchange with `%4s'.\n",
2304               GNUNET_i2s (&n->peer));
2305 #endif
2306   if (n->public_key == NULL)
2307     {
2308       /* lookup n's public key, then try again */
2309 #if DEBUG_CORE
2310       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2311                   "Lacking public key for `%4s', trying to obtain one (send_key).\n",
2312                   GNUNET_i2s (&n->peer));
2313 #endif
2314       GNUNET_assert (n->pitr == NULL);
2315       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2316                                          sched,
2317                                          &n->peer,
2318                                          0,
2319                                          GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20),
2320                                          &process_hello_retry_send_key, n);
2321       return;
2322     }
2323   /* first, set key message */
2324   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2325                       sizeof (struct SetKeyMessage));
2326   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2327   me->priority = SET_KEY_PRIORITY;
2328   me->size = sizeof (struct SetKeyMessage);
2329   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2330                                      n->encrypted_tail,
2331                                      n->encrypted_tail,
2332                                      me);
2333   sm = (struct SetKeyMessage *) &me[1];
2334   sm->header.size = htons (sizeof (struct SetKeyMessage));
2335   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2336   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2337                                         PEER_STATE_KEY_SENT : n->status));
2338   sm->purpose.size =
2339     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2340            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2341            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2342            sizeof (struct GNUNET_PeerIdentity));
2343   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2344   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2345   sm->target = n->peer;
2346   GNUNET_assert (GNUNET_OK ==
2347                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2348                                             sizeof (struct
2349                                                     GNUNET_CRYPTO_AesSessionKey),
2350                                             n->public_key,
2351                                             &sm->encrypted_key));
2352   GNUNET_assert (GNUNET_OK ==
2353                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2354                                          &sm->signature));
2355
2356   /* second, encrypted PING message */
2357   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2358                       sizeof (struct PingMessage));
2359   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2360   me->priority = PING_PRIORITY;
2361   me->size = sizeof (struct PingMessage);
2362   n->encrypted_tail->next = me;
2363   n->encrypted_tail = me;
2364   pm = (struct PingMessage *) &me[1];
2365   pm->header.size = htons (sizeof (struct PingMessage));
2366   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2367   pp.challenge = htonl (n->ping_challenge);
2368   pp.target = n->peer;
2369 #if DEBUG_CORE
2370   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2371               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2372               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2373   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2374               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2375               "PING",
2376               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2377 #endif
2378   do_encrypt (n,
2379               &n->peer.hashPubKey,
2380               &pp.challenge,
2381               &pm->challenge,
2382               sizeof (struct PingMessage) -
2383               sizeof (struct GNUNET_MessageHeader));
2384   /* update status */
2385   switch (n->status)
2386     {
2387     case PEER_STATE_DOWN:
2388       n->status = PEER_STATE_KEY_SENT;
2389       break;
2390     case PEER_STATE_KEY_SENT:
2391       break;
2392     case PEER_STATE_KEY_RECEIVED:
2393       break;
2394     case PEER_STATE_KEY_CONFIRMED:
2395       break;
2396     default:
2397       GNUNET_break (0);
2398       break;
2399     }
2400 #if DEBUG_CORE
2401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2402               "Have %llu ms left for `%s' transmission.\n",
2403               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2404               "SET_KEY");
2405 #endif
2406   /* trigger queue processing */
2407   process_encrypted_neighbour_queue (n);
2408   if ( (n->status != PEER_STATE_KEY_CONFIRMED) &&
2409        (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task) )
2410     n->retry_set_key_task
2411       = GNUNET_SCHEDULER_add_delayed (sched,
2412                                       n->set_key_retry_frequency,
2413                                       &set_key_retry_task, n);    
2414 }
2415
2416
2417 /**
2418  * We received a SET_KEY message.  Validate and update
2419  * our key material and status.
2420  *
2421  * @param n the neighbour from which we received message m
2422  * @param m the set key message we received
2423  */
2424 static void
2425 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2426
2427
2428 /**
2429  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2430  * the neighbour's struct and retry handling the set_key message.  Or,
2431  * if we did not get a HELLO, just free the set key message.
2432  *
2433  * @param cls pointer to the set key message
2434  * @param peer the peer for which this is the HELLO
2435  * @param hello HELLO message of that peer
2436  * @param trust amount of trust we currently have in that peer
2437  */
2438 static void
2439 process_hello_retry_handle_set_key (void *cls,
2440                                     const struct GNUNET_PeerIdentity *peer,
2441                                     const struct GNUNET_HELLO_Message *hello,
2442                                     uint32_t trust)
2443 {
2444   struct Neighbour *n = cls;
2445   struct SetKeyMessage *sm = n->skm;
2446
2447   if (peer == NULL)
2448     {
2449       GNUNET_free (sm);
2450       n->skm = NULL;
2451       n->pitr = NULL;
2452       return;
2453     }
2454   if (n->public_key != NULL)
2455     return;                     /* multiple HELLOs match!? */
2456   n->public_key =
2457     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2458   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2459     {
2460       GNUNET_break_op (0);
2461       GNUNET_free (n->public_key);
2462       n->public_key = NULL;
2463       return;
2464     }
2465 #if DEBUG_CORE
2466   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2467               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2468               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2469 #endif
2470   handle_set_key (n, sm);
2471 }
2472
2473
2474 /**
2475  * We received a PING message.  Validate and transmit
2476  * PONG.
2477  *
2478  * @param n sender of the PING
2479  * @param m the encrypted PING message itself
2480  */
2481 static void
2482 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2483 {
2484   struct PingMessage t;
2485   struct PongMessage tx;
2486   struct PongMessage *tp;
2487   struct MessageEntry *me;
2488
2489 #if DEBUG_CORE
2490   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2491               "Core service receives `%s' request from `%4s'.\n",
2492               "PING", GNUNET_i2s (&n->peer));
2493 #endif
2494   if (GNUNET_OK !=
2495       do_decrypt (n,
2496                   &my_identity.hashPubKey,
2497                   &m->challenge,
2498                   &t.challenge,
2499                   sizeof (struct PingMessage) -
2500                   sizeof (struct GNUNET_MessageHeader)))
2501     return;
2502 #if DEBUG_CORE
2503   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2504               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2505               "PING",
2506               GNUNET_i2s (&t.target),
2507               ntohl (t.challenge), n->decrypt_key.crc32);
2508   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2509               "Target of `%s' request is `%4s'.\n",
2510               "PING", GNUNET_i2s (&t.target));
2511 #endif
2512   if (0 != memcmp (&t.target,
2513                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2514     {
2515       GNUNET_break_op (0);
2516       return;
2517     }
2518   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2519                       sizeof (struct PongMessage));
2520   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2521                                      n->encrypted_tail,
2522                                      n->encrypted_tail,
2523                                      me);
2524   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2525   me->priority = PONG_PRIORITY;
2526   me->size = sizeof (struct PongMessage);
2527   tx.reserved = htonl (0);
2528   tx.inbound_bw_limit = n->bw_in;
2529   tx.challenge = t.challenge;
2530   tx.target = t.target;
2531   tp = (struct PongMessage *) &me[1];
2532   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2533   tp->header.size = htons (sizeof (struct PongMessage));
2534   do_encrypt (n,
2535               &my_identity.hashPubKey,
2536               &tx.challenge,
2537               &tp->challenge,
2538               sizeof (struct PongMessage) -
2539               sizeof (struct GNUNET_MessageHeader));
2540 #if DEBUG_CORE
2541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2542               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2543               ntohl (t.challenge), n->encrypt_key.crc32);
2544 #endif
2545   /* trigger queue processing */
2546   process_encrypted_neighbour_queue (n);
2547 }
2548
2549
2550 /**
2551  * We received a PONG message.  Validate and update our status.
2552  *
2553  * @param n sender of the PONG
2554  * @param m the encrypted PONG message itself
2555  */
2556 static void
2557 handle_pong (struct Neighbour *n, 
2558              const struct PongMessage *m)
2559 {
2560   struct PongMessage t;
2561   struct ConnectNotifyMessage cnm;
2562
2563 #if DEBUG_CORE
2564   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2565               "Core service receives `%s' request from `%4s'.\n",
2566               "PONG", GNUNET_i2s (&n->peer));
2567 #endif
2568   if (GNUNET_OK !=
2569       do_decrypt (n,
2570                   &n->peer.hashPubKey,
2571                   &m->challenge,
2572                   &t.challenge,
2573                   sizeof (struct PongMessage) -
2574                   sizeof (struct GNUNET_MessageHeader)))
2575     return;
2576   if (0 != ntohl (t.reserved))
2577     {
2578       GNUNET_break_op (0);
2579       return;
2580     }
2581 #if DEBUG_CORE
2582   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2583               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2584               "PONG",
2585               GNUNET_i2s (&t.target),
2586               ntohl (t.challenge), n->decrypt_key.crc32);
2587 #endif
2588   if ((0 != memcmp (&t.target,
2589                     &n->peer,
2590                     sizeof (struct GNUNET_PeerIdentity))) ||
2591       (n->ping_challenge != ntohl (t.challenge)))
2592     {
2593       /* PONG malformed */
2594 #if DEBUG_CORE
2595       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2596                   "Received malformed `%s' wanted sender `%4s' with challenge %u\n",
2597                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2598       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2599                   "Received malformed `%s' received from `%4s' with challenge %u\n",
2600                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2601 #endif
2602       GNUNET_break_op (0);
2603       return;
2604     }
2605   switch (n->status)
2606     {
2607     case PEER_STATE_DOWN:
2608       GNUNET_break (0);         /* should be impossible */
2609       return;
2610     case PEER_STATE_KEY_SENT:
2611       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2612       return;
2613     case PEER_STATE_KEY_RECEIVED:
2614       n->status = PEER_STATE_KEY_CONFIRMED;
2615       if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
2616         {
2617           n->bw_out_external_limit = t.inbound_bw_limit;
2618           n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
2619                                                   n->bw_out_internal_limit);
2620           GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
2621                                                  n->bw_out);       
2622           GNUNET_TRANSPORT_set_quota (transport,
2623                                       &n->peer,
2624                                       n->bw_in,
2625                                       n->bw_out,
2626                                       GNUNET_TIME_UNIT_FOREVER_REL,
2627                                       NULL, NULL); 
2628         }
2629 #if DEBUG_CORE
2630       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2631                   "Confirmed key via `%s' message for peer `%4s'\n",
2632                   "PONG", GNUNET_i2s (&n->peer));
2633 #endif
2634       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2635         {
2636           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2637           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2638         }      
2639       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2640       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2641       cnm.distance = htonl (n->last_distance);
2642       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2643       cnm.peer = n->peer;
2644       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2645       process_encrypted_neighbour_queue (n);
2646       break;
2647     case PEER_STATE_KEY_CONFIRMED:
2648       /* duplicate PONG? */
2649       break;
2650     default:
2651       GNUNET_break (0);
2652       break;
2653     }
2654 }
2655
2656
2657 /**
2658  * We received a SET_KEY message.  Validate and update
2659  * our key material and status.
2660  *
2661  * @param n the neighbour from which we received message m
2662  * @param m the set key message we received
2663  */
2664 static void
2665 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2666 {
2667   struct SetKeyMessage *m_cpy;
2668   struct GNUNET_TIME_Absolute t;
2669   struct GNUNET_CRYPTO_AesSessionKey k;
2670   struct PingMessage *ping;
2671   struct PongMessage *pong;
2672   enum PeerStateMachine sender_status;
2673
2674 #if DEBUG_CORE
2675   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2676               "Core service receives `%s' request from `%4s'.\n",
2677               "SET_KEY", GNUNET_i2s (&n->peer));
2678 #endif
2679   if (n->public_key == NULL)
2680     {
2681       if (n->pitr != NULL)
2682         {
2683 #if DEBUG_CORE
2684           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2685                       "Ignoring `%s' message due to lack of public key for peer (still trying to obtain one).\n",
2686                       "SET_KEY");
2687 #endif
2688           return;
2689         }
2690 #if DEBUG_CORE
2691       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2692                   "Lacking public key for peer, trying to obtain one (handle_set_key).\n");
2693 #endif
2694       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2695       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2696       /* lookup n's public key, then try again */
2697       GNUNET_assert (n->skm == NULL);
2698       n->skm = m_cpy;
2699       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2700                                          sched,
2701                                          &n->peer,
2702                                          0,
2703                                          GNUNET_TIME_UNIT_MINUTES,
2704                                          &process_hello_retry_handle_set_key, n);
2705       return;
2706     }
2707   if (0 != memcmp (&m->target,
2708                    &my_identity,
2709                    sizeof (struct GNUNET_PeerIdentity)))
2710     {
2711       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2712                   _("Received `%s' message that was not for me.  Ignoring.\n"),
2713                   "SET_KEY");
2714       return;
2715     }
2716   if ((ntohl (m->purpose.size) !=
2717        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2718        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2719        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2720        sizeof (struct GNUNET_PeerIdentity)) ||
2721       (GNUNET_OK !=
2722        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2723                                  &m->purpose, &m->signature, n->public_key)))
2724     {
2725       /* invalid signature */
2726       GNUNET_break_op (0);
2727       return;
2728     }
2729   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2730   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2731        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2732       (t.value < n->decrypt_key_created.value))
2733     {
2734       /* this could rarely happen due to massive re-ordering of
2735          messages on the network level, but is most likely either
2736          a bug or some adversary messing with us.  Report. */
2737       GNUNET_break_op (0);
2738       return;
2739     }
2740 #if DEBUG_CORE
2741   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2742               "Decrypting key material.\n");
2743 #endif  
2744   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2745                                   &m->encrypted_key,
2746                                   &k,
2747                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2748        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2749       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2750     {
2751       /* failed to decrypt !? */
2752       GNUNET_break_op (0);
2753       return;
2754     }
2755
2756   n->decrypt_key = k;
2757   if (n->decrypt_key_created.value != t.value)
2758     {
2759       /* fresh key, reset sequence numbers */
2760       n->last_sequence_number_received = 0;
2761       n->last_packets_bitmap = 0;
2762       n->decrypt_key_created = t;
2763     }
2764   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2765   switch (n->status)
2766     {
2767     case PEER_STATE_DOWN:
2768       n->status = PEER_STATE_KEY_RECEIVED;
2769 #if DEBUG_CORE
2770       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2771                   "Responding to `%s' with my own key.\n", "SET_KEY");
2772 #endif
2773       send_key (n);
2774       break;
2775     case PEER_STATE_KEY_SENT:
2776     case PEER_STATE_KEY_RECEIVED:
2777       n->status = PEER_STATE_KEY_RECEIVED;
2778       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2779           (sender_status != PEER_STATE_KEY_CONFIRMED))
2780         {
2781 #if DEBUG_CORE
2782           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2783                       "Responding to `%s' with my own key (other peer has status %u).\n",
2784                       "SET_KEY", sender_status);
2785 #endif
2786           send_key (n);
2787         }
2788       break;
2789     case PEER_STATE_KEY_CONFIRMED:
2790       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2791           (sender_status != PEER_STATE_KEY_CONFIRMED))
2792         {         
2793 #if DEBUG_CORE
2794           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2795                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2796                       "SET_KEY", sender_status);
2797 #endif
2798           send_key (n);
2799         }
2800       break;
2801     default:
2802       GNUNET_break (0);
2803       break;
2804     }
2805   if (n->pending_ping != NULL)
2806     {
2807       ping = n->pending_ping;
2808       n->pending_ping = NULL;
2809       handle_ping (n, ping);
2810       GNUNET_free (ping);
2811     }
2812   if (n->pending_pong != NULL)
2813     {
2814       pong = n->pending_pong;
2815       n->pending_pong = NULL;
2816       handle_pong (n, pong);
2817       GNUNET_free (pong);
2818     }
2819 }
2820
2821
2822 /**
2823  * Send a P2P message to a client.
2824  *
2825  * @param sender who sent us the message?
2826  * @param client who should we give the message to?
2827  * @param m contains the message to transmit
2828  * @param msize number of bytes in buf to transmit
2829  */
2830 static void
2831 send_p2p_message_to_client (struct Neighbour *sender,
2832                             struct Client *client,
2833                             const void *m, size_t msize)
2834 {
2835   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2836   struct NotifyTrafficMessage *ntm;
2837
2838 #if DEBUG_CORE
2839   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2840               "Core service passes message from `%4s' of type %u to client.\n",
2841               GNUNET_i2s(&sender->peer),
2842               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2843 #endif
2844   ntm = (struct NotifyTrafficMessage *) buf;
2845   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2846   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2847   ntm->distance = htonl (sender->last_distance);
2848   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2849   ntm->peer = sender->peer;
2850   memcpy (&ntm[1], m, msize);
2851   send_to_client (client, &ntm->header, GNUNET_YES);
2852 }
2853
2854
2855 /**
2856  * Deliver P2P message to interested clients.
2857  *
2858  * @param sender who sent us the message?
2859  * @param m the message
2860  * @param msize size of the message (including header)
2861  */
2862 static void
2863 deliver_message (struct Neighbour *sender,
2864                  const struct GNUNET_MessageHeader *m, size_t msize)
2865 {
2866   struct Client *cpos;
2867   uint16_t type;
2868   unsigned int tpos;
2869   int deliver_full;
2870   int dropped;
2871
2872   type = ntohs (m->type);
2873 #if DEBUG_CORE
2874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2875               "Received encapsulated message of type %u from `%4s'\n",
2876               type,
2877               GNUNET_i2s (&sender->peer));
2878 #endif
2879   dropped = GNUNET_YES;
2880   cpos = clients;
2881   while (cpos != NULL)
2882     {
2883       deliver_full = GNUNET_NO;
2884       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2885         deliver_full = GNUNET_YES;
2886       else
2887         {
2888           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2889             {
2890               if (type != cpos->types[tpos])
2891                 continue;
2892               deliver_full = GNUNET_YES;
2893               break;
2894             }
2895         }
2896       if (GNUNET_YES == deliver_full)
2897         {
2898           send_p2p_message_to_client (sender, cpos, m, msize);
2899           dropped = GNUNET_NO;
2900         }
2901       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2902         {
2903           send_p2p_message_to_client (sender, cpos, m,
2904                                       sizeof (struct GNUNET_MessageHeader));
2905         }
2906       cpos = cpos->next;
2907     }
2908   if (dropped == GNUNET_YES)
2909     {
2910 #if DEBUG_CORE
2911       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2912                   "Message of type %u from `%4s' not delivered to any client.\n",
2913                   type,
2914                   GNUNET_i2s (&sender->peer));
2915 #endif
2916       /* FIXME: stats... */
2917     }
2918 }
2919
2920
2921 /**
2922  * Align P2P message and then deliver to interested clients.
2923  *
2924  * @param sender who sent us the message?
2925  * @param buffer unaligned (!) buffer containing message
2926  * @param msize size of the message (including header)
2927  */
2928 static void
2929 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2930 {
2931   char abuf[msize];
2932
2933   /* TODO: call to statistics? */
2934   memcpy (abuf, buffer, msize);
2935   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2936 }
2937
2938
2939 /**
2940  * Deliver P2P messages to interested clients.
2941  *
2942  * @param sender who sent us the message?
2943  * @param buffer buffer containing messages, can be modified
2944  * @param buffer_size size of the buffer (overall)
2945  * @param offset offset where messages in the buffer start
2946  */
2947 static void
2948 deliver_messages (struct Neighbour *sender,
2949                   const char *buffer, size_t buffer_size, size_t offset)
2950 {
2951   struct GNUNET_MessageHeader *mhp;
2952   struct GNUNET_MessageHeader mh;
2953   uint16_t msize;
2954   int need_align;
2955
2956   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2957     {
2958       if (0 != offset % sizeof (uint16_t))
2959         {
2960           /* outch, need to copy to access header */
2961           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2962           mhp = &mh;
2963         }
2964       else
2965         {
2966           /* can access header directly */
2967           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2968         }
2969       msize = ntohs (mhp->size);
2970       if (msize + offset > buffer_size)
2971         {
2972           /* malformed message, header says it is larger than what
2973              would fit into the overall buffer */
2974           GNUNET_break_op (0);
2975           break;
2976         }
2977 #if HAVE_UNALIGNED_64_ACCESS
2978       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2979 #else
2980       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2981 #endif
2982       if (GNUNET_YES == need_align)
2983         align_and_deliver (sender, &buffer[offset], msize);
2984       else
2985         deliver_message (sender,
2986                          (const struct GNUNET_MessageHeader *)
2987                          &buffer[offset], msize);
2988       offset += msize;
2989     }
2990 }
2991
2992
2993 /**
2994  * We received an encrypted message.  Decrypt, validate and
2995  * pass on to the appropriate clients.
2996  */
2997 static void
2998 handle_encrypted_message (struct Neighbour *n,
2999                           const struct EncryptedMessage *m)
3000 {
3001   size_t size = ntohs (m->header.size);
3002   char buf[size];
3003   struct EncryptedMessage *pt;  /* plaintext */
3004   GNUNET_HashCode ph;
3005   size_t off;
3006   uint32_t snum;
3007   struct GNUNET_TIME_Absolute t;
3008   GNUNET_HashCode iv;
3009
3010 #if DEBUG_CORE
3011   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3012               "Core service receives `%s' request from `%4s'.\n",
3013               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
3014 #endif  
3015   GNUNET_CRYPTO_hash (&m->iv_seed, sizeof (uint32_t), &iv);
3016   /* decrypt */
3017   if (GNUNET_OK !=
3018       do_decrypt (n,
3019                   &iv,
3020                   &m->plaintext_hash,
3021                   &buf[ENCRYPTED_HEADER_SIZE], 
3022                   size - ENCRYPTED_HEADER_SIZE))
3023     return;
3024   pt = (struct EncryptedMessage *) buf;
3025
3026   /* validate hash */
3027   GNUNET_CRYPTO_hash (&pt->sequence_number,
3028                       size - ENCRYPTED_HEADER_SIZE - sizeof (GNUNET_HashCode), &ph);
3029   if (0 != memcmp (&ph, 
3030                    &pt->plaintext_hash, 
3031                    sizeof (GNUNET_HashCode)))
3032     {
3033       /* checksum failed */
3034       GNUNET_break_op (0);
3035       return;
3036     }
3037
3038   /* validate sequence number */
3039   snum = ntohl (pt->sequence_number);
3040   if (n->last_sequence_number_received == snum)
3041     {
3042       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3043                   "Received duplicate message, ignoring.\n");
3044       /* duplicate, ignore */
3045       return;
3046     }
3047   if ((n->last_sequence_number_received > snum) &&
3048       (n->last_sequence_number_received - snum > 32))
3049     {
3050       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3051                   "Received ancient out of sequence message, ignoring.\n");
3052       /* ancient out of sequence, ignore */
3053       return;
3054     }
3055   if (n->last_sequence_number_received > snum)
3056     {
3057       unsigned int rotbit =
3058         1 << (n->last_sequence_number_received - snum - 1);
3059       if ((n->last_packets_bitmap & rotbit) != 0)
3060         {
3061           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3062                       "Received duplicate message, ignoring.\n");
3063           /* duplicate, ignore */
3064           return;
3065         }
3066       n->last_packets_bitmap |= rotbit;
3067     }
3068   if (n->last_sequence_number_received < snum)
3069     {
3070       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
3071       n->last_sequence_number_received = snum;
3072     }
3073
3074   /* check timestamp */
3075   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
3076   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
3077     {
3078       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3079                   _
3080                   ("Message received far too old (%llu ms). Content ignored.\n"),
3081                   GNUNET_TIME_absolute_get_duration (t).value);
3082       return;
3083     }
3084
3085   /* process decrypted message(s) */
3086   if (n->bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
3087     {
3088 #if DEBUG_CORE_SET_QUOTA
3089       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3090                   "Received %u b/s as new inbound limit for peer `%4s'\n",
3091                   (unsigned int) ntohl (pt->inbound_bw_limit.value__),
3092                   GNUNET_i2s (&n->peer));
3093 #endif
3094       n->bw_out_external_limit = pt->inbound_bw_limit;
3095       n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
3096                                               n->bw_out_internal_limit);
3097       GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
3098                                              n->bw_out);
3099       GNUNET_TRANSPORT_set_quota (transport,
3100                                   &n->peer,
3101                                   n->bw_in,
3102                                   n->bw_out,
3103                                   GNUNET_TIME_UNIT_FOREVER_REL,
3104                                   NULL, NULL); 
3105     }
3106   n->last_activity = GNUNET_TIME_absolute_get ();
3107   off = sizeof (struct EncryptedMessage);
3108   deliver_messages (n, buf, size, off);
3109 }
3110
3111
3112 /**
3113  * Function called by the transport for each received message.
3114  *
3115  * @param cls closure
3116  * @param peer (claimed) identity of the other peer
3117  * @param message the message
3118  * @param latency estimated latency for communicating with the
3119  *             given peer (round-trip)
3120  * @param distance in overlay hops, as given by transport plugin
3121  */
3122 static void
3123 handle_transport_receive (void *cls,
3124                           const struct GNUNET_PeerIdentity *peer,
3125                           const struct GNUNET_MessageHeader *message,
3126                           struct GNUNET_TIME_Relative latency,
3127                           unsigned int distance)
3128 {
3129   struct Neighbour *n;
3130   struct GNUNET_TIME_Absolute now;
3131   int up;
3132   uint16_t type;
3133   uint16_t size;
3134
3135 #if DEBUG_CORE
3136   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3137               "Received message of type %u from `%4s', demultiplexing.\n",
3138               ntohs (message->type), GNUNET_i2s (peer));
3139 #endif
3140   n = find_neighbour (peer);
3141   if (n == NULL)
3142     n = create_neighbour (peer);
3143   if (n == NULL)
3144     return;   
3145   n->last_latency = latency;
3146   n->last_distance = distance;
3147   up = (n->status == PEER_STATE_KEY_CONFIRMED);
3148   type = ntohs (message->type);
3149   size = ntohs (message->size);
3150 #if DEBUG_HANDSHAKE
3151   fprintf (stderr,
3152            "Received message of type %u from `%4s'\n",
3153            type,
3154            GNUNET_i2s (peer));
3155 #endif
3156   switch (type)
3157     {
3158     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
3159       if (size != sizeof (struct SetKeyMessage))
3160         {
3161           GNUNET_break_op (0);
3162           return;
3163         }
3164       handle_set_key (n, (const struct SetKeyMessage *) message);
3165       break;
3166     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
3167       if (size < sizeof (struct EncryptedMessage) +
3168           sizeof (struct GNUNET_MessageHeader))
3169         {
3170           GNUNET_break_op (0);
3171           return;
3172         }
3173       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3174           (n->status != PEER_STATE_KEY_CONFIRMED))
3175         {
3176           GNUNET_break_op (0);
3177           return;
3178         }
3179       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
3180       break;
3181     case GNUNET_MESSAGE_TYPE_CORE_PING:
3182       if (size != sizeof (struct PingMessage))
3183         {
3184           GNUNET_break_op (0);
3185           return;
3186         }
3187       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3188           (n->status != PEER_STATE_KEY_CONFIRMED))
3189         {
3190 #if DEBUG_CORE
3191           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3192                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3193                       "PING", GNUNET_i2s (&n->peer));
3194 #endif
3195           GNUNET_free_non_null (n->pending_ping);
3196           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
3197           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
3198           return;
3199         }
3200       handle_ping (n, (const struct PingMessage *) message);
3201       break;
3202     case GNUNET_MESSAGE_TYPE_CORE_PONG:
3203       if (size != sizeof (struct PongMessage))
3204         {
3205           GNUNET_break_op (0);
3206           return;
3207         }
3208       if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
3209            (n->status != PEER_STATE_KEY_CONFIRMED) )
3210         {
3211 #if DEBUG_CORE
3212           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3213                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3214                       "PONG", GNUNET_i2s (&n->peer));
3215 #endif
3216           GNUNET_free_non_null (n->pending_pong);
3217           n->pending_pong = GNUNET_malloc (sizeof (struct PongMessage));
3218           memcpy (n->pending_pong, message, sizeof (struct PongMessage));
3219           return;
3220         }
3221       handle_pong (n, (const struct PongMessage *) message);
3222       break;
3223     default:
3224       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3225                   _("Unsupported message of type %u received.\n"), type);
3226       return;
3227     }
3228   if (n->status == PEER_STATE_KEY_CONFIRMED)
3229     {
3230       now = GNUNET_TIME_absolute_get ();
3231       n->last_activity = now;
3232       if (!up)
3233         n->time_established = now;
3234     }
3235 }
3236
3237
3238 /**
3239  * Function that recalculates the bandwidth quota for the
3240  * given neighbour and transmits it to the transport service.
3241  * 
3242  * @param cls neighbour for the quota update
3243  * @param tc context
3244  */
3245 static void
3246 neighbour_quota_update (void *cls,
3247                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3248 {
3249   struct Neighbour *n = cls;
3250   struct GNUNET_BANDWIDTH_Value32NBO q_in;
3251   double pref_rel;
3252   double share;
3253   unsigned long long distributable;
3254   uint64_t need_per_peer;
3255   uint64_t need_per_second;
3256   
3257   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3258   /* calculate relative preference among all neighbours;
3259      divides by a bit more to avoid division by zero AND to
3260      account for possibility of new neighbours joining any time 
3261      AND to convert to double... */
3262   if (preference_sum == 0)
3263     {
3264       pref_rel = 1.0 / (double) neighbour_count;
3265     }
3266   else
3267     {
3268       pref_rel = n->current_preference / preference_sum;
3269     }
3270   need_per_peer = GNUNET_BANDWIDTH_value_get_available_until (MIN_BANDWIDTH_PER_PEER,
3271                                                               GNUNET_TIME_UNIT_SECONDS);  
3272   need_per_second = need_per_peer * neighbour_count;
3273   distributable = 0;
3274   if (bandwidth_target_out_bps > need_per_second)
3275     distributable = bandwidth_target_out_bps - need_per_second;
3276   share = distributable * pref_rel;
3277   if (share + need_per_peer > ( (uint32_t)-1))
3278     q_in = GNUNET_BANDWIDTH_value_init ((uint32_t) -1);
3279   else
3280     q_in = GNUNET_BANDWIDTH_value_init (need_per_peer + (uint32_t) share);
3281   /* check if we want to disconnect for good due to inactivity */
3282   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3283        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3284     {
3285 #if DEBUG_CORE
3286       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3287                   "Forcing disconnect of `%4s' due to inactivity (?).\n",
3288                   GNUNET_i2s (&n->peer));
3289 #endif
3290       q_in = GNUNET_BANDWIDTH_value_init (0); /* force disconnect */
3291     }
3292 #if DEBUG_CORE_QUOTA
3293   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3294               "Current quota for `%4s' is %u/%llu b/s in (old: %u b/s) / %u out (%u internal)\n",
3295               GNUNET_i2s (&n->peer),
3296               (unsigned int) ntohl (q_in.value__),
3297               bandwidth_target_out_bps,
3298               (unsigned int) ntohl (n->bw_in.value__),
3299               (unsigned int) ntohl (n->bw_out.value__),
3300               (unsigned int) ntohl (n->bw_out_internal_limit.value__));
3301 #endif
3302   if (n->bw_in.value__ != q_in.value__) 
3303     {
3304       n->bw_in = q_in;
3305       GNUNET_TRANSPORT_set_quota (transport,
3306                                   &n->peer,
3307                                   n->bw_in,
3308                                   n->bw_out,
3309                                   GNUNET_TIME_UNIT_FOREVER_REL,
3310                                   NULL, NULL);
3311     }
3312   schedule_quota_update (n);
3313 }
3314
3315
3316 /**
3317  * Function called by transport to notify us that
3318  * a peer connected to us (on the network level).
3319  *
3320  * @param cls closure
3321  * @param peer the peer that connected
3322  * @param latency current latency of the connection
3323  * @param distance in overlay hops, as given by transport plugin
3324  */
3325 static void
3326 handle_transport_notify_connect (void *cls,
3327                                  const struct GNUNET_PeerIdentity *peer,
3328                                  struct GNUNET_TIME_Relative latency,
3329                                  unsigned int distance)
3330 {
3331   struct Neighbour *n;
3332   struct GNUNET_TIME_Absolute now;
3333   struct ConnectNotifyMessage cnm;
3334
3335   n = find_neighbour (peer);
3336   if (n != NULL)
3337     {
3338       if (n->is_connected)
3339         {
3340           /* duplicate connect notification!? */
3341           GNUNET_break (0);
3342           return;
3343         }
3344     }
3345   else
3346     {
3347       n = create_neighbour (peer);
3348     }
3349   now = GNUNET_TIME_absolute_get ();
3350   n->is_connected = GNUNET_YES;      
3351   n->last_latency = latency;
3352   n->last_distance = distance;
3353   GNUNET_BANDWIDTH_tracker_init (&n->available_send_window,
3354                                  n->bw_out,
3355                                  MAX_WINDOW_TIME_S);
3356   GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window,
3357                                  n->bw_in,
3358                                  MAX_WINDOW_TIME_S);  
3359 #if DEBUG_CORE
3360   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3361               "Received connection from `%4s'.\n",
3362               GNUNET_i2s (&n->peer));
3363 #endif
3364   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3365   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3366   cnm.distance = htonl (n->last_distance);
3367   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
3368   cnm.peer = *peer;
3369   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3370   GNUNET_TRANSPORT_set_quota (transport,
3371                               &n->peer,
3372                               n->bw_in,
3373                               n->bw_out,
3374                               GNUNET_TIME_UNIT_FOREVER_REL,
3375                               NULL, NULL);
3376   send_key (n); 
3377 }
3378
3379
3380 /**
3381  * Function called by transport telling us that a peer
3382  * disconnected.
3383  *
3384  * @param cls closure
3385  * @param peer the peer that disconnected
3386  */
3387 static void
3388 handle_transport_notify_disconnect (void *cls,
3389                                     const struct GNUNET_PeerIdentity *peer)
3390 {
3391   struct DisconnectNotifyMessage cnm;
3392   struct Neighbour *n;
3393
3394 #if DEBUG_CORE
3395   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3396               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3397 #endif
3398   n = find_neighbour (peer);
3399   if (n == NULL)
3400     {
3401       GNUNET_break (0);
3402       return;
3403     }
3404   GNUNET_break (n->is_connected);
3405   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3406   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3407   cnm.peer = *peer;
3408   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3409   n->is_connected = GNUNET_NO;
3410 }
3411
3412
3413 /**
3414  * Last task run during shutdown.  Disconnects us from
3415  * the transport.
3416  */
3417 static void
3418 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3419 {
3420   struct Neighbour *n;
3421   struct Client *c;
3422
3423 #if DEBUG_CORE
3424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3425               "Core service shutting down.\n");
3426 #endif
3427   GNUNET_assert (transport != NULL);
3428   GNUNET_TRANSPORT_disconnect (transport);
3429   transport = NULL;
3430   while (NULL != (n = neighbours))
3431     {
3432       neighbours = n->next;
3433       GNUNET_assert (neighbour_count > 0);
3434       neighbour_count--;
3435       free_neighbour (n);
3436     }
3437   GNUNET_SERVER_notification_context_destroy (notifier);
3438   notifier = NULL;
3439   while (NULL != (c = clients))
3440     handle_client_disconnect (NULL, c->client_handle);
3441   if (my_private_key != NULL)
3442     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3443 }
3444
3445
3446 /**
3447  * Initiate core service.
3448  *
3449  * @param cls closure
3450  * @param s scheduler to use
3451  * @param serv the initialized server
3452  * @param c configuration to use
3453  */
3454 static void
3455 run (void *cls,
3456      struct GNUNET_SCHEDULER_Handle *s,
3457      struct GNUNET_SERVER_Handle *serv,
3458      const struct GNUNET_CONFIGURATION_Handle *c)
3459 {
3460   char *keyfile;
3461
3462   sched = s;
3463   cfg = c;  
3464   /* parse configuration */
3465   if (
3466        (GNUNET_OK !=
3467         GNUNET_CONFIGURATION_get_value_number (c,
3468                                                "CORE",
3469                                                "TOTAL_QUOTA_IN",
3470                                                &bandwidth_target_in_bps)) ||
3471        (GNUNET_OK !=
3472         GNUNET_CONFIGURATION_get_value_number (c,
3473                                                "CORE",
3474                                                "TOTAL_QUOTA_OUT",
3475                                                &bandwidth_target_out_bps)) ||
3476        (GNUNET_OK !=
3477         GNUNET_CONFIGURATION_get_value_filename (c,
3478                                                  "GNUNETD",
3479                                                  "HOSTKEY", &keyfile)))
3480     {
3481       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3482                   _
3483                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3484       GNUNET_SCHEDULER_shutdown (s);
3485       return;
3486     }
3487   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3488   GNUNET_free (keyfile);
3489   if (my_private_key == NULL)
3490     {
3491       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3492                   _("Core service could not access hostkey.  Exiting.\n"));
3493       GNUNET_SCHEDULER_shutdown (s);
3494       return;
3495     }
3496   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3497   GNUNET_CRYPTO_hash (&my_public_key,
3498                       sizeof (my_public_key), &my_identity.hashPubKey);
3499   /* setup notification */
3500   server = serv;
3501   notifier = GNUNET_SERVER_notification_context_create (server, 
3502                                                         MAX_NOTIFY_QUEUE);
3503   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3504   /* setup transport connection */
3505   transport = GNUNET_TRANSPORT_connect (sched,
3506                                         cfg,
3507                                         NULL,
3508                                         &handle_transport_receive,
3509                                         &handle_transport_notify_connect,
3510                                         &handle_transport_notify_disconnect);
3511   GNUNET_assert (NULL != transport);
3512   GNUNET_SCHEDULER_add_delayed (sched,
3513                                 GNUNET_TIME_UNIT_FOREVER_REL,
3514                                 &cleaning_task, NULL);
3515   /* process client requests */
3516   GNUNET_SERVER_add_handlers (server, handlers);
3517   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3518               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3519 }
3520
3521
3522
3523 /**
3524  * The main function for the transport service.
3525  *
3526  * @param argc number of arguments from the command line
3527  * @param argv command line arguments
3528  * @return 0 ok, 1 on error
3529  */
3530 int
3531 main (int argc, char *const *argv)
3532 {
3533   return (GNUNET_OK ==
3534           GNUNET_SERVICE_run (argc,
3535                               argv,
3536                               "core",
3537                               GNUNET_SERVICE_OPTION_NONE,
3538                               &run, NULL)) ? 0 : 1;
3539 }
3540
3541 /* end of gnunet-service-core.c */