{
struct MeshPeerQueue *queue;
struct GNUNET_PeerIdentity id;
- unsigned int *n;
+ struct MeshFlowControl *fc;
+ uint32_t pid;
+ uint32_t pid_q;
+ int priority;
- n = NULL;
+ fc = NULL;
+ priority = GNUNET_NO;
if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type)
{
- n = &t->next_fc.queue_n;
+ fc = &t->next_fc;
+ pid = ntohl (((struct GNUNET_MESH_Data *)cls)->pid);
}
else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
{
- n = &t->prev_fc.queue_n;
+ fc = &t->prev_fc;
+ pid = ntohl (((struct GNUNET_MESH_Data *)cls)->pid);
}
- if (NULL != n)
+ if (NULL != fc)
{
- if (*n >= t->queue_max)
+ if (fc->queue_n >= t->queue_max)
{
- GNUNET_break(0);
+ GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"queue full: %u/%u\n",
- *n, t->queue_max);
- GNUNET_STATISTICS_update(stats,
- "# messages dropped (buffer full)",
- 1, GNUNET_NO);
- return; /* Drop message */
+ fc->queue_n, t->queue_max);
+ GNUNET_STATISTICS_update (stats,
+ "# messages dropped (buffer full)",
+ 1, GNUNET_NO);
+ /* Get the PID of the oldest message in the queue */
+ for (queue = dst->queue_head; queue != NULL; queue = queue->next)
+ if (queue->type == type && queue->tunnel == t)
+ {
+ pid_q = ntohl (((struct GNUNET_MESH_Data *)(queue->cls))->pid);
+ break;
+ }
+ GNUNET_assert (NULL != queue);
+
+ /* If this is an earlier message that that, give it priority:
+ * - drop the newest message in the queue
+ * - instert current one at the end of the queue (first to get out)
+ */
+ if (GNUNET_YES == t->reliable && GMC_is_pid_bigger(pid_q, pid))
+ {
+ for (queue = dst->queue_tail; queue != NULL; queue = queue->prev)
+ if (queue->type == type && queue->tunnel == t)
+ {
+ /* Drop message from queue */
+ GNUNET_CONTAINER_DLL_remove (dst->queue_head,
+ dst->queue_tail,
+ queue);
+ queue_destroy (queue, GNUNET_YES);
+ fc->queue_n--;
+ t->pending_messages--;
+ priority = GNUNET_YES;
+ break;
+ }
+ }
+ else
+ return; /* Drop this message */
}
- (*n)++;
+ fc->queue_n++;
}
queue = GNUNET_malloc (sizeof (struct MeshPeerQueue));
queue->cls = cls;
queue->size = size;
queue->peer = dst;
queue->tunnel = t;
- GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
+ if (GNUNET_YES == priority)
+ GNUNET_CONTAINER_DLL_insert (dst->queue_head, dst->queue_tail, queue);
+ else
+ GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
if (NULL == dst->core_transmit)
{
GNUNET_PEER_resolve (dst->id, &id);