implemented advertising
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 /**
45  * Receive and send buffer windows grow over time.  For
46  * how long can 'unused' bandwidth accumulate before we
47  * need to cap it?  (specified in ms).
48  */
49 #define MAX_WINDOW_TIME (5 * 60 * 1000)
50
51 /**
52  * Minimum of bytes per minute (out) to assign to any connected peer.
53  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
54  * sense.
55  */
56 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
57
58 /**
59  * What is the smallest change (in number of bytes per minute)
60  * that we consider significant enough to bother triggering?
61  */
62 #define MIN_BPM_CHANGE 32
63
64 /**
65  * After how much time past the "official" expiration time do
66  * we discard messages?  Should not be zero since we may 
67  * intentionally defer transmission until close to the deadline
68  * and then may be slightly past the deadline due to inaccuracy
69  * in sleep and our own CPU consumption.
70  */
71 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
72
73 /**
74  * What is the maximum delay for a SET_KEY message?
75  */
76 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What how long do we wait for SET_KEY confirmation initially?
80  */
81 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
82
83 /**
84  * What is the maximum delay for a PING message?
85  */
86 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
87
88 /**
89  * What is the maximum delay for a PONG message?
90  */
91 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * How often do we recalculate bandwidth quotas?
95  */
96 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * What is the priority for a SET_KEY message?
100  */
101 #define SET_KEY_PRIORITY 0xFFFFFF
102
103 /**
104  * What is the priority for a PING message?
105  */
106 #define PING_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PONG message?
110  */
111 #define PONG_PRIORITY 0xFFFFFF
112
113 /**
114  * How many messages do we queue per peer at most?
115  */
116 #define MAX_PEER_QUEUE_SIZE 16
117
118 /**
119  * How many non-mandatory messages do we queue per client at most?
120  */
121 #define MAX_CLIENT_QUEUE_SIZE 32
122
123 /**
124  * What is the maximum age of a message for us to consider
125  * processing it?  Note that this looks at the timestamp used
126  * by the other peer, so clock skew between machines does
127  * come into play here.  So this should be picked high enough
128  * so that a little bit of clock skew does not prevent peers
129  * from connecting to us.
130  */
131 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
132
133 /**
134  * What is the maximum size for encrypted messages?  Note that this
135  * number imposes a clear limit on the maximum size of any message.
136  * Set to a value close to 64k but not so close that transports will
137  * have trouble with their headers.
138  */
139 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
140
141
142 /**
143  * State machine for our P2P encryption handshake.  Everyone starts in
144  * "DOWN", if we receive the other peer's key (other peer initiated)
145  * we start in state RECEIVED (since we will immediately send our
146  * own); otherwise we start in SENT.  If we get back a PONG from
147  * within either state, we move up to CONFIRMED (the PONG will always
148  * be sent back encrypted with the key we sent to the other peer).
149  */
150 enum PeerStateMachine
151 {
152   PEER_STATE_DOWN,
153   PEER_STATE_KEY_SENT,
154   PEER_STATE_KEY_RECEIVED,
155   PEER_STATE_KEY_CONFIRMED
156 };
157
158
159 /**
160  * Number of bytes (at the beginning) of "struct EncryptedMessage"
161  * that are NOT encrypted.
162  */
163 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
164
165
166 /**
167  * Encapsulation for encrypted messages exchanged between
168  * peers.  Followed by the actual encrypted data.
169  */
170 struct EncryptedMessage
171 {
172   /**
173    * Message type is either CORE_ENCRYPTED_MESSAGE.
174    */
175   struct GNUNET_MessageHeader header;
176
177   /**
178    * Always zero.
179    */
180   uint32_t reserved GNUNET_PACKED;
181
182   /**
183    * Hash of the plaintext, used to verify message integrity;
184    * ALSO used as the IV for the symmetric cipher!  Everything
185    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
186    * must be set to the offset of the next field.
187    */
188   GNUNET_HashCode plaintext_hash;
189
190   /**
191    * Sequence number, in network byte order.  This field
192    * must be the first encrypted/decrypted field and the
193    * first byte that is hashed for the plaintext hash.
194    */
195   uint32_t sequence_number GNUNET_PACKED;
196
197   /**
198    * Desired bandwidth (how much we should send to this
199    * peer / how much is the sender willing to receive),
200    * in bytes per minute.
201    */
202   uint32_t inbound_bpm_limit GNUNET_PACKED;
203
204   /**
205    * Timestamp.  Used to prevent reply of ancient messages
206    * (recent messages are caught with the sequence number).
207    */
208   struct GNUNET_TIME_AbsoluteNBO timestamp;
209
210 };
211
212 /**
213  * We're sending an (encrypted) PING to the other peer to check if he
214  * can decrypt.  The other peer should respond with a PONG with the
215  * same content, except this time encrypted with the receiver's key.
216  */
217 struct PingMessage
218 {
219   /**
220    * Message type is either CORE_PING or CORE_PONG.
221    */
222   struct GNUNET_MessageHeader header;
223
224   /**
225    * Random number chosen to make reply harder.
226    */
227   uint32_t challenge GNUNET_PACKED;
228
229   /**
230    * Intended target of the PING, used primarily to check
231    * that decryption actually worked.
232    */
233   struct GNUNET_PeerIdentity target;
234 };
235
236
237 /**
238  * Message transmitted to set (or update) a session key.
239  */
240 struct SetKeyMessage
241 {
242
243   /**
244    * Message type is either CORE_SET_KEY.
245    */
246   struct GNUNET_MessageHeader header;
247
248   /**
249    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
250    */
251   int32_t sender_status GNUNET_PACKED;
252
253   /**
254    * Purpose of the signature, will be
255    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
256    */
257   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
258
259   /**
260    * At what time was this key created?
261    */
262   struct GNUNET_TIME_AbsoluteNBO creation_time;
263
264   /**
265    * The encrypted session key.
266    */
267   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
268
269   /**
270    * Who is the intended recipient?
271    */
272   struct GNUNET_PeerIdentity target;
273
274   /**
275    * Signature of the stuff above (starting at purpose).
276    */
277   struct GNUNET_CRYPTO_RsaSignature signature;
278
279 };
280
281
282 /**
283  * Message waiting for transmission. This struct
284  * is followed by the actual content of the message.
285  */
286 struct MessageEntry
287 {
288
289   /**
290    * We keep messages in a linked list (for now).
291    */
292   struct MessageEntry *next;
293
294   /**
295    * By when are we supposed to transmit this message?
296    */
297   struct GNUNET_TIME_Absolute deadline;
298
299   /**
300    * How important is this message to us?
301    */
302   unsigned int priority;
303
304   /**
305    * How long is the message? (number of bytes following
306    * the "struct MessageEntry", but not including the
307    * size of "struct MessageEntry" itself!)
308    */
309   uint16_t size;
310
311   /**
312    * Was this message selected for transmission in the
313    * current round? GNUNET_YES or GNUNET_NO.
314    */
315   int16_t do_transmit;
316
317 };
318
319
320 struct Neighbour
321 {
322   /**
323    * We keep neighbours in a linked list (for now).
324    */
325   struct Neighbour *next;
326
327   /**
328    * Unencrypted messages destined for this peer.
329    */
330   struct MessageEntry *messages;
331
332   /**
333    * Head of the batched, encrypted message queue (already ordered,
334    * transmit starting with the head).
335    */
336   struct MessageEntry *encrypted_head;
337
338   /**
339    * Tail of the batched, encrypted message queue (already ordered,
340    * append new messages to tail)
341    */
342   struct MessageEntry *encrypted_tail;
343
344   /**
345    * Handle for pending requests for transmission to this peer
346    * with the transport service.  NULL if no request is pending.
347    */
348   struct GNUNET_TRANSPORT_TransmitHandle *th;
349
350   /**
351    * Public key of the neighbour, NULL if we don't have it yet.
352    */
353   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
354
355   /**
356    * We received a PING message before we got the "public_key"
357    * (or the SET_KEY).  We keep it here until we have a key
358    * to decrypt it.  NULL if no PING is pending.
359    */
360   struct PingMessage *pending_ping;
361
362   /**
363    * Identity of the neighbour.
364    */
365   struct GNUNET_PeerIdentity peer;
366
367   /**
368    * Key we use to encrypt our messages for the other peer
369    * (initialized by us when we do the handshake).
370    */
371   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
372
373   /**
374    * Key we use to decrypt messages from the other peer
375    * (given to us by the other peer during the handshake).
376    */
377   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
378
379   /**
380    * ID of task used for re-trying plaintext scheduling.
381    */
382   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
383
384   /**
385    * ID of task used for re-trying SET_KEY and PING message.
386    */
387   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
388
389   /**
390    * ID of task used for updating bandwidth quota for this neighbour.
391    */
392   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
393
394   /**
395    * At what time did we generate our encryption key?
396    */
397   struct GNUNET_TIME_Absolute encrypt_key_created;
398
399   /**
400    * At what time did the other peer generate the decryption key?
401    */
402   struct GNUNET_TIME_Absolute decrypt_key_created;
403
404   /**
405    * At what time did we initially establish (as in, complete session
406    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
407    */
408   struct GNUNET_TIME_Absolute time_established;
409
410   /**
411    * At what time did we last receive an encrypted message from the
412    * other peer?  Should be zero if status != KEY_CONFIRMED.
413    */
414   struct GNUNET_TIME_Absolute last_activity;
415
416   /**
417    * Last latency observed from this peer.
418    */
419   struct GNUNET_TIME_Relative last_latency;
420
421   /**
422    * At what frequency are we currently re-trying SET_KEY messages?
423    */
424   struct GNUNET_TIME_Relative set_key_retry_frequency;
425
426   /**
427    * Time of our last update to the "available_send_window".
428    */
429   struct GNUNET_TIME_Absolute last_asw_update;
430
431   /**
432    * Time of our last update to the "available_recv_window".
433    */
434   struct GNUNET_TIME_Absolute last_arw_update;
435
436   /**
437    * Number of bytes that we are eligible to transmit to this
438    * peer at this point.  Incremented every minute by max_out_bpm,
439    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
440    * bandwidth-hogs are sampled at a frequency of about 78s!);
441    * may get negative if we have VERY high priority content.
442    */
443   long long available_send_window; 
444
445   /**
446    * How much downstream capacity of this peer has been reserved for
447    * our traffic?  (Our clients can request that a certain amount of
448    * bandwidth is available for replies to them; this value is used to
449    * make sure that this reserved amount of bandwidth is actually
450    * available).
451    */
452   long long available_recv_window; 
453
454   /**
455    * How valueable were the messages of this peer recently?
456    */
457   unsigned long long current_preference;
458
459   /**
460    * Bit map indicating which of the 32 sequence numbers before the last
461    * were received (good for accepting out-of-order packets and
462    * estimating reliability of the connection)
463    */
464   unsigned int last_packets_bitmap;
465
466   /**
467    * Number of messages in the message queue for this peer.
468    */
469   unsigned int message_queue_size;
470
471   /**
472    * last sequence number received on this connection (highest)
473    */
474   uint32_t last_sequence_number_received;
475
476   /**
477    * last sequence number transmitted
478    */
479   uint32_t last_sequence_number_sent;
480
481   /**
482    * Available bandwidth in for this peer (current target).
483    */
484   uint32_t bpm_in;
485
486   /**
487    * Available bandwidth out for this peer (current target).
488    */
489   uint32_t bpm_out;
490
491   /**
492    * Internal bandwidth limit set for this peer (initially
493    * typically set to "-1").  "bpm_out" is MAX of
494    * "bpm_out_internal_limit" and "bpm_out_external_limit".
495    */
496   uint32_t bpm_out_internal_limit;
497
498   /**
499    * External bandwidth limit set for this peer by the
500    * peer that we are communicating with.  "bpm_out" is MAX of
501    * "bpm_out_internal_limit" and "bpm_out_external_limit".
502    */
503   uint32_t bpm_out_external_limit;
504
505   /**
506    * What was our PING challenge number (for this peer)?
507    */
508   uint32_t ping_challenge;
509
510   /**
511    * What is our connection status?
512    */
513   enum PeerStateMachine status;
514
515 };
516
517
518 /**
519  * Events are messages for clients.  The struct
520  * itself is followed by the actual message.
521  */
522 struct Event
523 {
524   /**
525    * This is a linked list.
526    */
527   struct Event *next;
528
529   /**
530    * Size of the message.
531    */
532   size_t size;
533
534   /**
535    * Could this event be dropped if this queue
536    * is getting too large? (NOT YET USED!)
537    */
538   int can_drop;
539
540 };
541
542
543 /**
544  * Data structure for each client connected to the core service.
545  */
546 struct Client
547 {
548   /**
549    * Clients are kept in a linked list.
550    */
551   struct Client *next;
552
553   /**
554    * Handle for the client with the server API.
555    */
556   struct GNUNET_SERVER_Client *client_handle;
557
558   /**
559    * Linked list of messages we still need to deliver to
560    * this client.
561    */
562   struct Event *event_head;
563
564   /**
565    * Tail of the linked list of events.
566    */
567   struct Event *event_tail;
568
569   /**
570    * Current transmit handle, NULL if no transmission request
571    * is pending.
572    */
573   struct GNUNET_NETWORK_TransmitHandle *th;
574
575   /**
576    * Array of the types of messages this peer cares
577    * about (with "tcnt" entries).  Allocated as part
578    * of this client struct, do not free!
579    */
580   uint16_t *types;
581
582   /**
583    * Options for messages this client cares about,
584    * see GNUNET_CORE_OPTION_ values.
585    */
586   uint32_t options;
587
588   /**
589    * Number of types of incoming messages this client
590    * specifically cares about.  Size of the "types" array.
591    */
592   unsigned int tcnt;
593
594 };
595
596
597 /**
598  * Our public key.
599  */
600 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
601
602 /**
603  * Our identity.
604  */
605 static struct GNUNET_PeerIdentity my_identity;
606
607 /**
608  * Our private key.
609  */
610 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
611
612 /**
613  * Our scheduler.
614  */
615 struct GNUNET_SCHEDULER_Handle *sched;
616
617 /**
618  * Our configuration.
619  */
620 struct GNUNET_CONFIGURATION_Handle *cfg;
621
622 /**
623  * Our server.
624  */
625 static struct GNUNET_SERVER_Handle *server;
626
627 /**
628  * Transport service.
629  */
630 static struct GNUNET_TRANSPORT_Handle *transport;
631
632 /**
633  * Linked list of our clients.
634  */
635 static struct Client *clients;
636
637 /**
638  * We keep neighbours in a linked list (for now).
639  */
640 static struct Neighbour *neighbours;
641
642 /**
643  * Sum of all preferences among all neighbours.
644  */
645 static unsigned long long preference_sum;
646
647 /**
648  * Total number of neighbours we have.
649  */
650 static unsigned int neighbour_count;
651
652 /**
653  * How much inbound bandwidth are we supposed to be using?
654  */
655 static unsigned long long bandwidth_target_in;
656
657 /**
658  * How much outbound bandwidth are we supposed to be using?
659  */
660 static unsigned long long bandwidth_target_out;
661
662
663
664 /**
665  * A preference value for a neighbour was update.  Update
666  * the preference sum accordingly.
667  *
668  * @param inc how much was a preference value increased?
669  */
670 static void
671 update_preference_sum (unsigned long long inc)
672 {
673   struct Neighbour *n;
674   unsigned long long os;
675
676   os = preference_sum;
677   preference_sum += inc;
678   if (preference_sum >= os)
679     return; /* done! */
680   /* overflow! compensate by cutting all values in half! */
681   preference_sum = 0;
682   n = neighbours;
683   while (n != NULL)
684     {
685       n->current_preference /= 2;
686       preference_sum += n->current_preference;
687       n = n->next;
688     }    
689 }
690
691
692 /**
693  * Recalculate the number of bytes we expect to
694  * receive or transmit in a given window.
695  *
696  * @param force force an update now (even if not much time has passed)
697  * @param window pointer to the byte counter (updated)
698  * @param ts pointer to the timestamp (updated)
699  * @param bpm number of bytes per minute that should
700  *        be added to the window.
701  */
702 static void
703 update_window (int force,
704                long long *window,
705                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
706 {
707   struct GNUNET_TIME_Relative since;
708
709   since = GNUNET_TIME_absolute_get_duration (*ts);
710   if ( (force == GNUNET_NO) &&
711        (since.value < 60 * 1000) )
712     return;                     /* not even a minute has passed */
713   *ts = GNUNET_TIME_absolute_get ();
714   *window += (bpm * since.value) / 60 / 1000;
715   if (*window > MAX_WINDOW_TIME * bpm)
716     *window = MAX_WINDOW_TIME * bpm;
717 }
718
719
720 /**
721  * Find the entry for the given neighbour.
722  *
723  * @param peer identity of the neighbour
724  * @return NULL if we are not connected, otherwise the
725  *         neighbour's entry.
726  */
727 static struct Neighbour *
728 find_neighbour (const struct GNUNET_PeerIdentity *peer)
729 {
730   struct Neighbour *ret;
731
732   ret = neighbours;
733   while ((ret != NULL) &&
734          (0 != memcmp (&ret->peer,
735                        peer, sizeof (struct GNUNET_PeerIdentity))))
736     ret = ret->next;
737   return ret;
738 }
739
740
741 /**
742  * Find the entry for the given client.
743  *
744  * @param client handle for the client
745  * @return NULL if we are not connected, otherwise the
746  *         client's struct.
747  */
748 static struct Client *
749 find_client (const struct GNUNET_SERVER_Client *client)
750 {
751   struct Client *ret;
752
753   ret = clients;
754   while ((ret != NULL) && (client != ret->client_handle))
755     ret = ret->next;
756   return ret;
757 }
758
759
760 /**
761  * If necessary, initiate a request with the server to
762  * transmit messages from the queue of the given client.
763  * @param client who to transfer messages to
764  */
765 static void request_transmit (struct Client *client);
766
767
768 /**
769  * Client is ready to receive data, provide it.
770  *
771  * @param cls closure
772  * @param size number of bytes available in buf
773  * @param buf where the callee should write the message
774  * @return number of bytes written to buf
775  */
776 static size_t
777 do_client_transmit (void *cls, size_t size, void *buf)
778 {
779   struct Client *client = cls;
780   struct Event *e;
781   char *tgt;
782   size_t ret;
783
784   client->th = NULL;
785 #if DEBUG_CORE_CLIENT
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "Client ready to receive %u bytes.\n", size);
788 #endif
789   if (buf == NULL)
790     {
791 #if DEBUG_CORE
792       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
793                   "Failed to transmit data to client (disconnect)?\n");
794 #endif
795       return 0;                 /* we'll surely get a disconnect soon... */
796     }
797   tgt = buf;
798   ret = 0;
799   while ((NULL != (e = client->event_head)) && (e->size <= size))
800     {
801       memcpy (&tgt[ret], &e[1], e->size);
802       size -= e->size;
803       ret += e->size;
804       client->event_head = e->next;
805       GNUNET_free (e);
806     }
807   GNUNET_assert (ret > 0);
808   if (client->event_head == NULL)
809     client->event_tail = NULL;
810 #if DEBUG_CORE_CLIENT
811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812               "Transmitting %u bytes to client\n", ret);
813 #endif
814   request_transmit (client);
815   return ret;
816 }
817
818
819 /**
820  * If necessary, initiate a request with the server to
821  * transmit messages from the queue of the given client.
822  * @param client who to transfer messages to
823  */
824 static void
825 request_transmit (struct Client *client)
826 {
827
828   if (NULL != client->th)
829     return;                     /* already pending */
830   if (NULL == client->event_head)
831     return;                     /* no more events pending */
832 #if DEBUG_CORE_CLIENT
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834               "Asking server to transmit %u bytes to client\n",
835               client->event_head->size);
836 #endif
837   client->th
838     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
839                                            client->event_head->size,
840                                            GNUNET_TIME_UNIT_FOREVER_REL,
841                                            &do_client_transmit, client);
842 }
843
844
845 /**
846  * Send a message to one of our clients.
847  * @param client target for the message
848  * @param msg message to transmit
849  * @param can_drop could this message be dropped if the
850  *        client's queue is getting too large?
851  */
852 static void
853 send_to_client (struct Client *client,
854                 const struct GNUNET_MessageHeader *msg, int can_drop)
855 {
856   struct Event *e;
857   unsigned int queue_size;
858   uint16_t msize;
859
860 #if DEBUG_CORE_CLIENT
861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862               "Preparing to send message of type %u to client.\n",
863               ntohs (msg->type));
864 #endif
865   queue_size = 0;
866   e = client->event_head;
867   while (e != NULL)
868     {
869       queue_size++;
870       e = e->next;
871     }
872   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
873        (can_drop == GNUNET_YES) )
874     {
875 #if DEBUG_CORE
876       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
877                   "Too many messages in queue for the client, dropping the new message.\n");
878 #endif
879       return;
880     }
881
882   msize = ntohs (msg->size);
883   e = GNUNET_malloc (sizeof (struct Event) + msize);
884   /* append */
885   if (client->event_tail != NULL)
886     client->event_tail->next = e;
887   else
888     client->event_head = e;
889   client->event_tail = e;
890   e->can_drop = can_drop;
891   e->size = msize;
892   memcpy (&e[1], msg, msize);
893   request_transmit (client);
894 }
895
896
897 /**
898  * Send a message to all of our current clients.
899  */
900 static void
901 send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
902 {
903   struct Client *c;
904
905   c = clients;
906   while (c != NULL)
907     {
908       send_to_client (c, msg, can_drop);
909       c = c->next;
910     }
911 }
912
913
914 /**
915  * Handle CORE_INIT request.
916  */
917 static void
918 handle_client_init (void *cls,
919                     struct GNUNET_SERVER_Client *client,
920                     const struct GNUNET_MessageHeader *message)
921 {
922   const struct InitMessage *im;
923   struct InitReplyMessage irm;
924   struct Client *c;
925   uint16_t msize;
926   const uint16_t *types;
927   struct Neighbour *n;
928   struct ConnectNotifyMessage cnm;
929
930 #if DEBUG_CORE_CLIENT
931   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
932               "Client connecting to core service with `%s' message\n",
933               "INIT");
934 #endif
935   /* check that we don't have an entry already */
936   c = clients;
937   while (c != NULL)
938     {
939       if (client == c->client_handle)
940         {
941           GNUNET_break (0);
942           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
943           return;
944         }
945       c = c->next;
946     }
947   msize = ntohs (message->size);
948   if (msize < sizeof (struct InitMessage))
949     {
950       GNUNET_break (0);
951       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
952       return;
953     }
954   im = (const struct InitMessage *) message;
955   types = (const uint16_t *) &im[1];
956   msize -= sizeof (struct InitMessage);
957   c = GNUNET_malloc (sizeof (struct Client) + msize);
958   c->client_handle = client;
959   c->next = clients;
960   clients = c;
961   memcpy (&c[1], types, msize);
962   c->types = (uint16_t *) & c[1];
963   c->options = ntohl (im->options);
964   c->tcnt = msize / sizeof (uint16_t);
965   /* send init reply message */
966   irm.header.size = htons (sizeof (struct InitReplyMessage));
967   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
968   irm.reserved = htonl (0);
969   memcpy (&irm.publicKey,
970           &my_public_key,
971           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
972 #if DEBUG_CORE_CLIENT
973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974               "Sending `%s' message to client.\n", "INIT_REPLY");
975 #endif
976   send_to_client (c, &irm.header, GNUNET_NO);
977   /* notify new client about existing neighbours */
978   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
979   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
980   n = neighbours;
981   while (n != NULL)
982     {
983 #if DEBUG_CORE_CLIENT
984       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
986 #endif
987       cnm.reserved = htonl (0);
988       cnm.peer = n->peer;
989       send_to_client (c, &cnm.header, GNUNET_NO);
990       n = n->next;
991     }
992   GNUNET_SERVER_receive_done (client, GNUNET_OK);
993 }
994
995
996 /**
997  * A client disconnected, clean up.
998  *
999  * @param cls closure
1000  * @param client identification of the client
1001  */
1002 static void
1003 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1004 {
1005   struct Client *pos;
1006   struct Client *prev;
1007   struct Event *e;
1008
1009 #if DEBUG_CORE_CLIENT
1010   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1011               "Client has disconnected from core service.\n");
1012 #endif
1013   prev = NULL;
1014   pos = clients;
1015   while (pos != NULL)
1016     {
1017       if (client == pos->client_handle)
1018         {
1019           if (prev == NULL)
1020             clients = pos->next;
1021           else
1022             prev->next = pos->next;
1023           if (pos->th != NULL)
1024             GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
1025           while (NULL != (e = pos->event_head))
1026             {
1027               pos->event_head = e->next;
1028               GNUNET_free (e);
1029             }
1030           GNUNET_free (pos);
1031           return;
1032         }
1033       prev = pos;
1034       pos = pos->next;
1035     }
1036   /* client never sent INIT */
1037 }
1038
1039
1040 /**
1041  * Handle REQUEST_CONFIGURE request.
1042  */
1043 static void
1044 handle_client_request_configure (void *cls,
1045                                  struct GNUNET_SERVER_Client *client,
1046                                  const struct GNUNET_MessageHeader *message)
1047 {
1048   const struct RequestConfigureMessage *rcm;
1049   struct Neighbour *n;
1050   struct ConfigurationInfoMessage cim;
1051   struct Client *c;
1052   int reserv;
1053   unsigned long long old_preference;
1054
1055 #if DEBUG_CORE_CLIENT
1056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057               "Core service receives `%s' request.\n", "CONFIGURE");
1058 #endif
1059   rcm = (const struct RequestConfigureMessage *) message;
1060   n = find_neighbour (&rcm->peer);
1061   memset (&cim, 0, sizeof (cim));
1062   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1063     {
1064       update_window (GNUNET_YES,
1065                      &n->available_send_window,
1066                      &n->last_asw_update,
1067                      n->bpm_out);
1068       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1069       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1070                                n->bpm_out_external_limit);
1071       reserv = ntohl (rcm->reserve_inbound);
1072       if (reserv < 0)
1073         {
1074           n->available_recv_window += reserv;
1075         }
1076       else if (reserv > 0)
1077         {
1078           update_window (GNUNET_NO,
1079                          &n->available_recv_window,
1080                          &n->last_arw_update, n->bpm_in);
1081           if (n->available_recv_window < reserv)
1082             reserv = n->available_recv_window;
1083           n->available_recv_window -= reserv;
1084         }
1085       old_preference = n->current_preference;
1086       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1087       if (old_preference > n->current_preference) 
1088         {
1089           /* overflow; cap at maximum value */
1090           n->current_preference = (unsigned long long) -1;
1091         }
1092       update_preference_sum (n->current_preference - old_preference);
1093       cim.reserved_amount = htonl (reserv);
1094       cim.bpm_in = htonl (n->bpm_in);
1095       cim.bpm_out = htonl (n->bpm_out);
1096       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1097       cim.preference = n->current_preference;
1098     }
1099   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1100   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1101   cim.peer = rcm->peer;
1102   c = find_client (client);
1103   if (c == NULL)
1104     {
1105       GNUNET_break (0);
1106       return;
1107     }
1108 #if DEBUG_CORE_CLIENT
1109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1111 #endif
1112   send_to_client (c, &cim.header, GNUNET_NO);
1113 }
1114
1115
1116 /**
1117  * Check if we have encrypted messages for the specified neighbour
1118  * pending, and if so, check with the transport about sending them
1119  * out.
1120  *
1121  * @param n neighbour to check.
1122  */
1123 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1124
1125
1126 /**
1127  * Function called when the transport service is ready to
1128  * receive an encrypted message for the respective peer
1129  *
1130  * @param cls neighbour to use message from
1131  * @param size number of bytes we can transmit
1132  * @param buf where to copy the message
1133  * @return number of bytes transmitted
1134  */
1135 static size_t
1136 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1137 {
1138   struct Neighbour *n = cls;
1139   struct MessageEntry *m;
1140   size_t ret;
1141   char *cbuf;
1142
1143   n->th = NULL;
1144   GNUNET_assert (NULL != (m = n->encrypted_head));
1145   n->encrypted_head = m->next;
1146   if (m->next == NULL)
1147     n->encrypted_tail = NULL;
1148   ret = 0;
1149   cbuf = buf;
1150   if (buf != NULL)
1151     {
1152       GNUNET_assert (size >= m->size);
1153       memcpy (cbuf, &m[1], m->size);
1154       ret = m->size;
1155       n->available_send_window -= m->size;
1156       process_encrypted_neighbour_queue (n);
1157 #if DEBUG_CORE
1158       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1160                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1161                   ret, GNUNET_i2s (&n->peer));
1162 #endif
1163     }
1164   else
1165     {
1166       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1167                   "Transmission for message of type %u and size %u failed\n",
1168                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1169                   m->size);
1170     }
1171   GNUNET_free (m);
1172   return ret;
1173 }
1174
1175
1176 /**
1177  * Check if we have plaintext messages for the specified neighbour
1178  * pending, and if so, consider batching and encrypting them (and
1179  * then trigger processing of the encrypted queue if needed).
1180  *
1181  * @param n neighbour to check.
1182  */
1183 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1184
1185
1186 /**
1187  * Check if we have encrypted messages for the specified neighbour
1188  * pending, and if so, check with the transport about sending them
1189  * out.
1190  *
1191  * @param n neighbour to check.
1192  */
1193 static void
1194 process_encrypted_neighbour_queue (struct Neighbour *n)
1195 {
1196   struct MessageEntry *m;
1197  
1198   if (n->th != NULL)
1199     return;  /* request already pending */
1200   if (n->encrypted_head == NULL)
1201     {
1202       /* encrypted queue empty, try plaintext instead */
1203       process_plaintext_neighbour_queue (n);
1204       return;
1205     }
1206 #if DEBUG_CORE
1207   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1209               n->encrypted_head->size,
1210               GNUNET_i2s (&n->peer),
1211               GNUNET_TIME_absolute_get_remaining (n->
1212                                                   encrypted_head->deadline).
1213               value);
1214 #endif
1215   n->th =
1216     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1217                                             n->encrypted_head->size,
1218                                             n->encrypted_head->priority,
1219                                             GNUNET_TIME_absolute_get_remaining
1220                                             (n->encrypted_head->deadline),
1221                                             &notify_encrypted_transmit_ready,
1222                                             n);
1223   if (n->th == NULL)
1224     {
1225       /* message request too large (oops) */
1226       GNUNET_break (0);
1227       /* discard encrypted message */
1228       GNUNET_assert (NULL != (m = n->encrypted_head));
1229       n->encrypted_head = m->next;
1230       if (m->next == NULL)
1231         n->encrypted_tail = NULL;
1232       GNUNET_free (m);
1233       process_encrypted_neighbour_queue (n);
1234     }
1235 }
1236
1237
1238 /**
1239  * Decrypt size bytes from in and write the result to out.  Use the
1240  * key for inbound traffic of the given neighbour.  This function does
1241  * NOT do any integrity-checks on the result.
1242  *
1243  * @param n neighbour we are receiving from
1244  * @param iv initialization vector to use
1245  * @param in ciphertext
1246  * @param out plaintext
1247  * @param size size of in/out
1248  * @return GNUNET_OK on success
1249  */
1250 static int
1251 do_decrypt (struct Neighbour *n,
1252             const GNUNET_HashCode * iv,
1253             const void *in, void *out, size_t size)
1254 {
1255   if (size != (uint16_t) size)
1256     {
1257       GNUNET_break (0);
1258       return GNUNET_NO;
1259     }
1260   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1261       (n->status != PEER_STATE_KEY_CONFIRMED))
1262     {
1263       GNUNET_break_op (0);
1264       return GNUNET_SYSERR;
1265     }
1266   if (size !=
1267       GNUNET_CRYPTO_aes_decrypt (&n->decrypt_key,
1268                                  in,
1269                                  (uint16_t) size,
1270                                  (const struct
1271                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1272                                  out))
1273     {
1274       GNUNET_break (0);
1275       return GNUNET_SYSERR;
1276     }
1277 #if DEBUG_CORE
1278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1279               "Decrypted %u bytes from `%4s' using key %u\n",
1280               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1281 #endif
1282   return GNUNET_OK;
1283 }
1284
1285
1286 /**
1287  * Encrypt size bytes from in and write the result to out.  Use the
1288  * key for outbound traffic of the given neighbour.
1289  *
1290  * @param n neighbour we are sending to
1291  * @param iv initialization vector to use
1292  * @param in ciphertext
1293  * @param out plaintext
1294  * @param size size of in/out
1295  * @return GNUNET_OK on success
1296  */
1297 static int
1298 do_encrypt (struct Neighbour *n,
1299             const GNUNET_HashCode * iv,
1300             const void *in, void *out, size_t size)
1301 {
1302   if (size != (uint16_t) size)
1303     {
1304       GNUNET_break (0);
1305       return GNUNET_NO;
1306     }
1307   GNUNET_assert (size ==
1308                  GNUNET_CRYPTO_aes_encrypt (in,
1309                                             (uint16_t) size,
1310                                             &n->encrypt_key,
1311                                             (const struct
1312                                              GNUNET_CRYPTO_AesInitializationVector
1313                                              *) iv, out));
1314 #if DEBUG_CORE
1315   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1316               "Encrypted %u bytes for `%4s' using key %u\n", size,
1317               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1318 #endif
1319   return GNUNET_OK;
1320 }
1321
1322
1323 /**
1324  * Select messages for transmission.  This heuristic uses a combination
1325  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1326  * and priority-based discard (in case no feasible schedule exist) and
1327  * speculative optimization (defer any kind of transmission until
1328  * we either create a batch of significant size, 25% of max, or until
1329  * we are close to a deadline).  Furthermore, when scheduling the
1330  * heuristic also packs as many messages into the batch as possible,
1331  * starting with those with the earliest deadline.  Yes, this is fun.
1332  *
1333  * @param n neighbour to select messages from
1334  * @param size number of bytes to select for transmission
1335  * @param retry_time set to the time when we should try again
1336  *        (only valid if this function returns zero)
1337  * @return number of bytes selected, or 0 if we decided to
1338  *         defer scheduling overall; in that case, retry_time is set.
1339  */
1340 static size_t
1341 select_messages (struct Neighbour *n,
1342                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1343 {
1344   struct MessageEntry *pos;
1345   struct MessageEntry *min;
1346   struct MessageEntry *last;
1347   unsigned int min_prio;
1348   struct GNUNET_TIME_Absolute t;
1349   struct GNUNET_TIME_Absolute now;
1350   uint64_t delta;
1351   uint64_t avail;
1352   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1353   size_t off;
1354   int discard_low_prio;
1355
1356   GNUNET_assert (NULL != n->messages);
1357   now = GNUNET_TIME_absolute_get ();
1358   /* last entry in linked list of messages processed */
1359   last = NULL;
1360   /* should we remove the entry with the lowest
1361      priority from consideration for scheduling at the
1362      end of the loop? */
1363   discard_low_prio = GNUNET_YES;
1364   while (GNUNET_YES == discard_low_prio)
1365     {
1366       min = NULL;
1367       min_prio = -1;
1368       discard_low_prio = GNUNET_NO;
1369       /* calculate number of bytes available for transmission at time "t" */
1370       update_window (GNUNET_NO,
1371                      &n->available_send_window,
1372                      &n->last_asw_update,
1373                      n->bpm_out);
1374       avail = n->available_send_window;
1375       t = n->last_asw_update;
1376       /* how many bytes have we (hypothetically) scheduled so far */
1377       off = 0;
1378       /* maximum time we can wait before transmitting anything
1379          and still make all of our deadlines */
1380       slack = -1;
1381
1382       pos = n->messages;
1383       /* note that we use "*2" here because we want to look
1384          a bit further into the future; much more makes no
1385          sense since new message might be scheduled in the
1386          meantime... */
1387       while ((pos != NULL) && (off < size * 2))
1388         {
1389           if (pos->do_transmit == GNUNET_YES)
1390             {
1391               /* already removed from consideration */
1392               pos = pos->next;
1393               continue;
1394             }
1395           if (discard_low_prio == GNUNET_NO)
1396             {
1397               delta = pos->deadline.value;
1398               if (delta < t.value)
1399                 delta = 0;
1400               else
1401                 delta = t.value - delta;
1402               avail += delta * n->bpm_out / 1000 / 60;
1403               if (avail < pos->size)
1404                 {
1405                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1406                 }
1407               else
1408                 {
1409                   avail -= pos->size;
1410                   /* update slack, considering both its absolute deadline
1411                      and relative deadlines caused by other messages
1412                      with their respective load */
1413                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1414                   if (pos->deadline.value < now.value)
1415                     slack = 0;
1416                   else
1417                     slack =
1418                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1419                 }
1420             }
1421           off += pos->size;
1422           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1423           if (pos->priority <= min_prio)
1424             {
1425               /* update min for discard */
1426               min_prio = pos->priority;
1427               min = pos;
1428             }
1429           pos = pos->next;
1430         }
1431       if (discard_low_prio)
1432         {
1433           GNUNET_assert (min != NULL);
1434           /* remove lowest-priority entry from consideration */
1435           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1436         }
1437       last = pos;
1438     }
1439   /* guard against sending "tiny" messages with large headers without
1440      urgent deadlines */
1441   if ( (slack > 1000) && (size > 4 * off) )
1442     {
1443       /* less than 25% of message would be filled with
1444          deadlines still being met if we delay by one
1445          second or more; so just wait for more data */
1446       retry_time->value = slack / 2;
1447       /* reset do_transmit values for next time */
1448       while (pos != last)
1449         {
1450           pos->do_transmit = GNUNET_NO;
1451           pos = pos->next;
1452         }
1453       return 0;
1454     }
1455   /* select marked messages (up to size) for transmission */
1456   off = 0;
1457   pos = n->messages;
1458   while (pos != last)
1459     {
1460       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1461         {
1462           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1463           off += pos->size;
1464           size -= pos->size;
1465         }
1466       else
1467         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1468       pos = pos->next;
1469     }
1470 #if DEBUG_CORE
1471   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1473               off, GNUNET_i2s (&n->peer));
1474 #endif
1475   return off;
1476 }
1477
1478
1479 /**
1480  * Batch multiple messages into a larger buffer.
1481  *
1482  * @param n neighbour to take messages from
1483  * @param buf target buffer
1484  * @param size size of buf
1485  * @param deadline set to transmission deadline for the result
1486  * @param retry_time set to the time when we should try again
1487  *        (only valid if this function returns zero)
1488  * @param priority set to the priority of the batch
1489  * @return number of bytes written to buf (can be zero)
1490  */
1491 static size_t
1492 batch_message (struct Neighbour *n,
1493                char *buf,
1494                size_t size,
1495                struct GNUNET_TIME_Absolute *deadline,
1496                struct GNUNET_TIME_Relative *retry_time,
1497                unsigned int *priority)
1498 {
1499   struct MessageEntry *pos;
1500   struct MessageEntry *prev;
1501   struct MessageEntry *next;
1502   size_t ret;
1503
1504   ret = 0;
1505   *priority = 0;
1506   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1507   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1508   if (0 == select_messages (n, size, retry_time))
1509     {
1510       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1511                   "No messages selected, will try again in %llu ms\n",
1512                   retry_time->value);
1513       return 0;
1514     }
1515   pos = n->messages;
1516   prev = NULL;
1517   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1518     {
1519       next = pos->next;
1520       if (GNUNET_YES == pos->do_transmit)
1521         {
1522           GNUNET_assert (pos->size <= size);
1523           memcpy (&buf[ret], &pos[1], pos->size);
1524           ret += pos->size;
1525           size -= pos->size;
1526           *priority += pos->priority;
1527           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1528           GNUNET_free (pos);
1529           if (prev == NULL)
1530             n->messages = next;
1531           else
1532             prev->next = next;
1533         }
1534       else
1535         {
1536           prev = pos;
1537         }
1538       pos = next;
1539     }
1540   return ret;
1541 }
1542
1543
1544 /**
1545  * Remove messages with deadlines that have long expired from
1546  * the queue.
1547  *
1548  * @param n neighbour to inspect
1549  */
1550 static void
1551 discard_expired_messages (struct Neighbour *n)
1552 {
1553   struct MessageEntry *prev;
1554   struct MessageEntry *next;
1555   struct MessageEntry *pos;
1556   struct GNUNET_TIME_Absolute now;
1557   struct GNUNET_TIME_Relative delta;
1558
1559   now = GNUNET_TIME_absolute_get ();
1560   prev = NULL;
1561   pos = n->messages;
1562   while (pos != NULL) 
1563     {
1564       next = pos->next;
1565       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1566       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1567         {
1568 #if DEBUG_CORE
1569           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1570                       "Message is %llu ms past due, discarding.\n",
1571                       delta.value);
1572 #endif
1573           if (prev == NULL)
1574             n->messages = next;
1575           else
1576             prev->next = next;
1577           GNUNET_free (pos);
1578         }
1579       else
1580         prev = pos;
1581       pos = next;
1582     }
1583 }
1584
1585
1586 /**
1587  * Signature of the main function of a task.
1588  *
1589  * @param cls closure
1590  * @param tc context information (why was this task triggered now)
1591  */
1592 static void
1593 retry_plaintext_processing (void *cls,
1594                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1595 {
1596   struct Neighbour *n = cls;
1597
1598   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1599   process_plaintext_neighbour_queue (n);
1600 }
1601
1602
1603 /**
1604  * Send our key (and encrypted PING) to the other peer.
1605  *
1606  * @param n the other peer
1607  */
1608 static void send_key (struct Neighbour *n);
1609
1610
1611 /**
1612  * Check if we have plaintext messages for the specified neighbour
1613  * pending, and if so, consider batching and encrypting them (and
1614  * then trigger processing of the encrypted queue if needed).
1615  *
1616  * @param n neighbour to check.
1617  */
1618 static void
1619 process_plaintext_neighbour_queue (struct Neighbour *n)
1620 {
1621   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1622   size_t used;
1623   size_t esize;
1624   struct EncryptedMessage *em;  /* encrypted message */
1625   struct EncryptedMessage *ph;  /* plaintext header */
1626   struct MessageEntry *me;
1627   unsigned int priority;
1628   struct GNUNET_TIME_Absolute deadline;
1629   struct GNUNET_TIME_Relative retry_time;
1630
1631   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1632     {
1633       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1634       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1635     }
1636   switch (n->status)
1637     {
1638     case PEER_STATE_DOWN:
1639       send_key (n);
1640 #if DEBUG_CORE
1641       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1642                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1643                   GNUNET_i2s(&n->peer));
1644 #endif
1645       return;
1646     case PEER_STATE_KEY_SENT:
1647       GNUNET_assert (n->retry_set_key_task !=
1648                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1649 #if DEBUG_CORE
1650       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1651                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1652                   GNUNET_i2s(&n->peer));
1653 #endif
1654       return;
1655     case PEER_STATE_KEY_RECEIVED:
1656       GNUNET_assert (n->retry_set_key_task !=
1657                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1658 #if DEBUG_CORE
1659       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1660                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1661                   GNUNET_i2s(&n->peer));
1662 #endif
1663       return;
1664     case PEER_STATE_KEY_CONFIRMED:
1665       /* ready to continue */
1666       break;
1667     }
1668   discard_expired_messages (n);
1669   if (n->messages == NULL)
1670     {
1671 #if DEBUG_CORE
1672       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673                   "Plaintext message queue for `%4s' is empty.\n",
1674                   GNUNET_i2s(&n->peer));
1675 #endif
1676       return;                   /* no pending messages */
1677     }
1678   if (n->encrypted_head != NULL)
1679     {
1680 #if DEBUG_CORE
1681       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1682                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1683                   GNUNET_i2s(&n->peer));
1684 #endif
1685       return;                   /* wait for messages already encrypted to be
1686                                    processed first! */
1687     }
1688   ph = (struct EncryptedMessage *) pbuf;
1689   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1690   priority = 0;
1691   used = sizeof (struct EncryptedMessage);
1692   used += batch_message (n,
1693                          &pbuf[used],
1694                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1695                          &deadline, &retry_time, &priority);
1696   if (used == sizeof (struct EncryptedMessage))
1697     {
1698 #if DEBUG_CORE
1699       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1700                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1701                   GNUNET_i2s(&n->peer));
1702 #endif
1703       /* no messages selected for sending, try again later... */
1704       n->retry_plaintext_task =
1705         GNUNET_SCHEDULER_add_delayed (sched,
1706                                       GNUNET_NO,
1707                                       GNUNET_SCHEDULER_PRIORITY_IDLE,
1708                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1709                                       retry_time,
1710                                       &retry_plaintext_processing, n);
1711       return;
1712     }
1713
1714   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1715   ph->inbound_bpm_limit = htonl (n->bpm_in);
1716   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1717
1718   /* setup encryption message header */
1719   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1720   me->deadline = deadline;
1721   me->priority = priority;
1722   me->size = used;
1723   em = (struct EncryptedMessage *) &me[1];
1724   em->header.size = htons (used);
1725   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1726   em->reserved = htonl (0);
1727   esize = used - ENCRYPTED_HEADER_SIZE;
1728   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1729   /* encrypt */
1730 #if DEBUG_CORE
1731   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1732               "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
1733               esize,
1734               GNUNET_i2s(&n->peer));
1735 #endif
1736   GNUNET_assert (GNUNET_OK ==
1737                  do_encrypt (n,
1738                              &em->plaintext_hash,
1739                              &ph->sequence_number,
1740                              &em->sequence_number, esize));
1741   /* append to transmission list */
1742   if (n->encrypted_tail == NULL)
1743     n->encrypted_head = me;
1744   else
1745     n->encrypted_tail->next = me;
1746   n->encrypted_tail = me;
1747   process_encrypted_neighbour_queue (n);
1748 }
1749
1750
1751 /**
1752  * Handle CORE_SEND request.
1753  */
1754 static void
1755 handle_client_send (void *cls,
1756                     struct GNUNET_SERVER_Client *client,
1757                     const struct GNUNET_MessageHeader *message);
1758
1759
1760 /**
1761  * Function called to notify us that we either succeeded
1762  * or failed to connect (at the transport level) to another
1763  * peer.  We should either free the message we were asked
1764  * to transmit or re-try adding it to the queue.
1765  *
1766  * @param cls closure
1767  * @param size number of bytes available in buf
1768  * @param buf where the callee should write the message
1769  * @return number of bytes written to buf
1770  */
1771 static size_t
1772 send_connect_continuation (void *cls, size_t size, void *buf)
1773 {
1774   struct SendMessage *sm = cls;
1775
1776   if (buf == NULL)
1777     {
1778 #if DEBUG_CORE
1779       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1780                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1781                   GNUNET_i2s (&sm->peer));
1782 #endif
1783       GNUNET_free (sm);
1784       return 0;
1785     }
1786 #if DEBUG_CORE
1787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1788               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1789               GNUNET_i2s (&sm->peer));
1790 #endif
1791   handle_client_send (NULL, NULL, &sm->header);
1792   GNUNET_free (sm);
1793   return 0;
1794 }
1795
1796
1797 /**
1798  * Handle CORE_SEND request.
1799  */
1800 static void
1801 handle_client_send (void *cls,
1802                     struct GNUNET_SERVER_Client *client,
1803                     const struct GNUNET_MessageHeader *message)
1804 {
1805   const struct SendMessage *sm;
1806   struct SendMessage *smc;
1807   const struct GNUNET_MessageHeader *mh;
1808   struct Neighbour *n;
1809   struct MessageEntry *prev;
1810   struct MessageEntry *pos;
1811   struct MessageEntry *e; 
1812   struct MessageEntry *min_prio_entry;
1813   struct MessageEntry *min_prio_prev;
1814   unsigned int min_prio;
1815   unsigned int queue_size;
1816   uint16_t msize;
1817
1818   msize = ntohs (message->size);
1819   if (msize <
1820       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1821     {
1822       GNUNET_break (0);
1823       if (client != NULL)
1824         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1825       return;
1826     }
1827   sm = (const struct SendMessage *) message;
1828   msize -= sizeof (struct SendMessage);
1829   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1830   if (msize != ntohs (mh->size))
1831     {
1832       GNUNET_break (0);
1833       if (client != NULL)
1834         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1835       return;
1836     }
1837   n = find_neighbour (&sm->peer);
1838   if (n == NULL)
1839     {
1840 #if DEBUG_CORE
1841       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1842                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1843                   "SEND",
1844                   GNUNET_i2s (&sm->peer),
1845                   GNUNET_TIME_absolute_get_remaining
1846                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1847 #endif
1848       msize += sizeof (struct SendMessage);
1849       /* ask transport to connect to the peer */
1850       smc = GNUNET_malloc (msize);
1851       memcpy (smc, sm, msize);
1852       if (NULL ==
1853           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1854                                                   &sm->peer,
1855                                                   0, 0,
1856                                                   GNUNET_TIME_absolute_get_remaining
1857                                                   (GNUNET_TIME_absolute_ntoh
1858                                                    (sm->deadline)),
1859                                                   &send_connect_continuation,
1860                                                   smc))
1861         {
1862           /* transport has already a request pending for this peer! */
1863 #if DEBUG_CORE
1864           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1865                       "Dropped second message destined for `%4s' since connection is still down.\n",
1866                       GNUNET_i2s(&sm->peer));
1867 #endif
1868           GNUNET_free (smc);
1869         }
1870       if (client != NULL)
1871         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1872       return;
1873     }
1874 #if DEBUG_CORE
1875   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1876               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1877               "SEND",
1878               msize, 
1879               GNUNET_i2s (&sm->peer));
1880 #endif
1881   /* bound queue size */
1882   discard_expired_messages (n);
1883   min_prio = (unsigned int) -1;
1884   queue_size = 0;
1885   prev = NULL;
1886   pos = n->messages;
1887   while (pos != NULL) 
1888     {
1889       if (pos->priority < min_prio)
1890         {
1891           min_prio_entry = pos;
1892           min_prio_prev = prev;
1893           min_prio = pos->priority;
1894         }
1895       queue_size++;
1896       prev = pos;
1897       pos = pos->next;
1898     }
1899   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1900     {
1901       /* queue full */
1902       if (ntohl(sm->priority) <= min_prio)
1903         {
1904           /* discard new entry */
1905 #if DEBUG_CORE
1906           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1907                       "Queue full, discarding new request\n");
1908 #endif
1909           if (client != NULL)
1910             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1911           return;
1912         }
1913       /* discard "min_prio_entry" */
1914 #if DEBUG_CORE
1915       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1916                   "Queue full, discarding existing older request\n");
1917 #endif
1918       if (min_prio_prev == NULL)
1919         n->messages = min_prio_entry->next;
1920       else
1921         min_prio_prev->next = min_prio_entry->next;      
1922       GNUNET_free (min_prio_entry);     
1923     }
1924   
1925   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1926   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1927   e->priority = ntohl (sm->priority);
1928   e->size = msize;
1929   memcpy (&e[1], mh, msize);
1930
1931   /* insert, keep list sorted by deadline */
1932   prev = NULL;
1933   pos = n->messages;
1934   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1935     {
1936       prev = pos;
1937       pos = pos->next;
1938     }
1939   if (prev == NULL)
1940     n->messages = e;
1941   else
1942     prev->next = e;
1943   e->next = pos;
1944
1945   /* consider scheduling now */
1946   process_plaintext_neighbour_queue (n);
1947   if (client != NULL)
1948     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1949 }
1950
1951
1952 /**
1953  * List of handlers for the messages understood by this
1954  * service.
1955  */
1956 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1957   {&handle_client_init, NULL,
1958    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1959   {&handle_client_request_configure, NULL,
1960    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
1961    sizeof (struct RequestConfigureMessage)},
1962   {&handle_client_send, NULL,
1963    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1964   {NULL, NULL, 0, 0}
1965 };
1966
1967
1968 /**
1969  * PEERINFO is giving us a HELLO for a peer.  Add the
1970  * public key to the neighbour's struct and retry
1971  * send_key.  Or, if we did not get a HELLO, just do
1972  * nothing.
1973  *
1974  * @param cls NULL
1975  * @param peer the peer for which this is the HELLO
1976  * @param hello HELLO message of that peer
1977  * @param trust amount of trust we currently have in that peer
1978  */
1979 static void
1980 process_hello_retry_send_key (void *cls,
1981                               const struct GNUNET_PeerIdentity *peer,
1982                               const struct GNUNET_HELLO_Message *hello,
1983                               uint32_t trust)
1984 {
1985   struct Neighbour *n;
1986
1987   if (peer == NULL)
1988     return;
1989   n = find_neighbour (peer);
1990   if (n == NULL)
1991     return;
1992   if (n->public_key != NULL)
1993     return;
1994 #if DEBUG_CORE
1995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1996               "Received new `%s' message for `%4s', initiating key exchange.\n",
1997               "HELLO",
1998               GNUNET_i2s (peer));
1999 #endif
2000   n->public_key =
2001     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2002   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2003     {
2004       GNUNET_free (n->public_key);
2005       n->public_key = NULL;
2006       return;
2007     }
2008   send_key (n);
2009 }
2010
2011
2012 /**
2013  * Task that will retry "send_key" if our previous attempt failed
2014  * to yield a PONG.
2015  */
2016 static void
2017 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2018 {
2019   struct Neighbour *n = cls;
2020
2021   GNUNET_assert (n->status != PEER_STATE_KEY_CONFIRMED);
2022   n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2023   n->set_key_retry_frequency =
2024     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2025   send_key (n);
2026 }
2027
2028
2029 /**
2030  * Send our key (and encrypted PING) to the other peer.
2031  *
2032  * @param n the other peer
2033  */
2034 static void
2035 send_key (struct Neighbour *n)
2036 {
2037   struct SetKeyMessage *sm;
2038   struct MessageEntry *me;
2039   struct PingMessage pp;
2040   struct PingMessage *pm;
2041
2042 #if DEBUG_CORE
2043   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2044               "Asked to perform key exchange with `%4s'.\n",
2045               GNUNET_i2s (&n->peer));
2046 #endif
2047   if (n->public_key == NULL)
2048     {
2049       /* lookup n's public key, then try again */
2050 #if DEBUG_CORE
2051       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052                   "Lacking public key for `%4s', trying to obtain one.\n",
2053                   GNUNET_i2s (&n->peer));
2054 #endif
2055       GNUNET_PEERINFO_for_all (cfg,
2056                                sched,
2057                                &n->peer,
2058                                0,
2059                                GNUNET_TIME_UNIT_MINUTES,
2060                                &process_hello_retry_send_key, NULL);
2061       return;
2062     }
2063   /* first, set key message */
2064   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2065                       sizeof (struct SetKeyMessage));
2066   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2067   me->priority = SET_KEY_PRIORITY;
2068   me->size = sizeof (struct SetKeyMessage);
2069   if (n->encrypted_head == NULL)
2070     n->encrypted_head = me;
2071   else
2072     n->encrypted_tail->next = me;
2073   n->encrypted_tail = me;
2074   sm = (struct SetKeyMessage *) &me[1];
2075   sm->header.size = htons (sizeof (struct SetKeyMessage));
2076   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2077   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2078                                         PEER_STATE_KEY_SENT : n->status));
2079   sm->purpose.size =
2080     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2081            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2082            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2083            sizeof (struct GNUNET_PeerIdentity));
2084   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2085   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2086   sm->target = n->peer;
2087   GNUNET_assert (GNUNET_OK ==
2088                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2089                                             sizeof (struct
2090                                                     GNUNET_CRYPTO_AesSessionKey),
2091                                             n->public_key,
2092                                             &sm->encrypted_key));
2093   GNUNET_assert (GNUNET_OK ==
2094                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2095                                          &sm->signature));
2096
2097   /* second, encrypted PING message */
2098   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2099                       sizeof (struct PingMessage));
2100   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2101   me->priority = PING_PRIORITY;
2102   me->size = sizeof (struct PingMessage);
2103   n->encrypted_tail->next = me;
2104   n->encrypted_tail = me;
2105   pm = (struct PingMessage *) &me[1];
2106   pm->header.size = htons (sizeof (struct PingMessage));
2107   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2108   pp.challenge = htonl (n->ping_challenge);
2109   pp.target = n->peer;
2110 #if DEBUG_CORE
2111   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2112               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2113               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2114   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2115               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2116               "PING",
2117               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2118 #endif
2119   do_encrypt (n,
2120               &n->peer.hashPubKey,
2121               &pp.challenge,
2122               &pm->challenge,
2123               sizeof (struct PingMessage) -
2124               sizeof (struct GNUNET_MessageHeader));
2125   /* update status */
2126   switch (n->status)
2127     {
2128     case PEER_STATE_DOWN:
2129       n->status = PEER_STATE_KEY_SENT;
2130       break;
2131     case PEER_STATE_KEY_SENT:
2132       break;
2133     case PEER_STATE_KEY_RECEIVED:
2134       break;
2135     case PEER_STATE_KEY_CONFIRMED:
2136       GNUNET_break (0);
2137       break;
2138     default:
2139       GNUNET_break (0);
2140       break;
2141     }
2142   /* trigger queue processing */
2143   process_encrypted_neighbour_queue (n);
2144   if (n->status != PEER_STATE_KEY_CONFIRMED)
2145     n->retry_set_key_task
2146       = GNUNET_SCHEDULER_add_delayed (sched,
2147                                       GNUNET_NO,
2148                                       GNUNET_SCHEDULER_PRIORITY_KEEP,
2149                                       GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2150                                       n->set_key_retry_frequency,
2151                                       &set_key_retry_task, n);
2152 }
2153
2154
2155 /**
2156  * We received a SET_KEY message.  Validate and update
2157  * our key material and status.
2158  *
2159  * @param n the neighbour from which we received message m
2160  * @param m the set key message we received
2161  */
2162 static void
2163 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2164
2165
2166 /**
2167  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2168  * the neighbour's struct and retry handling the set_key message.  Or,
2169  * if we did not get a HELLO, just free the set key message.
2170  *
2171  * @param cls pointer to the set key message
2172  * @param peer the peer for which this is the HELLO
2173  * @param hello HELLO message of that peer
2174  * @param trust amount of trust we currently have in that peer
2175  */
2176 static void
2177 process_hello_retry_handle_set_key (void *cls,
2178                                     const struct GNUNET_PeerIdentity *peer,
2179                                     const struct GNUNET_HELLO_Message *hello,
2180                                     uint32_t trust)
2181 {
2182   struct SetKeyMessage *sm = cls;
2183   struct Neighbour *n;
2184
2185   if (peer == NULL)
2186     {
2187       GNUNET_free (sm);
2188       return;
2189     }
2190   n = find_neighbour (peer);
2191   if (n == NULL)
2192     {
2193       GNUNET_break (0);
2194       return;
2195     }
2196   if (n->public_key != NULL)
2197     return;                     /* multiple HELLOs match!? */
2198   n->public_key =
2199     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2200   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2201     {
2202       GNUNET_break_op (0);
2203       GNUNET_free (n->public_key);
2204       n->public_key = NULL;
2205       return;
2206     }
2207 #if DEBUG_CORE
2208   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2209               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2210               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2211 #endif
2212   handle_set_key (n, sm);
2213 }
2214
2215
2216 /**
2217  * We received a PING message.  Validate and transmit
2218  * PONG.
2219  *
2220  * @param n sender of the PING
2221  * @param m the encrypted PING message itself
2222  */
2223 static void
2224 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2225 {
2226   struct PingMessage t;
2227   struct PingMessage *tp;
2228   struct MessageEntry *me;
2229
2230 #if DEBUG_CORE
2231   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2232               "Core service receives `%s' request from `%4s'.\n",
2233               "PING", GNUNET_i2s (&n->peer));
2234 #endif
2235   if (GNUNET_OK !=
2236       do_decrypt (n,
2237                   &my_identity.hashPubKey,
2238                   &m->challenge,
2239                   &t.challenge,
2240                   sizeof (struct PingMessage) -
2241                   sizeof (struct GNUNET_MessageHeader)))
2242     return;
2243 #if DEBUG_CORE
2244   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2245               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2246               "PING",
2247               GNUNET_i2s (&t.target),
2248               ntohl (t.challenge), n->decrypt_key.crc32);
2249   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2250               "Target of `%s' request is `%4s'.\n",
2251               "PING", GNUNET_i2s (&t.target));
2252 #endif
2253   if (0 != memcmp (&t.target,
2254                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2255     {
2256       GNUNET_break_op (0);
2257       return;
2258     }
2259   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2260                       sizeof (struct PingMessage));
2261   if (n->encrypted_tail != NULL)
2262     n->encrypted_tail->next = me;
2263   else
2264     {
2265       n->encrypted_tail = me;
2266       n->encrypted_head = me;
2267     }
2268   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2269   me->priority = PONG_PRIORITY;
2270   me->size = sizeof (struct PingMessage);
2271   tp = (struct PingMessage *) &me[1];
2272   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2273   tp->header.size = htons (sizeof (struct PingMessage));
2274   do_encrypt (n,
2275               &my_identity.hashPubKey,
2276               &t.challenge,
2277               &tp->challenge,
2278               sizeof (struct PingMessage) -
2279               sizeof (struct GNUNET_MessageHeader));
2280 #if DEBUG_CORE
2281   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2282               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2283               ntohl (t.challenge), n->encrypt_key.crc32);
2284 #endif
2285   /* trigger queue processing */
2286   process_encrypted_neighbour_queue (n);
2287 }
2288
2289
2290 /**
2291  * We received a SET_KEY message.  Validate and update
2292  * our key material and status.
2293  *
2294  * @param n the neighbour from which we received message m
2295  * @param m the set key message we received
2296  */
2297 static void
2298 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2299 {
2300   struct SetKeyMessage *m_cpy;
2301   struct GNUNET_TIME_Absolute t;
2302   struct GNUNET_CRYPTO_AesSessionKey k;
2303   struct PingMessage *ping;
2304   enum PeerStateMachine sender_status;
2305
2306 #if DEBUG_CORE
2307   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2308               "Core service receives `%s' request from `%4s'.\n",
2309               "SET_KEY", GNUNET_i2s (&n->peer));
2310 #endif
2311   if (n->public_key == NULL)
2312     {
2313 #if DEBUG_CORE
2314       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2315                   "Lacking public key for peer, trying to obtain one.\n");
2316 #endif
2317       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2318       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2319       /* lookup n's public key, then try again */
2320       GNUNET_PEERINFO_for_all (cfg,
2321                                sched,
2322                                &n->peer,
2323                                0,
2324                                GNUNET_TIME_UNIT_MINUTES,
2325                                &process_hello_retry_handle_set_key, m_cpy);
2326       return;
2327     }
2328   if ((ntohl (m->purpose.size) !=
2329        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2330        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2331        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2332        sizeof (struct GNUNET_PeerIdentity)) ||
2333       (GNUNET_OK !=
2334        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2335                                  &m->purpose, &m->signature, n->public_key)))
2336     {
2337       /* invalid signature */
2338       GNUNET_break_op (0);
2339       return;
2340     }
2341   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2342   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2343        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2344       (t.value < n->decrypt_key_created.value))
2345     {
2346       /* this could rarely happen due to massive re-ordering of
2347          messages on the network level, but is most likely either
2348          a bug or some adversary messing with us.  Report. */
2349       GNUNET_break_op (0);
2350       return;
2351     }
2352 #if DEBUG_CORE
2353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2354 #endif
2355   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2356                                   &m->encrypted_key,
2357                                   &k,
2358                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2359        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2360       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2361     {
2362       /* failed to decrypt !? */
2363       GNUNET_break_op (0);
2364       return;
2365     }
2366
2367   n->decrypt_key = k;
2368   if (n->decrypt_key_created.value != t.value)
2369     {
2370       /* fresh key, reset sequence numbers */
2371       n->last_sequence_number_received = 0;
2372       n->last_packets_bitmap = 0;
2373       n->decrypt_key_created = t;
2374     }
2375   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2376   switch (n->status)
2377     {
2378     case PEER_STATE_DOWN:
2379       n->status = PEER_STATE_KEY_RECEIVED;
2380 #if DEBUG_CORE
2381       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2382                   "Responding to `%s' with my own key.\n", "SET_KEY");
2383 #endif
2384       send_key (n);
2385       break;
2386     case PEER_STATE_KEY_SENT:
2387     case PEER_STATE_KEY_RECEIVED:
2388       n->status = PEER_STATE_KEY_RECEIVED;
2389       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2390           (sender_status != PEER_STATE_KEY_CONFIRMED))
2391         {
2392 #if DEBUG_CORE
2393           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2394                       "Responding to `%s' with my own key (other peer has status %u).\n",
2395                       "SET_KEY", sender_status);
2396 #endif
2397           send_key (n);
2398         }
2399       break;
2400     case PEER_STATE_KEY_CONFIRMED:
2401       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2402           (sender_status != PEER_STATE_KEY_CONFIRMED))
2403         {
2404 #if DEBUG_CORE
2405           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2406                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2407                       "SET_KEY", sender_status);
2408 #endif
2409           send_key (n);
2410         }
2411       break;
2412     default:
2413       GNUNET_break (0);
2414       break;
2415     }
2416   if (n->pending_ping != NULL)
2417     {
2418       ping = n->pending_ping;
2419       n->pending_ping = NULL;
2420       handle_ping (n, ping);
2421       GNUNET_free (ping);
2422     }
2423 }
2424
2425
2426 /**
2427  * We received a PONG message.  Validate and update
2428  * our status.
2429  *
2430  * @param n sender of the PONG
2431  * @param m the encrypted PONG message itself
2432  */
2433 static void
2434 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2435 {
2436   struct PingMessage t;
2437
2438 #if DEBUG_CORE
2439   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2440               "Core service receives `%s' request from `%4s'.\n",
2441               "PONG", GNUNET_i2s (&n->peer));
2442 #endif
2443   if (GNUNET_OK !=
2444       do_decrypt (n,
2445                   &n->peer.hashPubKey,
2446                   &m->challenge,
2447                   &t.challenge,
2448                   sizeof (struct PingMessage) -
2449                   sizeof (struct GNUNET_MessageHeader)))
2450     return;
2451 #if DEBUG_CORE
2452   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2453               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2454               "PONG",
2455               GNUNET_i2s (&t.target),
2456               ntohl (t.challenge), n->decrypt_key.crc32);
2457 #endif
2458   if ((0 != memcmp (&t.target,
2459                     &n->peer,
2460                     sizeof (struct GNUNET_PeerIdentity))) ||
2461       (n->ping_challenge != ntohl (t.challenge)))
2462     {
2463       /* PONG malformed */
2464 #if DEBUG_CORE
2465       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2466                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2467                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2468       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2469                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2470                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2471 #endif
2472       GNUNET_break_op (0);
2473       return;
2474     }
2475   switch (n->status)
2476     {
2477     case PEER_STATE_DOWN:
2478       GNUNET_break (0);         /* should be impossible */
2479       return;
2480     case PEER_STATE_KEY_SENT:
2481       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2482       return;
2483     case PEER_STATE_KEY_RECEIVED:
2484       n->status = PEER_STATE_KEY_CONFIRMED;
2485       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
2486         {
2487           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2488           n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2489         }
2490       process_encrypted_neighbour_queue (n);
2491       break;
2492     case PEER_STATE_KEY_CONFIRMED:
2493       /* duplicate PONG? */
2494       break;
2495     default:
2496       GNUNET_break (0);
2497       break;
2498     }
2499 }
2500
2501
2502 /**
2503  * Send a P2P message to a client.
2504  *
2505  * @param sender who sent us the message?
2506  * @param client who should we give the message to?
2507  * @param m contains the message to transmit
2508  * @param msize number of bytes in buf to transmit
2509  */
2510 static void
2511 send_p2p_message_to_client (struct Neighbour *sender,
2512                             struct Client *client,
2513                             const void *m, size_t msize)
2514 {
2515   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2516   struct NotifyTrafficMessage *ntm;
2517
2518 #if DEBUG_CORE
2519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2520               "Core service passes message from `%4s' of type %u to client.\n",
2521               GNUNET_i2s(&sender->peer),
2522               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2523 #endif
2524   ntm = (struct NotifyTrafficMessage *) buf;
2525   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2526   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2527   ntm->reserved = htonl (0);
2528   ntm->peer = sender->peer;
2529   memcpy (&ntm[1], m, msize);
2530   send_to_client (client, &ntm->header, GNUNET_YES);
2531 }
2532
2533
2534 /**
2535  * Deliver P2P message to interested clients.
2536  *
2537  * @param sender who sent us the message?
2538  * @param m the message
2539  * @param msize size of the message (including header)
2540  */
2541 static void
2542 deliver_message (struct Neighbour *sender,
2543                  const struct GNUNET_MessageHeader *m, size_t msize)
2544 {
2545   struct Client *cpos;
2546   uint16_t type;
2547   unsigned int tpos;
2548   int deliver_full;
2549
2550   type = ntohs (m->type);
2551   cpos = clients;
2552   while (cpos != NULL)
2553     {
2554       deliver_full = GNUNET_NO;
2555       if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
2556         deliver_full = GNUNET_YES;
2557       else
2558         {
2559           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2560             {
2561               if (type != cpos->types[tpos])
2562                 continue;
2563               deliver_full = GNUNET_YES;
2564               break;
2565             }
2566         }
2567       if (GNUNET_YES == deliver_full)
2568         send_p2p_message_to_client (sender, cpos, m, msize);
2569       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2570         send_p2p_message_to_client (sender, cpos, m,
2571                                     sizeof (struct GNUNET_MessageHeader));
2572       cpos = cpos->next;
2573     }
2574 }
2575
2576
2577 /**
2578  * Align P2P message and then deliver to interested clients.
2579  *
2580  * @param sender who sent us the message?
2581  * @param buffer unaligned (!) buffer containing message
2582  * @param msize size of the message (including header)
2583  */
2584 static void
2585 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2586 {
2587   char abuf[msize];
2588
2589   /* TODO: call to statistics? */
2590   memcpy (abuf, buffer, msize);
2591   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2592 }
2593
2594
2595 /**
2596  * Deliver P2P messages to interested clients.
2597  *
2598  * @param sender who sent us the message?
2599  * @param buffer buffer containing messages, can be modified
2600  * @param buffer_size size of the buffer (overall)
2601  * @param offset offset where messages in the buffer start
2602  */
2603 static void
2604 deliver_messages (struct Neighbour *sender,
2605                   const char *buffer, size_t buffer_size, size_t offset)
2606 {
2607   struct GNUNET_MessageHeader *mhp;
2608   struct GNUNET_MessageHeader mh;
2609   uint16_t msize;
2610   int need_align;
2611
2612   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2613     {
2614       if (0 != offset % sizeof (uint16_t))
2615         {
2616           /* outch, need to copy to access header */
2617           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2618           mhp = &mh;
2619         }
2620       else
2621         {
2622           /* can access header directly */
2623           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2624         }
2625       msize = ntohs (mhp->size);
2626       if (msize + offset > buffer_size)
2627         {
2628           /* malformed message, header says it is larger than what
2629              would fit into the overall buffer */
2630           GNUNET_break_op (0);
2631           break;
2632         }
2633 #if HAVE_UNALIGNED_64_ACCESS
2634       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2635 #else
2636       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2637 #endif
2638       if (GNUNET_YES == need_align)
2639         align_and_deliver (sender, &buffer[offset], msize);
2640       else
2641         deliver_message (sender,
2642                          (const struct GNUNET_MessageHeader *)
2643                          &buffer[offset], msize);
2644       offset += msize;
2645     }
2646 }
2647
2648
2649 /**
2650  * We received an encrypted message.  Decrypt, validate and
2651  * pass on to the appropriate clients.
2652  */
2653 static void
2654 handle_encrypted_message (struct Neighbour *n,
2655                           const struct EncryptedMessage *m)
2656 {
2657   size_t size = ntohs (m->header.size);
2658   char buf[size];
2659   struct EncryptedMessage *pt;  /* plaintext */
2660   GNUNET_HashCode ph;
2661   size_t off;
2662   uint32_t snum;
2663   struct GNUNET_TIME_Absolute t;
2664
2665 #if DEBUG_CORE
2666   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2667               "Core service receives `%s' request from `%4s'.\n",
2668               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2669 #endif  
2670   /* decrypt */
2671   if (GNUNET_OK !=
2672       do_decrypt (n,
2673                   &m->plaintext_hash,
2674                   &m->sequence_number,
2675                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2676     return;
2677   pt = (struct EncryptedMessage *) buf;
2678
2679   /* validate hash */
2680   GNUNET_CRYPTO_hash (&pt->sequence_number,
2681                       size - ENCRYPTED_HEADER_SIZE, &ph);
2682   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2683     {
2684       /* checksum failed */
2685       GNUNET_break_op (0);
2686       return;
2687     }
2688
2689   /* validate sequence number */
2690   snum = ntohl (pt->sequence_number);
2691   if (n->last_sequence_number_received == snum)
2692     {
2693       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2694                   "Received duplicate message, ignoring.\n");
2695       /* duplicate, ignore */
2696       return;
2697     }
2698   if ((n->last_sequence_number_received > snum) &&
2699       (n->last_sequence_number_received - snum > 32))
2700     {
2701       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2702                   "Received ancient out of sequence message, ignoring.\n");
2703       /* ancient out of sequence, ignore */
2704       return;
2705     }
2706   if (n->last_sequence_number_received > snum)
2707     {
2708       unsigned int rotbit =
2709         1 << (n->last_sequence_number_received - snum - 1);
2710       if ((n->last_packets_bitmap & rotbit) != 0)
2711         {
2712           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2713                       "Received duplicate message, ignoring.\n");
2714           /* duplicate, ignore */
2715           return;
2716         }
2717       n->last_packets_bitmap |= rotbit;
2718     }
2719   if (n->last_sequence_number_received < snum)
2720     {
2721       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2722       n->last_sequence_number_received = snum;
2723     }
2724
2725   /* check timestamp */
2726   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2727   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2728     {
2729       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2730                   _
2731                   ("Message received far too old (%llu ms). Content ignored.\n"),
2732                   GNUNET_TIME_absolute_get_duration (t).value);
2733       return;
2734     }
2735
2736   /* process decrypted message(s) */
2737   update_window (GNUNET_YES,
2738                  &n->available_send_window,
2739                  &n->last_asw_update,
2740                  n->bpm_out);
2741   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2742   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2743                            n->bpm_out_internal_limit);
2744   n->last_activity = GNUNET_TIME_absolute_get ();
2745   off = sizeof (struct EncryptedMessage);
2746   deliver_messages (n, buf, size, off);
2747 }
2748
2749
2750 /**
2751  * Function called by the transport for each received message.
2752  *
2753  * @param cls closure
2754  * @param latency estimated latency for communicating with the
2755  *             given peer
2756  * @param peer (claimed) identity of the other peer
2757  * @param message the message
2758  */
2759 static void
2760 handle_transport_receive (void *cls,
2761                           struct GNUNET_TIME_Relative latency,
2762                           const struct GNUNET_PeerIdentity *peer,
2763                           const struct GNUNET_MessageHeader *message)
2764 {
2765   struct Neighbour *n;
2766   struct GNUNET_TIME_Absolute now;
2767   int up;
2768   uint16_t type;
2769   uint16_t size;
2770
2771 #if DEBUG_CORE
2772   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2773               "Received message of type %u from `%4s', demultiplexing.\n",
2774               ntohs (message->type), GNUNET_i2s (peer));
2775 #endif
2776   n = find_neighbour (peer);
2777   if (n == NULL)
2778     {
2779       GNUNET_break (0);
2780       return;
2781     }
2782   n->last_latency = latency;
2783   up = n->status == PEER_STATE_KEY_CONFIRMED;
2784   type = ntohs (message->type);
2785   size = ntohs (message->size);
2786   switch (type)
2787     {
2788     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2789       if (size != sizeof (struct SetKeyMessage))
2790         {
2791           GNUNET_break_op (0);
2792           return;
2793         }
2794       handle_set_key (n, (const struct SetKeyMessage *) message);
2795       break;
2796     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2797       if (size < sizeof (struct EncryptedMessage) +
2798           sizeof (struct GNUNET_MessageHeader))
2799         {
2800           GNUNET_break_op (0);
2801           return;
2802         }
2803       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2804           (n->status != PEER_STATE_KEY_CONFIRMED))
2805         {
2806           GNUNET_break_op (0);
2807           return;
2808         }
2809       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2810       break;
2811     case GNUNET_MESSAGE_TYPE_CORE_PING:
2812       if (size != sizeof (struct PingMessage))
2813         {
2814           GNUNET_break_op (0);
2815           return;
2816         }
2817       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2818           (n->status != PEER_STATE_KEY_CONFIRMED))
2819         {
2820 #if DEBUG_CORE
2821           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2822                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2823                       "PING", GNUNET_i2s (&n->peer));
2824 #endif
2825           GNUNET_free_non_null (n->pending_ping);
2826           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2827           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2828           return;
2829         }
2830       handle_ping (n, (const struct PingMessage *) message);
2831       break;
2832     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2833       if (size != sizeof (struct PingMessage))
2834         {
2835           GNUNET_break_op (0);
2836           return;
2837         }
2838       if ((n->status != PEER_STATE_KEY_SENT) &&
2839           (n->status != PEER_STATE_KEY_RECEIVED) &&
2840           (n->status != PEER_STATE_KEY_CONFIRMED))
2841         {
2842           /* could not decrypt pong, oops! */
2843           GNUNET_break_op (0);
2844           return;
2845         }
2846       handle_pong (n, (const struct PingMessage *) message);
2847       break;
2848     default:
2849       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2850                   _("Unsupported message of type %u received.\n"), type);
2851       return;
2852     }
2853   if (n->status == PEER_STATE_KEY_CONFIRMED)
2854     {
2855       now = GNUNET_TIME_absolute_get ();
2856       n->last_activity = now;
2857       if (!up)
2858         n->time_established = now;
2859     }
2860 }
2861
2862
2863 /**
2864  * Function that recalculates the bandwidth quota for the
2865  * given neighbour and transmits it to the transport service.
2866  * 
2867  * @param cls neighbour for the quota update
2868  * @param tc context
2869  */
2870 static void
2871 neighbour_quota_update (void *cls,
2872                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2873
2874
2875 /**
2876  * Schedule the task that will recalculate the bandwidth
2877  * quota for this peer (and possibly force a disconnect of
2878  * idle peers by calculating a bandwidth of zero).
2879  */
2880 static void
2881 schedule_quota_update (struct Neighbour *n)
2882 {
2883   GNUNET_assert (n->quota_update_task ==
2884                  GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
2885   n->quota_update_task
2886     = GNUNET_SCHEDULER_add_delayed (sched,
2887                                     GNUNET_NO,
2888                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
2889                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
2890                                     QUOTA_UPDATE_FREQUENCY,
2891                                     &neighbour_quota_update,
2892                                     n);
2893 }
2894
2895
2896 /**
2897  * Function that recalculates the bandwidth quota for the
2898  * given neighbour and transmits it to the transport service.
2899  * 
2900  * @param cls neighbour for the quota update
2901  * @param tc context
2902  */
2903 static void
2904 neighbour_quota_update (void *cls,
2905                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2906 {
2907   struct Neighbour *n = cls;
2908   uint32_t q_in;
2909   double pref_rel;
2910   double share;
2911   unsigned long long distributable;
2912   
2913   n->quota_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
2914   /* calculate relative preference among all neighbours;
2915      divides by a bit more to avoid division by zero AND to
2916      account for possibility of new neighbours joining any time 
2917      AND to convert to double... */
2918   pref_rel = n->current_preference / (1.0 + preference_sum);
2919   share = 0;
2920   distributable = 0;
2921   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2922     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2923   share = distributable * pref_rel;
2924   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2925   /* check if we want to disconnect for good due to inactivity */
2926   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2927        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2928     q_in = 0; /* force disconnect */
2929   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2930        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2931     {
2932       n->bpm_in = q_in;
2933       GNUNET_TRANSPORT_set_quota (transport,
2934                                   &n->peer,
2935                                   n->bpm_in, 
2936                                   n->bpm_out,
2937                                   GNUNET_TIME_UNIT_FOREVER_REL,
2938                                   NULL, NULL);
2939     }
2940   schedule_quota_update (n);
2941 }
2942
2943
2944 /**
2945  * Function called by transport to notify us that
2946  * a peer connected to us (on the network level).
2947  *
2948  * @param cls closure
2949  * @param peer the peer that connected
2950  * @param latency current latency of the connection
2951  */
2952 static void
2953 handle_transport_notify_connect (void *cls,
2954                                  const struct GNUNET_PeerIdentity *peer,
2955                                  struct GNUNET_TIME_Relative latency)
2956 {
2957   struct Neighbour *n;
2958   struct GNUNET_TIME_Absolute now;
2959   struct ConnectNotifyMessage cnm;
2960
2961   n = find_neighbour (peer);
2962   if (n != NULL)
2963     {
2964       /* duplicate connect notification!? */
2965       GNUNET_break (0);
2966       return;
2967     }
2968   now = GNUNET_TIME_absolute_get ();
2969   n = GNUNET_malloc (sizeof (struct Neighbour));
2970   n->next = neighbours;
2971   neighbours = n;
2972   neighbour_count++;
2973   n->peer = *peer;
2974   n->last_latency = latency;
2975   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2976   n->encrypt_key_created = now;
2977   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2978   n->last_asw_update = now;
2979   n->last_arw_update = now;
2980   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2981   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2982   n->bpm_out_internal_limit = (uint32_t) - 1;
2983   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2984   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2985                                                 (uint32_t) - 1);
2986 #if DEBUG_CORE
2987   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2988               "Received connection from `%4s'.\n",
2989               GNUNET_i2s (&n->peer));
2990 #endif
2991   schedule_quota_update (n);
2992   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2993   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2994   cnm.reserved = htonl (0);
2995   cnm.peer = *peer;
2996   send_to_all_clients (&cnm.header, GNUNET_YES);
2997 }
2998
2999
3000 /**
3001  * Free the given entry for the neighbour (it has
3002  * already been removed from the list at this point).
3003  *
3004  * @param n neighbour to free
3005  */
3006 static void
3007 free_neighbour (struct Neighbour *n)
3008 {
3009   struct MessageEntry *m;
3010
3011   while (NULL != (m = n->messages))
3012     {
3013       n->messages = m->next;
3014       GNUNET_free (m);
3015     }
3016   while (NULL != (m = n->encrypted_head))
3017     {
3018       n->encrypted_head = m->next;
3019       GNUNET_free (m);
3020     }
3021   if (NULL != n->th)
3022     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3023   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3024     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3025   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3026     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3027   if (n->quota_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
3028     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3029   GNUNET_free_non_null (n->public_key);
3030   GNUNET_free_non_null (n->pending_ping);
3031   GNUNET_free (n);
3032 }
3033
3034
3035 /**
3036  * Function called by transport telling us that a peer
3037  * disconnected.
3038  *
3039  * @param cls closure
3040  * @param peer the peer that disconnected
3041  */
3042 static void
3043 handle_transport_notify_disconnect (void *cls,
3044                                     const struct GNUNET_PeerIdentity *peer)
3045 {
3046   struct ConnectNotifyMessage cnm;
3047   struct Neighbour *n;
3048   struct Neighbour *p;
3049
3050 #if DEBUG_CORE
3051   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3052               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3053 #endif
3054   p = NULL;
3055   n = neighbours;
3056   while ((n != NULL) &&
3057          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3058     {
3059       p = n;
3060       n = n->next;
3061     }
3062   if (n == NULL)
3063     {
3064       GNUNET_break (0);
3065       return;
3066     }
3067   if (p == NULL)
3068     neighbours = n->next;
3069   else
3070     p->next = n->next;
3071   GNUNET_assert (neighbour_count > 0);
3072   neighbour_count--;
3073   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3074   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3075   cnm.reserved = htonl (0);
3076   cnm.peer = *peer;
3077   send_to_all_clients (&cnm.header, GNUNET_YES);
3078   free_neighbour (n);
3079 }
3080
3081
3082 /**
3083  * Last task run during shutdown.  Disconnects us from
3084  * the transport.
3085  */
3086 static void
3087 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3088 {
3089   struct Neighbour *n;
3090   struct Client *c;
3091
3092 #if DEBUG_CORE
3093   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3094               "Core service shutting down.\n");
3095 #endif
3096   GNUNET_assert (transport != NULL);
3097   GNUNET_TRANSPORT_disconnect (transport);
3098   transport = NULL;
3099   while (NULL != (n = neighbours))
3100     {
3101       neighbours = n->next;
3102       GNUNET_assert (neighbour_count > 0);
3103       neighbour_count--;
3104       free_neighbour (n);
3105     }
3106   while (NULL != (c = clients))
3107     handle_client_disconnect (NULL, c->client_handle);
3108 }
3109
3110
3111 /**
3112  * Initiate core service.
3113  *
3114  * @param cls closure
3115  * @param s scheduler to use
3116  * @param serv the initialized server
3117  * @param c configuration to use
3118  */
3119 static void
3120 run (void *cls,
3121      struct GNUNET_SCHEDULER_Handle *s,
3122      struct GNUNET_SERVER_Handle *serv, struct GNUNET_CONFIGURATION_Handle *c)
3123 {
3124 #if 0
3125   unsigned long long qin;
3126   unsigned long long qout;
3127   unsigned long long tneigh;
3128 #endif
3129   char *keyfile;
3130
3131   sched = s;
3132   cfg = c;
3133   /* parse configuration */
3134   if (
3135        (GNUNET_OK !=
3136         GNUNET_CONFIGURATION_get_value_number (c,
3137                                                "CORE",
3138                                                "TOTAL_QUOTA_IN",
3139                                                &bandwidth_target_in)) ||
3140        (GNUNET_OK !=
3141         GNUNET_CONFIGURATION_get_value_number (c,
3142                                                "CORE",
3143                                                "TOTAL_QUOTA_OUT",
3144                                                &bandwidth_target_out)) ||
3145 #if 0
3146        (GNUNET_OK !=
3147         GNUNET_CONFIGURATION_get_value_number (c,
3148                                                "CORE",
3149                                                "YY",
3150                                                &qout)) ||
3151        (GNUNET_OK !=
3152         GNUNET_CONFIGURATION_get_value_number (c,
3153                                                "CORE",
3154                                                "ZZ_LIMIT", &tneigh)) ||
3155 #endif
3156        (GNUNET_OK !=
3157         GNUNET_CONFIGURATION_get_value_filename (c,
3158                                                  "GNUNETD",
3159                                                  "HOSTKEY", &keyfile)))
3160     {
3161       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3162                   _
3163                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3164       GNUNET_SCHEDULER_shutdown (s);
3165       return;
3166     }
3167   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3168   GNUNET_free (keyfile);
3169   if (my_private_key == NULL)
3170     {
3171       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3172                   _("Core service could not access hostkey.  Exiting.\n"));
3173       GNUNET_SCHEDULER_shutdown (s);
3174       return;
3175     }
3176   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3177   GNUNET_CRYPTO_hash (&my_public_key,
3178                       sizeof (my_public_key), &my_identity.hashPubKey);
3179   /* setup notification */
3180   server = serv;
3181   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3182   /* setup transport connection */
3183   transport = GNUNET_TRANSPORT_connect (sched,
3184                                         cfg,
3185                                         NULL,
3186                                         &handle_transport_receive,
3187                                         &handle_transport_notify_connect,
3188                                         &handle_transport_notify_disconnect);
3189   GNUNET_assert (NULL != transport);
3190   GNUNET_SCHEDULER_add_delayed (sched,
3191                                 GNUNET_YES,
3192                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3193                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
3194                                 GNUNET_TIME_UNIT_FOREVER_REL,
3195                                 &cleaning_task, NULL);
3196   /* process client requests */
3197   GNUNET_SERVER_add_handlers (server, handlers);
3198   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3199               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3200 }
3201
3202
3203 /**
3204  * Function called during shutdown.  Clean up our state.
3205  */
3206 static void
3207 cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
3208 {
3209   if (my_private_key != NULL)
3210     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3211 }
3212
3213
3214 /**
3215  * The main function for the transport service.
3216  *
3217  * @param argc number of arguments from the command line
3218  * @param argv command line arguments
3219  * @return 0 ok, 1 on error
3220  */
3221 int
3222 main (int argc, char *const *argv)
3223 {
3224   return (GNUNET_OK ==
3225           GNUNET_SERVICE_run (argc,
3226                               argv,
3227                               "core", &run, NULL, &cleanup, NULL)) ? 0 : 1;
3228 }
3229
3230 /* end of gnunet-service-core.c */