Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
index f8a7b61f0fc1119fde9d70dd673f9bb87eb51194..b0fda24b55cfd72185d4beba0ab4ddb134373b19 100644 (file)
@@ -97,9 +97,9 @@ struct GSF_PendingRequest
   struct GNUNET_HashCode *replies_seen;
 
   /**
-   * Bloomfilter masking replies we've already seen.
+   * Block group for filtering replies we've already seen.
    */
-  struct GNUNET_CONTAINER_BloomFilter *bf;
+  struct GNUNET_BLOCK_Group *bg;
 
   /**
    * Entry for this pending request in the expiration heap, or NULL.
@@ -189,11 +189,6 @@ struct GSF_PendingRequest
    */
   unsigned int replies_seen_size;
 
-  /**
-   * Mingle value we currently use for the bf.
-   */
-  uint32_t mingle;
-
   /**
    * Do we have a first UID yet?
    */
@@ -248,18 +243,36 @@ static unsigned long long max_pending_requests = (32 * 1024);
  * fresh one of minimal size without problems) OR if our peer is the
  * initiator (in which case we may resize to larger than mimimum size).
  *
+ * @param type type of the request
  * @param pr request for which the BF is to be recomputed
  */
 static void
-refresh_bloomfilter (struct GSF_PendingRequest *pr)
+refresh_bloomfilter (enum GNUNET_BLOCK_Type type,
+                     struct GSF_PendingRequest *pr)
 {
-  if (pr->bf != NULL)
-    GNUNET_CONTAINER_bloomfilter_free (pr->bf);
-  pr->mingle =
-      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
-  pr->bf =
-      GNUNET_BLOCK_construct_bloomfilter (pr->mingle, pr->replies_seen,
-                                          pr->replies_seen_count);
+  if (NULL != pr->bg)
+  {
+    GNUNET_BLOCK_group_destroy (pr->bg);
+    pr->bg = NULL;
+  }
+  if (GNUNET_BLOCK_TYPE_FS_UBLOCK != type)
+    return; /* no need */
+  pr->bg
+    = GNUNET_BLOCK_group_create (GSF_block_ctx,
+                                 type,
+                                 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                           UINT32_MAX),
+                                 NULL,
+                                 0,
+                                 "seen-set-size",
+                                 pr->replies_seen_count,
+                                 NULL);
+  if (NULL == pr->bg)
+    return;
+  GNUNET_break (GNUNET_OK ==
+                GNUNET_BLOCK_group_set_seen (pr->bg,
+                                             pr->replies_seen,
+                                             pr->replies_seen_count));
 }
 
 
@@ -355,25 +368,31 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
   if (replies_seen_count > 0)
   {
     pr->replies_seen_size = replies_seen_count;
-    pr->replies_seen =
-        GNUNET_malloc (sizeof (struct GNUNET_HashCode) * pr->replies_seen_size);
+    pr->replies_seen = GNUNET_new_array (pr->replies_seen_size,
+                                         struct GNUNET_HashCode);
     GNUNET_memcpy (pr->replies_seen,
-            replies_seen,
-            replies_seen_count * sizeof (struct GNUNET_HashCode));
+                   replies_seen,
+                   replies_seen_count * sizeof (struct GNUNET_HashCode));
     pr->replies_seen_count = replies_seen_count;
   }
-  if (NULL != bf_data)
+  if ( (NULL != bf_data) &&
+       (GNUNET_BLOCK_TYPE_FS_UBLOCK == pr->public_data.type) )
   {
-    pr->bf =
-        GNUNET_CONTAINER_bloomfilter_init (bf_data,
-                                           bf_size,
-                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
-    pr->mingle = mingle;
+    pr->bg
+      =  GNUNET_BLOCK_group_create (GSF_block_ctx,
+                                    pr->public_data.type,
+                                    mingle,
+                                    bf_data,
+                                    bf_size,
+                                    "seen-set-size",
+                                    0,
+                                    NULL);
   }
   else if ((replies_seen_count > 0) &&
            (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)))
   {
-    refresh_bloomfilter (pr);
+    refresh_bloomfilter (pr->public_data.type,
+                         pr);
   }
   GNUNET_CONTAINER_multihashmap_put (pr_map,
                                     &pr->public_data.query,
@@ -461,46 +480,37 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
                              const struct GNUNET_HashCode * replies_seen,
                              unsigned int replies_seen_count)
 {
-  unsigned int i;
-  struct GNUNET_HashCode mhash;
-
   if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
     return;                     /* integer overflow */
   if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
   {
     /* we're responsible for the BF, full refresh */
     if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
-      GNUNET_array_grow (pr->replies_seen, pr->replies_seen_size,
+      GNUNET_array_grow (pr->replies_seen,
+                         pr->replies_seen_size,
                          replies_seen_count + pr->replies_seen_count);
-    GNUNET_memcpy (&pr->replies_seen[pr->replies_seen_count], replies_seen,
-            sizeof (struct GNUNET_HashCode) * replies_seen_count);
+    GNUNET_memcpy (&pr->replies_seen[pr->replies_seen_count],
+                   replies_seen,
+                   sizeof (struct GNUNET_HashCode) * replies_seen_count);
     pr->replies_seen_count += replies_seen_count;
-    refresh_bloomfilter (pr);
+    refresh_bloomfilter (pr->public_data.type,
+                         pr);
   }
   else
   {
-    if (NULL == pr->bf)
+    if (NULL == pr->bg)
     {
       /* we're not the initiator, but the initiator did not give us
        * any bloom-filter, so we need to create one on-the-fly */
-      pr->mingle =
-          GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                    UINT32_MAX);
-      pr->bf =
-          GNUNET_BLOCK_construct_bloomfilter (pr->mingle,
-                                              replies_seen,
-                                              replies_seen_count);
+      refresh_bloomfilter (pr->public_data.type,
+                           pr);
     }
     else
     {
-      for (i = 0; i < pr->replies_seen_count; i++)
-      {
-        GNUNET_BLOCK_mingle_hash (&replies_seen[i],
-                                  pr->mingle,
-                                  &mhash);
-        GNUNET_CONTAINER_bloomfilter_add (pr->bf,
-                                          &mhash);
-      }
+      GNUNET_break (GNUNET_OK ==
+                    GNUNET_BLOCK_group_set_seen (pr->bg,
+                                                 replies_seen,
+                                                 pr->replies_seen_count));
     }
   }
   if (NULL != pr->gh)
@@ -530,6 +540,8 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
   struct GNUNET_TIME_Absolute now;
   int64_t ttl;
   int do_route;
+  void *bf_data;
+  uint32_t bf_nonce;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Building request message for `%s' of type %d\n",
@@ -553,7 +565,15 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
     bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
     k++;
   }
-  bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
+  if (GNUNET_OK !=
+      GNUNET_BLOCK_group_serialize (pr->bg,
+                                    &bf_nonce,
+                                    &bf_data,
+                                    &bf_size))
+  {
+    bf_size = 0;
+    bf_data = NULL;
+  }
   env = GNUNET_MQ_msg_extra (gm,
                             bf_size + k * sizeof (struct GNUNET_PeerIdentity),
                             GNUNET_MESSAGE_TYPE_FS_GET);
@@ -571,7 +591,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
   now = GNUNET_TIME_absolute_get ();
   ttl = (int64_t) (pr->public_data.ttl.abs_value_us - now.abs_value_us);
   gm->ttl = htonl (ttl / 1000LL / 1000LL);
-  gm->filter_mutator = htonl (pr->mingle);
+  gm->filter_mutator = htonl (bf_nonce);
   gm->hash_bitmap = htonl (bm);
   gm->query = pr->public_data.query;
   ext = (struct GNUNET_PeerIdentity *) &gm[1];
@@ -581,11 +601,10 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
                          &ext[k++]);
   if (NULL != pr->public_data.target)
     ext[k++] = *pr->public_data.target;
-  if (NULL != pr->bf)
-    GNUNET_assert (GNUNET_SYSERR !=
-                   GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
-                                                              (char *) &ext[k],
-                                                              bf_size));
+  GNUNET_memcpy (&ext[k],
+                 bf_data,
+                 bf_size);
+  GNUNET_free_non_null (bf_data);
   return env;
 }
 
@@ -624,11 +643,8 @@ clean_request (void *cls,
   }
   GSF_plan_notify_request_done_ (pr);
   GNUNET_free_non_null (pr->replies_seen);
-  if (NULL != pr->bf)
-  {
-    GNUNET_CONTAINER_bloomfilter_free (pr->bf);
-    pr->bf = NULL;
-  }
+  GNUNET_BLOCK_group_destroy (pr->bg);
+  pr->bg = NULL;
   GNUNET_PEER_change_rc (pr->sender_pid, -1);
   pr->sender_pid = 0;
   GNUNET_PEER_change_rc (pr->origin_pid, -1);
@@ -844,10 +860,9 @@ process_reply (void *cls,
   prq->eval =
       GNUNET_BLOCK_evaluate (GSF_block_ctx,
                              prq->type,
+                             pr->bg,
                              prq->eo,
                              key,
-                             &pr->bf,
-                             pr->mingle,
                              NULL,
                              0,
                              prq->data,
@@ -1036,7 +1051,7 @@ put_migration_continuation (void *cls, int success,
                                                       ppd->migration_delay);
       mig_pause.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
                                                         ppd->migration_delay.rel_value_us);
-      ppd->migration_delay = GNUNET_TIME_relative_multiply (ppd->migration_delay, 2);
+      ppd->migration_delay = GNUNET_TIME_relative_saturating_multiply (ppd->migration_delay, 2);
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Replicated content already exists locally, asking to stop migration for %s\n",
                  GNUNET_STRINGS_relative_time_to_string (mig_pause,