timeout code
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * POST-TESTING:
27  * - revisit API (which arguments are used, needed)?
28  * - add code to send PINGs if we are about to time-out otherwise (?)
29  * ? add heuristic to do another send_key in "handle_set_key"
30  *   in case previous attempt failed / didn't work / persist
31  *   (but don't do it always to avoid storm of SET_KEY's going
32  *   back and forth!) --- alternatively, add "status" field
33  *   of the other peer to the set key message, that way we'd
34  *   know for sure!
35  * - check that hostkey used by transport (for HELLOs) is the
36  *   same as the hostkey that we are using!
37  * - topology management:
38  *   + bootstrapping (transport offer hello, plugins)
39  *   + internal neighbour selection
40  *   + update bandwidth usage statistics
41  *   + bandwidth allocation (transport set quota)
42  * - optimize lookup (many O(n) list traversals
43  *   could ideally be changed to O(1) hash map lookups)
44  */
45 #include "platform.h"
46 #include "gnunet_constants.h"
47 #include "gnunet_util_lib.h"
48 #include "gnunet_hello_lib.h"
49 #include "gnunet_peerinfo_service.h"
50 #include "gnunet_protocols.h"
51 #include "gnunet_signatures.h"
52 #include "gnunet_transport_service.h"
53 #include "core.h"
54
55
56 /**
57  * Receive and send buffer windows grow over time.  For
58  * how long can 'unused' bandwidth accumulate before we
59  * need to cap it?  (specified in ms).
60  */
61 #define MAX_WINDOW_TIME (5 * 60 * 1000)
62
63 /**
64  * Minimum of bytes per minute (out) to assign to any connected peer.
65  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
66  * sense.
67  */
68 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
69
70 /**
71  * What is the smallest change (in number of bytes per minute)
72  * that we consider significant enough to bother triggering?
73  */
74 #define MIN_BPM_CHANGE 32
75
76 /**
77  * After how much time past the "official" expiration time do
78  * we discard messages?  Should not be zero since we may 
79  * intentionally defer transmission until close to the deadline
80  * and then may be slightly past the deadline due to inaccuracy
81  * in sleep and our own CPU consumption.
82  */
83 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
84
85 /**
86  * What is the maximum delay for a SET_KEY message?
87  */
88 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
89
90 /**
91  * What how long do we wait for SET_KEY confirmation initially?
92  */
93 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
94
95 /**
96  * What is the maximum delay for a PING message?
97  */
98 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
99
100 /**
101  * What is the maximum delay for a PONG message?
102  */
103 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
104
105 /**
106  * How often do we recalculate bandwidth quotas?
107  */
108 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
109
110 /**
111  * What is the priority for a SET_KEY message?
112  */
113 #define SET_KEY_PRIORITY 0xFFFFFF
114
115 /**
116  * What is the priority for a PING message?
117  */
118 #define PING_PRIORITY 0xFFFFFF
119
120 /**
121  * What is the priority for a PONG message?
122  */
123 #define PONG_PRIORITY 0xFFFFFF
124
125 /**
126  * How many messages do we queue per peer at most?
127  */
128 #define MAX_PEER_QUEUE_SIZE 16
129
130 /**
131  * How many non-mandatory messages do we queue per client at most?
132  */
133 #define MAX_CLIENT_QUEUE_SIZE 32
134
135 /**
136  * What is the maximum age of a message for us to consider
137  * processing it?  Note that this looks at the timestamp used
138  * by the other peer, so clock skew between machines does
139  * come into play here.  So this should be picked high enough
140  * so that a little bit of clock skew does not prevent peers
141  * from connecting to us.
142  */
143 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
144
145 /**
146  * What is the maximum size for encrypted messages?  Note that this
147  * number imposes a clear limit on the maximum size of any message.
148  * Set to a value close to 64k but not so close that transports will
149  * have trouble with their headers.
150  */
151 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
152
153
154 /**
155  * State machine for our P2P encryption handshake.  Everyone starts in
156  * "DOWN", if we receive the other peer's key (other peer initiated)
157  * we start in state RECEIVED (since we will immediately send our
158  * own); otherwise we start in SENT.  If we get back a PONG from
159  * within either state, we move up to CONFIRMED (the PONG will always
160  * be sent back encrypted with the key we sent to the other peer).
161  */
162 enum PeerStateMachine
163 {
164   PEER_STATE_DOWN,
165   PEER_STATE_KEY_SENT,
166   PEER_STATE_KEY_RECEIVED,
167   PEER_STATE_KEY_CONFIRMED
168 };
169
170
171 /**
172  * Number of bytes (at the beginning) of "struct EncryptedMessage"
173  * that are NOT encrypted.
174  */
175 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
176
177
178 /**
179  * Encapsulation for encrypted messages exchanged between
180  * peers.  Followed by the actual encrypted data.
181  */
182 struct EncryptedMessage
183 {
184   /**
185    * Message type is either CORE_ENCRYPTED_MESSAGE.
186    */
187   struct GNUNET_MessageHeader header;
188
189   /**
190    * Always zero.
191    */
192   uint32_t reserved GNUNET_PACKED;
193
194   /**
195    * Hash of the plaintext, used to verify message integrity;
196    * ALSO used as the IV for the symmetric cipher!  Everything
197    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
198    * must be set to the offset of the next field.
199    */
200   GNUNET_HashCode plaintext_hash;
201
202   /**
203    * Sequence number, in network byte order.  This field
204    * must be the first encrypted/decrypted field and the
205    * first byte that is hashed for the plaintext hash.
206    */
207   uint32_t sequence_number GNUNET_PACKED;
208
209   /**
210    * Desired bandwidth (how much we should send to this
211    * peer / how much is the sender willing to receive),
212    * in bytes per minute.
213    */
214   uint32_t inbound_bpm_limit GNUNET_PACKED;
215
216   /**
217    * Timestamp.  Used to prevent reply of ancient messages
218    * (recent messages are caught with the sequence number).
219    */
220   struct GNUNET_TIME_AbsoluteNBO timestamp;
221
222 };
223
224 /**
225  * We're sending an (encrypted) PING to the other peer to check if he
226  * can decrypt.  The other peer should respond with a PONG with the
227  * same content, except this time encrypted with the receiver's key.
228  */
229 struct PingMessage
230 {
231   /**
232    * Message type is either CORE_PING or CORE_PONG.
233    */
234   struct GNUNET_MessageHeader header;
235
236   /**
237    * Random number chosen to make reply harder.
238    */
239   uint32_t challenge GNUNET_PACKED;
240
241   /**
242    * Intended target of the PING, used primarily to check
243    * that decryption actually worked.
244    */
245   struct GNUNET_PeerIdentity target;
246 };
247
248
249 /**
250  * Message transmitted to set (or update) a session key.
251  */
252 struct SetKeyMessage
253 {
254
255   /**
256    * Message type is either CORE_SET_KEY.
257    */
258   struct GNUNET_MessageHeader header;
259
260   /**
261    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
262    */
263   int32_t sender_status GNUNET_PACKED;
264
265   /**
266    * Purpose of the signature, will be
267    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
268    */
269   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
270
271   /**
272    * At what time was this key created?
273    */
274   struct GNUNET_TIME_AbsoluteNBO creation_time;
275
276   /**
277    * The encrypted session key.
278    */
279   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
280
281   /**
282    * Who is the intended recipient?
283    */
284   struct GNUNET_PeerIdentity target;
285
286   /**
287    * Signature of the stuff above (starting at purpose).
288    */
289   struct GNUNET_CRYPTO_RsaSignature signature;
290
291 };
292
293
294 /**
295  * Message waiting for transmission. This struct
296  * is followed by the actual content of the message.
297  */
298 struct MessageEntry
299 {
300
301   /**
302    * We keep messages in a linked list (for now).
303    */
304   struct MessageEntry *next;
305
306   /**
307    * By when are we supposed to transmit this message?
308    */
309   struct GNUNET_TIME_Absolute deadline;
310
311   /**
312    * How important is this message to us?
313    */
314   unsigned int priority;
315
316   /**
317    * How long is the message? (number of bytes following
318    * the "struct MessageEntry", but not including the
319    * size of "struct MessageEntry" itself!)
320    */
321   uint16_t size;
322
323   /**
324    * Was this message selected for transmission in the
325    * current round? GNUNET_YES or GNUNET_NO.
326    */
327   int16_t do_transmit;
328
329 };
330
331
332 struct Neighbour
333 {
334   /**
335    * We keep neighbours in a linked list (for now).
336    */
337   struct Neighbour *next;
338
339   /**
340    * Unencrypted messages destined for this peer.
341    */
342   struct MessageEntry *messages;
343
344   /**
345    * Head of the batched, encrypted message queue (already ordered,
346    * transmit starting with the head).
347    */
348   struct MessageEntry *encrypted_head;
349
350   /**
351    * Tail of the batched, encrypted message queue (already ordered,
352    * append new messages to tail)
353    */
354   struct MessageEntry *encrypted_tail;
355
356   /**
357    * Handle for pending requests for transmission to this peer
358    * with the transport service.  NULL if no request is pending.
359    */
360   struct GNUNET_TRANSPORT_TransmitHandle *th;
361
362   /**
363    * Public key of the neighbour, NULL if we don't have it yet.
364    */
365   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
366
367   /**
368    * We received a PING message before we got the "public_key"
369    * (or the SET_KEY).  We keep it here until we have a key
370    * to decrypt it.  NULL if no PING is pending.
371    */
372   struct PingMessage *pending_ping;
373
374   /**
375    * Identity of the neighbour.
376    */
377   struct GNUNET_PeerIdentity peer;
378
379   /**
380    * Key we use to encrypt our messages for the other peer
381    * (initialized by us when we do the handshake).
382    */
383   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
384
385   /**
386    * Key we use to decrypt messages from the other peer
387    * (given to us by the other peer during the handshake).
388    */
389   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
390
391   /**
392    * ID of task used for re-trying plaintext scheduling.
393    */
394   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
395
396   /**
397    * ID of task used for re-trying SET_KEY and PING message.
398    */
399   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
400
401   /**
402    * ID of task used for updating bandwidth quota for this neighbour.
403    */
404   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
405
406   /**
407    * At what time did we generate our encryption key?
408    */
409   struct GNUNET_TIME_Absolute encrypt_key_created;
410
411   /**
412    * At what time did the other peer generate the decryption key?
413    */
414   struct GNUNET_TIME_Absolute decrypt_key_created;
415
416   /**
417    * At what time did we initially establish (as in, complete session
418    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
419    */
420   struct GNUNET_TIME_Absolute time_established;
421
422   /**
423    * At what time did we last receive an encrypted message from the
424    * other peer?  Should be zero if status != KEY_CONFIRMED.
425    */
426   struct GNUNET_TIME_Absolute last_activity;
427
428   /**
429    * Last latency observed from this peer.
430    */
431   struct GNUNET_TIME_Relative last_latency;
432
433   /**
434    * At what frequency are we currently re-trying SET_KEY messages?
435    */
436   struct GNUNET_TIME_Relative set_key_retry_frequency;
437
438   /**
439    * Time of our last update to the "available_send_window".
440    */
441   struct GNUNET_TIME_Absolute last_asw_update;
442
443   /**
444    * Time of our last update to the "available_recv_window".
445    */
446   struct GNUNET_TIME_Absolute last_arw_update;
447
448   /**
449    * Number of bytes that we are eligible to transmit to this
450    * peer at this point.  Incremented every minute by max_out_bpm,
451    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
452    * bandwidth-hogs are sampled at a frequency of about 78s!);
453    * may get negative if we have VERY high priority content.
454    */
455   long long available_send_window;
456
457   /**
458    * How much downstream capacity of this peer has been reserved for
459    * our traffic?  (Our clients can request that a certain amount of
460    * bandwidth is available for replies to them; this value is used to
461    * make sure that this reserved amount of bandwidth is actually
462    * available).
463    */
464   long long available_recv_window;
465
466   /**
467    * How valueable were the messages of this peer recently?
468    */
469   unsigned long long current_preference;
470
471   /**
472    * Bit map indicating which of the 32 sequence numbers before the last
473    * were received (good for accepting out-of-order packets and
474    * estimating reliability of the connection)
475    */
476   unsigned int last_packets_bitmap;
477
478   /**
479    * Number of messages in the message queue for this peer.
480    */
481   unsigned int message_queue_size;
482
483   /**
484    * last sequence number received on this connection (highest)
485    */
486   uint32_t last_sequence_number_received;
487
488   /**
489    * last sequence number transmitted
490    */
491   uint32_t last_sequence_number_sent;
492
493   /**
494    * Available bandwidth in for this peer (current target).
495    */
496   uint32_t bpm_in;
497
498   /**
499    * Available bandwidth out for this peer (current target).
500    */
501   uint32_t bpm_out;
502
503   /**
504    * Internal bandwidth limit set for this peer (initially
505    * typically set to "-1").  "bpm_out" is MAX of
506    * "bpm_out_internal_limit" and "bpm_out_external_limit".
507    */
508   uint32_t bpm_out_internal_limit;
509
510   /**
511    * External bandwidth limit set for this peer by the
512    * peer that we are communicating with.  "bpm_out" is MAX of
513    * "bpm_out_internal_limit" and "bpm_out_external_limit".
514    */
515   uint32_t bpm_out_external_limit;
516
517   /**
518    * What was our PING challenge number?
519    */
520   uint32_t ping_challenge;
521
522   /**
523    * What is our connection status?
524    */
525   enum PeerStateMachine status;
526
527 };
528
529
530 /**
531  * Events are messages for clients.  The struct
532  * itself is followed by the actual message.
533  */
534 struct Event
535 {
536   /**
537    * This is a linked list.
538    */
539   struct Event *next;
540
541   /**
542    * Size of the message.
543    */
544   size_t size;
545
546   /**
547    * Could this event be dropped if this queue
548    * is getting too large? (NOT YET USED!)
549    */
550   int can_drop;
551
552 };
553
554
555 /**
556  * Data structure for each client connected to the core service.
557  */
558 struct Client
559 {
560   /**
561    * Clients are kept in a linked list.
562    */
563   struct Client *next;
564
565   /**
566    * Handle for the client with the server API.
567    */
568   struct GNUNET_SERVER_Client *client_handle;
569
570   /**
571    * Linked list of messages we still need to deliver to
572    * this client.
573    */
574   struct Event *event_head;
575
576   /**
577    * Tail of the linked list of events.
578    */
579   struct Event *event_tail;
580
581   /**
582    * Current transmit handle, NULL if no transmission request
583    * is pending.
584    */
585   struct GNUNET_NETWORK_TransmitHandle *th;
586
587   /**
588    * Array of the types of messages this peer cares
589    * about (with "tcnt" entries).  Allocated as part
590    * of this client struct, do not free!
591    */
592   uint16_t *types;
593
594   /**
595    * Options for messages this client cares about,
596    * see GNUNET_CORE_OPTION_ values.
597    */
598   uint32_t options;
599
600   /**
601    * Number of types of incoming messages this client
602    * specifically cares about.  Size of the "types" array.
603    */
604   unsigned int tcnt;
605
606 };
607
608
609 /**
610  * Our public key.
611  */
612 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
613
614 /**
615  * Our identity.
616  */
617 static struct GNUNET_PeerIdentity my_identity;
618
619 /**
620  * Our private key.
621  */
622 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
623
624 /**
625  * Our scheduler.
626  */
627 struct GNUNET_SCHEDULER_Handle *sched;
628
629 /**
630  * Our configuration.
631  */
632 struct GNUNET_CONFIGURATION_Handle *cfg;
633
634 /**
635  * Our server.
636  */
637 static struct GNUNET_SERVER_Handle *server;
638
639 /**
640  * Transport service.
641  */
642 static struct GNUNET_TRANSPORT_Handle *transport;
643
644 /**
645  * Linked list of our clients.
646  */
647 static struct Client *clients;
648
649 /**
650  * We keep neighbours in a linked list (for now).
651  */
652 static struct Neighbour *neighbours;
653
654 /**
655  * Sum of all preferences among all neighbours.
656  */
657 static unsigned long long preference_sum;
658
659 /**
660  * Total number of neighbours we have.
661  */
662 static unsigned int neighbour_count;
663
664 /**
665  * How much inbound bandwidth are we supposed to be using?
666  */
667 static unsigned long long bandwidth_target_in;
668
669 /**
670  * How much outbound bandwidth are we supposed to be using?
671  */
672 static unsigned long long bandwidth_target_out;
673
674
675
676 /**
677  * A preference value for a neighbour was update.  Update
678  * the preference sum accordingly.
679  *
680  * @param inc how much was a preference value increased?
681  */
682 static void
683 update_preference_sum (unsigned long long inc)
684 {
685   struct Neighbour *n;
686   unsigned long long os;
687
688   os = preference_sum;
689   preference_sum += inc;
690   if (preference_sum >= os)
691     return; /* done! */
692   /* overflow! compensate by cutting all values in half! */
693   preference_sum = 0;
694   n = neighbours;
695   while (n != NULL)
696     {
697       n->current_preference /= 2;
698       preference_sum += n->current_preference;
699       n = n->next;
700     }    
701 }
702
703
704 /**
705  * Recalculate the number of bytes we expect to
706  * receive or transmit in a given window.
707  *
708  * @param window pointer to the byte counter (updated)
709  * @param ts pointer to the timestamp (updated)
710  * @param bpm number of bytes per minute that should
711  *        be added to the window.
712  */
713 static void
714 update_window (long long *window,
715                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
716 {
717   struct GNUNET_TIME_Relative since;
718
719   since = GNUNET_TIME_absolute_get_duration (*ts);
720   if (since.value < 60 * 1000)
721     return;                     /* not even a minute has passed */
722   *ts = GNUNET_TIME_absolute_get ();
723   *window += (bpm * since.value) / 60 / 1000;
724   if (*window > MAX_WINDOW_TIME * bpm)
725     *window = MAX_WINDOW_TIME * bpm;
726 }
727
728
729 /**
730  * Find the entry for the given neighbour.
731  *
732  * @param peer identity of the neighbour
733  * @return NULL if we are not connected, otherwise the
734  *         neighbour's entry.
735  */
736 static struct Neighbour *
737 find_neighbour (const struct GNUNET_PeerIdentity *peer)
738 {
739   struct Neighbour *ret;
740
741   ret = neighbours;
742   while ((ret != NULL) &&
743          (0 != memcmp (&ret->peer,
744                        peer, sizeof (struct GNUNET_PeerIdentity))))
745     ret = ret->next;
746   return ret;
747 }
748
749
750 /**
751  * Find the entry for the given client.
752  *
753  * @param client handle for the client
754  * @return NULL if we are not connected, otherwise the
755  *         client's struct.
756  */
757 static struct Client *
758 find_client (const struct GNUNET_SERVER_Client *client)
759 {
760   struct Client *ret;
761
762   ret = clients;
763   while ((ret != NULL) && (client != ret->client_handle))
764     ret = ret->next;
765   return ret;
766 }
767
768
769 /**
770  * If necessary, initiate a request with the server to
771  * transmit messages from the queue of the given client.
772  * @param client who to transfer messages to
773  */
774 static void request_transmit (struct Client *client);
775
776
777 /**
778  * Client is ready to receive data, provide it.
779  *
780  * @param cls closure
781  * @param size number of bytes available in buf
782  * @param buf where the callee should write the message
783  * @return number of bytes written to buf
784  */
785 static size_t
786 do_client_transmit (void *cls, size_t size, void *buf)
787 {
788   struct Client *client = cls;
789   struct Event *e;
790   char *tgt;
791   size_t ret;
792
793   client->th = NULL;
794 #if DEBUG_CORE_CLIENT
795   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
796               "Client ready to receive %u bytes.\n", size);
797 #endif
798   if (buf == NULL)
799     {
800 #if DEBUG_CORE
801       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
802                   "Failed to transmit data to client (disconnect)?\n");
803 #endif
804       return 0;                 /* we'll surely get a disconnect soon... */
805     }
806   tgt = buf;
807   ret = 0;
808   while ((NULL != (e = client->event_head)) && (e->size <= size))
809     {
810       memcpy (&tgt[ret], &e[1], e->size);
811       size -= e->size;
812       ret += e->size;
813       client->event_head = e->next;
814       GNUNET_free (e);
815     }
816   GNUNET_assert (ret > 0);
817   if (client->event_head == NULL)
818     client->event_tail = NULL;
819 #if DEBUG_CORE_CLIENT
820   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
821               "Transmitting %u bytes to client\n", ret);
822 #endif
823   request_transmit (client);
824   return ret;
825 }
826
827
828 /**
829  * If necessary, initiate a request with the server to
830  * transmit messages from the queue of the given client.
831  * @param client who to transfer messages to
832  */
833 static void
834 request_transmit (struct Client *client)
835 {
836
837   if (NULL != client->th)
838     return;                     /* already pending */
839   if (NULL == client->event_head)
840     return;                     /* no more events pending */
841 #if DEBUG_CORE_CLIENT
842   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843               "Asking server to transmit %u bytes to client\n",
844               client->event_head->size);
845 #endif
846   client->th
847     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
848                                            client->event_head->size,
849                                            GNUNET_TIME_UNIT_FOREVER_REL,
850                                            &do_client_transmit, client);
851 }
852
853
854 /**
855  * Send a message to one of our clients.
856  * @param client target for the message
857  * @param msg message to transmit
858  * @param can_drop could this message be dropped if the
859  *        client's queue is getting too large?
860  */
861 static void
862 send_to_client (struct Client *client,
863                 const struct GNUNET_MessageHeader *msg, int can_drop)
864 {
865   struct Event *e;
866   unsigned int queue_size;
867   uint16_t msize;
868
869 #if DEBUG_CORE_CLIENT
870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871               "Preparing to send message of type %u to client.\n",
872               ntohs (msg->type));
873 #endif
874   queue_size = 0;
875   e = client->event_head;
876   while (e != NULL)
877     {
878       queue_size++;
879       e = e->next;
880     }
881   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
882        (can_drop == GNUNET_YES) )
883     {
884 #if DEBUG_CORE
885       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
886                   "Too many messages in queue for the client, dropping the new message.\n");
887 #endif
888       return;
889     }
890
891   msize = ntohs (msg->size);
892   e = GNUNET_malloc (sizeof (struct Event) + msize);
893   /* append */
894   if (client->event_tail != NULL)
895     client->event_tail->next = e;
896   else
897     client->event_head = e;
898   client->event_tail = e;
899   e->can_drop = can_drop;
900   e->size = msize;
901   memcpy (&e[1], msg, msize);
902   request_transmit (client);
903 }
904
905
906 /**
907  * Send a message to all of our current clients.
908  */
909 static void
910 send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
911 {
912   struct Client *c;
913
914   c = clients;
915   while (c != NULL)
916     {
917       send_to_client (c, msg, can_drop);
918       c = c->next;
919     }
920 }
921
922
923 /**
924  * Handle CORE_INIT request.
925  */
926 static void
927 handle_client_init (void *cls,
928                     struct GNUNET_SERVER_Client *client,
929                     const struct GNUNET_MessageHeader *message)
930 {
931   const struct InitMessage *im;
932   struct InitReplyMessage irm;
933   struct Client *c;
934   uint16_t msize;
935   const uint16_t *types;
936   struct Neighbour *n;
937   struct ConnectNotifyMessage cnm;
938
939 #if DEBUG_CORE_CLIENT
940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941               "Client connecting to core service with `%s' message\n",
942               "INIT");
943 #endif
944   /* check that we don't have an entry already */
945   c = clients;
946   while (c != NULL)
947     {
948       if (client == c->client_handle)
949         {
950           GNUNET_break (0);
951           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
952           return;
953         }
954       c = c->next;
955     }
956   msize = ntohs (message->size);
957   if (msize < sizeof (struct InitMessage))
958     {
959       GNUNET_break (0);
960       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
961       return;
962     }
963   im = (const struct InitMessage *) message;
964   types = (const uint16_t *) &im[1];
965   msize -= sizeof (struct InitMessage);
966   c = GNUNET_malloc (sizeof (struct Client) + msize);
967   c->client_handle = client;
968   c->next = clients;
969   clients = c;
970   memcpy (&c[1], types, msize);
971   c->types = (uint16_t *) & c[1];
972   c->options = ntohl (im->options);
973   c->tcnt = msize / sizeof (uint16_t);
974   /* send init reply message */
975   irm.header.size = htons (sizeof (struct InitReplyMessage));
976   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
977   irm.reserved = htonl (0);
978   memcpy (&irm.publicKey,
979           &my_public_key,
980           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
981 #if DEBUG_CORE_CLIENT
982   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
983               "Sending `%s' message to client.\n", "INIT_REPLY");
984 #endif
985   send_to_client (c, &irm.header, GNUNET_NO);
986   /* notify new client about existing neighbours */
987   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
988   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
989   n = neighbours;
990   while (n != NULL)
991     {
992 #if DEBUG_CORE_CLIENT
993       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
995 #endif
996       cnm.bpm_available = htonl (n->bpm_out);
997       cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
998       cnm.peer = n->peer;
999       send_to_client (c, &cnm.header, GNUNET_NO);
1000       n = n->next;
1001     }
1002   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1003 }
1004
1005
1006 /**
1007  * A client disconnected, clean up.
1008  *
1009  * @param cls closure
1010  * @param client identification of the client
1011  */
1012 static void
1013 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1014 {
1015   struct Client *pos;
1016   struct Client *prev;
1017   struct Event *e;
1018
1019 #if DEBUG_CORE_CLIENT
1020   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1021               "Client has disconnected from core service.\n");
1022 #endif
1023   prev = NULL;
1024   pos = clients;
1025   while (pos != NULL)
1026     {
1027       if (client == pos->client_handle)
1028         {
1029           if (prev == NULL)
1030             clients = pos->next;
1031           else
1032             prev->next = pos->next;
1033           if (pos->th != NULL)
1034             GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
1035           while (NULL != (e = pos->event_head))
1036             {
1037               pos->event_head = e->next;
1038               GNUNET_free (e);
1039             }
1040           GNUNET_free (pos);
1041           return;
1042         }
1043       prev = pos;
1044       pos = pos->next;
1045     }
1046   /* client never sent INIT */
1047 }
1048
1049
1050 /**
1051  * Handle REQUEST_CONFIGURE request.
1052  */
1053 static void
1054 handle_client_request_configure (void *cls,
1055                                  struct GNUNET_SERVER_Client *client,
1056                                  const struct GNUNET_MessageHeader *message)
1057 {
1058   const struct RequestConfigureMessage *rcm;
1059   struct Neighbour *n;
1060   struct ConfigurationInfoMessage cim;
1061   struct Client *c;
1062   int reserv;
1063   unsigned long long old_preference;
1064
1065 #if DEBUG_CORE_CLIENT
1066   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1067               "Core service receives `%s' request.\n", "CONFIGURE");
1068 #endif
1069   rcm = (const struct RequestConfigureMessage *) message;
1070   n = find_neighbour (&rcm->peer);
1071   memset (&cim, 0, sizeof (cim));
1072   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1073     {
1074       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1075       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1076                                n->bpm_out_external_limit);
1077       reserv = ntohl (rcm->reserve_inbound);
1078       if (reserv < 0)
1079         {
1080           n->available_recv_window += reserv;
1081         }
1082       else if (reserv > 0)
1083         {
1084           update_window (&n->available_recv_window,
1085                          &n->last_arw_update, n->bpm_in);
1086           if (n->available_recv_window < reserv)
1087             reserv = n->available_recv_window;
1088           n->available_recv_window -= reserv;
1089         }
1090       old_preference = n->current_preference;
1091       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1092       if (old_preference > n->current_preference) 
1093         {
1094           /* overflow; cap at maximum value */
1095           n->current_preference = (unsigned long long) -1;
1096         }
1097       update_preference_sum (n->current_preference - old_preference);
1098       cim.reserved_amount = htonl (reserv);
1099       cim.bpm_in = htonl (n->bpm_in);
1100       cim.bpm_out = htonl (n->bpm_out);
1101       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1102       cim.preference = n->current_preference;
1103     }
1104   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1105   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1106   cim.peer = rcm->peer;
1107   c = find_client (client);
1108   if (c == NULL)
1109     {
1110       GNUNET_break (0);
1111       return;
1112     }
1113 #if DEBUG_CORE_CLIENT
1114   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1116 #endif
1117   send_to_client (c, &cim.header, GNUNET_NO);
1118 }
1119
1120
1121 /**
1122  * Check if we have encrypted messages for the specified neighbour
1123  * pending, and if so, check with the transport about sending them
1124  * out.
1125  *
1126  * @param n neighbour to check.
1127  */
1128 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1129
1130
1131 /**
1132  * Function called when the transport service is ready to
1133  * receive an encrypted message for the respective peer
1134  *
1135  * @param cls neighbour to use message from
1136  * @param size number of bytes we can transmit
1137  * @param buf where to copy the message
1138  * @return number of bytes transmitted
1139  */
1140 static size_t
1141 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1142 {
1143   struct Neighbour *n = cls;
1144   struct MessageEntry *m;
1145   size_t ret;
1146   char *cbuf;
1147
1148   n->th = NULL;
1149   GNUNET_assert (NULL != (m = n->encrypted_head));
1150   n->encrypted_head = m->next;
1151   if (m->next == NULL)
1152     n->encrypted_tail = NULL;
1153   ret = 0;
1154   cbuf = buf;
1155   if (buf != NULL)
1156     {
1157       GNUNET_assert (size >= m->size);
1158       memcpy (cbuf, &m[1], m->size);
1159       ret = m->size;
1160       process_encrypted_neighbour_queue (n);
1161 #if DEBUG_CORE
1162       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1164                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1165                   ret, GNUNET_i2s (&n->peer));
1166 #endif
1167     }
1168   else
1169     {
1170       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1171                   "Transmission for message of type %u and size %u failed\n",
1172                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1173                   m->size);
1174     }
1175   GNUNET_free (m);
1176   return ret;
1177 }
1178
1179
1180 /**
1181  * Check if we have plaintext messages for the specified neighbour
1182  * pending, and if so, consider batching and encrypting them (and
1183  * then trigger processing of the encrypted queue if needed).
1184  *
1185  * @param n neighbour to check.
1186  */
1187 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1188
1189
1190 /**
1191  * Check if we have encrypted messages for the specified neighbour
1192  * pending, and if so, check with the transport about sending them
1193  * out.
1194  *
1195  * @param n neighbour to check.
1196  */
1197 static void
1198 process_encrypted_neighbour_queue (struct Neighbour *n)
1199 {
1200   struct MessageEntry *m;
1201  
1202   if (n->th != NULL)
1203     return;  /* request already pending */
1204   if (n->encrypted_head == NULL)
1205     {
1206       /* encrypted queue empty, try plaintext instead */
1207       process_plaintext_neighbour_queue (n);
1208       return;
1209     }
1210 #if DEBUG_CORE
1211   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1213               n->encrypted_head->size,
1214               GNUNET_i2s (&n->peer),
1215               GNUNET_TIME_absolute_get_remaining (n->
1216                                                   encrypted_head->deadline).
1217               value);
1218 #endif
1219   n->th =
1220     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1221                                             n->encrypted_head->size,
1222                                             GNUNET_TIME_absolute_get_remaining
1223                                             (n->encrypted_head->deadline),
1224                                             &notify_encrypted_transmit_ready,
1225                                             n);
1226   if (n->th == NULL)
1227     {
1228       /* message request too large (oops) */
1229       GNUNET_break (0);
1230       /* discard encrypted message */
1231       GNUNET_assert (NULL != (m = n->encrypted_head));
1232       n->encrypted_head = m->next;
1233       if (m->next == NULL)
1234         n->encrypted_tail = NULL;
1235       GNUNET_free (m);
1236       process_encrypted_neighbour_queue (n);
1237     }
1238 }
1239
1240
1241 /**
1242  * Decrypt size bytes from in and write the result to out.  Use the
1243  * key for inbound traffic of the given neighbour.  This function does
1244  * NOT do any integrity-checks on the result.
1245  *
1246  * @param n neighbour we are receiving from
1247  * @param iv initialization vector to use
1248  * @param in ciphertext
1249  * @param out plaintext
1250  * @param size size of in/out
1251  * @return GNUNET_OK on success
1252  */
1253 static int
1254 do_decrypt (struct Neighbour *n,
1255             const GNUNET_HashCode * iv,
1256             const void *in, void *out, size_t size)
1257 {
1258   if (size != (uint16_t) size)
1259     {
1260       GNUNET_break (0);
1261       return GNUNET_NO;
1262     }
1263   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1264       (n->status != PEER_STATE_KEY_CONFIRMED))
1265     {
1266       GNUNET_break_op (0);
1267       return GNUNET_SYSERR;
1268     }
1269   if (size !=
1270       GNUNET_CRYPTO_aes_decrypt (&n->decrypt_key,
1271                                  in,
1272                                  (uint16_t) size,
1273                                  (const struct
1274                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1275                                  out))
1276     {
1277       GNUNET_break (0);
1278       return GNUNET_SYSERR;
1279     }
1280 #if DEBUG_CORE
1281   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1282               "Decrypted %u bytes from `%4s' using key %u\n",
1283               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1284 #endif
1285   return GNUNET_OK;
1286 }
1287
1288
1289 /**
1290  * Encrypt size bytes from in and write the result to out.  Use the
1291  * key for outbound traffic of the given neighbour.
1292  *
1293  * @param n neighbour we are sending to
1294  * @param iv initialization vector to use
1295  * @param in ciphertext
1296  * @param out plaintext
1297  * @param size size of in/out
1298  * @return GNUNET_OK on success
1299  */
1300 static int
1301 do_encrypt (struct Neighbour *n,
1302             const GNUNET_HashCode * iv,
1303             const void *in, void *out, size_t size)
1304 {
1305   if (size != (uint16_t) size)
1306     {
1307       GNUNET_break (0);
1308       return GNUNET_NO;
1309     }
1310   GNUNET_assert (size ==
1311                  GNUNET_CRYPTO_aes_encrypt (in,
1312                                             (uint16_t) size,
1313                                             &n->encrypt_key,
1314                                             (const struct
1315                                              GNUNET_CRYPTO_AesInitializationVector
1316                                              *) iv, out));
1317 #if DEBUG_CORE
1318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319               "Encrypted %u bytes for `%4s' using key %u\n", size,
1320               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1321 #endif
1322   return GNUNET_OK;
1323 }
1324
1325
1326 /**
1327  * Select messages for transmission.  This heuristic uses a combination
1328  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1329  * and priority-based discard (in case no feasible schedule exist) and
1330  * speculative optimization (defer any kind of transmission until
1331  * we either create a batch of significant size, 25% of max, or until
1332  * we are close to a deadline).  Furthermore, when scheduling the
1333  * heuristic also packs as many messages into the batch as possible,
1334  * starting with those with the earliest deadline.  Yes, this is fun.
1335  *
1336  * @param n neighbour to select messages from
1337  * @param size number of bytes to select for transmission
1338  * @param retry_time set to the time when we should try again
1339  *        (only valid if this function returns zero)
1340  * @return number of bytes selected, or 0 if we decided to
1341  *         defer scheduling overall; in that case, retry_time is set.
1342  */
1343 static size_t
1344 select_messages (struct Neighbour *n,
1345                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1346 {
1347   struct MessageEntry *pos;
1348   struct MessageEntry *min;
1349   struct MessageEntry *last;
1350   unsigned int min_prio;
1351   struct GNUNET_TIME_Absolute t;
1352   struct GNUNET_TIME_Absolute now;
1353   uint64_t delta;
1354   uint64_t avail;
1355   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1356   size_t off;
1357   int discard_low_prio;
1358
1359   GNUNET_assert (NULL != n->messages);
1360   now = GNUNET_TIME_absolute_get ();
1361   /* last entry in linked list of messages processed */
1362   last = NULL;
1363   /* should we remove the entry with the lowest
1364      priority from consideration for scheduling at the
1365      end of the loop? */
1366   discard_low_prio = GNUNET_YES;
1367   while (GNUNET_YES == discard_low_prio)
1368     {
1369       min = NULL;
1370       min_prio = -1;
1371       discard_low_prio = GNUNET_NO;
1372       /* number of bytes available for transmission at time "t" */
1373       avail = n->available_send_window;
1374       t = n->last_asw_update;
1375       /* how many bytes have we (hyptothetically) scheduled so far */
1376       off = 0;
1377       /* maximum time we can wait before transmitting anything
1378          and still make all of our deadlines */
1379       slack = -1;
1380
1381       pos = n->messages;
1382       /* note that we use "*2" here because we want to look
1383          a bit further into the future; much more makes no
1384          sense since new message might be scheduled in the
1385          meantime... */
1386       while ((pos != NULL) && (off < size * 2))
1387         {
1388           if (pos->do_transmit == GNUNET_YES)
1389             {
1390               /* already removed from consideration */
1391               pos = pos->next;
1392               continue;
1393             }
1394           if (discard_low_prio == GNUNET_NO)
1395             {
1396               delta = pos->deadline.value;
1397               if (delta < t.value)
1398                 delta = 0;
1399               else
1400                 delta = t.value - delta;
1401               avail += delta * n->bpm_out / 1000 / 60;
1402               if (avail < pos->size)
1403                 {
1404                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1405                 }
1406               else
1407                 {
1408                   avail -= pos->size;
1409                   /* update slack, considering both its absolute deadline
1410                      and relative deadlines caused by other messages
1411                      with their respective load */
1412                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1413                   if (pos->deadline.value < now.value)
1414                     slack = 0;
1415                   else
1416                     slack =
1417                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1418                 }
1419             }
1420           off += pos->size;
1421           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1422           if (pos->priority <= min_prio)
1423             {
1424               /* update min for discard */
1425               min_prio = pos->priority;
1426               min = pos;
1427             }
1428           pos = pos->next;
1429         }
1430       if (discard_low_prio)
1431         {
1432           GNUNET_assert (min != NULL);
1433           /* remove lowest-priority entry from consideration */
1434           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1435         }
1436       last = pos;
1437     }
1438   /* guard against sending "tiny" messages with large headers without
1439      urgent deadlines */
1440   if ( (slack > 1000) && (size > 4 * off) )
1441     {
1442       /* less than 25% of message would be filled with
1443          deadlines still being met if we delay by one
1444          second or more; so just wait for more data */
1445       retry_time->value = slack / 2;
1446       /* reset do_transmit values for next time */
1447       while (pos != last)
1448         {
1449           pos->do_transmit = GNUNET_NO;
1450           pos = pos->next;
1451         }
1452       return 0;
1453     }
1454   /* select marked messages (up to size) for transmission */
1455   off = 0;
1456   pos = n->messages;
1457   while (pos != last)
1458     {
1459       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1460         {
1461           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1462           off += pos->size;
1463           size -= pos->size;
1464         }
1465       else
1466         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1467       pos = pos->next;
1468     }
1469 #if DEBUG_CORE
1470   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1471               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1472               off, GNUNET_i2s (&n->peer));
1473 #endif
1474   return off;
1475 }
1476
1477
1478 /**
1479  * Batch multiple messages into a larger buffer.
1480  *
1481  * @param n neighbour to take messages from
1482  * @param buf target buffer
1483  * @param size size of buf
1484  * @param deadline set to transmission deadline for the result
1485  * @param retry_time set to the time when we should try again
1486  *        (only valid if this function returns zero)
1487  * @param priority set to the priority of the batch
1488  * @return number of bytes written to buf (can be zero)
1489  */
1490 static size_t
1491 batch_message (struct Neighbour *n,
1492                char *buf,
1493                size_t size,
1494                struct GNUNET_TIME_Absolute *deadline,
1495                struct GNUNET_TIME_Relative *retry_time,
1496                unsigned int *priority)
1497 {
1498   struct MessageEntry *pos;
1499   struct MessageEntry *prev;
1500   struct MessageEntry *next;
1501   size_t ret;
1502
1503   ret = 0;
1504   *priority = 0;
1505   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1506   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1507   if (0 == select_messages (n, size, retry_time))
1508     {
1509       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1510                   "No messages selected, will try again in %llu ms\n",
1511                   retry_time->value);
1512       return 0;
1513     }
1514   pos = n->messages;
1515   prev = NULL;
1516   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1517     {
1518       next = pos->next;
1519       if (GNUNET_YES == pos->do_transmit)
1520         {
1521           GNUNET_assert (pos->size <= size);
1522           memcpy (&buf[ret], &pos[1], pos->size);
1523           ret += pos->size;
1524           size -= pos->size;
1525           *priority += pos->priority;
1526           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1527           GNUNET_free (pos);
1528           if (prev == NULL)
1529             n->messages = next;
1530           else
1531             prev->next = next;
1532         }
1533       else
1534         {
1535           prev = pos;
1536         }
1537       pos = next;
1538     }
1539   return ret;
1540 }
1541
1542
1543 /**
1544  * Remove messages with deadlines that have long expired from
1545  * the queue.
1546  *
1547  * @param n neighbour to inspect
1548  */
1549 static void
1550 discard_expired_messages (struct Neighbour *n)
1551 {
1552   struct MessageEntry *prev;
1553   struct MessageEntry *next;
1554   struct MessageEntry *pos;
1555   struct GNUNET_TIME_Absolute now;
1556   struct GNUNET_TIME_Relative delta;
1557
1558   now = GNUNET_TIME_absolute_get ();
1559   prev = NULL;
1560   pos = n->messages;
1561   while (pos != NULL) 
1562     {
1563       next = pos->next;
1564       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1565       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1566         {
1567 #if DEBUG_CORE
1568           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1569                       "Message is %llu ms past due, discarding.\n",
1570                       delta.value);
1571 #endif
1572           if (prev == NULL)
1573             n->messages = next;
1574           else
1575             prev->next = next;
1576           GNUNET_free (pos);
1577         }
1578       else
1579         prev = pos;
1580       pos = next;
1581     }
1582 }
1583
1584
1585 /**
1586  * Signature of the main function of a task.
1587  *
1588  * @param cls closure
1589  * @param tc context information (why was this task triggered now)
1590  */
1591 static void
1592 retry_plaintext_processing (void *cls,
1593                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1594 {
1595   struct Neighbour *n = cls;
1596
1597   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1598   process_plaintext_neighbour_queue (n);
1599 }
1600
1601
1602 /**
1603  * Send our key (and encrypted PING) to the other peer.
1604  *
1605  * @param n the other peer
1606  */
1607 static void send_key (struct Neighbour *n);
1608
1609
1610 /**
1611  * Check if we have plaintext messages for the specified neighbour
1612  * pending, and if so, consider batching and encrypting them (and
1613  * then trigger processing of the encrypted queue if needed).
1614  *
1615  * @param n neighbour to check.
1616  */
1617 static void
1618 process_plaintext_neighbour_queue (struct Neighbour *n)
1619 {
1620   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1621   size_t used;
1622   size_t esize;
1623   struct EncryptedMessage *em;  /* encrypted message */
1624   struct EncryptedMessage *ph;  /* plaintext header */
1625   struct MessageEntry *me;
1626   unsigned int priority;
1627   struct GNUNET_TIME_Absolute deadline;
1628   struct GNUNET_TIME_Relative retry_time;
1629
1630   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1631     {
1632       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1633       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1634     }
1635   switch (n->status)
1636     {
1637     case PEER_STATE_DOWN:
1638       send_key (n);
1639 #if DEBUG_CORE
1640       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1641                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1642                   GNUNET_i2s(&n->peer));
1643 #endif
1644       return;
1645     case PEER_STATE_KEY_SENT:
1646       GNUNET_assert (n->retry_set_key_task !=
1647                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1648 #if DEBUG_CORE
1649       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1650                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1651                   GNUNET_i2s(&n->peer));
1652 #endif
1653       return;
1654     case PEER_STATE_KEY_RECEIVED:
1655       GNUNET_assert (n->retry_set_key_task !=
1656                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1657 #if DEBUG_CORE
1658       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1660                   GNUNET_i2s(&n->peer));
1661 #endif
1662       return;
1663     case PEER_STATE_KEY_CONFIRMED:
1664       /* ready to continue */
1665       break;
1666     }
1667   discard_expired_messages (n);
1668   if (n->messages == NULL)
1669     {
1670 #if DEBUG_CORE
1671       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672                   "Plaintext message queue for `%4s' is empty.\n",
1673                   GNUNET_i2s(&n->peer));
1674 #endif
1675       return;                   /* no pending messages */
1676     }
1677   if (n->encrypted_head != NULL)
1678     {
1679 #if DEBUG_CORE
1680       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1681                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1682                   GNUNET_i2s(&n->peer));
1683 #endif
1684       return;                   /* wait for messages already encrypted to be
1685                                    processed first! */
1686     }
1687   ph = (struct EncryptedMessage *) pbuf;
1688   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1689   priority = 0;
1690   used = sizeof (struct EncryptedMessage);
1691   used += batch_message (n,
1692                          &pbuf[used],
1693                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1694                          &deadline, &retry_time, &priority);
1695   if (used == sizeof (struct EncryptedMessage))
1696     {
1697 #if DEBUG_CORE
1698       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1699                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1700                   GNUNET_i2s(&n->peer));
1701 #endif
1702       /* no messages selected for sending, try again later... */
1703       n->retry_plaintext_task =
1704         GNUNET_SCHEDULER_add_delayed (sched,
1705                                       GNUNET_NO,
1706                                       GNUNET_SCHEDULER_PRIORITY_IDLE,
1707                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1708                                       retry_time,
1709                                       &retry_plaintext_processing, n);
1710       return;
1711     }
1712
1713   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1714   ph->inbound_bpm_limit = htonl (n->bpm_in);
1715   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1716
1717   /* setup encryption message header */
1718   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1719   me->deadline = deadline;
1720   me->priority = priority;
1721   me->size = used;
1722   em = (struct EncryptedMessage *) &me[1];
1723   em->header.size = htons (used);
1724   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1725   em->reserved = htonl (0);
1726   esize = used - ENCRYPTED_HEADER_SIZE;
1727   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1728   /* encrypt */
1729 #if DEBUG_CORE
1730   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1731               "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
1732               esize,
1733               GNUNET_i2s(&n->peer));
1734 #endif
1735   GNUNET_assert (GNUNET_OK ==
1736                  do_encrypt (n,
1737                              &em->plaintext_hash,
1738                              &ph->sequence_number,
1739                              &em->sequence_number, esize));
1740   /* append to transmission list */
1741   if (n->encrypted_tail == NULL)
1742     n->encrypted_head = me;
1743   else
1744     n->encrypted_tail->next = me;
1745   n->encrypted_tail = me;
1746   process_encrypted_neighbour_queue (n);
1747 }
1748
1749
1750 /**
1751  * Handle CORE_SEND request.
1752  */
1753 static void
1754 handle_client_send (void *cls,
1755                     struct GNUNET_SERVER_Client *client,
1756                     const struct GNUNET_MessageHeader *message);
1757
1758
1759 /**
1760  * Function called to notify us that we either succeeded
1761  * or failed to connect (at the transport level) to another
1762  * peer.  We should either free the message we were asked
1763  * to transmit or re-try adding it to the queue.
1764  *
1765  * @param cls closure
1766  * @param size number of bytes available in buf
1767  * @param buf where the callee should write the message
1768  * @return number of bytes written to buf
1769  */
1770 static size_t
1771 send_connect_continuation (void *cls, size_t size, void *buf)
1772 {
1773   struct SendMessage *sm = cls;
1774
1775   if (buf == NULL)
1776     {
1777 #if DEBUG_CORE
1778       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1779                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1780                   GNUNET_i2s (&sm->peer));
1781 #endif
1782       GNUNET_free (sm);
1783       return 0;
1784     }
1785 #if DEBUG_CORE
1786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1787               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1788               GNUNET_i2s (&sm->peer));
1789 #endif
1790   handle_client_send (NULL, NULL, &sm->header);
1791   GNUNET_free (sm);
1792   return 0;
1793 }
1794
1795
1796 /**
1797  * Handle CORE_SEND request.
1798  */
1799 static void
1800 handle_client_send (void *cls,
1801                     struct GNUNET_SERVER_Client *client,
1802                     const struct GNUNET_MessageHeader *message)
1803 {
1804   const struct SendMessage *sm;
1805   struct SendMessage *smc;
1806   const struct GNUNET_MessageHeader *mh;
1807   struct Neighbour *n;
1808   struct MessageEntry *prev;
1809   struct MessageEntry *pos;
1810   struct MessageEntry *e; 
1811   struct MessageEntry *min_prio_entry;
1812   struct MessageEntry *min_prio_prev;
1813   unsigned int min_prio;
1814   unsigned int queue_size;
1815   uint16_t msize;
1816
1817   msize = ntohs (message->size);
1818   if (msize <
1819       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1820     {
1821       GNUNET_break (0);
1822       if (client != NULL)
1823         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1824       return;
1825     }
1826   sm = (const struct SendMessage *) message;
1827   msize -= sizeof (struct SendMessage);
1828   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1829   if (msize != ntohs (mh->size))
1830     {
1831       GNUNET_break (0);
1832       if (client != NULL)
1833         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1834       return;
1835     }
1836   n = find_neighbour (&sm->peer);
1837   if (n == NULL)
1838     {
1839 #if DEBUG_CORE
1840       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1841                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1842                   "SEND",
1843                   GNUNET_i2s (&sm->peer),
1844                   GNUNET_TIME_absolute_get_remaining
1845                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1846 #endif
1847       msize += sizeof (struct SendMessage);
1848       /* ask transport to connect to the peer */
1849       smc = GNUNET_malloc (msize);
1850       memcpy (smc, sm, msize);
1851       if (NULL ==
1852           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1853                                                   &sm->peer,
1854                                                   0,
1855                                                   GNUNET_TIME_absolute_get_remaining
1856                                                   (GNUNET_TIME_absolute_ntoh
1857                                                    (sm->deadline)),
1858                                                   &send_connect_continuation,
1859                                                   smc))
1860         {
1861           /* transport has already a request pending for this peer! */
1862 #if DEBUG_CORE
1863           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1864                       "Dropped second message destined for `%4s' since connection is still down.\n",
1865                       GNUNET_i2s(&sm->peer));
1866 #endif
1867           GNUNET_free (smc);
1868         }
1869       if (client != NULL)
1870         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1871       return;
1872     }
1873 #if DEBUG_CORE
1874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1875               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1876               "SEND",
1877               msize, 
1878               GNUNET_i2s (&sm->peer));
1879 #endif
1880   /* bound queue size */
1881   discard_expired_messages (n);
1882   min_prio = (unsigned int) -1;
1883   queue_size = 0;
1884   prev = NULL;
1885   pos = n->messages;
1886   while (pos != NULL) 
1887     {
1888       if (pos->priority < min_prio)
1889         {
1890           min_prio_entry = pos;
1891           min_prio_prev = prev;
1892           min_prio = pos->priority;
1893         }
1894       queue_size++;
1895       prev = pos;
1896       pos = pos->next;
1897     }
1898   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1899     {
1900       /* queue full */
1901       if (ntohl(sm->priority) <= min_prio)
1902         {
1903           /* discard new entry */
1904 #if DEBUG_CORE
1905           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906                       "Queue full, discarding new request\n");
1907 #endif
1908           if (client != NULL)
1909             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1910           return;
1911         }
1912       /* discard "min_prio_entry" */
1913 #if DEBUG_CORE
1914       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1915                   "Queue full, discarding existing older request\n");
1916 #endif
1917       if (min_prio_prev == NULL)
1918         n->messages = min_prio_entry->next;
1919       else
1920         min_prio_prev->next = min_prio_entry->next;      
1921       GNUNET_free (min_prio_entry);     
1922     }
1923   
1924   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1925   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1926   e->priority = ntohl (sm->priority);
1927   e->size = msize;
1928   memcpy (&e[1], mh, msize);
1929
1930   /* insert, keep list sorted by deadline */
1931   prev = NULL;
1932   pos = n->messages;
1933   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1934     {
1935       prev = pos;
1936       pos = pos->next;
1937     }
1938   if (prev == NULL)
1939     n->messages = e;
1940   else
1941     prev->next = e;
1942   e->next = pos;
1943
1944   /* consider scheduling now */
1945   process_plaintext_neighbour_queue (n);
1946   if (client != NULL)
1947     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1948 }
1949
1950
1951 /**
1952  * List of handlers for the messages understood by this
1953  * service.
1954  */
1955 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1956   {&handle_client_init, NULL,
1957    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1958   {&handle_client_request_configure, NULL,
1959    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
1960    sizeof (struct RequestConfigureMessage)},
1961   {&handle_client_send, NULL,
1962    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1963   {NULL, NULL, 0, 0}
1964 };
1965
1966
1967 /**
1968  * PEERINFO is giving us a HELLO for a peer.  Add the
1969  * public key to the neighbour's struct and retry
1970  * send_key.  Or, if we did not get a HELLO, just do
1971  * nothing.
1972  *
1973  * @param cls NULL
1974  * @param peer the peer for which this is the HELLO
1975  * @param hello HELLO message of that peer
1976  * @param trust amount of trust we currently have in that peer
1977  */
1978 static void
1979 process_hello_retry_send_key (void *cls,
1980                               const struct GNUNET_PeerIdentity *peer,
1981                               const struct GNUNET_HELLO_Message *hello,
1982                               uint32_t trust)
1983 {
1984   struct Neighbour *n;
1985
1986   if (peer == NULL)
1987     return;
1988   n = find_neighbour (peer);
1989   if (n == NULL)
1990     return;
1991   if (n->public_key != NULL)
1992     return;
1993 #if DEBUG_CORE
1994   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1995               "Received new `%s' message for `%4s', initiating key exchange.\n",
1996               "HELLO",
1997               GNUNET_i2s (peer));
1998 #endif
1999   n->public_key =
2000     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2001   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2002     {
2003       GNUNET_free (n->public_key);
2004       n->public_key = NULL;
2005       return;
2006     }
2007   send_key (n);
2008 }
2009
2010
2011 /**
2012  * Task that will retry "send_key" if our previous attempt failed
2013  * to yield a PONG.
2014  */
2015 static void
2016 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2017 {
2018   struct Neighbour *n = cls;
2019
2020   GNUNET_assert (n->status != PEER_STATE_KEY_CONFIRMED);
2021   n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2022   n->set_key_retry_frequency =
2023     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2024   send_key (n);
2025 }
2026
2027
2028 /**
2029  * Send our key (and encrypted PING) to the other peer.
2030  *
2031  * @param n the other peer
2032  */
2033 static void
2034 send_key (struct Neighbour *n)
2035 {
2036   struct SetKeyMessage *sm;
2037   struct MessageEntry *me;
2038   struct PingMessage pp;
2039   struct PingMessage *pm;
2040
2041 #if DEBUG_CORE
2042   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2043               "Asked to perform key exchange with `%4s'.\n",
2044               GNUNET_i2s (&n->peer));
2045 #endif
2046   if (n->public_key == NULL)
2047     {
2048       /* lookup n's public key, then try again */
2049 #if DEBUG_CORE
2050       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2051                   "Lacking public key for `%4s', trying to obtain one.\n",
2052                   GNUNET_i2s (&n->peer));
2053 #endif
2054       GNUNET_PEERINFO_for_all (cfg,
2055                                sched,
2056                                &n->peer,
2057                                0,
2058                                GNUNET_TIME_UNIT_MINUTES,
2059                                &process_hello_retry_send_key, NULL);
2060       return;
2061     }
2062   /* first, set key message */
2063   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2064                       sizeof (struct SetKeyMessage));
2065   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2066   me->priority = SET_KEY_PRIORITY;
2067   me->size = sizeof (struct SetKeyMessage);
2068   if (n->encrypted_head == NULL)
2069     n->encrypted_head = me;
2070   else
2071     n->encrypted_tail->next = me;
2072   n->encrypted_tail = me;
2073   sm = (struct SetKeyMessage *) &me[1];
2074   sm->header.size = htons (sizeof (struct SetKeyMessage));
2075   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2076   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2077                                         PEER_STATE_KEY_SENT : n->status));
2078   sm->purpose.size =
2079     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2080            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2081            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2082            sizeof (struct GNUNET_PeerIdentity));
2083   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2084   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2085   sm->target = n->peer;
2086   GNUNET_assert (GNUNET_OK ==
2087                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2088                                             sizeof (struct
2089                                                     GNUNET_CRYPTO_AesSessionKey),
2090                                             n->public_key,
2091                                             &sm->encrypted_key));
2092   GNUNET_assert (GNUNET_OK ==
2093                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2094                                          &sm->signature));
2095
2096   /* second, encrypted PING message */
2097   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2098                       sizeof (struct PingMessage));
2099   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2100   me->priority = PING_PRIORITY;
2101   me->size = sizeof (struct PingMessage);
2102   n->encrypted_tail->next = me;
2103   n->encrypted_tail = me;
2104   pm = (struct PingMessage *) &me[1];
2105   pm->header.size = htons (sizeof (struct PingMessage));
2106   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2107   pp.challenge = htonl (n->ping_challenge);
2108   pp.target = n->peer;
2109 #if DEBUG_CORE
2110   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2111               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2112               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2113   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2114               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2115               "PING",
2116               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2117 #endif
2118   do_encrypt (n,
2119               &n->peer.hashPubKey,
2120               &pp.challenge,
2121               &pm->challenge,
2122               sizeof (struct PingMessage) -
2123               sizeof (struct GNUNET_MessageHeader));
2124   /* update status */
2125   switch (n->status)
2126     {
2127     case PEER_STATE_DOWN:
2128       n->status = PEER_STATE_KEY_SENT;
2129       break;
2130     case PEER_STATE_KEY_SENT:
2131       break;
2132     case PEER_STATE_KEY_RECEIVED:
2133       break;
2134     case PEER_STATE_KEY_CONFIRMED:
2135       GNUNET_break (0);
2136       break;
2137     default:
2138       GNUNET_break (0);
2139       break;
2140     }
2141   /* trigger queue processing */
2142   process_encrypted_neighbour_queue (n);
2143   if (n->status != PEER_STATE_KEY_CONFIRMED)
2144     n->retry_set_key_task
2145       = GNUNET_SCHEDULER_add_delayed (sched,
2146                                       GNUNET_NO,
2147                                       GNUNET_SCHEDULER_PRIORITY_KEEP,
2148                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2149                                       n->set_key_retry_frequency,
2150                                       &set_key_retry_task, n);
2151 }
2152
2153
2154 /**
2155  * We received a SET_KEY message.  Validate and update
2156  * our key material and status.
2157  *
2158  * @param n the neighbour from which we received message m
2159  * @param m the set key message we received
2160  */
2161 static void
2162 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2163
2164
2165 /**
2166  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2167  * the neighbour's struct and retry handling the set_key message.  Or,
2168  * if we did not get a HELLO, just free the set key message.
2169  *
2170  * @param cls pointer to the set key message
2171  * @param peer the peer for which this is the HELLO
2172  * @param hello HELLO message of that peer
2173  * @param trust amount of trust we currently have in that peer
2174  */
2175 static void
2176 process_hello_retry_handle_set_key (void *cls,
2177                                     const struct GNUNET_PeerIdentity *peer,
2178                                     const struct GNUNET_HELLO_Message *hello,
2179                                     uint32_t trust)
2180 {
2181   struct SetKeyMessage *sm = cls;
2182   struct Neighbour *n;
2183
2184   if (peer == NULL)
2185     {
2186       GNUNET_free (sm);
2187       return;
2188     }
2189   n = find_neighbour (peer);
2190   if (n == NULL)
2191     {
2192       GNUNET_break (0);
2193       return;
2194     }
2195   if (n->public_key != NULL)
2196     return;                     /* multiple HELLOs match!? */
2197   n->public_key =
2198     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2199   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2200     {
2201       GNUNET_break_op (0);
2202       GNUNET_free (n->public_key);
2203       n->public_key = NULL;
2204       return;
2205     }
2206 #if DEBUG_CORE
2207   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2208               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2209               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2210 #endif
2211   handle_set_key (n, sm);
2212 }
2213
2214
2215 /**
2216  * We received a PING message.  Validate and transmit
2217  * PONG.
2218  *
2219  * @param n sender of the PING
2220  * @param m the encrypted PING message itself
2221  */
2222 static void
2223 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2224 {
2225   struct PingMessage t;
2226   struct PingMessage *tp;
2227   struct MessageEntry *me;
2228
2229 #if DEBUG_CORE
2230   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2231               "Core service receives `%s' request from `%4s'.\n",
2232               "PING", GNUNET_i2s (&n->peer));
2233 #endif
2234   if (GNUNET_OK !=
2235       do_decrypt (n,
2236                   &my_identity.hashPubKey,
2237                   &m->challenge,
2238                   &t.challenge,
2239                   sizeof (struct PingMessage) -
2240                   sizeof (struct GNUNET_MessageHeader)))
2241     return;
2242 #if DEBUG_CORE
2243   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2244               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2245               "PING",
2246               GNUNET_i2s (&t.target),
2247               ntohl (t.challenge), n->decrypt_key.crc32);
2248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2249               "Target of `%s' request is `%4s'.\n",
2250               "PING", GNUNET_i2s (&t.target));
2251 #endif
2252   if (0 != memcmp (&t.target,
2253                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2254     {
2255       GNUNET_break_op (0);
2256       return;
2257     }
2258   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2259                       sizeof (struct PingMessage));
2260   if (n->encrypted_tail != NULL)
2261     n->encrypted_tail->next = me;
2262   else
2263     {
2264       n->encrypted_tail = me;
2265       n->encrypted_head = me;
2266     }
2267   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2268   me->priority = PONG_PRIORITY;
2269   me->size = sizeof (struct PingMessage);
2270   tp = (struct PingMessage *) &me[1];
2271   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2272   tp->header.size = htons (sizeof (struct PingMessage));
2273   do_encrypt (n,
2274               &my_identity.hashPubKey,
2275               &t.challenge,
2276               &tp->challenge,
2277               sizeof (struct PingMessage) -
2278               sizeof (struct GNUNET_MessageHeader));
2279 #if DEBUG_CORE
2280   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2281               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2282               ntohl (t.challenge), n->encrypt_key.crc32);
2283 #endif
2284   /* trigger queue processing */
2285   process_encrypted_neighbour_queue (n);
2286 }
2287
2288
2289 /**
2290  * We received a SET_KEY message.  Validate and update
2291  * our key material and status.
2292  *
2293  * @param n the neighbour from which we received message m
2294  * @param m the set key message we received
2295  */
2296 static void
2297 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2298 {
2299   struct SetKeyMessage *m_cpy;
2300   struct GNUNET_TIME_Absolute t;
2301   struct GNUNET_CRYPTO_AesSessionKey k;
2302   struct PingMessage *ping;
2303   enum PeerStateMachine sender_status;
2304
2305 #if DEBUG_CORE
2306   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2307               "Core service receives `%s' request from `%4s'.\n",
2308               "SET_KEY", GNUNET_i2s (&n->peer));
2309 #endif
2310   if (n->public_key == NULL)
2311     {
2312 #if DEBUG_CORE
2313       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2314                   "Lacking public key for peer, trying to obtain one.\n");
2315 #endif
2316       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2317       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2318       /* lookup n's public key, then try again */
2319       GNUNET_PEERINFO_for_all (cfg,
2320                                sched,
2321                                &n->peer,
2322                                0,
2323                                GNUNET_TIME_UNIT_MINUTES,
2324                                &process_hello_retry_handle_set_key, m_cpy);
2325       return;
2326     }
2327   if ((ntohl (m->purpose.size) !=
2328        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2329        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2330        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2331        sizeof (struct GNUNET_PeerIdentity)) ||
2332       (GNUNET_OK !=
2333        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2334                                  &m->purpose, &m->signature, n->public_key)))
2335     {
2336       /* invalid signature */
2337       GNUNET_break_op (0);
2338       return;
2339     }
2340   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2341   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2342        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2343       (t.value < n->decrypt_key_created.value))
2344     {
2345       /* this could rarely happen due to massive re-ordering of
2346          messages on the network level, but is most likely either
2347          a bug or some adversary messing with us.  Report. */
2348       GNUNET_break_op (0);
2349       return;
2350     }
2351 #if DEBUG_CORE
2352   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2353 #endif
2354   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2355                                   &m->encrypted_key,
2356                                   &k,
2357                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2358        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2359       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2360     {
2361       /* failed to decrypt !? */
2362       GNUNET_break_op (0);
2363       return;
2364     }
2365
2366   n->decrypt_key = k;
2367   if (n->decrypt_key_created.value != t.value)
2368     {
2369       /* fresh key, reset sequence numbers */
2370       n->last_sequence_number_received = 0;
2371       n->last_packets_bitmap = 0;
2372       n->decrypt_key_created = t;
2373     }
2374   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2375   switch (n->status)
2376     {
2377     case PEER_STATE_DOWN:
2378       n->status = PEER_STATE_KEY_RECEIVED;
2379 #if DEBUG_CORE
2380       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2381                   "Responding to `%s' with my own key.\n", "SET_KEY");
2382 #endif
2383       send_key (n);
2384       break;
2385     case PEER_STATE_KEY_SENT:
2386     case PEER_STATE_KEY_RECEIVED:
2387       n->status = PEER_STATE_KEY_RECEIVED;
2388       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2389           (sender_status != PEER_STATE_KEY_CONFIRMED))
2390         {
2391 #if DEBUG_CORE
2392           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2393                       "Responding to `%s' with my own key (other peer has status %u).\n",
2394                       "SET_KEY", sender_status);
2395 #endif
2396           send_key (n);
2397         }
2398       break;
2399     case PEER_STATE_KEY_CONFIRMED:
2400       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2401           (sender_status != PEER_STATE_KEY_CONFIRMED))
2402         {
2403 #if DEBUG_CORE
2404           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2405                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2406                       "SET_KEY", sender_status);
2407 #endif
2408           send_key (n);
2409         }
2410       break;
2411     default:
2412       GNUNET_break (0);
2413       break;
2414     }
2415   if (n->pending_ping != NULL)
2416     {
2417       ping = n->pending_ping;
2418       n->pending_ping = NULL;
2419       handle_ping (n, ping);
2420       GNUNET_free (ping);
2421     }
2422 }
2423
2424
2425 /**
2426  * We received a PONG message.  Validate and update
2427  * our status.
2428  *
2429  * @param n sender of the PONG
2430  * @param m the encrypted PONG message itself
2431  */
2432 static void
2433 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2434 {
2435   struct PingMessage t;
2436
2437 #if DEBUG_CORE
2438   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2439               "Core service receives `%s' request from `%4s'.\n",
2440               "PONG", GNUNET_i2s (&n->peer));
2441 #endif
2442   if (GNUNET_OK !=
2443       do_decrypt (n,
2444                   &n->peer.hashPubKey,
2445                   &m->challenge,
2446                   &t.challenge,
2447                   sizeof (struct PingMessage) -
2448                   sizeof (struct GNUNET_MessageHeader)))
2449     return;
2450 #if DEBUG_CORE
2451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2452               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2453               "PONG",
2454               GNUNET_i2s (&t.target),
2455               ntohl (t.challenge), n->decrypt_key.crc32);
2456 #endif
2457   if ((0 != memcmp (&t.target,
2458                     &n->peer,
2459                     sizeof (struct GNUNET_PeerIdentity))) ||
2460       (n->ping_challenge != ntohl (t.challenge)))
2461     {
2462       /* PONG malformed */
2463 #if DEBUG_CORE
2464       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2465                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2466                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2467       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2468                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2469                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2470 #endif
2471       GNUNET_break_op (0);
2472       return;
2473     }
2474   switch (n->status)
2475     {
2476     case PEER_STATE_DOWN:
2477       GNUNET_break (0);         /* should be impossible */
2478       return;
2479     case PEER_STATE_KEY_SENT:
2480       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2481       return;
2482     case PEER_STATE_KEY_RECEIVED:
2483       n->status = PEER_STATE_KEY_CONFIRMED;
2484       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
2485         {
2486           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2487           n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2488         }
2489       process_encrypted_neighbour_queue (n);
2490       break;
2491     case PEER_STATE_KEY_CONFIRMED:
2492       /* duplicate PONG? */
2493       break;
2494     default:
2495       GNUNET_break (0);
2496       break;
2497     }
2498 }
2499
2500
2501 /**
2502  * Send a P2P message to a client.
2503  *
2504  * @param sender who sent us the message?
2505  * @param client who should we give the message to?
2506  * @param m contains the message to transmit
2507  * @param msize number of bytes in buf to transmit
2508  */
2509 static void
2510 send_p2p_message_to_client (struct Neighbour *sender,
2511                             struct Client *client,
2512                             const void *m, size_t msize)
2513 {
2514   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2515   struct NotifyTrafficMessage *ntm;
2516
2517 #if DEBUG_CORE
2518   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2519               "Core service passes message from `%4s' of type %u to client.\n",
2520               GNUNET_i2s(&sender->peer),
2521               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2522 #endif
2523   ntm = (struct NotifyTrafficMessage *) buf;
2524   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2525   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2526   ntm->reserved = htonl (0);
2527   ntm->peer = sender->peer;
2528   memcpy (&ntm[1], m, msize);
2529   send_to_client (client, &ntm->header, GNUNET_YES);
2530 }
2531
2532
2533 /**
2534  * Deliver P2P message to interested clients.
2535  *
2536  * @param sender who sent us the message?
2537  * @param m the message
2538  * @param msize size of the message (including header)
2539  */
2540 static void
2541 deliver_message (struct Neighbour *sender,
2542                  const struct GNUNET_MessageHeader *m, size_t msize)
2543 {
2544   struct Client *cpos;
2545   uint16_t type;
2546   unsigned int tpos;
2547   int deliver_full;
2548
2549   type = ntohs (m->type);
2550   cpos = clients;
2551   while (cpos != NULL)
2552     {
2553       deliver_full = GNUNET_NO;
2554       if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
2555         deliver_full = GNUNET_YES;
2556       else
2557         {
2558           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2559             {
2560               if (type != cpos->types[tpos])
2561                 continue;
2562               deliver_full = GNUNET_YES;
2563               break;
2564             }
2565         }
2566       if (GNUNET_YES == deliver_full)
2567         send_p2p_message_to_client (sender, cpos, m, msize);
2568       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2569         send_p2p_message_to_client (sender, cpos, m,
2570                                     sizeof (struct GNUNET_MessageHeader));
2571       cpos = cpos->next;
2572     }
2573 }
2574
2575
2576 /**
2577  * Align P2P message and then deliver to interested clients.
2578  *
2579  * @param sender who sent us the message?
2580  * @param buffer unaligned (!) buffer containing message
2581  * @param msize size of the message (including header)
2582  */
2583 static void
2584 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2585 {
2586   char abuf[msize];
2587
2588   /* TODO: call to statistics? */
2589   memcpy (abuf, buffer, msize);
2590   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2591 }
2592
2593
2594 /**
2595  * Deliver P2P messages to interested clients.
2596  *
2597  * @param sender who sent us the message?
2598  * @param buffer buffer containing messages, can be modified
2599  * @param buffer_size size of the buffer (overall)
2600  * @param offset offset where messages in the buffer start
2601  */
2602 static void
2603 deliver_messages (struct Neighbour *sender,
2604                   const char *buffer, size_t buffer_size, size_t offset)
2605 {
2606   struct GNUNET_MessageHeader *mhp;
2607   struct GNUNET_MessageHeader mh;
2608   uint16_t msize;
2609   int need_align;
2610
2611   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2612     {
2613       if (0 != offset % sizeof (uint16_t))
2614         {
2615           /* outch, need to copy to access header */
2616           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2617           mhp = &mh;
2618         }
2619       else
2620         {
2621           /* can access header directly */
2622           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2623         }
2624       msize = ntohs (mhp->size);
2625       if (msize + offset > buffer_size)
2626         {
2627           /* malformed message, header says it is larger than what
2628              would fit into the overall buffer */
2629           GNUNET_break_op (0);
2630           break;
2631         }
2632 #if HAVE_UNALIGNED_64_ACCESS
2633       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2634 #else
2635       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2636 #endif
2637       if (GNUNET_YES == need_align)
2638         align_and_deliver (sender, &buffer[offset], msize);
2639       else
2640         deliver_message (sender,
2641                          (const struct GNUNET_MessageHeader *)
2642                          &buffer[offset], msize);
2643       offset += msize;
2644     }
2645 }
2646
2647
2648 /**
2649  * We received an encrypted message.  Decrypt, validate and
2650  * pass on to the appropriate clients.
2651  */
2652 static void
2653 handle_encrypted_message (struct Neighbour *n,
2654                           const struct EncryptedMessage *m)
2655 {
2656   size_t size = ntohs (m->header.size);
2657   char buf[size];
2658   struct EncryptedMessage *pt;  /* plaintext */
2659   GNUNET_HashCode ph;
2660   size_t off;
2661   uint32_t snum;
2662   struct GNUNET_TIME_Absolute t;
2663
2664 #if DEBUG_CORE
2665   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2666               "Core service receives `%s' request from `%4s'.\n",
2667               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2668 #endif
2669   /* decrypt */
2670   if (GNUNET_OK !=
2671       do_decrypt (n,
2672                   &m->plaintext_hash,
2673                   &m->sequence_number,
2674                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2675     return;
2676   pt = (struct EncryptedMessage *) buf;
2677
2678   /* validate hash */
2679   GNUNET_CRYPTO_hash (&pt->sequence_number,
2680                       size - ENCRYPTED_HEADER_SIZE, &ph);
2681   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2682     {
2683       /* checksum failed */
2684       GNUNET_break_op (0);
2685       return;
2686     }
2687
2688   /* validate sequence number */
2689   snum = ntohl (pt->sequence_number);
2690   if (n->last_sequence_number_received == snum)
2691     {
2692       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2693                   "Received duplicate message, ignoring.\n");
2694       /* duplicate, ignore */
2695       return;
2696     }
2697   if ((n->last_sequence_number_received > snum) &&
2698       (n->last_sequence_number_received - snum > 32))
2699     {
2700       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2701                   "Received ancient out of sequence message, ignoring.\n");
2702       /* ancient out of sequence, ignore */
2703       return;
2704     }
2705   if (n->last_sequence_number_received > snum)
2706     {
2707       unsigned int rotbit =
2708         1 << (n->last_sequence_number_received - snum - 1);
2709       if ((n->last_packets_bitmap & rotbit) != 0)
2710         {
2711           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2712                       "Received duplicate message, ignoring.\n");
2713           /* duplicate, ignore */
2714           return;
2715         }
2716       n->last_packets_bitmap |= rotbit;
2717     }
2718   if (n->last_sequence_number_received < snum)
2719     {
2720       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2721       n->last_sequence_number_received = snum;
2722     }
2723
2724   /* check timestamp */
2725   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2726   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2727     {
2728       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2729                   _
2730                   ("Message received far too old (%llu ms). Content ignored.\n"),
2731                   GNUNET_TIME_absolute_get_duration (t).value);
2732       return;
2733     }
2734
2735   /* process decrypted message(s) */
2736   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2737   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2738                            n->bpm_out_internal_limit);
2739   n->last_activity = GNUNET_TIME_absolute_get ();
2740   off = sizeof (struct EncryptedMessage);
2741   deliver_messages (n, buf, size, off);
2742 }
2743
2744
2745 /**
2746  * Function called by the transport for each received message.
2747  *
2748  * @param cls closure
2749  * @param latency estimated latency for communicating with the
2750  *             given peer
2751  * @param peer (claimed) identity of the other peer
2752  * @param message the message
2753  */
2754 static void
2755 handle_transport_receive (void *cls,
2756                           struct GNUNET_TIME_Relative latency,
2757                           const struct GNUNET_PeerIdentity *peer,
2758                           const struct GNUNET_MessageHeader *message)
2759 {
2760   struct Neighbour *n;
2761   struct GNUNET_TIME_Absolute now;
2762   int up;
2763   uint16_t type;
2764   uint16_t size;
2765
2766 #if DEBUG_CORE
2767   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2768               "Received message of type %u from `%4s', demultiplexing.\n",
2769               ntohs (message->type), GNUNET_i2s (peer));
2770 #endif
2771   n = find_neighbour (peer);
2772   if (n == NULL)
2773     {
2774       GNUNET_break (0);
2775       return;
2776     }
2777   n->last_latency = latency;
2778   up = n->status == PEER_STATE_KEY_CONFIRMED;
2779   type = ntohs (message->type);
2780   size = ntohs (message->size);
2781   switch (type)
2782     {
2783     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2784       if (size != sizeof (struct SetKeyMessage))
2785         {
2786           GNUNET_break_op (0);
2787           return;
2788         }
2789       handle_set_key (n, (const struct SetKeyMessage *) message);
2790       break;
2791     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2792       if (size < sizeof (struct EncryptedMessage) +
2793           sizeof (struct GNUNET_MessageHeader))
2794         {
2795           GNUNET_break_op (0);
2796           return;
2797         }
2798       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2799           (n->status != PEER_STATE_KEY_CONFIRMED))
2800         {
2801           GNUNET_break_op (0);
2802           return;
2803         }
2804       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2805       break;
2806     case GNUNET_MESSAGE_TYPE_CORE_PING:
2807       if (size != sizeof (struct PingMessage))
2808         {
2809           GNUNET_break_op (0);
2810           return;
2811         }
2812       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2813           (n->status != PEER_STATE_KEY_CONFIRMED))
2814         {
2815 #if DEBUG_CORE
2816           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2817                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2818                       "PING", GNUNET_i2s (&n->peer));
2819 #endif
2820           GNUNET_free_non_null (n->pending_ping);
2821           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2822           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2823           return;
2824         }
2825       handle_ping (n, (const struct PingMessage *) message);
2826       break;
2827     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2828       if (size != sizeof (struct PingMessage))
2829         {
2830           GNUNET_break_op (0);
2831           return;
2832         }
2833       if ((n->status != PEER_STATE_KEY_SENT) &&
2834           (n->status != PEER_STATE_KEY_RECEIVED) &&
2835           (n->status != PEER_STATE_KEY_CONFIRMED))
2836         {
2837           /* could not decrypt pong, oops! */
2838           GNUNET_break_op (0);
2839           return;
2840         }
2841       handle_pong (n, (const struct PingMessage *) message);
2842       break;
2843     default:
2844       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2845                   _("Unsupported message of type %u received.\n"), type);
2846       return;
2847     }
2848   if (n->status == PEER_STATE_KEY_CONFIRMED)
2849     {
2850       now = GNUNET_TIME_absolute_get ();
2851       n->last_activity = now;
2852       if (!up)
2853         n->time_established = now;
2854     }
2855 }
2856
2857
2858 /**
2859  * Function that recalculates the bandwidth quota for the
2860  * given neighbour and transmits it to the transport service.
2861  * 
2862  * @param cls neighbour for the quota update
2863  * @param tc context
2864  */
2865 static void
2866 neighbour_quota_update (void *cls,
2867                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2868
2869
2870 /**
2871  * Schedule the task that will recalculate the bandwidth
2872  * quota for this peer (and possibly force a disconnect of
2873  * idle peers by calculating a bandwidth of zero).
2874  */
2875 static void
2876 schedule_quota_update (struct Neighbour *n)
2877 {
2878   GNUNET_assert (n->quota_update_task ==
2879                  GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
2880   n->quota_update_task
2881     = GNUNET_SCHEDULER_add_delayed (sched,
2882                                     GNUNET_NO,
2883                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
2884                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2885                                     QUOTA_UPDATE_FREQUENCY,
2886                                     &neighbour_quota_update,
2887                                     n);
2888 }
2889
2890
2891 /**
2892  * Function that recalculates the bandwidth quota for the
2893  * given neighbour and transmits it to the transport service.
2894  * 
2895  * @param cls neighbour for the quota update
2896  * @param tc context
2897  */
2898 static void
2899 neighbour_quota_update (void *cls,
2900                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2901 {
2902   struct Neighbour *n = cls;
2903   uint32_t q_in;
2904   double pref_rel;
2905   double share;
2906   unsigned long long distributable;
2907   
2908   n->quota_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2909   /* calculate relative preference among all neighbours;
2910      divides by a bit more to avoid division by zero AND to
2911      account for possibility of new neighbours joining any time 
2912      AND to convert to double... */
2913   pref_rel = n->current_preference / (1.0 + preference_sum);
2914   share = 0;
2915   distributable = 0;
2916   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2917     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2918   share = distributable * pref_rel;
2919   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2920   /* check if we want to disconnect for good due to inactivity */
2921   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2922        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2923     q_in = 0; /* force disconnect */
2924   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2925        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2926     {
2927       n->bpm_in = q_in;
2928       GNUNET_TRANSPORT_set_quota (transport,
2929                                   &n->peer,
2930                                   n->bpm_in, 
2931                                   n->bpm_out,
2932                                   GNUNET_TIME_UNIT_FOREVER_REL,
2933                                   NULL, NULL);
2934     }
2935   schedule_quota_update (n);
2936 }
2937
2938
2939 /**
2940  * Function called by transport to notify us that
2941  * a peer connected to us (on the network level).
2942  *
2943  * @param cls closure
2944  * @param peer the peer that connected
2945  * @param latency current latency of the connection
2946  */
2947 static void
2948 handle_transport_notify_connect (void *cls,
2949                                  const struct GNUNET_PeerIdentity *peer,
2950                                  struct GNUNET_TIME_Relative latency)
2951 {
2952   struct Neighbour *n;
2953   struct GNUNET_TIME_Absolute now;
2954   struct ConnectNotifyMessage cnm;
2955
2956   n = find_neighbour (peer);
2957   if (n != NULL)
2958     {
2959       /* duplicate connect notification!? */
2960       GNUNET_break (0);
2961       return;
2962     }
2963   now = GNUNET_TIME_absolute_get ();
2964   n = GNUNET_malloc (sizeof (struct Neighbour));
2965   n->next = neighbours;
2966   neighbours = n;
2967   neighbour_count++;
2968   n->peer = *peer;
2969   n->last_latency = latency;
2970   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2971   n->encrypt_key_created = now;
2972   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2973   n->last_asw_update = now;
2974   n->last_arw_update = now;
2975   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2976   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2977   n->bpm_out_internal_limit = (uint32_t) - 1;
2978   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2979   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2980                                                 (uint32_t) - 1);
2981 #if DEBUG_CORE
2982   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2983               "Received connection from `%4s'.\n",
2984               GNUNET_i2s (&n->peer));
2985 #endif
2986   schedule_quota_update (n);
2987   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2988   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2989   cnm.bpm_available = htonl (n->bpm_out);
2990   cnm.peer = *peer;
2991   cnm.last_activity = GNUNET_TIME_absolute_hton (now);
2992   send_to_all_clients (&cnm.header, GNUNET_YES);
2993 }
2994
2995
2996 /**
2997  * Free the given entry for the neighbour (it has
2998  * already been removed from the list at this point).
2999  *
3000  * @param n neighbour to free
3001  */
3002 static void
3003 free_neighbour (struct Neighbour *n)
3004 {
3005   struct MessageEntry *m;
3006
3007   while (NULL != (m = n->messages))
3008     {
3009       n->messages = m->next;
3010       GNUNET_free (m);
3011     }
3012   while (NULL != (m = n->encrypted_head))
3013     {
3014       n->encrypted_head = m->next;
3015       GNUNET_free (m);
3016     }
3017   if (NULL != n->th)
3018     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3019   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3020     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3021   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3022     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3023   if (n->quota_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3024     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3025   GNUNET_free_non_null (n->public_key);
3026   GNUNET_free_non_null (n->pending_ping);
3027   GNUNET_free (n);
3028 }
3029
3030
3031 /**
3032  * Function called by transport telling us that a peer
3033  * disconnected.
3034  *
3035  * @param cls closure
3036  * @param peer the peer that disconnected
3037  */
3038 static void
3039 handle_transport_notify_disconnect (void *cls,
3040                                     const struct GNUNET_PeerIdentity *peer)
3041 {
3042   struct ConnectNotifyMessage cnm;
3043   struct Neighbour *n;
3044   struct Neighbour *p;
3045
3046 #if DEBUG_CORE
3047   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3048               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3049 #endif
3050   p = NULL;
3051   n = neighbours;
3052   while ((n != NULL) &&
3053          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3054     {
3055       p = n;
3056       n = n->next;
3057     }
3058   if (n == NULL)
3059     {
3060       GNUNET_break (0);
3061       return;
3062     }
3063   if (p == NULL)
3064     neighbours = n->next;
3065   else
3066     p->next = n->next;
3067   GNUNET_assert (neighbour_count > 0);
3068   neighbour_count--;
3069   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3070   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3071   cnm.bpm_available = htonl (0);
3072   cnm.peer = *peer;
3073   cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
3074   send_to_all_clients (&cnm.header, GNUNET_YES);
3075   free_neighbour (n);
3076 }
3077
3078
3079 /**
3080  * Last task run during shutdown.  Disconnects us from
3081  * the transport.
3082  */
3083 static void
3084 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3085 {
3086   struct Neighbour *n;
3087   struct Client *c;
3088
3089 #if DEBUG_CORE
3090   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3091               "Core service shutting down.\n");
3092 #endif
3093   GNUNET_assert (transport != NULL);
3094   GNUNET_TRANSPORT_disconnect (transport);
3095   transport = NULL;
3096   while (NULL != (n = neighbours))
3097     {
3098       neighbours = n->next;
3099       GNUNET_assert (neighbour_count > 0);
3100       neighbour_count--;
3101       free_neighbour (n);
3102     }
3103   while (NULL != (c = clients))
3104     handle_client_disconnect (NULL, c->client_handle);
3105 }
3106
3107
3108 /**
3109  * Initiate core service.
3110  *
3111  * @param cls closure
3112  * @param s scheduler to use
3113  * @param serv the initialized server
3114  * @param c configuration to use
3115  */
3116 static void
3117 run (void *cls,
3118      struct GNUNET_SCHEDULER_Handle *s,
3119      struct GNUNET_SERVER_Handle *serv, struct GNUNET_CONFIGURATION_Handle *c)
3120 {
3121 #if 0
3122   unsigned long long qin;
3123   unsigned long long qout;
3124   unsigned long long tneigh;
3125 #endif
3126   char *keyfile;
3127
3128   sched = s;
3129   cfg = c;
3130   /* parse configuration */
3131   if (
3132        (GNUNET_OK !=
3133         GNUNET_CONFIGURATION_get_value_number (c,
3134                                                "CORE",
3135                                                "TOTAL_QUOTA_IN",
3136                                                &bandwidth_target_in)) ||
3137        (GNUNET_OK !=
3138         GNUNET_CONFIGURATION_get_value_number (c,
3139                                                "CORE",
3140                                                "TOTAL_QUOTA_OUT",
3141                                                &bandwidth_target_out)) ||
3142 #if 0
3143        (GNUNET_OK !=
3144         GNUNET_CONFIGURATION_get_value_number (c,
3145                                                "CORE",
3146                                                "YY",
3147                                                &qout)) ||
3148        (GNUNET_OK !=
3149         GNUNET_CONFIGURATION_get_value_number (c,
3150                                                "CORE",
3151                                                "ZZ_LIMIT", &tneigh)) ||
3152 #endif
3153        (GNUNET_OK !=
3154         GNUNET_CONFIGURATION_get_value_filename (c,
3155                                                  "GNUNETD",
3156                                                  "HOSTKEY", &keyfile)))
3157     {
3158       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3159                   _
3160                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3161       GNUNET_SCHEDULER_shutdown (s);
3162       return;
3163     }
3164   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3165   GNUNET_free (keyfile);
3166   if (my_private_key == NULL)
3167     {
3168       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3169                   _("Core service could not access hostkey.  Exiting.\n"));
3170       GNUNET_SCHEDULER_shutdown (s);
3171       return;
3172     }
3173   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3174   GNUNET_CRYPTO_hash (&my_public_key,
3175                       sizeof (my_public_key), &my_identity.hashPubKey);
3176   /* setup notification */
3177   server = serv;
3178   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3179   /* setup transport connection */
3180   transport = GNUNET_TRANSPORT_connect (sched,
3181                                         cfg,
3182                                         NULL,
3183                                         &handle_transport_receive,
3184                                         &handle_transport_notify_connect,
3185                                         &handle_transport_notify_disconnect);
3186   GNUNET_assert (NULL != transport);
3187   GNUNET_SCHEDULER_add_delayed (sched,
3188                                 GNUNET_YES,
3189                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3190                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
3191                                 GNUNET_TIME_UNIT_FOREVER_REL,
3192                                 &cleaning_task, NULL);
3193   /* process client requests */
3194   GNUNET_SERVER_add_handlers (server, handlers);
3195   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3196               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3197 }
3198
3199
3200 /**
3201  * Function called during shutdown.  Clean up our state.
3202  */
3203 static void
3204 cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
3205 {
3206   if (my_private_key != NULL)
3207     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3208 }
3209
3210
3211 /**
3212  * The main function for the transport service.
3213  *
3214  * @param argc number of arguments from the command line
3215  * @param argv command line arguments
3216  * @return 0 ok, 1 on error
3217  */
3218 int
3219 main (int argc, char *const *argv)
3220 {
3221   return (GNUNET_OK ==
3222           GNUNET_SERVICE_run (argc,
3223                               argv,
3224                               "core", &run, NULL, &cleanup, NULL)) ? 0 : 1;
3225 }
3226
3227 /* end of gnunet-service-core.c */