adding bandwidth management
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * POST-TESTING:
27  * - topology management:
28  *   + bootstrapping (transport offer hello, plugins)
29  *   + internal neighbour selection
30  *
31  * Considerations for later:
32  * - check that hostkey used by transport (for HELLOs) is the
33  *   same as the hostkey that we are using!
34  * - add code to send PINGs if we are about to time-out otherwise
35  * - optimize lookup (many O(n) list traversals
36  *   could ideally be changed to O(1) hash map lookups)
37  */
38 #include "platform.h"
39 #include "gnunet_constants.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet_hello_lib.h"
42 #include "gnunet_peerinfo_service.h"
43 #include "gnunet_protocols.h"
44 #include "gnunet_signatures.h"
45 #include "gnunet_transport_service.h"
46 #include "core.h"
47
48
49 /**
50  * Receive and send buffer windows grow over time.  For
51  * how long can 'unused' bandwidth accumulate before we
52  * need to cap it?  (specified in ms).
53  */
54 #define MAX_WINDOW_TIME (5 * 60 * 1000)
55
56 /**
57  * Minimum of bytes per minute (out) to assign to any connected peer.
58  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
59  * sense.
60  */
61 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
62
63 /**
64  * What is the smallest change (in number of bytes per minute)
65  * that we consider significant enough to bother triggering?
66  */
67 #define MIN_BPM_CHANGE 32
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What is the maximum delay for a SET_KEY message?
80  */
81 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
82
83 /**
84  * What how long do we wait for SET_KEY confirmation initially?
85  */
86 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
87
88 /**
89  * What is the maximum delay for a PING message?
90  */
91 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * What is the maximum delay for a PONG message?
95  */
96 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * How often do we recalculate bandwidth quotas?
100  */
101 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
102
103 /**
104  * What is the priority for a SET_KEY message?
105  */
106 #define SET_KEY_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PING message?
110  */
111 #define PING_PRIORITY 0xFFFFFF
112
113 /**
114  * What is the priority for a PONG message?
115  */
116 #define PONG_PRIORITY 0xFFFFFF
117
118 /**
119  * How many messages do we queue per peer at most?
120  */
121 #define MAX_PEER_QUEUE_SIZE 16
122
123 /**
124  * How many non-mandatory messages do we queue per client at most?
125  */
126 #define MAX_CLIENT_QUEUE_SIZE 32
127
128 /**
129  * What is the maximum age of a message for us to consider
130  * processing it?  Note that this looks at the timestamp used
131  * by the other peer, so clock skew between machines does
132  * come into play here.  So this should be picked high enough
133  * so that a little bit of clock skew does not prevent peers
134  * from connecting to us.
135  */
136 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
137
138 /**
139  * What is the maximum size for encrypted messages?  Note that this
140  * number imposes a clear limit on the maximum size of any message.
141  * Set to a value close to 64k but not so close that transports will
142  * have trouble with their headers.
143  */
144 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
145
146
147 /**
148  * State machine for our P2P encryption handshake.  Everyone starts in
149  * "DOWN", if we receive the other peer's key (other peer initiated)
150  * we start in state RECEIVED (since we will immediately send our
151  * own); otherwise we start in SENT.  If we get back a PONG from
152  * within either state, we move up to CONFIRMED (the PONG will always
153  * be sent back encrypted with the key we sent to the other peer).
154  */
155 enum PeerStateMachine
156 {
157   PEER_STATE_DOWN,
158   PEER_STATE_KEY_SENT,
159   PEER_STATE_KEY_RECEIVED,
160   PEER_STATE_KEY_CONFIRMED
161 };
162
163
164 /**
165  * Number of bytes (at the beginning) of "struct EncryptedMessage"
166  * that are NOT encrypted.
167  */
168 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
169
170
171 /**
172  * Encapsulation for encrypted messages exchanged between
173  * peers.  Followed by the actual encrypted data.
174  */
175 struct EncryptedMessage
176 {
177   /**
178    * Message type is either CORE_ENCRYPTED_MESSAGE.
179    */
180   struct GNUNET_MessageHeader header;
181
182   /**
183    * Always zero.
184    */
185   uint32_t reserved GNUNET_PACKED;
186
187   /**
188    * Hash of the plaintext, used to verify message integrity;
189    * ALSO used as the IV for the symmetric cipher!  Everything
190    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
191    * must be set to the offset of the next field.
192    */
193   GNUNET_HashCode plaintext_hash;
194
195   /**
196    * Sequence number, in network byte order.  This field
197    * must be the first encrypted/decrypted field and the
198    * first byte that is hashed for the plaintext hash.
199    */
200   uint32_t sequence_number GNUNET_PACKED;
201
202   /**
203    * Desired bandwidth (how much we should send to this
204    * peer / how much is the sender willing to receive),
205    * in bytes per minute.
206    */
207   uint32_t inbound_bpm_limit GNUNET_PACKED;
208
209   /**
210    * Timestamp.  Used to prevent reply of ancient messages
211    * (recent messages are caught with the sequence number).
212    */
213   struct GNUNET_TIME_AbsoluteNBO timestamp;
214
215 };
216
217 /**
218  * We're sending an (encrypted) PING to the other peer to check if he
219  * can decrypt.  The other peer should respond with a PONG with the
220  * same content, except this time encrypted with the receiver's key.
221  */
222 struct PingMessage
223 {
224   /**
225    * Message type is either CORE_PING or CORE_PONG.
226    */
227   struct GNUNET_MessageHeader header;
228
229   /**
230    * Random number chosen to make reply harder.
231    */
232   uint32_t challenge GNUNET_PACKED;
233
234   /**
235    * Intended target of the PING, used primarily to check
236    * that decryption actually worked.
237    */
238   struct GNUNET_PeerIdentity target;
239 };
240
241
242 /**
243  * Message transmitted to set (or update) a session key.
244  */
245 struct SetKeyMessage
246 {
247
248   /**
249    * Message type is either CORE_SET_KEY.
250    */
251   struct GNUNET_MessageHeader header;
252
253   /**
254    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
255    */
256   int32_t sender_status GNUNET_PACKED;
257
258   /**
259    * Purpose of the signature, will be
260    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
261    */
262   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
263
264   /**
265    * At what time was this key created?
266    */
267   struct GNUNET_TIME_AbsoluteNBO creation_time;
268
269   /**
270    * The encrypted session key.
271    */
272   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
273
274   /**
275    * Who is the intended recipient?
276    */
277   struct GNUNET_PeerIdentity target;
278
279   /**
280    * Signature of the stuff above (starting at purpose).
281    */
282   struct GNUNET_CRYPTO_RsaSignature signature;
283
284 };
285
286
287 /**
288  * Message waiting for transmission. This struct
289  * is followed by the actual content of the message.
290  */
291 struct MessageEntry
292 {
293
294   /**
295    * We keep messages in a linked list (for now).
296    */
297   struct MessageEntry *next;
298
299   /**
300    * By when are we supposed to transmit this message?
301    */
302   struct GNUNET_TIME_Absolute deadline;
303
304   /**
305    * How important is this message to us?
306    */
307   unsigned int priority;
308
309   /**
310    * How long is the message? (number of bytes following
311    * the "struct MessageEntry", but not including the
312    * size of "struct MessageEntry" itself!)
313    */
314   uint16_t size;
315
316   /**
317    * Was this message selected for transmission in the
318    * current round? GNUNET_YES or GNUNET_NO.
319    */
320   int16_t do_transmit;
321
322 };
323
324
325 struct Neighbour
326 {
327   /**
328    * We keep neighbours in a linked list (for now).
329    */
330   struct Neighbour *next;
331
332   /**
333    * Unencrypted messages destined for this peer.
334    */
335   struct MessageEntry *messages;
336
337   /**
338    * Head of the batched, encrypted message queue (already ordered,
339    * transmit starting with the head).
340    */
341   struct MessageEntry *encrypted_head;
342
343   /**
344    * Tail of the batched, encrypted message queue (already ordered,
345    * append new messages to tail)
346    */
347   struct MessageEntry *encrypted_tail;
348
349   /**
350    * Handle for pending requests for transmission to this peer
351    * with the transport service.  NULL if no request is pending.
352    */
353   struct GNUNET_TRANSPORT_TransmitHandle *th;
354
355   /**
356    * Public key of the neighbour, NULL if we don't have it yet.
357    */
358   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
359
360   /**
361    * We received a PING message before we got the "public_key"
362    * (or the SET_KEY).  We keep it here until we have a key
363    * to decrypt it.  NULL if no PING is pending.
364    */
365   struct PingMessage *pending_ping;
366
367   /**
368    * Identity of the neighbour.
369    */
370   struct GNUNET_PeerIdentity peer;
371
372   /**
373    * Key we use to encrypt our messages for the other peer
374    * (initialized by us when we do the handshake).
375    */
376   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
377
378   /**
379    * Key we use to decrypt messages from the other peer
380    * (given to us by the other peer during the handshake).
381    */
382   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
383
384   /**
385    * ID of task used for re-trying plaintext scheduling.
386    */
387   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
388
389   /**
390    * ID of task used for re-trying SET_KEY and PING message.
391    */
392   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
393
394   /**
395    * ID of task used for updating bandwidth quota for this neighbour.
396    */
397   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
398
399   /**
400    * At what time did we generate our encryption key?
401    */
402   struct GNUNET_TIME_Absolute encrypt_key_created;
403
404   /**
405    * At what time did the other peer generate the decryption key?
406    */
407   struct GNUNET_TIME_Absolute decrypt_key_created;
408
409   /**
410    * At what time did we initially establish (as in, complete session
411    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
412    */
413   struct GNUNET_TIME_Absolute time_established;
414
415   /**
416    * At what time did we last receive an encrypted message from the
417    * other peer?  Should be zero if status != KEY_CONFIRMED.
418    */
419   struct GNUNET_TIME_Absolute last_activity;
420
421   /**
422    * Last latency observed from this peer.
423    */
424   struct GNUNET_TIME_Relative last_latency;
425
426   /**
427    * At what frequency are we currently re-trying SET_KEY messages?
428    */
429   struct GNUNET_TIME_Relative set_key_retry_frequency;
430
431   /**
432    * Time of our last update to the "available_send_window".
433    */
434   struct GNUNET_TIME_Absolute last_asw_update;
435
436   /**
437    * Time of our last update to the "available_recv_window".
438    */
439   struct GNUNET_TIME_Absolute last_arw_update;
440
441   /**
442    * Number of bytes that we are eligible to transmit to this
443    * peer at this point.  Incremented every minute by max_out_bpm,
444    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
445    * bandwidth-hogs are sampled at a frequency of about 78s!);
446    * may get negative if we have VERY high priority content.
447    */
448   long long available_send_window; 
449
450   /**
451    * How much downstream capacity of this peer has been reserved for
452    * our traffic?  (Our clients can request that a certain amount of
453    * bandwidth is available for replies to them; this value is used to
454    * make sure that this reserved amount of bandwidth is actually
455    * available).
456    */
457   long long available_recv_window; 
458
459   /**
460    * How valueable were the messages of this peer recently?
461    */
462   unsigned long long current_preference;
463
464   /**
465    * Bit map indicating which of the 32 sequence numbers before the last
466    * were received (good for accepting out-of-order packets and
467    * estimating reliability of the connection)
468    */
469   unsigned int last_packets_bitmap;
470
471   /**
472    * Number of messages in the message queue for this peer.
473    */
474   unsigned int message_queue_size;
475
476   /**
477    * last sequence number received on this connection (highest)
478    */
479   uint32_t last_sequence_number_received;
480
481   /**
482    * last sequence number transmitted
483    */
484   uint32_t last_sequence_number_sent;
485
486   /**
487    * Available bandwidth in for this peer (current target).
488    */
489   uint32_t bpm_in;
490
491   /**
492    * Available bandwidth out for this peer (current target).
493    */
494   uint32_t bpm_out;
495
496   /**
497    * Internal bandwidth limit set for this peer (initially
498    * typically set to "-1").  "bpm_out" is MAX of
499    * "bpm_out_internal_limit" and "bpm_out_external_limit".
500    */
501   uint32_t bpm_out_internal_limit;
502
503   /**
504    * External bandwidth limit set for this peer by the
505    * peer that we are communicating with.  "bpm_out" is MAX of
506    * "bpm_out_internal_limit" and "bpm_out_external_limit".
507    */
508   uint32_t bpm_out_external_limit;
509
510   /**
511    * What was our PING challenge number (for this peer)?
512    */
513   uint32_t ping_challenge;
514
515   /**
516    * What is our connection status?
517    */
518   enum PeerStateMachine status;
519
520 };
521
522
523 /**
524  * Events are messages for clients.  The struct
525  * itself is followed by the actual message.
526  */
527 struct Event
528 {
529   /**
530    * This is a linked list.
531    */
532   struct Event *next;
533
534   /**
535    * Size of the message.
536    */
537   size_t size;
538
539   /**
540    * Could this event be dropped if this queue
541    * is getting too large? (NOT YET USED!)
542    */
543   int can_drop;
544
545 };
546
547
548 /**
549  * Data structure for each client connected to the core service.
550  */
551 struct Client
552 {
553   /**
554    * Clients are kept in a linked list.
555    */
556   struct Client *next;
557
558   /**
559    * Handle for the client with the server API.
560    */
561   struct GNUNET_SERVER_Client *client_handle;
562
563   /**
564    * Linked list of messages we still need to deliver to
565    * this client.
566    */
567   struct Event *event_head;
568
569   /**
570    * Tail of the linked list of events.
571    */
572   struct Event *event_tail;
573
574   /**
575    * Current transmit handle, NULL if no transmission request
576    * is pending.
577    */
578   struct GNUNET_NETWORK_TransmitHandle *th;
579
580   /**
581    * Array of the types of messages this peer cares
582    * about (with "tcnt" entries).  Allocated as part
583    * of this client struct, do not free!
584    */
585   uint16_t *types;
586
587   /**
588    * Options for messages this client cares about,
589    * see GNUNET_CORE_OPTION_ values.
590    */
591   uint32_t options;
592
593   /**
594    * Number of types of incoming messages this client
595    * specifically cares about.  Size of the "types" array.
596    */
597   unsigned int tcnt;
598
599 };
600
601
602 /**
603  * Our public key.
604  */
605 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
606
607 /**
608  * Our identity.
609  */
610 static struct GNUNET_PeerIdentity my_identity;
611
612 /**
613  * Our private key.
614  */
615 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
616
617 /**
618  * Our scheduler.
619  */
620 struct GNUNET_SCHEDULER_Handle *sched;
621
622 /**
623  * Our configuration.
624  */
625 struct GNUNET_CONFIGURATION_Handle *cfg;
626
627 /**
628  * Our server.
629  */
630 static struct GNUNET_SERVER_Handle *server;
631
632 /**
633  * Transport service.
634  */
635 static struct GNUNET_TRANSPORT_Handle *transport;
636
637 /**
638  * Linked list of our clients.
639  */
640 static struct Client *clients;
641
642 /**
643  * We keep neighbours in a linked list (for now).
644  */
645 static struct Neighbour *neighbours;
646
647 /**
648  * Sum of all preferences among all neighbours.
649  */
650 static unsigned long long preference_sum;
651
652 /**
653  * Total number of neighbours we have.
654  */
655 static unsigned int neighbour_count;
656
657 /**
658  * How much inbound bandwidth are we supposed to be using?
659  */
660 static unsigned long long bandwidth_target_in;
661
662 /**
663  * How much outbound bandwidth are we supposed to be using?
664  */
665 static unsigned long long bandwidth_target_out;
666
667
668
669 /**
670  * A preference value for a neighbour was update.  Update
671  * the preference sum accordingly.
672  *
673  * @param inc how much was a preference value increased?
674  */
675 static void
676 update_preference_sum (unsigned long long inc)
677 {
678   struct Neighbour *n;
679   unsigned long long os;
680
681   os = preference_sum;
682   preference_sum += inc;
683   if (preference_sum >= os)
684     return; /* done! */
685   /* overflow! compensate by cutting all values in half! */
686   preference_sum = 0;
687   n = neighbours;
688   while (n != NULL)
689     {
690       n->current_preference /= 2;
691       preference_sum += n->current_preference;
692       n = n->next;
693     }    
694 }
695
696
697 /**
698  * Recalculate the number of bytes we expect to
699  * receive or transmit in a given window.
700  *
701  * @param force force an update now (even if not much time has passed)
702  * @param window pointer to the byte counter (updated)
703  * @param ts pointer to the timestamp (updated)
704  * @param bpm number of bytes per minute that should
705  *        be added to the window.
706  */
707 static void
708 update_window (int force,
709                long long *window,
710                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
711 {
712   struct GNUNET_TIME_Relative since;
713
714   since = GNUNET_TIME_absolute_get_duration (*ts);
715   if ( (force == GNUNET_NO) &&
716        (since.value < 60 * 1000) )
717     return;                     /* not even a minute has passed */
718   *ts = GNUNET_TIME_absolute_get ();
719   *window += (bpm * since.value) / 60 / 1000;
720   if (*window > MAX_WINDOW_TIME * bpm)
721     *window = MAX_WINDOW_TIME * bpm;
722 }
723
724
725 /**
726  * Find the entry for the given neighbour.
727  *
728  * @param peer identity of the neighbour
729  * @return NULL if we are not connected, otherwise the
730  *         neighbour's entry.
731  */
732 static struct Neighbour *
733 find_neighbour (const struct GNUNET_PeerIdentity *peer)
734 {
735   struct Neighbour *ret;
736
737   ret = neighbours;
738   while ((ret != NULL) &&
739          (0 != memcmp (&ret->peer,
740                        peer, sizeof (struct GNUNET_PeerIdentity))))
741     ret = ret->next;
742   return ret;
743 }
744
745
746 /**
747  * Find the entry for the given client.
748  *
749  * @param client handle for the client
750  * @return NULL if we are not connected, otherwise the
751  *         client's struct.
752  */
753 static struct Client *
754 find_client (const struct GNUNET_SERVER_Client *client)
755 {
756   struct Client *ret;
757
758   ret = clients;
759   while ((ret != NULL) && (client != ret->client_handle))
760     ret = ret->next;
761   return ret;
762 }
763
764
765 /**
766  * If necessary, initiate a request with the server to
767  * transmit messages from the queue of the given client.
768  * @param client who to transfer messages to
769  */
770 static void request_transmit (struct Client *client);
771
772
773 /**
774  * Client is ready to receive data, provide it.
775  *
776  * @param cls closure
777  * @param size number of bytes available in buf
778  * @param buf where the callee should write the message
779  * @return number of bytes written to buf
780  */
781 static size_t
782 do_client_transmit (void *cls, size_t size, void *buf)
783 {
784   struct Client *client = cls;
785   struct Event *e;
786   char *tgt;
787   size_t ret;
788
789   client->th = NULL;
790 #if DEBUG_CORE_CLIENT
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "Client ready to receive %u bytes.\n", size);
793 #endif
794   if (buf == NULL)
795     {
796 #if DEBUG_CORE
797       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
798                   "Failed to transmit data to client (disconnect)?\n");
799 #endif
800       return 0;                 /* we'll surely get a disconnect soon... */
801     }
802   tgt = buf;
803   ret = 0;
804   while ((NULL != (e = client->event_head)) && (e->size <= size))
805     {
806       memcpy (&tgt[ret], &e[1], e->size);
807       size -= e->size;
808       ret += e->size;
809       client->event_head = e->next;
810       GNUNET_free (e);
811     }
812   GNUNET_assert (ret > 0);
813   if (client->event_head == NULL)
814     client->event_tail = NULL;
815 #if DEBUG_CORE_CLIENT
816   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817               "Transmitting %u bytes to client\n", ret);
818 #endif
819   request_transmit (client);
820   return ret;
821 }
822
823
824 /**
825  * If necessary, initiate a request with the server to
826  * transmit messages from the queue of the given client.
827  * @param client who to transfer messages to
828  */
829 static void
830 request_transmit (struct Client *client)
831 {
832
833   if (NULL != client->th)
834     return;                     /* already pending */
835   if (NULL == client->event_head)
836     return;                     /* no more events pending */
837 #if DEBUG_CORE_CLIENT
838   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839               "Asking server to transmit %u bytes to client\n",
840               client->event_head->size);
841 #endif
842   client->th
843     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
844                                            client->event_head->size,
845                                            GNUNET_TIME_UNIT_FOREVER_REL,
846                                            &do_client_transmit, client);
847 }
848
849
850 /**
851  * Send a message to one of our clients.
852  * @param client target for the message
853  * @param msg message to transmit
854  * @param can_drop could this message be dropped if the
855  *        client's queue is getting too large?
856  */
857 static void
858 send_to_client (struct Client *client,
859                 const struct GNUNET_MessageHeader *msg, int can_drop)
860 {
861   struct Event *e;
862   unsigned int queue_size;
863   uint16_t msize;
864
865 #if DEBUG_CORE_CLIENT
866   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867               "Preparing to send message of type %u to client.\n",
868               ntohs (msg->type));
869 #endif
870   queue_size = 0;
871   e = client->event_head;
872   while (e != NULL)
873     {
874       queue_size++;
875       e = e->next;
876     }
877   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
878        (can_drop == GNUNET_YES) )
879     {
880 #if DEBUG_CORE
881       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
882                   "Too many messages in queue for the client, dropping the new message.\n");
883 #endif
884       return;
885     }
886
887   msize = ntohs (msg->size);
888   e = GNUNET_malloc (sizeof (struct Event) + msize);
889   /* append */
890   if (client->event_tail != NULL)
891     client->event_tail->next = e;
892   else
893     client->event_head = e;
894   client->event_tail = e;
895   e->can_drop = can_drop;
896   e->size = msize;
897   memcpy (&e[1], msg, msize);
898   request_transmit (client);
899 }
900
901
902 /**
903  * Send a message to all of our current clients.
904  */
905 static void
906 send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
907 {
908   struct Client *c;
909
910   c = clients;
911   while (c != NULL)
912     {
913       send_to_client (c, msg, can_drop);
914       c = c->next;
915     }
916 }
917
918
919 /**
920  * Handle CORE_INIT request.
921  */
922 static void
923 handle_client_init (void *cls,
924                     struct GNUNET_SERVER_Client *client,
925                     const struct GNUNET_MessageHeader *message)
926 {
927   const struct InitMessage *im;
928   struct InitReplyMessage irm;
929   struct Client *c;
930   uint16_t msize;
931   const uint16_t *types;
932   struct Neighbour *n;
933   struct ConnectNotifyMessage cnm;
934
935 #if DEBUG_CORE_CLIENT
936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
937               "Client connecting to core service with `%s' message\n",
938               "INIT");
939 #endif
940   /* check that we don't have an entry already */
941   c = clients;
942   while (c != NULL)
943     {
944       if (client == c->client_handle)
945         {
946           GNUNET_break (0);
947           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
948           return;
949         }
950       c = c->next;
951     }
952   msize = ntohs (message->size);
953   if (msize < sizeof (struct InitMessage))
954     {
955       GNUNET_break (0);
956       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
957       return;
958     }
959   im = (const struct InitMessage *) message;
960   types = (const uint16_t *) &im[1];
961   msize -= sizeof (struct InitMessage);
962   c = GNUNET_malloc (sizeof (struct Client) + msize);
963   c->client_handle = client;
964   c->next = clients;
965   clients = c;
966   memcpy (&c[1], types, msize);
967   c->types = (uint16_t *) & c[1];
968   c->options = ntohl (im->options);
969   c->tcnt = msize / sizeof (uint16_t);
970   /* send init reply message */
971   irm.header.size = htons (sizeof (struct InitReplyMessage));
972   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
973   irm.reserved = htonl (0);
974   memcpy (&irm.publicKey,
975           &my_public_key,
976           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
977 #if DEBUG_CORE_CLIENT
978   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
979               "Sending `%s' message to client.\n", "INIT_REPLY");
980 #endif
981   send_to_client (c, &irm.header, GNUNET_NO);
982   /* notify new client about existing neighbours */
983   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
984   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
985   n = neighbours;
986   while (n != NULL)
987     {
988 #if DEBUG_CORE_CLIENT
989       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
991 #endif
992       cnm.bpm_available = htonl (n->bpm_out);
993       cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
994       cnm.peer = n->peer;
995       send_to_client (c, &cnm.header, GNUNET_NO);
996       n = n->next;
997     }
998   GNUNET_SERVER_receive_done (client, GNUNET_OK);
999 }
1000
1001
1002 /**
1003  * A client disconnected, clean up.
1004  *
1005  * @param cls closure
1006  * @param client identification of the client
1007  */
1008 static void
1009 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1010 {
1011   struct Client *pos;
1012   struct Client *prev;
1013   struct Event *e;
1014
1015 #if DEBUG_CORE_CLIENT
1016   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017               "Client has disconnected from core service.\n");
1018 #endif
1019   prev = NULL;
1020   pos = clients;
1021   while (pos != NULL)
1022     {
1023       if (client == pos->client_handle)
1024         {
1025           if (prev == NULL)
1026             clients = pos->next;
1027           else
1028             prev->next = pos->next;
1029           if (pos->th != NULL)
1030             GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
1031           while (NULL != (e = pos->event_head))
1032             {
1033               pos->event_head = e->next;
1034               GNUNET_free (e);
1035             }
1036           GNUNET_free (pos);
1037           return;
1038         }
1039       prev = pos;
1040       pos = pos->next;
1041     }
1042   /* client never sent INIT */
1043 }
1044
1045
1046 /**
1047  * Handle REQUEST_CONFIGURE request.
1048  */
1049 static void
1050 handle_client_request_configure (void *cls,
1051                                  struct GNUNET_SERVER_Client *client,
1052                                  const struct GNUNET_MessageHeader *message)
1053 {
1054   const struct RequestConfigureMessage *rcm;
1055   struct Neighbour *n;
1056   struct ConfigurationInfoMessage cim;
1057   struct Client *c;
1058   int reserv;
1059   unsigned long long old_preference;
1060
1061 #if DEBUG_CORE_CLIENT
1062   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1063               "Core service receives `%s' request.\n", "CONFIGURE");
1064 #endif
1065   rcm = (const struct RequestConfigureMessage *) message;
1066   n = find_neighbour (&rcm->peer);
1067   memset (&cim, 0, sizeof (cim));
1068   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1069     {
1070       update_window (GNUNET_YES,
1071                      &n->available_send_window,
1072                      &n->last_asw_update,
1073                      n->bpm_out);
1074       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1075       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1076                                n->bpm_out_external_limit);
1077       reserv = ntohl (rcm->reserve_inbound);
1078       if (reserv < 0)
1079         {
1080           n->available_recv_window += reserv;
1081         }
1082       else if (reserv > 0)
1083         {
1084           update_window (GNUNET_NO,
1085                          &n->available_recv_window,
1086                          &n->last_arw_update, n->bpm_in);
1087           if (n->available_recv_window < reserv)
1088             reserv = n->available_recv_window;
1089           n->available_recv_window -= reserv;
1090         }
1091       old_preference = n->current_preference;
1092       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1093       if (old_preference > n->current_preference) 
1094         {
1095           /* overflow; cap at maximum value */
1096           n->current_preference = (unsigned long long) -1;
1097         }
1098       update_preference_sum (n->current_preference - old_preference);
1099       cim.reserved_amount = htonl (reserv);
1100       cim.bpm_in = htonl (n->bpm_in);
1101       cim.bpm_out = htonl (n->bpm_out);
1102       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1103       cim.preference = n->current_preference;
1104     }
1105   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1106   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1107   cim.peer = rcm->peer;
1108   c = find_client (client);
1109   if (c == NULL)
1110     {
1111       GNUNET_break (0);
1112       return;
1113     }
1114 #if DEBUG_CORE_CLIENT
1115   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1117 #endif
1118   send_to_client (c, &cim.header, GNUNET_NO);
1119 }
1120
1121
1122 /**
1123  * Check if we have encrypted messages for the specified neighbour
1124  * pending, and if so, check with the transport about sending them
1125  * out.
1126  *
1127  * @param n neighbour to check.
1128  */
1129 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1130
1131
1132 /**
1133  * Function called when the transport service is ready to
1134  * receive an encrypted message for the respective peer
1135  *
1136  * @param cls neighbour to use message from
1137  * @param size number of bytes we can transmit
1138  * @param buf where to copy the message
1139  * @return number of bytes transmitted
1140  */
1141 static size_t
1142 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1143 {
1144   struct Neighbour *n = cls;
1145   struct MessageEntry *m;
1146   size_t ret;
1147   char *cbuf;
1148
1149   n->th = NULL;
1150   GNUNET_assert (NULL != (m = n->encrypted_head));
1151   n->encrypted_head = m->next;
1152   if (m->next == NULL)
1153     n->encrypted_tail = NULL;
1154   ret = 0;
1155   cbuf = buf;
1156   if (buf != NULL)
1157     {
1158       GNUNET_assert (size >= m->size);
1159       memcpy (cbuf, &m[1], m->size);
1160       ret = m->size;
1161       n->available_send_window -= m->size;
1162       process_encrypted_neighbour_queue (n);
1163 #if DEBUG_CORE
1164       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1166                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1167                   ret, GNUNET_i2s (&n->peer));
1168 #endif
1169     }
1170   else
1171     {
1172       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1173                   "Transmission for message of type %u and size %u failed\n",
1174                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1175                   m->size);
1176     }
1177   GNUNET_free (m);
1178   return ret;
1179 }
1180
1181
1182 /**
1183  * Check if we have plaintext messages for the specified neighbour
1184  * pending, and if so, consider batching and encrypting them (and
1185  * then trigger processing of the encrypted queue if needed).
1186  *
1187  * @param n neighbour to check.
1188  */
1189 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1190
1191
1192 /**
1193  * Check if we have encrypted messages for the specified neighbour
1194  * pending, and if so, check with the transport about sending them
1195  * out.
1196  *
1197  * @param n neighbour to check.
1198  */
1199 static void
1200 process_encrypted_neighbour_queue (struct Neighbour *n)
1201 {
1202   struct MessageEntry *m;
1203  
1204   if (n->th != NULL)
1205     return;  /* request already pending */
1206   if (n->encrypted_head == NULL)
1207     {
1208       /* encrypted queue empty, try plaintext instead */
1209       process_plaintext_neighbour_queue (n);
1210       return;
1211     }
1212 #if DEBUG_CORE
1213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1215               n->encrypted_head->size,
1216               GNUNET_i2s (&n->peer),
1217               GNUNET_TIME_absolute_get_remaining (n->
1218                                                   encrypted_head->deadline).
1219               value);
1220 #endif
1221   n->th =
1222     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1223                                             n->encrypted_head->size,
1224                                             n->encrypted_head->priority,
1225                                             GNUNET_TIME_absolute_get_remaining
1226                                             (n->encrypted_head->deadline),
1227                                             &notify_encrypted_transmit_ready,
1228                                             n);
1229   if (n->th == NULL)
1230     {
1231       /* message request too large (oops) */
1232       GNUNET_break (0);
1233       /* discard encrypted message */
1234       GNUNET_assert (NULL != (m = n->encrypted_head));
1235       n->encrypted_head = m->next;
1236       if (m->next == NULL)
1237         n->encrypted_tail = NULL;
1238       GNUNET_free (m);
1239       process_encrypted_neighbour_queue (n);
1240     }
1241 }
1242
1243
1244 /**
1245  * Decrypt size bytes from in and write the result to out.  Use the
1246  * key for inbound traffic of the given neighbour.  This function does
1247  * NOT do any integrity-checks on the result.
1248  *
1249  * @param n neighbour we are receiving from
1250  * @param iv initialization vector to use
1251  * @param in ciphertext
1252  * @param out plaintext
1253  * @param size size of in/out
1254  * @return GNUNET_OK on success
1255  */
1256 static int
1257 do_decrypt (struct Neighbour *n,
1258             const GNUNET_HashCode * iv,
1259             const void *in, void *out, size_t size)
1260 {
1261   if (size != (uint16_t) size)
1262     {
1263       GNUNET_break (0);
1264       return GNUNET_NO;
1265     }
1266   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1267       (n->status != PEER_STATE_KEY_CONFIRMED))
1268     {
1269       GNUNET_break_op (0);
1270       return GNUNET_SYSERR;
1271     }
1272   if (size !=
1273       GNUNET_CRYPTO_aes_decrypt (&n->decrypt_key,
1274                                  in,
1275                                  (uint16_t) size,
1276                                  (const struct
1277                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1278                                  out))
1279     {
1280       GNUNET_break (0);
1281       return GNUNET_SYSERR;
1282     }
1283 #if DEBUG_CORE
1284   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1285               "Decrypted %u bytes from `%4s' using key %u\n",
1286               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1287 #endif
1288   return GNUNET_OK;
1289 }
1290
1291
1292 /**
1293  * Encrypt size bytes from in and write the result to out.  Use the
1294  * key for outbound traffic of the given neighbour.
1295  *
1296  * @param n neighbour we are sending to
1297  * @param iv initialization vector to use
1298  * @param in ciphertext
1299  * @param out plaintext
1300  * @param size size of in/out
1301  * @return GNUNET_OK on success
1302  */
1303 static int
1304 do_encrypt (struct Neighbour *n,
1305             const GNUNET_HashCode * iv,
1306             const void *in, void *out, size_t size)
1307 {
1308   if (size != (uint16_t) size)
1309     {
1310       GNUNET_break (0);
1311       return GNUNET_NO;
1312     }
1313   GNUNET_assert (size ==
1314                  GNUNET_CRYPTO_aes_encrypt (in,
1315                                             (uint16_t) size,
1316                                             &n->encrypt_key,
1317                                             (const struct
1318                                              GNUNET_CRYPTO_AesInitializationVector
1319                                              *) iv, out));
1320 #if DEBUG_CORE
1321   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1322               "Encrypted %u bytes for `%4s' using key %u\n", size,
1323               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1324 #endif
1325   return GNUNET_OK;
1326 }
1327
1328
1329 /**
1330  * Select messages for transmission.  This heuristic uses a combination
1331  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1332  * and priority-based discard (in case no feasible schedule exist) and
1333  * speculative optimization (defer any kind of transmission until
1334  * we either create a batch of significant size, 25% of max, or until
1335  * we are close to a deadline).  Furthermore, when scheduling the
1336  * heuristic also packs as many messages into the batch as possible,
1337  * starting with those with the earliest deadline.  Yes, this is fun.
1338  *
1339  * @param n neighbour to select messages from
1340  * @param size number of bytes to select for transmission
1341  * @param retry_time set to the time when we should try again
1342  *        (only valid if this function returns zero)
1343  * @return number of bytes selected, or 0 if we decided to
1344  *         defer scheduling overall; in that case, retry_time is set.
1345  */
1346 static size_t
1347 select_messages (struct Neighbour *n,
1348                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1349 {
1350   struct MessageEntry *pos;
1351   struct MessageEntry *min;
1352   struct MessageEntry *last;
1353   unsigned int min_prio;
1354   struct GNUNET_TIME_Absolute t;
1355   struct GNUNET_TIME_Absolute now;
1356   uint64_t delta;
1357   uint64_t avail;
1358   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1359   size_t off;
1360   int discard_low_prio;
1361
1362   GNUNET_assert (NULL != n->messages);
1363   now = GNUNET_TIME_absolute_get ();
1364   /* last entry in linked list of messages processed */
1365   last = NULL;
1366   /* should we remove the entry with the lowest
1367      priority from consideration for scheduling at the
1368      end of the loop? */
1369   discard_low_prio = GNUNET_YES;
1370   while (GNUNET_YES == discard_low_prio)
1371     {
1372       min = NULL;
1373       min_prio = -1;
1374       discard_low_prio = GNUNET_NO;
1375       /* calculate number of bytes available for transmission at time "t" */
1376       update_window (GNUNET_NO,
1377                      &n->available_send_window,
1378                      &n->last_asw_update,
1379                      n->bpm_out);
1380       avail = n->available_send_window;
1381       t = n->last_asw_update;
1382       /* how many bytes have we (hypothetically) scheduled so far */
1383       off = 0;
1384       /* maximum time we can wait before transmitting anything
1385          and still make all of our deadlines */
1386       slack = -1;
1387
1388       pos = n->messages;
1389       /* note that we use "*2" here because we want to look
1390          a bit further into the future; much more makes no
1391          sense since new message might be scheduled in the
1392          meantime... */
1393       while ((pos != NULL) && (off < size * 2))
1394         {
1395           if (pos->do_transmit == GNUNET_YES)
1396             {
1397               /* already removed from consideration */
1398               pos = pos->next;
1399               continue;
1400             }
1401           if (discard_low_prio == GNUNET_NO)
1402             {
1403               delta = pos->deadline.value;
1404               if (delta < t.value)
1405                 delta = 0;
1406               else
1407                 delta = t.value - delta;
1408               avail += delta * n->bpm_out / 1000 / 60;
1409               if (avail < pos->size)
1410                 {
1411                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1412                 }
1413               else
1414                 {
1415                   avail -= pos->size;
1416                   /* update slack, considering both its absolute deadline
1417                      and relative deadlines caused by other messages
1418                      with their respective load */
1419                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1420                   if (pos->deadline.value < now.value)
1421                     slack = 0;
1422                   else
1423                     slack =
1424                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1425                 }
1426             }
1427           off += pos->size;
1428           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1429           if (pos->priority <= min_prio)
1430             {
1431               /* update min for discard */
1432               min_prio = pos->priority;
1433               min = pos;
1434             }
1435           pos = pos->next;
1436         }
1437       if (discard_low_prio)
1438         {
1439           GNUNET_assert (min != NULL);
1440           /* remove lowest-priority entry from consideration */
1441           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1442         }
1443       last = pos;
1444     }
1445   /* guard against sending "tiny" messages with large headers without
1446      urgent deadlines */
1447   if ( (slack > 1000) && (size > 4 * off) )
1448     {
1449       /* less than 25% of message would be filled with
1450          deadlines still being met if we delay by one
1451          second or more; so just wait for more data */
1452       retry_time->value = slack / 2;
1453       /* reset do_transmit values for next time */
1454       while (pos != last)
1455         {
1456           pos->do_transmit = GNUNET_NO;
1457           pos = pos->next;
1458         }
1459       return 0;
1460     }
1461   /* select marked messages (up to size) for transmission */
1462   off = 0;
1463   pos = n->messages;
1464   while (pos != last)
1465     {
1466       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1467         {
1468           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1469           off += pos->size;
1470           size -= pos->size;
1471         }
1472       else
1473         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1474       pos = pos->next;
1475     }
1476 #if DEBUG_CORE
1477   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1478               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1479               off, GNUNET_i2s (&n->peer));
1480 #endif
1481   return off;
1482 }
1483
1484
1485 /**
1486  * Batch multiple messages into a larger buffer.
1487  *
1488  * @param n neighbour to take messages from
1489  * @param buf target buffer
1490  * @param size size of buf
1491  * @param deadline set to transmission deadline for the result
1492  * @param retry_time set to the time when we should try again
1493  *        (only valid if this function returns zero)
1494  * @param priority set to the priority of the batch
1495  * @return number of bytes written to buf (can be zero)
1496  */
1497 static size_t
1498 batch_message (struct Neighbour *n,
1499                char *buf,
1500                size_t size,
1501                struct GNUNET_TIME_Absolute *deadline,
1502                struct GNUNET_TIME_Relative *retry_time,
1503                unsigned int *priority)
1504 {
1505   struct MessageEntry *pos;
1506   struct MessageEntry *prev;
1507   struct MessageEntry *next;
1508   size_t ret;
1509
1510   ret = 0;
1511   *priority = 0;
1512   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1513   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1514   if (0 == select_messages (n, size, retry_time))
1515     {
1516       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1517                   "No messages selected, will try again in %llu ms\n",
1518                   retry_time->value);
1519       return 0;
1520     }
1521   pos = n->messages;
1522   prev = NULL;
1523   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1524     {
1525       next = pos->next;
1526       if (GNUNET_YES == pos->do_transmit)
1527         {
1528           GNUNET_assert (pos->size <= size);
1529           memcpy (&buf[ret], &pos[1], pos->size);
1530           ret += pos->size;
1531           size -= pos->size;
1532           *priority += pos->priority;
1533           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1534           GNUNET_free (pos);
1535           if (prev == NULL)
1536             n->messages = next;
1537           else
1538             prev->next = next;
1539         }
1540       else
1541         {
1542           prev = pos;
1543         }
1544       pos = next;
1545     }
1546   return ret;
1547 }
1548
1549
1550 /**
1551  * Remove messages with deadlines that have long expired from
1552  * the queue.
1553  *
1554  * @param n neighbour to inspect
1555  */
1556 static void
1557 discard_expired_messages (struct Neighbour *n)
1558 {
1559   struct MessageEntry *prev;
1560   struct MessageEntry *next;
1561   struct MessageEntry *pos;
1562   struct GNUNET_TIME_Absolute now;
1563   struct GNUNET_TIME_Relative delta;
1564
1565   now = GNUNET_TIME_absolute_get ();
1566   prev = NULL;
1567   pos = n->messages;
1568   while (pos != NULL) 
1569     {
1570       next = pos->next;
1571       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1572       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1573         {
1574 #if DEBUG_CORE
1575           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1576                       "Message is %llu ms past due, discarding.\n",
1577                       delta.value);
1578 #endif
1579           if (prev == NULL)
1580             n->messages = next;
1581           else
1582             prev->next = next;
1583           GNUNET_free (pos);
1584         }
1585       else
1586         prev = pos;
1587       pos = next;
1588     }
1589 }
1590
1591
1592 /**
1593  * Signature of the main function of a task.
1594  *
1595  * @param cls closure
1596  * @param tc context information (why was this task triggered now)
1597  */
1598 static void
1599 retry_plaintext_processing (void *cls,
1600                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1601 {
1602   struct Neighbour *n = cls;
1603
1604   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1605   process_plaintext_neighbour_queue (n);
1606 }
1607
1608
1609 /**
1610  * Send our key (and encrypted PING) to the other peer.
1611  *
1612  * @param n the other peer
1613  */
1614 static void send_key (struct Neighbour *n);
1615
1616
1617 /**
1618  * Check if we have plaintext messages for the specified neighbour
1619  * pending, and if so, consider batching and encrypting them (and
1620  * then trigger processing of the encrypted queue if needed).
1621  *
1622  * @param n neighbour to check.
1623  */
1624 static void
1625 process_plaintext_neighbour_queue (struct Neighbour *n)
1626 {
1627   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1628   size_t used;
1629   size_t esize;
1630   struct EncryptedMessage *em;  /* encrypted message */
1631   struct EncryptedMessage *ph;  /* plaintext header */
1632   struct MessageEntry *me;
1633   unsigned int priority;
1634   struct GNUNET_TIME_Absolute deadline;
1635   struct GNUNET_TIME_Relative retry_time;
1636
1637   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1638     {
1639       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1640       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1641     }
1642   switch (n->status)
1643     {
1644     case PEER_STATE_DOWN:
1645       send_key (n);
1646 #if DEBUG_CORE
1647       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1648                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1649                   GNUNET_i2s(&n->peer));
1650 #endif
1651       return;
1652     case PEER_STATE_KEY_SENT:
1653       GNUNET_assert (n->retry_set_key_task !=
1654                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1655 #if DEBUG_CORE
1656       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1657                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1658                   GNUNET_i2s(&n->peer));
1659 #endif
1660       return;
1661     case PEER_STATE_KEY_RECEIVED:
1662       GNUNET_assert (n->retry_set_key_task !=
1663                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1664 #if DEBUG_CORE
1665       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1666                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1667                   GNUNET_i2s(&n->peer));
1668 #endif
1669       return;
1670     case PEER_STATE_KEY_CONFIRMED:
1671       /* ready to continue */
1672       break;
1673     }
1674   discard_expired_messages (n);
1675   if (n->messages == NULL)
1676     {
1677 #if DEBUG_CORE
1678       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1679                   "Plaintext message queue for `%4s' is empty.\n",
1680                   GNUNET_i2s(&n->peer));
1681 #endif
1682       return;                   /* no pending messages */
1683     }
1684   if (n->encrypted_head != NULL)
1685     {
1686 #if DEBUG_CORE
1687       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1688                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1689                   GNUNET_i2s(&n->peer));
1690 #endif
1691       return;                   /* wait for messages already encrypted to be
1692                                    processed first! */
1693     }
1694   ph = (struct EncryptedMessage *) pbuf;
1695   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1696   priority = 0;
1697   used = sizeof (struct EncryptedMessage);
1698   used += batch_message (n,
1699                          &pbuf[used],
1700                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1701                          &deadline, &retry_time, &priority);
1702   if (used == sizeof (struct EncryptedMessage))
1703     {
1704 #if DEBUG_CORE
1705       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1706                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1707                   GNUNET_i2s(&n->peer));
1708 #endif
1709       /* no messages selected for sending, try again later... */
1710       n->retry_plaintext_task =
1711         GNUNET_SCHEDULER_add_delayed (sched,
1712                                       GNUNET_NO,
1713                                       GNUNET_SCHEDULER_PRIORITY_IDLE,
1714                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1715                                       retry_time,
1716                                       &retry_plaintext_processing, n);
1717       return;
1718     }
1719
1720   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1721   ph->inbound_bpm_limit = htonl (n->bpm_in);
1722   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1723
1724   /* setup encryption message header */
1725   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1726   me->deadline = deadline;
1727   me->priority = priority;
1728   me->size = used;
1729   em = (struct EncryptedMessage *) &me[1];
1730   em->header.size = htons (used);
1731   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1732   em->reserved = htonl (0);
1733   esize = used - ENCRYPTED_HEADER_SIZE;
1734   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1735   /* encrypt */
1736 #if DEBUG_CORE
1737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1738               "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
1739               esize,
1740               GNUNET_i2s(&n->peer));
1741 #endif
1742   GNUNET_assert (GNUNET_OK ==
1743                  do_encrypt (n,
1744                              &em->plaintext_hash,
1745                              &ph->sequence_number,
1746                              &em->sequence_number, esize));
1747   /* append to transmission list */
1748   if (n->encrypted_tail == NULL)
1749     n->encrypted_head = me;
1750   else
1751     n->encrypted_tail->next = me;
1752   n->encrypted_tail = me;
1753   process_encrypted_neighbour_queue (n);
1754 }
1755
1756
1757 /**
1758  * Handle CORE_SEND request.
1759  */
1760 static void
1761 handle_client_send (void *cls,
1762                     struct GNUNET_SERVER_Client *client,
1763                     const struct GNUNET_MessageHeader *message);
1764
1765
1766 /**
1767  * Function called to notify us that we either succeeded
1768  * or failed to connect (at the transport level) to another
1769  * peer.  We should either free the message we were asked
1770  * to transmit or re-try adding it to the queue.
1771  *
1772  * @param cls closure
1773  * @param size number of bytes available in buf
1774  * @param buf where the callee should write the message
1775  * @return number of bytes written to buf
1776  */
1777 static size_t
1778 send_connect_continuation (void *cls, size_t size, void *buf)
1779 {
1780   struct SendMessage *sm = cls;
1781
1782   if (buf == NULL)
1783     {
1784 #if DEBUG_CORE
1785       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1786                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1787                   GNUNET_i2s (&sm->peer));
1788 #endif
1789       GNUNET_free (sm);
1790       return 0;
1791     }
1792 #if DEBUG_CORE
1793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1794               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1795               GNUNET_i2s (&sm->peer));
1796 #endif
1797   handle_client_send (NULL, NULL, &sm->header);
1798   GNUNET_free (sm);
1799   return 0;
1800 }
1801
1802
1803 /**
1804  * Handle CORE_SEND request.
1805  */
1806 static void
1807 handle_client_send (void *cls,
1808                     struct GNUNET_SERVER_Client *client,
1809                     const struct GNUNET_MessageHeader *message)
1810 {
1811   const struct SendMessage *sm;
1812   struct SendMessage *smc;
1813   const struct GNUNET_MessageHeader *mh;
1814   struct Neighbour *n;
1815   struct MessageEntry *prev;
1816   struct MessageEntry *pos;
1817   struct MessageEntry *e; 
1818   struct MessageEntry *min_prio_entry;
1819   struct MessageEntry *min_prio_prev;
1820   unsigned int min_prio;
1821   unsigned int queue_size;
1822   uint16_t msize;
1823
1824   msize = ntohs (message->size);
1825   if (msize <
1826       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1827     {
1828       GNUNET_break (0);
1829       if (client != NULL)
1830         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1831       return;
1832     }
1833   sm = (const struct SendMessage *) message;
1834   msize -= sizeof (struct SendMessage);
1835   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1836   if (msize != ntohs (mh->size))
1837     {
1838       GNUNET_break (0);
1839       if (client != NULL)
1840         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1841       return;
1842     }
1843   n = find_neighbour (&sm->peer);
1844   if (n == NULL)
1845     {
1846 #if DEBUG_CORE
1847       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1848                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1849                   "SEND",
1850                   GNUNET_i2s (&sm->peer),
1851                   GNUNET_TIME_absolute_get_remaining
1852                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1853 #endif
1854       msize += sizeof (struct SendMessage);
1855       /* ask transport to connect to the peer */
1856       smc = GNUNET_malloc (msize);
1857       memcpy (smc, sm, msize);
1858       if (NULL ==
1859           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1860                                                   &sm->peer,
1861                                                   0, 0,
1862                                                   GNUNET_TIME_absolute_get_remaining
1863                                                   (GNUNET_TIME_absolute_ntoh
1864                                                    (sm->deadline)),
1865                                                   &send_connect_continuation,
1866                                                   smc))
1867         {
1868           /* transport has already a request pending for this peer! */
1869 #if DEBUG_CORE
1870           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1871                       "Dropped second message destined for `%4s' since connection is still down.\n",
1872                       GNUNET_i2s(&sm->peer));
1873 #endif
1874           GNUNET_free (smc);
1875         }
1876       if (client != NULL)
1877         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1878       return;
1879     }
1880 #if DEBUG_CORE
1881   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1882               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1883               "SEND",
1884               msize, 
1885               GNUNET_i2s (&sm->peer));
1886 #endif
1887   /* bound queue size */
1888   discard_expired_messages (n);
1889   min_prio = (unsigned int) -1;
1890   queue_size = 0;
1891   prev = NULL;
1892   pos = n->messages;
1893   while (pos != NULL) 
1894     {
1895       if (pos->priority < min_prio)
1896         {
1897           min_prio_entry = pos;
1898           min_prio_prev = prev;
1899           min_prio = pos->priority;
1900         }
1901       queue_size++;
1902       prev = pos;
1903       pos = pos->next;
1904     }
1905   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1906     {
1907       /* queue full */
1908       if (ntohl(sm->priority) <= min_prio)
1909         {
1910           /* discard new entry */
1911 #if DEBUG_CORE
1912           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1913                       "Queue full, discarding new request\n");
1914 #endif
1915           if (client != NULL)
1916             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1917           return;
1918         }
1919       /* discard "min_prio_entry" */
1920 #if DEBUG_CORE
1921       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1922                   "Queue full, discarding existing older request\n");
1923 #endif
1924       if (min_prio_prev == NULL)
1925         n->messages = min_prio_entry->next;
1926       else
1927         min_prio_prev->next = min_prio_entry->next;      
1928       GNUNET_free (min_prio_entry);     
1929     }
1930   
1931   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1932   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1933   e->priority = ntohl (sm->priority);
1934   e->size = msize;
1935   memcpy (&e[1], mh, msize);
1936
1937   /* insert, keep list sorted by deadline */
1938   prev = NULL;
1939   pos = n->messages;
1940   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1941     {
1942       prev = pos;
1943       pos = pos->next;
1944     }
1945   if (prev == NULL)
1946     n->messages = e;
1947   else
1948     prev->next = e;
1949   e->next = pos;
1950
1951   /* consider scheduling now */
1952   process_plaintext_neighbour_queue (n);
1953   if (client != NULL)
1954     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1955 }
1956
1957
1958 /**
1959  * List of handlers for the messages understood by this
1960  * service.
1961  */
1962 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1963   {&handle_client_init, NULL,
1964    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1965   {&handle_client_request_configure, NULL,
1966    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
1967    sizeof (struct RequestConfigureMessage)},
1968   {&handle_client_send, NULL,
1969    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1970   {NULL, NULL, 0, 0}
1971 };
1972
1973
1974 /**
1975  * PEERINFO is giving us a HELLO for a peer.  Add the
1976  * public key to the neighbour's struct and retry
1977  * send_key.  Or, if we did not get a HELLO, just do
1978  * nothing.
1979  *
1980  * @param cls NULL
1981  * @param peer the peer for which this is the HELLO
1982  * @param hello HELLO message of that peer
1983  * @param trust amount of trust we currently have in that peer
1984  */
1985 static void
1986 process_hello_retry_send_key (void *cls,
1987                               const struct GNUNET_PeerIdentity *peer,
1988                               const struct GNUNET_HELLO_Message *hello,
1989                               uint32_t trust)
1990 {
1991   struct Neighbour *n;
1992
1993   if (peer == NULL)
1994     return;
1995   n = find_neighbour (peer);
1996   if (n == NULL)
1997     return;
1998   if (n->public_key != NULL)
1999     return;
2000 #if DEBUG_CORE
2001   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2002               "Received new `%s' message for `%4s', initiating key exchange.\n",
2003               "HELLO",
2004               GNUNET_i2s (peer));
2005 #endif
2006   n->public_key =
2007     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2008   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2009     {
2010       GNUNET_free (n->public_key);
2011       n->public_key = NULL;
2012       return;
2013     }
2014   send_key (n);
2015 }
2016
2017
2018 /**
2019  * Task that will retry "send_key" if our previous attempt failed
2020  * to yield a PONG.
2021  */
2022 static void
2023 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2024 {
2025   struct Neighbour *n = cls;
2026
2027   GNUNET_assert (n->status != PEER_STATE_KEY_CONFIRMED);
2028   n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2029   n->set_key_retry_frequency =
2030     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2031   send_key (n);
2032 }
2033
2034
2035 /**
2036  * Send our key (and encrypted PING) to the other peer.
2037  *
2038  * @param n the other peer
2039  */
2040 static void
2041 send_key (struct Neighbour *n)
2042 {
2043   struct SetKeyMessage *sm;
2044   struct MessageEntry *me;
2045   struct PingMessage pp;
2046   struct PingMessage *pm;
2047
2048 #if DEBUG_CORE
2049   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2050               "Asked to perform key exchange with `%4s'.\n",
2051               GNUNET_i2s (&n->peer));
2052 #endif
2053   if (n->public_key == NULL)
2054     {
2055       /* lookup n's public key, then try again */
2056 #if DEBUG_CORE
2057       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2058                   "Lacking public key for `%4s', trying to obtain one.\n",
2059                   GNUNET_i2s (&n->peer));
2060 #endif
2061       GNUNET_PEERINFO_for_all (cfg,
2062                                sched,
2063                                &n->peer,
2064                                0,
2065                                GNUNET_TIME_UNIT_MINUTES,
2066                                &process_hello_retry_send_key, NULL);
2067       return;
2068     }
2069   /* first, set key message */
2070   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2071                       sizeof (struct SetKeyMessage));
2072   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2073   me->priority = SET_KEY_PRIORITY;
2074   me->size = sizeof (struct SetKeyMessage);
2075   if (n->encrypted_head == NULL)
2076     n->encrypted_head = me;
2077   else
2078     n->encrypted_tail->next = me;
2079   n->encrypted_tail = me;
2080   sm = (struct SetKeyMessage *) &me[1];
2081   sm->header.size = htons (sizeof (struct SetKeyMessage));
2082   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2083   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2084                                         PEER_STATE_KEY_SENT : n->status));
2085   sm->purpose.size =
2086     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2087            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2088            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2089            sizeof (struct GNUNET_PeerIdentity));
2090   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2091   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2092   sm->target = n->peer;
2093   GNUNET_assert (GNUNET_OK ==
2094                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2095                                             sizeof (struct
2096                                                     GNUNET_CRYPTO_AesSessionKey),
2097                                             n->public_key,
2098                                             &sm->encrypted_key));
2099   GNUNET_assert (GNUNET_OK ==
2100                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2101                                          &sm->signature));
2102
2103   /* second, encrypted PING message */
2104   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2105                       sizeof (struct PingMessage));
2106   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2107   me->priority = PING_PRIORITY;
2108   me->size = sizeof (struct PingMessage);
2109   n->encrypted_tail->next = me;
2110   n->encrypted_tail = me;
2111   pm = (struct PingMessage *) &me[1];
2112   pm->header.size = htons (sizeof (struct PingMessage));
2113   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2114   pp.challenge = htonl (n->ping_challenge);
2115   pp.target = n->peer;
2116 #if DEBUG_CORE
2117   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2118               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2119               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2120   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2121               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2122               "PING",
2123               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2124 #endif
2125   do_encrypt (n,
2126               &n->peer.hashPubKey,
2127               &pp.challenge,
2128               &pm->challenge,
2129               sizeof (struct PingMessage) -
2130               sizeof (struct GNUNET_MessageHeader));
2131   /* update status */
2132   switch (n->status)
2133     {
2134     case PEER_STATE_DOWN:
2135       n->status = PEER_STATE_KEY_SENT;
2136       break;
2137     case PEER_STATE_KEY_SENT:
2138       break;
2139     case PEER_STATE_KEY_RECEIVED:
2140       break;
2141     case PEER_STATE_KEY_CONFIRMED:
2142       GNUNET_break (0);
2143       break;
2144     default:
2145       GNUNET_break (0);
2146       break;
2147     }
2148   /* trigger queue processing */
2149   process_encrypted_neighbour_queue (n);
2150   if (n->status != PEER_STATE_KEY_CONFIRMED)
2151     n->retry_set_key_task
2152       = GNUNET_SCHEDULER_add_delayed (sched,
2153                                       GNUNET_NO,
2154                                       GNUNET_SCHEDULER_PRIORITY_KEEP,
2155                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2156                                       n->set_key_retry_frequency,
2157                                       &set_key_retry_task, n);
2158 }
2159
2160
2161 /**
2162  * We received a SET_KEY message.  Validate and update
2163  * our key material and status.
2164  *
2165  * @param n the neighbour from which we received message m
2166  * @param m the set key message we received
2167  */
2168 static void
2169 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2170
2171
2172 /**
2173  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2174  * the neighbour's struct and retry handling the set_key message.  Or,
2175  * if we did not get a HELLO, just free the set key message.
2176  *
2177  * @param cls pointer to the set key message
2178  * @param peer the peer for which this is the HELLO
2179  * @param hello HELLO message of that peer
2180  * @param trust amount of trust we currently have in that peer
2181  */
2182 static void
2183 process_hello_retry_handle_set_key (void *cls,
2184                                     const struct GNUNET_PeerIdentity *peer,
2185                                     const struct GNUNET_HELLO_Message *hello,
2186                                     uint32_t trust)
2187 {
2188   struct SetKeyMessage *sm = cls;
2189   struct Neighbour *n;
2190
2191   if (peer == NULL)
2192     {
2193       GNUNET_free (sm);
2194       return;
2195     }
2196   n = find_neighbour (peer);
2197   if (n == NULL)
2198     {
2199       GNUNET_break (0);
2200       return;
2201     }
2202   if (n->public_key != NULL)
2203     return;                     /* multiple HELLOs match!? */
2204   n->public_key =
2205     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2206   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2207     {
2208       GNUNET_break_op (0);
2209       GNUNET_free (n->public_key);
2210       n->public_key = NULL;
2211       return;
2212     }
2213 #if DEBUG_CORE
2214   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2215               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2216               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2217 #endif
2218   handle_set_key (n, sm);
2219 }
2220
2221
2222 /**
2223  * We received a PING message.  Validate and transmit
2224  * PONG.
2225  *
2226  * @param n sender of the PING
2227  * @param m the encrypted PING message itself
2228  */
2229 static void
2230 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2231 {
2232   struct PingMessage t;
2233   struct PingMessage *tp;
2234   struct MessageEntry *me;
2235
2236 #if DEBUG_CORE
2237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2238               "Core service receives `%s' request from `%4s'.\n",
2239               "PING", GNUNET_i2s (&n->peer));
2240 #endif
2241   if (GNUNET_OK !=
2242       do_decrypt (n,
2243                   &my_identity.hashPubKey,
2244                   &m->challenge,
2245                   &t.challenge,
2246                   sizeof (struct PingMessage) -
2247                   sizeof (struct GNUNET_MessageHeader)))
2248     return;
2249 #if DEBUG_CORE
2250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2251               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2252               "PING",
2253               GNUNET_i2s (&t.target),
2254               ntohl (t.challenge), n->decrypt_key.crc32);
2255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2256               "Target of `%s' request is `%4s'.\n",
2257               "PING", GNUNET_i2s (&t.target));
2258 #endif
2259   if (0 != memcmp (&t.target,
2260                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2261     {
2262       GNUNET_break_op (0);
2263       return;
2264     }
2265   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2266                       sizeof (struct PingMessage));
2267   if (n->encrypted_tail != NULL)
2268     n->encrypted_tail->next = me;
2269   else
2270     {
2271       n->encrypted_tail = me;
2272       n->encrypted_head = me;
2273     }
2274   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2275   me->priority = PONG_PRIORITY;
2276   me->size = sizeof (struct PingMessage);
2277   tp = (struct PingMessage *) &me[1];
2278   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2279   tp->header.size = htons (sizeof (struct PingMessage));
2280   do_encrypt (n,
2281               &my_identity.hashPubKey,
2282               &t.challenge,
2283               &tp->challenge,
2284               sizeof (struct PingMessage) -
2285               sizeof (struct GNUNET_MessageHeader));
2286 #if DEBUG_CORE
2287   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2288               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2289               ntohl (t.challenge), n->encrypt_key.crc32);
2290 #endif
2291   /* trigger queue processing */
2292   process_encrypted_neighbour_queue (n);
2293 }
2294
2295
2296 /**
2297  * We received a SET_KEY message.  Validate and update
2298  * our key material and status.
2299  *
2300  * @param n the neighbour from which we received message m
2301  * @param m the set key message we received
2302  */
2303 static void
2304 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2305 {
2306   struct SetKeyMessage *m_cpy;
2307   struct GNUNET_TIME_Absolute t;
2308   struct GNUNET_CRYPTO_AesSessionKey k;
2309   struct PingMessage *ping;
2310   enum PeerStateMachine sender_status;
2311
2312 #if DEBUG_CORE
2313   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2314               "Core service receives `%s' request from `%4s'.\n",
2315               "SET_KEY", GNUNET_i2s (&n->peer));
2316 #endif
2317   if (n->public_key == NULL)
2318     {
2319 #if DEBUG_CORE
2320       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2321                   "Lacking public key for peer, trying to obtain one.\n");
2322 #endif
2323       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2324       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2325       /* lookup n's public key, then try again */
2326       GNUNET_PEERINFO_for_all (cfg,
2327                                sched,
2328                                &n->peer,
2329                                0,
2330                                GNUNET_TIME_UNIT_MINUTES,
2331                                &process_hello_retry_handle_set_key, m_cpy);
2332       return;
2333     }
2334   if ((ntohl (m->purpose.size) !=
2335        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2336        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2337        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2338        sizeof (struct GNUNET_PeerIdentity)) ||
2339       (GNUNET_OK !=
2340        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2341                                  &m->purpose, &m->signature, n->public_key)))
2342     {
2343       /* invalid signature */
2344       GNUNET_break_op (0);
2345       return;
2346     }
2347   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2348   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2349        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2350       (t.value < n->decrypt_key_created.value))
2351     {
2352       /* this could rarely happen due to massive re-ordering of
2353          messages on the network level, but is most likely either
2354          a bug or some adversary messing with us.  Report. */
2355       GNUNET_break_op (0);
2356       return;
2357     }
2358 #if DEBUG_CORE
2359   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2360 #endif
2361   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2362                                   &m->encrypted_key,
2363                                   &k,
2364                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2365        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2366       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2367     {
2368       /* failed to decrypt !? */
2369       GNUNET_break_op (0);
2370       return;
2371     }
2372
2373   n->decrypt_key = k;
2374   if (n->decrypt_key_created.value != t.value)
2375     {
2376       /* fresh key, reset sequence numbers */
2377       n->last_sequence_number_received = 0;
2378       n->last_packets_bitmap = 0;
2379       n->decrypt_key_created = t;
2380     }
2381   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2382   switch (n->status)
2383     {
2384     case PEER_STATE_DOWN:
2385       n->status = PEER_STATE_KEY_RECEIVED;
2386 #if DEBUG_CORE
2387       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2388                   "Responding to `%s' with my own key.\n", "SET_KEY");
2389 #endif
2390       send_key (n);
2391       break;
2392     case PEER_STATE_KEY_SENT:
2393     case PEER_STATE_KEY_RECEIVED:
2394       n->status = PEER_STATE_KEY_RECEIVED;
2395       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2396           (sender_status != PEER_STATE_KEY_CONFIRMED))
2397         {
2398 #if DEBUG_CORE
2399           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2400                       "Responding to `%s' with my own key (other peer has status %u).\n",
2401                       "SET_KEY", sender_status);
2402 #endif
2403           send_key (n);
2404         }
2405       break;
2406     case PEER_STATE_KEY_CONFIRMED:
2407       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2408           (sender_status != PEER_STATE_KEY_CONFIRMED))
2409         {
2410 #if DEBUG_CORE
2411           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2412                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2413                       "SET_KEY", sender_status);
2414 #endif
2415           send_key (n);
2416         }
2417       break;
2418     default:
2419       GNUNET_break (0);
2420       break;
2421     }
2422   if (n->pending_ping != NULL)
2423     {
2424       ping = n->pending_ping;
2425       n->pending_ping = NULL;
2426       handle_ping (n, ping);
2427       GNUNET_free (ping);
2428     }
2429 }
2430
2431
2432 /**
2433  * We received a PONG message.  Validate and update
2434  * our status.
2435  *
2436  * @param n sender of the PONG
2437  * @param m the encrypted PONG message itself
2438  */
2439 static void
2440 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2441 {
2442   struct PingMessage t;
2443
2444 #if DEBUG_CORE
2445   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2446               "Core service receives `%s' request from `%4s'.\n",
2447               "PONG", GNUNET_i2s (&n->peer));
2448 #endif
2449   if (GNUNET_OK !=
2450       do_decrypt (n,
2451                   &n->peer.hashPubKey,
2452                   &m->challenge,
2453                   &t.challenge,
2454                   sizeof (struct PingMessage) -
2455                   sizeof (struct GNUNET_MessageHeader)))
2456     return;
2457 #if DEBUG_CORE
2458   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2459               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2460               "PONG",
2461               GNUNET_i2s (&t.target),
2462               ntohl (t.challenge), n->decrypt_key.crc32);
2463 #endif
2464   if ((0 != memcmp (&t.target,
2465                     &n->peer,
2466                     sizeof (struct GNUNET_PeerIdentity))) ||
2467       (n->ping_challenge != ntohl (t.challenge)))
2468     {
2469       /* PONG malformed */
2470 #if DEBUG_CORE
2471       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2472                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2473                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2474       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2475                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2476                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2477 #endif
2478       GNUNET_break_op (0);
2479       return;
2480     }
2481   switch (n->status)
2482     {
2483     case PEER_STATE_DOWN:
2484       GNUNET_break (0);         /* should be impossible */
2485       return;
2486     case PEER_STATE_KEY_SENT:
2487       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2488       return;
2489     case PEER_STATE_KEY_RECEIVED:
2490       n->status = PEER_STATE_KEY_CONFIRMED;
2491       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
2492         {
2493           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2494           n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2495         }
2496       process_encrypted_neighbour_queue (n);
2497       break;
2498     case PEER_STATE_KEY_CONFIRMED:
2499       /* duplicate PONG? */
2500       break;
2501     default:
2502       GNUNET_break (0);
2503       break;
2504     }
2505 }
2506
2507
2508 /**
2509  * Send a P2P message to a client.
2510  *
2511  * @param sender who sent us the message?
2512  * @param client who should we give the message to?
2513  * @param m contains the message to transmit
2514  * @param msize number of bytes in buf to transmit
2515  */
2516 static void
2517 send_p2p_message_to_client (struct Neighbour *sender,
2518                             struct Client *client,
2519                             const void *m, size_t msize)
2520 {
2521   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2522   struct NotifyTrafficMessage *ntm;
2523
2524 #if DEBUG_CORE
2525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2526               "Core service passes message from `%4s' of type %u to client.\n",
2527               GNUNET_i2s(&sender->peer),
2528               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2529 #endif
2530   ntm = (struct NotifyTrafficMessage *) buf;
2531   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2532   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2533   ntm->reserved = htonl (0);
2534   ntm->peer = sender->peer;
2535   memcpy (&ntm[1], m, msize);
2536   send_to_client (client, &ntm->header, GNUNET_YES);
2537 }
2538
2539
2540 /**
2541  * Deliver P2P message to interested clients.
2542  *
2543  * @param sender who sent us the message?
2544  * @param m the message
2545  * @param msize size of the message (including header)
2546  */
2547 static void
2548 deliver_message (struct Neighbour *sender,
2549                  const struct GNUNET_MessageHeader *m, size_t msize)
2550 {
2551   struct Client *cpos;
2552   uint16_t type;
2553   unsigned int tpos;
2554   int deliver_full;
2555
2556   type = ntohs (m->type);
2557   cpos = clients;
2558   while (cpos != NULL)
2559     {
2560       deliver_full = GNUNET_NO;
2561       if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
2562         deliver_full = GNUNET_YES;
2563       else
2564         {
2565           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2566             {
2567               if (type != cpos->types[tpos])
2568                 continue;
2569               deliver_full = GNUNET_YES;
2570               break;
2571             }
2572         }
2573       if (GNUNET_YES == deliver_full)
2574         send_p2p_message_to_client (sender, cpos, m, msize);
2575       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2576         send_p2p_message_to_client (sender, cpos, m,
2577                                     sizeof (struct GNUNET_MessageHeader));
2578       cpos = cpos->next;
2579     }
2580 }
2581
2582
2583 /**
2584  * Align P2P message and then deliver to interested clients.
2585  *
2586  * @param sender who sent us the message?
2587  * @param buffer unaligned (!) buffer containing message
2588  * @param msize size of the message (including header)
2589  */
2590 static void
2591 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2592 {
2593   char abuf[msize];
2594
2595   /* TODO: call to statistics? */
2596   memcpy (abuf, buffer, msize);
2597   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2598 }
2599
2600
2601 /**
2602  * Deliver P2P messages to interested clients.
2603  *
2604  * @param sender who sent us the message?
2605  * @param buffer buffer containing messages, can be modified
2606  * @param buffer_size size of the buffer (overall)
2607  * @param offset offset where messages in the buffer start
2608  */
2609 static void
2610 deliver_messages (struct Neighbour *sender,
2611                   const char *buffer, size_t buffer_size, size_t offset)
2612 {
2613   struct GNUNET_MessageHeader *mhp;
2614   struct GNUNET_MessageHeader mh;
2615   uint16_t msize;
2616   int need_align;
2617
2618   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2619     {
2620       if (0 != offset % sizeof (uint16_t))
2621         {
2622           /* outch, need to copy to access header */
2623           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2624           mhp = &mh;
2625         }
2626       else
2627         {
2628           /* can access header directly */
2629           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2630         }
2631       msize = ntohs (mhp->size);
2632       if (msize + offset > buffer_size)
2633         {
2634           /* malformed message, header says it is larger than what
2635              would fit into the overall buffer */
2636           GNUNET_break_op (0);
2637           break;
2638         }
2639 #if HAVE_UNALIGNED_64_ACCESS
2640       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2641 #else
2642       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2643 #endif
2644       if (GNUNET_YES == need_align)
2645         align_and_deliver (sender, &buffer[offset], msize);
2646       else
2647         deliver_message (sender,
2648                          (const struct GNUNET_MessageHeader *)
2649                          &buffer[offset], msize);
2650       offset += msize;
2651     }
2652 }
2653
2654
2655 /**
2656  * We received an encrypted message.  Decrypt, validate and
2657  * pass on to the appropriate clients.
2658  */
2659 static void
2660 handle_encrypted_message (struct Neighbour *n,
2661                           const struct EncryptedMessage *m)
2662 {
2663   size_t size = ntohs (m->header.size);
2664   char buf[size];
2665   struct EncryptedMessage *pt;  /* plaintext */
2666   GNUNET_HashCode ph;
2667   size_t off;
2668   uint32_t snum;
2669   struct GNUNET_TIME_Absolute t;
2670
2671 #if DEBUG_CORE
2672   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2673               "Core service receives `%s' request from `%4s'.\n",
2674               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2675 #endif  
2676   /* decrypt */
2677   if (GNUNET_OK !=
2678       do_decrypt (n,
2679                   &m->plaintext_hash,
2680                   &m->sequence_number,
2681                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2682     return;
2683   pt = (struct EncryptedMessage *) buf;
2684
2685   /* validate hash */
2686   GNUNET_CRYPTO_hash (&pt->sequence_number,
2687                       size - ENCRYPTED_HEADER_SIZE, &ph);
2688   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2689     {
2690       /* checksum failed */
2691       GNUNET_break_op (0);
2692       return;
2693     }
2694
2695   /* validate sequence number */
2696   snum = ntohl (pt->sequence_number);
2697   if (n->last_sequence_number_received == snum)
2698     {
2699       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2700                   "Received duplicate message, ignoring.\n");
2701       /* duplicate, ignore */
2702       return;
2703     }
2704   if ((n->last_sequence_number_received > snum) &&
2705       (n->last_sequence_number_received - snum > 32))
2706     {
2707       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2708                   "Received ancient out of sequence message, ignoring.\n");
2709       /* ancient out of sequence, ignore */
2710       return;
2711     }
2712   if (n->last_sequence_number_received > snum)
2713     {
2714       unsigned int rotbit =
2715         1 << (n->last_sequence_number_received - snum - 1);
2716       if ((n->last_packets_bitmap & rotbit) != 0)
2717         {
2718           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2719                       "Received duplicate message, ignoring.\n");
2720           /* duplicate, ignore */
2721           return;
2722         }
2723       n->last_packets_bitmap |= rotbit;
2724     }
2725   if (n->last_sequence_number_received < snum)
2726     {
2727       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2728       n->last_sequence_number_received = snum;
2729     }
2730
2731   /* check timestamp */
2732   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2733   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2734     {
2735       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2736                   _
2737                   ("Message received far too old (%llu ms). Content ignored.\n"),
2738                   GNUNET_TIME_absolute_get_duration (t).value);
2739       return;
2740     }
2741
2742   /* process decrypted message(s) */
2743   update_window (GNUNET_YES,
2744                  &n->available_send_window,
2745                  &n->last_asw_update,
2746                  n->bpm_out);
2747   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2748   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2749                            n->bpm_out_internal_limit);
2750   n->last_activity = GNUNET_TIME_absolute_get ();
2751   off = sizeof (struct EncryptedMessage);
2752   deliver_messages (n, buf, size, off);
2753 }
2754
2755
2756 /**
2757  * Function called by the transport for each received message.
2758  *
2759  * @param cls closure
2760  * @param latency estimated latency for communicating with the
2761  *             given peer
2762  * @param peer (claimed) identity of the other peer
2763  * @param message the message
2764  */
2765 static void
2766 handle_transport_receive (void *cls,
2767                           struct GNUNET_TIME_Relative latency,
2768                           const struct GNUNET_PeerIdentity *peer,
2769                           const struct GNUNET_MessageHeader *message)
2770 {
2771   struct Neighbour *n;
2772   struct GNUNET_TIME_Absolute now;
2773   int up;
2774   uint16_t type;
2775   uint16_t size;
2776
2777 #if DEBUG_CORE
2778   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2779               "Received message of type %u from `%4s', demultiplexing.\n",
2780               ntohs (message->type), GNUNET_i2s (peer));
2781 #endif
2782   n = find_neighbour (peer);
2783   if (n == NULL)
2784     {
2785       GNUNET_break (0);
2786       return;
2787     }
2788   n->last_latency = latency;
2789   up = n->status == PEER_STATE_KEY_CONFIRMED;
2790   type = ntohs (message->type);
2791   size = ntohs (message->size);
2792   switch (type)
2793     {
2794     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2795       if (size != sizeof (struct SetKeyMessage))
2796         {
2797           GNUNET_break_op (0);
2798           return;
2799         }
2800       handle_set_key (n, (const struct SetKeyMessage *) message);
2801       break;
2802     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2803       if (size < sizeof (struct EncryptedMessage) +
2804           sizeof (struct GNUNET_MessageHeader))
2805         {
2806           GNUNET_break_op (0);
2807           return;
2808         }
2809       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2810           (n->status != PEER_STATE_KEY_CONFIRMED))
2811         {
2812           GNUNET_break_op (0);
2813           return;
2814         }
2815       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2816       break;
2817     case GNUNET_MESSAGE_TYPE_CORE_PING:
2818       if (size != sizeof (struct PingMessage))
2819         {
2820           GNUNET_break_op (0);
2821           return;
2822         }
2823       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2824           (n->status != PEER_STATE_KEY_CONFIRMED))
2825         {
2826 #if DEBUG_CORE
2827           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2828                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2829                       "PING", GNUNET_i2s (&n->peer));
2830 #endif
2831           GNUNET_free_non_null (n->pending_ping);
2832           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2833           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2834           return;
2835         }
2836       handle_ping (n, (const struct PingMessage *) message);
2837       break;
2838     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2839       if (size != sizeof (struct PingMessage))
2840         {
2841           GNUNET_break_op (0);
2842           return;
2843         }
2844       if ((n->status != PEER_STATE_KEY_SENT) &&
2845           (n->status != PEER_STATE_KEY_RECEIVED) &&
2846           (n->status != PEER_STATE_KEY_CONFIRMED))
2847         {
2848           /* could not decrypt pong, oops! */
2849           GNUNET_break_op (0);
2850           return;
2851         }
2852       handle_pong (n, (const struct PingMessage *) message);
2853       break;
2854     default:
2855       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2856                   _("Unsupported message of type %u received.\n"), type);
2857       return;
2858     }
2859   if (n->status == PEER_STATE_KEY_CONFIRMED)
2860     {
2861       now = GNUNET_TIME_absolute_get ();
2862       n->last_activity = now;
2863       if (!up)
2864         n->time_established = now;
2865     }
2866 }
2867
2868
2869 /**
2870  * Function that recalculates the bandwidth quota for the
2871  * given neighbour and transmits it to the transport service.
2872  * 
2873  * @param cls neighbour for the quota update
2874  * @param tc context
2875  */
2876 static void
2877 neighbour_quota_update (void *cls,
2878                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2879
2880
2881 /**
2882  * Schedule the task that will recalculate the bandwidth
2883  * quota for this peer (and possibly force a disconnect of
2884  * idle peers by calculating a bandwidth of zero).
2885  */
2886 static void
2887 schedule_quota_update (struct Neighbour *n)
2888 {
2889   GNUNET_assert (n->quota_update_task ==
2890                  GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
2891   n->quota_update_task
2892     = GNUNET_SCHEDULER_add_delayed (sched,
2893                                     GNUNET_NO,
2894                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
2895                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2896                                     QUOTA_UPDATE_FREQUENCY,
2897                                     &neighbour_quota_update,
2898                                     n);
2899 }
2900
2901
2902 /**
2903  * Function that recalculates the bandwidth quota for the
2904  * given neighbour and transmits it to the transport service.
2905  * 
2906  * @param cls neighbour for the quota update
2907  * @param tc context
2908  */
2909 static void
2910 neighbour_quota_update (void *cls,
2911                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2912 {
2913   struct Neighbour *n = cls;
2914   uint32_t q_in;
2915   double pref_rel;
2916   double share;
2917   unsigned long long distributable;
2918   
2919   n->quota_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2920   /* calculate relative preference among all neighbours;
2921      divides by a bit more to avoid division by zero AND to
2922      account for possibility of new neighbours joining any time 
2923      AND to convert to double... */
2924   pref_rel = n->current_preference / (1.0 + preference_sum);
2925   share = 0;
2926   distributable = 0;
2927   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2928     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2929   share = distributable * pref_rel;
2930   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2931   /* check if we want to disconnect for good due to inactivity */
2932   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2933        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2934     q_in = 0; /* force disconnect */
2935   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2936        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2937     {
2938       n->bpm_in = q_in;
2939       GNUNET_TRANSPORT_set_quota (transport,
2940                                   &n->peer,
2941                                   n->bpm_in, 
2942                                   n->bpm_out,
2943                                   GNUNET_TIME_UNIT_FOREVER_REL,
2944                                   NULL, NULL);
2945     }
2946   schedule_quota_update (n);
2947 }
2948
2949
2950 /**
2951  * Function called by transport to notify us that
2952  * a peer connected to us (on the network level).
2953  *
2954  * @param cls closure
2955  * @param peer the peer that connected
2956  * @param latency current latency of the connection
2957  */
2958 static void
2959 handle_transport_notify_connect (void *cls,
2960                                  const struct GNUNET_PeerIdentity *peer,
2961                                  struct GNUNET_TIME_Relative latency)
2962 {
2963   struct Neighbour *n;
2964   struct GNUNET_TIME_Absolute now;
2965   struct ConnectNotifyMessage cnm;
2966
2967   n = find_neighbour (peer);
2968   if (n != NULL)
2969     {
2970       /* duplicate connect notification!? */
2971       GNUNET_break (0);
2972       return;
2973     }
2974   now = GNUNET_TIME_absolute_get ();
2975   n = GNUNET_malloc (sizeof (struct Neighbour));
2976   n->next = neighbours;
2977   neighbours = n;
2978   neighbour_count++;
2979   n->peer = *peer;
2980   n->last_latency = latency;
2981   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2982   n->encrypt_key_created = now;
2983   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2984   n->last_asw_update = now;
2985   n->last_arw_update = now;
2986   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2987   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2988   n->bpm_out_internal_limit = (uint32_t) - 1;
2989   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2990   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2991                                                 (uint32_t) - 1);
2992 #if DEBUG_CORE
2993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2994               "Received connection from `%4s'.\n",
2995               GNUNET_i2s (&n->peer));
2996 #endif
2997   schedule_quota_update (n);
2998   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2999   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
3000   cnm.bpm_available = htonl (n->bpm_out);
3001   cnm.peer = *peer;
3002   cnm.last_activity = GNUNET_TIME_absolute_hton (now);
3003   send_to_all_clients (&cnm.header, GNUNET_YES);
3004 }
3005
3006
3007 /**
3008  * Free the given entry for the neighbour (it has
3009  * already been removed from the list at this point).
3010  *
3011  * @param n neighbour to free
3012  */
3013 static void
3014 free_neighbour (struct Neighbour *n)
3015 {
3016   struct MessageEntry *m;
3017
3018   while (NULL != (m = n->messages))
3019     {
3020       n->messages = m->next;
3021       GNUNET_free (m);
3022     }
3023   while (NULL != (m = n->encrypted_head))
3024     {
3025       n->encrypted_head = m->next;
3026       GNUNET_free (m);
3027     }
3028   if (NULL != n->th)
3029     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3030   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3031     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3032   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3033     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3034   if (n->quota_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3035     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3036   GNUNET_free_non_null (n->public_key);
3037   GNUNET_free_non_null (n->pending_ping);
3038   GNUNET_free (n);
3039 }
3040
3041
3042 /**
3043  * Function called by transport telling us that a peer
3044  * disconnected.
3045  *
3046  * @param cls closure
3047  * @param peer the peer that disconnected
3048  */
3049 static void
3050 handle_transport_notify_disconnect (void *cls,
3051                                     const struct GNUNET_PeerIdentity *peer)
3052 {
3053   struct ConnectNotifyMessage cnm;
3054   struct Neighbour *n;
3055   struct Neighbour *p;
3056
3057 #if DEBUG_CORE
3058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3059               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3060 #endif
3061   p = NULL;
3062   n = neighbours;
3063   while ((n != NULL) &&
3064          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3065     {
3066       p = n;
3067       n = n->next;
3068     }
3069   if (n == NULL)
3070     {
3071       GNUNET_break (0);
3072       return;
3073     }
3074   if (p == NULL)
3075     neighbours = n->next;
3076   else
3077     p->next = n->next;
3078   GNUNET_assert (neighbour_count > 0);
3079   neighbour_count--;
3080   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3081   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3082   cnm.bpm_available = htonl (0);
3083   cnm.peer = *peer;
3084   cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
3085   send_to_all_clients (&cnm.header, GNUNET_YES);
3086   free_neighbour (n);
3087 }
3088
3089
3090 /**
3091  * Last task run during shutdown.  Disconnects us from
3092  * the transport.
3093  */
3094 static void
3095 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3096 {
3097   struct Neighbour *n;
3098   struct Client *c;
3099
3100 #if DEBUG_CORE
3101   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3102               "Core service shutting down.\n");
3103 #endif
3104   GNUNET_assert (transport != NULL);
3105   GNUNET_TRANSPORT_disconnect (transport);
3106   transport = NULL;
3107   while (NULL != (n = neighbours))
3108     {
3109       neighbours = n->next;
3110       GNUNET_assert (neighbour_count > 0);
3111       neighbour_count--;
3112       free_neighbour (n);
3113     }
3114   while (NULL != (c = clients))
3115     handle_client_disconnect (NULL, c->client_handle);
3116 }
3117
3118
3119 /**
3120  * Initiate core service.
3121  *
3122  * @param cls closure
3123  * @param s scheduler to use
3124  * @param serv the initialized server
3125  * @param c configuration to use
3126  */
3127 static void
3128 run (void *cls,
3129      struct GNUNET_SCHEDULER_Handle *s,
3130      struct GNUNET_SERVER_Handle *serv, struct GNUNET_CONFIGURATION_Handle *c)
3131 {
3132 #if 0
3133   unsigned long long qin;
3134   unsigned long long qout;
3135   unsigned long long tneigh;
3136 #endif
3137   char *keyfile;
3138
3139   sched = s;
3140   cfg = c;
3141   /* parse configuration */
3142   if (
3143        (GNUNET_OK !=
3144         GNUNET_CONFIGURATION_get_value_number (c,
3145                                                "CORE",
3146                                                "TOTAL_QUOTA_IN",
3147                                                &bandwidth_target_in)) ||
3148        (GNUNET_OK !=
3149         GNUNET_CONFIGURATION_get_value_number (c,
3150                                                "CORE",
3151                                                "TOTAL_QUOTA_OUT",
3152                                                &bandwidth_target_out)) ||
3153 #if 0
3154        (GNUNET_OK !=
3155         GNUNET_CONFIGURATION_get_value_number (c,
3156                                                "CORE",
3157                                                "YY",
3158                                                &qout)) ||
3159        (GNUNET_OK !=
3160         GNUNET_CONFIGURATION_get_value_number (c,
3161                                                "CORE",
3162                                                "ZZ_LIMIT", &tneigh)) ||
3163 #endif
3164        (GNUNET_OK !=
3165         GNUNET_CONFIGURATION_get_value_filename (c,
3166                                                  "GNUNETD",
3167                                                  "HOSTKEY", &keyfile)))
3168     {
3169       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3170                   _
3171                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3172       GNUNET_SCHEDULER_shutdown (s);
3173       return;
3174     }
3175   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3176   GNUNET_free (keyfile);
3177   if (my_private_key == NULL)
3178     {
3179       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3180                   _("Core service could not access hostkey.  Exiting.\n"));
3181       GNUNET_SCHEDULER_shutdown (s);
3182       return;
3183     }
3184   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3185   GNUNET_CRYPTO_hash (&my_public_key,
3186                       sizeof (my_public_key), &my_identity.hashPubKey);
3187   /* setup notification */
3188   server = serv;
3189   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3190   /* setup transport connection */
3191   transport = GNUNET_TRANSPORT_connect (sched,
3192                                         cfg,
3193                                         NULL,
3194                                         &handle_transport_receive,
3195                                         &handle_transport_notify_connect,
3196                                         &handle_transport_notify_disconnect);
3197   GNUNET_assert (NULL != transport);
3198   GNUNET_SCHEDULER_add_delayed (sched,
3199                                 GNUNET_YES,
3200                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3201                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
3202                                 GNUNET_TIME_UNIT_FOREVER_REL,
3203                                 &cleaning_task, NULL);
3204   /* process client requests */
3205   GNUNET_SERVER_add_handlers (server, handlers);
3206   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3207               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3208 }
3209
3210
3211 /**
3212  * Function called during shutdown.  Clean up our state.
3213  */
3214 static void
3215 cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
3216 {
3217   if (my_private_key != NULL)
3218     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3219 }
3220
3221
3222 /**
3223  * The main function for the transport service.
3224  *
3225  * @param argc number of arguments from the command line
3226  * @param argv command line arguments
3227  * @return 0 ok, 1 on error
3228  */
3229 int
3230 main (int argc, char *const *argv)
3231 {
3232   return (GNUNET_OK ==
3233           GNUNET_SERVICE_run (argc,
3234                               argv,
3235                               "core", &run, NULL, &cleanup, NULL)) ? 0 : 1;
3236 }
3237
3238 /* end of gnunet-service-core.c */