+ LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n");
+ rel = fwd ? ch->dest_rel : ch->root_rel;
+ if (GNUNET_NO == rel->client_ready)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "client not ready\n");
+ return;
+ }
+
+ copy = rel->head_recv;
+ /* We never buffer channel management messages */
+ if (NULL != copy)
+ {
+ if (copy->mid == rel->mid_recv || GNUNET_NO == ch->reliable)
+ {
+ struct GNUNET_MESH_Data *msg = (struct GNUNET_MESH_Data *) ©[1];
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " have %u! now expecting %u\n",
+ copy->mid, rel->mid_recv + 1);
+ send_client_data (ch, msg, fwd);
+ rel->n_recv--;
+ rel->mid_recv++;
+ GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE RECV %p\n", copy);
+ GNUNET_free (copy);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " reliable && don't have %u, next is %u\n",
+ rel->mid_recv,
+ copy->mid);
+ return;
+ }
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data END\n");
+}
+
+
+/**
+ * Allow a client to send more data.
+ *
+ * In case the client was already allowed to send data, do nothing.
+ *
+ * @param ch Channel.
+ * @param fwd Is this a FWD ACK? (FWD ACKs are sent to root)
+ */
+static void
+send_client_ack (struct MeshChannel *ch, int fwd)
+{
+ struct MeshChannelReliability *rel = fwd ? ch->root_rel : ch->dest_rel;
+ struct MeshClient *c = fwd ? ch->root : ch->dest;
+
+ if (NULL == c)
+ {
+ GNUNET_break (GNUNET_NO != ch->destroy);
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " sending %s ack to client on channel %s\n",
+ GM_f2s (fwd), GMCH_2s (ch));
+
+ if (NULL == rel)
+ {
+ GNUNET_break (0);
+ return;
+ }
+
+ if (GNUNET_YES == rel->client_allowed)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " already allowed\n");
+ return;
+ }
+ rel->client_allowed = GNUNET_YES;
+
+ GML_send_ack (c, fwd ? ch->lid_root : ch->lid_dest);
+}
+
+
+/**
+ * Notify the root that the destination rejected the channel.
+ *
+ * @param ch Rejected channel.
+ */
+static void
+send_client_nack (struct MeshChannel *ch)
+{
+ if (NULL == ch->root)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ GML_send_channel_nack (ch->root, ch->lid_root);
+}
+
+
+/**
+ * We haven't received an ACK after a certain time: restransmit the message.
+ *
+ * @param cls Closure (MeshChannelReliability with the message to restransmit)
+ * @param tc TaskContext.
+ */
+static void
+channel_retransmit_message (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct MeshChannelReliability *rel = cls;
+ struct MeshReliableMessage *copy;
+ struct MeshChannel *ch;
+ struct GNUNET_MESH_Data *payload;
+ int fwd;
+
+ rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+
+ ch = rel->ch;
+ copy = rel->head_sent;
+ if (NULL == copy)
+ {
+ GNUNET_break (0);
+ return;
+ }
+
+ payload = (struct GNUNET_MESH_Data *) ©[1];
+ fwd = (rel == ch->root_rel);
+
+ /* Message not found in the queue that we are going to use. */
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid);
+
+ GMCH_send_prebuilt_message (&payload->header, ch, fwd, copy);
+ GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
+}
+
+
+/**
+ * We haven't received an Channel ACK after a certain time: resend the CREATE.
+ *
+ * @param cls Closure (MeshChannelReliability of the channel to recreate)
+ * @param tc TaskContext.
+ */
+static void
+channel_recreate (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct MeshChannelReliability *rel = cls;
+
+ rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RE-CREATE\n");
+ GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
+
+ if (rel == rel->ch->root_rel)
+ {
+ send_create (rel->ch);
+ }
+ else if (rel == rel->ch->dest_rel)
+ {
+ send_ack (rel->ch, GNUNET_YES);
+ }
+ else
+ {
+ GNUNET_break (0);
+ }
+
+}
+
+
+/**
+ * Message has been sent: start retransmission timer.
+ *
+ * @param cls Closure (queue structure).
+ * @param t Tunnel.
+ * @param q Queue handler (no longer valid).
+ * @param type Type of message.
+ * @param size Size of the message.
+ */
+static void
+ch_message_sent (void *cls,
+ struct MeshTunnel3 *t,
+ struct MeshTunnel3Queue *q,
+ uint16_t type, size_t size)
+{
+ struct MeshChannelQueue *chq = cls;
+ struct MeshReliableMessage *copy = chq->copy;
+ struct MeshChannelReliability *rel;
+
+ switch (chq->type)
+ {
+ case GNUNET_MESSAGE_TYPE_MESH_DATA:
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT DATA MID %u\n", copy->mid);
+ GNUNET_assert (chq == copy->chq);
+ copy->timestamp = GNUNET_TIME_absolute_get ();
+ rel = copy->rel;
+ if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!! scheduling retry in 4 * %s\n",
+ GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
+ GNUNET_YES));
+ if (0 != rel->expected_delay.rel_value_us)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay != 0\n");
+ rel->retry_timer =
+ GNUNET_TIME_relative_multiply (rel->expected_delay,
+ MESH_RETRANSMIT_MARGIN);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay reset\n");
+ rel->retry_timer = MESH_RETRANSMIT_TIME;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!! using delay %s\n",
+ GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
+ GNUNET_NO));
+ rel->retry_task =
+ GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
+ &channel_retransmit_message, rel);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!! retry task %u\n", rel->retry_task);
+ }
+ copy->chq = NULL;
+ break;
+
+
+ case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK:
+ case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE:
+ case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_ACK:
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT %s\n", GM_m2s (chq->type));
+ rel = chq->rel;
+ GNUNET_assert (rel->uniq == chq);
+ rel->uniq = NULL;
+
+ if (MESH_CHANNEL_READY != rel->ch->state
+ && GNUNET_MESSAGE_TYPE_MESH_DATA_ACK != type
+ && GNUNET_NO == rel->ch->destroy)
+ {
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rel->retry_task);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! STD BACKOFF %s\n",
+ GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
+ GNUNET_NO));
+ rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer);
+ rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
+ &channel_recreate, rel);
+ }
+ break;
+
+
+ default:
+ GNUNET_break (0);
+ }
+
+ GNUNET_free (chq);
+}
+
+
+/**
+ * send a channel create message.
+ *
+ * @param ch Channel for which to send.
+ */
+static void
+send_create (struct MeshChannel *ch)
+{
+ struct GNUNET_MESH_ChannelCreate msgcc;
+
+ msgcc.header.size = htons (sizeof (msgcc));
+ msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE);
+ msgcc.chid = htonl (ch->gid);
+ msgcc.port = htonl (ch->port);
+ msgcc.opt = htonl (channel_get_options (ch));
+
+ GMCH_send_prebuilt_message (&msgcc.header, ch, GNUNET_YES, NULL);
+}
+
+
+/**
+ * Confirm we got a channel create or FWD ack.
+ *
+ * @param ch The channel to confirm.
+ * @param fwd Should we send a FWD ACK? (going dest->root)
+ */
+static void
+send_ack (struct MeshChannel *ch, int fwd)
+{
+ struct GNUNET_MESH_ChannelManage msg;
+
+ msg.header.size = htons (sizeof (msg));
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_ACK);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " sending channel %s ack for channel %s\n",
+ GM_f2s (fwd), GMCH_2s (ch));
+
+ msg.chid = htonl (ch->gid);
+ GMCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL);
+}
+
+
+/**
+ * Send a message and don't keep any info about it: we won't need to cancel it
+ * or resend it.
+ *
+ * @param msg Header of the message to fire away.
+ * @param ch Channel on which the message should go.
+ * @param force Is this a forced (undroppable) message?
+ */
+static void
+fire_and_forget (const struct GNUNET_MessageHeader *msg,
+ struct MeshChannel *ch,
+ int force)
+{
+ GNUNET_break (NULL == GMT_send_prebuilt_message (msg, ch->t, force,
+ NULL, NULL));
+}
+
+
+/**
+ * Notify that a channel create didn't succeed.
+ *
+ * @param ch The channel to reject.
+ */
+static void
+send_nack (struct MeshChannel *ch)
+{
+ struct GNUNET_MESH_ChannelManage msg;
+
+ msg.header.size = htons (sizeof (msg));
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_NACK);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ " sending channel NACK for channel %s\n",
+ GMCH_2s (ch));
+
+ msg.chid = htonl (ch->gid);
+ GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
+}
+
+
+/**
+ * Notify a client that the channel is no longer valid.
+ *
+ * @param ch Channel that is destroyed.
+ * @param local_only Should we avoid sending it to other peers?
+ */
+static void
+send_destroy (struct MeshChannel *ch, int local_only)
+{
+ struct GNUNET_MESH_ChannelManage msg;
+
+ msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY);
+ msg.header.size = htons (sizeof (msg));
+ msg.chid = htonl (ch->gid);
+
+ /* If root is not NULL, notify.
+ * If it's NULL, check lid_root. When a local destroy comes in, root
+ * is set to NULL but lid_root is left untouched. In this case, do nothing,
+ * the client is the one who reuqested the channel to be destroyed.
+ */
+ if (NULL != ch->root)
+ GML_send_channel_destroy (ch->root, ch->lid_root);
+ else if (0 == ch->lid_root && GNUNET_NO == local_only)
+ GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
+
+ if (NULL != ch->dest)
+ GML_send_channel_destroy (ch->dest, ch->lid_dest);
+ else if (0 == ch->lid_dest && GNUNET_NO == local_only)
+ GMCH_send_prebuilt_message (&msg.header, ch, GNUNET_YES, NULL);
+}
+
+
+/**
+ * Destroy all reliable messages queued for a channel,
+ * during a channel destruction.
+ * Frees the reliability structure itself.
+ *
+ * @param rel Reliability data for a channel.
+ */
+static void
+channel_rel_free_all (struct MeshChannelReliability *rel)
+{
+ struct MeshReliableMessage *copy;
+ struct MeshReliableMessage *next;
+
+ if (NULL == rel)
+ return;
+
+ for (copy = rel->head_recv; NULL != copy; copy = next)
+ {
+ next = copy->next;
+ GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH RECV %p\n", copy);
+ GNUNET_break (NULL == copy->chq);
+ GNUNET_free (copy);
+ }
+ for (copy = rel->head_sent; NULL != copy; copy = next)
+ {
+ next = copy->next;
+ GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH %p\n", copy);
+ if (NULL != copy->chq)
+ {
+ if (NULL != copy->chq->tq)
+ {
+ GMT_cancel (copy->chq->tq);
+ /* ch_message_sent will free copy->q */
+ }
+ else
+ {
+ GNUNET_free (copy->chq);
+ GNUNET_break (0);
+ }
+ }
+ GNUNET_free (copy);
+ }
+ if (NULL != rel->uniq && NULL != rel->uniq->tq)
+ {
+ GMT_cancel (rel->uniq->tq);
+ /* ch_message_sent is called freeing uniq */
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != rel->retry_task)
+ {
+ GNUNET_SCHEDULER_cancel (rel->retry_task);
+ rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_free (rel);
+}
+
+
+/**
+ * Mark future messages as ACK'd.
+ *
+ * @param rel Reliability data.
+ * @param msg DataACK message with a bitfield of future ACK'd messages.
+ */
+static void
+channel_rel_free_sent (struct MeshChannelReliability *rel,
+ const struct GNUNET_MESH_DataACK *msg)
+{
+ struct MeshReliableMessage *copy;
+ struct MeshReliableMessage *next;
+ uint64_t bitfield;
+ uint64_t mask;
+ uint32_t mid;
+ uint32_t target;
+ unsigned int i;
+
+ bitfield = msg->futures;
+ mid = ntohl (msg->mid);