adjust code to new peerinfo API
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 /**
45  * Receive and send buffer windows grow over time.  For
46  * how long can 'unused' bandwidth accumulate before we
47  * need to cap it?  (specified in ms).
48  */
49 #define MAX_WINDOW_TIME (5 * 60 * 1000)
50
51 /**
52  * Minimum of bytes per minute (out) to assign to any connected peer.
53  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
54  * sense.
55  */
56 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
57
58 /**
59  * What is the smallest change (in number of bytes per minute)
60  * that we consider significant enough to bother triggering?
61  */
62 #define MIN_BPM_CHANGE 32
63
64 /**
65  * After how much time past the "official" expiration time do
66  * we discard messages?  Should not be zero since we may 
67  * intentionally defer transmission until close to the deadline
68  * and then may be slightly past the deadline due to inaccuracy
69  * in sleep and our own CPU consumption.
70  */
71 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
72
73 /**
74  * What is the maximum delay for a SET_KEY message?
75  */
76 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What how long do we wait for SET_KEY confirmation initially?
80  */
81 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
82
83 /**
84  * What is the maximum delay for a PING message?
85  */
86 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
87
88 /**
89  * What is the maximum delay for a PONG message?
90  */
91 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * How often do we recalculate bandwidth quotas?
95  */
96 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * What is the priority for a SET_KEY message?
100  */
101 #define SET_KEY_PRIORITY 0xFFFFFF
102
103 /**
104  * What is the priority for a PING message?
105  */
106 #define PING_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PONG message?
110  */
111 #define PONG_PRIORITY 0xFFFFFF
112
113 /**
114  * How many messages do we queue per peer at most?
115  */
116 #define MAX_PEER_QUEUE_SIZE 16
117
118 /**
119  * How many non-mandatory messages do we queue per client at most?
120  */
121 #define MAX_CLIENT_QUEUE_SIZE 32
122
123 /**
124  * What is the maximum age of a message for us to consider
125  * processing it?  Note that this looks at the timestamp used
126  * by the other peer, so clock skew between machines does
127  * come into play here.  So this should be picked high enough
128  * so that a little bit of clock skew does not prevent peers
129  * from connecting to us.
130  */
131 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
132
133 /**
134  * What is the maximum size for encrypted messages?  Note that this
135  * number imposes a clear limit on the maximum size of any message.
136  * Set to a value close to 64k but not so close that transports will
137  * have trouble with their headers.
138  */
139 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
140
141
142 /**
143  * State machine for our P2P encryption handshake.  Everyone starts in
144  * "DOWN", if we receive the other peer's key (other peer initiated)
145  * we start in state RECEIVED (since we will immediately send our
146  * own); otherwise we start in SENT.  If we get back a PONG from
147  * within either state, we move up to CONFIRMED (the PONG will always
148  * be sent back encrypted with the key we sent to the other peer).
149  */
150 enum PeerStateMachine
151 {
152   PEER_STATE_DOWN,
153   PEER_STATE_KEY_SENT,
154   PEER_STATE_KEY_RECEIVED,
155   PEER_STATE_KEY_CONFIRMED
156 };
157
158
159 /**
160  * Number of bytes (at the beginning) of "struct EncryptedMessage"
161  * that are NOT encrypted.
162  */
163 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
164
165
166 /**
167  * Encapsulation for encrypted messages exchanged between
168  * peers.  Followed by the actual encrypted data.
169  */
170 struct EncryptedMessage
171 {
172   /**
173    * Message type is either CORE_ENCRYPTED_MESSAGE.
174    */
175   struct GNUNET_MessageHeader header;
176
177   /**
178    * Always zero.
179    */
180   uint32_t reserved GNUNET_PACKED;
181
182   /**
183    * Hash of the plaintext, used to verify message integrity;
184    * ALSO used as the IV for the symmetric cipher!  Everything
185    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
186    * must be set to the offset of the next field.
187    */
188   GNUNET_HashCode plaintext_hash;
189
190   /**
191    * Sequence number, in network byte order.  This field
192    * must be the first encrypted/decrypted field and the
193    * first byte that is hashed for the plaintext hash.
194    */
195   uint32_t sequence_number GNUNET_PACKED;
196
197   /**
198    * Desired bandwidth (how much we should send to this
199    * peer / how much is the sender willing to receive),
200    * in bytes per minute.
201    */
202   uint32_t inbound_bpm_limit GNUNET_PACKED;
203
204   /**
205    * Timestamp.  Used to prevent reply of ancient messages
206    * (recent messages are caught with the sequence number).
207    */
208   struct GNUNET_TIME_AbsoluteNBO timestamp;
209
210 };
211
212 /**
213  * We're sending an (encrypted) PING to the other peer to check if he
214  * can decrypt.  The other peer should respond with a PONG with the
215  * same content, except this time encrypted with the receiver's key.
216  */
217 struct PingMessage
218 {
219   /**
220    * Message type is either CORE_PING or CORE_PONG.
221    */
222   struct GNUNET_MessageHeader header;
223
224   /**
225    * Random number chosen to make reply harder.
226    */
227   uint32_t challenge GNUNET_PACKED;
228
229   /**
230    * Intended target of the PING, used primarily to check
231    * that decryption actually worked.
232    */
233   struct GNUNET_PeerIdentity target;
234 };
235
236
237 /**
238  * Message transmitted to set (or update) a session key.
239  */
240 struct SetKeyMessage
241 {
242
243   /**
244    * Message type is either CORE_SET_KEY.
245    */
246   struct GNUNET_MessageHeader header;
247
248   /**
249    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
250    */
251   int32_t sender_status GNUNET_PACKED;
252
253   /**
254    * Purpose of the signature, will be
255    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
256    */
257   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
258
259   /**
260    * At what time was this key created?
261    */
262   struct GNUNET_TIME_AbsoluteNBO creation_time;
263
264   /**
265    * The encrypted session key.
266    */
267   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
268
269   /**
270    * Who is the intended recipient?
271    */
272   struct GNUNET_PeerIdentity target;
273
274   /**
275    * Signature of the stuff above (starting at purpose).
276    */
277   struct GNUNET_CRYPTO_RsaSignature signature;
278
279 };
280
281
282 /**
283  * Message waiting for transmission. This struct
284  * is followed by the actual content of the message.
285  */
286 struct MessageEntry
287 {
288
289   /**
290    * We keep messages in a linked list (for now).
291    */
292   struct MessageEntry *next;
293
294   /**
295    * By when are we supposed to transmit this message?
296    */
297   struct GNUNET_TIME_Absolute deadline;
298
299   /**
300    * How important is this message to us?
301    */
302   unsigned int priority;
303
304   /**
305    * How long is the message? (number of bytes following
306    * the "struct MessageEntry", but not including the
307    * size of "struct MessageEntry" itself!)
308    */
309   uint16_t size;
310
311   /**
312    * Was this message selected for transmission in the
313    * current round? GNUNET_YES or GNUNET_NO.
314    */
315   int16_t do_transmit;
316
317 };
318
319
320 struct Neighbour
321 {
322   /**
323    * We keep neighbours in a linked list (for now).
324    */
325   struct Neighbour *next;
326
327   /**
328    * Unencrypted messages destined for this peer.
329    */
330   struct MessageEntry *messages;
331
332   /**
333    * Head of the batched, encrypted message queue (already ordered,
334    * transmit starting with the head).
335    */
336   struct MessageEntry *encrypted_head;
337
338   /**
339    * Tail of the batched, encrypted message queue (already ordered,
340    * append new messages to tail)
341    */
342   struct MessageEntry *encrypted_tail;
343
344   /**
345    * Handle for pending requests for transmission to this peer
346    * with the transport service.  NULL if no request is pending.
347    */
348   struct GNUNET_TRANSPORT_TransmitHandle *th;
349
350   /**
351    * Public key of the neighbour, NULL if we don't have it yet.
352    */
353   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
354
355   /**
356    * We received a PING message before we got the "public_key"
357    * (or the SET_KEY).  We keep it here until we have a key
358    * to decrypt it.  NULL if no PING is pending.
359    */
360   struct PingMessage *pending_ping;
361
362   /**
363    * Non-NULL if we are currently looking up HELLOs for this peer.
364    * for this peer.
365    */
366   struct GNUNET_PEERINFO_IteratorContext *pitr;
367
368   /**
369    * SetKeyMessage to transmit, NULL if we are not currently trying
370    * to send one.
371    */
372   struct SetKeyMessage *skm;
373
374   /**
375    * Identity of the neighbour.
376    */
377   struct GNUNET_PeerIdentity peer;
378
379   /**
380    * Key we use to encrypt our messages for the other peer
381    * (initialized by us when we do the handshake).
382    */
383   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
384
385   /**
386    * Key we use to decrypt messages from the other peer
387    * (given to us by the other peer during the handshake).
388    */
389   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
390
391   /**
392    * ID of task used for re-trying plaintext scheduling.
393    */
394   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
395
396   /**
397    * ID of task used for re-trying SET_KEY and PING message.
398    */
399   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
400
401   /**
402    * ID of task used for updating bandwidth quota for this neighbour.
403    */
404   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
405
406   /**
407    * At what time did we generate our encryption key?
408    */
409   struct GNUNET_TIME_Absolute encrypt_key_created;
410
411   /**
412    * At what time did the other peer generate the decryption key?
413    */
414   struct GNUNET_TIME_Absolute decrypt_key_created;
415
416   /**
417    * At what time did we initially establish (as in, complete session
418    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
419    */
420   struct GNUNET_TIME_Absolute time_established;
421
422   /**
423    * At what time did we last receive an encrypted message from the
424    * other peer?  Should be zero if status != KEY_CONFIRMED.
425    */
426   struct GNUNET_TIME_Absolute last_activity;
427
428   /**
429    * Last latency observed from this peer.
430    */
431   struct GNUNET_TIME_Relative last_latency;
432
433   /**
434    * At what frequency are we currently re-trying SET_KEY messages?
435    */
436   struct GNUNET_TIME_Relative set_key_retry_frequency;
437
438   /**
439    * Time of our last update to the "available_send_window".
440    */
441   struct GNUNET_TIME_Absolute last_asw_update;
442
443   /**
444    * Time of our last update to the "available_recv_window".
445    */
446   struct GNUNET_TIME_Absolute last_arw_update;
447
448   /**
449    * Number of bytes that we are eligible to transmit to this
450    * peer at this point.  Incremented every minute by max_out_bpm,
451    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
452    * bandwidth-hogs are sampled at a frequency of about 78s!);
453    * may get negative if we have VERY high priority content.
454    */
455   long long available_send_window; 
456
457   /**
458    * How much downstream capacity of this peer has been reserved for
459    * our traffic?  (Our clients can request that a certain amount of
460    * bandwidth is available for replies to them; this value is used to
461    * make sure that this reserved amount of bandwidth is actually
462    * available).
463    */
464   long long available_recv_window; 
465
466   /**
467    * How valueable were the messages of this peer recently?
468    */
469   unsigned long long current_preference;
470
471   /**
472    * Bit map indicating which of the 32 sequence numbers before the last
473    * were received (good for accepting out-of-order packets and
474    * estimating reliability of the connection)
475    */
476   unsigned int last_packets_bitmap;
477
478   /**
479    * Number of messages in the message queue for this peer.
480    */
481   unsigned int message_queue_size;
482
483   /**
484    * last sequence number received on this connection (highest)
485    */
486   uint32_t last_sequence_number_received;
487
488   /**
489    * last sequence number transmitted
490    */
491   uint32_t last_sequence_number_sent;
492
493   /**
494    * Available bandwidth in for this peer (current target).
495    */
496   uint32_t bpm_in;
497
498   /**
499    * Available bandwidth out for this peer (current target).
500    */
501   uint32_t bpm_out;
502
503   /**
504    * Internal bandwidth limit set for this peer (initially
505    * typically set to "-1").  "bpm_out" is MAX of
506    * "bpm_out_internal_limit" and "bpm_out_external_limit".
507    */
508   uint32_t bpm_out_internal_limit;
509
510   /**
511    * External bandwidth limit set for this peer by the
512    * peer that we are communicating with.  "bpm_out" is MAX of
513    * "bpm_out_internal_limit" and "bpm_out_external_limit".
514    */
515   uint32_t bpm_out_external_limit;
516
517   /**
518    * What was our PING challenge number (for this peer)?
519    */
520   uint32_t ping_challenge;
521
522   /**
523    * What is our connection status?
524    */
525   enum PeerStateMachine status;
526
527 };
528
529
530 /**
531  * Events are messages for clients.  The struct
532  * itself is followed by the actual message.
533  */
534 struct Event
535 {
536   /**
537    * This is a linked list.
538    */
539   struct Event *next;
540
541   /**
542    * Size of the message.
543    */
544   size_t size;
545
546   /**
547    * Could this event be dropped if this queue
548    * is getting too large? (NOT YET USED!)
549    */
550   int can_drop;
551
552 };
553
554
555 /**
556  * Data structure for each client connected to the core service.
557  */
558 struct Client
559 {
560   /**
561    * Clients are kept in a linked list.
562    */
563   struct Client *next;
564
565   /**
566    * Handle for the client with the server API.
567    */
568   struct GNUNET_SERVER_Client *client_handle;
569
570   /**
571    * Linked list of messages we still need to deliver to
572    * this client.
573    */
574   struct Event *event_head;
575
576   /**
577    * Tail of the linked list of events.
578    */
579   struct Event *event_tail;
580
581   /**
582    * Current transmit handle, NULL if no transmission request
583    * is pending.
584    */
585   struct GNUNET_CONNECTION_TransmitHandle *th;
586
587   /**
588    * Array of the types of messages this peer cares
589    * about (with "tcnt" entries).  Allocated as part
590    * of this client struct, do not free!
591    */
592   uint16_t *types;
593
594   /**
595    * Options for messages this client cares about,
596    * see GNUNET_CORE_OPTION_ values.
597    */
598   uint32_t options;
599
600   /**
601    * Number of types of incoming messages this client
602    * specifically cares about.  Size of the "types" array.
603    */
604   unsigned int tcnt;
605
606 };
607
608
609 /**
610  * Our public key.
611  */
612 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
613
614 /**
615  * Our identity.
616  */
617 static struct GNUNET_PeerIdentity my_identity;
618
619 /**
620  * Our private key.
621  */
622 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
623
624 /**
625  * Our scheduler.
626  */
627 struct GNUNET_SCHEDULER_Handle *sched;
628
629 /**
630  * Our configuration.
631  */
632 const struct GNUNET_CONFIGURATION_Handle *cfg;
633
634 /**
635  * Our server.
636  */
637 static struct GNUNET_SERVER_Handle *server;
638
639 /**
640  * Transport service.
641  */
642 static struct GNUNET_TRANSPORT_Handle *transport;
643
644 /**
645  * Linked list of our clients.
646  */
647 static struct Client *clients;
648
649 /**
650  * We keep neighbours in a linked list (for now).
651  */
652 static struct Neighbour *neighbours;
653
654 /**
655  * Sum of all preferences among all neighbours.
656  */
657 static unsigned long long preference_sum;
658
659 /**
660  * Total number of neighbours we have.
661  */
662 static unsigned int neighbour_count;
663
664 /**
665  * How much inbound bandwidth are we supposed to be using?
666  */
667 static unsigned long long bandwidth_target_in;
668
669 /**
670  * How much outbound bandwidth are we supposed to be using?
671  */
672 static unsigned long long bandwidth_target_out;
673
674
675
676 /**
677  * A preference value for a neighbour was update.  Update
678  * the preference sum accordingly.
679  *
680  * @param inc how much was a preference value increased?
681  */
682 static void
683 update_preference_sum (unsigned long long inc)
684 {
685   struct Neighbour *n;
686   unsigned long long os;
687
688   os = preference_sum;
689   preference_sum += inc;
690   if (preference_sum >= os)
691     return; /* done! */
692   /* overflow! compensate by cutting all values in half! */
693   preference_sum = 0;
694   n = neighbours;
695   while (n != NULL)
696     {
697       n->current_preference /= 2;
698       preference_sum += n->current_preference;
699       n = n->next;
700     }    
701 }
702
703
704 /**
705  * Recalculate the number of bytes we expect to
706  * receive or transmit in a given window.
707  *
708  * @param force force an update now (even if not much time has passed)
709  * @param window pointer to the byte counter (updated)
710  * @param ts pointer to the timestamp (updated)
711  * @param bpm number of bytes per minute that should
712  *        be added to the window.
713  */
714 static void
715 update_window (int force,
716                long long *window,
717                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
718 {
719   struct GNUNET_TIME_Relative since;
720
721   since = GNUNET_TIME_absolute_get_duration (*ts);
722   if ( (force == GNUNET_NO) &&
723        (since.value < 60 * 1000) )
724     return;                     /* not even a minute has passed */
725   *ts = GNUNET_TIME_absolute_get ();
726   *window += (bpm * since.value) / 60 / 1000;
727   if (*window > MAX_WINDOW_TIME * bpm)
728     *window = MAX_WINDOW_TIME * bpm;
729 }
730
731
732 /**
733  * Find the entry for the given neighbour.
734  *
735  * @param peer identity of the neighbour
736  * @return NULL if we are not connected, otherwise the
737  *         neighbour's entry.
738  */
739 static struct Neighbour *
740 find_neighbour (const struct GNUNET_PeerIdentity *peer)
741 {
742   struct Neighbour *ret;
743
744   ret = neighbours;
745   while ((ret != NULL) &&
746          (0 != memcmp (&ret->peer,
747                        peer, sizeof (struct GNUNET_PeerIdentity))))
748     ret = ret->next;
749   return ret;
750 }
751
752
753 /**
754  * Find the entry for the given client.
755  *
756  * @param client handle for the client
757  * @return NULL if we are not connected, otherwise the
758  *         client's struct.
759  */
760 static struct Client *
761 find_client (const struct GNUNET_SERVER_Client *client)
762 {
763   struct Client *ret;
764
765   ret = clients;
766   while ((ret != NULL) && (client != ret->client_handle))
767     ret = ret->next;
768   return ret;
769 }
770
771
772 /**
773  * If necessary, initiate a request with the server to
774  * transmit messages from the queue of the given client.
775  * @param client who to transfer messages to
776  */
777 static void request_transmit (struct Client *client);
778
779
780 /**
781  * Client is ready to receive data, provide it.
782  *
783  * @param cls closure
784  * @param size number of bytes available in buf
785  * @param buf where the callee should write the message
786  * @return number of bytes written to buf
787  */
788 static size_t
789 do_client_transmit (void *cls, size_t size, void *buf)
790 {
791   struct Client *client = cls;
792   struct Event *e;
793   char *tgt;
794   size_t ret;
795
796   client->th = NULL;
797 #if DEBUG_CORE_CLIENT
798   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799               "Client ready to receive %u bytes.\n", size);
800 #endif
801   if (buf == NULL)
802     {
803 #if DEBUG_CORE
804       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
805                   "Failed to transmit data to client (disconnect)?\n");
806 #endif
807       return 0;                 /* we'll surely get a disconnect soon... */
808     }
809   tgt = buf;
810   ret = 0;
811   while ((NULL != (e = client->event_head)) && (e->size <= size))
812     {
813       memcpy (&tgt[ret], &e[1], e->size);
814       size -= e->size;
815       ret += e->size;
816       client->event_head = e->next;
817       GNUNET_free (e);
818     }
819   GNUNET_assert (ret > 0);
820   if (client->event_head == NULL)
821     client->event_tail = NULL;
822 #if DEBUG_CORE_CLIENT
823   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
824               "Transmitting %u bytes to client\n", ret);
825 #endif
826   request_transmit (client);
827   return ret;
828 }
829
830
831 /**
832  * If necessary, initiate a request with the server to
833  * transmit messages from the queue of the given client.
834  * @param client who to transfer messages to
835  */
836 static void
837 request_transmit (struct Client *client)
838 {
839
840   if (NULL != client->th)
841     return;                     /* already pending */
842   if (NULL == client->event_head)
843     return;                     /* no more events pending */
844 #if DEBUG_CORE_CLIENT
845   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
846               "Asking server to transmit %u bytes to client\n",
847               client->event_head->size);
848 #endif
849   client->th
850     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
851                                            client->event_head->size,
852                                            GNUNET_TIME_UNIT_FOREVER_REL,
853                                            &do_client_transmit, client);
854 }
855
856
857 /**
858  * Send a message to one of our clients.
859  *
860  * @param client target for the message
861  * @param msg message to transmit
862  * @param can_drop could this message be dropped if the
863  *        client's queue is getting too large?
864  */
865 static void
866 send_to_client (struct Client *client,
867                 const struct GNUNET_MessageHeader *msg, int can_drop)
868 {
869   struct Event *e;
870   unsigned int queue_size;
871   uint16_t msize;
872
873 #if DEBUG_CORE_CLIENT
874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
875               "Preparing to send message of type %u to client.\n",
876               ntohs (msg->type));
877 #endif
878   queue_size = 0;
879   e = client->event_head;
880   while (e != NULL)
881     {
882       queue_size++;
883       e = e->next;
884     }
885   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
886        (can_drop == GNUNET_YES) )
887     {
888 #if DEBUG_CORE
889       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
890                   "Too many messages in queue for the client, dropping the new message.\n");
891 #endif
892       return;
893     }
894
895   msize = ntohs (msg->size);
896   e = GNUNET_malloc (sizeof (struct Event) + msize);
897   /* append */
898   if (client->event_tail != NULL)
899     client->event_tail->next = e;
900   else
901     client->event_head = e;
902   client->event_tail = e;
903   e->can_drop = can_drop;
904   e->size = msize;
905   memcpy (&e[1], msg, msize);
906   request_transmit (client);
907 }
908
909
910 /**
911  * Send a message to all of our current clients.
912  */
913 static void
914 send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
915 {
916   struct Client *c;
917
918   c = clients;
919   while (c != NULL)
920     {
921       send_to_client (c, msg, can_drop);
922       c = c->next;
923     }
924 }
925
926
927 /**
928  * Handle CORE_INIT request.
929  */
930 static void
931 handle_client_init (void *cls,
932                     struct GNUNET_SERVER_Client *client,
933                     const struct GNUNET_MessageHeader *message)
934 {
935   const struct InitMessage *im;
936   struct InitReplyMessage irm;
937   struct Client *c;
938   uint16_t msize;
939   const uint16_t *types;
940   struct Neighbour *n;
941   struct ConnectNotifyMessage cnm;
942
943 #if DEBUG_CORE_CLIENT
944   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
945               "Client connecting to core service with `%s' message\n",
946               "INIT");
947 #endif
948   /* check that we don't have an entry already */
949   c = clients;
950   while (c != NULL)
951     {
952       if (client == c->client_handle)
953         {
954           GNUNET_break (0);
955           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
956           return;
957         }
958       c = c->next;
959     }
960   msize = ntohs (message->size);
961   if (msize < sizeof (struct InitMessage))
962     {
963       GNUNET_break (0);
964       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
965       return;
966     }
967   im = (const struct InitMessage *) message;
968   types = (const uint16_t *) &im[1];
969   msize -= sizeof (struct InitMessage);
970   c = GNUNET_malloc (sizeof (struct Client) + msize);
971   c->client_handle = client;
972   c->next = clients;
973   clients = c;
974   memcpy (&c[1], types, msize);
975   c->types = (uint16_t *) & c[1];
976   c->options = ntohl (im->options);
977   c->tcnt = msize / sizeof (uint16_t);
978   /* send init reply message */
979   irm.header.size = htons (sizeof (struct InitReplyMessage));
980   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
981   irm.reserved = htonl (0);
982   memcpy (&irm.publicKey,
983           &my_public_key,
984           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
985 #if DEBUG_CORE_CLIENT
986   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987               "Sending `%s' message to client.\n", "INIT_REPLY");
988 #endif
989   send_to_client (c, &irm.header, GNUNET_NO);
990   /* notify new client about existing neighbours */
991   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
992   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
993   n = neighbours;
994   while (n != NULL)
995     {
996 #if DEBUG_CORE_CLIENT
997       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
999 #endif
1000       cnm.reserved = htonl (0);
1001       cnm.peer = n->peer;
1002       send_to_client (c, &cnm.header, GNUNET_NO);
1003       n = n->next;
1004     }
1005   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1006 }
1007
1008
1009 /**
1010  * A client disconnected, clean up.
1011  *
1012  * @param cls closure
1013  * @param client identification of the client
1014  */
1015 static void
1016 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1017 {
1018   struct Client *pos;
1019   struct Client *prev;
1020   struct Event *e;
1021
1022 #if DEBUG_CORE_CLIENT
1023   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1024               "Client has disconnected from core service.\n");
1025 #endif
1026   prev = NULL;
1027   pos = clients;
1028   while (pos != NULL)
1029     {
1030       if (client == pos->client_handle)
1031         {
1032           if (prev == NULL)
1033             clients = pos->next;
1034           else
1035             prev->next = pos->next;
1036           if (pos->th != NULL)
1037             GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1038           while (NULL != (e = pos->event_head))
1039             {
1040               pos->event_head = e->next;
1041               GNUNET_free (e);
1042             }
1043           GNUNET_free (pos);
1044           return;
1045         }
1046       prev = pos;
1047       pos = pos->next;
1048     }
1049   /* client never sent INIT */
1050 }
1051
1052
1053 /**
1054  * Handle REQUEST_CONFIGURE request.
1055  */
1056 static void
1057 handle_client_request_configure (void *cls,
1058                                  struct GNUNET_SERVER_Client *client,
1059                                  const struct GNUNET_MessageHeader *message)
1060 {
1061   const struct RequestConfigureMessage *rcm;
1062   struct Neighbour *n;
1063   struct ConfigurationInfoMessage cim;
1064   struct Client *c;
1065   int reserv;
1066   unsigned long long old_preference;
1067
1068 #if DEBUG_CORE_CLIENT
1069   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070               "Core service receives `%s' request.\n", "CONFIGURE");
1071 #endif
1072   rcm = (const struct RequestConfigureMessage *) message;
1073   n = find_neighbour (&rcm->peer);
1074   memset (&cim, 0, sizeof (cim));
1075   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1076     {
1077       update_window (GNUNET_YES,
1078                      &n->available_send_window,
1079                      &n->last_asw_update,
1080                      n->bpm_out);
1081       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1082       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1083                                n->bpm_out_external_limit);
1084       reserv = ntohl (rcm->reserve_inbound);
1085       if (reserv < 0)
1086         {
1087           n->available_recv_window += reserv;
1088         }
1089       else if (reserv > 0)
1090         {
1091           update_window (GNUNET_NO,
1092                          &n->available_recv_window,
1093                          &n->last_arw_update, n->bpm_in);
1094           if (n->available_recv_window < reserv)
1095             reserv = n->available_recv_window;
1096           n->available_recv_window -= reserv;
1097         }
1098       old_preference = n->current_preference;
1099       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1100       if (old_preference > n->current_preference) 
1101         {
1102           /* overflow; cap at maximum value */
1103           n->current_preference = (unsigned long long) -1;
1104         }
1105       update_preference_sum (n->current_preference - old_preference);
1106       cim.reserved_amount = htonl (reserv);
1107       cim.bpm_in = htonl (n->bpm_in);
1108       cim.bpm_out = htonl (n->bpm_out);
1109       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1110       cim.preference = n->current_preference;
1111     }
1112   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1113   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1114   cim.peer = rcm->peer;
1115   c = find_client (client);
1116   if (c == NULL)
1117     {
1118       GNUNET_break (0);
1119       return;
1120     }
1121 #if DEBUG_CORE_CLIENT
1122   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1124 #endif
1125   send_to_client (c, &cim.header, GNUNET_NO);
1126 }
1127
1128
1129 /**
1130  * Check if we have encrypted messages for the specified neighbour
1131  * pending, and if so, check with the transport about sending them
1132  * out.
1133  *
1134  * @param n neighbour to check.
1135  */
1136 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1137
1138
1139 /**
1140  * Function called when the transport service is ready to
1141  * receive an encrypted message for the respective peer
1142  *
1143  * @param cls neighbour to use message from
1144  * @param size number of bytes we can transmit
1145  * @param buf where to copy the message
1146  * @return number of bytes transmitted
1147  */
1148 static size_t
1149 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1150 {
1151   struct Neighbour *n = cls;
1152   struct MessageEntry *m;
1153   size_t ret;
1154   char *cbuf;
1155
1156   n->th = NULL;
1157   GNUNET_assert (NULL != (m = n->encrypted_head));
1158   n->encrypted_head = m->next;
1159   if (m->next == NULL)
1160     n->encrypted_tail = NULL;
1161   ret = 0;
1162   cbuf = buf;
1163   if (buf != NULL)
1164     {
1165       GNUNET_assert (size >= m->size);
1166       memcpy (cbuf, &m[1], m->size);
1167       ret = m->size;
1168       n->available_send_window -= m->size;
1169       process_encrypted_neighbour_queue (n);
1170 #if DEBUG_CORE
1171       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1172                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1173                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1174                   ret, GNUNET_i2s (&n->peer));
1175 #endif
1176     }
1177   else
1178     {
1179       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1180                   "Transmission for message of type %u and size %u failed\n",
1181                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1182                   m->size);
1183     }
1184   GNUNET_free (m);
1185   return ret;
1186 }
1187
1188
1189 /**
1190  * Check if we have plaintext messages for the specified neighbour
1191  * pending, and if so, consider batching and encrypting them (and
1192  * then trigger processing of the encrypted queue if needed).
1193  *
1194  * @param n neighbour to check.
1195  */
1196 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1197
1198
1199 /**
1200  * Check if we have encrypted messages for the specified neighbour
1201  * pending, and if so, check with the transport about sending them
1202  * out.
1203  *
1204  * @param n neighbour to check.
1205  */
1206 static void
1207 process_encrypted_neighbour_queue (struct Neighbour *n)
1208 {
1209   struct MessageEntry *m;
1210  
1211   if (n->th != NULL)
1212     return;  /* request already pending */
1213   if (n->encrypted_head == NULL)
1214     {
1215       /* encrypted queue empty, try plaintext instead */
1216       process_plaintext_neighbour_queue (n);
1217       return;
1218     }
1219 #if DEBUG_CORE
1220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1222               n->encrypted_head->size,
1223               GNUNET_i2s (&n->peer),
1224               GNUNET_TIME_absolute_get_remaining (n->
1225                                                   encrypted_head->deadline).
1226               value);
1227 #endif
1228   n->th =
1229     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1230                                             n->encrypted_head->size,
1231                                             n->encrypted_head->priority,
1232                                             GNUNET_TIME_absolute_get_remaining
1233                                             (n->encrypted_head->deadline),
1234                                             &notify_encrypted_transmit_ready,
1235                                             n);
1236   if (n->th == NULL)
1237     {
1238       /* message request too large (oops) */
1239       GNUNET_break (0);
1240       /* discard encrypted message */
1241       GNUNET_assert (NULL != (m = n->encrypted_head));
1242       n->encrypted_head = m->next;
1243       if (m->next == NULL)
1244         n->encrypted_tail = NULL;
1245       GNUNET_free (m);
1246       process_encrypted_neighbour_queue (n);
1247     }
1248 }
1249
1250
1251 /**
1252  * Decrypt size bytes from in and write the result to out.  Use the
1253  * key for inbound traffic of the given neighbour.  This function does
1254  * NOT do any integrity-checks on the result.
1255  *
1256  * @param n neighbour we are receiving from
1257  * @param iv initialization vector to use
1258  * @param in ciphertext
1259  * @param out plaintext
1260  * @param size size of in/out
1261  * @return GNUNET_OK on success
1262  */
1263 static int
1264 do_decrypt (struct Neighbour *n,
1265             const GNUNET_HashCode * iv,
1266             const void *in, void *out, size_t size)
1267 {
1268   if (size != (uint16_t) size)
1269     {
1270       GNUNET_break (0);
1271       return GNUNET_NO;
1272     }
1273   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1274       (n->status != PEER_STATE_KEY_CONFIRMED))
1275     {
1276       GNUNET_break_op (0);
1277       return GNUNET_SYSERR;
1278     }
1279   if (size !=
1280       GNUNET_CRYPTO_aes_decrypt (in,
1281                                  (uint16_t) size,
1282                                  &n->decrypt_key,
1283                                  (const struct
1284                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1285                                  out))
1286     {
1287       GNUNET_break (0);
1288       return GNUNET_SYSERR;
1289     }
1290 #if DEBUG_CORE
1291   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1292               "Decrypted %u bytes from `%4s' using key %u\n",
1293               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1294 #endif
1295   return GNUNET_OK;
1296 }
1297
1298
1299 /**
1300  * Encrypt size bytes from in and write the result to out.  Use the
1301  * key for outbound traffic of the given neighbour.
1302  *
1303  * @param n neighbour we are sending to
1304  * @param iv initialization vector to use
1305  * @param in ciphertext
1306  * @param out plaintext
1307  * @param size size of in/out
1308  * @return GNUNET_OK on success
1309  */
1310 static int
1311 do_encrypt (struct Neighbour *n,
1312             const GNUNET_HashCode * iv,
1313             const void *in, void *out, size_t size)
1314 {
1315   if (size != (uint16_t) size)
1316     {
1317       GNUNET_break (0);
1318       return GNUNET_NO;
1319     }
1320   GNUNET_assert (size ==
1321                  GNUNET_CRYPTO_aes_encrypt (in,
1322                                             (uint16_t) size,
1323                                             &n->encrypt_key,
1324                                             (const struct
1325                                              GNUNET_CRYPTO_AesInitializationVector
1326                                              *) iv, out));
1327 #if DEBUG_CORE
1328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1329               "Encrypted %u bytes for `%4s' using key %u\n", size,
1330               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1331 #endif
1332   return GNUNET_OK;
1333 }
1334
1335
1336 /**
1337  * Select messages for transmission.  This heuristic uses a combination
1338  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1339  * and priority-based discard (in case no feasible schedule exist) and
1340  * speculative optimization (defer any kind of transmission until
1341  * we either create a batch of significant size, 25% of max, or until
1342  * we are close to a deadline).  Furthermore, when scheduling the
1343  * heuristic also packs as many messages into the batch as possible,
1344  * starting with those with the earliest deadline.  Yes, this is fun.
1345  *
1346  * @param n neighbour to select messages from
1347  * @param size number of bytes to select for transmission
1348  * @param retry_time set to the time when we should try again
1349  *        (only valid if this function returns zero)
1350  * @return number of bytes selected, or 0 if we decided to
1351  *         defer scheduling overall; in that case, retry_time is set.
1352  */
1353 static size_t
1354 select_messages (struct Neighbour *n,
1355                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1356 {
1357   struct MessageEntry *pos;
1358   struct MessageEntry *min;
1359   struct MessageEntry *last;
1360   unsigned int min_prio;
1361   struct GNUNET_TIME_Absolute t;
1362   struct GNUNET_TIME_Absolute now;
1363   uint64_t delta;
1364   uint64_t avail;
1365   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1366   size_t off;
1367   int discard_low_prio;
1368
1369   GNUNET_assert (NULL != n->messages);
1370   now = GNUNET_TIME_absolute_get ();
1371   /* last entry in linked list of messages processed */
1372   last = NULL;
1373   /* should we remove the entry with the lowest
1374      priority from consideration for scheduling at the
1375      end of the loop? */
1376   discard_low_prio = GNUNET_YES;
1377   while (GNUNET_YES == discard_low_prio)
1378     {
1379       min = NULL;
1380       min_prio = -1;
1381       discard_low_prio = GNUNET_NO;
1382       /* calculate number of bytes available for transmission at time "t" */
1383       update_window (GNUNET_NO,
1384                      &n->available_send_window,
1385                      &n->last_asw_update,
1386                      n->bpm_out);
1387       avail = n->available_send_window;
1388       t = n->last_asw_update;
1389       /* how many bytes have we (hypothetically) scheduled so far */
1390       off = 0;
1391       /* maximum time we can wait before transmitting anything
1392          and still make all of our deadlines */
1393       slack = -1;
1394
1395       pos = n->messages;
1396       /* note that we use "*2" here because we want to look
1397          a bit further into the future; much more makes no
1398          sense since new message might be scheduled in the
1399          meantime... */
1400       while ((pos != NULL) && (off < size * 2))
1401         {
1402           if (pos->do_transmit == GNUNET_YES)
1403             {
1404               /* already removed from consideration */
1405               pos = pos->next;
1406               continue;
1407             }
1408           if (discard_low_prio == GNUNET_NO)
1409             {
1410               delta = pos->deadline.value;
1411               if (delta < t.value)
1412                 delta = 0;
1413               else
1414                 delta = t.value - delta;
1415               avail += delta * n->bpm_out / 1000 / 60;
1416               if (avail < pos->size)
1417                 {
1418                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1419                 }
1420               else
1421                 {
1422                   avail -= pos->size;
1423                   /* update slack, considering both its absolute deadline
1424                      and relative deadlines caused by other messages
1425                      with their respective load */
1426                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1427                   if (pos->deadline.value < now.value)
1428                     slack = 0;
1429                   else
1430                     slack =
1431                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1432                 }
1433             }
1434           off += pos->size;
1435           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1436           if (pos->priority <= min_prio)
1437             {
1438               /* update min for discard */
1439               min_prio = pos->priority;
1440               min = pos;
1441             }
1442           pos = pos->next;
1443         }
1444       if (discard_low_prio)
1445         {
1446           GNUNET_assert (min != NULL);
1447           /* remove lowest-priority entry from consideration */
1448           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1449         }
1450       last = pos;
1451     }
1452   /* guard against sending "tiny" messages with large headers without
1453      urgent deadlines */
1454   if ( (slack > 1000) && (size > 4 * off) )
1455     {
1456       /* less than 25% of message would be filled with
1457          deadlines still being met if we delay by one
1458          second or more; so just wait for more data */
1459       retry_time->value = slack / 2;
1460       /* reset do_transmit values for next time */
1461       while (pos != last)
1462         {
1463           pos->do_transmit = GNUNET_NO;
1464           pos = pos->next;
1465         }
1466       return 0;
1467     }
1468   /* select marked messages (up to size) for transmission */
1469   off = 0;
1470   pos = n->messages;
1471   while (pos != last)
1472     {
1473       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1474         {
1475           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1476           off += pos->size;
1477           size -= pos->size;
1478         }
1479       else
1480         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1481       pos = pos->next;
1482     }
1483 #if DEBUG_CORE
1484   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1485               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1486               off, GNUNET_i2s (&n->peer));
1487 #endif
1488   return off;
1489 }
1490
1491
1492 /**
1493  * Batch multiple messages into a larger buffer.
1494  *
1495  * @param n neighbour to take messages from
1496  * @param buf target buffer
1497  * @param size size of buf
1498  * @param deadline set to transmission deadline for the result
1499  * @param retry_time set to the time when we should try again
1500  *        (only valid if this function returns zero)
1501  * @param priority set to the priority of the batch
1502  * @return number of bytes written to buf (can be zero)
1503  */
1504 static size_t
1505 batch_message (struct Neighbour *n,
1506                char *buf,
1507                size_t size,
1508                struct GNUNET_TIME_Absolute *deadline,
1509                struct GNUNET_TIME_Relative *retry_time,
1510                unsigned int *priority)
1511 {
1512   struct MessageEntry *pos;
1513   struct MessageEntry *prev;
1514   struct MessageEntry *next;
1515   size_t ret;
1516
1517   ret = 0;
1518   *priority = 0;
1519   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1520   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1521   if (0 == select_messages (n, size, retry_time))
1522     {
1523       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1524                   "No messages selected, will try again in %llu ms\n",
1525                   retry_time->value);
1526       return 0;
1527     }
1528   pos = n->messages;
1529   prev = NULL;
1530   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1531     {
1532       next = pos->next;
1533       if (GNUNET_YES == pos->do_transmit)
1534         {
1535           GNUNET_assert (pos->size <= size);
1536           memcpy (&buf[ret], &pos[1], pos->size);
1537           ret += pos->size;
1538           size -= pos->size;
1539           *priority += pos->priority;
1540 #if DEBUG_CORE
1541           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1542                       "Adding plaintext message with deadline %llu ms to batch\n",
1543                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1544 #endif
1545           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1546           GNUNET_free (pos);
1547           if (prev == NULL)
1548             n->messages = next;
1549           else
1550             prev->next = next;
1551         }
1552       else
1553         {
1554           prev = pos;
1555         }
1556       pos = next;
1557     }
1558 #if DEBUG_CORE
1559   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1560               "Deadline for message batch is %llu ms\n",
1561               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1562 #endif
1563   return ret;
1564 }
1565
1566
1567 /**
1568  * Remove messages with deadlines that have long expired from
1569  * the queue.
1570  *
1571  * @param n neighbour to inspect
1572  */
1573 static void
1574 discard_expired_messages (struct Neighbour *n)
1575 {
1576   struct MessageEntry *prev;
1577   struct MessageEntry *next;
1578   struct MessageEntry *pos;
1579   struct GNUNET_TIME_Absolute now;
1580   struct GNUNET_TIME_Relative delta;
1581
1582   now = GNUNET_TIME_absolute_get ();
1583   prev = NULL;
1584   pos = n->messages;
1585   while (pos != NULL) 
1586     {
1587       next = pos->next;
1588       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1589       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1590         {
1591 #if DEBUG_CORE
1592           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1593                       "Message is %llu ms past due, discarding.\n",
1594                       delta.value);
1595 #endif
1596           if (prev == NULL)
1597             n->messages = next;
1598           else
1599             prev->next = next;
1600           GNUNET_free (pos);
1601         }
1602       else
1603         prev = pos;
1604       pos = next;
1605     }
1606 }
1607
1608
1609 /**
1610  * Signature of the main function of a task.
1611  *
1612  * @param cls closure
1613  * @param tc context information (why was this task triggered now)
1614  */
1615 static void
1616 retry_plaintext_processing (void *cls,
1617                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1618 {
1619   struct Neighbour *n = cls;
1620
1621   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1622   process_plaintext_neighbour_queue (n);
1623 }
1624
1625
1626 /**
1627  * Send our key (and encrypted PING) to the other peer.
1628  *
1629  * @param n the other peer
1630  */
1631 static void send_key (struct Neighbour *n);
1632
1633
1634 /**
1635  * Check if we have plaintext messages for the specified neighbour
1636  * pending, and if so, consider batching and encrypting them (and
1637  * then trigger processing of the encrypted queue if needed).
1638  *
1639  * @param n neighbour to check.
1640  */
1641 static void
1642 process_plaintext_neighbour_queue (struct Neighbour *n)
1643 {
1644   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1645   size_t used;
1646   size_t esize;
1647   struct EncryptedMessage *em;  /* encrypted message */
1648   struct EncryptedMessage *ph;  /* plaintext header */
1649   struct MessageEntry *me;
1650   unsigned int priority;
1651   struct GNUNET_TIME_Absolute deadline;
1652   struct GNUNET_TIME_Relative retry_time;
1653
1654   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1655     {
1656       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1657       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1658     }
1659   switch (n->status)
1660     {
1661     case PEER_STATE_DOWN:
1662       send_key (n);
1663 #if DEBUG_CORE
1664       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1665                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1666                   GNUNET_i2s(&n->peer));
1667 #endif
1668       return;
1669     case PEER_STATE_KEY_SENT:
1670       GNUNET_assert (n->retry_set_key_task !=
1671                      GNUNET_SCHEDULER_NO_TASK);
1672 #if DEBUG_CORE
1673       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1674                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1675                   GNUNET_i2s(&n->peer));
1676 #endif
1677       return;
1678     case PEER_STATE_KEY_RECEIVED:
1679       GNUNET_assert (n->retry_set_key_task !=
1680                      GNUNET_SCHEDULER_NO_TASK);
1681 #if DEBUG_CORE
1682       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1683                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1684                   GNUNET_i2s(&n->peer));
1685 #endif
1686       return;
1687     case PEER_STATE_KEY_CONFIRMED:
1688       /* ready to continue */
1689       break;
1690     }
1691   discard_expired_messages (n);
1692   if (n->messages == NULL)
1693     {
1694 #if DEBUG_CORE
1695       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1696                   "Plaintext message queue for `%4s' is empty.\n",
1697                   GNUNET_i2s(&n->peer));
1698 #endif
1699       return;                   /* no pending messages */
1700     }
1701   if (n->encrypted_head != NULL)
1702     {
1703 #if DEBUG_CORE
1704       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1705                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1706                   GNUNET_i2s(&n->peer));
1707 #endif
1708       return;                   /* wait for messages already encrypted to be
1709                                    processed first! */
1710     }
1711   ph = (struct EncryptedMessage *) pbuf;
1712   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1713   priority = 0;
1714   used = sizeof (struct EncryptedMessage);
1715   used += batch_message (n,
1716                          &pbuf[used],
1717                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1718                          &deadline, &retry_time, &priority);
1719   if (used == sizeof (struct EncryptedMessage))
1720     {
1721 #if DEBUG_CORE
1722       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1723                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1724                   GNUNET_i2s(&n->peer));
1725 #endif
1726       /* no messages selected for sending, try again later... */
1727       n->retry_plaintext_task =
1728         GNUNET_SCHEDULER_add_delayed (sched,
1729                                       retry_time,
1730                                       &retry_plaintext_processing, n);
1731       return;
1732     }
1733   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1734   ph->inbound_bpm_limit = htonl (n->bpm_in);
1735   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1736
1737   /* setup encryption message header */
1738   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1739   me->deadline = deadline;
1740   me->priority = priority;
1741   me->size = used;
1742   em = (struct EncryptedMessage *) &me[1];
1743   em->header.size = htons (used);
1744   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1745   em->reserved = htonl (0);
1746   esize = used - ENCRYPTED_HEADER_SIZE;
1747   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1748   /* encrypt */
1749 #if DEBUG_CORE
1750   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1751               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1752               esize,
1753               GNUNET_i2s(&n->peer),
1754               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1755 #endif
1756   GNUNET_assert (GNUNET_OK ==
1757                  do_encrypt (n,
1758                              &em->plaintext_hash,
1759                              &ph->sequence_number,
1760                              &em->sequence_number, esize));
1761   /* append to transmission list */
1762   if (n->encrypted_tail == NULL)
1763     n->encrypted_head = me;
1764   else
1765     n->encrypted_tail->next = me;
1766   n->encrypted_tail = me;
1767   process_encrypted_neighbour_queue (n);
1768 }
1769
1770
1771 /**
1772  * Handle CORE_SEND request.
1773  *
1774  * @param cls unused
1775  * @param client the client issuing the request
1776  * @param message the "struct SendMessage"
1777  */
1778 static void
1779 handle_client_send (void *cls,
1780                     struct GNUNET_SERVER_Client *client,
1781                     const struct GNUNET_MessageHeader *message);
1782
1783
1784 /**
1785  * Function called to notify us that we either succeeded
1786  * or failed to connect (at the transport level) to another
1787  * peer.  We should either free the message we were asked
1788  * to transmit or re-try adding it to the queue.
1789  *
1790  * @param cls closure
1791  * @param size number of bytes available in buf
1792  * @param buf where the callee should write the message
1793  * @return number of bytes written to buf
1794  */
1795 static size_t
1796 send_connect_continuation (void *cls, size_t size, void *buf)
1797 {
1798   struct SendMessage *sm = cls;
1799
1800   if (buf == NULL)
1801     {
1802 #if DEBUG_CORE
1803       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1804                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1805                   GNUNET_i2s (&sm->peer));
1806 #endif
1807       GNUNET_free (sm);
1808       return 0;
1809     }
1810 #if DEBUG_CORE
1811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1812               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1813               GNUNET_i2s (&sm->peer));
1814 #endif
1815   handle_client_send (NULL, NULL, &sm->header);
1816   GNUNET_free (sm);
1817   return 0;
1818 }
1819
1820
1821 /**
1822  * Handle CORE_SEND request.
1823  *
1824  * @param cls unused
1825  * @param client the client issuing the request
1826  * @param message the "struct SendMessage"
1827  */
1828 static void
1829 handle_client_send (void *cls,
1830                     struct GNUNET_SERVER_Client *client,
1831                     const struct GNUNET_MessageHeader *message)
1832 {
1833   const struct SendMessage *sm;
1834   struct SendMessage *smc;
1835   const struct GNUNET_MessageHeader *mh;
1836   struct Neighbour *n;
1837   struct MessageEntry *prev;
1838   struct MessageEntry *pos;
1839   struct MessageEntry *e; 
1840   struct MessageEntry *min_prio_entry;
1841   struct MessageEntry *min_prio_prev;
1842   unsigned int min_prio;
1843   unsigned int queue_size;
1844   uint16_t msize;
1845
1846   msize = ntohs (message->size);
1847   if (msize <
1848       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1849     {
1850       GNUNET_break (0);
1851       if (client != NULL)
1852         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1853       return;
1854     }
1855   sm = (const struct SendMessage *) message;
1856   msize -= sizeof (struct SendMessage);
1857   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1858   if (msize != ntohs (mh->size))
1859     {
1860       GNUNET_break (0);
1861       if (client != NULL)
1862         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1863       return;
1864     }
1865   n = find_neighbour (&sm->peer);
1866   if (n == NULL)
1867     {
1868 #if DEBUG_CORE
1869       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1870                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1871                   "SEND",
1872                   GNUNET_i2s (&sm->peer),
1873                   GNUNET_TIME_absolute_get_remaining
1874                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1875 #endif
1876       msize += sizeof (struct SendMessage);
1877       /* ask transport to connect to the peer */
1878       smc = GNUNET_malloc (msize);
1879       memcpy (smc, sm, msize);
1880       if (NULL ==
1881           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1882                                                   &sm->peer,
1883                                                   0, 0,
1884                                                   GNUNET_TIME_absolute_get_remaining
1885                                                   (GNUNET_TIME_absolute_ntoh
1886                                                    (sm->deadline)),
1887                                                   &send_connect_continuation,
1888                                                   smc))
1889         {
1890           /* transport has already a request pending for this peer! */
1891 #if DEBUG_CORE
1892           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1893                       "Dropped second message destined for `%4s' since connection is still down.\n",
1894                       GNUNET_i2s(&sm->peer));
1895 #endif
1896           GNUNET_free (smc);
1897         }
1898       if (client != NULL)
1899         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1900       return;
1901     }
1902 #if DEBUG_CORE
1903   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1904               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1905               "SEND",
1906               msize, 
1907               GNUNET_i2s (&sm->peer));
1908 #endif
1909   /* bound queue size */
1910   discard_expired_messages (n);
1911   min_prio = (unsigned int) -1;
1912   min_prio_entry = NULL;
1913   min_prio_prev = NULL;
1914   queue_size = 0;
1915   prev = NULL;
1916   pos = n->messages;
1917   while (pos != NULL) 
1918     {
1919       if (pos->priority < min_prio)
1920         {
1921           min_prio_entry = pos;
1922           min_prio_prev = prev;
1923           min_prio = pos->priority;
1924         }
1925       queue_size++;
1926       prev = pos;
1927       pos = pos->next;
1928     }
1929   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1930     {
1931       /* queue full */
1932       if (ntohl(sm->priority) <= min_prio)
1933         {
1934           /* discard new entry */
1935 #if DEBUG_CORE
1936           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1937                       "Queue full, discarding new request\n");
1938 #endif
1939           if (client != NULL)
1940             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1941           return;
1942         }
1943       /* discard "min_prio_entry" */
1944 #if DEBUG_CORE
1945       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1946                   "Queue full, discarding existing older request\n");
1947 #endif
1948       if (min_prio_prev == NULL)
1949         n->messages = min_prio_entry->next;
1950       else
1951         min_prio_prev->next = min_prio_entry->next;      
1952       GNUNET_free (min_prio_entry);     
1953     }
1954
1955 #if DEBUG_CORE
1956   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1957               "Adding transmission request for `%4s' to queue\n",
1958               GNUNET_i2s (&sm->peer));
1959 #endif  
1960   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1961   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1962   e->priority = ntohl (sm->priority);
1963   e->size = msize;
1964   memcpy (&e[1], mh, msize);
1965
1966   /* insert, keep list sorted by deadline */
1967   prev = NULL;
1968   pos = n->messages;
1969   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1970     {
1971       prev = pos;
1972       pos = pos->next;
1973     }
1974   if (prev == NULL)
1975     n->messages = e;
1976   else
1977     prev->next = e;
1978   e->next = pos;
1979
1980   /* consider scheduling now */
1981   process_plaintext_neighbour_queue (n);
1982   if (client != NULL)
1983     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1984 }
1985
1986
1987 /**
1988  * List of handlers for the messages understood by this
1989  * service.
1990  */
1991 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1992   {&handle_client_init, NULL,
1993    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1994   {&handle_client_request_configure, NULL,
1995    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
1996    sizeof (struct RequestConfigureMessage)},
1997   {&handle_client_send, NULL,
1998    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1999   {NULL, NULL, 0, 0}
2000 };
2001
2002
2003 /**
2004  * PEERINFO is giving us a HELLO for a peer.  Add the
2005  * public key to the neighbour's struct and retry
2006  * send_key.  Or, if we did not get a HELLO, just do
2007  * nothing.
2008  *
2009  * @param cls NULL
2010  * @param peer the peer for which this is the HELLO
2011  * @param hello HELLO message of that peer
2012  * @param trust amount of trust we currently have in that peer
2013  */
2014 static void
2015 process_hello_retry_send_key (void *cls,
2016                               const struct GNUNET_PeerIdentity *peer,
2017                               const struct GNUNET_HELLO_Message *hello,
2018                               uint32_t trust)
2019 {
2020   struct Neighbour *n = cls;
2021
2022   if (peer == NULL)
2023     {
2024       n->pitr = NULL;
2025       return;
2026     }
2027   if (n->public_key != NULL)
2028     return;
2029 #if DEBUG_CORE
2030   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2031               "Received new `%s' message for `%4s', initiating key exchange.\n",
2032               "HELLO",
2033               GNUNET_i2s (peer));
2034 #endif
2035   n->public_key =
2036     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2037   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2038     {
2039       GNUNET_free (n->public_key);
2040       n->public_key = NULL;
2041       return;
2042     }
2043   send_key (n);
2044 }
2045
2046
2047 /**
2048  * Task that will retry "send_key" if our previous attempt failed
2049  * to yield a PONG.
2050  */
2051 static void
2052 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2053 {
2054   struct Neighbour *n = cls;
2055
2056   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2057   n->set_key_retry_frequency =
2058     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2059   send_key (n);
2060 }
2061
2062
2063 /**
2064  * Send our key (and encrypted PING) to the other peer.
2065  *
2066  * @param n the other peer
2067  */
2068 static void
2069 send_key (struct Neighbour *n)
2070 {
2071   struct SetKeyMessage *sm;
2072   struct MessageEntry *me;
2073   struct PingMessage pp;
2074   struct PingMessage *pm;
2075
2076 #if DEBUG_CORE
2077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2078               "Asked to perform key exchange with `%4s'.\n",
2079               GNUNET_i2s (&n->peer));
2080 #endif
2081   if (n->public_key == NULL)
2082     {
2083       /* lookup n's public key, then try again */
2084 #if DEBUG_CORE
2085       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2086                   "Lacking public key for `%4s', trying to obtain one.\n",
2087                   GNUNET_i2s (&n->peer));
2088 #endif
2089       GNUNET_assert (n->pitr == NULL);
2090       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2091                                          sched,
2092                                          &n->peer,
2093                                          0,
2094                                          GNUNET_TIME_UNIT_MINUTES,
2095                                          &process_hello_retry_send_key, n);
2096       return;
2097     }
2098   /* first, set key message */
2099   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2100                       sizeof (struct SetKeyMessage));
2101   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2102   me->priority = SET_KEY_PRIORITY;
2103   me->size = sizeof (struct SetKeyMessage);
2104   if (n->encrypted_head == NULL)
2105     n->encrypted_head = me;
2106   else
2107     n->encrypted_tail->next = me;
2108   n->encrypted_tail = me;
2109   sm = (struct SetKeyMessage *) &me[1];
2110   sm->header.size = htons (sizeof (struct SetKeyMessage));
2111   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2112   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2113                                         PEER_STATE_KEY_SENT : n->status));
2114   sm->purpose.size =
2115     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2116            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2117            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2118            sizeof (struct GNUNET_PeerIdentity));
2119   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2120   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2121   sm->target = n->peer;
2122   GNUNET_assert (GNUNET_OK ==
2123                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2124                                             sizeof (struct
2125                                                     GNUNET_CRYPTO_AesSessionKey),
2126                                             n->public_key,
2127                                             &sm->encrypted_key));
2128   GNUNET_assert (GNUNET_OK ==
2129                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2130                                          &sm->signature));
2131
2132   /* second, encrypted PING message */
2133   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2134                       sizeof (struct PingMessage));
2135   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2136   me->priority = PING_PRIORITY;
2137   me->size = sizeof (struct PingMessage);
2138   n->encrypted_tail->next = me;
2139   n->encrypted_tail = me;
2140   pm = (struct PingMessage *) &me[1];
2141   pm->header.size = htons (sizeof (struct PingMessage));
2142   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2143   pp.challenge = htonl (n->ping_challenge);
2144   pp.target = n->peer;
2145 #if DEBUG_CORE
2146   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2147               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2148               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2149   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2150               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2151               "PING",
2152               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2153 #endif
2154   do_encrypt (n,
2155               &n->peer.hashPubKey,
2156               &pp.challenge,
2157               &pm->challenge,
2158               sizeof (struct PingMessage) -
2159               sizeof (struct GNUNET_MessageHeader));
2160   /* update status */
2161   switch (n->status)
2162     {
2163     case PEER_STATE_DOWN:
2164       n->status = PEER_STATE_KEY_SENT;
2165       break;
2166     case PEER_STATE_KEY_SENT:
2167       break;
2168     case PEER_STATE_KEY_RECEIVED:
2169       break;
2170     case PEER_STATE_KEY_CONFIRMED:
2171       break;
2172     default:
2173       GNUNET_break (0);
2174       break;
2175     }
2176 #if DEBUG_CORE
2177   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2178               "Have %llu ms left for `%s' transmission.\n",
2179               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2180               "SET_KEY");
2181 #endif
2182   /* trigger queue processing */
2183   process_encrypted_neighbour_queue (n);
2184   if (n->status != PEER_STATE_KEY_CONFIRMED)
2185     n->retry_set_key_task
2186       = GNUNET_SCHEDULER_add_delayed (sched,
2187                                       n->set_key_retry_frequency,
2188                                       &set_key_retry_task, n);
2189 }
2190
2191
2192 /**
2193  * We received a SET_KEY message.  Validate and update
2194  * our key material and status.
2195  *
2196  * @param n the neighbour from which we received message m
2197  * @param m the set key message we received
2198  */
2199 static void
2200 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2201
2202
2203 /**
2204  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2205  * the neighbour's struct and retry handling the set_key message.  Or,
2206  * if we did not get a HELLO, just free the set key message.
2207  *
2208  * @param cls pointer to the set key message
2209  * @param peer the peer for which this is the HELLO
2210  * @param hello HELLO message of that peer
2211  * @param trust amount of trust we currently have in that peer
2212  */
2213 static void
2214 process_hello_retry_handle_set_key (void *cls,
2215                                     const struct GNUNET_PeerIdentity *peer,
2216                                     const struct GNUNET_HELLO_Message *hello,
2217                                     uint32_t trust)
2218 {
2219   struct Neighbour *n = cls;
2220   struct SetKeyMessage *sm = n->skm;
2221
2222   if (peer == NULL)
2223     {
2224       GNUNET_free (sm);
2225       n->skm = NULL;
2226       n->pitr = NULL;
2227       return;
2228     }
2229   if (n->public_key != NULL)
2230     return;                     /* multiple HELLOs match!? */
2231   n->public_key =
2232     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2233   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2234     {
2235       GNUNET_break_op (0);
2236       GNUNET_free (n->public_key);
2237       n->public_key = NULL;
2238       return;
2239     }
2240 #if DEBUG_CORE
2241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2242               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2243               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2244 #endif
2245   handle_set_key (n, sm);
2246 }
2247
2248
2249 /**
2250  * We received a PING message.  Validate and transmit
2251  * PONG.
2252  *
2253  * @param n sender of the PING
2254  * @param m the encrypted PING message itself
2255  */
2256 static void
2257 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2258 {
2259   struct PingMessage t;
2260   struct PingMessage *tp;
2261   struct MessageEntry *me;
2262
2263 #if DEBUG_CORE
2264   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2265               "Core service receives `%s' request from `%4s'.\n",
2266               "PING", GNUNET_i2s (&n->peer));
2267 #endif
2268   if (GNUNET_OK !=
2269       do_decrypt (n,
2270                   &my_identity.hashPubKey,
2271                   &m->challenge,
2272                   &t.challenge,
2273                   sizeof (struct PingMessage) -
2274                   sizeof (struct GNUNET_MessageHeader)))
2275     return;
2276 #if DEBUG_CORE
2277   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2278               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2279               "PING",
2280               GNUNET_i2s (&t.target),
2281               ntohl (t.challenge), n->decrypt_key.crc32);
2282   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2283               "Target of `%s' request is `%4s'.\n",
2284               "PING", GNUNET_i2s (&t.target));
2285 #endif
2286   if (0 != memcmp (&t.target,
2287                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2288     {
2289       GNUNET_break_op (0);
2290       return;
2291     }
2292   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2293                       sizeof (struct PingMessage));
2294   if (n->encrypted_tail != NULL)
2295     n->encrypted_tail->next = me;
2296   else
2297     {
2298       n->encrypted_tail = me;
2299       n->encrypted_head = me;
2300     }
2301   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2302   me->priority = PONG_PRIORITY;
2303   me->size = sizeof (struct PingMessage);
2304   tp = (struct PingMessage *) &me[1];
2305   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2306   tp->header.size = htons (sizeof (struct PingMessage));
2307   do_encrypt (n,
2308               &my_identity.hashPubKey,
2309               &t.challenge,
2310               &tp->challenge,
2311               sizeof (struct PingMessage) -
2312               sizeof (struct GNUNET_MessageHeader));
2313 #if DEBUG_CORE
2314   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2315               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2316               ntohl (t.challenge), n->encrypt_key.crc32);
2317 #endif
2318   /* trigger queue processing */
2319   process_encrypted_neighbour_queue (n);
2320 }
2321
2322
2323 /**
2324  * We received a SET_KEY message.  Validate and update
2325  * our key material and status.
2326  *
2327  * @param n the neighbour from which we received message m
2328  * @param m the set key message we received
2329  */
2330 static void
2331 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2332 {
2333   struct SetKeyMessage *m_cpy;
2334   struct GNUNET_TIME_Absolute t;
2335   struct GNUNET_CRYPTO_AesSessionKey k;
2336   struct PingMessage *ping;
2337   enum PeerStateMachine sender_status;
2338
2339 #if DEBUG_CORE
2340   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2341               "Core service receives `%s' request from `%4s'.\n",
2342               "SET_KEY", GNUNET_i2s (&n->peer));
2343 #endif
2344   if (n->public_key == NULL)
2345     {
2346 #if DEBUG_CORE
2347       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2348                   "Lacking public key for peer, trying to obtain one.\n");
2349 #endif
2350       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2351       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2352       /* lookup n's public key, then try again */
2353       GNUNET_assert (n->pitr == NULL);
2354       GNUNET_assert (n->skm == NULL);
2355       n->skm = m_cpy;
2356       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2357                                          sched,
2358                                          &n->peer,
2359                                          0,
2360                                          GNUNET_TIME_UNIT_MINUTES,
2361                                          &process_hello_retry_handle_set_key, n);
2362       return;
2363     }
2364   if (0 != memcmp (&m->target,
2365                    &my_identity,
2366                    sizeof (struct GNUNET_PeerIdentity)))
2367     {
2368       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2369                   _("Received `%s' message that was not for me.  Ignoring.\n"));
2370       return;
2371     }
2372   if ((ntohl (m->purpose.size) !=
2373        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2374        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2375        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2376        sizeof (struct GNUNET_PeerIdentity)) ||
2377       (GNUNET_OK !=
2378        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2379                                  &m->purpose, &m->signature, n->public_key)))
2380     {
2381       /* invalid signature */
2382       GNUNET_break_op (0);
2383       return;
2384     }
2385   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2386   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2387        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2388       (t.value < n->decrypt_key_created.value))
2389     {
2390       /* this could rarely happen due to massive re-ordering of
2391          messages on the network level, but is most likely either
2392          a bug or some adversary messing with us.  Report. */
2393       GNUNET_break_op (0);
2394       return;
2395     }
2396 #if DEBUG_CORE
2397   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2398 #endif  
2399   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2400                                   &m->encrypted_key,
2401                                   &k,
2402                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2403        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2404       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2405     {
2406       /* failed to decrypt !? */
2407       GNUNET_break_op (0);
2408       return;
2409     }
2410
2411   n->decrypt_key = k;
2412   if (n->decrypt_key_created.value != t.value)
2413     {
2414       /* fresh key, reset sequence numbers */
2415       n->last_sequence_number_received = 0;
2416       n->last_packets_bitmap = 0;
2417       n->decrypt_key_created = t;
2418     }
2419   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2420   switch (n->status)
2421     {
2422     case PEER_STATE_DOWN:
2423       n->status = PEER_STATE_KEY_RECEIVED;
2424 #if DEBUG_CORE
2425       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2426                   "Responding to `%s' with my own key.\n", "SET_KEY");
2427 #endif
2428       send_key (n);
2429       break;
2430     case PEER_STATE_KEY_SENT:
2431     case PEER_STATE_KEY_RECEIVED:
2432       n->status = PEER_STATE_KEY_RECEIVED;
2433       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2434           (sender_status != PEER_STATE_KEY_CONFIRMED))
2435         {
2436 #if DEBUG_CORE
2437           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2438                       "Responding to `%s' with my own key (other peer has status %u).\n",
2439                       "SET_KEY", sender_status);
2440 #endif
2441           send_key (n);
2442         }
2443       break;
2444     case PEER_STATE_KEY_CONFIRMED:
2445       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2446           (sender_status != PEER_STATE_KEY_CONFIRMED))
2447         {         
2448 #if DEBUG_CORE
2449           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2450                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2451                       "SET_KEY", sender_status);
2452 #endif
2453           send_key (n);
2454         }
2455       break;
2456     default:
2457       GNUNET_break (0);
2458       break;
2459     }
2460   if (n->pending_ping != NULL)
2461     {
2462       ping = n->pending_ping;
2463       n->pending_ping = NULL;
2464       handle_ping (n, ping);
2465       GNUNET_free (ping);
2466     }
2467 }
2468
2469
2470 /**
2471  * We received a PONG message.  Validate and update
2472  * our status.
2473  *
2474  * @param n sender of the PONG
2475  * @param m the encrypted PONG message itself
2476  */
2477 static void
2478 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2479 {
2480   struct PingMessage t;
2481
2482 #if DEBUG_CORE
2483   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2484               "Core service receives `%s' request from `%4s'.\n",
2485               "PONG", GNUNET_i2s (&n->peer));
2486 #endif
2487   if (GNUNET_OK !=
2488       do_decrypt (n,
2489                   &n->peer.hashPubKey,
2490                   &m->challenge,
2491                   &t.challenge,
2492                   sizeof (struct PingMessage) -
2493                   sizeof (struct GNUNET_MessageHeader)))
2494     return;
2495 #if DEBUG_CORE
2496   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2497               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2498               "PONG",
2499               GNUNET_i2s (&t.target),
2500               ntohl (t.challenge), n->decrypt_key.crc32);
2501 #endif
2502   if ((0 != memcmp (&t.target,
2503                     &n->peer,
2504                     sizeof (struct GNUNET_PeerIdentity))) ||
2505       (n->ping_challenge != ntohl (t.challenge)))
2506     {
2507       /* PONG malformed */
2508 #if DEBUG_CORE
2509       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2510                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2511                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2512       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2513                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2514                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2515 #endif
2516       GNUNET_break_op (0);
2517       return;
2518     }
2519   switch (n->status)
2520     {
2521     case PEER_STATE_DOWN:
2522       GNUNET_break (0);         /* should be impossible */
2523       return;
2524     case PEER_STATE_KEY_SENT:
2525       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2526       return;
2527     case PEER_STATE_KEY_RECEIVED:
2528       n->status = PEER_STATE_KEY_CONFIRMED;
2529       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2530         {
2531           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2532           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2533         }
2534       process_encrypted_neighbour_queue (n);
2535       break;
2536     case PEER_STATE_KEY_CONFIRMED:
2537       /* duplicate PONG? */
2538       break;
2539     default:
2540       GNUNET_break (0);
2541       break;
2542     }
2543 }
2544
2545
2546 /**
2547  * Send a P2P message to a client.
2548  *
2549  * @param sender who sent us the message?
2550  * @param client who should we give the message to?
2551  * @param m contains the message to transmit
2552  * @param msize number of bytes in buf to transmit
2553  */
2554 static void
2555 send_p2p_message_to_client (struct Neighbour *sender,
2556                             struct Client *client,
2557                             const void *m, size_t msize)
2558 {
2559   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2560   struct NotifyTrafficMessage *ntm;
2561
2562 #if DEBUG_CORE
2563   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2564               "Core service passes message from `%4s' of type %u to client.\n",
2565               GNUNET_i2s(&sender->peer),
2566               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2567 #endif
2568   ntm = (struct NotifyTrafficMessage *) buf;
2569   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2570   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2571   ntm->reserved = htonl (0);
2572   ntm->peer = sender->peer;
2573   memcpy (&ntm[1], m, msize);
2574   send_to_client (client, &ntm->header, GNUNET_YES);
2575 }
2576
2577
2578 /**
2579  * Deliver P2P message to interested clients.
2580  *
2581  * @param sender who sent us the message?
2582  * @param m the message
2583  * @param msize size of the message (including header)
2584  */
2585 static void
2586 deliver_message (struct Neighbour *sender,
2587                  const struct GNUNET_MessageHeader *m, size_t msize)
2588 {
2589   struct Client *cpos;
2590   uint16_t type;
2591   unsigned int tpos;
2592   int deliver_full;
2593
2594   type = ntohs (m->type);
2595   cpos = clients;
2596   while (cpos != NULL)
2597     {
2598       deliver_full = GNUNET_NO;
2599       if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
2600         deliver_full = GNUNET_YES;
2601       else
2602         {
2603           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2604             {
2605               if (type != cpos->types[tpos])
2606                 continue;
2607               deliver_full = GNUNET_YES;
2608               break;
2609             }
2610         }
2611       if (GNUNET_YES == deliver_full)
2612         send_p2p_message_to_client (sender, cpos, m, msize);
2613       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2614         send_p2p_message_to_client (sender, cpos, m,
2615                                     sizeof (struct GNUNET_MessageHeader));
2616       cpos = cpos->next;
2617     }
2618 }
2619
2620
2621 /**
2622  * Align P2P message and then deliver to interested clients.
2623  *
2624  * @param sender who sent us the message?
2625  * @param buffer unaligned (!) buffer containing message
2626  * @param msize size of the message (including header)
2627  */
2628 static void
2629 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2630 {
2631   char abuf[msize];
2632
2633   /* TODO: call to statistics? */
2634   memcpy (abuf, buffer, msize);
2635   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2636 }
2637
2638
2639 /**
2640  * Deliver P2P messages to interested clients.
2641  *
2642  * @param sender who sent us the message?
2643  * @param buffer buffer containing messages, can be modified
2644  * @param buffer_size size of the buffer (overall)
2645  * @param offset offset where messages in the buffer start
2646  */
2647 static void
2648 deliver_messages (struct Neighbour *sender,
2649                   const char *buffer, size_t buffer_size, size_t offset)
2650 {
2651   struct GNUNET_MessageHeader *mhp;
2652   struct GNUNET_MessageHeader mh;
2653   uint16_t msize;
2654   int need_align;
2655
2656   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2657     {
2658       if (0 != offset % sizeof (uint16_t))
2659         {
2660           /* outch, need to copy to access header */
2661           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2662           mhp = &mh;
2663         }
2664       else
2665         {
2666           /* can access header directly */
2667           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2668         }
2669       msize = ntohs (mhp->size);
2670       if (msize + offset > buffer_size)
2671         {
2672           /* malformed message, header says it is larger than what
2673              would fit into the overall buffer */
2674           GNUNET_break_op (0);
2675           break;
2676         }
2677 #if HAVE_UNALIGNED_64_ACCESS
2678       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2679 #else
2680       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2681 #endif
2682       if (GNUNET_YES == need_align)
2683         align_and_deliver (sender, &buffer[offset], msize);
2684       else
2685         deliver_message (sender,
2686                          (const struct GNUNET_MessageHeader *)
2687                          &buffer[offset], msize);
2688       offset += msize;
2689     }
2690 }
2691
2692
2693 /**
2694  * We received an encrypted message.  Decrypt, validate and
2695  * pass on to the appropriate clients.
2696  */
2697 static void
2698 handle_encrypted_message (struct Neighbour *n,
2699                           const struct EncryptedMessage *m)
2700 {
2701   size_t size = ntohs (m->header.size);
2702   char buf[size];
2703   struct EncryptedMessage *pt;  /* plaintext */
2704   GNUNET_HashCode ph;
2705   size_t off;
2706   uint32_t snum;
2707   struct GNUNET_TIME_Absolute t;
2708
2709 #if DEBUG_CORE
2710   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2711               "Core service receives `%s' request from `%4s'.\n",
2712               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2713 #endif  
2714   /* decrypt */
2715   if (GNUNET_OK !=
2716       do_decrypt (n,
2717                   &m->plaintext_hash,
2718                   &m->sequence_number,
2719                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2720     return;
2721   pt = (struct EncryptedMessage *) buf;
2722
2723   /* validate hash */
2724   GNUNET_CRYPTO_hash (&pt->sequence_number,
2725                       size - ENCRYPTED_HEADER_SIZE, &ph);
2726   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2727     {
2728       /* checksum failed */
2729       GNUNET_break_op (0);
2730       return;
2731     }
2732
2733   /* validate sequence number */
2734   snum = ntohl (pt->sequence_number);
2735   if (n->last_sequence_number_received == snum)
2736     {
2737       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2738                   "Received duplicate message, ignoring.\n");
2739       /* duplicate, ignore */
2740       return;
2741     }
2742   if ((n->last_sequence_number_received > snum) &&
2743       (n->last_sequence_number_received - snum > 32))
2744     {
2745       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2746                   "Received ancient out of sequence message, ignoring.\n");
2747       /* ancient out of sequence, ignore */
2748       return;
2749     }
2750   if (n->last_sequence_number_received > snum)
2751     {
2752       unsigned int rotbit =
2753         1 << (n->last_sequence_number_received - snum - 1);
2754       if ((n->last_packets_bitmap & rotbit) != 0)
2755         {
2756           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2757                       "Received duplicate message, ignoring.\n");
2758           /* duplicate, ignore */
2759           return;
2760         }
2761       n->last_packets_bitmap |= rotbit;
2762     }
2763   if (n->last_sequence_number_received < snum)
2764     {
2765       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2766       n->last_sequence_number_received = snum;
2767     }
2768
2769   /* check timestamp */
2770   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2771   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2772     {
2773       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2774                   _
2775                   ("Message received far too old (%llu ms). Content ignored.\n"),
2776                   GNUNET_TIME_absolute_get_duration (t).value);
2777       return;
2778     }
2779
2780   /* process decrypted message(s) */
2781   update_window (GNUNET_YES,
2782                  &n->available_send_window,
2783                  &n->last_asw_update,
2784                  n->bpm_out);
2785   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2786   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2787                            n->bpm_out_internal_limit);
2788   n->last_activity = GNUNET_TIME_absolute_get ();
2789   off = sizeof (struct EncryptedMessage);
2790   deliver_messages (n, buf, size, off);
2791 }
2792
2793
2794 /**
2795  * Function called by the transport for each received message.
2796  *
2797  * @param cls closure
2798  * @param latency estimated latency for communicating with the
2799  *             given peer
2800  * @param peer (claimed) identity of the other peer
2801  * @param message the message
2802  */
2803 static void
2804 handle_transport_receive (void *cls,
2805                           struct GNUNET_TIME_Relative latency,
2806                           const struct GNUNET_PeerIdentity *peer,
2807                           const struct GNUNET_MessageHeader *message)
2808 {
2809   struct Neighbour *n;
2810   struct GNUNET_TIME_Absolute now;
2811   int up;
2812   uint16_t type;
2813   uint16_t size;
2814
2815 #if DEBUG_CORE
2816   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2817               "Received message of type %u from `%4s', demultiplexing.\n",
2818               ntohs (message->type), GNUNET_i2s (peer));
2819 #endif
2820   n = find_neighbour (peer);
2821   if (n == NULL)
2822     {
2823       GNUNET_break (0);
2824       return;
2825     }
2826   n->last_latency = latency;
2827   up = (n->status == PEER_STATE_KEY_CONFIRMED);
2828   type = ntohs (message->type);
2829   size = ntohs (message->size);
2830   switch (type)
2831     {
2832     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2833       if (size != sizeof (struct SetKeyMessage))
2834         {
2835           GNUNET_break_op (0);
2836           return;
2837         }
2838       handle_set_key (n, (const struct SetKeyMessage *) message);
2839       break;
2840     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2841       if (size < sizeof (struct EncryptedMessage) +
2842           sizeof (struct GNUNET_MessageHeader))
2843         {
2844           GNUNET_break_op (0);
2845           return;
2846         }
2847       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2848           (n->status != PEER_STATE_KEY_CONFIRMED))
2849         {
2850           GNUNET_break_op (0);
2851           return;
2852         }
2853       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2854       break;
2855     case GNUNET_MESSAGE_TYPE_CORE_PING:
2856       if (size != sizeof (struct PingMessage))
2857         {
2858           GNUNET_break_op (0);
2859           return;
2860         }
2861       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2862           (n->status != PEER_STATE_KEY_CONFIRMED))
2863         {
2864 #if DEBUG_CORE
2865           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2866                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2867                       "PING", GNUNET_i2s (&n->peer));
2868 #endif
2869           GNUNET_free_non_null (n->pending_ping);
2870           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2871           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2872           return;
2873         }
2874       handle_ping (n, (const struct PingMessage *) message);
2875       break;
2876     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2877       if (size != sizeof (struct PingMessage))
2878         {
2879           GNUNET_break_op (0);
2880           return;
2881         }
2882       if ((n->status != PEER_STATE_KEY_SENT) &&
2883           (n->status != PEER_STATE_KEY_RECEIVED) &&
2884           (n->status != PEER_STATE_KEY_CONFIRMED))
2885         {
2886           /* could not decrypt pong, oops! */
2887           GNUNET_break_op (0);
2888           return;
2889         }
2890       handle_pong (n, (const struct PingMessage *) message);
2891       break;
2892     default:
2893       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2894                   _("Unsupported message of type %u received.\n"), type);
2895       return;
2896     }
2897   if (n->status == PEER_STATE_KEY_CONFIRMED)
2898     {
2899       now = GNUNET_TIME_absolute_get ();
2900       n->last_activity = now;
2901       if (!up)
2902         n->time_established = now;
2903     }
2904 }
2905
2906
2907 /**
2908  * Function that recalculates the bandwidth quota for the
2909  * given neighbour and transmits it to the transport service.
2910  * 
2911  * @param cls neighbour for the quota update
2912  * @param tc context
2913  */
2914 static void
2915 neighbour_quota_update (void *cls,
2916                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2917
2918
2919 /**
2920  * Schedule the task that will recalculate the bandwidth
2921  * quota for this peer (and possibly force a disconnect of
2922  * idle peers by calculating a bandwidth of zero).
2923  */
2924 static void
2925 schedule_quota_update (struct Neighbour *n)
2926 {
2927   GNUNET_assert (n->quota_update_task ==
2928                  GNUNET_SCHEDULER_NO_TASK);
2929   n->quota_update_task
2930     = GNUNET_SCHEDULER_add_delayed (sched,
2931                                     QUOTA_UPDATE_FREQUENCY,
2932                                     &neighbour_quota_update,
2933                                     n);
2934 }
2935
2936
2937 /**
2938  * Function that recalculates the bandwidth quota for the
2939  * given neighbour and transmits it to the transport service.
2940  * 
2941  * @param cls neighbour for the quota update
2942  * @param tc context
2943  */
2944 static void
2945 neighbour_quota_update (void *cls,
2946                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2947 {
2948   struct Neighbour *n = cls;
2949   uint32_t q_in;
2950   double pref_rel;
2951   double share;
2952   unsigned long long distributable;
2953   
2954   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
2955   /* calculate relative preference among all neighbours;
2956      divides by a bit more to avoid division by zero AND to
2957      account for possibility of new neighbours joining any time 
2958      AND to convert to double... */
2959   pref_rel = n->current_preference / (1.0 + preference_sum);
2960   distributable = 0;
2961   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2962     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2963   share = distributable * pref_rel;
2964   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2965   /* check if we want to disconnect for good due to inactivity */
2966   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2967        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2968     q_in = 0; /* force disconnect */
2969   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2970        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2971     {
2972       n->bpm_in = q_in;
2973       GNUNET_TRANSPORT_set_quota (transport,
2974                                   &n->peer,
2975                                   n->bpm_in, 
2976                                   n->bpm_out,
2977                                   GNUNET_TIME_UNIT_FOREVER_REL,
2978                                   NULL, NULL);
2979     }
2980   schedule_quota_update (n);
2981 }
2982
2983
2984 /**
2985  * Function called by transport to notify us that
2986  * a peer connected to us (on the network level).
2987  *
2988  * @param cls closure
2989  * @param peer the peer that connected
2990  * @param latency current latency of the connection
2991  */
2992 static void
2993 handle_transport_notify_connect (void *cls,
2994                                  const struct GNUNET_PeerIdentity *peer,
2995                                  struct GNUNET_TIME_Relative latency)
2996 {
2997   struct Neighbour *n;
2998   struct GNUNET_TIME_Absolute now;
2999   struct ConnectNotifyMessage cnm;
3000
3001   n = find_neighbour (peer);
3002   if (n != NULL)
3003     {
3004       /* duplicate connect notification!? */
3005       GNUNET_break (0);
3006       return;
3007     }
3008   now = GNUNET_TIME_absolute_get ();
3009   n = GNUNET_malloc (sizeof (struct Neighbour));
3010   n->next = neighbours;
3011   neighbours = n;
3012   neighbour_count++;
3013   n->peer = *peer;
3014   n->last_latency = latency;
3015   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
3016   n->encrypt_key_created = now;
3017   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
3018   n->last_asw_update = now;
3019   n->last_arw_update = now;
3020   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3021   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3022   n->bpm_out_internal_limit = (uint32_t) - 1;
3023   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3024   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3025                                                 (uint32_t) - 1);
3026 #if DEBUG_CORE
3027   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3028               "Received connection from `%4s'.\n",
3029               GNUNET_i2s (&n->peer));
3030 #endif
3031   schedule_quota_update (n);
3032   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3033   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
3034   cnm.reserved = htonl (0);
3035   cnm.peer = *peer;
3036   send_to_all_clients (&cnm.header, GNUNET_YES);
3037 }
3038
3039
3040 /**
3041  * Free the given entry for the neighbour (it has
3042  * already been removed from the list at this point).
3043  *
3044  * @param n neighbour to free
3045  */
3046 static void
3047 free_neighbour (struct Neighbour *n)
3048 {
3049   struct MessageEntry *m;
3050
3051   if (n->pitr != NULL)
3052     {
3053       GNUNET_PEERINFO_iterate_cancel (n->pitr);
3054       n->pitr = NULL;
3055     }
3056   if (n->skm != NULL)
3057     {
3058       GNUNET_free (n->skm);
3059       n->skm = NULL;
3060     }
3061   while (NULL != (m = n->messages))
3062     {
3063       n->messages = m->next;
3064       GNUNET_free (m);
3065     }
3066   while (NULL != (m = n->encrypted_head))
3067     {
3068       n->encrypted_head = m->next;
3069       GNUNET_free (m);
3070     }
3071   if (NULL != n->th)
3072     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3073   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
3074     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3075   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
3076     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3077   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
3078     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3079   GNUNET_free_non_null (n->public_key);
3080   GNUNET_free_non_null (n->pending_ping);
3081   GNUNET_free (n);
3082 }
3083
3084
3085 /**
3086  * Function called by transport telling us that a peer
3087  * disconnected.
3088  *
3089  * @param cls closure
3090  * @param peer the peer that disconnected
3091  */
3092 static void
3093 handle_transport_notify_disconnect (void *cls,
3094                                     const struct GNUNET_PeerIdentity *peer)
3095 {
3096   struct ConnectNotifyMessage cnm;
3097   struct Neighbour *n;
3098   struct Neighbour *p;
3099
3100 #if DEBUG_CORE
3101   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3102               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3103 #endif
3104   p = NULL;
3105   n = neighbours;
3106   while ((n != NULL) &&
3107          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3108     {
3109       p = n;
3110       n = n->next;
3111     }
3112   if (n == NULL)
3113     {
3114       GNUNET_break (0);
3115       return;
3116     }
3117   if (p == NULL)
3118     neighbours = n->next;
3119   else
3120     p->next = n->next;
3121   GNUNET_assert (neighbour_count > 0);
3122   neighbour_count--;
3123   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3124   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3125   cnm.reserved = htonl (0);
3126   cnm.peer = *peer;
3127   send_to_all_clients (&cnm.header, GNUNET_YES);
3128   free_neighbour (n);
3129 }
3130
3131
3132 /**
3133  * Last task run during shutdown.  Disconnects us from
3134  * the transport.
3135  */
3136 static void
3137 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3138 {
3139   struct Neighbour *n;
3140   struct Client *c;
3141
3142 #if DEBUG_CORE
3143   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3144               "Core service shutting down.\n");
3145 #endif
3146   GNUNET_assert (transport != NULL);
3147   GNUNET_TRANSPORT_disconnect (transport);
3148   transport = NULL;
3149   while (NULL != (n = neighbours))
3150     {
3151       neighbours = n->next;
3152       GNUNET_assert (neighbour_count > 0);
3153       neighbour_count--;
3154       free_neighbour (n);
3155     }
3156   while (NULL != (c = clients))
3157     handle_client_disconnect (NULL, c->client_handle);
3158   if (my_private_key != NULL)
3159     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3160 }
3161
3162
3163 /**
3164  * Initiate core service.
3165  *
3166  * @param cls closure
3167  * @param s scheduler to use
3168  * @param serv the initialized server
3169  * @param c configuration to use
3170  */
3171 static void
3172 run (void *cls,
3173      struct GNUNET_SCHEDULER_Handle *s,
3174      struct GNUNET_SERVER_Handle *serv,
3175      const struct GNUNET_CONFIGURATION_Handle *c)
3176 {
3177 #if 0
3178   unsigned long long qin;
3179   unsigned long long qout;
3180   unsigned long long tneigh;
3181 #endif
3182   char *keyfile;
3183
3184   sched = s;
3185   cfg = c;
3186   /* parse configuration */
3187   if (
3188        (GNUNET_OK !=
3189         GNUNET_CONFIGURATION_get_value_number (c,
3190                                                "CORE",
3191                                                "TOTAL_QUOTA_IN",
3192                                                &bandwidth_target_in)) ||
3193        (GNUNET_OK !=
3194         GNUNET_CONFIGURATION_get_value_number (c,
3195                                                "CORE",
3196                                                "TOTAL_QUOTA_OUT",
3197                                                &bandwidth_target_out)) ||
3198 #if 0
3199        (GNUNET_OK !=
3200         GNUNET_CONFIGURATION_get_value_number (c,
3201                                                "CORE",
3202                                                "YY",
3203                                                &qout)) ||
3204        (GNUNET_OK !=
3205         GNUNET_CONFIGURATION_get_value_number (c,
3206                                                "CORE",
3207                                                "ZZ_LIMIT", &tneigh)) ||
3208 #endif
3209        (GNUNET_OK !=
3210         GNUNET_CONFIGURATION_get_value_filename (c,
3211                                                  "GNUNETD",
3212                                                  "HOSTKEY", &keyfile)))
3213     {
3214       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3215                   _
3216                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3217       GNUNET_SCHEDULER_shutdown (s);
3218       return;
3219     }
3220   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3221   GNUNET_free (keyfile);
3222   if (my_private_key == NULL)
3223     {
3224       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3225                   _("Core service could not access hostkey.  Exiting.\n"));
3226       GNUNET_SCHEDULER_shutdown (s);
3227       return;
3228     }
3229   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3230   GNUNET_CRYPTO_hash (&my_public_key,
3231                       sizeof (my_public_key), &my_identity.hashPubKey);
3232   /* setup notification */
3233   server = serv;
3234   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3235   /* setup transport connection */
3236   transport = GNUNET_TRANSPORT_connect (sched,
3237                                         cfg,
3238                                         NULL,
3239                                         &handle_transport_receive,
3240                                         &handle_transport_notify_connect,
3241                                         &handle_transport_notify_disconnect);
3242   GNUNET_assert (NULL != transport);
3243   GNUNET_SCHEDULER_add_delayed (sched,
3244                                 GNUNET_TIME_UNIT_FOREVER_REL,
3245                                 &cleaning_task, NULL);
3246   /* process client requests */
3247   GNUNET_SERVER_add_handlers (server, handlers);
3248   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3249               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3250 }
3251
3252
3253
3254 /**
3255  * The main function for the transport service.
3256  *
3257  * @param argc number of arguments from the command line
3258  * @param argv command line arguments
3259  * @return 0 ok, 1 on error
3260  */
3261 int
3262 main (int argc, char *const *argv)
3263 {
3264   return (GNUNET_OK ==
3265           GNUNET_SERVICE_run (argc,
3266                               argv,
3267                               "core",
3268                               GNUNET_SERVICE_OPTION_NONE,
3269                               &run, NULL)) ? 0 : 1;
3270 }
3271
3272 /* end of gnunet-service-core.c */