#include "gnunet-service-core_typemap.h"
/**
- * How many messages do we queue up at most for optional
- * notifications to a client? (this can cause notifications
- * about outgoing messages to be dropped).
+ * How many messages do we queue up at most for any client? This can
+ * cause messages to be dropped if clients do not process them fast
+ * enough! Note that this is a soft limit; we try
+ * to keep a few larger messages above the limit.
*/
-#define MAX_NOTIFY_QUEUE 1024
+#define SOFT_MAX_QUEUE 128
+
+/**
+ * How many messages do we queue up at most for any client? This can
+ * cause messages to be dropped if clients do not process them fast
+ * enough! Note that this is the hard limit.
+ */
+#define HARD_MAX_QUEUE 256
/**
* Message queue to talk to @e client.
*/
struct GNUNET_MQ_Handle *mq;
-
+
/**
* Array of the types of messages this peer cares
* about (with @e tcnt entries). Allocated as part
* of this client struct, do not free!
*/
uint16_t *types;
-
+
/**
* Map of peer identities to active transmission requests of this
* client to the peer (of type `struct GSC_ClientActiveRequest`).
*/
uint32_t options;
+ /**
+ * Have we gotten the #GNUNET_MESSAGE_TYPE_CORE_INIT message
+ * from this client already?
+ */
+ int got_init;
+
/**
* Number of types of incoming messages this client
* specifically cares about. Size of the @e types array.
* Check #GNUNET_MESSAGE_TYPE_CORE_INIT request.
*
* @param cls client that sent #GNUNET_MESSAGE_TYPE_CORE_INIT
- * @param im the `struct InitMessage`
+ * @param im the `struct InitMessage`
* @return #GNUNET_OK if @a im is well-formed
*/
static int
* Handle #GNUNET_MESSAGE_TYPE_CORE_INIT request.
*
* @param cls client that sent #GNUNET_MESSAGE_TYPE_CORE_INIT
- * @param im the `struct InitMessage`
+ * @param im the `struct InitMessage`
*/
static void
handle_client_init (void *cls,
types = (const uint16_t *) &im[1];
c->tcnt = msize / sizeof (uint16_t);
c->options = ntohl (im->options);
+ c->got_init = GNUNET_YES;
all_client_options |= c->options;
c->types = GNUNET_malloc (msize);
- c->connectmap = GNUNET_CONTAINER_multipeermap_create (16,
- GNUNET_NO);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_put (c->connectmap,
&GSC_my_identity,
struct GNUNET_MQ_Handle *mq)
{
struct GSC_Client *c;
-
+
c = GNUNET_new (struct GSC_Client);
c->client = client;
c->mq = mq;
+ c->connectmap = GNUNET_CONTAINER_multipeermap_create (16,
+ GNUNET_NO);
GNUNET_CONTAINER_DLL_insert (client_head,
client_tail,
c);
NULL);
GNUNET_CONTAINER_multipeermap_destroy (c->requests);
}
- if (NULL != c->connectmap)
- {
- GNUNET_CONTAINER_multipeermap_destroy (c->connectmap);
- c->connectmap = NULL;
- }
+ GNUNET_CONTAINER_multipeermap_destroy (c->connectmap);
+ c->connectmap = NULL;
if (NULL != c->types)
{
GSC_TYPEMAP_remove (c->types,
int old_match;
int new_match;
+ if (GNUNET_YES != client->got_init)
+ return;
old_match = GSC_TYPEMAP_test_match (tmap_old,
client->types,
client->tcnt);
new_match = GSC_TYPEMAP_test_match (tmap_new,
client->types,
client->tcnt);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Notifying client about neighbour %s (%d/%d)\n",
+ GNUNET_i2s (neighbour),
+ old_match,
+ new_match);
if (old_match == new_match)
{
GNUNET_assert (old_match ==
GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
cnm->reserved = htonl (0);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending NOTIFY_CONNECT message to client.\n");
+ "Sending NOTIFY_CONNECT message about peer %s to client.\n",
+ GNUNET_i2s (neighbour));
cnm->peer = *neighbour;
GNUNET_MQ_send (client->mq,
env);
{
struct DisconnectNotifyMessage *dcm;
- /* send disconnect */
+ /* send disconnect */
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_contains (client->connectmap,
neighbour));
env = GNUNET_MQ_msg (dcm,
GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
dcm->reserved = htonl (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending NOTIFY_DISCONNECT message about peer %s to client.\n",
+ GNUNET_i2s (neighbour));
dcm->peer = *neighbour;
GNUNET_MQ_send (client->mq,
env);
struct GNUNET_MQ_Envelope *env;
struct NotifyTrafficMessage *ntm;
uint16_t mtype;
+ unsigned int qlen;
int tm;
tm = type_match (ntohs (msg->type),
if ( (0 != (options & GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND)) &&
(0 != (c->options & GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND)) )
continue;
+
+ /* Drop messages if:
+ 1) We are above the hard limit, or
+ 2) We are above the soft limit, and a coin toss limited
+ to the message size (giving larger messages a
+ proportionally higher chance of being queued) falls
+ below the threshold. The threshold is based on where
+ we are between the soft and the hard limit, scaled
+ to match the range of message sizes we usually encounter
+ (i.e. up to 32k); so a 64k message has a 50% chance of
+ being kept if we are just barely below the hard max,
+ and a 99% chance of being kept if we are at the soft max.
+ The reason is to make it more likely to drop control traffic
+ (ACK, queries) which may be cummulative or highly redundant,
+ and cheap to drop than data traffic. */
+ qlen = GNUNET_MQ_get_length (c->mq);
+ if ( (qlen >= HARD_MAX_QUEUE) ||
+ ( (qlen > SOFT_MAX_QUEUE) &&
+ ( (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ ntohs (msg->size)) ) <
+ (qlen - SOFT_MAX_QUEUE) * 0x8000 /
+ (HARD_MAX_QUEUE - SOFT_MAX_QUEUE) ) ) )
+ {
+ char buf[1024];
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
+ "Dropping decrypted message of type %u as client is too busy (queue full)\n",
+ (unsigned int) ntohs (msg->type));
+ GNUNET_snprintf (buf,
+ sizeof (buf),
+ gettext_noop ("# messages of type %u discarded (client busy)"),
+ (unsigned int) ntohs (msg->type));
+ GNUNET_STATISTICS_update (GSC_stats,
+ buf,
+ 1,
+ GNUNET_NO);
+ continue;
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending %u message with %u bytes to client interested in messages of type %u.\n",
options,
sender)) );
GNUNET_MQ_send (c->mq,
env);
- }
+ }
}
const struct GNUNET_MessageHeader *message)
{
struct GSC_Client *c = cls;
-
+
GNUNET_SERVICE_client_continue (c->client);
GSC_KX_handle_client_monitor_peers (c->mq);
}
NULL,
GNUNET_MQ_hd_var_size (client_init,
GNUNET_MESSAGE_TYPE_CORE_INIT,
- struct InitMessage,
+ struct InitMessage,
NULL),
GNUNET_MQ_hd_fixed_size (client_monitor_peers,
GNUNET_MESSAGE_TYPE_CORE_MONITOR_PEERS,