tunnel_send_fwd_data_ack (struct MeshTunnel *t)
{
struct GNUNET_MESH_DataACK msg;
+ struct MeshTunnelReliability *rel;
+ struct MeshReliableMessage *copy;
+ uint64_t mask;
+ unsigned int i;
+ unsigned int delta;
msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK);
msg.header.size = htons (sizeof (msg));
msg.tid = htonl (t->id.tid);
GNUNET_PEER_resolve (t->id.oid, &msg.oid);
msg.mid = GNUNET_htonll (t->bck_rel->mid_recv - 1);
- msg.futures = 0; // FIXME set bits of other newer messages received
+ msg.futures = 0;
+ rel = t->bck_rel;
+ for (i = 0, copy = rel->head_recv;
+ i < 64 && NULL != copy;
+ i++, copy = copy->next)
+ {
+ delta = copy->mid - t->bck_rel->mid_recv;
+ mask = 0x1 << delta;
+ msg.futures |= mask;
+ }
+ msg.futures = GNUNET_htonll (msg.futures);
send_prebuilt_message (&msg.header, t->prev_hop, t);
}
}
+/**
+ * Send up to 64 buffered messages to the client for in order delivery.
+ *
+ * @param t Tunnel on which to empty the message buffer.
+ */
+static void
+tunnel_send_client_buffered_ucast (struct MeshTunnel *t)
+{
+ struct MeshTunnelReliability *rel;
+ struct MeshReliableMessage *copy;
+ struct MeshReliableMessage *next;
+
+ rel = t->bck_rel;
+ for (copy = rel->head_recv; NULL != copy; copy = next)
+ {
+ next = copy->next;
+ if (copy->mid == rel->mid_recv)
+ {
+ struct GNUNET_MESH_Data *msg = (struct GNUNET_MESH_Data *) ©[1];
+
+ tunnel_send_client_ucast (t, msg);
+ rel->mid_recv++;
+ GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
+ GNUNET_free (copy);
+ }
+ else
+ {
+ return;
+ }
+ }
+}
+
+
+/**
+ * We have received a message out of order, buffer it until we receive
+ * the missing one and we can feed the rest to the client.
+ */
+static void
+tunnel_add_buffer_ucast (struct MeshTunnel *t,
+ const struct GNUNET_MESH_Data *msg)
+{
+ struct MeshTunnelReliability *rel;
+ struct MeshReliableMessage *copy;
+ struct MeshReliableMessage *prev;
+ uint64_t mid;
+ uint16_t size;
+
+ rel = t->bck_rel;
+ size = ntohs (msg->header.size);
+ mid = GNUNET_ntohll (msg->mid);
+
+ copy = GNUNET_malloc (sizeof (*copy) + size);
+ memcpy (©[1], msg, size);
+
+ // FIXME do something better than O(n), although n < 64...
+ for (prev = rel->head_recv; NULL != prev; prev = prev->next)
+ {
+ if (mid < prev->mid)
+ GNUNET_CONTAINER_DLL_insert_before (rel->head_recv, rel->tail_recv,
+ prev, copy);
+ return;
+ }
+ GNUNET_CONTAINER_DLL_insert_tail (rel->head_recv, rel->tail_recv, copy);
+}
+
+
+/**
+ * Mark future messages as ACK'd.
+ *
+ * @param t Tunnel whose sent buffer to clean.
+ * @param msg DataACK message with a bitfield of future ACK'd messages.
+ */
+static void
+tunnel_free_buffer_ucast (struct MeshTunnel *t,
+ const struct GNUNET_MESH_DataACK *msg)
+{
+ struct MeshTunnelReliability *rel;
+ struct MeshReliableMessage *copy;
+ struct MeshReliableMessage *next;
+ uint64_t bitfield;
+ uint64_t mask;
+ uint64_t mid;
+ uint64_t target;
+ unsigned int i;
+
+ bitfield = GNUNET_ntohll (msg->futures);
+ mid = GNUNET_ntohll (msg->mid);
+ rel = t->fwd_rel;
+ for (i = 0, copy = rel->head_recv;
+ i < 64 && NULL != copy && 0 != bitfield;
+ i++, copy = next)
+ {
+ mask = 0x1 << i;
+ if (0 == (bitfield & mask))
+ continue;
+
+ /* Bit was set, clear the bit from the bitfield */
+ bitfield &= ~mask;
+
+ /* The i-th bit was set. Do we have that copy? */
+ /* Skip copies with mid < target */
+ target = mid + i + 1;
+ while (NULL != copy && copy->mid < target)
+ copy = copy->next;
+
+ /* Did we run out of copies? (previously freed, it's ok) */
+ if (NULL == copy)
+ return;
+
+ /* Did we overshoot the target? (previously freed, it's ok) */
+ if (copy->mid > target)
+ {
+ next = copy;
+ continue;
+ }
+
+ /* Now copy->mid == target, free it */
+ copy = copy->next;
+ GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
+ GNUNET_free (copy);
+ }
+}
+
+
/**
* Modify the to_origin message TID from global to local and send to client.
*
t->prev_fc.last_pid_recv = pid;
if (GNUNET_NO == t->reliable ||
- (mid >= t->bck_rel->mid_recv && mid < t->bck_rel->mid_recv + 64))
+ (mid >= t->bck_rel->mid_recv && mid <= t->bck_rel->mid_recv + 64))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"!!! RECV %llu\n", GNUNET_ntohll(msg->mid));
if (GNUNET_YES == t->reliable)
{
+ /* Is this the exact next expected messasge? */
if (mid == t->bck_rel->mid_recv)
+ {
t->bck_rel->mid_recv++;
+ tunnel_send_client_ucast (t, msg);
+ tunnel_send_client_buffered_ucast (t);
+ }
+ else
+ {
+ tunnel_add_buffer_ucast (t, msg);
+ }
}
- tunnel_send_client_ucast (t, msg);
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- " MID %llu not expected (%llu), dropping!\n",
- GNUNET_ntohll (msg->mid), t->bck_rel->mid_recv);
+ " MID %llu not expected (%llu - %llu), dropping!\n",
+ GNUNET_ntohll (msg->mid),
+ t->bck_rel->mid_recv, t->bck_rel->mid_recv + 64LL);
}
}
else
if (copy->mid > ack)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! head %llu, out!\n", copy->mid);
+ tunnel_free_buffer_ucast (t, msg);
break;
}
work = GNUNET_YES;