0d1fec2dd89ffb06c6bd01130a273d32a17580aa
[oweals/gnunet.git] / src / core / gnunet-service-core_sessions.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_sessions.c
23  * @brief code for managing of 'encrypted' sessions (key exchange done) 
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-core.h"
28 #include "gnunet-service-core_neighbours.h"
29 #include "gnunet-service-core_kx.h"
30 #include "gnunet-service-core_sessions.h"
31
32 /**
33  * Record kept for each request for transmission issued by a
34  * client that is still pending.
35  */
36 struct GSC_ClientActiveRequest;
37
38 /**
39  * Data kept per session.
40  */
41 struct Session
42 {
43   /**
44    * Identity of the other peer.
45    */
46   struct GNUNET_PeerIdentity peer;
47
48   /**
49    * Head of list of requests from clients for transmission to
50    * this peer.
51    */
52   struct GSC_ClientActiveRequest *active_client_request_head;
53
54   /**
55    * Tail of list of requests from clients for transmission to
56    * this peer.
57    */
58   struct GSC_ClientActiveRequest *active_client_request_tail;
59
60   /**
61    * Performance data for the peer.
62    */
63   struct GNUNET_TRANSPORT_ATS_Information *ats;
64
65   /**
66    * Information about the key exchange with the other peer.
67    */
68   struct GSC_KeyExchangeInfo *kxinfo;
69
70   /**
71    * ID of task used for cleaning up dead neighbour entries.
72    */
73   GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
74
75   /**
76    * ID of task used for updating bandwidth quota for this neighbour.
77    */
78   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
79
80   /**
81    * At what time did we initially establish (as in, complete session
82    * key handshake) this connection?  Should be zero if status != KEY_CONFIRMED.
83    */
84   struct GNUNET_TIME_Absolute time_established;
85
86   /**
87    * At what time did we last receive an encrypted message from the
88    * other peer?  Should be zero if status != KEY_CONFIRMED.
89    */
90   struct GNUNET_TIME_Absolute last_activity;
91
92   /**
93    * How valueable were the messages of this peer recently?
94    */
95   unsigned long long current_preference;
96
97   /**
98    * Number of entries in 'ats'.
99    */
100   unsigned int ats_count;
101
102   /**
103    * Available bandwidth in for this peer (current target).
104    */
105   struct GNUNET_BANDWIDTH_Value32NBO bw_in;
106
107   /**
108    * Available bandwidth out for this peer (current target).
109    */
110   struct GNUNET_BANDWIDTH_Value32NBO bw_out;
111
112   /**
113    * Internal bandwidth limit set for this peer (initially typically
114    * set to "-1").  Actual "bw_out" is MIN of
115    * "bpm_out_internal_limit" and "bw_out_external_limit".
116    */
117   struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
118
119   /**
120    * External bandwidth limit set for this peer by the
121    * peer that we are communicating with.  "bw_out" is MIN of
122    * "bw_out_internal_limit" and "bw_out_external_limit".
123    */
124   struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
125
126 };
127
128
129 /**
130  * Map of peer identities to 'struct Session'.
131  */
132 static struct GNUNET_CONTAINER_MultiHashMap *sessions;
133
134
135 /**
136  * Session entry for "this" peer.
137  */
138 static struct Session self;
139
140 /**
141  * Sum of all preferences among all neighbours.
142  */
143 static unsigned long long preference_sum;
144
145
146 // FIXME.........
147
148 /**
149  * At what time should the connection to the given neighbour
150  * time out (given no further activity?)
151  *
152  * @param n neighbour in question
153  * @return absolute timeout
154  */
155 static struct GNUNET_TIME_Absolute
156 get_neighbour_timeout (struct Neighbour *n)
157 {
158   return GNUNET_TIME_absolute_add (n->last_activity,
159                                    GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
160 }
161
162
163 /**
164  * Helper function for update_preference_sum.
165  */
166 static int
167 update_preference (void *cls, const GNUNET_HashCode * key, void *value)
168 {
169   unsigned long long *ps = cls;
170   struct Neighbour *n = value;
171
172   n->current_preference /= 2;
173   *ps += n->current_preference;
174   return GNUNET_OK;
175 }
176
177
178 /**
179  * A preference value for a neighbour was update.  Update
180  * the preference sum accordingly.
181  *
182  * @param inc how much was a preference value increased?
183  */
184 static void
185 update_preference_sum (unsigned long long inc)
186 {
187   unsigned long long os;
188
189   os = preference_sum;
190   preference_sum += inc;
191   if (preference_sum >= os)
192     return;                     /* done! */
193   /* overflow! compensate by cutting all values in half! */
194   preference_sum = 0;
195   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &update_preference,
196                                          &preference_sum);
197   GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"),
198                          preference_sum, GNUNET_NO);
199 }
200
201
202 /**
203  * Find the entry for the given neighbour.
204  *
205  * @param peer identity of the neighbour
206  * @return NULL if we are not connected, otherwise the
207  *         neighbour's entry.
208  */
209 static struct Neighbour *
210 find_neighbour (const struct GNUNET_PeerIdentity *peer)
211 {
212   return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
213 }
214
215
216 /**
217  * Function called by transport telling us that a peer
218  * changed status.
219  *
220  * @param n the peer that changed status
221  */
222 static void
223 handle_peer_status_change (struct Neighbour *n)
224 {
225   struct PeerStatusNotifyMessage *psnm;
226   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
227   struct GNUNET_TRANSPORT_ATS_Information *ats;
228   size_t size;
229
230   if ((!n->is_connected) || (n->status != PEER_STATE_KEY_CONFIRMED))
231     return;
232 #if DEBUG_CORE > 1
233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%4s' changed status\n",
234               GNUNET_i2s (&n->peer));
235 #endif
236   size =
237       sizeof (struct PeerStatusNotifyMessage) +
238       n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
239   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
240   {
241     GNUNET_break (0);
242     /* recovery strategy: throw away performance data */
243     GNUNET_array_grow (n->ats, n->ats_count, 0);
244     size =
245         sizeof (struct PeerStatusNotifyMessage) +
246         n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
247   }
248   psnm = (struct PeerStatusNotifyMessage *) buf;
249   psnm->header.size = htons (size);
250   psnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE);
251   psnm->timeout = GNUNET_TIME_absolute_hton (get_neighbour_timeout (n));
252   psnm->bandwidth_in = n->bw_in;
253   psnm->bandwidth_out = n->bw_out;
254   psnm->peer = n->peer;
255   psnm->ats_count = htonl (n->ats_count);
256   ats = &psnm->ats;
257   memcpy (ats, n->ats,
258           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
259   ats[n->ats_count].type = htonl (0);
260   ats[n->ats_count].value = htonl (0);
261   send_to_all_clients (&psnm->header, GNUNET_YES,
262                        GNUNET_CORE_OPTION_SEND_STATUS_CHANGE);
263   GNUNET_STATISTICS_update (stats, gettext_noop ("# peer status changes"), 1,
264                             GNUNET_NO);
265 }
266
267
268
269 /**
270  * Go over our message queue and if it is not too long, go
271  * over the pending requests from clients for this
272  * neighbour and send some clients a 'READY' notification.
273  *
274  * @param n which peer to process
275  */
276 static void
277 schedule_peer_messages (struct Neighbour *n)
278 {
279   struct GSC_ClientActiveRequest *car;
280   struct GSC_ClientActiveRequest *pos;
281   struct Client *c;
282   struct MessageEntry *mqe;
283   unsigned int queue_size;
284
285   /* check if neighbour queue is empty enough! */
286   if (n != &self)
287   {
288     queue_size = 0;
289     mqe = n->messages;
290     while (mqe != NULL)
291     {
292       queue_size++;
293       mqe = mqe->next;
294     }
295     if (queue_size >= MAX_PEER_QUEUE_SIZE)
296     {
297 #if DEBUG_CORE_CLIENT
298       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
299                   "Not considering client transmission requests: queue full\n");
300 #endif
301       return;                   /* queue still full */
302     }
303     /* find highest priority request */
304     pos = n->active_client_request_head;
305     car = NULL;
306     while (pos != NULL)
307     {
308       if ((car == NULL) || (pos->priority > car->priority))
309         car = pos;
310       pos = pos->next;
311     }
312   }
313   else
314   {
315     car = n->active_client_request_head;
316   }
317   if (car == NULL)
318     return;                     /* no pending requests */
319 #if DEBUG_CORE_CLIENT
320   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
321               "Permitting client transmission request to `%s'\n",
322               GNUNET_i2s (&n->peer));
323 #endif
324   GSC_CLIENTS_solicite_request (car);
325 }
326
327
328
329 /**
330  * Free the given entry for the neighbour (it has
331  * already been removed from the list at this point).
332  *
333  * @param n neighbour to free
334  */
335 static void
336 free_neighbour (struct Neighbour *n)
337 {
338   struct MessageEntry *m;
339   struct GSC_ClientActiveRequest *car;
340
341 #if DEBUG_CORE
342   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
343               "Destroying neighbour entry for peer `%4s'\n",
344               GNUNET_i2s (&n->peer));
345 #endif
346   if (n->skm != NULL)
347   {
348     GNUNET_free (n->skm);
349     n->skm = NULL;
350   }
351   while (NULL != (m = n->messages))
352   {
353     n->messages = m->next;
354     GNUNET_free (m);
355   }
356   while (NULL != (m = n->encrypted_head))
357   {
358     GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
359     GNUNET_free (m);
360   }
361   while (NULL != (car = n->active_client_request_head))
362   {
363     GNUNET_CONTAINER_DLL_remove (n->active_client_request_head,
364                                  n->active_client_request_tail, car);
365     GNUNET_assert (GNUNET_YES ==
366                    GNUNET_CONTAINER_multihashmap_remove (car->client->requests,
367                                                          &n->peer.hashPubKey,
368                                                          car));
369     GNUNET_free (car);
370   }
371   if (NULL != n->th)
372   {
373     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
374     n->th = NULL;
375   }
376   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
377     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
378   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
379     GNUNET_SCHEDULER_cancel (n->quota_update_task);
380   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
381     GNUNET_SCHEDULER_cancel (n->keep_alive_task);
382   if (n->status == PEER_STATE_KEY_CONFIRMED)
383     GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
384                               -1, GNUNET_NO);
385   GNUNET_array_grow (n->ats, n->ats_count, 0);
386   GNUNET_free_non_null (n->pending_ping);
387   GNUNET_free_non_null (n->pending_pong);
388   GNUNET_free (n);
389 }
390
391
392
393 /**
394  * Consider freeing the given neighbour since we may not need
395  * to keep it around anymore.
396  *
397  * @param n neighbour to consider discarding
398  */
399 static void
400 consider_free_neighbour (struct Neighbour *n);
401
402
403 /**
404  * Task triggered when a neighbour entry might have gotten stale.
405  *
406  * @param cls the 'struct Neighbour'
407  * @param tc scheduler context (not used)
408  */
409 static void
410 consider_free_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
411 {
412   struct Neighbour *n = cls;
413
414   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
415   consider_free_neighbour (n);
416 }
417
418
419 /**
420  * Consider freeing the given neighbour since we may not need
421  * to keep it around anymore.
422  *
423  * @param n neighbour to consider discarding
424  */
425 static void
426 consider_free_neighbour (struct Neighbour *n)
427 {
428   struct GNUNET_TIME_Relative left;
429
430   if ((n->th != NULL) || (n->pitr != NULL) || (GNUNET_YES == n->is_connected))
431     return;                     /* no chance */
432
433   left = GNUNET_TIME_absolute_get_remaining (get_neighbour_timeout (n));
434   if (left.rel_value > 0)
435   {
436     if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
437       GNUNET_SCHEDULER_cancel (n->dead_clean_task);
438     n->dead_clean_task =
439         GNUNET_SCHEDULER_add_delayed (left, &consider_free_task, n);
440     return;
441   }
442   /* actually free the neighbour... */
443   GNUNET_assert (GNUNET_YES ==
444                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
445                                                        &n->peer.hashPubKey, n));
446   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
447                          GNUNET_CONTAINER_multihashmap_size (neighbours),
448                          GNUNET_NO);
449   free_neighbour (n);
450 }
451
452
453 /**
454  * Function called when the transport service is ready to
455  * receive an encrypted message for the respective peer
456  *
457  * @param cls neighbour to use message from
458  * @param size number of bytes we can transmit
459  * @param buf where to copy the message
460  * @return number of bytes transmitted
461  */
462 static size_t
463 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
464 {
465   struct Neighbour *n = cls;
466   struct MessageEntry *m;
467   size_t ret;
468   char *cbuf;
469
470   n->th = NULL;
471   m = n->encrypted_head;
472   if (m == NULL)
473   {
474 #if DEBUG_CORE
475     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
476                 "Encrypted message queue empty, no messages added to buffer for `%4s'\n",
477                 GNUNET_i2s (&n->peer));
478 #endif
479     return 0;
480   }
481   GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
482   ret = 0;
483   cbuf = buf;
484   if (buf != NULL)
485   {
486     GNUNET_assert (size >= m->size);
487     memcpy (cbuf, &m[1], m->size);
488     ret = m->size;
489     GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, m->size);
490 #if DEBUG_CORE
491     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
492                 "Copied message of type %u and size %u into transport buffer for `%4s'\n",
493                 (unsigned int)
494                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
495                 (unsigned int) ret, GNUNET_i2s (&n->peer));
496 #endif
497     process_encrypted_neighbour_queue (n);
498   }
499   else
500   {
501 #if DEBUG_CORE
502     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
503                 "Transmission of message of type %u and size %u failed\n",
504                 (unsigned int)
505                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
506                 (unsigned int) m->size);
507 #endif
508   }
509   GNUNET_free (m);
510   consider_free_neighbour (n);
511   GNUNET_STATISTICS_update (stats,
512                             gettext_noop
513                             ("# encrypted bytes given to transport"), ret,
514                             GNUNET_NO);
515   return ret;
516 }
517
518
519
520
521
522 /**
523  * Select messages for transmission.  This heuristic uses a combination
524  * of earliest deadline first (EDF) scheduling (with bounded horizon)
525  * and priority-based discard (in case no feasible schedule exist) and
526  * speculative optimization (defer any kind of transmission until
527  * we either create a batch of significant size, 25% of max, or until
528  * we are close to a deadline).  Furthermore, when scheduling the
529  * heuristic also packs as many messages into the batch as possible,
530  * starting with those with the earliest deadline.  Yes, this is fun.
531  *
532  * @param n neighbour to select messages from
533  * @param size number of bytes to select for transmission
534  * @param retry_time set to the time when we should try again
535  *        (only valid if this function returns zero)
536  * @return number of bytes selected, or 0 if we decided to
537  *         defer scheduling overall; in that case, retry_time is set.
538  */
539 static size_t
540 select_messages (struct Neighbour *n, size_t size,
541                  struct GNUNET_TIME_Relative *retry_time)
542 {
543   struct MessageEntry *pos;
544   struct MessageEntry *min;
545   struct MessageEntry *last;
546   unsigned int min_prio;
547   struct GNUNET_TIME_Absolute t;
548   struct GNUNET_TIME_Absolute now;
549   struct GNUNET_TIME_Relative delta;
550   uint64_t avail;
551   struct GNUNET_TIME_Relative slack;    /* how long could we wait before missing deadlines? */
552   size_t off;
553   uint64_t tsize;
554   unsigned int queue_size;
555   int discard_low_prio;
556
557   GNUNET_assert (NULL != n->messages);
558   now = GNUNET_TIME_absolute_get ();
559   /* last entry in linked list of messages processed */
560   last = NULL;
561   /* should we remove the entry with the lowest
562    * priority from consideration for scheduling at the
563    * end of the loop? */
564   queue_size = 0;
565   tsize = 0;
566   pos = n->messages;
567   while (pos != NULL)
568   {
569     queue_size++;
570     tsize += pos->size;
571     pos = pos->next;
572   }
573   discard_low_prio = GNUNET_YES;
574   while (GNUNET_YES == discard_low_prio)
575   {
576     min = NULL;
577     min_prio = UINT_MAX;
578     discard_low_prio = GNUNET_NO;
579     /* calculate number of bytes available for transmission at time "t" */
580     avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
581     t = now;
582     /* how many bytes have we (hypothetically) scheduled so far */
583     off = 0;
584     /* maximum time we can wait before transmitting anything
585      * and still make all of our deadlines */
586     slack = GNUNET_TIME_UNIT_FOREVER_REL;
587     pos = n->messages;
588     /* note that we use "*2" here because we want to look
589      * a bit further into the future; much more makes no
590      * sense since new message might be scheduled in the
591      * meantime... */
592     while ((pos != NULL) && (off < size * 2))
593     {
594       if (pos->do_transmit == GNUNET_YES)
595       {
596         /* already removed from consideration */
597         pos = pos->next;
598         continue;
599       }
600       if (discard_low_prio == GNUNET_NO)
601       {
602         delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
603         if (delta.rel_value > 0)
604         {
605           // FIXME: HUH? Check!
606           t = pos->deadline;
607           avail +=
608               GNUNET_BANDWIDTH_value_get_available_until (n->bw_out, delta);
609         }
610         if (avail < pos->size)
611         {
612           // FIXME: HUH? Check!
613           discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
614         }
615         else
616         {
617           avail -= pos->size;
618           /* update slack, considering both its absolute deadline
619            * and relative deadlines caused by other messages
620            * with their respective load */
621           slack =
622               GNUNET_TIME_relative_min (slack,
623                                         GNUNET_BANDWIDTH_value_get_delay_for
624                                         (n->bw_out, avail));
625           if (pos->deadline.abs_value <= now.abs_value)
626           {
627             /* now or never */
628             slack = GNUNET_TIME_UNIT_ZERO;
629           }
630           else if (GNUNET_YES == pos->got_slack)
631           {
632             /* should be soon now! */
633             slack =
634                 GNUNET_TIME_relative_min (slack,
635                                           GNUNET_TIME_absolute_get_remaining
636                                           (pos->slack_deadline));
637           }
638           else
639           {
640             slack =
641                 GNUNET_TIME_relative_min (slack,
642                                           GNUNET_TIME_absolute_get_difference
643                                           (now, pos->deadline));
644             pos->got_slack = GNUNET_YES;
645             pos->slack_deadline =
646                 GNUNET_TIME_absolute_min (pos->deadline,
647                                           GNUNET_TIME_relative_to_absolute
648                                           (GNUNET_CONSTANTS_MAX_CORK_DELAY));
649           }
650         }
651       }
652       off += pos->size;
653       t = GNUNET_TIME_absolute_max (pos->deadline, t);  // HUH? Check!
654       if (pos->priority <= min_prio)
655       {
656         /* update min for discard */
657         min_prio = pos->priority;
658         min = pos;
659       }
660       pos = pos->next;
661     }
662     if (discard_low_prio)
663     {
664       GNUNET_assert (min != NULL);
665       /* remove lowest-priority entry from consideration */
666       min->do_transmit = GNUNET_YES;    /* means: discard (for now) */
667     }
668     last = pos;
669   }
670   /* guard against sending "tiny" messages with large headers without
671    * urgent deadlines */
672   if ((slack.rel_value > GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value) &&
673       (size > 4 * off) && (queue_size <= MAX_PEER_QUEUE_SIZE - 2))
674   {
675     /* less than 25% of message would be filled with deadlines still
676      * being met if we delay by one second or more; so just wait for
677      * more data; but do not wait longer than 1s (since we don't want
678      * to delay messages for a really long time either). */
679     *retry_time = GNUNET_CONSTANTS_MAX_CORK_DELAY;
680     /* reset do_transmit values for next time */
681     while (pos != last)
682     {
683       pos->do_transmit = GNUNET_NO;
684       pos = pos->next;
685     }
686     GNUNET_STATISTICS_update (stats,
687                               gettext_noop
688                               ("# transmissions delayed due to corking"), 1,
689                               GNUNET_NO);
690 #if DEBUG_CORE
691     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
692                 "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
693                 (unsigned long long) retry_time->rel_value, (unsigned int) off,
694                 (unsigned int) size);
695 #endif
696     return 0;
697   }
698   /* select marked messages (up to size) for transmission */
699   off = 0;
700   pos = n->messages;
701   while (pos != last)
702   {
703     if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
704     {
705       pos->do_transmit = GNUNET_YES;    /* mark for transmission */
706       off += pos->size;
707       size -= pos->size;
708 #if DEBUG_CORE > 1
709       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710                   "Selecting message of size %u for transmission\n",
711                   (unsigned int) pos->size);
712 #endif
713     }
714     else
715     {
716 #if DEBUG_CORE > 1
717       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718                   "Not selecting message of size %u for transmission at this time (maximum is %u)\n",
719                   (unsigned int) pos->size, size);
720 #endif
721       pos->do_transmit = GNUNET_NO;     /* mark for not transmitting! */
722     }
723     pos = pos->next;
724   }
725 #if DEBUG_CORE
726   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
727               "Selected %llu/%llu bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
728               (unsigned long long) off, (unsigned long long) tsize, queue_size,
729               (unsigned int) MAX_PEER_QUEUE_SIZE, GNUNET_i2s (&n->peer));
730 #endif
731   return off;
732 }
733
734
735 /**
736  * Batch multiple messages into a larger buffer.
737  *
738  * @param n neighbour to take messages from
739  * @param buf target buffer
740  * @param size size of buf
741  * @param deadline set to transmission deadline for the result
742  * @param retry_time set to the time when we should try again
743  *        (only valid if this function returns zero)
744  * @param priority set to the priority of the batch
745  * @return number of bytes written to buf (can be zero)
746  */
747 static size_t
748 batch_message (struct Neighbour *n, char *buf, size_t size,
749                struct GNUNET_TIME_Absolute *deadline,
750                struct GNUNET_TIME_Relative *retry_time, unsigned int *priority)
751 {
752   char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
753   struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage *) ntmb;
754   struct MessageEntry *pos;
755   struct MessageEntry *prev;
756   struct MessageEntry *next;
757   size_t ret;
758
759   ret = 0;
760   *priority = 0;
761   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
762   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
763   if (0 == select_messages (n, size, retry_time))
764   {
765 #if DEBUG_CORE
766     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767                 "No messages selected, will try again in %llu ms\n",
768                 retry_time->rel_value);
769 #endif
770     return 0;
771   }
772   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
773   ntm->ats_count = htonl (0);
774   ntm->ats.type = htonl (0);
775   ntm->ats.value = htonl (0);
776   ntm->peer = n->peer;
777   pos = n->messages;
778   prev = NULL;
779   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
780   {
781     next = pos->next;
782     if (GNUNET_YES == pos->do_transmit)
783     {
784       GNUNET_assert (pos->size <= size);
785       /* do notifications */
786       /* FIXME: track if we have *any* client that wants
787        * full notifications and only do this if that is
788        * actually true */
789       if (pos->size <
790           GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
791       {
792         memcpy (&ntm[1], &pos[1], pos->size);
793         ntm->header.size =
794             htons (sizeof (struct NotifyTrafficMessage) +
795                    sizeof (struct GNUNET_MessageHeader));
796         send_to_all_clients (&ntm->header, GNUNET_YES,
797                              GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
798       }
799       else
800       {
801         /* message too large for 'full' notifications, we do at
802          * least the 'hdr' type */
803         memcpy (&ntm[1], &pos[1], sizeof (struct GNUNET_MessageHeader));
804       }
805       ntm->header.size =
806           htons (sizeof (struct NotifyTrafficMessage) + pos->size);
807       send_to_all_clients (&ntm->header, GNUNET_YES,
808                            GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);
809 #if DEBUG_HANDSHAKE
810       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811                   "Encrypting %u bytes with message of type %u and size %u\n",
812                   pos->size,
813                   (unsigned int)
814                   ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type),
815                   (unsigned int)
816                   ntohs (((const struct GNUNET_MessageHeader *)
817                           &pos[1])->size));
818 #endif
819       /* copy for encrypted transmission */
820       memcpy (&buf[ret], &pos[1], pos->size);
821       ret += pos->size;
822       size -= pos->size;
823       *priority += pos->priority;
824 #if DEBUG_CORE > 1
825       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
826                   "Adding plaintext message of size %u with deadline %llu ms to batch\n",
827                   (unsigned int) pos->size,
828                   (unsigned long long)
829                   GNUNET_TIME_absolute_get_remaining (pos->deadline).rel_value);
830 #endif
831       deadline->abs_value =
832           GNUNET_MIN (deadline->abs_value, pos->deadline.abs_value);
833       GNUNET_free (pos);
834       if (prev == NULL)
835         n->messages = next;
836       else
837         prev->next = next;
838     }
839     else
840     {
841       prev = pos;
842     }
843     pos = next;
844   }
845 #if DEBUG_CORE > 1
846   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
847               "Deadline for message batch is %llu ms\n",
848               GNUNET_TIME_absolute_get_remaining (*deadline).rel_value);
849 #endif
850   return ret;
851 }
852
853
854 /**
855  * Remove messages with deadlines that have long expired from
856  * the queue.
857  *
858  * @param n neighbour to inspect
859  */
860 static void
861 discard_expired_messages (struct Neighbour *n)
862 {
863   struct MessageEntry *prev;
864   struct MessageEntry *next;
865   struct MessageEntry *pos;
866   struct GNUNET_TIME_Absolute now;
867   struct GNUNET_TIME_Relative delta;
868   int disc;
869   unsigned int queue_length;
870
871   disc = GNUNET_NO;
872   now = GNUNET_TIME_absolute_get ();
873   prev = NULL;
874   queue_length = 0;
875   pos = n->messages;
876   while (pos != NULL)
877   {
878     queue_length++;
879     next = pos->next;
880     delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
881     if (delta.rel_value > PAST_EXPIRATION_DISCARD_TIME.rel_value)
882     {
883 #if DEBUG_CORE
884       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
885                   "Message is %llu ms past due, discarding.\n",
886                   delta.rel_value);
887 #endif
888       if (prev == NULL)
889         n->messages = next;
890       else
891         prev->next = next;
892       GNUNET_STATISTICS_update (stats,
893                                 gettext_noop
894                                 ("# messages discarded (expired prior to transmission)"),
895                                 1, GNUNET_NO);
896       disc = GNUNET_YES;
897       GNUNET_free (pos);
898     }
899     else
900       prev = pos;
901     pos = next;
902   }
903   if ( (GNUNET_YES == disc) &&
904        (queue_length == MAX_PEER_QUEUE_SIZE) )
905     schedule_peer_messages (n);
906 }
907
908
909 /**
910  * Signature of the main function of a task.
911  *
912  * @param cls closure
913  * @param tc context information (why was this task triggered now)
914  */
915 static void
916 retry_plaintext_processing (void *cls,
917                             const struct GNUNET_SCHEDULER_TaskContext *tc)
918 {
919   struct Neighbour *n = cls;
920
921   n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
922   process_plaintext_neighbour_queue (n);
923 }
924
925
926 /**
927  * Check if we have plaintext messages for the specified neighbour
928  * pending, and if so, consider batching and encrypting them (and
929  * then trigger processing of the encrypted queue if needed).
930  *
931  * @param n neighbour to check.
932  */
933 static void
934 process_plaintext_neighbour_queue (struct Neighbour *n)
935 {
936   char pbuf[GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE + sizeof (struct EncryptedMessage)];    /* plaintext */
937   size_t used;
938   struct EncryptedMessage *em;  /* encrypted message */
939   struct EncryptedMessage *ph;  /* plaintext header */
940   struct MessageEntry *me;
941   unsigned int priority;
942   struct GNUNET_TIME_Absolute deadline;
943   struct GNUNET_TIME_Relative retry_time;
944   struct GNUNET_CRYPTO_AesInitializationVector iv;
945   struct GNUNET_CRYPTO_AuthKey auth_key;
946
947   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
948   {
949     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
950     n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
951   }
952   switch (n->status)
953   {
954   case PEER_STATE_DOWN:
955     send_key (n);
956 #if DEBUG_CORE
957     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
959                 GNUNET_i2s (&n->peer));
960 #endif
961     return;
962   case PEER_STATE_KEY_SENT:
963     if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
964       n->retry_set_key_task =
965           GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
966                                         &set_key_retry_task, n);
967 #if DEBUG_CORE
968     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
970                 GNUNET_i2s (&n->peer));
971 #endif
972     return;
973   case PEER_STATE_KEY_RECEIVED:
974     if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
975       n->retry_set_key_task =
976           GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
977                                         &set_key_retry_task, n);
978 #if DEBUG_CORE
979     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980                 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
981                 GNUNET_i2s (&n->peer));
982 #endif
983     return;
984   case PEER_STATE_KEY_CONFIRMED:
985     /* ready to continue */
986     break;
987   }
988   discard_expired_messages (n);
989   if (n->messages == NULL)
990   {
991 #if DEBUG_CORE
992     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
993                 "Plaintext message queue for `%4s' is empty.\n",
994                 GNUNET_i2s (&n->peer));
995 #endif
996     return;                     /* no pending messages */
997   }
998   if (n->encrypted_head != NULL)
999   {
1000 #if DEBUG_CORE > 2
1001     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002                 "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1003                 GNUNET_i2s (&n->peer));
1004 #endif
1005     return;                     /* wait for messages already encrypted to be
1006                                  * processed first! */
1007   }
1008   ph = (struct EncryptedMessage *) pbuf;
1009   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1010   priority = 0;
1011   used = sizeof (struct EncryptedMessage);
1012   used +=
1013       batch_message (n, &pbuf[used],
1014                      GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE, &deadline,
1015                      &retry_time, &priority);
1016   if (used == sizeof (struct EncryptedMessage))
1017   {
1018 #if DEBUG_CORE > 1
1019     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020                 "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1021                 GNUNET_i2s (&n->peer));
1022 #endif
1023     /* no messages selected for sending, try again later... */
1024     n->retry_plaintext_task =
1025         GNUNET_SCHEDULER_add_delayed (retry_time, &retry_plaintext_processing,
1026                                       n);
1027     return;
1028   }
1029   GSC_KX_encrypt_and_transmit (n->kx,
1030                                &pbuf[struct EncryptedMessage],
1031                                used - sizeof (struct EncryptedMessage));
1032   schedule_peer_messages (n);
1033 }
1034
1035
1036
1037
1038 /**
1039  * Check if we have encrypted messages for the specified neighbour
1040  * pending, and if so, check with the transport about sending them
1041  * out.
1042  *
1043  * @param n neighbour to check.
1044  */
1045 static void
1046 process_encrypted_neighbour_queue (struct Neighbour *n)
1047 {
1048   struct MessageEntry *m;
1049
1050   if (n->th != NULL)
1051     return;                     /* request already pending */
1052   if (GNUNET_YES != n->is_connected)
1053   {
1054     GNUNET_break (0);
1055     return;
1056   }
1057   m = n->encrypted_head;
1058   if (m == NULL)
1059   {
1060     /* encrypted queue empty, try plaintext instead */
1061     process_plaintext_neighbour_queue (n);
1062     return;
1063   }
1064 #if DEBUG_CORE > 1
1065   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1066               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1067               (unsigned int) m->size, GNUNET_i2s (&n->peer),
1068               (unsigned long long)
1069               GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value);
1070 #endif
1071   n->th =
1072        GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size,
1073                                               m->priority,
1074                                               GNUNET_TIME_absolute_get_remaining
1075                                               (m->deadline),
1076                                               &notify_encrypted_transmit_ready,
1077                                               n);
1078   if (n->th == NULL)
1079   {
1080     /* message request too large or duplicate request */
1081     GNUNET_break (0);
1082     /* discard encrypted message */
1083     GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
1084     GNUNET_free (m);
1085     process_encrypted_neighbour_queue (n);
1086   }
1087 }
1088
1089
1090 /**
1091  * Initialize a new 'struct Neighbour'.
1092  *
1093  * @param pid ID of the new neighbour
1094  * @return handle for the new neighbour
1095  */
1096 static struct Neighbour *
1097 create_neighbour (const struct GNUNET_PeerIdentity *pid)
1098 {
1099   struct Neighbour *n;
1100   struct GNUNET_TIME_Absolute now;
1101
1102 #if DEBUG_CORE
1103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1104               "Creating neighbour entry for peer `%4s'\n", GNUNET_i2s (pid));
1105 #endif
1106   n = GNUNET_malloc (sizeof (struct Neighbour));
1107   n->peer = *pid;
1108   n->last_activity = GNUNET_TIME_absolute_get ();
1109   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1110   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1111   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init (UINT32_MAX);
1112   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1113   n->ping_challenge =
1114       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1115   GNUNET_assert (GNUNET_OK ==
1116                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1117                                                     &n->peer.hashPubKey, n,
1118                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1119   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
1120                          GNUNET_CONTAINER_multihashmap_size (neighbours),
1121                          GNUNET_NO);
1122   neighbour_quota_update (n, NULL);
1123   consider_free_neighbour (n);
1124   return n;
1125 }
1126
1127
1128
1129 /**
1130  * We have a new client, notify it about all current sessions.
1131  *
1132  * @param client the new client
1133  */
1134 void
1135 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
1136 {
1137   /* notify new client about existing neighbours */
1138   GNUNET_CONTAINER_multihashmap_iterate (neighbours,
1139                                          &notify_client_about_neighbour, client);
1140 }
1141
1142
1143 /**
1144  * Queue a request from a client for transmission to a particular peer.
1145  *
1146  * @param car request to queue; this handle is then shared between
1147  *         the caller (CLIENTS subsystem) and SESSIONS and must not
1148  *         be released by either until either 'GNUNET_SESSIONS_dequeue',
1149  *         'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed'
1150  *         have been invoked on it
1151  */
1152 void
1153 GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
1154 {
1155   struct Neighbour *n; // FIXME: session...
1156
1157   n = find_neighbour (&car->peer);
1158   if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
1159       (n->status != PEER_STATE_KEY_CONFIRMED))
1160   {
1161     /* neighbour must have disconnected since request was issued,
1162      * ignore (client will realize it once it processes the
1163      * disconnect notification) */
1164 #if DEBUG_CORE_CLIENT
1165     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1166                 "Dropped client request for transmission (am disconnected)\n");
1167 #endif
1168     GNUNET_STATISTICS_update (stats,
1169                               gettext_noop
1170                               ("# send requests dropped (disconnected)"), 1,
1171                               GNUNET_NO);
1172     GSC_CLIENTS_reject_requests (car);
1173     return;
1174   }
1175 #if DEBUG_CORE_CLIENT
1176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177               "Received client transmission request. queueing\n");
1178 #endif
1179     GNUNET_CONTAINER_DLL_insert (n->active_client_request_head,
1180                                  n->active_client_request_tail, car);
1181
1182   // schedule_peer_messages (n);
1183 }
1184
1185
1186 /**
1187  * Dequeue a request from a client from transmission to a particular peer.
1188  *
1189  * @param car request to dequeue; this handle will then be 'owned' by
1190  *        the caller (CLIENTS sysbsystem)
1191  */
1192 void
1193 GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
1194 {
1195   struct Session *s;
1196
1197   s = find_session (&car->peer);
1198   GNUNET_CONTAINER_DLL_remove (s->active_client_request_head,
1199                                s->active_client_request_tail, car);
1200 }
1201
1202
1203
1204 /**
1205  * Transmit a message to a particular peer.
1206  *
1207  * @param car original request that was queued and then solicited;
1208  *            this handle will now be 'owned' by the SESSIONS subsystem
1209  * @param msg message to transmit
1210  */
1211 void
1212 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
1213                        const struct GNUNET_MessageHeader *msg)
1214 {
1215   struct MessageEntry *prev;
1216   struct MessageEntry *pos;
1217   struct MessageEntry *e;
1218   struct MessageEntry *min_prio_entry;
1219   struct MessageEntry *min_prio_prev;
1220   unsigned int min_prio;
1221   unsigned int queue_size;
1222
1223   n = find_neighbour (&sm->peer);
1224   if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
1225       (n->status != PEER_STATE_KEY_CONFIRMED))
1226   {
1227     /* attempt to send message to peer that is not connected anymore
1228      * (can happen due to asynchrony) */
1229     GNUNET_STATISTICS_update (stats,
1230                               gettext_noop
1231                               ("# messages discarded (disconnected)"), 1,
1232                               GNUNET_NO);
1233     if (client != NULL)
1234       GNUNET_SERVER_receive_done (client, GNUNET_OK);
1235     return;
1236   }
1237 #if DEBUG_CORE
1238   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1239               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1240               "SEND", (unsigned int) msize, GNUNET_i2s (&sm->peer));
1241 #endif
1242   discard_expired_messages (n);
1243   /* bound queue size */
1244   /* NOTE: this entire block to bound the queue size should be
1245    * obsolete with the new client-request code and the
1246    * 'schedule_peer_messages' mechanism; we still have this code in
1247    * here for now as a sanity check for the new mechanmism;
1248    * ultimately, we should probably simply reject SEND messages that
1249    * are not 'approved' (or provide a new core API for very unreliable
1250    * delivery that always sends with priority 0).  Food for thought. */
1251   min_prio = UINT32_MAX;
1252   min_prio_entry = NULL;
1253   min_prio_prev = NULL;
1254   queue_size = 0;
1255   prev = NULL;
1256   pos = n->messages;
1257   while (pos != NULL)
1258   {
1259     if (pos->priority <= min_prio)
1260     {
1261       min_prio_entry = pos;
1262       min_prio_prev = prev;
1263       min_prio = pos->priority;
1264     }
1265     queue_size++;
1266     prev = pos;
1267     pos = pos->next;
1268   }
1269   if (queue_size >= MAX_PEER_QUEUE_SIZE)
1270   {
1271     /* queue full */
1272     if (ntohl (sm->priority) <= min_prio)
1273     {
1274       /* discard new entry; this should no longer happen! */
1275       GNUNET_break (0);
1276 #if DEBUG_CORE
1277       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1278                   "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
1279                   queue_size, (unsigned int) MAX_PEER_QUEUE_SIZE,
1280                   (unsigned int) msize, (unsigned int) ntohs (message->type));
1281 #endif
1282       GNUNET_STATISTICS_update (stats,
1283                                 gettext_noop ("# discarded CORE_SEND requests"),
1284                                 1, GNUNET_NO);
1285
1286       if (client != NULL)
1287         GNUNET_SERVER_receive_done (client, GNUNET_OK);
1288       return;
1289     }
1290     GNUNET_assert (min_prio_entry != NULL);
1291     /* discard "min_prio_entry" */
1292 #if DEBUG_CORE
1293     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1294                 "Queue full, discarding existing older request\n");
1295 #endif
1296     GNUNET_STATISTICS_update (stats,
1297                               gettext_noop
1298                               ("# discarded lower priority CORE_SEND requests"),
1299                               1, GNUNET_NO);
1300     if (min_prio_prev == NULL)
1301       n->messages = min_prio_entry->next;
1302     else
1303       min_prio_prev->next = min_prio_entry->next;
1304     GNUNET_free (min_prio_entry);
1305   }
1306
1307 #if DEBUG_CORE
1308   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1309               "Adding transmission request for `%4s' of size %u to queue\n",
1310               GNUNET_i2s (&sm->peer), (unsigned int) msize);
1311 #endif
1312   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1313   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1314   e->priority = ntohl (sm->priority);
1315   e->size = msize;
1316   if (GNUNET_YES != (int) ntohl (sm->cork))
1317     e->got_slack = GNUNET_YES;
1318   memcpy (&e[1], &sm[1], msize);
1319
1320   /* insert, keep list sorted by deadline */
1321   prev = NULL;
1322   pos = n->messages;
1323   while ((pos != NULL) && (pos->deadline.abs_value < e->deadline.abs_value))
1324   {
1325     prev = pos;
1326     pos = pos->next;
1327   }
1328   if (prev == NULL)
1329     n->messages = e;
1330   else
1331     prev->next = e;
1332   e->next = pos;
1333
1334   /* consider scheduling now */
1335   process_plaintext_neighbour_queue (n);
1336
1337 }
1338
1339
1340
1341 /**
1342  * Send a message to the neighbour.
1343  *
1344  * @param cls the message
1345  * @param key neighbour's identity
1346  * @param value 'struct Neighbour' of the target
1347  * @return always GNUNET_OK
1348  */
1349 static int
1350 do_send_message (void *cls, const GNUNET_HashCode * key, void *value)
1351 {
1352   struct GNUNET_MessageHeader *hdr = cls;
1353   struct Neighbour *n = value;
1354   struct MessageEntry *m;
1355   uint16_t size;
1356
1357   size = ntohs (hdr->size);
1358   m = GNUNET_malloc (sizeof (struct MessageEntry) + size);
1359   memcpy (&m[1], hdr, size);
1360   m->deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1361   m->slack_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1362   m->priority = UINT_MAX;
1363   m->sender_status = n->status;
1364   m->size = size;
1365   GNUNET_CONTAINER_DLL_insert (n->message_head,
1366                                n->message_tail,
1367                                m);
1368   return GNUNET_OK;
1369 }
1370
1371
1372 /**
1373  * Broadcast a message to all neighbours.
1374  *
1375  * @param msg message to transmit
1376  */
1377 void
1378 GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg)
1379 {
1380   if (NULL == sessions)
1381     return;
1382   GNUNET_CONTAINER_multihashmap_iterate (sessions,
1383                                          &do_send_message, msg);
1384 }
1385
1386
1387 /**
1388  * Helper function for GSC_SESSIONS_handle_client_iterate_peers.
1389  *
1390  * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies
1391  * @param key identity of the connected peer
1392  * @param value the 'struct Neighbour' for the peer
1393  * @return GNUNET_OK (continue to iterate)
1394  */
1395 static int
1396 queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value)
1397 {
1398   struct GNUNET_SERVER_TransmitContext *tc = cls;
1399   struct Neighbour *n = value;
1400   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
1401   struct GNUNET_TRANSPORT_ATS_Information *ats;
1402   size_t size;
1403   struct ConnectNotifyMessage *cnm;
1404
1405   cnm = (struct ConnectNotifyMessage *) buf;
1406   if (n->status != PEER_STATE_KEY_CONFIRMED)
1407     return GNUNET_OK;
1408   size =
1409       sizeof (struct ConnectNotifyMessage) +
1410       (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1411   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1412   {
1413     GNUNET_break (0);
1414     /* recovery strategy: throw away performance data */
1415     GNUNET_array_grow (n->ats, n->ats_count, 0);
1416     size =
1417         sizeof (struct PeerStatusNotifyMessage) +
1418         n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1419   }
1420   cnm = (struct ConnectNotifyMessage *) buf;
1421   cnm->header.size = htons (size);
1422   cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
1423   cnm->ats_count = htonl (n->ats_count);
1424   ats = &cnm->ats;
1425   memcpy (ats, n->ats,
1426           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
1427   ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
1428   ats[n->ats_count].value = htonl (0);
1429 #if DEBUG_CORE_CLIENT
1430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
1431               "NOTIFY_CONNECT");
1432 #endif
1433   cnm->peer = n->peer;
1434   GNUNET_SERVER_transmit_context_append_message (tc, &cnm->header);
1435   return GNUNET_OK;
1436 }
1437
1438
1439 /**
1440  * End the session with the given peer (we are no longer
1441  * connected). 
1442  *
1443  * @param pid identity of peer to kill session with
1444  */
1445 void
1446 GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
1447 {
1448 }
1449
1450
1451 /**
1452  * Traffic is being solicited for the given peer.  This means that the
1453  * message queue on the transport-level (NEIGHBOURS subsystem) is now
1454  * empty and it is now OK to transmit another (non-control) message.
1455  *
1456  * @param pid identity of peer ready to receive data
1457  */
1458 void
1459 GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
1460 {
1461 }
1462
1463
1464 /**
1465  * Transmit a message to a particular peer.
1466  *
1467  * @param car original request that was queued and then solicited,
1468  *            ownership does not change (dequeue will be called soon).
1469  * @param msg message to transmit
1470  */
1471 void
1472 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
1473                        const struct GNUNET_MessageHeader *msg)
1474 {
1475 }
1476
1477
1478 /**
1479  * We have a new client, notify it about all current sessions.
1480  *
1481  * @param client the new client
1482  */
1483 void
1484 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
1485 {
1486 }
1487
1488
1489 /**
1490  * Handle CORE_ITERATE_PEERS request. For this request type, the client
1491  * does not have to have transmitted an INIT request.  All current peers
1492  * are returned, regardless of which message types they accept. 
1493  *
1494  * @param cls unused
1495  * @param client client sending the iteration request
1496  * @param message iteration request message
1497  */
1498 void
1499 GSC_SESSIONS_handle_client_iterate_peers (void *cls, struct GNUNET_SERVER_Client *client,
1500                                           const struct GNUNET_MessageHeader *message)
1501 {
1502   struct GNUNET_MessageHeader done_msg;
1503   struct GNUNET_SERVER_TransmitContext *tc;
1504
1505   tc = GNUNET_SERVER_transmit_context_create (client);
1506   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &queue_connect_message,
1507                                          tc);
1508   done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
1509   done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
1510   GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
1511   GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
1512 }
1513
1514
1515 /**
1516  * Handle CORE_PEER_CONNECTED request.   Notify client about connection
1517  * to the given neighbour.  For this request type, the client does not
1518  * have to have transmitted an INIT request.  All current peers are
1519  * returned, regardless of which message types they accept.
1520  *
1521  * @param cls unused
1522  * @param client client sending the iteration request
1523  * @param message iteration request message
1524  */
1525 void
1526 GSC_SESSIONS_handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *client,
1527                                       const struct GNUNET_MessageHeader *message)
1528 {
1529   struct GNUNET_MessageHeader done_msg;
1530   struct GNUNET_SERVER_TransmitContext *tc;
1531   const struct GNUNET_PeerIdentity *peer;
1532
1533   peer = (const struct GNUNET_PeerIdentity *) &message[1]; // YUCK!
1534   tc = GNUNET_SERVER_transmit_context_create (client);
1535   GNUNET_CONTAINER_multihashmap_get_multiple (neighbours, &peer->hashPubKey,
1536                                               &queue_connect_message, tc);
1537   done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
1538   done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
1539   GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
1540   GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
1541 }
1542
1543
1544
1545 /**
1546  * Handle REQUEST_INFO request. For this request type, the client must
1547  * have transmitted an INIT first.
1548  *
1549  * @param cls unused
1550  * @param client client sending the request
1551  * @param message iteration request message
1552  */
1553 void
1554 GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client *client,
1555                                          const struct GNUNET_MessageHeader *message)
1556 {
1557   const struct RequestInfoMessage *rcm;
1558   struct GSC_Client *pos;
1559   struct Neighbour *n;
1560   struct ConfigurationInfoMessage cim;
1561   int32_t want_reserv;
1562   int32_t got_reserv;
1563   unsigned long long old_preference;
1564   struct GNUNET_TIME_Relative rdelay;
1565
1566   rdelay = GNUNET_TIME_relative_get_zero ();
1567 #if DEBUG_CORE_CLIENT
1568   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service receives `%s' request.\n",
1569               "REQUEST_INFO");
1570 #endif
1571   rcm = (const struct RequestInfoMessage *) message;
1572   n = find_neighbour (&rcm->peer);
1573   memset (&cim, 0, sizeof (cim));
1574   if ((n != NULL) && (GNUNET_YES == n->is_connected))
1575   {
1576     want_reserv = ntohl (rcm->reserve_inbound);
1577     if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
1578     {
1579       n->bw_out_internal_limit = rcm->limit_outbound;
1580       if (n->bw_out.value__ !=
1581           GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
1582                                       n->bw_out_external_limit).value__)
1583       {
1584         n->bw_out =
1585             GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
1586                                         n->bw_out_external_limit);
1587         GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
1588                                                n->bw_out);
1589         GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
1590         handle_peer_status_change (n);
1591       }
1592     }
1593     if (want_reserv < 0)
1594     {
1595       got_reserv = want_reserv;
1596     }
1597     else if (want_reserv > 0)
1598     {
1599       rdelay =
1600           GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
1601                                               want_reserv);
1602       if (rdelay.rel_value == 0)
1603         got_reserv = want_reserv;
1604       else
1605         got_reserv = 0;         /* all or nothing */
1606     }
1607     else
1608       got_reserv = 0;
1609     GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window, got_reserv);
1610     old_preference = n->current_preference;
1611     n->current_preference += GNUNET_ntohll (rcm->preference_change);
1612     if (old_preference > n->current_preference)
1613     {
1614       /* overflow; cap at maximum value */
1615       n->current_preference = ULLONG_MAX;
1616     }
1617     update_preference_sum (n->current_preference - old_preference);
1618 #if DEBUG_CORE_QUOTA
1619     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1620                 "Received reservation request for %d bytes for peer `%4s', reserved %d bytes, suggesting delay of %llu ms\n",
1621                 (int) want_reserv, GNUNET_i2s (&rcm->peer), (int) got_reserv,
1622                 (unsigned long long) rdelay.rel_value);
1623 #endif
1624     cim.reserved_amount = htonl (got_reserv);
1625     cim.reserve_delay = GNUNET_TIME_relative_hton (rdelay);
1626     cim.bw_out = n->bw_out;
1627     cim.preference = n->current_preference;
1628   }
1629   else
1630   {
1631     /* Technically, this COULD happen (due to asynchronous behavior),
1632      * but it should be rare, so we should generate an info event
1633      * to help diagnosis of serious errors that might be masked by this */
1634     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1635                 _
1636                 ("Client asked for preference change with peer `%s', which is not connected!\n"),
1637                 GNUNET_i2s (&rcm->peer));
1638     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1639     return;
1640   }
1641   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
1642   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
1643   cim.peer = rcm->peer;
1644   cim.rim_id = rcm->rim_id;
1645 #if DEBUG_CORE_CLIENT
1646   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
1647               "CONFIGURATION_INFO");
1648 #endif
1649   GSC_CLIENTS_send_to_client (client, &cim.header, GNUNET_NO);
1650   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1651 }
1652
1653
1654 /**
1655  * Create a session, a key exchange was just completed.
1656  *
1657  * @param peer peer that is now connected
1658  * @param kx key exchange that completed
1659  */
1660 void
1661 GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
1662                      struct GSC_KeyExchangeInfo *kx)
1663 {
1664     {
1665       struct GNUNET_MessageHeader *hdr;
1666
1667       hdr = compute_type_map_message ();
1668       send_type_map_to_neighbour (hdr, &n->peer.hashPubKey, n);
1669       GNUNET_free (hdr);
1670     }
1671     if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
1672     {
1673       n->bw_out_external_limit = t.inbound_bw_limit;
1674       n->bw_out =
1675           GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
1676                                       n->bw_out_internal_limit);
1677       GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
1678                                              n->bw_out);
1679       GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
1680     }
1681 #if DEBUG_CORE
1682     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1683                 "Confirmed key via `%s' message for peer `%4s'\n", "PONG",
1684                 GNUNET_i2s (&n->peer));
1685 #endif
1686
1687
1688     size =
1689         sizeof (struct ConnectNotifyMessage) +
1690         (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1691     if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1692     {
1693       GNUNET_break (0);
1694       /* recovery strategy: throw away performance data */
1695       GNUNET_array_grow (n->ats, n->ats_count, 0);
1696       size =
1697           sizeof (struct PeerStatusNotifyMessage) +
1698           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
1699     }
1700     cnm = (struct ConnectNotifyMessage *) buf;
1701     cnm->header.size = htons (size);
1702     cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
1703     cnm->ats_count = htonl (n->ats_count);
1704     cnm->peer = n->peer;
1705     mats = &cnm->ats;
1706     memcpy (mats, n->ats,
1707             n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
1708     mats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
1709     mats[n->ats_count].value = htonl (0);
1710     send_to_all_clients (&cnm->header, GNUNET_NO,
1711                          GNUNET_CORE_OPTION_SEND_CONNECT);
1712     process_encrypted_neighbour_queue (n);
1713     n->last_activity = GNUNET_TIME_absolute_get ();
1714
1715   if (n->status == PEER_STATE_KEY_CONFIRMED)
1716   {
1717     now = GNUNET_TIME_absolute_get ();
1718     n->last_activity = now;
1719     changed = GNUNET_YES;
1720     if (!up)
1721     {
1722       GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
1723                                 1, GNUNET_NO);
1724       n->time_established = now;
1725     }
1726     if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
1727       GNUNET_SCHEDULER_cancel (n->keep_alive_task);
1728     n->keep_alive_task =
1729         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide
1730                                       (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1731                                        2), &send_keep_alive, n);
1732   }
1733
1734
1735 }
1736
1737
1738 /**
1739  * Update information about a session.
1740  *
1741  * @param peer peer who's session should be updated
1742  * @param bw_out new outbound bandwidth limit for the peer
1743  * @param atsi performance information
1744  * @param atsi_count number of performance records supplied
1745  */
1746 void
1747 GSC_SESSIONS_update (const struct GNUNET_PeerIdentity *peer,
1748                      struct GNUNET_BANDWIDTH_Value32NBO bw_out,
1749                      const struct GNUNET_TRANSPORT_ATS_Information *atsi,
1750                      uint32_t atsi_count)
1751 {
1752   if (bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
1753   {
1754 #if DEBUG_CORE_SET_QUOTA
1755     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1756                 "Received %u b/s as new inbound limit for peer `%4s'\n",
1757                 (unsigned int) ntohl (pt->inbound_bw_limit.value__),
1758                 GNUNET_i2s (&n->peer));
1759 #endif
1760     n->bw_out_external_limit = pt->inbound_bw_limit;
1761     n->bw_out =
1762         GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
1763                                     n->bw_out_internal_limit);
1764     GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
1765                                            n->bw_out);
1766     GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
1767   }
1768
1769 }
1770
1771
1772 /**
1773  * Initialize sessions subsystem.
1774  */
1775 int
1776 GSC_SESSIONS_init ()
1777 {
1778   neighbours = GNUNET_CONTAINER_multihashmap_create (128);
1779   self.public_key = &my_public_key;
1780   self.peer = my_identity;
1781   self.last_activity = GNUNET_TIME_UNIT_FOREVER_ABS;
1782   self.status = PEER_STATE_KEY_CONFIRMED;
1783   self.is_connected = GNUNET_YES;
1784   return GNUNET_OK;
1785 }
1786
1787
1788 /**
1789  * Shutdown sessions subsystem.
1790  */
1791 void
1792 GSC_SESSIONS_done ()
1793 {
1794   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
1795                                          NULL);
1796   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1797   neighbours = NULL;
1798   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
1799                          0, GNUNET_NO);
1800 }