nicer msgs
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 #define DEBUG_HANDSHAKE GNUNET_NO
45
46 /**
47  * Receive and send buffer windows grow over time.  For
48  * how long can 'unused' bandwidth accumulate before we
49  * need to cap it?  (specified in ms).
50  */
51 #define MAX_WINDOW_TIME (5 * 60 * 1000)
52
53 /**
54  * How many messages do we queue up at most for optional
55  * notifications to a client?  (this can cause notifications
56  * about outgoing messages to be dropped).
57  */
58 #define MAX_NOTIFY_QUEUE 16
59
60 /**
61  * Minimum of bytes per minute (out) to assign to any connected peer.
62  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
63  * sense.
64  */
65 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
66
67 /**
68  * What is the smallest change (in number of bytes per minute)
69  * that we consider significant enough to bother triggering?
70  */
71 #define MIN_BPM_CHANGE 32
72
73 /**
74  * After how much time past the "official" expiration time do
75  * we discard messages?  Should not be zero since we may 
76  * intentionally defer transmission until close to the deadline
77  * and then may be slightly past the deadline due to inaccuracy
78  * in sleep and our own CPU consumption.
79  */
80 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
81
82 /**
83  * What is the maximum delay for a SET_KEY message?
84  */
85 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
86
87 /**
88  * What how long do we wait for SET_KEY confirmation initially?
89  */
90 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
91
92 /**
93  * What is the maximum delay for a PING message?
94  */
95 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
96
97 /**
98  * What is the maximum delay for a PONG message?
99  */
100 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
101
102 /**
103  * How often do we recalculate bandwidth quotas?
104  */
105 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
106
107 /**
108  * What is the priority for a SET_KEY message?
109  */
110 #define SET_KEY_PRIORITY 0xFFFFFF
111
112 /**
113  * What is the priority for a PING message?
114  */
115 #define PING_PRIORITY 0xFFFFFF
116
117 /**
118  * What is the priority for a PONG message?
119  */
120 #define PONG_PRIORITY 0xFFFFFF
121
122 /**
123  * How many messages do we queue per peer at most?
124  */
125 #define MAX_PEER_QUEUE_SIZE 16
126
127 /**
128  * How many non-mandatory messages do we queue per client at most?
129  */
130 #define MAX_CLIENT_QUEUE_SIZE 32
131
132 /**
133  * What is the maximum age of a message for us to consider
134  * processing it?  Note that this looks at the timestamp used
135  * by the other peer, so clock skew between machines does
136  * come into play here.  So this should be picked high enough
137  * so that a little bit of clock skew does not prevent peers
138  * from connecting to us.
139  */
140 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
141
142 /**
143  * What is the maximum size for encrypted messages?  Note that this
144  * number imposes a clear limit on the maximum size of any message.
145  * Set to a value close to 64k but not so close that transports will
146  * have trouble with their headers.
147  */
148 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
149
150
151 /**
152  * State machine for our P2P encryption handshake.  Everyone starts in
153  * "DOWN", if we receive the other peer's key (other peer initiated)
154  * we start in state RECEIVED (since we will immediately send our
155  * own); otherwise we start in SENT.  If we get back a PONG from
156  * within either state, we move up to CONFIRMED (the PONG will always
157  * be sent back encrypted with the key we sent to the other peer).
158  */
159 enum PeerStateMachine
160 {
161   PEER_STATE_DOWN,
162   PEER_STATE_KEY_SENT,
163   PEER_STATE_KEY_RECEIVED,
164   PEER_STATE_KEY_CONFIRMED
165 };
166
167
168 /**
169  * Number of bytes (at the beginning) of "struct EncryptedMessage"
170  * that are NOT encrypted.
171  */
172 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
173
174
175 /**
176  * Encapsulation for encrypted messages exchanged between
177  * peers.  Followed by the actual encrypted data.
178  */
179 struct EncryptedMessage
180 {
181   /**
182    * Message type is either CORE_ENCRYPTED_MESSAGE.
183    */
184   struct GNUNET_MessageHeader header;
185
186   /**
187    * Always zero.
188    */
189   uint32_t reserved GNUNET_PACKED;
190
191   /**
192    * Hash of the plaintext, used to verify message integrity;
193    * ALSO used as the IV for the symmetric cipher!  Everything
194    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
195    * must be set to the offset of the next field.
196    */
197   GNUNET_HashCode plaintext_hash;
198
199   /**
200    * Sequence number, in network byte order.  This field
201    * must be the first encrypted/decrypted field and the
202    * first byte that is hashed for the plaintext hash.
203    */
204   uint32_t sequence_number GNUNET_PACKED;
205
206   /**
207    * Desired bandwidth (how much we should send to this
208    * peer / how much is the sender willing to receive),
209    * in bytes per minute.
210    */
211   uint32_t inbound_bpm_limit GNUNET_PACKED;
212
213   /**
214    * Timestamp.  Used to prevent reply of ancient messages
215    * (recent messages are caught with the sequence number).
216    */
217   struct GNUNET_TIME_AbsoluteNBO timestamp;
218
219 };
220
221 /**
222  * We're sending an (encrypted) PING to the other peer to check if he
223  * can decrypt.  The other peer should respond with a PONG with the
224  * same content, except this time encrypted with the receiver's key.
225  */
226 struct PingMessage
227 {
228   /**
229    * Message type is either CORE_PING or CORE_PONG.
230    */
231   struct GNUNET_MessageHeader header;
232
233   /**
234    * Random number chosen to make reply harder.
235    */
236   uint32_t challenge GNUNET_PACKED;
237
238   /**
239    * Intended target of the PING, used primarily to check
240    * that decryption actually worked.
241    */
242   struct GNUNET_PeerIdentity target;
243 };
244
245
246 /**
247  * Message transmitted to set (or update) a session key.
248  */
249 struct SetKeyMessage
250 {
251
252   /**
253    * Message type is either CORE_SET_KEY.
254    */
255   struct GNUNET_MessageHeader header;
256
257   /**
258    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
259    */
260   int32_t sender_status GNUNET_PACKED;
261
262   /**
263    * Purpose of the signature, will be
264    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
265    */
266   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
267
268   /**
269    * At what time was this key created?
270    */
271   struct GNUNET_TIME_AbsoluteNBO creation_time;
272
273   /**
274    * The encrypted session key.
275    */
276   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
277
278   /**
279    * Who is the intended recipient?
280    */
281   struct GNUNET_PeerIdentity target;
282
283   /**
284    * Signature of the stuff above (starting at purpose).
285    */
286   struct GNUNET_CRYPTO_RsaSignature signature;
287
288 };
289
290
291 /**
292  * Message waiting for transmission. This struct
293  * is followed by the actual content of the message.
294  */
295 struct MessageEntry
296 {
297
298   /**
299    * We keep messages in a doubly linked list.
300    */
301   struct MessageEntry *next;
302
303   /**
304    * We keep messages in a doubly linked list.
305    */
306   struct MessageEntry *prev;
307
308   /**
309    * By when are we supposed to transmit this message?
310    */
311   struct GNUNET_TIME_Absolute deadline;
312
313   /**
314    * How important is this message to us?
315    */
316   unsigned int priority;
317
318   /**
319    * How long is the message? (number of bytes following
320    * the "struct MessageEntry", but not including the
321    * size of "struct MessageEntry" itself!)
322    */
323   uint16_t size;
324
325   /**
326    * Was this message selected for transmission in the
327    * current round? GNUNET_YES or GNUNET_NO.
328    */
329   int8_t do_transmit;
330
331   /**
332    * Did we give this message some slack (delayed sending) previously
333    * (and hence should not give it any more slack)? GNUNET_YES or
334    * GNUNET_NO.
335    */
336   int8_t got_slack;
337
338 };
339
340
341 struct Neighbour
342 {
343   /**
344    * We keep neighbours in a linked list (for now).
345    */
346   struct Neighbour *next;
347
348   /**
349    * Unencrypted messages destined for this peer.
350    */
351   struct MessageEntry *messages;
352
353   /**
354    * Head of the batched, encrypted message queue (already ordered,
355    * transmit starting with the head).
356    */
357   struct MessageEntry *encrypted_head;
358
359   /**
360    * Tail of the batched, encrypted message queue (already ordered,
361    * append new messages to tail)
362    */
363   struct MessageEntry *encrypted_tail;
364
365   /**
366    * Handle for pending requests for transmission to this peer
367    * with the transport service.  NULL if no request is pending.
368    */
369   struct GNUNET_TRANSPORT_TransmitHandle *th;
370
371   /**
372    * Public key of the neighbour, NULL if we don't have it yet.
373    */
374   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
375
376   /**
377    * We received a PING message before we got the "public_key"
378    * (or the SET_KEY).  We keep it here until we have a key
379    * to decrypt it.  NULL if no PING is pending.
380    */
381   struct PingMessage *pending_ping;
382
383   /**
384    * We received a PONG message before we got the "public_key"
385    * (or the SET_KEY).  We keep it here until we have a key
386    * to decrypt it.  NULL if no PONG is pending.
387    */
388   struct PingMessage *pending_pong;
389
390   /**
391    * Non-NULL if we are currently looking up HELLOs for this peer.
392    * for this peer.
393    */
394   struct GNUNET_PEERINFO_IteratorContext *pitr;
395
396   /**
397    * SetKeyMessage to transmit, NULL if we are not currently trying
398    * to send one.
399    */
400   struct SetKeyMessage *skm;
401
402   /**
403    * Identity of the neighbour.
404    */
405   struct GNUNET_PeerIdentity peer;
406
407   /**
408    * Key we use to encrypt our messages for the other peer
409    * (initialized by us when we do the handshake).
410    */
411   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
412
413   /**
414    * Key we use to decrypt messages from the other peer
415    * (given to us by the other peer during the handshake).
416    */
417   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
418
419   /**
420    * ID of task used for re-trying plaintext scheduling.
421    */
422   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
423
424   /**
425    * ID of task used for re-trying SET_KEY and PING message.
426    */
427   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
428
429   /**
430    * ID of task used for updating bandwidth quota for this neighbour.
431    */
432   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
433
434   /**
435    * ID of task used for cleaning up dead neighbour entries.
436    */
437   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
438
439   /**
440    * At what time did we generate our encryption key?
441    */
442   struct GNUNET_TIME_Absolute encrypt_key_created;
443
444   /**
445    * At what time did the other peer generate the decryption key?
446    */
447   struct GNUNET_TIME_Absolute decrypt_key_created;
448
449   /**
450    * At what time did we initially establish (as in, complete session
451    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
452    */
453   struct GNUNET_TIME_Absolute time_established;
454
455   /**
456    * At what time did we last receive an encrypted message from the
457    * other peer?  Should be zero if status != KEY_CONFIRMED.
458    */
459   struct GNUNET_TIME_Absolute last_activity;
460
461   /**
462    * Last latency observed from this peer.
463    */
464   struct GNUNET_TIME_Relative last_latency;
465
466   /**
467    * At what frequency are we currently re-trying SET_KEY messages?
468    */
469   struct GNUNET_TIME_Relative set_key_retry_frequency;
470
471   /**
472    * Time of our last update to the "available_send_window".
473    */
474   struct GNUNET_TIME_Absolute last_asw_update;
475
476   /**
477    * Time of our last update to the "available_recv_window".
478    */
479   struct GNUNET_TIME_Absolute last_arw_update;
480
481   /**
482    * Number of bytes that we are eligible to transmit to this
483    * peer at this point.  Incremented every minute by max_out_bpm,
484    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
485    * bandwidth-hogs are sampled at a frequency of about 78s!);
486    * may get negative if we have VERY high priority content.
487    */
488   long long available_send_window; 
489
490   /**
491    * How much downstream capacity of this peer has been reserved for
492    * our traffic?  (Our clients can request that a certain amount of
493    * bandwidth is available for replies to them; this value is used to
494    * make sure that this reserved amount of bandwidth is actually
495    * available).
496    */
497   long long available_recv_window; 
498
499   /**
500    * How valueable were the messages of this peer recently?
501    */
502   unsigned long long current_preference;
503
504   /**
505    * Bit map indicating which of the 32 sequence numbers before the last
506    * were received (good for accepting out-of-order packets and
507    * estimating reliability of the connection)
508    */
509   unsigned int last_packets_bitmap;
510
511   /**
512    * last sequence number received on this connection (highest)
513    */
514   uint32_t last_sequence_number_received;
515
516   /**
517    * last sequence number transmitted
518    */
519   uint32_t last_sequence_number_sent;
520
521   /**
522    * Available bandwidth in for this peer (current target).
523    */
524   uint32_t bpm_in;
525
526   /**
527    * Available bandwidth out for this peer (current target).
528    */
529   uint32_t bpm_out;
530
531   /**
532    * Internal bandwidth limit set for this peer (initially
533    * typically set to "-1").  "bpm_out" is MAX of
534    * "bpm_out_internal_limit" and "bpm_out_external_limit".
535    */
536   uint32_t bpm_out_internal_limit;
537
538   /**
539    * External bandwidth limit set for this peer by the
540    * peer that we are communicating with.  "bpm_out" is MAX of
541    * "bpm_out_internal_limit" and "bpm_out_external_limit".
542    */
543   uint32_t bpm_out_external_limit;
544
545   /**
546    * What was our PING challenge number (for this peer)?
547    */
548   uint32_t ping_challenge;
549
550   /**
551    * What was the last distance to this peer as reported by the transports?
552    */
553   uint32_t last_distance;
554
555   /**
556    * What is our connection status?
557    */
558   enum PeerStateMachine status;
559
560   /**
561    * Are we currently connected to this neighbour?
562    */ 
563   int is_connected;
564 };
565
566
567 /**
568  * Data structure for each client connected to the core service.
569  */
570 struct Client
571 {
572   /**
573    * Clients are kept in a linked list.
574    */
575   struct Client *next;
576
577   /**
578    * Handle for the client with the server API.
579    */
580   struct GNUNET_SERVER_Client *client_handle;
581
582   /**
583    * Array of the types of messages this peer cares
584    * about (with "tcnt" entries).  Allocated as part
585    * of this client struct, do not free!
586    */
587   uint16_t *types;
588
589   /**
590    * Options for messages this client cares about,
591    * see GNUNET_CORE_OPTION_ values.
592    */
593   uint32_t options;
594
595   /**
596    * Number of types of incoming messages this client
597    * specifically cares about.  Size of the "types" array.
598    */
599   unsigned int tcnt;
600
601 };
602
603
604 /**
605  * Our public key.
606  */
607 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
608
609 /**
610  * Our identity.
611  */
612 static struct GNUNET_PeerIdentity my_identity;
613
614 /**
615  * Our private key.
616  */
617 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
618
619 /**
620  * Our scheduler.
621  */
622 struct GNUNET_SCHEDULER_Handle *sched;
623
624 /**
625  * Our configuration.
626  */
627 const struct GNUNET_CONFIGURATION_Handle *cfg;
628
629 /**
630  * Our server.
631  */
632 static struct GNUNET_SERVER_Handle *server;
633
634 /**
635  * Transport service.
636  */
637 static struct GNUNET_TRANSPORT_Handle *transport;
638
639 /**
640  * Linked list of our clients.
641  */
642 static struct Client *clients;
643
644 /**
645  * Context for notifications we need to send to our clients.
646  */
647 static struct GNUNET_SERVER_NotificationContext *notifier;
648
649 /**
650  * We keep neighbours in a linked list (for now).
651  */
652 static struct Neighbour *neighbours;
653
654 /**
655  * Sum of all preferences among all neighbours.
656  */
657 static unsigned long long preference_sum;
658
659 /**
660  * Total number of neighbours we have.
661  */
662 static unsigned int neighbour_count;
663
664 /**
665  * How much inbound bandwidth are we supposed to be using?
666  */
667 static unsigned long long bandwidth_target_in;
668
669 /**
670  * How much outbound bandwidth are we supposed to be using?
671  */
672 static unsigned long long bandwidth_target_out;
673
674
675
676 /**
677  * A preference value for a neighbour was update.  Update
678  * the preference sum accordingly.
679  *
680  * @param inc how much was a preference value increased?
681  */
682 static void
683 update_preference_sum (unsigned long long inc)
684 {
685   struct Neighbour *n;
686   unsigned long long os;
687
688   os = preference_sum;
689   preference_sum += inc;
690   if (preference_sum >= os)
691     return; /* done! */
692   /* overflow! compensate by cutting all values in half! */
693   preference_sum = 0;
694   n = neighbours;
695   while (n != NULL)
696     {
697       n->current_preference /= 2;
698       preference_sum += n->current_preference;
699       n = n->next;
700     }    
701 }
702
703
704 /**
705  * Recalculate the number of bytes we expect to
706  * receive or transmit in a given window.
707  *
708  * @param force force an update now (even if not much time has passed)
709  * @param window pointer to the byte counter (updated)
710  * @param ts pointer to the timestamp (updated)
711  * @param bpm number of bytes per minute that should
712  *        be added to the window.
713  */
714 static void
715 update_window (int force,
716                long long *window,
717                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
718 {
719   struct GNUNET_TIME_Relative since;
720
721   since = GNUNET_TIME_absolute_get_duration (*ts);
722   if ( (force == GNUNET_NO) &&
723        (since.value < 60 * 1000) )
724     return;                     /* not even a minute has passed */
725   *ts = GNUNET_TIME_absolute_get ();
726   *window += (bpm * since.value) / 60 / 1000;
727   if (*window > MAX_WINDOW_TIME * bpm)
728     *window = MAX_WINDOW_TIME * bpm;
729 }
730
731
732 /**
733  * Find the entry for the given neighbour.
734  *
735  * @param peer identity of the neighbour
736  * @return NULL if we are not connected, otherwise the
737  *         neighbour's entry.
738  */
739 static struct Neighbour *
740 find_neighbour (const struct GNUNET_PeerIdentity *peer)
741 {
742   struct Neighbour *ret;
743
744   ret = neighbours;
745   while ((ret != NULL) &&
746          (0 != memcmp (&ret->peer,
747                        peer, sizeof (struct GNUNET_PeerIdentity))))
748     ret = ret->next;
749   return ret;
750 }
751
752
753 /**
754  * Send a message to one of our clients.
755  *
756  * @param client target for the message
757  * @param msg message to transmit
758  * @param can_drop could this message be dropped if the
759  *        client's queue is getting too large?
760  */
761 static void
762 send_to_client (struct Client *client,
763                 const struct GNUNET_MessageHeader *msg, 
764                 int can_drop)
765 {
766 #if DEBUG_CORE_CLIENT
767   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
768               "Preparing to send message of type %u to client.\n",
769               ntohs (msg->type));
770 #endif  
771   GNUNET_SERVER_notification_context_unicast (notifier,
772                                               client->client_handle,
773                                               msg,
774                                               can_drop);
775 }
776
777
778 /**
779  * Send a message to all of our current clients that have
780  * the right options set.
781  * 
782  * @param msg message to multicast
783  * @param can_drop can this message be discarded if the queue is too long
784  * @param options mask to use 
785  */
786 static void
787 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
788                      int can_drop,
789                      int options)
790 {
791   struct Client *c;
792
793   c = clients;
794   while (c != NULL)
795     {
796       if (0 != (c->options & options))
797         send_to_client (c, msg, can_drop);
798       c = c->next;
799     }
800 }
801
802
803 /**
804  * Handle CORE_INIT request.
805  */
806 static void
807 handle_client_init (void *cls,
808                     struct GNUNET_SERVER_Client *client,
809                     const struct GNUNET_MessageHeader *message)
810 {
811   const struct InitMessage *im;
812   struct InitReplyMessage irm;
813   struct Client *c;
814   uint16_t msize;
815   const uint16_t *types;
816   struct Neighbour *n;
817   struct ConnectNotifyMessage cnm;
818
819 #if DEBUG_CORE_CLIENT
820   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
821               "Client connecting to core service with `%s' message\n",
822               "INIT");
823 #endif
824   /* check that we don't have an entry already */
825   c = clients;
826   while (c != NULL)
827     {
828       if (client == c->client_handle)
829         {
830           GNUNET_break (0);
831           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
832           return;
833         }
834       c = c->next;
835     }
836   msize = ntohs (message->size);
837   if (msize < sizeof (struct InitMessage))
838     {
839       GNUNET_break (0);
840       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
841       return;
842     }
843   GNUNET_SERVER_notification_context_add (notifier, client);
844   im = (const struct InitMessage *) message;
845   types = (const uint16_t *) &im[1];
846   msize -= sizeof (struct InitMessage);
847   c = GNUNET_malloc (sizeof (struct Client) + msize);
848   c->client_handle = client;
849   c->next = clients;
850   clients = c;
851   memcpy (&c[1], types, msize);
852   c->types = (uint16_t *) & c[1];
853   c->options = ntohl (im->options);
854   c->tcnt = msize / sizeof (uint16_t);
855   /* send init reply message */
856   irm.header.size = htons (sizeof (struct InitReplyMessage));
857   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
858   irm.reserved = htonl (0);
859   memcpy (&irm.publicKey,
860           &my_public_key,
861           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
862 #if DEBUG_CORE_CLIENT
863   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
864               "Sending `%s' message to client.\n", "INIT_REPLY");
865 #endif
866   send_to_client (c, &irm.header, GNUNET_NO);
867   /* notify new client about existing neighbours */
868   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
869   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
870   n = neighbours;
871   while (n != NULL)
872     {
873       if (n->status == PEER_STATE_KEY_CONFIRMED)
874         {
875 #if DEBUG_CORE_CLIENT
876           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
877                       "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
878 #endif
879           cnm.distance = htonl (n->last_distance);
880           cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
881           cnm.peer = n->peer;
882           send_to_client (c, &cnm.header, GNUNET_NO);
883         }
884       n = n->next;
885     }
886   GNUNET_SERVER_receive_done (client, GNUNET_OK);
887 }
888
889
890 /**
891  * A client disconnected, clean up.
892  *
893  * @param cls closure
894  * @param client identification of the client
895  */
896 static void
897 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
898 {
899   struct Client *pos;
900   struct Client *prev;
901
902   if (client == NULL)
903     return;
904 #if DEBUG_CORE_CLIENT
905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906               "Client has disconnected from core service.\n");
907 #endif
908   prev = NULL;
909   pos = clients;
910   while (pos != NULL)
911     {
912       if (client == pos->client_handle)
913         {
914           if (prev == NULL)
915             clients = pos->next;
916           else
917             prev->next = pos->next;
918           GNUNET_free (pos);
919           return;
920         }
921       prev = pos;
922       pos = pos->next;
923     }
924   /* client never sent INIT */
925 }
926
927
928 /**
929  * Handle REQUEST_INFO request.
930  */
931 static void
932 handle_client_request_info (void *cls,
933                             struct GNUNET_SERVER_Client *client,
934                             const struct GNUNET_MessageHeader *message)
935 {
936   const struct RequestInfoMessage *rcm;
937   struct Neighbour *n;
938   struct ConfigurationInfoMessage cim;
939   int reserv;
940   unsigned long long old_preference;
941   struct GNUNET_SERVER_TransmitContext *tc;
942
943 #if DEBUG_CORE_CLIENT
944   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
945               "Core service receives `%s' request.\n", "REQUEST_INFO");
946 #endif
947   rcm = (const struct RequestInfoMessage *) message;
948   n = find_neighbour (&rcm->peer);
949   memset (&cim, 0, sizeof (cim));
950   if (n != NULL) 
951     {
952       update_window (GNUNET_YES,
953                      &n->available_send_window,
954                      &n->last_asw_update,
955                      n->bpm_out);
956       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
957       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
958                                n->bpm_out_external_limit);
959       reserv = ntohl (rcm->reserve_inbound);
960       if (reserv < 0)
961         {
962           n->available_recv_window += reserv;
963         }
964       else if (reserv > 0)
965         {
966           update_window (GNUNET_NO,
967                          &n->available_recv_window,
968                          &n->last_arw_update, n->bpm_in);
969           if (n->available_recv_window < reserv)
970             reserv = n->available_recv_window;
971           n->available_recv_window -= reserv;
972         }
973       old_preference = n->current_preference;
974       n->current_preference += GNUNET_ntohll(rcm->preference_change);
975       if (old_preference > n->current_preference) 
976         {
977           /* overflow; cap at maximum value */
978           n->current_preference = (unsigned long long) -1;
979         }
980       update_preference_sum (n->current_preference - old_preference);
981       cim.reserved_amount = htonl (reserv);
982       cim.bpm_in = htonl (n->bpm_in);
983       cim.bpm_out = htonl (n->bpm_out);
984       cim.preference = n->current_preference;
985     }
986   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
987   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
988   cim.peer = rcm->peer;
989
990 #if DEBUG_CORE_CLIENT
991   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
992               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
993 #endif
994   tc = GNUNET_SERVER_transmit_context_create (client);
995   GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
996   GNUNET_SERVER_transmit_context_run (tc,
997                                       GNUNET_TIME_UNIT_FOREVER_REL);
998 }
999
1000
1001 /**
1002  * Free the given entry for the neighbour (it has
1003  * already been removed from the list at this point).
1004  *
1005  * @param n neighbour to free
1006  */
1007 static void
1008 free_neighbour (struct Neighbour *n)
1009 {
1010   struct MessageEntry *m;
1011
1012   if (n->pitr != NULL)
1013     {
1014       GNUNET_PEERINFO_iterate_cancel (n->pitr);
1015       n->pitr = NULL;
1016     }
1017   if (n->skm != NULL)
1018     {
1019       GNUNET_free (n->skm);
1020       n->skm = NULL;
1021     }
1022   while (NULL != (m = n->messages))
1023     {
1024       n->messages = m->next;
1025       GNUNET_free (m);
1026     }
1027   while (NULL != (m = n->encrypted_head))
1028     {
1029       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1030                                    n->encrypted_tail,
1031                                    m);
1032       GNUNET_free (m);
1033     }
1034   if (NULL != n->th)
1035     {
1036       GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
1037       n->th = NULL;
1038     }
1039   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1040     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1041   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
1042     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
1043   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
1044     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
1045   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1046     GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1047   GNUNET_free_non_null (n->public_key);
1048   GNUNET_free_non_null (n->pending_ping);
1049   GNUNET_free_non_null (n->pending_pong);
1050   GNUNET_free (n);
1051 }
1052
1053
1054 /**
1055  * Consider freeing the given neighbour since we may not need
1056  * to keep it around anymore.
1057  *
1058  * @param n neighbour to consider discarding
1059  */
1060 static void
1061 consider_free_neighbour (struct Neighbour *n);
1062
1063
1064 /**
1065  * Task triggered when a neighbour entry might have gotten stale.
1066  *
1067  * @param cls the 'struct Neighbour'
1068  * @param tc scheduler context (not used)
1069  */
1070 static void
1071 consider_free_task (void *cls,
1072                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1073 {
1074   struct Neighbour *n = cls;
1075   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
1076   consider_free_neighbour (n);
1077 }
1078
1079
1080 /**
1081  * Consider freeing the given neighbour since we may not need
1082  * to keep it around anymore.
1083  *
1084  * @param n neighbour to consider discarding
1085  */
1086 static void
1087 consider_free_neighbour (struct Neighbour *n)
1088
1089   struct Neighbour *pos;
1090   struct Neighbour *prev;
1091   struct GNUNET_TIME_Relative left;
1092
1093   if ( (n->th != NULL) ||
1094        (n->pitr != NULL) ||
1095        (n->status == PEER_STATE_KEY_CONFIRMED) ||
1096        (GNUNET_YES == n->is_connected) )
1097     return; /* no chance */
1098   
1099   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1100                                                                        MAX_PONG_DELAY));
1101   if (left.value > 0)
1102     {
1103       if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1104         GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1105       n->dead_clean_task = GNUNET_SCHEDULER_add_delayed (sched,
1106                                                          left,
1107                                                          &consider_free_task,
1108                                                          n);
1109       return;
1110     }
1111   /* actually free the neighbour... */
1112   prev = NULL;
1113   pos = neighbours;
1114   while (pos != n)
1115     {
1116       prev = pos;
1117       pos = pos->next;
1118     }
1119   if (prev == NULL)
1120     neighbours = n->next;
1121   else
1122     prev->next = n->next;
1123   GNUNET_assert (neighbour_count > 0);
1124   neighbour_count--;
1125   free_neighbour (n);
1126 }
1127
1128
1129 /**
1130  * Check if we have encrypted messages for the specified neighbour
1131  * pending, and if so, check with the transport about sending them
1132  * out.
1133  *
1134  * @param n neighbour to check.
1135  */
1136 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1137
1138
1139 /**
1140  * Function called when the transport service is ready to
1141  * receive an encrypted message for the respective peer
1142  *
1143  * @param cls neighbour to use message from
1144  * @param size number of bytes we can transmit
1145  * @param buf where to copy the message
1146  * @return number of bytes transmitted
1147  */
1148 static size_t
1149 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1150 {
1151   struct Neighbour *n = cls;
1152   struct MessageEntry *m;
1153   size_t ret;
1154   char *cbuf;
1155
1156   n->th = NULL;
1157   GNUNET_assert (NULL != (m = n->encrypted_head));
1158   GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1159                                n->encrypted_tail,
1160                                m);
1161   ret = 0;
1162   cbuf = buf;
1163   if (buf != NULL)
1164     {
1165       GNUNET_assert (size >= m->size);
1166       memcpy (cbuf, &m[1], m->size);
1167       ret = m->size;
1168       n->available_send_window -= m->size;
1169       process_encrypted_neighbour_queue (n);
1170
1171 #if DEBUG_CORE
1172       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1173                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1174                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1175                   ret, GNUNET_i2s (&n->peer));
1176 #endif
1177     }
1178   else
1179     {
1180 #if DEBUG_CORE
1181       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1182                   "Transmission of message of type %u and size %u failed\n",
1183                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1184                   m->size);
1185 #endif
1186     }
1187   GNUNET_free (m);
1188   consider_free_neighbour (n);
1189   return ret;
1190 }
1191
1192
1193 /**
1194  * Check if we have plaintext messages for the specified neighbour
1195  * pending, and if so, consider batching and encrypting them (and
1196  * then trigger processing of the encrypted queue if needed).
1197  *
1198  * @param n neighbour to check.
1199  */
1200 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1201
1202
1203 /**
1204  * Check if we have encrypted messages for the specified neighbour
1205  * pending, and if so, check with the transport about sending them
1206  * out.
1207  *
1208  * @param n neighbour to check.
1209  */
1210 static void
1211 process_encrypted_neighbour_queue (struct Neighbour *n)
1212 {
1213   struct MessageEntry *m;
1214  
1215   if (n->th != NULL)
1216     return;  /* request already pending */
1217   m = n->encrypted_head;
1218   if (m == NULL)
1219     {
1220       /* encrypted queue empty, try plaintext instead */
1221       process_plaintext_neighbour_queue (n);
1222       return;
1223     }
1224 #if DEBUG_CORE
1225   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1226               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1227               m->size,
1228               GNUNET_i2s (&n->peer),
1229               GNUNET_TIME_absolute_get_remaining (m->deadline).
1230               value);
1231 #endif
1232   n->th =
1233     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1234                                             m->size,
1235                                             m->priority,
1236                                             GNUNET_TIME_absolute_get_remaining
1237                                             (m->deadline),
1238                                             &notify_encrypted_transmit_ready,
1239                                             n);
1240   if (n->th == NULL)
1241     {
1242       /* message request too large or duplicate request */
1243       GNUNET_break (0);
1244       /* discard encrypted message */
1245       GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
1246                                    n->encrypted_tail,
1247                                    m);
1248       GNUNET_free (m);
1249       process_encrypted_neighbour_queue (n);
1250     }
1251 }
1252
1253
1254 /**
1255  * Decrypt size bytes from in and write the result to out.  Use the
1256  * key for inbound traffic of the given neighbour.  This function does
1257  * NOT do any integrity-checks on the result.
1258  *
1259  * @param n neighbour we are receiving from
1260  * @param iv initialization vector to use
1261  * @param in ciphertext
1262  * @param out plaintext
1263  * @param size size of in/out
1264  * @return GNUNET_OK on success
1265  */
1266 static int
1267 do_decrypt (struct Neighbour *n,
1268             const GNUNET_HashCode * iv,
1269             const void *in, void *out, size_t size)
1270 {
1271   if (size != (uint16_t) size)
1272     {
1273       GNUNET_break (0);
1274       return GNUNET_NO;
1275     }
1276   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1277       (n->status != PEER_STATE_KEY_CONFIRMED))
1278     {
1279       GNUNET_break_op (0);
1280       return GNUNET_SYSERR;
1281     }
1282   if (size !=
1283       GNUNET_CRYPTO_aes_decrypt (in,
1284                                  (uint16_t) size,
1285                                  &n->decrypt_key,
1286                                  (const struct
1287                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1288                                  out))
1289     {
1290       GNUNET_break (0);
1291       return GNUNET_SYSERR;
1292     }
1293 #if DEBUG_CORE
1294   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1295               "Decrypted %u bytes from `%4s' using key %u\n",
1296               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1297 #endif
1298   return GNUNET_OK;
1299 }
1300
1301
1302 /**
1303  * Encrypt size bytes from in and write the result to out.  Use the
1304  * key for outbound traffic of the given neighbour.
1305  *
1306  * @param n neighbour we are sending to
1307  * @param iv initialization vector to use
1308  * @param in ciphertext
1309  * @param out plaintext
1310  * @param size size of in/out
1311  * @return GNUNET_OK on success
1312  */
1313 static int
1314 do_encrypt (struct Neighbour *n,
1315             const GNUNET_HashCode * iv,
1316             const void *in, void *out, size_t size)
1317 {
1318   if (size != (uint16_t) size)
1319     {
1320       GNUNET_break (0);
1321       return GNUNET_NO;
1322     }
1323   GNUNET_assert (size ==
1324                  GNUNET_CRYPTO_aes_encrypt (in,
1325                                             (uint16_t) size,
1326                                             &n->encrypt_key,
1327                                             (const struct
1328                                              GNUNET_CRYPTO_AesInitializationVector
1329                                              *) iv, out));
1330 #if DEBUG_CORE
1331   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1332               "Encrypted %u bytes for `%4s' using key %u\n", size,
1333               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1334 #endif
1335   return GNUNET_OK;
1336 }
1337
1338
1339 /**
1340  * Select messages for transmission.  This heuristic uses a combination
1341  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1342  * and priority-based discard (in case no feasible schedule exist) and
1343  * speculative optimization (defer any kind of transmission until
1344  * we either create a batch of significant size, 25% of max, or until
1345  * we are close to a deadline).  Furthermore, when scheduling the
1346  * heuristic also packs as many messages into the batch as possible,
1347  * starting with those with the earliest deadline.  Yes, this is fun.
1348  *
1349  * @param n neighbour to select messages from
1350  * @param size number of bytes to select for transmission
1351  * @param retry_time set to the time when we should try again
1352  *        (only valid if this function returns zero)
1353  * @return number of bytes selected, or 0 if we decided to
1354  *         defer scheduling overall; in that case, retry_time is set.
1355  */
1356 static size_t
1357 select_messages (struct Neighbour *n,
1358                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1359 {
1360   struct MessageEntry *pos;
1361   struct MessageEntry *min;
1362   struct MessageEntry *last;
1363   unsigned int min_prio;
1364   struct GNUNET_TIME_Absolute t;
1365   struct GNUNET_TIME_Absolute now;
1366   uint64_t delta;
1367   uint64_t avail;
1368   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1369   size_t off;
1370   int discard_low_prio;
1371
1372   GNUNET_assert (NULL != n->messages);
1373   now = GNUNET_TIME_absolute_get ();
1374   /* last entry in linked list of messages processed */
1375   last = NULL;
1376   /* should we remove the entry with the lowest
1377      priority from consideration for scheduling at the
1378      end of the loop? */
1379   discard_low_prio = GNUNET_YES;
1380   while (GNUNET_YES == discard_low_prio)
1381     {
1382       min = NULL;
1383       min_prio = -1;
1384       discard_low_prio = GNUNET_NO;
1385       /* calculate number of bytes available for transmission at time "t" */
1386       update_window (GNUNET_NO,
1387                      &n->available_send_window,
1388                      &n->last_asw_update,
1389                      n->bpm_out);
1390       avail = n->available_send_window;
1391       t = n->last_asw_update;
1392       /* how many bytes have we (hypothetically) scheduled so far */
1393       off = 0;
1394       /* maximum time we can wait before transmitting anything
1395          and still make all of our deadlines */
1396       slack = -1;
1397
1398       pos = n->messages;
1399       /* note that we use "*2" here because we want to look
1400          a bit further into the future; much more makes no
1401          sense since new message might be scheduled in the
1402          meantime... */
1403       while ((pos != NULL) && (off < size * 2))
1404         {
1405           if (pos->do_transmit == GNUNET_YES)
1406             {
1407               /* already removed from consideration */
1408               pos = pos->next;
1409               continue;
1410             }
1411           if (discard_low_prio == GNUNET_NO)
1412             {
1413               delta = pos->deadline.value;
1414               if (delta < t.value)
1415                 delta = 0;
1416               else
1417                 delta = t.value - delta;
1418               avail += delta * n->bpm_out / 1000 / 60;
1419               if (avail < pos->size)
1420                 {
1421                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1422                 }
1423               else
1424                 {
1425                   avail -= pos->size;
1426                   /* update slack, considering both its absolute deadline
1427                      and relative deadlines caused by other messages
1428                      with their respective load */
1429                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1430                   if ( (pos->deadline.value < now.value) ||
1431                        (GNUNET_YES == pos->got_slack) )                
1432                     {
1433                       slack = 0;
1434                     }
1435                   else
1436                     {
1437                       slack =
1438                         GNUNET_MIN (slack, pos->deadline.value - now.value);
1439                       pos->got_slack = GNUNET_YES;
1440                     }
1441                 }
1442             }
1443
1444           off += pos->size;
1445           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1446           if (pos->priority <= min_prio)
1447             {
1448               /* update min for discard */
1449               min_prio = pos->priority;
1450               min = pos;
1451             }
1452           pos = pos->next;
1453         }
1454       if (discard_low_prio)
1455         {
1456           GNUNET_assert (min != NULL);
1457           /* remove lowest-priority entry from consideration */
1458           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1459         }
1460       last = pos;
1461     }
1462   /* guard against sending "tiny" messages with large headers without
1463      urgent deadlines */
1464   if ( (slack > 1000) && (size > 4 * off) )
1465     {
1466       /* less than 25% of message would be filled with deadlines still
1467          being met if we delay by one second or more; so just wait for
1468          more data; but do not wait longer than 1s (since we don't want
1469          to delay messages for a really long time either). */
1470       retry_time->value = 1000;
1471       /* reset do_transmit values for next time */
1472       while (pos != last)
1473         {
1474           pos->do_transmit = GNUNET_NO;   
1475           pos = pos->next;
1476         }
1477 #if DEBUG_CORE
1478       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1479                   "Deferring transmission for 1s due to underfull message buffer size\n");
1480 #endif
1481       return 0;
1482     }
1483   /* select marked messages (up to size) for transmission */
1484   off = 0;
1485   pos = n->messages;
1486   while (pos != last)
1487     {
1488       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1489         {
1490           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1491           off += pos->size;
1492           size -= pos->size;
1493         }
1494       else
1495         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1496       pos = pos->next;
1497     }
1498 #if DEBUG_CORE
1499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1500               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1501               off, GNUNET_i2s (&n->peer));
1502 #endif
1503   return off;
1504 }
1505
1506
1507 /**
1508  * Batch multiple messages into a larger buffer.
1509  *
1510  * @param n neighbour to take messages from
1511  * @param buf target buffer
1512  * @param size size of buf
1513  * @param deadline set to transmission deadline for the result
1514  * @param retry_time set to the time when we should try again
1515  *        (only valid if this function returns zero)
1516  * @param priority set to the priority of the batch
1517  * @return number of bytes written to buf (can be zero)
1518  */
1519 static size_t
1520 batch_message (struct Neighbour *n,
1521                char *buf,
1522                size_t size,
1523                struct GNUNET_TIME_Absolute *deadline,
1524                struct GNUNET_TIME_Relative *retry_time,
1525                unsigned int *priority)
1526 {
1527   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1528   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
1529   struct MessageEntry *pos;
1530   struct MessageEntry *prev;
1531   struct MessageEntry *next;
1532   size_t ret;
1533   
1534   ret = 0;
1535   *priority = 0;
1536   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1537   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1538   if (0 == select_messages (n, size, retry_time))
1539     {
1540       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1541                   "No messages selected, will try again in %llu ms\n",
1542                   retry_time->value);
1543       return 0;
1544     }
1545   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
1546   ntm->distance = htonl (n->last_distance);
1547   ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
1548   ntm->peer = n->peer;
1549   
1550   pos = n->messages;
1551   prev = NULL;
1552   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1553     {
1554       next = pos->next;
1555       if (GNUNET_YES == pos->do_transmit)
1556         {
1557           GNUNET_assert (pos->size <= size);
1558           /* do notifications */
1559           /* FIXME: track if we have *any* client that wants
1560              full notifications and only do this if that is
1561              actually true */
1562           if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
1563             {
1564               memcpy (&ntm[1], &pos[1], pos->size);
1565               ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1566                                         sizeof (struct GNUNET_MessageHeader));
1567               send_to_all_clients (&ntm->header,
1568                                    GNUNET_YES,
1569                                    GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
1570             }
1571           else
1572             {
1573               /* message too large for 'full' notifications, we do at
1574                  least the 'hdr' type */
1575               memcpy (&ntm[1],
1576                       &pos[1],
1577                       sizeof (struct GNUNET_MessageHeader));
1578             }
1579           ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1580                                     pos->size);
1581           send_to_all_clients (&ntm->header,
1582                                GNUNET_YES,
1583                                GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);   
1584 #if DEBUG_HANDSHAKE
1585           fprintf (stderr,
1586                    "Encrypting message of type %u\n",
1587                    ntohs(((struct GNUNET_MessageHeader*)&pos[1])->type));
1588 #endif
1589           /* copy for encrypted transmission */
1590           memcpy (&buf[ret], &pos[1], pos->size);
1591           ret += pos->size;
1592           size -= pos->size;
1593           *priority += pos->priority;
1594 #if DEBUG_CORE
1595           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1596                       "Adding plaintext message with deadline %llu ms to batch\n",
1597                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1598 #endif
1599           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1600           GNUNET_free (pos);
1601           if (prev == NULL)
1602             n->messages = next;
1603           else
1604             prev->next = next;
1605         }
1606       else
1607         {
1608           prev = pos;
1609         }
1610       pos = next;
1611     }
1612 #if DEBUG_CORE
1613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1614               "Deadline for message batch is %llu ms\n",
1615               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1616 #endif
1617   return ret;
1618 }
1619
1620
1621 /**
1622  * Remove messages with deadlines that have long expired from
1623  * the queue.
1624  *
1625  * @param n neighbour to inspect
1626  */
1627 static void
1628 discard_expired_messages (struct Neighbour *n)
1629 {
1630   struct MessageEntry *prev;
1631   struct MessageEntry *next;
1632   struct MessageEntry *pos;
1633   struct GNUNET_TIME_Absolute now;
1634   struct GNUNET_TIME_Relative delta;
1635
1636   now = GNUNET_TIME_absolute_get ();
1637   prev = NULL;
1638   pos = n->messages;
1639   while (pos != NULL) 
1640     {
1641       next = pos->next;
1642       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1643       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1644         {
1645 #if DEBUG_CORE
1646           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1647                       "Message is %llu ms past due, discarding.\n",
1648                       delta.value);
1649 #endif
1650           if (prev == NULL)
1651             n->messages = next;
1652           else
1653             prev->next = next;
1654           GNUNET_free (pos);
1655         }
1656       else
1657         prev = pos;
1658       pos = next;
1659     }
1660 }
1661
1662
1663 /**
1664  * Signature of the main function of a task.
1665  *
1666  * @param cls closure
1667  * @param tc context information (why was this task triggered now)
1668  */
1669 static void
1670 retry_plaintext_processing (void *cls,
1671                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1672 {
1673   struct Neighbour *n = cls;
1674
1675   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1676   process_plaintext_neighbour_queue (n);
1677 }
1678
1679
1680 /**
1681  * Send our key (and encrypted PING) to the other peer.
1682  *
1683  * @param n the other peer
1684  */
1685 static void send_key (struct Neighbour *n);
1686
1687 /**
1688  * Task that will retry "send_key" if our previous attempt failed
1689  * to yield a PONG.
1690  */
1691 static void
1692 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1693 {
1694   struct Neighbour *n = cls;
1695
1696   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
1697   n->set_key_retry_frequency =
1698     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
1699   send_key (n);
1700 }
1701
1702
1703 /**
1704  * Check if we have plaintext messages for the specified neighbour
1705  * pending, and if so, consider batching and encrypting them (and
1706  * then trigger processing of the encrypted queue if needed).
1707  *
1708  * @param n neighbour to check.
1709  */
1710 static void
1711 process_plaintext_neighbour_queue (struct Neighbour *n)
1712 {
1713   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1714   size_t used;
1715   size_t esize;
1716   struct EncryptedMessage *em;  /* encrypted message */
1717   struct EncryptedMessage *ph;  /* plaintext header */
1718   struct MessageEntry *me;
1719   unsigned int priority;
1720   struct GNUNET_TIME_Absolute deadline;
1721   struct GNUNET_TIME_Relative retry_time;
1722
1723   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1724     {
1725       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1726       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1727     }
1728   switch (n->status)
1729     {
1730     case PEER_STATE_DOWN:
1731       send_key (n);
1732 #if DEBUG_CORE
1733       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1734                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1735                   GNUNET_i2s(&n->peer));
1736 #endif
1737       return;
1738     case PEER_STATE_KEY_SENT:
1739       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
1740         n->retry_set_key_task
1741           = GNUNET_SCHEDULER_add_delayed (sched,
1742                                           n->set_key_retry_frequency,
1743                                           &set_key_retry_task, n);    
1744 #if DEBUG_CORE
1745       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1746                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1747                   GNUNET_i2s(&n->peer));
1748 #endif
1749       return;
1750     case PEER_STATE_KEY_RECEIVED:
1751       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)        
1752         n->retry_set_key_task
1753           = GNUNET_SCHEDULER_add_delayed (sched,
1754                                           n->set_key_retry_frequency,
1755                                           &set_key_retry_task, n);        
1756 #if DEBUG_CORE
1757       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1758                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1759                   GNUNET_i2s(&n->peer));
1760 #endif
1761       return;
1762     case PEER_STATE_KEY_CONFIRMED:
1763       /* ready to continue */
1764       break;
1765     }
1766   discard_expired_messages (n);
1767   if (n->messages == NULL)
1768     {
1769 #if DEBUG_CORE
1770       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771                   "Plaintext message queue for `%4s' is empty.\n",
1772                   GNUNET_i2s(&n->peer));
1773 #endif
1774       return;                   /* no pending messages */
1775     }
1776   if (n->encrypted_head != NULL)
1777     {
1778 #if DEBUG_CORE
1779       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1780                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1781                   GNUNET_i2s(&n->peer));
1782 #endif
1783       return;                   /* wait for messages already encrypted to be
1784                                    processed first! */
1785     }
1786   ph = (struct EncryptedMessage *) pbuf;
1787   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1788   priority = 0;
1789   used = sizeof (struct EncryptedMessage);
1790   used += batch_message (n,
1791                          &pbuf[used],
1792                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1793                          &deadline, &retry_time, &priority);
1794   if (used == sizeof (struct EncryptedMessage))
1795     {
1796 #if DEBUG_CORE
1797       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1798                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1799                   GNUNET_i2s(&n->peer));
1800 #endif
1801       /* no messages selected for sending, try again later... */
1802       n->retry_plaintext_task =
1803         GNUNET_SCHEDULER_add_delayed (sched,
1804                                       retry_time,
1805                                       &retry_plaintext_processing, n);
1806       return;
1807     }
1808   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1809   ph->inbound_bpm_limit = htonl (n->bpm_in);
1810   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1811
1812   /* setup encryption message header */
1813   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1814   me->deadline = deadline;
1815   me->priority = priority;
1816   me->size = used;
1817   em = (struct EncryptedMessage *) &me[1];
1818   em->header.size = htons (used);
1819   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1820   em->reserved = htonl (0);
1821   esize = used - ENCRYPTED_HEADER_SIZE;
1822   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1823   /* encrypt */
1824 #if DEBUG_CORE
1825   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1826               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1827               esize,
1828               GNUNET_i2s(&n->peer),
1829               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1830 #endif
1831   GNUNET_assert (GNUNET_OK ==
1832                  do_encrypt (n,
1833                              &em->plaintext_hash,
1834                              &ph->sequence_number,
1835                              &em->sequence_number, esize));
1836   /* append to transmission list */
1837   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
1838                                      n->encrypted_tail,
1839                                      n->encrypted_tail,
1840                                      me);
1841   process_encrypted_neighbour_queue (n);
1842 }
1843
1844
1845 /**
1846  * Function that recalculates the bandwidth quota for the
1847  * given neighbour and transmits it to the transport service.
1848  * 
1849  * @param cls neighbour for the quota update
1850  * @param tc context
1851  */
1852 static void
1853 neighbour_quota_update (void *cls,
1854                         const struct GNUNET_SCHEDULER_TaskContext *tc);
1855
1856
1857 /**
1858  * Schedule the task that will recalculate the bandwidth
1859  * quota for this peer (and possibly force a disconnect of
1860  * idle peers by calculating a bandwidth of zero).
1861  */
1862 static void
1863 schedule_quota_update (struct Neighbour *n)
1864 {
1865   GNUNET_assert (n->quota_update_task ==
1866                  GNUNET_SCHEDULER_NO_TASK);
1867   n->quota_update_task
1868     = GNUNET_SCHEDULER_add_delayed (sched,
1869                                     QUOTA_UPDATE_FREQUENCY,
1870                                     &neighbour_quota_update,
1871                                     n);
1872 }
1873
1874
1875 /**
1876  * Initialize a new 'struct Neighbour'.
1877  *
1878  * @param pid ID of the new neighbour
1879  * @return handle for the new neighbour
1880  */
1881 static struct Neighbour *
1882 create_neighbour (const struct GNUNET_PeerIdentity *pid)
1883 {
1884   struct Neighbour *n;
1885   struct GNUNET_TIME_Absolute now;
1886
1887   n = GNUNET_malloc (sizeof (struct Neighbour));
1888   n->next = neighbours;
1889   neighbours = n;
1890   neighbour_count++;
1891   n->peer = *pid;
1892   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
1893   now = GNUNET_TIME_absolute_get ();
1894   n->encrypt_key_created = now;
1895   n->last_activity = now;
1896   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
1897   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
1898   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
1899   n->bpm_out_internal_limit = (uint32_t) - 1;
1900   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
1901   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1902                                                 (uint32_t) - 1);
1903   schedule_quota_update (n);
1904   return n;
1905 }
1906
1907
1908
1909
1910
1911 /**
1912  * Handle CORE_SEND request.
1913  *
1914  * @param cls unused
1915  * @param client the client issuing the request
1916  * @param message the "struct SendMessage"
1917  */
1918 static void
1919 handle_client_send (void *cls,
1920                     struct GNUNET_SERVER_Client *client,
1921                     const struct GNUNET_MessageHeader *message)
1922 {
1923   const struct SendMessage *sm;
1924   const struct GNUNET_MessageHeader *mh;
1925   struct Neighbour *n;
1926   struct MessageEntry *prev;
1927   struct MessageEntry *pos;
1928   struct MessageEntry *e; 
1929   struct MessageEntry *min_prio_entry;
1930   struct MessageEntry *min_prio_prev;
1931   unsigned int min_prio;
1932   unsigned int queue_size;
1933   uint16_t msize;
1934
1935   msize = ntohs (message->size);
1936   if (msize <
1937       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1938     {
1939       GNUNET_break (0);
1940       if (client != NULL)
1941         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1942       return;
1943     }
1944   sm = (const struct SendMessage *) message;
1945   msize -= sizeof (struct SendMessage);
1946   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1947   if (msize != ntohs (mh->size))
1948     {
1949       GNUNET_break (0);
1950       if (client != NULL)
1951         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1952       return;
1953     }
1954   n = find_neighbour (&sm->peer);
1955   if (n == NULL)
1956     n = create_neighbour (&sm->peer);
1957 #if DEBUG_CORE
1958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1959               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1960               "SEND",
1961               msize, 
1962               GNUNET_i2s (&sm->peer));
1963 #endif
1964   /* bound queue size */
1965   discard_expired_messages (n);
1966   min_prio = (unsigned int) -1;
1967   min_prio_entry = NULL;
1968   min_prio_prev = NULL;
1969   queue_size = 0;
1970   prev = NULL;
1971   pos = n->messages;
1972   while (pos != NULL) 
1973     {
1974       if (pos->priority < min_prio)
1975         {
1976           min_prio_entry = pos;
1977           min_prio_prev = prev;
1978           min_prio = pos->priority;
1979         }
1980       queue_size++;
1981       prev = pos;
1982       pos = pos->next;
1983     }
1984   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1985     {
1986       /* queue full */
1987       if (ntohl(sm->priority) <= min_prio)
1988         {
1989           /* discard new entry */
1990 #if DEBUG_CORE
1991           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1992                       "Queue full, discarding new request\n");
1993 #endif
1994           if (client != NULL)
1995             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1996           return;
1997         }
1998       /* discard "min_prio_entry" */
1999 #if DEBUG_CORE
2000       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2001                   "Queue full, discarding existing older request\n");
2002 #endif
2003       if (min_prio_prev == NULL)
2004         n->messages = min_prio_entry->next;
2005       else
2006         min_prio_prev->next = min_prio_entry->next;      
2007       GNUNET_free (min_prio_entry);     
2008     }
2009
2010 #if DEBUG_CORE
2011   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2012               "Adding transmission request for `%4s' to queue\n",
2013               GNUNET_i2s (&sm->peer));
2014 #endif  
2015   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
2016   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
2017   e->priority = ntohl (sm->priority);
2018   e->size = msize;
2019   memcpy (&e[1], mh, msize);
2020
2021   /* insert, keep list sorted by deadline */
2022   prev = NULL;
2023   pos = n->messages;
2024   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
2025     {
2026       prev = pos;
2027       pos = pos->next;
2028     }
2029   if (prev == NULL)
2030     n->messages = e;
2031   else
2032     prev->next = e;
2033   e->next = pos;
2034
2035   /* consider scheduling now */
2036   process_plaintext_neighbour_queue (n);
2037   if (client != NULL)
2038     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2039 }
2040
2041
2042 /**
2043  * Function called when the transport service is ready to
2044  * receive a message.  Only resets 'n->th' to NULL.
2045  *
2046  * @param cls neighbour to use message from
2047  * @param size number of bytes we can transmit
2048  * @param buf where to copy the message
2049  * @return number of bytes transmitted
2050  */
2051 static size_t
2052 notify_transport_connect_done (void *cls, size_t size, void *buf)
2053 {
2054   struct Neighbour *n = cls;
2055   n->th = NULL;
2056   send_key (n);
2057   return 0;
2058 }
2059
2060
2061 /**
2062  * Handle CORE_REQUEST_CONNECT request.
2063  *
2064  * @param cls unused
2065  * @param client the client issuing the request
2066  * @param message the "struct ConnectMessage"
2067  */
2068 static void
2069 handle_client_request_connect (void *cls,
2070                                struct GNUNET_SERVER_Client *client,
2071                                const struct GNUNET_MessageHeader *message)
2072 {
2073   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2074   struct Neighbour *n;
2075   struct GNUNET_TIME_Relative timeout;
2076
2077   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2078   n = find_neighbour (&cm->peer);
2079   if (n == NULL)
2080     n = create_neighbour (&cm->peer);
2081   if ( (n->is_connected) ||
2082        (n->th != NULL) )
2083     return; /* already connected, or at least trying */
2084 #if DEBUG_CORE
2085   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2086               "Core received `%s' request for `%4s', will try to establish connection\n",
2087               "REQUEST_CONNECT",
2088               GNUNET_i2s (&cm->peer));
2089 #endif
2090   timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
2091   /* ask transport to connect to the peer */
2092   n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
2093                                                   &cm->peer,
2094                                                   sizeof (struct GNUNET_MessageHeader), 0,
2095                                                   timeout,
2096                                                   &notify_transport_connect_done,
2097                                                   n);
2098   GNUNET_break (NULL != n->th);
2099 }
2100
2101
2102 /**
2103  * List of handlers for the messages understood by this
2104  * service.
2105  */
2106 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2107   {&handle_client_init, NULL,
2108    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2109   {&handle_client_request_info, NULL,
2110    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2111    sizeof (struct RequestInfoMessage)},
2112   {&handle_client_send, NULL,
2113    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2114   {&handle_client_request_connect, NULL,
2115    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2116    sizeof (struct ConnectMessage)},
2117   {NULL, NULL, 0, 0}
2118 };
2119
2120
2121 /**
2122  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2123  * the neighbour's struct and retry send_key.  Or, if we did not get a
2124  * HELLO, just do nothing.
2125  *
2126  * @param cls the 'struct Neighbour' to retry sending the key for
2127  * @param peer the peer for which this is the HELLO
2128  * @param hello HELLO message of that peer
2129  * @param trust amount of trust we currently have in that peer
2130  */
2131 static void
2132 process_hello_retry_send_key (void *cls,
2133                               const struct GNUNET_PeerIdentity *peer,
2134                               const struct GNUNET_HELLO_Message *hello,
2135                               uint32_t trust)
2136 {
2137   struct Neighbour *n = cls;
2138
2139   if (peer == NULL)
2140     {
2141 #if DEBUG_CORE
2142       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2143                   "Entered `process_hello_retry_send_key' and `peer' is NULL!\n");
2144 #endif
2145       n->pitr = NULL;
2146       if (n->public_key != NULL)
2147         {
2148           send_key (n);
2149         }
2150       else
2151         {
2152           if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
2153             n->retry_set_key_task
2154               = GNUNET_SCHEDULER_add_delayed (sched,
2155                                               n->set_key_retry_frequency,
2156                                               &set_key_retry_task, n);
2157         }
2158       return;
2159     }
2160
2161 #if DEBUG_CORE
2162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2163               "Entered `process_hello_retry_send_key' for peer `%4s'\n",
2164               GNUNET_i2s (peer));
2165 #endif
2166   if (n->public_key != NULL)
2167     {
2168 #if DEBUG_CORE
2169       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2170               "already have public key for peer %s!! (so why are we here?)\n",
2171               GNUNET_i2s (peer));
2172 #endif
2173       return;
2174     }
2175
2176 #if DEBUG_CORE
2177   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2178               "Received new `%s' message for `%4s', initiating key exchange.\n",
2179               "HELLO",
2180               GNUNET_i2s (peer));
2181 #endif
2182   n->public_key =
2183     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2184   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2185     {
2186       GNUNET_free (n->public_key);
2187       n->public_key = NULL;
2188 #if DEBUG_CORE
2189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2190               "GNUNET_HELLO_get_key returned awfully\n");
2191 #endif
2192       return;
2193     }
2194 }
2195
2196
2197 /**
2198  * Send our key (and encrypted PING) to the other peer.
2199  *
2200  * @param n the other peer
2201  */
2202 static void
2203 send_key (struct Neighbour *n)
2204 {
2205   struct SetKeyMessage *sm;
2206   struct MessageEntry *me;
2207   struct PingMessage pp;
2208   struct PingMessage *pm;
2209
2210   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2211        (n->pitr != NULL) )
2212     {
2213 #if DEBUG_CORE
2214       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2215                   "Key exchange in progress with `%4s'.\n",
2216                   GNUNET_i2s (&n->peer));
2217 #endif
2218       return; /* already in progress */
2219     }
2220
2221 #if DEBUG_CORE
2222   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2223               "Asked to perform key exchange with `%4s'.\n",
2224               GNUNET_i2s (&n->peer));
2225 #endif
2226   if (n->public_key == NULL)
2227     {
2228       /* lookup n's public key, then try again */
2229 #if DEBUG_CORE
2230       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2231                   "Lacking public key for `%4s', trying to obtain one (send_key).\n",
2232                   GNUNET_i2s (&n->peer));
2233 #endif
2234       GNUNET_assert (n->pitr == NULL);
2235       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2236                                          sched,
2237                                          &n->peer,
2238                                          0,
2239                                          GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20),
2240                                          &process_hello_retry_send_key, n);
2241       return;
2242     }
2243   /* first, set key message */
2244   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2245                       sizeof (struct SetKeyMessage));
2246   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2247   me->priority = SET_KEY_PRIORITY;
2248   me->size = sizeof (struct SetKeyMessage);
2249   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2250                                      n->encrypted_tail,
2251                                      n->encrypted_tail,
2252                                      me);
2253   sm = (struct SetKeyMessage *) &me[1];
2254   sm->header.size = htons (sizeof (struct SetKeyMessage));
2255   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2256   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2257                                         PEER_STATE_KEY_SENT : n->status));
2258   sm->purpose.size =
2259     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2260            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2261            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2262            sizeof (struct GNUNET_PeerIdentity));
2263   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2264   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2265   sm->target = n->peer;
2266   GNUNET_assert (GNUNET_OK ==
2267                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2268                                             sizeof (struct
2269                                                     GNUNET_CRYPTO_AesSessionKey),
2270                                             n->public_key,
2271                                             &sm->encrypted_key));
2272   GNUNET_assert (GNUNET_OK ==
2273                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2274                                          &sm->signature));
2275
2276   /* second, encrypted PING message */
2277   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2278                       sizeof (struct PingMessage));
2279   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2280   me->priority = PING_PRIORITY;
2281   me->size = sizeof (struct PingMessage);
2282   n->encrypted_tail->next = me;
2283   n->encrypted_tail = me;
2284   pm = (struct PingMessage *) &me[1];
2285   pm->header.size = htons (sizeof (struct PingMessage));
2286   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2287   pp.challenge = htonl (n->ping_challenge);
2288   pp.target = n->peer;
2289 #if DEBUG_CORE
2290   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2291               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2292               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2293   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2294               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2295               "PING",
2296               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2297 #endif
2298   do_encrypt (n,
2299               &n->peer.hashPubKey,
2300               &pp.challenge,
2301               &pm->challenge,
2302               sizeof (struct PingMessage) -
2303               sizeof (struct GNUNET_MessageHeader));
2304   /* update status */
2305   switch (n->status)
2306     {
2307     case PEER_STATE_DOWN:
2308       n->status = PEER_STATE_KEY_SENT;
2309       break;
2310     case PEER_STATE_KEY_SENT:
2311       break;
2312     case PEER_STATE_KEY_RECEIVED:
2313       break;
2314     case PEER_STATE_KEY_CONFIRMED:
2315       break;
2316     default:
2317       GNUNET_break (0);
2318       break;
2319     }
2320 #if DEBUG_CORE
2321   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2322               "Have %llu ms left for `%s' transmission.\n",
2323               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2324               "SET_KEY");
2325 #endif
2326   /* trigger queue processing */
2327   process_encrypted_neighbour_queue (n);
2328   if ( (n->status != PEER_STATE_KEY_CONFIRMED) &&
2329        (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task) )
2330     n->retry_set_key_task
2331       = GNUNET_SCHEDULER_add_delayed (sched,
2332                                       n->set_key_retry_frequency,
2333                                       &set_key_retry_task, n);    
2334 }
2335
2336
2337 /**
2338  * We received a SET_KEY message.  Validate and update
2339  * our key material and status.
2340  *
2341  * @param n the neighbour from which we received message m
2342  * @param m the set key message we received
2343  */
2344 static void
2345 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2346
2347
2348 /**
2349  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2350  * the neighbour's struct and retry handling the set_key message.  Or,
2351  * if we did not get a HELLO, just free the set key message.
2352  *
2353  * @param cls pointer to the set key message
2354  * @param peer the peer for which this is the HELLO
2355  * @param hello HELLO message of that peer
2356  * @param trust amount of trust we currently have in that peer
2357  */
2358 static void
2359 process_hello_retry_handle_set_key (void *cls,
2360                                     const struct GNUNET_PeerIdentity *peer,
2361                                     const struct GNUNET_HELLO_Message *hello,
2362                                     uint32_t trust)
2363 {
2364   struct Neighbour *n = cls;
2365   struct SetKeyMessage *sm = n->skm;
2366
2367   if (peer == NULL)
2368     {
2369       GNUNET_free (sm);
2370       n->skm = NULL;
2371       n->pitr = NULL;
2372       return;
2373     }
2374   if (n->public_key != NULL)
2375     return;                     /* multiple HELLOs match!? */
2376   n->public_key =
2377     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2378   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2379     {
2380       GNUNET_break_op (0);
2381       GNUNET_free (n->public_key);
2382       n->public_key = NULL;
2383       return;
2384     }
2385 #if DEBUG_CORE
2386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2387               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2388               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2389 #endif
2390   handle_set_key (n, sm);
2391 }
2392
2393
2394 /**
2395  * We received a PING message.  Validate and transmit
2396  * PONG.
2397  *
2398  * @param n sender of the PING
2399  * @param m the encrypted PING message itself
2400  */
2401 static void
2402 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2403 {
2404   struct PingMessage t;
2405   struct PingMessage *tp;
2406   struct MessageEntry *me;
2407
2408 #if DEBUG_CORE
2409   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2410               "Core service receives `%s' request from `%4s'.\n",
2411               "PING", GNUNET_i2s (&n->peer));
2412 #endif
2413   if (GNUNET_OK !=
2414       do_decrypt (n,
2415                   &my_identity.hashPubKey,
2416                   &m->challenge,
2417                   &t.challenge,
2418                   sizeof (struct PingMessage) -
2419                   sizeof (struct GNUNET_MessageHeader)))
2420     return;
2421 #if DEBUG_CORE
2422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2423               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2424               "PING",
2425               GNUNET_i2s (&t.target),
2426               ntohl (t.challenge), n->decrypt_key.crc32);
2427   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2428               "Target of `%s' request is `%4s'.\n",
2429               "PING", GNUNET_i2s (&t.target));
2430 #endif
2431   if (0 != memcmp (&t.target,
2432                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2433     {
2434       GNUNET_break_op (0);
2435       return;
2436     }
2437   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2438                       sizeof (struct PingMessage));
2439   GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
2440                                      n->encrypted_tail,
2441                                      n->encrypted_tail,
2442                                      me);
2443   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2444   me->priority = PONG_PRIORITY;
2445   me->size = sizeof (struct PingMessage);
2446   tp = (struct PingMessage *) &me[1];
2447   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2448   tp->header.size = htons (sizeof (struct PingMessage));
2449   do_encrypt (n,
2450               &my_identity.hashPubKey,
2451               &t.challenge,
2452               &tp->challenge,
2453               sizeof (struct PingMessage) -
2454               sizeof (struct GNUNET_MessageHeader));
2455 #if DEBUG_CORE
2456   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2457               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2458               ntohl (t.challenge), n->encrypt_key.crc32);
2459 #endif
2460   /* trigger queue processing */
2461   process_encrypted_neighbour_queue (n);
2462 }
2463
2464
2465 /**
2466  * We received a PONG message.  Validate and update our status.
2467  *
2468  * @param n sender of the PONG
2469  * @param m the encrypted PONG message itself
2470  */
2471 static void
2472 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2473 {
2474   struct PingMessage t;
2475   struct ConnectNotifyMessage cnm;
2476
2477 #if DEBUG_CORE
2478   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2479               "Core service receives `%s' request from `%4s'.\n",
2480               "PONG", GNUNET_i2s (&n->peer));
2481 #endif
2482   if (GNUNET_OK !=
2483       do_decrypt (n,
2484                   &n->peer.hashPubKey,
2485                   &m->challenge,
2486                   &t.challenge,
2487                   sizeof (struct PingMessage) -
2488                   sizeof (struct GNUNET_MessageHeader)))
2489     return;
2490 #if DEBUG_CORE
2491   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2492               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2493               "PONG",
2494               GNUNET_i2s (&t.target),
2495               ntohl (t.challenge), n->decrypt_key.crc32);
2496 #endif
2497   if ((0 != memcmp (&t.target,
2498                     &n->peer,
2499                     sizeof (struct GNUNET_PeerIdentity))) ||
2500       (n->ping_challenge != ntohl (t.challenge)))
2501     {
2502       /* PONG malformed */
2503 #if DEBUG_CORE
2504       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2505                   "Received malformed `%s' wanted sender `%4s' with challenge %u\n",
2506                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2507       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2508                   "Received malformed `%s' received from `%4s' with challenge %u\n",
2509                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2510 #endif
2511       GNUNET_break_op (0);
2512       return;
2513     }
2514   switch (n->status)
2515     {
2516     case PEER_STATE_DOWN:
2517       GNUNET_break (0);         /* should be impossible */
2518       return;
2519     case PEER_STATE_KEY_SENT:
2520       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2521       return;
2522     case PEER_STATE_KEY_RECEIVED:
2523       n->status = PEER_STATE_KEY_CONFIRMED;
2524 #if DEBUG_CORE
2525       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2526                   "Confirmed key via `%s' message for peer `%4s'\n",
2527                   "PONG", GNUNET_i2s (&n->peer));
2528 #endif
2529       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2530         {
2531           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2532           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2533         }      
2534       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2535       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2536       cnm.distance = htonl (n->last_distance);
2537       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2538       cnm.peer = n->peer;
2539       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2540       process_encrypted_neighbour_queue (n);
2541       break;
2542     case PEER_STATE_KEY_CONFIRMED:
2543       /* duplicate PONG? */
2544       break;
2545     default:
2546       GNUNET_break (0);
2547       break;
2548     }
2549 }
2550
2551
2552 /**
2553  * We received a SET_KEY message.  Validate and update
2554  * our key material and status.
2555  *
2556  * @param n the neighbour from which we received message m
2557  * @param m the set key message we received
2558  */
2559 static void
2560 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2561 {
2562   struct SetKeyMessage *m_cpy;
2563   struct GNUNET_TIME_Absolute t;
2564   struct GNUNET_CRYPTO_AesSessionKey k;
2565   struct PingMessage *ping;
2566   struct PingMessage *pong;
2567   enum PeerStateMachine sender_status;
2568
2569 #if DEBUG_CORE
2570   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2571               "Core service receives `%s' request from `%4s'.\n",
2572               "SET_KEY", GNUNET_i2s (&n->peer));
2573 #endif
2574   if (n->public_key == NULL)
2575     {
2576       if (n->pitr != NULL)
2577         {
2578 #if DEBUG_CORE
2579           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2580                       "Ignoring `%s' message due to lack of public key for peer (still trying to obtain one).\n",
2581                       "SET_KEY");
2582 #endif
2583           return;
2584         }
2585 #if DEBUG_CORE
2586       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2587                   "Lacking public key for peer, trying to obtain one (handle_set_key).\n");
2588 #endif
2589       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2590       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2591       /* lookup n's public key, then try again */
2592       GNUNET_assert (n->skm == NULL);
2593       n->skm = m_cpy;
2594       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2595                                          sched,
2596                                          &n->peer,
2597                                          0,
2598                                          GNUNET_TIME_UNIT_MINUTES,
2599                                          &process_hello_retry_handle_set_key, n);
2600       return;
2601     }
2602   if (0 != memcmp (&m->target,
2603                    &my_identity,
2604                    sizeof (struct GNUNET_PeerIdentity)))
2605     {
2606       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2607                   _("Received `%s' message that was not for me.  Ignoring.\n"),
2608                   "SET_KEY");
2609       return;
2610     }
2611   if ((ntohl (m->purpose.size) !=
2612        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2613        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2614        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2615        sizeof (struct GNUNET_PeerIdentity)) ||
2616       (GNUNET_OK !=
2617        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2618                                  &m->purpose, &m->signature, n->public_key)))
2619     {
2620       /* invalid signature */
2621       GNUNET_break_op (0);
2622       return;
2623     }
2624   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2625   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2626        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2627       (t.value < n->decrypt_key_created.value))
2628     {
2629       /* this could rarely happen due to massive re-ordering of
2630          messages on the network level, but is most likely either
2631          a bug or some adversary messing with us.  Report. */
2632       GNUNET_break_op (0);
2633       return;
2634     }
2635 #if DEBUG_CORE
2636   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2637 #endif  
2638   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2639                                   &m->encrypted_key,
2640                                   &k,
2641                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2642        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2643       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2644     {
2645       /* failed to decrypt !? */
2646       GNUNET_break_op (0);
2647       return;
2648     }
2649
2650   n->decrypt_key = k;
2651   if (n->decrypt_key_created.value != t.value)
2652     {
2653       /* fresh key, reset sequence numbers */
2654       n->last_sequence_number_received = 0;
2655       n->last_packets_bitmap = 0;
2656       n->decrypt_key_created = t;
2657     }
2658   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2659   switch (n->status)
2660     {
2661     case PEER_STATE_DOWN:
2662       n->status = PEER_STATE_KEY_RECEIVED;
2663 #if DEBUG_CORE
2664       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2665                   "Responding to `%s' with my own key.\n", "SET_KEY");
2666 #endif
2667       send_key (n);
2668       break;
2669     case PEER_STATE_KEY_SENT:
2670     case PEER_STATE_KEY_RECEIVED:
2671       n->status = PEER_STATE_KEY_RECEIVED;
2672       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2673           (sender_status != PEER_STATE_KEY_CONFIRMED))
2674         {
2675 #if DEBUG_CORE
2676           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2677                       "Responding to `%s' with my own key (other peer has status %u).\n",
2678                       "SET_KEY", sender_status);
2679 #endif
2680           send_key (n);
2681         }
2682       break;
2683     case PEER_STATE_KEY_CONFIRMED:
2684       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2685           (sender_status != PEER_STATE_KEY_CONFIRMED))
2686         {         
2687 #if DEBUG_CORE
2688           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2689                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2690                       "SET_KEY", sender_status);
2691 #endif
2692           send_key (n);
2693         }
2694       break;
2695     default:
2696       GNUNET_break (0);
2697       break;
2698     }
2699   if (n->pending_ping != NULL)
2700     {
2701       ping = n->pending_ping;
2702       n->pending_ping = NULL;
2703       handle_ping (n, ping);
2704       GNUNET_free (ping);
2705     }
2706   if (n->pending_pong != NULL)
2707     {
2708       pong = n->pending_pong;
2709       n->pending_pong = NULL;
2710       handle_pong (n, pong);
2711       GNUNET_free (pong);
2712     }
2713 }
2714
2715
2716 /**
2717  * Send a P2P message to a client.
2718  *
2719  * @param sender who sent us the message?
2720  * @param client who should we give the message to?
2721  * @param m contains the message to transmit
2722  * @param msize number of bytes in buf to transmit
2723  */
2724 static void
2725 send_p2p_message_to_client (struct Neighbour *sender,
2726                             struct Client *client,
2727                             const void *m, size_t msize)
2728 {
2729   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2730   struct NotifyTrafficMessage *ntm;
2731
2732 #if DEBUG_CORE
2733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2734               "Core service passes message from `%4s' of type %u to client.\n",
2735               GNUNET_i2s(&sender->peer),
2736               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2737 #endif
2738   ntm = (struct NotifyTrafficMessage *) buf;
2739   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2740   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2741   ntm->distance = htonl (sender->last_distance);
2742   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2743   ntm->peer = sender->peer;
2744   memcpy (&ntm[1], m, msize);
2745   send_to_client (client, &ntm->header, GNUNET_YES);
2746 }
2747
2748
2749 /**
2750  * Deliver P2P message to interested clients.
2751  *
2752  * @param sender who sent us the message?
2753  * @param m the message
2754  * @param msize size of the message (including header)
2755  */
2756 static void
2757 deliver_message (struct Neighbour *sender,
2758                  const struct GNUNET_MessageHeader *m, size_t msize)
2759 {
2760   struct Client *cpos;
2761   uint16_t type;
2762   unsigned int tpos;
2763   int deliver_full;
2764
2765   type = ntohs (m->type);
2766 #if DEBUG_HANDSHAKE
2767   fprintf (stderr,
2768            "Received encapsulated message of type %u from `%4s'\n",
2769            type,
2770            GNUNET_i2s (&sender->peer));
2771 #endif
2772   cpos = clients;
2773   while (cpos != NULL)
2774     {
2775       deliver_full = GNUNET_NO;
2776       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2777         deliver_full = GNUNET_YES;
2778       else
2779         {
2780           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2781             {
2782               if (type != cpos->types[tpos])
2783                 continue;
2784               deliver_full = GNUNET_YES;
2785               break;
2786             }
2787         }
2788       if (GNUNET_YES == deliver_full)
2789         send_p2p_message_to_client (sender, cpos, m, msize);
2790       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2791         send_p2p_message_to_client (sender, cpos, m,
2792                                     sizeof (struct GNUNET_MessageHeader));
2793       cpos = cpos->next;
2794     }
2795 }
2796
2797
2798 /**
2799  * Align P2P message and then deliver to interested clients.
2800  *
2801  * @param sender who sent us the message?
2802  * @param buffer unaligned (!) buffer containing message
2803  * @param msize size of the message (including header)
2804  */
2805 static void
2806 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2807 {
2808   char abuf[msize];
2809
2810   /* TODO: call to statistics? */
2811   memcpy (abuf, buffer, msize);
2812   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2813 }
2814
2815
2816 /**
2817  * Deliver P2P messages to interested clients.
2818  *
2819  * @param sender who sent us the message?
2820  * @param buffer buffer containing messages, can be modified
2821  * @param buffer_size size of the buffer (overall)
2822  * @param offset offset where messages in the buffer start
2823  */
2824 static void
2825 deliver_messages (struct Neighbour *sender,
2826                   const char *buffer, size_t buffer_size, size_t offset)
2827 {
2828   struct GNUNET_MessageHeader *mhp;
2829   struct GNUNET_MessageHeader mh;
2830   uint16_t msize;
2831   int need_align;
2832
2833   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2834     {
2835       if (0 != offset % sizeof (uint16_t))
2836         {
2837           /* outch, need to copy to access header */
2838           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2839           mhp = &mh;
2840         }
2841       else
2842         {
2843           /* can access header directly */
2844           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2845         }
2846       msize = ntohs (mhp->size);
2847       if (msize + offset > buffer_size)
2848         {
2849           /* malformed message, header says it is larger than what
2850              would fit into the overall buffer */
2851           GNUNET_break_op (0);
2852           break;
2853         }
2854 #if HAVE_UNALIGNED_64_ACCESS
2855       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2856 #else
2857       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2858 #endif
2859       if (GNUNET_YES == need_align)
2860         align_and_deliver (sender, &buffer[offset], msize);
2861       else
2862         deliver_message (sender,
2863                          (const struct GNUNET_MessageHeader *)
2864                          &buffer[offset], msize);
2865       offset += msize;
2866     }
2867 }
2868
2869
2870 /**
2871  * We received an encrypted message.  Decrypt, validate and
2872  * pass on to the appropriate clients.
2873  */
2874 static void
2875 handle_encrypted_message (struct Neighbour *n,
2876                           const struct EncryptedMessage *m)
2877 {
2878   size_t size = ntohs (m->header.size);
2879   char buf[size];
2880   struct EncryptedMessage *pt;  /* plaintext */
2881   GNUNET_HashCode ph;
2882   size_t off;
2883   uint32_t snum;
2884   struct GNUNET_TIME_Absolute t;
2885
2886 #if DEBUG_CORE
2887   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2888               "Core service receives `%s' request from `%4s'.\n",
2889               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2890 #endif  
2891   /* decrypt */
2892   if (GNUNET_OK !=
2893       do_decrypt (n,
2894                   &m->plaintext_hash,
2895                   &m->sequence_number,
2896                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2897     return;
2898   pt = (struct EncryptedMessage *) buf;
2899
2900   /* validate hash */
2901   GNUNET_CRYPTO_hash (&pt->sequence_number,
2902                       size - ENCRYPTED_HEADER_SIZE, &ph);
2903   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2904     {
2905       /* checksum failed */
2906       GNUNET_break_op (0);
2907       return;
2908     }
2909
2910   /* validate sequence number */
2911   snum = ntohl (pt->sequence_number);
2912   if (n->last_sequence_number_received == snum)
2913     {
2914       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2915                   "Received duplicate message, ignoring.\n");
2916       /* duplicate, ignore */
2917       return;
2918     }
2919   if ((n->last_sequence_number_received > snum) &&
2920       (n->last_sequence_number_received - snum > 32))
2921     {
2922       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2923                   "Received ancient out of sequence message, ignoring.\n");
2924       /* ancient out of sequence, ignore */
2925       return;
2926     }
2927   if (n->last_sequence_number_received > snum)
2928     {
2929       unsigned int rotbit =
2930         1 << (n->last_sequence_number_received - snum - 1);
2931       if ((n->last_packets_bitmap & rotbit) != 0)
2932         {
2933           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2934                       "Received duplicate message, ignoring.\n");
2935           /* duplicate, ignore */
2936           return;
2937         }
2938       n->last_packets_bitmap |= rotbit;
2939     }
2940   if (n->last_sequence_number_received < snum)
2941     {
2942       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2943       n->last_sequence_number_received = snum;
2944     }
2945
2946   /* check timestamp */
2947   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2948   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2949     {
2950       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2951                   _
2952                   ("Message received far too old (%llu ms). Content ignored.\n"),
2953                   GNUNET_TIME_absolute_get_duration (t).value);
2954       return;
2955     }
2956
2957   /* process decrypted message(s) */
2958   update_window (GNUNET_YES,
2959                  &n->available_send_window,
2960                  &n->last_asw_update,
2961                  n->bpm_out);
2962   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2963   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2964                            n->bpm_out_internal_limit);
2965   n->last_activity = GNUNET_TIME_absolute_get ();
2966   off = sizeof (struct EncryptedMessage);
2967   deliver_messages (n, buf, size, off);
2968 }
2969
2970
2971 /**
2972  * Function called by the transport for each received message.
2973  *
2974  * @param cls closure
2975  * @param peer (claimed) identity of the other peer
2976  * @param message the message
2977  * @param latency estimated latency for communicating with the
2978  *             given peer (round-trip)
2979  * @param distance in overlay hops, as given by transport plugin
2980  */
2981 static void
2982 handle_transport_receive (void *cls,
2983                           const struct GNUNET_PeerIdentity *peer,
2984                           const struct GNUNET_MessageHeader *message,
2985                           struct GNUNET_TIME_Relative latency,
2986                           unsigned int distance)
2987 {
2988   struct Neighbour *n;
2989   struct GNUNET_TIME_Absolute now;
2990   int up;
2991   uint16_t type;
2992   uint16_t size;
2993
2994 #if DEBUG_CORE
2995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2996               "Received message of type %u from `%4s', demultiplexing.\n",
2997               ntohs (message->type), GNUNET_i2s (peer));
2998 #endif
2999   n = find_neighbour (peer);
3000   if (n == NULL)
3001     n = create_neighbour (peer);
3002   if (n == NULL)
3003     return;   
3004   n->last_latency = latency;
3005   n->last_distance = distance;
3006   up = (n->status == PEER_STATE_KEY_CONFIRMED);
3007   type = ntohs (message->type);
3008   size = ntohs (message->size);
3009 #if DEBUG_HANDSHAKE
3010   fprintf (stderr,
3011            "Received message of type %u from `%4s'\n",
3012            type,
3013            GNUNET_i2s (peer));
3014 #endif
3015   switch (type)
3016     {
3017     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
3018       if (size != sizeof (struct SetKeyMessage))
3019         {
3020           GNUNET_break_op (0);
3021           return;
3022         }
3023       handle_set_key (n, (const struct SetKeyMessage *) message);
3024       break;
3025     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
3026       if (size < sizeof (struct EncryptedMessage) +
3027           sizeof (struct GNUNET_MessageHeader))
3028         {
3029           GNUNET_break_op (0);
3030           return;
3031         }
3032       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3033           (n->status != PEER_STATE_KEY_CONFIRMED))
3034         {
3035           GNUNET_break_op (0);
3036           return;
3037         }
3038       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
3039       break;
3040     case GNUNET_MESSAGE_TYPE_CORE_PING:
3041       if (size != sizeof (struct PingMessage))
3042         {
3043           GNUNET_break_op (0);
3044           return;
3045         }
3046       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3047           (n->status != PEER_STATE_KEY_CONFIRMED))
3048         {
3049 #if DEBUG_CORE
3050           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3051                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3052                       "PING", GNUNET_i2s (&n->peer));
3053 #endif
3054           GNUNET_free_non_null (n->pending_ping);
3055           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
3056           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
3057           return;
3058         }
3059       handle_ping (n, (const struct PingMessage *) message);
3060       break;
3061     case GNUNET_MESSAGE_TYPE_CORE_PONG:
3062       if (size != sizeof (struct PingMessage))
3063         {
3064           GNUNET_break_op (0);
3065           return;
3066         }
3067       if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
3068            (n->status != PEER_STATE_KEY_CONFIRMED) )
3069         {
3070 #if DEBUG_CORE
3071           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3072                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3073                       "PONG", GNUNET_i2s (&n->peer));
3074 #endif
3075           GNUNET_free_non_null (n->pending_pong);
3076           n->pending_pong = GNUNET_malloc (sizeof (struct PingMessage));
3077           memcpy (n->pending_pong, message, sizeof (struct PingMessage));
3078           return;
3079         }
3080       handle_pong (n, (const struct PingMessage *) message);
3081       break;
3082     default:
3083       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3084                   _("Unsupported message of type %u received.\n"), type);
3085       return;
3086     }
3087   if (n->status == PEER_STATE_KEY_CONFIRMED)
3088     {
3089       now = GNUNET_TIME_absolute_get ();
3090       n->last_activity = now;
3091       if (!up)
3092         n->time_established = now;
3093     }
3094 }
3095
3096
3097 /**
3098  * Function that recalculates the bandwidth quota for the
3099  * given neighbour and transmits it to the transport service.
3100  * 
3101  * @param cls neighbour for the quota update
3102  * @param tc context
3103  */
3104 static void
3105 neighbour_quota_update (void *cls,
3106                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3107 {
3108   struct Neighbour *n = cls;
3109   uint32_t q_in;
3110   double pref_rel;
3111   double share;
3112   unsigned long long distributable;
3113   
3114   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3115   /* calculate relative preference among all neighbours;
3116      divides by a bit more to avoid division by zero AND to
3117      account for possibility of new neighbours joining any time 
3118      AND to convert to double... */
3119   pref_rel = n->current_preference / (1.0 + preference_sum);
3120   distributable = 0;
3121   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
3122     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
3123   share = distributable * pref_rel;
3124   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
3125   /* check if we want to disconnect for good due to inactivity */
3126   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3127        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3128     {
3129 #if DEBUG_CORE
3130   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3131               "Forcing disconnect of `%4s' due to inactivity (?).\n",
3132               GNUNET_i2s (&n->peer));
3133 #endif
3134       q_in = 0; /* force disconnect */
3135     }
3136   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
3137        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
3138     {
3139       n->bpm_in = q_in;
3140       GNUNET_TRANSPORT_set_quota (transport,
3141                                   &n->peer,
3142                                   n->bpm_in, 
3143                                   n->bpm_out,
3144                                   GNUNET_TIME_UNIT_FOREVER_REL,
3145                                   NULL, NULL);
3146     }
3147   schedule_quota_update (n);
3148 }
3149
3150
3151 /**
3152  * Function called by transport to notify us that
3153  * a peer connected to us (on the network level).
3154  *
3155  * @param cls closure
3156  * @param peer the peer that connected
3157  * @param latency current latency of the connection
3158  * @param distance in overlay hops, as given by transport plugin
3159  */
3160 static void
3161 handle_transport_notify_connect (void *cls,
3162                                  const struct GNUNET_PeerIdentity *peer,
3163                                  struct GNUNET_TIME_Relative latency,
3164                                  unsigned int distance)
3165 {
3166   struct Neighbour *n;
3167   struct GNUNET_TIME_Absolute now;
3168   struct ConnectNotifyMessage cnm;
3169
3170   n = find_neighbour (peer);
3171   if (n != NULL)
3172     {
3173       if (n->is_connected)
3174         {
3175           /* duplicate connect notification!? */
3176           GNUNET_break (0);
3177           return;
3178         }
3179     }
3180   else
3181     {
3182       n = create_neighbour (peer);
3183     }
3184   now = GNUNET_TIME_absolute_get ();
3185   n->is_connected = GNUNET_YES;      
3186   n->last_latency = latency;
3187   n->last_distance = distance;
3188   n->last_asw_update = now;
3189   n->last_arw_update = now;
3190 #if DEBUG_CORE
3191   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3192               "Received connection from `%4s'.\n",
3193               GNUNET_i2s (&n->peer));
3194 #endif
3195   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3196   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3197   cnm.distance = htonl (n->last_distance);
3198   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
3199   cnm.peer = *peer;
3200   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3201   send_key (n);
3202 }
3203
3204
3205 /**
3206  * Function called by transport telling us that a peer
3207  * disconnected.
3208  *
3209  * @param cls closure
3210  * @param peer the peer that disconnected
3211  */
3212 static void
3213 handle_transport_notify_disconnect (void *cls,
3214                                     const struct GNUNET_PeerIdentity *peer)
3215 {
3216   struct DisconnectNotifyMessage cnm;
3217   struct Neighbour *n;
3218
3219 #if DEBUG_CORE
3220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3221               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3222 #endif
3223   n = find_neighbour (peer);
3224   if (n == NULL)
3225     {
3226       GNUNET_break (0);
3227       return;
3228     }
3229   GNUNET_break (n->is_connected);
3230   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3231   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3232   cnm.peer = *peer;
3233   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3234   n->is_connected = GNUNET_NO;
3235 }
3236
3237
3238 /**
3239  * Last task run during shutdown.  Disconnects us from
3240  * the transport.
3241  */
3242 static void
3243 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3244 {
3245   struct Neighbour *n;
3246   struct Client *c;
3247
3248 #if DEBUG_CORE
3249   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3250               "Core service shutting down.\n");
3251 #endif
3252   GNUNET_assert (transport != NULL);
3253   GNUNET_TRANSPORT_disconnect (transport);
3254   transport = NULL;
3255   while (NULL != (n = neighbours))
3256     {
3257       neighbours = n->next;
3258       GNUNET_assert (neighbour_count > 0);
3259       neighbour_count--;
3260       free_neighbour (n);
3261     }
3262   GNUNET_SERVER_notification_context_destroy (notifier);
3263   notifier = NULL;
3264   while (NULL != (c = clients))
3265     handle_client_disconnect (NULL, c->client_handle);
3266   if (my_private_key != NULL)
3267     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3268 }
3269
3270
3271 /**
3272  * Initiate core service.
3273  *
3274  * @param cls closure
3275  * @param s scheduler to use
3276  * @param serv the initialized server
3277  * @param c configuration to use
3278  */
3279 static void
3280 run (void *cls,
3281      struct GNUNET_SCHEDULER_Handle *s,
3282      struct GNUNET_SERVER_Handle *serv,
3283      const struct GNUNET_CONFIGURATION_Handle *c)
3284 {
3285 #if 0
3286   unsigned long long qin;
3287   unsigned long long qout;
3288   unsigned long long tneigh;
3289 #endif
3290   char *keyfile;
3291
3292   sched = s;
3293   cfg = c;  
3294   /* parse configuration */
3295   if (
3296        (GNUNET_OK !=
3297         GNUNET_CONFIGURATION_get_value_number (c,
3298                                                "CORE",
3299                                                "TOTAL_QUOTA_IN",
3300                                                &bandwidth_target_in)) ||
3301        (GNUNET_OK !=
3302         GNUNET_CONFIGURATION_get_value_number (c,
3303                                                "CORE",
3304                                                "TOTAL_QUOTA_OUT",
3305                                                &bandwidth_target_out)) ||
3306        (GNUNET_OK !=
3307         GNUNET_CONFIGURATION_get_value_filename (c,
3308                                                  "GNUNETD",
3309                                                  "HOSTKEY", &keyfile)))
3310     {
3311       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3312                   _
3313                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3314       GNUNET_SCHEDULER_shutdown (s);
3315       return;
3316     }
3317   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3318   GNUNET_free (keyfile);
3319   if (my_private_key == NULL)
3320     {
3321       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3322                   _("Core service could not access hostkey.  Exiting.\n"));
3323       GNUNET_SCHEDULER_shutdown (s);
3324       return;
3325     }
3326   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3327   GNUNET_CRYPTO_hash (&my_public_key,
3328                       sizeof (my_public_key), &my_identity.hashPubKey);
3329   /* setup notification */
3330   server = serv;
3331   notifier = GNUNET_SERVER_notification_context_create (server, 
3332                                                         MAX_NOTIFY_QUEUE);
3333   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3334   /* setup transport connection */
3335   transport = GNUNET_TRANSPORT_connect (sched,
3336                                         cfg,
3337                                         NULL,
3338                                         &handle_transport_receive,
3339                                         &handle_transport_notify_connect,
3340                                         &handle_transport_notify_disconnect);
3341   GNUNET_assert (NULL != transport);
3342   GNUNET_SCHEDULER_add_delayed (sched,
3343                                 GNUNET_TIME_UNIT_FOREVER_REL,
3344                                 &cleaning_task, NULL);
3345   /* process client requests */
3346   GNUNET_SERVER_add_handlers (server, handlers);
3347   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3348               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3349 }
3350
3351
3352
3353 /**
3354  * The main function for the transport service.
3355  *
3356  * @param argc number of arguments from the command line
3357  * @param argv command line arguments
3358  * @return 0 ok, 1 on error
3359  */
3360 int
3361 main (int argc, char *const *argv)
3362 {
3363   return (GNUNET_OK ==
3364           GNUNET_SERVICE_run (argc,
3365                               argv,
3366                               "core",
3367                               GNUNET_SERVICE_OPTION_NONE,
3368                               &run, NULL)) ? 0 : 1;
3369 }
3370
3371 /* end of gnunet-service-core.c */