-added write operation
authorSree Harsha Totakura <totakura@in.tum.de>
Sun, 12 Feb 2012 12:28:31 +0000 (12:28 +0000)
committerSree Harsha Totakura <totakura@in.tum.de>
Sun, 12 Feb 2012 12:28:31 +0000 (12:28 +0000)
src/stream/stream_api.c
src/stream/stream_protocol.h

index faceac297623dd7308e2225be2fc796508d2932d..c25cb6c9bb633faf32380164f0b2a2bbfae3d141 100644 (file)
 #include "stream_protocol.h"
 
 
+/**
+ * The maximum packet size of a stream packet
+ */
 #define MAX_PACKET_SIZE 64000
 
+/**
+ * The maximum payload a data message packet can carry
+ */
+static size_t max_payload_size = 
+  MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+
+/**
+ * Receive buffer
+ */
+#define RECEIVE_BUFFER_SIZE 4096000
 
 /**
  * states in the Protocol
@@ -221,6 +234,11 @@ struct GNUNET_STREAM_Socket
    * Read sequence number. This number's value is determined during handshake
    */
   uint32_t read_sequence_number;
+
+  /**
+   * receiver's available buffer
+   */
+  uint32_t receive_window_available;
 };
 
 
@@ -266,7 +284,19 @@ struct GNUNET_STREAM_IOHandle
    * The bitmap of this IOHandle; Corresponding bit for a message is set when
    * it has been acknowledged by the receiver
    */
-  GNUNET_STREAM_AckBitmap bitmap;
+  GNUNET_STREAM_AckBitmap ack_bitmap;
+
+  /**
+   * receiver's available buffer
+   */
+  uint32_t receive_window_available;
+
+  /**
+   * Number of packets sent before waiting for an ack
+   *
+   * FIXME: Do we need this?
+   */
+  unsigned int sent_packets;
 };
 
 
@@ -392,7 +422,7 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
  * @param value GNUNET_YES to on, GNUNET_NO to off
  */
 static void
-AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
+ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
                      unsigned int bit, 
                      int value)
 {
@@ -411,7 +441,7 @@ AckBitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
  * @return GNUNET_YES if the bit is set; GNUNET_NO if not
  */
 static uint8_t
-AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
+ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
                       unsigned int bit)
 {
   GNUNET_assert (bit < 64);
@@ -419,6 +449,71 @@ AckBitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
 }
 
 
+
+/**
+ * Function called when Data Message is sent
+ *
+ * @param cls the io_handle corresponding to the Data Message
+ * @param socket the socket which was used
+ */
+static void
+write_data_finish_cb (void *cls,
+                      struct GNUNET_STREAM_Socket *socket)
+{
+  struct GNUNET_STREAM_IOHandle *io_handle = cls;
+
+  io_handle->sent_packets++;
+}
+
+
+/**
+ * Writes data using the given socket. The amount of data written is limited by
+ * the receive_window_size
+ *
+ * @param socket the socket to use
+ */
+static void 
+write_data (struct GNUNET_STREAM_Socket *socket)
+{
+  struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
+  unsigned int packet;
+  int ack_packet;
+
+  ack_packet = -1;
+  /* Find the last acknowledged packet */
+  for (packet=0; packet < 64; packet++)
+    {
+      if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+                                              packet))
+        {
+          ack_packet = packet;
+        }
+    }
+  /* Resend packets which weren't ack'ed */
+  for (packet=0; packet < ack_packet; packet++)
+    {
+      if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+                                             packet))
+        {
+          queue_message (socket,
+                         &io_handle->messages[packet]->header,
+                         NULL,
+                         NULL);
+        }
+    }
+  packet = ack_packet + 1;
+  /* Now send new packets if there is enough buffer space */
+  while (io_handle->receive_window_available -=
+         io_handle->messages[packet]->header.header.size > 0)
+    {
+      queue_message (socket,
+                     &io_handle->messages[packet]->header,
+                     &write_data_finish_cb,
+                     io_handle);
+    }
+}
+
+
 /**
  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
  *
@@ -520,6 +615,7 @@ client_handle_hello_ack (void *cls,
   {
   case STATE_HELLO_WAIT:
       socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+      socket->receive_window_available = ntohl (ack_msg->receive_window_size);
       /* Get the random sequence number */
       socket->write_sequence_number = 
         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
@@ -530,6 +626,7 @@ client_handle_hello_ack (void *cls,
       reply->header.header.type = 
         htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
       reply->sequence_number = htonl (socket->write_sequence_number);
+      reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
       queue_message (socket, 
                      &reply->header, 
                      &set_state_established, 
@@ -840,7 +937,9 @@ server_handle_hello_ack (void *cls,
   GNUNET_assert (socket->tunnel == tunnel);
   if (STATE_HELLO_WAIT == socket->state)
     {
-      socket->read_sequence_number = ntohs (ack_message->sequence_number);
+      socket->read_sequence_number = ntohl (ack_message->sequence_number);
+      socket->receive_window_available = 
+        ntohl (ack_message->receive_window_size);
       socket->state = STATE_ESTABLISHED;
     }
   else
@@ -1039,11 +1138,10 @@ server_handle_close_ack (void *cls,
 /**
  * Message Handler for mesh
  *
- * @param cls closure (set from GNUNET_MESH_connect)
+ * @param socket the socket through which the ack was received
  * @param tunnel connection to the other end
- * @param tunnel_ctx place to store local state associated with the tunnel
  * @param sender who sent the message
- * @param ack the actual message
+ * @param ack the acknowledgment message
  * @param atsi performance data for the connection
  * @return GNUNET_OK to keep the connection open,
  *         GNUNET_SYSERR to close it (signal serious error)
@@ -1055,6 +1153,24 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
            const struct GNUNET_STREAM_AckMessage *ack,
            const struct GNUNET_ATS_Information*atsi)
 {
+  switch (socket->state)
+    {
+    case (STATE_ESTABLISHED):
+      if (NULL == socket->write_handle)
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Received DATA ACK when write_handle is NULL\n");
+          return GNUNET_OK;
+        }
+
+      socket->write_handle->ack_bitmap = ntoh64 (ack->bitmap);
+      socket->write_handle->receive_window_available = 
+        ntohl (ack->receive_window_remaining);
+      write_data (socket);
+      break;
+    default:
+      break;
+    }
   return GNUNET_OK;
 }
 
@@ -1502,7 +1618,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
   unsigned int packet;
   struct GNUNET_STREAM_IOHandle *io_handle;
   struct GNUNET_STREAM_DataMessage *data_msg;
-  size_t max_payload_size;
   size_t packet_size;
   const void *sweep;
 
@@ -1517,8 +1632,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
       return NULL;
     }
       
-  max_payload_size = 
-    MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
   num_needed_packets = ceil (size / max_payload_size);
   if (64 < num_needed_packets) 
     {
@@ -1528,8 +1641,9 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
     }
 
   io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
+  io_handle->receive_window_available = socket->receive_window_available;
   sweep = data;
-  /* Divide the given area into packets for sending */
+  /* Divide the given buffer into packets for sending */
   for (packet=0; packet < num_needed_packets; packet++)
     {
       if ((packet + 1) * max_payload_size < size) 
@@ -1545,6 +1659,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
       io_handle->messages[packet]->header.header.size = htons (packet_size);
       io_handle->messages[packet]->header.header.type =
         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
+      io_handle->messages[packet]->sequence_number =
+        htons (socket->write_sequence_number++);
       data_msg = io_handle->messages[packet];
       memcpy (&data_msg[1],
               sweep,
@@ -1552,5 +1668,7 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
       sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
     }
 
+  write_data (socket);
+
   return io_handle;
 }
index 426cd345b65d1a14ab279d17d3820a7318834a2a..1b3a6838a57a81a1cf9e98e7f11990cdc9ad4426 100644 (file)
@@ -123,6 +123,8 @@ struct GNUNET_STREAM_AckMessage
   /**
    * The sequence number of the Data Message upto which the receiver has filled
    * its buffer without any missing packets
+   *
+   * FIXME: Do we need this?
    */
   uint32_t base_sequence_number GNUNET_PACKED;
 
@@ -147,6 +149,10 @@ struct GNUNET_STREAM_HelloAckMessage
    */
   uint32_t sequence_number;
 
+  /**
+   * The size(in bytes) of the receive window on the peer sending this message
+   */
+  uint32_t receive_window_size;
 };
 
 GNUNET_NETWORK_STRUCT_END