#define MESH_MAX_POLL_TIME GNUNET_TIME_relative_multiply (\
GNUNET_TIME_UNIT_MINUTES,\
10)
+#define MESH_RETRANSMIT_TIME GNUNET_TIME_relative_multiply (\
+ GNUNET_TIME_UNIT_SECONDS,\
+ 5)
#if MESH_DEBUG_CONNECTION
#define DEBUG_CONN(...) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
/**
* Info needed to retry a message in case it gets lost.
*/
-struct MeshSentMessage {
+struct MeshReliableMessage
+{
+ /**
+ * Double linked list, FIFO style
+ */
+ struct MeshReliableMessage *next;
+ struct MeshReliableMessage *prev;
- /**
- * Tunnel this message is in.
- */
- struct MeshTunnel *t;
+ /**
+ * Tunnel Reliability queue this message is in.
+ */
+ struct MeshTunnelReliability *rel;
- /**
- * ID of the message (ACK needed to free)
- */
- uint32_t id;
+ /**
+ * ID of the message (ACK needed to free)
+ */
+ uint32_t id;
- /**
- * Task to resend/poll in case no ACK is received.
- */
- GNUNET_SCHEDULER_TaskIdentifier retry_task; // FIXME move to per tunnel timer?
+ /**
+ * When was this message issued (to calculate ACK delay) FIXME update with traffic
+ */
+ struct GNUNET_TIME_Absolute timestamp;
- /**
- * Counter for exponential backoff.
- */
- struct GNUNET_TIME_Relative retry_timer;
+ /* struct GNUNET_MESH_Data with payload */
+};
- /**
- * Is this a forward or backward going message?
- */
- int is_forward;
- /* struct GNUNET_MESH_Data with payload */
+/**
+ * Data needed for reliable tunnel endpoint retransmission management.
+ */
+struct MeshTunnelReliability
+{
+ /**
+ * Tunnel this is about.
+ */
+ struct MeshTunnel *t;
+
+ /**
+ * DLL of messages sent and not yet ACK'd.
+ */
+ struct MeshReliableMessage *head_sent;
+ struct MeshReliableMessage *tail_sent;
+
+ /**
+ * DLL of messages received out of order.
+ */
+ struct MeshReliableMessage *head_recv;
+ struct MeshReliableMessage *tail_recv;
+
+ /**
+ * Task to resend/poll in case no ACK is received.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier retry_task;
+
+ /**
+ * Counter for exponential backoff.
+ */
+ struct GNUNET_TIME_Relative retry_timer;
+
+ /**
+ * How long does it usually take to get an ACK. FIXME update with traffic
+ */
+ struct GNUNET_TIME_Relative expected_delay;
};
+
/**
* Struct containing all information regarding a tunnel
* For an intermediate node the improtant info used will be:
*/
unsigned int pending_messages;
- /**
- * Messages sent and not yet ACK'd.
- * Only present (non-NULL) at the owner of a tunnel.
- */
- struct GNUNET_CONTAINER_MultiHashMap32 *sent_messages_fwd;
+ /**
+ * Reliability data.
+ * Only present (non-NULL) at the owner of a tunnel.
+ */
+ struct MeshTunnelReliability *fwd_rel;
- /**
- * Messages sent and not yet ACK'd.
- * Only present (non-NULL) at the destination of a tunnel.
- */
- struct GNUNET_CONTAINER_MultiHashMap32 *sent_messages_bck;
+ /**
+ * Reliability data.
+ * Only present (non-NULL) at the destination of a tunnel.
+ */
+ struct MeshTunnelReliability *bck_rel;
};
t->local_tid,
t));
}
- else if (c == t->client)
+ if (c == t->client)
{
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap32_remove (c->incoming_tunnels,
t->local_tid_dest,
t));
}
- else
- {
- GNUNET_break (0);
- }
}
/**
msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
msg.tunnel_id = htonl (t->local_tid_dest);
msg.port = htonl (t->port);
+ msg.options = 0;
+ msg.options |= GNUNET_YES == t->reliable ? GNUNET_MESH_OPTION_RELIABLE : 0;
+ msg.options |= GNUNET_YES == t->nobuffer ? GNUNET_MESH_OPTION_NOBUFFER : 0;
+ msg.options = htonl (msg.options);
GNUNET_PEER_resolve (t->id.oid, &msg.peer);
GNUNET_SERVER_notification_context_unicast (nc, t->client->handle,
&msg.header, GNUNET_NO);
*
* @param t Tunnel on which to send the ACK.
* @param c Client to whom send the ACK.
- * @param ack Value of the ACK.
* @param is_fwd Set to GNUNET_YES for FWD ACK (dest->owner)
*/
static void
{
struct GNUNET_MESH_DataACK msg;
- msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK);
msg.header.size = htons (sizeof (msg));
msg.tid = htonl (t->id.tid);
GNUNET_PEER_resolve (t->id.oid, &msg.oid);
{
struct GNUNET_MESH_DataACK msg;
- msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK);
msg.header.size = htons (sizeof (msg));
msg.tid = htonl (t->id.tid);
GNUNET_PEER_resolve (t->id.oid, &msg.oid);
return;
case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
break;
- case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK:
tunnel_send_fwd_data_ack (t);
break;
case GNUNET_MESSAGE_TYPE_MESH_POLL:
return;
case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
break;
- case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK:
tunnel_send_bck_data_ack (t);
/* fall through */
case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK:
/**
* We haven't received an ACK after a certain time: restransmit the message.
*
- * @param cls Closure (MeshSentMessage with the message to restransmit)
+ * @param cls Closure (MeshReliableMessage with the message to restransmit)
* @param tc TaskContext.
*/
static void
tunnel_retransmit_message (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct MeshSentMessage *copy = cls;
+ struct MeshTunnelReliability *rel = cls;
+ struct MeshReliableMessage *copy;
+ struct MeshTunnel *t;
struct GNUNET_MESH_Data *payload;
GNUNET_PEER_Id hop;
- copy->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
+ t = rel->t;
+ copy = rel->head_sent;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! Retransmit \n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! id %u\n", copy->id);
+
payload = (struct GNUNET_MESH_Data *) ©[1];
- hop = copy->is_forward ? copy->t->next_hop : copy->t->prev_hop;
- send_prebuilt_message (&payload->header, hop, copy->t);
- GNUNET_STATISTICS_update (stats, "# unicast retransmitted", 1, GNUNET_NO);
- copy->retry_timer = GNUNET_TIME_STD_BACKOFF (copy->retry_timer);
- copy->retry_task = GNUNET_SCHEDULER_add_delayed (copy->retry_timer,
+ hop = rel == t->fwd_rel ? t->next_hop : t->prev_hop;
+ send_prebuilt_message (&payload->header, hop, t);
+ GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
+ rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer); // FIXME adapt
+ rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
&tunnel_retransmit_message,
cls);
}
static void
tunnel_set_options (struct MeshTunnel *t, uint32_t options)
{
- t->nobuffer = options & GNUNET_MESH_OPTION_NOBUFFER;
- t->reliable = options & GNUNET_MESH_OPTION_RELIABLE;
+ t->nobuffer = (options & GNUNET_MESH_OPTION_NOBUFFER) != 0 ?
+ GNUNET_YES : GNUNET_NO;
+ t->reliable = (options & GNUNET_MESH_OPTION_RELIABLE) != 0 ?
+ GNUNET_YES : GNUNET_NO;
}
t->next_hop = 0;
}
}
- else if (c == t->owner)
+ if (c == t->owner)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Client %u is owner.\n", c->id);
t->owner = NULL;
t->prev_hop = 0;
}
}
- else
- {
- GNUNET_break (0);
- }
+
tunnel_destroy_empty (t);
return GNUNET_OK;
else
{
GNUNET_break (0);
+ GNUNET_free (queue);
return;
}
fc->queue_n--;
{
case 0:
case GNUNET_MESSAGE_TYPE_MESH_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK:
case GNUNET_MESSAGE_TYPE_MESH_POLL:
case GNUNET_MESSAGE_TYPE_MESH_PATH_BROKEN:
case GNUNET_MESSAGE_TYPE_MESH_PATH_DESTROY:
switch (type)
{
case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
- t->next_fc.last_pid_sent = pid;
+ if (GMC_is_pid_bigger(pid, t->next_fc.last_pid_sent))
+ t->next_fc.last_pid_sent = pid;
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
break;
case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
- t->prev_fc.last_pid_sent = pid;
+ if (GMC_is_pid_bigger(pid, t->prev_fc.last_pid_sent))
+ t->prev_fc.last_pid_sent = pid;
tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
break;
default:
{
struct MeshPeerQueue *queue;
struct GNUNET_PeerIdentity id;
- unsigned int *n;
+ struct MeshFlowControl *fc;
+ uint32_t pid;
+ uint32_t pid_q;
+ int priority;
- n = NULL;
+ fc = NULL;
+ priority = GNUNET_NO;
if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type)
{
- n = &t->next_fc.queue_n;
+ fc = &t->next_fc;
+ pid = ntohl (((struct GNUNET_MESH_Data *)cls)->pid);
}
else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
{
- n = &t->prev_fc.queue_n;
+ fc = &t->prev_fc;
+ pid = ntohl (((struct GNUNET_MESH_Data *)cls)->pid);
}
- if (NULL != n)
+ if (NULL != fc)
{
- if (*n >= t->queue_max)
+ if (fc->queue_n >= t->queue_max)
{
- GNUNET_break(0);
+ GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"queue full: %u/%u\n",
- *n, t->queue_max);
- GNUNET_STATISTICS_update(stats,
- "# messages dropped (buffer full)",
- 1, GNUNET_NO);
- return; /* Drop message */
+ fc->queue_n, t->queue_max);
+ GNUNET_STATISTICS_update (stats,
+ "# messages dropped (buffer full)",
+ 1, GNUNET_NO);
+ /* Get the PID of the oldest message in the queue */
+ for (queue = dst->queue_head; queue != NULL; queue = queue->next)
+ if (queue->type == type && queue->tunnel == t)
+ {
+ pid_q = ntohl (((struct GNUNET_MESH_Data *)(queue->cls))->pid);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "pid: %u, pid_q: %u\n", pid, pid_q);
+ break;
+ }
+ GNUNET_assert (NULL != queue);
+
+ /* If this is an earlier message that that, give it priority:
+ * - drop the newest message in the queue
+ * - instert current one at the end of the queue (first to get out)
+ */
+ if (GNUNET_YES == t->reliable && GMC_is_pid_bigger(pid_q, pid))
+ {
+ for (queue = dst->queue_tail; queue != NULL; queue = queue->prev)
+ if (queue->type == type && queue->tunnel == t)
+ {
+ /* Drop message from queue */
+ pid_q = ntohl (((struct GNUNET_MESH_Data *)(queue->cls))->pid);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "dropping pid: %u\n", pid_q);
+ queue_destroy (queue, GNUNET_YES);
+ t->pending_messages--;
+ priority = GNUNET_YES;
+ break;
+ }
+ }
+ else
+ return; /* Drop this message */
}
- (*n)++;
+ fc->queue_n++;
}
queue = GNUNET_malloc (sizeof (struct MeshPeerQueue));
queue->cls = cls;
queue->size = size;
queue->peer = dst;
queue->tunnel = t;
- GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
+ if (GNUNET_YES == priority)
+ GNUNET_CONTAINER_DLL_insert (dst->queue_head, dst->queue_tail, queue);
+ else
+ GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
if (NULL == dst->core_transmit)
{
GNUNET_PEER_resolve (dst->id, &id);
next_local_tid = next_local_tid | GNUNET_MESH_LOCAL_TUNNEL_ID_SERV;
if (GNUNET_YES == t->reliable)
- t->sent_messages_bck =
- GNUNET_CONTAINER_multihashmap32_create (t->queue_max);
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! Reliable\n");
+ t->bck_rel = GNUNET_malloc (sizeof (struct MeshTunnelReliability));
+ t->bck_rel->t = t;
+ t->bck_rel->expected_delay = MESH_RETRANSMIT_TIME;
+ }
tunnel_add_client (t, c);
send_client_tunnel_create (t);
" pid %u not seen yet, forwarding\n", pid);
t->prev_fc.last_pid_recv = pid;
tunnel_send_client_ucast (t, msg);
- tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
" Pid %u not expected (%u), sending FWD ACK!\n",
pid, t->prev_fc.last_pid_recv + 1);
- tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
}
+ tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK);
return GNUNET_OK;
}
- t->prev_fc.last_pid_recv = pid;
+ if (GMC_is_pid_bigger(pid, t->prev_fc.last_pid_recv))
+ t->prev_fc.last_pid_recv = pid;
if (0 == t->next_hop)
{
GNUNET_break (0);
{
t->next_fc.last_pid_recv = pid;
tunnel_send_client_to_orig (t, msg);
- tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
}
else
{
// GNUNET_STATISTICS_update (stats, "# duplicate PID drops BCK", 1, GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
" Pid %u not expected, sending FWD ACK!\n", pid);
- tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
}
+ tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK);
return GNUNET_OK;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
" not for us, retransmitting...\n");
- t->next_fc.last_pid_recv = pid;
+ if (GMC_is_pid_bigger (pid, t->next_fc.last_pid_recv))
+ t->next_fc.last_pid_recv = pid;
if (0 == t->prev_hop) /* No owner AND no prev hop */
{
if (GNUNET_YES == t->destroy)
const struct GNUNET_MessageHeader *message)
{
struct GNUNET_MESH_DataACK *msg;
- struct GNUNET_CONTAINER_MultiHashMap32 *hm;
- struct MeshSentMessage *copy;
+ struct MeshTunnelReliability *rel;
+ struct MeshReliableMessage *copy;
+ struct MeshReliableMessage *next;
struct MeshTunnel *t;
GNUNET_PEER_Id id;
uint32_t ack;
+ uint16_t type;
+ int work;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got a DATA ACK message from %s!\n",
- GNUNET_i2s (peer));
+ type = ntohs (message->type);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got a %s message from %s!\n",
+ GNUNET_MESH_DEBUG_M2S (type), GNUNET_i2s (peer));
msg = (struct GNUNET_MESH_DataACK *) message;
t = tunnel_get (&msg->oid, ntohl (msg->tid));
/* Is this a forward or backward ACK? */
id = GNUNET_PEER_search (peer);
- if (t->next_hop == id)
+ if (t->next_hop == id && GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK == type)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK\n");
if (NULL == t->owner)
send_prebuilt_message (message, t->prev_hop, t);
return GNUNET_OK;
}
- hm = t->sent_messages_fwd;
- tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
+ rel = t->fwd_rel;
+ tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
}
- else if (t->prev_hop == id)
+ else if (t->prev_hop == id && GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK == type)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " BCK ACK\n");
if (NULL == t->client)
send_prebuilt_message (message, t->next_hop, t);
return GNUNET_OK;
}
- hm = t->sent_messages_bck;
- tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
+ rel = t->bck_rel;
+ tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
}
else
- GNUNET_break_op (0);
-
- copy = GNUNET_CONTAINER_multihashmap32_get (hm, ack);
- if (NULL == copy)
{
- GNUNET_break (0); // FIXME needed?
+ GNUNET_break_op (0);
return GNUNET_OK;
}
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap32_remove (hm, ack, copy));
- if (GNUNET_SCHEDULER_NO_TASK != copy->retry_task)
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! ACK \n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! ack %u\n", ack);
+ for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next)
{
- GNUNET_SCHEDULER_cancel (copy->retry_task);
+ struct GNUNET_TIME_Relative time;
+
+ if (copy->id > ack)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! head %u, out!\n", copy->id);
+ return GNUNET_OK;
+ }
+ work = GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! id %u\n", copy->id);
+ GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
+ next = copy->next;
+ GNUNET_free (copy);
+ time = GNUNET_TIME_absolute_get_duration (copy->timestamp);
+ rel->expected_delay.rel_value += time.rel_value;
+ rel->expected_delay.rel_value /= 2;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! new expected delay %s!\n",
+ GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
+ GNUNET_NO));
+ rel->retry_timer = rel->expected_delay;
+ }
+ if (GNUNET_YES == work)
+ {
+ if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task)
+ {
+ GNUNET_SCHEDULER_cancel (rel->retry_task);
+ if (NULL == rel->head_sent)
+ {
+ rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ else
+ {
+ struct GNUNET_TIME_Absolute new_target;
+ struct GNUNET_TIME_Relative delay;
+
+ new_target = GNUNET_TIME_absolute_add (rel->head_sent->timestamp,
+ rel->retry_timer);
+ delay = GNUNET_TIME_absolute_get_remaining (new_target);
+ rel->retry_task =
+ GNUNET_SCHEDULER_add_delayed (delay,
+ &tunnel_retransmit_message,
+ rel);
+ }
+ }
+ else
+ GNUNET_break (0);
}
- else
- GNUNET_break (0);
- GNUNET_free (copy);
return GNUNET_OK;
}
{
struct GNUNET_MESH_Poll *msg;
struct MeshTunnel *t;
+ struct MeshFlowControl *fc;
GNUNET_PEER_Id id;
+ uint32_t pid;
+ uint32_t old;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got an POLL packet from %s!\n",
GNUNET_i2s (peer));
/* Is this a forward or backward ACK? */
id = GNUNET_PEER_search(peer);
+ pid = ntohl (msg->pid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " PID %u\n", pid);
if (t->next_hop == id)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " from FWD\n");
- t->next_fc.last_pid_recv = ntohl (msg->pid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " was %u\n", t->next_fc.last_pid_recv);
+ fc = &t->next_fc;
+ old = fc->last_pid_recv;
+ fc->last_pid_recv = pid;
tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_POLL);
}
else if (t->prev_hop == id)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " from BCK\n");
- t->prev_fc.last_pid_recv = ntohl (msg->pid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " was %u\n", t->prev_fc.last_pid_recv);
+ fc = &t->prev_fc;
+ old = fc->last_pid_recv;
+ fc->last_pid_recv = pid;
tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_POLL);
}
else
GNUNET_break (0);
+
+ if (GNUNET_YES == t->reliable)
+ fc->last_pid_recv = old;
return GNUNET_OK;
}
sizeof (struct GNUNET_MESH_TunnelDestroy)},
{&handle_mesh_unicast, GNUNET_MESSAGE_TYPE_MESH_UNICAST, 0},
{&handle_mesh_to_orig, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN, 0},
- {&handle_mesh_data_ack, GNUNET_MESSAGE_TYPE_MESH_DATA_ACK,
+ {&handle_mesh_data_ack, GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK,
+ sizeof (struct GNUNET_MESH_DataACK)},
+ {&handle_mesh_data_ack, GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK,
sizeof (struct GNUNET_MESH_DataACK)},
{&handle_mesh_keepalive, GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE,
sizeof (struct GNUNET_MESH_TunnelKeepAlive)},
t->port = ntohl (t_msg->port);
tunnel_set_options (t, ntohl (t_msg->options));
if (GNUNET_YES == t->reliable)
- t->sent_messages_fwd =
- GNUNET_CONTAINER_multihashmap32_create (t->queue_max);
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! Reliable\n");
+ t->fwd_rel = GNUNET_malloc (sizeof (struct MeshTunnelReliability));
+ t->fwd_rel->t = t;
+ t->fwd_rel->expected_delay = MESH_RETRANSMIT_TIME;
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "CREATED TUNNEL %s[%x]:%u (%x)\n",
GNUNET_i2s (&my_full_id), t->id.tid, t->port, t->local_tid);
{
t->client = NULL;
}
- else if (c == t->owner)
+ if (c == t->owner)
{
peer_info_remove_tunnel (peer_get_short (t->dest), t);
t->owner = NULL;
fc = tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV ? &t->prev_fc : &t->next_fc;
if (GNUNET_YES == t->reliable)
{
- struct MeshSentMessage *copy;
+ struct MeshTunnelReliability *rel;
+ struct MeshReliableMessage *copy;
- copy = GNUNET_malloc (sizeof (struct MeshSentMessage)
+ copy = GNUNET_malloc (sizeof (struct MeshReliableMessage)
+ sizeof(struct GNUNET_MESH_Data)
+ size);
- copy->t = t;
copy->id = fc->last_pid_recv + 1;
- copy->is_forward = GNUNET_YES;
- copy->retry_timer = GNUNET_TIME_UNIT_MINUTES;
- copy->retry_task = GNUNET_SCHEDULER_add_delayed (copy->retry_timer,
- &tunnel_retransmit_message,
- copy);
- if (GNUNET_OK !=
- GNUNET_CONTAINER_multihashmap32_put (t->sent_messages_fwd,
- copy->id,
- copy,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST))
+ copy->timestamp = GNUNET_TIME_absolute_get ();
+ rel = (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV) ? t->fwd_rel : t->bck_rel;
+ copy->rel = rel;
+ GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy);
+ if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task)
{
- GNUNET_break (0);
- GNUNET_SCHEDULER_cancel (copy->retry_task);
- GNUNET_free (copy);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
+ rel->retry_timer = rel->expected_delay;
+ rel->retry_task =
+ GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
+ &tunnel_retransmit_message,
+ rel);
}
payload = (struct GNUNET_MESH_Data *) ©[1];
}