From 8ed6d64262665ba9ce306823f569213feabba669 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 24 Jan 2017 21:00:23 +0100 Subject: [PATCH] fix client-client loopback flow control --- src/cadet/cadet_api.c | 85 ++--- src/cadet/gnunet-service-cadet-new.c | 8 +- src/cadet/gnunet-service-cadet-new_channel.c | 316 +++++++++++-------- src/cadet/gnunet-service-cadet-new_channel.h | 5 +- 4 files changed, 249 insertions(+), 165 deletions(-) diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c index 859a5378b..89d9daeda 100644 --- a/src/cadet/cadet_api.c +++ b/src/cadet/cadet_api.c @@ -601,45 +601,49 @@ handle_channel_created (void *cls, ccn = msg->ccn; port_number = &msg->port; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Creating incoming channel %X [%s]\n", - ntohl (ccn.channel_of_client), - GNUNET_h2s (port_number)); if (ntohl (ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) { GNUNET_break (0); return; } port = find_port (h, port_number); - if (NULL != port) - { - void *ctx; - - ch = create_channel (h, ccn); - ch->peer = GNUNET_PEER_intern (&msg->peer); - ch->cadet = h; - ch->ccn = ccn; - ch->port = port; - ch->options = ntohl (msg->opt); - - LOG (GNUNET_ERROR_TYPE_DEBUG, " created channel %p\n", ch); - ctx = port->handler (port->cls, ch, &msg->peer, port->hash, ch->options); - if (NULL != ctx) - ch->ctx = ctx; - LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n"); - } - else + if (NULL == port) { struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg; struct GNUNET_MQ_Envelope *env; - LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n"); + GNUNET_break (0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "No handler for incoming channel %X [%s]\n", + ntohl (ccn.channel_of_client), + GNUNET_h2s (port_number)); + /* FIXME: should disconnect instead, this is a serious error! */ env = GNUNET_MQ_msg (d_msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY); d_msg->ccn = msg->ccn; - GNUNET_MQ_send (h->mq, env); + GNUNET_MQ_send (h->mq, + env); + return; } - return; + + ch = create_channel (h, + ccn); + ch->peer = GNUNET_PEER_intern (&msg->peer); + ch->cadet = h; + ch->ccn = ccn; + ch->port = port; + ch->options = ntohl (msg->opt); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating incoming channel %X [%s] %p\n", + ntohl (ccn.channel_of_client), + GNUNET_h2s (port_number), + ch); + ch->ctx = port->handler (port->cls, + ch, + &msg->peer, + port->hash, + ch->options); } @@ -735,12 +739,13 @@ handle_local_data (void *cls, payload = (struct GNUNET_MessageHeader *) &message[1]; type = ntohs (payload->type); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got a %s data on channel %s [%X] of type %s\n", + "Got a %s data on channel %s [%X] of type %s (%u)\n", GC_f2s (ntohl (ch->ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI), GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), ntohl (message->ccn.channel_of_client), - GC_m2s (type)); + GC_m2s (type), + type); for (i = 0; i < h->n_handlers; i++) { @@ -748,7 +753,10 @@ handle_local_data (void *cls, if (handler->type == type) { if (GNUNET_OK != - handler->callback (h->cls, ch, &ch->ctx, payload)) + handler->callback (h->cls, + ch, + &ch->ctx, + payload)) { LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n"); @@ -1378,9 +1386,10 @@ GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, { struct GNUNET_CADET_Handle *h; - LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect()\n"); h = GNUNET_new (struct GNUNET_CADET_Handle); - LOG (GNUNET_ERROR_TYPE_DEBUG, " addr %p\n", h); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "GNUNET_CADET_connect() %p\n", + h); h->cfg = cfg; h->cleaner = cleaner; h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES); @@ -1401,7 +1410,6 @@ GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, for (h->n_handlers = 0; handlers && handlers[h->n_handlers].type; h->n_handlers++) ; - LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect() END\n"); return h; } @@ -1574,18 +1582,19 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h, struct GNUNET_CADET_Channel *ch; struct GNUNET_CADET_ClientChannelNumber ccn; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Creating new channel to %s:%u\n", - GNUNET_i2s (peer), port); ccn.channel_of_client = htonl (0); ch = create_channel (h, ccn); - LOG (GNUNET_ERROR_TYPE_DEBUG, " at %p\n", ch); - LOG (GNUNET_ERROR_TYPE_DEBUG, " number %X\n", - ntohl (ch->ccn.channel_of_client)); ch->ctx = channel_ctx; ch->peer = GNUNET_PEER_intern (peer); - env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new channel to %s:%u at %p number %X\n", + GNUNET_i2s (peer), + port, + ch, + ntohl (ch->ccn.channel_of_client)); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE); msg->ccn = ch->ccn; msg->port = *port; msg->peer = *peer; diff --git a/src/cadet/gnunet-service-cadet-new.c b/src/cadet/gnunet-service-cadet-new.c index e67e507e3..097f77647 100644 --- a/src/cadet/gnunet-service-cadet-new.c +++ b/src/cadet/gnunet-service-cadet-new.c @@ -621,7 +621,8 @@ handle_channel_destroy (void *cls, ntohl (msg->ccn.channel_of_client), ch)); GCCH_channel_local_destroy (ch, - c); + c, + msg->ccn); GNUNET_SERVICE_client_continue (c->client); } @@ -1201,6 +1202,7 @@ channel_destroy_iterator (void *cls, void *value) { struct CadetClient *c = cls; + struct GNUNET_CADET_ClientChannelNumber ccn; struct CadetChannel *ch = value; LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1211,8 +1213,10 @@ channel_destroy_iterator (void *cls, GNUNET_CONTAINER_multihashmap32_remove (c->channels, key, ch)); + ccn.channel_of_client = htonl (key); GCCH_channel_local_destroy (ch, - c); + c, + ccn); return GNUNET_OK; } diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index 74aafe5a1..98cfa8383 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c @@ -25,6 +25,8 @@ * @author Christian Grothoff * * TODO: + * - FIXME: send ACKs back to loopback clients! + * * - introduce shutdown so we can have half-closed channels, modify * destroy to include MID to have FIN-ACK equivalents, etc. * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! @@ -159,6 +161,46 @@ struct CadetOutOfOrderMessage }; +/** + * Client endpoint of a `struct CadetChannel`. A channel may be a + * loopback channel, in which case it has two of these endpoints. + * Note that flow control also is required in both directions. + */ +struct CadetChannelClient +{ + /** + * Client handle. Not by itself sufficient to designate + * the client endpoint, as the same client handle may + * be used for both the owner and the destination, and + * we thus also need the channel ID to identify the client. + */ + struct CadetClient *c; + + /** + * Head of DLL of messages received out of order or while client was unready. + */ + struct CadetOutOfOrderMessage *head_recv; + + /** + * Tail DLL of messages received out of order or while client was unready. + */ + struct CadetOutOfOrderMessage *tail_recv; + + /** + * Local tunnel number for this client. + * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI, + * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) + */ + struct GNUNET_CADET_ClientChannelNumber ccn; + + /** + * Can we send data to the client? + */ + int client_ready; + +}; + + /** * Struct containing all information regarding a channel to a remote client. */ @@ -173,13 +215,13 @@ struct CadetChannel * Client owner of the tunnel, if any. * (Used if this channel represends the initiating end of the tunnel.) */ - struct CadetClient *owner; + struct CadetChannelClient *owner; /** * Client destination of the tunnel, if any. * (Used if this channel represents the listening end of the tunnel.) */ - struct CadetClient *dest; + struct CadetChannelClient *dest; /** * Last entry in the tunnel's queue relating to control messages @@ -199,16 +241,6 @@ struct CadetChannel */ struct CadetReliableMessage *tail_sent; - /** - * Head of DLL of messages received out of order or while client was unready. - */ - struct CadetOutOfOrderMessage *head_recv; - - /** - * Tail DLL of messages received out of order or while client was unready. - */ - struct CadetOutOfOrderMessage *tail_recv; - /** * Task to resend/poll in case no ACK is received. */ @@ -270,28 +302,11 @@ struct CadetChannel */ struct GNUNET_CADET_ChannelTunnelNumber ctn; - /** - * Local tunnel number for local client @e owner owning the channel. - * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) - */ - struct GNUNET_CADET_ClientChannelNumber ccn_owner; - - /** - * Local tunnel number for local client @e dest owning the channel. - * (< #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) - */ - struct GNUNET_CADET_ClientChannelNumber ccn_dest; - /** * Channel state. */ enum CadetChannelState state; - /** - * Can we send data to the client? - */ - int client_ready; - /** * Is the tunnel bufferless (minimum latency)? */ @@ -342,8 +357,8 @@ GCCH_2s (const struct CadetChannel *ch) : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))), GNUNET_h2s (&ch->port), ch->ctn, - ntohl (ch->ccn_owner.channel_of_client), - ntohl (ch->ccn_dest.channel_of_client)); + (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client), + (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client)); return buf; } @@ -362,6 +377,28 @@ GCCH_get_id (const struct CadetChannel *ch) } +/** + * Release memory associated with @a ccc + * + * @param ccc data structure to clean up + */ +static void +free_channel_client (struct CadetChannelClient *ccc) +{ + struct CadetOutOfOrderMessage *com; + + while (NULL != (com = ccc->head_recv)) + { + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + com); + GNUNET_MQ_discard (com->env); + GNUNET_free (com); + } + GNUNET_free (ccc); +} + + /** * Destroy the given channel. * @@ -371,7 +408,6 @@ static void channel_destroy (struct CadetChannel *ch) { struct CadetReliableMessage *crm; - struct CadetOutOfOrderMessage *com; while (NULL != (crm = ch->head_sent)) { @@ -386,13 +422,15 @@ channel_destroy (struct CadetChannel *ch) crm); GNUNET_free (crm); } - while (NULL != (com = ch->head_recv)) + if (NULL != ch->owner) { - GNUNET_CONTAINER_DLL_remove (ch->head_recv, - ch->tail_recv, - com); - GNUNET_MQ_discard (com->env); - GNUNET_free (com); + free_channel_client (ch->owner); + ch->owner = NULL; + } + if (NULL != ch->dest) + { + free_channel_client (ch->dest); + ch->dest = NULL; } if (NULL != ch->last_control_qe) { @@ -444,7 +482,7 @@ channel_open_sent_cb (void *cls) ch->last_control_qe = NULL; ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sent CHANNEL_OPEN on %s, retrying in %s\n", + "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n", GCCH_2s (ch), GNUNET_STRINGS_relative_time_to_string (ch->retry_time, GNUNET_YES)); @@ -532,23 +570,30 @@ GCCH_channel_local_new (struct CadetClient *owner, uint32_t options) { struct CadetChannel *ch; + struct CadetChannelClient *ccco; + + ccco = GNUNET_new (struct CadetChannelClient); + ccco->c = owner; + ccco->ccn = ccn; + ccco->client_ready = GNUNET_YES; ch = GNUNET_new (struct CadetChannel); ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */ - ch->owner = owner; - ch->ccn_owner = ccn; + ch->owner = ccco; ch->port = *port; if (0 == memcmp (&my_full_id, GCP_get_id (destination), sizeof (struct GNUNET_PeerIdentity))) { + struct CadetClient *c; + ch->is_loopback = GNUNET_YES; - ch->dest = GNUNET_CONTAINER_multihashmap_get (open_ports, - port); - if (NULL == ch->dest) + c = GNUNET_CONTAINER_multihashmap_get (open_ports, + port); + if (NULL == c) { /* port closed, wait for it to possibly open */ (void) GNUNET_CONTAINER_multihashmap_put (loose_channels, @@ -561,8 +606,11 @@ GCCH_channel_local_new (struct CadetClient *owner, } else { + ch->dest = GNUNET_new (struct CadetChannelClient); + ch->dest->c = c; + ch->dest->client_ready = GNUNET_YES; GCCH_bind (ch, - ch->dest); + ch->dest->c); } } else @@ -786,20 +834,18 @@ send_ack_to_client (struct CadetChannel *ch, { struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalAck *ack; - struct CadetClient *c; + struct CadetChannelClient *ccc; env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); - ack->ccn = (GNUNET_YES == to_owner) ? ch->ccn_owner : ch->ccn_dest; - c = (GNUNET_YES == to_owner) - ? ch->owner - : ch->dest; + ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest; + ack->ccn = ccc->ccn; LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n", - GSC_2s (c), + GSC_2s (ccc->c), (GNUNET_YES == to_owner) ? "owner" : "dest", ntohl (ack->ccn.channel_of_client)); - GSC_send_to_client (c, + GSC_send_to_client (ccc->c, env); } @@ -817,6 +863,7 @@ GCCH_bind (struct CadetChannel *ch, struct CadetClient *c) { uint32_t options; + struct CadetChannelClient *cccd; LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding %s from %s to port %s of %s\n", @@ -837,16 +884,19 @@ GCCH_bind (struct CadetChannel *ch, options |= GNUNET_CADET_OPTION_RELIABLE; if (ch->out_of_order) options |= GNUNET_CADET_OPTION_OUT_OF_ORDER; - ch->dest = c; - ch->ccn_dest = GSC_bind (c, - ch, - (GNUNET_YES == ch->is_loopback) - ? GCP_get (&my_full_id, - GNUNET_YES) - : GCT_get_destination (ch->t), - &ch->port, - options); - GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) < + cccd = GNUNET_new (struct CadetChannelClient); + ch->dest = cccd; + cccd->c = c; + cccd->client_ready = GNUNET_YES; + cccd->ccn = GSC_bind (c, + ch, + (GNUNET_YES == ch->is_loopback) + ? GCP_get (&my_full_id, + GNUNET_YES) + : GCT_get_destination (ch->t), + &ch->port, + options); + GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */ if (GNUNET_YES == ch->is_loopback) @@ -862,7 +912,7 @@ GCCH_bind (struct CadetChannel *ch, ch); } /* give client it's initial supply of ACKs */ - GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) < + GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); for (unsigned int i=0;imax_pending_messages;i++) send_ack_to_client (ch, @@ -876,22 +926,37 @@ GCCH_bind (struct CadetChannel *ch, * * @param ch channel to destroy * @param c client that caused the destruction + * @param ccn client number of the client @a c */ void GCCH_channel_local_destroy (struct CadetChannel *ch, - struct CadetClient *c) + struct CadetClient *c, + struct GNUNET_CADET_ClientChannelNumber ccn) { LOG (GNUNET_ERROR_TYPE_DEBUG, "%s asks for destruction of %s\n", GSC_2s (c), GCCH_2s (ch)); GNUNET_assert (NULL != c); - if (c == ch->owner) + if ( (NULL != ch->owner) && + (c == ch->owner->c) && + (ccn.channel_of_client == ch->owner->ccn.channel_of_client) ) + { + free_channel_client (ch->owner); ch->owner = NULL; - else if (c == ch->dest) + } + else if ( (NULL != ch->dest) && + (c == ch->dest->c) && + (ccn.channel_of_client == ch->dest->ccn.channel_of_client) ) + { + free_channel_client (ch->dest); ch->dest = NULL; + } else + { GNUNET_assert (0); + } + if (GNUNET_YES == ch->destroy) { /* other end already destroyed, with the local client gone, no need @@ -1008,6 +1073,7 @@ is_before (void *cls, * and send an ACK to the other end (once flow control allows it!) * * @param ch channel that got data + * @param msg message that was received */ void GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, @@ -1015,6 +1081,7 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, { struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalData *ld; + struct CadetChannelClient *ccc; struct CadetOutOfOrderMessage *com; size_t payload_size; @@ -1023,11 +1090,12 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, env = GNUNET_MQ_msg_extra (ld, payload_size, GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); - ld->ccn = (NULL == ch->dest) ? ch->ccn_owner : ch->ccn_dest; + ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn; GNUNET_memcpy (&ld[1], &msg[1], payload_size); - if ( (GNUNET_YES == ch->client_ready) && + ccc = (NULL != ch->owner) ? ch->owner : ch->dest; + if ( (GNUNET_YES == ccc->client_ready) && ( (GNUNET_YES == ch->out_of_order) || (msg->mid.mid == ch->mid_recv.mid) ) ) { @@ -1035,8 +1103,9 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, "Giving %u bytes of payload from %s to client %s\n", (unsigned int) payload_size, GCCH_2s (ch), - GSC_2s (ch->owner ? ch->owner : ch->dest)); - GSC_send_to_client (ch->owner ? ch->owner : ch->dest, + GSC_2s (ccc->c)); + ccc->client_ready = GNUNET_NO; + GSC_send_to_client (ccc->c, env); ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); ch->mid_futures >>= 1; @@ -1047,7 +1116,7 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, drop it (can't buffer too much!) */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n", - (GNUNET_YES == ch->client_ready) + (GNUNET_YES == ccc->client_ready) ? "out-of-order" : "client-not-ready", (unsigned int) payload_size, @@ -1059,36 +1128,36 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, com->mid = msg->mid; com->env = env; /* sort into list ordered by "is_before" */ - if ( (NULL == ch->head_recv) || + if ( (NULL == ccc->head_recv) || (GNUNET_YES == is_before (ch, com, - ch->head_recv)) ) + ccc->head_recv)) ) { - GNUNET_CONTAINER_DLL_insert (ch->head_recv, - ch->tail_recv, + GNUNET_CONTAINER_DLL_insert (ccc->head_recv, + ccc->tail_recv, com); } else { struct CadetOutOfOrderMessage *pos; - for (pos = ch->head_recv; + for (pos = ccc->head_recv; NULL != pos; pos = pos->next) { if (GNUNET_YES != - is_before (ch, + is_before (NULL, pos, com)) break; } if (NULL == pos) - GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv, - ch->tail_recv, + GNUNET_CONTAINER_DLL_insert_tail (ccc->head_recv, + ccc->tail_recv, com); else - GNUNET_CONTAINER_DLL_insert_after (ch->head_recv, - ch->tail_recv, + GNUNET_CONTAINER_DLL_insert_after (ccc->head_recv, + ccc->tail_recv, com, pos->prev); } @@ -1166,6 +1235,8 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, void GCCH_handle_remote_destroy (struct CadetChannel *ch) { + struct CadetChannelClient *ccc; + GNUNET_assert (GNUNET_NO == ch->is_loopback); LOG (GNUNET_ERROR_TYPE_DEBUG, "Received remote channel DESTROY for %s\n", @@ -1176,7 +1247,8 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch) channel_destroy (ch); return; } - if (NULL != ch->head_recv) + ccc = (NULL != ch->owner) ? ch->owner : ch->dest; + if (NULL != ccc->head_recv) { LOG (GNUNET_ERROR_TYPE_WARNING, "Lost end of transmission due to remote shutdown on %s\n", @@ -1184,8 +1256,8 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch) /* FIXME: change API to notify client about truncated transmission! */ } ch->destroy = GNUNET_YES; - GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest, - (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest, + GSC_handle_remote_channel_destroy (ccc->c, + ccc->ccn, ch); channel_destroy (ch); } @@ -1326,7 +1398,7 @@ GCCH_handle_local_data (struct CadetChannel *ch, if (GNUNET_YES == ch->is_loopback) { - struct CadetClient *receiver; + struct CadetChannelClient *receiver; struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalData *ld; int to_owner; @@ -1335,25 +1407,24 @@ GCCH_handle_local_data (struct CadetChannel *ch, buf_len, GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); if (sender_ccn.channel_of_client == - ch->ccn_owner.channel_of_client) + ch->owner->ccn.channel_of_client) { receiver = ch->dest; - ld->ccn = ch->ccn_dest; to_owner = GNUNET_NO; } else { GNUNET_assert (sender_ccn.channel_of_client == - ch->ccn_dest.channel_of_client); + ch->dest->ccn.channel_of_client); receiver = ch->owner; - ld->ccn = ch->ccn_owner; to_owner = GNUNET_YES; } + ld->ccn = receiver->ccn; GNUNET_memcpy (&ld[1], buf, buf_len); /* FIXME: this does not provide for flow control! */ - GSC_send_to_client (receiver, + GSC_send_to_client (receiver->c, env); send_ack_to_client (ch, to_owner); @@ -1387,18 +1458,31 @@ GCCH_handle_local_data (struct CadetChannel *ch, /** - * Try to deliver messages to the local client, if it is ready for more. + * Handle ACK from client on local channel. Means the client is ready + * for more data, see if we have any for it. * - * @param ch channel to process + * @param ch channel to destroy + * @param client_ccn ccn of the client sending the ack */ -static void -send_client_buffered_data (struct CadetChannel *ch) +void +GCCH_handle_local_ack (struct CadetChannel *ch, + struct GNUNET_CADET_ClientChannelNumber client_ccn) { + struct CadetChannelClient *ccc; struct CadetOutOfOrderMessage *com; - if (GNUNET_NO == ch->client_ready) - return; /* client not ready */ - com = ch->head_recv; + if ( (NULL != ch->owner) && + (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) ) + ccc = ch->owner; + else if ( (NULL != ch->dest) && + (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) ) + ccc = ch->dest; + else + GNUNET_assert (0); + ccc->client_ready = GNUNET_YES; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got LOCAL_ACK, client ready to receive more data!\n"); + com = ccc->head_recv; if (NULL == com) return; /* none pending */ if ( (com->mid.mid != ch->mid_recv.mid) && @@ -1410,14 +1494,15 @@ send_client_buffered_data (struct CadetChannel *ch) GCCH_2s (ch)); /* all good, pass next message to client */ - GNUNET_CONTAINER_DLL_remove (ch->head_recv, - ch->tail_recv, + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, com); /* FIXME: if unreliable, this is not aggressive enough, as it would be OK to have lost some! */ ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); ch->mid_futures >>= 1; /* equivalent to division by 2 */ - GSC_send_to_client (ch->owner ? ch->owner : ch->dest, + ccc->client_ready = GNUNET_NO; + GSC_send_to_client (ccc->c, com->env); GNUNET_free (com); if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && @@ -1435,7 +1520,7 @@ send_client_buffered_data (struct CadetChannel *ch) send_channel_data_ack (ch); } - if (NULL != ch->head_recv) + if (NULL != ccc->head_recv) return; if (GNUNET_NO == ch->destroy) return; @@ -1445,23 +1530,6 @@ send_client_buffered_data (struct CadetChannel *ch) } -/** - * Handle ACK from client on local channel. - * - * @param ch channel to destroy - * @param client_ccn ccn of the client sending the ack - */ -void -GCCH_handle_local_ack (struct CadetChannel *ch, - struct GNUNET_CADET_ClientChannelNumber client_ccn) -{ - ch->client_ready = GNUNET_YES; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got LOCAL_ACK, client ready to receive more data!\n"); - send_client_buffered_data (ch); -} - - #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) @@ -1497,17 +1565,17 @@ GCCH_debug (struct CadetChannel *ch, { LOG2 (level, "CHN origin %s ready %s local-id: %u\n", - GSC_2s (ch->owner), - ch->client_ready ? "YES" : "NO", - ntohl (ch->ccn_owner.channel_of_client)); + GSC_2s (ch->owner->c), + ch->owner->client_ready ? "YES" : "NO", + ntohl (ch->owner->ccn.channel_of_client)); } if (NULL != ch->dest) { LOG2 (level, "CHN destination %s ready %s local-id: %u\n", - GSC_2s (ch->dest), - ch->client_ready ? "YES" : "NO", - ntohl (ch->ccn_dest.channel_of_client)); + GSC_2s (ch->dest->c), + ch->dest->client_ready ? "YES" : "NO", + ntohl (ch->dest->ccn.channel_of_client)); } LOG2 (level, "CHN Message IDs recv: %d (%LLX), send: %d\n", diff --git a/src/cadet/gnunet-service-cadet-new_channel.h b/src/cadet/gnunet-service-cadet-new_channel.h index 41f0bfe9b..e572b7633 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.h +++ b/src/cadet/gnunet-service-cadet-new_channel.h @@ -114,10 +114,12 @@ GCCH_bind (struct CadetChannel *ch, * * @param ch channel to destroy * @param c client that caused the destruction + * @param ccn client number of the client @a c */ void GCCH_channel_local_destroy (struct CadetChannel *ch, - struct CadetClient *c); + struct CadetClient *c, + struct GNUNET_CADET_ClientChannelNumber ccn); /** @@ -166,6 +168,7 @@ GCCH_handle_duplicate_open (struct CadetChannel *ch); * We got payload data for a channel. Pass it on to the client. * * @param ch channel that got data + * @param msg message that was received */ void GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, -- 2.25.1