plane hacking
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index 5831cf26a16916ab78584f401eda45d54ce69c2a..9fce6478c2e93b0a0bb0bee5807a6ecb1956d8f9 100644 (file)
@@ -24,8 +24,9 @@
  * @author Christian Grothoff
  *
  * TODO:
+ * - track per-peer request latency (using new load API)
+ * - consider more precise latency estimation (per-peer & request) -- again load API?
  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
- * - consider more precise latency estimation (per-peer & request)
  * - introduce random latency in processing
  * - tell other peers to stop migration if our PUTs fail (or if
  *   we don't support migration per configuration?)
@@ -35,7 +36,9 @@
 #include <float.h>
 #include "gnunet_constants.h"
 #include "gnunet_core_service.h"
+#include "gnunet_dht_service.h"
 #include "gnunet_datastore_service.h"
+#include "gnunet_load_lib.h"
 #include "gnunet_peer_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
  */
 #define MAX_QUEUE_PER_PEER 16
 
+/**
+ * Size for the hash map for DHT requests from the FS
+ * service.  Should be about the number of concurrent
+ * DHT requests we plan to make.
+ */
+#define FS_DHT_HT_SIZE 1024
+
 /**
  * How often do we flush trust values to disk?
  */
@@ -173,6 +183,12 @@ struct ConnectedPeer
    */ 
   struct GNUNET_TIME_Relative avg_delay;
 
+  /**
+   * Point in time until which this peer does not want us to migrate content
+   * to it.
+   */
+  struct GNUNET_TIME_Absolute migration_blocked;
+
   /**
    * Handle for an active request for transmission to this
    * peer, or NULL.
@@ -708,6 +724,11 @@ static GNUNET_SCHEDULER_TaskIdentifier mig_task;
  */
 static struct GNUNET_TIME_Relative min_migration_delay;
 
+/**
+ * Handle for DHT operations.
+ */
+static struct GNUNET_DHT_Handle *dht_handle;
+
 /**
  * Size of the doubly-linked list of migration blocks.
  */
@@ -730,6 +751,29 @@ static int active_migration;
  */
 static double current_priorities;
 
+/**
+ * Datastore load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_load;
+
+
+/**
+ * We've just now completed a datastore request.  Update our
+ * datastore load calculations.
+ *
+ * @param start time when the datastore request was issued
+ */
+static void
+update_datastore_delays (struct GNUNET_TIME_Absolute start)
+{
+  struct GNUNET_TIME_Relative delay;
+
+  delay = GNUNET_TIME_absolute_get_duration (start);
+  GNUNET_LOAD_update (datastore_load,
+                     delay.value);
+}
+
+
 /**
  * Get the filename under which we would store the GNUNET_HELLO_Message
  * for the given host and protocol.
@@ -768,7 +812,6 @@ transmit_to_peer (void *cls,
 
 /* ******************* clean up functions ************************ */
 
-
 /**
  * Delete the given migration block.
  *
@@ -831,6 +874,8 @@ consider_migration (void *cls,
   unsigned int repl;
   
   /* consider 'cp' as a migration target for mb */
+  if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
+    return GNUNET_YES; /* peer has requested no migration! */
   if (mb != NULL)
     {
       GNUNET_PEER_resolve (cp->pid,
@@ -986,7 +1031,7 @@ process_migration_content (void *cls,
        consider_migration_gathering ();
       return;
     }
-  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
     {
       if (GNUNET_OK !=
          GNUNET_FS_handle_on_demand_block (key, size, data,
@@ -994,7 +1039,9 @@ process_migration_content (void *cls,
                                            expiration, uid, 
                                            &process_migration_content,
                                            NULL))
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+       {
+         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+       }
       return;
     }
 #if DEBUG_FS
@@ -1587,6 +1634,10 @@ shutdown_task (void *cls,
   while (mig_head != NULL)
     delete_migration_block (mig_head);
   GNUNET_assert (0 == mig_size);
+  GNUNET_DHT_disconnect (dht_handle);
+  dht_handle = NULL;
+  GNUNET_LOAD_value_free (datastore_load);
+  datastore_load = NULL;
   GNUNET_BLOCK_context_destroy (block_ctx);
   block_ctx = NULL;
   GNUNET_CONFIGURATION_destroy (block_cfg);
@@ -2297,6 +2348,21 @@ forward_request_task (void *cls,
     }
   if (GNUNET_YES == pr->local_only)
     return; /* configured to not do P2P search */
+  /* (0) try DHT */
+  if (0 == pr->anonymity_level)
+    {
+#if 0      
+      /* DHT API needs fixing... */
+      pr->dht_get = GNUNET_DHT_get_start (dht_handle,
+                                         GNUNET_TIME_UNIT_FOREVER_REL,
+                                         pr->type,
+                                         &pr->query,
+                                         &process_dht_reply,
+                                         pr,
+                                         FIXME,
+                                         FIXME);
+#endif                                   
+    }
   /* (1) select target */
   psc.pr = pr;
   psc.target_score = -DBL_MAX;
@@ -2376,14 +2442,14 @@ transmit_reply_continuation (void *cls,
   
   switch (pr->type)
     {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_IBLOCK:
       /* only one reply expected, done with the request! */
       destroy_pending_request (pr);
       break;
     case GNUNET_BLOCK_TYPE_ANY:
-    case GNUNET_BLOCK_TYPE_KBLOCK:
-    case GNUNET_BLOCK_TYPE_SBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_KBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_SBLOCK:
       break;
     default:
       GNUNET_break (0);
@@ -2475,12 +2541,6 @@ struct ProcessReplyClosure
    */
   size_t size;
 
-  /**
-   * Namespace that this reply belongs to
-   * (if it is of type SBLOCK).
-   */
-  GNUNET_HashCode namespace;
-
   /**
    * Type of the block.
    */
@@ -2491,6 +2551,11 @@ struct ProcessReplyClosure
    */
   uint32_t priority;
 
+  /**
+   * Evaluation result (returned).
+   */
+  enum GNUNET_BLOCK_EvaluationResult eval;
+
   /**
    * Did we finish processing the associated request?
    */ 
@@ -2519,7 +2584,6 @@ process_reply (void *cls,
   struct PutMessage *pm;
   struct ConnectedPeer *cp;
   struct GNUNET_TIME_Relative cur_delay;
-  enum GNUNET_BLOCK_EvaluationResult eval;
   size_t msize;
 
 #if DEBUG_FS
@@ -2565,15 +2629,15 @@ process_reply (void *cls,
          GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
        }
     }
-  eval = GNUNET_BLOCK_evaluate (block_ctx,
-                               prq->type,
-                               key,
-                               &pr->bf,
-                               pr->mingle,
-                               pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
-                               prq->data,
-                               prq->size);
-  switch (eval)
+  prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
+                                    prq->type,
+                                    key,
+                                    &pr->bf,
+                                    pr->mingle,
+                                    pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
+                                    prq->data,
+                                    prq->size);
+  switch (prq->eval)
     {
     case GNUNET_BLOCK_EVALUATION_OK_MORE:
       break;
@@ -2636,8 +2700,21 @@ process_reply (void *cls,
                          &pr->replies_seen[pr->replies_seen_off++]);         
       refresh_bloomfilter (pr);
     }
+  if (NULL == prq->sender)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Found result for query `%s' in local datastore\n",
+                 GNUNET_h2s (key));
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# results found locally"),
+                               1,
+                               GNUNET_NO);      
+    }
   prq->priority += pr->remaining_priority;
   pr->remaining_priority = 0;
+  pr->results_found++;
   if (NULL != pr->client_request_list)
     {
       GNUNET_STATISTICS_update (stats,
@@ -2753,7 +2830,6 @@ handle_p2p_put (void *cls,
   struct GNUNET_TIME_Absolute expiration;
   GNUNET_HashCode query;
   struct ProcessReplyClosure prq;
-  const struct SBlock *sb;
 
   msize = ntohs (message->size);
   if (msize < sizeof (struct PutMessage))
@@ -2766,7 +2842,7 @@ handle_p2p_put (void *cls,
   type = ntohl (put->type);
   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
 
-  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
     return GNUNET_SYSERR;
   if (GNUNET_OK !=
       GNUNET_BLOCK_get_key (block_ctx,
@@ -2778,14 +2854,6 @@ handle_p2p_put (void *cls,
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
     }
-  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
-    { 
-      sb = (const struct SBlock*) &put[1];
-      GNUNET_CRYPTO_hash (&sb->subspace,
-                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                         &prq.namespace);
-    }
-
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received result for query `%s' from peer `%4s'\n",
@@ -2838,6 +2906,30 @@ handle_p2p_put (void *cls,
 }
 
 
+/**
+ * Handle P2P "MIGRATION_STOP" message.
+ *
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ *        for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param latency reported latency of the connection with 'other'
+ * @param distance reported distance (DV) to 'other' 
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_p2p_migration_stop (void *cls,
+                          const struct GNUNET_PeerIdentity *other,
+                          const struct GNUNET_MessageHeader *message,
+                          struct GNUNET_TIME_Relative latency,
+                          uint32_t distance)
+{
+  // FIXME!
+}
+
+
+
 /* **************************** P2P GET Handling ************************ */
 
 
@@ -2923,10 +3015,8 @@ process_local_reply (void *cls,
   struct PendingRequest *pr = cls;
   struct ProcessReplyClosure prq;
   struct CheckDuplicateRequestClosure cdrc;
-  const struct SBlock *sb;
-  GNUNET_HashCode dhash;
-  GNUNET_HashCode mhash;
   GNUNET_HashCode query;
+  unsigned int old_rf;
   
   if (NULL == key)
     {
@@ -2973,7 +3063,7 @@ process_local_reply (void *cls,
              GNUNET_h2s (key),
              type);
 #endif
-  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2989,52 +3079,16 @@ process_local_reply (void *cls,
                                            &process_local_reply,
                                            pr))
       if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-      return;
-    }
-  /* check for duplicates */
-  GNUNET_CRYPTO_hash (data, size, &dhash);
-  GNUNET_BLOCK_mingle_hash (&dhash, 
-                           pr->mingle,
-                           &mhash);
-  if ( (pr->bf != NULL) &&
-       (GNUNET_YES ==
-       GNUNET_CONTAINER_bloomfilter_test (pr->bf,
-                                          &mhash)) )
-    {      
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Result from datastore filtered by bloomfilter (duplicate).\n");
-#endif
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# results filtered by query bloomfilter"),
-                               1,
-                               GNUNET_NO);
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+       {
+         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+       }
       return;
     }
-#if DEBUG_FS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Found result for query `%s' in local datastore\n",
-             GNUNET_h2s (key));
-#endif
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# results found locally"),
-                           1,
-                           GNUNET_NO);
-  pr->results_found++;
+  old_rf = pr->results_found;
   memset (&prq, 0, sizeof (prq));
   prq.data = data;
   prq.expiration = expiration;
   prq.size = size;  
-  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
-    { 
-      sb = (const struct SBlock*) data;
-      GNUNET_CRYPTO_hash (&sb->subspace,
-                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                         &prq.namespace);
-    }
   if (GNUNET_OK != 
       GNUNET_BLOCK_get_key (block_ctx,
                            type,
@@ -3056,12 +3110,14 @@ process_local_reply (void *cls,
   prq.priority = priority;  
   prq.finished = GNUNET_NO;
   process_reply (&prq, key, pr);
+  if ( (old_rf == 0) &&
+       (pr->results_found == 1) )
+    update_datastore_delays (pr->start_time);
   if (prq.finished == GNUNET_YES)
     return;
   if (pr->qe == NULL)
     return; /* done here */
-  if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
-       (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
+  if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
     {
       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
       return;
@@ -3209,12 +3265,6 @@ handle_p2p_get (void *cls,
   opt = (const GNUNET_HashCode*) &gm[1];
   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
   bm = ntohl (gm->hash_bitmap);
-  if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
-       (type != GNUNET_BLOCK_TYPE_SBLOCK) )
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;      
-    }
   bits = 0;
   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                           &other->hashPubKey);
@@ -3295,7 +3345,6 @@ handle_p2p_get (void *cls,
   pr->mingle = ntohl (gm->filter_mutator);
   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
-
   pr->anonymity_level = 1;
   pr->priority = bound_priority (ntohl (gm->priority), cps);
   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
@@ -3400,7 +3449,7 @@ handle_p2p_get (void *cls,
   /* calculate change in traffic preference */
   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
   /* process locally */
-  if (type == GNUNET_BLOCK_TYPE_DBLOCK)
+  if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
                                           (pr->priority + 1)); 
@@ -3416,8 +3465,8 @@ handle_p2p_get (void *cls,
   /* Are multiple results possible?  If so, start processing remotely now! */
   switch (pr->type)
     {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_IBLOCK:
       /* only one result, wait for datastore */
       break;
     default:
@@ -3497,8 +3546,8 @@ handle_start_search (void *cls,
       client_list = cl;
     }
   /* detect duplicate KBLOCK requests */
-  if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
-       (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
+  if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
+       (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
        (type == GNUNET_BLOCK_TYPE_ANY) )
     {
       crl = cl->rl_head;
@@ -3540,7 +3589,7 @@ handle_start_search (void *cls,
                            1,
                            GNUNET_NO);
   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
-                     ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
+                     ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
   memset (crl, 0, sizeof (struct ClientRequestList));
   crl->client_list = cl;
@@ -3558,6 +3607,7 @@ handle_start_search (void *cls,
          sc * sizeof (GNUNET_HashCode));
   pr->replies_seen_off = sc;
   pr->anonymity_level = ntohl (sm->anonymity_level); 
+  pr->start_time = GNUNET_TIME_absolute_get ();
   refresh_bloomfilter (pr);
   pr->query = sm->query;
   if (0 == (1 & ntohl (sm->options)))
@@ -3566,14 +3616,14 @@ handle_start_search (void *cls,
     pr->local_only = GNUNET_YES;
   switch (type)
     {
-    case GNUNET_BLOCK_TYPE_DBLOCK:
-    case GNUNET_BLOCK_TYPE_IBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_IBLOCK:
       if (0 != memcmp (&sm->target,
                       &all_zeros,
                       sizeof (GNUNET_HashCode)))
        pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
       break;
-    case GNUNET_BLOCK_TYPE_SBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_SBLOCK:
       pr->namespace = (GNUNET_HashCode*) &pr[1];
       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
       break;
@@ -3585,7 +3635,7 @@ handle_start_search (void *cls,
                                                   &sm->query,
                                                   pr,
                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  if (type == GNUNET_BLOCK_TYPE_DBLOCK)
+  if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
   pr->qe = GNUNET_DATASTORE_get (dsh,
                                 &sm->query,
@@ -3617,6 +3667,9 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
        GNUNET_MESSAGE_TYPE_FS_GET, 0 },
       { &handle_p2p_put, 
        GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+      { &handle_p2p_migration_stop, 
+       GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
+       sizeof (struct MigrationStopMessage) },
       { NULL, 0, 0 }
     };
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
@@ -3744,6 +3797,7 @@ run (void *cls,
       GNUNET_SCHEDULER_shutdown (sched);
       return;
     }
+  datastore_load = GNUNET_LOAD_value_init ();
   block_cfg = GNUNET_CONFIGURATION_create ();
   GNUNET_CONFIGURATION_set_value_string (block_cfg,
                                         "block",
@@ -3751,16 +3805,23 @@ run (void *cls,
                                         "fs");
   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
   GNUNET_assert (NULL != block_ctx);
+  dht_handle = GNUNET_DHT_connect (sched,
+                                  cfg,
+                                  FS_DHT_HT_SIZE);
   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
        (GNUNET_OK != main_init (sched, server, cfg)) )
     {    
       GNUNET_SCHEDULER_shutdown (sched);
       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
       dsh = NULL;
+      GNUNET_DHT_disconnect (dht_handle);
+      dht_handle = NULL;
       GNUNET_BLOCK_context_destroy (block_ctx);
       block_ctx = NULL;
       GNUNET_CONFIGURATION_destroy (block_cfg);
       block_cfg = NULL;
+      GNUNET_LOAD_value_free (datastore_load);
+      datastore_load = NULL;
       return;   
     }
 }