3f97532bd634afb060ee2490626014bbee193747
[oweals/gnunet.git] / src / core / gnunet-service-core_sessions.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_sessions.c
23  * @brief code for managing of 'encrypted' sessions (key exchange done) 
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_service_core.h"
28 #include "gnunet_service_core_neighbours.h"
29 #include "gnunet_service_core_kx.h"
30 #include "gnunet_service_core_sessions.h"
31
32 /**
33  * Record kept for each request for transmission issued by a
34  * client that is still pending.
35  */
36 struct GSC_ClientActiveRequest;
37
38 /**
39  * Data kept per session.
40  */
41 struct Session
42 {
43   /**
44    * Identity of the other peer.
45    */
46   struct GNUNET_PeerIdentity peer;
47
48   /**
49    * Head of list of requests from clients for transmission to
50    * this peer.
51    */
52   struct GSC_ClientActiveRequest *active_client_request_head;
53
54   /**
55    * Tail of list of requests from clients for transmission to
56    * this peer.
57    */
58   struct GSC_ClientActiveRequest *active_client_request_tail;
59
60   /**
61    * Performance data for the peer.
62    */
63   struct GNUNET_TRANSPORT_ATS_Information *ats;
64
65   /**
66    * Information about the key exchange with the other peer.
67    */
68   struct GSC_KeyExchangeInfo *kxinfo;
69
70   /**
71    * ID of task used for cleaning up dead neighbour entries.
72    */
73   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
74
75   /**
76    * ID of task used for updating bandwidth quota for this neighbour.
77    */
78   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
79
80   /**
81    * At what time did we initially establish (as in, complete session
82    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
83    */
84   struct GNUNET_TIME_Absolute time_established;
85
86   /**
87    * At what time did we last receive an encrypted message from the
88    * other peer?  Should be zero if status != KEY_CONFIRMED.
89    */
90   struct GNUNET_TIME_Absolute last_activity;
91
92   /**
93    * How valueable were the messages of this peer recently?
94    */
95   unsigned long long current_preference;
96
97   /**
98    * Number of entries in 'ats'.
99    */
100   unsigned int ats_count;
101
102   /**
103    * Bit map indicating which of the 32 sequence numbers before the last
104    * were received (good for accepting out-of-order packets and
105    * estimating reliability of the connection)
106    */
107   unsigned int last_packets_bitmap;
108
109   /**
110    * last sequence number received on this connection (highest)
111    */
112   uint32_t last_sequence_number_received;
113
114   /**
115    * last sequence number transmitted
116    */
117   uint32_t last_sequence_number_sent;
118
119   /**
120    * Available bandwidth in for this peer (current target).
121    */
122   struct GNUNET_BANDWIDTH_Value32NBO bw_in;
123
124   /**
125    * Available bandwidth out for this peer (current target).
126    */
127   struct GNUNET_BANDWIDTH_Value32NBO bw_out;
128
129   /**
130    * Internal bandwidth limit set for this peer (initially typically
131    * set to "-1").  Actual "bw_out" is MIN of
132    * "bpm_out_internal_limit" and "bw_out_external_limit".
133    */
134   struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
135
136   /**
137    * External bandwidth limit set for this peer by the
138    * peer that we are communicating with.  "bw_out" is MIN of
139    * "bw_out_internal_limit" and "bw_out_external_limit".
140    */
141   struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
142
143 };
144
145
146 /**
147  * Map of peer identities to 'struct Session'.
148  */
149 static struct GNUNET_CONTAINER_MultiHashMap *sessions;
150
151
152 /**
153  * Session entry for "this" peer.
154  */
155 static struct Session self;
156
157 /**
158  * Sum of all preferences among all neighbours.
159  */
160 static unsigned long long preference_sum;
161
162
163 // FIXME.........
164
165 /**
166  * At what time should the connection to the given neighbour
167  * time out (given no further activity?)
168  *
169  * @param n neighbour in question
170  * @return absolute timeout
171  */
172 static struct GNUNET_TIME_Absolute
173 get_neighbour_timeout (struct Neighbour *n)
174 {
175   return GNUNET_TIME_absolute_add (n->last_activity,
176                                    GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
177 }
178
179
180 /**
181  * Helper function for update_preference_sum.
182  */
183 static int
184 update_preference (void *cls, const GNUNET_HashCode * key, void *value)
185 {
186   unsigned long long *ps = cls;
187   struct Neighbour *n = value;
188
189   n->current_preference /= 2;
190   *ps += n->current_preference;
191   return GNUNET_OK;
192 }
193
194
195 /**
196  * A preference value for a neighbour was update.  Update
197  * the preference sum accordingly.
198  *
199  * @param inc how much was a preference value increased?
200  */
201 static void
202 update_preference_sum (unsigned long long inc)
203 {
204   unsigned long long os;
205
206   os = preference_sum;
207   preference_sum += inc;
208   if (preference_sum >= os)
209     return;                     /* done! */
210   /* overflow! compensate by cutting all values in half! */
211   preference_sum = 0;
212   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &update_preference,
213                                          &preference_sum);
214   GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"),
215                          preference_sum, GNUNET_NO);
216 }
217
218
219 /**
220  * Find the entry for the given neighbour.
221  *
222  * @param peer identity of the neighbour
223  * @return NULL if we are not connected, otherwise the
224  *         neighbour's entry.
225  */
226 static struct Neighbour *
227 find_neighbour (const struct GNUNET_PeerIdentity *peer)
228 {
229   return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
230 }
231
232
233 /**
234  * Function called by transport telling us that a peer
235  * changed status.
236  *
237  * @param n the peer that changed status
238  */
239 static void
240 handle_peer_status_change (struct Neighbour *n)
241 {
242   struct PeerStatusNotifyMessage *psnm;
243   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
244   struct GNUNET_TRANSPORT_ATS_Information *ats;
245   size_t size;
246
247   if ((!n->is_connected) || (n->status != PEER_STATE_KEY_CONFIRMED))
248     return;
249 #if DEBUG_CORE > 1
250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%4s' changed status\n",
251               GNUNET_i2s (&n->peer));
252 #endif
253   size =
254       sizeof (struct PeerStatusNotifyMessage) +
255       n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
256   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
257   {
258     GNUNET_break (0);
259     /* recovery strategy: throw away performance data */
260     GNUNET_array_grow (n->ats, n->ats_count, 0);
261     size =
262         sizeof (struct PeerStatusNotifyMessage) +
263         n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
264   }
265   psnm = (struct PeerStatusNotifyMessage *) buf;
266   psnm->header.size = htons (size);
267   psnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE);
268   psnm->timeout = GNUNET_TIME_absolute_hton (get_neighbour_timeout (n));
269   psnm->bandwidth_in = n->bw_in;
270   psnm->bandwidth_out = n->bw_out;
271   psnm->peer = n->peer;
272   psnm->ats_count = htonl (n->ats_count);
273   ats = &psnm->ats;
274   memcpy (ats, n->ats,
275           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
276   ats[n->ats_count].type = htonl (0);
277   ats[n->ats_count].value = htonl (0);
278   send_to_all_clients (&psnm->header, GNUNET_YES,
279                        GNUNET_CORE_OPTION_SEND_STATUS_CHANGE);
280   GNUNET_STATISTICS_update (stats, gettext_noop ("# peer status changes"), 1,
281                             GNUNET_NO);
282 }
283
284
285
286 /**
287  * Go over our message queue and if it is not too long, go
288  * over the pending requests from clients for this
289  * neighbour and send some clients a 'READY' notification.
290  *
291  * @param n which peer to process
292  */
293 static void
294 schedule_peer_messages (struct Neighbour *n)
295 {
296   struct GSC_ClientActiveRequest *car;
297   struct GSC_ClientActiveRequest *pos;
298   struct Client *c;
299   struct MessageEntry *mqe;
300   unsigned int queue_size;
301
302   /* check if neighbour queue is empty enough! */
303   if (n != &self)
304   {
305     queue_size = 0;
306     mqe = n->messages;
307     while (mqe != NULL)
308     {
309       queue_size++;
310       mqe = mqe->next;
311     }
312     if (queue_size >= MAX_PEER_QUEUE_SIZE)
313     {
314 #if DEBUG_CORE_CLIENT
315       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
316                   "Not considering client transmission requests: queue full\n");
317 #endif
318       return;                   /* queue still full */
319     }
320     /* find highest priority request */
321     pos = n->active_client_request_head;
322     car = NULL;
323     while (pos != NULL)
324     {
325       if ((car == NULL) || (pos->priority > car->priority))
326         car = pos;
327       pos = pos->next;
328     }
329   }
330   else
331   {
332     car = n->active_client_request_head;
333   }
334   if (car == NULL)
335     return;                     /* no pending requests */
336 #if DEBUG_CORE_CLIENT
337   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
338               "Permitting client transmission request to `%s'\n",
339               GNUNET_i2s (&n->peer));
340 #endif
341   GSC_CLIENTS_solicite_request (car);
342 }
343
344
345
346 /**
347  * Free the given entry for the neighbour (it has
348  * already been removed from the list at this point).
349  *
350  * @param n neighbour to free
351  */
352 static void
353 free_neighbour (struct Neighbour *n)
354 {
355   struct MessageEntry *m;
356   struct GSC_ClientActiveRequest *car;
357
358 #if DEBUG_CORE
359   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360               "Destroying neighbour entry for peer `%4s'\n",
361               GNUNET_i2s (&n->peer));
362 #endif
363   if (n->skm != NULL)
364   {
365     GNUNET_free (n->skm);
366     n->skm = NULL;
367   }
368   while (NULL != (m = n->messages))
369   {
370     n->messages = m->next;
371     GNUNET_free (m);
372   }
373   while (NULL != (m = n->encrypted_head))
374   {
375     GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
376     GNUNET_free (m);
377   }
378   while (NULL != (car = n->active_client_request_head))
379   {
380     GNUNET_CONTAINER_DLL_remove (n->active_client_request_head,
381                                  n->active_client_request_tail, car);
382     GNUNET_assert (GNUNET_YES ==
383                    GNUNET_CONTAINER_multihashmap_remove (car->client->requests,
384                                                          &n->peer.hashPubKey,
385                                                          car));
386     GNUNET_free (car);
387   }
388   if (NULL != n->th)
389   {
390     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
391     n->th = NULL;
392   }
393   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
394     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
395   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
396     GNUNET_SCHEDULER_cancel (n->quota_update_task);
397   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
398     GNUNET_SCHEDULER_cancel (n->keep_alive_task);
399   if (n->status == PEER_STATE_KEY_CONFIRMED)
400     GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
401                               -1, GNUNET_NO);
402   GNUNET_array_grow (n->ats, n->ats_count, 0);
403   GNUNET_free_non_null (n->pending_ping);
404   GNUNET_free_non_null (n->pending_pong);
405   GNUNET_free (n);
406 }
407
408
409
410 /**
411  * Consider freeing the given neighbour since we may not need
412  * to keep it around anymore.
413  *
414  * @param n neighbour to consider discarding
415  */
416 static void
417 consider_free_neighbour (struct Neighbour *n);
418
419
420 /**
421  * Task triggered when a neighbour entry might have gotten stale.
422  *
423  * @param cls the 'struct Neighbour'
424  * @param tc scheduler context (not used)
425  */
426 static void
427 consider_free_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
428 {
429   struct Neighbour *n = cls;
430
431   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
432   consider_free_neighbour (n);
433 }
434
435
436 /**
437  * Consider freeing the given neighbour since we may not need
438  * to keep it around anymore.
439  *
440  * @param n neighbour to consider discarding
441  */
442 static void
443 consider_free_neighbour (struct Neighbour *n)
444 {
445   struct GNUNET_TIME_Relative left;
446
447   if ((n->th != NULL) || (n->pitr != NULL) || (GNUNET_YES == n->is_connected))
448     return;                     /* no chance */
449
450   left = GNUNET_TIME_absolute_get_remaining (get_neighbour_timeout (n));
451   if (left.rel_value > 0)
452   {
453     if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
454       GNUNET_SCHEDULER_cancel (n->dead_clean_task);
455     n->dead_clean_task =
456         GNUNET_SCHEDULER_add_delayed (left, &consider_free_task, n);
457     return;
458   }
459   /* actually free the neighbour... */
460   GNUNET_assert (GNUNET_YES ==
461                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
462                                                        &n->peer.hashPubKey, n));
463   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
464                          GNUNET_CONTAINER_multihashmap_size (neighbours),
465                          GNUNET_NO);
466   free_neighbour (n);
467 }
468
469
470 /**
471  * Function called when the transport service is ready to
472  * receive an encrypted message for the respective peer
473  *
474  * @param cls neighbour to use message from
475  * @param size number of bytes we can transmit
476  * @param buf where to copy the message
477  * @return number of bytes transmitted
478  */
479 static size_t
480 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
481 {
482   struct Neighbour *n = cls;
483   struct MessageEntry *m;
484   size_t ret;
485   char *cbuf;
486
487   n->th = NULL;
488   m = n->encrypted_head;
489   if (m == NULL)
490   {
491 #if DEBUG_CORE
492     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
493                 "Encrypted message queue empty, no messages added to buffer for `%4s'\n",
494                 GNUNET_i2s (&n->peer));
495 #endif
496     return 0;
497   }
498   GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
499   ret = 0;
500   cbuf = buf;
501   if (buf != NULL)
502   {
503     GNUNET_assert (size >= m->size);
504     memcpy (cbuf, &m[1], m->size);
505     ret = m->size;
506     GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, m->size);
507 #if DEBUG_CORE
508     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509                 "Copied message of type %u and size %u into transport buffer for `%4s'\n",
510                 (unsigned int)
511                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
512                 (unsigned int) ret, GNUNET_i2s (&n->peer));
513 #endif
514     process_encrypted_neighbour_queue (n);
515   }
516   else
517   {
518 #if DEBUG_CORE
519     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
520                 "Transmission of message of type %u and size %u failed\n",
521                 (unsigned int)
522                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
523                 (unsigned int) m->size);
524 #endif
525   }
526   GNUNET_free (m);
527   consider_free_neighbour (n);
528   GNUNET_STATISTICS_update (stats,
529                             gettext_noop
530                             ("# encrypted bytes given to transport"), ret,
531                             GNUNET_NO);
532   return ret;
533 }
534
535
536
537
538
539 /**
540  * Select messages for transmission.  This heuristic uses a combination
541  * of earliest deadline first (EDF) scheduling (with bounded horizon)
542  * and priority-based discard (in case no feasible schedule exist) and
543  * speculative optimization (defer any kind of transmission until
544  * we either create a batch of significant size, 25% of max, or until
545  * we are close to a deadline).  Furthermore, when scheduling the
546  * heuristic also packs as many messages into the batch as possible,
547  * starting with those with the earliest deadline.  Yes, this is fun.
548  *
549  * @param n neighbour to select messages from
550  * @param size number of bytes to select for transmission
551  * @param retry_time set to the time when we should try again
552  *        (only valid if this function returns zero)
553  * @return number of bytes selected, or 0 if we decided to
554  *         defer scheduling overall; in that case, retry_time is set.
555  */
556 static size_t
557 select_messages (struct Neighbour *n, size_t size,
558                  struct GNUNET_TIME_Relative *retry_time)
559 {
560   struct MessageEntry *pos;
561   struct MessageEntry *min;
562   struct MessageEntry *last;
563   unsigned int min_prio;
564   struct GNUNET_TIME_Absolute t;
565   struct GNUNET_TIME_Absolute now;
566   struct GNUNET_TIME_Relative delta;
567   uint64_t avail;
568   struct GNUNET_TIME_Relative slack;    /* how long could we wait before missing deadlines? */
569   size_t off;
570   uint64_t tsize;
571   unsigned int queue_size;
572   int discard_low_prio;
573
574   GNUNET_assert (NULL != n->messages);
575   now = GNUNET_TIME_absolute_get ();
576   /* last entry in linked list of messages processed */
577   last = NULL;
578   /* should we remove the entry with the lowest
579    * priority from consideration for scheduling at the
580    * end of the loop? */
581   queue_size = 0;
582   tsize = 0;
583   pos = n->messages;
584   while (pos != NULL)
585   {
586     queue_size++;
587     tsize += pos->size;
588     pos = pos->next;
589   }
590   discard_low_prio = GNUNET_YES;
591   while (GNUNET_YES == discard_low_prio)
592   {
593     min = NULL;
594     min_prio = UINT_MAX;
595     discard_low_prio = GNUNET_NO;
596     /* calculate number of bytes available for transmission at time "t" */
597     avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
598     t = now;
599     /* how many bytes have we (hypothetically) scheduled so far */
600     off = 0;
601     /* maximum time we can wait before transmitting anything
602      * and still make all of our deadlines */
603     slack = GNUNET_TIME_UNIT_FOREVER_REL;
604     pos = n->messages;
605     /* note that we use "*2" here because we want to look
606      * a bit further into the future; much more makes no
607      * sense since new message might be scheduled in the
608      * meantime... */
609     while ((pos != NULL) && (off < size * 2))
610     {
611       if (pos->do_transmit == GNUNET_YES)
612       {
613         /* already removed from consideration */
614         pos = pos->next;
615         continue;
616       }
617       if (discard_low_prio == GNUNET_NO)
618       {
619         delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
620         if (delta.rel_value > 0)
621         {
622           // FIXME: HUH? Check!
623           t = pos->deadline;
624           avail +=
625               GNUNET_BANDWIDTH_value_get_available_until (n->bw_out, delta);
626         }
627         if (avail < pos->size)
628         {
629           // FIXME: HUH? Check!
630           discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
631         }
632         else
633         {
634           avail -= pos->size;
635           /* update slack, considering both its absolute deadline
636            * and relative deadlines caused by other messages
637            * with their respective load */
638           slack =
639               GNUNET_TIME_relative_min (slack,
640                                         GNUNET_BANDWIDTH_value_get_delay_for
641                                         (n->bw_out, avail));
642           if (pos->deadline.abs_value <= now.abs_value)
643           {
644             /* now or never */
645             slack = GNUNET_TIME_UNIT_ZERO;
646           }
647           else if (GNUNET_YES == pos->got_slack)
648           {
649             /* should be soon now! */
650             slack =
651                 GNUNET_TIME_relative_min (slack,
652                                           GNUNET_TIME_absolute_get_remaining
653                                           (pos->slack_deadline));
654           }
655           else
656           {
657             slack =
658                 GNUNET_TIME_relative_min (slack,
659                                           GNUNET_TIME_absolute_get_difference
660                                           (now, pos->deadline));
661             pos->got_slack = GNUNET_YES;
662             pos->slack_deadline =
663                 GNUNET_TIME_absolute_min (pos->deadline,
664                                           GNUNET_TIME_relative_to_absolute
665                                           (GNUNET_CONSTANTS_MAX_CORK_DELAY));
666           }
667         }
668       }
669       off += pos->size;
670       t = GNUNET_TIME_absolute_max (pos->deadline, t);  // HUH? Check!
671       if (pos->priority <= min_prio)
672       {
673         /* update min for discard */
674         min_prio = pos->priority;
675         min = pos;
676       }
677       pos = pos->next;
678     }
679     if (discard_low_prio)
680     {
681       GNUNET_assert (min != NULL);
682       /* remove lowest-priority entry from consideration */
683       min->do_transmit = GNUNET_YES;    /* means: discard (for now) */
684     }
685     last = pos;
686   }
687   /* guard against sending "tiny" messages with large headers without
688    * urgent deadlines */
689   if ((slack.rel_value > GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value) &&
690       (size > 4 * off) && (queue_size <= MAX_PEER_QUEUE_SIZE - 2))
691   {
692     /* less than 25% of message would be filled with deadlines still
693      * being met if we delay by one second or more; so just wait for
694      * more data; but do not wait longer than 1s (since we don't want
695      * to delay messages for a really long time either). */
696     *retry_time = GNUNET_CONSTANTS_MAX_CORK_DELAY;
697     /* reset do_transmit values for next time */
698     while (pos != last)
699     {
700       pos->do_transmit = GNUNET_NO;
701       pos = pos->next;
702     }
703     GNUNET_STATISTICS_update (stats,
704                               gettext_noop
705                               ("# transmissions delayed due to corking"), 1,
706                               GNUNET_NO);
707 #if DEBUG_CORE
708     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709                 "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
710                 (unsigned long long) retry_time->rel_value, (unsigned int) off,
711                 (unsigned int) size);
712 #endif
713     return 0;
714   }
715   /* select marked messages (up to size) for transmission */
716   off = 0;
717   pos = n->messages;
718   while (pos != last)
719   {
720     if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
721     {
722       pos->do_transmit = GNUNET_YES;    /* mark for transmission */
723       off += pos->size;
724       size -= pos->size;
725 #if DEBUG_CORE > 1
726       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
727                   "Selecting message of size %u for transmission\n",
728                   (unsigned int) pos->size);
729 #endif
730     }
731     else
732     {
733 #if DEBUG_CORE > 1
734       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735                   "Not selecting message of size %u for transmission at this time (maximum is %u)\n",
736                   (unsigned int) pos->size, size);
737 #endif
738       pos->do_transmit = GNUNET_NO;     /* mark for not transmitting! */
739     }
740     pos = pos->next;
741   }
742 #if DEBUG_CORE
743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744               "Selected %llu/%llu bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
745               (unsigned long long) off, (unsigned long long) tsize, queue_size,
746               (unsigned int) MAX_PEER_QUEUE_SIZE, GNUNET_i2s (&n->peer));
747 #endif
748   return off;
749 }
750
751
752 /**
753  * Batch multiple messages into a larger buffer.
754  *
755  * @param n neighbour to take messages from
756  * @param buf target buffer
757  * @param size size of buf
758  * @param deadline set to transmission deadline for the result
759  * @param retry_time set to the time when we should try again
760  *        (only valid if this function returns zero)
761  * @param priority set to the priority of the batch
762  * @return number of bytes written to buf (can be zero)
763  */
764 static size_t
765 batch_message (struct Neighbour *n, char *buf, size_t size,
766                struct GNUNET_TIME_Absolute *deadline,
767                struct GNUNET_TIME_Relative *retry_time, unsigned int *priority)
768 {
769   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
770   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage *) ntmb;
771   struct MessageEntry *pos;
772   struct MessageEntry *prev;
773   struct MessageEntry *next;
774   size_t ret;
775
776   ret = 0;
777   *priority = 0;
778   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
779   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
780   if (0 == select_messages (n, size, retry_time))
781   {
782 #if DEBUG_CORE
783     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784                 "No messages selected, will try again in %llu ms\n",
785                 retry_time->rel_value);
786 #endif
787     return 0;
788   }
789   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
790   ntm->ats_count = htonl (0);
791   ntm->ats.type = htonl (0);
792   ntm->ats.value = htonl (0);
793   ntm->peer = n->peer;
794   pos = n->messages;
795   prev = NULL;
796   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
797   {
798     next = pos->next;
799     if (GNUNET_YES == pos->do_transmit)
800     {
801       GNUNET_assert (pos->size <= size);
802       /* do notifications */
803       /* FIXME: track if we have *any* client that wants
804        * full notifications and only do this if that is
805        * actually true */
806       if (pos->size <
807           GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
808       {
809         memcpy (&ntm[1], &pos[1], pos->size);
810         ntm->header.size =
811             htons (sizeof (struct NotifyTrafficMessage) +
812                    sizeof (struct GNUNET_MessageHeader));
813         send_to_all_clients (&ntm->header, GNUNET_YES,
814                              GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
815       }
816       else
817       {
818         /* message too large for 'full' notifications, we do at
819          * least the 'hdr' type */
820         memcpy (&ntm[1], &pos[1], sizeof (struct GNUNET_MessageHeader));
821       }
822       ntm->header.size =
823           htons (sizeof (struct NotifyTrafficMessage) + pos->size);
824       send_to_all_clients (&ntm->header, GNUNET_YES,
825                            GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);
826 #if DEBUG_HANDSHAKE
827       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828                   "Encrypting %u bytes with message of type %u and size %u\n",
829                   pos->size,
830                   (unsigned int)
831                   ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type),
832                   (unsigned int)
833                   ntohs (((const struct GNUNET_MessageHeader *)
834                           &pos[1])->size));
835 #endif
836       /* copy for encrypted transmission */
837       memcpy (&buf[ret], &pos[1], pos->size);
838       ret += pos->size;
839       size -= pos->size;
840       *priority += pos->priority;
841 #if DEBUG_CORE > 1
842       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843                   "Adding plaintext message of size %u with deadline %llu ms to batch\n",
844                   (unsigned int) pos->size,
845                   (unsigned long long)
846                   GNUNET_TIME_absolute_get_remaining (pos->deadline).rel_value);
847 #endif
848       deadline->abs_value =
849           GNUNET_MIN (deadline->abs_value, pos->deadline.abs_value);
850       GNUNET_free (pos);
851       if (prev == NULL)
852         n->messages = next;
853       else
854         prev->next = next;
855     }
856     else
857     {
858       prev = pos;
859     }
860     pos = next;
861   }
862 #if DEBUG_CORE > 1
863   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
864               "Deadline for message batch is %llu ms\n",
865               GNUNET_TIME_absolute_get_remaining (*deadline).rel_value);
866 #endif
867   return ret;
868 }
869
870
871 /**
872  * Remove messages with deadlines that have long expired from
873  * the queue.
874  *
875  * @param n neighbour to inspect
876  */
877 static void
878 discard_expired_messages (struct Neighbour *n)
879 {
880   struct MessageEntry *prev;
881   struct MessageEntry *next;
882   struct MessageEntry *pos;
883   struct GNUNET_TIME_Absolute now;
884   struct GNUNET_TIME_Relative delta;
885   int disc;
886   unsigned int queue_length;
887
888   disc = GNUNET_NO;
889   now = GNUNET_TIME_absolute_get ();
890   prev = NULL;
891   queue_length = 0;
892   pos = n->messages;
893   while (pos != NULL)
894   {
895     queue_length++;
896     next = pos->next;
897     delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
898     if (delta.rel_value > PAST_EXPIRATION_DISCARD_TIME.rel_value)
899     {
900 #if DEBUG_CORE
901       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
902                   "Message is %llu ms past due, discarding.\n",
903                   delta.rel_value);
904 #endif
905       if (prev == NULL)
906         n->messages = next;
907       else
908         prev->next = next;
909       GNUNET_STATISTICS_update (stats,
910                                 gettext_noop
911                                 ("# messages discarded (expired prior to transmission)"),
912                                 1, GNUNET_NO);
913       disc = GNUNET_YES;
914       GNUNET_free (pos);
915     }
916     else
917       prev = pos;
918     pos = next;
919   }
920   if ( (GNUNET_YES == disc) &&
921        (queue_length == MAX_PEER_QUEUE_SIZE) )
922     schedule_peer_messages (n);
923 }
924
925
926 /**
927  * Signature of the main function of a task.
928  *
929  * @param cls closure
930  * @param tc context information (why was this task triggered now)
931  */
932 static void
933 retry_plaintext_processing (void *cls,
934                             const struct GNUNET_SCHEDULER_TaskContext *tc)
935 {
936   struct Neighbour *n = cls;
937
938   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
939   process_plaintext_neighbour_queue (n);
940 }
941
942
943 /**
944  * Check if we have plaintext messages for the specified neighbour
945  * pending, and if so, consider batching and encrypting them (and
946  * then trigger processing of the encrypted queue if needed).
947  *
948  * @param n neighbour to check.
949  */
950 static void
951 process_plaintext_neighbour_queue (struct Neighbour *n)
952 {
953   char pbuf[GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE + sizeof (struct EncryptedMessage)];    /* plaintext */
954   size_t used;
955   struct EncryptedMessage *em;  /* encrypted message */
956   struct EncryptedMessage *ph;  /* plaintext header */
957   struct MessageEntry *me;
958   unsigned int priority;
959   struct GNUNET_TIME_Absolute deadline;
960   struct GNUNET_TIME_Relative retry_time;
961   struct GNUNET_CRYPTO_AesInitializationVector iv;
962   struct GNUNET_CRYPTO_AuthKey auth_key;
963
964   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
965   {
966     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
967     n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
968   }
969   switch (n->status)
970   {
971   case PEER_STATE_DOWN:
972     send_key (n);
973 #if DEBUG_CORE
974     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
976                 GNUNET_i2s (&n->peer));
977 #endif
978     return;
979   case PEER_STATE_KEY_SENT:
980     if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
981       n->retry_set_key_task =
982           GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
983                                         &set_key_retry_task, n);
984 #if DEBUG_CORE
985     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
986                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
987                 GNUNET_i2s (&n->peer));
988 #endif
989     return;
990   case PEER_STATE_KEY_RECEIVED:
991     if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
992       n->retry_set_key_task =
993           GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
994                                         &set_key_retry_task, n);
995 #if DEBUG_CORE
996     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
998                 GNUNET_i2s (&n->peer));
999 #endif
1000     return;
1001   case PEER_STATE_KEY_CONFIRMED:
1002     /* ready to continue */
1003     break;
1004   }
1005   discard_expired_messages (n);
1006   if (n->messages == NULL)
1007   {
1008 #if DEBUG_CORE
1009     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010                 "Plaintext message queue for `%4s' is empty.\n",
1011                 GNUNET_i2s (&n->peer));
1012 #endif
1013     return;                     /* no pending messages */
1014   }
1015   if (n->encrypted_head != NULL)
1016   {
1017 #if DEBUG_CORE > 2
1018     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1019                 "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1020                 GNUNET_i2s (&n->peer));
1021 #endif
1022     return;                     /* wait for messages already encrypted to be
1023                                  * processed first! */
1024   }
1025   ph = (struct EncryptedMessage *) pbuf;
1026   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1027   priority = 0;
1028   used = sizeof (struct EncryptedMessage);
1029   used +=
1030       batch_message (n, &pbuf[used],
1031                      GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE, &deadline,
1032                      &retry_time, &priority);
1033   if (used == sizeof (struct EncryptedMessage))
1034   {
1035 #if DEBUG_CORE > 1
1036     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1037                 "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1038                 GNUNET_i2s (&n->peer));
1039 #endif
1040     /* no messages selected for sending, try again later... */
1041     n->retry_plaintext_task =
1042         GNUNET_SCHEDULER_add_delayed (retry_time, &retry_plaintext_processing,
1043                                       n);
1044     return;
1045   }
1046   GSC_KX_encrypt_and_transmit (n->kx,
1047                                &pbuf[struct EncryptedMessage],
1048                                used - sizeof (struct EncryptedMessage));
1049   schedule_peer_messages (n);
1050 }
1051
1052
1053
1054
1055 /**
1056  * Check if we have encrypted messages for the specified neighbour
1057  * pending, and if so, check with the transport about sending them
1058  * out.
1059  *
1060  * @param n neighbour to check.
1061  */
1062 static void
1063 process_encrypted_neighbour_queue (struct Neighbour *n)
1064 {
1065   struct MessageEntry *m;
1066
1067   if (n->th != NULL)
1068     return;                     /* request already pending */
1069   if (GNUNET_YES != n->is_connected)
1070   {
1071     GNUNET_break (0);
1072     return;
1073   }
1074   m = n->encrypted_head;
1075   if (m == NULL)
1076   {
1077     /* encrypted queue empty, try plaintext instead */
1078     process_plaintext_neighbour_queue (n);
1079     return;
1080   }
1081 #if DEBUG_CORE > 1
1082   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1084               (unsigned int) m->size, GNUNET_i2s (&n->peer),
1085               (unsigned long long)
1086               GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value);
1087 #endif
1088   n->th =
1089        GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size,
1090                                               m->priority,
1091                                               GNUNET_TIME_absolute_get_remaining
1092                                               (m->deadline),
1093                                               &notify_encrypted_transmit_ready,
1094                                               n);
1095   if (n->th == NULL)
1096   {
1097     /* message request too large or duplicate request */
1098     GNUNET_break (0);
1099     /* discard encrypted message */
1100     GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
1101     GNUNET_free (m);
1102     process_encrypted_neighbour_queue (n);
1103   }
1104 }
1105
1106
1107 /**
1108  * Initialize a new 'struct Neighbour'.
1109  *
1110  * @param pid ID of the new neighbour
1111  * @return handle for the new neighbour
1112  */
1113 static struct Neighbour *
1114 create_neighbour (const struct GNUNET_PeerIdentity *pid)
1115 {
1116   struct Neighbour *n;
1117   struct GNUNET_TIME_Absolute now;
1118
1119 #if DEBUG_CORE
1120   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121               "Creating neighbour entry for peer `%4s'\n", GNUNET_i2s (pid));
1122 #endif
1123   n = GNUNET_malloc (sizeof (struct Neighbour));
1124   n->peer = *pid;
1125   n->last_activity = GNUNET_TIME_absolute_get ();
1126   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1127   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1128   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init (UINT32_MAX);
1129   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1130   n->ping_challenge =
1131       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1132   GNUNET_assert (GNUNET_OK ==
1133                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1134                                                     &n->peer.hashPubKey, n,
1135                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1136   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
1137                          GNUNET_CONTAINER_multihashmap_size (neighbours),
1138                          GNUNET_NO);
1139   neighbour_quota_update (n, NULL);
1140   consider_free_neighbour (n);
1141   return n;
1142 }
1143
1144
1145
1146 /**
1147  * We have a new client, notify it about all current sessions.
1148  *
1149  * @param client the new client
1150  */
1151 void
1152 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
1153 {
1154   /* notify new client about existing neighbours */
1155   GNUNET_CONTAINER_multihashmap_iterate (neighbours,
1156                                          &notify_client_about_neighbour, client);
1157 }
1158
1159
1160 /**
1161  * Queue a request from a client for transmission to a particular peer.
1162  *
1163  * @param car request to queue; this handle is then shared between
1164  *         the caller (CLIENTS subsystem) and SESSIONS and must not
1165  *         be released by either until either 'GNUNET_SESSIONS_dequeue',
1166  *         'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed'
1167  *         have been invoked on it
1168  */
1169 void
1170 GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
1171 {
1172   struct Neighbour *n; // FIXME: session...
1173
1174   n = find_neighbour (&car->peer);
1175   if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
1176       (n->status != PEER_STATE_KEY_CONFIRMED))
1177   {
1178     /* neighbour must have disconnected since request was issued,
1179      * ignore (client will realize it once it processes the
1180      * disconnect notification) */
1181 #if DEBUG_CORE_CLIENT
1182     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1183                 "Dropped client request for transmission (am disconnected)\n");
1184 #endif
1185     GNUNET_STATISTICS_update (stats,
1186                               gettext_noop
1187                               ("# send requests dropped (disconnected)"), 1,
1188                               GNUNET_NO);
1189     GSC_CLIENTS_reject_requests (car);
1190     return;
1191   }
1192 #if DEBUG_CORE_CLIENT
1193   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1194               "Received client transmission request. queueing\n");
1195 #endif
1196     GNUNET_CONTAINER_DLL_insert (n->active_client_request_head,
1197                                  n->active_client_request_tail, car);
1198
1199   // schedule_peer_messages (n);
1200 }
1201
1202
1203 /**
1204  * Dequeue a request from a client from transmission to a particular peer.
1205  *
1206  * @param car request to dequeue; this handle will then be 'owned' by
1207  *        the caller (CLIENTS sysbsystem)
1208  */
1209 void
1210 GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
1211 {
1212   struct Session *s;
1213
1214   s = find_session (&car->peer);
1215   GNUNET_CONTAINER_DLL_remove (s->active_client_request_head,
1216                                s->active_client_request_tail, car);
1217 }
1218
1219
1220
1221 /**
1222  * Transmit a message to a particular peer.
1223  *
1224  * @param car original request that was queued and then solicited;
1225  *            this handle will now be 'owned' by the SESSIONS subsystem
1226  * @param msg message to transmit
1227  */
1228 void
1229 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
1230                        const struct GNUNET_MessageHeader *msg)
1231 {
1232   struct MessageEntry *prev;
1233   struct MessageEntry *pos;
1234   struct MessageEntry *e;
1235   struct MessageEntry *min_prio_entry;
1236   struct MessageEntry *min_prio_prev;
1237   unsigned int min_prio;
1238   unsigned int queue_size;
1239
1240   n = find_neighbour (&sm->peer);
1241   if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
1242       (n->status != PEER_STATE_KEY_CONFIRMED))
1243   {
1244     /* attempt to send message to peer that is not connected anymore
1245      * (can happen due to asynchrony) */
1246     GNUNET_STATISTICS_update (stats,
1247                               gettext_noop
1248                               ("# messages discarded (disconnected)"), 1,
1249                               GNUNET_NO);
1250     if (client != NULL)
1251       GNUNET_SERVER_receive_done (client, GNUNET_OK);
1252     return;
1253   }
1254 #if DEBUG_CORE
1255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1256               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1257               "SEND", (unsigned int) msize, GNUNET_i2s (&sm->peer));
1258 #endif
1259   discard_expired_messages (n);
1260   /* bound queue size */
1261   /* NOTE: this entire block to bound the queue size should be
1262    * obsolete with the new client-request code and the
1263    * 'schedule_peer_messages' mechanism; we still have this code in
1264    * here for now as a sanity check for the new mechanmism;
1265    * ultimately, we should probably simply reject SEND messages that
1266    * are not 'approved' (or provide a new core API for very unreliable
1267    * delivery that always sends with priority 0).  Food for thought. */
1268   min_prio = UINT32_MAX;
1269   min_prio_entry = NULL;
1270   min_prio_prev = NULL;
1271   queue_size = 0;
1272   prev = NULL;
1273   pos = n->messages;
1274   while (pos != NULL)
1275   {
1276     if (pos->priority <= min_prio)
1277     {
1278       min_prio_entry = pos;
1279       min_prio_prev = prev;
1280       min_prio = pos->priority;
1281     }
1282     queue_size++;
1283     prev = pos;
1284     pos = pos->next;
1285   }
1286   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1287   {
1288     /* queue full */
1289     if (ntohl (sm->priority) <= min_prio)
1290     {
1291       /* discard new entry; this should no longer happen! */
1292       GNUNET_break (0);
1293 #if DEBUG_CORE
1294       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1295                   "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
1296                   queue_size, (unsigned int) MAX_PEER_QUEUE_SIZE,
1297                   (unsigned int) msize, (unsigned int) ntohs (message->type));
1298 #endif
1299       GNUNET_STATISTICS_update (stats,
1300                                 gettext_noop ("# discarded CORE_SEND requests"),
1301                                 1, GNUNET_NO);
1302
1303       if (client != NULL)
1304         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1305       return;
1306     }
1307     GNUNET_assert (min_prio_entry != NULL);
1308     /* discard "min_prio_entry" */
1309 #if DEBUG_CORE
1310     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1311                 "Queue full, discarding existing older request\n");
1312 #endif
1313     GNUNET_STATISTICS_update (stats,
1314                               gettext_noop
1315                               ("# discarded lower priority CORE_SEND requests"),
1316                               1, GNUNET_NO);
1317     if (min_prio_prev == NULL)
1318       n->messages = min_prio_entry->next;
1319     else
1320       min_prio_prev->next = min_prio_entry->next;
1321     GNUNET_free (min_prio_entry);
1322   }
1323
1324 #if DEBUG_CORE
1325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326               "Adding transmission request for `%4s' of size %u to queue\n",
1327               GNUNET_i2s (&sm->peer), (unsigned int) msize);
1328 #endif
1329   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1330   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1331   e->priority = ntohl (sm->priority);
1332   e->size = msize;
1333   if (GNUNET_YES != (int) ntohl (sm->cork))
1334     e->got_slack = GNUNET_YES;
1335   memcpy (&e[1], &sm[1], msize);
1336
1337   /* insert, keep list sorted by deadline */
1338   prev = NULL;
1339   pos = n->messages;
1340   while ((pos != NULL) && (pos->deadline.abs_value < e->deadline.abs_value))
1341   {
1342     prev = pos;
1343     pos = pos->next;
1344   }
1345   if (prev == NULL)
1346     n->messages = e;
1347   else
1348     prev->next = e;
1349   e->next = pos;
1350
1351   /* consider scheduling now */
1352   process_plaintext_neighbour_queue (n);
1353
1354 }
1355
1356
1357
1358 /**
1359  * Send a message to the neighbour.
1360  *
1361  * @param cls the message
1362  * @param key neighbour's identity
1363  * @param value 'struct Neighbour' of the target
1364  * @return always GNUNET_OK
1365  */
1366 static int
1367 do_send_message (void *cls, const GNUNET_HashCode * key, void *value)
1368 {
1369   struct GNUNET_MessageHeader *hdr = cls;
1370   struct Neighbour *n = value;
1371   struct MessageEntry *m;
1372   uint16_t size;
1373
1374   size = ntohs (hdr->size);
1375   m = GNUNET_malloc (sizeof (struct MessageEntry) + size);
1376   memcpy (&m[1], hdr, size);
1377   m->deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1378   m->slack_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1379   m->priority = UINT_MAX;
1380   m->sender_status = n->status;
1381   m->size = size;
1382   GNUNET_CONTAINER_DLL_insert (n->message_head,
1383                                n->message_tail,
1384                                m);
1385   return GNUNET_OK;
1386 }
1387
1388
1389 /**
1390  * Broadcast a message to all neighbours.
1391  *
1392  * @param msg message to transmit
1393  */
1394 void
1395 GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg)
1396 {
1397   if (NULL == sessions)
1398     return;
1399   GNUNET_CONTAINER_multihashmap_iterate (sessions,
1400                                          &do_send_message, msg);
1401 }
1402
1403
1404 /**
1405  * Helper function for GSC_SESSIONS_handle_client_iterate_peers.
1406  *
1407  * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies
1408  * @param key identity of the connected peer
1409  * @param value the 'struct Neighbour' for the peer
1410  * @return GNUNET_OK (continue to iterate)
1411  */
1412 static int
1413 queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value)
1414 {
1415   struct GNUNET_SERVER_TransmitContext *tc = cls;
1416   struct Neighbour *n = value;
1417   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
1418   struct GNUNET_TRANSPORT_ATS_Information *ats;
1419   size_t size;
1420   struct ConnectNotifyMessage *cnm;
1421
1422   cnm = (struct ConnectNotifyMessage *) buf;
1423   if (n->status != PEER_STATE_KEY_CONFIRMED)
1424     return GNUNET_OK;
1425   size =
1426       sizeof (struct ConnectNotifyMessage) +
1427       (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1428   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1429   {
1430     GNUNET_break (0);
1431     /* recovery strategy: throw away performance data */
1432     GNUNET_array_grow (n->ats, n->ats_count, 0);
1433     size =
1434         sizeof (struct PeerStatusNotifyMessage) +
1435         n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1436   }
1437   cnm = (struct ConnectNotifyMessage *) buf;
1438   cnm->header.size = htons (size);
1439   cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
1440   cnm->ats_count = htonl (n->ats_count);
1441   ats = &cnm->ats;
1442   memcpy (ats, n->ats,
1443           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
1444   ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
1445   ats[n->ats_count].value = htonl (0);
1446 #if DEBUG_CORE_CLIENT
1447   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
1448               "NOTIFY_CONNECT");
1449 #endif
1450   cnm->peer = n->peer;
1451   GNUNET_SERVER_transmit_context_append_message (tc, &cnm->header);
1452   return GNUNET_OK;
1453 }
1454
1455
1456 /**
1457  * End the session with the given peer (we are no longer
1458  * connected). 
1459  *
1460  * @param pid identity of peer to kill session with
1461  */
1462 void
1463 GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
1464 {
1465 }
1466
1467
1468 /**
1469  * Traffic is being solicited for the given peer.  This means that the
1470  * message queue on the transport-level (NEIGHBOURS subsystem) is now
1471  * empty and it is now OK to transmit another (non-control) message.
1472  *
1473  * @param pid identity of peer ready to receive data
1474  */
1475 void
1476 GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
1477 {
1478 }
1479
1480
1481 /**
1482  * Transmit a message to a particular peer.
1483  *
1484  * @param car original request that was queued and then solicited,
1485  *            ownership does not change (dequeue will be called soon).
1486  * @param msg message to transmit
1487  */
1488 void
1489 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
1490                        const struct GNUNET_MessageHeader *msg)
1491 {
1492 }
1493
1494
1495 /**
1496  * We have a new client, notify it about all current sessions.
1497  *
1498  * @param client the new client
1499  */
1500 void
1501 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
1502 {
1503 }
1504
1505
1506 /**
1507  * Handle CORE_ITERATE_PEERS request. For this request type, the client
1508  * does not have to have transmitted an INIT request.  All current peers
1509  * are returned, regardless of which message types they accept. 
1510  *
1511  * @param cls unused
1512  * @param client client sending the iteration request
1513  * @param message iteration request message
1514  */
1515 void
1516 GSC_SESSIONS_handle_client_iterate_peers (void *cls, struct GNUNET_SERVER_Client *client,
1517                                           const struct GNUNET_MessageHeader *message)
1518 {
1519   struct GNUNET_MessageHeader done_msg;
1520   struct GNUNET_SERVER_TransmitContext *tc;
1521
1522   tc = GNUNET_SERVER_transmit_context_create (client);
1523   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &queue_connect_message,
1524                                          tc);
1525   done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
1526   done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
1527   GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
1528   GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
1529 }
1530
1531
1532 /**
1533  * Handle CORE_PEER_CONNECTED request.   Notify client about connection
1534  * to the given neighbour.  For this request type, the client does not
1535  * have to have transmitted an INIT request.  All current peers are
1536  * returned, regardless of which message types they accept.
1537  *
1538  * @param cls unused
1539  * @param client client sending the iteration request
1540  * @param message iteration request message
1541  */
1542 void
1543 GSC_SESSIONS_handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *client,
1544                                       const struct GNUNET_MessageHeader *message)
1545 {
1546   struct GNUNET_MessageHeader done_msg;
1547   struct GNUNET_SERVER_TransmitContext *tc;
1548   const struct GNUNET_PeerIdentity *peer;
1549
1550   peer = (const struct GNUNET_PeerIdentity *) &message[1]; // YUCK!
1551   tc = GNUNET_SERVER_transmit_context_create (client);
1552   GNUNET_CONTAINER_multihashmap_get_multiple (neighbours, &peer->hashPubKey,
1553                                               &queue_connect_message, tc);
1554   done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
1555   done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
1556   GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
1557   GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
1558 }
1559
1560
1561
1562 /**
1563  * Handle REQUEST_INFO request. For this request type, the client must
1564  * have transmitted an INIT first.
1565  *
1566  * @param cls unused
1567  * @param client client sending the request
1568  * @param message iteration request message
1569  */
1570 void
1571 GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client *client,
1572                                          const struct GNUNET_MessageHeader *message)
1573 {
1574   const struct RequestInfoMessage *rcm;
1575   struct GSC_Client *pos;
1576   struct Neighbour *n;
1577   struct ConfigurationInfoMessage cim;
1578   int32_t want_reserv;
1579   int32_t got_reserv;
1580   unsigned long long old_preference;
1581   struct GNUNET_TIME_Relative rdelay;
1582
1583   rdelay = GNUNET_TIME_relative_get_zero ();
1584 #if DEBUG_CORE_CLIENT
1585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service receives `%s' request.\n",
1586               "REQUEST_INFO");
1587 #endif
1588   rcm = (const struct RequestInfoMessage *) message;
1589   n = find_neighbour (&rcm->peer);
1590   memset (&cim, 0, sizeof (cim));
1591   if ((n != NULL) && (GNUNET_YES == n->is_connected))
1592   {
1593     want_reserv = ntohl (rcm->reserve_inbound);
1594     if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
1595     {
1596       n->bw_out_internal_limit = rcm->limit_outbound;
1597       if (n->bw_out.value__ !=
1598           GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
1599                                       n->bw_out_external_limit).value__)
1600       {
1601         n->bw_out =
1602             GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
1603                                         n->bw_out_external_limit);
1604         GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
1605                                                n->bw_out);
1606         GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
1607         handle_peer_status_change (n);
1608       }
1609     }
1610     if (want_reserv < 0)
1611     {
1612       got_reserv = want_reserv;
1613     }
1614     else if (want_reserv > 0)
1615     {
1616       rdelay =
1617           GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
1618                                               want_reserv);
1619       if (rdelay.rel_value == 0)
1620         got_reserv = want_reserv;
1621       else
1622         got_reserv = 0;         /* all or nothing */
1623     }
1624     else
1625       got_reserv = 0;
1626     GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window, got_reserv);
1627     old_preference = n->current_preference;
1628     n->current_preference += GNUNET_ntohll (rcm->preference_change);
1629     if (old_preference > n->current_preference)
1630     {
1631       /* overflow; cap at maximum value */
1632       n->current_preference = ULLONG_MAX;
1633     }
1634     update_preference_sum (n->current_preference - old_preference);
1635 #if DEBUG_CORE_QUOTA
1636     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1637                 "Received reservation request for %d bytes for peer `%4s', reserved %d bytes, suggesting delay of %llu ms\n",
1638                 (int) want_reserv, GNUNET_i2s (&rcm->peer), (int) got_reserv,
1639                 (unsigned long long) rdelay.rel_value);
1640 #endif
1641     cim.reserved_amount = htonl (got_reserv);
1642     cim.reserve_delay = GNUNET_TIME_relative_hton (rdelay);
1643     cim.bw_out = n->bw_out;
1644     cim.preference = n->current_preference;
1645   }
1646   else
1647   {
1648     /* Technically, this COULD happen (due to asynchronous behavior),
1649      * but it should be rare, so we should generate an info event
1650      * to help diagnosis of serious errors that might be masked by this */
1651     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1652                 _
1653                 ("Client asked for preference change with peer `%s', which is not connected!\n"),
1654                 GNUNET_i2s (&rcm->peer));
1655     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1656     return;
1657   }
1658   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1659   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1660   cim.peer = rcm->peer;
1661   cim.rim_id = rcm->rim_id;
1662 #if DEBUG_CORE_CLIENT
1663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
1664               "CONFIGURATION_INFO");
1665 #endif
1666   GSC_CLIENTS_send_to_client (client, &cim.header, GNUNET_NO);
1667   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1668 }
1669
1670
1671 /**
1672  * Create a session, a key exchange was just completed.
1673  *
1674  * @param peer peer that is now connected
1675  * @param kx key exchange that completed
1676  */
1677 void
1678 GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
1679                      struct GSC_KeyExchangeInfo *kx)
1680 {
1681     {
1682       struct GNUNET_MessageHeader *hdr;
1683
1684       hdr = compute_type_map_message ();
1685       send_type_map_to_neighbour (hdr, &n->peer.hashPubKey, n);
1686       GNUNET_free (hdr);
1687     }
1688     if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
1689     {
1690       n->bw_out_external_limit = t.inbound_bw_limit;
1691       n->bw_out =
1692           GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
1693                                       n->bw_out_internal_limit);
1694       GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
1695                                              n->bw_out);
1696       GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
1697     }
1698 #if DEBUG_CORE
1699     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1700                 "Confirmed key via `%s' message for peer `%4s'\n", "PONG",
1701                 GNUNET_i2s (&n->peer));
1702 #endif
1703
1704
1705     size =
1706         sizeof (struct ConnectNotifyMessage) +
1707         (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1708     if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1709     {
1710       GNUNET_break (0);
1711       /* recovery strategy: throw away performance data */
1712       GNUNET_array_grow (n->ats, n->ats_count, 0);
1713       size =
1714           sizeof (struct PeerStatusNotifyMessage) +
1715           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1716     }
1717     cnm = (struct ConnectNotifyMessage *) buf;
1718     cnm->header.size = htons (size);
1719     cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
1720     cnm->ats_count = htonl (n->ats_count);
1721     cnm->peer = n->peer;
1722     mats = &cnm->ats;
1723     memcpy (mats, n->ats,
1724             n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
1725     mats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
1726     mats[n->ats_count].value = htonl (0);
1727     send_to_all_clients (&cnm->header, GNUNET_NO,
1728                          GNUNET_CORE_OPTION_SEND_CONNECT);
1729     process_encrypted_neighbour_queue (n);
1730     n->last_activity = GNUNET_TIME_absolute_get ();
1731
1732   if (n->status == PEER_STATE_KEY_CONFIRMED)
1733   {
1734     now = GNUNET_TIME_absolute_get ();
1735     n->last_activity = now;
1736     changed = GNUNET_YES;
1737     if (!up)
1738     {
1739       GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
1740                                 1, GNUNET_NO);
1741       n->time_established = now;
1742     }
1743     if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
1744       GNUNET_SCHEDULER_cancel (n->keep_alive_task);
1745     n->keep_alive_task =
1746         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide
1747                                       (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1748                                        2), &send_keep_alive, n);
1749   }
1750
1751
1752 }
1753
1754
1755 /**
1756  * Update information about a session.
1757  *
1758  * @param peer peer who's session should be updated
1759  * @param bw_out new outbound bandwidth limit for the peer
1760  * @param atsi performance information
1761  * @param atsi_count number of performance records supplied
1762  */
1763 void
1764 GSC_SESSIONS_update (const struct GNUNET_PeerIdentity *peer,
1765                      struct GNUNET_BANDWIDTH_Value32NBO bw_out,
1766                      const struct GNUNET_TRANSPORT_ATS_Information *atsi,
1767                      uint32_t atsi_count)
1768 {
1769   if (bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
1770   {
1771 #if DEBUG_CORE_SET_QUOTA
1772     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1773                 "Received %u b/s as new inbound limit for peer `%4s'\n",
1774                 (unsigned int) ntohl (pt->inbound_bw_limit.value__),
1775                 GNUNET_i2s (&n->peer));
1776 #endif
1777     n->bw_out_external_limit = pt->inbound_bw_limit;
1778     n->bw_out =
1779         GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
1780                                     n->bw_out_internal_limit);
1781     GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
1782                                            n->bw_out);
1783     GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
1784   }
1785
1786 }
1787
1788
1789 /**
1790  * Initialize sessions subsystem.
1791  */
1792 int
1793 GSC_SESSIONS_init ()
1794 {
1795   neighbours = GNUNET_CONTAINER_multihashmap_create (128);
1796   self.public_key = &my_public_key;
1797   self.peer = my_identity;
1798   self.last_activity = GNUNET_TIME_UNIT_FOREVER_ABS;
1799   self.status = PEER_STATE_KEY_CONFIRMED;
1800   self.is_connected = GNUNET_YES;
1801   return GNUNET_OK;
1802 }
1803
1804
1805 /**
1806  * Shutdown sessions subsystem.
1807  */
1808 void
1809 GSC_SESSIONS_done ()
1810 {
1811   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
1812                                          NULL);
1813   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1814   neighbours = NULL;
1815   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
1816                          0, GNUNET_NO);
1817 }