removing dead code / unused variables
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * POST-TESTING:
27  * - topology management:
28  *   + bootstrapping (transport offer hello, plugins)
29  *   + internal neighbour selection
30  *
31  * Considerations for later:
32  * - check that hostkey used by transport (for HELLOs) is the
33  *   same as the hostkey that we are using!
34  * - add code to send PINGs if we are about to time-out otherwise
35  * - optimize lookup (many O(n) list traversals
36  *   could ideally be changed to O(1) hash map lookups)
37  */
38 #include "platform.h"
39 #include "gnunet_constants.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet_hello_lib.h"
42 #include "gnunet_peerinfo_service.h"
43 #include "gnunet_protocols.h"
44 #include "gnunet_signatures.h"
45 #include "gnunet_transport_service.h"
46 #include "core.h"
47
48
49 /**
50  * Receive and send buffer windows grow over time.  For
51  * how long can 'unused' bandwidth accumulate before we
52  * need to cap it?  (specified in ms).
53  */
54 #define MAX_WINDOW_TIME (5 * 60 * 1000)
55
56 /**
57  * Minimum of bytes per minute (out) to assign to any connected peer.
58  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
59  * sense.
60  */
61 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
62
63 /**
64  * What is the smallest change (in number of bytes per minute)
65  * that we consider significant enough to bother triggering?
66  */
67 #define MIN_BPM_CHANGE 32
68
69 /**
70  * After how much time past the "official" expiration time do
71  * we discard messages?  Should not be zero since we may 
72  * intentionally defer transmission until close to the deadline
73  * and then may be slightly past the deadline due to inaccuracy
74  * in sleep and our own CPU consumption.
75  */
76 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What is the maximum delay for a SET_KEY message?
80  */
81 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
82
83 /**
84  * What how long do we wait for SET_KEY confirmation initially?
85  */
86 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
87
88 /**
89  * What is the maximum delay for a PING message?
90  */
91 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * What is the maximum delay for a PONG message?
95  */
96 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * How often do we recalculate bandwidth quotas?
100  */
101 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
102
103 /**
104  * What is the priority for a SET_KEY message?
105  */
106 #define SET_KEY_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PING message?
110  */
111 #define PING_PRIORITY 0xFFFFFF
112
113 /**
114  * What is the priority for a PONG message?
115  */
116 #define PONG_PRIORITY 0xFFFFFF
117
118 /**
119  * How many messages do we queue per peer at most?
120  */
121 #define MAX_PEER_QUEUE_SIZE 16
122
123 /**
124  * How many non-mandatory messages do we queue per client at most?
125  */
126 #define MAX_CLIENT_QUEUE_SIZE 32
127
128 /**
129  * What is the maximum age of a message for us to consider
130  * processing it?  Note that this looks at the timestamp used
131  * by the other peer, so clock skew between machines does
132  * come into play here.  So this should be picked high enough
133  * so that a little bit of clock skew does not prevent peers
134  * from connecting to us.
135  */
136 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
137
138 /**
139  * What is the maximum size for encrypted messages?  Note that this
140  * number imposes a clear limit on the maximum size of any message.
141  * Set to a value close to 64k but not so close that transports will
142  * have trouble with their headers.
143  */
144 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
145
146
147 /**
148  * State machine for our P2P encryption handshake.  Everyone starts in
149  * "DOWN", if we receive the other peer's key (other peer initiated)
150  * we start in state RECEIVED (since we will immediately send our
151  * own); otherwise we start in SENT.  If we get back a PONG from
152  * within either state, we move up to CONFIRMED (the PONG will always
153  * be sent back encrypted with the key we sent to the other peer).
154  */
155 enum PeerStateMachine
156 {
157   PEER_STATE_DOWN,
158   PEER_STATE_KEY_SENT,
159   PEER_STATE_KEY_RECEIVED,
160   PEER_STATE_KEY_CONFIRMED
161 };
162
163
164 /**
165  * Number of bytes (at the beginning) of "struct EncryptedMessage"
166  * that are NOT encrypted.
167  */
168 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
169
170
171 /**
172  * Encapsulation for encrypted messages exchanged between
173  * peers.  Followed by the actual encrypted data.
174  */
175 struct EncryptedMessage
176 {
177   /**
178    * Message type is either CORE_ENCRYPTED_MESSAGE.
179    */
180   struct GNUNET_MessageHeader header;
181
182   /**
183    * Always zero.
184    */
185   uint32_t reserved GNUNET_PACKED;
186
187   /**
188    * Hash of the plaintext, used to verify message integrity;
189    * ALSO used as the IV for the symmetric cipher!  Everything
190    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
191    * must be set to the offset of the next field.
192    */
193   GNUNET_HashCode plaintext_hash;
194
195   /**
196    * Sequence number, in network byte order.  This field
197    * must be the first encrypted/decrypted field and the
198    * first byte that is hashed for the plaintext hash.
199    */
200   uint32_t sequence_number GNUNET_PACKED;
201
202   /**
203    * Desired bandwidth (how much we should send to this
204    * peer / how much is the sender willing to receive),
205    * in bytes per minute.
206    */
207   uint32_t inbound_bpm_limit GNUNET_PACKED;
208
209   /**
210    * Timestamp.  Used to prevent reply of ancient messages
211    * (recent messages are caught with the sequence number).
212    */
213   struct GNUNET_TIME_AbsoluteNBO timestamp;
214
215 };
216
217 /**
218  * We're sending an (encrypted) PING to the other peer to check if he
219  * can decrypt.  The other peer should respond with a PONG with the
220  * same content, except this time encrypted with the receiver's key.
221  */
222 struct PingMessage
223 {
224   /**
225    * Message type is either CORE_PING or CORE_PONG.
226    */
227   struct GNUNET_MessageHeader header;
228
229   /**
230    * Random number chosen to make reply harder.
231    */
232   uint32_t challenge GNUNET_PACKED;
233
234   /**
235    * Intended target of the PING, used primarily to check
236    * that decryption actually worked.
237    */
238   struct GNUNET_PeerIdentity target;
239 };
240
241
242 /**
243  * Message transmitted to set (or update) a session key.
244  */
245 struct SetKeyMessage
246 {
247
248   /**
249    * Message type is either CORE_SET_KEY.
250    */
251   struct GNUNET_MessageHeader header;
252
253   /**
254    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
255    */
256   int32_t sender_status GNUNET_PACKED;
257
258   /**
259    * Purpose of the signature, will be
260    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
261    */
262   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
263
264   /**
265    * At what time was this key created?
266    */
267   struct GNUNET_TIME_AbsoluteNBO creation_time;
268
269   /**
270    * The encrypted session key.
271    */
272   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
273
274   /**
275    * Who is the intended recipient?
276    */
277   struct GNUNET_PeerIdentity target;
278
279   /**
280    * Signature of the stuff above (starting at purpose).
281    */
282   struct GNUNET_CRYPTO_RsaSignature signature;
283
284 };
285
286
287 /**
288  * Message waiting for transmission. This struct
289  * is followed by the actual content of the message.
290  */
291 struct MessageEntry
292 {
293
294   /**
295    * We keep messages in a linked list (for now).
296    */
297   struct MessageEntry *next;
298
299   /**
300    * By when are we supposed to transmit this message?
301    */
302   struct GNUNET_TIME_Absolute deadline;
303
304   /**
305    * How important is this message to us?
306    */
307   unsigned int priority;
308
309   /**
310    * How long is the message? (number of bytes following
311    * the "struct MessageEntry", but not including the
312    * size of "struct MessageEntry" itself!)
313    */
314   uint16_t size;
315
316   /**
317    * Was this message selected for transmission in the
318    * current round? GNUNET_YES or GNUNET_NO.
319    */
320   int16_t do_transmit;
321
322 };
323
324
325 struct Neighbour
326 {
327   /**
328    * We keep neighbours in a linked list (for now).
329    */
330   struct Neighbour *next;
331
332   /**
333    * Unencrypted messages destined for this peer.
334    */
335   struct MessageEntry *messages;
336
337   /**
338    * Head of the batched, encrypted message queue (already ordered,
339    * transmit starting with the head).
340    */
341   struct MessageEntry *encrypted_head;
342
343   /**
344    * Tail of the batched, encrypted message queue (already ordered,
345    * append new messages to tail)
346    */
347   struct MessageEntry *encrypted_tail;
348
349   /**
350    * Handle for pending requests for transmission to this peer
351    * with the transport service.  NULL if no request is pending.
352    */
353   struct GNUNET_TRANSPORT_TransmitHandle *th;
354
355   /**
356    * Public key of the neighbour, NULL if we don't have it yet.
357    */
358   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
359
360   /**
361    * We received a PING message before we got the "public_key"
362    * (or the SET_KEY).  We keep it here until we have a key
363    * to decrypt it.  NULL if no PING is pending.
364    */
365   struct PingMessage *pending_ping;
366
367   /**
368    * Identity of the neighbour.
369    */
370   struct GNUNET_PeerIdentity peer;
371
372   /**
373    * Key we use to encrypt our messages for the other peer
374    * (initialized by us when we do the handshake).
375    */
376   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
377
378   /**
379    * Key we use to decrypt messages from the other peer
380    * (given to us by the other peer during the handshake).
381    */
382   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
383
384   /**
385    * ID of task used for re-trying plaintext scheduling.
386    */
387   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
388
389   /**
390    * ID of task used for re-trying SET_KEY and PING message.
391    */
392   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
393
394   /**
395    * ID of task used for updating bandwidth quota for this neighbour.
396    */
397   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
398
399   /**
400    * At what time did we generate our encryption key?
401    */
402   struct GNUNET_TIME_Absolute encrypt_key_created;
403
404   /**
405    * At what time did the other peer generate the decryption key?
406    */
407   struct GNUNET_TIME_Absolute decrypt_key_created;
408
409   /**
410    * At what time did we initially establish (as in, complete session
411    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
412    */
413   struct GNUNET_TIME_Absolute time_established;
414
415   /**
416    * At what time did we last receive an encrypted message from the
417    * other peer?  Should be zero if status != KEY_CONFIRMED.
418    */
419   struct GNUNET_TIME_Absolute last_activity;
420
421   /**
422    * Last latency observed from this peer.
423    */
424   struct GNUNET_TIME_Relative last_latency;
425
426   /**
427    * At what frequency are we currently re-trying SET_KEY messages?
428    */
429   struct GNUNET_TIME_Relative set_key_retry_frequency;
430
431   /**
432    * Time of our last update to the "available_send_window".
433    */
434   struct GNUNET_TIME_Absolute last_asw_update;
435
436   /**
437    * Time of our last update to the "available_recv_window".
438    */
439   struct GNUNET_TIME_Absolute last_arw_update;
440
441   /**
442    * Number of bytes that we are eligible to transmit to this
443    * peer at this point.  Incremented every minute by max_out_bpm,
444    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
445    * bandwidth-hogs are sampled at a frequency of about 78s!);
446    * may get negative if we have VERY high priority content.
447    */
448   long long available_send_window; 
449
450   /**
451    * How much downstream capacity of this peer has been reserved for
452    * our traffic?  (Our clients can request that a certain amount of
453    * bandwidth is available for replies to them; this value is used to
454    * make sure that this reserved amount of bandwidth is actually
455    * available).
456    */
457   long long available_recv_window; 
458
459   /**
460    * How valueable were the messages of this peer recently?
461    */
462   unsigned long long current_preference;
463
464   /**
465    * Bit map indicating which of the 32 sequence numbers before the last
466    * were received (good for accepting out-of-order packets and
467    * estimating reliability of the connection)
468    */
469   unsigned int last_packets_bitmap;
470
471   /**
472    * Number of messages in the message queue for this peer.
473    */
474   unsigned int message_queue_size;
475
476   /**
477    * last sequence number received on this connection (highest)
478    */
479   uint32_t last_sequence_number_received;
480
481   /**
482    * last sequence number transmitted
483    */
484   uint32_t last_sequence_number_sent;
485
486   /**
487    * Available bandwidth in for this peer (current target).
488    */
489   uint32_t bpm_in;
490
491   /**
492    * Available bandwidth out for this peer (current target).
493    */
494   uint32_t bpm_out;
495
496   /**
497    * Internal bandwidth limit set for this peer (initially
498    * typically set to "-1").  "bpm_out" is MAX of
499    * "bpm_out_internal_limit" and "bpm_out_external_limit".
500    */
501   uint32_t bpm_out_internal_limit;
502
503   /**
504    * External bandwidth limit set for this peer by the
505    * peer that we are communicating with.  "bpm_out" is MAX of
506    * "bpm_out_internal_limit" and "bpm_out_external_limit".
507    */
508   uint32_t bpm_out_external_limit;
509
510   /**
511    * What was our PING challenge number (for this peer)?
512    */
513   uint32_t ping_challenge;
514
515   /**
516    * What is our connection status?
517    */
518   enum PeerStateMachine status;
519
520 };
521
522
523 /**
524  * Events are messages for clients.  The struct
525  * itself is followed by the actual message.
526  */
527 struct Event
528 {
529   /**
530    * This is a linked list.
531    */
532   struct Event *next;
533
534   /**
535    * Size of the message.
536    */
537   size_t size;
538
539   /**
540    * Could this event be dropped if this queue
541    * is getting too large? (NOT YET USED!)
542    */
543   int can_drop;
544
545 };
546
547
548 /**
549  * Data structure for each client connected to the core service.
550  */
551 struct Client
552 {
553   /**
554    * Clients are kept in a linked list.
555    */
556   struct Client *next;
557
558   /**
559    * Handle for the client with the server API.
560    */
561   struct GNUNET_SERVER_Client *client_handle;
562
563   /**
564    * Linked list of messages we still need to deliver to
565    * this client.
566    */
567   struct Event *event_head;
568
569   /**
570    * Tail of the linked list of events.
571    */
572   struct Event *event_tail;
573
574   /**
575    * Current transmit handle, NULL if no transmission request
576    * is pending.
577    */
578   struct GNUNET_NETWORK_TransmitHandle *th;
579
580   /**
581    * Array of the types of messages this peer cares
582    * about (with "tcnt" entries).  Allocated as part
583    * of this client struct, do not free!
584    */
585   uint16_t *types;
586
587   /**
588    * Options for messages this client cares about,
589    * see GNUNET_CORE_OPTION_ values.
590    */
591   uint32_t options;
592
593   /**
594    * Number of types of incoming messages this client
595    * specifically cares about.  Size of the "types" array.
596    */
597   unsigned int tcnt;
598
599 };
600
601
602 /**
603  * Our public key.
604  */
605 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
606
607 /**
608  * Our identity.
609  */
610 static struct GNUNET_PeerIdentity my_identity;
611
612 /**
613  * Our private key.
614  */
615 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
616
617 /**
618  * Our scheduler.
619  */
620 struct GNUNET_SCHEDULER_Handle *sched;
621
622 /**
623  * Our configuration.
624  */
625 struct GNUNET_CONFIGURATION_Handle *cfg;
626
627 /**
628  * Our server.
629  */
630 static struct GNUNET_SERVER_Handle *server;
631
632 /**
633  * Transport service.
634  */
635 static struct GNUNET_TRANSPORT_Handle *transport;
636
637 /**
638  * Linked list of our clients.
639  */
640 static struct Client *clients;
641
642 /**
643  * We keep neighbours in a linked list (for now).
644  */
645 static struct Neighbour *neighbours;
646
647 /**
648  * Sum of all preferences among all neighbours.
649  */
650 static unsigned long long preference_sum;
651
652 /**
653  * Total number of neighbours we have.
654  */
655 static unsigned int neighbour_count;
656
657 /**
658  * How much inbound bandwidth are we supposed to be using?
659  */
660 static unsigned long long bandwidth_target_in;
661
662 /**
663  * How much outbound bandwidth are we supposed to be using?
664  */
665 static unsigned long long bandwidth_target_out;
666
667
668
669 /**
670  * A preference value for a neighbour was update.  Update
671  * the preference sum accordingly.
672  *
673  * @param inc how much was a preference value increased?
674  */
675 static void
676 update_preference_sum (unsigned long long inc)
677 {
678   struct Neighbour *n;
679   unsigned long long os;
680
681   os = preference_sum;
682   preference_sum += inc;
683   if (preference_sum >= os)
684     return; /* done! */
685   /* overflow! compensate by cutting all values in half! */
686   preference_sum = 0;
687   n = neighbours;
688   while (n != NULL)
689     {
690       n->current_preference /= 2;
691       preference_sum += n->current_preference;
692       n = n->next;
693     }    
694 }
695
696
697 /**
698  * Recalculate the number of bytes we expect to
699  * receive or transmit in a given window.
700  *
701  * @param force force an update now (even if not much time has passed)
702  * @param window pointer to the byte counter (updated)
703  * @param ts pointer to the timestamp (updated)
704  * @param bpm number of bytes per minute that should
705  *        be added to the window.
706  */
707 static void
708 update_window (int force,
709                long long *window,
710                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
711 {
712   struct GNUNET_TIME_Relative since;
713
714   since = GNUNET_TIME_absolute_get_duration (*ts);
715   if ( (force == GNUNET_NO) &&
716        (since.value < 60 * 1000) )
717     return;                     /* not even a minute has passed */
718   *ts = GNUNET_TIME_absolute_get ();
719   *window += (bpm * since.value) / 60 / 1000;
720   if (*window > MAX_WINDOW_TIME * bpm)
721     *window = MAX_WINDOW_TIME * bpm;
722 }
723
724
725 /**
726  * Find the entry for the given neighbour.
727  *
728  * @param peer identity of the neighbour
729  * @return NULL if we are not connected, otherwise the
730  *         neighbour's entry.
731  */
732 static struct Neighbour *
733 find_neighbour (const struct GNUNET_PeerIdentity *peer)
734 {
735   struct Neighbour *ret;
736
737   ret = neighbours;
738   while ((ret != NULL) &&
739          (0 != memcmp (&ret->peer,
740                        peer, sizeof (struct GNUNET_PeerIdentity))))
741     ret = ret->next;
742   return ret;
743 }
744
745
746 /**
747  * Find the entry for the given client.
748  *
749  * @param client handle for the client
750  * @return NULL if we are not connected, otherwise the
751  *         client's struct.
752  */
753 static struct Client *
754 find_client (const struct GNUNET_SERVER_Client *client)
755 {
756   struct Client *ret;
757
758   ret = clients;
759   while ((ret != NULL) && (client != ret->client_handle))
760     ret = ret->next;
761   return ret;
762 }
763
764
765 /**
766  * If necessary, initiate a request with the server to
767  * transmit messages from the queue of the given client.
768  * @param client who to transfer messages to
769  */
770 static void request_transmit (struct Client *client);
771
772
773 /**
774  * Client is ready to receive data, provide it.
775  *
776  * @param cls closure
777  * @param size number of bytes available in buf
778  * @param buf where the callee should write the message
779  * @return number of bytes written to buf
780  */
781 static size_t
782 do_client_transmit (void *cls, size_t size, void *buf)
783 {
784   struct Client *client = cls;
785   struct Event *e;
786   char *tgt;
787   size_t ret;
788
789   client->th = NULL;
790 #if DEBUG_CORE_CLIENT
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "Client ready to receive %u bytes.\n", size);
793 #endif
794   if (buf == NULL)
795     {
796 #if DEBUG_CORE
797       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
798                   "Failed to transmit data to client (disconnect)?\n");
799 #endif
800       return 0;                 /* we'll surely get a disconnect soon... */
801     }
802   tgt = buf;
803   ret = 0;
804   while ((NULL != (e = client->event_head)) && (e->size <= size))
805     {
806       memcpy (&tgt[ret], &e[1], e->size);
807       size -= e->size;
808       ret += e->size;
809       client->event_head = e->next;
810       GNUNET_free (e);
811     }
812   GNUNET_assert (ret > 0);
813   if (client->event_head == NULL)
814     client->event_tail = NULL;
815 #if DEBUG_CORE_CLIENT
816   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817               "Transmitting %u bytes to client\n", ret);
818 #endif
819   request_transmit (client);
820   return ret;
821 }
822
823
824 /**
825  * If necessary, initiate a request with the server to
826  * transmit messages from the queue of the given client.
827  * @param client who to transfer messages to
828  */
829 static void
830 request_transmit (struct Client *client)
831 {
832
833   if (NULL != client->th)
834     return;                     /* already pending */
835   if (NULL == client->event_head)
836     return;                     /* no more events pending */
837 #if DEBUG_CORE_CLIENT
838   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
839               "Asking server to transmit %u bytes to client\n",
840               client->event_head->size);
841 #endif
842   client->th
843     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
844                                            client->event_head->size,
845                                            GNUNET_TIME_UNIT_FOREVER_REL,
846                                            &do_client_transmit, client);
847 }
848
849
850 /**
851  * Send a message to one of our clients.
852  * @param client target for the message
853  * @param msg message to transmit
854  * @param can_drop could this message be dropped if the
855  *        client's queue is getting too large?
856  */
857 static void
858 send_to_client (struct Client *client,
859                 const struct GNUNET_MessageHeader *msg, int can_drop)
860 {
861   struct Event *e;
862   unsigned int queue_size;
863   uint16_t msize;
864
865 #if DEBUG_CORE_CLIENT
866   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867               "Preparing to send message of type %u to client.\n",
868               ntohs (msg->type));
869 #endif
870   queue_size = 0;
871   e = client->event_head;
872   while (e != NULL)
873     {
874       queue_size++;
875       e = e->next;
876     }
877   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
878        (can_drop == GNUNET_YES) )
879     {
880 #if DEBUG_CORE
881       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
882                   "Too many messages in queue for the client, dropping the new message.\n");
883 #endif
884       return;
885     }
886
887   msize = ntohs (msg->size);
888   e = GNUNET_malloc (sizeof (struct Event) + msize);
889   /* append */
890   if (client->event_tail != NULL)
891     client->event_tail->next = e;
892   else
893     client->event_head = e;
894   client->event_tail = e;
895   e->can_drop = can_drop;
896   e->size = msize;
897   memcpy (&e[1], msg, msize);
898   request_transmit (client);
899 }
900
901
902 /**
903  * Send a message to all of our current clients.
904  */
905 static void
906 send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
907 {
908   struct Client *c;
909
910   c = clients;
911   while (c != NULL)
912     {
913       send_to_client (c, msg, can_drop);
914       c = c->next;
915     }
916 }
917
918
919 /**
920  * Handle CORE_INIT request.
921  */
922 static void
923 handle_client_init (void *cls,
924                     struct GNUNET_SERVER_Client *client,
925                     const struct GNUNET_MessageHeader *message)
926 {
927   const struct InitMessage *im;
928   struct InitReplyMessage irm;
929   struct Client *c;
930   uint16_t msize;
931   const uint16_t *types;
932   struct Neighbour *n;
933   struct ConnectNotifyMessage cnm;
934
935 #if DEBUG_CORE_CLIENT
936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
937               "Client connecting to core service with `%s' message\n",
938               "INIT");
939 #endif
940   /* check that we don't have an entry already */
941   c = clients;
942   while (c != NULL)
943     {
944       if (client == c->client_handle)
945         {
946           GNUNET_break (0);
947           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
948           return;
949         }
950       c = c->next;
951     }
952   msize = ntohs (message->size);
953   if (msize < sizeof (struct InitMessage))
954     {
955       GNUNET_break (0);
956       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
957       return;
958     }
959   im = (const struct InitMessage *) message;
960   types = (const uint16_t *) &im[1];
961   msize -= sizeof (struct InitMessage);
962   c = GNUNET_malloc (sizeof (struct Client) + msize);
963   c->client_handle = client;
964   c->next = clients;
965   clients = c;
966   memcpy (&c[1], types, msize);
967   c->types = (uint16_t *) & c[1];
968   c->options = ntohl (im->options);
969   c->tcnt = msize / sizeof (uint16_t);
970   /* send init reply message */
971   irm.header.size = htons (sizeof (struct InitReplyMessage));
972   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
973   irm.reserved = htonl (0);
974   memcpy (&irm.publicKey,
975           &my_public_key,
976           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
977 #if DEBUG_CORE_CLIENT
978   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
979               "Sending `%s' message to client.\n", "INIT_REPLY");
980 #endif
981   send_to_client (c, &irm.header, GNUNET_NO);
982   /* notify new client about existing neighbours */
983   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
984   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
985   n = neighbours;
986   while (n != NULL)
987     {
988 #if DEBUG_CORE_CLIENT
989       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
991 #endif
992       cnm.reserved = htonl (0);
993       cnm.peer = n->peer;
994       send_to_client (c, &cnm.header, GNUNET_NO);
995       n = n->next;
996     }
997   GNUNET_SERVER_receive_done (client, GNUNET_OK);
998 }
999
1000
1001 /**
1002  * A client disconnected, clean up.
1003  *
1004  * @param cls closure
1005  * @param client identification of the client
1006  */
1007 static void
1008 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1009 {
1010   struct Client *pos;
1011   struct Client *prev;
1012   struct Event *e;
1013
1014 #if DEBUG_CORE_CLIENT
1015   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1016               "Client has disconnected from core service.\n");
1017 #endif
1018   prev = NULL;
1019   pos = clients;
1020   while (pos != NULL)
1021     {
1022       if (client == pos->client_handle)
1023         {
1024           if (prev == NULL)
1025             clients = pos->next;
1026           else
1027             prev->next = pos->next;
1028           if (pos->th != NULL)
1029             GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
1030           while (NULL != (e = pos->event_head))
1031             {
1032               pos->event_head = e->next;
1033               GNUNET_free (e);
1034             }
1035           GNUNET_free (pos);
1036           return;
1037         }
1038       prev = pos;
1039       pos = pos->next;
1040     }
1041   /* client never sent INIT */
1042 }
1043
1044
1045 /**
1046  * Handle REQUEST_CONFIGURE request.
1047  */
1048 static void
1049 handle_client_request_configure (void *cls,
1050                                  struct GNUNET_SERVER_Client *client,
1051                                  const struct GNUNET_MessageHeader *message)
1052 {
1053   const struct RequestConfigureMessage *rcm;
1054   struct Neighbour *n;
1055   struct ConfigurationInfoMessage cim;
1056   struct Client *c;
1057   int reserv;
1058   unsigned long long old_preference;
1059
1060 #if DEBUG_CORE_CLIENT
1061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062               "Core service receives `%s' request.\n", "CONFIGURE");
1063 #endif
1064   rcm = (const struct RequestConfigureMessage *) message;
1065   n = find_neighbour (&rcm->peer);
1066   memset (&cim, 0, sizeof (cim));
1067   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1068     {
1069       update_window (GNUNET_YES,
1070                      &n->available_send_window,
1071                      &n->last_asw_update,
1072                      n->bpm_out);
1073       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1074       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1075                                n->bpm_out_external_limit);
1076       reserv = ntohl (rcm->reserve_inbound);
1077       if (reserv < 0)
1078         {
1079           n->available_recv_window += reserv;
1080         }
1081       else if (reserv > 0)
1082         {
1083           update_window (GNUNET_NO,
1084                          &n->available_recv_window,
1085                          &n->last_arw_update, n->bpm_in);
1086           if (n->available_recv_window < reserv)
1087             reserv = n->available_recv_window;
1088           n->available_recv_window -= reserv;
1089         }
1090       old_preference = n->current_preference;
1091       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1092       if (old_preference > n->current_preference) 
1093         {
1094           /* overflow; cap at maximum value */
1095           n->current_preference = (unsigned long long) -1;
1096         }
1097       update_preference_sum (n->current_preference - old_preference);
1098       cim.reserved_amount = htonl (reserv);
1099       cim.bpm_in = htonl (n->bpm_in);
1100       cim.bpm_out = htonl (n->bpm_out);
1101       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1102       cim.preference = n->current_preference;
1103     }
1104   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1105   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1106   cim.peer = rcm->peer;
1107   c = find_client (client);
1108   if (c == NULL)
1109     {
1110       GNUNET_break (0);
1111       return;
1112     }
1113 #if DEBUG_CORE_CLIENT
1114   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1116 #endif
1117   send_to_client (c, &cim.header, GNUNET_NO);
1118 }
1119
1120
1121 /**
1122  * Check if we have encrypted messages for the specified neighbour
1123  * pending, and if so, check with the transport about sending them
1124  * out.
1125  *
1126  * @param n neighbour to check.
1127  */
1128 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1129
1130
1131 /**
1132  * Function called when the transport service is ready to
1133  * receive an encrypted message for the respective peer
1134  *
1135  * @param cls neighbour to use message from
1136  * @param size number of bytes we can transmit
1137  * @param buf where to copy the message
1138  * @return number of bytes transmitted
1139  */
1140 static size_t
1141 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1142 {
1143   struct Neighbour *n = cls;
1144   struct MessageEntry *m;
1145   size_t ret;
1146   char *cbuf;
1147
1148   n->th = NULL;
1149   GNUNET_assert (NULL != (m = n->encrypted_head));
1150   n->encrypted_head = m->next;
1151   if (m->next == NULL)
1152     n->encrypted_tail = NULL;
1153   ret = 0;
1154   cbuf = buf;
1155   if (buf != NULL)
1156     {
1157       GNUNET_assert (size >= m->size);
1158       memcpy (cbuf, &m[1], m->size);
1159       ret = m->size;
1160       n->available_send_window -= m->size;
1161       process_encrypted_neighbour_queue (n);
1162 #if DEBUG_CORE
1163       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1165                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1166                   ret, GNUNET_i2s (&n->peer));
1167 #endif
1168     }
1169   else
1170     {
1171       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1172                   "Transmission for message of type %u and size %u failed\n",
1173                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1174                   m->size);
1175     }
1176   GNUNET_free (m);
1177   return ret;
1178 }
1179
1180
1181 /**
1182  * Check if we have plaintext messages for the specified neighbour
1183  * pending, and if so, consider batching and encrypting them (and
1184  * then trigger processing of the encrypted queue if needed).
1185  *
1186  * @param n neighbour to check.
1187  */
1188 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1189
1190
1191 /**
1192  * Check if we have encrypted messages for the specified neighbour
1193  * pending, and if so, check with the transport about sending them
1194  * out.
1195  *
1196  * @param n neighbour to check.
1197  */
1198 static void
1199 process_encrypted_neighbour_queue (struct Neighbour *n)
1200 {
1201   struct MessageEntry *m;
1202  
1203   if (n->th != NULL)
1204     return;  /* request already pending */
1205   if (n->encrypted_head == NULL)
1206     {
1207       /* encrypted queue empty, try plaintext instead */
1208       process_plaintext_neighbour_queue (n);
1209       return;
1210     }
1211 #if DEBUG_CORE
1212   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1213               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1214               n->encrypted_head->size,
1215               GNUNET_i2s (&n->peer),
1216               GNUNET_TIME_absolute_get_remaining (n->
1217                                                   encrypted_head->deadline).
1218               value);
1219 #endif
1220   n->th =
1221     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1222                                             n->encrypted_head->size,
1223                                             n->encrypted_head->priority,
1224                                             GNUNET_TIME_absolute_get_remaining
1225                                             (n->encrypted_head->deadline),
1226                                             &notify_encrypted_transmit_ready,
1227                                             n);
1228   if (n->th == NULL)
1229     {
1230       /* message request too large (oops) */
1231       GNUNET_break (0);
1232       /* discard encrypted message */
1233       GNUNET_assert (NULL != (m = n->encrypted_head));
1234       n->encrypted_head = m->next;
1235       if (m->next == NULL)
1236         n->encrypted_tail = NULL;
1237       GNUNET_free (m);
1238       process_encrypted_neighbour_queue (n);
1239     }
1240 }
1241
1242
1243 /**
1244  * Decrypt size bytes from in and write the result to out.  Use the
1245  * key for inbound traffic of the given neighbour.  This function does
1246  * NOT do any integrity-checks on the result.
1247  *
1248  * @param n neighbour we are receiving from
1249  * @param iv initialization vector to use
1250  * @param in ciphertext
1251  * @param out plaintext
1252  * @param size size of in/out
1253  * @return GNUNET_OK on success
1254  */
1255 static int
1256 do_decrypt (struct Neighbour *n,
1257             const GNUNET_HashCode * iv,
1258             const void *in, void *out, size_t size)
1259 {
1260   if (size != (uint16_t) size)
1261     {
1262       GNUNET_break (0);
1263       return GNUNET_NO;
1264     }
1265   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1266       (n->status != PEER_STATE_KEY_CONFIRMED))
1267     {
1268       GNUNET_break_op (0);
1269       return GNUNET_SYSERR;
1270     }
1271   if (size !=
1272       GNUNET_CRYPTO_aes_decrypt (&n->decrypt_key,
1273                                  in,
1274                                  (uint16_t) size,
1275                                  (const struct
1276                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1277                                  out))
1278     {
1279       GNUNET_break (0);
1280       return GNUNET_SYSERR;
1281     }
1282 #if DEBUG_CORE
1283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284               "Decrypted %u bytes from `%4s' using key %u\n",
1285               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1286 #endif
1287   return GNUNET_OK;
1288 }
1289
1290
1291 /**
1292  * Encrypt size bytes from in and write the result to out.  Use the
1293  * key for outbound traffic of the given neighbour.
1294  *
1295  * @param n neighbour we are sending to
1296  * @param iv initialization vector to use
1297  * @param in ciphertext
1298  * @param out plaintext
1299  * @param size size of in/out
1300  * @return GNUNET_OK on success
1301  */
1302 static int
1303 do_encrypt (struct Neighbour *n,
1304             const GNUNET_HashCode * iv,
1305             const void *in, void *out, size_t size)
1306 {
1307   if (size != (uint16_t) size)
1308     {
1309       GNUNET_break (0);
1310       return GNUNET_NO;
1311     }
1312   GNUNET_assert (size ==
1313                  GNUNET_CRYPTO_aes_encrypt (in,
1314                                             (uint16_t) size,
1315                                             &n->encrypt_key,
1316                                             (const struct
1317                                              GNUNET_CRYPTO_AesInitializationVector
1318                                              *) iv, out));
1319 #if DEBUG_CORE
1320   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1321               "Encrypted %u bytes for `%4s' using key %u\n", size,
1322               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1323 #endif
1324   return GNUNET_OK;
1325 }
1326
1327
1328 /**
1329  * Select messages for transmission.  This heuristic uses a combination
1330  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1331  * and priority-based discard (in case no feasible schedule exist) and
1332  * speculative optimization (defer any kind of transmission until
1333  * we either create a batch of significant size, 25% of max, or until
1334  * we are close to a deadline).  Furthermore, when scheduling the
1335  * heuristic also packs as many messages into the batch as possible,
1336  * starting with those with the earliest deadline.  Yes, this is fun.
1337  *
1338  * @param n neighbour to select messages from
1339  * @param size number of bytes to select for transmission
1340  * @param retry_time set to the time when we should try again
1341  *        (only valid if this function returns zero)
1342  * @return number of bytes selected, or 0 if we decided to
1343  *         defer scheduling overall; in that case, retry_time is set.
1344  */
1345 static size_t
1346 select_messages (struct Neighbour *n,
1347                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1348 {
1349   struct MessageEntry *pos;
1350   struct MessageEntry *min;
1351   struct MessageEntry *last;
1352   unsigned int min_prio;
1353   struct GNUNET_TIME_Absolute t;
1354   struct GNUNET_TIME_Absolute now;
1355   uint64_t delta;
1356   uint64_t avail;
1357   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1358   size_t off;
1359   int discard_low_prio;
1360
1361   GNUNET_assert (NULL != n->messages);
1362   now = GNUNET_TIME_absolute_get ();
1363   /* last entry in linked list of messages processed */
1364   last = NULL;
1365   /* should we remove the entry with the lowest
1366      priority from consideration for scheduling at the
1367      end of the loop? */
1368   discard_low_prio = GNUNET_YES;
1369   while (GNUNET_YES == discard_low_prio)
1370     {
1371       min = NULL;
1372       min_prio = -1;
1373       discard_low_prio = GNUNET_NO;
1374       /* calculate number of bytes available for transmission at time "t" */
1375       update_window (GNUNET_NO,
1376                      &n->available_send_window,
1377                      &n->last_asw_update,
1378                      n->bpm_out);
1379       avail = n->available_send_window;
1380       t = n->last_asw_update;
1381       /* how many bytes have we (hypothetically) scheduled so far */
1382       off = 0;
1383       /* maximum time we can wait before transmitting anything
1384          and still make all of our deadlines */
1385       slack = -1;
1386
1387       pos = n->messages;
1388       /* note that we use "*2" here because we want to look
1389          a bit further into the future; much more makes no
1390          sense since new message might be scheduled in the
1391          meantime... */
1392       while ((pos != NULL) && (off < size * 2))
1393         {
1394           if (pos->do_transmit == GNUNET_YES)
1395             {
1396               /* already removed from consideration */
1397               pos = pos->next;
1398               continue;
1399             }
1400           if (discard_low_prio == GNUNET_NO)
1401             {
1402               delta = pos->deadline.value;
1403               if (delta < t.value)
1404                 delta = 0;
1405               else
1406                 delta = t.value - delta;
1407               avail += delta * n->bpm_out / 1000 / 60;
1408               if (avail < pos->size)
1409                 {
1410                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1411                 }
1412               else
1413                 {
1414                   avail -= pos->size;
1415                   /* update slack, considering both its absolute deadline
1416                      and relative deadlines caused by other messages
1417                      with their respective load */
1418                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1419                   if (pos->deadline.value < now.value)
1420                     slack = 0;
1421                   else
1422                     slack =
1423                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1424                 }
1425             }
1426           off += pos->size;
1427           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1428           if (pos->priority <= min_prio)
1429             {
1430               /* update min for discard */
1431               min_prio = pos->priority;
1432               min = pos;
1433             }
1434           pos = pos->next;
1435         }
1436       if (discard_low_prio)
1437         {
1438           GNUNET_assert (min != NULL);
1439           /* remove lowest-priority entry from consideration */
1440           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1441         }
1442       last = pos;
1443     }
1444   /* guard against sending "tiny" messages with large headers without
1445      urgent deadlines */
1446   if ( (slack > 1000) && (size > 4 * off) )
1447     {
1448       /* less than 25% of message would be filled with
1449          deadlines still being met if we delay by one
1450          second or more; so just wait for more data */
1451       retry_time->value = slack / 2;
1452       /* reset do_transmit values for next time */
1453       while (pos != last)
1454         {
1455           pos->do_transmit = GNUNET_NO;
1456           pos = pos->next;
1457         }
1458       return 0;
1459     }
1460   /* select marked messages (up to size) for transmission */
1461   off = 0;
1462   pos = n->messages;
1463   while (pos != last)
1464     {
1465       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1466         {
1467           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1468           off += pos->size;
1469           size -= pos->size;
1470         }
1471       else
1472         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1473       pos = pos->next;
1474     }
1475 #if DEBUG_CORE
1476   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1477               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1478               off, GNUNET_i2s (&n->peer));
1479 #endif
1480   return off;
1481 }
1482
1483
1484 /**
1485  * Batch multiple messages into a larger buffer.
1486  *
1487  * @param n neighbour to take messages from
1488  * @param buf target buffer
1489  * @param size size of buf
1490  * @param deadline set to transmission deadline for the result
1491  * @param retry_time set to the time when we should try again
1492  *        (only valid if this function returns zero)
1493  * @param priority set to the priority of the batch
1494  * @return number of bytes written to buf (can be zero)
1495  */
1496 static size_t
1497 batch_message (struct Neighbour *n,
1498                char *buf,
1499                size_t size,
1500                struct GNUNET_TIME_Absolute *deadline,
1501                struct GNUNET_TIME_Relative *retry_time,
1502                unsigned int *priority)
1503 {
1504   struct MessageEntry *pos;
1505   struct MessageEntry *prev;
1506   struct MessageEntry *next;
1507   size_t ret;
1508
1509   ret = 0;
1510   *priority = 0;
1511   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1512   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1513   if (0 == select_messages (n, size, retry_time))
1514     {
1515       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1516                   "No messages selected, will try again in %llu ms\n",
1517                   retry_time->value);
1518       return 0;
1519     }
1520   pos = n->messages;
1521   prev = NULL;
1522   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1523     {
1524       next = pos->next;
1525       if (GNUNET_YES == pos->do_transmit)
1526         {
1527           GNUNET_assert (pos->size <= size);
1528           memcpy (&buf[ret], &pos[1], pos->size);
1529           ret += pos->size;
1530           size -= pos->size;
1531           *priority += pos->priority;
1532           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1533           GNUNET_free (pos);
1534           if (prev == NULL)
1535             n->messages = next;
1536           else
1537             prev->next = next;
1538         }
1539       else
1540         {
1541           prev = pos;
1542         }
1543       pos = next;
1544     }
1545   return ret;
1546 }
1547
1548
1549 /**
1550  * Remove messages with deadlines that have long expired from
1551  * the queue.
1552  *
1553  * @param n neighbour to inspect
1554  */
1555 static void
1556 discard_expired_messages (struct Neighbour *n)
1557 {
1558   struct MessageEntry *prev;
1559   struct MessageEntry *next;
1560   struct MessageEntry *pos;
1561   struct GNUNET_TIME_Absolute now;
1562   struct GNUNET_TIME_Relative delta;
1563
1564   now = GNUNET_TIME_absolute_get ();
1565   prev = NULL;
1566   pos = n->messages;
1567   while (pos != NULL) 
1568     {
1569       next = pos->next;
1570       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1571       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1572         {
1573 #if DEBUG_CORE
1574           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1575                       "Message is %llu ms past due, discarding.\n",
1576                       delta.value);
1577 #endif
1578           if (prev == NULL)
1579             n->messages = next;
1580           else
1581             prev->next = next;
1582           GNUNET_free (pos);
1583         }
1584       else
1585         prev = pos;
1586       pos = next;
1587     }
1588 }
1589
1590
1591 /**
1592  * Signature of the main function of a task.
1593  *
1594  * @param cls closure
1595  * @param tc context information (why was this task triggered now)
1596  */
1597 static void
1598 retry_plaintext_processing (void *cls,
1599                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1600 {
1601   struct Neighbour *n = cls;
1602
1603   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1604   process_plaintext_neighbour_queue (n);
1605 }
1606
1607
1608 /**
1609  * Send our key (and encrypted PING) to the other peer.
1610  *
1611  * @param n the other peer
1612  */
1613 static void send_key (struct Neighbour *n);
1614
1615
1616 /**
1617  * Check if we have plaintext messages for the specified neighbour
1618  * pending, and if so, consider batching and encrypting them (and
1619  * then trigger processing of the encrypted queue if needed).
1620  *
1621  * @param n neighbour to check.
1622  */
1623 static void
1624 process_plaintext_neighbour_queue (struct Neighbour *n)
1625 {
1626   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1627   size_t used;
1628   size_t esize;
1629   struct EncryptedMessage *em;  /* encrypted message */
1630   struct EncryptedMessage *ph;  /* plaintext header */
1631   struct MessageEntry *me;
1632   unsigned int priority;
1633   struct GNUNET_TIME_Absolute deadline;
1634   struct GNUNET_TIME_Relative retry_time;
1635
1636   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1637     {
1638       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1639       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1640     }
1641   switch (n->status)
1642     {
1643     case PEER_STATE_DOWN:
1644       send_key (n);
1645 #if DEBUG_CORE
1646       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1647                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1648                   GNUNET_i2s(&n->peer));
1649 #endif
1650       return;
1651     case PEER_STATE_KEY_SENT:
1652       GNUNET_assert (n->retry_set_key_task !=
1653                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1654 #if DEBUG_CORE
1655       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1656                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1657                   GNUNET_i2s(&n->peer));
1658 #endif
1659       return;
1660     case PEER_STATE_KEY_RECEIVED:
1661       GNUNET_assert (n->retry_set_key_task !=
1662                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1663 #if DEBUG_CORE
1664       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1665                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1666                   GNUNET_i2s(&n->peer));
1667 #endif
1668       return;
1669     case PEER_STATE_KEY_CONFIRMED:
1670       /* ready to continue */
1671       break;
1672     }
1673   discard_expired_messages (n);
1674   if (n->messages == NULL)
1675     {
1676 #if DEBUG_CORE
1677       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1678                   "Plaintext message queue for `%4s' is empty.\n",
1679                   GNUNET_i2s(&n->peer));
1680 #endif
1681       return;                   /* no pending messages */
1682     }
1683   if (n->encrypted_head != NULL)
1684     {
1685 #if DEBUG_CORE
1686       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1687                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1688                   GNUNET_i2s(&n->peer));
1689 #endif
1690       return;                   /* wait for messages already encrypted to be
1691                                    processed first! */
1692     }
1693   ph = (struct EncryptedMessage *) pbuf;
1694   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1695   priority = 0;
1696   used = sizeof (struct EncryptedMessage);
1697   used += batch_message (n,
1698                          &pbuf[used],
1699                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1700                          &deadline, &retry_time, &priority);
1701   if (used == sizeof (struct EncryptedMessage))
1702     {
1703 #if DEBUG_CORE
1704       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1705                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1706                   GNUNET_i2s(&n->peer));
1707 #endif
1708       /* no messages selected for sending, try again later... */
1709       n->retry_plaintext_task =
1710         GNUNET_SCHEDULER_add_delayed (sched,
1711                                       GNUNET_NO,
1712                                       GNUNET_SCHEDULER_PRIORITY_IDLE,
1713                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1714                                       retry_time,
1715                                       &retry_plaintext_processing, n);
1716       return;
1717     }
1718
1719   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1720   ph->inbound_bpm_limit = htonl (n->bpm_in);
1721   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1722
1723   /* setup encryption message header */
1724   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1725   me->deadline = deadline;
1726   me->priority = priority;
1727   me->size = used;
1728   em = (struct EncryptedMessage *) &me[1];
1729   em->header.size = htons (used);
1730   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1731   em->reserved = htonl (0);
1732   esize = used - ENCRYPTED_HEADER_SIZE;
1733   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1734   /* encrypt */
1735 #if DEBUG_CORE
1736   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1737               "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
1738               esize,
1739               GNUNET_i2s(&n->peer));
1740 #endif
1741   GNUNET_assert (GNUNET_OK ==
1742                  do_encrypt (n,
1743                              &em->plaintext_hash,
1744                              &ph->sequence_number,
1745                              &em->sequence_number, esize));
1746   /* append to transmission list */
1747   if (n->encrypted_tail == NULL)
1748     n->encrypted_head = me;
1749   else
1750     n->encrypted_tail->next = me;
1751   n->encrypted_tail = me;
1752   process_encrypted_neighbour_queue (n);
1753 }
1754
1755
1756 /**
1757  * Handle CORE_SEND request.
1758  */
1759 static void
1760 handle_client_send (void *cls,
1761                     struct GNUNET_SERVER_Client *client,
1762                     const struct GNUNET_MessageHeader *message);
1763
1764
1765 /**
1766  * Function called to notify us that we either succeeded
1767  * or failed to connect (at the transport level) to another
1768  * peer.  We should either free the message we were asked
1769  * to transmit or re-try adding it to the queue.
1770  *
1771  * @param cls closure
1772  * @param size number of bytes available in buf
1773  * @param buf where the callee should write the message
1774  * @return number of bytes written to buf
1775  */
1776 static size_t
1777 send_connect_continuation (void *cls, size_t size, void *buf)
1778 {
1779   struct SendMessage *sm = cls;
1780
1781   if (buf == NULL)
1782     {
1783 #if DEBUG_CORE
1784       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1785                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1786                   GNUNET_i2s (&sm->peer));
1787 #endif
1788       GNUNET_free (sm);
1789       return 0;
1790     }
1791 #if DEBUG_CORE
1792   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1793               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1794               GNUNET_i2s (&sm->peer));
1795 #endif
1796   handle_client_send (NULL, NULL, &sm->header);
1797   GNUNET_free (sm);
1798   return 0;
1799 }
1800
1801
1802 /**
1803  * Handle CORE_SEND request.
1804  */
1805 static void
1806 handle_client_send (void *cls,
1807                     struct GNUNET_SERVER_Client *client,
1808                     const struct GNUNET_MessageHeader *message)
1809 {
1810   const struct SendMessage *sm;
1811   struct SendMessage *smc;
1812   const struct GNUNET_MessageHeader *mh;
1813   struct Neighbour *n;
1814   struct MessageEntry *prev;
1815   struct MessageEntry *pos;
1816   struct MessageEntry *e; 
1817   struct MessageEntry *min_prio_entry;
1818   struct MessageEntry *min_prio_prev;
1819   unsigned int min_prio;
1820   unsigned int queue_size;
1821   uint16_t msize;
1822
1823   msize = ntohs (message->size);
1824   if (msize <
1825       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1826     {
1827       GNUNET_break (0);
1828       if (client != NULL)
1829         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1830       return;
1831     }
1832   sm = (const struct SendMessage *) message;
1833   msize -= sizeof (struct SendMessage);
1834   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1835   if (msize != ntohs (mh->size))
1836     {
1837       GNUNET_break (0);
1838       if (client != NULL)
1839         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1840       return;
1841     }
1842   n = find_neighbour (&sm->peer);
1843   if (n == NULL)
1844     {
1845 #if DEBUG_CORE
1846       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1847                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1848                   "SEND",
1849                   GNUNET_i2s (&sm->peer),
1850                   GNUNET_TIME_absolute_get_remaining
1851                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1852 #endif
1853       msize += sizeof (struct SendMessage);
1854       /* ask transport to connect to the peer */
1855       smc = GNUNET_malloc (msize);
1856       memcpy (smc, sm, msize);
1857       if (NULL ==
1858           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1859                                                   &sm->peer,
1860                                                   0, 0,
1861                                                   GNUNET_TIME_absolute_get_remaining
1862                                                   (GNUNET_TIME_absolute_ntoh
1863                                                    (sm->deadline)),
1864                                                   &send_connect_continuation,
1865                                                   smc))
1866         {
1867           /* transport has already a request pending for this peer! */
1868 #if DEBUG_CORE
1869           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1870                       "Dropped second message destined for `%4s' since connection is still down.\n",
1871                       GNUNET_i2s(&sm->peer));
1872 #endif
1873           GNUNET_free (smc);
1874         }
1875       if (client != NULL)
1876         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1877       return;
1878     }
1879 #if DEBUG_CORE
1880   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1881               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1882               "SEND",
1883               msize, 
1884               GNUNET_i2s (&sm->peer));
1885 #endif
1886   /* bound queue size */
1887   discard_expired_messages (n);
1888   min_prio = (unsigned int) -1;
1889   queue_size = 0;
1890   prev = NULL;
1891   pos = n->messages;
1892   while (pos != NULL) 
1893     {
1894       if (pos->priority < min_prio)
1895         {
1896           min_prio_entry = pos;
1897           min_prio_prev = prev;
1898           min_prio = pos->priority;
1899         }
1900       queue_size++;
1901       prev = pos;
1902       pos = pos->next;
1903     }
1904   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1905     {
1906       /* queue full */
1907       if (ntohl(sm->priority) <= min_prio)
1908         {
1909           /* discard new entry */
1910 #if DEBUG_CORE
1911           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1912                       "Queue full, discarding new request\n");
1913 #endif
1914           if (client != NULL)
1915             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1916           return;
1917         }
1918       /* discard "min_prio_entry" */
1919 #if DEBUG_CORE
1920       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1921                   "Queue full, discarding existing older request\n");
1922 #endif
1923       if (min_prio_prev == NULL)
1924         n->messages = min_prio_entry->next;
1925       else
1926         min_prio_prev->next = min_prio_entry->next;      
1927       GNUNET_free (min_prio_entry);     
1928     }
1929   
1930   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1931   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1932   e->priority = ntohl (sm->priority);
1933   e->size = msize;
1934   memcpy (&e[1], mh, msize);
1935
1936   /* insert, keep list sorted by deadline */
1937   prev = NULL;
1938   pos = n->messages;
1939   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1940     {
1941       prev = pos;
1942       pos = pos->next;
1943     }
1944   if (prev == NULL)
1945     n->messages = e;
1946   else
1947     prev->next = e;
1948   e->next = pos;
1949
1950   /* consider scheduling now */
1951   process_plaintext_neighbour_queue (n);
1952   if (client != NULL)
1953     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1954 }
1955
1956
1957 /**
1958  * List of handlers for the messages understood by this
1959  * service.
1960  */
1961 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1962   {&handle_client_init, NULL,
1963    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1964   {&handle_client_request_configure, NULL,
1965    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
1966    sizeof (struct RequestConfigureMessage)},
1967   {&handle_client_send, NULL,
1968    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1969   {NULL, NULL, 0, 0}
1970 };
1971
1972
1973 /**
1974  * PEERINFO is giving us a HELLO for a peer.  Add the
1975  * public key to the neighbour's struct and retry
1976  * send_key.  Or, if we did not get a HELLO, just do
1977  * nothing.
1978  *
1979  * @param cls NULL
1980  * @param peer the peer for which this is the HELLO
1981  * @param hello HELLO message of that peer
1982  * @param trust amount of trust we currently have in that peer
1983  */
1984 static void
1985 process_hello_retry_send_key (void *cls,
1986                               const struct GNUNET_PeerIdentity *peer,
1987                               const struct GNUNET_HELLO_Message *hello,
1988                               uint32_t trust)
1989 {
1990   struct Neighbour *n;
1991
1992   if (peer == NULL)
1993     return;
1994   n = find_neighbour (peer);
1995   if (n == NULL)
1996     return;
1997   if (n->public_key != NULL)
1998     return;
1999 #if DEBUG_CORE
2000   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2001               "Received new `%s' message for `%4s', initiating key exchange.\n",
2002               "HELLO",
2003               GNUNET_i2s (peer));
2004 #endif
2005   n->public_key =
2006     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2007   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2008     {
2009       GNUNET_free (n->public_key);
2010       n->public_key = NULL;
2011       return;
2012     }
2013   send_key (n);
2014 }
2015
2016
2017 /**
2018  * Task that will retry "send_key" if our previous attempt failed
2019  * to yield a PONG.
2020  */
2021 static void
2022 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2023 {
2024   struct Neighbour *n = cls;
2025
2026   GNUNET_assert (n->status != PEER_STATE_KEY_CONFIRMED);
2027   n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2028   n->set_key_retry_frequency =
2029     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2030   send_key (n);
2031 }
2032
2033
2034 /**
2035  * Send our key (and encrypted PING) to the other peer.
2036  *
2037  * @param n the other peer
2038  */
2039 static void
2040 send_key (struct Neighbour *n)
2041 {
2042   struct SetKeyMessage *sm;
2043   struct MessageEntry *me;
2044   struct PingMessage pp;
2045   struct PingMessage *pm;
2046
2047 #if DEBUG_CORE
2048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2049               "Asked to perform key exchange with `%4s'.\n",
2050               GNUNET_i2s (&n->peer));
2051 #endif
2052   if (n->public_key == NULL)
2053     {
2054       /* lookup n's public key, then try again */
2055 #if DEBUG_CORE
2056       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2057                   "Lacking public key for `%4s', trying to obtain one.\n",
2058                   GNUNET_i2s (&n->peer));
2059 #endif
2060       GNUNET_PEERINFO_for_all (cfg,
2061                                sched,
2062                                &n->peer,
2063                                0,
2064                                GNUNET_TIME_UNIT_MINUTES,
2065                                &process_hello_retry_send_key, NULL);
2066       return;
2067     }
2068   /* first, set key message */
2069   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2070                       sizeof (struct SetKeyMessage));
2071   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2072   me->priority = SET_KEY_PRIORITY;
2073   me->size = sizeof (struct SetKeyMessage);
2074   if (n->encrypted_head == NULL)
2075     n->encrypted_head = me;
2076   else
2077     n->encrypted_tail->next = me;
2078   n->encrypted_tail = me;
2079   sm = (struct SetKeyMessage *) &me[1];
2080   sm->header.size = htons (sizeof (struct SetKeyMessage));
2081   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2082   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2083                                         PEER_STATE_KEY_SENT : n->status));
2084   sm->purpose.size =
2085     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2086            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2087            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2088            sizeof (struct GNUNET_PeerIdentity));
2089   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2090   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2091   sm->target = n->peer;
2092   GNUNET_assert (GNUNET_OK ==
2093                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2094                                             sizeof (struct
2095                                                     GNUNET_CRYPTO_AesSessionKey),
2096                                             n->public_key,
2097                                             &sm->encrypted_key));
2098   GNUNET_assert (GNUNET_OK ==
2099                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2100                                          &sm->signature));
2101
2102   /* second, encrypted PING message */
2103   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2104                       sizeof (struct PingMessage));
2105   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2106   me->priority = PING_PRIORITY;
2107   me->size = sizeof (struct PingMessage);
2108   n->encrypted_tail->next = me;
2109   n->encrypted_tail = me;
2110   pm = (struct PingMessage *) &me[1];
2111   pm->header.size = htons (sizeof (struct PingMessage));
2112   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2113   pp.challenge = htonl (n->ping_challenge);
2114   pp.target = n->peer;
2115 #if DEBUG_CORE
2116   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2117               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2118               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2119   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2120               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2121               "PING",
2122               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2123 #endif
2124   do_encrypt (n,
2125               &n->peer.hashPubKey,
2126               &pp.challenge,
2127               &pm->challenge,
2128               sizeof (struct PingMessage) -
2129               sizeof (struct GNUNET_MessageHeader));
2130   /* update status */
2131   switch (n->status)
2132     {
2133     case PEER_STATE_DOWN:
2134       n->status = PEER_STATE_KEY_SENT;
2135       break;
2136     case PEER_STATE_KEY_SENT:
2137       break;
2138     case PEER_STATE_KEY_RECEIVED:
2139       break;
2140     case PEER_STATE_KEY_CONFIRMED:
2141       GNUNET_break (0);
2142       break;
2143     default:
2144       GNUNET_break (0);
2145       break;
2146     }
2147   /* trigger queue processing */
2148   process_encrypted_neighbour_queue (n);
2149   if (n->status != PEER_STATE_KEY_CONFIRMED)
2150     n->retry_set_key_task
2151       = GNUNET_SCHEDULER_add_delayed (sched,
2152                                       GNUNET_NO,
2153                                       GNUNET_SCHEDULER_PRIORITY_KEEP,
2154                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2155                                       n->set_key_retry_frequency,
2156                                       &set_key_retry_task, n);
2157 }
2158
2159
2160 /**
2161  * We received a SET_KEY message.  Validate and update
2162  * our key material and status.
2163  *
2164  * @param n the neighbour from which we received message m
2165  * @param m the set key message we received
2166  */
2167 static void
2168 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2169
2170
2171 /**
2172  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2173  * the neighbour's struct and retry handling the set_key message.  Or,
2174  * if we did not get a HELLO, just free the set key message.
2175  *
2176  * @param cls pointer to the set key message
2177  * @param peer the peer for which this is the HELLO
2178  * @param hello HELLO message of that peer
2179  * @param trust amount of trust we currently have in that peer
2180  */
2181 static void
2182 process_hello_retry_handle_set_key (void *cls,
2183                                     const struct GNUNET_PeerIdentity *peer,
2184                                     const struct GNUNET_HELLO_Message *hello,
2185                                     uint32_t trust)
2186 {
2187   struct SetKeyMessage *sm = cls;
2188   struct Neighbour *n;
2189
2190   if (peer == NULL)
2191     {
2192       GNUNET_free (sm);
2193       return;
2194     }
2195   n = find_neighbour (peer);
2196   if (n == NULL)
2197     {
2198       GNUNET_break (0);
2199       return;
2200     }
2201   if (n->public_key != NULL)
2202     return;                     /* multiple HELLOs match!? */
2203   n->public_key =
2204     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2205   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2206     {
2207       GNUNET_break_op (0);
2208       GNUNET_free (n->public_key);
2209       n->public_key = NULL;
2210       return;
2211     }
2212 #if DEBUG_CORE
2213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2214               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2215               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2216 #endif
2217   handle_set_key (n, sm);
2218 }
2219
2220
2221 /**
2222  * We received a PING message.  Validate and transmit
2223  * PONG.
2224  *
2225  * @param n sender of the PING
2226  * @param m the encrypted PING message itself
2227  */
2228 static void
2229 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2230 {
2231   struct PingMessage t;
2232   struct PingMessage *tp;
2233   struct MessageEntry *me;
2234
2235 #if DEBUG_CORE
2236   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2237               "Core service receives `%s' request from `%4s'.\n",
2238               "PING", GNUNET_i2s (&n->peer));
2239 #endif
2240   if (GNUNET_OK !=
2241       do_decrypt (n,
2242                   &my_identity.hashPubKey,
2243                   &m->challenge,
2244                   &t.challenge,
2245                   sizeof (struct PingMessage) -
2246                   sizeof (struct GNUNET_MessageHeader)))
2247     return;
2248 #if DEBUG_CORE
2249   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2250               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2251               "PING",
2252               GNUNET_i2s (&t.target),
2253               ntohl (t.challenge), n->decrypt_key.crc32);
2254   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2255               "Target of `%s' request is `%4s'.\n",
2256               "PING", GNUNET_i2s (&t.target));
2257 #endif
2258   if (0 != memcmp (&t.target,
2259                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2260     {
2261       GNUNET_break_op (0);
2262       return;
2263     }
2264   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2265                       sizeof (struct PingMessage));
2266   if (n->encrypted_tail != NULL)
2267     n->encrypted_tail->next = me;
2268   else
2269     {
2270       n->encrypted_tail = me;
2271       n->encrypted_head = me;
2272     }
2273   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2274   me->priority = PONG_PRIORITY;
2275   me->size = sizeof (struct PingMessage);
2276   tp = (struct PingMessage *) &me[1];
2277   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2278   tp->header.size = htons (sizeof (struct PingMessage));
2279   do_encrypt (n,
2280               &my_identity.hashPubKey,
2281               &t.challenge,
2282               &tp->challenge,
2283               sizeof (struct PingMessage) -
2284               sizeof (struct GNUNET_MessageHeader));
2285 #if DEBUG_CORE
2286   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2287               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2288               ntohl (t.challenge), n->encrypt_key.crc32);
2289 #endif
2290   /* trigger queue processing */
2291   process_encrypted_neighbour_queue (n);
2292 }
2293
2294
2295 /**
2296  * We received a SET_KEY message.  Validate and update
2297  * our key material and status.
2298  *
2299  * @param n the neighbour from which we received message m
2300  * @param m the set key message we received
2301  */
2302 static void
2303 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2304 {
2305   struct SetKeyMessage *m_cpy;
2306   struct GNUNET_TIME_Absolute t;
2307   struct GNUNET_CRYPTO_AesSessionKey k;
2308   struct PingMessage *ping;
2309   enum PeerStateMachine sender_status;
2310
2311 #if DEBUG_CORE
2312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2313               "Core service receives `%s' request from `%4s'.\n",
2314               "SET_KEY", GNUNET_i2s (&n->peer));
2315 #endif
2316   if (n->public_key == NULL)
2317     {
2318 #if DEBUG_CORE
2319       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2320                   "Lacking public key for peer, trying to obtain one.\n");
2321 #endif
2322       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2323       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2324       /* lookup n's public key, then try again */
2325       GNUNET_PEERINFO_for_all (cfg,
2326                                sched,
2327                                &n->peer,
2328                                0,
2329                                GNUNET_TIME_UNIT_MINUTES,
2330                                &process_hello_retry_handle_set_key, m_cpy);
2331       return;
2332     }
2333   if ((ntohl (m->purpose.size) !=
2334        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2335        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2336        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2337        sizeof (struct GNUNET_PeerIdentity)) ||
2338       (GNUNET_OK !=
2339        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2340                                  &m->purpose, &m->signature, n->public_key)))
2341     {
2342       /* invalid signature */
2343       GNUNET_break_op (0);
2344       return;
2345     }
2346   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2347   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2348        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2349       (t.value < n->decrypt_key_created.value))
2350     {
2351       /* this could rarely happen due to massive re-ordering of
2352          messages on the network level, but is most likely either
2353          a bug or some adversary messing with us.  Report. */
2354       GNUNET_break_op (0);
2355       return;
2356     }
2357 #if DEBUG_CORE
2358   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2359 #endif
2360   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2361                                   &m->encrypted_key,
2362                                   &k,
2363                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2364        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2365       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2366     {
2367       /* failed to decrypt !? */
2368       GNUNET_break_op (0);
2369       return;
2370     }
2371
2372   n->decrypt_key = k;
2373   if (n->decrypt_key_created.value != t.value)
2374     {
2375       /* fresh key, reset sequence numbers */
2376       n->last_sequence_number_received = 0;
2377       n->last_packets_bitmap = 0;
2378       n->decrypt_key_created = t;
2379     }
2380   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2381   switch (n->status)
2382     {
2383     case PEER_STATE_DOWN:
2384       n->status = PEER_STATE_KEY_RECEIVED;
2385 #if DEBUG_CORE
2386       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2387                   "Responding to `%s' with my own key.\n", "SET_KEY");
2388 #endif
2389       send_key (n);
2390       break;
2391     case PEER_STATE_KEY_SENT:
2392     case PEER_STATE_KEY_RECEIVED:
2393       n->status = PEER_STATE_KEY_RECEIVED;
2394       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2395           (sender_status != PEER_STATE_KEY_CONFIRMED))
2396         {
2397 #if DEBUG_CORE
2398           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2399                       "Responding to `%s' with my own key (other peer has status %u).\n",
2400                       "SET_KEY", sender_status);
2401 #endif
2402           send_key (n);
2403         }
2404       break;
2405     case PEER_STATE_KEY_CONFIRMED:
2406       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2407           (sender_status != PEER_STATE_KEY_CONFIRMED))
2408         {
2409 #if DEBUG_CORE
2410           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2411                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2412                       "SET_KEY", sender_status);
2413 #endif
2414           send_key (n);
2415         }
2416       break;
2417     default:
2418       GNUNET_break (0);
2419       break;
2420     }
2421   if (n->pending_ping != NULL)
2422     {
2423       ping = n->pending_ping;
2424       n->pending_ping = NULL;
2425       handle_ping (n, ping);
2426       GNUNET_free (ping);
2427     }
2428 }
2429
2430
2431 /**
2432  * We received a PONG message.  Validate and update
2433  * our status.
2434  *
2435  * @param n sender of the PONG
2436  * @param m the encrypted PONG message itself
2437  */
2438 static void
2439 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2440 {
2441   struct PingMessage t;
2442
2443 #if DEBUG_CORE
2444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2445               "Core service receives `%s' request from `%4s'.\n",
2446               "PONG", GNUNET_i2s (&n->peer));
2447 #endif
2448   if (GNUNET_OK !=
2449       do_decrypt (n,
2450                   &n->peer.hashPubKey,
2451                   &m->challenge,
2452                   &t.challenge,
2453                   sizeof (struct PingMessage) -
2454                   sizeof (struct GNUNET_MessageHeader)))
2455     return;
2456 #if DEBUG_CORE
2457   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2458               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2459               "PONG",
2460               GNUNET_i2s (&t.target),
2461               ntohl (t.challenge), n->decrypt_key.crc32);
2462 #endif
2463   if ((0 != memcmp (&t.target,
2464                     &n->peer,
2465                     sizeof (struct GNUNET_PeerIdentity))) ||
2466       (n->ping_challenge != ntohl (t.challenge)))
2467     {
2468       /* PONG malformed */
2469 #if DEBUG_CORE
2470       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2471                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2472                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2473       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2474                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2475                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2476 #endif
2477       GNUNET_break_op (0);
2478       return;
2479     }
2480   switch (n->status)
2481     {
2482     case PEER_STATE_DOWN:
2483       GNUNET_break (0);         /* should be impossible */
2484       return;
2485     case PEER_STATE_KEY_SENT:
2486       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2487       return;
2488     case PEER_STATE_KEY_RECEIVED:
2489       n->status = PEER_STATE_KEY_CONFIRMED;
2490       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
2491         {
2492           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2493           n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2494         }
2495       process_encrypted_neighbour_queue (n);
2496       break;
2497     case PEER_STATE_KEY_CONFIRMED:
2498       /* duplicate PONG? */
2499       break;
2500     default:
2501       GNUNET_break (0);
2502       break;
2503     }
2504 }
2505
2506
2507 /**
2508  * Send a P2P message to a client.
2509  *
2510  * @param sender who sent us the message?
2511  * @param client who should we give the message to?
2512  * @param m contains the message to transmit
2513  * @param msize number of bytes in buf to transmit
2514  */
2515 static void
2516 send_p2p_message_to_client (struct Neighbour *sender,
2517                             struct Client *client,
2518                             const void *m, size_t msize)
2519 {
2520   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2521   struct NotifyTrafficMessage *ntm;
2522
2523 #if DEBUG_CORE
2524   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2525               "Core service passes message from `%4s' of type %u to client.\n",
2526               GNUNET_i2s(&sender->peer),
2527               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2528 #endif
2529   ntm = (struct NotifyTrafficMessage *) buf;
2530   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2531   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2532   ntm->reserved = htonl (0);
2533   ntm->peer = sender->peer;
2534   memcpy (&ntm[1], m, msize);
2535   send_to_client (client, &ntm->header, GNUNET_YES);
2536 }
2537
2538
2539 /**
2540  * Deliver P2P message to interested clients.
2541  *
2542  * @param sender who sent us the message?
2543  * @param m the message
2544  * @param msize size of the message (including header)
2545  */
2546 static void
2547 deliver_message (struct Neighbour *sender,
2548                  const struct GNUNET_MessageHeader *m, size_t msize)
2549 {
2550   struct Client *cpos;
2551   uint16_t type;
2552   unsigned int tpos;
2553   int deliver_full;
2554
2555   type = ntohs (m->type);
2556   cpos = clients;
2557   while (cpos != NULL)
2558     {
2559       deliver_full = GNUNET_NO;
2560       if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
2561         deliver_full = GNUNET_YES;
2562       else
2563         {
2564           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2565             {
2566               if (type != cpos->types[tpos])
2567                 continue;
2568               deliver_full = GNUNET_YES;
2569               break;
2570             }
2571         }
2572       if (GNUNET_YES == deliver_full)
2573         send_p2p_message_to_client (sender, cpos, m, msize);
2574       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2575         send_p2p_message_to_client (sender, cpos, m,
2576                                     sizeof (struct GNUNET_MessageHeader));
2577       cpos = cpos->next;
2578     }
2579 }
2580
2581
2582 /**
2583  * Align P2P message and then deliver to interested clients.
2584  *
2585  * @param sender who sent us the message?
2586  * @param buffer unaligned (!) buffer containing message
2587  * @param msize size of the message (including header)
2588  */
2589 static void
2590 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2591 {
2592   char abuf[msize];
2593
2594   /* TODO: call to statistics? */
2595   memcpy (abuf, buffer, msize);
2596   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2597 }
2598
2599
2600 /**
2601  * Deliver P2P messages to interested clients.
2602  *
2603  * @param sender who sent us the message?
2604  * @param buffer buffer containing messages, can be modified
2605  * @param buffer_size size of the buffer (overall)
2606  * @param offset offset where messages in the buffer start
2607  */
2608 static void
2609 deliver_messages (struct Neighbour *sender,
2610                   const char *buffer, size_t buffer_size, size_t offset)
2611 {
2612   struct GNUNET_MessageHeader *mhp;
2613   struct GNUNET_MessageHeader mh;
2614   uint16_t msize;
2615   int need_align;
2616
2617   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2618     {
2619       if (0 != offset % sizeof (uint16_t))
2620         {
2621           /* outch, need to copy to access header */
2622           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2623           mhp = &mh;
2624         }
2625       else
2626         {
2627           /* can access header directly */
2628           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2629         }
2630       msize = ntohs (mhp->size);
2631       if (msize + offset > buffer_size)
2632         {
2633           /* malformed message, header says it is larger than what
2634              would fit into the overall buffer */
2635           GNUNET_break_op (0);
2636           break;
2637         }
2638 #if HAVE_UNALIGNED_64_ACCESS
2639       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2640 #else
2641       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2642 #endif
2643       if (GNUNET_YES == need_align)
2644         align_and_deliver (sender, &buffer[offset], msize);
2645       else
2646         deliver_message (sender,
2647                          (const struct GNUNET_MessageHeader *)
2648                          &buffer[offset], msize);
2649       offset += msize;
2650     }
2651 }
2652
2653
2654 /**
2655  * We received an encrypted message.  Decrypt, validate and
2656  * pass on to the appropriate clients.
2657  */
2658 static void
2659 handle_encrypted_message (struct Neighbour *n,
2660                           const struct EncryptedMessage *m)
2661 {
2662   size_t size = ntohs (m->header.size);
2663   char buf[size];
2664   struct EncryptedMessage *pt;  /* plaintext */
2665   GNUNET_HashCode ph;
2666   size_t off;
2667   uint32_t snum;
2668   struct GNUNET_TIME_Absolute t;
2669
2670 #if DEBUG_CORE
2671   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2672               "Core service receives `%s' request from `%4s'.\n",
2673               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2674 #endif  
2675   /* decrypt */
2676   if (GNUNET_OK !=
2677       do_decrypt (n,
2678                   &m->plaintext_hash,
2679                   &m->sequence_number,
2680                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2681     return;
2682   pt = (struct EncryptedMessage *) buf;
2683
2684   /* validate hash */
2685   GNUNET_CRYPTO_hash (&pt->sequence_number,
2686                       size - ENCRYPTED_HEADER_SIZE, &ph);
2687   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2688     {
2689       /* checksum failed */
2690       GNUNET_break_op (0);
2691       return;
2692     }
2693
2694   /* validate sequence number */
2695   snum = ntohl (pt->sequence_number);
2696   if (n->last_sequence_number_received == snum)
2697     {
2698       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2699                   "Received duplicate message, ignoring.\n");
2700       /* duplicate, ignore */
2701       return;
2702     }
2703   if ((n->last_sequence_number_received > snum) &&
2704       (n->last_sequence_number_received - snum > 32))
2705     {
2706       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2707                   "Received ancient out of sequence message, ignoring.\n");
2708       /* ancient out of sequence, ignore */
2709       return;
2710     }
2711   if (n->last_sequence_number_received > snum)
2712     {
2713       unsigned int rotbit =
2714         1 << (n->last_sequence_number_received - snum - 1);
2715       if ((n->last_packets_bitmap & rotbit) != 0)
2716         {
2717           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2718                       "Received duplicate message, ignoring.\n");
2719           /* duplicate, ignore */
2720           return;
2721         }
2722       n->last_packets_bitmap |= rotbit;
2723     }
2724   if (n->last_sequence_number_received < snum)
2725     {
2726       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2727       n->last_sequence_number_received = snum;
2728     }
2729
2730   /* check timestamp */
2731   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2732   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2733     {
2734       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2735                   _
2736                   ("Message received far too old (%llu ms). Content ignored.\n"),
2737                   GNUNET_TIME_absolute_get_duration (t).value);
2738       return;
2739     }
2740
2741   /* process decrypted message(s) */
2742   update_window (GNUNET_YES,
2743                  &n->available_send_window,
2744                  &n->last_asw_update,
2745                  n->bpm_out);
2746   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2747   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2748                            n->bpm_out_internal_limit);
2749   n->last_activity = GNUNET_TIME_absolute_get ();
2750   off = sizeof (struct EncryptedMessage);
2751   deliver_messages (n, buf, size, off);
2752 }
2753
2754
2755 /**
2756  * Function called by the transport for each received message.
2757  *
2758  * @param cls closure
2759  * @param latency estimated latency for communicating with the
2760  *             given peer
2761  * @param peer (claimed) identity of the other peer
2762  * @param message the message
2763  */
2764 static void
2765 handle_transport_receive (void *cls,
2766                           struct GNUNET_TIME_Relative latency,
2767                           const struct GNUNET_PeerIdentity *peer,
2768                           const struct GNUNET_MessageHeader *message)
2769 {
2770   struct Neighbour *n;
2771   struct GNUNET_TIME_Absolute now;
2772   int up;
2773   uint16_t type;
2774   uint16_t size;
2775
2776 #if DEBUG_CORE
2777   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2778               "Received message of type %u from `%4s', demultiplexing.\n",
2779               ntohs (message->type), GNUNET_i2s (peer));
2780 #endif
2781   n = find_neighbour (peer);
2782   if (n == NULL)
2783     {
2784       GNUNET_break (0);
2785       return;
2786     }
2787   n->last_latency = latency;
2788   up = n->status == PEER_STATE_KEY_CONFIRMED;
2789   type = ntohs (message->type);
2790   size = ntohs (message->size);
2791   switch (type)
2792     {
2793     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2794       if (size != sizeof (struct SetKeyMessage))
2795         {
2796           GNUNET_break_op (0);
2797           return;
2798         }
2799       handle_set_key (n, (const struct SetKeyMessage *) message);
2800       break;
2801     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2802       if (size < sizeof (struct EncryptedMessage) +
2803           sizeof (struct GNUNET_MessageHeader))
2804         {
2805           GNUNET_break_op (0);
2806           return;
2807         }
2808       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2809           (n->status != PEER_STATE_KEY_CONFIRMED))
2810         {
2811           GNUNET_break_op (0);
2812           return;
2813         }
2814       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2815       break;
2816     case GNUNET_MESSAGE_TYPE_CORE_PING:
2817       if (size != sizeof (struct PingMessage))
2818         {
2819           GNUNET_break_op (0);
2820           return;
2821         }
2822       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2823           (n->status != PEER_STATE_KEY_CONFIRMED))
2824         {
2825 #if DEBUG_CORE
2826           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2827                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2828                       "PING", GNUNET_i2s (&n->peer));
2829 #endif
2830           GNUNET_free_non_null (n->pending_ping);
2831           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2832           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2833           return;
2834         }
2835       handle_ping (n, (const struct PingMessage *) message);
2836       break;
2837     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2838       if (size != sizeof (struct PingMessage))
2839         {
2840           GNUNET_break_op (0);
2841           return;
2842         }
2843       if ((n->status != PEER_STATE_KEY_SENT) &&
2844           (n->status != PEER_STATE_KEY_RECEIVED) &&
2845           (n->status != PEER_STATE_KEY_CONFIRMED))
2846         {
2847           /* could not decrypt pong, oops! */
2848           GNUNET_break_op (0);
2849           return;
2850         }
2851       handle_pong (n, (const struct PingMessage *) message);
2852       break;
2853     default:
2854       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2855                   _("Unsupported message of type %u received.\n"), type);
2856       return;
2857     }
2858   if (n->status == PEER_STATE_KEY_CONFIRMED)
2859     {
2860       now = GNUNET_TIME_absolute_get ();
2861       n->last_activity = now;
2862       if (!up)
2863         n->time_established = now;
2864     }
2865 }
2866
2867
2868 /**
2869  * Function that recalculates the bandwidth quota for the
2870  * given neighbour and transmits it to the transport service.
2871  * 
2872  * @param cls neighbour for the quota update
2873  * @param tc context
2874  */
2875 static void
2876 neighbour_quota_update (void *cls,
2877                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2878
2879
2880 /**
2881  * Schedule the task that will recalculate the bandwidth
2882  * quota for this peer (and possibly force a disconnect of
2883  * idle peers by calculating a bandwidth of zero).
2884  */
2885 static void
2886 schedule_quota_update (struct Neighbour *n)
2887 {
2888   GNUNET_assert (n->quota_update_task ==
2889                  GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
2890   n->quota_update_task
2891     = GNUNET_SCHEDULER_add_delayed (sched,
2892                                     GNUNET_NO,
2893                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
2894                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2895                                     QUOTA_UPDATE_FREQUENCY,
2896                                     &neighbour_quota_update,
2897                                     n);
2898 }
2899
2900
2901 /**
2902  * Function that recalculates the bandwidth quota for the
2903  * given neighbour and transmits it to the transport service.
2904  * 
2905  * @param cls neighbour for the quota update
2906  * @param tc context
2907  */
2908 static void
2909 neighbour_quota_update (void *cls,
2910                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2911 {
2912   struct Neighbour *n = cls;
2913   uint32_t q_in;
2914   double pref_rel;
2915   double share;
2916   unsigned long long distributable;
2917   
2918   n->quota_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2919   /* calculate relative preference among all neighbours;
2920      divides by a bit more to avoid division by zero AND to
2921      account for possibility of new neighbours joining any time 
2922      AND to convert to double... */
2923   pref_rel = n->current_preference / (1.0 + preference_sum);
2924   share = 0;
2925   distributable = 0;
2926   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2927     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2928   share = distributable * pref_rel;
2929   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2930   /* check if we want to disconnect for good due to inactivity */
2931   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2932        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2933     q_in = 0; /* force disconnect */
2934   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2935        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2936     {
2937       n->bpm_in = q_in;
2938       GNUNET_TRANSPORT_set_quota (transport,
2939                                   &n->peer,
2940                                   n->bpm_in, 
2941                                   n->bpm_out,
2942                                   GNUNET_TIME_UNIT_FOREVER_REL,
2943                                   NULL, NULL);
2944     }
2945   schedule_quota_update (n);
2946 }
2947
2948
2949 /**
2950  * Function called by transport to notify us that
2951  * a peer connected to us (on the network level).
2952  *
2953  * @param cls closure
2954  * @param peer the peer that connected
2955  * @param latency current latency of the connection
2956  */
2957 static void
2958 handle_transport_notify_connect (void *cls,
2959                                  const struct GNUNET_PeerIdentity *peer,
2960                                  struct GNUNET_TIME_Relative latency)
2961 {
2962   struct Neighbour *n;
2963   struct GNUNET_TIME_Absolute now;
2964   struct ConnectNotifyMessage cnm;
2965
2966   n = find_neighbour (peer);
2967   if (n != NULL)
2968     {
2969       /* duplicate connect notification!? */
2970       GNUNET_break (0);
2971       return;
2972     }
2973   now = GNUNET_TIME_absolute_get ();
2974   n = GNUNET_malloc (sizeof (struct Neighbour));
2975   n->next = neighbours;
2976   neighbours = n;
2977   neighbour_count++;
2978   n->peer = *peer;
2979   n->last_latency = latency;
2980   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2981   n->encrypt_key_created = now;
2982   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2983   n->last_asw_update = now;
2984   n->last_arw_update = now;
2985   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2986   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2987   n->bpm_out_internal_limit = (uint32_t) - 1;
2988   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2989   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2990                                                 (uint32_t) - 1);
2991 #if DEBUG_CORE
2992   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2993               "Received connection from `%4s'.\n",
2994               GNUNET_i2s (&n->peer));
2995 #endif
2996   schedule_quota_update (n);
2997   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2998   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2999   cnm.reserved = htonl (0);
3000   cnm.peer = *peer;
3001   send_to_all_clients (&cnm.header, GNUNET_YES);
3002 }
3003
3004
3005 /**
3006  * Free the given entry for the neighbour (it has
3007  * already been removed from the list at this point).
3008  *
3009  * @param n neighbour to free
3010  */
3011 static void
3012 free_neighbour (struct Neighbour *n)
3013 {
3014   struct MessageEntry *m;
3015
3016   while (NULL != (m = n->messages))
3017     {
3018       n->messages = m->next;
3019       GNUNET_free (m);
3020     }
3021   while (NULL != (m = n->encrypted_head))
3022     {
3023       n->encrypted_head = m->next;
3024       GNUNET_free (m);
3025     }
3026   if (NULL != n->th)
3027     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3028   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3029     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3030   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3031     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3032   if (n->quota_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3033     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3034   GNUNET_free_non_null (n->public_key);
3035   GNUNET_free_non_null (n->pending_ping);
3036   GNUNET_free (n);
3037 }
3038
3039
3040 /**
3041  * Function called by transport telling us that a peer
3042  * disconnected.
3043  *
3044  * @param cls closure
3045  * @param peer the peer that disconnected
3046  */
3047 static void
3048 handle_transport_notify_disconnect (void *cls,
3049                                     const struct GNUNET_PeerIdentity *peer)
3050 {
3051   struct ConnectNotifyMessage cnm;
3052   struct Neighbour *n;
3053   struct Neighbour *p;
3054
3055 #if DEBUG_CORE
3056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3057               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3058 #endif
3059   p = NULL;
3060   n = neighbours;
3061   while ((n != NULL) &&
3062          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3063     {
3064       p = n;
3065       n = n->next;
3066     }
3067   if (n == NULL)
3068     {
3069       GNUNET_break (0);
3070       return;
3071     }
3072   if (p == NULL)
3073     neighbours = n->next;
3074   else
3075     p->next = n->next;
3076   GNUNET_assert (neighbour_count > 0);
3077   neighbour_count--;
3078   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3079   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3080   cnm.reserved = htonl (0);
3081   cnm.peer = *peer;
3082   send_to_all_clients (&cnm.header, GNUNET_YES);
3083   free_neighbour (n);
3084 }
3085
3086
3087 /**
3088  * Last task run during shutdown.  Disconnects us from
3089  * the transport.
3090  */
3091 static void
3092 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3093 {
3094   struct Neighbour *n;
3095   struct Client *c;
3096
3097 #if DEBUG_CORE
3098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3099               "Core service shutting down.\n");
3100 #endif
3101   GNUNET_assert (transport != NULL);
3102   GNUNET_TRANSPORT_disconnect (transport);
3103   transport = NULL;
3104   while (NULL != (n = neighbours))
3105     {
3106       neighbours = n->next;
3107       GNUNET_assert (neighbour_count > 0);
3108       neighbour_count--;
3109       free_neighbour (n);
3110     }
3111   while (NULL != (c = clients))
3112     handle_client_disconnect (NULL, c->client_handle);
3113 }
3114
3115
3116 /**
3117  * Initiate core service.
3118  *
3119  * @param cls closure
3120  * @param s scheduler to use
3121  * @param serv the initialized server
3122  * @param c configuration to use
3123  */
3124 static void
3125 run (void *cls,
3126      struct GNUNET_SCHEDULER_Handle *s,
3127      struct GNUNET_SERVER_Handle *serv, struct GNUNET_CONFIGURATION_Handle *c)
3128 {
3129 #if 0
3130   unsigned long long qin;
3131   unsigned long long qout;
3132   unsigned long long tneigh;
3133 #endif
3134   char *keyfile;
3135
3136   sched = s;
3137   cfg = c;
3138   /* parse configuration */
3139   if (
3140        (GNUNET_OK !=
3141         GNUNET_CONFIGURATION_get_value_number (c,
3142                                                "CORE",
3143                                                "TOTAL_QUOTA_IN",
3144                                                &bandwidth_target_in)) ||
3145        (GNUNET_OK !=
3146         GNUNET_CONFIGURATION_get_value_number (c,
3147                                                "CORE",
3148                                                "TOTAL_QUOTA_OUT",
3149                                                &bandwidth_target_out)) ||
3150 #if 0
3151        (GNUNET_OK !=
3152         GNUNET_CONFIGURATION_get_value_number (c,
3153                                                "CORE",
3154                                                "YY",
3155                                                &qout)) ||
3156        (GNUNET_OK !=
3157         GNUNET_CONFIGURATION_get_value_number (c,
3158                                                "CORE",
3159                                                "ZZ_LIMIT", &tneigh)) ||
3160 #endif
3161        (GNUNET_OK !=
3162         GNUNET_CONFIGURATION_get_value_filename (c,
3163                                                  "GNUNETD",
3164                                                  "HOSTKEY", &keyfile)))
3165     {
3166       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3167                   _
3168                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3169       GNUNET_SCHEDULER_shutdown (s);
3170       return;
3171     }
3172   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3173   GNUNET_free (keyfile);
3174   if (my_private_key == NULL)
3175     {
3176       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3177                   _("Core service could not access hostkey.  Exiting.\n"));
3178       GNUNET_SCHEDULER_shutdown (s);
3179       return;
3180     }
3181   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3182   GNUNET_CRYPTO_hash (&my_public_key,
3183                       sizeof (my_public_key), &my_identity.hashPubKey);
3184   /* setup notification */
3185   server = serv;
3186   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3187   /* setup transport connection */
3188   transport = GNUNET_TRANSPORT_connect (sched,
3189                                         cfg,
3190                                         NULL,
3191                                         &handle_transport_receive,
3192                                         &handle_transport_notify_connect,
3193                                         &handle_transport_notify_disconnect);
3194   GNUNET_assert (NULL != transport);
3195   GNUNET_SCHEDULER_add_delayed (sched,
3196                                 GNUNET_YES,
3197                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3198                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
3199                                 GNUNET_TIME_UNIT_FOREVER_REL,
3200                                 &cleaning_task, NULL);
3201   /* process client requests */
3202   GNUNET_SERVER_add_handlers (server, handlers);
3203   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3204               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3205 }
3206
3207
3208 /**
3209  * Function called during shutdown.  Clean up our state.
3210  */
3211 static void
3212 cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
3213 {
3214   if (my_private_key != NULL)
3215     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3216 }
3217
3218
3219 /**
3220  * The main function for the transport service.
3221  *
3222  * @param argc number of arguments from the command line
3223  * @param argv command line arguments
3224  * @return 0 ok, 1 on error
3225  */
3226 int
3227 main (int argc, char *const *argv)
3228 {
3229   return (GNUNET_OK ==
3230           GNUNET_SERVICE_run (argc,
3231                               argv,
3232                               "core", &run, NULL, &cleanup, NULL)) ? 0 : 1;
3233 }
3234
3235 /* end of gnunet-service-core.c */