23d53022e3c3d9e7ccccb6ecaf7462c1e9acf02a
[oweals/gnunet.git] / src / core / gnunet-service-core_sessions.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_neighbours.c
23  * @brief code for managing of 'encrypted' sessions (key exchange done) 
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_service_core.h"
28 #include "gnunet_service_core_neighbours.h"
29 #include "gnunet_service_core_kx.h"
30 #include "gnunet_service_core_sessions.h"
31
32 /**
33  * Record kept for each request for transmission issued by a
34  * client that is still pending.
35  */
36 struct ClientActiveRequest;
37
38 /**
39  * Data kept per session.
40  */
41 struct Session
42 {
43   /**
44    * Identity of the other peer.
45    */
46   struct GNUNET_PeerIdentity peer;
47
48   /**
49    * Head of list of requests from clients for transmission to
50    * this peer.
51    */
52   struct ClientActiveRequest *active_client_request_head;
53
54   /**
55    * Tail of list of requests from clients for transmission to
56    * this peer.
57    */
58   struct ClientActiveRequest *active_client_request_tail;
59
60   /**
61    * Performance data for the peer.
62    */
63   struct GNUNET_TRANSPORT_ATS_Information *ats;
64
65   /**
66    * Information about the key exchange with the other peer.
67    */
68   struct GSC_KeyExchangeInfo *kxinfo;
69
70
71   /**
72    * ID of task used for cleaning up dead neighbour entries.
73    */
74   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
75
76   /**
77    * ID of task used for updating bandwidth quota for this neighbour.
78    */
79   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
80
81   /**
82    * At what time did we initially establish (as in, complete session
83    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
84    */
85   struct GNUNET_TIME_Absolute time_established;
86
87   /**
88    * At what time did we last receive an encrypted message from the
89    * other peer?  Should be zero if status != KEY_CONFIRMED.
90    */
91   struct GNUNET_TIME_Absolute last_activity;
92
93   /**
94    * How valueable were the messages of this peer recently?
95    */
96   unsigned long long current_preference;
97
98   /**
99    * Number of entries in 'ats'.
100    */
101   unsigned int ats_count;
102
103   /**
104    * Bit map indicating which of the 32 sequence numbers before the last
105    * were received (good for accepting out-of-order packets and
106    * estimating reliability of the connection)
107    */
108   unsigned int last_packets_bitmap;
109
110   /**
111    * last sequence number received on this connection (highest)
112    */
113   uint32_t last_sequence_number_received;
114
115   /**
116    * last sequence number transmitted
117    */
118   uint32_t last_sequence_number_sent;
119
120   /**
121    * Available bandwidth in for this peer (current target).
122    */
123   struct GNUNET_BANDWIDTH_Value32NBO bw_in;
124
125   /**
126    * Available bandwidth out for this peer (current target).
127    */
128   struct GNUNET_BANDWIDTH_Value32NBO bw_out;
129
130   /**
131    * Internal bandwidth limit set for this peer (initially typically
132    * set to "-1").  Actual "bw_out" is MIN of
133    * "bpm_out_internal_limit" and "bw_out_external_limit".
134    */
135   struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
136
137   /**
138    * External bandwidth limit set for this peer by the
139    * peer that we are communicating with.  "bw_out" is MIN of
140    * "bw_out_internal_limit" and "bw_out_external_limit".
141    */
142   struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
143
144 };
145
146
147 /**
148  * Map of peer identities to 'struct Session'.
149  */
150 static struct GNUNET_CONTAINER_MultiHashMap *sessions;
151
152
153 /**
154  * Session entry for "this" peer.
155  */
156 static struct Session self;
157
158 /**
159  * Sum of all preferences among all neighbours.
160  */
161 static unsigned long long preference_sum;
162
163
164 // FIXME.........
165
166 /**
167  * At what time should the connection to the given neighbour
168  * time out (given no further activity?)
169  *
170  * @param n neighbour in question
171  * @return absolute timeout
172  */
173 static struct GNUNET_TIME_Absolute
174 get_neighbour_timeout (struct Neighbour *n)
175 {
176   return GNUNET_TIME_absolute_add (n->last_activity,
177                                    GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
178 }
179
180
181 /**
182  * Helper function for update_preference_sum.
183  */
184 static int
185 update_preference (void *cls, const GNUNET_HashCode * key, void *value)
186 {
187   unsigned long long *ps = cls;
188   struct Neighbour *n = value;
189
190   n->current_preference /= 2;
191   *ps += n->current_preference;
192   return GNUNET_OK;
193 }
194
195
196 /**
197  * A preference value for a neighbour was update.  Update
198  * the preference sum accordingly.
199  *
200  * @param inc how much was a preference value increased?
201  */
202 static void
203 update_preference_sum (unsigned long long inc)
204 {
205   unsigned long long os;
206
207   os = preference_sum;
208   preference_sum += inc;
209   if (preference_sum >= os)
210     return;                     /* done! */
211   /* overflow! compensate by cutting all values in half! */
212   preference_sum = 0;
213   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &update_preference,
214                                          &preference_sum);
215   GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"),
216                          preference_sum, GNUNET_NO);
217 }
218
219
220 /**
221  * Find the entry for the given neighbour.
222  *
223  * @param peer identity of the neighbour
224  * @return NULL if we are not connected, otherwise the
225  *         neighbour's entry.
226  */
227 static struct Neighbour *
228 find_neighbour (const struct GNUNET_PeerIdentity *peer)
229 {
230   return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
231 }
232
233
234 /**
235  * Function called by transport telling us that a peer
236  * changed status.
237  *
238  * @param n the peer that changed status
239  */
240 static void
241 handle_peer_status_change (struct Neighbour *n)
242 {
243   struct PeerStatusNotifyMessage *psnm;
244   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
245   struct GNUNET_TRANSPORT_ATS_Information *ats;
246   size_t size;
247
248   if ((!n->is_connected) || (n->status != PEER_STATE_KEY_CONFIRMED))
249     return;
250 #if DEBUG_CORE > 1
251   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%4s' changed status\n",
252               GNUNET_i2s (&n->peer));
253 #endif
254   size =
255       sizeof (struct PeerStatusNotifyMessage) +
256       n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
257   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
258   {
259     GNUNET_break (0);
260     /* recovery strategy: throw away performance data */
261     GNUNET_array_grow (n->ats, n->ats_count, 0);
262     size =
263         sizeof (struct PeerStatusNotifyMessage) +
264         n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
265   }
266   psnm = (struct PeerStatusNotifyMessage *) buf;
267   psnm->header.size = htons (size);
268   psnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE);
269   psnm->timeout = GNUNET_TIME_absolute_hton (get_neighbour_timeout (n));
270   psnm->bandwidth_in = n->bw_in;
271   psnm->bandwidth_out = n->bw_out;
272   psnm->peer = n->peer;
273   psnm->ats_count = htonl (n->ats_count);
274   ats = &psnm->ats;
275   memcpy (ats, n->ats,
276           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
277   ats[n->ats_count].type = htonl (0);
278   ats[n->ats_count].value = htonl (0);
279   send_to_all_clients (&psnm->header, GNUNET_YES,
280                        GNUNET_CORE_OPTION_SEND_STATUS_CHANGE);
281   GNUNET_STATISTICS_update (stats, gettext_noop ("# peer status changes"), 1,
282                             GNUNET_NO);
283 }
284
285
286
287 /**
288  * Go over our message queue and if it is not too long, go
289  * over the pending requests from clients for this
290  * neighbour and send some clients a 'READY' notification.
291  *
292  * @param n which peer to process
293  */
294 static void
295 schedule_peer_messages (struct Neighbour *n)
296 {
297   struct SendMessageReady smr;
298   struct ClientActiveRequest *car;
299   struct ClientActiveRequest *pos;
300   struct Client *c;
301   struct MessageEntry *mqe;
302   unsigned int queue_size;
303
304   /* check if neighbour queue is empty enough! */
305   if (n != &self)
306   {
307     queue_size = 0;
308     mqe = n->messages;
309     while (mqe != NULL)
310     {
311       queue_size++;
312       mqe = mqe->next;
313     }
314     if (queue_size >= MAX_PEER_QUEUE_SIZE)
315     {
316 #if DEBUG_CORE_CLIENT
317       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
318                   "Not considering client transmission requests: queue full\n");
319 #endif
320       return;                   /* queue still full */
321     }
322     /* find highest priority request */
323     pos = n->active_client_request_head;
324     car = NULL;
325     while (pos != NULL)
326     {
327       if ((car == NULL) || (pos->priority > car->priority))
328         car = pos;
329       pos = pos->next;
330     }
331   }
332   else
333   {
334     car = n->active_client_request_head;
335   }
336   if (car == NULL)
337     return;                     /* no pending requests */
338 #if DEBUG_CORE_CLIENT
339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340               "Permitting client transmission request to `%s'\n",
341               GNUNET_i2s (&n->peer));
342 #endif
343   c = car->client;
344   GNUNET_CONTAINER_DLL_remove (n->active_client_request_head,
345                                n->active_client_request_tail, car);
346   GNUNET_assert (GNUNET_YES ==
347                  GNUNET_CONTAINER_multihashmap_remove (c->requests,
348                                                        &n->peer.hashPubKey,
349                                                        car));
350   smr.header.size = htons (sizeof (struct SendMessageReady));
351   smr.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_READY);
352   smr.size = htons (car->msize);
353   smr.smr_id = car->smr_id;
354   smr.peer = n->peer;
355   send_to_client (c, &smr.header, GNUNET_NO);
356   GNUNET_free (car);
357 }
358
359
360
361 /**
362  * Free the given entry for the neighbour (it has
363  * already been removed from the list at this point).
364  *
365  * @param n neighbour to free
366  */
367 static void
368 free_neighbour (struct Neighbour *n)
369 {
370   struct MessageEntry *m;
371   struct ClientActiveRequest *car;
372
373 #if DEBUG_CORE
374   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
375               "Destroying neighbour entry for peer `%4s'\n",
376               GNUNET_i2s (&n->peer));
377 #endif
378   if (n->skm != NULL)
379   {
380     GNUNET_free (n->skm);
381     n->skm = NULL;
382   }
383   while (NULL != (m = n->messages))
384   {
385     n->messages = m->next;
386     GNUNET_free (m);
387   }
388   while (NULL != (m = n->encrypted_head))
389   {
390     GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
391     GNUNET_free (m);
392   }
393   while (NULL != (car = n->active_client_request_head))
394   {
395     GNUNET_CONTAINER_DLL_remove (n->active_client_request_head,
396                                  n->active_client_request_tail, car);
397     GNUNET_assert (GNUNET_YES ==
398                    GNUNET_CONTAINER_multihashmap_remove (car->client->requests,
399                                                          &n->peer.hashPubKey,
400                                                          car));
401     GNUNET_free (car);
402   }
403   if (NULL != n->th)
404   {
405     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
406     n->th = NULL;
407   }
408   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
409     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
410   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
411     GNUNET_SCHEDULER_cancel (n->quota_update_task);
412   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
413     GNUNET_SCHEDULER_cancel (n->keep_alive_task);
414   if (n->status == PEER_STATE_KEY_CONFIRMED)
415     GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
416                               -1, GNUNET_NO);
417   GNUNET_array_grow (n->ats, n->ats_count, 0);
418   GNUNET_free_non_null (n->pending_ping);
419   GNUNET_free_non_null (n->pending_pong);
420   GNUNET_free (n);
421 }
422
423
424
425 /**
426  * Consider freeing the given neighbour since we may not need
427  * to keep it around anymore.
428  *
429  * @param n neighbour to consider discarding
430  */
431 static void
432 consider_free_neighbour (struct Neighbour *n);
433
434
435 /**
436  * Task triggered when a neighbour entry might have gotten stale.
437  *
438  * @param cls the 'struct Neighbour'
439  * @param tc scheduler context (not used)
440  */
441 static void
442 consider_free_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
443 {
444   struct Neighbour *n = cls;
445
446   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
447   consider_free_neighbour (n);
448 }
449
450
451 /**
452  * Consider freeing the given neighbour since we may not need
453  * to keep it around anymore.
454  *
455  * @param n neighbour to consider discarding
456  */
457 static void
458 consider_free_neighbour (struct Neighbour *n)
459 {
460   struct GNUNET_TIME_Relative left;
461
462   if ((n->th != NULL) || (n->pitr != NULL) || (GNUNET_YES == n->is_connected))
463     return;                     /* no chance */
464
465   left = GNUNET_TIME_absolute_get_remaining (get_neighbour_timeout (n));
466   if (left.rel_value > 0)
467   {
468     if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
469       GNUNET_SCHEDULER_cancel (n->dead_clean_task);
470     n->dead_clean_task =
471         GNUNET_SCHEDULER_add_delayed (left, &consider_free_task, n);
472     return;
473   }
474   /* actually free the neighbour... */
475   GNUNET_assert (GNUNET_YES ==
476                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
477                                                        &n->peer.hashPubKey, n));
478   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
479                          GNUNET_CONTAINER_multihashmap_size (neighbours),
480                          GNUNET_NO);
481   free_neighbour (n);
482 }
483
484
485 /**
486  * Function called when the transport service is ready to
487  * receive an encrypted message for the respective peer
488  *
489  * @param cls neighbour to use message from
490  * @param size number of bytes we can transmit
491  * @param buf where to copy the message
492  * @return number of bytes transmitted
493  */
494 static size_t
495 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
496 {
497   struct Neighbour *n = cls;
498   struct MessageEntry *m;
499   size_t ret;
500   char *cbuf;
501
502   n->th = NULL;
503   m = n->encrypted_head;
504   if (m == NULL)
505   {
506 #if DEBUG_CORE
507     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
508                 "Encrypted message queue empty, no messages added to buffer for `%4s'\n",
509                 GNUNET_i2s (&n->peer));
510 #endif
511     return 0;
512   }
513   GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
514   ret = 0;
515   cbuf = buf;
516   if (buf != NULL)
517   {
518     GNUNET_assert (size >= m->size);
519     memcpy (cbuf, &m[1], m->size);
520     ret = m->size;
521     GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, m->size);
522 #if DEBUG_CORE
523     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
524                 "Copied message of type %u and size %u into transport buffer for `%4s'\n",
525                 (unsigned int)
526                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
527                 (unsigned int) ret, GNUNET_i2s (&n->peer));
528 #endif
529     process_encrypted_neighbour_queue (n);
530   }
531   else
532   {
533 #if DEBUG_CORE
534     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
535                 "Transmission of message of type %u and size %u failed\n",
536                 (unsigned int)
537                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
538                 (unsigned int) m->size);
539 #endif
540   }
541   GNUNET_free (m);
542   consider_free_neighbour (n);
543   GNUNET_STATISTICS_update (stats,
544                             gettext_noop
545                             ("# encrypted bytes given to transport"), ret,
546                             GNUNET_NO);
547   return ret;
548 }
549
550
551
552
553
554 /**
555  * Select messages for transmission.  This heuristic uses a combination
556  * of earliest deadline first (EDF) scheduling (with bounded horizon)
557  * and priority-based discard (in case no feasible schedule exist) and
558  * speculative optimization (defer any kind of transmission until
559  * we either create a batch of significant size, 25% of max, or until
560  * we are close to a deadline).  Furthermore, when scheduling the
561  * heuristic also packs as many messages into the batch as possible,
562  * starting with those with the earliest deadline.  Yes, this is fun.
563  *
564  * @param n neighbour to select messages from
565  * @param size number of bytes to select for transmission
566  * @param retry_time set to the time when we should try again
567  *        (only valid if this function returns zero)
568  * @return number of bytes selected, or 0 if we decided to
569  *         defer scheduling overall; in that case, retry_time is set.
570  */
571 static size_t
572 select_messages (struct Neighbour *n, size_t size,
573                  struct GNUNET_TIME_Relative *retry_time)
574 {
575   struct MessageEntry *pos;
576   struct MessageEntry *min;
577   struct MessageEntry *last;
578   unsigned int min_prio;
579   struct GNUNET_TIME_Absolute t;
580   struct GNUNET_TIME_Absolute now;
581   struct GNUNET_TIME_Relative delta;
582   uint64_t avail;
583   struct GNUNET_TIME_Relative slack;    /* how long could we wait before missing deadlines? */
584   size_t off;
585   uint64_t tsize;
586   unsigned int queue_size;
587   int discard_low_prio;
588
589   GNUNET_assert (NULL != n->messages);
590   now = GNUNET_TIME_absolute_get ();
591   /* last entry in linked list of messages processed */
592   last = NULL;
593   /* should we remove the entry with the lowest
594    * priority from consideration for scheduling at the
595    * end of the loop? */
596   queue_size = 0;
597   tsize = 0;
598   pos = n->messages;
599   while (pos != NULL)
600   {
601     queue_size++;
602     tsize += pos->size;
603     pos = pos->next;
604   }
605   discard_low_prio = GNUNET_YES;
606   while (GNUNET_YES == discard_low_prio)
607   {
608     min = NULL;
609     min_prio = UINT_MAX;
610     discard_low_prio = GNUNET_NO;
611     /* calculate number of bytes available for transmission at time "t" */
612     avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
613     t = now;
614     /* how many bytes have we (hypothetically) scheduled so far */
615     off = 0;
616     /* maximum time we can wait before transmitting anything
617      * and still make all of our deadlines */
618     slack = GNUNET_TIME_UNIT_FOREVER_REL;
619     pos = n->messages;
620     /* note that we use "*2" here because we want to look
621      * a bit further into the future; much more makes no
622      * sense since new message might be scheduled in the
623      * meantime... */
624     while ((pos != NULL) && (off < size * 2))
625     {
626       if (pos->do_transmit == GNUNET_YES)
627       {
628         /* already removed from consideration */
629         pos = pos->next;
630         continue;
631       }
632       if (discard_low_prio == GNUNET_NO)
633       {
634         delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
635         if (delta.rel_value > 0)
636         {
637           // FIXME: HUH? Check!
638           t = pos->deadline;
639           avail +=
640               GNUNET_BANDWIDTH_value_get_available_until (n->bw_out, delta);
641         }
642         if (avail < pos->size)
643         {
644           // FIXME: HUH? Check!
645           discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
646         }
647         else
648         {
649           avail -= pos->size;
650           /* update slack, considering both its absolute deadline
651            * and relative deadlines caused by other messages
652            * with their respective load */
653           slack =
654               GNUNET_TIME_relative_min (slack,
655                                         GNUNET_BANDWIDTH_value_get_delay_for
656                                         (n->bw_out, avail));
657           if (pos->deadline.abs_value <= now.abs_value)
658           {
659             /* now or never */
660             slack = GNUNET_TIME_UNIT_ZERO;
661           }
662           else if (GNUNET_YES == pos->got_slack)
663           {
664             /* should be soon now! */
665             slack =
666                 GNUNET_TIME_relative_min (slack,
667                                           GNUNET_TIME_absolute_get_remaining
668                                           (pos->slack_deadline));
669           }
670           else
671           {
672             slack =
673                 GNUNET_TIME_relative_min (slack,
674                                           GNUNET_TIME_absolute_get_difference
675                                           (now, pos->deadline));
676             pos->got_slack = GNUNET_YES;
677             pos->slack_deadline =
678                 GNUNET_TIME_absolute_min (pos->deadline,
679                                           GNUNET_TIME_relative_to_absolute
680                                           (GNUNET_CONSTANTS_MAX_CORK_DELAY));
681           }
682         }
683       }
684       off += pos->size;
685       t = GNUNET_TIME_absolute_max (pos->deadline, t);  // HUH? Check!
686       if (pos->priority <= min_prio)
687       {
688         /* update min for discard */
689         min_prio = pos->priority;
690         min = pos;
691       }
692       pos = pos->next;
693     }
694     if (discard_low_prio)
695     {
696       GNUNET_assert (min != NULL);
697       /* remove lowest-priority entry from consideration */
698       min->do_transmit = GNUNET_YES;    /* means: discard (for now) */
699     }
700     last = pos;
701   }
702   /* guard against sending "tiny" messages with large headers without
703    * urgent deadlines */
704   if ((slack.rel_value > GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value) &&
705       (size > 4 * off) && (queue_size <= MAX_PEER_QUEUE_SIZE - 2))
706   {
707     /* less than 25% of message would be filled with deadlines still
708      * being met if we delay by one second or more; so just wait for
709      * more data; but do not wait longer than 1s (since we don't want
710      * to delay messages for a really long time either). */
711     *retry_time = GNUNET_CONSTANTS_MAX_CORK_DELAY;
712     /* reset do_transmit values for next time */
713     while (pos != last)
714     {
715       pos->do_transmit = GNUNET_NO;
716       pos = pos->next;
717     }
718     GNUNET_STATISTICS_update (stats,
719                               gettext_noop
720                               ("# transmissions delayed due to corking"), 1,
721                               GNUNET_NO);
722 #if DEBUG_CORE
723     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724                 "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
725                 (unsigned long long) retry_time->rel_value, (unsigned int) off,
726                 (unsigned int) size);
727 #endif
728     return 0;
729   }
730   /* select marked messages (up to size) for transmission */
731   off = 0;
732   pos = n->messages;
733   while (pos != last)
734   {
735     if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
736     {
737       pos->do_transmit = GNUNET_YES;    /* mark for transmission */
738       off += pos->size;
739       size -= pos->size;
740 #if DEBUG_CORE > 1
741       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
742                   "Selecting message of size %u for transmission\n",
743                   (unsigned int) pos->size);
744 #endif
745     }
746     else
747     {
748 #if DEBUG_CORE > 1
749       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750                   "Not selecting message of size %u for transmission at this time (maximum is %u)\n",
751                   (unsigned int) pos->size, size);
752 #endif
753       pos->do_transmit = GNUNET_NO;     /* mark for not transmitting! */
754     }
755     pos = pos->next;
756   }
757 #if DEBUG_CORE
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759               "Selected %llu/%llu bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
760               (unsigned long long) off, (unsigned long long) tsize, queue_size,
761               (unsigned int) MAX_PEER_QUEUE_SIZE, GNUNET_i2s (&n->peer));
762 #endif
763   return off;
764 }
765
766
767 /**
768  * Batch multiple messages into a larger buffer.
769  *
770  * @param n neighbour to take messages from
771  * @param buf target buffer
772  * @param size size of buf
773  * @param deadline set to transmission deadline for the result
774  * @param retry_time set to the time when we should try again
775  *        (only valid if this function returns zero)
776  * @param priority set to the priority of the batch
777  * @return number of bytes written to buf (can be zero)
778  */
779 static size_t
780 batch_message (struct Neighbour *n, char *buf, size_t size,
781                struct GNUNET_TIME_Absolute *deadline,
782                struct GNUNET_TIME_Relative *retry_time, unsigned int *priority)
783 {
784   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
785   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage *) ntmb;
786   struct MessageEntry *pos;
787   struct MessageEntry *prev;
788   struct MessageEntry *next;
789   size_t ret;
790
791   ret = 0;
792   *priority = 0;
793   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
794   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
795   if (0 == select_messages (n, size, retry_time))
796   {
797 #if DEBUG_CORE
798     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799                 "No messages selected, will try again in %llu ms\n",
800                 retry_time->rel_value);
801 #endif
802     return 0;
803   }
804   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
805   ntm->ats_count = htonl (0);
806   ntm->ats.type = htonl (0);
807   ntm->ats.value = htonl (0);
808   ntm->peer = n->peer;
809   pos = n->messages;
810   prev = NULL;
811   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
812   {
813     next = pos->next;
814     if (GNUNET_YES == pos->do_transmit)
815     {
816       GNUNET_assert (pos->size <= size);
817       /* do notifications */
818       /* FIXME: track if we have *any* client that wants
819        * full notifications and only do this if that is
820        * actually true */
821       if (pos->size <
822           GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
823       {
824         memcpy (&ntm[1], &pos[1], pos->size);
825         ntm->header.size =
826             htons (sizeof (struct NotifyTrafficMessage) +
827                    sizeof (struct GNUNET_MessageHeader));
828         send_to_all_clients (&ntm->header, GNUNET_YES,
829                              GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
830       }
831       else
832       {
833         /* message too large for 'full' notifications, we do at
834          * least the 'hdr' type */
835         memcpy (&ntm[1], &pos[1], sizeof (struct GNUNET_MessageHeader));
836       }
837       ntm->header.size =
838           htons (sizeof (struct NotifyTrafficMessage) + pos->size);
839       send_to_all_clients (&ntm->header, GNUNET_YES,
840                            GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);
841 #if DEBUG_HANDSHAKE
842       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843                   "Encrypting %u bytes with message of type %u and size %u\n",
844                   pos->size,
845                   (unsigned int)
846                   ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type),
847                   (unsigned int)
848                   ntohs (((const struct GNUNET_MessageHeader *)
849                           &pos[1])->size));
850 #endif
851       /* copy for encrypted transmission */
852       memcpy (&buf[ret], &pos[1], pos->size);
853       ret += pos->size;
854       size -= pos->size;
855       *priority += pos->priority;
856 #if DEBUG_CORE > 1
857       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858                   "Adding plaintext message of size %u with deadline %llu ms to batch\n",
859                   (unsigned int) pos->size,
860                   (unsigned long long)
861                   GNUNET_TIME_absolute_get_remaining (pos->deadline).rel_value);
862 #endif
863       deadline->abs_value =
864           GNUNET_MIN (deadline->abs_value, pos->deadline.abs_value);
865       GNUNET_free (pos);
866       if (prev == NULL)
867         n->messages = next;
868       else
869         prev->next = next;
870     }
871     else
872     {
873       prev = pos;
874     }
875     pos = next;
876   }
877 #if DEBUG_CORE > 1
878   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
879               "Deadline for message batch is %llu ms\n",
880               GNUNET_TIME_absolute_get_remaining (*deadline).rel_value);
881 #endif
882   return ret;
883 }
884
885
886 /**
887  * Remove messages with deadlines that have long expired from
888  * the queue.
889  *
890  * @param n neighbour to inspect
891  */
892 static void
893 discard_expired_messages (struct Neighbour *n)
894 {
895   struct MessageEntry *prev;
896   struct MessageEntry *next;
897   struct MessageEntry *pos;
898   struct GNUNET_TIME_Absolute now;
899   struct GNUNET_TIME_Relative delta;
900   int disc;
901   unsigned int queue_length;
902
903   disc = GNUNET_NO;
904   now = GNUNET_TIME_absolute_get ();
905   prev = NULL;
906   queue_length = 0;
907   pos = n->messages;
908   while (pos != NULL)
909   {
910     queue_length++;
911     next = pos->next;
912     delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
913     if (delta.rel_value > PAST_EXPIRATION_DISCARD_TIME.rel_value)
914     {
915 #if DEBUG_CORE
916       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
917                   "Message is %llu ms past due, discarding.\n",
918                   delta.rel_value);
919 #endif
920       if (prev == NULL)
921         n->messages = next;
922       else
923         prev->next = next;
924       GNUNET_STATISTICS_update (stats,
925                                 gettext_noop
926                                 ("# messages discarded (expired prior to transmission)"),
927                                 1, GNUNET_NO);
928       disc = GNUNET_YES;
929       GNUNET_free (pos);
930     }
931     else
932       prev = pos;
933     pos = next;
934   }
935   if ( (GNUNET_YES == disc) &&
936        (queue_length == MAX_PEER_QUEUE_SIZE) )
937     schedule_peer_messages (n);
938 }
939
940
941 /**
942  * Signature of the main function of a task.
943  *
944  * @param cls closure
945  * @param tc context information (why was this task triggered now)
946  */
947 static void
948 retry_plaintext_processing (void *cls,
949                             const struct GNUNET_SCHEDULER_TaskContext *tc)
950 {
951   struct Neighbour *n = cls;
952
953   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
954   process_plaintext_neighbour_queue (n);
955 }
956
957
958 /**
959  * Check if we have plaintext messages for the specified neighbour
960  * pending, and if so, consider batching and encrypting them (and
961  * then trigger processing of the encrypted queue if needed).
962  *
963  * @param n neighbour to check.
964  */
965 static void
966 process_plaintext_neighbour_queue (struct Neighbour *n)
967 {
968   char pbuf[GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE + sizeof (struct EncryptedMessage)];    /* plaintext */
969   size_t used;
970   struct EncryptedMessage *em;  /* encrypted message */
971   struct EncryptedMessage *ph;  /* plaintext header */
972   struct MessageEntry *me;
973   unsigned int priority;
974   struct GNUNET_TIME_Absolute deadline;
975   struct GNUNET_TIME_Relative retry_time;
976   struct GNUNET_CRYPTO_AesInitializationVector iv;
977   struct GNUNET_CRYPTO_AuthKey auth_key;
978
979   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
980   {
981     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
982     n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
983   }
984   switch (n->status)
985   {
986   case PEER_STATE_DOWN:
987     send_key (n);
988 #if DEBUG_CORE
989     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
991                 GNUNET_i2s (&n->peer));
992 #endif
993     return;
994   case PEER_STATE_KEY_SENT:
995     if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
996       n->retry_set_key_task =
997           GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
998                                         &set_key_retry_task, n);
999 #if DEBUG_CORE
1000     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1001                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1002                 GNUNET_i2s (&n->peer));
1003 #endif
1004     return;
1005   case PEER_STATE_KEY_RECEIVED:
1006     if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
1007       n->retry_set_key_task =
1008           GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
1009                                         &set_key_retry_task, n);
1010 #if DEBUG_CORE
1011     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1012                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
1013                 GNUNET_i2s (&n->peer));
1014 #endif
1015     return;
1016   case PEER_STATE_KEY_CONFIRMED:
1017     /* ready to continue */
1018     break;
1019   }
1020   discard_expired_messages (n);
1021   if (n->messages == NULL)
1022   {
1023 #if DEBUG_CORE
1024     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1025                 "Plaintext message queue for `%4s' is empty.\n",
1026                 GNUNET_i2s (&n->peer));
1027 #endif
1028     return;                     /* no pending messages */
1029   }
1030   if (n->encrypted_head != NULL)
1031   {
1032 #if DEBUG_CORE > 2
1033     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1034                 "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1035                 GNUNET_i2s (&n->peer));
1036 #endif
1037     return;                     /* wait for messages already encrypted to be
1038                                  * processed first! */
1039   }
1040   ph = (struct EncryptedMessage *) pbuf;
1041   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1042   priority = 0;
1043   used = sizeof (struct EncryptedMessage);
1044   used +=
1045       batch_message (n, &pbuf[used],
1046                      GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE, &deadline,
1047                      &retry_time, &priority);
1048   if (used == sizeof (struct EncryptedMessage))
1049   {
1050 #if DEBUG_CORE > 1
1051     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1052                 "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1053                 GNUNET_i2s (&n->peer));
1054 #endif
1055     /* no messages selected for sending, try again later... */
1056     n->retry_plaintext_task =
1057         GNUNET_SCHEDULER_add_delayed (retry_time, &retry_plaintext_processing,
1058                                       n);
1059     return;
1060   }
1061   GSC_KX_encrypt_and_transmit (n->kx,
1062                                &pbuf[struct EncryptedMessage],
1063                                used - sizeof (struct EncryptedMessage));
1064   schedule_peer_messages (n);
1065 }
1066
1067
1068
1069
1070 /**
1071  * Check if we have encrypted messages for the specified neighbour
1072  * pending, and if so, check with the transport about sending them
1073  * out.
1074  *
1075  * @param n neighbour to check.
1076  */
1077 static void
1078 process_encrypted_neighbour_queue (struct Neighbour *n)
1079 {
1080   struct MessageEntry *m;
1081
1082   if (n->th != NULL)
1083     return;                     /* request already pending */
1084   if (GNUNET_YES != n->is_connected)
1085   {
1086     GNUNET_break (0);
1087     return;
1088   }
1089   m = n->encrypted_head;
1090   if (m == NULL)
1091   {
1092     /* encrypted queue empty, try plaintext instead */
1093     process_plaintext_neighbour_queue (n);
1094     return;
1095   }
1096 #if DEBUG_CORE > 1
1097   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1099               (unsigned int) m->size, GNUNET_i2s (&n->peer),
1100               (unsigned long long)
1101               GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value);
1102 #endif
1103   n->th =
1104        GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size,
1105                                               m->priority,
1106                                               GNUNET_TIME_absolute_get_remaining
1107                                               (m->deadline),
1108                                               &notify_encrypted_transmit_ready,
1109                                               n);
1110   if (n->th == NULL)
1111   {
1112     /* message request too large or duplicate request */
1113     GNUNET_break (0);
1114     /* discard encrypted message */
1115     GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
1116     GNUNET_free (m);
1117     process_encrypted_neighbour_queue (n);
1118   }
1119 }
1120
1121
1122 /**
1123  * Initialize a new 'struct Neighbour'.
1124  *
1125  * @param pid ID of the new neighbour
1126  * @return handle for the new neighbour
1127  */
1128 static struct Neighbour *
1129 create_neighbour (const struct GNUNET_PeerIdentity *pid)
1130 {
1131   struct Neighbour *n;
1132   struct GNUNET_TIME_Absolute now;
1133
1134 #if DEBUG_CORE
1135   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136               "Creating neighbour entry for peer `%4s'\n", GNUNET_i2s (pid));
1137 #endif
1138   n = GNUNET_malloc (sizeof (struct Neighbour));
1139   n->peer = *pid;
1140   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
1141   now = GNUNET_TIME_absolute_get ();
1142   n->encrypt_key_created = now;
1143   n->last_activity = now;
1144   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
1145   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1146   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1147   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init (UINT32_MAX);
1148   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1149   n->ping_challenge =
1150       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1151   GNUNET_assert (GNUNET_OK ==
1152                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1153                                                     &n->peer.hashPubKey, n,
1154                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1155   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
1156                          GNUNET_CONTAINER_multihashmap_size (neighbours),
1157                          GNUNET_NO);
1158   neighbour_quota_update (n, NULL);
1159   consider_free_neighbour (n);
1160   return n;
1161 }
1162
1163
1164 int
1165 GSC_NEIGHBOURS_init ()
1166 {
1167   neighbours = GNUNET_CONTAINER_multihashmap_create (128);
1168   self.public_key = &my_public_key;
1169   self.peer = my_identity;
1170   self.last_activity = GNUNET_TIME_UNIT_FOREVER_ABS;
1171   self.status = PEER_STATE_KEY_CONFIRMED;
1172   self.is_connected = GNUNET_YES;
1173   return GNUNET_OK;
1174 }
1175
1176
1177 void
1178 GSC_NEIGHBOURS_done ()
1179 {
1180   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
1181                                          NULL);
1182   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1183   neighbours = NULL;
1184   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
1185                          0, GNUNET_NO);
1186 }