fixes
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 /**
45  * Receive and send buffer windows grow over time.  For
46  * how long can 'unused' bandwidth accumulate before we
47  * need to cap it?  (specified in ms).
48  */
49 #define MAX_WINDOW_TIME (5 * 60 * 1000)
50
51 /**
52  * Minimum of bytes per minute (out) to assign to any connected peer.
53  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
54  * sense.
55  */
56 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
57
58 /**
59  * What is the smallest change (in number of bytes per minute)
60  * that we consider significant enough to bother triggering?
61  */
62 #define MIN_BPM_CHANGE 32
63
64 /**
65  * After how much time past the "official" expiration time do
66  * we discard messages?  Should not be zero since we may 
67  * intentionally defer transmission until close to the deadline
68  * and then may be slightly past the deadline due to inaccuracy
69  * in sleep and our own CPU consumption.
70  */
71 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
72
73 /**
74  * What is the maximum delay for a SET_KEY message?
75  */
76 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What how long do we wait for SET_KEY confirmation initially?
80  */
81 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
82
83 /**
84  * What is the maximum delay for a PING message?
85  */
86 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
87
88 /**
89  * What is the maximum delay for a PONG message?
90  */
91 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * How often do we recalculate bandwidth quotas?
95  */
96 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * What is the priority for a SET_KEY message?
100  */
101 #define SET_KEY_PRIORITY 0xFFFFFF
102
103 /**
104  * What is the priority for a PING message?
105  */
106 #define PING_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PONG message?
110  */
111 #define PONG_PRIORITY 0xFFFFFF
112
113 /**
114  * How many messages do we queue per peer at most?
115  */
116 #define MAX_PEER_QUEUE_SIZE 16
117
118 /**
119  * How many non-mandatory messages do we queue per client at most?
120  */
121 #define MAX_CLIENT_QUEUE_SIZE 32
122
123 /**
124  * What is the maximum age of a message for us to consider
125  * processing it?  Note that this looks at the timestamp used
126  * by the other peer, so clock skew between machines does
127  * come into play here.  So this should be picked high enough
128  * so that a little bit of clock skew does not prevent peers
129  * from connecting to us.
130  */
131 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
132
133 /**
134  * What is the maximum size for encrypted messages?  Note that this
135  * number imposes a clear limit on the maximum size of any message.
136  * Set to a value close to 64k but not so close that transports will
137  * have trouble with their headers.
138  */
139 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
140
141
142 /**
143  * State machine for our P2P encryption handshake.  Everyone starts in
144  * "DOWN", if we receive the other peer's key (other peer initiated)
145  * we start in state RECEIVED (since we will immediately send our
146  * own); otherwise we start in SENT.  If we get back a PONG from
147  * within either state, we move up to CONFIRMED (the PONG will always
148  * be sent back encrypted with the key we sent to the other peer).
149  */
150 enum PeerStateMachine
151 {
152   PEER_STATE_DOWN,
153   PEER_STATE_KEY_SENT,
154   PEER_STATE_KEY_RECEIVED,
155   PEER_STATE_KEY_CONFIRMED
156 };
157
158
159 /**
160  * Number of bytes (at the beginning) of "struct EncryptedMessage"
161  * that are NOT encrypted.
162  */
163 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
164
165
166 /**
167  * Encapsulation for encrypted messages exchanged between
168  * peers.  Followed by the actual encrypted data.
169  */
170 struct EncryptedMessage
171 {
172   /**
173    * Message type is either CORE_ENCRYPTED_MESSAGE.
174    */
175   struct GNUNET_MessageHeader header;
176
177   /**
178    * Always zero.
179    */
180   uint32_t reserved GNUNET_PACKED;
181
182   /**
183    * Hash of the plaintext, used to verify message integrity;
184    * ALSO used as the IV for the symmetric cipher!  Everything
185    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
186    * must be set to the offset of the next field.
187    */
188   GNUNET_HashCode plaintext_hash;
189
190   /**
191    * Sequence number, in network byte order.  This field
192    * must be the first encrypted/decrypted field and the
193    * first byte that is hashed for the plaintext hash.
194    */
195   uint32_t sequence_number GNUNET_PACKED;
196
197   /**
198    * Desired bandwidth (how much we should send to this
199    * peer / how much is the sender willing to receive),
200    * in bytes per minute.
201    */
202   uint32_t inbound_bpm_limit GNUNET_PACKED;
203
204   /**
205    * Timestamp.  Used to prevent reply of ancient messages
206    * (recent messages are caught with the sequence number).
207    */
208   struct GNUNET_TIME_AbsoluteNBO timestamp;
209
210 };
211
212 /**
213  * We're sending an (encrypted) PING to the other peer to check if he
214  * can decrypt.  The other peer should respond with a PONG with the
215  * same content, except this time encrypted with the receiver's key.
216  */
217 struct PingMessage
218 {
219   /**
220    * Message type is either CORE_PING or CORE_PONG.
221    */
222   struct GNUNET_MessageHeader header;
223
224   /**
225    * Random number chosen to make reply harder.
226    */
227   uint32_t challenge GNUNET_PACKED;
228
229   /**
230    * Intended target of the PING, used primarily to check
231    * that decryption actually worked.
232    */
233   struct GNUNET_PeerIdentity target;
234 };
235
236
237 /**
238  * Message transmitted to set (or update) a session key.
239  */
240 struct SetKeyMessage
241 {
242
243   /**
244    * Message type is either CORE_SET_KEY.
245    */
246   struct GNUNET_MessageHeader header;
247
248   /**
249    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
250    */
251   int32_t sender_status GNUNET_PACKED;
252
253   /**
254    * Purpose of the signature, will be
255    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
256    */
257   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
258
259   /**
260    * At what time was this key created?
261    */
262   struct GNUNET_TIME_AbsoluteNBO creation_time;
263
264   /**
265    * The encrypted session key.
266    */
267   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
268
269   /**
270    * Who is the intended recipient?
271    */
272   struct GNUNET_PeerIdentity target;
273
274   /**
275    * Signature of the stuff above (starting at purpose).
276    */
277   struct GNUNET_CRYPTO_RsaSignature signature;
278
279 };
280
281
282 /**
283  * Message waiting for transmission. This struct
284  * is followed by the actual content of the message.
285  */
286 struct MessageEntry
287 {
288
289   /**
290    * We keep messages in a linked list (for now).
291    */
292   struct MessageEntry *next;
293
294   /**
295    * By when are we supposed to transmit this message?
296    */
297   struct GNUNET_TIME_Absolute deadline;
298
299   /**
300    * How important is this message to us?
301    */
302   unsigned int priority;
303
304   /**
305    * How long is the message? (number of bytes following
306    * the "struct MessageEntry", but not including the
307    * size of "struct MessageEntry" itself!)
308    */
309   uint16_t size;
310
311   /**
312    * Was this message selected for transmission in the
313    * current round? GNUNET_YES or GNUNET_NO.
314    */
315   int16_t do_transmit;
316
317 };
318
319
320 struct Neighbour
321 {
322   /**
323    * We keep neighbours in a linked list (for now).
324    */
325   struct Neighbour *next;
326
327   /**
328    * Unencrypted messages destined for this peer.
329    */
330   struct MessageEntry *messages;
331
332   /**
333    * Head of the batched, encrypted message queue (already ordered,
334    * transmit starting with the head).
335    */
336   struct MessageEntry *encrypted_head;
337
338   /**
339    * Tail of the batched, encrypted message queue (already ordered,
340    * append new messages to tail)
341    */
342   struct MessageEntry *encrypted_tail;
343
344   /**
345    * Handle for pending requests for transmission to this peer
346    * with the transport service.  NULL if no request is pending.
347    */
348   struct GNUNET_TRANSPORT_TransmitHandle *th;
349
350   /**
351    * Public key of the neighbour, NULL if we don't have it yet.
352    */
353   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
354
355   /**
356    * We received a PING message before we got the "public_key"
357    * (or the SET_KEY).  We keep it here until we have a key
358    * to decrypt it.  NULL if no PING is pending.
359    */
360   struct PingMessage *pending_ping;
361
362   /**
363    * Identity of the neighbour.
364    */
365   struct GNUNET_PeerIdentity peer;
366
367   /**
368    * Key we use to encrypt our messages for the other peer
369    * (initialized by us when we do the handshake).
370    */
371   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
372
373   /**
374    * Key we use to decrypt messages from the other peer
375    * (given to us by the other peer during the handshake).
376    */
377   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
378
379   /**
380    * ID of task used for re-trying plaintext scheduling.
381    */
382   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
383
384   /**
385    * ID of task used for re-trying SET_KEY and PING message.
386    */
387   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
388
389   /**
390    * ID of task used for updating bandwidth quota for this neighbour.
391    */
392   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
393
394   /**
395    * At what time did we generate our encryption key?
396    */
397   struct GNUNET_TIME_Absolute encrypt_key_created;
398
399   /**
400    * At what time did the other peer generate the decryption key?
401    */
402   struct GNUNET_TIME_Absolute decrypt_key_created;
403
404   /**
405    * At what time did we initially establish (as in, complete session
406    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
407    */
408   struct GNUNET_TIME_Absolute time_established;
409
410   /**
411    * At what time did we last receive an encrypted message from the
412    * other peer?  Should be zero if status != KEY_CONFIRMED.
413    */
414   struct GNUNET_TIME_Absolute last_activity;
415
416   /**
417    * Last latency observed from this peer.
418    */
419   struct GNUNET_TIME_Relative last_latency;
420
421   /**
422    * At what frequency are we currently re-trying SET_KEY messages?
423    */
424   struct GNUNET_TIME_Relative set_key_retry_frequency;
425
426   /**
427    * Time of our last update to the "available_send_window".
428    */
429   struct GNUNET_TIME_Absolute last_asw_update;
430
431   /**
432    * Time of our last update to the "available_recv_window".
433    */
434   struct GNUNET_TIME_Absolute last_arw_update;
435
436   /**
437    * Number of bytes that we are eligible to transmit to this
438    * peer at this point.  Incremented every minute by max_out_bpm,
439    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
440    * bandwidth-hogs are sampled at a frequency of about 78s!);
441    * may get negative if we have VERY high priority content.
442    */
443   long long available_send_window; 
444
445   /**
446    * How much downstream capacity of this peer has been reserved for
447    * our traffic?  (Our clients can request that a certain amount of
448    * bandwidth is available for replies to them; this value is used to
449    * make sure that this reserved amount of bandwidth is actually
450    * available).
451    */
452   long long available_recv_window; 
453
454   /**
455    * How valueable were the messages of this peer recently?
456    */
457   unsigned long long current_preference;
458
459   /**
460    * Bit map indicating which of the 32 sequence numbers before the last
461    * were received (good for accepting out-of-order packets and
462    * estimating reliability of the connection)
463    */
464   unsigned int last_packets_bitmap;
465
466   /**
467    * Number of messages in the message queue for this peer.
468    */
469   unsigned int message_queue_size;
470
471   /**
472    * last sequence number received on this connection (highest)
473    */
474   uint32_t last_sequence_number_received;
475
476   /**
477    * last sequence number transmitted
478    */
479   uint32_t last_sequence_number_sent;
480
481   /**
482    * Available bandwidth in for this peer (current target).
483    */
484   uint32_t bpm_in;
485
486   /**
487    * Available bandwidth out for this peer (current target).
488    */
489   uint32_t bpm_out;
490
491   /**
492    * Internal bandwidth limit set for this peer (initially
493    * typically set to "-1").  "bpm_out" is MAX of
494    * "bpm_out_internal_limit" and "bpm_out_external_limit".
495    */
496   uint32_t bpm_out_internal_limit;
497
498   /**
499    * External bandwidth limit set for this peer by the
500    * peer that we are communicating with.  "bpm_out" is MAX of
501    * "bpm_out_internal_limit" and "bpm_out_external_limit".
502    */
503   uint32_t bpm_out_external_limit;
504
505   /**
506    * What was our PING challenge number (for this peer)?
507    */
508   uint32_t ping_challenge;
509
510   /**
511    * What is our connection status?
512    */
513   enum PeerStateMachine status;
514
515 };
516
517
518 /**
519  * Events are messages for clients.  The struct
520  * itself is followed by the actual message.
521  */
522 struct Event
523 {
524   /**
525    * This is a linked list.
526    */
527   struct Event *next;
528
529   /**
530    * Size of the message.
531    */
532   size_t size;
533
534   /**
535    * Could this event be dropped if this queue
536    * is getting too large? (NOT YET USED!)
537    */
538   int can_drop;
539
540 };
541
542
543 /**
544  * Data structure for each client connected to the core service.
545  */
546 struct Client
547 {
548   /**
549    * Clients are kept in a linked list.
550    */
551   struct Client *next;
552
553   /**
554    * Handle for the client with the server API.
555    */
556   struct GNUNET_SERVER_Client *client_handle;
557
558   /**
559    * Linked list of messages we still need to deliver to
560    * this client.
561    */
562   struct Event *event_head;
563
564   /**
565    * Tail of the linked list of events.
566    */
567   struct Event *event_tail;
568
569   /**
570    * Current transmit handle, NULL if no transmission request
571    * is pending.
572    */
573   struct GNUNET_CONNECTION_TransmitHandle *th;
574
575   /**
576    * Array of the types of messages this peer cares
577    * about (with "tcnt" entries).  Allocated as part
578    * of this client struct, do not free!
579    */
580   uint16_t *types;
581
582   /**
583    * Options for messages this client cares about,
584    * see GNUNET_CORE_OPTION_ values.
585    */
586   uint32_t options;
587
588   /**
589    * Number of types of incoming messages this client
590    * specifically cares about.  Size of the "types" array.
591    */
592   unsigned int tcnt;
593
594 };
595
596
597 /**
598  * Our public key.
599  */
600 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
601
602 /**
603  * Our identity.
604  */
605 static struct GNUNET_PeerIdentity my_identity;
606
607 /**
608  * Our private key.
609  */
610 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
611
612 /**
613  * Our scheduler.
614  */
615 struct GNUNET_SCHEDULER_Handle *sched;
616
617 /**
618  * Our configuration.
619  */
620 const struct GNUNET_CONFIGURATION_Handle *cfg;
621
622 /**
623  * Our server.
624  */
625 static struct GNUNET_SERVER_Handle *server;
626
627 /**
628  * Transport service.
629  */
630 static struct GNUNET_TRANSPORT_Handle *transport;
631
632 /**
633  * Linked list of our clients.
634  */
635 static struct Client *clients;
636
637 /**
638  * We keep neighbours in a linked list (for now).
639  */
640 static struct Neighbour *neighbours;
641
642 /**
643  * Sum of all preferences among all neighbours.
644  */
645 static unsigned long long preference_sum;
646
647 /**
648  * Total number of neighbours we have.
649  */
650 static unsigned int neighbour_count;
651
652 /**
653  * How much inbound bandwidth are we supposed to be using?
654  */
655 static unsigned long long bandwidth_target_in;
656
657 /**
658  * How much outbound bandwidth are we supposed to be using?
659  */
660 static unsigned long long bandwidth_target_out;
661
662
663
664 /**
665  * A preference value for a neighbour was update.  Update
666  * the preference sum accordingly.
667  *
668  * @param inc how much was a preference value increased?
669  */
670 static void
671 update_preference_sum (unsigned long long inc)
672 {
673   struct Neighbour *n;
674   unsigned long long os;
675
676   os = preference_sum;
677   preference_sum += inc;
678   if (preference_sum >= os)
679     return; /* done! */
680   /* overflow! compensate by cutting all values in half! */
681   preference_sum = 0;
682   n = neighbours;
683   while (n != NULL)
684     {
685       n->current_preference /= 2;
686       preference_sum += n->current_preference;
687       n = n->next;
688     }    
689 }
690
691
692 /**
693  * Recalculate the number of bytes we expect to
694  * receive or transmit in a given window.
695  *
696  * @param force force an update now (even if not much time has passed)
697  * @param window pointer to the byte counter (updated)
698  * @param ts pointer to the timestamp (updated)
699  * @param bpm number of bytes per minute that should
700  *        be added to the window.
701  */
702 static void
703 update_window (int force,
704                long long *window,
705                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
706 {
707   struct GNUNET_TIME_Relative since;
708
709   since = GNUNET_TIME_absolute_get_duration (*ts);
710   if ( (force == GNUNET_NO) &&
711        (since.value < 60 * 1000) )
712     return;                     /* not even a minute has passed */
713   *ts = GNUNET_TIME_absolute_get ();
714   *window += (bpm * since.value) / 60 / 1000;
715   if (*window > MAX_WINDOW_TIME * bpm)
716     *window = MAX_WINDOW_TIME * bpm;
717 }
718
719
720 /**
721  * Find the entry for the given neighbour.
722  *
723  * @param peer identity of the neighbour
724  * @return NULL if we are not connected, otherwise the
725  *         neighbour's entry.
726  */
727 static struct Neighbour *
728 find_neighbour (const struct GNUNET_PeerIdentity *peer)
729 {
730   struct Neighbour *ret;
731
732   ret = neighbours;
733   while ((ret != NULL) &&
734          (0 != memcmp (&ret->peer,
735                        peer, sizeof (struct GNUNET_PeerIdentity))))
736     ret = ret->next;
737   return ret;
738 }
739
740
741 /**
742  * Find the entry for the given client.
743  *
744  * @param client handle for the client
745  * @return NULL if we are not connected, otherwise the
746  *         client's struct.
747  */
748 static struct Client *
749 find_client (const struct GNUNET_SERVER_Client *client)
750 {
751   struct Client *ret;
752
753   ret = clients;
754   while ((ret != NULL) && (client != ret->client_handle))
755     ret = ret->next;
756   return ret;
757 }
758
759
760 /**
761  * If necessary, initiate a request with the server to
762  * transmit messages from the queue of the given client.
763  * @param client who to transfer messages to
764  */
765 static void request_transmit (struct Client *client);
766
767
768 /**
769  * Client is ready to receive data, provide it.
770  *
771  * @param cls closure
772  * @param size number of bytes available in buf
773  * @param buf where the callee should write the message
774  * @return number of bytes written to buf
775  */
776 static size_t
777 do_client_transmit (void *cls, size_t size, void *buf)
778 {
779   struct Client *client = cls;
780   struct Event *e;
781   char *tgt;
782   size_t ret;
783
784   client->th = NULL;
785 #if DEBUG_CORE_CLIENT
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "Client ready to receive %u bytes.\n", size);
788 #endif
789   if (buf == NULL)
790     {
791 #if DEBUG_CORE
792       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
793                   "Failed to transmit data to client (disconnect)?\n");
794 #endif
795       return 0;                 /* we'll surely get a disconnect soon... */
796     }
797   tgt = buf;
798   ret = 0;
799   while ((NULL != (e = client->event_head)) && (e->size <= size))
800     {
801       memcpy (&tgt[ret], &e[1], e->size);
802       size -= e->size;
803       ret += e->size;
804       client->event_head = e->next;
805       GNUNET_free (e);
806     }
807   GNUNET_assert (ret > 0);
808   if (client->event_head == NULL)
809     client->event_tail = NULL;
810 #if DEBUG_CORE_CLIENT
811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812               "Transmitting %u bytes to client\n", ret);
813 #endif
814   request_transmit (client);
815   return ret;
816 }
817
818
819 /**
820  * If necessary, initiate a request with the server to
821  * transmit messages from the queue of the given client.
822  * @param client who to transfer messages to
823  */
824 static void
825 request_transmit (struct Client *client)
826 {
827
828   if (NULL != client->th)
829     return;                     /* already pending */
830   if (NULL == client->event_head)
831     return;                     /* no more events pending */
832 #if DEBUG_CORE_CLIENT
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834               "Asking server to transmit %u bytes to client\n",
835               client->event_head->size);
836 #endif
837   client->th
838     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
839                                            client->event_head->size,
840                                            GNUNET_TIME_UNIT_FOREVER_REL,
841                                            &do_client_transmit, client);
842 }
843
844
845 /**
846  * Send a message to one of our clients.
847  *
848  * @param client target for the message
849  * @param msg message to transmit
850  * @param can_drop could this message be dropped if the
851  *        client's queue is getting too large?
852  */
853 static void
854 send_to_client (struct Client *client,
855                 const struct GNUNET_MessageHeader *msg, int can_drop)
856 {
857   struct Event *e;
858   unsigned int queue_size;
859   uint16_t msize;
860
861 #if DEBUG_CORE_CLIENT
862   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
863               "Preparing to send message of type %u to client.\n",
864               ntohs (msg->type));
865 #endif
866   queue_size = 0;
867   e = client->event_head;
868   while (e != NULL)
869     {
870       queue_size++;
871       e = e->next;
872     }
873   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
874        (can_drop == GNUNET_YES) )
875     {
876 #if DEBUG_CORE
877       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
878                   "Too many messages in queue for the client, dropping the new message.\n");
879 #endif
880       return;
881     }
882
883   msize = ntohs (msg->size);
884   e = GNUNET_malloc (sizeof (struct Event) + msize);
885   /* append */
886   if (client->event_tail != NULL)
887     client->event_tail->next = e;
888   else
889     client->event_head = e;
890   client->event_tail = e;
891   e->can_drop = can_drop;
892   e->size = msize;
893   memcpy (&e[1], msg, msize);
894   request_transmit (client);
895 }
896
897
898 /**
899  * Send a message to all of our current clients.
900  */
901 static void
902 send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
903 {
904   struct Client *c;
905
906   c = clients;
907   while (c != NULL)
908     {
909       send_to_client (c, msg, can_drop);
910       c = c->next;
911     }
912 }
913
914
915 /**
916  * Handle CORE_INIT request.
917  */
918 static void
919 handle_client_init (void *cls,
920                     struct GNUNET_SERVER_Client *client,
921                     const struct GNUNET_MessageHeader *message)
922 {
923   const struct InitMessage *im;
924   struct InitReplyMessage irm;
925   struct Client *c;
926   uint16_t msize;
927   const uint16_t *types;
928   struct Neighbour *n;
929   struct ConnectNotifyMessage cnm;
930
931 #if DEBUG_CORE_CLIENT
932   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933               "Client connecting to core service with `%s' message\n",
934               "INIT");
935 #endif
936   /* check that we don't have an entry already */
937   c = clients;
938   while (c != NULL)
939     {
940       if (client == c->client_handle)
941         {
942           GNUNET_break (0);
943           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
944           return;
945         }
946       c = c->next;
947     }
948   msize = ntohs (message->size);
949   if (msize < sizeof (struct InitMessage))
950     {
951       GNUNET_break (0);
952       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
953       return;
954     }
955   im = (const struct InitMessage *) message;
956   types = (const uint16_t *) &im[1];
957   msize -= sizeof (struct InitMessage);
958   c = GNUNET_malloc (sizeof (struct Client) + msize);
959   c->client_handle = client;
960   c->next = clients;
961   clients = c;
962   memcpy (&c[1], types, msize);
963   c->types = (uint16_t *) & c[1];
964   c->options = ntohl (im->options);
965   c->tcnt = msize / sizeof (uint16_t);
966   /* send init reply message */
967   irm.header.size = htons (sizeof (struct InitReplyMessage));
968   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
969   irm.reserved = htonl (0);
970   memcpy (&irm.publicKey,
971           &my_public_key,
972           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
973 #if DEBUG_CORE_CLIENT
974   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975               "Sending `%s' message to client.\n", "INIT_REPLY");
976 #endif
977   send_to_client (c, &irm.header, GNUNET_NO);
978   /* notify new client about existing neighbours */
979   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
980   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
981   n = neighbours;
982   while (n != NULL)
983     {
984 #if DEBUG_CORE_CLIENT
985       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
986                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
987 #endif
988       cnm.reserved = htonl (0);
989       cnm.peer = n->peer;
990       send_to_client (c, &cnm.header, GNUNET_NO);
991       n = n->next;
992     }
993   GNUNET_SERVER_receive_done (client, GNUNET_OK);
994 }
995
996
997 /**
998  * A client disconnected, clean up.
999  *
1000  * @param cls closure
1001  * @param client identification of the client
1002  */
1003 static void
1004 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1005 {
1006   struct Client *pos;
1007   struct Client *prev;
1008   struct Event *e;
1009
1010 #if DEBUG_CORE_CLIENT
1011   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1012               "Client has disconnected from core service.\n");
1013 #endif
1014   prev = NULL;
1015   pos = clients;
1016   while (pos != NULL)
1017     {
1018       if (client == pos->client_handle)
1019         {
1020           if (prev == NULL)
1021             clients = pos->next;
1022           else
1023             prev->next = pos->next;
1024           if (pos->th != NULL)
1025             GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1026           while (NULL != (e = pos->event_head))
1027             {
1028               pos->event_head = e->next;
1029               GNUNET_free (e);
1030             }
1031           GNUNET_free (pos);
1032           return;
1033         }
1034       prev = pos;
1035       pos = pos->next;
1036     }
1037   /* client never sent INIT */
1038 }
1039
1040
1041 /**
1042  * Handle REQUEST_CONFIGURE request.
1043  */
1044 static void
1045 handle_client_request_configure (void *cls,
1046                                  struct GNUNET_SERVER_Client *client,
1047                                  const struct GNUNET_MessageHeader *message)
1048 {
1049   const struct RequestConfigureMessage *rcm;
1050   struct Neighbour *n;
1051   struct ConfigurationInfoMessage cim;
1052   struct Client *c;
1053   int reserv;
1054   unsigned long long old_preference;
1055
1056 #if DEBUG_CORE_CLIENT
1057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058               "Core service receives `%s' request.\n", "CONFIGURE");
1059 #endif
1060   rcm = (const struct RequestConfigureMessage *) message;
1061   n = find_neighbour (&rcm->peer);
1062   memset (&cim, 0, sizeof (cim));
1063   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1064     {
1065       update_window (GNUNET_YES,
1066                      &n->available_send_window,
1067                      &n->last_asw_update,
1068                      n->bpm_out);
1069       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1070       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1071                                n->bpm_out_external_limit);
1072       reserv = ntohl (rcm->reserve_inbound);
1073       if (reserv < 0)
1074         {
1075           n->available_recv_window += reserv;
1076         }
1077       else if (reserv > 0)
1078         {
1079           update_window (GNUNET_NO,
1080                          &n->available_recv_window,
1081                          &n->last_arw_update, n->bpm_in);
1082           if (n->available_recv_window < reserv)
1083             reserv = n->available_recv_window;
1084           n->available_recv_window -= reserv;
1085         }
1086       old_preference = n->current_preference;
1087       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1088       if (old_preference > n->current_preference) 
1089         {
1090           /* overflow; cap at maximum value */
1091           n->current_preference = (unsigned long long) -1;
1092         }
1093       update_preference_sum (n->current_preference - old_preference);
1094       cim.reserved_amount = htonl (reserv);
1095       cim.bpm_in = htonl (n->bpm_in);
1096       cim.bpm_out = htonl (n->bpm_out);
1097       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1098       cim.preference = n->current_preference;
1099     }
1100   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1101   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1102   cim.peer = rcm->peer;
1103   c = find_client (client);
1104   if (c == NULL)
1105     {
1106       GNUNET_break (0);
1107       return;
1108     }
1109 #if DEBUG_CORE_CLIENT
1110   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1111               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1112 #endif
1113   send_to_client (c, &cim.header, GNUNET_NO);
1114 }
1115
1116
1117 /**
1118  * Check if we have encrypted messages for the specified neighbour
1119  * pending, and if so, check with the transport about sending them
1120  * out.
1121  *
1122  * @param n neighbour to check.
1123  */
1124 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1125
1126
1127 /**
1128  * Function called when the transport service is ready to
1129  * receive an encrypted message for the respective peer
1130  *
1131  * @param cls neighbour to use message from
1132  * @param size number of bytes we can transmit
1133  * @param buf where to copy the message
1134  * @return number of bytes transmitted
1135  */
1136 static size_t
1137 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1138 {
1139   struct Neighbour *n = cls;
1140   struct MessageEntry *m;
1141   size_t ret;
1142   char *cbuf;
1143
1144   n->th = NULL;
1145   GNUNET_assert (NULL != (m = n->encrypted_head));
1146   n->encrypted_head = m->next;
1147   if (m->next == NULL)
1148     n->encrypted_tail = NULL;
1149   ret = 0;
1150   cbuf = buf;
1151   if (buf != NULL)
1152     {
1153       GNUNET_assert (size >= m->size);
1154       memcpy (cbuf, &m[1], m->size);
1155       ret = m->size;
1156       n->available_send_window -= m->size;
1157       process_encrypted_neighbour_queue (n);
1158 #if DEBUG_CORE
1159       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1160                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1161                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1162                   ret, GNUNET_i2s (&n->peer));
1163 #endif
1164     }
1165   else
1166     {
1167       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1168                   "Transmission for message of type %u and size %u failed\n",
1169                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1170                   m->size);
1171     }
1172   GNUNET_free (m);
1173   return ret;
1174 }
1175
1176
1177 /**
1178  * Check if we have plaintext messages for the specified neighbour
1179  * pending, and if so, consider batching and encrypting them (and
1180  * then trigger processing of the encrypted queue if needed).
1181  *
1182  * @param n neighbour to check.
1183  */
1184 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1185
1186
1187 /**
1188  * Check if we have encrypted messages for the specified neighbour
1189  * pending, and if so, check with the transport about sending them
1190  * out.
1191  *
1192  * @param n neighbour to check.
1193  */
1194 static void
1195 process_encrypted_neighbour_queue (struct Neighbour *n)
1196 {
1197   struct MessageEntry *m;
1198  
1199   if (n->th != NULL)
1200     return;  /* request already pending */
1201   if (n->encrypted_head == NULL)
1202     {
1203       /* encrypted queue empty, try plaintext instead */
1204       process_plaintext_neighbour_queue (n);
1205       return;
1206     }
1207 #if DEBUG_CORE
1208   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1209               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1210               n->encrypted_head->size,
1211               GNUNET_i2s (&n->peer),
1212               GNUNET_TIME_absolute_get_remaining (n->
1213                                                   encrypted_head->deadline).
1214               value);
1215 #endif
1216   n->th =
1217     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1218                                             n->encrypted_head->size,
1219                                             n->encrypted_head->priority,
1220                                             GNUNET_TIME_absolute_get_remaining
1221                                             (n->encrypted_head->deadline),
1222                                             &notify_encrypted_transmit_ready,
1223                                             n);
1224   if (n->th == NULL)
1225     {
1226       /* message request too large (oops) */
1227       GNUNET_break (0);
1228       /* discard encrypted message */
1229       GNUNET_assert (NULL != (m = n->encrypted_head));
1230       n->encrypted_head = m->next;
1231       if (m->next == NULL)
1232         n->encrypted_tail = NULL;
1233       GNUNET_free (m);
1234       process_encrypted_neighbour_queue (n);
1235     }
1236 }
1237
1238
1239 /**
1240  * Decrypt size bytes from in and write the result to out.  Use the
1241  * key for inbound traffic of the given neighbour.  This function does
1242  * NOT do any integrity-checks on the result.
1243  *
1244  * @param n neighbour we are receiving from
1245  * @param iv initialization vector to use
1246  * @param in ciphertext
1247  * @param out plaintext
1248  * @param size size of in/out
1249  * @return GNUNET_OK on success
1250  */
1251 static int
1252 do_decrypt (struct Neighbour *n,
1253             const GNUNET_HashCode * iv,
1254             const void *in, void *out, size_t size)
1255 {
1256   if (size != (uint16_t) size)
1257     {
1258       GNUNET_break (0);
1259       return GNUNET_NO;
1260     }
1261   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1262       (n->status != PEER_STATE_KEY_CONFIRMED))
1263     {
1264       GNUNET_break_op (0);
1265       return GNUNET_SYSERR;
1266     }
1267   if (size !=
1268       GNUNET_CRYPTO_aes_decrypt (in,
1269                                  (uint16_t) size,
1270                                  &n->decrypt_key,
1271                                  (const struct
1272                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1273                                  out))
1274     {
1275       GNUNET_break (0);
1276       return GNUNET_SYSERR;
1277     }
1278 #if DEBUG_CORE
1279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1280               "Decrypted %u bytes from `%4s' using key %u\n",
1281               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1282 #endif
1283   return GNUNET_OK;
1284 }
1285
1286
1287 /**
1288  * Encrypt size bytes from in and write the result to out.  Use the
1289  * key for outbound traffic of the given neighbour.
1290  *
1291  * @param n neighbour we are sending to
1292  * @param iv initialization vector to use
1293  * @param in ciphertext
1294  * @param out plaintext
1295  * @param size size of in/out
1296  * @return GNUNET_OK on success
1297  */
1298 static int
1299 do_encrypt (struct Neighbour *n,
1300             const GNUNET_HashCode * iv,
1301             const void *in, void *out, size_t size)
1302 {
1303   if (size != (uint16_t) size)
1304     {
1305       GNUNET_break (0);
1306       return GNUNET_NO;
1307     }
1308   GNUNET_assert (size ==
1309                  GNUNET_CRYPTO_aes_encrypt (in,
1310                                             (uint16_t) size,
1311                                             &n->encrypt_key,
1312                                             (const struct
1313                                              GNUNET_CRYPTO_AesInitializationVector
1314                                              *) iv, out));
1315 #if DEBUG_CORE
1316   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1317               "Encrypted %u bytes for `%4s' using key %u\n", size,
1318               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1319 #endif
1320   return GNUNET_OK;
1321 }
1322
1323
1324 /**
1325  * Select messages for transmission.  This heuristic uses a combination
1326  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1327  * and priority-based discard (in case no feasible schedule exist) and
1328  * speculative optimization (defer any kind of transmission until
1329  * we either create a batch of significant size, 25% of max, or until
1330  * we are close to a deadline).  Furthermore, when scheduling the
1331  * heuristic also packs as many messages into the batch as possible,
1332  * starting with those with the earliest deadline.  Yes, this is fun.
1333  *
1334  * @param n neighbour to select messages from
1335  * @param size number of bytes to select for transmission
1336  * @param retry_time set to the time when we should try again
1337  *        (only valid if this function returns zero)
1338  * @return number of bytes selected, or 0 if we decided to
1339  *         defer scheduling overall; in that case, retry_time is set.
1340  */
1341 static size_t
1342 select_messages (struct Neighbour *n,
1343                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1344 {
1345   struct MessageEntry *pos;
1346   struct MessageEntry *min;
1347   struct MessageEntry *last;
1348   unsigned int min_prio;
1349   struct GNUNET_TIME_Absolute t;
1350   struct GNUNET_TIME_Absolute now;
1351   uint64_t delta;
1352   uint64_t avail;
1353   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1354   size_t off;
1355   int discard_low_prio;
1356
1357   GNUNET_assert (NULL != n->messages);
1358   now = GNUNET_TIME_absolute_get ();
1359   /* last entry in linked list of messages processed */
1360   last = NULL;
1361   /* should we remove the entry with the lowest
1362      priority from consideration for scheduling at the
1363      end of the loop? */
1364   discard_low_prio = GNUNET_YES;
1365   while (GNUNET_YES == discard_low_prio)
1366     {
1367       min = NULL;
1368       min_prio = -1;
1369       discard_low_prio = GNUNET_NO;
1370       /* calculate number of bytes available for transmission at time "t" */
1371       update_window (GNUNET_NO,
1372                      &n->available_send_window,
1373                      &n->last_asw_update,
1374                      n->bpm_out);
1375       avail = n->available_send_window;
1376       t = n->last_asw_update;
1377       /* how many bytes have we (hypothetically) scheduled so far */
1378       off = 0;
1379       /* maximum time we can wait before transmitting anything
1380          and still make all of our deadlines */
1381       slack = -1;
1382
1383       pos = n->messages;
1384       /* note that we use "*2" here because we want to look
1385          a bit further into the future; much more makes no
1386          sense since new message might be scheduled in the
1387          meantime... */
1388       while ((pos != NULL) && (off < size * 2))
1389         {
1390           if (pos->do_transmit == GNUNET_YES)
1391             {
1392               /* already removed from consideration */
1393               pos = pos->next;
1394               continue;
1395             }
1396           if (discard_low_prio == GNUNET_NO)
1397             {
1398               delta = pos->deadline.value;
1399               if (delta < t.value)
1400                 delta = 0;
1401               else
1402                 delta = t.value - delta;
1403               avail += delta * n->bpm_out / 1000 / 60;
1404               if (avail < pos->size)
1405                 {
1406                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1407                 }
1408               else
1409                 {
1410                   avail -= pos->size;
1411                   /* update slack, considering both its absolute deadline
1412                      and relative deadlines caused by other messages
1413                      with their respective load */
1414                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1415                   if (pos->deadline.value < now.value)
1416                     slack = 0;
1417                   else
1418                     slack =
1419                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1420                 }
1421             }
1422           off += pos->size;
1423           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1424           if (pos->priority <= min_prio)
1425             {
1426               /* update min for discard */
1427               min_prio = pos->priority;
1428               min = pos;
1429             }
1430           pos = pos->next;
1431         }
1432       if (discard_low_prio)
1433         {
1434           GNUNET_assert (min != NULL);
1435           /* remove lowest-priority entry from consideration */
1436           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1437         }
1438       last = pos;
1439     }
1440   /* guard against sending "tiny" messages with large headers without
1441      urgent deadlines */
1442   if ( (slack > 1000) && (size > 4 * off) )
1443     {
1444       /* less than 25% of message would be filled with
1445          deadlines still being met if we delay by one
1446          second or more; so just wait for more data */
1447       retry_time->value = slack / 2;
1448       /* reset do_transmit values for next time */
1449       while (pos != last)
1450         {
1451           pos->do_transmit = GNUNET_NO;
1452           pos = pos->next;
1453         }
1454       return 0;
1455     }
1456   /* select marked messages (up to size) for transmission */
1457   off = 0;
1458   pos = n->messages;
1459   while (pos != last)
1460     {
1461       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1462         {
1463           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1464           off += pos->size;
1465           size -= pos->size;
1466         }
1467       else
1468         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1469       pos = pos->next;
1470     }
1471 #if DEBUG_CORE
1472   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1473               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1474               off, GNUNET_i2s (&n->peer));
1475 #endif
1476   return off;
1477 }
1478
1479
1480 /**
1481  * Batch multiple messages into a larger buffer.
1482  *
1483  * @param n neighbour to take messages from
1484  * @param buf target buffer
1485  * @param size size of buf
1486  * @param deadline set to transmission deadline for the result
1487  * @param retry_time set to the time when we should try again
1488  *        (only valid if this function returns zero)
1489  * @param priority set to the priority of the batch
1490  * @return number of bytes written to buf (can be zero)
1491  */
1492 static size_t
1493 batch_message (struct Neighbour *n,
1494                char *buf,
1495                size_t size,
1496                struct GNUNET_TIME_Absolute *deadline,
1497                struct GNUNET_TIME_Relative *retry_time,
1498                unsigned int *priority)
1499 {
1500   struct MessageEntry *pos;
1501   struct MessageEntry *prev;
1502   struct MessageEntry *next;
1503   size_t ret;
1504
1505   ret = 0;
1506   *priority = 0;
1507   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1508   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1509   if (0 == select_messages (n, size, retry_time))
1510     {
1511       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1512                   "No messages selected, will try again in %llu ms\n",
1513                   retry_time->value);
1514       return 0;
1515     }
1516   pos = n->messages;
1517   prev = NULL;
1518   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1519     {
1520       next = pos->next;
1521       if (GNUNET_YES == pos->do_transmit)
1522         {
1523           GNUNET_assert (pos->size <= size);
1524           memcpy (&buf[ret], &pos[1], pos->size);
1525           ret += pos->size;
1526           size -= pos->size;
1527           *priority += pos->priority;
1528 #if DEBUG_CORE
1529           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1530                       "Adding plaintext message with deadline %llu ms to batch\n",
1531                       GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
1532 #endif
1533           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1534           GNUNET_free (pos);
1535           if (prev == NULL)
1536             n->messages = next;
1537           else
1538             prev->next = next;
1539         }
1540       else
1541         {
1542           prev = pos;
1543         }
1544       pos = next;
1545     }
1546 #if DEBUG_CORE
1547   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1548               "Deadline for message batch is %llu ms\n",
1549               GNUNET_TIME_absolute_get_remaining (*deadline).value);
1550 #endif
1551   return ret;
1552 }
1553
1554
1555 /**
1556  * Remove messages with deadlines that have long expired from
1557  * the queue.
1558  *
1559  * @param n neighbour to inspect
1560  */
1561 static void
1562 discard_expired_messages (struct Neighbour *n)
1563 {
1564   struct MessageEntry *prev;
1565   struct MessageEntry *next;
1566   struct MessageEntry *pos;
1567   struct GNUNET_TIME_Absolute now;
1568   struct GNUNET_TIME_Relative delta;
1569
1570   now = GNUNET_TIME_absolute_get ();
1571   prev = NULL;
1572   pos = n->messages;
1573   while (pos != NULL) 
1574     {
1575       next = pos->next;
1576       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1577       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1578         {
1579 #if DEBUG_CORE
1580           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1581                       "Message is %llu ms past due, discarding.\n",
1582                       delta.value);
1583 #endif
1584           if (prev == NULL)
1585             n->messages = next;
1586           else
1587             prev->next = next;
1588           GNUNET_free (pos);
1589         }
1590       else
1591         prev = pos;
1592       pos = next;
1593     }
1594 }
1595
1596
1597 /**
1598  * Signature of the main function of a task.
1599  *
1600  * @param cls closure
1601  * @param tc context information (why was this task triggered now)
1602  */
1603 static void
1604 retry_plaintext_processing (void *cls,
1605                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1606 {
1607   struct Neighbour *n = cls;
1608
1609   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1610   process_plaintext_neighbour_queue (n);
1611 }
1612
1613
1614 /**
1615  * Send our key (and encrypted PING) to the other peer.
1616  *
1617  * @param n the other peer
1618  */
1619 static void send_key (struct Neighbour *n);
1620
1621
1622 /**
1623  * Check if we have plaintext messages for the specified neighbour
1624  * pending, and if so, consider batching and encrypting them (and
1625  * then trigger processing of the encrypted queue if needed).
1626  *
1627  * @param n neighbour to check.
1628  */
1629 static void
1630 process_plaintext_neighbour_queue (struct Neighbour *n)
1631 {
1632   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1633   size_t used;
1634   size_t esize;
1635   struct EncryptedMessage *em;  /* encrypted message */
1636   struct EncryptedMessage *ph;  /* plaintext header */
1637   struct MessageEntry *me;
1638   unsigned int priority;
1639   struct GNUNET_TIME_Absolute deadline;
1640   struct GNUNET_TIME_Relative retry_time;
1641
1642   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1643     {
1644       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1645       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1646     }
1647   switch (n->status)
1648     {
1649     case PEER_STATE_DOWN:
1650       send_key (n);
1651 #if DEBUG_CORE
1652       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1653                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1654                   GNUNET_i2s(&n->peer));
1655 #endif
1656       return;
1657     case PEER_STATE_KEY_SENT:
1658       GNUNET_assert (n->retry_set_key_task !=
1659                      GNUNET_SCHEDULER_NO_TASK);
1660 #if DEBUG_CORE
1661       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1662                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1663                   GNUNET_i2s(&n->peer));
1664 #endif
1665       return;
1666     case PEER_STATE_KEY_RECEIVED:
1667       GNUNET_assert (n->retry_set_key_task !=
1668                      GNUNET_SCHEDULER_NO_TASK);
1669 #if DEBUG_CORE
1670       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1671                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1672                   GNUNET_i2s(&n->peer));
1673 #endif
1674       return;
1675     case PEER_STATE_KEY_CONFIRMED:
1676       /* ready to continue */
1677       break;
1678     }
1679   discard_expired_messages (n);
1680   if (n->messages == NULL)
1681     {
1682 #if DEBUG_CORE
1683       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1684                   "Plaintext message queue for `%4s' is empty.\n",
1685                   GNUNET_i2s(&n->peer));
1686 #endif
1687       return;                   /* no pending messages */
1688     }
1689   if (n->encrypted_head != NULL)
1690     {
1691 #if DEBUG_CORE
1692       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1693                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1694                   GNUNET_i2s(&n->peer));
1695 #endif
1696       return;                   /* wait for messages already encrypted to be
1697                                    processed first! */
1698     }
1699   ph = (struct EncryptedMessage *) pbuf;
1700   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1701   priority = 0;
1702   used = sizeof (struct EncryptedMessage);
1703   used += batch_message (n,
1704                          &pbuf[used],
1705                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1706                          &deadline, &retry_time, &priority);
1707   if (used == sizeof (struct EncryptedMessage))
1708     {
1709 #if DEBUG_CORE
1710       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1711                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1712                   GNUNET_i2s(&n->peer));
1713 #endif
1714       /* no messages selected for sending, try again later... */
1715       n->retry_plaintext_task =
1716         GNUNET_SCHEDULER_add_delayed (sched,
1717                                       retry_time,
1718                                       &retry_plaintext_processing, n);
1719       return;
1720     }
1721   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1722   ph->inbound_bpm_limit = htonl (n->bpm_in);
1723   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1724
1725   /* setup encryption message header */
1726   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1727   me->deadline = deadline;
1728   me->priority = priority;
1729   me->size = used;
1730   em = (struct EncryptedMessage *) &me[1];
1731   em->header.size = htons (used);
1732   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1733   em->reserved = htonl (0);
1734   esize = used - ENCRYPTED_HEADER_SIZE;
1735   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1736   /* encrypt */
1737 #if DEBUG_CORE
1738   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1739               "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
1740               esize,
1741               GNUNET_i2s(&n->peer),
1742               (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
1743 #endif
1744   GNUNET_assert (GNUNET_OK ==
1745                  do_encrypt (n,
1746                              &em->plaintext_hash,
1747                              &ph->sequence_number,
1748                              &em->sequence_number, esize));
1749   /* append to transmission list */
1750   if (n->encrypted_tail == NULL)
1751     n->encrypted_head = me;
1752   else
1753     n->encrypted_tail->next = me;
1754   n->encrypted_tail = me;
1755   process_encrypted_neighbour_queue (n);
1756 }
1757
1758
1759 /**
1760  * Handle CORE_SEND request.
1761  *
1762  * @param cls unused
1763  * @param client the client issuing the request
1764  * @param message the "struct SendMessage"
1765  */
1766 static void
1767 handle_client_send (void *cls,
1768                     struct GNUNET_SERVER_Client *client,
1769                     const struct GNUNET_MessageHeader *message);
1770
1771
1772 /**
1773  * Function called to notify us that we either succeeded
1774  * or failed to connect (at the transport level) to another
1775  * peer.  We should either free the message we were asked
1776  * to transmit or re-try adding it to the queue.
1777  *
1778  * @param cls closure
1779  * @param size number of bytes available in buf
1780  * @param buf where the callee should write the message
1781  * @return number of bytes written to buf
1782  */
1783 static size_t
1784 send_connect_continuation (void *cls, size_t size, void *buf)
1785 {
1786   struct SendMessage *sm = cls;
1787
1788   if (buf == NULL)
1789     {
1790 #if DEBUG_CORE
1791       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1792                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1793                   GNUNET_i2s (&sm->peer));
1794 #endif
1795       GNUNET_free (sm);
1796       return 0;
1797     }
1798 #if DEBUG_CORE
1799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1800               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1801               GNUNET_i2s (&sm->peer));
1802 #endif
1803   handle_client_send (NULL, NULL, &sm->header);
1804   GNUNET_free (sm);
1805   return 0;
1806 }
1807
1808
1809 /**
1810  * Handle CORE_SEND request.
1811  *
1812  * @param cls unused
1813  * @param client the client issuing the request
1814  * @param message the "struct SendMessage"
1815  */
1816 static void
1817 handle_client_send (void *cls,
1818                     struct GNUNET_SERVER_Client *client,
1819                     const struct GNUNET_MessageHeader *message)
1820 {
1821   const struct SendMessage *sm;
1822   struct SendMessage *smc;
1823   const struct GNUNET_MessageHeader *mh;
1824   struct Neighbour *n;
1825   struct MessageEntry *prev;
1826   struct MessageEntry *pos;
1827   struct MessageEntry *e; 
1828   struct MessageEntry *min_prio_entry;
1829   struct MessageEntry *min_prio_prev;
1830   unsigned int min_prio;
1831   unsigned int queue_size;
1832   uint16_t msize;
1833
1834   msize = ntohs (message->size);
1835   if (msize <
1836       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1837     {
1838       GNUNET_break (0);
1839       if (client != NULL)
1840         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1841       return;
1842     }
1843   sm = (const struct SendMessage *) message;
1844   msize -= sizeof (struct SendMessage);
1845   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1846   if (msize != ntohs (mh->size))
1847     {
1848       GNUNET_break (0);
1849       if (client != NULL)
1850         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1851       return;
1852     }
1853   n = find_neighbour (&sm->peer);
1854   if (n == NULL)
1855     {
1856 #if DEBUG_CORE
1857       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1858                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1859                   "SEND",
1860                   GNUNET_i2s (&sm->peer),
1861                   GNUNET_TIME_absolute_get_remaining
1862                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1863 #endif
1864       msize += sizeof (struct SendMessage);
1865       /* ask transport to connect to the peer */
1866       smc = GNUNET_malloc (msize);
1867       memcpy (smc, sm, msize);
1868       if (NULL ==
1869           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1870                                                   &sm->peer,
1871                                                   0, 0,
1872                                                   GNUNET_TIME_absolute_get_remaining
1873                                                   (GNUNET_TIME_absolute_ntoh
1874                                                    (sm->deadline)),
1875                                                   &send_connect_continuation,
1876                                                   smc))
1877         {
1878           /* transport has already a request pending for this peer! */
1879 #if DEBUG_CORE
1880           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1881                       "Dropped second message destined for `%4s' since connection is still down.\n",
1882                       GNUNET_i2s(&sm->peer));
1883 #endif
1884           GNUNET_free (smc);
1885         }
1886       if (client != NULL)
1887         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1888       return;
1889     }
1890 #if DEBUG_CORE
1891   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1892               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1893               "SEND",
1894               msize, 
1895               GNUNET_i2s (&sm->peer));
1896 #endif
1897   /* bound queue size */
1898   discard_expired_messages (n);
1899   min_prio = (unsigned int) -1;
1900   min_prio_entry = NULL;
1901   min_prio_prev = NULL;
1902   queue_size = 0;
1903   prev = NULL;
1904   pos = n->messages;
1905   while (pos != NULL) 
1906     {
1907       if (pos->priority < min_prio)
1908         {
1909           min_prio_entry = pos;
1910           min_prio_prev = prev;
1911           min_prio = pos->priority;
1912         }
1913       queue_size++;
1914       prev = pos;
1915       pos = pos->next;
1916     }
1917   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1918     {
1919       /* queue full */
1920       if (ntohl(sm->priority) <= min_prio)
1921         {
1922           /* discard new entry */
1923 #if DEBUG_CORE
1924           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1925                       "Queue full, discarding new request\n");
1926 #endif
1927           if (client != NULL)
1928             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1929           return;
1930         }
1931       /* discard "min_prio_entry" */
1932 #if DEBUG_CORE
1933       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1934                   "Queue full, discarding existing older request\n");
1935 #endif
1936       if (min_prio_prev == NULL)
1937         n->messages = min_prio_entry->next;
1938       else
1939         min_prio_prev->next = min_prio_entry->next;      
1940       GNUNET_free (min_prio_entry);     
1941     }
1942
1943 #if DEBUG_CORE
1944   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1945               "Adding transmission request for `%4s' to queue\n",
1946               GNUNET_i2s (&sm->peer));
1947 #endif  
1948   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1949   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1950   e->priority = ntohl (sm->priority);
1951   e->size = msize;
1952   memcpy (&e[1], mh, msize);
1953
1954   /* insert, keep list sorted by deadline */
1955   prev = NULL;
1956   pos = n->messages;
1957   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1958     {
1959       prev = pos;
1960       pos = pos->next;
1961     }
1962   if (prev == NULL)
1963     n->messages = e;
1964   else
1965     prev->next = e;
1966   e->next = pos;
1967
1968   /* consider scheduling now */
1969   process_plaintext_neighbour_queue (n);
1970   if (client != NULL)
1971     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1972 }
1973
1974
1975 /**
1976  * List of handlers for the messages understood by this
1977  * service.
1978  */
1979 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1980   {&handle_client_init, NULL,
1981    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1982   {&handle_client_request_configure, NULL,
1983    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
1984    sizeof (struct RequestConfigureMessage)},
1985   {&handle_client_send, NULL,
1986    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1987   {NULL, NULL, 0, 0}
1988 };
1989
1990
1991 /**
1992  * PEERINFO is giving us a HELLO for a peer.  Add the
1993  * public key to the neighbour's struct and retry
1994  * send_key.  Or, if we did not get a HELLO, just do
1995  * nothing.
1996  *
1997  * @param cls NULL
1998  * @param peer the peer for which this is the HELLO
1999  * @param hello HELLO message of that peer
2000  * @param trust amount of trust we currently have in that peer
2001  */
2002 static void
2003 process_hello_retry_send_key (void *cls,
2004                               const struct GNUNET_PeerIdentity *peer,
2005                               const struct GNUNET_HELLO_Message *hello,
2006                               uint32_t trust)
2007 {
2008   struct Neighbour *n;
2009
2010   if (peer == NULL)
2011     return;
2012   n = find_neighbour (peer);
2013   if (n == NULL)
2014     return;
2015   if (n->public_key != NULL)
2016     return;
2017 #if DEBUG_CORE
2018   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2019               "Received new `%s' message for `%4s', initiating key exchange.\n",
2020               "HELLO",
2021               GNUNET_i2s (peer));
2022 #endif
2023   n->public_key =
2024     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2025   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2026     {
2027       GNUNET_free (n->public_key);
2028       n->public_key = NULL;
2029       return;
2030     }
2031   send_key (n);
2032 }
2033
2034
2035 /**
2036  * Task that will retry "send_key" if our previous attempt failed
2037  * to yield a PONG.
2038  */
2039 static void
2040 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2041 {
2042   struct Neighbour *n = cls;
2043
2044   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2045   n->set_key_retry_frequency =
2046     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2047   send_key (n);
2048 }
2049
2050
2051 /**
2052  * Send our key (and encrypted PING) to the other peer.
2053  *
2054  * @param n the other peer
2055  */
2056 static void
2057 send_key (struct Neighbour *n)
2058 {
2059   struct SetKeyMessage *sm;
2060   struct MessageEntry *me;
2061   struct PingMessage pp;
2062   struct PingMessage *pm;
2063
2064 #if DEBUG_CORE
2065   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2066               "Asked to perform key exchange with `%4s'.\n",
2067               GNUNET_i2s (&n->peer));
2068 #endif
2069   if (n->public_key == NULL)
2070     {
2071       /* lookup n's public key, then try again */
2072 #if DEBUG_CORE
2073       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2074                   "Lacking public key for `%4s', trying to obtain one.\n",
2075                   GNUNET_i2s (&n->peer));
2076 #endif
2077       GNUNET_PEERINFO_for_all (cfg,
2078                                sched,
2079                                &n->peer,
2080                                0,
2081                                GNUNET_TIME_UNIT_MINUTES,
2082                                &process_hello_retry_send_key, NULL);
2083       return;
2084     }
2085   /* first, set key message */
2086   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2087                       sizeof (struct SetKeyMessage));
2088   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2089   me->priority = SET_KEY_PRIORITY;
2090   me->size = sizeof (struct SetKeyMessage);
2091   if (n->encrypted_head == NULL)
2092     n->encrypted_head = me;
2093   else
2094     n->encrypted_tail->next = me;
2095   n->encrypted_tail = me;
2096   sm = (struct SetKeyMessage *) &me[1];
2097   sm->header.size = htons (sizeof (struct SetKeyMessage));
2098   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2099   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2100                                         PEER_STATE_KEY_SENT : n->status));
2101   sm->purpose.size =
2102     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2103            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2104            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2105            sizeof (struct GNUNET_PeerIdentity));
2106   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2107   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2108   sm->target = n->peer;
2109   GNUNET_assert (GNUNET_OK ==
2110                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2111                                             sizeof (struct
2112                                                     GNUNET_CRYPTO_AesSessionKey),
2113                                             n->public_key,
2114                                             &sm->encrypted_key));
2115   GNUNET_assert (GNUNET_OK ==
2116                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2117                                          &sm->signature));
2118
2119   /* second, encrypted PING message */
2120   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2121                       sizeof (struct PingMessage));
2122   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2123   me->priority = PING_PRIORITY;
2124   me->size = sizeof (struct PingMessage);
2125   n->encrypted_tail->next = me;
2126   n->encrypted_tail = me;
2127   pm = (struct PingMessage *) &me[1];
2128   pm->header.size = htons (sizeof (struct PingMessage));
2129   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2130   pp.challenge = htonl (n->ping_challenge);
2131   pp.target = n->peer;
2132 #if DEBUG_CORE
2133   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2134               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2135               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2136   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2137               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2138               "PING",
2139               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2140 #endif
2141   do_encrypt (n,
2142               &n->peer.hashPubKey,
2143               &pp.challenge,
2144               &pm->challenge,
2145               sizeof (struct PingMessage) -
2146               sizeof (struct GNUNET_MessageHeader));
2147   /* update status */
2148   switch (n->status)
2149     {
2150     case PEER_STATE_DOWN:
2151       n->status = PEER_STATE_KEY_SENT;
2152       break;
2153     case PEER_STATE_KEY_SENT:
2154       break;
2155     case PEER_STATE_KEY_RECEIVED:
2156       break;
2157     case PEER_STATE_KEY_CONFIRMED:
2158       break;
2159     default:
2160       GNUNET_break (0);
2161       break;
2162     }
2163 #if DEBUG_CORE
2164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2165               "Have %llu ms left for `%s' transmission.\n",
2166               (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
2167               "SET_KEY");
2168 #endif
2169   /* trigger queue processing */
2170   process_encrypted_neighbour_queue (n);
2171   if (n->status != PEER_STATE_KEY_CONFIRMED)
2172     n->retry_set_key_task
2173       = GNUNET_SCHEDULER_add_delayed (sched,
2174                                       n->set_key_retry_frequency,
2175                                       &set_key_retry_task, n);
2176 }
2177
2178
2179 /**
2180  * We received a SET_KEY message.  Validate and update
2181  * our key material and status.
2182  *
2183  * @param n the neighbour from which we received message m
2184  * @param m the set key message we received
2185  */
2186 static void
2187 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2188
2189
2190 /**
2191  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2192  * the neighbour's struct and retry handling the set_key message.  Or,
2193  * if we did not get a HELLO, just free the set key message.
2194  *
2195  * @param cls pointer to the set key message
2196  * @param peer the peer for which this is the HELLO
2197  * @param hello HELLO message of that peer
2198  * @param trust amount of trust we currently have in that peer
2199  */
2200 static void
2201 process_hello_retry_handle_set_key (void *cls,
2202                                     const struct GNUNET_PeerIdentity *peer,
2203                                     const struct GNUNET_HELLO_Message *hello,
2204                                     uint32_t trust)
2205 {
2206   struct SetKeyMessage *sm = cls;
2207   struct Neighbour *n;
2208
2209   if (peer == NULL)
2210     {
2211       GNUNET_free (sm);
2212       return;
2213     }
2214   n = find_neighbour (peer);
2215   if (n == NULL)
2216     {
2217       GNUNET_break (0);
2218       return;
2219     }
2220   if (n->public_key != NULL)
2221     return;                     /* multiple HELLOs match!? */
2222   n->public_key =
2223     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2224   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2225     {
2226       GNUNET_break_op (0);
2227       GNUNET_free (n->public_key);
2228       n->public_key = NULL;
2229       return;
2230     }
2231 #if DEBUG_CORE
2232   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2233               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2234               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2235 #endif
2236   handle_set_key (n, sm);
2237 }
2238
2239
2240 /**
2241  * We received a PING message.  Validate and transmit
2242  * PONG.
2243  *
2244  * @param n sender of the PING
2245  * @param m the encrypted PING message itself
2246  */
2247 static void
2248 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2249 {
2250   struct PingMessage t;
2251   struct PingMessage *tp;
2252   struct MessageEntry *me;
2253
2254 #if DEBUG_CORE
2255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2256               "Core service receives `%s' request from `%4s'.\n",
2257               "PING", GNUNET_i2s (&n->peer));
2258 #endif
2259   if (GNUNET_OK !=
2260       do_decrypt (n,
2261                   &my_identity.hashPubKey,
2262                   &m->challenge,
2263                   &t.challenge,
2264                   sizeof (struct PingMessage) -
2265                   sizeof (struct GNUNET_MessageHeader)))
2266     return;
2267 #if DEBUG_CORE
2268   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2269               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2270               "PING",
2271               GNUNET_i2s (&t.target),
2272               ntohl (t.challenge), n->decrypt_key.crc32);
2273   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2274               "Target of `%s' request is `%4s'.\n",
2275               "PING", GNUNET_i2s (&t.target));
2276 #endif
2277   if (0 != memcmp (&t.target,
2278                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2279     {
2280       GNUNET_break_op (0);
2281       return;
2282     }
2283   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2284                       sizeof (struct PingMessage));
2285   if (n->encrypted_tail != NULL)
2286     n->encrypted_tail->next = me;
2287   else
2288     {
2289       n->encrypted_tail = me;
2290       n->encrypted_head = me;
2291     }
2292   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2293   me->priority = PONG_PRIORITY;
2294   me->size = sizeof (struct PingMessage);
2295   tp = (struct PingMessage *) &me[1];
2296   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2297   tp->header.size = htons (sizeof (struct PingMessage));
2298   do_encrypt (n,
2299               &my_identity.hashPubKey,
2300               &t.challenge,
2301               &tp->challenge,
2302               sizeof (struct PingMessage) -
2303               sizeof (struct GNUNET_MessageHeader));
2304 #if DEBUG_CORE
2305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2306               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2307               ntohl (t.challenge), n->encrypt_key.crc32);
2308 #endif
2309   /* trigger queue processing */
2310   process_encrypted_neighbour_queue (n);
2311 }
2312
2313
2314 /**
2315  * We received a SET_KEY message.  Validate and update
2316  * our key material and status.
2317  *
2318  * @param n the neighbour from which we received message m
2319  * @param m the set key message we received
2320  */
2321 static void
2322 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2323 {
2324   struct SetKeyMessage *m_cpy;
2325   struct GNUNET_TIME_Absolute t;
2326   struct GNUNET_CRYPTO_AesSessionKey k;
2327   struct PingMessage *ping;
2328   enum PeerStateMachine sender_status;
2329
2330 #if DEBUG_CORE
2331   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2332               "Core service receives `%s' request from `%4s'.\n",
2333               "SET_KEY", GNUNET_i2s (&n->peer));
2334 #endif
2335   if (n->public_key == NULL)
2336     {
2337 #if DEBUG_CORE
2338       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2339                   "Lacking public key for peer, trying to obtain one.\n");
2340 #endif
2341       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2342       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2343       /* lookup n's public key, then try again */
2344       GNUNET_PEERINFO_for_all (cfg,
2345                                sched,
2346                                &n->peer,
2347                                0,
2348                                GNUNET_TIME_UNIT_MINUTES,
2349                                &process_hello_retry_handle_set_key, m_cpy);
2350       return;
2351     }
2352   if ((ntohl (m->purpose.size) !=
2353        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2354        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2355        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2356        sizeof (struct GNUNET_PeerIdentity)) ||
2357       (GNUNET_OK !=
2358        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2359                                  &m->purpose, &m->signature, n->public_key)))
2360     {
2361       /* invalid signature */
2362       GNUNET_break_op (0);
2363       return;
2364     }
2365   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2366   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2367        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2368       (t.value < n->decrypt_key_created.value))
2369     {
2370       /* this could rarely happen due to massive re-ordering of
2371          messages on the network level, but is most likely either
2372          a bug or some adversary messing with us.  Report. */
2373       GNUNET_break_op (0);
2374       return;
2375     }
2376 #if DEBUG_CORE
2377   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2378 #endif  
2379   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2380                                   &m->encrypted_key,
2381                                   &k,
2382                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2383        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2384       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2385     {
2386       /* failed to decrypt !? */
2387       GNUNET_break_op (0);
2388       return;
2389     }
2390
2391   n->decrypt_key = k;
2392   if (n->decrypt_key_created.value != t.value)
2393     {
2394       /* fresh key, reset sequence numbers */
2395       n->last_sequence_number_received = 0;
2396       n->last_packets_bitmap = 0;
2397       n->decrypt_key_created = t;
2398     }
2399   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2400   switch (n->status)
2401     {
2402     case PEER_STATE_DOWN:
2403       n->status = PEER_STATE_KEY_RECEIVED;
2404 #if DEBUG_CORE
2405       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2406                   "Responding to `%s' with my own key.\n", "SET_KEY");
2407 #endif
2408       send_key (n);
2409       break;
2410     case PEER_STATE_KEY_SENT:
2411     case PEER_STATE_KEY_RECEIVED:
2412       n->status = PEER_STATE_KEY_RECEIVED;
2413       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2414           (sender_status != PEER_STATE_KEY_CONFIRMED))
2415         {
2416 #if DEBUG_CORE
2417           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2418                       "Responding to `%s' with my own key (other peer has status %u).\n",
2419                       "SET_KEY", sender_status);
2420 #endif
2421           send_key (n);
2422         }
2423       break;
2424     case PEER_STATE_KEY_CONFIRMED:
2425       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2426           (sender_status != PEER_STATE_KEY_CONFIRMED))
2427         {         
2428 #if DEBUG_CORE
2429           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2430                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2431                       "SET_KEY", sender_status);
2432 #endif
2433           send_key (n);
2434         }
2435       break;
2436     default:
2437       GNUNET_break (0);
2438       break;
2439     }
2440   if (n->pending_ping != NULL)
2441     {
2442       ping = n->pending_ping;
2443       n->pending_ping = NULL;
2444       handle_ping (n, ping);
2445       GNUNET_free (ping);
2446     }
2447 }
2448
2449
2450 /**
2451  * We received a PONG message.  Validate and update
2452  * our status.
2453  *
2454  * @param n sender of the PONG
2455  * @param m the encrypted PONG message itself
2456  */
2457 static void
2458 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2459 {
2460   struct PingMessage t;
2461
2462 #if DEBUG_CORE
2463   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2464               "Core service receives `%s' request from `%4s'.\n",
2465               "PONG", GNUNET_i2s (&n->peer));
2466 #endif
2467   if (GNUNET_OK !=
2468       do_decrypt (n,
2469                   &n->peer.hashPubKey,
2470                   &m->challenge,
2471                   &t.challenge,
2472                   sizeof (struct PingMessage) -
2473                   sizeof (struct GNUNET_MessageHeader)))
2474     return;
2475 #if DEBUG_CORE
2476   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2477               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2478               "PONG",
2479               GNUNET_i2s (&t.target),
2480               ntohl (t.challenge), n->decrypt_key.crc32);
2481 #endif
2482   if ((0 != memcmp (&t.target,
2483                     &n->peer,
2484                     sizeof (struct GNUNET_PeerIdentity))) ||
2485       (n->ping_challenge != ntohl (t.challenge)))
2486     {
2487       /* PONG malformed */
2488 #if DEBUG_CORE
2489       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2490                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2491                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2492       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2493                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2494                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2495 #endif
2496       GNUNET_break_op (0);
2497       return;
2498     }
2499   switch (n->status)
2500     {
2501     case PEER_STATE_DOWN:
2502       GNUNET_break (0);         /* should be impossible */
2503       return;
2504     case PEER_STATE_KEY_SENT:
2505       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2506       return;
2507     case PEER_STATE_KEY_RECEIVED:
2508       n->status = PEER_STATE_KEY_CONFIRMED;
2509       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2510         {
2511           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2512           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2513         }
2514       process_encrypted_neighbour_queue (n);
2515       break;
2516     case PEER_STATE_KEY_CONFIRMED:
2517       /* duplicate PONG? */
2518       break;
2519     default:
2520       GNUNET_break (0);
2521       break;
2522     }
2523 }
2524
2525
2526 /**
2527  * Send a P2P message to a client.
2528  *
2529  * @param sender who sent us the message?
2530  * @param client who should we give the message to?
2531  * @param m contains the message to transmit
2532  * @param msize number of bytes in buf to transmit
2533  */
2534 static void
2535 send_p2p_message_to_client (struct Neighbour *sender,
2536                             struct Client *client,
2537                             const void *m, size_t msize)
2538 {
2539   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2540   struct NotifyTrafficMessage *ntm;
2541
2542 #if DEBUG_CORE
2543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2544               "Core service passes message from `%4s' of type %u to client.\n",
2545               GNUNET_i2s(&sender->peer),
2546               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2547 #endif
2548   ntm = (struct NotifyTrafficMessage *) buf;
2549   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2550   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2551   ntm->reserved = htonl (0);
2552   ntm->peer = sender->peer;
2553   memcpy (&ntm[1], m, msize);
2554   send_to_client (client, &ntm->header, GNUNET_YES);
2555 }
2556
2557
2558 /**
2559  * Deliver P2P message to interested clients.
2560  *
2561  * @param sender who sent us the message?
2562  * @param m the message
2563  * @param msize size of the message (including header)
2564  */
2565 static void
2566 deliver_message (struct Neighbour *sender,
2567                  const struct GNUNET_MessageHeader *m, size_t msize)
2568 {
2569   struct Client *cpos;
2570   uint16_t type;
2571   unsigned int tpos;
2572   int deliver_full;
2573
2574   type = ntohs (m->type);
2575   cpos = clients;
2576   while (cpos != NULL)
2577     {
2578       deliver_full = GNUNET_NO;
2579       if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
2580         deliver_full = GNUNET_YES;
2581       else
2582         {
2583           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2584             {
2585               if (type != cpos->types[tpos])
2586                 continue;
2587               deliver_full = GNUNET_YES;
2588               break;
2589             }
2590         }
2591       if (GNUNET_YES == deliver_full)
2592         send_p2p_message_to_client (sender, cpos, m, msize);
2593       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2594         send_p2p_message_to_client (sender, cpos, m,
2595                                     sizeof (struct GNUNET_MessageHeader));
2596       cpos = cpos->next;
2597     }
2598 }
2599
2600
2601 /**
2602  * Align P2P message and then deliver to interested clients.
2603  *
2604  * @param sender who sent us the message?
2605  * @param buffer unaligned (!) buffer containing message
2606  * @param msize size of the message (including header)
2607  */
2608 static void
2609 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2610 {
2611   char abuf[msize];
2612
2613   /* TODO: call to statistics? */
2614   memcpy (abuf, buffer, msize);
2615   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2616 }
2617
2618
2619 /**
2620  * Deliver P2P messages to interested clients.
2621  *
2622  * @param sender who sent us the message?
2623  * @param buffer buffer containing messages, can be modified
2624  * @param buffer_size size of the buffer (overall)
2625  * @param offset offset where messages in the buffer start
2626  */
2627 static void
2628 deliver_messages (struct Neighbour *sender,
2629                   const char *buffer, size_t buffer_size, size_t offset)
2630 {
2631   struct GNUNET_MessageHeader *mhp;
2632   struct GNUNET_MessageHeader mh;
2633   uint16_t msize;
2634   int need_align;
2635
2636   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2637     {
2638       if (0 != offset % sizeof (uint16_t))
2639         {
2640           /* outch, need to copy to access header */
2641           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2642           mhp = &mh;
2643         }
2644       else
2645         {
2646           /* can access header directly */
2647           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2648         }
2649       msize = ntohs (mhp->size);
2650       if (msize + offset > buffer_size)
2651         {
2652           /* malformed message, header says it is larger than what
2653              would fit into the overall buffer */
2654           GNUNET_break_op (0);
2655           break;
2656         }
2657 #if HAVE_UNALIGNED_64_ACCESS
2658       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2659 #else
2660       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2661 #endif
2662       if (GNUNET_YES == need_align)
2663         align_and_deliver (sender, &buffer[offset], msize);
2664       else
2665         deliver_message (sender,
2666                          (const struct GNUNET_MessageHeader *)
2667                          &buffer[offset], msize);
2668       offset += msize;
2669     }
2670 }
2671
2672
2673 /**
2674  * We received an encrypted message.  Decrypt, validate and
2675  * pass on to the appropriate clients.
2676  */
2677 static void
2678 handle_encrypted_message (struct Neighbour *n,
2679                           const struct EncryptedMessage *m)
2680 {
2681   size_t size = ntohs (m->header.size);
2682   char buf[size];
2683   struct EncryptedMessage *pt;  /* plaintext */
2684   GNUNET_HashCode ph;
2685   size_t off;
2686   uint32_t snum;
2687   struct GNUNET_TIME_Absolute t;
2688
2689 #if DEBUG_CORE
2690   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2691               "Core service receives `%s' request from `%4s'.\n",
2692               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2693 #endif  
2694   /* decrypt */
2695   if (GNUNET_OK !=
2696       do_decrypt (n,
2697                   &m->plaintext_hash,
2698                   &m->sequence_number,
2699                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2700     return;
2701   pt = (struct EncryptedMessage *) buf;
2702
2703   /* validate hash */
2704   GNUNET_CRYPTO_hash (&pt->sequence_number,
2705                       size - ENCRYPTED_HEADER_SIZE, &ph);
2706   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2707     {
2708       /* checksum failed */
2709       GNUNET_break_op (0);
2710       return;
2711     }
2712
2713   /* validate sequence number */
2714   snum = ntohl (pt->sequence_number);
2715   if (n->last_sequence_number_received == snum)
2716     {
2717       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2718                   "Received duplicate message, ignoring.\n");
2719       /* duplicate, ignore */
2720       return;
2721     }
2722   if ((n->last_sequence_number_received > snum) &&
2723       (n->last_sequence_number_received - snum > 32))
2724     {
2725       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2726                   "Received ancient out of sequence message, ignoring.\n");
2727       /* ancient out of sequence, ignore */
2728       return;
2729     }
2730   if (n->last_sequence_number_received > snum)
2731     {
2732       unsigned int rotbit =
2733         1 << (n->last_sequence_number_received - snum - 1);
2734       if ((n->last_packets_bitmap & rotbit) != 0)
2735         {
2736           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2737                       "Received duplicate message, ignoring.\n");
2738           /* duplicate, ignore */
2739           return;
2740         }
2741       n->last_packets_bitmap |= rotbit;
2742     }
2743   if (n->last_sequence_number_received < snum)
2744     {
2745       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2746       n->last_sequence_number_received = snum;
2747     }
2748
2749   /* check timestamp */
2750   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2751   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2752     {
2753       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2754                   _
2755                   ("Message received far too old (%llu ms). Content ignored.\n"),
2756                   GNUNET_TIME_absolute_get_duration (t).value);
2757       return;
2758     }
2759
2760   /* process decrypted message(s) */
2761   update_window (GNUNET_YES,
2762                  &n->available_send_window,
2763                  &n->last_asw_update,
2764                  n->bpm_out);
2765   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2766   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2767                            n->bpm_out_internal_limit);
2768   n->last_activity = GNUNET_TIME_absolute_get ();
2769   off = sizeof (struct EncryptedMessage);
2770   deliver_messages (n, buf, size, off);
2771 }
2772
2773
2774 /**
2775  * Function called by the transport for each received message.
2776  *
2777  * @param cls closure
2778  * @param latency estimated latency for communicating with the
2779  *             given peer
2780  * @param peer (claimed) identity of the other peer
2781  * @param message the message
2782  */
2783 static void
2784 handle_transport_receive (void *cls,
2785                           struct GNUNET_TIME_Relative latency,
2786                           const struct GNUNET_PeerIdentity *peer,
2787                           const struct GNUNET_MessageHeader *message)
2788 {
2789   struct Neighbour *n;
2790   struct GNUNET_TIME_Absolute now;
2791   int up;
2792   uint16_t type;
2793   uint16_t size;
2794
2795 #if DEBUG_CORE
2796   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2797               "Received message of type %u from `%4s', demultiplexing.\n",
2798               ntohs (message->type), GNUNET_i2s (peer));
2799 #endif
2800   n = find_neighbour (peer);
2801   if (n == NULL)
2802     {
2803       GNUNET_break (0);
2804       return;
2805     }
2806   n->last_latency = latency;
2807   up = (n->status == PEER_STATE_KEY_CONFIRMED);
2808   type = ntohs (message->type);
2809   size = ntohs (message->size);
2810   switch (type)
2811     {
2812     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2813       if (size != sizeof (struct SetKeyMessage))
2814         {
2815           GNUNET_break_op (0);
2816           return;
2817         }
2818       handle_set_key (n, (const struct SetKeyMessage *) message);
2819       break;
2820     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2821       if (size < sizeof (struct EncryptedMessage) +
2822           sizeof (struct GNUNET_MessageHeader))
2823         {
2824           GNUNET_break_op (0);
2825           return;
2826         }
2827       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2828           (n->status != PEER_STATE_KEY_CONFIRMED))
2829         {
2830           GNUNET_break_op (0);
2831           return;
2832         }
2833       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2834       break;
2835     case GNUNET_MESSAGE_TYPE_CORE_PING:
2836       if (size != sizeof (struct PingMessage))
2837         {
2838           GNUNET_break_op (0);
2839           return;
2840         }
2841       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2842           (n->status != PEER_STATE_KEY_CONFIRMED))
2843         {
2844 #if DEBUG_CORE
2845           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2846                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2847                       "PING", GNUNET_i2s (&n->peer));
2848 #endif
2849           GNUNET_free_non_null (n->pending_ping);
2850           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2851           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2852           return;
2853         }
2854       handle_ping (n, (const struct PingMessage *) message);
2855       break;
2856     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2857       if (size != sizeof (struct PingMessage))
2858         {
2859           GNUNET_break_op (0);
2860           return;
2861         }
2862       if ((n->status != PEER_STATE_KEY_SENT) &&
2863           (n->status != PEER_STATE_KEY_RECEIVED) &&
2864           (n->status != PEER_STATE_KEY_CONFIRMED))
2865         {
2866           /* could not decrypt pong, oops! */
2867           GNUNET_break_op (0);
2868           return;
2869         }
2870       handle_pong (n, (const struct PingMessage *) message);
2871       break;
2872     default:
2873       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2874                   _("Unsupported message of type %u received.\n"), type);
2875       return;
2876     }
2877   if (n->status == PEER_STATE_KEY_CONFIRMED)
2878     {
2879       now = GNUNET_TIME_absolute_get ();
2880       n->last_activity = now;
2881       if (!up)
2882         n->time_established = now;
2883     }
2884 }
2885
2886
2887 /**
2888  * Function that recalculates the bandwidth quota for the
2889  * given neighbour and transmits it to the transport service.
2890  * 
2891  * @param cls neighbour for the quota update
2892  * @param tc context
2893  */
2894 static void
2895 neighbour_quota_update (void *cls,
2896                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2897
2898
2899 /**
2900  * Schedule the task that will recalculate the bandwidth
2901  * quota for this peer (and possibly force a disconnect of
2902  * idle peers by calculating a bandwidth of zero).
2903  */
2904 static void
2905 schedule_quota_update (struct Neighbour *n)
2906 {
2907   GNUNET_assert (n->quota_update_task ==
2908                  GNUNET_SCHEDULER_NO_TASK);
2909   n->quota_update_task
2910     = GNUNET_SCHEDULER_add_delayed (sched,
2911                                     QUOTA_UPDATE_FREQUENCY,
2912                                     &neighbour_quota_update,
2913                                     n);
2914 }
2915
2916
2917 /**
2918  * Function that recalculates the bandwidth quota for the
2919  * given neighbour and transmits it to the transport service.
2920  * 
2921  * @param cls neighbour for the quota update
2922  * @param tc context
2923  */
2924 static void
2925 neighbour_quota_update (void *cls,
2926                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2927 {
2928   struct Neighbour *n = cls;
2929   uint32_t q_in;
2930   double pref_rel;
2931   double share;
2932   unsigned long long distributable;
2933   
2934   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
2935   /* calculate relative preference among all neighbours;
2936      divides by a bit more to avoid division by zero AND to
2937      account for possibility of new neighbours joining any time 
2938      AND to convert to double... */
2939   pref_rel = n->current_preference / (1.0 + preference_sum);
2940   distributable = 0;
2941   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2942     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2943   share = distributable * pref_rel;
2944   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2945   /* check if we want to disconnect for good due to inactivity */
2946   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2947        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2948     q_in = 0; /* force disconnect */
2949   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2950        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2951     {
2952       n->bpm_in = q_in;
2953       GNUNET_TRANSPORT_set_quota (transport,
2954                                   &n->peer,
2955                                   n->bpm_in, 
2956                                   n->bpm_out,
2957                                   GNUNET_TIME_UNIT_FOREVER_REL,
2958                                   NULL, NULL);
2959     }
2960   schedule_quota_update (n);
2961 }
2962
2963
2964 /**
2965  * Function called by transport to notify us that
2966  * a peer connected to us (on the network level).
2967  *
2968  * @param cls closure
2969  * @param peer the peer that connected
2970  * @param latency current latency of the connection
2971  */
2972 static void
2973 handle_transport_notify_connect (void *cls,
2974                                  const struct GNUNET_PeerIdentity *peer,
2975                                  struct GNUNET_TIME_Relative latency)
2976 {
2977   struct Neighbour *n;
2978   struct GNUNET_TIME_Absolute now;
2979   struct ConnectNotifyMessage cnm;
2980
2981   n = find_neighbour (peer);
2982   if (n != NULL)
2983     {
2984       /* duplicate connect notification!? */
2985       GNUNET_break (0);
2986       return;
2987     }
2988   now = GNUNET_TIME_absolute_get ();
2989   n = GNUNET_malloc (sizeof (struct Neighbour));
2990   n->next = neighbours;
2991   neighbours = n;
2992   neighbour_count++;
2993   n->peer = *peer;
2994   n->last_latency = latency;
2995   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2996   n->encrypt_key_created = now;
2997   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2998   n->last_asw_update = now;
2999   n->last_arw_update = now;
3000   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3001   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3002   n->bpm_out_internal_limit = (uint32_t) - 1;
3003   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
3004   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3005                                                 (uint32_t) - 1);
3006 #if DEBUG_CORE
3007   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3008               "Received connection from `%4s'.\n",
3009               GNUNET_i2s (&n->peer));
3010 #endif
3011   schedule_quota_update (n);
3012   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3013   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
3014   cnm.reserved = htonl (0);
3015   cnm.peer = *peer;
3016   send_to_all_clients (&cnm.header, GNUNET_YES);
3017 }
3018
3019
3020 /**
3021  * Free the given entry for the neighbour (it has
3022  * already been removed from the list at this point).
3023  *
3024  * @param n neighbour to free
3025  */
3026 static void
3027 free_neighbour (struct Neighbour *n)
3028 {
3029   struct MessageEntry *m;
3030
3031   while (NULL != (m = n->messages))
3032     {
3033       n->messages = m->next;
3034       GNUNET_free (m);
3035     }
3036   while (NULL != (m = n->encrypted_head))
3037     {
3038       n->encrypted_head = m->next;
3039       GNUNET_free (m);
3040     }
3041   if (NULL != n->th)
3042     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3043   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
3044     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3045   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
3046     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3047   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
3048     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3049   GNUNET_free_non_null (n->public_key);
3050   GNUNET_free_non_null (n->pending_ping);
3051   GNUNET_free (n);
3052 }
3053
3054
3055 /**
3056  * Function called by transport telling us that a peer
3057  * disconnected.
3058  *
3059  * @param cls closure
3060  * @param peer the peer that disconnected
3061  */
3062 static void
3063 handle_transport_notify_disconnect (void *cls,
3064                                     const struct GNUNET_PeerIdentity *peer)
3065 {
3066   struct ConnectNotifyMessage cnm;
3067   struct Neighbour *n;
3068   struct Neighbour *p;
3069
3070 #if DEBUG_CORE
3071   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3072               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3073 #endif
3074   p = NULL;
3075   n = neighbours;
3076   while ((n != NULL) &&
3077          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3078     {
3079       p = n;
3080       n = n->next;
3081     }
3082   if (n == NULL)
3083     {
3084       GNUNET_break (0);
3085       return;
3086     }
3087   if (p == NULL)
3088     neighbours = n->next;
3089   else
3090     p->next = n->next;
3091   GNUNET_assert (neighbour_count > 0);
3092   neighbour_count--;
3093   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3094   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3095   cnm.reserved = htonl (0);
3096   cnm.peer = *peer;
3097   send_to_all_clients (&cnm.header, GNUNET_YES);
3098   free_neighbour (n);
3099 }
3100
3101
3102 /**
3103  * Last task run during shutdown.  Disconnects us from
3104  * the transport.
3105  */
3106 static void
3107 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3108 {
3109   struct Neighbour *n;
3110   struct Client *c;
3111
3112 #if DEBUG_CORE
3113   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3114               "Core service shutting down.\n");
3115 #endif
3116   GNUNET_assert (transport != NULL);
3117   GNUNET_TRANSPORT_disconnect (transport);
3118   transport = NULL;
3119   while (NULL != (n = neighbours))
3120     {
3121       neighbours = n->next;
3122       GNUNET_assert (neighbour_count > 0);
3123       neighbour_count--;
3124       free_neighbour (n);
3125     }
3126   while (NULL != (c = clients))
3127     handle_client_disconnect (NULL, c->client_handle);
3128   if (my_private_key != NULL)
3129     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3130 }
3131
3132
3133 /**
3134  * Initiate core service.
3135  *
3136  * @param cls closure
3137  * @param s scheduler to use
3138  * @param serv the initialized server
3139  * @param c configuration to use
3140  */
3141 static void
3142 run (void *cls,
3143      struct GNUNET_SCHEDULER_Handle *s,
3144      struct GNUNET_SERVER_Handle *serv,
3145      const struct GNUNET_CONFIGURATION_Handle *c)
3146 {
3147 #if 0
3148   unsigned long long qin;
3149   unsigned long long qout;
3150   unsigned long long tneigh;
3151 #endif
3152   char *keyfile;
3153
3154   sched = s;
3155   cfg = c;
3156   /* parse configuration */
3157   if (
3158        (GNUNET_OK !=
3159         GNUNET_CONFIGURATION_get_value_number (c,
3160                                                "CORE",
3161                                                "TOTAL_QUOTA_IN",
3162                                                &bandwidth_target_in)) ||
3163        (GNUNET_OK !=
3164         GNUNET_CONFIGURATION_get_value_number (c,
3165                                                "CORE",
3166                                                "TOTAL_QUOTA_OUT",
3167                                                &bandwidth_target_out)) ||
3168 #if 0
3169        (GNUNET_OK !=
3170         GNUNET_CONFIGURATION_get_value_number (c,
3171                                                "CORE",
3172                                                "YY",
3173                                                &qout)) ||
3174        (GNUNET_OK !=
3175         GNUNET_CONFIGURATION_get_value_number (c,
3176                                                "CORE",
3177                                                "ZZ_LIMIT", &tneigh)) ||
3178 #endif
3179        (GNUNET_OK !=
3180         GNUNET_CONFIGURATION_get_value_filename (c,
3181                                                  "GNUNETD",
3182                                                  "HOSTKEY", &keyfile)))
3183     {
3184       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3185                   _
3186                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3187       GNUNET_SCHEDULER_shutdown (s);
3188       return;
3189     }
3190   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3191   GNUNET_free (keyfile);
3192   if (my_private_key == NULL)
3193     {
3194       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3195                   _("Core service could not access hostkey.  Exiting.\n"));
3196       GNUNET_SCHEDULER_shutdown (s);
3197       return;
3198     }
3199   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3200   GNUNET_CRYPTO_hash (&my_public_key,
3201                       sizeof (my_public_key), &my_identity.hashPubKey);
3202   /* setup notification */
3203   server = serv;
3204   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3205   /* setup transport connection */
3206   transport = GNUNET_TRANSPORT_connect (sched,
3207                                         cfg,
3208                                         NULL,
3209                                         &handle_transport_receive,
3210                                         &handle_transport_notify_connect,
3211                                         &handle_transport_notify_disconnect);
3212   GNUNET_assert (NULL != transport);
3213   GNUNET_SCHEDULER_add_delayed (sched,
3214                                 GNUNET_TIME_UNIT_FOREVER_REL,
3215                                 &cleaning_task, NULL);
3216   /* process client requests */
3217   GNUNET_SERVER_add_handlers (server, handlers);
3218   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3219               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3220 }
3221
3222
3223
3224 /**
3225  * The main function for the transport service.
3226  *
3227  * @param argc number of arguments from the command line
3228  * @param argv command line arguments
3229  * @return 0 ok, 1 on error
3230  */
3231 int
3232 main (int argc, char *const *argv)
3233 {
3234   return (GNUNET_OK ==
3235           GNUNET_SERVICE_run (argc,
3236                               argv,
3237                               "core",
3238                               GNUNET_SERVICE_OPTION_NONE,
3239                               &run, NULL)) ? 0 : 1;
3240 }
3241
3242 /* end of gnunet-service-core.c */