fixing non-transport compilation issues
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - not all GNUNET_CORE_OPTION_SEND_* flags are fully supported yet
28  *   (i.e. no SEND_XXX_OUTBOUND).
29  * - 'REQUEST_DISCONNECT' is not implemented (transport API is lacking!)
30  *
31  * Considerations for later:
32  * - check that hostkey used by transport (for HELLOs) is the
33  *   same as the hostkey that we are using!
34  * - add code to send PINGs if we are about to time-out otherwise
35  * - optimize lookup (many O(n) list traversals
36  *   could ideally be changed to O(1) hash map lookups)
37  */
38 #include "platform.h"
39 #include "gnunet_constants.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet_hello_lib.h"
42 #include "gnunet_peerinfo_service.h"
43 #include "gnunet_protocols.h"
44 #include "gnunet_signatures.h"
45 #include "gnunet_transport_service.h"
46 #include "core.h"
47
48
49 /**
50  * Receive and send buffer windows grow over time.  For
51  * how long can 'unused' bandwidth accumulate before we
52  * need to cap it?  (specified in ms).
53  */
54 #define MAX_WINDOW_TIME (5 * 60 * 1000)
55
56 /**
57  * Minimum of bytes per minute (out) to assign to any connected peer.
58  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
59  * sense.
60  */
61 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
62
63 /**
64  * What is the smallest change (in number of bytes per minute)
65  * that we consider significant enough to bother triggering?
66  */
67 #define MIN_BPM_CHANGE 32
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What is the maximum delay for a SET_KEY message?
80  */
81 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
82
83 /**
84  * What how long do we wait for SET_KEY confirmation initially?
85  */
86 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
87
88 /**
89  * What is the maximum delay for a PING message?
90  */
91 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * What is the maximum delay for a PONG message?
95  */
96 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * How often do we recalculate bandwidth quotas?
100  */
101 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
102
103 /**
104  * What is the priority for a SET_KEY message?
105  */
106 #define SET_KEY_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PING message?
110  */
111 #define PING_PRIORITY 0xFFFFFF
112
113 /**
114  * What is the priority for a PONG message?
115  */
116 #define PONG_PRIORITY 0xFFFFFF
117
118 /**
119  * How many messages do we queue per peer at most?
120  */
121 #define MAX_PEER_QUEUE_SIZE 16
122
123 /**
124  * How many non-mandatory messages do we queue per client at most?
125  */
126 #define MAX_CLIENT_QUEUE_SIZE 32
127
128 /**
129  * What is the maximum age of a message for us to consider
130  * processing it?  Note that this looks at the timestamp used
131  * by the other peer, so clock skew between machines does
132  * come into play here.  So this should be picked high enough
133  * so that a little bit of clock skew does not prevent peers
134  * from connecting to us.
135  */
136 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
137
138 /**
139  * What is the maximum size for encrypted messages?  Note that this
140  * number imposes a clear limit on the maximum size of any message.
141  * Set to a value close to 64k but not so close that transports will
142  * have trouble with their headers.
143  */
144 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
145
146
147 /**
148  * State machine for our P2P encryption handshake.  Everyone starts in
149  * "DOWN", if we receive the other peer's key (other peer initiated)
150  * we start in state RECEIVED (since we will immediately send our
151  * own); otherwise we start in SENT.  If we get back a PONG from
152  * within either state, we move up to CONFIRMED (the PONG will always
153  * be sent back encrypted with the key we sent to the other peer).
154  */
155 enum PeerStateMachine
156 {
157   PEER_STATE_DOWN,
158   PEER_STATE_KEY_SENT,
159   PEER_STATE_KEY_RECEIVED,
160   PEER_STATE_KEY_CONFIRMED
161 };
162
163
164 /**
165  * Number of bytes (at the beginning) of "struct EncryptedMessage"
166  * that are NOT encrypted.
167  */
168 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
169
170
171 /**
172  * Encapsulation for encrypted messages exchanged between
173  * peers.  Followed by the actual encrypted data.
174  */
175 struct EncryptedMessage
176 {
177   /**
178    * Message type is either CORE_ENCRYPTED_MESSAGE.
179    */
180   struct GNUNET_MessageHeader header;
181
182   /**
183    * Always zero.
184    */
185   uint32_t reserved GNUNET_PACKED;
186
187   /**
188    * Hash of the plaintext, used to verify message integrity;
189    * ALSO used as the IV for the symmetric cipher!  Everything
190    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
191    * must be set to the offset of the next field.
192    */
193   GNUNET_HashCode plaintext_hash;
194
195   /**
196    * Sequence number, in network byte order.  This field
197    * must be the first encrypted/decrypted field and the
198    * first byte that is hashed for the plaintext hash.
199    */
200   uint32_t sequence_number GNUNET_PACKED;
201
202   /**
203    * Desired bandwidth (how much we should send to this
204    * peer / how much is the sender willing to receive),
205    * in bytes per minute.
206    */
207   uint32_t inbound_bpm_limit GNUNET_PACKED;
208
209   /**
210    * Timestamp.  Used to prevent reply of ancient messages
211    * (recent messages are caught with the sequence number).
212    */
213   struct GNUNET_TIME_AbsoluteNBO timestamp;
214
215 };
216
217 /**
218  * We're sending an (encrypted) PING to the other peer to check if he
219  * can decrypt.  The other peer should respond with a PONG with the
220  * same content, except this time encrypted with the receiver's key.
221  */
222 struct PingMessage
223 {
224   /**
225    * Message type is either CORE_PING or CORE_PONG.
226    */
227   struct GNUNET_MessageHeader header;
228
229   /**
230    * Random number chosen to make reply harder.
231    */
232   uint32_t challenge GNUNET_PACKED;
233
234   /**
235    * Intended target of the PING, used primarily to check
236    * that decryption actually worked.
237    */
238   struct GNUNET_PeerIdentity target;
239 };
240
241
242 /**
243  * Message transmitted to set (or update) a session key.
244  */
245 struct SetKeyMessage
246 {
247
248   /**
249    * Message type is either CORE_SET_KEY.
250    */
251   struct GNUNET_MessageHeader header;
252
253   /**
254    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
255    */
256   int32_t sender_status GNUNET_PACKED;
257
258   /**
259    * Purpose of the signature, will be
260    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
261    */
262   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
263
264   /**
265    * At what time was this key created?
266    */
267   struct GNUNET_TIME_AbsoluteNBO creation_time;
268
269   /**
270    * The encrypted session key.
271    */
272   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
273
274   /**
275    * Who is the intended recipient?
276    */
277   struct GNUNET_PeerIdentity target;
278
279   /**
280    * Signature of the stuff above (starting at purpose).
281    */
282   struct GNUNET_CRYPTO_RsaSignature signature;
283
284 };
285
286
287 /**
288  * Message waiting for transmission. This struct
289  * is followed by the actual content of the message.
290  */
291 struct MessageEntry
292 {
293
294   /**
295    * We keep messages in a linked list (for now).
296    */
297   struct MessageEntry *next;
298
299   /**
300    * By when are we supposed to transmit this message?
301    */
302   struct GNUNET_TIME_Absolute deadline;
303
304   /**
305    * How important is this message to us?
306    */
307   unsigned int priority;
308
309   /**
310    * How long is the message? (number of bytes following
311    * the "struct MessageEntry", but not including the
312    * size of "struct MessageEntry" itself!)
313    */
314   uint16_t size;
315
316   /**
317    * Was this message selected for transmission in the
318    * current round? GNUNET_YES or GNUNET_NO.
319    */
320   int16_t do_transmit;
321
322 };
323
324
325 struct Neighbour
326 {
327   /**
328    * We keep neighbours in a linked list (for now).
329    */
330   struct Neighbour *next;
331
332   /**
333    * Unencrypted messages destined for this peer.
334    */
335   struct MessageEntry *messages;
336
337   /**
338    * Head of the batched, encrypted message queue (already ordered,
339    * transmit starting with the head).
340    */
341   struct MessageEntry *encrypted_head;
342
343   /**
344    * Tail of the batched, encrypted message queue (already ordered,
345    * append new messages to tail)
346    */
347   struct MessageEntry *encrypted_tail;
348
349   /**
350    * Handle for pending requests for transmission to this peer
351    * with the transport service.  NULL if no request is pending.
352    */
353   struct GNUNET_TRANSPORT_TransmitHandle *th;
354
355   /**
356    * Public key of the neighbour, NULL if we don't have it yet.
357    */
358   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
359
360   /**
361    * We received a PING message before we got the "public_key"
362    * (or the SET_KEY).  We keep it here until we have a key
363    * to decrypt it.  NULL if no PING is pending.
364    */
365   struct PingMessage *pending_ping;
366
367   /**
368    * Non-NULL if we are currently looking up HELLOs for this peer.
369    * for this peer.
370    */
371   struct GNUNET_PEERINFO_IteratorContext *pitr;
372
373   /**
374    * SetKeyMessage to transmit, NULL if we are not currently trying
375    * to send one.
376    */
377   struct SetKeyMessage *skm;
378
379   /**
380    * Identity of the neighbour.
381    */
382   struct GNUNET_PeerIdentity peer;
383
384   /**
385    * Key we use to encrypt our messages for the other peer
386    * (initialized by us when we do the handshake).
387    */
388   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
389
390   /**
391    * Key we use to decrypt messages from the other peer
392    * (given to us by the other peer during the handshake).
393    */
394   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
395
396   /**
397    * ID of task used for re-trying plaintext scheduling.
398    */
399   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
400
401   /**
402    * ID of task used for re-trying SET_KEY and PING message.
403    */
404   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
405
406   /**
407    * ID of task used for updating bandwidth quota for this neighbour.
408    */
409   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
410
411   /**
412    * At what time did we generate our encryption key?
413    */
414   struct GNUNET_TIME_Absolute encrypt_key_created;
415
416   /**
417    * At what time did the other peer generate the decryption key?
418    */
419   struct GNUNET_TIME_Absolute decrypt_key_created;
420
421   /**
422    * At what time did we initially establish (as in, complete session
423    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
424    */
425   struct GNUNET_TIME_Absolute time_established;
426
427   /**
428    * At what time did we last receive an encrypted message from the
429    * other peer?  Should be zero if status != KEY_CONFIRMED.
430    */
431   struct GNUNET_TIME_Absolute last_activity;
432
433   /**
434    * Last latency observed from this peer.
435    */
436   struct GNUNET_TIME_Relative last_latency;
437
438   /**
439    * At what frequency are we currently re-trying SET_KEY messages?
440    */
441   struct GNUNET_TIME_Relative set_key_retry_frequency;
442
443   /**
444    * Time of our last update to the "available_send_window".
445    */
446   struct GNUNET_TIME_Absolute last_asw_update;
447
448   /**
449    * Time of our last update to the "available_recv_window".
450    */
451   struct GNUNET_TIME_Absolute last_arw_update;
452
453   /**
454    * Number of bytes that we are eligible to transmit to this
455    * peer at this point.  Incremented every minute by max_out_bpm,
456    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
457    * bandwidth-hogs are sampled at a frequency of about 78s!);
458    * may get negative if we have VERY high priority content.
459    */
460   long long available_send_window; 
461
462   /**
463    * How much downstream capacity of this peer has been reserved for
464    * our traffic?  (Our clients can request that a certain amount of
465    * bandwidth is available for replies to them; this value is used to
466    * make sure that this reserved amount of bandwidth is actually
467    * available).
468    */
469   long long available_recv_window; 
470
471   /**
472    * How valueable were the messages of this peer recently?
473    */
474   unsigned long long current_preference;
475
476   /**
477    * Bit map indicating which of the 32 sequence numbers before the last
478    * were received (good for accepting out-of-order packets and
479    * estimating reliability of the connection)
480    */
481   unsigned int last_packets_bitmap;
482
483   /**
484    * Number of messages in the message queue for this peer.
485    */
486   unsigned int message_queue_size;
487
488   /**
489    * last sequence number received on this connection (highest)
490    */
491   uint32_t last_sequence_number_received;
492
493   /**
494    * last sequence number transmitted
495    */
496   uint32_t last_sequence_number_sent;
497
498   /**
499    * Available bandwidth in for this peer (current target).
500    */
501   uint32_t bpm_in;
502
503   /**
504    * Available bandwidth out for this peer (current target).
505    */
506   uint32_t bpm_out;
507
508   /**
509    * Internal bandwidth limit set for this peer (initially
510    * typically set to "-1").  "bpm_out" is MAX of
511    * "bpm_out_internal_limit" and "bpm_out_external_limit".
512    */
513   uint32_t bpm_out_internal_limit;
514
515   /**
516    * External bandwidth limit set for this peer by the
517    * peer that we are communicating with.  "bpm_out" is MAX of
518    * "bpm_out_internal_limit" and "bpm_out_external_limit".
519    */
520   uint32_t bpm_out_external_limit;
521
522   /**
523    * What was our PING challenge number (for this peer)?
524    */
525   uint32_t ping_challenge;
526
527   /**
528    * What is our connection status?
529    */
530   enum PeerStateMachine status;
531
532 };
533
534
535 /**
536  * Events are messages for clients.  The struct
537  * itself is followed by the actual message.
538  */
539 struct Event
540 {
541   /**
542    * This is a linked list.
543    */
544   struct Event *next;
545
546   /**
547    * Size of the message.
548    */
549   size_t size;
550
551   /**
552    * Could this event be dropped if this queue
553    * is getting too large? (NOT YET USED!)
554    */
555   int can_drop;
556
557 };
558
559
560 /**
561  * Data structure for each client connected to the core service.
562  */
563 struct Client
564 {
565   /**
566    * Clients are kept in a linked list.
567    */
568   struct Client *next;
569
570   /**
571    * Handle for the client with the server API.
572    */
573   struct GNUNET_SERVER_Client *client_handle;
574
575   /**
576    * Linked list of messages we still need to deliver to
577    * this client.
578    */
579   struct Event *event_head;
580
581   /**
582    * Tail of the linked list of events.
583    */
584   struct Event *event_tail;
585
586   /**
587    * Current transmit handle, NULL if no transmission request
588    * is pending.
589    */
590   struct GNUNET_CONNECTION_TransmitHandle *th;
591
592   /**
593    * Array of the types of messages this peer cares
594    * about (with "tcnt" entries).  Allocated as part
595    * of this client struct, do not free!
596    */
597   uint16_t *types;
598
599   /**
600    * Options for messages this client cares about,
601    * see GNUNET_CORE_OPTION_ values.
602    */
603   uint32_t options;
604
605   /**
606    * Number of types of incoming messages this client
607    * specifically cares about.  Size of the "types" array.
608    */
609   unsigned int tcnt;
610
611 };
612
613
614 /**
615  * Our public key.
616  */
617 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
618
619 /**
620  * Our identity.
621  */
622 static struct GNUNET_PeerIdentity my_identity;
623
624 /**
625  * Our private key.
626  */
627 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
628
629 /**
630  * Our scheduler.
631  */
632 struct GNUNET_SCHEDULER_Handle *sched;
633
634 /**
635  * Our configuration.
636  */
637 const struct GNUNET_CONFIGURATION_Handle *cfg;
638
639 /**
640  * Our server.
641  */
642 static struct GNUNET_SERVER_Handle *server;
643
644 /**
645  * Transport service.
646  */
647 static struct GNUNET_TRANSPORT_Handle *transport;
648
649 /**
650  * Linked list of our clients.
651  */
652 static struct Client *clients;
653
654 /**
655  * We keep neighbours in a linked list (for now).
656  */
657 static struct Neighbour *neighbours;
658
659 /**
660  * Sum of all preferences among all neighbours.
661  */
662 static unsigned long long preference_sum;
663
664 /**
665  * Total number of neighbours we have.
666  */
667 static unsigned int neighbour_count;
668
669 /**
670  * How much inbound bandwidth are we supposed to be using?
671  */
672 static unsigned long long bandwidth_target_in;
673
674 /**
675  * How much outbound bandwidth are we supposed to be using?
676  */
677 static unsigned long long bandwidth_target_out;
678
679
680
681 /**
682  * A preference value for a neighbour was update.  Update
683  * the preference sum accordingly.
684  *
685  * @param inc how much was a preference value increased?
686  */
687 static void
688 update_preference_sum (unsigned long long inc)
689 {
690   struct Neighbour *n;
691   unsigned long long os;
692
693   os = preference_sum;
694   preference_sum += inc;
695   if (preference_sum >= os)
696     return; /* done! */
697   /* overflow! compensate by cutting all values in half! */
698   preference_sum = 0;
699   n = neighbours;
700   while (n != NULL)
701     {
702       n->current_preference /= 2;
703       preference_sum += n->current_preference;
704       n = n->next;
705     }    
706 }
707
708
709 /**
710  * Recalculate the number of bytes we expect to
711  * receive or transmit in a given window.
712  *
713  * @param force force an update now (even if not much time has passed)
714  * @param window pointer to the byte counter (updated)
715  * @param ts pointer to the timestamp (updated)
716  * @param bpm number of bytes per minute that should
717  *        be added to the window.
718  */
719 static void
720 update_window (int force,
721                long long *window,
722                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
723 {
724   struct GNUNET_TIME_Relative since;
725
726   since = GNUNET_TIME_absolute_get_duration (*ts);
727   if ( (force == GNUNET_NO) &&
728        (since.value < 60 * 1000) )
729     return;                     /* not even a minute has passed */
730   *ts = GNUNET_TIME_absolute_get ();
731   *window += (bpm * since.value) / 60 / 1000;
732   if (*window > MAX_WINDOW_TIME * bpm)
733     *window = MAX_WINDOW_TIME * bpm;
734 }
735
736
737 /**
738  * Find the entry for the given neighbour.
739  *
740  * @param peer identity of the neighbour
741  * @return NULL if we are not connected, otherwise the
742  *         neighbour's entry.
743  */
744 static struct Neighbour *
745 find_neighbour (const struct GNUNET_PeerIdentity *peer)
746 {
747   struct Neighbour *ret;
748
749   ret = neighbours;
750   while ((ret != NULL) &&
751          (0 != memcmp (&ret->peer,
752                        peer, sizeof (struct GNUNET_PeerIdentity))))
753     ret = ret->next;
754   return ret;
755 }
756
757
758 /**
759  * Find the entry for the given client.
760  *
761  * @param client handle for the client
762  * @return NULL if we are not connected, otherwise the
763  *         client's struct.
764  */
765 static struct Client *
766 find_client (const struct GNUNET_SERVER_Client *client)
767 {
768   struct Client *ret;
769
770   ret = clients;
771   while ((ret != NULL) && (client != ret->client_handle))
772     ret = ret->next;
773   return ret;
774 }
775
776
777 /**
778  * If necessary, initiate a request with the server to
779  * transmit messages from the queue of the given client.
780  * @param client who to transfer messages to
781  */
782 static void request_transmit (struct Client *client);
783
784
785 /**
786  * Client is ready to receive data, provide it.
787  *
788  * @param cls closure
789  * @param size number of bytes available in buf
790  * @param buf where the callee should write the message
791  * @return number of bytes written to buf
792  */
793 static size_t
794 do_client_transmit (void *cls, size_t size, void *buf)
795 {
796   struct Client *client = cls;
797   struct Event *e;
798   char *tgt;
799   size_t ret;
800
801   client->th = NULL;
802 #if DEBUG_CORE_CLIENT
803   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804               "Client ready to receive %u bytes.\n", size);
805 #endif
806   if (buf == NULL)
807     {
808 #if DEBUG_CORE
809       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
810                   "Failed to transmit data to client (disconnect)?\n");
811 #endif
812       return 0;                 /* we'll surely get a disconnect soon... */
813     }
814   tgt = buf;
815   ret = 0;
816   while ((NULL != (e = client->event_head)) && (e->size <= size))
817     {
818       memcpy (&tgt[ret], &e[1], e->size);
819       size -= e->size;
820       ret += e->size;
821       client->event_head = e->next;
822       GNUNET_free (e);
823     }
824   GNUNET_assert (ret > 0);
825   if (client->event_head == NULL)
826     client->event_tail = NULL;
827 #if DEBUG_CORE_CLIENT
828   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829               "Transmitting %u bytes to client\n", ret);
830 #endif
831   request_transmit (client);
832   return ret;
833 }
834
835
836 /**
837  * If necessary, initiate a request with the server to
838  * transmit messages from the queue of the given client.
839  * @param client who to transfer messages to
840  */
841 static void
842 request_transmit (struct Client *client)
843 {
844
845   if (NULL != client->th)
846     return;                     /* already pending */
847   if (NULL == client->event_head)
848     return;                     /* no more events pending */
849 #if DEBUG_CORE_CLIENT
850   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
851               "Asking server to transmit %u bytes to client\n",
852               client->event_head->size);
853 #endif
854   client->th
855     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
856                                            client->event_head->size,
857                                            GNUNET_TIME_UNIT_FOREVER_REL,
858                                            &do_client_transmit, client);
859 }
860
861
862 /**
863  * Send a message to one of our clients.
864  *
865  * @param client target for the message
866  * @param msg message to transmit
867  * @param can_drop could this message be dropped if the
868  *        client's queue is getting too large?
869  */
870 static void
871 send_to_client (struct Client *client,
872                 const struct GNUNET_MessageHeader *msg, int can_drop)
873 {
874   struct Event *e;
875   unsigned int queue_size;
876   uint16_t msize;
877
878 #if DEBUG_CORE_CLIENT
879   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880               "Preparing to send message of type %u to client.\n",
881               ntohs (msg->type));
882 #endif
883   queue_size = 0;
884   e = client->event_head;
885   while (e != NULL)
886     {
887       queue_size++;
888       e = e->next;
889     }
890   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
891        (can_drop == GNUNET_YES) )
892     {
893 #if DEBUG_CORE
894       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
895                   "Too many messages in queue for the client, dropping the new message.\n");
896 #endif
897       return;
898     }
899
900   msize = ntohs (msg->size);
901   e = GNUNET_malloc (sizeof (struct Event) + msize);
902   /* append */
903   if (client->event_tail != NULL)
904     client->event_tail->next = e;
905   else
906     client->event_head = e;
907   client->event_tail = e;
908   e->can_drop = can_drop;
909   e->size = msize;
910   memcpy (&e[1], msg, msize);
911   request_transmit (client);
912 }
913
914
915 /**
916  * Send a message to all of our current clients.
917  */
918 static void
919 send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
920                      int can_drop,
921                      int options)
922 {
923   struct Client *c;
924
925   c = clients;
926   while (c != NULL)
927     {
928       if (0 != (c->options & options))
929         send_to_client (c, msg, can_drop);
930       c = c->next;
931     }
932 }
933
934
935 /**
936  * Handle CORE_INIT request.
937  */
938 static void
939 handle_client_init (void *cls,
940                     struct GNUNET_SERVER_Client *client,
941                     const struct GNUNET_MessageHeader *message)
942 {
943   const struct InitMessage *im;
944   struct InitReplyMessage irm;
945   struct Client *c;
946   uint16_t msize;
947   const uint16_t *types;
948   struct Neighbour *n;
949   struct ConnectNotifyMessage cnm;
950
951 #if DEBUG_CORE_CLIENT
952   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
953               "Client connecting to core service with `%s' message\n",
954               "INIT");
955 #endif
956   /* check that we don't have an entry already */
957   c = clients;
958   while (c != NULL)
959     {
960       if (client == c->client_handle)
961         {
962           GNUNET_break (0);
963           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
964           return;
965         }
966       c = c->next;
967     }
968   msize = ntohs (message->size);
969   if (msize < sizeof (struct InitMessage))
970     {
971       GNUNET_break (0);
972       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
973       return;
974     }
975   im = (const struct InitMessage *) message;
976   types = (const uint16_t *) &im[1];
977   msize -= sizeof (struct InitMessage);
978   c = GNUNET_malloc (sizeof (struct Client) + msize);
979   c->client_handle = client;
980   c->next = clients;
981   clients = c;
982   memcpy (&c[1], types, msize);
983   c->types = (uint16_t *) & c[1];
984   c->options = ntohl (im->options);
985   c->tcnt = msize / sizeof (uint16_t);
986   /* send init reply message */
987   irm.header.size = htons (sizeof (struct InitReplyMessage));
988   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
989   irm.reserved = htonl (0);
990   memcpy (&irm.publicKey,
991           &my_public_key,
992           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
993 #if DEBUG_CORE_CLIENT
994   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995               "Sending `%s' message to client.\n", "INIT_REPLY");
996 #endif
997   send_to_client (c, &irm.header, GNUNET_NO);
998   /* notify new client about existing neighbours */
999   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
1000   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
1001   n = neighbours;
1002   while (n != NULL)
1003     {
1004 #if DEBUG_CORE_CLIENT
1005       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
1007 #endif
1008       cnm.reserved = htonl (0);
1009       cnm.peer = n->peer;
1010       send_to_client (c, &cnm.header, GNUNET_NO);
1011       n = n->next;
1012     }
1013   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1014 }
1015
1016
1017 /**
1018  * A client disconnected, clean up.
1019  *
1020  * @param cls closure
1021  * @param client identification of the client
1022  */
1023 static void
1024 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1025 {
1026   struct Client *pos;
1027   struct Client *prev;
1028   struct Event *e;
1029
1030 #if DEBUG_CORE_CLIENT
1031   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1032               "Client has disconnected from core service.\n");
1033 #endif
1034   prev = NULL;
1035   pos = clients;
1036   while (pos != NULL)
1037     {
1038       if (client == pos->client_handle)
1039         {
1040           if (prev == NULL)
1041             clients = pos->next;
1042           else
1043             prev->next = pos->next;
1044           if (pos->th != NULL)
1045             GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1046           while (NULL != (e = pos->event_head))
1047             {
1048               pos->event_head = e->next;
1049               GNUNET_free (e);
1050             }
1051           GNUNET_free (pos);
1052           return;
1053         }
1054       prev = pos;
1055       pos = pos->next;
1056     }
1057   /* client never sent INIT */
1058 }
1059
1060
1061 /**
1062  * Handle REQUEST_INFO request.
1063  */
1064 static void
1065 handle_client_request_info (void *cls,
1066                                  struct GNUNET_SERVER_Client *client,
1067                                  const struct GNUNET_MessageHeader *message)
1068 {
1069   const struct RequestInfoMessage *rcm;
1070   struct Neighbour *n;
1071   struct ConfigurationInfoMessage cim;
1072   struct Client *c;
1073   int reserv;
1074   unsigned long long old_preference;
1075
1076 #if DEBUG_CORE_CLIENT
1077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1078               "Core service receives `%s' request.\n", "REQUEST_INFO");
1079 #endif
1080   rcm = (const struct RequestInfoMessage *) message;
1081   n = find_neighbour (&rcm->peer);
1082   memset (&cim, 0, sizeof (cim));
1083   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1084     {
1085       update_window (GNUNET_YES,
1086                      &n->available_send_window,
1087                      &n->last_asw_update,
1088                      n->bpm_out);
1089       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1090       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1091                                n->bpm_out_external_limit);
1092       reserv = ntohl (rcm->reserve_inbound);
1093       if (reserv < 0)
1094         {
1095           n->available_recv_window += reserv;
1096         }
1097       else if (reserv > 0)
1098         {
1099           update_window (GNUNET_NO,
1100                          &n->available_recv_window,
1101                          &n->last_arw_update, n->bpm_in);
1102           if (n->available_recv_window < reserv)
1103             reserv = n->available_recv_window;
1104           n->available_recv_window -= reserv;
1105         }
1106       old_preference = n->current_preference;
1107       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1108       if (old_preference > n->current_preference) 
1109         {
1110           /* overflow; cap at maximum value */
1111           n->current_preference = (unsigned long long) -1;
1112         }
1113       update_preference_sum (n->current_preference - old_preference);
1114       cim.reserved_amount = htonl (reserv);
1115       cim.bpm_in = htonl (n->bpm_in);
1116       cim.bpm_out = htonl (n->bpm_out);
1117       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1118       cim.preference = n->current_preference;
1119     }
1120   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1121   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1122   cim.peer = rcm->peer;
1123   c = find_client (client);
1124   if (c == NULL)
1125     {
1126       GNUNET_break (0);
1127       return;
1128     }
1129 #if DEBUG_CORE_CLIENT
1130   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1131               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1132 #endif
1133   send_to_client (c, &cim.header, GNUNET_NO);
1134 }
1135
1136
1137 /**
1138  * Check if we have encrypted messages for the specified neighbour
1139  * pending, and if so, check with the transport about sending them
1140  * out.
1141  *
1142  * @param n neighbour to check.
1143  */
1144 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1145
1146
1147 /**
1148  * Function called when the transport service is ready to
1149  * receive an encrypted message for the respective peer
1150  *
1151  * @param cls neighbour to use message from
1152  * @param size number of bytes we can transmit
1153  * @param buf where to copy the message
1154  * @return number of bytes transmitted
1155  */
1156 static size_t
1157 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1158 {
1159   struct Neighbour *n = cls;
1160   struct MessageEntry *m;
1161   size_t ret;
1162   char *cbuf;
1163
1164   n->th = NULL;
1165   GNUNET_assert (NULL != (m = n->encrypted_head));
1166   n->encrypted_head = m->next;
1167   if (m->next == NULL)
1168     n->encrypted_tail = NULL;
1169   ret = 0;
1170   cbuf = buf;
1171   if (buf != NULL)
1172     {
1173       GNUNET_assert (size >= m->size);
1174       memcpy (cbuf, &m[1], m->size);
1175       ret = m->size;
1176       n->available_send_window -= m->size;
1177       process_encrypted_neighbour_queue (n);
1178 #if DEBUG_CORE
1179       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1180                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1181                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1182                   ret, GNUNET_i2s (&n->peer));
1183 #endif
1184     }
1185   else
1186     {
1187       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1188                   "Transmission for message of type %u and size %u failed\n",
1189                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1190                   m->size);
1191     }
1192   GNUNET_free (m);
1193   return ret;
1194 }
1195
1196
1197 /**
1198  * Check if we have plaintext messages for the specified neighbour
1199  * pending, and if so, consider batching and encrypting them (and
1200  * then trigger processing of the encrypted queue if needed).
1201  *
1202  * @param n neighbour to check.
1203  */
1204 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1205
1206
1207 /**
1208  * Check if we have encrypted messages for the specified neighbour
1209  * pending, and if so, check with the transport about sending them
1210  * out.
1211  *
1212  * @param n neighbour to check.
1213  */
1214 static void
1215 process_encrypted_neighbour_queue (struct Neighbour *n)
1216 {
1217   struct MessageEntry *m;
1218  
1219   if (n->th != NULL)
1220     return;  /* request already pending */
1221   if (n->encrypted_head == NULL)
1222     {
1223       /* encrypted queue empty, try plaintext instead */
1224       process_plaintext_neighbour_queue (n);
1225       return;
1226     }
1227 #if DEBUG_CORE
1228   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1229               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1230               n->encrypted_head->size,
1231               GNUNET_i2s (&n->peer),
1232               GNUNET_TIME_absolute_get_remaining (n->
1233                                                   encrypted_head->deadline).
1234               value);
1235 #endif
1236   n->th =
1237     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1238                                             n->encrypted_head->size,
1239                                             n->encrypted_head->priority,
1240                                             GNUNET_TIME_absolute_get_remaining
1241                                             (n->encrypted_head->deadline),
1242                                             &notify_encrypted_transmit_ready,
1243                                             n);
1244   if (n->th == NULL)
1245     {
1246       /* message request too large (oops) */
1247       GNUNET_break (0);
1248       /* discard encrypted message */
1249       GNUNET_assert (NULL != (m = n->encrypted_head));
1250       n->encrypted_head = m->next;
1251       if (m->next == NULL)
1252         n->encrypted_tail = NULL;
1253       GNUNET_free (m);
1254       process_encrypted_neighbour_queue (n);
1255     }
1256 }
1257
1258
1259 /**
1260  * Decrypt size bytes from in and write the result to out.  Use the
1261  * key for inbound traffic of the given neighbour.  This function does
1262  * NOT do any integrity-checks on the result.
1263  *
1264  * @param n neighbour we are receiving from
1265  * @param iv initialization vector to use
1266  * @param in ciphertext
1267  * @param out plaintext
1268  * @param size size of in/out
1269  * @return GNUNET_OK on success
1270  */
1271 static int
1272 do_decrypt (struct Neighbour *n,
1273             const GNUNET_HashCode * iv,
1274             const void *in, void *out, size_t size)
1275 {
1276   if (size != (uint16_t) size)
1277     {
1278       GNUNET_break (0);
1279       return GNUNET_NO;
1280     }
1281   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1282       (n->status != PEER_STATE_KEY_CONFIRMED))
1283     {
1284       GNUNET_break_op (0);
1285       return GNUNET_SYSERR;
1286     }
1287   if (size !=
1288       GNUNET_CRYPTO_aes_decrypt (in,
1289                                  (uint16_t) size,
1290                                  &n->decrypt_key,
1291                                  (const struct
1292                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1293                                  out))
1294     {
1295       GNUNET_break (0);
1296       return GNUNET_SYSERR;
1297     }
1298 #if DEBUG_CORE
1299   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1300               "Decrypted %u bytes from `%4s' using key %u\n",
1301               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1302 #endif
1303   return GNUNET_OK;
1304 }
1305
1306
1307 /**
1308  * Encrypt size bytes from in and write the result to out.  Use the
1309  * key for outbound traffic of the given neighbour.
1310  *
1311  * @param n neighbour we are sending to
1312  * @param iv initialization vector to use
1313  * @param in ciphertext
1314  * @param out plaintext
1315  * @param size size of in/out
1316  * @return GNUNET_OK on success
1317  */
1318 static int
1319 do_encrypt (struct Neighbour *n,
1320             const GNUNET_HashCode * iv,
1321             const void *in, void *out, size_t size)
1322 {
1323   if (size != (uint16_t) size)
1324     {
1325       GNUNET_break (0);
1326       return GNUNET_NO;
1327     }
1328   GNUNET_assert (size ==
1329                  GNUNET_CRYPTO_aes_encrypt (in,
1330                                             (uint16_t) size,
1331                                             &n->encrypt_key,
1332                                             (const struct
1333                                              GNUNET_CRYPTO_AesInitializationVector
1334                                              *) iv, out));
1335 #if DEBUG_CORE
1336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337               "Encrypted %u bytes for `%4s' using key %u\n", size,
1338               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1339 #endif
1340   return GNUNET_OK;
1341 }
1342
1343
1344 /**
1345  * Select messages for transmission.  This heuristic uses a combination
1346  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1347  * and priority-based discard (in case no feasible schedule exist) and
1348  * speculative optimization (defer any kind of transmission until
1349  * we either create a batch of significant size, 25% of max, or until
1350  * we are close to a deadline).  Furthermore, when scheduling the
1351  * heuristic also packs as many messages into the batch as possible,
1352  * starting with those with the earliest deadline.  Yes, this is fun.
1353  *
1354  * @param n neighbour to select messages from
1355  * @param size number of bytes to select for transmission
1356  * @param retry_time set to the time when we should try again
1357  *        (only valid if this function returns zero)
1358  * @return number of bytes selected, or 0 if we decided to
1359  *         defer scheduling overall; in that case, retry_time is set.
1360  */
1361 static size_t
1362 select_messages (struct Neighbour *n,
1363                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1364 {
1365   struct MessageEntry *pos;
1366   struct MessageEntry *min;
1367   struct MessageEntry *last;
1368   unsigned int min_prio;
1369   struct GNUNET_TIME_Absolute t;
1370   struct GNUNET_TIME_Absolute now;
1371   uint64_t delta;
1372   uint64_t avail;
1373   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1374   size_t off;
1375   int discard_low_prio;
1376
1377   GNUNET_assert (NULL != n->messages);
1378   now = GNUNET_TIME_absolute_get ();
1379   /* last entry in linked list of messages processed */
1380   last = NULL;
1381   /* should we remove the entry with the lowest
1382      priority from consideration for scheduling at the
1383      end of the loop? */
1384   discard_low_prio = GNUNET_YES;
1385   while (GNUNET_YES == discard_low_prio)
1386     {
1387       min = NULL;
1388       min_prio = -1;
1389       discard_low_prio = GNUNET_NO;
1390       /* calculate number of bytes available for transmission at time "t" */
1391       update_window (GNUNET_NO,
1392                      &n->available_send_window,
1393                      &n->last_asw_update,
1394                      n->bpm_out);
1395       avail = n->available_send_window;
1396       t = n->last_asw_update;
1397       /* how many bytes have we (hypothetically) scheduled so far */
1398       off = 0;
1399       /* maximum time we can wait before transmitting anything
1400          and still make all of our deadlines */
1401       slack = -1;
1402
1403       pos = n->messages;
1404       /* note that we use "*2" here because we want to look
1405          a bit further into the future; much more makes no
1406          sense since new message might be scheduled in the
1407          meantime... */
1408       while ((pos != NULL) && (off < size * 2))
1409         {
1410           if (pos->do_transmit == GNUNET_YES)
1411             {
1412               /* already removed from consideration */
1413               pos = pos->next;
1414               continue;
1415             }
1416           if (discard_low_prio == GNUNET_NO)
1417             {
1418               delta = pos->deadline.value;
1419               if (delta < t.value)
1420                 delta = 0;
1421               else
1422                 delta = t.value - delta;
1423               avail += delta * n->bpm_out / 1000 / 60;
1424               if (avail < pos->size)
1425                 {
1426                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1427                 }
1428               else
1429                 {
1430                   avail -= pos->size;
1431                   /* update slack, considering both its absolute deadline
1432                      and relative deadlines caused by other messages
1433                      with their respective load */
1434                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1435                   if (pos->deadline.value < now.value)
1436                     slack = 0;
1437                   else
1438                     slack =
1439                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1440                 }
1441             }
1442           off += pos->size;
1443           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1444           if (pos->priority <= min_prio)
1445             {
1446               /* update min for discard */
1447               min_prio = pos->priority;
1448               min = pos;
1449             }
1450           pos = pos->next;
1451         }
1452       if (discard_low_prio)
1453         {
1454           GNUNET_assert (min != NULL);
1455           /* remove lowest-priority entry from consideration */
1456           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1457         }
1458       last = pos;
1459     }
1460   /* guard against sending "tiny" messages with large headers without
1461      urgent deadlines */
1462   if ( (slack > 1000) && (size > 4 * off) )
1463     {
1464       /* less than 25% of message would be filled with
1465          deadlines still being met if we delay by one
1466          second or more; so just wait for more data */
1467       retry_time->value = slack / 2;
1468       /* reset do_transmit values for next time */
1469       while (pos != last)
1470         {
1471           pos->do_transmit = GNUNET_NO;
1472           pos = pos->next;
1473         }
1474       return 0;
1475     }
1476   /* select marked messages (up to size) for transmission */
1477   off = 0;
1478   pos = n->messages;
1479   while (pos != last)
1480     {
1481       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1482         {
1483           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1484           off += pos->size;
1485           size -= pos->size;
1486         }
1487       else
1488         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1489       pos = pos->next;
1490     }
1491 #if DEBUG_CORE
1492   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1494               off, GNUNET_i2s (&n->peer));
1495 #endif
1496   return off;
1497 }
1498
1499
1500 /**
1501  * Batch multiple messages into a larger buffer.
1502  *
1503  * @param n neighbour to take messages from
1504  * @param buf target buffer
1505  * @param size size of buf
1506  * @param deadline set to transmission deadline for the result
1507  * @param retry_time set to the time when we should try again
1508  *        (only valid if this function returns zero)
1509  * @param priority set to the priority of the batch
1510  * @return number of bytes written to buf (can be zero)
1511  */
1512 static size_t
1513 batch_message (struct Neighbour *n,
1514                char *buf,
1515                size_t size,
1516                struct GNUNET_TIME_Absolute *deadline,
1517                struct GNUNET_TIME_Relative *retry_time,
1518                unsigned int *priority)
1519 {
1520   struct MessageEntry *pos;
1521   struct MessageEntry *prev;
1522   struct MessageEntry *next;
1523   size_t ret;
1524
1525   ret = 0;
1526   *priority = 0;
1527   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1528   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1529   if (0 == select_messages (n, size, retry_time))
1530     {
1531       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1532                   "No messages selected, will try again in %llu ms\n",
1533                   retry_time->value);
1534       return 0;
1535     }
1536   pos = n->messages;
1537   prev = NULL;
1538   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1539     {
1540       next = pos->next;
1541       if (GNUNET_YES == pos->do_transmit)
1542         {
1543           GNUNET_assert (pos->size <= size);
1544           memcpy (&buf[ret], &pos[1], pos->size);
1545           ret += pos->size;
1546           size -= pos->size;
1547           *priority += pos->priority;
1548 #if DEBUG_CORE
1549           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1550                       "Adding plaintext message with deadline %llu ms to batch\n",
1551                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1552 #endif
1553           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1554           GNUNET_free (pos);
1555           if (prev == NULL)
1556             n->messages = next;
1557           else
1558             prev->next = next;
1559         }
1560       else
1561         {
1562           prev = pos;
1563         }
1564       pos = next;
1565     }
1566 #if DEBUG_CORE
1567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1568               "Deadline for message batch is %llu ms\n",
1569               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1570 #endif
1571   return ret;
1572 }
1573
1574
1575 /**
1576  * Remove messages with deadlines that have long expired from
1577  * the queue.
1578  *
1579  * @param n neighbour to inspect
1580  */
1581 static void
1582 discard_expired_messages (struct Neighbour *n)
1583 {
1584   struct MessageEntry *prev;
1585   struct MessageEntry *next;
1586   struct MessageEntry *pos;
1587   struct GNUNET_TIME_Absolute now;
1588   struct GNUNET_TIME_Relative delta;
1589
1590   now = GNUNET_TIME_absolute_get ();
1591   prev = NULL;
1592   pos = n->messages;
1593   while (pos != NULL) 
1594     {
1595       next = pos->next;
1596       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1597       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1598         {
1599 #if DEBUG_CORE
1600           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1601                       "Message is %llu ms past due, discarding.\n",
1602                       delta.value);
1603 #endif
1604           if (prev == NULL)
1605             n->messages = next;
1606           else
1607             prev->next = next;
1608           GNUNET_free (pos);
1609         }
1610       else
1611         prev = pos;
1612       pos = next;
1613     }
1614 }
1615
1616
1617 /**
1618  * Signature of the main function of a task.
1619  *
1620  * @param cls closure
1621  * @param tc context information (why was this task triggered now)
1622  */
1623 static void
1624 retry_plaintext_processing (void *cls,
1625                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1626 {
1627   struct Neighbour *n = cls;
1628
1629   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1630   process_plaintext_neighbour_queue (n);
1631 }
1632
1633
1634 /**
1635  * Send our key (and encrypted PING) to the other peer.
1636  *
1637  * @param n the other peer
1638  */
1639 static void send_key (struct Neighbour *n);
1640
1641
1642 /**
1643  * Check if we have plaintext messages for the specified neighbour
1644  * pending, and if so, consider batching and encrypting them (and
1645  * then trigger processing of the encrypted queue if needed).
1646  *
1647  * @param n neighbour to check.
1648  */
1649 static void
1650 process_plaintext_neighbour_queue (struct Neighbour *n)
1651 {
1652   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1653   size_t used;
1654   size_t esize;
1655   struct EncryptedMessage *em;  /* encrypted message */
1656   struct EncryptedMessage *ph;  /* plaintext header */
1657   struct MessageEntry *me;
1658   unsigned int priority;
1659   struct GNUNET_TIME_Absolute deadline;
1660   struct GNUNET_TIME_Relative retry_time;
1661
1662   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1663     {
1664       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1665       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1666     }
1667   switch (n->status)
1668     {
1669     case PEER_STATE_DOWN:
1670       send_key (n);
1671 #if DEBUG_CORE
1672       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1674                   GNUNET_i2s(&n->peer));
1675 #endif
1676       return;
1677     case PEER_STATE_KEY_SENT:
1678       GNUNET_assert (n->retry_set_key_task !=
1679                      GNUNET_SCHEDULER_NO_TASK);
1680 #if DEBUG_CORE
1681       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1682                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1683                   GNUNET_i2s(&n->peer));
1684 #endif
1685       return;
1686     case PEER_STATE_KEY_RECEIVED:
1687       GNUNET_assert (n->retry_set_key_task !=
1688                      GNUNET_SCHEDULER_NO_TASK);
1689 #if DEBUG_CORE
1690       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1691                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1692                   GNUNET_i2s(&n->peer));
1693 #endif
1694       return;
1695     case PEER_STATE_KEY_CONFIRMED:
1696       /* ready to continue */
1697       break;
1698     }
1699   discard_expired_messages (n);
1700   if (n->messages == NULL)
1701     {
1702 #if DEBUG_CORE
1703       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1704                   "Plaintext message queue for `%4s' is empty.\n",
1705                   GNUNET_i2s(&n->peer));
1706 #endif
1707       return;                   /* no pending messages */
1708     }
1709   if (n->encrypted_head != NULL)
1710     {
1711 #if DEBUG_CORE
1712       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1713                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1714                   GNUNET_i2s(&n->peer));
1715 #endif
1716       return;                   /* wait for messages already encrypted to be
1717                                    processed first! */
1718     }
1719   ph = (struct EncryptedMessage *) pbuf;
1720   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1721   priority = 0;
1722   used = sizeof (struct EncryptedMessage);
1723   used += batch_message (n,
1724                          &pbuf[used],
1725                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1726                          &deadline, &retry_time, &priority);
1727   if (used == sizeof (struct EncryptedMessage))
1728     {
1729 #if DEBUG_CORE
1730       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1731                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1732                   GNUNET_i2s(&n->peer));
1733 #endif
1734       /* no messages selected for sending, try again later... */
1735       n->retry_plaintext_task =
1736         GNUNET_SCHEDULER_add_delayed (sched,
1737                                       retry_time,
1738                                       &retry_plaintext_processing, n);
1739       return;
1740     }
1741   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1742   ph->inbound_bpm_limit = htonl (n->bpm_in);
1743   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1744
1745   /* setup encryption message header */
1746   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1747   me->deadline = deadline;
1748   me->priority = priority;
1749   me->size = used;
1750   em = (struct EncryptedMessage *) &me[1];
1751   em->header.size = htons (used);
1752   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1753   em->reserved = htonl (0);
1754   esize = used - ENCRYPTED_HEADER_SIZE;
1755   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1756   /* encrypt */
1757 #if DEBUG_CORE
1758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1759               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1760               esize,
1761               GNUNET_i2s(&n->peer),
1762               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1763 #endif
1764   GNUNET_assert (GNUNET_OK ==
1765                  do_encrypt (n,
1766                              &em->plaintext_hash,
1767                              &ph->sequence_number,
1768                              &em->sequence_number, esize));
1769   /* append to transmission list */
1770   if (n->encrypted_tail == NULL)
1771     n->encrypted_head = me;
1772   else
1773     n->encrypted_tail->next = me;
1774   n->encrypted_tail = me;
1775   process_encrypted_neighbour_queue (n);
1776 }
1777
1778
1779 /**
1780  * Handle CORE_SEND request.
1781  *
1782  * @param cls unused
1783  * @param client the client issuing the request
1784  * @param message the "struct SendMessage"
1785  */
1786 static void
1787 handle_client_send (void *cls,
1788                     struct GNUNET_SERVER_Client *client,
1789                     const struct GNUNET_MessageHeader *message);
1790
1791
1792 /**
1793  * Function called to notify us that we either succeeded
1794  * or failed to connect (at the transport level) to another
1795  * peer.  We should either free the message we were asked
1796  * to transmit or re-try adding it to the queue.
1797  *
1798  * @param cls closure
1799  * @param size number of bytes available in buf
1800  * @param buf where the callee should write the message
1801  * @return number of bytes written to buf
1802  */
1803 static size_t
1804 send_connect_continuation (void *cls, size_t size, void *buf)
1805 {
1806   struct SendMessage *sm = cls;
1807
1808   if (buf == NULL)
1809     {
1810 #if DEBUG_CORE
1811       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1812                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1813                   GNUNET_i2s (&sm->peer));
1814 #endif
1815       GNUNET_free (sm);
1816       return 0;
1817     }
1818 #if DEBUG_CORE
1819   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1820               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1821               GNUNET_i2s (&sm->peer));
1822 #endif
1823   handle_client_send (NULL, NULL, &sm->header);
1824   GNUNET_free (sm);
1825   return 0;
1826 }
1827
1828
1829 /**
1830  * Handle CORE_SEND request.
1831  *
1832  * @param cls unused
1833  * @param client the client issuing the request
1834  * @param message the "struct SendMessage"
1835  */
1836 static void
1837 handle_client_send (void *cls,
1838                     struct GNUNET_SERVER_Client *client,
1839                     const struct GNUNET_MessageHeader *message)
1840 {
1841   const struct SendMessage *sm;
1842   struct SendMessage *smc;
1843   const struct GNUNET_MessageHeader *mh;
1844   struct Neighbour *n;
1845   struct MessageEntry *prev;
1846   struct MessageEntry *pos;
1847   struct MessageEntry *e; 
1848   struct MessageEntry *min_prio_entry;
1849   struct MessageEntry *min_prio_prev;
1850   unsigned int min_prio;
1851   unsigned int queue_size;
1852   uint16_t msize;
1853
1854   msize = ntohs (message->size);
1855   if (msize <
1856       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1857     {
1858       GNUNET_break (0);
1859       if (client != NULL)
1860         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1861       return;
1862     }
1863   sm = (const struct SendMessage *) message;
1864   msize -= sizeof (struct SendMessage);
1865   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1866   if (msize != ntohs (mh->size))
1867     {
1868       GNUNET_break (0);
1869       if (client != NULL)
1870         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1871       return;
1872     }
1873   n = find_neighbour (&sm->peer);
1874   if (n == NULL)
1875     {
1876 #if DEBUG_CORE
1877       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1878                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1879                   "SEND",
1880                   GNUNET_i2s (&sm->peer),
1881                   GNUNET_TIME_absolute_get_remaining
1882                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1883 #endif
1884       msize += sizeof (struct SendMessage);
1885       /* ask transport to connect to the peer */
1886       smc = GNUNET_malloc (msize);
1887       memcpy (smc, sm, msize);
1888       if (NULL ==
1889           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1890                                                   &sm->peer,
1891                                                   0, 0,
1892                                                   GNUNET_TIME_absolute_get_remaining
1893                                                   (GNUNET_TIME_absolute_ntoh
1894                                                    (sm->deadline)),
1895                                                   &send_connect_continuation,
1896                                                   smc))
1897         {
1898           /* transport has already a request pending for this peer! */
1899 #if DEBUG_CORE
1900           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1901                       "Dropped second message destined for `%4s' since connection is still down.\n",
1902                       GNUNET_i2s(&sm->peer));
1903 #endif
1904           GNUNET_free (smc);
1905         }
1906       if (client != NULL)
1907         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1908       return;
1909     }
1910 #if DEBUG_CORE
1911   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1912               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1913               "SEND",
1914               msize, 
1915               GNUNET_i2s (&sm->peer));
1916 #endif
1917   /* bound queue size */
1918   discard_expired_messages (n);
1919   min_prio = (unsigned int) -1;
1920   min_prio_entry = NULL;
1921   min_prio_prev = NULL;
1922   queue_size = 0;
1923   prev = NULL;
1924   pos = n->messages;
1925   while (pos != NULL) 
1926     {
1927       if (pos->priority < min_prio)
1928         {
1929           min_prio_entry = pos;
1930           min_prio_prev = prev;
1931           min_prio = pos->priority;
1932         }
1933       queue_size++;
1934       prev = pos;
1935       pos = pos->next;
1936     }
1937   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1938     {
1939       /* queue full */
1940       if (ntohl(sm->priority) <= min_prio)
1941         {
1942           /* discard new entry */
1943 #if DEBUG_CORE
1944           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1945                       "Queue full, discarding new request\n");
1946 #endif
1947           if (client != NULL)
1948             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1949           return;
1950         }
1951       /* discard "min_prio_entry" */
1952 #if DEBUG_CORE
1953       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1954                   "Queue full, discarding existing older request\n");
1955 #endif
1956       if (min_prio_prev == NULL)
1957         n->messages = min_prio_entry->next;
1958       else
1959         min_prio_prev->next = min_prio_entry->next;      
1960       GNUNET_free (min_prio_entry);     
1961     }
1962
1963 #if DEBUG_CORE
1964   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1965               "Adding transmission request for `%4s' to queue\n",
1966               GNUNET_i2s (&sm->peer));
1967 #endif  
1968   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1969   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1970   e->priority = ntohl (sm->priority);
1971   e->size = msize;
1972   memcpy (&e[1], mh, msize);
1973
1974   /* insert, keep list sorted by deadline */
1975   prev = NULL;
1976   pos = n->messages;
1977   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1978     {
1979       prev = pos;
1980       pos = pos->next;
1981     }
1982   if (prev == NULL)
1983     n->messages = e;
1984   else
1985     prev->next = e;
1986   e->next = pos;
1987
1988   /* consider scheduling now */
1989   process_plaintext_neighbour_queue (n);
1990   if (client != NULL)
1991     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1992 }
1993
1994
1995 /**
1996  * Handle CORE_REQUEST_CONNECT request.
1997  *
1998  * @param cls unused
1999  * @param client the client issuing the request
2000  * @param message the "struct ConnectMessage"
2001  */
2002 static void
2003 handle_client_request_connect (void *cls,
2004                                struct GNUNET_SERVER_Client *client,
2005                                const struct GNUNET_MessageHeader *message)
2006 {
2007   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2008   struct Neighbour *n;
2009
2010   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2011   n = find_neighbour (&cm->peer);
2012   if (n != NULL)
2013     return; /* already connected, or at least trying */
2014 #if DEBUG_CORE
2015   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016               "Core received `%s' request for `%4s', will try to establish connection\n",
2017               "REQUEST_CONNECT",
2018               GNUNET_i2s (&cm->peer));
2019 #endif
2020   /* ask transport to connect to the peer */
2021   /* FIXME: timeout zero OK? need for cancellation? */
2022   GNUNET_TRANSPORT_notify_transmit_ready (transport,
2023                                           &cm->peer,
2024                                           0, 0,
2025                                           GNUNET_TIME_UNIT_ZERO,
2026                                           NULL,
2027                                           NULL);
2028 }
2029
2030
2031 /**
2032  * Handle CORE_REQUEST_DISCONNECT request.
2033  *
2034  * @param cls unused
2035  * @param client the client issuing the request
2036  * @param message the "struct ConnectMessage"
2037  */
2038 static void
2039 handle_client_request_disconnect (void *cls,
2040                                   struct GNUNET_SERVER_Client *client,
2041                                   const struct GNUNET_MessageHeader *message)
2042 {
2043   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
2044   struct Neighbour *n;
2045
2046   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2047   n = find_neighbour (&cm->peer);
2048   if (n == NULL)
2049     return; /* done */
2050   /* FIXME: implement disconnect! */
2051 }
2052
2053
2054
2055 /**
2056  * List of handlers for the messages understood by this
2057  * service.
2058  */
2059 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2060   {&handle_client_init, NULL,
2061    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
2062   {&handle_client_request_info, NULL,
2063    GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
2064    sizeof (struct RequestInfoMessage)},
2065   {&handle_client_send, NULL,
2066    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
2067   {&handle_client_request_connect, NULL,
2068    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
2069    sizeof (struct ConnectMessage)},
2070   {&handle_client_request_disconnect, NULL,
2071    GNUNET_MESSAGE_TYPE_CORE_REQUEST_DISCONNECT,
2072    sizeof (struct ConnectMessage)},
2073   {NULL, NULL, 0, 0}
2074 };
2075
2076
2077 /**
2078  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2079  * the neighbour's struct and retry send_key.  Or, if we did not get a
2080  * HELLO, just do nothing.
2081  *
2082  * @param cls NULL
2083  * @param peer the peer for which this is the HELLO
2084  * @param hello HELLO message of that peer
2085  * @param trust amount of trust we currently have in that peer
2086  */
2087 static void
2088 process_hello_retry_send_key (void *cls,
2089                               const struct GNUNET_PeerIdentity *peer,
2090                               const struct GNUNET_HELLO_Message *hello,
2091                               uint32_t trust)
2092 {
2093   struct Neighbour *n = cls;
2094
2095   if (peer == NULL)
2096     {
2097       n->pitr = NULL;
2098       return;
2099     }
2100   if (n->public_key != NULL)
2101     return;
2102 #if DEBUG_CORE
2103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2104               "Received new `%s' message for `%4s', initiating key exchange.\n",
2105               "HELLO",
2106               GNUNET_i2s (peer));
2107 #endif
2108   n->public_key =
2109     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2110   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2111     {
2112       GNUNET_free (n->public_key);
2113       n->public_key = NULL;
2114       return;
2115     }
2116   send_key (n);
2117 }
2118
2119
2120 /**
2121  * Task that will retry "send_key" if our previous attempt failed
2122  * to yield a PONG.
2123  */
2124 static void
2125 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2126 {
2127   struct Neighbour *n = cls;
2128
2129   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2130   n->set_key_retry_frequency =
2131     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2132   send_key (n);
2133 }
2134
2135
2136 /**
2137  * Send our key (and encrypted PING) to the other peer.
2138  *
2139  * @param n the other peer
2140  */
2141 static void
2142 send_key (struct Neighbour *n)
2143 {
2144   struct SetKeyMessage *sm;
2145   struct MessageEntry *me;
2146   struct PingMessage pp;
2147   struct PingMessage *pm;
2148
2149   if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
2150        (n->pitr != NULL) )
2151     return; /* already in progress */
2152 #if DEBUG_CORE
2153   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2154               "Asked to perform key exchange with `%4s'.\n",
2155               GNUNET_i2s (&n->peer));
2156 #endif
2157   if (n->public_key == NULL)
2158     {
2159       /* lookup n's public key, then try again */
2160 #if DEBUG_CORE
2161       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2162                   "Lacking public key for `%4s', trying to obtain one.\n",
2163                   GNUNET_i2s (&n->peer));
2164 #endif
2165       GNUNET_assert (n->pitr == NULL);
2166       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2167                                          sched,
2168                                          &n->peer,
2169                                          0,
2170                                          GNUNET_TIME_UNIT_MINUTES,
2171                                          &process_hello_retry_send_key, n);
2172       return;
2173     }
2174   /* first, set key message */
2175   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2176                       sizeof (struct SetKeyMessage));
2177   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2178   me->priority = SET_KEY_PRIORITY;
2179   me->size = sizeof (struct SetKeyMessage);
2180   if (n->encrypted_head == NULL)
2181     n->encrypted_head = me;
2182   else
2183     n->encrypted_tail->next = me;
2184   n->encrypted_tail = me;
2185   sm = (struct SetKeyMessage *) &me[1];
2186   sm->header.size = htons (sizeof (struct SetKeyMessage));
2187   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2188   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2189                                         PEER_STATE_KEY_SENT : n->status));
2190   sm->purpose.size =
2191     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2192            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2193            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2194            sizeof (struct GNUNET_PeerIdentity));
2195   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2196   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2197   sm->target = n->peer;
2198   GNUNET_assert (GNUNET_OK ==
2199                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2200                                             sizeof (struct
2201                                                     GNUNET_CRYPTO_AesSessionKey),
2202                                             n->public_key,
2203                                             &sm->encrypted_key));
2204   GNUNET_assert (GNUNET_OK ==
2205                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2206                                          &sm->signature));
2207
2208   /* second, encrypted PING message */
2209   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2210                       sizeof (struct PingMessage));
2211   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2212   me->priority = PING_PRIORITY;
2213   me->size = sizeof (struct PingMessage);
2214   n->encrypted_tail->next = me;
2215   n->encrypted_tail = me;
2216   pm = (struct PingMessage *) &me[1];
2217   pm->header.size = htons (sizeof (struct PingMessage));
2218   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2219   pp.challenge = htonl (n->ping_challenge);
2220   pp.target = n->peer;
2221 #if DEBUG_CORE
2222   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2223               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2224               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2225   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2226               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2227               "PING",
2228               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2229 #endif
2230   do_encrypt (n,
2231               &n->peer.hashPubKey,
2232               &pp.challenge,
2233               &pm->challenge,
2234               sizeof (struct PingMessage) -
2235               sizeof (struct GNUNET_MessageHeader));
2236   /* update status */
2237   switch (n->status)
2238     {
2239     case PEER_STATE_DOWN:
2240       n->status = PEER_STATE_KEY_SENT;
2241       break;
2242     case PEER_STATE_KEY_SENT:
2243       break;
2244     case PEER_STATE_KEY_RECEIVED:
2245       break;
2246     case PEER_STATE_KEY_CONFIRMED:
2247       break;
2248     default:
2249       GNUNET_break (0);
2250       break;
2251     }
2252 #if DEBUG_CORE
2253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254               "Have %llu ms left for `%s' transmission.\n",
2255               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2256               "SET_KEY");
2257 #endif
2258   /* trigger queue processing */
2259   process_encrypted_neighbour_queue (n);
2260   if (n->status != PEER_STATE_KEY_CONFIRMED)
2261     {
2262       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task);
2263       n->retry_set_key_task
2264         = GNUNET_SCHEDULER_add_delayed (sched,
2265                                         n->set_key_retry_frequency,
2266                                         &set_key_retry_task, n);
2267     }
2268 }
2269
2270
2271 /**
2272  * We received a SET_KEY message.  Validate and update
2273  * our key material and status.
2274  *
2275  * @param n the neighbour from which we received message m
2276  * @param m the set key message we received
2277  */
2278 static void
2279 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2280
2281
2282 /**
2283  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2284  * the neighbour's struct and retry handling the set_key message.  Or,
2285  * if we did not get a HELLO, just free the set key message.
2286  *
2287  * @param cls pointer to the set key message
2288  * @param peer the peer for which this is the HELLO
2289  * @param hello HELLO message of that peer
2290  * @param trust amount of trust we currently have in that peer
2291  */
2292 static void
2293 process_hello_retry_handle_set_key (void *cls,
2294                                     const struct GNUNET_PeerIdentity *peer,
2295                                     const struct GNUNET_HELLO_Message *hello,
2296                                     uint32_t trust)
2297 {
2298   struct Neighbour *n = cls;
2299   struct SetKeyMessage *sm = n->skm;
2300
2301   if (peer == NULL)
2302     {
2303       GNUNET_free (sm);
2304       n->skm = NULL;
2305       n->pitr = NULL;
2306       return;
2307     }
2308   if (n->public_key != NULL)
2309     return;                     /* multiple HELLOs match!? */
2310   n->public_key =
2311     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2312   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2313     {
2314       GNUNET_break_op (0);
2315       GNUNET_free (n->public_key);
2316       n->public_key = NULL;
2317       return;
2318     }
2319 #if DEBUG_CORE
2320   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2321               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2322               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2323 #endif
2324   handle_set_key (n, sm);
2325 }
2326
2327
2328 /**
2329  * We received a PING message.  Validate and transmit
2330  * PONG.
2331  *
2332  * @param n sender of the PING
2333  * @param m the encrypted PING message itself
2334  */
2335 static void
2336 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2337 {
2338   struct PingMessage t;
2339   struct PingMessage *tp;
2340   struct MessageEntry *me;
2341
2342 #if DEBUG_CORE
2343   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2344               "Core service receives `%s' request from `%4s'.\n",
2345               "PING", GNUNET_i2s (&n->peer));
2346 #endif
2347   if (GNUNET_OK !=
2348       do_decrypt (n,
2349                   &my_identity.hashPubKey,
2350                   &m->challenge,
2351                   &t.challenge,
2352                   sizeof (struct PingMessage) -
2353                   sizeof (struct GNUNET_MessageHeader)))
2354     return;
2355 #if DEBUG_CORE
2356   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2357               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2358               "PING",
2359               GNUNET_i2s (&t.target),
2360               ntohl (t.challenge), n->decrypt_key.crc32);
2361   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2362               "Target of `%s' request is `%4s'.\n",
2363               "PING", GNUNET_i2s (&t.target));
2364 #endif
2365   if (0 != memcmp (&t.target,
2366                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2367     {
2368       GNUNET_break_op (0);
2369       return;
2370     }
2371   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2372                       sizeof (struct PingMessage));
2373   if (n->encrypted_tail != NULL)
2374     n->encrypted_tail->next = me;
2375   else
2376     {
2377       n->encrypted_tail = me;
2378       n->encrypted_head = me;
2379     }
2380   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2381   me->priority = PONG_PRIORITY;
2382   me->size = sizeof (struct PingMessage);
2383   tp = (struct PingMessage *) &me[1];
2384   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2385   tp->header.size = htons (sizeof (struct PingMessage));
2386   do_encrypt (n,
2387               &my_identity.hashPubKey,
2388               &t.challenge,
2389               &tp->challenge,
2390               sizeof (struct PingMessage) -
2391               sizeof (struct GNUNET_MessageHeader));
2392 #if DEBUG_CORE
2393   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2394               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2395               ntohl (t.challenge), n->encrypt_key.crc32);
2396 #endif
2397   /* trigger queue processing */
2398   process_encrypted_neighbour_queue (n);
2399 }
2400
2401
2402 /**
2403  * We received a SET_KEY message.  Validate and update
2404  * our key material and status.
2405  *
2406  * @param n the neighbour from which we received message m
2407  * @param m the set key message we received
2408  */
2409 static void
2410 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2411 {
2412   struct SetKeyMessage *m_cpy;
2413   struct GNUNET_TIME_Absolute t;
2414   struct GNUNET_CRYPTO_AesSessionKey k;
2415   struct PingMessage *ping;
2416   enum PeerStateMachine sender_status;
2417
2418 #if DEBUG_CORE
2419   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2420               "Core service receives `%s' request from `%4s'.\n",
2421               "SET_KEY", GNUNET_i2s (&n->peer));
2422 #endif
2423   if (n->public_key == NULL)
2424     {
2425 #if DEBUG_CORE
2426       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2427                   "Lacking public key for peer, trying to obtain one.\n");
2428 #endif
2429       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2430       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2431       /* lookup n's public key, then try again */
2432       GNUNET_assert (n->pitr == NULL);
2433       GNUNET_assert (n->skm == NULL);
2434       n->skm = m_cpy;
2435       n->pitr = GNUNET_PEERINFO_iterate (cfg,
2436                                          sched,
2437                                          &n->peer,
2438                                          0,
2439                                          GNUNET_TIME_UNIT_MINUTES,
2440                                          &process_hello_retry_handle_set_key, n);
2441       return;
2442     }
2443   if (0 != memcmp (&m->target,
2444                    &my_identity,
2445                    sizeof (struct GNUNET_PeerIdentity)))
2446     {
2447       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2448                   _("Received `%s' message that was not for me.  Ignoring.\n"));
2449       return;
2450     }
2451   if ((ntohl (m->purpose.size) !=
2452        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2453        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2454        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2455        sizeof (struct GNUNET_PeerIdentity)) ||
2456       (GNUNET_OK !=
2457        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2458                                  &m->purpose, &m->signature, n->public_key)))
2459     {
2460       /* invalid signature */
2461       GNUNET_break_op (0);
2462       return;
2463     }
2464   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2465   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2466        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2467       (t.value < n->decrypt_key_created.value))
2468     {
2469       /* this could rarely happen due to massive re-ordering of
2470          messages on the network level, but is most likely either
2471          a bug or some adversary messing with us.  Report. */
2472       GNUNET_break_op (0);
2473       return;
2474     }
2475 #if DEBUG_CORE
2476   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2477 #endif  
2478   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2479                                   &m->encrypted_key,
2480                                   &k,
2481                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2482        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2483       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2484     {
2485       /* failed to decrypt !? */
2486       GNUNET_break_op (0);
2487       return;
2488     }
2489
2490   n->decrypt_key = k;
2491   if (n->decrypt_key_created.value != t.value)
2492     {
2493       /* fresh key, reset sequence numbers */
2494       n->last_sequence_number_received = 0;
2495       n->last_packets_bitmap = 0;
2496       n->decrypt_key_created = t;
2497     }
2498   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2499   switch (n->status)
2500     {
2501     case PEER_STATE_DOWN:
2502       n->status = PEER_STATE_KEY_RECEIVED;
2503 #if DEBUG_CORE
2504       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2505                   "Responding to `%s' with my own key.\n", "SET_KEY");
2506 #endif
2507       send_key (n);
2508       break;
2509     case PEER_STATE_KEY_SENT:
2510     case PEER_STATE_KEY_RECEIVED:
2511       n->status = PEER_STATE_KEY_RECEIVED;
2512       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2513           (sender_status != PEER_STATE_KEY_CONFIRMED))
2514         {
2515 #if DEBUG_CORE
2516           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2517                       "Responding to `%s' with my own key (other peer has status %u).\n",
2518                       "SET_KEY", sender_status);
2519 #endif
2520           send_key (n);
2521         }
2522       break;
2523     case PEER_STATE_KEY_CONFIRMED:
2524       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2525           (sender_status != PEER_STATE_KEY_CONFIRMED))
2526         {         
2527 #if DEBUG_CORE
2528           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2529                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2530                       "SET_KEY", sender_status);
2531 #endif
2532           send_key (n);
2533         }
2534       break;
2535     default:
2536       GNUNET_break (0);
2537       break;
2538     }
2539   if (n->pending_ping != NULL)
2540     {
2541       ping = n->pending_ping;
2542       n->pending_ping = NULL;
2543       handle_ping (n, ping);
2544       GNUNET_free (ping);
2545     }
2546 }
2547
2548
2549 /**
2550  * We received a PONG message.  Validate and update
2551  * our status.
2552  *
2553  * @param n sender of the PONG
2554  * @param m the encrypted PONG message itself
2555  */
2556 static void
2557 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2558 {
2559   struct PingMessage t;
2560   struct ConnectNotifyMessage cnm;
2561
2562 #if DEBUG_CORE
2563   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2564               "Core service receives `%s' request from `%4s'.\n",
2565               "PONG", GNUNET_i2s (&n->peer));
2566 #endif
2567   if (GNUNET_OK !=
2568       do_decrypt (n,
2569                   &n->peer.hashPubKey,
2570                   &m->challenge,
2571                   &t.challenge,
2572                   sizeof (struct PingMessage) -
2573                   sizeof (struct GNUNET_MessageHeader)))
2574     return;
2575 #if DEBUG_CORE
2576   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2577               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2578               "PONG",
2579               GNUNET_i2s (&t.target),
2580               ntohl (t.challenge), n->decrypt_key.crc32);
2581 #endif
2582   if ((0 != memcmp (&t.target,
2583                     &n->peer,
2584                     sizeof (struct GNUNET_PeerIdentity))) ||
2585       (n->ping_challenge != ntohl (t.challenge)))
2586     {
2587       /* PONG malformed */
2588 #if DEBUG_CORE
2589       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2590                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2591                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2592       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2593                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2594                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2595 #endif
2596       GNUNET_break_op (0);
2597       return;
2598     }
2599   switch (n->status)
2600     {
2601     case PEER_STATE_DOWN:
2602       GNUNET_break (0);         /* should be impossible */
2603       return;
2604     case PEER_STATE_KEY_SENT:
2605       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2606       return;
2607     case PEER_STATE_KEY_RECEIVED:
2608       n->status = PEER_STATE_KEY_CONFIRMED;
2609       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2610         {
2611           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2612           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2613         }      
2614       cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2615       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2616       cnm.reserved = htonl (0);
2617       cnm.peer = n->peer;
2618       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
2619       process_encrypted_neighbour_queue (n);
2620       break;
2621     case PEER_STATE_KEY_CONFIRMED:
2622       /* duplicate PONG? */
2623       break;
2624     default:
2625       GNUNET_break (0);
2626       break;
2627     }
2628 }
2629
2630
2631 /**
2632  * Send a P2P message to a client.
2633  *
2634  * @param sender who sent us the message?
2635  * @param client who should we give the message to?
2636  * @param m contains the message to transmit
2637  * @param msize number of bytes in buf to transmit
2638  */
2639 static void
2640 send_p2p_message_to_client (struct Neighbour *sender,
2641                             struct Client *client,
2642                             const void *m, size_t msize)
2643 {
2644   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2645   struct NotifyTrafficMessage *ntm;
2646
2647 #if DEBUG_CORE
2648   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2649               "Core service passes message from `%4s' of type %u to client.\n",
2650               GNUNET_i2s(&sender->peer),
2651               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2652 #endif
2653   ntm = (struct NotifyTrafficMessage *) buf;
2654   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2655   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2656   ntm->reserved = htonl (0);
2657   ntm->peer = sender->peer;
2658   memcpy (&ntm[1], m, msize);
2659   send_to_client (client, &ntm->header, GNUNET_YES);
2660 }
2661
2662
2663 /**
2664  * Deliver P2P message to interested clients.
2665  *
2666  * @param sender who sent us the message?
2667  * @param m the message
2668  * @param msize size of the message (including header)
2669  */
2670 static void
2671 deliver_message (struct Neighbour *sender,
2672                  const struct GNUNET_MessageHeader *m, size_t msize)
2673 {
2674   struct Client *cpos;
2675   uint16_t type;
2676   unsigned int tpos;
2677   int deliver_full;
2678
2679   type = ntohs (m->type);
2680   cpos = clients;
2681   while (cpos != NULL)
2682     {
2683       deliver_full = GNUNET_NO;
2684       if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
2685         deliver_full = GNUNET_YES;
2686       else
2687         {
2688           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2689             {
2690               if (type != cpos->types[tpos])
2691                 continue;
2692               deliver_full = GNUNET_YES;
2693               break;
2694             }
2695         }
2696       if (GNUNET_YES == deliver_full)
2697         send_p2p_message_to_client (sender, cpos, m, msize);
2698       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2699         send_p2p_message_to_client (sender, cpos, m,
2700                                     sizeof (struct GNUNET_MessageHeader));
2701       cpos = cpos->next;
2702     }
2703 }
2704
2705
2706 /**
2707  * Align P2P message and then deliver to interested clients.
2708  *
2709  * @param sender who sent us the message?
2710  * @param buffer unaligned (!) buffer containing message
2711  * @param msize size of the message (including header)
2712  */
2713 static void
2714 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2715 {
2716   char abuf[msize];
2717
2718   /* TODO: call to statistics? */
2719   memcpy (abuf, buffer, msize);
2720   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2721 }
2722
2723
2724 /**
2725  * Deliver P2P messages to interested clients.
2726  *
2727  * @param sender who sent us the message?
2728  * @param buffer buffer containing messages, can be modified
2729  * @param buffer_size size of the buffer (overall)
2730  * @param offset offset where messages in the buffer start
2731  */
2732 static void
2733 deliver_messages (struct Neighbour *sender,
2734                   const char *buffer, size_t buffer_size, size_t offset)
2735 {
2736   struct GNUNET_MessageHeader *mhp;
2737   struct GNUNET_MessageHeader mh;
2738   uint16_t msize;
2739   int need_align;
2740
2741   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2742     {
2743       if (0 != offset % sizeof (uint16_t))
2744         {
2745           /* outch, need to copy to access header */
2746           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2747           mhp = &mh;
2748         }
2749       else
2750         {
2751           /* can access header directly */
2752           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2753         }
2754       msize = ntohs (mhp->size);
2755       if (msize + offset > buffer_size)
2756         {
2757           /* malformed message, header says it is larger than what
2758              would fit into the overall buffer */
2759           GNUNET_break_op (0);
2760           break;
2761         }
2762 #if HAVE_UNALIGNED_64_ACCESS
2763       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2764 #else
2765       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2766 #endif
2767       if (GNUNET_YES == need_align)
2768         align_and_deliver (sender, &buffer[offset], msize);
2769       else
2770         deliver_message (sender,
2771                          (const struct GNUNET_MessageHeader *)
2772                          &buffer[offset], msize);
2773       offset += msize;
2774     }
2775 }
2776
2777
2778 /**
2779  * We received an encrypted message.  Decrypt, validate and
2780  * pass on to the appropriate clients.
2781  */
2782 static void
2783 handle_encrypted_message (struct Neighbour *n,
2784                           const struct EncryptedMessage *m)
2785 {
2786   size_t size = ntohs (m->header.size);
2787   char buf[size];
2788   struct EncryptedMessage *pt;  /* plaintext */
2789   GNUNET_HashCode ph;
2790   size_t off;
2791   uint32_t snum;
2792   struct GNUNET_TIME_Absolute t;
2793
2794 #if DEBUG_CORE
2795   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2796               "Core service receives `%s' request from `%4s'.\n",
2797               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2798 #endif  
2799   /* decrypt */
2800   if (GNUNET_OK !=
2801       do_decrypt (n,
2802                   &m->plaintext_hash,
2803                   &m->sequence_number,
2804                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2805     return;
2806   pt = (struct EncryptedMessage *) buf;
2807
2808   /* validate hash */
2809   GNUNET_CRYPTO_hash (&pt->sequence_number,
2810                       size - ENCRYPTED_HEADER_SIZE, &ph);
2811   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2812     {
2813       /* checksum failed */
2814       GNUNET_break_op (0);
2815       return;
2816     }
2817
2818   /* validate sequence number */
2819   snum = ntohl (pt->sequence_number);
2820   if (n->last_sequence_number_received == snum)
2821     {
2822       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2823                   "Received duplicate message, ignoring.\n");
2824       /* duplicate, ignore */
2825       return;
2826     }
2827   if ((n->last_sequence_number_received > snum) &&
2828       (n->last_sequence_number_received - snum > 32))
2829     {
2830       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2831                   "Received ancient out of sequence message, ignoring.\n");
2832       /* ancient out of sequence, ignore */
2833       return;
2834     }
2835   if (n->last_sequence_number_received > snum)
2836     {
2837       unsigned int rotbit =
2838         1 << (n->last_sequence_number_received - snum - 1);
2839       if ((n->last_packets_bitmap & rotbit) != 0)
2840         {
2841           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2842                       "Received duplicate message, ignoring.\n");
2843           /* duplicate, ignore */
2844           return;
2845         }
2846       n->last_packets_bitmap |= rotbit;
2847     }
2848   if (n->last_sequence_number_received < snum)
2849     {
2850       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2851       n->last_sequence_number_received = snum;
2852     }
2853
2854   /* check timestamp */
2855   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2856   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2857     {
2858       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2859                   _
2860                   ("Message received far too old (%llu ms). Content ignored.\n"),
2861                   GNUNET_TIME_absolute_get_duration (t).value);
2862       return;
2863     }
2864
2865   /* process decrypted message(s) */
2866   update_window (GNUNET_YES,
2867                  &n->available_send_window,
2868                  &n->last_asw_update,
2869                  n->bpm_out);
2870   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2871   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2872                            n->bpm_out_internal_limit);
2873   n->last_activity = GNUNET_TIME_absolute_get ();
2874   off = sizeof (struct EncryptedMessage);
2875   deliver_messages (n, buf, size, off);
2876 }
2877
2878
2879 /**
2880  * Function called by the transport for each received message.
2881  *
2882  * @param cls closure
2883  * @param peer (claimed) identity of the other peer
2884  * @param message the message
2885  * @param latency estimated latency for communicating with the
2886  *             given peer (round-trip)
2887  * @param distance in overlay hops, as given by transport plugin
2888  */
2889 static void
2890 handle_transport_receive (void *cls,
2891                           const struct GNUNET_PeerIdentity *peer,
2892                           const struct GNUNET_MessageHeader *message,
2893                           struct GNUNET_TIME_Relative latency,
2894                           unsigned int distance)
2895 {
2896   struct Neighbour *n;
2897   struct GNUNET_TIME_Absolute now;
2898   int up;
2899   uint16_t type;
2900   uint16_t size;
2901
2902 #if DEBUG_CORE
2903   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2904               "Received message of type %u from `%4s', demultiplexing.\n",
2905               ntohs (message->type), GNUNET_i2s (peer));
2906 #endif
2907   n = find_neighbour (peer);
2908   if (n == NULL)
2909     {
2910       GNUNET_break (0);
2911       return;
2912     }
2913   n->last_latency = latency;
2914   up = (n->status == PEER_STATE_KEY_CONFIRMED);
2915   type = ntohs (message->type);
2916   size = ntohs (message->size);
2917   switch (type)
2918     {
2919     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2920       if (size != sizeof (struct SetKeyMessage))
2921         {
2922           GNUNET_break_op (0);
2923           return;
2924         }
2925       handle_set_key (n, (const struct SetKeyMessage *) message);
2926       break;
2927     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2928       if (size < sizeof (struct EncryptedMessage) +
2929           sizeof (struct GNUNET_MessageHeader))
2930         {
2931           GNUNET_break_op (0);
2932           return;
2933         }
2934       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2935           (n->status != PEER_STATE_KEY_CONFIRMED))
2936         {
2937           GNUNET_break_op (0);
2938           return;
2939         }
2940       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2941       break;
2942     case GNUNET_MESSAGE_TYPE_CORE_PING:
2943       if (size != sizeof (struct PingMessage))
2944         {
2945           GNUNET_break_op (0);
2946           return;
2947         }
2948       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2949           (n->status != PEER_STATE_KEY_CONFIRMED))
2950         {
2951 #if DEBUG_CORE
2952           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2953                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2954                       "PING", GNUNET_i2s (&n->peer));
2955 #endif
2956           GNUNET_free_non_null (n->pending_ping);
2957           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2958           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2959           return;
2960         }
2961       handle_ping (n, (const struct PingMessage *) message);
2962       break;
2963     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2964       if (size != sizeof (struct PingMessage))
2965         {
2966           GNUNET_break_op (0);
2967           return;
2968         }
2969       if ((n->status != PEER_STATE_KEY_SENT) &&
2970           (n->status != PEER_STATE_KEY_RECEIVED) &&
2971           (n->status != PEER_STATE_KEY_CONFIRMED))
2972         {
2973           /* could not decrypt pong, oops! */
2974           GNUNET_break_op (0);
2975           return;
2976         }
2977       handle_pong (n, (const struct PingMessage *) message);
2978       break;
2979     default:
2980       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2981                   _("Unsupported message of type %u received.\n"), type);
2982       return;
2983     }
2984   if (n->status == PEER_STATE_KEY_CONFIRMED)
2985     {
2986       now = GNUNET_TIME_absolute_get ();
2987       n->last_activity = now;
2988       if (!up)
2989         n->time_established = now;
2990     }
2991 }
2992
2993
2994 /**
2995  * Function that recalculates the bandwidth quota for the
2996  * given neighbour and transmits it to the transport service.
2997  * 
2998  * @param cls neighbour for the quota update
2999  * @param tc context
3000  */
3001 static void
3002 neighbour_quota_update (void *cls,
3003                         const struct GNUNET_SCHEDULER_TaskContext *tc);
3004
3005
3006 /**
3007  * Schedule the task that will recalculate the bandwidth
3008  * quota for this peer (and possibly force a disconnect of
3009  * idle peers by calculating a bandwidth of zero).
3010  */
3011 static void
3012 schedule_quota_update (struct Neighbour *n)
3013 {
3014   GNUNET_assert (n->quota_update_task ==
3015                  GNUNET_SCHEDULER_NO_TASK);
3016   n->quota_update_task
3017     = GNUNET_SCHEDULER_add_delayed (sched,
3018                                     QUOTA_UPDATE_FREQUENCY,
3019                                     &neighbour_quota_update,
3020                                     n);
3021 }
3022
3023
3024 /**
3025  * Function that recalculates the bandwidth quota for the
3026  * given neighbour and transmits it to the transport service.
3027  * 
3028  * @param cls neighbour for the quota update
3029  * @param tc context
3030  */
3031 static void
3032 neighbour_quota_update (void *cls,
3033                         const struct GNUNET_SCHEDULER_TaskContext *tc)
3034 {
3035   struct Neighbour *n = cls;
3036   uint32_t q_in;
3037   double pref_rel;
3038   double share;
3039   unsigned long long distributable;
3040   
3041   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
3042   /* calculate relative preference among all neighbours;
3043      divides by a bit more to avoid division by zero AND to
3044      account for possibility of new neighbours joining any time 
3045      AND to convert to double... */
3046   pref_rel = n->current_preference / (1.0 + preference_sum);
3047   distributable = 0;
3048   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
3049     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
3050   share = distributable * pref_rel;
3051   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
3052   /* check if we want to disconnect for good due to inactivity */
3053   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
3054        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
3055     q_in = 0; /* force disconnect */
3056   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
3057        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
3058     {
3059       n->bpm_in = q_in;
3060       GNUNET_TRANSPORT_set_quota (transport,
3061                                   &n->peer,
3062                                   n->bpm_in, 
3063                                   n->bpm_out,
3064                                   GNUNET_TIME_UNIT_FOREVER_REL,
3065                                   NULL, NULL);
3066     }
3067   schedule_quota_update (n);
3068 }
3069
3070
3071 /**
3072  * Function called by transport to notify us that
3073  * a peer connected to us (on the network level).
3074  *
3075  * @param cls closure
3076  * @param peer the peer that connected
3077  * @param latency current latency of the connection
3078  * @param distance in overlay hops, as given by transport plugin
3079  */
3080 static void
3081 handle_transport_notify_connect (void *cls,
3082                                  const struct GNUNET_PeerIdentity *peer,
3083                                  struct GNUNET_TIME_Relative latency,
3084                                  unsigned int distance)
3085 {
3086   struct Neighbour *n;
3087   struct GNUNET_TIME_Absolute now;
3088   struct ConnectNotifyMessage cnm;
3089
3090   n = find_neighbour (peer);
3091   if (n != NULL)
3092     {
3093       /* duplicate connect notification!? */
3094       GNUNET_break (0);
3095       return;
3096     }
3097   now = GNUNET_TIME_absolute_get ();
3098   n = GNUNET_malloc (sizeof (struct Neighbour));
3099   n->next = neighbours;
3100   neighbours = n;
3101   neighbour_count++;
3102   n->peer = *peer;
3103   n->last_latency = latency;
3104   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
3105   n->encrypt_key_created = now;
3106   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
3107   n->last_asw_update = now;
3108   n->last_arw_update = now;
3109   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3110   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3111   n->bpm_out_internal_limit = (uint32_t) - 1;
3112   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3113   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3114                                                 (uint32_t) - 1);
3115 #if DEBUG_CORE
3116   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3117               "Received connection from `%4s'.\n",
3118               GNUNET_i2s (&n->peer));
3119 #endif
3120   schedule_quota_update (n);
3121   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3122   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
3123   cnm.reserved = htonl (0);
3124   cnm.peer = *peer;
3125   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
3126   send_key (n);
3127 }
3128
3129
3130 /**
3131  * Free the given entry for the neighbour (it has
3132  * already been removed from the list at this point).
3133  *
3134  * @param n neighbour to free
3135  */
3136 static void
3137 free_neighbour (struct Neighbour *n)
3138 {
3139   struct MessageEntry *m;
3140
3141   if (n->pitr != NULL)
3142     {
3143       GNUNET_PEERINFO_iterate_cancel (n->pitr);
3144       n->pitr = NULL;
3145     }
3146   if (n->skm != NULL)
3147     {
3148       GNUNET_free (n->skm);
3149       n->skm = NULL;
3150     }
3151   while (NULL != (m = n->messages))
3152     {
3153       n->messages = m->next;
3154       GNUNET_free (m);
3155     }
3156   while (NULL != (m = n->encrypted_head))
3157     {
3158       n->encrypted_head = m->next;
3159       GNUNET_free (m);
3160     }
3161   if (NULL != n->th)
3162     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3163   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
3164     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3165   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
3166     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3167   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
3168     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3169   GNUNET_free_non_null (n->public_key);
3170   GNUNET_free_non_null (n->pending_ping);
3171   GNUNET_free (n);
3172 }
3173
3174
3175 /**
3176  * Function called by transport telling us that a peer
3177  * disconnected.
3178  *
3179  * @param cls closure
3180  * @param peer the peer that disconnected
3181  */
3182 static void
3183 handle_transport_notify_disconnect (void *cls,
3184                                     const struct GNUNET_PeerIdentity *peer)
3185 {
3186   struct ConnectNotifyMessage cnm;
3187   struct Neighbour *n;
3188   struct Neighbour *p;
3189
3190 #if DEBUG_CORE
3191   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3192               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3193 #endif
3194   p = NULL;
3195   n = neighbours;
3196   while ((n != NULL) &&
3197          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3198     {
3199       p = n;
3200       n = n->next;
3201     }
3202   if (n == NULL)
3203     {
3204       GNUNET_break (0);
3205       return;
3206     }
3207   if (p == NULL)
3208     neighbours = n->next;
3209   else
3210     p->next = n->next;
3211   GNUNET_assert (neighbour_count > 0);
3212   neighbour_count--;
3213   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3214   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3215   cnm.reserved = htonl (0);
3216   cnm.peer = *peer;
3217   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
3218   free_neighbour (n);
3219 }
3220
3221
3222 /**
3223  * Last task run during shutdown.  Disconnects us from
3224  * the transport.
3225  */
3226 static void
3227 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3228 {
3229   struct Neighbour *n;
3230   struct Client *c;
3231
3232 #if DEBUG_CORE
3233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3234               "Core service shutting down.\n");
3235 #endif
3236   GNUNET_assert (transport != NULL);
3237   GNUNET_TRANSPORT_disconnect (transport);
3238   transport = NULL;
3239   while (NULL != (n = neighbours))
3240     {
3241       neighbours = n->next;
3242       GNUNET_assert (neighbour_count > 0);
3243       neighbour_count--;
3244       free_neighbour (n);
3245     }
3246   while (NULL != (c = clients))
3247     handle_client_disconnect (NULL, c->client_handle);
3248   if (my_private_key != NULL)
3249     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3250 }
3251
3252
3253 /**
3254  * Initiate core service.
3255  *
3256  * @param cls closure
3257  * @param s scheduler to use
3258  * @param serv the initialized server
3259  * @param c configuration to use
3260  */
3261 static void
3262 run (void *cls,
3263      struct GNUNET_SCHEDULER_Handle *s,
3264      struct GNUNET_SERVER_Handle *serv,
3265      const struct GNUNET_CONFIGURATION_Handle *c)
3266 {
3267 #if 0
3268   unsigned long long qin;
3269   unsigned long long qout;
3270   unsigned long long tneigh;
3271 #endif
3272   char *keyfile;
3273
3274   sched = s;
3275   cfg = c;
3276   /* parse configuration */
3277   if (
3278        (GNUNET_OK !=
3279         GNUNET_CONFIGURATION_get_value_number (c,
3280                                                "CORE",
3281                                                "TOTAL_QUOTA_IN",
3282                                                &bandwidth_target_in)) ||
3283        (GNUNET_OK !=
3284         GNUNET_CONFIGURATION_get_value_number (c,
3285                                                "CORE",
3286                                                "TOTAL_QUOTA_OUT",
3287                                                &bandwidth_target_out)) ||
3288 #if 0
3289        (GNUNET_OK !=
3290         GNUNET_CONFIGURATION_get_value_number (c,
3291                                                "CORE",
3292                                                "YY",
3293                                                &qout)) ||
3294        (GNUNET_OK !=
3295         GNUNET_CONFIGURATION_get_value_number (c,
3296                                                "CORE",
3297                                                "ZZ_LIMIT", &tneigh)) ||
3298 #endif
3299        (GNUNET_OK !=
3300         GNUNET_CONFIGURATION_get_value_filename (c,
3301                                                  "GNUNETD",
3302                                                  "HOSTKEY", &keyfile)))
3303     {
3304       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3305                   _
3306                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3307       GNUNET_SCHEDULER_shutdown (s);
3308       return;
3309     }
3310   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3311   GNUNET_free (keyfile);
3312   if (my_private_key == NULL)
3313     {
3314       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3315                   _("Core service could not access hostkey.  Exiting.\n"));
3316       GNUNET_SCHEDULER_shutdown (s);
3317       return;
3318     }
3319   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3320   GNUNET_CRYPTO_hash (&my_public_key,
3321                       sizeof (my_public_key), &my_identity.hashPubKey);
3322   /* setup notification */
3323   server = serv;
3324   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3325   /* setup transport connection */
3326   transport = GNUNET_TRANSPORT_connect (sched,
3327                                         cfg,
3328                                         NULL,
3329                                         &handle_transport_receive,
3330                                         &handle_transport_notify_connect,
3331                                         &handle_transport_notify_disconnect);
3332   GNUNET_assert (NULL != transport);
3333   GNUNET_SCHEDULER_add_delayed (sched,
3334                                 GNUNET_TIME_UNIT_FOREVER_REL,
3335                                 &cleaning_task, NULL);
3336   /* process client requests */
3337   GNUNET_SERVER_add_handlers (server, handlers);
3338   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3339               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3340 }
3341
3342
3343
3344 /**
3345  * The main function for the transport service.
3346  *
3347  * @param argc number of arguments from the command line
3348  * @param argv command line arguments
3349  * @return 0 ok, 1 on error
3350  */
3351 int
3352 main (int argc, char *const *argv)
3353 {
3354   return (GNUNET_OK ==
3355           GNUNET_SERVICE_run (argc,
3356                               argv,
3357                               "core",
3358                               GNUNET_SERVICE_OPTION_NONE,
3359                               &run, NULL)) ? 0 : 1;
3360 }
3361
3362 /* end of gnunet-service-core.c */