X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmesh%2Fgnunet-service-mesh_channel.c;h=06cf599d4ca6c89e5a9933eee2f47cb9a630a886;hb=e43078b68951ad8a3daa3a193473e9c321549e1d;hp=b6305aabefbc97444b2258189ee42c70e2bea1ab;hpb=e81a1e569010d1a3161ef91a914c780e71230537;p=oweals%2Fgnunet.git diff --git a/src/mesh/gnunet-service-mesh_channel.c b/src/mesh/gnunet-service-mesh_channel.c index b6305aabe..06cf599d4 100644 --- a/src/mesh/gnunet-service-mesh_channel.c +++ b/src/mesh/gnunet-service-mesh_channel.c @@ -69,7 +69,7 @@ struct MeshChannelQueue /** * Tunnel Queue. */ - struct MeshTunnel3Queue *q; + struct MeshTunnel3Queue *tq; /** * Message type (DATA/DATA_ACK) @@ -117,7 +117,7 @@ struct MeshReliableMessage /** * Tunnel Queue. */ - struct MeshChannelQueue *q; + struct MeshChannelQueue *chq; /** * When was this message issued (to calculate ACK delay) @@ -316,7 +316,7 @@ extern GNUNET_PEER_Id myid; * is skewed by the retransmission, count only for the * retransmitted message. */ -static void +static int rel_message_free (struct MeshReliableMessage *copy, int update_time); /** @@ -338,6 +338,23 @@ send_ack (struct MeshChannel *ch, int fwd); +/** + * Test if the channel is loopback: both root and dest are on the local peer. + * + * @param ch Channel to test. + * + * @return #GNUNET_YES if channel is loopback, #GNUNET_NO otherwise. + */ +static int +is_loopback (const struct MeshChannel *ch) +{ + if (NULL != ch->t) + return GMT_is_loopback (ch->t); + + return (NULL != ch->root && NULL != ch->dest); +} + + /** * We have received a message out of order, or the client is not ready. * Buffer it until we receive an ACK from the client or the missing @@ -386,6 +403,7 @@ add_buffered_data (const struct GNUNET_MESH_Data *msg, LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data END\n"); } + /** * Add a destination client to a channel, initializing all data structures * in the channel and the client. @@ -577,7 +595,13 @@ static void send_client_ack (struct MeshChannel *ch, int fwd) { struct MeshChannelReliability *rel = fwd ? ch->root_rel : ch->dest_rel; + struct MeshClient *c = fwd ? ch->root : ch->dest; + if (NULL == c) + { + GNUNET_break (GNUNET_NO != ch->destroy); + return; + } LOG (GNUNET_ERROR_TYPE_DEBUG, " sending %s ack to client on channel %s\n", GM_f2s (fwd), GMCH_2s (ch)); @@ -595,7 +619,7 @@ send_client_ack (struct MeshChannel *ch, int fwd) } rel->client_allowed = GNUNET_YES; - GML_send_ack (fwd ? ch->root : ch->dest, fwd ? ch->lid_root : ch->lid_dest); + GML_send_ack (c, fwd ? ch->lid_root : ch->lid_dest); } @@ -612,7 +636,7 @@ send_client_nack (struct MeshChannel *ch) GNUNET_break (0); return; } - GML_send_nack (ch->root, ch->lid_root); + GML_send_channel_nack (ch->root, ch->lid_root); } @@ -704,20 +728,22 @@ ch_message_sent (void *cls, struct MeshTunnel3Queue *q, uint16_t type, size_t size) { - struct MeshChannelQueue *ch_q = cls; - struct MeshReliableMessage *copy = ch_q->copy; + struct MeshChannelQueue *chq = cls; + struct MeshReliableMessage *copy = chq->copy; struct MeshChannelReliability *rel; - switch (ch_q->type) + switch (chq->type) { case GNUNET_MESSAGE_TYPE_MESH_DATA: LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT DATA MID %u\n", copy->mid); - GNUNET_assert (ch_q == copy->q); + GNUNET_assert (chq == copy->chq); copy->timestamp = GNUNET_TIME_absolute_get (); rel = copy->rel; if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "!! scheduling retry %u\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "!! scheduling retry in 4 * %s\n", + GNUNET_STRINGS_relative_time_to_string (rel->expected_delay, + GNUNET_YES)); if (0 != rel->expected_delay.rel_value_us) { LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay != 0\n"); @@ -727,7 +753,7 @@ ch_message_sent (void *cls, } else { - LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay 0\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay reset\n"); rel->retry_timer = MESH_RETRANSMIT_TIME; } LOG (GNUNET_ERROR_TYPE_DEBUG, "!! using delay %s\n", @@ -741,20 +767,21 @@ ch_message_sent (void *cls, { LOG (GNUNET_ERROR_TYPE_DEBUG, "!! retry task %u\n", rel->retry_task); } - copy->q = NULL; + copy->chq = NULL; break; case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK: case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE: case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_ACK: - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT %s\n", GM_m2s (ch_q->type)); - rel = ch_q->rel; - GNUNET_assert (rel->uniq == ch_q); + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT %s\n", GM_m2s (chq->type)); + rel = chq->rel; + GNUNET_assert (rel->uniq == chq); rel->uniq = NULL; if (MESH_CHANNEL_READY != rel->ch->state - && GNUNET_MESSAGE_TYPE_MESH_DATA_ACK != type) + && GNUNET_MESSAGE_TYPE_MESH_DATA_ACK != type + && GNUNET_NO == rel->ch->destroy) { GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rel->retry_task); LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! STD BACKOFF %s\n", @@ -771,7 +798,7 @@ ch_message_sent (void *cls, GNUNET_break (0); } - GNUNET_free (ch_q); + GNUNET_free (chq); } @@ -830,7 +857,7 @@ fire_and_forget (const struct GNUNET_MessageHeader *msg, struct MeshChannel *ch, int force) { - GNUNET_break (NULL == GMT_send_prebuilt_message (msg, ch->t, ch, force, + GNUNET_break (NULL == GMT_send_prebuilt_message (msg, ch->t, force, NULL, NULL)); } @@ -909,6 +936,7 @@ channel_rel_free_all (struct MeshChannelReliability *rel) next = copy->next; GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy); LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH RECV %p\n", copy); + GNUNET_break (NULL == copy->chq); GNUNET_free (copy); } for (copy = rel->head_sent; NULL != copy; copy = next) @@ -916,14 +944,31 @@ channel_rel_free_all (struct MeshChannelReliability *rel) next = copy->next; GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy); LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH %p\n", copy); + if (NULL != copy->chq) + { + if (NULL != copy->chq->tq) + { + GMT_cancel (copy->chq->tq); + /* ch_message_sent will free copy->q */ + } + else + { + GNUNET_free (copy->chq); + GNUNET_break (0); + } + } GNUNET_free (copy); } + if (NULL != rel->uniq && NULL != rel->uniq->tq) + { + GMT_cancel (rel->uniq->tq); + /* ch_message_sent is called freeing uniq */ + } if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task) { GNUNET_SCHEDULER_cancel (rel->retry_task); + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; } - if (NULL != rel->uniq && NULL != rel->uniq->q) - GMT_cancel (rel->uniq->q); GNUNET_free (rel); } @@ -974,7 +1019,7 @@ channel_rel_free_sent (struct MeshChannelReliability *rel, target = mid + i + 1; LOG (GNUNET_ERROR_TYPE_DEBUG, " target %u\n", target); while (NULL != copy && GM_is_pid_bigger (target, copy->mid)) - copy = copy->next; + copy = copy->next; /* Did we run out of copies? (previously freed, it's ok) */ if (NULL == copy) @@ -992,7 +1037,7 @@ channel_rel_free_sent (struct MeshChannelReliability *rel, /* Now copy->mid == target, free it */ next = copy->next; - rel_message_free (copy, GNUNET_YES); + GNUNET_break (GNUNET_YES != rel_message_free (copy, GNUNET_YES)); copy = next; } LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable END\n"); @@ -1009,8 +1054,11 @@ channel_rel_free_sent (struct MeshChannelReliability *rel, * If this message is ACK in a batch the timing information * is skewed by the retransmission, count only for the * retransmitted message. + * + * @return #GNUNET_YES if channel was destroyed as a result of the call, + * #GNUNET_NO otherwise. */ -static void +static int rel_message_free (struct MeshReliableMessage *copy, int update_time) { struct MeshChannelReliability *rel; @@ -1041,9 +1089,9 @@ rel_message_free (struct MeshReliableMessage *copy, int update_time) LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! batch free, ignoring timing\n"); } rel->ch->pending_messages--; - if (NULL != copy->q) + if (NULL != copy->chq) { - GMT_cancel (copy->q->q); + GMT_cancel (copy->chq->tq); /* copy->q is set to NULL by ch_message_sent */ } GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy); @@ -1055,7 +1103,9 @@ rel_message_free (struct MeshReliableMessage *copy, int update_time) struct MeshTunnel3 *t = rel->ch->t; GMCH_destroy (rel->ch); GMT_destroy_if_empty (t); + return GNUNET_YES; } + return GNUNET_NO; } @@ -1071,35 +1121,46 @@ channel_confirm (struct MeshChannel *ch, int fwd) struct MeshChannelReliability *rel; enum MeshChannelState oldstate; + rel = fwd ? ch->root_rel : ch->dest_rel; + if (NULL == rel) + { + GNUNET_break (GNUNET_NO != ch->destroy); + return; + } LOG (GNUNET_ERROR_TYPE_DEBUG, - " channel confirm %s %s:%X\n", - GM_f2s (fwd), GMT_2s (ch->t), ch->gid); + " channel confirm %s %s\n", + GM_f2s (fwd), GMCH_2s (ch)); oldstate = ch->state; ch->state = MESH_CHANNEL_READY; - rel = fwd ? ch->root_rel : ch->dest_rel; - rel->client_ready = GNUNET_YES; - LOG (GNUNET_ERROR_TYPE_DEBUG, - " !! retry timer confirm %s\n", - GNUNET_STRINGS_relative_time_to_string (rel->retry_timer, GNUNET_NO)); - rel->expected_delay = rel->retry_timer; - if (GMT_get_connections_buffer (ch->t) > 0 || GMT_is_loopback (ch->t)) - send_client_ack (ch, fwd); - - if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task) - { - GNUNET_SCHEDULER_cancel (rel->retry_task); - rel->retry_task = GNUNET_SCHEDULER_NO_TASK; - } - else if (NULL != rel->uniq) + if (MESH_CHANNEL_READY != oldstate || GNUNET_YES == is_loopback (ch)) { - GMT_cancel (rel->uniq->q); - /* ch_sent_message will free and NULL uniq */ - } - else - { - /* We SHOULD have been trying to retransmit this! */ - GNUNET_break (oldstate == MESH_CHANNEL_READY); + rel->client_ready = GNUNET_YES; + LOG (GNUNET_ERROR_TYPE_DEBUG, + " !! retry timer confirm %s\n", + GNUNET_STRINGS_relative_time_to_string (rel->retry_timer, GNUNET_NO)); + rel->expected_delay = rel->retry_timer; + if (GMT_get_connections_buffer (ch->t) > 0 || GMT_is_loopback (ch->t)) + send_client_ack (ch, fwd); + + if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task) + { + GNUNET_SCHEDULER_cancel (rel->retry_task); + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; + } + else if (NULL != rel->uniq) + { + GMT_cancel (rel->uniq->tq); + /* ch_message_sent will free and NULL uniq */ + } + else + { + if (GNUNET_NO == is_loopback (ch)) + { + /* We SHOULD have been trying to retransmit this! */ + GNUNET_break (0); + } + } } /* In case of a FWD ACK (SYNACK) send a BCK ACK (ACK). */ @@ -1181,23 +1242,6 @@ channel_new (struct MeshTunnel3 *t, } -/** - * Test if the channel is loopback: both root and dest are on the local peer. - * - * @param ch Channel to test. - * - * @return #GNUNET_YES if channel is loopback, #NGUNET_NO otherwise. - */ -static int -is_loopback (const struct MeshChannel *ch) -{ - if (NULL != ch->t) - return GMT_is_loopback (ch->t); - - return (NULL != ch->root && NULL != ch->dest); -} - - /** * Handle a loopback message: call the appropriate handler for the message type. * @@ -1221,6 +1265,8 @@ handle_loopback (struct MeshChannel *ch, { case GNUNET_MESSAGE_TYPE_MESH_DATA: /* Don't send hop ACK, wait for client to ACK */ + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SEND loopback %u (%u)\n", + ntohl (((struct GNUNET_MESH_Data *) msgh)->mid), ntohs (msgh->size)); GMCH_handle_data (ch, (struct GNUNET_MESH_Data *) msgh, fwd); break; @@ -1275,6 +1321,9 @@ GMCH_destroy (struct MeshChannel *ch) if (NULL == ch) return; + if (2 == ch->destroy) + return; /* recursive call */ + ch->destroy = 2; LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying channel %s:%u\n", GMT_2s (ch->t), ch->gid); @@ -1303,11 +1352,11 @@ GMCH_destroy (struct MeshChannel *ch) /** - * Get channel ID. + * Get the channel's public ID. * * @param ch Channel. * - * @return ID + * @return ID used to identify the channel with the remote peer. */ MESH_ChannelNumber GMCH_get_id (const struct MeshChannel *ch) @@ -1511,10 +1560,20 @@ GMCH_allow_client (struct MeshChannel *ch, int fwd) GNUNET_break (GNUNET_NO != ch->destroy); return; } - if (NULL != rel->head_sent && 64 <= rel->mid_send - rel->head_sent->mid) + if (NULL != rel->head_sent) { - LOG (GNUNET_ERROR_TYPE_DEBUG, " too big MID gap! Wait for ACK.\n"); - return; + if (64 <= rel->mid_send - rel->head_sent->mid) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " too big MID gap! Wait for ACK.\n"); + return; + } + else + LOG (GNUNET_ERROR_TYPE_DEBUG, " gap ok: %u - %u\n", + rel->head_sent->mid, rel->mid_send); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " head sent is NULL\n"); } } @@ -1664,14 +1723,14 @@ GMCH_handle_local_data (struct MeshChannel *ch, if (is_loopback (ch)) { if (GMCH_get_buffer (ch, fwd) > 0) - send_client_ack (ch, fwd); + GMCH_allow_client (ch, fwd); return GNUNET_OK; } if (GMT_get_connections_buffer (ch->t) > 0) { - send_client_ack (ch, fwd); + GMCH_allow_client (ch, fwd); } return GNUNET_OK; @@ -1759,6 +1818,7 @@ GMCH_handle_local_create (struct MeshClient *c, } else { + /* FIXME change to a tunnel API, eliminate ch <-> peer connection */ GMP_connect (peer); } @@ -1823,7 +1883,7 @@ GMCH_handle_data (struct MeshChannel *ch, if (NULL == c) { - GNUNET_break (0); + GNUNET_break (GNUNET_NO != ch->destroy); return; } @@ -1853,7 +1913,8 @@ GMCH_handle_data (struct MeshChannel *ch, ( !GM_is_pid_bigger (rel->mid_recv, mid) && GM_is_pid_bigger (rel->mid_recv + 64, mid) ) ) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RECV %u\n", mid); + LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RECV %u (%u)\n", + mid, ntohs (msg->header.size)); if (GNUNET_YES == ch->reliable) { /* Is this the exact next expected messasge? */ @@ -1937,11 +1998,11 @@ GMCH_handle_data_ack (struct MeshChannel *ch, } if (NULL == rel) { - GNUNET_break_op (0); + GNUNET_break_op (GNUNET_NO != ch->destroy); return; } - /* Free ACK'd copies: no need to retransmit those anymore */ + /* Free ACK'd copies: no need to retransmit those anymore FIXME refactor */ for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next) { if (GM_is_pid_bigger (copy->mid, ack)) @@ -1953,10 +2014,11 @@ GMCH_handle_data_ack (struct MeshChannel *ch, work = GNUNET_YES; LOG (GNUNET_ERROR_TYPE_DEBUG, " !! id %u\n", copy->mid); next = copy->next; - rel_message_free (copy, GNUNET_YES); + if (GNUNET_YES == rel_message_free (copy, GNUNET_YES)) + return; } - /* ACK client if needed */ + /* ACK client if needed and possible */ GMCH_allow_client (ch, fwd); /* If some message was free'd, update the retransmission delay */ @@ -1965,7 +2027,8 @@ GMCH_handle_data_ack (struct MeshChannel *ch, if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task) { GNUNET_SCHEDULER_cancel (rel->retry_task); - if (NULL != rel->head_sent && NULL == rel->head_sent->q) + rel->retry_task = GNUNET_SCHEDULER_NO_TASK; + if (NULL != rel->head_sent && NULL == rel->head_sent->chq) { struct GNUNET_TIME_Absolute new_target; struct GNUNET_TIME_Relative delay; @@ -1980,13 +2043,10 @@ GMCH_handle_data_ack (struct MeshChannel *ch, &channel_retransmit_message, rel); } - else /* either no more traffic to ack or traffic has just been queued */ - { - rel->retry_task = GNUNET_SCHEDULER_NO_TASK; - } } - else /* work was done but no task was pending? shouldn't happen! */ + else { + /* Work was done but no task was pending? Shouldn't happen! */ GNUNET_break (0); } } @@ -2181,7 +2241,7 @@ GMCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message, struct MeshChannel *ch, int fwd, void *existing_copy) { - struct MeshChannelQueue *q; + struct MeshChannelQueue *chq; uint16_t type; type = ntohs (message->type); @@ -2201,14 +2261,14 @@ GMCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message, if (GNUNET_YES == ch->reliable) { - q = GNUNET_new (struct MeshChannelQueue); - q->type = type; + chq = GNUNET_new (struct MeshChannelQueue); + chq->type = type; if (NULL == existing_copy) - q->copy = channel_save_copy (ch, message, fwd); + chq->copy = channel_save_copy (ch, message, fwd); else { - q->copy = (struct MeshReliableMessage *) existing_copy; - if (NULL != q->copy->q) + chq->copy = (struct MeshReliableMessage *) existing_copy; + if (NULL != chq->copy->chq) { /* Last retransmission was queued but not yet sent! * This retransmission was scheduled by a ch_message_sent which @@ -2219,7 +2279,7 @@ GMCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message, * retransmission leaves the peer and ch_message_sent starts * the timer for the next one. */ - GNUNET_free (q); + GNUNET_free (chq); LOG (GNUNET_ERROR_TYPE_DEBUG, " exisitng copy not yet transmitted!\n"); return; @@ -2227,14 +2287,15 @@ GMCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message, LOG (GNUNET_ERROR_TYPE_DEBUG, " using existing copy: %p {r:%p q:%p t:%u}\n", existing_copy, - q->copy->rel, q->copy->q, q->copy->type); + chq->copy->rel, chq->copy->chq, chq->copy->type); } - LOG (GNUNET_ERROR_TYPE_DEBUG, " new q: %p\n", q); - q->copy->q = q; - q->q = GMT_send_prebuilt_message (message, ch->t, ch, + LOG (GNUNET_ERROR_TYPE_DEBUG, " new chq: %p\n", chq); + chq->copy->chq = chq; + chq->tq = GMT_send_prebuilt_message (message, ch->t, NULL != existing_copy, - &ch_message_sent, q); + &ch_message_sent, chq); /* q itself is stored in copy */ + GNUNET_assert (NULL != chq->tq || GNUNET_NO != ch->destroy); } else { @@ -2248,29 +2309,37 @@ GMCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message, { /* BCK ACK (going FWD) is just a response for a SYNACK, don't keep*/ fire_and_forget (message, ch, GNUNET_YES); - break; + return; } /* fall-trough */ case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK: case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE: - q = GNUNET_new (struct MeshChannelQueue); - q->type = type; - q->rel = fwd ? ch->root_rel : ch->dest_rel; - if (NULL != q->rel->uniq) + chq = GNUNET_new (struct MeshChannelQueue); + chq->type = type; + chq->rel = fwd ? ch->root_rel : ch->dest_rel; + if (NULL != chq->rel->uniq) { - if (NULL != q->rel->uniq->q) + if (NULL != chq->rel->uniq->tq) { - GMT_cancel (q->rel->uniq->q); + GMT_cancel (chq->rel->uniq->tq); /* ch_message_sent is called, freeing and NULLing uniq */ } else { - GNUNET_free (q->rel->uniq); + GNUNET_break (0); + GNUNET_free (chq->rel->uniq); } } - q->q = GMT_send_prebuilt_message (message, ch->t, ch, GNUNET_YES, - &ch_message_sent, q); - q->rel->uniq = q; + chq->tq = GMT_send_prebuilt_message (message, ch->t, GNUNET_YES, + &ch_message_sent, chq); + if (NULL == chq->tq) + { + GNUNET_break (0); + GNUNET_free (chq); + chq = NULL; + return; + } + chq->rel->uniq = chq; break;