Merge branch 'master' of gnunet.org:gnunet
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file set/gnunet-service-set_intersection.c
20  * @brief two-peer set intersection
21  * @author Christian Fuchs
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_statistics_service.h"
27 #include "gnunet-service-set.h"
28 #include "gnunet_block_lib.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet-service-set_intersection.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Current phase we are in for a intersection operation.
36  */
37 enum IntersectionOperationPhase
38 {
39   /**
40    * We are just starting.
41    */
42   PHASE_INITIAL,
43
44   /**
45    * We have send the number of our elements to the other
46    * peer, but did not setup our element set yet.
47    */
48   PHASE_COUNT_SENT,
49
50   /**
51    * We have initialized our set and are now reducing it by exchanging
52    * Bloom filters until one party notices the their element hashes
53    * are equal.
54    */
55   PHASE_BF_EXCHANGE,
56
57   /**
58    * We must next send the P2P DONE message (after finishing mostly
59    * with the local client).  Then we will wait for the channel to close.
60    */
61   PHASE_MUST_SEND_DONE,
62
63   /**
64    * We have received the P2P DONE message, and must finish with the
65    * local client before terminating the channel.
66    */
67   PHASE_DONE_RECEIVED,
68
69   /**
70    * The protocol is over.  Results may still have to be sent to the
71    * client.
72    */
73   PHASE_FINISHED
74
75 };
76
77
78 /**
79  * State of an evaluate operation with another peer.
80  */
81 struct OperationState
82 {
83   /**
84    * The bf we currently receive
85    */
86   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
87
88   /**
89    * BF of the set's element.
90    */
91   struct GNUNET_CONTAINER_BloomFilter *local_bf;
92
93   /**
94    * Remaining elements in the intersection operation.
95    * Maps element-id-hashes to 'elements in our set'.
96    */
97   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
98
99   /**
100    * Iterator for sending the final set of @e my_elements to the client.
101    */
102   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
103
104   /**
105    * Evaluate operations are held in a linked list.
106    */
107   struct OperationState *next;
108
109   /**
110    * Evaluate operations are held in a linked list.
111    */
112   struct OperationState *prev;
113
114   /**
115    * For multipart BF transmissions, we have to store the
116    * bloomfilter-data until we fully received it.
117    */
118   char *bf_data;
119
120   /**
121    * XOR of the keys of all of the elements (remaining) in my set.
122    * Always updated when elements are added or removed to
123    * @e my_elements.
124    */
125   struct GNUNET_HashCode my_xor;
126
127   /**
128    * XOR of the keys of all of the elements (remaining) in
129    * the other peer's set.  Updated when we receive the
130    * other peer's Bloom filter.
131    */
132   struct GNUNET_HashCode other_xor;
133
134   /**
135    * How many bytes of @e bf_data are valid?
136    */
137   uint32_t bf_data_offset;
138
139   /**
140    * Current element count contained within @e my_elements.
141    * (May differ briefly during initialization.)
142    */
143   uint32_t my_element_count;
144
145   /**
146    * size of the bloomfilter in @e bf_data.
147    */
148   uint32_t bf_data_size;
149
150   /**
151    * size of the bloomfilter
152    */
153   uint32_t bf_bits_per_element;
154
155   /**
156    * Salt currently used for BF construction (by us or the other peer,
157    * depending on where we are in the code).
158    */
159   uint32_t salt;
160
161   /**
162    * Current state of the operation.
163    */
164   enum IntersectionOperationPhase phase;
165
166   /**
167    * Generation in which the operation handle
168    * was created.
169    */
170   unsigned int generation_created;
171
172   /**
173    * Did we send the client that we are done?
174    */
175   int client_done_sent;
176
177   /**
178    * Set whenever we reach the state where the death of the
179    * channel is perfectly find and should NOT result in the
180    * operation being cancelled.
181    */
182   int channel_death_expected;
183 };
184
185
186 /**
187  * Extra state required for efficient set intersection.
188  * Merely tracks the total number of elements.
189  */
190 struct SetState
191 {
192   /**
193    * Number of currently valid elements in the set which have not been
194    * removed.
195    */
196   uint32_t current_set_element_count;
197 };
198
199
200 /**
201  * If applicable in the current operation mode, send a result message
202  * to the client indicating we removed an element.
203  *
204  * @param op intersection operation
205  * @param element element to send
206  */
207 static void
208 send_client_removed_element (struct Operation *op,
209                              struct GNUNET_SET_Element *element)
210 {
211   struct GNUNET_MQ_Envelope *ev;
212   struct GNUNET_SET_ResultMessage *rm;
213
214   if (GNUNET_SET_RESULT_REMOVED != op->result_mode)
215     return; /* Wrong mode for transmitting removed elements */
216   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
217               "Sending removed element (size %u) to client\n",
218               element->size);
219   GNUNET_STATISTICS_update (_GSS_statistics,
220                             "# Element removed messages sent",
221                             1,
222                             GNUNET_NO);
223   GNUNET_assert (0 != op->client_request_id);
224   ev = GNUNET_MQ_msg_extra (rm,
225                             element->size,
226                             GNUNET_MESSAGE_TYPE_SET_RESULT);
227   if (NULL == ev)
228   {
229     GNUNET_break (0);
230     return;
231   }
232   rm->result_status = htons (GNUNET_SET_STATUS_OK);
233   rm->request_id = htonl (op->client_request_id);
234   rm->element_type = element->element_type;
235   GNUNET_memcpy (&rm[1],
236                  element->data,
237                  element->size);
238   GNUNET_MQ_send (op->set->cs->mq,
239                   ev);
240 }
241
242
243 /**
244  * Fills the "my_elements" hashmap with all relevant elements.
245  *
246  * @param cls the `struct Operation *` we are performing
247  * @param key current key code
248  * @param value the `struct ElementEntry *` from the hash map
249  * @return #GNUNET_YES (we should continue to iterate)
250  */
251 static int
252 filtered_map_initialization (void *cls,
253                              const struct GNUNET_HashCode *key,
254                              void *value)
255 {
256   struct Operation *op = cls;
257   struct ElementEntry *ee = value;
258   struct GNUNET_HashCode mutated_hash;
259
260
261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
262               "FIMA called for %s:%u\n",
263               GNUNET_h2s (&ee->element_hash),
264               ee->element.size);
265
266   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
267   {
268     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
269                 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
270                 GNUNET_h2s (&ee->element_hash),
271                 ee->element.size);
272     return GNUNET_YES; /* element not valid in our operation's generation */
273   }
274
275   /* Test if element is in other peer's bloomfilter */
276   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
277                             op->state->salt,
278                             &mutated_hash);
279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
280               "Testing mingled hash %s with salt %u\n",
281               GNUNET_h2s (&mutated_hash),
282               op->state->salt);
283   if (GNUNET_NO ==
284       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
285                                          &mutated_hash))
286   {
287     /* remove this element */
288     send_client_removed_element (op,
289                                  &ee->element);
290     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
291                 "Reduced initialization, not starting with %s:%u\n",
292                 GNUNET_h2s (&ee->element_hash),
293                 ee->element.size);
294     return GNUNET_YES;
295   }
296   op->state->my_element_count++;
297   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
298                           &ee->element_hash,
299                           &op->state->my_xor);
300   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
301               "Filtered initialization of my_elements, adding %s:%u\n",
302               GNUNET_h2s (&ee->element_hash),
303               ee->element.size);
304   GNUNET_break (GNUNET_YES ==
305                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
306                                                    &ee->element_hash,
307                                                    ee,
308                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
309
310   return GNUNET_YES;
311 }
312
313
314 /**
315  * Removes elements from our hashmap if they are not contained within the
316  * provided remote bloomfilter.
317  *
318  * @param cls closure with the `struct Operation *`
319  * @param key current key code
320  * @param value value in the hash map
321  * @return #GNUNET_YES (we should continue to iterate)
322  */
323 static int
324 iterator_bf_reduce (void *cls,
325                    const struct GNUNET_HashCode *key,
326                    void *value)
327 {
328   struct Operation *op = cls;
329   struct ElementEntry *ee = value;
330   struct GNUNET_HashCode mutated_hash;
331
332   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
333                             op->state->salt,
334                             &mutated_hash);
335   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
336               "Testing mingled hash %s with salt %u\n",
337               GNUNET_h2s (&mutated_hash),
338               op->state->salt);
339   if (GNUNET_NO ==
340       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
341                                          &mutated_hash))
342   {
343     GNUNET_break (0 < op->state->my_element_count);
344     op->state->my_element_count--;
345     GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
346                             &ee->element_hash,
347                             &op->state->my_xor);
348     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
349                 "Bloom filter reduction of my_elements, removing %s:%u\n",
350                 GNUNET_h2s (&ee->element_hash),
351                 ee->element.size);
352     GNUNET_assert (GNUNET_YES ==
353                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
354                                                          &ee->element_hash,
355                                                          ee));
356     send_client_removed_element (op,
357                                  &ee->element);
358   }
359   else
360   {
361     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
362                 "Bloom filter reduction of my_elements, keeping %s:%u\n",
363                 GNUNET_h2s (&ee->element_hash),
364                 ee->element.size);
365   }
366   return GNUNET_YES;
367 }
368
369
370 /**
371  * Create initial bloomfilter based on all the elements given.
372  *
373  * @param cls the `struct Operation *`
374  * @param key current key code
375  * @param value the `struct ElementEntry` to process
376  * @return #GNUNET_YES (we should continue to iterate)
377  */
378 static int
379 iterator_bf_create (void *cls,
380                     const struct GNUNET_HashCode *key,
381                     void *value)
382 {
383   struct Operation *op = cls;
384   struct ElementEntry *ee = value;
385   struct GNUNET_HashCode mutated_hash;
386
387   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
388                             op->state->salt,
389                             &mutated_hash);
390   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
391               "Initializing BF with hash %s with salt %u\n",
392               GNUNET_h2s (&mutated_hash),
393               op->state->salt);
394   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
395                                     &mutated_hash);
396   return GNUNET_YES;
397 }
398
399
400 /**
401  * Inform the client that the intersection operation has failed,
402  * and proceed to destroy the evaluate operation.
403  *
404  * @param op the intersection operation to fail
405  */
406 static void
407 fail_intersection_operation (struct Operation *op)
408 {
409   struct GNUNET_MQ_Envelope *ev;
410   struct GNUNET_SET_ResultMessage *msg;
411
412   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
413               "Intersection operation failed\n");
414   GNUNET_STATISTICS_update (_GSS_statistics,
415                             "# Intersection operations failed",
416                             1,
417                             GNUNET_NO);
418   if (NULL != op->state->my_elements)
419   {
420     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
421     op->state->my_elements = NULL;
422   }
423   ev = GNUNET_MQ_msg (msg,
424                       GNUNET_MESSAGE_TYPE_SET_RESULT);
425   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
426   msg->request_id = htonl (op->client_request_id);
427   msg->element_type = htons (0);
428   GNUNET_MQ_send (op->set->cs->mq,
429                   ev);
430   _GSS_operation_destroy (op,
431                           GNUNET_YES);
432 }
433
434
435 /**
436  * Send a bloomfilter to our peer.  After the result done message has
437  * been sent to the client, destroy the evaluate operation.
438  *
439  * @param op intersection operation
440  */
441 static void
442 send_bloomfilter (struct Operation *op)
443 {
444   struct GNUNET_MQ_Envelope *ev;
445   struct BFMessage *msg;
446   uint32_t bf_size;
447   uint32_t bf_elementbits;
448   uint32_t chunk_size;
449   char *bf_data;
450   uint32_t offset;
451
452   /* We consider the ratio of the set sizes to determine
453      the number of bits per element, as the smaller set
454      should use more bits to maximize its set reduction
455      potential and minimize overall bandwidth consumption. */
456   bf_elementbits = 2 + ceil (log2((double)
457                              (op->remote_element_count /
458                               (double) op->state->my_element_count)));
459   if (bf_elementbits < 1)
460     bf_elementbits = 1; /* make sure k is not 0 */
461   /* optimize BF-size to ~50% of bits set */
462   bf_size = ceil ((double) (op->state->my_element_count
463                             * bf_elementbits / log(2)));
464   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465               "Sending Bloom filter (%u) of size %u bytes\n",
466               (unsigned int) bf_elementbits,
467               (unsigned int) bf_size);
468   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
469                                                            bf_size,
470                                                            bf_elementbits);
471   op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
472                                               UINT32_MAX);
473   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
474                                          &iterator_bf_create,
475                                          op);
476
477   /* send our Bloom filter */
478   GNUNET_STATISTICS_update (_GSS_statistics,
479                             "# Intersection Bloom filters sent",
480                             1,
481                             GNUNET_NO);
482   chunk_size = 60 * 1024 - sizeof (struct BFMessage);
483   if (bf_size <= chunk_size)
484   {
485     /* singlepart */
486     chunk_size = bf_size;
487     ev = GNUNET_MQ_msg_extra (msg,
488                               chunk_size,
489                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
490     GNUNET_assert (GNUNET_SYSERR !=
491                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
492                                                               (char*) &msg[1],
493                                                               bf_size));
494     msg->sender_element_count = htonl (op->state->my_element_count);
495     msg->bloomfilter_total_length = htonl (bf_size);
496     msg->bits_per_element = htonl (bf_elementbits);
497     msg->sender_mutator = htonl (op->state->salt);
498     msg->element_xor_hash = op->state->my_xor;
499     GNUNET_MQ_send (op->mq, ev);
500   }
501   else
502   {
503     /* multipart */
504     bf_data = GNUNET_malloc (bf_size);
505     GNUNET_assert (GNUNET_SYSERR !=
506                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
507                                                               bf_data,
508                                                               bf_size));
509     offset = 0;
510     while (offset < bf_size)
511     {
512       if (bf_size - chunk_size < offset)
513         chunk_size = bf_size - offset;
514       ev = GNUNET_MQ_msg_extra (msg,
515                                 chunk_size,
516                                 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
517       GNUNET_memcpy (&msg[1],
518               &bf_data[offset],
519               chunk_size);
520       offset += chunk_size;
521       msg->sender_element_count = htonl (op->state->my_element_count);
522       msg->bloomfilter_total_length = htonl (bf_size);
523       msg->bits_per_element = htonl (bf_elementbits);
524       msg->sender_mutator = htonl (op->state->salt);
525       msg->element_xor_hash = op->state->my_xor;
526       GNUNET_MQ_send (op->mq, ev);
527     }
528     GNUNET_free (bf_data);
529   }
530   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
531   op->state->local_bf = NULL;
532 }
533
534
535 /**
536  * Signal to the client that the operation has finished and
537  * destroy the operation.
538  *
539  * @param cls operation to destroy
540  */
541 static void
542 send_client_done_and_destroy (void *cls)
543 {
544   struct Operation *op = cls;
545   struct GNUNET_MQ_Envelope *ev;
546   struct GNUNET_SET_ResultMessage *rm;
547
548   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
549               "Intersection succeeded, sending DONE to local client\n");
550   GNUNET_STATISTICS_update (_GSS_statistics,
551                             "# Intersection operations succeeded",
552                             1,
553                             GNUNET_NO);
554   ev = GNUNET_MQ_msg (rm,
555                       GNUNET_MESSAGE_TYPE_SET_RESULT);
556   rm->request_id = htonl (op->client_request_id);
557   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
558   rm->element_type = htons (0);
559   GNUNET_MQ_send (op->set->cs->mq,
560                   ev);
561   _GSS_operation_destroy (op,
562                           GNUNET_YES);
563 }
564
565
566 /**
567  * Remember that we are done dealing with the local client
568  * AND have sent the other peer our message that we are done,
569  * so we are not just waiting for the channel to die before
570  * telling the local client that we are done as our last act.
571  *
572  * @param cls the `struct Operation`.
573  */
574 static void
575 finished_local_operations (void *cls)
576 {
577   struct Operation *op = cls;
578
579   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
580               "DONE sent to other peer, now waiting for other end to close the channel\n");
581   op->state->phase = PHASE_FINISHED;
582   op->state->channel_death_expected = GNUNET_YES;
583 }
584
585
586 /**
587  * Notify the other peer that we are done.  Once this message
588  * is out, we still need to notify the local client that we
589  * are done.
590  *
591  * @param op operation to notify for.
592  */
593 static void
594 send_p2p_done (struct Operation *op)
595 {
596   struct GNUNET_MQ_Envelope *ev;
597   struct IntersectionDoneMessage *idm;
598
599   GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
600   GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
601   ev = GNUNET_MQ_msg (idm,
602                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
603   idm->final_element_count = htonl (op->state->my_element_count);
604   idm->element_xor_hash = op->state->my_xor;
605   GNUNET_MQ_notify_sent (ev,
606                          &finished_local_operations,
607                          op);
608   GNUNET_MQ_send (op->mq,
609                   ev);
610 }
611
612
613 /**
614  * Send all elements in the full result iterator.
615  *
616  * @param cls the `struct Operation *`
617  */
618 static void
619 send_remaining_elements (void *cls)
620 {
621   struct Operation *op = cls;
622   const void *nxt;
623   const struct ElementEntry *ee;
624   struct GNUNET_MQ_Envelope *ev;
625   struct GNUNET_SET_ResultMessage *rm;
626   const struct GNUNET_SET_Element *element;
627   int res;
628
629   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
630                                                      NULL,
631                                                      &nxt);
632   if (GNUNET_NO == res)
633   {
634     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635                 "Sending done and destroy because iterator ran out\n");
636     GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
637     op->state->full_result_iter = NULL;
638     if (PHASE_DONE_RECEIVED == op->state->phase)
639     {
640       op->state->phase = PHASE_FINISHED;
641       send_client_done_and_destroy (op);
642     }
643     else if (PHASE_MUST_SEND_DONE == op->state->phase)
644     {
645       send_p2p_done (op);
646     }
647     else
648     {
649       GNUNET_assert (0);
650     }
651     return;
652   }
653   ee = nxt;
654   element = &ee->element;
655   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
656               "Sending element %s:%u to client (full set)\n",
657               GNUNET_h2s (&ee->element_hash),
658               element->size);
659   GNUNET_assert (0 != op->client_request_id);
660   ev = GNUNET_MQ_msg_extra (rm,
661                             element->size,
662                             GNUNET_MESSAGE_TYPE_SET_RESULT);
663   GNUNET_assert (NULL != ev);
664   rm->result_status = htons (GNUNET_SET_STATUS_OK);
665   rm->request_id = htonl (op->client_request_id);
666   rm->element_type = element->element_type;
667   GNUNET_memcpy (&rm[1],
668                  element->data,
669                  element->size);
670   GNUNET_MQ_notify_sent (ev,
671                          &send_remaining_elements,
672                          op);
673   GNUNET_MQ_send (op->set->cs->mq,
674                   ev);
675 }
676
677
678 /**
679  * Fills the "my_elements" hashmap with the initial set of
680  * (non-deleted) elements from the set of the specification.
681  *
682  * @param cls closure with the `struct Operation *`
683  * @param key current key code for the element
684  * @param value value in the hash map with the `struct ElementEntry *`
685  * @return #GNUNET_YES (we should continue to iterate)
686  */
687 static int
688 initialize_map_unfiltered (void *cls,
689                            const struct GNUNET_HashCode *key,
690                            void *value)
691 {
692   struct ElementEntry *ee = value;
693   struct Operation *op = cls;
694
695   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
696     return GNUNET_YES; /* element not live in operation's generation */
697   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
698                           &ee->element_hash,
699                           &op->state->my_xor);
700   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
701               "Initial full initialization of my_elements, adding %s:%u\n",
702               GNUNET_h2s (&ee->element_hash),
703               ee->element.size);
704   GNUNET_break (GNUNET_YES ==
705                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
706                                                    &ee->element_hash,
707                                                    ee,
708                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
709   return GNUNET_YES;
710 }
711
712
713 /**
714  * Send our element count to the peer, in case our element count is
715  * lower than theirs.
716  *
717  * @param op intersection operation
718  */
719 static void
720 send_element_count (struct Operation *op)
721 {
722   struct GNUNET_MQ_Envelope *ev;
723   struct IntersectionElementInfoMessage *msg;
724
725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726               "Sending our element count (%u)\n",
727               op->state->my_element_count);
728   ev = GNUNET_MQ_msg (msg,
729                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
730   msg->sender_element_count = htonl (op->state->my_element_count);
731   GNUNET_MQ_send (op->mq, ev);
732 }
733
734
735 /**
736  * We go first, initialize our map with all elements and
737  * send the first Bloom filter.
738  *
739  * @param op operation to start exchange for
740  */
741 static void
742 begin_bf_exchange (struct Operation *op)
743 {
744   op->state->phase = PHASE_BF_EXCHANGE;
745   GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
746                                          &initialize_map_unfiltered,
747                                          op);
748   send_bloomfilter (op);
749 }
750
751
752 /**
753  * Handle the initial `struct IntersectionElementInfoMessage` from a
754  * remote peer.
755  *
756  * @param cls the intersection operation
757  * @param mh the header of the message
758  */
759 void
760 handle_intersection_p2p_element_info (void *cls,
761                                       const struct IntersectionElementInfoMessage *msg)
762 {
763   struct Operation *op = cls;
764
765   if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
766   {
767     GNUNET_break_op (0);
768     fail_intersection_operation(op);
769     return;
770   }
771   op->remote_element_count = ntohl (msg->sender_element_count);
772   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
773               "Received remote element count (%u), I have %u\n",
774               op->remote_element_count,
775               op->state->my_element_count);
776   if ( ( (PHASE_INITIAL != op->state->phase) &&
777          (PHASE_COUNT_SENT != op->state->phase) ) ||
778        (op->state->my_element_count > op->remote_element_count) ||
779        (0 == op->state->my_element_count) ||
780        (0 == op->remote_element_count) )
781   {
782     GNUNET_break_op (0);
783     fail_intersection_operation(op);
784     return;
785   }
786   GNUNET_break (NULL == op->state->remote_bf);
787   begin_bf_exchange (op);
788   GNUNET_CADET_receive_done (op->channel);
789 }
790
791
792 /**
793  * Process a Bloomfilter once we got all the chunks.
794  *
795  * @param op the intersection operation
796  */
797 static void
798 process_bf (struct Operation *op)
799 {
800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801               "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
802               op->state->phase,
803               op->remote_element_count,
804               op->state->my_element_count,
805               GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
806   switch (op->state->phase)
807   {
808   case PHASE_INITIAL:
809     GNUNET_break_op (0);
810     fail_intersection_operation(op);
811     return;
812   case PHASE_COUNT_SENT:
813     /* This is the first BF being sent, build our initial map with
814        filtering in place */
815     op->state->my_element_count = 0;
816     GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
817                                            &filtered_map_initialization,
818                                            op);
819     break;
820   case PHASE_BF_EXCHANGE:
821     /* Update our set by reduction */
822     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
823                                            &iterator_bf_reduce,
824                                            op);
825     break;
826   case PHASE_MUST_SEND_DONE:
827     GNUNET_break_op (0);
828     fail_intersection_operation(op);
829     return;
830   case PHASE_DONE_RECEIVED:
831     GNUNET_break_op (0);
832     fail_intersection_operation(op);
833     return;
834   case PHASE_FINISHED:
835     GNUNET_break_op (0);
836     fail_intersection_operation(op);
837     return;
838   }
839   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
840   op->state->remote_bf = NULL;
841
842   if ( (0 == op->state->my_element_count) || /* fully disjoint */
843        ( (op->state->my_element_count == op->remote_element_count) &&
844          (0 == memcmp (&op->state->my_xor,
845                        &op->state->other_xor,
846                        sizeof (struct GNUNET_HashCode))) ) )
847   {
848     /* we are done */
849     op->state->phase = PHASE_MUST_SEND_DONE;
850     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
851                 "Intersection succeeded, sending DONE to other peer\n");
852     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
853     op->state->local_bf = NULL;
854     if (GNUNET_SET_RESULT_FULL == op->result_mode)
855     {
856       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857                   "Sending full result set (%u elements)\n",
858                   GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
859       op->state->full_result_iter
860         = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
861       send_remaining_elements (op);
862       return;
863     }
864     send_p2p_done (op);
865     return;
866   }
867   op->state->phase = PHASE_BF_EXCHANGE;
868   send_bloomfilter (op);
869 }
870
871
872 /**
873  * Check an BF message from a remote peer.
874  *
875  * @param cls the intersection operation
876  * @param msg the header of the message
877  * @return #GNUNET_OK if @a msg is well-formed
878  */
879 int
880 check_intersection_p2p_bf (void *cls,
881                            const struct BFMessage *msg)
882 {
883   struct Operation *op = cls;
884
885   if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
886   {
887     GNUNET_break_op (0);
888     return GNUNET_SYSERR;
889   }
890   return GNUNET_OK;
891 }
892
893
894 /**
895  * Handle an BF message from a remote peer.
896  *
897  * @param cls the intersection operation
898  * @param msg the header of the message
899  */
900 void
901 handle_intersection_p2p_bf (void *cls,
902                             const struct BFMessage *msg)
903 {
904   struct Operation *op = cls;
905   uint32_t bf_size;
906   uint32_t chunk_size;
907   uint32_t bf_bits_per_element;
908
909   switch (op->state->phase)
910   {
911   case PHASE_INITIAL:
912     GNUNET_break_op (0);
913     fail_intersection_operation (op);
914     return;
915   case PHASE_COUNT_SENT:
916   case PHASE_BF_EXCHANGE:
917     bf_size = ntohl (msg->bloomfilter_total_length);
918     bf_bits_per_element = ntohl (msg->bits_per_element);
919     chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
920     op->state->other_xor = msg->element_xor_hash;
921     if (bf_size == chunk_size)
922     {
923       if (NULL != op->state->bf_data)
924       {
925         GNUNET_break_op (0);
926         fail_intersection_operation (op);
927         return;
928       }
929       /* single part, done here immediately */
930       op->state->remote_bf
931         = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
932                                              bf_size,
933                                              bf_bits_per_element);
934       op->state->salt = ntohl (msg->sender_mutator);
935       op->remote_element_count = ntohl (msg->sender_element_count);
936       process_bf (op);
937       break;
938     }
939     /* multipart chunk */
940     if (NULL == op->state->bf_data)
941     {
942       /* first chunk, initialize */
943       op->state->bf_data = GNUNET_malloc (bf_size);
944       op->state->bf_data_size = bf_size;
945       op->state->bf_bits_per_element = bf_bits_per_element;
946       op->state->bf_data_offset = 0;
947       op->state->salt = ntohl (msg->sender_mutator);
948       op->remote_element_count = ntohl (msg->sender_element_count);
949     }
950     else
951     {
952       /* increment */
953       if ( (op->state->bf_data_size != bf_size) ||
954            (op->state->bf_bits_per_element != bf_bits_per_element) ||
955            (op->state->bf_data_offset + chunk_size > bf_size) ||
956            (op->state->salt != ntohl (msg->sender_mutator)) ||
957            (op->remote_element_count != ntohl (msg->sender_element_count)) )
958       {
959         GNUNET_break_op (0);
960         fail_intersection_operation (op);
961         return;
962       }
963     }
964     GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset],
965             (const char*) &msg[1],
966             chunk_size);
967     op->state->bf_data_offset += chunk_size;
968     if (op->state->bf_data_offset == bf_size)
969     {
970       /* last chunk, run! */
971       op->state->remote_bf
972         = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
973                                              bf_size,
974                                              bf_bits_per_element);
975       GNUNET_free (op->state->bf_data);
976       op->state->bf_data = NULL;
977       op->state->bf_data_size = 0;
978       process_bf (op);
979     }
980     break;
981   default:
982     GNUNET_break_op (0);
983     fail_intersection_operation (op);
984     return;
985   }
986   GNUNET_CADET_receive_done (op->channel);
987 }
988
989
990 /**
991  * Remove all elements from our hashmap.
992  *
993  * @param cls closure with the `struct Operation *`
994  * @param key current key code
995  * @param value value in the hash map
996  * @return #GNUNET_YES (we should continue to iterate)
997  */
998 static int
999 filter_all (void *cls,
1000             const struct GNUNET_HashCode *key,
1001             void *value)
1002 {
1003   struct Operation *op = cls;
1004   struct ElementEntry *ee = value;
1005
1006   GNUNET_break (0 < op->state->my_element_count);
1007   op->state->my_element_count--;
1008   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
1009                           &ee->element_hash,
1010                           &op->state->my_xor);
1011   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1012               "Final reduction of my_elements, removing %s:%u\n",
1013               GNUNET_h2s (&ee->element_hash),
1014               ee->element.size);
1015   GNUNET_assert (GNUNET_YES ==
1016                  GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
1017                                                        &ee->element_hash,
1018                                                        ee));
1019   send_client_removed_element (op,
1020                                &ee->element);
1021   return GNUNET_YES;
1022 }
1023
1024
1025 /**
1026  * Handle a done message from a remote peer
1027  *
1028  * @param cls the intersection operation
1029  * @param mh the message
1030  */
1031 void
1032 handle_intersection_p2p_done (void *cls,
1033                               const struct IntersectionDoneMessage *idm)
1034 {
1035   struct Operation *op = cls;
1036
1037   if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
1038   {
1039     GNUNET_break_op (0);
1040     fail_intersection_operation (op);
1041     return;
1042   }
1043   if (PHASE_BF_EXCHANGE != op->state->phase)
1044   {
1045     /* wrong phase to conclude? FIXME: Or should we allow this
1046        if the other peer has _initially_ already an empty set? */
1047     GNUNET_break_op (0);
1048     fail_intersection_operation (op);
1049     return;
1050   }
1051   if (0 == ntohl (idm->final_element_count))
1052   {
1053     /* other peer determined empty set is the intersection,
1054        remove all elements */
1055     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
1056                                            &filter_all,
1057                                            op);
1058   }
1059   if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
1060        (0 != memcmp (&op->state->my_xor,
1061                      &idm->element_xor_hash,
1062                      sizeof (struct GNUNET_HashCode))) )
1063   {
1064     /* Other peer thinks we are done, but we disagree on the result! */
1065     GNUNET_break_op (0);
1066     fail_intersection_operation (op);
1067     return;
1068   }
1069   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070               "Got IntersectionDoneMessage, have %u elements in intersection\n",
1071               op->state->my_element_count);
1072   op->state->phase = PHASE_DONE_RECEIVED;
1073   GNUNET_CADET_receive_done (op->channel);
1074
1075   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1076   if (GNUNET_SET_RESULT_FULL == op->result_mode)
1077   {
1078     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1079                 "Sending full result set to client (%u elements)\n",
1080                 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
1081     op->state->full_result_iter
1082       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
1083     send_remaining_elements (op);
1084     return;
1085   }
1086   op->state->phase = PHASE_FINISHED;
1087   send_client_done_and_destroy (op);
1088 }
1089
1090
1091 /**
1092  * Initiate a set intersection operation with a remote peer.
1093  *
1094  * @param op operation that is created, should be initialized to
1095  *        begin the evaluation
1096  * @param opaque_context message to be transmitted to the listener
1097  *        to convince it to accept, may be NULL
1098  * @return operation-specific state to keep in @a op
1099  */
1100 static struct OperationState *
1101 intersection_evaluate (struct Operation *op,
1102                        const struct GNUNET_MessageHeader *opaque_context)
1103 {
1104   struct OperationState *state;
1105   struct GNUNET_MQ_Envelope *ev;
1106   struct OperationRequestMessage *msg;
1107
1108   ev = GNUNET_MQ_msg_nested_mh (msg,
1109                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1110                                 opaque_context);
1111   if (NULL == ev)
1112   {
1113     /* the context message is too large!? */
1114     GNUNET_break (0);
1115     return NULL;
1116   }
1117   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1118               "Initiating intersection operation evaluation\n");
1119   state = GNUNET_new (struct OperationState);
1120   /* we started the operation, thus we have to send the operation request */
1121   state->phase = PHASE_INITIAL;
1122   state->my_element_count = op->set->state->current_set_element_count;
1123   state->my_elements
1124     = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
1125                                             GNUNET_YES);
1126
1127   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1128   msg->element_count = htonl (state->my_element_count);
1129   GNUNET_MQ_send (op->mq,
1130                   ev);
1131   state->phase = PHASE_COUNT_SENT;
1132   if (NULL != opaque_context)
1133     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1134                 "Sent op request with context message\n");
1135   else
1136     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137                 "Sent op request without context message\n");
1138   return state;
1139 }
1140
1141
1142 /**
1143  * Accept an intersection operation request from a remote peer.  Only
1144  * initializes the private operation state.
1145  *
1146  * @param op operation that will be accepted as an intersection operation
1147  */
1148 static struct OperationState *
1149 intersection_accept (struct Operation *op)
1150 {
1151   struct OperationState *state;
1152
1153   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154               "Accepting set intersection operation\n");
1155   state = GNUNET_new (struct OperationState);
1156   state->phase = PHASE_INITIAL;
1157   state->my_element_count
1158     = op->set->state->current_set_element_count;
1159   state->my_elements
1160     = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
1161                                                         op->remote_element_count),
1162                                             GNUNET_YES);
1163   op->state = state;
1164   if (op->remote_element_count < state->my_element_count)
1165   {
1166     /* If the other peer (Alice) has fewer elements than us (Bob),
1167        we just send the count as Alice should send the first BF */
1168     send_element_count (op);
1169     state->phase = PHASE_COUNT_SENT;
1170     return state;
1171   }
1172   /* We have fewer elements, so we start with the BF */
1173   begin_bf_exchange (op);
1174   return state;
1175 }
1176
1177
1178 /**
1179  * Destroy the intersection operation.  Only things specific to the
1180  * intersection operation are destroyed.
1181  *
1182  * @param op intersection operation to destroy
1183  */
1184 static void
1185 intersection_op_cancel (struct Operation *op)
1186 {
1187   /* check if the op was canceled twice */
1188   GNUNET_assert (NULL != op->state);
1189   if (NULL != op->state->remote_bf)
1190   {
1191     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1192     op->state->remote_bf = NULL;
1193   }
1194   if (NULL != op->state->local_bf)
1195   {
1196     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1197     op->state->local_bf = NULL;
1198   }
1199   if (NULL != op->state->my_elements)
1200   {
1201     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1202     op->state->my_elements = NULL;
1203   }
1204   if (NULL != op->state->full_result_iter)
1205   {
1206     GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
1207     op->state->full_result_iter = NULL;
1208   }
1209   GNUNET_free (op->state);
1210   op->state = NULL;
1211   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212               "Destroying intersection op state done\n");
1213 }
1214
1215
1216 /**
1217  * Create a new set supporting the intersection operation.
1218  *
1219  * @return the newly created set
1220  */
1221 static struct SetState *
1222 intersection_set_create ()
1223 {
1224   struct SetState *set_state;
1225
1226   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227               "Intersection set created\n");
1228   set_state = GNUNET_new (struct SetState);
1229   set_state->current_set_element_count = 0;
1230
1231   return set_state;
1232 }
1233
1234
1235 /**
1236  * Add the element from the given element message to the set.
1237  *
1238  * @param set_state state of the set want to add to
1239  * @param ee the element to add to the set
1240  */
1241 static void
1242 intersection_add (struct SetState *set_state,
1243                   struct ElementEntry *ee)
1244 {
1245   set_state->current_set_element_count++;
1246 }
1247
1248
1249 /**
1250  * Destroy a set that supports the intersection operation
1251  *
1252  * @param set_state the set to destroy
1253  */
1254 static void
1255 intersection_set_destroy (struct SetState *set_state)
1256 {
1257   GNUNET_free (set_state);
1258 }
1259
1260
1261 /**
1262  * Remove the element given in the element message from the set.
1263  *
1264  * @param set_state state of the set to remove from
1265  * @param element set element to remove
1266  */
1267 static void
1268 intersection_remove (struct SetState *set_state,
1269                      struct ElementEntry *element)
1270 {
1271   GNUNET_assert (0 < set_state->current_set_element_count);
1272   set_state->current_set_element_count--;
1273 }
1274
1275
1276 /**
1277  * Callback for channel death for the intersection operation.
1278  *
1279  * @param op operation that lost the channel
1280  */
1281 static void
1282 intersection_channel_death (struct Operation *op)
1283 {
1284   if (GNUNET_YES == op->state->channel_death_expected)
1285   {
1286     /* oh goodie, we are done! */
1287     send_client_done_and_destroy (op);
1288   }
1289   else
1290   {
1291     /* sorry, channel went down early, too bad. */
1292     _GSS_operation_destroy (op,
1293                             GNUNET_YES);
1294   }
1295 }
1296
1297
1298 /**
1299  * Get the table with implementing functions for set intersection.
1300  *
1301  * @return the operation specific VTable
1302  */
1303 const struct SetVT *
1304 _GSS_intersection_vt ()
1305 {
1306   static const struct SetVT intersection_vt = {
1307     .create = &intersection_set_create,
1308     .add = &intersection_add,
1309     .remove = &intersection_remove,
1310     .destroy_set = &intersection_set_destroy,
1311     .evaluate = &intersection_evaluate,
1312     .accept = &intersection_accept,
1313     .cancel = &intersection_op_cancel,
1314     .channel_death = &intersection_channel_death,
1315   };
1316
1317   return &intersection_vt;
1318 }