X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmesh%2Fgnunet-service-mesh_channel.c;h=06cf599d4ca6c89e5a9933eee2f47cb9a630a886;hb=e43078b68951ad8a3daa3a193473e9c321549e1d;hp=d3e78efa53fa1ec0d8b6675e4635fd3fc06bdb63;hpb=fb69ad9897d96ad2772ff3ef64d4673802765821;p=oweals%2Fgnunet.git diff --git a/src/mesh/gnunet-service-mesh_channel.c b/src/mesh/gnunet-service-mesh_channel.c index d3e78efa5..06cf599d4 100644 --- a/src/mesh/gnunet-service-mesh_channel.c +++ b/src/mesh/gnunet-service-mesh_channel.c @@ -24,8 +24,8 @@ #include "gnunet_statistics_service.h" -#include "mesh_enc.h" -#include "mesh_protocol_enc.h" +#include "mesh.h" +#include "mesh_protocol.h" #include "gnunet-service-mesh_channel.h" #include "gnunet-service-mesh_local.h" @@ -34,7 +34,8 @@ #define LOG(level, ...) GNUNET_log_from(level,"mesh-chn",__VA_ARGS__) -#define MESH_RETRANSMIT_TIME GNUNET_TIME_UNIT_SECONDS +#define MESH_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(\ + GNUNET_TIME_UNIT_MILLISECONDS, 250) #define MESH_RETRANSMIT_MARGIN 4 @@ -60,6 +61,32 @@ enum MeshChannelState }; +/** + * Info holder for channel messages in queues. + */ +struct MeshChannelQueue +{ + /** + * Tunnel Queue. + */ + struct MeshTunnel3Queue *tq; + + /** + * Message type (DATA/DATA_ACK) + */ + uint16_t type; + + /** + * Message copy (for DATAs, to start retransmission timer) + */ + struct MeshReliableMessage *copy; + + /** + * Reliability (for DATA_ACKs, to access rel->ack_q) + */ + struct MeshChannelReliability *rel; +}; + /** * Info needed to retry a message in case it gets lost. @@ -87,6 +114,11 @@ struct MeshReliableMessage */ uint32_t mid; + /** + * Tunnel Queue. + */ + struct MeshChannelQueue *chq; + /** * When was this message issued (to calculate ACK delay) */ @@ -112,11 +144,6 @@ struct MeshChannelReliability struct MeshReliableMessage *head_sent; struct MeshReliableMessage *tail_sent; - /** - * Messages pending to send. - */ - unsigned int n_sent; - /** * DLL of messages received out of order. */ @@ -138,6 +165,11 @@ struct MeshChannelReliability */ uint32_t mid_recv; + /** + * Handle for queued unique data CREATE, DATA_ACK. + */ + struct MeshChannelQueue *uniq; + /** * Can we send data to the client? */ @@ -235,7 +267,7 @@ struct MeshChannel int destroy; /** - * Total messages pending for this channel, payload or not. + * Total (reliable) messages pending ACK for this channel. */ unsigned int pending_messages; @@ -279,9 +311,49 @@ extern GNUNET_PEER_Id myid; * timers and frees all memory. * * @param copy Message that is no longer needed: remote peer got it. + * @param update_time Is the timing information relevant? + * If this message is ACK in a batch the timing information + * is skewed by the retransmission, count only for the + * retransmitted message. + */ +static int +rel_message_free (struct MeshReliableMessage *copy, int update_time); + +/** + * send a channel create message. + * + * @param ch Channel for which to send. + */ +static void +send_create (struct MeshChannel *ch); + +/** + * Confirm we got a channel create, FWD ack. + * + * @param ch The channel to confirm. + * @param fwd Should we send a FWD ACK? (going dest->root) */ static void -rel_message_free (struct MeshReliableMessage *copy); +send_ack (struct MeshChannel *ch, int fwd); + + + +/** + * Test if the channel is loopback: both root and dest are on the local peer. + * + * @param ch Channel to test. + * + * @return #GNUNET_YES if channel is loopback, #GNUNET_NO otherwise. + */ +static int +is_loopback (const struct MeshChannel *ch) +{ + if (NULL != ch->t) + return GMT_is_loopback (ch->t); + + return (NULL != ch->root && NULL != ch->dest); +} + /** * We have received a message out of order, or the client is not ready. @@ -308,6 +380,7 @@ add_buffered_data (const struct GNUNET_MESH_Data *msg, copy = GNUNET_malloc (sizeof (*copy) + size); copy->mid = mid; copy->rel = rel; + copy->type = GNUNET_MESSAGE_TYPE_MESH_DATA; memcpy (©[1], msg, size); rel->n_recv++; @@ -317,7 +390,7 @@ add_buffered_data (const struct GNUNET_MESH_Data *msg, for (prev = rel->head_recv; NULL != prev; prev = prev->next) { LOG (GNUNET_ERROR_TYPE_DEBUG, " prev %u\n", prev->mid); - if (GMC_is_pid_bigger (prev->mid, mid)) + if (GM_is_pid_bigger (prev->mid, mid)) { LOG (GNUNET_ERROR_TYPE_DEBUG, " bingo!\n"); GNUNET_CONTAINER_DLL_insert_before (rel->head_recv, rel->tail_recv, @@ -330,6 +403,7 @@ add_buffered_data (const struct GNUNET_MESH_Data *msg, LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data END\n"); } + /** * Add a destination client to a channel, initializing all data structures * in the channel and the client. @@ -355,12 +429,73 @@ add_destination (struct MeshChannel *ch, struct MeshClient *c) GNUNET_break (NULL == ch->dest_rel); ch->dest_rel = GNUNET_new (struct MeshChannelReliability); ch->dest_rel->ch = ch; - ch->dest_rel->expected_delay = MESH_RETRANSMIT_TIME; + ch->dest_rel->expected_delay.rel_value_us = 0; + ch->dest_rel->retry_timer = MESH_RETRANSMIT_TIME; ch->dest = c; } +/** + * Set options in a channel, extracted from a bit flag field. + * + * @param ch Channel to set options to. + * @param options Bit array in host byte order. + */ +static void +channel_set_options (struct MeshChannel *ch, uint32_t options) +{ + ch->nobuffer = (options & GNUNET_MESH_OPTION_NOBUFFER) != 0 ? + GNUNET_YES : GNUNET_NO; + ch->reliable = (options & GNUNET_MESH_OPTION_RELIABLE) != 0 ? + GNUNET_YES : GNUNET_NO; +} + + +/** + * Get a bit flag field with the options of a channel. + * + * @param ch Channel to get options from. + * + * @return Bit array in host byte order. + */ +static uint32_t +channel_get_options (struct MeshChannel *ch) +{ + uint32_t options; + + options = 0; + if (ch->nobuffer) + options |= GNUNET_MESH_OPTION_NOBUFFER; + if (ch->reliable) + options |= GNUNET_MESH_OPTION_RELIABLE; + + return options; +} + + +/** + * Notify the destination client that a new incoming channel was created. + * + * @param ch Channel that was created. + */ +static void +send_client_create (struct MeshChannel *ch) +{ + uint32_t opt; + + if (NULL == ch->dest) + return; + + opt = 0; + opt |= GNUNET_YES == ch->reliable ? GNUNET_MESH_OPTION_RELIABLE : 0; + opt |= GNUNET_YES == ch->nobuffer ? GNUNET_MESH_OPTION_NOBUFFER : 0; + GML_send_channel_create (ch->dest, ch->lid_dest, ch->port, opt, + GMT_get_destination (ch->t)); + +} + + /** * Send data to a client. * @@ -426,20 +561,21 @@ send_client_buffered_data (struct MeshChannel *ch, struct GNUNET_MESH_Data *msg = (struct GNUNET_MESH_Data *) ©[1]; LOG (GNUNET_ERROR_TYPE_DEBUG, - " have %u! now expecting %u\n", - copy->mid, rel->mid_recv + 1); + " have %u! now expecting %u\n", + copy->mid, rel->mid_recv + 1); send_client_data (ch, msg, fwd); rel->n_recv--; rel->mid_recv++; GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy); + LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE RECV %p\n", copy); GNUNET_free (copy); } else { LOG (GNUNET_ERROR_TYPE_DEBUG, - " reliable && don't have %u, next is %u\n", - rel->mid_recv, - copy->mid); + " reliable && don't have %u, next is %u\n", + rel->mid_recv, + copy->mid); return; } } @@ -459,10 +595,16 @@ static void send_client_ack (struct MeshChannel *ch, int fwd) { struct MeshChannelReliability *rel = fwd ? ch->root_rel : ch->dest_rel; + struct MeshClient *c = fwd ? ch->root : ch->dest; + if (NULL == c) + { + GNUNET_break (GNUNET_NO != ch->destroy); + return; + } LOG (GNUNET_ERROR_TYPE_DEBUG, " sending %s ack to client on channel %s\n", - fwd ? "FWD" : "BCK", GMCH_2s (ch)); + GM_f2s (fwd), GMCH_2s (ch)); if (NULL == rel) { @@ -477,7 +619,7 @@ send_client_ack (struct MeshChannel *ch, int fwd) } rel->client_allowed = GNUNET_YES; - GML_send_ack (fwd ? ch->root : ch->dest, fwd ? ch->lid_root : ch->lid_dest); + GML_send_ack (c, fwd ? ch->lid_root : ch->lid_dest); } @@ -494,7 +636,282 @@ send_client_nack (struct MeshChannel *ch) GNUNET_break (0); return; } - GML_send_nack (ch->root, ch->lid_root); + GML_send_channel_nack (ch->root, ch->lid_root); +} + + +/** + * We haven't received an ACK after a certain time: restransmit the message. + * + * @param cls Closure (MeshChannelReliability with the message to restransmit) + * @param tc TaskContext. + */ +static void +channel_retransmit_message (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct MeshChannelReliability *rel = cls; + struct MeshReliableMessage *copy; + struct MeshChannel *ch; + struct GNUNET_MESH_Data *payload; + int fwd; + + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + ch = rel->ch; + copy = rel->head_sent; + if (NULL == copy) + { + GNUNET_break (0); + return; + } + + payload = (struct GNUNET_MESH_Data *) ©[1]; + fwd = (rel == ch->root_rel); + + /* Message not found in the queue that we are going to use. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid); + + GMCH_send_prebuilt_message (&payload->header, ch, fwd, copy); + GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO); +} + + +/** + * We haven't received an Channel ACK after a certain time: resend the CREATE. + * + * @param cls Closure (MeshChannelReliability of the channel to recreate) + * @param tc TaskContext. + */ +static void +channel_recreate (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct MeshChannelReliability *rel = cls; + + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RE-CREATE\n"); + GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO); + + if (rel == rel->ch->root_rel) + { + send_create (rel->ch); + } + else if (rel == rel->ch->dest_rel) + { + send_ack (rel->ch, GNUNET_YES); + } + else + { + GNUNET_break (0); + } + +} + + +/** + * Message has been sent: start retransmission timer. + * + * @param cls Closure (queue structure). + * @param t Tunnel. + * @param q Queue handler (no longer valid). + * @param type Type of message. + * @param size Size of the message. + */ +static void +ch_message_sent (void *cls, + struct MeshTunnel3 *t, + struct MeshTunnel3Queue *q, + uint16_t type, size_t size) +{ + struct MeshChannelQueue *chq = cls; + struct MeshReliableMessage *copy = chq->copy; + struct MeshChannelReliability *rel; + + switch (chq->type) + { + case GNUNET_MESSAGE_TYPE_MESH_DATA: + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT DATA MID %u\n", copy->mid); + GNUNET_assert (chq == copy->chq); + copy->timestamp = GNUNET_TIME_absolute_get (); + rel = copy->rel; + if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "!! scheduling retry in 4 * %s\n", + GNUNET_STRINGS_relative_time_to_string (rel->expected_delay, + GNUNET_YES)); + if (0 != rel->expected_delay.rel_value_us) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay != 0\n"); + rel->retry_timer = + GNUNET_TIME_relative_multiply (rel->expected_delay, + MESH_RETRANSMIT_MARGIN); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay reset\n"); + rel->retry_timer = MESH_RETRANSMIT_TIME; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "!! using delay %s\n", + GNUNET_STRINGS_relative_time_to_string (rel->retry_timer, + GNUNET_NO)); + rel->retry_task = + GNUNET_SCHEDULER_add_delayed (rel->retry_timer, + &channel_retransmit_message, rel); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "!! retry task %u\n", rel->retry_task); + } + copy->chq = NULL; + break; + + + case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK: + case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE: + case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_ACK: + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT %s\n", GM_m2s (chq->type)); + rel = chq->rel; + GNUNET_assert (rel->uniq == chq); + rel->uniq = NULL; + + if (MESH_CHANNEL_READY != rel->ch->state + && GNUNET_MESSAGE_TYPE_MESH_DATA_ACK != type + && GNUNET_NO == rel->ch->destroy) + { + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rel->retry_task); + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! STD BACKOFF %s\n", + GNUNET_STRINGS_relative_time_to_string (rel->retry_timer, + GNUNET_NO)); + rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer); + rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer, + &channel_recreate, rel); + } + break; + + + default: + GNUNET_break (0); + } + + GNUNET_free (chq); +} + + +/** + * send a channel create message. + * + * @param ch Channel for which to send. + */ +static void +send_create (struct MeshChannel *ch) +{ + struct GNUNET_MESH_ChannelCreate msgcc; + + msgcc.header.size = htons (sizeof (msgcc)); + msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE); + msgcc.chid = htonl (ch->gid); + msgcc.port = htonl (ch->port); + msgcc.opt = htonl (channel_get_options (ch)); + + GMCH_send_prebuilt_message (&msgcc.header, ch, GNUNET_YES, NULL); +} + + +/** + * Confirm we got a channel create or FWD ack. + * + * @param ch The channel to confirm. + * @param fwd Should we send a FWD ACK? (going dest->root) + */ +static void +send_ack (struct MeshChannel *ch, int fwd) +{ + struct GNUNET_MESH_ChannelManage msg; + + msg.header.size = htons (sizeof (msg)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_ACK); + LOG (GNUNET_ERROR_TYPE_DEBUG, + " sending channel %s ack for channel %s\n", + GM_f2s (fwd), GMCH_2s (ch)); + + msg.chid = htonl (ch->gid); + GMCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL); +} + + +/** + * Send a message and don't keep any info about it: we won't need to cancel it + * or resend it. + * + * @param msg Header of the message to fire away. + * @param ch Channel on which the message should go. + * @param force Is this a forced (undroppable) message? + */ +static void +fire_and_forget (const struct GNUNET_MessageHeader *msg, + struct MeshChannel *ch, + int force) +{ + GNUNET_break (NULL == GMT_send_prebuilt_message (msg, ch->t, force, + NULL, NULL)); +} + + +/** + * Notify that a channel create didn't succeed. + * + * @param ch The channel to reject. + */ +static void +send_nack (struct MeshChannel *ch) +{ + struct GNUNET_MESH_ChannelManage msg; + + msg.header.size = htons (sizeof (msg)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_NACK); + LOG (GNUNET_ERROR_TYPE_DEBUG, + " sending channel NACK for channel %s\n", + GMCH_2s (ch)); + + msg.chid = htonl (ch->gid); + GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL); +} + + +/** + * Notify a client that the channel is no longer valid. + * + * @param ch Channel that is destroyed. + * @param local_only Should we avoid sending it to other peers? + */ +static void +send_destroy (struct MeshChannel *ch, int local_only) +{ + struct GNUNET_MESH_ChannelManage msg; + + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY); + msg.header.size = htons (sizeof (msg)); + msg.chid = htonl (ch->gid); + + /* If root is not NULL, notify. + * If it's NULL, check lid_root. When a local destroy comes in, root + * is set to NULL but lid_root is left untouched. In this case, do nothing, + * the client is the one who reuqested the channel to be destroyed. + */ + if (NULL != ch->root) + GML_send_channel_destroy (ch->root, ch->lid_root); + else if (0 == ch->lid_root && GNUNET_NO == local_only) + GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL); + + if (NULL != ch->dest) + GML_send_channel_destroy (ch->dest, ch->lid_dest); + else if (0 == ch->lid_dest && GNUNET_NO == local_only) + GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_YES, NULL); } @@ -518,16 +935,40 @@ channel_rel_free_all (struct MeshChannelReliability *rel) { next = copy->next; GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy); + LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH RECV %p\n", copy); + GNUNET_break (NULL == copy->chq); GNUNET_free (copy); } for (copy = rel->head_sent; NULL != copy; copy = next) { next = copy->next; GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy); + LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH %p\n", copy); + if (NULL != copy->chq) + { + if (NULL != copy->chq->tq) + { + GMT_cancel (copy->chq->tq); + /* ch_message_sent will free copy->q */ + } + else + { + GNUNET_free (copy->chq); + GNUNET_break (0); + } + } GNUNET_free (copy); } + if (NULL != rel->uniq && NULL != rel->uniq->tq) + { + GMT_cancel (rel->uniq->tq); + /* ch_message_sent is called freeing uniq */ + } if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task) + { GNUNET_SCHEDULER_cancel (rel->retry_task); + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; + } GNUNET_free (rel); } @@ -553,7 +994,7 @@ channel_rel_free_sent (struct MeshChannelReliability *rel, bitfield = msg->futures; mid = ntohl (msg->mid); LOG (GNUNET_ERROR_TYPE_DEBUG, - "free_sent_reliable %u %llX\n", + "!!! free_sent_reliable %u %llX\n", mid, bitfield); LOG (GNUNET_ERROR_TYPE_DEBUG, " rel %p, head %p\n", @@ -577,8 +1018,8 @@ channel_rel_free_sent (struct MeshChannelReliability *rel, /* Skip copies with mid < target */ target = mid + i + 1; LOG (GNUNET_ERROR_TYPE_DEBUG, " target %u\n", target); - while (NULL != copy && GMC_is_pid_bigger (target, copy->mid)) - copy = copy->next; + while (NULL != copy && GM_is_pid_bigger (target, copy->mid)) + copy = copy->next; /* Did we run out of copies? (previously freed, it's ok) */ if (NULL == copy) @@ -588,7 +1029,7 @@ channel_rel_free_sent (struct MeshChannelReliability *rel, } /* Did we overshoot the target? (previously freed, it's ok) */ - if (GMC_is_pid_bigger (copy->mid, target)) + if (GM_is_pid_bigger (copy->mid, target)) { LOG (GNUNET_ERROR_TYPE_DEBUG, " next copy %u\n", copy->mid); continue; @@ -596,159 +1037,75 @@ channel_rel_free_sent (struct MeshChannelReliability *rel, /* Now copy->mid == target, free it */ next = copy->next; - rel_message_free (copy); + GNUNET_break (GNUNET_YES != rel_message_free (copy, GNUNET_YES)); copy = next; } LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable END\n"); } -/** - * We haven't received an ACK after a certain time: restransmit the message. - * - * @param cls Closure (MeshReliableMessage with the message to restransmit) - * @param tc TaskContext. - */ -static void -channel_retransmit_message (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct MeshChannelReliability *rel = cls; - struct MeshReliableMessage *copy; - struct MeshChannel *ch; - struct GNUNET_MESH_Data *payload; - int fwd; - - rel->retry_task = GNUNET_SCHEDULER_NO_TASK; - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - - ch = rel->ch; - copy = rel->head_sent; - if (NULL == copy) - { - GNUNET_break (0); - return; - } - - /* Search the message to be retransmitted in the outgoing queue. - * Check only the queue for the connection that is going to be used, - * if the message is stuck in some other connection's queue we shouldn't - * act upon it: - * - cancelling it and sending the new one doesn't guarantee it's delivery, - * the old connection could be temporary stalled or the queue happened to - * be long at time of insertion. - * - not sending the new one could cause terrible delays the old connection - * is stalled. - */ -// FIXME access to queue elements is limited - payload = (struct GNUNET_MESH_Data *) ©[1]; - fwd = (rel == ch->root_rel); -// c = GMT_get_connection (ch->t, fwd); -// hop = connection_get_hop (c, fwd); -// for (q = hop->queue_head; NULL != q; q = q->next) -// { -// if (ntohs (payload->header.type) == q->type && ch == q->ch) -// { -// struct GNUNET_MESH_Data *queued_data = q->cls; -// -// if (queued_data->mid == payload->mid) -// break; -// } -// } - - /* Message not found in the queue that we are going to use. */ -// if (NULL == q) -// { - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid); - - GMCH_send_prebuilt_message (&payload->header, ch, fwd); - GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO); -// } -// else -// { -// LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! ALREADY IN QUEUE %u\n", copy->mid); -// } - - rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer); - rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer, - &channel_retransmit_message, - cls); -} - - /** * Destroy a reliable message after it has been acknowledged, either by * direct mid ACK or bitfield. Updates the appropriate data structures and * timers and frees all memory. * * @param copy Message that is no longer needed: remote peer got it. + * @param update_time Is the timing information relevant? + * If this message is ACK in a batch the timing information + * is skewed by the retransmission, count only for the + * retransmitted message. + * + * @return #GNUNET_YES if channel was destroyed as a result of the call, + * #GNUNET_NO otherwise. */ -static void -rel_message_free (struct MeshReliableMessage *copy) +static int +rel_message_free (struct MeshReliableMessage *copy, int update_time) { struct MeshChannelReliability *rel; struct GNUNET_TIME_Relative time; rel = copy->rel; - time = GNUNET_TIME_absolute_get_duration (copy->timestamp); - rel->expected_delay.rel_value_us *= 7; - rel->expected_delay.rel_value_us += time.rel_value_us; - rel->expected_delay.rel_value_us /= 8; - rel->n_sent--; LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Freeing %u\n", copy->mid); - LOG (GNUNET_ERROR_TYPE_DEBUG, " n_sent %u\n", rel->n_sent); - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! took %s\n", - GNUNET_STRINGS_relative_time_to_string (time, GNUNET_NO)); - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! new expected delay %s\n", - GNUNET_STRINGS_relative_time_to_string (rel->expected_delay, - GNUNET_NO)); - rel->retry_timer = rel->expected_delay; + if (update_time) + { + time = GNUNET_TIME_absolute_get_duration (copy->timestamp); + if (0 == rel->expected_delay.rel_value_us) + rel->expected_delay = time; + else + { + rel->expected_delay.rel_value_us *= 7; + rel->expected_delay.rel_value_us += time.rel_value_us; + rel->expected_delay.rel_value_us /= 8; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! took %s\n", + GNUNET_STRINGS_relative_time_to_string (time, GNUNET_NO)); + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! new expected delay %s\n", + GNUNET_STRINGS_relative_time_to_string (rel->expected_delay, + GNUNET_NO)); + rel->retry_timer = rel->expected_delay; + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! batch free, ignoring timing\n"); + } + rel->ch->pending_messages--; + if (NULL != copy->chq) + { + GMT_cancel (copy->chq->tq); + /* copy->q is set to NULL by ch_message_sent */ + } GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy); + LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE %p\n", copy); GNUNET_free (copy); -} - - -/** - * Confirm we got a channel create. - * - * @param ch The channel to confirm. - * @param fwd Should we send a FWD ACK? (going dest->root) - */ -static void -channel_send_ack (struct MeshChannel *ch, int fwd) -{ - struct GNUNET_MESH_ChannelManage msg; - - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_ACK); - LOG (GNUNET_ERROR_TYPE_DEBUG, - " sending channel %s ack for channel %s\n", - fwd ? "FWD" : "BCK", GMCH_2s (ch)); - - msg.chid = htonl (ch->gid); - GMCH_send_prebuilt_message (&msg.header, ch, !fwd); -} - -/** - * Notify that a channel create didn't succeed. - * - * @param ch The channel to reject. - */ -static void -channel_send_nack (struct MeshChannel *ch) -{ - struct GNUNET_MESH_ChannelManage msg; - - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_NACK); - LOG (GNUNET_ERROR_TYPE_DEBUG, - " sending channel NACK for channel %s\n", - GMCH_2s (ch)); - - msg.chid = htonl (ch->gid); - GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO); + if (GNUNET_NO != rel->ch->destroy && 0 == rel->ch->pending_messages) + { + struct MeshTunnel3 *t = rel->ch->t; + GMCH_destroy (rel->ch); + GMT_destroy_if_empty (t); + return GNUNET_YES; + } + return GNUNET_NO; } @@ -762,33 +1119,53 @@ static void channel_confirm (struct MeshChannel *ch, int fwd) { struct MeshChannelReliability *rel; - struct MeshReliableMessage *copy; - struct MeshReliableMessage *next; + enum MeshChannelState oldstate; + rel = fwd ? ch->root_rel : ch->dest_rel; + if (NULL == rel) + { + GNUNET_break (GNUNET_NO != ch->destroy); + return; + } LOG (GNUNET_ERROR_TYPE_DEBUG, - " channel confirm %s %s:%X\n", - fwd ? "FWD" : "BCK", GMT_2s (ch->t), ch->gid); + " channel confirm %s %s\n", + GM_f2s (fwd), GMCH_2s (ch)); + oldstate = ch->state; ch->state = MESH_CHANNEL_READY; - rel = fwd ? ch->root_rel : ch->dest_rel; - rel->client_ready = GNUNET_YES; - for (copy = rel->head_sent; NULL != copy; copy = next) + if (MESH_CHANNEL_READY != oldstate || GNUNET_YES == is_loopback (ch)) { - struct GNUNET_MessageHeader *msg; + rel->client_ready = GNUNET_YES; + LOG (GNUNET_ERROR_TYPE_DEBUG, + " !! retry timer confirm %s\n", + GNUNET_STRINGS_relative_time_to_string (rel->retry_timer, GNUNET_NO)); + rel->expected_delay = rel->retry_timer; + if (GMT_get_connections_buffer (ch->t) > 0 || GMT_is_loopback (ch->t)) + send_client_ack (ch, fwd); - next = copy->next; - msg = (struct GNUNET_MessageHeader *) ©[1]; - if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE) + if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task) + { + GNUNET_SCHEDULER_cancel (rel->retry_task); + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; + } + else if (NULL != rel->uniq) + { + GMT_cancel (rel->uniq->tq); + /* ch_message_sent will free and NULL uniq */ + } + else { - rel_message_free (copy); - /* TODO return? */ + if (GNUNET_NO == is_loopback (ch)) + { + /* We SHOULD have been trying to retransmit this! */ + GNUNET_break (0); + } } } - send_client_ack (ch, fwd); /* In case of a FWD ACK (SYNACK) send a BCK ACK (ACK). */ - if (fwd) - channel_send_ack (ch, !fwd); + if (GNUNET_YES == fwd) + send_ack (ch, GNUNET_NO); } @@ -801,7 +1178,7 @@ channel_confirm (struct MeshChannel *ch, int fwd) * @param msg Message to copy. * @param fwd Is this fwd traffic? */ -static void +static struct MeshReliableMessage * channel_save_copy (struct MeshChannel *ch, const struct GNUNET_MessageHeader *msg, int fwd) @@ -813,69 +1190,21 @@ channel_save_copy (struct MeshChannel *ch, uint16_t size; rel = fwd ? ch->root_rel : ch->dest_rel; - mid = rel->mid_send; + mid = rel->mid_send - 1; type = ntohs (msg->type); size = ntohs (msg->size); - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SAVE %u\n", mid); + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SAVE %u %s\n", mid, GM_m2s (type)); copy = GNUNET_malloc (sizeof (struct MeshReliableMessage) + size); + LOG (GNUNET_ERROR_TYPE_DEBUG, " at %p\n", copy); copy->mid = mid; - copy->timestamp = GNUNET_TIME_absolute_get (); copy->rel = rel; copy->type = type; memcpy (©[1], msg, size); - rel->n_sent++; - LOG (GNUNET_ERROR_TYPE_DEBUG, " n_sent %u\n", rel->n_sent); - GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy); - if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task) - { - rel->retry_timer = - GNUNET_TIME_relative_multiply (rel->expected_delay, - MESH_RETRANSMIT_MARGIN); - rel->retry_task = - GNUNET_SCHEDULER_add_delayed (rel->retry_timer, - &channel_retransmit_message, - rel); - } -} - - -/** - * Destroy a channel and free all resources. - * - * @param ch Channel to destroy. - */ -static void -channel_destroy (struct MeshChannel *ch) -{ - struct MeshClient *c; - - if (NULL == ch) - return; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying channel %s:%u\n", - GMT_2s (ch->t), ch->gid); - GMCH_debug (ch); - - c = ch->root; - if (NULL != c) - { - GML_channel_remove (c, ch->lid_root, ch); - } - - c = ch->dest; - if (NULL != c) - { - GML_channel_remove (c, ch->lid_dest, ch); - } - - channel_rel_free_all (ch->root_rel); - channel_rel_free_all (ch->dest_rel); - - GMT_remove_channel (ch->t, ch); - GNUNET_STATISTICS_update (stats, "# channels", -1, GNUNET_NO); + GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy); + ch->pending_messages++; - GNUNET_free (ch); + return copy; } @@ -913,31 +1242,6 @@ channel_new (struct MeshTunnel3 *t, } -/** - * Set options in a channel, extracted from a bit flag field - * - * @param ch Channel to set options to. - * @param options Bit array in host byte order. - */ -static void -channel_set_options (struct MeshChannel *ch, uint32_t options) -{ - ch->nobuffer = (options & GNUNET_MESH_OPTION_NOBUFFER) != 0 ? - GNUNET_YES : GNUNET_NO; - ch->reliable = (options & GNUNET_MESH_OPTION_RELIABLE) != 0 ? - GNUNET_YES : GNUNET_NO; -} - -static int -is_loopback (const struct MeshChannel *ch) -{ - if (NULL != ch->t) - return GMT_is_loopback (ch->t); - - return (NULL != ch->root && NULL != ch->dest); -} - - /** * Handle a loopback message: call the appropriate handler for the message type. * @@ -955,12 +1259,14 @@ handle_loopback (struct MeshChannel *ch, type = ntohs (msgh->type); LOG (GNUNET_ERROR_TYPE_DEBUG, "Loopback %s %s message!\n", - fwd ? "FWD" : "BCK", GNUNET_MESH_DEBUG_M2S (type)); + GM_f2s (fwd), GM_m2s (type)); switch (type) { case GNUNET_MESSAGE_TYPE_MESH_DATA: /* Don't send hop ACK, wait for client to ACK */ + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SEND loopback %u (%u)\n", + ntohl (((struct GNUNET_MESH_Data *) msgh)->mid), ntohs (msgh->size)); GMCH_handle_data (ch, (struct GNUNET_MESH_Data *) msgh, fwd); break; @@ -1003,13 +1309,54 @@ handle_loopback (struct MeshChannel *ch, /******************************** API ***********************************/ /******************************************************************************/ +/** + * Destroy a channel and free all resources. + * + * @param ch Channel to destroy. + */ +void +GMCH_destroy (struct MeshChannel *ch) +{ + struct MeshClient *c; + + if (NULL == ch) + return; + if (2 == ch->destroy) + return; /* recursive call */ + ch->destroy = 2; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying channel %s:%u\n", + GMT_2s (ch->t), ch->gid); + GMCH_debug (ch); + + c = ch->root; + if (NULL != c) + { + GML_channel_remove (c, ch->lid_root, ch); + } + + c = ch->dest; + if (NULL != c) + { + GML_channel_remove (c, ch->lid_dest, ch); + } + + channel_rel_free_all (ch->root_rel); + channel_rel_free_all (ch->dest_rel); + + GMT_remove_channel (ch->t, ch); + GNUNET_STATISTICS_update (stats, "# channels", -1, GNUNET_NO); + + GNUNET_free (ch); +} + /** - * Get channel ID. + * Get the channel's public ID. * * @param ch Channel. * - * @return ID + * @return ID used to identify the channel with the remote peer. */ MESH_ChannelNumber GMCH_get_id (const struct MeshChannel *ch) @@ -1073,6 +1420,13 @@ GMCH_get_allowed (struct MeshChannel *ch, int fwd) rel = fwd ? ch->root_rel : ch->dest_rel; + if (NULL == rel) + { + /* Probably shutting down: root/dest NULL'ed to mark disconnection */ + GNUNET_break (GNUNET_NO != ch->destroy); + return 0; + } + return rel->client_allowed; } @@ -1113,86 +1467,6 @@ GMCH_is_terminal (struct MeshChannel *ch, int fwd) } -/** - * Notify the destination client that a new incoming channel was created. - * - * @param ch Channel that was created. - */ -void -GMCH_send_create (struct MeshChannel *ch) -{ - uint32_t opt; - - if (NULL == ch->dest) - return; - - opt = 0; - opt |= GNUNET_YES == ch->reliable ? GNUNET_MESH_OPTION_RELIABLE : 0; - opt |= GNUNET_YES == ch->nobuffer ? GNUNET_MESH_OPTION_NOBUFFER : 0; - GML_send_channel_create (ch->dest, ch->lid_dest, ch->port, opt, - GMT_get_destination (ch->t)); - -} - -/** - * Notify a client that the channel is no longer valid. - * - * @param ch Channel that is destroyed. - */ -void -GMCH_send_destroy (struct MeshChannel *ch) -{ - struct GNUNET_MESH_ChannelManage msg; - - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY); - msg.header.size = htons (sizeof (msg)); - msg.chid = htonl (ch->gid); - - /* If root is not NULL, notify. - * If it's NULL, check lid_root. When a local destroy comes in, root - * is set to NULL but lid_root is left untouched. In this case, do nothing, - * the client is the one who reuqested the channel to be destroyed. - */ - if (NULL != ch->root) - GML_send_channel_destroy (ch->root, ch->lid_root); - else if (0 == ch->lid_root) - GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO); - - if (NULL != ch->dest) - GML_send_channel_destroy (ch->dest, ch->lid_dest); - else if (0 == ch->lid_dest) - GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_YES); -} - - -/** - * Send data on a channel. - * - * If the destination is local, send it to client, otherwise encrypt and - * send to next hop. - * - * @param ch Channel - * @param msg Message. - * @param fwd Is this a fwd (root->dest) message? - */ -void -GMCH_send_data (struct MeshChannel *ch, - const struct GNUNET_MESH_Data *msg, - int fwd) -{ - if (GMCH_is_terminal (ch, fwd)) - { - GML_send_data (fwd ? ch->dest : ch->root, - msg, - fwd ? ch->lid_dest : ch->lid_root); - } - else - { - GMT_send_prebuilt_message (&msg->header, ch->t, ch, fwd); - } -} - - /** * Send an end-to-end ACK message for the most recent in-sequence payload. * @@ -1209,39 +1483,51 @@ GMCH_send_data_ack (struct MeshChannel *ch, int fwd) struct MeshReliableMessage *copy; unsigned int delta; uint64_t mask; - uint16_t type; + uint32_t ack; if (GNUNET_NO == ch->reliable) { return; } rel = fwd ? ch->dest_rel : ch->root_rel; + ack = rel->mid_recv - 1; LOG (GNUNET_ERROR_TYPE_DEBUG, - "send_data_ack for %u\n", - rel->mid_recv - 1); + " !! Send DATA_ACK for %u\n", + ack); - type = GNUNET_MESSAGE_TYPE_MESH_DATA_ACK; - msg.header.type = htons (type); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_DATA_ACK); msg.header.size = htons (sizeof (msg)); msg.chid = htonl (ch->gid); - msg.mid = htonl (rel->mid_recv - 1); msg.futures = 0; for (copy = rel->head_recv; NULL != copy; copy = copy->next) { - if (copy->type != type) + if (copy->type != GNUNET_MESSAGE_TYPE_MESH_DATA) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "!! Type %s, expected DATA\n", + GM_m2s (copy->type)); + continue; + } + if (copy->mid == ack + 1) + { + ack++; continue; - delta = copy->mid - rel->mid_recv; + } + delta = copy->mid - (ack + 1); if (63 < delta) break; mask = 0x1LL << delta; msg.futures |= mask; LOG (GNUNET_ERROR_TYPE_DEBUG, - " setting bit for %u (delta %u) (%llX) -> %llX\n", - copy->mid, delta, mask, msg.futures); + " !! setting bit for %u (delta %u) (%llX) -> %llX\n", + copy->mid, delta, mask, msg.futures); } - LOG (GNUNET_ERROR_TYPE_DEBUG, " final futures %llX\n", msg.futures); + msg.mid = htonl (ack); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "!!! ACK for %u, futures %llX\n", + ack, msg.futures); - GMCH_send_prebuilt_message (&msg.header, ch, fwd); + GMCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL); LOG (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack END\n"); } @@ -1255,9 +1541,54 @@ GMCH_send_data_ack (struct MeshChannel *ch, int fwd) void GMCH_allow_client (struct MeshChannel *ch, int fwd) { + struct MeshChannelReliability *rel; + unsigned int buffer; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "GMCH allow\n"); + if (MESH_CHANNEL_READY != ch->state) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " channel not ready yet!\n"); + return; + } + + if (GNUNET_YES == ch->reliable) + { + rel = fwd ? ch->root_rel : ch->dest_rel; + if (NULL == rel) + { + GNUNET_break (GNUNET_NO != ch->destroy); + return; + } + if (NULL != rel->head_sent) + { + if (64 <= rel->mid_send - rel->head_sent->mid) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " too big MID gap! Wait for ACK.\n"); + return; + } + else + LOG (GNUNET_ERROR_TYPE_DEBUG, " gap ok: %u - %u\n", + rel->head_sent->mid, rel->mid_send); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " head sent is NULL\n"); + } + } + + if (is_loopback (ch)) + buffer = GMCH_get_buffer (ch, fwd); + else + buffer = GMT_get_connections_buffer (ch->t); + + if (0 == buffer) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " no buffer space.\n"); return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer space %u, allowing\n", buffer); send_client_ack (ch, fwd); } @@ -1364,12 +1695,18 @@ GMCH_handle_local_data (struct MeshChannel *ch, (!fwd && ch->dest == c) ) ) { - GNUNET_break (0); + GNUNET_break_op (0); return GNUNET_SYSERR; } rel = fwd ? ch->root_rel : ch->dest_rel; + if (GNUNET_NO == rel->client_allowed) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + rel->client_allowed = GNUNET_NO; /* Ok, everything is correct, send the message. */ @@ -1381,21 +1718,19 @@ GMCH_handle_local_data (struct MeshChannel *ch, payload->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_DATA); payload->chid = htonl (ch->gid); LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channel...\n"); - if (GNUNET_YES == ch->reliable) - channel_save_copy (ch, &payload->header, fwd); - GMCH_send_prebuilt_message (&payload->header, ch, fwd); + GMCH_send_prebuilt_message (&payload->header, ch, fwd, NULL); if (is_loopback (ch)) { - if (GMCH_get_buffer (ch, fwd) > 0); - send_client_ack (ch, fwd); + if (GMCH_get_buffer (ch, fwd) > 0) + GMCH_allow_client (ch, fwd); return GNUNET_OK; } if (GMT_get_connections_buffer (ch->t) > 0) { - send_client_ack (ch, fwd); + GMCH_allow_client (ch, fwd); } return GNUNET_OK; @@ -1409,21 +1744,24 @@ GMCH_handle_local_data (struct MeshChannel *ch, * * @param ch Channel. * @param c Client that requested the destruction (to avoid notifying him). + * @param is_root Is the request coming from root? */ void GMCH_handle_local_destroy (struct MeshChannel *ch, - struct MeshClient *c) + struct MeshClient *c, + int is_root) { struct MeshTunnel3 *t; + ch->destroy = GNUNET_YES; /* Cleanup after the tunnel */ - if (c == ch->dest) + if (GNUNET_NO == is_root && c == ch->dest) { LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is destination.\n", GML_2s (c)); GML_client_delete_channel (c, ch, ch->lid_dest); ch->dest = NULL; } - if (c == ch->root) + if (GNUNET_YES == is_root && c == ch->root) { LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is owner.\n", GML_2s (c)); GML_client_delete_channel (c, ch, ch->lid_root); @@ -1431,9 +1769,12 @@ GMCH_handle_local_destroy (struct MeshChannel *ch, } t = ch->t; - GMCH_send_destroy (ch); - channel_destroy (ch); - GMT_destroy_if_empty (t); + send_destroy (ch, GNUNET_NO); + if (0 == ch->pending_messages) + { + GMCH_destroy (ch); + GMT_destroy_if_empty (t); + } } @@ -1473,10 +1814,11 @@ GMCH_handle_local_create (struct MeshClient *c, if (GMP_get_short_id (peer) == myid) { - GMT_change_state (t, MESH_TUNNEL3_READY); + GMT_change_cstate (t, MESH_TUNNEL3_READY); } else { + /* FIXME change to a tunnel API, eliminate ch <-> peer connection */ GMP_connect (peer); } @@ -1493,22 +1835,13 @@ GMCH_handle_local_create (struct MeshClient *c, /* In unreliable channels, we'll use the DLL to buffer BCK data */ ch->root_rel = GNUNET_new (struct MeshChannelReliability); ch->root_rel->ch = ch; - ch->root_rel->expected_delay = MESH_RETRANSMIT_TIME; + ch->root_rel->retry_timer = MESH_RETRANSMIT_TIME; + ch->root_rel->expected_delay.rel_value_us = 0; LOG (GNUNET_ERROR_TYPE_DEBUG, "CREATED CHANNEL %s\n", GMCH_2s (ch)); - /* Send create channel */ - { - struct GNUNET_MESH_ChannelCreate msgcc; - - msgcc.header.size = htons (sizeof (msgcc)); - msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE); - msgcc.chid = htonl (ch->gid); - msgcc.port = msg->port; - msgcc.opt = msg->opt; + send_create (ch); - GMT_send_prebuilt_message (&msgcc.header, t, ch, GNUNET_YES); - } return GNUNET_OK; } @@ -1550,20 +1883,38 @@ GMCH_handle_data (struct MeshChannel *ch, if (NULL == c) { - GNUNET_break (0); + GNUNET_break (GNUNET_NO != ch->destroy); return; } + if (MESH_CHANNEL_READY != ch->state) + { + if (GNUNET_NO == fwd) + { + /* If we are the root, this means the other peer has sent traffic before + * receiving our ACK. Even if the SYNACK goes missing, no traffic should + * be sent before the ACK. + */ + GNUNET_break_op (0); + return; + } + /* If we are the dest, this means that the SYNACK got to the root but + * the ACK went missing. Treat this as an ACK. + */ + channel_confirm (ch, GNUNET_NO); + } + GNUNET_STATISTICS_update (stats, "# data received", 1, GNUNET_NO); mid = ntohl (msg->mid); - LOG (GNUNET_ERROR_TYPE_DEBUG, " mid %u\n", mid); + LOG (GNUNET_ERROR_TYPE_DEBUG, "!! got mid %u\n", mid); if (GNUNET_NO == ch->reliable || - ( !GMC_is_pid_bigger (rel->mid_recv, mid) && - GMC_is_pid_bigger (rel->mid_recv + 64, mid) ) ) + ( !GM_is_pid_bigger (rel->mid_recv, mid) && + GM_is_pid_bigger (rel->mid_recv + 64, mid) ) ) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RECV %u\n", mid); + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RECV %u (%u)\n", + mid, ntohs (msg->header.size)); if (GNUNET_YES == ch->reliable) { /* Is this the exact next expected messasge? */ @@ -1589,10 +1940,10 @@ GMCH_handle_data (struct MeshChannel *ch, } else { - GNUNET_break_op (0); + GNUNET_break_op (GM_is_pid_bigger (rel->mid_recv, mid)); LOG (GNUNET_ERROR_TYPE_DEBUG, - " MID %u not expected (%u - %u), dropping!\n", - mid, rel->mid_recv, rel->mid_recv + 64); + " !!! MID %u not expected (%u - %u), dropping!\n", + mid, rel->mid_recv, rel->mid_recv + 63); } GMCH_send_data_ack (ch, fwd); @@ -1629,7 +1980,8 @@ GMCH_handle_data_ack (struct MeshChannel *ch, GNUNET_break (0); return; } - fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO; + /* Inverted: if message came 'FWD' is a 'BCK ACK'. */ + fwd = (NULL != ch->dest) ? GNUNET_NO : GNUNET_YES; } ack = ntohl (msg->mid); @@ -1646,37 +1998,37 @@ GMCH_handle_data_ack (struct MeshChannel *ch, } if (NULL == rel) { - GNUNET_break (0); + GNUNET_break_op (GNUNET_NO != ch->destroy); return; } + /* Free ACK'd copies: no need to retransmit those anymore FIXME refactor */ for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next) { - if (GMC_is_pid_bigger (copy->mid, ack)) + if (GM_is_pid_bigger (copy->mid, ack)) { LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! head %u, out!\n", copy->mid); channel_rel_free_sent (rel, msg); break; } work = GNUNET_YES; - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! id %u\n", copy->mid); + LOG (GNUNET_ERROR_TYPE_DEBUG, " !! id %u\n", copy->mid); next = copy->next; - rel_message_free (copy); + if (GNUNET_YES == rel_message_free (copy, GNUNET_YES)) + return; } - /* ACK client if needed */ -// channel_send_ack (t, type, GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK == type); - /* If some message was free'd, update the retransmission delay*/ + /* ACK client if needed and possible */ + GMCH_allow_client (ch, fwd); + + /* If some message was free'd, update the retransmission delay */ if (GNUNET_YES == work) { if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task) { GNUNET_SCHEDULER_cancel (rel->retry_task); - if (NULL == rel->head_sent) - { - rel->retry_task = GNUNET_SCHEDULER_NO_TASK; - } - else + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; + if (NULL != rel->head_sent && NULL == rel->head_sent->chq) { struct GNUNET_TIME_Absolute new_target; struct GNUNET_TIME_Relative delay; @@ -1693,7 +2045,10 @@ GMCH_handle_data_ack (struct MeshChannel *ch, } } else + { + /* Work was done but no task was pending? Shouldn't happen! */ GNUNET_break (0); + } } } @@ -1713,53 +2068,64 @@ GMCH_handle_create (struct MeshTunnel3 *t, MESH_ChannelNumber chid; struct MeshChannel *ch; struct MeshClient *c; - uint32_t port; + int new_channel; chid = ntohl (msg->chid); - ch = GMT_get_channel (t, chid); if (NULL == ch) { /* Create channel */ ch = channel_new (t, NULL, 0); ch->gid = chid; + channel_set_options (ch, ntohl (msg->opt)); + new_channel = GNUNET_YES; + } + else + { + new_channel = GNUNET_NO; } - channel_set_options (ch, ntohl (msg->opt)); - /* Find a destination client */ - port = ntohl (msg->port); - LOG (GNUNET_ERROR_TYPE_DEBUG, " port %u\n", port); - c = GML_client_get_by_port (port); - if (NULL == c) + if (GNUNET_YES == new_channel || GMT_is_loopback (t)) { - /* TODO send reject */ - LOG (GNUNET_ERROR_TYPE_DEBUG, " no client has port registered\n"); - if (is_loopback (ch)) + /* Find a destination client */ + ch->port = ntohl (msg->port); + LOG (GNUNET_ERROR_TYPE_DEBUG, " port %u\n", ch->port); + c = GML_client_get_by_port (ch->port); + if (NULL == c) { - LOG (GNUNET_ERROR_TYPE_DEBUG, " loopback: destroy on handler\n"); - channel_send_nack (ch); + LOG (GNUNET_ERROR_TYPE_DEBUG, " no client has port registered\n"); + if (is_loopback (ch)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " loopback: destroy on handler\n"); + send_nack (ch); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " not loopback: destroy now\n"); + send_nack (ch); + GMCH_destroy (ch); + } + return NULL; } else { - LOG (GNUNET_ERROR_TYPE_DEBUG, " not loopback: destroy now\n"); - channel_send_nack (ch); - channel_destroy (ch); + LOG (GNUNET_ERROR_TYPE_DEBUG, " client %p has port registered\n", c); } - return NULL; + + add_destination (ch, c); + if (GNUNET_YES == ch->reliable) + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Reliable\n"); + else + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Not Reliable\n"); + + send_client_create (ch); + ch->state = MESH_CHANNEL_SENT; } else { - LOG (GNUNET_ERROR_TYPE_DEBUG, " client %p has port registered\n", c); + LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate create channel\n"); } - - add_destination (ch, c); - if (GNUNET_YES == ch->reliable) - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Reliable\n"); - else - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Not Reliable\n"); - - GMCH_send_create (ch); - channel_send_ack (ch, GNUNET_YES); + send_ack (ch, GNUNET_YES); return ch; } @@ -1776,7 +2142,7 @@ void GMCH_handle_nack (struct MeshChannel *ch) { send_client_nack (ch); - channel_destroy (ch); + GMCH_destroy (ch); } @@ -1848,8 +2214,8 @@ GMCH_handle_destroy (struct MeshChannel *ch, } t = ch->t; - GMCH_send_destroy (ch); - channel_destroy (ch); + send_destroy (ch, GNUNET_YES); + GMCH_destroy (ch); GMT_destroy_if_empty (t); } @@ -1868,13 +2234,19 @@ GMCH_handle_destroy (struct MeshChannel *ch, * @param message Message to send. Function makes a copy of it. * @param ch Channel on which this message is transmitted. * @param fwd Is this a fwd message? + * @param existing_copy This is a retransmission, don't save a copy. */ void GMCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message, - struct MeshChannel *ch, int fwd) + struct MeshChannel *ch, int fwd, + void *existing_copy) { + struct MeshChannelQueue *chq; + uint16_t type; + + type = ntohs (message->type); LOG (GNUNET_ERROR_TYPE_DEBUG, "GMCH Send %s %s on channel %s\n", - fwd ? "FWD" : "BCK", GNUNET_MESH_DEBUG_M2S (ntohs (message->type)), + GM_f2s (fwd), GM_m2s (type), GMCH_2s (ch)); if (GMT_is_loopback (ch->t)) @@ -1883,7 +2255,105 @@ GMCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message, return; } - GMT_send_prebuilt_message (message, ch->t, ch, fwd); + switch (type) + { + case GNUNET_MESSAGE_TYPE_MESH_DATA: + + if (GNUNET_YES == ch->reliable) + { + chq = GNUNET_new (struct MeshChannelQueue); + chq->type = type; + if (NULL == existing_copy) + chq->copy = channel_save_copy (ch, message, fwd); + else + { + chq->copy = (struct MeshReliableMessage *) existing_copy; + if (NULL != chq->copy->chq) + { + /* Last retransmission was queued but not yet sent! + * This retransmission was scheduled by a ch_message_sent which + * followed a very fast RTT, so the tiny delay made the + * retransmission function to execute before the previous + * retransmitted message even had a chance to leave the peer. + * Cancel this message and wait until the pending + * retransmission leaves the peer and ch_message_sent starts + * the timer for the next one. + */ + GNUNET_free (chq); + LOG (GNUNET_ERROR_TYPE_DEBUG, + " exisitng copy not yet transmitted!\n"); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + " using existing copy: %p {r:%p q:%p t:%u}\n", + existing_copy, + chq->copy->rel, chq->copy->chq, chq->copy->type); + } + LOG (GNUNET_ERROR_TYPE_DEBUG, " new chq: %p\n", chq); + chq->copy->chq = chq; + chq->tq = GMT_send_prebuilt_message (message, ch->t, + NULL != existing_copy, + &ch_message_sent, chq); + /* q itself is stored in copy */ + GNUNET_assert (NULL != chq->tq || GNUNET_NO != ch->destroy); + } + else + { + fire_and_forget (message, ch, GNUNET_NO); + } + break; + + + case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_ACK: + if (GNUNET_YES == fwd) + { + /* BCK ACK (going FWD) is just a response for a SYNACK, don't keep*/ + fire_and_forget (message, ch, GNUNET_YES); + return; + } + /* fall-trough */ + case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK: + case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE: + chq = GNUNET_new (struct MeshChannelQueue); + chq->type = type; + chq->rel = fwd ? ch->root_rel : ch->dest_rel; + if (NULL != chq->rel->uniq) + { + if (NULL != chq->rel->uniq->tq) + { + GMT_cancel (chq->rel->uniq->tq); + /* ch_message_sent is called, freeing and NULLing uniq */ + } + else + { + GNUNET_break (0); + GNUNET_free (chq->rel->uniq); + } + } + chq->tq = GMT_send_prebuilt_message (message, ch->t, GNUNET_YES, + &ch_message_sent, chq); + if (NULL == chq->tq) + { + GNUNET_break (0); + GNUNET_free (chq); + chq = NULL; + return; + } + chq->rel->uniq = chq; + break; + + + case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY: + case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_NACK: + fire_and_forget (message, ch, GNUNET_YES); + break; + + + default: + GNUNET_break (0); + LOG (GNUNET_ERROR_TYPE_DEBUG, "type %s unknown!\n", GM_m2s (type)); + fire_and_forget (message, ch, GNUNET_YES); + } } @@ -1902,8 +2372,8 @@ GMCH_2s (const struct MeshChannel *ch) if (NULL == ch) return "(NULL Channel)"; - sprintf (buf, "%s:%X (%X / %X)", - GMT_2s (ch->t), ch->gid, ch->lid_root, ch->lid_dest); + sprintf (buf, "%s:%u gid:%X (%X / %X)", + GMT_2s (ch->t), ch->port, ch->gid, ch->lid_root, ch->lid_dest); return buf; }