+
+ size = htons (msg->header.header.size);
+ if (size < sizeof (struct GNUNET_STREAM_DataMessage))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (0 != memcmp (sender, &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received DATA from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_YES;
+ }
+ switch (socket->state)
+ {
+ case STATE_ESTABLISHED:
+ case STATE_TRANSMIT_CLOSED:
+ case STATE_TRANSMIT_CLOSE_WAIT:
+ /* check if the message's sequence number is in the range we are
+ expecting */
+ relative_sequence_number =
+ ntohl (msg->sequence_number) - socket->read_sequence_number;
+ if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring received message with sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number));
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
+ socket);
+ }
+ return GNUNET_YES;
+ }
+ /* Check if we have already seen this message */
+ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ relative_sequence_number))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring already received message with sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number));
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline), &ack_task, socket);
+ }
+ return GNUNET_YES;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
+ GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
+ ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
+ /* Check if we have to allocate the buffer */
+ size -= sizeof (struct GNUNET_STREAM_DataMessage);
+ relative_offset = ntohl (msg->offset) - socket->read_offset;
+ bytes_needed = relative_offset + size;
+ if (bytes_needed > socket->receive_buffer_size)
+ {
+ if (bytes_needed <= RECEIVE_BUFFER_SIZE)
+ {
+ socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
+ bytes_needed);
+ socket->receive_buffer_size = bytes_needed;
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Cannot accommodate packet %d as buffer is full\n",
+ GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
+ return GNUNET_YES;
+ }
+ }
+ /* Copy Data to buffer */
+ payload = &msg[1];
+ GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
+ memcpy (socket->receive_buffer + relative_offset, payload, size);
+ socket->receive_buffer_boundaries[relative_sequence_number] =
+ relative_offset + size;
+ /* Modify the ACK bitmap */
+ ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
+ GNUNET_YES);
+ /* Start ACK sending task if one is not already present */
+ ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ ack_deadline_rel =
+ GNUNET_TIME_relative_min (ack_deadline_rel,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 300));
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline), &ack_task, socket);
+ socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+ socket->ack_time_deadline = ack_deadline_rel;
+ }
+ else
+ {
+ struct GNUNET_TIME_Relative ack_time_past;
+ struct GNUNET_TIME_Relative ack_time_remaining;
+ struct GNUNET_TIME_Relative ack_time_min;
+ ack_time_past =
+ GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
+ ack_time_remaining = GNUNET_TIME_relative_subtract
+ (socket->ack_time_deadline, ack_time_past);
+ ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
+ ack_deadline_rel);
+ if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
+ sizeof (struct GNUNET_TIME_Relative)))
+ {
+ ack_deadline_rel = ack_time_min;
+ GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+ socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
+ &ack_task, socket);
+ socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+ socket->ack_time_deadline = ack_deadline_rel;
+ }
+ }
+ if ((NULL != socket->read_handle) /* A read handle is waiting */
+ /* There is no current read task */
+ && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
+ /* We have the first packet */
+ && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->read_handle->read_task_id =
+ GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
+ }
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received data message when it cannot be handled\n",
+ GNUNET_i2s (&socket->other_peer));
+ break;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx place to store local state associated with the tunnel
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_data (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return handle_data (socket, tunnel, sender,
+ (const struct GNUNET_STREAM_DataMessage *) message, atsi);
+}
+
+
+/**
+ * Callback to set state to ESTABLISHED
+ *
+ * @param cls the closure NULL;
+ * @param socket the socket to requiring state change
+ */
+static void
+set_state_established (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Attaining ESTABLISHED state\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->write_offset = 0;
+ socket->read_offset = 0;
+ socket->state = STATE_ESTABLISHED;
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
+ socket->control_retransmission_task_id);
+ GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (NULL != socket->lsocket)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Calling listen callback\n",
+ GNUNET_i2s (&socket->other_peer));
+ if (GNUNET_SYSERR ==
+ socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
+ socket,
+ &socket->other_peer))
+ {
+ socket->state = STATE_CLOSED;
+ /* FIXME: We should close in a decent way (send RST) */
+ GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
+ GNUNET_free (socket);
+ }
+ }
+ else
+ socket->open_cb (socket->open_cls, socket);
+}
+
+
+/**
+ * Callback to set state to HELLO_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket to requiring state change
+ */
+static void
+set_state_hello_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ GNUNET_assert (STATE_INIT == socket->state);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Attaining HELLO_WAIT state\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_HELLO_WAIT;
+}
+
+
+/**
+ * Callback to set state to CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Attaing CLOSE_WAIT state\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_CLOSE_WAIT;
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
+}
+
+
+/**
+ * Callback to set state to RECEIVE_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_receive_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_RECEIVE_CLOSE_WAIT;
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
+}
+
+
+/**
+ * Callback to set state to TRANSMIT_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_transmit_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_TRANSMIT_CLOSE_WAIT;
+}
+
+
+/**
+ * Callback to set state to CLOSED
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_closed (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ socket->state = STATE_CLOSED;
+}
+
+
+/**
+ * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
+ *
+ * @return the generate hello message
+ */
+static struct GNUNET_STREAM_MessageHeader *
+generate_hello (void)
+{
+ struct GNUNET_STREAM_MessageHeader *msg;
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
+ msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ return msg;
+}
+
+
+/**
+ * Returns a new HelloAckMessage. Also sets the write sequence number for the
+ * socket
+ *
+ * @param socket the socket for which this HelloAckMessage has to be generated
+ * @param generate_seq GNUNET_YES to generate the write sequence number,
+ * GNUNET_NO to use the existing sequence number
+ * @return the HelloAckMessage
+ */
+static struct GNUNET_STREAM_HelloAckMessage *
+generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
+ int generate_seq)
+{
+ struct GNUNET_STREAM_HelloAckMessage *msg;
+
+ if (GNUNET_YES == generate_seq)
+ {
+ if (GNUNET_YES == socket->testing_active)
+ socket->write_sequence_number =
+ socket->testing_set_write_sequence_number_value;
+ else
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+ LOG_DEBUG ("%s: write sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ (unsigned int) socket->write_sequence_number);
+ }
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ msg->header.header.size =
+ htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ msg->sequence_number = htonl (socket->write_sequence_number);
+ msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
+ return msg;
+}
+
+
+/**
+ * Task for retransmitting control messages if they aren't ACK'ed before a
+ * deadline
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+control_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ return;
+ LOG_DEBUG ("%s: Retransmitting a control message\n",
+ GNUNET_i2s (&socket->other_peer));
+ switch (socket->state)
+ {
+ case STATE_INIT:
+ GNUNET_break (0);
+ break;
+ case STATE_LISTEN:
+ GNUNET_break (0);
+ break;
+ case STATE_HELLO_WAIT:
+ if (NULL == socket->lsocket) /* We are client */
+ queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
+ else
+ queue_message (socket,
+ (struct GNUNET_STREAM_MessageHeader *)
+ generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
+ GNUNET_NO);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &control_retransmission_task, socket);
+ break;
+ case STATE_ESTABLISHED:
+ if (NULL == socket->lsocket)
+ queue_message (socket,
+ (struct GNUNET_STREAM_MessageHeader *)
+ generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
+ GNUNET_NO);
+ else
+ GNUNET_break (0);
+ default:
+ GNUNET_break (0);
+ }
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_hello_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
+ struct GNUNET_STREAM_HelloAckMessage *reply;
+
+ if (0 != memcmp (sender, &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received HELLO_ACK from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_YES;
+ }
+ ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
+ GNUNET_assert (socket->tunnel == tunnel);
+ switch (socket->state)
+ {
+ case STATE_HELLO_WAIT:
+ socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ (unsigned int) socket->read_sequence_number);
+ socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
+ reply = generate_hello_ack (socket, GNUNET_YES);
+ queue_message (socket, &reply->header, &set_state_established,
+ NULL, GNUNET_NO);
+ return GNUNET_OK;
+ case STATE_ESTABLISHED:
+ // call statistics (# ACKs ignored++)
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+ socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
+ return GNUNET_OK;
+ default:
+ LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer), socket->state);
+ socket->state = STATE_CLOSED; // introduce STATE_ERROR?
+ return GNUNET_SYSERR;
+ }
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_reset (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ // struct GNUNET_STREAM_Socket *socket = cls;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Common message handler for handling TRANSMIT_CLOSE messages
+ *
+ * @param socket the socket through which the ack was received
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param msg the transmit close message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *msg,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_MessageHeader *reply;
+
+ switch (socket->state)
+ {
+ case STATE_ESTABLISHED:
+ socket->state = STATE_RECEIVE_CLOSED;
+ /* Send TRANSMIT_CLOSE_ACK */
+ reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ reply->header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
+ reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ queue_message (socket, reply, NULL, NULL, GNUNET_NO);
+ break;
+ default:
+ /* FIXME: Call statistics? */
+ break;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_transmit_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return handle_transmit_close (socket,
+ tunnel,
+ sender,
+ (struct GNUNET_STREAM_MessageHeader *)message,
+ atsi);
+}
+
+
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @param operation the close operation which is being ACK'ed
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi,
+ int operation)
+{
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+ shutdown_handle = socket->shutdown_handle;
+ if (NULL == shutdown_handle)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+ switch (operation)
+ {
+ case SHUT_RDWR:
+ switch (socket->state)
+ {
+ case STATE_CLOSE_WAIT:
+ if (SHUT_RDWR != shutdown_handle->operation)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received CLOSE_ACK when shutdown handle is not for "
+ "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_CLOSED;
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received CLOSE_ACK when in it not expected\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+ break;
+ case SHUT_RD:
+ switch (socket->state)
+ {
+ case STATE_RECEIVE_CLOSE_WAIT:
+ if (SHUT_RD != shutdown_handle->operation)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
+ "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_RECEIVE_CLOSED;
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+ break;
+ case SHUT_WR:
+ switch (socket->state)
+ {
+ case STATE_TRANSMIT_CLOSE_WAIT:
+ if (SHUT_WR != shutdown_handle->operation)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
+ "is not for SHUT_WR\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_TRANSMIT_CLOSED;
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+ if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
+ shutdown_handle->completion_cb(shutdown_handle->completion_cls,
+ operation);
+ if (GNUNET_SCHEDULER_NO_TASK
+ != shutdown_handle->close_msg_retransmission_task_id)
+ {
+ GNUNET_SCHEDULER_cancel
+ (shutdown_handle->close_msg_retransmission_task_id);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_free (shutdown_handle); /* Free shutdown handle */
+ socket->shutdown_handle = NULL;
+ return GNUNET_OK;
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_transmit_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_WR);
+}
+
+
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_receive_close (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ struct GNUNET_STREAM_MessageHeader *receive_close_ack;
+
+ switch (socket->state)
+ {
+ case STATE_INIT:
+ case STATE_LISTEN:
+ case STATE_HELLO_WAIT:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ default:
+ break;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received RECEIVE_CLOSE from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ receive_close_ack =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ receive_close_ack->header.size =
+ htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ receive_close_ack->header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
+ queue_message (socket, receive_close_ack, &set_state_closed,
+ NULL, GNUNET_NO);
+ /* FIXME: Handle the case where write handle is present; the write operation
+ should be deemed as finised and the write continuation callback
+ has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_receive_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return
+ handle_receive_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_receive_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RD);
+}
+
+
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_close (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_MessageHeader *close_ack;
+
+ switch (socket->state)
+ {
+ case STATE_INIT:
+ case STATE_LISTEN:
+ case STATE_HELLO_WAIT:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ default:
+ break;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received CLOSE from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
+ queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
+ if (socket->state == STATE_CLOSED)
+ return GNUNET_OK;
+
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
+ return GNUNET_OK;
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return handle_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RDWR);
+}
+
+/*****************************/
+/* Server's Message Handlers */
+/*****************************/
+
+/**
+ * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_data (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return handle_data (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_DataMessage *)message,
+ atsi);
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_hello (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+ struct GNUNET_STREAM_HelloAckMessage *reply;
+
+ if (0 != memcmp (sender,
+ &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_YES;
+ }
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
+ GNUNET_assert (socket->tunnel == tunnel);
+ LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ switch (socket->state)
+ {
+ case STATE_INIT:
+ reply = generate_hello_ack (socket, GNUNET_YES);
+ queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
+ GNUNET_NO);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+ socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &control_retransmission_task, socket);
+ break;
+ case STATE_HELLO_WAIT:
+ /* Perhaps our HELLO_ACK was lost */
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
+ socket->control_retransmission_task_id);
+ GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
+ break;
+ default:
+ LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
+ GNUNET_i2s (&socket->other_peer), socket->state);
+ /* FIXME: Send RESET? */
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_hello_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+ const struct GNUNET_STREAM_HelloAckMessage *ack_message;
+
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
+ ntohs (message->type));
+ GNUNET_assert (socket->tunnel == tunnel);
+ ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
+ switch (socket->state)
+ {
+ case STATE_HELLO_WAIT:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received HELLO_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ socket->read_sequence_number = ntohl (ack_message->sequence_number);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ (unsigned int) socket->read_sequence_number);
+ socket->receiver_window_available =
+ ntohl (ack_message->receiver_window_size);
+ set_state_established (NULL, socket);
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client sent HELLO_ACK when in state %d\n", socket->state);
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_reset (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_transmit_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return handle_transmit_close (socket,
+ tunnel,
+ sender,
+ (struct GNUNET_STREAM_MessageHeader *)message,
+ atsi);
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_transmit_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_WR);
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_receive_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return
+ handle_receive_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_receive_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RD);
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
+ *
+ * @param cls the listen socket (from GNUNET_MESH_connect in
+ * GNUNET_STREAM_listen)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return handle_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RDWR);
+}
+
+
+/**
+ * Handler for DATA_ACK messages
+ *
+ * @param socket the socket through which the ack was received
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param ack the acknowledgment message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_ack (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_AckMessage *ack,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ unsigned int packet;
+ int need_retransmission;
+ uint32_t sequence_difference;
+
+ if (0 != memcmp (sender,
+ &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received ACK from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_YES;
+ }
+ switch (socket->state)
+ {
+ case (STATE_ESTABLISHED):
+ case (STATE_RECEIVE_CLOSED):
+ case (STATE_RECEIVE_CLOSE_WAIT):
+ if (NULL == socket->write_handle)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received DATA_ACK when write_handle is NULL\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+ sequence_difference =
+ socket->write_sequence_number - ntohl (ack->base_sequence_number);
+ if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received DATA_ACK with unexpected base sequence number\n",
+ GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Current write sequence: %u; Ack's base sequence: %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ socket->write_sequence_number,
+ ntohl (ack->base_sequence_number));
+ return GNUNET_OK;
+ }
+ /* FIXME: include the case when write_handle is cancelled - ignore the
+ acks */
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
+ /* Cancel the retransmission task */
+ if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (NULL == socket->write_handle->messages[packet]) break;
+ /* BS: Base sequence from ack; PS: sequence num of current packet */
+ sequence_difference = ntohl (ack->base_sequence_number)
+ - ntohl (socket->write_handle->messages[packet]->sequence_number);
+ if ((0 == sequence_difference) ||
+ (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
+ continue; /* The message in our handle is not yet received */
+ /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
+ /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
+ ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+ packet, GNUNET_YES);
+ }
+ /* Update the receive window remaining
+ FIXME : Should update with the value from a data ack with greater
+ sequence number */
+ socket->receiver_window_available =
+ ntohl (ack->receive_window_remaining);
+ /* Check if we have received all acknowledgements */
+ need_retransmission = GNUNET_NO;
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (NULL == socket->write_handle->messages[packet]) break;
+ if (GNUNET_YES != ackbitmap_is_bit_set
+ (&socket->write_handle->ack_bitmap,packet))
+ {
+ need_retransmission = GNUNET_YES;
+ break;
+ }
+ }
+ if (GNUNET_YES == need_retransmission)
+ {
+ write_data (socket);
+ }
+ else /* We have to call the write continuation callback now */
+ {
+ struct GNUNET_STREAM_IOWriteHandle *write_handle;
+
+ /* Free the packets */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ GNUNET_free_non_null (socket->write_handle->messages[packet]);
+ }
+ write_handle = socket->write_handle;
+ socket->write_handle = NULL;
+ if (NULL != write_handle->write_cont)
+ write_handle->write_cont (write_handle->write_cont_cls,
+ socket->status,
+ write_handle->size);
+ /* We are done with the write handle - Freeing it */
+ GNUNET_free (write_handle);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Write completion callback completed\n",
+ GNUNET_i2s (&socket->other_peer));
+ }
+ break;
+ default:
+ break;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handler for DATA_ACK messages
+ *
+ * @param cls the 'struct GNUNET_STREAM_Socket'
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx unused
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
+
+ return handle_ack (socket, tunnel, sender, ack, atsi);
+}
+
+
+/**
+ * Handler for DATA_ACK messages
+ *
+ * @param cls the server's listen socket
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+ const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
+
+ return handle_ack (socket, tunnel, sender, ack, atsi);
+}
+
+
+/**
+ * For client message handlers, the stream socket is in the
+ * closure argument.
+ */
+static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
+ {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
+ {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
+ sizeof (struct GNUNET_STREAM_AckMessage) },
+ {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
+ sizeof (struct GNUNET_STREAM_HelloAckMessage)},
+ {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {NULL, 0, 0}
+};
+
+
+/**
+ * For server message handlers, the stream socket is in the
+ * tunnel context, and the listen socket in the closure argument.
+ */
+static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
+ {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
+ {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
+ sizeof (struct GNUNET_STREAM_AckMessage) },
+ {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
+ sizeof (struct GNUNET_STREAM_HelloAckMessage)},
+ {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},