a2b662252350a705b9d2a7756c9a526660a9077d
[oweals/gnunet.git] / src / core / gnunet-service-core_sessions.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_sessions.c
23  * @brief code for managing of 'encrypted' sessions (key exchange done) 
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_service_core.h"
28 #include "gnunet_service_core_neighbours.h"
29 #include "gnunet_service_core_kx.h"
30 #include "gnunet_service_core_sessions.h"
31
32 /**
33  * Record kept for each request for transmission issued by a
34  * client that is still pending.
35  */
36 struct GSC_ClientActiveRequest;
37
38 /**
39  * Data kept per session.
40  */
41 struct Session
42 {
43   /**
44    * Identity of the other peer.
45    */
46   struct GNUNET_PeerIdentity peer;
47
48   /**
49    * Head of list of requests from clients for transmission to
50    * this peer.
51    */
52   struct GSC_ClientActiveRequest *active_client_request_head;
53
54   /**
55    * Tail of list of requests from clients for transmission to
56    * this peer.
57    */
58   struct GSC_ClientActiveRequest *active_client_request_tail;
59
60   /**
61    * Performance data for the peer.
62    */
63   struct GNUNET_TRANSPORT_ATS_Information *ats;
64
65   /**
66    * Information about the key exchange with the other peer.
67    */
68   struct GSC_KeyExchangeInfo *kxinfo;
69
70
71   /**
72    * ID of task used for cleaning up dead neighbour entries.
73    */
74   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
75
76   /**
77    * ID of task used for updating bandwidth quota for this neighbour.
78    */
79   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
80
81   /**
82    * At what time did we initially establish (as in, complete session
83    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
84    */
85   struct GNUNET_TIME_Absolute time_established;
86
87   /**
88    * At what time did we last receive an encrypted message from the
89    * other peer?  Should be zero if status != KEY_CONFIRMED.
90    */
91   struct GNUNET_TIME_Absolute last_activity;
92
93   /**
94    * How valueable were the messages of this peer recently?
95    */
96   unsigned long long current_preference;
97
98   /**
99    * Number of entries in 'ats'.
100    */
101   unsigned int ats_count;
102
103   /**
104    * Bit map indicating which of the 32 sequence numbers before the last
105    * were received (good for accepting out-of-order packets and
106    * estimating reliability of the connection)
107    */
108   unsigned int last_packets_bitmap;
109
110   /**
111    * last sequence number received on this connection (highest)
112    */
113   uint32_t last_sequence_number_received;
114
115   /**
116    * last sequence number transmitted
117    */
118   uint32_t last_sequence_number_sent;
119
120   /**
121    * Available bandwidth in for this peer (current target).
122    */
123   struct GNUNET_BANDWIDTH_Value32NBO bw_in;
124
125   /**
126    * Available bandwidth out for this peer (current target).
127    */
128   struct GNUNET_BANDWIDTH_Value32NBO bw_out;
129
130   /**
131    * Internal bandwidth limit set for this peer (initially typically
132    * set to "-1").  Actual "bw_out" is MIN of
133    * "bpm_out_internal_limit" and "bw_out_external_limit".
134    */
135   struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
136
137   /**
138    * External bandwidth limit set for this peer by the
139    * peer that we are communicating with.  "bw_out" is MIN of
140    * "bw_out_internal_limit" and "bw_out_external_limit".
141    */
142   struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
143
144 };
145
146
147 /**
148  * Map of peer identities to 'struct Session'.
149  */
150 static struct GNUNET_CONTAINER_MultiHashMap *sessions;
151
152
153 /**
154  * Session entry for "this" peer.
155  */
156 static struct Session self;
157
158 /**
159  * Sum of all preferences among all neighbours.
160  */
161 static unsigned long long preference_sum;
162
163
164 // FIXME.........
165
166 /**
167  * At what time should the connection to the given neighbour
168  * time out (given no further activity?)
169  *
170  * @param n neighbour in question
171  * @return absolute timeout
172  */
173 static struct GNUNET_TIME_Absolute
174 get_neighbour_timeout (struct Neighbour *n)
175 {
176   return GNUNET_TIME_absolute_add (n->last_activity,
177                                    GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
178 }
179
180
181 /**
182  * Helper function for update_preference_sum.
183  */
184 static int
185 update_preference (void *cls, const GNUNET_HashCode * key, void *value)
186 {
187   unsigned long long *ps = cls;
188   struct Neighbour *n = value;
189
190   n->current_preference /= 2;
191   *ps += n->current_preference;
192   return GNUNET_OK;
193 }
194
195
196 /**
197  * A preference value for a neighbour was update.  Update
198  * the preference sum accordingly.
199  *
200  * @param inc how much was a preference value increased?
201  */
202 static void
203 update_preference_sum (unsigned long long inc)
204 {
205   unsigned long long os;
206
207   os = preference_sum;
208   preference_sum += inc;
209   if (preference_sum >= os)
210     return;                     /* done! */
211   /* overflow! compensate by cutting all values in half! */
212   preference_sum = 0;
213   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &update_preference,
214                                          &preference_sum);
215   GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"),
216                          preference_sum, GNUNET_NO);
217 }
218
219
220 /**
221  * Find the entry for the given neighbour.
222  *
223  * @param peer identity of the neighbour
224  * @return NULL if we are not connected, otherwise the
225  *         neighbour's entry.
226  */
227 static struct Neighbour *
228 find_neighbour (const struct GNUNET_PeerIdentity *peer)
229 {
230   return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
231 }
232
233
234 /**
235  * Function called by transport telling us that a peer
236  * changed status.
237  *
238  * @param n the peer that changed status
239  */
240 static void
241 handle_peer_status_change (struct Neighbour *n)
242 {
243   struct PeerStatusNotifyMessage *psnm;
244   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
245   struct GNUNET_TRANSPORT_ATS_Information *ats;
246   size_t size;
247
248   if ((!n->is_connected) || (n->status != PEER_STATE_KEY_CONFIRMED))
249     return;
250 #if DEBUG_CORE > 1
251   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%4s' changed status\n",
252               GNUNET_i2s (&n->peer));
253 #endif
254   size =
255       sizeof (struct PeerStatusNotifyMessage) +
256       n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
257   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
258   {
259     GNUNET_break (0);
260     /* recovery strategy: throw away performance data */
261     GNUNET_array_grow (n->ats, n->ats_count, 0);
262     size =
263         sizeof (struct PeerStatusNotifyMessage) +
264         n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
265   }
266   psnm = (struct PeerStatusNotifyMessage *) buf;
267   psnm->header.size = htons (size);
268   psnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE);
269   psnm->timeout = GNUNET_TIME_absolute_hton (get_neighbour_timeout (n));
270   psnm->bandwidth_in = n->bw_in;
271   psnm->bandwidth_out = n->bw_out;
272   psnm->peer = n->peer;
273   psnm->ats_count = htonl (n->ats_count);
274   ats = &psnm->ats;
275   memcpy (ats, n->ats,
276           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
277   ats[n->ats_count].type = htonl (0);
278   ats[n->ats_count].value = htonl (0);
279   send_to_all_clients (&psnm->header, GNUNET_YES,
280                        GNUNET_CORE_OPTION_SEND_STATUS_CHANGE);
281   GNUNET_STATISTICS_update (stats, gettext_noop ("# peer status changes"), 1,
282                             GNUNET_NO);
283 }
284
285
286
287 /**
288  * Go over our message queue and if it is not too long, go
289  * over the pending requests from clients for this
290  * neighbour and send some clients a 'READY' notification.
291  *
292  * @param n which peer to process
293  */
294 static void
295 schedule_peer_messages (struct Neighbour *n)
296 {
297   struct GSC_ClientActiveRequest *car;
298   struct GSC_ClientActiveRequest *pos;
299   struct Client *c;
300   struct MessageEntry *mqe;
301   unsigned int queue_size;
302
303   /* check if neighbour queue is empty enough! */
304   if (n != &self)
305   {
306     queue_size = 0;
307     mqe = n->messages;
308     while (mqe != NULL)
309     {
310       queue_size++;
311       mqe = mqe->next;
312     }
313     if (queue_size >= MAX_PEER_QUEUE_SIZE)
314     {
315 #if DEBUG_CORE_CLIENT
316       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
317                   "Not considering client transmission requests: queue full\n");
318 #endif
319       return;                   /* queue still full */
320     }
321     /* find highest priority request */
322     pos = n->active_client_request_head;
323     car = NULL;
324     while (pos != NULL)
325     {
326       if ((car == NULL) || (pos->priority > car->priority))
327         car = pos;
328       pos = pos->next;
329     }
330   }
331   else
332   {
333     car = n->active_client_request_head;
334   }
335   if (car == NULL)
336     return;                     /* no pending requests */
337 #if DEBUG_CORE_CLIENT
338   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
339               "Permitting client transmission request to `%s'\n",
340               GNUNET_i2s (&n->peer));
341 #endif
342   GSC_CLIENTS_solicite_request (car);
343 }
344
345
346
347 /**
348  * Free the given entry for the neighbour (it has
349  * already been removed from the list at this point).
350  *
351  * @param n neighbour to free
352  */
353 static void
354 free_neighbour (struct Neighbour *n)
355 {
356   struct MessageEntry *m;
357   struct GSC_ClientActiveRequest *car;
358
359 #if DEBUG_CORE
360   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361               "Destroying neighbour entry for peer `%4s'\n",
362               GNUNET_i2s (&n->peer));
363 #endif
364   if (n->skm != NULL)
365   {
366     GNUNET_free (n->skm);
367     n->skm = NULL;
368   }
369   while (NULL != (m = n->messages))
370   {
371     n->messages = m->next;
372     GNUNET_free (m);
373   }
374   while (NULL != (m = n->encrypted_head))
375   {
376     GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
377     GNUNET_free (m);
378   }
379   while (NULL != (car = n->active_client_request_head))
380   {
381     GNUNET_CONTAINER_DLL_remove (n->active_client_request_head,
382                                  n->active_client_request_tail, car);
383     GNUNET_assert (GNUNET_YES ==
384                    GNUNET_CONTAINER_multihashmap_remove (car->client->requests,
385                                                          &n->peer.hashPubKey,
386                                                          car));
387     GNUNET_free (car);
388   }
389   if (NULL != n->th)
390   {
391     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
392     n->th = NULL;
393   }
394   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
395     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
396   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
397     GNUNET_SCHEDULER_cancel (n->quota_update_task);
398   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
399     GNUNET_SCHEDULER_cancel (n->keep_alive_task);
400   if (n->status == PEER_STATE_KEY_CONFIRMED)
401     GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
402                               -1, GNUNET_NO);
403   GNUNET_array_grow (n->ats, n->ats_count, 0);
404   GNUNET_free_non_null (n->pending_ping);
405   GNUNET_free_non_null (n->pending_pong);
406   GNUNET_free (n);
407 }
408
409
410
411 /**
412  * Consider freeing the given neighbour since we may not need
413  * to keep it around anymore.
414  *
415  * @param n neighbour to consider discarding
416  */
417 static void
418 consider_free_neighbour (struct Neighbour *n);
419
420
421 /**
422  * Task triggered when a neighbour entry might have gotten stale.
423  *
424  * @param cls the 'struct Neighbour'
425  * @param tc scheduler context (not used)
426  */
427 static void
428 consider_free_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
429 {
430   struct Neighbour *n = cls;
431
432   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
433   consider_free_neighbour (n);
434 }
435
436
437 /**
438  * Consider freeing the given neighbour since we may not need
439  * to keep it around anymore.
440  *
441  * @param n neighbour to consider discarding
442  */
443 static void
444 consider_free_neighbour (struct Neighbour *n)
445 {
446   struct GNUNET_TIME_Relative left;
447
448   if ((n->th != NULL) || (n->pitr != NULL) || (GNUNET_YES == n->is_connected))
449     return;                     /* no chance */
450
451   left = GNUNET_TIME_absolute_get_remaining (get_neighbour_timeout (n));
452   if (left.rel_value > 0)
453   {
454     if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
455       GNUNET_SCHEDULER_cancel (n->dead_clean_task);
456     n->dead_clean_task =
457         GNUNET_SCHEDULER_add_delayed (left, &consider_free_task, n);
458     return;
459   }
460   /* actually free the neighbour... */
461   GNUNET_assert (GNUNET_YES ==
462                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
463                                                        &n->peer.hashPubKey, n));
464   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
465                          GNUNET_CONTAINER_multihashmap_size (neighbours),
466                          GNUNET_NO);
467   free_neighbour (n);
468 }
469
470
471 /**
472  * Function called when the transport service is ready to
473  * receive an encrypted message for the respective peer
474  *
475  * @param cls neighbour to use message from
476  * @param size number of bytes we can transmit
477  * @param buf where to copy the message
478  * @return number of bytes transmitted
479  */
480 static size_t
481 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
482 {
483   struct Neighbour *n = cls;
484   struct MessageEntry *m;
485   size_t ret;
486   char *cbuf;
487
488   n->th = NULL;
489   m = n->encrypted_head;
490   if (m == NULL)
491   {
492 #if DEBUG_CORE
493     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
494                 "Encrypted message queue empty, no messages added to buffer for `%4s'\n",
495                 GNUNET_i2s (&n->peer));
496 #endif
497     return 0;
498   }
499   GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
500   ret = 0;
501   cbuf = buf;
502   if (buf != NULL)
503   {
504     GNUNET_assert (size >= m->size);
505     memcpy (cbuf, &m[1], m->size);
506     ret = m->size;
507     GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, m->size);
508 #if DEBUG_CORE
509     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510                 "Copied message of type %u and size %u into transport buffer for `%4s'\n",
511                 (unsigned int)
512                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
513                 (unsigned int) ret, GNUNET_i2s (&n->peer));
514 #endif
515     process_encrypted_neighbour_queue (n);
516   }
517   else
518   {
519 #if DEBUG_CORE
520     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521                 "Transmission of message of type %u and size %u failed\n",
522                 (unsigned int)
523                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
524                 (unsigned int) m->size);
525 #endif
526   }
527   GNUNET_free (m);
528   consider_free_neighbour (n);
529   GNUNET_STATISTICS_update (stats,
530                             gettext_noop
531                             ("# encrypted bytes given to transport"), ret,
532                             GNUNET_NO);
533   return ret;
534 }
535
536
537
538
539
540 /**
541  * Select messages for transmission.  This heuristic uses a combination
542  * of earliest deadline first (EDF) scheduling (with bounded horizon)
543  * and priority-based discard (in case no feasible schedule exist) and
544  * speculative optimization (defer any kind of transmission until
545  * we either create a batch of significant size, 25% of max, or until
546  * we are close to a deadline).  Furthermore, when scheduling the
547  * heuristic also packs as many messages into the batch as possible,
548  * starting with those with the earliest deadline.  Yes, this is fun.
549  *
550  * @param n neighbour to select messages from
551  * @param size number of bytes to select for transmission
552  * @param retry_time set to the time when we should try again
553  *        (only valid if this function returns zero)
554  * @return number of bytes selected, or 0 if we decided to
555  *         defer scheduling overall; in that case, retry_time is set.
556  */
557 static size_t
558 select_messages (struct Neighbour *n, size_t size,
559                  struct GNUNET_TIME_Relative *retry_time)
560 {
561   struct MessageEntry *pos;
562   struct MessageEntry *min;
563   struct MessageEntry *last;
564   unsigned int min_prio;
565   struct GNUNET_TIME_Absolute t;
566   struct GNUNET_TIME_Absolute now;
567   struct GNUNET_TIME_Relative delta;
568   uint64_t avail;
569   struct GNUNET_TIME_Relative slack;    /* how long could we wait before missing deadlines? */
570   size_t off;
571   uint64_t tsize;
572   unsigned int queue_size;
573   int discard_low_prio;
574
575   GNUNET_assert (NULL != n->messages);
576   now = GNUNET_TIME_absolute_get ();
577   /* last entry in linked list of messages processed */
578   last = NULL;
579   /* should we remove the entry with the lowest
580    * priority from consideration for scheduling at the
581    * end of the loop? */
582   queue_size = 0;
583   tsize = 0;
584   pos = n->messages;
585   while (pos != NULL)
586   {
587     queue_size++;
588     tsize += pos->size;
589     pos = pos->next;
590   }
591   discard_low_prio = GNUNET_YES;
592   while (GNUNET_YES == discard_low_prio)
593   {
594     min = NULL;
595     min_prio = UINT_MAX;
596     discard_low_prio = GNUNET_NO;
597     /* calculate number of bytes available for transmission at time "t" */
598     avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
599     t = now;
600     /* how many bytes have we (hypothetically) scheduled so far */
601     off = 0;
602     /* maximum time we can wait before transmitting anything
603      * and still make all of our deadlines */
604     slack = GNUNET_TIME_UNIT_FOREVER_REL;
605     pos = n->messages;
606     /* note that we use "*2" here because we want to look
607      * a bit further into the future; much more makes no
608      * sense since new message might be scheduled in the
609      * meantime... */
610     while ((pos != NULL) && (off < size * 2))
611     {
612       if (pos->do_transmit == GNUNET_YES)
613       {
614         /* already removed from consideration */
615         pos = pos->next;
616         continue;
617       }
618       if (discard_low_prio == GNUNET_NO)
619       {
620         delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
621         if (delta.rel_value > 0)
622         {
623           // FIXME: HUH? Check!
624           t = pos->deadline;
625           avail +=
626               GNUNET_BANDWIDTH_value_get_available_until (n->bw_out, delta);
627         }
628         if (avail < pos->size)
629         {
630           // FIXME: HUH? Check!
631           discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
632         }
633         else
634         {
635           avail -= pos->size;
636           /* update slack, considering both its absolute deadline
637            * and relative deadlines caused by other messages
638            * with their respective load */
639           slack =
640               GNUNET_TIME_relative_min (slack,
641                                         GNUNET_BANDWIDTH_value_get_delay_for
642                                         (n->bw_out, avail));
643           if (pos->deadline.abs_value <= now.abs_value)
644           {
645             /* now or never */
646             slack = GNUNET_TIME_UNIT_ZERO;
647           }
648           else if (GNUNET_YES == pos->got_slack)
649           {
650             /* should be soon now! */
651             slack =
652                 GNUNET_TIME_relative_min (slack,
653                                           GNUNET_TIME_absolute_get_remaining
654                                           (pos->slack_deadline));
655           }
656           else
657           {
658             slack =
659                 GNUNET_TIME_relative_min (slack,
660                                           GNUNET_TIME_absolute_get_difference
661                                           (now, pos->deadline));
662             pos->got_slack = GNUNET_YES;
663             pos->slack_deadline =
664                 GNUNET_TIME_absolute_min (pos->deadline,
665                                           GNUNET_TIME_relative_to_absolute
666                                           (GNUNET_CONSTANTS_MAX_CORK_DELAY));
667           }
668         }
669       }
670       off += pos->size;
671       t = GNUNET_TIME_absolute_max (pos->deadline, t);  // HUH? Check!
672       if (pos->priority <= min_prio)
673       {
674         /* update min for discard */
675         min_prio = pos->priority;
676         min = pos;
677       }
678       pos = pos->next;
679     }
680     if (discard_low_prio)
681     {
682       GNUNET_assert (min != NULL);
683       /* remove lowest-priority entry from consideration */
684       min->do_transmit = GNUNET_YES;    /* means: discard (for now) */
685     }
686     last = pos;
687   }
688   /* guard against sending "tiny" messages with large headers without
689    * urgent deadlines */
690   if ((slack.rel_value > GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value) &&
691       (size > 4 * off) && (queue_size <= MAX_PEER_QUEUE_SIZE - 2))
692   {
693     /* less than 25% of message would be filled with deadlines still
694      * being met if we delay by one second or more; so just wait for
695      * more data; but do not wait longer than 1s (since we don't want
696      * to delay messages for a really long time either). */
697     *retry_time = GNUNET_CONSTANTS_MAX_CORK_DELAY;
698     /* reset do_transmit values for next time */
699     while (pos != last)
700     {
701       pos->do_transmit = GNUNET_NO;
702       pos = pos->next;
703     }
704     GNUNET_STATISTICS_update (stats,
705                               gettext_noop
706                               ("# transmissions delayed due to corking"), 1,
707                               GNUNET_NO);
708 #if DEBUG_CORE
709     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710                 "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
711                 (unsigned long long) retry_time->rel_value, (unsigned int) off,
712                 (unsigned int) size);
713 #endif
714     return 0;
715   }
716   /* select marked messages (up to size) for transmission */
717   off = 0;
718   pos = n->messages;
719   while (pos != last)
720   {
721     if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
722     {
723       pos->do_transmit = GNUNET_YES;    /* mark for transmission */
724       off += pos->size;
725       size -= pos->size;
726 #if DEBUG_CORE > 1
727       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728                   "Selecting message of size %u for transmission\n",
729                   (unsigned int) pos->size);
730 #endif
731     }
732     else
733     {
734 #if DEBUG_CORE > 1
735       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736                   "Not selecting message of size %u for transmission at this time (maximum is %u)\n",
737                   (unsigned int) pos->size, size);
738 #endif
739       pos->do_transmit = GNUNET_NO;     /* mark for not transmitting! */
740     }
741     pos = pos->next;
742   }
743 #if DEBUG_CORE
744   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745               "Selected %llu/%llu bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
746               (unsigned long long) off, (unsigned long long) tsize, queue_size,
747               (unsigned int) MAX_PEER_QUEUE_SIZE, GNUNET_i2s (&n->peer));
748 #endif
749   return off;
750 }
751
752
753 /**
754  * Batch multiple messages into a larger buffer.
755  *
756  * @param n neighbour to take messages from
757  * @param buf target buffer
758  * @param size size of buf
759  * @param deadline set to transmission deadline for the result
760  * @param retry_time set to the time when we should try again
761  *        (only valid if this function returns zero)
762  * @param priority set to the priority of the batch
763  * @return number of bytes written to buf (can be zero)
764  */
765 static size_t
766 batch_message (struct Neighbour *n, char *buf, size_t size,
767                struct GNUNET_TIME_Absolute *deadline,
768                struct GNUNET_TIME_Relative *retry_time, unsigned int *priority)
769 {
770   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
771   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage *) ntmb;
772   struct MessageEntry *pos;
773   struct MessageEntry *prev;
774   struct MessageEntry *next;
775   size_t ret;
776
777   ret = 0;
778   *priority = 0;
779   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
780   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
781   if (0 == select_messages (n, size, retry_time))
782   {
783 #if DEBUG_CORE
784     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785                 "No messages selected, will try again in %llu ms\n",
786                 retry_time->rel_value);
787 #endif
788     return 0;
789   }
790   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
791   ntm->ats_count = htonl (0);
792   ntm->ats.type = htonl (0);
793   ntm->ats.value = htonl (0);
794   ntm->peer = n->peer;
795   pos = n->messages;
796   prev = NULL;
797   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
798   {
799     next = pos->next;
800     if (GNUNET_YES == pos->do_transmit)
801     {
802       GNUNET_assert (pos->size <= size);
803       /* do notifications */
804       /* FIXME: track if we have *any* client that wants
805        * full notifications and only do this if that is
806        * actually true */
807       if (pos->size <
808           GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
809       {
810         memcpy (&ntm[1], &pos[1], pos->size);
811         ntm->header.size =
812             htons (sizeof (struct NotifyTrafficMessage) +
813                    sizeof (struct GNUNET_MessageHeader));
814         send_to_all_clients (&ntm->header, GNUNET_YES,
815                              GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
816       }
817       else
818       {
819         /* message too large for 'full' notifications, we do at
820          * least the 'hdr' type */
821         memcpy (&ntm[1], &pos[1], sizeof (struct GNUNET_MessageHeader));
822       }
823       ntm->header.size =
824           htons (sizeof (struct NotifyTrafficMessage) + pos->size);
825       send_to_all_clients (&ntm->header, GNUNET_YES,
826                            GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);
827 #if DEBUG_HANDSHAKE
828       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829                   "Encrypting %u bytes with message of type %u and size %u\n",
830                   pos->size,
831                   (unsigned int)
832                   ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type),
833                   (unsigned int)
834                   ntohs (((const struct GNUNET_MessageHeader *)
835                           &pos[1])->size));
836 #endif
837       /* copy for encrypted transmission */
838       memcpy (&buf[ret], &pos[1], pos->size);
839       ret += pos->size;
840       size -= pos->size;
841       *priority += pos->priority;
842 #if DEBUG_CORE > 1
843       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844                   "Adding plaintext message of size %u with deadline %llu ms to batch\n",
845                   (unsigned int) pos->size,
846                   (unsigned long long)
847                   GNUNET_TIME_absolute_get_remaining (pos->deadline).rel_value);
848 #endif
849       deadline->abs_value =
850           GNUNET_MIN (deadline->abs_value, pos->deadline.abs_value);
851       GNUNET_free (pos);
852       if (prev == NULL)
853         n->messages = next;
854       else
855         prev->next = next;
856     }
857     else
858     {
859       prev = pos;
860     }
861     pos = next;
862   }
863 #if DEBUG_CORE > 1
864   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
865               "Deadline for message batch is %llu ms\n",
866               GNUNET_TIME_absolute_get_remaining (*deadline).rel_value);
867 #endif
868   return ret;
869 }
870
871
872 /**
873  * Remove messages with deadlines that have long expired from
874  * the queue.
875  *
876  * @param n neighbour to inspect
877  */
878 static void
879 discard_expired_messages (struct Neighbour *n)
880 {
881   struct MessageEntry *prev;
882   struct MessageEntry *next;
883   struct MessageEntry *pos;
884   struct GNUNET_TIME_Absolute now;
885   struct GNUNET_TIME_Relative delta;
886   int disc;
887   unsigned int queue_length;
888
889   disc = GNUNET_NO;
890   now = GNUNET_TIME_absolute_get ();
891   prev = NULL;
892   queue_length = 0;
893   pos = n->messages;
894   while (pos != NULL)
895   {
896     queue_length++;
897     next = pos->next;
898     delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
899     if (delta.rel_value > PAST_EXPIRATION_DISCARD_TIME.rel_value)
900     {
901 #if DEBUG_CORE
902       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
903                   "Message is %llu ms past due, discarding.\n",
904                   delta.rel_value);
905 #endif
906       if (prev == NULL)
907         n->messages = next;
908       else
909         prev->next = next;
910       GNUNET_STATISTICS_update (stats,
911                                 gettext_noop
912                                 ("# messages discarded (expired prior to transmission)"),
913                                 1, GNUNET_NO);
914       disc = GNUNET_YES;
915       GNUNET_free (pos);
916     }
917     else
918       prev = pos;
919     pos = next;
920   }
921   if ( (GNUNET_YES == disc) &&
922        (queue_length == MAX_PEER_QUEUE_SIZE) )
923     schedule_peer_messages (n);
924 }
925
926
927 /**
928  * Signature of the main function of a task.
929  *
930  * @param cls closure
931  * @param tc context information (why was this task triggered now)
932  */
933 static void
934 retry_plaintext_processing (void *cls,
935                             const struct GNUNET_SCHEDULER_TaskContext *tc)
936 {
937   struct Neighbour *n = cls;
938
939   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
940   process_plaintext_neighbour_queue (n);
941 }
942
943
944 /**
945  * Check if we have plaintext messages for the specified neighbour
946  * pending, and if so, consider batching and encrypting them (and
947  * then trigger processing of the encrypted queue if needed).
948  *
949  * @param n neighbour to check.
950  */
951 static void
952 process_plaintext_neighbour_queue (struct Neighbour *n)
953 {
954   char pbuf[GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE + sizeof (struct EncryptedMessage)];    /* plaintext */
955   size_t used;
956   struct EncryptedMessage *em;  /* encrypted message */
957   struct EncryptedMessage *ph;  /* plaintext header */
958   struct MessageEntry *me;
959   unsigned int priority;
960   struct GNUNET_TIME_Absolute deadline;
961   struct GNUNET_TIME_Relative retry_time;
962   struct GNUNET_CRYPTO_AesInitializationVector iv;
963   struct GNUNET_CRYPTO_AuthKey auth_key;
964
965   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
966   {
967     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
968     n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
969   }
970   switch (n->status)
971   {
972   case PEER_STATE_DOWN:
973     send_key (n);
974 #if DEBUG_CORE
975     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
976                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
977                 GNUNET_i2s (&n->peer));
978 #endif
979     return;
980   case PEER_STATE_KEY_SENT:
981     if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
982       n->retry_set_key_task =
983           GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
984                                         &set_key_retry_task, n);
985 #if DEBUG_CORE
986     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
988                 GNUNET_i2s (&n->peer));
989 #endif
990     return;
991   case PEER_STATE_KEY_RECEIVED:
992     if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
993       n->retry_set_key_task =
994           GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
995                                         &set_key_retry_task, n);
996 #if DEBUG_CORE
997     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
999                 GNUNET_i2s (&n->peer));
1000 #endif
1001     return;
1002   case PEER_STATE_KEY_CONFIRMED:
1003     /* ready to continue */
1004     break;
1005   }
1006   discard_expired_messages (n);
1007   if (n->messages == NULL)
1008   {
1009 #if DEBUG_CORE
1010     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1011                 "Plaintext message queue for `%4s' is empty.\n",
1012                 GNUNET_i2s (&n->peer));
1013 #endif
1014     return;                     /* no pending messages */
1015   }
1016   if (n->encrypted_head != NULL)
1017   {
1018 #if DEBUG_CORE > 2
1019     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020                 "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1021                 GNUNET_i2s (&n->peer));
1022 #endif
1023     return;                     /* wait for messages already encrypted to be
1024                                  * processed first! */
1025   }
1026   ph = (struct EncryptedMessage *) pbuf;
1027   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1028   priority = 0;
1029   used = sizeof (struct EncryptedMessage);
1030   used +=
1031       batch_message (n, &pbuf[used],
1032                      GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE, &deadline,
1033                      &retry_time, &priority);
1034   if (used == sizeof (struct EncryptedMessage))
1035   {
1036 #if DEBUG_CORE > 1
1037     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038                 "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1039                 GNUNET_i2s (&n->peer));
1040 #endif
1041     /* no messages selected for sending, try again later... */
1042     n->retry_plaintext_task =
1043         GNUNET_SCHEDULER_add_delayed (retry_time, &retry_plaintext_processing,
1044                                       n);
1045     return;
1046   }
1047   GSC_KX_encrypt_and_transmit (n->kx,
1048                                &pbuf[struct EncryptedMessage],
1049                                used - sizeof (struct EncryptedMessage));
1050   schedule_peer_messages (n);
1051 }
1052
1053
1054
1055
1056 /**
1057  * Check if we have encrypted messages for the specified neighbour
1058  * pending, and if so, check with the transport about sending them
1059  * out.
1060  *
1061  * @param n neighbour to check.
1062  */
1063 static void
1064 process_encrypted_neighbour_queue (struct Neighbour *n)
1065 {
1066   struct MessageEntry *m;
1067
1068   if (n->th != NULL)
1069     return;                     /* request already pending */
1070   if (GNUNET_YES != n->is_connected)
1071   {
1072     GNUNET_break (0);
1073     return;
1074   }
1075   m = n->encrypted_head;
1076   if (m == NULL)
1077   {
1078     /* encrypted queue empty, try plaintext instead */
1079     process_plaintext_neighbour_queue (n);
1080     return;
1081   }
1082 #if DEBUG_CORE > 1
1083   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1084               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1085               (unsigned int) m->size, GNUNET_i2s (&n->peer),
1086               (unsigned long long)
1087               GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value);
1088 #endif
1089   n->th =
1090        GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size,
1091                                               m->priority,
1092                                               GNUNET_TIME_absolute_get_remaining
1093                                               (m->deadline),
1094                                               &notify_encrypted_transmit_ready,
1095                                               n);
1096   if (n->th == NULL)
1097   {
1098     /* message request too large or duplicate request */
1099     GNUNET_break (0);
1100     /* discard encrypted message */
1101     GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
1102     GNUNET_free (m);
1103     process_encrypted_neighbour_queue (n);
1104   }
1105 }
1106
1107
1108 /**
1109  * Initialize a new 'struct Neighbour'.
1110  *
1111  * @param pid ID of the new neighbour
1112  * @return handle for the new neighbour
1113  */
1114 static struct Neighbour *
1115 create_neighbour (const struct GNUNET_PeerIdentity *pid)
1116 {
1117   struct Neighbour *n;
1118   struct GNUNET_TIME_Absolute now;
1119
1120 #if DEBUG_CORE
1121   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1122               "Creating neighbour entry for peer `%4s'\n", GNUNET_i2s (pid));
1123 #endif
1124   n = GNUNET_malloc (sizeof (struct Neighbour));
1125   n->peer = *pid;
1126   n->last_activity = GNUNET_TIME_absolute_get ();
1127   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1128   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1129   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init (UINT32_MAX);
1130   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1131   n->ping_challenge =
1132       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1133   GNUNET_assert (GNUNET_OK ==
1134                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1135                                                     &n->peer.hashPubKey, n,
1136                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1137   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
1138                          GNUNET_CONTAINER_multihashmap_size (neighbours),
1139                          GNUNET_NO);
1140   neighbour_quota_update (n, NULL);
1141   consider_free_neighbour (n);
1142   return n;
1143 }
1144
1145
1146
1147 /**
1148  * We have a new client, notify it about all current sessions.
1149  *
1150  * @param client the new client
1151  */
1152 void
1153 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
1154 {
1155   /* notify new client about existing neighbours */
1156   GNUNET_CONTAINER_multihashmap_iterate (neighbours,
1157                                          &notify_client_about_neighbour, client);
1158 }
1159
1160
1161 /**
1162  * Queue a request from a client for transmission to a particular peer.
1163  *
1164  * @param car request to queue; this handle is then shared between
1165  *         the caller (CLIENTS subsystem) and SESSIONS and must not
1166  *         be released by either until either 'GNUNET_SESSIONS_dequeue',
1167  *         'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed'
1168  *         have been invoked on it
1169  */
1170 void
1171 GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
1172 {
1173   struct Neighbour *n; // FIXME: session...
1174
1175   n = find_neighbour (&car->peer);
1176   if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
1177       (n->status != PEER_STATE_KEY_CONFIRMED))
1178   {
1179     /* neighbour must have disconnected since request was issued,
1180      * ignore (client will realize it once it processes the
1181      * disconnect notification) */
1182 #if DEBUG_CORE_CLIENT
1183     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1184                 "Dropped client request for transmission (am disconnected)\n");
1185 #endif
1186     GNUNET_STATISTICS_update (stats,
1187                               gettext_noop
1188                               ("# send requests dropped (disconnected)"), 1,
1189                               GNUNET_NO);
1190     GSC_CLIENTS_reject_requests (car);
1191     return;
1192   }
1193 #if DEBUG_CORE_CLIENT
1194   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1195               "Received client transmission request. queueing\n");
1196 #endif
1197     GNUNET_CONTAINER_DLL_insert (n->active_client_request_head,
1198                                  n->active_client_request_tail, car);
1199
1200   // schedule_peer_messages (n);
1201 }
1202
1203
1204 /**
1205  * Dequeue a request from a client from transmission to a particular peer.
1206  *
1207  * @param car request to dequeue; this handle will then be 'owned' by
1208  *        the caller (CLIENTS sysbsystem)
1209  */
1210 void
1211 GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
1212 {
1213   struct Session *s;
1214
1215   s = find_session (&car->peer);
1216   GNUNET_CONTAINER_DLL_remove (s->active_client_request_head,
1217                                s->active_client_request_tail, car);
1218 }
1219
1220
1221
1222 /**
1223  * Transmit a message to a particular peer.
1224  *
1225  * @param car original request that was queued and then solicited;
1226  *            this handle will now be 'owned' by the SESSIONS subsystem
1227  * @param msg message to transmit
1228  */
1229 void
1230 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
1231                        const struct GNUNET_MessageHeader *msg)
1232 {
1233   struct MessageEntry *prev;
1234   struct MessageEntry *pos;
1235   struct MessageEntry *e;
1236   struct MessageEntry *min_prio_entry;
1237   struct MessageEntry *min_prio_prev;
1238   unsigned int min_prio;
1239   unsigned int queue_size;
1240
1241   n = find_neighbour (&sm->peer);
1242   if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
1243       (n->status != PEER_STATE_KEY_CONFIRMED))
1244   {
1245     /* attempt to send message to peer that is not connected anymore
1246      * (can happen due to asynchrony) */
1247     GNUNET_STATISTICS_update (stats,
1248                               gettext_noop
1249                               ("# messages discarded (disconnected)"), 1,
1250                               GNUNET_NO);
1251     if (client != NULL)
1252       GNUNET_SERVER_receive_done (client, GNUNET_OK);
1253     return;
1254   }
1255 #if DEBUG_CORE
1256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1257               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1258               "SEND", (unsigned int) msize, GNUNET_i2s (&sm->peer));
1259 #endif
1260   discard_expired_messages (n);
1261   /* bound queue size */
1262   /* NOTE: this entire block to bound the queue size should be
1263    * obsolete with the new client-request code and the
1264    * 'schedule_peer_messages' mechanism; we still have this code in
1265    * here for now as a sanity check for the new mechanmism;
1266    * ultimately, we should probably simply reject SEND messages that
1267    * are not 'approved' (or provide a new core API for very unreliable
1268    * delivery that always sends with priority 0).  Food for thought. */
1269   min_prio = UINT32_MAX;
1270   min_prio_entry = NULL;
1271   min_prio_prev = NULL;
1272   queue_size = 0;
1273   prev = NULL;
1274   pos = n->messages;
1275   while (pos != NULL)
1276   {
1277     if (pos->priority <= min_prio)
1278     {
1279       min_prio_entry = pos;
1280       min_prio_prev = prev;
1281       min_prio = pos->priority;
1282     }
1283     queue_size++;
1284     prev = pos;
1285     pos = pos->next;
1286   }
1287   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1288   {
1289     /* queue full */
1290     if (ntohl (sm->priority) <= min_prio)
1291     {
1292       /* discard new entry; this should no longer happen! */
1293       GNUNET_break (0);
1294 #if DEBUG_CORE
1295       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1296                   "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
1297                   queue_size, (unsigned int) MAX_PEER_QUEUE_SIZE,
1298                   (unsigned int) msize, (unsigned int) ntohs (message->type));
1299 #endif
1300       GNUNET_STATISTICS_update (stats,
1301                                 gettext_noop ("# discarded CORE_SEND requests"),
1302                                 1, GNUNET_NO);
1303
1304       if (client != NULL)
1305         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1306       return;
1307     }
1308     GNUNET_assert (min_prio_entry != NULL);
1309     /* discard "min_prio_entry" */
1310 #if DEBUG_CORE
1311     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1312                 "Queue full, discarding existing older request\n");
1313 #endif
1314     GNUNET_STATISTICS_update (stats,
1315                               gettext_noop
1316                               ("# discarded lower priority CORE_SEND requests"),
1317                               1, GNUNET_NO);
1318     if (min_prio_prev == NULL)
1319       n->messages = min_prio_entry->next;
1320     else
1321       min_prio_prev->next = min_prio_entry->next;
1322     GNUNET_free (min_prio_entry);
1323   }
1324
1325 #if DEBUG_CORE
1326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1327               "Adding transmission request for `%4s' of size %u to queue\n",
1328               GNUNET_i2s (&sm->peer), (unsigned int) msize);
1329 #endif
1330   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1331   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1332   e->priority = ntohl (sm->priority);
1333   e->size = msize;
1334   if (GNUNET_YES != (int) ntohl (sm->cork))
1335     e->got_slack = GNUNET_YES;
1336   memcpy (&e[1], &sm[1], msize);
1337
1338   /* insert, keep list sorted by deadline */
1339   prev = NULL;
1340   pos = n->messages;
1341   while ((pos != NULL) && (pos->deadline.abs_value < e->deadline.abs_value))
1342   {
1343     prev = pos;
1344     pos = pos->next;
1345   }
1346   if (prev == NULL)
1347     n->messages = e;
1348   else
1349     prev->next = e;
1350   e->next = pos;
1351
1352   /* consider scheduling now */
1353   process_plaintext_neighbour_queue (n);
1354
1355 }
1356
1357
1358
1359 /**
1360  * Send a message to the neighbour.
1361  *
1362  * @param cls the message
1363  * @param key neighbour's identity
1364  * @param value 'struct Neighbour' of the target
1365  * @return always GNUNET_OK
1366  */
1367 static int
1368 do_send_message (void *cls, const GNUNET_HashCode * key, void *value)
1369 {
1370   struct GNUNET_MessageHeader *hdr = cls;
1371   struct Neighbour *n = value;
1372   struct MessageEntry *m;
1373   uint16_t size;
1374
1375   size = ntohs (hdr->size);
1376   m = GNUNET_malloc (sizeof (struct MessageEntry) + size);
1377   memcpy (&m[1], hdr, size);
1378   m->deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1379   m->slack_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1380   m->priority = UINT_MAX;
1381   m->sender_status = n->status;
1382   m->size = size;
1383   GNUNET_CONTAINER_DLL_insert (n->message_head,
1384                                n->message_tail,
1385                                m);
1386   return GNUNET_OK;
1387 }
1388
1389
1390 /**
1391  * Broadcast a message to all neighbours.
1392  *
1393  * @param msg message to transmit
1394  */
1395 void
1396 GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg)
1397 {
1398   if (NULL == sessions)
1399     return;
1400   GNUNET_CONTAINER_multihashmap_iterate (sessions,
1401                                          &do_send_message, msg);
1402 }
1403
1404
1405 /**
1406  * Helper function for GSC_SESSIONS_handle_client_iterate_peers.
1407  *
1408  * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies
1409  * @param key identity of the connected peer
1410  * @param value the 'struct Neighbour' for the peer
1411  * @return GNUNET_OK (continue to iterate)
1412  */
1413 static int
1414 queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value)
1415 {
1416   struct GNUNET_SERVER_TransmitContext *tc = cls;
1417   struct Neighbour *n = value;
1418   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
1419   struct GNUNET_TRANSPORT_ATS_Information *ats;
1420   size_t size;
1421   struct ConnectNotifyMessage *cnm;
1422
1423   cnm = (struct ConnectNotifyMessage *) buf;
1424   if (n->status != PEER_STATE_KEY_CONFIRMED)
1425     return GNUNET_OK;
1426   size =
1427       sizeof (struct ConnectNotifyMessage) +
1428       (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1429   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1430   {
1431     GNUNET_break (0);
1432     /* recovery strategy: throw away performance data */
1433     GNUNET_array_grow (n->ats, n->ats_count, 0);
1434     size =
1435         sizeof (struct PeerStatusNotifyMessage) +
1436         n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1437   }
1438   cnm = (struct ConnectNotifyMessage *) buf;
1439   cnm->header.size = htons (size);
1440   cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
1441   cnm->ats_count = htonl (n->ats_count);
1442   ats = &cnm->ats;
1443   memcpy (ats, n->ats,
1444           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
1445   ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
1446   ats[n->ats_count].value = htonl (0);
1447 #if DEBUG_CORE_CLIENT
1448   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
1449               "NOTIFY_CONNECT");
1450 #endif
1451   cnm->peer = n->peer;
1452   GNUNET_SERVER_transmit_context_append_message (tc, &cnm->header);
1453   return GNUNET_OK;
1454 }
1455
1456
1457 /**
1458  * End the session with the given peer (we are no longer
1459  * connected). 
1460  *
1461  * @param pid identity of peer to kill session with
1462  */
1463 void
1464 GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
1465 {
1466 }
1467
1468
1469 /**
1470  * Traffic is being solicited for the given peer.  This means that the
1471  * message queue on the transport-level (NEIGHBOURS subsystem) is now
1472  * empty and it is now OK to transmit another (non-control) message.
1473  *
1474  * @param pid identity of peer ready to receive data
1475  */
1476 void
1477 GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
1478 {
1479 }
1480
1481
1482 /**
1483  * Transmit a message to a particular peer.
1484  *
1485  * @param car original request that was queued and then solicited,
1486  *            ownership does not change (dequeue will be called soon).
1487  * @param msg message to transmit
1488  */
1489 void
1490 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
1491                        const struct GNUNET_MessageHeader *msg)
1492 {
1493 }
1494
1495
1496 /**
1497  * We have a new client, notify it about all current sessions.
1498  *
1499  * @param client the new client
1500  */
1501 void
1502 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
1503 {
1504 }
1505
1506
1507 /**
1508  * Handle CORE_ITERATE_PEERS request. For this request type, the client
1509  * does not have to have transmitted an INIT request.  All current peers
1510  * are returned, regardless of which message types they accept. 
1511  *
1512  * @param cls unused
1513  * @param client client sending the iteration request
1514  * @param message iteration request message
1515  */
1516 void
1517 GSC_SESSIONS_handle_client_iterate_peers (void *cls, struct GNUNET_SERVER_Client *client,
1518                                           const struct GNUNET_MessageHeader *message)
1519 {
1520   struct GNUNET_MessageHeader done_msg;
1521   struct GNUNET_SERVER_TransmitContext *tc;
1522
1523   tc = GNUNET_SERVER_transmit_context_create (client);
1524   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &queue_connect_message,
1525                                          tc);
1526   done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
1527   done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
1528   GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
1529   GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
1530 }
1531
1532
1533 /**
1534  * Handle CORE_PEER_CONNECTED request.   Notify client about connection
1535  * to the given neighbour.  For this request type, the client does not
1536  * have to have transmitted an INIT request.  All current peers are
1537  * returned, regardless of which message types they accept.
1538  *
1539  * @param cls unused
1540  * @param client client sending the iteration request
1541  * @param message iteration request message
1542  */
1543 void
1544 GSC_SESSIONS_handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *client,
1545                                       const struct GNUNET_MessageHeader *message)
1546 {
1547   struct GNUNET_MessageHeader done_msg;
1548   struct GNUNET_SERVER_TransmitContext *tc;
1549   const struct GNUNET_PeerIdentity *peer;
1550
1551   peer = (const struct GNUNET_PeerIdentity *) &message[1]; // YUCK!
1552   tc = GNUNET_SERVER_transmit_context_create (client);
1553   GNUNET_CONTAINER_multihashmap_get_multiple (neighbours, &peer->hashPubKey,
1554                                               &queue_connect_message, tc);
1555   done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
1556   done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
1557   GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
1558   GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
1559 }
1560
1561
1562
1563 /**
1564  * Handle REQUEST_INFO request. For this request type, the client must
1565  * have transmitted an INIT first.
1566  *
1567  * @param cls unused
1568  * @param client client sending the request
1569  * @param message iteration request message
1570  */
1571 void
1572 GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client *client,
1573                                          const struct GNUNET_MessageHeader *message)
1574 {
1575   const struct RequestInfoMessage *rcm;
1576   struct GSC_Client *pos;
1577   struct Neighbour *n;
1578   struct ConfigurationInfoMessage cim;
1579   int32_t want_reserv;
1580   int32_t got_reserv;
1581   unsigned long long old_preference;
1582   struct GNUNET_TIME_Relative rdelay;
1583
1584   rdelay = GNUNET_TIME_relative_get_zero ();
1585 #if DEBUG_CORE_CLIENT
1586   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service receives `%s' request.\n",
1587               "REQUEST_INFO");
1588 #endif
1589   rcm = (const struct RequestInfoMessage *) message;
1590   n = find_neighbour (&rcm->peer);
1591   memset (&cim, 0, sizeof (cim));
1592   if ((n != NULL) && (GNUNET_YES == n->is_connected))
1593   {
1594     want_reserv = ntohl (rcm->reserve_inbound);
1595     if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
1596     {
1597       n->bw_out_internal_limit = rcm->limit_outbound;
1598       if (n->bw_out.value__ !=
1599           GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
1600                                       n->bw_out_external_limit).value__)
1601       {
1602         n->bw_out =
1603             GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
1604                                         n->bw_out_external_limit);
1605         GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
1606                                                n->bw_out);
1607         GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
1608         handle_peer_status_change (n);
1609       }
1610     }
1611     if (want_reserv < 0)
1612     {
1613       got_reserv = want_reserv;
1614     }
1615     else if (want_reserv > 0)
1616     {
1617       rdelay =
1618           GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
1619                                               want_reserv);
1620       if (rdelay.rel_value == 0)
1621         got_reserv = want_reserv;
1622       else
1623         got_reserv = 0;         /* all or nothing */
1624     }
1625     else
1626       got_reserv = 0;
1627     GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window, got_reserv);
1628     old_preference = n->current_preference;
1629     n->current_preference += GNUNET_ntohll (rcm->preference_change);
1630     if (old_preference > n->current_preference)
1631     {
1632       /* overflow; cap at maximum value */
1633       n->current_preference = ULLONG_MAX;
1634     }
1635     update_preference_sum (n->current_preference - old_preference);
1636 #if DEBUG_CORE_QUOTA
1637     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1638                 "Received reservation request for %d bytes for peer `%4s', reserved %d bytes, suggesting delay of %llu ms\n",
1639                 (int) want_reserv, GNUNET_i2s (&rcm->peer), (int) got_reserv,
1640                 (unsigned long long) rdelay.rel_value);
1641 #endif
1642     cim.reserved_amount = htonl (got_reserv);
1643     cim.reserve_delay = GNUNET_TIME_relative_hton (rdelay);
1644     cim.bw_out = n->bw_out;
1645     cim.preference = n->current_preference;
1646   }
1647   else
1648   {
1649     /* Technically, this COULD happen (due to asynchronous behavior),
1650      * but it should be rare, so we should generate an info event
1651      * to help diagnosis of serious errors that might be masked by this */
1652     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1653                 _
1654                 ("Client asked for preference change with peer `%s', which is not connected!\n"),
1655                 GNUNET_i2s (&rcm->peer));
1656     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1657     return;
1658   }
1659   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1660   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1661   cim.peer = rcm->peer;
1662   cim.rim_id = rcm->rim_id;
1663 #if DEBUG_CORE_CLIENT
1664   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
1665               "CONFIGURATION_INFO");
1666 #endif
1667   GSC_CLIENTS_send_to_client (client, &cim.header, GNUNET_NO);
1668   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1669 }
1670
1671
1672 /**
1673  * Create a session, a key exchange was just completed.
1674  */
1675 void
1676 GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer)
1677 {
1678     {
1679       struct GNUNET_MessageHeader *hdr;
1680
1681       hdr = compute_type_map_message ();
1682       send_type_map_to_neighbour (hdr, &n->peer.hashPubKey, n);
1683       GNUNET_free (hdr);
1684     }
1685     if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
1686     {
1687       n->bw_out_external_limit = t.inbound_bw_limit;
1688       n->bw_out =
1689           GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
1690                                       n->bw_out_internal_limit);
1691       GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
1692                                              n->bw_out);
1693       GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
1694     }
1695 #if DEBUG_CORE
1696     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697                 "Confirmed key via `%s' message for peer `%4s'\n", "PONG",
1698                 GNUNET_i2s (&n->peer));
1699 #endif
1700
1701
1702     size =
1703         sizeof (struct ConnectNotifyMessage) +
1704         (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1705     if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1706     {
1707       GNUNET_break (0);
1708       /* recovery strategy: throw away performance data */
1709       GNUNET_array_grow (n->ats, n->ats_count, 0);
1710       size =
1711           sizeof (struct PeerStatusNotifyMessage) +
1712           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1713     }
1714     cnm = (struct ConnectNotifyMessage *) buf;
1715     cnm->header.size = htons (size);
1716     cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
1717     cnm->ats_count = htonl (n->ats_count);
1718     cnm->peer = n->peer;
1719     mats = &cnm->ats;
1720     memcpy (mats, n->ats,
1721             n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
1722     mats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
1723     mats[n->ats_count].value = htonl (0);
1724     send_to_all_clients (&cnm->header, GNUNET_NO,
1725                          GNUNET_CORE_OPTION_SEND_CONNECT);
1726     process_encrypted_neighbour_queue (n);
1727     n->last_activity = GNUNET_TIME_absolute_get ();
1728
1729   if (n->status == PEER_STATE_KEY_CONFIRMED)
1730   {
1731     now = GNUNET_TIME_absolute_get ();
1732     n->last_activity = now;
1733     changed = GNUNET_YES;
1734     if (!up)
1735     {
1736       GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
1737                                 1, GNUNET_NO);
1738       n->time_established = now;
1739     }
1740     if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
1741       GNUNET_SCHEDULER_cancel (n->keep_alive_task);
1742     n->keep_alive_task =
1743         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide
1744                                       (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1745                                        2), &send_keep_alive, n);
1746   }
1747
1748
1749 }
1750
1751
1752 /**
1753  * Update information about a session.
1754  *
1755  * @param peer peer who's session should be updated
1756  * @param bw_out new outbound bandwidth limit for the peer
1757  * @param atsi performance information
1758  * @param atsi_count number of performance records supplied
1759  */
1760 void
1761 GSC_SESSIONS_update (const struct GNUNET_PeerIdentity *peer,
1762                      struct GNUNET_BANDWIDTH_Value32NBO bw_out,
1763                      const struct GNUNET_TRANSPORT_ATS_Information *atsi,
1764                      uint32_t atsi_count)
1765 {
1766   if (bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
1767   {
1768 #if DEBUG_CORE_SET_QUOTA
1769     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1770                 "Received %u b/s as new inbound limit for peer `%4s'\n",
1771                 (unsigned int) ntohl (pt->inbound_bw_limit.value__),
1772                 GNUNET_i2s (&n->peer));
1773 #endif
1774     n->bw_out_external_limit = pt->inbound_bw_limit;
1775     n->bw_out =
1776         GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
1777                                     n->bw_out_internal_limit);
1778     GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
1779                                            n->bw_out);
1780     GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
1781   }
1782
1783 }
1784
1785
1786 /**
1787  * Initialize sessions subsystem.
1788  */
1789 int
1790 GSC_SESSIONS_init ()
1791 {
1792   neighbours = GNUNET_CONTAINER_multihashmap_create (128);
1793   self.public_key = &my_public_key;
1794   self.peer = my_identity;
1795   self.last_activity = GNUNET_TIME_UNIT_FOREVER_ABS;
1796   self.status = PEER_STATE_KEY_CONFIRMED;
1797   self.is_connected = GNUNET_YES;
1798   return GNUNET_OK;
1799 }
1800
1801
1802 /**
1803  * Shutdown sessions subsystem.
1804  */
1805 void
1806 GSC_SESSIONS_done ()
1807 {
1808   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
1809                                          NULL);
1810   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1811   neighbours = NULL;
1812   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
1813                          0, GNUNET_NO);
1814 }