move-hacknig
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 /**
45  * Receive and send buffer windows grow over time.  For
46  * how long can 'unused' bandwidth accumulate before we
47  * need to cap it?  (specified in ms).
48  */
49 #define MAX_WINDOW_TIME (5 * 60 * 1000)
50
51 /**
52  * Minimum of bytes per minute (out) to assign to any connected peer.
53  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
54  * sense.
55  */
56 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
57
58 /**
59  * What is the smallest change (in number of bytes per minute)
60  * that we consider significant enough to bother triggering?
61  */
62 #define MIN_BPM_CHANGE 32
63
64 /**
65  * After how much time past the "official" expiration time do
66  * we discard messages?  Should not be zero since we may 
67  * intentionally defer transmission until close to the deadline
68  * and then may be slightly past the deadline due to inaccuracy
69  * in sleep and our own CPU consumption.
70  */
71 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
72
73 /**
74  * What is the maximum delay for a SET_KEY message?
75  */
76 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What how long do we wait for SET_KEY confirmation initially?
80  */
81 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
82
83 /**
84  * What is the maximum delay for a PING message?
85  */
86 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
87
88 /**
89  * What is the maximum delay for a PONG message?
90  */
91 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * How often do we recalculate bandwidth quotas?
95  */
96 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * What is the priority for a SET_KEY message?
100  */
101 #define SET_KEY_PRIORITY 0xFFFFFF
102
103 /**
104  * What is the priority for a PING message?
105  */
106 #define PING_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PONG message?
110  */
111 #define PONG_PRIORITY 0xFFFFFF
112
113 /**
114  * How many messages do we queue per peer at most?
115  */
116 #define MAX_PEER_QUEUE_SIZE 16
117
118 /**
119  * How many non-mandatory messages do we queue per client at most?
120  */
121 #define MAX_CLIENT_QUEUE_SIZE 32
122
123 /**
124  * What is the maximum age of a message for us to consider
125  * processing it?  Note that this looks at the timestamp used
126  * by the other peer, so clock skew between machines does
127  * come into play here.  So this should be picked high enough
128  * so that a little bit of clock skew does not prevent peers
129  * from connecting to us.
130  */
131 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
132
133 /**
134  * What is the maximum size for encrypted messages?  Note that this
135  * number imposes a clear limit on the maximum size of any message.
136  * Set to a value close to 64k but not so close that transports will
137  * have trouble with their headers.
138  */
139 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
140
141
142 /**
143  * State machine for our P2P encryption handshake.  Everyone starts in
144  * "DOWN", if we receive the other peer's key (other peer initiated)
145  * we start in state RECEIVED (since we will immediately send our
146  * own); otherwise we start in SENT.  If we get back a PONG from
147  * within either state, we move up to CONFIRMED (the PONG will always
148  * be sent back encrypted with the key we sent to the other peer).
149  */
150 enum PeerStateMachine
151 {
152   PEER_STATE_DOWN,
153   PEER_STATE_KEY_SENT,
154   PEER_STATE_KEY_RECEIVED,
155   PEER_STATE_KEY_CONFIRMED
156 };
157
158
159 /**
160  * Number of bytes (at the beginning) of "struct EncryptedMessage"
161  * that are NOT encrypted.
162  */
163 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
164
165
166 /**
167  * Encapsulation for encrypted messages exchanged between
168  * peers.  Followed by the actual encrypted data.
169  */
170 struct EncryptedMessage
171 {
172   /**
173    * Message type is either CORE_ENCRYPTED_MESSAGE.
174    */
175   struct GNUNET_MessageHeader header;
176
177   /**
178    * Always zero.
179    */
180   uint32_t reserved GNUNET_PACKED;
181
182   /**
183    * Hash of the plaintext, used to verify message integrity;
184    * ALSO used as the IV for the symmetric cipher!  Everything
185    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
186    * must be set to the offset of the next field.
187    */
188   GNUNET_HashCode plaintext_hash;
189
190   /**
191    * Sequence number, in network byte order.  This field
192    * must be the first encrypted/decrypted field and the
193    * first byte that is hashed for the plaintext hash.
194    */
195   uint32_t sequence_number GNUNET_PACKED;
196
197   /**
198    * Desired bandwidth (how much we should send to this
199    * peer / how much is the sender willing to receive),
200    * in bytes per minute.
201    */
202   uint32_t inbound_bpm_limit GNUNET_PACKED;
203
204   /**
205    * Timestamp.  Used to prevent reply of ancient messages
206    * (recent messages are caught with the sequence number).
207    */
208   struct GNUNET_TIME_AbsoluteNBO timestamp;
209
210 };
211
212 /**
213  * We're sending an (encrypted) PING to the other peer to check if he
214  * can decrypt.  The other peer should respond with a PONG with the
215  * same content, except this time encrypted with the receiver's key.
216  */
217 struct PingMessage
218 {
219   /**
220    * Message type is either CORE_PING or CORE_PONG.
221    */
222   struct GNUNET_MessageHeader header;
223
224   /**
225    * Random number chosen to make reply harder.
226    */
227   uint32_t challenge GNUNET_PACKED;
228
229   /**
230    * Intended target of the PING, used primarily to check
231    * that decryption actually worked.
232    */
233   struct GNUNET_PeerIdentity target;
234 };
235
236
237 /**
238  * Message transmitted to set (or update) a session key.
239  */
240 struct SetKeyMessage
241 {
242
243   /**
244    * Message type is either CORE_SET_KEY.
245    */
246   struct GNUNET_MessageHeader header;
247
248   /**
249    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
250    */
251   int32_t sender_status GNUNET_PACKED;
252
253   /**
254    * Purpose of the signature, will be
255    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
256    */
257   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
258
259   /**
260    * At what time was this key created?
261    */
262   struct GNUNET_TIME_AbsoluteNBO creation_time;
263
264   /**
265    * The encrypted session key.
266    */
267   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
268
269   /**
270    * Who is the intended recipient?
271    */
272   struct GNUNET_PeerIdentity target;
273
274   /**
275    * Signature of the stuff above (starting at purpose).
276    */
277   struct GNUNET_CRYPTO_RsaSignature signature;
278
279 };
280
281
282 /**
283  * Message waiting for transmission. This struct
284  * is followed by the actual content of the message.
285  */
286 struct MessageEntry
287 {
288
289   /**
290    * We keep messages in a linked list (for now).
291    */
292   struct MessageEntry *next;
293
294   /**
295    * By when are we supposed to transmit this message?
296    */
297   struct GNUNET_TIME_Absolute deadline;
298
299   /**
300    * How important is this message to us?
301    */
302   unsigned int priority;
303
304   /**
305    * How long is the message? (number of bytes following
306    * the "struct MessageEntry", but not including the
307    * size of "struct MessageEntry" itself!)
308    */
309   uint16_t size;
310
311   /**
312    * Was this message selected for transmission in the
313    * current round? GNUNET_YES or GNUNET_NO.
314    */
315   int16_t do_transmit;
316
317 };
318
319
320 struct Neighbour
321 {
322   /**
323    * We keep neighbours in a linked list (for now).
324    */
325   struct Neighbour *next;
326
327   /**
328    * Unencrypted messages destined for this peer.
329    */
330   struct MessageEntry *messages;
331
332   /**
333    * Head of the batched, encrypted message queue (already ordered,
334    * transmit starting with the head).
335    */
336   struct MessageEntry *encrypted_head;
337
338   /**
339    * Tail of the batched, encrypted message queue (already ordered,
340    * append new messages to tail)
341    */
342   struct MessageEntry *encrypted_tail;
343
344   /**
345    * Handle for pending requests for transmission to this peer
346    * with the transport service.  NULL if no request is pending.
347    */
348   struct GNUNET_TRANSPORT_TransmitHandle *th;
349
350   /**
351    * Public key of the neighbour, NULL if we don't have it yet.
352    */
353   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
354
355   /**
356    * We received a PING message before we got the "public_key"
357    * (or the SET_KEY).  We keep it here until we have a key
358    * to decrypt it.  NULL if no PING is pending.
359    */
360   struct PingMessage *pending_ping;
361
362   /**
363    * Identity of the neighbour.
364    */
365   struct GNUNET_PeerIdentity peer;
366
367   /**
368    * Key we use to encrypt our messages for the other peer
369    * (initialized by us when we do the handshake).
370    */
371   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
372
373   /**
374    * Key we use to decrypt messages from the other peer
375    * (given to us by the other peer during the handshake).
376    */
377   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
378
379   /**
380    * ID of task used for re-trying plaintext scheduling.
381    */
382   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
383
384   /**
385    * ID of task used for re-trying SET_KEY and PING message.
386    */
387   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
388
389   /**
390    * ID of task used for updating bandwidth quota for this neighbour.
391    */
392   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
393
394   /**
395    * At what time did we generate our encryption key?
396    */
397   struct GNUNET_TIME_Absolute encrypt_key_created;
398
399   /**
400    * At what time did the other peer generate the decryption key?
401    */
402   struct GNUNET_TIME_Absolute decrypt_key_created;
403
404   /**
405    * At what time did we initially establish (as in, complete session
406    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
407    */
408   struct GNUNET_TIME_Absolute time_established;
409
410   /**
411    * At what time did we last receive an encrypted message from the
412    * other peer?  Should be zero if status != KEY_CONFIRMED.
413    */
414   struct GNUNET_TIME_Absolute last_activity;
415
416   /**
417    * Last latency observed from this peer.
418    */
419   struct GNUNET_TIME_Relative last_latency;
420
421   /**
422    * At what frequency are we currently re-trying SET_KEY messages?
423    */
424   struct GNUNET_TIME_Relative set_key_retry_frequency;
425
426   /**
427    * Time of our last update to the "available_send_window".
428    */
429   struct GNUNET_TIME_Absolute last_asw_update;
430
431   /**
432    * Time of our last update to the "available_recv_window".
433    */
434   struct GNUNET_TIME_Absolute last_arw_update;
435
436   /**
437    * Number of bytes that we are eligible to transmit to this
438    * peer at this point.  Incremented every minute by max_out_bpm,
439    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
440    * bandwidth-hogs are sampled at a frequency of about 78s!);
441    * may get negative if we have VERY high priority content.
442    */
443   long long available_send_window; 
444
445   /**
446    * How much downstream capacity of this peer has been reserved for
447    * our traffic?  (Our clients can request that a certain amount of
448    * bandwidth is available for replies to them; this value is used to
449    * make sure that this reserved amount of bandwidth is actually
450    * available).
451    */
452   long long available_recv_window; 
453
454   /**
455    * How valueable were the messages of this peer recently?
456    */
457   unsigned long long current_preference;
458
459   /**
460    * Bit map indicating which of the 32 sequence numbers before the last
461    * were received (good for accepting out-of-order packets and
462    * estimating reliability of the connection)
463    */
464   unsigned int last_packets_bitmap;
465
466   /**
467    * Number of messages in the message queue for this peer.
468    */
469   unsigned int message_queue_size;
470
471   /**
472    * last sequence number received on this connection (highest)
473    */
474   uint32_t last_sequence_number_received;
475
476   /**
477    * last sequence number transmitted
478    */
479   uint32_t last_sequence_number_sent;
480
481   /**
482    * Available bandwidth in for this peer (current target).
483    */
484   uint32_t bpm_in;
485
486   /**
487    * Available bandwidth out for this peer (current target).
488    */
489   uint32_t bpm_out;
490
491   /**
492    * Internal bandwidth limit set for this peer (initially
493    * typically set to "-1").  "bpm_out" is MAX of
494    * "bpm_out_internal_limit" and "bpm_out_external_limit".
495    */
496   uint32_t bpm_out_internal_limit;
497
498   /**
499    * External bandwidth limit set for this peer by the
500    * peer that we are communicating with.  "bpm_out" is MAX of
501    * "bpm_out_internal_limit" and "bpm_out_external_limit".
502    */
503   uint32_t bpm_out_external_limit;
504
505   /**
506    * What was our PING challenge number (for this peer)?
507    */
508   uint32_t ping_challenge;
509
510   /**
511    * What is our connection status?
512    */
513   enum PeerStateMachine status;
514
515 };
516
517
518 /**
519  * Events are messages for clients.  The struct
520  * itself is followed by the actual message.
521  */
522 struct Event
523 {
524   /**
525    * This is a linked list.
526    */
527   struct Event *next;
528
529   /**
530    * Size of the message.
531    */
532   size_t size;
533
534   /**
535    * Could this event be dropped if this queue
536    * is getting too large? (NOT YET USED!)
537    */
538   int can_drop;
539
540 };
541
542
543 /**
544  * Data structure for each client connected to the core service.
545  */
546 struct Client
547 {
548   /**
549    * Clients are kept in a linked list.
550    */
551   struct Client *next;
552
553   /**
554    * Handle for the client with the server API.
555    */
556   struct GNUNET_SERVER_Client *client_handle;
557
558   /**
559    * Linked list of messages we still need to deliver to
560    * this client.
561    */
562   struct Event *event_head;
563
564   /**
565    * Tail of the linked list of events.
566    */
567   struct Event *event_tail;
568
569   /**
570    * Current transmit handle, NULL if no transmission request
571    * is pending.
572    */
573   struct GNUNET_NETWORK_TransmitHandle *th;
574
575   /**
576    * Array of the types of messages this peer cares
577    * about (with "tcnt" entries).  Allocated as part
578    * of this client struct, do not free!
579    */
580   uint16_t *types;
581
582   /**
583    * Options for messages this client cares about,
584    * see GNUNET_CORE_OPTION_ values.
585    */
586   uint32_t options;
587
588   /**
589    * Number of types of incoming messages this client
590    * specifically cares about.  Size of the "types" array.
591    */
592   unsigned int tcnt;
593
594 };
595
596
597 /**
598  * Our public key.
599  */
600 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
601
602 /**
603  * Our identity.
604  */
605 static struct GNUNET_PeerIdentity my_identity;
606
607 /**
608  * Our private key.
609  */
610 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
611
612 /**
613  * Our scheduler.
614  */
615 struct GNUNET_SCHEDULER_Handle *sched;
616
617 /**
618  * Our configuration.
619  */
620 const struct GNUNET_CONFIGURATION_Handle *cfg;
621
622 /**
623  * Our server.
624  */
625 static struct GNUNET_SERVER_Handle *server;
626
627 /**
628  * Transport service.
629  */
630 static struct GNUNET_TRANSPORT_Handle *transport;
631
632 /**
633  * Linked list of our clients.
634  */
635 static struct Client *clients;
636
637 /**
638  * We keep neighbours in a linked list (for now).
639  */
640 static struct Neighbour *neighbours;
641
642 /**
643  * Sum of all preferences among all neighbours.
644  */
645 static unsigned long long preference_sum;
646
647 /**
648  * Total number of neighbours we have.
649  */
650 static unsigned int neighbour_count;
651
652 /**
653  * How much inbound bandwidth are we supposed to be using?
654  */
655 static unsigned long long bandwidth_target_in;
656
657 /**
658  * How much outbound bandwidth are we supposed to be using?
659  */
660 static unsigned long long bandwidth_target_out;
661
662
663
664 /**
665  * A preference value for a neighbour was update.  Update
666  * the preference sum accordingly.
667  *
668  * @param inc how much was a preference value increased?
669  */
670 static void
671 update_preference_sum (unsigned long long inc)
672 {
673   struct Neighbour *n;
674   unsigned long long os;
675
676   os = preference_sum;
677   preference_sum += inc;
678   if (preference_sum >= os)
679     return; /* done! */
680   /* overflow! compensate by cutting all values in half! */
681   preference_sum = 0;
682   n = neighbours;
683   while (n != NULL)
684     {
685       n->current_preference /= 2;
686       preference_sum += n->current_preference;
687       n = n->next;
688     }    
689 }
690
691
692 /**
693  * Recalculate the number of bytes we expect to
694  * receive or transmit in a given window.
695  *
696  * @param force force an update now (even if not much time has passed)
697  * @param window pointer to the byte counter (updated)
698  * @param ts pointer to the timestamp (updated)
699  * @param bpm number of bytes per minute that should
700  *        be added to the window.
701  */
702 static void
703 update_window (int force,
704                long long *window,
705                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
706 {
707   struct GNUNET_TIME_Relative since;
708
709   since = GNUNET_TIME_absolute_get_duration (*ts);
710   if ( (force == GNUNET_NO) &&
711        (since.value < 60 * 1000) )
712     return;                     /* not even a minute has passed */
713   *ts = GNUNET_TIME_absolute_get ();
714   *window += (bpm * since.value) / 60 / 1000;
715   if (*window > MAX_WINDOW_TIME * bpm)
716     *window = MAX_WINDOW_TIME * bpm;
717 }
718
719
720 /**
721  * Find the entry for the given neighbour.
722  *
723  * @param peer identity of the neighbour
724  * @return NULL if we are not connected, otherwise the
725  *         neighbour's entry.
726  */
727 static struct Neighbour *
728 find_neighbour (const struct GNUNET_PeerIdentity *peer)
729 {
730   struct Neighbour *ret;
731
732   ret = neighbours;
733   while ((ret != NULL) &&
734          (0 != memcmp (&ret->peer,
735                        peer, sizeof (struct GNUNET_PeerIdentity))))
736     ret = ret->next;
737   return ret;
738 }
739
740
741 /**
742  * Find the entry for the given client.
743  *
744  * @param client handle for the client
745  * @return NULL if we are not connected, otherwise the
746  *         client's struct.
747  */
748 static struct Client *
749 find_client (const struct GNUNET_SERVER_Client *client)
750 {
751   struct Client *ret;
752
753   ret = clients;
754   while ((ret != NULL) && (client != ret->client_handle))
755     ret = ret->next;
756   return ret;
757 }
758
759
760 /**
761  * If necessary, initiate a request with the server to
762  * transmit messages from the queue of the given client.
763  * @param client who to transfer messages to
764  */
765 static void request_transmit (struct Client *client);
766
767
768 /**
769  * Client is ready to receive data, provide it.
770  *
771  * @param cls closure
772  * @param size number of bytes available in buf
773  * @param buf where the callee should write the message
774  * @return number of bytes written to buf
775  */
776 static size_t
777 do_client_transmit (void *cls, size_t size, void *buf)
778 {
779   struct Client *client = cls;
780   struct Event *e;
781   char *tgt;
782   size_t ret;
783
784   client->th = NULL;
785 #if DEBUG_CORE_CLIENT
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "Client ready to receive %u bytes.\n", size);
788 #endif
789   if (buf == NULL)
790     {
791 #if DEBUG_CORE
792       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
793                   "Failed to transmit data to client (disconnect)?\n");
794 #endif
795       return 0;                 /* we'll surely get a disconnect soon... */
796     }
797   tgt = buf;
798   ret = 0;
799   while ((NULL != (e = client->event_head)) && (e->size <= size))
800     {
801       memcpy (&tgt[ret], &e[1], e->size);
802       size -= e->size;
803       ret += e->size;
804       client->event_head = e->next;
805       GNUNET_free (e);
806     }
807   GNUNET_assert (ret > 0);
808   if (client->event_head == NULL)
809     client->event_tail = NULL;
810 #if DEBUG_CORE_CLIENT
811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812               "Transmitting %u bytes to client\n", ret);
813 #endif
814   request_transmit (client);
815   return ret;
816 }
817
818
819 /**
820  * If necessary, initiate a request with the server to
821  * transmit messages from the queue of the given client.
822  * @param client who to transfer messages to
823  */
824 static void
825 request_transmit (struct Client *client)
826 {
827
828   if (NULL != client->th)
829     return;                     /* already pending */
830   if (NULL == client->event_head)
831     return;                     /* no more events pending */
832 #if DEBUG_CORE_CLIENT
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834               "Asking server to transmit %u bytes to client\n",
835               client->event_head->size);
836 #endif
837   client->th
838     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
839                                            client->event_head->size,
840                                            GNUNET_TIME_UNIT_FOREVER_REL,
841                                            &do_client_transmit, client);
842 }
843
844
845 /**
846  * Send a message to one of our clients.
847  * @param client target for the message
848  * @param msg message to transmit
849  * @param can_drop could this message be dropped if the
850  *        client's queue is getting too large?
851  */
852 static void
853 send_to_client (struct Client *client,
854                 const struct GNUNET_MessageHeader *msg, int can_drop)
855 {
856   struct Event *e;
857   unsigned int queue_size;
858   uint16_t msize;
859
860 #if DEBUG_CORE_CLIENT
861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862               "Preparing to send message of type %u to client.\n",
863               ntohs (msg->type));
864 #endif
865   queue_size = 0;
866   e = client->event_head;
867   while (e != NULL)
868     {
869       queue_size++;
870       e = e->next;
871     }
872   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
873        (can_drop == GNUNET_YES) )
874     {
875 #if DEBUG_CORE
876       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
877                   "Too many messages in queue for the client, dropping the new message.\n");
878 #endif
879       return;
880     }
881
882   msize = ntohs (msg->size);
883   e = GNUNET_malloc (sizeof (struct Event) + msize);
884   /* append */
885   if (client->event_tail != NULL)
886     client->event_tail->next = e;
887   else
888     client->event_head = e;
889   client->event_tail = e;
890   e->can_drop = can_drop;
891   e->size = msize;
892   memcpy (&e[1], msg, msize);
893   request_transmit (client);
894 }
895
896
897 /**
898  * Send a message to all of our current clients.
899  */
900 static void
901 send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
902 {
903   struct Client *c;
904
905   c = clients;
906   while (c != NULL)
907     {
908       send_to_client (c, msg, can_drop);
909       c = c->next;
910     }
911 }
912
913
914 /**
915  * Handle CORE_INIT request.
916  */
917 static void
918 handle_client_init (void *cls,
919                     struct GNUNET_SERVER_Client *client,
920                     const struct GNUNET_MessageHeader *message)
921 {
922   const struct InitMessage *im;
923   struct InitReplyMessage irm;
924   struct Client *c;
925   uint16_t msize;
926   const uint16_t *types;
927   struct Neighbour *n;
928   struct ConnectNotifyMessage cnm;
929
930 #if DEBUG_CORE_CLIENT
931   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
932               "Client connecting to core service with `%s' message\n",
933               "INIT");
934 #endif
935   /* check that we don't have an entry already */
936   c = clients;
937   while (c != NULL)
938     {
939       if (client == c->client_handle)
940         {
941           GNUNET_break (0);
942           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
943           return;
944         }
945       c = c->next;
946     }
947   msize = ntohs (message->size);
948   if (msize < sizeof (struct InitMessage))
949     {
950       GNUNET_break (0);
951       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
952       return;
953     }
954   im = (const struct InitMessage *) message;
955   types = (const uint16_t *) &im[1];
956   msize -= sizeof (struct InitMessage);
957   c = GNUNET_malloc (sizeof (struct Client) + msize);
958   c->client_handle = client;
959   c->next = clients;
960   clients = c;
961   memcpy (&c[1], types, msize);
962   c->types = (uint16_t *) & c[1];
963   c->options = ntohl (im->options);
964   c->tcnt = msize / sizeof (uint16_t);
965   /* send init reply message */
966   irm.header.size = htons (sizeof (struct InitReplyMessage));
967   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
968   irm.reserved = htonl (0);
969   memcpy (&irm.publicKey,
970           &my_public_key,
971           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
972 #if DEBUG_CORE_CLIENT
973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974               "Sending `%s' message to client.\n", "INIT_REPLY");
975 #endif
976   send_to_client (c, &irm.header, GNUNET_NO);
977   /* notify new client about existing neighbours */
978   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
979   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
980   n = neighbours;
981   while (n != NULL)
982     {
983 #if DEBUG_CORE_CLIENT
984       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
986 #endif
987       cnm.reserved = htonl (0);
988       cnm.peer = n->peer;
989       send_to_client (c, &cnm.header, GNUNET_NO);
990       n = n->next;
991     }
992   GNUNET_SERVER_receive_done (client, GNUNET_OK);
993 }
994
995
996 /**
997  * A client disconnected, clean up.
998  *
999  * @param cls closure
1000  * @param client identification of the client
1001  */
1002 static void
1003 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1004 {
1005   struct Client *pos;
1006   struct Client *prev;
1007   struct Event *e;
1008
1009 #if DEBUG_CORE_CLIENT
1010   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1011               "Client has disconnected from core service.\n");
1012 #endif
1013   prev = NULL;
1014   pos = clients;
1015   while (pos != NULL)
1016     {
1017       if (client == pos->client_handle)
1018         {
1019           if (prev == NULL)
1020             clients = pos->next;
1021           else
1022             prev->next = pos->next;
1023           if (pos->th != NULL)
1024             GNUNET_NETWORK_connection_notify_transmit_ready_cancel (pos->th);
1025           while (NULL != (e = pos->event_head))
1026             {
1027               pos->event_head = e->next;
1028               GNUNET_free (e);
1029             }
1030           GNUNET_free (pos);
1031           return;
1032         }
1033       prev = pos;
1034       pos = pos->next;
1035     }
1036   /* client never sent INIT */
1037 }
1038
1039
1040 /**
1041  * Handle REQUEST_CONFIGURE request.
1042  */
1043 static void
1044 handle_client_request_configure (void *cls,
1045                                  struct GNUNET_SERVER_Client *client,
1046                                  const struct GNUNET_MessageHeader *message)
1047 {
1048   const struct RequestConfigureMessage *rcm;
1049   struct Neighbour *n;
1050   struct ConfigurationInfoMessage cim;
1051   struct Client *c;
1052   int reserv;
1053   unsigned long long old_preference;
1054
1055 #if DEBUG_CORE_CLIENT
1056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057               "Core service receives `%s' request.\n", "CONFIGURE");
1058 #endif
1059   rcm = (const struct RequestConfigureMessage *) message;
1060   n = find_neighbour (&rcm->peer);
1061   memset (&cim, 0, sizeof (cim));
1062   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1063     {
1064       update_window (GNUNET_YES,
1065                      &n->available_send_window,
1066                      &n->last_asw_update,
1067                      n->bpm_out);
1068       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1069       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1070                                n->bpm_out_external_limit);
1071       reserv = ntohl (rcm->reserve_inbound);
1072       if (reserv < 0)
1073         {
1074           n->available_recv_window += reserv;
1075         }
1076       else if (reserv > 0)
1077         {
1078           update_window (GNUNET_NO,
1079                          &n->available_recv_window,
1080                          &n->last_arw_update, n->bpm_in);
1081           if (n->available_recv_window < reserv)
1082             reserv = n->available_recv_window;
1083           n->available_recv_window -= reserv;
1084         }
1085       old_preference = n->current_preference;
1086       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1087       if (old_preference > n->current_preference) 
1088         {
1089           /* overflow; cap at maximum value */
1090           n->current_preference = (unsigned long long) -1;
1091         }
1092       update_preference_sum (n->current_preference - old_preference);
1093       cim.reserved_amount = htonl (reserv);
1094       cim.bpm_in = htonl (n->bpm_in);
1095       cim.bpm_out = htonl (n->bpm_out);
1096       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1097       cim.preference = n->current_preference;
1098     }
1099   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1100   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1101   cim.peer = rcm->peer;
1102   c = find_client (client);
1103   if (c == NULL)
1104     {
1105       GNUNET_break (0);
1106       return;
1107     }
1108 #if DEBUG_CORE_CLIENT
1109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1111 #endif
1112   send_to_client (c, &cim.header, GNUNET_NO);
1113 }
1114
1115
1116 /**
1117  * Check if we have encrypted messages for the specified neighbour
1118  * pending, and if so, check with the transport about sending them
1119  * out.
1120  *
1121  * @param n neighbour to check.
1122  */
1123 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1124
1125
1126 /**
1127  * Function called when the transport service is ready to
1128  * receive an encrypted message for the respective peer
1129  *
1130  * @param cls neighbour to use message from
1131  * @param size number of bytes we can transmit
1132  * @param buf where to copy the message
1133  * @return number of bytes transmitted
1134  */
1135 static size_t
1136 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1137 {
1138   struct Neighbour *n = cls;
1139   struct MessageEntry *m;
1140   size_t ret;
1141   char *cbuf;
1142
1143   n->th = NULL;
1144   GNUNET_assert (NULL != (m = n->encrypted_head));
1145   n->encrypted_head = m->next;
1146   if (m->next == NULL)
1147     n->encrypted_tail = NULL;
1148   ret = 0;
1149   cbuf = buf;
1150   if (buf != NULL)
1151     {
1152       GNUNET_assert (size >= m->size);
1153       memcpy (cbuf, &m[1], m->size);
1154       ret = m->size;
1155       n->available_send_window -= m->size;
1156       process_encrypted_neighbour_queue (n);
1157 #if DEBUG_CORE
1158       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1160                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1161                   ret, GNUNET_i2s (&n->peer));
1162 #endif
1163     }
1164   else
1165     {
1166       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1167                   "Transmission for message of type %u and size %u failed\n",
1168                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1169                   m->size);
1170     }
1171   GNUNET_free (m);
1172   return ret;
1173 }
1174
1175
1176 /**
1177  * Check if we have plaintext messages for the specified neighbour
1178  * pending, and if so, consider batching and encrypting them (and
1179  * then trigger processing of the encrypted queue if needed).
1180  *
1181  * @param n neighbour to check.
1182  */
1183 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1184
1185
1186 /**
1187  * Check if we have encrypted messages for the specified neighbour
1188  * pending, and if so, check with the transport about sending them
1189  * out.
1190  *
1191  * @param n neighbour to check.
1192  */
1193 static void
1194 process_encrypted_neighbour_queue (struct Neighbour *n)
1195 {
1196   struct MessageEntry *m;
1197  
1198   if (n->th != NULL)
1199     return;  /* request already pending */
1200   if (n->encrypted_head == NULL)
1201     {
1202       /* encrypted queue empty, try plaintext instead */
1203       process_plaintext_neighbour_queue (n);
1204       return;
1205     }
1206 #if DEBUG_CORE
1207   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1209               n->encrypted_head->size,
1210               GNUNET_i2s (&n->peer),
1211               GNUNET_TIME_absolute_get_remaining (n->
1212                                                   encrypted_head->deadline).
1213               value);
1214 #endif
1215   n->th =
1216     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1217                                             n->encrypted_head->size,
1218                                             n->encrypted_head->priority,
1219                                             GNUNET_TIME_absolute_get_remaining
1220                                             (n->encrypted_head->deadline),
1221                                             &notify_encrypted_transmit_ready,
1222                                             n);
1223   if (n->th == NULL)
1224     {
1225       /* message request too large (oops) */
1226       GNUNET_break (0);
1227       /* discard encrypted message */
1228       GNUNET_assert (NULL != (m = n->encrypted_head));
1229       n->encrypted_head = m->next;
1230       if (m->next == NULL)
1231         n->encrypted_tail = NULL;
1232       GNUNET_free (m);
1233       process_encrypted_neighbour_queue (n);
1234     }
1235 }
1236
1237
1238 /**
1239  * Decrypt size bytes from in and write the result to out.  Use the
1240  * key for inbound traffic of the given neighbour.  This function does
1241  * NOT do any integrity-checks on the result.
1242  *
1243  * @param n neighbour we are receiving from
1244  * @param iv initialization vector to use
1245  * @param in ciphertext
1246  * @param out plaintext
1247  * @param size size of in/out
1248  * @return GNUNET_OK on success
1249  */
1250 static int
1251 do_decrypt (struct Neighbour *n,
1252             const GNUNET_HashCode * iv,
1253             const void *in, void *out, size_t size)
1254 {
1255   if (size != (uint16_t) size)
1256     {
1257       GNUNET_break (0);
1258       return GNUNET_NO;
1259     }
1260   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1261       (n->status != PEER_STATE_KEY_CONFIRMED))
1262     {
1263       GNUNET_break_op (0);
1264       return GNUNET_SYSERR;
1265     }
1266   if (size !=
1267       GNUNET_CRYPTO_aes_decrypt (&n->decrypt_key,
1268                                  in,
1269                                  (uint16_t) size,
1270                                  (const struct
1271                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1272                                  out))
1273     {
1274       GNUNET_break (0);
1275       return GNUNET_SYSERR;
1276     }
1277 #if DEBUG_CORE
1278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1279               "Decrypted %u bytes from `%4s' using key %u\n",
1280               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1281 #endif
1282   return GNUNET_OK;
1283 }
1284
1285
1286 /**
1287  * Encrypt size bytes from in and write the result to out.  Use the
1288  * key for outbound traffic of the given neighbour.
1289  *
1290  * @param n neighbour we are sending to
1291  * @param iv initialization vector to use
1292  * @param in ciphertext
1293  * @param out plaintext
1294  * @param size size of in/out
1295  * @return GNUNET_OK on success
1296  */
1297 static int
1298 do_encrypt (struct Neighbour *n,
1299             const GNUNET_HashCode * iv,
1300             const void *in, void *out, size_t size)
1301 {
1302   if (size != (uint16_t) size)
1303     {
1304       GNUNET_break (0);
1305       return GNUNET_NO;
1306     }
1307   GNUNET_assert (size ==
1308                  GNUNET_CRYPTO_aes_encrypt (in,
1309                                             (uint16_t) size,
1310                                             &n->encrypt_key,
1311                                             (const struct
1312                                              GNUNET_CRYPTO_AesInitializationVector
1313                                              *) iv, out));
1314 #if DEBUG_CORE
1315   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1316               "Encrypted %u bytes for `%4s' using key %u\n", size,
1317               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1318 #endif
1319   return GNUNET_OK;
1320 }
1321
1322
1323 /**
1324  * Select messages for transmission.  This heuristic uses a combination
1325  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1326  * and priority-based discard (in case no feasible schedule exist) and
1327  * speculative optimization (defer any kind of transmission until
1328  * we either create a batch of significant size, 25% of max, or until
1329  * we are close to a deadline).  Furthermore, when scheduling the
1330  * heuristic also packs as many messages into the batch as possible,
1331  * starting with those with the earliest deadline.  Yes, this is fun.
1332  *
1333  * @param n neighbour to select messages from
1334  * @param size number of bytes to select for transmission
1335  * @param retry_time set to the time when we should try again
1336  *        (only valid if this function returns zero)
1337  * @return number of bytes selected, or 0 if we decided to
1338  *         defer scheduling overall; in that case, retry_time is set.
1339  */
1340 static size_t
1341 select_messages (struct Neighbour *n,
1342                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1343 {
1344   struct MessageEntry *pos;
1345   struct MessageEntry *min;
1346   struct MessageEntry *last;
1347   unsigned int min_prio;
1348   struct GNUNET_TIME_Absolute t;
1349   struct GNUNET_TIME_Absolute now;
1350   uint64_t delta;
1351   uint64_t avail;
1352   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1353   size_t off;
1354   int discard_low_prio;
1355
1356   GNUNET_assert (NULL != n->messages);
1357   now = GNUNET_TIME_absolute_get ();
1358   /* last entry in linked list of messages processed */
1359   last = NULL;
1360   /* should we remove the entry with the lowest
1361      priority from consideration for scheduling at the
1362      end of the loop? */
1363   discard_low_prio = GNUNET_YES;
1364   while (GNUNET_YES == discard_low_prio)
1365     {
1366       min = NULL;
1367       min_prio = -1;
1368       discard_low_prio = GNUNET_NO;
1369       /* calculate number of bytes available for transmission at time "t" */
1370       update_window (GNUNET_NO,
1371                      &n->available_send_window,
1372                      &n->last_asw_update,
1373                      n->bpm_out);
1374       avail = n->available_send_window;
1375       t = n->last_asw_update;
1376       /* how many bytes have we (hypothetically) scheduled so far */
1377       off = 0;
1378       /* maximum time we can wait before transmitting anything
1379          and still make all of our deadlines */
1380       slack = -1;
1381
1382       pos = n->messages;
1383       /* note that we use "*2" here because we want to look
1384          a bit further into the future; much more makes no
1385          sense since new message might be scheduled in the
1386          meantime... */
1387       while ((pos != NULL) && (off < size * 2))
1388         {
1389           if (pos->do_transmit == GNUNET_YES)
1390             {
1391               /* already removed from consideration */
1392               pos = pos->next;
1393               continue;
1394             }
1395           if (discard_low_prio == GNUNET_NO)
1396             {
1397               delta = pos->deadline.value;
1398               if (delta < t.value)
1399                 delta = 0;
1400               else
1401                 delta = t.value - delta;
1402               avail += delta * n->bpm_out / 1000 / 60;
1403               if (avail < pos->size)
1404                 {
1405                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1406                 }
1407               else
1408                 {
1409                   avail -= pos->size;
1410                   /* update slack, considering both its absolute deadline
1411                      and relative deadlines caused by other messages
1412                      with their respective load */
1413                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1414                   if (pos->deadline.value < now.value)
1415                     slack = 0;
1416                   else
1417                     slack =
1418                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1419                 }
1420             }
1421           off += pos->size;
1422           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1423           if (pos->priority <= min_prio)
1424             {
1425               /* update min for discard */
1426               min_prio = pos->priority;
1427               min = pos;
1428             }
1429           pos = pos->next;
1430         }
1431       if (discard_low_prio)
1432         {
1433           GNUNET_assert (min != NULL);
1434           /* remove lowest-priority entry from consideration */
1435           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1436         }
1437       last = pos;
1438     }
1439   /* guard against sending "tiny" messages with large headers without
1440      urgent deadlines */
1441   if ( (slack > 1000) && (size > 4 * off) )
1442     {
1443       /* less than 25% of message would be filled with
1444          deadlines still being met if we delay by one
1445          second or more; so just wait for more data */
1446       retry_time->value = slack / 2;
1447       /* reset do_transmit values for next time */
1448       while (pos != last)
1449         {
1450           pos->do_transmit = GNUNET_NO;
1451           pos = pos->next;
1452         }
1453       return 0;
1454     }
1455   /* select marked messages (up to size) for transmission */
1456   off = 0;
1457   pos = n->messages;
1458   while (pos != last)
1459     {
1460       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1461         {
1462           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1463           off += pos->size;
1464           size -= pos->size;
1465         }
1466       else
1467         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1468       pos = pos->next;
1469     }
1470 #if DEBUG_CORE
1471   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1473               off, GNUNET_i2s (&n->peer));
1474 #endif
1475   return off;
1476 }
1477
1478
1479 /**
1480  * Batch multiple messages into a larger buffer.
1481  *
1482  * @param n neighbour to take messages from
1483  * @param buf target buffer
1484  * @param size size of buf
1485  * @param deadline set to transmission deadline for the result
1486  * @param retry_time set to the time when we should try again
1487  *        (only valid if this function returns zero)
1488  * @param priority set to the priority of the batch
1489  * @return number of bytes written to buf (can be zero)
1490  */
1491 static size_t
1492 batch_message (struct Neighbour *n,
1493                char *buf,
1494                size_t size,
1495                struct GNUNET_TIME_Absolute *deadline,
1496                struct GNUNET_TIME_Relative *retry_time,
1497                unsigned int *priority)
1498 {
1499   struct MessageEntry *pos;
1500   struct MessageEntry *prev;
1501   struct MessageEntry *next;
1502   size_t ret;
1503
1504   ret = 0;
1505   *priority = 0;
1506   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1507   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1508   if (0 == select_messages (n, size, retry_time))
1509     {
1510       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1511                   "No messages selected, will try again in %llu ms\n",
1512                   retry_time->value);
1513       return 0;
1514     }
1515   pos = n->messages;
1516   prev = NULL;
1517   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1518     {
1519       next = pos->next;
1520       if (GNUNET_YES == pos->do_transmit)
1521         {
1522           GNUNET_assert (pos->size <= size);
1523           memcpy (&buf[ret], &pos[1], pos->size);
1524           ret += pos->size;
1525           size -= pos->size;
1526           *priority += pos->priority;
1527           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1528           GNUNET_free (pos);
1529           if (prev == NULL)
1530             n->messages = next;
1531           else
1532             prev->next = next;
1533         }
1534       else
1535         {
1536           prev = pos;
1537         }
1538       pos = next;
1539     }
1540   return ret;
1541 }
1542
1543
1544 /**
1545  * Remove messages with deadlines that have long expired from
1546  * the queue.
1547  *
1548  * @param n neighbour to inspect
1549  */
1550 static void
1551 discard_expired_messages (struct Neighbour *n)
1552 {
1553   struct MessageEntry *prev;
1554   struct MessageEntry *next;
1555   struct MessageEntry *pos;
1556   struct GNUNET_TIME_Absolute now;
1557   struct GNUNET_TIME_Relative delta;
1558
1559   now = GNUNET_TIME_absolute_get ();
1560   prev = NULL;
1561   pos = n->messages;
1562   while (pos != NULL) 
1563     {
1564       next = pos->next;
1565       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1566       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1567         {
1568 #if DEBUG_CORE
1569           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1570                       "Message is %llu ms past due, discarding.\n",
1571                       delta.value);
1572 #endif
1573           if (prev == NULL)
1574             n->messages = next;
1575           else
1576             prev->next = next;
1577           GNUNET_free (pos);
1578         }
1579       else
1580         prev = pos;
1581       pos = next;
1582     }
1583 }
1584
1585
1586 /**
1587  * Signature of the main function of a task.
1588  *
1589  * @param cls closure
1590  * @param tc context information (why was this task triggered now)
1591  */
1592 static void
1593 retry_plaintext_processing (void *cls,
1594                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1595 {
1596   struct Neighbour *n = cls;
1597
1598   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1599   process_plaintext_neighbour_queue (n);
1600 }
1601
1602
1603 /**
1604  * Send our key (and encrypted PING) to the other peer.
1605  *
1606  * @param n the other peer
1607  */
1608 static void send_key (struct Neighbour *n);
1609
1610
1611 /**
1612  * Check if we have plaintext messages for the specified neighbour
1613  * pending, and if so, consider batching and encrypting them (and
1614  * then trigger processing of the encrypted queue if needed).
1615  *
1616  * @param n neighbour to check.
1617  */
1618 static void
1619 process_plaintext_neighbour_queue (struct Neighbour *n)
1620 {
1621   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1622   size_t used;
1623   size_t esize;
1624   struct EncryptedMessage *em;  /* encrypted message */
1625   struct EncryptedMessage *ph;  /* plaintext header */
1626   struct MessageEntry *me;
1627   unsigned int priority;
1628   struct GNUNET_TIME_Absolute deadline;
1629   struct GNUNET_TIME_Relative retry_time;
1630
1631   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1632     {
1633       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1634       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1635     }
1636   switch (n->status)
1637     {
1638     case PEER_STATE_DOWN:
1639       send_key (n);
1640 #if DEBUG_CORE
1641       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1642                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1643                   GNUNET_i2s(&n->peer));
1644 #endif
1645       return;
1646     case PEER_STATE_KEY_SENT:
1647       GNUNET_assert (n->retry_set_key_task !=
1648                      GNUNET_SCHEDULER_NO_TASK);
1649 #if DEBUG_CORE
1650       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1651                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1652                   GNUNET_i2s(&n->peer));
1653 #endif
1654       return;
1655     case PEER_STATE_KEY_RECEIVED:
1656       GNUNET_assert (n->retry_set_key_task !=
1657                      GNUNET_SCHEDULER_NO_TASK);
1658 #if DEBUG_CORE
1659       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1660                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1661                   GNUNET_i2s(&n->peer));
1662 #endif
1663       return;
1664     case PEER_STATE_KEY_CONFIRMED:
1665       /* ready to continue */
1666       break;
1667     }
1668   discard_expired_messages (n);
1669   if (n->messages == NULL)
1670     {
1671 #if DEBUG_CORE
1672       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673                   "Plaintext message queue for `%4s' is empty.\n",
1674                   GNUNET_i2s(&n->peer));
1675 #endif
1676       return;                   /* no pending messages */
1677     }
1678   if (n->encrypted_head != NULL)
1679     {
1680 #if DEBUG_CORE
1681       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1682                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1683                   GNUNET_i2s(&n->peer));
1684 #endif
1685       return;                   /* wait for messages already encrypted to be
1686                                    processed first! */
1687     }
1688   ph = (struct EncryptedMessage *) pbuf;
1689   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1690   priority = 0;
1691   used = sizeof (struct EncryptedMessage);
1692   used += batch_message (n,
1693                          &pbuf[used],
1694                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1695                          &deadline, &retry_time, &priority);
1696   if (used == sizeof (struct EncryptedMessage))
1697     {
1698 #if DEBUG_CORE
1699       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1700                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1701                   GNUNET_i2s(&n->peer));
1702 #endif
1703       /* no messages selected for sending, try again later... */
1704       n->retry_plaintext_task =
1705         GNUNET_SCHEDULER_add_delayed (sched,
1706                                       GNUNET_NO,
1707                                       GNUNET_SCHEDULER_PRIORITY_IDLE,
1708                                       GNUNET_SCHEDULER_NO_TASK,
1709                                       retry_time,
1710                                       &retry_plaintext_processing, n);
1711       return;
1712     }
1713
1714   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1715   ph->inbound_bpm_limit = htonl (n->bpm_in);
1716   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1717
1718   /* setup encryption message header */
1719   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1720   me->deadline = deadline;
1721   me->priority = priority;
1722   me->size = used;
1723   em = (struct EncryptedMessage *) &me[1];
1724   em->header.size = htons (used);
1725   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1726   em->reserved = htonl (0);
1727   esize = used - ENCRYPTED_HEADER_SIZE;
1728   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1729   /* encrypt */
1730 #if DEBUG_CORE
1731   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1732               "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
1733               esize,
1734               GNUNET_i2s(&n->peer));
1735 #endif
1736   GNUNET_assert (GNUNET_OK ==
1737                  do_encrypt (n,
1738                              &em->plaintext_hash,
1739                              &ph->sequence_number,
1740                              &em->sequence_number, esize));
1741   /* append to transmission list */
1742   if (n->encrypted_tail == NULL)
1743     n->encrypted_head = me;
1744   else
1745     n->encrypted_tail->next = me;
1746   n->encrypted_tail = me;
1747   process_encrypted_neighbour_queue (n);
1748 }
1749
1750
1751 /**
1752  * Handle CORE_SEND request.
1753  */
1754 static void
1755 handle_client_send (void *cls,
1756                     struct GNUNET_SERVER_Client *client,
1757                     const struct GNUNET_MessageHeader *message);
1758
1759
1760 /**
1761  * Function called to notify us that we either succeeded
1762  * or failed to connect (at the transport level) to another
1763  * peer.  We should either free the message we were asked
1764  * to transmit or re-try adding it to the queue.
1765  *
1766  * @param cls closure
1767  * @param size number of bytes available in buf
1768  * @param buf where the callee should write the message
1769  * @return number of bytes written to buf
1770  */
1771 static size_t
1772 send_connect_continuation (void *cls, size_t size, void *buf)
1773 {
1774   struct SendMessage *sm = cls;
1775
1776   if (buf == NULL)
1777     {
1778 #if DEBUG_CORE
1779       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1780                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1781                   GNUNET_i2s (&sm->peer));
1782 #endif
1783       GNUNET_free (sm);
1784       return 0;
1785     }
1786 #if DEBUG_CORE
1787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1788               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1789               GNUNET_i2s (&sm->peer));
1790 #endif
1791   handle_client_send (NULL, NULL, &sm->header);
1792   GNUNET_free (sm);
1793   return 0;
1794 }
1795
1796
1797 /**
1798  * Handle CORE_SEND request.
1799  */
1800 static void
1801 handle_client_send (void *cls,
1802                     struct GNUNET_SERVER_Client *client,
1803                     const struct GNUNET_MessageHeader *message)
1804 {
1805   const struct SendMessage *sm;
1806   struct SendMessage *smc;
1807   const struct GNUNET_MessageHeader *mh;
1808   struct Neighbour *n;
1809   struct MessageEntry *prev;
1810   struct MessageEntry *pos;
1811   struct MessageEntry *e; 
1812   struct MessageEntry *min_prio_entry;
1813   struct MessageEntry *min_prio_prev;
1814   unsigned int min_prio;
1815   unsigned int queue_size;
1816   uint16_t msize;
1817
1818   msize = ntohs (message->size);
1819   if (msize <
1820       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1821     {
1822       GNUNET_break (0);
1823       if (client != NULL)
1824         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1825       return;
1826     }
1827   sm = (const struct SendMessage *) message;
1828   msize -= sizeof (struct SendMessage);
1829   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1830   if (msize != ntohs (mh->size))
1831     {
1832       GNUNET_break (0);
1833       if (client != NULL)
1834         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1835       return;
1836     }
1837   n = find_neighbour (&sm->peer);
1838   if (n == NULL)
1839     {
1840 #if DEBUG_CORE
1841       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1842                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1843                   "SEND",
1844                   GNUNET_i2s (&sm->peer),
1845                   GNUNET_TIME_absolute_get_remaining
1846                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1847 #endif
1848       msize += sizeof (struct SendMessage);
1849       /* ask transport to connect to the peer */
1850       smc = GNUNET_malloc (msize);
1851       memcpy (smc, sm, msize);
1852       if (NULL ==
1853           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1854                                                   &sm->peer,
1855                                                   0, 0,
1856                                                   GNUNET_TIME_absolute_get_remaining
1857                                                   (GNUNET_TIME_absolute_ntoh
1858                                                    (sm->deadline)),
1859                                                   &send_connect_continuation,
1860                                                   smc))
1861         {
1862           /* transport has already a request pending for this peer! */
1863 #if DEBUG_CORE
1864           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1865                       "Dropped second message destined for `%4s' since connection is still down.\n",
1866                       GNUNET_i2s(&sm->peer));
1867 #endif
1868           GNUNET_free (smc);
1869         }
1870       if (client != NULL)
1871         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1872       return;
1873     }
1874 #if DEBUG_CORE
1875   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1876               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1877               "SEND",
1878               msize, 
1879               GNUNET_i2s (&sm->peer));
1880 #endif
1881   /* bound queue size */
1882   discard_expired_messages (n);
1883   min_prio = (unsigned int) -1;
1884   min_prio_entry = NULL;
1885   min_prio_prev = NULL;
1886   queue_size = 0;
1887   prev = NULL;
1888   pos = n->messages;
1889   while (pos != NULL) 
1890     {
1891       if (pos->priority < min_prio)
1892         {
1893           min_prio_entry = pos;
1894           min_prio_prev = prev;
1895           min_prio = pos->priority;
1896         }
1897       queue_size++;
1898       prev = pos;
1899       pos = pos->next;
1900     }
1901   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1902     {
1903       /* queue full */
1904       if (ntohl(sm->priority) <= min_prio)
1905         {
1906           /* discard new entry */
1907 #if DEBUG_CORE
1908           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1909                       "Queue full, discarding new request\n");
1910 #endif
1911           if (client != NULL)
1912             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1913           return;
1914         }
1915       /* discard "min_prio_entry" */
1916 #if DEBUG_CORE
1917       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1918                   "Queue full, discarding existing older request\n");
1919 #endif
1920       if (min_prio_prev == NULL)
1921         n->messages = min_prio_entry->next;
1922       else
1923         min_prio_prev->next = min_prio_entry->next;      
1924       GNUNET_free (min_prio_entry);     
1925     }
1926   
1927   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1928   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1929   e->priority = ntohl (sm->priority);
1930   e->size = msize;
1931   memcpy (&e[1], mh, msize);
1932
1933   /* insert, keep list sorted by deadline */
1934   prev = NULL;
1935   pos = n->messages;
1936   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1937     {
1938       prev = pos;
1939       pos = pos->next;
1940     }
1941   if (prev == NULL)
1942     n->messages = e;
1943   else
1944     prev->next = e;
1945   e->next = pos;
1946
1947   /* consider scheduling now */
1948   process_plaintext_neighbour_queue (n);
1949   if (client != NULL)
1950     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1951 }
1952
1953
1954 /**
1955  * List of handlers for the messages understood by this
1956  * service.
1957  */
1958 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1959   {&handle_client_init, NULL,
1960    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1961   {&handle_client_request_configure, NULL,
1962    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
1963    sizeof (struct RequestConfigureMessage)},
1964   {&handle_client_send, NULL,
1965    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1966   {NULL, NULL, 0, 0}
1967 };
1968
1969
1970 /**
1971  * PEERINFO is giving us a HELLO for a peer.  Add the
1972  * public key to the neighbour's struct and retry
1973  * send_key.  Or, if we did not get a HELLO, just do
1974  * nothing.
1975  *
1976  * @param cls NULL
1977  * @param peer the peer for which this is the HELLO
1978  * @param hello HELLO message of that peer
1979  * @param trust amount of trust we currently have in that peer
1980  */
1981 static void
1982 process_hello_retry_send_key (void *cls,
1983                               const struct GNUNET_PeerIdentity *peer,
1984                               const struct GNUNET_HELLO_Message *hello,
1985                               uint32_t trust)
1986 {
1987   struct Neighbour *n;
1988
1989   if (peer == NULL)
1990     return;
1991   n = find_neighbour (peer);
1992   if (n == NULL)
1993     return;
1994   if (n->public_key != NULL)
1995     return;
1996 #if DEBUG_CORE
1997   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1998               "Received new `%s' message for `%4s', initiating key exchange.\n",
1999               "HELLO",
2000               GNUNET_i2s (peer));
2001 #endif
2002   n->public_key =
2003     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2004   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2005     {
2006       GNUNET_free (n->public_key);
2007       n->public_key = NULL;
2008       return;
2009     }
2010   send_key (n);
2011 }
2012
2013
2014 /**
2015  * Task that will retry "send_key" if our previous attempt failed
2016  * to yield a PONG.
2017  */
2018 static void
2019 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2020 {
2021   struct Neighbour *n = cls;
2022
2023   GNUNET_assert (n->status != PEER_STATE_KEY_CONFIRMED);
2024   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2025   n->set_key_retry_frequency =
2026     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2027   send_key (n);
2028 }
2029
2030
2031 /**
2032  * Send our key (and encrypted PING) to the other peer.
2033  *
2034  * @param n the other peer
2035  */
2036 static void
2037 send_key (struct Neighbour *n)
2038 {
2039   struct SetKeyMessage *sm;
2040   struct MessageEntry *me;
2041   struct PingMessage pp;
2042   struct PingMessage *pm;
2043
2044 #if DEBUG_CORE
2045   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2046               "Asked to perform key exchange with `%4s'.\n",
2047               GNUNET_i2s (&n->peer));
2048 #endif
2049   if (n->public_key == NULL)
2050     {
2051       /* lookup n's public key, then try again */
2052 #if DEBUG_CORE
2053       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2054                   "Lacking public key for `%4s', trying to obtain one.\n",
2055                   GNUNET_i2s (&n->peer));
2056 #endif
2057       GNUNET_PEERINFO_for_all (cfg,
2058                                sched,
2059                                &n->peer,
2060                                0,
2061                                GNUNET_TIME_UNIT_MINUTES,
2062                                &process_hello_retry_send_key, NULL);
2063       return;
2064     }
2065   /* first, set key message */
2066   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2067                       sizeof (struct SetKeyMessage));
2068   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2069   me->priority = SET_KEY_PRIORITY;
2070   me->size = sizeof (struct SetKeyMessage);
2071   if (n->encrypted_head == NULL)
2072     n->encrypted_head = me;
2073   else
2074     n->encrypted_tail->next = me;
2075   n->encrypted_tail = me;
2076   sm = (struct SetKeyMessage *) &me[1];
2077   sm->header.size = htons (sizeof (struct SetKeyMessage));
2078   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2079   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2080                                         PEER_STATE_KEY_SENT : n->status));
2081   sm->purpose.size =
2082     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2083            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2084            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2085            sizeof (struct GNUNET_PeerIdentity));
2086   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2087   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2088   sm->target = n->peer;
2089   GNUNET_assert (GNUNET_OK ==
2090                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2091                                             sizeof (struct
2092                                                     GNUNET_CRYPTO_AesSessionKey),
2093                                             n->public_key,
2094                                             &sm->encrypted_key));
2095   GNUNET_assert (GNUNET_OK ==
2096                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2097                                          &sm->signature));
2098
2099   /* second, encrypted PING message */
2100   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2101                       sizeof (struct PingMessage));
2102   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2103   me->priority = PING_PRIORITY;
2104   me->size = sizeof (struct PingMessage);
2105   n->encrypted_tail->next = me;
2106   n->encrypted_tail = me;
2107   pm = (struct PingMessage *) &me[1];
2108   pm->header.size = htons (sizeof (struct PingMessage));
2109   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2110   pp.challenge = htonl (n->ping_challenge);
2111   pp.target = n->peer;
2112 #if DEBUG_CORE
2113   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2114               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2115               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2116   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2117               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2118               "PING",
2119               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2120 #endif
2121   do_encrypt (n,
2122               &n->peer.hashPubKey,
2123               &pp.challenge,
2124               &pm->challenge,
2125               sizeof (struct PingMessage) -
2126               sizeof (struct GNUNET_MessageHeader));
2127   /* update status */
2128   switch (n->status)
2129     {
2130     case PEER_STATE_DOWN:
2131       n->status = PEER_STATE_KEY_SENT;
2132       break;
2133     case PEER_STATE_KEY_SENT:
2134       break;
2135     case PEER_STATE_KEY_RECEIVED:
2136       break;
2137     case PEER_STATE_KEY_CONFIRMED:
2138       GNUNET_break (0);
2139       break;
2140     default:
2141       GNUNET_break (0);
2142       break;
2143     }
2144   /* trigger queue processing */
2145   process_encrypted_neighbour_queue (n);
2146   if (n->status != PEER_STATE_KEY_CONFIRMED)
2147     n->retry_set_key_task
2148       = GNUNET_SCHEDULER_add_delayed (sched,
2149                                       GNUNET_NO,
2150                                       GNUNET_SCHEDULER_PRIORITY_KEEP,
2151                                       GNUNET_SCHEDULER_NO_TASK,
2152                                       n->set_key_retry_frequency,
2153                                       &set_key_retry_task, n);
2154 }
2155
2156
2157 /**
2158  * We received a SET_KEY message.  Validate and update
2159  * our key material and status.
2160  *
2161  * @param n the neighbour from which we received message m
2162  * @param m the set key message we received
2163  */
2164 static void
2165 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2166
2167
2168 /**
2169  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2170  * the neighbour's struct and retry handling the set_key message.  Or,
2171  * if we did not get a HELLO, just free the set key message.
2172  *
2173  * @param cls pointer to the set key message
2174  * @param peer the peer for which this is the HELLO
2175  * @param hello HELLO message of that peer
2176  * @param trust amount of trust we currently have in that peer
2177  */
2178 static void
2179 process_hello_retry_handle_set_key (void *cls,
2180                                     const struct GNUNET_PeerIdentity *peer,
2181                                     const struct GNUNET_HELLO_Message *hello,
2182                                     uint32_t trust)
2183 {
2184   struct SetKeyMessage *sm = cls;
2185   struct Neighbour *n;
2186
2187   if (peer == NULL)
2188     {
2189       GNUNET_free (sm);
2190       return;
2191     }
2192   n = find_neighbour (peer);
2193   if (n == NULL)
2194     {
2195       GNUNET_break (0);
2196       return;
2197     }
2198   if (n->public_key != NULL)
2199     return;                     /* multiple HELLOs match!? */
2200   n->public_key =
2201     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2202   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2203     {
2204       GNUNET_break_op (0);
2205       GNUNET_free (n->public_key);
2206       n->public_key = NULL;
2207       return;
2208     }
2209 #if DEBUG_CORE
2210   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2211               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2212               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2213 #endif
2214   handle_set_key (n, sm);
2215 }
2216
2217
2218 /**
2219  * We received a PING message.  Validate and transmit
2220  * PONG.
2221  *
2222  * @param n sender of the PING
2223  * @param m the encrypted PING message itself
2224  */
2225 static void
2226 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2227 {
2228   struct PingMessage t;
2229   struct PingMessage *tp;
2230   struct MessageEntry *me;
2231
2232 #if DEBUG_CORE
2233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2234               "Core service receives `%s' request from `%4s'.\n",
2235               "PING", GNUNET_i2s (&n->peer));
2236 #endif
2237   if (GNUNET_OK !=
2238       do_decrypt (n,
2239                   &my_identity.hashPubKey,
2240                   &m->challenge,
2241                   &t.challenge,
2242                   sizeof (struct PingMessage) -
2243                   sizeof (struct GNUNET_MessageHeader)))
2244     return;
2245 #if DEBUG_CORE
2246   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2247               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2248               "PING",
2249               GNUNET_i2s (&t.target),
2250               ntohl (t.challenge), n->decrypt_key.crc32);
2251   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2252               "Target of `%s' request is `%4s'.\n",
2253               "PING", GNUNET_i2s (&t.target));
2254 #endif
2255   if (0 != memcmp (&t.target,
2256                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2257     {
2258       GNUNET_break_op (0);
2259       return;
2260     }
2261   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2262                       sizeof (struct PingMessage));
2263   if (n->encrypted_tail != NULL)
2264     n->encrypted_tail->next = me;
2265   else
2266     {
2267       n->encrypted_tail = me;
2268       n->encrypted_head = me;
2269     }
2270   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2271   me->priority = PONG_PRIORITY;
2272   me->size = sizeof (struct PingMessage);
2273   tp = (struct PingMessage *) &me[1];
2274   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2275   tp->header.size = htons (sizeof (struct PingMessage));
2276   do_encrypt (n,
2277               &my_identity.hashPubKey,
2278               &t.challenge,
2279               &tp->challenge,
2280               sizeof (struct PingMessage) -
2281               sizeof (struct GNUNET_MessageHeader));
2282 #if DEBUG_CORE
2283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2284               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2285               ntohl (t.challenge), n->encrypt_key.crc32);
2286 #endif
2287   /* trigger queue processing */
2288   process_encrypted_neighbour_queue (n);
2289 }
2290
2291
2292 /**
2293  * We received a SET_KEY message.  Validate and update
2294  * our key material and status.
2295  *
2296  * @param n the neighbour from which we received message m
2297  * @param m the set key message we received
2298  */
2299 static void
2300 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2301 {
2302   struct SetKeyMessage *m_cpy;
2303   struct GNUNET_TIME_Absolute t;
2304   struct GNUNET_CRYPTO_AesSessionKey k;
2305   struct PingMessage *ping;
2306   enum PeerStateMachine sender_status;
2307
2308 #if DEBUG_CORE
2309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2310               "Core service receives `%s' request from `%4s'.\n",
2311               "SET_KEY", GNUNET_i2s (&n->peer));
2312 #endif
2313   if (n->public_key == NULL)
2314     {
2315 #if DEBUG_CORE
2316       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2317                   "Lacking public key for peer, trying to obtain one.\n");
2318 #endif
2319       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2320       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2321       /* lookup n's public key, then try again */
2322       GNUNET_PEERINFO_for_all (cfg,
2323                                sched,
2324                                &n->peer,
2325                                0,
2326                                GNUNET_TIME_UNIT_MINUTES,
2327                                &process_hello_retry_handle_set_key, m_cpy);
2328       return;
2329     }
2330   if ((ntohl (m->purpose.size) !=
2331        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2332        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2333        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2334        sizeof (struct GNUNET_PeerIdentity)) ||
2335       (GNUNET_OK !=
2336        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2337                                  &m->purpose, &m->signature, n->public_key)))
2338     {
2339       /* invalid signature */
2340       GNUNET_break_op (0);
2341       return;
2342     }
2343   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2344   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2345        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2346       (t.value < n->decrypt_key_created.value))
2347     {
2348       /* this could rarely happen due to massive re-ordering of
2349          messages on the network level, but is most likely either
2350          a bug or some adversary messing with us.  Report. */
2351       GNUNET_break_op (0);
2352       return;
2353     }
2354 #if DEBUG_CORE
2355   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2356 #endif  
2357   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2358                                   &m->encrypted_key,
2359                                   &k,
2360                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2361        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2362       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2363     {
2364       /* failed to decrypt !? */
2365       GNUNET_break_op (0);
2366       return;
2367     }
2368
2369   n->decrypt_key = k;
2370   if (n->decrypt_key_created.value != t.value)
2371     {
2372       /* fresh key, reset sequence numbers */
2373       n->last_sequence_number_received = 0;
2374       n->last_packets_bitmap = 0;
2375       n->decrypt_key_created = t;
2376     }
2377   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2378   switch (n->status)
2379     {
2380     case PEER_STATE_DOWN:
2381       n->status = PEER_STATE_KEY_RECEIVED;
2382 #if DEBUG_CORE
2383       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2384                   "Responding to `%s' with my own key.\n", "SET_KEY");
2385 #endif
2386       send_key (n);
2387       break;
2388     case PEER_STATE_KEY_SENT:
2389     case PEER_STATE_KEY_RECEIVED:
2390       n->status = PEER_STATE_KEY_RECEIVED;
2391       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2392           (sender_status != PEER_STATE_KEY_CONFIRMED))
2393         {
2394 #if DEBUG_CORE
2395           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2396                       "Responding to `%s' with my own key (other peer has status %u).\n",
2397                       "SET_KEY", sender_status);
2398 #endif
2399           send_key (n);
2400         }
2401       break;
2402     case PEER_STATE_KEY_CONFIRMED:
2403       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2404           (sender_status != PEER_STATE_KEY_CONFIRMED))
2405         {
2406 #if DEBUG_CORE
2407           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2408                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2409                       "SET_KEY", sender_status);
2410 #endif
2411           send_key (n);
2412         }
2413       break;
2414     default:
2415       GNUNET_break (0);
2416       break;
2417     }
2418   if (n->pending_ping != NULL)
2419     {
2420       ping = n->pending_ping;
2421       n->pending_ping = NULL;
2422       handle_ping (n, ping);
2423       GNUNET_free (ping);
2424     }
2425 }
2426
2427
2428 /**
2429  * We received a PONG message.  Validate and update
2430  * our status.
2431  *
2432  * @param n sender of the PONG
2433  * @param m the encrypted PONG message itself
2434  */
2435 static void
2436 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2437 {
2438   struct PingMessage t;
2439
2440 #if DEBUG_CORE
2441   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2442               "Core service receives `%s' request from `%4s'.\n",
2443               "PONG", GNUNET_i2s (&n->peer));
2444 #endif
2445   if (GNUNET_OK !=
2446       do_decrypt (n,
2447                   &n->peer.hashPubKey,
2448                   &m->challenge,
2449                   &t.challenge,
2450                   sizeof (struct PingMessage) -
2451                   sizeof (struct GNUNET_MessageHeader)))
2452     return;
2453 #if DEBUG_CORE
2454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2455               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2456               "PONG",
2457               GNUNET_i2s (&t.target),
2458               ntohl (t.challenge), n->decrypt_key.crc32);
2459 #endif
2460   if ((0 != memcmp (&t.target,
2461                     &n->peer,
2462                     sizeof (struct GNUNET_PeerIdentity))) ||
2463       (n->ping_challenge != ntohl (t.challenge)))
2464     {
2465       /* PONG malformed */
2466 #if DEBUG_CORE
2467       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2468                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2469                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2470       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2471                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2472                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2473 #endif
2474       GNUNET_break_op (0);
2475       return;
2476     }
2477   switch (n->status)
2478     {
2479     case PEER_STATE_DOWN:
2480       GNUNET_break (0);         /* should be impossible */
2481       return;
2482     case PEER_STATE_KEY_SENT:
2483       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2484       return;
2485     case PEER_STATE_KEY_RECEIVED:
2486       n->status = PEER_STATE_KEY_CONFIRMED;
2487       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2488         {
2489           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2490           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2491         }
2492       process_encrypted_neighbour_queue (n);
2493       break;
2494     case PEER_STATE_KEY_CONFIRMED:
2495       /* duplicate PONG? */
2496       break;
2497     default:
2498       GNUNET_break (0);
2499       break;
2500     }
2501 }
2502
2503
2504 /**
2505  * Send a P2P message to a client.
2506  *
2507  * @param sender who sent us the message?
2508  * @param client who should we give the message to?
2509  * @param m contains the message to transmit
2510  * @param msize number of bytes in buf to transmit
2511  */
2512 static void
2513 send_p2p_message_to_client (struct Neighbour *sender,
2514                             struct Client *client,
2515                             const void *m, size_t msize)
2516 {
2517   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2518   struct NotifyTrafficMessage *ntm;
2519
2520 #if DEBUG_CORE
2521   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2522               "Core service passes message from `%4s' of type %u to client.\n",
2523               GNUNET_i2s(&sender->peer),
2524               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2525 #endif
2526   ntm = (struct NotifyTrafficMessage *) buf;
2527   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2528   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2529   ntm->reserved = htonl (0);
2530   ntm->peer = sender->peer;
2531   memcpy (&ntm[1], m, msize);
2532   send_to_client (client, &ntm->header, GNUNET_YES);
2533 }
2534
2535
2536 /**
2537  * Deliver P2P message to interested clients.
2538  *
2539  * @param sender who sent us the message?
2540  * @param m the message
2541  * @param msize size of the message (including header)
2542  */
2543 static void
2544 deliver_message (struct Neighbour *sender,
2545                  const struct GNUNET_MessageHeader *m, size_t msize)
2546 {
2547   struct Client *cpos;
2548   uint16_t type;
2549   unsigned int tpos;
2550   int deliver_full;
2551
2552   type = ntohs (m->type);
2553   cpos = clients;
2554   while (cpos != NULL)
2555     {
2556       deliver_full = GNUNET_NO;
2557       if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
2558         deliver_full = GNUNET_YES;
2559       else
2560         {
2561           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2562             {
2563               if (type != cpos->types[tpos])
2564                 continue;
2565               deliver_full = GNUNET_YES;
2566               break;
2567             }
2568         }
2569       if (GNUNET_YES == deliver_full)
2570         send_p2p_message_to_client (sender, cpos, m, msize);
2571       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2572         send_p2p_message_to_client (sender, cpos, m,
2573                                     sizeof (struct GNUNET_MessageHeader));
2574       cpos = cpos->next;
2575     }
2576 }
2577
2578
2579 /**
2580  * Align P2P message and then deliver to interested clients.
2581  *
2582  * @param sender who sent us the message?
2583  * @param buffer unaligned (!) buffer containing message
2584  * @param msize size of the message (including header)
2585  */
2586 static void
2587 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2588 {
2589   char abuf[msize];
2590
2591   /* TODO: call to statistics? */
2592   memcpy (abuf, buffer, msize);
2593   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2594 }
2595
2596
2597 /**
2598  * Deliver P2P messages to interested clients.
2599  *
2600  * @param sender who sent us the message?
2601  * @param buffer buffer containing messages, can be modified
2602  * @param buffer_size size of the buffer (overall)
2603  * @param offset offset where messages in the buffer start
2604  */
2605 static void
2606 deliver_messages (struct Neighbour *sender,
2607                   const char *buffer, size_t buffer_size, size_t offset)
2608 {
2609   struct GNUNET_MessageHeader *mhp;
2610   struct GNUNET_MessageHeader mh;
2611   uint16_t msize;
2612   int need_align;
2613
2614   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2615     {
2616       if (0 != offset % sizeof (uint16_t))
2617         {
2618           /* outch, need to copy to access header */
2619           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2620           mhp = &mh;
2621         }
2622       else
2623         {
2624           /* can access header directly */
2625           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2626         }
2627       msize = ntohs (mhp->size);
2628       if (msize + offset > buffer_size)
2629         {
2630           /* malformed message, header says it is larger than what
2631              would fit into the overall buffer */
2632           GNUNET_break_op (0);
2633           break;
2634         }
2635 #if HAVE_UNALIGNED_64_ACCESS
2636       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2637 #else
2638       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2639 #endif
2640       if (GNUNET_YES == need_align)
2641         align_and_deliver (sender, &buffer[offset], msize);
2642       else
2643         deliver_message (sender,
2644                          (const struct GNUNET_MessageHeader *)
2645                          &buffer[offset], msize);
2646       offset += msize;
2647     }
2648 }
2649
2650
2651 /**
2652  * We received an encrypted message.  Decrypt, validate and
2653  * pass on to the appropriate clients.
2654  */
2655 static void
2656 handle_encrypted_message (struct Neighbour *n,
2657                           const struct EncryptedMessage *m)
2658 {
2659   size_t size = ntohs (m->header.size);
2660   char buf[size];
2661   struct EncryptedMessage *pt;  /* plaintext */
2662   GNUNET_HashCode ph;
2663   size_t off;
2664   uint32_t snum;
2665   struct GNUNET_TIME_Absolute t;
2666
2667 #if DEBUG_CORE
2668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2669               "Core service receives `%s' request from `%4s'.\n",
2670               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2671 #endif  
2672   /* decrypt */
2673   if (GNUNET_OK !=
2674       do_decrypt (n,
2675                   &m->plaintext_hash,
2676                   &m->sequence_number,
2677                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2678     return;
2679   pt = (struct EncryptedMessage *) buf;
2680
2681   /* validate hash */
2682   GNUNET_CRYPTO_hash (&pt->sequence_number,
2683                       size - ENCRYPTED_HEADER_SIZE, &ph);
2684   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2685     {
2686       /* checksum failed */
2687       GNUNET_break_op (0);
2688       return;
2689     }
2690
2691   /* validate sequence number */
2692   snum = ntohl (pt->sequence_number);
2693   if (n->last_sequence_number_received == snum)
2694     {
2695       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2696                   "Received duplicate message, ignoring.\n");
2697       /* duplicate, ignore */
2698       return;
2699     }
2700   if ((n->last_sequence_number_received > snum) &&
2701       (n->last_sequence_number_received - snum > 32))
2702     {
2703       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2704                   "Received ancient out of sequence message, ignoring.\n");
2705       /* ancient out of sequence, ignore */
2706       return;
2707     }
2708   if (n->last_sequence_number_received > snum)
2709     {
2710       unsigned int rotbit =
2711         1 << (n->last_sequence_number_received - snum - 1);
2712       if ((n->last_packets_bitmap & rotbit) != 0)
2713         {
2714           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2715                       "Received duplicate message, ignoring.\n");
2716           /* duplicate, ignore */
2717           return;
2718         }
2719       n->last_packets_bitmap |= rotbit;
2720     }
2721   if (n->last_sequence_number_received < snum)
2722     {
2723       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2724       n->last_sequence_number_received = snum;
2725     }
2726
2727   /* check timestamp */
2728   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2729   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2730     {
2731       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2732                   _
2733                   ("Message received far too old (%llu ms). Content ignored.\n"),
2734                   GNUNET_TIME_absolute_get_duration (t).value);
2735       return;
2736     }
2737
2738   /* process decrypted message(s) */
2739   update_window (GNUNET_YES,
2740                  &n->available_send_window,
2741                  &n->last_asw_update,
2742                  n->bpm_out);
2743   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2744   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2745                            n->bpm_out_internal_limit);
2746   n->last_activity = GNUNET_TIME_absolute_get ();
2747   off = sizeof (struct EncryptedMessage);
2748   deliver_messages (n, buf, size, off);
2749 }
2750
2751
2752 /**
2753  * Function called by the transport for each received message.
2754  *
2755  * @param cls closure
2756  * @param latency estimated latency for communicating with the
2757  *             given peer
2758  * @param peer (claimed) identity of the other peer
2759  * @param message the message
2760  */
2761 static void
2762 handle_transport_receive (void *cls,
2763                           struct GNUNET_TIME_Relative latency,
2764                           const struct GNUNET_PeerIdentity *peer,
2765                           const struct GNUNET_MessageHeader *message)
2766 {
2767   struct Neighbour *n;
2768   struct GNUNET_TIME_Absolute now;
2769   int up;
2770   uint16_t type;
2771   uint16_t size;
2772
2773 #if DEBUG_CORE
2774   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2775               "Received message of type %u from `%4s', demultiplexing.\n",
2776               ntohs (message->type), GNUNET_i2s (peer));
2777 #endif
2778   n = find_neighbour (peer);
2779   if (n == NULL)
2780     {
2781       GNUNET_break (0);
2782       return;
2783     }
2784   n->last_latency = latency;
2785   up = n->status == PEER_STATE_KEY_CONFIRMED;
2786   type = ntohs (message->type);
2787   size = ntohs (message->size);
2788   switch (type)
2789     {
2790     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2791       if (size != sizeof (struct SetKeyMessage))
2792         {
2793           GNUNET_break_op (0);
2794           return;
2795         }
2796       handle_set_key (n, (const struct SetKeyMessage *) message);
2797       break;
2798     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2799       if (size < sizeof (struct EncryptedMessage) +
2800           sizeof (struct GNUNET_MessageHeader))
2801         {
2802           GNUNET_break_op (0);
2803           return;
2804         }
2805       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2806           (n->status != PEER_STATE_KEY_CONFIRMED))
2807         {
2808           GNUNET_break_op (0);
2809           return;
2810         }
2811       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2812       break;
2813     case GNUNET_MESSAGE_TYPE_CORE_PING:
2814       if (size != sizeof (struct PingMessage))
2815         {
2816           GNUNET_break_op (0);
2817           return;
2818         }
2819       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2820           (n->status != PEER_STATE_KEY_CONFIRMED))
2821         {
2822 #if DEBUG_CORE
2823           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2824                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2825                       "PING", GNUNET_i2s (&n->peer));
2826 #endif
2827           GNUNET_free_non_null (n->pending_ping);
2828           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2829           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2830           return;
2831         }
2832       handle_ping (n, (const struct PingMessage *) message);
2833       break;
2834     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2835       if (size != sizeof (struct PingMessage))
2836         {
2837           GNUNET_break_op (0);
2838           return;
2839         }
2840       if ((n->status != PEER_STATE_KEY_SENT) &&
2841           (n->status != PEER_STATE_KEY_RECEIVED) &&
2842           (n->status != PEER_STATE_KEY_CONFIRMED))
2843         {
2844           /* could not decrypt pong, oops! */
2845           GNUNET_break_op (0);
2846           return;
2847         }
2848       handle_pong (n, (const struct PingMessage *) message);
2849       break;
2850     default:
2851       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2852                   _("Unsupported message of type %u received.\n"), type);
2853       return;
2854     }
2855   if (n->status == PEER_STATE_KEY_CONFIRMED)
2856     {
2857       now = GNUNET_TIME_absolute_get ();
2858       n->last_activity = now;
2859       if (!up)
2860         n->time_established = now;
2861     }
2862 }
2863
2864
2865 /**
2866  * Function that recalculates the bandwidth quota for the
2867  * given neighbour and transmits it to the transport service.
2868  * 
2869  * @param cls neighbour for the quota update
2870  * @param tc context
2871  */
2872 static void
2873 neighbour_quota_update (void *cls,
2874                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2875
2876
2877 /**
2878  * Schedule the task that will recalculate the bandwidth
2879  * quota for this peer (and possibly force a disconnect of
2880  * idle peers by calculating a bandwidth of zero).
2881  */
2882 static void
2883 schedule_quota_update (struct Neighbour *n)
2884 {
2885   GNUNET_assert (n->quota_update_task ==
2886                  GNUNET_SCHEDULER_NO_TASK);
2887   n->quota_update_task
2888     = GNUNET_SCHEDULER_add_delayed (sched,
2889                                     GNUNET_NO,
2890                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
2891                                     GNUNET_SCHEDULER_NO_TASK,
2892                                     QUOTA_UPDATE_FREQUENCY,
2893                                     &neighbour_quota_update,
2894                                     n);
2895 }
2896
2897
2898 /**
2899  * Function that recalculates the bandwidth quota for the
2900  * given neighbour and transmits it to the transport service.
2901  * 
2902  * @param cls neighbour for the quota update
2903  * @param tc context
2904  */
2905 static void
2906 neighbour_quota_update (void *cls,
2907                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2908 {
2909   struct Neighbour *n = cls;
2910   uint32_t q_in;
2911   double pref_rel;
2912   double share;
2913   unsigned long long distributable;
2914   
2915   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
2916   /* calculate relative preference among all neighbours;
2917      divides by a bit more to avoid division by zero AND to
2918      account for possibility of new neighbours joining any time 
2919      AND to convert to double... */
2920   pref_rel = n->current_preference / (1.0 + preference_sum);
2921   share = 0;
2922   distributable = 0;
2923   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2924     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2925   share = distributable * pref_rel;
2926   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2927   /* check if we want to disconnect for good due to inactivity */
2928   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2929        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2930     q_in = 0; /* force disconnect */
2931   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2932        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2933     {
2934       n->bpm_in = q_in;
2935       GNUNET_TRANSPORT_set_quota (transport,
2936                                   &n->peer,
2937                                   n->bpm_in, 
2938                                   n->bpm_out,
2939                                   GNUNET_TIME_UNIT_FOREVER_REL,
2940                                   NULL, NULL);
2941     }
2942   schedule_quota_update (n);
2943 }
2944
2945
2946 /**
2947  * Function called by transport to notify us that
2948  * a peer connected to us (on the network level).
2949  *
2950  * @param cls closure
2951  * @param peer the peer that connected
2952  * @param latency current latency of the connection
2953  */
2954 static void
2955 handle_transport_notify_connect (void *cls,
2956                                  const struct GNUNET_PeerIdentity *peer,
2957                                  struct GNUNET_TIME_Relative latency)
2958 {
2959   struct Neighbour *n;
2960   struct GNUNET_TIME_Absolute now;
2961   struct ConnectNotifyMessage cnm;
2962
2963   n = find_neighbour (peer);
2964   if (n != NULL)
2965     {
2966       /* duplicate connect notification!? */
2967       GNUNET_break (0);
2968       return;
2969     }
2970   now = GNUNET_TIME_absolute_get ();
2971   n = GNUNET_malloc (sizeof (struct Neighbour));
2972   n->next = neighbours;
2973   neighbours = n;
2974   neighbour_count++;
2975   n->peer = *peer;
2976   n->last_latency = latency;
2977   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2978   n->encrypt_key_created = now;
2979   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2980   n->last_asw_update = now;
2981   n->last_arw_update = now;
2982   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2983   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2984   n->bpm_out_internal_limit = (uint32_t) - 1;
2985   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2986   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2987                                                 (uint32_t) - 1);
2988 #if DEBUG_CORE
2989   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2990               "Received connection from `%4s'.\n",
2991               GNUNET_i2s (&n->peer));
2992 #endif
2993   schedule_quota_update (n);
2994   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2995   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2996   cnm.reserved = htonl (0);
2997   cnm.peer = *peer;
2998   send_to_all_clients (&cnm.header, GNUNET_YES);
2999 }
3000
3001
3002 /**
3003  * Free the given entry for the neighbour (it has
3004  * already been removed from the list at this point).
3005  *
3006  * @param n neighbour to free
3007  */
3008 static void
3009 free_neighbour (struct Neighbour *n)
3010 {
3011   struct MessageEntry *m;
3012
3013   while (NULL != (m = n->messages))
3014     {
3015       n->messages = m->next;
3016       GNUNET_free (m);
3017     }
3018   while (NULL != (m = n->encrypted_head))
3019     {
3020       n->encrypted_head = m->next;
3021       GNUNET_free (m);
3022     }
3023   if (NULL != n->th)
3024     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3025   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
3026     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3027   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
3028     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3029   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
3030     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3031   GNUNET_free_non_null (n->public_key);
3032   GNUNET_free_non_null (n->pending_ping);
3033   GNUNET_free (n);
3034 }
3035
3036
3037 /**
3038  * Function called by transport telling us that a peer
3039  * disconnected.
3040  *
3041  * @param cls closure
3042  * @param peer the peer that disconnected
3043  */
3044 static void
3045 handle_transport_notify_disconnect (void *cls,
3046                                     const struct GNUNET_PeerIdentity *peer)
3047 {
3048   struct ConnectNotifyMessage cnm;
3049   struct Neighbour *n;
3050   struct Neighbour *p;
3051
3052 #if DEBUG_CORE
3053   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3054               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3055 #endif
3056   p = NULL;
3057   n = neighbours;
3058   while ((n != NULL) &&
3059          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3060     {
3061       p = n;
3062       n = n->next;
3063     }
3064   if (n == NULL)
3065     {
3066       GNUNET_break (0);
3067       return;
3068     }
3069   if (p == NULL)
3070     neighbours = n->next;
3071   else
3072     p->next = n->next;
3073   GNUNET_assert (neighbour_count > 0);
3074   neighbour_count--;
3075   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3076   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3077   cnm.reserved = htonl (0);
3078   cnm.peer = *peer;
3079   send_to_all_clients (&cnm.header, GNUNET_YES);
3080   free_neighbour (n);
3081 }
3082
3083
3084 /**
3085  * Last task run during shutdown.  Disconnects us from
3086  * the transport.
3087  */
3088 static void
3089 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3090 {
3091   struct Neighbour *n;
3092   struct Client *c;
3093
3094 #if DEBUG_CORE
3095   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3096               "Core service shutting down.\n");
3097 #endif
3098   GNUNET_assert (transport != NULL);
3099   GNUNET_TRANSPORT_disconnect (transport);
3100   transport = NULL;
3101   while (NULL != (n = neighbours))
3102     {
3103       neighbours = n->next;
3104       GNUNET_assert (neighbour_count > 0);
3105       neighbour_count--;
3106       free_neighbour (n);
3107     }
3108   while (NULL != (c = clients))
3109     handle_client_disconnect (NULL, c->client_handle);
3110 }
3111
3112
3113 /**
3114  * Initiate core service.
3115  *
3116  * @param cls closure
3117  * @param s scheduler to use
3118  * @param serv the initialized server
3119  * @param c configuration to use
3120  */
3121 static void
3122 run (void *cls,
3123      struct GNUNET_SCHEDULER_Handle *s,
3124      struct GNUNET_SERVER_Handle *serv,
3125      const struct GNUNET_CONFIGURATION_Handle *c)
3126 {
3127 #if 0
3128   unsigned long long qin;
3129   unsigned long long qout;
3130   unsigned long long tneigh;
3131 #endif
3132   char *keyfile;
3133
3134   sched = s;
3135   cfg = c;
3136   /* parse configuration */
3137   if (
3138        (GNUNET_OK !=
3139         GNUNET_CONFIGURATION_get_value_number (c,
3140                                                "CORE",
3141                                                "TOTAL_QUOTA_IN",
3142                                                &bandwidth_target_in)) ||
3143        (GNUNET_OK !=
3144         GNUNET_CONFIGURATION_get_value_number (c,
3145                                                "CORE",
3146                                                "TOTAL_QUOTA_OUT",
3147                                                &bandwidth_target_out)) ||
3148 #if 0
3149        (GNUNET_OK !=
3150         GNUNET_CONFIGURATION_get_value_number (c,
3151                                                "CORE",
3152                                                "YY",
3153                                                &qout)) ||
3154        (GNUNET_OK !=
3155         GNUNET_CONFIGURATION_get_value_number (c,
3156                                                "CORE",
3157                                                "ZZ_LIMIT", &tneigh)) ||
3158 #endif
3159        (GNUNET_OK !=
3160         GNUNET_CONFIGURATION_get_value_filename (c,
3161                                                  "GNUNETD",
3162                                                  "HOSTKEY", &keyfile)))
3163     {
3164       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3165                   _
3166                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3167       GNUNET_SCHEDULER_shutdown (s);
3168       return;
3169     }
3170   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3171   GNUNET_free (keyfile);
3172   if (my_private_key == NULL)
3173     {
3174       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3175                   _("Core service could not access hostkey.  Exiting.\n"));
3176       GNUNET_SCHEDULER_shutdown (s);
3177       return;
3178     }
3179   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3180   GNUNET_CRYPTO_hash (&my_public_key,
3181                       sizeof (my_public_key), &my_identity.hashPubKey);
3182   /* setup notification */
3183   server = serv;
3184   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3185   /* setup transport connection */
3186   transport = GNUNET_TRANSPORT_connect (sched,
3187                                         cfg,
3188                                         NULL,
3189                                         &handle_transport_receive,
3190                                         &handle_transport_notify_connect,
3191                                         &handle_transport_notify_disconnect);
3192   GNUNET_assert (NULL != transport);
3193   GNUNET_SCHEDULER_add_delayed (sched,
3194                                 GNUNET_YES,
3195                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3196                                 GNUNET_SCHEDULER_NO_TASK,
3197                                 GNUNET_TIME_UNIT_FOREVER_REL,
3198                                 &cleaning_task, NULL);
3199   /* process client requests */
3200   GNUNET_SERVER_add_handlers (server, handlers);
3201   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3202               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3203 }
3204
3205
3206 /**
3207  * Function called during shutdown.  Clean up our state.
3208  */
3209 static void
3210 cleanup (void *cls, 
3211          const struct GNUNET_CONFIGURATION_Handle *cfg)
3212 {
3213   if (my_private_key != NULL)
3214     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3215 }
3216
3217
3218 /**
3219  * The main function for the transport service.
3220  *
3221  * @param argc number of arguments from the command line
3222  * @param argv command line arguments
3223  * @return 0 ok, 1 on error
3224  */
3225 int
3226 main (int argc, char *const *argv)
3227 {
3228   return (GNUNET_OK ==
3229           GNUNET_SERVICE_run (argc,
3230                               argv,
3231                               "core", &run, NULL, &cleanup, NULL)) ? 0 : 1;
3232 }
3233
3234 /* end of gnunet-service-core.c */