* - relay corking down to core
* - set ttl relative to tree depth
* - Add data ACK count in path ACK
+ * - Make common GNUNET_MESH_Data header for unicast, to_orig, multicast
* TODO END
*/
/** Tunnel it belongs to. */
struct MeshTunnel *t;
- /** In case of a multicast, task to allow a client to send more data if
- * some neighbor is too slow. */
- GNUNET_SCHEDULER_TaskIdentifier *task;
-
/** How many remaining neighbors we need to send this to. */
- unsigned int *reference_counter;
+ unsigned int reference_counter;
/** Size of the data. */
size_t data_len;
* Last ACK sent to that child (BCK ACK).
*/
uint32_t bck_ack;
+
+ /**
+ * Circular buffer pointing to MeshPeerQueue elements.
+ * Size determined by the tunnel queue size.
+ */
+ struct MeshPeerQueue **send_buffer;
+
+ /**
+ * Index of the oldest element in the send_buffer.
+ */
+ unsigned int send_buffer_start;
+
+ /**
+ * How many elements are already in the buffer.
+ */
+ unsigned int send_buffer_n;
};
}
#endif
+unsigned int debug_fwd_ack;
+unsigned int debug_bck_ack;
+
#endif
/******************************************************************************/
queue_add (void *cls, uint16_t type, size_t size,
struct MeshPeerInfo *dst, struct MeshTunnel *t);
+
/**
* Free a transmission that was already queued with all resources
* associated to the request.
static void
queue_destroy (struct MeshPeerQueue *queue, int clear_cls);
+
+/**
+ * @brief Get the next transmittable message from the queue.
+ *
+ * This will be the head, except in the case of being a data packet
+ * not allowed by the destination peer.
+ *
+ * @param peer Destination peer.
+ *
+ * @return The next viable MeshPeerQueue element to send to that peer.
+ * NULL when there are no transmittable messages.
+ */
+struct MeshPeerQueue *
+queue_get_next (const struct MeshPeerInfo *peer);
+
+
+/**
+ * Core callback to write a queued packet to core buffer
+ *
+ * @param cls Closure (peer info).
+ * @param size Number of bytes available in buf.
+ * @param buf Where the to write the message.
+ *
+ * @return number of bytes written to buf
+ */
+static size_t
+queue_send (void *cls, size_t size, void *buf);
+
/******************************************************************************/
/************************ ITERATORS ****************************/
/******************************************************************************/
block.id = my_full_id;
c = GNUNET_CONTAINER_multihashmap_get (applications, key);
+ GNUNET_assert(NULL != c);
block.type = (long) GNUNET_CONTAINER_multihashmap_get (c->apps, key);
if (0 == block.type)
{
static void
data_descriptor_decrement_rc (struct MeshData *mesh_data)
{
- /* Make sure it's a multicast packet */
- GNUNET_assert (NULL != mesh_data->reference_counter);
-
- if (0 == --(*(mesh_data->reference_counter)))
+ if (0 == --(mesh_data->reference_counter))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Last copy!\n");
- if (NULL != mesh_data->task)
- {
- if (GNUNET_SCHEDULER_NO_TASK != *(mesh_data->task))
- {
- GNUNET_SCHEDULER_cancel (*(mesh_data->task));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " notifying client...\n");
- GNUNET_SERVER_receive_done (mesh_data->t->owner->handle, GNUNET_OK);
- }
- GNUNET_free (mesh_data->task);
- }
- GNUNET_free (mesh_data->reference_counter);
GNUNET_free (mesh_data->data);
GNUNET_free (mesh_data);
}
}
-/**
- * Allow a client to send more data after transmitting a multicast message
- * which some neighbor has not yet accepted altough a reasonable time has
- * passed.
- *
- * @param cls Closure (DataDescriptor containing the task identifier)
- * @param tc Task Context
- *
- * FIXME reference counter cshould be just int
- */
-static void
-client_allow_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct MeshData *mdata = cls;
-
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- return;
- GNUNET_assert (NULL != mdata->reference_counter);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "CLIENT ALLOW SEND DESPITE %u COPIES PENDING\n",
- *(mdata->reference_counter));
- *(mdata->task) = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_SERVER_receive_done (mdata->t->owner->handle, GNUNET_OK);
-}
-
-
/**
* Check whether client wants traffic from a tunnel.
*
m->ttl = htonl (ntohl (m->ttl) - 1);
}
info->mesh_data->data_len = size;
- info->mesh_data->reference_counter = GNUNET_malloc (sizeof (unsigned int));
- *info->mesh_data->reference_counter = 1;
+ info->mesh_data->reference_counter = 1;
neighbor = peer_info_get (peer);
for (p = neighbor->path_head; NULL != p; p = p->next)
{
const struct GNUNET_HashCode * key,
void *value)
{
- GNUNET_free (value);
+ struct MeshTunnelChildInfo *cinfo = value;
+ struct MeshTunnel *t = cls;
+ unsigned int c;
+ unsigned int i;
+
+ for (c = 0; c < cinfo->send_buffer_n; c++)
+ {
+ i = (cinfo->send_buffer_start + c) % t->fwd_queue_max;
+ if (NULL != cinfo->send_buffer[i])
+ queue_destroy(cinfo->send_buffer[i], GNUNET_YES);
+ else
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u %u\n", c, cinfo->send_buffer_n);
+ }
+ GNUNET_free_non_null (cinfo->send_buffer);
+ GNUNET_free (cinfo);
return GNUNET_YES;
}
GNUNET_array_append (t->clients, t->nclients, c);
clinfo.fwd_ack = t->fwd_pid + 1;
- clinfo.bck_ack = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE;
+ clinfo.bck_ack = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE - 1;
clinfo.fwd_pid = t->fwd_pid;
clinfo.bck_pid = (uint32_t) -1; // Expected next: 0
t->nclients--;
info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor));
info->mesh_data = mdata;
- (*(mdata->reference_counter)) ++;
+ (mdata->reference_counter) ++;
info->destination = neighbor_id;
GNUNET_PEER_resolve (neighbor_id, &neighbor);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending to %s...\n",
/**
- * Send a message in a tunnel in multicast, sending a copy to each child node
+ * Queue a message in a tunnel in multicast, sending a copy to each child node
* down the local one in the tunnel tree.
*
* @param t Tunnel in which to send the data.
* @param msg Message to be sent.
- * @param internal Has the service generated this message?
+ * @param internal DEPRECATED Has the service generated this message?
+ *
+ * FIXME remove internal if no use comes up
*/
static void
tunnel_send_multicast (struct MeshTunnel *t,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" sending a multicast packet...\n");
+
mdata = GNUNET_malloc (sizeof (struct MeshData));
mdata->data_len = ntohs (msg->size);
- mdata->reference_counter = GNUNET_malloc (sizeof (unsigned int));
mdata->t = t;
mdata->data = GNUNET_malloc (mdata->data_len);
memcpy (mdata->data, msg, mdata->data_len);
struct GNUNET_MESH_Multicast *mcast;
mcast = (struct GNUNET_MESH_Multicast *) mdata->data;
+ if (t->fwd_queue_n >= t->fwd_queue_max)
+ {
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " queue full!\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ " message from %s!\n",
+ GNUNET_i2s(&mcast->oid));
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ " message at %s!\n",
+ GNUNET_i2s(&my_full_id));
+ GNUNET_free (mdata->data);
+ GNUNET_free (mdata);
+ return;
+ }
+ t->fwd_queue_n++;
mcast->ttl = htonl (ntohl (mcast->ttl) - 1);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " data packet, ttl: %u\n",
ntohl (mcast->ttl));
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " not a data packet, no ttl\n");
}
- if (NULL != t->owner && GNUNET_YES != t->owner->shutting_down
- && GNUNET_NO == internal)
- {
- mdata->task = GNUNET_malloc (sizeof (GNUNET_SCHEDULER_TaskIdentifier));
- (*(mdata->task)) =
- GNUNET_SCHEDULER_add_delayed (unacknowledged_wait_time, &client_allow_send,
- mdata);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "timeout task %u\n",
- *(mdata->task));
- }
tree_iterate_children (t->tree, &tunnel_send_multicast_iterator, mdata);
- if (*(mdata->reference_counter) == 0)
+ if (mdata->reference_counter == 0)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ " no one to send data to\n");
GNUNET_free (mdata->data);
- GNUNET_free (mdata->reference_counter);
- if (NULL != mdata->task)
- {
- GNUNET_SCHEDULER_cancel(*(mdata->task));
- GNUNET_free (mdata->task);
- GNUNET_SERVER_receive_done (t->owner->handle, GNUNET_OK);
- }
- // FIXME change order?
GNUNET_free (mdata);
+ t->fwd_queue_n--;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" sending a multicast packet done\n");
cinfo->id = GNUNET_PEER_intern (peer);
cinfo->skip = t->fwd_pid;
- delta = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE;
+ delta = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE - 1;
cinfo->fwd_ack = t->fwd_pid + delta;
cinfo->bck_ack = delta;
+ cinfo->send_buffer =
+ GNUNET_malloc (sizeof(struct MeshPeerQueue *) * t->fwd_queue_max);
+
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (t->children_fc,
&peer->hashPubKey,
*
* @param t Tunnel.
*
- * @return Maximum PID allowed (uint32 MAX), -1 if node has no children.
+ * @return Maximum PID allowed (uint32 MAX), -1LL if node has no children.
*/
static int64_t
tunnel_get_children_fwd_ack (struct MeshTunnel *t)
* @param t Tunnel on which to look.
*
* @return Corresponding ACK value (max uint32_t).
- * If no clients are suscribed, -1.
+ * If no clients are suscribed, -1LL.
*/
static int64_t
tunnel_get_clients_fwd_ack (struct MeshTunnel *t)
return -1LL;
}
- for (ack = -1, i = 0; i < t->nclients; i++)
+ for (ack = -1LL, i = 0; i < t->nclients; i++)
{
- if (-1 == ack ||
+ if (-1LL == ack ||
(GNUNET_YES == t->speed_min &&
GNUNET_YES == GMC_is_pid_bigger (ack, t->clients_fc[i].fwd_ack)) ||
(GNUNET_NO == t->speed_min &&
static uint32_t
tunnel_get_fwd_ack (struct MeshTunnel *t)
{
+ uint32_t ack;
uint32_t count;
uint32_t buffer_free;
int64_t child_ack;
int64_t client_ack;
- uint32_t ack;
count = t->fwd_pid - t->skip;
buffer_free = t->fwd_queue_max - t->fwd_queue_n;
ack = count + buffer_free; // Might overflow 32 bits, it's ok!
child_ack = tunnel_get_children_fwd_ack (t);
client_ack = tunnel_get_clients_fwd_ack (t);
- if (-1 == child_ack)
+ if (-1LL == child_ack)
{
// Node has no children, child_ack AND core buffer are irrelevant.
- GNUNET_break (-1 != client_ack); // No children AND no clients? Not good! // FIXME fc
+ GNUNET_break (-1LL != client_ack); // No children AND no clients? Not good!
return (uint32_t) client_ack;
}
-
+ if (-1LL == client_ack)
+ {
+ client_ack = ack;
+ }
if (GNUNET_YES == t->speed_min)
{
ack = GMC_min_pid ((uint32_t) child_ack, ack);
}
if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ack, t->fwd_pid))
ack = t->fwd_pid + 1; // Might overflow 32 bits, it's ok!
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "c %u, bf %u, ch %u, cl %u, ACK: %u\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "c %u, bf %u, ch %lld, cl %lld, ACK: %u\n",
count, buffer_free, child_ack, client_ack, ack);
return ack;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, nobuffer\n");
return;
}
- if (t->fwd_queue_max > t->fwd_queue_n * 2)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n");
- return;
- }
break;
case GNUNET_MESSAGE_TYPE_MESH_ACK:
case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
GNUNET_break (0);
}
+ /* Check if we need no retransmit the ACK */
+ if (t->fwd_queue_max > t->fwd_queue_n * 2 &&
+ GMC_is_pid_bigger(t->last_fwd_ack, t->fwd_pid))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ " t->qmax: %u, t->qn: %u\n",
+ t->fwd_queue_max, t->fwd_queue_n);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ " t->pid: %u, t->ack: %u\n",
+ t->fwd_pid, t->last_fwd_ack);
+ return;
+ }
+
/* Ok, ACK might be necessary, what PID to ACK? */
ack = tunnel_get_fwd_ack (t);
t->last_fwd_ack = ack;
GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id);
send_ack (t, &id, ack);
+ debug_fwd_ack++;
}
}
+/**
+ * @brief Re-initiate traffic to this peer if necessary.
+ *
+ * Check if there is traffic queued towards this peer
+ * and the core transmit handle is NULL (traffic was stalled).
+ * If so, call core tmt rdy.
+ *
+ * @param cls Closure (unused)
+ * @param peer_id Short ID of peer to which initiate traffic.
+ */
+static void
+peer_unlock_queue(void *cls, GNUNET_PEER_Id peer_id)
+{
+ struct MeshPeerInfo *peer;
+ struct GNUNET_PeerIdentity id;
+ struct MeshPeerQueue *q;
+ size_t size;
+
+ peer = peer_info_get_short(peer_id);
+ if (NULL != peer->core_transmit)
+ return;
+
+ q = queue_get_next(peer);
+ if (NULL == q)
+ {
+ /* Might br multicast traffic already sent to this particular peer but
+ * not to other children in this tunnel.
+ * This way t->queue_n would be > 0 but the queue of this particular peer
+ * would be empty.
+ */
+ return;
+ }
+ size = q->size;
+ GNUNET_PEER_resolve (peer->id, &id);
+ peer->core_transmit =
+ GNUNET_CORE_notify_transmit_ready(core_handle,
+ 0,
+ 0,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &id,
+ size,
+ &queue_send,
+ peer);
+ return;
+}
+
+
+/**
+ * @brief Allow transmission of FWD traffic on this tunnel
+ *
+ * Check if there is traffic queued towards any children
+ * and the core transmit handle is NULL, and if so, call core tmt rdy.
+ *
+ * @param t Tunnel on which to unlock FWD traffic.
+ */
+static void
+tunnel_unlock_fwd_queues (struct MeshTunnel *t)
+{
+ if (0 == t->fwd_queue_n)
+ return;
+
+ tree_iterate_children (t->tree, &peer_unlock_queue, NULL);
+}
+
+
+/**
+ * @brief Allow transmission of BCK traffic on this tunnel
+ *
+ * Check if there is traffic queued towards the root of the tree
+ * and the core transmit handle is NULL, and if so, call core tmt rdy.
+ *
+ * @param t Tunnel on which to unlock BCK traffic.
+ */
+static void
+tunnel_unlock_bck_queue (struct MeshTunnel *t)
+{
+ if (0 == t->bck_queue_n)
+ return;
+
+ peer_unlock_queue(NULL, tree_get_predecessor(t->tree));
+}
+
+
/**
* Send a message to all peers in this tunnel that the tunnel is no longer
* valid.
next = pq->next;
if (pq->tunnel == t)
{
+ if (GNUNET_MESSAGE_TYPE_MESH_MULTICAST == pq->type ||
+ GNUNET_MESSAGE_TYPE_MESH_UNICAST == pq->type ||
+ GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == pq->type)
+ {
+ // Should have been removed on destroy children
+ GNUNET_break (0);
+ }
queue_destroy (pq, GNUNET_YES);
}
}
if (NULL == t)
return GNUNET_OK;
- tree_iterate_children (t->tree, &tunnel_cancel_queues, t);
-
r = GNUNET_OK;
c = t->owner;
#if MESH_DEBUG
t);
GNUNET_CONTAINER_multihashmap_destroy (t->children_fc);
+ tree_iterate_children (t->tree, &tunnel_cancel_queues, t);
+
tree_destroy (t->tree);
if (NULL != t->regex_ctx)
msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE);
msg->tid = ntohl (t->id.tid);
+ opt = 0;
if (GNUNET_YES == t->speed_min)
- opt = MESH_TUNNEL_OPT_SPEED_MIN;
+ opt |= MESH_TUNNEL_OPT_SPEED_MIN;
if (GNUNET_YES == t->nobuffer)
opt |= MESH_TUNNEL_OPT_NOBUFFER;
msg->opt = htonl(opt);
break;
default:
GNUNET_break (0);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type unknown!\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ " type %u unknown!\n",
+ queue->type);
}
GNUNET_free_non_null (queue->cls);
}
/**
* @brief Get the next transmittable message from the queue.
*
- * This will be the head, expect in the case of being a data packet
+ * This will be the head, except in the case of being a data packet
* not allowed by the destination peer.
*
* @param peer Destination peer.
* NULL when there are no transmittable messages.
*/
struct MeshPeerQueue *
-queue_get_next (static struct MeshPeerInfo *peer)
+queue_get_next (const struct MeshPeerInfo *peer)
{
struct MeshPeerQueue *q;
struct MeshTunnel *t;
struct MeshTransmissionDescriptor *info;
+ struct MeshTunnelChildInfo *cinfo;
+ struct GNUNET_MESH_Unicast *ucast;
+ struct GNUNET_MESH_ToOrigin *to_orig;
+ struct GNUNET_MESH_Multicast *mcast;
+ struct GNUNET_PeerIdentity id;
+ uint32_t pid;
+ uint32_t ack;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* selecting message\n");
for (q = peer->queue_head; NULL != q; q = q->next)
{
t = q->tunnel;
+ info = q->cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* %s\n",
+ GNUNET_MESH_DEBUG_M2S(q->type));
switch (q->type)
{
case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
- info = q->cls;
+ ucast = (struct GNUNET_MESH_Unicast *) info->mesh_data->data;
+ pid = ntohl (ucast->pid);
+ GNUNET_PEER_resolve (info->peer->id, &id);
+ cinfo = tunnel_get_neighbor_fc(t, &id);
+ ack = cinfo->fwd_ack;
break;
case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
+ to_orig = (struct GNUNET_MESH_ToOrigin *) info->mesh_data->data;
+ pid = ntohl (to_orig->pid);
+ ack = t->bck_ack;
break;
case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
+ mcast = (struct GNUNET_MESH_Multicast *) info->mesh_data->data;
+ if (GNUNET_MESSAGE_TYPE_MESH_MULTICAST != ntohs(mcast->header.type))
+ {
+ // Not a multicast payload: multicast control traffic (destroy, etc)
+ return q;
+ }
+ pid = ntohl (mcast->pid);
+ GNUNET_PEER_resolve (info->peer->id, &id);
+ cinfo = tunnel_get_neighbor_fc(t, &id);
+ ack = cinfo->fwd_ack;
break;
default:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* OK!\n");
return q;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* ACK: %u, PID: %u\n",
+ ack, pid);
+ if (GNUNET_NO == GMC_is_pid_bigger(pid, ack))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* OK!\n");
+ return q;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* NEXT!\n");
}
}
- // FIXME fc WIP
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* nothing found\n");
return NULL;
}
struct GNUNET_MessageHeader *msg;
struct MeshPeerQueue *queue;
struct MeshTunnel *t;
+ struct MeshTunnelChildInfo *cinfo;
+ struct GNUNET_PeerIdentity dst_id;
size_t data_size;
peer->core_transmit = NULL;
- queue = peer->queue_head;
-
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* Queue send\n");
+ queue = queue_get_next(peer);
- /* If queue is empty, send should have been cancelled */
+ /* Queue has no internal mesh traffic not sendable payload */
if (NULL == queue)
{
- GNUNET_break(0);
- return 0;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not ready, return\n");
+ if (NULL == peer->queue_head)
+ GNUNET_break(0); // Should've been canceled
+ return 0;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not empty\n");
+ GNUNET_PEER_resolve (peer->id, &dst_id);
/* Check if buffer size is enough for the message */
if (queue->size > size)
{
- struct GNUNET_PeerIdentity id;
- GNUNET_PEER_resolve (peer->id, &id);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* not enough room, reissue\n");
peer->core_transmit =
GNUNET_CORE_notify_transmit_ready(core_handle,
0,
0,
GNUNET_TIME_UNIT_FOREVER_REL,
- &id,
+ &dst_id,
queue->size,
&queue_send,
peer);
{
t->fwd_queue_n--;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "********* unicast: %u\n",
- t->fwd_queue_n);
+ "********* unicast: t->q (%u/%u)\n",
+ t->fwd_queue_n, t->fwd_queue_max);
}
else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == queue->type)
{
case GNUNET_MESSAGE_TYPE_MESH_PATH_BROKEN:
case GNUNET_MESSAGE_TYPE_MESH_PATH_DESTROY:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "********* raw: %u\n",
- queue->type);
+ "********* raw: %s\n",
+ GNUNET_MESH_DEBUG_M2S (queue->type));
/* Fall through */
case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
break;
case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* multicast\n");
+ {
+ struct MeshTransmissionDescriptor *info = queue->cls;
+ if (info->mesh_data->reference_counter == 1)
+ t->fwd_queue_n--;
+ // FIXME fc (t->fwd_queue_n--)
+ }
data_size = send_core_data_multicast(queue->cls, size, buf);
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
break;
queue->type);
data_size = 0;
}
+ switch (queue->type)
+ {
+ case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
+ case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
+ case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
+ cinfo = tunnel_get_neighbor_fc(t, &dst_id);
+ if (cinfo->send_buffer[cinfo->send_buffer_start] != queue)
+ {
+ GNUNET_break(0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "at pos %u (%p) != %p\n",
+ cinfo->send_buffer_start,
+ cinfo->send_buffer[cinfo->send_buffer_start],
+ queue);
+ }
+ GNUNET_break(cinfo->send_buffer_n > 0);
+ cinfo->send_buffer[cinfo->send_buffer_start] = NULL;
+ cinfo->send_buffer_n--;
+ cinfo->send_buffer_start++;
+ cinfo->send_buffer_start %= t->fwd_queue_max;
+ break;
+ default:
+ break;
+ }
/* Free queue, but cls was freed by send_core_* */
queue_destroy (queue, GNUNET_NO);
}
/* If more data in queue, send next */
- if (NULL != peer->queue_head)
+ queue = queue_get_next(peer);
+ if (NULL != queue)
{
struct GNUNET_PeerIdentity id;
0,
GNUNET_TIME_UNIT_FOREVER_REL,
&id,
- peer->queue_head->size,
+ queue->size,
&queue_send,
peer);
}
+ else
+ {
+ if (NULL != peer->queue_head)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "********* %s stalled\n",
+ GNUNET_i2s(&my_full_id));
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* return %d\n", data_size);
return data_size;
}
struct MeshPeerInfo *dst, struct MeshTunnel *t)
{
struct MeshPeerQueue *queue;
+ struct MeshTunnelChildInfo *cinfo;
+ struct GNUNET_PeerIdentity id;
unsigned int *max;
unsigned int *n;
+ unsigned int i;
n = NULL;
if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type ||
GNUNET_break_op(0); // TODO: kill connection?
else
GNUNET_break(0);
+ GNUNET_STATISTICS_update(stats, "# messages dropped (buffer full)",
+ 1, GNUNET_NO);
return; // Drop message
}
(*n)++;
queue->peer = dst;
queue->tunnel = t;
GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
+ GNUNET_PEER_resolve (dst->id, &id);
if (NULL == dst->core_transmit)
{
- struct GNUNET_PeerIdentity id;
-
- GNUNET_PEER_resolve (dst->id, &id);
dst->core_transmit =
GNUNET_CORE_notify_transmit_ready(core_handle,
0,
&queue_send,
dst);
}
+ if (NULL == n) // Is this internal mesh traffic?
+ return;
+
+ // It's payload, keep track of buffer per peer.
+ cinfo = tunnel_get_neighbor_fc(t, &id);
+ i = (cinfo->send_buffer_start + cinfo->send_buffer_n) % t->fwd_queue_max;
+ if (NULL != cinfo->send_buffer[i])
+ {
+ GNUNET_break (cinfo->send_buffer_n == t->fwd_queue_max); // aka i == start
+ queue_destroy(cinfo->send_buffer[cinfo->send_buffer_start], GNUNET_YES);
+ cinfo->send_buffer_start++;
+ cinfo->send_buffer_start %= t->fwd_queue_max;
+ cinfo->send_buffer_n--;
+ }
+ cinfo->send_buffer[i] = queue;
+ cinfo->send_buffer_n++;
+ if (cinfo->send_buffer_n > t->fwd_queue_max)
+ {
+ GNUNET_break (0);
+ cinfo->send_buffer_n = t->fwd_queue_max;
+ }
}
GNUNET_YES : GNUNET_NO;
t->nobuffer = (0 != (opt & MESH_TUNNEL_OPT_NOBUFFER)) ?
GNUNET_YES : GNUNET_NO;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ " speed_min: %d, nobuffer:%d\n",
+ t->speed_min, t->nobuffer);
if (GNUNET_YES == t->nobuffer)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK packet from %s!\n",
GNUNET_i2s (peer));
msg = (struct GNUNET_MESH_ACK *) message;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %s\n",
- GNUNET_MESH_DEBUG_M2S (ntohs (msg[1].header.type)));
+
t = tunnel_get (&msg->oid, ntohl (msg->tid));
if (NULL == t)
ack = ntohl (msg->pid);
/* Is this a forward or backward ACK? */
- if (tree_get_predecessor(t->tree) == GNUNET_PEER_search(peer))
+ if (tree_get_predecessor(t->tree) != GNUNET_PEER_search(peer))
{
struct MeshTunnelChildInfo *cinfo;
+ debug_bck_ack++;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK\n");
cinfo = tunnel_get_neighbor_fc (t, peer);
cinfo->fwd_ack = ack;
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
+ tunnel_unlock_fwd_queues (t);
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " BCK ACK\n");
t->bck_ack = ack;
tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
+ tunnel_unlock_bck_queue (t);
}
- // FIXME fc Unlock queues?
return GNUNET_OK;
}
msg = (struct GNUNET_MESH_Multicast *) cbuf;
msg->header.size = htons (size);
msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
+ // FIXME: change type to != MULTICAST
msg->oid = my_full_id;
msg->tid = htonl (t->id.tid);
msg->ttl = htonl (default_ttl);
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id);
- if (t->owner == c)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is owner)\n");
- else
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is leaf)\n");
GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id);
- if (t->owner == c)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is owner)\n");
- else
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is leaf)\n"); GNUNET_break (0);
+ GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
handle_mesh_data_multicast (client, &my_full_id, ©->header, NULL, 0);
}
- /* receive done gets called when last copy is sent to a neighbor */
+ GNUNET_SERVER_receive_done (t->owner->handle, GNUNET_OK);
return;
}
t = tunnel_get_by_local_id (c, tid);
if (NULL == t)
{
- GNUNET_break (0); // FIXME fc
+ GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id);
- if (t->owner == c)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is owner)\n");
- else
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is leaf)\n");
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
GNUNET_CRYPTO_hash (&my_public_key, sizeof (my_public_key),
&my_full_id.hashPubKey);
myid = GNUNET_PEER_intern (&my_full_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Mesh for peer [%s] starting\n",
+ GNUNET_i2s(&my_full_id));
// transport_handle = GNUNET_TRANSPORT_connect(c,
// &my_full_id,
INTERVAL_SHOW;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Mesh for peer [%s] FWD ACKs %u, BCK ACKs %u\n",
+ GNUNET_i2s(&my_full_id), debug_fwd_ack, debug_bck_ack);
+
+
return ret;
}