From 260604bbbc16a733158aac296eb1547c98816922 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Thu, 2 May 2013 08:07:45 +0000 Subject: [PATCH] fix #2877: stream doesn't distinguish between the application ports --- src/stream/stream.h | 40 ++---- src/stream/stream_api.c | 298 ++++++++++++++++++---------------------- 2 files changed, 147 insertions(+), 191 deletions(-) diff --git a/src/stream/stream.h b/src/stream/stream.h index 5c85f8b34..d9ba09f11 100644 --- a/src/stream/stream.h +++ b/src/stream/stream.h @@ -41,17 +41,20 @@ GNUNET_NETWORK_STRUCT_BEGIN /** - * The stream message header - * All messages of STREAM should commonly have this as header + * The HELLO message to begin the handshake */ -struct GNUNET_STREAM_MessageHeader +struct GNUNET_STREAM_HelloMessage { /** - * The GNUNET message header, types are from GNUNET_MESSAGE_TYPE_STREAM_*-range. + * Type is GNUNET_MESSAGE_TYPE_STREAM_HELLO */ struct GNUNET_MessageHeader header; -}; + /** + * The application port number + */ + uint64_t port GNUNET_PACKED;; +}; /** * The Data message, should be prefixed with stream header with its type set to @@ -63,7 +66,7 @@ struct GNUNET_STREAM_DataMessage /** * Type is GNUNET_MESSAGE_TYPE_STREAM_DATA */ - struct GNUNET_STREAM_MessageHeader header; + struct GNUNET_MessageHeader header; /** * Sequence number; starts with a random value. (Just in case @@ -112,7 +115,7 @@ struct GNUNET_STREAM_AckMessage /** * Type is GNUNET_MESSAGE_TYPE_STREAM_ACK */ - struct GNUNET_STREAM_MessageHeader header; + struct GNUNET_MessageHeader header; /** * The sequence number of the next Data Message receiver is @@ -142,7 +145,7 @@ struct GNUNET_STREAM_HelloAckMessage /** * The stream message header */ - struct GNUNET_STREAM_MessageHeader header; + struct GNUNET_MessageHeader header; /** * The selected sequence number. Following data tranmissions from the sender @@ -158,25 +161,6 @@ struct GNUNET_STREAM_HelloAckMessage uint32_t receiver_window_size GNUNET_PACKED; }; - -/** - * The Transmit close message(used to signal transmission is closed) - * FIXME: dead struct? - */ -struct GNUNET_STREAM_TransmitCloseMessage -{ - /** - * The stream message header - */ - struct GNUNET_STREAM_MessageHeader header; - - /** - * The last sequence number of the packet after which the transmission has - * ended - */ - uint32_t final_sequence_number GNUNET_PACKED; -}; - GNUNET_NETWORK_STRUCT_END @@ -188,3 +172,5 @@ GNUNET_NETWORK_STRUCT_END #endif #endif /* STREAM.H */ + +/* End of stream.h */ diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 4042e05cc..28e908105 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -145,7 +145,7 @@ struct MessageQueue /** * The message */ - struct GNUNET_STREAM_MessageHeader *message; + struct GNUNET_MessageHeader *message; /** * Callback to be called when the message is sent @@ -313,7 +313,7 @@ struct GNUNET_STREAM_Socket /** * The application port number (type: uint32_t) */ - GNUNET_MESH_ApplicationType app_port; + GNUNET_MESH_ApplicationType port; /** * The write sequence number to be set incase of testing @@ -621,12 +621,12 @@ send_message_notify (void *cls, size_t size, void *buf) GNUNET_NO, /* Corking */ socket->mesh_retry_timeout, &socket->other_peer, - ntohs (head->message->header.size), + ntohs (head->message->size), &send_message_notify, socket); return 0; } - ret = ntohs (head->message->header.size); + ret = ntohs (head->message->size); GNUNET_assert (size >= ret); memcpy (buf, head->message, ret); if (NULL != head->finish_cb) @@ -649,7 +649,7 @@ send_message_notify (void *cls, size_t size, void *buf) GNUNET_NO, /* Corking */ socket->mesh_retry_timeout, &socket->other_peer, - ntohs (head->message->header.size), + ntohs (head->message->size), &send_message_notify, socket); } @@ -669,21 +669,19 @@ send_message_notify (void *cls, size_t size, void *buf) */ static void queue_message (struct GNUNET_STREAM_Socket *socket, - struct GNUNET_STREAM_MessageHeader *message, + struct GNUNET_MessageHeader *message, SendFinishCallback finish_cb, void *finish_cb_cls, int urgent) { struct MessageQueue *queue_entity; - GNUNET_assert - ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) - && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "%s: Queueing message of type %d and size %d\n", + GNUNET_assert ((ntohs (message->type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) + && (ntohs (message->type) + <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); + LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Queueing message of type %d and size %d\n", GNUNET_i2s (&socket->other_peer), - ntohs (message->header.type), - ntohs (message->header.size)); + ntohs (message->type),ntohs (message->size)); GNUNET_assert (NULL != message); queue_entity = GNUNET_malloc (sizeof (struct MessageQueue)); queue_entity->message = message; @@ -711,7 +709,7 @@ queue_message (struct GNUNET_STREAM_Socket *socket, GNUNET_NO, /* Corking */ socket->mesh_retry_timeout, &socket->other_peer, - ntohs (message->header.size), + ntohs (message->size), &send_message_notify, socket); } @@ -729,14 +727,14 @@ queue_message (struct GNUNET_STREAM_Socket *socket, */ static void copy_and_queue_message (struct GNUNET_STREAM_Socket *socket, - const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_MessageHeader *message, SendFinishCallback finish_cb, void *finish_cb_cls) { - struct GNUNET_STREAM_MessageHeader *msg_copy; + struct GNUNET_MessageHeader *msg_copy; uint16_t size; - size = ntohs (message->header.size); + size = ntohs (message->size); msg_copy = GNUNET_malloc (size); memcpy (msg_copy, message, size); queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO); @@ -793,9 +791,9 @@ ack_task (void *cls, return; /* Create the ACK Message */ ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage)); - ack_msg->header.header.size = htons (sizeof (struct + ack_msg->header.size = htons (sizeof (struct GNUNET_STREAM_AckMessage)); - ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK); + ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK); ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap); ack_msg->base_sequence_number = htonl (socket->read_sequence_number); ack_msg->receive_window_remaining = @@ -816,7 +814,7 @@ close_msg_retransmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls; - struct GNUNET_STREAM_MessageHeader *msg; + struct GNUNET_MessageHeader *msg; struct GNUNET_STREAM_Socket *socket; shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK; @@ -824,18 +822,18 @@ close_msg_retransmission_task (void *cls, if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason)) return; socket = shutdown_handle->socket; - msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader)); + msg->size = htons (sizeof (struct GNUNET_MessageHeader)); switch (shutdown_handle->operation) { case SHUT_RDWR: - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); break; case SHUT_RD: - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); break; case SHUT_WR: - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); break; default: GNUNET_free (msg); @@ -918,10 +916,10 @@ write_data (struct GNUNET_STREAM_Socket *socket) while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) && (NULL != io_handle->messages[packet]) && (socket->receiver_window_available - >= ntohs (io_handle->messages[packet]->header.header.size))) + >= ntohs (io_handle->messages[packet]->header.size))) { socket->receiver_window_available -= - ntohs (io_handle->messages[packet]->header.header.size); + ntohs (io_handle->messages[packet]->header.size); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Placing DATA message with sequence %u in send queue\n", GNUNET_i2s (&socket->other_peer), @@ -1150,7 +1148,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, uint32_t relative_sequence_number; uint16_t size; - size = htons (msg->header.header.size); + size = htons (msg->header.size); if (size < sizeof (struct GNUNET_STREAM_DataMessage)) { GNUNET_break_op (0); @@ -1208,9 +1206,9 @@ handle_data (struct GNUNET_STREAM_Socket *socket, return GNUNET_YES; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "%s: Receiving DATA with sequence number: %u and size: %d from %s\n", - GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number), - ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer)); + "%1$s: Receiving DATA with sequence number: %2$u and size: %3$d from " + "%1$s\n", GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number), ntohs (msg->header.size)); /* Check if we have to allocate the buffer */ size -= sizeof (struct GNUNET_STREAM_DataMessage); relative_offset = ntohl (msg->offset) - socket->read_offset; @@ -1460,15 +1458,16 @@ set_state_closed (void *cls, * * @return the generate hello message */ -static struct GNUNET_STREAM_MessageHeader * -generate_hello (void) +static struct GNUNET_MessageHeader * +generate_hello (struct GNUNET_STREAM_Socket *socket) { - struct GNUNET_STREAM_MessageHeader *msg; + struct GNUNET_STREAM_HelloMessage *msg; - msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloMessage)); msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO); - msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - return msg; + msg->header.size = htons (sizeof (struct GNUNET_STREAM_HelloMessage)); + msg->port = GNUNET_htonll ((uint64_t) socket->port); + return &msg->header; } @@ -1500,9 +1499,9 @@ generate_hello_ack (struct GNUNET_STREAM_Socket *socket, (unsigned int) socket->write_sequence_number); } msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); - msg->header.header.size = + msg->header.size = htons (sizeof (struct GNUNET_STREAM_HelloAckMessage)); - msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); msg->sequence_number = htonl (socket->write_sequence_number); msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE); return msg; @@ -1537,10 +1536,10 @@ control_retransmission_task (void *cls, break; case STATE_HELLO_WAIT: if (NULL == socket->lsocket) /* We are client */ - queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO); + queue_message (socket, generate_hello (socket), NULL, NULL, GNUNET_NO); else queue_message (socket, - (struct GNUNET_STREAM_MessageHeader *) + (struct GNUNET_MessageHeader *) generate_hello_ack (socket, GNUNET_NO), NULL, NULL, GNUNET_NO); socket->control_retransmission_task_id = @@ -1550,7 +1549,7 @@ control_retransmission_task (void *cls, case STATE_ESTABLISHED: if (NULL == socket->lsocket) queue_message (socket, - (struct GNUNET_STREAM_MessageHeader *) + (struct GNUNET_MessageHeader *) generate_hello_ack (socket, GNUNET_NO), NULL, NULL, GNUNET_NO); else @@ -1726,10 +1725,10 @@ static int handle_transmit_close (struct GNUNET_STREAM_Socket *socket, struct GNUNET_MESH_Tunnel *tunnel, const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_STREAM_MessageHeader *msg, + const struct GNUNET_MessageHeader *msg, const struct GNUNET_ATS_Information*atsi) { - struct GNUNET_STREAM_MessageHeader *reply; + struct GNUNET_MessageHeader *reply; switch (socket->state) { @@ -1744,13 +1743,12 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket, break; } /* Send TRANSMIT_CLOSE_ACK */ - reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); - reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + reply = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader)); + reply->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); + reply->size = htons (sizeof (struct GNUNET_MessageHeader)); queue_message (socket, reply, NULL, NULL, GNUNET_NO); - LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n", - GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer)); + LOG (GNUNET_ERROR_TYPE_DEBUG, "%1$s: Received TRANSMIT_CLOSE from %1$s\n", + GNUNET_i2s (&socket->other_peer)); switch(socket->state) { case STATE_RECEIVE_CLOSED: @@ -1795,7 +1793,7 @@ client_handle_transmit_close (void *cls, return handle_transmit_close (socket, tunnel, sender, - (struct GNUNET_STREAM_MessageHeader *)message, + (struct GNUNET_MessageHeader *)message, atsi); } @@ -1838,7 +1836,7 @@ static int handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket, struct GNUNET_MESH_Tunnel *tunnel, const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_MessageHeader *message, const struct GNUNET_ATS_Information *atsi, int operation) { @@ -1964,7 +1962,7 @@ client_handle_transmit_close_ack (void *cls, return handle_generic_close_ack (socket, tunnel, sender, - (const struct GNUNET_STREAM_MessageHeader *) + (const struct GNUNET_MessageHeader *) message, atsi, SHUT_WR); @@ -1986,10 +1984,10 @@ static int handle_receive_close (struct GNUNET_STREAM_Socket *socket, struct GNUNET_MESH_Tunnel *tunnel, const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_MessageHeader *message, const struct GNUNET_ATS_Information *atsi) { - struct GNUNET_STREAM_MessageHeader *receive_close_ack; + struct GNUNET_MessageHeader *receive_close_ack; switch (socket->state) { @@ -2005,12 +2003,10 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket, } LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n", GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer)); - receive_close_ack = - GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - receive_close_ack->header.size = - htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - receive_close_ack->header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); + receive_close_ack = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader)); + receive_close_ack->size = htons (sizeof (struct GNUNET_MessageHeader)); + receive_close_ack->type = + htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO); switch (socket->state) { @@ -2057,7 +2053,7 @@ client_handle_receive_close (void *cls, handle_receive_close (socket, tunnel, sender, - (const struct GNUNET_STREAM_MessageHeader *) message, + (const struct GNUNET_MessageHeader *) message, atsi); } @@ -2087,7 +2083,7 @@ client_handle_receive_close_ack (void *cls, return handle_generic_close_ack (socket, tunnel, sender, - (const struct GNUNET_STREAM_MessageHeader *) + (const struct GNUNET_MessageHeader *) message, atsi, SHUT_RD); @@ -2109,10 +2105,10 @@ static int handle_close (struct GNUNET_STREAM_Socket *socket, struct GNUNET_MESH_Tunnel *tunnel, const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_MessageHeader *message, const struct GNUNET_ATS_Information*atsi) { - struct GNUNET_STREAM_MessageHeader *close_ack; + struct GNUNET_MessageHeader *close_ack; switch (socket->state) { @@ -2128,9 +2124,9 @@ handle_close (struct GNUNET_STREAM_Socket *socket, } LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n", GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer)); - close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); + close_ack = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader)); + close_ack->size = htons (sizeof (struct GNUNET_MessageHeader)); + close_ack->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO); if ((STATE_CLOSED == socket->state) || (STATE_CLOSE_WAIT == socket->state)) return GNUNET_OK; @@ -2167,7 +2163,7 @@ client_handle_close (void *cls, return handle_close (socket, tunnel, sender, - (const struct GNUNET_STREAM_MessageHeader *) message, + (const struct GNUNET_MessageHeader *) message, atsi); } @@ -2197,7 +2193,7 @@ client_handle_close_ack (void *cls, return handle_generic_close_ack (socket, tunnel, sender, - (const struct GNUNET_STREAM_MessageHeader *) + (const struct GNUNET_MessageHeader *) message, atsi, SHUT_RDWR); @@ -2258,8 +2254,11 @@ server_handle_hello (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; + const struct GNUNET_STREAM_HelloMessage *hello; struct GNUNET_STREAM_HelloAckMessage *reply; + uint32_t port; + hello = (const struct GNUNET_STREAM_HelloMessage *) message; if (0 != memcmp (sender, &socket->other_peer, sizeof (struct GNUNET_PeerIdentity))) @@ -2270,11 +2269,19 @@ server_handle_hello (void *cls, } GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); - LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer), + LOG_DEBUG ("%1$s: Received HELLO from %1$s\n", GNUNET_i2s (&socket->other_peer)); + port = (uint32_t) GNUNET_ntohll (hello->port); switch (socket->state) { case STATE_INIT: + if (port != socket->port) + { + LOG_DEBUG ("Ignoring HELLO for port %u\n", port); + GNUNET_MESH_tunnel_destroy (tunnel); + GNUNET_free (socket); + return GNUNET_OK; + } reply = generate_hello_ack (socket, GNUNET_YES); queue_message (socket, &reply->header, &set_state_hello_wait, NULL, GNUNET_NO); @@ -2373,7 +2380,7 @@ server_handle_reset (void *cls, const struct GNUNET_ATS_Information*atsi) { // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - + /* FIXME */ return GNUNET_OK; } @@ -2400,11 +2407,7 @@ server_handle_transmit_close (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return handle_transmit_close (socket, - tunnel, - sender, - (struct GNUNET_STREAM_MessageHeader *)message, - atsi); + return handle_transmit_close (socket, tunnel, sender, message, atsi); } @@ -2430,12 +2433,7 @@ server_handle_transmit_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return handle_generic_close_ack (socket, - tunnel, - sender, - (const struct GNUNET_STREAM_MessageHeader *) - message, - atsi, + return handle_generic_close_ack (socket, tunnel, sender, message, atsi, SHUT_WR); } @@ -2462,12 +2460,7 @@ server_handle_receive_close (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return - handle_receive_close (socket, - tunnel, - sender, - (const struct GNUNET_STREAM_MessageHeader *) message, - atsi); + return handle_receive_close (socket, tunnel, sender, message, atsi); } @@ -2493,12 +2486,7 @@ server_handle_receive_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return handle_generic_close_ack (socket, - tunnel, - sender, - (const struct GNUNET_STREAM_MessageHeader *) - message, - atsi, + return handle_generic_close_ack (socket, tunnel, sender, message, atsi, SHUT_RD); } @@ -2526,11 +2514,7 @@ server_handle_close (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return handle_close (socket, - tunnel, - sender, - (const struct GNUNET_STREAM_MessageHeader *) message, - atsi); + return handle_close (socket, tunnel, sender, message, atsi); } @@ -2556,12 +2540,7 @@ server_handle_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return handle_generic_close_ack (socket, - tunnel, - sender, - (const struct GNUNET_STREAM_MessageHeader *) - message, - atsi, + return handle_generic_close_ack (socket, tunnel, sender, message, atsi, SHUT_RDWR); } @@ -2742,8 +2721,9 @@ client_handle_ack (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = cls; - const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message; - + const struct GNUNET_STREAM_AckMessage *ack; + + ack = (const struct GNUNET_STREAM_AckMessage *) message; return handle_ack (socket, tunnel, sender, ack, atsi); } @@ -2786,19 +2766,19 @@ static struct GNUNET_MESH_MessageHandler client_message_handlers[] = { {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, sizeof (struct GNUNET_STREAM_HelloAckMessage)}, {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {NULL, 0, 0} }; @@ -2812,23 +2792,23 @@ static struct GNUNET_MESH_MessageHandler server_message_handlers[] = { {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, sizeof (struct GNUNET_STREAM_AckMessage) }, {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_STREAM_HelloMessage)}, {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, sizeof (struct GNUNET_STREAM_HelloAckMessage)}, {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK, - sizeof (struct GNUNET_STREAM_MessageHeader)}, + sizeof (struct GNUNET_MessageHeader)}, {NULL, 0, 0} }; @@ -2846,7 +2826,7 @@ mesh_peer_connect_callback (void *cls, const struct GNUNET_ATS_Information * atsi) { struct GNUNET_STREAM_Socket *socket = cls; - struct GNUNET_STREAM_MessageHeader *message; + struct GNUNET_MessageHeader *message; if (0 != memcmp (peer, &socket->other_peer, @@ -2864,10 +2844,10 @@ mesh_peer_connect_callback (void *cls, /* Set state to INIT */ socket->state = STATE_INIT; /* Send HELLO message */ - message = generate_hello (); + message = generate_hello (socket); queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO); - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == - socket->control_retransmission_task_id); + if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id) + GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id); socket->control_retransmission_task_id = GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, &control_retransmission_task, socket); @@ -2887,10 +2867,8 @@ mesh_peer_disconnect_callback (void *cls, struct GNUNET_STREAM_Socket *socket=cls; /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "%s: Other peer %s disconnected \n", - GNUNET_i2s (&socket->other_peer), - GNUNET_i2s (&socket->other_peer)); + LOG_DEBUG ("%1$s: Other peer %1$s disconnected \n", + GNUNET_i2s (&socket->other_peer)); } @@ -2915,7 +2893,6 @@ new_tunnel_notify (void *cls, /* FIXME: If a tunnel is already created, we should not accept new tunnels from the same peer again until the socket is closed */ - if (GNUNET_NO == lsocket->listening) { GNUNET_MESH_tunnel_destroy (tunnel); @@ -2926,16 +2903,15 @@ new_tunnel_notify (void *cls, socket->tunnel = tunnel; socket->state = STATE_INIT; socket->lsocket = lsocket; + socket->port = lsocket->port; socket->stat_handle = lsocket->stat_handle; socket->retransmit_timeout = lsocket->retransmit_timeout; socket->testing_active = lsocket->testing_active; socket->testing_set_write_sequence_number_value = lsocket->testing_set_write_sequence_number_value; socket->max_payload_size = lsocket->max_payload_size; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "%s: Peer %s initiated tunnel to us\n", - GNUNET_i2s (&socket->other_peer), - GNUNET_i2s (&socket->other_peer)); + LOG_DEBUG ("%1$s: Peer %1$s initiated tunnel to us\n", + GNUNET_i2s (&socket->other_peer)); if (NULL != socket->stat_handle) { GNUNET_STATISTICS_update (socket->stat_handle, @@ -2944,7 +2920,6 @@ new_tunnel_notify (void *cls, GNUNET_STATISTICS_update (socket->stat_handle, "inbound connections", 1, GNUNET_NO); } - return socket; } @@ -3113,7 +3088,6 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, { struct GNUNET_STREAM_Socket *socket; enum GNUNET_STREAM_Option option; - GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; va_list vargs; uint16_t payload_size; @@ -3124,6 +3098,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, socket->other_peer = *target; socket->open_cb = open_cb; socket->open_cls = open_cb_cls; + socket->port = app_port; /* Set defaults */ socket->retransmit_timeout = TIME_REL_SECS (default_timeout); socket->testing_active = GNUNET_NO; @@ -3165,7 +3140,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, NULL, /* No inbound tunnel handler */ NULL, /* No in-tunnel cleaner */ client_message_handlers, - ports); /* We don't get inbound tunnels */ + NULL); /* We don't get inbound tunnels */ if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */ { GNUNET_free (socket); @@ -3204,7 +3179,7 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, void *completion_cls) { struct GNUNET_STREAM_ShutdownHandle *handle; - struct GNUNET_STREAM_MessageHeader *msg; + struct GNUNET_MessageHeader *msg; GNUNET_assert (NULL == socket->shutdown_handle); handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle)); @@ -3223,8 +3198,8 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, socket); return handle; } - msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader)); + msg->size = htons (sizeof (struct GNUNET_MessageHeader)); switch (operation) { case SHUT_RD: @@ -3233,7 +3208,7 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, LOG (GNUNET_ERROR_TYPE_WARNING, "Existing read handle should be cancelled before shutting" " down reading\n"); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); queue_message (socket, msg, &set_state_receive_close_wait, NULL, GNUNET_NO); socket->receive_closed = GNUNET_YES; @@ -3244,7 +3219,7 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, LOG (GNUNET_ERROR_TYPE_WARNING, "Existing write handle should be cancelled before shutting" " down writing\n"); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); queue_message (socket, msg, &set_state_transmit_close_wait, NULL, GNUNET_NO); socket->transmit_closed = GNUNET_YES; @@ -3259,7 +3234,7 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, LOG (GNUNET_ERROR_TYPE_WARNING, "Existing read handle should be cancelled before shutting" " down reading\n"); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO); socket->transmit_closed = GNUNET_YES; socket->receive_closed = GNUNET_YES; @@ -3514,11 +3489,11 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, void *write_cont_cls) { struct GNUNET_STREAM_WriteHandle *io_handle; - struct GNUNET_STREAM_DataMessage *data_msg; + struct GNUNET_STREAM_DataMessage *dmsg; const void *sweep; struct GNUNET_TIME_Relative ack_deadline; unsigned int num_needed_packets; - unsigned int packet; + unsigned int cnt; uint32_t packet_size; uint32_t payload_size; uint16_t max_data_packet_size; @@ -3578,33 +3553,29 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, max_data_packet_size = socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage); io_handle->max_ack_base_num = socket->write_sequence_number; - for (packet=0; packet < num_needed_packets; packet++) + for (cnt=0; cnt < num_needed_packets; cnt++) { - if ((packet + 1) * socket->max_payload_size < size) + if ((cnt + 1) * socket->max_payload_size < size) { payload_size = socket->max_payload_size; packet_size = max_data_packet_size; } else { - payload_size = size - packet * socket->max_payload_size; - packet_size = - payload_size + sizeof (struct GNUNET_STREAM_DataMessage); + payload_size = size - (cnt * socket->max_payload_size); + packet_size = payload_size + sizeof (struct GNUNET_STREAM_DataMessage); } - io_handle->messages[packet] = GNUNET_malloc (packet_size); - io_handle->messages[packet]->header.header.size = htons (packet_size); - io_handle->messages[packet]->header.header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); - io_handle->messages[packet]->sequence_number = - htonl (socket->write_sequence_number++); - io_handle->messages[packet]->offset = htonl (socket->write_offset); + dmsg = GNUNET_malloc (packet_size); + dmsg->header.size = htons (packet_size); + dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); + dmsg->sequence_number = htonl (socket->write_sequence_number++); + dmsg->offset = htonl (socket->write_offset); /* FIXME: Remove the fixed delay for ack deadline; Set it to the value determined from RTT */ - io_handle->messages[packet]->ack_deadline = - GNUNET_TIME_relative_hton (ack_deadline); - data_msg = io_handle->messages[packet]; + dmsg->ack_deadline = GNUNET_TIME_relative_hton (ack_deadline); /* Copy data from given buffer to the packet */ - memcpy (&data_msg[1], sweep, payload_size); + memcpy (&dmsg[1], sweep, payload_size); + io_handle->messages[cnt] = dmsg; sweep += payload_size; socket->write_offset += payload_size; } @@ -3645,7 +3616,6 @@ probe_data_availability (void *cls, } - /** * Tries to read data from the stream. Should not be called when another read * handle is present; the existing read handle should be canceled with -- 2.25.1