*/
/* TODO:
- * Copy MESH handle from lsocket to socket
*
* Checks for matching the sender and socket->other_peer in server
- * message handlers */
+ * message handlers
+ *
+ * Decrement PEER intern count during socket close and listen close to free the
+ * memory used for PEER interning
+ **/
/**
* @file stream/stream_api.c
*/
struct GNUNET_STREAM_Socket
{
- /**
- * The peer identity of the peer at the other end of the stream
- */
- struct GNUNET_PeerIdentity other_peer;
-
- /**
- * Our Peer Identity (for debugging)
- */
- struct GNUNET_PeerIdentity our_id;
-
/**
* Retransmission timeout
*/
*/
struct GNUNET_TIME_Relative ack_time_deadline;
- /**
- * The task for sending timely Acks
- */
- GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
-
- /**
- * Task scheduled to continue a read operation.
- */
- GNUNET_SCHEDULER_TaskIdentifier read_task;
-
/**
* The mesh handle
*/
*/
GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
+ /**
+ * The task for sending timely Acks
+ */
+ GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
+
+ /**
+ * Task scheduled to continue a read operation.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_task_id;
+
/**
* The state of the protocol associated with this socket
*/
*/
unsigned int derived;
+ /**
+ * The peer identity of the peer at the other end of the stream
+ */
+ GNUNET_PEER_Id other_peer;
+
+ /**
+ * Our Peer Identity (for debugging)
+ */
+ GNUNET_PEER_Id our_id;
+
/**
* The application port number (type: uint32_t)
*/
*/
struct GNUNET_STREAM_ListenSocket
{
-
- /**
- * Our Peer's identity
- */
- struct GNUNET_PeerIdentity our_id;
-
/**
* The mesh handle
*/
*/
void *listen_cb_cls;
+ /**
+ * Our interned Peer's identity
+ */
+ GNUNET_PEER_Id our_id;
+
/**
* The service port
* FIXME: Remove if not required!
*/
static unsigned int default_timeout = 10;
-
/**
* Callback function for sending hello message
*
send_message_notify (void *cls, size_t size, void *buf)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_PeerIdentity target;
struct MessageQueue *head;
size_t ret;
head = socket->queue_head;
if (NULL == head)
return 0; /* just to be safe */
+ GNUNET_PEER_resolve (socket->other_peer, &target);
if (0 == size) /* request timed out */
{
socket->retries++;
1, /* Priority */
/* FIXME: exponential backoff */
socket->retransmit_timeout,
- &socket->other_peer,
+ &target,
ntohs (head->message->header.size),
&send_message_notify,
socket);
1, /* Priority */
/* FIXME: exponential backoff */
socket->retransmit_timeout,
- &socket->other_peer,
+ &target,
ntohs (head->message->header.size),
&send_message_notify,
socket);
void *finish_cb_cls)
{
struct MessageQueue *queue_entity;
+ struct GNUNET_PeerIdentity target;
GNUNET_assert
((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
&& (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Queueing message of type %d and size %d\n",
- GNUNET_i2s (&socket->our_id),
+ "%x: Queueing message of type %d and size %d\n",
+ socket->our_id,
ntohs (message->header.type),
ntohs (message->header.size));
GNUNET_assert (NULL != message);
if (NULL == socket->transmit_handle)
{
socket->retries = 0;
+ GNUNET_PEER_resolve (socket->other_peer, &target);
socket->transmit_handle =
GNUNET_MESH_notify_transmit_ready (socket->tunnel,
0, /* Corking */
1, /* Priority */
socket->retransmit_timeout,
- &socket->other_peer,
+ &target,
ntohs (message->header.size),
&send_message_notify,
socket);
return;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->our_id));
+ "%x: Retransmitting DATA...\n", socket->our_id);
socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
write_data (socket);
}
{
struct GNUNET_STREAM_Socket *socket = cls;
struct GNUNET_STREAM_AckMessage *ack_msg;
+ struct GNUNET_PeerIdentity target;
if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
{
ack_msg->receive_window_remaining =
htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
+ GNUNET_PEER_resolve (socket->other_peer, &target);
/* Request MESH for sending ACK */
GNUNET_MESH_notify_transmit_ready (socket->tunnel,
0, /* Corking */
1, /* Priority */
socket->retransmit_timeout,
- &socket->other_peer,
+ &target,
ntohs (ack_msg->header.header.size),
&send_ack_notify,
ack_msg);
packet))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Placing DATA message with sequence %u in send",
- "queue\n",
- GNUNET_i2s (&socket->our_id),
+ "%x: Placing DATA message with sequence %u in send queue\n",
+ socket->our_id,
(unsigned int)
io_handle->messages[packet]->sequence_number);
socket->receive_window_available -=
ntohs (io_handle->messages[packet]->header.header.size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Placing DATA message with sequence %u in send",
- "queue\n",
- GNUNET_i2s (&socket->our_id),
+ "%x: Placing DATA message with sequence %u in send queue\n",
+ socket->our_id,
(unsigned int)
io_handle->messages[packet]->sequence_number);
copy_and_queue_message (socket,
uint32_t sequence_increase;
uint32_t offset_increase;
- socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
struct GNUNET_STREAM_Socket *socket = cls;
socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
- if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
+ if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
- GNUNET_SCHEDULER_cancel (socket->read_task);
- socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (socket->read_task_id);
+ socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
}
GNUNET_assert (NULL != socket->read_handle);
return GNUNET_SYSERR;
}
+ if (GNUNET_PEER_search (sender) != socket->other_peer)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Received DATA from non-confirming peer\n",
+ socket->our_id);
+ return GNUNET_YES;
+ }
+
switch (socket->state)
{
case STATE_ESTABLISHED:
if ( relative_sequence_number > 64)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Ignoring received message with sequence number %u\n",
- GNUNET_i2s (&socket->our_id),
+ "%x: Ignoring received message with sequence number %u\n",
+ socket->our_id,
ntohl (msg->sequence_number));
return GNUNET_YES;
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cannot accommodate packet %d as buffer is full\n",
+ "%x: Cannot accommodate packet %d as buffer is",
+ "full\n",
+ socket->our_id,
ntohl (msg->sequence_number));
return GNUNET_YES;
}
if ((NULL != socket->read_handle) /* A read handle is waiting */
/* There is no current read task */
- && (GNUNET_SCHEDULER_NO_TASK == socket->read_task)
+ && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
/* We have the first packet */
&& (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
0)))
{
- socket->read_task =
+ socket->read_task_id =
GNUNET_SCHEDULER_add_now (&call_read_processor,
socket);
}
default:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received data message when it cannot be handled\n",
- GNUNET_i2s (&socket->our_id));
+ "%x: Received data message when it cannot be handled\n",
+ socket->our_id);
break;
}
return GNUNET_YES;
struct GNUNET_STREAM_Socket *socket)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Attaining ESTABLISHED state\n",
- GNUNET_i2s (&socket->our_id));
+ "%x: Attaining ESTABLISHED state\n",
+ socket->our_id);
socket->write_offset = 0;
socket->read_offset = 0;
socket->state = STATE_ESTABLISHED;
{
GNUNET_assert (STATE_INIT == socket->state);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Attaining HELLO_WAIT state\n",
- GNUNET_i2s (&socket->our_id));
+ "%x: Attaining HELLO_WAIT state\n",
+ socket->our_id);
socket->state = STATE_HELLO_WAIT;
}
const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
struct GNUNET_STREAM_HelloAckMessage *reply;
- GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
- ntohs (message->type));
+ if (GNUNET_PEER_search (sender) != socket->other_peer)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Received HELLO_ACK from non-confirming peer\n",
+ socket->our_id);
+ return GNUNET_YES;
+ }
ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received HELLO_ACK from %s\n",
- GNUNET_i2s (&socket->our_id),
- GNUNET_i2s (sender));
+ "%x: Received HELLO_ACK from %x\n",
+ socket->our_id,
+ socket->other_peer);
GNUNET_assert (socket->tunnel == tunnel);
switch (socket->state)
case STATE_HELLO_WAIT:
socket->read_sequence_number = ntohl (ack_msg->sequence_number);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Read sequence number %u\n",
- GNUNET_i2s (&socket->our_id),
+ "%x: Read sequence number %u\n",
+ socket->our_id,
(unsigned int) socket->read_sequence_number);
socket->receive_window_available = ntohl (ack_msg->receive_window_size);
/* Get the random sequence number */
socket->write_sequence_number =
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Generated write sequence number %u\n",
- GNUNET_i2s (&socket->our_id),
+ "%x: Generated write sequence number %u\n",
+ socket->our_id,
(unsigned int) socket->write_sequence_number);
reply =
GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
case STATE_INIT:
default:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Server sent HELLO_ACK when in state %d\n",
- GNUNET_i2s (&socket->our_id),
+ "%x: Server %x sent HELLO_ACK when in state %d\n",
+ socket->our_id,
+ socket->other_peer,
socket->state);
socket->state = STATE_CLOSED; // introduce STATE_ERROR?
return GNUNET_SYSERR;
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
struct GNUNET_STREAM_HelloAckMessage *reply;
+ if (GNUNET_PEER_search (sender) != socket->other_peer)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Received HELLO from non-confirming peer\n",
+ socket->our_id);
+ return GNUNET_YES;
+ }
+
GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO ==
ntohs (message->type));
GNUNET_assert (socket->tunnel == tunnel);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received HELLO from %s\n",
- GNUNET_i2s (&socket->our_id),
- GNUNET_i2s(sender));
+ "%x: Received HELLO from %x\n",
+ socket->our_id,
+ socket->other_peer);
/* Catch possible protocol breaks */
GNUNET_break_op (0 == memcmp (&socket->other_peer,
socket->write_sequence_number =
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Generated write sequence number %u\n",
- GNUNET_i2s (&socket->our_id),
+ "%x: Generated write sequence number %u\n",
+ socket->our_id,
(unsigned int) socket->write_sequence_number);
reply =
GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
{
socket->read_sequence_number = ntohl (ack_message->sequence_number);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Read sequence number %u\n",
- GNUNET_i2s (&socket->our_id),
+ "%x: Read sequence number %u\n",
+ socket->our_id,
(unsigned int) socket->read_sequence_number);
socket->receive_window_available =
ntohl (ack_message->receive_window_size);
unsigned int packet;
int need_retransmission;
+ if (GNUNET_PEER_search (sender) != socket->other_peer)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Received ACK from non-confirming peer\n",
+ socket->our_id);
+ return GNUNET_YES;
+ }
+
switch (socket->state)
{
case (STATE_ESTABLISHED):
if (NULL == socket->write_handle)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received DATA_ACK when write_handle is NULL\n");
+ "%x: Received DATA_ACK when write_handle is NULL\n",
+ socket->our_id);
return GNUNET_OK;
}
- htonl (ack->base_sequence_number)) < 64))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received DATA_ACK with unexpected base sequence",
+ "%x: Received DATA_ACK with unexpected base sequence",
"number\n",
- GNUNET_i2s (&socket->our_id));
+ socket->our_id);
return GNUNET_OK;
}
/* FIXME: include the case when write_handle is cancelled - ignore the
{
struct GNUNET_STREAM_Socket *socket = cls;
struct GNUNET_STREAM_MessageHeader *message;
+ GNUNET_PEER_Id connected_peer;
- if (0 != memcmp (&socket->other_peer,
- peer,
- sizeof (struct GNUNET_PeerIdentity)))
+ connected_peer = GNUNET_PEER_search (peer);
+
+ if (connected_peer != socket->other_peer)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: A peer (%s) which is not our target has connected",
- "to our tunnel",
- GNUNET_i2s (&socket->our_id),
- GNUNET_i2s (peer));
+ "%x: A peer which is not our target has connected",
+ "to our tunnel\n",
+ socket->our_id);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Target peer %s connected\n",
- GNUNET_i2s (&(socket->our_id)),
- GNUNET_i2s (&(socket->other_peer)));
+ "%x: Target peer %x connected\n",
+ socket->our_id,
+ connected_peer);
/* Set state to INIT */
socket->state = STATE_INIT;
struct GNUNET_STREAM_ListenSocket *lsocket = cls;
struct GNUNET_STREAM_Socket *socket;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Peer %s initiated tunnel to us\n",
- GNUNET_i2s (&lsocket->our_id),
- GNUNET_i2s (initiator));
+ /* FIXME: If a tunnel is already created, we should not accept new tunnels
+ from the same peer again until the socket is closed */
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
+ socket->other_peer = GNUNET_PEER_intern (initiator);
socket->tunnel = tunnel;
socket->session_id = 0; /* FIXME */
- socket->other_peer = *initiator;
socket->state = STATE_INIT;
socket->derived = GNUNET_YES;
socket->our_id = lsocket->our_id;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Peer %x initiated tunnel to us\n",
+ socket->our_id,
+ socket->other_peer);
+
/* FIXME: Copy MESH handle from lsocket to socket */
-
+ /* FIXME: What if listen_cb is NULL */
if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
socket,
- &socket->other_peer))
+ initiator))
{
socket->state = STATE_CLOSED;
/* FIXME: Send CLOSE message and then free */
struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Peer %s has terminated connection abruptly\n",
- GNUNET_i2s (&socket->our_id),
- GNUNET_i2s (&socket->other_peer));
+ "%x: Peer %x has terminated connection abruptly\n",
+ socket->our_id,
+ socket->other_peer);
socket->status = GNUNET_STREAM_SHUTDOWN;
+
/* Clear Transmit handles */
if (NULL != socket->transmit_handle)
{
...)
{
struct GNUNET_STREAM_Socket *socket;
+ struct GNUNET_PeerIdentity own_peer_id;
enum GNUNET_STREAM_Option option;
va_list vargs; /* Variable arguments */
"%s\n", __func__);
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
- socket->other_peer = *target;
+ socket->other_peer = GNUNET_PEER_intern (target);
socket->open_cb = open_cb;
socket->open_cls = open_cb_cls;
- GNUNET_TESTING_get_peer_identity (cfg, &socket->our_id);
-
+ GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
+ socket->our_id = GNUNET_PEER_intern (&own_peer_id);
+
/* Set defaults */
socket->retransmit_timeout =
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
{
struct MessageQueue *head;
- if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
+ if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
/* socket closed with read task pending!? */
GNUNET_break (0);
- GNUNET_SCHEDULER_cancel (socket->read_task);
- socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (socket->read_task_id);
+ socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
}
/* Clear Transmit handles */
{
/* FIXME: Add variable args for passing configration options? */
struct GNUNET_STREAM_ListenSocket *lsocket;
+ struct GNUNET_PeerIdentity our_peer_id;
lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
lsocket->port = app_port;
lsocket->listen_cb = listen_cb;
lsocket->listen_cb_cls = listen_cb_cls;
- GNUNET_TESTING_get_peer_identity (cfg, &lsocket->our_id);
+ GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
+ lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
lsocket->mesh = GNUNET_MESH_connect (cfg,
10, /* FIXME: QUEUE size as parameter? */
lsocket, /* Closure */
|| (STATE_RECEIVE_CLOSED == socket->state)))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%s: Attempting to write on a closed (OR) not-yet-established"
+ "%x: Attempting to write on a closed (OR) not-yet-established"
"stream\n",
- GNUNET_i2s (&socket->our_id));
+ socket->our_id);
return NULL;
}
if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
0))
{
- socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor,
- socket);
+ socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
+ socket);
}