* - error reporting (CREATE/CHANGE/ADD/DEL?) -- new message!
* - partial disconnect reporting -- same as error reporting?
* - add vs create? change vs. keep-alive? same msg or different ones? -- thinking...
- * - speed requirement specification (change?) in mesh API -- API call
* - add ping message
* - relay corking down to core
* - set ttl relative to tree depth
+ * - Add data ACK count in path ACK
* TODO END
*/
*/
uint32_t last_fwd_ack;
+ /**
+ * Last ACK sent towards the next hop (for traffic towards root).
+ */
+ uint32_t last_bck_ack;
+
/**
* BCK ACK value received from the hop towards the owner of the tunnel,
* (previous node / owner): up to what message PID can we sent back to him.
uint32_t bck_ack;
/**
- * How many messages are in the queue.
+ * How many messages are in the forward queue (towards leaves).
*/
- unsigned int queue_n;
+ unsigned int fwd_queue_n;
/**
- * How many messages do we accept in the queue.
+ * How many messages do we accept in the forward queue.
*/
- unsigned int queue_max;
+ unsigned int fwd_queue_max;
+
+ /**
+ * How many messages are in the backward queue (towards origin).
+ */
+ unsigned int bck_queue_n;
+
+ /**
+ * How many messages do we accept in the backward queue.
+ */
+ unsigned int bck_queue_max;
/**
* Last time the tunnel was used
struct MeshClient **clients;
/**
- * FWD ACK value of each active client: up to what message can we transmit
- * to a leaf client.
+ * Flow control info for each client.
*/
- uint32_t *clients_acks;
+ struct MeshTunnelClientInfo *clients_fc;
/**
- * Number of elements in clients/clients_acks
+ * Number of elements in clients/clients_fc
*/
unsigned int nclients;
/**
* Maximum PID allowed (FWD ACK received).
*/
- uint32_t max_pid;
+ uint32_t fwd_ack;
/**
* Last ACK sent to that child (BCK ACK).
*/
- uint32_t last_ack;
+ uint32_t bck_ack;
};
+/**
+ * Info about a leaf client of a tunnel, needed to perform flow control.
+ */
+struct MeshTunnelClientInfo
+{
+ /**
+ * Last sent PID.
+ */
+ uint32_t pid;
+
+ /**
+ * Maximum PID allowed (FWD ACK received).
+ */
+ uint32_t fwd_ack;
+
+ /**
+ * Last ACK sent to that child (BCK ACK).
+ */
+ uint32_t bck_ack;
+};
+
+
+
/**
* Info collected during iteration of child nodes in order to get the ACK value
* for a tunnel.
/**
* Notify a the client of a tunnel about how many more
* payload packages will we accept on a given tunnel,
- * distinguiching between root and leaf clients.
+ * distinguishing between root and leaf clients.
*
* @param c Client whom to send the ACK.
* @param t Tunnel on which to send the ACK.
if (t->clients[i] == c)
{
t->clients[i] = t->clients[t->nclients - 1];
- t->clients_acks[i] = t->clients_acks[t->nclients - 1];
+ t->clients_fc[i] = t->clients_fc[t->nclients - 1];
GNUNET_array_grow (t->clients, t->nclients, t->nclients - 1);
t->nclients++;
- GNUNET_array_grow (t->clients_acks, t->nclients, t->nclients - 1);
+ GNUNET_array_grow (t->clients_fc, t->nclients, t->nclients - 1);
break;
}
}
cinfo = GNUNET_malloc (sizeof (struct MeshTunnelChildInfo));
cinfo->id = GNUNET_PEER_intern (peer);
cinfo->skip = t->fwd_pid;
- cinfo->max_pid = t->fwd_pid + t->queue_max - t->queue_n; // FIXME review
+ cinfo->fwd_ack = t->fwd_pid + t->fwd_queue_max - t->fwd_queue_n; // FIXME review
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (t->children_fc,
GNUNET_PEER_resolve (id, &peer_id);
cinfo = tunnel_get_neighbor_fc (t, &peer_id);
- ack = cinfo->max_pid;
+ ack = cinfo->fwd_ack;
if (0 == ctx->max_child_ack)
ctx->max_child_ack = ack;
static void
tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c)
{
- uint32_t ack;
+ struct MeshTunnelClientInfo clinfo;
GNUNET_array_append (t->clients, t->nclients, c);
t->nclients--;
- ack = t->fwd_pid + 1;
- GNUNET_array_append (t->clients_acks, t->nclients, ack);
+ clinfo.fwd_ack = t->fwd_pid + 1;
+ clinfo.bck_ack = t->bck_ack + 1; // FIXME fc review
+ clinfo.pid = t->fwd_pid;
+ GNUNET_array_append (t->clients_fc, t->nclients, clinfo);
}
{
if (t->clients[i] != c)
continue;
- t->clients_acks[i] = ack;
+ t->clients_fc[i].fwd_ack = ack;
return;
}
GNUNET_break (0);
{
if (t->clients[i] != c)
continue;
- return t->clients_acks[i];
+ return t->clients_fc[i].fwd_ack;
}
GNUNET_break (0);
- return t->clients_acks[0];
+ return UINT32_MAX;
}
unsigned int i;
int64_t ack;
+ if (0 == t->nclients)
+ return -1;
+
for (ack = -1, i = 0; i < t->nclients; i++)
{
if (-1 == ack ||
- (GNUNET_YES == t->speed_min && t->clients_acks[i] < ack) ||
- (GNUNET_NO == t->speed_min && t->clients_acks[i] > ack))
+ (GNUNET_YES == t->speed_min &&
+ GNUNET_YES == is_pid_bigger (ack, t->clients_fc[i].fwd_ack)) ||
+ (GNUNET_NO == t->speed_min &&
+ GNUNET_YES == is_pid_bigger (t->clients_fc[i].fwd_ack, ack)))
{
- ack = t->clients_acks[i];
+ ack = t->clients_fc[i].fwd_ack;
}
}
- if (GNUNET_YES == t->nobuffer && ack > t->fwd_pid)
- ack = t->fwd_pid + 1;
+ if (GNUNET_YES == t->nobuffer && is_pid_bigger(ack, t->fwd_pid))
+ ack = (uint32_t) t->fwd_pid + 1; // Might overflow, it's ok.
return (uint32_t) ack;
}
/**
- * Get the current ack value for a tunnel, taking in account the tunnel
+ * Get the current fwd ack value for a tunnel, taking in account the tunnel
* mode and the status of all children nodes.
*
* @param t Tunnel.
uint32_t ack;
count = t->fwd_pid - t->skip;
- buffer_free = t->queue_max - t->queue_n;
- ack = count + buffer_free; // Might overflow 32bits, it's ok!
+ buffer_free = t->fwd_queue_max - t->fwd_queue_n;
+ ack = count + buffer_free; // Might overflow 32 bits, it's ok!
child_ack = tunnel_get_children_ack (t);
client_ack = tunnel_get_clients_ack (t);
if (-1 == child_ack)
{
// Node has no children, child_ack AND core buffer are irrelevant.
- GNUNET_break (-1 != client_ack); // No children and no clients? Not good!
+ GNUNET_break (-1 != client_ack); // No children AND no clients? Not good!
return (uint32_t) client_ack;
}
if (GNUNET_YES == t->speed_min)
{
- ack = min_pid (child_ack, ack);
- ack = min_pid (client_ack, ack);
+ ack = min_pid ((uint32_t) child_ack, ack);
+ ack = min_pid ((uint32_t) client_ack, ack);
}
else
{
- ack = max_pid (child_ack, ack);
- ack = max_pid (client_ack, ack);
+ ack = max_pid ((uint32_t) child_ack, ack);
+ ack = max_pid ((uint32_t) client_ack, ack);
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "c %u, bf %u, ch %u, cl %u\n",
- count, buffer_free, child_ack, client_ack);
+ if (GNUNET_YES == t->nobuffer && is_pid_bigger(ack, t->fwd_pid))
+ ack = t->fwd_pid + 1; // Might overflow 32 bits, it's ok!
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "c %u, bf %u, ch %u, cl %u, ACK: %u\n",
+ count, buffer_free, child_ack, client_ack, ack);
return ack;
}
/**
- * Get the current ack value for a tunnel, taking in account the tunnel
+ * Get the current bck ack value for a tunnel, taking in account the tunnel
* mode and the status of all children nodes.
*
* @param t Tunnel.
{
uint32_t ack;
- ack = 0;
+ if (GNUNET_YES == t->nobuffer)
+ {
+ if (t->bck_ack > t->bck_pid)
+ {
+ return t->bck_pid + 1;
+ }
+ else
+ {
+ return t->bck_pid;
+ }
+ }
+ ack = t->bck_pid + t->bck_queue_max - t->bck_queue_n;
+ // FIXME fc
return ack;
}
return;
}
/* Is it after unicast / multicast retransmission? */
- if (GNUNET_MESSAGE_TYPE_MESH_ACK != type)
+ switch (type)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ACK via DATA retransmission\n");
- if (GNUNET_YES == t->nobuffer)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, nobuffer\n");
- return;
- }
- if (t->queue_max > t->queue_n * 2)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n");
- return;
- }
+ case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
+ case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "ACK due to FWD DATA retransmission\n");
+ if (GNUNET_YES == t->nobuffer)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, nobuffer\n");
+ return;
+ }
+ if (t->fwd_queue_max > t->fwd_queue_n * 2)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n");
+ return;
+ }
+ break;
+ case GNUNET_MESSAGE_TYPE_MESH_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
+ break;
+ default:
+ GNUNET_break (0);
}
/* Ok, ACK might be necessary, what PID to ACK? */
/* If speed_min and not all children have ack'd, dont send yet */
if (ack == t->last_fwd_ack)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, not ready\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not ready\n");
return;
}
struct GNUNET_MESH_ACK msg;
struct GNUNET_PeerIdentity id;
uint32_t ack;
+ unsigned int i;
+ unsigned int min_d;
+ unsigned int min_i;
if (NULL != t->owner)
{
send_client_tunnel_ack (t->owner, t);
return;
}
- /* Is it after unicast / multicast retransmission? */
- if (GNUNET_MESSAGE_TYPE_MESH_ACK != type)
+ /* Is it after data to_origin retransmission? */
+ switch (type)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ACK via DATA retransmission\n");
- if (GNUNET_YES == t->nobuffer)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, nobuffer\n");
- return;
- }
- if (t->queue_max > t->queue_n * 2)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n");
- return;
- }
+ case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "ACK due to BCK DATA retransmission\n");
+ if (GNUNET_YES == t->nobuffer)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, nobuffer\n");
+ return;
+ }
+ if (t->bck_queue_max > t->bck_queue_n * 2)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n");
+ return;
+ }
+ break;
+ case GNUNET_MESSAGE_TYPE_MESH_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
+ break;
+ default:
+ GNUNET_break (0);
}
/* Ok, ACK might be necessary, what PID to ACK? */
ack = tunnel_get_bck_ack (t);
/* If speed_min and not all children have ack'd, dont send yet */
- if (ack == t->last_fwd_ack)
+ if (ack == t->last_bck_ack)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, not ready\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending BCK ACK, not ready\n");
return;
}
- t->last_fwd_ack = ack;
- msg.pid = htonl (ack);
+ /* Unlock local clients. */
+ if (0 < t->nclients)
+ {
+ struct GNUNET_MESH_LocalAck msg;
- GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id);
+ /* Find client who to allow to send to origin (with lowest buffer space) */
+ /* FIXME fc Round robin? Priority? FIFO? */
+ for (i = 0; i < t->nclients; i++)
+ {
+ unsigned int d;
+
+ d = t->clients_fc[i].bck_ack - t->clients_fc[i].pid;
+ if (0 == i || d < min_d)
+ {
+ min_d = d;
+ min_i = i;
+ }
+ }
+ msg.header.size = htons (sizeof (msg));
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
+ msg.tunnel_id = htonl (t->local_tid_dest);
+ msg.max_pid = t->bck_pid + 1; // FIXME fc
+ GNUNET_SERVER_notification_context_unicast(nc,
+ t->clients[min_i]->handle,
+ &msg.header,
+ GNUNET_NO);
+ }
+ t->last_bck_ack = ack;
+ msg.pid = htonl (ack);
msg.header.size = htons (sizeof (msg));
msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
msg.tid = htonl (t->id.tid);
t = GNUNET_malloc (sizeof (struct MeshTunnel));
t->id.oid = owner;
t->id.tid = tid;
- t->queue_max = (max_msgs_queue / max_tunnels) + 1;
+ t->fwd_queue_max = (max_msgs_queue / max_tunnels) + 1;
+ t->bck_queue_max = t->fwd_queue_max;
t->tree = tree_new (owner);
t->owner = client;
t->bck_ack = 1;
* Resets the tunnel timeout. Starts it if no timeout was running.
*
* @param t Tunnel whose timeout to reset.
+ *
+ * TODO use heap to improve efficiency of scheduler.
*/
static void
tunnel_reset_timeout (struct MeshTunnel *t)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* size ok\n");
t = queue->tunnel;
- t->queue_n--;
+ t->fwd_queue_n--;
/* Fill buf */
switch (queue->type)
{
- case 0: // RAW data (preconstructed message, retransmission, etc)
+ case 0: // RAW data (preconstructed message, retransmission, etc.)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* raw\n");
data_size = send_core_data_raw (queue->cls, size, buf);
msg = (struct GNUNET_MessageHeader *) buf;
- switch (ntohs (msg->type)) // Type of payload
+ switch (ntohs (msg->type)) // Type of preconstructed message
{
case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
/* Free queue, but cls was freed by send_core_* */
queue_destroy (queue, GNUNET_NO);
- if (GNUNET_YES == t->destroy && 0 == t->queue_n)
+ if (GNUNET_YES == t->destroy && 0 == t->fwd_queue_n)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* destroying tunnel!\n");
tunnel_destroy (t);
queue_add (void *cls, uint16_t type, size_t size,
struct MeshPeerInfo *dst, struct MeshTunnel *t)
{
- struct MeshPeerQueue *queue;
-
- if (t->queue_n >= t->queue_max)
- {
- if (NULL == t->owner)
- GNUNET_break_op(0); // TODO: kill connection?
- else
- GNUNET_break(0);
- return; // Drop message
- }
- t->queue_n++;
- queue = GNUNET_malloc (sizeof (struct MeshPeerQueue));
- queue->cls = cls;
- queue->type = type;
- queue->size = size;
- queue->peer = dst;
- queue->tunnel = t;
- GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
- if (NULL == dst->core_transmit)
- {
- struct GNUNET_PeerIdentity id;
+ struct MeshPeerQueue *queue;
+ unsigned int *max;
+ unsigned int *n;
- GNUNET_PEER_resolve (dst->id, &id);
- dst->core_transmit =
- GNUNET_CORE_notify_transmit_ready(core_handle,
- 0,
- 0,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &id,
- size,
- &queue_send,
- dst);
- }
+ if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type ||
+ GNUNET_MESSAGE_TYPE_MESH_MULTICAST == type)
+ {
+ n = &t->fwd_queue_n;
+ max = &t->fwd_queue_max;
+ }
+ else
+ {
+ n = &t->bck_queue_n;
+ max = &t->bck_queue_max;
+ }
+ if (*n >= *max)
+ {
+ if (NULL == t->owner)
+ GNUNET_break_op(0); // TODO: kill connection?
+ else
+ GNUNET_break(0);
+ return; // Drop message
+ }
+ (*n)++;
+ queue = GNUNET_malloc (sizeof (struct MeshPeerQueue));
+ queue->cls = cls;
+ queue->type = type;
+ queue->size = size;
+ queue->peer = dst;
+ queue->tunnel = t;
+ GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
+ if (NULL == dst->core_transmit)
+ {
+ struct GNUNET_PeerIdentity id;
+
+ GNUNET_PEER_resolve (dst->id, &id);
+ dst->core_transmit =
+ GNUNET_CORE_notify_transmit_ready(core_handle,
+ 0,
+ 0,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &id,
+ size,
+ &queue_send,
+ dst);
+ }
}
GNUNET_YES : GNUNET_NO;
if (GNUNET_YES == t->nobuffer)
- t->queue_max = 1;
+ {
+ t->bck_queue_max = 1;
+ t->fwd_queue_max = 1;
+ }
+ // FIXME only assign a local tid if a local client is interested (on demand)
while (NULL != tunnel_get_incoming (next_local_tid))
next_local_tid = (next_local_tid + 1) | GNUNET_MESH_LOCAL_TUNNEL_ID_SERV;
t->local_tid_dest = next_local_tid++;
next_local_tid = next_local_tid | GNUNET_MESH_LOCAL_TUNNEL_ID_SERV;
+ // FIXME end
tunnel_reset_timeout (t);
GNUNET_CRYPTO_hash (&t->local_tid_dest, sizeof (MESH_TunnelNumber), &hash);
/* create path: self not found in path through self */
GNUNET_break_op (0);
path_destroy (path);
- /* FIXME error. destroy tunnel? leave for timeout? */
- return 0;
+ tunnel_destroy (t);
+ return GNUNET_OK;
}
path_add_to_peers (path, GNUNET_NO);
tunnel_add_path (t, path, own_pos);
pid = ntohl (msg->pid);
if (t->fwd_pid == pid)
{
- GNUNET_STATISTICS_update (stats, "# PID drops", 1, GNUNET_NO);
+ GNUNET_STATISTICS_update (stats, "# duplicate PID drops", 1, GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
" Already seen pid %u, DROPPING!\n", pid);
return GNUNET_OK;
}
t->skip += (pid - t->fwd_pid) - 1;
t->fwd_pid = pid;
- tunnel_reset_timeout (t);
if (is_pid_bigger (pid, t->last_fwd_ack))
{
GNUNET_STATISTICS_update (stats, "# not allowed unicast", 1, GNUNET_NO);
GNUNET_break_op (0);
return GNUNET_OK;
- // FIXME peer sent unauthorized data. Break connection? Accept anyway?
}
+ tunnel_reset_timeout (t);
dest_id = GNUNET_PEER_search (&msg->destination);
if (dest_id == myid)
{
" it's for us! sending to clients...\n");
GNUNET_STATISTICS_update (stats, "# unicast received", 1, GNUNET_NO);
send_subscribed_clients (message, (struct GNUNET_MessageHeader *) &msg[1]);
- // ACK sent by client, service retransmits.
+ // ACK is generated by client (api part), service only retransmits.
return GNUNET_OK;
}
ttl = ntohl (msg->ttl);
GNUNET_CONTAINER_multihashmap_iterate (t->children_fc,
&tunnel_add_skip,
&neighbor);
- if (is_pid_bigger(pid, cinfo->max_pid))
+ if (is_pid_bigger(pid, cinfo->fwd_ack))
{
GNUNET_break_op (0);
return GNUNET_OK;
if (t->fwd_pid == pid)
{
/* already seen this packet, drop */
- GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_STATISTICS_update (stats, "# duplicate PID drops", 1, GNUNET_NO);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" Already seen pid %u, DROPPING!\n", pid);
return GNUNET_OK;
}
}
ack = ntohl (msg->pid);
cinfo = tunnel_get_neighbor_fc (t, peer);
- cinfo->max_pid = ack;
+ cinfo->fwd_ack = ack;
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
return GNUNET_OK;
}
return;
}
- /* Does client own tunnel? Is this and ACK for BCK traffic? */
+ /* Does client own tunnel? I.E: Is this and ACK for BCK traffic? */
if (NULL != t->owner && t->owner->handle == client)
{
- GNUNET_break (0);
- // FIXME TODO
+ /* The client owns the tunnel, ACK is for data to_origin, send BCK ACK. */
+ t->bck_ack = ntohl(msg->max_pid);
+ tunnel_send_bck_ack(t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
}
else
{
/* The client doesn't own the tunnel, this ACK is for FWD traffic. */
tunnel_set_client_fwd_ack (t, c, ntohl (msg->max_pid));
- tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
+ tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);