From f52fc4b001758430bb911759c755d0f06d3eb693 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 20 Jan 2017 20:39:51 +0100 Subject: [PATCH] working on channel passing data to clients --- src/cadet/cadet.h | 2 +- src/cadet/cadet_api.c | 8 +- src/cadet/gnunet-service-cadet-new.c | 2 +- src/cadet/gnunet-service-cadet-new.h | 1 - src/cadet/gnunet-service-cadet-new_channel.c | 215 +++++++++++++++++-- src/cadet/gnunet-service-cadet_local.c | 4 +- 6 files changed, 205 insertions(+), 27 deletions(-) diff --git a/src/cadet/cadet.h b/src/cadet/cadet.h index c16fb2917..9d154fb99 100644 --- a/src/cadet/cadet.h +++ b/src/cadet/cadet.h @@ -198,7 +198,7 @@ struct GNUNET_CADET_LocalData /** * ID of the channel */ - struct GNUNET_CADET_ClientChannelNumber id; + struct GNUNET_CADET_ClientChannelNumber channel_id; /** * Payload follows diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c index 8f1274d63..5dcf43e46 100644 --- a/src/cadet/cadet_api.c +++ b/src/cadet/cadet_api.c @@ -569,7 +569,7 @@ request_data (void *cls) env = GNUNET_MQ_msg_extra (msg, th->size, GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); - msg->id = th->channel->chid; + msg->channel_id = th->channel->chid; osize = th->notify (th->notify_cls, th->size, &msg[1]); @@ -697,7 +697,7 @@ check_local_data (void *cls, } ch = retrieve_channel (h, - message->id); + message->channel_id); if (NULL == ch) { GNUNET_break_op (0); @@ -727,7 +727,7 @@ handle_local_data (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n"); - ch = retrieve_channel (h, message->id); + ch = retrieve_channel (h, message->channel_id); GNUNET_assert (NULL != ch); payload = (struct GNUNET_MessageHeader *) &message[1]; @@ -735,7 +735,7 @@ handle_local_data (void *cls, GC_f2s (ntohl (ch->chid.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI), GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), - ntohl (message->id.channel_of_client)); + ntohl (message->channel_id.channel_of_client)); type = ntohs (payload->type); LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type)); diff --git a/src/cadet/gnunet-service-cadet-new.c b/src/cadet/gnunet-service-cadet-new.c index 7b4a0e95b..7801708c1 100644 --- a/src/cadet/gnunet-service-cadet-new.c +++ b/src/cadet/gnunet-service-cadet-new.c @@ -628,7 +628,7 @@ handle_data (void *cls, struct CadetChannel *ch; const struct GNUNET_MessageHeader *payload; - chid = msg->id; + chid = msg->channel_id; map = get_map_by_chid (c, chid); ch = GNUNET_CONTAINER_multihashmap32_get (map, diff --git a/src/cadet/gnunet-service-cadet-new.h b/src/cadet/gnunet-service-cadet-new.h index 9f4667e23..b3bb85d85 100644 --- a/src/cadet/gnunet-service-cadet-new.h +++ b/src/cadet/gnunet-service-cadet-new.h @@ -220,7 +220,6 @@ extern unsigned long long ratchet_messages; extern struct GNUNET_TIME_Relative ratchet_time; - /** * Send a message to a client. * diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index 5d2eba618..75ec81992 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c @@ -25,14 +25,12 @@ * @author Christian Grothoff * * TODO: - * - handle CREATE_ACK - * - handle plaintext data - * - handle plaintext ACK * - handle destroy * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! * - check that '0xFFULL' really is sufficient for flow control! - * - what about the 'no buffer' option? - * - what about the 'out-of-order' option? + * - revisit handling of 'unreliable' traffic! + * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'. + * - figure out flow control without ACKs (unreliable traffic!) */ #include "platform.h" #include "gnunet_util_lib.h" @@ -147,7 +145,8 @@ struct CadetOutOfOrderMessage struct CadetOutOfOrderMessage *prev; /** - * ID of the message (ACK needed to free) + * ID of the message (messages up to this point needed + * before we give this one to the client). */ struct ChannelMessageIdentifier mid; @@ -311,7 +310,6 @@ struct CadetChannel }; - /** * Get the static string for identification of the channel. * @@ -480,8 +478,10 @@ GCCH_channel_local_new (struct CadetClient *owner, struct CadetChannel *ch; ch = GNUNET_new (struct CadetChannel); - ch->max_pending_messages = 32; /* FIXME: allow control via options - or adjust dynamically... */ + ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); + ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); + ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); + ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */ ch->owner = owner; ch->lid = owner_id; ch->port = *port; @@ -490,9 +490,6 @@ GCCH_channel_local_new (struct CadetClient *owner, ch->chid = GCT_add_channel (ch->t, ch); ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME; - ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); - ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); - ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create, ch); GNUNET_STATISTICS_update (stats, @@ -538,8 +535,6 @@ GCCH_channel_incoming_new (struct CadetTunnel *t, struct CadetClient *c; ch = GNUNET_new (struct CadetChannel); - ch->max_pending_messages = 32; /* FIXME: allow control via options - or adjust dynamically... */ ch->port = *port; ch->t = t; ch->chid = chid; @@ -547,6 +542,7 @@ GCCH_channel_incoming_new (struct CadetTunnel *t, ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); + ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */ GNUNET_STATISTICS_update (stats, "# channels", 1, @@ -634,6 +630,27 @@ send_connect_ack (void *cls) } +/** + * Send a LOCAL ACK to the client to solicit more messages. + * + * @param ch channel the ack is for + * @param c client to send the ACK to + */ +static void +send_ack_to_client (struct CadetChannel *ch, + struct CadetClient *c) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_CADET_LocalAck *ack; + + env = GNUNET_MQ_msg (ack, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); + ack->channel_id = ch->lid; + GSC_send_to_client (c, + env); +} + + /** * A client is bound to the port that we have a channel * open to. Send the acknowledgement for the connection @@ -672,6 +689,10 @@ GCCH_bind (struct CadetChannel *ch, /* notify other peer that we accepted the connection */ ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack, ch); + /* give client it's initial supply of ACKs */ + for (unsigned int i=0;imax_pending_messages;i++) + send_ack_to_client (ch, + ch->owner); } @@ -742,12 +763,75 @@ GCCH_channel_incoming_destroy (struct CadetChannel *ch) void GCCH_handle_channel_create_ack (struct CadetChannel *ch) { - GNUNET_break (0); // FIXME! + switch (ch->state) + { + case CADET_CHANNEL_NEW: + /* this should be impossible */ + GNUNET_break (0); + break; + case CADET_CHANNEL_CREATE_SENT: + if (NULL == ch->owner) + { + /* We're not the owner, wrong direction! */ + GNUNET_break_op (0); + return; + } + ch->state = CADET_CHANNEL_READY; + /* On first connect, send client as many ACKs as we allow messages + to be buffered! */ + for (unsigned int i=0;imax_pending_messages;i++) + send_ack_to_client (ch, + ch->owner); + break; + case CADET_CHANNEL_READY: + /* duplicate ACK, maybe we retried the CREATE. Ignore. */ + GNUNET_STATISTICS_update (stats, + "# duplicate CREATE_ACKs", + 1, + GNUNET_NO); + break; + } } /** - * We got payload data for a channel. Pass it on to the client. + * Test if element @a e1 comes before element @a e2. + * + * TODO: use opportunity to create generic list insertion sort + * logic in container! + * + * @param cls closure, our `struct CadetChannel` + * @param e1 an element of to sort + * @param e2 another element to sort + * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO + */ +static int +is_before (void *cls, + void *e1, + void *e2) +{ + struct CadetOutOfOrderMessage *m1 = e1; + struct CadetOutOfOrderMessage *m2 = e2; + uint32_t v1 = ntohl (m1->mid.mid); + uint32_t v2 = ntohl (m2->mid.mid); + uint32_t delta; + + delta = v1 - v2; + if (delta > (uint32_t) INT_MAX) + { + /* in overflow range, we can safely assume we wrapped around */ + return GNUNET_NO; + } + else + { + return GNUNET_YES; + } +} + + +/** + * We got payload data for a channel. Pass it on to the client + * and send an ACK to the other end (once flow control allows it!) * * @param ch channel that got data */ @@ -755,7 +839,70 @@ void GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, const struct GNUNET_CADET_ChannelAppDataMessage *msg) { - GNUNET_break (0); // FIXME! + struct GNUNET_MQ_Envelope *env; + struct GNUNET_CADET_LocalData *ld; + struct CadetOutOfOrderMessage *com; + size_t payload_size; + + payload_size = ntohs (msg->header.size) - sizeof (*msg); + env = GNUNET_MQ_msg_extra (ld, + payload_size, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); + ld->channel_id = ch->lid; + GNUNET_memcpy (&ld[1], + &msg[1], + payload_size); + if ( (GNUNET_YES == ch->client_ready) && + ( (GNUNET_YES == ch->out_of_order) || + (msg->mid.mid == ch->mid_recv.mid) ) ) + { + GSC_send_to_client (ch->owner ? ch->owner : ch->dest, + env); + ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); + ch->mid_futures >>= 1; + } + else + { + /* FIXME-SECURITY: if the element is WAY too far ahead, + drop it (can't buffer too much!) */ + com = GNUNET_new (struct CadetOutOfOrderMessage); + com->mid = msg->mid; + com->env = env; + /* sort into list ordered by "is_before" */ + if ( (NULL == ch->head_recv) || + (GNUNET_YES == is_before (ch, + com, + ch->head_recv)) ) + { + GNUNET_CONTAINER_DLL_insert (ch->head_recv, + ch->tail_recv, + com); + } + else + { + struct CadetOutOfOrderMessage *pos; + + for (pos = ch->head_recv; + NULL != pos; + pos = pos->next) + { + if (GNUNET_YES != + is_before (ch, + pos, + com)) + break; + } + if (NULL == pos) + GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv, + ch->tail_recv, + com); + else + GNUNET_CONTAINER_DLL_insert_after (ch->head_recv, + ch->tail_recv, + com, + pos->prev); + } + } } @@ -770,7 +917,37 @@ void GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, const struct GNUNET_CADET_ChannelDataAckMessage *ack) { - GNUNET_break (0); // FIXME! + struct CadetReliableMessage *crm; + + if (GNUNET_NO == ch->reliable) + { + /* not expecting ACKs on unreliable channel, odd */ + GNUNET_break_op (0); + return; + } + for (crm = ch->head_sent; + NULL != crm; + crm = crm->next) + if (ack->mid.mid == crm->data_message.mid.mid) + break; + if (NULL == crm) + { + /* ACK for message we already dropped, might have been a + duplicate ACK? Ignore. */ + GNUNET_STATISTICS_update (stats, + "# duplicate CHANNEL_DATA_ACKs", + 1, + GNUNET_NO); + return; + } + GNUNET_CONTAINER_DLL_remove (ch->head_sent, + ch->tail_sent, + crm); + ch->pending_messages--; + GNUNET_free (crm); + GNUNET_assert (ch->pending_messages < ch->max_pending_messages); + send_ack_to_client (ch, + (NULL == ch->owner) ? ch->dest : ch->owner); } @@ -1026,6 +1203,8 @@ send_client_buffered_data (struct CadetChannel *ch) GNUNET_CONTAINER_DLL_remove (ch->head_recv, ch->tail_recv, com); + /* FIXME: if unreliable, this is not aggressive + enough, as it would be OK to have lost some! */ ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); ch->mid_futures >>= 1; /* equivalent to division by 2 */ GSC_send_to_client (ch->owner ? ch->owner : ch->dest, diff --git a/src/cadet/gnunet-service-cadet_local.c b/src/cadet/gnunet-service-cadet_local.c index e1f6ac4c3..c476f6ac2 100644 --- a/src/cadet/gnunet-service-cadet_local.c +++ b/src/cadet/gnunet-service-cadet_local.c @@ -586,7 +586,7 @@ handle_data (void *cls, struct GNUNET_SERVER_Client *client, return; } - chid = msg->id; + chid = msg->channel_id; LOG (GNUNET_ERROR_TYPE_DEBUG, " %u bytes (%u payload) by client %u\n", payload_size, payload_claimed_size, c->id); @@ -1531,7 +1531,7 @@ GML_send_data (struct CadetClient *c, GNUNET_memcpy (©[1], &msg[1], size); copy->header.size = htons (sizeof (struct GNUNET_CADET_LocalData) + size); copy->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); - copy->id = id; + copy->channel_id = id; GNUNET_SERVER_notification_context_unicast (nc, c->handle, ©->header, GNUNET_NO); } -- 2.25.1