2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file core/gnunet-service-core_neighbours.c
23 * @brief code for managing 'encrypted' sessions (key exchange done)
24 * @author Christian Grothoff
27 #include "gnunet_service_core.h"
28 #include "gnunet_service_core_neighbours.h"
29 #include "gnunet_service_core_kx.h"
30 #include "gnunet_service_core_sessions.h"
33 * Record kept for each request for transmission issued by a
34 * client that is still pending.
36 struct ClientActiveRequest;
39 * Data kept per session.
44 * Identity of the other peer.
46 struct GNUNET_PeerIdentity peer;
49 * Head of list of requests from clients for transmission to
52 struct ClientActiveRequest *active_client_request_head;
55 * Tail of list of requests from clients for transmission to
58 struct ClientActiveRequest *active_client_request_tail;
61 * Performance data for the peer.
63 struct GNUNET_TRANSPORT_ATS_Information *ats;
66 * Information about the key exchange with the other peer.
68 struct GSC_KeyExchangeInfo *kxinfo;
72 * ID of task used for cleaning up dead neighbour entries.
74 GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
77 * ID of task used for updating bandwidth quota for this neighbour.
79 GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
82 * At what time did we initially establish (as in, complete session
83 * key handshake) this connection? Should be zero if status != KEY_CONFIRMED.
85 struct GNUNET_TIME_Absolute time_established;
88 * At what time did we last receive an encrypted message from the
89 * other peer? Should be zero if status != KEY_CONFIRMED.
91 struct GNUNET_TIME_Absolute last_activity;
94 * How valuable were the messages of this peer recently?
96 unsigned long long current_preference;
99 * Number of entries in 'ats'.
101 unsigned int ats_count;
104 * Bit map indicating which of the 32 sequence numbers before the last
105 * were received (good for accepting out-of-order packets and
106 * estimating reliability of the connection)
108 unsigned int last_packets_bitmap;
111 * last sequence number received on this connection (highest)
113 uint32_t last_sequence_number_received;
116 * last sequence number transmitted
118 uint32_t last_sequence_number_sent;
121 * Available bandwidth in for this peer (current target).
123 struct GNUNET_BANDWIDTH_Value32NBO bw_in;
126 * Available bandwidth out for this peer (current target).
128 struct GNUNET_BANDWIDTH_Value32NBO bw_out;
131 * Internal bandwidth limit set for this peer (initially typically
132 * set to "-1"). Actual "bw_out" is MIN of
133 * "bw_out_internal_limit" and "bw_out_external_limit".
135 struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
138 * External bandwidth limit set for this peer by the
139 * peer that we are communicating with. "bw_out" is MIN of
140 * "bw_out_internal_limit" and "bw_out_external_limit".
142 struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
148 * Map of peer identities to 'struct Session'.
150 static struct GNUNET_CONTAINER_MultiHashMap *sessions;
154 * Session entry for "this" peer.
156 static struct Session self;
159 * Sum of all preferences among all neighbours.
161 static unsigned long long preference_sum;
167 * At what time should the connection to the given neighbour
168 * time out (given no further activity?)
170 * @param n neighbour in question
171 * @return absolute timeout
173 static struct GNUNET_TIME_Absolute
174 get_neighbour_timeout (struct Neighbour *n)
176 return GNUNET_TIME_absolute_add (n->last_activity,
177 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
182 * Helper function for update_preference_sum.
185 update_preference (void *cls, const GNUNET_HashCode * key, void *value)
187 unsigned long long *ps = cls;
188 struct Neighbour *n = value;
190 n->current_preference /= 2;
191 *ps += n->current_preference;
197 * A preference value for a neighbour was updated. Update
198 * the preference sum accordingly.
200 * @param inc how much was a preference value increased?
203 update_preference_sum (unsigned long long inc)
205 unsigned long long os;
208 preference_sum += inc;
209 if (preference_sum >= os)
211 /* overflow! compensate by cutting all values in half! */
213 GNUNET_CONTAINER_multihashmap_iterate (neighbours, &update_preference,
215 GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"),
216 preference_sum, GNUNET_NO);
221 * Find the entry for the given neighbour.
223 * @param peer identity of the neighbour
224 * @return NULL if we are not connected, otherwise the
227 static struct Neighbour *
228 find_neighbour (const struct GNUNET_PeerIdentity *peer)
230 return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
235 * Function called by transport telling us that a peer
238 * @param n the peer that changed status
241 handle_peer_status_change (struct Neighbour *n)
243 struct PeerStatusNotifyMessage *psnm;
244 char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
245 struct GNUNET_TRANSPORT_ATS_Information *ats;
248 if ((!n->is_connected) || (n->status != PEER_STATE_KEY_CONFIRMED))
251 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%4s' changed status\n",
252 GNUNET_i2s (&n->peer));
255 sizeof (struct PeerStatusNotifyMessage) +
256 n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
257 if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
260 /* recovery strategy: throw away performance data */
261 GNUNET_array_grow (n->ats, n->ats_count, 0);
263 sizeof (struct PeerStatusNotifyMessage) +
264 n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
266 psnm = (struct PeerStatusNotifyMessage *) buf;
267 psnm->header.size = htons (size);
268 psnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE);
269 psnm->timeout = GNUNET_TIME_absolute_hton (get_neighbour_timeout (n));
270 psnm->bandwidth_in = n->bw_in;
271 psnm->bandwidth_out = n->bw_out;
272 psnm->peer = n->peer;
273 psnm->ats_count = htonl (n->ats_count);
276 n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
277 ats[n->ats_count].type = htonl (0);
278 ats[n->ats_count].value = htonl (0);
279 send_to_all_clients (&psnm->header, GNUNET_YES,
280 GNUNET_CORE_OPTION_SEND_STATUS_CHANGE);
281 GNUNET_STATISTICS_update (stats, gettext_noop ("# peer status changes"), 1,
288 * Go over our message queue and if it is not too long, go
289 * over the pending requests from clients for this
290 * neighbour and send some clients a 'READY' notification.
292 * @param n which peer to process
295 schedule_peer_messages (struct Neighbour *n)
297 struct ClientActiveRequest *car;
298 struct ClientActiveRequest *pos;
300 struct MessageEntry *mqe;
301 unsigned int queue_size;
303 /* check if neighbour queue is empty enough! */
313 if (queue_size >= MAX_PEER_QUEUE_SIZE)
315 #if DEBUG_CORE_CLIENT
316 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
317 "Not considering client transmission requests: queue full\n");
319 return; /* queue still full */
321 /* find highest priority request */
322 pos = n->active_client_request_head;
326 if ((car == NULL) || (pos->priority > car->priority))
333 car = n->active_client_request_head;
336 return; /* no pending requests */
337 #if DEBUG_CORE_CLIENT
338 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
339 "Permitting client transmission request to `%s'\n",
340 GNUNET_i2s (&n->peer));
342 GSC_CLIENTS_solicite_request (car);
348 * Free the given entry for the neighbour (it has
349 * already been removed from the list at this point).
351 * @param n neighbour to free
354 free_neighbour (struct Neighbour *n)
356 struct MessageEntry *m;
357 struct ClientActiveRequest *car;
360 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
361 "Destroying neighbour entry for peer `%4s'\n",
362 GNUNET_i2s (&n->peer));
366 GNUNET_free (n->skm);
369 while (NULL != (m = n->messages))
371 n->messages = m->next;
374 while (NULL != (m = n->encrypted_head))
376 GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
379 while (NULL != (car = n->active_client_request_head))
381 GNUNET_CONTAINER_DLL_remove (n->active_client_request_head,
382 n->active_client_request_tail, car);
383 GNUNET_assert (GNUNET_YES ==
384 GNUNET_CONTAINER_multihashmap_remove (car->client->requests,
391 GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
394 if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
395 GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
396 if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
397 GNUNET_SCHEDULER_cancel (n->quota_update_task);
398 if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
399 GNUNET_SCHEDULER_cancel (n->keep_alive_task);
400 if (n->status == PEER_STATE_KEY_CONFIRMED)
401 GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
403 GNUNET_array_grow (n->ats, n->ats_count, 0);
404 GNUNET_free_non_null (n->pending_ping);
405 GNUNET_free_non_null (n->pending_pong);
412 * Consider freeing the given neighbour since we may not need
413 * to keep it around anymore.
415 * @param n neighbour to consider discarding
418 consider_free_neighbour (struct Neighbour *n);
422 * Task triggered when a neighbour entry might have gotten stale.
424 * @param cls the 'struct Neighbour'
425 * @param tc scheduler context (not used)
428 consider_free_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
430 struct Neighbour *n = cls;
432 n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
433 consider_free_neighbour (n);
438 * Consider freeing the given neighbour since we may not need
439 * to keep it around anymore.
441 * @param n neighbour to consider discarding
444 consider_free_neighbour (struct Neighbour *n)
446 struct GNUNET_TIME_Relative left;
448 if ((n->th != NULL) || (n->pitr != NULL) || (GNUNET_YES == n->is_connected))
449 return; /* no chance */
451 left = GNUNET_TIME_absolute_get_remaining (get_neighbour_timeout (n));
452 if (left.rel_value > 0)
454 if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
455 GNUNET_SCHEDULER_cancel (n->dead_clean_task);
457 GNUNET_SCHEDULER_add_delayed (left, &consider_free_task, n);
460 /* actually free the neighbour... */
461 GNUNET_assert (GNUNET_YES ==
462 GNUNET_CONTAINER_multihashmap_remove (neighbours,
463 &n->peer.hashPubKey, n));
464 GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
465 GNUNET_CONTAINER_multihashmap_size (neighbours),
472 * Function called when the transport service is ready to
473 * receive an encrypted message for the respective peer
475 * @param cls neighbour to use message from
476 * @param size number of bytes we can transmit
477 * @param buf where to copy the message
478 * @return number of bytes transmitted
481 notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
483 struct Neighbour *n = cls;
484 struct MessageEntry *m;
489 m = n->encrypted_head;
493 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
494 "Encrypted message queue empty, no messages added to buffer for `%4s'\n",
495 GNUNET_i2s (&n->peer));
499 GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
504 GNUNET_assert (size >= m->size);
505 memcpy (cbuf, &m[1], m->size);
507 GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, m->size);
509 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510 "Copied message of type %u and size %u into transport buffer for `%4s'\n",
512 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
513 (unsigned int) ret, GNUNET_i2s (&n->peer));
515 process_encrypted_neighbour_queue (n);
520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521 "Transmission of message of type %u and size %u failed\n",
523 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
524 (unsigned int) m->size);
528 consider_free_neighbour (n);
529 GNUNET_STATISTICS_update (stats,
531 ("# encrypted bytes given to transport"), ret,
541 * Select messages for transmission. This heuristic uses a combination
542 * of earliest deadline first (EDF) scheduling (with bounded horizon)
543 * and priority-based discard (in case no feasible schedule exists) and
544 * speculative optimization (defer any kind of transmission until
545 * we either create a batch of significant size, 25% of max, or until
546 * we are close to a deadline). Furthermore, when scheduling the
547 * heuristic also packs as many messages into the batch as possible,
548 * starting with those with the earliest deadline. Yes, this is fun.
550 * @param n neighbour to select messages from
551 * @param size number of bytes to select for transmission
552 * @param retry_time set to the time when we should try again
553 * (only valid if this function returns zero)
554 * @return number of bytes selected, or 0 if we decided to
555 * defer scheduling overall; in that case, retry_time is set.
558 select_messages (struct Neighbour *n, size_t size,
559 struct GNUNET_TIME_Relative *retry_time)
561 struct MessageEntry *pos;
562 struct MessageEntry *min;
563 struct MessageEntry *last;
564 unsigned int min_prio;
565 struct GNUNET_TIME_Absolute t;
566 struct GNUNET_TIME_Absolute now;
567 struct GNUNET_TIME_Relative delta;
569 struct GNUNET_TIME_Relative slack; /* how long could we wait before missing deadlines? */
572 unsigned int queue_size;
573 int discard_low_prio;
575 GNUNET_assert (NULL != n->messages);
576 now = GNUNET_TIME_absolute_get ();
577 /* last entry in linked list of messages processed */
579 /* should we remove the entry with the lowest
580 * priority from consideration for scheduling at the
581 * end of the loop? */
591 discard_low_prio = GNUNET_YES;
592 while (GNUNET_YES == discard_low_prio)
596 discard_low_prio = GNUNET_NO;
597 /* calculate number of bytes available for transmission at time "t" */
598 avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
600 /* how many bytes have we (hypothetically) scheduled so far */
602 /* maximum time we can wait before transmitting anything
603 * and still make all of our deadlines */
604 slack = GNUNET_TIME_UNIT_FOREVER_REL;
606 /* note that we use "*2" here because we want to look
607 * a bit further into the future; much more makes no
608 * sense since new message might be scheduled in the
610 while ((pos != NULL) && (off < size * 2))
612 if (pos->do_transmit == GNUNET_YES)
614 /* already removed from consideration */
618 if (discard_low_prio == GNUNET_NO)
620 delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
621 if (delta.rel_value > 0)
623 // FIXME: HUH? Check!
626 GNUNET_BANDWIDTH_value_get_available_until (n->bw_out, delta);
628 if (avail < pos->size)
630 // FIXME: HUH? Check!
631 discard_low_prio = GNUNET_YES; /* we could not schedule this one! */
636 /* update slack, considering both its absolute deadline
637 * and relative deadlines caused by other messages
638 * with their respective load */
640 GNUNET_TIME_relative_min (slack,
641 GNUNET_BANDWIDTH_value_get_delay_for
643 if (pos->deadline.abs_value <= now.abs_value)
646 slack = GNUNET_TIME_UNIT_ZERO;
648 else if (GNUNET_YES == pos->got_slack)
650 /* should be soon now! */
652 GNUNET_TIME_relative_min (slack,
653 GNUNET_TIME_absolute_get_remaining
654 (pos->slack_deadline));
659 GNUNET_TIME_relative_min (slack,
660 GNUNET_TIME_absolute_get_difference
661 (now, pos->deadline));
662 pos->got_slack = GNUNET_YES;
663 pos->slack_deadline =
664 GNUNET_TIME_absolute_min (pos->deadline,
665 GNUNET_TIME_relative_to_absolute
666 (GNUNET_CONSTANTS_MAX_CORK_DELAY));
671 t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check!
672 if (pos->priority <= min_prio)
674 /* update min for discard */
675 min_prio = pos->priority;
680 if (discard_low_prio)
682 GNUNET_assert (min != NULL);
683 /* remove lowest-priority entry from consideration */
684 min->do_transmit = GNUNET_YES; /* means: discard (for now) */
688 /* guard against sending "tiny" messages with large headers without
689 * urgent deadlines */
690 if ((slack.rel_value > GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value) &&
691 (size > 4 * off) && (queue_size <= MAX_PEER_QUEUE_SIZE - 2))
693 /* less than 25% of message would be filled with deadlines still
694 * being met if we delay by one second or more; so just wait for
695 * more data; but do not wait longer than 1s (since we don't want
696 * to delay messages for a really long time either). */
697 *retry_time = GNUNET_CONSTANTS_MAX_CORK_DELAY;
698 /* reset do_transmit values for next time */
701 pos->do_transmit = GNUNET_NO;
704 GNUNET_STATISTICS_update (stats,
706 ("# transmissions delayed due to corking"), 1,
709 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710 "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
711 (unsigned long long) retry_time->rel_value, (unsigned int) off,
712 (unsigned int) size);
716 /* select marked messages (up to size) for transmission */
721 if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
723 pos->do_transmit = GNUNET_YES; /* mark for transmission */
727 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728 "Selecting message of size %u for transmission\n",
729 (unsigned int) pos->size);
735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736 "Not selecting message of size %u for transmission at this time (maximum is %u)\n",
737 (unsigned int) pos->size, size);
739 pos->do_transmit = GNUNET_NO; /* mark for not transmitting! */
744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745 "Selected %llu/%llu bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
746 (unsigned long long) off, (unsigned long long) tsize, queue_size,
747 (unsigned int) MAX_PEER_QUEUE_SIZE, GNUNET_i2s (&n->peer));
754 * Batch multiple messages into a larger buffer.
756 * @param n neighbour to take messages from
757 * @param buf target buffer
758 * @param size size of buf
759 * @param deadline set to transmission deadline for the result
760 * @param retry_time set to the time when we should try again
761 * (only valid if this function returns zero)
762 * @param priority set to the priority of the batch
763 * @return number of bytes written to buf (can be zero)
766 batch_message (struct Neighbour *n, char *buf, size_t size,
767 struct GNUNET_TIME_Absolute *deadline,
768 struct GNUNET_TIME_Relative *retry_time, unsigned int *priority)
770 char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
771 struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage *) ntmb;
772 struct MessageEntry *pos;
773 struct MessageEntry *prev;
774 struct MessageEntry *next;
779 *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
780 *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
781 if (0 == select_messages (n, size, retry_time))
784 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785 "No messages selected, will try again in %llu ms\n",
786 retry_time->rel_value);
790 ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
791 ntm->ats_count = htonl (0);
792 ntm->ats.type = htonl (0);
793 ntm->ats.value = htonl (0);
797 while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
800 if (GNUNET_YES == pos->do_transmit)
802 GNUNET_assert (pos->size <= size);
803 /* do notifications */
804 /* FIXME: track if we have *any* client that wants
805 * full notifications and only do this if that is
808 GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
810 memcpy (&ntm[1], &pos[1], pos->size);
812 htons (sizeof (struct NotifyTrafficMessage) +
813 sizeof (struct GNUNET_MessageHeader));
814 send_to_all_clients (&ntm->header, GNUNET_YES,
815 GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
819 /* message too large for 'full' notifications, we do at
820 * least the 'hdr' type */
821 memcpy (&ntm[1], &pos[1], sizeof (struct GNUNET_MessageHeader));
824 htons (sizeof (struct NotifyTrafficMessage) + pos->size);
825 send_to_all_clients (&ntm->header, GNUNET_YES,
826 GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);
828 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829 "Encrypting %u bytes with message of type %u and size %u\n",
832 ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type),
834 ntohs (((const struct GNUNET_MessageHeader *)
837 /* copy for encrypted transmission */
838 memcpy (&buf[ret], &pos[1], pos->size);
841 *priority += pos->priority;
843 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844 "Adding plaintext message of size %u with deadline %llu ms to batch\n",
845 (unsigned int) pos->size,
847 GNUNET_TIME_absolute_get_remaining (pos->deadline).rel_value);
849 deadline->abs_value =
850 GNUNET_MIN (deadline->abs_value, pos->deadline.abs_value);
864 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
865 "Deadline for message batch is %llu ms\n",
866 GNUNET_TIME_absolute_get_remaining (*deadline).rel_value);
873 * Remove messages with deadlines that have long expired from
876 * @param n neighbour to inspect
879 discard_expired_messages (struct Neighbour *n)
881 struct MessageEntry *prev;
882 struct MessageEntry *next;
883 struct MessageEntry *pos;
884 struct GNUNET_TIME_Absolute now;
885 struct GNUNET_TIME_Relative delta;
887 unsigned int queue_length;
890 now = GNUNET_TIME_absolute_get ();
898 delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
899 if (delta.rel_value > PAST_EXPIRATION_DISCARD_TIME.rel_value)
902 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
903 "Message is %llu ms past due, discarding.\n",
910 GNUNET_STATISTICS_update (stats,
912 ("# messages discarded (expired prior to transmission)"),
921 if ( (GNUNET_YES == disc) &&
922 (queue_length == MAX_PEER_QUEUE_SIZE) )
923 schedule_peer_messages (n);
928 * Signature of the main function of a task.
931 * @param tc context information (why was this task triggered now)
934 retry_plaintext_processing (void *cls,
935 const struct GNUNET_SCHEDULER_TaskContext *tc)
937 struct Neighbour *n = cls;
939 n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
940 process_plaintext_neighbour_queue (n);
945 * Check if we have plaintext messages for the specified neighbour
946 * pending, and if so, consider batching and encrypting them (and
947 * then trigger processing of the encrypted queue if needed).
949 * @param n neighbour to check.
952 process_plaintext_neighbour_queue (struct Neighbour *n)
954 char pbuf[GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE + sizeof (struct EncryptedMessage)]; /* plaintext */
956 struct EncryptedMessage *em; /* encrypted message */
957 struct EncryptedMessage *ph; /* plaintext header */
958 struct MessageEntry *me;
959 unsigned int priority;
960 struct GNUNET_TIME_Absolute deadline;
961 struct GNUNET_TIME_Relative retry_time;
962 struct GNUNET_CRYPTO_AesInitializationVector iv;
963 struct GNUNET_CRYPTO_AuthKey auth_key;
965 if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
967 GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
968 n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
972 case PEER_STATE_DOWN:
975 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
976 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
977 GNUNET_i2s (&n->peer));
980 case PEER_STATE_KEY_SENT:
981 if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
982 n->retry_set_key_task =
983 GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
984 &set_key_retry_task, n);
986 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
988 GNUNET_i2s (&n->peer));
991 case PEER_STATE_KEY_RECEIVED:
992 if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
993 n->retry_set_key_task =
994 GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
995 &set_key_retry_task, n);
997 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998 "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
999 GNUNET_i2s (&n->peer));
1002 case PEER_STATE_KEY_CONFIRMED:
1003 /* ready to continue */
1006 discard_expired_messages (n);
1007 if (n->messages == NULL)
1010 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1011 "Plaintext message queue for `%4s' is empty.\n",
1012 GNUNET_i2s (&n->peer));
1014 return; /* no pending messages */
1016 if (n->encrypted_head != NULL)
1019 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020 "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
1021 GNUNET_i2s (&n->peer));
1023 return; /* wait for messages already encrypted to be
1024 * processed first! */
1026 ph = (struct EncryptedMessage *) pbuf;
1027 deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
1029 used = sizeof (struct EncryptedMessage);
1031 batch_message (n, &pbuf[used],
1032 GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE, &deadline,
1033 &retry_time, &priority);
1034 if (used == sizeof (struct EncryptedMessage))
1037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038 "No messages selected for transmission to `%4s' at this time, will try again later.\n",
1039 GNUNET_i2s (&n->peer));
1041 /* no messages selected for sending, try again later... */
1042 n->retry_plaintext_task =
1043 GNUNET_SCHEDULER_add_delayed (retry_time, &retry_plaintext_processing,
1047 GSC_KX_encrypt_and_transmit (n->kx,
1048 &pbuf[struct EncryptedMessage],
1049 used - sizeof (struct EncryptedMessage));
1050 schedule_peer_messages (n);
1057 * Check if we have encrypted messages for the specified neighbour
1058 * pending, and if so, check with the transport about sending them
1061 * @param n neighbour to check.
1064 process_encrypted_neighbour_queue (struct Neighbour *n)
1066 struct MessageEntry *m;
1069 return; /* request already pending */
1070 if (GNUNET_YES != n->is_connected)
1075 m = n->encrypted_head;
1078 /* encrypted queue empty, try plaintext instead */
1079 process_plaintext_neighbour_queue (n);
1083 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1084 "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
1085 (unsigned int) m->size, GNUNET_i2s (&n->peer),
1086 (unsigned long long)
1087 GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value);
1090 GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size,
1092 GNUNET_TIME_absolute_get_remaining
1094 ¬ify_encrypted_transmit_ready,
1098 /* message request too large or duplicate request */
1100 /* discard encrypted message */
1101 GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
1103 process_encrypted_neighbour_queue (n);
1109 * Initialize a new 'struct Neighbour'.
1111 * @param pid ID of the new neighbour
1112 * @return handle for the new neighbour
1114 static struct Neighbour *
1115 create_neighbour (const struct GNUNET_PeerIdentity *pid)
1117 struct Neighbour *n;
1118 struct GNUNET_TIME_Absolute now;
1121 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1122 "Creating neighbour entry for peer `%4s'\n", GNUNET_i2s (pid));
1124 n = GNUNET_malloc (sizeof (struct Neighbour));
1126 GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
1127 now = GNUNET_TIME_absolute_get ();
1128 n->encrypt_key_created = now;
1129 n->last_activity = now;
1130 n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
1131 n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1132 n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1133 n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init (UINT32_MAX);
1134 n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
1136 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1137 GNUNET_assert (GNUNET_OK ==
1138 GNUNET_CONTAINER_multihashmap_put (neighbours,
1139 &n->peer.hashPubKey, n,
1140 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1141 GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
1142 GNUNET_CONTAINER_multihashmap_size (neighbours),
1144 neighbour_quota_update (n, NULL);
1145 consider_free_neighbour (n);
1152 * We have a new client, notify it about all current sessions.
1154 * @param client the new client
1157 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
1159 /* notify new client about existing neighbours */
1160 GNUNET_CONTAINER_multihashmap_iterate (neighbours,
1161 ¬ify_client_about_neighbour, client);
1166 * Queue a request from a client for transmission to a particular peer.
1168 * @param car request to queue; this handle is then shared between
1169 * the caller (CLIENTS subsystem) and SESSIONS and must not
1170 * be released by either until either 'GNUNET_SESSIONS_dequeue',
1171 * 'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed'
1172 * have been invoked on it
1175 GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
1177 struct Neighbour *n; // FIXME: session...
1179 n = find_neighbour (&car->peer);
1180 if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
1181 (n->status != PEER_STATE_KEY_CONFIRMED))
1183 /* neighbour must have disconnected since request was issued,
1184 * ignore (client will realize it once it processes the
1185 * disconnect notification) */
1186 #if DEBUG_CORE_CLIENT
1187 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1188 "Dropped client request for transmission (am disconnected)\n");
1190 GNUNET_STATISTICS_update (stats,
1192 ("# send requests dropped (disconnected)"), 1,
1194 GSC_CLIENTS_reject_requests (car);
1197 #if DEBUG_CORE_CLIENT
1198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1199 "Received client transmission request. queueing\n");
1201 GNUNET_CONTAINER_DLL_insert (n->active_client_request_head,
1202 n->active_client_request_tail, car);
1204 // schedule_peer_messages (n);
1209 * Dequeue a request from a client from transmission to a particular peer.
1211 * @param car request to dequeue; this handle will then be 'owned' by
1212 * the caller (CLIENTS subsystem)
1215 GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
1219 s = find_session (&car->peer);
1220 GNUNET_CONTAINER_DLL_remove (s->active_client_request_head,
1221 s->active_client_request_tail, car);
1227 * Transmit a message to a particular peer.
1229 * @param car original request that was queued and then solicited;
1230 * this handle will now be 'owned' by the SESSIONS subsystem
1231 * @param msg message to transmit
1234 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
1235 const struct GNUNET_MessageHeader *msg)
1237 struct MessageEntry *prev;
1238 struct MessageEntry *pos;
1239 struct MessageEntry *e;
1240 struct MessageEntry *min_prio_entry;
1241 struct MessageEntry *min_prio_prev;
1242 unsigned int min_prio;
1243 unsigned int queue_size;
1245 n = find_neighbour (&sm->peer);
1246 if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
1247 (n->status != PEER_STATE_KEY_CONFIRMED))
1249 /* attempt to send message to peer that is not connected anymore
1250 * (can happen due to asynchrony) */
1251 GNUNET_STATISTICS_update (stats,
1253 ("# messages discarded (disconnected)"), 1,
1256 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1261 "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
1262 "SEND", (unsigned int) msize, GNUNET_i2s (&sm->peer));
1264 discard_expired_messages (n);
1265 /* bound queue size */
1266 /* NOTE: this entire block to bound the queue size should be
1267 * obsolete with the new client-request code and the
1268 * 'schedule_peer_messages' mechanism; we still have this code in
1269 * here for now as a sanity check for the new mechanism;
1270 * ultimately, we should probably simply reject SEND messages that
1271 * are not 'approved' (or provide a new core API for very unreliable
1272 * delivery that always sends with priority 0). Food for thought. */
1273 min_prio = UINT32_MAX;
1274 min_prio_entry = NULL;
1275 min_prio_prev = NULL;
1281 if (pos->priority <= min_prio)
1283 min_prio_entry = pos;
1284 min_prio_prev = prev;
1285 min_prio = pos->priority;
1291 if (queue_size >= MAX_PEER_QUEUE_SIZE)
1294 if (ntohl (sm->priority) <= min_prio)
1296 /* discard new entry; this should no longer happen! */
1299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1300 "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
1301 queue_size, (unsigned int) MAX_PEER_QUEUE_SIZE,
1302 (unsigned int) msize, (unsigned int) ntohs (message->type));
1304 GNUNET_STATISTICS_update (stats,
1305 gettext_noop ("# discarded CORE_SEND requests"),
1309 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1312 GNUNET_assert (min_prio_entry != NULL);
1313 /* discard "min_prio_entry" */
1315 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1316 "Queue full, discarding existing older request\n");
1318 GNUNET_STATISTICS_update (stats,
1320 ("# discarded lower priority CORE_SEND requests"),
1322 if (min_prio_prev == NULL)
1323 n->messages = min_prio_entry->next;
1325 min_prio_prev->next = min_prio_entry->next;
1326 GNUNET_free (min_prio_entry);
1330 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331 "Adding transmission request for `%4s' of size %u to queue\n",
1332 GNUNET_i2s (&sm->peer), (unsigned int) msize);
1334 e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1335 e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1336 e->priority = ntohl (sm->priority);
1338 if (GNUNET_YES != (int) ntohl (sm->cork))
1339 e->got_slack = GNUNET_YES;
1340 memcpy (&e[1], &sm[1], msize);
1342 /* insert, keep list sorted by deadline */
1345 while ((pos != NULL) && (pos->deadline.abs_value < e->deadline.abs_value))
1356 /* consider scheduling now */
1357 process_plaintext_neighbour_queue (n);
1364 GSC_NEIGHBOURS_init ()
1366 neighbours = GNUNET_CONTAINER_multihashmap_create (128);
1367 self.public_key = &my_public_key;
1368 self.peer = my_identity;
1369 self.last_activity = GNUNET_TIME_UNIT_FOREVER_ABS;
1370 self.status = PEER_STATE_KEY_CONFIRMED;
1371 self.is_connected = GNUNET_YES;
1377 GSC_NEIGHBOURS_done ()
1379 GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
1381 GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1383 GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),