shutdown callback
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index c2ab1d9f817bc40025fc83c811a4370deab1d401..788817f5b0adcf326d8de832d08cc817d44eec65 100644 (file)
  * @brief gnunet anonymity protocol implementation
  * @author Christian Grothoff
  *
- * FIXME:
- * - TTL/priority calculations are absent!
  * TODO:
- * - have non-zero preference / priority for requests we initiate!
- * - implement hot-path routing decision procedure
- * - implement: bound_priority, test_load_too_high
- * - statistics
+ * - trust not properly received and pushed back to peerinfo!
+ * - bound_priority by priorities used by other peers
+ * - have a way to drop queries based on load
+ * - introduce random latency in processing
+ * - consider more precise latency estimation (per-peer & request)
+ * - better algorithm for priority selection for requests we initiate?
+ * - tell other peers to stop migration if our PUTs fail (or if
+ *   we don't support migration per configuration?)
+ * - more statistics
  */
 #include "platform.h"
 #include <float.h>
@@ -48,7 +51,6 @@
 
 /**
  * Maximum number of outgoing messages we queue per peer.
- * FIXME: make configurable?
  */
 #define MAX_QUEUE_PER_PEER 16
 
@@ -204,9 +206,14 @@ struct ConnectedPeer
 
   /**
    * Increase in traffic preference still to be submitted
-   * to the core service for this peer. FIXME: double or 'uint64_t'?
+   * to the core service for this peer.
    */
-  double inc_preference;
+  uint64_t inc_preference;
+
+  /**
+   * Trust delta to still commit to the system.
+   */
+  uint32_t trust_delta;
 
   /**
    * The peer's identity.
@@ -480,7 +487,6 @@ struct PendingRequest
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
   /**
-
    * Size of the 'bf' (in bytes).
    */
   size_t bf_size;
@@ -760,7 +766,7 @@ is_closer (const GNUNET_HashCode *key,
  *            targets for (or NULL for none)
  * @param key ID of the peer 
  * @param value 'struct ConnectedPeer' of the peer
- * @return GNUNET_YES (always continue iteration)2
+ * @return GNUNET_YES (always continue iteration)
  */
 static int
 consider_migration (void *cls,
@@ -842,6 +848,12 @@ consider_migration (void *cls,
     }
   if (msize == 0)
     return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Trying to migrate at least %u bytes to peer `%s'\n",
+             msize,
+             GNUNET_h2s (key));
+#endif
   cp->cth 
     = GNUNET_CORE_notify_transmit_ready (core,
                                         0, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -880,7 +892,7 @@ consider_migration_gathering ()
     return;
   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
                                         mig_size);
-  delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS,
+  delay = GNUNET_TIME_relative_divide (delay,
                                       MAX_MIGRATION_QUEUE);
   delay = GNUNET_TIME_relative_max (delay,
                                    min_migration_delay);
@@ -936,6 +948,12 @@ process_migration_content (void *cls,
        GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Retrieved block `%s' of type %u for migration\n",
+             GNUNET_h2s (key),
+             type);
+#endif
   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
   mb->query = *key;
   mb->expiration = expiration;
@@ -1223,19 +1241,23 @@ peer_disconnect_handler (void *cls,
            {
              GNUNET_PEER_change_rc (pos->target_list[i], -1);
              pos->target_list[i] = 0;
-             if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
-               {
-                 delete_migration_block (pos);
-                 consider_migration_gathering ();
-               }
-             GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                                    &consider_migration,
-                                                    pos);
-             break;
-           }
+            }
+         }
+      if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
+       {
+         delete_migration_block (pos);
+         consider_migration_gathering ();
+          continue;
        }
+      GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                            &consider_migration,
+                                            pos);
+    }
+  if (cp->trust_delta > 0)
+    {
+      /* FIXME: push trust back to peerinfo! 
+        (need better peerinfo API!) */
     }
-
   GNUNET_PEER_change_rc (cp->pid, -1);
   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
   if (NULL != cp->cth)
@@ -1494,9 +1516,26 @@ transmit_to_peer (void *cls,
                  size -= sizeof (migm);
                  memcpy (&cbuf[msize], &mb[1], mb->size);
                  msize += mb->size;
-                 size -= mb->size;               
+                 size -= mb->size;
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Pushing migration block `%s' (%u bytes) to `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
                  break;
                }
+             else
+               {
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+               }
            }
          if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
               (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
@@ -1573,7 +1612,10 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Failed to schedule transmission with core!\n");
 #endif
-      /* FIXME: call stats (rare, bad case) */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# CORE transmission failures"),
+                               1,
+                               GNUNET_NO);
     }
 }
 
@@ -1824,11 +1866,6 @@ target_reservation_cb (void *cls,
       return;
     }
   no_route = GNUNET_NO;
-  /* FIXME: check against DBLOCK_SIZE and possibly return
-     amount to reserve; however, this also needs to work
-     with testcases which currently start out with a far
-     too low per-peer bw limit, so they would never send
-     anything.  Big issue. */
   if (amount == 0)
     {
       if (pr->cp == NULL)
@@ -1965,7 +2002,13 @@ target_peer_select_cb (void *cls,
 
   /* 1) check that this peer is not the initiator */
   if (cp == pr->cp)
-    return GNUNET_YES; /* skip */         
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Skipping initiator in forwarding selection\n");
+#endif
+      return GNUNET_YES; /* skip */       
+    }
 
   /* 2) check if we have already (recently) forwarded to this peer */
   pc = 0;
@@ -1990,13 +2033,47 @@ target_peer_select_cb (void *cls,
                "Re-trying query that was previously transmitted %u times to this peer\n",
                (unsigned int) pc);
 #endif
-  // 3) calculate how much we'd like to forward to this peer
-  score = 42; // FIXME!
-  // FIXME: also need API to gather data on responsiveness
-  // of this peer (we have fields for that in 'cp', but
-  // they are never set!)
-  
+  /* 3) calculate how much we'd like to forward to this peer,
+     starting with a random value that is strong enough
+     to at least give any peer a chance sometimes 
+     (compared to the other factors that come later) */
+  /* 3a) count successful (recent) routes from cp for same source */
+  if (pr->cp != NULL)
+    {
+      score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                       P2P_SUCCESS_LIST_SIZE);
+      for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
+       if (cp->last_p2p_replies[i] == pr->cp->pid)
+         score += 1; /* likely successful based on hot path */
+    }
+  else
+    {
+      score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                       CS2P_SUCCESS_LIST_SIZE);
+      for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
+       if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
+         score += 1; /* likely successful based on hot path */
+    }
+  /* 3b) include latency */
+  if (cp->avg_delay.value < 4 * TTL_DECREMENT)
+    score += 1; /* likely fast based on latency */
+  /* 3c) include priorities */
+  if (cp->avg_priority <= pr->remaining_priority / 2.0)
+    score += 1; /* likely successful based on priorities */
+  /* 3d) penalize for queue size */  
+  score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
+  /* 3e) include peer proximity */
+  score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
+                                                   &pr->query)) / (double) UINT32_MAX);
   /* store best-fit in closure */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Peer `%s' gets score %f for forwarding query, max is %f\n",
+             GNUNET_h2s (key),
+             score,
+             psc->target_score);
+#endif  
+  score++; /* avoid zero */
   if (score > psc->target_score)
     {
       psc->target_score = score;
@@ -2034,7 +2111,7 @@ bound_ttl (int32_t ttl_in, uint32_t prio)
 
 
 /**
- * We're processing a GET request from another peer and have decided
+ * We're processing a GET request and have decided
  * to forward it to other peers.  This function is called periodically
  * and should forward the request to other peers until we have all
  * possible replies.  If we have transmitted the *only* reply to
@@ -2068,11 +2145,11 @@ forward_request_task (void *cls,
     return; /* configured to not do P2P search */
   /* (1) select target */
   psc.pr = pr;
-  psc.target_score = DBL_MIN;
+  psc.target_score = -DBL_MAX;
   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
                                         &target_peer_select_cb,
                                         &psc);  
-  if (psc.target_score == DBL_MIN)
+  if (psc.target_score == -DBL_MAX)
     {
       delay = get_processing_delay ();
 #if DEBUG_FS 
@@ -2088,7 +2165,6 @@ forward_request_task (void *cls,
       return; /* nobody selected */
     }
   /* (3) update TTL/priority */
-  
   if (pr->client_request_list != NULL)
     {
       /* FIXME: use better algorithm!? */
@@ -2107,10 +2183,6 @@ forward_request_task (void *cls,
                  pr->ttl);
 #endif
     }
-  else
-    {
-      /* FIXME: should we do something here as well!? */
-    }
 
   /* (3) reserve reply bandwidth */
   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
@@ -2119,12 +2191,12 @@ forward_request_task (void *cls,
   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
                                                &psc.target,
                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                                               GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), 
+                                               GNUNET_BANDWIDTH_value_init (UINT32_MAX),
                                                DBLOCK_SIZE * 2, 
-                                               (uint64_t) cp->inc_preference,
+                                               cp->inc_preference,
                                                &target_reservation_cb,
                                                pr);
-  cp->inc_preference = 0.0;
+  cp->inc_preference = 0;
 }
 
 
@@ -2260,6 +2332,11 @@ struct ProcessReplyClosure
    * How much was this reply worth to us?
    */
   uint32_t priority;
+
+  /**
+   * Did we finish processing the associated request?
+   */ 
+  int finished;
 };
 
 
@@ -2312,7 +2389,8 @@ process_reply (void *cls,
       if (pr->cp != NULL)
        {
          GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
-                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
+                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
+                                -1);
          GNUNET_PEER_change_rc (pr->cp->pid, 1);
          prq->sender->last_p2p_replies
            [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
@@ -2420,7 +2498,7 @@ process_reply (void *cls,
     }
   prq->priority += pr->remaining_priority;
   pr->remaining_priority = 0;
-  if (pr->client_request_list != NULL)
+  if (NULL != pr->client_request_list)
     {
       GNUNET_STATISTICS_update (stats,
                                gettext_noop ("# replies received for local clients"),
@@ -2456,7 +2534,10 @@ process_reply (void *cls,
        }
       GNUNET_break (cl->th != NULL);
       if (pr->do_remove)               
-       destroy_pending_request (pr);           
+       {
+         prq->finished = GNUNET_YES;
+         destroy_pending_request (pr);         
+       }
     }
   else
     {
@@ -2476,7 +2557,7 @@ process_reply (void *cls,
       reply->cont = &transmit_reply_continuation;
       reply->cont_cls = pr;
       reply->msize = msize;
-      reply->priority = (uint32_t) -1; /* send replies first! */
+      reply->priority = UINT32_MAX; /* send replies first! */
       pm = (struct PutMessage*) &reply[1];
       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
       pm->header.size = htons (msize);
@@ -2579,16 +2660,30 @@ handle_p2p_put (void *cls,
   if (other != NULL)
     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
                                                    &other->hashPubKey);
+  else
+    prq.sender = NULL;
   prq.size = dsize;
   prq.type = type;
   prq.expiration = expiration;
   prq.priority = 0;
+  prq.finished = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
                                              &query,
                                              &process_reply,
                                              &prq);
+  if (prq.sender != NULL)
+    {
+      prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
+      prq.sender->trust_delta += prq.priority;
+    }
   if (GNUNET_YES == active_migration)
     {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Replicating result for query `%s' with priority %u\n",
+                 GNUNET_h2s (&query),
+                 prq.priority);
+#endif
       GNUNET_DATASTORE_put (dsh,
                            0, &query, dsize, &put[1],
                            type, prq.priority, 1 /* anonymity */, 
@@ -2814,13 +2909,21 @@ process_local_reply (void *cls,
       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
+  prq.type = type;
+  prq.priority = priority;  
+  prq.finished = GNUNET_NO;
+  process_reply (&prq, key, pr);
+  if (prq.finished == GNUNET_YES)
+    return;
+  if (pr->qe == NULL)
+    return; /* done here */
   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
     {
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      return;
     }
-  else if ( (pr->client_request_list == NULL) &&
+  if ( (pr->client_request_list == NULL) &&
        ( (GNUNET_YES == test_load_too_high()) ||
         (pr->results_found > 5 + 2 * pr->priority) ) )
     {
@@ -2832,14 +2935,10 @@ process_local_reply (void *cls,
                                gettext_noop ("# processing result set cut short due to load"),
                                1,
                                GNUNET_NO);
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      return;
     }
-  else if (pr->qe != NULL)
-    GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-  prq.type = type;
-  prq.priority = priority;  
-  process_reply (&prq, key, pr);
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
 }
 
 
@@ -2855,7 +2954,13 @@ static uint32_t
 bound_priority (uint32_t prio_in,
                struct ConnectedPeer *cp)
 {
-  return 0; // FIXME!
+  if (cp->trust_delta > prio_in)
+    {
+      cp->trust_delta -= prio_in;
+      return prio_in;
+    }
+  // FIXME: get out trust in the target peer from peerinfo!
+  return 0; 
 }
 
 
@@ -2921,7 +3026,6 @@ handle_p2p_get (void *cls,
   size_t bfsize;
   uint32_t ttl_decrement;
   enum GNUNET_BLOCK_Type type;
-  double preference;
   int have_ns;
 
   msize = ntohs(message->size);
@@ -3032,11 +3136,12 @@ handle_p2p_get (void *cls,
   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
                      (have_ns ? sizeof(GNUNET_HashCode) : 0));
   if (have_ns)
-    pr->namespace = (GNUNET_HashCode*) &pr[1];
+    {
+      pr->namespace = (GNUNET_HashCode*) &pr[1];
+      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+    }
   pr->type = type;
   pr->mingle = ntohl (gm->filter_mutator);
-  if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))    
-    memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
 
@@ -3062,7 +3167,7 @@ handle_p2p_get (void *cls,
                                gettext_noop ("# requests dropped due TTL underflow"),
                                1,
                                GNUNET_NO);
-      /* integer underflow => drop (should be very rare)! */
+      /* integer underflow => drop (should be very rare)! */      
       GNUNET_free (pr);
       return GNUNET_OK;
     } 
@@ -3142,11 +3247,7 @@ handle_p2p_get (void *cls,
                            GNUNET_NO);
 
   /* calculate change in traffic preference */
-  preference = (double) pr->priority;
-  if (preference < QUERY_BANDWIDTH_VALUE)
-    preference = QUERY_BANDWIDTH_VALUE;
-  cps->inc_preference += preference;
-
+  cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
   /* process locally */
   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
@@ -3362,38 +3463,6 @@ handle_start_search (void *cls,
 
 /* **************************** Startup ************************ */
 
-
-/**
- * List of handlers for P2P messages
- * that we care about.
- */
-static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
-  {
-    { &handle_p2p_get, 
-      GNUNET_MESSAGE_TYPE_FS_GET, 0 },
-    { &handle_p2p_put, 
-      GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
-    { NULL, 0, 0 }
-  };
-
-
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
-  {&GNUNET_FS_handle_index_start, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
-  {&GNUNET_FS_handle_index_list_get, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
-  {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
-   sizeof (struct UnindexMessage) },
-  {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
-   0 },
-  {NULL, NULL, 0, 0}
-};
-
-
 /**
  * Process fs requests.
  *
@@ -3406,6 +3475,26 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
           struct GNUNET_SERVER_Handle *server,
           const struct GNUNET_CONFIGURATION_Handle *c)
 {
+  static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
+    {
+      { &handle_p2p_get, 
+       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
+      { &handle_p2p_put, 
+       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+      { NULL, 0, 0 }
+    };
+  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+    {&GNUNET_FS_handle_index_start, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+    {&GNUNET_FS_handle_index_list_get, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+    {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
+     sizeof (struct UnindexMessage) },
+    {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
+     0 },
+    {NULL, NULL, 0, 0}
+  };
+
   sched = s;
   cfg = c;
   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
@@ -3446,7 +3535,11 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
     }
   /* FIXME: distinguish between sending and storing in options? */
   if (active_migration) 
-    consider_migration_gathering ();
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Content migration is enabled, will start to gather data\n"));
+      consider_migration_gathering ();
+    }
   GNUNET_SERVER_disconnect_notify (server, 
                                   &handle_client_disconnect,
                                   NULL);