-fixing misc issues in stream, including compilation errors on bots, but more
[oweals/gnunet.git] / src / stream / stream_api.c
index 2b9363c68be1955806162e352da199d135601431..3bf3c7863df49b348ece91bf5280d20e51e9f115 100644 (file)
  *
  * Decrement PEER intern count during socket close and listen close to free the
  * memory used for PEER interning
+ *
+ * Add code for write io timeout
+ *
+ * Include retransmission for control messages
  **/
 
 /**
@@ -32,6 +36,8 @@
  * @brief Implementation of the stream library
  * @author Sree Harsha Totakura
  */
+
+
 #include "platform.h"
 #include "gnunet_common.h"
 #include "gnunet_crypto_lib.h"
 #define MAX_PACKET_SIZE 64000
 
 /**
- * The maximum payload a data message packet can carry
+ * Receive buffer
  */
-static size_t max_payload_size = 
-  MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+#define RECEIVE_BUFFER_SIZE 4096000
 
 /**
- * Receive buffer
+ * The maximum payload a data message packet can carry
  */
-#define RECEIVE_BUFFER_SIZE 4096000
+static size_t max_payload_size = 
+  MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
 
 /**
  * states in the Protocol
@@ -205,6 +211,16 @@ struct GNUNET_STREAM_Socket
    */
   struct GNUNET_MESH_TransmitHandle *transmit_handle;
 
+  /**
+   * The current act transmit handle (if a pending ack transmit request exists)
+   */
+  struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
+
+  /**
+   * Pointer to the current ack message using in ack_task
+   */
+  struct GNUNET_STREAM_AckMessage *ack_msg;
+
   /**
    * The current message associated with the transmit handle
    */
@@ -225,6 +241,11 @@ struct GNUNET_STREAM_Socket
    */
   struct GNUNET_STREAM_IOReadHandle *read_handle;
 
+  /**
+   * The shutdown handle associated with this socket
+   */
+  struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
   /**
    * Buffer for storing received messages
    */
@@ -239,7 +260,7 @@ struct GNUNET_STREAM_Socket
   /**
    * Task identifier for the read io timeout task
    */
-  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
+  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
 
   /**
    * Task identifier for retransmission task after timeout
@@ -276,11 +297,6 @@ struct GNUNET_STREAM_Socket
    */
   GNUNET_PEER_Id other_peer;
 
-  /**
-   * Our Peer Identity (for debugging)
-   */
-  GNUNET_PEER_Id our_id;
-
   /**
    * The application port number (type: uint32_t)
    */
@@ -355,11 +371,6 @@ struct GNUNET_STREAM_ListenSocket
    */
   void *listen_cb_cls;
 
-  /**
-   * Our interned Peer's identity
-   */
-  GNUNET_PEER_Id our_id;
-
   /**
    * The service port
    * FIXME: Remove if not required!
@@ -373,6 +384,11 @@ struct GNUNET_STREAM_ListenSocket
  */
 struct GNUNET_STREAM_IOWriteHandle
 {
+  /**
+   * The socket to which this write handle is associated
+   */
+  struct GNUNET_STREAM_Socket *socket;
+
   /**
    * The packet_buffers associated with this Handle
    */
@@ -398,13 +414,6 @@ struct GNUNET_STREAM_IOWriteHandle
    * Number of bytes in this write handle
    */
   size_t size;
-
-  /**
-   * Number of packets sent before waiting for an ack
-   *
-   * FIXME: Do we need this?
-   */
-  unsigned int sent_packets;
 };
 
 
@@ -425,13 +434,46 @@ struct GNUNET_STREAM_IOReadHandle
 };
 
 
+/**
+ * Handle for Shutdown
+ */
+struct GNUNET_STREAM_ShutdownHandle
+{
+  /**
+   * The socket associated with this shutdown handle
+   */
+  struct GNUNET_STREAM_Socket *socket;
+
+  /**
+   * Shutdown completion callback
+   */
+  GNUNET_STREAM_ShutdownCompletion completion_cb;
+
+  /**
+   * Closure for completion callback
+   */
+  void *completion_cls;
+
+  /**
+   * Close message retransmission task id
+   */
+  GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
+
+  /**
+   * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
+   */
+  int operation;  
+};
+
+
 /**
  * Default value in seconds for various timeouts
  */
 static unsigned int default_timeout = 10;
 
+
 /**
- * Callback function for sending hello message
+ * Callback function for sending queued message
  *
  * @param cls closure the socket
  * @param size number of bytes available in buf
@@ -523,8 +565,7 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
      && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: Queueing message of type %d and size %d\n",
-              socket->our_id,
+              "Queueing message of type %d and size %d\n",
               ntohs (message->header.type),
               ntohs (message->header.size));
   GNUNET_assert (NULL != message);
@@ -588,7 +629,7 @@ copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
 static size_t
 send_ack_notify (void *cls, size_t size, void *buf)
 {
-  struct GNUNET_STREAM_AckMessage *ack_msg = cls;
+  struct GNUNET_STREAM_Socket *socket = cls;
 
   if (0 == size)
     {
@@ -596,10 +637,14 @@ send_ack_notify (void *cls, size_t size, void *buf)
                   "%s called with size 0\n", __func__);
       return 0;
     }
-  GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
+  GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
+  
+  size = ntohs (socket->ack_msg->header.header.size);
+  memcpy (buf, socket->ack_msg, size);
   
-  size = ntohs (ack_msg->header.header.size);
-  memcpy (buf, ack_msg, size);
+  GNUNET_free (socket->ack_msg);
+  socket->ack_msg = NULL;
+  socket->ack_transmit_handle = NULL;
   return size;
 }
 
@@ -629,7 +674,7 @@ retransmission_timeout_task (void *cls,
     return;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: Retransmitting DATA...\n", socket->our_id);
+              "Retransmitting DATA...\n");
   socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
   write_data (socket);
 }
@@ -654,7 +699,7 @@ ack_task (void *cls,
       return;
     }
 
-  socket->ack_task_id = 0;
+  socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
 
   /* Create the ACK Message */
   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
@@ -666,18 +711,62 @@ ack_task (void *cls,
   ack_msg->receive_window_remaining = 
     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
 
+  socket->ack_msg = ack_msg;
   GNUNET_PEER_resolve (socket->other_peer, &target);
   /* Request MESH for sending ACK */
-  GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                     0, /* Corking */
-                                     1, /* Priority */
-                                     socket->retransmit_timeout,
-                                     &target,
-                                     ntohs (ack_msg->header.header.size),
-                                     &send_ack_notify,
-                                     ack_msg);
+  socket->ack_transmit_handle = 
+    GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                       0, /* Corking */
+                                       1, /* Priority */
+                                       socket->retransmit_timeout,
+                                       &target,
+                                       ntohs (ack_msg->header.header.size),
+                                       &send_ack_notify,
+                                       socket);
+}
 
-  
+
+/**
+ * Retransmission task for shutdown messages
+ *
+ * @param cls the shutdown handle
+ * @param tc the Task Context
+ */
+static void
+close_msg_retransmission_task (void *cls,
+                               const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
+  struct GNUNET_STREAM_MessageHeader *msg;
+  struct GNUNET_STREAM_Socket *socket;
+
+  GNUNET_assert (NULL != shutdown_handle);
+  socket = shutdown_handle->socket;
+
+  msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+  msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  switch (shutdown_handle->operation)
+    {
+    case SHUT_RDWR:
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
+      break;
+    case SHUT_RD:
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+      break;
+    case SHUT_WR:
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+      break;
+    default:
+      GNUNET_free (msg);
+      shutdown_handle->close_msg_retransmission_task_id = 
+        GNUNET_SCHEDULER_NO_TASK;
+      return;
+    }
+  queue_message (socket, msg, NULL, NULL);
+  shutdown_handle->close_msg_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &close_msg_retransmission_task,
+                                  shutdown_handle);
 }
 
 
@@ -717,23 +806,6 @@ ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
 }
 
 
-
-/**
- * Function called when Data Message is sent
- *
- * @param cls the io_handle corresponding to the Data Message
- * @param socket the socket which was used
- */
-static void
-write_data_finish_cb (void *cls,
-                      struct GNUNET_STREAM_Socket *socket)
-{
-  struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
-
-  io_handle->sent_packets++;
-}
-
-
 /**
  * Writes data using the given socket. The amount of data written is limited by
  * the receiver_window_size
@@ -764,8 +836,7 @@ write_data (struct GNUNET_STREAM_Socket *socket)
                                              packet))
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%x: Placing DATA message with sequence %u in send queue\n",
-                      socket->our_id,
+                      "Placing DATA message with sequence %u in send queue\n",
                       ntohl (io_handle->messages[packet]->sequence_number));
 
           copy_and_queue_message (socket,
@@ -777,25 +848,25 @@ write_data (struct GNUNET_STREAM_Socket *socket)
   packet = ack_packet + 1;
   /* Now send new packets if there is enough buffer space */
   while ( (NULL != io_handle->messages[packet]) &&
-         (socket->receiver_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
+         (socket->receiver_window_available 
+           >= ntohs (io_handle->messages[packet]->header.header.size)) )
     {
       socket->receiver_window_available -= 
         ntohs (io_handle->messages[packet]->header.header.size);
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Placing DATA message with sequence %u in send queue\n",
-                  socket->our_id,
+                  "Placing DATA message with sequence %u in send queue\n",
                   ntohl (io_handle->messages[packet]->sequence_number));
       copy_and_queue_message (socket,
                               &io_handle->messages[packet]->header,
-                              &write_data_finish_cb,
-                              io_handle);
+                              NULL,
+                              NULL);
       packet++;
     }
 
   if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
     socket->retransmission_timeout_task_id = 
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
-                                    (GNUNET_TIME_UNIT_SECONDS, 5),
+                                    (GNUNET_TIME_UNIT_SECONDS, 8),
                                     &retransmission_timeout_task,
                                     socket);
 }
@@ -809,7 +880,7 @@ write_data (struct GNUNET_STREAM_Socket *socket)
  */
 static void
 call_read_processor (void *cls,
-                          const struct GNUNET_SCHEDULER_TaskContext *tc)
+                     const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
   size_t read_size;
@@ -822,6 +893,9 @@ call_read_processor (void *cls,
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
 
+  if (NULL == socket->receive_buffer) 
+    return;
+
   GNUNET_assert (NULL != socket->read_handle);
   GNUNET_assert (NULL != socket->read_handle->proc);
 
@@ -841,25 +915,22 @@ call_read_processor (void *cls,
   GNUNET_assert (0 != valid_read_size);
 
   /* Cancel the read_io_timeout_task */
-  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
-  socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
+  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
 
   /* Call the data processor */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: Calling read processor\n",
-              socket->our_id);
+              "Calling read processor\n");
   read_size = 
     socket->read_handle->proc (socket->read_handle->proc_cls,
                                socket->status,
                                socket->receive_buffer + socket->copy_offset,
                                valid_read_size);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: Read processor read %d bytes\n",
-              socket->our_id,
+              "Read processor read %d bytes\n",
               read_size);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: Read processor completed successfully\n",
-              socket->our_id);
+              "Read processor completed successfully\n");
 
   /* Free the read handle */
   GNUNET_free (socket->read_handle);
@@ -870,19 +941,27 @@ call_read_processor (void *cls,
 
   /* Determine upto which packet we can remove from the buffer */
   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
-    if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
-      break;
+    {
+      if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
+        { packet++; break; }
+      if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
+        break;
+    }
 
   /* If no packets can be removed we can't move the buffer */
   if (0 == packet) return;
 
   sequence_increase = packet;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sequence increase after read processor completion: %u\n",
+              sequence_increase);
 
   /* Shift the data in the receive buffer */
   memmove (socket->receive_buffer,
            socket->receive_buffer 
            + socket->receive_buffer_boundaries[sequence_increase-1],
-           socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
+           socket->receive_buffer_size
+           - socket->receive_buffer_boundaries[sequence_increase-1]);
   
   /* Shift the bitmap */
   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
@@ -927,12 +1006,11 @@ read_io_timeout (void *cls,
   GNUNET_STREAM_DataProcessor proc;
   void *proc_cls;
 
-  socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%x: Read task timedout - Cancelling it\n",
-                socket->our_id);
+                "Read task timedout - Cancelling it\n");
     GNUNET_SCHEDULER_cancel (socket->read_task_id);
     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
@@ -984,8 +1062,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
   if (GNUNET_PEER_search (sender) != socket->other_peer)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Received DATA from non-confirming peer\n",
-                  socket->our_id);
+                  "Received DATA from non-confirming peer\n");
       return GNUNET_YES;
     }
 
@@ -1002,8 +1079,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
       if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%x: Ignoring received message with sequence number %u\n",
-                      socket->our_id,
+                      "Ignoring received message with sequence number %u\n",
                       ntohl (msg->sequence_number));
           /* Start ACK sending task if one is not already present */
           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
@@ -1022,9 +1098,8 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
                                               relative_sequence_number))
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%x: Ignoring already received message with sequence "
+                      "Ignoring already received message with sequence "
                       "number %u\n",
-                      socket->our_id,
                       ntohl (msg->sequence_number));
           /* Start ACK sending task if one is not already present */
           if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
@@ -1039,9 +1114,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
         }
 
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Receiving DATA with sequence number: %u and size: %d "
-                  "from %x\n",
-                  socket->our_id,
+                  "Receiving DATA with sequence number: %u and size: %d from %x\n",
                   ntohl (msg->sequence_number),
                   ntohs (msg->header.header.size),
                   socket->other_peer);
@@ -1061,9 +1134,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
           else
             {
               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                          "%x: Cannot accommodate packet %d as buffer is",
-                          "full\n",
-                          socket->our_id,
+                          "Cannot accommodate packet %d as buffer is full\n",
                           ntohl (msg->sequence_number));
               return GNUNET_YES;
             }
@@ -1101,8 +1172,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
                                                  0)))
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%x: Scheduling read processor\n",
-                      socket->our_id);
+                      "Scheduling read processor\n");
 
           socket->read_task_id = 
             GNUNET_SCHEDULER_add_now (&call_read_processor,
@@ -1113,13 +1183,13 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
 
     default:
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Received data message when it cannot be handled\n",
-                  socket->our_id);
+                  "Received data message when it cannot be handled\n");
       break;
     }
   return GNUNET_YES;
 }
 
+
 /**
  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
  *
@@ -1163,8 +1233,7 @@ set_state_established (void *cls,
   struct GNUNET_PeerIdentity initiator_pid;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
-              "%x: Attaining ESTABLISHED state\n",
-              socket->our_id);
+              "Attaining ESTABLISHED state\n");
   socket->write_offset = 0;
   socket->read_offset = 0;
   socket->state = STATE_ESTABLISHED;
@@ -1173,8 +1242,7 @@ set_state_established (void *cls,
     {
       GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Calling listen callback\n",
-                  socket->our_id);
+                  "Calling listen callback\n");
       if (GNUNET_SYSERR == 
           socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
                                       socket,
@@ -1203,12 +1271,108 @@ set_state_hello_wait (void *cls,
 {
   GNUNET_assert (STATE_INIT == socket->state);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
-              "%x: Attaining HELLO_WAIT state\n",
-              socket->our_id);
+              "Attaining HELLO_WAIT state\n");
   socket->state = STATE_HELLO_WAIT;
 }
 
 
+/**
+ * Callback to set state to CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_close_wait (void *cls,
+                      struct GNUNET_STREAM_Socket *socket)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Attaing CLOSE_WAIT state\n");
+  socket->state = STATE_CLOSE_WAIT;
+  GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+  socket->receive_buffer = NULL;
+  socket->receive_buffer_size = 0;
+}
+
+
+/**
+ * Callback to set state to RECEIVE_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_receive_close_wait (void *cls,
+                              struct GNUNET_STREAM_Socket *socket)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Attaing RECEIVE_CLOSE_WAIT state\n");
+  socket->state = STATE_RECEIVE_CLOSE_WAIT;
+  GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+  socket->receive_buffer = NULL;
+  socket->receive_buffer_size = 0;
+}
+
+
+/**
+ * Callback to set state to TRANSMIT_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_transmit_close_wait (void *cls,
+                               struct GNUNET_STREAM_Socket *socket)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Attaining TRANSMIT_CLOSE_WAIT state\n");
+  socket->state = STATE_TRANSMIT_CLOSE_WAIT;
+}
+
+
+/**
+ * Callback to set state to CLOSED
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_closed (void *cls,
+                  struct GNUNET_STREAM_Socket *socket)
+{
+  socket->state = STATE_CLOSED;
+}
+
+/**
+ * Returns a new HelloAckMessage. Also sets the write sequence number for the
+ * socket
+ *
+ * @param socket the socket for which this HelloAckMessage has to be generated
+ * @return the HelloAckMessage
+ */
+static struct GNUNET_STREAM_HelloAckMessage *
+generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
+{
+  struct GNUNET_STREAM_HelloAckMessage *msg;
+
+  /* Get the random sequence number */
+  socket->write_sequence_number = 
+    GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Generated write sequence number %u\n",
+              (unsigned int) socket->write_sequence_number);
+  
+  msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+  msg->header.header.size = 
+    htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+  msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+  msg->sequence_number = htonl (socket->write_sequence_number);
+  msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
+
+  return msg;
+}
+
+
 /**
  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
  *
@@ -1236,14 +1400,12 @@ client_handle_hello_ack (void *cls,
   if (GNUNET_PEER_search (sender) != socket->other_peer)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Received HELLO_ACK from non-confirming peer\n",
-                  socket->our_id);
+                  "Received HELLO_ACK from non-confirming peer\n");
       return GNUNET_YES;
     }
   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: Received HELLO_ACK from %x\n",
-              socket->our_id,
+              "Received HELLO_ACK from %x\n",
               socket->other_peer);
 
   GNUNET_assert (socket->tunnel == tunnel);
@@ -1252,26 +1414,11 @@ client_handle_hello_ack (void *cls,
   case STATE_HELLO_WAIT:
     socket->read_sequence_number = ntohl (ack_msg->sequence_number);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%x: Read sequence number %u\n",
-                socket->our_id,
+                "Read sequence number %u\n",
                 (unsigned int) socket->read_sequence_number);
     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
-    /* Get the random sequence number */
-    socket->write_sequence_number = 
-      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Generated write sequence number %u\n",
-                  socket->our_id,
-                  (unsigned int) socket->write_sequence_number);
-    reply = 
-      GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
-    reply->header.header.size = 
-      htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
-    reply->header.header.type = 
-      htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
-    reply->sequence_number = htonl (socket->write_sequence_number);
-    reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
-    queue_message (socket, 
+    reply = generate_hello_ack_msg (socket);
+    queue_message (socket,
                    &reply->header, 
                    &set_state_established, 
                    NULL);      
@@ -1283,8 +1430,7 @@ client_handle_hello_ack (void *cls,
   case STATE_INIT:
   default:
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "%x: Server %x sent HELLO_ACK when in state %d\n", 
-                socket->our_id,
+               "Server %x sent HELLO_ACK when in state %d\n", 
                 socket->other_peer,
                 socket->state);
     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
@@ -1314,7 +1460,7 @@ client_handle_reset (void *cls,
                      const struct GNUNET_MessageHeader *message,
                      const struct GNUNET_ATS_Information*atsi)
 {
-  struct GNUNET_STREAM_Socket *socket = cls;
+  // struct GNUNET_STREAM_Socket *socket = cls;
 
   return GNUNET_OK;
 }
@@ -1391,6 +1537,128 @@ client_handle_transmit_close (void *cls,
 }
 
 
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @param operation the close operation which is being ACK'ed
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
+                          struct GNUNET_MESH_Tunnel *tunnel,
+                          const struct GNUNET_PeerIdentity *sender,
+                          const struct GNUNET_STREAM_MessageHeader *message,
+                          const struct GNUNET_ATS_Information *atsi,
+                          int operation)
+{
+  struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+  shutdown_handle = socket->shutdown_handle;
+  if (NULL == shutdown_handle)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Received *CLOSE_ACK when shutdown handle is NULL\n");
+      return GNUNET_OK;
+    }
+
+  switch (operation)
+    {
+    case SHUT_RDWR:
+      switch (socket->state)
+        {
+        case STATE_CLOSE_WAIT:
+          if (SHUT_RDWR != shutdown_handle->operation)
+            {
+              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                          "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n");
+              return GNUNET_OK;
+            }
+
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Received CLOSE_ACK from %x\n",
+                      socket->other_peer);
+          socket->state = STATE_CLOSED;
+          break;
+        default:
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Received CLOSE_ACK when in it not expected\n");
+          return GNUNET_OK;
+        }
+      break;
+
+    case SHUT_RD:
+      switch (socket->state)
+        {
+        case STATE_RECEIVE_CLOSE_WAIT:
+          if (SHUT_RD != shutdown_handle->operation)
+            {
+              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                          "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n");
+              return GNUNET_OK;
+            }
+
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Received RECEIVE_CLOSE_ACK from %x\n",
+                      socket->other_peer);
+          socket->state = STATE_RECEIVE_CLOSED;
+          break;
+        default:
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Received RECEIVE_CLOSE_ACK when in it not expected\n");
+          return GNUNET_OK;
+        }
+
+      break;
+    case SHUT_WR:
+      switch (socket->state)
+        {
+        case STATE_TRANSMIT_CLOSE_WAIT:
+          if (SHUT_WR != shutdown_handle->operation)
+            {
+              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                          "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n");
+              return GNUNET_OK;
+            }
+
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Received TRANSMIT_CLOSE_ACK from %x\n",
+                      socket->other_peer);
+          socket->state = STATE_TRANSMIT_CLOSED;
+          break;
+        default:
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Received TRANSMIT_CLOSE_ACK when in it not expected\n");
+          
+          return GNUNET_OK;
+        }
+      break;
+    default:
+      GNUNET_assert (0);
+    }
+
+  if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
+    shutdown_handle->completion_cb(shutdown_handle->completion_cls,
+                                   operation);
+  GNUNET_free (shutdown_handle); /* Free shutdown handle */
+  socket->shutdown_handle = NULL;
+  if (GNUNET_SCHEDULER_NO_TASK
+      != shutdown_handle->close_msg_retransmission_task_id)
+    {
+      GNUNET_SCHEDULER_cancel
+        (shutdown_handle->close_msg_retransmission_task_id);
+      shutdown_handle->close_msg_retransmission_task_id =
+        GNUNET_SCHEDULER_NO_TASK;
+    }
+  return GNUNET_OK;
+}
+
+
 /**
  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
  *
@@ -1413,6 +1681,65 @@ client_handle_transmit_close_ack (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
 
+  return handle_generic_close_ack (socket,
+                                   tunnel,
+                                   sender,
+                                   (const struct GNUNET_STREAM_MessageHeader *)
+                                   message,
+                                   atsi,
+                                   SHUT_WR);
+}
+
+
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_receive_close (struct GNUNET_STREAM_Socket *socket,
+                      struct GNUNET_MESH_Tunnel *tunnel,
+                      const struct GNUNET_PeerIdentity *sender,
+                      const struct GNUNET_STREAM_MessageHeader *message,
+                      const struct GNUNET_ATS_Information *atsi)
+{
+  struct GNUNET_STREAM_MessageHeader *receive_close_ack;
+
+  switch (socket->state)
+    {
+    case STATE_INIT:
+    case STATE_LISTEN:
+    case STATE_HELLO_WAIT:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
+      return GNUNET_OK;
+    default:
+      break;
+    }
+  
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received RECEIVE_CLOSE from %x\n",
+              socket->other_peer);
+  receive_close_ack =
+    GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+  receive_close_ack->header.size =
+    htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  receive_close_ack->header.type =
+    htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
+  queue_message (socket,
+                 receive_close_ack,
+                 &set_state_closed,
+                 NULL);
+  
+  /* FIXME: Handle the case where write handle is present; the write operation
+              should be deemed as finised and the write continuation callback
+              has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
   return GNUNET_OK;
 }
 
@@ -1439,7 +1766,12 @@ client_handle_receive_close (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
 
-  return GNUNET_OK;
+  return
+    handle_receive_close (socket,
+                          tunnel,
+                          sender,
+                          (const struct GNUNET_STREAM_MessageHeader *) message,
+                          atsi);
 }
 
 
@@ -1465,6 +1797,64 @@ client_handle_receive_close_ack (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
 
+  return handle_generic_close_ack (socket,
+                                   tunnel,
+                                   sender,
+                                   (const struct GNUNET_STREAM_MessageHeader *)
+                                   message,
+                                   atsi,
+                                   SHUT_RD);
+}
+
+
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_close (struct GNUNET_STREAM_Socket *socket,
+              struct GNUNET_MESH_Tunnel *tunnel,
+              const struct GNUNET_PeerIdentity *sender,
+              const struct GNUNET_STREAM_MessageHeader *message,
+              const struct GNUNET_ATS_Information*atsi)
+{
+  struct GNUNET_STREAM_MessageHeader *close_ack;
+
+  switch (socket->state)
+    {
+    case STATE_INIT:
+    case STATE_LISTEN:
+    case STATE_HELLO_WAIT:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
+      return GNUNET_OK;
+    default:
+      break;
+    }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received CLOSE from %x\n",
+              socket->other_peer);
+  close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+  close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
+  queue_message (socket,
+                 close_ack,
+                 &set_state_closed,
+                 NULL);
+  if (socket->state == STATE_CLOSED)
+    return GNUNET_OK;
+
+  GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+  socket->receive_buffer = NULL;
+  socket->receive_buffer_size = 0;
   return GNUNET_OK;
 }
 
@@ -1491,7 +1881,11 @@ client_handle_close (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
 
-  return GNUNET_OK;
+  return handle_close (socket,
+                       tunnel,
+                       sender,
+                       (const struct GNUNET_STREAM_MessageHeader *) message,
+                       atsi);
 }
 
 
@@ -1513,11 +1907,17 @@ client_handle_close_ack (void *cls,
                          void **tunnel_ctx,
                          const struct GNUNET_PeerIdentity *sender,
                          const struct GNUNET_MessageHeader *message,
-                         const struct GNUNET_ATS_Information*atsi)
+                         const struct GNUNET_ATS_Information *atsi)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
 
-  return GNUNET_OK;
+  return handle_generic_close_ack (socket,
+                                   tunnel,
+                                   sender,
+                                   (const struct GNUNET_STREAM_MessageHeader *) 
+                                   message,
+                                   atsi,
+                                   SHUT_RDWR);
 }
 
 /*****************************/
@@ -1580,8 +1980,7 @@ server_handle_hello (void *cls,
   if (GNUNET_PEER_search (sender) != socket->other_peer)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Received HELLO from non-confirming peer\n",
-                  socket->our_id);
+                  "Received HELLO from non-confirming peer\n");
       return GNUNET_YES;
     }
 
@@ -1589,27 +1988,12 @@ server_handle_hello (void *cls,
                  ntohs (message->type));
   GNUNET_assert (socket->tunnel == tunnel);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: Received HELLO from %x\n", 
-              socket->our_id,
+              "Received HELLO from %x\n", 
               socket->other_peer);
 
   if (STATE_INIT == socket->state)
     {
-      /* Get the random sequence number */
-      socket->write_sequence_number = 
-        GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Generated write sequence number %u\n",
-                  socket->our_id,
-                  (unsigned int) socket->write_sequence_number);
-      reply = 
-        GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
-      reply->header.header.size = 
-        htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
-      reply->header.header.type = 
-        htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
-      reply->sequence_number = htonl (socket->write_sequence_number);
-      reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
+      reply = generate_hello_ack_msg (socket);
       queue_message (socket, 
                     &reply->header,
                      &set_state_hello_wait, 
@@ -1656,13 +2040,11 @@ server_handle_hello_ack (void *cls,
   if (STATE_HELLO_WAIT == socket->state)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Received HELLO_ACK from %x\n",
-                  socket->our_id,
+                  "Received HELLO_ACK from %x\n",
                   socket->other_peer);
       socket->read_sequence_number = ntohl (ack_message->sequence_number);
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Read sequence number %u\n",
-                  socket->our_id,
+                  "Read sequence number %u\n",
                   (unsigned int) socket->read_sequence_number);
       socket->receiver_window_available = 
         ntohl (ack_message->receiver_window_size);
@@ -1700,7 +2082,7 @@ server_handle_reset (void *cls,
                      const struct GNUNET_MessageHeader *message,
                      const struct GNUNET_ATS_Information*atsi)
 {
-  struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+  // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
 
   return GNUNET_OK;
 }
@@ -1758,7 +2140,13 @@ server_handle_transmit_close_ack (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
 
-  return GNUNET_OK;
+  return handle_generic_close_ack (socket,
+                                   tunnel,
+                                   sender,
+                                   (const struct GNUNET_STREAM_MessageHeader *)
+                                   message,
+                                   atsi,
+                                   SHUT_WR);
 }
 
 
@@ -1784,7 +2172,12 @@ server_handle_receive_close (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
 
-  return GNUNET_OK;
+  return
+    handle_receive_close (socket,
+                          tunnel,
+                          sender,
+                          (const struct GNUNET_STREAM_MessageHeader *) message,
+                          atsi);
 }
 
 
@@ -1810,14 +2203,21 @@ server_handle_receive_close_ack (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
 
-  return GNUNET_OK;
+  return handle_generic_close_ack (socket,
+                                   tunnel,
+                                   sender,
+                                   (const struct GNUNET_STREAM_MessageHeader *)
+                                   message,
+                                   atsi,
+                                   SHUT_RD);
 }
 
 
 /**
  * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
  *
- * @param cls the closure
+ * @param cls the listen socket (from GNUNET_MESH_connect in
+ *          GNUNET_STREAM_listen) 
  * @param tunnel connection to the other end
  * @param tunnel_ctx the socket
  * @param sender who sent the message
@@ -1835,8 +2235,12 @@ server_handle_close (void *cls,
                      const struct GNUNET_ATS_Information*atsi)
 {
   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
-
-  return GNUNET_OK;
+  
+  return handle_close (socket,
+                       tunnel,
+                       sender,
+                       (const struct GNUNET_STREAM_MessageHeader *) message,
+                       atsi);
 }
 
 
@@ -1862,12 +2266,18 @@ server_handle_close_ack (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
 
-  return GNUNET_OK;
+  return handle_generic_close_ack (socket,
+                                   tunnel,
+                                   sender,
+                                   (const struct GNUNET_STREAM_MessageHeader *) 
+                                   message,
+                                   atsi,
+                                   SHUT_RDWR);
 }
 
 
 /**
- * Message Handler for mesh
+ * Handler for DATA_ACK messages
  *
  * @param socket the socket through which the ack was received
  * @param tunnel connection to the other end
@@ -1886,42 +2296,44 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
 {
   unsigned int packet;
   int need_retransmission;
+  
 
   if (GNUNET_PEER_search (sender) != socket->other_peer)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Received ACK from non-confirming peer\n",
-                  socket->our_id);
+                  "Received ACK from non-confirming peer\n");
       return GNUNET_YES;
     }
 
   switch (socket->state)
     {
     case (STATE_ESTABLISHED):
+    case (STATE_RECEIVE_CLOSED):
+    case (STATE_RECEIVE_CLOSE_WAIT):
       if (NULL == socket->write_handle)
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%x: Received DATA_ACK when write_handle is NULL\n",
-                      socket->our_id);
+                      "Received DATA_ACK when write_handle is NULL\n");
           return GNUNET_OK;
         }
       /* FIXME: increment in the base sequence number is breaking current flow
        */
       if (!((socket->write_sequence_number 
-             - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+             - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%x: Received DATA_ACK with unexpected base sequence "
-                      "number\n",
-                      socket->our_id);
+                      "Received DATA_ACK with unexpected base sequence number\n");
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Current write sequence: %u; Ack's base sequence: %u\n",
+                      socket->write_sequence_number,
+                      ntohl (ack->base_sequence_number));
           return GNUNET_OK;
         }
       /* FIXME: include the case when write_handle is cancelled - ignore the 
          acks */
 
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: Received DATA_ACK from %x\n",
-                  socket->our_id,
+                  "Received DATA_ACK from %x\n",
                   socket->other_peer);
       
       /* Cancel the retransmission task */
@@ -1932,9 +2344,27 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
             GNUNET_SCHEDULER_NO_TASK;
         }
 
-      /* FIXME: Bits in the ack_bitmap are only to be set; Once set they cannot
-         be unset */
-      socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
+      for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+        {
+          if (NULL == socket->write_handle->messages[packet]) break;
+          if (ntohl (ack->base_sequence_number)
+              >= ntohl (socket->write_handle->messages[packet]->sequence_number))
+            ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+                                  packet,
+                                  GNUNET_YES);
+          else
+            if (GNUNET_YES == 
+                ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
+                                      ntohl (socket->write_handle->messages[packet]->sequence_number)
+                                      - ntohl (ack->base_sequence_number)))
+              ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+                                    packet,
+                                    GNUNET_YES);
+        }
+
+      /* Update the receive window remaining
+       FIXME : Should update with the value from a data ack with greater
+       sequence number */
       socket->receiver_window_available = 
         ntohl (ack->receive_window_remaining);
 
@@ -1967,8 +2397,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
                socket->status,
                socket->write_handle->size);
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%x: Write completion callback completed\n",
-                      socket->our_id);
+                      "Write completion callback completed\n");
           /* We are done with the write handle - Freeing it */
           GNUNET_free (socket->write_handle);
           socket->write_handle = NULL;
@@ -1982,7 +2411,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
 
 
 /**
- * Message Handler for mesh
+ * Handler for DATA_ACK messages
  *
  * @param cls the 'struct GNUNET_STREAM_Socket'
  * @param tunnel connection to the other end
@@ -2009,7 +2438,7 @@ client_handle_ack (void *cls,
 
 
 /**
- * Message Handler for mesh
+ * Handler for DATA_ACK messages
  *
  * @param cls the server's listen socket
  * @param tunnel connection to the other end
@@ -2114,15 +2543,12 @@ mesh_peer_connect_callback (void *cls,
   if (connected_peer != socket->other_peer)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%x: A peer which is not our target has connected",
-                  "to our tunnel\n",
-                  socket->our_id);
+                  "A peer which is not our target has connected to our tunnel\n");
       return;
     }
   
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: Target peer %x connected\n", 
-              socket->our_id,
+              "Target peer %x connected\n", 
               connected_peer);
   
   /* Set state to INIT */
@@ -2156,7 +2582,12 @@ static void
 mesh_peer_disconnect_callback (void *cls,
                                const struct GNUNET_PeerIdentity *peer)
 {
-
+  struct GNUNET_STREAM_Socket *socket=cls;
+  
+  /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Other peer %x disconnected\n",
+              socket->other_peer);
 }
 
 
@@ -2187,12 +2618,9 @@ new_tunnel_notify (void *cls,
   socket->tunnel = tunnel;
   socket->session_id = 0;       /* FIXME */
   socket->state = STATE_INIT;
-  socket->lsocket = lsocket;
-  socket->our_id = lsocket->our_id;
-  
+  socket->lsocket = lsocket; 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: Peer %x initiated tunnel to us\n", 
-              socket->our_id,
+              "Peer %x initiated tunnel to us\n", 
               socket->other_peer);
   
   /* FIXME: Copy MESH handle from lsocket to socket */
@@ -2219,10 +2647,13 @@ tunnel_cleaner (void *cls,
                 void *tunnel_ctx)
 {
   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
-  
+
+  if (tunnel != socket->tunnel)
+    return;
+
+  GNUNET_break_op(0);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: Peer %x has terminated connection abruptly\n",
-              socket->our_id,
+              "Peer %x has terminated connection abruptly\n",
               socket->other_peer);
 
   socket->status = GNUNET_STREAM_SHUTDOWN;
@@ -2233,6 +2664,25 @@ tunnel_cleaner (void *cls,
       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
       socket->transmit_handle = NULL;
     }
+  if (NULL != socket->ack_transmit_handle)
+    {
+      GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
+      GNUNET_free (socket->ack_msg);
+      socket->ack_msg = NULL;
+      socket->ack_transmit_handle = NULL;
+    }
+  /* Stop Tasks using socket->tunnel */
+  if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
+    {
+      GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+      socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+    }
+  if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+    {
+      GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+      socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+    }
+  /* FIXME: Cancel all other tasks using socket->tunnel */
   socket->tunnel = NULL;
 }
 
@@ -2264,8 +2714,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
                     ...)
 {
   struct GNUNET_STREAM_Socket *socket;
-  struct GNUNET_PeerIdentity own_peer_id;
   enum GNUNET_STREAM_Option option;
+  GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
   va_list vargs;                /* Variable arguments */
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2275,9 +2725,6 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
   socket->other_peer = GNUNET_PEER_intern (target);
   socket->open_cb = open_cb;
   socket->open_cls = open_cb_cls;
-  GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
-  socket->our_id = GNUNET_PEER_intern (&own_peer_id);
-  
   /* Set defaults */
   socket->retransmit_timeout = 
     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
@@ -2301,9 +2748,9 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                       10,  /* QUEUE size as parameter? */
                                       socket, /* cls */
                                       NULL, /* No inbound tunnel handler */
-                                      &tunnel_cleaner, /* FIXME: not required? */
+                                      NULL, /* No in-tunnel cleaner */
                                       client_message_handlers,
-                                      &app_port); /* We don't get inbound tunnels */
+                                      ports); /* We don't get inbound tunnels */
   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
     {
       GNUNET_free (socket);
@@ -2329,15 +2776,103 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
 
 
 /**
- * Shutdown the stream for reading or writing (man 2 shutdown).
+ * Shutdown the stream for reading or writing (similar to man 2 shutdown).
  *
  * @param socket the stream socket
- * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
+ * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
+ * @param completion_cb the callback that will be called upon successful
+ *          shutdown of given operation
+ * @param completion_cls the closure for the completion callback
+ * @return the shutdown handle
  */
-void
+struct GNUNET_STREAM_ShutdownHandle *
 GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
-                       int how)
+                       int operation,
+                        GNUNET_STREAM_ShutdownCompletion completion_cb,
+                        void *completion_cls)
+{
+  struct GNUNET_STREAM_ShutdownHandle *handle;
+  struct GNUNET_STREAM_MessageHeader *msg;
+  
+  GNUNET_assert (NULL == socket->shutdown_handle);
+
+  handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
+  handle->socket = socket;
+  handle->completion_cb = completion_cb;
+  handle->completion_cls = completion_cls;
+  socket->shutdown_handle = handle;
+
+  msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+  msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  switch (operation)
+    {
+    case SHUT_RD:
+      handle->operation = SHUT_RD;
+      if (NULL != socket->read_handle)
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "Existing read handle should be cancelled before shutting"
+                    " down reading\n");
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+      queue_message (socket,
+                     msg,
+                     &set_state_receive_close_wait,
+                     NULL);
+      break;
+    case SHUT_WR:
+      handle->operation = SHUT_WR;
+      if (NULL != socket->write_handle)
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "Existing write handle should be cancelled before shutting"
+                    " down writing\n");
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+      queue_message (socket,
+                     msg,
+                     &set_state_transmit_close_wait,
+                     NULL);
+      break;
+    case SHUT_RDWR:
+      handle->operation = SHUT_RDWR;
+      if (NULL != socket->write_handle)
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "Existing write handle should be cancelled before shutting"
+                    " down writing\n");
+      if (NULL != socket->read_handle)
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "Existing read handle should be cancelled before shutting"
+                    " down reading\n");
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
+      queue_message (socket,
+                     msg,
+                     &set_state_close_wait,
+                     NULL);
+      break;
+    default:
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "GNUNET_STREAM_shutdown called with invalid value for "
+                  "parameter operation -- Ignoring\n");
+      GNUNET_free (msg);
+      GNUNET_free (handle);
+      return NULL;
+    }
+  handle->close_msg_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &close_msg_retransmission_task,
+                                  handle);
+  return handle;
+}
+
+
+/**
+ * Cancels a pending shutdown
+ *
+ * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
+ */
+void
+GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
 {
+  if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
+    GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
+  GNUNET_free (handle);
   return;
 }
 
@@ -2352,13 +2887,23 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
 {
   struct MessageQueue *head;
 
+  GNUNET_break (NULL == socket->read_handle);
+  GNUNET_break (NULL == socket->write_handle);
+
   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
-  {
-    /* socket closed with read task pending!? */
-    GNUNET_break (0);
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
-  }
+    {
+      /* socket closed with read task pending!? */
+      GNUNET_break (0);
+      GNUNET_SCHEDULER_cancel (socket->read_task_id);
+      socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+    }
+  
+  /* Terminate the ack'ing tasks if they are still present */
+  if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+      socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+    }
 
   /* Clear Transmit handles */
   if (NULL != socket->transmit_handle)
@@ -2366,6 +2911,13 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
       socket->transmit_handle = NULL;
     }
+  if (NULL != socket->ack_transmit_handle)
+    {
+      GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
+      GNUNET_free (socket->ack_msg);
+      socket->ack_msg = NULL;
+      socket->ack_transmit_handle = NULL;
+    }
 
   /* Clear existing message queue */
   while (NULL != (head = socket->queue_head)) {
@@ -2418,21 +2970,19 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
 {
   /* FIXME: Add variable args for passing configration options? */
   struct GNUNET_STREAM_ListenSocket *lsocket;
-  struct GNUNET_PeerIdentity our_peer_id;
+  GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
 
   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
   lsocket->port = app_port;
   lsocket->listen_cb = listen_cb;
   lsocket->listen_cb_cls = listen_cb_cls;
-  GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
-  lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
   lsocket->mesh = GNUNET_MESH_connect (cfg,
                                        10, /* FIXME: QUEUE size as parameter? */
                                        lsocket, /* Closure */
                                        &new_tunnel_notify,
                                        &tunnel_cleaner,
                                        server_message_handlers,
-                                       &app_port);
+                                       ports);
   GNUNET_assert (NULL != lsocket->mesh);
   return lsocket;
 }
@@ -2455,15 +3005,22 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
 
 
 /**
- * Tries to write the given data to the stream
+ * Tries to write the given data to the stream. The maximum size of data that
+ * can be written as part of a write operation is (64 * (64000 - sizeof (struct
+ * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
+ * violation, however only the said number of maximum bytes will be written.
  *
  * @param socket the socket representing a stream
  * @param data the data buffer from where the data is written into the stream
  * @param size the number of bytes to be written from the data buffer
  * @param timeout the timeout period
- * @param write_cont the function to call upon writing some bytes into the stream
+ * @param write_cont the function to call upon writing some bytes into the
+ *          stream 
  * @param write_cont_cls the closure
- * @return handle to cancel the operation
+ *
+ * @return handle to cancel the operation; if a previous write is pending or
+ *           the stream has been shutdown for this operation then write_cont is
+ *           immediately called and NULL is returned.
  */
 struct GNUNET_STREAM_IOWriteHandle *
 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
@@ -2491,20 +3048,38 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
     GNUNET_break (0);
     return NULL;
   }
-  if (!((STATE_ESTABLISHED == socket->state)
-        || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
-        || (STATE_RECEIVE_CLOSED == socket->state)))
+
+  switch (socket->state)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "%x: Attempting to write on a closed (OR) not-yet-established"
-                  "stream\n",
-                  socket->our_id);
+    case STATE_TRANSMIT_CLOSED:
+    case STATE_TRANSMIT_CLOSE_WAIT:
+    case STATE_CLOSED:
+    case STATE_CLOSE_WAIT:
+      if (NULL != write_cont)
+        write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s() END\n", __func__);
       return NULL;
-    } 
+    case STATE_INIT:
+    case STATE_LISTEN:
+    case STATE_HELLO_WAIT:
+      if (NULL != write_cont)
+        /* FIXME: GNUNET_STREAM_SYSERR?? */
+        write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s() END\n", __func__);
+      return NULL;
+    case STATE_ESTABLISHED:
+    case STATE_RECEIVE_CLOSED:
+    case STATE_RECEIVE_CLOSE_WAIT:
+      break;
+    }
+
   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
+  io_handle->socket = socket;
   io_handle->write_cont = write_cont;
   io_handle->write_cont_cls = write_cont_cls;
   io_handle->size = size;
@@ -2556,14 +3131,18 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
 }
 
 
+
 /**
- * Tries to read data from the stream
+ * Tries to read data from the stream.
  *
  * @param socket the socket representing a stream
  * @param timeout the timeout period
  * @param proc function to call with data (once only)
  * @param proc_cls the closure for proc
- * @return handle to cancel the operation
+ *
+ * @return handle to cancel the operation; if the stream has been shutdown for
+ *           this type of opeartion then the DataProcessor is immediately
+ *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
  */
 struct GNUNET_STREAM_IOReadHandle *
 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
@@ -2574,8 +3153,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
   struct GNUNET_STREAM_IOReadHandle *read_handle;
   
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: %s()\n", 
-              socket->our_id,
+              "%s()\n", 
               __func__);
 
   /* Return NULL if there is already a read handle; the user has to cancel that
@@ -2584,6 +3162,21 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
 
   GNUNET_assert (NULL != proc);
 
+  switch (socket->state)
+    {
+    case STATE_RECEIVE_CLOSED:
+    case STATE_RECEIVE_CLOSE_WAIT:
+    case STATE_CLOSED:
+    case STATE_CLOSE_WAIT:
+      proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s() END\n",
+                  __func__);
+      return NULL;
+    default:
+      break;
+    }
+
   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
   read_handle->proc = proc;
   read_handle->proc_cls = proc_cls;
@@ -2599,12 +3192,12 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
     }
   
   /* Setup the read timeout task */
-  socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
-                                                               &read_io_timeout,
-                                                               socket);
+  socket->read_io_timeout_task_id =
+    GNUNET_SCHEDULER_add_delayed (timeout,
+                                  &read_io_timeout,
+                                  socket);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%x: %s() END\n",
-              socket->our_id,
+              "%s() END\n",
               __func__);
   return read_handle;
 }
@@ -2618,7 +3211,26 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
 void
 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
 {
-  /* FIXME: Should cancel the write retransmission task */
+  struct GNUNET_STREAM_Socket *socket = ioh->socket;
+  unsigned int packet;
+
+  GNUNET_assert (NULL != socket->write_handle);
+  GNUNET_assert (socket->write_handle == ioh);
+
+  if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+    {
+      GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+      socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+    }
+
+  for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+    {
+      if (NULL == ioh->messages[packet]) break;
+      GNUNET_free (ioh->messages[packet]);
+    }
+      
+  GNUNET_free (socket->write_handle);
+  socket->write_handle = NULL;
   return;
 }