updates to peerinfo to use new nc API
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - not all GNUNET_CORE_OPTION_SEND_* flags are fully supported yet
28  *   (i.e. no SEND_XXX_OUTBOUND).
29  * - 'REQUEST_DISCONNECT' is not implemented (transport API is lacking!)
30  *
31  * Considerations for later:
32  * - check that hostkey used by transport (for HELLOs) is the
33  *   same as the hostkey that we are using!
34  * - add code to send PINGs if we are about to time-out otherwise
35  * - optimize lookup (many O(n) list traversals
36  *   could ideally be changed to O(1) hash map lookups)
37  */
38 #include "platform.h"
39 #include "gnunet_constants.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet_hello_lib.h"
42 #include "gnunet_peerinfo_service.h"
43 #include "gnunet_protocols.h"
44 #include "gnunet_signatures.h"
45 #include "gnunet_transport_service.h"
46 #include "core.h"
47
48
49 /**
50  * Receive and send buffer windows grow over time.  For
51  * how long can 'unused' bandwidth accumulate before we
52  * need to cap it?  (specified in ms).
53  */
54 #define MAX_WINDOW_TIME (5 * 60 * 1000)
55
56 /**
57  * Minimum of bytes per minute (out) to assign to any connected peer.
58  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
59  * sense.
60  */
61 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
62
63 /**
64  * What is the smallest change (in number of bytes per minute)
65  * that we consider significant enough to bother triggering?
66  */
67 #define MIN_BPM_CHANGE 32
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What is the maximum delay for a SET_KEY message?
80  */
81 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
82
83 /**
84  * What how long do we wait for SET_KEY confirmation initially?
85  */
86 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
87
88 /**
89  * What is the maximum delay for a PING message?
90  */
91 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * What is the maximum delay for a PONG message?
95  */
96 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * How often do we recalculate bandwidth quotas?
100  */
101 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
102
103 /**
104  * What is the priority for a SET_KEY message?
105  */
106 #define SET_KEY_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PING message?
110  */
111 #define PING_PRIORITY 0xFFFFFF
112
113 /**
114  * What is the priority for a PONG message?
115  */
116 #define PONG_PRIORITY 0xFFFFFF
117
118 /**
119  * How many messages do we queue per peer at most?
120  */
121 #define MAX_PEER_QUEUE_SIZE 16
122
123 /**
124  * How many non-mandatory messages do we queue per client at most?
125  */
126 #define MAX_CLIENT_QUEUE_SIZE 32
127
128 /**
129  * What is the maximum age of a message for us to consider
130  * processing it?  Note that this looks at the timestamp used
131  * by the other peer, so clock skew between machines does
132  * come into play here.  So this should be picked high enough
133  * so that a little bit of clock skew does not prevent peers
134  * from connecting to us.
135  */
136 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
137
138 /**
139  * What is the maximum size for encrypted messages?  Note that this
140  * number imposes a clear limit on the maximum size of any message.
141  * Set to a value close to 64k but not so close that transports will
142  * have trouble with their headers.
143  */
144 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
145
146
147 /**
148  * State machine for our P2P encryption handshake.  Everyone starts in
149  * "DOWN", if we receive the other peer's key (other peer initiated)
150  * we start in state RECEIVED (since we will immediately send our
151  * own); otherwise we start in SENT.  If we get back a PONG from
152  * within either state, we move up to CONFIRMED (the PONG will always
153  * be sent back encrypted with the key we sent to the other peer).
154  */
155 enum PeerStateMachine
156 {
157   PEER_STATE_DOWN,
158   PEER_STATE_KEY_SENT,
159   PEER_STATE_KEY_RECEIVED,
160   PEER_STATE_KEY_CONFIRMED
161 };
162
163
164 /**
165  * Number of bytes (at the beginning) of "struct EncryptedMessage"
166  * that are NOT encrypted.
167  */
168 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
169
170
171 /**
172  * Encapsulation for encrypted messages exchanged between
173  * peers.  Followed by the actual encrypted data.
174  */
175 struct EncryptedMessage
176 {
177   /**
178    * Message type is either CORE_ENCRYPTED_MESSAGE.
179    */
180   struct GNUNET_MessageHeader header;
181
182   /**
183    * Always zero.
184    */
185   uint32_t reserved GNUNET_PACKED;
186
187   /**
188    * Hash of the plaintext, used to verify message integrity;
189    * ALSO used as the IV for the symmetric cipher!  Everything
190    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
191    * must be set to the offset of the next field.
192    */
193   GNUNET_HashCode plaintext_hash;
194
195   /**
196    * Sequence number, in network byte order.  This field
197    * must be the first encrypted/decrypted field and the
198    * first byte that is hashed for the plaintext hash.
199    */
200   uint32_t sequence_number GNUNET_PACKED;
201
202   /**
203    * Desired bandwidth (how much we should send to this
204    * peer / how much is the sender willing to receive),
205    * in bytes per minute.
206    */
207   uint32_t inbound_bpm_limit GNUNET_PACKED;
208
209   /**
210    * Timestamp.  Used to prevent reply of ancient messages
211    * (recent messages are caught with the sequence number).
212    */
213   struct GNUNET_TIME_AbsoluteNBO timestamp;
214
215 };
216
217 /**
218  * We're sending an (encrypted) PING to the other peer to check if he
219  * can decrypt.  The other peer should respond with a PONG with the
220  * same content, except this time encrypted with the receiver's key.
221  */
222 struct PingMessage
223 {
224   /**
225    * Message type is either CORE_PING or CORE_PONG.
226    */
227   struct GNUNET_MessageHeader header;
228
229   /**
230    * Random number chosen to make reply harder.
231    */
232   uint32_t challenge GNUNET_PACKED;
233
234   /**
235    * Intended target of the PING, used primarily to check
236    * that decryption actually worked.
237    */
238   struct GNUNET_PeerIdentity target;
239 };
240
241
242 /**
243  * Message transmitted to set (or update) a session key.
244  */
245 struct SetKeyMessage
246 {
247
248   /**
249    * Message type is either CORE_SET_KEY.
250    */
251   struct GNUNET_MessageHeader header;
252
253   /**
254    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
255    */
256   int32_t sender_status GNUNET_PACKED;
257
258   /**
259    * Purpose of the signature, will be
260    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
261    */
262   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
263
264   /**
265    * At what time was this key created?
266    */
267   struct GNUNET_TIME_AbsoluteNBO creation_time;
268
269   /**
270    * The encrypted session key.
271    */
272   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
273
274   /**
275    * Who is the intended recipient?
276    */
277   struct GNUNET_PeerIdentity target;
278
279   /**
280    * Signature of the stuff above (starting at purpose).
281    */
282   struct GNUNET_CRYPTO_RsaSignature signature;
283
284 };
285
286
287 /**
288  * Message waiting for transmission. This struct
289  * is followed by the actual content of the message.
290  */
291 struct MessageEntry
292 {
293
294   /**
295    * We keep messages in a linked list (for now).
296    */
297   struct MessageEntry *next;
298
299   /**
300    * By when are we supposed to transmit this message?
301    */
302   struct GNUNET_TIME_Absolute deadline;
303
304   /**
305    * How important is this message to us?
306    */
307   unsigned int priority;
308
309   /**
310    * How long is the message? (number of bytes following
311    * the "struct MessageEntry", but not including the
312    * size of "struct MessageEntry" itself!)
313    */
314   uint16_t size;
315
316   /**
317    * Was this message selected for transmission in the
318    * current round? GNUNET_YES or GNUNET_NO.
319    */
320   int16_t do_transmit;
321
322 };
323
324
325 struct Neighbour
326 {
327   /**
328    * We keep neighbours in a linked list (for now).
329    */
330   struct Neighbour *next;
331
332   /**
333    * Unencrypted messages destined for this peer.
334    */
335   struct MessageEntry *messages;
336
337   /**
338    * Head of the batched, encrypted message queue (already ordered,
339    * transmit starting with the head).
340    */
341   struct MessageEntry *encrypted_head;
342
343   /**
344    * Tail of the batched, encrypted message queue (already ordered,
345    * append new messages to tail)
346    */
347   struct MessageEntry *encrypted_tail;
348
349   /**
350    * Handle for pending requests for transmission to this peer
351    * with the transport service.  NULL if no request is pending.
352    */
353   struct GNUNET_TRANSPORT_TransmitHandle *th;
354
355   /**
356    * Public key of the neighbour, NULL if we don't have it yet.
357    */
358   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
359
360   /**
361    * We received a PING message before we got the "public_key"
362    * (or the SET_KEY).  We keep it here until we have a key
363    * to decrypt it.  NULL if no PING is pending.
364    */
365   struct PingMessage *pending_ping;
366
367   /**
368    * Non-NULL if we are currently looking up HELLOs for this peer.
369    * for this peer.
370    */
371   struct GNUNET_PEERINFO_IteratorContext *pitr;
372
373   /**
374    * SetKeyMessage to transmit, NULL if we are not currently trying
375    * to send one.
376    */
377   struct SetKeyMessage *skm;
378
379   /**
380    * Identity of the neighbour.
381    */
382   struct GNUNET_PeerIdentity peer;
383
384   /**
385    * Key we use to encrypt our messages for the other peer
386    * (initialized by us when we do the handshake).
387    */
388   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
389
390   /**
391    * Key we use to decrypt messages from the other peer
392    * (given to us by the other peer during the handshake).
393    */
394   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
395
396   /**
397    * ID of task used for re-trying plaintext scheduling.
398    */
399   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
400
401   /**
402    * ID of task used for re-trying SET_KEY and PING message.
403    */
404   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
405
406   /**
407    * ID of task used for updating bandwidth quota for this neighbour.
408    */
409   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
410
411   /**
412    * At what time did we generate our encryption key?
413    */
414   struct GNUNET_TIME_Absolute encrypt_key_created;
415
416   /**
417    * At what time did the other peer generate the decryption key?
418    */
419   struct GNUNET_TIME_Absolute decrypt_key_created;
420
421   /**
422    * At what time did we initially establish (as in, complete session
423    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
424    */
425   struct GNUNET_TIME_Absolute time_established;
426
427   /**
428    * At what time did we last receive an encrypted message from the
429    * other peer?  Should be zero if status != KEY_CONFIRMED.
430    */
431   struct GNUNET_TIME_Absolute last_activity;
432
433   /**
434    * Last latency observed from this peer.
435    */
436   struct GNUNET_TIME_Relative last_latency;
437
438   /**
439    * At what frequency are we currently re-trying SET_KEY messages?
440    */
441   struct GNUNET_TIME_Relative set_key_retry_frequency;
442
443   /**
444    * Time of our last update to the "available_send_window".
445    */
446   struct GNUNET_TIME_Absolute last_asw_update;
447
448   /**
449    * Time of our last update to the "available_recv_window".
450    */
451   struct GNUNET_TIME_Absolute last_arw_update;
452
453   /**
454    * Number of bytes that we are eligible to transmit to this
455    * peer at this point.  Incremented every minute by max_out_bpm,
456    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
457    * bandwidth-hogs are sampled at a frequency of about 78s!);
458    * may get negative if we have VERY high priority content.
459    */
460   long long available_send_window; 
461
462   /**
463    * How much downstream capacity of this peer has been reserved for
464    * our traffic?  (Our clients can request that a certain amount of
465    * bandwidth is available for replies to them; this value is used to
466    * make sure that this reserved amount of bandwidth is actually
467    * available).
468    */
469   long long available_recv_window; 
470
471   /**
472    * How valueable were the messages of this peer recently?
473    */
474   unsigned long long current_preference;
475
476   /**
477    * Bit map indicating which of the 32 sequence numbers before the last
478    * were received (good for accepting out-of-order packets and
479    * estimating reliability of the connection)
480    */
481   unsigned int last_packets_bitmap;
482
483   /**
484    * Number of messages in the message queue for this peer.
485    */
486   unsigned int message_queue_size;
487
488   /**
489    * last sequence number received on this connection (highest)
490    */
491   uint32_t last_sequence_number_received;
492
493   /**
494    * last sequence number transmitted
495    */
496   uint32_t last_sequence_number_sent;
497
498   /**
499    * Available bandwidth in for this peer (current target).
500    */
501   uint32_t bpm_in;
502
503   /**
504    * Available bandwidth out for this peer (current target).
505    */
506   uint32_t bpm_out;
507
508   /**
509    * Internal bandwidth limit set for this peer (initially
510    * typically set to "-1").  "bpm_out" is MAX of
511    * "bpm_out_internal_limit" and "bpm_out_external_limit".
512    */
513   uint32_t bpm_out_internal_limit;
514
515   /**
516    * External bandwidth limit set for this peer by the
517    * peer that we are communicating with.  "bpm_out" is MAX of
518    * "bpm_out_internal_limit" and "bpm_out_external_limit".
519    */
520   uint32_t bpm_out_external_limit;
521
522   /**
523    * What was our PING challenge number (for this peer)?
524    */
525   uint32_t ping_challenge;
526
527   /**
528    * What was the last distance to this peer as reported by the transports?
529    * (FIXME: actually set this!)
530    */
531   uint32_t last_distance;
532
533   /**
534    * What is our connection status?
535    */
536   enum PeerStateMachine status;
537
538 };
539
540
541 /**
542  * Events are messages for clients.  The struct
543  * itself is followed by the actual message.
544  */
545 struct Event
546 {
547   /**
548    * This is a linked list.
549    */
550   struct Event *next;
551
552   /**
553    * Size of the message.
554    */
555   size_t size;
556
557   /**
558    * Could this event be dropped if this queue
559    * is getting too large? (NOT YET USED!)
560    */
561   int can_drop;
562
563 };
564
565
566 /**
567  * Data structure for each client connected to the core service.
568  */
569 struct Client
570 {
571   /**
572    * Clients are kept in a linked list.
573    */
574   struct Client *next;
575
576   /**
577    * Handle for the client with the server API.
578    */
579   struct GNUNET_SERVER_Client *client_handle;
580
581   /**
582    * Linked list of messages we still need to deliver to
583    * this client.
584    */
585   struct Event *event_head;
586
587   /**
588    * Tail of the linked list of events.
589    */
590   struct Event *event_tail;
591
592   /**
593    * Current transmit handle, NULL if no transmission request
594    * is pending.
595    */
596   struct GNUNET_CONNECTION_TransmitHandle *th;
597
598   /**
599    * Array of the types of messages this peer cares
600    * about (with "tcnt" entries).  Allocated as part
601    * of this client struct, do not free!
602    */
603   uint16_t *types;
604
605   /**
606    * Options for messages this client cares about,
607    * see GNUNET_CORE_OPTION_ values.
608    */
609   uint32_t options;
610
611   /**
612    * Number of types of incoming messages this client
613    * specifically cares about.  Size of the "types" array.
614    */
615   unsigned int tcnt;
616
617 };
618
619
620 /**
621  * Our public key.
622  */
623 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
624
625 /**
626  * Our identity.
627  */
628 static struct GNUNET_PeerIdentity my_identity;
629
630 /**
631  * Our private key.
632  */
633 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
634
635 /**
636  * Our scheduler.
637  */
638 struct GNUNET_SCHEDULER_Handle *sched;
639
640 /**
641  * Our configuration.
642  */
643 const struct GNUNET_CONFIGURATION_Handle *cfg;
644
645 /**
646  * Our server.
647  */
648 static struct GNUNET_SERVER_Handle *server;
649
650 /**
651  * Transport service.
652  */
653 static struct GNUNET_TRANSPORT_Handle *transport;
654
655 /**
656  * Linked list of our clients.
657  */
658 static struct Client *clients;
659
660 /**
661  * We keep neighbours in a linked list (for now).
662  */
663 static struct Neighbour *neighbours;
664
665 /**
666  * Sum of all preferences among all neighbours.
667  */
668 static unsigned long long preference_sum;
669
670 /**
671  * Total number of neighbours we have.
672  */
673 static unsigned int neighbour_count;
674
675 /**
676  * How much inbound bandwidth are we supposed to be using?
677  */
678 static unsigned long long bandwidth_target_in;
679
680 /**
681  * How much outbound bandwidth are we supposed to be using?
682  */
683 static unsigned long long bandwidth_target_out;
684
685
686
687 /**
688  * A preference value for a neighbour was update.  Update
689  * the preference sum accordingly.
690  *
691  * @param inc how much was a preference value increased?
692  */
693 static void
694 update_preference_sum (unsigned long long inc)
695 {
696   struct Neighbour *n;
697   unsigned long long os;
698
699   os = preference_sum;
700   preference_sum += inc;
701   if (preference_sum >= os)
702     return; /* done! */
703   /* overflow! compensate by cutting all values in half! */
704   preference_sum = 0;
705   n = neighbours;
706   while (n != NULL)
707     {
708       n->current_preference /= 2;
709       preference_sum += n->current_preference;
710       n = n->next;
711     }    
712 }
713
714
715 /**
716  * Recalculate the number of bytes we expect to
717  * receive or transmit in a given window.
718  *
719  * @param force force an update now (even if not much time has passed)
720  * @param window pointer to the byte counter (updated)
721  * @param ts pointer to the timestamp (updated)
722  * @param bpm number of bytes per minute that should
723  *        be added to the window.
724  */
725 static void
726 update_window (int force,
727                long long *window,
728                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
729 {
730   struct GNUNET_TIME_Relative since;
731
732   since = GNUNET_TIME_absolute_get_duration (*ts);
733   if ( (force == GNUNET_NO) &&
734        (since.value < 60 * 1000) )
735     return;                     /* not even a minute has passed */
736   *ts = GNUNET_TIME_absolute_get ();
737   *window += (bpm * since.value) / 60 / 1000;
738   if (*window > MAX_WINDOW_TIME * bpm)
739     *window = MAX_WINDOW_TIME * bpm;
740 }
741
742
743 /**
744  * Find the entry for the given neighbour.
745  *
746  * @param peer identity of the neighbour
747  * @return NULL if we are not connected, otherwise the
748  *         neighbour's entry.
749  */
750 static struct Neighbour *
751 find_neighbour (const struct GNUNET_PeerIdentity *peer)
752 {
753   struct Neighbour *ret;
754
755   ret = neighbours;
756   while ((ret != NULL) &&
757          (0 != memcmp (&ret->peer,
758                        peer, sizeof (struct GNUNET_PeerIdentity))))
759     ret = ret->next;
760   return ret;
761 }
762
763
764 /**
765  * Find the entry for the given client.
766  *
767  * @param client handle for the client
768  * @return NULL if we are not connected, otherwise the
769  *         client's struct.
770  */
771 static struct Client *
772 find_client (const struct GNUNET_SERVER_Client *client)
773 {
774   struct Client *ret;
775
776   ret = clients;
777   while ((ret != NULL) && (client != ret->client_handle))
778     ret = ret->next;
779   return ret;
780 }
781
782
783 /**
784  * If necessary, initiate a request with the server to
785  * transmit messages from the queue of the given client.
786  * @param client who to transfer messages to
787  */
788 static void request_transmit (struct Client *client);
789
790
791 /**
792  * Client is ready to receive data, provide it.
793  *
794  * @param cls closure
795  * @param size number of bytes available in buf
796  * @param buf where the callee should write the message
797  * @return number of bytes written to buf
798  */
799 static size_t
800 do_client_transmit (void *cls, size_t size, void *buf)
801 {
802   struct Client *client = cls;
803   struct Event *e;
804   char *tgt;
805   size_t ret;
806
807   client->th = NULL;
808 #if DEBUG_CORE_CLIENT
809   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810               "Client ready to receive %u bytes.\n", size);
811 #endif
812   if (buf == NULL)
813     {
814 #if DEBUG_CORE
815       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
816                   "Failed to transmit data to client (disconnect)?\n");
817 #endif
818       return 0;                 /* we'll surely get a disconnect soon... */
819     }
820   tgt = buf;
821   ret = 0;
822   while ((NULL != (e = client->event_head)) && (e->size <= size))
823     {
824       memcpy (&tgt[ret], &e[1], e->size);
825       size -= e->size;
826       ret += e->size;
827       client->event_head = e->next;
828       GNUNET_free (e);
829     }
830   GNUNET_assert (ret > 0);
831   if (client->event_head == NULL)
832     client->event_tail = NULL;
833 #if DEBUG_CORE_CLIENT
834   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
835               "Transmitting %u bytes to client\n", ret);
836 #endif
837   request_transmit (client);
838   return ret;
839 }
840
841
842 /**
843  * If necessary, initiate a request with the server to
844  * transmit messages from the queue of the given client.
845  * @param client who to transfer messages to
846  */
847 static void
848 request_transmit (struct Client *client)
849 {
850
851   if (NULL != client->th)
852     return;                     /* already pending */
853   if (NULL == client->event_head)
854     return;                     /* no more events pending */
855 #if DEBUG_CORE_CLIENT
856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857               "Asking server to transmit %u bytes to client\n",
858               client->event_head->size);
859 #endif
860   client->th
861     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
862                                            client->event_head->size,
863                                            GNUNET_TIME_UNIT_FOREVER_REL,
864                                            &do_client_transmit, client);
865 }
866
867
868 /**
869  * Send a message to one of our clients.
870  *
871  * @param client target for the message
872  * @param msg message to transmit
873  * @param can_drop could this message be dropped if the
874  *        client's queue is getting too large?
875  */
876 static void
877 send_to_client (struct Client *client,
878                 const struct GNUNET_MessageHeader *msg, int can_drop)
879 {
880   struct Event *e;
881   unsigned int queue_size;
882   uint16_t msize;
883
884 #if DEBUG_CORE_CLIENT
885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886               "Preparing to send message of type %u to client.\n",
887               ntohs (msg->type));
888 #endif
889   queue_size = 0;
890   e = client->event_head;
891   while (e != NULL)
892     {
893       queue_size++;
894       e = e->next;
895     }
896   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
897        (can_drop == GNUNET_YES) )
898     {
899 #if DEBUG_CORE
900       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
901                   "Too many messages in queue for the client, dropping the new message.\n");
902 #endif
903       return;
904     }
905
906   msize = ntohs (msg->size);
907   e = GNUNET_malloc (sizeof (struct Event) + msize);
908   /* append */
909   if (client->event_tail != NULL)
910     client->event_tail->next = e;
911   else
912     client->event_head = e;
913   client->event_tail = e;
914   e->can_drop = can_drop;
915   e->size = msize;
916   memcpy (&e[1], msg, msize);
917   request_transmit (client);
918 }
919
920
921 /**
922  * Send a message to all of our current clients.
923  */
924 static void
925 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
926                      int can_drop,
927                      int options)
928 {
929   struct Client *c;
930
931   c = clients;
932   while (c != NULL)
933     {
934       if (0 != (c->options & options))
935         send_to_client (c, msg, can_drop);
936       c = c->next;
937     }
938 }
939
940
941 /**
942  * Handle CORE_INIT request.
943  */
944 static void
945 handle_client_init (void *cls,
946                     struct GNUNET_SERVER_Client *client,
947                     const struct GNUNET_MessageHeader *message)
948 {
949   const struct InitMessage *im;
950   struct InitReplyMessage irm;
951   struct Client *c;
952   uint16_t msize;
953   const uint16_t *types;
954   struct Neighbour *n;
955   struct ConnectNotifyMessage cnm;
956
957 #if DEBUG_CORE_CLIENT
958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959               "Client connecting to core service with `%s' message\n",
960               "INIT");
961 #endif
962   /* check that we don't have an entry already */
963   c = clients;
964   while (c != NULL)
965     {
966       if (client == c->client_handle)
967         {
968           GNUNET_break (0);
969           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
970           return;
971         }
972       c = c->next;
973     }
974   msize = ntohs (message->size);
975   if (msize < sizeof (struct InitMessage))
976     {
977       GNUNET_break (0);
978       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
979       return;
980     }
981   im = (const struct InitMessage *) message;
982   types = (const uint16_t *) &im[1];
983   msize -= sizeof (struct InitMessage);
984   c = GNUNET_malloc (sizeof (struct Client) + msize);
985   c->client_handle = client;
986   c->next = clients;
987   clients = c;
988   memcpy (&c[1], types, msize);
989   c->types = (uint16_t *) & c[1];
990   c->options = ntohl (im->options);
991   c->tcnt = msize / sizeof (uint16_t);
992   /* send init reply message */
993   irm.header.size = htons (sizeof (struct InitReplyMessage));
994   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
995   irm.reserved = htonl (0);
996   memcpy (&irm.publicKey,
997           &my_public_key,
998           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
999 #if DEBUG_CORE_CLIENT
1000   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1001               "Sending `%s' message to client.\n", "INIT_REPLY");
1002 #endif
1003   send_to_client (c, &irm.header, GNUNET_NO);
1004   /* notify new client about existing neighbours */
1005   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
1006   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
1007   n = neighbours;
1008   while (n != NULL)
1009     {
1010 #if DEBUG_CORE_CLIENT
1011       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1012                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
1013 #endif
1014       cnm.distance = htonl (n->last_distance);
1015       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
1016       cnm.peer = n->peer;
1017       send_to_client (c, &cnm.header, GNUNET_NO);
1018       n = n->next;
1019     }
1020   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1021 }
1022
1023
1024 /**
1025  * A client disconnected, clean up.
1026  *
1027  * @param cls closure
1028  * @param client identification of the client
1029  */
1030 static void
1031 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1032 {
1033   struct Client *pos;
1034   struct Client *prev;
1035   struct Event *e;
1036
1037   if (client == NULL)
1038     return;
1039 #if DEBUG_CORE_CLIENT
1040   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041               "Client has disconnected from core service.\n");
1042 #endif
1043   prev = NULL;
1044   pos = clients;
1045   while (pos != NULL)
1046     {
1047       if (client == pos->client_handle)
1048         {
1049           if (prev == NULL)
1050             clients = pos->next;
1051           else
1052             prev->next = pos->next;
1053           if (pos->th != NULL)
1054             GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1055           while (NULL != (e = pos->event_head))
1056             {
1057               pos->event_head = e->next;
1058               GNUNET_free (e);
1059             }
1060           GNUNET_free (pos);
1061           return;
1062         }
1063       prev = pos;
1064       pos = pos->next;
1065     }
1066   /* client never sent INIT */
1067 }
1068
1069
1070 /**
1071  * Handle REQUEST_INFO request.
1072  */
1073 static void
1074 handle_client_request_info (void *cls,
1075                                  struct GNUNET_SERVER_Client *client,
1076                                  const struct GNUNET_MessageHeader *message)
1077 {
1078   const struct RequestInfoMessage *rcm;
1079   struct Neighbour *n;
1080   struct ConfigurationInfoMessage cim;
1081   struct Client *c;
1082   int reserv;
1083   unsigned long long old_preference;
1084
1085 #if DEBUG_CORE_CLIENT
1086   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1087               "Core service receives `%s' request.\n", "REQUEST_INFO");
1088 #endif
1089   rcm = (const struct RequestInfoMessage *) message;
1090   n = find_neighbour (&rcm->peer);
1091   memset (&cim, 0, sizeof (cim));
1092   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1093     {
1094       update_window (GNUNET_YES,
1095                      &n->available_send_window,
1096                      &n->last_asw_update,
1097                      n->bpm_out);
1098       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1099       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1100                                n->bpm_out_external_limit);
1101       reserv = ntohl (rcm->reserve_inbound);
1102       if (reserv < 0)
1103         {
1104           n->available_recv_window += reserv;
1105         }
1106       else if (reserv > 0)
1107         {
1108           update_window (GNUNET_NO,
1109                          &n->available_recv_window,
1110                          &n->last_arw_update, n->bpm_in);
1111           if (n->available_recv_window < reserv)
1112             reserv = n->available_recv_window;
1113           n->available_recv_window -= reserv;
1114         }
1115       old_preference = n->current_preference;
1116       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1117       if (old_preference > n->current_preference) 
1118         {
1119           /* overflow; cap at maximum value */
1120           n->current_preference = (unsigned long long) -1;
1121         }
1122       update_preference_sum (n->current_preference - old_preference);
1123       cim.reserved_amount = htonl (reserv);
1124       cim.bpm_in = htonl (n->bpm_in);
1125       cim.bpm_out = htonl (n->bpm_out);
1126       cim.preference = n->current_preference;
1127     }
1128   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1129   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1130   cim.peer = rcm->peer;
1131   c = find_client (client);
1132   if (c == NULL)
1133     {
1134       GNUNET_break (0);
1135       return;
1136     }
1137 #if DEBUG_CORE_CLIENT
1138   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1140 #endif
1141   send_to_client (c, &cim.header, GNUNET_NO);
1142 }
1143
1144
1145 /**
1146  * Check if we have encrypted messages for the specified neighbour
1147  * pending, and if so, check with the transport about sending them
1148  * out.
1149  *
1150  * @param n neighbour to check.
1151  */
1152 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1153
1154
1155 /**
1156  * Function called when the transport service is ready to
1157  * receive an encrypted message for the respective peer
1158  *
1159  * @param cls neighbour to use message from
1160  * @param size number of bytes we can transmit
1161  * @param buf where to copy the message
1162  * @return number of bytes transmitted
1163  */
1164 static size_t
1165 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1166 {
1167   struct Neighbour *n = cls;
1168   struct MessageEntry *m;
1169   size_t ret;
1170   char *cbuf;
1171
1172   n->th = NULL;
1173   GNUNET_assert (NULL != (m = n->encrypted_head));
1174   n->encrypted_head = m->next;
1175   if (m->next == NULL)
1176     n->encrypted_tail = NULL;
1177   ret = 0;
1178   cbuf = buf;
1179   if (buf != NULL)
1180     {
1181       GNUNET_assert (size >= m->size);
1182       memcpy (cbuf, &m[1], m->size);
1183       ret = m->size;
1184       n->available_send_window -= m->size;
1185       process_encrypted_neighbour_queue (n);
1186 #if DEBUG_CORE
1187       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1188                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1189                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1190                   ret, GNUNET_i2s (&n->peer));
1191 #endif
1192     }
1193   else
1194     {
1195       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1196                   "Transmission for message of type %u and size %u failed\n",
1197                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1198                   m->size);
1199     }
1200   GNUNET_free (m);
1201   return ret;
1202 }
1203
1204
1205 /**
1206  * Check if we have plaintext messages for the specified neighbour
1207  * pending, and if so, consider batching and encrypting them (and
1208  * then trigger processing of the encrypted queue if needed).
1209  *
1210  * @param n neighbour to check.
1211  */
1212 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1213
1214
1215 /**
1216  * Check if we have encrypted messages for the specified neighbour
1217  * pending, and if so, check with the transport about sending them
1218  * out.
1219  *
1220  * @param n neighbour to check.
1221  */
1222 static void
1223 process_encrypted_neighbour_queue (struct Neighbour *n)
1224 {
1225   struct MessageEntry *m;
1226  
1227   if (n->th != NULL)
1228     return;  /* request already pending */
1229   if (n->encrypted_head == NULL)
1230     {
1231       /* encrypted queue empty, try plaintext instead */
1232       process_plaintext_neighbour_queue (n);
1233       return;
1234     }
1235 #if DEBUG_CORE
1236   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1237               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1238               n->encrypted_head->size,
1239               GNUNET_i2s (&n->peer),
1240               GNUNET_TIME_absolute_get_remaining (n->
1241                                                   encrypted_head->deadline).
1242               value);
1243 #endif
1244   n->th =
1245     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1246                                             n->encrypted_head->size,
1247                                             n->encrypted_head->priority,
1248                                             GNUNET_TIME_absolute_get_remaining
1249                                             (n->encrypted_head->deadline),
1250                                             &notify_encrypted_transmit_ready,
1251                                             n);
1252   if (n->th == NULL)
1253     {
1254       /* message request too large (oops) */
1255       GNUNET_break (0);
1256       /* discard encrypted message */
1257       GNUNET_assert (NULL != (m = n->encrypted_head));
1258       n->encrypted_head = m->next;
1259       if (m->next == NULL)
1260         n->encrypted_tail = NULL;
1261       GNUNET_free (m);
1262       process_encrypted_neighbour_queue (n);
1263     }
1264 }
1265
1266
1267 /**
1268  * Decrypt size bytes from in and write the result to out.  Use the
1269  * key for inbound traffic of the given neighbour.  This function does
1270  * NOT do any integrity-checks on the result.
1271  *
1272  * @param n neighbour we are receiving from
1273  * @param iv initialization vector to use
1274  * @param in ciphertext
1275  * @param out plaintext
1276  * @param size size of in/out
1277  * @return GNUNET_OK on success
1278  */
1279 static int
1280 do_decrypt (struct Neighbour *n,
1281             const GNUNET_HashCode * iv,
1282             const void *in, void *out, size_t size)
1283 {
1284   if (size != (uint16_t) size)
1285     {
1286       GNUNET_break (0);
1287       return GNUNET_NO;
1288     }
1289   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1290       (n->status != PEER_STATE_KEY_CONFIRMED))
1291     {
1292       GNUNET_break_op (0);
1293       return GNUNET_SYSERR;
1294     }
1295   if (size !=
1296       GNUNET_CRYPTO_aes_decrypt (in,
1297                                  (uint16_t) size,
1298                                  &n->decrypt_key,
1299                                  (const struct
1300                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1301                                  out))
1302     {
1303       GNUNET_break (0);
1304       return GNUNET_SYSERR;
1305     }
1306 #if DEBUG_CORE
1307   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1308               "Decrypted %u bytes from `%4s' using key %u\n",
1309               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1310 #endif
1311   return GNUNET_OK;
1312 }
1313
1314
1315 /**
1316  * Encrypt size bytes from in and write the result to out.  Use the
1317  * key for outbound traffic of the given neighbour.
1318  *
1319  * @param n neighbour we are sending to
1320  * @param iv initialization vector to use
1321  * @param in ciphertext
1322  * @param out plaintext
1323  * @param size size of in/out
1324  * @return GNUNET_OK on success
1325  */
1326 static int
1327 do_encrypt (struct Neighbour *n,
1328             const GNUNET_HashCode * iv,
1329             const void *in, void *out, size_t size)
1330 {
1331   if (size != (uint16_t) size)
1332     {
1333       GNUNET_break (0);
1334       return GNUNET_NO;
1335     }
1336   GNUNET_assert (size ==
1337                  GNUNET_CRYPTO_aes_encrypt (in,
1338                                             (uint16_t) size,
1339                                             &n->encrypt_key,
1340                                             (const struct
1341                                              GNUNET_CRYPTO_AesInitializationVector
1342                                              *) iv, out));
1343 #if DEBUG_CORE
1344   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1345               "Encrypted %u bytes for `%4s' using key %u\n", size,
1346               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1347 #endif
1348   return GNUNET_OK;
1349 }
1350
1351
1352 /**
1353  * Select messages for transmission.  This heuristic uses a combination
1354  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1355  * and priority-based discard (in case no feasible schedule exist) and
1356  * speculative optimization (defer any kind of transmission until
1357  * we either create a batch of significant size, 25% of max, or until
1358  * we are close to a deadline).  Furthermore, when scheduling the
1359  * heuristic also packs as many messages into the batch as possible,
1360  * starting with those with the earliest deadline.  Yes, this is fun.
1361  *
1362  * @param n neighbour to select messages from
1363  * @param size number of bytes to select for transmission
1364  * @param retry_time set to the time when we should try again
1365  *        (only valid if this function returns zero)
1366  * @return number of bytes selected, or 0 if we decided to
1367  *         defer scheduling overall; in that case, retry_time is set.
1368  */
1369 static size_t
1370 select_messages (struct Neighbour *n,
1371                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1372 {
1373   struct MessageEntry *pos;
1374   struct MessageEntry *min;
1375   struct MessageEntry *last;
1376   unsigned int min_prio;
1377   struct GNUNET_TIME_Absolute t;
1378   struct GNUNET_TIME_Absolute now;
1379   uint64_t delta;
1380   uint64_t avail;
1381   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1382   size_t off;
1383   int discard_low_prio;
1384
1385   GNUNET_assert (NULL != n->messages);
1386   now = GNUNET_TIME_absolute_get ();
1387   /* last entry in linked list of messages processed */
1388   last = NULL;
1389   /* should we remove the entry with the lowest
1390      priority from consideration for scheduling at the
1391      end of the loop? */
1392   discard_low_prio = GNUNET_YES;
1393   while (GNUNET_YES == discard_low_prio)
1394     {
1395       min = NULL;
1396       min_prio = -1;
1397       discard_low_prio = GNUNET_NO;
1398       /* calculate number of bytes available for transmission at time "t" */
1399       update_window (GNUNET_NO,
1400                      &n->available_send_window,
1401                      &n->last_asw_update,
1402                      n->bpm_out);
1403       avail = n->available_send_window;
1404       t = n->last_asw_update;
1405       /* how many bytes have we (hypothetically) scheduled so far */
1406       off = 0;
1407       /* maximum time we can wait before transmitting anything
1408          and still make all of our deadlines */
1409       slack = -1;
1410
1411       pos = n->messages;
1412       /* note that we use "*2" here because we want to look
1413          a bit further into the future; much more makes no
1414          sense since new message might be scheduled in the
1415          meantime... */
1416       while ((pos != NULL) && (off < size * 2))
1417         {
1418           if (pos->do_transmit == GNUNET_YES)
1419             {
1420               /* already removed from consideration */
1421               pos = pos->next;
1422               continue;
1423             }
1424           if (discard_low_prio == GNUNET_NO)
1425             {
1426               delta = pos->deadline.value;
1427               if (delta < t.value)
1428                 delta = 0;
1429               else
1430                 delta = t.value - delta;
1431               avail += delta * n->bpm_out / 1000 / 60;
1432               if (avail < pos->size)
1433                 {
1434                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1435                 }
1436               else
1437                 {
1438                   avail -= pos->size;
1439                   /* update slack, considering both its absolute deadline
1440                      and relative deadlines caused by other messages
1441                      with their respective load */
1442                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1443                   if (pos->deadline.value < now.value)
1444                     slack = 0;
1445                   else
1446                     slack =
1447                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1448                 }
1449             }
1450           off += pos->size;
1451           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1452           if (pos->priority <= min_prio)
1453             {
1454               /* update min for discard */
1455               min_prio = pos->priority;
1456               min = pos;
1457             }
1458           pos = pos->next;
1459         }
1460       if (discard_low_prio)
1461         {
1462           GNUNET_assert (min != NULL);
1463           /* remove lowest-priority entry from consideration */
1464           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1465         }
1466       last = pos;
1467     }
1468   /* guard against sending "tiny" messages with large headers without
1469      urgent deadlines */
1470   if ( (slack > 1000) && (size > 4 * off) )
1471     {
1472       /* less than 25% of message would be filled with
1473          deadlines still being met if we delay by one
1474          second or more; so just wait for more data */
1475       retry_time->value = slack / 2;
1476       /* reset do_transmit values for next time */
1477       while (pos != last)
1478         {
1479           pos->do_transmit = GNUNET_NO;
1480           pos = pos->next;
1481         }
1482       return 0;
1483     }
1484   /* select marked messages (up to size) for transmission */
1485   off = 0;
1486   pos = n->messages;
1487   while (pos != last)
1488     {
1489       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1490         {
1491           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1492           off += pos->size;
1493           size -= pos->size;
1494         }
1495       else
1496         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1497       pos = pos->next;
1498     }
1499 #if DEBUG_CORE
1500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1501               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1502               off, GNUNET_i2s (&n->peer));
1503 #endif
1504   return off;
1505 }
1506
1507
1508 /**
1509  * Batch multiple messages into a larger buffer.
1510  *
1511  * @param n neighbour to take messages from
1512  * @param buf target buffer
1513  * @param size size of buf
1514  * @param deadline set to transmission deadline for the result
1515  * @param retry_time set to the time when we should try again
1516  *        (only valid if this function returns zero)
1517  * @param priority set to the priority of the batch
1518  * @return number of bytes written to buf (can be zero)
1519  */
1520 static size_t
1521 batch_message (struct Neighbour *n,
1522                char *buf,
1523                size_t size,
1524                struct GNUNET_TIME_Absolute *deadline,
1525                struct GNUNET_TIME_Relative *retry_time,
1526                unsigned int *priority)
1527 {
1528   struct MessageEntry *pos;
1529   struct MessageEntry *prev;
1530   struct MessageEntry *next;
1531   size_t ret;
1532
1533   ret = 0;
1534   *priority = 0;
1535   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1536   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1537   if (0 == select_messages (n, size, retry_time))
1538     {
1539       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1540                   "No messages selected, will try again in %llu ms\n",
1541                   retry_time->value);
1542       return 0;
1543     }
1544   pos = n->messages;
1545   prev = NULL;
1546   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1547     {
1548       next = pos->next;
1549       if (GNUNET_YES == pos->do_transmit)
1550         {
1551           GNUNET_assert (pos->size <= size);
1552           memcpy (&buf[ret], &pos[1], pos->size);
1553           ret += pos->size;
1554           size -= pos->size;
1555           *priority += pos->priority;
1556 #if DEBUG_CORE
1557           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1558                       "Adding plaintext message with deadline %llu ms to batch\n",
1559                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1560 #endif
1561           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1562           GNUNET_free (pos);
1563           if (prev == NULL)
1564             n->messages = next;
1565           else
1566             prev->next = next;
1567         }
1568       else
1569         {
1570           prev = pos;
1571         }
1572       pos = next;
1573     }
1574 #if DEBUG_CORE
1575   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1576               "Deadline for message batch is %llu ms\n",
1577               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1578 #endif
1579   return ret;
1580 }
1581
1582
1583 /**
1584  * Remove messages with deadlines that have long expired from
1585  * the queue.
1586  *
1587  * @param n neighbour to inspect
1588  */
1589 static void
1590 discard_expired_messages (struct Neighbour *n)
1591 {
1592   struct MessageEntry *prev;
1593   struct MessageEntry *next;
1594   struct MessageEntry *pos;
1595   struct GNUNET_TIME_Absolute now;
1596   struct GNUNET_TIME_Relative delta;
1597
1598   now = GNUNET_TIME_absolute_get ();
1599   prev = NULL;
1600   pos = n->messages;
1601   while (pos != NULL) 
1602     {
1603       next = pos->next;
1604       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1605       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1606         {
1607 #if DEBUG_CORE
1608           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1609                       "Message is %llu ms past due, discarding.\n",
1610                       delta.value);
1611 #endif
1612           if (prev == NULL)
1613             n->messages = next;
1614           else
1615             prev->next = next;
1616           GNUNET_free (pos);
1617         }
1618       else
1619         prev = pos;
1620       pos = next;
1621     }
1622 }
1623
1624
1625 /**
1626  * Signature of the main function of a task.
1627  *
1628  * @param cls closure
1629  * @param tc context information (why was this task triggered now)
1630  */
1631 static void
1632 retry_plaintext_processing (void *cls,
1633                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1634 {
1635   struct Neighbour *n = cls;
1636
1637   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1638   process_plaintext_neighbour_queue (n);
1639 }
1640
1641
1642 /**
1643  * Send our key (and encrypted PING) to the other peer.
1644  *
1645  * @param n the other peer
1646  */
1647 static void send_key (struct Neighbour *n);
1648
1649
1650 /**
1651  * Check if we have plaintext messages for the specified neighbour
1652  * pending, and if so, consider batching and encrypting them (and
1653  * then trigger processing of the encrypted queue if needed).
1654  *
1655  * @param n neighbour to check.
1656  */
1657 static void
1658 process_plaintext_neighbour_queue (struct Neighbour *n)
1659 {
1660   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1661   size_t used;
1662   size_t esize;
1663   struct EncryptedMessage *em;  /* encrypted message */
1664   struct EncryptedMessage *ph;  /* plaintext header */
1665   struct MessageEntry *me;
1666   unsigned int priority;
1667   struct GNUNET_TIME_Absolute deadline;
1668   struct GNUNET_TIME_Relative retry_time;
1669
1670   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1671     {
1672       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1673       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1674     }
1675   switch (n->status)
1676     {
1677     case PEER_STATE_DOWN:
1678       send_key (n);
1679 #if DEBUG_CORE
1680       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1681                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1682                   GNUNET_i2s(&n->peer));
1683 #endif
1684       return;
1685     case PEER_STATE_KEY_SENT:
1686       GNUNET_assert (n->retry_set_key_task !=
1687                      GNUNET_SCHEDULER_NO_TASK);
1688 #if DEBUG_CORE
1689       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1690                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1691                   GNUNET_i2s(&n->peer));
1692 #endif
1693       return;
1694     case PEER_STATE_KEY_RECEIVED:
1695       GNUNET_assert (n->retry_set_key_task !=
1696                      GNUNET_SCHEDULER_NO_TASK);
1697 #if DEBUG_CORE
1698       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1699                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1700                   GNUNET_i2s(&n->peer));
1701 #endif
1702       return;
1703     case PEER_STATE_KEY_CONFIRMED:
1704       /* ready to continue */
1705       break;
1706     }
1707   discard_expired_messages (n);
1708   if (n->messages == NULL)
1709     {
1710 #if DEBUG_CORE
1711       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1712                   "Plaintext message queue for `%4s' is empty.\n",
1713                   GNUNET_i2s(&n->peer));
1714 #endif
1715       return;                   /* no pending messages */
1716     }
1717   if (n->encrypted_head != NULL)
1718     {
1719 #if DEBUG_CORE
1720       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1721                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1722                   GNUNET_i2s(&n->peer));
1723 #endif
1724       return;                   /* wait for messages already encrypted to be
1725                                    processed first! */
1726     }
1727   ph = (struct EncryptedMessage *) pbuf;
1728   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1729   priority = 0;
1730   used = sizeof (struct EncryptedMessage);
1731   used += batch_message (n,
1732                          &pbuf[used],
1733                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1734                          &deadline, &retry_time, &priority);
1735   if (used == sizeof (struct EncryptedMessage))
1736     {
1737 #if DEBUG_CORE
1738       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1739                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1740                   GNUNET_i2s(&n->peer));
1741 #endif
1742       /* no messages selected for sending, try again later... */
1743       n->retry_plaintext_task =
1744         GNUNET_SCHEDULER_add_delayed (sched,
1745                                       retry_time,
1746                                       &retry_plaintext_processing, n);
1747       return;
1748     }
1749   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1750   ph->inbound_bpm_limit = htonl (n->bpm_in);
1751   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1752
1753   /* setup encryption message header */
1754   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1755   me->deadline = deadline;
1756   me->priority = priority;
1757   me->size = used;
1758   em = (struct EncryptedMessage *) &me[1];
1759   em->header.size = htons (used);
1760   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1761   em->reserved = htonl (0);
1762   esize = used - ENCRYPTED_HEADER_SIZE;
1763   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1764   /* encrypt */
1765 #if DEBUG_CORE
1766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1767               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1768               esize,
1769               GNUNET_i2s(&n->peer),
1770               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1771 #endif
1772   GNUNET_assert (GNUNET_OK ==
1773                  do_encrypt (n,
1774                              &em->plaintext_hash,
1775                              &ph->sequence_number,
1776                              &em->sequence_number, esize));
1777   /* append to transmission list */
1778   if (n->encrypted_tail == NULL)
1779     n->encrypted_head = me;
1780   else
1781     n->encrypted_tail->next = me;
1782   n->encrypted_tail = me;
1783   process_encrypted_neighbour_queue (n);
1784 }
1785
1786
1787 /**
1788  * Handle CORE_SEND request.
1789  *
1790  * @param cls unused
1791  * @param client the client issuing the request
1792  * @param message the "struct SendMessage"
1793  */
1794 static void
1795 handle_client_send (void *cls,
1796                     struct GNUNET_SERVER_Client *client,
1797                     const struct GNUNET_MessageHeader *message);
1798
1799
1800 /**
1801  * Function called to notify us that we either succeeded
1802  * or failed to connect (at the transport level) to another
1803  * peer.  We should either free the message we were asked
1804  * to transmit or re-try adding it to the queue.
1805  *
1806  * @param cls closure
1807  * @param size number of bytes available in buf
1808  * @param buf where the callee should write the message
1809  * @return number of bytes written to buf
1810  */
1811 static size_t
1812 send_connect_continuation (void *cls, size_t size, void *buf)
1813 {
1814   struct SendMessage *sm = cls;
1815
1816   if (buf == NULL)
1817     {
1818 #if DEBUG_CORE
1819       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1820                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1821                   GNUNET_i2s (&sm->peer));
1822 #endif
1823       GNUNET_free (sm);
1824       return 0;
1825     }
1826 #if DEBUG_CORE
1827   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1828               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1829               GNUNET_i2s (&sm->peer));
1830 #endif
1831   handle_client_send (NULL, NULL, &sm->header);
1832   GNUNET_free (sm);
1833   return 0;
1834 }
1835
1836
1837 /**
1838  * Handle CORE_SEND request.
1839  *
1840  * @param cls unused
1841  * @param client the client issuing the request
1842  * @param message the "struct SendMessage"
1843  */
1844 static void
1845 handle_client_send (void *cls,
1846                     struct GNUNET_SERVER_Client *client,
1847                     const struct GNUNET_MessageHeader *message)
1848 {
1849   const struct SendMessage *sm;
1850   struct SendMessage *smc;
1851   const struct GNUNET_MessageHeader *mh;
1852   struct Neighbour *n;
1853   struct MessageEntry *prev;
1854   struct MessageEntry *pos;
1855   struct MessageEntry *e; 
1856   struct MessageEntry *min_prio_entry;
1857   struct MessageEntry *min_prio_prev;
1858   unsigned int min_prio;
1859   unsigned int queue_size;
1860   uint16_t msize;
1861
1862   msize = ntohs (message->size);
1863   if (msize <
1864       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1865     {
1866       GNUNET_break (0);
1867       if (client != NULL)
1868         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1869       return;
1870     }
1871   sm = (const struct SendMessage *) message;
1872   msize -= sizeof (struct SendMessage);
1873   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1874   if (msize != ntohs (mh->size))
1875     {
1876       GNUNET_break (0);
1877       if (client != NULL)
1878         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1879       return;
1880     }
1881   n = find_neighbour (&sm->peer);
1882   if (n == NULL)
1883     {
1884 #if DEBUG_CORE
1885       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1886                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1887                   "SEND",
1888                   GNUNET_i2s (&sm->peer),
1889                   GNUNET_TIME_absolute_get_remaining
1890                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1891 #endif
1892       msize += sizeof (struct SendMessage);
1893       /* ask transport to connect to the peer */
1894       smc = GNUNET_malloc (msize);
1895       memcpy (smc, sm, msize);
1896       if (NULL ==
1897           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1898                                                   &sm->peer,
1899                                                   0, 0,
1900                                                   GNUNET_TIME_absolute_get_remaining
1901                                                   (GNUNET_TIME_absolute_ntoh
1902                                                    (sm->deadline)),
1903                                                   &send_connect_continuation,
1904                                                   smc))
1905         {
1906           /* transport has already a request pending for this peer! */
1907 #if DEBUG_CORE
1908           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1909                       "Dropped second message destined for `%4s' since connection is still down.\n",
1910                       GNUNET_i2s(&sm->peer));
1911 #endif
1912           GNUNET_free (smc);
1913         }
1914       if (client != NULL)
1915         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1916       return;
1917     }
1918 #if DEBUG_CORE
1919   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1920               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1921               "SEND",
1922               msize, 
1923               GNUNET_i2s (&sm->peer));
1924 #endif
1925   /* bound queue size */
1926   discard_expired_messages (n);
1927   min_prio = (unsigned int) -1;
1928   min_prio_entry = NULL;
1929   min_prio_prev = NULL;
1930   queue_size = 0;
1931   prev = NULL;
1932   pos = n->messages;
1933   while (pos != NULL) 
1934     {
1935       if (pos->priority < min_prio)
1936         {
1937           min_prio_entry = pos;
1938           min_prio_prev = prev;
1939           min_prio = pos->priority;
1940         }
1941       queue_size++;
1942       prev = pos;
1943       pos = pos->next;
1944     }
1945   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1946     {
1947       /* queue full */
1948       if (ntohl(sm->priority) <= min_prio)
1949         {
1950           /* discard new entry */
1951 #if DEBUG_CORE
1952           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1953                       "Queue full, discarding new request\n");
1954 #endif
1955           if (client != NULL)
1956             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1957           return;
1958         }
1959       /* discard "min_prio_entry" */
1960 #if DEBUG_CORE
1961       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1962                   "Queue full, discarding existing older request\n");
1963 #endif
1964       if (min_prio_prev == NULL)
1965         n->messages = min_prio_entry->next;
1966       else
1967         min_prio_prev->next = min_prio_entry->next;      
1968       GNUNET_free (min_prio_entry);     
1969     }
1970
1971 #if DEBUG_CORE
1972   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1973               "Adding transmission request for `%4s' to queue\n",
1974               GNUNET_i2s (&sm->peer));
1975 #endif  
1976   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1977   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1978   e->priority = ntohl (sm->priority);
1979   e->size = msize;
1980   memcpy (&e[1], mh, msize);
1981
1982   /* insert, keep list sorted by deadline */
1983   prev = NULL;
1984   pos = n->messages;
1985   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1986     {
1987       prev = pos;
1988       pos = pos->next;
1989     }
1990   if (prev == NULL)
1991     n->messages = e;
1992   else
1993     prev->next = e;
1994   e->next = pos;
1995
1996   /* consider scheduling now */
1997   process_plaintext_neighbour_queue (n);
1998   if (client != NULL)
1999     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2000 }
2001
2002
2003 /**
2004  * Handle CORE_REQUEST_CONNECT request.
2005  *
2006  * @param cls unused
2007  * @param client the client issuing the request
2008  * @param message the "struct ConnectMessage"
2009  */
2010 static void
2011 handle_client_request_connect (void *cls,
2012                                struct GNUNET_SERVER_Client *client,
2013                                const struct GNUNET_MessageHeader *message)
2014 {
2015   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2016   struct Neighbour *n;
2017
2018   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2019   n = find_neighbour (&cm->peer);
2020   if (n != NULL)
2021     return; /* already connected, or at least trying */
2022 #if DEBUG_CORE
2023   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2024               "Core received `%s' request for `%4s', will try to establish connection\n",
2025               "REQUEST_CONNECT",
2026               GNUNET_i2s (&cm->peer));
2027 #endif
2028   /* ask transport to connect to the peer */
2029   /* FIXME: timeout zero OK? need for cancellation? */
2030   GNUNET_TRANSPORT_notify_transmit_ready (transport,
2031                                           &cm->peer,
2032                                           0, 0,
2033                                           GNUNET_TIME_UNIT_ZERO,
2034                                           NULL,
2035                                           NULL);
2036 }
2037
2038
2039 /**
2040  * Handle CORE_REQUEST_DISCONNECT request.
2041  *
2042  * @param cls unused
2043  * @param client the client issuing the request
2044  * @param message the "struct ConnectMessage"
2045  */
2046 static void
2047 handle_client_request_disconnect (void *cls,
2048                                   struct GNUNET_SERVER_Client *client,
2049                                   const struct GNUNET_MessageHeader *message)
2050 {
2051   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2052   struct Neighbour *n;
2053
2054   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2055   n = find_neighbour (&cm->peer);
2056   if (n == NULL)
2057     return; /* done */
2058   /* FIXME: implement disconnect! */
2059 }
2060
2061
2062
2063 /**
2064  * List of handlers for the messages understood by this
2065  * service.
2066  */
2067 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2068   {&handle_client_init, NULL,
2069    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2070   {&handle_client_request_info, NULL,
2071    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2072    sizeof (struct RequestInfoMessage)},
2073   {&handle_client_send, NULL,
2074    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2075   {&handle_client_request_connect, NULL,
2076    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2077    sizeof (struct ConnectMessage)},
2078   {&handle_client_request_disconnect, NULL,
2079    GNUNET_MESSAGE_TYPE_CORE_REQUEST_DISCONNECT,
2080    sizeof (struct ConnectMessage)},
2081   {NULL, NULL, 0, 0}
2082 };
2083
2084
2085 /**
2086  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2087  * the neighbour's struct and retry send_key.  Or, if we did not get a
2088  * HELLO, just do nothing.
2089  *
2090  * @param cls NULL
2091  * @param peer the peer for which this is the HELLO
2092  * @param hello HELLO message of that peer
2093  * @param trust amount of trust we currently have in that peer
2094  */
2095 static void
2096 process_hello_retry_send_key (void *cls,
2097                               const struct GNUNET_PeerIdentity *peer,
2098                               const struct GNUNET_HELLO_Message *hello,
2099                               uint32_t trust)
2100 {
2101   struct Neighbour *n = cls;
2102
2103   if (peer == NULL)
2104     {
2105       n->pitr = NULL;
2106       return;
2107     }
2108   if (n->public_key != NULL)
2109     return;
2110 #if DEBUG_CORE
2111   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2112               "Received new `%s' message for `%4s', initiating key exchange.\n",
2113               "HELLO",
2114               GNUNET_i2s (peer));
2115 #endif
2116   n->public_key =
2117     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2118   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2119     {
2120       GNUNET_free (n->public_key);
2121       n->public_key = NULL;
2122       return;
2123     }
2124   send_key (n);
2125 }
2126
2127
2128 /**
2129  * Task that will retry "send_key" if our previous attempt failed
2130  * to yield a PONG.
2131  */
2132 static void
2133 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2134 {
2135   struct Neighbour *n = cls;
2136
2137   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2138   n->set_key_retry_frequency =
2139     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2140   send_key (n);
2141 }
2142
2143
2144 /**
2145  * Send our key (and encrypted PING) to the other peer.
2146  *
2147  * @param n the other peer
2148  */
2149 static void
2150 send_key (struct Neighbour *n)
2151 {
2152   struct SetKeyMessage *sm;
2153   struct MessageEntry *me;
2154   struct PingMessage pp;
2155   struct PingMessage *pm;
2156
2157   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2158        (n->pitr != NULL) )
2159     return; /* already in progress */
2160 #if DEBUG_CORE
2161   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2162               "Asked to perform key exchange with `%4s'.\n",
2163               GNUNET_i2s (&n->peer));
2164 #endif
2165   if (n->public_key == NULL)
2166     {
2167       /* lookup n's public key, then try again */
2168 #if DEBUG_CORE
2169       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2170                   "Lacking public key for `%4s', trying to obtain one.\n",
2171                   GNUNET_i2s (&n->peer));
2172 #endif
2173       GNUNET_assert (n->pitr == NULL);
2174       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2175                                          sched,
2176                                          &n->peer,
2177                                          0,
2178                                          GNUNET_TIME_UNIT_MINUTES,
2179                                          &process_hello_retry_send_key, n);
2180       return;
2181     }
2182   /* first, set key message */
2183   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2184                       sizeof (struct SetKeyMessage));
2185   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2186   me->priority = SET_KEY_PRIORITY;
2187   me->size = sizeof (struct SetKeyMessage);
2188   if (n->encrypted_head == NULL)
2189     n->encrypted_head = me;
2190   else
2191     n->encrypted_tail->next = me;
2192   n->encrypted_tail = me;
2193   sm = (struct SetKeyMessage *) &me[1];
2194   sm->header.size = htons (sizeof (struct SetKeyMessage));
2195   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2196   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2197                                         PEER_STATE_KEY_SENT : n->status));
2198   sm->purpose.size =
2199     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2200            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2201            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2202            sizeof (struct GNUNET_PeerIdentity));
2203   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2204   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2205   sm->target = n->peer;
2206   GNUNET_assert (GNUNET_OK ==
2207                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2208                                             sizeof (struct
2209                                                     GNUNET_CRYPTO_AesSessionKey),
2210                                             n->public_key,
2211                                             &sm->encrypted_key));
2212   GNUNET_assert (GNUNET_OK ==
2213                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2214                                          &sm->signature));
2215
2216   /* second, encrypted PING message */
2217   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2218                       sizeof (struct PingMessage));
2219   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2220   me->priority = PING_PRIORITY;
2221   me->size = sizeof (struct PingMessage);
2222   n->encrypted_tail->next = me;
2223   n->encrypted_tail = me;
2224   pm = (struct PingMessage *) &me[1];
2225   pm->header.size = htons (sizeof (struct PingMessage));
2226   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2227   pp.challenge = htonl (n->ping_challenge);
2228   pp.target = n->peer;
2229 #if DEBUG_CORE
2230   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2231               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2232               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2234               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2235               "PING",
2236               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2237 #endif
2238   do_encrypt (n,
2239               &n->peer.hashPubKey,
2240               &pp.challenge,
2241               &pm->challenge,
2242               sizeof (struct PingMessage) -
2243               sizeof (struct GNUNET_MessageHeader));
2244   /* update status */
2245   switch (n->status)
2246     {
2247     case PEER_STATE_DOWN:
2248       n->status = PEER_STATE_KEY_SENT;
2249       break;
2250     case PEER_STATE_KEY_SENT:
2251       break;
2252     case PEER_STATE_KEY_RECEIVED:
2253       break;
2254     case PEER_STATE_KEY_CONFIRMED:
2255       break;
2256     default:
2257       GNUNET_break (0);
2258       break;
2259     }
2260 #if DEBUG_CORE
2261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2262               "Have %llu ms left for `%s' transmission.\n",
2263               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2264               "SET_KEY");
2265 #endif
2266   /* trigger queue processing */
2267   process_encrypted_neighbour_queue (n);
2268   if (n->status != PEER_STATE_KEY_CONFIRMED)
2269     {
2270       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task);
2271       n->retry_set_key_task
2272         = GNUNET_SCHEDULER_add_delayed (sched,
2273                                         n->set_key_retry_frequency,
2274                                         &set_key_retry_task, n);
2275     }
2276 }
2277
2278
2279 /**
2280  * We received a SET_KEY message.  Validate and update
2281  * our key material and status.
2282  *
2283  * @param n the neighbour from which we received message m
2284  * @param m the set key message we received
2285  */
2286 static void
2287 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2288
2289
2290 /**
2291  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2292  * the neighbour's struct and retry handling the set_key message.  Or,
2293  * if we did not get a HELLO, just free the set key message.
2294  *
2295  * @param cls pointer to the set key message
2296  * @param peer the peer for which this is the HELLO
2297  * @param hello HELLO message of that peer
2298  * @param trust amount of trust we currently have in that peer
2299  */
2300 static void
2301 process_hello_retry_handle_set_key (void *cls,
2302                                     const struct GNUNET_PeerIdentity *peer,
2303                                     const struct GNUNET_HELLO_Message *hello,
2304                                     uint32_t trust)
2305 {
2306   struct Neighbour *n = cls;
2307   struct SetKeyMessage *sm = n->skm;
2308
2309   if (peer == NULL)
2310     {
2311       GNUNET_free (sm);
2312       n->skm = NULL;
2313       n->pitr = NULL;
2314       return;
2315     }
2316   if (n->public_key != NULL)
2317     return;                     /* multiple HELLOs match!? */
2318   n->public_key =
2319     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2320   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2321     {
2322       GNUNET_break_op (0);
2323       GNUNET_free (n->public_key);
2324       n->public_key = NULL;
2325       return;
2326     }
2327 #if DEBUG_CORE
2328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2329               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2330               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2331 #endif
2332   handle_set_key (n, sm);
2333 }
2334
2335
2336 /**
2337  * We received a PING message.  Validate and transmit
2338  * PONG.
2339  *
2340  * @param n sender of the PING
2341  * @param m the encrypted PING message itself
2342  */
2343 static void
2344 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2345 {
2346   struct PingMessage t;
2347   struct PingMessage *tp;
2348   struct MessageEntry *me;
2349
2350 #if DEBUG_CORE
2351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2352               "Core service receives `%s' request from `%4s'.\n",
2353               "PING", GNUNET_i2s (&n->peer));
2354 #endif
2355   if (GNUNET_OK !=
2356       do_decrypt (n,
2357                   &my_identity.hashPubKey,
2358                   &m->challenge,
2359                   &t.challenge,
2360                   sizeof (struct PingMessage) -
2361                   sizeof (struct GNUNET_MessageHeader)))
2362     return;
2363 #if DEBUG_CORE
2364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2365               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2366               "PING",
2367               GNUNET_i2s (&t.target),
2368               ntohl (t.challenge), n->decrypt_key.crc32);
2369   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2370               "Target of `%s' request is `%4s'.\n",
2371               "PING", GNUNET_i2s (&t.target));
2372 #endif
2373   if (0 != memcmp (&t.target,
2374                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2375     {
2376       GNUNET_break_op (0);
2377       return;
2378     }
2379   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2380                       sizeof (struct PingMessage));
2381   if (n->encrypted_tail != NULL)
2382     n->encrypted_tail->next = me;
2383   else
2384     {
2385       n->encrypted_tail = me;
2386       n->encrypted_head = me;
2387     }
2388   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2389   me->priority = PONG_PRIORITY;
2390   me->size = sizeof (struct PingMessage);
2391   tp = (struct PingMessage *) &me[1];
2392   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2393   tp->header.size = htons (sizeof (struct PingMessage));
2394   do_encrypt (n,
2395               &my_identity.hashPubKey,
2396               &t.challenge,
2397               &tp->challenge,
2398               sizeof (struct PingMessage) -
2399               sizeof (struct GNUNET_MessageHeader));
2400 #if DEBUG_CORE
2401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2402               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2403               ntohl (t.challenge), n->encrypt_key.crc32);
2404 #endif
2405   /* trigger queue processing */
2406   process_encrypted_neighbour_queue (n);
2407 }
2408
2409
2410 /**
2411  * We received a SET_KEY message.  Validate and update
2412  * our key material and status.
2413  *
2414  * @param n the neighbour from which we received message m
2415  * @param m the set key message we received
2416  */
2417 static void
2418 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2419 {
2420   struct SetKeyMessage *m_cpy;
2421   struct GNUNET_TIME_Absolute t;
2422   struct GNUNET_CRYPTO_AesSessionKey k;
2423   struct PingMessage *ping;
2424   enum PeerStateMachine sender_status;
2425
2426 #if DEBUG_CORE
2427   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2428               "Core service receives `%s' request from `%4s'.\n",
2429               "SET_KEY", GNUNET_i2s (&n->peer));
2430 #endif
2431   if (n->public_key == NULL)
2432     {
2433 #if DEBUG_CORE
2434       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2435                   "Lacking public key for peer, trying to obtain one.\n");
2436 #endif
2437       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2438       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2439       /* lookup n's public key, then try again */
2440       GNUNET_assert (n->pitr == NULL);
2441       GNUNET_assert (n->skm == NULL);
2442       n->skm = m_cpy;
2443       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2444                                          sched,
2445                                          &n->peer,
2446                                          0,
2447                                          GNUNET_TIME_UNIT_MINUTES,
2448                                          &process_hello_retry_handle_set_key, n);
2449       return;
2450     }
2451   if (0 != memcmp (&m->target,
2452                    &my_identity,
2453                    sizeof (struct GNUNET_PeerIdentity)))
2454     {
2455       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2456                   _("Received `%s' message that was not for me.  Ignoring.\n"));
2457       return;
2458     }
2459   if ((ntohl (m->purpose.size) !=
2460        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2461        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2462        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2463        sizeof (struct GNUNET_PeerIdentity)) ||
2464       (GNUNET_OK !=
2465        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2466                                  &m->purpose, &m->signature, n->public_key)))
2467     {
2468       /* invalid signature */
2469       GNUNET_break_op (0);
2470       return;
2471     }
2472   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2473   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2474        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2475       (t.value < n->decrypt_key_created.value))
2476     {
2477       /* this could rarely happen due to massive re-ordering of
2478          messages on the network level, but is most likely either
2479          a bug or some adversary messing with us.  Report. */
2480       GNUNET_break_op (0);
2481       return;
2482     }
2483 #if DEBUG_CORE
2484   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2485 #endif  
2486   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2487                                   &m->encrypted_key,
2488                                   &k,
2489                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2490        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2491       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2492     {
2493       /* failed to decrypt !? */
2494       GNUNET_break_op (0);
2495       return;
2496     }
2497
2498   n->decrypt_key = k;
2499   if (n->decrypt_key_created.value != t.value)
2500     {
2501       /* fresh key, reset sequence numbers */
2502       n->last_sequence_number_received = 0;
2503       n->last_packets_bitmap = 0;
2504       n->decrypt_key_created = t;
2505     }
2506   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2507   switch (n->status)
2508     {
2509     case PEER_STATE_DOWN:
2510       n->status = PEER_STATE_KEY_RECEIVED;
2511 #if DEBUG_CORE
2512       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2513                   "Responding to `%s' with my own key.\n", "SET_KEY");
2514 #endif
2515       send_key (n);
2516       break;
2517     case PEER_STATE_KEY_SENT:
2518     case PEER_STATE_KEY_RECEIVED:
2519       n->status = PEER_STATE_KEY_RECEIVED;
2520       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2521           (sender_status != PEER_STATE_KEY_CONFIRMED))
2522         {
2523 #if DEBUG_CORE
2524           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2525                       "Responding to `%s' with my own key (other peer has status %u).\n",
2526                       "SET_KEY", sender_status);
2527 #endif
2528           send_key (n);
2529         }
2530       break;
2531     case PEER_STATE_KEY_CONFIRMED:
2532       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2533           (sender_status != PEER_STATE_KEY_CONFIRMED))
2534         {         
2535 #if DEBUG_CORE
2536           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2537                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2538                       "SET_KEY", sender_status);
2539 #endif
2540           send_key (n);
2541         }
2542       break;
2543     default:
2544       GNUNET_break (0);
2545       break;
2546     }
2547   if (n->pending_ping != NULL)
2548     {
2549       ping = n->pending_ping;
2550       n->pending_ping = NULL;
2551       handle_ping (n, ping);
2552       GNUNET_free (ping);
2553     }
2554 }
2555
2556
2557 /**
2558  * We received a PONG message.  Validate and update our status.
2559  *
2560  * @param n sender of the PONG
2561  * @param m the encrypted PONG message itself
2562  */
2563 static void
2564 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2565 {
2566   struct PingMessage t;
2567   struct ConnectNotifyMessage cnm;
2568
2569 #if DEBUG_CORE
2570   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2571               "Core service receives `%s' request from `%4s'.\n",
2572               "PONG", GNUNET_i2s (&n->peer));
2573 #endif
2574   if (GNUNET_OK !=
2575       do_decrypt (n,
2576                   &n->peer.hashPubKey,
2577                   &m->challenge,
2578                   &t.challenge,
2579                   sizeof (struct PingMessage) -
2580                   sizeof (struct GNUNET_MessageHeader)))
2581     return;
2582 #if DEBUG_CORE
2583   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2584               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2585               "PONG",
2586               GNUNET_i2s (&t.target),
2587               ntohl (t.challenge), n->decrypt_key.crc32);
2588 #endif
2589   if ((0 != memcmp (&t.target,
2590                     &n->peer,
2591                     sizeof (struct GNUNET_PeerIdentity))) ||
2592       (n->ping_challenge != ntohl (t.challenge)))
2593     {
2594       /* PONG malformed */
2595 #if DEBUG_CORE
2596       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2597                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2598                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2599       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2600                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2601                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2602 #endif
2603       GNUNET_break_op (0);
2604       return;
2605     }
2606   switch (n->status)
2607     {
2608     case PEER_STATE_DOWN:
2609       GNUNET_break (0);         /* should be impossible */
2610       return;
2611     case PEER_STATE_KEY_SENT:
2612       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2613       return;
2614     case PEER_STATE_KEY_RECEIVED:
2615       n->status = PEER_STATE_KEY_CONFIRMED;
2616       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2617         {
2618           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2619           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2620         }      
2621       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2622       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2623       cnm.distance = htonl (n->last_distance);
2624       cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
2625       cnm.peer = n->peer;
2626       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2627       process_encrypted_neighbour_queue (n);
2628       break;
2629     case PEER_STATE_KEY_CONFIRMED:
2630       /* duplicate PONG? */
2631       break;
2632     default:
2633       GNUNET_break (0);
2634       break;
2635     }
2636 }
2637
2638
2639 /**
2640  * Send a P2P message to a client.
2641  *
2642  * @param sender who sent us the message?
2643  * @param client who should we give the message to?
2644  * @param m contains the message to transmit
2645  * @param msize number of bytes in buf to transmit
2646  */
2647 static void
2648 send_p2p_message_to_client (struct Neighbour *sender,
2649                             struct Client *client,
2650                             const void *m, size_t msize)
2651 {
2652   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2653   struct NotifyTrafficMessage *ntm;
2654
2655 #if DEBUG_CORE
2656   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2657               "Core service passes message from `%4s' of type %u to client.\n",
2658               GNUNET_i2s(&sender->peer),
2659               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2660 #endif
2661   ntm = (struct NotifyTrafficMessage *) buf;
2662   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2663   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2664   ntm->distance = htonl (sender->last_distance);
2665   ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
2666   ntm->peer = sender->peer;
2667   memcpy (&ntm[1], m, msize);
2668   send_to_client (client, &ntm->header, GNUNET_YES);
2669 }
2670
2671
2672 /**
2673  * Deliver P2P message to interested clients.
2674  *
2675  * @param sender who sent us the message?
2676  * @param m the message
2677  * @param msize size of the message (including header)
2678  */
2679 static void
2680 deliver_message (struct Neighbour *sender,
2681                  const struct GNUNET_MessageHeader *m, size_t msize)
2682 {
2683   struct Client *cpos;
2684   uint16_t type;
2685   unsigned int tpos;
2686   int deliver_full;
2687
2688   type = ntohs (m->type);
2689   cpos = clients;
2690   while (cpos != NULL)
2691     {
2692       deliver_full = GNUNET_NO;
2693       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2694         deliver_full = GNUNET_YES;
2695       else
2696         {
2697           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2698             {
2699               if (type != cpos->types[tpos])
2700                 continue;
2701               deliver_full = GNUNET_YES;
2702               break;
2703             }
2704         }
2705       if (GNUNET_YES == deliver_full)
2706         send_p2p_message_to_client (sender, cpos, m, msize);
2707       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2708         send_p2p_message_to_client (sender, cpos, m,
2709                                     sizeof (struct GNUNET_MessageHeader));
2710       cpos = cpos->next;
2711     }
2712 }
2713
2714
2715 /**
2716  * Align P2P message and then deliver to interested clients.
2717  *
2718  * @param sender who sent us the message?
2719  * @param buffer unaligned (!) buffer containing message
2720  * @param msize size of the message (including header)
2721  */
2722 static void
2723 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2724 {
2725   char abuf[msize];
2726
2727   /* TODO: call to statistics? */
2728   memcpy (abuf, buffer, msize);
2729   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2730 }
2731
2732
2733 /**
2734  * Deliver P2P messages to interested clients.
2735  *
2736  * @param sender who sent us the message?
2737  * @param buffer buffer containing messages, can be modified
2738  * @param buffer_size size of the buffer (overall)
2739  * @param offset offset where messages in the buffer start
2740  */
2741 static void
2742 deliver_messages (struct Neighbour *sender,
2743                   const char *buffer, size_t buffer_size, size_t offset)
2744 {
2745   struct GNUNET_MessageHeader *mhp;
2746   struct GNUNET_MessageHeader mh;
2747   uint16_t msize;
2748   int need_align;
2749
2750   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2751     {
2752       if (0 != offset % sizeof (uint16_t))
2753         {
2754           /* outch, need to copy to access header */
2755           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2756           mhp = &mh;
2757         }
2758       else
2759         {
2760           /* can access header directly */
2761           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2762         }
2763       msize = ntohs (mhp->size);
2764       if (msize + offset > buffer_size)
2765         {
2766           /* malformed message, header says it is larger than what
2767              would fit into the overall buffer */
2768           GNUNET_break_op (0);
2769           break;
2770         }
2771 #if HAVE_UNALIGNED_64_ACCESS
2772       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2773 #else
2774       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2775 #endif
2776       if (GNUNET_YES == need_align)
2777         align_and_deliver (sender, &buffer[offset], msize);
2778       else
2779         deliver_message (sender,
2780                          (const struct GNUNET_MessageHeader *)
2781                          &buffer[offset], msize);
2782       offset += msize;
2783     }
2784 }
2785
2786
2787 /**
2788  * We received an encrypted message.  Decrypt, validate and
2789  * pass on to the appropriate clients.
2790  */
2791 static void
2792 handle_encrypted_message (struct Neighbour *n,
2793                           const struct EncryptedMessage *m)
2794 {
2795   size_t size = ntohs (m->header.size);
2796   char buf[size];
2797   struct EncryptedMessage *pt;  /* plaintext */
2798   GNUNET_HashCode ph;
2799   size_t off;
2800   uint32_t snum;
2801   struct GNUNET_TIME_Absolute t;
2802
2803 #if DEBUG_CORE
2804   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2805               "Core service receives `%s' request from `%4s'.\n",
2806               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2807 #endif  
2808   /* decrypt */
2809   if (GNUNET_OK !=
2810       do_decrypt (n,
2811                   &m->plaintext_hash,
2812                   &m->sequence_number,
2813                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2814     return;
2815   pt = (struct EncryptedMessage *) buf;
2816
2817   /* validate hash */
2818   GNUNET_CRYPTO_hash (&pt->sequence_number,
2819                       size - ENCRYPTED_HEADER_SIZE, &ph);
2820   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2821     {
2822       /* checksum failed */
2823       GNUNET_break_op (0);
2824       return;
2825     }
2826
2827   /* validate sequence number */
2828   snum = ntohl (pt->sequence_number);
2829   if (n->last_sequence_number_received == snum)
2830     {
2831       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2832                   "Received duplicate message, ignoring.\n");
2833       /* duplicate, ignore */
2834       return;
2835     }
2836   if ((n->last_sequence_number_received > snum) &&
2837       (n->last_sequence_number_received - snum > 32))
2838     {
2839       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2840                   "Received ancient out of sequence message, ignoring.\n");
2841       /* ancient out of sequence, ignore */
2842       return;
2843     }
2844   if (n->last_sequence_number_received > snum)
2845     {
2846       unsigned int rotbit =
2847         1 << (n->last_sequence_number_received - snum - 1);
2848       if ((n->last_packets_bitmap & rotbit) != 0)
2849         {
2850           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2851                       "Received duplicate message, ignoring.\n");
2852           /* duplicate, ignore */
2853           return;
2854         }
2855       n->last_packets_bitmap |= rotbit;
2856     }
2857   if (n->last_sequence_number_received < snum)
2858     {
2859       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2860       n->last_sequence_number_received = snum;
2861     }
2862
2863   /* check timestamp */
2864   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2865   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2866     {
2867       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2868                   _
2869                   ("Message received far too old (%llu ms). Content ignored.\n"),
2870                   GNUNET_TIME_absolute_get_duration (t).value);
2871       return;
2872     }
2873
2874   /* process decrypted message(s) */
2875   update_window (GNUNET_YES,
2876                  &n->available_send_window,
2877                  &n->last_asw_update,
2878                  n->bpm_out);
2879   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2880   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2881                            n->bpm_out_internal_limit);
2882   n->last_activity = GNUNET_TIME_absolute_get ();
2883   off = sizeof (struct EncryptedMessage);
2884   deliver_messages (n, buf, size, off);
2885 }
2886
2887
2888 /**
2889  * Function called by the transport for each received message.
2890  *
2891  * @param cls closure
2892  * @param peer (claimed) identity of the other peer
2893  * @param message the message
2894  * @param latency estimated latency for communicating with the
2895  *             given peer (round-trip)
2896  * @param distance in overlay hops, as given by transport plugin
2897  */
2898 static void
2899 handle_transport_receive (void *cls,
2900                           const struct GNUNET_PeerIdentity *peer,
2901                           const struct GNUNET_MessageHeader *message,
2902                           struct GNUNET_TIME_Relative latency,
2903                           unsigned int distance)
2904 {
2905   struct Neighbour *n;
2906   struct GNUNET_TIME_Absolute now;
2907   int up;
2908   uint16_t type;
2909   uint16_t size;
2910
2911 #if DEBUG_CORE
2912   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2913               "Received message of type %u from `%4s', demultiplexing.\n",
2914               ntohs (message->type), GNUNET_i2s (peer));
2915 #endif
2916   n = find_neighbour (peer);
2917   if (n == NULL)
2918     {
2919       GNUNET_break (0);
2920       return;
2921     }
2922   n->last_latency = latency;
2923   up = (n->status == PEER_STATE_KEY_CONFIRMED);
2924   type = ntohs (message->type);
2925   size = ntohs (message->size);
2926   switch (type)
2927     {
2928     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2929       if (size != sizeof (struct SetKeyMessage))
2930         {
2931           GNUNET_break_op (0);
2932           return;
2933         }
2934       handle_set_key (n, (const struct SetKeyMessage *) message);
2935       break;
2936     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2937       if (size < sizeof (struct EncryptedMessage) +
2938           sizeof (struct GNUNET_MessageHeader))
2939         {
2940           GNUNET_break_op (0);
2941           return;
2942         }
2943       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2944           (n->status != PEER_STATE_KEY_CONFIRMED))
2945         {
2946           GNUNET_break_op (0);
2947           return;
2948         }
2949       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2950       break;
2951     case GNUNET_MESSAGE_TYPE_CORE_PING:
2952       if (size != sizeof (struct PingMessage))
2953         {
2954           GNUNET_break_op (0);
2955           return;
2956         }
2957       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2958           (n->status != PEER_STATE_KEY_CONFIRMED))
2959         {
2960 #if DEBUG_CORE
2961           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2962                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2963                       "PING", GNUNET_i2s (&n->peer));
2964 #endif
2965           GNUNET_free_non_null (n->pending_ping);
2966           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2967           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2968           return;
2969         }
2970       handle_ping (n, (const struct PingMessage *) message);
2971       break;
2972     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2973       if (size != sizeof (struct PingMessage))
2974         {
2975           GNUNET_break_op (0);
2976           return;
2977         }
2978       if ((n->status != PEER_STATE_KEY_SENT) &&
2979           (n->status != PEER_STATE_KEY_RECEIVED) &&
2980           (n->status != PEER_STATE_KEY_CONFIRMED))
2981         {
2982           /* could not decrypt pong, oops! */
2983           GNUNET_break_op (0);
2984           return;
2985         }
2986       handle_pong (n, (const struct PingMessage *) message);
2987       break;
2988     default:
2989       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2990                   _("Unsupported message of type %u received.\n"), type);
2991       return;
2992     }
2993   if (n->status == PEER_STATE_KEY_CONFIRMED)
2994     {
2995       now = GNUNET_TIME_absolute_get ();
2996       n->last_activity = now;
2997       if (!up)
2998         n->time_established = now;
2999     }
3000 }
3001
3002
3003 /**
3004  * Function that recalculates the bandwidth quota for the
3005  * given neighbour and transmits it to the transport service.
3006  * 
3007  * @param cls neighbour for the quota update
3008  * @param tc context
3009  */
3010 static void
3011 neighbour_quota_update (void *cls,
3012                         const struct GNUNET_SCHEDULER_TaskContext *tc);
3013
3014
3015 /**
3016  * Schedule the task that will recalculate the bandwidth
3017  * quota for this peer (and possibly force a disconnect of
3018  * idle peers by calculating a bandwidth of zero).
3019  */
3020 static void
3021 schedule_quota_update (struct Neighbour *n)
3022 {
3023   GNUNET_assert (n->quota_update_task ==
3024                  GNUNET_SCHEDULER_NO_TASK);
3025   n->quota_update_task
3026     = GNUNET_SCHEDULER_add_delayed (sched,
3027                                     QUOTA_UPDATE_FREQUENCY,
3028                                     &neighbour_quota_update,
3029                                     n);
3030 }
3031
3032
3033 /**
3034  * Function that recalculates the bandwidth quota for the
3035  * given neighbour and transmits it to the transport service.
3036  * 
3037  * @param cls neighbour for the quota update
3038  * @param tc context
3039  */
3040 static void
3041 neighbour_quota_update (void *cls,
3042                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3043 {
3044   struct Neighbour *n = cls;
3045   uint32_t q_in;
3046   double pref_rel;
3047   double share;
3048   unsigned long long distributable;
3049   
3050   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3051   /* calculate relative preference among all neighbours;
3052      divides by a bit more to avoid division by zero AND to
3053      account for possibility of new neighbours joining any time 
3054      AND to convert to double... */
3055   pref_rel = n->current_preference / (1.0 + preference_sum);
3056   distributable = 0;
3057   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
3058     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
3059   share = distributable * pref_rel;
3060   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
3061   /* check if we want to disconnect for good due to inactivity */
3062   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3063        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3064     q_in = 0; /* force disconnect */
3065   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
3066        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
3067     {
3068       n->bpm_in = q_in;
3069       GNUNET_TRANSPORT_set_quota (transport,
3070                                   &n->peer,
3071                                   n->bpm_in, 
3072                                   n->bpm_out,
3073                                   GNUNET_TIME_UNIT_FOREVER_REL,
3074                                   NULL, NULL);
3075     }
3076   schedule_quota_update (n);
3077 }
3078
3079
3080 /**
3081  * Function called by transport to notify us that
3082  * a peer connected to us (on the network level).
3083  *
3084  * @param cls closure
3085  * @param peer the peer that connected
3086  * @param latency current latency of the connection
3087  * @param distance in overlay hops, as given by transport plugin
3088  */
3089 static void
3090 handle_transport_notify_connect (void *cls,
3091                                  const struct GNUNET_PeerIdentity *peer,
3092                                  struct GNUNET_TIME_Relative latency,
3093                                  unsigned int distance)
3094 {
3095   struct Neighbour *n;
3096   struct GNUNET_TIME_Absolute now;
3097   struct ConnectNotifyMessage cnm;
3098
3099   n = find_neighbour (peer);
3100   if (n != NULL)
3101     {
3102       /* duplicate connect notification!? */
3103       GNUNET_break (0);
3104       return;
3105     }
3106   now = GNUNET_TIME_absolute_get ();
3107   n = GNUNET_malloc (sizeof (struct Neighbour));
3108   n->next = neighbours;
3109   neighbours = n;
3110   neighbour_count++;
3111   n->peer = *peer;
3112   n->last_latency = latency;
3113   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
3114   n->encrypt_key_created = now;
3115   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
3116   n->last_asw_update = now;
3117   n->last_arw_update = now;
3118   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3119   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3120   n->bpm_out_internal_limit = (uint32_t) - 1;
3121   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3122   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3123                                                 (uint32_t) - 1);
3124 #if DEBUG_CORE
3125   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3126               "Received connection from `%4s'.\n",
3127               GNUNET_i2s (&n->peer));
3128 #endif
3129   schedule_quota_update (n);
3130   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3131   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3132   cnm.distance = htonl (n->last_distance);
3133   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
3134   cnm.peer = *peer;
3135   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3136   send_key (n);
3137 }
3138
3139
3140 /**
3141  * Free the given entry for the neighbour (it has
3142  * already been removed from the list at this point).
3143  *
3144  * @param n neighbour to free
3145  */
3146 static void
3147 free_neighbour (struct Neighbour *n)
3148 {
3149   struct MessageEntry *m;
3150
3151   if (n->pitr != NULL)
3152     {
3153       GNUNET_PEERINFO_iterate_cancel (n->pitr);
3154       n->pitr = NULL;
3155     }
3156   if (n->skm != NULL)
3157     {
3158       GNUNET_free (n->skm);
3159       n->skm = NULL;
3160     }
3161   while (NULL != (m = n->messages))
3162     {
3163       n->messages = m->next;
3164       GNUNET_free (m);
3165     }
3166   while (NULL != (m = n->encrypted_head))
3167     {
3168       n->encrypted_head = m->next;
3169       GNUNET_free (m);
3170     }
3171   if (NULL != n->th)
3172     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3173   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
3174     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3175   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
3176     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3177   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
3178     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3179   GNUNET_free_non_null (n->public_key);
3180   GNUNET_free_non_null (n->pending_ping);
3181   GNUNET_free (n);
3182 }
3183
3184
3185 /**
3186  * Function called by transport telling us that a peer
3187  * disconnected.
3188  *
3189  * @param cls closure
3190  * @param peer the peer that disconnected
3191  */
3192 static void
3193 handle_transport_notify_disconnect (void *cls,
3194                                     const struct GNUNET_PeerIdentity *peer)
3195 {
3196   struct DisconnectNotifyMessage cnm;
3197   struct Neighbour *n;
3198   struct Neighbour *p;
3199
3200 #if DEBUG_CORE
3201   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3202               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3203 #endif
3204   p = NULL;
3205   n = neighbours;
3206   while ((n != NULL) &&
3207          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3208     {
3209       p = n;
3210       n = n->next;
3211     }
3212   if (n == NULL)
3213     {
3214       GNUNET_break (0);
3215       return;
3216     }
3217   if (p == NULL)
3218     neighbours = n->next;
3219   else
3220     p->next = n->next;
3221   GNUNET_assert (neighbour_count > 0);
3222   neighbour_count--;
3223   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
3224   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3225   cnm.peer = *peer;
3226   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3227   free_neighbour (n);
3228 }
3229
3230
3231 /**
3232  * Last task run during shutdown.  Disconnects us from
3233  * the transport.
3234  */
3235 static void
3236 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3237 {
3238   struct Neighbour *n;
3239   struct Client *c;
3240
3241 #if DEBUG_CORE
3242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3243               "Core service shutting down.\n");
3244 #endif
3245   GNUNET_assert (transport != NULL);
3246   GNUNET_TRANSPORT_disconnect (transport);
3247   transport = NULL;
3248   while (NULL != (n = neighbours))
3249     {
3250       neighbours = n->next;
3251       GNUNET_assert (neighbour_count > 0);
3252       neighbour_count--;
3253       free_neighbour (n);
3254     }
3255   while (NULL != (c = clients))
3256     handle_client_disconnect (NULL, c->client_handle);
3257   if (my_private_key != NULL)
3258     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3259 }
3260
3261
3262 /**
3263  * Initiate core service.
3264  *
3265  * @param cls closure
3266  * @param s scheduler to use
3267  * @param serv the initialized server
3268  * @param c configuration to use
3269  */
3270 static void
3271 run (void *cls,
3272      struct GNUNET_SCHEDULER_Handle *s,
3273      struct GNUNET_SERVER_Handle *serv,
3274      const struct GNUNET_CONFIGURATION_Handle *c)
3275 {
3276 #if 0
3277   unsigned long long qin;
3278   unsigned long long qout;
3279   unsigned long long tneigh;
3280 #endif
3281   char *keyfile;
3282
3283   sched = s;
3284   cfg = c;
3285   /* parse configuration */
3286   if (
3287        (GNUNET_OK !=
3288         GNUNET_CONFIGURATION_get_value_number (c,
3289                                                "CORE",
3290                                                "TOTAL_QUOTA_IN",
3291                                                &bandwidth_target_in)) ||
3292        (GNUNET_OK !=
3293         GNUNET_CONFIGURATION_get_value_number (c,
3294                                                "CORE",
3295                                                "TOTAL_QUOTA_OUT",
3296                                                &bandwidth_target_out)) ||
3297 #if 0
3298        (GNUNET_OK !=
3299         GNUNET_CONFIGURATION_get_value_number (c,
3300                                                "CORE",
3301                                                "YY",
3302                                                &qout)) ||
3303        (GNUNET_OK !=
3304         GNUNET_CONFIGURATION_get_value_number (c,
3305                                                "CORE",
3306                                                "ZZ_LIMIT", &tneigh)) ||
3307 #endif
3308        (GNUNET_OK !=
3309         GNUNET_CONFIGURATION_get_value_filename (c,
3310                                                  "GNUNETD",
3311                                                  "HOSTKEY", &keyfile)))
3312     {
3313       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3314                   _
3315                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3316       GNUNET_SCHEDULER_shutdown (s);
3317       return;
3318     }
3319   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3320   GNUNET_free (keyfile);
3321   if (my_private_key == NULL)
3322     {
3323       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3324                   _("Core service could not access hostkey.  Exiting.\n"));
3325       GNUNET_SCHEDULER_shutdown (s);
3326       return;
3327     }
3328   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3329   GNUNET_CRYPTO_hash (&my_public_key,
3330                       sizeof (my_public_key), &my_identity.hashPubKey);
3331   /* setup notification */
3332   server = serv;
3333   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3334   /* setup transport connection */
3335   transport = GNUNET_TRANSPORT_connect (sched,
3336                                         cfg,
3337                                         NULL,
3338                                         &handle_transport_receive,
3339                                         &handle_transport_notify_connect,
3340                                         &handle_transport_notify_disconnect);
3341   GNUNET_assert (NULL != transport);
3342   GNUNET_SCHEDULER_add_delayed (sched,
3343                                 GNUNET_TIME_UNIT_FOREVER_REL,
3344                                 &cleaning_task, NULL);
3345   /* process client requests */
3346   GNUNET_SERVER_add_handlers (server, handlers);
3347   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3348               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3349 }
3350
3351
3352
3353 /**
3354  * The main function for the transport service.
3355  *
3356  * @param argc number of arguments from the command line
3357  * @param argv command line arguments
3358  * @return 0 ok, 1 on error
3359  */
3360 int
3361 main (int argc, char *const *argv)
3362 {
3363   return (GNUNET_OK ==
3364           GNUNET_SERVICE_run (argc,
3365                               argv,
3366                               "core",
3367                               GNUNET_SERVICE_OPTION_NONE,
3368                               &run, NULL)) ? 0 : 1;
3369 }
3370
3371 /* end of gnunet-service-core.c */