fix
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet_transport_service.h"
42 #include "core.h"
43
44
45 #define DEBUG_HANDSHAKE GNUNET_YES
46
47 #define DEBUG_CORE_QUOTA GNUNET_NO
48
49 /**
50  * Receive and send buffer windows grow over time.  For
51  * how long can 'unused' bandwidth accumulate before we
52  * need to cap it?  (specified in seconds).
53  */
54 #define MAX_WINDOW_TIME_S (5 * 60)
55
56 /**
57  * How many messages do we queue up at most for optional
58  * notifications to a client?  (this can cause notifications
59  * about outgoing messages to be dropped).
60  */
61 #define MAX_NOTIFY_QUEUE 16
62
63 /**
64  * Minimum bandwidth (out) to assign to any connected peer.
65  * Should be rather low; values larger than DEFAULT_BW_IN_OUT make no
66  * sense.
67  */
68 #define MIN_BANDWIDTH_PER_PEER GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT
69
70 /**
71  * After how much time past the "official" expiration time do
72  * we discard messages?  Should not be zero since we may 
73  * intentionally defer transmission until close to the deadline
74  * and then may be slightly past the deadline due to inaccuracy
75  * in sleep and our own CPU consumption.
76  */
77 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
78
79 /**
80  * How long do we delay messages to get larger packet sizes (CORKing)?
81  */
82 #define MAX_CORK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
83
84 /**
85  * What is the maximum delay for a SET_KEY message?
86  */
87 #define MAX_SET_KEY_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
88
89 /**
90  * What how long do we wait for SET_KEY confirmation initially?
91  */
92 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 3)
93
94 /**
95  * What is the maximum delay for a PING message?
96  */
97 #define MAX_PING_DELAY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 2)
98
99 /**
100  * What is the maximum delay for a PONG message?
101  */
102 #define MAX_PONG_DELAY GNUNET_TIME_relative_multiply (MAX_PING_DELAY, 2)
103
104 /**
105  * What is the minimum frequency for a PING message?
106  */
107 #define MIN_PING_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
108
109 /**
110  * How often do we recalculate bandwidth quotas?
111  */
112 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
113
114 /**
115  * What is the priority for a SET_KEY message?
116  */
117 #define SET_KEY_PRIORITY 0xFFFFFF
118
119 /**
120  * What is the priority for a PING message?
121  */
122 #define PING_PRIORITY 0xFFFFFF
123
124 /**
125  * What is the priority for a PONG message?
126  */
127 #define PONG_PRIORITY 0xFFFFFF
128
129 /**
130  * How many messages do we queue per peer at most?  Must be at
131  * least two.
132  */
133 #define MAX_PEER_QUEUE_SIZE 16
134
135 /**
136  * How many non-mandatory messages do we queue per client at most?
137  */
138 #define MAX_CLIENT_QUEUE_SIZE 32
139
140 /**
141  * What is the maximum age of a message for us to consider
142  * processing it?  Note that this looks at the timestamp used
143  * by the other peer, so clock skew between machines does
144  * come into play here.  So this should be picked high enough
145  * so that a little bit of clock skew does not prevent peers
146  * from connecting to us.
147  */
148 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
149
150 /**
151  * What is the maximum size for encrypted messages?  Note that this
152  * number imposes a clear limit on the maximum size of any message.
153  * Set to a value close to 64k but not so close that transports will
154  * have trouble with their headers.
155  */
156 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
157
158
159 /**
160  * State machine for our P2P encryption handshake.  Everyone starts in
161  * "DOWN", if we receive the other peer's key (other peer initiated)
162  * we start in state RECEIVED (since we will immediately send our
163  * own); otherwise we start in SENT.  If we get back a PONG from
164  * within either state, we move up to CONFIRMED (the PONG will always
165  * be sent back encrypted with the key we sent to the other peer).
166  */
167 enum PeerStateMachine
168 {
169   PEER_STATE_DOWN,
170   PEER_STATE_KEY_SENT,
171   PEER_STATE_KEY_RECEIVED,
172   PEER_STATE_KEY_CONFIRMED
173 };
174
175
176 /**
177  * Number of bytes (at the beginning) of "struct EncryptedMessage"
178  * that are NOT encrypted.
179  */
180 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t))
181
182
183 /**
184  * Encapsulation for encrypted messages exchanged between
185  * peers.  Followed by the actual encrypted data.
186  */
187 struct EncryptedMessage
188 {
189   /**
190    * Message type is either CORE_ENCRYPTED_MESSAGE.
191    */
192   struct GNUNET_MessageHeader header;
193
194   /**
195    * Random value used for IV generation.  ENCRYPTED_HEADER_SIZE must
196    * be set to the offset of the *next* field.
197    */
198   uint32_t iv_seed GNUNET_PACKED;
199
200   /**
201    * Hash of the plaintext (starting at 'sequence_number'), used to
202    * verify message integrity.  Everything after this hash (including
203    * this hash itself) will be encrypted.  
204    */
205   GNUNET_HashCode plaintext_hash;
206
207   /**
208    * Sequence number, in network byte order.  This field
209    * must be the first encrypted/decrypted field and the
210    * first byte that is hashed for the plaintext hash.
211    */
212   uint32_t sequence_number GNUNET_PACKED;
213
214   /**
215    * Desired bandwidth (how much we should send to this peer / how
216    * much is the sender willing to receive)?
217    */
218   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
219
220   /**
221    * Timestamp.  Used to prevent reply of ancient messages
222    * (recent messages are caught with the sequence number).
223    */
224   struct GNUNET_TIME_AbsoluteNBO timestamp;
225
226 };
227
228
229 /**
230  * We're sending an (encrypted) PING to the other peer to check if he
231  * can decrypt.  The other peer should respond with a PONG with the
232  * same content, except this time encrypted with the receiver's key.
233  */
234 struct PingMessage
235 {
236   /**
237    * Message type is CORE_PING.
238    */
239   struct GNUNET_MessageHeader header;
240
241   /**
242    * Random number chosen to make reply harder.
243    */
244   uint32_t challenge GNUNET_PACKED;
245
246   /**
247    * Intended target of the PING, used primarily to check
248    * that decryption actually worked.
249    */
250   struct GNUNET_PeerIdentity target;
251 };
252
253
254
255 /**
256  * Response to a PING.  Includes data from the original PING
257  * plus initial bandwidth quota information.
258  */
259 struct PongMessage
260 {
261   /**
262    * Message type is CORE_PONG.
263    */
264   struct GNUNET_MessageHeader header;
265
266   /**
267    * Random number proochosen to make reply harder.  Must be
268    * first field after header (this is where we start to encrypt!).
269    */
270   uint32_t challenge GNUNET_PACKED;
271
272   /**
273    * Must be zero.
274    */
275   uint32_t reserved GNUNET_PACKED;
276
277   /**
278    * Desired bandwidth (how much we should send to this
279    * peer / how much is the sender willing to receive).
280    */
281   struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
282
283   /**
284    * Intended target of the PING, used primarily to check
285    * that decryption actually worked.
286    */
287   struct GNUNET_PeerIdentity target;
288 };
289
290
291 /**
292  * Message transmitted to set (or update) a session key.
293  */
294 struct SetKeyMessage
295 {
296
297   /**
298    * Message type is either CORE_SET_KEY.
299    */
300   struct GNUNET_MessageHeader header;
301
302   /**
303    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
304    */
305   int32_t sender_status GNUNET_PACKED;
306
307   /**
308    * Purpose of the signature, will be
309    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
310    */
311   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
312
313   /**
314    * At what time was this key created?
315    */
316   struct GNUNET_TIME_AbsoluteNBO creation_time;
317
318   /**
319    * The encrypted session key.
320    */
321   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
322
323   /**
324    * Who is the intended recipient?
325    */
326   struct GNUNET_PeerIdentity target;
327
328   /**
329    * Signature of the stuff above (starting at purpose).
330    */
331   struct GNUNET_CRYPTO_RsaSignature signature;
332
333 };
334
335
336 /**
337  * Message waiting for transmission. This struct
338  * is followed by the actual content of the message.
339  */
340 struct MessageEntry
341 {
342
343   /**
344    * We keep messages in a doubly linked list.
345    */
346   struct MessageEntry *next;
347
348   /**
349    * We keep messages in a doubly linked list.
350    */
351   struct MessageEntry *prev;
352
353   /**
354    * By when are we supposed to transmit this message?
355    */
356   struct GNUNET_TIME_Absolute deadline;
357
358   /**
359    * By when are we supposed to transmit this message (after
360    * giving slack)?
361    */
362   struct GNUNET_TIME_Absolute slack_deadline;
363
364   /**
365    * How important is this message to us?
366    */
367   unsigned int priority;
368
369   /**
370    * How long is the message? (number of bytes following
371    * the "struct MessageEntry", but not including the
372    * size of "struct MessageEntry" itself!)
373    */
374   uint16_t size;
375
376   /**
377    * Was this message selected for transmission in the
378    * current round? GNUNET_YES or GNUNET_NO.
379    */
380   int8_t do_transmit;
381
382   /**
383    * Did we give this message some slack (delayed sending) previously
384    * (and hence should not give it any more slack)? GNUNET_YES or
385    * GNUNET_NO.
386    */
387   int8_t got_slack;
388
389 };
390
391
392 struct Neighbour
393 {
394   /**
395    * We keep neighbours in a linked list (for now).
396    */
397   struct Neighbour *next;
398
399   /**
400    * Unencrypted messages destined for this peer.
401    */
402   struct MessageEntry *messages;
403
404   /**
405    * Head of the batched, encrypted message queue (already ordered,
406    * transmit starting with the head).
407    */
408   struct MessageEntry *encrypted_head;
409
410   /**
411    * Tail of the batched, encrypted message queue (already ordered,
412    * append new messages to tail)
413    */
414   struct MessageEntry *encrypted_tail;
415
416   /**
417    * Handle for pending requests for transmission to this peer
418    * with the transport service.  NULL if no request is pending.
419    */
420   struct GNUNET_TRANSPORT_TransmitHandle *th;
421
422   /**
423    * Public key of the neighbour, NULL if we don't have it yet.
424    */
425   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
426
427   /**
428    * We received a PING message before we got the "public_key"
429    * (or the SET_KEY).  We keep it here until we have a key
430    * to decrypt it.  NULL if no PING is pending.
431    */
432   struct PingMessage *pending_ping;
433
434   /**
435    * We received a PONG message before we got the "public_key"
436    * (or the SET_KEY).  We keep it here until we have a key
437    * to decrypt it.  NULL if no PONG is pending.
438    */
439   struct PongMessage *pending_pong;
440
441   /**
442    * Non-NULL if we are currently looking up HELLOs for this peer.
443    * for this peer.
444    */
445   struct GNUNET_PEERINFO_IteratorContext *pitr;
446
447   /**
448    * SetKeyMessage to transmit, NULL if we are not currently trying
449    * to send one.
450    */
451   struct SetKeyMessage *skm;
452
453   /**
454    * Identity of the neighbour.
455    */
456   struct GNUNET_PeerIdentity peer;
457
458   /**
459    * Key we use to encrypt our messages for the other peer
460    * (initialized by us when we do the handshake).
461    */
462   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
463
464   /**
465    * Key we use to decrypt messages from the other peer
466    * (given to us by the other peer during the handshake).
467    */
468   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
469
470   /**
471    * ID of task used for re-trying plaintext scheduling.
472    */
473   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
474
475   /**
476    * ID of task used for re-trying SET_KEY and PING message.
477    */
478   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
479
480   /**
481    * ID of task used for updating bandwidth quota for this neighbour.
482    */
483   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
484
485   /**
486    * ID of task used for sending keep-alive pings.
487    */
488   GNUNET_SCHEDULER_TaskIdentifier keep_alive_task;
489
490   /**
491    * ID of task used for cleaning up dead neighbour entries.
492    */
493   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
494
495   /**
496    * At what time did we generate our encryption key?
497    */
498   struct GNUNET_TIME_Absolute encrypt_key_created;
499
500   /**
501    * At what time did the other peer generate the decryption key?
502    */
503   struct GNUNET_TIME_Absolute decrypt_key_created;
504
505   /**
506    * At what time did we initially establish (as in, complete session
507    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
508    */
509   struct GNUNET_TIME_Absolute time_established;
510
511   /**
512    * At what time did we last receive an encrypted message from the
513    * other peer?  Should be zero if status != KEY_CONFIRMED.
514    */
515   struct GNUNET_TIME_Absolute last_activity;
516
517   /**
518    * Last latency observed from this peer.
519    */
520   struct GNUNET_TIME_Relative last_latency;
521
522   /**
523    * At what frequency are we currently re-trying SET_KEY messages?
524    */
525   struct GNUNET_TIME_Relative set_key_retry_frequency;
526
527   /**
528    * Tracking bandwidth for sending to this peer.
529    */
530   struct GNUNET_BANDWIDTH_Tracker available_send_window;
531
532   /**
533    * Tracking bandwidth for receiving from this peer.
534    */
535   struct GNUNET_BANDWIDTH_Tracker available_recv_window;
536
537   /**
538    * How valueable were the messages of this peer recently?
539    */
540   unsigned long long current_preference;
541
542   /**
543    * Bit map indicating which of the 32 sequence numbers before the last
544    * were received (good for accepting out-of-order packets and
545    * estimating reliability of the connection)
546    */
547   unsigned int last_packets_bitmap;
548
549   /**
550    * last sequence number received on this connection (highest)
551    */
552   uint32_t last_sequence_number_received;
553
554   /**
555    * last sequence number transmitted
556    */
557   uint32_t last_sequence_number_sent;
558
559   /**
560    * Available bandwidth in for this peer (current target).
561    */
562   struct GNUNET_BANDWIDTH_Value32NBO bw_in;    
563
564   /**
565    * Available bandwidth out for this peer (current target).
566    */
567   struct GNUNET_BANDWIDTH_Value32NBO bw_out;  
568
569   /**
570    * Internal bandwidth limit set for this peer (initially typically
571    * set to "-1").  Actual "bw_out" is MIN of
572    * "bpm_out_internal_limit" and "bw_out_external_limit".
573    */
574   struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
575
576   /**
577    * External bandwidth limit set for this peer by the
578    * peer that we are communicating with.  "bw_out" is MIN of
579    * "bw_out_internal_limit" and "bw_out_external_limit".
580    */
581   struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
582
583   /**
584    * What was our PING challenge number (for this peer)?
585    */
586   uint32_t ping_challenge;
587
588   /**
589    * What was the last distance to this peer as reported by the transports?
590    */
591   uint32_t last_distance;
592
593   /**
594    * What is our connection status?
595    */
596   enum PeerStateMachine status;
597
598   /**
599    * Are we currently connected to this neighbour?
600    */ 
601   int is_connected;
602
603 };
604
605
606 /**
607  * Data structure for each client connected to the core service.
608  */
609 struct Client
610 {
611   /**
612    * Clients are kept in a linked list.
613    */
614   struct Client *next;
615
616   /**
617    * Handle for the client with the server API.
618    */
619   struct GNUNET_SERVER_Client *client_handle;
620
621   /**
622    * Array of the types of messages this peer cares
623    * about (with "tcnt" entries).  Allocated as part
624    * of this client struct, do not free!
625    */
626   const uint16_t *types;
627
628   /**
629    * Options for messages this client cares about,
630    * see GNUNET_CORE_OPTION_ values.
631    */
632   uint32_t options;
633
634   /**
635    * Number of types of incoming messages this client
636    * specifically cares about.  Size of the "types" array.
637    */
638   unsigned int tcnt;
639
640 };
641
642
643 /**
644  * Our public key.
645  */
646 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
647
648 /**
649  * Our identity.
650  */
651 static struct GNUNET_PeerIdentity my_identity;
652
653 /**
654  * Our private key.
655  */
656 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
657
658 /**
659  * Our scheduler.
660  */
661 struct GNUNET_SCHEDULER_Handle *sched;
662
663 /**
664  * Our configuration.
665  */
666 const struct GNUNET_CONFIGURATION_Handle *cfg;
667
668 /**
669  * Our server.
670  */
671 static struct GNUNET_SERVER_Handle *server;
672
673 /**
674  * Transport service.
675  */
676 static struct GNUNET_TRANSPORT_Handle *transport;
677
678 /**
679  * Linked list of our clients.
680  */
681 static struct Client *clients;
682
683 /**
684  * Context for notifications we need to send to our clients.
685  */
686 static struct GNUNET_SERVER_NotificationContext *notifier;
687
688 /**
689  * We keep neighbours in a linked list (for now).
690  */
691 static struct Neighbour *neighbours;
692
693 /**
694  * For creating statistics.
695  */
696 static struct GNUNET_STATISTICS_Handle *stats;
697
698 /**
699  * Sum of all preferences among all neighbours.
700  */
701 static unsigned long long preference_sum;
702
703 /**
704  * Total number of neighbours we have.
705  */
706 static unsigned int neighbour_count;
707
708 /**
709  * How much inbound bandwidth are we supposed to be using per second?
710  * FIXME: this value is not used!
711  */
712 static unsigned long long bandwidth_target_in_bps;
713
714 /**
715  * How much outbound bandwidth are we supposed to be using per second?
716  */
717 static unsigned long long bandwidth_target_out_bps;
718
719
720
721 /**
722  * A preference value for a neighbour was update.  Update
723  * the preference sum accordingly.
724  *
725  * @param inc how much was a preference value increased?
726  */
727 static void
728 update_preference_sum (unsigned long long inc)
729 {
730   struct Neighbour *n;
731   unsigned long long os;
732
733   os = preference_sum;
734   preference_sum += inc;
735   if (preference_sum >= os)
736     return; /* done! */
737   /* overflow! compensate by cutting all values in half! */
738   preference_sum = 0;
739   n = neighbours;
740   while (n != NULL)
741     {
742       n->current_preference /= 2;
743       preference_sum += n->current_preference;
744       n = n->next;
745     }    
746   GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"), preference_sum, GNUNET_NO);
747 }
748
749
750 /**
751  * Find the entry for the given neighbour.
752  *
753  * @param peer identity of the neighbour
754  * @return NULL if we are not connected, otherwise the
755  *         neighbour's entry.
756  */
757 static struct Neighbour *
758 find_neighbour (const struct GNUNET_PeerIdentity *peer)
759 {
760   struct Neighbour *ret;
761
762   ret = neighbours;
763   while ((ret != NULL) &&
764          (0 != memcmp (&ret->peer,
765                        peer, sizeof (struct GNUNET_PeerIdentity))))
766     ret = ret->next;
767   return ret;
768 }
769
770
771 /**
772  * Send a message to one of our clients.
773  *
774  * @param client target for the message
775  * @param msg message to transmit
776  * @param can_drop could this message be dropped if the
777  *        client's queue is getting too large?
778  */
779 static void
780 send_to_client (struct Client *client,
781                 const struct GNUNET_MessageHeader *msg, 
782                 int can_drop)
783 {
784 #if DEBUG_CORE_CLIENT
785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786               "Preparing to send message of type %u to client.\n",
787               ntohs (msg->type));
788 #endif  
789   GNUNET_SERVER_notification_context_unicast (notifier,
790                                               client->client_handle,
791                                               msg,
792                                               can_drop);
793 }
794
795
796 /**
797  * Send a message to all of our current clients that have
798  * the right options set.
799  * 
800  * @param msg message to multicast
801  * @param can_drop can this message be discarded if the queue is too long
802  * @param options mask to use 
803  */
804 static void
805 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
806                      int can_drop,
807                      int options)
808 {
809   struct Client *c;
810
811   c = clients;
812   while (c != NULL)
813     {
814       if (0 != (c->options & options))
815         {
816 #if DEBUG_CORE_CLIENT
817           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
818                       "Sending message of type %u to client.\n",
819                       ntohs (msg->type));
820 #endif
821           send_to_client (c, msg, can_drop);
822         }
823       c = c->next;
824     }
825 }
826
827
828 /**
829  * Handle CORE_INIT request.
830  */
831 static void
832 handle_client_init (void *cls,
833                     struct GNUNET_SERVER_Client *client,
834                     const struct GNUNET_MessageHeader *message)
835 {
836   const struct InitMessage *im;
837   struct InitReplyMessage irm;
838   struct Client *c;
839   uint16_t msize;
840   const uint16_t *types;
841   uint16_t *wtypes;
842   struct Neighbour *n;
843   struct ConnectNotifyMessage cnm;
844   unsigned int i;
845
846 #if DEBUG_CORE_CLIENT
847   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848               "Client connecting to core service with `%s' message\n",
849               "INIT");
850 #endif
851   /* check that we don't have an entry already */
852   c = clients;
853   while (c != NULL)
854     {
855       if (client == c->client_handle)
856         {
857           GNUNET_break (0);
858           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
859           return;
860         }
861       c = c->next;
862     }
863   msize = ntohs (message->size);
864   if (msize < sizeof (struct InitMessage))
865     {
866       GNUNET_break (0);
867       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
868       return;
869     }
870   GNUNET_SERVER_notification_context_add (notifier, client);
871   im = (const struct InitMessage *) message;
872   types = (const uint16_t *) &im[1];
873   msize -= sizeof (struct InitMessage);
874   c = GNUNET_malloc (sizeof (struct Client) + msize);
875   c->client_handle = client;
876   c->next = clients;
877   clients = c;
878   c->tcnt = msize / sizeof (uint16_t);
879   c->types = (const uint16_t *) &c[1];
880   wtypes = (uint16_t *) &c[1];
881   for (i=0;i<c->tcnt;i++)
882     wtypes[i] = ntohs (types[i]);
883   c->options = ntohl (im->options);
884 #if DEBUG_CORE_CLIENT
885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886               "Client %p is interested in %u message types\n",
887               c,
888               c->tcnt);
889 #endif
890   /* send init reply message */
891   irm.header.size = htons (sizeof (struct InitReplyMessage));
892   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
893   irm.reserved = htonl (0);
894   memcpy (&irm.publicKey,
895           &my_public_key,
896           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
897 #if DEBUG_CORE_CLIENT
898   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
899               "Sending `%s' message to client.\n", "INIT_REPLY");
900 #endif
901   send_to_client (c, &irm.header, GNUNET_NO);
902   if (0 != (c->options & GNUNET_CORE_OPTION_SEND_CONNECT))
903     {
904       /* notify new client about existing neighbours */
905       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
906       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
907       n = neighbours;
908       while (n != NULL)
909         {
910           if (n->status == PEER_STATE_KEY_CONFIRMED)
911             {
912 #if DEBUG_CORE_CLIENT
913               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914                           "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
915 #endif
916               cnm.distance = htonl (n->last_distance);
917               cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
918               cnm.peer = n->peer;
919               send_to_client (c, &cnm.header, GNUNET_NO);
920             }
921           n = n->next;
922         }
923     }
924   GNUNET_SERVER_receive_done (client, GNUNET_OK);
925 }
926
927
928 /**
929  * A client disconnected, clean up.
930  *
931  * @param cls closure
932  * @param client identification of the client
933  */
934 static void
935 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
936 {
937   struct Client *pos;
938   struct Client *prev;
939
940   if (client == NULL)
941     return;
942 #if DEBUG_CORE_CLIENT
943   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944               "Client %p has disconnected from core service.\n",
945               client);
946 #endif
947   prev = NULL;
948   pos = clients;
949   while (pos != NULL)
950     {
951       if (client == pos->client_handle)
952         {
953           if (prev == NULL)
954             clients = pos->next;
955           else
956             prev->next = pos->next;
957           GNUNET_free (pos);
958           return;
959         }
960       prev = pos;
961       pos = pos->next;
962     }
963   /* client never sent INIT */
964 }
965
966
967 /**
968  * Handle REQUEST_INFO request.
969  */
970 static void
971 handle_client_request_info (void *cls,
972                             struct GNUNET_SERVER_Client *client,
973                             const struct GNUNET_MessageHeader *message)
974 {
975   const struct RequestInfoMessage *rcm;
976   struct Neighbour *n;
977   struct ConfigurationInfoMessage cim;
978   int32_t want_reserv;
979   int32_t got_reserv;
980   unsigned long long old_preference;
981   struct GNUNET_SERVER_TransmitContext *tc;
982
983 #if DEBUG_CORE_CLIENT
984   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985               "Core service receives `%s' request.\n", "REQUEST_INFO");
986 #endif
987   rcm = (const struct RequestInfoMessage *) message;
988   n = find_neighbour (&rcm->peer);
989   memset (&cim, 0, sizeof (cim));
990   if (n != NULL) 
991     {
992       want_reserv = ntohl (rcm->reserve_inbound);
993       if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
994         {
995           n->bw_out_internal_limit = rcm->limit_outbound;
996           n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
997                                                   n->bw_out_external_limit);
998           GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
999                                                  n->bw_out);
1000           GNUNET_TRANSPORT_set_quota (transport,
1001                                       &n->peer,
1002                                       n->bw_in,
1003                                       n->bw_out,
1004                                       GNUNET_TIME_UNIT_FOREVER_REL,
1005                                       NULL, NULL); 
1006         }
1007       if (want_reserv < 0)
1008         {
1009           got_reserv = want_reserv;
1010         }
1011       else if (want_reserv > 0)
1012         {
1013           if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
1014                                                   want_reserv).value == 0)
1015             got_reserv = want_reserv;
1016           else
1017             got_reserv = 0; /* all or nothing */
1018         }
1019       else
1020         got_reserv = 0;
1021       GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window,
1022                                         got_reserv);
1023       old_preference = n->current_preference;
1024       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1025       if (old_preference > n->current_preference) 
1026         {
1027           /* overflow; cap at maximum value */
1028           n->current_preference = (unsigned long long) -1;
1029         }
1030       update_preference_sum (n->current_preference - old_preference);
1031 #if DEBUG_CORE_QUOTA
1032       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033                   "Received reservation request for %d bytes for peer `%4s', reserved %d bytes\n",
1034                   (int) want_reserv,
1035                   GNUNET_i2s (&rcm->peer),
1036                   (int) got_reserv);
1037 #endif
1038       cim.reserved_amount = htonl (got_reserv);
1039       cim.bw_in = n->bw_in;
1040       cim.bw_out = n->bw_out;
1041       cim.preference = n->current_preference;
1042     }
1043   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1044   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1045   cim.peer = rcm->peer;
1046
1047 #if DEBUG_CORE_CLIENT
1048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1050 #endif
1051   tc = GNUNET_SERVER_transmit_context_create (client);
1052   GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
1053   GNUNET_SERVER_transmit_context_run (tc,
1054                                       GNUNET_TIME_UNIT_FOREVER_REL);
1055 }
1056
1057
1058 /**
1059  * Free the given entry for the neighbour (it has
1060  * already been removed from the list at this point).
1061  *
1062  * @param n neighbour to free
1063  */
1064 static void
1065 free_neighbour (struct Neighbour *n)
1066 {
1067   struct MessageEntry *m;
1068
1069   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070               "Destroying neighbour entry for peer `%4s'\n",
1071               GNUNET_i2s (&n->peer));
1072   if (n->pitr != NULL)
1073     {
1074       GNUNET_PEERINFO_iterate_cancel (n->pitr);
1075       n->pitr = NULL;
1076     }
1077   if (n->skm != NULL)
1078     {
1079       GNUNET_free (n->skm);
1080       n->skm = NULL;
1081     }
1082   while (NULL != (m = n->messages))
1083     {
1084       n->messages = m->next;
1085       GNUNET_free (m);
1086     }
1087   while (NULL != (m = n->encrypted_head))
1088     {
1089       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1090                                    n->encrypted_tail,
1091                                    m);
1092       GNUNET_free (m);
1093     }
1094   if (NULL != n->th)
1095     {
1096       GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
1097       n->th = NULL;
1098     }
1099   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1100     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1101   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
1102     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
1103   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
1104     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
1105   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1106     GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1107   if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)    
1108       GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
1109   if (n->status == PEER_STATE_KEY_CONFIRMED)
1110     GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"), -1, GNUNET_NO);
1111   GNUNET_free_non_null (n->public_key);
1112   GNUNET_free_non_null (n->pending_ping);
1113   GNUNET_free_non_null (n->pending_pong);
1114   GNUNET_free (n);
1115 }
1116
1117
1118 /**
1119  * Check if we have encrypted messages for the specified neighbour
1120  * pending, and if so, check with the transport about sending them
1121  * out.
1122  *
1123  * @param n neighbour to check.
1124  */
1125 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1126
1127
1128 /**
1129  * Encrypt size bytes from in and write the result to out.  Use the
1130  * key for outbound traffic of the given neighbour.
1131  *
1132  * @param n neighbour we are sending to
1133  * @param iv initialization vector to use
1134  * @param in ciphertext
1135  * @param out plaintext
1136  * @param size size of in/out
1137  * @return GNUNET_OK on success
1138  */
1139 static int
1140 do_encrypt (struct Neighbour *n,
1141             const GNUNET_HashCode * iv,
1142             const void *in, void *out, size_t size)
1143 {
1144   if (size != (uint16_t) size)
1145     {
1146       GNUNET_break (0);
1147       return GNUNET_NO;
1148     }
1149   GNUNET_assert (size ==
1150                  GNUNET_CRYPTO_aes_encrypt (in,
1151                                             (uint16_t) size,
1152                                             &n->encrypt_key,
1153                                             (const struct
1154                                              GNUNET_CRYPTO_AesInitializationVector
1155                                              *) iv, out));
1156   GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes encrypted"), size, GNUNET_NO);
1157 #if DEBUG_CORE
1158   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159               "Encrypted %u bytes for `%4s' using key %u\n", size,
1160               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1161 #endif
1162   return GNUNET_OK;
1163 }
1164
1165
1166 /**
1167  * Consider freeing the given neighbour since we may not need
1168  * to keep it around anymore.
1169  *
1170  * @param n neighbour to consider discarding
1171  */
1172 static void
1173 consider_free_neighbour (struct Neighbour *n);
1174
1175
1176 /**
1177  * Task triggered when a neighbour entry is about to time out 
1178  * (and we should prevent this by sending a PING).
1179  *
1180  * @param cls the 'struct Neighbour'
1181  * @param tc scheduler context (not used)
1182  */
1183 static void
1184 send_keep_alive (void *cls,
1185                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1186 {
1187   struct Neighbour *n = cls;
1188   struct GNUNET_TIME_Relative retry;
1189   struct GNUNET_TIME_Relative left;
1190   struct MessageEntry *me;
1191   struct PingMessage pp;
1192   struct PingMessage *pm;
1193
1194   n->keep_alive_task = GNUNET_SCHEDULER_NO_TASK;
1195   /* send PING */
1196   me = GNUNET_malloc (sizeof (struct MessageEntry) +
1197                       sizeof (struct PingMessage));
1198   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
1199   me->priority = PING_PRIORITY;
1200   me->size = sizeof (struct PingMessage);
1201   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
1202                                      n->encrypted_tail,
1203                                      n->encrypted_tail,
1204                                      me);
1205   pm = (struct PingMessage *) &me[1];
1206   pm->header.size = htons (sizeof (struct PingMessage));
1207   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
1208   pp.challenge = htonl (n->ping_challenge);
1209   pp.target = n->peer;
1210 #if DEBUG_CORE
1211   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212               "Encrypting `%s' and `%s' messages for `%4s'.\n",
1213               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
1214   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
1216               "PING",
1217               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
1218 #endif
1219   do_encrypt (n,
1220               &n->peer.hashPubKey,
1221               &pp.challenge,
1222               &pm->challenge,
1223               sizeof (struct PingMessage) -
1224               sizeof (struct GNUNET_MessageHeader));
1225   process_encrypted_neighbour_queue (n);
1226   /* reschedule PING job */
1227   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1228                                                                        GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
1229   retry = GNUNET_TIME_relative_max (GNUNET_TIME_relative_divide (left, 2),
1230                                     MIN_PING_FREQUENCY);
1231   n->keep_alive_task 
1232     = GNUNET_SCHEDULER_add_delayed (sched, 
1233                                     retry,
1234                                     &send_keep_alive,
1235                                     n);
1236
1237 }
1238
1239
1240 /**
1241  * Task triggered when a neighbour entry might have gotten stale.
1242  *
1243  * @param cls the 'struct Neighbour'
1244  * @param tc scheduler context (not used)
1245  */
1246 static void
1247 consider_free_task (void *cls,
1248                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1249 {
1250   struct Neighbour *n = cls;
1251
1252   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
1253   consider_free_neighbour (n);
1254 }
1255
1256
1257 /**
1258  * Consider freeing the given neighbour since we may not need
1259  * to keep it around anymore.
1260  *
1261  * @param n neighbour to consider discarding
1262  */
1263 static void
1264 consider_free_neighbour (struct Neighbour *n)
1265
1266   struct Neighbour *pos;
1267   struct Neighbour *prev;
1268   struct GNUNET_TIME_Relative left;
1269
1270   if ( (n->th != NULL) ||
1271        (n->pitr != NULL) ||
1272        (n->status == PEER_STATE_KEY_CONFIRMED) ||
1273        (GNUNET_YES == n->is_connected) )
1274     return; /* no chance */
1275   
1276   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1277                                                                        GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
1278   if (left.value > 0)
1279     {
1280       if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1281         GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1282       n->dead_clean_task = GNUNET_SCHEDULER_add_delayed (sched,
1283                                                          left,
1284                                                          &consider_free_task,
1285                                                          n);
1286       return;
1287     }
1288   /* actually free the neighbour... */
1289   prev = NULL;
1290   pos = neighbours;
1291   while (pos != n)
1292     {
1293       prev = pos;
1294       pos = pos->next;
1295     }
1296   if (prev == NULL)
1297     neighbours = n->next;
1298   else
1299     prev->next = n->next;
1300   GNUNET_assert (neighbour_count > 0);
1301   neighbour_count--;
1302   GNUNET_STATISTICS_set (stats,
1303                          gettext_noop ("# active neighbours"), 
1304                          neighbour_count,
1305                          GNUNET_NO);
1306   free_neighbour (n);
1307 }
1308
1309
1310 /**
1311  * Function called when the transport service is ready to
1312  * receive an encrypted message for the respective peer
1313  *
1314  * @param cls neighbour to use message from
1315  * @param size number of bytes we can transmit
1316  * @param buf where to copy the message
1317  * @return number of bytes transmitted
1318  */
1319 static size_t
1320 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1321 {
1322   struct Neighbour *n = cls;
1323   struct MessageEntry *m;
1324   size_t ret;
1325   char *cbuf;
1326
1327   n->th = NULL;
1328   m = n->encrypted_head;
1329   if (m == NULL)
1330     return 0;
1331   GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1332                                n->encrypted_tail,
1333                                m);
1334   ret = 0;
1335   cbuf = buf;
1336   if (buf != NULL)
1337     {
1338       GNUNET_assert (size >= m->size);
1339       memcpy (cbuf, &m[1], m->size);
1340       ret = m->size;
1341       GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window,
1342                                         m->size);
1343 #if DEBUG_CORE
1344       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1345                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1346                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1347                   ret, GNUNET_i2s (&n->peer));
1348 #endif
1349       process_encrypted_neighbour_queue (n);
1350     }
1351   else
1352     {
1353 #if DEBUG_CORE
1354       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355                   "Transmission of message of type %u and size %u failed\n",
1356                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1357                   m->size);
1358 #endif
1359     }
1360   GNUNET_free (m);
1361   consider_free_neighbour (n);
1362   return ret;
1363 }
1364
1365
1366 /**
1367  * Check if we have plaintext messages for the specified neighbour
1368  * pending, and if so, consider batching and encrypting them (and
1369  * then trigger processing of the encrypted queue if needed).
1370  *
1371  * @param n neighbour to check.
1372  */
1373 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1374
1375
1376 /**
1377  * Check if we have encrypted messages for the specified neighbour
1378  * pending, and if so, check with the transport about sending them
1379  * out.
1380  *
1381  * @param n neighbour to check.
1382  */
1383 static void
1384 process_encrypted_neighbour_queue (struct Neighbour *n)
1385 {
1386   struct MessageEntry *m;
1387  
1388   if (n->th != NULL)
1389     return;  /* request already pending */
1390   m = n->encrypted_head;
1391   if (m == NULL)
1392     {
1393       /* encrypted queue empty, try plaintext instead */
1394       process_plaintext_neighbour_queue (n);
1395       return;
1396     }
1397 #if DEBUG_CORE
1398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1399               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1400               m->size,
1401               GNUNET_i2s (&n->peer),
1402               GNUNET_TIME_absolute_get_remaining (m->deadline).
1403               value);
1404 #endif
1405   n->th =
1406     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1407                                             m->size,
1408                                             m->priority,
1409                                             GNUNET_TIME_absolute_get_remaining
1410                                             (m->deadline),
1411                                             &notify_encrypted_transmit_ready,
1412                                             n);
1413   if (n->th == NULL)
1414     {
1415       /* message request too large or duplicate request */
1416       GNUNET_break (0);
1417       /* discard encrypted message */
1418       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1419                                    n->encrypted_tail,
1420                                    m);
1421       GNUNET_free (m);
1422       process_encrypted_neighbour_queue (n);
1423     }
1424 }
1425
1426
1427 /**
1428  * Decrypt size bytes from in and write the result to out.  Use the
1429  * key for inbound traffic of the given neighbour.  This function does
1430  * NOT do any integrity-checks on the result.
1431  *
1432  * @param n neighbour we are receiving from
1433  * @param iv initialization vector to use
1434  * @param in ciphertext
1435  * @param out plaintext
1436  * @param size size of in/out
1437  * @return GNUNET_OK on success
1438  */
1439 static int
1440 do_decrypt (struct Neighbour *n,
1441             const GNUNET_HashCode * iv,
1442             const void *in, void *out, size_t size)
1443 {
1444   if (size != (uint16_t) size)
1445     {
1446       GNUNET_break (0);
1447       return GNUNET_NO;
1448     }
1449   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1450       (n->status != PEER_STATE_KEY_CONFIRMED))
1451     {
1452       GNUNET_break_op (0);
1453       return GNUNET_SYSERR;
1454     }
1455   if (size !=
1456       GNUNET_CRYPTO_aes_decrypt (in,
1457                                  (uint16_t) size,
1458                                  &n->decrypt_key,
1459                                  (const struct
1460                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1461                                  out))
1462     {
1463       GNUNET_break (0);
1464       return GNUNET_SYSERR;
1465     }
1466   GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes decrypted"), size, GNUNET_NO);
1467 #if DEBUG_CORE
1468   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1469               "Decrypted %u bytes from `%4s' using key %u\n",
1470               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1471 #endif
1472   return GNUNET_OK;
1473 }
1474
1475
1476 /**
1477  * Select messages for transmission.  This heuristic uses a combination
1478  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1479  * and priority-based discard (in case no feasible schedule exist) and
1480  * speculative optimization (defer any kind of transmission until
1481  * we either create a batch of significant size, 25% of max, or until
1482  * we are close to a deadline).  Furthermore, when scheduling the
1483  * heuristic also packs as many messages into the batch as possible,
1484  * starting with those with the earliest deadline.  Yes, this is fun.
1485  *
1486  * @param n neighbour to select messages from
1487  * @param size number of bytes to select for transmission
1488  * @param retry_time set to the time when we should try again
1489  *        (only valid if this function returns zero)
1490  * @return number of bytes selected, or 0 if we decided to
1491  *         defer scheduling overall; in that case, retry_time is set.
1492  */
1493 static size_t
1494 select_messages (struct Neighbour *n,
1495                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1496 {
1497   struct MessageEntry *pos;
1498   struct MessageEntry *min;
1499   struct MessageEntry *last;
1500   unsigned int min_prio;
1501   struct GNUNET_TIME_Absolute t;
1502   struct GNUNET_TIME_Absolute now;
1503   struct GNUNET_TIME_Relative delta;
1504   uint64_t avail;
1505   struct GNUNET_TIME_Relative slack;     /* how long could we wait before missing deadlines? */
1506   size_t off;
1507   uint64_t tsize;
1508   unsigned int queue_size;
1509   int discard_low_prio;
1510
1511   GNUNET_assert (NULL != n->messages);
1512   now = GNUNET_TIME_absolute_get ();
1513   /* last entry in linked list of messages processed */
1514   last = NULL;
1515   /* should we remove the entry with the lowest
1516      priority from consideration for scheduling at the
1517      end of the loop? */
1518   queue_size = 0;
1519   tsize = 0;
1520   pos = n->messages;
1521   while (pos != NULL)
1522     {
1523       queue_size++;
1524       tsize += pos->size;
1525       pos = pos->next;
1526     }
1527   discard_low_prio = GNUNET_YES;
1528   while (GNUNET_YES == discard_low_prio)
1529     {
1530       min = NULL;
1531       min_prio = -1;
1532       discard_low_prio = GNUNET_NO;
1533       /* calculate number of bytes available for transmission at time "t" */
1534       avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
1535       t = now;
1536       /* how many bytes have we (hypothetically) scheduled so far */
1537       off = 0;
1538       /* maximum time we can wait before transmitting anything
1539          and still make all of our deadlines */
1540       slack = MAX_CORK_DELAY;
1541       pos = n->messages;
1542       /* note that we use "*2" here because we want to look
1543          a bit further into the future; much more makes no
1544          sense since new message might be scheduled in the
1545          meantime... */
1546       while ((pos != NULL) && (off < size * 2))
1547         {         
1548           if (pos->do_transmit == GNUNET_YES)
1549             {
1550               /* already removed from consideration */
1551               pos = pos->next;
1552               continue;
1553             }
1554           if (discard_low_prio == GNUNET_NO)
1555             {
1556               delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
1557               if (delta.value > 0)
1558                 {
1559                   // FIXME: HUH? Check!
1560                   t = pos->deadline;
1561                   avail += GNUNET_BANDWIDTH_value_get_available_until (n->bw_out,
1562                                                                        delta);
1563                 }
1564               if (avail < pos->size)
1565                 {
1566                   // FIXME: HUH? Check!
1567                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1568                 }
1569               else
1570                 {
1571                   avail -= pos->size;
1572                   /* update slack, considering both its absolute deadline
1573                      and relative deadlines caused by other messages
1574                      with their respective load */
1575                   slack = GNUNET_TIME_relative_min (slack,
1576                                                     GNUNET_BANDWIDTH_value_get_delay_for (n->bw_out,
1577                                                                                           avail));
1578                   if (pos->deadline.value <= now.value) 
1579                     {
1580                       /* now or never */
1581                       slack = GNUNET_TIME_UNIT_ZERO;
1582                     }
1583                   else if (GNUNET_YES == pos->got_slack)
1584                     {
1585                       /* should be soon now! */
1586                       slack = GNUNET_TIME_relative_min (slack,
1587                                                         GNUNET_TIME_absolute_get_remaining (pos->slack_deadline));
1588                     }
1589                   else
1590                     {
1591                       slack =
1592                         GNUNET_TIME_relative_min (slack, 
1593                                                   GNUNET_TIME_absolute_get_difference (now, pos->deadline));
1594                       pos->got_slack = GNUNET_YES;
1595                       pos->slack_deadline = GNUNET_TIME_absolute_min (pos->deadline,
1596                                                                       GNUNET_TIME_relative_to_absolute (MAX_CORK_DELAY));
1597                     }
1598                 }
1599             }
1600           off += pos->size;
1601           t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check!
1602           if (pos->priority <= min_prio)
1603             {
1604               /* update min for discard */
1605               min_prio = pos->priority;
1606               min = pos;
1607             }
1608           pos = pos->next;
1609         }
1610       if (discard_low_prio)
1611         {
1612           GNUNET_assert (min != NULL);
1613           /* remove lowest-priority entry from consideration */
1614           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1615         }
1616       last = pos;
1617     }
1618   /* guard against sending "tiny" messages with large headers without
1619      urgent deadlines */
1620   if ( (slack.value > 0) && 
1621        (size > 4 * off) &&
1622        (queue_size <= MAX_PEER_QUEUE_SIZE - 2) )
1623     {
1624       /* less than 25% of message would be filled with deadlines still
1625          being met if we delay by one second or more; so just wait for
1626          more data; but do not wait longer than 1s (since we don't want
1627          to delay messages for a really long time either). */
1628       *retry_time = MAX_CORK_DELAY;
1629       /* reset do_transmit values for next time */
1630       while (pos != last)
1631         {
1632           pos->do_transmit = GNUNET_NO;   
1633           pos = pos->next;
1634         }
1635 #if DEBUG_CORE
1636       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1637                   "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
1638                   (unsigned long long) slack.value,
1639                   (unsigned int) off,
1640                   (unsigned int) size);
1641 #endif
1642       return 0;
1643     }
1644   /* select marked messages (up to size) for transmission */
1645   off = 0;
1646   pos = n->messages;
1647   while (pos != last)
1648     {
1649       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1650         {
1651           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1652           off += pos->size;
1653           size -= pos->size;
1654         }
1655       else
1656         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1657       pos = pos->next;
1658     }
1659 #if DEBUG_CORE
1660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1661               "Selected %u/%u bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
1662               off, tsize,
1663               queue_size, MAX_PEER_QUEUE_SIZE,
1664               GNUNET_i2s (&n->peer));
1665 #endif
1666   return off;
1667 }
1668
1669
1670 /**
1671  * Batch multiple messages into a larger buffer.
1672  *
1673  * @param n neighbour to take messages from
1674  * @param buf target buffer
1675  * @param size size of buf
1676  * @param deadline set to transmission deadline for the result
1677  * @param retry_time set to the time when we should try again
1678  *        (only valid if this function returns zero)
1679  * @param priority set to the priority of the batch
1680  * @return number of bytes written to buf (can be zero)
1681  */
1682 static size_t
1683 batch_message (struct Neighbour *n,
1684                char *buf,
1685                size_t size,
1686                struct GNUNET_TIME_Absolute *deadline,
1687                struct GNUNET_TIME_Relative *retry_time,
1688                unsigned int *priority)
1689 {
1690   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1691   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
1692   struct MessageEntry *pos;
1693   struct MessageEntry *prev;
1694   struct MessageEntry *next;
1695   size_t ret;
1696   
1697   ret = 0;
1698   *priority = 0;
1699   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1700   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1701   if (0 == select_messages (n, size, retry_time))
1702     {
1703 #if DEBUG_CORE
1704       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1705                   "No messages selected, will try again in %llu ms\n",
1706                   retry_time->value);
1707 #endif
1708       return 0;
1709     }
1710   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
1711   ntm->distance = htonl (n->last_distance);
1712   ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
1713   ntm->peer = n->peer;
1714   
1715   pos = n->messages;
1716   prev = NULL;
1717   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1718     {
1719       next = pos->next;
1720       if (GNUNET_YES == pos->do_transmit)
1721         {
1722           GNUNET_assert (pos->size <= size);
1723           /* do notifications */
1724           /* FIXME: track if we have *any* client that wants
1725              full notifications and only do this if that is
1726              actually true */
1727           if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
1728             {
1729               memcpy (&ntm[1], &pos[1], pos->size);
1730               ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1731                                         sizeof (struct GNUNET_MessageHeader));
1732               send_to_all_clients (&ntm->header,
1733                                    GNUNET_YES,
1734                                    GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
1735             }
1736           else
1737             {
1738               /* message too large for 'full' notifications, we do at
1739                  least the 'hdr' type */
1740               memcpy (&ntm[1],
1741                       &pos[1],
1742                       sizeof (struct GNUNET_MessageHeader));
1743             }
1744           ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1745                                     pos->size);
1746           send_to_all_clients (&ntm->header,
1747                                GNUNET_YES,
1748                                GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);   
1749 #if DEBUG_HANDSHAKE
1750           fprintf (stderr,
1751                    "Encrypting message of type %u\n",
1752                    ntohs(((struct GNUNET_MessageHeader*)&pos[1])->type));
1753 #endif
1754           /* copy for encrypted transmission */
1755           memcpy (&buf[ret], &pos[1], pos->size);
1756           ret += pos->size;
1757           size -= pos->size;
1758           *priority += pos->priority;
1759 #if DEBUG_CORE
1760           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1761                       "Adding plaintext message of size %u with deadline %llu ms to batch\n",
1762                       pos->size,
1763                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1764 #endif
1765           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1766           GNUNET_free (pos);
1767           if (prev == NULL)
1768             n->messages = next;
1769           else
1770             prev->next = next;
1771         }
1772       else
1773         {
1774           prev = pos;
1775         }
1776       pos = next;
1777     }
1778 #if DEBUG_CORE
1779   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1780               "Deadline for message batch is %llu ms\n",
1781               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1782 #endif
1783   return ret;
1784 }
1785
1786
1787 /**
1788  * Remove messages with deadlines that have long expired from
1789  * the queue.
1790  *
1791  * @param n neighbour to inspect
1792  */
1793 static void
1794 discard_expired_messages (struct Neighbour *n)
1795 {
1796   struct MessageEntry *prev;
1797   struct MessageEntry *next;
1798   struct MessageEntry *pos;
1799   struct GNUNET_TIME_Absolute now;
1800   struct GNUNET_TIME_Relative delta;
1801
1802   now = GNUNET_TIME_absolute_get ();
1803   prev = NULL;
1804   pos = n->messages;
1805   while (pos != NULL) 
1806     {
1807       next = pos->next;
1808       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1809       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1810         {
1811 #if DEBUG_CORE
1812           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1813                       "Message is %llu ms past due, discarding.\n",
1814                       delta.value);
1815 #endif
1816           if (prev == NULL)
1817             n->messages = next;
1818           else
1819             prev->next = next;
1820           GNUNET_free (pos);
1821         }
1822       else
1823         prev = pos;
1824       pos = next;
1825     }
1826 }
1827
1828
1829 /**
1830  * Signature of the main function of a task.
1831  *
1832  * @param cls closure
1833  * @param tc context information (why was this task triggered now)
1834  */
1835 static void
1836 retry_plaintext_processing (void *cls,
1837                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1838 {
1839   struct Neighbour *n = cls;
1840
1841   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1842   process_plaintext_neighbour_queue (n);
1843 }
1844
1845
1846 /**
1847  * Send our key (and encrypted PING) to the other peer.
1848  *
1849  * @param n the other peer
1850  */
1851 static void send_key (struct Neighbour *n);
1852
1853 /**
1854  * Task that will retry "send_key" if our previous attempt failed
1855  * to yield a PONG.
1856  */
1857 static void
1858 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1859 {
1860   struct Neighbour *n = cls;
1861
1862 #if DEBUG_CORE
1863   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1864               "Retrying key transmission to `%4s'\n",
1865               GNUNET_i2s (&n->peer));
1866 #endif
1867   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
1868   n->set_key_retry_frequency =
1869     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
1870   send_key (n);
1871 }
1872
1873
1874 /**
1875  * Check if we have plaintext messages for the specified neighbour
1876  * pending, and if so, consider batching and encrypting them (and
1877  * then trigger processing of the encrypted queue if needed).
1878  *
1879  * @param n neighbour to check.
1880  */
1881 static void
1882 process_plaintext_neighbour_queue (struct Neighbour *n)
1883 {
1884   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1885   size_t used;
1886   size_t esize;
1887   struct EncryptedMessage *em;  /* encrypted message */
1888   struct EncryptedMessage *ph;  /* plaintext header */
1889   struct MessageEntry *me;
1890   unsigned int priority;
1891   struct GNUNET_TIME_Absolute deadline;
1892   struct GNUNET_TIME_Relative retry_time;
1893   GNUNET_HashCode iv;
1894
1895   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1896     {
1897       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1898       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1899     }
1900   switch (n->status)
1901     {
1902     case PEER_STATE_DOWN:
1903       send_key (n);
1904 #if DEBUG_CORE
1905       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1907                   GNUNET_i2s(&n->peer));
1908 #endif
1909       return;
1910     case PEER_STATE_KEY_SENT:
1911       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
1912         n->retry_set_key_task
1913           = GNUNET_SCHEDULER_add_delayed (sched,
1914                                           n->set_key_retry_frequency,
1915                                           &set_key_retry_task, n);    
1916 #if DEBUG_CORE
1917       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1918                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1919                   GNUNET_i2s(&n->peer));
1920 #endif
1921       return;
1922     case PEER_STATE_KEY_RECEIVED:
1923       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)        
1924         n->retry_set_key_task
1925           = GNUNET_SCHEDULER_add_delayed (sched,
1926                                           n->set_key_retry_frequency,
1927                                           &set_key_retry_task, n);        
1928 #if DEBUG_CORE
1929       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1930                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1931                   GNUNET_i2s(&n->peer));
1932 #endif
1933       return;
1934     case PEER_STATE_KEY_CONFIRMED:
1935       /* ready to continue */
1936       break;
1937     }
1938   discard_expired_messages (n);
1939   if (n->messages == NULL)
1940     {
1941 #if DEBUG_CORE
1942       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1943                   "Plaintext message queue for `%4s' is empty.\n",
1944                   GNUNET_i2s(&n->peer));
1945 #endif
1946       return;                   /* no pending messages */
1947     }
1948   if (n->encrypted_head != NULL)
1949     {
1950 #if DEBUG_CORE
1951       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1952                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1953                   GNUNET_i2s(&n->peer));
1954 #endif
1955       return;                   /* wait for messages already encrypted to be
1956                                    processed first! */
1957     }
1958   ph = (struct EncryptedMessage *) pbuf;
1959   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1960   priority = 0;
1961   used = sizeof (struct EncryptedMessage);
1962   used += batch_message (n,
1963                          &pbuf[used],
1964                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1965                          &deadline, &retry_time, &priority);
1966   if (used == sizeof (struct EncryptedMessage))
1967     {
1968 #if DEBUG_CORE
1969       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1970                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1971                   GNUNET_i2s(&n->peer));
1972 #endif
1973       /* no messages selected for sending, try again later... */
1974       n->retry_plaintext_task =
1975         GNUNET_SCHEDULER_add_delayed (sched,
1976                                       retry_time,
1977                                       &retry_plaintext_processing, n);
1978       return;
1979     }
1980 #if DEBUG_CORE_QUOTA
1981   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1982               "Sending %u b/s as new limit to peer `%4s'\n",
1983               (unsigned int) ntohl (n->bw_in.value__),
1984               GNUNET_i2s (&n->peer));
1985 #endif
1986   ph->iv_seed = htonl (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1));
1987   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1988   ph->inbound_bw_limit = n->bw_in;
1989   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1990
1991   /* setup encryption message header */
1992   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1993   me->deadline = deadline;
1994   me->priority = priority;
1995   me->size = used;
1996   em = (struct EncryptedMessage *) &me[1];
1997   em->header.size = htons (used);
1998   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1999   em->iv_seed = ph->iv_seed;
2000   esize = used - ENCRYPTED_HEADER_SIZE;
2001   GNUNET_CRYPTO_hash (&ph->sequence_number,
2002                       esize - sizeof (GNUNET_HashCode), 
2003                       &ph->plaintext_hash);
2004   GNUNET_CRYPTO_hash (&ph->iv_seed, sizeof (uint32_t), &iv);
2005 #if DEBUG_CORE
2006   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2007               "Hashed %u bytes of plaintext (`%s') using IV `%d'\n",
2008               esize - sizeof (GNUNET_HashCode),
2009               GNUNET_h2s (&ph->plaintext_hash),
2010               (int) ph->iv_seed);
2011 #endif
2012   /* encrypt */
2013 #if DEBUG_CORE
2014   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2015               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
2016               esize,
2017               GNUNET_i2s(&n->peer),
2018               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
2019 #endif
2020   GNUNET_assert (GNUNET_OK ==
2021                  do_encrypt (n,
2022                              &iv,
2023                              &ph->plaintext_hash,
2024                              &em->plaintext_hash, esize));
2025   /* append to transmission list */
2026   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2027                                      n->encrypted_tail,
2028                                      n->encrypted_tail,
2029                                      me);
2030   process_encrypted_neighbour_queue (n);
2031 }
2032
2033
2034 /**
2035  * Function that recalculates the bandwidth quota for the
2036  * given neighbour and transmits it to the transport service.
2037  * 
2038  * @param cls neighbour for the quota update
2039  * @param tc context
2040  */
2041 static void
2042 neighbour_quota_update (void *cls,
2043                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2044
2045
2046 /**
2047  * Schedule the task that will recalculate the bandwidth
2048  * quota for this peer (and possibly force a disconnect of
2049  * idle peers by calculating a bandwidth of zero).
2050  */
2051 static void
2052 schedule_quota_update (struct Neighbour *n)
2053 {
2054   GNUNET_assert (n->quota_update_task ==
2055                  GNUNET_SCHEDULER_NO_TASK);
2056   n->quota_update_task
2057     = GNUNET_SCHEDULER_add_delayed (sched,
2058                                     QUOTA_UPDATE_FREQUENCY,
2059                                     &neighbour_quota_update,
2060                                     n);
2061 }
2062
2063
2064 /**
2065  * Initialize a new 'struct Neighbour'.
2066  *
2067  * @param pid ID of the new neighbour
2068  * @return handle for the new neighbour
2069  */
2070 static struct Neighbour *
2071 create_neighbour (const struct GNUNET_PeerIdentity *pid)
2072 {
2073   struct Neighbour *n;
2074   struct GNUNET_TIME_Absolute now;
2075
2076   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2077               "Creating neighbour entry for peer `%4s'\n",
2078               GNUNET_i2s (pid));
2079   n = GNUNET_malloc (sizeof (struct Neighbour));
2080   n->next = neighbours;
2081   neighbours = n;
2082   neighbour_count++;
2083   GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
2084   n->peer = *pid;
2085   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2086   now = GNUNET_TIME_absolute_get ();
2087   n->encrypt_key_created = now;
2088   n->last_activity = now;
2089   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2090   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2091   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2092   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init ((uint32_t) - 1);
2093   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
2094   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2095                                                 (uint32_t) - 1);
2096   neighbour_quota_update (n, NULL);
2097   return n;
2098 }
2099
2100
2101 /**
2102  * Handle CORE_SEND request.
2103  *
2104  * @param cls unused
2105  * @param client the client issuing the request
2106  * @param message the "struct SendMessage"
2107  */
2108 static void
2109 handle_client_send (void *cls,
2110                     struct GNUNET_SERVER_Client *client,
2111                     const struct GNUNET_MessageHeader *message)
2112 {
2113   const struct SendMessage *sm;
2114   struct Neighbour *n;
2115   struct MessageEntry *prev;
2116   struct MessageEntry *pos;
2117   struct MessageEntry *e; 
2118   struct MessageEntry *min_prio_entry;
2119   struct MessageEntry *min_prio_prev;
2120   unsigned int min_prio;
2121   unsigned int queue_size;
2122   uint16_t msize;
2123
2124   msize = ntohs (message->size);
2125   if (msize <
2126       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
2127     {
2128       GNUNET_break (0);
2129       if (client != NULL)
2130         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2131       return;
2132     }
2133   sm = (const struct SendMessage *) message;
2134   msize -= sizeof (struct SendMessage);
2135   if (0 == memcmp (&sm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2136     {
2137       /* FIXME: should we not allow loopback-injection here? */
2138       GNUNET_break (0);
2139       if (client != NULL)
2140         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2141       return;
2142     }
2143   n = find_neighbour (&sm->peer);
2144   if (n == NULL)
2145     n = create_neighbour (&sm->peer);
2146 #if DEBUG_CORE
2147   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2148               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
2149               "SEND",
2150               msize, 
2151               GNUNET_i2s (&sm->peer));
2152 #endif
2153   /* bound queue size */
2154   discard_expired_messages (n);
2155   min_prio = (unsigned int) -1;
2156   min_prio_entry = NULL;
2157   min_prio_prev = NULL;
2158   queue_size = 0;
2159   prev = NULL;
2160   pos = n->messages;
2161   while (pos != NULL) 
2162     {
2163       if (pos->priority < min_prio)
2164         {
2165           min_prio_entry = pos;
2166           min_prio_prev = prev;
2167           min_prio = pos->priority;
2168         }
2169       queue_size++;
2170       prev = pos;
2171       pos = pos->next;
2172     }
2173   if (queue_size >= MAX_PEER_QUEUE_SIZE)
2174     {
2175       /* queue full */
2176       if (ntohl(sm->priority) <= min_prio)
2177         {
2178           /* discard new entry */
2179 #if DEBUG_CORE
2180           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2181                       "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
2182                       queue_size,
2183                       MAX_PEER_QUEUE_SIZE,
2184                       msize,
2185                       ntohs (message->type));
2186 #endif
2187           if (client != NULL)
2188             GNUNET_SERVER_receive_done (client, GNUNET_OK);
2189           return;
2190         }
2191       /* discard "min_prio_entry" */
2192 #if DEBUG_CORE
2193       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2194                   "Queue full, discarding existing older request\n");
2195 #endif
2196       if (min_prio_prev == NULL)
2197         n->messages = min_prio_entry->next;
2198       else
2199         min_prio_prev->next = min_prio_entry->next;      
2200       GNUNET_free (min_prio_entry);     
2201     }
2202
2203 #if DEBUG_CORE
2204   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2205               "Adding transmission request for `%4s' of size %u to queue\n",
2206               GNUNET_i2s (&sm->peer),
2207               msize);
2208 #endif  
2209   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
2210   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
2211   e->priority = ntohl (sm->priority);
2212   e->size = msize;
2213   memcpy (&e[1], &sm[1], msize);
2214
2215   /* insert, keep list sorted by deadline */
2216   prev = NULL;
2217   pos = n->messages;
2218   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
2219     {
2220       prev = pos;
2221       pos = pos->next;
2222     }
2223   if (prev == NULL)
2224     n->messages = e;
2225   else
2226     prev->next = e;
2227   e->next = pos;
2228
2229   /* consider scheduling now */
2230   process_plaintext_neighbour_queue (n);
2231   if (client != NULL)
2232     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2233 }
2234
2235
2236 /**
2237  * Function called when the transport service is ready to
2238  * receive a message.  Only resets 'n->th' to NULL.
2239  *
2240  * @param cls neighbour to use message from
2241  * @param size number of bytes we can transmit
2242  * @param buf where to copy the message
2243  * @return number of bytes transmitted
2244  */
2245 static size_t
2246 notify_transport_connect_done (void *cls, size_t size, void *buf)
2247 {
2248   struct Neighbour *n = cls;
2249
2250   n->th = NULL;
2251   if (buf == NULL)
2252     {
2253       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2254                   _("Failed to connect to `%4s': transport failed to connect\n"),
2255                   GNUNET_i2s (&n->peer));
2256       return 0;
2257     }
2258   send_key (n);
2259   return 0;
2260 }
2261
2262
2263 /**
2264  * Handle CORE_REQUEST_CONNECT request.
2265  *
2266  * @param cls unused
2267  * @param client the client issuing the request
2268  * @param message the "struct ConnectMessage"
2269  */
2270 static void
2271 handle_client_request_connect (void *cls,
2272                                struct GNUNET_SERVER_Client *client,
2273                                const struct GNUNET_MessageHeader *message)
2274 {
2275   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2276   struct Neighbour *n;
2277   struct GNUNET_TIME_Relative timeout;
2278
2279   if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2280     {
2281       GNUNET_break (0);
2282       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2283       return;
2284     }
2285   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2286   n = find_neighbour (&cm->peer);
2287   if (n == NULL)
2288     n = create_neighbour (&cm->peer);
2289   if ( (n->is_connected) ||
2290        (n->th != NULL) )
2291     return; /* already connected, or at least trying */
2292   GNUNET_STATISTICS_update (stats, gettext_noop ("# connection requests received"), 1, GNUNET_NO);
2293 #if DEBUG_CORE
2294   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2295               "Core received `%s' request for `%4s', will try to establish connection\n",
2296               "REQUEST_CONNECT",
2297               GNUNET_i2s (&cm->peer));
2298 #endif
2299   timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
2300   /* ask transport to connect to the peer */
2301   n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
2302                                                   &cm->peer,
2303                                                   sizeof (struct GNUNET_MessageHeader), 0,
2304                                                   timeout,
2305                                                   &notify_transport_connect_done,
2306                                                   n);
2307   GNUNET_break (NULL != n->th);
2308 }
2309
2310
2311 /**
2312  * List of handlers for the messages understood by this
2313  * service.
2314  */
2315 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2316   {&handle_client_init, NULL,
2317    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2318   {&handle_client_request_info, NULL,
2319    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2320    sizeof (struct RequestInfoMessage)},
2321   {&handle_client_send, NULL,
2322    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2323   {&handle_client_request_connect, NULL,
2324    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2325    sizeof (struct ConnectMessage)},
2326   {NULL, NULL, 0, 0}
2327 };
2328
2329
2330 /**
2331  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2332  * the neighbour's struct and retry send_key.  Or, if we did not get a
2333  * HELLO, just do nothing.
2334  *
2335  * @param cls the 'struct Neighbour' to retry sending the key for
2336  * @param peer the peer for which this is the HELLO
2337  * @param hello HELLO message of that peer
2338  * @param trust amount of trust we currently have in that peer
2339  */
2340 static void
2341 process_hello_retry_send_key (void *cls,
2342                               const struct GNUNET_PeerIdentity *peer,
2343                               const struct GNUNET_HELLO_Message *hello,
2344                               uint32_t trust)
2345 {
2346   struct Neighbour *n = cls;
2347
2348   if (peer == NULL)
2349     {
2350 #if DEBUG_CORE
2351       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2352                   "Entered `process_hello_retry_send_key' and `peer' is NULL!\n");
2353 #endif
2354       n->pitr = NULL;
2355       if (n->public_key != NULL)
2356         {
2357           GNUNET_STATISTICS_update (stats,
2358                                     gettext_noop ("# SETKEY messages deferred (need public key)"), 
2359                                     -1, 
2360                                     GNUNET_NO);
2361           send_key (n);
2362         }
2363       else
2364         {
2365           GNUNET_STATISTICS_update (stats,
2366                                     gettext_noop ("# Delayed connecting due to lack of public key"),
2367                                     1,
2368                                     GNUNET_NO);      
2369           if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
2370             n->retry_set_key_task
2371               = GNUNET_SCHEDULER_add_delayed (sched,
2372                                               n->set_key_retry_frequency,
2373                                               &set_key_retry_task, n);
2374         }
2375       return;
2376     }
2377
2378 #if DEBUG_CORE
2379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2380               "Entered `process_hello_retry_send_key' for peer `%4s'\n",
2381               GNUNET_i2s (peer));
2382 #endif
2383   if (n->public_key != NULL)
2384     {
2385 #if DEBUG_CORE
2386       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2387               "already have public key for peer %s!! (so why are we here?)\n",
2388               GNUNET_i2s (peer));
2389 #endif
2390       return;
2391     }
2392
2393 #if DEBUG_CORE
2394   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2395               "Received new `%s' message for `%4s', initiating key exchange.\n",
2396               "HELLO",
2397               GNUNET_i2s (peer));
2398 #endif
2399   n->public_key =
2400     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2401   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2402     {
2403       GNUNET_STATISTICS_update (stats,
2404                                 gettext_noop ("# Error extracting public key from HELLO"),
2405                                 1,
2406                                 GNUNET_NO);      
2407       GNUNET_free (n->public_key);
2408       n->public_key = NULL;
2409 #if DEBUG_CORE
2410   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2411               "GNUNET_HELLO_get_key returned awfully\n");
2412 #endif
2413       return;
2414     }
2415 }
2416
2417
2418 /**
2419  * Send our key (and encrypted PING) to the other peer.
2420  *
2421  * @param n the other peer
2422  */
2423 static void
2424 send_key (struct Neighbour *n)
2425 {
2426   struct SetKeyMessage *sm;
2427   struct MessageEntry *me;
2428   struct PingMessage pp;
2429   struct PingMessage *pm;
2430
2431   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2432        (n->pitr != NULL) )
2433     {
2434 #if DEBUG_CORE
2435       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2436                   "Key exchange in progress with `%4s'.\n",
2437                   GNUNET_i2s (&n->peer));
2438 #endif
2439       return; /* already in progress */
2440     }
2441   if (! n->is_connected)
2442     {
2443       if (NULL == n->th)
2444         {
2445           GNUNET_STATISTICS_update (stats, 
2446                                     gettext_noop ("# Asking transport to connect (for SETKEY)"), 
2447                                     1, 
2448                                     GNUNET_NO);
2449           n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
2450                                                           &n->peer,
2451                                                           sizeof (struct SetKeyMessage) + sizeof (struct PingMessage),
2452                                                           0,
2453                                                           GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2454                                                           &notify_encrypted_transmit_ready,
2455                                                           n);
2456         }
2457       return; 
2458     }
2459 #if DEBUG_CORE
2460   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2461               "Asked to perform key exchange with `%4s'.\n",
2462               GNUNET_i2s (&n->peer));
2463 #endif
2464   if (n->public_key == NULL)
2465     {
2466       /* lookup n's public key, then try again */
2467 #if DEBUG_CORE
2468       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2469                   "Lacking public key for `%4s', trying to obtain one (send_key).\n",
2470                   GNUNET_i2s (&n->peer));
2471 #endif
2472       GNUNET_assert (n->pitr == NULL);
2473       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2474                                          sched,
2475                                          &n->peer,
2476                                          0,
2477                                          GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20),
2478                                          &process_hello_retry_send_key, n);
2479       return;
2480     }
2481   /* first, set key message */
2482   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2483                       sizeof (struct SetKeyMessage));
2484   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2485   me->priority = SET_KEY_PRIORITY;
2486   me->size = sizeof (struct SetKeyMessage);
2487   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2488                                      n->encrypted_tail,
2489                                      n->encrypted_tail,
2490                                      me);
2491   sm = (struct SetKeyMessage *) &me[1];
2492   sm->header.size = htons (sizeof (struct SetKeyMessage));
2493   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2494   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2495                                         PEER_STATE_KEY_SENT : n->status));
2496   sm->purpose.size =
2497     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2498            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2499            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2500            sizeof (struct GNUNET_PeerIdentity));
2501   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2502   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2503   sm->target = n->peer;
2504   GNUNET_assert (GNUNET_OK ==
2505                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2506                                             sizeof (struct
2507                                                     GNUNET_CRYPTO_AesSessionKey),
2508                                             n->public_key,
2509                                             &sm->encrypted_key));
2510   GNUNET_assert (GNUNET_OK ==
2511                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2512                                          &sm->signature));
2513
2514   /* second, encrypted PING message */
2515   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2516                       sizeof (struct PingMessage));
2517   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2518   me->priority = PING_PRIORITY;
2519   me->size = sizeof (struct PingMessage);
2520   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2521                                      n->encrypted_tail,
2522                                      n->encrypted_tail,
2523                                      me);
2524   pm = (struct PingMessage *) &me[1];
2525   pm->header.size = htons (sizeof (struct PingMessage));
2526   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2527   pp.challenge = htonl (n->ping_challenge);
2528   pp.target = n->peer;
2529 #if DEBUG_CORE
2530   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2531               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2532               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2533   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2534               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2535               "PING",
2536               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2537 #endif
2538   do_encrypt (n,
2539               &n->peer.hashPubKey,
2540               &pp.challenge,
2541               &pm->challenge,
2542               sizeof (struct PingMessage) -
2543               sizeof (struct GNUNET_MessageHeader));
2544   /* update status */
2545   switch (n->status)
2546     {
2547     case PEER_STATE_DOWN:
2548       n->status = PEER_STATE_KEY_SENT;
2549       break;
2550     case PEER_STATE_KEY_SENT:
2551       break;
2552     case PEER_STATE_KEY_RECEIVED:
2553       break;
2554     case PEER_STATE_KEY_CONFIRMED:
2555       break;
2556     default:
2557       GNUNET_break (0);
2558       break;
2559     }
2560   GNUNET_STATISTICS_update (stats, 
2561                             gettext_noop ("# SETKEY and PING messages created"), 
2562                             1, 
2563                             GNUNET_NO);
2564 #if DEBUG_CORE
2565   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2566               "Have %llu ms left for `%s' transmission.\n",
2567               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2568               "SET_KEY");
2569 #endif
2570   /* trigger queue processing */
2571   process_encrypted_neighbour_queue (n);
2572   if ( (n->status != PEER_STATE_KEY_CONFIRMED) &&
2573        (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task) )
2574     n->retry_set_key_task
2575       = GNUNET_SCHEDULER_add_delayed (sched,
2576                                       n->set_key_retry_frequency,
2577                                       &set_key_retry_task, n);    
2578 }
2579
2580
2581 /**
2582  * We received a SET_KEY message.  Validate and update
2583  * our key material and status.
2584  *
2585  * @param n the neighbour from which we received message m
2586  * @param m the set key message we received
2587  */
2588 static void
2589 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2590
2591
2592 /**
2593  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2594  * the neighbour's struct and retry handling the set_key message.  Or,
2595  * if we did not get a HELLO, just free the set key message.
2596  *
2597  * @param cls pointer to the set key message
2598  * @param peer the peer for which this is the HELLO
2599  * @param hello HELLO message of that peer
2600  * @param trust amount of trust we currently have in that peer
2601  */
2602 static void
2603 process_hello_retry_handle_set_key (void *cls,
2604                                     const struct GNUNET_PeerIdentity *peer,
2605                                     const struct GNUNET_HELLO_Message *hello,
2606                                     uint32_t trust)
2607 {
2608   struct Neighbour *n = cls;
2609   struct SetKeyMessage *sm = n->skm;
2610
2611   if (peer == NULL)
2612     {
2613       GNUNET_free (sm);
2614       n->skm = NULL;
2615       n->pitr = NULL;
2616       return;
2617     }
2618   if (n->public_key != NULL)
2619     return;                     /* multiple HELLOs match!? */
2620   n->public_key =
2621     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2622   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2623     {
2624       GNUNET_break_op (0);
2625       GNUNET_free (n->public_key);
2626       n->public_key = NULL;
2627       return;
2628     }
2629 #if DEBUG_CORE
2630   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2631               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2632               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2633 #endif
2634   handle_set_key (n, sm);
2635 }
2636
2637
2638 /**
2639  * We received a PING message.  Validate and transmit
2640  * PONG.
2641  *
2642  * @param n sender of the PING
2643  * @param m the encrypted PING message itself
2644  */
2645 static void
2646 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2647 {
2648   struct PingMessage t;
2649   struct PongMessage tx;
2650   struct PongMessage *tp;
2651   struct MessageEntry *me;
2652
2653 #if DEBUG_CORE
2654   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2655               "Core service receives `%s' request from `%4s'.\n",
2656               "PING", GNUNET_i2s (&n->peer));
2657 #endif
2658   if (GNUNET_OK !=
2659       do_decrypt (n,
2660                   &my_identity.hashPubKey,
2661                   &m->challenge,
2662                   &t.challenge,
2663                   sizeof (struct PingMessage) -
2664                   sizeof (struct GNUNET_MessageHeader)))
2665     return;
2666 #if DEBUG_CORE
2667   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2668               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2669               "PING",
2670               GNUNET_i2s (&t.target),
2671               ntohl (t.challenge), n->decrypt_key.crc32);
2672   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2673               "Target of `%s' request is `%4s'.\n",
2674               "PING", GNUNET_i2s (&t.target));
2675 #endif
2676   GNUNET_STATISTICS_update (stats,
2677                             gettext_noop ("# PING messages decrypted"), 
2678                             1,
2679                             GNUNET_NO);
2680   if (0 != memcmp (&t.target,
2681                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2682     {
2683       GNUNET_break_op (0);
2684       return;
2685     }
2686   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2687                       sizeof (struct PongMessage));
2688   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2689                                      n->encrypted_tail,
2690                                      n->encrypted_tail,
2691                                      me);
2692   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2693   me->priority = PONG_PRIORITY;
2694   me->size = sizeof (struct PongMessage);
2695   tx.reserved = htonl (0);
2696   tx.inbound_bw_limit = n->bw_in;
2697   tx.challenge = t.challenge;
2698   tx.target = t.target;
2699   tp = (struct PongMessage *) &me[1];
2700   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2701   tp->header.size = htons (sizeof (struct PongMessage));
2702   do_encrypt (n,
2703               &my_identity.hashPubKey,
2704               &tx.challenge,
2705               &tp->challenge,
2706               sizeof (struct PongMessage) -
2707               sizeof (struct GNUNET_MessageHeader));
2708   GNUNET_STATISTICS_update (stats, 
2709                             gettext_noop ("# PONG messages created"), 
2710                             1, 
2711                             GNUNET_NO);
2712 #if DEBUG_CORE
2713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2714               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2715               ntohl (t.challenge), n->encrypt_key.crc32);
2716 #endif
2717   /* trigger queue processing */
2718   process_encrypted_neighbour_queue (n);
2719 }
2720
2721
2722 /**
2723  * We received a PONG message.  Validate and update our status.
2724  *
2725  * @param n sender of the PONG
2726  * @param m the encrypted PONG message itself
2727  */
2728 static void
2729 handle_pong (struct Neighbour *n, 
2730              const struct PongMessage *m)
2731 {
2732   struct PongMessage t;
2733   struct ConnectNotifyMessage cnm;
2734
2735 #if DEBUG_CORE
2736   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2737               "Core service receives `%s' request from `%4s'.\n",
2738               "PONG", GNUNET_i2s (&n->peer));
2739 #endif
2740   if (GNUNET_OK !=
2741       do_decrypt (n,
2742                   &n->peer.hashPubKey,
2743                   &m->challenge,
2744                   &t.challenge,
2745                   sizeof (struct PongMessage) -
2746                   sizeof (struct GNUNET_MessageHeader)))
2747     {
2748       GNUNET_break_op (0);
2749       return;
2750     }
2751   GNUNET_STATISTICS_update (stats, 
2752                             gettext_noop ("# PONG messages decrypted"), 
2753                             1, 
2754                             GNUNET_NO);
2755   if (0 != ntohl (t.reserved))
2756     {
2757       GNUNET_break_op (0);
2758       return;
2759     }
2760 #if DEBUG_CORE
2761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2762               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2763               "PONG",
2764               GNUNET_i2s (&t.target),
2765               ntohl (t.challenge), n->decrypt_key.crc32);
2766 #endif
2767   if ((0 != memcmp (&t.target,
2768                     &n->peer,
2769                     sizeof (struct GNUNET_PeerIdentity))) ||
2770       (n->ping_challenge != ntohl (t.challenge)))
2771     {
2772       /* PONG malformed */
2773 #if DEBUG_CORE
2774       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2775                   "Received malformed `%s' wanted sender `%4s' with challenge %u\n",
2776                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2777       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2778                   "Received malformed `%s' received from `%4s' with challenge %u\n",
2779                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2780 #endif
2781       GNUNET_break_op (0);
2782       return;
2783     }
2784   switch (n->status)
2785     {
2786     case PEER_STATE_DOWN:
2787       GNUNET_break (0);         /* should be impossible */
2788       return;
2789     case PEER_STATE_KEY_SENT:
2790       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2791       return;
2792     case PEER_STATE_KEY_RECEIVED:
2793       GNUNET_STATISTICS_update (stats, 
2794                                 gettext_noop ("# Session keys confirmed via PONG"), 
2795                                 1, 
2796                                 GNUNET_NO);
2797       n->status = PEER_STATE_KEY_CONFIRMED;
2798       if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
2799         {
2800           n->bw_out_external_limit = t.inbound_bw_limit;
2801           n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
2802                                                   n->bw_out_internal_limit);
2803           GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
2804                                                  n->bw_out);       
2805           GNUNET_TRANSPORT_set_quota (transport,
2806                                       &n->peer,
2807                                       n->bw_in,
2808                                       n->bw_out,
2809                                       GNUNET_TIME_UNIT_FOREVER_REL,
2810                                       NULL, NULL); 
2811         }
2812 #if DEBUG_CORE
2813       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2814                   "Confirmed key via `%s' message for peer `%4s'\n",
2815                   "PONG", GNUNET_i2s (&n->peer));
2816 #endif      
2817       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2818         {
2819           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2820           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2821         }      
2822       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2823       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2824       cnm.distance = htonl (n->last_distance);
2825       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2826       cnm.peer = n->peer;
2827       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2828       process_encrypted_neighbour_queue (n);
2829       /* fall-through! */
2830     case PEER_STATE_KEY_CONFIRMED:
2831       n->last_activity = GNUNET_TIME_absolute_get ();
2832       if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
2833         GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
2834       n->keep_alive_task 
2835         = GNUNET_SCHEDULER_add_delayed (sched, 
2836                                         GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
2837                                         &send_keep_alive,
2838                                         n);
2839       break;
2840     default:
2841       GNUNET_break (0);
2842       break;
2843     }
2844 }
2845
2846
2847 /**
2848  * We received a SET_KEY message.  Validate and update
2849  * our key material and status.
2850  *
2851  * @param n the neighbour from which we received message m
2852  * @param m the set key message we received
2853  */
2854 static void
2855 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2856 {
2857   struct SetKeyMessage *m_cpy;
2858   struct GNUNET_TIME_Absolute t;
2859   struct GNUNET_CRYPTO_AesSessionKey k;
2860   struct PingMessage *ping;
2861   struct PongMessage *pong;
2862   enum PeerStateMachine sender_status;
2863
2864 #if DEBUG_CORE
2865   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2866               "Core service receives `%s' request from `%4s'.\n",
2867               "SET_KEY", GNUNET_i2s (&n->peer));
2868 #endif
2869   if (n->public_key == NULL)
2870     {
2871       if (n->pitr != NULL)
2872         {
2873 #if DEBUG_CORE
2874           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2875                       "Ignoring `%s' message due to lack of public key for peer (still trying to obtain one).\n",
2876                       "SET_KEY");
2877 #endif
2878           return;
2879         }
2880 #if DEBUG_CORE
2881       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2882                   "Lacking public key for peer, trying to obtain one (handle_set_key).\n");
2883 #endif
2884       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2885       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2886       /* lookup n's public key, then try again */
2887       GNUNET_assert (n->skm == NULL);
2888       n->skm = m_cpy;
2889       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2890                                          sched,
2891                                          &n->peer,
2892                                          0,
2893                                          GNUNET_TIME_UNIT_MINUTES,
2894                                          &process_hello_retry_handle_set_key, n);
2895       GNUNET_STATISTICS_update (stats, 
2896                                 gettext_noop ("# SETKEY messages deferred (need public key)"), 
2897                                 1, 
2898                                 GNUNET_NO);
2899       return;
2900     }
2901   if (0 != memcmp (&m->target,
2902                    &my_identity,
2903                    sizeof (struct GNUNET_PeerIdentity)))
2904     {
2905       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2906                   _("Received `%s' message that was for `%s', not for me.  Ignoring.\n"),
2907                   "SET_KEY",
2908                   GNUNET_i2s (&m->target));
2909       return;
2910     }
2911   if ((ntohl (m->purpose.size) !=
2912        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2913        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2914        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2915        sizeof (struct GNUNET_PeerIdentity)) ||
2916       (GNUNET_OK !=
2917        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2918                                  &m->purpose, &m->signature, n->public_key)))
2919     {
2920       /* invalid signature */
2921       GNUNET_break_op (0);
2922       return;
2923     }
2924   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2925   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2926        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2927       (t.value < n->decrypt_key_created.value))
2928     {
2929       /* this could rarely happen due to massive re-ordering of
2930          messages on the network level, but is most likely either
2931          a bug or some adversary messing with us.  Report. */
2932       GNUNET_break_op (0);
2933       return;
2934     }
2935 #if DEBUG_CORE
2936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2937               "Decrypting key material.\n");
2938 #endif  
2939   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2940                                   &m->encrypted_key,
2941                                   &k,
2942                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2943        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2944       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2945     {
2946       /* failed to decrypt !? */
2947       GNUNET_break_op (0);
2948       return;
2949     }
2950   GNUNET_STATISTICS_update (stats, 
2951                             gettext_noop ("# SETKEY messages decrypted"), 
2952                             1, 
2953                             GNUNET_NO);
2954   n->decrypt_key = k;
2955   if (n->decrypt_key_created.value != t.value)
2956     {
2957       /* fresh key, reset sequence numbers */
2958       n->last_sequence_number_received = 0;
2959       n->last_packets_bitmap = 0;
2960       n->decrypt_key_created = t;
2961     }
2962   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2963   switch (n->status)
2964     {
2965     case PEER_STATE_DOWN:
2966       n->status = PEER_STATE_KEY_RECEIVED;
2967 #if DEBUG_CORE
2968       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2969                   "Responding to `%s' with my own key.\n", "SET_KEY");
2970 #endif
2971       send_key (n);
2972       break;
2973     case PEER_STATE_KEY_SENT:
2974     case PEER_STATE_KEY_RECEIVED:
2975       n->status = PEER_STATE_KEY_RECEIVED;
2976       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2977           (sender_status != PEER_STATE_KEY_CONFIRMED))
2978         {
2979 #if DEBUG_CORE
2980           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2981                       "Responding to `%s' with my own key (other peer has status %u).\n",
2982                       "SET_KEY", sender_status);
2983 #endif
2984           send_key (n);
2985         }
2986       break;
2987     case PEER_STATE_KEY_CONFIRMED:
2988       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2989           (sender_status != PEER_STATE_KEY_CONFIRMED))
2990         {         
2991 #if DEBUG_CORE
2992           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2993                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2994                       "SET_KEY", sender_status);
2995 #endif
2996           send_key (n);
2997         }
2998       break;
2999     default:
3000       GNUNET_break (0);
3001       break;
3002     }
3003   if (n->pending_ping != NULL)
3004     {
3005       ping = n->pending_ping;
3006       n->pending_ping = NULL;
3007       handle_ping (n, ping);
3008       GNUNET_free (ping);
3009     }
3010   if (n->pending_pong != NULL)
3011     {
3012       pong = n->pending_pong;
3013       n->pending_pong = NULL;
3014       handle_pong (n, pong);
3015       GNUNET_free (pong);
3016     }
3017 }
3018
3019
3020 /**
3021  * Send a P2P message to a client.
3022  *
3023  * @param sender who sent us the message?
3024  * @param client who should we give the message to?
3025  * @param m contains the message to transmit
3026  * @param msize number of bytes in buf to transmit
3027  */
3028 static void
3029 send_p2p_message_to_client (struct Neighbour *sender,
3030                             struct Client *client,
3031                             const void *m, size_t msize)
3032 {
3033   char buf[msize + sizeof (struct NotifyTrafficMessage)];
3034   struct NotifyTrafficMessage *ntm;
3035
3036 #if DEBUG_CORE
3037   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3038               "Core service passes message from `%4s' of type %u to client.\n",
3039               GNUNET_i2s(&sender->peer),
3040               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
3041 #endif
3042   ntm = (struct NotifyTrafficMessage *) buf;
3043   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
3044   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
3045   ntm->distance = htonl (sender->last_distance);
3046   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
3047   ntm->peer = sender->peer;
3048   memcpy (&ntm[1], m, msize);
3049   send_to_client (client, &ntm->header, GNUNET_YES);
3050 }
3051
3052
3053 /**
3054  * Deliver P2P message to interested clients.
3055  *
3056  * @param sender who sent us the message?
3057  * @param m the message
3058  * @param msize size of the message (including header)
3059  */
3060 static void
3061 deliver_message (struct Neighbour *sender,
3062                  const struct GNUNET_MessageHeader *m, size_t msize)
3063 {
3064   char buf[256];
3065   struct Client *cpos;
3066   uint16_t type;
3067   unsigned int tpos;
3068   int deliver_full;
3069   int dropped;
3070
3071   type = ntohs (m->type);
3072 #if DEBUG_CORE
3073   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3074               "Received encapsulated message of type %u from `%4s'\n",
3075               type,
3076               GNUNET_i2s (&sender->peer));
3077 #endif
3078   GNUNET_snprintf (buf,
3079                    sizeof(buf),
3080                    gettext_noop ("# bytes of messages of type %u received"),
3081                    type);
3082   GNUNET_STATISTICS_set (stats,
3083                          buf,
3084                          msize,
3085                          GNUNET_NO);     
3086   dropped = GNUNET_YES;
3087   cpos = clients;
3088   while (cpos != NULL)
3089     {
3090       deliver_full = GNUNET_NO;
3091       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
3092         deliver_full = GNUNET_YES;
3093       else
3094         {
3095           for (tpos = 0; tpos < cpos->tcnt; tpos++)
3096             {
3097               if (type != cpos->types[tpos])
3098                 continue;
3099               deliver_full = GNUNET_YES;
3100               break;
3101             }
3102         }
3103       if (GNUNET_YES == deliver_full)
3104         {
3105           send_p2p_message_to_client (sender, cpos, m, msize);
3106           dropped = GNUNET_NO;
3107         }
3108       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
3109         {
3110           send_p2p_message_to_client (sender, cpos, m,
3111                                       sizeof (struct GNUNET_MessageHeader));
3112         }
3113       cpos = cpos->next;
3114     }
3115   if (dropped == GNUNET_YES)
3116     {
3117 #if DEBUG_CORE
3118       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3119                   "Message of type %u from `%4s' not delivered to any client.\n",
3120                   type,
3121                   GNUNET_i2s (&sender->peer));
3122 #endif
3123       /* FIXME: stats... */
3124     }
3125 }
3126
3127
3128 /**
3129  * Align P2P message and then deliver to interested clients.
3130  *
3131  * @param sender who sent us the message?
3132  * @param buffer unaligned (!) buffer containing message
3133  * @param msize size of the message (including header)
3134  */
3135 static void
3136 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
3137 {
3138   char abuf[msize];
3139
3140   /* TODO: call to statistics? */
3141   memcpy (abuf, buffer, msize);
3142   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
3143 }
3144
3145
3146 /**
3147  * Deliver P2P messages to interested clients.
3148  *
3149  * @param sender who sent us the message?
3150  * @param buffer buffer containing messages, can be modified
3151  * @param buffer_size size of the buffer (overall)
3152  * @param offset offset where messages in the buffer start
3153  */
3154 static void
3155 deliver_messages (struct Neighbour *sender,
3156                   const char *buffer, size_t buffer_size, size_t offset)
3157 {
3158   struct GNUNET_MessageHeader *mhp;
3159   struct GNUNET_MessageHeader mh;
3160   uint16_t msize;
3161   int need_align;
3162
3163   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
3164     {
3165       if (0 != offset % sizeof (uint16_t))
3166         {
3167           /* outch, need to copy to access header */
3168           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
3169           mhp = &mh;
3170         }
3171       else
3172         {
3173           /* can access header directly */
3174           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
3175         }
3176       msize = ntohs (mhp->size);
3177       if (msize + offset > buffer_size)
3178         {
3179           /* malformed message, header says it is larger than what
3180              would fit into the overall buffer */
3181           GNUNET_break_op (0);
3182           break;
3183         }
3184 #if HAVE_UNALIGNED_64_ACCESS
3185       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
3186 #else
3187       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
3188 #endif
3189       if (GNUNET_YES == need_align)
3190         align_and_deliver (sender, &buffer[offset], msize);
3191       else
3192         deliver_message (sender,
3193                          (const struct GNUNET_MessageHeader *)
3194                          &buffer[offset], msize);
3195       offset += msize;
3196     }
3197 }
3198
3199
3200 /**
3201  * We received an encrypted message.  Decrypt, validate and
3202  * pass on to the appropriate clients.
3203  */
3204 static void
3205 handle_encrypted_message (struct Neighbour *n,
3206                           const struct EncryptedMessage *m)
3207 {
3208   size_t size = ntohs (m->header.size);
3209   char buf[size];
3210   struct EncryptedMessage *pt;  /* plaintext */
3211   GNUNET_HashCode ph;
3212   uint32_t snum;
3213   struct GNUNET_TIME_Absolute t;
3214   GNUNET_HashCode iv;
3215
3216 #if DEBUG_CORE
3217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3218               "Core service receives `%s' request from `%4s'.\n",
3219               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
3220 #endif  
3221   GNUNET_CRYPTO_hash (&m->iv_seed, sizeof (uint32_t), &iv);
3222   /* decrypt */
3223   if (GNUNET_OK !=
3224       do_decrypt (n,
3225                   &iv,
3226                   &m->plaintext_hash,
3227                   &buf[ENCRYPTED_HEADER_SIZE], 
3228                   size - ENCRYPTED_HEADER_SIZE))
3229     return;
3230   pt = (struct EncryptedMessage *) buf;
3231   /* validate hash */
3232   GNUNET_CRYPTO_hash (&pt->sequence_number,
3233                       size - ENCRYPTED_HEADER_SIZE - sizeof (GNUNET_HashCode), &ph);
3234 #if DEBUG_CORE
3235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3236               "V-Hashed %u bytes of plaintext (`%s') using IV `%d'\n",
3237               size - ENCRYPTED_HEADER_SIZE - sizeof (GNUNET_HashCode),
3238               GNUNET_h2s (&ph),
3239               (int) m->iv_seed);
3240 #endif
3241   if (0 != memcmp (&ph, 
3242                    &pt->plaintext_hash, 
3243                    sizeof (GNUNET_HashCode)))
3244     {
3245       /* checksum failed */
3246       GNUNET_break_op (0);
3247       return;
3248     }
3249
3250   /* validate sequence number */
3251   snum = ntohl (pt->sequence_number);
3252   if (n->last_sequence_number_received == snum)
3253     {
3254       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3255                   "Received duplicate message, ignoring.\n");
3256       /* duplicate, ignore */
3257       GNUNET_STATISTICS_set (stats,
3258                              gettext_noop ("# bytes dropped (duplicates)"),
3259                              size,
3260                              GNUNET_NO);      
3261       return;
3262     }
3263   if ((n->last_sequence_number_received > snum) &&
3264       (n->last_sequence_number_received - snum > 32))
3265     {
3266       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3267                   "Received ancient out of sequence message, ignoring.\n");
3268       /* ancient out of sequence, ignore */
3269       GNUNET_STATISTICS_set (stats,
3270                              gettext_noop ("# bytes dropped (out of sequence)"),
3271                              size,
3272                              GNUNET_NO);      
3273       return;
3274     }
3275   if (n->last_sequence_number_received > snum)
3276     {
3277       unsigned int rotbit =
3278         1 << (n->last_sequence_number_received - snum - 1);
3279       if ((n->last_packets_bitmap & rotbit) != 0)
3280         {
3281           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3282                       "Received duplicate message, ignoring.\n");
3283           GNUNET_STATISTICS_set (stats,
3284                                  gettext_noop ("# bytes dropped (duplicates)"),
3285                                  size,
3286                                  GNUNET_NO);      
3287           /* duplicate, ignore */
3288           return;
3289         }
3290       n->last_packets_bitmap |= rotbit;
3291     }
3292   if (n->last_sequence_number_received < snum)
3293     {
3294       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
3295       n->last_sequence_number_received = snum;
3296     }
3297
3298   /* check timestamp */
3299   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
3300   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
3301     {
3302       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3303                   _
3304                   ("Message received far too old (%llu ms). Content ignored.\n"),
3305                   GNUNET_TIME_absolute_get_duration (t).value);
3306       GNUNET_STATISTICS_set (stats,
3307                              gettext_noop ("# bytes dropped (ancient message)"),
3308                              size,
3309                              GNUNET_NO);      
3310       return;
3311     }
3312
3313   /* process decrypted message(s) */
3314   if (n->bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
3315     {
3316 #if DEBUG_CORE_SET_QUOTA
3317       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3318                   "Received %u b/s as new inbound limit for peer `%4s'\n",
3319                   (unsigned int) ntohl (pt->inbound_bw_limit.value__),
3320                   GNUNET_i2s (&n->peer));
3321 #endif
3322       n->bw_out_external_limit = pt->inbound_bw_limit;
3323       n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
3324                                               n->bw_out_internal_limit);
3325       GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
3326                                              n->bw_out);
3327       GNUNET_TRANSPORT_set_quota (transport,
3328                                   &n->peer,
3329                                   n->bw_in,
3330                                   n->bw_out,
3331                                   GNUNET_TIME_UNIT_FOREVER_REL,
3332                                   NULL, NULL); 
3333     }
3334   n->last_activity = GNUNET_TIME_absolute_get ();
3335   if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
3336     GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
3337   n->keep_alive_task 
3338     = GNUNET_SCHEDULER_add_delayed (sched, 
3339                                     GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
3340                                     &send_keep_alive,
3341                                     n);
3342   GNUNET_STATISTICS_set (stats,
3343                          gettext_noop ("# bytes of payload decrypted"),
3344                          size - sizeof (struct EncryptedMessage),
3345                          GNUNET_NO);      
3346   deliver_messages (n, buf, size, sizeof (struct EncryptedMessage));
3347 }
3348
3349
3350 /**
3351  * Function called by the transport for each received message.
3352  *
3353  * @param cls closure
3354  * @param peer (claimed) identity of the other peer
3355  * @param message the message
3356  * @param latency estimated latency for communicating with the
3357  *             given peer (round-trip)
3358  * @param distance in overlay hops, as given by transport plugin
3359  */
3360 static void
3361 handle_transport_receive (void *cls,
3362                           const struct GNUNET_PeerIdentity *peer,
3363                           const struct GNUNET_MessageHeader *message,
3364                           struct GNUNET_TIME_Relative latency,
3365                           unsigned int distance)
3366 {
3367   struct Neighbour *n;
3368   struct GNUNET_TIME_Absolute now;
3369   int up;
3370   uint16_t type;
3371   uint16_t size;
3372
3373 #if DEBUG_CORE
3374   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3375               "Received message of type %u from `%4s', demultiplexing.\n",
3376               ntohs (message->type), GNUNET_i2s (peer));
3377 #endif
3378   if (0 == memcmp (peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
3379     {
3380       GNUNET_break (0);
3381       return;
3382     }
3383   n = find_neighbour (peer);
3384   if (n == NULL)
3385     n = create_neighbour (peer);
3386   n->last_latency = latency;
3387   n->last_distance = distance;
3388   up = (n->status == PEER_STATE_KEY_CONFIRMED);
3389   type = ntohs (message->type);
3390   size = ntohs (message->size);
3391 #if DEBUG_HANDSHAKE
3392   fprintf (stderr,
3393            "Received message of type %u from `%4s'\n",
3394            type,
3395            GNUNET_i2s (peer));
3396 #endif
3397   switch (type)
3398     {
3399     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
3400       if (size != sizeof (struct SetKeyMessage))
3401         {
3402           GNUNET_break_op (0);
3403           return;
3404         }
3405       GNUNET_STATISTICS_update (stats, gettext_noop ("# session keys received"), 1, GNUNET_NO);
3406       handle_set_key (n, (const struct SetKeyMessage *) message);
3407       break;
3408     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
3409       if (size < sizeof (struct EncryptedMessage) +
3410           sizeof (struct GNUNET_MessageHeader))
3411         {
3412           GNUNET_break_op (0);
3413           return;
3414         }
3415       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3416           (n->status != PEER_STATE_KEY_CONFIRMED))
3417         {
3418           GNUNET_break_op (0);
3419           /* blacklist briefly (?); might help recover (?) */
3420           GNUNET_TRANSPORT_blacklist (sched, cfg,
3421                                       &n->peer, 
3422                                       GNUNET_TIME_UNIT_SECONDS,
3423                                       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3424                                                                      5),
3425                                       NULL, NULL);
3426           return;
3427         }
3428       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
3429       break;
3430     case GNUNET_MESSAGE_TYPE_CORE_PING:
3431       if (size != sizeof (struct PingMessage))
3432         {
3433           GNUNET_break_op (0);
3434           return;
3435         }
3436       GNUNET_STATISTICS_update (stats, gettext_noop ("# PING messages received"), 1, GNUNET_NO);
3437       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3438           (n->status != PEER_STATE_KEY_CONFIRMED))
3439         {
3440 #if DEBUG_CORE
3441           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3442                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3443                       "PING", GNUNET_i2s (&n->peer));
3444 #endif
3445           GNUNET_free_non_null (n->pending_ping);
3446           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
3447           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
3448           return;
3449         }
3450       handle_ping (n, (const struct PingMessage *) message);
3451       break;
3452     case GNUNET_MESSAGE_TYPE_CORE_PONG:
3453       if (size != sizeof (struct PongMessage))
3454         {
3455           GNUNET_break_op (0);
3456           return;
3457         }
3458       GNUNET_STATISTICS_update (stats, gettext_noop ("# PONG messages received"), 1, GNUNET_NO);
3459       if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
3460            (n->status != PEER_STATE_KEY_CONFIRMED) )
3461         {
3462 #if DEBUG_CORE
3463           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3464                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3465                       "PONG", GNUNET_i2s (&n->peer));
3466 #endif
3467           GNUNET_free_non_null (n->pending_pong);
3468           n->pending_pong = GNUNET_malloc (sizeof (struct PongMessage));
3469           memcpy (n->pending_pong, message, sizeof (struct PongMessage));
3470           return;
3471         }
3472       handle_pong (n, (const struct PongMessage *) message);
3473       break;
3474     default:
3475       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3476                   _("Unsupported message of type %u received.\n"), type);
3477       return;
3478     }
3479   if (n->status == PEER_STATE_KEY_CONFIRMED)
3480     {
3481       now = GNUNET_TIME_absolute_get ();
3482       n->last_activity = now;
3483       if (!up)
3484         {
3485           GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"), 1, GNUNET_NO);
3486           n->time_established = now;
3487         }
3488       if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
3489         GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
3490       n->keep_alive_task 
3491         = GNUNET_SCHEDULER_add_delayed (sched, 
3492                                         GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
3493                                         &send_keep_alive,
3494                                         n);
3495     }
3496 }
3497
3498
3499 /**
3500  * Function that recalculates the bandwidth quota for the
3501  * given neighbour and transmits it to the transport service.
3502  * 
3503  * @param cls neighbour for the quota update
3504  * @param tc context
3505  */
3506 static void
3507 neighbour_quota_update (void *cls,
3508                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3509 {
3510   struct Neighbour *n = cls;
3511   struct GNUNET_BANDWIDTH_Value32NBO q_in;
3512   double pref_rel;
3513   double share;
3514   unsigned long long distributable;
3515   uint64_t need_per_peer;
3516   uint64_t need_per_second;
3517   
3518   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3519   /* calculate relative preference among all neighbours;
3520      divides by a bit more to avoid division by zero AND to
3521      account for possibility of new neighbours joining any time 
3522      AND to convert to double... */
3523   if (preference_sum == 0)
3524     {
3525       pref_rel = 1.0 / (double) neighbour_count;
3526     }
3527   else
3528     {
3529       pref_rel = n->current_preference / preference_sum;
3530     }
3531   need_per_peer = GNUNET_BANDWIDTH_value_get_available_until (MIN_BANDWIDTH_PER_PEER,
3532                                                               GNUNET_TIME_UNIT_SECONDS);  
3533   need_per_second = need_per_peer * neighbour_count;
3534   distributable = 0;
3535   if (bandwidth_target_out_bps > need_per_second)
3536     distributable = bandwidth_target_out_bps - need_per_second;
3537   share = distributable * pref_rel;
3538   if (share + need_per_peer > ( (uint32_t)-1))
3539     q_in = GNUNET_BANDWIDTH_value_init ((uint32_t) -1);
3540   else
3541     q_in = GNUNET_BANDWIDTH_value_init (need_per_peer + (uint32_t) share);
3542   /* check if we want to disconnect for good due to inactivity */
3543   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3544        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3545     {
3546 #if DEBUG_CORE
3547       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3548                   "Forcing disconnect of `%4s' due to inactivity (?).\n",
3549                   GNUNET_i2s (&n->peer));
3550 #endif
3551       q_in = GNUNET_BANDWIDTH_value_init (0); /* force disconnect */
3552     }
3553 #if DEBUG_CORE_QUOTA
3554   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3555               "Current quota for `%4s' is %u/%llu b/s in (old: %u b/s) / %u out (%u internal)\n",
3556               GNUNET_i2s (&n->peer),
3557               (unsigned int) ntohl (q_in.value__),
3558               bandwidth_target_out_bps,
3559               (unsigned int) ntohl (n->bw_in.value__),
3560               (unsigned int) ntohl (n->bw_out.value__),
3561               (unsigned int) ntohl (n->bw_out_internal_limit.value__));
3562 #endif
3563   if (n->bw_in.value__ != q_in.value__) 
3564     {
3565       n->bw_in = q_in;
3566       GNUNET_TRANSPORT_set_quota (transport,
3567                                   &n->peer,
3568                                   n->bw_in,
3569                                   n->bw_out,
3570                                   GNUNET_TIME_UNIT_FOREVER_REL,
3571                                   NULL, NULL);
3572     }
3573   schedule_quota_update (n);
3574 }
3575
3576
3577 /**
3578  * Function called by transport to notify us that
3579  * a peer connected to us (on the network level).
3580  *
3581  * @param cls closure
3582  * @param peer the peer that connected
3583  * @param latency current latency of the connection
3584  * @param distance in overlay hops, as given by transport plugin
3585  */
3586 static void
3587 handle_transport_notify_connect (void *cls,
3588                                  const struct GNUNET_PeerIdentity *peer,
3589                                  struct GNUNET_TIME_Relative latency,
3590                                  unsigned int distance)
3591 {
3592   struct Neighbour *n;
3593   struct ConnectNotifyMessage cnm;
3594
3595   if (0 == memcmp (peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
3596     {
3597       GNUNET_break (0);
3598       return;
3599     }
3600   n = find_neighbour (peer);
3601   if (n != NULL)
3602     {
3603       if (n->is_connected)
3604         {
3605           /* duplicate connect notification!? */
3606           GNUNET_break (0);
3607           return;
3608         }
3609     }
3610   else
3611     {
3612       n = create_neighbour (peer);
3613     }
3614   GNUNET_STATISTICS_update (stats, 
3615                             gettext_noop ("# peers connected"), 
3616                             1, 
3617                             GNUNET_NO);
3618   n->is_connected = GNUNET_YES;      
3619   n->last_latency = latency;
3620   n->last_distance = distance;
3621   GNUNET_BANDWIDTH_tracker_init (&n->available_send_window,
3622                                  n->bw_out,
3623                                  MAX_WINDOW_TIME_S);
3624   GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window,
3625                                  n->bw_in,
3626                                  MAX_WINDOW_TIME_S);  
3627 #if DEBUG_CORE
3628   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3629               "Received connection from `%4s'.\n",
3630               GNUNET_i2s (&n->peer));
3631 #endif
3632   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3633   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3634   cnm.distance = htonl (n->last_distance);
3635   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
3636   cnm.peer = *peer;
3637   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3638   GNUNET_TRANSPORT_set_quota (transport,
3639                               &n->peer,
3640                               n->bw_in,
3641                               n->bw_out,
3642                               GNUNET_TIME_UNIT_FOREVER_REL,
3643                               NULL, NULL);
3644   send_key (n); 
3645 }
3646
3647
3648 /**
3649  * Function called by transport telling us that a peer
3650  * disconnected.
3651  *
3652  * @param cls closure
3653  * @param peer the peer that disconnected
3654  */
3655 static void
3656 handle_transport_notify_disconnect (void *cls,
3657                                     const struct GNUNET_PeerIdentity *peer)
3658 {
3659   struct DisconnectNotifyMessage cnm;
3660   struct Neighbour *n;
3661
3662 #if DEBUG_CORE
3663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3664               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3665 #endif
3666   n = find_neighbour (peer);
3667   if (n == NULL)
3668     {
3669       GNUNET_break (0);
3670       return;
3671     }
3672   GNUNET_break (n->is_connected);
3673   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3674   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3675   cnm.peer = *peer;
3676   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3677   n->is_connected = GNUNET_NO;
3678   GNUNET_STATISTICS_update (stats, 
3679                             gettext_noop ("# peers connected"), 
3680                             -1, 
3681                             GNUNET_NO);
3682 }
3683
3684
3685 /**
3686  * Last task run during shutdown.  Disconnects us from
3687  * the transport.
3688  */
3689 static void
3690 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3691 {
3692   struct Neighbour *n;
3693   struct Client *c;
3694
3695 #if DEBUG_CORE
3696   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3697               "Core service shutting down.\n");
3698 #endif
3699   GNUNET_assert (transport != NULL);
3700   GNUNET_TRANSPORT_disconnect (transport);
3701   transport = NULL;
3702   while (NULL != (n = neighbours))
3703     {
3704       neighbours = n->next;
3705       GNUNET_assert (neighbour_count > 0);
3706       neighbour_count--;
3707       free_neighbour (n);
3708     }
3709   GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
3710   GNUNET_SERVER_notification_context_destroy (notifier);
3711   notifier = NULL;
3712   while (NULL != (c = clients))
3713     handle_client_disconnect (NULL, c->client_handle);
3714   if (my_private_key != NULL)
3715     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3716   if (stats != NULL)
3717     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
3718 }
3719
3720
3721 /**
3722  * Initiate core service.
3723  *
3724  * @param cls closure
3725  * @param s scheduler to use
3726  * @param serv the initialized server
3727  * @param c configuration to use
3728  */
3729 static void
3730 run (void *cls,
3731      struct GNUNET_SCHEDULER_Handle *s,
3732      struct GNUNET_SERVER_Handle *serv,
3733      const struct GNUNET_CONFIGURATION_Handle *c)
3734 {
3735   char *keyfile;
3736
3737   sched = s;
3738   cfg = c;  
3739   /* parse configuration */
3740   if (
3741        (GNUNET_OK !=
3742         GNUNET_CONFIGURATION_get_value_number (c,
3743                                                "CORE",
3744                                                "TOTAL_QUOTA_IN",
3745                                                &bandwidth_target_in_bps)) ||
3746        (GNUNET_OK !=
3747         GNUNET_CONFIGURATION_get_value_number (c,
3748                                                "CORE",
3749                                                "TOTAL_QUOTA_OUT",
3750                                                &bandwidth_target_out_bps)) ||
3751        (GNUNET_OK !=
3752         GNUNET_CONFIGURATION_get_value_filename (c,
3753                                                  "GNUNETD",
3754                                                  "HOSTKEY", &keyfile)))
3755     {
3756       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3757                   _
3758                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3759       GNUNET_SCHEDULER_shutdown (s);
3760       return;
3761     }
3762   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3763   GNUNET_free (keyfile);
3764   if (my_private_key == NULL)
3765     {
3766       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3767                   _("Core service could not access hostkey.  Exiting.\n"));
3768       GNUNET_SCHEDULER_shutdown (s);
3769       return;
3770     }
3771   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3772   GNUNET_CRYPTO_hash (&my_public_key,
3773                       sizeof (my_public_key), &my_identity.hashPubKey);
3774   /* setup notification */
3775   server = serv;
3776   notifier = GNUNET_SERVER_notification_context_create (server, 
3777                                                         MAX_NOTIFY_QUEUE);
3778   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3779   /* setup transport connection */
3780   transport = GNUNET_TRANSPORT_connect (sched,
3781                                         cfg,
3782                                         NULL,
3783                                         &handle_transport_receive,
3784                                         &handle_transport_notify_connect,
3785                                         &handle_transport_notify_disconnect);
3786   GNUNET_assert (NULL != transport);
3787   stats = GNUNET_STATISTICS_create (sched, "core", cfg);
3788   GNUNET_SCHEDULER_add_delayed (sched,
3789                                 GNUNET_TIME_UNIT_FOREVER_REL,
3790                                 &cleaning_task, NULL);
3791   /* process client requests */
3792   GNUNET_SERVER_add_handlers (server, handlers);
3793   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3794               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3795 }
3796
3797
3798
3799 /**
3800  * The main function for the transport service.
3801  *
3802  * @param argc number of arguments from the command line
3803  * @param argv command line arguments
3804  * @return 0 ok, 1 on error
3805  */
3806 int
3807 main (int argc, char *const *argv)
3808 {
3809   return (GNUNET_OK ==
3810           GNUNET_SERVICE_run (argc,
3811                               argv,
3812                               "core",
3813                               GNUNET_SERVICE_OPTION_NONE,
3814                               &run, NULL)) ? 0 : 1;
3815 }
3816
3817 /* end of gnunet-service-core.c */