regex profiler cleanup
[oweals/gnunet.git] / src / stream / stream_api.c
index 70459ee3f8c6baa803feb75ec518195e332ad227..3abea2a49f41afb0e1c7234ad4ff287a130d718a 100644 (file)
@@ -260,11 +260,6 @@ struct GNUNET_STREAM_Socket
    */
   struct GNUNET_PeerIdentity other_peer;
 
-  /**
-   * Task identifier for the read io timeout task
-   */
-  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
-
   /**
    * Task identifier for retransmission task after timeout
    */
@@ -280,11 +275,6 @@ struct GNUNET_STREAM_Socket
    */
   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
 
-  /**
-   * Task scheduled to continue a read operation.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier read_task_id;
-
   /**
    * The state of the protocol associated with this socket
    */
@@ -315,12 +305,6 @@ struct GNUNET_STREAM_Socket
    */
   uint32_t testing_set_write_sequence_number_value;
 
-  /**
-   * The session id associated with this stream connection
-   * FIXME: Not used currently, may be removed
-   */
-  uint32_t session_id;
-
   /**
    * Write sequence number. Set to random when sending HELLO(client) and
    * HELLO_ACK(server) 
@@ -512,6 +496,16 @@ struct GNUNET_STREAM_IOReadHandle
    * The closure pointer for the read processor callback
    */
   void *proc_cls;
+
+  /**
+   * Task identifier for the read io timeout task
+   */
+  GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+
+  /**
+   * Task scheduled to continue a read operation.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier read_task_id;
 };
 
 
@@ -675,13 +669,13 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
   {
     socket->retries = 0;
     socket->transmit_handle = 
-      GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                        GNUNET_NO, /* Corking */
-                                        socket->retransmit_timeout,
-                                        &socket->other_peer,
-                                        ntohs (message->header.size),
-                                        &send_message_notify,
-                                        socket);
+       GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                          GNUNET_NO, /* Corking */
+                                          socket->retransmit_timeout,
+                                          &socket->other_peer,
+                                          ntohs (message->header.size),
+                                          &send_message_notify,
+                                          socket);
   }
 }
 
@@ -734,11 +728,11 @@ data_retransmission_task (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
   
-  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+  socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
     return;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
-  socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   write_data (socket);
 }
 
@@ -756,11 +750,9 @@ ack_task (void *cls,
   struct GNUNET_STREAM_Socket *socket = cls;
   struct GNUNET_STREAM_AckMessage *ack_msg;
 
-  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
-  {
-    return;
-  }
   socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
   /* Create the ACK Message */
   ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
   ack_msg->header.header.size = htons (sizeof (struct 
@@ -789,9 +781,11 @@ close_msg_retransmission_task (void *cls,
   struct GNUNET_STREAM_MessageHeader *msg;
   struct GNUNET_STREAM_Socket *socket;
 
+  shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_assert (NULL != shutdown_handle);
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
   socket = shutdown_handle->socket;
-
   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
   msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
   switch (shutdown_handle->operation)
@@ -901,6 +895,7 @@ write_data (struct GNUNET_STREAM_Socket *socket)
     packet++;
   }
   io_handle->packets_sent = packet;
+  // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
   if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
     socket->data_retransmission_task_id = 
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
@@ -921,22 +916,22 @@ call_read_processor (void *cls,
                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_IOReadHandle *read_handle;
   size_t read_size;
   size_t valid_read_size;
   unsigned int packet;
   uint32_t sequence_increase;
   uint32_t offset_increase;
 
-  socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+  read_handle = socket->read_handle;
+  GNUNET_assert (NULL != read_handle);
+  read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-
   if (NULL == socket->receive_buffer) 
     return;
-
   GNUNET_assert (NULL != socket->read_handle);
   GNUNET_assert (NULL != socket->read_handle->proc);
-
   /* Check the bitmap for any holes */
   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
   {
@@ -950,22 +945,19 @@ call_read_processor (void *cls,
     socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
   GNUNET_assert (0 != valid_read_size);
   /* Cancel the read_io_timeout_task */
-  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
+  read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
   /* Call the data processor */
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Calling read processor\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
        GNUNET_i2s (&socket->other_peer));
   read_size = 
-    socket->read_handle->proc (socket->read_handle->proc_cls,
-                               socket->status,
-                               socket->receive_buffer + socket->copy_offset,
-                               valid_read_size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Read processor read %d bytes\n",
+      socket->read_handle->proc (socket->read_handle->proc_cls,
+                                socket->status,
+                                socket->receive_buffer + socket->copy_offset,
+                                valid_read_size);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
        GNUNET_i2s (&socket->other_peer), read_size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: Read processor completed successfully\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
        GNUNET_i2s (&socket->other_peer));
   /* Free the read handle */
   GNUNET_free (socket->read_handle);
@@ -980,14 +972,13 @@ call_read_processor (void *cls,
     if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
       break;
   }
-
   /* If no packets can be removed we can't move the buffer */
-  if (0 == packet) return;
+  if (0 == packet) 
+    return;
   sequence_increase = packet;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Sequence increase after read processor completion: %u\n",
        GNUNET_i2s (&socket->other_peer), sequence_increase);
-
   /* Shift the data in the receive buffer */
   socket->receive_buffer = 
     memmove (socket->receive_buffer,
@@ -1040,22 +1031,26 @@ read_io_timeout (void *cls,
                  const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_IOReadHandle *read_handle;
   GNUNET_STREAM_DataProcessor proc;
   void *proc_cls;
 
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
-  if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
+  read_handle = socket->read_handle;
+  GNUNET_assert (NULL != read_handle);
+  read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
+  if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Read task timedout - Cancelling it\n",
          GNUNET_i2s (&socket->other_peer));
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+    read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
-  GNUNET_assert (NULL != socket->read_handle);
-  proc = socket->read_handle->proc;
-  proc_cls = socket->read_handle->proc_cls;
-  GNUNET_free (socket->read_handle);
+  proc = read_handle->proc;
+  proc_cls = read_handle->proc_cls;
+  GNUNET_free (read_handle);
   socket->read_handle = NULL;
   /* Call the read processor to signal timeout */
   proc (proc_cls,
@@ -1218,15 +1213,15 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
     }
     if ((NULL != socket->read_handle) /* A read handle is waiting */
         /* There is no current read task */
-        && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
+        && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
         /* We have the first packet */
         && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
     {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n", 
-          GNUNET_i2s (&socket->other_peer));          
-      socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
-                                                      socket);
-    }      
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
+          GNUNET_i2s (&socket->other_peer));
+      socket->read_handle->read_task_id =
+         GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
+    }
     break;
   default:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1462,9 +1457,9 @@ control_retransmission_task (void *cls,
 {
   struct GNUNET_STREAM_Socket *socket = cls;
     
+  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
     return;
-  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   LOG_DEBUG ("%s: Retransmitting a control message\n",
                  GNUNET_i2s (&socket->other_peer));
   switch (socket->state)
@@ -2109,7 +2104,7 @@ server_handle_hello (void *cls,
   GNUNET_assert (socket->tunnel == tunnel);
   LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
              GNUNET_i2s (&socket->other_peer));
-  switch (socket->status)
+  switch (socket->state)
   {
   case STATE_INIT:
     reply = generate_hello_ack (socket, GNUNET_YES);
@@ -2737,7 +2732,6 @@ new_tunnel_notify (void *cls,
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
   socket->other_peer = *initiator;
   socket->tunnel = tunnel;
-  socket->session_id = 0;       /* FIXME */
   socket->state = STATE_INIT;
   socket->lsocket = lsocket;
   socket->retransmit_timeout = lsocket->retransmit_timeout;
@@ -2749,7 +2743,6 @@ new_tunnel_notify (void *cls,
        "%s: Peer %s initiated tunnel to us\n", 
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-  /* FIXME: Copy MESH handle from lsocket to socket */
   return socket;
 }
 
@@ -2772,18 +2765,15 @@ tunnel_cleaner (void *cls,
                 void *tunnel_ctx)
 {
   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+  struct MessageQueue *head;
 
-  if (tunnel != socket->tunnel)
-    return;
-
+  GNUNET_assert (tunnel == socket->tunnel);
   GNUNET_break_op(0);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Peer %s has terminated connection abruptly\n",
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-
   socket->status = GNUNET_STREAM_SHUTDOWN;
-
   /* Clear Transmit handles */
   if (NULL != socket->transmit_handle)
   {
@@ -2800,8 +2790,27 @@ tunnel_cleaner (void *cls,
   {
     GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
     socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  }  
+  /* Terminate the control retransmission tasks */
+  if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+  {
+    GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+    socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  }
+  /* Clear Transmit handles */
+  if (NULL != socket->transmit_handle)
+  {
+    GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+    socket->transmit_handle = NULL;
+  }
+  /* Clear existing message queue */
+  while (NULL != (head = socket->queue_head)) {
+    GNUNET_CONTAINER_DLL_remove (socket->queue_head,
+                                socket->queue_tail,
+                                head);
+    GNUNET_free (head->message);
+    GNUNET_free (head);
   }
-  /* FIXME: Cancel all other tasks using socket->tunnel */
   socket->tunnel = NULL;
 }
 
@@ -3093,14 +3102,7 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
     GNUNET_STREAM_io_write_cancel (socket->write_handle);
     //socket->write_handle = NULL;
   }
-  if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
-  {
-    /* socket closed with read task pending!? */
-    GNUNET_break (0);
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
-  }  
-  /* Terminate the ack'ing tasks if they are still present */
+  /* Terminate the ack'ing task if they are still present */
   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
@@ -3392,9 +3394,11 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
  * @param proc function to call with data (once only)
  * @param proc_cls the closure for proc
  *
- * @return handle to cancel the operation; if the stream has been shutdown for
- *           this type of opeartion then the DataProcessor is immediately
- *           called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
+ * @return handle to cancel the operation; NULL is returned if: the stream has
+ *           been shutdown for this type of opeartion (the DataProcessor is
+ *           immediately called with GNUNET_STREAM_SHUTDOWN as status) OR another
+ *           read handle is present (only one read handle per socket is present
+ *           at any time)
  */
 struct GNUNET_STREAM_IOReadHandle *
 GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
@@ -3433,22 +3437,14 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
   read_handle->proc_cls = proc_cls;
   read_handle->socket = socket;
   socket->read_handle = read_handle;
-  /* Check if we have a packet at bitmap 0 */
   if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
                                           0))
-  {
-    socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
-                                                     socket);   
-  }
-  /* Setup the read timeout task */
-  socket->read_io_timeout_task_id =
-    GNUNET_SCHEDULER_add_delayed (timeout,
-                                  &read_io_timeout,
-                                  socket);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: %s() END\n",
-       GNUNET_i2s (&socket->other_peer),
-       __func__);
+    read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
+                                                         socket);   
+  read_handle->read_io_timeout_task_id =
+      GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
+       GNUNET_i2s (&socket->other_peer), __func__);
   return read_handle;
 }
 
@@ -3498,16 +3494,13 @@ GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
   GNUNET_assert (NULL != socket->read_handle);
   GNUNET_assert (ioh == socket->read_handle);
   /* Read io time task should be there; if it is already executed then this
-  read handle is not valid */
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id);
-  GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
-  socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  read handle is not valid; However upon scheduler shutdown the read io task
+  may be executed before */
+  if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
+    GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
   /* reading task may be present; if so we have to stop it */
-  if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id)
-  {
-    GNUNET_SCHEDULER_cancel (socket->read_task_id);
-    socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
-  }
+  if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
+    GNUNET_SCHEDULER_cancel (ioh->read_task_id);
   GNUNET_free (ioh);
   socket->read_handle = NULL;
 }