regex profiler cleanup
[oweals/gnunet.git] / src / stream / stream_api.c
index e27f4df1fab00f96498986d78084f25ed30286e0..3abea2a49f41afb0e1c7234ad4ff287a130d718a 100644 (file)
 #include "gnunet_stream_lib.h"
 #include "stream_protocol.h"
 
+/**
+ * Generic logging shorthand
+ */
 #define LOG(kind,...)                                   \
   GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
 
+/**
+ * Debug logging shorthand
+ */
+#define LOG_DEBUG(...)                          \
+  LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
+
+/**
+ * Time in relative seconds shorthand
+ */
 #define TIME_REL_SECS(sec) \
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
 
 /**
  * The maximum packet size of a stream packet
  */
-#define MAX_PACKET_SIZE 512//64000
+#define DEFAULT_MAX_PAYLOAD_SIZE 64000
 
 /**
  * Receive buffer
  */
 #define RECEIVE_BUFFER_SIZE 4096000
 
-/**
- * The maximum payload a data message packet can carry
- */
-static const size_t max_payload_size = 
-  MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
-
 /**
  * states in the Protocol
  */
@@ -213,16 +219,6 @@ struct GNUNET_STREAM_Socket
    */
   struct GNUNET_MESH_TransmitHandle *transmit_handle;
 
-  /**
-   * The current act transmit handle (if a pending ack transmit request exists)
-   */
-  struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
-
-  /**
-   * Pointer to the current ack message using in ack_task
-   */
-  struct GNUNET_STREAM_AckMessage *ack_msg;
-
   /**
    * The current message associated with the transmit handle
    */
@@ -265,25 +261,20 @@ struct GNUNET_STREAM_Socket
   struct GNUNET_PeerIdentity other_peer;
 
   /**
-   * Task identifier for the read io timeout task
+   * Task identifier for retransmission task after timeout
    */
-  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+  GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
 
   /**
-   * Task identifier for retransmission task after timeout
+   * Task identifier for retransmission of control messages
    */
-  GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
+  GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
 
   /**
    * The task for sending timely Acks
    */
   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
 
-  /**
-   * Task scheduled to continue a read operation.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier read_task_id;
-
   /**
    * The state of the protocol associated with this socket
    */
@@ -314,12 +305,6 @@ struct GNUNET_STREAM_Socket
    */
   uint32_t testing_set_write_sequence_number_value;
 
-  /**
-   * The session id associated with this stream connection
-   * FIXME: Not used currently, may be removed
-   */
-  uint32_t session_id;
-
   /**
    * Write sequence number. Set to random when sending HELLO(client) and
    * HELLO_ACK(server) 
@@ -360,6 +345,11 @@ struct GNUNET_STREAM_Socket
    * The offset upto which user has read from the received buffer
    */
   uint32_t copy_offset;
+
+  /**
+   * The maximum size of the data message payload this stream handle can send
+   */
+  uint16_t max_payload_size;
 };
 
 
@@ -388,6 +378,11 @@ struct GNUNET_STREAM_ListenSocket
    */
   struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
 
+  /**
+   * Callback to call after acquring a lock and listening
+   */
+  GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
+
   /**
    * The callback function which is called after successful opening socket
    */
@@ -427,6 +422,12 @@ struct GNUNET_STREAM_ListenSocket
    * The write sequence number to be set incase of testing
    */
   uint32_t testing_set_write_sequence_number_value;
+
+  /**
+   * The maximum size of the data message payload this stream handle can send
+   */
+  uint16_t max_payload_size;
+
 };
 
 
@@ -465,6 +466,14 @@ struct GNUNET_STREAM_IOWriteHandle
    * Number of bytes in this write handle
    */
   size_t size;
+
+  /**
+   * Number of packets already transmitted from this IO handle. Retransmitted
+   * packets are not taken into account here. This is used to determine which
+   * packets account for retransmission and which packets occupy buffer space at
+   * the receiver.
+   */
+  unsigned int packets_sent;
 };
 
 
@@ -473,6 +482,11 @@ struct GNUNET_STREAM_IOWriteHandle
  */
 struct GNUNET_STREAM_IOReadHandle
 {
+  /**
+   * The socket to which this read handle is associated
+   */
+  struct GNUNET_STREAM_Socket *socket;
+  
   /**
    * Callback for the read processor
    */
@@ -482,6 +496,16 @@ struct GNUNET_STREAM_IOReadHandle
    * The closure pointer for the read processor callback
    */
   void *proc_cls;
+
+  /**
+   * Task identifier for the read io timeout task
+   */
+  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+
+  /**
+   * Task scheduled to continue a read operation.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier read_task_id;
 };
 
 
@@ -556,8 +580,7 @@ send_message_notify (void *cls, size_t size, void *buf)
          socket->retries);
     socket->transmit_handle = 
       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                         0, /* Corking */
-                                         1, /* Priority */
+                                         GNUNET_NO, /* Corking */
                                          /* FIXME: exponential backoff */
                                          socket->retransmit_timeout,
                                          &socket->other_peer,
@@ -566,7 +589,6 @@ send_message_notify (void *cls, size_t size, void *buf)
                                          socket);
     return 0;
   }
-
   ret = ntohs (head->message->header.size);
   GNUNET_assert (size >= ret);
   memcpy (buf, head->message, ret);
@@ -585,8 +607,7 @@ send_message_notify (void *cls, size_t size, void *buf)
     socket->retries = 0;
     socket->transmit_handle = 
       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                         0, /* Corking */
-                                         1, /* Priority */
+                                         GNUNET_NO, /* Corking */
                                          /* FIXME: exponential backoff */
                                          socket->retransmit_timeout,
                                          &socket->other_peer,
@@ -605,19 +626,21 @@ send_message_notify (void *cls, size_t size, void *buf)
  * @param message the message to be sent
  * @param finish_cb the callback to be called when the message is sent
  * @param finish_cb_cls the closure for the callback
+ * @param urgent set to GNUNET_YES to add the message to the beginning of the
+ *          queue; GNUNET_NO to add at the tail
  */
 static void
 queue_message (struct GNUNET_STREAM_Socket *socket,
                struct GNUNET_STREAM_MessageHeader *message,
                SendFinishCallback finish_cb,
-               void *finish_cb_cls)
+               void *finish_cb_cls,
+               int urgent)
 {
   struct MessageQueue *queue_entity;
 
   GNUNET_assert 
     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Queueing message of type %d and size %d\n",
        GNUNET_i2s (&socket->other_peer),
@@ -628,21 +651,31 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
   queue_entity->message = message;
   queue_entity->finish_cb = finish_cb;
   queue_entity->finish_cb_cls = finish_cb_cls;
-  GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
-                                   socket->queue_tail,
-                                   queue_entity);
+  if (GNUNET_YES == urgent)
+  {
+    GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
+                                 queue_entity);
+    if (NULL != socket->transmit_handle)
+    {
+      GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+      socket->transmit_handle = NULL;
+    }
+  }
+  else
+    GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
+                                      socket->queue_tail,
+                                      queue_entity);
   if (NULL == socket->transmit_handle)
   {
     socket->retries = 0;
     socket->transmit_handle = 
-      GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                        0, /* Corking */
-                                        1, /* Priority */
-                                        socket->retransmit_timeout,
-                                        &socket->other_peer,
-                                        ntohs (message->header.size),
-                                        &send_message_notify,
-                                        socket);
+       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                          GNUNET_NO, /* Corking */
+                                          socket->retransmit_timeout,
+                                          &socket->other_peer,
+                                          ntohs (message->header.size),
+                                          &send_message_notify,
+                                          socket);
   }
 }
 
@@ -668,38 +701,7 @@ copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
   size = ntohs (message->header.size);
   msg_copy = GNUNET_malloc (size);
   memcpy (msg_copy, message, size);
-  queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
-}
-
-
-/**
- * Callback function for sending ack message
- *
- * @param cls closure the ACK message created in ack_task
- * @param size number of bytes available in buffer
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-send_ack_notify (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_STREAM_Socket *socket = cls;
-
-  if (0 == size)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s called with size 0\n", __func__);
-    return 0;
-  }
-  GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
-  
-  size = ntohs (socket->ack_msg->header.header.size);
-  memcpy (buf, socket->ack_msg, size);
-  
-  GNUNET_free (socket->ack_msg);
-  socket->ack_msg = NULL;
-  socket->ack_transmit_handle = NULL;
-  return size;
+  queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
 }
 
 
@@ -721,17 +723,16 @@ write_data (struct GNUNET_STREAM_Socket *socket);
  * @param tc the Task context
  */
 static void
-retransmission_timeout_task (void *cls,
-                             const struct GNUNET_SCHEDULER_TaskContext *tc)
+data_retransmission_task (void *cls,
+                          const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
   
-  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+  socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
     return;
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
-  socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
   write_data (socket);
 }
 
@@ -749,11 +750,9 @@ ack_task (void *cls,
   struct GNUNET_STREAM_Socket *socket = cls;
   struct GNUNET_STREAM_AckMessage *ack_msg;
 
-  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
-  {
-    return;
-  }
   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
   /* Create the ACK Message */
   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
   ack_msg->header.header.size = htons (sizeof (struct 
@@ -763,17 +762,8 @@ ack_task (void *cls,
   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
   ack_msg->receive_window_remaining = 
     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
-  socket->ack_msg = ack_msg;
-  /* Request MESH for sending ACK */
-  socket->ack_transmit_handle = 
-    GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                       0, /* Corking */
-                                       1, /* Priority */
-                                       socket->retransmit_timeout,
-                                       &socket->other_peer,
-                                       ntohs (ack_msg->header.header.size),
-                                       &send_ack_notify,
-                                       socket);
+  /* Queue up ACK for immediate sending */
+  queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
 }
 
 
@@ -791,9 +781,11 @@ close_msg_retransmission_task (void *cls,
   struct GNUNET_STREAM_MessageHeader *msg;
   struct GNUNET_STREAM_Socket *socket;
 
+  shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_assert (NULL != shutdown_handle);
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
   socket = shutdown_handle->socket;
-
   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
   switch (shutdown_handle->operation)
@@ -813,7 +805,7 @@ close_msg_retransmission_task (void *cls,
       GNUNET_SCHEDULER_NO_TASK;
     return;
   }
-  queue_message (socket, msg, NULL, NULL);
+  queue_message (socket, msg, NULL, NULL, GNUNET_NO);
   shutdown_handle->close_msg_retransmission_task_id =
     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
                                   &close_msg_retransmission_task,
@@ -867,36 +859,23 @@ static void
 write_data (struct GNUNET_STREAM_Socket *socket)
 {
   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
-  int packet;                   /* Although an int, should never be negative */
-  int ack_packet;
-
-  ack_packet = -1;
-  /* Find the last acknowledged packet */
-  for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
-  {      
-    if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
-                                            packet))
-      ack_packet = packet;        
-    else if (NULL == io_handle->messages[packet])
-      break;
-  }
-  /* Resend packets which weren't ack'ed */
-  for (packet=0; packet < ack_packet; packet++)
+  unsigned int packet;
+  
+  for (packet=0; packet < io_handle->packets_sent; packet++)
   {
     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
-                                           packet))
+                                          packet))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Placing DATA message with sequence %u in send queue\n",
-           GNUNET_i2s (&socket->other_peer),
-           ntohl (io_handle->messages[packet]->sequence_number));
+          "%s: Retransmitting DATA message with sequence %u\n",
+          GNUNET_i2s (&socket->other_peer),
+          ntohl (io_handle->messages[packet]->sequence_number));
       copy_and_queue_message (socket,
-                              &io_handle->messages[packet]->header,
-                              NULL,
-                              NULL);
+                             &io_handle->messages[packet]->header,
+                             NULL,
+                             NULL);
     }
   }
-  packet = ack_packet + 1;
   /* Now send new packets if there is enough buffer space */
   while ( (NULL != io_handle->messages[packet]) &&
          (socket->receiver_window_available 
@@ -915,11 +894,13 @@ write_data (struct GNUNET_STREAM_Socket *socket)
                             NULL);
     packet++;
   }
-  if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
-    socket->retransmission_timeout_task_id = 
+  io_handle->packets_sent = packet;
+  // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
+  if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
+    socket->data_retransmission_task_id = 
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
                                     (GNUNET_TIME_UNIT_SECONDS, 8),
-                                    &retransmission_timeout_task,
+                                    &data_retransmission_task,
                                     socket);
 }
 
@@ -935,22 +916,22 @@ call_read_processor (void *cls,
                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_IOReadHandle *read_handle;
   size_t read_size;
   size_t valid_read_size;
   unsigned int packet;
   uint32_t sequence_increase;
   uint32_t offset_increase;
 
-  socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+  read_handle = socket->read_handle;
+  GNUNET_assert (NULL != read_handle);
+  read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-
   if (NULL == socket->receive_buffer) 
     return;
-
   GNUNET_assert (NULL != socket->read_handle);
   GNUNET_assert (NULL != socket->read_handle->proc);
-
   /* Check the bitmap for any holes */
   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
   {
@@ -964,22 +945,19 @@ call_read_processor (void *cls,
     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
   GNUNET_assert (0 != valid_read_size);
   /* Cancel the read_io_timeout_task */
-  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
+  read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
   /* Call the data processor */
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Calling read processor\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
        GNUNET_i2s (&socket->other_peer));
   read_size = 
-    socket->read_handle->proc (socket->read_handle->proc_cls,
-                               socket->status,
-                               socket->receive_buffer + socket->copy_offset,
-                               valid_read_size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Read processor read %d bytes\n",
+      socket->read_handle->proc (socket->read_handle->proc_cls,
+                                socket->status,
+                                socket->receive_buffer + socket->copy_offset,
+                                valid_read_size);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
        GNUNET_i2s (&socket->other_peer), read_size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Read processor completed successfully\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
        GNUNET_i2s (&socket->other_peer));
   /* Free the read handle */
   GNUNET_free (socket->read_handle);
@@ -994,14 +972,13 @@ call_read_processor (void *cls,
     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
       break;
   }
-
   /* If no packets can be removed we can't move the buffer */
-  if (0 == packet) return;
+  if (0 == packet) 
+    return;
   sequence_increase = packet;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Sequence increase after read processor completion: %u\n",
        GNUNET_i2s (&socket->other_peer), sequence_increase);
-
   /* Shift the data in the receive buffer */
   socket->receive_buffer = 
     memmove (socket->receive_buffer,
@@ -1050,27 +1027,30 @@ call_read_processor (void *cls,
  * @param tc the task context
  */
 static void
-read_io_timeout (void *cls, 
+read_io_timeout (void *cls,
                  const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_IOReadHandle *read_handle;
   GNUNET_STREAM_DataProcessor proc;
   void *proc_cls;
 
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
-  if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
+  read_handle = socket->read_handle;
+  GNUNET_assert (NULL != read_handle);
+  read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
+  if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Read task timedout - Cancelling it\n",
          GNUNET_i2s (&socket->other_peer));
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+    read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
-  GNUNET_assert (NULL != socket->read_handle);
-  proc = socket->read_handle->proc;
-  proc_cls = socket->read_handle->proc_cls;
-
-  GNUNET_free (socket->read_handle);
+  proc = read_handle->proc;
+  proc_cls = read_handle->proc_cls;
+  GNUNET_free (read_handle);
   socket->read_handle = NULL;
   /* Call the read processor to signal timeout */
   proc (proc_cls,
@@ -1099,6 +1079,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
              const struct GNUNET_ATS_Information*atsi)
 {
   const void *payload;
+  struct GNUNET_TIME_Relative ack_deadline_rel;
   uint32_t bytes_needed;
   uint32_t relative_offset;
   uint32_t relative_sequence_number;
@@ -1110,23 +1091,19 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
-
-  if (0 != memcmp (sender,
-                   &socket->other_peer,
-                   sizeof (struct GNUNET_PeerIdentity)))
+  if (0 != memcmp (sender, &socket->other_peer,
+                  sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s: Received DATA from non-confirming peer\n",
-         GNUNET_i2s (&socket->other_peer));
+        "%s: Received DATA from non-confirming peer\n",
+        GNUNET_i2s (&socket->other_peer));
     return GNUNET_YES;
   }
-
   switch (socket->state)
   {
   case STATE_ESTABLISHED:
   case STATE_TRANSMIT_CLOSED:
-  case STATE_TRANSMIT_CLOSE_WAIT:
-      
+  case STATE_TRANSMIT_CLOSE_WAIT:      
     /* check if the message's sequence number is in the range we are
        expecting */
     relative_sequence_number = 
@@ -1147,8 +1124,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
                                         socket);
       }
       return GNUNET_YES;
-    }
-      
+    }      
     /* Check if we have already seen this message */
     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
                                             relative_sequence_number))
@@ -1162,20 +1138,14 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
       {
         socket->ack_task_id = 
           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
-                                        (msg->ack_deadline),
-                                        &ack_task,
-                                        socket);
+                                        (msg->ack_deadline), &ack_task, socket);
       }
       return GNUNET_YES;
     }
-
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
-         GNUNET_i2s (&socket->other_peer),
-         ntohl (msg->sequence_number),
-         ntohs (msg->header.header.size),
-         GNUNET_i2s (&socket->other_peer));
-      
+         GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
+         ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
     /* Check if we have to allocate the buffer */
     size -= sizeof (struct GNUNET_STREAM_DataMessage);
     relative_offset = ntohl (msg->offset) - socket->read_offset;
@@ -1192,54 +1162,67 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "%s: Cannot accommodate packet %d as buffer is full\n",
-             GNUNET_i2s (&socket->other_peer),
-             ntohl (msg->sequence_number));
+             GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
         return GNUNET_YES;
       }
     }
-      
     /* Copy Data to buffer */
     payload = &msg[1];
     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
-    memcpy (socket->receive_buffer + relative_offset,
-            payload,
-            size);
+    memcpy (socket->receive_buffer + relative_offset, payload, size);
     socket->receive_buffer_boundaries[relative_sequence_number] = 
-      relative_offset + size;
-      
+       relative_offset + size;
     /* Modify the ACK bitmap */
-    ackbitmap_modify_bit (&socket->ack_bitmap,
-                          relative_sequence_number,
-                          GNUNET_YES);
-
+    ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
+                         GNUNET_YES);
     /* Start ACK sending task if one is not already present */
+    ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
     {
+      ack_deadline_rel = 
+         GNUNET_TIME_relative_min (ack_deadline_rel,
+                                   GNUNET_TIME_relative_multiply
+                                   (GNUNET_TIME_UNIT_SECONDS, 300));
       socket->ack_task_id = 
-        GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
-                                      (msg->ack_deadline),
-                                      &ack_task,
-                                      socket);
+         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
+                                       (msg->ack_deadline), &ack_task, socket);
+      socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+      socket->ack_time_deadline = ack_deadline_rel;
+    }
+    else
+    {
+      struct GNUNET_TIME_Relative ack_time_past;
+      struct GNUNET_TIME_Relative ack_time_remaining;
+      struct GNUNET_TIME_Relative ack_time_min;
+      ack_time_past = 
+         GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
+      ack_time_remaining = GNUNET_TIME_relative_subtract
+         (socket->ack_time_deadline, ack_time_past);
+      ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
+                                              ack_deadline_rel);
+      if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
+                     sizeof (struct GNUNET_TIME_Relative)))
+      {
+       ack_deadline_rel = ack_time_min;
+       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+       socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
+                                                           &ack_task, socket);
+       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+       socket->ack_time_deadline = ack_deadline_rel;
+      }
     }
-
     if ((NULL != socket->read_handle) /* A read handle is waiting */
         /* There is no current read task */
-        && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
+        && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
         /* We have the first packet */
-        && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
-                                               0)))
+        && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
     {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Scheduling read processor\n",
-           GNUNET_i2s (&socket->other_peer));
-          
-      socket->read_task_id = 
-        GNUNET_SCHEDULER_add_now (&call_read_processor,
-                                  socket);
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
+          GNUNET_i2s (&socket->other_peer));
+      socket->read_handle->read_task_id =
+         GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
     }
-      
     break;
-
   default:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Received data message when it cannot be handled\n",
@@ -1272,18 +1255,15 @@ client_handle_data (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
 
-  return handle_data (socket, 
-                      tunnel, 
-                      sender, 
-                      (const struct GNUNET_STREAM_DataMessage *) message, 
-                      atsi);
+  return handle_data (socket, tunnel, sender, 
+                      (const struct GNUNET_STREAM_DataMessage *) message, atsi);
 }
 
 
 /**
  * Callback to set state to ESTABLISHED
  *
- * @param cls the closure from queue_message FIXME: document
+ * @param cls the closure NULL;
  * @param socket the socket to requiring state change
  */
 static void
@@ -1296,6 +1276,10 @@ set_state_established (void *cls,
   socket->write_offset = 0;
   socket->read_offset = 0;
   socket->state = STATE_ESTABLISHED;
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
+                 socket->control_retransmission_task_id);
+  GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (NULL != socket->lsocket)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1312,7 +1296,7 @@ set_state_established (void *cls,
       GNUNET_free (socket);
     }
   }
-  else if (NULL != socket->open_cb)
+  else
     socket->open_cb (socket->open_cls, socket);
 }
 
@@ -1328,7 +1312,7 @@ set_state_hello_wait (void *cls,
                       struct GNUNET_STREAM_Socket *socket)
 {
   GNUNET_assert (STATE_INIT == socket->state);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Attaining HELLO_WAIT state\n",
        GNUNET_i2s (&socket->other_peer));
   socket->state = STATE_HELLO_WAIT;
@@ -1406,41 +1390,112 @@ set_state_closed (void *cls,
 }
 
 
+/**
+ * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
+ *
+ * @return the generate hello message
+ */
+static struct GNUNET_STREAM_MessageHeader *
+generate_hello (void)
+{
+  struct GNUNET_STREAM_MessageHeader *msg;
+
+  msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
+  msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  return msg;
+}
+
+
 /**
  * Returns a new HelloAckMessage. Also sets the write sequence number for the
  * socket
  *
  * @param socket the socket for which this HelloAckMessage has to be generated
+ * @param generate_seq GNUNET_YES to generate the write sequence number,
+ *          GNUNET_NO to use the existing sequence number
  * @return the HelloAckMessage
  */
 static struct GNUNET_STREAM_HelloAckMessage *
-generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
+generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
+                    int generate_seq)
 {
   struct GNUNET_STREAM_HelloAckMessage *msg;
 
-  /* Get the random sequence number */
-  if (GNUNET_YES == socket->testing_active)
-    socket->write_sequence_number =
-      socket->testing_set_write_sequence_number_value;
-  else
-    socket->write_sequence_number = 
-      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: write sequence number %u\n",
-       GNUNET_i2s (&socket->other_peer),
-       (unsigned int) socket->write_sequence_number);
-  
+  if (GNUNET_YES == generate_seq)
+  {
+    if (GNUNET_YES == socket->testing_active)
+      socket->write_sequence_number =
+        socket->testing_set_write_sequence_number_value;
+    else
+      socket->write_sequence_number = 
+        GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+    LOG_DEBUG ("%s: write sequence number %u\n",
+               GNUNET_i2s (&socket->other_peer),
+               (unsigned int) socket->write_sequence_number);
+  }
   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
   msg->header.header.size = 
     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
   msg->sequence_number = htonl (socket->write_sequence_number);
   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
-
   return msg;
 }
 
 
+/**
+ * Task for retransmitting control messages if they aren't ACK'ed before a
+ * deadline
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+control_retransmission_task (void *cls,
+                             const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_STREAM_Socket *socket = cls;
+    
+  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+    return;
+  LOG_DEBUG ("%s: Retransmitting a control message\n",
+                 GNUNET_i2s (&socket->other_peer));
+  switch (socket->state)
+  {
+  case STATE_INIT:    
+    GNUNET_break (0);
+    break;
+  case STATE_LISTEN:
+    GNUNET_break (0);
+    break;
+  case STATE_HELLO_WAIT:
+    if (NULL == socket->lsocket) /* We are client */
+      queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
+    else
+      queue_message (socket,
+                     (struct GNUNET_STREAM_MessageHeader *)
+                     generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
+                     GNUNET_NO);
+    socket->control_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &control_retransmission_task, socket);
+    break;
+  case STATE_ESTABLISHED:
+    if (NULL == socket->lsocket)
+      queue_message (socket,
+                     (struct GNUNET_STREAM_MessageHeader *)
+                     generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
+                     GNUNET_NO);
+    else
+      GNUNET_break (0);
+  default:
+    GNUNET_break (0);
+  }  
+}
+
+
 /**
  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
  *
@@ -1465,9 +1520,8 @@ client_handle_hello_ack (void *cls,
   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
   struct GNUNET_STREAM_HelloAckMessage *reply;
 
-  if (0 != memcmp (sender,
-                   &socket->other_peer,
-                   sizeof (struct GNUNET_PeerIdentity)))
+  if (0 != memcmp (sender, &socket->other_peer,
+                  sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Received HELLO_ACK from non-confirming peer\n",
@@ -1475,11 +1529,8 @@ client_handle_hello_ack (void *cls,
     return GNUNET_YES;
   }
   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Received HELLO_ACK from %s\n",
-       GNUNET_i2s (&socket->other_peer),
-       GNUNET_i2s (&socket->other_peer));
-
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
+       GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
   GNUNET_assert (socket->tunnel == tunnel);
   switch (socket->state)
   {
@@ -1490,27 +1541,24 @@ client_handle_hello_ack (void *cls,
          GNUNET_i2s (&socket->other_peer),
          (unsigned int) socket->read_sequence_number);
     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
-    reply = generate_hello_ack_msg (socket);
-    queue_message (socket,
-                   &reply->header, 
-                   &set_state_established, 
-                   NULL);      
+    reply = generate_hello_ack (socket, GNUNET_YES);
+    queue_message (socket, &reply->header, &set_state_established,
+                   NULL, GNUNET_NO);    
     return GNUNET_OK;
   case STATE_ESTABLISHED:
-  case STATE_RECEIVE_CLOSE_WAIT:
     // call statistics (# ACKs ignored++)
+    GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+                   socket->control_retransmission_task_id);
+    socket->control_retransmission_task_id =
+      GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
     return GNUNET_OK;
-  case STATE_INIT:
   default:
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s: Server %s sent HELLO_ACK when in state %d\n", 
-         GNUNET_i2s (&socket->other_peer),
-         GNUNET_i2s (&socket->other_peer),
-         socket->state);
+    LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
+               GNUNET_i2s (&socket->other_peer),
+              GNUNET_i2s (&socket->other_peer), socket->state);
     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
     return GNUNET_SYSERR;
   }
-
 }
 
 
@@ -1564,15 +1612,13 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
   {
   case STATE_ESTABLISHED:
     socket->state = STATE_RECEIVE_CLOSED;
-
     /* Send TRANSMIT_CLOSE_ACK */
     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
     reply->header.type = 
       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
-    queue_message (socket, reply, NULL, NULL);
+    queue_message (socket, reply, NULL, NULL, GNUNET_NO);
     break;
-
   default:
     /* FIXME: Call statistics? */
     break;
@@ -1641,7 +1687,6 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
          GNUNET_i2s (&socket->other_peer));
     return GNUNET_OK;
   }
-
   switch (operation)
   {
   case SHUT_RDWR:
@@ -1652,15 +1697,11 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "%s: Received CLOSE_ACK when shutdown handle is not for "
-             "SHUT_RDWR\n",
-             GNUNET_i2s (&socket->other_peer));
+             "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Received CLOSE_ACK from %s\n",
-           GNUNET_i2s (&socket->other_peer),
-           GNUNET_i2s (&socket->other_peer));
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_CLOSED;
       break;
     default:
@@ -1670,7 +1711,6 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
       return GNUNET_OK;
     }
     break;
-
   case SHUT_RD:
     switch (socket->state)
     {
@@ -1679,15 +1719,11 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
-             "is not for SHUT_RD\n",
-             GNUNET_i2s (&socket->other_peer));
+             "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Received RECEIVE_CLOSE_ACK from %s\n",
-           GNUNET_i2s (&socket->other_peer),
-           GNUNET_i2s (&socket->other_peer));
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_RECEIVE_CLOSED;
       break;
     default:
@@ -1696,7 +1732,6 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
            GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
-
     break;
   case SHUT_WR:
     switch (socket->state)
@@ -1710,38 +1745,33 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
              GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
-           GNUNET_i2s (&socket->other_peer),
-           GNUNET_i2s (&socket->other_peer));
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_TRANSMIT_CLOSED;
       break;
     default:
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
-           GNUNET_i2s (&socket->other_peer));
-          
+           GNUNET_i2s (&socket->other_peer));          
       return GNUNET_OK;
     }
     break;
   default:
     GNUNET_assert (0);
   }
-
   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
                                    operation);
-  GNUNET_free (shutdown_handle); /* Free shutdown handle */
-  socket->shutdown_handle = NULL;
   if (GNUNET_SCHEDULER_NO_TASK
       != shutdown_handle->close_msg_retransmission_task_id)
   {
     GNUNET_SCHEDULER_cancel
       (shutdown_handle->close_msg_retransmission_task_id);
     shutdown_handle->close_msg_retransmission_task_id =
-      GNUNET_SCHEDULER_NO_TASK;
+       GNUNET_SCHEDULER_NO_TASK;
   }
+  GNUNET_free (shutdown_handle); /* Free shutdown handle */
+  socket->shutdown_handle = NULL;
   return GNUNET_OK;
 }
 
@@ -1821,11 +1851,8 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
   receive_close_ack->header.type =
     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
-  queue_message (socket,
-                 receive_close_ack,
-                 &set_state_closed,
-                 NULL);
-  
+  queue_message (socket, receive_close_ack, &set_state_closed,
+                 NULL, GNUNET_NO);  
   /* FIXME: Handle the case where write handle is present; the write operation
      should be deemed as finised and the write continuation callback
      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
@@ -1936,10 +1963,7 @@ handle_close (struct GNUNET_STREAM_Socket *socket,
   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
-  queue_message (socket,
-                 close_ack,
-                 &set_state_closed,
-                 NULL);
+  queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
   if (socket->state == STATE_CLOSED)
     return GNUNET_OK;
 
@@ -2072,36 +2096,38 @@ server_handle_hello (void *cls,
                    &socket->other_peer,
                    sizeof (struct GNUNET_PeerIdentity)))
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s: Received HELLO from non-confirming peer\n",
-         GNUNET_i2s (&socket->other_peer));
+    LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
+               GNUNET_i2s (&socket->other_peer));
     return GNUNET_YES;
   }
-
-  GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
-                 ntohs (message->type));
+  GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
   GNUNET_assert (socket->tunnel == tunnel);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Received HELLO from %s\n", 
-       GNUNET_i2s (&socket->other_peer),
-       GNUNET_i2s (&socket->other_peer));
-
-  if (STATE_INIT == socket->state)
-  {
-    reply = generate_hello_ack_msg (socket);
-    queue_message (socket, 
-                   &reply->header,
-                   &set_state_hello_wait, 
-                   NULL);
-  }
-  else
+  LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
+             GNUNET_i2s (&socket->other_peer));
+  switch (socket->state)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s: Client sent HELLO when in state %d\n", 
-         GNUNET_i2s (&socket->other_peer),
-         socket->state);
+  case STATE_INIT:
+    reply = generate_hello_ack (socket, GNUNET_YES);
+    queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
+                   GNUNET_NO);
+    GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+                   socket->control_retransmission_task_id);
+    socket->control_retransmission_task_id =
+      GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                    &control_retransmission_task, socket);
+    break;
+  case STATE_HELLO_WAIT:
+    /* Perhaps our HELLO_ACK was lost */
+    GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != 
+                   socket->control_retransmission_task_id);
+    GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+    socket->control_retransmission_task_id =
+      GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
+    break;
+  default:
+    LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
+               GNUNET_i2s (&socket->other_peer), socket->state);
     /* FIXME: Send RESET? */
-      
   }
   return GNUNET_OK;
 }
@@ -2134,8 +2160,9 @@ server_handle_hello_ack (void *cls,
                  ntohs (message->type));
   GNUNET_assert (socket->tunnel == tunnel);
   ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
-  if (STATE_HELLO_WAIT == socket->state)
+  switch (socket->state)  
   {
+  case STATE_HELLO_WAIT:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Received HELLO_ACK from %s\n",
          GNUNET_i2s (&socket->other_peer),
@@ -2147,15 +2174,11 @@ server_handle_hello_ack (void *cls,
          (unsigned int) socket->read_sequence_number);
     socket->receiver_window_available = 
       ntohl (ack_message->receiver_window_size);
-    /* Attain ESTABLISHED state */
     set_state_established (NULL, socket);
-  }
-  else
-  {
+    break;
+  default:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Client sent HELLO_ACK when in state %d\n", socket->state);
-    /* FIXME: Send RESET? */
-      
+         "Client sent HELLO_ACK when in state %d\n", socket->state);    
   }
   return GNUNET_OK;
 }
@@ -2418,10 +2441,9 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
            GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
-    /* FIXME: increment in the base sequence number is breaking current flow
-     */
-    if (!((socket->write_sequence_number 
-           - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+    sequence_difference = 
+       socket->write_sequence_number - ntohl (ack->base_sequence_number);
+    if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "%s: Received DATA_ACK with unexpected base sequence number\n",
@@ -2435,17 +2457,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
     }
     /* FIXME: include the case when write_handle is cancelled - ignore the 
        acks */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s: Received DATA_ACK from %s\n",
-         GNUNET_i2s (&socket->other_peer),
-         GNUNET_i2s (&socket->other_peer));
-      
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
+         GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
     /* Cancel the retransmission task */
-    if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+    if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
     {
-      GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
-      socket->retransmission_timeout_task_id = 
-        GNUNET_SCHEDULER_NO_TASK;
+      GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+      socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
     }
     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
     {
@@ -2453,22 +2471,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
       /* BS: Base sequence from ack; PS: sequence num of current packet */
       sequence_difference = ntohl (ack->base_sequence_number)
         - ntohl (socket->write_handle->messages[packet]->sequence_number);
+      if ((0 == sequence_difference) ||
+         (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
+       continue; /* The message in our handle is not yet received */
       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
-      if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
-          || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
-              && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
-      {
-        ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
-                              GNUNET_YES);
-        continue;
-      }
-      if (GNUNET_YES == 
-          ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
-                                -sequence_difference))/*inversion as PS >= BS */
-      {
-        ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
-                              GNUNET_YES);
-      }
+      /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
+      ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+                           packet, GNUNET_YES);
     }
     /* Update the receive window remaining
        FIXME : Should update with the value from a data ack with greater
@@ -2656,30 +2665,20 @@ mesh_peer_connect_callback (void *cls,
          GNUNET_i2s(peer));
     return;
   }
-  
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Target peer %s connected\n",
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-  
   /* Set state to INIT */
   socket->state = STATE_INIT;
-
   /* Send HELLO message */
-  message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
-  message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
-  message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
-  queue_message (socket,
-                 message,
-                 &set_state_hello_wait,
-                 NULL);
-
-  /* Call open callback */
-  if (NULL == socket->open_cb)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "STREAM_open callback is NULL\n");
-  }
+  message = generate_hello ();
+  queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+                 socket->control_retransmission_task_id);
+  socket->control_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &control_retransmission_task, socket);
 }
 
 
@@ -2727,31 +2726,23 @@ new_tunnel_notify (void *cls,
 
   if (GNUNET_NO == lsocket->listening)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s: Destroying tunnel from peer %s as we don't have the lock\n",
-         GNUNET_i2s (&socket->other_peer),
-         GNUNET_i2s (&socket->other_peer));
     GNUNET_MESH_tunnel_destroy (tunnel);
     return NULL;
   }
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
   socket->other_peer = *initiator;
   socket->tunnel = tunnel;
-  socket->session_id = 0;       /* FIXME */
   socket->state = STATE_INIT;
   socket->lsocket = lsocket;
   socket->retransmit_timeout = lsocket->retransmit_timeout;
   socket->testing_active = lsocket->testing_active;
   socket->testing_set_write_sequence_number_value =
-    lsocket->testing_set_write_sequence_number_value;
-    
+      lsocket->testing_set_write_sequence_number_value;
+  socket->max_payload_size = lsocket->max_payload_size;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Peer %s initiated tunnel to us\n", 
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-  
-  /* FIXME: Copy MESH handle from lsocket to socket */
-  
   return socket;
 }
 
@@ -2774,43 +2765,52 @@ tunnel_cleaner (void *cls,
                 void *tunnel_ctx)
 {
   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+  struct MessageQueue *head;
 
-  if (tunnel != socket->tunnel)
-    return;
-
+  GNUNET_assert (tunnel == socket->tunnel);
   GNUNET_break_op(0);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Peer %s has terminated connection abruptly\n",
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-
   socket->status = GNUNET_STREAM_SHUTDOWN;
-
   /* Clear Transmit handles */
   if (NULL != socket->transmit_handle)
   {
     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
     socket->transmit_handle = NULL;
   }
-  if (NULL != socket->ack_transmit_handle)
-  {
-    GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
-    GNUNET_free (socket->ack_msg);
-    socket->ack_msg = NULL;
-    socket->ack_transmit_handle = NULL;
-  }
   /* Stop Tasks using socket->tunnel */
   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
   {
     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
-  if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+  if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
   {
-    GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
-    socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+    socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  }  
+  /* Terminate the control retransmission tasks */
+  if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+  {
+    GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+    socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  }
+  /* Clear Transmit handles */
+  if (NULL != socket->transmit_handle)
+  {
+    GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+    socket->transmit_handle = NULL;
+  }
+  /* Clear existing message queue */
+  while (NULL != (head = socket->queue_head)) {
+    GNUNET_CONTAINER_DLL_remove (socket->queue_head,
+                                socket->queue_tail,
+                                head);
+    GNUNET_free (head->message);
+    GNUNET_free (head);
   }
-  /* FIXME: Cancel all other tasks using socket->tunnel */
   socket->tunnel = NULL;
 }
 
@@ -2832,9 +2832,8 @@ lockmanager_acquire_timeout (void *cls,
   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
   listen_cb = lsocket->listen_cb;
   listen_cb_cls = lsocket->listen_cb_cls;
-  GNUNET_STREAM_listen_close (lsocket);
   if (NULL != listen_cb)
-    listen_cb (listen_cb_cls, NULL, NULL);  
+    listen_cb (listen_cb_cls, NULL, NULL);
 }
 
 
@@ -2866,13 +2865,16 @@ lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
       GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
 
       lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
-                                           RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE size as parameter? */
                                            lsocket, /* Closure */
                                            &new_tunnel_notify,
                                            &tunnel_cleaner,
                                            server_message_handlers,
                                            ports);
       GNUNET_assert (NULL != lsocket->mesh);
+      if (NULL != lsocket->listen_ok_cb)
+      {
+        (void) lsocket->listen_ok_cb ();
+      }
     }
   }
   if (GNUNET_LOCKMANAGER_RELEASE == status)
@@ -2892,7 +2894,8 @@ lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
  * @param target the target peer to which the stream has to be opened
  * @param app_port the application port number which uniquely identifies this
  *            stream
- * @param open_cb this function will be called after stream has be established 
+ * @param open_cb this function will be called after stream has be established;
+ *          cannot be NULL
  * @param open_cb_cls the closure for open_cb
  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
  * @return if successful it returns the stream socket; NULL if stream cannot be
@@ -2909,18 +2912,20 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
   struct GNUNET_STREAM_Socket *socket;
   enum GNUNET_STREAM_Option option;
   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
-  va_list vargs;                /* Variable arguments */
+  va_list vargs;
+  uint16_t payload_size;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s\n", __func__);
+  GNUNET_assert (NULL != open_cb);
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
   socket->other_peer = *target;
   socket->open_cb = open_cb;
   socket->open_cls = open_cb_cls;
   /* Set defaults */
-  socket->retransmit_timeout = 
-    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
+  socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
   socket->testing_active = GNUNET_NO;
+  socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
   va_start (vargs, open_cb_cls); /* Parse variable args */
   do {
     option = va_arg (vargs, enum GNUNET_STREAM_Option);
@@ -2936,13 +2941,24 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
       socket->testing_set_write_sequence_number_value = va_arg (vargs,
                                                                 uint32_t);
       break;
+    case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
+      GNUNET_break (0);          /* Option irrelevant in STREAM_open */
+      break;
+    case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
+      GNUNET_break (0);          /* Option irrelevant in STREAM_open */
+      break;
+    case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
+      payload_size = (uint16_t) va_arg (vargs, unsigned int);
+      GNUNET_assert (0 != payload_size);
+      if (payload_size < socket->max_payload_size)
+       socket->max_payload_size = payload_size;
+      break;
     case GNUNET_STREAM_OPTION_END:
       break;
     }
   } while (GNUNET_STREAM_OPTION_END != option);
   va_end (vargs);               /* End of variable args parsing */
   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
-                                      RECEIVE_BUFFER_SIZE,  /* QUEUE size as parameter? */
                                       socket, /* cls */
                                       NULL, /* No inbound tunnel handler */
                                       NULL, /* No in-tunnel cleaner */
@@ -2953,10 +2969,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
     GNUNET_free (socket);
     return NULL;
   }
-
   /* Now create the mesh tunnel to target */
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Creating MESH Tunnel\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
                                               NULL, /* Tunnel context */
                                               &mesh_peer_connect_callback,
@@ -2965,9 +2979,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
   GNUNET_assert (NULL != socket->tunnel);
   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
                                         &socket->other_peer);
-  
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s() END\n", __func__);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
   return socket;
 }
 
@@ -3010,10 +3022,8 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
            "Existing read handle should be cancelled before shutting"
            " down reading\n");
     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
-    queue_message (socket,
-                   msg,
-                   &set_state_receive_close_wait,
-                   NULL);
+    queue_message (socket, msg, &set_state_receive_close_wait, NULL,
+                   GNUNET_NO);
     break;
   case SHUT_WR:
     handle->operation = SHUT_WR;
@@ -3022,10 +3032,8 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
            "Existing write handle should be cancelled before shutting"
            " down writing\n");
     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
-    queue_message (socket,
-                   msg,
-                   &set_state_transmit_close_wait,
-                   NULL);
+    queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
+                   GNUNET_NO);
     break;
   case SHUT_RDWR:
     handle->operation = SHUT_RDWR;
@@ -3038,10 +3046,7 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
            "Existing read handle should be cancelled before shutting"
            " down reading\n");
     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
-    queue_message (socket,
-                   msg,
-                   &set_state_close_wait,
-                   NULL);
+    queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
     break;
   default:
     LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -3069,6 +3074,7 @@ GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
 {
   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
+  handle->socket->shutdown_handle = NULL;
   GNUNET_free (handle);
 }
 
@@ -3087,6 +3093,7 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
         "Closing STREAM socket when a read handle is pending\n");
+    GNUNET_STREAM_io_read_cancel (socket->read_handle);
   }
   if (NULL != socket->write_handle)
   {
@@ -3095,36 +3102,23 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
     GNUNET_STREAM_io_write_cancel (socket->write_handle);
     //socket->write_handle = NULL;
   }
-
-  if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
-  {
-    /* socket closed with read task pending!? */
-    GNUNET_break (0);
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
-  }
-  
-  /* Terminate the ack'ing tasks if they are still present */
+  /* Terminate the ack'ing task if they are still present */
   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
-
+  /* Terminate the control retransmission tasks */
+  if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+  {
+    GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+  }
   /* Clear Transmit handles */
   if (NULL != socket->transmit_handle)
   {
     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
     socket->transmit_handle = NULL;
   }
-  if (NULL != socket->ack_transmit_handle)
-  {
-    GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
-    GNUNET_free (socket->ack_msg);
-    socket->ack_msg = NULL;
-    socket->ack_transmit_handle = NULL;
-  }
-
   /* Clear existing message queue */
   while (NULL != (head = socket->queue_head)) {
     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
@@ -3133,27 +3127,23 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
     GNUNET_free (head->message);
     GNUNET_free (head);
   }
-
   /* Close associated tunnel */
   if (NULL != socket->tunnel)
   {
     GNUNET_MESH_tunnel_destroy (socket->tunnel);
     socket->tunnel = NULL;
   }
-
   /* Close mesh connection */
   if (NULL != socket->mesh && NULL == socket->lsocket)
   {
     GNUNET_MESH_disconnect (socket->mesh);
     socket->mesh = NULL;
-  }
-  
+  }  
   /* Release receive buffer */
   if (NULL != socket->receive_buffer)
   {
     GNUNET_free (socket->receive_buffer);
   }
-
   GNUNET_free (socket);
 }
 
@@ -3178,8 +3168,10 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
 {
   /* FIXME: Add variable args for passing configration options? */
   struct GNUNET_STREAM_ListenSocket *lsocket;
+  struct GNUNET_TIME_Relative listen_timeout;
   enum GNUNET_STREAM_Option option;
   va_list vargs;
+  uint16_t payload_size;
 
   GNUNET_assert (NULL != listen_cb);
   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
@@ -3193,9 +3185,11 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
   }
   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
   /* Set defaults */
-  lsocket->retransmit_timeout = 
-    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
+  lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
   lsocket->testing_active = GNUNET_NO;
+  lsocket->listen_ok_cb = NULL;
+  lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
+  listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
   va_start (vargs, listen_cb_cls);
   do {
     option = va_arg (vargs, enum GNUNET_STREAM_Option);
@@ -3210,6 +3204,20 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
       lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
                                                                  uint32_t);
       break;
+    case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
+      listen_timeout = GNUNET_TIME_relative_multiply
+        (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
+      break;
+    case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
+      lsocket->listen_ok_cb = va_arg (vargs,
+                                      GNUNET_STREAM_ListenSuccessCallback);
+      break;
+    case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
+      payload_size = (uint16_t) va_arg (vargs, unsigned int);
+      GNUNET_assert (0 != payload_size);
+      if (payload_size < lsocket->max_payload_size)
+       lsocket->max_payload_size = payload_size;
+      break;
     case GNUNET_STREAM_OPTION_END:
       break;
     }
@@ -3223,7 +3231,7 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                      (uint32_t) lsocket->port,
                                      &lock_status_change_cb, lsocket);
   lsocket->lockmanager_acquire_timeout_task =
-    GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS (20),
+    GNUNET_SCHEDULER_add_delayed (listen_timeout,
                                   &lockmanager_acquire_timeout, lsocket);
   return lsocket;
 }
@@ -3277,25 +3285,23 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
                      GNUNET_STREAM_CompletionContinuation write_cont,
                      void *write_cont_cls)
 {
-  unsigned int num_needed_packets;
-  unsigned int packet;
   struct GNUNET_STREAM_IOWriteHandle *io_handle;
-  uint32_t packet_size;
-  uint32_t payload_size;
   struct GNUNET_STREAM_DataMessage *data_msg;
   const void *sweep;
   struct GNUNET_TIME_Relative ack_deadline;
+  unsigned int num_needed_packets;
+  unsigned int packet;
+  uint32_t packet_size;
+  uint32_t payload_size;
+  uint16_t max_data_packet_size;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s\n", __func__);
-
-  /* Return NULL if there is already a write request pending */
   if (NULL != socket->write_handle)
   {
     GNUNET_break (0);
     return NULL;
   }
-
   switch (socket->state)
   {
   case STATE_TRANSMIT_CLOSED:
@@ -3321,32 +3327,35 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
   case STATE_RECEIVE_CLOSE_WAIT:
     break;
   }
-
-  if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
-    size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
-  num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
+  if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
+    size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
+  num_needed_packets =
+      (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
   io_handle->socket = socket;
   io_handle->write_cont = write_cont;
   io_handle->write_cont_cls = write_cont_cls;
   io_handle->size = size;
+  io_handle->packets_sent = 0;
   sweep = data;
   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
      determined from RTT */
   ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
   /* Divide the given buffer into packets for sending */
+  max_data_packet_size =
+      socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
   for (packet=0; packet < num_needed_packets; packet++)
   {
-    if ((packet + 1) * max_payload_size < size) 
+    if ((packet + 1) * socket->max_payload_size < size) 
     {
-      payload_size = max_payload_size;
-      packet_size = MAX_PACKET_SIZE;
+      payload_size = socket->max_payload_size;
+      packet_size = max_data_packet_size;
     }
     else 
     {
-      payload_size = size - packet * max_payload_size;
-      packet_size =  payload_size + sizeof (struct
-                                            GNUNET_STREAM_DataMessage); 
+      payload_size = size - packet * socket->max_payload_size;
+      packet_size = 
+         payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
     }
     io_handle->messages[packet] = GNUNET_malloc (packet_size);
     io_handle->messages[packet]->header.header.size = htons (packet_size);
@@ -3355,25 +3364,24 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
     io_handle->messages[packet]->sequence_number =
       htonl (socket->write_sequence_number++);
     io_handle->messages[packet]->offset = htonl (socket->write_offset);
-
     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
        determined from RTT */
     io_handle->messages[packet]->ack_deadline =
       GNUNET_TIME_relative_hton (ack_deadline);
     data_msg = io_handle->messages[packet];
     /* Copy data from given buffer to the packet */
-    memcpy (&data_msg[1],
-            sweep,
-            payload_size);
+    memcpy (&data_msg[1], sweep, payload_size);
     sweep += payload_size;
     socket->write_offset += payload_size;
   }
+  /* ack the last data message. FIXME: remove when we figure out how to do this
+     using RTT */
+  io_handle->messages[num_needed_packets - 1]->ack_deadline = 
+      GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
   socket->write_handle = io_handle;
   write_data (socket);
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s() END\n", __func__);
-
   return io_handle;
 }
 
@@ -3386,9 +3394,11 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
  * @param proc function to call with data (once only)
  * @param proc_cls the closure for proc
  *
- * @return handle to cancel the operation; if the stream has been shutdown for
- *           this type of opeartion then the DataProcessor is immediately
- *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
+ * @return handle to cancel the operation; NULL is returned if: the stream has
+ *           been shutdown for this type of opeartion (the DataProcessor is
+ *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
+ *           read handle is present (only one read handle per socket is present
+ *           at any time)
  */
 struct GNUNET_STREAM_IOReadHandle *
 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
@@ -3402,13 +3412,11 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
        "%s: %s()\n", 
        GNUNET_i2s (&socket->other_peer),
        __func__);
-
   /* Return NULL if there is already a read handle; the user has to cancel that
      first before continuing or has to wait until it is completed */
-  if (NULL != socket->read_handle) return NULL;
-
+  if (NULL != socket->read_handle) 
+    return NULL;
   GNUNET_assert (NULL != proc);
-
   switch (socket->state)
   {
   case STATE_RECEIVE_CLOSED:
@@ -3424,30 +3432,19 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
   default:
     break;
   }
-
   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
   read_handle->proc = proc;
   read_handle->proc_cls = proc_cls;
+  read_handle->socket = socket;
   socket->read_handle = read_handle;
-
-  /* Check if we have a packet at bitmap 0 */
   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
                                           0))
-  {
-    socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
-                                                     socket);
-   
-  }
-  
-  /* Setup the read timeout task */
-  socket->read_io_timeout_task_id =
-    GNUNET_SCHEDULER_add_delayed (timeout,
-                                  &read_io_timeout,
-                                  socket);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: %s() END\n",
-       GNUNET_i2s (&socket->other_peer),
-       __func__);
+    read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
+                                                         socket);   
+  read_handle->read_io_timeout_task_id =
+      GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
+       GNUNET_i2s (&socket->other_peer), __func__);
   return read_handle;
 }
 
@@ -3466,10 +3463,10 @@ GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
   GNUNET_assert (NULL != socket->write_handle);
   GNUNET_assert (socket->write_handle == ioh);
 
-  if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+  if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
   {
-    GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
-    socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+    socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
 
   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
@@ -3491,7 +3488,21 @@ GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
 void
 GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
 {
-  // FIXME: do stuff
+  struct GNUNET_STREAM_Socket *socket;
+  
+  socket = ioh->socket;
+  GNUNET_assert (NULL != socket->read_handle);
+  GNUNET_assert (ioh == socket->read_handle);
+  /* Read io time task should be there; if it is already executed then this
+  read handle is not valid; However upon scheduler shutdown the read io task
+  may be executed before */
+  if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
+    GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
+  /* reading task may be present; if so we have to stop it */
+  if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
+    GNUNET_SCHEDULER_cancel (ioh->read_task_id);
+  GNUNET_free (ioh);
+  socket->read_handle = NULL;
 }
 
 /* end of stream_api.c */