- dist fix
[oweals/gnunet.git] / src / stream / stream_api.c
index d09182d2b50cde986eaee6edd2727a7d823eb3e5..1aec60285a8867e5bbdaf4b003f77ce15c4e9f12 100644 (file)
@@ -260,11 +260,6 @@ struct GNUNET_STREAM_Socket
    */
   struct GNUNET_PeerIdentity other_peer;
 
-  /**
-   * Task identifier for the read io timeout task
-   */
-  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
-
   /**
    * Task identifier for retransmission task after timeout
    */
@@ -280,11 +275,6 @@ struct GNUNET_STREAM_Socket
    */
   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
 
-  /**
-   * Task scheduled to continue a read operation.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier read_task_id;
-
   /**
    * The state of the protocol associated with this socket
    */
@@ -315,12 +305,6 @@ struct GNUNET_STREAM_Socket
    */
   uint32_t testing_set_write_sequence_number_value;
 
-  /**
-   * The session id associated with this stream connection
-   * FIXME: Not used currently, may be removed
-   */
-  uint32_t session_id;
-
   /**
    * Write sequence number. Set to random when sending HELLO(client) and
    * HELLO_ACK(server) 
@@ -482,6 +466,14 @@ struct GNUNET_STREAM_IOWriteHandle
    * Number of bytes in this write handle
    */
   size_t size;
+
+  /**
+   * Number of packets already transmitted from this IO handle. Retransmitted
+   * packets are not taken into account here. This is used to determine which
+   * packets account for retransmission and which packets occupy buffer space at
+   * the receiver.
+   */
+  unsigned int packets_sent;
 };
 
 
@@ -504,6 +496,16 @@ struct GNUNET_STREAM_IOReadHandle
    * The closure pointer for the read processor callback
    */
   void *proc_cls;
+
+  /**
+   * Task identifier for the read io timeout task
+   */
+  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+
+  /**
+   * Task scheduled to continue a read operation.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier read_task_id;
 };
 
 
@@ -667,13 +669,13 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
   {
     socket->retries = 0;
     socket->transmit_handle = 
-      GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                        GNUNET_NO, /* Corking */
-                                        socket->retransmit_timeout,
-                                        &socket->other_peer,
-                                        ntohs (message->header.size),
-                                        &send_message_notify,
-                                        socket);
+       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                          GNUNET_NO, /* Corking */
+                                          socket->retransmit_timeout,
+                                          &socket->other_peer,
+                                          ntohs (message->header.size),
+                                          &send_message_notify,
+                                          socket);
   }
 }
 
@@ -726,11 +728,11 @@ data_retransmission_task (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
   
-  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+  socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
     return;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
-  socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   write_data (socket);
 }
 
@@ -748,11 +750,9 @@ ack_task (void *cls,
   struct GNUNET_STREAM_Socket *socket = cls;
   struct GNUNET_STREAM_AckMessage *ack_msg;
 
-  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
-  {
-    return;
-  }
   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
   /* Create the ACK Message */
   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
   ack_msg->header.header.size = htons (sizeof (struct 
@@ -781,9 +781,11 @@ close_msg_retransmission_task (void *cls,
   struct GNUNET_STREAM_MessageHeader *msg;
   struct GNUNET_STREAM_Socket *socket;
 
+  shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_assert (NULL != shutdown_handle);
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
   socket = shutdown_handle->socket;
-
   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
   switch (shutdown_handle->operation)
@@ -857,41 +859,28 @@ static void
 write_data (struct GNUNET_STREAM_Socket *socket)
 {
   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
-  int packet;                   /* Although an int, should never be negative */
-  int ack_packet;
-
-  ack_packet = -1;
-  /* Find the last acknowledged packet */
-  for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
-  {      
-    if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
-                                            packet))
-      ack_packet = packet;        
-    else if (NULL == io_handle->messages[packet])
-      break;
-  }
-  /* Resend packets which weren't ack'ed */
-  for (packet=0; packet < ack_packet; packet++)
+  unsigned int packet;
+  
+  for (packet=0; packet < io_handle->packets_sent; packet++)
   {
     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
-                                           packet))
+                                          packet))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Placing DATA message with sequence %u in send queue\n",
-           GNUNET_i2s (&socket->other_peer),
-           ntohl (io_handle->messages[packet]->sequence_number));
+          "%s: Retransmitting DATA message with sequence %u\n",
+          GNUNET_i2s (&socket->other_peer),
+          ntohl (io_handle->messages[packet]->sequence_number));
       copy_and_queue_message (socket,
-                              &io_handle->messages[packet]->header,
-                              NULL,
-                              NULL);
+                             &io_handle->messages[packet]->header,
+                             NULL,
+                             NULL);
     }
   }
-  packet = ack_packet + 1;
   /* Now send new packets if there is enough buffer space */
-  while ( (NULL != io_handle->messages[packet]) &&
-         (socket->receiver_window_available 
-           >= ntohs (io_handle->messages[packet]->header.header.size)) &&
-          (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+  while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
+         (NULL != io_handle->messages[packet]) &&
+         (socket->receiver_window_available 
+          >= ntohs (io_handle->messages[packet]->header.header.size)))
   {
     socket->receiver_window_available -= 
       ntohs (io_handle->messages[packet]->header.header.size);
@@ -905,6 +894,8 @@ write_data (struct GNUNET_STREAM_Socket *socket)
                             NULL);
     packet++;
   }
+  io_handle->packets_sent = packet;
+  // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
     socket->data_retransmission_task_id = 
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
@@ -925,22 +916,22 @@ call_read_processor (void *cls,
                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_IOReadHandle *read_handle;
   size_t read_size;
   size_t valid_read_size;
   unsigned int packet;
   uint32_t sequence_increase;
   uint32_t offset_increase;
 
-  socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+  read_handle = socket->read_handle;
+  GNUNET_assert (NULL != read_handle);
+  read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-
   if (NULL == socket->receive_buffer) 
     return;
-
   GNUNET_assert (NULL != socket->read_handle);
   GNUNET_assert (NULL != socket->read_handle->proc);
-
   /* Check the bitmap for any holes */
   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
   {
@@ -954,22 +945,19 @@ call_read_processor (void *cls,
     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
   GNUNET_assert (0 != valid_read_size);
   /* Cancel the read_io_timeout_task */
-  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
+  read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
   /* Call the data processor */
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Calling read processor\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
        GNUNET_i2s (&socket->other_peer));
   read_size = 
-    socket->read_handle->proc (socket->read_handle->proc_cls,
-                               socket->status,
-                               socket->receive_buffer + socket->copy_offset,
-                               valid_read_size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Read processor read %d bytes\n",
+      socket->read_handle->proc (socket->read_handle->proc_cls,
+                                socket->status,
+                                socket->receive_buffer + socket->copy_offset,
+                                valid_read_size);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
        GNUNET_i2s (&socket->other_peer), read_size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Read processor completed successfully\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
        GNUNET_i2s (&socket->other_peer));
   /* Free the read handle */
   GNUNET_free (socket->read_handle);
@@ -980,18 +968,20 @@ call_read_processor (void *cls,
   for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
   {
     if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
-    { packet++; break; }
+    { 
+      packet++; 
+      break;
+    }
     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
       break;
   }
-
   /* If no packets can be removed we can't move the buffer */
-  if (0 == packet) return;
+  if (0 == packet) 
+    return;
   sequence_increase = packet;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Sequence increase after read processor completion: %u\n",
        GNUNET_i2s (&socket->other_peer), sequence_increase);
-
   /* Shift the data in the receive buffer */
   socket->receive_buffer = 
     memmove (socket->receive_buffer,
@@ -1012,7 +1002,7 @@ call_read_processor (void *cls,
   /* Fix relative boundaries */
   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
   {
-    if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
+    if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
     {
       uint32_t ahead_buffer_boundary;
 
@@ -1044,22 +1034,26 @@ read_io_timeout (void *cls,
                  const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_IOReadHandle *read_handle;
   GNUNET_STREAM_DataProcessor proc;
   void *proc_cls;
 
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
-  if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
+  read_handle = socket->read_handle;
+  GNUNET_assert (NULL != read_handle);
+  read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
+  if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Read task timedout - Cancelling it\n",
          GNUNET_i2s (&socket->other_peer));
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+    read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
-  GNUNET_assert (NULL != socket->read_handle);
-  proc = socket->read_handle->proc;
-  proc_cls = socket->read_handle->proc_cls;
-  GNUNET_free (socket->read_handle);
+  proc = read_handle->proc;
+  proc_cls = read_handle->proc_cls;
+  GNUNET_free (read_handle);
   socket->read_handle = NULL;
   /* Call the read processor to signal timeout */
   proc (proc_cls,
@@ -1088,6 +1082,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
              const struct GNUNET_ATS_Information*atsi)
 {
   const void *payload;
+  struct GNUNET_TIME_Relative ack_deadline_rel;
   uint32_t bytes_needed;
   uint32_t relative_offset;
   uint32_t relative_sequence_number;
@@ -1099,28 +1094,24 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
-
-  if (0 != memcmp (sender,
-                   &socket->other_peer,
-                   sizeof (struct GNUNET_PeerIdentity)))
+  if (0 != memcmp (sender, &socket->other_peer,
+                  sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s: Received DATA from non-confirming peer\n",
-         GNUNET_i2s (&socket->other_peer));
+        "%s: Received DATA from non-confirming peer\n",
+        GNUNET_i2s (&socket->other_peer));
     return GNUNET_YES;
   }
-
   switch (socket->state)
   {
   case STATE_ESTABLISHED:
   case STATE_TRANSMIT_CLOSED:
-  case STATE_TRANSMIT_CLOSE_WAIT:
-      
+  case STATE_TRANSMIT_CLOSE_WAIT:      
     /* check if the message's sequence number is in the range we are
        expecting */
     relative_sequence_number = 
       ntohl (msg->sequence_number) - socket->read_sequence_number;
-    if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
+    if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "%s: Ignoring received message with sequence number %u\n",
@@ -1136,8 +1127,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
                                         socket);
       }
       return GNUNET_YES;
-    }
-      
+    }      
     /* Check if we have already seen this message */
     if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
                                             relative_sequence_number))
@@ -1151,20 +1141,14 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
       {
         socket->ack_task_id = 
           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
-                                        (msg->ack_deadline),
-                                        &ack_task,
-                                        socket);
+                                        (msg->ack_deadline), &ack_task, socket);
       }
       return GNUNET_YES;
     }
-
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
-         GNUNET_i2s (&socket->other_peer),
-         ntohl (msg->sequence_number),
-         ntohs (msg->header.header.size),
-         GNUNET_i2s (&socket->other_peer));
-      
+         GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
+         ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
     /* Check if we have to allocate the buffer */
     size -= sizeof (struct GNUNET_STREAM_DataMessage);
     relative_offset = ntohl (msg->offset) - socket->read_offset;
@@ -1181,54 +1165,67 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "%s: Cannot accommodate packet %d as buffer is full\n",
-             GNUNET_i2s (&socket->other_peer),
-             ntohl (msg->sequence_number));
+             GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
         return GNUNET_YES;
       }
     }
-      
     /* Copy Data to buffer */
     payload = &msg[1];
     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
-    memcpy (socket->receive_buffer + relative_offset,
-            payload,
-            size);
+    memcpy (socket->receive_buffer + relative_offset, payload, size);
     socket->receive_buffer_boundaries[relative_sequence_number] = 
-      relative_offset + size;
-      
+       relative_offset + size;
     /* Modify the ACK bitmap */
-    ackbitmap_modify_bit (&socket->ack_bitmap,
-                          relative_sequence_number,
-                          GNUNET_YES);
-
+    ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
+                         GNUNET_YES);
     /* Start ACK sending task if one is not already present */
+    ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
     if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
     {
+      ack_deadline_rel = 
+         GNUNET_TIME_relative_min (ack_deadline_rel,
+                                   GNUNET_TIME_relative_multiply
+                                   (GNUNET_TIME_UNIT_SECONDS, 300));
       socket->ack_task_id = 
-        GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
-                                      (msg->ack_deadline),
-                                      &ack_task,
-                                      socket);
+         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh 
+                                       (msg->ack_deadline), &ack_task, socket);
+      socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+      socket->ack_time_deadline = ack_deadline_rel;
+    }
+    else
+    {
+      struct GNUNET_TIME_Relative ack_time_past;
+      struct GNUNET_TIME_Relative ack_time_remaining;
+      struct GNUNET_TIME_Relative ack_time_min;
+      ack_time_past = 
+         GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
+      ack_time_remaining = GNUNET_TIME_relative_subtract
+         (socket->ack_time_deadline, ack_time_past);
+      ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
+                                              ack_deadline_rel);
+      if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
+                     sizeof (struct GNUNET_TIME_Relative)))
+      {
+       ack_deadline_rel = ack_time_min;
+       GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+       socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
+                                                           &ack_task, socket);
+       socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+       socket->ack_time_deadline = ack_deadline_rel;
+      }
     }
-
     if ((NULL != socket->read_handle) /* A read handle is waiting */
         /* There is no current read task */
-        && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
+        && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
         /* We have the first packet */
-        && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
-                                               0)))
+        && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
     {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Scheduling read processor\n",
-           GNUNET_i2s (&socket->other_peer));
-          
-      socket->read_task_id = 
-        GNUNET_SCHEDULER_add_now (&call_read_processor,
-                                  socket);
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
+          GNUNET_i2s (&socket->other_peer));
+      socket->read_handle->read_task_id =
+         GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
     }
-      
     break;
-
   default:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Received data message when it cannot be handled\n",
@@ -1261,11 +1258,8 @@ client_handle_data (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
 
-  return handle_data (socket, 
-                      tunnel, 
-                      sender, 
-                      (const struct GNUNET_STREAM_DataMessage *) message, 
-                      atsi);
+  return handle_data (socket, tunnel, sender, 
+                      (const struct GNUNET_STREAM_DataMessage *) message, atsi);
 }
 
 
@@ -1466,9 +1460,9 @@ control_retransmission_task (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
     
+  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
     return;
-  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   LOG_DEBUG ("%s: Retransmitting a control message\n",
                  GNUNET_i2s (&socket->other_peer));
   switch (socket->state)
@@ -1499,6 +1493,7 @@ control_retransmission_task (void *cls,
                      GNUNET_NO);
     else
       GNUNET_break (0);
+    break;
   default:
     GNUNET_break (0);
   }  
@@ -1529,9 +1524,8 @@ client_handle_hello_ack (void *cls,
   const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
   struct GNUNET_STREAM_HelloAckMessage *reply;
 
-  if (0 != memcmp (sender,
-                   &socket->other_peer,
-                   sizeof (struct GNUNET_PeerIdentity)))
+  if (0 != memcmp (sender, &socket->other_peer,
+                  sizeof (struct GNUNET_PeerIdentity)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Received HELLO_ACK from non-confirming peer\n",
@@ -1539,11 +1533,8 @@ client_handle_hello_ack (void *cls,
     return GNUNET_YES;
   }
   ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Received HELLO_ACK from %s\n",
-       GNUNET_i2s (&socket->other_peer),
-       GNUNET_i2s (&socket->other_peer));
-
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
+       GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
   GNUNET_assert (socket->tunnel == tunnel);
   switch (socket->state)
   {
@@ -1555,7 +1546,7 @@ client_handle_hello_ack (void *cls,
          (unsigned int) socket->read_sequence_number);
     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
     reply = generate_hello_ack (socket, GNUNET_YES);
-    queue_message (socket, &reply->header, &set_state_established, 
+    queue_message (socket, &reply->header, &set_state_established,
                    NULL, GNUNET_NO);    
     return GNUNET_OK;
   case STATE_ESTABLISHED:
@@ -1568,8 +1559,7 @@ client_handle_hello_ack (void *cls,
   default:
     LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n", 
                GNUNET_i2s (&socket->other_peer),
-               GNUNET_i2s (&socket->other_peer),
-               socket->state);
+              GNUNET_i2s (&socket->other_peer), socket->state);
     socket->state = STATE_CLOSED; // introduce STATE_ERROR?
     return GNUNET_SYSERR;
   }
@@ -1626,7 +1616,6 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
   {
   case STATE_ESTABLISHED:
     socket->state = STATE_RECEIVE_CLOSED;
-
     /* Send TRANSMIT_CLOSE_ACK */
     reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
     reply->header.type = 
@@ -1634,7 +1623,6 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
     queue_message (socket, reply, NULL, NULL, GNUNET_NO);
     break;
-
   default:
     /* FIXME: Call statistics? */
     break;
@@ -1703,7 +1691,6 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
          GNUNET_i2s (&socket->other_peer));
     return GNUNET_OK;
   }
-
   switch (operation)
   {
   case SHUT_RDWR:
@@ -1714,15 +1701,11 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "%s: Received CLOSE_ACK when shutdown handle is not for "
-             "SHUT_RDWR\n",
-             GNUNET_i2s (&socket->other_peer));
+             "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Received CLOSE_ACK from %s\n",
-           GNUNET_i2s (&socket->other_peer),
-           GNUNET_i2s (&socket->other_peer));
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_CLOSED;
       break;
     default:
@@ -1732,7 +1715,6 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
       return GNUNET_OK;
     }
     break;
-
   case SHUT_RD:
     switch (socket->state)
     {
@@ -1741,15 +1723,11 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
-             "is not for SHUT_RD\n",
-             GNUNET_i2s (&socket->other_peer));
+             "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Received RECEIVE_CLOSE_ACK from %s\n",
-           GNUNET_i2s (&socket->other_peer),
-           GNUNET_i2s (&socket->other_peer));
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_RECEIVE_CLOSED;
       break;
     default:
@@ -1758,7 +1736,6 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
            GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
-
     break;
   case SHUT_WR:
     switch (socket->state)
@@ -1772,25 +1749,20 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
              GNUNET_i2s (&socket->other_peer));
         return GNUNET_OK;
       }
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
-           GNUNET_i2s (&socket->other_peer),
-           GNUNET_i2s (&socket->other_peer));
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
+           GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
       socket->state = STATE_TRANSMIT_CLOSED;
       break;
     default:
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
-           GNUNET_i2s (&socket->other_peer));
-          
+           GNUNET_i2s (&socket->other_peer));          
       return GNUNET_OK;
     }
     break;
   default:
     GNUNET_assert (0);
   }
-
   if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
     shutdown_handle->completion_cb(shutdown_handle->completion_cls,
                                    operation);
@@ -1800,7 +1772,7 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
     GNUNET_SCHEDULER_cancel
       (shutdown_handle->close_msg_retransmission_task_id);
     shutdown_handle->close_msg_retransmission_task_id =
-      GNUNET_SCHEDULER_NO_TASK;
+       GNUNET_SCHEDULER_NO_TASK;
   }
   GNUNET_free (shutdown_handle); /* Free shutdown handle */
   socket->shutdown_handle = NULL;
@@ -2136,7 +2108,7 @@ server_handle_hello (void *cls,
   GNUNET_assert (socket->tunnel == tunnel);
   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
              GNUNET_i2s (&socket->other_peer));
-  switch (socket->status)
+  switch (socket->state)
   {
   case STATE_INIT:
     reply = generate_hello_ack (socket, GNUNET_YES);
@@ -2473,8 +2445,9 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
            GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
-    if (!((socket->write_sequence_number 
-           - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+    sequence_difference = 
+       socket->write_sequence_number - ntohl (ack->base_sequence_number);
+    if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "%s: Received DATA_ACK with unexpected base sequence number\n",
@@ -2488,17 +2461,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
     }
     /* FIXME: include the case when write_handle is cancelled - ignore the 
        acks */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s: Received DATA_ACK from %s\n",
-         GNUNET_i2s (&socket->other_peer),
-         GNUNET_i2s (&socket->other_peer));
-      
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
+         GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
     /* Cancel the retransmission task */
     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
     {
       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
-      socket->data_retransmission_task_id = 
-        GNUNET_SCHEDULER_NO_TASK;
+      socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
     }
     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
     {
@@ -2506,22 +2475,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
       /* BS: Base sequence from ack; PS: sequence num of current packet */
       sequence_difference = ntohl (ack->base_sequence_number)
         - ntohl (socket->write_handle->messages[packet]->sequence_number);
+      if ((0 == sequence_difference) ||
+         (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
+       continue; /* The message in our handle is not yet received */
       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
-      if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
-          || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
-              && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
-      {
-        ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
-                              GNUNET_YES);
-        continue;
-      }
-      if (GNUNET_YES == 
-          ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
-                                -sequence_difference))/*inversion as PS >= BS */
-      {
-        ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
-                              GNUNET_YES);
-      }
+      /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
+      ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+                           packet, GNUNET_YES);
     }
     /* Update the receive window remaining
        FIXME : Should update with the value from a data ack with greater
@@ -2776,7 +2736,6 @@ new_tunnel_notify (void *cls,
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
   socket->other_peer = *initiator;
   socket->tunnel = tunnel;
-  socket->session_id = 0;       /* FIXME */
   socket->state = STATE_INIT;
   socket->lsocket = lsocket;
   socket->retransmit_timeout = lsocket->retransmit_timeout;
@@ -2788,7 +2747,6 @@ new_tunnel_notify (void *cls,
        "%s: Peer %s initiated tunnel to us\n", 
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-  /* FIXME: Copy MESH handle from lsocket to socket */
   return socket;
 }
 
@@ -2811,18 +2769,15 @@ tunnel_cleaner (void *cls,
                 void *tunnel_ctx)
 {
   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+  struct MessageQueue *head;
 
-  if (tunnel != socket->tunnel)
-    return;
-
+  GNUNET_assert (tunnel == socket->tunnel);
   GNUNET_break_op(0);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Peer %s has terminated connection abruptly\n",
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-
   socket->status = GNUNET_STREAM_SHUTDOWN;
-
   /* Clear Transmit handles */
   if (NULL != socket->transmit_handle)
   {
@@ -2839,8 +2794,27 @@ tunnel_cleaner (void *cls,
   {
     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  }  
+  /* Terminate the control retransmission tasks */
+  if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+  {
+    GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+    socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  }
+  /* Clear Transmit handles */
+  if (NULL != socket->transmit_handle)
+  {
+    GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+    socket->transmit_handle = NULL;
+  }
+  /* Clear existing message queue */
+  while (NULL != (head = socket->queue_head)) {
+    GNUNET_CONTAINER_DLL_remove (socket->queue_head,
+                                socket->queue_tail,
+                                head);
+    GNUNET_free (head->message);
+    GNUNET_free (head);
   }
-  /* FIXME: Cancel all other tasks using socket->tunnel */
   socket->tunnel = NULL;
 }
 
@@ -2862,9 +2836,8 @@ lockmanager_acquire_timeout (void *cls,
   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
   listen_cb = lsocket->listen_cb;
   listen_cb_cls = lsocket->listen_cb_cls;
-  GNUNET_STREAM_listen_close (lsocket);
   if (NULL != listen_cb)
-    listen_cb (listen_cb_cls, NULL, NULL);  
+    listen_cb (listen_cb_cls, NULL, NULL);
 }
 
 
@@ -3124,6 +3097,7 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
         "Closing STREAM socket when a read handle is pending\n");
+    GNUNET_STREAM_io_read_cancel (socket->read_handle);
   }
   if (NULL != socket->write_handle)
   {
@@ -3132,14 +3106,7 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
     GNUNET_STREAM_io_write_cancel (socket->write_handle);
     //socket->write_handle = NULL;
   }
-  if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
-  {
-    /* socket closed with read task pending!? */
-    GNUNET_break (0);
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
-  }  
-  /* Terminate the ack'ing tasks if they are still present */
+  /* Terminate the ack'ing task if they are still present */
   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
@@ -3164,27 +3131,23 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
     GNUNET_free (head->message);
     GNUNET_free (head);
   }
-
   /* Close associated tunnel */
   if (NULL != socket->tunnel)
   {
     GNUNET_MESH_tunnel_destroy (socket->tunnel);
     socket->tunnel = NULL;
   }
-
   /* Close mesh connection */
   if (NULL != socket->mesh && NULL == socket->lsocket)
   {
     GNUNET_MESH_disconnect (socket->mesh);
     socket->mesh = NULL;
-  }
-  
+  }  
   /* Release receive buffer */
   if (NULL != socket->receive_buffer)
   {
     GNUNET_free (socket->receive_buffer);
   }
-
   GNUNET_free (socket);
 }
 
@@ -3338,14 +3301,11 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s\n", __func__);
-
-  /* Return NULL if there is already a write request pending */
   if (NULL != socket->write_handle)
   {
     GNUNET_break (0);
     return NULL;
   }
-
   switch (socket->state)
   {
   case STATE_TRANSMIT_CLOSED:
@@ -3371,7 +3331,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
   case STATE_RECEIVE_CLOSE_WAIT:
     break;
   }
-
   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * socket->max_payload_size;
   num_needed_packets =
@@ -3381,6 +3340,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
   io_handle->write_cont = write_cont;
   io_handle->write_cont_cls = write_cont_cls;
   io_handle->size = size;
+  io_handle->packets_sent = 0;
   sweep = data;
   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
      determined from RTT */
@@ -3408,25 +3368,24 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
     io_handle->messages[packet]->sequence_number =
       htonl (socket->write_sequence_number++);
     io_handle->messages[packet]->offset = htonl (socket->write_offset);
-
     /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
        determined from RTT */
     io_handle->messages[packet]->ack_deadline =
       GNUNET_TIME_relative_hton (ack_deadline);
     data_msg = io_handle->messages[packet];
     /* Copy data from given buffer to the packet */
-    memcpy (&data_msg[1],
-            sweep,
-            payload_size);
+    memcpy (&data_msg[1], sweep, payload_size);
     sweep += payload_size;
     socket->write_offset += payload_size;
   }
+  /* ack the last data message. FIXME: remove when we figure out how to do this
+     using RTT */
+  io_handle->messages[num_needed_packets - 1]->ack_deadline = 
+      GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
   socket->write_handle = io_handle;
   write_data (socket);
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s() END\n", __func__);
-
   return io_handle;
 }
 
@@ -3439,9 +3398,11 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
  * @param proc function to call with data (once only)
  * @param proc_cls the closure for proc
  *
- * @return handle to cancel the operation; if the stream has been shutdown for
- *           this type of opeartion then the DataProcessor is immediately
- *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
+ * @return handle to cancel the operation; NULL is returned if: the stream has
+ *           been shutdown for this type of opeartion (the DataProcessor is
+ *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
+ *           read handle is present (only one read handle per socket is present
+ *           at any time)
  */
 struct GNUNET_STREAM_IOReadHandle *
 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
@@ -3480,22 +3441,14 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
   read_handle->proc_cls = proc_cls;
   read_handle->socket = socket;
   socket->read_handle = read_handle;
-  /* Check if we have a packet at bitmap 0 */
   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
                                           0))
-  {
-    socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
-                                                     socket);   
-  }
-  /* Setup the read timeout task */
-  socket->read_io_timeout_task_id =
-    GNUNET_SCHEDULER_add_delayed (timeout,
-                                  &read_io_timeout,
-                                  socket);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: %s() END\n",
-       GNUNET_i2s (&socket->other_peer),
-       __func__);
+    read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
+                                                         socket);   
+  read_handle->read_io_timeout_task_id =
+      GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
+       GNUNET_i2s (&socket->other_peer), __func__);
   return read_handle;
 }
 
@@ -3545,16 +3498,13 @@ GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
   GNUNET_assert (NULL != socket->read_handle);
   GNUNET_assert (ioh == socket->read_handle);
   /* Read io time task should be there; if it is already executed then this
-  read handle is not valid */
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
-  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  read handle is not valid; However upon scheduler shutdown the read io task
+  may be executed before */
+  if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
+    GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
   /* reading task may be present; if so we have to stop it */
-  if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
-  {
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
-  }
+  if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
+    GNUNET_SCHEDULER_cancel (ioh->read_task_id);
   GNUNET_free (ioh);
   socket->read_handle = NULL;
 }