adding support for outbound message monitoring
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 /**
45  * Receive and send buffer windows grow over time.  For
46  * how long can 'unused' bandwidth accumulate before we
47  * need to cap it?  (specified in ms).
48  */
49 #define MAX_WINDOW_TIME (5 * 60 * 1000)
50
51 /**
52  * How many messages do we queue up at most for optional
53  * notifications to a client?  (this can cause notifications
54  * about outgoing messages to be dropped).
55  */
56 #define MAX_NOTIFY_QUEUE 16
57
58 /**
59  * Minimum of bytes per minute (out) to assign to any connected peer.
60  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
61  * sense.
62  */
63 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
64
65 /**
66  * What is the smallest change (in number of bytes per minute)
67  * that we consider significant enough to bother triggering?
68  */
69 #define MIN_BPM_CHANGE 32
70
71 /**
72  * After how much time past the "official" expiration time do
73  * we discard messages?  Should not be zero since we may 
74  * intentionally defer transmission until close to the deadline
75  * and then may be slightly past the deadline due to inaccuracy
76  * in sleep and our own CPU consumption.
77  */
78 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
79
80 /**
81  * What is the maximum delay for a SET_KEY message?
82  */
83 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
84
85 /**
86  * What how long do we wait for SET_KEY confirmation initially?
87  */
88 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
89
90 /**
91  * What is the maximum delay for a PING message?
92  */
93 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
94
95 /**
96  * What is the maximum delay for a PONG message?
97  */
98 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
99
100 /**
101  * How often do we recalculate bandwidth quotas?
102  */
103 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
104
105 /**
106  * What is the priority for a SET_KEY message?
107  */
108 #define SET_KEY_PRIORITY 0xFFFFFF
109
110 /**
111  * What is the priority for a PING message?
112  */
113 #define PING_PRIORITY 0xFFFFFF
114
115 /**
116  * What is the priority for a PONG message?
117  */
118 #define PONG_PRIORITY 0xFFFFFF
119
120 /**
121  * How many messages do we queue per peer at most?
122  */
123 #define MAX_PEER_QUEUE_SIZE 16
124
125 /**
126  * How many non-mandatory messages do we queue per client at most?
127  */
128 #define MAX_CLIENT_QUEUE_SIZE 32
129
130 /**
131  * What is the maximum age of a message for us to consider
132  * processing it?  Note that this looks at the timestamp used
133  * by the other peer, so clock skew between machines does
134  * come into play here.  So this should be picked high enough
135  * so that a little bit of clock skew does not prevent peers
136  * from connecting to us.
137  */
138 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
139
140 /**
141  * What is the maximum size for encrypted messages?  Note that this
142  * number imposes a clear limit on the maximum size of any message.
143  * Set to a value close to 64k but not so close that transports will
144  * have trouble with their headers.
145  */
146 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
147
148
149 /**
150  * State machine for our P2P encryption handshake.  Everyone starts in
151  * "DOWN", if we receive the other peer's key (other peer initiated)
152  * we start in state RECEIVED (since we will immediately send our
153  * own); otherwise we start in SENT.  If we get back a PONG from
154  * within either state, we move up to CONFIRMED (the PONG will always
155  * be sent back encrypted with the key we sent to the other peer).
156  */
157 enum PeerStateMachine
158 {
159   PEER_STATE_DOWN,
160   PEER_STATE_KEY_SENT,
161   PEER_STATE_KEY_RECEIVED,
162   PEER_STATE_KEY_CONFIRMED
163 };
164
165
166 /**
167  * Number of bytes (at the beginning) of "struct EncryptedMessage"
168  * that are NOT encrypted.
169  */
170 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
171
172
173 /**
174  * Encapsulation for encrypted messages exchanged between
175  * peers.  Followed by the actual encrypted data.
176  */
177 struct EncryptedMessage
178 {
179   /**
180    * Message type is either CORE_ENCRYPTED_MESSAGE.
181    */
182   struct GNUNET_MessageHeader header;
183
184   /**
185    * Always zero.
186    */
187   uint32_t reserved GNUNET_PACKED;
188
189   /**
190    * Hash of the plaintext, used to verify message integrity;
191    * ALSO used as the IV for the symmetric cipher!  Everything
192    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
193    * must be set to the offset of the next field.
194    */
195   GNUNET_HashCode plaintext_hash;
196
197   /**
198    * Sequence number, in network byte order.  This field
199    * must be the first encrypted/decrypted field and the
200    * first byte that is hashed for the plaintext hash.
201    */
202   uint32_t sequence_number GNUNET_PACKED;
203
204   /**
205    * Desired bandwidth (how much we should send to this
206    * peer / how much is the sender willing to receive),
207    * in bytes per minute.
208    */
209   uint32_t inbound_bpm_limit GNUNET_PACKED;
210
211   /**
212    * Timestamp.  Used to prevent reply of ancient messages
213    * (recent messages are caught with the sequence number).
214    */
215   struct GNUNET_TIME_AbsoluteNBO timestamp;
216
217 };
218
219 /**
220  * We're sending an (encrypted) PING to the other peer to check if he
221  * can decrypt.  The other peer should respond with a PONG with the
222  * same content, except this time encrypted with the receiver's key.
223  */
224 struct PingMessage
225 {
226   /**
227    * Message type is either CORE_PING or CORE_PONG.
228    */
229   struct GNUNET_MessageHeader header;
230
231   /**
232    * Random number chosen to make reply harder.
233    */
234   uint32_t challenge GNUNET_PACKED;
235
236   /**
237    * Intended target of the PING, used primarily to check
238    * that decryption actually worked.
239    */
240   struct GNUNET_PeerIdentity target;
241 };
242
243
244 /**
245  * Message transmitted to set (or update) a session key.
246  */
247 struct SetKeyMessage
248 {
249
250   /**
251    * Message type is either CORE_SET_KEY.
252    */
253   struct GNUNET_MessageHeader header;
254
255   /**
256    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
257    */
258   int32_t sender_status GNUNET_PACKED;
259
260   /**
261    * Purpose of the signature, will be
262    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
263    */
264   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
265
266   /**
267    * At what time was this key created?
268    */
269   struct GNUNET_TIME_AbsoluteNBO creation_time;
270
271   /**
272    * The encrypted session key.
273    */
274   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
275
276   /**
277    * Who is the intended recipient?
278    */
279   struct GNUNET_PeerIdentity target;
280
281   /**
282    * Signature of the stuff above (starting at purpose).
283    */
284   struct GNUNET_CRYPTO_RsaSignature signature;
285
286 };
287
288
289 /**
290  * Message waiting for transmission. This struct
291  * is followed by the actual content of the message.
292  */
293 struct MessageEntry
294 {
295
296   /**
297    * We keep messages in a linked list (for now).
298    */
299   struct MessageEntry *next;
300
301   /**
302    * By when are we supposed to transmit this message?
303    */
304   struct GNUNET_TIME_Absolute deadline;
305
306   /**
307    * How important is this message to us?
308    */
309   unsigned int priority;
310
311   /**
312    * How long is the message? (number of bytes following
313    * the "struct MessageEntry", but not including the
314    * size of "struct MessageEntry" itself!)
315    */
316   uint16_t size;
317
318   /**
319    * Was this message selected for transmission in the
320    * current round? GNUNET_YES or GNUNET_NO.
321    */
322   int16_t do_transmit;
323
324 };
325
326
327 struct Neighbour
328 {
329   /**
330    * We keep neighbours in a linked list (for now).
331    */
332   struct Neighbour *next;
333
334   /**
335    * Unencrypted messages destined for this peer.
336    */
337   struct MessageEntry *messages;
338
339   /**
340    * Head of the batched, encrypted message queue (already ordered,
341    * transmit starting with the head).
342    */
343   struct MessageEntry *encrypted_head;
344
345   /**
346    * Tail of the batched, encrypted message queue (already ordered,
347    * append new messages to tail)
348    */
349   struct MessageEntry *encrypted_tail;
350
351   /**
352    * Handle for pending requests for transmission to this peer
353    * with the transport service.  NULL if no request is pending.
354    */
355   struct GNUNET_TRANSPORT_TransmitHandle *th;
356
357   /**
358    * Public key of the neighbour, NULL if we don't have it yet.
359    */
360   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
361
362   /**
363    * We received a PING message before we got the "public_key"
364    * (or the SET_KEY).  We keep it here until we have a key
365    * to decrypt it.  NULL if no PING is pending.
366    */
367   struct PingMessage *pending_ping;
368
369   /**
370    * Non-NULL if we are currently looking up HELLOs for this peer.
371    * for this peer.
372    */
373   struct GNUNET_PEERINFO_IteratorContext *pitr;
374
375   /**
376    * SetKeyMessage to transmit, NULL if we are not currently trying
377    * to send one.
378    */
379   struct SetKeyMessage *skm;
380
381   /**
382    * Identity of the neighbour.
383    */
384   struct GNUNET_PeerIdentity peer;
385
386   /**
387    * Key we use to encrypt our messages for the other peer
388    * (initialized by us when we do the handshake).
389    */
390   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
391
392   /**
393    * Key we use to decrypt messages from the other peer
394    * (given to us by the other peer during the handshake).
395    */
396   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
397
398   /**
399    * ID of task used for re-trying plaintext scheduling.
400    */
401   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
402
403   /**
404    * ID of task used for re-trying SET_KEY and PING message.
405    */
406   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
407
408   /**
409    * ID of task used for updating bandwidth quota for this neighbour.
410    */
411   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
412
413   /**
414    * At what time did we generate our encryption key?
415    */
416   struct GNUNET_TIME_Absolute encrypt_key_created;
417
418   /**
419    * At what time did the other peer generate the decryption key?
420    */
421   struct GNUNET_TIME_Absolute decrypt_key_created;
422
423   /**
424    * At what time did we initially establish (as in, complete session
425    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
426    */
427   struct GNUNET_TIME_Absolute time_established;
428
429   /**
430    * At what time did we last receive an encrypted message from the
431    * other peer?  Should be zero if status != KEY_CONFIRMED.
432    */
433   struct GNUNET_TIME_Absolute last_activity;
434
435   /**
436    * Last latency observed from this peer.
437    */
438   struct GNUNET_TIME_Relative last_latency;
439
440   /**
441    * At what frequency are we currently re-trying SET_KEY messages?
442    */
443   struct GNUNET_TIME_Relative set_key_retry_frequency;
444
445   /**
446    * Time of our last update to the "available_send_window".
447    */
448   struct GNUNET_TIME_Absolute last_asw_update;
449
450   /**
451    * Time of our last update to the "available_recv_window".
452    */
453   struct GNUNET_TIME_Absolute last_arw_update;
454
455   /**
456    * Number of bytes that we are eligible to transmit to this
457    * peer at this point.  Incremented every minute by max_out_bpm,
458    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
459    * bandwidth-hogs are sampled at a frequency of about 78s!);
460    * may get negative if we have VERY high priority content.
461    */
462   long long available_send_window; 
463
464   /**
465    * How much downstream capacity of this peer has been reserved for
466    * our traffic?  (Our clients can request that a certain amount of
467    * bandwidth is available for replies to them; this value is used to
468    * make sure that this reserved amount of bandwidth is actually
469    * available).
470    */
471   long long available_recv_window; 
472
473   /**
474    * How valueable were the messages of this peer recently?
475    */
476   unsigned long long current_preference;
477
478   /**
479    * Bit map indicating which of the 32 sequence numbers before the last
480    * were received (good for accepting out-of-order packets and
481    * estimating reliability of the connection)
482    */
483   unsigned int last_packets_bitmap;
484
485   /**
486    * Number of messages in the message queue for this peer.
487    */
488   unsigned int message_queue_size;
489
490   /**
491    * last sequence number received on this connection (highest)
492    */
493   uint32_t last_sequence_number_received;
494
495   /**
496    * last sequence number transmitted
497    */
498   uint32_t last_sequence_number_sent;
499
500   /**
501    * Available bandwidth in for this peer (current target).
502    */
503   uint32_t bpm_in;
504
505   /**
506    * Available bandwidth out for this peer (current target).
507    */
508   uint32_t bpm_out;
509
510   /**
511    * Internal bandwidth limit set for this peer (initially
512    * typically set to "-1").  "bpm_out" is MAX of
513    * "bpm_out_internal_limit" and "bpm_out_external_limit".
514    */
515   uint32_t bpm_out_internal_limit;
516
517   /**
518    * External bandwidth limit set for this peer by the
519    * peer that we are communicating with.  "bpm_out" is MAX of
520    * "bpm_out_internal_limit" and "bpm_out_external_limit".
521    */
522   uint32_t bpm_out_external_limit;
523
524   /**
525    * What was our PING challenge number (for this peer)?
526    */
527   uint32_t ping_challenge;
528
529   /**
530    * What was the last distance to this peer as reported by the transports?
531    */
532   uint32_t last_distance;
533
534   /**
535    * What is our connection status?
536    */
537   enum PeerStateMachine status;
538
539 };
540
541
542 /**
543  * Data structure for each client connected to the core service.
544  */
545 struct Client
546 {
547   /**
548    * Clients are kept in a linked list.
549    */
550   struct Client *next;
551
552   /**
553    * Handle for the client with the server API.
554    */
555   struct GNUNET_SERVER_Client *client_handle;
556
557   /**
558    * Array of the types of messages this peer cares
559    * about (with "tcnt" entries).  Allocated as part
560    * of this client struct, do not free!
561    */
562   uint16_t *types;
563
564   /**
565    * Options for messages this client cares about,
566    * see GNUNET_CORE_OPTION_ values.
567    */
568   uint32_t options;
569
570   /**
571    * Number of types of incoming messages this client
572    * specifically cares about.  Size of the "types" array.
573    */
574   unsigned int tcnt;
575
576 };
577
578
579 /**
580  * Our public key.
581  */
582 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
583
584 /**
585  * Our identity.
586  */
587 static struct GNUNET_PeerIdentity my_identity;
588
589 /**
590  * Our private key.
591  */
592 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
593
594 /**
595  * Our scheduler.
596  */
597 struct GNUNET_SCHEDULER_Handle *sched;
598
599 /**
600  * Our configuration.
601  */
602 const struct GNUNET_CONFIGURATION_Handle *cfg;
603
604 /**
605  * Our server.
606  */
607 static struct GNUNET_SERVER_Handle *server;
608
609 /**
610  * Transport service.
611  */
612 static struct GNUNET_TRANSPORT_Handle *transport;
613
614 /**
615  * Linked list of our clients.
616  */
617 static struct Client *clients;
618
619 /**
620  * Context for notifications we need to send to our clients.
621  */
622 static struct GNUNET_SERVER_NotificationContext *notifier;
623
624 /**
625  * We keep neighbours in a linked list (for now).
626  */
627 static struct Neighbour *neighbours;
628
629 /**
630  * Sum of all preferences among all neighbours.
631  */
632 static unsigned long long preference_sum;
633
634 /**
635  * Total number of neighbours we have.
636  */
637 static unsigned int neighbour_count;
638
639 /**
640  * How much inbound bandwidth are we supposed to be using?
641  */
642 static unsigned long long bandwidth_target_in;
643
644 /**
645  * How much outbound bandwidth are we supposed to be using?
646  */
647 static unsigned long long bandwidth_target_out;
648
649
650
651 /**
652  * A preference value for a neighbour was update.  Update
653  * the preference sum accordingly.
654  *
655  * @param inc how much was a preference value increased?
656  */
657 static void
658 update_preference_sum (unsigned long long inc)
659 {
660   struct Neighbour *n;
661   unsigned long long os;
662
663   os = preference_sum;
664   preference_sum += inc;
665   if (preference_sum >= os)
666     return; /* done! */
667   /* overflow! compensate by cutting all values in half! */
668   preference_sum = 0;
669   n = neighbours;
670   while (n != NULL)
671     {
672       n->current_preference /= 2;
673       preference_sum += n->current_preference;
674       n = n->next;
675     }    
676 }
677
678
679 /**
680  * Recalculate the number of bytes we expect to
681  * receive or transmit in a given window.
682  *
683  * @param force force an update now (even if not much time has passed)
684  * @param window pointer to the byte counter (updated)
685  * @param ts pointer to the timestamp (updated)
686  * @param bpm number of bytes per minute that should
687  *        be added to the window.
688  */
689 static void
690 update_window (int force,
691                long long *window,
692                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
693 {
694   struct GNUNET_TIME_Relative since;
695
696   since = GNUNET_TIME_absolute_get_duration (*ts);
697   if ( (force == GNUNET_NO) &&
698        (since.value < 60 * 1000) )
699     return;                     /* not even a minute has passed */
700   *ts = GNUNET_TIME_absolute_get ();
701   *window += (bpm * since.value) / 60 / 1000;
702   if (*window > MAX_WINDOW_TIME * bpm)
703     *window = MAX_WINDOW_TIME * bpm;
704 }
705
706
707 /**
708  * Find the entry for the given neighbour.
709  *
710  * @param peer identity of the neighbour
711  * @return NULL if we are not connected, otherwise the
712  *         neighbour's entry.
713  */
714 static struct Neighbour *
715 find_neighbour (const struct GNUNET_PeerIdentity *peer)
716 {
717   struct Neighbour *ret;
718
719   ret = neighbours;
720   while ((ret != NULL) &&
721          (0 != memcmp (&ret->peer,
722                        peer, sizeof (struct GNUNET_PeerIdentity))))
723     ret = ret->next;
724   return ret;
725 }
726
727
728 /**
729  * Send a message to one of our clients.
730  *
731  * @param client target for the message
732  * @param msg message to transmit
733  * @param can_drop could this message be dropped if the
734  *        client's queue is getting too large?
735  */
736 static void
737 send_to_client (struct Client *client,
738                 const struct GNUNET_MessageHeader *msg, 
739                 int can_drop)
740 {
741 #if DEBUG_CORE_CLIENT
742   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
743               "Preparing to send message of type %u to client.\n",
744               ntohs (msg->type));
745 #endif  
746   GNUNET_SERVER_notification_context_unicast (notifier,
747                                               client->client_handle,
748                                               msg,
749                                               can_drop);
750 }
751
752
753 /**
754  * Send a message to all of our current clients that have
755  * the right options set.
756  * 
757  * @param msg message to multicast
758  * @param can_drop can this message be discarded if the queue is too long
759  * @param options mask to use 
760  */
761 static void
762 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
763                      int can_drop,
764                      int options)
765 {
766   struct Client *c;
767
768   c = clients;
769   while (c != NULL)
770     {
771       if (0 != (c->options & options))
772         send_to_client (c, msg, can_drop);
773       c = c->next;
774     }
775 }
776
777
778 /**
779  * Handle CORE_INIT request.
780  */
781 static void
782 handle_client_init (void *cls,
783                     struct GNUNET_SERVER_Client *client,
784                     const struct GNUNET_MessageHeader *message)
785 {
786   const struct InitMessage *im;
787   struct InitReplyMessage irm;
788   struct Client *c;
789   uint16_t msize;
790   const uint16_t *types;
791   struct Neighbour *n;
792   struct ConnectNotifyMessage cnm;
793
794 #if DEBUG_CORE_CLIENT
795   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
796               "Client connecting to core service with `%s' message\n",
797               "INIT");
798 #endif
799   /* check that we don't have an entry already */
800   c = clients;
801   while (c != NULL)
802     {
803       if (client == c->client_handle)
804         {
805           GNUNET_break (0);
806           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
807           return;
808         }
809       c = c->next;
810     }
811   msize = ntohs (message->size);
812   if (msize < sizeof (struct InitMessage))
813     {
814       GNUNET_break (0);
815       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
816       return;
817     }
818   GNUNET_SERVER_notification_context_add (notifier, client);
819   im = (const struct InitMessage *) message;
820   types = (const uint16_t *) &im[1];
821   msize -= sizeof (struct InitMessage);
822   c = GNUNET_malloc (sizeof (struct Client) + msize);
823   c->client_handle = client;
824   c->next = clients;
825   clients = c;
826   memcpy (&c[1], types, msize);
827   c->types = (uint16_t *) & c[1];
828   c->options = ntohl (im->options);
829   c->tcnt = msize / sizeof (uint16_t);
830   /* send init reply message */
831   irm.header.size = htons (sizeof (struct InitReplyMessage));
832   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
833   irm.reserved = htonl (0);
834   memcpy (&irm.publicKey,
835           &my_public_key,
836           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
837 #if DEBUG_CORE_CLIENT
838   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839               "Sending `%s' message to client.\n", "INIT_REPLY");
840 #endif
841   send_to_client (c, &irm.header, GNUNET_NO);
842   /* notify new client about existing neighbours */
843   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
844   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
845   n = neighbours;
846   while (n != NULL)
847     {
848 #if DEBUG_CORE_CLIENT
849       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
851 #endif
852       cnm.distance = htonl (n->last_distance);
853       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
854       cnm.peer = n->peer;
855       send_to_client (c, &cnm.header, GNUNET_NO);
856       n = n->next;
857     }
858 }
859
860
861 /**
862  * A client disconnected, clean up.
863  *
864  * @param cls closure
865  * @param client identification of the client
866  */
867 static void
868 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
869 {
870   struct Client *pos;
871   struct Client *prev;
872
873   if (client == NULL)
874     return;
875 #if DEBUG_CORE_CLIENT
876   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
877               "Client has disconnected from core service.\n");
878 #endif
879   prev = NULL;
880   pos = clients;
881   while (pos != NULL)
882     {
883       if (client == pos->client_handle)
884         {
885           if (prev == NULL)
886             clients = pos->next;
887           else
888             prev->next = pos->next;
889           GNUNET_free (pos);
890           return;
891         }
892       prev = pos;
893       pos = pos->next;
894     }
895   /* client never sent INIT */
896 }
897
898
899 /**
900  * Handle REQUEST_INFO request.
901  */
902 static void
903 handle_client_request_info (void *cls,
904                             struct GNUNET_SERVER_Client *client,
905                             const struct GNUNET_MessageHeader *message)
906 {
907   const struct RequestInfoMessage *rcm;
908   struct Neighbour *n;
909   struct ConfigurationInfoMessage cim;
910   int reserv;
911   unsigned long long old_preference;
912   struct GNUNET_SERVER_TransmitContext *tc;
913
914 #if DEBUG_CORE_CLIENT
915   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
916               "Core service receives `%s' request.\n", "REQUEST_INFO");
917 #endif
918   rcm = (const struct RequestInfoMessage *) message;
919   n = find_neighbour (&rcm->peer);
920   memset (&cim, 0, sizeof (cim));
921   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
922     {
923       update_window (GNUNET_YES,
924                      &n->available_send_window,
925                      &n->last_asw_update,
926                      n->bpm_out);
927       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
928       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
929                                n->bpm_out_external_limit);
930       reserv = ntohl (rcm->reserve_inbound);
931       if (reserv < 0)
932         {
933           n->available_recv_window += reserv;
934         }
935       else if (reserv > 0)
936         {
937           update_window (GNUNET_NO,
938                          &n->available_recv_window,
939                          &n->last_arw_update, n->bpm_in);
940           if (n->available_recv_window < reserv)
941             reserv = n->available_recv_window;
942           n->available_recv_window -= reserv;
943         }
944       old_preference = n->current_preference;
945       n->current_preference += GNUNET_ntohll(rcm->preference_change);
946       if (old_preference > n->current_preference) 
947         {
948           /* overflow; cap at maximum value */
949           n->current_preference = (unsigned long long) -1;
950         }
951       update_preference_sum (n->current_preference - old_preference);
952       cim.reserved_amount = htonl (reserv);
953       cim.bpm_in = htonl (n->bpm_in);
954       cim.bpm_out = htonl (n->bpm_out);
955       cim.preference = n->current_preference;
956     }
957   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
958   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
959   cim.peer = rcm->peer;
960
961 #if DEBUG_CORE_CLIENT
962   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
963               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
964 #endif
965   tc = GNUNET_SERVER_transmit_context_create (client);
966   GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
967   GNUNET_SERVER_transmit_context_run (tc,
968                                       GNUNET_TIME_UNIT_FOREVER_REL);
969 }
970
971
972 /**
973  * Check if we have encrypted messages for the specified neighbour
974  * pending, and if so, check with the transport about sending them
975  * out.
976  *
977  * @param n neighbour to check.
978  */
979 static void process_encrypted_neighbour_queue (struct Neighbour *n);
980
981
982 /**
983  * Function called when the transport service is ready to
984  * receive an encrypted message for the respective peer
985  *
986  * @param cls neighbour to use message from
987  * @param size number of bytes we can transmit
988  * @param buf where to copy the message
989  * @return number of bytes transmitted
990  */
991 static size_t
992 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
993 {
994   struct Neighbour *n = cls;
995   struct MessageEntry *m;
996   size_t ret;
997   char *cbuf;
998
999   n->th = NULL;
1000   GNUNET_assert (NULL != (m = n->encrypted_head));
1001   n->encrypted_head = m->next;
1002   if (m->next == NULL)
1003     n->encrypted_tail = NULL;
1004   ret = 0;
1005   cbuf = buf;
1006   if (buf != NULL)
1007     {
1008       GNUNET_assert (size >= m->size);
1009       memcpy (cbuf, &m[1], m->size);
1010       ret = m->size;
1011       n->available_send_window -= m->size;
1012       process_encrypted_neighbour_queue (n);
1013 #if DEBUG_CORE
1014       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1015                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1016                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1017                   ret, GNUNET_i2s (&n->peer));
1018 #endif
1019     }
1020   else
1021     {
1022       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1023                   "Transmission for message of type %u and size %u failed\n",
1024                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1025                   m->size);
1026     }
1027   GNUNET_free (m);
1028   return ret;
1029 }
1030
1031
1032 /**
1033  * Check if we have plaintext messages for the specified neighbour
1034  * pending, and if so, consider batching and encrypting them (and
1035  * then trigger processing of the encrypted queue if needed).
1036  *
1037  * @param n neighbour to check.
1038  */
1039 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1040
1041
1042 /**
1043  * Check if we have encrypted messages for the specified neighbour
1044  * pending, and if so, check with the transport about sending them
1045  * out.
1046  *
1047  * @param n neighbour to check.
1048  */
1049 static void
1050 process_encrypted_neighbour_queue (struct Neighbour *n)
1051 {
1052   struct MessageEntry *m;
1053  
1054   if (n->th != NULL)
1055     return;  /* request already pending */
1056   if (n->encrypted_head == NULL)
1057     {
1058       /* encrypted queue empty, try plaintext instead */
1059       process_plaintext_neighbour_queue (n);
1060       return;
1061     }
1062 #if DEBUG_CORE
1063   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1065               n->encrypted_head->size,
1066               GNUNET_i2s (&n->peer),
1067               GNUNET_TIME_absolute_get_remaining (n->
1068                                                   encrypted_head->deadline).
1069               value);
1070 #endif
1071   n->th =
1072     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1073                                             n->encrypted_head->size,
1074                                             n->encrypted_head->priority,
1075                                             GNUNET_TIME_absolute_get_remaining
1076                                             (n->encrypted_head->deadline),
1077                                             &notify_encrypted_transmit_ready,
1078                                             n);
1079   if (n->th == NULL)
1080     {
1081       /* message request too large (oops) */
1082       GNUNET_break (0);
1083       /* discard encrypted message */
1084       GNUNET_assert (NULL != (m = n->encrypted_head));
1085       n->encrypted_head = m->next;
1086       if (m->next == NULL)
1087         n->encrypted_tail = NULL;
1088       GNUNET_free (m);
1089       process_encrypted_neighbour_queue (n);
1090     }
1091 }
1092
1093
1094 /**
1095  * Decrypt size bytes from in and write the result to out.  Use the
1096  * key for inbound traffic of the given neighbour.  This function does
1097  * NOT do any integrity-checks on the result.
1098  *
1099  * @param n neighbour we are receiving from
1100  * @param iv initialization vector to use
1101  * @param in ciphertext
1102  * @param out plaintext
1103  * @param size size of in/out
1104  * @return GNUNET_OK on success
1105  */
1106 static int
1107 do_decrypt (struct Neighbour *n,
1108             const GNUNET_HashCode * iv,
1109             const void *in, void *out, size_t size)
1110 {
1111   if (size != (uint16_t) size)
1112     {
1113       GNUNET_break (0);
1114       return GNUNET_NO;
1115     }
1116   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1117       (n->status != PEER_STATE_KEY_CONFIRMED))
1118     {
1119       GNUNET_break_op (0);
1120       return GNUNET_SYSERR;
1121     }
1122   if (size !=
1123       GNUNET_CRYPTO_aes_decrypt (in,
1124                                  (uint16_t) size,
1125                                  &n->decrypt_key,
1126                                  (const struct
1127                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1128                                  out))
1129     {
1130       GNUNET_break (0);
1131       return GNUNET_SYSERR;
1132     }
1133 #if DEBUG_CORE
1134   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135               "Decrypted %u bytes from `%4s' using key %u\n",
1136               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1137 #endif
1138   return GNUNET_OK;
1139 }
1140
1141
1142 /**
1143  * Encrypt size bytes from in and write the result to out.  Use the
1144  * key for outbound traffic of the given neighbour.
1145  *
1146  * @param n neighbour we are sending to
1147  * @param iv initialization vector to use
1148  * @param in ciphertext
1149  * @param out plaintext
1150  * @param size size of in/out
1151  * @return GNUNET_OK on success
1152  */
1153 static int
1154 do_encrypt (struct Neighbour *n,
1155             const GNUNET_HashCode * iv,
1156             const void *in, void *out, size_t size)
1157 {
1158   if (size != (uint16_t) size)
1159     {
1160       GNUNET_break (0);
1161       return GNUNET_NO;
1162     }
1163   GNUNET_assert (size ==
1164                  GNUNET_CRYPTO_aes_encrypt (in,
1165                                             (uint16_t) size,
1166                                             &n->encrypt_key,
1167                                             (const struct
1168                                              GNUNET_CRYPTO_AesInitializationVector
1169                                              *) iv, out));
1170 #if DEBUG_CORE
1171   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1172               "Encrypted %u bytes for `%4s' using key %u\n", size,
1173               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1174 #endif
1175   return GNUNET_OK;
1176 }
1177
1178
1179 /**
1180  * Select messages for transmission.  This heuristic uses a combination
1181  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1182  * and priority-based discard (in case no feasible schedule exist) and
1183  * speculative optimization (defer any kind of transmission until
1184  * we either create a batch of significant size, 25% of max, or until
1185  * we are close to a deadline).  Furthermore, when scheduling the
1186  * heuristic also packs as many messages into the batch as possible,
1187  * starting with those with the earliest deadline.  Yes, this is fun.
1188  *
1189  * @param n neighbour to select messages from
1190  * @param size number of bytes to select for transmission
1191  * @param retry_time set to the time when we should try again
1192  *        (only valid if this function returns zero)
1193  * @return number of bytes selected, or 0 if we decided to
1194  *         defer scheduling overall; in that case, retry_time is set.
1195  */
1196 static size_t
1197 select_messages (struct Neighbour *n,
1198                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1199 {
1200   struct MessageEntry *pos;
1201   struct MessageEntry *min;
1202   struct MessageEntry *last;
1203   unsigned int min_prio;
1204   struct GNUNET_TIME_Absolute t;
1205   struct GNUNET_TIME_Absolute now;
1206   uint64_t delta;
1207   uint64_t avail;
1208   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1209   size_t off;
1210   int discard_low_prio;
1211
1212   GNUNET_assert (NULL != n->messages);
1213   now = GNUNET_TIME_absolute_get ();
1214   /* last entry in linked list of messages processed */
1215   last = NULL;
1216   /* should we remove the entry with the lowest
1217      priority from consideration for scheduling at the
1218      end of the loop? */
1219   discard_low_prio = GNUNET_YES;
1220   while (GNUNET_YES == discard_low_prio)
1221     {
1222       min = NULL;
1223       min_prio = -1;
1224       discard_low_prio = GNUNET_NO;
1225       /* calculate number of bytes available for transmission at time "t" */
1226       update_window (GNUNET_NO,
1227                      &n->available_send_window,
1228                      &n->last_asw_update,
1229                      n->bpm_out);
1230       avail = n->available_send_window;
1231       t = n->last_asw_update;
1232       /* how many bytes have we (hypothetically) scheduled so far */
1233       off = 0;
1234       /* maximum time we can wait before transmitting anything
1235          and still make all of our deadlines */
1236       slack = -1;
1237
1238       pos = n->messages;
1239       /* note that we use "*2" here because we want to look
1240          a bit further into the future; much more makes no
1241          sense since new message might be scheduled in the
1242          meantime... */
1243       while ((pos != NULL) && (off < size * 2))
1244         {
1245           if (pos->do_transmit == GNUNET_YES)
1246             {
1247               /* already removed from consideration */
1248               pos = pos->next;
1249               continue;
1250             }
1251           if (discard_low_prio == GNUNET_NO)
1252             {
1253               delta = pos->deadline.value;
1254               if (delta < t.value)
1255                 delta = 0;
1256               else
1257                 delta = t.value - delta;
1258               avail += delta * n->bpm_out / 1000 / 60;
1259               if (avail < pos->size)
1260                 {
1261                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1262                 }
1263               else
1264                 {
1265                   avail -= pos->size;
1266                   /* update slack, considering both its absolute deadline
1267                      and relative deadlines caused by other messages
1268                      with their respective load */
1269                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1270                   if (pos->deadline.value < now.value)
1271                     slack = 0;
1272                   else
1273                     slack =
1274                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1275                 }
1276             }
1277           off += pos->size;
1278           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1279           if (pos->priority <= min_prio)
1280             {
1281               /* update min for discard */
1282               min_prio = pos->priority;
1283               min = pos;
1284             }
1285           pos = pos->next;
1286         }
1287       if (discard_low_prio)
1288         {
1289           GNUNET_assert (min != NULL);
1290           /* remove lowest-priority entry from consideration */
1291           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1292         }
1293       last = pos;
1294     }
1295   /* guard against sending "tiny" messages with large headers without
1296      urgent deadlines */
1297   if ( (slack > 1000) && (size > 4 * off) )
1298     {
1299       /* less than 25% of message would be filled with
1300          deadlines still being met if we delay by one
1301          second or more; so just wait for more data */
1302       retry_time->value = slack / 2;
1303       /* reset do_transmit values for next time */
1304       while (pos != last)
1305         {
1306           pos->do_transmit = GNUNET_NO;
1307           pos = pos->next;
1308         }
1309       return 0;
1310     }
1311   /* select marked messages (up to size) for transmission */
1312   off = 0;
1313   pos = n->messages;
1314   while (pos != last)
1315     {
1316       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1317         {
1318           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1319           off += pos->size;
1320           size -= pos->size;
1321         }
1322       else
1323         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1324       pos = pos->next;
1325     }
1326 #if DEBUG_CORE
1327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1328               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1329               off, GNUNET_i2s (&n->peer));
1330 #endif
1331   return off;
1332 }
1333
1334
1335 /**
1336  * Batch multiple messages into a larger buffer.
1337  *
1338  * @param n neighbour to take messages from
1339  * @param buf target buffer
1340  * @param size size of buf
1341  * @param deadline set to transmission deadline for the result
1342  * @param retry_time set to the time when we should try again
1343  *        (only valid if this function returns zero)
1344  * @param priority set to the priority of the batch
1345  * @return number of bytes written to buf (can be zero)
1346  */
1347 static size_t
1348 batch_message (struct Neighbour *n,
1349                char *buf,
1350                size_t size,
1351                struct GNUNET_TIME_Absolute *deadline,
1352                struct GNUNET_TIME_Relative *retry_time,
1353                unsigned int *priority)
1354 {
1355   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1356   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
1357   struct MessageEntry *pos;
1358   struct MessageEntry *prev;
1359   struct MessageEntry *next;
1360   size_t ret;
1361   
1362   ret = 0;
1363   *priority = 0;
1364   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1365   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1366   if (0 == select_messages (n, size, retry_time))
1367     {
1368       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1369                   "No messages selected, will try again in %llu ms\n",
1370                   retry_time->value);
1371       return 0;
1372     }
1373   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
1374   ntm->distance = htonl (n->last_distance);
1375   ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
1376   ntm->peer = n->peer;
1377   
1378   pos = n->messages;
1379   prev = NULL;
1380   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1381     {
1382       next = pos->next;
1383       if (GNUNET_YES == pos->do_transmit)
1384         {
1385           GNUNET_assert (pos->size <= size);
1386           /* do notifications */
1387           /* FIXME: track if we have *any* client that wants
1388              full notifications and only do this if that is
1389              actually true */
1390           if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
1391             {
1392               memcpy (&ntm[1], &pos[1], pos->size);
1393               ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1394                                         sizeof (struct GNUNET_MessageHeader));
1395               send_to_all_clients (&ntm->header,
1396                                    GNUNET_YES,
1397                                    GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
1398             }
1399           else
1400             {
1401               /* message too large for 'full' notifications, we do at
1402                  least the 'hdr' type */
1403               memcpy (&ntm[1],
1404                       &pos[1],
1405                       sizeof (struct GNUNET_MessageHeader));
1406             }
1407           ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
1408                                     pos->size);
1409           send_to_all_clients (&ntm->header,
1410                                GNUNET_YES,
1411                                GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);   
1412           /* copy for encrypted transmission */
1413           memcpy (&buf[ret], &pos[1], pos->size);
1414           ret += pos->size;
1415           size -= pos->size;
1416           *priority += pos->priority;
1417 #if DEBUG_CORE
1418           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1419                       "Adding plaintext message with deadline %llu ms to batch\n",
1420                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1421 #endif
1422           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1423           GNUNET_free (pos);
1424           if (prev == NULL)
1425             n->messages = next;
1426           else
1427             prev->next = next;
1428         }
1429       else
1430         {
1431           prev = pos;
1432         }
1433       pos = next;
1434     }
1435 #if DEBUG_CORE
1436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1437               "Deadline for message batch is %llu ms\n",
1438               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1439 #endif
1440   return ret;
1441 }
1442
1443
1444 /**
1445  * Remove messages with deadlines that have long expired from
1446  * the queue.
1447  *
1448  * @param n neighbour to inspect
1449  */
1450 static void
1451 discard_expired_messages (struct Neighbour *n)
1452 {
1453   struct MessageEntry *prev;
1454   struct MessageEntry *next;
1455   struct MessageEntry *pos;
1456   struct GNUNET_TIME_Absolute now;
1457   struct GNUNET_TIME_Relative delta;
1458
1459   now = GNUNET_TIME_absolute_get ();
1460   prev = NULL;
1461   pos = n->messages;
1462   while (pos != NULL) 
1463     {
1464       next = pos->next;
1465       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1466       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1467         {
1468 #if DEBUG_CORE
1469           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1470                       "Message is %llu ms past due, discarding.\n",
1471                       delta.value);
1472 #endif
1473           if (prev == NULL)
1474             n->messages = next;
1475           else
1476             prev->next = next;
1477           GNUNET_free (pos);
1478         }
1479       else
1480         prev = pos;
1481       pos = next;
1482     }
1483 }
1484
1485
1486 /**
1487  * Signature of the main function of a task.
1488  *
1489  * @param cls closure
1490  * @param tc context information (why was this task triggered now)
1491  */
1492 static void
1493 retry_plaintext_processing (void *cls,
1494                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1495 {
1496   struct Neighbour *n = cls;
1497
1498   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1499   process_plaintext_neighbour_queue (n);
1500 }
1501
1502
1503 /**
1504  * Send our key (and encrypted PING) to the other peer.
1505  *
1506  * @param n the other peer
1507  */
1508 static void send_key (struct Neighbour *n);
1509
1510
1511 /**
1512  * Check if we have plaintext messages for the specified neighbour
1513  * pending, and if so, consider batching and encrypting them (and
1514  * then trigger processing of the encrypted queue if needed).
1515  *
1516  * @param n neighbour to check.
1517  */
1518 static void
1519 process_plaintext_neighbour_queue (struct Neighbour *n)
1520 {
1521   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1522   size_t used;
1523   size_t esize;
1524   struct EncryptedMessage *em;  /* encrypted message */
1525   struct EncryptedMessage *ph;  /* plaintext header */
1526   struct MessageEntry *me;
1527   unsigned int priority;
1528   struct GNUNET_TIME_Absolute deadline;
1529   struct GNUNET_TIME_Relative retry_time;
1530
1531   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1532     {
1533       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1534       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1535     }
1536   switch (n->status)
1537     {
1538     case PEER_STATE_DOWN:
1539       send_key (n);
1540 #if DEBUG_CORE
1541       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1542                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1543                   GNUNET_i2s(&n->peer));
1544 #endif
1545       return;
1546     case PEER_STATE_KEY_SENT:
1547       GNUNET_assert (n->retry_set_key_task !=
1548                      GNUNET_SCHEDULER_NO_TASK);
1549 #if DEBUG_CORE
1550       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1551                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1552                   GNUNET_i2s(&n->peer));
1553 #endif
1554       return;
1555     case PEER_STATE_KEY_RECEIVED:
1556       GNUNET_assert (n->retry_set_key_task !=
1557                      GNUNET_SCHEDULER_NO_TASK);
1558 #if DEBUG_CORE
1559       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1560                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1561                   GNUNET_i2s(&n->peer));
1562 #endif
1563       return;
1564     case PEER_STATE_KEY_CONFIRMED:
1565       /* ready to continue */
1566       break;
1567     }
1568   discard_expired_messages (n);
1569   if (n->messages == NULL)
1570     {
1571 #if DEBUG_CORE
1572       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1573                   "Plaintext message queue for `%4s' is empty.\n",
1574                   GNUNET_i2s(&n->peer));
1575 #endif
1576       return;                   /* no pending messages */
1577     }
1578   if (n->encrypted_head != NULL)
1579     {
1580 #if DEBUG_CORE
1581       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1582                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1583                   GNUNET_i2s(&n->peer));
1584 #endif
1585       return;                   /* wait for messages already encrypted to be
1586                                    processed first! */
1587     }
1588   ph = (struct EncryptedMessage *) pbuf;
1589   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1590   priority = 0;
1591   used = sizeof (struct EncryptedMessage);
1592   used += batch_message (n,
1593                          &pbuf[used],
1594                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1595                          &deadline, &retry_time, &priority);
1596   if (used == sizeof (struct EncryptedMessage))
1597     {
1598 #if DEBUG_CORE
1599       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1600                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1601                   GNUNET_i2s(&n->peer));
1602 #endif
1603       /* no messages selected for sending, try again later... */
1604       n->retry_plaintext_task =
1605         GNUNET_SCHEDULER_add_delayed (sched,
1606                                       retry_time,
1607                                       &retry_plaintext_processing, n);
1608       return;
1609     }
1610   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1611   ph->inbound_bpm_limit = htonl (n->bpm_in);
1612   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1613
1614   /* setup encryption message header */
1615   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1616   me->deadline = deadline;
1617   me->priority = priority;
1618   me->size = used;
1619   em = (struct EncryptedMessage *) &me[1];
1620   em->header.size = htons (used);
1621   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1622   em->reserved = htonl (0);
1623   esize = used - ENCRYPTED_HEADER_SIZE;
1624   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1625   /* encrypt */
1626 #if DEBUG_CORE
1627   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1628               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1629               esize,
1630               GNUNET_i2s(&n->peer),
1631               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1632 #endif
1633   GNUNET_assert (GNUNET_OK ==
1634                  do_encrypt (n,
1635                              &em->plaintext_hash,
1636                              &ph->sequence_number,
1637                              &em->sequence_number, esize));
1638   /* append to transmission list */
1639   if (n->encrypted_tail == NULL)
1640     n->encrypted_head = me;
1641   else
1642     n->encrypted_tail->next = me;
1643   n->encrypted_tail = me;
1644   process_encrypted_neighbour_queue (n);
1645 }
1646
1647
1648 /**
1649  * Handle CORE_SEND request.
1650  *
1651  * @param cls unused
1652  * @param client the client issuing the request
1653  * @param message the "struct SendMessage"
1654  */
1655 static void
1656 handle_client_send (void *cls,
1657                     struct GNUNET_SERVER_Client *client,
1658                     const struct GNUNET_MessageHeader *message);
1659
1660
1661 /**
1662  * Function called to notify us that we either succeeded
1663  * or failed to connect (at the transport level) to another
1664  * peer.  We should either free the message we were asked
1665  * to transmit or re-try adding it to the queue.
1666  *
1667  * @param cls closure
1668  * @param size number of bytes available in buf
1669  * @param buf where the callee should write the message
1670  * @return number of bytes written to buf
1671  */
1672 static size_t
1673 send_connect_continuation (void *cls, size_t size, void *buf)
1674 {
1675   struct SendMessage *sm = cls;
1676
1677   if (buf == NULL)
1678     {
1679 #if DEBUG_CORE
1680       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1681                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1682                   GNUNET_i2s (&sm->peer));
1683 #endif
1684       GNUNET_free (sm);
1685       return 0;
1686     }
1687 #if DEBUG_CORE
1688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1689               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1690               GNUNET_i2s (&sm->peer));
1691 #endif
1692   handle_client_send (NULL, NULL, &sm->header);
1693   GNUNET_free (sm);
1694   return 0;
1695 }
1696
1697
1698 /**
1699  * Handle CORE_SEND request.
1700  *
1701  * @param cls unused
1702  * @param client the client issuing the request
1703  * @param message the "struct SendMessage"
1704  */
1705 static void
1706 handle_client_send (void *cls,
1707                     struct GNUNET_SERVER_Client *client,
1708                     const struct GNUNET_MessageHeader *message)
1709 {
1710   const struct SendMessage *sm;
1711   struct SendMessage *smc;
1712   const struct GNUNET_MessageHeader *mh;
1713   struct Neighbour *n;
1714   struct MessageEntry *prev;
1715   struct MessageEntry *pos;
1716   struct MessageEntry *e; 
1717   struct MessageEntry *min_prio_entry;
1718   struct MessageEntry *min_prio_prev;
1719   unsigned int min_prio;
1720   unsigned int queue_size;
1721   uint16_t msize;
1722
1723   msize = ntohs (message->size);
1724   if (msize <
1725       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1726     {
1727       GNUNET_break (0);
1728       if (client != NULL)
1729         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1730       return;
1731     }
1732   sm = (const struct SendMessage *) message;
1733   msize -= sizeof (struct SendMessage);
1734   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1735   if (msize != ntohs (mh->size))
1736     {
1737       GNUNET_break (0);
1738       if (client != NULL)
1739         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1740       return;
1741     }
1742   n = find_neighbour (&sm->peer);
1743   if (n == NULL)
1744     {
1745 #if DEBUG_CORE
1746       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1747                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1748                   "SEND",
1749                   GNUNET_i2s (&sm->peer),
1750                   GNUNET_TIME_absolute_get_remaining
1751                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1752 #endif
1753       msize += sizeof (struct SendMessage);
1754       /* ask transport to connect to the peer */
1755       smc = GNUNET_malloc (msize);
1756       memcpy (smc, sm, msize);
1757       if (NULL ==
1758           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1759                                                   &sm->peer,
1760                                                   0, 0,
1761                                                   GNUNET_TIME_absolute_get_remaining
1762                                                   (GNUNET_TIME_absolute_ntoh
1763                                                    (sm->deadline)),
1764                                                   &send_connect_continuation,
1765                                                   smc))
1766         {
1767           /* transport has already a request pending for this peer! */
1768 #if DEBUG_CORE
1769           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1770                       "Dropped second message destined for `%4s' since connection is still down.\n",
1771                       GNUNET_i2s(&sm->peer));
1772 #endif
1773           GNUNET_free (smc);
1774         }
1775       if (client != NULL)
1776         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1777       return;
1778     }
1779 #if DEBUG_CORE
1780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1781               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1782               "SEND",
1783               msize, 
1784               GNUNET_i2s (&sm->peer));
1785 #endif
1786   /* bound queue size */
1787   discard_expired_messages (n);
1788   min_prio = (unsigned int) -1;
1789   min_prio_entry = NULL;
1790   min_prio_prev = NULL;
1791   queue_size = 0;
1792   prev = NULL;
1793   pos = n->messages;
1794   while (pos != NULL) 
1795     {
1796       if (pos->priority < min_prio)
1797         {
1798           min_prio_entry = pos;
1799           min_prio_prev = prev;
1800           min_prio = pos->priority;
1801         }
1802       queue_size++;
1803       prev = pos;
1804       pos = pos->next;
1805     }
1806   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1807     {
1808       /* queue full */
1809       if (ntohl(sm->priority) <= min_prio)
1810         {
1811           /* discard new entry */
1812 #if DEBUG_CORE
1813           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1814                       "Queue full, discarding new request\n");
1815 #endif
1816           if (client != NULL)
1817             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1818           return;
1819         }
1820       /* discard "min_prio_entry" */
1821 #if DEBUG_CORE
1822       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1823                   "Queue full, discarding existing older request\n");
1824 #endif
1825       if (min_prio_prev == NULL)
1826         n->messages = min_prio_entry->next;
1827       else
1828         min_prio_prev->next = min_prio_entry->next;      
1829       GNUNET_free (min_prio_entry);     
1830     }
1831
1832 #if DEBUG_CORE
1833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1834               "Adding transmission request for `%4s' to queue\n",
1835               GNUNET_i2s (&sm->peer));
1836 #endif  
1837   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1838   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1839   e->priority = ntohl (sm->priority);
1840   e->size = msize;
1841   memcpy (&e[1], mh, msize);
1842
1843   /* insert, keep list sorted by deadline */
1844   prev = NULL;
1845   pos = n->messages;
1846   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1847     {
1848       prev = pos;
1849       pos = pos->next;
1850     }
1851   if (prev == NULL)
1852     n->messages = e;
1853   else
1854     prev->next = e;
1855   e->next = pos;
1856
1857   /* consider scheduling now */
1858   process_plaintext_neighbour_queue (n);
1859   if (client != NULL)
1860     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1861 }
1862
1863
1864 /**
1865  * Handle CORE_REQUEST_CONNECT request.
1866  *
1867  * @param cls unused
1868  * @param client the client issuing the request
1869  * @param message the "struct ConnectMessage"
1870  */
1871 static void
1872 handle_client_request_connect (void *cls,
1873                                struct GNUNET_SERVER_Client *client,
1874                                const struct GNUNET_MessageHeader *message)
1875 {
1876   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
1877   struct Neighbour *n;
1878
1879   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1880   n = find_neighbour (&cm->peer);
1881   if (n != NULL)
1882     return; /* already connected, or at least trying */
1883 #if DEBUG_CORE
1884   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1885               "Core received `%s' request for `%4s', will try to establish connection\n",
1886               "REQUEST_CONNECT",
1887               GNUNET_i2s (&cm->peer));
1888 #endif
1889   /* ask transport to connect to the peer */
1890   /* FIXME: timeout zero OK? need for cancellation? */
1891   GNUNET_TRANSPORT_notify_transmit_ready (transport,
1892                                           &cm->peer,
1893                                           0, 0,
1894                                           GNUNET_TIME_UNIT_ZERO,
1895                                           NULL,
1896                                           NULL);
1897 }
1898
1899
1900 /**
1901  * List of handlers for the messages understood by this
1902  * service.
1903  */
1904 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1905   {&handle_client_init, NULL,
1906    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1907   {&handle_client_request_info, NULL,
1908    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
1909    sizeof (struct RequestInfoMessage)},
1910   {&handle_client_send, NULL,
1911    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1912   {&handle_client_request_connect, NULL,
1913    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
1914    sizeof (struct ConnectMessage)},
1915   {NULL, NULL, 0, 0}
1916 };
1917
1918
1919 /**
1920  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
1921  * the neighbour's struct and retry send_key.  Or, if we did not get a
1922  * HELLO, just do nothing.
1923  *
1924  * @param cls NULL
1925  * @param peer the peer for which this is the HELLO
1926  * @param hello HELLO message of that peer
1927  * @param trust amount of trust we currently have in that peer
1928  */
1929 static void
1930 process_hello_retry_send_key (void *cls,
1931                               const struct GNUNET_PeerIdentity *peer,
1932                               const struct GNUNET_HELLO_Message *hello,
1933                               uint32_t trust)
1934 {
1935   struct Neighbour *n = cls;
1936
1937   if (peer == NULL)
1938     {
1939       n->pitr = NULL;
1940       return;
1941     }
1942   if (n->public_key != NULL)
1943     return;
1944 #if DEBUG_CORE
1945   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1946               "Received new `%s' message for `%4s', initiating key exchange.\n",
1947               "HELLO",
1948               GNUNET_i2s (peer));
1949 #endif
1950   n->public_key =
1951     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
1952   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
1953     {
1954       GNUNET_free (n->public_key);
1955       n->public_key = NULL;
1956       return;
1957     }
1958   send_key (n);
1959 }
1960
1961
1962 /**
1963  * Task that will retry "send_key" if our previous attempt failed
1964  * to yield a PONG.
1965  */
1966 static void
1967 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1968 {
1969   struct Neighbour *n = cls;
1970
1971   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
1972   n->set_key_retry_frequency =
1973     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
1974   send_key (n);
1975 }
1976
1977
1978 /**
1979  * Send our key (and encrypted PING) to the other peer.
1980  *
1981  * @param n the other peer
1982  */
1983 static void
1984 send_key (struct Neighbour *n)
1985 {
1986   struct SetKeyMessage *sm;
1987   struct MessageEntry *me;
1988   struct PingMessage pp;
1989   struct PingMessage *pm;
1990
1991   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
1992        (n->pitr != NULL) )
1993     return; /* already in progress */
1994 #if DEBUG_CORE
1995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1996               "Asked to perform key exchange with `%4s'.\n",
1997               GNUNET_i2s (&n->peer));
1998 #endif
1999   if (n->public_key == NULL)
2000     {
2001       /* lookup n's public key, then try again */
2002 #if DEBUG_CORE
2003       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2004                   "Lacking public key for `%4s', trying to obtain one.\n",
2005                   GNUNET_i2s (&n->peer));
2006 #endif
2007       GNUNET_assert (n->pitr == NULL);
2008       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2009                                          sched,
2010                                          &n->peer,
2011                                          0,
2012                                          GNUNET_TIME_UNIT_MINUTES,
2013                                          &process_hello_retry_send_key, n);
2014       return;
2015     }
2016   /* first, set key message */
2017   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2018                       sizeof (struct SetKeyMessage));
2019   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2020   me->priority = SET_KEY_PRIORITY;
2021   me->size = sizeof (struct SetKeyMessage);
2022   if (n->encrypted_head == NULL)
2023     n->encrypted_head = me;
2024   else
2025     n->encrypted_tail->next = me;
2026   n->encrypted_tail = me;
2027   sm = (struct SetKeyMessage *) &me[1];
2028   sm->header.size = htons (sizeof (struct SetKeyMessage));
2029   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2030   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2031                                         PEER_STATE_KEY_SENT : n->status));
2032   sm->purpose.size =
2033     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2034            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2035            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2036            sizeof (struct GNUNET_PeerIdentity));
2037   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2038   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2039   sm->target = n->peer;
2040   GNUNET_assert (GNUNET_OK ==
2041                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2042                                             sizeof (struct
2043                                                     GNUNET_CRYPTO_AesSessionKey),
2044                                             n->public_key,
2045                                             &sm->encrypted_key));
2046   GNUNET_assert (GNUNET_OK ==
2047                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2048                                          &sm->signature));
2049
2050   /* second, encrypted PING message */
2051   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2052                       sizeof (struct PingMessage));
2053   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2054   me->priority = PING_PRIORITY;
2055   me->size = sizeof (struct PingMessage);
2056   n->encrypted_tail->next = me;
2057   n->encrypted_tail = me;
2058   pm = (struct PingMessage *) &me[1];
2059   pm->header.size = htons (sizeof (struct PingMessage));
2060   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2061   pp.challenge = htonl (n->ping_challenge);
2062   pp.target = n->peer;
2063 #if DEBUG_CORE
2064   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2065               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2066               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2067   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2068               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2069               "PING",
2070               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2071 #endif
2072   do_encrypt (n,
2073               &n->peer.hashPubKey,
2074               &pp.challenge,
2075               &pm->challenge,
2076               sizeof (struct PingMessage) -
2077               sizeof (struct GNUNET_MessageHeader));
2078   /* update status */
2079   switch (n->status)
2080     {
2081     case PEER_STATE_DOWN:
2082       n->status = PEER_STATE_KEY_SENT;
2083       break;
2084     case PEER_STATE_KEY_SENT:
2085       break;
2086     case PEER_STATE_KEY_RECEIVED:
2087       break;
2088     case PEER_STATE_KEY_CONFIRMED:
2089       break;
2090     default:
2091       GNUNET_break (0);
2092       break;
2093     }
2094 #if DEBUG_CORE
2095   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2096               "Have %llu ms left for `%s' transmission.\n",
2097               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2098               "SET_KEY");
2099 #endif
2100   /* trigger queue processing */
2101   process_encrypted_neighbour_queue (n);
2102   if (n->status != PEER_STATE_KEY_CONFIRMED)
2103     {
2104       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task);
2105       n->retry_set_key_task
2106         = GNUNET_SCHEDULER_add_delayed (sched,
2107                                         n->set_key_retry_frequency,
2108                                         &set_key_retry_task, n);
2109     }
2110 }
2111
2112
2113 /**
2114  * We received a SET_KEY message.  Validate and update
2115  * our key material and status.
2116  *
2117  * @param n the neighbour from which we received message m
2118  * @param m the set key message we received
2119  */
2120 static void
2121 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2122
2123
2124 /**
2125  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2126  * the neighbour's struct and retry handling the set_key message.  Or,
2127  * if we did not get a HELLO, just free the set key message.
2128  *
2129  * @param cls pointer to the set key message
2130  * @param peer the peer for which this is the HELLO
2131  * @param hello HELLO message of that peer
2132  * @param trust amount of trust we currently have in that peer
2133  */
2134 static void
2135 process_hello_retry_handle_set_key (void *cls,
2136                                     const struct GNUNET_PeerIdentity *peer,
2137                                     const struct GNUNET_HELLO_Message *hello,
2138                                     uint32_t trust)
2139 {
2140   struct Neighbour *n = cls;
2141   struct SetKeyMessage *sm = n->skm;
2142
2143   if (peer == NULL)
2144     {
2145       GNUNET_free (sm);
2146       n->skm = NULL;
2147       n->pitr = NULL;
2148       return;
2149     }
2150   if (n->public_key != NULL)
2151     return;                     /* multiple HELLOs match!? */
2152   n->public_key =
2153     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2154   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2155     {
2156       GNUNET_break_op (0);
2157       GNUNET_free (n->public_key);
2158       n->public_key = NULL;
2159       return;
2160     }
2161 #if DEBUG_CORE
2162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2163               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2164               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2165 #endif
2166   handle_set_key (n, sm);
2167 }
2168
2169
2170 /**
2171  * We received a PING message.  Validate and transmit
2172  * PONG.
2173  *
2174  * @param n sender of the PING
2175  * @param m the encrypted PING message itself
2176  */
2177 static void
2178 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2179 {
2180   struct PingMessage t;
2181   struct PingMessage *tp;
2182   struct MessageEntry *me;
2183
2184 #if DEBUG_CORE
2185   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2186               "Core service receives `%s' request from `%4s'.\n",
2187               "PING", GNUNET_i2s (&n->peer));
2188 #endif
2189   if (GNUNET_OK !=
2190       do_decrypt (n,
2191                   &my_identity.hashPubKey,
2192                   &m->challenge,
2193                   &t.challenge,
2194                   sizeof (struct PingMessage) -
2195                   sizeof (struct GNUNET_MessageHeader)))
2196     return;
2197 #if DEBUG_CORE
2198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2199               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2200               "PING",
2201               GNUNET_i2s (&t.target),
2202               ntohl (t.challenge), n->decrypt_key.crc32);
2203   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2204               "Target of `%s' request is `%4s'.\n",
2205               "PING", GNUNET_i2s (&t.target));
2206 #endif
2207   if (0 != memcmp (&t.target,
2208                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2209     {
2210       GNUNET_break_op (0);
2211       return;
2212     }
2213   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2214                       sizeof (struct PingMessage));
2215   if (n->encrypted_tail != NULL)
2216     n->encrypted_tail->next = me;
2217   else
2218     {
2219       n->encrypted_tail = me;
2220       n->encrypted_head = me;
2221     }
2222   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2223   me->priority = PONG_PRIORITY;
2224   me->size = sizeof (struct PingMessage);
2225   tp = (struct PingMessage *) &me[1];
2226   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2227   tp->header.size = htons (sizeof (struct PingMessage));
2228   do_encrypt (n,
2229               &my_identity.hashPubKey,
2230               &t.challenge,
2231               &tp->challenge,
2232               sizeof (struct PingMessage) -
2233               sizeof (struct GNUNET_MessageHeader));
2234 #if DEBUG_CORE
2235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2236               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2237               ntohl (t.challenge), n->encrypt_key.crc32);
2238 #endif
2239   /* trigger queue processing */
2240   process_encrypted_neighbour_queue (n);
2241 }
2242
2243
2244 /**
2245  * We received a SET_KEY message.  Validate and update
2246  * our key material and status.
2247  *
2248  * @param n the neighbour from which we received message m
2249  * @param m the set key message we received
2250  */
2251 static void
2252 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2253 {
2254   struct SetKeyMessage *m_cpy;
2255   struct GNUNET_TIME_Absolute t;
2256   struct GNUNET_CRYPTO_AesSessionKey k;
2257   struct PingMessage *ping;
2258   enum PeerStateMachine sender_status;
2259
2260 #if DEBUG_CORE
2261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2262               "Core service receives `%s' request from `%4s'.\n",
2263               "SET_KEY", GNUNET_i2s (&n->peer));
2264 #endif
2265   if (n->public_key == NULL)
2266     {
2267 #if DEBUG_CORE
2268       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2269                   "Lacking public key for peer, trying to obtain one.\n");
2270 #endif
2271       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2272       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2273       /* lookup n's public key, then try again */
2274       GNUNET_assert (n->pitr == NULL);
2275       GNUNET_assert (n->skm == NULL);
2276       n->skm = m_cpy;
2277       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2278                                          sched,
2279                                          &n->peer,
2280                                          0,
2281                                          GNUNET_TIME_UNIT_MINUTES,
2282                                          &process_hello_retry_handle_set_key, n);
2283       return;
2284     }
2285   if (0 != memcmp (&m->target,
2286                    &my_identity,
2287                    sizeof (struct GNUNET_PeerIdentity)))
2288     {
2289       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2290                   _("Received `%s' message that was not for me.  Ignoring.\n"));
2291       return;
2292     }
2293   if ((ntohl (m->purpose.size) !=
2294        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2295        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2296        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2297        sizeof (struct GNUNET_PeerIdentity)) ||
2298       (GNUNET_OK !=
2299        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2300                                  &m->purpose, &m->signature, n->public_key)))
2301     {
2302       /* invalid signature */
2303       GNUNET_break_op (0);
2304       return;
2305     }
2306   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2307   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2308        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2309       (t.value < n->decrypt_key_created.value))
2310     {
2311       /* this could rarely happen due to massive re-ordering of
2312          messages on the network level, but is most likely either
2313          a bug or some adversary messing with us.  Report. */
2314       GNUNET_break_op (0);
2315       return;
2316     }
2317 #if DEBUG_CORE
2318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2319 #endif  
2320   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2321                                   &m->encrypted_key,
2322                                   &k,
2323                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2324        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2325       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2326     {
2327       /* failed to decrypt !? */
2328       GNUNET_break_op (0);
2329       return;
2330     }
2331
2332   n->decrypt_key = k;
2333   if (n->decrypt_key_created.value != t.value)
2334     {
2335       /* fresh key, reset sequence numbers */
2336       n->last_sequence_number_received = 0;
2337       n->last_packets_bitmap = 0;
2338       n->decrypt_key_created = t;
2339     }
2340   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2341   switch (n->status)
2342     {
2343     case PEER_STATE_DOWN:
2344       n->status = PEER_STATE_KEY_RECEIVED;
2345 #if DEBUG_CORE
2346       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2347                   "Responding to `%s' with my own key.\n", "SET_KEY");
2348 #endif
2349       send_key (n);
2350       break;
2351     case PEER_STATE_KEY_SENT:
2352     case PEER_STATE_KEY_RECEIVED:
2353       n->status = PEER_STATE_KEY_RECEIVED;
2354       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2355           (sender_status != PEER_STATE_KEY_CONFIRMED))
2356         {
2357 #if DEBUG_CORE
2358           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2359                       "Responding to `%s' with my own key (other peer has status %u).\n",
2360                       "SET_KEY", sender_status);
2361 #endif
2362           send_key (n);
2363         }
2364       break;
2365     case PEER_STATE_KEY_CONFIRMED:
2366       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2367           (sender_status != PEER_STATE_KEY_CONFIRMED))
2368         {         
2369 #if DEBUG_CORE
2370           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2371                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2372                       "SET_KEY", sender_status);
2373 #endif
2374           send_key (n);
2375         }
2376       break;
2377     default:
2378       GNUNET_break (0);
2379       break;
2380     }
2381   if (n->pending_ping != NULL)
2382     {
2383       ping = n->pending_ping;
2384       n->pending_ping = NULL;
2385       handle_ping (n, ping);
2386       GNUNET_free (ping);
2387     }
2388 }
2389
2390
2391 /**
2392  * We received a PONG message.  Validate and update our status.
2393  *
2394  * @param n sender of the PONG
2395  * @param m the encrypted PONG message itself
2396  */
2397 static void
2398 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2399 {
2400   struct PingMessage t;
2401   struct ConnectNotifyMessage cnm;
2402
2403 #if DEBUG_CORE
2404   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2405               "Core service receives `%s' request from `%4s'.\n",
2406               "PONG", GNUNET_i2s (&n->peer));
2407 #endif
2408   if (GNUNET_OK !=
2409       do_decrypt (n,
2410                   &n->peer.hashPubKey,
2411                   &m->challenge,
2412                   &t.challenge,
2413                   sizeof (struct PingMessage) -
2414                   sizeof (struct GNUNET_MessageHeader)))
2415     return;
2416 #if DEBUG_CORE
2417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2418               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2419               "PONG",
2420               GNUNET_i2s (&t.target),
2421               ntohl (t.challenge), n->decrypt_key.crc32);
2422 #endif
2423   if ((0 != memcmp (&t.target,
2424                     &n->peer,
2425                     sizeof (struct GNUNET_PeerIdentity))) ||
2426       (n->ping_challenge != ntohl (t.challenge)))
2427     {
2428       /* PONG malformed */
2429 #if DEBUG_CORE
2430       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2431                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2432                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2433       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2434                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2435                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2436 #endif
2437       GNUNET_break_op (0);
2438       return;
2439     }
2440   switch (n->status)
2441     {
2442     case PEER_STATE_DOWN:
2443       GNUNET_break (0);         /* should be impossible */
2444       return;
2445     case PEER_STATE_KEY_SENT:
2446       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2447       return;
2448     case PEER_STATE_KEY_RECEIVED:
2449       n->status = PEER_STATE_KEY_CONFIRMED;
2450       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2451         {
2452           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2453           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2454         }      
2455       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2456       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2457       cnm.distance = htonl (n->last_distance);
2458       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2459       cnm.peer = n->peer;
2460       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2461       process_encrypted_neighbour_queue (n);
2462       break;
2463     case PEER_STATE_KEY_CONFIRMED:
2464       /* duplicate PONG? */
2465       break;
2466     default:
2467       GNUNET_break (0);
2468       break;
2469     }
2470 }
2471
2472
2473 /**
2474  * Send a P2P message to a client.
2475  *
2476  * @param sender who sent us the message?
2477  * @param client who should we give the message to?
2478  * @param m contains the message to transmit
2479  * @param msize number of bytes in buf to transmit
2480  */
2481 static void
2482 send_p2p_message_to_client (struct Neighbour *sender,
2483                             struct Client *client,
2484                             const void *m, size_t msize)
2485 {
2486   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2487   struct NotifyTrafficMessage *ntm;
2488
2489 #if DEBUG_CORE
2490   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2491               "Core service passes message from `%4s' of type %u to client.\n",
2492               GNUNET_i2s(&sender->peer),
2493               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2494 #endif
2495   ntm = (struct NotifyTrafficMessage *) buf;
2496   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2497   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2498   ntm->distance = htonl (sender->last_distance);
2499   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2500   ntm->peer = sender->peer;
2501   memcpy (&ntm[1], m, msize);
2502   send_to_client (client, &ntm->header, GNUNET_YES);
2503 }
2504
2505
2506 /**
2507  * Deliver P2P message to interested clients.
2508  *
2509  * @param sender who sent us the message?
2510  * @param m the message
2511  * @param msize size of the message (including header)
2512  */
2513 static void
2514 deliver_message (struct Neighbour *sender,
2515                  const struct GNUNET_MessageHeader *m, size_t msize)
2516 {
2517   struct Client *cpos;
2518   uint16_t type;
2519   unsigned int tpos;
2520   int deliver_full;
2521
2522   type = ntohs (m->type);
2523   cpos = clients;
2524   while (cpos != NULL)
2525     {
2526       deliver_full = GNUNET_NO;
2527       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2528         deliver_full = GNUNET_YES;
2529       else
2530         {
2531           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2532             {
2533               if (type != cpos->types[tpos])
2534                 continue;
2535               deliver_full = GNUNET_YES;
2536               break;
2537             }
2538         }
2539       if (GNUNET_YES == deliver_full)
2540         send_p2p_message_to_client (sender, cpos, m, msize);
2541       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2542         send_p2p_message_to_client (sender, cpos, m,
2543                                     sizeof (struct GNUNET_MessageHeader));
2544       cpos = cpos->next;
2545     }
2546 }
2547
2548
2549 /**
2550  * Align P2P message and then deliver to interested clients.
2551  *
2552  * @param sender who sent us the message?
2553  * @param buffer unaligned (!) buffer containing message
2554  * @param msize size of the message (including header)
2555  */
2556 static void
2557 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2558 {
2559   char abuf[msize];
2560
2561   /* TODO: call to statistics? */
2562   memcpy (abuf, buffer, msize);
2563   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2564 }
2565
2566
2567 /**
2568  * Deliver P2P messages to interested clients.
2569  *
2570  * @param sender who sent us the message?
2571  * @param buffer buffer containing messages, can be modified
2572  * @param buffer_size size of the buffer (overall)
2573  * @param offset offset where messages in the buffer start
2574  */
2575 static void
2576 deliver_messages (struct Neighbour *sender,
2577                   const char *buffer, size_t buffer_size, size_t offset)
2578 {
2579   struct GNUNET_MessageHeader *mhp;
2580   struct GNUNET_MessageHeader mh;
2581   uint16_t msize;
2582   int need_align;
2583
2584   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2585     {
2586       if (0 != offset % sizeof (uint16_t))
2587         {
2588           /* outch, need to copy to access header */
2589           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2590           mhp = &mh;
2591         }
2592       else
2593         {
2594           /* can access header directly */
2595           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2596         }
2597       msize = ntohs (mhp->size);
2598       if (msize + offset > buffer_size)
2599         {
2600           /* malformed message, header says it is larger than what
2601              would fit into the overall buffer */
2602           GNUNET_break_op (0);
2603           break;
2604         }
2605 #if HAVE_UNALIGNED_64_ACCESS
2606       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2607 #else
2608       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2609 #endif
2610       if (GNUNET_YES == need_align)
2611         align_and_deliver (sender, &buffer[offset], msize);
2612       else
2613         deliver_message (sender,
2614                          (const struct GNUNET_MessageHeader *)
2615                          &buffer[offset], msize);
2616       offset += msize;
2617     }
2618 }
2619
2620
2621 /**
2622  * We received an encrypted message.  Decrypt, validate and
2623  * pass on to the appropriate clients.
2624  */
2625 static void
2626 handle_encrypted_message (struct Neighbour *n,
2627                           const struct EncryptedMessage *m)
2628 {
2629   size_t size = ntohs (m->header.size);
2630   char buf[size];
2631   struct EncryptedMessage *pt;  /* plaintext */
2632   GNUNET_HashCode ph;
2633   size_t off;
2634   uint32_t snum;
2635   struct GNUNET_TIME_Absolute t;
2636
2637 #if DEBUG_CORE
2638   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2639               "Core service receives `%s' request from `%4s'.\n",
2640               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2641 #endif  
2642   /* decrypt */
2643   if (GNUNET_OK !=
2644       do_decrypt (n,
2645                   &m->plaintext_hash,
2646                   &m->sequence_number,
2647                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2648     return;
2649   pt = (struct EncryptedMessage *) buf;
2650
2651   /* validate hash */
2652   GNUNET_CRYPTO_hash (&pt->sequence_number,
2653                       size - ENCRYPTED_HEADER_SIZE, &ph);
2654   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2655     {
2656       /* checksum failed */
2657       GNUNET_break_op (0);
2658       return;
2659     }
2660
2661   /* validate sequence number */
2662   snum = ntohl (pt->sequence_number);
2663   if (n->last_sequence_number_received == snum)
2664     {
2665       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2666                   "Received duplicate message, ignoring.\n");
2667       /* duplicate, ignore */
2668       return;
2669     }
2670   if ((n->last_sequence_number_received > snum) &&
2671       (n->last_sequence_number_received - snum > 32))
2672     {
2673       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2674                   "Received ancient out of sequence message, ignoring.\n");
2675       /* ancient out of sequence, ignore */
2676       return;
2677     }
2678   if (n->last_sequence_number_received > snum)
2679     {
2680       unsigned int rotbit =
2681         1 << (n->last_sequence_number_received - snum - 1);
2682       if ((n->last_packets_bitmap & rotbit) != 0)
2683         {
2684           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2685                       "Received duplicate message, ignoring.\n");
2686           /* duplicate, ignore */
2687           return;
2688         }
2689       n->last_packets_bitmap |= rotbit;
2690     }
2691   if (n->last_sequence_number_received < snum)
2692     {
2693       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2694       n->last_sequence_number_received = snum;
2695     }
2696
2697   /* check timestamp */
2698   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2699   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2700     {
2701       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2702                   _
2703                   ("Message received far too old (%llu ms). Content ignored.\n"),
2704                   GNUNET_TIME_absolute_get_duration (t).value);
2705       return;
2706     }
2707
2708   /* process decrypted message(s) */
2709   update_window (GNUNET_YES,
2710                  &n->available_send_window,
2711                  &n->last_asw_update,
2712                  n->bpm_out);
2713   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2714   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2715                            n->bpm_out_internal_limit);
2716   n->last_activity = GNUNET_TIME_absolute_get ();
2717   off = sizeof (struct EncryptedMessage);
2718   deliver_messages (n, buf, size, off);
2719 }
2720
2721
2722 /**
2723  * Function called by the transport for each received message.
2724  *
2725  * @param cls closure
2726  * @param peer (claimed) identity of the other peer
2727  * @param message the message
2728  * @param latency estimated latency for communicating with the
2729  *             given peer (round-trip)
2730  * @param distance in overlay hops, as given by transport plugin
2731  */
2732 static void
2733 handle_transport_receive (void *cls,
2734                           const struct GNUNET_PeerIdentity *peer,
2735                           const struct GNUNET_MessageHeader *message,
2736                           struct GNUNET_TIME_Relative latency,
2737                           unsigned int distance)
2738 {
2739   struct Neighbour *n;
2740   struct GNUNET_TIME_Absolute now;
2741   int up;
2742   uint16_t type;
2743   uint16_t size;
2744
2745 #if DEBUG_CORE
2746   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2747               "Received message of type %u from `%4s', demultiplexing.\n",
2748               ntohs (message->type), GNUNET_i2s (peer));
2749 #endif
2750   n = find_neighbour (peer);
2751   if (n == NULL)
2752     {
2753       GNUNET_break (0);
2754       return;
2755     }
2756   n->last_latency = latency;
2757   n->last_distance = distance;
2758   up = (n->status == PEER_STATE_KEY_CONFIRMED);
2759   type = ntohs (message->type);
2760   size = ntohs (message->size);
2761   switch (type)
2762     {
2763     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2764       if (size != sizeof (struct SetKeyMessage))
2765         {
2766           GNUNET_break_op (0);
2767           return;
2768         }
2769       handle_set_key (n, (const struct SetKeyMessage *) message);
2770       break;
2771     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2772       if (size < sizeof (struct EncryptedMessage) +
2773           sizeof (struct GNUNET_MessageHeader))
2774         {
2775           GNUNET_break_op (0);
2776           return;
2777         }
2778       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2779           (n->status != PEER_STATE_KEY_CONFIRMED))
2780         {
2781           GNUNET_break_op (0);
2782           return;
2783         }
2784       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2785       break;
2786     case GNUNET_MESSAGE_TYPE_CORE_PING:
2787       if (size != sizeof (struct PingMessage))
2788         {
2789           GNUNET_break_op (0);
2790           return;
2791         }
2792       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2793           (n->status != PEER_STATE_KEY_CONFIRMED))
2794         {
2795 #if DEBUG_CORE
2796           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2797                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2798                       "PING", GNUNET_i2s (&n->peer));
2799 #endif
2800           GNUNET_free_non_null (n->pending_ping);
2801           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2802           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2803           return;
2804         }
2805       handle_ping (n, (const struct PingMessage *) message);
2806       break;
2807     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2808       if (size != sizeof (struct PingMessage))
2809         {
2810           GNUNET_break_op (0);
2811           return;
2812         }
2813       if ((n->status != PEER_STATE_KEY_SENT) &&
2814           (n->status != PEER_STATE_KEY_RECEIVED) &&
2815           (n->status != PEER_STATE_KEY_CONFIRMED))
2816         {
2817           /* could not decrypt pong, oops! */
2818           GNUNET_break_op (0);
2819           return;
2820         }
2821       handle_pong (n, (const struct PingMessage *) message);
2822       break;
2823     default:
2824       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2825                   _("Unsupported message of type %u received.\n"), type);
2826       return;
2827     }
2828   if (n->status == PEER_STATE_KEY_CONFIRMED)
2829     {
2830       now = GNUNET_TIME_absolute_get ();
2831       n->last_activity = now;
2832       if (!up)
2833         n->time_established = now;
2834     }
2835 }
2836
2837
2838 /**
2839  * Function that recalculates the bandwidth quota for the
2840  * given neighbour and transmits it to the transport service.
2841  * 
2842  * @param cls neighbour for the quota update
2843  * @param tc context
2844  */
2845 static void
2846 neighbour_quota_update (void *cls,
2847                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2848
2849
2850 /**
2851  * Schedule the task that will recalculate the bandwidth
2852  * quota for this peer (and possibly force a disconnect of
2853  * idle peers by calculating a bandwidth of zero).
2854  */
2855 static void
2856 schedule_quota_update (struct Neighbour *n)
2857 {
2858   GNUNET_assert (n->quota_update_task ==
2859                  GNUNET_SCHEDULER_NO_TASK);
2860   n->quota_update_task
2861     = GNUNET_SCHEDULER_add_delayed (sched,
2862                                     QUOTA_UPDATE_FREQUENCY,
2863                                     &neighbour_quota_update,
2864                                     n);
2865 }
2866
2867
2868 /**
2869  * Function that recalculates the bandwidth quota for the
2870  * given neighbour and transmits it to the transport service.
2871  * 
2872  * @param cls neighbour for the quota update
2873  * @param tc context
2874  */
2875 static void
2876 neighbour_quota_update (void *cls,
2877                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2878 {
2879   struct Neighbour *n = cls;
2880   uint32_t q_in;
2881   double pref_rel;
2882   double share;
2883   unsigned long long distributable;
2884   
2885   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
2886   /* calculate relative preference among all neighbours;
2887      divides by a bit more to avoid division by zero AND to
2888      account for possibility of new neighbours joining any time 
2889      AND to convert to double... */
2890   pref_rel = n->current_preference / (1.0 + preference_sum);
2891   distributable = 0;
2892   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2893     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2894   share = distributable * pref_rel;
2895   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2896   /* check if we want to disconnect for good due to inactivity */
2897   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2898        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2899     q_in = 0; /* force disconnect */
2900   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2901        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2902     {
2903       n->bpm_in = q_in;
2904       GNUNET_TRANSPORT_set_quota (transport,
2905                                   &n->peer,
2906                                   n->bpm_in, 
2907                                   n->bpm_out,
2908                                   GNUNET_TIME_UNIT_FOREVER_REL,
2909                                   NULL, NULL);
2910     }
2911   schedule_quota_update (n);
2912 }
2913
2914
2915 /**
2916  * Function called by transport to notify us that
2917  * a peer connected to us (on the network level).
2918  *
2919  * @param cls closure
2920  * @param peer the peer that connected
2921  * @param latency current latency of the connection
2922  * @param distance in overlay hops, as given by transport plugin
2923  */
2924 static void
2925 handle_transport_notify_connect (void *cls,
2926                                  const struct GNUNET_PeerIdentity *peer,
2927                                  struct GNUNET_TIME_Relative latency,
2928                                  unsigned int distance)
2929 {
2930   struct Neighbour *n;
2931   struct GNUNET_TIME_Absolute now;
2932   struct ConnectNotifyMessage cnm;
2933
2934   n = find_neighbour (peer);
2935   if (n != NULL)
2936     {
2937       /* duplicate connect notification!? */
2938       GNUNET_break (0);
2939       return;
2940     }
2941   now = GNUNET_TIME_absolute_get ();
2942   n = GNUNET_malloc (sizeof (struct Neighbour));
2943   n->next = neighbours;
2944   neighbours = n;
2945   neighbour_count++;
2946   n->peer = *peer;
2947   n->last_latency = latency;
2948   n->last_distance = distance;
2949   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2950   n->encrypt_key_created = now;
2951   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2952   n->last_asw_update = now;
2953   n->last_arw_update = now;
2954   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2955   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2956   n->bpm_out_internal_limit = (uint32_t) - 1;
2957   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2958   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2959                                                 (uint32_t) - 1);
2960 #if DEBUG_CORE
2961   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2962               "Received connection from `%4s'.\n",
2963               GNUNET_i2s (&n->peer));
2964 #endif
2965   schedule_quota_update (n);
2966   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2967   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
2968   cnm.distance = htonl (n->last_distance);
2969   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2970   cnm.peer = *peer;
2971   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
2972   send_key (n);
2973 }
2974
2975
2976 /**
2977  * Free the given entry for the neighbour (it has
2978  * already been removed from the list at this point).
2979  *
2980  * @param n neighbour to free
2981  */
2982 static void
2983 free_neighbour (struct Neighbour *n)
2984 {
2985   struct MessageEntry *m;
2986
2987   if (n->pitr != NULL)
2988     {
2989       GNUNET_PEERINFO_iterate_cancel (n->pitr);
2990       n->pitr = NULL;
2991     }
2992   if (n->skm != NULL)
2993     {
2994       GNUNET_free (n->skm);
2995       n->skm = NULL;
2996     }
2997   while (NULL != (m = n->messages))
2998     {
2999       n->messages = m->next;
3000       GNUNET_free (m);
3001     }
3002   while (NULL != (m = n->encrypted_head))
3003     {
3004       n->encrypted_head = m->next;
3005       GNUNET_free (m);
3006     }
3007   if (NULL != n->th)
3008     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3009   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
3010     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3011   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
3012     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3013   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
3014     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3015   GNUNET_free_non_null (n->public_key);
3016   GNUNET_free_non_null (n->pending_ping);
3017   GNUNET_free (n);
3018 }
3019
3020
3021 /**
3022  * Function called by transport telling us that a peer
3023  * disconnected.
3024  *
3025  * @param cls closure
3026  * @param peer the peer that disconnected
3027  */
3028 static void
3029 handle_transport_notify_disconnect (void *cls,
3030                                     const struct GNUNET_PeerIdentity *peer)
3031 {
3032   struct DisconnectNotifyMessage cnm;
3033   struct Neighbour *n;
3034   struct Neighbour *p;
3035
3036 #if DEBUG_CORE
3037   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3038               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3039 #endif
3040   p = NULL;
3041   n = neighbours;
3042   while ((n != NULL) &&
3043          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3044     {
3045       p = n;
3046       n = n->next;
3047     }
3048   if (n == NULL)
3049     {
3050       GNUNET_break (0);
3051       return;
3052     }
3053   if (p == NULL)
3054     neighbours = n->next;
3055   else
3056     p->next = n->next;
3057   GNUNET_assert (neighbour_count > 0);
3058   neighbour_count--;
3059   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3060   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3061   cnm.peer = *peer;
3062   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3063   free_neighbour (n);
3064 }
3065
3066
3067 /**
3068  * Last task run during shutdown.  Disconnects us from
3069  * the transport.
3070  */
3071 static void
3072 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3073 {
3074   struct Neighbour *n;
3075   struct Client *c;
3076
3077 #if DEBUG_CORE
3078   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3079               "Core service shutting down.\n");
3080 #endif
3081   GNUNET_assert (transport != NULL);
3082   GNUNET_TRANSPORT_disconnect (transport);
3083   transport = NULL;
3084   while (NULL != (n = neighbours))
3085     {
3086       neighbours = n->next;
3087       GNUNET_assert (neighbour_count > 0);
3088       neighbour_count--;
3089       free_neighbour (n);
3090     }
3091   GNUNET_SERVER_notification_context_destroy (notifier);
3092   notifier = NULL;
3093   while (NULL != (c = clients))
3094     handle_client_disconnect (NULL, c->client_handle);
3095   if (my_private_key != NULL)
3096     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3097 }
3098
3099
3100 /**
3101  * Initiate core service.
3102  *
3103  * @param cls closure
3104  * @param s scheduler to use
3105  * @param serv the initialized server
3106  * @param c configuration to use
3107  */
3108 static void
3109 run (void *cls,
3110      struct GNUNET_SCHEDULER_Handle *s,
3111      struct GNUNET_SERVER_Handle *serv,
3112      const struct GNUNET_CONFIGURATION_Handle *c)
3113 {
3114 #if 0
3115   unsigned long long qin;
3116   unsigned long long qout;
3117   unsigned long long tneigh;
3118 #endif
3119   char *keyfile;
3120
3121   sched = s;
3122   cfg = c;  
3123   /* parse configuration */
3124   if (
3125        (GNUNET_OK !=
3126         GNUNET_CONFIGURATION_get_value_number (c,
3127                                                "CORE",
3128                                                "TOTAL_QUOTA_IN",
3129                                                &bandwidth_target_in)) ||
3130        (GNUNET_OK !=
3131         GNUNET_CONFIGURATION_get_value_number (c,
3132                                                "CORE",
3133                                                "TOTAL_QUOTA_OUT",
3134                                                &bandwidth_target_out)) ||
3135        (GNUNET_OK !=
3136         GNUNET_CONFIGURATION_get_value_filename (c,
3137                                                  "GNUNETD",
3138                                                  "HOSTKEY", &keyfile)))
3139     {
3140       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3141                   _
3142                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3143       GNUNET_SCHEDULER_shutdown (s);
3144       return;
3145     }
3146   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3147   GNUNET_free (keyfile);
3148   if (my_private_key == NULL)
3149     {
3150       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3151                   _("Core service could not access hostkey.  Exiting.\n"));
3152       GNUNET_SCHEDULER_shutdown (s);
3153       return;
3154     }
3155   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3156   GNUNET_CRYPTO_hash (&my_public_key,
3157                       sizeof (my_public_key), &my_identity.hashPubKey);
3158   /* setup notification */
3159   server = serv;
3160   notifier = GNUNET_SERVER_notification_context_create (server, 
3161                                                         MAX_NOTIFY_QUEUE);
3162   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3163   /* setup transport connection */
3164   transport = GNUNET_TRANSPORT_connect (sched,
3165                                         cfg,
3166                                         NULL,
3167                                         &handle_transport_receive,
3168                                         &handle_transport_notify_connect,
3169                                         &handle_transport_notify_disconnect);
3170   GNUNET_assert (NULL != transport);
3171   GNUNET_SCHEDULER_add_delayed (sched,
3172                                 GNUNET_TIME_UNIT_FOREVER_REL,
3173                                 &cleaning_task, NULL);
3174   /* process client requests */
3175   GNUNET_SERVER_add_handlers (server, handlers);
3176   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3177               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3178 }
3179
3180
3181
3182 /**
3183  * The main function for the transport service.
3184  *
3185  * @param argc number of arguments from the command line
3186  * @param argv command line arguments
3187  * @return 0 ok, 1 on error
3188  */
3189 int
3190 main (int argc, char *const *argv)
3191 {
3192   return (GNUNET_OK ==
3193           GNUNET_SERVICE_run (argc,
3194                               argv,
3195                               "core",
3196                               GNUNET_SERVICE_OPTION_NONE,
3197                               &run, NULL)) ? 0 : 1;
3198 }
3199
3200 /* end of gnunet-service-core.c */