bugfixes and redesigning scheduler API
[oweals/gnunet.git] / src / core / gnunet-service-core.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core.c
23  * @brief high-level P2P messaging
24  * @author Christian Grothoff
25  *
26  * Considerations for later:
27  * - check that hostkey used by transport (for HELLOs) is the
28  *   same as the hostkey that we are using!
29  * - add code to send PINGs if we are about to time-out otherwise
30  * - optimize lookup (many O(n) list traversals
31  *   could ideally be changed to O(1) hash map lookups)
32  */
33 #include "platform.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_util_lib.h"
36 #include "gnunet_hello_lib.h"
37 #include "gnunet_peerinfo_service.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_transport_service.h"
41 #include "core.h"
42
43
44 /**
45  * Receive and send buffer windows grow over time.  For
46  * how long can 'unused' bandwidth accumulate before we
47  * need to cap it?  (specified in ms).
48  */
49 #define MAX_WINDOW_TIME (5 * 60 * 1000)
50
51 /**
52  * Minimum of bytes per minute (out) to assign to any connected peer.
53  * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
54  * sense.
55  */
56 #define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
57
58 /**
59  * What is the smallest change (in number of bytes per minute)
60  * that we consider significant enough to bother triggering?
61  */
62 #define MIN_BPM_CHANGE 32
63
64 /**
65  * After how much time past the "official" expiration time do
66  * we discard messages?  Should not be zero since we may 
67  * intentionally defer transmission until close to the deadline
68  * and then may be slightly past the deadline due to inaccuracy
69  * in sleep and our own CPU consumption.
70  */
71 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
72
73 /**
74  * What is the maximum delay for a SET_KEY message?
75  */
76 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
77
78 /**
79  * What how long do we wait for SET_KEY confirmation initially?
80  */
81 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
82
83 /**
84  * What is the maximum delay for a PING message?
85  */
86 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
87
88 /**
89  * What is the maximum delay for a PONG message?
90  */
91 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
92
93 /**
94  * How often do we recalculate bandwidth quotas?
95  */
96 #define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
97
98 /**
99  * What is the priority for a SET_KEY message?
100  */
101 #define SET_KEY_PRIORITY 0xFFFFFF
102
103 /**
104  * What is the priority for a PING message?
105  */
106 #define PING_PRIORITY 0xFFFFFF
107
108 /**
109  * What is the priority for a PONG message?
110  */
111 #define PONG_PRIORITY 0xFFFFFF
112
113 /**
114  * How many messages do we queue per peer at most?
115  */
116 #define MAX_PEER_QUEUE_SIZE 16
117
118 /**
119  * How many non-mandatory messages do we queue per client at most?
120  */
121 #define MAX_CLIENT_QUEUE_SIZE 32
122
123 /**
124  * What is the maximum age of a message for us to consider
125  * processing it?  Note that this looks at the timestamp used
126  * by the other peer, so clock skew between machines does
127  * come into play here.  So this should be picked high enough
128  * so that a little bit of clock skew does not prevent peers
129  * from connecting to us.
130  */
131 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
132
133 /**
134  * What is the maximum size for encrypted messages?  Note that this
135  * number imposes a clear limit on the maximum size of any message.
136  * Set to a value close to 64k but not so close that transports will
137  * have trouble with their headers.
138  */
139 #define MAX_ENCRYPTED_MESSAGE_SIZE (63 * 1024)
140
141
142 /**
143  * State machine for our P2P encryption handshake.  Everyone starts in
144  * "DOWN", if we receive the other peer's key (other peer initiated)
145  * we start in state RECEIVED (since we will immediately send our
146  * own); otherwise we start in SENT.  If we get back a PONG from
147  * within either state, we move up to CONFIRMED (the PONG will always
148  * be sent back encrypted with the key we sent to the other peer).
149  */
150 enum PeerStateMachine
151 {
152   PEER_STATE_DOWN,
153   PEER_STATE_KEY_SENT,
154   PEER_STATE_KEY_RECEIVED,
155   PEER_STATE_KEY_CONFIRMED
156 };
157
158
159 /**
160  * Number of bytes (at the beginning) of "struct EncryptedMessage"
161  * that are NOT encrypted.
162  */
163 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
164
165
166 /**
167  * Encapsulation for encrypted messages exchanged between
168  * peers.  Followed by the actual encrypted data.
169  */
170 struct EncryptedMessage
171 {
172   /**
173    * Message type is either CORE_ENCRYPTED_MESSAGE.
174    */
175   struct GNUNET_MessageHeader header;
176
177   /**
178    * Always zero.
179    */
180   uint32_t reserved GNUNET_PACKED;
181
182   /**
183    * Hash of the plaintext, used to verify message integrity;
184    * ALSO used as the IV for the symmetric cipher!  Everything
185    * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
186    * must be set to the offset of the next field.
187    */
188   GNUNET_HashCode plaintext_hash;
189
190   /**
191    * Sequence number, in network byte order.  This field
192    * must be the first encrypted/decrypted field and the
193    * first byte that is hashed for the plaintext hash.
194    */
195   uint32_t sequence_number GNUNET_PACKED;
196
197   /**
198    * Desired bandwidth (how much we should send to this
199    * peer / how much is the sender willing to receive),
200    * in bytes per minute.
201    */
202   uint32_t inbound_bpm_limit GNUNET_PACKED;
203
204   /**
205    * Timestamp.  Used to prevent reply of ancient messages
206    * (recent messages are caught with the sequence number).
207    */
208   struct GNUNET_TIME_AbsoluteNBO timestamp;
209
210 };
211
212 /**
213  * We're sending an (encrypted) PING to the other peer to check if he
214  * can decrypt.  The other peer should respond with a PONG with the
215  * same content, except this time encrypted with the receiver's key.
216  */
217 struct PingMessage
218 {
219   /**
220    * Message type is either CORE_PING or CORE_PONG.
221    */
222   struct GNUNET_MessageHeader header;
223
224   /**
225    * Random number chosen to make reply harder.
226    */
227   uint32_t challenge GNUNET_PACKED;
228
229   /**
230    * Intended target of the PING, used primarily to check
231    * that decryption actually worked.
232    */
233   struct GNUNET_PeerIdentity target;
234 };
235
236
237 /**
238  * Message transmitted to set (or update) a session key.
239  */
240 struct SetKeyMessage
241 {
242
243   /**
244    * Message type is either CORE_SET_KEY.
245    */
246   struct GNUNET_MessageHeader header;
247
248   /**
249    * Status of the sender (should be in "enum PeerStateMachine"), nbo.
250    */
251   int32_t sender_status GNUNET_PACKED;
252
253   /**
254    * Purpose of the signature, will be
255    * GNUNET_SIGNATURE_PURPOSE_SET_KEY.
256    */
257   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
258
259   /**
260    * At what time was this key created?
261    */
262   struct GNUNET_TIME_AbsoluteNBO creation_time;
263
264   /**
265    * The encrypted session key.
266    */
267   struct GNUNET_CRYPTO_RsaEncryptedData encrypted_key;
268
269   /**
270    * Who is the intended recipient?
271    */
272   struct GNUNET_PeerIdentity target;
273
274   /**
275    * Signature of the stuff above (starting at purpose).
276    */
277   struct GNUNET_CRYPTO_RsaSignature signature;
278
279 };
280
281
282 /**
283  * Message waiting for transmission. This struct
284  * is followed by the actual content of the message.
285  */
286 struct MessageEntry
287 {
288
289   /**
290    * We keep messages in a linked list (for now).
291    */
292   struct MessageEntry *next;
293
294   /**
295    * By when are we supposed to transmit this message?
296    */
297   struct GNUNET_TIME_Absolute deadline;
298
299   /**
300    * How important is this message to us?
301    */
302   unsigned int priority;
303
304   /**
305    * How long is the message? (number of bytes following
306    * the "struct MessageEntry", but not including the
307    * size of "struct MessageEntry" itself!)
308    */
309   uint16_t size;
310
311   /**
312    * Was this message selected for transmission in the
313    * current round? GNUNET_YES or GNUNET_NO.
314    */
315   int16_t do_transmit;
316
317 };
318
319
320 struct Neighbour
321 {
322   /**
323    * We keep neighbours in a linked list (for now).
324    */
325   struct Neighbour *next;
326
327   /**
328    * Unencrypted messages destined for this peer.
329    */
330   struct MessageEntry *messages;
331
332   /**
333    * Head of the batched, encrypted message queue (already ordered,
334    * transmit starting with the head).
335    */
336   struct MessageEntry *encrypted_head;
337
338   /**
339    * Tail of the batched, encrypted message queue (already ordered,
340    * append new messages to tail)
341    */
342   struct MessageEntry *encrypted_tail;
343
344   /**
345    * Handle for pending requests for transmission to this peer
346    * with the transport service.  NULL if no request is pending.
347    */
348   struct GNUNET_TRANSPORT_TransmitHandle *th;
349
350   /**
351    * Public key of the neighbour, NULL if we don't have it yet.
352    */
353   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *public_key;
354
355   /**
356    * We received a PING message before we got the "public_key"
357    * (or the SET_KEY).  We keep it here until we have a key
358    * to decrypt it.  NULL if no PING is pending.
359    */
360   struct PingMessage *pending_ping;
361
362   /**
363    * Identity of the neighbour.
364    */
365   struct GNUNET_PeerIdentity peer;
366
367   /**
368    * Key we use to encrypt our messages for the other peer
369    * (initialized by us when we do the handshake).
370    */
371   struct GNUNET_CRYPTO_AesSessionKey encrypt_key;
372
373   /**
374    * Key we use to decrypt messages from the other peer
375    * (given to us by the other peer during the handshake).
376    */
377   struct GNUNET_CRYPTO_AesSessionKey decrypt_key;
378
379   /**
380    * ID of task used for re-trying plaintext scheduling.
381    */
382   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
383
384   /**
385    * ID of task used for re-trying SET_KEY and PING message.
386    */
387   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
388
389   /**
390    * ID of task used for updating bandwidth quota for this neighbour.
391    */
392   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
393
394   /**
395    * At what time did we generate our encryption key?
396    */
397   struct GNUNET_TIME_Absolute encrypt_key_created;
398
399   /**
400    * At what time did the other peer generate the decryption key?
401    */
402   struct GNUNET_TIME_Absolute decrypt_key_created;
403
404   /**
405    * At what time did we initially establish (as in, complete session
406    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
407    */
408   struct GNUNET_TIME_Absolute time_established;
409
410   /**
411    * At what time did we last receive an encrypted message from the
412    * other peer?  Should be zero if status != KEY_CONFIRMED.
413    */
414   struct GNUNET_TIME_Absolute last_activity;
415
416   /**
417    * Last latency observed from this peer.
418    */
419   struct GNUNET_TIME_Relative last_latency;
420
421   /**
422    * At what frequency are we currently re-trying SET_KEY messages?
423    */
424   struct GNUNET_TIME_Relative set_key_retry_frequency;
425
426   /**
427    * Time of our last update to the "available_send_window".
428    */
429   struct GNUNET_TIME_Absolute last_asw_update;
430
431   /**
432    * Time of our last update to the "available_recv_window".
433    */
434   struct GNUNET_TIME_Absolute last_arw_update;
435
436   /**
437    * Number of bytes that we are eligible to transmit to this
438    * peer at this point.  Incremented every minute by max_out_bpm,
439    * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
440    * bandwidth-hogs are sampled at a frequency of about 78s!);
441    * may get negative if we have VERY high priority content.
442    */
443   long long available_send_window; 
444
445   /**
446    * How much downstream capacity of this peer has been reserved for
447    * our traffic?  (Our clients can request that a certain amount of
448    * bandwidth is available for replies to them; this value is used to
449    * make sure that this reserved amount of bandwidth is actually
450    * available).
451    */
452   long long available_recv_window; 
453
454   /**
455    * How valueable were the messages of this peer recently?
456    */
457   unsigned long long current_preference;
458
459   /**
460    * Bit map indicating which of the 32 sequence numbers before the last
461    * were received (good for accepting out-of-order packets and
462    * estimating reliability of the connection)
463    */
464   unsigned int last_packets_bitmap;
465
466   /**
467    * Number of messages in the message queue for this peer.
468    */
469   unsigned int message_queue_size;
470
471   /**
472    * last sequence number received on this connection (highest)
473    */
474   uint32_t last_sequence_number_received;
475
476   /**
477    * last sequence number transmitted
478    */
479   uint32_t last_sequence_number_sent;
480
481   /**
482    * Available bandwidth in for this peer (current target).
483    */
484   uint32_t bpm_in;
485
486   /**
487    * Available bandwidth out for this peer (current target).
488    */
489   uint32_t bpm_out;
490
491   /**
492    * Internal bandwidth limit set for this peer (initially
493    * typically set to "-1").  "bpm_out" is MAX of
494    * "bpm_out_internal_limit" and "bpm_out_external_limit".
495    */
496   uint32_t bpm_out_internal_limit;
497
498   /**
499    * External bandwidth limit set for this peer by the
500    * peer that we are communicating with.  "bpm_out" is MAX of
501    * "bpm_out_internal_limit" and "bpm_out_external_limit".
502    */
503   uint32_t bpm_out_external_limit;
504
505   /**
506    * What was our PING challenge number (for this peer)?
507    */
508   uint32_t ping_challenge;
509
510   /**
511    * What is our connection status?
512    */
513   enum PeerStateMachine status;
514
515 };
516
517
518 /**
519  * Events are messages for clients.  The struct
520  * itself is followed by the actual message.
521  */
522 struct Event
523 {
524   /**
525    * This is a linked list.
526    */
527   struct Event *next;
528
529   /**
530    * Size of the message.
531    */
532   size_t size;
533
534   /**
535    * Could this event be dropped if this queue
536    * is getting too large? (NOT YET USED!)
537    */
538   int can_drop;
539
540 };
541
542
543 /**
544  * Data structure for each client connected to the core service.
545  */
546 struct Client
547 {
548   /**
549    * Clients are kept in a linked list.
550    */
551   struct Client *next;
552
553   /**
554    * Handle for the client with the server API.
555    */
556   struct GNUNET_SERVER_Client *client_handle;
557
558   /**
559    * Linked list of messages we still need to deliver to
560    * this client.
561    */
562   struct Event *event_head;
563
564   /**
565    * Tail of the linked list of events.
566    */
567   struct Event *event_tail;
568
569   /**
570    * Current transmit handle, NULL if no transmission request
571    * is pending.
572    */
573   struct GNUNET_CONNECTION_TransmitHandle *th;
574
575   /**
576    * Array of the types of messages this peer cares
577    * about (with "tcnt" entries).  Allocated as part
578    * of this client struct, do not free!
579    */
580   uint16_t *types;
581
582   /**
583    * Options for messages this client cares about,
584    * see GNUNET_CORE_OPTION_ values.
585    */
586   uint32_t options;
587
588   /**
589    * Number of types of incoming messages this client
590    * specifically cares about.  Size of the "types" array.
591    */
592   unsigned int tcnt;
593
594 };
595
596
597 /**
598  * Our public key.
599  */
600 static struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded my_public_key;
601
602 /**
603  * Our identity.
604  */
605 static struct GNUNET_PeerIdentity my_identity;
606
607 /**
608  * Our private key.
609  */
610 static struct GNUNET_CRYPTO_RsaPrivateKey *my_private_key;
611
612 /**
613  * Our scheduler.
614  */
615 struct GNUNET_SCHEDULER_Handle *sched;
616
617 /**
618  * Our configuration.
619  */
620 const struct GNUNET_CONFIGURATION_Handle *cfg;
621
622 /**
623  * Our server.
624  */
625 static struct GNUNET_SERVER_Handle *server;
626
627 /**
628  * Transport service.
629  */
630 static struct GNUNET_TRANSPORT_Handle *transport;
631
632 /**
633  * Linked list of our clients.
634  */
635 static struct Client *clients;
636
637 /**
638  * We keep neighbours in a linked list (for now).
639  */
640 static struct Neighbour *neighbours;
641
642 /**
643  * Sum of all preferences among all neighbours.
644  */
645 static unsigned long long preference_sum;
646
647 /**
648  * Total number of neighbours we have.
649  */
650 static unsigned int neighbour_count;
651
652 /**
653  * How much inbound bandwidth are we supposed to be using?
654  */
655 static unsigned long long bandwidth_target_in;
656
657 /**
658  * How much outbound bandwidth are we supposed to be using?
659  */
660 static unsigned long long bandwidth_target_out;
661
662
663
664 /**
665  * A preference value for a neighbour was update.  Update
666  * the preference sum accordingly.
667  *
668  * @param inc how much was a preference value increased?
669  */
670 static void
671 update_preference_sum (unsigned long long inc)
672 {
673   struct Neighbour *n;
674   unsigned long long os;
675
676   os = preference_sum;
677   preference_sum += inc;
678   if (preference_sum >= os)
679     return; /* done! */
680   /* overflow! compensate by cutting all values in half! */
681   preference_sum = 0;
682   n = neighbours;
683   while (n != NULL)
684     {
685       n->current_preference /= 2;
686       preference_sum += n->current_preference;
687       n = n->next;
688     }    
689 }
690
691
692 /**
693  * Recalculate the number of bytes we expect to
694  * receive or transmit in a given window.
695  *
696  * @param force force an update now (even if not much time has passed)
697  * @param window pointer to the byte counter (updated)
698  * @param ts pointer to the timestamp (updated)
699  * @param bpm number of bytes per minute that should
700  *        be added to the window.
701  */
702 static void
703 update_window (int force,
704                long long *window,
705                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
706 {
707   struct GNUNET_TIME_Relative since;
708
709   since = GNUNET_TIME_absolute_get_duration (*ts);
710   if ( (force == GNUNET_NO) &&
711        (since.value < 60 * 1000) )
712     return;                     /* not even a minute has passed */
713   *ts = GNUNET_TIME_absolute_get ();
714   *window += (bpm * since.value) / 60 / 1000;
715   if (*window > MAX_WINDOW_TIME * bpm)
716     *window = MAX_WINDOW_TIME * bpm;
717 }
718
719
720 /**
721  * Find the entry for the given neighbour.
722  *
723  * @param peer identity of the neighbour
724  * @return NULL if we are not connected, otherwise the
725  *         neighbour's entry.
726  */
727 static struct Neighbour *
728 find_neighbour (const struct GNUNET_PeerIdentity *peer)
729 {
730   struct Neighbour *ret;
731
732   ret = neighbours;
733   while ((ret != NULL) &&
734          (0 != memcmp (&ret->peer,
735                        peer, sizeof (struct GNUNET_PeerIdentity))))
736     ret = ret->next;
737   return ret;
738 }
739
740
741 /**
742  * Find the entry for the given client.
743  *
744  * @param client handle for the client
745  * @return NULL if we are not connected, otherwise the
746  *         client's struct.
747  */
748 static struct Client *
749 find_client (const struct GNUNET_SERVER_Client *client)
750 {
751   struct Client *ret;
752
753   ret = clients;
754   while ((ret != NULL) && (client != ret->client_handle))
755     ret = ret->next;
756   return ret;
757 }
758
759
760 /**
761  * If necessary, initiate a request with the server to
762  * transmit messages from the queue of the given client.
763  * @param client who to transfer messages to
764  */
765 static void request_transmit (struct Client *client);
766
767
768 /**
769  * Client is ready to receive data, provide it.
770  *
771  * @param cls closure
772  * @param size number of bytes available in buf
773  * @param buf where the callee should write the message
774  * @return number of bytes written to buf
775  */
776 static size_t
777 do_client_transmit (void *cls, size_t size, void *buf)
778 {
779   struct Client *client = cls;
780   struct Event *e;
781   char *tgt;
782   size_t ret;
783
784   client->th = NULL;
785 #if DEBUG_CORE_CLIENT
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "Client ready to receive %u bytes.\n", size);
788 #endif
789   if (buf == NULL)
790     {
791 #if DEBUG_CORE
792       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
793                   "Failed to transmit data to client (disconnect)?\n");
794 #endif
795       return 0;                 /* we'll surely get a disconnect soon... */
796     }
797   tgt = buf;
798   ret = 0;
799   while ((NULL != (e = client->event_head)) && (e->size <= size))
800     {
801       memcpy (&tgt[ret], &e[1], e->size);
802       size -= e->size;
803       ret += e->size;
804       client->event_head = e->next;
805       GNUNET_free (e);
806     }
807   GNUNET_assert (ret > 0);
808   if (client->event_head == NULL)
809     client->event_tail = NULL;
810 #if DEBUG_CORE_CLIENT
811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812               "Transmitting %u bytes to client\n", ret);
813 #endif
814   request_transmit (client);
815   return ret;
816 }
817
818
819 /**
820  * If necessary, initiate a request with the server to
821  * transmit messages from the queue of the given client.
822  * @param client who to transfer messages to
823  */
824 static void
825 request_transmit (struct Client *client)
826 {
827
828   if (NULL != client->th)
829     return;                     /* already pending */
830   if (NULL == client->event_head)
831     return;                     /* no more events pending */
832 #if DEBUG_CORE_CLIENT
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834               "Asking server to transmit %u bytes to client\n",
835               client->event_head->size);
836 #endif
837   client->th
838     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
839                                            client->event_head->size,
840                                            GNUNET_TIME_UNIT_FOREVER_REL,
841                                            &do_client_transmit, client);
842 }
843
844
845 /**
846  * Send a message to one of our clients.
847  * @param client target for the message
848  * @param msg message to transmit
849  * @param can_drop could this message be dropped if the
850  *        client's queue is getting too large?
851  */
852 static void
853 send_to_client (struct Client *client,
854                 const struct GNUNET_MessageHeader *msg, int can_drop)
855 {
856   struct Event *e;
857   unsigned int queue_size;
858   uint16_t msize;
859
860 #if DEBUG_CORE_CLIENT
861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862               "Preparing to send message of type %u to client.\n",
863               ntohs (msg->type));
864 #endif
865   queue_size = 0;
866   e = client->event_head;
867   while (e != NULL)
868     {
869       queue_size++;
870       e = e->next;
871     }
872   if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
873        (can_drop == GNUNET_YES) )
874     {
875 #if DEBUG_CORE
876       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
877                   "Too many messages in queue for the client, dropping the new message.\n");
878 #endif
879       return;
880     }
881
882   msize = ntohs (msg->size);
883   e = GNUNET_malloc (sizeof (struct Event) + msize);
884   /* append */
885   if (client->event_tail != NULL)
886     client->event_tail->next = e;
887   else
888     client->event_head = e;
889   client->event_tail = e;
890   e->can_drop = can_drop;
891   e->size = msize;
892   memcpy (&e[1], msg, msize);
893   request_transmit (client);
894 }
895
896
897 /**
898  * Send a message to all of our current clients.
899  */
900 static void
901 send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
902 {
903   struct Client *c;
904
905   c = clients;
906   while (c != NULL)
907     {
908       send_to_client (c, msg, can_drop);
909       c = c->next;
910     }
911 }
912
913
914 /**
915  * Handle CORE_INIT request.
916  */
917 static void
918 handle_client_init (void *cls,
919                     struct GNUNET_SERVER_Client *client,
920                     const struct GNUNET_MessageHeader *message)
921 {
922   const struct InitMessage *im;
923   struct InitReplyMessage irm;
924   struct Client *c;
925   uint16_t msize;
926   const uint16_t *types;
927   struct Neighbour *n;
928   struct ConnectNotifyMessage cnm;
929
930 #if DEBUG_CORE_CLIENT
931   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
932               "Client connecting to core service with `%s' message\n",
933               "INIT");
934 #endif
935   /* check that we don't have an entry already */
936   c = clients;
937   while (c != NULL)
938     {
939       if (client == c->client_handle)
940         {
941           GNUNET_break (0);
942           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
943           return;
944         }
945       c = c->next;
946     }
947   msize = ntohs (message->size);
948   if (msize < sizeof (struct InitMessage))
949     {
950       GNUNET_break (0);
951       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
952       return;
953     }
954   im = (const struct InitMessage *) message;
955   types = (const uint16_t *) &im[1];
956   msize -= sizeof (struct InitMessage);
957   c = GNUNET_malloc (sizeof (struct Client) + msize);
958   c->client_handle = client;
959   c->next = clients;
960   clients = c;
961   memcpy (&c[1], types, msize);
962   c->types = (uint16_t *) & c[1];
963   c->options = ntohl (im->options);
964   c->tcnt = msize / sizeof (uint16_t);
965   /* send init reply message */
966   irm.header.size = htons (sizeof (struct InitReplyMessage));
967   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
968   irm.reserved = htonl (0);
969   memcpy (&irm.publicKey,
970           &my_public_key,
971           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
972 #if DEBUG_CORE_CLIENT
973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974               "Sending `%s' message to client.\n", "INIT_REPLY");
975 #endif
976   send_to_client (c, &irm.header, GNUNET_NO);
977   /* notify new client about existing neighbours */
978   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
979   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
980   n = neighbours;
981   while (n != NULL)
982     {
983 #if DEBUG_CORE_CLIENT
984       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
986 #endif
987       cnm.reserved = htonl (0);
988       cnm.peer = n->peer;
989       send_to_client (c, &cnm.header, GNUNET_NO);
990       n = n->next;
991     }
992   GNUNET_SERVER_receive_done (client, GNUNET_OK);
993 }
994
995
996 /**
997  * A client disconnected, clean up.
998  *
999  * @param cls closure
1000  * @param client identification of the client
1001  */
1002 static void
1003 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1004 {
1005   struct Client *pos;
1006   struct Client *prev;
1007   struct Event *e;
1008
1009 #if DEBUG_CORE_CLIENT
1010   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1011               "Client has disconnected from core service.\n");
1012 #endif
1013   prev = NULL;
1014   pos = clients;
1015   while (pos != NULL)
1016     {
1017       if (client == pos->client_handle)
1018         {
1019           if (prev == NULL)
1020             clients = pos->next;
1021           else
1022             prev->next = pos->next;
1023           if (pos->th != NULL)
1024             GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1025           while (NULL != (e = pos->event_head))
1026             {
1027               pos->event_head = e->next;
1028               GNUNET_free (e);
1029             }
1030           GNUNET_free (pos);
1031           return;
1032         }
1033       prev = pos;
1034       pos = pos->next;
1035     }
1036   /* client never sent INIT */
1037 }
1038
1039
1040 /**
1041  * Handle REQUEST_CONFIGURE request.
1042  */
1043 static void
1044 handle_client_request_configure (void *cls,
1045                                  struct GNUNET_SERVER_Client *client,
1046                                  const struct GNUNET_MessageHeader *message)
1047 {
1048   const struct RequestConfigureMessage *rcm;
1049   struct Neighbour *n;
1050   struct ConfigurationInfoMessage cim;
1051   struct Client *c;
1052   int reserv;
1053   unsigned long long old_preference;
1054
1055 #if DEBUG_CORE_CLIENT
1056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057               "Core service receives `%s' request.\n", "CONFIGURE");
1058 #endif
1059   rcm = (const struct RequestConfigureMessage *) message;
1060   n = find_neighbour (&rcm->peer);
1061   memset (&cim, 0, sizeof (cim));
1062   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
1063     {
1064       update_window (GNUNET_YES,
1065                      &n->available_send_window,
1066                      &n->last_asw_update,
1067                      n->bpm_out);
1068       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
1069       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
1070                                n->bpm_out_external_limit);
1071       reserv = ntohl (rcm->reserve_inbound);
1072       if (reserv < 0)
1073         {
1074           n->available_recv_window += reserv;
1075         }
1076       else if (reserv > 0)
1077         {
1078           update_window (GNUNET_NO,
1079                          &n->available_recv_window,
1080                          &n->last_arw_update, n->bpm_in);
1081           if (n->available_recv_window < reserv)
1082             reserv = n->available_recv_window;
1083           n->available_recv_window -= reserv;
1084         }
1085       old_preference = n->current_preference;
1086       n->current_preference += GNUNET_ntohll(rcm->preference_change);
1087       if (old_preference > n->current_preference) 
1088         {
1089           /* overflow; cap at maximum value */
1090           n->current_preference = (unsigned long long) -1;
1091         }
1092       update_preference_sum (n->current_preference - old_preference);
1093       cim.reserved_amount = htonl (reserv);
1094       cim.bpm_in = htonl (n->bpm_in);
1095       cim.bpm_out = htonl (n->bpm_out);
1096       cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
1097       cim.preference = n->current_preference;
1098     }
1099   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1100   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1101   cim.peer = rcm->peer;
1102   c = find_client (client);
1103   if (c == NULL)
1104     {
1105       GNUNET_break (0);
1106       return;
1107     }
1108 #if DEBUG_CORE_CLIENT
1109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
1111 #endif
1112   send_to_client (c, &cim.header, GNUNET_NO);
1113 }
1114
1115
1116 /**
1117  * Check if we have encrypted messages for the specified neighbour
1118  * pending, and if so, check with the transport about sending them
1119  * out.
1120  *
1121  * @param n neighbour to check.
1122  */
1123 static void process_encrypted_neighbour_queue (struct Neighbour *n);
1124
1125
1126 /**
1127  * Function called when the transport service is ready to
1128  * receive an encrypted message for the respective peer
1129  *
1130  * @param cls neighbour to use message from
1131  * @param size number of bytes we can transmit
1132  * @param buf where to copy the message
1133  * @return number of bytes transmitted
1134  */
1135 static size_t
1136 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
1137 {
1138   struct Neighbour *n = cls;
1139   struct MessageEntry *m;
1140   size_t ret;
1141   char *cbuf;
1142
1143   n->th = NULL;
1144   GNUNET_assert (NULL != (m = n->encrypted_head));
1145   n->encrypted_head = m->next;
1146   if (m->next == NULL)
1147     n->encrypted_tail = NULL;
1148   ret = 0;
1149   cbuf = buf;
1150   if (buf != NULL)
1151     {
1152       GNUNET_assert (size >= m->size);
1153       memcpy (cbuf, &m[1], m->size);
1154       ret = m->size;
1155       n->available_send_window -= m->size;
1156       process_encrypted_neighbour_queue (n);
1157 #if DEBUG_CORE
1158       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
1160                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1161                   ret, GNUNET_i2s (&n->peer));
1162 #endif
1163     }
1164   else
1165     {
1166       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1167                   "Transmission for message of type %u and size %u failed\n",
1168                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
1169                   m->size);
1170     }
1171   GNUNET_free (m);
1172   return ret;
1173 }
1174
1175
1176 /**
1177  * Check if we have plaintext messages for the specified neighbour
1178  * pending, and if so, consider batching and encrypting them (and
1179  * then trigger processing of the encrypted queue if needed).
1180  *
1181  * @param n neighbour to check.
1182  */
1183 static void process_plaintext_neighbour_queue (struct Neighbour *n);
1184
1185
1186 /**
1187  * Check if we have encrypted messages for the specified neighbour
1188  * pending, and if so, check with the transport about sending them
1189  * out.
1190  *
1191  * @param n neighbour to check.
1192  */
1193 static void
1194 process_encrypted_neighbour_queue (struct Neighbour *n)
1195 {
1196   struct MessageEntry *m;
1197  
1198   if (n->th != NULL)
1199     return;  /* request already pending */
1200   if (n->encrypted_head == NULL)
1201     {
1202       /* encrypted queue empty, try plaintext instead */
1203       process_plaintext_neighbour_queue (n);
1204       return;
1205     }
1206 #if DEBUG_CORE
1207   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1209               n->encrypted_head->size,
1210               GNUNET_i2s (&n->peer),
1211               GNUNET_TIME_absolute_get_remaining (n->
1212                                                   encrypted_head->deadline).
1213               value);
1214 #endif
1215   n->th =
1216     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
1217                                             n->encrypted_head->size,
1218                                             n->encrypted_head->priority,
1219                                             GNUNET_TIME_absolute_get_remaining
1220                                             (n->encrypted_head->deadline),
1221                                             &notify_encrypted_transmit_ready,
1222                                             n);
1223   if (n->th == NULL)
1224     {
1225       /* message request too large (oops) */
1226       GNUNET_break (0);
1227       /* discard encrypted message */
1228       GNUNET_assert (NULL != (m = n->encrypted_head));
1229       n->encrypted_head = m->next;
1230       if (m->next == NULL)
1231         n->encrypted_tail = NULL;
1232       GNUNET_free (m);
1233       process_encrypted_neighbour_queue (n);
1234     }
1235 }
1236
1237
1238 /**
1239  * Decrypt size bytes from in and write the result to out.  Use the
1240  * key for inbound traffic of the given neighbour.  This function does
1241  * NOT do any integrity-checks on the result.
1242  *
1243  * @param n neighbour we are receiving from
1244  * @param iv initialization vector to use
1245  * @param in ciphertext
1246  * @param out plaintext
1247  * @param size size of in/out
1248  * @return GNUNET_OK on success
1249  */
1250 static int
1251 do_decrypt (struct Neighbour *n,
1252             const GNUNET_HashCode * iv,
1253             const void *in, void *out, size_t size)
1254 {
1255   if (size != (uint16_t) size)
1256     {
1257       GNUNET_break (0);
1258       return GNUNET_NO;
1259     }
1260   if ((n->status != PEER_STATE_KEY_RECEIVED) &&
1261       (n->status != PEER_STATE_KEY_CONFIRMED))
1262     {
1263       GNUNET_break_op (0);
1264       return GNUNET_SYSERR;
1265     }
1266   if (size !=
1267       GNUNET_CRYPTO_aes_decrypt (in,
1268                                  (uint16_t) size,
1269                                  &n->decrypt_key,
1270                                  (const struct
1271                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
1272                                  out))
1273     {
1274       GNUNET_break (0);
1275       return GNUNET_SYSERR;
1276     }
1277 #if DEBUG_CORE
1278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1279               "Decrypted %u bytes from `%4s' using key %u\n",
1280               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
1281 #endif
1282   return GNUNET_OK;
1283 }
1284
1285
1286 /**
1287  * Encrypt size bytes from in and write the result to out.  Use the
1288  * key for outbound traffic of the given neighbour.
1289  *
1290  * @param n neighbour we are sending to
1291  * @param iv initialization vector to use
1292  * @param in ciphertext
1293  * @param out plaintext
1294  * @param size size of in/out
1295  * @return GNUNET_OK on success
1296  */
1297 static int
1298 do_encrypt (struct Neighbour *n,
1299             const GNUNET_HashCode * iv,
1300             const void *in, void *out, size_t size)
1301 {
1302   if (size != (uint16_t) size)
1303     {
1304       GNUNET_break (0);
1305       return GNUNET_NO;
1306     }
1307   GNUNET_assert (size ==
1308                  GNUNET_CRYPTO_aes_encrypt (in,
1309                                             (uint16_t) size,
1310                                             &n->encrypt_key,
1311                                             (const struct
1312                                              GNUNET_CRYPTO_AesInitializationVector
1313                                              *) iv, out));
1314 #if DEBUG_CORE
1315   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1316               "Encrypted %u bytes for `%4s' using key %u\n", size,
1317               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
1318 #endif
1319   return GNUNET_OK;
1320 }
1321
1322
1323 /**
1324  * Select messages for transmission.  This heuristic uses a combination
1325  * of earliest deadline first (EDF) scheduling (with bounded horizon)
1326  * and priority-based discard (in case no feasible schedule exist) and
1327  * speculative optimization (defer any kind of transmission until
1328  * we either create a batch of significant size, 25% of max, or until
1329  * we are close to a deadline).  Furthermore, when scheduling the
1330  * heuristic also packs as many messages into the batch as possible,
1331  * starting with those with the earliest deadline.  Yes, this is fun.
1332  *
1333  * @param n neighbour to select messages from
1334  * @param size number of bytes to select for transmission
1335  * @param retry_time set to the time when we should try again
1336  *        (only valid if this function returns zero)
1337  * @return number of bytes selected, or 0 if we decided to
1338  *         defer scheduling overall; in that case, retry_time is set.
1339  */
1340 static size_t
1341 select_messages (struct Neighbour *n,
1342                  size_t size, struct GNUNET_TIME_Relative *retry_time)
1343 {
1344   struct MessageEntry *pos;
1345   struct MessageEntry *min;
1346   struct MessageEntry *last;
1347   unsigned int min_prio;
1348   struct GNUNET_TIME_Absolute t;
1349   struct GNUNET_TIME_Absolute now;
1350   uint64_t delta;
1351   uint64_t avail;
1352   unsigned long long slack;     /* how long could we wait before missing deadlines? */
1353   size_t off;
1354   int discard_low_prio;
1355
1356   GNUNET_assert (NULL != n->messages);
1357   now = GNUNET_TIME_absolute_get ();
1358   /* last entry in linked list of messages processed */
1359   last = NULL;
1360   /* should we remove the entry with the lowest
1361      priority from consideration for scheduling at the
1362      end of the loop? */
1363   discard_low_prio = GNUNET_YES;
1364   while (GNUNET_YES == discard_low_prio)
1365     {
1366       min = NULL;
1367       min_prio = -1;
1368       discard_low_prio = GNUNET_NO;
1369       /* calculate number of bytes available for transmission at time "t" */
1370       update_window (GNUNET_NO,
1371                      &n->available_send_window,
1372                      &n->last_asw_update,
1373                      n->bpm_out);
1374       avail = n->available_send_window;
1375       t = n->last_asw_update;
1376       /* how many bytes have we (hypothetically) scheduled so far */
1377       off = 0;
1378       /* maximum time we can wait before transmitting anything
1379          and still make all of our deadlines */
1380       slack = -1;
1381
1382       pos = n->messages;
1383       /* note that we use "*2" here because we want to look
1384          a bit further into the future; much more makes no
1385          sense since new message might be scheduled in the
1386          meantime... */
1387       while ((pos != NULL) && (off < size * 2))
1388         {
1389           if (pos->do_transmit == GNUNET_YES)
1390             {
1391               /* already removed from consideration */
1392               pos = pos->next;
1393               continue;
1394             }
1395           if (discard_low_prio == GNUNET_NO)
1396             {
1397               delta = pos->deadline.value;
1398               if (delta < t.value)
1399                 delta = 0;
1400               else
1401                 delta = t.value - delta;
1402               avail += delta * n->bpm_out / 1000 / 60;
1403               if (avail < pos->size)
1404                 {
1405                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
1406                 }
1407               else
1408                 {
1409                   avail -= pos->size;
1410                   /* update slack, considering both its absolute deadline
1411                      and relative deadlines caused by other messages
1412                      with their respective load */
1413                   slack = GNUNET_MIN (slack, avail / n->bpm_out);
1414                   if (pos->deadline.value < now.value)
1415                     slack = 0;
1416                   else
1417                     slack =
1418                       GNUNET_MIN (slack, pos->deadline.value - now.value);
1419                 }
1420             }
1421           off += pos->size;
1422           t.value = GNUNET_MAX (pos->deadline.value, t.value);
1423           if (pos->priority <= min_prio)
1424             {
1425               /* update min for discard */
1426               min_prio = pos->priority;
1427               min = pos;
1428             }
1429           pos = pos->next;
1430         }
1431       if (discard_low_prio)
1432         {
1433           GNUNET_assert (min != NULL);
1434           /* remove lowest-priority entry from consideration */
1435           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
1436         }
1437       last = pos;
1438     }
1439   /* guard against sending "tiny" messages with large headers without
1440      urgent deadlines */
1441   if ( (slack > 1000) && (size > 4 * off) )
1442     {
1443       /* less than 25% of message would be filled with
1444          deadlines still being met if we delay by one
1445          second or more; so just wait for more data */
1446       retry_time->value = slack / 2;
1447       /* reset do_transmit values for next time */
1448       while (pos != last)
1449         {
1450           pos->do_transmit = GNUNET_NO;
1451           pos = pos->next;
1452         }
1453       return 0;
1454     }
1455   /* select marked messages (up to size) for transmission */
1456   off = 0;
1457   pos = n->messages;
1458   while (pos != last)
1459     {
1460       if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
1461         {
1462           pos->do_transmit = GNUNET_YES;        /* mark for transmission */
1463           off += pos->size;
1464           size -= pos->size;
1465         }
1466       else
1467         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
1468       pos = pos->next;
1469     }
1470 #if DEBUG_CORE
1471   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
1473               off, GNUNET_i2s (&n->peer));
1474 #endif
1475   return off;
1476 }
1477
1478
1479 /**
1480  * Batch multiple messages into a larger buffer.
1481  *
1482  * @param n neighbour to take messages from
1483  * @param buf target buffer
1484  * @param size size of buf
1485  * @param deadline set to transmission deadline for the result
1486  * @param retry_time set to the time when we should try again
1487  *        (only valid if this function returns zero)
1488  * @param priority set to the priority of the batch
1489  * @return number of bytes written to buf (can be zero)
1490  */
1491 static size_t
1492 batch_message (struct Neighbour *n,
1493                char *buf,
1494                size_t size,
1495                struct GNUNET_TIME_Absolute *deadline,
1496                struct GNUNET_TIME_Relative *retry_time,
1497                unsigned int *priority)
1498 {
1499   struct MessageEntry *pos;
1500   struct MessageEntry *prev;
1501   struct MessageEntry *next;
1502   size_t ret;
1503
1504   ret = 0;
1505   *priority = 0;
1506   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1507   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
1508   if (0 == select_messages (n, size, retry_time))
1509     {
1510       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1511                   "No messages selected, will try again in %llu ms\n",
1512                   retry_time->value);
1513       return 0;
1514     }
1515   pos = n->messages;
1516   prev = NULL;
1517   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
1518     {
1519       next = pos->next;
1520       if (GNUNET_YES == pos->do_transmit)
1521         {
1522           GNUNET_assert (pos->size <= size);
1523           memcpy (&buf[ret], &pos[1], pos->size);
1524           ret += pos->size;
1525           size -= pos->size;
1526           *priority += pos->priority;
1527           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
1528           GNUNET_free (pos);
1529           if (prev == NULL)
1530             n->messages = next;
1531           else
1532             prev->next = next;
1533         }
1534       else
1535         {
1536           prev = pos;
1537         }
1538       pos = next;
1539     }
1540   return ret;
1541 }
1542
1543
1544 /**
1545  * Remove messages with deadlines that have long expired from
1546  * the queue.
1547  *
1548  * @param n neighbour to inspect
1549  */
1550 static void
1551 discard_expired_messages (struct Neighbour *n)
1552 {
1553   struct MessageEntry *prev;
1554   struct MessageEntry *next;
1555   struct MessageEntry *pos;
1556   struct GNUNET_TIME_Absolute now;
1557   struct GNUNET_TIME_Relative delta;
1558
1559   now = GNUNET_TIME_absolute_get ();
1560   prev = NULL;
1561   pos = n->messages;
1562   while (pos != NULL) 
1563     {
1564       next = pos->next;
1565       delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
1566       if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
1567         {
1568 #if DEBUG_CORE
1569           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1570                       "Message is %llu ms past due, discarding.\n",
1571                       delta.value);
1572 #endif
1573           if (prev == NULL)
1574             n->messages = next;
1575           else
1576             prev->next = next;
1577           GNUNET_free (pos);
1578         }
1579       else
1580         prev = pos;
1581       pos = next;
1582     }
1583 }
1584
1585
1586 /**
1587  * Signature of the main function of a task.
1588  *
1589  * @param cls closure
1590  * @param tc context information (why was this task triggered now)
1591  */
1592 static void
1593 retry_plaintext_processing (void *cls,
1594                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1595 {
1596   struct Neighbour *n = cls;
1597
1598   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1599   process_plaintext_neighbour_queue (n);
1600 }
1601
1602
1603 /**
1604  * Send our key (and encrypted PING) to the other peer.
1605  *
1606  * @param n the other peer
1607  */
1608 static void send_key (struct Neighbour *n);
1609
1610
1611 /**
1612  * Check if we have plaintext messages for the specified neighbour
1613  * pending, and if so, consider batching and encrypting them (and
1614  * then trigger processing of the encrypted queue if needed).
1615  *
1616  * @param n neighbour to check.
1617  */
1618 static void
1619 process_plaintext_neighbour_queue (struct Neighbour *n)
1620 {
1621   char pbuf[MAX_ENCRYPTED_MESSAGE_SIZE];        /* plaintext */
1622   size_t used;
1623   size_t esize;
1624   struct EncryptedMessage *em;  /* encrypted message */
1625   struct EncryptedMessage *ph;  /* plaintext header */
1626   struct MessageEntry *me;
1627   unsigned int priority;
1628   struct GNUNET_TIME_Absolute deadline;
1629   struct GNUNET_TIME_Relative retry_time;
1630
1631   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
1632     {
1633       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
1634       n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
1635     }
1636   switch (n->status)
1637     {
1638     case PEER_STATE_DOWN:
1639       send_key (n);
1640 #if DEBUG_CORE
1641       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1642                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1643                   GNUNET_i2s(&n->peer));
1644 #endif
1645       return;
1646     case PEER_STATE_KEY_SENT:
1647       GNUNET_assert (n->retry_set_key_task !=
1648                      GNUNET_SCHEDULER_NO_TASK);
1649 #if DEBUG_CORE
1650       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1651                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1652                   GNUNET_i2s(&n->peer));
1653 #endif
1654       return;
1655     case PEER_STATE_KEY_RECEIVED:
1656       GNUNET_assert (n->retry_set_key_task !=
1657                      GNUNET_SCHEDULER_NO_TASK);
1658 #if DEBUG_CORE
1659       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1660                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1661                   GNUNET_i2s(&n->peer));
1662 #endif
1663       return;
1664     case PEER_STATE_KEY_CONFIRMED:
1665       /* ready to continue */
1666       break;
1667     }
1668   discard_expired_messages (n);
1669   if (n->messages == NULL)
1670     {
1671 #if DEBUG_CORE
1672       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673                   "Plaintext message queue for `%4s' is empty.\n",
1674                   GNUNET_i2s(&n->peer));
1675 #endif
1676       return;                   /* no pending messages */
1677     }
1678   if (n->encrypted_head != NULL)
1679     {
1680 #if DEBUG_CORE
1681       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1682                   "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1683                   GNUNET_i2s(&n->peer));
1684 #endif
1685       return;                   /* wait for messages already encrypted to be
1686                                    processed first! */
1687     }
1688   ph = (struct EncryptedMessage *) pbuf;
1689   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1690   priority = 0;
1691   used = sizeof (struct EncryptedMessage);
1692   used += batch_message (n,
1693                          &pbuf[used],
1694                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
1695                          &deadline, &retry_time, &priority);
1696   if (used == sizeof (struct EncryptedMessage))
1697     {
1698 #if DEBUG_CORE
1699       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1700                   "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1701                   GNUNET_i2s(&n->peer));
1702 #endif
1703       /* no messages selected for sending, try again later... */
1704       n->retry_plaintext_task =
1705         GNUNET_SCHEDULER_add_delayed (sched,
1706                                       retry_time,
1707                                       &retry_plaintext_processing, n);
1708       return;
1709     }
1710
1711   ph->sequence_number = htonl (++n->last_sequence_number_sent);
1712   ph->inbound_bpm_limit = htonl (n->bpm_in);
1713   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1714
1715   /* setup encryption message header */
1716   me = GNUNET_malloc (sizeof (struct MessageEntry) + used);
1717   me->deadline = deadline;
1718   me->priority = priority;
1719   me->size = used;
1720   em = (struct EncryptedMessage *) &me[1];
1721   em->header.size = htons (used);
1722   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
1723   em->reserved = htonl (0);
1724   esize = used - ENCRYPTED_HEADER_SIZE;
1725   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
1726   /* encrypt */
1727 #if DEBUG_CORE
1728   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1729               "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
1730               esize,
1731               GNUNET_i2s(&n->peer));
1732 #endif
1733   GNUNET_assert (GNUNET_OK ==
1734                  do_encrypt (n,
1735                              &em->plaintext_hash,
1736                              &ph->sequence_number,
1737                              &em->sequence_number, esize));
1738   /* append to transmission list */
1739   if (n->encrypted_tail == NULL)
1740     n->encrypted_head = me;
1741   else
1742     n->encrypted_tail->next = me;
1743   n->encrypted_tail = me;
1744   process_encrypted_neighbour_queue (n);
1745 }
1746
1747
1748 /**
1749  * Handle CORE_SEND request.
1750  *
1751  * @param cls unused
1752  * @param client the client issuing the request
1753  * @param message the "struct SendMessage"
1754  */
1755 static void
1756 handle_client_send (void *cls,
1757                     struct GNUNET_SERVER_Client *client,
1758                     const struct GNUNET_MessageHeader *message);
1759
1760
1761 /**
1762  * Function called to notify us that we either succeeded
1763  * or failed to connect (at the transport level) to another
1764  * peer.  We should either free the message we were asked
1765  * to transmit or re-try adding it to the queue.
1766  *
1767  * @param cls closure
1768  * @param size number of bytes available in buf
1769  * @param buf where the callee should write the message
1770  * @return number of bytes written to buf
1771  */
1772 static size_t
1773 send_connect_continuation (void *cls, size_t size, void *buf)
1774 {
1775   struct SendMessage *sm = cls;
1776
1777   if (buf == NULL)
1778     {
1779 #if DEBUG_CORE
1780       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1781                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
1782                   GNUNET_i2s (&sm->peer));
1783 #endif
1784       GNUNET_free (sm);
1785       return 0;
1786     }
1787 #if DEBUG_CORE
1788   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1789               "Connection to peer `%4s' succeeded, retrying original transmission request\n",
1790               GNUNET_i2s (&sm->peer));
1791 #endif
1792   handle_client_send (NULL, NULL, &sm->header);
1793   GNUNET_free (sm);
1794   return 0;
1795 }
1796
1797
1798 /**
1799  * Handle CORE_SEND request.
1800  *
1801  * @param cls unused
1802  * @param client the client issuing the request
1803  * @param message the "struct SendMessage"
1804  */
1805 static void
1806 handle_client_send (void *cls,
1807                     struct GNUNET_SERVER_Client *client,
1808                     const struct GNUNET_MessageHeader *message)
1809 {
1810   const struct SendMessage *sm;
1811   struct SendMessage *smc;
1812   const struct GNUNET_MessageHeader *mh;
1813   struct Neighbour *n;
1814   struct MessageEntry *prev;
1815   struct MessageEntry *pos;
1816   struct MessageEntry *e; 
1817   struct MessageEntry *min_prio_entry;
1818   struct MessageEntry *min_prio_prev;
1819   unsigned int min_prio;
1820   unsigned int queue_size;
1821   uint16_t msize;
1822
1823   msize = ntohs (message->size);
1824   if (msize <
1825       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
1826     {
1827       GNUNET_break (0);
1828       if (client != NULL)
1829         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1830       return;
1831     }
1832   sm = (const struct SendMessage *) message;
1833   msize -= sizeof (struct SendMessage);
1834   mh = (const struct GNUNET_MessageHeader *) &sm[1];
1835   if (msize != ntohs (mh->size))
1836     {
1837       GNUNET_break (0);
1838       if (client != NULL)
1839         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1840       return;
1841     }
1842   n = find_neighbour (&sm->peer);
1843   if (n == NULL)
1844     {
1845 #if DEBUG_CORE
1846       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1847                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
1848                   "SEND",
1849                   GNUNET_i2s (&sm->peer),
1850                   GNUNET_TIME_absolute_get_remaining
1851                   (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
1852 #endif
1853       msize += sizeof (struct SendMessage);
1854       /* ask transport to connect to the peer */
1855       smc = GNUNET_malloc (msize);
1856       memcpy (smc, sm, msize);
1857       if (NULL ==
1858           GNUNET_TRANSPORT_notify_transmit_ready (transport,
1859                                                   &sm->peer,
1860                                                   0, 0,
1861                                                   GNUNET_TIME_absolute_get_remaining
1862                                                   (GNUNET_TIME_absolute_ntoh
1863                                                    (sm->deadline)),
1864                                                   &send_connect_continuation,
1865                                                   smc))
1866         {
1867           /* transport has already a request pending for this peer! */
1868 #if DEBUG_CORE
1869           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1870                       "Dropped second message destined for `%4s' since connection is still down.\n",
1871                       GNUNET_i2s(&sm->peer));
1872 #endif
1873           GNUNET_free (smc);
1874         }
1875       if (client != NULL)
1876         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1877       return;
1878     }
1879 #if DEBUG_CORE
1880   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1881               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1882               "SEND",
1883               msize, 
1884               GNUNET_i2s (&sm->peer));
1885 #endif
1886   /* bound queue size */
1887   discard_expired_messages (n);
1888   min_prio = (unsigned int) -1;
1889   min_prio_entry = NULL;
1890   min_prio_prev = NULL;
1891   queue_size = 0;
1892   prev = NULL;
1893   pos = n->messages;
1894   while (pos != NULL) 
1895     {
1896       if (pos->priority < min_prio)
1897         {
1898           min_prio_entry = pos;
1899           min_prio_prev = prev;
1900           min_prio = pos->priority;
1901         }
1902       queue_size++;
1903       prev = pos;
1904       pos = pos->next;
1905     }
1906   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1907     {
1908       /* queue full */
1909       if (ntohl(sm->priority) <= min_prio)
1910         {
1911           /* discard new entry */
1912 #if DEBUG_CORE
1913           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1914                       "Queue full, discarding new request\n");
1915 #endif
1916           if (client != NULL)
1917             GNUNET_SERVER_receive_done (client, GNUNET_OK);
1918           return;
1919         }
1920       /* discard "min_prio_entry" */
1921 #if DEBUG_CORE
1922       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1923                   "Queue full, discarding existing older request\n");
1924 #endif
1925       if (min_prio_prev == NULL)
1926         n->messages = min_prio_entry->next;
1927       else
1928         min_prio_prev->next = min_prio_entry->next;      
1929       GNUNET_free (min_prio_entry);     
1930     }
1931   
1932   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1933   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1934   e->priority = ntohl (sm->priority);
1935   e->size = msize;
1936   memcpy (&e[1], mh, msize);
1937
1938   /* insert, keep list sorted by deadline */
1939   prev = NULL;
1940   pos = n->messages;
1941   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1942     {
1943       prev = pos;
1944       pos = pos->next;
1945     }
1946   if (prev == NULL)
1947     n->messages = e;
1948   else
1949     prev->next = e;
1950   e->next = pos;
1951
1952   /* consider scheduling now */
1953   process_plaintext_neighbour_queue (n);
1954   if (client != NULL)
1955     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1956 }
1957
1958
1959 /**
1960  * List of handlers for the messages understood by this
1961  * service.
1962  */
1963 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1964   {&handle_client_init, NULL,
1965    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1966   {&handle_client_request_configure, NULL,
1967    GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
1968    sizeof (struct RequestConfigureMessage)},
1969   {&handle_client_send, NULL,
1970    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1971   {NULL, NULL, 0, 0}
1972 };
1973
1974
1975 /**
1976  * PEERINFO is giving us a HELLO for a peer.  Add the
1977  * public key to the neighbour's struct and retry
1978  * send_key.  Or, if we did not get a HELLO, just do
1979  * nothing.
1980  *
1981  * @param cls NULL
1982  * @param peer the peer for which this is the HELLO
1983  * @param hello HELLO message of that peer
1984  * @param trust amount of trust we currently have in that peer
1985  */
1986 static void
1987 process_hello_retry_send_key (void *cls,
1988                               const struct GNUNET_PeerIdentity *peer,
1989                               const struct GNUNET_HELLO_Message *hello,
1990                               uint32_t trust)
1991 {
1992   struct Neighbour *n;
1993
1994   if (peer == NULL)
1995     return;
1996   n = find_neighbour (peer);
1997   if (n == NULL)
1998     return;
1999   if (n->public_key != NULL)
2000     return;
2001 #if DEBUG_CORE
2002   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2003               "Received new `%s' message for `%4s', initiating key exchange.\n",
2004               "HELLO",
2005               GNUNET_i2s (peer));
2006 #endif
2007   n->public_key =
2008     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2009   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2010     {
2011       GNUNET_free (n->public_key);
2012       n->public_key = NULL;
2013       return;
2014     }
2015   send_key (n);
2016 }
2017
2018
2019 /**
2020  * Task that will retry "send_key" if our previous attempt failed
2021  * to yield a PONG.
2022  */
2023 static void
2024 set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2025 {
2026   struct Neighbour *n = cls;
2027
2028   GNUNET_assert (n->status != PEER_STATE_KEY_CONFIRMED);
2029   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2030   n->set_key_retry_frequency =
2031     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
2032   send_key (n);
2033 }
2034
2035
2036 /**
2037  * Send our key (and encrypted PING) to the other peer.
2038  *
2039  * @param n the other peer
2040  */
2041 static void
2042 send_key (struct Neighbour *n)
2043 {
2044   struct SetKeyMessage *sm;
2045   struct MessageEntry *me;
2046   struct PingMessage pp;
2047   struct PingMessage *pm;
2048
2049 #if DEBUG_CORE
2050   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2051               "Asked to perform key exchange with `%4s'.\n",
2052               GNUNET_i2s (&n->peer));
2053 #endif
2054   if (n->public_key == NULL)
2055     {
2056       /* lookup n's public key, then try again */
2057 #if DEBUG_CORE
2058       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2059                   "Lacking public key for `%4s', trying to obtain one.\n",
2060                   GNUNET_i2s (&n->peer));
2061 #endif
2062       GNUNET_PEERINFO_for_all (cfg,
2063                                sched,
2064                                &n->peer,
2065                                0,
2066                                GNUNET_TIME_UNIT_MINUTES,
2067                                &process_hello_retry_send_key, NULL);
2068       return;
2069     }
2070   /* first, set key message */
2071   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2072                       sizeof (struct SetKeyMessage));
2073   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
2074   me->priority = SET_KEY_PRIORITY;
2075   me->size = sizeof (struct SetKeyMessage);
2076   if (n->encrypted_head == NULL)
2077     n->encrypted_head = me;
2078   else
2079     n->encrypted_tail->next = me;
2080   n->encrypted_tail = me;
2081   sm = (struct SetKeyMessage *) &me[1];
2082   sm->header.size = htons (sizeof (struct SetKeyMessage));
2083   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
2084   sm->sender_status = htonl ((int32_t) ((n->status == PEER_STATE_DOWN) ?
2085                                         PEER_STATE_KEY_SENT : n->status));
2086   sm->purpose.size =
2087     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2088            sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2089            sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2090            sizeof (struct GNUNET_PeerIdentity));
2091   sm->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_KEY);
2092   sm->creation_time = GNUNET_TIME_absolute_hton (n->encrypt_key_created);
2093   sm->target = n->peer;
2094   GNUNET_assert (GNUNET_OK ==
2095                  GNUNET_CRYPTO_rsa_encrypt (&n->encrypt_key,
2096                                             sizeof (struct
2097                                                     GNUNET_CRYPTO_AesSessionKey),
2098                                             n->public_key,
2099                                             &sm->encrypted_key));
2100   GNUNET_assert (GNUNET_OK ==
2101                  GNUNET_CRYPTO_rsa_sign (my_private_key, &sm->purpose,
2102                                          &sm->signature));
2103
2104   /* second, encrypted PING message */
2105   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2106                       sizeof (struct PingMessage));
2107   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
2108   me->priority = PING_PRIORITY;
2109   me->size = sizeof (struct PingMessage);
2110   n->encrypted_tail->next = me;
2111   n->encrypted_tail = me;
2112   pm = (struct PingMessage *) &me[1];
2113   pm->header.size = htons (sizeof (struct PingMessage));
2114   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
2115   pp.challenge = htonl (n->ping_challenge);
2116   pp.target = n->peer;
2117 #if DEBUG_CORE
2118   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2119               "Encrypting `%s' and `%s' messages for `%4s'.\n",
2120               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
2121   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2122               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
2123               "PING",
2124               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
2125 #endif
2126   do_encrypt (n,
2127               &n->peer.hashPubKey,
2128               &pp.challenge,
2129               &pm->challenge,
2130               sizeof (struct PingMessage) -
2131               sizeof (struct GNUNET_MessageHeader));
2132   /* update status */
2133   switch (n->status)
2134     {
2135     case PEER_STATE_DOWN:
2136       n->status = PEER_STATE_KEY_SENT;
2137       break;
2138     case PEER_STATE_KEY_SENT:
2139       break;
2140     case PEER_STATE_KEY_RECEIVED:
2141       break;
2142     case PEER_STATE_KEY_CONFIRMED:
2143       GNUNET_break (0);
2144       break;
2145     default:
2146       GNUNET_break (0);
2147       break;
2148     }
2149   /* trigger queue processing */
2150   process_encrypted_neighbour_queue (n);
2151   if (n->status != PEER_STATE_KEY_CONFIRMED)
2152     n->retry_set_key_task
2153       = GNUNET_SCHEDULER_add_delayed (sched,
2154                                       n->set_key_retry_frequency,
2155                                       &set_key_retry_task, n);
2156 }
2157
2158
2159 /**
2160  * We received a SET_KEY message.  Validate and update
2161  * our key material and status.
2162  *
2163  * @param n the neighbour from which we received message m
2164  * @param m the set key message we received
2165  */
2166 static void
2167 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m);
2168
2169
2170 /**
2171  * PEERINFO is giving us a HELLO for a peer.  Add the public key to
2172  * the neighbour's struct and retry handling the set_key message.  Or,
2173  * if we did not get a HELLO, just free the set key message.
2174  *
2175  * @param cls pointer to the set key message
2176  * @param peer the peer for which this is the HELLO
2177  * @param hello HELLO message of that peer
2178  * @param trust amount of trust we currently have in that peer
2179  */
2180 static void
2181 process_hello_retry_handle_set_key (void *cls,
2182                                     const struct GNUNET_PeerIdentity *peer,
2183                                     const struct GNUNET_HELLO_Message *hello,
2184                                     uint32_t trust)
2185 {
2186   struct SetKeyMessage *sm = cls;
2187   struct Neighbour *n;
2188
2189   if (peer == NULL)
2190     {
2191       GNUNET_free (sm);
2192       return;
2193     }
2194   n = find_neighbour (peer);
2195   if (n == NULL)
2196     {
2197       GNUNET_break (0);
2198       return;
2199     }
2200   if (n->public_key != NULL)
2201     return;                     /* multiple HELLOs match!? */
2202   n->public_key =
2203     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
2204   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
2205     {
2206       GNUNET_break_op (0);
2207       GNUNET_free (n->public_key);
2208       n->public_key = NULL;
2209       return;
2210     }
2211 #if DEBUG_CORE
2212   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2213               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
2214               "HELLO", GNUNET_i2s (peer), "SET_KEY");
2215 #endif
2216   handle_set_key (n, sm);
2217 }
2218
2219
2220 /**
2221  * We received a PING message.  Validate and transmit
2222  * PONG.
2223  *
2224  * @param n sender of the PING
2225  * @param m the encrypted PING message itself
2226  */
2227 static void
2228 handle_ping (struct Neighbour *n, const struct PingMessage *m)
2229 {
2230   struct PingMessage t;
2231   struct PingMessage *tp;
2232   struct MessageEntry *me;
2233
2234 #if DEBUG_CORE
2235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2236               "Core service receives `%s' request from `%4s'.\n",
2237               "PING", GNUNET_i2s (&n->peer));
2238 #endif
2239   if (GNUNET_OK !=
2240       do_decrypt (n,
2241                   &my_identity.hashPubKey,
2242                   &m->challenge,
2243                   &t.challenge,
2244                   sizeof (struct PingMessage) -
2245                   sizeof (struct GNUNET_MessageHeader)))
2246     return;
2247 #if DEBUG_CORE
2248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2249               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
2250               "PING",
2251               GNUNET_i2s (&t.target),
2252               ntohl (t.challenge), n->decrypt_key.crc32);
2253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254               "Target of `%s' request is `%4s'.\n",
2255               "PING", GNUNET_i2s (&t.target));
2256 #endif
2257   if (0 != memcmp (&t.target,
2258                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
2259     {
2260       GNUNET_break_op (0);
2261       return;
2262     }
2263   me = GNUNET_malloc (sizeof (struct MessageEntry) +
2264                       sizeof (struct PingMessage));
2265   if (n->encrypted_tail != NULL)
2266     n->encrypted_tail->next = me;
2267   else
2268     {
2269       n->encrypted_tail = me;
2270       n->encrypted_head = me;
2271     }
2272   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
2273   me->priority = PONG_PRIORITY;
2274   me->size = sizeof (struct PingMessage);
2275   tp = (struct PingMessage *) &me[1];
2276   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
2277   tp->header.size = htons (sizeof (struct PingMessage));
2278   do_encrypt (n,
2279               &my_identity.hashPubKey,
2280               &t.challenge,
2281               &tp->challenge,
2282               sizeof (struct PingMessage) -
2283               sizeof (struct GNUNET_MessageHeader));
2284 #if DEBUG_CORE
2285   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2286               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
2287               ntohl (t.challenge), n->encrypt_key.crc32);
2288 #endif
2289   /* trigger queue processing */
2290   process_encrypted_neighbour_queue (n);
2291 }
2292
2293
2294 /**
2295  * We received a SET_KEY message.  Validate and update
2296  * our key material and status.
2297  *
2298  * @param n the neighbour from which we received message m
2299  * @param m the set key message we received
2300  */
2301 static void
2302 handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
2303 {
2304   struct SetKeyMessage *m_cpy;
2305   struct GNUNET_TIME_Absolute t;
2306   struct GNUNET_CRYPTO_AesSessionKey k;
2307   struct PingMessage *ping;
2308   enum PeerStateMachine sender_status;
2309
2310 #if DEBUG_CORE
2311   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2312               "Core service receives `%s' request from `%4s'.\n",
2313               "SET_KEY", GNUNET_i2s (&n->peer));
2314 #endif
2315   if (n->public_key == NULL)
2316     {
2317 #if DEBUG_CORE
2318       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2319                   "Lacking public key for peer, trying to obtain one.\n");
2320 #endif
2321       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
2322       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
2323       /* lookup n's public key, then try again */
2324       GNUNET_PEERINFO_for_all (cfg,
2325                                sched,
2326                                &n->peer,
2327                                0,
2328                                GNUNET_TIME_UNIT_MINUTES,
2329                                &process_hello_retry_handle_set_key, m_cpy);
2330       return;
2331     }
2332   if ((ntohl (m->purpose.size) !=
2333        sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2334        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
2335        sizeof (struct GNUNET_CRYPTO_RsaEncryptedData) +
2336        sizeof (struct GNUNET_PeerIdentity)) ||
2337       (GNUNET_OK !=
2338        GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_KEY,
2339                                  &m->purpose, &m->signature, n->public_key)))
2340     {
2341       /* invalid signature */
2342       GNUNET_break_op (0);
2343       return;
2344     }
2345   t = GNUNET_TIME_absolute_ntoh (m->creation_time);
2346   if (((n->status == PEER_STATE_KEY_RECEIVED) ||
2347        (n->status == PEER_STATE_KEY_CONFIRMED)) &&
2348       (t.value < n->decrypt_key_created.value))
2349     {
2350       /* this could rarely happen due to massive re-ordering of
2351          messages on the network level, but is most likely either
2352          a bug or some adversary messing with us.  Report. */
2353       GNUNET_break_op (0);
2354       return;
2355     }
2356 #if DEBUG_CORE
2357   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
2358 #endif  
2359   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
2360                                   &m->encrypted_key,
2361                                   &k,
2362                                   sizeof (struct GNUNET_CRYPTO_AesSessionKey))
2363        != sizeof (struct GNUNET_CRYPTO_AesSessionKey)) ||
2364       (GNUNET_OK != GNUNET_CRYPTO_aes_check_session_key (&k)))
2365     {
2366       /* failed to decrypt !? */
2367       GNUNET_break_op (0);
2368       return;
2369     }
2370
2371   n->decrypt_key = k;
2372   if (n->decrypt_key_created.value != t.value)
2373     {
2374       /* fresh key, reset sequence numbers */
2375       n->last_sequence_number_received = 0;
2376       n->last_packets_bitmap = 0;
2377       n->decrypt_key_created = t;
2378     }
2379   sender_status = (enum PeerStateMachine) ntohl (m->sender_status);
2380   switch (n->status)
2381     {
2382     case PEER_STATE_DOWN:
2383       n->status = PEER_STATE_KEY_RECEIVED;
2384 #if DEBUG_CORE
2385       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2386                   "Responding to `%s' with my own key.\n", "SET_KEY");
2387 #endif
2388       send_key (n);
2389       break;
2390     case PEER_STATE_KEY_SENT:
2391     case PEER_STATE_KEY_RECEIVED:
2392       n->status = PEER_STATE_KEY_RECEIVED;
2393       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2394           (sender_status != PEER_STATE_KEY_CONFIRMED))
2395         {
2396 #if DEBUG_CORE
2397           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2398                       "Responding to `%s' with my own key (other peer has status %u).\n",
2399                       "SET_KEY", sender_status);
2400 #endif
2401           send_key (n);
2402         }
2403       break;
2404     case PEER_STATE_KEY_CONFIRMED:
2405       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
2406           (sender_status != PEER_STATE_KEY_CONFIRMED))
2407         {
2408 #if DEBUG_CORE
2409           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2410                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
2411                       "SET_KEY", sender_status);
2412 #endif
2413           send_key (n);
2414         }
2415       break;
2416     default:
2417       GNUNET_break (0);
2418       break;
2419     }
2420   if (n->pending_ping != NULL)
2421     {
2422       ping = n->pending_ping;
2423       n->pending_ping = NULL;
2424       handle_ping (n, ping);
2425       GNUNET_free (ping);
2426     }
2427 }
2428
2429
2430 /**
2431  * We received a PONG message.  Validate and update
2432  * our status.
2433  *
2434  * @param n sender of the PONG
2435  * @param m the encrypted PONG message itself
2436  */
2437 static void
2438 handle_pong (struct Neighbour *n, const struct PingMessage *m)
2439 {
2440   struct PingMessage t;
2441
2442 #if DEBUG_CORE
2443   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2444               "Core service receives `%s' request from `%4s'.\n",
2445               "PONG", GNUNET_i2s (&n->peer));
2446 #endif
2447   if (GNUNET_OK !=
2448       do_decrypt (n,
2449                   &n->peer.hashPubKey,
2450                   &m->challenge,
2451                   &t.challenge,
2452                   sizeof (struct PingMessage) -
2453                   sizeof (struct GNUNET_MessageHeader)))
2454     return;
2455 #if DEBUG_CORE
2456   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2457               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
2458               "PONG",
2459               GNUNET_i2s (&t.target),
2460               ntohl (t.challenge), n->decrypt_key.crc32);
2461 #endif
2462   if ((0 != memcmp (&t.target,
2463                     &n->peer,
2464                     sizeof (struct GNUNET_PeerIdentity))) ||
2465       (n->ping_challenge != ntohl (t.challenge)))
2466     {
2467       /* PONG malformed */
2468 #if DEBUG_CORE
2469       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2470                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
2471                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
2472       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2473                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
2474                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
2475 #endif
2476       GNUNET_break_op (0);
2477       return;
2478     }
2479   switch (n->status)
2480     {
2481     case PEER_STATE_DOWN:
2482       GNUNET_break (0);         /* should be impossible */
2483       return;
2484     case PEER_STATE_KEY_SENT:
2485       GNUNET_break (0);         /* should be impossible, how did we decrypt? */
2486       return;
2487     case PEER_STATE_KEY_RECEIVED:
2488       n->status = PEER_STATE_KEY_CONFIRMED;
2489       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
2490         {
2491           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
2492           n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
2493         }
2494       process_encrypted_neighbour_queue (n);
2495       break;
2496     case PEER_STATE_KEY_CONFIRMED:
2497       /* duplicate PONG? */
2498       break;
2499     default:
2500       GNUNET_break (0);
2501       break;
2502     }
2503 }
2504
2505
2506 /**
2507  * Send a P2P message to a client.
2508  *
2509  * @param sender who sent us the message?
2510  * @param client who should we give the message to?
2511  * @param m contains the message to transmit
2512  * @param msize number of bytes in buf to transmit
2513  */
2514 static void
2515 send_p2p_message_to_client (struct Neighbour *sender,
2516                             struct Client *client,
2517                             const void *m, size_t msize)
2518 {
2519   char buf[msize + sizeof (struct NotifyTrafficMessage)];
2520   struct NotifyTrafficMessage *ntm;
2521
2522 #if DEBUG_CORE
2523   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2524               "Core service passes message from `%4s' of type %u to client.\n",
2525               GNUNET_i2s(&sender->peer),
2526               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
2527 #endif
2528   ntm = (struct NotifyTrafficMessage *) buf;
2529   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
2530   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
2531   ntm->reserved = htonl (0);
2532   ntm->peer = sender->peer;
2533   memcpy (&ntm[1], m, msize);
2534   send_to_client (client, &ntm->header, GNUNET_YES);
2535 }
2536
2537
2538 /**
2539  * Deliver P2P message to interested clients.
2540  *
2541  * @param sender who sent us the message?
2542  * @param m the message
2543  * @param msize size of the message (including header)
2544  */
2545 static void
2546 deliver_message (struct Neighbour *sender,
2547                  const struct GNUNET_MessageHeader *m, size_t msize)
2548 {
2549   struct Client *cpos;
2550   uint16_t type;
2551   unsigned int tpos;
2552   int deliver_full;
2553
2554   type = ntohs (m->type);
2555   cpos = clients;
2556   while (cpos != NULL)
2557     {
2558       deliver_full = GNUNET_NO;
2559       if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
2560         deliver_full = GNUNET_YES;
2561       else
2562         {
2563           for (tpos = 0; tpos < cpos->tcnt; tpos++)
2564             {
2565               if (type != cpos->types[tpos])
2566                 continue;
2567               deliver_full = GNUNET_YES;
2568               break;
2569             }
2570         }
2571       if (GNUNET_YES == deliver_full)
2572         send_p2p_message_to_client (sender, cpos, m, msize);
2573       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
2574         send_p2p_message_to_client (sender, cpos, m,
2575                                     sizeof (struct GNUNET_MessageHeader));
2576       cpos = cpos->next;
2577     }
2578 }
2579
2580
2581 /**
2582  * Align P2P message and then deliver to interested clients.
2583  *
2584  * @param sender who sent us the message?
2585  * @param buffer unaligned (!) buffer containing message
2586  * @param msize size of the message (including header)
2587  */
2588 static void
2589 align_and_deliver (struct Neighbour *sender, const char *buffer, size_t msize)
2590 {
2591   char abuf[msize];
2592
2593   /* TODO: call to statistics? */
2594   memcpy (abuf, buffer, msize);
2595   deliver_message (sender, (const struct GNUNET_MessageHeader *) abuf, msize);
2596 }
2597
2598
2599 /**
2600  * Deliver P2P messages to interested clients.
2601  *
2602  * @param sender who sent us the message?
2603  * @param buffer buffer containing messages, can be modified
2604  * @param buffer_size size of the buffer (overall)
2605  * @param offset offset where messages in the buffer start
2606  */
2607 static void
2608 deliver_messages (struct Neighbour *sender,
2609                   const char *buffer, size_t buffer_size, size_t offset)
2610 {
2611   struct GNUNET_MessageHeader *mhp;
2612   struct GNUNET_MessageHeader mh;
2613   uint16_t msize;
2614   int need_align;
2615
2616   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
2617     {
2618       if (0 != offset % sizeof (uint16_t))
2619         {
2620           /* outch, need to copy to access header */
2621           memcpy (&mh, &buffer[offset], sizeof (struct GNUNET_MessageHeader));
2622           mhp = &mh;
2623         }
2624       else
2625         {
2626           /* can access header directly */
2627           mhp = (struct GNUNET_MessageHeader *) &buffer[offset];
2628         }
2629       msize = ntohs (mhp->size);
2630       if (msize + offset > buffer_size)
2631         {
2632           /* malformed message, header says it is larger than what
2633              would fit into the overall buffer */
2634           GNUNET_break_op (0);
2635           break;
2636         }
2637 #if HAVE_UNALIGNED_64_ACCESS
2638       need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
2639 #else
2640       need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
2641 #endif
2642       if (GNUNET_YES == need_align)
2643         align_and_deliver (sender, &buffer[offset], msize);
2644       else
2645         deliver_message (sender,
2646                          (const struct GNUNET_MessageHeader *)
2647                          &buffer[offset], msize);
2648       offset += msize;
2649     }
2650 }
2651
2652
2653 /**
2654  * We received an encrypted message.  Decrypt, validate and
2655  * pass on to the appropriate clients.
2656  */
2657 static void
2658 handle_encrypted_message (struct Neighbour *n,
2659                           const struct EncryptedMessage *m)
2660 {
2661   size_t size = ntohs (m->header.size);
2662   char buf[size];
2663   struct EncryptedMessage *pt;  /* plaintext */
2664   GNUNET_HashCode ph;
2665   size_t off;
2666   uint32_t snum;
2667   struct GNUNET_TIME_Absolute t;
2668
2669 #if DEBUG_CORE
2670   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2671               "Core service receives `%s' request from `%4s'.\n",
2672               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
2673 #endif  
2674   /* decrypt */
2675   if (GNUNET_OK !=
2676       do_decrypt (n,
2677                   &m->plaintext_hash,
2678                   &m->sequence_number,
2679                   &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
2680     return;
2681   pt = (struct EncryptedMessage *) buf;
2682
2683   /* validate hash */
2684   GNUNET_CRYPTO_hash (&pt->sequence_number,
2685                       size - ENCRYPTED_HEADER_SIZE, &ph);
2686   if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
2687     {
2688       /* checksum failed */
2689       GNUNET_break_op (0);
2690       return;
2691     }
2692
2693   /* validate sequence number */
2694   snum = ntohl (pt->sequence_number);
2695   if (n->last_sequence_number_received == snum)
2696     {
2697       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2698                   "Received duplicate message, ignoring.\n");
2699       /* duplicate, ignore */
2700       return;
2701     }
2702   if ((n->last_sequence_number_received > snum) &&
2703       (n->last_sequence_number_received - snum > 32))
2704     {
2705       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2706                   "Received ancient out of sequence message, ignoring.\n");
2707       /* ancient out of sequence, ignore */
2708       return;
2709     }
2710   if (n->last_sequence_number_received > snum)
2711     {
2712       unsigned int rotbit =
2713         1 << (n->last_sequence_number_received - snum - 1);
2714       if ((n->last_packets_bitmap & rotbit) != 0)
2715         {
2716           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2717                       "Received duplicate message, ignoring.\n");
2718           /* duplicate, ignore */
2719           return;
2720         }
2721       n->last_packets_bitmap |= rotbit;
2722     }
2723   if (n->last_sequence_number_received < snum)
2724     {
2725       n->last_packets_bitmap <<= (snum - n->last_sequence_number_received);
2726       n->last_sequence_number_received = snum;
2727     }
2728
2729   /* check timestamp */
2730   t = GNUNET_TIME_absolute_ntoh (pt->timestamp);
2731   if (GNUNET_TIME_absolute_get_duration (t).value > MAX_MESSAGE_AGE.value)
2732     {
2733       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2734                   _
2735                   ("Message received far too old (%llu ms). Content ignored.\n"),
2736                   GNUNET_TIME_absolute_get_duration (t).value);
2737       return;
2738     }
2739
2740   /* process decrypted message(s) */
2741   update_window (GNUNET_YES,
2742                  &n->available_send_window,
2743                  &n->last_asw_update,
2744                  n->bpm_out);
2745   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
2746   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
2747                            n->bpm_out_internal_limit);
2748   n->last_activity = GNUNET_TIME_absolute_get ();
2749   off = sizeof (struct EncryptedMessage);
2750   deliver_messages (n, buf, size, off);
2751 }
2752
2753
2754 /**
2755  * Function called by the transport for each received message.
2756  *
2757  * @param cls closure
2758  * @param latency estimated latency for communicating with the
2759  *             given peer
2760  * @param peer (claimed) identity of the other peer
2761  * @param message the message
2762  */
2763 static void
2764 handle_transport_receive (void *cls,
2765                           struct GNUNET_TIME_Relative latency,
2766                           const struct GNUNET_PeerIdentity *peer,
2767                           const struct GNUNET_MessageHeader *message)
2768 {
2769   struct Neighbour *n;
2770   struct GNUNET_TIME_Absolute now;
2771   int up;
2772   uint16_t type;
2773   uint16_t size;
2774
2775 #if DEBUG_CORE
2776   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2777               "Received message of type %u from `%4s', demultiplexing.\n",
2778               ntohs (message->type), GNUNET_i2s (peer));
2779 #endif
2780   n = find_neighbour (peer);
2781   if (n == NULL)
2782     {
2783       GNUNET_break (0);
2784       return;
2785     }
2786   n->last_latency = latency;
2787   up = n->status == PEER_STATE_KEY_CONFIRMED;
2788   type = ntohs (message->type);
2789   size = ntohs (message->size);
2790   switch (type)
2791     {
2792     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
2793       if (size != sizeof (struct SetKeyMessage))
2794         {
2795           GNUNET_break_op (0);
2796           return;
2797         }
2798       handle_set_key (n, (const struct SetKeyMessage *) message);
2799       break;
2800     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
2801       if (size < sizeof (struct EncryptedMessage) +
2802           sizeof (struct GNUNET_MessageHeader))
2803         {
2804           GNUNET_break_op (0);
2805           return;
2806         }
2807       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2808           (n->status != PEER_STATE_KEY_CONFIRMED))
2809         {
2810           GNUNET_break_op (0);
2811           return;
2812         }
2813       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
2814       break;
2815     case GNUNET_MESSAGE_TYPE_CORE_PING:
2816       if (size != sizeof (struct PingMessage))
2817         {
2818           GNUNET_break_op (0);
2819           return;
2820         }
2821       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
2822           (n->status != PEER_STATE_KEY_CONFIRMED))
2823         {
2824 #if DEBUG_CORE
2825           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2826                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
2827                       "PING", GNUNET_i2s (&n->peer));
2828 #endif
2829           GNUNET_free_non_null (n->pending_ping);
2830           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
2831           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
2832           return;
2833         }
2834       handle_ping (n, (const struct PingMessage *) message);
2835       break;
2836     case GNUNET_MESSAGE_TYPE_CORE_PONG:
2837       if (size != sizeof (struct PingMessage))
2838         {
2839           GNUNET_break_op (0);
2840           return;
2841         }
2842       if ((n->status != PEER_STATE_KEY_SENT) &&
2843           (n->status != PEER_STATE_KEY_RECEIVED) &&
2844           (n->status != PEER_STATE_KEY_CONFIRMED))
2845         {
2846           /* could not decrypt pong, oops! */
2847           GNUNET_break_op (0);
2848           return;
2849         }
2850       handle_pong (n, (const struct PingMessage *) message);
2851       break;
2852     default:
2853       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2854                   _("Unsupported message of type %u received.\n"), type);
2855       return;
2856     }
2857   if (n->status == PEER_STATE_KEY_CONFIRMED)
2858     {
2859       now = GNUNET_TIME_absolute_get ();
2860       n->last_activity = now;
2861       if (!up)
2862         n->time_established = now;
2863     }
2864 }
2865
2866
2867 /**
2868  * Function that recalculates the bandwidth quota for the
2869  * given neighbour and transmits it to the transport service.
2870  * 
2871  * @param cls neighbour for the quota update
2872  * @param tc context
2873  */
2874 static void
2875 neighbour_quota_update (void *cls,
2876                         const struct GNUNET_SCHEDULER_TaskContext *tc);
2877
2878
2879 /**
2880  * Schedule the task that will recalculate the bandwidth
2881  * quota for this peer (and possibly force a disconnect of
2882  * idle peers by calculating a bandwidth of zero).
2883  */
2884 static void
2885 schedule_quota_update (struct Neighbour *n)
2886 {
2887   GNUNET_assert (n->quota_update_task ==
2888                  GNUNET_SCHEDULER_NO_TASK);
2889   n->quota_update_task
2890     = GNUNET_SCHEDULER_add_delayed (sched,
2891                                     QUOTA_UPDATE_FREQUENCY,
2892                                     &neighbour_quota_update,
2893                                     n);
2894 }
2895
2896
2897 /**
2898  * Function that recalculates the bandwidth quota for the
2899  * given neighbour and transmits it to the transport service.
2900  * 
2901  * @param cls neighbour for the quota update
2902  * @param tc context
2903  */
2904 static void
2905 neighbour_quota_update (void *cls,
2906                         const struct GNUNET_SCHEDULER_TaskContext *tc)
2907 {
2908   struct Neighbour *n = cls;
2909   uint32_t q_in;
2910   double pref_rel;
2911   double share;
2912   unsigned long long distributable;
2913   
2914   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
2915   /* calculate relative preference among all neighbours;
2916      divides by a bit more to avoid division by zero AND to
2917      account for possibility of new neighbours joining any time 
2918      AND to convert to double... */
2919   pref_rel = n->current_preference / (1.0 + preference_sum);
2920   distributable = 0;
2921   if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
2922     distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
2923   share = distributable * pref_rel;
2924   q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
2925   /* check if we want to disconnect for good due to inactivity */
2926   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
2927        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
2928     q_in = 0; /* force disconnect */
2929   if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
2930        (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
2931     {
2932       n->bpm_in = q_in;
2933       GNUNET_TRANSPORT_set_quota (transport,
2934                                   &n->peer,
2935                                   n->bpm_in, 
2936                                   n->bpm_out,
2937                                   GNUNET_TIME_UNIT_FOREVER_REL,
2938                                   NULL, NULL);
2939     }
2940   schedule_quota_update (n);
2941 }
2942
2943
2944 /**
2945  * Function called by transport to notify us that
2946  * a peer connected to us (on the network level).
2947  *
2948  * @param cls closure
2949  * @param peer the peer that connected
2950  * @param latency current latency of the connection
2951  */
2952 static void
2953 handle_transport_notify_connect (void *cls,
2954                                  const struct GNUNET_PeerIdentity *peer,
2955                                  struct GNUNET_TIME_Relative latency)
2956 {
2957   struct Neighbour *n;
2958   struct GNUNET_TIME_Absolute now;
2959   struct ConnectNotifyMessage cnm;
2960
2961   n = find_neighbour (peer);
2962   if (n != NULL)
2963     {
2964       /* duplicate connect notification!? */
2965       GNUNET_break (0);
2966       return;
2967     }
2968   now = GNUNET_TIME_absolute_get ();
2969   n = GNUNET_malloc (sizeof (struct Neighbour));
2970   n->next = neighbours;
2971   neighbours = n;
2972   neighbour_count++;
2973   n->peer = *peer;
2974   n->last_latency = latency;
2975   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
2976   n->encrypt_key_created = now;
2977   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
2978   n->last_asw_update = now;
2979   n->last_arw_update = now;
2980   n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2981   n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2982   n->bpm_out_internal_limit = (uint32_t) - 1;
2983   n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
2984   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2985                                                 (uint32_t) - 1);
2986 #if DEBUG_CORE
2987   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2988               "Received connection from `%4s'.\n",
2989               GNUNET_i2s (&n->peer));
2990 #endif
2991   schedule_quota_update (n);
2992   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
2993   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
2994   cnm.reserved = htonl (0);
2995   cnm.peer = *peer;
2996   send_to_all_clients (&cnm.header, GNUNET_YES);
2997 }
2998
2999
3000 /**
3001  * Free the given entry for the neighbour (it has
3002  * already been removed from the list at this point).
3003  *
3004  * @param n neighbour to free
3005  */
3006 static void
3007 free_neighbour (struct Neighbour *n)
3008 {
3009   struct MessageEntry *m;
3010
3011   while (NULL != (m = n->messages))
3012     {
3013       n->messages = m->next;
3014       GNUNET_free (m);
3015     }
3016   while (NULL != (m = n->encrypted_head))
3017     {
3018       n->encrypted_head = m->next;
3019       GNUNET_free (m);
3020     }
3021   if (NULL != n->th)
3022     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
3023   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
3024     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
3025   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
3026     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
3027   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
3028     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
3029   GNUNET_free_non_null (n->public_key);
3030   GNUNET_free_non_null (n->pending_ping);
3031   GNUNET_free (n);
3032 }
3033
3034
3035 /**
3036  * Function called by transport telling us that a peer
3037  * disconnected.
3038  *
3039  * @param cls closure
3040  * @param peer the peer that disconnected
3041  */
3042 static void
3043 handle_transport_notify_disconnect (void *cls,
3044                                     const struct GNUNET_PeerIdentity *peer)
3045 {
3046   struct ConnectNotifyMessage cnm;
3047   struct Neighbour *n;
3048   struct Neighbour *p;
3049
3050 #if DEBUG_CORE
3051   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3052               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
3053 #endif
3054   p = NULL;
3055   n = neighbours;
3056   while ((n != NULL) &&
3057          (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
3058     {
3059       p = n;
3060       n = n->next;
3061     }
3062   if (n == NULL)
3063     {
3064       GNUNET_break (0);
3065       return;
3066     }
3067   if (p == NULL)
3068     neighbours = n->next;
3069   else
3070     p->next = n->next;
3071   GNUNET_assert (neighbour_count > 0);
3072   neighbour_count--;
3073   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
3074   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
3075   cnm.reserved = htonl (0);
3076   cnm.peer = *peer;
3077   send_to_all_clients (&cnm.header, GNUNET_YES);
3078   free_neighbour (n);
3079 }
3080
3081
3082 /**
3083  * Last task run during shutdown.  Disconnects us from
3084  * the transport.
3085  */
3086 static void
3087 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
3088 {
3089   struct Neighbour *n;
3090   struct Client *c;
3091
3092 #if DEBUG_CORE
3093   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3094               "Core service shutting down.\n");
3095 #endif
3096   GNUNET_assert (transport != NULL);
3097   GNUNET_TRANSPORT_disconnect (transport);
3098   transport = NULL;
3099   while (NULL != (n = neighbours))
3100     {
3101       neighbours = n->next;
3102       GNUNET_assert (neighbour_count > 0);
3103       neighbour_count--;
3104       free_neighbour (n);
3105     }
3106   while (NULL != (c = clients))
3107     handle_client_disconnect (NULL, c->client_handle);
3108   if (my_private_key != NULL)
3109     GNUNET_CRYPTO_rsa_key_free (my_private_key);
3110 }
3111
3112
3113 /**
3114  * Initiate core service.
3115  *
3116  * @param cls closure
3117  * @param s scheduler to use
3118  * @param serv the initialized server
3119  * @param c configuration to use
3120  */
3121 static void
3122 run (void *cls,
3123      struct GNUNET_SCHEDULER_Handle *s,
3124      struct GNUNET_SERVER_Handle *serv,
3125      const struct GNUNET_CONFIGURATION_Handle *c)
3126 {
3127 #if 0
3128   unsigned long long qin;
3129   unsigned long long qout;
3130   unsigned long long tneigh;
3131 #endif
3132   char *keyfile;
3133
3134   sched = s;
3135   cfg = c;
3136   /* parse configuration */
3137   if (
3138        (GNUNET_OK !=
3139         GNUNET_CONFIGURATION_get_value_number (c,
3140                                                "CORE",
3141                                                "TOTAL_QUOTA_IN",
3142                                                &bandwidth_target_in)) ||
3143        (GNUNET_OK !=
3144         GNUNET_CONFIGURATION_get_value_number (c,
3145                                                "CORE",
3146                                                "TOTAL_QUOTA_OUT",
3147                                                &bandwidth_target_out)) ||
3148 #if 0
3149        (GNUNET_OK !=
3150         GNUNET_CONFIGURATION_get_value_number (c,
3151                                                "CORE",
3152                                                "YY",
3153                                                &qout)) ||
3154        (GNUNET_OK !=
3155         GNUNET_CONFIGURATION_get_value_number (c,
3156                                                "CORE",
3157                                                "ZZ_LIMIT", &tneigh)) ||
3158 #endif
3159        (GNUNET_OK !=
3160         GNUNET_CONFIGURATION_get_value_filename (c,
3161                                                  "GNUNETD",
3162                                                  "HOSTKEY", &keyfile)))
3163     {
3164       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3165                   _
3166                   ("Core service is lacking key configuration settings.  Exiting.\n"));
3167       GNUNET_SCHEDULER_shutdown (s);
3168       return;
3169     }
3170   my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile);
3171   GNUNET_free (keyfile);
3172   if (my_private_key == NULL)
3173     {
3174       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3175                   _("Core service could not access hostkey.  Exiting.\n"));
3176       GNUNET_SCHEDULER_shutdown (s);
3177       return;
3178     }
3179   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
3180   GNUNET_CRYPTO_hash (&my_public_key,
3181                       sizeof (my_public_key), &my_identity.hashPubKey);
3182   /* setup notification */
3183   server = serv;
3184   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
3185   /* setup transport connection */
3186   transport = GNUNET_TRANSPORT_connect (sched,
3187                                         cfg,
3188                                         NULL,
3189                                         &handle_transport_receive,
3190                                         &handle_transport_notify_connect,
3191                                         &handle_transport_notify_disconnect);
3192   GNUNET_assert (NULL != transport);
3193   GNUNET_SCHEDULER_add_delayed (sched,
3194                                 GNUNET_TIME_UNIT_FOREVER_REL,
3195                                 &cleaning_task, NULL);
3196   /* process client requests */
3197   GNUNET_SERVER_add_handlers (server, handlers);
3198   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3199               _("Core service of `%4s' ready.\n"), GNUNET_i2s (&my_identity));
3200 }
3201
3202
3203
3204 /**
3205  * The main function for the transport service.
3206  *
3207  * @param argc number of arguments from the command line
3208  * @param argv command line arguments
3209  * @return 0 ok, 1 on error
3210  */
3211 int
3212 main (int argc, char *const *argv)
3213 {
3214   return (GNUNET_OK ==
3215           GNUNET_SERVICE_run (argc,
3216                               argv,
3217                               "core", &run, NULL)) ? 0 : 1;
3218 }
3219
3220 /* end of gnunet-service-core.c */