fixed read timeout problem and added ack sending incase of ignored data messages
[oweals/gnunet.git] / src / stream / stream_api.c
index 1547f0228653c3a090462243816bf8c5d5b3d9eb..2b9363c68be1955806162e352da199d135601431 100644 (file)
@@ -845,11 +845,18 @@ call_read_processor (void *cls,
   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
 
   /* Call the data processor */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%x: Calling read processor\n",
+              socket->our_id);
   read_size = 
     socket->read_handle->proc (socket->read_handle->proc_cls,
                                socket->status,
                                socket->receive_buffer + socket->copy_offset,
                                valid_read_size);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%x: Read processor read %d bytes\n",
+              socket->our_id,
+              read_size);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%x: Read processor completed successfully\n",
               socket->our_id);
@@ -917,17 +924,29 @@ read_io_timeout (void *cls,
                 const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  GNUNET_STREAM_DataProcessor proc;
+  void *proc_cls;
 
   socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%x: Read task timedout - Cancelling it\n",
+                socket->our_id);
     GNUNET_SCHEDULER_cancel (socket->read_task_id);
     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
   GNUNET_assert (NULL != socket->read_handle);
-  
+  proc = socket->read_handle->proc;
+  proc_cls = socket->read_handle->proc_cls;
+
   GNUNET_free (socket->read_handle);
   socket->read_handle = NULL;
+  /* Call the read processor to signal timeout */
+  proc (proc_cls,
+        GNUNET_STREAM_TIMEOUT,
+        NULL,
+        0);
 }
 
 
@@ -986,9 +1005,18 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
                       "%x: Ignoring received message with sequence number %u\n",
                       socket->our_id,
                       ntohl (msg->sequence_number));
+          /* Start ACK sending task if one is not already present */
+          if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+            {
+              socket->ack_task_id = 
+                GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+                                              (msg->ack_deadline),
+                                              &ack_task,
+                                              socket);
+            }
           return GNUNET_YES;
         }
-
+      
       /* Check if we have already seen this message */
       if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
                                               relative_sequence_number))
@@ -998,6 +1026,15 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
                       "number %u\n",
                       socket->our_id,
                       ntohl (msg->sequence_number));
+          /* Start ACK sending task if one is not already present */
+          if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+            {
+              socket->ack_task_id = 
+                GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+                                              (msg->ack_deadline),
+                                              &ack_task,
+                                              socket);
+            }
           return GNUNET_YES;
         }
 
@@ -1063,6 +1100,10 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
           && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
                                                  0)))
         {
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "%x: Scheduling read processor\n",
+                      socket->our_id);
+
           socket->read_task_id = 
             GNUNET_SCHEDULER_add_now (&call_read_processor,
                                       socket);
@@ -1864,12 +1905,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
                       socket->our_id);
           return GNUNET_OK;
         }
-      
+      /* FIXME: increment in the base sequence number is breaking current flow
+       */
       if (!((socket->write_sequence_number 
              - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%x: Received DATA_ACK with unexpected base sequence",
+                      "%x: Received DATA_ACK with unexpected base sequence "
                       "number\n",
                       socket->our_id);
           return GNUNET_OK;
@@ -2532,14 +2574,19 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
   struct GNUNET_STREAM_IOReadHandle *read_handle;
   
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s()\n", __func__);
+              "%x: %s()\n", 
+              socket->our_id,
+              __func__);
 
   /* Return NULL if there is already a read handle; the user has to cancel that
   first before continuing or has to wait until it is completed */
   if (NULL != socket->read_handle) return NULL;
 
+  GNUNET_assert (NULL != proc);
+
   read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
   read_handle->proc = proc;
+  read_handle->proc_cls = proc_cls;
   socket->read_handle = read_handle;
 
   /* Check if we have a packet at bitmap 0 */
@@ -2556,7 +2603,9 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
                                                                &read_io_timeout,
                                                                socket);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s() END\n", __func__);
+              "%x: %s() END\n",
+              socket->our_id,
+              __func__);
   return read_handle;
 }
 
@@ -2569,6 +2618,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
 void
 GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
 {
+  /* FIXME: Should cancel the write retransmission task */
   return;
 }