* Last ACK sent to that child (BCK ACK).
*/
uint32_t bck_ack;
+
+ /**
+ * Circular buffer pointing to MeshPeerQueue elements.
+ * Size determined by the tunnel queue size.
+ */
+ struct MeshPeerQueue **send_buffer;
+
+ /**
+ * Index of the oldest element in the send_buffer.
+ */
+ unsigned int send_buffer_start;
+
+ /**
+ * How many elements are already in the buffer.
+ */
+ unsigned int send_buffer_n;
};
queue_add (void *cls, uint16_t type, size_t size,
struct MeshPeerInfo *dst, struct MeshTunnel *t);
+
/**
* Free a transmission that was already queued with all resources
* associated to the request.
static void
queue_destroy (struct MeshPeerQueue *queue, int clear_cls);
+
+/**
+ * @brief Get the next transmittable message from the queue.
+ *
+ * This will be the head, except in the case of being a data packet
+ * not allowed by the destination peer.
+ *
+ * @param peer Destination peer.
+ *
+ * @return The next viable MeshPeerQueue element to send to that peer.
+ * NULL when there are no transmittable messages.
+ */
+struct MeshPeerQueue *
+queue_get_next (const struct MeshPeerInfo *peer);
+
+
+/**
+ * Core callback to write a queued packet to core buffer
+ *
+ * @param cls Closure (peer info).
+ * @param size Number of bytes available in buf.
+ * @param buf Where the to write the message.
+ *
+ * @return number of bytes written to buf
+ */
+static size_t
+queue_send (void *cls, size_t size, void *buf);
+
/******************************************************************************/
/************************ ITERATORS ****************************/
/******************************************************************************/
const struct GNUNET_HashCode * key,
void *value)
{
- GNUNET_free (value);
+ struct MeshTunnelChildInfo *cinfo = value;
+ struct MeshTunnel *t = cls;
+ unsigned int c;
+ unsigned int i;
+
+ for (c = 0; c < cinfo->send_buffer_n; c++)
+ {
+ i = (cinfo->send_buffer_start + c) % t->fwd_queue_max;
+ if (NULL != cinfo->send_buffer[i])
+ queue_destroy(cinfo->send_buffer[i], GNUNET_YES);
+ else
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u %u\n", c, cinfo->send_buffer_n);
+ }
+ GNUNET_free_non_null (cinfo->send_buffer);
+ GNUNET_free (cinfo);
return GNUNET_YES;
}
tree_iterate_children (t->tree, &tunnel_send_multicast_iterator, mdata);
if (*(mdata->reference_counter) == 0)
{
- GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" no one to send data to\n");
GNUNET_free (mdata->data);
cinfo->fwd_ack = t->fwd_pid + delta;
cinfo->bck_ack = delta;
+ cinfo->send_buffer =
+ GNUNET_malloc (sizeof(struct MeshPeerQueue *) * t->fwd_queue_max);
+
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (t->children_fc,
&peer->hashPubKey,
if (-1 == child_ack)
{
// Node has no children, child_ack AND core buffer are irrelevant.
- GNUNET_break (-1 != client_ack); // No children AND no clients? Not good! // FIXME fc
+ GNUNET_break (-1 != client_ack); // No children AND no clients? Not good!
return (uint32_t) client_ack;
}
}
+/**
+ * @brief Re-initiate traffic to this peer if necessary.
+ *
+ * Check if there is traffic queued towards this peer
+ * and the core transmit handle is NULL (traffic was stalled).
+ * If so, call core tmt rdy.
+ *
+ * @param cls Closure (unused)
+ * @param peer_id Short ID of peer to which initiate traffic.
+ */
+static void
+peer_unlock_queue(void *cls, GNUNET_PEER_Id peer_id)
+{
+ struct MeshPeerInfo *peer;
+ struct GNUNET_PeerIdentity id;
+ struct MeshPeerQueue *q;
+ size_t size;
+
+ peer = peer_info_get_short(peer_id);
+ if (NULL != peer->core_transmit)
+ return;
+
+ q = queue_get_next(peer);
+ if (NULL == q)
+ {
+ /* Might br multicast traffic already sent to this particular peer but
+ * not to other children in this tunnel.
+ * This way t->queue_n would be > 0 but the queue of this particular peer
+ * would be empty.
+ */
+ return;
+ }
+ size = q->size;
+ GNUNET_PEER_resolve (peer->id, &id);
+ peer->core_transmit =
+ GNUNET_CORE_notify_transmit_ready(core_handle,
+ 0,
+ 0,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &id,
+ size,
+ &queue_send,
+ peer);
+ return;
+}
+
+
+/**
+ * @brief Allow transmission of FWD traffic on this tunnel
+ *
+ * Check if there is traffic queued towards any children
+ * and the core transmit handle is NULL, and if so, call core tmt rdy.
+ *
+ * @param t Tunnel on which to unlock FWD traffic.
+ */
+static void
+tunnel_unlock_fwd_queues (struct MeshTunnel *t)
+{
+ if (0 == t->fwd_queue_n)
+ return;
+
+ tree_iterate_children (t->tree, &peer_unlock_queue, NULL);
+}
+
+
+/**
+ * @brief Allow transmission of BCK traffic on this tunnel
+ *
+ * Check if there is traffic queued towards the root of the tree
+ * and the core transmit handle is NULL, and if so, call core tmt rdy.
+ *
+ * @param t Tunnel on which to unlock BCK traffic.
+ */
+static void
+tunnel_unlock_bck_queue (struct MeshTunnel *t)
+{
+ if (0 == t->bck_queue_n)
+ return;
+
+ peer_unlock_queue(NULL, tree_get_predecessor(t->tree));
+}
+
+
/**
* Send a message to all peers in this tunnel that the tunnel is no longer
* valid.
/**
* @brief Get the next transmittable message from the queue.
*
- * This will be the head, expect in the case of being a data packet
+ * This will be the head, except in the case of being a data packet
* not allowed by the destination peer.
*
* @param peer Destination peer.
break;
case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
mcast = (struct GNUNET_MESH_Multicast *) info->mesh_data->data;
+ if (GNUNET_MESSAGE_TYPE_MESH_MULTICAST != ntohs(mcast->header.type))
+ {
+ // Not a multicast payload: multicast control traffic (destroy, etc)
+ return q;
+ }
pid = ntohl (mcast->pid);
GNUNET_PEER_resolve (info->peer->id, &id);
cinfo = tunnel_get_neighbor_fc(t, &id);
struct GNUNET_MessageHeader *msg;
struct MeshPeerQueue *queue;
struct MeshTunnel *t;
+ struct MeshTunnelChildInfo *cinfo;
+ struct GNUNET_PeerIdentity dst_id;
size_t data_size;
peer->core_transmit = NULL;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not empty\n");
+ GNUNET_PEER_resolve (peer->id, &dst_id);
/* Check if buffer size is enough for the message */
if (queue->size > size)
{
- struct GNUNET_PeerIdentity id;
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"********* not enough room, reissue\n");
- GNUNET_PEER_resolve (peer->id, &id);
peer->core_transmit =
GNUNET_CORE_notify_transmit_ready(core_handle,
0,
0,
GNUNET_TIME_UNIT_FOREVER_REL,
- &id,
+ &dst_id,
queue->size,
&queue_send,
peer);
case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* multicast\n");
data_size = send_core_data_multicast(queue->cls, size, buf);
- // t->fwd_queue_n--; FIXME fc
+ // FIXME fc substract when? depending on the tunnel conf.
+ // t->fwd_queue_n--;
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
break;
case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE:
queue->type);
data_size = 0;
}
+ switch (queue->type)
+ {
+ case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
+ case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
+ case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
+ cinfo = tunnel_get_neighbor_fc(t, &dst_id);
+ if (cinfo->send_buffer[cinfo->send_buffer_start] != queue)
+ {
+ GNUNET_break(0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "at pos %u (%p) != %p\n",
+ cinfo->send_buffer_start,
+ cinfo->send_buffer[cinfo->send_buffer_start],
+ queue);
+ }
+ GNUNET_break(cinfo->send_buffer_n > 0);
+ cinfo->send_buffer[cinfo->send_buffer_start] = NULL;
+ cinfo->send_buffer_n--;
+ cinfo->send_buffer_start++;
+ cinfo->send_buffer_start %= t->fwd_queue_max;
+ break;
+ default:
+ break;
+ }
/* Free queue, but cls was freed by send_core_* */
queue_destroy (queue, GNUNET_NO);
struct MeshPeerInfo *dst, struct MeshTunnel *t)
{
struct MeshPeerQueue *queue;
+ struct MeshTunnelChildInfo *cinfo;
+ struct GNUNET_PeerIdentity id;
unsigned int *max;
unsigned int *n;
+ unsigned int i;
n = NULL;
if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type ||
GNUNET_break_op(0); // TODO: kill connection?
else
GNUNET_break(0);
+ GNUNET_STATISTICS_update(stats, "# messages dropped (buffer full)",
+ 1, GNUNET_NO);
return; // Drop message
}
(*n)++;
queue->peer = dst;
queue->tunnel = t;
GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
+ GNUNET_PEER_resolve (dst->id, &id);
if (NULL == dst->core_transmit)
{
- struct GNUNET_PeerIdentity id;
-
- GNUNET_PEER_resolve (dst->id, &id);
dst->core_transmit =
GNUNET_CORE_notify_transmit_ready(core_handle,
0,
&queue_send,
dst);
}
+ if (NULL == n) // Is this internal mesh traffic?
+ return;
+
+ // It's payload, keep track of buffer per peer.
+ cinfo = tunnel_get_neighbor_fc(t, &id);
+ i = (cinfo->send_buffer_start + cinfo->send_buffer_n) % t->fwd_queue_max;
+ if (NULL != cinfo->send_buffer[i])
+ {
+ GNUNET_break (cinfo->send_buffer_n == t->fwd_queue_max); // aka i == start
+ queue_destroy(cinfo->send_buffer[cinfo->send_buffer_start], GNUNET_YES);
+ cinfo->send_buffer_start++;
+ cinfo->send_buffer_start %= t->fwd_queue_max;
+ cinfo->send_buffer_n--;
+ }
+ cinfo->send_buffer[i] = queue;
+ cinfo->send_buffer_n++;
+ if (cinfo->send_buffer_n > t->fwd_queue_max)
+ {
+ GNUNET_break (0);
+ cinfo->send_buffer_n = t->fwd_queue_max;
+ }
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK packet from %s!\n",
GNUNET_i2s (peer));
msg = (struct GNUNET_MESH_ACK *) message;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %s\n",
- GNUNET_MESH_DEBUG_M2S (ntohs (msg[1].header.type)));
+
t = tunnel_get (&msg->oid, ntohl (msg->tid));
if (NULL == t)
ack = ntohl (msg->pid);
/* Is this a forward or backward ACK? */
- if (tree_get_predecessor(t->tree) == GNUNET_PEER_search(peer))
+ if (tree_get_predecessor(t->tree) != GNUNET_PEER_search(peer))
{
struct MeshTunnelChildInfo *cinfo;
cinfo = tunnel_get_neighbor_fc (t, peer);
cinfo->fwd_ack = ack;
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
+ tunnel_unlock_fwd_queues (t);
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " BCK ACK\n");
t->bck_ack = ack;
tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
+ tunnel_unlock_bck_queue (t);
}
- // FIXME fc Unlock queues?
return GNUNET_OK;
}
GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id);
- if (t->owner == c)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is owner)\n");
- else
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is leaf)\n"); GNUNET_break (0);
+ GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
t = tunnel_get_by_local_id (c, tid);
if (NULL == t)
{
- GNUNET_break (0); // FIXME fc
+ GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id);
- if (t->owner == c)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is owner)\n");
- else
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is leaf)\n");
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}