cleaning up
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 /**
45  * Receive and send buffer windows grow over time.  For
46  * how long can 'unused' bandwidth accumulate before we
47  * need to cap it?  (specified in ms).
48  */
49 #define MAX_WINDOW_TIME (5 * 60 * 1000)
50
51 /**
52  * Minimum of bytes per minute (out) to assign to any connected peer.
53  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
54  * sense.
55  */
56 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
57
58 /**
59  * What is the smallest change (in number of bytes per minute)
60  * that we consider significant enough to bother triggering?
61  */
62 #define MIN_BPM_CHANGE 32
63
64 /**
65  * After how much time past the "official" expiration time do
66  * we discard messages?  Should not be zero since we may 
67  * intentionally defer transmission until close to the deadline
68  * and then may be slightly past the deadline due to inaccuracy
69  * in sleep and our own CPU consumption.
70  */
71 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
72
73 /**
74  * What is the maximum delay for a SET_KEY message?
75  */
76 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What how long do we wait for SET_KEY confirmation initially?
80  */
81 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
82
83 /**
84  * What is the maximum delay for a PING message?
85  */
86 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
87
88 /**
89  * What is the maximum delay for a PONG message?
90  */
91 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * How often do we recalculate bandwidth quotas?
95  */
96 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * What is the priority for a SET_KEY message?
100  */
101 #define SET_KEY_PRIORITY 0xFFFFFF
102
103 /**
104  * What is the priority for a PING message?
105  */
106 #define PING_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PONG message?
110  */
111 #define PONG_PRIORITY 0xFFFFFF
112
113 /**
114  * How many messages do we queue per peer at most?
115  */
116 #define MAX_PEER_QUEUE_SIZE 16
117
118 /**
119  * How many non-mandatory messages do we queue per client at most?
120  */
121 #define MAX_CLIENT_QUEUE_SIZE 32
122
123 /**
124  * What is the maximum age of a message for us to consider
125  * processing it?  Note that this looks at the timestamp used
126  * by the other peer, so clock skew between machines does
127  * come into play here.  So this should be picked high enough
128  * so that a little bit of clock skew does not prevent peers
129  * from connecting to us.
130  */
131 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
132
133 /**
134  * What is the maximum size for encrypted messages?  Note that this
135  * number imposes a clear limit on the maximum size of any message.
136  * Set to a value close to 64k but not so close that transports will
137  * have trouble with their headers.
138  */
139 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
140
141
142 /**
143  * State machine for our P2P encryption handshake.  Everyone starts in
144  * "DOWN", if we receive the other peer's key (other peer initiated)
145  * we start in state RECEIVED (since we will immediately send our
146  * own); otherwise we start in SENT.  If we get back a PONG from
147  * within either state, we move up to CONFIRMED (the PONG will always
148  * be sent back encrypted with the key we sent to the other peer).
149  */
150 enum PeerStateMachine
151 {
152   PEER_STATE_DOWN,
153   PEER_STATE_KEY_SENT,
154   PEER_STATE_KEY_RECEIVED,
155   PEER_STATE_KEY_CONFIRMED
156 };
157
158
159 /**
160  * Number of bytes (at the beginning) of "struct EncryptedMessage"
161  * that are NOT encrypted.
162  */
163 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
164
165
166 /**
167  * Encapsulation for encrypted messages exchanged between
168  * peers.  Followed by the actual encrypted data.
169  */
170 struct EncryptedMessage
171 {
172   /**
173    * Message type is either CORE_ENCRYPTED_MESSAGE.
174    */
175   struct GNUNET_MessageHeader header;
176
177   /**
178    * Always zero.
179    */
180   uint32_t reserved GNUNET_PACKED;
181
182   /**
183    * Hash of the plaintext, used to verify message integrity;
184    * ALSO used as the IV for the symmetric cipher!  Everything
185    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
186    * must be set to the offset of the next field.
187    */
188   GNUNET_HashCode plaintext_hash;
189
190   /**
191    * Sequence number, in network byte order.  This field
192    * must be the first encrypted/decrypted field and the
193    * first byte that is hashed for the plaintext hash.
194    */
195   uint32_t sequence_number GNUNET_PACKED;
196
197   /**
198    * Desired bandwidth (how much we should send to this
199    * peer / how much is the sender willing to receive),
200    * in bytes per minute.
201    */
202   uint32_t inbound_bpm_limit GNUNET_PACKED;
203
204   /**
205    * Timestamp.  Used to prevent reply of ancient messages
206    * (recent messages are caught with the sequence number).
207    */
208   struct GNUNET_TIME_AbsoluteNBO timestamp;
209
210 };
211
212 /**
213  * We're sending an (encrypted) PING to the other peer to check if he
214  * can decrypt.  The other peer should respond with a PONG with the
215  * same content, except this time encrypted with the receiver's key.
216  */
217 struct PingMessage
218 {
219   /**
220    * Message type is either CORE_PING or CORE_PONG.
221    */
222   struct GNUNET_MessageHeader header;
223
224   /**
225    * Random number chosen to make reply harder.
226    */
227   uint32_t challenge GNUNET_PACKED;
228
229   /**
230    * Intended target of the PING, used primarily to check
231    * that decryption actually worked.
232    */
233   struct GNUNET_PeerIdentity target;
234 };
235
236
237 /**
238  * Message transmitted to set (or update) a session key.
239  */
240 struct SetKeyMessage
241 {
242
243   /**
244    * Message type is either CORE_SET_KEY.
245    */
246   struct GNUNET_MessageHeader header;
247
248   /**
249    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
250    */
251   int32_t sender_status GNUNET_PACKED;
252
253   /**
254    * Purpose of the signature, will be
255    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
256    */
257   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
258
259   /**
260    * At what time was this key created?
261    */
262   struct GNUNET_TIME_AbsoluteNBO creation_time;
263
264   /**
265    * The encrypted session key.
266    */
267   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
268
269   /**
270    * Who is the intended recipient?
271    */
272   struct GNUNET_PeerIdentity target;
273
274   /**
275    * Signature of the stuff above (starting at purpose).
276    */
277   struct GNUNET_CRYPTO_RsaSignature signature;
278
279 };
280
281
282 /**
283  * Message waiting for transmission. This struct
284  * is followed by the actual content of the message.
285  */
286 struct MessageEntry
287 {
288
289   /**
290    * We keep messages in a linked list (for now).
291    */
292   struct MessageEntry *next;
293
294   /**
295    * By when are we supposed to transmit this message?
296    */
297   struct GNUNET_TIME_Absolute deadline;
298
299   /**
300    * How important is this message to us?
301    */
302   unsigned int priority;
303
304   /**
305    * How long is the message? (number of bytes following
306    * the "struct MessageEntry", but not including the
307    * size of "struct MessageEntry" itself!)
308    */
309   uint16_t size;
310
311   /**
312    * Was this message selected for transmission in the
313    * current round? GNUNET_YES or GNUNET_NO.
314    */
315   int16_t do_transmit;
316
317 };
318
319
320 struct Neighbour
321 {
322   /**
323    * We keep neighbours in a linked list (for now).
324    */
325   struct Neighbour *next;
326
327   /**
328    * Unencrypted messages destined for this peer.
329    */
330   struct MessageEntry *messages;
331
332   /**
333    * Head of the batched, encrypted message queue (already ordered,
334    * transmit starting with the head).
335    */
336   struct MessageEntry *encrypted_head;
337
338   /**
339    * Tail of the batched, encrypted message queue (already ordered,
340    * append new messages to tail)
341    */
342   struct MessageEntry *encrypted_tail;
343
344   /**
345    * Handle for pending requests for transmission to this peer
346    * with the transport service.  NULL if no request is pending.
347    */
348   struct GNUNET_TRANSPORT_TransmitHandle *th;
349
350   /**
351    * Public key of the neighbour, NULL if we don't have it yet.
352    */
353   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
354
355   /**
356    * We received a PING message before we got the "public_key"
357    * (or the SET_KEY).  We keep it here until we have a key
358    * to decrypt it.  NULL if no PING is pending.
359    */
360   struct PingMessage *pending_ping;
361
362   /**
363    * Identity of the neighbour.
364    */
365   struct GNUNET_PeerIdentity peer;
366
367   /**
368    * Key we use to encrypt our messages for the other peer
369    * (initialized by us when we do the handshake).
370    */
371   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
372
373   /**
374    * Key we use to decrypt messages from the other peer
375    * (given to us by the other peer during the handshake).
376    */
377   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
378
379   /**
380    * ID of task used for re-trying plaintext scheduling.
381    */
382   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
383
384   /**
385    * ID of task used for re-trying SET_KEY and PING message.
386    */
387   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
388
389   /**
390    * ID of task used for updating bandwidth quota for this neighbour.
391    */
392   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
393
394   /**
395    * At what time did we generate our encryption key?
396    */
397   struct GNUNET_TIME_Absolute encrypt_key_created;
398
399   /**
400    * At what time did the other peer generate the decryption key?
401    */
402   struct GNUNET_TIME_Absolute decrypt_key_created;
403
404   /**
405    * At what time did we initially establish (as in, complete session
406    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
407    */
408   struct GNUNET_TIME_Absolute time_established;
409
410   /**
411    * At what time did we last receive an encrypted message from the
412    * other peer?  Should be zero if status != KEY_CONFIRMED.
413    */
414   struct GNUNET_TIME_Absolute last_activity;
415
416   /**
417    * Last latency observed from this peer.
418    */
419   struct GNUNET_TIME_Relative last_latency;
420
421   /**
422    * At what frequency are we currently re-trying SET_KEY messages?
423    */
424   struct GNUNET_TIME_Relative set_key_retry_frequency;
425
426   /**
427    * Time of our last update to the "available_send_window".
428    */
429   struct GNUNET_TIME_Absolute last_asw_update;
430
431   /**
432    * Time of our last update to the "available_recv_window".
433    */
434   struct GNUNET_TIME_Absolute last_arw_update;
435
436   /**
437    * Number of bytes that we are eligible to transmit to this
438    * peer at this point.  Incremented every minute by max_out_bpm,
439    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
440    * bandwidth-hogs are sampled at a frequency of about 78s!);
441    * may get negative if we have VERY high priority content.
442    */
443   long long available_send_window; 
444
445   /**
446    * How much downstream capacity of this peer has been reserved for
447    * our traffic?  (Our clients can request that a certain amount of
448    * bandwidth is available for replies to them; this value is used to
449    * make sure that this reserved amount of bandwidth is actually
450    * available).
451    */
452   long long available_recv_window; 
453
454   /**
455    * How valueable were the messages of this peer recently?
456    */
457   unsigned long long current_preference;
458
459   /**
460    * Bit map indicating which of the 32 sequence numbers before the last
461    * were received (good for accepting out-of-order packets and
462    * estimating reliability of the connection)
463    */
464   unsigned int last_packets_bitmap;
465
466   /**
467    * Number of messages in the message queue for this peer.
468    */
469   unsigned int message_queue_size;
470
471   /**
472    * last sequence number received on this connection (highest)
473    */
474   uint32_t last_sequence_number_received;
475
476   /**
477    * last sequence number transmitted
478    */
479   uint32_t last_sequence_number_sent;
480
481   /**
482    * Available bandwidth in for this peer (current target).
483    */
484   uint32_t bpm_in;
485
486   /**
487    * Available bandwidth out for this peer (current target).
488    */
489   uint32_t bpm_out;
490
491   /**
492    * Internal bandwidth limit set for this peer (initially
493    * typically set to "-1").  "bpm_out" is MAX of
494    * "bpm_out_internal_limit" and "bpm_out_external_limit".
495    */
496   uint32_t bpm_out_internal_limit;
497
498   /**
499    * External bandwidth limit set for this peer by the
500    * peer that we are communicating with.  "bpm_out" is MAX of
501    * "bpm_out_internal_limit" and "bpm_out_external_limit".
502    */
503   uint32_t bpm_out_external_limit;
504
505   /**
506    * What was our PING challenge number (for this peer)?
507    */
508   uint32_t ping_challenge;
509
510   /**
511    * What is our connection status?
512    */
513   enum PeerStateMachine status;
514
515 };
516
517
518 /**
519  * Events are messages for clients.  The struct
520  * itself is followed by the actual message.
521  */
522 struct Event
523 {
524   /**
525    * This is a linked list.
526    */
527   struct Event *next;
528
529   /**
530    * Size of the message.
531    */
532   size_t size;
533
534   /**
535    * Could this event be dropped if this queue
536    * is getting too large? (NOT YET USED!)
537    */
538   int can_drop;
539
540 };
541
542
543 /**
544  * Data structure for each client connected to the core service.
545  */
546 struct Client
547 {
548   /**
549    * Clients are kept in a linked list.
550    */
551   struct Client *next;
552
553   /**
554    * Handle for the client with the server API.
555    */
556   struct GNUNET_SERVER_Client *client_handle;
557
558   /**
559    * Linked list of messages we still need to deliver to
560    * this client.
561    */
562   struct Event *event_head;
563
564   /**
565    * Tail of the linked list of events.
566    */
567   struct Event *event_tail;
568
569   /**
570    * Current transmit handle, NULL if no transmission request
571    * is pending.
572    */
573   struct GNUNET_CONNECTION_TransmitHandle *th;
574
575   /**
576    * Array of the types of messages this peer cares
577    * about (with "tcnt" entries).  Allocated as part
578    * of this client struct, do not free!
579    */
580   uint16_t *types;
581
582   /**
583    * Options for messages this client cares about,
584    * see GNUNET_CORE_OPTION_ values.
585    */
586   uint32_t options;
587
588   /**
589    * Number of types of incoming messages this client
590    * specifically cares about.  Size of the "types" array.
591    */
592   unsigned int tcnt;
593
594 };
595
596
597 /**
598  * Our public key.
599  */
600 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
601
602 /**
603  * Our identity.
604  */
605 static struct GNUNET_PeerIdentity my_identity;
606
607 /**
608  * Our private key.
609  */
610 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
611
612 /**
613  * Our scheduler.
614  */
615 struct GNUNET_SCHEDULER_Handle *sched;
616
617 /**
618  * Our configuration.
619  */
620 const struct GNUNET_CONFIGURATION_Handle *cfg;
621
622 /**
623  * Our server.
624  */
625 static struct GNUNET_SERVER_Handle *server;
626
627 /**
628  * Transport service.
629  */
630 static struct GNUNET_TRANSPORT_Handle *transport;
631
632 /**
633  * Linked list of our clients.
634  */
635 static struct Client *clients;
636
637 /**
638  * We keep neighbours in a linked list (for now).
639  */
640 static struct Neighbour *neighbours;
641
642 /**
643  * Sum of all preferences among all neighbours.
644  */
645 static unsigned long long preference_sum;
646
647 /**
648  * Total number of neighbours we have.
649  */
650 static unsigned int neighbour_count;
651
652 /**
653  * How much inbound bandwidth are we supposed to be using?
654  */
655 static unsigned long long bandwidth_target_in;
656
657 /**
658  * How much outbound bandwidth are we supposed to be using?
659  */
660 static unsigned long long bandwidth_target_out;
661
662
663
664 /**
665  * A preference value for a neighbour was update.  Update
666  * the preference sum accordingly.
667  *
668  * @param inc how much was a preference value increased?
669  */
670 static void
671 update_preference_sum (unsigned long long inc)
672 {
673   struct Neighbour *n;
674   unsigned long long os;
675
676   os = preference_sum;
677   preference_sum += inc;
678   if (preference_sum >= os)
679     return; /* done! */
680   /* overflow! compensate by cutting all values in half! */
681   preference_sum = 0;
682   n = neighbours;
683   while (n != NULL)
684     {
685       n->current_preference /= 2;
686       preference_sum += n->current_preference;
687       n = n->next;
688     }    
689 }
690
691
692 /**
693  * Recalculate the number of bytes we expect to
694  * receive or transmit in a given window.
695  *
696  * @param force force an update now (even if not much time has passed)
697  * @param window pointer to the byte counter (updated)
698  * @param ts pointer to the timestamp (updated)
699  * @param bpm number of bytes per minute that should
700  *        be added to the window.
701  */
702 static void
703 update_window (int force,
704                long long *window,
705                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
706 {
707   struct GNUNET_TIME_Relative since;
708
709   since = GNUNET_TIME_absolute_get_duration (*ts);
710   if ( (force == GNUNET_NO) &&
711        (since.value < 60 * 1000) )
712     return;                     /* not even a minute has passed */
713   *ts = GNUNET_TIME_absolute_get ();
714   *window += (bpm * since.value) / 60 / 1000;
715   if (*window > MAX_WINDOW_TIME * bpm)
716     *window = MAX_WINDOW_TIME * bpm;
717 }
718
719
720 /**
721  * Find the entry for the given neighbour.
722  *
723  * @param peer identity of the neighbour
724  * @return NULL if we are not connected, otherwise the
725  *         neighbour's entry.
726  */
727 static struct Neighbour *
728 find_neighbour (const struct GNUNET_PeerIdentity *peer)
729 {
730   struct Neighbour *ret;
731
732   ret = neighbours;
733   while ((ret != NULL) &&
734          (0 != memcmp (&ret->peer,
735                        peer, sizeof (struct GNUNET_PeerIdentity))))
736     ret = ret->next;
737   return ret;
738 }
739
740
741 /**
742  * Find the entry for the given client.
743  *
744  * @param client handle for the client
745  * @return NULL if we are not connected, otherwise the
746  *         client's struct.
747  */
748 static struct Client *
749 find_client (const struct GNUNET_SERVER_Client *client)
750 {
751   struct Client *ret;
752
753   ret = clients;
754   while ((ret != NULL) && (client != ret->client_handle))
755     ret = ret->next;
756   return ret;
757 }
758
759
760 /**
761  * If necessary, initiate a request with the server to
762  * transmit messages from the queue of the given client.
763  * @param client who to transfer messages to
764  */
765 static void request_transmit (struct Client *client);
766
767
768 /**
769  * Client is ready to receive data, provide it.
770  *
771  * @param cls closure
772  * @param size number of bytes available in buf
773  * @param buf where the callee should write the message
774  * @return number of bytes written to buf
775  */
776 static size_t
777 do_client_transmit (void *cls, size_t size, void *buf)
778 {
779   struct Client *client = cls;
780   struct Event *e;
781   char *tgt;
782   size_t ret;
783
784   client->th = NULL;
785 #if DEBUG_CORE_CLIENT
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "Client ready to receive %u bytes.\n", size);
788 #endif
789   if (buf == NULL)
790     {
791 #if DEBUG_CORE
792       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
793                   "Failed to transmit data to client (disconnect)?\n");
794 #endif
795       return 0;                 /* we'll surely get a disconnect soon... */
796     }
797   tgt = buf;
798   ret = 0;
799   while ((NULL != (e = client->event_head)) && (e->size <= size))
800     {
801       memcpy (&tgt[ret], &e[1], e->size);
802       size -= e->size;
803       ret += e->size;
804       client->event_head = e->next;
805       GNUNET_free (e);
806     }
807   GNUNET_assert (ret > 0);
808   if (client->event_head == NULL)
809     client->event_tail = NULL;
810 #if DEBUG_CORE_CLIENT
811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812               "Transmitting %u bytes to client\n", ret);
813 #endif
814   request_transmit (client);
815   return ret;
816 }
817
818
819 /**
820  * If necessary, initiate a request with the server to
821  * transmit messages from the queue of the given client.
822  * @param client who to transfer messages to
823  */
824 static void
825 request_transmit (struct Client *client)
826 {
827
828   if (NULL != client->th)
829     return;                     /* already pending */
830   if (NULL == client->event_head)
831     return;                     /* no more events pending */
832 #if DEBUG_CORE_CLIENT
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834               "Asking server to transmit %u bytes to client\n",
835               client->event_head->size);
836 #endif
837   client->th
838     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
839                                            client->event_head->size,
840                                            GNUNET_TIME_UNIT_FOREVER_REL,
841                                            &do_client_transmit, client);
842 }
843
844
845 /**
846  * Send a message to one of our clients.
847  * @param client target for the message
848  * @param msg message to transmit
849  * @param can_drop could this message be dropped if the
850  *        client's queue is getting too large?
851  */
852 static void
853 send_to_client (struct Client *client,
854                 const struct GNUNET_MessageHeader *msg, int can_drop)
855 {
856   struct Event *e;
857   unsigned int queue_size;
858   uint16_t msize;
859
860 #if DEBUG_CORE_CLIENT
861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862               "Preparing to send message of type %u to client.\n",
863               ntohs (msg->type));
864 #endif
865   queue_size = 0;
866   e = client->event_head;
867   while (e != NULL)
868     {
869       queue_size++;
870       e = e->next;
871     }
872   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
873        (can_drop == GNUNET_YES) )
874     {
875 #if DEBUG_CORE
876       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
877                   "Too many messages in queue for the client, dropping the new message.\n");
878 #endif
879       return;
880     }
881
882   msize = ntohs (msg->size);
883   e = GNUNET_malloc (sizeof (struct Event) + msize);
884   /* append */
885   if (client->event_tail != NULL)
886     client->event_tail->next = e;
887   else
888     client->event_head = e;
889   client->event_tail = e;
890   e->can_drop = can_drop;
891   e->size = msize;
892   memcpy (&e[1], msg, msize);
893   request_transmit (client);
894 }
895
896
897 /**
898  * Send a message to all of our current clients.
899  */
900 static void
901 send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
902 {
903   struct Client *c;
904
905   c = clients;
906   while (c != NULL)
907     {
908       send_to_client (c, msg, can_drop);
909       c = c->next;
910     }
911 }
912
913
914 /**
915  * Handle CORE_INIT request.
916  */
917 static void
918 handle_client_init (void *cls,
919                     struct GNUNET_SERVER_Client *client,
920                     const struct GNUNET_MessageHeader *message)
921 {
922   const struct InitMessage *im;
923   struct InitReplyMessage irm;
924   struct Client *c;
925   uint16_t msize;
926   const uint16_t *types;
927   struct Neighbour *n;
928   struct ConnectNotifyMessage cnm;
929
930 #if DEBUG_CORE_CLIENT
931   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
932               "Client connecting to core service with `%s' message\n",
933               "INIT");
934 #endif
935   /* check that we don't have an entry already */
936   c = clients;
937   while (c != NULL)
938     {
939       if (client == c->client_handle)
940         {
941           GNUNET_break (0);
942           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
943           return;
944         }
945       c = c->next;
946     }
947   msize = ntohs (message->size);
948   if (msize < sizeof (struct InitMessage))
949     {
950       GNUNET_break (0);
951       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
952       return;
953     }
954   im = (const struct InitMessage *) message;
955   types = (const uint16_t *) &im[1];
956   msize -= sizeof (struct InitMessage);
957   c = GNUNET_malloc (sizeof (struct Client) + msize);
958   c->client_handle = client;
959   c->next = clients;
960   clients = c;
961   memcpy (&c[1], types, msize);
962   c->types = (uint16_t *) & c[1];
963   c->options = ntohl (im->options);
964   c->tcnt = msize / sizeof (uint16_t);
965   /* send init reply message */
966   irm.header.size = htons (sizeof (struct InitReplyMessage));
967   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
968   irm.reserved = htonl (0);
969   memcpy (&irm.publicKey,
970           &my_public_key,
971           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
972 #if DEBUG_CORE_CLIENT
973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974               "Sending `%s' message to client.\n", "INIT_REPLY");
975 #endif
976   send_to_client (c, &irm.header, GNUNET_NO);
977   /* notify new client about existing neighbours */
978   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
979   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
980   n = neighbours;
981   while (n != NULL)
982     {
983 #if DEBUG_CORE_CLIENT
984       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
986 #endif
987       cnm.reserved = htonl (0);
988       cnm.peer = n->peer;
989       send_to_client (c, &cnm.header, GNUNET_NO);
990       n = n->next;
991     }
992   GNUNET_SERVER_receive_done (client, GNUNET_OK);
993 }
994
995
996 /**
997  * A client disconnected, clean up.
998  *
999  * @param cls closure
1000  * @param client identification of the client
1001  */
1002 static void
1003 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1004 {
1005   struct Client *pos;
1006   struct Client *prev;
1007   struct Event *e;
1008
1009 #if DEBUG_CORE_CLIENT
1010   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1011               "Client has disconnected from core service.\n");
1012 #endif
1013   prev = NULL;
1014   pos = clients;
1015   while (pos != NULL)
1016     {
1017       if (client == pos->client_handle)
1018         {
1019           if (prev == NULL)
1020             clients = pos->next;
1021           else
1022             prev->next = pos->next;
1023           if (pos->th != NULL)
1024             GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1025           while (NULL != (e = pos->event_head))
1026             {
1027               pos->event_head = e->next;
1028               GNUNET_free (e);
1029             }
1030           GNUNET_free (pos);
1031           return;
1032         }
1033       prev = pos;
1034       pos = pos->next;
1035     }
1036   /* client never sent INIT */
1037 }
1038
1039
1040 /**
1041  * Handle REQUEST_CONFIGURE request.
1042  */
1043 static void
1044 handle_client_request_configure (void *cls,
1045                                  struct GNUNET_SERVER_Client *client,
1046                                  const struct GNUNET_MessageHeader *message)
1047 {
1048   const struct RequestConfigureMessage *rcm;
1049   struct Neighbour *n;
1050   struct ConfigurationInfoMessage cim;
1051   struct Client *c;
1052   int reserv;
1053   unsigned long long old_preference;
1054
1055 #if DEBUG_CORE_CLIENT
1056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057               "Core service receives `%s' request.\n", "CONFIGURE");
1058 #endif
1059   rcm = (const struct RequestConfigureMessage *) message;
1060   n = find_neighbour (&rcm->peer);
1061   memset (&cim, 0, sizeof (cim));
1062   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1063     {
1064       update_window (GNUNET_YES,
1065                      &n->available_send_window,
1066                      &n->last_asw_update,
1067                      n->bpm_out);
1068       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1069       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1070                                n->bpm_out_external_limit);
1071       reserv = ntohl (rcm->reserve_inbound);
1072       if (reserv < 0)
1073         {
1074           n->available_recv_window += reserv;
1075         }
1076       else if (reserv > 0)
1077         {
1078           update_window (GNUNET_NO,
1079                          &n->available_recv_window,
1080                          &n->last_arw_update, n->bpm_in);
1081           if (n->available_recv_window < reserv)
1082             reserv = n->available_recv_window;
1083           n->available_recv_window -= reserv;
1084         }
1085       old_preference = n->current_preference;
1086       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1087       if (old_preference > n->current_preference) 
1088         {
1089           /* overflow; cap at maximum value */
1090           n->current_preference = (unsigned long long) -1;
1091         }
1092       update_preference_sum (n->current_preference - old_preference);
1093       cim.reserved_amount = htonl (reserv);
1094       cim.bpm_in = htonl (n->bpm_in);
1095       cim.bpm_out = htonl (n->bpm_out);
1096       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1097       cim.preference = n->current_preference;
1098     }
1099   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1100   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1101   cim.peer = rcm->peer;
1102   c = find_client (client);
1103   if (c == NULL)
1104     {
1105       GNUNET_break (0);
1106       return;
1107     }
1108 #if DEBUG_CORE_CLIENT
1109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1111 #endif
1112   send_to_client (c, &cim.header, GNUNET_NO);
1113 }
1114
1115
1116 /**
1117  * Check if we have encrypted messages for the specified neighbour
1118  * pending, and if so, check with the transport about sending them
1119  * out.
1120  *
1121  * @param n neighbour to check.
1122  */
1123 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1124
1125
1126 /**
1127  * Function called when the transport service is ready to
1128  * receive an encrypted message for the respective peer
1129  *
1130  * @param cls neighbour to use message from
1131  * @param size number of bytes we can transmit
1132  * @param buf where to copy the message
1133  * @return number of bytes transmitted
1134  */
1135 static size_t
1136 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1137 {
1138   struct Neighbour *n = cls;
1139   struct MessageEntry *m;
1140   size_t ret;
1141   char *cbuf;
1142
1143   n->th = NULL;
1144   GNUNET_assert (NULL != (m = n->encrypted_head));
1145   n->encrypted_head = m->next;
1146   if (m->next == NULL)
1147     n->encrypted_tail = NULL;
1148   ret = 0;
1149   cbuf = buf;
1150   if (buf != NULL)
1151     {
1152       GNUNET_assert (size >= m->size);
1153       memcpy (cbuf, &m[1], m->size);
1154       ret = m->size;
1155       n->available_send_window -= m->size;
1156       process_encrypted_neighbour_queue (n);
1157 #if DEBUG_CORE
1158       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1160                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1161                   ret, GNUNET_i2s (&n->peer));
1162 #endif
1163     }
1164   else
1165     {
1166       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1167                   "Transmission for message of type %u and size %u failed\n",
1168                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1169                   m->size);
1170     }
1171   GNUNET_free (m);
1172   return ret;
1173 }
1174
1175
1176 /**
1177  * Check if we have plaintext messages for the specified neighbour
1178  * pending, and if so, consider batching and encrypting them (and
1179  * then trigger processing of the encrypted queue if needed).
1180  *
1181  * @param n neighbour to check.
1182  */
1183 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1184
1185
1186 /**
1187  * Check if we have encrypted messages for the specified neighbour
1188  * pending, and if so, check with the transport about sending them
1189  * out.
1190  *
1191  * @param n neighbour to check.
1192  */
1193 static void
1194 process_encrypted_neighbour_queue (struct Neighbour *n)
1195 {
1196   struct MessageEntry *m;
1197  
1198   if (n->th != NULL)
1199     return;  /* request already pending */
1200   if (n->encrypted_head == NULL)
1201     {
1202       /* encrypted queue empty, try plaintext instead */
1203       process_plaintext_neighbour_queue (n);
1204       return;
1205     }
1206 #if DEBUG_CORE
1207   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1209               n->encrypted_head->size,
1210               GNUNET_i2s (&n->peer),
1211               GNUNET_TIME_absolute_get_remaining (n->
1212                                                   encrypted_head->deadline).
1213               value);
1214 #endif
1215   n->th =
1216     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1217                                             n->encrypted_head->size,
1218                                             n->encrypted_head->priority,
1219                                             GNUNET_TIME_absolute_get_remaining
1220                                             (n->encrypted_head->deadline),
1221                                             &notify_encrypted_transmit_ready,
1222                                             n);
1223   if (n->th == NULL)
1224     {
1225       /* message request too large (oops) */
1226       GNUNET_break (0);
1227       /* discard encrypted message */
1228       GNUNET_assert (NULL != (m = n->encrypted_head));
1229       n->encrypted_head = m->next;
1230       if (m->next == NULL)
1231         n->encrypted_tail = NULL;
1232       GNUNET_free (m);
1233       process_encrypted_neighbour_queue (n);
1234     }
1235 }
1236
1237
1238 /**
1239  * Decrypt size bytes from in and write the result to out.  Use the
1240  * key for inbound traffic of the given neighbour.  This function does
1241  * NOT do any integrity-checks on the result.
1242  *
1243  * @param n neighbour we are receiving from
1244  * @param iv initialization vector to use
1245  * @param in ciphertext
1246  * @param out plaintext
1247  * @param size size of in/out
1248  * @return GNUNET_OK on success
1249  */
1250 static int
1251 do_decrypt (struct Neighbour *n,
1252             const GNUNET_HashCode * iv,
1253             const void *in, void *out, size_t size)
1254 {
1255   if (size != (uint16_t) size)
1256     {
1257       GNUNET_break (0);
1258       return GNUNET_NO;
1259     }
1260   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1261       (n->status != PEER_STATE_KEY_CONFIRMED))
1262     {
1263       GNUNET_break_op (0);
1264       return GNUNET_SYSERR;
1265     }
1266   if (size !=
1267       GNUNET_CRYPTO_aes_decrypt (in,
1268                                  (uint16_t) size,
1269                                  &n->decrypt_key,
1270                                  (const struct
1271                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1272                                  out))
1273     {
1274       GNUNET_break (0);
1275       return GNUNET_SYSERR;
1276     }
1277 #if DEBUG_CORE
1278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1279               "Decrypted %u bytes from `%4s' using key %u\n",
1280               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1281 #endif
1282   return GNUNET_OK;
1283 }
1284
1285
1286 /**
1287  * Encrypt size bytes from in and write the result to out.  Use the
1288  * key for outbound traffic of the given neighbour.
1289  *
1290  * @param n neighbour we are sending to
1291  * @param iv initialization vector to use
1292  * @param in ciphertext
1293  * @param out plaintext
1294  * @param size size of in/out
1295  * @return GNUNET_OK on success
1296  */
1297 static int
1298 do_encrypt (struct Neighbour *n,
1299             const GNUNET_HashCode * iv,
1300             const void *in, void *out, size_t size)
1301 {
1302   if (size != (uint16_t) size)
1303     {
1304       GNUNET_break (0);
1305       return GNUNET_NO;
1306     }
1307   GNUNET_assert (size ==
1308                  GNUNET_CRYPTO_aes_encrypt (in,
1309                                             (uint16_t) size,
1310                                             &n->encrypt_key,
1311                                             (const struct
1312                                              GNUNET_CRYPTO_AesInitializationVector
1313                                              *) iv, out));
1314 #if DEBUG_CORE
1315   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1316               "Encrypted %u bytes for `%4s' using key %u\n", size,
1317               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1318 #endif
1319   return GNUNET_OK;
1320 }
1321
1322
1323 /**
1324  * Select messages for transmission.  This heuristic uses a combination
1325  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1326  * and priority-based discard (in case no feasible schedule exist) and
1327  * speculative optimization (defer any kind of transmission until
1328  * we either create a batch of significant size, 25% of max, or until
1329  * we are close to a deadline).  Furthermore, when scheduling the
1330  * heuristic also packs as many messages into the batch as possible,
1331  * starting with those with the earliest deadline.  Yes, this is fun.
1332  *
1333  * @param n neighbour to select messages from
1334  * @param size number of bytes to select for transmission
1335  * @param retry_time set to the time when we should try again
1336  *        (only valid if this function returns zero)
1337  * @return number of bytes selected, or 0 if we decided to
1338  *         defer scheduling overall; in that case, retry_time is set.
1339  */
1340 static size_t
1341 select_messages (struct Neighbour *n,
1342                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1343 {
1344   struct MessageEntry *pos;
1345   struct MessageEntry *min;
1346   struct MessageEntry *last;
1347   unsigned int min_prio;
1348   struct GNUNET_TIME_Absolute t;
1349   struct GNUNET_TIME_Absolute now;
1350   uint64_t delta;
1351   uint64_t avail;
1352   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1353   size_t off;
1354   int discard_low_prio;
1355
1356   GNUNET_assert (NULL != n->messages);
1357   now = GNUNET_TIME_absolute_get ();
1358   /* last entry in linked list of messages processed */
1359   last = NULL;
1360   /* should we remove the entry with the lowest
1361      priority from consideration for scheduling at the
1362      end of the loop? */
1363   discard_low_prio = GNUNET_YES;
1364   while (GNUNET_YES == discard_low_prio)
1365     {
1366       min = NULL;
1367       min_prio = -1;
1368       discard_low_prio = GNUNET_NO;
1369       /* calculate number of bytes available for transmission at time "t" */
1370       update_window (GNUNET_NO,
1371                      &n->available_send_window,
1372                      &n->last_asw_update,
1373                      n->bpm_out);
1374       avail = n->available_send_window;
1375       t = n->last_asw_update;
1376       /* how many bytes have we (hypothetically) scheduled so far */
1377       off = 0;
1378       /* maximum time we can wait before transmitting anything
1379          and still make all of our deadlines */
1380       slack = -1;
1381
1382       pos = n->messages;
1383       /* note that we use "*2" here because we want to look
1384          a bit further into the future; much more makes no
1385          sense since new message might be scheduled in the
1386          meantime... */
1387       while ((pos != NULL) && (off < size * 2))
1388         {
1389           if (pos->do_transmit == GNUNET_YES)
1390             {
1391               /* already removed from consideration */
1392               pos = pos->next;
1393               continue;
1394             }
1395           if (discard_low_prio == GNUNET_NO)
1396             {
1397               delta = pos->deadline.value;
1398               if (delta < t.value)
1399                 delta = 0;
1400               else
1401                 delta = t.value - delta;
1402               avail += delta * n->bpm_out / 1000 / 60;
1403               if (avail < pos->size)
1404                 {
1405                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1406                 }
1407               else
1408                 {
1409                   avail -= pos->size;
1410                   /* update slack, considering both its absolute deadline
1411                      and relative deadlines caused by other messages
1412                      with their respective load */
1413                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1414                   if (pos->deadline.value < now.value)
1415                     slack = 0;
1416                   else
1417                     slack =
1418                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1419                 }
1420             }
1421           off += pos->size;
1422           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1423           if (pos->priority <= min_prio)
1424             {
1425               /* update min for discard */
1426               min_prio = pos->priority;
1427               min = pos;
1428             }
1429           pos = pos->next;
1430         }
1431       if (discard_low_prio)
1432         {
1433           GNUNET_assert (min != NULL);
1434           /* remove lowest-priority entry from consideration */
1435           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1436         }
1437       last = pos;
1438     }
1439   /* guard against sending "tiny" messages with large headers without
1440      urgent deadlines */
1441   if ( (slack > 1000) && (size > 4 * off) )
1442     {
1443       /* less than 25% of message would be filled with
1444          deadlines still being met if we delay by one
1445          second or more; so just wait for more data */
1446       retry_time->value = slack / 2;
1447       /* reset do_transmit values for next time */
1448       while (pos != last)
1449         {
1450           pos->do_transmit = GNUNET_NO;
1451           pos = pos->next;
1452         }
1453       return 0;
1454     }
1455   /* select marked messages (up to size) for transmission */
1456   off = 0;
1457   pos = n->messages;
1458   while (pos != last)
1459     {
1460       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1461         {
1462           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1463           off += pos->size;
1464           size -= pos->size;
1465         }
1466       else
1467         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1468       pos = pos->next;
1469     }
1470 #if DEBUG_CORE
1471   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1473               off, GNUNET_i2s (&n->peer));
1474 #endif
1475   return off;
1476 }
1477
1478
1479 /**
1480  * Batch multiple messages into a larger buffer.
1481  *
1482  * @param n neighbour to take messages from
1483  * @param buf target buffer
1484  * @param size size of buf
1485  * @param deadline set to transmission deadline for the result
1486  * @param retry_time set to the time when we should try again
1487  *        (only valid if this function returns zero)
1488  * @param priority set to the priority of the batch
1489  * @return number of bytes written to buf (can be zero)
1490  */
1491 static size_t
1492 batch_message (struct Neighbour *n,
1493                char *buf,
1494                size_t size,
1495                struct GNUNET_TIME_Absolute *deadline,
1496                struct GNUNET_TIME_Relative *retry_time,
1497                unsigned int *priority)
1498 {
1499   struct MessageEntry *pos;
1500   struct MessageEntry *prev;
1501   struct MessageEntry *next;
1502   size_t ret;
1503
1504   ret = 0;
1505   *priority = 0;
1506   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1507   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1508   if (0 == select_messages (n, size, retry_time))
1509     {
1510       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1511                   "No messages selected, will try again in %llu ms\n",
1512                   retry_time->value);
1513       return 0;
1514     }
1515   pos = n->messages;
1516   prev = NULL;
1517   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1518     {
1519       next = pos->next;
1520       if (GNUNET_YES == pos->do_transmit)
1521         {
1522           GNUNET_assert (pos->size <= size);
1523           memcpy (&buf[ret], &pos[1], pos->size);
1524           ret += pos->size;
1525           size -= pos->size;
1526           *priority += pos->priority;
1527 #if DEBUG_CORE
1528           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1529                       "Adding plaintext message with deadline %llu ms to batch\n",
1530                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1531 #endif
1532           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1533           GNUNET_free (pos);
1534           if (prev == NULL)
1535             n->messages = next;
1536           else
1537             prev->next = next;
1538         }
1539       else
1540         {
1541           prev = pos;
1542         }
1543       pos = next;
1544     }
1545 #if DEBUG_CORE
1546   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1547               "Deadline for message batch is %llu ms\n",
1548               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1549 #endif
1550   return ret;
1551 }
1552
1553
1554 /**
1555  * Remove messages with deadlines that have long expired from
1556  * the queue.
1557  *
1558  * @param n neighbour to inspect
1559  */
1560 static void
1561 discard_expired_messages (struct Neighbour *n)
1562 {
1563   struct MessageEntry *prev;
1564   struct MessageEntry *next;
1565   struct MessageEntry *pos;
1566   struct GNUNET_TIME_Absolute now;
1567   struct GNUNET_TIME_Relative delta;
1568
1569   now = GNUNET_TIME_absolute_get ();
1570   prev = NULL;
1571   pos = n->messages;
1572   while (pos != NULL) 
1573     {
1574       next = pos->next;
1575       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1576       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1577         {
1578 #if DEBUG_CORE
1579           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1580                       "Message is %llu ms past due, discarding.\n",
1581                       delta.value);
1582 #endif
1583           if (prev == NULL)
1584             n->messages = next;
1585           else
1586             prev->next = next;
1587           GNUNET_free (pos);
1588         }
1589       else
1590         prev = pos;
1591       pos = next;
1592     }
1593 }
1594
1595
1596 /**
1597  * Signature of the main function of a task.
1598  *
1599  * @param cls closure
1600  * @param tc context information (why was this task triggered now)
1601  */
1602 static void
1603 retry_plaintext_processing (void *cls,
1604                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1605 {
1606   struct Neighbour *n = cls;
1607
1608   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1609   process_plaintext_neighbour_queue (n);
1610 }
1611
1612
1613 /**
1614  * Send our key (and encrypted PING) to the other peer.
1615  *
1616  * @param n the other peer
1617  */
1618 static void send_key (struct Neighbour *n);
1619
1620
1621 /**
1622  * Check if we have plaintext messages for the specified neighbour
1623  * pending, and if so, consider batching and encrypting them (and
1624  * then trigger processing of the encrypted queue if needed).
1625  *
1626  * @param n neighbour to check.
1627  */
1628 static void
1629 process_plaintext_neighbour_queue (struct Neighbour *n)
1630 {
1631   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1632   size_t used;
1633   size_t esize;
1634   struct EncryptedMessage *em;  /* encrypted message */
1635   struct EncryptedMessage *ph;  /* plaintext header */
1636   struct MessageEntry *me;
1637   unsigned int priority;
1638   struct GNUNET_TIME_Absolute deadline;
1639   struct GNUNET_TIME_Relative retry_time;
1640
1641   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1642     {
1643       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1644       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1645     }
1646   switch (n->status)
1647     {
1648     case PEER_STATE_DOWN:
1649       send_key (n);
1650 #if DEBUG_CORE
1651       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1652                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1653                   GNUNET_i2s(&n->peer));
1654 #endif
1655       return;
1656     case PEER_STATE_KEY_SENT:
1657       GNUNET_assert (n->retry_set_key_task !=
1658                      GNUNET_SCHEDULER_NO_TASK);
1659 #if DEBUG_CORE
1660       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1661                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1662                   GNUNET_i2s(&n->peer));
1663 #endif
1664       return;
1665     case PEER_STATE_KEY_RECEIVED:
1666       GNUNET_assert (n->retry_set_key_task !=
1667                      GNUNET_SCHEDULER_NO_TASK);
1668 #if DEBUG_CORE
1669       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1670                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1671                   GNUNET_i2s(&n->peer));
1672 #endif
1673       return;
1674     case PEER_STATE_KEY_CONFIRMED:
1675       /* ready to continue */
1676       break;
1677     }
1678   discard_expired_messages (n);
1679   if (n->messages == NULL)
1680     {
1681 #if DEBUG_CORE
1682       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1683                   "Plaintext message queue for `%4s' is empty.\n",
1684                   GNUNET_i2s(&n->peer));
1685 #endif
1686       return;                   /* no pending messages */
1687     }
1688   if (n->encrypted_head != NULL)
1689     {
1690 #if DEBUG_CORE
1691       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1692                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1693                   GNUNET_i2s(&n->peer));
1694 #endif
1695       return;                   /* wait for messages already encrypted to be
1696                                    processed first! */
1697     }
1698   ph = (struct EncryptedMessage *) pbuf;
1699   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1700   priority = 0;
1701   used = sizeof (struct EncryptedMessage);
1702   used += batch_message (n,
1703                          &pbuf[used],
1704                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1705                          &deadline, &retry_time, &priority);
1706   if (used == sizeof (struct EncryptedMessage))
1707     {
1708 #if DEBUG_CORE
1709       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1710                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1711                   GNUNET_i2s(&n->peer));
1712 #endif
1713       /* no messages selected for sending, try again later... */
1714       n->retry_plaintext_task =
1715         GNUNET_SCHEDULER_add_delayed (sched,
1716                                       retry_time,
1717                                       &retry_plaintext_processing, n);
1718       return;
1719     }
1720   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1721   ph->inbound_bpm_limit = htonl (n->bpm_in);
1722   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1723
1724   /* setup encryption message header */
1725   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1726   me->deadline = deadline;
1727   me->priority = priority;
1728   me->size = used;
1729   em = (struct EncryptedMessage *) &me[1];
1730   em->header.size = htons (used);
1731   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1732   em->reserved = htonl (0);
1733   esize = used - ENCRYPTED_HEADER_SIZE;
1734   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1735   /* encrypt */
1736 #if DEBUG_CORE
1737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1738               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1739               esize,
1740               GNUNET_i2s(&n->peer),
1741               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1742 #endif
1743   GNUNET_assert (GNUNET_OK ==
1744                  do_encrypt (n,
1745                              &em->plaintext_hash,
1746                              &ph->sequence_number,
1747                              &em->sequence_number, esize));
1748   /* append to transmission list */
1749   if (n->encrypted_tail == NULL)
1750     n->encrypted_head = me;
1751   else
1752     n->encrypted_tail->next = me;
1753   n->encrypted_tail = me;
1754   process_encrypted_neighbour_queue (n);
1755 }
1756
1757
1758 /**
1759  * Handle CORE_SEND request.
1760  *
1761  * @param cls unused
1762  * @param client the client issuing the request
1763  * @param message the "struct SendMessage"
1764  */
1765 static void
1766 handle_client_send (void *cls,
1767                     struct GNUNET_SERVER_Client *client,
1768                     const struct GNUNET_MessageHeader *message);
1769
1770
1771 /**
1772  * Function called to notify us that we either succeeded
1773  * or failed to connect (at the transport level) to another
1774  * peer.  We should either free the message we were asked
1775  * to transmit or re-try adding it to the queue.
1776  *
1777  * @param cls closure
1778  * @param size number of bytes available in buf
1779  * @param buf where the callee should write the message
1780  * @return number of bytes written to buf
1781  */
1782 static size_t
1783 send_connect_continuation (void *cls, size_t size, void *buf)
1784 {
1785   struct SendMessage *sm = cls;
1786
1787   if (buf == NULL)
1788     {
1789 #if DEBUG_CORE
1790       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1791                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1792                   GNUNET_i2s (&sm->peer));
1793 #endif
1794       GNUNET_free (sm);
1795       return 0;
1796     }
1797 #if DEBUG_CORE
1798   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1799               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1800               GNUNET_i2s (&sm->peer));
1801 #endif
1802   handle_client_send (NULL, NULL, &sm->header);
1803   GNUNET_free (sm);
1804   return 0;
1805 }
1806
1807
1808 /**
1809  * Handle CORE_SEND request.
1810  *
1811  * @param cls unused
1812  * @param client the client issuing the request
1813  * @param message the "struct SendMessage"
1814  */
1815 static void
1816 handle_client_send (void *cls,
1817                     struct GNUNET_SERVER_Client *client,
1818                     const struct GNUNET_MessageHeader *message)
1819 {
1820   const struct SendMessage *sm;
1821   struct SendMessage *smc;
1822   const struct GNUNET_MessageHeader *mh;
1823   struct Neighbour *n;
1824   struct MessageEntry *prev;
1825   struct MessageEntry *pos;
1826   struct MessageEntry *e; 
1827   struct MessageEntry *min_prio_entry;
1828   struct MessageEntry *min_prio_prev;
1829   unsigned int min_prio;
1830   unsigned int queue_size;
1831   uint16_t msize;
1832
1833   msize = ntohs (message->size);
1834   if (msize <
1835       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1836     {
1837       GNUNET_break (0);
1838       if (client != NULL)
1839         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1840       return;
1841     }
1842   sm = (const struct SendMessage *) message;
1843   msize -= sizeof (struct SendMessage);
1844   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1845   if (msize != ntohs (mh->size))
1846     {
1847       GNUNET_break (0);
1848       if (client != NULL)
1849         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1850       return;
1851     }
1852   n = find_neighbour (&sm->peer);
1853   if (n == NULL)
1854     {
1855 #if DEBUG_CORE
1856       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1857                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1858                   "SEND",
1859                   GNUNET_i2s (&sm->peer),
1860                   GNUNET_TIME_absolute_get_remaining
1861                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1862 #endif
1863       msize += sizeof (struct SendMessage);
1864       /* ask transport to connect to the peer */
1865       smc = GNUNET_malloc (msize);
1866       memcpy (smc, sm, msize);
1867       if (NULL ==
1868           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1869                                                   &sm->peer,
1870                                                   0, 0,
1871                                                   GNUNET_TIME_absolute_get_remaining
1872                                                   (GNUNET_TIME_absolute_ntoh
1873                                                    (sm->deadline)),
1874                                                   &send_connect_continuation,
1875                                                   smc))
1876         {
1877           /* transport has already a request pending for this peer! */
1878 #if DEBUG_CORE
1879           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1880                       "Dropped second message destined for `%4s' since connection is still down.\n",
1881                       GNUNET_i2s(&sm->peer));
1882 #endif
1883           GNUNET_free (smc);
1884         }
1885       if (client != NULL)
1886         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1887       return;
1888     }
1889 #if DEBUG_CORE
1890   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1891               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1892               "SEND",
1893               msize, 
1894               GNUNET_i2s (&sm->peer));
1895 #endif
1896   /* bound queue size */
1897   discard_expired_messages (n);
1898   min_prio = (unsigned int) -1;
1899   min_prio_entry = NULL;
1900   min_prio_prev = NULL;
1901   queue_size = 0;
1902   prev = NULL;
1903   pos = n->messages;
1904   while (pos != NULL) 
1905     {
1906       if (pos->priority < min_prio)
1907         {
1908           min_prio_entry = pos;
1909           min_prio_prev = prev;
1910           min_prio = pos->priority;
1911         }
1912       queue_size++;
1913       prev = pos;
1914       pos = pos->next;
1915     }
1916   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1917     {
1918       /* queue full */
1919       if (ntohl(sm->priority) <= min_prio)
1920         {
1921           /* discard new entry */
1922 #if DEBUG_CORE
1923           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1924                       "Queue full, discarding new request\n");
1925 #endif
1926           if (client != NULL)
1927             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1928           return;
1929         }
1930       /* discard "min_prio_entry" */
1931 #if DEBUG_CORE
1932       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1933                   "Queue full, discarding existing older request\n");
1934 #endif
1935       if (min_prio_prev == NULL)
1936         n->messages = min_prio_entry->next;
1937       else
1938         min_prio_prev->next = min_prio_entry->next;      
1939       GNUNET_free (min_prio_entry);     
1940     }
1941   
1942   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1943   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1944   e->priority = ntohl (sm->priority);
1945   e->size = msize;
1946   memcpy (&e[1], mh, msize);
1947
1948   /* insert, keep list sorted by deadline */
1949   prev = NULL;
1950   pos = n->messages;
1951   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1952     {
1953       prev = pos;
1954       pos = pos->next;
1955     }
1956   if (prev == NULL)
1957     n->messages = e;
1958   else
1959     prev->next = e;
1960   e->next = pos;
1961
1962   /* consider scheduling now */
1963   process_plaintext_neighbour_queue (n);
1964   if (client != NULL)
1965     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1966 }
1967
1968
1969 /**
1970  * List of handlers for the messages understood by this
1971  * service.
1972  */
1973 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1974   {&handle_client_init, NULL,
1975    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1976   {&handle_client_request_configure, NULL,
1977    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
1978    sizeof (struct RequestConfigureMessage)},
1979   {&handle_client_send, NULL,
1980    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1981   {NULL, NULL, 0, 0}
1982 };
1983
1984
1985 /**
1986  * PEERINFO is giving us a HELLO for a peer.  Add the
1987  * public key to the neighbour's struct and retry
1988  * send_key.  Or, if we did not get a HELLO, just do
1989  * nothing.
1990  *
1991  * @param cls NULL
1992  * @param peer the peer for which this is the HELLO
1993  * @param hello HELLO message of that peer
1994  * @param trust amount of trust we currently have in that peer
1995  */
1996 static void
1997 process_hello_retry_send_key (void *cls,
1998                               const struct GNUNET_PeerIdentity *peer,
1999                               const struct GNUNET_HELLO_Message *hello,
2000                               uint32_t trust)
2001 {
2002   struct Neighbour *n;
2003
2004   if (peer == NULL)
2005     return;
2006   n = find_neighbour (peer);
2007   if (n == NULL)
2008     return;
2009   if (n->public_key != NULL)
2010     return;
2011 #if DEBUG_CORE
2012   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2013               "Received new `%s' message for `%4s', initiating key exchange.\n",
2014               "HELLO",
2015               GNUNET_i2s (peer));
2016 #endif
2017   n->public_key =
2018     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2019   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2020     {
2021       GNUNET_free (n->public_key);
2022       n->public_key = NULL;
2023       return;
2024     }
2025   send_key (n);
2026 }
2027
2028
2029 /**
2030  * Task that will retry "send_key" if our previous attempt failed
2031  * to yield a PONG.
2032  */
2033 static void
2034 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2035 {
2036   struct Neighbour *n = cls;
2037
2038   GNUNET_assert (n->status != PEER_STATE_KEY_CONFIRMED);
2039   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2040   n->set_key_retry_frequency =
2041     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2042   send_key (n);
2043 }
2044
2045
2046 /**
2047  * Send our key (and encrypted PING) to the other peer.
2048  *
2049  * @param n the other peer
2050  */
2051 static void
2052 send_key (struct Neighbour *n)
2053 {
2054   struct SetKeyMessage *sm;
2055   struct MessageEntry *me;
2056   struct PingMessage pp;
2057   struct PingMessage *pm;
2058
2059 #if DEBUG_CORE
2060   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2061               "Asked to perform key exchange with `%4s'.\n",
2062               GNUNET_i2s (&n->peer));
2063 #endif
2064   if (n->public_key == NULL)
2065     {
2066       /* lookup n's public key, then try again */
2067 #if DEBUG_CORE
2068       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2069                   "Lacking public key for `%4s', trying to obtain one.\n",
2070                   GNUNET_i2s (&n->peer));
2071 #endif
2072       GNUNET_PEERINFO_for_all (cfg,
2073                                sched,
2074                                &n->peer,
2075                                0,
2076                                GNUNET_TIME_UNIT_MINUTES,
2077                                &process_hello_retry_send_key, NULL);
2078       return;
2079     }
2080   /* first, set key message */
2081   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2082                       sizeof (struct SetKeyMessage));
2083   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2084   me->priority = SET_KEY_PRIORITY;
2085   me->size = sizeof (struct SetKeyMessage);
2086   if (n->encrypted_head == NULL)
2087     n->encrypted_head = me;
2088   else
2089     n->encrypted_tail->next = me;
2090   n->encrypted_tail = me;
2091   sm = (struct SetKeyMessage *) &me[1];
2092   sm->header.size = htons (sizeof (struct SetKeyMessage));
2093   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2094   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2095                                         PEER_STATE_KEY_SENT : n->status));
2096   sm->purpose.size =
2097     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2098            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2099            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2100            sizeof (struct GNUNET_PeerIdentity));
2101   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2102   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2103   sm->target = n->peer;
2104   GNUNET_assert (GNUNET_OK ==
2105                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2106                                             sizeof (struct
2107                                                     GNUNET_CRYPTO_AesSessionKey),
2108                                             n->public_key,
2109                                             &sm->encrypted_key));
2110   GNUNET_assert (GNUNET_OK ==
2111                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2112                                          &sm->signature));
2113
2114   /* second, encrypted PING message */
2115   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2116                       sizeof (struct PingMessage));
2117   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2118   me->priority = PING_PRIORITY;
2119   me->size = sizeof (struct PingMessage);
2120   n->encrypted_tail->next = me;
2121   n->encrypted_tail = me;
2122   pm = (struct PingMessage *) &me[1];
2123   pm->header.size = htons (sizeof (struct PingMessage));
2124   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2125   pp.challenge = htonl (n->ping_challenge);
2126   pp.target = n->peer;
2127 #if DEBUG_CORE
2128   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2129               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2130               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2131   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2132               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2133               "PING",
2134               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2135 #endif
2136   do_encrypt (n,
2137               &n->peer.hashPubKey,
2138               &pp.challenge,
2139               &pm->challenge,
2140               sizeof (struct PingMessage) -
2141               sizeof (struct GNUNET_MessageHeader));
2142   /* update status */
2143   switch (n->status)
2144     {
2145     case PEER_STATE_DOWN:
2146       n->status = PEER_STATE_KEY_SENT;
2147       break;
2148     case PEER_STATE_KEY_SENT:
2149       break;
2150     case PEER_STATE_KEY_RECEIVED:
2151       break;
2152     case PEER_STATE_KEY_CONFIRMED:
2153       GNUNET_break (0);
2154       break;
2155     default:
2156       GNUNET_break (0);
2157       break;
2158     }
2159 #if DEBUG_CORE
2160   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2161               "Have %llu ms left for `%s' transmission.\n",
2162               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2163               "SET_KEY");
2164 #endif
2165   /* trigger queue processing */
2166   process_encrypted_neighbour_queue (n);
2167   if (n->status != PEER_STATE_KEY_CONFIRMED)
2168     n->retry_set_key_task
2169       = GNUNET_SCHEDULER_add_delayed (sched,
2170                                       n->set_key_retry_frequency,
2171                                       &set_key_retry_task, n);
2172 }
2173
2174
2175 /**
2176  * We received a SET_KEY message.  Validate and update
2177  * our key material and status.
2178  *
2179  * @param n the neighbour from which we received message m
2180  * @param m the set key message we received
2181  */
2182 static void
2183 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2184
2185
2186 /**
2187  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2188  * the neighbour's struct and retry handling the set_key message.  Or,
2189  * if we did not get a HELLO, just free the set key message.
2190  *
2191  * @param cls pointer to the set key message
2192  * @param peer the peer for which this is the HELLO
2193  * @param hello HELLO message of that peer
2194  * @param trust amount of trust we currently have in that peer
2195  */
2196 static void
2197 process_hello_retry_handle_set_key (void *cls,
2198                                     const struct GNUNET_PeerIdentity *peer,
2199                                     const struct GNUNET_HELLO_Message *hello,
2200                                     uint32_t trust)
2201 {
2202   struct SetKeyMessage *sm = cls;
2203   struct Neighbour *n;
2204
2205   if (peer == NULL)
2206     {
2207       GNUNET_free (sm);
2208       return;
2209     }
2210   n = find_neighbour (peer);
2211   if (n == NULL)
2212     {
2213       GNUNET_break (0);
2214       return;
2215     }
2216   if (n->public_key != NULL)
2217     return;                     /* multiple HELLOs match!? */
2218   n->public_key =
2219     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2220   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2221     {
2222       GNUNET_break_op (0);
2223       GNUNET_free (n->public_key);
2224       n->public_key = NULL;
2225       return;
2226     }
2227 #if DEBUG_CORE
2228   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2229               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2230               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2231 #endif
2232   handle_set_key (n, sm);
2233 }
2234
2235
2236 /**
2237  * We received a PING message.  Validate and transmit
2238  * PONG.
2239  *
2240  * @param n sender of the PING
2241  * @param m the encrypted PING message itself
2242  */
2243 static void
2244 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2245 {
2246   struct PingMessage t;
2247   struct PingMessage *tp;
2248   struct MessageEntry *me;
2249
2250 #if DEBUG_CORE
2251   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2252               "Core service receives `%s' request from `%4s'.\n",
2253               "PING", GNUNET_i2s (&n->peer));
2254 #endif
2255   if (GNUNET_OK !=
2256       do_decrypt (n,
2257                   &my_identity.hashPubKey,
2258                   &m->challenge,
2259                   &t.challenge,
2260                   sizeof (struct PingMessage) -
2261                   sizeof (struct GNUNET_MessageHeader)))
2262     return;
2263 #if DEBUG_CORE
2264   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2265               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2266               "PING",
2267               GNUNET_i2s (&t.target),
2268               ntohl (t.challenge), n->decrypt_key.crc32);
2269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2270               "Target of `%s' request is `%4s'.\n",
2271               "PING", GNUNET_i2s (&t.target));
2272 #endif
2273   if (0 != memcmp (&t.target,
2274                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2275     {
2276       GNUNET_break_op (0);
2277       return;
2278     }
2279   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2280                       sizeof (struct PingMessage));
2281   if (n->encrypted_tail != NULL)
2282     n->encrypted_tail->next = me;
2283   else
2284     {
2285       n->encrypted_tail = me;
2286       n->encrypted_head = me;
2287     }
2288   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2289   me->priority = PONG_PRIORITY;
2290   me->size = sizeof (struct PingMessage);
2291   tp = (struct PingMessage *) &me[1];
2292   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2293   tp->header.size = htons (sizeof (struct PingMessage));
2294   do_encrypt (n,
2295               &my_identity.hashPubKey,
2296               &t.challenge,
2297               &tp->challenge,
2298               sizeof (struct PingMessage) -
2299               sizeof (struct GNUNET_MessageHeader));
2300 #if DEBUG_CORE
2301   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2302               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2303               ntohl (t.challenge), n->encrypt_key.crc32);
2304 #endif
2305   /* trigger queue processing */
2306   process_encrypted_neighbour_queue (n);
2307 }
2308
2309
2310 /**
2311  * We received a SET_KEY message.  Validate and update
2312  * our key material and status.
2313  *
2314  * @param n the neighbour from which we received message m
2315  * @param m the set key message we received
2316  */
2317 static void
2318 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2319 {
2320   struct SetKeyMessage *m_cpy;
2321   struct GNUNET_TIME_Absolute t;
2322   struct GNUNET_CRYPTO_AesSessionKey k;
2323   struct PingMessage *ping;
2324   enum PeerStateMachine sender_status;
2325
2326 #if DEBUG_CORE
2327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2328               "Core service receives `%s' request from `%4s'.\n",
2329               "SET_KEY", GNUNET_i2s (&n->peer));
2330 #endif
2331   if (n->public_key == NULL)
2332     {
2333 #if DEBUG_CORE
2334       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2335                   "Lacking public key for peer, trying to obtain one.\n");
2336 #endif
2337       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2338       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2339       /* lookup n's public key, then try again */
2340       GNUNET_PEERINFO_for_all (cfg,
2341                                sched,
2342                                &n->peer,
2343                                0,
2344                                GNUNET_TIME_UNIT_MINUTES,
2345                                &process_hello_retry_handle_set_key, m_cpy);
2346       return;
2347     }
2348   if ((ntohl (m->purpose.size) !=
2349        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2350        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2351        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2352        sizeof (struct GNUNET_PeerIdentity)) ||
2353       (GNUNET_OK !=
2354        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2355                                  &m->purpose, &m->signature, n->public_key)))
2356     {
2357       /* invalid signature */
2358       GNUNET_break_op (0);
2359       return;
2360     }
2361   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2362   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2363        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2364       (t.value < n->decrypt_key_created.value))
2365     {
2366       /* this could rarely happen due to massive re-ordering of
2367          messages on the network level, but is most likely either
2368          a bug or some adversary messing with us.  Report. */
2369       GNUNET_break_op (0);
2370       return;
2371     }
2372 #if DEBUG_CORE
2373   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2374 #endif  
2375   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2376                                   &m->encrypted_key,
2377                                   &k,
2378                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2379        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2380       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2381     {
2382       /* failed to decrypt !? */
2383       GNUNET_break_op (0);
2384       return;
2385     }
2386
2387   n->decrypt_key = k;
2388   if (n->decrypt_key_created.value != t.value)
2389     {
2390       /* fresh key, reset sequence numbers */
2391       n->last_sequence_number_received = 0;
2392       n->last_packets_bitmap = 0;
2393       n->decrypt_key_created = t;
2394     }
2395   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2396   switch (n->status)
2397     {
2398     case PEER_STATE_DOWN:
2399       n->status = PEER_STATE_KEY_RECEIVED;
2400 #if DEBUG_CORE
2401       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2402                   "Responding to `%s' with my own key.\n", "SET_KEY");
2403 #endif
2404       send_key (n);
2405       break;
2406     case PEER_STATE_KEY_SENT:
2407     case PEER_STATE_KEY_RECEIVED:
2408       n->status = PEER_STATE_KEY_RECEIVED;
2409       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2410           (sender_status != PEER_STATE_KEY_CONFIRMED))
2411         {
2412 #if DEBUG_CORE
2413           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2414                       "Responding to `%s' with my own key (other peer has status %u).\n",
2415                       "SET_KEY", sender_status);
2416 #endif
2417           send_key (n);
2418         }
2419       break;
2420     case PEER_STATE_KEY_CONFIRMED:
2421       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2422           (sender_status != PEER_STATE_KEY_CONFIRMED))
2423         {
2424 #if DEBUG_CORE
2425           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2426                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2427                       "SET_KEY", sender_status);
2428 #endif
2429           send_key (n);
2430         }
2431       break;
2432     default:
2433       GNUNET_break (0);
2434       break;
2435     }
2436   if (n->pending_ping != NULL)
2437     {
2438       ping = n->pending_ping;
2439       n->pending_ping = NULL;
2440       handle_ping (n, ping);
2441       GNUNET_free (ping);
2442     }
2443 }
2444
2445
2446 /**
2447  * We received a PONG message.  Validate and update
2448  * our status.
2449  *
2450  * @param n sender of the PONG
2451  * @param m the encrypted PONG message itself
2452  */
2453 static void
2454 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2455 {
2456   struct PingMessage t;
2457
2458 #if DEBUG_CORE
2459   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2460               "Core service receives `%s' request from `%4s'.\n",
2461               "PONG", GNUNET_i2s (&n->peer));
2462 #endif
2463   if (GNUNET_OK !=
2464       do_decrypt (n,
2465                   &n->peer.hashPubKey,
2466                   &m->challenge,
2467                   &t.challenge,
2468                   sizeof (struct PingMessage) -
2469                   sizeof (struct GNUNET_MessageHeader)))
2470     return;
2471 #if DEBUG_CORE
2472   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2473               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2474               "PONG",
2475               GNUNET_i2s (&t.target),
2476               ntohl (t.challenge), n->decrypt_key.crc32);
2477 #endif
2478   if ((0 != memcmp (&t.target,
2479                     &n->peer,
2480                     sizeof (struct GNUNET_PeerIdentity))) ||
2481       (n->ping_challenge != ntohl (t.challenge)))
2482     {
2483       /* PONG malformed */
2484 #if DEBUG_CORE
2485       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2486                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2487                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2488       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2489                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2490                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2491 #endif
2492       GNUNET_break_op (0);
2493       return;
2494     }
2495   switch (n->status)
2496     {
2497     case PEER_STATE_DOWN:
2498       GNUNET_break (0);         /* should be impossible */
2499       return;
2500     case PEER_STATE_KEY_SENT:
2501       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2502       return;
2503     case PEER_STATE_KEY_RECEIVED:
2504       n->status = PEER_STATE_KEY_CONFIRMED;
2505       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2506         {
2507           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2508           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2509         }
2510       process_encrypted_neighbour_queue (n);
2511       break;
2512     case PEER_STATE_KEY_CONFIRMED:
2513       /* duplicate PONG? */
2514       break;
2515     default:
2516       GNUNET_break (0);
2517       break;
2518     }
2519 }
2520
2521
2522 /**
2523  * Send a P2P message to a client.
2524  *
2525  * @param sender who sent us the message?
2526  * @param client who should we give the message to?
2527  * @param m contains the message to transmit
2528  * @param msize number of bytes in buf to transmit
2529  */
2530 static void
2531 send_p2p_message_to_client (struct Neighbour *sender,
2532                             struct Client *client,
2533                             const void *m, size_t msize)
2534 {
2535   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2536   struct NotifyTrafficMessage *ntm;
2537
2538 #if DEBUG_CORE
2539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2540               "Core service passes message from `%4s' of type %u to client.\n",
2541               GNUNET_i2s(&sender->peer),
2542               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2543 #endif
2544   ntm = (struct NotifyTrafficMessage *) buf;
2545   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2546   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2547   ntm->reserved = htonl (0);
2548   ntm->peer = sender->peer;
2549   memcpy (&ntm[1], m, msize);
2550   send_to_client (client, &ntm->header, GNUNET_YES);
2551 }
2552
2553
2554 /**
2555  * Deliver P2P message to interested clients.
2556  *
2557  * @param sender who sent us the message?
2558  * @param m the message
2559  * @param msize size of the message (including header)
2560  */
2561 static void
2562 deliver_message (struct Neighbour *sender,
2563                  const struct GNUNET_MessageHeader *m, size_t msize)
2564 {
2565   struct Client *cpos;
2566   uint16_t type;
2567   unsigned int tpos;
2568   int deliver_full;
2569
2570   type = ntohs (m->type);
2571   cpos = clients;
2572   while (cpos != NULL)
2573     {
2574       deliver_full = GNUNET_NO;
2575       if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
2576         deliver_full = GNUNET_YES;
2577       else
2578         {
2579           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2580             {
2581               if (type != cpos->types[tpos])
2582                 continue;
2583               deliver_full = GNUNET_YES;
2584               break;
2585             }
2586         }
2587       if (GNUNET_YES == deliver_full)
2588         send_p2p_message_to_client (sender, cpos, m, msize);
2589       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2590         send_p2p_message_to_client (sender, cpos, m,
2591                                     sizeof (struct GNUNET_MessageHeader));
2592       cpos = cpos->next;
2593     }
2594 }
2595
2596
2597 /**
2598  * Align P2P message and then deliver to interested clients.
2599  *
2600  * @param sender who sent us the message?
2601  * @param buffer unaligned (!) buffer containing message
2602  * @param msize size of the message (including header)
2603  */
2604 static void
2605 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2606 {
2607   char abuf[msize];
2608
2609   /* TODO: call to statistics? */
2610   memcpy (abuf, buffer, msize);
2611   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2612 }
2613
2614
2615 /**
2616  * Deliver P2P messages to interested clients.
2617  *
2618  * @param sender who sent us the message?
2619  * @param buffer buffer containing messages, can be modified
2620  * @param buffer_size size of the buffer (overall)
2621  * @param offset offset where messages in the buffer start
2622  */
2623 static void
2624 deliver_messages (struct Neighbour *sender,
2625                   const char *buffer, size_t buffer_size, size_t offset)
2626 {
2627   struct GNUNET_MessageHeader *mhp;
2628   struct GNUNET_MessageHeader mh;
2629   uint16_t msize;
2630   int need_align;
2631
2632   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2633     {
2634       if (0 != offset % sizeof (uint16_t))
2635         {
2636           /* outch, need to copy to access header */
2637           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2638           mhp = &mh;
2639         }
2640       else
2641         {
2642           /* can access header directly */
2643           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2644         }
2645       msize = ntohs (mhp->size);
2646       if (msize + offset > buffer_size)
2647         {
2648           /* malformed message, header says it is larger than what
2649              would fit into the overall buffer */
2650           GNUNET_break_op (0);
2651           break;
2652         }
2653 #if HAVE_UNALIGNED_64_ACCESS
2654       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2655 #else
2656       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2657 #endif
2658       if (GNUNET_YES == need_align)
2659         align_and_deliver (sender, &buffer[offset], msize);
2660       else
2661         deliver_message (sender,
2662                          (const struct GNUNET_MessageHeader *)
2663                          &buffer[offset], msize);
2664       offset += msize;
2665     }
2666 }
2667
2668
2669 /**
2670  * We received an encrypted message.  Decrypt, validate and
2671  * pass on to the appropriate clients.
2672  */
2673 static void
2674 handle_encrypted_message (struct Neighbour *n,
2675                           const struct EncryptedMessage *m)
2676 {
2677   size_t size = ntohs (m->header.size);
2678   char buf[size];
2679   struct EncryptedMessage *pt;  /* plaintext */
2680   GNUNET_HashCode ph;
2681   size_t off;
2682   uint32_t snum;
2683   struct GNUNET_TIME_Absolute t;
2684
2685 #if DEBUG_CORE
2686   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2687               "Core service receives `%s' request from `%4s'.\n",
2688               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2689 #endif  
2690   /* decrypt */
2691   if (GNUNET_OK !=
2692       do_decrypt (n,
2693                   &m->plaintext_hash,
2694                   &m->sequence_number,
2695                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2696     return;
2697   pt = (struct EncryptedMessage *) buf;
2698
2699   /* validate hash */
2700   GNUNET_CRYPTO_hash (&pt->sequence_number,
2701                       size - ENCRYPTED_HEADER_SIZE, &ph);
2702   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2703     {
2704       /* checksum failed */
2705       GNUNET_break_op (0);
2706       return;
2707     }
2708
2709   /* validate sequence number */
2710   snum = ntohl (pt->sequence_number);
2711   if (n->last_sequence_number_received == snum)
2712     {
2713       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2714                   "Received duplicate message, ignoring.\n");
2715       /* duplicate, ignore */
2716       return;
2717     }
2718   if ((n->last_sequence_number_received > snum) &&
2719       (n->last_sequence_number_received - snum > 32))
2720     {
2721       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2722                   "Received ancient out of sequence message, ignoring.\n");
2723       /* ancient out of sequence, ignore */
2724       return;
2725     }
2726   if (n->last_sequence_number_received > snum)
2727     {
2728       unsigned int rotbit =
2729         1 << (n->last_sequence_number_received - snum - 1);
2730       if ((n->last_packets_bitmap & rotbit) != 0)
2731         {
2732           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2733                       "Received duplicate message, ignoring.\n");
2734           /* duplicate, ignore */
2735           return;
2736         }
2737       n->last_packets_bitmap |= rotbit;
2738     }
2739   if (n->last_sequence_number_received < snum)
2740     {
2741       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2742       n->last_sequence_number_received = snum;
2743     }
2744
2745   /* check timestamp */
2746   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2747   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2748     {
2749       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2750                   _
2751                   ("Message received far too old (%llu ms). Content ignored.\n"),
2752                   GNUNET_TIME_absolute_get_duration (t).value);
2753       return;
2754     }
2755
2756   /* process decrypted message(s) */
2757   update_window (GNUNET_YES,
2758                  &n->available_send_window,
2759                  &n->last_asw_update,
2760                  n->bpm_out);
2761   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2762   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2763                            n->bpm_out_internal_limit);
2764   n->last_activity = GNUNET_TIME_absolute_get ();
2765   off = sizeof (struct EncryptedMessage);
2766   deliver_messages (n, buf, size, off);
2767 }
2768
2769
2770 /**
2771  * Function called by the transport for each received message.
2772  *
2773  * @param cls closure
2774  * @param latency estimated latency for communicating with the
2775  *             given peer
2776  * @param peer (claimed) identity of the other peer
2777  * @param message the message
2778  */
2779 static void
2780 handle_transport_receive (void *cls,
2781                           struct GNUNET_TIME_Relative latency,
2782                           const struct GNUNET_PeerIdentity *peer,
2783                           const struct GNUNET_MessageHeader *message)
2784 {
2785   struct Neighbour *n;
2786   struct GNUNET_TIME_Absolute now;
2787   int up;
2788   uint16_t type;
2789   uint16_t size;
2790
2791 #if DEBUG_CORE
2792   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2793               "Received message of type %u from `%4s', demultiplexing.\n",
2794               ntohs (message->type), GNUNET_i2s (peer));
2795 #endif
2796   n = find_neighbour (peer);
2797   if (n == NULL)
2798     {
2799       GNUNET_break (0);
2800       return;
2801     }
2802   n->last_latency = latency;
2803   up = n->status == PEER_STATE_KEY_CONFIRMED;
2804   type = ntohs (message->type);
2805   size = ntohs (message->size);
2806   switch (type)
2807     {
2808     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2809       if (size != sizeof (struct SetKeyMessage))
2810         {
2811           GNUNET_break_op (0);
2812           return;
2813         }
2814       handle_set_key (n, (const struct SetKeyMessage *) message);
2815       break;
2816     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2817       if (size < sizeof (struct EncryptedMessage) +
2818           sizeof (struct GNUNET_MessageHeader))
2819         {
2820           GNUNET_break_op (0);
2821           return;
2822         }
2823       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2824           (n->status != PEER_STATE_KEY_CONFIRMED))
2825         {
2826           GNUNET_break_op (0);
2827           return;
2828         }
2829       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2830       break;
2831     case GNUNET_MESSAGE_TYPE_CORE_PING:
2832       if (size != sizeof (struct PingMessage))
2833         {
2834           GNUNET_break_op (0);
2835           return;
2836         }
2837       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2838           (n->status != PEER_STATE_KEY_CONFIRMED))
2839         {
2840 #if DEBUG_CORE
2841           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2842                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2843                       "PING", GNUNET_i2s (&n->peer));
2844 #endif
2845           GNUNET_free_non_null (n->pending_ping);
2846           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2847           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2848           return;
2849         }
2850       handle_ping (n, (const struct PingMessage *) message);
2851       break;
2852     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2853       if (size != sizeof (struct PingMessage))
2854         {
2855           GNUNET_break_op (0);
2856           return;
2857         }
2858       if ((n->status != PEER_STATE_KEY_SENT) &&
2859           (n->status != PEER_STATE_KEY_RECEIVED) &&
2860           (n->status != PEER_STATE_KEY_CONFIRMED))
2861         {
2862           /* could not decrypt pong, oops! */
2863           GNUNET_break_op (0);
2864           return;
2865         }
2866       handle_pong (n, (const struct PingMessage *) message);
2867       break;
2868     default:
2869       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2870                   _("Unsupported message of type %u received.\n"), type);
2871       return;
2872     }
2873   if (n->status == PEER_STATE_KEY_CONFIRMED)
2874     {
2875       now = GNUNET_TIME_absolute_get ();
2876       n->last_activity = now;
2877       if (!up)
2878         n->time_established = now;
2879     }
2880 }
2881
2882
2883 /**
2884  * Function that recalculates the bandwidth quota for the
2885  * given neighbour and transmits it to the transport service.
2886  * 
2887  * @param cls neighbour for the quota update
2888  * @param tc context
2889  */
2890 static void
2891 neighbour_quota_update (void *cls,
2892                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2893
2894
2895 /**
2896  * Schedule the task that will recalculate the bandwidth
2897  * quota for this peer (and possibly force a disconnect of
2898  * idle peers by calculating a bandwidth of zero).
2899  */
2900 static void
2901 schedule_quota_update (struct Neighbour *n)
2902 {
2903   GNUNET_assert (n->quota_update_task ==
2904                  GNUNET_SCHEDULER_NO_TASK);
2905   n->quota_update_task
2906     = GNUNET_SCHEDULER_add_delayed (sched,
2907                                     QUOTA_UPDATE_FREQUENCY,
2908                                     &neighbour_quota_update,
2909                                     n);
2910 }
2911
2912
2913 /**
2914  * Function that recalculates the bandwidth quota for the
2915  * given neighbour and transmits it to the transport service.
2916  * 
2917  * @param cls neighbour for the quota update
2918  * @param tc context
2919  */
2920 static void
2921 neighbour_quota_update (void *cls,
2922                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2923 {
2924   struct Neighbour *n = cls;
2925   uint32_t q_in;
2926   double pref_rel;
2927   double share;
2928   unsigned long long distributable;
2929   
2930   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
2931   /* calculate relative preference among all neighbours;
2932      divides by a bit more to avoid division by zero AND to
2933      account for possibility of new neighbours joining any time 
2934      AND to convert to double... */
2935   pref_rel = n->current_preference / (1.0 + preference_sum);
2936   distributable = 0;
2937   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2938     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2939   share = distributable * pref_rel;
2940   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2941   /* check if we want to disconnect for good due to inactivity */
2942   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2943        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2944     q_in = 0; /* force disconnect */
2945   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2946        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2947     {
2948       n->bpm_in = q_in;
2949       GNUNET_TRANSPORT_set_quota (transport,
2950                                   &n->peer,
2951                                   n->bpm_in, 
2952                                   n->bpm_out,
2953                                   GNUNET_TIME_UNIT_FOREVER_REL,
2954                                   NULL, NULL);
2955     }
2956   schedule_quota_update (n);
2957 }
2958
2959
2960 /**
2961  * Function called by transport to notify us that
2962  * a peer connected to us (on the network level).
2963  *
2964  * @param cls closure
2965  * @param peer the peer that connected
2966  * @param latency current latency of the connection
2967  */
2968 static void
2969 handle_transport_notify_connect (void *cls,
2970                                  const struct GNUNET_PeerIdentity *peer,
2971                                  struct GNUNET_TIME_Relative latency)
2972 {
2973   struct Neighbour *n;
2974   struct GNUNET_TIME_Absolute now;
2975   struct ConnectNotifyMessage cnm;
2976
2977   n = find_neighbour (peer);
2978   if (n != NULL)
2979     {
2980       /* duplicate connect notification!? */
2981       GNUNET_break (0);
2982       return;
2983     }
2984   now = GNUNET_TIME_absolute_get ();
2985   n = GNUNET_malloc (sizeof (struct Neighbour));
2986   n->next = neighbours;
2987   neighbours = n;
2988   neighbour_count++;
2989   n->peer = *peer;
2990   n->last_latency = latency;
2991   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2992   n->encrypt_key_created = now;
2993   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2994   n->last_asw_update = now;
2995   n->last_arw_update = now;
2996   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2997   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2998   n->bpm_out_internal_limit = (uint32_t) - 1;
2999   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3000   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3001                                                 (uint32_t) - 1);
3002 #if DEBUG_CORE
3003   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3004               "Received connection from `%4s'.\n",
3005               GNUNET_i2s (&n->peer));
3006 #endif
3007   schedule_quota_update (n);
3008   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3009   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
3010   cnm.reserved = htonl (0);
3011   cnm.peer = *peer;
3012   send_to_all_clients (&cnm.header, GNUNET_YES);
3013 }
3014
3015
3016 /**
3017  * Free the given entry for the neighbour (it has
3018  * already been removed from the list at this point).
3019  *
3020  * @param n neighbour to free
3021  */
3022 static void
3023 free_neighbour (struct Neighbour *n)
3024 {
3025   struct MessageEntry *m;
3026
3027   while (NULL != (m = n->messages))
3028     {
3029       n->messages = m->next;
3030       GNUNET_free (m);
3031     }
3032   while (NULL != (m = n->encrypted_head))
3033     {
3034       n->encrypted_head = m->next;
3035       GNUNET_free (m);
3036     }
3037   if (NULL != n->th)
3038     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3039   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
3040     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3041   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
3042     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3043   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
3044     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3045   GNUNET_free_non_null (n->public_key);
3046   GNUNET_free_non_null (n->pending_ping);
3047   GNUNET_free (n);
3048 }
3049
3050
3051 /**
3052  * Function called by transport telling us that a peer
3053  * disconnected.
3054  *
3055  * @param cls closure
3056  * @param peer the peer that disconnected
3057  */
3058 static void
3059 handle_transport_notify_disconnect (void *cls,
3060                                     const struct GNUNET_PeerIdentity *peer)
3061 {
3062   struct ConnectNotifyMessage cnm;
3063   struct Neighbour *n;
3064   struct Neighbour *p;
3065
3066 #if DEBUG_CORE
3067   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3068               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3069 #endif
3070   p = NULL;
3071   n = neighbours;
3072   while ((n != NULL) &&
3073          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3074     {
3075       p = n;
3076       n = n->next;
3077     }
3078   if (n == NULL)
3079     {
3080       GNUNET_break (0);
3081       return;
3082     }
3083   if (p == NULL)
3084     neighbours = n->next;
3085   else
3086     p->next = n->next;
3087   GNUNET_assert (neighbour_count > 0);
3088   neighbour_count--;
3089   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3090   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3091   cnm.reserved = htonl (0);
3092   cnm.peer = *peer;
3093   send_to_all_clients (&cnm.header, GNUNET_YES);
3094   free_neighbour (n);
3095 }
3096
3097
3098 /**
3099  * Last task run during shutdown.  Disconnects us from
3100  * the transport.
3101  */
3102 static void
3103 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3104 {
3105   struct Neighbour *n;
3106   struct Client *c;
3107
3108 #if DEBUG_CORE
3109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3110               "Core service shutting down.\n");
3111 #endif
3112   GNUNET_assert (transport != NULL);
3113   GNUNET_TRANSPORT_disconnect (transport);
3114   transport = NULL;
3115   while (NULL != (n = neighbours))
3116     {
3117       neighbours = n->next;
3118       GNUNET_assert (neighbour_count > 0);
3119       neighbour_count--;
3120       free_neighbour (n);
3121     }
3122   while (NULL != (c = clients))
3123     handle_client_disconnect (NULL, c->client_handle);
3124   if (my_private_key != NULL)
3125     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3126 }
3127
3128
3129 /**
3130  * Initiate core service.
3131  *
3132  * @param cls closure
3133  * @param s scheduler to use
3134  * @param serv the initialized server
3135  * @param c configuration to use
3136  */
3137 static void
3138 run (void *cls,
3139      struct GNUNET_SCHEDULER_Handle *s,
3140      struct GNUNET_SERVER_Handle *serv,
3141      const struct GNUNET_CONFIGURATION_Handle *c)
3142 {
3143 #if 0
3144   unsigned long long qin;
3145   unsigned long long qout;
3146   unsigned long long tneigh;
3147 #endif
3148   char *keyfile;
3149
3150   sched = s;
3151   cfg = c;
3152   /* parse configuration */
3153   if (
3154        (GNUNET_OK !=
3155         GNUNET_CONFIGURATION_get_value_number (c,
3156                                                "CORE",
3157                                                "TOTAL_QUOTA_IN",
3158                                                &bandwidth_target_in)) ||
3159        (GNUNET_OK !=
3160         GNUNET_CONFIGURATION_get_value_number (c,
3161                                                "CORE",
3162                                                "TOTAL_QUOTA_OUT",
3163                                                &bandwidth_target_out)) ||
3164 #if 0
3165        (GNUNET_OK !=
3166         GNUNET_CONFIGURATION_get_value_number (c,
3167                                                "CORE",
3168                                                "YY",
3169                                                &qout)) ||
3170        (GNUNET_OK !=
3171         GNUNET_CONFIGURATION_get_value_number (c,
3172                                                "CORE",
3173                                                "ZZ_LIMIT", &tneigh)) ||
3174 #endif
3175        (GNUNET_OK !=
3176         GNUNET_CONFIGURATION_get_value_filename (c,
3177                                                  "GNUNETD",
3178                                                  "HOSTKEY", &keyfile)))
3179     {
3180       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3181                   _
3182                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3183       GNUNET_SCHEDULER_shutdown (s);
3184       return;
3185     }
3186   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3187   GNUNET_free (keyfile);
3188   if (my_private_key == NULL)
3189     {
3190       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3191                   _("Core service could not access hostkey.  Exiting.\n"));
3192       GNUNET_SCHEDULER_shutdown (s);
3193       return;
3194     }
3195   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3196   GNUNET_CRYPTO_hash (&my_public_key,
3197                       sizeof (my_public_key), &my_identity.hashPubKey);
3198   /* setup notification */
3199   server = serv;
3200   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3201   /* setup transport connection */
3202   transport = GNUNET_TRANSPORT_connect (sched,
3203                                         cfg,
3204                                         NULL,
3205                                         &handle_transport_receive,
3206                                         &handle_transport_notify_connect,
3207                                         &handle_transport_notify_disconnect);
3208   GNUNET_assert (NULL != transport);
3209   GNUNET_SCHEDULER_add_delayed (sched,
3210                                 GNUNET_TIME_UNIT_FOREVER_REL,
3211                                 &cleaning_task, NULL);
3212   /* process client requests */
3213   GNUNET_SERVER_add_handlers (server, handlers);
3214   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3215               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3216 }
3217
3218
3219
3220 /**
3221  * The main function for the transport service.
3222  *
3223  * @param argc number of arguments from the command line
3224  * @param argv command line arguments
3225  * @return 0 ok, 1 on error
3226  */
3227 int
3228 main (int argc, char *const *argv)
3229 {
3230   return (GNUNET_OK ==
3231           GNUNET_SERVICE_run (argc,
3232                               argv,
3233                               "core",
3234                               GNUNET_SERVICE_OPTION_NONE,
3235                               &run, NULL)) ? 0 : 1;
3236 }
3237
3238 /* end of gnunet-service-core.c */