stream misc fixing
authorSree Harsha Totakura <totakura@in.tum.de>
Fri, 15 Jun 2012 15:19:08 +0000 (15:19 +0000)
committerSree Harsha Totakura <totakura@in.tum.de>
Fri, 15 Jun 2012 15:19:08 +0000 (15:19 +0000)
src/stream/stream_api.c
src/stream/test_stream_big.c

index 5d22ff7b3d6ca4dcc9eb2ab8e4ff7cc2ca4cf1a5..63e27ea98f5d15bd66de56943f3a34b0c16f6126 100644 (file)
@@ -470,6 +470,22 @@ struct GNUNET_STREAM_ShutdownHandle
 static unsigned int default_timeout = 10;
 
 
+/**
+ * Function to print the contents of an address location. Used only for debugging
+ *
+ * @param ptr the address location; Should be more than 5 bytes long
+ */
+static void
+debug_print_contents (const void *ptr)
+{
+  /* const char *c; */
+  
+  /* c = ptr; */
+  /* LOG (GNUNET_ERROR_TYPE_DEBUG, */
+  /*      "--- contents: %u %u %u %u %u\n", c[0], c[1], c[2], c[3], c[4]); */
+}
+
+
 /**
  * Callback function for sending queued message
  *
@@ -830,6 +846,7 @@ write_data (struct GNUNET_STREAM_Socket *socket)
            "%s: Placing DATA message with sequence %u in send queue\n",
            GNUNET_i2s (&socket->other_peer),
            ntohl (io_handle->messages[packet]->sequence_number));
+      debug_print_contents(&(io_handle->messages[packet][1]));
       copy_and_queue_message (socket,
                               &io_handle->messages[packet]->header,
                               NULL,
@@ -849,6 +866,7 @@ write_data (struct GNUNET_STREAM_Socket *socket)
          "%s: Placing DATA message with sequence %u in send queue\n",
          GNUNET_i2s (&socket->other_peer),
          ntohl (io_handle->messages[packet]->sequence_number));
+    debug_print_contents(&(io_handle->messages[packet][1]));
     copy_and_queue_message (socket,
                             &io_handle->messages[packet]->header,
                             NULL,
@@ -910,6 +928,7 @@ call_read_processor (void *cls,
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Calling read processor\n",
        GNUNET_i2s (&socket->other_peer));
+  debug_print_contents (socket->receive_buffer + socket->copy_offset);
   read_size = 
     socket->read_handle->proc (socket->read_handle->proc_cls,
                                socket->status,
@@ -943,11 +962,12 @@ call_read_processor (void *cls,
        GNUNET_i2s (&socket->other_peer), sequence_increase);
 
   /* Shift the data in the receive buffer */
-  memmove (socket->receive_buffer,
-           socket->receive_buffer 
-           + socket->receive_buffer_boundaries[sequence_increase-1],
-           socket->receive_buffer_size
-           - socket->receive_buffer_boundaries[sequence_increase-1]);
+  socket->receive_buffer = 
+    memmove (socket->receive_buffer,
+            socket->receive_buffer 
+            + socket->receive_buffer_boundaries[sequence_increase-1],
+            socket->receive_buffer_size
+            - socket->receive_buffer_boundaries[sequence_increase-1]);
   /* Shift the bitmap */
   socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
   /* Set read_sequence_number */
@@ -963,9 +983,18 @@ call_read_processor (void *cls,
   {
     if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
     {
-      socket->receive_buffer_boundaries[packet] = 
-        socket->receive_buffer_boundaries[packet + sequence_increase] 
-        - offset_increase;
+      uint32_t ahead_buffer_boundary;
+
+      ahead_buffer_boundary = 
+       socket->receive_buffer_boundaries[packet + sequence_increase];
+      if (0 == ahead_buffer_boundary)
+       socket->receive_buffer_boundaries[packet] = 0;
+      else
+      {
+       GNUNET_assert (offset_increase < ahead_buffer_boundary);
+       socket->receive_buffer_boundaries[packet] = 
+         ahead_buffer_boundary - offset_increase;
+      }
     }
     else
       socket->receive_buffer_boundaries[packet] = 0;
@@ -1130,6 +1159,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
       
     /* Copy Data to buffer */
     payload = &msg[1];
+    debug_print_contents(payload);
     GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
     memcpy (socket->receive_buffer + relative_offset,
             payload,
@@ -2418,22 +2448,24 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
     }
     else      /* We have to call the write continuation callback now */
     {
+      struct GNUNET_STREAM_IOWriteHandle *write_handle;
+      
       /* Free the packets */
       for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
       {
         GNUNET_free_non_null (socket->write_handle->messages[packet]);
       }
-      if (NULL != socket->write_handle->write_cont)
-        socket->write_handle->write_cont
-          (socket->write_handle->write_cont_cls,
-           socket->status,
-           socket->write_handle->size);
+      write_handle = socket->write_handle;
+      socket->write_handle = NULL;
+      if (NULL != write_handle->write_cont)
+        write_handle->write_cont (write_handle->write_cont_cls,
+                                 socket->status,
+                                 write_handle->size);
+      /* We are done with the write handle - Freeing it */
+      GNUNET_free (write_handle);
       LOG (GNUNET_ERROR_TYPE_DEBUG,
            "%s: Write completion callback completed\n",
-           GNUNET_i2s (&socket->other_peer));
-      /* We are done with the write handle - Freeing it */
-      GNUNET_free (socket->write_handle);
-      socket->write_handle = NULL;
+           GNUNET_i2s (&socket->other_peer));      
     }
     break;
   default:
index b3e6fb20cf510ff886242bbd509eada55bd4fe1a..001c4f67eb7da0c0f98b5eb9de27bc324bdab3d8 100644 (file)
@@ -89,6 +89,8 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   GNUNET_STREAM_close (peer1.socket);
   if (NULL != peer2.socket)
     GNUNET_STREAM_close (peer2.socket);
+  if (NULL != peer2_listen_socket)
+    GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
   if (0 != abort_task)
   {
@@ -152,7 +154,7 @@ write_completion (void *cls,
     {
       peer->io_write_handle =
         GNUNET_STREAM_write (peer->socket,
-                             (void *) data,
+                             ((void *) data) + peer->bytes_wrote,
                              DATA_SIZE - peer->bytes_wrote,
                              GNUNET_TIME_relative_multiply
                              (GNUNET_TIME_UNIT_SECONDS, 5),
@@ -221,10 +223,18 @@ stream_open_cb (void *cls,
 }
 
 
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
 /**
  * Input processor
  *
- * @param cls the closure from GNUNET_STREAM_write/read
+ * @param cls peer2
  * @param status the status of the stream at the time this function is called
  * @param data traffic from the other side
  * @param size the number of bytes available in data read 
@@ -240,21 +250,23 @@ input_processor (void *cls,
   struct PeerData *peer = cls;
 
   GNUNET_assert (GNUNET_STREAM_OK == status);
+  GNUNET_assert (&peer2 == peer);
   GNUNET_assert (size < DATA_SIZE);
-  GNUNET_assert (memcmp (data + peer->bytes_read, 
-                         input_data,
-                         size));
+  GNUNET_assert (0 == memcmp (((void *)data ) + peer->bytes_read, 
+                             input_data, size));
   peer->bytes_read += size;
   
   if (peer->bytes_read < DATA_SIZE)
   {
-    peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
-                                               peer->socket,
-                                               GNUNET_TIME_relative_multiply
-                                               (GNUNET_TIME_UNIT_SECONDS, 5),
-                                               &input_processor,
-                                               cls);
-    GNUNET_assert (NULL != peer->io_read_handle);
+    GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
+    read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
+    /* peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) */
+    /*                                            peer->socket, */
+    /*                                            GNUNET_TIME_relative_multiply */
+    /*                                            (GNUNET_TIME_UNIT_SECONDS, 5), */
+    /*                                            &input_processor, */
+    /*                                            cls); */
+    /* GNUNET_assert (NULL != peer->io_read_handle); */
   }
   else 
   {
@@ -274,18 +286,17 @@ static void
 stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct PeerData *peer = cls;
 
   read_task = GNUNET_SCHEDULER_NO_TASK;
-  GNUNET_assert (socket == peer2.socket);
-  peer2.bytes_read = 0;
-  GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
-  peer2.io_read_handle =
-    GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
+  GNUNET_assert (&peer2 == peer);  
+  peer->io_read_handle =
+    GNUNET_STREAM_read (peer->socket,
                         GNUNET_TIME_relative_multiply
                         (GNUNET_TIME_UNIT_SECONDS, 10),
                         &input_processor,
-                        cls);
-  GNUNET_assert (NULL != peer2.io_read_handle);
+                        peer);
+  GNUNET_assert (NULL != peer->io_read_handle);
 }
 
 
@@ -311,7 +322,8 @@ stream_listen_cb (void *cls,
               "Peer connected: %s\n", GNUNET_i2s(initiator));
 
   peer2.socket = socket;
-  read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, socket);
+  peer2.bytes_read = 0;
+  read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
   return GNUNET_OK;
 }
 
@@ -347,6 +359,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   GNUNET_assert (NULL != peer1.socket);                  
 }
 
+
 /**
  * Initialize framework and start test
  */
@@ -366,7 +379,7 @@ run (void *cls, char *const *args, const char *cfgfile,
   
   abort_task =
     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                  (GNUNET_TIME_UNIT_SECONDS, 30), &do_abort,
+                                  (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
                                   NULL);
   
   test_task = GNUNET_SCHEDULER_add_now (&test, NULL);