API improvements
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * POST-TESTING:
27  * - topology management:
28  *   + bootstrapping (transport offer hello, plugins)
29  *   + internal neighbour selection
30  *   + update (and use!) bandwidth usage statistics
31  *
32  * Considerations for later:
33  * - check that hostkey used by transport (for HELLOs) is the
34  *   same as the hostkey that we are using!
35  * - add code to send PINGs if we are about to time-out otherwise
36  * - optimize lookup (many O(n) list traversals
37  *   could ideally be changed to O(1) hash map lookups)
38  */
39 #include "platform.h"
40 #include "gnunet_constants.h"
41 #include "gnunet_util_lib.h"
42 #include "gnunet_hello_lib.h"
43 #include "gnunet_peerinfo_service.h"
44 #include "gnunet_protocols.h"
45 #include "gnunet_signatures.h"
46 #include "gnunet_transport_service.h"
47 #include "core.h"
48
49
50 /**
51  * Receive and send buffer windows grow over time.  For
52  * how long can 'unused' bandwidth accumulate before we
53  * need to cap it?  (specified in ms).
54  */
55 #define MAX_WINDOW_TIME (5 * 60 * 1000)
56
57 /**
58  * Minimum of bytes per minute (out) to assign to any connected peer.
59  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
60  * sense.
61  */
62 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
63
64 /**
65  * What is the smallest change (in number of bytes per minute)
66  * that we consider significant enough to bother triggering?
67  */
68 #define MIN_BPM_CHANGE 32
69
70 /**
71  * After how much time past the "official" expiration time do
72  * we discard messages?  Should not be zero since we may 
73  * intentionally defer transmission until close to the deadline
74  * and then may be slightly past the deadline due to inaccuracy
75  * in sleep and our own CPU consumption.
76  */
77 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
78
79 /**
80  * What is the maximum delay for a SET_KEY message?
81  */
82 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
83
84 /**
85  * What how long do we wait for SET_KEY confirmation initially?
86  */
87 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
88
89 /**
90  * What is the maximum delay for a PING message?
91  */
92 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
93
94 /**
95  * What is the maximum delay for a PONG message?
96  */
97 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
98
99 /**
100  * How often do we recalculate bandwidth quotas?
101  */
102 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
103
104 /**
105  * What is the priority for a SET_KEY message?
106  */
107 #define SET_KEY_PRIORITY 0xFFFFFF
108
109 /**
110  * What is the priority for a PING message?
111  */
112 #define PING_PRIORITY 0xFFFFFF
113
114 /**
115  * What is the priority for a PONG message?
116  */
117 #define PONG_PRIORITY 0xFFFFFF
118
119 /**
120  * How many messages do we queue per peer at most?
121  */
122 #define MAX_PEER_QUEUE_SIZE 16
123
124 /**
125  * How many non-mandatory messages do we queue per client at most?
126  */
127 #define MAX_CLIENT_QUEUE_SIZE 32
128
129 /**
130  * What is the maximum age of a message for us to consider
131  * processing it?  Note that this looks at the timestamp used
132  * by the other peer, so clock skew between machines does
133  * come into play here.  So this should be picked high enough
134  * so that a little bit of clock skew does not prevent peers
135  * from connecting to us.
136  */
137 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
138
139 /**
140  * What is the maximum size for encrypted messages?  Note that this
141  * number imposes a clear limit on the maximum size of any message.
142  * Set to a value close to 64k but not so close that transports will
143  * have trouble with their headers.
144  */
145 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
146
147
148 /**
149  * State machine for our P2P encryption handshake.  Everyone starts in
150  * "DOWN", if we receive the other peer's key (other peer initiated)
151  * we start in state RECEIVED (since we will immediately send our
152  * own); otherwise we start in SENT.  If we get back a PONG from
153  * within either state, we move up to CONFIRMED (the PONG will always
154  * be sent back encrypted with the key we sent to the other peer).
155  */
156 enum PeerStateMachine
157 {
158   PEER_STATE_DOWN,
159   PEER_STATE_KEY_SENT,
160   PEER_STATE_KEY_RECEIVED,
161   PEER_STATE_KEY_CONFIRMED
162 };
163
164
165 /**
166  * Number of bytes (at the beginning) of "struct EncryptedMessage"
167  * that are NOT encrypted.
168  */
169 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
170
171
172 /**
173  * Encapsulation for encrypted messages exchanged between
174  * peers.  Followed by the actual encrypted data.
175  */
176 struct EncryptedMessage
177 {
178   /**
179    * Message type is either CORE_ENCRYPTED_MESSAGE.
180    */
181   struct GNUNET_MessageHeader header;
182
183   /**
184    * Always zero.
185    */
186   uint32_t reserved GNUNET_PACKED;
187
188   /**
189    * Hash of the plaintext, used to verify message integrity;
190    * ALSO used as the IV for the symmetric cipher!  Everything
191    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
192    * must be set to the offset of the next field.
193    */
194   GNUNET_HashCode plaintext_hash;
195
196   /**
197    * Sequence number, in network byte order.  This field
198    * must be the first encrypted/decrypted field and the
199    * first byte that is hashed for the plaintext hash.
200    */
201   uint32_t sequence_number GNUNET_PACKED;
202
203   /**
204    * Desired bandwidth (how much we should send to this
205    * peer / how much is the sender willing to receive),
206    * in bytes per minute.
207    */
208   uint32_t inbound_bpm_limit GNUNET_PACKED;
209
210   /**
211    * Timestamp.  Used to prevent reply of ancient messages
212    * (recent messages are caught with the sequence number).
213    */
214   struct GNUNET_TIME_AbsoluteNBO timestamp;
215
216 };
217
218 /**
219  * We're sending an (encrypted) PING to the other peer to check if he
220  * can decrypt.  The other peer should respond with a PONG with the
221  * same content, except this time encrypted with the receiver's key.
222  */
223 struct PingMessage
224 {
225   /**
226    * Message type is either CORE_PING or CORE_PONG.
227    */
228   struct GNUNET_MessageHeader header;
229
230   /**
231    * Random number chosen to make reply harder.
232    */
233   uint32_t challenge GNUNET_PACKED;
234
235   /**
236    * Intended target of the PING, used primarily to check
237    * that decryption actually worked.
238    */
239   struct GNUNET_PeerIdentity target;
240 };
241
242
243 /**
244  * Message transmitted to set (or update) a session key.
245  */
246 struct SetKeyMessage
247 {
248
249   /**
250    * Message type is either CORE_SET_KEY.
251    */
252   struct GNUNET_MessageHeader header;
253
254   /**
255    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
256    */
257   int32_t sender_status GNUNET_PACKED;
258
259   /**
260    * Purpose of the signature, will be
261    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
262    */
263   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
264
265   /**
266    * At what time was this key created?
267    */
268   struct GNUNET_TIME_AbsoluteNBO creation_time;
269
270   /**
271    * The encrypted session key.
272    */
273   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
274
275   /**
276    * Who is the intended recipient?
277    */
278   struct GNUNET_PeerIdentity target;
279
280   /**
281    * Signature of the stuff above (starting at purpose).
282    */
283   struct GNUNET_CRYPTO_RsaSignature signature;
284
285 };
286
287
288 /**
289  * Message waiting for transmission. This struct
290  * is followed by the actual content of the message.
291  */
292 struct MessageEntry
293 {
294
295   /**
296    * We keep messages in a linked list (for now).
297    */
298   struct MessageEntry *next;
299
300   /**
301    * By when are we supposed to transmit this message?
302    */
303   struct GNUNET_TIME_Absolute deadline;
304
305   /**
306    * How important is this message to us?
307    */
308   unsigned int priority;
309
310   /**
311    * How long is the message? (number of bytes following
312    * the "struct MessageEntry", but not including the
313    * size of "struct MessageEntry" itself!)
314    */
315   uint16_t size;
316
317   /**
318    * Was this message selected for transmission in the
319    * current round? GNUNET_YES or GNUNET_NO.
320    */
321   int16_t do_transmit;
322
323 };
324
325
326 struct Neighbour
327 {
328   /**
329    * We keep neighbours in a linked list (for now).
330    */
331   struct Neighbour *next;
332
333   /**
334    * Unencrypted messages destined for this peer.
335    */
336   struct MessageEntry *messages;
337
338   /**
339    * Head of the batched, encrypted message queue (already ordered,
340    * transmit starting with the head).
341    */
342   struct MessageEntry *encrypted_head;
343
344   /**
345    * Tail of the batched, encrypted message queue (already ordered,
346    * append new messages to tail)
347    */
348   struct MessageEntry *encrypted_tail;
349
350   /**
351    * Handle for pending requests for transmission to this peer
352    * with the transport service.  NULL if no request is pending.
353    */
354   struct GNUNET_TRANSPORT_TransmitHandle *th;
355
356   /**
357    * Public key of the neighbour, NULL if we don't have it yet.
358    */
359   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
360
361   /**
362    * We received a PING message before we got the "public_key"
363    * (or the SET_KEY).  We keep it here until we have a key
364    * to decrypt it.  NULL if no PING is pending.
365    */
366   struct PingMessage *pending_ping;
367
368   /**
369    * Identity of the neighbour.
370    */
371   struct GNUNET_PeerIdentity peer;
372
373   /**
374    * Key we use to encrypt our messages for the other peer
375    * (initialized by us when we do the handshake).
376    */
377   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
378
379   /**
380    * Key we use to decrypt messages from the other peer
381    * (given to us by the other peer during the handshake).
382    */
383   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
384
385   /**
386    * ID of task used for re-trying plaintext scheduling.
387    */
388   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
389
390   /**
391    * ID of task used for re-trying SET_KEY and PING message.
392    */
393   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
394
395   /**
396    * ID of task used for updating bandwidth quota for this neighbour.
397    */
398   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
399
400   /**
401    * At what time did we generate our encryption key?
402    */
403   struct GNUNET_TIME_Absolute encrypt_key_created;
404
405   /**
406    * At what time did the other peer generate the decryption key?
407    */
408   struct GNUNET_TIME_Absolute decrypt_key_created;
409
410   /**
411    * At what time did we initially establish (as in, complete session
412    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
413    */
414   struct GNUNET_TIME_Absolute time_established;
415
416   /**
417    * At what time did we last receive an encrypted message from the
418    * other peer?  Should be zero if status != KEY_CONFIRMED.
419    */
420   struct GNUNET_TIME_Absolute last_activity;
421
422   /**
423    * Last latency observed from this peer.
424    */
425   struct GNUNET_TIME_Relative last_latency;
426
427   /**
428    * At what frequency are we currently re-trying SET_KEY messages?
429    */
430   struct GNUNET_TIME_Relative set_key_retry_frequency;
431
432   /**
433    * Time of our last update to the "available_send_window".
434    */
435   struct GNUNET_TIME_Absolute last_asw_update;
436
437   /**
438    * Time of our last update to the "available_recv_window".
439    */
440   struct GNUNET_TIME_Absolute last_arw_update;
441
442   /**
443    * Number of bytes that we are eligible to transmit to this
444    * peer at this point.  Incremented every minute by max_out_bpm,
445    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
446    * bandwidth-hogs are sampled at a frequency of about 78s!);
447    * may get negative if we have VERY high priority content.
448    */
449   long long available_send_window; // USE!
450
451   /**
452    * How much downstream capacity of this peer has been reserved for
453    * our traffic?  (Our clients can request that a certain amount of
454    * bandwidth is available for replies to them; this value is used to
455    * make sure that this reserved amount of bandwidth is actually
456    * available).
457    */
458   long long available_recv_window; // USE!
459
460   /**
461    * How valueable were the messages of this peer recently?
462    */
463   unsigned long long current_preference;
464
465   /**
466    * Bit map indicating which of the 32 sequence numbers before the last
467    * were received (good for accepting out-of-order packets and
468    * estimating reliability of the connection)
469    */
470   unsigned int last_packets_bitmap;
471
472   /**
473    * Number of messages in the message queue for this peer.
474    */
475   unsigned int message_queue_size;
476
477   /**
478    * last sequence number received on this connection (highest)
479    */
480   uint32_t last_sequence_number_received;
481
482   /**
483    * last sequence number transmitted
484    */
485   uint32_t last_sequence_number_sent;
486
487   /**
488    * Available bandwidth in for this peer (current target).
489    */
490   uint32_t bpm_in;
491
492   /**
493    * Available bandwidth out for this peer (current target).
494    */
495   uint32_t bpm_out;
496
497   /**
498    * Internal bandwidth limit set for this peer (initially
499    * typically set to "-1").  "bpm_out" is MAX of
500    * "bpm_out_internal_limit" and "bpm_out_external_limit".
501    */
502   uint32_t bpm_out_internal_limit;
503
504   /**
505    * External bandwidth limit set for this peer by the
506    * peer that we are communicating with.  "bpm_out" is MAX of
507    * "bpm_out_internal_limit" and "bpm_out_external_limit".
508    */
509   uint32_t bpm_out_external_limit;
510
511   /**
512    * What was our PING challenge number (for this peer)?
513    */
514   uint32_t ping_challenge;
515
516   /**
517    * What is our connection status?
518    */
519   enum PeerStateMachine status;
520
521 };
522
523
524 /**
525  * Events are messages for clients.  The struct
526  * itself is followed by the actual message.
527  */
528 struct Event
529 {
530   /**
531    * This is a linked list.
532    */
533   struct Event *next;
534
535   /**
536    * Size of the message.
537    */
538   size_t size;
539
540   /**
541    * Could this event be dropped if this queue
542    * is getting too large? (NOT YET USED!)
543    */
544   int can_drop;
545
546 };
547
548
549 /**
550  * Data structure for each client connected to the core service.
551  */
552 struct Client
553 {
554   /**
555    * Clients are kept in a linked list.
556    */
557   struct Client *next;
558
559   /**
560    * Handle for the client with the server API.
561    */
562   struct GNUNET_SERVER_Client *client_handle;
563
564   /**
565    * Linked list of messages we still need to deliver to
566    * this client.
567    */
568   struct Event *event_head;
569
570   /**
571    * Tail of the linked list of events.
572    */
573   struct Event *event_tail;
574
575   /**
576    * Current transmit handle, NULL if no transmission request
577    * is pending.
578    */
579   struct GNUNET_NETWORK_TransmitHandle *th;
580
581   /**
582    * Array of the types of messages this peer cares
583    * about (with "tcnt" entries).  Allocated as part
584    * of this client struct, do not free!
585    */
586   uint16_t *types;
587
588   /**
589    * Options for messages this client cares about,
590    * see GNUNET_CORE_OPTION_ values.
591    */
592   uint32_t options;
593
594   /**
595    * Number of types of incoming messages this client
596    * specifically cares about.  Size of the "types" array.
597    */
598   unsigned int tcnt;
599
600 };
601
602
603 /**
604  * Our public key.
605  */
606 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
607
608 /**
609  * Our identity.
610  */
611 static struct GNUNET_PeerIdentity my_identity;
612
613 /**
614  * Our private key.
615  */
616 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
617
618 /**
619  * Our scheduler.
620  */
621 struct GNUNET_SCHEDULER_Handle *sched;
622
623 /**
624  * Our configuration.
625  */
626 struct GNUNET_CONFIGURATION_Handle *cfg;
627
628 /**
629  * Our server.
630  */
631 static struct GNUNET_SERVER_Handle *server;
632
633 /**
634  * Transport service.
635  */
636 static struct GNUNET_TRANSPORT_Handle *transport;
637
638 /**
639  * Linked list of our clients.
640  */
641 static struct Client *clients;
642
643 /**
644  * We keep neighbours in a linked list (for now).
645  */
646 static struct Neighbour *neighbours;
647
648 /**
649  * Sum of all preferences among all neighbours.
650  */
651 static unsigned long long preference_sum;
652
653 /**
654  * Total number of neighbours we have.
655  */
656 static unsigned int neighbour_count;
657
658 /**
659  * How much inbound bandwidth are we supposed to be using?
660  */
661 static unsigned long long bandwidth_target_in;
662
663 /**
664  * How much outbound bandwidth are we supposed to be using?
665  */
666 static unsigned long long bandwidth_target_out;
667
668
669
670 /**
671  * A preference value for a neighbour was update.  Update
672  * the preference sum accordingly.
673  *
674  * @param inc how much was a preference value increased?
675  */
676 static void
677 update_preference_sum (unsigned long long inc)
678 {
679   struct Neighbour *n;
680   unsigned long long os;
681
682   os = preference_sum;
683   preference_sum += inc;
684   if (preference_sum >= os)
685     return; /* done! */
686   /* overflow! compensate by cutting all values in half! */
687   preference_sum = 0;
688   n = neighbours;
689   while (n != NULL)
690     {
691       n->current_preference /= 2;
692       preference_sum += n->current_preference;
693       n = n->next;
694     }    
695 }
696
697
698 /**
699  * Recalculate the number of bytes we expect to
700  * receive or transmit in a given window.
701  *
702  * @param window pointer to the byte counter (updated)
703  * @param ts pointer to the timestamp (updated)
704  * @param bpm number of bytes per minute that should
705  *        be added to the window.
706  */
707 static void
708 update_window (long long *window,
709                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
710 {
711   struct GNUNET_TIME_Relative since;
712
713   since = GNUNET_TIME_absolute_get_duration (*ts);
714   if (since.value < 60 * 1000)
715     return;                     /* not even a minute has passed */
716   *ts = GNUNET_TIME_absolute_get ();
717   *window += (bpm * since.value) / 60 / 1000;
718   if (*window > MAX_WINDOW_TIME * bpm)
719     *window = MAX_WINDOW_TIME * bpm;
720 }
721
722
723 /**
724  * Find the entry for the given neighbour.
725  *
726  * @param peer identity of the neighbour
727  * @return NULL if we are not connected, otherwise the
728  *         neighbour's entry.
729  */
730 static struct Neighbour *
731 find_neighbour (const struct GNUNET_PeerIdentity *peer)
732 {
733   struct Neighbour *ret;
734
735   ret = neighbours;
736   while ((ret != NULL) &&
737          (0 != memcmp (&ret->peer,
738                        peer, sizeof (struct GNUNET_PeerIdentity))))
739     ret = ret->next;
740   return ret;
741 }
742
743
744 /**
745  * Find the entry for the given client.
746  *
747  * @param client handle for the client
748  * @return NULL if we are not connected, otherwise the
749  *         client's struct.
750  */
751 static struct Client *
752 find_client (const struct GNUNET_SERVER_Client *client)
753 {
754   struct Client *ret;
755
756   ret = clients;
757   while ((ret != NULL) && (client != ret->client_handle))
758     ret = ret->next;
759   return ret;
760 }
761
762
763 /**
764  * If necessary, initiate a request with the server to
765  * transmit messages from the queue of the given client.
766  * @param client who to transfer messages to
767  */
768 static void request_transmit (struct Client *client);
769
770
771 /**
772  * Client is ready to receive data, provide it.
773  *
774  * @param cls closure
775  * @param size number of bytes available in buf
776  * @param buf where the callee should write the message
777  * @return number of bytes written to buf
778  */
779 static size_t
780 do_client_transmit (void *cls, size_t size, void *buf)
781 {
782   struct Client *client = cls;
783   struct Event *e;
784   char *tgt;
785   size_t ret;
786
787   client->th = NULL;
788 #if DEBUG_CORE_CLIENT
789   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
790               "Client ready to receive %u bytes.\n", size);
791 #endif
792   if (buf == NULL)
793     {
794 #if DEBUG_CORE
795       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
796                   "Failed to transmit data to client (disconnect)?\n");
797 #endif
798       return 0;                 /* we'll surely get a disconnect soon... */
799     }
800   tgt = buf;
801   ret = 0;
802   while ((NULL != (e = client->event_head)) && (e->size <= size))
803     {
804       memcpy (&tgt[ret], &e[1], e->size);
805       size -= e->size;
806       ret += e->size;
807       client->event_head = e->next;
808       GNUNET_free (e);
809     }
810   GNUNET_assert (ret > 0);
811   if (client->event_head == NULL)
812     client->event_tail = NULL;
813 #if DEBUG_CORE_CLIENT
814   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815               "Transmitting %u bytes to client\n", ret);
816 #endif
817   request_transmit (client);
818   return ret;
819 }
820
821
822 /**
823  * If necessary, initiate a request with the server to
824  * transmit messages from the queue of the given client.
825  * @param client who to transfer messages to
826  */
827 static void
828 request_transmit (struct Client *client)
829 {
830
831   if (NULL != client->th)
832     return;                     /* already pending */
833   if (NULL == client->event_head)
834     return;                     /* no more events pending */
835 #if DEBUG_CORE_CLIENT
836   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
837               "Asking server to transmit %u bytes to client\n",
838               client->event_head->size);
839 #endif
840   client->th
841     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
842                                            client->event_head->size,
843                                            GNUNET_TIME_UNIT_FOREVER_REL,
844                                            &do_client_transmit, client);
845 }
846
847
848 /**
849  * Send a message to one of our clients.
850  * @param client target for the message
851  * @param msg message to transmit
852  * @param can_drop could this message be dropped if the
853  *        client's queue is getting too large?
854  */
855 static void
856 send_to_client (struct Client *client,
857                 const struct GNUNET_MessageHeader *msg, int can_drop)
858 {
859   struct Event *e;
860   unsigned int queue_size;
861   uint16_t msize;
862
863 #if DEBUG_CORE_CLIENT
864   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
865               "Preparing to send message of type %u to client.\n",
866               ntohs (msg->type));
867 #endif
868   queue_size = 0;
869   e = client->event_head;
870   while (e != NULL)
871     {
872       queue_size++;
873       e = e->next;
874     }
875   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
876        (can_drop == GNUNET_YES) )
877     {
878 #if DEBUG_CORE
879       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
880                   "Too many messages in queue for the client, dropping the new message.\n");
881 #endif
882       return;
883     }
884
885   msize = ntohs (msg->size);
886   e = GNUNET_malloc (sizeof (struct Event) + msize);
887   /* append */
888   if (client->event_tail != NULL)
889     client->event_tail->next = e;
890   else
891     client->event_head = e;
892   client->event_tail = e;
893   e->can_drop = can_drop;
894   e->size = msize;
895   memcpy (&e[1], msg, msize);
896   request_transmit (client);
897 }
898
899
900 /**
901  * Send a message to all of our current clients.
902  */
903 static void
904 send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
905 {
906   struct Client *c;
907
908   c = clients;
909   while (c != NULL)
910     {
911       send_to_client (c, msg, can_drop);
912       c = c->next;
913     }
914 }
915
916
917 /**
918  * Handle CORE_INIT request.
919  */
920 static void
921 handle_client_init (void *cls,
922                     struct GNUNET_SERVER_Client *client,
923                     const struct GNUNET_MessageHeader *message)
924 {
925   const struct InitMessage *im;
926   struct InitReplyMessage irm;
927   struct Client *c;
928   uint16_t msize;
929   const uint16_t *types;
930   struct Neighbour *n;
931   struct ConnectNotifyMessage cnm;
932
933 #if DEBUG_CORE_CLIENT
934   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935               "Client connecting to core service with `%s' message\n",
936               "INIT");
937 #endif
938   /* check that we don't have an entry already */
939   c = clients;
940   while (c != NULL)
941     {
942       if (client == c->client_handle)
943         {
944           GNUNET_break (0);
945           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
946           return;
947         }
948       c = c->next;
949     }
950   msize = ntohs (message->size);
951   if (msize < sizeof (struct InitMessage))
952     {
953       GNUNET_break (0);
954       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
955       return;
956     }
957   im = (const struct InitMessage *) message;
958   types = (const uint16_t *) &im[1];
959   msize -= sizeof (struct InitMessage);
960   c = GNUNET_malloc (sizeof (struct Client) + msize);
961   c->client_handle = client;
962   c->next = clients;
963   clients = c;
964   memcpy (&c[1], types, msize);
965   c->types = (uint16_t *) & c[1];
966   c->options = ntohl (im->options);
967   c->tcnt = msize / sizeof (uint16_t);
968   /* send init reply message */
969   irm.header.size = htons (sizeof (struct InitReplyMessage));
970   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
971   irm.reserved = htonl (0);
972   memcpy (&irm.publicKey,
973           &my_public_key,
974           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
975 #if DEBUG_CORE_CLIENT
976   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
977               "Sending `%s' message to client.\n", "INIT_REPLY");
978 #endif
979   send_to_client (c, &irm.header, GNUNET_NO);
980   /* notify new client about existing neighbours */
981   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
982   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
983   n = neighbours;
984   while (n != NULL)
985     {
986 #if DEBUG_CORE_CLIENT
987       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
988                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
989 #endif
990       cnm.bpm_available = htonl (n->bpm_out);
991       cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
992       cnm.peer = n->peer;
993       send_to_client (c, &cnm.header, GNUNET_NO);
994       n = n->next;
995     }
996   GNUNET_SERVER_receive_done (client, GNUNET_OK);
997 }
998
999
1000 /**
1001  * A client disconnected, clean up.
1002  *
1003  * @param cls closure
1004  * @param client identification of the client
1005  */
1006 static void
1007 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1008 {
1009   struct Client *pos;
1010   struct Client *prev;
1011   struct Event *e;
1012
1013 #if DEBUG_CORE_CLIENT
1014   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1015               "Client has disconnected from core service.\n");
1016 #endif
1017   prev = NULL;
1018   pos = clients;
1019   while (pos != NULL)
1020     {
1021       if (client == pos->client_handle)
1022         {
1023           if (prev == NULL)
1024             clients = pos->next;
1025           else
1026             prev->next = pos->next;
1027           if (pos->th != NULL)
1028             GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
1029           while (NULL != (e = pos->event_head))
1030             {
1031               pos->event_head = e->next;
1032               GNUNET_free (e);
1033             }
1034           GNUNET_free (pos);
1035           return;
1036         }
1037       prev = pos;
1038       pos = pos->next;
1039     }
1040   /* client never sent INIT */
1041 }
1042
1043
1044 /**
1045  * Handle REQUEST_CONFIGURE request.
1046  */
1047 static void
1048 handle_client_request_configure (void *cls,
1049                                  struct GNUNET_SERVER_Client *client,
1050                                  const struct GNUNET_MessageHeader *message)
1051 {
1052   const struct RequestConfigureMessage *rcm;
1053   struct Neighbour *n;
1054   struct ConfigurationInfoMessage cim;
1055   struct Client *c;
1056   int reserv;
1057   unsigned long long old_preference;
1058
1059 #if DEBUG_CORE_CLIENT
1060   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061               "Core service receives `%s' request.\n", "CONFIGURE");
1062 #endif
1063   rcm = (const struct RequestConfigureMessage *) message;
1064   n = find_neighbour (&rcm->peer);
1065   memset (&cim, 0, sizeof (cim));
1066   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1067     {
1068       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1069       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1070                                n->bpm_out_external_limit);
1071       reserv = ntohl (rcm->reserve_inbound);
1072       if (reserv < 0)
1073         {
1074           n->available_recv_window += reserv;
1075         }
1076       else if (reserv > 0)
1077         {
1078           update_window (&n->available_recv_window,
1079                          &n->last_arw_update, n->bpm_in);
1080           if (n->available_recv_window < reserv)
1081             reserv = n->available_recv_window;
1082           n->available_recv_window -= reserv;
1083         }
1084       old_preference = n->current_preference;
1085       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1086       if (old_preference > n->current_preference) 
1087         {
1088           /* overflow; cap at maximum value */
1089           n->current_preference = (unsigned long long) -1;
1090         }
1091       update_preference_sum (n->current_preference - old_preference);
1092       cim.reserved_amount = htonl (reserv);
1093       cim.bpm_in = htonl (n->bpm_in);
1094       cim.bpm_out = htonl (n->bpm_out);
1095       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1096       cim.preference = n->current_preference;
1097     }
1098   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1099   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1100   cim.peer = rcm->peer;
1101   c = find_client (client);
1102   if (c == NULL)
1103     {
1104       GNUNET_break (0);
1105       return;
1106     }
1107 #if DEBUG_CORE_CLIENT
1108   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1109               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1110 #endif
1111   send_to_client (c, &cim.header, GNUNET_NO);
1112 }
1113
1114
1115 /**
1116  * Check if we have encrypted messages for the specified neighbour
1117  * pending, and if so, check with the transport about sending them
1118  * out.
1119  *
1120  * @param n neighbour to check.
1121  */
1122 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1123
1124
1125 /**
1126  * Function called when the transport service is ready to
1127  * receive an encrypted message for the respective peer
1128  *
1129  * @param cls neighbour to use message from
1130  * @param size number of bytes we can transmit
1131  * @param buf where to copy the message
1132  * @return number of bytes transmitted
1133  */
1134 static size_t
1135 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1136 {
1137   struct Neighbour *n = cls;
1138   struct MessageEntry *m;
1139   size_t ret;
1140   char *cbuf;
1141
1142   n->th = NULL;
1143   GNUNET_assert (NULL != (m = n->encrypted_head));
1144   n->encrypted_head = m->next;
1145   if (m->next == NULL)
1146     n->encrypted_tail = NULL;
1147   ret = 0;
1148   cbuf = buf;
1149   if (buf != NULL)
1150     {
1151       GNUNET_assert (size >= m->size);
1152       memcpy (cbuf, &m[1], m->size);
1153       ret = m->size;
1154       process_encrypted_neighbour_queue (n);
1155 #if DEBUG_CORE
1156       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1158                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1159                   ret, GNUNET_i2s (&n->peer));
1160 #endif
1161     }
1162   else
1163     {
1164       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1165                   "Transmission for message of type %u and size %u failed\n",
1166                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1167                   m->size);
1168     }
1169   GNUNET_free (m);
1170   return ret;
1171 }
1172
1173
1174 /**
1175  * Check if we have plaintext messages for the specified neighbour
1176  * pending, and if so, consider batching and encrypting them (and
1177  * then trigger processing of the encrypted queue if needed).
1178  *
1179  * @param n neighbour to check.
1180  */
1181 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1182
1183
1184 /**
1185  * Check if we have encrypted messages for the specified neighbour
1186  * pending, and if so, check with the transport about sending them
1187  * out.
1188  *
1189  * @param n neighbour to check.
1190  */
1191 static void
1192 process_encrypted_neighbour_queue (struct Neighbour *n)
1193 {
1194   struct MessageEntry *m;
1195  
1196   if (n->th != NULL)
1197     return;  /* request already pending */
1198   if (n->encrypted_head == NULL)
1199     {
1200       /* encrypted queue empty, try plaintext instead */
1201       process_plaintext_neighbour_queue (n);
1202       return;
1203     }
1204 #if DEBUG_CORE
1205   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1206               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1207               n->encrypted_head->size,
1208               GNUNET_i2s (&n->peer),
1209               GNUNET_TIME_absolute_get_remaining (n->
1210                                                   encrypted_head->deadline).
1211               value);
1212 #endif
1213   n->th =
1214     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1215                                             n->encrypted_head->size,
1216                                             n->encrypted_head->priority,
1217                                             GNUNET_TIME_absolute_get_remaining
1218                                             (n->encrypted_head->deadline),
1219                                             &notify_encrypted_transmit_ready,
1220                                             n);
1221   if (n->th == NULL)
1222     {
1223       /* message request too large (oops) */
1224       GNUNET_break (0);
1225       /* discard encrypted message */
1226       GNUNET_assert (NULL != (m = n->encrypted_head));
1227       n->encrypted_head = m->next;
1228       if (m->next == NULL)
1229         n->encrypted_tail = NULL;
1230       GNUNET_free (m);
1231       process_encrypted_neighbour_queue (n);
1232     }
1233 }
1234
1235
1236 /**
1237  * Decrypt size bytes from in and write the result to out.  Use the
1238  * key for inbound traffic of the given neighbour.  This function does
1239  * NOT do any integrity-checks on the result.
1240  *
1241  * @param n neighbour we are receiving from
1242  * @param iv initialization vector to use
1243  * @param in ciphertext
1244  * @param out plaintext
1245  * @param size size of in/out
1246  * @return GNUNET_OK on success
1247  */
1248 static int
1249 do_decrypt (struct Neighbour *n,
1250             const GNUNET_HashCode * iv,
1251             const void *in, void *out, size_t size)
1252 {
1253   if (size != (uint16_t) size)
1254     {
1255       GNUNET_break (0);
1256       return GNUNET_NO;
1257     }
1258   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1259       (n->status != PEER_STATE_KEY_CONFIRMED))
1260     {
1261       GNUNET_break_op (0);
1262       return GNUNET_SYSERR;
1263     }
1264   if (size !=
1265       GNUNET_CRYPTO_aes_decrypt (&n->decrypt_key,
1266                                  in,
1267                                  (uint16_t) size,
1268                                  (const struct
1269                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1270                                  out))
1271     {
1272       GNUNET_break (0);
1273       return GNUNET_SYSERR;
1274     }
1275 #if DEBUG_CORE
1276   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1277               "Decrypted %u bytes from `%4s' using key %u\n",
1278               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1279 #endif
1280   return GNUNET_OK;
1281 }
1282
1283
1284 /**
1285  * Encrypt size bytes from in and write the result to out.  Use the
1286  * key for outbound traffic of the given neighbour.
1287  *
1288  * @param n neighbour we are sending to
1289  * @param iv initialization vector to use
1290  * @param in ciphertext
1291  * @param out plaintext
1292  * @param size size of in/out
1293  * @return GNUNET_OK on success
1294  */
1295 static int
1296 do_encrypt (struct Neighbour *n,
1297             const GNUNET_HashCode * iv,
1298             const void *in, void *out, size_t size)
1299 {
1300   if (size != (uint16_t) size)
1301     {
1302       GNUNET_break (0);
1303       return GNUNET_NO;
1304     }
1305   GNUNET_assert (size ==
1306                  GNUNET_CRYPTO_aes_encrypt (in,
1307                                             (uint16_t) size,
1308                                             &n->encrypt_key,
1309                                             (const struct
1310                                              GNUNET_CRYPTO_AesInitializationVector
1311                                              *) iv, out));
1312 #if DEBUG_CORE
1313   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1314               "Encrypted %u bytes for `%4s' using key %u\n", size,
1315               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1316 #endif
1317   return GNUNET_OK;
1318 }
1319
1320
1321 /**
1322  * Select messages for transmission.  This heuristic uses a combination
1323  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1324  * and priority-based discard (in case no feasible schedule exist) and
1325  * speculative optimization (defer any kind of transmission until
1326  * we either create a batch of significant size, 25% of max, or until
1327  * we are close to a deadline).  Furthermore, when scheduling the
1328  * heuristic also packs as many messages into the batch as possible,
1329  * starting with those with the earliest deadline.  Yes, this is fun.
1330  *
1331  * @param n neighbour to select messages from
1332  * @param size number of bytes to select for transmission
1333  * @param retry_time set to the time when we should try again
1334  *        (only valid if this function returns zero)
1335  * @return number of bytes selected, or 0 if we decided to
1336  *         defer scheduling overall; in that case, retry_time is set.
1337  */
1338 static size_t
1339 select_messages (struct Neighbour *n,
1340                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1341 {
1342   struct MessageEntry *pos;
1343   struct MessageEntry *min;
1344   struct MessageEntry *last;
1345   unsigned int min_prio;
1346   struct GNUNET_TIME_Absolute t;
1347   struct GNUNET_TIME_Absolute now;
1348   uint64_t delta;
1349   uint64_t avail;
1350   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1351   size_t off;
1352   int discard_low_prio;
1353
1354   GNUNET_assert (NULL != n->messages);
1355   now = GNUNET_TIME_absolute_get ();
1356   /* last entry in linked list of messages processed */
1357   last = NULL;
1358   /* should we remove the entry with the lowest
1359      priority from consideration for scheduling at the
1360      end of the loop? */
1361   discard_low_prio = GNUNET_YES;
1362   while (GNUNET_YES == discard_low_prio)
1363     {
1364       min = NULL;
1365       min_prio = -1;
1366       discard_low_prio = GNUNET_NO;
1367       /* number of bytes available for transmission at time "t" */
1368       avail = n->available_send_window;
1369       t = n->last_asw_update;
1370       /* how many bytes have we (hypothetically) scheduled so far */
1371       off = 0;
1372       /* maximum time we can wait before transmitting anything
1373          and still make all of our deadlines */
1374       slack = -1;
1375
1376       pos = n->messages;
1377       /* note that we use "*2" here because we want to look
1378          a bit further into the future; much more makes no
1379          sense since new message might be scheduled in the
1380          meantime... */
1381       while ((pos != NULL) && (off < size * 2))
1382         {
1383           if (pos->do_transmit == GNUNET_YES)
1384             {
1385               /* already removed from consideration */
1386               pos = pos->next;
1387               continue;
1388             }
1389           if (discard_low_prio == GNUNET_NO)
1390             {
1391               delta = pos->deadline.value;
1392               if (delta < t.value)
1393                 delta = 0;
1394               else
1395                 delta = t.value - delta;
1396               avail += delta * n->bpm_out / 1000 / 60;
1397               if (avail < pos->size)
1398                 {
1399                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1400                 }
1401               else
1402                 {
1403                   avail -= pos->size;
1404                   /* update slack, considering both its absolute deadline
1405                      and relative deadlines caused by other messages
1406                      with their respective load */
1407                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1408                   if (pos->deadline.value < now.value)
1409                     slack = 0;
1410                   else
1411                     slack =
1412                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1413                 }
1414             }
1415           off += pos->size;
1416           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1417           if (pos->priority <= min_prio)
1418             {
1419               /* update min for discard */
1420               min_prio = pos->priority;
1421               min = pos;
1422             }
1423           pos = pos->next;
1424         }
1425       if (discard_low_prio)
1426         {
1427           GNUNET_assert (min != NULL);
1428           /* remove lowest-priority entry from consideration */
1429           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1430         }
1431       last = pos;
1432     }
1433   /* guard against sending "tiny" messages with large headers without
1434      urgent deadlines */
1435   if ( (slack > 1000) && (size > 4 * off) )
1436     {
1437       /* less than 25% of message would be filled with
1438          deadlines still being met if we delay by one
1439          second or more; so just wait for more data */
1440       retry_time->value = slack / 2;
1441       /* reset do_transmit values for next time */
1442       while (pos != last)
1443         {
1444           pos->do_transmit = GNUNET_NO;
1445           pos = pos->next;
1446         }
1447       return 0;
1448     }
1449   /* select marked messages (up to size) for transmission */
1450   off = 0;
1451   pos = n->messages;
1452   while (pos != last)
1453     {
1454       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1455         {
1456           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1457           off += pos->size;
1458           size -= pos->size;
1459         }
1460       else
1461         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1462       pos = pos->next;
1463     }
1464 #if DEBUG_CORE
1465   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1466               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1467               off, GNUNET_i2s (&n->peer));
1468 #endif
1469   return off;
1470 }
1471
1472
1473 /**
1474  * Batch multiple messages into a larger buffer.
1475  *
1476  * @param n neighbour to take messages from
1477  * @param buf target buffer
1478  * @param size size of buf
1479  * @param deadline set to transmission deadline for the result
1480  * @param retry_time set to the time when we should try again
1481  *        (only valid if this function returns zero)
1482  * @param priority set to the priority of the batch
1483  * @return number of bytes written to buf (can be zero)
1484  */
1485 static size_t
1486 batch_message (struct Neighbour *n,
1487                char *buf,
1488                size_t size,
1489                struct GNUNET_TIME_Absolute *deadline,
1490                struct GNUNET_TIME_Relative *retry_time,
1491                unsigned int *priority)
1492 {
1493   struct MessageEntry *pos;
1494   struct MessageEntry *prev;
1495   struct MessageEntry *next;
1496   size_t ret;
1497
1498   ret = 0;
1499   *priority = 0;
1500   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1501   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1502   if (0 == select_messages (n, size, retry_time))
1503     {
1504       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1505                   "No messages selected, will try again in %llu ms\n",
1506                   retry_time->value);
1507       return 0;
1508     }
1509   pos = n->messages;
1510   prev = NULL;
1511   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1512     {
1513       next = pos->next;
1514       if (GNUNET_YES == pos->do_transmit)
1515         {
1516           GNUNET_assert (pos->size <= size);
1517           memcpy (&buf[ret], &pos[1], pos->size);
1518           ret += pos->size;
1519           size -= pos->size;
1520           *priority += pos->priority;
1521           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1522           GNUNET_free (pos);
1523           if (prev == NULL)
1524             n->messages = next;
1525           else
1526             prev->next = next;
1527         }
1528       else
1529         {
1530           prev = pos;
1531         }
1532       pos = next;
1533     }
1534   return ret;
1535 }
1536
1537
1538 /**
1539  * Remove messages with deadlines that have long expired from
1540  * the queue.
1541  *
1542  * @param n neighbour to inspect
1543  */
1544 static void
1545 discard_expired_messages (struct Neighbour *n)
1546 {
1547   struct MessageEntry *prev;
1548   struct MessageEntry *next;
1549   struct MessageEntry *pos;
1550   struct GNUNET_TIME_Absolute now;
1551   struct GNUNET_TIME_Relative delta;
1552
1553   now = GNUNET_TIME_absolute_get ();
1554   prev = NULL;
1555   pos = n->messages;
1556   while (pos != NULL) 
1557     {
1558       next = pos->next;
1559       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1560       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1561         {
1562 #if DEBUG_CORE
1563           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1564                       "Message is %llu ms past due, discarding.\n",
1565                       delta.value);
1566 #endif
1567           if (prev == NULL)
1568             n->messages = next;
1569           else
1570             prev->next = next;
1571           GNUNET_free (pos);
1572         }
1573       else
1574         prev = pos;
1575       pos = next;
1576     }
1577 }
1578
1579
1580 /**
1581  * Signature of the main function of a task.
1582  *
1583  * @param cls closure
1584  * @param tc context information (why was this task triggered now)
1585  */
1586 static void
1587 retry_plaintext_processing (void *cls,
1588                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1589 {
1590   struct Neighbour *n = cls;
1591
1592   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1593   process_plaintext_neighbour_queue (n);
1594 }
1595
1596
1597 /**
1598  * Send our key (and encrypted PING) to the other peer.
1599  *
1600  * @param n the other peer
1601  */
1602 static void send_key (struct Neighbour *n);
1603
1604
1605 /**
1606  * Check if we have plaintext messages for the specified neighbour
1607  * pending, and if so, consider batching and encrypting them (and
1608  * then trigger processing of the encrypted queue if needed).
1609  *
1610  * @param n neighbour to check.
1611  */
1612 static void
1613 process_plaintext_neighbour_queue (struct Neighbour *n)
1614 {
1615   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1616   size_t used;
1617   size_t esize;
1618   struct EncryptedMessage *em;  /* encrypted message */
1619   struct EncryptedMessage *ph;  /* plaintext header */
1620   struct MessageEntry *me;
1621   unsigned int priority;
1622   struct GNUNET_TIME_Absolute deadline;
1623   struct GNUNET_TIME_Relative retry_time;
1624
1625   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1626     {
1627       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1628       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1629     }
1630   switch (n->status)
1631     {
1632     case PEER_STATE_DOWN:
1633       send_key (n);
1634 #if DEBUG_CORE
1635       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1637                   GNUNET_i2s(&n->peer));
1638 #endif
1639       return;
1640     case PEER_STATE_KEY_SENT:
1641       GNUNET_assert (n->retry_set_key_task !=
1642                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1643 #if DEBUG_CORE
1644       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1645                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1646                   GNUNET_i2s(&n->peer));
1647 #endif
1648       return;
1649     case PEER_STATE_KEY_RECEIVED:
1650       GNUNET_assert (n->retry_set_key_task !=
1651                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1652 #if DEBUG_CORE
1653       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1654                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1655                   GNUNET_i2s(&n->peer));
1656 #endif
1657       return;
1658     case PEER_STATE_KEY_CONFIRMED:
1659       /* ready to continue */
1660       break;
1661     }
1662   discard_expired_messages (n);
1663   if (n->messages == NULL)
1664     {
1665 #if DEBUG_CORE
1666       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1667                   "Plaintext message queue for `%4s' is empty.\n",
1668                   GNUNET_i2s(&n->peer));
1669 #endif
1670       return;                   /* no pending messages */
1671     }
1672   if (n->encrypted_head != NULL)
1673     {
1674 #if DEBUG_CORE
1675       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1676                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1677                   GNUNET_i2s(&n->peer));
1678 #endif
1679       return;                   /* wait for messages already encrypted to be
1680                                    processed first! */
1681     }
1682   ph = (struct EncryptedMessage *) pbuf;
1683   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1684   priority = 0;
1685   used = sizeof (struct EncryptedMessage);
1686   used += batch_message (n,
1687                          &pbuf[used],
1688                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1689                          &deadline, &retry_time, &priority);
1690   if (used == sizeof (struct EncryptedMessage))
1691     {
1692 #if DEBUG_CORE
1693       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1694                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1695                   GNUNET_i2s(&n->peer));
1696 #endif
1697       /* no messages selected for sending, try again later... */
1698       n->retry_plaintext_task =
1699         GNUNET_SCHEDULER_add_delayed (sched,
1700                                       GNUNET_NO,
1701                                       GNUNET_SCHEDULER_PRIORITY_IDLE,
1702                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1703                                       retry_time,
1704                                       &retry_plaintext_processing, n);
1705       return;
1706     }
1707
1708   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1709   ph->inbound_bpm_limit = htonl (n->bpm_in);
1710   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1711
1712   /* setup encryption message header */
1713   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1714   me->deadline = deadline;
1715   me->priority = priority;
1716   me->size = used;
1717   em = (struct EncryptedMessage *) &me[1];
1718   em->header.size = htons (used);
1719   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1720   em->reserved = htonl (0);
1721   esize = used - ENCRYPTED_HEADER_SIZE;
1722   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1723   /* encrypt */
1724 #if DEBUG_CORE
1725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1726               "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
1727               esize,
1728               GNUNET_i2s(&n->peer));
1729 #endif
1730   GNUNET_assert (GNUNET_OK ==
1731                  do_encrypt (n,
1732                              &em->plaintext_hash,
1733                              &ph->sequence_number,
1734                              &em->sequence_number, esize));
1735   /* append to transmission list */
1736   if (n->encrypted_tail == NULL)
1737     n->encrypted_head = me;
1738   else
1739     n->encrypted_tail->next = me;
1740   n->encrypted_tail = me;
1741   process_encrypted_neighbour_queue (n);
1742 }
1743
1744
1745 /**
1746  * Handle CORE_SEND request.
1747  */
1748 static void
1749 handle_client_send (void *cls,
1750                     struct GNUNET_SERVER_Client *client,
1751                     const struct GNUNET_MessageHeader *message);
1752
1753
1754 /**
1755  * Function called to notify us that we either succeeded
1756  * or failed to connect (at the transport level) to another
1757  * peer.  We should either free the message we were asked
1758  * to transmit or re-try adding it to the queue.
1759  *
1760  * @param cls closure
1761  * @param size number of bytes available in buf
1762  * @param buf where the callee should write the message
1763  * @return number of bytes written to buf
1764  */
1765 static size_t
1766 send_connect_continuation (void *cls, size_t size, void *buf)
1767 {
1768   struct SendMessage *sm = cls;
1769
1770   if (buf == NULL)
1771     {
1772 #if DEBUG_CORE
1773       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1774                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1775                   GNUNET_i2s (&sm->peer));
1776 #endif
1777       GNUNET_free (sm);
1778       return 0;
1779     }
1780 #if DEBUG_CORE
1781   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1782               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1783               GNUNET_i2s (&sm->peer));
1784 #endif
1785   handle_client_send (NULL, NULL, &sm->header);
1786   GNUNET_free (sm);
1787   return 0;
1788 }
1789
1790
1791 /**
1792  * Handle CORE_SEND request.
1793  */
1794 static void
1795 handle_client_send (void *cls,
1796                     struct GNUNET_SERVER_Client *client,
1797                     const struct GNUNET_MessageHeader *message)
1798 {
1799   const struct SendMessage *sm;
1800   struct SendMessage *smc;
1801   const struct GNUNET_MessageHeader *mh;
1802   struct Neighbour *n;
1803   struct MessageEntry *prev;
1804   struct MessageEntry *pos;
1805   struct MessageEntry *e; 
1806   struct MessageEntry *min_prio_entry;
1807   struct MessageEntry *min_prio_prev;
1808   unsigned int min_prio;
1809   unsigned int queue_size;
1810   uint16_t msize;
1811
1812   msize = ntohs (message->size);
1813   if (msize <
1814       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1815     {
1816       GNUNET_break (0);
1817       if (client != NULL)
1818         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1819       return;
1820     }
1821   sm = (const struct SendMessage *) message;
1822   msize -= sizeof (struct SendMessage);
1823   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1824   if (msize != ntohs (mh->size))
1825     {
1826       GNUNET_break (0);
1827       if (client != NULL)
1828         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1829       return;
1830     }
1831   n = find_neighbour (&sm->peer);
1832   if (n == NULL)
1833     {
1834 #if DEBUG_CORE
1835       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1836                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1837                   "SEND",
1838                   GNUNET_i2s (&sm->peer),
1839                   GNUNET_TIME_absolute_get_remaining
1840                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1841 #endif
1842       msize += sizeof (struct SendMessage);
1843       /* ask transport to connect to the peer */
1844       smc = GNUNET_malloc (msize);
1845       memcpy (smc, sm, msize);
1846       if (NULL ==
1847           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1848                                                   &sm->peer,
1849                                                   0, 0,
1850                                                   GNUNET_TIME_absolute_get_remaining
1851                                                   (GNUNET_TIME_absolute_ntoh
1852                                                    (sm->deadline)),
1853                                                   &send_connect_continuation,
1854                                                   smc))
1855         {
1856           /* transport has already a request pending for this peer! */
1857 #if DEBUG_CORE
1858           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1859                       "Dropped second message destined for `%4s' since connection is still down.\n",
1860                       GNUNET_i2s(&sm->peer));
1861 #endif
1862           GNUNET_free (smc);
1863         }
1864       if (client != NULL)
1865         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1866       return;
1867     }
1868 #if DEBUG_CORE
1869   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1870               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1871               "SEND",
1872               msize, 
1873               GNUNET_i2s (&sm->peer));
1874 #endif
1875   /* bound queue size */
1876   discard_expired_messages (n);
1877   min_prio = (unsigned int) -1;
1878   queue_size = 0;
1879   prev = NULL;
1880   pos = n->messages;
1881   while (pos != NULL) 
1882     {
1883       if (pos->priority < min_prio)
1884         {
1885           min_prio_entry = pos;
1886           min_prio_prev = prev;
1887           min_prio = pos->priority;
1888         }
1889       queue_size++;
1890       prev = pos;
1891       pos = pos->next;
1892     }
1893   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1894     {
1895       /* queue full */
1896       if (ntohl(sm->priority) <= min_prio)
1897         {
1898           /* discard new entry */
1899 #if DEBUG_CORE
1900           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1901                       "Queue full, discarding new request\n");
1902 #endif
1903           if (client != NULL)
1904             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1905           return;
1906         }
1907       /* discard "min_prio_entry" */
1908 #if DEBUG_CORE
1909       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1910                   "Queue full, discarding existing older request\n");
1911 #endif
1912       if (min_prio_prev == NULL)
1913         n->messages = min_prio_entry->next;
1914       else
1915         min_prio_prev->next = min_prio_entry->next;      
1916       GNUNET_free (min_prio_entry);     
1917     }
1918   
1919   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1920   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1921   e->priority = ntohl (sm->priority);
1922   e->size = msize;
1923   memcpy (&e[1], mh, msize);
1924
1925   /* insert, keep list sorted by deadline */
1926   prev = NULL;
1927   pos = n->messages;
1928   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1929     {
1930       prev = pos;
1931       pos = pos->next;
1932     }
1933   if (prev == NULL)
1934     n->messages = e;
1935   else
1936     prev->next = e;
1937   e->next = pos;
1938
1939   /* consider scheduling now */
1940   process_plaintext_neighbour_queue (n);
1941   if (client != NULL)
1942     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1943 }
1944
1945
1946 /**
1947  * List of handlers for the messages understood by this
1948  * service.
1949  */
1950 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1951   {&handle_client_init, NULL,
1952    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1953   {&handle_client_request_configure, NULL,
1954    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
1955    sizeof (struct RequestConfigureMessage)},
1956   {&handle_client_send, NULL,
1957    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1958   {NULL, NULL, 0, 0}
1959 };
1960
1961
1962 /**
1963  * PEERINFO is giving us a HELLO for a peer.  Add the
1964  * public key to the neighbour's struct and retry
1965  * send_key.  Or, if we did not get a HELLO, just do
1966  * nothing.
1967  *
1968  * @param cls NULL
1969  * @param peer the peer for which this is the HELLO
1970  * @param hello HELLO message of that peer
1971  * @param trust amount of trust we currently have in that peer
1972  */
1973 static void
1974 process_hello_retry_send_key (void *cls,
1975                               const struct GNUNET_PeerIdentity *peer,
1976                               const struct GNUNET_HELLO_Message *hello,
1977                               uint32_t trust)
1978 {
1979   struct Neighbour *n;
1980
1981   if (peer == NULL)
1982     return;
1983   n = find_neighbour (peer);
1984   if (n == NULL)
1985     return;
1986   if (n->public_key != NULL)
1987     return;
1988 #if DEBUG_CORE
1989   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1990               "Received new `%s' message for `%4s', initiating key exchange.\n",
1991               "HELLO",
1992               GNUNET_i2s (peer));
1993 #endif
1994   n->public_key =
1995     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
1996   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
1997     {
1998       GNUNET_free (n->public_key);
1999       n->public_key = NULL;
2000       return;
2001     }
2002   send_key (n);
2003 }
2004
2005
2006 /**
2007  * Task that will retry "send_key" if our previous attempt failed
2008  * to yield a PONG.
2009  */
2010 static void
2011 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2012 {
2013   struct Neighbour *n = cls;
2014
2015   GNUNET_assert (n->status != PEER_STATE_KEY_CONFIRMED);
2016   n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2017   n->set_key_retry_frequency =
2018     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2019   send_key (n);
2020 }
2021
2022
2023 /**
2024  * Send our key (and encrypted PING) to the other peer.
2025  *
2026  * @param n the other peer
2027  */
2028 static void
2029 send_key (struct Neighbour *n)
2030 {
2031   struct SetKeyMessage *sm;
2032   struct MessageEntry *me;
2033   struct PingMessage pp;
2034   struct PingMessage *pm;
2035
2036 #if DEBUG_CORE
2037   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2038               "Asked to perform key exchange with `%4s'.\n",
2039               GNUNET_i2s (&n->peer));
2040 #endif
2041   if (n->public_key == NULL)
2042     {
2043       /* lookup n's public key, then try again */
2044 #if DEBUG_CORE
2045       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2046                   "Lacking public key for `%4s', trying to obtain one.\n",
2047                   GNUNET_i2s (&n->peer));
2048 #endif
2049       GNUNET_PEERINFO_for_all (cfg,
2050                                sched,
2051                                &n->peer,
2052                                0,
2053                                GNUNET_TIME_UNIT_MINUTES,
2054                                &process_hello_retry_send_key, NULL);
2055       return;
2056     }
2057   /* first, set key message */
2058   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2059                       sizeof (struct SetKeyMessage));
2060   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2061   me->priority = SET_KEY_PRIORITY;
2062   me->size = sizeof (struct SetKeyMessage);
2063   if (n->encrypted_head == NULL)
2064     n->encrypted_head = me;
2065   else
2066     n->encrypted_tail->next = me;
2067   n->encrypted_tail = me;
2068   sm = (struct SetKeyMessage *) &me[1];
2069   sm->header.size = htons (sizeof (struct SetKeyMessage));
2070   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2071   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2072                                         PEER_STATE_KEY_SENT : n->status));
2073   sm->purpose.size =
2074     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2075            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2076            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2077            sizeof (struct GNUNET_PeerIdentity));
2078   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2079   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2080   sm->target = n->peer;
2081   GNUNET_assert (GNUNET_OK ==
2082                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2083                                             sizeof (struct
2084                                                     GNUNET_CRYPTO_AesSessionKey),
2085                                             n->public_key,
2086                                             &sm->encrypted_key));
2087   GNUNET_assert (GNUNET_OK ==
2088                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2089                                          &sm->signature));
2090
2091   /* second, encrypted PING message */
2092   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2093                       sizeof (struct PingMessage));
2094   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2095   me->priority = PING_PRIORITY;
2096   me->size = sizeof (struct PingMessage);
2097   n->encrypted_tail->next = me;
2098   n->encrypted_tail = me;
2099   pm = (struct PingMessage *) &me[1];
2100   pm->header.size = htons (sizeof (struct PingMessage));
2101   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2102   pp.challenge = htonl (n->ping_challenge);
2103   pp.target = n->peer;
2104 #if DEBUG_CORE
2105   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2106               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2107               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2108   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2109               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2110               "PING",
2111               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2112 #endif
2113   do_encrypt (n,
2114               &n->peer.hashPubKey,
2115               &pp.challenge,
2116               &pm->challenge,
2117               sizeof (struct PingMessage) -
2118               sizeof (struct GNUNET_MessageHeader));
2119   /* update status */
2120   switch (n->status)
2121     {
2122     case PEER_STATE_DOWN:
2123       n->status = PEER_STATE_KEY_SENT;
2124       break;
2125     case PEER_STATE_KEY_SENT:
2126       break;
2127     case PEER_STATE_KEY_RECEIVED:
2128       break;
2129     case PEER_STATE_KEY_CONFIRMED:
2130       GNUNET_break (0);
2131       break;
2132     default:
2133       GNUNET_break (0);
2134       break;
2135     }
2136   /* trigger queue processing */
2137   process_encrypted_neighbour_queue (n);
2138   if (n->status != PEER_STATE_KEY_CONFIRMED)
2139     n->retry_set_key_task
2140       = GNUNET_SCHEDULER_add_delayed (sched,
2141                                       GNUNET_NO,
2142                                       GNUNET_SCHEDULER_PRIORITY_KEEP,
2143                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2144                                       n->set_key_retry_frequency,
2145                                       &set_key_retry_task, n);
2146 }
2147
2148
2149 /**
2150  * We received a SET_KEY message.  Validate and update
2151  * our key material and status.
2152  *
2153  * @param n the neighbour from which we received message m
2154  * @param m the set key message we received
2155  */
2156 static void
2157 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2158
2159
2160 /**
2161  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2162  * the neighbour's struct and retry handling the set_key message.  Or,
2163  * if we did not get a HELLO, just free the set key message.
2164  *
2165  * @param cls pointer to the set key message
2166  * @param peer the peer for which this is the HELLO
2167  * @param hello HELLO message of that peer
2168  * @param trust amount of trust we currently have in that peer
2169  */
2170 static void
2171 process_hello_retry_handle_set_key (void *cls,
2172                                     const struct GNUNET_PeerIdentity *peer,
2173                                     const struct GNUNET_HELLO_Message *hello,
2174                                     uint32_t trust)
2175 {
2176   struct SetKeyMessage *sm = cls;
2177   struct Neighbour *n;
2178
2179   if (peer == NULL)
2180     {
2181       GNUNET_free (sm);
2182       return;
2183     }
2184   n = find_neighbour (peer);
2185   if (n == NULL)
2186     {
2187       GNUNET_break (0);
2188       return;
2189     }
2190   if (n->public_key != NULL)
2191     return;                     /* multiple HELLOs match!? */
2192   n->public_key =
2193     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2194   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2195     {
2196       GNUNET_break_op (0);
2197       GNUNET_free (n->public_key);
2198       n->public_key = NULL;
2199       return;
2200     }
2201 #if DEBUG_CORE
2202   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2203               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2204               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2205 #endif
2206   handle_set_key (n, sm);
2207 }
2208
2209
2210 /**
2211  * We received a PING message.  Validate and transmit
2212  * PONG.
2213  *
2214  * @param n sender of the PING
2215  * @param m the encrypted PING message itself
2216  */
2217 static void
2218 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2219 {
2220   struct PingMessage t;
2221   struct PingMessage *tp;
2222   struct MessageEntry *me;
2223
2224 #if DEBUG_CORE
2225   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2226               "Core service receives `%s' request from `%4s'.\n",
2227               "PING", GNUNET_i2s (&n->peer));
2228 #endif
2229   if (GNUNET_OK !=
2230       do_decrypt (n,
2231                   &my_identity.hashPubKey,
2232                   &m->challenge,
2233                   &t.challenge,
2234                   sizeof (struct PingMessage) -
2235                   sizeof (struct GNUNET_MessageHeader)))
2236     return;
2237 #if DEBUG_CORE
2238   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2239               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2240               "PING",
2241               GNUNET_i2s (&t.target),
2242               ntohl (t.challenge), n->decrypt_key.crc32);
2243   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2244               "Target of `%s' request is `%4s'.\n",
2245               "PING", GNUNET_i2s (&t.target));
2246 #endif
2247   if (0 != memcmp (&t.target,
2248                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2249     {
2250       GNUNET_break_op (0);
2251       return;
2252     }
2253   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2254                       sizeof (struct PingMessage));
2255   if (n->encrypted_tail != NULL)
2256     n->encrypted_tail->next = me;
2257   else
2258     {
2259       n->encrypted_tail = me;
2260       n->encrypted_head = me;
2261     }
2262   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2263   me->priority = PONG_PRIORITY;
2264   me->size = sizeof (struct PingMessage);
2265   tp = (struct PingMessage *) &me[1];
2266   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2267   tp->header.size = htons (sizeof (struct PingMessage));
2268   do_encrypt (n,
2269               &my_identity.hashPubKey,
2270               &t.challenge,
2271               &tp->challenge,
2272               sizeof (struct PingMessage) -
2273               sizeof (struct GNUNET_MessageHeader));
2274 #if DEBUG_CORE
2275   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2276               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2277               ntohl (t.challenge), n->encrypt_key.crc32);
2278 #endif
2279   /* trigger queue processing */
2280   process_encrypted_neighbour_queue (n);
2281 }
2282
2283
2284 /**
2285  * We received a SET_KEY message.  Validate and update
2286  * our key material and status.
2287  *
2288  * @param n the neighbour from which we received message m
2289  * @param m the set key message we received
2290  */
2291 static void
2292 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2293 {
2294   struct SetKeyMessage *m_cpy;
2295   struct GNUNET_TIME_Absolute t;
2296   struct GNUNET_CRYPTO_AesSessionKey k;
2297   struct PingMessage *ping;
2298   enum PeerStateMachine sender_status;
2299
2300 #if DEBUG_CORE
2301   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2302               "Core service receives `%s' request from `%4s'.\n",
2303               "SET_KEY", GNUNET_i2s (&n->peer));
2304 #endif
2305   if (n->public_key == NULL)
2306     {
2307 #if DEBUG_CORE
2308       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2309                   "Lacking public key for peer, trying to obtain one.\n");
2310 #endif
2311       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2312       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2313       /* lookup n's public key, then try again */
2314       GNUNET_PEERINFO_for_all (cfg,
2315                                sched,
2316                                &n->peer,
2317                                0,
2318                                GNUNET_TIME_UNIT_MINUTES,
2319                                &process_hello_retry_handle_set_key, m_cpy);
2320       return;
2321     }
2322   if ((ntohl (m->purpose.size) !=
2323        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2324        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2325        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2326        sizeof (struct GNUNET_PeerIdentity)) ||
2327       (GNUNET_OK !=
2328        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2329                                  &m->purpose, &m->signature, n->public_key)))
2330     {
2331       /* invalid signature */
2332       GNUNET_break_op (0);
2333       return;
2334     }
2335   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2336   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2337        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2338       (t.value < n->decrypt_key_created.value))
2339     {
2340       /* this could rarely happen due to massive re-ordering of
2341          messages on the network level, but is most likely either
2342          a bug or some adversary messing with us.  Report. */
2343       GNUNET_break_op (0);
2344       return;
2345     }
2346 #if DEBUG_CORE
2347   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2348 #endif
2349   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2350                                   &m->encrypted_key,
2351                                   &k,
2352                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2353        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2354       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2355     {
2356       /* failed to decrypt !? */
2357       GNUNET_break_op (0);
2358       return;
2359     }
2360
2361   n->decrypt_key = k;
2362   if (n->decrypt_key_created.value != t.value)
2363     {
2364       /* fresh key, reset sequence numbers */
2365       n->last_sequence_number_received = 0;
2366       n->last_packets_bitmap = 0;
2367       n->decrypt_key_created = t;
2368     }
2369   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2370   switch (n->status)
2371     {
2372     case PEER_STATE_DOWN:
2373       n->status = PEER_STATE_KEY_RECEIVED;
2374 #if DEBUG_CORE
2375       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2376                   "Responding to `%s' with my own key.\n", "SET_KEY");
2377 #endif
2378       send_key (n);
2379       break;
2380     case PEER_STATE_KEY_SENT:
2381     case PEER_STATE_KEY_RECEIVED:
2382       n->status = PEER_STATE_KEY_RECEIVED;
2383       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2384           (sender_status != PEER_STATE_KEY_CONFIRMED))
2385         {
2386 #if DEBUG_CORE
2387           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2388                       "Responding to `%s' with my own key (other peer has status %u).\n",
2389                       "SET_KEY", sender_status);
2390 #endif
2391           send_key (n);
2392         }
2393       break;
2394     case PEER_STATE_KEY_CONFIRMED:
2395       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2396           (sender_status != PEER_STATE_KEY_CONFIRMED))
2397         {
2398 #if DEBUG_CORE
2399           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2400                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2401                       "SET_KEY", sender_status);
2402 #endif
2403           send_key (n);
2404         }
2405       break;
2406     default:
2407       GNUNET_break (0);
2408       break;
2409     }
2410   if (n->pending_ping != NULL)
2411     {
2412       ping = n->pending_ping;
2413       n->pending_ping = NULL;
2414       handle_ping (n, ping);
2415       GNUNET_free (ping);
2416     }
2417 }
2418
2419
2420 /**
2421  * We received a PONG message.  Validate and update
2422  * our status.
2423  *
2424  * @param n sender of the PONG
2425  * @param m the encrypted PONG message itself
2426  */
2427 static void
2428 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2429 {
2430   struct PingMessage t;
2431
2432 #if DEBUG_CORE
2433   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2434               "Core service receives `%s' request from `%4s'.\n",
2435               "PONG", GNUNET_i2s (&n->peer));
2436 #endif
2437   if (GNUNET_OK !=
2438       do_decrypt (n,
2439                   &n->peer.hashPubKey,
2440                   &m->challenge,
2441                   &t.challenge,
2442                   sizeof (struct PingMessage) -
2443                   sizeof (struct GNUNET_MessageHeader)))
2444     return;
2445 #if DEBUG_CORE
2446   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2447               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2448               "PONG",
2449               GNUNET_i2s (&t.target),
2450               ntohl (t.challenge), n->decrypt_key.crc32);
2451 #endif
2452   if ((0 != memcmp (&t.target,
2453                     &n->peer,
2454                     sizeof (struct GNUNET_PeerIdentity))) ||
2455       (n->ping_challenge != ntohl (t.challenge)))
2456     {
2457       /* PONG malformed */
2458 #if DEBUG_CORE
2459       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2460                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2461                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2462       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2463                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2464                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2465 #endif
2466       GNUNET_break_op (0);
2467       return;
2468     }
2469   switch (n->status)
2470     {
2471     case PEER_STATE_DOWN:
2472       GNUNET_break (0);         /* should be impossible */
2473       return;
2474     case PEER_STATE_KEY_SENT:
2475       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2476       return;
2477     case PEER_STATE_KEY_RECEIVED:
2478       n->status = PEER_STATE_KEY_CONFIRMED;
2479       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
2480         {
2481           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2482           n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2483         }
2484       process_encrypted_neighbour_queue (n);
2485       break;
2486     case PEER_STATE_KEY_CONFIRMED:
2487       /* duplicate PONG? */
2488       break;
2489     default:
2490       GNUNET_break (0);
2491       break;
2492     }
2493 }
2494
2495
2496 /**
2497  * Send a P2P message to a client.
2498  *
2499  * @param sender who sent us the message?
2500  * @param client who should we give the message to?
2501  * @param m contains the message to transmit
2502  * @param msize number of bytes in buf to transmit
2503  */
2504 static void
2505 send_p2p_message_to_client (struct Neighbour *sender,
2506                             struct Client *client,
2507                             const void *m, size_t msize)
2508 {
2509   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2510   struct NotifyTrafficMessage *ntm;
2511
2512 #if DEBUG_CORE
2513   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2514               "Core service passes message from `%4s' of type %u to client.\n",
2515               GNUNET_i2s(&sender->peer),
2516               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2517 #endif
2518   ntm = (struct NotifyTrafficMessage *) buf;
2519   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2520   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2521   ntm->reserved = htonl (0);
2522   ntm->peer = sender->peer;
2523   memcpy (&ntm[1], m, msize);
2524   send_to_client (client, &ntm->header, GNUNET_YES);
2525 }
2526
2527
2528 /**
2529  * Deliver P2P message to interested clients.
2530  *
2531  * @param sender who sent us the message?
2532  * @param m the message
2533  * @param msize size of the message (including header)
2534  */
2535 static void
2536 deliver_message (struct Neighbour *sender,
2537                  const struct GNUNET_MessageHeader *m, size_t msize)
2538 {
2539   struct Client *cpos;
2540   uint16_t type;
2541   unsigned int tpos;
2542   int deliver_full;
2543
2544   type = ntohs (m->type);
2545   cpos = clients;
2546   while (cpos != NULL)
2547     {
2548       deliver_full = GNUNET_NO;
2549       if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
2550         deliver_full = GNUNET_YES;
2551       else
2552         {
2553           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2554             {
2555               if (type != cpos->types[tpos])
2556                 continue;
2557               deliver_full = GNUNET_YES;
2558               break;
2559             }
2560         }
2561       if (GNUNET_YES == deliver_full)
2562         send_p2p_message_to_client (sender, cpos, m, msize);
2563       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2564         send_p2p_message_to_client (sender, cpos, m,
2565                                     sizeof (struct GNUNET_MessageHeader));
2566       cpos = cpos->next;
2567     }
2568 }
2569
2570
2571 /**
2572  * Align P2P message and then deliver to interested clients.
2573  *
2574  * @param sender who sent us the message?
2575  * @param buffer unaligned (!) buffer containing message
2576  * @param msize size of the message (including header)
2577  */
2578 static void
2579 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2580 {
2581   char abuf[msize];
2582
2583   /* TODO: call to statistics? */
2584   memcpy (abuf, buffer, msize);
2585   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2586 }
2587
2588
2589 /**
2590  * Deliver P2P messages to interested clients.
2591  *
2592  * @param sender who sent us the message?
2593  * @param buffer buffer containing messages, can be modified
2594  * @param buffer_size size of the buffer (overall)
2595  * @param offset offset where messages in the buffer start
2596  */
2597 static void
2598 deliver_messages (struct Neighbour *sender,
2599                   const char *buffer, size_t buffer_size, size_t offset)
2600 {
2601   struct GNUNET_MessageHeader *mhp;
2602   struct GNUNET_MessageHeader mh;
2603   uint16_t msize;
2604   int need_align;
2605
2606   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2607     {
2608       if (0 != offset % sizeof (uint16_t))
2609         {
2610           /* outch, need to copy to access header */
2611           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2612           mhp = &mh;
2613         }
2614       else
2615         {
2616           /* can access header directly */
2617           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2618         }
2619       msize = ntohs (mhp->size);
2620       if (msize + offset > buffer_size)
2621         {
2622           /* malformed message, header says it is larger than what
2623              would fit into the overall buffer */
2624           GNUNET_break_op (0);
2625           break;
2626         }
2627 #if HAVE_UNALIGNED_64_ACCESS
2628       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2629 #else
2630       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2631 #endif
2632       if (GNUNET_YES == need_align)
2633         align_and_deliver (sender, &buffer[offset], msize);
2634       else
2635         deliver_message (sender,
2636                          (const struct GNUNET_MessageHeader *)
2637                          &buffer[offset], msize);
2638       offset += msize;
2639     }
2640 }
2641
2642
2643 /**
2644  * We received an encrypted message.  Decrypt, validate and
2645  * pass on to the appropriate clients.
2646  */
2647 static void
2648 handle_encrypted_message (struct Neighbour *n,
2649                           const struct EncryptedMessage *m)
2650 {
2651   size_t size = ntohs (m->header.size);
2652   char buf[size];
2653   struct EncryptedMessage *pt;  /* plaintext */
2654   GNUNET_HashCode ph;
2655   size_t off;
2656   uint32_t snum;
2657   struct GNUNET_TIME_Absolute t;
2658
2659 #if DEBUG_CORE
2660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2661               "Core service receives `%s' request from `%4s'.\n",
2662               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2663 #endif
2664   /* decrypt */
2665   if (GNUNET_OK !=
2666       do_decrypt (n,
2667                   &m->plaintext_hash,
2668                   &m->sequence_number,
2669                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2670     return;
2671   pt = (struct EncryptedMessage *) buf;
2672
2673   /* validate hash */
2674   GNUNET_CRYPTO_hash (&pt->sequence_number,
2675                       size - ENCRYPTED_HEADER_SIZE, &ph);
2676   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2677     {
2678       /* checksum failed */
2679       GNUNET_break_op (0);
2680       return;
2681     }
2682
2683   /* validate sequence number */
2684   snum = ntohl (pt->sequence_number);
2685   if (n->last_sequence_number_received == snum)
2686     {
2687       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2688                   "Received duplicate message, ignoring.\n");
2689       /* duplicate, ignore */
2690       return;
2691     }
2692   if ((n->last_sequence_number_received > snum) &&
2693       (n->last_sequence_number_received - snum > 32))
2694     {
2695       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2696                   "Received ancient out of sequence message, ignoring.\n");
2697       /* ancient out of sequence, ignore */
2698       return;
2699     }
2700   if (n->last_sequence_number_received > snum)
2701     {
2702       unsigned int rotbit =
2703         1 << (n->last_sequence_number_received - snum - 1);
2704       if ((n->last_packets_bitmap & rotbit) != 0)
2705         {
2706           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2707                       "Received duplicate message, ignoring.\n");
2708           /* duplicate, ignore */
2709           return;
2710         }
2711       n->last_packets_bitmap |= rotbit;
2712     }
2713   if (n->last_sequence_number_received < snum)
2714     {
2715       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2716       n->last_sequence_number_received = snum;
2717     }
2718
2719   /* check timestamp */
2720   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2721   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2722     {
2723       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2724                   _
2725                   ("Message received far too old (%llu ms). Content ignored.\n"),
2726                   GNUNET_TIME_absolute_get_duration (t).value);
2727       return;
2728     }
2729
2730   /* process decrypted message(s) */
2731   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2732   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2733                            n->bpm_out_internal_limit);
2734   n->last_activity = GNUNET_TIME_absolute_get ();
2735   off = sizeof (struct EncryptedMessage);
2736   deliver_messages (n, buf, size, off);
2737 }
2738
2739
2740 /**
2741  * Function called by the transport for each received message.
2742  *
2743  * @param cls closure
2744  * @param latency estimated latency for communicating with the
2745  *             given peer
2746  * @param peer (claimed) identity of the other peer
2747  * @param message the message
2748  */
2749 static void
2750 handle_transport_receive (void *cls,
2751                           struct GNUNET_TIME_Relative latency,
2752                           const struct GNUNET_PeerIdentity *peer,
2753                           const struct GNUNET_MessageHeader *message)
2754 {
2755   struct Neighbour *n;
2756   struct GNUNET_TIME_Absolute now;
2757   int up;
2758   uint16_t type;
2759   uint16_t size;
2760
2761 #if DEBUG_CORE
2762   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2763               "Received message of type %u from `%4s', demultiplexing.\n",
2764               ntohs (message->type), GNUNET_i2s (peer));
2765 #endif
2766   n = find_neighbour (peer);
2767   if (n == NULL)
2768     {
2769       GNUNET_break (0);
2770       return;
2771     }
2772   n->last_latency = latency;
2773   up = n->status == PEER_STATE_KEY_CONFIRMED;
2774   type = ntohs (message->type);
2775   size = ntohs (message->size);
2776   switch (type)
2777     {
2778     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2779       if (size != sizeof (struct SetKeyMessage))
2780         {
2781           GNUNET_break_op (0);
2782           return;
2783         }
2784       handle_set_key (n, (const struct SetKeyMessage *) message);
2785       break;
2786     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2787       if (size < sizeof (struct EncryptedMessage) +
2788           sizeof (struct GNUNET_MessageHeader))
2789         {
2790           GNUNET_break_op (0);
2791           return;
2792         }
2793       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2794           (n->status != PEER_STATE_KEY_CONFIRMED))
2795         {
2796           GNUNET_break_op (0);
2797           return;
2798         }
2799       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2800       break;
2801     case GNUNET_MESSAGE_TYPE_CORE_PING:
2802       if (size != sizeof (struct PingMessage))
2803         {
2804           GNUNET_break_op (0);
2805           return;
2806         }
2807       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2808           (n->status != PEER_STATE_KEY_CONFIRMED))
2809         {
2810 #if DEBUG_CORE
2811           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2812                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2813                       "PING", GNUNET_i2s (&n->peer));
2814 #endif
2815           GNUNET_free_non_null (n->pending_ping);
2816           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2817           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2818           return;
2819         }
2820       handle_ping (n, (const struct PingMessage *) message);
2821       break;
2822     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2823       if (size != sizeof (struct PingMessage))
2824         {
2825           GNUNET_break_op (0);
2826           return;
2827         }
2828       if ((n->status != PEER_STATE_KEY_SENT) &&
2829           (n->status != PEER_STATE_KEY_RECEIVED) &&
2830           (n->status != PEER_STATE_KEY_CONFIRMED))
2831         {
2832           /* could not decrypt pong, oops! */
2833           GNUNET_break_op (0);
2834           return;
2835         }
2836       handle_pong (n, (const struct PingMessage *) message);
2837       break;
2838     default:
2839       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2840                   _("Unsupported message of type %u received.\n"), type);
2841       return;
2842     }
2843   if (n->status == PEER_STATE_KEY_CONFIRMED)
2844     {
2845       now = GNUNET_TIME_absolute_get ();
2846       n->last_activity = now;
2847       if (!up)
2848         n->time_established = now;
2849     }
2850 }
2851
2852
2853 /**
2854  * Function that recalculates the bandwidth quota for the
2855  * given neighbour and transmits it to the transport service.
2856  * 
2857  * @param cls neighbour for the quota update
2858  * @param tc context
2859  */
2860 static void
2861 neighbour_quota_update (void *cls,
2862                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2863
2864
2865 /**
2866  * Schedule the task that will recalculate the bandwidth
2867  * quota for this peer (and possibly force a disconnect of
2868  * idle peers by calculating a bandwidth of zero).
2869  */
2870 static void
2871 schedule_quota_update (struct Neighbour *n)
2872 {
2873   GNUNET_assert (n->quota_update_task ==
2874                  GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
2875   n->quota_update_task
2876     = GNUNET_SCHEDULER_add_delayed (sched,
2877                                     GNUNET_NO,
2878                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
2879                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2880                                     QUOTA_UPDATE_FREQUENCY,
2881                                     &neighbour_quota_update,
2882                                     n);
2883 }
2884
2885
2886 /**
2887  * Function that recalculates the bandwidth quota for the
2888  * given neighbour and transmits it to the transport service.
2889  * 
2890  * @param cls neighbour for the quota update
2891  * @param tc context
2892  */
2893 static void
2894 neighbour_quota_update (void *cls,
2895                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2896 {
2897   struct Neighbour *n = cls;
2898   uint32_t q_in;
2899   double pref_rel;
2900   double share;
2901   unsigned long long distributable;
2902   
2903   n->quota_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2904   /* calculate relative preference among all neighbours;
2905      divides by a bit more to avoid division by zero AND to
2906      account for possibility of new neighbours joining any time 
2907      AND to convert to double... */
2908   pref_rel = n->current_preference / (1.0 + preference_sum);
2909   share = 0;
2910   distributable = 0;
2911   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2912     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2913   share = distributable * pref_rel;
2914   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2915   /* check if we want to disconnect for good due to inactivity */
2916   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2917        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2918     q_in = 0; /* force disconnect */
2919   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2920        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2921     {
2922       n->bpm_in = q_in;
2923       GNUNET_TRANSPORT_set_quota (transport,
2924                                   &n->peer,
2925                                   n->bpm_in, 
2926                                   n->bpm_out,
2927                                   GNUNET_TIME_UNIT_FOREVER_REL,
2928                                   NULL, NULL);
2929     }
2930   schedule_quota_update (n);
2931 }
2932
2933
2934 /**
2935  * Function called by transport to notify us that
2936  * a peer connected to us (on the network level).
2937  *
2938  * @param cls closure
2939  * @param peer the peer that connected
2940  * @param latency current latency of the connection
2941  */
2942 static void
2943 handle_transport_notify_connect (void *cls,
2944                                  const struct GNUNET_PeerIdentity *peer,
2945                                  struct GNUNET_TIME_Relative latency)
2946 {
2947   struct Neighbour *n;
2948   struct GNUNET_TIME_Absolute now;
2949   struct ConnectNotifyMessage cnm;
2950
2951   n = find_neighbour (peer);
2952   if (n != NULL)
2953     {
2954       /* duplicate connect notification!? */
2955       GNUNET_break (0);
2956       return;
2957     }
2958   now = GNUNET_TIME_absolute_get ();
2959   n = GNUNET_malloc (sizeof (struct Neighbour));
2960   n->next = neighbours;
2961   neighbours = n;
2962   neighbour_count++;
2963   n->peer = *peer;
2964   n->last_latency = latency;
2965   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2966   n->encrypt_key_created = now;
2967   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2968   n->last_asw_update = now;
2969   n->last_arw_update = now;
2970   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2971   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2972   n->bpm_out_internal_limit = (uint32_t) - 1;
2973   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2974   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2975                                                 (uint32_t) - 1);
2976 #if DEBUG_CORE
2977   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2978               "Received connection from `%4s'.\n",
2979               GNUNET_i2s (&n->peer));
2980 #endif
2981   schedule_quota_update (n);
2982   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2983   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2984   cnm.bpm_available = htonl (n->bpm_out);
2985   cnm.peer = *peer;
2986   cnm.last_activity = GNUNET_TIME_absolute_hton (now);
2987   send_to_all_clients (&cnm.header, GNUNET_YES);
2988 }
2989
2990
2991 /**
2992  * Free the given entry for the neighbour (it has
2993  * already been removed from the list at this point).
2994  *
2995  * @param n neighbour to free
2996  */
2997 static void
2998 free_neighbour (struct Neighbour *n)
2999 {
3000   struct MessageEntry *m;
3001
3002   while (NULL != (m = n->messages))
3003     {
3004       n->messages = m->next;
3005       GNUNET_free (m);
3006     }
3007   while (NULL != (m = n->encrypted_head))
3008     {
3009       n->encrypted_head = m->next;
3010       GNUNET_free (m);
3011     }
3012   if (NULL != n->th)
3013     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3014   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3015     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3016   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3017     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3018   if (n->quota_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3019     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3020   GNUNET_free_non_null (n->public_key);
3021   GNUNET_free_non_null (n->pending_ping);
3022   GNUNET_free (n);
3023 }
3024
3025
3026 /**
3027  * Function called by transport telling us that a peer
3028  * disconnected.
3029  *
3030  * @param cls closure
3031  * @param peer the peer that disconnected
3032  */
3033 static void
3034 handle_transport_notify_disconnect (void *cls,
3035                                     const struct GNUNET_PeerIdentity *peer)
3036 {
3037   struct ConnectNotifyMessage cnm;
3038   struct Neighbour *n;
3039   struct Neighbour *p;
3040
3041 #if DEBUG_CORE
3042   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3043               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3044 #endif
3045   p = NULL;
3046   n = neighbours;
3047   while ((n != NULL) &&
3048          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3049     {
3050       p = n;
3051       n = n->next;
3052     }
3053   if (n == NULL)
3054     {
3055       GNUNET_break (0);
3056       return;
3057     }
3058   if (p == NULL)
3059     neighbours = n->next;
3060   else
3061     p->next = n->next;
3062   GNUNET_assert (neighbour_count > 0);
3063   neighbour_count--;
3064   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3065   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3066   cnm.bpm_available = htonl (0);
3067   cnm.peer = *peer;
3068   cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
3069   send_to_all_clients (&cnm.header, GNUNET_YES);
3070   free_neighbour (n);
3071 }
3072
3073
3074 /**
3075  * Last task run during shutdown.  Disconnects us from
3076  * the transport.
3077  */
3078 static void
3079 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3080 {
3081   struct Neighbour *n;
3082   struct Client *c;
3083
3084 #if DEBUG_CORE
3085   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3086               "Core service shutting down.\n");
3087 #endif
3088   GNUNET_assert (transport != NULL);
3089   GNUNET_TRANSPORT_disconnect (transport);
3090   transport = NULL;
3091   while (NULL != (n = neighbours))
3092     {
3093       neighbours = n->next;
3094       GNUNET_assert (neighbour_count > 0);
3095       neighbour_count--;
3096       free_neighbour (n);
3097     }
3098   while (NULL != (c = clients))
3099     handle_client_disconnect (NULL, c->client_handle);
3100 }
3101
3102
3103 /**
3104  * Initiate core service.
3105  *
3106  * @param cls closure
3107  * @param s scheduler to use
3108  * @param serv the initialized server
3109  * @param c configuration to use
3110  */
3111 static void
3112 run (void *cls,
3113      struct GNUNET_SCHEDULER_Handle *s,
3114      struct GNUNET_SERVER_Handle *serv, struct GNUNET_CONFIGURATION_Handle *c)
3115 {
3116 #if 0
3117   unsigned long long qin;
3118   unsigned long long qout;
3119   unsigned long long tneigh;
3120 #endif
3121   char *keyfile;
3122
3123   sched = s;
3124   cfg = c;
3125   /* parse configuration */
3126   if (
3127        (GNUNET_OK !=
3128         GNUNET_CONFIGURATION_get_value_number (c,
3129                                                "CORE",
3130                                                "TOTAL_QUOTA_IN",
3131                                                &bandwidth_target_in)) ||
3132        (GNUNET_OK !=
3133         GNUNET_CONFIGURATION_get_value_number (c,
3134                                                "CORE",
3135                                                "TOTAL_QUOTA_OUT",
3136                                                &bandwidth_target_out)) ||
3137 #if 0
3138        (GNUNET_OK !=
3139         GNUNET_CONFIGURATION_get_value_number (c,
3140                                                "CORE",
3141                                                "YY",
3142                                                &qout)) ||
3143        (GNUNET_OK !=
3144         GNUNET_CONFIGURATION_get_value_number (c,
3145                                                "CORE",
3146                                                "ZZ_LIMIT", &tneigh)) ||
3147 #endif
3148        (GNUNET_OK !=
3149         GNUNET_CONFIGURATION_get_value_filename (c,
3150                                                  "GNUNETD",
3151                                                  "HOSTKEY", &keyfile)))
3152     {
3153       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3154                   _
3155                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3156       GNUNET_SCHEDULER_shutdown (s);
3157       return;
3158     }
3159   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3160   GNUNET_free (keyfile);
3161   if (my_private_key == NULL)
3162     {
3163       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3164                   _("Core service could not access hostkey.  Exiting.\n"));
3165       GNUNET_SCHEDULER_shutdown (s);
3166       return;
3167     }
3168   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3169   GNUNET_CRYPTO_hash (&my_public_key,
3170                       sizeof (my_public_key), &my_identity.hashPubKey);
3171   /* setup notification */
3172   server = serv;
3173   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3174   /* setup transport connection */
3175   transport = GNUNET_TRANSPORT_connect (sched,
3176                                         cfg,
3177                                         NULL,
3178                                         &handle_transport_receive,
3179                                         &handle_transport_notify_connect,
3180                                         &handle_transport_notify_disconnect);
3181   GNUNET_assert (NULL != transport);
3182   GNUNET_SCHEDULER_add_delayed (sched,
3183                                 GNUNET_YES,
3184                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3185                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
3186                                 GNUNET_TIME_UNIT_FOREVER_REL,
3187                                 &cleaning_task, NULL);
3188   /* process client requests */
3189   GNUNET_SERVER_add_handlers (server, handlers);
3190   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3191               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3192 }
3193
3194
3195 /**
3196  * Function called during shutdown.  Clean up our state.
3197  */
3198 static void
3199 cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
3200 {
3201   if (my_private_key != NULL)
3202     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3203 }
3204
3205
3206 /**
3207  * The main function for the transport service.
3208  *
3209  * @param argc number of arguments from the command line
3210  * @param argv command line arguments
3211  * @return 0 ok, 1 on error
3212  */
3213 int
3214 main (int argc, char *const *argv)
3215 {
3216   return (GNUNET_OK ==
3217           GNUNET_SERVICE_run (argc,
3218                               argv,
3219                               "core", &run, NULL, &cleanup, NULL)) ? 0 : 1;
3220 }
3221
3222 /* end of gnunet-service-core.c */