#define MESH_MAX_POLL_TIME GNUNET_TIME_relative_multiply (\
GNUNET_TIME_UNIT_MINUTES,\
10)
+#define MESH_RETRANSMIT_TIME GNUNET_TIME_UNIT_SECONDS
+#define MESH_RETRANSMIT_MARGIN 4
#if MESH_DEBUG_CONNECTION
#define DEBUG_CONN(...) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
/**
- * Info needed to retry a message in case it gets lost.
+ * Data needed for reliable tunnel endpoint retransmission management.
*/
-struct MeshSentMessage {
+struct MeshTunnelReliability;
- /**
- * Tunnel this message is in.
- */
- struct MeshTunnel *t;
- /**
- * ID of the message (ACK needed to free)
- */
- uint32_t id;
+/**
+ * Info needed to retry a message in case it gets lost.
+ */
+struct MeshReliableMessage
+{
+ /**
+ * Double linked list, FIFO style
+ */
+ struct MeshReliableMessage *next;
+ struct MeshReliableMessage *prev;
- /**
- * Task to resend/poll in case no ACK is received.
- */
- GNUNET_SCHEDULER_TaskIdentifier retry_task; // FIXME move to per tunnel timer?
+ /**
+ * Tunnel Reliability queue this message is in.
+ */
+ struct MeshTunnelReliability *rel;
- /**
- * Counter for exponential backoff.
- */
- struct GNUNET_TIME_Relative retry_timer;
+ /**
+ * ID of the message (ACK needed to free)
+ */
+ uint64_t mid;
- /**
- * Is this a forward or backward going message?
- */
- int is_forward;
+ /**
+ * When was this message issued (to calculate ACK delay) FIXME update with traffic
+ */
+ struct GNUNET_TIME_Absolute timestamp;
/* struct GNUNET_MESH_Data with payload */
};
+
+struct MeshTunnelReliability
+{
+ /**
+ * Tunnel this is about.
+ */
+ struct MeshTunnel *t;
+
+ /**
+ * DLL of messages sent and not yet ACK'd.
+ */
+ struct MeshReliableMessage *head_sent;
+ struct MeshReliableMessage *tail_sent;
+
+ /**
+ * Messages pending
+ */
+ unsigned int n_sent;
+
+ /**
+ * Next MID to use
+ */
+ uint64_t mid_sent;
+
+ /**
+ * DLL of messages received out of order.
+ */
+ struct MeshReliableMessage *head_recv;
+ struct MeshReliableMessage *tail_recv;
+
+ uint64_t mid_recv;
+
+ /**
+ * Task to resend/poll in case no ACK is received.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier retry_task;
+
+ /**
+ * Counter for exponential backoff.
+ */
+ struct GNUNET_TIME_Relative retry_timer;
+
+ /**
+ * How long does it usually take to get an ACK. FIXME update with traffic
+ */
+ struct GNUNET_TIME_Relative expected_delay;
+};
+
+
/**
* Struct containing all information regarding a tunnel
* For an intermediate node the improtant info used will be:
*/
unsigned int pending_messages;
- /**
- * Messages sent and not yet ACK'd.
- * Only present (non-NULL) at the owner of a tunnel.
- */
- struct GNUNET_CONTAINER_MultiHashMap32 *sent_messages_fwd;
+ /**
+ * Reliability data.
+ * Only present (non-NULL) at the owner of a tunnel.
+ */
+ struct MeshTunnelReliability *fwd_rel;
- /**
- * Messages sent and not yet ACK'd.
- * Only present (non-NULL) at the destination of a tunnel.
- */
- struct GNUNET_CONTAINER_MultiHashMap32 *sent_messages_bck;
+ /**
+ * Reliability data.
+ * Only present (non-NULL) at the destination of a tunnel.
+ */
+ struct MeshTunnelReliability *bck_rel;
};
static struct MeshClient *
client_get (struct GNUNET_SERVER_Client *client)
{
- struct MeshClient *c;
-
- c = clients_head;
- while (NULL != c)
- {
- if (c->handle == client)
- return c;
- c = c->next;
- }
- return NULL;
+ return GNUNET_SERVER_client_get_user_context (client, struct MeshClient);
}
msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
msg.tunnel_id = htonl (t->local_tid_dest);
msg.port = htonl (t->port);
+ msg.options = 0;
+ msg.options |= GNUNET_YES == t->reliable ? GNUNET_MESH_OPTION_RELIABLE : 0;
+ msg.options |= GNUNET_YES == t->nobuffer ? GNUNET_MESH_OPTION_NOBUFFER : 0;
+ msg.options = htonl (msg.options);
GNUNET_PEER_resolve (t->id.oid, &msg.peer);
GNUNET_SERVER_notification_context_unicast (nc, t->client->handle,
&msg.header, GNUNET_NO);
*
* @param t Tunnel on which to send the ACK.
* @param c Client to whom send the ACK.
- * @param ack Value of the ACK.
* @param is_fwd Set to GNUNET_YES for FWD ACK (dest->owner)
*/
static void
{
struct GNUNET_MESH_DataACK msg;
- msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK);
msg.header.size = htons (sizeof (msg));
msg.tid = htonl (t->id.tid);
GNUNET_PEER_resolve (t->id.oid, &msg.oid);
- msg.pid = htonl (t->prev_fc.last_pid_recv);
+ msg.mid = GNUNET_htonll (t->bck_rel->mid_recv);
msg.futures = 0; // FIXME set bits of other newer messages received
send_prebuilt_message (&msg.header, t->prev_hop, t);
{
struct GNUNET_MESH_DataACK msg;
- msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK);
msg.header.size = htons (sizeof (msg));
msg.tid = htonl (t->id.tid);
GNUNET_PEER_resolve (t->id.oid, &msg.oid);
- msg.pid = htonl (t->next_fc.last_pid_recv);
+ msg.mid = GNUNET_htonll (t->fwd_rel->mid_recv);
msg.futures = 0; // FIXME set bits of other newer messages received
send_prebuilt_message (&msg.header, t->next_hop, t);
tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type)
{
uint32_t ack;
+ int delta;
/* Is it after unicast retransmission? */
switch (type)
tunnel_send_fwd_data_ack (t);
break;
case GNUNET_MESSAGE_TYPE_MESH_ACK:
- if (NULL != t->owner && GNUNET_YES == t->reliable)
- return;
case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
break;
- case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK:
- tunnel_send_fwd_data_ack (t);
- break;
case GNUNET_MESSAGE_TYPE_MESH_POLL:
case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK:
t->force_ack = GNUNET_YES;
}
/* Ok, ACK might be necessary, what PID to ACK? */
- ack = t->prev_fc.last_pid_recv + t->queue_max - t->next_fc.queue_n;
+ delta = t->queue_max - t->next_fc.queue_n;
+ if (0 > delta)
+ delta = 0;
+ ack = t->prev_fc.last_pid_recv + delta;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK %u\n", ack);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" last %u, qmax %u, q %u\n",
" Not sending ACK, nobuffer + traffic\n");
return;
}
+ if (GNUNET_YES == t->reliable && NULL != t->owner)
+ tunnel_send_bck_data_ack (t);
break;
case GNUNET_MESSAGE_TYPE_MESH_ACK:
- if (NULL != t->client && GNUNET_YES == t->reliable)
- return;
case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
break;
- case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK:
- tunnel_send_bck_data_ack (t);
- /* fall through */
case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK:
case GNUNET_MESSAGE_TYPE_MESH_POLL:
t->force_ack = GNUNET_YES;
/**
* We haven't received an ACK after a certain time: restransmit the message.
*
- * @param cls Closure (MeshSentMessage with the message to restransmit)
+ * @param cls Closure (MeshReliableMessage with the message to restransmit)
* @param tc TaskContext.
*/
static void
tunnel_retransmit_message (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct MeshSentMessage *copy = cls;
+ struct MeshTunnelReliability *rel = cls;
+ struct MeshReliableMessage *copy;
+ struct MeshFlowControl *fc;
+ struct MeshPeerQueue *q;
+ struct MeshPeerInfo *pi;
+ struct MeshTunnel *t;
struct GNUNET_MESH_Data *payload;
GNUNET_PEER_Id hop;
- copy->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
+ t = rel->t;
+ copy = rel->head_sent;
+ if (NULL == copy)
+ {
+ GNUNET_break (0);
+ return;
+ }
+
+ /* Search the message to be retransmitted in the outgoing queue */
payload = (struct GNUNET_MESH_Data *) ©[1];
- hop = copy->is_forward ? copy->t->next_hop : copy->t->prev_hop;
- send_prebuilt_message (&payload->header, hop, copy->t);
- GNUNET_STATISTICS_update (stats, "# unicast retransmitted", 1, GNUNET_NO);
- copy->retry_timer = GNUNET_TIME_STD_BACKOFF (copy->retry_timer);
- copy->retry_task = GNUNET_SCHEDULER_add_delayed (copy->retry_timer,
- &tunnel_retransmit_message,
- cls);
+ hop = rel == t->fwd_rel ? t->next_hop : t->prev_hop;
+ fc = rel == t->fwd_rel ? &t->next_fc : &t->prev_fc;
+ pi = peer_get_short (hop);
+ for (q = pi->queue_head; NULL != q; q = q->next)
+ {
+ if (ntohs (payload->header.type) == q->type)
+ {
+ struct GNUNET_MESH_Data *queued_data = q->cls;
+
+ if (queued_data->mid == payload->mid)
+ break;
+ }
+ }
+
+ /* Message not found in the queue */
+ if (NULL == q)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %llu\n", copy->mid);
+ copy->timestamp = GNUNET_TIME_absolute_get ();
+
+ fc->last_ack_sent++;
+ payload->pid = htonl (fc->last_pid_recv + 1);
+ fc->last_pid_recv++;
+ send_prebuilt_message (&payload->header, hop, t);
+ GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! STILL IN QUEUE %llu\n", copy->mid);
+ }
+
+ rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer);
+ rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
+ &tunnel_retransmit_message,
+ cls);
}
static void
tunnel_set_options (struct MeshTunnel *t, uint32_t options)
{
- t->nobuffer = options & GNUNET_MESH_OPTION_NOBUFFER;
- t->reliable = options & GNUNET_MESH_OPTION_RELIABLE;
+ t->nobuffer = (options & GNUNET_MESH_OPTION_NOBUFFER) != 0 ?
+ GNUNET_YES : GNUNET_NO;
+ t->reliable = (options & GNUNET_MESH_OPTION_RELIABLE) != 0 ?
+ GNUNET_YES : GNUNET_NO;
}
else
{
GNUNET_break (0);
+ GNUNET_free (queue);
return;
}
fc->queue_n--;
{
case 0:
case GNUNET_MESSAGE_TYPE_MESH_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK:
case GNUNET_MESSAGE_TYPE_MESH_POLL:
case GNUNET_MESSAGE_TYPE_MESH_PATH_BROKEN:
case GNUNET_MESSAGE_TYPE_MESH_PATH_DESTROY:
else
{
GNUNET_break (0);
- fc = NULL;
+ return data_size;
}
if (NULL != fc && GNUNET_SCHEDULER_NO_TASK == fc->poll_task)
{
{
struct MeshPeerQueue *queue;
struct GNUNET_PeerIdentity id;
- unsigned int *n;
+ struct MeshFlowControl *fc;
+ int priority;
- n = NULL;
+ fc = NULL;
+ priority = GNUNET_NO;
if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type)
{
- n = &t->next_fc.queue_n;
+ fc = &t->next_fc;
}
else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
{
- n = &t->prev_fc.queue_n;
+ fc = &t->prev_fc;
}
- if (NULL != n)
+ if (NULL != fc)
{
- if (*n >= t->queue_max)
+ if (fc->queue_n >= t->queue_max)
{
- GNUNET_break(0);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"queue full: %u/%u\n",
- *n, t->queue_max);
- GNUNET_STATISTICS_update(stats,
- "# messages dropped (buffer full)",
- 1, GNUNET_NO);
- return; /* Drop message */
+ fc->queue_n, t->queue_max);
+
+ /* If this isn't a retransmission, drop the message */
+ if (GNUNET_NO == t->reliable ||
+ (NULL == t->owner && GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) ||
+ (NULL == t->client && GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type))
+ {
+ GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
+ 1, GNUNET_NO);
+ return; /* Drop this message */
+ }
+ priority = GNUNET_YES;
}
- (*n)++;
+ fc->queue_n++;
}
queue = GNUNET_malloc (sizeof (struct MeshPeerQueue));
queue->cls = cls;
queue->size = size;
queue->peer = dst;
queue->tunnel = t;
- GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
+ if (GNUNET_YES == priority)
+ GNUNET_CONTAINER_DLL_insert (dst->queue_head, dst->queue_tail, queue);
+ else
+ GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
if (NULL == dst->core_transmit)
{
GNUNET_PEER_resolve (dst->id, &id);
next_local_tid = next_local_tid | GNUNET_MESH_LOCAL_TUNNEL_ID_SERV;
if (GNUNET_YES == t->reliable)
- t->sent_messages_bck =
- GNUNET_CONTAINER_multihashmap32_create (t->queue_max);
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! Reliable\n");
+ t->bck_rel = GNUNET_malloc (sizeof (struct MeshTunnelReliability));
+ t->bck_rel->t = t;
+ t->bck_rel->expected_delay = MESH_RETRANSMIT_TIME;
+ }
tunnel_add_client (t, c);
send_client_tunnel_create (t);
" it's for us! sending to clients...\n");
GNUNET_STATISTICS_update (stats, "# unicast received", 1, GNUNET_NO);
// if (GMC_is_pid_bigger(pid, t->prev_fc.last_pid_recv)) FIXME use
- if ( (GNUNET_NO == t->reliable &&
- GMC_is_pid_bigger(pid, t->prev_fc.last_pid_recv))
- ||
- (GNUNET_YES == t->reliable &&
- pid == t->prev_fc.last_pid_recv + 1) )
+ if (GMC_is_pid_bigger (pid, t->prev_fc.last_pid_recv)
+ &&
+ (GNUNET_NO == t->reliable ||
+ GNUNET_ntohll (msg->mid) == (t->bck_rel->mid_recv + 1)) )
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" pid %u not seen yet, forwarding\n", pid);
t->prev_fc.last_pid_recv = pid;
+ t->bck_rel->mid_recv++;
tunnel_send_client_ucast (t, msg);
- tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
" Pid %u not expected (%u), sending FWD ACK!\n",
pid, t->prev_fc.last_pid_recv + 1);
- tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
}
+ tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
return GNUNET_OK;
}
- t->prev_fc.last_pid_recv = pid;
+ if (GMC_is_pid_bigger(pid, t->prev_fc.last_pid_recv))
+ t->prev_fc.last_pid_recv = pid;
if (0 == t->next_hop)
{
GNUNET_break (0);
{
t->next_fc.last_pid_recv = pid;
tunnel_send_client_to_orig (t, msg);
- tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
}
else
{
// GNUNET_STATISTICS_update (stats, "# duplicate PID drops BCK", 1, GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
" Pid %u not expected, sending FWD ACK!\n", pid);
- tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
}
+ tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
return GNUNET_OK;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" not for us, retransmitting...\n");
- t->next_fc.last_pid_recv = pid;
+ if (GMC_is_pid_bigger (pid, t->next_fc.last_pid_recv))
+ t->next_fc.last_pid_recv = pid;
if (0 == t->prev_hop) /* No owner AND no prev hop */
{
if (GNUNET_YES == t->destroy)
const struct GNUNET_MessageHeader *message)
{
struct GNUNET_MESH_DataACK *msg;
- struct GNUNET_CONTAINER_MultiHashMap32 *hm;
- struct MeshSentMessage *copy;
+ struct MeshTunnelReliability *rel;
+ struct MeshReliableMessage *copy;
+ struct MeshReliableMessage *next;
struct MeshTunnel *t;
GNUNET_PEER_Id id;
- uint32_t ack;
+ uint64_t ack;
+ uint16_t type;
+ int work;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got a DATA ACK message from %s!\n",
- GNUNET_i2s (peer));
+ type = ntohs (message->type);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got a %s message from %s!\n",
+ GNUNET_MESH_DEBUG_M2S (type), GNUNET_i2s (peer));
msg = (struct GNUNET_MESH_DataACK *) message;
t = tunnel_get (&msg->oid, ntohl (msg->tid));
GNUNET_STATISTICS_update (stats, "# ack on unknown tunnel", 1, GNUNET_NO);
return GNUNET_OK;
}
- ack = ntohl (msg->pid);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
+ ack = GNUNET_ntohll (msg->mid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ACK %llu\n", ack);
/* Is this a forward or backward ACK? */
id = GNUNET_PEER_search (peer);
- if (t->next_hop == id)
+ if (t->next_hop == id && GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK == type)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK\n");
if (NULL == t->owner)
send_prebuilt_message (message, t->prev_hop, t);
return GNUNET_OK;
}
- hm = t->sent_messages_fwd;
- tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
+ rel = t->fwd_rel;
+ tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
}
- else if (t->prev_hop == id)
+ else if (t->prev_hop == id && GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK == type)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " BCK ACK\n");
if (NULL == t->client)
send_prebuilt_message (message, t->next_hop, t);
return GNUNET_OK;
}
- hm = t->sent_messages_bck;
- tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
+ rel = t->bck_rel;
+ tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
}
else
- GNUNET_break_op (0);
-
- copy = GNUNET_CONTAINER_multihashmap32_get (hm, ack);
- if (NULL == copy)
{
- GNUNET_break (0); // FIXME needed?
+ GNUNET_break_op (0);
return GNUNET_OK;
}
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap32_remove (hm, ack, copy));
- if (GNUNET_SCHEDULER_NO_TASK != copy->retry_task)
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! ACK %llu\n", ack);
+ for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next)
{
- GNUNET_SCHEDULER_cancel (copy->retry_task);
+ struct GNUNET_TIME_Relative time;
+
+ if (copy->mid > ack)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! head %llu, out!\n", copy->mid);
+ break;
+ }
+ work = GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! id %llu\n", copy->mid);
+ next = copy->next;
+ time = GNUNET_TIME_absolute_get_duration (copy->timestamp);
+ rel->expected_delay.rel_value += time.rel_value;
+ rel->expected_delay.rel_value /= 2;
+ rel->n_sent--;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! new expected delay %s\n",
+ GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
+ GNUNET_NO));
+ rel->retry_timer = rel->expected_delay;
+ GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
+ GNUNET_free (copy);
+ }
+
+ if (GNUNET_YES == work)
+ {
+ if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task)
+ {
+ GNUNET_SCHEDULER_cancel (rel->retry_task);
+ if (NULL == rel->head_sent)
+ {
+ rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ else
+ {
+ struct GNUNET_TIME_Absolute new_target;
+ struct GNUNET_TIME_Relative delay;
+
+ new_target = GNUNET_TIME_absolute_add (rel->head_sent->timestamp,
+ rel->retry_timer);
+ delay = GNUNET_TIME_absolute_get_remaining (new_target);
+ rel->retry_task =
+ GNUNET_SCHEDULER_add_delayed (delay,
+ &tunnel_retransmit_message,
+ rel);
+ }
+ }
+ else
+ GNUNET_break (0);
}
- else
- GNUNET_break (0);
- GNUNET_free (copy);
return GNUNET_OK;
}
{
struct GNUNET_MESH_Poll *msg;
struct MeshTunnel *t;
+ struct MeshFlowControl *fc;
GNUNET_PEER_Id id;
+ uint32_t pid;
+ uint32_t old;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got an POLL packet from %s!\n",
GNUNET_i2s (peer));
/* Is this a forward or backward ACK? */
id = GNUNET_PEER_search(peer);
+ pid = ntohl (msg->pid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " PID %u\n", pid);
if (t->next_hop == id)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " from FWD\n");
- t->next_fc.last_pid_recv = ntohl (msg->pid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " was %u\n", t->next_fc.last_pid_recv);
+ fc = &t->next_fc;
+ old = fc->last_pid_recv;
+ fc->last_pid_recv = pid;
tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_POLL);
}
else if (t->prev_hop == id)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " from BCK\n");
- t->prev_fc.last_pid_recv = ntohl (msg->pid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " was %u\n", t->prev_fc.last_pid_recv);
+ fc = &t->prev_fc;
+ old = fc->last_pid_recv;
+ fc->last_pid_recv = pid;
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_POLL);
}
else
GNUNET_break (0);
+
+ if (GNUNET_YES == t->reliable)
+ fc->last_pid_recv = old;
return GNUNET_OK;
}
sizeof (struct GNUNET_MESH_TunnelDestroy)},
{&handle_mesh_unicast, GNUNET_MESSAGE_TYPE_MESH_UNICAST, 0},
{&handle_mesh_to_orig, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN, 0},
- {&handle_mesh_data_ack, GNUNET_MESSAGE_TYPE_MESH_DATA_ACK,
+ {&handle_mesh_data_ack, GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK,
+ sizeof (struct GNUNET_MESH_DataACK)},
+ {&handle_mesh_data_ack, GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK,
sizeof (struct GNUNET_MESH_DataACK)},
{&handle_mesh_keepalive, GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE,
sizeof (struct GNUNET_MESH_TunnelKeepAlive)},
return;
}
- c = clients_head;
- while (NULL != c)
+ c = client_get (client);
+ if (NULL != c)
{
- if (c->handle != client)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- " ... searching %p (%u)\n",
- c->handle, c->id);
- c = c->next;
- continue;
- }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "matching client found (%u)\n",
c->id);
GNUNET_SERVER_client_drop (c->handle);
GNUNET_STATISTICS_update (stats, "# clients", -1, GNUNET_NO);
c = next;
}
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " context NULL!\n");
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "done!\n");
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " client has %u ports\n", size);
c->handle = client;
GNUNET_SERVER_client_keep (client);
+ GNUNET_SERVER_client_set_user_context (client, c);
if (size > 0)
{
uint32_t u32;
t->port = ntohl (t_msg->port);
tunnel_set_options (t, ntohl (t_msg->options));
if (GNUNET_YES == t->reliable)
- t->sent_messages_fwd =
- GNUNET_CONTAINER_multihashmap32_create (t->queue_max);
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! Reliable\n");
+ t->fwd_rel = GNUNET_malloc (sizeof (struct MeshTunnelReliability));
+ t->fwd_rel->t = t;
+ t->fwd_rel->expected_delay = MESH_RETRANSMIT_TIME;
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "CREATED TUNNEL %s[%x]:%u (%x)\n",
GNUNET_i2s (&my_full_id), t->id.tid, t->port, t->local_tid);
fc = tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV ? &t->prev_fc : &t->next_fc;
if (GNUNET_YES == t->reliable)
{
- struct MeshSentMessage *copy;
+ struct MeshTunnelReliability *rel;
+ struct MeshReliableMessage *copy;
- copy = GNUNET_malloc (sizeof (struct MeshSentMessage)
+ rel = (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV) ? t->fwd_rel : t->bck_rel;
+ copy = GNUNET_malloc (sizeof (struct MeshReliableMessage)
+ sizeof(struct GNUNET_MESH_Data)
+ size);
- copy->t = t;
- copy->id = fc->last_pid_recv + 1;
- copy->is_forward = (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV);
- copy->retry_timer = GNUNET_TIME_UNIT_MINUTES;
- copy->retry_task = GNUNET_SCHEDULER_add_delayed (copy->retry_timer,
- &tunnel_retransmit_message,
- copy);
- if (GNUNET_OK !=
- GNUNET_CONTAINER_multihashmap32_put (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV ?
- t->sent_messages_fwd : t->sent_messages_bck,
- copy->id,
- copy,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST))
+ copy->mid = rel->mid_sent++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! DATA %llu\n", copy->mid);
+ copy->timestamp = GNUNET_TIME_absolute_get ();
+ copy->rel = rel;
+ rel->n_sent++;
+ GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy);
+ if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task)
{
- GNUNET_break (0);
- GNUNET_SCHEDULER_cancel (copy->retry_task);
- GNUNET_free (copy);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
+ rel->retry_timer =
+ GNUNET_TIME_relative_multiply (rel->expected_delay,
+ MESH_RETRANSMIT_MARGIN);
+ rel->retry_task =
+ GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
+ &tunnel_retransmit_message,
+ rel);
}
payload = (struct GNUNET_MESH_Data *) ©[1];
+ payload->mid = GNUNET_htonll (copy->mid);
}
else
{
payload = (struct GNUNET_MESH_Data *) cbuf;
+ payload->mid = 0;
+ // FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME
+ // use different struct for unreliable traffic, save 8 bytes
}
memcpy (&payload[1], &data_msg[1], size);
payload->header.size = htons (sizeof (struct GNUNET_MESH_Data) + size);