* @author Christian Grothoff
*
* TODO:
- * TESTING:
- * - write test for basic core functions:
- * + connect to peer
- * + transmit (encrypted) message [with handshake]
- * + receive (encrypted) message, forward plaintext to clients
* POST-TESTING:
* - revisit API (which arguments are used, needed)?
- * - add code to bound queue size when handling client's SEND message
* - add code to bound message queue size when passing messages to clients
- * - add code to discard_expired_messages
* - add code to re-transmit key if first attempt failed
* + timeout on connect / key exchange, etc.
* + timeout for automatic re-try, etc.
#define DEFAULT_BPM_IN_OUT 2048
+/**
+ * After how much time past the "official" expiration time do
+ * we discard messages? Should not be zero since we may
+ * intentionally defer transmission until close to the deadline
+ * and then may be slightly past the deadline due to inaccuracy
+ * in sleep and our own CPU consumption.
+ */
+#define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
+
+
/**
* What is the maximum delay for a SET_KEY message?
*/
#define PONG_PRIORITY 0xFFFFFF
+/**
+ * How many messages do we queue per peer at most?
+ */
+#define MAX_PEER_QUEUE_SIZE 16
+
+
/**
* What is the maximum age of a message for us to consider
* processing it? Note that this looks at the timestamp used
{
struct Client *pos;
struct Client *prev;
+ struct Event *e;
#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
prev->next = pos->next;
if (pos->th != NULL)
GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
+ while (NULL != (e = pos->event_head))
+ {
+ pos->event_head = e->next;
+ GNUNET_free (e);
+ }
GNUNET_free (pos);
return;
}
static void
process_encrypted_neighbour_queue (struct Neighbour *n)
{
+ struct MessageEntry *m;
+
if (n->th != NULL)
return; /* request already pending */
if (n->encrypted_head == NULL)
{
/* message request too large (oops) */
GNUNET_break (0);
- /* FIXME: handle error somehow! */
+ /* discard encrypted message */
+ GNUNET_assert (NULL != (m = n->encrypted_head));
+ n->encrypted_head = m->next;
+ if (m->next == NULL)
+ n->encrypted_tail = NULL;
+ GNUNET_free (m);
+ process_encrypted_neighbour_queue (n);
}
}
static void
discard_expired_messages (struct Neighbour *n)
{
- /* FIXME */
+ struct MessageEntry *prev;
+ struct MessageEntry *next;
+ struct MessageEntry *pos;
+ struct GNUNET_TIME_Absolute cutoff;
+
+ cutoff = GNUNET_TIME_relative_to_absolute(PAST_EXPIRATION_DISCARD_TIME);
+ prev = NULL;
+ pos = n->messages;
+ while (pos != NULL)
+ {
+ next = pos->next;
+ if (pos->deadline.value < cutoff.value)
+ {
+ if (prev == NULL)
+ n->messages = next;
+ else
+ prev->next = next;
+ GNUNET_free (pos);
+ }
+ else
+ prev = pos;
+ pos = next;
+ }
}
GNUNET_i2s (&sm->peer));
#endif
GNUNET_free (sm);
- /* FIXME: do we need to do something here to let the
- client know about the failure!? */
return 0;
}
#if DEBUG_CORE
struct SendMessage *smc;
const struct GNUNET_MessageHeader *mh;
struct Neighbour *n;
- struct MessageEntry *pred;
+ struct MessageEntry *prev;
struct MessageEntry *pos;
- struct MessageEntry *e;
+ struct MessageEntry *e;
+ struct MessageEntry *min_prio_entry;
+ struct MessageEntry *min_prio_prev;
+ unsigned int min_prio;
+ unsigned int queue_size;
uint16_t msize;
msize = ntohs (message->size);
#endif
msize += sizeof (struct SendMessage);
/* ask transport to connect to the peer */
- /* FIXME: this code does not handle the
- case where we get multiple SendMessages before
- transport responds to this request;
- => need to track pending requests! */
smc = GNUNET_malloc (msize);
memcpy (smc, sm, msize);
- GNUNET_TRANSPORT_notify_transmit_ready (transport,
- &sm->peer,
- 0,
- GNUNET_TIME_absolute_get_remaining
- (GNUNET_TIME_absolute_ntoh
- (sm->deadline)),
- &send_connect_continuation,
- smc);
+ if (NULL ==
+ GNUNET_TRANSPORT_notify_transmit_ready (transport,
+ &sm->peer,
+ 0,
+ GNUNET_TIME_absolute_get_remaining
+ (GNUNET_TIME_absolute_ntoh
+ (sm->deadline)),
+ &send_connect_continuation,
+ smc))
+ {
+ /* transport has already a request pending for this peer! */
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Dropped second message destined for `%4s' since connection is still down.\n",
+ GNUNET_i2s(&sm->peer));
+#endif
+ GNUNET_free (smc);
+ }
if (client != NULL)
GNUNET_SERVER_receive_done (client, GNUNET_OK);
return;
msize,
GNUNET_i2s (&sm->peer));
#endif
- /* FIXME: consider bounding queue size */
+ /* bound queue size */
+ discard_expired_messages (n);
+ min_prio = (unsigned int) -1;
+ queue_size = 0;
+ prev = NULL;
+ pos = n->messages;
+ while (pos != NULL)
+ {
+ if (pos->priority < min_prio)
+ {
+ min_prio_entry = pos;
+ min_prio_prev = prev;
+ min_prio = pos->priority;
+ }
+ queue_size++;
+ prev = pos;
+ pos = pos->next;
+ }
+ if (queue_size >= MAX_PEER_QUEUE_SIZE)
+ {
+ /* queue full */
+ if (ntohl(sm->priority) <= min_prio)
+ {
+ /* discard new entry */
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queue full, discarding new request\n");
+#endif
+ if (client != NULL)
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ return;
+ }
+ /* discard "min_prio_entry" */
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queue full, discarding existing older request\n");
+#endif
+ if (min_prio_prev == NULL)
+ n->messages = min_prio_entry->next;
+ else
+ min_prio_prev->next = min_prio_entry->next;
+ GNUNET_free (min_prio_entry);
+ }
+
e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
e->priority = ntohl (sm->priority);
memcpy (&e[1], mh, msize);
/* insert, keep list sorted by deadline */
- pred = NULL;
+ prev = NULL;
pos = n->messages;
while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
{
- pred = pos;
+ prev = pos;
pos = pos->next;
}
- if (pred == NULL)
+ if (prev == NULL)
n->messages = e;
else
- pred->next = e;
+ prev->next = e;
e->next = pos;
/* consider scheduling now */
cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Neighbour *n;
+ struct Client *c;
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
neighbours = n->next;
free_neighbour (n);
}
+ while (NULL != (c = clients))
+ handle_client_disconnect (NULL, c->client_handle);
}
if (my_private_key != NULL)
GNUNET_CRYPTO_rsa_key_free (my_private_key);
- /*
- FIXME:
- - free clients
- */
}