clean up
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 #define DEBUG_HANDSHAKE GNUNET_NO
45
46 /**
47  * Receive and send buffer windows grow over time.  For
48  * how long can 'unused' bandwidth accumulate before we
49  * need to cap it?  (specified in ms).
50  */
51 #define MAX_WINDOW_TIME (5 * 60 * 1000)
52
53 /**
54  * How many messages do we queue up at most for optional
55  * notifications to a client?  (this can cause notifications
56  * about outgoing messages to be dropped).
57  */
58 #define MAX_NOTIFY_QUEUE 16
59
60 /**
61  * Minimum of bytes per minute (out) to assign to any connected peer.
62  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
63  * sense.
64  */
65 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
66
67 /**
68  * What is the smallest change (in number of bytes per minute)
69  * that we consider significant enough to bother triggering?
70  */
71 #define MIN_BPM_CHANGE 32
72
73 /**
74  * After how much time past the "official" expiration time do
75  * we discard messages?  Should not be zero since we may 
76  * intentionally defer transmission until close to the deadline
77  * and then may be slightly past the deadline due to inaccuracy
78  * in sleep and our own CPU consumption.
79  */
80 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
81
82 /**
83  * What is the maximum delay for a SET_KEY message?
84  */
85 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
86
87 /**
88  * What how long do we wait for SET_KEY confirmation initially?
89  */
90 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
91
92 /**
93  * What is the maximum delay for a PING message?
94  */
95 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
96
97 /**
98  * What is the maximum delay for a PONG message?
99  */
100 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
101
102 /**
103  * How often do we recalculate bandwidth quotas?
104  */
105 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
106
107 /**
108  * What is the priority for a SET_KEY message?
109  */
110 #define SET_KEY_PRIORITY 0xFFFFFF
111
112 /**
113  * What is the priority for a PING message?
114  */
115 #define PING_PRIORITY 0xFFFFFF
116
117 /**
118  * What is the priority for a PONG message?
119  */
120 #define PONG_PRIORITY 0xFFFFFF
121
122 /**
123  * How many messages do we queue per peer at most?
124  */
125 #define MAX_PEER_QUEUE_SIZE 16
126
127 /**
128  * How many non-mandatory messages do we queue per client at most?
129  */
130 #define MAX_CLIENT_QUEUE_SIZE 32
131
132 /**
133  * What is the maximum age of a message for us to consider
134  * processing it?  Note that this looks at the timestamp used
135  * by the other peer, so clock skew between machines does
136  * come into play here.  So this should be picked high enough
137  * so that a little bit of clock skew does not prevent peers
138  * from connecting to us.
139  */
140 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
141
142 /**
143  * What is the maximum size for encrypted messages?  Note that this
144  * number imposes a clear limit on the maximum size of any message.
145  * Set to a value close to 64k but not so close that transports will
146  * have trouble with their headers.
147  */
148 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
149
150
151 /**
152  * State machine for our P2P encryption handshake.  Everyone starts in
153  * "DOWN", if we receive the other peer's key (other peer initiated)
154  * we start in state RECEIVED (since we will immediately send our
155  * own); otherwise we start in SENT.  If we get back a PONG from
156  * within either state, we move up to CONFIRMED (the PONG will always
157  * be sent back encrypted with the key we sent to the other peer).
158  */
159 enum PeerStateMachine
160 {
161   PEER_STATE_DOWN,
162   PEER_STATE_KEY_SENT,
163   PEER_STATE_KEY_RECEIVED,
164   PEER_STATE_KEY_CONFIRMED
165 };
166
167
168 /**
169  * Number of bytes (at the beginning) of "struct EncryptedMessage"
170  * that are NOT encrypted.
171  */
172 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
173
174
175 /**
176  * Encapsulation for encrypted messages exchanged between
177  * peers.  Followed by the actual encrypted data.
178  */
179 struct EncryptedMessage
180 {
181   /**
182    * Message type is either CORE_ENCRYPTED_MESSAGE.
183    */
184   struct GNUNET_MessageHeader header;
185
186   /**
187    * Always zero.
188    */
189   uint32_t reserved GNUNET_PACKED;
190
191   /**
192    * Hash of the plaintext, used to verify message integrity;
193    * ALSO used as the IV for the symmetric cipher!  Everything
194    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
195    * must be set to the offset of the next field.
196    */
197   GNUNET_HashCode plaintext_hash;
198
199   /**
200    * Sequence number, in network byte order.  This field
201    * must be the first encrypted/decrypted field and the
202    * first byte that is hashed for the plaintext hash.
203    */
204   uint32_t sequence_number GNUNET_PACKED;
205
206   /**
207    * Desired bandwidth (how much we should send to this
208    * peer / how much is the sender willing to receive),
209    * in bytes per minute.
210    */
211   uint32_t inbound_bpm_limit GNUNET_PACKED;
212
213   /**
214    * Timestamp.  Used to prevent reply of ancient messages
215    * (recent messages are caught with the sequence number).
216    */
217   struct GNUNET_TIME_AbsoluteNBO timestamp;
218
219 };
220
221 /**
222  * We're sending an (encrypted) PING to the other peer to check if he
223  * can decrypt.  The other peer should respond with a PONG with the
224  * same content, except this time encrypted with the receiver's key.
225  */
226 struct PingMessage
227 {
228   /**
229    * Message type is either CORE_PING or CORE_PONG.
230    */
231   struct GNUNET_MessageHeader header;
232
233   /**
234    * Random number chosen to make reply harder.
235    */
236   uint32_t challenge GNUNET_PACKED;
237
238   /**
239    * Intended target of the PING, used primarily to check
240    * that decryption actually worked.
241    */
242   struct GNUNET_PeerIdentity target;
243 };
244
245
246 /**
247  * Message transmitted to set (or update) a session key.
248  */
249 struct SetKeyMessage
250 {
251
252   /**
253    * Message type is either CORE_SET_KEY.
254    */
255   struct GNUNET_MessageHeader header;
256
257   /**
258    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
259    */
260   int32_t sender_status GNUNET_PACKED;
261
262   /**
263    * Purpose of the signature, will be
264    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
265    */
266   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
267
268   /**
269    * At what time was this key created?
270    */
271   struct GNUNET_TIME_AbsoluteNBO creation_time;
272
273   /**
274    * The encrypted session key.
275    */
276   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
277
278   /**
279    * Who is the intended recipient?
280    */
281   struct GNUNET_PeerIdentity target;
282
283   /**
284    * Signature of the stuff above (starting at purpose).
285    */
286   struct GNUNET_CRYPTO_RsaSignature signature;
287
288 };
289
290
291 /**
292  * Message waiting for transmission. This struct
293  * is followed by the actual content of the message.
294  */
295 struct MessageEntry
296 {
297
298   /**
299    * We keep messages in a linked list (for now).
300    */
301   struct MessageEntry *next;
302
303   /**
304    * By when are we supposed to transmit this message?
305    */
306   struct GNUNET_TIME_Absolute deadline;
307
308   /**
309    * How important is this message to us?
310    */
311   unsigned int priority;
312
313   /**
314    * How long is the message? (number of bytes following
315    * the "struct MessageEntry", but not including the
316    * size of "struct MessageEntry" itself!)
317    */
318   uint16_t size;
319
320   /**
321    * Was this message selected for transmission in the
322    * current round? GNUNET_YES or GNUNET_NO.
323    */
324   int8_t do_transmit;
325
326   /**
327    * Did we give this message some slack (delayed sending) previously
328    * (and hence should not give it any more slack)? GNUNET_YES or
329    * GNUNET_NO.
330    */
331   int8_t got_slack;
332
333 };
334
335
336 struct Neighbour
337 {
338   /**
339    * We keep neighbours in a linked list (for now).
340    */
341   struct Neighbour *next;
342
343   /**
344    * Unencrypted messages destined for this peer.
345    */
346   struct MessageEntry *messages;
347
348   /**
349    * Head of the batched, encrypted message queue (already ordered,
350    * transmit starting with the head).
351    */
352   struct MessageEntry *encrypted_head;
353
354   /**
355    * Tail of the batched, encrypted message queue (already ordered,
356    * append new messages to tail)
357    */
358   struct MessageEntry *encrypted_tail;
359
360   /**
361    * Handle for pending requests for transmission to this peer
362    * with the transport service.  NULL if no request is pending.
363    */
364   struct GNUNET_TRANSPORT_TransmitHandle *th;
365
366   /**
367    * Public key of the neighbour, NULL if we don't have it yet.
368    */
369   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
370
371   /**
372    * We received a PING message before we got the "public_key"
373    * (or the SET_KEY).  We keep it here until we have a key
374    * to decrypt it.  NULL if no PING is pending.
375    */
376   struct PingMessage *pending_ping;
377
378   /**
379    * We received a PONG message before we got the "public_key"
380    * (or the SET_KEY).  We keep it here until we have a key
381    * to decrypt it.  NULL if no PONG is pending.
382    */
383   struct PingMessage *pending_pong;
384
385   /**
386    * Non-NULL if we are currently looking up HELLOs for this peer.
387    * for this peer.
388    */
389   struct GNUNET_PEERINFO_IteratorContext *pitr;
390
391   /**
392    * SetKeyMessage to transmit, NULL if we are not currently trying
393    * to send one.
394    */
395   struct SetKeyMessage *skm;
396
397   /**
398    * Identity of the neighbour.
399    */
400   struct GNUNET_PeerIdentity peer;
401
402   /**
403    * Key we use to encrypt our messages for the other peer
404    * (initialized by us when we do the handshake).
405    */
406   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
407
408   /**
409    * Key we use to decrypt messages from the other peer
410    * (given to us by the other peer during the handshake).
411    */
412   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
413
414   /**
415    * ID of task used for re-trying plaintext scheduling.
416    */
417   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
418
419   /**
420    * ID of task used for re-trying SET_KEY and PING message.
421    */
422   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
423
424   /**
425    * ID of task used for updating bandwidth quota for this neighbour.
426    */
427   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
428
429   /**
430    * ID of task used for cleaning up dead neighbour entries.
431    */
432   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
433
434   /**
435    * At what time did we generate our encryption key?
436    */
437   struct GNUNET_TIME_Absolute encrypt_key_created;
438
439   /**
440    * At what time did the other peer generate the decryption key?
441    */
442   struct GNUNET_TIME_Absolute decrypt_key_created;
443
444   /**
445    * At what time did we initially establish (as in, complete session
446    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
447    */
448   struct GNUNET_TIME_Absolute time_established;
449
450   /**
451    * At what time did we last receive an encrypted message from the
452    * other peer?  Should be zero if status != KEY_CONFIRMED.
453    */
454   struct GNUNET_TIME_Absolute last_activity;
455
456   /**
457    * Last latency observed from this peer.
458    */
459   struct GNUNET_TIME_Relative last_latency;
460
461   /**
462    * At what frequency are we currently re-trying SET_KEY messages?
463    */
464   struct GNUNET_TIME_Relative set_key_retry_frequency;
465
466   /**
467    * Time of our last update to the "available_send_window".
468    */
469   struct GNUNET_TIME_Absolute last_asw_update;
470
471   /**
472    * Time of our last update to the "available_recv_window".
473    */
474   struct GNUNET_TIME_Absolute last_arw_update;
475
476   /**
477    * Number of bytes that we are eligible to transmit to this
478    * peer at this point.  Incremented every minute by max_out_bpm,
479    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
480    * bandwidth-hogs are sampled at a frequency of about 78s!);
481    * may get negative if we have VERY high priority content.
482    */
483   long long available_send_window; 
484
485   /**
486    * How much downstream capacity of this peer has been reserved for
487    * our traffic?  (Our clients can request that a certain amount of
488    * bandwidth is available for replies to them; this value is used to
489    * make sure that this reserved amount of bandwidth is actually
490    * available).
491    */
492   long long available_recv_window; 
493
494   /**
495    * How valueable were the messages of this peer recently?
496    */
497   unsigned long long current_preference;
498
499   /**
500    * Bit map indicating which of the 32 sequence numbers before the last
501    * were received (good for accepting out-of-order packets and
502    * estimating reliability of the connection)
503    */
504   unsigned int last_packets_bitmap;
505
506   /**
507    * last sequence number received on this connection (highest)
508    */
509   uint32_t last_sequence_number_received;
510
511   /**
512    * last sequence number transmitted
513    */
514   uint32_t last_sequence_number_sent;
515
516   /**
517    * Available bandwidth in for this peer (current target).
518    */
519   uint32_t bpm_in;
520
521   /**
522    * Available bandwidth out for this peer (current target).
523    */
524   uint32_t bpm_out;
525
526   /**
527    * Internal bandwidth limit set for this peer (initially
528    * typically set to "-1").  "bpm_out" is MAX of
529    * "bpm_out_internal_limit" and "bpm_out_external_limit".
530    */
531   uint32_t bpm_out_internal_limit;
532
533   /**
534    * External bandwidth limit set for this peer by the
535    * peer that we are communicating with.  "bpm_out" is MAX of
536    * "bpm_out_internal_limit" and "bpm_out_external_limit".
537    */
538   uint32_t bpm_out_external_limit;
539
540   /**
541    * What was our PING challenge number (for this peer)?
542    */
543   uint32_t ping_challenge;
544
545   /**
546    * What was the last distance to this peer as reported by the transports?
547    */
548   uint32_t last_distance;
549
550   /**
551    * What is our connection status?
552    */
553   enum PeerStateMachine status;
554
555   /**
556    * Are we currently connected to this neighbour?
557    */ 
558   int is_connected;
559 };
560
561
562 /**
563  * Data structure for each client connected to the core service.
564  */
565 struct Client
566 {
567   /**
568    * Clients are kept in a linked list.
569    */
570   struct Client *next;
571
572   /**
573    * Handle for the client with the server API.
574    */
575   struct GNUNET_SERVER_Client *client_handle;
576
577   /**
578    * Array of the types of messages this peer cares
579    * about (with "tcnt" entries).  Allocated as part
580    * of this client struct, do not free!
581    */
582   uint16_t *types;
583
584   /**
585    * Options for messages this client cares about,
586    * see GNUNET_CORE_OPTION_ values.
587    */
588   uint32_t options;
589
590   /**
591    * Number of types of incoming messages this client
592    * specifically cares about.  Size of the "types" array.
593    */
594   unsigned int tcnt;
595
596 };
597
598
599 /**
600  * Our public key.
601  */
602 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
603
604 /**
605  * Our identity.
606  */
607 static struct GNUNET_PeerIdentity my_identity;
608
609 /**
610  * Our private key.
611  */
612 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
613
614 /**
615  * Our scheduler.
616  */
617 struct GNUNET_SCHEDULER_Handle *sched;
618
619 /**
620  * Our configuration.
621  */
622 const struct GNUNET_CONFIGURATION_Handle *cfg;
623
624 /**
625  * Our server.
626  */
627 static struct GNUNET_SERVER_Handle *server;
628
629 /**
630  * Transport service.
631  */
632 static struct GNUNET_TRANSPORT_Handle *transport;
633
634 /**
635  * Linked list of our clients.
636  */
637 static struct Client *clients;
638
639 /**
640  * Context for notifications we need to send to our clients.
641  */
642 static struct GNUNET_SERVER_NotificationContext *notifier;
643
644 /**
645  * We keep neighbours in a linked list (for now).
646  */
647 static struct Neighbour *neighbours;
648
649 /**
650  * Sum of all preferences among all neighbours.
651  */
652 static unsigned long long preference_sum;
653
654 /**
655  * Total number of neighbours we have.
656  */
657 static unsigned int neighbour_count;
658
659 /**
660  * How much inbound bandwidth are we supposed to be using?
661  */
662 static unsigned long long bandwidth_target_in;
663
664 /**
665  * How much outbound bandwidth are we supposed to be using?
666  */
667 static unsigned long long bandwidth_target_out;
668
669
670
671 /**
672  * A preference value for a neighbour was update.  Update
673  * the preference sum accordingly.
674  *
675  * @param inc how much was a preference value increased?
676  */
677 static void
678 update_preference_sum (unsigned long long inc)
679 {
680   struct Neighbour *n;
681   unsigned long long os;
682
683   os = preference_sum;
684   preference_sum += inc;
685   if (preference_sum >= os)
686     return; /* done! */
687   /* overflow! compensate by cutting all values in half! */
688   preference_sum = 0;
689   n = neighbours;
690   while (n != NULL)
691     {
692       n->current_preference /= 2;
693       preference_sum += n->current_preference;
694       n = n->next;
695     }    
696 }
697
698
699 /**
700  * Recalculate the number of bytes we expect to
701  * receive or transmit in a given window.
702  *
703  * @param force force an update now (even if not much time has passed)
704  * @param window pointer to the byte counter (updated)
705  * @param ts pointer to the timestamp (updated)
706  * @param bpm number of bytes per minute that should
707  *        be added to the window.
708  */
709 static void
710 update_window (int force,
711                long long *window,
712                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
713 {
714   struct GNUNET_TIME_Relative since;
715
716   since = GNUNET_TIME_absolute_get_duration (*ts);
717   if ( (force == GNUNET_NO) &&
718        (since.value < 60 * 1000) )
719     return;                     /* not even a minute has passed */
720   *ts = GNUNET_TIME_absolute_get ();
721   *window += (bpm * since.value) / 60 / 1000;
722   if (*window > MAX_WINDOW_TIME * bpm)
723     *window = MAX_WINDOW_TIME * bpm;
724 }
725
726
727 /**
728  * Find the entry for the given neighbour.
729  *
730  * @param peer identity of the neighbour
731  * @return NULL if we are not connected, otherwise the
732  *         neighbour's entry.
733  */
734 static struct Neighbour *
735 find_neighbour (const struct GNUNET_PeerIdentity *peer)
736 {
737   struct Neighbour *ret;
738
739   ret = neighbours;
740   while ((ret != NULL) &&
741          (0 != memcmp (&ret->peer,
742                        peer, sizeof (struct GNUNET_PeerIdentity))))
743     ret = ret->next;
744   return ret;
745 }
746
747
748 /**
749  * Send a message to one of our clients.
750  *
751  * @param client target for the message
752  * @param msg message to transmit
753  * @param can_drop could this message be dropped if the
754  *        client's queue is getting too large?
755  */
756 static void
757 send_to_client (struct Client *client,
758                 const struct GNUNET_MessageHeader *msg, 
759                 int can_drop)
760 {
761 #if DEBUG_CORE_CLIENT
762   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
763               "Preparing to send message of type %u to client.\n",
764               ntohs (msg->type));
765 #endif  
766   GNUNET_SERVER_notification_context_unicast (notifier,
767                                               client->client_handle,
768                                               msg,
769                                               can_drop);
770 }
771
772
773 /**
774  * Send a message to all of our current clients that have
775  * the right options set.
776  * 
777  * @param msg message to multicast
778  * @param can_drop can this message be discarded if the queue is too long
779  * @param options mask to use 
780  */
781 static void
782 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
783                      int can_drop,
784                      int options)
785 {
786   struct Client *c;
787
788   c = clients;
789   while (c != NULL)
790     {
791       if (0 != (c->options & options))
792         send_to_client (c, msg, can_drop);
793       c = c->next;
794     }
795 }
796
797
798 /**
799  * Handle CORE_INIT request.
800  */
801 static void
802 handle_client_init (void *cls,
803                     struct GNUNET_SERVER_Client *client,
804                     const struct GNUNET_MessageHeader *message)
805 {
806   const struct InitMessage *im;
807   struct InitReplyMessage irm;
808   struct Client *c;
809   uint16_t msize;
810   const uint16_t *types;
811   struct Neighbour *n;
812   struct ConnectNotifyMessage cnm;
813
814 #if DEBUG_CORE_CLIENT
815   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
816               "Client connecting to core service with `%s' message\n",
817               "INIT");
818 #endif
819   /* check that we don't have an entry already */
820   c = clients;
821   while (c != NULL)
822     {
823       if (client == c->client_handle)
824         {
825           GNUNET_break (0);
826           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
827           return;
828         }
829       c = c->next;
830     }
831   msize = ntohs (message->size);
832   if (msize < sizeof (struct InitMessage))
833     {
834       GNUNET_break (0);
835       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
836       return;
837     }
838   GNUNET_SERVER_notification_context_add (notifier, client);
839   im = (const struct InitMessage *) message;
840   types = (const uint16_t *) &im[1];
841   msize -= sizeof (struct InitMessage);
842   c = GNUNET_malloc (sizeof (struct Client) + msize);
843   c->client_handle = client;
844   c->next = clients;
845   clients = c;
846   memcpy (&c[1], types, msize);
847   c->types = (uint16_t *) & c[1];
848   c->options = ntohl (im->options);
849   c->tcnt = msize / sizeof (uint16_t);
850   /* send init reply message */
851   irm.header.size = htons (sizeof (struct InitReplyMessage));
852   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
853   irm.reserved = htonl (0);
854   memcpy (&irm.publicKey,
855           &my_public_key,
856           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
857 #if DEBUG_CORE_CLIENT
858   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859               "Sending `%s' message to client.\n", "INIT_REPLY");
860 #endif
861   send_to_client (c, &irm.header, GNUNET_NO);
862   /* notify new client about existing neighbours */
863   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
864   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
865   n = neighbours;
866   while (n != NULL)
867     {
868       if (n->status == PEER_STATE_KEY_CONFIRMED)
869         {
870 #if DEBUG_CORE_CLIENT
871           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872                       "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
873 #endif
874           cnm.distance = htonl (n->last_distance);
875           cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
876           cnm.peer = n->peer;
877           send_to_client (c, &cnm.header, GNUNET_NO);
878         }
879       n = n->next;
880     }
881   GNUNET_SERVER_receive_done (client, GNUNET_OK);
882 }
883
884
885 /**
886  * A client disconnected, clean up.
887  *
888  * @param cls closure
889  * @param client identification of the client
890  */
891 static void
892 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
893 {
894   struct Client *pos;
895   struct Client *prev;
896
897   if (client == NULL)
898     return;
899 #if DEBUG_CORE_CLIENT
900   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
901               "Client has disconnected from core service.\n");
902 #endif
903   prev = NULL;
904   pos = clients;
905   while (pos != NULL)
906     {
907       if (client == pos->client_handle)
908         {
909           if (prev == NULL)
910             clients = pos->next;
911           else
912             prev->next = pos->next;
913           GNUNET_free (pos);
914           return;
915         }
916       prev = pos;
917       pos = pos->next;
918     }
919   /* client never sent INIT */
920 }
921
922
923 /**
924  * Handle REQUEST_INFO request.
925  */
926 static void
927 handle_client_request_info (void *cls,
928                             struct GNUNET_SERVER_Client *client,
929                             const struct GNUNET_MessageHeader *message)
930 {
931   const struct RequestInfoMessage *rcm;
932   struct Neighbour *n;
933   struct ConfigurationInfoMessage cim;
934   int reserv;
935   unsigned long long old_preference;
936   struct GNUNET_SERVER_TransmitContext *tc;
937
938 #if DEBUG_CORE_CLIENT
939   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
940               "Core service receives `%s' request.\n", "REQUEST_INFO");
941 #endif
942   rcm = (const struct RequestInfoMessage *) message;
943   n = find_neighbour (&rcm->peer);
944   memset (&cim, 0, sizeof (cim));
945   if (n != NULL) 
946     {
947       update_window (GNUNET_YES,
948                      &n->available_send_window,
949                      &n->last_asw_update,
950                      n->bpm_out);
951       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
952       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
953                                n->bpm_out_external_limit);
954       reserv = ntohl (rcm->reserve_inbound);
955       if (reserv < 0)
956         {
957           n->available_recv_window += reserv;
958         }
959       else if (reserv > 0)
960         {
961           update_window (GNUNET_NO,
962                          &n->available_recv_window,
963                          &n->last_arw_update, n->bpm_in);
964           if (n->available_recv_window < reserv)
965             reserv = n->available_recv_window;
966           n->available_recv_window -= reserv;
967         }
968       old_preference = n->current_preference;
969       n->current_preference += GNUNET_ntohll(rcm->preference_change);
970       if (old_preference > n->current_preference) 
971         {
972           /* overflow; cap at maximum value */
973           n->current_preference = (unsigned long long) -1;
974         }
975       update_preference_sum (n->current_preference - old_preference);
976       cim.reserved_amount = htonl (reserv);
977       cim.bpm_in = htonl (n->bpm_in);
978       cim.bpm_out = htonl (n->bpm_out);
979       cim.preference = n->current_preference;
980     }
981   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
982   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
983   cim.peer = rcm->peer;
984
985 #if DEBUG_CORE_CLIENT
986   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
988 #endif
989   tc = GNUNET_SERVER_transmit_context_create (client);
990   GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
991   GNUNET_SERVER_transmit_context_run (tc,
992                                       GNUNET_TIME_UNIT_FOREVER_REL);
993 }
994
995
996 /**
997  * Free the given entry for the neighbour (it has
998  * already been removed from the list at this point).
999  *
1000  * @param n neighbour to free
1001  */
1002 static void
1003 free_neighbour (struct Neighbour *n)
1004 {
1005   struct MessageEntry *m;
1006
1007   if (n->pitr != NULL)
1008     {
1009       GNUNET_PEERINFO_iterate_cancel (n->pitr);
1010       n->pitr = NULL;
1011     }
1012   if (n->skm != NULL)
1013     {
1014       GNUNET_free (n->skm);
1015       n->skm = NULL;
1016     }
1017   while (NULL != (m = n->messages))
1018     {
1019       n->messages = m->next;
1020       GNUNET_free (m);
1021     }
1022   while (NULL != (m = n->encrypted_head))
1023     {
1024       n->encrypted_head = m->next;
1025       GNUNET_free (m);
1026     }
1027   if (NULL != n->th)
1028     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
1029   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1030     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1031   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
1032     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
1033   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
1034     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
1035   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1036     GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1037   GNUNET_free_non_null (n->public_key);
1038   GNUNET_free_non_null (n->pending_ping);
1039   GNUNET_free_non_null (n->pending_pong);
1040   GNUNET_free (n);
1041 }
1042
1043
1044 /**
1045  * Consider freeing the given neighbour since we may not need
1046  * to keep it around anymore.
1047  *
1048  * @param n neighbour to consider discarding
1049  */
1050 static void
1051 consider_free_neighbour (struct Neighbour *n);
1052
1053
1054 /**
1055  * Task triggered when a neighbour entry might have gotten stale.
1056  *
1057  * @param cls the 'struct Neighbour'
1058  * @param tc scheduler context (not used)
1059  */
1060 static void
1061 consider_free_task (void *cls,
1062                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1063 {
1064   struct Neighbour *n = cls;
1065   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
1066   consider_free_neighbour (n);
1067 }
1068
1069
1070 /**
1071  * Consider freeing the given neighbour since we may not need
1072  * to keep it around anymore.
1073  *
1074  * @param n neighbour to consider discarding
1075  */
1076 static void
1077 consider_free_neighbour (struct Neighbour *n)
1078
1079   struct Neighbour *pos;
1080   struct Neighbour *prev;
1081   struct GNUNET_TIME_Relative left;
1082
1083   if ( (n->th != NULL) ||
1084        (n->pitr != NULL) ||
1085        (n->status == PEER_STATE_KEY_CONFIRMED) ||
1086        (GNUNET_YES == n->is_connected) )
1087     return; /* no chance */
1088   
1089   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
1090                                                                        MAX_PONG_DELAY));
1091   if (left.value > 0)
1092     {
1093       if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
1094         GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
1095       n->dead_clean_task = GNUNET_SCHEDULER_add_delayed (sched,
1096                                                          left,
1097                                                          &consider_free_task,
1098                                                          n);
1099       return;
1100     }
1101   /* actually free the neighbour... */
1102   prev = NULL;
1103   pos = neighbours;
1104   while (pos != n)
1105     {
1106       prev = pos;
1107       pos = pos->next;
1108     }
1109   if (prev == NULL)
1110     neighbours = n->next;
1111   else
1112     prev->next = n->next;
1113   GNUNET_assert (neighbour_count > 0);
1114   neighbour_count--;
1115   free_neighbour (n);
1116 }
1117
1118
1119 /**
1120  * Check if we have encrypted messages for the specified neighbour
1121  * pending, and if so, check with the transport about sending them
1122  * out.
1123  *
1124  * @param n neighbour to check.
1125  */
1126 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1127
1128
1129 /**
1130  * Function called when the transport service is ready to
1131  * receive an encrypted message for the respective peer
1132  *
1133  * @param cls neighbour to use message from
1134  * @param size number of bytes we can transmit
1135  * @param buf where to copy the message
1136  * @return number of bytes transmitted
1137  */
1138 static size_t
1139 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1140 {
1141   struct Neighbour *n = cls;
1142   struct MessageEntry *m;
1143   size_t ret;
1144   char *cbuf;
1145
1146   n->th = NULL;
1147   GNUNET_assert (NULL != (m = n->encrypted_head));
1148   n->encrypted_head = m->next;
1149   if (m->next == NULL)
1150     n->encrypted_tail = NULL;
1151   ret = 0;
1152   cbuf = buf;
1153   if (buf != NULL)
1154     {
1155       GNUNET_assert (size >= m->size);
1156       memcpy (cbuf, &m[1], m->size);
1157       ret = m->size;
1158       n->available_send_window -= m->size;
1159       process_encrypted_neighbour_queue (n);
1160
1161 #if DEBUG_CORE
1162       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1164                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1165                   ret, GNUNET_i2s (&n->peer));
1166 #endif
1167     }
1168   else
1169     {
1170       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1171                   "Transmission of message of type %u and size %u failed\n",
1172                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1173                   m->size);
1174     }
1175   GNUNET_free (m);
1176   consider_free_neighbour (n);
1177   return ret;
1178 }
1179
1180
1181 /**
1182  * Check if we have plaintext messages for the specified neighbour
1183  * pending, and if so, consider batching and encrypting them (and
1184  * then trigger processing of the encrypted queue if needed).
1185  *
1186  * @param n neighbour to check.
1187  */
1188 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1189
1190
1191 /**
1192  * Check if we have encrypted messages for the specified neighbour
1193  * pending, and if so, check with the transport about sending them
1194  * out.
1195  *
1196  * @param n neighbour to check.
1197  */
1198 static void
1199 process_encrypted_neighbour_queue (struct Neighbour *n)
1200 {
1201   struct MessageEntry *m;
1202  
1203   if (n->th != NULL)
1204     return;  /* request already pending */
1205   if (n->encrypted_head == NULL)
1206     {
1207       /* encrypted queue empty, try plaintext instead */
1208       process_plaintext_neighbour_queue (n);
1209       return;
1210     }
1211 #if DEBUG_CORE
1212   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1213               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1214               n->encrypted_head->size,
1215               GNUNET_i2s (&n->peer),
1216               GNUNET_TIME_absolute_get_remaining (n->
1217                                                   encrypted_head->deadline).
1218               value);
1219 #endif
1220   n->th =
1221     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1222                                             n->encrypted_head->size,
1223                                             n->encrypted_head->priority,
1224                                             GNUNET_TIME_absolute_get_remaining
1225                                             (n->encrypted_head->deadline),
1226                                             &notify_encrypted_transmit_ready,
1227                                             n);
1228   if (n->th == NULL)
1229     {
1230       /* message request too large or duplicate request */
1231       GNUNET_break (0);
1232       /* discard encrypted message */
1233       GNUNET_assert (NULL != (m = n->encrypted_head));
1234       n->encrypted_head = m->next;
1235       if (m->next == NULL)
1236         n->encrypted_tail = NULL;
1237       GNUNET_free (m);
1238       process_encrypted_neighbour_queue (n);
1239     }
1240 }
1241
1242
1243 /**
1244  * Decrypt size bytes from in and write the result to out.  Use the
1245  * key for inbound traffic of the given neighbour.  This function does
1246  * NOT do any integrity-checks on the result.
1247  *
1248  * @param n neighbour we are receiving from
1249  * @param iv initialization vector to use
1250  * @param in ciphertext
1251  * @param out plaintext
1252  * @param size size of in/out
1253  * @return GNUNET_OK on success
1254  */
1255 static int
1256 do_decrypt (struct Neighbour *n,
1257             const GNUNET_HashCode * iv,
1258             const void *in, void *out, size_t size)
1259 {
1260   if (size != (uint16_t) size)
1261     {
1262       GNUNET_break (0);
1263       return GNUNET_NO;
1264     }
1265   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1266       (n->status != PEER_STATE_KEY_CONFIRMED))
1267     {
1268       GNUNET_break_op (0);
1269       return GNUNET_SYSERR;
1270     }
1271   if (size !=
1272       GNUNET_CRYPTO_aes_decrypt (in,
1273                                  (uint16_t) size,
1274                                  &n->decrypt_key,
1275                                  (const struct
1276                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1277                                  out))
1278     {
1279       GNUNET_break (0);
1280       return GNUNET_SYSERR;
1281     }
1282 #if DEBUG_CORE
1283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284               "Decrypted %u bytes from `%4s' using key %u\n",
1285               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1286 #endif
1287   return GNUNET_OK;
1288 }
1289
1290
1291 /**
1292  * Encrypt size bytes from in and write the result to out.  Use the
1293  * key for outbound traffic of the given neighbour.
1294  *
1295  * @param n neighbour we are sending to
1296  * @param iv initialization vector to use
1297  * @param in ciphertext
1298  * @param out plaintext
1299  * @param size size of in/out
1300  * @return GNUNET_OK on success
1301  */
1302 static int
1303 do_encrypt (struct Neighbour *n,
1304             const GNUNET_HashCode * iv,
1305             const void *in, void *out, size_t size)
1306 {
1307   if (size != (uint16_t) size)
1308     {
1309       GNUNET_break (0);
1310       return GNUNET_NO;
1311     }
1312   GNUNET_assert (size ==
1313                  GNUNET_CRYPTO_aes_encrypt (in,
1314                                             (uint16_t) size,
1315                                             &n->encrypt_key,
1316                                             (const struct
1317                                              GNUNET_CRYPTO_AesInitializationVector
1318                                              *) iv, out));
1319 #if DEBUG_CORE
1320   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1321               "Encrypted %u bytes for `%4s' using key %u\n", size,
1322               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1323 #endif
1324   return GNUNET_OK;
1325 }
1326
1327
1328 /**
1329  * Select messages for transmission.  This heuristic uses a combination
1330  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1331  * and priority-based discard (in case no feasible schedule exist) and
1332  * speculative optimization (defer any kind of transmission until
1333  * we either create a batch of significant size, 25% of max, or until
1334  * we are close to a deadline).  Furthermore, when scheduling the
1335  * heuristic also packs as many messages into the batch as possible,
1336  * starting with those with the earliest deadline.  Yes, this is fun.
1337  *
1338  * @param n neighbour to select messages from
1339  * @param size number of bytes to select for transmission
1340  * @param retry_time set to the time when we should try again
1341  *        (only valid if this function returns zero)
1342  * @return number of bytes selected, or 0 if we decided to
1343  *         defer scheduling overall; in that case, retry_time is set.
1344  */
1345 static size_t
1346 select_messages (struct Neighbour *n,
1347                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1348 {
1349   struct MessageEntry *pos;
1350   struct MessageEntry *min;
1351   struct MessageEntry *last;
1352   unsigned int min_prio;
1353   struct GNUNET_TIME_Absolute t;
1354   struct GNUNET_TIME_Absolute now;
1355   uint64_t delta;
1356   uint64_t avail;
1357   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1358   size_t off;
1359   int discard_low_prio;
1360
1361   GNUNET_assert (NULL != n->messages);
1362   now = GNUNET_TIME_absolute_get ();
1363   /* last entry in linked list of messages processed */
1364   last = NULL;
1365   /* should we remove the entry with the lowest
1366      priority from consideration for scheduling at the
1367      end of the loop? */
1368   discard_low_prio = GNUNET_YES;
1369   while (GNUNET_YES == discard_low_prio)
1370     {
1371       min = NULL;
1372       min_prio = -1;
1373       discard_low_prio = GNUNET_NO;
1374       /* calculate number of bytes available for transmission at time "t" */
1375       update_window (GNUNET_NO,
1376                      &n->available_send_window,
1377                      &n->last_asw_update,
1378                      n->bpm_out);
1379       avail = n->available_send_window;
1380       t = n->last_asw_update;
1381       /* how many bytes have we (hypothetically) scheduled so far */
1382       off = 0;
1383       /* maximum time we can wait before transmitting anything
1384          and still make all of our deadlines */
1385       slack = -1;
1386
1387       pos = n->messages;
1388       /* note that we use "*2" here because we want to look
1389          a bit further into the future; much more makes no
1390          sense since new message might be scheduled in the
1391          meantime... */
1392       while ((pos != NULL) && (off < size * 2))
1393         {
1394           if (pos->do_transmit == GNUNET_YES)
1395             {
1396               /* already removed from consideration */
1397               pos = pos->next;
1398               continue;
1399             }
1400           if (discard_low_prio == GNUNET_NO)
1401             {
1402               delta = pos->deadline.value;
1403               if (delta < t.value)
1404                 delta = 0;
1405               else
1406                 delta = t.value - delta;
1407               avail += delta * n->bpm_out / 1000 / 60;
1408               if (avail < pos->size)
1409                 {
1410                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1411                 }
1412               else
1413                 {
1414                   avail -= pos->size;
1415                   /* update slack, considering both its absolute deadline
1416                      and relative deadlines caused by other messages
1417                      with their respective load */
1418                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1419                   if ( (pos->deadline.value < now.value) ||
1420                        (GNUNET_YES == pos->got_slack) )                
1421                     {
1422                       slack = 0;
1423                     }
1424                   else
1425                     {
1426                       slack =
1427                         GNUNET_MIN (slack, pos->deadline.value - now.value);
1428                       pos->got_slack = GNUNET_YES;
1429                     }
1430                 }
1431             }
1432
1433           off += pos->size;
1434           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1435           if (pos->priority <= min_prio)
1436             {
1437               /* update min for discard */
1438               min_prio = pos->priority;
1439               min = pos;
1440             }
1441           pos = pos->next;
1442         }
1443       if (discard_low_prio)
1444         {
1445           GNUNET_assert (min != NULL);
1446           /* remove lowest-priority entry from consideration */
1447           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1448         }
1449       last = pos;
1450     }
1451   /* guard against sending "tiny" messages with large headers without
1452      urgent deadlines */
1453   if ( (slack > 1000) && (size > 4 * off) )
1454     {
1455       /* less than 25% of message would be filled with deadlines still
1456          being met if we delay by one second or more; so just wait for
1457          more data; but do not wait longer than 1s (since we don't want
1458          to delay messages for a really long time either). */
1459       retry_time->value = 1000;
1460       /* reset do_transmit values for next time */
1461       while (pos != last)
1462         {
1463           pos->do_transmit = GNUNET_NO;   
1464           pos = pos->next;
1465         }
1466 #if DEBUG_CORE
1467       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1468                   "Deferring transmission for 1s due to underfull message buffer size\n");
1469 #endif
1470       return 0;
1471     }
1472   /* select marked messages (up to size) for transmission */
1473   off = 0;
1474   pos = n->messages;
1475   while (pos != last)
1476     {
1477       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1478         {
1479           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1480           off += pos->size;
1481           size -= pos->size;
1482         }
1483       else
1484         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1485       pos = pos->next;
1486     }
1487 #if DEBUG_CORE
1488   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1489               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1490               off, GNUNET_i2s (&n->peer));
1491 #endif
1492   return off;
1493 }
1494
1495
1496 /**
1497  * Batch multiple messages into a larger buffer.
1498  *
1499  * @param n neighbour to take messages from
1500  * @param buf target buffer
1501  * @param size size of buf
1502  * @param deadline set to transmission deadline for the result
1503  * @param retry_time set to the time when we should try again
1504  *        (only valid if this function returns zero)
1505  * @param priority set to the priority of the batch
1506  * @return number of bytes written to buf (can be zero)
1507  */
1508 static size_t
1509 batch_message (struct Neighbour *n,
1510                char *buf,
1511                size_t size,
1512                struct GNUNET_TIME_Absolute *deadline,
1513                struct GNUNET_TIME_Relative *retry_time,
1514                unsigned int *priority)
1515 {
1516   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1517   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
1518   struct MessageEntry *pos;
1519   struct MessageEntry *prev;
1520   struct MessageEntry *next;
1521   size_t ret;
1522   
1523   ret = 0;
1524   *priority = 0;
1525   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1526   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1527   if (0 == select_messages (n, size, retry_time))
1528     {
1529       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1530                   "No messages selected, will try again in %llu ms\n",
1531                   retry_time->value);
1532       return 0;
1533     }
1534   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
1535   ntm->distance = htonl (n->last_distance);
1536   ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
1537   ntm->peer = n->peer;
1538   
1539   pos = n->messages;
1540   prev = NULL;
1541   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1542     {
1543       next = pos->next;
1544       if (GNUNET_YES == pos->do_transmit)
1545         {
1546           GNUNET_assert (pos->size <= size);
1547           /* do notifications */
1548           /* FIXME: track if we have *any* client that wants
1549              full notifications and only do this if that is
1550              actually true */
1551           if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
1552             {
1553               memcpy (&ntm[1], &pos[1], pos->size);
1554               ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1555                                         sizeof (struct GNUNET_MessageHeader));
1556               send_to_all_clients (&ntm->header,
1557                                    GNUNET_YES,
1558                                    GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
1559             }
1560           else
1561             {
1562               /* message too large for 'full' notifications, we do at
1563                  least the 'hdr' type */
1564               memcpy (&ntm[1],
1565                       &pos[1],
1566                       sizeof (struct GNUNET_MessageHeader));
1567             }
1568           ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1569                                     pos->size);
1570           send_to_all_clients (&ntm->header,
1571                                GNUNET_YES,
1572                                GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);   
1573 #if DEBUG_HANDSHAKE
1574           fprintf (stderr,
1575                    "Encrypting message of type %u\n",
1576                    ntohs(((struct GNUNET_MessageHeader*)&pos[1])->type));
1577 #endif
1578           /* copy for encrypted transmission */
1579           memcpy (&buf[ret], &pos[1], pos->size);
1580           ret += pos->size;
1581           size -= pos->size;
1582           *priority += pos->priority;
1583 #if DEBUG_CORE
1584           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1585                       "Adding plaintext message with deadline %llu ms to batch\n",
1586                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1587 #endif
1588           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1589           GNUNET_free (pos);
1590           if (prev == NULL)
1591             n->messages = next;
1592           else
1593             prev->next = next;
1594         }
1595       else
1596         {
1597           prev = pos;
1598         }
1599       pos = next;
1600     }
1601 #if DEBUG_CORE
1602   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603               "Deadline for message batch is %llu ms\n",
1604               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1605 #endif
1606   return ret;
1607 }
1608
1609
1610 /**
1611  * Remove messages with deadlines that have long expired from
1612  * the queue.
1613  *
1614  * @param n neighbour to inspect
1615  */
1616 static void
1617 discard_expired_messages (struct Neighbour *n)
1618 {
1619   struct MessageEntry *prev;
1620   struct MessageEntry *next;
1621   struct MessageEntry *pos;
1622   struct GNUNET_TIME_Absolute now;
1623   struct GNUNET_TIME_Relative delta;
1624
1625   now = GNUNET_TIME_absolute_get ();
1626   prev = NULL;
1627   pos = n->messages;
1628   while (pos != NULL) 
1629     {
1630       next = pos->next;
1631       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1632       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1633         {
1634 #if DEBUG_CORE
1635           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1636                       "Message is %llu ms past due, discarding.\n",
1637                       delta.value);
1638 #endif
1639           if (prev == NULL)
1640             n->messages = next;
1641           else
1642             prev->next = next;
1643           GNUNET_free (pos);
1644         }
1645       else
1646         prev = pos;
1647       pos = next;
1648     }
1649 }
1650
1651
1652 /**
1653  * Signature of the main function of a task.
1654  *
1655  * @param cls closure
1656  * @param tc context information (why was this task triggered now)
1657  */
1658 static void
1659 retry_plaintext_processing (void *cls,
1660                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1661 {
1662   struct Neighbour *n = cls;
1663
1664   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1665   process_plaintext_neighbour_queue (n);
1666 }
1667
1668
1669 /**
1670  * Send our key (and encrypted PING) to the other peer.
1671  *
1672  * @param n the other peer
1673  */
1674 static void send_key (struct Neighbour *n);
1675
1676 /**
1677  * Task that will retry "send_key" if our previous attempt failed
1678  * to yield a PONG.
1679  */
1680 static void
1681 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1682 {
1683   struct Neighbour *n = cls;
1684
1685   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
1686   n->set_key_retry_frequency =
1687     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
1688   send_key (n);
1689 }
1690
1691
1692 /**
1693  * Check if we have plaintext messages for the specified neighbour
1694  * pending, and if so, consider batching and encrypting them (and
1695  * then trigger processing of the encrypted queue if needed).
1696  *
1697  * @param n neighbour to check.
1698  */
1699 static void
1700 process_plaintext_neighbour_queue (struct Neighbour *n)
1701 {
1702   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1703   size_t used;
1704   size_t esize;
1705   struct EncryptedMessage *em;  /* encrypted message */
1706   struct EncryptedMessage *ph;  /* plaintext header */
1707   struct MessageEntry *me;
1708   unsigned int priority;
1709   struct GNUNET_TIME_Absolute deadline;
1710   struct GNUNET_TIME_Relative retry_time;
1711
1712   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1713     {
1714       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1715       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1716     }
1717   switch (n->status)
1718     {
1719     case PEER_STATE_DOWN:
1720       send_key (n);
1721 #if DEBUG_CORE
1722       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1723                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1724                   GNUNET_i2s(&n->peer));
1725 #endif
1726       return;
1727     case PEER_STATE_KEY_SENT:
1728       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
1729         n->retry_set_key_task
1730           = GNUNET_SCHEDULER_add_delayed (sched,
1731                                           n->set_key_retry_frequency,
1732                                           &set_key_retry_task, n);    
1733 #if DEBUG_CORE
1734       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1735                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1736                   GNUNET_i2s(&n->peer));
1737 #endif
1738       return;
1739     case PEER_STATE_KEY_RECEIVED:
1740       if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)        
1741         n->retry_set_key_task
1742           = GNUNET_SCHEDULER_add_delayed (sched,
1743                                           n->set_key_retry_frequency,
1744                                           &set_key_retry_task, n);        
1745 #if DEBUG_CORE
1746       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1747                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1748                   GNUNET_i2s(&n->peer));
1749 #endif
1750       return;
1751     case PEER_STATE_KEY_CONFIRMED:
1752       /* ready to continue */
1753       break;
1754     }
1755   discard_expired_messages (n);
1756   if (n->messages == NULL)
1757     {
1758 #if DEBUG_CORE
1759       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1760                   "Plaintext message queue for `%4s' is empty.\n",
1761                   GNUNET_i2s(&n->peer));
1762 #endif
1763       return;                   /* no pending messages */
1764     }
1765   if (n->encrypted_head != NULL)
1766     {
1767 #if DEBUG_CORE
1768       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1769                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1770                   GNUNET_i2s(&n->peer));
1771 #endif
1772       return;                   /* wait for messages already encrypted to be
1773                                    processed first! */
1774     }
1775   ph = (struct EncryptedMessage *) pbuf;
1776   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1777   priority = 0;
1778   used = sizeof (struct EncryptedMessage);
1779   used += batch_message (n,
1780                          &pbuf[used],
1781                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1782                          &deadline, &retry_time, &priority);
1783   if (used == sizeof (struct EncryptedMessage))
1784     {
1785 #if DEBUG_CORE
1786       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1787                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1788                   GNUNET_i2s(&n->peer));
1789 #endif
1790       /* no messages selected for sending, try again later... */
1791       n->retry_plaintext_task =
1792         GNUNET_SCHEDULER_add_delayed (sched,
1793                                       retry_time,
1794                                       &retry_plaintext_processing, n);
1795       return;
1796     }
1797   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1798   ph->inbound_bpm_limit = htonl (n->bpm_in);
1799   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1800
1801   /* setup encryption message header */
1802   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1803   me->deadline = deadline;
1804   me->priority = priority;
1805   me->size = used;
1806   em = (struct EncryptedMessage *) &me[1];
1807   em->header.size = htons (used);
1808   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1809   em->reserved = htonl (0);
1810   esize = used - ENCRYPTED_HEADER_SIZE;
1811   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1812   /* encrypt */
1813 #if DEBUG_CORE
1814   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1815               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1816               esize,
1817               GNUNET_i2s(&n->peer),
1818               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1819 #endif
1820   GNUNET_assert (GNUNET_OK ==
1821                  do_encrypt (n,
1822                              &em->plaintext_hash,
1823                              &ph->sequence_number,
1824                              &em->sequence_number, esize));
1825   /* append to transmission list */
1826   if (n->encrypted_tail == NULL)
1827     n->encrypted_head = me;
1828   else
1829     n->encrypted_tail->next = me;
1830   n->encrypted_tail = me;
1831   process_encrypted_neighbour_queue (n);
1832 }
1833
1834
1835 /**
1836  * Function that recalculates the bandwidth quota for the
1837  * given neighbour and transmits it to the transport service.
1838  * 
1839  * @param cls neighbour for the quota update
1840  * @param tc context
1841  */
1842 static void
1843 neighbour_quota_update (void *cls,
1844                         const struct GNUNET_SCHEDULER_TaskContext *tc);
1845
1846
1847 /**
1848  * Schedule the task that will recalculate the bandwidth
1849  * quota for this peer (and possibly force a disconnect of
1850  * idle peers by calculating a bandwidth of zero).
1851  */
1852 static void
1853 schedule_quota_update (struct Neighbour *n)
1854 {
1855   GNUNET_assert (n->quota_update_task ==
1856                  GNUNET_SCHEDULER_NO_TASK);
1857   n->quota_update_task
1858     = GNUNET_SCHEDULER_add_delayed (sched,
1859                                     QUOTA_UPDATE_FREQUENCY,
1860                                     &neighbour_quota_update,
1861                                     n);
1862 }
1863
1864
1865 /**
1866  * Initialize a new 'struct Neighbour'.
1867  *
1868  * @param pid ID of the new neighbour
1869  * @return handle for the new neighbour
1870  */
1871 static struct Neighbour *
1872 create_neighbour (const struct GNUNET_PeerIdentity *pid)
1873 {
1874   struct Neighbour *n;
1875   struct GNUNET_TIME_Absolute now;
1876
1877   n = GNUNET_malloc (sizeof (struct Neighbour));
1878   n->next = neighbours;
1879   neighbours = n;
1880   neighbour_count++;
1881   n->peer = *pid;
1882   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
1883   now = GNUNET_TIME_absolute_get ();
1884   n->encrypt_key_created = now;
1885   n->last_activity = now;
1886   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
1887   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
1888   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
1889   n->bpm_out_internal_limit = (uint32_t) - 1;
1890   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
1891   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1892                                                 (uint32_t) - 1);
1893   schedule_quota_update (n);
1894   return n;
1895 }
1896
1897
1898
1899
1900
1901 /**
1902  * Handle CORE_SEND request.
1903  *
1904  * @param cls unused
1905  * @param client the client issuing the request
1906  * @param message the "struct SendMessage"
1907  */
1908 static void
1909 handle_client_send (void *cls,
1910                     struct GNUNET_SERVER_Client *client,
1911                     const struct GNUNET_MessageHeader *message)
1912 {
1913   const struct SendMessage *sm;
1914   const struct GNUNET_MessageHeader *mh;
1915   struct Neighbour *n;
1916   struct MessageEntry *prev;
1917   struct MessageEntry *pos;
1918   struct MessageEntry *e; 
1919   struct MessageEntry *min_prio_entry;
1920   struct MessageEntry *min_prio_prev;
1921   unsigned int min_prio;
1922   unsigned int queue_size;
1923   uint16_t msize;
1924
1925   msize = ntohs (message->size);
1926   if (msize <
1927       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1928     {
1929       GNUNET_break (0);
1930       if (client != NULL)
1931         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1932       return;
1933     }
1934   sm = (const struct SendMessage *) message;
1935   msize -= sizeof (struct SendMessage);
1936   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1937   if (msize != ntohs (mh->size))
1938     {
1939       GNUNET_break (0);
1940       if (client != NULL)
1941         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1942       return;
1943     }
1944   n = find_neighbour (&sm->peer);
1945   if (n == NULL)
1946     n = create_neighbour (&sm->peer);
1947 #if DEBUG_CORE
1948   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1949               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1950               "SEND",
1951               msize, 
1952               GNUNET_i2s (&sm->peer));
1953 #endif
1954   /* bound queue size */
1955   discard_expired_messages (n);
1956   min_prio = (unsigned int) -1;
1957   min_prio_entry = NULL;
1958   min_prio_prev = NULL;
1959   queue_size = 0;
1960   prev = NULL;
1961   pos = n->messages;
1962   while (pos != NULL) 
1963     {
1964       if (pos->priority < min_prio)
1965         {
1966           min_prio_entry = pos;
1967           min_prio_prev = prev;
1968           min_prio = pos->priority;
1969         }
1970       queue_size++;
1971       prev = pos;
1972       pos = pos->next;
1973     }
1974   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1975     {
1976       /* queue full */
1977       if (ntohl(sm->priority) <= min_prio)
1978         {
1979           /* discard new entry */
1980 #if DEBUG_CORE
1981           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1982                       "Queue full, discarding new request\n");
1983 #endif
1984           if (client != NULL)
1985             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1986           return;
1987         }
1988       /* discard "min_prio_entry" */
1989 #if DEBUG_CORE
1990       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1991                   "Queue full, discarding existing older request\n");
1992 #endif
1993       if (min_prio_prev == NULL)
1994         n->messages = min_prio_entry->next;
1995       else
1996         min_prio_prev->next = min_prio_entry->next;      
1997       GNUNET_free (min_prio_entry);     
1998     }
1999
2000 #if DEBUG_CORE
2001   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2002               "Adding transmission request for `%4s' to queue\n",
2003               GNUNET_i2s (&sm->peer));
2004 #endif  
2005   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
2006   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
2007   e->priority = ntohl (sm->priority);
2008   e->size = msize;
2009   memcpy (&e[1], mh, msize);
2010
2011   /* insert, keep list sorted by deadline */
2012   prev = NULL;
2013   pos = n->messages;
2014   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
2015     {
2016       prev = pos;
2017       pos = pos->next;
2018     }
2019   if (prev == NULL)
2020     n->messages = e;
2021   else
2022     prev->next = e;
2023   e->next = pos;
2024
2025   /* consider scheduling now */
2026   process_plaintext_neighbour_queue (n);
2027   if (client != NULL)
2028     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2029 }
2030
2031
2032 /**
2033  * Handle CORE_REQUEST_CONNECT request.
2034  *
2035  * @param cls unused
2036  * @param client the client issuing the request
2037  * @param message the "struct ConnectMessage"
2038  */
2039 static void
2040 handle_client_request_connect (void *cls,
2041                                struct GNUNET_SERVER_Client *client,
2042                                const struct GNUNET_MessageHeader *message)
2043 {
2044   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2045   struct Neighbour *n;
2046
2047   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2048   n = find_neighbour (&cm->peer);
2049   if (n == NULL)
2050     n = create_neighbour (&cm->peer);
2051   if ( (n->is_connected) ||
2052        (n->th != NULL) )
2053     return; /* already connected, or at least trying */
2054 #if DEBUG_CORE
2055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2056               "Core received `%s' request for `%4s', will try to establish connection\n",
2057               "REQUEST_CONNECT",
2058               GNUNET_i2s (&cm->peer));
2059 #endif
2060   /* ask transport to connect to the peer */
2061   /* FIXME: timeout zero OK? */
2062   n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
2063                                                   &cm->peer,
2064                                                   0, 0,
2065                                                   GNUNET_TIME_UNIT_ZERO,
2066                                                   NULL,
2067                                                   NULL);
2068 }
2069
2070
2071 /**
2072  * List of handlers for the messages understood by this
2073  * service.
2074  */
2075 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2076   {&handle_client_init, NULL,
2077    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2078   {&handle_client_request_info, NULL,
2079    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2080    sizeof (struct RequestInfoMessage)},
2081   {&handle_client_send, NULL,
2082    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2083   {&handle_client_request_connect, NULL,
2084    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2085    sizeof (struct ConnectMessage)},
2086   {NULL, NULL, 0, 0}
2087 };
2088
2089
2090 /**
2091  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2092  * the neighbour's struct and retry send_key.  Or, if we did not get a
2093  * HELLO, just do nothing.
2094  *
2095  * @param cls the 'struct Neighbour' to retry sending the key for
2096  * @param peer the peer for which this is the HELLO
2097  * @param hello HELLO message of that peer
2098  * @param trust amount of trust we currently have in that peer
2099  */
2100 static void
2101 process_hello_retry_send_key (void *cls,
2102                               const struct GNUNET_PeerIdentity *peer,
2103                               const struct GNUNET_HELLO_Message *hello,
2104                               uint32_t trust)
2105 {
2106   struct Neighbour *n = cls;
2107
2108   if (peer == NULL)
2109     {
2110 #if DEBUG_CORE
2111       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2112                   "Entered `process_hello_retry_send_key' and `peer' is NULL!\n");
2113 #endif
2114       n->pitr = NULL;
2115       if (n->public_key != NULL)
2116         {
2117           send_key (n);
2118         }
2119       else
2120         {
2121           if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
2122             n->retry_set_key_task
2123               = GNUNET_SCHEDULER_add_delayed (sched,
2124                                               n->set_key_retry_frequency,
2125                                               &set_key_retry_task, n);
2126         }
2127       return;
2128     }
2129
2130 #if DEBUG_CORE
2131   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2132               "Entered `process_hello_retry_send_key' for peer `%4s'\n",
2133               GNUNET_i2s (peer));
2134 #endif
2135   if (n->public_key != NULL)
2136     {
2137 #if DEBUG_CORE
2138       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2139               "already have public key for peer %s!! (so why are we here?)\n",
2140               GNUNET_i2s (peer));
2141 #endif
2142       return;
2143     }
2144
2145 #if DEBUG_CORE
2146   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2147               "Received new `%s' message for `%4s', initiating key exchange.\n",
2148               "HELLO",
2149               GNUNET_i2s (peer));
2150 #endif
2151   n->public_key =
2152     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2153   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2154     {
2155       GNUNET_free (n->public_key);
2156       n->public_key = NULL;
2157 #if DEBUG_CORE
2158   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2159               "GNUNET_HELLO_get_key returned awfully\n");
2160 #endif
2161       return;
2162     }
2163 }
2164
2165
2166 /**
2167  * Send our key (and encrypted PING) to the other peer.
2168  *
2169  * @param n the other peer
2170  */
2171 static void
2172 send_key (struct Neighbour *n)
2173 {
2174   struct SetKeyMessage *sm;
2175   struct MessageEntry *me;
2176   struct PingMessage pp;
2177   struct PingMessage *pm;
2178
2179   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2180        (n->pitr != NULL) )
2181     {
2182 #if DEBUG_CORE
2183       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2184                   "Key exchange in progress with `%4s'.\n",
2185                   GNUNET_i2s (&n->peer));
2186 #endif
2187       return; /* already in progress */
2188     }
2189
2190 #if DEBUG_CORE
2191   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2192               "Asked to perform key exchange with `%4s'.\n",
2193               GNUNET_i2s (&n->peer));
2194 #endif
2195   if (n->public_key == NULL)
2196     {
2197       /* lookup n's public key, then try again */
2198 #if DEBUG_CORE
2199       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2200                   "Lacking public key for `%4s', trying to obtain one (send_key).\n",
2201                   GNUNET_i2s (&n->peer));
2202 #endif
2203       GNUNET_assert (n->pitr == NULL);
2204       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2205                                          sched,
2206                                          &n->peer,
2207                                          0,
2208                                          GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20),
2209                                          &process_hello_retry_send_key, n);
2210       return;
2211     }
2212   /* first, set key message */
2213   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2214                       sizeof (struct SetKeyMessage));
2215   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2216   me->priority = SET_KEY_PRIORITY;
2217   me->size = sizeof (struct SetKeyMessage);
2218   if (n->encrypted_head == NULL)
2219     n->encrypted_head = me;
2220   else
2221     n->encrypted_tail->next = me;
2222   n->encrypted_tail = me;
2223   sm = (struct SetKeyMessage *) &me[1];
2224   sm->header.size = htons (sizeof (struct SetKeyMessage));
2225   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2226   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2227                                         PEER_STATE_KEY_SENT : n->status));
2228   sm->purpose.size =
2229     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2230            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2231            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2232            sizeof (struct GNUNET_PeerIdentity));
2233   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2234   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2235   sm->target = n->peer;
2236   GNUNET_assert (GNUNET_OK ==
2237                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2238                                             sizeof (struct
2239                                                     GNUNET_CRYPTO_AesSessionKey),
2240                                             n->public_key,
2241                                             &sm->encrypted_key));
2242   GNUNET_assert (GNUNET_OK ==
2243                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2244                                          &sm->signature));
2245
2246   /* second, encrypted PING message */
2247   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2248                       sizeof (struct PingMessage));
2249   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2250   me->priority = PING_PRIORITY;
2251   me->size = sizeof (struct PingMessage);
2252   n->encrypted_tail->next = me;
2253   n->encrypted_tail = me;
2254   pm = (struct PingMessage *) &me[1];
2255   pm->header.size = htons (sizeof (struct PingMessage));
2256   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2257   pp.challenge = htonl (n->ping_challenge);
2258   pp.target = n->peer;
2259 #if DEBUG_CORE
2260   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2261               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2262               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2263   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2264               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2265               "PING",
2266               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2267 #endif
2268   do_encrypt (n,
2269               &n->peer.hashPubKey,
2270               &pp.challenge,
2271               &pm->challenge,
2272               sizeof (struct PingMessage) -
2273               sizeof (struct GNUNET_MessageHeader));
2274   /* update status */
2275   switch (n->status)
2276     {
2277     case PEER_STATE_DOWN:
2278       n->status = PEER_STATE_KEY_SENT;
2279       break;
2280     case PEER_STATE_KEY_SENT:
2281       break;
2282     case PEER_STATE_KEY_RECEIVED:
2283       break;
2284     case PEER_STATE_KEY_CONFIRMED:
2285       break;
2286     default:
2287       GNUNET_break (0);
2288       break;
2289     }
2290 #if DEBUG_CORE
2291   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2292               "Have %llu ms left for `%s' transmission.\n",
2293               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2294               "SET_KEY");
2295 #endif
2296   /* trigger queue processing */
2297   process_encrypted_neighbour_queue (n);
2298   if ( (n->status != PEER_STATE_KEY_CONFIRMED) &&
2299        (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task) )
2300     n->retry_set_key_task
2301       = GNUNET_SCHEDULER_add_delayed (sched,
2302                                       n->set_key_retry_frequency,
2303                                       &set_key_retry_task, n);    
2304 }
2305
2306
2307 /**
2308  * We received a SET_KEY message.  Validate and update
2309  * our key material and status.
2310  *
2311  * @param n the neighbour from which we received message m
2312  * @param m the set key message we received
2313  */
2314 static void
2315 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2316
2317
2318 /**
2319  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2320  * the neighbour's struct and retry handling the set_key message.  Or,
2321  * if we did not get a HELLO, just free the set key message.
2322  *
2323  * @param cls pointer to the set key message
2324  * @param peer the peer for which this is the HELLO
2325  * @param hello HELLO message of that peer
2326  * @param trust amount of trust we currently have in that peer
2327  */
2328 static void
2329 process_hello_retry_handle_set_key (void *cls,
2330                                     const struct GNUNET_PeerIdentity *peer,
2331                                     const struct GNUNET_HELLO_Message *hello,
2332                                     uint32_t trust)
2333 {
2334   struct Neighbour *n = cls;
2335   struct SetKeyMessage *sm = n->skm;
2336
2337   if (peer == NULL)
2338     {
2339       GNUNET_free (sm);
2340       n->skm = NULL;
2341       n->pitr = NULL;
2342       return;
2343     }
2344   if (n->public_key != NULL)
2345     return;                     /* multiple HELLOs match!? */
2346   n->public_key =
2347     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2348   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2349     {
2350       GNUNET_break_op (0);
2351       GNUNET_free (n->public_key);
2352       n->public_key = NULL;
2353       return;
2354     }
2355 #if DEBUG_CORE
2356   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2357               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2358               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2359 #endif
2360   handle_set_key (n, sm);
2361 }
2362
2363
2364 /**
2365  * We received a PING message.  Validate and transmit
2366  * PONG.
2367  *
2368  * @param n sender of the PING
2369  * @param m the encrypted PING message itself
2370  */
2371 static void
2372 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2373 {
2374   struct PingMessage t;
2375   struct PingMessage *tp;
2376   struct MessageEntry *me;
2377
2378 #if DEBUG_CORE
2379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2380               "Core service receives `%s' request from `%4s'.\n",
2381               "PING", GNUNET_i2s (&n->peer));
2382 #endif
2383   if (GNUNET_OK !=
2384       do_decrypt (n,
2385                   &my_identity.hashPubKey,
2386                   &m->challenge,
2387                   &t.challenge,
2388                   sizeof (struct PingMessage) -
2389                   sizeof (struct GNUNET_MessageHeader)))
2390     return;
2391 #if DEBUG_CORE
2392   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2393               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2394               "PING",
2395               GNUNET_i2s (&t.target),
2396               ntohl (t.challenge), n->decrypt_key.crc32);
2397   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2398               "Target of `%s' request is `%4s'.\n",
2399               "PING", GNUNET_i2s (&t.target));
2400 #endif
2401   if (0 != memcmp (&t.target,
2402                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2403     {
2404       GNUNET_break_op (0);
2405       return;
2406     }
2407   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2408                       sizeof (struct PingMessage));
2409   if (n->encrypted_tail != NULL)
2410     n->encrypted_tail->next = me;
2411   else
2412     {
2413       n->encrypted_tail = me;
2414       n->encrypted_head = me;
2415     }
2416   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2417   me->priority = PONG_PRIORITY;
2418   me->size = sizeof (struct PingMessage);
2419   tp = (struct PingMessage *) &me[1];
2420   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2421   tp->header.size = htons (sizeof (struct PingMessage));
2422   do_encrypt (n,
2423               &my_identity.hashPubKey,
2424               &t.challenge,
2425               &tp->challenge,
2426               sizeof (struct PingMessage) -
2427               sizeof (struct GNUNET_MessageHeader));
2428 #if DEBUG_CORE
2429   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2430               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2431               ntohl (t.challenge), n->encrypt_key.crc32);
2432 #endif
2433   /* trigger queue processing */
2434   process_encrypted_neighbour_queue (n);
2435 }
2436
2437
2438 /**
2439  * We received a PONG message.  Validate and update our status.
2440  *
2441  * @param n sender of the PONG
2442  * @param m the encrypted PONG message itself
2443  */
2444 static void
2445 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2446 {
2447   struct PingMessage t;
2448   struct ConnectNotifyMessage cnm;
2449
2450 #if DEBUG_CORE
2451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2452               "Core service receives `%s' request from `%4s'.\n",
2453               "PONG", GNUNET_i2s (&n->peer));
2454 #endif
2455   if (GNUNET_OK !=
2456       do_decrypt (n,
2457                   &n->peer.hashPubKey,
2458                   &m->challenge,
2459                   &t.challenge,
2460                   sizeof (struct PingMessage) -
2461                   sizeof (struct GNUNET_MessageHeader)))
2462     return;
2463 #if DEBUG_CORE
2464   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2465               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2466               "PONG",
2467               GNUNET_i2s (&t.target),
2468               ntohl (t.challenge), n->decrypt_key.crc32);
2469 #endif
2470   if ((0 != memcmp (&t.target,
2471                     &n->peer,
2472                     sizeof (struct GNUNET_PeerIdentity))) ||
2473       (n->ping_challenge != ntohl (t.challenge)))
2474     {
2475       /* PONG malformed */
2476 #if DEBUG_CORE
2477       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2478                   "Received malformed `%s' wanted sender `%4s' with challenge %u\n",
2479                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2480       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2481                   "Received malformed `%s' received from `%4s' with challenge %u\n",
2482                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2483 #endif
2484       GNUNET_break_op (0);
2485       return;
2486     }
2487   switch (n->status)
2488     {
2489     case PEER_STATE_DOWN:
2490       GNUNET_break (0);         /* should be impossible */
2491       return;
2492     case PEER_STATE_KEY_SENT:
2493       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2494       return;
2495     case PEER_STATE_KEY_RECEIVED:
2496       n->status = PEER_STATE_KEY_CONFIRMED;
2497 #if DEBUG_CORE
2498       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2499                   "Confirmed key via `%s' message for peer `%4s'\n",
2500                   "PONG", GNUNET_i2s (&n->peer));
2501 #endif
2502       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2503         {
2504           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2505           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2506         }      
2507       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2508       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2509       cnm.distance = htonl (n->last_distance);
2510       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2511       cnm.peer = n->peer;
2512       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2513       process_encrypted_neighbour_queue (n);
2514       break;
2515     case PEER_STATE_KEY_CONFIRMED:
2516       /* duplicate PONG? */
2517       break;
2518     default:
2519       GNUNET_break (0);
2520       break;
2521     }
2522 }
2523
2524
2525 /**
2526  * We received a SET_KEY message.  Validate and update
2527  * our key material and status.
2528  *
2529  * @param n the neighbour from which we received message m
2530  * @param m the set key message we received
2531  */
2532 static void
2533 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2534 {
2535   struct SetKeyMessage *m_cpy;
2536   struct GNUNET_TIME_Absolute t;
2537   struct GNUNET_CRYPTO_AesSessionKey k;
2538   struct PingMessage *ping;
2539   struct PingMessage *pong;
2540   enum PeerStateMachine sender_status;
2541
2542 #if DEBUG_CORE
2543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2544               "Core service receives `%s' request from `%4s'.\n",
2545               "SET_KEY", GNUNET_i2s (&n->peer));
2546 #endif
2547   if (n->public_key == NULL)
2548     {
2549       if (n->pitr != NULL)
2550         {
2551 #if DEBUG_CORE
2552           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2553                       "Ignoring `%s' message due to lack of public key for peer (still trying to obtain one).\n",
2554                       "SET_KEY");
2555 #endif
2556           return;
2557         }
2558 #if DEBUG_CORE
2559       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2560                   "Lacking public key for peer, trying to obtain one (handle_set_key).\n");
2561 #endif
2562       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2563       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2564       /* lookup n's public key, then try again */
2565       GNUNET_assert (n->skm == NULL);
2566       n->skm = m_cpy;
2567       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2568                                          sched,
2569                                          &n->peer,
2570                                          0,
2571                                          GNUNET_TIME_UNIT_MINUTES,
2572                                          &process_hello_retry_handle_set_key, n);
2573       return;
2574     }
2575   if (0 != memcmp (&m->target,
2576                    &my_identity,
2577                    sizeof (struct GNUNET_PeerIdentity)))
2578     {
2579       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2580                   _("Received `%s' message that was not for me.  Ignoring.\n"),
2581                   "SET_KEY");
2582       return;
2583     }
2584   if ((ntohl (m->purpose.size) !=
2585        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2586        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2587        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2588        sizeof (struct GNUNET_PeerIdentity)) ||
2589       (GNUNET_OK !=
2590        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2591                                  &m->purpose, &m->signature, n->public_key)))
2592     {
2593       /* invalid signature */
2594       GNUNET_break_op (0);
2595       return;
2596     }
2597   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2598   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2599        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2600       (t.value < n->decrypt_key_created.value))
2601     {
2602       /* this could rarely happen due to massive re-ordering of
2603          messages on the network level, but is most likely either
2604          a bug or some adversary messing with us.  Report. */
2605       GNUNET_break_op (0);
2606       return;
2607     }
2608 #if DEBUG_CORE
2609   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2610 #endif  
2611   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2612                                   &m->encrypted_key,
2613                                   &k,
2614                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2615        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2616       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2617     {
2618       /* failed to decrypt !? */
2619       GNUNET_break_op (0);
2620       return;
2621     }
2622
2623   n->decrypt_key = k;
2624   if (n->decrypt_key_created.value != t.value)
2625     {
2626       /* fresh key, reset sequence numbers */
2627       n->last_sequence_number_received = 0;
2628       n->last_packets_bitmap = 0;
2629       n->decrypt_key_created = t;
2630     }
2631   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2632   switch (n->status)
2633     {
2634     case PEER_STATE_DOWN:
2635       n->status = PEER_STATE_KEY_RECEIVED;
2636 #if DEBUG_CORE
2637       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2638                   "Responding to `%s' with my own key.\n", "SET_KEY");
2639 #endif
2640       send_key (n);
2641       break;
2642     case PEER_STATE_KEY_SENT:
2643     case PEER_STATE_KEY_RECEIVED:
2644       n->status = PEER_STATE_KEY_RECEIVED;
2645       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2646           (sender_status != PEER_STATE_KEY_CONFIRMED))
2647         {
2648 #if DEBUG_CORE
2649           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2650                       "Responding to `%s' with my own key (other peer has status %u).\n",
2651                       "SET_KEY", sender_status);
2652 #endif
2653           send_key (n);
2654         }
2655       break;
2656     case PEER_STATE_KEY_CONFIRMED:
2657       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2658           (sender_status != PEER_STATE_KEY_CONFIRMED))
2659         {         
2660 #if DEBUG_CORE
2661           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2662                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2663                       "SET_KEY", sender_status);
2664 #endif
2665           send_key (n);
2666         }
2667       break;
2668     default:
2669       GNUNET_break (0);
2670       break;
2671     }
2672   if (n->pending_ping != NULL)
2673     {
2674       ping = n->pending_ping;
2675       n->pending_ping = NULL;
2676       handle_ping (n, ping);
2677       GNUNET_free (ping);
2678     }
2679   if (n->pending_pong != NULL)
2680     {
2681       pong = n->pending_pong;
2682       n->pending_pong = NULL;
2683       handle_pong (n, pong);
2684       GNUNET_free (pong);
2685     }
2686 }
2687
2688
2689 /**
2690  * Send a P2P message to a client.
2691  *
2692  * @param sender who sent us the message?
2693  * @param client who should we give the message to?
2694  * @param m contains the message to transmit
2695  * @param msize number of bytes in buf to transmit
2696  */
2697 static void
2698 send_p2p_message_to_client (struct Neighbour *sender,
2699                             struct Client *client,
2700                             const void *m, size_t msize)
2701 {
2702   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2703   struct NotifyTrafficMessage *ntm;
2704
2705 #if DEBUG_CORE
2706   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2707               "Core service passes message from `%4s' of type %u to client.\n",
2708               GNUNET_i2s(&sender->peer),
2709               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2710 #endif
2711   ntm = (struct NotifyTrafficMessage *) buf;
2712   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2713   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2714   ntm->distance = htonl (sender->last_distance);
2715   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2716   ntm->peer = sender->peer;
2717   memcpy (&ntm[1], m, msize);
2718   send_to_client (client, &ntm->header, GNUNET_YES);
2719 }
2720
2721
2722 /**
2723  * Deliver P2P message to interested clients.
2724  *
2725  * @param sender who sent us the message?
2726  * @param m the message
2727  * @param msize size of the message (including header)
2728  */
2729 static void
2730 deliver_message (struct Neighbour *sender,
2731                  const struct GNUNET_MessageHeader *m, size_t msize)
2732 {
2733   struct Client *cpos;
2734   uint16_t type;
2735   unsigned int tpos;
2736   int deliver_full;
2737
2738   type = ntohs (m->type);
2739 #if DEBUG_HANDSHAKE
2740   fprintf (stderr,
2741            "Received encapsulated message of type %u from `%4s'\n",
2742            type,
2743            GNUNET_i2s (&sender->peer));
2744 #endif
2745   cpos = clients;
2746   while (cpos != NULL)
2747     {
2748       deliver_full = GNUNET_NO;
2749       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2750         deliver_full = GNUNET_YES;
2751       else
2752         {
2753           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2754             {
2755               if (type != cpos->types[tpos])
2756                 continue;
2757               deliver_full = GNUNET_YES;
2758               break;
2759             }
2760         }
2761       if (GNUNET_YES == deliver_full)
2762         send_p2p_message_to_client (sender, cpos, m, msize);
2763       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2764         send_p2p_message_to_client (sender, cpos, m,
2765                                     sizeof (struct GNUNET_MessageHeader));
2766       cpos = cpos->next;
2767     }
2768 }
2769
2770
2771 /**
2772  * Align P2P message and then deliver to interested clients.
2773  *
2774  * @param sender who sent us the message?
2775  * @param buffer unaligned (!) buffer containing message
2776  * @param msize size of the message (including header)
2777  */
2778 static void
2779 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2780 {
2781   char abuf[msize];
2782
2783   /* TODO: call to statistics? */
2784   memcpy (abuf, buffer, msize);
2785   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2786 }
2787
2788
2789 /**
2790  * Deliver P2P messages to interested clients.
2791  *
2792  * @param sender who sent us the message?
2793  * @param buffer buffer containing messages, can be modified
2794  * @param buffer_size size of the buffer (overall)
2795  * @param offset offset where messages in the buffer start
2796  */
2797 static void
2798 deliver_messages (struct Neighbour *sender,
2799                   const char *buffer, size_t buffer_size, size_t offset)
2800 {
2801   struct GNUNET_MessageHeader *mhp;
2802   struct GNUNET_MessageHeader mh;
2803   uint16_t msize;
2804   int need_align;
2805
2806   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2807     {
2808       if (0 != offset % sizeof (uint16_t))
2809         {
2810           /* outch, need to copy to access header */
2811           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2812           mhp = &mh;
2813         }
2814       else
2815         {
2816           /* can access header directly */
2817           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2818         }
2819       msize = ntohs (mhp->size);
2820       if (msize + offset > buffer_size)
2821         {
2822           /* malformed message, header says it is larger than what
2823              would fit into the overall buffer */
2824           GNUNET_break_op (0);
2825           break;
2826         }
2827 #if HAVE_UNALIGNED_64_ACCESS
2828       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2829 #else
2830       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2831 #endif
2832       if (GNUNET_YES == need_align)
2833         align_and_deliver (sender, &buffer[offset], msize);
2834       else
2835         deliver_message (sender,
2836                          (const struct GNUNET_MessageHeader *)
2837                          &buffer[offset], msize);
2838       offset += msize;
2839     }
2840 }
2841
2842
2843 /**
2844  * We received an encrypted message.  Decrypt, validate and
2845  * pass on to the appropriate clients.
2846  */
2847 static void
2848 handle_encrypted_message (struct Neighbour *n,
2849                           const struct EncryptedMessage *m)
2850 {
2851   size_t size = ntohs (m->header.size);
2852   char buf[size];
2853   struct EncryptedMessage *pt;  /* plaintext */
2854   GNUNET_HashCode ph;
2855   size_t off;
2856   uint32_t snum;
2857   struct GNUNET_TIME_Absolute t;
2858
2859 #if DEBUG_CORE
2860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2861               "Core service receives `%s' request from `%4s'.\n",
2862               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2863 #endif  
2864   /* decrypt */
2865   if (GNUNET_OK !=
2866       do_decrypt (n,
2867                   &m->plaintext_hash,
2868                   &m->sequence_number,
2869                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2870     return;
2871   pt = (struct EncryptedMessage *) buf;
2872
2873   /* validate hash */
2874   GNUNET_CRYPTO_hash (&pt->sequence_number,
2875                       size - ENCRYPTED_HEADER_SIZE, &ph);
2876   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2877     {
2878       /* checksum failed */
2879       GNUNET_break_op (0);
2880       return;
2881     }
2882
2883   /* validate sequence number */
2884   snum = ntohl (pt->sequence_number);
2885   if (n->last_sequence_number_received == snum)
2886     {
2887       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2888                   "Received duplicate message, ignoring.\n");
2889       /* duplicate, ignore */
2890       return;
2891     }
2892   if ((n->last_sequence_number_received > snum) &&
2893       (n->last_sequence_number_received - snum > 32))
2894     {
2895       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2896                   "Received ancient out of sequence message, ignoring.\n");
2897       /* ancient out of sequence, ignore */
2898       return;
2899     }
2900   if (n->last_sequence_number_received > snum)
2901     {
2902       unsigned int rotbit =
2903         1 << (n->last_sequence_number_received - snum - 1);
2904       if ((n->last_packets_bitmap & rotbit) != 0)
2905         {
2906           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2907                       "Received duplicate message, ignoring.\n");
2908           /* duplicate, ignore */
2909           return;
2910         }
2911       n->last_packets_bitmap |= rotbit;
2912     }
2913   if (n->last_sequence_number_received < snum)
2914     {
2915       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2916       n->last_sequence_number_received = snum;
2917     }
2918
2919   /* check timestamp */
2920   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2921   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2922     {
2923       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2924                   _
2925                   ("Message received far too old (%llu ms). Content ignored.\n"),
2926                   GNUNET_TIME_absolute_get_duration (t).value);
2927       return;
2928     }
2929
2930   /* process decrypted message(s) */
2931   update_window (GNUNET_YES,
2932                  &n->available_send_window,
2933                  &n->last_asw_update,
2934                  n->bpm_out);
2935   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2936   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2937                            n->bpm_out_internal_limit);
2938   n->last_activity = GNUNET_TIME_absolute_get ();
2939   off = sizeof (struct EncryptedMessage);
2940   deliver_messages (n, buf, size, off);
2941 }
2942
2943
2944 /**
2945  * Function called by the transport for each received message.
2946  *
2947  * @param cls closure
2948  * @param peer (claimed) identity of the other peer
2949  * @param message the message
2950  * @param latency estimated latency for communicating with the
2951  *             given peer (round-trip)
2952  * @param distance in overlay hops, as given by transport plugin
2953  */
2954 static void
2955 handle_transport_receive (void *cls,
2956                           const struct GNUNET_PeerIdentity *peer,
2957                           const struct GNUNET_MessageHeader *message,
2958                           struct GNUNET_TIME_Relative latency,
2959                           unsigned int distance)
2960 {
2961   struct Neighbour *n;
2962   struct GNUNET_TIME_Absolute now;
2963   int up;
2964   uint16_t type;
2965   uint16_t size;
2966
2967 #if DEBUG_CORE
2968   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2969               "Received message of type %u from `%4s', demultiplexing.\n",
2970               ntohs (message->type), GNUNET_i2s (peer));
2971 #endif
2972   n = find_neighbour (peer);
2973   if (n == NULL)
2974     n = create_neighbour (peer);
2975   if (n == NULL)
2976     return;   
2977   n->last_latency = latency;
2978   n->last_distance = distance;
2979   up = (n->status == PEER_STATE_KEY_CONFIRMED);
2980   type = ntohs (message->type);
2981   size = ntohs (message->size);
2982 #if DEBUG_HANDSHAKE
2983   fprintf (stderr,
2984            "Received message of type %u from `%4s'\n",
2985            type,
2986            GNUNET_i2s (peer));
2987 #endif
2988   switch (type)
2989     {
2990     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2991       if (size != sizeof (struct SetKeyMessage))
2992         {
2993           GNUNET_break_op (0);
2994           return;
2995         }
2996       handle_set_key (n, (const struct SetKeyMessage *) message);
2997       break;
2998     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2999       if (size < sizeof (struct EncryptedMessage) +
3000           sizeof (struct GNUNET_MessageHeader))
3001         {
3002           GNUNET_break_op (0);
3003           return;
3004         }
3005       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3006           (n->status != PEER_STATE_KEY_CONFIRMED))
3007         {
3008           GNUNET_break_op (0);
3009           return;
3010         }
3011       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
3012       break;
3013     case GNUNET_MESSAGE_TYPE_CORE_PING:
3014       if (size != sizeof (struct PingMessage))
3015         {
3016           GNUNET_break_op (0);
3017           return;
3018         }
3019       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
3020           (n->status != PEER_STATE_KEY_CONFIRMED))
3021         {
3022 #if DEBUG_CORE
3023           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3024                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3025                       "PING", GNUNET_i2s (&n->peer));
3026 #endif
3027           GNUNET_free_non_null (n->pending_ping);
3028           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
3029           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
3030           return;
3031         }
3032       handle_ping (n, (const struct PingMessage *) message);
3033       break;
3034     case GNUNET_MESSAGE_TYPE_CORE_PONG:
3035       if (size != sizeof (struct PingMessage))
3036         {
3037           GNUNET_break_op (0);
3038           return;
3039         }
3040       if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
3041            (n->status != PEER_STATE_KEY_CONFIRMED) )
3042         {
3043 #if DEBUG_CORE
3044           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3045                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
3046                       "PONG", GNUNET_i2s (&n->peer));
3047 #endif
3048           GNUNET_free_non_null (n->pending_pong);
3049           n->pending_pong = GNUNET_malloc (sizeof (struct PingMessage));
3050           memcpy (n->pending_pong, message, sizeof (struct PingMessage));
3051           return;
3052         }
3053       handle_pong (n, (const struct PingMessage *) message);
3054       break;
3055     default:
3056       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3057                   _("Unsupported message of type %u received.\n"), type);
3058       return;
3059     }
3060   if (n->status == PEER_STATE_KEY_CONFIRMED)
3061     {
3062       now = GNUNET_TIME_absolute_get ();
3063       n->last_activity = now;
3064       if (!up)
3065         n->time_established = now;
3066     }
3067 }
3068
3069
3070 /**
3071  * Function that recalculates the bandwidth quota for the
3072  * given neighbour and transmits it to the transport service.
3073  * 
3074  * @param cls neighbour for the quota update
3075  * @param tc context
3076  */
3077 static void
3078 neighbour_quota_update (void *cls,
3079                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3080 {
3081   struct Neighbour *n = cls;
3082   uint32_t q_in;
3083   double pref_rel;
3084   double share;
3085   unsigned long long distributable;
3086   
3087   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3088   /* calculate relative preference among all neighbours;
3089      divides by a bit more to avoid division by zero AND to
3090      account for possibility of new neighbours joining any time 
3091      AND to convert to double... */
3092   pref_rel = n->current_preference / (1.0 + preference_sum);
3093   distributable = 0;
3094   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
3095     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
3096   share = distributable * pref_rel;
3097   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
3098   /* check if we want to disconnect for good due to inactivity */
3099   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3100        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3101     {
3102 #if DEBUG_CORE
3103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3104               "Forcing disconnect of `%4s' due to inactivity (?).\n",
3105               GNUNET_i2s (&n->peer));
3106 #endif
3107       q_in = 0; /* force disconnect */
3108     }
3109   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
3110        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
3111     {
3112       n->bpm_in = q_in;
3113       GNUNET_TRANSPORT_set_quota (transport,
3114                                   &n->peer,
3115                                   n->bpm_in, 
3116                                   n->bpm_out,
3117                                   GNUNET_TIME_UNIT_FOREVER_REL,
3118                                   NULL, NULL);
3119     }
3120   schedule_quota_update (n);
3121 }
3122
3123
3124 /**
3125  * Function called by transport to notify us that
3126  * a peer connected to us (on the network level).
3127  *
3128  * @param cls closure
3129  * @param peer the peer that connected
3130  * @param latency current latency of the connection
3131  * @param distance in overlay hops, as given by transport plugin
3132  */
3133 static void
3134 handle_transport_notify_connect (void *cls,
3135                                  const struct GNUNET_PeerIdentity *peer,
3136                                  struct GNUNET_TIME_Relative latency,
3137                                  unsigned int distance)
3138 {
3139   struct Neighbour *n;
3140   struct GNUNET_TIME_Absolute now;
3141   struct ConnectNotifyMessage cnm;
3142
3143   n = find_neighbour (peer);
3144   if (n != NULL)
3145     {
3146       if (n->is_connected)
3147         {
3148           /* duplicate connect notification!? */
3149           GNUNET_break (0);
3150           return;
3151         }
3152     }
3153   else
3154     {
3155       n = create_neighbour (peer);
3156     }
3157   now = GNUNET_TIME_absolute_get ();
3158   n->is_connected = GNUNET_YES;      
3159   n->last_latency = latency;
3160   n->last_distance = distance;
3161   n->last_asw_update = now;
3162   n->last_arw_update = now;
3163 #if DEBUG_CORE
3164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3165               "Received connection from `%4s'.\n",
3166               GNUNET_i2s (&n->peer));
3167 #endif
3168   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3169   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3170   cnm.distance = htonl (n->last_distance);
3171   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
3172   cnm.peer = *peer;
3173   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3174   send_key (n);
3175 }
3176
3177
3178 /**
3179  * Function called by transport telling us that a peer
3180  * disconnected.
3181  *
3182  * @param cls closure
3183  * @param peer the peer that disconnected
3184  */
3185 static void
3186 handle_transport_notify_disconnect (void *cls,
3187                                     const struct GNUNET_PeerIdentity *peer)
3188 {
3189   struct DisconnectNotifyMessage cnm;
3190   struct Neighbour *n;
3191
3192 #if DEBUG_CORE
3193   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3194               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3195 #endif
3196   n = find_neighbour (peer);
3197   GNUNET_break (n->is_connected);
3198   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3199   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3200   cnm.peer = *peer;
3201   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3202   n->is_connected = GNUNET_NO;
3203 }
3204
3205
3206 /**
3207  * Last task run during shutdown.  Disconnects us from
3208  * the transport.
3209  */
3210 static void
3211 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3212 {
3213   struct Neighbour *n;
3214   struct Client *c;
3215
3216 #if DEBUG_CORE
3217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3218               "Core service shutting down.\n");
3219 #endif
3220   GNUNET_assert (transport != NULL);
3221   GNUNET_TRANSPORT_disconnect (transport);
3222   transport = NULL;
3223   while (NULL != (n = neighbours))
3224     {
3225       neighbours = n->next;
3226       GNUNET_assert (neighbour_count > 0);
3227       neighbour_count--;
3228       free_neighbour (n);
3229     }
3230   GNUNET_SERVER_notification_context_destroy (notifier);
3231   notifier = NULL;
3232   while (NULL != (c = clients))
3233     handle_client_disconnect (NULL, c->client_handle);
3234   if (my_private_key != NULL)
3235     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3236 }
3237
3238
3239 /**
3240  * Initiate core service.
3241  *
3242  * @param cls closure
3243  * @param s scheduler to use
3244  * @param serv the initialized server
3245  * @param c configuration to use
3246  */
3247 static void
3248 run (void *cls,
3249      struct GNUNET_SCHEDULER_Handle *s,
3250      struct GNUNET_SERVER_Handle *serv,
3251      const struct GNUNET_CONFIGURATION_Handle *c)
3252 {
3253 #if 0
3254   unsigned long long qin;
3255   unsigned long long qout;
3256   unsigned long long tneigh;
3257 #endif
3258   char *keyfile;
3259
3260   sched = s;
3261   cfg = c;  
3262   /* parse configuration */
3263   if (
3264        (GNUNET_OK !=
3265         GNUNET_CONFIGURATION_get_value_number (c,
3266                                                "CORE",
3267                                                "TOTAL_QUOTA_IN",
3268                                                &bandwidth_target_in)) ||
3269        (GNUNET_OK !=
3270         GNUNET_CONFIGURATION_get_value_number (c,
3271                                                "CORE",
3272                                                "TOTAL_QUOTA_OUT",
3273                                                &bandwidth_target_out)) ||
3274        (GNUNET_OK !=
3275         GNUNET_CONFIGURATION_get_value_filename (c,
3276                                                  "GNUNETD",
3277                                                  "HOSTKEY", &keyfile)))
3278     {
3279       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3280                   _
3281                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3282       GNUNET_SCHEDULER_shutdown (s);
3283       return;
3284     }
3285   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3286   GNUNET_free (keyfile);
3287   if (my_private_key == NULL)
3288     {
3289       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3290                   _("Core service could not access hostkey.  Exiting.\n"));
3291       GNUNET_SCHEDULER_shutdown (s);
3292       return;
3293     }
3294   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3295   GNUNET_CRYPTO_hash (&my_public_key,
3296                       sizeof (my_public_key), &my_identity.hashPubKey);
3297   /* setup notification */
3298   server = serv;
3299   notifier = GNUNET_SERVER_notification_context_create (server, 
3300                                                         MAX_NOTIFY_QUEUE);
3301   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3302   /* setup transport connection */
3303   transport = GNUNET_TRANSPORT_connect (sched,
3304                                         cfg,
3305                                         NULL,
3306                                         &handle_transport_receive,
3307                                         &handle_transport_notify_connect,
3308                                         &handle_transport_notify_disconnect);
3309   GNUNET_assert (NULL != transport);
3310   GNUNET_SCHEDULER_add_delayed (sched,
3311                                 GNUNET_TIME_UNIT_FOREVER_REL,
3312                                 &cleaning_task, NULL);
3313   /* process client requests */
3314   GNUNET_SERVER_add_handlers (server, handlers);
3315   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3316               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3317 }
3318
3319
3320
3321 /**
3322  * The main function for the transport service.
3323  *
3324  * @param argc number of arguments from the command line
3325  * @param argv command line arguments
3326  * @return 0 ok, 1 on error
3327  */
3328 int
3329 main (int argc, char *const *argv)
3330 {
3331   return (GNUNET_OK ==
3332           GNUNET_SERVICE_run (argc,
3333                               argv,
3334                               "core",
3335                               GNUNET_SERVICE_OPTION_NONE,
3336                               &run, NULL)) ? 0 : 1;
3337 }
3338
3339 /* end of gnunet-service-core.c */