* - relay corking down to core
* - set ttl relative to tree depth
* - Add data ACK count in path ACK
+ * - Make common GNUNET_MESH_Data header for unicast, to_orig, multicast
* TODO END
*/
#define MESH_BLOOM_SIZE 128
-#define MESH_DEBUG_DHT GNUNET_YES
+#define MESH_DEBUG_DHT GNUNET_NO
#define MESH_DEBUG_CONNECTION GNUNET_NO
-
-#define INITIAL_WINDOW_SIZE 2
-#define ACK_THRESHOLD INITIAL_WINDOW_SIZE / 2
+#define MESH_DEBUG_TIMING __LINUX__ && GNUNET_YES
#if MESH_DEBUG_CONNECTION
#define DEBUG_CONN(...) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
#define DEBUG_DHT(...)
#endif
+#if MESH_DEBUG_TIMING
+#include <time.h>
+double __sum;
+uint64_t __count;
+struct timespec __mesh_start;
+struct timespec __mesh_end;
+#define INTERVAL_START clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &(__mesh_start))
+#define INTERVAL_END \
+do {\
+ clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &(__mesh_end));\
+ double __diff = __mesh_end.tv_nsec - __mesh_start.tv_nsec;\
+ if (__diff < 0) __diff += 1000000000;\
+ __sum += __diff;\
+ __count++;\
+} while (0)
+#define INTERVAL_SHOW \
+if (0 < __count)\
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "AVG process time: %f ns\n", __sum/__count)
+#else
+#define INTERVAL_START
+#define INTERVAL_END
+#define INTERVAL_SHOW
+#endif
+
/******************************************************************************/
/************************ DATA STRUCTURES ****************************/
/******************************************************************************/
/*********************** GLOBAL VARIABLES ****************************/
/******************************************************************************/
-
/**
* Configuration parameters
*/
* @param c Client to check
*
* @return GNUNET_YES or GNUNET_NO, depending on subscription status
+ *
+ * FIXME: use of crypto_hash slows it down
+ * The hash function alone takes 8-10us out of the ~55us for the whole
+ * process of retransmitting the message from one local client to another.
+ * Find faster implementation!
*/
static int
client_is_subscribed (uint16_t message_type, struct MeshClient *c)
if (NULL == c->types)
return GNUNET_NO;
+
GNUNET_CRYPTO_hash (&message_type, sizeof (uint16_t), &hc);
return GNUNET_CONTAINER_multihashmap_contains (c->types, &hc);
}
*
* @param msg Pointer to the message itself
* @param payload Pointer to the payload of the message.
+ * @param t The tunnel to whose clients this message goes.
+ *
* @return number of clients this message was sent to
*/
static unsigned int
send_subscribed_clients (const struct GNUNET_MessageHeader *msg,
- const struct GNUNET_MessageHeader *payload)
+ const struct GNUNET_MessageHeader *payload,
+ struct MeshTunnel *t)
{
- struct GNUNET_PeerIdentity *oid;
struct MeshClient *c;
- struct MeshTunnel *t;
MESH_TunnelNumber *tid;
unsigned int count;
uint16_t type;
struct GNUNET_MESH_Multicast *mc;
struct GNUNET_MESH_ToOrigin *to;
- case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
- uc = (struct GNUNET_MESH_Unicast *) cbuf;
- tid = &uc->tid;
- oid = &uc->oid;
- break;
- case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
- mc = (struct GNUNET_MESH_Multicast *) cbuf;
- tid = &mc->tid;
- oid = &mc->oid;
- break;
- case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
- to = (struct GNUNET_MESH_ToOrigin *) cbuf;
- tid = &to->tid;
- oid = &to->oid;
- break;
- default:
- GNUNET_break (0);
- return 0;
- }
- t = tunnel_get (oid, ntohl (*tid));
- if (NULL == t)
- {
- GNUNET_break (0);
- return 0;
+ case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
+ uc = (struct GNUNET_MESH_Unicast *) cbuf;
+ tid = &uc->tid;
+ break;
+ case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
+ mc = (struct GNUNET_MESH_Multicast *) cbuf;
+ tid = &mc->tid;
+ break;
+ case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
+ to = (struct GNUNET_MESH_ToOrigin *) cbuf;
+ tid = &to->tid;
+ break;
+ default:
+ GNUNET_break (0);
+ return 0;
}
for (count = 0, c = clients; c != NULL; c = c->next)
*) cbuf, GNUNET_NO);
}
}
+
return count;
}
GNUNET_array_append (t->clients, t->nclients, c);
clinfo.fwd_ack = t->fwd_pid + 1;
- clinfo.bck_ack = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE;
+ clinfo.bck_ack = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE - 1;
clinfo.fwd_pid = t->fwd_pid;
clinfo.bck_pid = (uint32_t) -1; // Expected next: 0
t->nclients--;
/**
- * Send a message in a tunnel in multicast, sending a copy to each child node
+ * Queue a message in a tunnel in multicast, sending a copy to each child node
* down the local one in the tunnel tree.
*
* @param t Tunnel in which to send the data.
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" sending a multicast packet...\n");
+
mdata = GNUNET_malloc (sizeof (struct MeshData));
mdata->data_len = ntohs (msg->size);
mdata->reference_counter = GNUNET_malloc (sizeof (unsigned int));
{
struct GNUNET_MESH_Multicast *mcast;
+ if (t->fwd_queue_n >= t->fwd_queue_max)
+ {
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " queue full!\n");
+ GNUNET_free (mdata->data);
+ GNUNET_free (mdata->reference_counter);
+ GNUNET_free (mdata);
+ return;
+ }
+ t->fwd_queue_n++;
mcast = (struct GNUNET_MESH_Multicast *) mdata->data;
mcast->ttl = htonl (ntohl (mcast->ttl) - 1);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " data packet, ttl: %u\n",
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " not a data packet, no ttl\n");
}
- if (NULL != t->owner && GNUNET_YES != t->owner->shutting_down
- && GNUNET_NO == internal)
+ if (NULL != t->owner &&
+ GNUNET_YES != t->owner->shutting_down &&
+ GNUNET_NO == internal)
{
mdata->task = GNUNET_malloc (sizeof (GNUNET_SCHEDULER_TaskIdentifier));
(*(mdata->task)) =
tree_iterate_children (t->tree, &tunnel_send_multicast_iterator, mdata);
if (*(mdata->reference_counter) == 0)
{
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ " no one to send data to\n");
GNUNET_free (mdata->data);
GNUNET_free (mdata->reference_counter);
if (NULL != mdata->task)
GNUNET_free (mdata->task);
GNUNET_SERVER_receive_done (t->owner->handle, GNUNET_OK);
}
- // FIXME change order?
GNUNET_free (mdata);
+ t->fwd_queue_n--;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" sending a multicast packet done\n");
cinfo->id = GNUNET_PEER_intern (peer);
cinfo->skip = t->fwd_pid;
- delta = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE;
+ delta = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE - 1;
cinfo->fwd_ack = t->fwd_pid + delta;
cinfo->bck_ack = delta;
cinfo = tunnel_get_neighbor_fc (t, &peer_id);
ack = cinfo->fwd_ack;
+ ctx->nchildren++;
if (GNUNET_NO == ctx->init)
{
ctx->max_child_ack = ack;
tree_iterate_children (t->tree, tunnel_get_child_fwd_ack, &ctx);
if (0 == ctx.nchildren)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ " tunnel has no children, no FWD ACK\n");
return -1LL;
+ }
if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ctx.max_child_ack, t->fwd_pid))
ctx.max_child_ack = t->fwd_pid + 1; // Might overflow, it's ok.
int64_t ack;
if (0 == t->nclients)
- return -1;
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ " tunnel has no clients, no FWD ACK\n");
+ return -1LL;
+ }
for (ack = -1, i = 0; i < t->nclients; i++)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, nobuffer\n");
return;
}
- if (t->fwd_queue_max > t->fwd_queue_n * 2)
+ if (t->fwd_queue_max > t->fwd_queue_n * 2 &&
+ GMC_is_pid_bigger(t->last_fwd_ack, t->fwd_pid))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ " t->qmax: %u, t->qn: %u\n",
+ t->fwd_queue_max, t->fwd_queue_n);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ " t->pid: %u, t->ack: %u\n",
+ t->fwd_pid, t->last_fwd_ack);
return;
}
break;
}
tunnel_send_clients_bck_ack (t);
- tree_iterate_children (t->tree, &tunnel_send_child_bck_ack, NULL);
+ tree_iterate_children (t->tree, &tunnel_send_child_bck_ack, t);
}
}
+/**
+ * @brief Get the next transmittable message from the queue.
+ *
+ * This will be the head, expect in the case of being a data packet
+ * not allowed by the destination peer.
+ *
+ * @param peer Destination peer.
+ *
+ * @return The next viable MeshPeerQueue element to send to that peer.
+ * NULL when there are no transmittable messages.
+ */
+struct MeshPeerQueue *
+queue_get_next (const struct MeshPeerInfo *peer)
+{
+ struct MeshPeerQueue *q;
+ struct MeshTunnel *t;
+ struct MeshTransmissionDescriptor *info;
+ struct MeshTunnelChildInfo *cinfo;
+ struct GNUNET_MESH_Unicast *ucast;
+ struct GNUNET_MESH_ToOrigin *to_orig;
+ struct GNUNET_MESH_Multicast *mcast;
+ struct GNUNET_PeerIdentity id;
+ uint32_t pid;
+ uint32_t ack;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* selecting message\n");
+ for (q = peer->queue_head; NULL != q; q = q->next)
+ {
+ t = q->tunnel;
+ info = q->cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* %s\n",
+ GNUNET_MESH_DEBUG_M2S(q->type));
+ switch (q->type)
+ {
+ case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
+ ucast = (struct GNUNET_MESH_Unicast *) info->mesh_data->data;
+ pid = ntohl (ucast->pid);
+ GNUNET_PEER_resolve (info->peer->id, &id);
+ cinfo = tunnel_get_neighbor_fc(t, &id);
+ ack = cinfo->fwd_ack;
+ break;
+ case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
+ to_orig = (struct GNUNET_MESH_ToOrigin *) info->mesh_data->data;
+ pid = ntohl (to_orig->pid);
+ ack = t->bck_ack;
+ break;
+ case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
+ mcast = (struct GNUNET_MESH_Multicast *) info->mesh_data->data;
+ pid = ntohl (mcast->pid);
+ GNUNET_PEER_resolve (info->peer->id, &id);
+ cinfo = tunnel_get_neighbor_fc(t, &id);
+ ack = cinfo->fwd_ack;
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* OK!\n");
+ return q;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* ACK: %u, PID: %u\n",
+ ack, pid);
+ if (GNUNET_NO == GMC_is_pid_bigger(pid, ack))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* OK!\n");
+ return q;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* NEXT!\n");
+ }
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* nothing found\n");
+ return NULL;
+}
+
+
/**
* Core callback to write a queued packet to core buffer
*
size_t data_size;
peer->core_transmit = NULL;
- queue = peer->queue_head;
-
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* Queue send\n");
+ queue = queue_get_next(peer);
- /* If queue is empty, send should have been cancelled */
+ /* Queue has no internal mesh traffic not sendable payload */
if (NULL == queue)
{
- GNUNET_break(0);
- return 0;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not ready, return\n");
+ if (NULL == peer->queue_head)
+ GNUNET_break(0); // Should've been canceled
+ return 0;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not empty\n");
{
struct GNUNET_PeerIdentity id;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* not enough room, reissue\n");
GNUNET_PEER_resolve (peer->id, &id);
peer->core_transmit =
GNUNET_CORE_notify_transmit_ready(core_handle,
if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == queue->type)
{
t->fwd_queue_n--;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* unicast: %u\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "********* unicast: t->q (%u/%u)\n",
+ t->fwd_queue_n, t->fwd_queue_max);
}
else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == queue->type)
{
case GNUNET_MESSAGE_TYPE_MESH_PATH_BROKEN:
case GNUNET_MESSAGE_TYPE_MESH_PATH_DESTROY:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "********* raw: %u\n",
- queue->type);
+ "********* raw: %s\n",
+ GNUNET_MESH_DEBUG_M2S (queue->type));
/* Fall through */
case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* multicast\n");
data_size = send_core_data_multicast(queue->cls, size, buf);
+ // t->fwd_queue_n--; FIXME fc
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
break;
case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" pid %u not seen yet, forwarding\n", pid);
}
+
t->skip += (pid - t->fwd_pid) - 1;
t->fwd_pid = pid;
+
if (GMC_is_pid_bigger (pid, t->last_fwd_ack))
{
GNUNET_STATISTICS_update (stats, "# unsolicited unicast", 1, GNUNET_NO);
GNUNET_break_op (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received PID %u, ACK %u\n",
+ pid, t->last_fwd_ack);
return GNUNET_OK;
}
+
tunnel_reset_timeout (t);
dest_id = GNUNET_PEER_search (&msg->destination);
if (dest_id == myid)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" it's for us! sending to clients...\n");
GNUNET_STATISTICS_update (stats, "# unicast received", 1, GNUNET_NO);
- send_subscribed_clients (message, (struct GNUNET_MessageHeader *) &msg[1]);
+ send_subscribed_clients (message, &msg[1].header, t);
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
return GNUNET_OK;
}
GNUNET_CONTAINER_multihashmap_iterate (t->children_fc,
&tunnel_add_skip,
&neighbor);
- if (GMC_is_pid_bigger (pid, cinfo->fwd_ack))
+ if (GNUNET_YES == t->nobuffer &&
+ GNUNET_YES == GMC_is_pid_bigger (pid, cinfo->fwd_ack))
{
GNUNET_STATISTICS_update (stats, "# unsolicited unicast", 1, GNUNET_NO);
GNUNET_break_op (0);
}
send_message (message, neighbor, t);
GNUNET_STATISTICS_update (stats, "# unicast forwarded", 1, GNUNET_NO);
-
return GNUNET_OK;
}
GNUNET_CONTAINER_multihashmap_contains (t->peers, &my_full_id.hashPubKey))
{
GNUNET_STATISTICS_update (stats, "# multicast received", 1, GNUNET_NO);
- send_subscribed_clients (message, &msg[1].header);
+ send_subscribed_clients (message, &msg[1].header, t);
tunnel_send_fwd_ack(t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ttl: %u\n", ntohl (msg->ttl));
struct MeshTunnel *t;
size_t size;
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got a ToOrigin packet from %s\n",
GNUNET_i2s (peer));
size = ntohs (message->size);
msg = (struct GNUNET_MESH_Multicast *) cbuf;
msg->header.size = htons (size);
msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
+ // FIXME: change type to != MULTICAST
msg->oid = my_full_id;
msg->tid = htonl (t->id.tid);
msg->ttl = htonl (default_ttl);
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "receive done OK\n");
GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
return;
}
t = tunnel_get_by_local_id (c, tid);
if (NULL == t)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id);
+ if (t->owner == c)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is owner)\n");
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is leaf)\n");
GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"To Origin PID, expected %u, got %u\n",
- clinfo->bck_pid + 1, ntohl (data_msg->pid));
+ clinfo->bck_pid + 1,
+ ntohl (data_msg->pid));
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
/* Ok, everything is correct, send the message
* (pretend we got it from a mesh peer)
*/
+ clinfo->bck_pid++;
{
char buf[ntohs (message->size)] GNUNET_ALIGN;
struct GNUNET_MESH_ToOrigin *copy;
GNUNET_PEER_resolve (t->id.oid, ©->oid);
copy->tid = htonl (t->id.tid);
copy->ttl = htonl (default_ttl);
- GNUNET_assert (ntohl (copy->pid) == (t->bck_pid + 1));
+ if (ntohl (copy->pid) != (t->bck_pid + 1))
+ {
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "To Origin PID, expected %u, got %u\n",
+ t->bck_pid + 1,
+ ntohl (copy->pid));
+ return;
+ }
+ t->bck_pid++;
copy->sender = my_full_id;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" calling generic handler...\n");
handle_mesh_data_to_orig (NULL, &my_full_id, ©->header, NULL, 0);
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
return;
}
if (NULL == t)
{
GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id);
+ if (t->owner == c)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is owner)\n");
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is leaf)\n"); GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
if (NULL == t)
{
GNUNET_break (0); // FIXME fc
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id);
+ if (t->owner == c)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is owner)\n");
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " (client is leaf)\n");
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
}
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
return;
}
GNUNET_CRYPTO_hash (&my_public_key, sizeof (my_public_key),
&my_full_id.hashPubKey);
myid = GNUNET_PEER_intern (&my_full_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Mesh for peer [%s] starting\n",
+ GNUNET_i2s(&my_full_id));
// transport_handle = GNUNET_TRANSPORT_connect(c,
// &my_full_id,
NULL)) ? 0 : 1;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "main() END\n");
+ INTERVAL_SHOW;
+
return ret;
}