fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index a08a041da8147a562ec7d47373ba3e2f8136c08c..ccdd76de235fb1add6c72bd6d857fd95dd9d53d7 100644 (file)
  * - TTL/priority calculations are absent!
  * TODO:
  * - have non-zero preference / priority for requests we initiate!
- * - track stats for hot-path routing
  * - implement hot-path routing decision procedure
- * - implement: bound_priority, test_load_too_high, validate_nblock
- * - add content migration support (store locally) [or create new service]
+ * - implement: bound_priority, test_load_too_high
  * - statistics
  */
 #include "platform.h"
@@ -482,6 +480,7 @@ struct PendingRequest
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
   /**
+
    * Size of the 'bf' (in bytes).
    */
   size_t bf_size;
@@ -585,11 +584,22 @@ struct MigrationReadyBlock
    */
   struct GNUNET_TIME_Absolute expiration;
 
+  /**
+   * Peers we would consider forwarding this
+   * block to.  Zero for empty entries.
+   */
+  GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
+
   /**
    * Size of the block.
    */
   size_t size;
 
+  /**
+   *  Number of targets already used.
+   */
+  unsigned int used_targets;
+
   /**
    * Type of the block.
    */
@@ -684,6 +694,25 @@ static unsigned int mig_size;
  */
 static int active_migration;
 
+
+/**
+ * Transmit messages by copying it to the target buffer
+ * "buf".  "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime.  In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
+ *
+ * @param cls closure, pointer to the 'struct ConnectedPeer*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_to_peer (void *cls,
+                 size_t size, void *buf);
+
+
 /* ******************* clean up functions ************************ */
 
 
@@ -698,35 +727,138 @@ delete_migration_block (struct MigrationReadyBlock *mb)
   GNUNET_CONTAINER_DLL_remove (mig_head,
                               mig_tail,
                               mb);
+  GNUNET_PEER_decrement_rcs (mb->target_list,
+                            MIGRATION_LIST_SIZE);
   mig_size--;
   GNUNET_free (mb);
 }
 
 
+/**
+ * Compare the distance of two peers to a key.
+ *
+ * @param key key
+ * @param p1 first peer
+ * @param p2 second peer
+ * @return GNUNET_YES if P1 is closer to key than P2
+ */
+static int
+is_closer (const GNUNET_HashCode *key,
+          const struct GNUNET_PeerIdentity *p1,
+          const struct GNUNET_PeerIdentity *p2)
+{
+  return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
+                                   &p2->hashPubKey,
+                                   key);
+}
+
+
 /**
  * Consider migrating content to a given peer.
  *
- * @param cls not used
- * @param key ID of the peer (not used)
+ * @param cls 'struct MigrationReadyBlock*' to select
+ *            targets for (or NULL for none)
+ * @param key ID of the peer 
  * @param value 'struct ConnectedPeer' of the peer
- * @return GNUNET_YES (always continue iteration)2
+ * @return GNUNET_YES (always continue iteration)
  */
 static int
 consider_migration (void *cls,
                    const GNUNET_HashCode *key,
                    void *value)
 {
+  struct MigrationReadyBlock *mb = cls;
   struct ConnectedPeer *cp = value;
+  struct MigrationReadyBlock *pos;
+  struct GNUNET_PeerIdentity cppid;
+  struct GNUNET_PeerIdentity otherpid;
+  struct GNUNET_PeerIdentity worstpid;
+  size_t msize;
+  unsigned int i;
+  unsigned int repl;
   
+  /* consider 'cp' as a migration target for mb */
+  if (mb != NULL)
+    {
+      GNUNET_PEER_resolve (cp->pid,
+                          &cppid);
+      repl = MIGRATION_LIST_SIZE;
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (mb->target_list[i] == 0)
+           {
+             mb->target_list[i] = cp->pid;
+             GNUNET_PEER_change_rc (mb->target_list[i], 1);
+             repl = MIGRATION_LIST_SIZE;
+             break;
+           }
+         GNUNET_PEER_resolve (mb->target_list[i],
+                              &otherpid);
+         if ( (repl == MIGRATION_LIST_SIZE) &&
+              is_closer (&mb->query,
+                         &cppid,
+                         &otherpid)) 
+           {
+             repl = i;
+             worstpid = otherpid;
+           }
+         else if ( (repl != MIGRATION_LIST_SIZE) &&
+                   (is_closer (&mb->query,
+                               &worstpid,
+                               &otherpid) ) )
+           {
+             repl = i;
+             worstpid = otherpid;
+           }       
+       }
+      if (repl != MIGRATION_LIST_SIZE) 
+       {
+         GNUNET_PEER_change_rc (mb->target_list[repl], -1);
+         mb->target_list[repl] = cp->pid;
+         GNUNET_PEER_change_rc (mb->target_list[repl], 1);
+       }
+    }
+
+  /* consider scheduling transmission to cp for content migration */
   if (cp->cth != NULL)
-    return GNUNET_YES; /* or what? */
-  /* FIXME: not implemented! */
+    return GNUNET_YES; 
+  msize = 0;
+  pos = mig_head;
+  while (pos != NULL)
+    {
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (cp->pid == pos->target_list[i])
+           {
+             if (msize == 0)
+               msize = pos->size;
+             else
+               msize = GNUNET_MIN (msize,
+                                   pos->size);
+             break;
+           }
+       }
+      pos = pos->next;
+    }
+  if (msize == 0)
+    return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Trying to migrate at least %u bytes to peer `%s'\n",
+             msize,
+             GNUNET_h2s (key));
+#endif
+  cp->cth 
+    = GNUNET_CORE_notify_transmit_ready (core,
+                                        0, GNUNET_TIME_UNIT_FOREVER_REL,
+                                        (const struct GNUNET_PeerIdentity*) key,
+                                        msize + sizeof (struct PutMessage),
+                                        &transmit_to_peer,
+                                        cp);
   return GNUNET_YES;
 }
 
 
-
-
 /**
  * Task that is run periodically to obtain blocks for content
  * migration
@@ -739,6 +871,32 @@ gather_migration_blocks (void *cls,
                         const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
+/**
+ * If the migration task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
+ */
+static void
+consider_migration_gathering ()
+{
+  struct GNUNET_TIME_Relative delay;
+
+  if (mig_qe != NULL)
+    return;
+  if (mig_task != GNUNET_SCHEDULER_NO_TASK)
+    return;
+  delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                        mig_size);
+  delay = GNUNET_TIME_relative_divide (delay,
+                                      MAX_MIGRATION_QUEUE);
+  delay = GNUNET_TIME_relative_max (delay,
+                                   min_migration_delay);
+  mig_task = GNUNET_SCHEDULER_add_delayed (sched,
+                                          delay,
+                                          &gather_migration_blocks,
+                                          NULL);
+}
+
+
 /**
  * Process content offered for migration.
  *
@@ -765,26 +923,31 @@ process_migration_content (void *cls,
                           expiration, uint64_t uid)
 {
   struct MigrationReadyBlock *mb;
-  struct GNUNET_TIME_Relative delay;
   
   if (key == NULL)
     {
       mig_qe = NULL;
       if (mig_size < MAX_MIGRATION_QUEUE)  
-       {
-         delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
-                                                mig_size);
-         delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS,
-                                              MAX_MIGRATION_QUEUE);
-         delay = GNUNET_TIME_relative_max (delay,
-                                           min_migration_delay);
-         mig_task = GNUNET_SCHEDULER_add_delayed (sched,
-                                                  delay,
-                                                  &gather_migration_blocks,
-                                                  NULL);
-       }
+       consider_migration_gathering ();
       return;
     }
+  if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+    {
+      if (GNUNET_OK !=
+         GNUNET_FS_handle_on_demand_block (key, size, data,
+                                           type, priority, anonymity,
+                                           expiration, uid, 
+                                           &process_migration_content,
+                                           NULL))
+       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+      return;
+    }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Retrieved block `%s' of type %u for migration\n",
+             GNUNET_h2s (key),
+             type);
+#endif
   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
   mb->query = *key;
   mb->expiration = expiration;
@@ -796,10 +959,9 @@ process_migration_content (void *cls,
                                     mig_tail,
                                     mb);
   mig_size++;
-  if (mig_size == 1)
-    GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                          &consider_migration,
-                                          NULL);
+  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                        &consider_migration,
+                                        mb);
   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
 }
 
@@ -819,6 +981,7 @@ gather_migration_blocks (void *cls,
   mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
                                        GNUNET_TIME_UNIT_FOREVER_REL,
                                        &process_migration_content, NULL);
+  GNUNET_assert (mig_qe != NULL);
 }
 
 
@@ -978,7 +1141,8 @@ peer_connect_handler (void *cls,
                      uint32_t distance)
 {
   struct ConnectedPeer *cp;
-
+  struct MigrationReadyBlock *pos;
+  
   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
   cp->pid = GNUNET_PEER_intern (peer);
   GNUNET_break (GNUNET_OK ==
@@ -986,8 +1150,13 @@ peer_connect_handler (void *cls,
                                                   &peer->hashPubKey,
                                                   cp,
                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  if (mig_size > 0)
-    (void) consider_migration (NULL, &peer->hashPubKey, cp);
+
+  pos = mig_head;
+  while (NULL != pos)
+    {
+      (void) consider_migration (pos, &peer->hashPubKey, cp);
+      pos = pos->next;
+    }
 }
 
 
@@ -1031,6 +1200,8 @@ peer_disconnect_handler (void *cls,
   struct ConnectedPeer *cp;
   struct PendingMessage *pm;
   unsigned int i;
+  struct MigrationReadyBlock *pos;
+  struct MigrationReadyBlock *next;
 
   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
                                              &peer->hashPubKey,
@@ -1052,6 +1223,31 @@ peer_disconnect_handler (void *cls,
                GNUNET_CONTAINER_multihashmap_remove (connected_peers,
                                                      &peer->hashPubKey,
                                                      cp));
+  /* remove this peer from migration considerations; schedule
+     alternatives */
+  next = mig_head;
+  while (NULL != (pos = next))
+    {
+      next = pos->next;
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (pos->target_list[i] == cp->pid)
+           {
+             GNUNET_PEER_change_rc (pos->target_list[i], -1);
+             pos->target_list[i] = 0;
+            }
+         }
+      if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
+       {
+         delete_migration_block (pos);
+         consider_migration_gathering ();
+          continue;
+       }
+      GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                            &consider_migration,
+                                            pos);
+    }
+
   GNUNET_PEER_change_rc (cp->pid, -1);
   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
   if (NULL != cp->cth)
@@ -1231,7 +1427,7 @@ shutdown_task (void *cls,
 
 
 /**
- * Transmit the given message by copying it to the target buffer
+ * Transmit messages by copying it to the target buffer
  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
  * for writing in the meantime.  In that case, do nothing
  * (the disconnect or shutdown handler will take care of the rest).
@@ -1251,7 +1447,11 @@ transmit_to_peer (void *cls,
   char *cbuf = buf;
   struct GNUNET_PeerIdentity pid;
   struct PendingMessage *pm;
+  struct MigrationReadyBlock *mb;
+  struct MigrationReadyBlock *next;
+  struct PutMessage migm;
   size_t msize;
+  unsigned int i;
  
   cp->cth = NULL;
   if (NULL == buf)
@@ -1283,6 +1483,61 @@ transmit_to_peer (void *cls,
                                                   &transmit_to_peer,
                                                   cp);
     }
+  else
+    {      
+      next = mig_head;
+      while (NULL != (mb = next))
+       {
+         next = mb->next;
+         for (i=0;i<MIGRATION_LIST_SIZE;i++)
+           {
+             if ( (cp->pid == mb->target_list[i]) &&
+                  (mb->size + sizeof (migm) <= size) )
+               {
+                 GNUNET_PEER_change_rc (mb->target_list[i], -1);
+                 mb->target_list[i] = 0;
+                 mb->used_targets++;
+                 migm.header.size = htons (sizeof (migm) + mb->size);
+                 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+                 migm.type = htonl (mb->type);
+                 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
+                 memcpy (&cbuf[msize], &migm, sizeof (migm));
+                 msize += sizeof (migm);
+                 size -= sizeof (migm);
+                 memcpy (&cbuf[msize], &mb[1], mb->size);
+                 msize += mb->size;
+                 size -= mb->size;
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Pushing migration block `%s' (%u bytes) to `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+                 break;
+               }
+             else
+               {
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+               }
+           }
+         if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
+              (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
+           {
+             delete_migration_block (mb);
+             consider_migration_gathering ();
+           }
+       }
+      consider_migration (NULL, 
+                         &pid.hashPubKey,
+                         cp);
+    }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Transmitting %u bytes to peer %u\n",
@@ -1330,25 +1585,27 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
   cp->pending_requests++;
   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
     destroy_pending_message (cp->pending_messages_tail, 0);  
-  if (cp->cth == NULL)
-    {
-      /* need to schedule transmission */
-      GNUNET_PEER_resolve (cp->pid, &pid);
-      cp->cth = GNUNET_CORE_notify_transmit_ready (core,
-                                                  cp->pending_messages_head->priority,
-                                                  MAX_TRANSMIT_DELAY,
-                                                  &pid,
-                                                  cp->pending_messages_head->msize,
-                                                  &transmit_to_peer,
-                                                  cp);
-    }
+  GNUNET_PEER_resolve (cp->pid, &pid);
+  if (NULL != cp->cth)
+    GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+  /* need to schedule transmission */
+  cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                              cp->pending_messages_head->priority,
+                                              MAX_TRANSMIT_DELAY,
+                                              &pid,
+                                              cp->pending_messages_head->msize,
+                                              &transmit_to_peer,
+                                              cp);
   if (cp->cth == NULL)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Failed to schedule transmission with core!\n");
 #endif
-      /* FIXME: call stats (rare, bad case) */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# CORE transmission failures"),
+                               1,
+                               GNUNET_NO);
     }
 }
 
@@ -1894,7 +2151,7 @@ forward_request_task (void *cls,
   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
                                                &psc.target,
                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                                               GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), 
+                                               GNUNET_BANDWIDTH_value_init (UINT32_MAX),
                                                DBLOCK_SIZE * 2, 
                                                (uint64_t) cp->inc_preference,
                                                &target_reservation_cb,
@@ -2005,7 +2262,10 @@ struct ProcessReplyClosure
    */
   const void *data;
 
-  // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here!
+  /**
+   * Who gave us this reply? NULL for local host.
+   */
+  struct ConnectedPeer *sender;
 
   /**
    * When the reply expires.
@@ -2032,6 +2292,11 @@ struct ProcessReplyClosure
    * How much was this reply worth to us?
    */
   uint32_t priority;
+
+  /**
+   * Did we finish processing the associated request?
+   */ 
+  int finished;
 };
 
 
@@ -2055,6 +2320,7 @@ process_reply (void *cls,
   struct ClientList *cl;
   struct PutMessage *pm;
   struct ConnectedPeer *cp;
+  struct GNUNET_TIME_Relative cur_delay;
   GNUNET_HashCode chash;
   GNUNET_HashCode mhash;
   size_t msize;
@@ -2069,6 +2335,39 @@ process_reply (void *cls,
                            gettext_noop ("# replies received and matched"),
                            1,
                            GNUNET_NO);
+  if (prq->sender != NULL)
+    {
+      /* FIXME: should we be more precise here and not use
+        "start_time" but a peer-specific time stamp? */
+      cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
+      prq->sender->avg_delay.value
+       = (prq->sender->avg_delay.value * 
+          (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
+      prq->sender->avg_priority
+       = (prq->sender->avg_priority * 
+          (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+      if (pr->cp != NULL)
+       {
+         GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
+                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
+                                -1);
+         GNUNET_PEER_change_rc (pr->cp->pid, 1);
+         prq->sender->last_p2p_replies
+           [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
+           = pr->cp->pid;
+       }
+      else
+       {
+         if (NULL != prq->sender->last_client_replies
+             [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
+           GNUNET_SERVER_client_drop (prq->sender->last_client_replies
+                                      [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
+         prq->sender->last_client_replies
+           [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
+           = pr->client_request_list->client_list->client;
+         GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
+       }
+    }
   GNUNET_CRYPTO_hash (prq->data,
                      prq->size,
                      &chash);
@@ -2159,7 +2458,7 @@ process_reply (void *cls,
     }
   prq->priority += pr->remaining_priority;
   pr->remaining_priority = 0;
-  if (pr->client_request_list != NULL)
+  if (NULL != pr->client_request_list)
     {
       GNUNET_STATISTICS_update (stats,
                                gettext_noop ("# replies received for local clients"),
@@ -2195,7 +2494,10 @@ process_reply (void *cls,
        }
       GNUNET_break (cl->th != NULL);
       if (pr->do_remove)               
-       destroy_pending_request (pr);           
+       {
+         prq->finished = GNUNET_YES;
+         destroy_pending_request (pr);         
+       }
     }
   else
     {
@@ -2215,7 +2517,7 @@ process_reply (void *cls,
       reply->cont = &transmit_reply_continuation;
       reply->cont_cls = pr;
       reply->msize = msize;
-      reply->priority = (uint32_t) -1; /* send replies first! */
+      reply->priority = UINT32_MAX; /* send replies first! */
       pm = (struct PutMessage*) &reply[1];
       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
       pm->header.size = htons (msize);
@@ -2224,12 +2526,10 @@ process_reply (void *cls,
       memcpy (&pm[1], prq->data, prq->size);
       add_to_pending_messages_for_peer (cp, reply, pr);
     }
-  // FIXME: implement hot-path routing statistics keeping!
   return GNUNET_YES;
 }
 
 
-
 /**
  * Continuation called to notify client about result of the
  * operation.
@@ -2317,16 +2617,26 @@ handle_p2p_put (void *cls,
                            GNUNET_NO);
   /* now, lookup 'query' */
   prq.data = (const void*) &put[1];
+  if (other != NULL)
+    prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                                   &other->hashPubKey);
   prq.size = dsize;
   prq.type = type;
   prq.expiration = expiration;
   prq.priority = 0;
+  prq.finished = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
                                              &query,
                                              &process_reply,
                                              &prq);
   if (GNUNET_YES == active_migration)
     {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Replicating result for query `%s' with priority %u\n",
+                 GNUNET_h2s (&query),
+                 prq.priority);
+#endif
       GNUNET_DATASTORE_put (dsh,
                            0, &query, dsize, &put[1],
                            type, prq.priority, 1 /* anonymity */, 
@@ -2554,13 +2864,16 @@ process_local_reply (void *cls,
     }
   prq.type = type;
   prq.priority = priority;  
+  prq.finished = GNUNET_NO;
   process_reply (&prq, key, pr);
-
+  if (prq.finished == GNUNET_YES)
+    return;
+  if (pr->qe == NULL)
+    return; /* done here */
   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
     {
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
       return;
     }
   if ( (pr->client_request_list == NULL) &&
@@ -2575,12 +2888,10 @@ process_local_reply (void *cls,
                                gettext_noop ("# processing result set cut short due to load"),
                                1,
                                GNUNET_NO);
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
       return;
     }
-  if (pr->qe != NULL)
-    GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
 }
 
 
@@ -2773,11 +3084,12 @@ handle_p2p_get (void *cls,
   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
                      (have_ns ? sizeof(GNUNET_HashCode) : 0));
   if (have_ns)
-    pr->namespace = (GNUNET_HashCode*) &pr[1];
+    {
+      pr->namespace = (GNUNET_HashCode*) &pr[1];
+      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+    }
   pr->type = type;
   pr->mingle = ntohl (gm->filter_mutator);
-  if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))    
-    memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
 
@@ -2803,7 +3115,7 @@ handle_p2p_get (void *cls,
                                gettext_noop ("# requests dropped due TTL underflow"),
                                1,
                                GNUNET_NO);
-      /* integer underflow => drop (should be very rare)! */
+      /* integer underflow => drop (should be very rare)! */      
       GNUNET_free (pr);
       return GNUNET_OK;
     } 
@@ -3103,38 +3415,6 @@ handle_start_search (void *cls,
 
 /* **************************** Startup ************************ */
 
-
-/**
- * List of handlers for P2P messages
- * that we care about.
- */
-static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
-  {
-    { &handle_p2p_get, 
-      GNUNET_MESSAGE_TYPE_FS_GET, 0 },
-    { &handle_p2p_put, 
-      GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
-    { NULL, 0, 0 }
-  };
-
-
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
-  {&GNUNET_FS_handle_index_start, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
-  {&GNUNET_FS_handle_index_list_get, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
-  {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
-   sizeof (struct UnindexMessage) },
-  {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
-   0 },
-  {NULL, NULL, 0, 0}
-};
-
-
 /**
  * Process fs requests.
  *
@@ -3147,6 +3427,26 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
           struct GNUNET_SERVER_Handle *server,
           const struct GNUNET_CONFIGURATION_Handle *c)
 {
+  static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
+    {
+      { &handle_p2p_get, 
+       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
+      { &handle_p2p_put, 
+       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+      { NULL, 0, 0 }
+    };
+  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+    {&GNUNET_FS_handle_index_start, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+    {&GNUNET_FS_handle_index_list_get, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+    {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
+     sizeof (struct UnindexMessage) },
+    {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
+     0 },
+    {NULL, NULL, 0, 0}
+  };
+
   sched = s;
   cfg = c;
   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
@@ -3188,9 +3488,9 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
   /* FIXME: distinguish between sending and storing in options? */
   if (active_migration) 
     {
-      mig_task = GNUNET_SCHEDULER_add_now (sched,
-                                          &gather_migration_blocks,
-                                          NULL);
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Content migration is enabled, will start to gather data\n"));
+      consider_migration_gathering ();
     }
   GNUNET_SERVER_disconnect_notify (server, 
                                   &handle_client_disconnect,