* @author Christian Grothoff
*
* TODO:
+ * - Optimize ACKs by using 'mid_futures' properly!
* - introduce shutdown so we can have half-closed channels, modify
* destroy to include MID to have FIN-ACK equivalents, etc.
* - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
/**
* Bitfield of already-received messages past @e mid_recv.
+ *
+ * FIXME: not yet properly used (bits here are never set!)
*/
uint64_t mid_futures;
msg.futures = GNUNET_htonll (ch->mid_futures);
if (NULL != ch->last_control_qe)
GCT_send_cancel (ch->last_control_qe);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending DATA_ACK %u:%llX via %s\n",
+ (unsigned int) ntohl (msg.mid.mid),
+ (unsigned long long) ch->mid_futures,
+ GCCH_2s (ch));
ch->last_control_qe = GCT_send (ch->t,
&msg.header,
&send_ack_cb,
if (delta > (uint32_t) INT_MAX)
{
/* in overflow range, we can safely assume we wrapped around */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%u > %u => %p > %p\n",
- (unsigned int) v1,
- (unsigned int) v2,
- m1,
- m2);
return GNUNET_NO;
}
else
{
/* result is small, thus v2 > v1, thus e1 < e2 */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%u < %u => %p < %p\n",
- (unsigned int) v1,
- (unsigned int) v2,
- m1,
- m2);
return GNUNET_YES;
}
}
struct GNUNET_CADET_LocalData *ld;
struct CadetChannelClient *ccc;
size_t payload_size;
+ struct CadetOutOfOrderMessage *com;
+ int duplicate;
+ uint32_t mid_min;
+ uint32_t mid_max;
+ uint32_t mid_msg;
GNUNET_assert (GNUNET_NO == ch->is_loopback);
if ( (GNUNET_YES == ch->destroy) &&
env);
ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
ch->mid_futures >>= 1;
+ if (GNUNET_YES == ch->reliable)
+ send_channel_data_ack (ch);
+ return;
}
- else
+
+ /* check if message ought to be dropped because it is anicent/too distant/duplicate */
+ mid_min = ntohl (ch->mid_recv.mid);
+ mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
+ mid_msg = ntohl (msg->mid.mid);
+ if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
+ ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
{
- struct CadetOutOfOrderMessage *com;
- int duplicate;
- uint32_t mid_min;
- uint32_t mid_max;
- uint32_t mid_msg;
-
- mid_min = ntohl (ch->mid_recv.mid);
- mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
- mid_msg = ntohl (msg->mid.mid);
- if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
- ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n",
- (unsigned int) payload_size,
- GCCH_2s (ch),
- ntohl (msg->mid.mid));
- GNUNET_STATISTICS_update (stats,
- "# duplicate DATA (ancient or future)",
- 1,
- GNUNET_NO);
- GNUNET_MQ_discard (env);
- return;
- }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n",
+ (unsigned int) payload_size,
+ GCCH_2s (ch),
+ ntohl (msg->mid.mid));
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA (ancient or future)",
+ 1,
+ GNUNET_NO);
+ GNUNET_MQ_discard (env);
+ return;
+ }
- com = GNUNET_new (struct CadetOutOfOrderMessage);
- com->mid = msg->mid;
- com->env = env;
- duplicate = GNUNET_NO;
- GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
- is_before,
- &duplicate,
- ccc->head_recv,
- ccc->tail_recv,
- com);
- if (GNUNET_YES == duplicate)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
- (unsigned int) payload_size,
- GCCH_2s (ch),
- ntohl (msg->mid.mid));
- GNUNET_STATISTICS_update (stats,
- "# duplicate DATA",
- 1,
- GNUNET_NO);
- GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
- ccc->tail_recv,
- com);
- GNUNET_MQ_discard (com->env);
- GNUNET_free (com);
- return;
- }
+ /* Insert message into sorted out-of-order queue */
+ com = GNUNET_new (struct CadetOutOfOrderMessage);
+ com->mid = msg->mid;
+ com->env = env;
+ duplicate = GNUNET_NO;
+ GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
+ is_before,
+ &duplicate,
+ ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ if (GNUNET_YES == duplicate)
+ {
+ /* Duplicate within the queue, drop also */
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queued %s payload of %u bytes on %s (mid %u, need %u first)\n",
- (GNUNET_YES == ccc->client_ready)
- ? "out-of-order"
- : "client-not-ready",
+ "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
(unsigned int) payload_size,
GCCH_2s (ch),
- ntohl (msg->mid.mid),
- ntohl (ch->mid_recv.mid));
+ ntohl (msg->mid.mid));
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA",
+ 1,
+ GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ GNUNET_MQ_discard (com->env);
+ GNUNET_free (com);
+ if (GNUNET_YES == ch->reliable)
+ send_channel_data_ack (ch);
+ return;
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
+ (GNUNET_YES == ccc->client_ready)
+ ? "out-of-order"
+ : "client-not-ready",
+ (unsigned int) payload_size,
+ GCCH_2s (ch),
+ ntohl (ccc->ccn.channel_of_client),
+ ccc,
+ ntohl (msg->mid.mid),
+ ntohl (ch->mid_recv.mid));
+ send_channel_data_ack (ch);
}
}
+/**
+ * We got an ACK for a message in our queue, remove it from
+ * the queue and tell our client that it can send more.
+ *
+ * @param ch the channel that got the ACK
+ * @param crm the message that got acknowledged
+ */
+static void
+handle_matching_ack (struct CadetChannel *ch,
+ struct CadetReliableMessage *crm)
+{
+ GNUNET_CONTAINER_DLL_remove (ch->head_sent,
+ ch->tail_sent,
+ crm);
+ ch->pending_messages--;
+ GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
+ GCCH_2s (ch),
+ (unsigned int) ntohl (crm->data_message->mid.mid),
+ ch->pending_messages);
+ GNUNET_free (crm->data_message);
+ GNUNET_free (crm);
+ send_ack_to_client (ch,
+ (NULL == ch->owner)
+ ? GNUNET_NO
+ : GNUNET_YES);
+}
+
+
/**
* We got an acknowledgement for payload data for a channel.
* Possibly resume transmissions.
const struct GNUNET_CADET_ChannelDataAckMessage *ack)
{
struct CadetReliableMessage *crm;
- int was_head;
+ struct CadetReliableMessage *crmn;
+ int found;
+ uint32_t mid_base;
+ uint64_t mid_mask;
+ unsigned int delta;
GNUNET_break (GNUNET_NO == ch->is_loopback);
if (GNUNET_NO == ch->reliable)
GNUNET_break_op (0);
return;
}
+ mid_base = ntohl (ack->mid.mid);
+ mid_mask = GNUNET_htonll (ack->futures);
+ found = GNUNET_NO;
for (crm = ch->head_sent;
NULL != crm;
- crm = crm->next)
+ crm = crmn)
+ {
+ crmn = crm->next;
if (ack->mid.mid == crm->data_message->mid.mid)
- break;
- if (NULL == crm)
+ {
+ handle_matching_ack (ch,
+ crm);
+ continue;
+ }
+ delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1;
+ if (delta >= 64)
+ continue;
+ if (0 != (mid_mask & (1LLU << delta)))
+ handle_matching_ack (ch,
+ crm);
+ }
+ if (GNUNET_NO == found)
{
/* ACK for message we already dropped, might have been a
duplicate ACK? Ignore. */
GNUNET_NO);
return;
}
- was_head = (crm == ch->head_sent);
- GNUNET_CONTAINER_DLL_remove (ch->head_sent,
- ch->tail_sent,
- crm);
- GNUNET_free (crm->data_message);
- GNUNET_free (crm);
- ch->pending_messages--;
- GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
- GCCH_2s (ch),
- (unsigned int) ntohl (ack->mid.mid),
- ch->pending_messages);
- send_ack_to_client (ch,
- (NULL == ch->owner)
- ? GNUNET_NO
- : GNUNET_YES);
- if (was_head)
+ if (NULL != ch->retry_data_task)
{
- if (NULL != ch->retry_data_task)
- {
- GNUNET_SCHEDULER_cancel (ch->retry_data_task);
- ch->retry_data_task = NULL;
- }
- if (NULL != ch->head_sent)
- ch->retry_data_task
- = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
- &retry_transmission,
- ch);
+ GNUNET_SCHEDULER_cancel (ch->retry_data_task);
+ ch->retry_data_task = NULL;
}
+ if (NULL != ch->head_sent)
+ ch->retry_data_task
+ = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
+ &retry_transmission,
+ ch);
}
if (NULL == com)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending on %s)!\n",
+ "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
GSC_2s (ccc->c),
+ ntohl (client_ccn.channel_of_client),
+ GCCH_2s (ch),
ntohl (ccc->ccn.channel_of_client),
- GCCH_2s (ch));
+ ccc);
return; /* none pending */
}
if (GNUNET_YES == ch->is_loopback)
return; /* missing next one in-order */
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Got LOCAL ACK, passing payload message to %s-%X on %s\n",
- GSC_2s (ccc->c),
- ntohl (ccc->ccn.channel_of_client),
- GCCH_2s (ch));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got LOCAL ACK, passing payload message %u to %s-%X on %s\n",
+ ntohl (com->mid.mid),
+ GSC_2s (ccc->c),
+ ntohl (ccc->ccn.channel_of_client),
+ GCCH_2s (ch));
/* all good, pass next message to client */
GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
urgently waiting for an ACK from us. (As we have an inherent
maximum of 64 bits, and 15 is getting too close for comfort.)
So we should send one now. */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sender on %s likely blocked on flow-control, sending ACK now.\n",
- GCCH_2s (ch));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sender on %s likely blocked on flow-control, sending ACK now.\n",
+ GCCH_2s (ch));
if (GNUNET_YES == ch->reliable)
send_channel_data_ack (ch);
}