set last_distance
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - not all GNUNET_CORE_OPTION_SEND_* flags are fully supported yet
28  *   (i.e. no SEND_XXX_OUTBOUND).
29  * - 'REQUEST_DISCONNECT' is not implemented (transport API is lacking!)
30  *
31  * Considerations for later:
32  * - check that hostkey used by transport (for HELLOs) is the
33  *   same as the hostkey that we are using!
34  * - add code to send PINGs if we are about to time-out otherwise
35  * - optimize lookup (many O(n) list traversals
36  *   could ideally be changed to O(1) hash map lookups)
37  */
38 #include "platform.h"
39 #include "gnunet_constants.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet_hello_lib.h"
42 #include "gnunet_peerinfo_service.h"
43 #include "gnunet_protocols.h"
44 #include "gnunet_signatures.h"
45 #include "gnunet_transport_service.h"
46 #include "core.h"
47
48
49 /**
50  * Receive and send buffer windows grow over time.  For
51  * how long can 'unused' bandwidth accumulate before we
52  * need to cap it?  (specified in ms).
53  */
54 #define MAX_WINDOW_TIME (5 * 60 * 1000)
55
56 /**
57  * Minimum of bytes per minute (out) to assign to any connected peer.
58  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
59  * sense.
60  */
61 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
62
63 /**
64  * What is the smallest change (in number of bytes per minute)
65  * that we consider significant enough to bother triggering?
66  */
67 #define MIN_BPM_CHANGE 32
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What is the maximum delay for a SET_KEY message?
80  */
81 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
82
83 /**
84  * What how long do we wait for SET_KEY confirmation initially?
85  */
86 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
87
88 /**
89  * What is the maximum delay for a PING message?
90  */
91 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * What is the maximum delay for a PONG message?
95  */
96 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * How often do we recalculate bandwidth quotas?
100  */
101 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
102
103 /**
104  * What is the priority for a SET_KEY message?
105  */
106 #define SET_KEY_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PING message?
110  */
111 #define PING_PRIORITY 0xFFFFFF
112
113 /**
114  * What is the priority for a PONG message?
115  */
116 #define PONG_PRIORITY 0xFFFFFF
117
118 /**
119  * How many messages do we queue per peer at most?
120  */
121 #define MAX_PEER_QUEUE_SIZE 16
122
123 /**
124  * How many non-mandatory messages do we queue per client at most?
125  */
126 #define MAX_CLIENT_QUEUE_SIZE 32
127
128 /**
129  * What is the maximum age of a message for us to consider
130  * processing it?  Note that this looks at the timestamp used
131  * by the other peer, so clock skew between machines does
132  * come into play here.  So this should be picked high enough
133  * so that a little bit of clock skew does not prevent peers
134  * from connecting to us.
135  */
136 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
137
138 /**
139  * What is the maximum size for encrypted messages?  Note that this
140  * number imposes a clear limit on the maximum size of any message.
141  * Set to a value close to 64k but not so close that transports will
142  * have trouble with their headers.
143  */
144 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
145
146
147 /**
148  * State machine for our P2P encryption handshake.  Everyone starts in
149  * "DOWN", if we receive the other peer's key (other peer initiated)
150  * we start in state RECEIVED (since we will immediately send our
151  * own); otherwise we start in SENT.  If we get back a PONG from
152  * within either state, we move up to CONFIRMED (the PONG will always
153  * be sent back encrypted with the key we sent to the other peer).
154  */
155 enum PeerStateMachine
156 {
157   PEER_STATE_DOWN,
158   PEER_STATE_KEY_SENT,
159   PEER_STATE_KEY_RECEIVED,
160   PEER_STATE_KEY_CONFIRMED
161 };
162
163
164 /**
165  * Number of bytes (at the beginning) of "struct EncryptedMessage"
166  * that are NOT encrypted.
167  */
168 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
169
170
171 /**
172  * Encapsulation for encrypted messages exchanged between
173  * peers.  Followed by the actual encrypted data.
174  */
175 struct EncryptedMessage
176 {
177   /**
178    * Message type is either CORE_ENCRYPTED_MESSAGE.
179    */
180   struct GNUNET_MessageHeader header;
181
182   /**
183    * Always zero.
184    */
185   uint32_t reserved GNUNET_PACKED;
186
187   /**
188    * Hash of the plaintext, used to verify message integrity;
189    * ALSO used as the IV for the symmetric cipher!  Everything
190    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
191    * must be set to the offset of the next field.
192    */
193   GNUNET_HashCode plaintext_hash;
194
195   /**
196    * Sequence number, in network byte order.  This field
197    * must be the first encrypted/decrypted field and the
198    * first byte that is hashed for the plaintext hash.
199    */
200   uint32_t sequence_number GNUNET_PACKED;
201
202   /**
203    * Desired bandwidth (how much we should send to this
204    * peer / how much is the sender willing to receive),
205    * in bytes per minute.
206    */
207   uint32_t inbound_bpm_limit GNUNET_PACKED;
208
209   /**
210    * Timestamp.  Used to prevent reply of ancient messages
211    * (recent messages are caught with the sequence number).
212    */
213   struct GNUNET_TIME_AbsoluteNBO timestamp;
214
215 };
216
217 /**
218  * We're sending an (encrypted) PING to the other peer to check if he
219  * can decrypt.  The other peer should respond with a PONG with the
220  * same content, except this time encrypted with the receiver's key.
221  */
222 struct PingMessage
223 {
224   /**
225    * Message type is either CORE_PING or CORE_PONG.
226    */
227   struct GNUNET_MessageHeader header;
228
229   /**
230    * Random number chosen to make reply harder.
231    */
232   uint32_t challenge GNUNET_PACKED;
233
234   /**
235    * Intended target of the PING, used primarily to check
236    * that decryption actually worked.
237    */
238   struct GNUNET_PeerIdentity target;
239 };
240
241
242 /**
243  * Message transmitted to set (or update) a session key.
244  */
245 struct SetKeyMessage
246 {
247
248   /**
249    * Message type is either CORE_SET_KEY.
250    */
251   struct GNUNET_MessageHeader header;
252
253   /**
254    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
255    */
256   int32_t sender_status GNUNET_PACKED;
257
258   /**
259    * Purpose of the signature, will be
260    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
261    */
262   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
263
264   /**
265    * At what time was this key created?
266    */
267   struct GNUNET_TIME_AbsoluteNBO creation_time;
268
269   /**
270    * The encrypted session key.
271    */
272   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
273
274   /**
275    * Who is the intended recipient?
276    */
277   struct GNUNET_PeerIdentity target;
278
279   /**
280    * Signature of the stuff above (starting at purpose).
281    */
282   struct GNUNET_CRYPTO_RsaSignature signature;
283
284 };
285
286
287 /**
288  * Message waiting for transmission. This struct
289  * is followed by the actual content of the message.
290  */
291 struct MessageEntry
292 {
293
294   /**
295    * We keep messages in a linked list (for now).
296    */
297   struct MessageEntry *next;
298
299   /**
300    * By when are we supposed to transmit this message?
301    */
302   struct GNUNET_TIME_Absolute deadline;
303
304   /**
305    * How important is this message to us?
306    */
307   unsigned int priority;
308
309   /**
310    * How long is the message? (number of bytes following
311    * the "struct MessageEntry", but not including the
312    * size of "struct MessageEntry" itself!)
313    */
314   uint16_t size;
315
316   /**
317    * Was this message selected for transmission in the
318    * current round? GNUNET_YES or GNUNET_NO.
319    */
320   int16_t do_transmit;
321
322 };
323
324
325 struct Neighbour
326 {
327   /**
328    * We keep neighbours in a linked list (for now).
329    */
330   struct Neighbour *next;
331
332   /**
333    * Unencrypted messages destined for this peer.
334    */
335   struct MessageEntry *messages;
336
337   /**
338    * Head of the batched, encrypted message queue (already ordered,
339    * transmit starting with the head).
340    */
341   struct MessageEntry *encrypted_head;
342
343   /**
344    * Tail of the batched, encrypted message queue (already ordered,
345    * append new messages to tail)
346    */
347   struct MessageEntry *encrypted_tail;
348
349   /**
350    * Handle for pending requests for transmission to this peer
351    * with the transport service.  NULL if no request is pending.
352    */
353   struct GNUNET_TRANSPORT_TransmitHandle *th;
354
355   /**
356    * Public key of the neighbour, NULL if we don't have it yet.
357    */
358   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
359
360   /**
361    * We received a PING message before we got the "public_key"
362    * (or the SET_KEY).  We keep it here until we have a key
363    * to decrypt it.  NULL if no PING is pending.
364    */
365   struct PingMessage *pending_ping;
366
367   /**
368    * Non-NULL if we are currently looking up HELLOs for this peer.
369    * for this peer.
370    */
371   struct GNUNET_PEERINFO_IteratorContext *pitr;
372
373   /**
374    * SetKeyMessage to transmit, NULL if we are not currently trying
375    * to send one.
376    */
377   struct SetKeyMessage *skm;
378
379   /**
380    * Identity of the neighbour.
381    */
382   struct GNUNET_PeerIdentity peer;
383
384   /**
385    * Key we use to encrypt our messages for the other peer
386    * (initialized by us when we do the handshake).
387    */
388   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
389
390   /**
391    * Key we use to decrypt messages from the other peer
392    * (given to us by the other peer during the handshake).
393    */
394   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
395
396   /**
397    * ID of task used for re-trying plaintext scheduling.
398    */
399   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
400
401   /**
402    * ID of task used for re-trying SET_KEY and PING message.
403    */
404   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
405
406   /**
407    * ID of task used for updating bandwidth quota for this neighbour.
408    */
409   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
410
411   /**
412    * At what time did we generate our encryption key?
413    */
414   struct GNUNET_TIME_Absolute encrypt_key_created;
415
416   /**
417    * At what time did the other peer generate the decryption key?
418    */
419   struct GNUNET_TIME_Absolute decrypt_key_created;
420
421   /**
422    * At what time did we initially establish (as in, complete session
423    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
424    */
425   struct GNUNET_TIME_Absolute time_established;
426
427   /**
428    * At what time did we last receive an encrypted message from the
429    * other peer?  Should be zero if status != KEY_CONFIRMED.
430    */
431   struct GNUNET_TIME_Absolute last_activity;
432
433   /**
434    * Last latency observed from this peer.
435    */
436   struct GNUNET_TIME_Relative last_latency;
437
438   /**
439    * At what frequency are we currently re-trying SET_KEY messages?
440    */
441   struct GNUNET_TIME_Relative set_key_retry_frequency;
442
443   /**
444    * Time of our last update to the "available_send_window".
445    */
446   struct GNUNET_TIME_Absolute last_asw_update;
447
448   /**
449    * Time of our last update to the "available_recv_window".
450    */
451   struct GNUNET_TIME_Absolute last_arw_update;
452
453   /**
454    * Number of bytes that we are eligible to transmit to this
455    * peer at this point.  Incremented every minute by max_out_bpm,
456    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
457    * bandwidth-hogs are sampled at a frequency of about 78s!);
458    * may get negative if we have VERY high priority content.
459    */
460   long long available_send_window; 
461
462   /**
463    * How much downstream capacity of this peer has been reserved for
464    * our traffic?  (Our clients can request that a certain amount of
465    * bandwidth is available for replies to them; this value is used to
466    * make sure that this reserved amount of bandwidth is actually
467    * available).
468    */
469   long long available_recv_window; 
470
471   /**
472    * How valueable were the messages of this peer recently?
473    */
474   unsigned long long current_preference;
475
476   /**
477    * Bit map indicating which of the 32 sequence numbers before the last
478    * were received (good for accepting out-of-order packets and
479    * estimating reliability of the connection)
480    */
481   unsigned int last_packets_bitmap;
482
483   /**
484    * Number of messages in the message queue for this peer.
485    */
486   unsigned int message_queue_size;
487
488   /**
489    * last sequence number received on this connection (highest)
490    */
491   uint32_t last_sequence_number_received;
492
493   /**
494    * last sequence number transmitted
495    */
496   uint32_t last_sequence_number_sent;
497
498   /**
499    * Available bandwidth in for this peer (current target).
500    */
501   uint32_t bpm_in;
502
503   /**
504    * Available bandwidth out for this peer (current target).
505    */
506   uint32_t bpm_out;
507
508   /**
509    * Internal bandwidth limit set for this peer (initially
510    * typically set to "-1").  "bpm_out" is MAX of
511    * "bpm_out_internal_limit" and "bpm_out_external_limit".
512    */
513   uint32_t bpm_out_internal_limit;
514
515   /**
516    * External bandwidth limit set for this peer by the
517    * peer that we are communicating with.  "bpm_out" is MAX of
518    * "bpm_out_internal_limit" and "bpm_out_external_limit".
519    */
520   uint32_t bpm_out_external_limit;
521
522   /**
523    * What was our PING challenge number (for this peer)?
524    */
525   uint32_t ping_challenge;
526
527   /**
528    * What was the last distance to this peer as reported by the transports?
529    */
530   uint32_t last_distance;
531
532   /**
533    * What is our connection status?
534    */
535   enum PeerStateMachine status;
536
537 };
538
539
540 /**
541  * Data structure for each client connected to the core service.
542  */
543 struct Client
544 {
545   /**
546    * Clients are kept in a linked list.
547    */
548   struct Client *next;
549
550   /**
551    * Handle for the client with the server API.
552    */
553   struct GNUNET_SERVER_Client *client_handle;
554
555   /**
556    * Array of the types of messages this peer cares
557    * about (with "tcnt" entries).  Allocated as part
558    * of this client struct, do not free!
559    */
560   uint16_t *types;
561
562   /**
563    * Options for messages this client cares about,
564    * see GNUNET_CORE_OPTION_ values.
565    */
566   uint32_t options;
567
568   /**
569    * Number of types of incoming messages this client
570    * specifically cares about.  Size of the "types" array.
571    */
572   unsigned int tcnt;
573
574 };
575
576
577 /**
578  * Our public key.
579  */
580 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
581
582 /**
583  * Our identity.
584  */
585 static struct GNUNET_PeerIdentity my_identity;
586
587 /**
588  * Our private key.
589  */
590 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
591
592 /**
593  * Our scheduler.
594  */
595 struct GNUNET_SCHEDULER_Handle *sched;
596
597 /**
598  * Our configuration.
599  */
600 const struct GNUNET_CONFIGURATION_Handle *cfg;
601
602 /**
603  * Our server.
604  */
605 static struct GNUNET_SERVER_Handle *server;
606
607 /**
608  * Transport service.
609  */
610 static struct GNUNET_TRANSPORT_Handle *transport;
611
612 /**
613  * Linked list of our clients.
614  */
615 static struct Client *clients;
616
617 /**
618  * Context for notifications we need to send to our clients.
619  */
620 static struct GNUNET_SERVER_NotificationContext *notifier;
621
622 /**
623  * We keep neighbours in a linked list (for now).
624  */
625 static struct Neighbour *neighbours;
626
627 /**
628  * Sum of all preferences among all neighbours.
629  */
630 static unsigned long long preference_sum;
631
632 /**
633  * Total number of neighbours we have.
634  */
635 static unsigned int neighbour_count;
636
637 /**
638  * How much inbound bandwidth are we supposed to be using?
639  */
640 static unsigned long long bandwidth_target_in;
641
642 /**
643  * How much outbound bandwidth are we supposed to be using?
644  */
645 static unsigned long long bandwidth_target_out;
646
647
648
649 /**
650  * A preference value for a neighbour was update.  Update
651  * the preference sum accordingly.
652  *
653  * @param inc how much was a preference value increased?
654  */
655 static void
656 update_preference_sum (unsigned long long inc)
657 {
658   struct Neighbour *n;
659   unsigned long long os;
660
661   os = preference_sum;
662   preference_sum += inc;
663   if (preference_sum >= os)
664     return; /* done! */
665   /* overflow! compensate by cutting all values in half! */
666   preference_sum = 0;
667   n = neighbours;
668   while (n != NULL)
669     {
670       n->current_preference /= 2;
671       preference_sum += n->current_preference;
672       n = n->next;
673     }    
674 }
675
676
677 /**
678  * Recalculate the number of bytes we expect to
679  * receive or transmit in a given window.
680  *
681  * @param force force an update now (even if not much time has passed)
682  * @param window pointer to the byte counter (updated)
683  * @param ts pointer to the timestamp (updated)
684  * @param bpm number of bytes per minute that should
685  *        be added to the window.
686  */
687 static void
688 update_window (int force,
689                long long *window,
690                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
691 {
692   struct GNUNET_TIME_Relative since;
693
694   since = GNUNET_TIME_absolute_get_duration (*ts);
695   if ( (force == GNUNET_NO) &&
696        (since.value < 60 * 1000) )
697     return;                     /* not even a minute has passed */
698   *ts = GNUNET_TIME_absolute_get ();
699   *window += (bpm * since.value) / 60 / 1000;
700   if (*window > MAX_WINDOW_TIME * bpm)
701     *window = MAX_WINDOW_TIME * bpm;
702 }
703
704
705 /**
706  * Find the entry for the given neighbour.
707  *
708  * @param peer identity of the neighbour
709  * @return NULL if we are not connected, otherwise the
710  *         neighbour's entry.
711  */
712 static struct Neighbour *
713 find_neighbour (const struct GNUNET_PeerIdentity *peer)
714 {
715   struct Neighbour *ret;
716
717   ret = neighbours;
718   while ((ret != NULL) &&
719          (0 != memcmp (&ret->peer,
720                        peer, sizeof (struct GNUNET_PeerIdentity))))
721     ret = ret->next;
722   return ret;
723 }
724
725
726 /**
727  * Send a message to one of our clients.
728  *
729  * @param client target for the message
730  * @param msg message to transmit
731  * @param can_drop could this message be dropped if the
732  *        client's queue is getting too large?
733  */
734 static void
735 send_to_client (struct Client *client,
736                 const struct GNUNET_MessageHeader *msg, 
737                 int can_drop)
738 {
739 #if DEBUG_CORE_CLIENT
740   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
741               "Preparing to send message of type %u to client.\n",
742               ntohs (msg->type));
743 #endif  
744   GNUNET_SERVER_notification_context_unicast (notifier,
745                                               client->client_handle,
746                                               msg,
747                                               can_drop);
748 }
749
750
751 /**
752  * Send a message to all of our current clients that have
753  * the right options set.
754  * 
755  * @param msg message to multicast
756  * @param can_drop can this message be discarded if the queue is too long
757  * @param options mask to use 
758  */
759 static void
760 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
761                      int can_drop,
762                      int options)
763 {
764   struct Client *c;
765
766   c = clients;
767   while (c != NULL)
768     {
769       if (0 != (c->options & options))
770         send_to_client (c, msg, can_drop);
771       c = c->next;
772     }
773 }
774
775
776 /**
777  * Handle CORE_INIT request.
778  */
779 static void
780 handle_client_init (void *cls,
781                     struct GNUNET_SERVER_Client *client,
782                     const struct GNUNET_MessageHeader *message)
783 {
784   const struct InitMessage *im;
785   struct InitReplyMessage irm;
786   struct Client *c;
787   uint16_t msize;
788   const uint16_t *types;
789   struct Neighbour *n;
790   struct ConnectNotifyMessage cnm;
791
792 #if DEBUG_CORE_CLIENT
793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794               "Client connecting to core service with `%s' message\n",
795               "INIT");
796 #endif
797   /* check that we don't have an entry already */
798   c = clients;
799   while (c != NULL)
800     {
801       if (client == c->client_handle)
802         {
803           GNUNET_break (0);
804           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
805           return;
806         }
807       c = c->next;
808     }
809   msize = ntohs (message->size);
810   if (msize < sizeof (struct InitMessage))
811     {
812       GNUNET_break (0);
813       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
814       return;
815     }
816   GNUNET_SERVER_notification_context_add (notifier, client);
817   im = (const struct InitMessage *) message;
818   types = (const uint16_t *) &im[1];
819   msize -= sizeof (struct InitMessage);
820   c = GNUNET_malloc (sizeof (struct Client) + msize);
821   c->client_handle = client;
822   c->next = clients;
823   clients = c;
824   memcpy (&c[1], types, msize);
825   c->types = (uint16_t *) & c[1];
826   c->options = ntohl (im->options);
827   c->tcnt = msize / sizeof (uint16_t);
828   /* send init reply message */
829   irm.header.size = htons (sizeof (struct InitReplyMessage));
830   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
831   irm.reserved = htonl (0);
832   memcpy (&irm.publicKey,
833           &my_public_key,
834           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
835 #if DEBUG_CORE_CLIENT
836   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
837               "Sending `%s' message to client.\n", "INIT_REPLY");
838 #endif
839   send_to_client (c, &irm.header, GNUNET_NO);
840   /* notify new client about existing neighbours */
841   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
842   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
843   n = neighbours;
844   while (n != NULL)
845     {
846 #if DEBUG_CORE_CLIENT
847       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
849 #endif
850       cnm.distance = htonl (n->last_distance);
851       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
852       cnm.peer = n->peer;
853       send_to_client (c, &cnm.header, GNUNET_NO);
854       n = n->next;
855     }
856 }
857
858
859 /**
860  * A client disconnected, clean up.
861  *
862  * @param cls closure
863  * @param client identification of the client
864  */
865 static void
866 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
867 {
868   struct Client *pos;
869   struct Client *prev;
870
871   if (client == NULL)
872     return;
873 #if DEBUG_CORE_CLIENT
874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
875               "Client has disconnected from core service.\n");
876 #endif
877   prev = NULL;
878   pos = clients;
879   while (pos != NULL)
880     {
881       if (client == pos->client_handle)
882         {
883           if (prev == NULL)
884             clients = pos->next;
885           else
886             prev->next = pos->next;
887           GNUNET_free (pos);
888           return;
889         }
890       prev = pos;
891       pos = pos->next;
892     }
893   /* client never sent INIT */
894 }
895
896
897 /**
898  * Handle REQUEST_INFO request.
899  */
900 static void
901 handle_client_request_info (void *cls,
902                             struct GNUNET_SERVER_Client *client,
903                             const struct GNUNET_MessageHeader *message)
904 {
905   const struct RequestInfoMessage *rcm;
906   struct Neighbour *n;
907   struct ConfigurationInfoMessage cim;
908   int reserv;
909   unsigned long long old_preference;
910   struct GNUNET_SERVER_TransmitContext *tc;
911
912 #if DEBUG_CORE_CLIENT
913   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914               "Core service receives `%s' request.\n", "REQUEST_INFO");
915 #endif
916   rcm = (const struct RequestInfoMessage *) message;
917   n = find_neighbour (&rcm->peer);
918   memset (&cim, 0, sizeof (cim));
919   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
920     {
921       update_window (GNUNET_YES,
922                      &n->available_send_window,
923                      &n->last_asw_update,
924                      n->bpm_out);
925       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
926       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
927                                n->bpm_out_external_limit);
928       reserv = ntohl (rcm->reserve_inbound);
929       if (reserv < 0)
930         {
931           n->available_recv_window += reserv;
932         }
933       else if (reserv > 0)
934         {
935           update_window (GNUNET_NO,
936                          &n->available_recv_window,
937                          &n->last_arw_update, n->bpm_in);
938           if (n->available_recv_window < reserv)
939             reserv = n->available_recv_window;
940           n->available_recv_window -= reserv;
941         }
942       old_preference = n->current_preference;
943       n->current_preference += GNUNET_ntohll(rcm->preference_change);
944       if (old_preference > n->current_preference) 
945         {
946           /* overflow; cap at maximum value */
947           n->current_preference = (unsigned long long) -1;
948         }
949       update_preference_sum (n->current_preference - old_preference);
950       cim.reserved_amount = htonl (reserv);
951       cim.bpm_in = htonl (n->bpm_in);
952       cim.bpm_out = htonl (n->bpm_out);
953       cim.preference = n->current_preference;
954     }
955   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
956   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
957   cim.peer = rcm->peer;
958
959 #if DEBUG_CORE_CLIENT
960   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
961               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
962 #endif
963   tc = GNUNET_SERVER_transmit_context_create (client);
964   GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
965   GNUNET_SERVER_transmit_context_run (tc,
966                                       GNUNET_TIME_UNIT_FOREVER_REL);
967 }
968
969
970 /**
971  * Check if we have encrypted messages for the specified neighbour
972  * pending, and if so, check with the transport about sending them
973  * out.
974  *
975  * @param n neighbour to check.
976  */
977 static void process_encrypted_neighbour_queue (struct Neighbour *n);
978
979
980 /**
981  * Function called when the transport service is ready to
982  * receive an encrypted message for the respective peer
983  *
984  * @param cls neighbour to use message from
985  * @param size number of bytes we can transmit
986  * @param buf where to copy the message
987  * @return number of bytes transmitted
988  */
989 static size_t
990 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
991 {
992   struct Neighbour *n = cls;
993   struct MessageEntry *m;
994   size_t ret;
995   char *cbuf;
996
997   n->th = NULL;
998   GNUNET_assert (NULL != (m = n->encrypted_head));
999   n->encrypted_head = m->next;
1000   if (m->next == NULL)
1001     n->encrypted_tail = NULL;
1002   ret = 0;
1003   cbuf = buf;
1004   if (buf != NULL)
1005     {
1006       GNUNET_assert (size >= m->size);
1007       memcpy (cbuf, &m[1], m->size);
1008       ret = m->size;
1009       n->available_send_window -= m->size;
1010       process_encrypted_neighbour_queue (n);
1011 #if DEBUG_CORE
1012       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1013                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1014                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1015                   ret, GNUNET_i2s (&n->peer));
1016 #endif
1017     }
1018   else
1019     {
1020       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1021                   "Transmission for message of type %u and size %u failed\n",
1022                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1023                   m->size);
1024     }
1025   GNUNET_free (m);
1026   return ret;
1027 }
1028
1029
1030 /**
1031  * Check if we have plaintext messages for the specified neighbour
1032  * pending, and if so, consider batching and encrypting them (and
1033  * then trigger processing of the encrypted queue if needed).
1034  *
1035  * @param n neighbour to check.
1036  */
1037 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1038
1039
1040 /**
1041  * Check if we have encrypted messages for the specified neighbour
1042  * pending, and if so, check with the transport about sending them
1043  * out.
1044  *
1045  * @param n neighbour to check.
1046  */
1047 static void
1048 process_encrypted_neighbour_queue (struct Neighbour *n)
1049 {
1050   struct MessageEntry *m;
1051  
1052   if (n->th != NULL)
1053     return;  /* request already pending */
1054   if (n->encrypted_head == NULL)
1055     {
1056       /* encrypted queue empty, try plaintext instead */
1057       process_plaintext_neighbour_queue (n);
1058       return;
1059     }
1060 #if DEBUG_CORE
1061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1063               n->encrypted_head->size,
1064               GNUNET_i2s (&n->peer),
1065               GNUNET_TIME_absolute_get_remaining (n->
1066                                                   encrypted_head->deadline).
1067               value);
1068 #endif
1069   n->th =
1070     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1071                                             n->encrypted_head->size,
1072                                             n->encrypted_head->priority,
1073                                             GNUNET_TIME_absolute_get_remaining
1074                                             (n->encrypted_head->deadline),
1075                                             &notify_encrypted_transmit_ready,
1076                                             n);
1077   if (n->th == NULL)
1078     {
1079       /* message request too large (oops) */
1080       GNUNET_break (0);
1081       /* discard encrypted message */
1082       GNUNET_assert (NULL != (m = n->encrypted_head));
1083       n->encrypted_head = m->next;
1084       if (m->next == NULL)
1085         n->encrypted_tail = NULL;
1086       GNUNET_free (m);
1087       process_encrypted_neighbour_queue (n);
1088     }
1089 }
1090
1091
1092 /**
1093  * Decrypt size bytes from in and write the result to out.  Use the
1094  * key for inbound traffic of the given neighbour.  This function does
1095  * NOT do any integrity-checks on the result.
1096  *
1097  * @param n neighbour we are receiving from
1098  * @param iv initialization vector to use
1099  * @param in ciphertext
1100  * @param out plaintext
1101  * @param size size of in/out
1102  * @return GNUNET_OK on success
1103  */
1104 static int
1105 do_decrypt (struct Neighbour *n,
1106             const GNUNET_HashCode * iv,
1107             const void *in, void *out, size_t size)
1108 {
1109   if (size != (uint16_t) size)
1110     {
1111       GNUNET_break (0);
1112       return GNUNET_NO;
1113     }
1114   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1115       (n->status != PEER_STATE_KEY_CONFIRMED))
1116     {
1117       GNUNET_break_op (0);
1118       return GNUNET_SYSERR;
1119     }
1120   if (size !=
1121       GNUNET_CRYPTO_aes_decrypt (in,
1122                                  (uint16_t) size,
1123                                  &n->decrypt_key,
1124                                  (const struct
1125                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1126                                  out))
1127     {
1128       GNUNET_break (0);
1129       return GNUNET_SYSERR;
1130     }
1131 #if DEBUG_CORE
1132   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133               "Decrypted %u bytes from `%4s' using key %u\n",
1134               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1135 #endif
1136   return GNUNET_OK;
1137 }
1138
1139
1140 /**
1141  * Encrypt size bytes from in and write the result to out.  Use the
1142  * key for outbound traffic of the given neighbour.
1143  *
1144  * @param n neighbour we are sending to
1145  * @param iv initialization vector to use
1146  * @param in ciphertext
1147  * @param out plaintext
1148  * @param size size of in/out
1149  * @return GNUNET_OK on success
1150  */
1151 static int
1152 do_encrypt (struct Neighbour *n,
1153             const GNUNET_HashCode * iv,
1154             const void *in, void *out, size_t size)
1155 {
1156   if (size != (uint16_t) size)
1157     {
1158       GNUNET_break (0);
1159       return GNUNET_NO;
1160     }
1161   GNUNET_assert (size ==
1162                  GNUNET_CRYPTO_aes_encrypt (in,
1163                                             (uint16_t) size,
1164                                             &n->encrypt_key,
1165                                             (const struct
1166                                              GNUNET_CRYPTO_AesInitializationVector
1167                                              *) iv, out));
1168 #if DEBUG_CORE
1169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170               "Encrypted %u bytes for `%4s' using key %u\n", size,
1171               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1172 #endif
1173   return GNUNET_OK;
1174 }
1175
1176
1177 /**
1178  * Select messages for transmission.  This heuristic uses a combination
1179  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1180  * and priority-based discard (in case no feasible schedule exist) and
1181  * speculative optimization (defer any kind of transmission until
1182  * we either create a batch of significant size, 25% of max, or until
1183  * we are close to a deadline).  Furthermore, when scheduling the
1184  * heuristic also packs as many messages into the batch as possible,
1185  * starting with those with the earliest deadline.  Yes, this is fun.
1186  *
1187  * @param n neighbour to select messages from
1188  * @param size number of bytes to select for transmission
1189  * @param retry_time set to the time when we should try again
1190  *        (only valid if this function returns zero)
1191  * @return number of bytes selected, or 0 if we decided to
1192  *         defer scheduling overall; in that case, retry_time is set.
1193  */
1194 static size_t
1195 select_messages (struct Neighbour *n,
1196                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1197 {
1198   struct MessageEntry *pos;
1199   struct MessageEntry *min;
1200   struct MessageEntry *last;
1201   unsigned int min_prio;
1202   struct GNUNET_TIME_Absolute t;
1203   struct GNUNET_TIME_Absolute now;
1204   uint64_t delta;
1205   uint64_t avail;
1206   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1207   size_t off;
1208   int discard_low_prio;
1209
1210   GNUNET_assert (NULL != n->messages);
1211   now = GNUNET_TIME_absolute_get ();
1212   /* last entry in linked list of messages processed */
1213   last = NULL;
1214   /* should we remove the entry with the lowest
1215      priority from consideration for scheduling at the
1216      end of the loop? */
1217   discard_low_prio = GNUNET_YES;
1218   while (GNUNET_YES == discard_low_prio)
1219     {
1220       min = NULL;
1221       min_prio = -1;
1222       discard_low_prio = GNUNET_NO;
1223       /* calculate number of bytes available for transmission at time "t" */
1224       update_window (GNUNET_NO,
1225                      &n->available_send_window,
1226                      &n->last_asw_update,
1227                      n->bpm_out);
1228       avail = n->available_send_window;
1229       t = n->last_asw_update;
1230       /* how many bytes have we (hypothetically) scheduled so far */
1231       off = 0;
1232       /* maximum time we can wait before transmitting anything
1233          and still make all of our deadlines */
1234       slack = -1;
1235
1236       pos = n->messages;
1237       /* note that we use "*2" here because we want to look
1238          a bit further into the future; much more makes no
1239          sense since new message might be scheduled in the
1240          meantime... */
1241       while ((pos != NULL) && (off < size * 2))
1242         {
1243           if (pos->do_transmit == GNUNET_YES)
1244             {
1245               /* already removed from consideration */
1246               pos = pos->next;
1247               continue;
1248             }
1249           if (discard_low_prio == GNUNET_NO)
1250             {
1251               delta = pos->deadline.value;
1252               if (delta < t.value)
1253                 delta = 0;
1254               else
1255                 delta = t.value - delta;
1256               avail += delta * n->bpm_out / 1000 / 60;
1257               if (avail < pos->size)
1258                 {
1259                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1260                 }
1261               else
1262                 {
1263                   avail -= pos->size;
1264                   /* update slack, considering both its absolute deadline
1265                      and relative deadlines caused by other messages
1266                      with their respective load */
1267                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1268                   if (pos->deadline.value < now.value)
1269                     slack = 0;
1270                   else
1271                     slack =
1272                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1273                 }
1274             }
1275           off += pos->size;
1276           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1277           if (pos->priority <= min_prio)
1278             {
1279               /* update min for discard */
1280               min_prio = pos->priority;
1281               min = pos;
1282             }
1283           pos = pos->next;
1284         }
1285       if (discard_low_prio)
1286         {
1287           GNUNET_assert (min != NULL);
1288           /* remove lowest-priority entry from consideration */
1289           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1290         }
1291       last = pos;
1292     }
1293   /* guard against sending "tiny" messages with large headers without
1294      urgent deadlines */
1295   if ( (slack > 1000) && (size > 4 * off) )
1296     {
1297       /* less than 25% of message would be filled with
1298          deadlines still being met if we delay by one
1299          second or more; so just wait for more data */
1300       retry_time->value = slack / 2;
1301       /* reset do_transmit values for next time */
1302       while (pos != last)
1303         {
1304           pos->do_transmit = GNUNET_NO;
1305           pos = pos->next;
1306         }
1307       return 0;
1308     }
1309   /* select marked messages (up to size) for transmission */
1310   off = 0;
1311   pos = n->messages;
1312   while (pos != last)
1313     {
1314       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1315         {
1316           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1317           off += pos->size;
1318           size -= pos->size;
1319         }
1320       else
1321         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1322       pos = pos->next;
1323     }
1324 #if DEBUG_CORE
1325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1327               off, GNUNET_i2s (&n->peer));
1328 #endif
1329   return off;
1330 }
1331
1332
1333 /**
1334  * Batch multiple messages into a larger buffer.
1335  *
1336  * @param n neighbour to take messages from
1337  * @param buf target buffer
1338  * @param size size of buf
1339  * @param deadline set to transmission deadline for the result
1340  * @param retry_time set to the time when we should try again
1341  *        (only valid if this function returns zero)
1342  * @param priority set to the priority of the batch
1343  * @return number of bytes written to buf (can be zero)
1344  */
1345 static size_t
1346 batch_message (struct Neighbour *n,
1347                char *buf,
1348                size_t size,
1349                struct GNUNET_TIME_Absolute *deadline,
1350                struct GNUNET_TIME_Relative *retry_time,
1351                unsigned int *priority)
1352 {
1353   struct MessageEntry *pos;
1354   struct MessageEntry *prev;
1355   struct MessageEntry *next;
1356   size_t ret;
1357
1358   ret = 0;
1359   *priority = 0;
1360   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1361   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1362   if (0 == select_messages (n, size, retry_time))
1363     {
1364       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1365                   "No messages selected, will try again in %llu ms\n",
1366                   retry_time->value);
1367       return 0;
1368     }
1369   pos = n->messages;
1370   prev = NULL;
1371   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1372     {
1373       next = pos->next;
1374       if (GNUNET_YES == pos->do_transmit)
1375         {
1376           GNUNET_assert (pos->size <= size);
1377           memcpy (&buf[ret], &pos[1], pos->size);
1378           ret += pos->size;
1379           size -= pos->size;
1380           *priority += pos->priority;
1381 #if DEBUG_CORE
1382           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1383                       "Adding plaintext message with deadline %llu ms to batch\n",
1384                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1385 #endif
1386           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1387           GNUNET_free (pos);
1388           if (prev == NULL)
1389             n->messages = next;
1390           else
1391             prev->next = next;
1392         }
1393       else
1394         {
1395           prev = pos;
1396         }
1397       pos = next;
1398     }
1399 #if DEBUG_CORE
1400   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1401               "Deadline for message batch is %llu ms\n",
1402               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1403 #endif
1404   return ret;
1405 }
1406
1407
1408 /**
1409  * Remove messages with deadlines that have long expired from
1410  * the queue.
1411  *
1412  * @param n neighbour to inspect
1413  */
1414 static void
1415 discard_expired_messages (struct Neighbour *n)
1416 {
1417   struct MessageEntry *prev;
1418   struct MessageEntry *next;
1419   struct MessageEntry *pos;
1420   struct GNUNET_TIME_Absolute now;
1421   struct GNUNET_TIME_Relative delta;
1422
1423   now = GNUNET_TIME_absolute_get ();
1424   prev = NULL;
1425   pos = n->messages;
1426   while (pos != NULL) 
1427     {
1428       next = pos->next;
1429       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1430       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1431         {
1432 #if DEBUG_CORE
1433           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1434                       "Message is %llu ms past due, discarding.\n",
1435                       delta.value);
1436 #endif
1437           if (prev == NULL)
1438             n->messages = next;
1439           else
1440             prev->next = next;
1441           GNUNET_free (pos);
1442         }
1443       else
1444         prev = pos;
1445       pos = next;
1446     }
1447 }
1448
1449
1450 /**
1451  * Signature of the main function of a task.
1452  *
1453  * @param cls closure
1454  * @param tc context information (why was this task triggered now)
1455  */
1456 static void
1457 retry_plaintext_processing (void *cls,
1458                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1459 {
1460   struct Neighbour *n = cls;
1461
1462   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1463   process_plaintext_neighbour_queue (n);
1464 }
1465
1466
1467 /**
1468  * Send our key (and encrypted PING) to the other peer.
1469  *
1470  * @param n the other peer
1471  */
1472 static void send_key (struct Neighbour *n);
1473
1474
1475 /**
1476  * Check if we have plaintext messages for the specified neighbour
1477  * pending, and if so, consider batching and encrypting them (and
1478  * then trigger processing of the encrypted queue if needed).
1479  *
1480  * @param n neighbour to check.
1481  */
1482 static void
1483 process_plaintext_neighbour_queue (struct Neighbour *n)
1484 {
1485   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1486   size_t used;
1487   size_t esize;
1488   struct EncryptedMessage *em;  /* encrypted message */
1489   struct EncryptedMessage *ph;  /* plaintext header */
1490   struct MessageEntry *me;
1491   unsigned int priority;
1492   struct GNUNET_TIME_Absolute deadline;
1493   struct GNUNET_TIME_Relative retry_time;
1494
1495   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1496     {
1497       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1498       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1499     }
1500   switch (n->status)
1501     {
1502     case PEER_STATE_DOWN:
1503       send_key (n);
1504 #if DEBUG_CORE
1505       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1506                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1507                   GNUNET_i2s(&n->peer));
1508 #endif
1509       return;
1510     case PEER_STATE_KEY_SENT:
1511       GNUNET_assert (n->retry_set_key_task !=
1512                      GNUNET_SCHEDULER_NO_TASK);
1513 #if DEBUG_CORE
1514       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1515                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1516                   GNUNET_i2s(&n->peer));
1517 #endif
1518       return;
1519     case PEER_STATE_KEY_RECEIVED:
1520       GNUNET_assert (n->retry_set_key_task !=
1521                      GNUNET_SCHEDULER_NO_TASK);
1522 #if DEBUG_CORE
1523       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1525                   GNUNET_i2s(&n->peer));
1526 #endif
1527       return;
1528     case PEER_STATE_KEY_CONFIRMED:
1529       /* ready to continue */
1530       break;
1531     }
1532   discard_expired_messages (n);
1533   if (n->messages == NULL)
1534     {
1535 #if DEBUG_CORE
1536       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1537                   "Plaintext message queue for `%4s' is empty.\n",
1538                   GNUNET_i2s(&n->peer));
1539 #endif
1540       return;                   /* no pending messages */
1541     }
1542   if (n->encrypted_head != NULL)
1543     {
1544 #if DEBUG_CORE
1545       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1546                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1547                   GNUNET_i2s(&n->peer));
1548 #endif
1549       return;                   /* wait for messages already encrypted to be
1550                                    processed first! */
1551     }
1552   ph = (struct EncryptedMessage *) pbuf;
1553   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1554   priority = 0;
1555   used = sizeof (struct EncryptedMessage);
1556   used += batch_message (n,
1557                          &pbuf[used],
1558                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1559                          &deadline, &retry_time, &priority);
1560   if (used == sizeof (struct EncryptedMessage))
1561     {
1562 #if DEBUG_CORE
1563       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1564                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1565                   GNUNET_i2s(&n->peer));
1566 #endif
1567       /* no messages selected for sending, try again later... */
1568       n->retry_plaintext_task =
1569         GNUNET_SCHEDULER_add_delayed (sched,
1570                                       retry_time,
1571                                       &retry_plaintext_processing, n);
1572       return;
1573     }
1574   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1575   ph->inbound_bpm_limit = htonl (n->bpm_in);
1576   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1577
1578   /* setup encryption message header */
1579   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1580   me->deadline = deadline;
1581   me->priority = priority;
1582   me->size = used;
1583   em = (struct EncryptedMessage *) &me[1];
1584   em->header.size = htons (used);
1585   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1586   em->reserved = htonl (0);
1587   esize = used - ENCRYPTED_HEADER_SIZE;
1588   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1589   /* encrypt */
1590 #if DEBUG_CORE
1591   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1592               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1593               esize,
1594               GNUNET_i2s(&n->peer),
1595               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1596 #endif
1597   GNUNET_assert (GNUNET_OK ==
1598                  do_encrypt (n,
1599                              &em->plaintext_hash,
1600                              &ph->sequence_number,
1601                              &em->sequence_number, esize));
1602   /* append to transmission list */
1603   if (n->encrypted_tail == NULL)
1604     n->encrypted_head = me;
1605   else
1606     n->encrypted_tail->next = me;
1607   n->encrypted_tail = me;
1608   process_encrypted_neighbour_queue (n);
1609 }
1610
1611
1612 /**
1613  * Handle CORE_SEND request.
1614  *
1615  * @param cls unused
1616  * @param client the client issuing the request
1617  * @param message the "struct SendMessage"
1618  */
1619 static void
1620 handle_client_send (void *cls,
1621                     struct GNUNET_SERVER_Client *client,
1622                     const struct GNUNET_MessageHeader *message);
1623
1624
1625 /**
1626  * Function called to notify us that we either succeeded
1627  * or failed to connect (at the transport level) to another
1628  * peer.  We should either free the message we were asked
1629  * to transmit or re-try adding it to the queue.
1630  *
1631  * @param cls closure
1632  * @param size number of bytes available in buf
1633  * @param buf where the callee should write the message
1634  * @return number of bytes written to buf
1635  */
1636 static size_t
1637 send_connect_continuation (void *cls, size_t size, void *buf)
1638 {
1639   struct SendMessage *sm = cls;
1640
1641   if (buf == NULL)
1642     {
1643 #if DEBUG_CORE
1644       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1645                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1646                   GNUNET_i2s (&sm->peer));
1647 #endif
1648       GNUNET_free (sm);
1649       return 0;
1650     }
1651 #if DEBUG_CORE
1652   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1653               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1654               GNUNET_i2s (&sm->peer));
1655 #endif
1656   handle_client_send (NULL, NULL, &sm->header);
1657   GNUNET_free (sm);
1658   return 0;
1659 }
1660
1661
1662 /**
1663  * Handle CORE_SEND request.
1664  *
1665  * @param cls unused
1666  * @param client the client issuing the request
1667  * @param message the "struct SendMessage"
1668  */
1669 static void
1670 handle_client_send (void *cls,
1671                     struct GNUNET_SERVER_Client *client,
1672                     const struct GNUNET_MessageHeader *message)
1673 {
1674   const struct SendMessage *sm;
1675   struct SendMessage *smc;
1676   const struct GNUNET_MessageHeader *mh;
1677   struct Neighbour *n;
1678   struct MessageEntry *prev;
1679   struct MessageEntry *pos;
1680   struct MessageEntry *e; 
1681   struct MessageEntry *min_prio_entry;
1682   struct MessageEntry *min_prio_prev;
1683   unsigned int min_prio;
1684   unsigned int queue_size;
1685   uint16_t msize;
1686
1687   msize = ntohs (message->size);
1688   if (msize <
1689       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1690     {
1691       GNUNET_break (0);
1692       if (client != NULL)
1693         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1694       return;
1695     }
1696   sm = (const struct SendMessage *) message;
1697   msize -= sizeof (struct SendMessage);
1698   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1699   if (msize != ntohs (mh->size))
1700     {
1701       GNUNET_break (0);
1702       if (client != NULL)
1703         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1704       return;
1705     }
1706   n = find_neighbour (&sm->peer);
1707   if (n == NULL)
1708     {
1709 #if DEBUG_CORE
1710       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1711                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1712                   "SEND",
1713                   GNUNET_i2s (&sm->peer),
1714                   GNUNET_TIME_absolute_get_remaining
1715                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1716 #endif
1717       msize += sizeof (struct SendMessage);
1718       /* ask transport to connect to the peer */
1719       smc = GNUNET_malloc (msize);
1720       memcpy (smc, sm, msize);
1721       if (NULL ==
1722           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1723                                                   &sm->peer,
1724                                                   0, 0,
1725                                                   GNUNET_TIME_absolute_get_remaining
1726                                                   (GNUNET_TIME_absolute_ntoh
1727                                                    (sm->deadline)),
1728                                                   &send_connect_continuation,
1729                                                   smc))
1730         {
1731           /* transport has already a request pending for this peer! */
1732 #if DEBUG_CORE
1733           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1734                       "Dropped second message destined for `%4s' since connection is still down.\n",
1735                       GNUNET_i2s(&sm->peer));
1736 #endif
1737           GNUNET_free (smc);
1738         }
1739       if (client != NULL)
1740         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1741       return;
1742     }
1743 #if DEBUG_CORE
1744   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1745               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1746               "SEND",
1747               msize, 
1748               GNUNET_i2s (&sm->peer));
1749 #endif
1750   /* bound queue size */
1751   discard_expired_messages (n);
1752   min_prio = (unsigned int) -1;
1753   min_prio_entry = NULL;
1754   min_prio_prev = NULL;
1755   queue_size = 0;
1756   prev = NULL;
1757   pos = n->messages;
1758   while (pos != NULL) 
1759     {
1760       if (pos->priority < min_prio)
1761         {
1762           min_prio_entry = pos;
1763           min_prio_prev = prev;
1764           min_prio = pos->priority;
1765         }
1766       queue_size++;
1767       prev = pos;
1768       pos = pos->next;
1769     }
1770   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1771     {
1772       /* queue full */
1773       if (ntohl(sm->priority) <= min_prio)
1774         {
1775           /* discard new entry */
1776 #if DEBUG_CORE
1777           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1778                       "Queue full, discarding new request\n");
1779 #endif
1780           if (client != NULL)
1781             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1782           return;
1783         }
1784       /* discard "min_prio_entry" */
1785 #if DEBUG_CORE
1786       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1787                   "Queue full, discarding existing older request\n");
1788 #endif
1789       if (min_prio_prev == NULL)
1790         n->messages = min_prio_entry->next;
1791       else
1792         min_prio_prev->next = min_prio_entry->next;      
1793       GNUNET_free (min_prio_entry);     
1794     }
1795
1796 #if DEBUG_CORE
1797   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1798               "Adding transmission request for `%4s' to queue\n",
1799               GNUNET_i2s (&sm->peer));
1800 #endif  
1801   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1802   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1803   e->priority = ntohl (sm->priority);
1804   e->size = msize;
1805   memcpy (&e[1], mh, msize);
1806
1807   /* insert, keep list sorted by deadline */
1808   prev = NULL;
1809   pos = n->messages;
1810   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1811     {
1812       prev = pos;
1813       pos = pos->next;
1814     }
1815   if (prev == NULL)
1816     n->messages = e;
1817   else
1818     prev->next = e;
1819   e->next = pos;
1820
1821   /* consider scheduling now */
1822   process_plaintext_neighbour_queue (n);
1823   if (client != NULL)
1824     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1825 }
1826
1827
1828 /**
1829  * Handle CORE_REQUEST_CONNECT request.
1830  *
1831  * @param cls unused
1832  * @param client the client issuing the request
1833  * @param message the "struct ConnectMessage"
1834  */
1835 static void
1836 handle_client_request_connect (void *cls,
1837                                struct GNUNET_SERVER_Client *client,
1838                                const struct GNUNET_MessageHeader *message)
1839 {
1840   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
1841   struct Neighbour *n;
1842
1843   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1844   n = find_neighbour (&cm->peer);
1845   if (n != NULL)
1846     return; /* already connected, or at least trying */
1847 #if DEBUG_CORE
1848   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1849               "Core received `%s' request for `%4s', will try to establish connection\n",
1850               "REQUEST_CONNECT",
1851               GNUNET_i2s (&cm->peer));
1852 #endif
1853   /* ask transport to connect to the peer */
1854   /* FIXME: timeout zero OK? need for cancellation? */
1855   GNUNET_TRANSPORT_notify_transmit_ready (transport,
1856                                           &cm->peer,
1857                                           0, 0,
1858                                           GNUNET_TIME_UNIT_ZERO,
1859                                           NULL,
1860                                           NULL);
1861 }
1862
1863
1864 /**
1865  * Handle CORE_REQUEST_DISCONNECT request.
1866  *
1867  * @param cls unused
1868  * @param client the client issuing the request
1869  * @param message the "struct ConnectMessage"
1870  */
1871 static void
1872 handle_client_request_disconnect (void *cls,
1873                                   struct GNUNET_SERVER_Client *client,
1874                                   const struct GNUNET_MessageHeader *message)
1875 {
1876   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
1877   struct Neighbour *n;
1878
1879   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1880   n = find_neighbour (&cm->peer);
1881   if (n == NULL)
1882     return; /* done */
1883   /* FIXME: implement disconnect! */
1884 }
1885
1886
1887
1888 /**
1889  * List of handlers for the messages understood by this
1890  * service.
1891  */
1892 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1893   {&handle_client_init, NULL,
1894    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1895   {&handle_client_request_info, NULL,
1896    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
1897    sizeof (struct RequestInfoMessage)},
1898   {&handle_client_send, NULL,
1899    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1900   {&handle_client_request_connect, NULL,
1901    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
1902    sizeof (struct ConnectMessage)},
1903   {&handle_client_request_disconnect, NULL,
1904    GNUNET_MESSAGE_TYPE_CORE_REQUEST_DISCONNECT,
1905    sizeof (struct ConnectMessage)},
1906   {NULL, NULL, 0, 0}
1907 };
1908
1909
1910 /**
1911  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
1912  * the neighbour's struct and retry send_key.  Or, if we did not get a
1913  * HELLO, just do nothing.
1914  *
1915  * @param cls NULL
1916  * @param peer the peer for which this is the HELLO
1917  * @param hello HELLO message of that peer
1918  * @param trust amount of trust we currently have in that peer
1919  */
1920 static void
1921 process_hello_retry_send_key (void *cls,
1922                               const struct GNUNET_PeerIdentity *peer,
1923                               const struct GNUNET_HELLO_Message *hello,
1924                               uint32_t trust)
1925 {
1926   struct Neighbour *n = cls;
1927
1928   if (peer == NULL)
1929     {
1930       n->pitr = NULL;
1931       return;
1932     }
1933   if (n->public_key != NULL)
1934     return;
1935 #if DEBUG_CORE
1936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1937               "Received new `%s' message for `%4s', initiating key exchange.\n",
1938               "HELLO",
1939               GNUNET_i2s (peer));
1940 #endif
1941   n->public_key =
1942     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
1943   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
1944     {
1945       GNUNET_free (n->public_key);
1946       n->public_key = NULL;
1947       return;
1948     }
1949   send_key (n);
1950 }
1951
1952
1953 /**
1954  * Task that will retry "send_key" if our previous attempt failed
1955  * to yield a PONG.
1956  */
1957 static void
1958 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1959 {
1960   struct Neighbour *n = cls;
1961
1962   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
1963   n->set_key_retry_frequency =
1964     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
1965   send_key (n);
1966 }
1967
1968
1969 /**
1970  * Send our key (and encrypted PING) to the other peer.
1971  *
1972  * @param n the other peer
1973  */
1974 static void
1975 send_key (struct Neighbour *n)
1976 {
1977   struct SetKeyMessage *sm;
1978   struct MessageEntry *me;
1979   struct PingMessage pp;
1980   struct PingMessage *pm;
1981
1982   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
1983        (n->pitr != NULL) )
1984     return; /* already in progress */
1985 #if DEBUG_CORE
1986   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1987               "Asked to perform key exchange with `%4s'.\n",
1988               GNUNET_i2s (&n->peer));
1989 #endif
1990   if (n->public_key == NULL)
1991     {
1992       /* lookup n's public key, then try again */
1993 #if DEBUG_CORE
1994       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1995                   "Lacking public key for `%4s', trying to obtain one.\n",
1996                   GNUNET_i2s (&n->peer));
1997 #endif
1998       GNUNET_assert (n->pitr == NULL);
1999       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2000                                          sched,
2001                                          &n->peer,
2002                                          0,
2003                                          GNUNET_TIME_UNIT_MINUTES,
2004                                          &process_hello_retry_send_key, n);
2005       return;
2006     }
2007   /* first, set key message */
2008   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2009                       sizeof (struct SetKeyMessage));
2010   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2011   me->priority = SET_KEY_PRIORITY;
2012   me->size = sizeof (struct SetKeyMessage);
2013   if (n->encrypted_head == NULL)
2014     n->encrypted_head = me;
2015   else
2016     n->encrypted_tail->next = me;
2017   n->encrypted_tail = me;
2018   sm = (struct SetKeyMessage *) &me[1];
2019   sm->header.size = htons (sizeof (struct SetKeyMessage));
2020   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2021   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2022                                         PEER_STATE_KEY_SENT : n->status));
2023   sm->purpose.size =
2024     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2025            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2026            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2027            sizeof (struct GNUNET_PeerIdentity));
2028   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2029   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2030   sm->target = n->peer;
2031   GNUNET_assert (GNUNET_OK ==
2032                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2033                                             sizeof (struct
2034                                                     GNUNET_CRYPTO_AesSessionKey),
2035                                             n->public_key,
2036                                             &sm->encrypted_key));
2037   GNUNET_assert (GNUNET_OK ==
2038                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2039                                          &sm->signature));
2040
2041   /* second, encrypted PING message */
2042   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2043                       sizeof (struct PingMessage));
2044   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2045   me->priority = PING_PRIORITY;
2046   me->size = sizeof (struct PingMessage);
2047   n->encrypted_tail->next = me;
2048   n->encrypted_tail = me;
2049   pm = (struct PingMessage *) &me[1];
2050   pm->header.size = htons (sizeof (struct PingMessage));
2051   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2052   pp.challenge = htonl (n->ping_challenge);
2053   pp.target = n->peer;
2054 #if DEBUG_CORE
2055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2056               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2057               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2059               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2060               "PING",
2061               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2062 #endif
2063   do_encrypt (n,
2064               &n->peer.hashPubKey,
2065               &pp.challenge,
2066               &pm->challenge,
2067               sizeof (struct PingMessage) -
2068               sizeof (struct GNUNET_MessageHeader));
2069   /* update status */
2070   switch (n->status)
2071     {
2072     case PEER_STATE_DOWN:
2073       n->status = PEER_STATE_KEY_SENT;
2074       break;
2075     case PEER_STATE_KEY_SENT:
2076       break;
2077     case PEER_STATE_KEY_RECEIVED:
2078       break;
2079     case PEER_STATE_KEY_CONFIRMED:
2080       break;
2081     default:
2082       GNUNET_break (0);
2083       break;
2084     }
2085 #if DEBUG_CORE
2086   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2087               "Have %llu ms left for `%s' transmission.\n",
2088               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2089               "SET_KEY");
2090 #endif
2091   /* trigger queue processing */
2092   process_encrypted_neighbour_queue (n);
2093   if (n->status != PEER_STATE_KEY_CONFIRMED)
2094     {
2095       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task);
2096       n->retry_set_key_task
2097         = GNUNET_SCHEDULER_add_delayed (sched,
2098                                         n->set_key_retry_frequency,
2099                                         &set_key_retry_task, n);
2100     }
2101 }
2102
2103
2104 /**
2105  * We received a SET_KEY message.  Validate and update
2106  * our key material and status.
2107  *
2108  * @param n the neighbour from which we received message m
2109  * @param m the set key message we received
2110  */
2111 static void
2112 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2113
2114
2115 /**
2116  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2117  * the neighbour's struct and retry handling the set_key message.  Or,
2118  * if we did not get a HELLO, just free the set key message.
2119  *
2120  * @param cls pointer to the set key message
2121  * @param peer the peer for which this is the HELLO
2122  * @param hello HELLO message of that peer
2123  * @param trust amount of trust we currently have in that peer
2124  */
2125 static void
2126 process_hello_retry_handle_set_key (void *cls,
2127                                     const struct GNUNET_PeerIdentity *peer,
2128                                     const struct GNUNET_HELLO_Message *hello,
2129                                     uint32_t trust)
2130 {
2131   struct Neighbour *n = cls;
2132   struct SetKeyMessage *sm = n->skm;
2133
2134   if (peer == NULL)
2135     {
2136       GNUNET_free (sm);
2137       n->skm = NULL;
2138       n->pitr = NULL;
2139       return;
2140     }
2141   if (n->public_key != NULL)
2142     return;                     /* multiple HELLOs match!? */
2143   n->public_key =
2144     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2145   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2146     {
2147       GNUNET_break_op (0);
2148       GNUNET_free (n->public_key);
2149       n->public_key = NULL;
2150       return;
2151     }
2152 #if DEBUG_CORE
2153   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2154               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2155               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2156 #endif
2157   handle_set_key (n, sm);
2158 }
2159
2160
2161 /**
2162  * We received a PING message.  Validate and transmit
2163  * PONG.
2164  *
2165  * @param n sender of the PING
2166  * @param m the encrypted PING message itself
2167  */
2168 static void
2169 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2170 {
2171   struct PingMessage t;
2172   struct PingMessage *tp;
2173   struct MessageEntry *me;
2174
2175 #if DEBUG_CORE
2176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2177               "Core service receives `%s' request from `%4s'.\n",
2178               "PING", GNUNET_i2s (&n->peer));
2179 #endif
2180   if (GNUNET_OK !=
2181       do_decrypt (n,
2182                   &my_identity.hashPubKey,
2183                   &m->challenge,
2184                   &t.challenge,
2185                   sizeof (struct PingMessage) -
2186                   sizeof (struct GNUNET_MessageHeader)))
2187     return;
2188 #if DEBUG_CORE
2189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2190               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2191               "PING",
2192               GNUNET_i2s (&t.target),
2193               ntohl (t.challenge), n->decrypt_key.crc32);
2194   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2195               "Target of `%s' request is `%4s'.\n",
2196               "PING", GNUNET_i2s (&t.target));
2197 #endif
2198   if (0 != memcmp (&t.target,
2199                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2200     {
2201       GNUNET_break_op (0);
2202       return;
2203     }
2204   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2205                       sizeof (struct PingMessage));
2206   if (n->encrypted_tail != NULL)
2207     n->encrypted_tail->next = me;
2208   else
2209     {
2210       n->encrypted_tail = me;
2211       n->encrypted_head = me;
2212     }
2213   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2214   me->priority = PONG_PRIORITY;
2215   me->size = sizeof (struct PingMessage);
2216   tp = (struct PingMessage *) &me[1];
2217   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2218   tp->header.size = htons (sizeof (struct PingMessage));
2219   do_encrypt (n,
2220               &my_identity.hashPubKey,
2221               &t.challenge,
2222               &tp->challenge,
2223               sizeof (struct PingMessage) -
2224               sizeof (struct GNUNET_MessageHeader));
2225 #if DEBUG_CORE
2226   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2227               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2228               ntohl (t.challenge), n->encrypt_key.crc32);
2229 #endif
2230   /* trigger queue processing */
2231   process_encrypted_neighbour_queue (n);
2232 }
2233
2234
2235 /**
2236  * We received a SET_KEY message.  Validate and update
2237  * our key material and status.
2238  *
2239  * @param n the neighbour from which we received message m
2240  * @param m the set key message we received
2241  */
2242 static void
2243 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2244 {
2245   struct SetKeyMessage *m_cpy;
2246   struct GNUNET_TIME_Absolute t;
2247   struct GNUNET_CRYPTO_AesSessionKey k;
2248   struct PingMessage *ping;
2249   enum PeerStateMachine sender_status;
2250
2251 #if DEBUG_CORE
2252   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2253               "Core service receives `%s' request from `%4s'.\n",
2254               "SET_KEY", GNUNET_i2s (&n->peer));
2255 #endif
2256   if (n->public_key == NULL)
2257     {
2258 #if DEBUG_CORE
2259       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2260                   "Lacking public key for peer, trying to obtain one.\n");
2261 #endif
2262       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2263       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2264       /* lookup n's public key, then try again */
2265       GNUNET_assert (n->pitr == NULL);
2266       GNUNET_assert (n->skm == NULL);
2267       n->skm = m_cpy;
2268       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2269                                          sched,
2270                                          &n->peer,
2271                                          0,
2272                                          GNUNET_TIME_UNIT_MINUTES,
2273                                          &process_hello_retry_handle_set_key, n);
2274       return;
2275     }
2276   if (0 != memcmp (&m->target,
2277                    &my_identity,
2278                    sizeof (struct GNUNET_PeerIdentity)))
2279     {
2280       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2281                   _("Received `%s' message that was not for me.  Ignoring.\n"));
2282       return;
2283     }
2284   if ((ntohl (m->purpose.size) !=
2285        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2286        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2287        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2288        sizeof (struct GNUNET_PeerIdentity)) ||
2289       (GNUNET_OK !=
2290        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2291                                  &m->purpose, &m->signature, n->public_key)))
2292     {
2293       /* invalid signature */
2294       GNUNET_break_op (0);
2295       return;
2296     }
2297   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2298   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2299        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2300       (t.value < n->decrypt_key_created.value))
2301     {
2302       /* this could rarely happen due to massive re-ordering of
2303          messages on the network level, but is most likely either
2304          a bug or some adversary messing with us.  Report. */
2305       GNUNET_break_op (0);
2306       return;
2307     }
2308 #if DEBUG_CORE
2309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2310 #endif  
2311   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2312                                   &m->encrypted_key,
2313                                   &k,
2314                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2315        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2316       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2317     {
2318       /* failed to decrypt !? */
2319       GNUNET_break_op (0);
2320       return;
2321     }
2322
2323   n->decrypt_key = k;
2324   if (n->decrypt_key_created.value != t.value)
2325     {
2326       /* fresh key, reset sequence numbers */
2327       n->last_sequence_number_received = 0;
2328       n->last_packets_bitmap = 0;
2329       n->decrypt_key_created = t;
2330     }
2331   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2332   switch (n->status)
2333     {
2334     case PEER_STATE_DOWN:
2335       n->status = PEER_STATE_KEY_RECEIVED;
2336 #if DEBUG_CORE
2337       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2338                   "Responding to `%s' with my own key.\n", "SET_KEY");
2339 #endif
2340       send_key (n);
2341       break;
2342     case PEER_STATE_KEY_SENT:
2343     case PEER_STATE_KEY_RECEIVED:
2344       n->status = PEER_STATE_KEY_RECEIVED;
2345       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2346           (sender_status != PEER_STATE_KEY_CONFIRMED))
2347         {
2348 #if DEBUG_CORE
2349           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2350                       "Responding to `%s' with my own key (other peer has status %u).\n",
2351                       "SET_KEY", sender_status);
2352 #endif
2353           send_key (n);
2354         }
2355       break;
2356     case PEER_STATE_KEY_CONFIRMED:
2357       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2358           (sender_status != PEER_STATE_KEY_CONFIRMED))
2359         {         
2360 #if DEBUG_CORE
2361           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2362                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2363                       "SET_KEY", sender_status);
2364 #endif
2365           send_key (n);
2366         }
2367       break;
2368     default:
2369       GNUNET_break (0);
2370       break;
2371     }
2372   if (n->pending_ping != NULL)
2373     {
2374       ping = n->pending_ping;
2375       n->pending_ping = NULL;
2376       handle_ping (n, ping);
2377       GNUNET_free (ping);
2378     }
2379 }
2380
2381
2382 /**
2383  * We received a PONG message.  Validate and update our status.
2384  *
2385  * @param n sender of the PONG
2386  * @param m the encrypted PONG message itself
2387  */
2388 static void
2389 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2390 {
2391   struct PingMessage t;
2392   struct ConnectNotifyMessage cnm;
2393
2394 #if DEBUG_CORE
2395   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2396               "Core service receives `%s' request from `%4s'.\n",
2397               "PONG", GNUNET_i2s (&n->peer));
2398 #endif
2399   if (GNUNET_OK !=
2400       do_decrypt (n,
2401                   &n->peer.hashPubKey,
2402                   &m->challenge,
2403                   &t.challenge,
2404                   sizeof (struct PingMessage) -
2405                   sizeof (struct GNUNET_MessageHeader)))
2406     return;
2407 #if DEBUG_CORE
2408   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2409               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2410               "PONG",
2411               GNUNET_i2s (&t.target),
2412               ntohl (t.challenge), n->decrypt_key.crc32);
2413 #endif
2414   if ((0 != memcmp (&t.target,
2415                     &n->peer,
2416                     sizeof (struct GNUNET_PeerIdentity))) ||
2417       (n->ping_challenge != ntohl (t.challenge)))
2418     {
2419       /* PONG malformed */
2420 #if DEBUG_CORE
2421       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2422                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2423                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2424       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2425                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2426                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2427 #endif
2428       GNUNET_break_op (0);
2429       return;
2430     }
2431   switch (n->status)
2432     {
2433     case PEER_STATE_DOWN:
2434       GNUNET_break (0);         /* should be impossible */
2435       return;
2436     case PEER_STATE_KEY_SENT:
2437       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2438       return;
2439     case PEER_STATE_KEY_RECEIVED:
2440       n->status = PEER_STATE_KEY_CONFIRMED;
2441       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2442         {
2443           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2444           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2445         }      
2446       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2447       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2448       cnm.distance = htonl (n->last_distance);
2449       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2450       cnm.peer = n->peer;
2451       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2452       process_encrypted_neighbour_queue (n);
2453       break;
2454     case PEER_STATE_KEY_CONFIRMED:
2455       /* duplicate PONG? */
2456       break;
2457     default:
2458       GNUNET_break (0);
2459       break;
2460     }
2461 }
2462
2463
2464 /**
2465  * Send a P2P message to a client.
2466  *
2467  * @param sender who sent us the message?
2468  * @param client who should we give the message to?
2469  * @param m contains the message to transmit
2470  * @param msize number of bytes in buf to transmit
2471  */
2472 static void
2473 send_p2p_message_to_client (struct Neighbour *sender,
2474                             struct Client *client,
2475                             const void *m, size_t msize)
2476 {
2477   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2478   struct NotifyTrafficMessage *ntm;
2479
2480 #if DEBUG_CORE
2481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2482               "Core service passes message from `%4s' of type %u to client.\n",
2483               GNUNET_i2s(&sender->peer),
2484               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2485 #endif
2486   ntm = (struct NotifyTrafficMessage *) buf;
2487   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2488   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2489   ntm->distance = htonl (sender->last_distance);
2490   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2491   ntm->peer = sender->peer;
2492   memcpy (&ntm[1], m, msize);
2493   send_to_client (client, &ntm->header, GNUNET_YES);
2494 }
2495
2496
2497 /**
2498  * Deliver P2P message to interested clients.
2499  *
2500  * @param sender who sent us the message?
2501  * @param m the message
2502  * @param msize size of the message (including header)
2503  */
2504 static void
2505 deliver_message (struct Neighbour *sender,
2506                  const struct GNUNET_MessageHeader *m, size_t msize)
2507 {
2508   struct Client *cpos;
2509   uint16_t type;
2510   unsigned int tpos;
2511   int deliver_full;
2512
2513   type = ntohs (m->type);
2514   cpos = clients;
2515   while (cpos != NULL)
2516     {
2517       deliver_full = GNUNET_NO;
2518       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2519         deliver_full = GNUNET_YES;
2520       else
2521         {
2522           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2523             {
2524               if (type != cpos->types[tpos])
2525                 continue;
2526               deliver_full = GNUNET_YES;
2527               break;
2528             }
2529         }
2530       if (GNUNET_YES == deliver_full)
2531         send_p2p_message_to_client (sender, cpos, m, msize);
2532       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2533         send_p2p_message_to_client (sender, cpos, m,
2534                                     sizeof (struct GNUNET_MessageHeader));
2535       cpos = cpos->next;
2536     }
2537 }
2538
2539
2540 /**
2541  * Align P2P message and then deliver to interested clients.
2542  *
2543  * @param sender who sent us the message?
2544  * @param buffer unaligned (!) buffer containing message
2545  * @param msize size of the message (including header)
2546  */
2547 static void
2548 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2549 {
2550   char abuf[msize];
2551
2552   /* TODO: call to statistics? */
2553   memcpy (abuf, buffer, msize);
2554   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2555 }
2556
2557
2558 /**
2559  * Deliver P2P messages to interested clients.
2560  *
2561  * @param sender who sent us the message?
2562  * @param buffer buffer containing messages, can be modified
2563  * @param buffer_size size of the buffer (overall)
2564  * @param offset offset where messages in the buffer start
2565  */
2566 static void
2567 deliver_messages (struct Neighbour *sender,
2568                   const char *buffer, size_t buffer_size, size_t offset)
2569 {
2570   struct GNUNET_MessageHeader *mhp;
2571   struct GNUNET_MessageHeader mh;
2572   uint16_t msize;
2573   int need_align;
2574
2575   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2576     {
2577       if (0 != offset % sizeof (uint16_t))
2578         {
2579           /* outch, need to copy to access header */
2580           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2581           mhp = &mh;
2582         }
2583       else
2584         {
2585           /* can access header directly */
2586           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2587         }
2588       msize = ntohs (mhp->size);
2589       if (msize + offset > buffer_size)
2590         {
2591           /* malformed message, header says it is larger than what
2592              would fit into the overall buffer */
2593           GNUNET_break_op (0);
2594           break;
2595         }
2596 #if HAVE_UNALIGNED_64_ACCESS
2597       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2598 #else
2599       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2600 #endif
2601       if (GNUNET_YES == need_align)
2602         align_and_deliver (sender, &buffer[offset], msize);
2603       else
2604         deliver_message (sender,
2605                          (const struct GNUNET_MessageHeader *)
2606                          &buffer[offset], msize);
2607       offset += msize;
2608     }
2609 }
2610
2611
2612 /**
2613  * We received an encrypted message.  Decrypt, validate and
2614  * pass on to the appropriate clients.
2615  */
2616 static void
2617 handle_encrypted_message (struct Neighbour *n,
2618                           const struct EncryptedMessage *m)
2619 {
2620   size_t size = ntohs (m->header.size);
2621   char buf[size];
2622   struct EncryptedMessage *pt;  /* plaintext */
2623   GNUNET_HashCode ph;
2624   size_t off;
2625   uint32_t snum;
2626   struct GNUNET_TIME_Absolute t;
2627
2628 #if DEBUG_CORE
2629   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2630               "Core service receives `%s' request from `%4s'.\n",
2631               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2632 #endif  
2633   /* decrypt */
2634   if (GNUNET_OK !=
2635       do_decrypt (n,
2636                   &m->plaintext_hash,
2637                   &m->sequence_number,
2638                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2639     return;
2640   pt = (struct EncryptedMessage *) buf;
2641
2642   /* validate hash */
2643   GNUNET_CRYPTO_hash (&pt->sequence_number,
2644                       size - ENCRYPTED_HEADER_SIZE, &ph);
2645   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2646     {
2647       /* checksum failed */
2648       GNUNET_break_op (0);
2649       return;
2650     }
2651
2652   /* validate sequence number */
2653   snum = ntohl (pt->sequence_number);
2654   if (n->last_sequence_number_received == snum)
2655     {
2656       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2657                   "Received duplicate message, ignoring.\n");
2658       /* duplicate, ignore */
2659       return;
2660     }
2661   if ((n->last_sequence_number_received > snum) &&
2662       (n->last_sequence_number_received - snum > 32))
2663     {
2664       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2665                   "Received ancient out of sequence message, ignoring.\n");
2666       /* ancient out of sequence, ignore */
2667       return;
2668     }
2669   if (n->last_sequence_number_received > snum)
2670     {
2671       unsigned int rotbit =
2672         1 << (n->last_sequence_number_received - snum - 1);
2673       if ((n->last_packets_bitmap & rotbit) != 0)
2674         {
2675           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2676                       "Received duplicate message, ignoring.\n");
2677           /* duplicate, ignore */
2678           return;
2679         }
2680       n->last_packets_bitmap |= rotbit;
2681     }
2682   if (n->last_sequence_number_received < snum)
2683     {
2684       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2685       n->last_sequence_number_received = snum;
2686     }
2687
2688   /* check timestamp */
2689   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2690   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2691     {
2692       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2693                   _
2694                   ("Message received far too old (%llu ms). Content ignored.\n"),
2695                   GNUNET_TIME_absolute_get_duration (t).value);
2696       return;
2697     }
2698
2699   /* process decrypted message(s) */
2700   update_window (GNUNET_YES,
2701                  &n->available_send_window,
2702                  &n->last_asw_update,
2703                  n->bpm_out);
2704   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2705   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2706                            n->bpm_out_internal_limit);
2707   n->last_activity = GNUNET_TIME_absolute_get ();
2708   off = sizeof (struct EncryptedMessage);
2709   deliver_messages (n, buf, size, off);
2710 }
2711
2712
2713 /**
2714  * Function called by the transport for each received message.
2715  *
2716  * @param cls closure
2717  * @param peer (claimed) identity of the other peer
2718  * @param message the message
2719  * @param latency estimated latency for communicating with the
2720  *             given peer (round-trip)
2721  * @param distance in overlay hops, as given by transport plugin
2722  */
2723 static void
2724 handle_transport_receive (void *cls,
2725                           const struct GNUNET_PeerIdentity *peer,
2726                           const struct GNUNET_MessageHeader *message,
2727                           struct GNUNET_TIME_Relative latency,
2728                           unsigned int distance)
2729 {
2730   struct Neighbour *n;
2731   struct GNUNET_TIME_Absolute now;
2732   int up;
2733   uint16_t type;
2734   uint16_t size;
2735
2736 #if DEBUG_CORE
2737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2738               "Received message of type %u from `%4s', demultiplexing.\n",
2739               ntohs (message->type), GNUNET_i2s (peer));
2740 #endif
2741   n = find_neighbour (peer);
2742   if (n == NULL)
2743     {
2744       GNUNET_break (0);
2745       return;
2746     }
2747   n->last_latency = latency;
2748   n->last_distance = distance;
2749   up = (n->status == PEER_STATE_KEY_CONFIRMED);
2750   type = ntohs (message->type);
2751   size = ntohs (message->size);
2752   switch (type)
2753     {
2754     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2755       if (size != sizeof (struct SetKeyMessage))
2756         {
2757           GNUNET_break_op (0);
2758           return;
2759         }
2760       handle_set_key (n, (const struct SetKeyMessage *) message);
2761       break;
2762     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2763       if (size < sizeof (struct EncryptedMessage) +
2764           sizeof (struct GNUNET_MessageHeader))
2765         {
2766           GNUNET_break_op (0);
2767           return;
2768         }
2769       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2770           (n->status != PEER_STATE_KEY_CONFIRMED))
2771         {
2772           GNUNET_break_op (0);
2773           return;
2774         }
2775       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2776       break;
2777     case GNUNET_MESSAGE_TYPE_CORE_PING:
2778       if (size != sizeof (struct PingMessage))
2779         {
2780           GNUNET_break_op (0);
2781           return;
2782         }
2783       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2784           (n->status != PEER_STATE_KEY_CONFIRMED))
2785         {
2786 #if DEBUG_CORE
2787           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2788                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2789                       "PING", GNUNET_i2s (&n->peer));
2790 #endif
2791           GNUNET_free_non_null (n->pending_ping);
2792           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2793           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2794           return;
2795         }
2796       handle_ping (n, (const struct PingMessage *) message);
2797       break;
2798     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2799       if (size != sizeof (struct PingMessage))
2800         {
2801           GNUNET_break_op (0);
2802           return;
2803         }
2804       if ((n->status != PEER_STATE_KEY_SENT) &&
2805           (n->status != PEER_STATE_KEY_RECEIVED) &&
2806           (n->status != PEER_STATE_KEY_CONFIRMED))
2807         {
2808           /* could not decrypt pong, oops! */
2809           GNUNET_break_op (0);
2810           return;
2811         }
2812       handle_pong (n, (const struct PingMessage *) message);
2813       break;
2814     default:
2815       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2816                   _("Unsupported message of type %u received.\n"), type);
2817       return;
2818     }
2819   if (n->status == PEER_STATE_KEY_CONFIRMED)
2820     {
2821       now = GNUNET_TIME_absolute_get ();
2822       n->last_activity = now;
2823       if (!up)
2824         n->time_established = now;
2825     }
2826 }
2827
2828
2829 /**
2830  * Function that recalculates the bandwidth quota for the
2831  * given neighbour and transmits it to the transport service.
2832  * 
2833  * @param cls neighbour for the quota update
2834  * @param tc context
2835  */
2836 static void
2837 neighbour_quota_update (void *cls,
2838                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2839
2840
2841 /**
2842  * Schedule the task that will recalculate the bandwidth
2843  * quota for this peer (and possibly force a disconnect of
2844  * idle peers by calculating a bandwidth of zero).
2845  */
2846 static void
2847 schedule_quota_update (struct Neighbour *n)
2848 {
2849   GNUNET_assert (n->quota_update_task ==
2850                  GNUNET_SCHEDULER_NO_TASK);
2851   n->quota_update_task
2852     = GNUNET_SCHEDULER_add_delayed (sched,
2853                                     QUOTA_UPDATE_FREQUENCY,
2854                                     &neighbour_quota_update,
2855                                     n);
2856 }
2857
2858
2859 /**
2860  * Function that recalculates the bandwidth quota for the
2861  * given neighbour and transmits it to the transport service.
2862  * 
2863  * @param cls neighbour for the quota update
2864  * @param tc context
2865  */
2866 static void
2867 neighbour_quota_update (void *cls,
2868                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2869 {
2870   struct Neighbour *n = cls;
2871   uint32_t q_in;
2872   double pref_rel;
2873   double share;
2874   unsigned long long distributable;
2875   
2876   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
2877   /* calculate relative preference among all neighbours;
2878      divides by a bit more to avoid division by zero AND to
2879      account for possibility of new neighbours joining any time 
2880      AND to convert to double... */
2881   pref_rel = n->current_preference / (1.0 + preference_sum);
2882   distributable = 0;
2883   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2884     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2885   share = distributable * pref_rel;
2886   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2887   /* check if we want to disconnect for good due to inactivity */
2888   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2889        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2890     q_in = 0; /* force disconnect */
2891   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2892        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2893     {
2894       n->bpm_in = q_in;
2895       GNUNET_TRANSPORT_set_quota (transport,
2896                                   &n->peer,
2897                                   n->bpm_in, 
2898                                   n->bpm_out,
2899                                   GNUNET_TIME_UNIT_FOREVER_REL,
2900                                   NULL, NULL);
2901     }
2902   schedule_quota_update (n);
2903 }
2904
2905
2906 /**
2907  * Function called by transport to notify us that
2908  * a peer connected to us (on the network level).
2909  *
2910  * @param cls closure
2911  * @param peer the peer that connected
2912  * @param latency current latency of the connection
2913  * @param distance in overlay hops, as given by transport plugin
2914  */
2915 static void
2916 handle_transport_notify_connect (void *cls,
2917                                  const struct GNUNET_PeerIdentity *peer,
2918                                  struct GNUNET_TIME_Relative latency,
2919                                  unsigned int distance)
2920 {
2921   struct Neighbour *n;
2922   struct GNUNET_TIME_Absolute now;
2923   struct ConnectNotifyMessage cnm;
2924
2925   n = find_neighbour (peer);
2926   if (n != NULL)
2927     {
2928       /* duplicate connect notification!? */
2929       GNUNET_break (0);
2930       return;
2931     }
2932   now = GNUNET_TIME_absolute_get ();
2933   n = GNUNET_malloc (sizeof (struct Neighbour));
2934   n->next = neighbours;
2935   neighbours = n;
2936   neighbour_count++;
2937   n->peer = *peer;
2938   n->last_latency = latency;
2939   n->last_distance = distance;
2940   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2941   n->encrypt_key_created = now;
2942   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2943   n->last_asw_update = now;
2944   n->last_arw_update = now;
2945   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2946   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2947   n->bpm_out_internal_limit = (uint32_t) - 1;
2948   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2949   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2950                                                 (uint32_t) - 1);
2951 #if DEBUG_CORE
2952   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2953               "Received connection from `%4s'.\n",
2954               GNUNET_i2s (&n->peer));
2955 #endif
2956   schedule_quota_update (n);
2957   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2958   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
2959   cnm.distance = htonl (n->last_distance);
2960   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2961   cnm.peer = *peer;
2962   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
2963   send_key (n);
2964 }
2965
2966
2967 /**
2968  * Free the given entry for the neighbour (it has
2969  * already been removed from the list at this point).
2970  *
2971  * @param n neighbour to free
2972  */
2973 static void
2974 free_neighbour (struct Neighbour *n)
2975 {
2976   struct MessageEntry *m;
2977
2978   if (n->pitr != NULL)
2979     {
2980       GNUNET_PEERINFO_iterate_cancel (n->pitr);
2981       n->pitr = NULL;
2982     }
2983   if (n->skm != NULL)
2984     {
2985       GNUNET_free (n->skm);
2986       n->skm = NULL;
2987     }
2988   while (NULL != (m = n->messages))
2989     {
2990       n->messages = m->next;
2991       GNUNET_free (m);
2992     }
2993   while (NULL != (m = n->encrypted_head))
2994     {
2995       n->encrypted_head = m->next;
2996       GNUNET_free (m);
2997     }
2998   if (NULL != n->th)
2999     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3000   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
3001     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3002   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
3003     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3004   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
3005     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3006   GNUNET_free_non_null (n->public_key);
3007   GNUNET_free_non_null (n->pending_ping);
3008   GNUNET_free (n);
3009 }
3010
3011
3012 /**
3013  * Function called by transport telling us that a peer
3014  * disconnected.
3015  *
3016  * @param cls closure
3017  * @param peer the peer that disconnected
3018  */
3019 static void
3020 handle_transport_notify_disconnect (void *cls,
3021                                     const struct GNUNET_PeerIdentity *peer)
3022 {
3023   struct DisconnectNotifyMessage cnm;
3024   struct Neighbour *n;
3025   struct Neighbour *p;
3026
3027 #if DEBUG_CORE
3028   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3029               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3030 #endif
3031   p = NULL;
3032   n = neighbours;
3033   while ((n != NULL) &&
3034          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3035     {
3036       p = n;
3037       n = n->next;
3038     }
3039   if (n == NULL)
3040     {
3041       GNUNET_break (0);
3042       return;
3043     }
3044   if (p == NULL)
3045     neighbours = n->next;
3046   else
3047     p->next = n->next;
3048   GNUNET_assert (neighbour_count > 0);
3049   neighbour_count--;
3050   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3051   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3052   cnm.peer = *peer;
3053   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3054   free_neighbour (n);
3055 }
3056
3057
3058 /**
3059  * Last task run during shutdown.  Disconnects us from
3060  * the transport.
3061  */
3062 static void
3063 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3064 {
3065   struct Neighbour *n;
3066   struct Client *c;
3067
3068 #if DEBUG_CORE
3069   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3070               "Core service shutting down.\n");
3071 #endif
3072   GNUNET_assert (transport != NULL);
3073   GNUNET_TRANSPORT_disconnect (transport);
3074   transport = NULL;
3075   while (NULL != (n = neighbours))
3076     {
3077       neighbours = n->next;
3078       GNUNET_assert (neighbour_count > 0);
3079       neighbour_count--;
3080       free_neighbour (n);
3081     }
3082   GNUNET_SERVER_notification_context_destroy (notifier);
3083   notifier = NULL;
3084   while (NULL != (c = clients))
3085     handle_client_disconnect (NULL, c->client_handle);
3086   if (my_private_key != NULL)
3087     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3088 }
3089
3090
3091 /**
3092  * Initiate core service.
3093  *
3094  * @param cls closure
3095  * @param s scheduler to use
3096  * @param serv the initialized server
3097  * @param c configuration to use
3098  */
3099 static void
3100 run (void *cls,
3101      struct GNUNET_SCHEDULER_Handle *s,
3102      struct GNUNET_SERVER_Handle *serv,
3103      const struct GNUNET_CONFIGURATION_Handle *c)
3104 {
3105 #if 0
3106   unsigned long long qin;
3107   unsigned long long qout;
3108   unsigned long long tneigh;
3109 #endif
3110   char *keyfile;
3111
3112   sched = s;
3113   cfg = c;  
3114   /* parse configuration */
3115   if (
3116        (GNUNET_OK !=
3117         GNUNET_CONFIGURATION_get_value_number (c,
3118                                                "CORE",
3119                                                "TOTAL_QUOTA_IN",
3120                                                &bandwidth_target_in)) ||
3121        (GNUNET_OK !=
3122         GNUNET_CONFIGURATION_get_value_number (c,
3123                                                "CORE",
3124                                                "TOTAL_QUOTA_OUT",
3125                                                &bandwidth_target_out)) ||
3126 #if 0
3127        (GNUNET_OK !=
3128         GNUNET_CONFIGURATION_get_value_number (c,
3129                                                "CORE",
3130                                                "YY",
3131                                                &qout)) ||
3132        (GNUNET_OK !=
3133         GNUNET_CONFIGURATION_get_value_number (c,
3134                                                "CORE",
3135                                                "ZZ_LIMIT", &tneigh)) ||
3136 #endif
3137        (GNUNET_OK !=
3138         GNUNET_CONFIGURATION_get_value_filename (c,
3139                                                  "GNUNETD",
3140                                                  "HOSTKEY", &keyfile)))
3141     {
3142       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3143                   _
3144                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3145       GNUNET_SCHEDULER_shutdown (s);
3146       return;
3147     }
3148   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3149   GNUNET_free (keyfile);
3150   if (my_private_key == NULL)
3151     {
3152       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3153                   _("Core service could not access hostkey.  Exiting.\n"));
3154       GNUNET_SCHEDULER_shutdown (s);
3155       return;
3156     }
3157   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3158   GNUNET_CRYPTO_hash (&my_public_key,
3159                       sizeof (my_public_key), &my_identity.hashPubKey);
3160   /* setup notification */
3161   server = serv;
3162   notifier = GNUNET_SERVER_notification_context_create (server, 0);
3163   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3164   /* setup transport connection */
3165   transport = GNUNET_TRANSPORT_connect (sched,
3166                                         cfg,
3167                                         NULL,
3168                                         &handle_transport_receive,
3169                                         &handle_transport_notify_connect,
3170                                         &handle_transport_notify_disconnect);
3171   GNUNET_assert (NULL != transport);
3172   GNUNET_SCHEDULER_add_delayed (sched,
3173                                 GNUNET_TIME_UNIT_FOREVER_REL,
3174                                 &cleaning_task, NULL);
3175   /* process client requests */
3176   GNUNET_SERVER_add_handlers (server, handlers);
3177   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3178               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3179 }
3180
3181
3182
3183 /**
3184  * The main function for the transport service.
3185  *
3186  * @param argc number of arguments from the command line
3187  * @param argv command line arguments
3188  * @return 0 ok, 1 on error
3189  */
3190 int
3191 main (int argc, char *const *argv)
3192 {
3193   return (GNUNET_OK ==
3194           GNUNET_SERVICE_run (argc,
3195                               argv,
3196                               "core",
3197                               GNUNET_SERVICE_OPTION_NONE,
3198                               &run, NULL)) ? 0 : 1;
3199 }
3200
3201 /* end of gnunet-service-core.c */