+/**
+ * Cancels the existing read io handle
+ *
+ * @param cls the closure from the SCHEDULER call
+ * @param tc the task context
+ */
+static void
+read_io_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
+ GNUNET_STREAM_DataProcessor proc;
+ void *proc_cls;
+
+ read_handle = socket->read_handle;
+ GNUNET_assert (NULL != read_handle);
+ read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
+ if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read task timedout - Cancelling it\n",
+ GNUNET_i2s (&socket->other_peer));
+ GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+ read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ proc = read_handle->proc;
+ proc_cls = read_handle->proc_cls;
+ GNUNET_free (read_handle);
+ socket->read_handle = NULL;
+ /* Call the read processor to signal timeout */
+ proc (proc_cls,
+ GNUNET_STREAM_TIMEOUT,
+ NULL,
+ 0);
+}
+
+
+/**
+ * Handler for DATA messages; Same for both client and server
+ *
+ * @param socket the socket through which the ack was received
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param msg the data message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_data (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_DataMessage *msg,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ const void *payload;
+ struct GNUNET_TIME_Relative ack_deadline_rel;
+ uint32_t bytes_needed;
+ uint32_t relative_offset;
+ uint32_t relative_sequence_number;
+ uint16_t size;
+
+ size = htons (msg->header.header.size);
+ if (size < sizeof (struct GNUNET_STREAM_DataMessage))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (0 != memcmp (sender, &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received DATA from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_YES;
+ }
+ switch (socket->state)
+ {
+ case STATE_ESTABLISHED:
+ case STATE_TRANSMIT_CLOSED:
+ case STATE_TRANSMIT_CLOSE_WAIT:
+ /* check if the message's sequence number is in the range we are
+ expecting */
+ relative_sequence_number =
+ ntohl (msg->sequence_number) - socket->read_sequence_number;
+ if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring received message with sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number));
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
+ socket);
+ }
+ return GNUNET_YES;
+ }
+ /* Check if we have already seen this message */
+ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ relative_sequence_number))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring already received message with sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number));
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline), &ack_task, socket);
+ }
+ return GNUNET_YES;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
+ GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
+ ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
+ /* Check if we have to allocate the buffer */
+ size -= sizeof (struct GNUNET_STREAM_DataMessage);
+ relative_offset = ntohl (msg->offset) - socket->read_offset;
+ bytes_needed = relative_offset + size;
+ if (bytes_needed > socket->receive_buffer_size)
+ {
+ if (bytes_needed <= RECEIVE_BUFFER_SIZE)
+ {
+ socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
+ bytes_needed);
+ socket->receive_buffer_size = bytes_needed;
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Cannot accommodate packet %d as buffer is full\n",
+ GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
+ return GNUNET_YES;
+ }
+ }
+ /* Copy Data to buffer */
+ payload = &msg[1];
+ GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
+ memcpy (socket->receive_buffer + relative_offset, payload, size);
+ socket->receive_buffer_boundaries[relative_sequence_number] =
+ relative_offset + size;
+ /* Modify the ACK bitmap */
+ ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
+ GNUNET_YES);
+ /* Start ACK sending task if one is not already present */
+ ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ ack_deadline_rel =
+ GNUNET_TIME_relative_min (ack_deadline_rel,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 300));
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline), &ack_task, socket);
+ socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+ socket->ack_time_deadline = ack_deadline_rel;
+ }
+ else
+ {
+ struct GNUNET_TIME_Relative ack_time_past;
+ struct GNUNET_TIME_Relative ack_time_remaining;
+ struct GNUNET_TIME_Relative ack_time_min;
+ ack_time_past =
+ GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
+ ack_time_remaining = GNUNET_TIME_relative_subtract
+ (socket->ack_time_deadline, ack_time_past);
+ ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
+ ack_deadline_rel);
+ if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
+ sizeof (struct GNUNET_TIME_Relative)))
+ {
+ ack_deadline_rel = ack_time_min;
+ GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+ socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
+ &ack_task, socket);
+ socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+ socket->ack_time_deadline = ack_deadline_rel;
+ }