* @brief high-level P2P messaging
* @author Christian Grothoff
*
- * TODO:
- * TESTING:
- * - write test for basic core functions:
- * + connect to peer
- * + transmit (encrypted) message [with handshake]
- * + receive (encrypted) message, forward plaintext to clients
* POST-TESTING:
- * - revisit API (which arguments are used, needed)?
- * - add code to bound queue size when handling client's SEND message
- * - add code to bound message queue size when passing messages to clients
- * - add code to discard_expired_messages
- * - add code to re-transmit key if first attempt failed
- * + timeout on connect / key exchange, etc.
- * + timeout for automatic re-try, etc.
- * - add code to give up re-transmission of key if many attempts fail
- * - add code to send PINGs if we are about to time-out otherwise
- * ? add heuristic to do another send_key in "handle_set_key"
- * in case previous attempt failed / didn't work / persist
- * (but don't do it always to avoid storm of SET_KEY's going
- * back and forth!) --- alternatively, add "status" field
- * of the other peer to the set key message, that way we'd
- * know for sure!
- * - check that hostkey used by transport (for HELLOs) is the
- * same as the hostkey that we are using!
- * - free list of clients on exit
* - topology management:
* + bootstrapping (transport offer hello, plugins)
* + internal neighbour selection
- * + update bandwidth usage statistics
- * + bandwidth allocation (transport set quota)
+ *
+ * Considerations for later:
+ * - check that hostkey used by transport (for HELLOs) is the
+ * same as the hostkey that we are using!
+ * - add code to send PINGs if we are about to time-out otherwise
* - optimize lookup (many O(n) list traversals
* could ideally be changed to O(1) hash map lookups)
*/
#include "platform.h"
+#include "gnunet_constants.h"
#include "gnunet_util_lib.h"
#include "gnunet_hello_lib.h"
#include "gnunet_peerinfo_service.h"
*/
#define MAX_WINDOW_TIME (5 * 60 * 1000)
+/**
+ * Minimum of bytes per minute (out) to assign to any connected peer.
+ * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
+ * sense.
+ */
+#define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
/**
- * Amount of bytes per minute (in/out) to assume initially
- * (before either peer has communicated any particular
- * preference). Should be rather low.
+ * What is the smallest change (in number of bytes per minute)
+ * that we consider significant enough to bother triggering?
*/
-#define DEFAULT_BPM_IN_OUT 2048
+#define MIN_BPM_CHANGE 32
+/**
+ * After how much time past the "official" expiration time do
+ * we discard messages? Should not be zero since we may
+ * intentionally defer transmission until close to the deadline
+ * and then may be slightly past the deadline due to inaccuracy
+ * in sleep and our own CPU consumption.
+ */
+#define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
/**
* What is the maximum delay for a SET_KEY message?
*/
#define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
-
/**
* What how long do we wait for SET_KEY confirmation initially?
*/
#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
-
/**
* What is the maximum delay for a PING message?
*/
#define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
-
/**
* What is the maximum delay for a PONG message?
*/
#define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
+/**
+ * How often do we recalculate bandwidth quotas?
+ */
+#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
/**
* What is the priority for a SET_KEY message?
*/
#define SET_KEY_PRIORITY 0xFFFFFF
-
/**
* What is the priority for a PING message?
*/
#define PING_PRIORITY 0xFFFFFF
-
/**
* What is the priority for a PONG message?
*/
#define PONG_PRIORITY 0xFFFFFF
+/**
+ * How many messages do we queue per peer at most?
+ */
+#define MAX_PEER_QUEUE_SIZE 16
+
+/**
+ * How many non-mandatory messages do we queue per client at most?
+ */
+#define MAX_CLIENT_QUEUE_SIZE 32
/**
* What is the maximum age of a message for us to consider
*/
#define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
-
/**
* What is the maximum size for encrypted messages? Note that this
* number imposes a clear limit on the maximum size of any message.
*/
#define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
+
/**
* Encapsulation for encrypted messages exchanged between
* peers. Followed by the actual encrypted data.
*/
GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
+ /**
+ * ID of task used for updating bandwidth quota for this neighbour.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
+
/**
* At what time did we generate our encryption key?
*/
struct GNUNET_TIME_Relative last_latency;
/**
- * At what frequency are we currently re-trying SET KEY messages?
+ * At what frequency are we currently re-trying SET_KEY messages?
*/
struct GNUNET_TIME_Relative set_key_retry_frequency;
* bandwidth-hogs are sampled at a frequency of about 78s!);
* may get negative if we have VERY high priority content.
*/
- long long available_send_window;
+ long long available_send_window;
/**
* How much downstream capacity of this peer has been reserved for
* make sure that this reserved amount of bandwidth is actually
* available).
*/
- long long available_recv_window;
+ long long available_recv_window;
/**
* How valueable were the messages of this peer recently?
*/
- double current_preference;
+ unsigned long long current_preference;
/**
* Bit map indicating which of the 32 sequence numbers before the last
/**
* Internal bandwidth limit set for this peer (initially
- * typcially set to "-1"). "bpm_out" is MAX of
+ * typically set to "-1"). "bpm_out" is MAX of
* "bpm_out_internal_limit" and "bpm_out_external_limit".
*/
uint32_t bpm_out_internal_limit;
uint32_t bpm_out_external_limit;
/**
- * What was our PING challenge number?
+ * What was our PING challenge number (for this peer)?
*/
uint32_t ping_challenge;
*/
static struct GNUNET_TRANSPORT_Handle *transport;
+/**
+ * Linked list of our clients.
+ */
+static struct Client *clients;
+
/**
* We keep neighbours in a linked list (for now).
*/
static struct Neighbour *neighbours;
/**
- * Linked list of our clients.
+ * Sum of all preferences among all neighbours.
*/
-static struct Client *clients;
+static unsigned long long preference_sum;
+
+/**
+ * Total number of neighbours we have.
+ */
+static unsigned int neighbour_count;
+
+/**
+ * How much inbound bandwidth are we supposed to be using?
+ */
+static unsigned long long bandwidth_target_in;
+
+/**
+ * How much outbound bandwidth are we supposed to be using?
+ */
+static unsigned long long bandwidth_target_out;
+
+
+
+/**
+ * A preference value for a neighbour was update. Update
+ * the preference sum accordingly.
+ *
+ * @param inc how much was a preference value increased?
+ */
+static void
+update_preference_sum (unsigned long long inc)
+{
+ struct Neighbour *n;
+ unsigned long long os;
+
+ os = preference_sum;
+ preference_sum += inc;
+ if (preference_sum >= os)
+ return; /* done! */
+ /* overflow! compensate by cutting all values in half! */
+ preference_sum = 0;
+ n = neighbours;
+ while (n != NULL)
+ {
+ n->current_preference /= 2;
+ preference_sum += n->current_preference;
+ n = n->next;
+ }
+}
/**
* Recalculate the number of bytes we expect to
* receive or transmit in a given window.
*
+ * @param force force an update now (even if not much time has passed)
* @param window pointer to the byte counter (updated)
* @param ts pointer to the timestamp (updated)
* @param bpm number of bytes per minute that should
* be added to the window.
*/
static void
-update_window (long long *window,
+update_window (int force,
+ long long *window,
struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
{
struct GNUNET_TIME_Relative since;
since = GNUNET_TIME_absolute_get_duration (*ts);
- if (since.value < 60 * 1000)
+ if ( (force == GNUNET_NO) &&
+ (since.value < 60 * 1000) )
return; /* not even a minute has passed */
*ts = GNUNET_TIME_absolute_get ();
*window += (bpm * since.value) / 60 / 1000;
/**
* Client is ready to receive data, provide it.
+ *
* @param cls closure
* @param size number of bytes available in buf
* @param buf where the callee should write the message
size_t ret;
client->th = NULL;
+#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Client ready to receive %u bytes.\n", size);
+#endif
if (buf == NULL)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to transmit data to client (disconnect)?\n");
+#endif
return 0; /* we'll surely get a disconnect soon... */
}
tgt = buf;
GNUNET_assert (ret > 0);
if (client->event_head == NULL)
client->event_tail = NULL;
+#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting %u bytes to client\n", ret);
+#endif
request_transmit (client);
return ret;
}
return; /* already pending */
if (NULL == client->event_head)
return; /* no more events pending */
+#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asking server to transmit %u bytes to client\n",
client->event_head->size);
+#endif
client->th
= GNUNET_SERVER_notify_transmit_ready (client->client_handle,
client->event_head->size,
const struct GNUNET_MessageHeader *msg, int can_drop)
{
struct Event *e;
+ unsigned int queue_size;
uint16_t msize;
+#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Preparing to send message of type %u to client.\n",
ntohs (msg->type));
+#endif
+ queue_size = 0;
+ e = client->event_head;
+ while (e != NULL)
+ {
+ queue_size++;
+ e = e->next;
+ }
+ if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
+ (can_drop == GNUNET_YES) )
+ {
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Too many messages in queue for the client, dropping the new message.\n");
+#endif
+ return;
+ }
+
msize = ntohs (msg->size);
e = GNUNET_malloc (sizeof (struct Event) + msize);
/* append */
*/
static void
handle_client_init (void *cls,
- struct GNUNET_SERVER_Handle *server,
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
{
struct Neighbour *n;
struct ConnectNotifyMessage cnm;
+#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Client connecting to core service with `%s' message\n",
"INIT");
+#endif
/* check that we don't have an entry already */
c = clients;
while (c != NULL)
memcpy (&irm.publicKey,
&my_public_key,
sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending `%s' message to client.\n", "INIT_REPLY");
+#endif
send_to_client (c, &irm.header, GNUNET_NO);
/* notify new client about existing neighbours */
cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
n = neighbours;
while (n != NULL)
{
+#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending `%s' message to client.\n", "NOTIFY_CONNECT");
- cnm.bpm_available = htonl (n->bpm_out);
- cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
+#endif
+ cnm.reserved = htonl (0);
cnm.peer = n->peer;
send_to_client (c, &cnm.header, GNUNET_NO);
n = n->next;
{
struct Client *pos;
struct Client *prev;
+ struct Event *e;
+#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Client has disconnected from core service.\n");
+#endif
prev = NULL;
pos = clients;
while (pos != NULL)
prev->next = pos->next;
if (pos->th != NULL)
GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
+ while (NULL != (e = pos->event_head))
+ {
+ pos->event_head = e->next;
+ GNUNET_free (e);
+ }
GNUNET_free (pos);
return;
}
*/
static void
handle_client_request_configure (void *cls,
- struct GNUNET_SERVER_Handle *server,
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
{
struct ConfigurationInfoMessage cim;
struct Client *c;
int reserv;
+ unsigned long long old_preference;
+#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Core service receives `%s' request.\n", "CONFIGURE");
+#endif
rcm = (const struct RequestConfigureMessage *) message;
n = find_neighbour (&rcm->peer);
memset (&cim, 0, sizeof (cim));
if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
{
+ update_window (GNUNET_YES,
+ &n->available_send_window,
+ &n->last_asw_update,
+ n->bpm_out);
n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
n->bpm_out_external_limit);
}
else if (reserv > 0)
{
- update_window (&n->available_recv_window,
+ update_window (GNUNET_NO,
+ &n->available_recv_window,
&n->last_arw_update, n->bpm_in);
if (n->available_recv_window < reserv)
reserv = n->available_recv_window;
n->available_recv_window -= reserv;
}
- n->current_preference += rcm->preference_change;
- if (n->current_preference < 0)
- n->current_preference = 0;
+ old_preference = n->current_preference;
+ n->current_preference += GNUNET_ntohll(rcm->preference_change);
+ if (old_preference > n->current_preference)
+ {
+ /* overflow; cap at maximum value */
+ n->current_preference = (unsigned long long) -1;
+ }
+ update_preference_sum (n->current_preference - old_preference);
cim.reserved_amount = htonl (reserv);
cim.bpm_in = htonl (n->bpm_in);
cim.bpm_out = htonl (n->bpm_out);
GNUNET_break (0);
return;
}
+#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending `%s' message to client.\n", "CONFIGURATION_INFO");
+#endif
send_to_client (c, &cim.header, GNUNET_NO);
}
char *cbuf;
n->th = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transport ready to receive %u bytes for `%4s'\n",
- size, GNUNET_i2s (&n->peer));
GNUNET_assert (NULL != (m = n->encrypted_head));
n->encrypted_head = m->next;
if (m->next == NULL)
GNUNET_assert (size >= m->size);
memcpy (cbuf, &m[1], m->size);
ret = m->size;
+ n->available_send_window -= m->size;
process_encrypted_neighbour_queue (n);
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Copied message of type %u and size %u into transport buffer for `%4s'\n",
ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
ret, GNUNET_i2s (&n->peer));
+#endif
}
else
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Transmission for message of type %u and size %u failed\n",
ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
m->size);
static void
process_encrypted_neighbour_queue (struct Neighbour *n)
{
+ struct MessageEntry *m;
+
if (n->th != NULL)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to process encrypted queue, but request already pending.\n");
- return; /* already pending */
- }
+ return; /* request already pending */
if (n->encrypted_head == NULL)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Encrypted queue empty, trying plaintext queue instead.\n");
+ /* encrypted queue empty, try plaintext instead */
process_plaintext_neighbour_queue (n);
return;
}
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
n->encrypted_head->size,
GNUNET_i2s (&n->peer),
- GNUNET_TIME_absolute_get_remaining (n->encrypted_head->
- deadline).value);
+ GNUNET_TIME_absolute_get_remaining (n->
+ encrypted_head->deadline).
+ value);
+#endif
n->th =
GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
n->encrypted_head->size,
+ n->encrypted_head->priority,
GNUNET_TIME_absolute_get_remaining
(n->encrypted_head->deadline),
¬ify_encrypted_transmit_ready,
{
/* message request too large (oops) */
GNUNET_break (0);
- /* FIXME: handle error somehow! */
+ /* discard encrypted message */
+ GNUNET_assert (NULL != (m = n->encrypted_head));
+ n->encrypted_head = m->next;
+ if (m->next == NULL)
+ n->encrypted_tail = NULL;
+ GNUNET_free (m);
+ process_encrypted_neighbour_queue (n);
}
}
GNUNET_break (0);
return GNUNET_SYSERR;
}
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Decrypted %u bytes from `%4s' using key %u\n",
size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
+#endif
return GNUNET_OK;
}
(const struct
GNUNET_CRYPTO_AesInitializationVector
*) iv, out));
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Encrypted %u bytes for `%4s' using key %u\n", size,
GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
+#endif
return GNUNET_OK;
}
min = NULL;
min_prio = -1;
discard_low_prio = GNUNET_NO;
- /* number of bytes available for transmission at time "t" */
+ /* calculate number of bytes available for transmission at time "t" */
+ update_window (GNUNET_NO,
+ &n->available_send_window,
+ &n->last_asw_update,
+ n->bpm_out);
avail = n->available_send_window;
t = n->last_asw_update;
- /* how many bytes have we (hyptothetically) scheduled so far */
+ /* how many bytes have we (hypothetically) scheduled so far */
off = 0;
/* maximum time we can wait before transmitting anything
and still make all of our deadlines */
}
if (discard_low_prio)
{
+ GNUNET_assert (min != NULL);
/* remove lowest-priority entry from consideration */
min->do_transmit = GNUNET_YES; /* means: discard (for now) */
}
}
/* guard against sending "tiny" messages with large headers without
urgent deadlines */
- if ((slack > 1000) && (size > 4 * off))
+ if ( (slack > 1000) && (size > 4 * off) )
{
/* less than 25% of message would be filled with
deadlines still being met if we delay by one
pos->do_transmit = GNUNET_NO; /* mark for not transmitting! */
pos = pos->next;
}
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
off, GNUNET_i2s (&n->peer));
+#endif
return off;
}
*retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
if (0 == select_messages (n, size, retry_time))
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No messages selected, will try again in %llu ms\n",
retry_time->value);
return 0;
static void
discard_expired_messages (struct Neighbour *n)
{
- /* FIXME */
+ struct MessageEntry *prev;
+ struct MessageEntry *next;
+ struct MessageEntry *pos;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_TIME_Relative delta;
+
+ now = GNUNET_TIME_absolute_get ();
+ prev = NULL;
+ pos = n->messages;
+ while (pos != NULL)
+ {
+ next = pos->next;
+ delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
+ if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
+ {
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Message is %llu ms past due, discarding.\n",
+ delta.value);
+#endif
+ if (prev == NULL)
+ n->messages = next;
+ else
+ prev->next = next;
+ GNUNET_free (pos);
+ }
+ else
+ prev = pos;
+ pos = next;
+ }
}
struct GNUNET_TIME_Absolute deadline;
struct GNUNET_TIME_Relative retry_time;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing plaintext message queue for `%4s', scheduling messages.\n",
- GNUNET_i2s (&n->peer));
if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
{
GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
{
case PEER_STATE_DOWN:
send_key (n);
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Not yet connected, deferring processing of plaintext messages.\n");
+ "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
+ GNUNET_i2s(&n->peer));
+#endif
return;
case PEER_STATE_KEY_SENT:
GNUNET_assert (n->retry_set_key_task !=
GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Not yet connected, deferring processing of plaintext messages.\n");
+ "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
+ GNUNET_i2s(&n->peer));
+#endif
return;
case PEER_STATE_KEY_RECEIVED:
GNUNET_assert (n->retry_set_key_task !=
GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Not yet connected, deferring processing of plaintext messages.\n");
+ "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
+ GNUNET_i2s(&n->peer));
+#endif
return;
case PEER_STATE_KEY_CONFIRMED:
/* ready to continue */
break;
}
+ discard_expired_messages (n);
if (n->messages == NULL)
{
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Plaintext message queue is empty.\n");
+ "Plaintext message queue for `%4s' is empty.\n",
+ GNUNET_i2s(&n->peer));
+#endif
return; /* no pending messages */
}
- discard_expired_messages (n);
if (n->encrypted_head != NULL)
{
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Encrypted message queue is still full, delaying plaintext processing.\n");
+ "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
+ GNUNET_i2s(&n->peer));
+#endif
return; /* wait for messages already encrypted to be
processed first! */
}
deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
priority = 0;
used = sizeof (struct EncryptedMessage);
-
used += batch_message (n,
&pbuf[used],
MAX_ENCRYPTED_MESSAGE_SIZE - used,
&deadline, &retry_time, &priority);
if (used == sizeof (struct EncryptedMessage))
{
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No messages selected for processing at this time, will try again later.\n");
+ "No messages selected for transmission to `%4s' at this time, will try again later.\n",
+ GNUNET_i2s(&n->peer));
+#endif
/* no messages selected for sending, try again later... */
n->retry_plaintext_task =
GNUNET_SCHEDULER_add_delayed (sched,
esize = used - ENCRYPTED_HEADER_SIZE;
GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
/* encrypt */
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Encrypting plaintext messages for transmission.\n");
+ "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
+ esize,
+ GNUNET_i2s(&n->peer));
+#endif
GNUNET_assert (GNUNET_OK ==
do_encrypt (n,
&em->plaintext_hash,
*/
static void
handle_client_send (void *cls,
- struct GNUNET_SERVER_Handle *server,
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message);
if (buf == NULL)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Asked to send message to disconnected peer `%4s' and connection failed. Discarding message.\n",
GNUNET_i2s (&sm->peer));
+#endif
GNUNET_free (sm);
return 0;
}
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Connection to peer `%4s' succeeded, retrying original send request\n",
+ "Connection to peer `%4s' succeeded, retrying original transmission request\n",
GNUNET_i2s (&sm->peer));
- handle_client_send (NULL, NULL, NULL, &sm->header);
+#endif
+ handle_client_send (NULL, NULL, &sm->header);
GNUNET_free (sm);
return 0;
}
*/
static void
handle_client_send (void *cls,
- struct GNUNET_SERVER_Handle *server,
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
{
struct SendMessage *smc;
const struct GNUNET_MessageHeader *mh;
struct Neighbour *n;
- struct MessageEntry *pred;
+ struct MessageEntry *prev;
struct MessageEntry *pos;
- struct MessageEntry *e;
+ struct MessageEntry *e;
+ struct MessageEntry *min_prio_entry;
+ struct MessageEntry *min_prio_prev;
+ unsigned int min_prio;
+ unsigned int queue_size;
uint16_t msize;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Core service receives `%s' request.\n", "SEND");
msize = ntohs (message->size);
if (msize <
sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
n = find_neighbour (&sm->peer);
if (n == NULL)
{
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Not yet connected to `%4s', will try to establish connection\n",
- GNUNET_i2s (&sm->peer));
+ "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
+ "SEND",
+ GNUNET_i2s (&sm->peer),
+ GNUNET_TIME_absolute_get_remaining
+ (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
+#endif
msize += sizeof (struct SendMessage);
/* ask transport to connect to the peer */
- /* FIXME: this code does not handle the
- case where we get multiple SendMessages before
- transport responds to this request;
- => need to track pending requests! */
smc = GNUNET_malloc (msize);
memcpy (smc, sm, msize);
- GNUNET_TRANSPORT_notify_transmit_ready (transport,
- &sm->peer,
- 0,
- GNUNET_TIME_absolute_get_remaining
- (GNUNET_TIME_absolute_ntoh
- (sm->deadline)),
- &send_connect_continuation,
- smc);
+ if (NULL ==
+ GNUNET_TRANSPORT_notify_transmit_ready (transport,
+ &sm->peer,
+ 0, 0,
+ GNUNET_TIME_absolute_get_remaining
+ (GNUNET_TIME_absolute_ntoh
+ (sm->deadline)),
+ &send_connect_continuation,
+ smc))
+ {
+ /* transport has already a request pending for this peer! */
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Dropped second message destined for `%4s' since connection is still down.\n",
+ GNUNET_i2s(&sm->peer));
+#endif
+ GNUNET_free (smc);
+ }
if (client != NULL)
GNUNET_SERVER_receive_done (client, GNUNET_OK);
return;
}
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Core queues %u bytes of plaintext data for transmission to `%4s'.\n",
- msize, GNUNET_i2s (&sm->peer));
- /* FIXME: consider bounding queue size */
+ "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
+ "SEND",
+ msize,
+ GNUNET_i2s (&sm->peer));
+#endif
+ /* bound queue size */
+ discard_expired_messages (n);
+ min_prio = (unsigned int) -1;
+ queue_size = 0;
+ prev = NULL;
+ pos = n->messages;
+ while (pos != NULL)
+ {
+ if (pos->priority < min_prio)
+ {
+ min_prio_entry = pos;
+ min_prio_prev = prev;
+ min_prio = pos->priority;
+ }
+ queue_size++;
+ prev = pos;
+ pos = pos->next;
+ }
+ if (queue_size >= MAX_PEER_QUEUE_SIZE)
+ {
+ /* queue full */
+ if (ntohl(sm->priority) <= min_prio)
+ {
+ /* discard new entry */
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queue full, discarding new request\n");
+#endif
+ if (client != NULL)
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ return;
+ }
+ /* discard "min_prio_entry" */
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queue full, discarding existing older request\n");
+#endif
+ if (min_prio_prev == NULL)
+ n->messages = min_prio_entry->next;
+ else
+ min_prio_prev->next = min_prio_entry->next;
+ GNUNET_free (min_prio_entry);
+ }
+
e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
e->priority = ntohl (sm->priority);
memcpy (&e[1], mh, msize);
/* insert, keep list sorted by deadline */
- pred = NULL;
+ prev = NULL;
pos = n->messages;
while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
{
- pred = pos;
+ prev = pos;
pos = pos->next;
}
- if (pred == NULL)
+ if (prev == NULL)
n->messages = e;
else
- pred->next = e;
+ prev->next = e;
e->next = pos;
/* consider scheduling now */
return;
if (n->public_key != NULL)
return;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received new HELLO for `%4s', initiating key exchange.\n",
+ "Received new `%s' message for `%4s', initiating key exchange.\n",
+ "HELLO",
GNUNET_i2s (peer));
+#endif
n->public_key =
GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
struct PingMessage pp;
struct PingMessage *pm;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asked to perform key exchange with `%4s'.\n",
GNUNET_i2s (&n->peer));
+#endif
if (n->public_key == NULL)
{
/* lookup n's public key, then try again */
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Lacking public key for `%4s', trying to obtain one.\n",
GNUNET_i2s (&n->peer));
+#endif
GNUNET_PEERINFO_for_all (cfg,
sched,
&n->peer,
pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
pp.challenge = htonl (n->ping_challenge);
pp.target = n->peer;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Encrypting `%s' and `%s' messages for `%4s'.\n",
"SET_KEY", "PING", GNUNET_i2s (&n->peer));
"Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
"PING",
GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
+#endif
do_encrypt (n,
&n->peer.hashPubKey,
&pp.challenge,
break;
}
/* trigger queue processing */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Triggering processing of encrypted message queue.\n");
process_encrypted_neighbour_queue (n);
if (n->status != PEER_STATE_KEY_CONFIRMED)
n->retry_set_key_task
GNUNET_free (sm);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Looking for peer `%4s' to handle `%s' message.\n",
- GNUNET_i2s (peer), "SET_KEY");
n = find_neighbour (peer);
if (n == NULL)
{
GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
{
+ GNUNET_break_op (0);
GNUNET_free (n->public_key);
n->public_key = NULL;
return;
}
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received `%s' for `%4s', continuing processing of `%s' message.\n",
"HELLO", GNUNET_i2s (peer), "SET_KEY");
+#endif
handle_set_key (n, sm);
}
struct PingMessage *tp;
struct MessageEntry *me;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Core service receives `%s' request from `%4s'.\n",
"PING", GNUNET_i2s (&n->peer));
+#endif
if (GNUNET_OK !=
do_decrypt (n,
&my_identity.hashPubKey,
sizeof (struct PingMessage) -
sizeof (struct GNUNET_MessageHeader)))
return;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
"PING",
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Target of `%s' request is `%4s'.\n",
"PING", GNUNET_i2s (&t.target));
+#endif
if (0 != memcmp (&t.target,
&my_identity, sizeof (struct GNUNET_PeerIdentity)))
{
&tp->challenge,
sizeof (struct PingMessage) -
sizeof (struct GNUNET_MessageHeader));
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Encrypting `%s' with challenge %u using key %u\n", "PONG",
ntohl (t.challenge), n->encrypt_key.crc32);
+#endif
/* trigger queue processing */
process_encrypted_neighbour_queue (n);
}
struct PingMessage *ping;
enum PeerStateMachine sender_status;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Core service receives `%s' request from `%4s'.\n",
"SET_KEY", GNUNET_i2s (&n->peer));
+#endif
if (n->public_key == NULL)
{
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Lacking public key for peer, trying to obtain one.\n");
+#endif
m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
/* lookup n's public key, then try again */
GNUNET_break_op (0);
return;
}
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
+#endif
if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
&m->encrypted_key,
&k,
{
case PEER_STATE_DOWN:
n->status = PEER_STATE_KEY_RECEIVED;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Responding to `%s' with my own key.\n", "SET_KEY");
+#endif
send_key (n);
break;
case PEER_STATE_KEY_SENT:
if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
(sender_status != PEER_STATE_KEY_CONFIRMED))
{
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Responding to `%s' with my own key (other peer has status %u).\n",
"SET_KEY", sender_status);
+#endif
send_key (n);
}
break;
if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
(sender_status != PEER_STATE_KEY_CONFIRMED))
{
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
"SET_KEY", sender_status);
+#endif
send_key (n);
}
break;
{
struct PingMessage t;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Core service receives `%s' request from `%4s'.\n",
"PONG", GNUNET_i2s (&n->peer));
+#endif
if (GNUNET_OK !=
do_decrypt (n,
&n->peer.hashPubKey,
sizeof (struct PingMessage) -
sizeof (struct GNUNET_MessageHeader)))
return;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Decrypted `%s' from `%4s' with challenge %u using key %u\n",
"PONG",
GNUNET_i2s (&t.target),
ntohl (t.challenge), n->decrypt_key.crc32);
+#endif
if ((0 != memcmp (&t.target,
&n->peer,
sizeof (struct GNUNET_PeerIdentity))) ||
(n->ping_challenge != ntohl (t.challenge)))
{
/* PONG malformed */
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
"PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received malfromed `%s' received from `%4s' with challenge %u\n",
"PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
+#endif
GNUNET_break_op (0);
return;
}
char buf[msize + sizeof (struct NotifyTrafficMessage)];
struct NotifyTrafficMessage *ntm;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Core service passes P2P message of type %u to client.\n",
+ "Core service passes message from `%4s' of type %u to client.\n",
+ GNUNET_i2s(&sender->peer),
ntohs (((const struct GNUNET_MessageHeader *) m)->type));
+#endif
ntm = (struct NotifyTrafficMessage *) buf;
ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
unsigned int tpos;
int deliver_full;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Passing decrypted P2P message to interested clients.\n");
type = ntohs (m->type);
cpos = clients;
while (cpos != NULL)
uint16_t msize;
int need_align;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Delivering %u bytes of plaintext to interested clients.\n",
- buffer_size);
while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
{
if (0 != offset % sizeof (uint16_t))
uint32_t snum;
struct GNUNET_TIME_Absolute t;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Core service receives `%s' request from `%4s'.\n",
"ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
+#endif
/* decrypt */
if (GNUNET_OK !=
do_decrypt (n,
snum = ntohl (pt->sequence_number);
if (n->last_sequence_number_received == snum)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Received duplicate message, ignoring.\n");
/* duplicate, ignore */
return;
if ((n->last_sequence_number_received > snum) &&
(n->last_sequence_number_received - snum > 32))
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Received ancient out of sequence message, ignoring.\n");
/* ancient out of sequence, ignore */
return;
1 << (n->last_sequence_number_received - snum - 1);
if ((n->last_packets_bitmap & rotbit) != 0)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Received duplicate message, ignoring.\n");
/* duplicate, ignore */
return;
}
/* process decrypted message(s) */
+ update_window (GNUNET_YES,
+ &n->available_send_window,
+ &n->last_asw_update,
+ n->bpm_out);
n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
n->bpm_out_internal_limit);
uint16_t type;
uint16_t size;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received message of type %u from `%4s', demultiplexing.\n",
ntohs (message->type), GNUNET_i2s (peer));
+#endif
n = find_neighbour (peer);
if (n == NULL)
{
if ((n->status != PEER_STATE_KEY_RECEIVED) &&
(n->status != PEER_STATE_KEY_CONFIRMED))
{
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
"PING", GNUNET_i2s (&n->peer));
+#endif
GNUNET_free_non_null (n->pending_ping);
n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
memcpy (n->pending_ping, message, sizeof (struct PingMessage));
}
+/**
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ *
+ * @param cls neighbour for the quota update
+ * @param tc context
+ */
+static void
+neighbour_quota_update (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Schedule the task that will recalculate the bandwidth
+ * quota for this peer (and possibly force a disconnect of
+ * idle peers by calculating a bandwidth of zero).
+ */
+static void
+schedule_quota_update (struct Neighbour *n)
+{
+ GNUNET_assert (n->quota_update_task ==
+ GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+ n->quota_update_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
+ QUOTA_UPDATE_FREQUENCY,
+ &neighbour_quota_update,
+ n);
+}
+
+
+/**
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ *
+ * @param cls neighbour for the quota update
+ * @param tc context
+ */
+static void
+neighbour_quota_update (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Neighbour *n = cls;
+ uint32_t q_in;
+ double pref_rel;
+ double share;
+ unsigned long long distributable;
+
+ n->quota_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+ /* calculate relative preference among all neighbours;
+ divides by a bit more to avoid division by zero AND to
+ account for possibility of new neighbours joining any time
+ AND to convert to double... */
+ pref_rel = n->current_preference / (1.0 + preference_sum);
+ share = 0;
+ distributable = 0;
+ if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
+ distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
+ share = distributable * pref_rel;
+ q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
+ /* check if we want to disconnect for good due to inactivity */
+ if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
+ (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
+ q_in = 0; /* force disconnect */
+ if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
+ (n->bpm_in - MIN_BPM_CHANGE > q_in) )
+ {
+ n->bpm_in = q_in;
+ GNUNET_TRANSPORT_set_quota (transport,
+ &n->peer,
+ n->bpm_in,
+ n->bpm_out,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
+ }
+ schedule_quota_update (n);
+}
+
+
/**
* Function called by transport to notify us that
* a peer connected to us (on the network level).
struct GNUNET_TIME_Absolute now;
struct ConnectNotifyMessage cnm;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received connection from `%4s'.\n", GNUNET_i2s (peer));
n = find_neighbour (peer);
if (n != NULL)
{
n = GNUNET_malloc (sizeof (struct Neighbour));
n->next = neighbours;
neighbours = n;
+ neighbour_count++;
n->peer = *peer;
n->last_latency = latency;
GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
n->last_asw_update = now;
n->last_arw_update = now;
- n->bpm_in = DEFAULT_BPM_IN_OUT;
- n->bpm_out = DEFAULT_BPM_IN_OUT;
+ n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
+ n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
n->bpm_out_internal_limit = (uint32_t) - 1;
- n->bpm_out_external_limit = DEFAULT_BPM_IN_OUT;
+ n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
(uint32_t) - 1);
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Created entry for new neighbour `%4s'.\n",
+ "Received connection from `%4s'.\n",
GNUNET_i2s (&n->peer));
+#endif
+ schedule_quota_update (n);
cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
- cnm.bpm_available = htonl (DEFAULT_BPM_IN_OUT);
+ cnm.reserved = htonl (0);
cnm.peer = *peer;
- cnm.last_activity = GNUNET_TIME_absolute_hton (now);
send_to_all_clients (&cnm.header, GNUNET_YES);
}
/**
* Free the given entry for the neighbour (it has
* already been removed from the list at this point).
+ *
* @param n neighbour to free
*/
static void
GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
+ if (n->quota_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
GNUNET_free_non_null (n->public_key);
GNUNET_free_non_null (n->pending_ping);
GNUNET_free (n);
struct Neighbour *n;
struct Neighbour *p;
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
+#endif
p = NULL;
n = neighbours;
while ((n != NULL) &&
neighbours = n->next;
else
p->next = n->next;
+ GNUNET_assert (neighbour_count > 0);
+ neighbour_count--;
cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
- cnm.bpm_available = htonl (0);
+ cnm.reserved = htonl (0);
cnm.peer = *peer;
- cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
send_to_all_clients (&cnm.header, GNUNET_YES);
free_neighbour (n);
}
static void
cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service shutting down.\n");
+ struct Neighbour *n;
+ struct Client *c;
+
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Core service shutting down.\n");
+#endif
GNUNET_assert (transport != NULL);
GNUNET_TRANSPORT_disconnect (transport);
transport = NULL;
+ while (NULL != (n = neighbours))
+ {
+ neighbours = n->next;
+ GNUNET_assert (neighbour_count > 0);
+ neighbour_count--;
+ free_neighbour (n);
+ }
+ while (NULL != (c = clients))
+ handle_client_disconnect (NULL, c->client_handle);
}
cfg = c;
/* parse configuration */
if (
-#if 0
(GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
"CORE",
- "XX",
- &qin)) ||
+ "TOTAL_QUOTA_IN",
+ &bandwidth_target_in)) ||
+ (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_number (c,
+ "CORE",
+ "TOTAL_QUOTA_OUT",
+ &bandwidth_target_out)) ||
+#if 0
(GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
"CORE",
static void
cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
{
- struct Neighbour *n;
-
if (my_private_key != NULL)
GNUNET_CRYPTO_rsa_key_free (my_private_key);
- while (NULL != (n = neighbours))
- {
- neighbours = n->next;
- free_neighbour (n);
- }
- /*
- FIXME:
- - free clients
- */
}