remvod verbose debugging in stream api; fixed warning in stream_big test case
[oweals/gnunet.git] / src / stream / stream_api.c
index e8ba24966ccf21a7218d77416743eaf92805f35e..1dc1ba80e6c7ee959587aa9d2370ab238c4c0a4b 100644 (file)
@@ -23,9 +23,6 @@
  * Checks for matching the sender and socket->other_peer in server
  * message handlers  
  *
- * Decrement PEER intern count during socket close and listen close to free the
- * memory used for PEER interning
- *
  * Add code for write io timeout
  *
  * Include retransmission for control messages
@@ -42,7 +39,6 @@
 #include "gnunet_common.h"
 #include "gnunet_crypto_lib.h"
 #include "gnunet_stream_lib.h"
-#include "gnunet_testing_lib.h"
 #include "stream_protocol.h"
 
 #define LOG(kind,...)                                   \
@@ -51,7 +47,7 @@
 /**
  * The maximum packet size of a stream packet
  */
-#define MAX_PACKET_SIZE 64000
+#define MAX_PACKET_SIZE 512//64000
 
 /**
  * Receive buffer
@@ -61,7 +57,7 @@
 /**
  * The maximum payload a data message packet can carry
  */
-static size_t max_payload_size = 
+static const size_t max_payload_size = 
   MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
 
 /**
@@ -259,6 +255,11 @@ struct GNUNET_STREAM_Socket
    */
   struct GNUNET_STREAM_ListenSocket *lsocket;
 
+  /**
+   * The peer identity of the peer at the other end of the stream
+   */
+  struct GNUNET_PeerIdentity other_peer;
+
   /**
    * Task identifier for the read io timeout task
    */
@@ -294,11 +295,6 @@ struct GNUNET_STREAM_Socket
    */
   unsigned int retries;
 
-  /**
-   * The peer identity of the peer at the other end of the stream
-   */
-  GNUNET_PEER_Id other_peer;
-
   /**
    * The application port number (type: uint32_t)
    */
@@ -486,7 +482,6 @@ static size_t
 send_message_notify (void *cls, size_t size, void *buf)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
-  struct GNUNET_PeerIdentity target;
   struct MessageQueue *head;
   size_t ret;
 
@@ -494,12 +489,12 @@ send_message_notify (void *cls, size_t size, void *buf)
   head = socket->queue_head;
   if (NULL == head)
     return 0; /* just to be safe */
-  GNUNET_PEER_resolve (socket->other_peer, &target);
   if (0 == size)                /* request timed out */
   {
     socket->retries++;
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Message sending timed out. Retry %d \n",
+         "%s: Message sending timed out. Retry %d \n",
+         GNUNET_i2s (&socket->other_peer),
          socket->retries);
     socket->transmit_handle = 
       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
@@ -507,7 +502,7 @@ send_message_notify (void *cls, size_t size, void *buf)
                                          1, /* Priority */
                                          /* FIXME: exponential backoff */
                                          socket->retransmit_timeout,
-                                         &target,
+                                         &socket->other_peer,
                                          ntohs (head->message->header.size),
                                          &send_message_notify,
                                          socket);
@@ -536,7 +531,7 @@ send_message_notify (void *cls, size_t size, void *buf)
                                          1, /* Priority */
                                          /* FIXME: exponential backoff */
                                          socket->retransmit_timeout,
-                                         &target,
+                                         &socket->other_peer,
                                          ntohs (head->message->header.size),
                                          &send_message_notify,
                                          socket);
@@ -560,14 +555,14 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
                void *finish_cb_cls)
 {
   struct MessageQueue *queue_entity;
-  struct GNUNET_PeerIdentity target;
 
   GNUNET_assert 
     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Queueing message of type %d and size %d\n",
+       "%s: Queueing message of type %d and size %d\n",
+       GNUNET_i2s (&socket->other_peer),
        ntohs (message->header.type),
        ntohs (message->header.size));
   GNUNET_assert (NULL != message);
@@ -581,13 +576,12 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
   if (NULL == socket->transmit_handle)
   {
     socket->retries = 0;
-    GNUNET_PEER_resolve (socket->other_peer, &target);
     socket->transmit_handle = 
       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
                                         0, /* Corking */
                                         1, /* Priority */
                                         socket->retransmit_timeout,
-                                        &target,
+                                        &socket->other_peer,
                                         ntohs (message->header.size),
                                         &send_message_notify,
                                         socket);
@@ -676,7 +670,7 @@ retransmission_timeout_task (void *cls,
     return;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Retransmitting DATA...\n");
+       "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
   write_data (socket);
 }
@@ -694,15 +688,12 @@ ack_task (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
   struct GNUNET_STREAM_AckMessage *ack_msg;
-  struct GNUNET_PeerIdentity target;
 
   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
   {
     return;
   }
-
   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
-
   /* Create the ACK Message */
   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
   ack_msg->header.header.size = htons (sizeof (struct 
@@ -712,16 +703,14 @@ ack_task (void *cls,
   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
   ack_msg->receive_window_remaining = 
     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
-
   socket->ack_msg = ack_msg;
-  GNUNET_PEER_resolve (socket->other_peer, &target);
   /* Request MESH for sending ACK */
   socket->ack_transmit_handle = 
     GNUNET_MESH_notify_transmit_ready (socket->tunnel,
                                        0, /* Corking */
                                        1, /* Priority */
                                        socket->retransmit_timeout,
-                                       &target,
+                                       &socket->other_peer,
                                        ntohs (ack_msg->header.header.size),
                                        &send_ack_notify,
                                        socket);
@@ -838,9 +827,9 @@ write_data (struct GNUNET_STREAM_Socket *socket)
                                            packet))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Placing DATA message with sequence %u in send queue\n",
+           "%s: Placing DATA message with sequence %u in send queue\n",
+           GNUNET_i2s (&socket->other_peer),
            ntohl (io_handle->messages[packet]->sequence_number));
-
       copy_and_queue_message (socket,
                               &io_handle->messages[packet]->header,
                               NULL,
@@ -851,12 +840,14 @@ write_data (struct GNUNET_STREAM_Socket *socket)
   /* Now send new packets if there is enough buffer space */
   while ( (NULL != io_handle->messages[packet]) &&
          (socket->receiver_window_available 
-           >= ntohs (io_handle->messages[packet]->header.header.size)) )
+           >= ntohs (io_handle->messages[packet]->header.header.size)) &&
+          (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
   {
     socket->receiver_window_available -= 
       ntohs (io_handle->messages[packet]->header.header.size);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Placing DATA message with sequence %u in send queue\n",
+         "%s: Placing DATA message with sequence %u in send queue\n",
+         GNUNET_i2s (&socket->other_peer),
          ntohl (io_handle->messages[packet]->sequence_number));
     copy_and_queue_message (socket,
                             &io_handle->messages[packet]->header,
@@ -864,7 +855,6 @@ write_data (struct GNUNET_STREAM_Socket *socket)
                             NULL);
     packet++;
   }
-
   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
     socket->retransmission_timeout_task_id = 
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
@@ -910,37 +900,32 @@ call_read_processor (void *cls,
   }
   /* We only call read processor if we have the first packet */
   GNUNET_assert (0 < packet);
-
   valid_read_size = 
     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
-
   GNUNET_assert (0 != valid_read_size);
-
   /* Cancel the read_io_timeout_task */
   GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
   socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
-
   /* Call the data processor */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Calling read processor\n");
+       "%s: Calling read processor\n",
+       GNUNET_i2s (&socket->other_peer));
   read_size = 
     socket->read_handle->proc (socket->read_handle->proc_cls,
                                socket->status,
                                socket->receive_buffer + socket->copy_offset,
                                valid_read_size);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Read processor read %d bytes\n",
-       read_size);
+       "%s: Read processor read %d bytes\n",
+       GNUNET_i2s (&socket->other_peer), read_size);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Read processor completed successfully\n");
-
+       "%s: Read processor completed successfully\n",
+       GNUNET_i2s (&socket->other_peer));
   /* Free the read handle */
   GNUNET_free (socket->read_handle);
   socket->read_handle = NULL;
-
   GNUNET_assert (read_size <= valid_read_size);
   socket->copy_offset += read_size;
-
   /* Determine upto which packet we can remove from the buffer */
   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
   {
@@ -952,41 +937,45 @@ call_read_processor (void *cls,
 
   /* If no packets can be removed we can't move the buffer */
   if (0 == packet) return;
-
   sequence_increase = packet;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Sequence increase after read processor completion: %u\n",
-       sequence_increase);
+       "%s: Sequence increase after read processor completion: %u\n",
+       GNUNET_i2s (&socket->other_peer), sequence_increase);
 
   /* Shift the data in the receive buffer */
-  memmove (socket->receive_buffer,
-           socket->receive_buffer 
-           + socket->receive_buffer_boundaries[sequence_increase-1],
-           socket->receive_buffer_size
-           - socket->receive_buffer_boundaries[sequence_increase-1]);
-  
+  socket->receive_buffer = 
+    memmove (socket->receive_buffer,
+            socket->receive_buffer 
+            + socket->receive_buffer_boundaries[sequence_increase-1],
+            socket->receive_buffer_size
+            - socket->receive_buffer_boundaries[sequence_increase-1]);
   /* Shift the bitmap */
   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
-  
   /* Set read_sequence_number */
   socket->read_sequence_number += sequence_increase;
-  
   /* Set read_offset */
   offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
   socket->read_offset += offset_increase;
-
   /* Fix copy_offset */
   GNUNET_assert (offset_increase <= socket->copy_offset);
   socket->copy_offset -= offset_increase;
-  
   /* Fix relative boundaries */
   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
   {
     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
     {
-      socket->receive_buffer_boundaries[packet] = 
-        socket->receive_buffer_boundaries[packet + sequence_increase] 
-        - offset_increase;
+      uint32_t ahead_buffer_boundary;
+
+      ahead_buffer_boundary = 
+       socket->receive_buffer_boundaries[packet + sequence_increase];
+      if (0 == ahead_buffer_boundary)
+       socket->receive_buffer_boundaries[packet] = 0;
+      else
+      {
+       GNUNET_assert (offset_increase < ahead_buffer_boundary);
+       socket->receive_buffer_boundaries[packet] = 
+         ahead_buffer_boundary - offset_increase;
+      }
     }
     else
       socket->receive_buffer_boundaries[packet] = 0;
@@ -1012,7 +1001,8 @@ read_io_timeout (void *cls,
   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Read task timedout - Cancelling it\n");
+         "%s: Read task timedout - Cancelling it\n",
+         GNUNET_i2s (&socket->other_peer));
     GNUNET_SCHEDULER_cancel (socket->read_task_id);
     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
@@ -1061,10 +1051,13 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
     return GNUNET_SYSERR;
   }
 
-  if (GNUNET_PEER_search (sender) != socket->other_peer)
+  if (0 != memcmp (sender,
+                   &socket->other_peer,
+                   sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received DATA from non-confirming peer\n");
+         "%s: Received DATA from non-confirming peer\n",
+         GNUNET_i2s (&socket->other_peer));
     return GNUNET_YES;
   }
 
@@ -1081,7 +1074,8 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
     if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Ignoring received message with sequence number %u\n",
+           "%s: Ignoring received message with sequence number %u\n",
+           GNUNET_i2s (&socket->other_peer),
            ntohl (msg->sequence_number));
       /* Start ACK sending task if one is not already present */
       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
@@ -1100,8 +1094,8 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
                                             relative_sequence_number))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Ignoring already received message with sequence "
-           "number %u\n",
+           "%s: Ignoring already received message with sequence number %u\n",
+           GNUNET_i2s (&socket->other_peer),
            ntohl (msg->sequence_number));
       /* Start ACK sending task if one is not already present */
       if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
@@ -1116,11 +1110,12 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
     }
 
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Receiving DATA with sequence number: %u and size: %d from %x\n",
+         "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
+         GNUNET_i2s (&socket->other_peer),
          ntohl (msg->sequence_number),
          ntohs (msg->header.header.size),
-         socket->other_peer);
-
+         GNUNET_i2s (&socket->other_peer));
+      
     /* Check if we have to allocate the buffer */
     size -= sizeof (struct GNUNET_STREAM_DataMessage);
     relative_offset = ntohl (msg->offset) - socket->read_offset;
@@ -1136,7 +1131,8 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
       else
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
-             "Cannot accommodate packet %d as buffer is full\n",
+             "%s: Cannot accommodate packet %d as buffer is full\n",
+             GNUNET_i2s (&socket->other_peer),
              ntohl (msg->sequence_number));
         return GNUNET_YES;
       }
@@ -1174,8 +1170,9 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
                                                0)))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Scheduling read processor\n");
-
+           "%s: Scheduling read processor\n",
+           GNUNET_i2s (&socket->other_peer));
+          
       socket->read_task_id = 
         GNUNET_SCHEDULER_add_now (&call_read_processor,
                                   socket);
@@ -1185,7 +1182,8 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
 
   default:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received data message when it cannot be handled\n");
+         "%s: Received data message when it cannot be handled\n",
+         GNUNET_i2s (&socket->other_peer));
     break;
   }
   return GNUNET_YES;
@@ -1232,23 +1230,22 @@ static void
 set_state_established (void *cls,
                        struct GNUNET_STREAM_Socket *socket)
 {
-  struct GNUNET_PeerIdentity initiator_pid;
-
   LOG (GNUNET_ERROR_TYPE_DEBUG, 
-       "Attaining ESTABLISHED state\n");
+       "%s: Attaining ESTABLISHED state\n",
+       GNUNET_i2s (&socket->other_peer));
   socket->write_offset = 0;
   socket->read_offset = 0;
   socket->state = STATE_ESTABLISHED;
   /* FIXME: What if listen_cb is NULL */
   if (NULL != socket->lsocket)
   {
-    GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Calling listen callback\n");
+         "%s: Calling listen callback\n",
+         GNUNET_i2s (&socket->other_peer));
     if (GNUNET_SYSERR == 
         socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
                                     socket,
-                                    &initiator_pid))
+                                    &socket->other_peer))
     {
       socket->state = STATE_CLOSED;
       /* FIXME: We should close in a decent way */
@@ -1273,7 +1270,8 @@ set_state_hello_wait (void *cls,
 {
   GNUNET_assert (STATE_INIT == socket->state);
   LOG (GNUNET_ERROR_TYPE_DEBUG, 
-       "Attaining HELLO_WAIT state\n");
+       "%s: Attaining HELLO_WAIT state\n",
+       GNUNET_i2s (&socket->other_peer));
   socket->state = STATE_HELLO_WAIT;
 }
 
@@ -1289,7 +1287,8 @@ set_state_close_wait (void *cls,
                       struct GNUNET_STREAM_Socket *socket)
 {
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Attaing CLOSE_WAIT state\n");
+       "%s: Attaing CLOSE_WAIT state\n",
+       GNUNET_i2s (&socket->other_peer));
   socket->state = STATE_CLOSE_WAIT;
   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
   socket->receive_buffer = NULL;
@@ -1308,7 +1307,8 @@ set_state_receive_close_wait (void *cls,
                               struct GNUNET_STREAM_Socket *socket)
 {
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Attaing RECEIVE_CLOSE_WAIT state\n");
+       "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
+       GNUNET_i2s (&socket->other_peer));
   socket->state = STATE_RECEIVE_CLOSE_WAIT;
   GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
   socket->receive_buffer = NULL;
@@ -1327,7 +1327,8 @@ set_state_transmit_close_wait (void *cls,
                                struct GNUNET_STREAM_Socket *socket)
 {
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Attaining TRANSMIT_CLOSE_WAIT state\n");
+       "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
+       GNUNET_i2s (&socket->other_peer));
   socket->state = STATE_TRANSMIT_CLOSE_WAIT;
 }
 
@@ -1361,7 +1362,8 @@ generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
   socket->write_sequence_number = 
     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Generated write sequence number %u\n",
+       "%s: Generated write sequence number %u\n",
+       GNUNET_i2s (&socket->other_peer),
        (unsigned int) socket->write_sequence_number);
   
   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
@@ -1399,16 +1401,20 @@ client_handle_hello_ack (void *cls,
   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
   struct GNUNET_STREAM_HelloAckMessage *reply;
 
-  if (GNUNET_PEER_search (sender) != socket->other_peer)
+  if (0 != memcmp (sender,
+                   &socket->other_peer,
+                   sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received HELLO_ACK from non-confirming peer\n");
+         "%s: Received HELLO_ACK from non-confirming peer\n",
+         GNUNET_i2s (&socket->other_peer));
     return GNUNET_YES;
   }
   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received HELLO_ACK from %x\n",
-       socket->other_peer);
+       "%s: Received HELLO_ACK from %s\n",
+       GNUNET_i2s (&socket->other_peer),
+       GNUNET_i2s (&socket->other_peer));
 
   GNUNET_assert (socket->tunnel == tunnel);
   switch (socket->state)
@@ -1416,7 +1422,8 @@ client_handle_hello_ack (void *cls,
   case STATE_HELLO_WAIT:
     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Read sequence number %u\n",
+         "%s: Read sequence number %u\n",
+         GNUNET_i2s (&socket->other_peer),
          (unsigned int) socket->read_sequence_number);
     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
     reply = generate_hello_ack_msg (socket);
@@ -1432,8 +1439,9 @@ client_handle_hello_ack (void *cls,
   case STATE_INIT:
   default:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Server %x sent HELLO_ACK when in state %d\n", 
-         socket->other_peer,
+         "%s: Server %s sent HELLO_ACK when in state %d\n", 
+         GNUNET_i2s (&socket->other_peer),
+         GNUNET_i2s (&socket->other_peer),
          socket->state);
     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
     return GNUNET_SYSERR;
@@ -1565,7 +1573,8 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
   if (NULL == shutdown_handle)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received *CLOSE_ACK when shutdown handle is NULL\n");
+         "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
+         GNUNET_i2s (&socket->other_peer));
     return GNUNET_OK;
   }
 
@@ -1578,18 +1587,22 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
       if (SHUT_RDWR != shutdown_handle->operation)
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
-             "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n");
+             "%s: Received CLOSE_ACK when shutdown handle is not for "
+             "SHUT_RDWR\n",
+             GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
 
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Received CLOSE_ACK from %x\n",
-           socket->other_peer);
+           "%s: Received CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer),
+           GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_CLOSED;
       break;
     default:
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Received CLOSE_ACK when in it not expected\n");
+           "%s: Received CLOSE_ACK when in it not expected\n",
+           GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
     break;
@@ -1601,18 +1614,22 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
       if (SHUT_RD != shutdown_handle->operation)
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
-             "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n");
+             "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
+             "is not for SHUT_RD\n",
+             GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
 
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Received RECEIVE_CLOSE_ACK from %x\n",
-           socket->other_peer);
+           "%s: Received RECEIVE_CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer),
+           GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_RECEIVE_CLOSED;
       break;
     default:
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Received RECEIVE_CLOSE_ACK when in it not expected\n");
+           "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
+           GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
 
@@ -1624,18 +1641,22 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
       if (SHUT_WR != shutdown_handle->operation)
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
-             "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n");
+             "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
+             "is not for SHUT_WR\n",
+             GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
 
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Received TRANSMIT_CLOSE_ACK from %x\n",
-           socket->other_peer);
+           "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer),
+           GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_TRANSMIT_CLOSED;
       break;
     default:
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Received TRANSMIT_CLOSE_ACK when in it not expected\n");
+           "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
+           GNUNET_i2s (&socket->other_peer));
           
       return GNUNET_OK;
     }
@@ -1719,15 +1740,17 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
   case STATE_LISTEN:
   case STATE_HELLO_WAIT:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
+         "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
+         GNUNET_i2s (&socket->other_peer));
     return GNUNET_OK;
   default:
     break;
   }
   
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received RECEIVE_CLOSE from %x\n",
-       socket->other_peer);
+       "%s: Received RECEIVE_CLOSE from %s\n",
+       GNUNET_i2s (&socket->other_peer),
+       GNUNET_i2s (&socket->other_peer));
   receive_close_ack =
     GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
   receive_close_ack->header.size =
@@ -1835,15 +1858,17 @@ handle_close (struct GNUNET_STREAM_Socket *socket,
   case STATE_LISTEN:
   case STATE_HELLO_WAIT:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
+         "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
+         GNUNET_i2s (&socket->other_peer));
     return GNUNET_OK;
   default:
     break;
   }
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received CLOSE from %x\n",
-       socket->other_peer);
+       "%s: Received CLOSE from %s\n",
+       GNUNET_i2s (&socket->other_peer),
+       GNUNET_i2s (&socket->other_peer));
   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
@@ -1979,10 +2004,13 @@ server_handle_hello (void *cls,
   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
   struct GNUNET_STREAM_HelloAckMessage *reply;
 
-  if (GNUNET_PEER_search (sender) != socket->other_peer)
+  if (0 != memcmp (sender,
+                   &socket->other_peer,
+                   sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received HELLO from non-confirming peer\n");
+         "%s: Received HELLO from non-confirming peer\n",
+         GNUNET_i2s (&socket->other_peer));
     return GNUNET_YES;
   }
 
@@ -1990,8 +2018,9 @@ server_handle_hello (void *cls,
                  ntohs (message->type));
   GNUNET_assert (socket->tunnel == tunnel);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received HELLO from %x\n", 
-       socket->other_peer);
+       "%s: Received HELLO from %s\n", 
+       GNUNET_i2s (&socket->other_peer),
+       GNUNET_i2s (&socket->other_peer));
 
   if (STATE_INIT == socket->state)
   {
@@ -2004,7 +2033,9 @@ server_handle_hello (void *cls,
   else
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Client sent HELLO when in state %d\n", socket->state);
+         "%s: Client sent HELLO when in state %d\n", 
+         GNUNET_i2s (&socket->other_peer),
+         socket->state);
     /* FIXME: Send RESET? */
       
   }
@@ -2042,11 +2073,13 @@ server_handle_hello_ack (void *cls,
   if (STATE_HELLO_WAIT == socket->state)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received HELLO_ACK from %x\n",
-         socket->other_peer);
+         "%s: Received HELLO_ACK from %s\n",
+         GNUNET_i2s (&socket->other_peer),
+         GNUNET_i2s (&socket->other_peer));
     socket->read_sequence_number = ntohl (ack_message->sequence_number);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Read sequence number %u\n",
+         "%s: Read sequence number %u\n",
+         GNUNET_i2s (&socket->other_peer),
          (unsigned int) socket->read_sequence_number);
     socket->receiver_window_available = 
       ntohl (ack_message->receiver_window_size);
@@ -2300,10 +2333,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
   int need_retransmission;
   
 
-  if (GNUNET_PEER_search (sender) != socket->other_peer)
+  if (0 != memcmp (sender,
+                   &socket->other_peer,
+                   sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received ACK from non-confirming peer\n");
+         "%s: Received ACK from non-confirming peer\n",
+         GNUNET_i2s (&socket->other_peer));
     return GNUNET_YES;
   }
 
@@ -2315,7 +2351,8 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
     if (NULL == socket->write_handle)
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Received DATA_ACK when write_handle is NULL\n");
+           "%s: Received DATA_ACK when write_handle is NULL\n",
+           GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
     /* FIXME: increment in the base sequence number is breaking current flow
@@ -2324,9 +2361,11 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Received DATA_ACK with unexpected base sequence number\n");
+           "%s: Received DATA_ACK with unexpected base sequence number\n",
+           GNUNET_i2s (&socket->other_peer));
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Current write sequence: %u; Ack's base sequence: %u\n",
+           "%s: Current write sequence: %u; Ack's base sequence: %u\n",
+           GNUNET_i2s (&socket->other_peer),
            socket->write_sequence_number,
            ntohl (ack->base_sequence_number));
       return GNUNET_OK;
@@ -2335,8 +2374,9 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
        acks */
 
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received DATA_ACK from %x\n",
-         socket->other_peer);
+         "%s: Received DATA_ACK from %s\n",
+         GNUNET_i2s (&socket->other_peer),
+         GNUNET_i2s (&socket->other_peer));
       
     /* Cancel the retransmission task */
     if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
@@ -2388,21 +2428,24 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
     }
     else      /* We have to call the write continuation callback now */
     {
+      struct GNUNET_STREAM_IOWriteHandle *write_handle;
+      
       /* Free the packets */
       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
       {
         GNUNET_free_non_null (socket->write_handle->messages[packet]);
       }
-      if (NULL != socket->write_handle->write_cont)
-        socket->write_handle->write_cont
-          (socket->write_handle->write_cont_cls,
-           socket->status,
-           socket->write_handle->size);
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Write completion callback completed\n");
-      /* We are done with the write handle - Freeing it */
-      GNUNET_free (socket->write_handle);
+      write_handle = socket->write_handle;
       socket->write_handle = NULL;
+      if (NULL != write_handle->write_cont)
+        write_handle->write_cont (write_handle->write_cont_cls,
+                                 socket->status,
+                                 write_handle->size);
+      /* We are done with the write handle - Freeing it */
+      GNUNET_free (write_handle);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "%s: Write completion callback completed\n",
+           GNUNET_i2s (&socket->other_peer));      
     }
     break;
   default:
@@ -2538,20 +2581,21 @@ mesh_peer_connect_callback (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
   struct GNUNET_STREAM_MessageHeader *message;
-  GNUNET_PEER_Id connected_peer;
-
-  connected_peer = GNUNET_PEER_search (peer);
   
-  if (connected_peer != socket->other_peer)
+  if (0 != memcmp (peer,
+                   &socket->other_peer,
+                   sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "A peer which is not our target has connected to our tunnel\n");
+         "%s: A peer which is not our target has connected to our tunnel\n",
+         GNUNET_i2s(peer));
     return;
   }
   
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Target peer %x connected\n", 
-       connected_peer);
+       "%s: Target peer %s connected\n",
+       GNUNET_i2s (&socket->other_peer),
+       GNUNET_i2s (&socket->other_peer));
   
   /* Set state to INIT */
   socket->state = STATE_INIT;
@@ -2588,8 +2632,9 @@ mesh_peer_disconnect_callback (void *cls,
   
   /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Other peer %x disconnected\n",
-       socket->other_peer);
+       "%s: Other peer %s disconnected \n",
+       GNUNET_i2s (&socket->other_peer),
+       GNUNET_i2s (&socket->other_peer));
 }
 
 
@@ -2616,14 +2661,16 @@ new_tunnel_notify (void *cls,
      from the same peer again until the socket is closed */
 
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
-  socket->other_peer = GNUNET_PEER_intern (initiator);
+  socket->other_peer = *initiator;
   socket->tunnel = tunnel;
   socket->session_id = 0;       /* FIXME */
   socket->state = STATE_INIT;
-  socket->lsocket = lsocket; 
+  socket->lsocket = lsocket;
+  
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Peer %x initiated tunnel to us\n", 
-       socket->other_peer);
+       "%s: Peer %s initiated tunnel to us\n", 
+       GNUNET_i2s (&socket->other_peer),
+       GNUNET_i2s (&socket->other_peer));
   
   /* FIXME: Copy MESH handle from lsocket to socket */
   
@@ -2655,8 +2702,9 @@ tunnel_cleaner (void *cls,
 
   GNUNET_break_op(0);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Peer %x has terminated connection abruptly\n",
-       socket->other_peer);
+       "%s: Peer %s has terminated connection abruptly\n",
+       GNUNET_i2s (&socket->other_peer),
+       GNUNET_i2s (&socket->other_peer));
 
   socket->status = GNUNET_STREAM_SHUTDOWN;
 
@@ -2722,15 +2770,13 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s\n", __func__);
-
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
-  socket->other_peer = GNUNET_PEER_intern (target);
+  socket->other_peer = *target;
   socket->open_cb = open_cb;
   socket->open_cls = open_cb_cls;
   /* Set defaults */
   socket->retransmit_timeout = 
     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
-
   va_start (vargs, open_cb_cls); /* Parse variable args */
   do {
     option = va_arg (vargs, enum GNUNET_STREAM_Option);
@@ -2747,7 +2793,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
   } while (GNUNET_STREAM_OPTION_END != option);
   va_end (vargs);               /* End of variable args parsing */
   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
-                                      10,  /* QUEUE size as parameter? */
+                                      RECEIVE_BUFFER_SIZE,  /* QUEUE size as parameter? */
                                       socket, /* cls */
                                       NULL, /* No inbound tunnel handler */
                                       NULL, /* No in-tunnel cleaner */
@@ -2769,7 +2815,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                               socket);
   GNUNET_assert (NULL != socket->tunnel);
   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
-                                        target);
+                                        &socket->other_peer);
   
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s() END\n", __func__);
@@ -2889,8 +2935,18 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
 {
   struct MessageQueue *head;
 
-  GNUNET_break (NULL == socket->read_handle);
-  GNUNET_break (NULL == socket->write_handle);
+  if (NULL != socket->read_handle)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+        "Closing STREAM socket when a read handle is pending\n");
+  }
+  if (NULL != socket->write_handle)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+        "Closing STREAM socket when a write handle is pending\n");
+    GNUNET_STREAM_io_write_cancel (socket->write_handle);
+    //socket->write_handle = NULL;
+  }
 
   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
@@ -2979,7 +3035,7 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
   lsocket->listen_cb = listen_cb;
   lsocket->listen_cb_cls = listen_cb_cls;
   lsocket->mesh = GNUNET_MESH_connect (cfg,
-                                       10, /* FIXME: QUEUE size as parameter? */
+                                       RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
                                        lsocket, /* Closure */
                                        &new_tunnel_notify,
                                        &tunnel_cleaner,
@@ -3155,7 +3211,8 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
   struct GNUNET_STREAM_IOReadHandle *read_handle;
   
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s()\n", 
+       "%s: %s()\n", 
+       GNUNET_i2s (&socket->other_peer),
        __func__);
 
   /* Return NULL if there is already a read handle; the user has to cancel that
@@ -3172,7 +3229,8 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
   case STATE_CLOSE_WAIT:
     proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s() END\n",
+         "%s: %s() END\n",
+         GNUNET_i2s (&socket->other_peer),
          __func__);
     return NULL;
   default:
@@ -3199,7 +3257,8 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
                                   &read_io_timeout,
                                   socket);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s() END\n",
+       "%s: %s() END\n",
+       GNUNET_i2s (&socket->other_peer),
        __func__);
   return read_handle;
 }