fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
index b8184262f1db16f7a85b4a7bb59098d4d5486c39..ccdd76de235fb1add6c72bd6d857fd95dd9d53d7 100644 (file)
@@ -760,7 +760,7 @@ is_closer (const GNUNET_HashCode *key,
  *            targets for (or NULL for none)
  * @param key ID of the peer 
  * @param value 'struct ConnectedPeer' of the peer
- * @return GNUNET_YES (always continue iteration)2
+ * @return GNUNET_YES (always continue iteration)
  */
 static int
 consider_migration (void *cls,
@@ -842,6 +842,12 @@ consider_migration (void *cls,
     }
   if (msize == 0)
     return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Trying to migrate at least %u bytes to peer `%s'\n",
+             msize,
+             GNUNET_h2s (key));
+#endif
   cp->cth 
     = GNUNET_CORE_notify_transmit_ready (core,
                                         0, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -880,7 +886,7 @@ consider_migration_gathering ()
     return;
   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
                                         mig_size);
-  delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS,
+  delay = GNUNET_TIME_relative_divide (delay,
                                       MAX_MIGRATION_QUEUE);
   delay = GNUNET_TIME_relative_max (delay,
                                    min_migration_delay);
@@ -936,6 +942,12 @@ process_migration_content (void *cls,
        GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Retrieved block `%s' of type %u for migration\n",
+             GNUNET_h2s (key),
+             type);
+#endif
   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
   mb->query = *key;
   mb->expiration = expiration;
@@ -969,6 +981,7 @@ gather_migration_blocks (void *cls,
   mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
                                        GNUNET_TIME_UNIT_FOREVER_REL,
                                        &process_migration_content, NULL);
+  GNUNET_assert (mig_qe != NULL);
 }
 
 
@@ -1222,17 +1235,17 @@ peer_disconnect_handler (void *cls,
            {
              GNUNET_PEER_change_rc (pos->target_list[i], -1);
              pos->target_list[i] = 0;
-             if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
-               {
-                 delete_migration_block (pos);
-                 consider_migration_gathering ();
-               }
-             GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
-                                                    &consider_migration,
-                                                    pos);
-             break;
-           }
+            }
+         }
+      if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
+       {
+         delete_migration_block (pos);
+         consider_migration_gathering ();
+          continue;
        }
+      GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                            &consider_migration,
+                                            pos);
     }
 
   GNUNET_PEER_change_rc (cp->pid, -1);
@@ -1493,9 +1506,26 @@ transmit_to_peer (void *cls,
                  size -= sizeof (migm);
                  memcpy (&cbuf[msize], &mb[1], mb->size);
                  msize += mb->size;
-                 size -= mb->size;               
+                 size -= mb->size;
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Pushing migration block `%s' (%u bytes) to `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
                  break;
                }
+             else
+               {
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+               }
            }
          if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
               (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
@@ -1572,7 +1602,10 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Failed to schedule transmission with core!\n");
 #endif
-      /* FIXME: call stats (rare, bad case) */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# CORE transmission failures"),
+                               1,
+                               GNUNET_NO);
     }
 }
 
@@ -2118,7 +2151,7 @@ forward_request_task (void *cls,
   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
                                                &psc.target,
                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
-                                               GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), 
+                                               GNUNET_BANDWIDTH_value_init (UINT32_MAX),
                                                DBLOCK_SIZE * 2, 
                                                (uint64_t) cp->inc_preference,
                                                &target_reservation_cb,
@@ -2259,6 +2292,11 @@ struct ProcessReplyClosure
    * How much was this reply worth to us?
    */
   uint32_t priority;
+
+  /**
+   * Did we finish processing the associated request?
+   */ 
+  int finished;
 };
 
 
@@ -2311,7 +2349,8 @@ process_reply (void *cls,
       if (pr->cp != NULL)
        {
          GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
-                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
+                                [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
+                                -1);
          GNUNET_PEER_change_rc (pr->cp->pid, 1);
          prq->sender->last_p2p_replies
            [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
@@ -2419,7 +2458,7 @@ process_reply (void *cls,
     }
   prq->priority += pr->remaining_priority;
   pr->remaining_priority = 0;
-  if (pr->client_request_list != NULL)
+  if (NULL != pr->client_request_list)
     {
       GNUNET_STATISTICS_update (stats,
                                gettext_noop ("# replies received for local clients"),
@@ -2455,7 +2494,10 @@ process_reply (void *cls,
        }
       GNUNET_break (cl->th != NULL);
       if (pr->do_remove)               
-       destroy_pending_request (pr);           
+       {
+         prq->finished = GNUNET_YES;
+         destroy_pending_request (pr);         
+       }
     }
   else
     {
@@ -2475,7 +2517,7 @@ process_reply (void *cls,
       reply->cont = &transmit_reply_continuation;
       reply->cont_cls = pr;
       reply->msize = msize;
-      reply->priority = (uint32_t) -1; /* send replies first! */
+      reply->priority = UINT32_MAX; /* send replies first! */
       pm = (struct PutMessage*) &reply[1];
       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
       pm->header.size = htons (msize);
@@ -2582,12 +2624,19 @@ handle_p2p_put (void *cls,
   prq.type = type;
   prq.expiration = expiration;
   prq.priority = 0;
+  prq.finished = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
                                              &query,
                                              &process_reply,
                                              &prq);
   if (GNUNET_YES == active_migration)
     {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Replicating result for query `%s' with priority %u\n",
+                 GNUNET_h2s (&query),
+                 prq.priority);
+#endif
       GNUNET_DATASTORE_put (dsh,
                            0, &query, dsize, &put[1],
                            type, prq.priority, 1 /* anonymity */, 
@@ -2813,13 +2862,21 @@ process_local_reply (void *cls,
       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
       return;
     }
+  prq.type = type;
+  prq.priority = priority;  
+  prq.finished = GNUNET_NO;
+  process_reply (&prq, key, pr);
+  if (prq.finished == GNUNET_YES)
+    return;
+  if (pr->qe == NULL)
+    return; /* done here */
   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
     {
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      return;
     }
-  else if ( (pr->client_request_list == NULL) &&
+  if ( (pr->client_request_list == NULL) &&
        ( (GNUNET_YES == test_load_too_high()) ||
         (pr->results_found > 5 + 2 * pr->priority) ) )
     {
@@ -2831,14 +2888,10 @@ process_local_reply (void *cls,
                                gettext_noop ("# processing result set cut short due to load"),
                                1,
                                GNUNET_NO);
-      if (pr->qe != NULL)
-       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      return;
     }
-  else if (pr->qe != NULL)
-    GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-  prq.type = type;
-  prq.priority = priority;  
-  process_reply (&prq, key, pr);
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
 }
 
 
@@ -3031,11 +3084,12 @@ handle_p2p_get (void *cls,
   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
                      (have_ns ? sizeof(GNUNET_HashCode) : 0));
   if (have_ns)
-    pr->namespace = (GNUNET_HashCode*) &pr[1];
+    {
+      pr->namespace = (GNUNET_HashCode*) &pr[1];
+      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+    }
   pr->type = type;
   pr->mingle = ntohl (gm->filter_mutator);
-  if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))    
-    memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
 
@@ -3061,7 +3115,7 @@ handle_p2p_get (void *cls,
                                gettext_noop ("# requests dropped due TTL underflow"),
                                1,
                                GNUNET_NO);
-      /* integer underflow => drop (should be very rare)! */
+      /* integer underflow => drop (should be very rare)! */      
       GNUNET_free (pr);
       return GNUNET_OK;
     } 
@@ -3361,38 +3415,6 @@ handle_start_search (void *cls,
 
 /* **************************** Startup ************************ */
 
-
-/**
- * List of handlers for P2P messages
- * that we care about.
- */
-static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
-  {
-    { &handle_p2p_get, 
-      GNUNET_MESSAGE_TYPE_FS_GET, 0 },
-    { &handle_p2p_put, 
-      GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
-    { NULL, 0, 0 }
-  };
-
-
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
-  {&GNUNET_FS_handle_index_start, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
-  {&GNUNET_FS_handle_index_list_get, NULL, 
-   GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
-  {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
-   sizeof (struct UnindexMessage) },
-  {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
-   0 },
-  {NULL, NULL, 0, 0}
-};
-
-
 /**
  * Process fs requests.
  *
@@ -3405,6 +3427,26 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
           struct GNUNET_SERVER_Handle *server,
           const struct GNUNET_CONFIGURATION_Handle *c)
 {
+  static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
+    {
+      { &handle_p2p_get, 
+       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
+      { &handle_p2p_put, 
+       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+      { NULL, 0, 0 }
+    };
+  static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+    {&GNUNET_FS_handle_index_start, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+    {&GNUNET_FS_handle_index_list_get, NULL, 
+     GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+    {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
+     sizeof (struct UnindexMessage) },
+    {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
+     0 },
+    {NULL, NULL, 0, 0}
+  };
+
   sched = s;
   cfg = c;
   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
@@ -3445,7 +3487,11 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
     }
   /* FIXME: distinguish between sending and storing in options? */
   if (active_migration) 
-    consider_migration_gathering ();
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Content migration is enabled, will start to gather data\n"));
+      consider_migration_gathering ();
+    }
   GNUNET_SERVER_disconnect_notify (server, 
                                   &handle_client_disconnect,
                                   NULL);