-task cancellations in tunnel cleaner
authorSree Harsha Totakura <totakura@in.tum.de>
Sun, 22 Apr 2012 22:16:39 +0000 (22:16 +0000)
committerSree Harsha Totakura <totakura@in.tum.de>
Sun, 22 Apr 2012 22:16:39 +0000 (22:16 +0000)
src/stream/stream_api.c

index e20d438635c27fb602b4cbbbfc43108cc4826725..7973660f12e9da22398a2e76c34422748957c119 100644 (file)
@@ -211,6 +211,16 @@ struct GNUNET_STREAM_Socket
    */
   struct GNUNET_MESH_TransmitHandle *transmit_handle;
 
+  /**
+   * The current act transmit handle (if a pending ack transmit request exists)
+   */
+  struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
+
+  /**
+   * Pointer to the current ack message using in ack_task
+   */
+  struct GNUNET_STREAM_AckMessage *ack_msg;
+
   /**
    * The current message associated with the transmit handle
    */
@@ -630,7 +640,7 @@ copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
 static size_t
 send_ack_notify (void *cls, size_t size, void *buf)
 {
-  struct GNUNET_STREAM_AckMessage *ack_msg = cls;
+  struct GNUNET_STREAM_Socket *socket = cls;
 
   if (0 == size)
     {
@@ -638,10 +648,14 @@ send_ack_notify (void *cls, size_t size, void *buf)
                   "%s called with size 0\n", __func__);
       return 0;
     }
-  GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
+  GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
+  
+  size = ntohs (socket->ack_msg->header.header.size);
+  memcpy (buf, socket->ack_msg, size);
   
-  size = ntohs (ack_msg->header.header.size);
-  memcpy (buf, ack_msg, size);
+  GNUNET_free (socket->ack_msg);
+  socket->ack_msg = NULL;
+  socket->ack_transmit_handle = NULL;
   return size;
 }
 
@@ -708,18 +722,18 @@ ack_task (void *cls,
   ack_msg->receive_window_remaining = 
     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
 
+  socket->ack_msg = ack_msg;
   GNUNET_PEER_resolve (socket->other_peer, &target);
   /* Request MESH for sending ACK */
-  GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                     0, /* Corking */
-                                     1, /* Priority */
-                                     socket->retransmit_timeout,
-                                     &target,
-                                     ntohs (ack_msg->header.header.size),
-                                     &send_ack_notify,
-                                     ack_msg);
-
-  
+  socket->ack_transmit_handle = 
+    GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                       0, /* Corking */
+                                       1, /* Priority */
+                                       socket->retransmit_timeout,
+                                       &target,
+                                       ntohs (ack_msg->header.header.size),
+                                       &send_ack_notify,
+                                       socket);
 }
 
 
@@ -1662,7 +1676,7 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
             }
 
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%x: Received TRAMSMIT_CLOSE_ACK from %x\n",
+                      "%x: Received TRANSMIT_CLOSE_ACK from %x\n",
                       socket->our_id,
                       socket->other_peer);
           socket->state = STATE_TRANSMIT_CLOSED;
@@ -2706,7 +2720,11 @@ tunnel_cleaner (void *cls,
                 void *tunnel_ctx)
 {
   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
-  
+
+  if (tunnel != socket->tunnel)
+    return;
+
+  GNUNET_break_op(0);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%x: Peer %x has terminated connection abruptly\n",
               socket->our_id,
@@ -2720,6 +2738,25 @@ tunnel_cleaner (void *cls,
       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
       socket->transmit_handle = NULL;
     }
+  if (NULL != socket->ack_transmit_handle)
+    {
+      GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
+      GNUNET_free (socket->ack_msg);
+      socket->ack_msg = NULL;
+      socket->ack_transmit_handle = NULL;
+    }
+  /* Stop Tasks using socket->tunnel */
+  if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
+    {
+      GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+      socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+    }
+  if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+    {
+      GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+      socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+    }
+  /* FIXME: Cancel all other tasks using socket->tunnel */
   socket->tunnel = NULL;
 }
 
@@ -2789,7 +2826,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                       10,  /* QUEUE size as parameter? */
                                       socket, /* cls */
                                       NULL, /* No inbound tunnel handler */
-                                      &tunnel_cleaner, /* FIXME: not required? */
+                                      NULL, /* No in-tunnel cleaner */
                                       client_message_handlers,
                                       ports); /* We don't get inbound tunnels */
   if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
@@ -2952,6 +2989,13 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
       socket->transmit_handle = NULL;
     }
+  if (NULL != socket->ack_transmit_handle)
+    {
+      GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
+      GNUNET_free (socket->ack_msg);
+      socket->ack_msg = NULL;
+      socket->ack_transmit_handle = NULL;
+    }
 
   /* Clear existing message queue */
   while (NULL != (head = socket->queue_head)) {