arg
[oweals/gnunet.git] / src / fs / gnunet-service-fs_push.c
index c1f5205108bf1dc81b6bcef6a71b4641da67fbe5..339f98616e598336089d6fa437ef6e0ae74d0a0d 100644 (file)
@@ -31,6 +31,8 @@
 #include "gnunet-service-fs_push.h"
 
 
+#define DEBUG_FS_MIGRATION GNUNET_NO
+
 /**
  * How long must content remain valid for us to consider it for migration?  
  * If content will expire too soon, there is clearly no point in pushing
@@ -219,6 +221,10 @@ transmit_message (void *cls,
   peer->msg = NULL;
   if (buf == NULL)
     {
+#if DEBUG_FS_MIGRATION
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Failed to migrate content to another peer (disconnect)\n");
+#endif
       GNUNET_free (msg);
       return 0;
     }
@@ -226,6 +232,11 @@ transmit_message (void *cls,
   GNUNET_assert (msize <= buf_size);
   memcpy (buf, msg, msize);
   GNUNET_free (msg);
+#if DEBUG_FS_MIGRATION
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Pushing %u bytes to another peer\n",
+             msize);
+#endif
   find_content (peer);
   return msize;
 }
@@ -252,9 +263,10 @@ transmit_content (struct MigrationReadyPeer *peer,
   GNUNET_assert (NULL == peer->th);
   msize = sizeof (struct PutMessage) + block->size;
   msg = GNUNET_malloc (msize);
-  msg->header.type = htons (42);
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
   msg->header.size = htons (msize);
-  
+  msg->type = htonl (block->type);
+  msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
   memcpy (&msg[1],
          &block[1],
          block->size);
@@ -277,6 +289,11 @@ transmit_content (struct MigrationReadyPeer *peer,
     {
       ret = GNUNET_NO;
     }
+#if DEBUG_FS_MIGRATION
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Asking for transmission of %u bytes for migration\n",
+             msize);
+#endif
   peer->th = GSF_peer_transmit_ (peer->peer,
                                 GNUNET_NO,
                                 0 /* priority */,
@@ -372,13 +389,23 @@ find_content (struct MigrationReadyPeer *mrp)
        }
       pos = pos->next;
     }
-  if ( (NULL == best) &&
-       (mig_size >= MAX_MIGRATION_QUEUE) )
+  if (NULL == best) 
     {
+      if (mig_size < MAX_MIGRATION_QUEUE)
+       {
+#if DEBUG_FS_MIGRATION
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "No content found for pushing, waiting for queue to fill\n");
+#endif
+         return; /* will fill up eventually... */
+       }
+#if DEBUG_FS_MIGRATION
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "No suitable content found, purging content from full queue\n");
+#endif
       /* failed to find migration target AND
         queue is full, purge most-forwarded
         block from queue to make room for more */
-      score = 0;
       pos = mig_head;
       while (NULL != pos)
        {
@@ -391,10 +418,14 @@ find_content (struct MigrationReadyPeer *mrp)
          pos = pos->next;
        }
       GNUNET_assert (NULL != best);
-      delete_migration_block (best);      
+      delete_migration_block (best);
       consider_gathering ();
       return;
     }
+#if DEBUG_FS_MIGRATION
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Preparing to push best content to peer\n");
+#endif
   transmit_content (mrp, best);
 }
 
@@ -434,6 +465,11 @@ consider_gathering ()
                                       MAX_MIGRATION_QUEUE);
   delay = GNUNET_TIME_relative_max (delay,
                                    min_migration_delay);
+#if DEBUG_FS_MIGRATION
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Scheduling gathering task (queue size: %u)\n",
+             mig_size);
+#endif
   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
                                           &gather_migration_blocks,
                                           NULL);
@@ -456,7 +492,7 @@ consider_gathering ()
  */
 static void
 process_migration_content (void *cls,
-                          const GNUNET_HashCode * key,
+                          const GNUNET_HashCode *key,
                           size_t size,
                           const void *data,
                           enum GNUNET_BLOCK_Type type,
@@ -468,17 +504,21 @@ process_migration_content (void *cls,
   struct MigrationReadyBlock *mb;
   struct MigrationReadyPeer *pos;
   
+  mig_qe = NULL;
   if (key == NULL)
     {
-      mig_qe = NULL;
+#if DEBUG_FS_MIGRATION
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "No content found for migration...\n");
+#endif
       consider_gathering ();
       return;
     }
   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < 
       MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
     {
-      /* content will expire soon, don't bother */
-      GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
+      /* content will expire soon, don't bother */      
+      consider_gathering ();
       return;
     }
   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
@@ -488,17 +528,17 @@ process_migration_content (void *cls,
                                            type, priority, anonymity,
                                            expiration, uid, 
                                            &process_migration_content,
-                                           NULL))
-       {
-         GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
-       }
+                                           NULL))      
+       consider_gathering ();  
       return;
     }
-#if DEBUG_FS
+#if DEBUG_FS_MIGRATION
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Retrieved block `%s' of type %u for migration\n",
+             "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
              GNUNET_h2s (key),
-             type);
+             type,
+             mig_size + 1,
+             MAX_MIGRATION_QUEUE);
 #endif
   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
   mb->query = *key;
@@ -516,12 +556,16 @@ process_migration_content (void *cls,
     {
       if (NULL == pos->th)
        {
+#if DEBUG_FS_MIGRATION
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Preparing to push best content to peer\n");
+#endif
          if (GNUNET_YES == transmit_content (pos, mb))
            break; /* 'mb' was freed! */
        }
       pos = pos->next;
     }
-  GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
+  consider_gathering ();
 }
 
 
@@ -537,13 +581,21 @@ gather_migration_blocks (void *cls,
                         const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   mig_task = GNUNET_SCHEDULER_NO_TASK;
+  if (mig_size >= MAX_MIGRATION_QUEUE)  
+    return;
   if (GSF_dsh != NULL)
     {
-      mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh, 
-                                           0, UINT_MAX,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           &process_migration_content, NULL);
-      GNUNET_assert (mig_qe != NULL);
+#if DEBUG_FS_MIGRATION
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Asking datastore for content for replication (queue size: %u)\n",
+                 mig_size);
+#endif
+      mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh, 
+                                                    0, UINT_MAX,
+                                                    GNUNET_TIME_UNIT_FOREVER_REL,
+                                                    &process_migration_content, NULL);
+      if (NULL == mig_qe)
+       consider_gathering ();
     }
 }
 
@@ -601,22 +653,18 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
 
 /**
  * Setup the module.
- * 
- * @param cfg configuration to use
  */
 void
-GSF_push_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
+GSF_push_init_ ()
 {
-  int enabled;
-
-  enabled = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+  enabled = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
                                                  "FS",
                                                  "CONTENT_PUSHING");
   if (GNUNET_YES != enabled)
     return;
  
   if (GNUNET_OK != 
-      GNUNET_CONFIGURATION_get_value_time (cfg,
+      GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
                                           "fs",
                                           "MIN_MIGRATION_DELAY",
                                           &min_migration_delay))