From: Christian Grothoff Date: Fri, 7 Oct 2011 14:56:07 +0000 (+0000) Subject: hxing X-Git-Tag: initial-import-from-subversion-38251~16705 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=4c5201417431a8cb700dab32e339ded3d7363156;p=oweals%2Fgnunet.git hxing --- diff --git a/src/core/core.h b/src/core/core.h index 8c208a2ed..a1854ccb4 100644 --- a/src/core/core.h +++ b/src/core/core.h @@ -412,7 +412,7 @@ struct SendMessageReady /** * Client asking core to transmit a particular message to a particular - * target (responsde to GNUNET_MESSAGE_TYPE_CORE_SEND_READY). + * target (response to GNUNET_MESSAGE_TYPE_CORE_SEND_READY). */ struct SendMessage { diff --git a/src/core/gnunet-service-core-new.c b/src/core/gnunet-service-core-new.c index d094817a0..25cabde2e 100644 --- a/src/core/gnunet-service-core-new.c +++ b/src/core/gnunet-service-core-new.c @@ -22,6 +22,10 @@ * @file core/gnunet-service-core.c * @brief high-level P2P messaging * @author Christian Grothoff + * + * Not implemented: + * - peer status changes (PeerStatusNotifyMessage) [needed?] + * - ATS integration / bw allocation / preferences */ #include "platform.h" #include "gnunet_util_lib.h" diff --git a/src/core/gnunet-service-core.h b/src/core/gnunet-service-core.h index 9068e7c3d..2d5156efe 100644 --- a/src/core/gnunet-service-core.h +++ b/src/core/gnunet-service-core.h @@ -49,13 +49,13 @@ struct GSC_ClientActiveRequest * Active requests are kept in a doubly-linked list of * the respective target peer. */ - struct ClientActiveRequest *next; + struct GSC_ClientActiveRequest *next; /** * Active requests are kept in a doubly-linked list of * the respective target peer. */ - struct ClientActiveRequest *prev; + struct GSC_ClientActiveRequest *prev; /** * Which peer is the message going to be for? @@ -77,6 +77,11 @@ struct GSC_ClientActiveRequest */ uint32_t priority; + /** + * Has this request been solicited yet? + */ + int was_solicited; + /** * How many bytes does the client intend to send? */ diff --git a/src/core/gnunet-service-core_clients.c b/src/core/gnunet-service-core_clients.c index 94fecb4ca..4a380e02d 100644 --- a/src/core/gnunet-service-core_clients.c +++ b/src/core/gnunet-service-core_clients.c @@ -340,6 +340,7 @@ handle_client_send_request (void *cls, struct GNUNET_SERVER_Client *client, car->priority = ntohl (req->priority); car->msize = ntohs (req->size); car->smr_id = req->smr_id; + car->was_solicited = GNUNET_NO; if (0 == memcmp (&req->peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity))) GSC_CLIENTS_solicit_request (car); @@ -440,7 +441,7 @@ client_tokenizer_callback (void *cls, void *client, GNUNET_CORE_OPTION_SEND_HDR_INBOUND | GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND); } else - GSC_SESSIONS_transmit (car, message); + GSC_SESSIONS_transmit (car, message, GNUNET_NO /* FIXME: get cork flag form 'struct SendMessage'! */); } diff --git a/src/core/gnunet-service-core_neighbours.c b/src/core/gnunet-service-core_neighbours.c index bdcc81d0e..c3456ccf4 100644 --- a/src/core/gnunet-service-core_neighbours.c +++ b/src/core/gnunet-service-core_neighbours.c @@ -46,18 +46,18 @@ * Message ready for transmission via transport service. This struct * is followed by the actual content of the message. */ -struct MessageEntry +struct NeighbourMessageEntry { /** * We keep messages in a doubly linked list. */ - struct MessageEntry *next; + struct NeighbourMessageEntry *next; /** * We keep messages in a doubly linked list. */ - struct MessageEntry *prev; + struct NeighbourMessageEntry *prev; /** * By when are we supposed to transmit this message? @@ -84,13 +84,13 @@ struct Neighbour * Head of the batched message queue (already ordered, transmit * starting with the head). */ - struct MessageEntry *message_head; + struct NeighbourMessageEntry *message_head; /** * Tail of the batched message queue (already ordered, append new * messages to tail). */ - struct MessageEntry *message_tail; + struct NeighbourMessageEntry *message_tail; /** * Handle for pending requests for transmission to this peer @@ -159,7 +159,7 @@ find_neighbour (const struct GNUNET_PeerIdentity *peer) static void free_neighbour (struct Neighbour *n) { - struct MessageEntry *m; + struct NeighbourMessageEntry *m; #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -221,7 +221,7 @@ static size_t transmit_ready (void *cls, size_t size, void *buf) { struct Neighbour *n = cls; - struct MessageEntry *m; + struct NeighbourMessageEntry *m; size_t ret; char *cbuf; @@ -278,7 +278,7 @@ transmit_ready (void *cls, size_t size, void *buf) static void process_queue (struct Neighbour *n) { - struct MessageEntry *m; + struct NeighbourMessageEntry *m; if (n->th != NULL) return; /* request already pending */ @@ -468,11 +468,11 @@ handle_transport_receive (void *cls, const struct GNUNET_PeerIdentity *peer, * @param timeout by when should the transmission be done? */ void -GDS_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target, +GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target, const struct GNUNET_MessageHeader *msg, struct GNUNET_TIME_Relative timeout) { - struct MessageEntry *me; + struct NeighbourMessageEntry *me; struct Neighbour *n; size_t msize; @@ -483,7 +483,7 @@ GDS_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target, return; } msize = ntohs (msg->size); - me = GNUNET_malloc (sizeof (struct MessageEntry) + msize); + me = GNUNET_malloc (sizeof (struct NeighbourMessageEntry) + msize); me->deadline = GNUNET_TIME_relative_to_absolute (timeout); me->size = msize; memcpy (&me[1], msg, msize); diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c index 0d1fec2dd..b379c3ac7 100644 --- a/src/core/gnunet-service-core_sessions.c +++ b/src/core/gnunet-service-core_sessions.c @@ -27,1105 +27,245 @@ #include "gnunet-service-core.h" #include "gnunet-service-core_neighbours.h" #include "gnunet-service-core_kx.h" +#include "gnunet-service-core_typemap.h" #include "gnunet-service-core_sessions.h" +#include "gnunet-service-core_clients.h" +#include "gnunet_constants.h" -/** - * Record kept for each request for transmission issued by a - * client that is still pending. - */ -struct GSC_ClientActiveRequest; /** - * Data kept per session. + * Message ready for encryption. This struct is followed by the + * actual content of the message. */ -struct Session +struct SessionMessageEntry { - /** - * Identity of the other peer. - */ - struct GNUNET_PeerIdentity peer; /** - * Head of list of requests from clients for transmission to - * this peer. + * We keep messages in a doubly linked list. */ - struct GSC_ClientActiveRequest *active_client_request_head; + struct SessionMessageEntry *next; /** - * Tail of list of requests from clients for transmission to - * this peer. - */ - struct GSC_ClientActiveRequest *active_client_request_tail; - - /** - * Performance data for the peer. + * We keep messages in a doubly linked list. */ - struct GNUNET_TRANSPORT_ATS_Information *ats; + struct SessionMessageEntry *prev; /** - * Information about the key exchange with the other peer. + * Deadline for transmission, 1s after we received it (if we + * are not corking), otherwise "now". Note that this message + * does NOT expire past its deadline. */ - struct GSC_KeyExchangeInfo *kxinfo; - - /** - * ID of task used for cleaning up dead neighbour entries. - */ - GNUNET_SCHEDULER_TaskIdentifier dead_clean_task; + struct GNUNET_TIME_Absolute deadline; /** - * ID of task used for updating bandwidth quota for this neighbour. + * How long is the message? (number of bytes following the "struct + * MessageEntry", but not including the size of "struct + * MessageEntry" itself!) */ - GNUNET_SCHEDULER_TaskIdentifier quota_update_task; + size_t size; - /** - * At what time did we initially establish (as in, complete session - * key handshake) this connection? Should be zero if status != KEY_CONFIRMED. - */ - struct GNUNET_TIME_Absolute time_established; +}; - /** - * At what time did we last receive an encrypted message from the - * other peer? Should be zero if status != KEY_CONFIRMED. - */ - struct GNUNET_TIME_Absolute last_activity; +/** + * Data kept per session. + */ +struct Session +{ /** - * How valueable were the messages of this peer recently? + * Identity of the other peer. */ - unsigned long long current_preference; + struct GNUNET_PeerIdentity peer; /** - * Number of entries in 'ats'. + * Head of list of requests from clients for transmission to + * this peer. */ - unsigned int ats_count; + struct GSC_ClientActiveRequest *active_client_request_head; /** - * Available bandwidth in for this peer (current target). + * Tail of list of requests from clients for transmission to + * this peer. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_in; + struct GSC_ClientActiveRequest *active_client_request_tail; /** - * Available bandwidth out for this peer (current target). + * Head of list of messages ready for encryption. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out; + struct SessionMessageEntry *sme_head; /** - * Internal bandwidth limit set for this peer (initially typically - * set to "-1"). Actual "bw_out" is MIN of - * "bpm_out_internal_limit" and "bw_out_external_limit". + * Tail of list of messages ready for encryption. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit; + struct SessionMessageEntry *sme_tail; /** - * External bandwidth limit set for this peer by the - * peer that we are communicating with. "bw_out" is MIN of - * "bw_out_internal_limit" and "bw_out_external_limit". - */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit; - -}; - - -/** - * Map of peer identities to 'struct Session'. - */ -static struct GNUNET_CONTAINER_MultiHashMap *sessions; - - -/** - * Session entry for "this" peer. - */ -static struct Session self; - -/** - * Sum of all preferences among all neighbours. - */ -static unsigned long long preference_sum; - - -// FIXME......... - -/** - * At what time should the connection to the given neighbour - * time out (given no further activity?) - * - * @param n neighbour in question - * @return absolute timeout - */ -static struct GNUNET_TIME_Absolute -get_neighbour_timeout (struct Neighbour *n) -{ - return GNUNET_TIME_absolute_add (n->last_activity, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); -} - - -/** - * Helper function for update_preference_sum. - */ -static int -update_preference (void *cls, const GNUNET_HashCode * key, void *value) -{ - unsigned long long *ps = cls; - struct Neighbour *n = value; - - n->current_preference /= 2; - *ps += n->current_preference; - return GNUNET_OK; -} - - -/** - * A preference value for a neighbour was update. Update - * the preference sum accordingly. - * - * @param inc how much was a preference value increased? - */ -static void -update_preference_sum (unsigned long long inc) -{ - unsigned long long os; - - os = preference_sum; - preference_sum += inc; - if (preference_sum >= os) - return; /* done! */ - /* overflow! compensate by cutting all values in half! */ - preference_sum = 0; - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &update_preference, - &preference_sum); - GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"), - preference_sum, GNUNET_NO); -} - - -/** - * Find the entry for the given neighbour. - * - * @param peer identity of the neighbour - * @return NULL if we are not connected, otherwise the - * neighbour's entry. - */ -static struct Neighbour * -find_neighbour (const struct GNUNET_PeerIdentity *peer) -{ - return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey); -} - - -/** - * Function called by transport telling us that a peer - * changed status. - * - * @param n the peer that changed status - */ -static void -handle_peer_status_change (struct Neighbour *n) -{ - struct PeerStatusNotifyMessage *psnm; - char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct GNUNET_TRANSPORT_ATS_Information *ats; - size_t size; - - if ((!n->is_connected) || (n->status != PEER_STATE_KEY_CONFIRMED)) - return; -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%4s' changed status\n", - GNUNET_i2s (&n->peer)); -#endif - size = - sizeof (struct PeerStatusNotifyMessage) + - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break (0); - /* recovery strategy: throw away performance data */ - GNUNET_array_grow (n->ats, n->ats_count, 0); - size = - sizeof (struct PeerStatusNotifyMessage) + - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - } - psnm = (struct PeerStatusNotifyMessage *) buf; - psnm->header.size = htons (size); - psnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE); - psnm->timeout = GNUNET_TIME_absolute_hton (get_neighbour_timeout (n)); - psnm->bandwidth_in = n->bw_in; - psnm->bandwidth_out = n->bw_out; - psnm->peer = n->peer; - psnm->ats_count = htonl (n->ats_count); - ats = &psnm->ats; - memcpy (ats, n->ats, - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); - ats[n->ats_count].type = htonl (0); - ats[n->ats_count].value = htonl (0); - send_to_all_clients (&psnm->header, GNUNET_YES, - GNUNET_CORE_OPTION_SEND_STATUS_CHANGE); - GNUNET_STATISTICS_update (stats, gettext_noop ("# peer status changes"), 1, - GNUNET_NO); -} - - - -/** - * Go over our message queue and if it is not too long, go - * over the pending requests from clients for this - * neighbour and send some clients a 'READY' notification. - * - * @param n which peer to process - */ -static void -schedule_peer_messages (struct Neighbour *n) -{ - struct GSC_ClientActiveRequest *car; - struct GSC_ClientActiveRequest *pos; - struct Client *c; - struct MessageEntry *mqe; - unsigned int queue_size; - - /* check if neighbour queue is empty enough! */ - if (n != &self) - { - queue_size = 0; - mqe = n->messages; - while (mqe != NULL) - { - queue_size++; - mqe = mqe->next; - } - if (queue_size >= MAX_PEER_QUEUE_SIZE) - { -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not considering client transmission requests: queue full\n"); -#endif - return; /* queue still full */ - } - /* find highest priority request */ - pos = n->active_client_request_head; - car = NULL; - while (pos != NULL) - { - if ((car == NULL) || (pos->priority > car->priority)) - car = pos; - pos = pos->next; - } - } - else - { - car = n->active_client_request_head; - } - if (car == NULL) - return; /* no pending requests */ -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Permitting client transmission request to `%s'\n", - GNUNET_i2s (&n->peer)); -#endif - GSC_CLIENTS_solicite_request (car); -} - - - -/** - * Free the given entry for the neighbour (it has - * already been removed from the list at this point). - * - * @param n neighbour to free - */ -static void -free_neighbour (struct Neighbour *n) -{ - struct MessageEntry *m; - struct GSC_ClientActiveRequest *car; - -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Destroying neighbour entry for peer `%4s'\n", - GNUNET_i2s (&n->peer)); -#endif - if (n->skm != NULL) - { - GNUNET_free (n->skm); - n->skm = NULL; - } - while (NULL != (m = n->messages)) - { - n->messages = m->next; - GNUNET_free (m); - } - while (NULL != (m = n->encrypted_head)) - { - GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m); - GNUNET_free (m); - } - while (NULL != (car = n->active_client_request_head)) - { - GNUNET_CONTAINER_DLL_remove (n->active_client_request_head, - n->active_client_request_tail, car); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (car->client->requests, - &n->peer.hashPubKey, - car)); - GNUNET_free (car); - } - if (NULL != n->th) - { - GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th); - n->th = NULL; - } - if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->retry_plaintext_task); - if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->quota_update_task); - if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->keep_alive_task); - if (n->status == PEER_STATE_KEY_CONFIRMED) - GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"), - -1, GNUNET_NO); - GNUNET_array_grow (n->ats, n->ats_count, 0); - GNUNET_free_non_null (n->pending_ping); - GNUNET_free_non_null (n->pending_pong); - GNUNET_free (n); -} - - - -/** - * Consider freeing the given neighbour since we may not need - * to keep it around anymore. - * - * @param n neighbour to consider discarding - */ -static void -consider_free_neighbour (struct Neighbour *n); - - -/** - * Task triggered when a neighbour entry might have gotten stale. - * - * @param cls the 'struct Neighbour' - * @param tc scheduler context (not used) - */ -static void -consider_free_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct Neighbour *n = cls; - - n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK; - consider_free_neighbour (n); -} - - -/** - * Consider freeing the given neighbour since we may not need - * to keep it around anymore. - * - * @param n neighbour to consider discarding - */ -static void -consider_free_neighbour (struct Neighbour *n) -{ - struct GNUNET_TIME_Relative left; - - if ((n->th != NULL) || (n->pitr != NULL) || (GNUNET_YES == n->is_connected)) - return; /* no chance */ - - left = GNUNET_TIME_absolute_get_remaining (get_neighbour_timeout (n)); - if (left.rel_value > 0) - { - if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->dead_clean_task); - n->dead_clean_task = - GNUNET_SCHEDULER_add_delayed (left, &consider_free_task, n); - return; - } - /* actually free the neighbour... */ - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (neighbours, - &n->peer.hashPubKey, n)); - GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), - GNUNET_CONTAINER_multihashmap_size (neighbours), - GNUNET_NO); - free_neighbour (n); -} - - -/** - * Function called when the transport service is ready to - * receive an encrypted message for the respective peer - * - * @param cls neighbour to use message from - * @param size number of bytes we can transmit - * @param buf where to copy the message - * @return number of bytes transmitted - */ -static size_t -notify_encrypted_transmit_ready (void *cls, size_t size, void *buf) -{ - struct Neighbour *n = cls; - struct MessageEntry *m; - size_t ret; - char *cbuf; - - n->th = NULL; - m = n->encrypted_head; - if (m == NULL) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encrypted message queue empty, no messages added to buffer for `%4s'\n", - GNUNET_i2s (&n->peer)); -#endif - return 0; - } - GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m); - ret = 0; - cbuf = buf; - if (buf != NULL) - { - GNUNET_assert (size >= m->size); - memcpy (cbuf, &m[1], m->size); - ret = m->size; - GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, m->size); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Copied message of type %u and size %u into transport buffer for `%4s'\n", - (unsigned int) - ntohs (((struct GNUNET_MessageHeader *) &m[1])->type), - (unsigned int) ret, GNUNET_i2s (&n->peer)); -#endif - process_encrypted_neighbour_queue (n); - } - else - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission of message of type %u and size %u failed\n", - (unsigned int) - ntohs (((struct GNUNET_MessageHeader *) &m[1])->type), - (unsigned int) m->size); -#endif - } - GNUNET_free (m); - consider_free_neighbour (n); - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# encrypted bytes given to transport"), ret, - GNUNET_NO); - return ret; -} - - - - - -/** - * Select messages for transmission. This heuristic uses a combination - * of earliest deadline first (EDF) scheduling (with bounded horizon) - * and priority-based discard (in case no feasible schedule exist) and - * speculative optimization (defer any kind of transmission until - * we either create a batch of significant size, 25% of max, or until - * we are close to a deadline). Furthermore, when scheduling the - * heuristic also packs as many messages into the batch as possible, - * starting with those with the earliest deadline. Yes, this is fun. - * - * @param n neighbour to select messages from - * @param size number of bytes to select for transmission - * @param retry_time set to the time when we should try again - * (only valid if this function returns zero) - * @return number of bytes selected, or 0 if we decided to - * defer scheduling overall; in that case, retry_time is set. - */ -static size_t -select_messages (struct Neighbour *n, size_t size, - struct GNUNET_TIME_Relative *retry_time) -{ - struct MessageEntry *pos; - struct MessageEntry *min; - struct MessageEntry *last; - unsigned int min_prio; - struct GNUNET_TIME_Absolute t; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative delta; - uint64_t avail; - struct GNUNET_TIME_Relative slack; /* how long could we wait before missing deadlines? */ - size_t off; - uint64_t tsize; - unsigned int queue_size; - int discard_low_prio; - - GNUNET_assert (NULL != n->messages); - now = GNUNET_TIME_absolute_get (); - /* last entry in linked list of messages processed */ - last = NULL; - /* should we remove the entry with the lowest - * priority from consideration for scheduling at the - * end of the loop? */ - queue_size = 0; - tsize = 0; - pos = n->messages; - while (pos != NULL) - { - queue_size++; - tsize += pos->size; - pos = pos->next; - } - discard_low_prio = GNUNET_YES; - while (GNUNET_YES == discard_low_prio) - { - min = NULL; - min_prio = UINT_MAX; - discard_low_prio = GNUNET_NO; - /* calculate number of bytes available for transmission at time "t" */ - avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window); - t = now; - /* how many bytes have we (hypothetically) scheduled so far */ - off = 0; - /* maximum time we can wait before transmitting anything - * and still make all of our deadlines */ - slack = GNUNET_TIME_UNIT_FOREVER_REL; - pos = n->messages; - /* note that we use "*2" here because we want to look - * a bit further into the future; much more makes no - * sense since new message might be scheduled in the - * meantime... */ - while ((pos != NULL) && (off < size * 2)) - { - if (pos->do_transmit == GNUNET_YES) - { - /* already removed from consideration */ - pos = pos->next; - continue; - } - if (discard_low_prio == GNUNET_NO) - { - delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline); - if (delta.rel_value > 0) - { - // FIXME: HUH? Check! - t = pos->deadline; - avail += - GNUNET_BANDWIDTH_value_get_available_until (n->bw_out, delta); - } - if (avail < pos->size) - { - // FIXME: HUH? Check! - discard_low_prio = GNUNET_YES; /* we could not schedule this one! */ - } - else - { - avail -= pos->size; - /* update slack, considering both its absolute deadline - * and relative deadlines caused by other messages - * with their respective load */ - slack = - GNUNET_TIME_relative_min (slack, - GNUNET_BANDWIDTH_value_get_delay_for - (n->bw_out, avail)); - if (pos->deadline.abs_value <= now.abs_value) - { - /* now or never */ - slack = GNUNET_TIME_UNIT_ZERO; - } - else if (GNUNET_YES == pos->got_slack) - { - /* should be soon now! */ - slack = - GNUNET_TIME_relative_min (slack, - GNUNET_TIME_absolute_get_remaining - (pos->slack_deadline)); - } - else - { - slack = - GNUNET_TIME_relative_min (slack, - GNUNET_TIME_absolute_get_difference - (now, pos->deadline)); - pos->got_slack = GNUNET_YES; - pos->slack_deadline = - GNUNET_TIME_absolute_min (pos->deadline, - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_MAX_CORK_DELAY)); - } - } - } - off += pos->size; - t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check! - if (pos->priority <= min_prio) - { - /* update min for discard */ - min_prio = pos->priority; - min = pos; - } - pos = pos->next; - } - if (discard_low_prio) - { - GNUNET_assert (min != NULL); - /* remove lowest-priority entry from consideration */ - min->do_transmit = GNUNET_YES; /* means: discard (for now) */ - } - last = pos; - } - /* guard against sending "tiny" messages with large headers without - * urgent deadlines */ - if ((slack.rel_value > GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value) && - (size > 4 * off) && (queue_size <= MAX_PEER_QUEUE_SIZE - 2)) - { - /* less than 25% of message would be filled with deadlines still - * being met if we delay by one second or more; so just wait for - * more data; but do not wait longer than 1s (since we don't want - * to delay messages for a really long time either). */ - *retry_time = GNUNET_CONSTANTS_MAX_CORK_DELAY; - /* reset do_transmit values for next time */ - while (pos != last) - { - pos->do_transmit = GNUNET_NO; - pos = pos->next; - } - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# transmissions delayed due to corking"), 1, - GNUNET_NO); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n", - (unsigned long long) retry_time->rel_value, (unsigned int) off, - (unsigned int) size); -#endif - return 0; - } - /* select marked messages (up to size) for transmission */ - off = 0; - pos = n->messages; - while (pos != last) - { - if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO)) - { - pos->do_transmit = GNUNET_YES; /* mark for transmission */ - off += pos->size; - size -= pos->size; -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Selecting message of size %u for transmission\n", - (unsigned int) pos->size); -#endif - } - else - { -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not selecting message of size %u for transmission at this time (maximum is %u)\n", - (unsigned int) pos->size, size); -#endif - pos->do_transmit = GNUNET_NO; /* mark for not transmitting! */ - } - pos = pos->next; - } -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Selected %llu/%llu bytes of %u/%u plaintext messages for transmission to `%4s'.\n", - (unsigned long long) off, (unsigned long long) tsize, queue_size, - (unsigned int) MAX_PEER_QUEUE_SIZE, GNUNET_i2s (&n->peer)); -#endif - return off; -} - - -/** - * Batch multiple messages into a larger buffer. - * - * @param n neighbour to take messages from - * @param buf target buffer - * @param size size of buf - * @param deadline set to transmission deadline for the result - * @param retry_time set to the time when we should try again - * (only valid if this function returns zero) - * @param priority set to the priority of the batch - * @return number of bytes written to buf (can be zero) - */ -static size_t -batch_message (struct Neighbour *n, char *buf, size_t size, - struct GNUNET_TIME_Absolute *deadline, - struct GNUNET_TIME_Relative *retry_time, unsigned int *priority) -{ - char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage *) ntmb; - struct MessageEntry *pos; - struct MessageEntry *prev; - struct MessageEntry *next; - size_t ret; - - ret = 0; - *priority = 0; - *deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - *retry_time = GNUNET_TIME_UNIT_FOREVER_REL; - if (0 == select_messages (n, size, retry_time)) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No messages selected, will try again in %llu ms\n", - retry_time->rel_value); -#endif - return 0; - } - ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND); - ntm->ats_count = htonl (0); - ntm->ats.type = htonl (0); - ntm->ats.value = htonl (0); - ntm->peer = n->peer; - pos = n->messages; - prev = NULL; - while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader))) - { - next = pos->next; - if (GNUNET_YES == pos->do_transmit) - { - GNUNET_assert (pos->size <= size); - /* do notifications */ - /* FIXME: track if we have *any* client that wants - * full notifications and only do this if that is - * actually true */ - if (pos->size < - GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage)) - { - memcpy (&ntm[1], &pos[1], pos->size); - ntm->header.size = - htons (sizeof (struct NotifyTrafficMessage) + - sizeof (struct GNUNET_MessageHeader)); - send_to_all_clients (&ntm->header, GNUNET_YES, - GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND); - } - else - { - /* message too large for 'full' notifications, we do at - * least the 'hdr' type */ - memcpy (&ntm[1], &pos[1], sizeof (struct GNUNET_MessageHeader)); - } - ntm->header.size = - htons (sizeof (struct NotifyTrafficMessage) + pos->size); - send_to_all_clients (&ntm->header, GNUNET_YES, - GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND); -#if DEBUG_HANDSHAKE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encrypting %u bytes with message of type %u and size %u\n", - pos->size, - (unsigned int) - ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type), - (unsigned int) - ntohs (((const struct GNUNET_MessageHeader *) - &pos[1])->size)); -#endif - /* copy for encrypted transmission */ - memcpy (&buf[ret], &pos[1], pos->size); - ret += pos->size; - size -= pos->size; - *priority += pos->priority; -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding plaintext message of size %u with deadline %llu ms to batch\n", - (unsigned int) pos->size, - (unsigned long long) - GNUNET_TIME_absolute_get_remaining (pos->deadline).rel_value); -#endif - deadline->abs_value = - GNUNET_MIN (deadline->abs_value, pos->deadline.abs_value); - GNUNET_free (pos); - if (prev == NULL) - n->messages = next; - else - prev->next = next; - } - else - { - prev = pos; - } - pos = next; - } -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Deadline for message batch is %llu ms\n", - GNUNET_TIME_absolute_get_remaining (*deadline).rel_value); -#endif - return ret; -} - - -/** - * Remove messages with deadlines that have long expired from - * the queue. - * - * @param n neighbour to inspect - */ -static void -discard_expired_messages (struct Neighbour *n) -{ - struct MessageEntry *prev; - struct MessageEntry *next; - struct MessageEntry *pos; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative delta; - int disc; - unsigned int queue_length; - - disc = GNUNET_NO; - now = GNUNET_TIME_absolute_get (); - prev = NULL; - queue_length = 0; - pos = n->messages; - while (pos != NULL) - { - queue_length++; - next = pos->next; - delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now); - if (delta.rel_value > PAST_EXPIRATION_DISCARD_TIME.rel_value) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Message is %llu ms past due, discarding.\n", - delta.rel_value); -#endif - if (prev == NULL) - n->messages = next; - else - prev->next = next; - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# messages discarded (expired prior to transmission)"), - 1, GNUNET_NO); - disc = GNUNET_YES; - GNUNET_free (pos); - } - else - prev = pos; - pos = next; - } - if ( (GNUNET_YES == disc) && - (queue_length == MAX_PEER_QUEUE_SIZE) ) - schedule_peer_messages (n); -} - - -/** - * Signature of the main function of a task. - * - * @param cls closure - * @param tc context information (why was this task triggered now) - */ -static void -retry_plaintext_processing (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct Neighbour *n = cls; - - n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK; - process_plaintext_neighbour_queue (n); -} - - -/** - * Check if we have plaintext messages for the specified neighbour - * pending, and if so, consider batching and encrypting them (and - * then trigger processing of the encrypted queue if needed). - * - * @param n neighbour to check. - */ -static void -process_plaintext_neighbour_queue (struct Neighbour *n) -{ - char pbuf[GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE + sizeof (struct EncryptedMessage)]; /* plaintext */ - size_t used; - struct EncryptedMessage *em; /* encrypted message */ - struct EncryptedMessage *ph; /* plaintext header */ - struct MessageEntry *me; - unsigned int priority; - struct GNUNET_TIME_Absolute deadline; - struct GNUNET_TIME_Relative retry_time; - struct GNUNET_CRYPTO_AesInitializationVector iv; - struct GNUNET_CRYPTO_AuthKey auth_key; - - if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (n->retry_plaintext_task); - n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK; - } - switch (n->status) - { - case PEER_STATE_DOWN: - send_key (n); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not yet connected to `%4s', deferring processing of plaintext messages.\n", - GNUNET_i2s (&n->peer)); -#endif - return; - case PEER_STATE_KEY_SENT: - if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK) - n->retry_set_key_task = - GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency, - &set_key_retry_task, n); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not yet connected to `%4s', deferring processing of plaintext messages.\n", - GNUNET_i2s (&n->peer)); -#endif - return; - case PEER_STATE_KEY_RECEIVED: - if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK) - n->retry_set_key_task = - GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency, - &set_key_retry_task, n); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not yet connected to `%4s', deferring processing of plaintext messages.\n", - GNUNET_i2s (&n->peer)); -#endif - return; - case PEER_STATE_KEY_CONFIRMED: - /* ready to continue */ - break; - } - discard_expired_messages (n); - if (n->messages == NULL) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Plaintext message queue for `%4s' is empty.\n", - GNUNET_i2s (&n->peer)); -#endif - return; /* no pending messages */ - } - if (n->encrypted_head != NULL) - { -#if DEBUG_CORE > 2 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n", - GNUNET_i2s (&n->peer)); -#endif - return; /* wait for messages already encrypted to be - * processed first! */ - } - ph = (struct EncryptedMessage *) pbuf; - deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - priority = 0; - used = sizeof (struct EncryptedMessage); - used += - batch_message (n, &pbuf[used], - GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE, &deadline, - &retry_time, &priority); - if (used == sizeof (struct EncryptedMessage)) - { -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No messages selected for transmission to `%4s' at this time, will try again later.\n", - GNUNET_i2s (&n->peer)); -#endif - /* no messages selected for sending, try again later... */ - n->retry_plaintext_task = - GNUNET_SCHEDULER_add_delayed (retry_time, &retry_plaintext_processing, - n); - return; - } - GSC_KX_encrypt_and_transmit (n->kx, - &pbuf[struct EncryptedMessage], - used - sizeof (struct EncryptedMessage)); - schedule_peer_messages (n); -} + * Information about the key exchange with the other peer. + */ + struct GSC_KeyExchangeInfo *kxinfo; + + /** + * Current type map for this peer. + */ + struct GSC_TypeMap *tmap; + + /** + * At what time did we initially establish this session? + * (currently unused, should be integrated with ATS in the + * future...). + */ + struct GNUNET_TIME_Absolute time_established; + + /** + * Task to transmit corked messages with a delay. + */ + GNUNET_SCHEDULER_TaskIdentifier cork_task; + + /** + * Is the neighbour queue empty and thus ready for us + * to transmit an encrypted message? + */ + int ready_to_transmit; + +}; +/** + * Map of peer identities to 'struct Session'. + */ +static struct GNUNET_CONTAINER_MultiHashMap *sessions; /** - * Check if we have encrypted messages for the specified neighbour - * pending, and if so, check with the transport about sending them - * out. + * Find the session for the given peer. * - * @param n neighbour to check. + * @param peer identity of the peer + * @return NULL if we are not connected, otherwise the + * session handle */ -static void -process_encrypted_neighbour_queue (struct Neighbour *n) +static struct Session * +find_session (const struct GNUNET_PeerIdentity *peer) { - struct MessageEntry *m; + return GNUNET_CONTAINER_multihashmap_get (sessions, &peer->hashPubKey); +} - if (n->th != NULL) - return; /* request already pending */ - if (GNUNET_YES != n->is_connected) - { - GNUNET_break (0); - return; - } - m = n->encrypted_head; - if (m == NULL) - { - /* encrypted queue empty, try plaintext instead */ - process_plaintext_neighbour_queue (n); + +/** + * End the session with the given peer (we are no longer + * connected). + * + * @param pid identity of peer to kill session with + */ +void +GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) +{ + struct Session *session; + struct GSC_ClientActiveRequest *car; + + session = find_session (pid); + if (NULL == session) return; - } -#if DEBUG_CORE > 1 +#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n", - (unsigned int) m->size, GNUNET_i2s (&n->peer), - (unsigned long long) - GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value); + "Destroying session for peer `%4s'\n", + GNUNET_i2s (&session->peer)); #endif - n->th = - GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size, - m->priority, - GNUNET_TIME_absolute_get_remaining - (m->deadline), - ¬ify_encrypted_transmit_ready, - n); - if (n->th == NULL) + if (GNUNET_SCHEDULER_NO_TASK != session->cork_task) { - /* message request too large or duplicate request */ - GNUNET_break (0); - /* discard encrypted message */ - GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m); - GNUNET_free (m); - process_encrypted_neighbour_queue (n); + GNUNET_SCHEDULER_cancel (session->cork_task); + session->cork_task = GNUNET_SCHEDULER_NO_TASK; } + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (sessions, + &session->peer.hashPubKey, session)); + while (NULL != (car = session->active_client_request_head)) + { + GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, + session->active_client_request_tail, + car); + GSC_CLIENTS_reject_request (car); + } + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# established sessions"), + GNUNET_CONTAINER_multihashmap_size (sessions), + GNUNET_NO); + GNUNET_free (session); } /** - * Initialize a new 'struct Neighbour'. + * Create a session, a key exchange was just completed. * - * @param pid ID of the new neighbour - * @return handle for the new neighbour + * @param peer peer that is now connected + * @param kx key exchange that completed */ -static struct Neighbour * -create_neighbour (const struct GNUNET_PeerIdentity *pid) +void +GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, + struct GSC_KeyExchangeInfo *kx) { - struct Neighbour *n; - struct GNUNET_TIME_Absolute now; + struct GNUNET_MessageHeader *hdr; + struct Session *session; #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating neighbour entry for peer `%4s'\n", GNUNET_i2s (pid)); + "Creating session for peer `%4s'\n", GNUNET_i2s (pid)); #endif - n = GNUNET_malloc (sizeof (struct Neighbour)); - n->peer = *pid; - n->last_activity = GNUNET_TIME_absolute_get (); - n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; - n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; - n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init (UINT32_MAX); - n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; - n->ping_challenge = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + session = GNUNET_malloc (sizeof (struct Session)); + session->peer = *peer; + session->kxinfo = kx; + session->time_established = GNUNET_TIME_absolute_get (); GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (neighbours, - &n->peer.hashPubKey, n, + GNUNET_CONTAINER_multihashmap_put (sessions, + &peer->hashPubKey, session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), - GNUNET_CONTAINER_multihashmap_size (neighbours), - GNUNET_NO); - neighbour_quota_update (n, NULL); - consider_free_neighbour (n); - return n; + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop ("# established sessions"), + GNUNET_CONTAINER_multihashmap_size (sessions), + GNUNET_NO); +#if 0 + /* FIXME: integration with ATS for quota calculations... */ + /* FIXME: who should do this? Neighbours!? */ + GNUNET_TRANSPORT_set_quota (transport, + peer, + GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, + GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT); +#endif + /* FIXME: we should probably do this periodically (in case + type map message is lost...) */ + hdr = GSC_TYPEMAP_compute_type_map_message (); + GSC_KX_encrypt_and_transmit (kx, + GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, + hdr, + ntohs (hdr->size)); + GNUNET_free (hdr); +} + + +/** + * Notify the given client about the session (client is new). + * + * @param cls the 'struct GSC_Client' + * @param key peer identity + * @param value the 'struct Session' + * @return GNUNET_OK (continue to iterate) + */ +static int +notify_client_about_session (void *cls, + const GNUNET_HashCode *key, + void *value) +{ + struct GSC_Client *client = cls; + struct Session *session = value; + + GDS_CLIENTS_notify_client_about_neighbour (client, + &session->peer, + NULL, 0, /* FIXME: ATS!? */ + NULL, /* old TMAP: none */ + session->tmap); + return GNUNET_OK; } - /** * We have a new client, notify it about all current sessions. * @@ -1134,12 +274,22 @@ create_neighbour (const struct GNUNET_PeerIdentity *pid) void GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) { - /* notify new client about existing neighbours */ - GNUNET_CONTAINER_multihashmap_iterate (neighbours, - ¬ify_client_about_neighbour, client); + /* notify new client about existing sessions */ + GNUNET_CONTAINER_multihashmap_iterate (sessions, + ¬ify_client_about_session, client); } +/** + * Try to perform a transmission on the given session. Will solicit + * additional messages if the 'sme' queue is not full enough. + * + * @param session session to transmit messages from + */ +static void +try_transmission (struct Session *session); + + /** * Queue a request from a client for transmission to a particular peer. * @@ -1152,11 +302,10 @@ GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) void GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) { - struct Neighbour *n; // FIXME: session... + struct Session *session; - n = find_neighbour (&car->peer); - if ((n == NULL) || (GNUNET_YES != n->is_connected) || - (n->status != PEER_STATE_KEY_CONFIRMED)) + session = find_session (&car->target); + if (session == NULL) { /* neighbour must have disconnected since request was issued, * ignore (client will realize it once it processes the @@ -1165,21 +314,26 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dropped client request for transmission (am disconnected)\n"); #endif - GNUNET_STATISTICS_update (stats, + GNUNET_STATISTICS_update (GSC_stats, gettext_noop ("# send requests dropped (disconnected)"), 1, GNUNET_NO); - GSC_CLIENTS_reject_requests (car); + GSC_CLIENTS_reject_request (car); + return; + } + if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) + { + GNUNET_break (0); + GSC_CLIENTS_reject_request (car); return; } #if DEBUG_CORE_CLIENT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received client transmission request. queueing\n"); #endif - GNUNET_CONTAINER_DLL_insert (n->active_client_request_head, - n->active_client_request_tail, car); - - // schedule_peer_messages (n); + GNUNET_CONTAINER_DLL_insert (session->active_client_request_head, + session->active_client_request_tail, car); + try_transmission (session); } @@ -1194,152 +348,179 @@ GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car) { struct Session *s; - s = find_session (&car->peer); + s = find_session (&car->target); GNUNET_CONTAINER_DLL_remove (s->active_client_request_head, s->active_client_request_tail, car); } +/** + * Discard all expired active transmission requests from clients. + * + * @param session session to clean up + */ +static void +discard_expired_requests (struct Session *session) +{ + struct GSC_ClientActiveRequest *pos; + struct GSC_ClientActiveRequest *nxt; + struct GNUNET_TIME_Absolute now; + + now = GNUNET_TIME_absolute_get (); + pos = NULL; + nxt = session->active_client_request_head; + while (NULL != nxt) + { + pos = nxt; + nxt = pos->next; + if ( (pos->deadline.abs_value < now.abs_value) && + (GNUNET_YES != pos->was_solicited) ) + { + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop + ("# messages discarded (expired prior to transmission)"), + 1, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, + session->active_client_request_tail, + pos); + GSC_CLIENTS_reject_request (pos); + } + } +} + /** - * Transmit a message to a particular peer. + * Solicit messages for transmission. * - * @param car original request that was queued and then solicited; - * this handle will now be 'owned' by the SESSIONS subsystem - * @param msg message to transmit + * @param session session to solict messages for */ -void -GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, - const struct GNUNET_MessageHeader *msg) +static void +solicit_messages (struct Session *session) { - struct MessageEntry *prev; - struct MessageEntry *pos; - struct MessageEntry *e; - struct MessageEntry *min_prio_entry; - struct MessageEntry *min_prio_prev; - unsigned int min_prio; - unsigned int queue_size; - - n = find_neighbour (&sm->peer); - if ((n == NULL) || (GNUNET_YES != n->is_connected) || - (n->status != PEER_STATE_KEY_CONFIRMED)) + struct GSC_ClientActiveRequest *car; + size_t so_size; + + discard_expired_requests (session); + so_size = 0; + for (car = session->active_client_request_head; NULL != car; car = car->next) { - /* attempt to send message to peer that is not connected anymore - * (can happen due to asynchrony) */ - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# messages discarded (disconnected)"), 1, - GNUNET_NO); - if (client != NULL) - GNUNET_SERVER_receive_done (client, GNUNET_OK); + if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) + break; + so_size += car->msize; + if (car->was_solicited == GNUNET_YES) + continue; + car->was_solicited = GNUNET_YES; + GSC_CLIENTS_solicit_request (car); + } +} + + +/** + * Some messages were delayed (corked), but the timeout has now expired. + * Send them now. + * + * @param cls 'struct Session' with the messages to transmit now + * @param tc scheduler context (unused) + */ +static void +pop_cork_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Session *session = session; + + session->cork_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission (session); +} + + +/** + * Try to perform a transmission on the given session. Will solicit + * additional messages if the 'sme' queue is not full enough. + * + * @param session session to transmit messages from + */ +static void +try_transmission (struct Session *session) +{ + struct SessionMessageEntry *pos; + size_t msize; + struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Absolute min_deadline; + + if (GNUNET_YES != session->ready_to_transmit) return; + msize = 0; + min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; + /* check 'ready' messages */ + pos = session->sme_head; + GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); + while ( (NULL != pos) && + (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) ) + { + msize += pos->size; + min_deadline = GNUNET_TIME_absolute_min (min_deadline, + pos->deadline); + pos = pos->next; } -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n", - "SEND", (unsigned int) msize, GNUNET_i2s (&sm->peer)); -#endif - discard_expired_messages (n); - /* bound queue size */ - /* NOTE: this entire block to bound the queue size should be - * obsolete with the new client-request code and the - * 'schedule_peer_messages' mechanism; we still have this code in - * here for now as a sanity check for the new mechanmism; - * ultimately, we should probably simply reject SEND messages that - * are not 'approved' (or provide a new core API for very unreliable - * delivery that always sends with priority 0). Food for thought. */ - min_prio = UINT32_MAX; - min_prio_entry = NULL; - min_prio_prev = NULL; - queue_size = 0; - prev = NULL; - pos = n->messages; - while (pos != NULL) + now = GNUNET_TIME_absolute_get (); + if ( (msize == 0) || + ( (msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) && + (min_deadline.abs_value > now.abs_value) ) ) { - if (pos->priority <= min_prio) + /* not enough ready yet, try to solicit more */ + solicit_messages (session); + if (msize > 0) { - min_prio_entry = pos; - min_prio_prev = prev; - min_prio = pos->priority; + /* if there is data to send, just not yet, make sure we do transmit + it once the deadline is reached */ + if (session->cork_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (session->cork_task); + session->cork_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (min_deadline), + &pop_cork_task, + session); } - queue_size++; - prev = pos; - pos = pos->next; + return; } - if (queue_size >= MAX_PEER_QUEUE_SIZE) + /* create plaintext buffer of all messages, encrypt and transmit */ { - /* queue full */ - if (ntohl (sm->priority) <= min_prio) - { - /* discard new entry; this should no longer happen! */ - GNUNET_break (0); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n", - queue_size, (unsigned int) MAX_PEER_QUEUE_SIZE, - (unsigned int) msize, (unsigned int) ntohs (message->type)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# discarded CORE_SEND requests"), - 1, GNUNET_NO); + static unsigned long long total_bytes; + static unsigned int total_msgs; + char pbuf[msize]; /* plaintext */ + size_t used; - if (client != NULL) - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; + used = 0; + pos = session->sme_head; + while ( (NULL != pos) && + (used + pos->size <= msize) ) + { + memcpy (&pbuf[used], &pos[1], pos->size); + used += pos->size; } - GNUNET_assert (min_prio_entry != NULL); - /* discard "min_prio_entry" */ -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queue full, discarding existing older request\n"); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# discarded lower priority CORE_SEND requests"), - 1, GNUNET_NO); - if (min_prio_prev == NULL) - n->messages = min_prio_entry->next; - else - min_prio_prev->next = min_prio_entry->next; - GNUNET_free (min_prio_entry); - } - -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding transmission request for `%4s' of size %u to queue\n", - GNUNET_i2s (&sm->peer), (unsigned int) msize); -#endif - e = GNUNET_malloc (sizeof (struct MessageEntry) + msize); - e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline); - e->priority = ntohl (sm->priority); - e->size = msize; - if (GNUNET_YES != (int) ntohl (sm->cork)) - e->got_slack = GNUNET_YES; - memcpy (&e[1], &sm[1], msize); - - /* insert, keep list sorted by deadline */ - prev = NULL; - pos = n->messages; - while ((pos != NULL) && (pos->deadline.abs_value < e->deadline.abs_value)) - { - prev = pos; - pos = pos->next; + /* compute average payload size */ + total_bytes += used; + total_msgs++; + if (0 == total_msgs) + { + /* 2^32 messages, wrap around... */ + total_msgs = 1; + total_bytes = used; + } + GNUNET_STATISTICS_set (GSC_stats, + "# avg payload per encrypted message", + total_bytes / total_msgs, + GNUNET_NO); + /* now actually transmit... */ + session->ready_to_transmit = GNUNET_NO; + GSC_KX_encrypt_and_transmit (session->kxinfo, + GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT /* FIXME! */, + pbuf, + used); } - if (prev == NULL) - n->messages = e; - else - prev->next = e; - e->next = pos; - - /* consider scheduling now */ - process_plaintext_neighbour_queue (n); - } - /** - * Send a message to the neighbour. + * Send a message to the neighbour now. * * @param cls the message * @param key neighbour's identity @@ -1349,22 +530,19 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, static int do_send_message (void *cls, const GNUNET_HashCode * key, void *value) { - struct GNUNET_MessageHeader *hdr = cls; - struct Neighbour *n = value; - struct MessageEntry *m; + const struct GNUNET_MessageHeader *hdr = cls; + struct Session *session = value; + struct SessionMessageEntry *m; uint16_t size; size = ntohs (hdr->size); - m = GNUNET_malloc (sizeof (struct MessageEntry) + size); + m = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); memcpy (&m[1], hdr, size); - m->deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - m->slack_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - m->priority = UINT_MAX; - m->sender_status = n->status; m->size = size; - GNUNET_CONTAINER_DLL_insert (n->message_head, - n->message_tail, + GNUNET_CONTAINER_DLL_insert (session->sme_head, + session->sme_tail, m); + try_transmission (session); return GNUNET_OK; } @@ -1380,71 +558,7 @@ GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg) if (NULL == sessions) return; GNUNET_CONTAINER_multihashmap_iterate (sessions, - &do_send_message, msg); -} - - -/** - * Helper function for GSC_SESSIONS_handle_client_iterate_peers. - * - * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies - * @param key identity of the connected peer - * @param value the 'struct Neighbour' for the peer - * @return GNUNET_OK (continue to iterate) - */ -static int -queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value) -{ - struct GNUNET_SERVER_TransmitContext *tc = cls; - struct Neighbour *n = value; - char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct GNUNET_TRANSPORT_ATS_Information *ats; - size_t size; - struct ConnectNotifyMessage *cnm; - - cnm = (struct ConnectNotifyMessage *) buf; - if (n->status != PEER_STATE_KEY_CONFIRMED) - return GNUNET_OK; - size = - sizeof (struct ConnectNotifyMessage) + - (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break (0); - /* recovery strategy: throw away performance data */ - GNUNET_array_grow (n->ats, n->ats_count, 0); - size = - sizeof (struct PeerStatusNotifyMessage) + - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - } - cnm = (struct ConnectNotifyMessage *) buf; - cnm->header.size = htons (size); - cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); - cnm->ats_count = htonl (n->ats_count); - ats = &cnm->ats; - memcpy (ats, n->ats, - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); - ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); - ats[n->ats_count].value = htonl (0); -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n", - "NOTIFY_CONNECT"); -#endif - cnm->peer = n->peer; - GNUNET_SERVER_transmit_context_append_message (tc, &cnm->header); - return GNUNET_OK; -} - - -/** - * End the session with the given peer (we are no longer - * connected). - * - * @param pid identity of peer to kill session with - */ -void -GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) -{ + &do_send_message, (void*) msg); } @@ -1458,31 +572,73 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) void GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid) { + struct Session *session; + + session = find_session (pid); + session->ready_to_transmit = GNUNET_YES; + try_transmission (session); } /** * Transmit a message to a particular peer. * - * @param car original request that was queued and then solicited, - * ownership does not change (dequeue will be called soon). + * @param car original request that was queued and then solicited; + * this handle will now be 'owned' by the SESSIONS subsystem * @param msg message to transmit + * @param cork is corking allowed? */ void GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, - const struct GNUNET_MessageHeader *msg) + const struct GNUNET_MessageHeader *msg, + int cork) { + struct Session *session; + struct SessionMessageEntry *sme; + size_t msize; + + session = find_session (&car->target); + msize = ntohs (msg->size); + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize); + memcpy (&sme[1], msg, msize); + sme->size = msize; + if (GNUNET_YES == cork) + sme->deadline = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY); + GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, + session->sme_tail, + sme); + try_transmission (session); } /** - * We have a new client, notify it about all current sessions. + * Helper function for GSC_SESSIONS_handle_client_iterate_peers. * - * @param client the new client + * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies + * @param key identity of the connected peer + * @param value the 'struct Neighbour' for the peer + * @return GNUNET_OK (continue to iterate) */ -void -GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) +#include "core.h" +static int +queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value) { + struct GNUNET_SERVER_TransmitContext *tc = cls; + struct Session *session = value; + struct ConnectNotifyMessage cnm; + struct GNUNET_TRANSPORT_ATS_Information *a; + + /* FIXME: code duplication with clients... */ + cnm.header.size = htons (sizeof (struct ConnectNotifyMessage)); + cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); + cnm.ats_count = htonl (0); + cnm.peer = session->peer; + a = &cnm.ats; + // FIXME: full ats... + a[0].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); + a[0].value = htonl (0); + GNUNET_SERVER_transmit_context_append_message (tc, &cnm.header); + return GNUNET_OK; } @@ -1503,7 +659,8 @@ GSC_SESSIONS_handle_client_iterate_peers (void *cls, struct GNUNET_SERVER_Client struct GNUNET_SERVER_TransmitContext *tc; tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &queue_connect_message, + GNUNET_CONTAINER_multihashmap_iterate (sessions, + &queue_connect_message, tc); done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); @@ -1532,7 +689,7 @@ GSC_SESSIONS_handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *cl peer = (const struct GNUNET_PeerIdentity *) &message[1]; // YUCK! tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_CONTAINER_multihashmap_get_multiple (neighbours, &peer->hashPubKey, + GNUNET_CONTAINER_multihashmap_get_multiple (sessions, &peer->hashPubKey, &queue_connect_message, tc); done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); @@ -1541,7 +698,6 @@ GSC_SESSIONS_handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *cl } - /** * Handle REQUEST_INFO request. For this request type, the client must * have transmitted an INIT first. @@ -1554,6 +710,8 @@ void GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { +#if 0 + // FIXME! const struct RequestInfoMessage *rcm; struct GSC_Client *pos; struct Neighbour *n; @@ -1563,7 +721,7 @@ GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client unsigned long long old_preference; struct GNUNET_TIME_Relative rdelay; - rdelay = GNUNET_TIME_relative_get_zero (); + rdelay = GNUNET_TIME_UNIT_ZERO; #if DEBUG_CORE_CLIENT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service receives `%s' request.\n", "REQUEST_INFO"); @@ -1647,91 +805,8 @@ GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client "CONFIGURATION_INFO"); #endif GSC_CLIENTS_send_to_client (client, &cim.header, GNUNET_NO); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - -/** - * Create a session, a key exchange was just completed. - * - * @param peer peer that is now connected - * @param kx key exchange that completed - */ -void -GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, - struct GSC_KeyExchangeInfo *kx) -{ - { - struct GNUNET_MessageHeader *hdr; - - hdr = compute_type_map_message (); - send_type_map_to_neighbour (hdr, &n->peer.hashPubKey, n); - GNUNET_free (hdr); - } - if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__) - { - n->bw_out_external_limit = t.inbound_bw_limit; - n->bw_out = - GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit, - n->bw_out_internal_limit); - GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window, - n->bw_out); - GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out); - } -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Confirmed key via `%s' message for peer `%4s'\n", "PONG", - GNUNET_i2s (&n->peer)); #endif - - - size = - sizeof (struct ConnectNotifyMessage) + - (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break (0); - /* recovery strategy: throw away performance data */ - GNUNET_array_grow (n->ats, n->ats_count, 0); - size = - sizeof (struct PeerStatusNotifyMessage) + - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - } - cnm = (struct ConnectNotifyMessage *) buf; - cnm->header.size = htons (size); - cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); - cnm->ats_count = htonl (n->ats_count); - cnm->peer = n->peer; - mats = &cnm->ats; - memcpy (mats, n->ats, - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); - mats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); - mats[n->ats_count].value = htonl (0); - send_to_all_clients (&cnm->header, GNUNET_NO, - GNUNET_CORE_OPTION_SEND_CONNECT); - process_encrypted_neighbour_queue (n); - n->last_activity = GNUNET_TIME_absolute_get (); - - if (n->status == PEER_STATE_KEY_CONFIRMED) - { - now = GNUNET_TIME_absolute_get (); - n->last_activity = now; - changed = GNUNET_YES; - if (!up) - { - GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"), - 1, GNUNET_NO); - n->time_established = now; - } - if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->keep_alive_task); - n->keep_alive_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - 2), &send_keep_alive, n); - } - - + GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -1745,42 +820,37 @@ GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, */ void GSC_SESSIONS_update (const struct GNUNET_PeerIdentity *peer, - struct GNUNET_BANDWIDTH_Value32NBO bw_out, - const struct GNUNET_TRANSPORT_ATS_Information *atsi, - uint32_t atsi_count) + struct GNUNET_BANDWIDTH_Value32NBO bw_out) { - if (bw_out_external_limit.value__ != pt->inbound_bw_limit.value__) - { -#if DEBUG_CORE_SET_QUOTA - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received %u b/s as new inbound limit for peer `%4s'\n", - (unsigned int) ntohl (pt->inbound_bw_limit.value__), - GNUNET_i2s (&n->peer)); -#endif - n->bw_out_external_limit = pt->inbound_bw_limit; - n->bw_out = - GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit, - n->bw_out_internal_limit); - GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window, - n->bw_out); - GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out); - } - + // FIXME + /* not implemented */ } /** * Initialize sessions subsystem. */ -int +void GSC_SESSIONS_init () { - neighbours = GNUNET_CONTAINER_multihashmap_create (128); - self.public_key = &my_public_key; - self.peer = my_identity; - self.last_activity = GNUNET_TIME_UNIT_FOREVER_ABS; - self.status = PEER_STATE_KEY_CONFIRMED; - self.is_connected = GNUNET_YES; + sessions = GNUNET_CONTAINER_multihashmap_create (128); +} + + +/** + * Helper function for GSC_SESSIONS_handle_client_iterate_peers. + * + * @param cls NULL + * @param key identity of the connected peer + * @param value the 'struct Session' for the peer + * @return GNUNET_OK (continue to iterate) + */ +static int +free_session_helper (void *cls, const GNUNET_HashCode * key, void *value) +{ + struct Session *session = value; + + GSC_SESSIONS_end (&session->peer); return GNUNET_OK; } @@ -1791,10 +861,15 @@ GSC_SESSIONS_init () void GSC_SESSIONS_done () { - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper, + GNUNET_CONTAINER_multihashmap_iterate (sessions, + &free_session_helper, NULL); - GNUNET_CONTAINER_multihashmap_destroy (neighbours); - neighbours = NULL; - GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), + GNUNET_CONTAINER_multihashmap_destroy (sessions); + sessions = NULL; + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# established sessions"), 0, GNUNET_NO); } + +/* end of gnunet-service-core_sessions.c */ + diff --git a/src/core/gnunet-service-core_sessions.h b/src/core/gnunet-service-core_sessions.h index 46f8d1f5c..787a431ee 100644 --- a/src/core/gnunet-service-core_sessions.h +++ b/src/core/gnunet-service-core_sessions.h @@ -30,6 +30,28 @@ #include "gnunet-service-core_kx.h" +/** + * Create a session, a key exchange was just completed. + * + * @param peer peer that is now connected + * @param kx key exchange that completed + */ +void +GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, + struct GSC_KeyExchangeInfo *kx); + + +/** + * Update information about a session. + * + * @param peer peer who's session should be updated + * @param bw_out new outbound bandwidth limit for the peer + */ +void +GSC_SESSIONS_update (const struct GNUNET_PeerIdentity *peer, + struct GNUNET_BANDWIDTH_Value32NBO bw_out); + + /** * End the session with the given peer (we are no longer * connected). @@ -80,10 +102,12 @@ GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car); * @param car original request that was queued and then solicited, * ownership does not change (dequeue will be called soon). * @param msg message to transmit + * @param cork is corking allowed? */ void GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, - const struct GNUNET_MessageHeader *msg); + const struct GNUNET_MessageHeader *msg, + int cork); /** @@ -146,28 +170,6 @@ GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client const struct GNUNET_MessageHeader *message); -/** - * Create a session, a key exchange was just completed. - * - * @param peer peer that is now connected - * @param kx key exchange that completed - */ -void -GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, - struct GSC_KeyExchangeInfo *kx); - - -/** - * Update information about a session. - * - * @param peer peer who's session should be updated - * @param bw_out new outbound bandwidth limit for the peer - */ -void -GSC_SESSIONS_update (const struct GNUNET_PeerIdentity *peer, - struct GNUNET_BANDWIDTH_Value32NBO bw_out); - - /** * Initialize sessions subsystem. */ diff --git a/src/core/gnunet-service-core_typemap.c b/src/core/gnunet-service-core_typemap.c index e52bc0a6e..78dfc2bb9 100644 --- a/src/core/gnunet-service-core_typemap.c +++ b/src/core/gnunet-service-core_typemap.c @@ -57,8 +57,8 @@ static uint8_t map_counters[UINT16_MAX + 1]; * * @return this peers current type map message. */ -static struct GNUNET_MessageHeader * -compute_type_map_message () +struct GNUNET_MessageHeader * +GSC_TYPEMAP_compute_type_map_message () { char *tmp; uLongf dlen; @@ -98,7 +98,7 @@ broadcast_my_type_map () { struct GNUNET_MessageHeader *hdr; - hdr = compute_type_map_message (); + hdr = GSC_TYPEMAP_compute_type_map_message (); GSC_SESSIONS_broadcast (hdr); GNUNET_free (hdr); } diff --git a/src/core/gnunet-service-core_typemap.h b/src/core/gnunet-service-core_typemap.h index 1087a90a6..10c614a85 100644 --- a/src/core/gnunet-service-core_typemap.h +++ b/src/core/gnunet-service-core_typemap.h @@ -51,6 +51,15 @@ GSC_TYPEMAP_remove (const uint16_t *types, unsigned int tlen); +/** + * Compute a type map message for this peer. + * + * @return this peers current type map message. + */ +struct GNUNET_MessageHeader * +GSC_TYPEMAP_compute_type_map_message (void); + + /** * Test if any of the types from the types array is in the * given type map.