* All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
*
* TODO:
- * - properly implement GLOBAL message buffer, instead of per-route buffers
* - do NOT use buffering if the route options say no buffer!
* - Optimization: given BROKEN messages, destroy paths (?)
*/
* Total number of route directions in this rung.
*/
unsigned int num_routes;
-};
-
-/**
- * Number of messages we are willing to buffer per route.
- * FIXME: have global buffer pool instead!
- */
-#define ROUTE_BUFFER_SIZE 8
+ /**
+ * Number of messages route directions at this rung have
+ * in their buffer.
+ */
+ unsigned int rung_off;
+};
/**
*/
struct GCP_MessageQueueManager *mqm;
- /**
- * Cyclic message buffer to @e hop.
- */
- struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
-
- /**
- * Next write offset to use to append messages to @e out_buffer.
- */
- unsigned int out_wpos;
-
- /**
- * Next read offset to use to retrieve messages from @e out_buffer.
- */
- unsigned int out_rpos;
-
/**
* Is @e mqm currently ready for transmission?
*/
*/
static struct GNUNET_CONTAINER_Heap *route_heap;
+/**
+ * Rung zero (always pointed to by #rung_head).
+ */
+static struct Rung rung_zero;
+
+/**
+ * DLL of rungs, with the head always point to a rung of
+ * route directions with no messages in the queue.
+ */
+static struct Rung *rung_head = &rung_zero;
+
+/**
+ * Tail of the #rung_head DLL.
+ */
+static struct Rung *rung_tail = &rung_zero;
+
/**
* Maximum number of concurrent routes this peer will support.
*/
}
+/**
+ * Lower the rung in which @a dir is by 1.
+ *
+ * @param dir direction to lower in rung.
+ */
+static void
+lower_rung (struct RouteDirection *dir)
+{
+ struct Rung *rung = dir->rung;
+ struct Rung *prev;
+
+ GNUNET_CONTAINER_DLL_remove (rung->rd_head,
+ rung->rd_tail,
+ dir);
+ prev = rung->prev;
+ GNUNET_assert (NULL != prev);
+ if (prev->rung_off != rung->rung_off - 1)
+ {
+ prev = GNUNET_new (struct Rung);
+ prev->rung_off = rung->rung_off - 1;
+ GNUNET_CONTAINER_DLL_insert_after (rung_head,
+ rung_tail,
+ prev,
+ rung);
+ }
+ else
+ {
+ rung = prev;
+ }
+ GNUNET_assert (NULL != rung);
+ GNUNET_CONTAINER_DLL_insert (rung->rd_head,
+ rung->rd_tail,
+ dir);
+
+}
+
+
+/**
+ * Discard the buffer @a env from the route direction @a dir and
+ * move @a dir down a rung.
+ *
+ * @param dir direction that contains the @a env in the buffer
+ * @param env envelope to discard
+ */
+static void
+discard_buffer (struct RouteDirection *dir,
+ struct GNUNET_MQ_Envelope *env)
+{
+ GNUNET_MQ_dll_remove (&dir->env_head,
+ &dir->env_tail,
+ env);
+ cur_buffers--;
+ GNUNET_MQ_discard (env);
+ lower_rung (dir);
+}
+
+
+/**
+ * Discard all messages from the highest rung, to make space.
+ */
+static void
+discard_all_from_rung_tail ()
+{
+ struct Rung *tail = rung_tail;
+ struct RouteDirection *dir;
+
+ while (NULL != (dir = tail->rd_head))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queue full due new message %s on connection %s, dropping old message\n",
+ GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
+ GNUNET_STATISTICS_update (stats,
+ "# messages dropped due to full buffer",
+ 1,
+ GNUNET_NO);
+ discard_buffer (dir,
+ dir->env_head);
+ }
+ GNUNET_CONTAINER_DLL_remove (rung_head,
+ rung_tail,
+ tail);
+ GNUNET_free (tail);
+}
+
+
/**
* We message @a msg from @a prev. Find its route by @a cid and
* forward to the next hop. Drop and signal broken route if we do not
{
struct CadetRoute *route;
struct RouteDirection *dir;
+ struct Rung *rung;
+ struct Rung *nxt;
struct GNUNET_MQ_Envelope *env;
route = get_route (cid);
GNUNET_MQ_msg_copy (msg));
return;
}
- env = dir->out_buffer[dir->out_wpos];
- if (NULL != env)
+ rung = dir->rung;
+ if (cur_buffers == max_buffers)
{
- /* Queue full, drop earliest message in queue */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queue full due to new message of type %u from %s to %s on connection %s, dropping old message\n",
- ntohs (msg->type),
- GCP_2s (prev),
- GNUNET_i2s (GCP_get_id (dir->hop)),
- GNUNET_sh2s (&cid->connection_of_tunnel));
- GNUNET_STATISTICS_update (stats,
- "# messages dropped due to full buffer",
- 1,
- GNUNET_NO);
- GNUNET_assert (dir->out_rpos == dir->out_wpos);
- GNUNET_MQ_discard (env);
- dir->out_rpos++;
- if (ROUTE_BUFFER_SIZE == dir->out_rpos)
- dir->out_rpos = 0;
+ /* Need to make room. */
+ if (NULL != rung->next)
+ {
+ /* Easy case, drop messages from route directions in highest rung */
+ discard_all_from_rung_tail ();
+ }
+ else
+ {
+ /* We are in the highest rung, drop our own! */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queue full due new message %s on connection %s, dropping old message\n",
+ GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
+ GNUNET_STATISTICS_update (stats,
+ "# messages dropped due to full buffer",
+ 1,
+ GNUNET_NO);
+ discard_buffer (dir,
+ dir->env_head);
+ rung = dir->rung;
+ }
+ }
+ /* remove 'dir' from current rung */
+ GNUNET_CONTAINER_DLL_remove (rung->rd_head,
+ rung->rd_tail,
+ dir);
+ /* make 'nxt' point to the next higher rung, creat if necessary */
+ nxt = rung->next;
+ if ( (NULL == nxt) ||
+ (rung->rung_off + 1 != nxt->rung_off) )
+ {
+ nxt = GNUNET_new (struct Rung);
+ nxt->rung_off = rung->rung_off + 1;
+ GNUNET_CONTAINER_DLL_insert_after (rung_head,
+ rung_tail,
+ rung,
+ nxt);
}
+ /* insert 'dir' into next higher rung */
+ GNUNET_CONTAINER_DLL_insert (nxt->rd_head,
+ nxt->rd_tail,
+ dir);
+ /* add message into 'dir' buffer */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Queueing new message of type %u from %s to %s on connection %s\n",
ntohs (msg->type),
GNUNET_i2s (GCP_get_id (dir->hop)),
GNUNET_sh2s (&cid->connection_of_tunnel));
env = GNUNET_MQ_msg_copy (msg);
- dir->out_buffer[dir->out_wpos] = env;
- dir->out_wpos++;
- if (ROUTE_BUFFER_SIZE == dir->out_wpos)
- dir->out_wpos = 0;
+ GNUNET_MQ_dll_insert_tail (&dir->env_head,
+ &dir->env_tail,
+ env);
+ cur_buffers++;
+ /* Clean up 'rung' if now empty (and not head) */
+ if ( (NULL == rung->rd_head) &&
+ (rung != rung_head) )
+ {
+ GNUNET_CONTAINER_DLL_remove (rung_head,
+ rung_tail,
+ rung);
+ GNUNET_free (rung);
+ }
}
static void
destroy_direction (struct RouteDirection *dir)
{
- for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
- if (NULL != dir->out_buffer[i])
- {
- GNUNET_MQ_discard (dir->out_buffer[i]);
- dir->out_buffer[i] = NULL;
- }
+ struct GNUNET_MQ_Envelope *env;
+
+ while (NULL != (env = dir->env_head))
+ {
+ GNUNET_STATISTICS_update (stats,
+ "# messages dropped due to route destruction",
+ 1,
+ GNUNET_NO);
+ discard_buffer (dir,
+ env);
+ }
if (NULL != dir->mqm)
{
GCP_request_mq_cancel (dir->mqm,
NULL);
dir->mqm = NULL;
}
+ GNUNET_CONTAINER_DLL_remove (rung_head->rd_head,
+ rung_head->rd_tail,
+ dir);
}
struct GNUNET_MQ_Envelope *env;
dir->is_ready = GNUNET_YES;
- if (NULL != (env = dir->out_buffer[dir->out_rpos]))
+ if (NULL != (env = dir->env_head))
{
- dir->out_buffer[dir->out_rpos] = NULL;
- dir->out_rpos++;
- if (ROUTE_BUFFER_SIZE == dir->out_rpos)
- dir->out_rpos = 0;
+ GNUNET_MQ_dll_remove (&dir->env_head,
+ &dir->env_tail,
+ env);
+ cur_buffers--;
+ lower_rung (dir);
dir->is_ready = GNUNET_NO;
GCP_send (dir->mqm,
env);