shutdown callback
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index c5a47973c2d6b3efe51a4cdfeb01a0926c551688..788817f5b0adcf326d8de832d08cc817d44eec65 100644 (file)
  * @brief gnunet anonymity protocol implementation
  * @author Christian Grothoff
  *
- * FIXME:
- * - TTL/priority calculations are absent!
  * TODO:
- * - have non-zero preference / priority for requests we initiate!
- * - implement hot-path routing decision procedure
- * - implement: bound_priority, test_load_too_high
- * - statistics
+ * - trust not properly received and pushed back to peerinfo!
+ * - bound_priority by priorities used by other peers
+ * - have a way to drop queries based on load
+ * - introduce random latency in processing
+ * - consider more precise latency estimation (per-peer & request)
+ * - better algorithm for priority selection for requests we initiate?
+ * - tell other peers to stop migration if our PUTs fail (or if
+ *   we don't support migration per configuration?)
+ * - more statistics
  */
 #include "platform.h"
 #include <float.h>
 #include "gnunet-service-fs_indexing.h"
 #include "fs.h"
 
-#define DEBUG_FS GNUNET_YES
+#define DEBUG_FS GNUNET_NO
 
 /**
  * Maximum number of outgoing messages we queue per peer.
- * FIXME: make configurable?
  */
 #define MAX_QUEUE_PER_PEER 16
 
@@ -204,9 +206,14 @@ struct ConnectedPeer
 
   /**
    * Increase in traffic preference still to be submitted
-   * to the core service for this peer. FIXME: double or 'uint64_t'?
+   * to the core service for this peer.
    */
-  double inc_preference;
+  uint64_t inc_preference;
+
+  /**
+   * Trust delta to still commit to the system.
+   */
+  uint32_t trust_delta;
 
   /**
    * The peer's identity.
@@ -480,7 +487,6 @@ struct PendingRequest
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
   /**
-
    * Size of the 'bf' (in bytes).
    */
   size_t bf_size;
@@ -760,7 +766,7 @@ is_closer (const GNUNET_HashCode *key,
  *            targets for (or NULL for none)
  * @param key ID of the peer 
  * @param value 'struct ConnectedPeer' of the peer
- * @return GNUNET_YES (always continue iteration)2
+ * @return GNUNET_YES (always continue iteration)
  */
 static int
 consider_migration (void *cls,
@@ -842,11 +848,12 @@ consider_migration (void *cls,
     }
   if (msize == 0)
     return GNUNET_YES; /* no content available */
+#if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Trying to migrate `%s' (%u bytes) to `%s'\n",
-             GNUNET_h2s (&mb->query),
+             "Trying to migrate at least %u bytes to peer `%s'\n",
              msize,
-             GNUNET_i2s (&cppid));
+             GNUNET_h2s (key));
+#endif
   cp->cth 
     = GNUNET_CORE_notify_transmit_ready (core,
                                         0, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1246,7 +1253,11 @@ peer_disconnect_handler (void *cls,
                                             &consider_migration,
                                             pos);
     }
-
+  if (cp->trust_delta > 0)
+    {
+      /* FIXME: push trust back to peerinfo! 
+        (need better peerinfo API!) */
+    }
   GNUNET_PEER_change_rc (cp->pid, -1);
   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
   if (NULL != cp->cth)
@@ -1855,11 +1866,6 @@ target_reservation_cb (void *cls,
       return;
     }
   no_route = GNUNET_NO;
-  /* FIXME: check against DBLOCK_SIZE and possibly return
-     amount to reserve; however, this also needs to work
-     with testcases which currently start out with a far
-     too low per-peer bw limit, so they would never send
-     anything.  Big issue. */
   if (amount == 0)
     {
       if (pr->cp == NULL)
@@ -1996,7 +2002,13 @@ target_peer_select_cb (void *cls,
 
   /* 1) check that this peer is not the initiator */
   if (cp == pr->cp)
-    return GNUNET_YES; /* skip */         
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Skipping initiator in forwarding selection\n");
+#endif
+      return GNUNET_YES; /* skip */       
+    }
 
   /* 2) check if we have already (recently) forwarded to this peer */
   pc = 0;
@@ -2021,13 +2033,47 @@ target_peer_select_cb (void *cls,
                "Re-trying query that was previously transmitted %u times to this peer\n",
                (unsigned int) pc);
 #endif
-  // 3) calculate how much we'd like to forward to this peer
-  score = 42; // FIXME!
-  // FIXME: also need API to gather data on responsiveness
-  // of this peer (we have fields for that in 'cp', but
-  // they are never set!)
-  
+  /* 3) calculate how much we'd like to forward to this peer,
+     starting with a random value that is strong enough
+     to at least give any peer a chance sometimes 
+     (compared to the other factors that come later) */
+  /* 3a) count successful (recent) routes from cp for same source */
+  if (pr->cp != NULL)
+    {
+      score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                       P2P_SUCCESS_LIST_SIZE);
+      for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
+       if (cp->last_p2p_replies[i] == pr->cp->pid)
+         score += 1; /* likely successful based on hot path */
+    }
+  else
+    {
+      score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                       CS2P_SUCCESS_LIST_SIZE);
+      for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
+       if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
+         score += 1; /* likely successful based on hot path */
+    }
+  /* 3b) include latency */
+  if (cp->avg_delay.value < 4 * TTL_DECREMENT)
+    score += 1; /* likely fast based on latency */
+  /* 3c) include priorities */
+  if (cp->avg_priority <= pr->remaining_priority / 2.0)
+    score += 1; /* likely successful based on priorities */
+  /* 3d) penalize for queue size */  
+  score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
+  /* 3e) include peer proximity */
+  score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
+                                                   &pr->query)) / (double) UINT32_MAX);
   /* store best-fit in closure */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Peer `%s' gets score %f for forwarding query, max is %f\n",
+             GNUNET_h2s (key),
+             score,
+             psc->target_score);
+#endif  
+  score++; /* avoid zero */
   if (score > psc->target_score)
     {
       psc->target_score = score;
@@ -2065,7 +2111,7 @@ bound_ttl (int32_t ttl_in, uint32_t prio)
 
 
 /**
- * We're processing a GET request from another peer and have decided
+ * We're processing a GET request and have decided
  * to forward it to other peers.  This function is called periodically
  * and should forward the request to other peers until we have all
  * possible replies.  If we have transmitted the *only* reply to
@@ -2099,11 +2145,11 @@ forward_request_task (void *cls,
     return; /* configured to not do P2P search */
   /* (1) select target */
   psc.pr = pr;
-  psc.target_score = DBL_MIN;
+  psc.target_score = -DBL_MAX;
   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
                                         &target_peer_select_cb,
                                         &psc);  
-  if (psc.target_score == DBL_MIN)
+  if (psc.target_score == -DBL_MAX)
     {
       delay = get_processing_delay ();
 #if DEBUG_FS 
@@ -2119,7 +2165,6 @@ forward_request_task (void *cls,
       return; /* nobody selected */
     }
   /* (3) update TTL/priority */
-  
   if (pr->client_request_list != NULL)
     {
       /* FIXME: use better algorithm!? */
@@ -2138,10 +2183,6 @@ forward_request_task (void *cls,
                  pr->ttl);
 #endif
     }
-  else
-    {
-      /* FIXME: should we do something here as well!? */
-    }
 
   /* (3) reserve reply bandwidth */
   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
@@ -2150,12 +2191,12 @@ forward_request_task (void *cls,
   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
                                                &psc.target,
                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                                               GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), 
+                                               GNUNET_BANDWIDTH_value_init (UINT32_MAX),
                                                DBLOCK_SIZE * 2, 
-                                               (uint64_t) cp->inc_preference,
+                                               cp->inc_preference,
                                                &target_reservation_cb,
                                                pr);
-  cp->inc_preference = 0.0;
+  cp->inc_preference = 0;
 }
 
 
@@ -2348,7 +2389,8 @@ process_reply (void *cls,
       if (pr->cp != NULL)
        {
          GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
-                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
+                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
+                                -1);
          GNUNET_PEER_change_rc (pr->cp->pid, 1);
          prq->sender->last_p2p_replies
            [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
@@ -2515,7 +2557,7 @@ process_reply (void *cls,
       reply->cont = &transmit_reply_continuation;
       reply->cont_cls = pr;
       reply->msize = msize;
-      reply->priority = (uint32_t) -1; /* send replies first! */
+      reply->priority = UINT32_MAX; /* send replies first! */
       pm = (struct PutMessage*) &reply[1];
       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
       pm->header.size = htons (msize);
@@ -2618,6 +2660,8 @@ handle_p2p_put (void *cls,
   if (other != NULL)
     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                                    &other->hashPubKey);
+  else
+    prq.sender = NULL;
   prq.size = dsize;
   prq.type = type;
   prq.expiration = expiration;
@@ -2627,6 +2671,11 @@ handle_p2p_put (void *cls,
                                              &query,
                                              &process_reply,
                                              &prq);
+  if (prq.sender != NULL)
+    {
+      prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
+      prq.sender->trust_delta += prq.priority;
+    }
   if (GNUNET_YES == active_migration)
     {
 #if DEBUG_FS
@@ -2905,7 +2954,13 @@ static uint32_t
 bound_priority (uint32_t prio_in,
                struct ConnectedPeer *cp)
 {
-  return 0; // FIXME!
+  if (cp->trust_delta > prio_in)
+    {
+      cp->trust_delta -= prio_in;
+      return prio_in;
+    }
+  // FIXME: get out trust in the target peer from peerinfo!
+  return 0; 
 }
 
 
@@ -2971,7 +3026,6 @@ handle_p2p_get (void *cls,
   size_t bfsize;
   uint32_t ttl_decrement;
   enum GNUNET_BLOCK_Type type;
-  double preference;
   int have_ns;
 
   msize = ntohs(message->size);
@@ -3082,11 +3136,12 @@ handle_p2p_get (void *cls,
   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
                      (have_ns ? sizeof(GNUNET_HashCode) : 0));
   if (have_ns)
-    pr->namespace = (GNUNET_HashCode*) &pr[1];
+    {
+      pr->namespace = (GNUNET_HashCode*) &pr[1];
+      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+    }
   pr->type = type;
   pr->mingle = ntohl (gm->filter_mutator);
-  if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))    
-    memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
 
@@ -3192,11 +3247,7 @@ handle_p2p_get (void *cls,
                            GNUNET_NO);
 
   /* calculate change in traffic preference */
-  preference = (double) pr->priority;
-  if (preference < QUERY_BANDWIDTH_VALUE)
-    preference = QUERY_BANDWIDTH_VALUE;
-  cps->inc_preference += preference;
-
+  cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
   /* process locally */
   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
@@ -3412,38 +3463,6 @@ handle_start_search (void *cls,
 
 /* **************************** Startup ************************ */
 
-
-/**
- * List of handlers for P2P messages
- * that we care about.
- */
-static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
-  {
-    { &handle_p2p_get, 
-      GNUNET_MESSAGE_TYPE_FS_GET, 0 },
-    { &handle_p2p_put, 
-      GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
-    { NULL, 0, 0 }
-  };
-
-
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
-  {&GNUNET_FS_handle_index_start, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
-  {&GNUNET_FS_handle_index_list_get, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
-  {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
-   sizeof (struct UnindexMessage) },
-  {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
-   0 },
-  {NULL, NULL, 0, 0}
-};
-
-
 /**
  * Process fs requests.
  *
@@ -3456,6 +3475,26 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
           struct GNUNET_SERVER_Handle *server,
           const struct GNUNET_CONFIGURATION_Handle *c)
 {
+  static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
+    {
+      { &handle_p2p_get, 
+       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
+      { &handle_p2p_put, 
+       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+      { NULL, 0, 0 }
+    };
+  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+    {&GNUNET_FS_handle_index_start, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+    {&GNUNET_FS_handle_index_list_get, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+    {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
+     sizeof (struct UnindexMessage) },
+    {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
+     0 },
+    {NULL, NULL, 0, 0}
+  };
+
   sched = s;
   cfg = c;
   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);