#include "gnunet_statistics_service.h"
-#include "mesh_enc.h"
-#include "mesh_protocol_enc.h"
+#include "mesh.h"
+#include "mesh_protocol.h"
#include "gnunet-service-mesh_channel.h"
#include "gnunet-service-mesh_local.h"
*/
uint32_t mid;
+ /**
+ * Tunnel Queue.
+ */
+ struct MeshTunnel3Queue *q;
+
/**
* When was this message issued (to calculate ACK delay)
*/
struct MeshReliableMessage *head_sent;
struct MeshReliableMessage *tail_sent;
- /**
- * Messages pending to send.
- */
- unsigned int n_sent;
-
/**
* DLL of messages received out of order.
*/
* timers and frees all memory.
*
* @param copy Message that is no longer needed: remote peer got it.
+ * @param update_time Is the timing information relevant?
+ * If this message is ACK in a batch the timing information
+ * is skewed by the retransmission, count only for the
+ * retransmitted message.
*/
static void
-rel_message_free (struct MeshReliableMessage *copy);
+rel_message_free (struct MeshReliableMessage *copy, int update_time);
/**
* We have received a message out of order, or the client is not ready.
copy = GNUNET_malloc (sizeof (*copy) + size);
copy->mid = mid;
copy->rel = rel;
+ copy->type = GNUNET_MESSAGE_TYPE_MESH_DATA;
memcpy (©[1], msg, size);
rel->n_recv++;
struct GNUNET_MESH_Data *msg = (struct GNUNET_MESH_Data *) ©[1];
LOG (GNUNET_ERROR_TYPE_DEBUG,
- " have %u! now expecting %u\n",
- copy->mid, rel->mid_recv + 1);
+ " have %u! now expecting %u\n",
+ copy->mid, rel->mid_recv + 1);
send_client_data (ch, msg, fwd);
rel->n_recv--;
rel->mid_recv++;
else
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- " reliable && don't have %u, next is %u\n",
- rel->mid_recv,
- copy->mid);
+ " reliable && don't have %u, next is %u\n",
+ rel->mid_recv,
+ copy->mid);
return;
}
}
bitfield = msg->futures;
mid = ntohl (msg->mid);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "free_sent_reliable %u %llX\n",
+ "!!! free_sent_reliable %u %llX\n",
mid, bitfield);
LOG (GNUNET_ERROR_TYPE_DEBUG,
" rel %p, head %p\n",
/* Now copy->mid == target, free it */
next = copy->next;
- rel_message_free (copy);
+ rel_message_free (copy, GNUNET_YES);
copy = next;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable END\n");
return;
}
- /* Search the message to be retransmitted in the outgoing queue.
- * Check only the queue for the connection that is going to be used,
- * if the message is stuck in some other connection's queue we shouldn't
- * act upon it:
- * - cancelling it and sending the new one doesn't guarantee it's delivery,
- * the old connection could be temporary stalled or the queue happened to
- * be long at time of insertion.
- * - not sending the new one could cause terrible delays the old connection
- * is stalled.
- */
-// FIXME access to queue elements is limited
payload = (struct GNUNET_MESH_Data *) ©[1];
fwd = (rel == ch->root_rel);
-// c = GMT_get_connection (ch->t, fwd);
-// hop = connection_get_hop (c, fwd);
-// for (q = hop->queue_head; NULL != q; q = q->next)
-// {
-// if (ntohs (payload->header.type) == q->type && ch == q->ch)
-// {
-// struct GNUNET_MESH_Data *queued_data = q->cls;
-//
-// if (queued_data->mid == payload->mid)
-// break;
-// }
-// }
/* Message not found in the queue that we are going to use. */
-// if (NULL == q)
-// {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid);
-
- GMCH_send_prebuilt_message (&payload->header, ch, fwd);
- GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
-// }
-// else
-// {
-// LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! ALREADY IN QUEUE %u\n", copy->mid);
-// }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid);
+ GMCH_send_prebuilt_message (&payload->header, ch, fwd, GNUNET_YES);
+ GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
+
+ copy->timestamp = GNUNET_TIME_absolute_get();
rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer);
rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
&channel_retransmit_message,
* timers and frees all memory.
*
* @param copy Message that is no longer needed: remote peer got it.
+ * @param update_time Is the timing information relevant?
+ * If this message is ACK in a batch the timing information
+ * is skewed by the retransmission, count only for the
+ * retransmitted message.
*/
static void
-rel_message_free (struct MeshReliableMessage *copy)
+rel_message_free (struct MeshReliableMessage *copy, int update_time)
{
struct MeshChannelReliability *rel;
struct GNUNET_TIME_Relative time;
rel = copy->rel;
- time = GNUNET_TIME_absolute_get_duration (copy->timestamp);
- rel->expected_delay.rel_value_us *= 7;
- rel->expected_delay.rel_value_us += time.rel_value_us;
- rel->expected_delay.rel_value_us /= 8;
- rel->n_sent--;
LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Freeing %u\n", copy->mid);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " n_sent %u\n", rel->n_sent);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! took %s\n",
- GNUNET_STRINGS_relative_time_to_string (time, GNUNET_NO));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! new expected delay %s\n",
- GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
- GNUNET_NO));
- rel->retry_timer = rel->expected_delay;
+ if (update_time)
+ {
+ time = GNUNET_TIME_absolute_get_duration (copy->timestamp);
+ rel->expected_delay.rel_value_us *= 7;
+ rel->expected_delay.rel_value_us += time.rel_value_us;
+ rel->expected_delay.rel_value_us /= 8;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! took %s\n",
+ GNUNET_STRINGS_relative_time_to_string (time, GNUNET_NO));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! new expected delay %s\n",
+ GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
+ GNUNET_NO));
+ rel->retry_timer = rel->expected_delay;
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! batch free, ignoring timing\n");
+ }
GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
GNUNET_free (copy);
}
fwd ? "FWD" : "BCK", GMCH_2s (ch));
msg.chid = htonl (ch->gid);
- GMCH_send_prebuilt_message (&msg.header, ch, !fwd);
+ GMCH_send_prebuilt_message (&msg.header, ch, !fwd, GNUNET_NO);
}
GMCH_2s (ch));
msg.chid = htonl (ch->gid);
- GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO);
+ GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, GNUNET_NO);
}
msg = (struct GNUNET_MessageHeader *) ©[1];
if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE)
{
- rel_message_free (copy);
+ rel_message_free (copy, GNUNET_YES);
/* TODO return? */
}
}
channel_send_ack (ch, !fwd);
}
+static void
+message_sent (void *cls,
+ struct MeshTunnel3 *t,
+ struct MeshTunnel3Queue *q,
+ uint16_t type, size_t size)
+{
+ struct MeshReliableMessage *copy = cls;
+ struct MeshChannelReliability *rel = copy->rel;
+
+ copy->timestamp = GNUNET_TIME_absolute_get ();
+ if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task)
+ {
+ rel->retry_timer =
+ GNUNET_TIME_relative_multiply (rel->expected_delay,
+ MESH_RETRANSMIT_MARGIN);
+ rel->retry_task =
+ GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
+ &channel_retransmit_message,
+ rel);
+ }
+}
+
/**
* Save a copy to retransmit in case it gets lost.
* @param msg Message to copy.
* @param fwd Is this fwd traffic?
*/
-static void
+static struct MeshReliableMessage *
channel_save_copy (struct MeshChannel *ch,
const struct GNUNET_MessageHeader *msg,
int fwd)
uint16_t size;
rel = fwd ? ch->root_rel : ch->dest_rel;
- mid = rel->mid_send;
+ mid = rel->mid_send - 1;
type = ntohs (msg->type);
size = ntohs (msg->size);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SAVE %u\n", mid);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SAVE %u %s\n",
+ mid, GNUNET_MESH_DEBUG_M2S (type));
copy = GNUNET_malloc (sizeof (struct MeshReliableMessage) + size);
copy->mid = mid;
- copy->timestamp = GNUNET_TIME_absolute_get ();
copy->rel = rel;
copy->type = type;
memcpy (©[1], msg, size);
- rel->n_sent++;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " n_sent %u\n", rel->n_sent);
GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy);
- if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task)
- {
- rel->retry_timer =
- GNUNET_TIME_relative_multiply (rel->expected_delay,
- MESH_RETRANSMIT_MARGIN);
- rel->retry_task =
- GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
- &channel_retransmit_message,
- rel);
- }
+
+ return copy;
}
if (NULL != ch->root)
GML_send_channel_destroy (ch->root, ch->lid_root);
else if (0 == ch->lid_root)
- GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO);
+ GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, GNUNET_NO);
if (NULL != ch->dest)
GML_send_channel_destroy (ch->dest, ch->lid_dest);
else if (0 == ch->lid_dest)
- GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_YES);
-}
-
-
-/**
- * Send data on a channel.
- *
- * If the destination is local, send it to client, otherwise encrypt and
- * send to next hop.
- *
- * @param ch Channel
- * @param msg Message.
- * @param fwd Is this a fwd (root->dest) message?
- */
-void
-GMCH_send_data (struct MeshChannel *ch,
- const struct GNUNET_MESH_Data *msg,
- int fwd)
-{
- if (GMCH_is_terminal (ch, fwd))
- {
- GML_send_data (fwd ? ch->dest : ch->root,
- msg,
- fwd ? ch->lid_dest : ch->lid_root);
- }
- else
- {
- GMT_send_prebuilt_message (&msg->header, ch->t, ch, fwd);
- }
+ GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_YES, GNUNET_NO);
}
struct MeshReliableMessage *copy;
unsigned int delta;
uint64_t mask;
- uint16_t type;
+ uint32_t ack;
if (GNUNET_NO == ch->reliable)
{
return;
}
rel = fwd ? ch->dest_rel : ch->root_rel;
+ ack = rel->mid_recv - 1;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "send_data_ack for %u\n",
- rel->mid_recv - 1);
+ " !! Send DATA_ACK for %u\n",
+ ack);
- type = GNUNET_MESSAGE_TYPE_MESH_DATA_ACK;
- msg.header.type = htons (type);
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_DATA_ACK);
msg.header.size = htons (sizeof (msg));
msg.chid = htonl (ch->gid);
- msg.mid = htonl (rel->mid_recv - 1);
msg.futures = 0;
for (copy = rel->head_recv; NULL != copy; copy = copy->next)
{
- if (copy->type != type)
+ if (copy->type != GNUNET_MESSAGE_TYPE_MESH_DATA)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "!! Type %s, expected DATA\n",
+ GNUNET_MESH_DEBUG_M2S (copy->type));
+ continue;
+ }
+ if (copy->mid == ack + 1)
+ {
+ ack++;
continue;
- delta = copy->mid - rel->mid_recv;
+ }
+ delta = copy->mid - (ack + 1);
if (63 < delta)
break;
mask = 0x1LL << delta;
msg.futures |= mask;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- " setting bit for %u (delta %u) (%llX) -> %llX\n",
- copy->mid, delta, mask, msg.futures);
+ " !! setting bit for %u (delta %u) (%llX) -> %llX\n",
+ copy->mid, delta, mask, msg.futures);
}
- LOG (GNUNET_ERROR_TYPE_DEBUG, " final futures %llX\n", msg.futures);
+ msg.mid = htonl (ack);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "!!! ACK for %u, futures %llX\n",
+ ack, msg.futures);
- GMCH_send_prebuilt_message (&msg.header, ch, fwd);
+ GMCH_send_prebuilt_message (&msg.header, ch, !fwd, GNUNET_NO);
LOG (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack END\n");
}
void
GMCH_allow_client (struct MeshChannel *ch, int fwd)
{
+ struct MeshChannelReliability *rel;
+ unsigned int buffer;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "GMCH allow\n");
+
if (MESH_CHANNEL_READY != ch->state)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " channel not ready yet!\n");
return;
+ }
+ if (GNUNET_YES == ch->reliable)
+ {
+ rel = fwd ? ch->root_rel : ch->dest_rel;
+ if (NULL == rel)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ if (NULL != rel->head_sent && 64 <= rel->mid_send - rel->head_sent->mid)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " too big MID gap! Wait for ACK.\n");
+ return;
+ }
+ }
+
+ if (is_loopback (ch))
+ buffer = GMCH_get_buffer (ch, fwd);
+ else
+ buffer = GMT_get_connections_buffer (ch->t);
+
+ if (0 == buffer)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " no buffer space.\n");
+ return;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer space %u, allowing\n", buffer);
send_client_ack (ch, fwd);
}
(!fwd &&
ch->dest == c) ) )
{
- GNUNET_break (0);
+ GNUNET_break_op (0);
return GNUNET_SYSERR;
}
rel = fwd ? ch->root_rel : ch->dest_rel;
+ if (GNUNET_NO == rel->client_allowed)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+
rel->client_allowed = GNUNET_NO;
/* Ok, everything is correct, send the message. */
payload->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_DATA);
payload->chid = htonl (ch->gid);
LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channel...\n");
- if (GNUNET_YES == ch->reliable)
- channel_save_copy (ch, &payload->header, fwd);
- GMCH_send_prebuilt_message (&payload->header, ch, fwd);
+ GMCH_send_prebuilt_message (&payload->header, ch, fwd, GNUNET_NO);
if (is_loopback (ch))
{
- if (GMCH_get_buffer (ch, fwd) > 0);
+ if (GMCH_get_buffer (ch, fwd) > 0)
send_client_ack (ch, fwd);
return GNUNET_OK;
msgcc.port = msg->port;
msgcc.opt = msg->opt;
- GMT_send_prebuilt_message (&msgcc.header, t, ch, GNUNET_YES);
+ GMT_send_prebuilt_message (&msgcc.header, t, ch, GNUNET_YES, NULL, NULL);
}
return GNUNET_OK;
}
GNUNET_STATISTICS_update (stats, "# data received", 1, GNUNET_NO);
mid = ntohl (msg->mid);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " mid %u\n", mid);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!! got mid %u\n", mid);
if (GNUNET_NO == ch->reliable ||
( !GMC_is_pid_bigger (rel->mid_recv, mid) &&
GNUNET_break_op (0);
LOG (GNUNET_ERROR_TYPE_DEBUG,
" MID %u not expected (%u - %u), dropping!\n",
- mid, rel->mid_recv, rel->mid_recv + 64);
+ mid, rel->mid_recv, rel->mid_recv + 63);
}
GMCH_send_data_ack (ch, fwd);
GNUNET_break (0);
return;
}
- fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
+ /* Inverted: if message came 'FWD' is a 'BCK ACK'. */
+ fwd = (NULL != ch->dest) ? GNUNET_NO : GNUNET_YES;
}
ack = ntohl (msg->mid);
}
if (NULL == rel)
{
- GNUNET_break (0);
+ GNUNET_break_op (0);
return;
}
+ /* Free ACK'd copies: no need to retransmit those anymore */
for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next)
{
if (GMC_is_pid_bigger (copy->mid, ack))
break;
}
work = GNUNET_YES;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! id %u\n", copy->mid);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " !! id %u\n", copy->mid);
next = copy->next;
- rel_message_free (copy);
+ rel_message_free (copy, GNUNET_YES);
}
+
/* ACK client if needed */
-// channel_send_ack (t, type, GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK == type);
+ GMCH_allow_client (ch, fwd);
- /* If some message was free'd, update the retransmission delay*/
+ /* If some message was free'd, update the retransmission delay */
if (GNUNET_YES == work)
{
if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task)
* @param message Message to send. Function makes a copy of it.
* @param ch Channel on which this message is transmitted.
* @param fwd Is this a fwd message?
+ * @param retransmission Is this a retransmission? (Don't save a copy)
*/
void
GMCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
- struct MeshChannel *ch, int fwd)
+ struct MeshChannel *ch, int fwd,
+ int retransmission)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, "GMCH Send %s %s on channel %s\n",
fwd ? "FWD" : "BCK", GNUNET_MESH_DEBUG_M2S (ntohs (message->type)),
return;
}
- GMT_send_prebuilt_message (message, ch->t, ch, fwd);
+ if (GNUNET_YES == ch->reliable && GNUNET_NO == retransmission
+ && ntohs (message->type) == GNUNET_MESSAGE_TYPE_MESH_DATA)
+ {
+ struct MeshReliableMessage *copy;
+
+ copy = channel_save_copy (ch, message, fwd);
+ copy->q = GMT_send_prebuilt_message (message, ch->t, ch, fwd,
+ &message_sent, copy);
+ }
+ else
+ GMT_send_prebuilt_message (message, ch->t, ch, fwd, NULL, NULL);
}