* @author Christian Grothoff
*
* TODO:
- * - handle CREATE_ACK
- * - handle plaintext data
- * - handle plaintext ACK
* - handle destroy
* - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
* - check that '0xFFULL' really is sufficient for flow control!
- * - what about the 'no buffer' option?
- * - what about the 'out-of-order' option?
+ * - revisit handling of 'unreliable' traffic!
+ * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
+ * - figure out flow control without ACKs (unreliable traffic!)
*/
#include "platform.h"
#include "gnunet_util_lib.h"
struct CadetOutOfOrderMessage *prev;
/**
- * ID of the message (ACK needed to free)
+ * ID of the message (messages up to this point needed
+ * before we give this one to the client).
*/
struct ChannelMessageIdentifier mid;
};
-
/**
* Get the static string for identification of the channel.
*
struct CadetChannel *ch;
ch = GNUNET_new (struct CadetChannel);
- ch->max_pending_messages = 32; /* FIXME: allow control via options
- or adjust dynamically... */
+ ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
+ ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
+ ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
+ ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
ch->owner = owner;
ch->lid = owner_id;
ch->port = *port;
ch->chid = GCT_add_channel (ch->t,
ch);
ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
- ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
- ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
- ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create,
ch);
GNUNET_STATISTICS_update (stats,
struct CadetClient *c;
ch = GNUNET_new (struct CadetChannel);
- ch->max_pending_messages = 32; /* FIXME: allow control via options
- or adjust dynamically... */
ch->port = *port;
ch->t = t;
ch->chid = chid;
ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
+ ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
GNUNET_STATISTICS_update (stats,
"# channels",
1,
}
+/**
+ * Send a LOCAL ACK to the client to solicit more messages.
+ *
+ * @param ch channel the ack is for
+ * @param c client to send the ACK to
+ */
+static void
+send_ack_to_client (struct CadetChannel *ch,
+ struct CadetClient *c)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_LocalAck *ack;
+
+ env = GNUNET_MQ_msg (ack,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
+ ack->channel_id = ch->lid;
+ GSC_send_to_client (c,
+ env);
+}
+
+
/**
* A client is bound to the port that we have a channel
* open to. Send the acknowledgement for the connection
/* notify other peer that we accepted the connection */
ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
ch);
+ /* give client it's initial supply of ACKs */
+ for (unsigned int i=0;i<ch->max_pending_messages;i++)
+ send_ack_to_client (ch,
+ ch->owner);
}
void
GCCH_handle_channel_create_ack (struct CadetChannel *ch)
{
- GNUNET_break (0); // FIXME!
+ switch (ch->state)
+ {
+ case CADET_CHANNEL_NEW:
+ /* this should be impossible */
+ GNUNET_break (0);
+ break;
+ case CADET_CHANNEL_CREATE_SENT:
+ if (NULL == ch->owner)
+ {
+ /* We're not the owner, wrong direction! */
+ GNUNET_break_op (0);
+ return;
+ }
+ ch->state = CADET_CHANNEL_READY;
+ /* On first connect, send client as many ACKs as we allow messages
+ to be buffered! */
+ for (unsigned int i=0;i<ch->max_pending_messages;i++)
+ send_ack_to_client (ch,
+ ch->owner);
+ break;
+ case CADET_CHANNEL_READY:
+ /* duplicate ACK, maybe we retried the CREATE. Ignore. */
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate CREATE_ACKs",
+ 1,
+ GNUNET_NO);
+ break;
+ }
}
/**
- * We got payload data for a channel. Pass it on to the client.
+ * Test if element @a e1 comes before element @a e2.
+ *
+ * TODO: use opportunity to create generic list insertion sort
+ * logic in container!
+ *
+ * @param cls closure, our `struct CadetChannel`
+ * @param e1 an element of to sort
+ * @param e2 another element to sort
+ * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
+ */
+static int
+is_before (void *cls,
+ void *e1,
+ void *e2)
+{
+ struct CadetOutOfOrderMessage *m1 = e1;
+ struct CadetOutOfOrderMessage *m2 = e2;
+ uint32_t v1 = ntohl (m1->mid.mid);
+ uint32_t v2 = ntohl (m2->mid.mid);
+ uint32_t delta;
+
+ delta = v1 - v2;
+ if (delta > (uint32_t) INT_MAX)
+ {
+ /* in overflow range, we can safely assume we wrapped around */
+ return GNUNET_NO;
+ }
+ else
+ {
+ return GNUNET_YES;
+ }
+}
+
+
+/**
+ * We got payload data for a channel. Pass it on to the client
+ * and send an ACK to the other end (once flow control allows it!)
*
* @param ch channel that got data
*/
GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
const struct GNUNET_CADET_ChannelAppDataMessage *msg)
{
- GNUNET_break (0); // FIXME!
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_LocalData *ld;
+ struct CadetOutOfOrderMessage *com;
+ size_t payload_size;
+
+ payload_size = ntohs (msg->header.size) - sizeof (*msg);
+ env = GNUNET_MQ_msg_extra (ld,
+ payload_size,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
+ ld->channel_id = ch->lid;
+ GNUNET_memcpy (&ld[1],
+ &msg[1],
+ payload_size);
+ if ( (GNUNET_YES == ch->client_ready) &&
+ ( (GNUNET_YES == ch->out_of_order) ||
+ (msg->mid.mid == ch->mid_recv.mid) ) )
+ {
+ GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
+ env);
+ ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
+ ch->mid_futures >>= 1;
+ }
+ else
+ {
+ /* FIXME-SECURITY: if the element is WAY too far ahead,
+ drop it (can't buffer too much!) */
+ com = GNUNET_new (struct CadetOutOfOrderMessage);
+ com->mid = msg->mid;
+ com->env = env;
+ /* sort into list ordered by "is_before" */
+ if ( (NULL == ch->head_recv) ||
+ (GNUNET_YES == is_before (ch,
+ com,
+ ch->head_recv)) )
+ {
+ GNUNET_CONTAINER_DLL_insert (ch->head_recv,
+ ch->tail_recv,
+ com);
+ }
+ else
+ {
+ struct CadetOutOfOrderMessage *pos;
+
+ for (pos = ch->head_recv;
+ NULL != pos;
+ pos = pos->next)
+ {
+ if (GNUNET_YES !=
+ is_before (ch,
+ pos,
+ com))
+ break;
+ }
+ if (NULL == pos)
+ GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
+ ch->tail_recv,
+ com);
+ else
+ GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
+ ch->tail_recv,
+ com,
+ pos->prev);
+ }
+ }
}
GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
const struct GNUNET_CADET_ChannelDataAckMessage *ack)
{
- GNUNET_break (0); // FIXME!
+ struct CadetReliableMessage *crm;
+
+ if (GNUNET_NO == ch->reliable)
+ {
+ /* not expecting ACKs on unreliable channel, odd */
+ GNUNET_break_op (0);
+ return;
+ }
+ for (crm = ch->head_sent;
+ NULL != crm;
+ crm = crm->next)
+ if (ack->mid.mid == crm->data_message.mid.mid)
+ break;
+ if (NULL == crm)
+ {
+ /* ACK for message we already dropped, might have been a
+ duplicate ACK? Ignore. */
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate CHANNEL_DATA_ACKs",
+ 1,
+ GNUNET_NO);
+ return;
+ }
+ GNUNET_CONTAINER_DLL_remove (ch->head_sent,
+ ch->tail_sent,
+ crm);
+ ch->pending_messages--;
+ GNUNET_free (crm);
+ GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
+ send_ack_to_client (ch,
+ (NULL == ch->owner) ? ch->dest : ch->owner);
}
GNUNET_CONTAINER_DLL_remove (ch->head_recv,
ch->tail_recv,
com);
+ /* FIXME: if unreliable, this is not aggressive
+ enough, as it would be OK to have lost some! */
ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
ch->mid_futures >>= 1; /* equivalent to division by 2 */
GSC_send_to_client (ch->owner ? ch->owner : ch->dest,