Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
index 2af68dbeffbf511471af9195fef6f3111739edb1..9fe1eabe64e98753f2db24796c01d7fe8431b1d1 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      (C) 2013, 2014 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2013, 2014 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
       You should have received a copy of the GNU General Public License
       along with GNUnet; see the file COPYING.  If not, write to the
-      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-      Boston, MA 02111-1307, USA.
+      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+      Boston, MA 02110-1301, USA.
 */
 /**
  * @file set/gnunet-service-set_intersection.c
 enum IntersectionOperationPhase
 {
   /**
-   * Alices has suggested an operation to bob,
-   * and is waiting for a bf or session end.
+   * We are just starting.
    */
   PHASE_INITIAL,
 
   /**
-   * Bob has accepted the operation, Bob and Alice are now exchanging bfs
-   * until one notices the their element hashes are equal.
+   * We have send the number of our elements to the other
+   * peer, but did not setup our element set yet.
+   */
+  PHASE_COUNT_SENT,
+
+  /**
+   * We have initialized our set and are now reducing it by exchanging
+   * Bloom filters until one party notices the their element hashes
+   * are equal.
    */
   PHASE_BF_EXCHANGE,
 
@@ -53,6 +59,7 @@ enum IntersectionOperationPhase
    * client.
    */
   PHASE_FINISHED
+
 };
 
 
@@ -202,7 +209,7 @@ send_client_removed_element (struct Operation *op,
   rm->result_status = htons (GNUNET_SET_STATUS_OK);
   rm->request_id = htonl (op->spec->client_request_id);
   rm->element_type = element->element_type;
-  memcpy (&rm[1],
+  GNUNET_memcpy (&rm[1],
           element->data,
           element->size);
   GNUNET_MQ_send (op->spec->set->client_mq,
@@ -227,14 +234,29 @@ filtered_map_initialization (void *cls,
   struct ElementEntry *ee = value;
   struct GNUNET_HashCode mutated_hash;
 
-  if ( (op->generation_created < ee->generation_removed) &&
-       (op->generation_created >= ee->generation_added) )
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "FIMA called for %s:%u\n",
+              GNUNET_h2s (&ee->element_hash),
+              ee->element.size);
+
+  if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Reduced initialization, not starting with %s:%u (wrong generation)\n",
+                GNUNET_h2s (&ee->element_hash),
+                ee->element.size);
     return GNUNET_YES; /* element not valid in our operation's generation */
+  }
 
-  /* Test if element is in Bob's bloomfilter */
+  /* Test if element is in other peer's bloomfilter */
   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
                             op->state->salt,
                             &mutated_hash);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Testing mingled hash %s with salt %u\n",
+              GNUNET_h2s (&mutated_hash),
+              op->state->salt);
   if (GNUNET_NO ==
       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
                                          &mutated_hash))
@@ -242,12 +264,20 @@ filtered_map_initialization (void *cls,
     /* remove this element */
     send_client_removed_element (op,
                                  &ee->element);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Reduced initialization, not starting with %s:%u\n",
+                GNUNET_h2s (&ee->element_hash),
+                ee->element.size);
     return GNUNET_YES;
   }
   op->state->my_element_count++;
   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
                           &ee->element_hash,
                           &op->state->my_xor);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Filtered initialization of my_elements, adding %s:%u\n",
+              GNUNET_h2s (&ee->element_hash),
+              ee->element.size);
   GNUNET_break (GNUNET_YES ==
                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
                                                    &ee->element_hash,
@@ -279,6 +309,10 @@ iterator_bf_reduce (void *cls,
   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
                             op->state->salt,
                             &mutated_hash);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Testing mingled hash %s with salt %u\n",
+              GNUNET_h2s (&mutated_hash),
+              op->state->salt);
   if (GNUNET_NO ==
       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
                                          &mutated_hash))
@@ -288,6 +322,10 @@ iterator_bf_reduce (void *cls,
     GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
                             &ee->element_hash,
                             &op->state->my_xor);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Bloom filter reduction of my_elements, removing %s:%u\n",
+                GNUNET_h2s (&ee->element_hash),
+                ee->element.size);
     GNUNET_assert (GNUNET_YES ==
                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
                                                          &ee->element_hash,
@@ -295,6 +333,13 @@ iterator_bf_reduce (void *cls,
     send_client_removed_element (op,
                                  &ee->element);
   }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Bloom filter reduction of my_elements, keeping %s:%u\n",
+                GNUNET_h2s (&ee->element_hash),
+                ee->element.size);
+  }
   return GNUNET_YES;
 }
 
@@ -319,6 +364,10 @@ iterator_bf_create (void *cls,
   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
                             op->state->salt,
                             &mutated_hash);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Initializing BF with hash %s with salt %u\n",
+              GNUNET_h2s (&mutated_hash),
+              op->state->salt);
   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
                                     &mutated_hash);
   return GNUNET_YES;
@@ -370,7 +419,6 @@ send_bloomfilter (struct Operation *op)
   uint32_t bf_size;
   uint32_t bf_elementbits;
   uint32_t chunk_size;
-  struct GNUNET_CONTAINER_BloomFilter *local_bf;
   char *bf_data;
   uint32_t offset;
 
@@ -387,11 +435,12 @@ send_bloomfilter (struct Operation *op)
   bf_size = ceil ((double) (op->state->my_element_count
                             * bf_elementbits / log(2)));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending bf of size %u\n",
+              "Sending Bloom filter (%u) of size %u bytes\n",
+              (unsigned int) bf_elementbits,
               (unsigned int) bf_size);
-  local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
-                                                bf_size,
-                                                bf_elementbits);
+  op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+                                                           bf_size,
+                                                           bf_elementbits);
   op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
                                               UINT32_MAX);
   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
@@ -408,7 +457,7 @@ send_bloomfilter (struct Operation *op)
                               chunk_size,
                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
     GNUNET_assert (GNUNET_SYSERR !=
-                   GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
+                   GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
                                                               (char*) &msg[1],
                                                               bf_size));
     msg->sender_element_count = htonl (op->state->my_element_count);
@@ -423,7 +472,7 @@ send_bloomfilter (struct Operation *op)
     /* multipart */
     bf_data = GNUNET_malloc (bf_size);
     GNUNET_assert (GNUNET_SYSERR !=
-                   GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
+                   GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
                                                               bf_data,
                                                               bf_size));
     offset = 0;
@@ -434,7 +483,7 @@ send_bloomfilter (struct Operation *op)
       ev = GNUNET_MQ_msg_extra (msg,
                                 chunk_size,
                                 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
-      memcpy (&msg[1],
+      GNUNET_memcpy (&msg[1],
               &bf_data[offset],
               chunk_size);
       offset += chunk_size;
@@ -447,7 +496,8 @@ send_bloomfilter (struct Operation *op)
     }
     GNUNET_free (bf_data);
   }
-  GNUNET_CONTAINER_bloomfilter_free (local_bf);
+  GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
+  op->state->local_bf = NULL;
 }
 
 
@@ -499,13 +549,15 @@ send_remaining_elements (void *cls)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Sending done and destroy because iterator ran out\n");
+    op->keep--;
     send_client_done_and_destroy (op);
     return;
   }
   ee = nxt;
   element = &ee->element;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending element (size %u) to client (full set)\n",
+              "Sending element %s:%u to client (full set)\n",
+              GNUNET_h2s (&ee->element_hash),
               element->size);
   GNUNET_assert (0 != op->spec->client_request_id);
   ev = GNUNET_MQ_msg_extra (rm,
@@ -515,7 +567,7 @@ send_remaining_elements (void *cls)
   rm->result_status = htons (GNUNET_SET_STATUS_OK);
   rm->request_id = htonl (op->spec->client_request_id);
   rm->element_type = element->element_type;
-  memcpy (&rm[1],
+  GNUNET_memcpy (&rm[1],
           element->data,
           element->size);
   GNUNET_MQ_notify_sent (ev,
@@ -560,16 +612,26 @@ send_peer_done (struct Operation *op)
 static void
 process_bf (struct Operation *op)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
+              op->state->phase,
+              op->spec->remote_element_count,
+              op->state->my_element_count,
+              GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
   switch (op->state->phase)
   {
   case PHASE_INITIAL:
+    GNUNET_break_op (0);
+    fail_intersection_operation(op);
+    return;
+  case PHASE_COUNT_SENT:
     /* This is the first BF being sent, build our initial map with
        filtering in place */
     op->state->my_elements
       = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
                                               GNUNET_YES);
-    GNUNET_break (0 == op->state->my_element_count);
-    GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
+    op->state->my_element_count = 0;
+    GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
                                            &filtered_map_initialization,
                                            op);
     break;
@@ -633,6 +695,7 @@ handle_p2p_bf (void *cls,
     GNUNET_break_op (0);
     fail_intersection_operation (op);
     break;
+  case PHASE_COUNT_SENT:
   case PHASE_BF_EXCHANGE:
     bf_size = ntohl (msg->bloomfilter_total_length);
     bf_bits_per_element = ntohl (msg->bits_per_element);
@@ -652,6 +715,7 @@ handle_p2p_bf (void *cls,
                                              bf_size,
                                              bf_bits_per_element);
       op->state->salt = ntohl (msg->sender_mutator);
+      op->spec->remote_element_count = ntohl (msg->sender_element_count);
       process_bf (op);
       return;
     }
@@ -680,7 +744,7 @@ handle_p2p_bf (void *cls,
         return;
       }
     }
-    memcpy (&op->state->bf_data[op->state->bf_data_offset],
+    GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset],
             (const char*) &msg[1],
             chunk_size);
     op->state->bf_data_offset += chunk_size;
@@ -722,12 +786,15 @@ initialize_map_unfiltered (void *cls,
   struct ElementEntry *ee = value;
   struct Operation *op = cls;
 
-  if ( (op->generation_created < ee->generation_removed) &&
-       (op->generation_created >= ee->generation_added) )
+  if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
     return GNUNET_YES; /* element not live in operation's generation */
   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
                           &ee->element_hash,
                           &op->state->my_xor);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Initial full initialization of my_elements, adding %s:%u\n",
+              GNUNET_h2s (&ee->element_hash),
+              ee->element.size);
   GNUNET_break (GNUNET_YES ==
                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
                                                    &ee->element_hash,
@@ -750,7 +817,8 @@ send_element_count (struct Operation *op)
   struct IntersectionElementInfoMessage *msg;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending our element count (bf_msg)\n");
+              "Sending our element count (%u)\n",
+              op->state->my_element_count);
   ev = GNUNET_MQ_msg (msg,
                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
   msg->sender_element_count = htonl (op->state->my_element_count);
@@ -767,12 +835,11 @@ send_element_count (struct Operation *op)
 static void
 begin_bf_exchange (struct Operation *op)
 {
-  GNUNET_break (PHASE_INITIAL == op->state->phase);
   op->state->phase = PHASE_BF_EXCHANGE;
   op->state->my_elements
     = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
                                             GNUNET_YES);
-  GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
+  GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
                                          &initialize_map_unfiltered,
                                          op);
   send_bloomfilter (op);
@@ -801,7 +868,12 @@ handle_p2p_element_info (void *cls,
   }
   msg = (const struct IntersectionElementInfoMessage *) mh;
   op->spec->remote_element_count = ntohl (msg->sender_element_count);
-  if ( (PHASE_INITIAL != op->state->phase) ||
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received remote element count (%u), I have %u\n",
+              op->spec->remote_element_count,
+              op->state->my_element_count);
+  if ( ( (PHASE_INITIAL != op->state->phase) &&
+         (PHASE_COUNT_SENT != op->state->phase) ) ||
        (op->state->my_element_count > op->spec->remote_element_count) ||
        (0 == op->state->my_element_count) ||
        (0 == op->spec->remote_element_count) )
@@ -830,9 +902,11 @@ finish_and_destroy (struct Operation *op)
   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Sending full result set\n");
+                "Sending full result set (%u elements)\n",
+                GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
     op->state->full_result_iter
       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
+    op->keep++;
     send_remaining_elements (op);
     return;
   }
@@ -861,6 +935,10 @@ filter_all (void *cls,
   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
                           &ee->element_hash,
                           &op->state->my_xor);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Final reduction of my_elements, removing %s:%u\n",
+              GNUNET_h2s (&ee->element_hash),
+              ee->element.size);
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
                                                        &ee->element_hash,
@@ -918,7 +996,8 @@ handle_p2p_done (void *cls,
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Got final DONE\n");
+              "Got IntersectionDoneMessage, have %u elements in intersection\n",
+              op->state->my_element_count);
   op->state->phase = PHASE_FINISHED;
   finish_and_destroy (op);
 }
@@ -945,7 +1024,7 @@ intersection_evaluate (struct Operation *op,
   op->state->my_element_count = op->spec->set->state->current_set_element_count;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Initiating intersection operation evaluation");
+              "Initiating intersection operation evaluation\n");
   ev = GNUNET_MQ_msg_nested_mh (msg,
                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
                                 opaque_context);
@@ -953,14 +1032,14 @@ intersection_evaluate (struct Operation *op,
   {
     /* the context message is too large!? */
     GNUNET_break (0);
-    GNUNET_SERVER_client_disconnect (op->spec->set->client);
+    GNUNET_SERVICE_client_drop (op->spec->set->client);
     return;
   }
   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
-  msg->app_id = op->spec->app_id;
   msg->element_count = htonl (op->state->my_element_count);
   GNUNET_MQ_send (op->mq,
                   ev);
+  op->state->phase = PHASE_COUNT_SENT;
   if (NULL != opaque_context)
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Sent op request with context message\n");
@@ -995,7 +1074,7 @@ intersection_accept (struct Operation *op)
     /* If the other peer (Alice) has fewer elements than us (Bob),
        we just send the count as Alice should send the first BF */
     send_element_count (op);
-    op->state->phase = PHASE_BF_EXCHANGE;
+    op->state->phase = PHASE_COUNT_SENT;
     return;
   }
   /* We have fewer elements, so we start with the BF */