From dfba021b14dc9ee9cb961e892f5ca62eb6e10281 Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Thu, 8 Mar 2012 16:30:42 +0000 Subject: [PATCH] peer interning --- src/stream/stream_api.c | 264 +++++++++++++++++++++++----------------- 1 file changed, 155 insertions(+), 109 deletions(-) diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 19a2d918d..4014f4957 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -19,10 +19,13 @@ */ /* TODO: - * Copy MESH handle from lsocket to socket * * Checks for matching the sender and socket->other_peer in server - * message handlers */ + * message handlers + * + * Decrement PEER intern count during socket close and listen close to free the + * memory used for PEER interning + **/ /** * @file stream/stream_api.c @@ -157,16 +160,6 @@ struct MessageQueue */ struct GNUNET_STREAM_Socket { - /** - * The peer identity of the peer at the other end of the stream - */ - struct GNUNET_PeerIdentity other_peer; - - /** - * Our Peer Identity (for debugging) - */ - struct GNUNET_PeerIdentity our_id; - /** * Retransmission timeout */ @@ -187,16 +180,6 @@ struct GNUNET_STREAM_Socket */ struct GNUNET_TIME_Relative ack_time_deadline; - /** - * The task for sending timely Acks - */ - GNUNET_SCHEDULER_TaskIdentifier ack_task_id; - - /** - * Task scheduled to continue a read operation. - */ - GNUNET_SCHEDULER_TaskIdentifier read_task; - /** * The mesh handle */ @@ -257,6 +240,16 @@ struct GNUNET_STREAM_Socket */ GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id; + /** + * The task for sending timely Acks + */ + GNUNET_SCHEDULER_TaskIdentifier ack_task_id; + + /** + * Task scheduled to continue a read operation. + */ + GNUNET_SCHEDULER_TaskIdentifier read_task_id; + /** * The state of the protocol associated with this socket */ @@ -277,6 +270,16 @@ struct GNUNET_STREAM_Socket */ unsigned int derived; + /** + * The peer identity of the peer at the other end of the stream + */ + GNUNET_PEER_Id other_peer; + + /** + * Our Peer Identity (for debugging) + */ + GNUNET_PEER_Id our_id; + /** * The application port number (type: uint32_t) */ @@ -336,12 +339,6 @@ struct GNUNET_STREAM_Socket */ struct GNUNET_STREAM_ListenSocket { - - /** - * Our Peer's identity - */ - struct GNUNET_PeerIdentity our_id; - /** * The mesh handle */ @@ -357,6 +354,11 @@ struct GNUNET_STREAM_ListenSocket */ void *listen_cb_cls; + /** + * Our interned Peer's identity + */ + GNUNET_PEER_Id our_id; + /** * The service port * FIXME: Remove if not required! @@ -427,7 +429,6 @@ struct GNUNET_STREAM_IOReadHandle */ static unsigned int default_timeout = 10; - /** * Callback function for sending hello message * @@ -440,6 +441,7 @@ static size_t send_message_notify (void *cls, size_t size, void *buf) { struct GNUNET_STREAM_Socket *socket = cls; + struct GNUNET_PeerIdentity target; struct MessageQueue *head; size_t ret; @@ -447,6 +449,7 @@ send_message_notify (void *cls, size_t size, void *buf) head = socket->queue_head; if (NULL == head) return 0; /* just to be safe */ + GNUNET_PEER_resolve (socket->other_peer, &target); if (0 == size) /* request timed out */ { socket->retries++; @@ -459,7 +462,7 @@ send_message_notify (void *cls, size_t size, void *buf) 1, /* Priority */ /* FIXME: exponential backoff */ socket->retransmit_timeout, - &socket->other_peer, + &target, ntohs (head->message->header.size), &send_message_notify, socket); @@ -488,7 +491,7 @@ send_message_notify (void *cls, size_t size, void *buf) 1, /* Priority */ /* FIXME: exponential backoff */ socket->retransmit_timeout, - &socket->other_peer, + &target, ntohs (head->message->header.size), &send_message_notify, socket); @@ -512,14 +515,15 @@ queue_message (struct GNUNET_STREAM_Socket *socket, void *finish_cb_cls) { struct MessageQueue *queue_entity; + struct GNUNET_PeerIdentity target; GNUNET_assert ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Queueing message of type %d and size %d\n", - GNUNET_i2s (&socket->our_id), + "%x: Queueing message of type %d and size %d\n", + socket->our_id, ntohs (message->header.type), ntohs (message->header.size)); GNUNET_assert (NULL != message); @@ -533,12 +537,13 @@ queue_message (struct GNUNET_STREAM_Socket *socket, if (NULL == socket->transmit_handle) { socket->retries = 0; + GNUNET_PEER_resolve (socket->other_peer, &target); socket->transmit_handle = GNUNET_MESH_notify_transmit_ready (socket->tunnel, 0, /* Corking */ 1, /* Priority */ socket->retransmit_timeout, - &socket->other_peer, + &target, ntohs (message->header.size), &send_message_notify, socket); @@ -623,7 +628,7 @@ retransmission_timeout_task (void *cls, return; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->our_id)); + "%x: Retransmitting DATA...\n", socket->our_id); socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; write_data (socket); } @@ -641,6 +646,7 @@ ack_task (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; struct GNUNET_STREAM_AckMessage *ack_msg; + struct GNUNET_PeerIdentity target; if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) { @@ -659,12 +665,13 @@ ack_task (void *cls, ack_msg->receive_window_remaining = htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); + GNUNET_PEER_resolve (socket->other_peer, &target); /* Request MESH for sending ACK */ GNUNET_MESH_notify_transmit_ready (socket->tunnel, 0, /* Corking */ 1, /* Priority */ socket->retransmit_timeout, - &socket->other_peer, + &target, ntohs (ack_msg->header.header.size), &send_ack_notify, ack_msg); @@ -756,9 +763,8 @@ write_data (struct GNUNET_STREAM_Socket *socket) packet)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Placing DATA message with sequence %u in send", - "queue\n", - GNUNET_i2s (&socket->our_id), + "%x: Placing DATA message with sequence %u in send queue\n", + socket->our_id, (unsigned int) io_handle->messages[packet]->sequence_number); @@ -776,9 +782,8 @@ write_data (struct GNUNET_STREAM_Socket *socket) socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Placing DATA message with sequence %u in send", - "queue\n", - GNUNET_i2s (&socket->our_id), + "%x: Placing DATA message with sequence %u in send queue\n", + socket->our_id, (unsigned int) io_handle->messages[packet]->sequence_number); copy_and_queue_message (socket, @@ -812,7 +817,7 @@ call_read_processor (void *cls, uint32_t sequence_increase; uint32_t offset_increase; - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; @@ -909,10 +914,10 @@ read_io_timeout (void *cls, struct GNUNET_STREAM_Socket *socket = cls; socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; - if (socket->read_task != GNUNET_SCHEDULER_NO_TASK) + if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { - GNUNET_SCHEDULER_cancel (socket->read_task); - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_SCHEDULER_cancel (socket->read_task_id); + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; } GNUNET_assert (NULL != socket->read_handle); @@ -952,6 +957,14 @@ handle_data (struct GNUNET_STREAM_Socket *socket, return GNUNET_SYSERR; } + if (GNUNET_PEER_search (sender) != socket->other_peer) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received DATA from non-confirming peer\n", + socket->our_id); + return GNUNET_YES; + } + switch (socket->state) { case STATE_ESTABLISHED: @@ -965,8 +978,8 @@ handle_data (struct GNUNET_STREAM_Socket *socket, if ( relative_sequence_number > 64) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Ignoring received message with sequence number %u\n", - GNUNET_i2s (&socket->our_id), + "%x: Ignoring received message with sequence number %u\n", + socket->our_id, ntohl (msg->sequence_number)); return GNUNET_YES; } @@ -987,7 +1000,9 @@ handle_data (struct GNUNET_STREAM_Socket *socket, else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Cannot accommodate packet %d as buffer is full\n", + "%x: Cannot accommodate packet %d as buffer is", + "full\n", + socket->our_id, ntohl (msg->sequence_number)); return GNUNET_YES; } @@ -1019,12 +1034,12 @@ handle_data (struct GNUNET_STREAM_Socket *socket, if ((NULL != socket->read_handle) /* A read handle is waiting */ /* There is no current read task */ - && (GNUNET_SCHEDULER_NO_TASK == socket->read_task) + && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id) /* We have the first packet */ && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0))) { - socket->read_task = + socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, socket); } @@ -1033,8 +1048,8 @@ handle_data (struct GNUNET_STREAM_Socket *socket, default: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Received data message when it cannot be handled\n", - GNUNET_i2s (&socket->our_id)); + "%x: Received data message when it cannot be handled\n", + socket->our_id); break; } return GNUNET_YES; @@ -1081,8 +1096,8 @@ set_state_established (void *cls, struct GNUNET_STREAM_Socket *socket) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Attaining ESTABLISHED state\n", - GNUNET_i2s (&socket->our_id)); + "%x: Attaining ESTABLISHED state\n", + socket->our_id); socket->write_offset = 0; socket->read_offset = 0; socket->state = STATE_ESTABLISHED; @@ -1103,8 +1118,8 @@ set_state_hello_wait (void *cls, { GNUNET_assert (STATE_INIT == socket->state); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Attaining HELLO_WAIT state\n", - GNUNET_i2s (&socket->our_id)); + "%x: Attaining HELLO_WAIT state\n", + socket->our_id); socket->state = STATE_HELLO_WAIT; } @@ -1133,13 +1148,18 @@ client_handle_hello_ack (void *cls, const struct GNUNET_STREAM_HelloAckMessage *ack_msg; struct GNUNET_STREAM_HelloAckMessage *reply; - GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK == - ntohs (message->type)); + if (GNUNET_PEER_search (sender) != socket->other_peer) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received HELLO_ACK from non-confirming peer\n", + socket->our_id); + return GNUNET_YES; + } ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Received HELLO_ACK from %s\n", - GNUNET_i2s (&socket->our_id), - GNUNET_i2s (sender)); + "%x: Received HELLO_ACK from %x\n", + socket->our_id, + socket->other_peer); GNUNET_assert (socket->tunnel == tunnel); switch (socket->state) @@ -1147,16 +1167,16 @@ client_handle_hello_ack (void *cls, case STATE_HELLO_WAIT: socket->read_sequence_number = ntohl (ack_msg->sequence_number); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Read sequence number %u\n", - GNUNET_i2s (&socket->our_id), + "%x: Read sequence number %u\n", + socket->our_id, (unsigned int) socket->read_sequence_number); socket->receive_window_available = ntohl (ack_msg->receive_window_size); /* Get the random sequence number */ socket->write_sequence_number = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Generated write sequence number %u\n", - GNUNET_i2s (&socket->our_id), + "%x: Generated write sequence number %u\n", + socket->our_id, (unsigned int) socket->write_sequence_number); reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); @@ -1178,8 +1198,9 @@ client_handle_hello_ack (void *cls, case STATE_INIT: default: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Server sent HELLO_ACK when in state %d\n", - GNUNET_i2s (&socket->our_id), + "%x: Server %x sent HELLO_ACK when in state %d\n", + socket->our_id, + socket->other_peer, socket->state); socket->state = STATE_CLOSED; // introduce STATE_ERROR? return GNUNET_SYSERR; @@ -1471,13 +1492,21 @@ server_handle_hello (void *cls, struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; struct GNUNET_STREAM_HelloAckMessage *reply; + if (GNUNET_PEER_search (sender) != socket->other_peer) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received HELLO from non-confirming peer\n", + socket->our_id); + return GNUNET_YES; + } + GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Received HELLO from %s\n", - GNUNET_i2s (&socket->our_id), - GNUNET_i2s(sender)); + "%x: Received HELLO from %x\n", + socket->our_id, + socket->other_peer); /* Catch possible protocol breaks */ GNUNET_break_op (0 == memcmp (&socket->other_peer, @@ -1490,8 +1519,8 @@ server_handle_hello (void *cls, socket->write_sequence_number = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Generated write sequence number %u\n", - GNUNET_i2s (&socket->our_id), + "%x: Generated write sequence number %u\n", + socket->our_id, (unsigned int) socket->write_sequence_number); reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); @@ -1547,8 +1576,8 @@ server_handle_hello_ack (void *cls, { socket->read_sequence_number = ntohl (ack_message->sequence_number); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Read sequence number %u\n", - GNUNET_i2s (&socket->our_id), + "%x: Read sequence number %u\n", + socket->our_id, (unsigned int) socket->read_sequence_number); socket->receive_window_available = ntohl (ack_message->receive_window_size); @@ -1773,13 +1802,22 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, unsigned int packet; int need_retransmission; + if (GNUNET_PEER_search (sender) != socket->other_peer) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Received ACK from non-confirming peer\n", + socket->our_id); + return GNUNET_YES; + } + switch (socket->state) { case (STATE_ESTABLISHED): if (NULL == socket->write_handle) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received DATA_ACK when write_handle is NULL\n"); + "%x: Received DATA_ACK when write_handle is NULL\n", + socket->our_id); return GNUNET_OK; } @@ -1787,9 +1825,9 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, - htonl (ack->base_sequence_number)) < 64)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Received DATA_ACK with unexpected base sequence", + "%x: Received DATA_ACK with unexpected base sequence", "number\n", - GNUNET_i2s (&socket->our_id)); + socket->our_id); return GNUNET_OK; } /* FIXME: include the case when write_handle is cancelled - ignore the @@ -1974,23 +2012,23 @@ mesh_peer_connect_callback (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; struct GNUNET_STREAM_MessageHeader *message; + GNUNET_PEER_Id connected_peer; - if (0 != memcmp (&socket->other_peer, - peer, - sizeof (struct GNUNET_PeerIdentity))) + connected_peer = GNUNET_PEER_search (peer); + + if (connected_peer != socket->other_peer) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: A peer (%s) which is not our target has connected", - "to our tunnel", - GNUNET_i2s (&socket->our_id), - GNUNET_i2s (peer)); + "%x: A peer which is not our target has connected", + "to our tunnel\n", + socket->our_id); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Target peer %s connected\n", - GNUNET_i2s (&(socket->our_id)), - GNUNET_i2s (&(socket->other_peer))); + "%x: Target peer %x connected\n", + socket->our_id, + connected_peer); /* Set state to INIT */ socket->state = STATE_INIT; @@ -2046,24 +2084,27 @@ new_tunnel_notify (void *cls, struct GNUNET_STREAM_ListenSocket *lsocket = cls; struct GNUNET_STREAM_Socket *socket; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Peer %s initiated tunnel to us\n", - GNUNET_i2s (&lsocket->our_id), - GNUNET_i2s (initiator)); + /* FIXME: If a tunnel is already created, we should not accept new tunnels + from the same peer again until the socket is closed */ socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); + socket->other_peer = GNUNET_PEER_intern (initiator); socket->tunnel = tunnel; socket->session_id = 0; /* FIXME */ - socket->other_peer = *initiator; socket->state = STATE_INIT; socket->derived = GNUNET_YES; socket->our_id = lsocket->our_id; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Peer %x initiated tunnel to us\n", + socket->our_id, + socket->other_peer); + /* FIXME: Copy MESH handle from lsocket to socket */ - + /* FIXME: What if listen_cb is NULL */ if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls, socket, - &socket->other_peer)) + initiator)) { socket->state = STATE_CLOSED; /* FIXME: Send CLOSE message and then free */ @@ -2094,11 +2135,12 @@ tunnel_cleaner (void *cls, struct GNUNET_STREAM_Socket *socket = tunnel_ctx; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Peer %s has terminated connection abruptly\n", - GNUNET_i2s (&socket->our_id), - GNUNET_i2s (&socket->other_peer)); + "%x: Peer %x has terminated connection abruptly\n", + socket->our_id, + socket->other_peer); socket->status = GNUNET_STREAM_SHUTDOWN; + /* Clear Transmit handles */ if (NULL != socket->transmit_handle) { @@ -2136,6 +2178,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, ...) { struct GNUNET_STREAM_Socket *socket; + struct GNUNET_PeerIdentity own_peer_id; enum GNUNET_STREAM_Option option; va_list vargs; /* Variable arguments */ @@ -2143,11 +2186,12 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, "%s\n", __func__); socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); - socket->other_peer = *target; + socket->other_peer = GNUNET_PEER_intern (target); socket->open_cb = open_cb; socket->open_cls = open_cb_cls; - GNUNET_TESTING_get_peer_identity (cfg, &socket->our_id); - + GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id); + socket->our_id = GNUNET_PEER_intern (&own_peer_id); + /* Set defaults */ socket->retransmit_timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout); @@ -2222,12 +2266,12 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) { struct MessageQueue *head; - if (socket->read_task != GNUNET_SCHEDULER_NO_TASK) + if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { /* socket closed with read task pending!? */ GNUNET_break (0); - GNUNET_SCHEDULER_cancel (socket->read_task); - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_SCHEDULER_cancel (socket->read_task_id); + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; } /* Clear Transmit handles */ @@ -2288,12 +2332,14 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, { /* FIXME: Add variable args for passing configration options? */ struct GNUNET_STREAM_ListenSocket *lsocket; + struct GNUNET_PeerIdentity our_peer_id; lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket)); lsocket->port = app_port; lsocket->listen_cb = listen_cb; lsocket->listen_cb_cls = listen_cb_cls; - GNUNET_TESTING_get_peer_identity (cfg, &lsocket->our_id); + GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id); + lsocket->our_id = GNUNET_PEER_intern (&our_peer_id); lsocket->mesh = GNUNET_MESH_connect (cfg, 10, /* FIXME: QUEUE size as parameter? */ lsocket, /* Closure */ @@ -2364,9 +2410,9 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, || (STATE_RECEIVE_CLOSED == socket->state))) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%s: Attempting to write on a closed (OR) not-yet-established" + "%x: Attempting to write on a closed (OR) not-yet-established" "stream\n", - GNUNET_i2s (&socket->our_id)); + socket->our_id); return NULL; } if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) @@ -2453,8 +2499,8 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, 0)) { - socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor, - socket); + socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, + socket); } -- 2.25.1