added ack sending
authorSree Harsha Totakura <totakura@in.tum.de>
Wed, 22 Feb 2012 10:10:18 +0000 (10:10 +0000)
committerSree Harsha Totakura <totakura@in.tum.de>
Wed, 22 Feb 2012 10:10:18 +0000 (10:10 +0000)
src/stream/stream_api.c

index e412b679df97de4661a2d4f8cd7bffe6bd44b01e..f7ab231458a7f915cd3dfa2ef5312b1aaa5eb866 100644 (file)
@@ -160,6 +160,26 @@ struct GNUNET_STREAM_Socket
    */
   struct GNUNET_TIME_Relative retransmit_timeout;
 
+  /**
+   * The Acknowledgement Bitmap
+   */
+  GNUNET_STREAM_AckBitmap ack_bitmap;
+
+  /**
+   * Time when the Acknowledgement was queued
+   */
+  struct GNUNET_TIME_Absolute ack_time_registered;
+
+  /**
+   * Queued Acknowledgement deadline
+   */
+  struct GNUNET_TIME_Relative ack_time_deadline;
+
+  /**
+   * The task for sending timely Acks
+   */
+  GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
+
   /**
    * The mesh handle
    */
@@ -243,7 +263,7 @@ struct GNUNET_STREAM_Socket
   uint32_t read_sequence_number;
 
   /**
-   * receiver's available buffer
+   * receiver's available buffer after the last acknowledged packet
    */
   uint32_t receive_window_available;
 };
@@ -421,6 +441,76 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
 }
 
 
+/**
+ * Callback function for sending ack message
+ *
+ * @param cls closure the ACK message created in ack_task
+ * @param size number of bytes available in buffer
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+send_ack_notify (void *cls, size_t size, void *buf)
+{
+  struct GNUNET_STREAM_AckMessage *ack_msg = cls;
+
+  if (0 == size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s called with size 0\n", __func__);
+      return 0;
+    }
+  GNUNET_assert (ack_msg->header.header.size <= size);
+  
+  size = ack_msg->header.header.size;
+  memcpy (buf, ack_msg, size);
+  return size;
+}
+
+
+/**
+ * Task for sending ACK message
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+ack_task (void *cls,
+          const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_AckMessage *ack_msg;
+
+  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+    {
+      return;
+    }
+
+  socket->ack_task_id = 0;
+
+  /* Create the ACK Message */
+  ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
+  ack_msg->header.header.size = htons (sizeof (struct 
+                                               GNUNET_STREAM_AckMessage));
+  ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
+  ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
+  ack_msg->base_sequence_number = htonl (socket->write_sequence_number);
+  ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
+
+  /* Request MESH for sending ACK */
+  GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                     0, /* Corking */
+                                     1, /* Priority */
+                                     socket->retransmit_timeout,
+                                     &socket->other_peer,
+                                     ntohs (ack_msg->header.header.size),
+                                     &send_ack_notify,
+                                     ack_msg);
+
+  
+}
+
+
 /**
  * Function to modify a bit in GNUNET_STREAM_AckBitmap
  *
@@ -577,7 +667,23 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
               * MAX_PACKET_SIZE,
               payload,
               size);
-      /* FIXME: We have to send GNUNET_STREAM_AckMessage */
+      
+      /* Modify the ACK bitmap */
+      ackbitmap_modify_bit (&socket->bitmap,
+                            ntohl (msg->sequence_number) -
+                            socket->read_sequence_number,
+                            GNUNET_YES);
+
+      /* Start ACK sending task if one is not already present */
+      if (0 == socket->ack_task_id)
+       {
+         socket->ack_task_id = 
+           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+                                         (msg->ack_deadline),
+                                         &ack_task,
+                                         socket);
+       }
+      
       break;
 
     default:
@@ -1740,8 +1846,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
   unsigned int num_needed_packets;
   unsigned int packet;
   struct GNUNET_STREAM_IOHandle *io_handle;
-  struct GNUNET_STREAM_DataMessage *data_msg;
   size_t packet_size;
+  struct GNUNET_STREAM_DataMessage *data_msg;
   const void *sweep;
 
   /* There is already a write request pending */