regex profiler cleanup
[oweals/gnunet.git] / src / stream / stream_api.c
index 0ef6ef85f6e3c0a5aec925590fc0d136117e90fd..3abea2a49f41afb0e1c7234ad4ff287a130d718a 100644 (file)
@@ -260,11 +260,6 @@ struct GNUNET_STREAM_Socket
    */
   struct GNUNET_PeerIdentity other_peer;
 
-  /**
-   * Task identifier for the read io timeout task
-   */
-  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
-
   /**
    * Task identifier for retransmission task after timeout
    */
@@ -280,11 +275,6 @@ struct GNUNET_STREAM_Socket
    */
   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
 
-  /**
-   * Task scheduled to continue a read operation.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier read_task_id;
-
   /**
    * The state of the protocol associated with this socket
    */
@@ -315,12 +305,6 @@ struct GNUNET_STREAM_Socket
    */
   uint32_t testing_set_write_sequence_number_value;
 
-  /**
-   * The session id associated with this stream connection
-   * FIXME: Not used currently, may be removed
-   */
-  uint32_t session_id;
-
   /**
    * Write sequence number. Set to random when sending HELLO(client) and
    * HELLO_ACK(server) 
@@ -482,6 +466,14 @@ struct GNUNET_STREAM_IOWriteHandle
    * Number of bytes in this write handle
    */
   size_t size;
+
+  /**
+   * Number of packets already transmitted from this IO handle. Retransmitted
+   * packets are not taken into account here. This is used to determine which
+   * packets account for retransmission and which packets occupy buffer space at
+   * the receiver.
+   */
+  unsigned int packets_sent;
 };
 
 
@@ -504,6 +496,16 @@ struct GNUNET_STREAM_IOReadHandle
    * The closure pointer for the read processor callback
    */
   void *proc_cls;
+
+  /**
+   * Task identifier for the read io timeout task
+   */
+  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+
+  /**
+   * Task scheduled to continue a read operation.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier read_task_id;
 };
 
 
@@ -667,13 +669,13 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
   {
     socket->retries = 0;
     socket->transmit_handle = 
-      GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                        GNUNET_NO, /* Corking */
-                                        socket->retransmit_timeout,
-                                        &socket->other_peer,
-                                        ntohs (message->header.size),
-                                        &send_message_notify,
-                                        socket);
+       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                          GNUNET_NO, /* Corking */
+                                          socket->retransmit_timeout,
+                                          &socket->other_peer,
+                                          ntohs (message->header.size),
+                                          &send_message_notify,
+                                          socket);
   }
 }
 
@@ -726,11 +728,11 @@ data_retransmission_task (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
   
-  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+  socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
     return;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
-  socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   write_data (socket);
 }
 
@@ -748,11 +750,9 @@ ack_task (void *cls,
   struct GNUNET_STREAM_Socket *socket = cls;
   struct GNUNET_STREAM_AckMessage *ack_msg;
 
-  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
-  {
-    return;
-  }
   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
   /* Create the ACK Message */
   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
   ack_msg->header.header.size = htons (sizeof (struct 
@@ -781,9 +781,11 @@ close_msg_retransmission_task (void *cls,
   struct GNUNET_STREAM_MessageHeader *msg;
   struct GNUNET_STREAM_Socket *socket;
 
+  shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_assert (NULL != shutdown_handle);
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
   socket = shutdown_handle->socket;
-
   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
   switch (shutdown_handle->operation)
@@ -857,36 +859,23 @@ static void
 write_data (struct GNUNET_STREAM_Socket *socket)
 {
   struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
-  int packet;                   /* Although an int, should never be negative */
-  int ack_packet;
-
-  ack_packet = -1;
-  /* Find the last acknowledged packet */
-  for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
-  {      
-    if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
-                                            packet))
-      ack_packet = packet;        
-    else if (NULL == io_handle->messages[packet])
-      break;
-  }
-  /* Resend packets which weren't ack'ed */
-  for (packet=0; packet < ack_packet; packet++)
+  unsigned int packet;
+  
+  for (packet=0; packet < io_handle->packets_sent; packet++)
   {
     if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
-                                           packet))
+                                          packet))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "%s: Placing DATA message with sequence %u in send queue\n",
-           GNUNET_i2s (&socket->other_peer),
-           ntohl (io_handle->messages[packet]->sequence_number));
+          "%s: Retransmitting DATA message with sequence %u\n",
+          GNUNET_i2s (&socket->other_peer),
+          ntohl (io_handle->messages[packet]->sequence_number));
       copy_and_queue_message (socket,
-                              &io_handle->messages[packet]->header,
-                              NULL,
-                              NULL);
+                             &io_handle->messages[packet]->header,
+                             NULL,
+                             NULL);
     }
   }
-  packet = ack_packet + 1;
   /* Now send new packets if there is enough buffer space */
   while ( (NULL != io_handle->messages[packet]) &&
          (socket->receiver_window_available 
@@ -905,6 +894,8 @@ write_data (struct GNUNET_STREAM_Socket *socket)
                             NULL);
     packet++;
   }
+  io_handle->packets_sent = packet;
+  // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
     socket->data_retransmission_task_id = 
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
@@ -925,22 +916,22 @@ call_read_processor (void *cls,
                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_IOReadHandle *read_handle;
   size_t read_size;
   size_t valid_read_size;
   unsigned int packet;
   uint32_t sequence_increase;
   uint32_t offset_increase;
 
-  socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+  read_handle = socket->read_handle;
+  GNUNET_assert (NULL != read_handle);
+  read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-
   if (NULL == socket->receive_buffer) 
     return;
-
   GNUNET_assert (NULL != socket->read_handle);
   GNUNET_assert (NULL != socket->read_handle->proc);
-
   /* Check the bitmap for any holes */
   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
   {
@@ -954,22 +945,19 @@ call_read_processor (void *cls,
     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
   GNUNET_assert (0 != valid_read_size);
   /* Cancel the read_io_timeout_task */
-  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
+  read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
   /* Call the data processor */
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Calling read processor\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
        GNUNET_i2s (&socket->other_peer));
   read_size = 
-    socket->read_handle->proc (socket->read_handle->proc_cls,
-                               socket->status,
-                               socket->receive_buffer + socket->copy_offset,
-                               valid_read_size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Read processor read %d bytes\n",
+      socket->read_handle->proc (socket->read_handle->proc_cls,
+                                socket->status,
+                                socket->receive_buffer + socket->copy_offset,
+                                valid_read_size);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
        GNUNET_i2s (&socket->other_peer), read_size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Read processor completed successfully\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
        GNUNET_i2s (&socket->other_peer));
   /* Free the read handle */
   GNUNET_free (socket->read_handle);
@@ -984,14 +972,13 @@ call_read_processor (void *cls,
     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
       break;
   }
-
   /* If no packets can be removed we can't move the buffer */
-  if (0 == packet) return;
+  if (0 == packet) 
+    return;
   sequence_increase = packet;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Sequence increase after read processor completion: %u\n",
        GNUNET_i2s (&socket->other_peer), sequence_increase);
-
   /* Shift the data in the receive buffer */
   socket->receive_buffer = 
     memmove (socket->receive_buffer,
@@ -1044,22 +1031,26 @@ read_io_timeout (void *cls,
                  const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_IOReadHandle *read_handle;
   GNUNET_STREAM_DataProcessor proc;
   void *proc_cls;
 
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
-  if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
+  read_handle = socket->read_handle;
+  GNUNET_assert (NULL != read_handle);
+  read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
+  if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Read task timedout - Cancelling it\n",
          GNUNET_i2s (&socket->other_peer));
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+    read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
-  GNUNET_assert (NULL != socket->read_handle);
-  proc = socket->read_handle->proc;
-  proc_cls = socket->read_handle->proc_cls;
-  GNUNET_free (socket->read_handle);
+  proc = read_handle->proc;
+  proc_cls = read_handle->proc_cls;
+  GNUNET_free (read_handle);
   socket->read_handle = NULL;
   /* Call the read processor to signal timeout */
   proc (proc_cls,
@@ -1222,15 +1213,15 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
     }
     if ((NULL != socket->read_handle) /* A read handle is waiting */
         /* There is no current read task */
-        && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
+        && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
         /* We have the first packet */
         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
     {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n", 
-          GNUNET_i2s (&socket->other_peer));          
-      socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
-                                                      socket);
-    }      
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
+          GNUNET_i2s (&socket->other_peer));
+      socket->read_handle->read_task_id =
+         GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
+    }
     break;
   default:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1466,9 +1457,9 @@ control_retransmission_task (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
     
+  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
     return;
-  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   LOG_DEBUG ("%s: Retransmitting a control message\n",
                  GNUNET_i2s (&socket->other_peer));
   switch (socket->state)
@@ -2113,7 +2104,7 @@ server_handle_hello (void *cls,
   GNUNET_assert (socket->tunnel == tunnel);
   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
              GNUNET_i2s (&socket->other_peer));
-  switch (socket->status)
+  switch (socket->state)
   {
   case STATE_INIT:
     reply = generate_hello_ack (socket, GNUNET_YES);
@@ -2450,8 +2441,9 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
            GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
-    if (!((socket->write_sequence_number 
-           - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+    sequence_difference = 
+       socket->write_sequence_number - ntohl (ack->base_sequence_number);
+    if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "%s: Received DATA_ACK with unexpected base sequence number\n",
@@ -2465,17 +2457,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
     }
     /* FIXME: include the case when write_handle is cancelled - ignore the 
        acks */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s: Received DATA_ACK from %s\n",
-         GNUNET_i2s (&socket->other_peer),
-         GNUNET_i2s (&socket->other_peer));
-      
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
+         GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
     /* Cancel the retransmission task */
     if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
     {
       GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
-      socket->data_retransmission_task_id = 
-        GNUNET_SCHEDULER_NO_TASK;
+      socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
     }
     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
     {
@@ -2483,22 +2471,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
       /* BS: Base sequence from ack; PS: sequence num of current packet */
       sequence_difference = ntohl (ack->base_sequence_number)
         - ntohl (socket->write_handle->messages[packet]->sequence_number);
+      if ((0 == sequence_difference) ||
+         (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH < sequence_difference))
+       continue; /* The message in our handle is not yet received */
       /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
-      if ((sequence_difference == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
-          || ((sequence_difference < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
-              && (0 != sequence_difference))) /* case: BS > PS and BS != PS*/
-      {
-        ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
-                              GNUNET_YES);
-        continue;
-      }
-      if (GNUNET_YES == 
-          ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
-                                -sequence_difference))/*inversion as PS >= BS */
-      {
-        ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet,
-                              GNUNET_YES);
-      }
+      /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
+      ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+                           packet, GNUNET_YES);
     }
     /* Update the receive window remaining
        FIXME : Should update with the value from a data ack with greater
@@ -2753,7 +2732,6 @@ new_tunnel_notify (void *cls,
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
   socket->other_peer = *initiator;
   socket->tunnel = tunnel;
-  socket->session_id = 0;       /* FIXME */
   socket->state = STATE_INIT;
   socket->lsocket = lsocket;
   socket->retransmit_timeout = lsocket->retransmit_timeout;
@@ -2765,7 +2743,6 @@ new_tunnel_notify (void *cls,
        "%s: Peer %s initiated tunnel to us\n", 
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-  /* FIXME: Copy MESH handle from lsocket to socket */
   return socket;
 }
 
@@ -2788,18 +2765,15 @@ tunnel_cleaner (void *cls,
                 void *tunnel_ctx)
 {
   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+  struct MessageQueue *head;
 
-  if (tunnel != socket->tunnel)
-    return;
-
+  GNUNET_assert (tunnel == socket->tunnel);
   GNUNET_break_op(0);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Peer %s has terminated connection abruptly\n",
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-
   socket->status = GNUNET_STREAM_SHUTDOWN;
-
   /* Clear Transmit handles */
   if (NULL != socket->transmit_handle)
   {
@@ -2816,8 +2790,27 @@ tunnel_cleaner (void *cls,
   {
     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  }  
+  /* Terminate the control retransmission tasks */
+  if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+  {
+    GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+    socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  }
+  /* Clear Transmit handles */
+  if (NULL != socket->transmit_handle)
+  {
+    GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+    socket->transmit_handle = NULL;
+  }
+  /* Clear existing message queue */
+  while (NULL != (head = socket->queue_head)) {
+    GNUNET_CONTAINER_DLL_remove (socket->queue_head,
+                                socket->queue_tail,
+                                head);
+    GNUNET_free (head->message);
+    GNUNET_free (head);
   }
-  /* FIXME: Cancel all other tasks using socket->tunnel */
   socket->tunnel = NULL;
 }
 
@@ -2839,9 +2832,8 @@ lockmanager_acquire_timeout (void *cls,
   lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
   listen_cb = lsocket->listen_cb;
   listen_cb_cls = lsocket->listen_cb_cls;
-  GNUNET_STREAM_listen_close (lsocket);
   if (NULL != listen_cb)
-    listen_cb (listen_cb_cls, NULL, NULL);  
+    listen_cb (listen_cb_cls, NULL, NULL);
 }
 
 
@@ -3110,14 +3102,7 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
     GNUNET_STREAM_io_write_cancel (socket->write_handle);
     //socket->write_handle = NULL;
   }
-  if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
-  {
-    /* socket closed with read task pending!? */
-    GNUNET_break (0);
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
-  }  
-  /* Terminate the ack'ing tasks if they are still present */
+  /* Terminate the ack'ing task if they are still present */
   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
@@ -3351,6 +3336,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
   io_handle->write_cont = write_cont;
   io_handle->write_cont_cls = write_cont_cls;
   io_handle->size = size;
+  io_handle->packets_sent = 0;
   sweep = data;
   /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
      determined from RTT */
@@ -3408,9 +3394,11 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
  * @param proc function to call with data (once only)
  * @param proc_cls the closure for proc
  *
- * @return handle to cancel the operation; if the stream has been shutdown for
- *           this type of opeartion then the DataProcessor is immediately
- *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
+ * @return handle to cancel the operation; NULL is returned if: the stream has
+ *           been shutdown for this type of opeartion (the DataProcessor is
+ *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
+ *           read handle is present (only one read handle per socket is present
+ *           at any time)
  */
 struct GNUNET_STREAM_IOReadHandle *
 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
@@ -3449,22 +3437,14 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
   read_handle->proc_cls = proc_cls;
   read_handle->socket = socket;
   socket->read_handle = read_handle;
-  /* Check if we have a packet at bitmap 0 */
   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
                                           0))
-  {
-    socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
-                                                     socket);   
-  }
-  /* Setup the read timeout task */
-  socket->read_io_timeout_task_id =
-    GNUNET_SCHEDULER_add_delayed (timeout,
-                                  &read_io_timeout,
-                                  socket);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: %s() END\n",
-       GNUNET_i2s (&socket->other_peer),
-       __func__);
+    read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
+                                                         socket);   
+  read_handle->read_io_timeout_task_id =
+      GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
+       GNUNET_i2s (&socket->other_peer), __func__);
   return read_handle;
 }
 
@@ -3514,16 +3494,13 @@ GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
   GNUNET_assert (NULL != socket->read_handle);
   GNUNET_assert (ioh == socket->read_handle);
   /* Read io time task should be there; if it is already executed then this
-  read handle is not valid */
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
-  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  read handle is not valid; However upon scheduler shutdown the read io task
+  may be executed before */
+  if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
+    GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
   /* reading task may be present; if so we have to stop it */
-  if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
-  {
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
-  }
+  if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
+    GNUNET_SCHEDULER_cancel (ioh->read_task_id);
   GNUNET_free (ioh);
   socket->read_handle = NULL;
 }