fix migration support
authorChristian Grothoff <christian@grothoff.org>
Thu, 31 Mar 2011 15:35:00 +0000 (15:35 +0000)
committerChristian Grothoff <christian@grothoff.org>
Thu, 31 Mar 2011 15:35:00 +0000 (15:35 +0000)
src/fs/gnunet-service-fs_cp.c
src/fs/gnunet-service-fs_cp.h
src/fs/gnunet-service-fs_pr.c
src/fs/gnunet-service-fs_push.c

index 3298a5216fd60c2ca57e36e79106ba2e353266ad..702442bb248dac4f82225da2a7031d81fe653a65 100644 (file)
@@ -509,6 +509,7 @@ revive_migration (void *cls,
  * Get a handle for a connected peer.
  *
  * @param peer peer's identity
+ * @return NULL if the peer is not currently connected
  */
 struct GSF_ConnectedPeer *
 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
@@ -548,6 +549,10 @@ GSF_handle_p2p_migration_stop_ (void *cls,
       return GNUNET_OK;
     }
   bt = GNUNET_TIME_relative_ntoh (msm->duration);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+             _("Migration of content to peer `%s' blocked for %llu ms\n"),
+             GNUNET_i2s (other),
+             (unsigned long long) bt.rel_value);
   cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
   if (cp->mig_revive_task == GNUNET_SCHEDULER_NO_TASK)
     {
index 7b8cf40abaaa616adf76a7c8d2d2841a680c7d41..2925dd8ae3d91d47fc3df442d5f8e83dd4e79ffb 100644 (file)
@@ -110,6 +110,10 @@ struct GSF_PeerPerformanceData
    */
   unsigned int pending_replies;
 
+  /**
+   * How many of the last blocks from migration were duplicates?
+   */
+  unsigned int migration_duplication;
 };
 
 
@@ -175,6 +179,7 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
  * Get a handle for a connected peer.
  *
  * @param peer peer's identity
+ * @return NULL if this peer is not currently connected
  */
 struct GSF_ConnectedPeer *
 GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer);
index ed156daf57b8e5afceb3735f2e33fd9a0f2ed161..f6b31c4e2bb3c6f4b5551d21a0c99d93541d6d59 100644 (file)
@@ -759,6 +759,30 @@ process_reply (void *cls,
 }
 
 
+/**
+ * Context for the 'put_migration_continuation'.
+ */
+struct PutMigrationContext
+{
+
+  /**
+   * Start time for the operation.
+   */
+  struct GNUNET_TIME_Absolute start;
+
+  /**
+   * Request origin.
+   */
+  struct GNUNET_PeerIdentity origin;
+
+  /**
+   * GNUNET_YES if we had a matching request for this block,
+   * GNUNET_NO if not.
+   */
+  int requested;
+};
+
+
 /**
  * Continuation called to notify client about result of the
  * operation.
@@ -772,11 +796,37 @@ put_migration_continuation (void *cls,
                            int success,
                            const char *msg)
 {
-  struct GNUNET_TIME_Absolute *start = cls;
+  struct PutMigrationContext *pmc = cls;
   struct GNUNET_TIME_Relative delay;
-  
-  delay = GNUNET_TIME_absolute_get_duration (*start);
-  GNUNET_free (start);
+  struct GNUNET_TIME_Relative block_time;  
+  struct GSF_ConnectedPeer *cp;
+  struct GSF_PeerPerformanceData *ppd;
+                        
+  delay = GNUNET_TIME_absolute_get_duration (pmc->start);
+  cp = GSF_peer_get_ (&pmc->origin);
+  if ( (GNUNET_OK != success) &&
+       (GNUNET_NO == pmc->requested) )
+    {
+      /* block migration for a bit... */
+      if (NULL != cp)
+       {
+         ppd = GSF_get_peer_performance_data_ (cp);
+         ppd->migration_duplication++;
+         block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                                     5 * ppd->migration_duplication + 
+                                                     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5));
+         GSF_block_peer_migration_ (cp, block_time);
+       }
+    }
+  else
+    {
+      if (NULL != cp)
+       {
+         ppd = GSF_get_peer_performance_data_ (cp);
+         ppd->migration_duplication = 0; /* reset counter */
+       }
+    }
+  GNUNET_free (pmc);
   /* FIXME: should we really update the load value on failure? */
   GNUNET_LOAD_update (datastore_put_load,
                      delay.rel_value);
@@ -842,7 +892,7 @@ handle_dht_reply (void *cls,
 {
   struct GSF_PendingRequest *pr = cls;
   struct ProcessReplyClosure prq;
-  struct GNUNET_TIME_Absolute *start;
+  struct PutMigrationContext *pmc;
 
   memset (&prq, 0, sizeof (prq));
   prq.data = data;
@@ -859,8 +909,9 @@ handle_dht_reply (void *cls,
                  GNUNET_h2s (key),
                  prq.priority);
 #endif
-      start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
-      *start = GNUNET_TIME_absolute_get ();
+      pmc = GNUNET_malloc (sizeof (struct PutMigrationContext));
+      pmc->start = GNUNET_TIME_absolute_get ();
+      pmc->requested = GNUNET_YES;
       GNUNET_DATASTORE_put (GSF_dsh,
                            0, key, size, data,
                            type, prq.priority, 1 /* anonymity */, 
@@ -868,7 +919,7 @@ handle_dht_reply (void *cls,
                            1 + prq.priority, MAX_DATASTORE_QUEUE,
                            GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                            &put_migration_continuation, 
-                           start);
+                           pmc);
     }
 }
 
@@ -1124,7 +1175,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
   struct ProcessReplyClosure prq;
   struct GNUNET_TIME_Relative block_time;  
   double putl;
-  struct GNUNET_TIME_Absolute *start;
+  struct PutMigrationContext *pmc;
 
   msize = ntohs (message->size);
   if (msize < sizeof (struct PutMessage))
@@ -1178,8 +1229,11 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
                  GNUNET_h2s (&query),
                  prq.priority);
 #endif
-      start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
-      *start = GNUNET_TIME_absolute_get ();
+      pmc = GNUNET_malloc (sizeof (struct PutMigrationContext));
+      pmc->start = GNUNET_TIME_absolute_get ();
+      pmc->requested = prq.request_found;
+      GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
+                          &pmc->origin);
       GNUNET_DATASTORE_put (GSF_dsh,
                            0, &query, dsize, &put[1],
                            type, prq.priority, 1 /* anonymity */, 
@@ -1187,7 +1241,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
                            1 + prq.priority, MAX_DATASTORE_QUEUE,
                            GNUNET_CONSTANTS_SERVICE_TIMEOUT,
                            &put_migration_continuation, 
-                           start);
+                           pmc);
     }
   else
     {
@@ -1232,6 +1286,9 @@ GSF_pending_request_init_ ()
                  _("Configuration fails to specify `%s', assuming default value."),
                  "MAX_PENDING_REQUESTS");
     }
+  active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
+                                                             "FS",
+                                                             "CONTENT_CACHING");
   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
   pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
index b96ee3d1c9af08a9beee347d2d44a2b1fa989e8e..38d91683299ff0c91210f0f7af2695b20a0f3a32 100644 (file)
@@ -525,9 +525,11 @@ process_migration_content (void *cls,
     }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Retrieved block `%s' of type %u for migration\n",
+             "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
              GNUNET_h2s (key),
-             type);
+             type,
+             mig_size + 1,
+             MIGRATION_LIST_SIZE);
 #endif
   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
   mb->query = *key;
@@ -570,6 +572,8 @@ gather_migration_blocks (void *cls,
                         const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   mig_task = GNUNET_SCHEDULER_NO_TASK;
+  if (mig_size >= MAX_MIGRATION_QUEUE)  
+    return;
   if (GSF_dsh != NULL)
     {
       mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh,