fix
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - not all GNUNET_CORE_OPTION_SEND_* flags are fully supported yet
28  *   (i.e. no SEND_XXX_OUTBOUND).
29  * - 'REQUEST_DISCONNECT' is not implemented (transport API is lacking!)
30  *
31  * Considerations for later:
32  * - check that hostkey used by transport (for HELLOs) is the
33  *   same as the hostkey that we are using!
34  * - add code to send PINGs if we are about to time-out otherwise
35  * - optimize lookup (many O(n) list traversals
36  *   could ideally be changed to O(1) hash map lookups)
37  */
38 #include "platform.h"
39 #include "gnunet_constants.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet_hello_lib.h"
42 #include "gnunet_peerinfo_service.h"
43 #include "gnunet_protocols.h"
44 #include "gnunet_signatures.h"
45 #include "gnunet_transport_service.h"
46 #include "core.h"
47
48
49 /**
50  * Receive and send buffer windows grow over time.  For
51  * how long can 'unused' bandwidth accumulate before we
52  * need to cap it?  (specified in ms).
53  */
54 #define MAX_WINDOW_TIME (5 * 60 * 1000)
55
56 /**
57  * Minimum of bytes per minute (out) to assign to any connected peer.
58  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
59  * sense.
60  */
61 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
62
63 /**
64  * What is the smallest change (in number of bytes per minute)
65  * that we consider significant enough to bother triggering?
66  */
67 #define MIN_BPM_CHANGE 32
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What is the maximum delay for a SET_KEY message?
80  */
81 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
82
83 /**
84  * What how long do we wait for SET_KEY confirmation initially?
85  */
86 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
87
88 /**
89  * What is the maximum delay for a PING message?
90  */
91 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * What is the maximum delay for a PONG message?
95  */
96 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * How often do we recalculate bandwidth quotas?
100  */
101 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
102
103 /**
104  * What is the priority for a SET_KEY message?
105  */
106 #define SET_KEY_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PING message?
110  */
111 #define PING_PRIORITY 0xFFFFFF
112
113 /**
114  * What is the priority for a PONG message?
115  */
116 #define PONG_PRIORITY 0xFFFFFF
117
118 /**
119  * How many messages do we queue per peer at most?
120  */
121 #define MAX_PEER_QUEUE_SIZE 16
122
123 /**
124  * How many non-mandatory messages do we queue per client at most?
125  */
126 #define MAX_CLIENT_QUEUE_SIZE 32
127
128 /**
129  * What is the maximum age of a message for us to consider
130  * processing it?  Note that this looks at the timestamp used
131  * by the other peer, so clock skew between machines does
132  * come into play here.  So this should be picked high enough
133  * so that a little bit of clock skew does not prevent peers
134  * from connecting to us.
135  */
136 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
137
138 /**
139  * What is the maximum size for encrypted messages?  Note that this
140  * number imposes a clear limit on the maximum size of any message.
141  * Set to a value close to 64k but not so close that transports will
142  * have trouble with their headers.
143  */
144 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
145
146
147 /**
148  * State machine for our P2P encryption handshake.  Everyone starts in
149  * "DOWN", if we receive the other peer's key (other peer initiated)
150  * we start in state RECEIVED (since we will immediately send our
151  * own); otherwise we start in SENT.  If we get back a PONG from
152  * within either state, we move up to CONFIRMED (the PONG will always
153  * be sent back encrypted with the key we sent to the other peer).
154  */
155 enum PeerStateMachine
156 {
157   PEER_STATE_DOWN,
158   PEER_STATE_KEY_SENT,
159   PEER_STATE_KEY_RECEIVED,
160   PEER_STATE_KEY_CONFIRMED
161 };
162
163
164 /**
165  * Number of bytes (at the beginning) of "struct EncryptedMessage"
166  * that are NOT encrypted.
167  */
168 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
169
170
171 /**
172  * Encapsulation for encrypted messages exchanged between
173  * peers.  Followed by the actual encrypted data.
174  */
175 struct EncryptedMessage
176 {
177   /**
178    * Message type is either CORE_ENCRYPTED_MESSAGE.
179    */
180   struct GNUNET_MessageHeader header;
181
182   /**
183    * Always zero.
184    */
185   uint32_t reserved GNUNET_PACKED;
186
187   /**
188    * Hash of the plaintext, used to verify message integrity;
189    * ALSO used as the IV for the symmetric cipher!  Everything
190    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
191    * must be set to the offset of the next field.
192    */
193   GNUNET_HashCode plaintext_hash;
194
195   /**
196    * Sequence number, in network byte order.  This field
197    * must be the first encrypted/decrypted field and the
198    * first byte that is hashed for the plaintext hash.
199    */
200   uint32_t sequence_number GNUNET_PACKED;
201
202   /**
203    * Desired bandwidth (how much we should send to this
204    * peer / how much is the sender willing to receive),
205    * in bytes per minute.
206    */
207   uint32_t inbound_bpm_limit GNUNET_PACKED;
208
209   /**
210    * Timestamp.  Used to prevent reply of ancient messages
211    * (recent messages are caught with the sequence number).
212    */
213   struct GNUNET_TIME_AbsoluteNBO timestamp;
214
215 };
216
217 /**
218  * We're sending an (encrypted) PING to the other peer to check if he
219  * can decrypt.  The other peer should respond with a PONG with the
220  * same content, except this time encrypted with the receiver's key.
221  */
222 struct PingMessage
223 {
224   /**
225    * Message type is either CORE_PING or CORE_PONG.
226    */
227   struct GNUNET_MessageHeader header;
228
229   /**
230    * Random number chosen to make reply harder.
231    */
232   uint32_t challenge GNUNET_PACKED;
233
234   /**
235    * Intended target of the PING, used primarily to check
236    * that decryption actually worked.
237    */
238   struct GNUNET_PeerIdentity target;
239 };
240
241
242 /**
243  * Message transmitted to set (or update) a session key.
244  */
245 struct SetKeyMessage
246 {
247
248   /**
249    * Message type is either CORE_SET_KEY.
250    */
251   struct GNUNET_MessageHeader header;
252
253   /**
254    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
255    */
256   int32_t sender_status GNUNET_PACKED;
257
258   /**
259    * Purpose of the signature, will be
260    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
261    */
262   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
263
264   /**
265    * At what time was this key created?
266    */
267   struct GNUNET_TIME_AbsoluteNBO creation_time;
268
269   /**
270    * The encrypted session key.
271    */
272   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
273
274   /**
275    * Who is the intended recipient?
276    */
277   struct GNUNET_PeerIdentity target;
278
279   /**
280    * Signature of the stuff above (starting at purpose).
281    */
282   struct GNUNET_CRYPTO_RsaSignature signature;
283
284 };
285
286
287 /**
288  * Message waiting for transmission. This struct
289  * is followed by the actual content of the message.
290  */
291 struct MessageEntry
292 {
293
294   /**
295    * We keep messages in a linked list (for now).
296    */
297   struct MessageEntry *next;
298
299   /**
300    * By when are we supposed to transmit this message?
301    */
302   struct GNUNET_TIME_Absolute deadline;
303
304   /**
305    * How important is this message to us?
306    */
307   unsigned int priority;
308
309   /**
310    * How long is the message? (number of bytes following
311    * the "struct MessageEntry", but not including the
312    * size of "struct MessageEntry" itself!)
313    */
314   uint16_t size;
315
316   /**
317    * Was this message selected for transmission in the
318    * current round? GNUNET_YES or GNUNET_NO.
319    */
320   int16_t do_transmit;
321
322 };
323
324
325 struct Neighbour
326 {
327   /**
328    * We keep neighbours in a linked list (for now).
329    */
330   struct Neighbour *next;
331
332   /**
333    * Unencrypted messages destined for this peer.
334    */
335   struct MessageEntry *messages;
336
337   /**
338    * Head of the batched, encrypted message queue (already ordered,
339    * transmit starting with the head).
340    */
341   struct MessageEntry *encrypted_head;
342
343   /**
344    * Tail of the batched, encrypted message queue (already ordered,
345    * append new messages to tail)
346    */
347   struct MessageEntry *encrypted_tail;
348
349   /**
350    * Handle for pending requests for transmission to this peer
351    * with the transport service.  NULL if no request is pending.
352    */
353   struct GNUNET_TRANSPORT_TransmitHandle *th;
354
355   /**
356    * Public key of the neighbour, NULL if we don't have it yet.
357    */
358   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
359
360   /**
361    * We received a PING message before we got the "public_key"
362    * (or the SET_KEY).  We keep it here until we have a key
363    * to decrypt it.  NULL if no PING is pending.
364    */
365   struct PingMessage *pending_ping;
366
367   /**
368    * Non-NULL if we are currently looking up HELLOs for this peer.
369    * for this peer.
370    */
371   struct GNUNET_PEERINFO_IteratorContext *pitr;
372
373   /**
374    * SetKeyMessage to transmit, NULL if we are not currently trying
375    * to send one.
376    */
377   struct SetKeyMessage *skm;
378
379   /**
380    * Identity of the neighbour.
381    */
382   struct GNUNET_PeerIdentity peer;
383
384   /**
385    * Key we use to encrypt our messages for the other peer
386    * (initialized by us when we do the handshake).
387    */
388   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
389
390   /**
391    * Key we use to decrypt messages from the other peer
392    * (given to us by the other peer during the handshake).
393    */
394   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
395
396   /**
397    * ID of task used for re-trying plaintext scheduling.
398    */
399   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
400
401   /**
402    * ID of task used for re-trying SET_KEY and PING message.
403    */
404   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
405
406   /**
407    * ID of task used for updating bandwidth quota for this neighbour.
408    */
409   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
410
411   /**
412    * At what time did we generate our encryption key?
413    */
414   struct GNUNET_TIME_Absolute encrypt_key_created;
415
416   /**
417    * At what time did the other peer generate the decryption key?
418    */
419   struct GNUNET_TIME_Absolute decrypt_key_created;
420
421   /**
422    * At what time did we initially establish (as in, complete session
423    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
424    */
425   struct GNUNET_TIME_Absolute time_established;
426
427   /**
428    * At what time did we last receive an encrypted message from the
429    * other peer?  Should be zero if status != KEY_CONFIRMED.
430    */
431   struct GNUNET_TIME_Absolute last_activity;
432
433   /**
434    * Last latency observed from this peer.
435    */
436   struct GNUNET_TIME_Relative last_latency;
437
438   /**
439    * At what frequency are we currently re-trying SET_KEY messages?
440    */
441   struct GNUNET_TIME_Relative set_key_retry_frequency;
442
443   /**
444    * Time of our last update to the "available_send_window".
445    */
446   struct GNUNET_TIME_Absolute last_asw_update;
447
448   /**
449    * Time of our last update to the "available_recv_window".
450    */
451   struct GNUNET_TIME_Absolute last_arw_update;
452
453   /**
454    * Number of bytes that we are eligible to transmit to this
455    * peer at this point.  Incremented every minute by max_out_bpm,
456    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
457    * bandwidth-hogs are sampled at a frequency of about 78s!);
458    * may get negative if we have VERY high priority content.
459    */
460   long long available_send_window; 
461
462   /**
463    * How much downstream capacity of this peer has been reserved for
464    * our traffic?  (Our clients can request that a certain amount of
465    * bandwidth is available for replies to them; this value is used to
466    * make sure that this reserved amount of bandwidth is actually
467    * available).
468    */
469   long long available_recv_window; 
470
471   /**
472    * How valueable were the messages of this peer recently?
473    */
474   unsigned long long current_preference;
475
476   /**
477    * Bit map indicating which of the 32 sequence numbers before the last
478    * were received (good for accepting out-of-order packets and
479    * estimating reliability of the connection)
480    */
481   unsigned int last_packets_bitmap;
482
483   /**
484    * Number of messages in the message queue for this peer.
485    */
486   unsigned int message_queue_size;
487
488   /**
489    * last sequence number received on this connection (highest)
490    */
491   uint32_t last_sequence_number_received;
492
493   /**
494    * last sequence number transmitted
495    */
496   uint32_t last_sequence_number_sent;
497
498   /**
499    * Available bandwidth in for this peer (current target).
500    */
501   uint32_t bpm_in;
502
503   /**
504    * Available bandwidth out for this peer (current target).
505    */
506   uint32_t bpm_out;
507
508   /**
509    * Internal bandwidth limit set for this peer (initially
510    * typically set to "-1").  "bpm_out" is MAX of
511    * "bpm_out_internal_limit" and "bpm_out_external_limit".
512    */
513   uint32_t bpm_out_internal_limit;
514
515   /**
516    * External bandwidth limit set for this peer by the
517    * peer that we are communicating with.  "bpm_out" is MAX of
518    * "bpm_out_internal_limit" and "bpm_out_external_limit".
519    */
520   uint32_t bpm_out_external_limit;
521
522   /**
523    * What was our PING challenge number (for this peer)?
524    */
525   uint32_t ping_challenge;
526
527   /**
528    * What was the last distance to this peer as reported by the transports?
529    * (FIXME: actually set this!)
530    */
531   uint32_t last_distance;
532
533   /**
534    * What is our connection status?
535    */
536   enum PeerStateMachine status;
537
538 };
539
540
541 /**
542  * Events are messages for clients.  The struct
543  * itself is followed by the actual message.
544  */
545 struct Event
546 {
547   /**
548    * This is a linked list.
549    */
550   struct Event *next;
551
552   /**
553    * Size of the message.
554    */
555   size_t size;
556
557   /**
558    * Could this event be dropped if this queue
559    * is getting too large? (NOT YET USED!)
560    */
561   int can_drop;
562
563 };
564
565
566 /**
567  * Data structure for each client connected to the core service.
568  */
569 struct Client
570 {
571   /**
572    * Clients are kept in a linked list.
573    */
574   struct Client *next;
575
576   /**
577    * Handle for the client with the server API.
578    */
579   struct GNUNET_SERVER_Client *client_handle;
580
581   /**
582    * Linked list of messages we still need to deliver to
583    * this client.
584    */
585   struct Event *event_head;
586
587   /**
588    * Tail of the linked list of events.
589    */
590   struct Event *event_tail;
591
592   /**
593    * Current transmit handle, NULL if no transmission request
594    * is pending.
595    */
596   struct GNUNET_CONNECTION_TransmitHandle *th;
597
598   /**
599    * Array of the types of messages this peer cares
600    * about (with "tcnt" entries).  Allocated as part
601    * of this client struct, do not free!
602    */
603   uint16_t *types;
604
605   /**
606    * Options for messages this client cares about,
607    * see GNUNET_CORE_OPTION_ values.
608    */
609   uint32_t options;
610
611   /**
612    * Number of types of incoming messages this client
613    * specifically cares about.  Size of the "types" array.
614    */
615   unsigned int tcnt;
616
617 };
618
619
620 /**
621  * Our public key.
622  */
623 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
624
625 /**
626  * Our identity.
627  */
628 static struct GNUNET_PeerIdentity my_identity;
629
630 /**
631  * Our private key.
632  */
633 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
634
635 /**
636  * Our scheduler.
637  */
638 struct GNUNET_SCHEDULER_Handle *sched;
639
640 /**
641  * Our configuration.
642  */
643 const struct GNUNET_CONFIGURATION_Handle *cfg;
644
645 /**
646  * Our server.
647  */
648 static struct GNUNET_SERVER_Handle *server;
649
650 /**
651  * Transport service.
652  */
653 static struct GNUNET_TRANSPORT_Handle *transport;
654
655 /**
656  * Linked list of our clients.
657  */
658 static struct Client *clients;
659
660 /**
661  * We keep neighbours in a linked list (for now).
662  */
663 static struct Neighbour *neighbours;
664
665 /**
666  * Sum of all preferences among all neighbours.
667  */
668 static unsigned long long preference_sum;
669
670 /**
671  * Total number of neighbours we have.
672  */
673 static unsigned int neighbour_count;
674
675 /**
676  * How much inbound bandwidth are we supposed to be using?
677  */
678 static unsigned long long bandwidth_target_in;
679
680 /**
681  * How much outbound bandwidth are we supposed to be using?
682  */
683 static unsigned long long bandwidth_target_out;
684
685
686
687 /**
688  * A preference value for a neighbour was update.  Update
689  * the preference sum accordingly.
690  *
691  * @param inc how much was a preference value increased?
692  */
693 static void
694 update_preference_sum (unsigned long long inc)
695 {
696   struct Neighbour *n;
697   unsigned long long os;
698
699   os = preference_sum;
700   preference_sum += inc;
701   if (preference_sum >= os)
702     return; /* done! */
703   /* overflow! compensate by cutting all values in half! */
704   preference_sum = 0;
705   n = neighbours;
706   while (n != NULL)
707     {
708       n->current_preference /= 2;
709       preference_sum += n->current_preference;
710       n = n->next;
711     }    
712 }
713
714
715 /**
716  * Recalculate the number of bytes we expect to
717  * receive or transmit in a given window.
718  *
719  * @param force force an update now (even if not much time has passed)
720  * @param window pointer to the byte counter (updated)
721  * @param ts pointer to the timestamp (updated)
722  * @param bpm number of bytes per minute that should
723  *        be added to the window.
724  */
725 static void
726 update_window (int force,
727                long long *window,
728                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
729 {
730   struct GNUNET_TIME_Relative since;
731
732   since = GNUNET_TIME_absolute_get_duration (*ts);
733   if ( (force == GNUNET_NO) &&
734        (since.value < 60 * 1000) )
735     return;                     /* not even a minute has passed */
736   *ts = GNUNET_TIME_absolute_get ();
737   *window += (bpm * since.value) / 60 / 1000;
738   if (*window > MAX_WINDOW_TIME * bpm)
739     *window = MAX_WINDOW_TIME * bpm;
740 }
741
742
743 /**
744  * Find the entry for the given neighbour.
745  *
746  * @param peer identity of the neighbour
747  * @return NULL if we are not connected, otherwise the
748  *         neighbour's entry.
749  */
750 static struct Neighbour *
751 find_neighbour (const struct GNUNET_PeerIdentity *peer)
752 {
753   struct Neighbour *ret;
754
755   ret = neighbours;
756   while ((ret != NULL) &&
757          (0 != memcmp (&ret->peer,
758                        peer, sizeof (struct GNUNET_PeerIdentity))))
759     ret = ret->next;
760   return ret;
761 }
762
763
764 /**
765  * Find the entry for the given client.
766  *
767  * @param client handle for the client
768  * @return NULL if we are not connected, otherwise the
769  *         client's struct.
770  */
771 static struct Client *
772 find_client (const struct GNUNET_SERVER_Client *client)
773 {
774   struct Client *ret;
775
776   ret = clients;
777   while ((ret != NULL) && (client != ret->client_handle))
778     ret = ret->next;
779   return ret;
780 }
781
782
783 /**
784  * If necessary, initiate a request with the server to
785  * transmit messages from the queue of the given client.
786  * @param client who to transfer messages to
787  */
788 static void request_transmit (struct Client *client);
789
790
791 /**
792  * Client is ready to receive data, provide it.
793  *
794  * @param cls closure
795  * @param size number of bytes available in buf
796  * @param buf where the callee should write the message
797  * @return number of bytes written to buf
798  */
799 static size_t
800 do_client_transmit (void *cls, size_t size, void *buf)
801 {
802   struct Client *client = cls;
803   struct Event *e;
804   char *tgt;
805   size_t ret;
806
807   client->th = NULL;
808 #if DEBUG_CORE_CLIENT
809   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810               "Client ready to receive %u bytes.\n", size);
811 #endif
812   if (buf == NULL)
813     {
814 #if DEBUG_CORE
815       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
816                   "Failed to transmit data to client (disconnect)?\n");
817 #endif
818       return 0;                 /* we'll surely get a disconnect soon... */
819     }
820   tgt = buf;
821   ret = 0;
822   while ((NULL != (e = client->event_head)) && (e->size <= size))
823     {
824       memcpy (&tgt[ret], &e[1], e->size);
825       size -= e->size;
826       ret += e->size;
827       client->event_head = e->next;
828       GNUNET_free (e);
829     }
830   GNUNET_assert (ret > 0);
831   if (client->event_head == NULL)
832     client->event_tail = NULL;
833 #if DEBUG_CORE_CLIENT
834   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
835               "Transmitting %u bytes to client\n", ret);
836 #endif
837   request_transmit (client);
838   return ret;
839 }
840
841
842 /**
843  * If necessary, initiate a request with the server to
844  * transmit messages from the queue of the given client.
845  * @param client who to transfer messages to
846  */
847 static void
848 request_transmit (struct Client *client)
849 {
850
851   if (NULL != client->th)
852     return;                     /* already pending */
853   if (NULL == client->event_head)
854     return;                     /* no more events pending */
855 #if DEBUG_CORE_CLIENT
856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857               "Asking server to transmit %u bytes to client\n",
858               client->event_head->size);
859 #endif
860   client->th
861     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
862                                            client->event_head->size,
863                                            GNUNET_TIME_UNIT_FOREVER_REL,
864                                            &do_client_transmit, client);
865 }
866
867
868 /**
869  * Send a message to one of our clients.
870  *
871  * @param client target for the message
872  * @param msg message to transmit
873  * @param can_drop could this message be dropped if the
874  *        client's queue is getting too large?
875  */
876 static void
877 send_to_client (struct Client *client,
878                 const struct GNUNET_MessageHeader *msg, int can_drop)
879 {
880   struct Event *e;
881   unsigned int queue_size;
882   uint16_t msize;
883
884 #if DEBUG_CORE_CLIENT
885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886               "Preparing to send message of type %u to client.\n",
887               ntohs (msg->type));
888 #endif
889   queue_size = 0;
890   e = client->event_head;
891   while (e != NULL)
892     {
893       queue_size++;
894       e = e->next;
895     }
896   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
897        (can_drop == GNUNET_YES) )
898     {
899 #if DEBUG_CORE
900       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
901                   "Too many messages in queue for the client, dropping the new message.\n");
902 #endif
903       return;
904     }
905
906   msize = ntohs (msg->size);
907   e = GNUNET_malloc (sizeof (struct Event) + msize);
908   /* append */
909   if (client->event_tail != NULL)
910     client->event_tail->next = e;
911   else
912     client->event_head = e;
913   client->event_tail = e;
914   e->can_drop = can_drop;
915   e->size = msize;
916   memcpy (&e[1], msg, msize);
917   request_transmit (client);
918 }
919
920
921 /**
922  * Send a message to all of our current clients.
923  */
924 static void
925 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
926                      int can_drop,
927                      int options)
928 {
929   struct Client *c;
930
931   c = clients;
932   while (c != NULL)
933     {
934       if (0 != (c->options & options))
935         send_to_client (c, msg, can_drop);
936       c = c->next;
937     }
938 }
939
940
941 /**
942  * Handle CORE_INIT request.
943  */
944 static void
945 handle_client_init (void *cls,
946                     struct GNUNET_SERVER_Client *client,
947                     const struct GNUNET_MessageHeader *message)
948 {
949   const struct InitMessage *im;
950   struct InitReplyMessage irm;
951   struct Client *c;
952   uint16_t msize;
953   const uint16_t *types;
954   struct Neighbour *n;
955   struct ConnectNotifyMessage cnm;
956
957 #if DEBUG_CORE_CLIENT
958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959               "Client connecting to core service with `%s' message\n",
960               "INIT");
961 #endif
962   /* check that we don't have an entry already */
963   c = clients;
964   while (c != NULL)
965     {
966       if (client == c->client_handle)
967         {
968           GNUNET_break (0);
969           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
970           return;
971         }
972       c = c->next;
973     }
974   msize = ntohs (message->size);
975   if (msize < sizeof (struct InitMessage))
976     {
977       GNUNET_break (0);
978       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
979       return;
980     }
981   im = (const struct InitMessage *) message;
982   types = (const uint16_t *) &im[1];
983   msize -= sizeof (struct InitMessage);
984   c = GNUNET_malloc (sizeof (struct Client) + msize);
985   c->client_handle = client;
986   c->next = clients;
987   clients = c;
988   memcpy (&c[1], types, msize);
989   c->types = (uint16_t *) & c[1];
990   c->options = ntohl (im->options);
991   c->tcnt = msize / sizeof (uint16_t);
992   /* send init reply message */
993   irm.header.size = htons (sizeof (struct InitReplyMessage));
994   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
995   irm.reserved = htonl (0);
996   memcpy (&irm.publicKey,
997           &my_public_key,
998           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
999 #if DEBUG_CORE_CLIENT
1000   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1001               "Sending `%s' message to client.\n", "INIT_REPLY");
1002 #endif
1003   send_to_client (c, &irm.header, GNUNET_NO);
1004   /* notify new client about existing neighbours */
1005   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
1006   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
1007   n = neighbours;
1008   while (n != NULL)
1009     {
1010 #if DEBUG_CORE_CLIENT
1011       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1012                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
1013 #endif
1014       cnm.distance = htonl (n->last_distance);
1015       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
1016       cnm.peer = n->peer;
1017       send_to_client (c, &cnm.header, GNUNET_NO);
1018       n = n->next;
1019     }
1020   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1021 }
1022
1023
1024 /**
1025  * A client disconnected, clean up.
1026  *
1027  * @param cls closure
1028  * @param client identification of the client
1029  */
1030 static void
1031 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1032 {
1033   struct Client *pos;
1034   struct Client *prev;
1035   struct Event *e;
1036
1037 #if DEBUG_CORE_CLIENT
1038   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1039               "Client has disconnected from core service.\n");
1040 #endif
1041   prev = NULL;
1042   pos = clients;
1043   while (pos != NULL)
1044     {
1045       if (client == pos->client_handle)
1046         {
1047           if (prev == NULL)
1048             clients = pos->next;
1049           else
1050             prev->next = pos->next;
1051           if (pos->th != NULL)
1052             GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1053           while (NULL != (e = pos->event_head))
1054             {
1055               pos->event_head = e->next;
1056               GNUNET_free (e);
1057             }
1058           GNUNET_free (pos);
1059           return;
1060         }
1061       prev = pos;
1062       pos = pos->next;
1063     }
1064   /* client never sent INIT */
1065 }
1066
1067
1068 /**
1069  * Handle REQUEST_INFO request.
1070  */
1071 static void
1072 handle_client_request_info (void *cls,
1073                                  struct GNUNET_SERVER_Client *client,
1074                                  const struct GNUNET_MessageHeader *message)
1075 {
1076   const struct RequestInfoMessage *rcm;
1077   struct Neighbour *n;
1078   struct ConfigurationInfoMessage cim;
1079   struct Client *c;
1080   int reserv;
1081   unsigned long long old_preference;
1082
1083 #if DEBUG_CORE_CLIENT
1084   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085               "Core service receives `%s' request.\n", "REQUEST_INFO");
1086 #endif
1087   rcm = (const struct RequestInfoMessage *) message;
1088   n = find_neighbour (&rcm->peer);
1089   memset (&cim, 0, sizeof (cim));
1090   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1091     {
1092       update_window (GNUNET_YES,
1093                      &n->available_send_window,
1094                      &n->last_asw_update,
1095                      n->bpm_out);
1096       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1097       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1098                                n->bpm_out_external_limit);
1099       reserv = ntohl (rcm->reserve_inbound);
1100       if (reserv < 0)
1101         {
1102           n->available_recv_window += reserv;
1103         }
1104       else if (reserv > 0)
1105         {
1106           update_window (GNUNET_NO,
1107                          &n->available_recv_window,
1108                          &n->last_arw_update, n->bpm_in);
1109           if (n->available_recv_window < reserv)
1110             reserv = n->available_recv_window;
1111           n->available_recv_window -= reserv;
1112         }
1113       old_preference = n->current_preference;
1114       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1115       if (old_preference > n->current_preference) 
1116         {
1117           /* overflow; cap at maximum value */
1118           n->current_preference = (unsigned long long) -1;
1119         }
1120       update_preference_sum (n->current_preference - old_preference);
1121       cim.reserved_amount = htonl (reserv);
1122       cim.bpm_in = htonl (n->bpm_in);
1123       cim.bpm_out = htonl (n->bpm_out);
1124       cim.preference = n->current_preference;
1125     }
1126   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1127   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1128   cim.peer = rcm->peer;
1129   c = find_client (client);
1130   if (c == NULL)
1131     {
1132       GNUNET_break (0);
1133       return;
1134     }
1135 #if DEBUG_CORE_CLIENT
1136   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1138 #endif
1139   send_to_client (c, &cim.header, GNUNET_NO);
1140 }
1141
1142
1143 /**
1144  * Check if we have encrypted messages for the specified neighbour
1145  * pending, and if so, check with the transport about sending them
1146  * out.
1147  *
1148  * @param n neighbour to check.
1149  */
1150 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1151
1152
1153 /**
1154  * Function called when the transport service is ready to
1155  * receive an encrypted message for the respective peer
1156  *
1157  * @param cls neighbour to use message from
1158  * @param size number of bytes we can transmit
1159  * @param buf where to copy the message
1160  * @return number of bytes transmitted
1161  */
1162 static size_t
1163 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1164 {
1165   struct Neighbour *n = cls;
1166   struct MessageEntry *m;
1167   size_t ret;
1168   char *cbuf;
1169
1170   n->th = NULL;
1171   GNUNET_assert (NULL != (m = n->encrypted_head));
1172   n->encrypted_head = m->next;
1173   if (m->next == NULL)
1174     n->encrypted_tail = NULL;
1175   ret = 0;
1176   cbuf = buf;
1177   if (buf != NULL)
1178     {
1179       GNUNET_assert (size >= m->size);
1180       memcpy (cbuf, &m[1], m->size);
1181       ret = m->size;
1182       n->available_send_window -= m->size;
1183       process_encrypted_neighbour_queue (n);
1184 #if DEBUG_CORE
1185       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1187                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1188                   ret, GNUNET_i2s (&n->peer));
1189 #endif
1190     }
1191   else
1192     {
1193       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1194                   "Transmission for message of type %u and size %u failed\n",
1195                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1196                   m->size);
1197     }
1198   GNUNET_free (m);
1199   return ret;
1200 }
1201
1202
1203 /**
1204  * Check if we have plaintext messages for the specified neighbour
1205  * pending, and if so, consider batching and encrypting them (and
1206  * then trigger processing of the encrypted queue if needed).
1207  *
1208  * @param n neighbour to check.
1209  */
1210 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1211
1212
1213 /**
1214  * Check if we have encrypted messages for the specified neighbour
1215  * pending, and if so, check with the transport about sending them
1216  * out.
1217  *
1218  * @param n neighbour to check.
1219  */
1220 static void
1221 process_encrypted_neighbour_queue (struct Neighbour *n)
1222 {
1223   struct MessageEntry *m;
1224  
1225   if (n->th != NULL)
1226     return;  /* request already pending */
1227   if (n->encrypted_head == NULL)
1228     {
1229       /* encrypted queue empty, try plaintext instead */
1230       process_plaintext_neighbour_queue (n);
1231       return;
1232     }
1233 #if DEBUG_CORE
1234   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1235               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1236               n->encrypted_head->size,
1237               GNUNET_i2s (&n->peer),
1238               GNUNET_TIME_absolute_get_remaining (n->
1239                                                   encrypted_head->deadline).
1240               value);
1241 #endif
1242   n->th =
1243     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1244                                             n->encrypted_head->size,
1245                                             n->encrypted_head->priority,
1246                                             GNUNET_TIME_absolute_get_remaining
1247                                             (n->encrypted_head->deadline),
1248                                             &notify_encrypted_transmit_ready,
1249                                             n);
1250   if (n->th == NULL)
1251     {
1252       /* message request too large (oops) */
1253       GNUNET_break (0);
1254       /* discard encrypted message */
1255       GNUNET_assert (NULL != (m = n->encrypted_head));
1256       n->encrypted_head = m->next;
1257       if (m->next == NULL)
1258         n->encrypted_tail = NULL;
1259       GNUNET_free (m);
1260       process_encrypted_neighbour_queue (n);
1261     }
1262 }
1263
1264
1265 /**
1266  * Decrypt size bytes from in and write the result to out.  Use the
1267  * key for inbound traffic of the given neighbour.  This function does
1268  * NOT do any integrity-checks on the result.
1269  *
1270  * @param n neighbour we are receiving from
1271  * @param iv initialization vector to use
1272  * @param in ciphertext
1273  * @param out plaintext
1274  * @param size size of in/out
1275  * @return GNUNET_OK on success
1276  */
1277 static int
1278 do_decrypt (struct Neighbour *n,
1279             const GNUNET_HashCode * iv,
1280             const void *in, void *out, size_t size)
1281 {
1282   if (size != (uint16_t) size)
1283     {
1284       GNUNET_break (0);
1285       return GNUNET_NO;
1286     }
1287   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1288       (n->status != PEER_STATE_KEY_CONFIRMED))
1289     {
1290       GNUNET_break_op (0);
1291       return GNUNET_SYSERR;
1292     }
1293   if (size !=
1294       GNUNET_CRYPTO_aes_decrypt (in,
1295                                  (uint16_t) size,
1296                                  &n->decrypt_key,
1297                                  (const struct
1298                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1299                                  out))
1300     {
1301       GNUNET_break (0);
1302       return GNUNET_SYSERR;
1303     }
1304 #if DEBUG_CORE
1305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306               "Decrypted %u bytes from `%4s' using key %u\n",
1307               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1308 #endif
1309   return GNUNET_OK;
1310 }
1311
1312
1313 /**
1314  * Encrypt size bytes from in and write the result to out.  Use the
1315  * key for outbound traffic of the given neighbour.
1316  *
1317  * @param n neighbour we are sending to
1318  * @param iv initialization vector to use
1319  * @param in ciphertext
1320  * @param out plaintext
1321  * @param size size of in/out
1322  * @return GNUNET_OK on success
1323  */
1324 static int
1325 do_encrypt (struct Neighbour *n,
1326             const GNUNET_HashCode * iv,
1327             const void *in, void *out, size_t size)
1328 {
1329   if (size != (uint16_t) size)
1330     {
1331       GNUNET_break (0);
1332       return GNUNET_NO;
1333     }
1334   GNUNET_assert (size ==
1335                  GNUNET_CRYPTO_aes_encrypt (in,
1336                                             (uint16_t) size,
1337                                             &n->encrypt_key,
1338                                             (const struct
1339                                              GNUNET_CRYPTO_AesInitializationVector
1340                                              *) iv, out));
1341 #if DEBUG_CORE
1342   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1343               "Encrypted %u bytes for `%4s' using key %u\n", size,
1344               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1345 #endif
1346   return GNUNET_OK;
1347 }
1348
1349
1350 /**
1351  * Select messages for transmission.  This heuristic uses a combination
1352  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1353  * and priority-based discard (in case no feasible schedule exist) and
1354  * speculative optimization (defer any kind of transmission until
1355  * we either create a batch of significant size, 25% of max, or until
1356  * we are close to a deadline).  Furthermore, when scheduling the
1357  * heuristic also packs as many messages into the batch as possible,
1358  * starting with those with the earliest deadline.  Yes, this is fun.
1359  *
1360  * @param n neighbour to select messages from
1361  * @param size number of bytes to select for transmission
1362  * @param retry_time set to the time when we should try again
1363  *        (only valid if this function returns zero)
1364  * @return number of bytes selected, or 0 if we decided to
1365  *         defer scheduling overall; in that case, retry_time is set.
1366  */
1367 static size_t
1368 select_messages (struct Neighbour *n,
1369                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1370 {
1371   struct MessageEntry *pos;
1372   struct MessageEntry *min;
1373   struct MessageEntry *last;
1374   unsigned int min_prio;
1375   struct GNUNET_TIME_Absolute t;
1376   struct GNUNET_TIME_Absolute now;
1377   uint64_t delta;
1378   uint64_t avail;
1379   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1380   size_t off;
1381   int discard_low_prio;
1382
1383   GNUNET_assert (NULL != n->messages);
1384   now = GNUNET_TIME_absolute_get ();
1385   /* last entry in linked list of messages processed */
1386   last = NULL;
1387   /* should we remove the entry with the lowest
1388      priority from consideration for scheduling at the
1389      end of the loop? */
1390   discard_low_prio = GNUNET_YES;
1391   while (GNUNET_YES == discard_low_prio)
1392     {
1393       min = NULL;
1394       min_prio = -1;
1395       discard_low_prio = GNUNET_NO;
1396       /* calculate number of bytes available for transmission at time "t" */
1397       update_window (GNUNET_NO,
1398                      &n->available_send_window,
1399                      &n->last_asw_update,
1400                      n->bpm_out);
1401       avail = n->available_send_window;
1402       t = n->last_asw_update;
1403       /* how many bytes have we (hypothetically) scheduled so far */
1404       off = 0;
1405       /* maximum time we can wait before transmitting anything
1406          and still make all of our deadlines */
1407       slack = -1;
1408
1409       pos = n->messages;
1410       /* note that we use "*2" here because we want to look
1411          a bit further into the future; much more makes no
1412          sense since new message might be scheduled in the
1413          meantime... */
1414       while ((pos != NULL) && (off < size * 2))
1415         {
1416           if (pos->do_transmit == GNUNET_YES)
1417             {
1418               /* already removed from consideration */
1419               pos = pos->next;
1420               continue;
1421             }
1422           if (discard_low_prio == GNUNET_NO)
1423             {
1424               delta = pos->deadline.value;
1425               if (delta < t.value)
1426                 delta = 0;
1427               else
1428                 delta = t.value - delta;
1429               avail += delta * n->bpm_out / 1000 / 60;
1430               if (avail < pos->size)
1431                 {
1432                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1433                 }
1434               else
1435                 {
1436                   avail -= pos->size;
1437                   /* update slack, considering both its absolute deadline
1438                      and relative deadlines caused by other messages
1439                      with their respective load */
1440                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1441                   if (pos->deadline.value < now.value)
1442                     slack = 0;
1443                   else
1444                     slack =
1445                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1446                 }
1447             }
1448           off += pos->size;
1449           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1450           if (pos->priority <= min_prio)
1451             {
1452               /* update min for discard */
1453               min_prio = pos->priority;
1454               min = pos;
1455             }
1456           pos = pos->next;
1457         }
1458       if (discard_low_prio)
1459         {
1460           GNUNET_assert (min != NULL);
1461           /* remove lowest-priority entry from consideration */
1462           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1463         }
1464       last = pos;
1465     }
1466   /* guard against sending "tiny" messages with large headers without
1467      urgent deadlines */
1468   if ( (slack > 1000) && (size > 4 * off) )
1469     {
1470       /* less than 25% of message would be filled with
1471          deadlines still being met if we delay by one
1472          second or more; so just wait for more data */
1473       retry_time->value = slack / 2;
1474       /* reset do_transmit values for next time */
1475       while (pos != last)
1476         {
1477           pos->do_transmit = GNUNET_NO;
1478           pos = pos->next;
1479         }
1480       return 0;
1481     }
1482   /* select marked messages (up to size) for transmission */
1483   off = 0;
1484   pos = n->messages;
1485   while (pos != last)
1486     {
1487       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1488         {
1489           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1490           off += pos->size;
1491           size -= pos->size;
1492         }
1493       else
1494         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1495       pos = pos->next;
1496     }
1497 #if DEBUG_CORE
1498   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1499               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1500               off, GNUNET_i2s (&n->peer));
1501 #endif
1502   return off;
1503 }
1504
1505
1506 /**
1507  * Batch multiple messages into a larger buffer.
1508  *
1509  * @param n neighbour to take messages from
1510  * @param buf target buffer
1511  * @param size size of buf
1512  * @param deadline set to transmission deadline for the result
1513  * @param retry_time set to the time when we should try again
1514  *        (only valid if this function returns zero)
1515  * @param priority set to the priority of the batch
1516  * @return number of bytes written to buf (can be zero)
1517  */
1518 static size_t
1519 batch_message (struct Neighbour *n,
1520                char *buf,
1521                size_t size,
1522                struct GNUNET_TIME_Absolute *deadline,
1523                struct GNUNET_TIME_Relative *retry_time,
1524                unsigned int *priority)
1525 {
1526   struct MessageEntry *pos;
1527   struct MessageEntry *prev;
1528   struct MessageEntry *next;
1529   size_t ret;
1530
1531   ret = 0;
1532   *priority = 0;
1533   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1534   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1535   if (0 == select_messages (n, size, retry_time))
1536     {
1537       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1538                   "No messages selected, will try again in %llu ms\n",
1539                   retry_time->value);
1540       return 0;
1541     }
1542   pos = n->messages;
1543   prev = NULL;
1544   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1545     {
1546       next = pos->next;
1547       if (GNUNET_YES == pos->do_transmit)
1548         {
1549           GNUNET_assert (pos->size <= size);
1550           memcpy (&buf[ret], &pos[1], pos->size);
1551           ret += pos->size;
1552           size -= pos->size;
1553           *priority += pos->priority;
1554 #if DEBUG_CORE
1555           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1556                       "Adding plaintext message with deadline %llu ms to batch\n",
1557                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1558 #endif
1559           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1560           GNUNET_free (pos);
1561           if (prev == NULL)
1562             n->messages = next;
1563           else
1564             prev->next = next;
1565         }
1566       else
1567         {
1568           prev = pos;
1569         }
1570       pos = next;
1571     }
1572 #if DEBUG_CORE
1573   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1574               "Deadline for message batch is %llu ms\n",
1575               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1576 #endif
1577   return ret;
1578 }
1579
1580
1581 /**
1582  * Remove messages with deadlines that have long expired from
1583  * the queue.
1584  *
1585  * @param n neighbour to inspect
1586  */
1587 static void
1588 discard_expired_messages (struct Neighbour *n)
1589 {
1590   struct MessageEntry *prev;
1591   struct MessageEntry *next;
1592   struct MessageEntry *pos;
1593   struct GNUNET_TIME_Absolute now;
1594   struct GNUNET_TIME_Relative delta;
1595
1596   now = GNUNET_TIME_absolute_get ();
1597   prev = NULL;
1598   pos = n->messages;
1599   while (pos != NULL) 
1600     {
1601       next = pos->next;
1602       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1603       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1604         {
1605 #if DEBUG_CORE
1606           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1607                       "Message is %llu ms past due, discarding.\n",
1608                       delta.value);
1609 #endif
1610           if (prev == NULL)
1611             n->messages = next;
1612           else
1613             prev->next = next;
1614           GNUNET_free (pos);
1615         }
1616       else
1617         prev = pos;
1618       pos = next;
1619     }
1620 }
1621
1622
1623 /**
1624  * Signature of the main function of a task.
1625  *
1626  * @param cls closure
1627  * @param tc context information (why was this task triggered now)
1628  */
1629 static void
1630 retry_plaintext_processing (void *cls,
1631                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1632 {
1633   struct Neighbour *n = cls;
1634
1635   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1636   process_plaintext_neighbour_queue (n);
1637 }
1638
1639
1640 /**
1641  * Send our key (and encrypted PING) to the other peer.
1642  *
1643  * @param n the other peer
1644  */
1645 static void send_key (struct Neighbour *n);
1646
1647
1648 /**
1649  * Check if we have plaintext messages for the specified neighbour
1650  * pending, and if so, consider batching and encrypting them (and
1651  * then trigger processing of the encrypted queue if needed).
1652  *
1653  * @param n neighbour to check.
1654  */
1655 static void
1656 process_plaintext_neighbour_queue (struct Neighbour *n)
1657 {
1658   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1659   size_t used;
1660   size_t esize;
1661   struct EncryptedMessage *em;  /* encrypted message */
1662   struct EncryptedMessage *ph;  /* plaintext header */
1663   struct MessageEntry *me;
1664   unsigned int priority;
1665   struct GNUNET_TIME_Absolute deadline;
1666   struct GNUNET_TIME_Relative retry_time;
1667
1668   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1669     {
1670       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1671       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1672     }
1673   switch (n->status)
1674     {
1675     case PEER_STATE_DOWN:
1676       send_key (n);
1677 #if DEBUG_CORE
1678       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1679                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1680                   GNUNET_i2s(&n->peer));
1681 #endif
1682       return;
1683     case PEER_STATE_KEY_SENT:
1684       GNUNET_assert (n->retry_set_key_task !=
1685                      GNUNET_SCHEDULER_NO_TASK);
1686 #if DEBUG_CORE
1687       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1688                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1689                   GNUNET_i2s(&n->peer));
1690 #endif
1691       return;
1692     case PEER_STATE_KEY_RECEIVED:
1693       GNUNET_assert (n->retry_set_key_task !=
1694                      GNUNET_SCHEDULER_NO_TASK);
1695 #if DEBUG_CORE
1696       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1698                   GNUNET_i2s(&n->peer));
1699 #endif
1700       return;
1701     case PEER_STATE_KEY_CONFIRMED:
1702       /* ready to continue */
1703       break;
1704     }
1705   discard_expired_messages (n);
1706   if (n->messages == NULL)
1707     {
1708 #if DEBUG_CORE
1709       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1710                   "Plaintext message queue for `%4s' is empty.\n",
1711                   GNUNET_i2s(&n->peer));
1712 #endif
1713       return;                   /* no pending messages */
1714     }
1715   if (n->encrypted_head != NULL)
1716     {
1717 #if DEBUG_CORE
1718       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1719                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1720                   GNUNET_i2s(&n->peer));
1721 #endif
1722       return;                   /* wait for messages already encrypted to be
1723                                    processed first! */
1724     }
1725   ph = (struct EncryptedMessage *) pbuf;
1726   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1727   priority = 0;
1728   used = sizeof (struct EncryptedMessage);
1729   used += batch_message (n,
1730                          &pbuf[used],
1731                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1732                          &deadline, &retry_time, &priority);
1733   if (used == sizeof (struct EncryptedMessage))
1734     {
1735 #if DEBUG_CORE
1736       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1737                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1738                   GNUNET_i2s(&n->peer));
1739 #endif
1740       /* no messages selected for sending, try again later... */
1741       n->retry_plaintext_task =
1742         GNUNET_SCHEDULER_add_delayed (sched,
1743                                       retry_time,
1744                                       &retry_plaintext_processing, n);
1745       return;
1746     }
1747   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1748   ph->inbound_bpm_limit = htonl (n->bpm_in);
1749   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1750
1751   /* setup encryption message header */
1752   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1753   me->deadline = deadline;
1754   me->priority = priority;
1755   me->size = used;
1756   em = (struct EncryptedMessage *) &me[1];
1757   em->header.size = htons (used);
1758   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1759   em->reserved = htonl (0);
1760   esize = used - ENCRYPTED_HEADER_SIZE;
1761   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1762   /* encrypt */
1763 #if DEBUG_CORE
1764   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1766               esize,
1767               GNUNET_i2s(&n->peer),
1768               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1769 #endif
1770   GNUNET_assert (GNUNET_OK ==
1771                  do_encrypt (n,
1772                              &em->plaintext_hash,
1773                              &ph->sequence_number,
1774                              &em->sequence_number, esize));
1775   /* append to transmission list */
1776   if (n->encrypted_tail == NULL)
1777     n->encrypted_head = me;
1778   else
1779     n->encrypted_tail->next = me;
1780   n->encrypted_tail = me;
1781   process_encrypted_neighbour_queue (n);
1782 }
1783
1784
1785 /**
1786  * Handle CORE_SEND request.
1787  *
1788  * @param cls unused
1789  * @param client the client issuing the request
1790  * @param message the "struct SendMessage"
1791  */
1792 static void
1793 handle_client_send (void *cls,
1794                     struct GNUNET_SERVER_Client *client,
1795                     const struct GNUNET_MessageHeader *message);
1796
1797
1798 /**
1799  * Function called to notify us that we either succeeded
1800  * or failed to connect (at the transport level) to another
1801  * peer.  We should either free the message we were asked
1802  * to transmit or re-try adding it to the queue.
1803  *
1804  * @param cls closure
1805  * @param size number of bytes available in buf
1806  * @param buf where the callee should write the message
1807  * @return number of bytes written to buf
1808  */
1809 static size_t
1810 send_connect_continuation (void *cls, size_t size, void *buf)
1811 {
1812   struct SendMessage *sm = cls;
1813
1814   if (buf == NULL)
1815     {
1816 #if DEBUG_CORE
1817       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1818                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1819                   GNUNET_i2s (&sm->peer));
1820 #endif
1821       GNUNET_free (sm);
1822       return 0;
1823     }
1824 #if DEBUG_CORE
1825   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1826               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1827               GNUNET_i2s (&sm->peer));
1828 #endif
1829   handle_client_send (NULL, NULL, &sm->header);
1830   GNUNET_free (sm);
1831   return 0;
1832 }
1833
1834
1835 /**
1836  * Handle CORE_SEND request.
1837  *
1838  * @param cls unused
1839  * @param client the client issuing the request
1840  * @param message the "struct SendMessage"
1841  */
1842 static void
1843 handle_client_send (void *cls,
1844                     struct GNUNET_SERVER_Client *client,
1845                     const struct GNUNET_MessageHeader *message)
1846 {
1847   const struct SendMessage *sm;
1848   struct SendMessage *smc;
1849   const struct GNUNET_MessageHeader *mh;
1850   struct Neighbour *n;
1851   struct MessageEntry *prev;
1852   struct MessageEntry *pos;
1853   struct MessageEntry *e; 
1854   struct MessageEntry *min_prio_entry;
1855   struct MessageEntry *min_prio_prev;
1856   unsigned int min_prio;
1857   unsigned int queue_size;
1858   uint16_t msize;
1859
1860   msize = ntohs (message->size);
1861   if (msize <
1862       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1863     {
1864       GNUNET_break (0);
1865       if (client != NULL)
1866         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1867       return;
1868     }
1869   sm = (const struct SendMessage *) message;
1870   msize -= sizeof (struct SendMessage);
1871   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1872   if (msize != ntohs (mh->size))
1873     {
1874       GNUNET_break (0);
1875       if (client != NULL)
1876         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1877       return;
1878     }
1879   n = find_neighbour (&sm->peer);
1880   if (n == NULL)
1881     {
1882 #if DEBUG_CORE
1883       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1884                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1885                   "SEND",
1886                   GNUNET_i2s (&sm->peer),
1887                   GNUNET_TIME_absolute_get_remaining
1888                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1889 #endif
1890       msize += sizeof (struct SendMessage);
1891       /* ask transport to connect to the peer */
1892       smc = GNUNET_malloc (msize);
1893       memcpy (smc, sm, msize);
1894       if (NULL ==
1895           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1896                                                   &sm->peer,
1897                                                   0, 0,
1898                                                   GNUNET_TIME_absolute_get_remaining
1899                                                   (GNUNET_TIME_absolute_ntoh
1900                                                    (sm->deadline)),
1901                                                   &send_connect_continuation,
1902                                                   smc))
1903         {
1904           /* transport has already a request pending for this peer! */
1905 #if DEBUG_CORE
1906           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1907                       "Dropped second message destined for `%4s' since connection is still down.\n",
1908                       GNUNET_i2s(&sm->peer));
1909 #endif
1910           GNUNET_free (smc);
1911         }
1912       if (client != NULL)
1913         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1914       return;
1915     }
1916 #if DEBUG_CORE
1917   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1918               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1919               "SEND",
1920               msize, 
1921               GNUNET_i2s (&sm->peer));
1922 #endif
1923   /* bound queue size */
1924   discard_expired_messages (n);
1925   min_prio = (unsigned int) -1;
1926   min_prio_entry = NULL;
1927   min_prio_prev = NULL;
1928   queue_size = 0;
1929   prev = NULL;
1930   pos = n->messages;
1931   while (pos != NULL) 
1932     {
1933       if (pos->priority < min_prio)
1934         {
1935           min_prio_entry = pos;
1936           min_prio_prev = prev;
1937           min_prio = pos->priority;
1938         }
1939       queue_size++;
1940       prev = pos;
1941       pos = pos->next;
1942     }
1943   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1944     {
1945       /* queue full */
1946       if (ntohl(sm->priority) <= min_prio)
1947         {
1948           /* discard new entry */
1949 #if DEBUG_CORE
1950           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1951                       "Queue full, discarding new request\n");
1952 #endif
1953           if (client != NULL)
1954             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1955           return;
1956         }
1957       /* discard "min_prio_entry" */
1958 #if DEBUG_CORE
1959       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1960                   "Queue full, discarding existing older request\n");
1961 #endif
1962       if (min_prio_prev == NULL)
1963         n->messages = min_prio_entry->next;
1964       else
1965         min_prio_prev->next = min_prio_entry->next;      
1966       GNUNET_free (min_prio_entry);     
1967     }
1968
1969 #if DEBUG_CORE
1970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1971               "Adding transmission request for `%4s' to queue\n",
1972               GNUNET_i2s (&sm->peer));
1973 #endif  
1974   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1975   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1976   e->priority = ntohl (sm->priority);
1977   e->size = msize;
1978   memcpy (&e[1], mh, msize);
1979
1980   /* insert, keep list sorted by deadline */
1981   prev = NULL;
1982   pos = n->messages;
1983   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1984     {
1985       prev = pos;
1986       pos = pos->next;
1987     }
1988   if (prev == NULL)
1989     n->messages = e;
1990   else
1991     prev->next = e;
1992   e->next = pos;
1993
1994   /* consider scheduling now */
1995   process_plaintext_neighbour_queue (n);
1996   if (client != NULL)
1997     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1998 }
1999
2000
2001 /**
2002  * Handle CORE_REQUEST_CONNECT request.
2003  *
2004  * @param cls unused
2005  * @param client the client issuing the request
2006  * @param message the "struct ConnectMessage"
2007  */
2008 static void
2009 handle_client_request_connect (void *cls,
2010                                struct GNUNET_SERVER_Client *client,
2011                                const struct GNUNET_MessageHeader *message)
2012 {
2013   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2014   struct Neighbour *n;
2015
2016   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2017   n = find_neighbour (&cm->peer);
2018   if (n != NULL)
2019     return; /* already connected, or at least trying */
2020 #if DEBUG_CORE
2021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2022               "Core received `%s' request for `%4s', will try to establish connection\n",
2023               "REQUEST_CONNECT",
2024               GNUNET_i2s (&cm->peer));
2025 #endif
2026   /* ask transport to connect to the peer */
2027   /* FIXME: timeout zero OK? need for cancellation? */
2028   GNUNET_TRANSPORT_notify_transmit_ready (transport,
2029                                           &cm->peer,
2030                                           0, 0,
2031                                           GNUNET_TIME_UNIT_ZERO,
2032                                           NULL,
2033                                           NULL);
2034 }
2035
2036
2037 /**
2038  * Handle CORE_REQUEST_DISCONNECT request.
2039  *
2040  * @param cls unused
2041  * @param client the client issuing the request
2042  * @param message the "struct ConnectMessage"
2043  */
2044 static void
2045 handle_client_request_disconnect (void *cls,
2046                                   struct GNUNET_SERVER_Client *client,
2047                                   const struct GNUNET_MessageHeader *message)
2048 {
2049   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2050   struct Neighbour *n;
2051
2052   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2053   n = find_neighbour (&cm->peer);
2054   if (n == NULL)
2055     return; /* done */
2056   /* FIXME: implement disconnect! */
2057 }
2058
2059
2060
2061 /**
2062  * List of handlers for the messages understood by this
2063  * service.
2064  */
2065 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2066   {&handle_client_init, NULL,
2067    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2068   {&handle_client_request_info, NULL,
2069    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2070    sizeof (struct RequestInfoMessage)},
2071   {&handle_client_send, NULL,
2072    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2073   {&handle_client_request_connect, NULL,
2074    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2075    sizeof (struct ConnectMessage)},
2076   {&handle_client_request_disconnect, NULL,
2077    GNUNET_MESSAGE_TYPE_CORE_REQUEST_DISCONNECT,
2078    sizeof (struct ConnectMessage)},
2079   {NULL, NULL, 0, 0}
2080 };
2081
2082
2083 /**
2084  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2085  * the neighbour's struct and retry send_key.  Or, if we did not get a
2086  * HELLO, just do nothing.
2087  *
2088  * @param cls NULL
2089  * @param peer the peer for which this is the HELLO
2090  * @param hello HELLO message of that peer
2091  * @param trust amount of trust we currently have in that peer
2092  */
2093 static void
2094 process_hello_retry_send_key (void *cls,
2095                               const struct GNUNET_PeerIdentity *peer,
2096                               const struct GNUNET_HELLO_Message *hello,
2097                               uint32_t trust)
2098 {
2099   struct Neighbour *n = cls;
2100
2101   if (peer == NULL)
2102     {
2103       n->pitr = NULL;
2104       return;
2105     }
2106   if (n->public_key != NULL)
2107     return;
2108 #if DEBUG_CORE
2109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2110               "Received new `%s' message for `%4s', initiating key exchange.\n",
2111               "HELLO",
2112               GNUNET_i2s (peer));
2113 #endif
2114   n->public_key =
2115     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2116   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2117     {
2118       GNUNET_free (n->public_key);
2119       n->public_key = NULL;
2120       return;
2121     }
2122   send_key (n);
2123 }
2124
2125
2126 /**
2127  * Task that will retry "send_key" if our previous attempt failed
2128  * to yield a PONG.
2129  */
2130 static void
2131 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2132 {
2133   struct Neighbour *n = cls;
2134
2135   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2136   n->set_key_retry_frequency =
2137     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2138   send_key (n);
2139 }
2140
2141
2142 /**
2143  * Send our key (and encrypted PING) to the other peer.
2144  *
2145  * @param n the other peer
2146  */
2147 static void
2148 send_key (struct Neighbour *n)
2149 {
2150   struct SetKeyMessage *sm;
2151   struct MessageEntry *me;
2152   struct PingMessage pp;
2153   struct PingMessage *pm;
2154
2155   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2156        (n->pitr != NULL) )
2157     return; /* already in progress */
2158 #if DEBUG_CORE
2159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2160               "Asked to perform key exchange with `%4s'.\n",
2161               GNUNET_i2s (&n->peer));
2162 #endif
2163   if (n->public_key == NULL)
2164     {
2165       /* lookup n's public key, then try again */
2166 #if DEBUG_CORE
2167       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2168                   "Lacking public key for `%4s', trying to obtain one.\n",
2169                   GNUNET_i2s (&n->peer));
2170 #endif
2171       GNUNET_assert (n->pitr == NULL);
2172       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2173                                          sched,
2174                                          &n->peer,
2175                                          0,
2176                                          GNUNET_TIME_UNIT_MINUTES,
2177                                          &process_hello_retry_send_key, n);
2178       return;
2179     }
2180   /* first, set key message */
2181   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2182                       sizeof (struct SetKeyMessage));
2183   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2184   me->priority = SET_KEY_PRIORITY;
2185   me->size = sizeof (struct SetKeyMessage);
2186   if (n->encrypted_head == NULL)
2187     n->encrypted_head = me;
2188   else
2189     n->encrypted_tail->next = me;
2190   n->encrypted_tail = me;
2191   sm = (struct SetKeyMessage *) &me[1];
2192   sm->header.size = htons (sizeof (struct SetKeyMessage));
2193   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2194   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2195                                         PEER_STATE_KEY_SENT : n->status));
2196   sm->purpose.size =
2197     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2198            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2199            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2200            sizeof (struct GNUNET_PeerIdentity));
2201   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2202   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2203   sm->target = n->peer;
2204   GNUNET_assert (GNUNET_OK ==
2205                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2206                                             sizeof (struct
2207                                                     GNUNET_CRYPTO_AesSessionKey),
2208                                             n->public_key,
2209                                             &sm->encrypted_key));
2210   GNUNET_assert (GNUNET_OK ==
2211                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2212                                          &sm->signature));
2213
2214   /* second, encrypted PING message */
2215   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2216                       sizeof (struct PingMessage));
2217   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2218   me->priority = PING_PRIORITY;
2219   me->size = sizeof (struct PingMessage);
2220   n->encrypted_tail->next = me;
2221   n->encrypted_tail = me;
2222   pm = (struct PingMessage *) &me[1];
2223   pm->header.size = htons (sizeof (struct PingMessage));
2224   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2225   pp.challenge = htonl (n->ping_challenge);
2226   pp.target = n->peer;
2227 #if DEBUG_CORE
2228   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2229               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2230               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2231   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2232               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2233               "PING",
2234               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2235 #endif
2236   do_encrypt (n,
2237               &n->peer.hashPubKey,
2238               &pp.challenge,
2239               &pm->challenge,
2240               sizeof (struct PingMessage) -
2241               sizeof (struct GNUNET_MessageHeader));
2242   /* update status */
2243   switch (n->status)
2244     {
2245     case PEER_STATE_DOWN:
2246       n->status = PEER_STATE_KEY_SENT;
2247       break;
2248     case PEER_STATE_KEY_SENT:
2249       break;
2250     case PEER_STATE_KEY_RECEIVED:
2251       break;
2252     case PEER_STATE_KEY_CONFIRMED:
2253       break;
2254     default:
2255       GNUNET_break (0);
2256       break;
2257     }
2258 #if DEBUG_CORE
2259   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2260               "Have %llu ms left for `%s' transmission.\n",
2261               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2262               "SET_KEY");
2263 #endif
2264   /* trigger queue processing */
2265   process_encrypted_neighbour_queue (n);
2266   if (n->status != PEER_STATE_KEY_CONFIRMED)
2267     {
2268       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task);
2269       n->retry_set_key_task
2270         = GNUNET_SCHEDULER_add_delayed (sched,
2271                                         n->set_key_retry_frequency,
2272                                         &set_key_retry_task, n);
2273     }
2274 }
2275
2276
2277 /**
2278  * We received a SET_KEY message.  Validate and update
2279  * our key material and status.
2280  *
2281  * @param n the neighbour from which we received message m
2282  * @param m the set key message we received
2283  */
2284 static void
2285 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2286
2287
2288 /**
2289  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2290  * the neighbour's struct and retry handling the set_key message.  Or,
2291  * if we did not get a HELLO, just free the set key message.
2292  *
2293  * @param cls pointer to the set key message
2294  * @param peer the peer for which this is the HELLO
2295  * @param hello HELLO message of that peer
2296  * @param trust amount of trust we currently have in that peer
2297  */
2298 static void
2299 process_hello_retry_handle_set_key (void *cls,
2300                                     const struct GNUNET_PeerIdentity *peer,
2301                                     const struct GNUNET_HELLO_Message *hello,
2302                                     uint32_t trust)
2303 {
2304   struct Neighbour *n = cls;
2305   struct SetKeyMessage *sm = n->skm;
2306
2307   if (peer == NULL)
2308     {
2309       GNUNET_free (sm);
2310       n->skm = NULL;
2311       n->pitr = NULL;
2312       return;
2313     }
2314   if (n->public_key != NULL)
2315     return;                     /* multiple HELLOs match!? */
2316   n->public_key =
2317     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2318   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2319     {
2320       GNUNET_break_op (0);
2321       GNUNET_free (n->public_key);
2322       n->public_key = NULL;
2323       return;
2324     }
2325 #if DEBUG_CORE
2326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2327               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2328               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2329 #endif
2330   handle_set_key (n, sm);
2331 }
2332
2333
2334 /**
2335  * We received a PING message.  Validate and transmit
2336  * PONG.
2337  *
2338  * @param n sender of the PING
2339  * @param m the encrypted PING message itself
2340  */
2341 static void
2342 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2343 {
2344   struct PingMessage t;
2345   struct PingMessage *tp;
2346   struct MessageEntry *me;
2347
2348 #if DEBUG_CORE
2349   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2350               "Core service receives `%s' request from `%4s'.\n",
2351               "PING", GNUNET_i2s (&n->peer));
2352 #endif
2353   if (GNUNET_OK !=
2354       do_decrypt (n,
2355                   &my_identity.hashPubKey,
2356                   &m->challenge,
2357                   &t.challenge,
2358                   sizeof (struct PingMessage) -
2359                   sizeof (struct GNUNET_MessageHeader)))
2360     return;
2361 #if DEBUG_CORE
2362   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2363               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2364               "PING",
2365               GNUNET_i2s (&t.target),
2366               ntohl (t.challenge), n->decrypt_key.crc32);
2367   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2368               "Target of `%s' request is `%4s'.\n",
2369               "PING", GNUNET_i2s (&t.target));
2370 #endif
2371   if (0 != memcmp (&t.target,
2372                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2373     {
2374       GNUNET_break_op (0);
2375       return;
2376     }
2377   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2378                       sizeof (struct PingMessage));
2379   if (n->encrypted_tail != NULL)
2380     n->encrypted_tail->next = me;
2381   else
2382     {
2383       n->encrypted_tail = me;
2384       n->encrypted_head = me;
2385     }
2386   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2387   me->priority = PONG_PRIORITY;
2388   me->size = sizeof (struct PingMessage);
2389   tp = (struct PingMessage *) &me[1];
2390   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2391   tp->header.size = htons (sizeof (struct PingMessage));
2392   do_encrypt (n,
2393               &my_identity.hashPubKey,
2394               &t.challenge,
2395               &tp->challenge,
2396               sizeof (struct PingMessage) -
2397               sizeof (struct GNUNET_MessageHeader));
2398 #if DEBUG_CORE
2399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2400               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2401               ntohl (t.challenge), n->encrypt_key.crc32);
2402 #endif
2403   /* trigger queue processing */
2404   process_encrypted_neighbour_queue (n);
2405 }
2406
2407
2408 /**
2409  * We received a SET_KEY message.  Validate and update
2410  * our key material and status.
2411  *
2412  * @param n the neighbour from which we received message m
2413  * @param m the set key message we received
2414  */
2415 static void
2416 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2417 {
2418   struct SetKeyMessage *m_cpy;
2419   struct GNUNET_TIME_Absolute t;
2420   struct GNUNET_CRYPTO_AesSessionKey k;
2421   struct PingMessage *ping;
2422   enum PeerStateMachine sender_status;
2423
2424 #if DEBUG_CORE
2425   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2426               "Core service receives `%s' request from `%4s'.\n",
2427               "SET_KEY", GNUNET_i2s (&n->peer));
2428 #endif
2429   if (n->public_key == NULL)
2430     {
2431 #if DEBUG_CORE
2432       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2433                   "Lacking public key for peer, trying to obtain one.\n");
2434 #endif
2435       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2436       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2437       /* lookup n's public key, then try again */
2438       GNUNET_assert (n->pitr == NULL);
2439       GNUNET_assert (n->skm == NULL);
2440       n->skm = m_cpy;
2441       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2442                                          sched,
2443                                          &n->peer,
2444                                          0,
2445                                          GNUNET_TIME_UNIT_MINUTES,
2446                                          &process_hello_retry_handle_set_key, n);
2447       return;
2448     }
2449   if (0 != memcmp (&m->target,
2450                    &my_identity,
2451                    sizeof (struct GNUNET_PeerIdentity)))
2452     {
2453       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2454                   _("Received `%s' message that was not for me.  Ignoring.\n"));
2455       return;
2456     }
2457   if ((ntohl (m->purpose.size) !=
2458        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2459        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2460        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2461        sizeof (struct GNUNET_PeerIdentity)) ||
2462       (GNUNET_OK !=
2463        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2464                                  &m->purpose, &m->signature, n->public_key)))
2465     {
2466       /* invalid signature */
2467       GNUNET_break_op (0);
2468       return;
2469     }
2470   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2471   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2472        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2473       (t.value < n->decrypt_key_created.value))
2474     {
2475       /* this could rarely happen due to massive re-ordering of
2476          messages on the network level, but is most likely either
2477          a bug or some adversary messing with us.  Report. */
2478       GNUNET_break_op (0);
2479       return;
2480     }
2481 #if DEBUG_CORE
2482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2483 #endif  
2484   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2485                                   &m->encrypted_key,
2486                                   &k,
2487                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2488        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2489       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2490     {
2491       /* failed to decrypt !? */
2492       GNUNET_break_op (0);
2493       return;
2494     }
2495
2496   n->decrypt_key = k;
2497   if (n->decrypt_key_created.value != t.value)
2498     {
2499       /* fresh key, reset sequence numbers */
2500       n->last_sequence_number_received = 0;
2501       n->last_packets_bitmap = 0;
2502       n->decrypt_key_created = t;
2503     }
2504   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2505   switch (n->status)
2506     {
2507     case PEER_STATE_DOWN:
2508       n->status = PEER_STATE_KEY_RECEIVED;
2509 #if DEBUG_CORE
2510       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2511                   "Responding to `%s' with my own key.\n", "SET_KEY");
2512 #endif
2513       send_key (n);
2514       break;
2515     case PEER_STATE_KEY_SENT:
2516     case PEER_STATE_KEY_RECEIVED:
2517       n->status = PEER_STATE_KEY_RECEIVED;
2518       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2519           (sender_status != PEER_STATE_KEY_CONFIRMED))
2520         {
2521 #if DEBUG_CORE
2522           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2523                       "Responding to `%s' with my own key (other peer has status %u).\n",
2524                       "SET_KEY", sender_status);
2525 #endif
2526           send_key (n);
2527         }
2528       break;
2529     case PEER_STATE_KEY_CONFIRMED:
2530       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2531           (sender_status != PEER_STATE_KEY_CONFIRMED))
2532         {         
2533 #if DEBUG_CORE
2534           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2535                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2536                       "SET_KEY", sender_status);
2537 #endif
2538           send_key (n);
2539         }
2540       break;
2541     default:
2542       GNUNET_break (0);
2543       break;
2544     }
2545   if (n->pending_ping != NULL)
2546     {
2547       ping = n->pending_ping;
2548       n->pending_ping = NULL;
2549       handle_ping (n, ping);
2550       GNUNET_free (ping);
2551     }
2552 }
2553
2554
2555 /**
2556  * We received a PONG message.  Validate and update our status.
2557  *
2558  * @param n sender of the PONG
2559  * @param m the encrypted PONG message itself
2560  */
2561 static void
2562 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2563 {
2564   struct PingMessage t;
2565   struct ConnectNotifyMessage cnm;
2566
2567 #if DEBUG_CORE
2568   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2569               "Core service receives `%s' request from `%4s'.\n",
2570               "PONG", GNUNET_i2s (&n->peer));
2571 #endif
2572   if (GNUNET_OK !=
2573       do_decrypt (n,
2574                   &n->peer.hashPubKey,
2575                   &m->challenge,
2576                   &t.challenge,
2577                   sizeof (struct PingMessage) -
2578                   sizeof (struct GNUNET_MessageHeader)))
2579     return;
2580 #if DEBUG_CORE
2581   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2582               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2583               "PONG",
2584               GNUNET_i2s (&t.target),
2585               ntohl (t.challenge), n->decrypt_key.crc32);
2586 #endif
2587   if ((0 != memcmp (&t.target,
2588                     &n->peer,
2589                     sizeof (struct GNUNET_PeerIdentity))) ||
2590       (n->ping_challenge != ntohl (t.challenge)))
2591     {
2592       /* PONG malformed */
2593 #if DEBUG_CORE
2594       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2595                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2596                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2597       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2598                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2599                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2600 #endif
2601       GNUNET_break_op (0);
2602       return;
2603     }
2604   switch (n->status)
2605     {
2606     case PEER_STATE_DOWN:
2607       GNUNET_break (0);         /* should be impossible */
2608       return;
2609     case PEER_STATE_KEY_SENT:
2610       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2611       return;
2612     case PEER_STATE_KEY_RECEIVED:
2613       n->status = PEER_STATE_KEY_CONFIRMED;
2614       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2615         {
2616           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2617           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2618         }      
2619       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2620       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2621       cnm.distance = htonl (n->last_distance);
2622       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2623       cnm.peer = n->peer;
2624       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2625       process_encrypted_neighbour_queue (n);
2626       break;
2627     case PEER_STATE_KEY_CONFIRMED:
2628       /* duplicate PONG? */
2629       break;
2630     default:
2631       GNUNET_break (0);
2632       break;
2633     }
2634 }
2635
2636
2637 /**
2638  * Send a P2P message to a client.
2639  *
2640  * @param sender who sent us the message?
2641  * @param client who should we give the message to?
2642  * @param m contains the message to transmit
2643  * @param msize number of bytes in buf to transmit
2644  */
2645 static void
2646 send_p2p_message_to_client (struct Neighbour *sender,
2647                             struct Client *client,
2648                             const void *m, size_t msize)
2649 {
2650   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2651   struct NotifyTrafficMessage *ntm;
2652
2653 #if DEBUG_CORE
2654   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2655               "Core service passes message from `%4s' of type %u to client.\n",
2656               GNUNET_i2s(&sender->peer),
2657               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2658 #endif
2659   ntm = (struct NotifyTrafficMessage *) buf;
2660   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2661   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2662   ntm->distance = htonl (sender->last_distance);
2663   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2664   ntm->peer = sender->peer;
2665   memcpy (&ntm[1], m, msize);
2666   send_to_client (client, &ntm->header, GNUNET_YES);
2667 }
2668
2669
2670 /**
2671  * Deliver P2P message to interested clients.
2672  *
2673  * @param sender who sent us the message?
2674  * @param m the message
2675  * @param msize size of the message (including header)
2676  */
2677 static void
2678 deliver_message (struct Neighbour *sender,
2679                  const struct GNUNET_MessageHeader *m, size_t msize)
2680 {
2681   struct Client *cpos;
2682   uint16_t type;
2683   unsigned int tpos;
2684   int deliver_full;
2685
2686   type = ntohs (m->type);
2687   cpos = clients;
2688   while (cpos != NULL)
2689     {
2690       deliver_full = GNUNET_NO;
2691       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2692         deliver_full = GNUNET_YES;
2693       else
2694         {
2695           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2696             {
2697               if (type != cpos->types[tpos])
2698                 continue;
2699               deliver_full = GNUNET_YES;
2700               break;
2701             }
2702         }
2703       if (GNUNET_YES == deliver_full)
2704         send_p2p_message_to_client (sender, cpos, m, msize);
2705       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2706         send_p2p_message_to_client (sender, cpos, m,
2707                                     sizeof (struct GNUNET_MessageHeader));
2708       cpos = cpos->next;
2709     }
2710 }
2711
2712
2713 /**
2714  * Align P2P message and then deliver to interested clients.
2715  *
2716  * @param sender who sent us the message?
2717  * @param buffer unaligned (!) buffer containing message
2718  * @param msize size of the message (including header)
2719  */
2720 static void
2721 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2722 {
2723   char abuf[msize];
2724
2725   /* TODO: call to statistics? */
2726   memcpy (abuf, buffer, msize);
2727   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2728 }
2729
2730
2731 /**
2732  * Deliver P2P messages to interested clients.
2733  *
2734  * @param sender who sent us the message?
2735  * @param buffer buffer containing messages, can be modified
2736  * @param buffer_size size of the buffer (overall)
2737  * @param offset offset where messages in the buffer start
2738  */
2739 static void
2740 deliver_messages (struct Neighbour *sender,
2741                   const char *buffer, size_t buffer_size, size_t offset)
2742 {
2743   struct GNUNET_MessageHeader *mhp;
2744   struct GNUNET_MessageHeader mh;
2745   uint16_t msize;
2746   int need_align;
2747
2748   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2749     {
2750       if (0 != offset % sizeof (uint16_t))
2751         {
2752           /* outch, need to copy to access header */
2753           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2754           mhp = &mh;
2755         }
2756       else
2757         {
2758           /* can access header directly */
2759           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2760         }
2761       msize = ntohs (mhp->size);
2762       if (msize + offset > buffer_size)
2763         {
2764           /* malformed message, header says it is larger than what
2765              would fit into the overall buffer */
2766           GNUNET_break_op (0);
2767           break;
2768         }
2769 #if HAVE_UNALIGNED_64_ACCESS
2770       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2771 #else
2772       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2773 #endif
2774       if (GNUNET_YES == need_align)
2775         align_and_deliver (sender, &buffer[offset], msize);
2776       else
2777         deliver_message (sender,
2778                          (const struct GNUNET_MessageHeader *)
2779                          &buffer[offset], msize);
2780       offset += msize;
2781     }
2782 }
2783
2784
2785 /**
2786  * We received an encrypted message.  Decrypt, validate and
2787  * pass on to the appropriate clients.
2788  */
2789 static void
2790 handle_encrypted_message (struct Neighbour *n,
2791                           const struct EncryptedMessage *m)
2792 {
2793   size_t size = ntohs (m->header.size);
2794   char buf[size];
2795   struct EncryptedMessage *pt;  /* plaintext */
2796   GNUNET_HashCode ph;
2797   size_t off;
2798   uint32_t snum;
2799   struct GNUNET_TIME_Absolute t;
2800
2801 #if DEBUG_CORE
2802   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2803               "Core service receives `%s' request from `%4s'.\n",
2804               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2805 #endif  
2806   /* decrypt */
2807   if (GNUNET_OK !=
2808       do_decrypt (n,
2809                   &m->plaintext_hash,
2810                   &m->sequence_number,
2811                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2812     return;
2813   pt = (struct EncryptedMessage *) buf;
2814
2815   /* validate hash */
2816   GNUNET_CRYPTO_hash (&pt->sequence_number,
2817                       size - ENCRYPTED_HEADER_SIZE, &ph);
2818   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2819     {
2820       /* checksum failed */
2821       GNUNET_break_op (0);
2822       return;
2823     }
2824
2825   /* validate sequence number */
2826   snum = ntohl (pt->sequence_number);
2827   if (n->last_sequence_number_received == snum)
2828     {
2829       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2830                   "Received duplicate message, ignoring.\n");
2831       /* duplicate, ignore */
2832       return;
2833     }
2834   if ((n->last_sequence_number_received > snum) &&
2835       (n->last_sequence_number_received - snum > 32))
2836     {
2837       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2838                   "Received ancient out of sequence message, ignoring.\n");
2839       /* ancient out of sequence, ignore */
2840       return;
2841     }
2842   if (n->last_sequence_number_received > snum)
2843     {
2844       unsigned int rotbit =
2845         1 << (n->last_sequence_number_received - snum - 1);
2846       if ((n->last_packets_bitmap & rotbit) != 0)
2847         {
2848           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2849                       "Received duplicate message, ignoring.\n");
2850           /* duplicate, ignore */
2851           return;
2852         }
2853       n->last_packets_bitmap |= rotbit;
2854     }
2855   if (n->last_sequence_number_received < snum)
2856     {
2857       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2858       n->last_sequence_number_received = snum;
2859     }
2860
2861   /* check timestamp */
2862   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2863   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2864     {
2865       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2866                   _
2867                   ("Message received far too old (%llu ms). Content ignored.\n"),
2868                   GNUNET_TIME_absolute_get_duration (t).value);
2869       return;
2870     }
2871
2872   /* process decrypted message(s) */
2873   update_window (GNUNET_YES,
2874                  &n->available_send_window,
2875                  &n->last_asw_update,
2876                  n->bpm_out);
2877   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2878   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2879                            n->bpm_out_internal_limit);
2880   n->last_activity = GNUNET_TIME_absolute_get ();
2881   off = sizeof (struct EncryptedMessage);
2882   deliver_messages (n, buf, size, off);
2883 }
2884
2885
2886 /**
2887  * Function called by the transport for each received message.
2888  *
2889  * @param cls closure
2890  * @param peer (claimed) identity of the other peer
2891  * @param message the message
2892  * @param latency estimated latency for communicating with the
2893  *             given peer (round-trip)
2894  * @param distance in overlay hops, as given by transport plugin
2895  */
2896 static void
2897 handle_transport_receive (void *cls,
2898                           const struct GNUNET_PeerIdentity *peer,
2899                           const struct GNUNET_MessageHeader *message,
2900                           struct GNUNET_TIME_Relative latency,
2901                           unsigned int distance)
2902 {
2903   struct Neighbour *n;
2904   struct GNUNET_TIME_Absolute now;
2905   int up;
2906   uint16_t type;
2907   uint16_t size;
2908
2909 #if DEBUG_CORE
2910   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2911               "Received message of type %u from `%4s', demultiplexing.\n",
2912               ntohs (message->type), GNUNET_i2s (peer));
2913 #endif
2914   n = find_neighbour (peer);
2915   if (n == NULL)
2916     {
2917       GNUNET_break (0);
2918       return;
2919     }
2920   n->last_latency = latency;
2921   up = (n->status == PEER_STATE_KEY_CONFIRMED);
2922   type = ntohs (message->type);
2923   size = ntohs (message->size);
2924   switch (type)
2925     {
2926     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2927       if (size != sizeof (struct SetKeyMessage))
2928         {
2929           GNUNET_break_op (0);
2930           return;
2931         }
2932       handle_set_key (n, (const struct SetKeyMessage *) message);
2933       break;
2934     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2935       if (size < sizeof (struct EncryptedMessage) +
2936           sizeof (struct GNUNET_MessageHeader))
2937         {
2938           GNUNET_break_op (0);
2939           return;
2940         }
2941       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2942           (n->status != PEER_STATE_KEY_CONFIRMED))
2943         {
2944           GNUNET_break_op (0);
2945           return;
2946         }
2947       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2948       break;
2949     case GNUNET_MESSAGE_TYPE_CORE_PING:
2950       if (size != sizeof (struct PingMessage))
2951         {
2952           GNUNET_break_op (0);
2953           return;
2954         }
2955       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2956           (n->status != PEER_STATE_KEY_CONFIRMED))
2957         {
2958 #if DEBUG_CORE
2959           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2960                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2961                       "PING", GNUNET_i2s (&n->peer));
2962 #endif
2963           GNUNET_free_non_null (n->pending_ping);
2964           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2965           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2966           return;
2967         }
2968       handle_ping (n, (const struct PingMessage *) message);
2969       break;
2970     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2971       if (size != sizeof (struct PingMessage))
2972         {
2973           GNUNET_break_op (0);
2974           return;
2975         }
2976       if ((n->status != PEER_STATE_KEY_SENT) &&
2977           (n->status != PEER_STATE_KEY_RECEIVED) &&
2978           (n->status != PEER_STATE_KEY_CONFIRMED))
2979         {
2980           /* could not decrypt pong, oops! */
2981           GNUNET_break_op (0);
2982           return;
2983         }
2984       handle_pong (n, (const struct PingMessage *) message);
2985       break;
2986     default:
2987       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2988                   _("Unsupported message of type %u received.\n"), type);
2989       return;
2990     }
2991   if (n->status == PEER_STATE_KEY_CONFIRMED)
2992     {
2993       now = GNUNET_TIME_absolute_get ();
2994       n->last_activity = now;
2995       if (!up)
2996         n->time_established = now;
2997     }
2998 }
2999
3000
3001 /**
3002  * Function that recalculates the bandwidth quota for the
3003  * given neighbour and transmits it to the transport service.
3004  * 
3005  * @param cls neighbour for the quota update
3006  * @param tc context
3007  */
3008 static void
3009 neighbour_quota_update (void *cls,
3010                         const struct GNUNET_SCHEDULER_TaskContext *tc);
3011
3012
3013 /**
3014  * Schedule the task that will recalculate the bandwidth
3015  * quota for this peer (and possibly force a disconnect of
3016  * idle peers by calculating a bandwidth of zero).
3017  */
3018 static void
3019 schedule_quota_update (struct Neighbour *n)
3020 {
3021   GNUNET_assert (n->quota_update_task ==
3022                  GNUNET_SCHEDULER_NO_TASK);
3023   n->quota_update_task
3024     = GNUNET_SCHEDULER_add_delayed (sched,
3025                                     QUOTA_UPDATE_FREQUENCY,
3026                                     &neighbour_quota_update,
3027                                     n);
3028 }
3029
3030
3031 /**
3032  * Function that recalculates the bandwidth quota for the
3033  * given neighbour and transmits it to the transport service.
3034  * 
3035  * @param cls neighbour for the quota update
3036  * @param tc context
3037  */
3038 static void
3039 neighbour_quota_update (void *cls,
3040                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3041 {
3042   struct Neighbour *n = cls;
3043   uint32_t q_in;
3044   double pref_rel;
3045   double share;
3046   unsigned long long distributable;
3047   
3048   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3049   /* calculate relative preference among all neighbours;
3050      divides by a bit more to avoid division by zero AND to
3051      account for possibility of new neighbours joining any time 
3052      AND to convert to double... */
3053   pref_rel = n->current_preference / (1.0 + preference_sum);
3054   distributable = 0;
3055   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
3056     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
3057   share = distributable * pref_rel;
3058   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
3059   /* check if we want to disconnect for good due to inactivity */
3060   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3061        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3062     q_in = 0; /* force disconnect */
3063   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
3064        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
3065     {
3066       n->bpm_in = q_in;
3067       GNUNET_TRANSPORT_set_quota (transport,
3068                                   &n->peer,
3069                                   n->bpm_in, 
3070                                   n->bpm_out,
3071                                   GNUNET_TIME_UNIT_FOREVER_REL,
3072                                   NULL, NULL);
3073     }
3074   schedule_quota_update (n);
3075 }
3076
3077
3078 /**
3079  * Function called by transport to notify us that
3080  * a peer connected to us (on the network level).
3081  *
3082  * @param cls closure
3083  * @param peer the peer that connected
3084  * @param latency current latency of the connection
3085  * @param distance in overlay hops, as given by transport plugin
3086  */
3087 static void
3088 handle_transport_notify_connect (void *cls,
3089                                  const struct GNUNET_PeerIdentity *peer,
3090                                  struct GNUNET_TIME_Relative latency,
3091                                  unsigned int distance)
3092 {
3093   struct Neighbour *n;
3094   struct GNUNET_TIME_Absolute now;
3095   struct ConnectNotifyMessage cnm;
3096
3097   n = find_neighbour (peer);
3098   if (n != NULL)
3099     {
3100       /* duplicate connect notification!? */
3101       GNUNET_break (0);
3102       return;
3103     }
3104   now = GNUNET_TIME_absolute_get ();
3105   n = GNUNET_malloc (sizeof (struct Neighbour));
3106   n->next = neighbours;
3107   neighbours = n;
3108   neighbour_count++;
3109   n->peer = *peer;
3110   n->last_latency = latency;
3111   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
3112   n->encrypt_key_created = now;
3113   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
3114   n->last_asw_update = now;
3115   n->last_arw_update = now;
3116   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3117   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3118   n->bpm_out_internal_limit = (uint32_t) - 1;
3119   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3120   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3121                                                 (uint32_t) - 1);
3122 #if DEBUG_CORE
3123   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3124               "Received connection from `%4s'.\n",
3125               GNUNET_i2s (&n->peer));
3126 #endif
3127   schedule_quota_update (n);
3128   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3129   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3130   cnm.distance = htonl (n->last_distance);
3131   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
3132   cnm.peer = *peer;
3133   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3134   send_key (n);
3135 }
3136
3137
3138 /**
3139  * Free the given entry for the neighbour (it has
3140  * already been removed from the list at this point).
3141  *
3142  * @param n neighbour to free
3143  */
3144 static void
3145 free_neighbour (struct Neighbour *n)
3146 {
3147   struct MessageEntry *m;
3148
3149   if (n->pitr != NULL)
3150     {
3151       GNUNET_PEERINFO_iterate_cancel (n->pitr);
3152       n->pitr = NULL;
3153     }
3154   if (n->skm != NULL)
3155     {
3156       GNUNET_free (n->skm);
3157       n->skm = NULL;
3158     }
3159   while (NULL != (m = n->messages))
3160     {
3161       n->messages = m->next;
3162       GNUNET_free (m);
3163     }
3164   while (NULL != (m = n->encrypted_head))
3165     {
3166       n->encrypted_head = m->next;
3167       GNUNET_free (m);
3168     }
3169   if (NULL != n->th)
3170     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3171   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
3172     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3173   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
3174     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3175   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
3176     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3177   GNUNET_free_non_null (n->public_key);
3178   GNUNET_free_non_null (n->pending_ping);
3179   GNUNET_free (n);
3180 }
3181
3182
3183 /**
3184  * Function called by transport telling us that a peer
3185  * disconnected.
3186  *
3187  * @param cls closure
3188  * @param peer the peer that disconnected
3189  */
3190 static void
3191 handle_transport_notify_disconnect (void *cls,
3192                                     const struct GNUNET_PeerIdentity *peer)
3193 {
3194   struct DisconnectNotifyMessage cnm;
3195   struct Neighbour *n;
3196   struct Neighbour *p;
3197
3198 #if DEBUG_CORE
3199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3200               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3201 #endif
3202   p = NULL;
3203   n = neighbours;
3204   while ((n != NULL) &&
3205          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3206     {
3207       p = n;
3208       n = n->next;
3209     }
3210   if (n == NULL)
3211     {
3212       GNUNET_break (0);
3213       return;
3214     }
3215   if (p == NULL)
3216     neighbours = n->next;
3217   else
3218     p->next = n->next;
3219   GNUNET_assert (neighbour_count > 0);
3220   neighbour_count--;
3221   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3222   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3223   cnm.peer = *peer;
3224   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3225   free_neighbour (n);
3226 }
3227
3228
3229 /**
3230  * Last task run during shutdown.  Disconnects us from
3231  * the transport.
3232  */
3233 static void
3234 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3235 {
3236   struct Neighbour *n;
3237   struct Client *c;
3238
3239 #if DEBUG_CORE
3240   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3241               "Core service shutting down.\n");
3242 #endif
3243   GNUNET_assert (transport != NULL);
3244   GNUNET_TRANSPORT_disconnect (transport);
3245   transport = NULL;
3246   while (NULL != (n = neighbours))
3247     {
3248       neighbours = n->next;
3249       GNUNET_assert (neighbour_count > 0);
3250       neighbour_count--;
3251       free_neighbour (n);
3252     }
3253   while (NULL != (c = clients))
3254     handle_client_disconnect (NULL, c->client_handle);
3255   if (my_private_key != NULL)
3256     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3257 }
3258
3259
3260 /**
3261  * Initiate core service.
3262  *
3263  * @param cls closure
3264  * @param s scheduler to use
3265  * @param serv the initialized server
3266  * @param c configuration to use
3267  */
3268 static void
3269 run (void *cls,
3270      struct GNUNET_SCHEDULER_Handle *s,
3271      struct GNUNET_SERVER_Handle *serv,
3272      const struct GNUNET_CONFIGURATION_Handle *c)
3273 {
3274 #if 0
3275   unsigned long long qin;
3276   unsigned long long qout;
3277   unsigned long long tneigh;
3278 #endif
3279   char *keyfile;
3280
3281   sched = s;
3282   cfg = c;
3283   /* parse configuration */
3284   if (
3285        (GNUNET_OK !=
3286         GNUNET_CONFIGURATION_get_value_number (c,
3287                                                "CORE",
3288                                                "TOTAL_QUOTA_IN",
3289                                                &bandwidth_target_in)) ||
3290        (GNUNET_OK !=
3291         GNUNET_CONFIGURATION_get_value_number (c,
3292                                                "CORE",
3293                                                "TOTAL_QUOTA_OUT",
3294                                                &bandwidth_target_out)) ||
3295 #if 0
3296        (GNUNET_OK !=
3297         GNUNET_CONFIGURATION_get_value_number (c,
3298                                                "CORE",
3299                                                "YY",
3300                                                &qout)) ||
3301        (GNUNET_OK !=
3302         GNUNET_CONFIGURATION_get_value_number (c,
3303                                                "CORE",
3304                                                "ZZ_LIMIT", &tneigh)) ||
3305 #endif
3306        (GNUNET_OK !=
3307         GNUNET_CONFIGURATION_get_value_filename (c,
3308                                                  "GNUNETD",
3309                                                  "HOSTKEY", &keyfile)))
3310     {
3311       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3312                   _
3313                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3314       GNUNET_SCHEDULER_shutdown (s);
3315       return;
3316     }
3317   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3318   GNUNET_free (keyfile);
3319   if (my_private_key == NULL)
3320     {
3321       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3322                   _("Core service could not access hostkey.  Exiting.\n"));
3323       GNUNET_SCHEDULER_shutdown (s);
3324       return;
3325     }
3326   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3327   GNUNET_CRYPTO_hash (&my_public_key,
3328                       sizeof (my_public_key), &my_identity.hashPubKey);
3329   /* setup notification */
3330   server = serv;
3331   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3332   /* setup transport connection */
3333   transport = GNUNET_TRANSPORT_connect (sched,
3334                                         cfg,
3335                                         NULL,
3336                                         &handle_transport_receive,
3337                                         &handle_transport_notify_connect,
3338                                         &handle_transport_notify_disconnect);
3339   GNUNET_assert (NULL != transport);
3340   GNUNET_SCHEDULER_add_delayed (sched,
3341                                 GNUNET_TIME_UNIT_FOREVER_REL,
3342                                 &cleaning_task, NULL);
3343   /* process client requests */
3344   GNUNET_SERVER_add_handlers (server, handlers);
3345   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3346               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3347 }
3348
3349
3350
3351 /**
3352  * The main function for the transport service.
3353  *
3354  * @param argc number of arguments from the command line
3355  * @param argv command line arguments
3356  * @return 0 ok, 1 on error
3357  */
3358 int
3359 main (int argc, char *const *argv)
3360 {
3361   return (GNUNET_OK ==
3362           GNUNET_SERVICE_run (argc,
3363                               argv,
3364                               "core",
3365                               GNUNET_SERVICE_OPTION_NONE,
3366                               &run, NULL)) ? 0 : 1;
3367 }
3368
3369 /* end of gnunet-service-core.c */