From: Bart Polot Date: Sat, 30 Nov 2013 01:17:48 +0000 (+0000) Subject: - retransmit channel create based on separate timeout tasks X-Git-Tag: initial-import-from-subversion-38251~5845 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=ea45834dfdb68b7c6c3c2fc5b560b23d6e6aa505;p=oweals%2Fgnunet.git - retransmit channel create based on separate timeout tasks --- diff --git a/src/mesh/gnunet-service-mesh_channel.c b/src/mesh/gnunet-service-mesh_channel.c index 20f49fc9f..0739ad021 100644 --- a/src/mesh/gnunet-service-mesh_channel.c +++ b/src/mesh/gnunet-service-mesh_channel.c @@ -165,9 +165,9 @@ struct MeshChannelReliability uint32_t mid_recv; /** - * Handle for queued DATA_ACKs. + * Handle for queued unique data CREATE, DATA_ACK. */ - struct MeshChannelQueue *ack_q; + struct MeshChannelQueue *uniq; /** * Can we send data to the client? @@ -318,6 +318,16 @@ extern GNUNET_PEER_Id myid; static void rel_message_free (struct MeshReliableMessage *copy, int update_time); +/** + * send a channel create message. + * + * @param ch Channel for which to send. + */ +static void +send_create (struct MeshChannel *ch); + + + /** * We have received a message out of order, or the client is not ready. * Buffer it until we receive an ACK from the client or the missing @@ -397,6 +407,66 @@ add_destination (struct MeshChannel *ch, struct MeshClient *c) } +/** + * Set options in a channel, extracted from a bit flag field. + * + * @param ch Channel to set options to. + * @param options Bit array in host byte order. + */ +static void +channel_set_options (struct MeshChannel *ch, uint32_t options) +{ + ch->nobuffer = (options & GNUNET_MESH_OPTION_NOBUFFER) != 0 ? + GNUNET_YES : GNUNET_NO; + ch->reliable = (options & GNUNET_MESH_OPTION_RELIABLE) != 0 ? + GNUNET_YES : GNUNET_NO; +} + + +/** + * Get a bit flag field with the options of a channel. + * + * @param ch Channel to get options from. + * + * @return Bit array in host byte order. + */ +static uint32_t +channel_get_options (struct MeshChannel *ch) +{ + uint32_t options; + + options = 0; + if (ch->nobuffer) + options |= GNUNET_MESH_OPTION_NOBUFFER; + if (ch->reliable) + options |= GNUNET_MESH_OPTION_RELIABLE; + + return options; +} + + +/** + * Notify the destination client that a new incoming channel was created. + * + * @param ch Channel that was created. + */ +static void +send_client_create (struct MeshChannel *ch) +{ + uint32_t opt; + + if (NULL == ch->dest) + return; + + opt = 0; + opt |= GNUNET_YES == ch->reliable ? GNUNET_MESH_OPTION_RELIABLE : 0; + opt |= GNUNET_YES == ch->nobuffer ? GNUNET_MESH_OPTION_NOBUFFER : 0; + GML_send_channel_create (ch->dest, ch->lid_dest, ch->port, opt, + GMT_get_destination (ch->t)); + +} + + /** * Send data to a client. * @@ -535,6 +605,180 @@ send_client_nack (struct MeshChannel *ch) } +/** + * We haven't received an ACK after a certain time: restransmit the message. + * + * @param cls Closure (MeshChannelReliability with the message to restransmit) + * @param tc TaskContext. + */ +static void +channel_retransmit_message (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct MeshChannelReliability *rel = cls; + struct MeshReliableMessage *copy; + struct MeshChannel *ch; + struct GNUNET_MESH_Data *payload; + int fwd; + + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + ch = rel->ch; + copy = rel->head_sent; + if (NULL == copy) + { + GNUNET_break (0); + return; + } + + payload = (struct GNUNET_MESH_Data *) ©[1]; + fwd = (rel == ch->root_rel); + + /* Message not found in the queue that we are going to use. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid); + + GMCH_send_prebuilt_message (&payload->header, ch, fwd, copy); + GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO); +} + + +/** + * We haven't received an Channel ACK after a certain time: resend the CREATE. + * + * @param cls Closure (MeshChannelReliability of the channel to recreate) + * @param tc TaskContext. + */ +static void +channel_recreate (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct MeshChannelReliability *rel = cls; + + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RE-CREATE\n"); + GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO); + + if (rel == rel->ch->root_rel) + { + send_create (rel->ch); + } + else + { + + } + +} + + +/** + * Message has been sent: start retransmission timer. + * + * @param cls Closure (queue structure). + * @param t Tunnel. + * @param q Queue handler (no longer valid). + * @param type Type of message. + * @param size Size of the message. + */ +static void +ch_message_sent (void *cls, + struct MeshTunnel3 *t, + struct MeshTunnel3Queue *q, + uint16_t type, size_t size) +{ + struct MeshChannelQueue *ch_q = cls; + struct MeshReliableMessage *copy = ch_q->copy; + struct MeshChannelReliability *rel; + + switch (ch_q->type) + { + case GNUNET_MESSAGE_TYPE_MESH_DATA: + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT %u %s (c: %p, q: %p)\n", + copy->mid, GM_m2s (type), copy, copy->q); + GNUNET_assert (ch_q == copy->q); + copy->timestamp = GNUNET_TIME_absolute_get (); + rel = copy->rel; + if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task) + { + if (0 != rel->expected_delay.rel_value_us) + { + rel->retry_timer = + GNUNET_TIME_relative_multiply (rel->expected_delay, + MESH_RETRANSMIT_MARGIN); + } + else + { + rel->retry_timer = MESH_RETRANSMIT_TIME; + } + rel->retry_task = + GNUNET_SCHEDULER_add_delayed (rel->retry_timer, + &channel_retransmit_message, rel); + } + copy->q = NULL; + break; + + + case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK: + rel = ch_q->rel; + GNUNET_assert (rel->uniq == ch_q); + rel->uniq = NULL; + break; + + + case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE: + rel = ch_q->rel; + GNUNET_assert (rel->uniq == ch_q); + if (MESH_CHANNEL_READY != rel->ch->state) + { + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rel->retry_task); + rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer, + &channel_recreate, rel); + } + rel->uniq = NULL; + break; + + + default: + GNUNET_break (0); + GNUNET_free (ch_q); + return; + } + + GNUNET_free (ch_q); +} + + +/** + * send a channel create message. + * + * @param ch Channel for which to send. + */ +static void +send_create (struct MeshChannel *ch) +{ + struct GNUNET_MESH_ChannelCreate msgcc; + struct MeshChannelQueue *q; + + msgcc.header.size = htons (sizeof (msgcc)); + msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE); + msgcc.chid = htonl (ch->gid); + msgcc.port = htonl (ch->port); + msgcc.opt = htonl (channel_get_options (ch)); + + q = GNUNET_new (struct MeshChannelQueue); + q->rel = ch->root_rel; + + /* FIXME cancel on confirm */ + q->q = GMT_send_prebuilt_message (&msgcc.header, ch->t, ch, + GNUNET_YES, GNUNET_YES, + ch_message_sent, q); + q->rel->uniq = q; +} + + /** * Notify a client that the channel is no longer valid. * @@ -601,8 +845,8 @@ channel_rel_free_all (struct MeshChannelReliability *rel) { GNUNET_SCHEDULER_cancel (rel->retry_task); } - if (NULL != rel->ack_q) - GMT_cancel (rel->ack_q->q); + if (NULL != rel->uniq) + GMT_cancel (rel->uniq->q); GNUNET_free (rel); } @@ -678,45 +922,6 @@ channel_rel_free_sent (struct MeshChannelReliability *rel, } -/** - * We haven't received an ACK after a certain time: restransmit the message. - * - * @param cls Closure (MeshReliableMessage with the message to restransmit) - * @param tc TaskContext. - */ -static void -channel_retransmit_message (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct MeshChannelReliability *rel = cls; - struct MeshReliableMessage *copy; - struct MeshChannel *ch; - struct GNUNET_MESH_Data *payload; - int fwd; - - rel->retry_task = GNUNET_SCHEDULER_NO_TASK; - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - - ch = rel->ch; - copy = rel->head_sent; - if (NULL == copy) - { - GNUNET_break (0); - return; - } - - payload = (struct GNUNET_MESH_Data *) ©[1]; - fwd = (rel == ch->root_rel); - - /* Message not found in the queue that we are going to use. */ - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid); - - GMCH_send_prebuilt_message (&payload->header, ch, fwd, copy); - GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO); -} - - /** * Destroy a reliable message after it has been acknowledged, either by * direct mid ACK or bitfield. Updates the appropriate data structures and @@ -828,8 +1033,6 @@ static void channel_confirm (struct MeshChannel *ch, int fwd) { struct MeshChannelReliability *rel; - struct MeshReliableMessage *copy; - struct MeshReliableMessage *next; LOG (GNUNET_ERROR_TYPE_DEBUG, " channel confirm %s %s:%X\n", @@ -838,19 +1041,23 @@ channel_confirm (struct MeshChannel *ch, int fwd) rel = fwd ? ch->root_rel : ch->dest_rel; rel->client_ready = GNUNET_YES; - for (copy = rel->head_sent; NULL != copy; copy = next) - { - struct GNUNET_MessageHeader *msg; + send_client_ack (ch, fwd); - next = copy->next; - msg = (struct GNUNET_MessageHeader *) ©[1]; - if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE) - { - rel_message_free (copy, GNUNET_YES); - /* TODO return? */ - } + if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task) + { + GNUNET_SCHEDULER_cancel (rel->retry_task); + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; + } + else if (NULL != rel->uniq) + { + GMT_cancel (rel->uniq->q); + /* ch_sent_message will free and NULL uniq */ + } + else + { + /* We SHOULD have been trying to retransmit this! */ + GNUNET_break (0); } - send_client_ack (ch, fwd); /* In case of a FWD ACK (SYNACK) send a BCK ACK (ACK). */ if (fwd) @@ -858,71 +1065,6 @@ channel_confirm (struct MeshChannel *ch, int fwd) } -/** - * Message has been sent: start retransmission timer. - * - * @param cls Closure (queue structure). - * @param t Tunnel. - * @param q Queue handler (no longer valid). - * @param type Type of message. - * @param size Size of the message. - */ -static void -ch_message_sent (void *cls, - struct MeshTunnel3 *t, - struct MeshTunnel3Queue *q, - uint16_t type, size_t size) -{ - struct MeshChannelQueue *ch_q = cls; - struct MeshReliableMessage *copy = ch_q->copy; - struct MeshChannelReliability *rel; - - switch (ch_q->type) - { - case GNUNET_MESSAGE_TYPE_MESH_DATA: - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT %u %s (c: %p, q: %p)\n", - copy->mid, GM_m2s (type), copy, copy->q); - GNUNET_assert (ch_q == copy->q); - copy->timestamp = GNUNET_TIME_absolute_get (); - rel = copy->rel; - if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task) - { - if (0 != rel->expected_delay.rel_value_us) - { - rel->retry_timer = - GNUNET_TIME_relative_multiply (rel->expected_delay, - MESH_RETRANSMIT_MARGIN); - } - else - { - rel->retry_timer = MESH_RETRANSMIT_TIME; - } - rel->retry_task = - GNUNET_SCHEDULER_add_delayed (rel->retry_timer, - &channel_retransmit_message, - rel); - } - copy->q = NULL; - break; - - - case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK: - rel = ch_q->rel; - GNUNET_assert (rel->ack_q == ch_q); - rel->ack_q = NULL; - break; - - - default: - GNUNET_break (0); - GNUNET_free (ch_q); - return; - } - - GNUNET_free (ch_q); -} - - /** * Save a copy to retransmit in case it gets lost. * @@ -997,20 +1139,12 @@ channel_new (struct MeshTunnel3 *t, /** - * Set options in a channel, extracted from a bit flag field + * Test if the channel is loopback: both root and dest are on the local peer. * - * @param ch Channel to set options to. - * @param options Bit array in host byte order. + * @param ch Channel to test. + * + * @return #GNUNET_YES if channel is loopback, #NGUNET_NO otherwise. */ -static void -channel_set_options (struct MeshChannel *ch, uint32_t options) -{ - ch->nobuffer = (options & GNUNET_MESH_OPTION_NOBUFFER) != 0 ? - GNUNET_YES : GNUNET_NO; - ch->reliable = (options & GNUNET_MESH_OPTION_RELIABLE) != 0 ? - GNUNET_YES : GNUNET_NO; -} - static int is_loopback (const struct MeshChannel *ch) { @@ -1241,28 +1375,6 @@ GMCH_is_terminal (struct MeshChannel *ch, int fwd) } -/** - * Notify the destination client that a new incoming channel was created. - * - * @param ch Channel that was created. - */ -void -GMCH_send_create (struct MeshChannel *ch) -{ - uint32_t opt; - - if (NULL == ch->dest) - return; - - opt = 0; - opt |= GNUNET_YES == ch->reliable ? GNUNET_MESH_OPTION_RELIABLE : 0; - opt |= GNUNET_YES == ch->nobuffer ? GNUNET_MESH_OPTION_NOBUFFER : 0; - GML_send_channel_create (ch->dest, ch->lid_dest, ch->port, opt, - GMT_get_destination (ch->t)); - -} - - /** * Send an end-to-end ACK message for the most recent in-sequence payload. * @@ -1624,20 +1736,8 @@ GMCH_handle_local_create (struct MeshClient *c, LOG (GNUNET_ERROR_TYPE_DEBUG, "CREATED CHANNEL %s\n", GMCH_2s (ch)); - /* Send create channel */ - { - struct GNUNET_MESH_ChannelCreate msgcc; + send_create (ch); - msgcc.header.size = htons (sizeof (msgcc)); - msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE); - msgcc.chid = htonl (ch->gid); - msgcc.port = msg->port; - msgcc.opt = msg->opt; - - /* FIXME retransmit if lost */ - GMT_send_prebuilt_message (&msgcc.header, t, ch, - GNUNET_YES, GNUNET_YES, NULL, NULL); - } return GNUNET_OK; } @@ -1891,7 +1991,7 @@ GMCH_handle_create (struct MeshTunnel3 *t, else LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Not Reliable\n"); - GMCH_send_create (ch); + send_client_create (ch); channel_send_ack (ch, GNUNET_YES); return ch; @@ -2075,18 +2175,18 @@ GMCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message, struct MeshChannelReliability *rel; rel = fwd ? ch->root_rel : ch->dest_rel; - if (NULL != rel->ack_q) + if (NULL != rel->uniq) { - GMT_cancel (rel->ack_q->q); + GMT_cancel (rel->uniq->q); /* ch_message_sent is called, freeing ack_q */ } - rel->ack_q = GNUNET_new (struct MeshChannelQueue); - rel->ack_q->type = type; - rel->ack_q->rel = rel; - rel->ack_q->q = GMT_send_prebuilt_message (message, ch->t, ch, + rel->uniq = GNUNET_new (struct MeshChannelQueue); + rel->uniq->type = type; + rel->uniq->rel = rel; + rel->uniq->q = GMT_send_prebuilt_message (message, ch->t, ch, fwd, GNUNET_YES, &ch_message_sent, - rel->ack_q); + rel->uniq); } break; default: