-control retransmission for HELLO and HELLO_ACK
authorSree Harsha Totakura <totakura@in.tum.de>
Thu, 28 Jun 2012 19:42:01 +0000 (19:42 +0000)
committerSree Harsha Totakura <totakura@in.tum.de>
Thu, 28 Jun 2012 19:42:01 +0000 (19:42 +0000)
src/include/gnunet_stream_lib.h
src/stream/stream_api.c

index fd44ccfa0a9b970afcd313407cd4f315eac9af59..a134470c75b7ccca763fcdb0133e065f79afdd7c 100644 (file)
@@ -140,7 +140,8 @@ enum GNUNET_STREAM_Option
  * @param target the target peer to which the stream has to be opened
  * @param app_port the application port number which uniquely identifies this
  *            stream
- * @param open_cb this function will be called after stream has be established 
+ * @param open_cb this function will be called after stream has be established;
+ *          cannot be NULL
  * @param open_cb_cls the closure for open_cb
  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
  * @return if successful it returns the stream socket; NULL if stream cannot be
index df0710e80e434d8359af16064b78335b56e9b33d..168929b01900775feb187c753d9aece45514cc96 100644 (file)
@@ -272,7 +272,12 @@ struct GNUNET_STREAM_Socket
   /**
    * Task identifier for retransmission task after timeout
    */
-  GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
+  GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
+
+  /**
+   * Task identifier for retransmission of control messages
+   */
+  GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
 
   /**
    * The task for sending timely Acks
@@ -576,7 +581,6 @@ send_message_notify (void *cls, size_t size, void *buf)
                                          socket);
     return 0;
   }
-
   ret = ntohs (head->message->header.size);
   GNUNET_assert (size >= ret);
   memcpy (buf, head->message, ret);
@@ -731,17 +735,16 @@ write_data (struct GNUNET_STREAM_Socket *socket);
  * @param tc the Task context
  */
 static void
-retransmission_timeout_task (void *cls,
-                             const struct GNUNET_SCHEDULER_TaskContext *tc)
+data_retransmission_task (void *cls,
+                          const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
   
   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
     return;
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
-  socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   write_data (socket);
 }
 
@@ -925,11 +928,11 @@ write_data (struct GNUNET_STREAM_Socket *socket)
                             NULL);
     packet++;
   }
-  if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
-    socket->retransmission_timeout_task_id = 
+  if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
+    socket->data_retransmission_task_id = 
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
                                     (GNUNET_TIME_UNIT_SECONDS, 8),
-                                    &retransmission_timeout_task,
+                                    &data_retransmission_task,
                                     socket);
 }
 
@@ -1292,7 +1295,7 @@ client_handle_data (void *cls,
 /**
  * Callback to set state to ESTABLISHED
  *
- * @param cls the closure from queue_message FIXME: document
+ * @param cls the closure NULL;
  * @param socket the socket to requiring state change
  */
 static void
@@ -1305,6 +1308,10 @@ set_state_established (void *cls,
   socket->write_offset = 0;
   socket->read_offset = 0;
   socket->state = STATE_ESTABLISHED;
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
+                 socket->control_retransmission_task_id);
+  GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (NULL != socket->lsocket)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1321,7 +1328,7 @@ set_state_established (void *cls,
       GNUNET_free (socket);
     }
   }
-  else if (NULL != socket->open_cb)
+  else
     socket->open_cb (socket->open_cls, socket);
 }
 
@@ -1337,7 +1344,7 @@ set_state_hello_wait (void *cls,
                       struct GNUNET_STREAM_Socket *socket)
 {
   GNUNET_assert (STATE_INIT == socket->state);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Attaining HELLO_WAIT state\n",
        GNUNET_i2s (&socket->other_peer));
   socket->state = STATE_HELLO_WAIT;
@@ -1415,41 +1422,102 @@ set_state_closed (void *cls,
 }
 
 
+/**
+ * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
+ *
+ * @return the generate hello message
+ */
+static struct GNUNET_STREAM_MessageHeader *
+generate_hello (void)
+{
+  struct GNUNET_STREAM_MessageHeader *msg;
+
+  msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
+  msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  return msg;
+}
+
+
 /**
  * Returns a new HelloAckMessage. Also sets the write sequence number for the
  * socket
  *
  * @param socket the socket for which this HelloAckMessage has to be generated
+ * @param generate_seq GNUNET_YES to generate the write sequence number,
+ *          GNUNET_NO to use the existing sequence number
  * @return the HelloAckMessage
  */
 static struct GNUNET_STREAM_HelloAckMessage *
-generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
+generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
+                    int generate_seq)
 {
   struct GNUNET_STREAM_HelloAckMessage *msg;
 
-  /* Get the random sequence number */
-  if (GNUNET_YES == socket->testing_active)
-    socket->write_sequence_number =
-      socket->testing_set_write_sequence_number_value;
-  else
-    socket->write_sequence_number = 
-      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: write sequence number %u\n",
-       GNUNET_i2s (&socket->other_peer),
-       (unsigned int) socket->write_sequence_number);
-  
+  if (GNUNET_YES == generate_seq)
+  {
+    if (GNUNET_YES == socket->testing_active)
+      socket->write_sequence_number =
+        socket->testing_set_write_sequence_number_value;
+    else
+      socket->write_sequence_number = 
+        GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%s: write sequence number %u\n",
+         GNUNET_i2s (&socket->other_peer),
+         (unsigned int) socket->write_sequence_number);
+  }
   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
   msg->header.header.size = 
     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
   msg->sequence_number = htonl (socket->write_sequence_number);
   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
-
   return msg;
 }
 
 
+/**
+ * Task for retransmitting control messages if they aren't ACK'ed before a
+ * deadline
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+control_retransmission_task (void *cls,
+                             const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_STREAM_Socket *socket = cls;
+    
+  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+    return;
+  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  switch (socket->status)
+  {
+  case STATE_INIT:    
+    GNUNET_break (0);
+    break;
+  case STATE_LISTEN:
+    GNUNET_break (0);
+    break;
+  case STATE_HELLO_WAIT:
+    if (NULL == socket->lsocket) /* We are client */
+      queue_message (socket, generate_hello (), NULL, NULL);
+    else
+      queue_message (socket,
+                     (struct GNUNET_STREAM_MessageHeader *)
+                     generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
+    break;
+  default:
+    GNUNET_break (0);
+  }
+  socket->control_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &control_retransmission_task, socket);
+}
+
+
 /**
  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
  *
@@ -1499,11 +1567,11 @@ client_handle_hello_ack (void *cls,
          GNUNET_i2s (&socket->other_peer),
          (unsigned int) socket->read_sequence_number);
     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
-    reply = generate_hello_ack_msg (socket);
+    reply = generate_hello_ack (socket, GNUNET_YES);
     queue_message (socket,
                    &reply->header, 
-                   &set_state_established, 
-                   NULL);      
+                   &set_state_established,
+                   NULL);    
     return GNUNET_OK;
   case STATE_ESTABLISHED:
   case STATE_RECEIVE_CLOSE_WAIT:
@@ -2087,31 +2155,34 @@ server_handle_hello (void *cls,
     return GNUNET_YES;
   }
 
-  GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
-                 ntohs (message->type));
+  GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
   GNUNET_assert (socket->tunnel == tunnel);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Received HELLO from %s\n", 
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
 
-  if (STATE_INIT == socket->state)
+  switch (socket->status)
   {
-    reply = generate_hello_ack_msg (socket);
+  case STATE_INIT:
+    reply = generate_hello_ack (socket, GNUNET_YES);
     queue_message (socket, 
                    &reply->header,
                    &set_state_hello_wait, 
                    NULL);
-  }
-  else
-  {
+    break;
+  default:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Client sent HELLO when in state %d\n", 
          GNUNET_i2s (&socket->other_peer),
          socket->state);
     /* FIXME: Send RESET? */
-      
   }
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+                 socket->control_retransmission_task_id);
+  socket->control_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &control_retransmission_task, socket);
   return GNUNET_OK;
 }
 
@@ -2427,8 +2498,6 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
            GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
-    /* FIXME: increment in the base sequence number is breaking current flow
-     */
     if (!((socket->write_sequence_number 
            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
     {
@@ -2450,10 +2519,10 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
          GNUNET_i2s (&socket->other_peer));
       
     /* Cancel the retransmission task */
-    if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+    if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
     {
-      GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
-      socket->retransmission_timeout_task_id = 
+      GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+      socket->data_retransmission_task_id = 
         GNUNET_SCHEDULER_NO_TASK;
     }
     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
@@ -2665,30 +2734,23 @@ mesh_peer_connect_callback (void *cls,
          GNUNET_i2s(peer));
     return;
   }
-  
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Target peer %s connected\n",
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-  
   /* Set state to INIT */
   socket->state = STATE_INIT;
-
   /* Send HELLO message */
-  message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
-  message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
-  message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  message = generate_hello ();
   queue_message (socket,
                  message,
                  &set_state_hello_wait,
                  NULL);
-
-  /* Call open callback */
-  if (NULL == socket->open_cb)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "STREAM_open callback is NULL\n");
-  }
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+                 socket->control_retransmission_task_id);
+  socket->control_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &control_retransmission_task, socket);
 }
 
 
@@ -2752,15 +2814,12 @@ new_tunnel_notify (void *cls,
   socket->retransmit_timeout = lsocket->retransmit_timeout;
   socket->testing_active = lsocket->testing_active;
   socket->testing_set_write_sequence_number_value =
-    lsocket->testing_set_write_sequence_number_value;
-    
+    lsocket->testing_set_write_sequence_number_value;    
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Peer %s initiated tunnel to us\n", 
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-  
   /* FIXME: Copy MESH handle from lsocket to socket */
-  
   return socket;
 }
 
@@ -2814,10 +2873,10 @@ tunnel_cleaner (void *cls,
     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
-  if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+  if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
   {
-    GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
-    socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+    socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
   /* FIXME: Cancel all other tasks using socket->tunnel */
   socket->tunnel = NULL;
@@ -2905,7 +2964,8 @@ lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
  * @param target the target peer to which the stream has to be opened
  * @param app_port the application port number which uniquely identifies this
  *            stream
- * @param open_cb this function will be called after stream has be established 
+ * @param open_cb this function will be called after stream has be established;
+ *          cannot be NULL
  * @param open_cb_cls the closure for open_cb
  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
  * @return if successful it returns the stream socket; NULL if stream cannot be
@@ -2922,17 +2982,17 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
   struct GNUNET_STREAM_Socket *socket;
   enum GNUNET_STREAM_Option option;
   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
-  va_list vargs;                /* Variable arguments */
+  va_list vargs;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s\n", __func__);
+  GNUNET_assert (NULL != open_cb);
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
   socket->other_peer = *target;
   socket->open_cb = open_cb;
   socket->open_cls = open_cb_cls;
   /* Set defaults */
-  socket->retransmit_timeout = 
-    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
+  socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
   socket->testing_active = GNUNET_NO;
   va_start (vargs, open_cb_cls); /* Parse variable args */
   do {
@@ -2972,10 +3032,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
     GNUNET_free (socket);
     return NULL;
   }
-
   /* Now create the mesh tunnel to target */
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Creating MESH Tunnel\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
                                               NULL, /* Tunnel context */
                                               &mesh_peer_connect_callback,
@@ -2984,9 +3042,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
   GNUNET_assert (NULL != socket->tunnel);
   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
                                         &socket->other_peer);
-  
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s() END\n", __func__);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
   return socket;
 }
 
@@ -3088,6 +3144,7 @@ GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
 {
   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
+  handle->socket->shutdown_handle = NULL;
   GNUNET_free (handle);
 }
 
@@ -3114,22 +3171,24 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
     GNUNET_STREAM_io_write_cancel (socket->write_handle);
     //socket->write_handle = NULL;
   }
-
   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     /* socket closed with read task pending!? */
     GNUNET_break (0);
     GNUNET_SCHEDULER_cancel (socket->read_task_id);
     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
-  }
-  
+  }  
   /* Terminate the ack'ing tasks if they are still present */
   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
-
+  /* Terminate the control retransmission tasks */
+  if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+  {
+    GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+  }
   /* Clear Transmit handles */
   if (NULL != socket->transmit_handle)
   {
@@ -3143,7 +3202,6 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
     socket->ack_msg = NULL;
     socket->ack_transmit_handle = NULL;
   }
-
   /* Clear existing message queue */
   while (NULL != (head = socket->queue_head)) {
     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
@@ -3213,8 +3271,7 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
   }
   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
   /* Set defaults */
-  lsocket->retransmit_timeout = 
-    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
+  lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
   lsocket->testing_active = GNUNET_NO;
   lsocket->listen_ok_cb = NULL;
   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
@@ -3491,10 +3548,10 @@ GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
   GNUNET_assert (NULL != socket->write_handle);
   GNUNET_assert (socket->write_handle == ioh);
 
-  if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+  if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
   {
-    GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
-    socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+    socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
 
   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)