fixing block reconstruction start/shutdown code
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index 139cb5aa92843278615f8608b8dd0f413a9020d4..08b01fbbf561819e6a96763e1a432382367ba6e1 100644 (file)
@@ -24,6 +24,8 @@
  * @author Christian Grothoff
  *
  * TODO:
+ * - collect traffic data for anonymity levels > 1
+ * - implement transmission restrictions for anonymity level > 1
  * - more statistics
  */
 #include "platform.h"
@@ -37,6 +39,7 @@
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
 #include "gnunet_statistics_service.h"
+#include "gnunet_transport_service.h"
 #include "gnunet_util_lib.h"
 #include "gnunet-service-fs_indexing.h"
 #include "fs.h"
  */
 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
 
+/**
+ * How quickly do we age cover traffic?  At the given 
+ * time interval, remaining cover traffic counters are
+ * decremented by 1/16th.
+ */
+#define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+
 /**
  * How often do we at most PUT content into the DHT?
  */
@@ -282,6 +292,16 @@ struct ConnectedPeer
    */
   struct GNUNET_LOAD_Value *transmission_delay;
 
+  /**
+   * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
+   */
+  struct GNUNET_CORE_InformationRequestContext *irc;
+
+  /**
+   * Request for which 'irc' is currently active (or NULL).
+   */
+  struct PendingRequest *pr;
+
   /**
    * Time when the last transmission request was issued.
    */
@@ -482,9 +502,6 @@ struct UsedTargetEntry
 };
 
 
-
-
-
 /**
  * Doubly-linked list of messages we are performing
  * due to a pending request.
@@ -554,14 +571,14 @@ struct PendingRequest
   struct GNUNET_CONTAINER_BloomFilter *bf;
 
   /**
-   * Context of our GNUNET_CORE_peer_change_preference call.
+   * Reference to DHT get operation for this request (or NULL).
    */
-  struct GNUNET_CORE_InformationRequestContext *irc;
+  struct GNUNET_DHT_GetHandle *dht_get;
 
   /**
-   * Reference to DHT get operation for this request (or NULL).
+   * Context of our GNUNET_CORE_peer_change_preference call.
    */
-  struct GNUNET_DHT_GetHandle *dht_get;
+  struct ConnectedPeer *pirc;
 
   /**
    * Hash code of all replies that we have seen so far (only valid
@@ -875,7 +892,12 @@ static unsigned int mig_size;
 /**
  * Are we allowed to migrate content to this peer.
  */
-static int active_migration;
+static int active_to_migration;
+
+/**
+ * Are we allowed to push out content from this peer.
+ */
+static int active_from_migration;
 
 /**
  * How many entires with zero anonymity do we currently estimate
@@ -910,6 +932,34 @@ static struct GNUNET_LOAD_Value *datastore_put_load;
  */
 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
 
+/**
+ * How many query messages have we received 'recently' that 
+ * have not yet been claimed as cover traffic?
+ */
+static unsigned int cover_query_count;
+
+/**
+ * How many content messages have we received 'recently' that 
+ * have not yet been claimed as cover traffic?
+ */
+static unsigned int cover_content_count;
+
+/**
+ * ID of our task that we use to age the cover counters.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
+
+static void
+age_cover_counters (void *cls,
+                   const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  cover_content_count = (cover_content_count * 15) / 16;
+  cover_query_count = (cover_query_count * 15) / 16;
+  cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
+                                                &age_cover_counters,
+                                                NULL);
+}
+
 /**
  * We've just now completed a datastore request.  Update our
  * datastore load calculations.
@@ -1524,10 +1574,11 @@ destroy_pending_request (struct PendingRequest *pr)
       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                       
       pr->bf = NULL;
     }
-  if (pr->irc != NULL)
+  if (pr->pirc != NULL)
     {
-      GNUNET_CORE_peer_change_preference_cancel (pr->irc);
-      pr->irc = NULL;
+      GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
+      pr->pirc->irc = NULL;
+      pr->pirc = NULL;
     }
   if (pr->replies_seen != NULL)
     {
@@ -1564,8 +1615,17 @@ destroy_pending_request (struct PendingRequest *pr)
 static struct GNUNET_TIME_Relative
 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
-  /* FIXME: extract latency data from 'atsi' */
-  return GNUNET_TIME_UNIT_SECONDS;
+  while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
+         (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
+    atsi++;
+  if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
+    {
+      GNUNET_break (0);
+      /* how can we not have latency data? */
+      return GNUNET_TIME_UNIT_SECONDS;
+    }
+  return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                       ntohl (atsi->value));
 }
 
 
@@ -1816,6 +1876,14 @@ peer_disconnect_handler (void *cls,
                GNUNET_CONTAINER_multihashmap_remove (connected_peers,
                                                      &peer->hashPubKey,
                                                      cp));
+  if (cp->irc != NULL)
+    {
+      GNUNET_CORE_peer_change_preference_cancel (cp->irc);
+      cp->irc = NULL;
+      cp->pr->pirc = NULL;
+      cp->pr = NULL;
+    }
+
   /* remove this peer from migration considerations; schedule
      alternatives */
   next = mig_head;
@@ -2048,6 +2116,8 @@ shutdown_task (void *cls,
   cfg = NULL;  
   GNUNET_free_non_null (trustDirectory);
   trustDirectory = NULL;
+  GNUNET_SCHEDULER_cancel (cover_age_task);
+  cover_age_task = GNUNET_SCHEDULER_NO_TASK;
 }
 
 
@@ -2124,8 +2194,28 @@ transmit_to_peer (void *cls,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Dropping message, core too busy.\n");
 #endif
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 "Dropping message, core too busy.\n");
       GNUNET_LOAD_update (cp->transmission_delay,
                          UINT64_MAX);
+      
+      if (NULL != (pm = cp->pending_messages_head))
+       {
+         GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
+                                      cp->pending_messages_tail,
+                                      pm);
+         cp->pending_requests--;    
+         destroy_pending_message (pm, 0);
+       }
+      if (NULL != (pm = cp->pending_messages_head))
+       {
+         GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
+         min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
+         cp->delayed_transmission_request_task
+           = GNUNET_SCHEDULER_add_delayed (min_delay,
+                                           &delayed_transmission_request,
+                                           cp);
+       }
       return 0;
     }  
   GNUNET_LOAD_update (cp->transmission_delay,
@@ -2567,16 +2657,6 @@ target_reservation_cb (void *cls,
   uint32_t bm;
   unsigned int i;
 
-  pr->irc = NULL;
-  if (peer == NULL)
-    {
-      /* error in communication with core, try again later */
-      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
-                                                &forward_request_task,
-                                                pr);
-      return;
-    }
   /* (3) transmit, update ttl/priority */
   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                          &peer->hashPubKey);
@@ -2593,6 +2673,17 @@ target_reservation_cb (void *cls,
                                                 pr);
       return;
     }
+  cp->irc = NULL;
+  pr->pirc = NULL;
+  if (peer == NULL)
+    {
+      /* error in communication with core, try again later */
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
+                                                &forward_request_task,
+                                                pr);
+      return;
+    }
   no_route = GNUNET_NO;
   if (amount == 0)
     {
@@ -2710,6 +2801,11 @@ struct PeerSelectionContext
    */
   double target_score;
 
+  /**
+   * Does it make sense to we re-try quickly again?
+   */
+  int fast_retry;
+
 };
 
 
@@ -2738,7 +2834,7 @@ target_peer_select_cb (void *cls,
   unsigned int pc;
 
   /* 1) check that this peer is not the initiator */
-  if (cp == pr->cp)
+  if (cp == pr->cp)     
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2746,6 +2842,11 @@ target_peer_select_cb (void *cls,
 #endif
       return GNUNET_YES; /* skip */       
     }
+  if (cp->irc != NULL)
+    {
+      psc->fast_retry = GNUNET_YES;
+      return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
+    }
 
   /* 2) check if we have already (recently) forwarded to this peer */
   /* 2a) this particular request */
@@ -2755,6 +2856,8 @@ target_peer_select_cb (void *cls,
       {
        pc = pr->used_targets[i].num_requests;
        GNUNET_assert (pc > 0);
+       /* FIXME: make re-enabling a peer independent of how often
+          this function is called??? */
        if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                           RETRY_PROBABILITY_INV * pc))
          {
@@ -2824,19 +2927,19 @@ target_peer_select_cb (void *cls,
   if (pr->target_pid == cp->pid)
     score += 100.0;
   /* store best-fit in closure */
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Peer `%s' gets score %f for forwarding query, max is %f\n",
-             GNUNET_h2s (key),
-             score,
-             psc->target_score);
-#endif  
   score++; /* avoid zero */
   if (score > psc->target_score)
     {
       psc->target_score = score;
       psc->target.hashPubKey = *key; 
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Peer `%s' gets score %f for forwarding query, max is %8f\n",
+             GNUNET_h2s (key),
+             score,
+             psc->target_score);
+#endif
   return GNUNET_YES;
 }
   
@@ -2916,7 +3019,7 @@ forward_request_task (void *cls,
   struct GNUNET_TIME_Relative delay;
 
   pr->task = GNUNET_SCHEDULER_NO_TASK;
-  if (pr->irc != NULL)
+  if (pr->pirc != NULL)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2946,15 +3049,39 @@ forward_request_task (void *cls,
                                          &process_dht_reply,
                                          pr);
     }
+
+  if ( (pr->anonymity_level > 1) &&
+       (cover_query_count < pr->anonymity_level - 1) )
+    {
+      delay = get_processing_delay ();
+#if DEBUG_FS 
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Not enough cover traffic to forward query `%s', will try again in %llu ms!\n",
+                 GNUNET_h2s (&pr->query),
+                 delay.rel_value);
+#endif
+      pr->task = GNUNET_SCHEDULER_add_delayed (delay,
+                                              &forward_request_task,
+                                              pr);
+      return;
+    }
+  /* consume cover traffic */
+  if (pr->anonymity_level > 1) 
+    cover_query_count -= pr->anonymity_level - 1;
+
   /* (1) select target */
   psc.pr = pr;
   psc.target_score = -DBL_MAX;
+  psc.fast_retry = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
                                         &target_peer_select_cb,
                                         &psc);  
   if (psc.target_score == -DBL_MAX)
     {
-      delay = get_processing_delay ();
+      if (psc.fast_retry == GNUNET_YES)
+       delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
+      else
+       delay = get_processing_delay ();
 #if DEBUG_FS 
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
@@ -2996,7 +3123,10 @@ forward_request_task (void *cls,
       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                              &psc.target.hashPubKey);
       GNUNET_assert (NULL != cp);
-      pr->irc = GNUNET_CORE_peer_change_preference (core,
+      GNUNET_assert (cp->irc == NULL);
+      pr->pirc = cp;
+      cp->pr = pr;
+      cp->irc = GNUNET_CORE_peer_change_preference (core,
                                                    &psc.target,
                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
                                                    GNUNET_BANDWIDTH_value_init (UINT32_MAX),
@@ -3004,6 +3134,7 @@ forward_request_task (void *cls,
                                                    cp->inc_preference,
                                                    &target_reservation_cb,
                                                    pr);
+      GNUNET_assert (cp->irc != NULL);
       cp->inc_preference = 0;
     }
   else
@@ -3143,6 +3274,11 @@ struct ProcessReplyClosure
    */
   uint32_t priority;
 
+  /**
+   * Anonymity requirements for this reply.
+   */
+  uint32_t anonymity_level;
+
   /**
    * Evaluation result (returned).
    */
@@ -3187,6 +3323,22 @@ struct GNUNET_TIME_Relative art_delay;
   size_t msize;
   unsigned int i;
 
+  if (NULL == pr->client_request_list)
+    {
+      /* reply will go over the network, check for cover traffic */
+      if ( (prq->anonymity_level >  1) &&
+          (cover_content_count < prq->anonymity_level - 1) )
+       {
+         /* insufficient cover traffic, skip */
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# replies suppressed due to lack of cover traffic"),
+                                   1,
+                                   GNUNET_NO);
+         return GNUNET_YES;
+       }       
+      if (prq->anonymity_level >  1) 
+       cover_content_count -= prq->anonymity_level - 1;
+    }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Matched result (type %u) for query `%s' with pending request\n",
@@ -3526,6 +3678,7 @@ handle_p2p_put (void *cls,
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
+  cover_content_count++;
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received result for query `%s' from peer `%4s'\n",
@@ -3547,6 +3700,7 @@ handle_p2p_put (void *cls,
   prq.type = type;
   prq.expiration = expiration;
   prq.priority = 0;
+  prq.anonymity_level = 1;
   prq.finished = GNUNET_NO;
   prq.request_found = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
@@ -3558,7 +3712,7 @@ handle_p2p_put (void *cls,
       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
       change_host_trust (prq.sender, prq.priority);
     }
-  if ( (GNUNET_YES == active_migration) &&
+  if ( (GNUNET_YES == active_to_migration) &&
        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
     {      
 #if DEBUG_FS
@@ -3580,7 +3734,7 @@ handle_p2p_put (void *cls,
     }
   putl = GNUNET_LOAD_get_load (datastore_put_load);
   if ( (GNUNET_NO == prq.request_found) &&
-       ( (GNUNET_YES != active_migration) ||
+       ( (GNUNET_YES != active_to_migration) ||
                 (putl > 2.5 * (1 + prq.priority)) ) )
     {
       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
@@ -3588,7 +3742,7 @@ handle_p2p_put (void *cls,
       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
        return GNUNET_OK; /* already blocked */
       /* We're too busy; send MigrationStop message! */
-      if (GNUNET_YES != active_migration) 
+      if (GNUNET_YES != active_to_migration) 
        putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
                                                  5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
@@ -3828,6 +3982,7 @@ process_local_reply (void *cls,
   prq.priority = priority;  
   prq.finished = GNUNET_NO;
   prq.request_found = GNUNET_NO;
+  prq.anonymity_level = anonymity;
   if ( (old_rf == 0) &&
        (pr->results_found == 0) )
     update_datastore_delays (pr->start_time);
@@ -4023,6 +4178,7 @@ handle_p2p_get (void *cls,
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
+  cover_query_count++;
   bm = ntohl (gm->hash_bitmap);
   bits = 0;
   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
@@ -4527,8 +4683,7 @@ main_init (struct GNUNET_SERVER_Handle *server,
        }
       return GNUNET_SYSERR;
     }
-  /* FIXME: distinguish between sending and storing in options? */
-  if (active_migration) 
+  if (active_from_migration) 
     {
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                  _("Content migration is enabled, will start to gather data\n"));
@@ -4549,6 +4704,9 @@ main_init (struct GNUNET_SERVER_Handle *server,
 
 
   GNUNET_SERVER_add_handlers (server, handlers);
+  cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
+                                                &age_cover_counters,
+                                                NULL);
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
                                &shutdown_task,
                                NULL);
@@ -4568,9 +4726,12 @@ run (void *cls,
      struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
-                                                          "FS",
-                                                          "ACTIVEMIGRATION");
+  active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                                             "FS",
+                                                             "CONTENT_CACHING");
+  active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                                               "FS",
+                                                               "CONTENT_PUSHING");
   dsh = GNUNET_DATASTORE_connect (cfg);
   if (dsh == NULL)
     {