adding delay heap
authorChristian Grothoff <christian@grothoff.org>
Mon, 14 Mar 2011 19:04:15 +0000 (19:04 +0000)
committerChristian Grothoff <christian@grothoff.org>
Mon, 14 Mar 2011 19:04:15 +0000 (19:04 +0000)
src/fs/gnunet-service-fs_pe.c

index af20c4bf30f493e18cd22c6219e059e390722287..1c73c8dda6a757c9f3ee314e0213434f5be8aeb5 100644 (file)
@@ -59,14 +59,24 @@ struct GSF_RequestPlan
   struct GSF_PendingRequest *pr;
 
   /**
-   * Earliest time we'd be happy to transmit this request.
+   * Earliest time we'd be happy to (re)transmit this request.
    */
   struct GNUNET_TIME_Absolute earliest_transmission;
 
   /**
-   * Priority for this request for this target.
+   * When was the last time we transmitted this request to this peer? 0 for never.
    */
-  uint32_t priority;
+  struct GNUNET_TIME_Absolute last_transmission;
+
+  /**
+   * Current priority for this request for this target.
+   */
+  uint64_t priority;
+
+  /**
+   * How often did we transmit this request to this peer?
+   */
+  unsigned int transmission_counter;
 
 };
 
@@ -77,9 +87,14 @@ struct GSF_RequestPlan
 struct PeerPlan
 {
   /**
-   * Heap with pending queries (struct GSF_RequestPlan), smaller weights mean higher priority.
+   * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
    */
-  struct GNUNET_CONTAINER_Heap *heap;
+  struct GNUNET_CONTAINER_Heap *priority_heap;
+
+  /**
+   * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
+   */
+  struct GNUNET_CONTAINER_Heap *delay_heap;
 
   /**
    * Current transmission request handle.
@@ -114,16 +129,20 @@ static void
 plan (struct PeerPlan *pp,
       struct GSF_RequestPlan *rp)
 {
-  GNUNET_CONTAINER_HeapCostType weight;
   struct GSF_PendingRequestData *prd;
 
   prd = GSF_pending_request_get_data_ (rp->pr);
-  weight = 0; // FIXME: calculate real weight!
   // FIXME: calculate 'rp->earliest_transmission'!
   // fIXME: claculate 'rp->priority'! 
-  rp->hn = GNUNET_CONTAINER_heap_insert (pp->heap,
-                                        rp,
-                                        weight);
+
+  if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
+    rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+                                          rp,
+                                          rp->priority);
+  else
+    rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
+                                          rp,
+                                          rp->earliest_transmission.abs_value);
 }
 
 
@@ -161,7 +180,7 @@ transmit_message_callback (void *cls,
       pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
       return 0;
     }
-  rp = GNUNET_CONTAINER_heap_peek (pp->heap);
+  rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
   msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
   if (msize > buf_size)
     {
@@ -170,8 +189,10 @@ transmit_message_callback (void *cls,
       return 0;
     }
   /* remove from root, add again elsewhere... */
-  GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->heap));
+  GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
   rp->hn = NULL;
+  rp->last_transmission = GNUNET_TIME_absolute_get ();
+  rp->transmission_counter++;
   plan (pp, rp);
   return msize;
 }
@@ -191,24 +212,34 @@ schedule_peer_transmission (void *cls,
   struct GSF_RequestPlan *rp;
   struct GSF_PendingRequestData *prd;
   size_t msize;
-  struct GNUNET_TIME_Relative delay;
 
   pp->task = GNUNET_SCHEDULER_NO_TASK;
-  if (NULL == pp->heap)
-    return;
-  if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
-    return;
   GNUNET_assert (NULL == pp->pth);
-  rp = GNUNET_CONTAINER_heap_peek (pp->heap);
-  prd = GSF_pending_request_get_data_ (rp->pr);
-  delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
-  if (delay.rel_value > 0)
+  /* move ready requests to priority queue */
+  while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
+         (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
+    {
+      rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap);
+      GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+                                   rp, 
+                                   rp->priority);                                      
+      if (NULL == (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap)))
+       break;
+    }   
+  if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
     {
-      pp->task = GNUNET_SCHEDULER_add_delayed (delay,
+      /* priority heap (still) empty, check for delay... */
+      rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
+      if (NULL == rp)
+       return; /* both queues empty */
+      pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
                                               &schedule_peer_transmission,
                                               pp);
       return;
     }
+  /* process from priority heap */
+  rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
+  prd = GSF_pending_request_get_data_ (rp->pr);
   msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);                                     
   pp->pth = GSF_peer_transmit_ (pp->cp,
                                GNUNET_YES,
@@ -242,7 +273,8 @@ GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
   if (NULL == pp)
     {
       pp = GNUNET_malloc (sizeof (struct PeerPlan));
-      pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+      pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
+      pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
       GNUNET_CONTAINER_multihashmap_put (plans,
                                         &id.hashPubKey,
                                         pp,
@@ -255,10 +287,24 @@ GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
                               prd->rp_tail,
                               rp);
   plan (pp, rp);
+  if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
+    {
+      /* no request that should be done immediately, figure out delay */
+      if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap))
+       return; /* did not change delay heap top, no need to do anything */
+      GNUNET_assert (NULL == pp->pth);
+      if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+       GNUNET_SCHEDULER_cancel (pp->task);
+      pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
+                                              &schedule_peer_transmission,
+                                              pp);
+      return;
+    }
+
   if (pp->pth != NULL)
     {
-      if (rp != GNUNET_CONTAINER_heap_peek (pp->heap))
-       return;
+      if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap))
+       return; /* did not change priority heap top, no need to do anyhing */
       GSF_peer_transmit_cancel_ (pp->pth);
       pp->pth = NULL;
     }
@@ -293,7 +339,16 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
     GSF_peer_transmit_cancel_ (pp->pth);
   if (GNUNET_SCHEDULER_NO_TASK != pp->task)
     GNUNET_SCHEDULER_cancel (pp->task);
-  while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->heap)))
+  while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
+    {
+      prd = GSF_pending_request_get_data_ (rp->pr);
+      GNUNET_CONTAINER_DLL_remove (prd->rp_head,
+                                  prd->rp_tail,
+                                  rp);
+      GNUNET_free (rp);
+    }
+  GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
+  while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
     {
       prd = GSF_pending_request_get_data_ (rp->pr);
       GNUNET_CONTAINER_DLL_remove (prd->rp_head,
@@ -301,7 +356,7 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
                                   rp);
       GNUNET_free (rp);
     }
-  GNUNET_CONTAINER_heap_destroy (pp->heap);
+  GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
   GNUNET_free (pp);
 }