+ * @param socket the socket whose mesh connection is used
+ * @param message the message to be sent
+ * @param finish_cb the callback to be called when the message is sent
+ * @param finish_cb_cls the closure for the callback
+ */
+static void
+copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ SendFinishCallback finish_cb,
+ void *finish_cb_cls)
+{
+ struct GNUNET_STREAM_MessageHeader *msg_copy;
+ uint16_t size;
+
+ size = ntohs (message->header.size);
+ msg_copy = GNUNET_malloc (size);
+ memcpy (msg_copy, message, size);
+ queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
+}
+
+
+/**
+ * Writes data using the given socket. The amount of data written is limited by
+ * the receiver_window_size
+ *
+ * @param socket the socket to use
+ */
+static void
+write_data (struct GNUNET_STREAM_Socket *socket);
+
+
+/**
+ * Task for retransmitting data messages if they aren't ACK before their ack
+ * deadline
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+data_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
+ write_data (socket);
+}
+
+
+/**
+ * Task for sending ACK message
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+ack_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_AckMessage *ack_msg;
+
+ socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
+ /* Create the ACK Message */
+ ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
+ ack_msg->header.header.size = htons (sizeof (struct
+ GNUNET_STREAM_AckMessage));
+ ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
+ ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
+ ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
+ ack_msg->receive_window_remaining =
+ htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
+ /* Queue up ACK for immediate sending */
+ queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
+}
+
+
+/**
+ * Retransmission task for shutdown messages
+ *
+ * @param cls the shutdown handle
+ * @param tc the Task Context
+ */
+static void
+close_msg_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
+ struct GNUNET_STREAM_MessageHeader *msg;
+ struct GNUNET_STREAM_Socket *socket;
+
+ shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (NULL != shutdown_handle);
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
+ socket = shutdown_handle->socket;
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ switch (shutdown_handle->operation)
+ {
+ case SHUT_RDWR:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
+ break;
+ case SHUT_RD:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+ break;
+ case SHUT_WR:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+ break;
+ default:
+ GNUNET_free (msg);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ return;
+ }
+ queue_message (socket, msg, NULL, NULL, GNUNET_NO);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &close_msg_retransmission_task,
+ shutdown_handle);
+}
+
+
+/**
+ * Function to modify a bit in GNUNET_STREAM_AckBitmap
+ *
+ * @param bitmap the bitmap to modify
+ * @param bit the bit number to modify
+ * @param value GNUNET_YES to on, GNUNET_NO to off
+ */
+static void
+ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
+ unsigned int bit,
+ int value)
+{
+ GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
+ if (GNUNET_YES == value)
+ *bitmap |= (1LL << bit);
+ else
+ *bitmap &= ~(1LL << bit);
+}
+
+
+/**
+ * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
+ *
+ * @param bitmap address of the bitmap that has to be checked
+ * @param bit the bit number to check
+ * @return GNUNET_YES if the bit is set; GNUNET_NO if not
+ */
+static uint8_t
+ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
+ unsigned int bit)
+{
+ GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
+ return 0 != (*bitmap & (1LL << bit));
+}
+
+
+/**
+ * Writes data using the given socket. The amount of data written is limited by
+ * the receiver_window_size
+ *
+ * @param socket the socket to use
+ */
+static void
+write_data (struct GNUNET_STREAM_Socket *socket)
+{
+ struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
+ unsigned int packet;
+
+ for (packet=0; packet < io_handle->packets_sent; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+ packet))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Retransmitting DATA message with sequence %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (io_handle->messages[packet]->sequence_number));
+ copy_and_queue_message (socket,
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
+ }
+ }
+ /* Now send new packets if there is enough buffer space */
+ while ( (NULL != io_handle->messages[packet]) &&
+ (socket->receiver_window_available
+ >= ntohs (io_handle->messages[packet]->header.header.size)) &&
+ (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+ {
+ socket->receiver_window_available -=
+ ntohs (io_handle->messages[packet]->header.header.size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Placing DATA message with sequence %u in send queue\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (io_handle->messages[packet]->sequence_number));
+ copy_and_queue_message (socket,
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
+ packet++;
+ }
+ io_handle->packets_sent = packet;
+ // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
+ if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
+ socket->data_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 8),
+ &data_retransmission_task,
+ socket);
+}
+
+
+/**
+ * Task for calling the read processor
+ *
+ * @param cls the socket
+ * @param tc the task context
+ */
+static void
+call_read_processor (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
+ size_t read_size;
+ size_t valid_read_size;
+ unsigned int packet;
+ uint32_t sequence_increase;
+ uint32_t offset_increase;
+
+ read_handle = socket->read_handle;
+ GNUNET_assert (NULL != read_handle);
+ read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+ if (NULL == socket->receive_buffer)
+ return;
+ GNUNET_assert (NULL != socket->read_handle);
+ GNUNET_assert (NULL != socket->read_handle->proc);
+ /* Check the bitmap for any holes */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ packet))
+ break;
+ }
+ /* We only call read processor if we have the first packet */
+ GNUNET_assert (0 < packet);
+ valid_read_size =
+ socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
+ GNUNET_assert (0 != valid_read_size);
+ /* Cancel the read_io_timeout_task */
+ GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
+ read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ /* Call the data processor */
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
+ GNUNET_i2s (&socket->other_peer));
+ read_size =
+ socket->read_handle->proc (socket->read_handle->proc_cls,
+ socket->status,
+ socket->receive_buffer + socket->copy_offset,
+ valid_read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
+ GNUNET_i2s (&socket->other_peer), read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
+ GNUNET_i2s (&socket->other_peer));
+ /* Free the read handle */
+ GNUNET_free (socket->read_handle);
+ socket->read_handle = NULL;
+ GNUNET_assert (read_size <= valid_read_size);
+ socket->copy_offset += read_size;
+ /* Determine upto which packet we can remove from the buffer */
+ for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
+ { packet++; break; }
+ if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
+ break;
+ }
+ /* If no packets can be removed we can't move the buffer */
+ if (0 == packet)
+ return;
+ sequence_increase = packet;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Sequence increase after read processor completion: %u\n",
+ GNUNET_i2s (&socket->other_peer), sequence_increase);
+ /* Shift the data in the receive buffer */
+ socket->receive_buffer =
+ memmove (socket->receive_buffer,
+ socket->receive_buffer
+ + socket->receive_buffer_boundaries[sequence_increase-1],
+ socket->receive_buffer_size
+ - socket->receive_buffer_boundaries[sequence_increase-1]);
+ /* Shift the bitmap */
+ socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
+ /* Set read_sequence_number */
+ socket->read_sequence_number += sequence_increase;
+ /* Set read_offset */
+ offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
+ socket->read_offset += offset_increase;
+ /* Fix copy_offset */
+ GNUNET_assert (offset_increase <= socket->copy_offset);
+ socket->copy_offset -= offset_increase;
+ /* Fix relative boundaries */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
+ {
+ uint32_t ahead_buffer_boundary;
+
+ ahead_buffer_boundary =
+ socket->receive_buffer_boundaries[packet + sequence_increase];
+ if (0 == ahead_buffer_boundary)
+ socket->receive_buffer_boundaries[packet] = 0;
+ else
+ {
+ GNUNET_assert (offset_increase < ahead_buffer_boundary);
+ socket->receive_buffer_boundaries[packet] =
+ ahead_buffer_boundary - offset_increase;
+ }
+ }
+ else
+ socket->receive_buffer_boundaries[packet] = 0;
+ }
+}
+
+
+/**
+ * Cancels the existing read io handle
+ *
+ * @param cls the closure from the SCHEDULER call
+ * @param tc the task context
+ */
+static void
+read_io_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
+ GNUNET_STREAM_DataProcessor proc;
+ void *proc_cls;
+
+ read_handle = socket->read_handle;
+ GNUNET_assert (NULL != read_handle);
+ read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
+ if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read task timedout - Cancelling it\n",
+ GNUNET_i2s (&socket->other_peer));
+ GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+ read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ proc = read_handle->proc;
+ proc_cls = read_handle->proc_cls;
+ GNUNET_free (read_handle);
+ socket->read_handle = NULL;
+ /* Call the read processor to signal timeout */
+ proc (proc_cls,
+ GNUNET_STREAM_TIMEOUT,
+ NULL,
+ 0);
+}
+
+
+/**
+ * Handler for DATA messages; Same for both client and server
+ *
+ * @param socket the socket through which the ack was received
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param msg the data message