de-duplicate operation types
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet-service-set_protocol.h"
31 #include "gnunet-service-set_intersection.h"
32 #include <gcrypt.h>
33
34
35 /**
36  * Current phase we are in for a intersection operation.
37  */
38 enum IntersectionOperationPhase
39 {
40   /**
41    * We are just starting.
42    */
43   PHASE_INITIAL,
44
45   /**
46    * We have send the number of our elements to the other
47    * peer, but did not setup our element set yet.
48    */
49   PHASE_COUNT_SENT,
50
51   /**
52    * We have initialized our set and are now reducing it by exchanging
53    * Bloom filters until one party notices the their element hashes
54    * are equal.
55    */
56   PHASE_BF_EXCHANGE,
57
58   /**
59    * The protocol is over.  Results may still have to be sent to the
60    * client.
61    */
62   PHASE_FINISHED
63
64 };
65
66
67 /**
68  * State of an evaluate operation with another peer.
69  */
70 struct OperationState
71 {
72   /**
73    * The bf we currently receive
74    */
75   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
76
77   /**
78    * BF of the set's element.
79    */
80   struct GNUNET_CONTAINER_BloomFilter *local_bf;
81
82   /**
83    * Remaining elements in the intersection operation.
84    * Maps element-id-hashes to 'elements in our set'.
85    */
86   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
87
88   /**
89    * Iterator for sending the final set of @e my_elements to the client.
90    */
91   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
92
93   /**
94    * Evaluate operations are held in a linked list.
95    */
96   struct OperationState *next;
97
98   /**
99    * Evaluate operations are held in a linked list.
100    */
101   struct OperationState *prev;
102
103   /**
104    * For multipart BF transmissions, we have to store the
105    * bloomfilter-data until we fully received it.
106    */
107   char *bf_data;
108
109   /**
110    * XOR of the keys of all of the elements (remaining) in my set.
111    * Always updated when elements are added or removed to
112    * @e my_elements.
113    */
114   struct GNUNET_HashCode my_xor;
115
116   /**
117    * XOR of the keys of all of the elements (remaining) in
118    * the other peer's set.  Updated when we receive the
119    * other peer's Bloom filter.
120    */
121   struct GNUNET_HashCode other_xor;
122
123   /**
124    * How many bytes of @e bf_data are valid?
125    */
126   uint32_t bf_data_offset;
127
128   /**
129    * Current element count contained within @e my_elements.
130    * (May differ briefly during initialization.)
131    */
132   uint32_t my_element_count;
133
134   /**
135    * size of the bloomfilter in @e bf_data.
136    */
137   uint32_t bf_data_size;
138
139   /**
140    * size of the bloomfilter
141    */
142   uint32_t bf_bits_per_element;
143
144   /**
145    * Salt currently used for BF construction (by us or the other peer,
146    * depending on where we are in the code).
147    */
148   uint32_t salt;
149
150   /**
151    * Current state of the operation.
152    */
153   enum IntersectionOperationPhase phase;
154
155   /**
156    * Generation in which the operation handle
157    * was created.
158    */
159   unsigned int generation_created;
160
161   /**
162    * Did we send the client that we are done?
163    */
164   int client_done_sent;
165 };
166
167
168 /**
169  * Extra state required for efficient set intersection.
170  * Merely tracks the total number of elements.
171  */
172 struct SetState
173 {
174   /**
175    * Number of currently valid elements in the set which have not been
176    * removed.
177    */
178   uint32_t current_set_element_count;
179 };
180
181
182 /**
183  * If applicable in the current operation mode, send a result message
184  * to the client indicating we removed an element.
185  *
186  * @param op intersection operation
187  * @param element element to send
188  */
189 static void
190 send_client_removed_element (struct Operation *op,
191                              struct GNUNET_SET_Element *element)
192 {
193   struct GNUNET_MQ_Envelope *ev;
194   struct GNUNET_SET_ResultMessage *rm;
195
196   if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode)
197     return; /* Wrong mode for transmitting removed elements */
198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
199               "Sending removed element (size %u) to client\n",
200               element->size);
201   GNUNET_assert (0 != op->spec->client_request_id);
202   ev = GNUNET_MQ_msg_extra (rm,
203                             element->size,
204                             GNUNET_MESSAGE_TYPE_SET_RESULT);
205   if (NULL == ev)
206   {
207     GNUNET_break (0);
208     return;
209   }
210   rm->result_status = htons (GNUNET_SET_STATUS_OK);
211   rm->request_id = htonl (op->spec->client_request_id);
212   rm->element_type = element->element_type;
213   GNUNET_memcpy (&rm[1],
214           element->data,
215           element->size);
216   GNUNET_MQ_send (op->spec->set->client_mq,
217                   ev);
218 }
219
220
221 /**
222  * Fills the "my_elements" hashmap with all relevant elements.
223  *
224  * @param cls the `struct Operation *` we are performing
225  * @param key current key code
226  * @param value the `struct ElementEntry *` from the hash map
227  * @return #GNUNET_YES (we should continue to iterate)
228  */
229 static int
230 filtered_map_initialization (void *cls,
231                              const struct GNUNET_HashCode *key,
232                              void *value)
233 {
234   struct Operation *op = cls;
235   struct ElementEntry *ee = value;
236   struct GNUNET_HashCode mutated_hash;
237
238
239   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
240               "FIMA called for %s:%u\n",
241               GNUNET_h2s (&ee->element_hash),
242               ee->element.size);
243
244   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
245   {
246     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
247                 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
248                 GNUNET_h2s (&ee->element_hash),
249                 ee->element.size);
250     return GNUNET_YES; /* element not valid in our operation's generation */
251   }
252
253   /* Test if element is in other peer's bloomfilter */
254   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
255                             op->state->salt,
256                             &mutated_hash);
257   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
258               "Testing mingled hash %s with salt %u\n",
259               GNUNET_h2s (&mutated_hash),
260               op->state->salt);
261   if (GNUNET_NO ==
262       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
263                                          &mutated_hash))
264   {
265     /* remove this element */
266     send_client_removed_element (op,
267                                  &ee->element);
268     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
269                 "Reduced initialization, not starting with %s:%u\n",
270                 GNUNET_h2s (&ee->element_hash),
271                 ee->element.size);
272     return GNUNET_YES;
273   }
274   op->state->my_element_count++;
275   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
276                           &ee->element_hash,
277                           &op->state->my_xor);
278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
279               "Filtered initialization of my_elements, adding %s:%u\n",
280               GNUNET_h2s (&ee->element_hash),
281               ee->element.size);
282   GNUNET_break (GNUNET_YES ==
283                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
284                                                    &ee->element_hash,
285                                                    ee,
286                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
287
288   return GNUNET_YES;
289 }
290
291
292 /**
293  * Removes elements from our hashmap if they are not contained within the
294  * provided remote bloomfilter.
295  *
296  * @param cls closure with the `struct Operation *`
297  * @param key current key code
298  * @param value value in the hash map
299  * @return #GNUNET_YES (we should continue to iterate)
300  */
301 static int
302 iterator_bf_reduce (void *cls,
303                    const struct GNUNET_HashCode *key,
304                    void *value)
305 {
306   struct Operation *op = cls;
307   struct ElementEntry *ee = value;
308   struct GNUNET_HashCode mutated_hash;
309
310   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
311                             op->state->salt,
312                             &mutated_hash);
313   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
314               "Testing mingled hash %s with salt %u\n",
315               GNUNET_h2s (&mutated_hash),
316               op->state->salt);
317   if (GNUNET_NO ==
318       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
319                                          &mutated_hash))
320   {
321     GNUNET_break (0 < op->state->my_element_count);
322     op->state->my_element_count--;
323     GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
324                             &ee->element_hash,
325                             &op->state->my_xor);
326     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327                 "Bloom filter reduction of my_elements, removing %s:%u\n",
328                 GNUNET_h2s (&ee->element_hash),
329                 ee->element.size);
330     GNUNET_assert (GNUNET_YES ==
331                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
332                                                          &ee->element_hash,
333                                                          ee));
334     send_client_removed_element (op,
335                                  &ee->element);
336   }
337   else
338   {
339     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340                 "Bloom filter reduction of my_elements, keeping %s:%u\n",
341                 GNUNET_h2s (&ee->element_hash),
342                 ee->element.size);
343   }
344   return GNUNET_YES;
345 }
346
347
348 /**
349  * Create initial bloomfilter based on all the elements given.
350  *
351  * @param cls the `struct Operation *`
352  * @param key current key code
353  * @param value the `struct ElementEntry` to process
354  * @return #GNUNET_YES (we should continue to iterate)
355  */
356 static int
357 iterator_bf_create (void *cls,
358                     const struct GNUNET_HashCode *key,
359                     void *value)
360 {
361   struct Operation *op = cls;
362   struct ElementEntry *ee = value;
363   struct GNUNET_HashCode mutated_hash;
364
365   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
366                             op->state->salt,
367                             &mutated_hash);
368   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
369               "Initializing BF with hash %s with salt %u\n",
370               GNUNET_h2s (&mutated_hash),
371               op->state->salt);
372   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
373                                     &mutated_hash);
374   return GNUNET_YES;
375 }
376
377
378 /**
379  * Inform the client that the intersection operation has failed,
380  * and proceed to destroy the evaluate operation.
381  *
382  * @param op the intersection operation to fail
383  */
384 static void
385 fail_intersection_operation (struct Operation *op)
386 {
387   struct GNUNET_MQ_Envelope *ev;
388   struct GNUNET_SET_ResultMessage *msg;
389
390   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
391               "Intersection operation failed\n");
392   if (NULL != op->state->my_elements)
393   {
394     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
395     op->state->my_elements = NULL;
396   }
397   ev = GNUNET_MQ_msg (msg,
398                       GNUNET_MESSAGE_TYPE_SET_RESULT);
399   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
400   msg->request_id = htonl (op->spec->client_request_id);
401   msg->element_type = htons (0);
402   GNUNET_MQ_send (op->spec->set->client_mq,
403                   ev);
404   _GSS_operation_destroy (op,
405                           GNUNET_YES);
406 }
407
408
409 /**
410  * Send a bloomfilter to our peer.  After the result done message has
411  * been sent to the client, destroy the evaluate operation.
412  *
413  * @param op intersection operation
414  */
415 static void
416 send_bloomfilter (struct Operation *op)
417 {
418   struct GNUNET_MQ_Envelope *ev;
419   struct BFMessage *msg;
420   uint32_t bf_size;
421   uint32_t bf_elementbits;
422   uint32_t chunk_size;
423   char *bf_data;
424   uint32_t offset;
425
426   /* We consider the ratio of the set sizes to determine
427      the number of bits per element, as the smaller set
428      should use more bits to maximize its set reduction
429      potential and minimize overall bandwidth consumption. */
430   bf_elementbits = 2 + ceil (log2((double)
431                              (op->spec->remote_element_count /
432                                    (double) op->state->my_element_count)));
433   if (bf_elementbits < 1)
434     bf_elementbits = 1; /* make sure k is not 0 */
435   /* optimize BF-size to ~50% of bits set */
436   bf_size = ceil ((double) (op->state->my_element_count
437                             * bf_elementbits / log(2)));
438   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
439               "Sending Bloom filter (%u) of size %u bytes\n",
440               (unsigned int) bf_elementbits,
441               (unsigned int) bf_size);
442   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
443                                                            bf_size,
444                                                            bf_elementbits);
445   op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
446                                               UINT32_MAX);
447   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
448                                          &iterator_bf_create,
449                                          op);
450
451   /* send our Bloom filter */
452   chunk_size = 60 * 1024 - sizeof (struct BFMessage);
453   if (bf_size <= chunk_size)
454   {
455     /* singlepart */
456     chunk_size = bf_size;
457     ev = GNUNET_MQ_msg_extra (msg,
458                               chunk_size,
459                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
460     GNUNET_assert (GNUNET_SYSERR !=
461                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
462                                                               (char*) &msg[1],
463                                                               bf_size));
464     msg->sender_element_count = htonl (op->state->my_element_count);
465     msg->bloomfilter_total_length = htonl (bf_size);
466     msg->bits_per_element = htonl (bf_elementbits);
467     msg->sender_mutator = htonl (op->state->salt);
468     msg->element_xor_hash = op->state->my_xor;
469     GNUNET_MQ_send (op->mq, ev);
470   }
471   else
472   {
473     /* multipart */
474     bf_data = GNUNET_malloc (bf_size);
475     GNUNET_assert (GNUNET_SYSERR !=
476                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
477                                                               bf_data,
478                                                               bf_size));
479     offset = 0;
480     while (offset < bf_size)
481     {
482       if (bf_size - chunk_size < offset)
483         chunk_size = bf_size - offset;
484       ev = GNUNET_MQ_msg_extra (msg,
485                                 chunk_size,
486                                 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
487       GNUNET_memcpy (&msg[1],
488               &bf_data[offset],
489               chunk_size);
490       offset += chunk_size;
491       msg->sender_element_count = htonl (op->state->my_element_count);
492       msg->bloomfilter_total_length = htonl (bf_size);
493       msg->bits_per_element = htonl (bf_elementbits);
494       msg->sender_mutator = htonl (op->state->salt);
495       msg->element_xor_hash = op->state->my_xor;
496       GNUNET_MQ_send (op->mq, ev);
497     }
498     GNUNET_free (bf_data);
499   }
500   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
501   op->state->local_bf = NULL;
502 }
503
504
505 /**
506  * Signal to the client that the operation has finished and
507  * destroy the operation.
508  *
509  * @param cls operation to destroy
510  */
511 static void
512 send_client_done_and_destroy (void *cls)
513 {
514   struct Operation *op = cls;
515   struct GNUNET_MQ_Envelope *ev;
516   struct GNUNET_SET_ResultMessage *rm;
517
518   ev = GNUNET_MQ_msg (rm,
519                       GNUNET_MESSAGE_TYPE_SET_RESULT);
520   rm->request_id = htonl (op->spec->client_request_id);
521   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
522   rm->element_type = htons (0);
523   GNUNET_MQ_send (op->spec->set->client_mq,
524                   ev);
525   _GSS_operation_destroy (op,
526                           GNUNET_YES);
527 }
528
529
530 /**
531  * Send all elements in the full result iterator.
532  *
533  * @param cls the `struct Operation *`
534  */
535 static void
536 send_remaining_elements (void *cls)
537 {
538   struct Operation *op = cls;
539   const void *nxt;
540   const struct ElementEntry *ee;
541   struct GNUNET_MQ_Envelope *ev;
542   struct GNUNET_SET_ResultMessage *rm;
543   const struct GNUNET_SET_Element *element;
544   int res;
545
546   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
547                                                      NULL,
548                                                      &nxt);
549   if (GNUNET_NO == res)
550   {
551     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
552                 "Sending done and destroy because iterator ran out\n");
553     op->keep--;
554     GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
555     op->state->full_result_iter = NULL;
556     send_client_done_and_destroy (op);
557     return;
558   }
559   ee = nxt;
560   element = &ee->element;
561   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
562               "Sending element %s:%u to client (full set)\n",
563               GNUNET_h2s (&ee->element_hash),
564               element->size);
565   GNUNET_assert (0 != op->spec->client_request_id);
566   ev = GNUNET_MQ_msg_extra (rm,
567                             element->size,
568                             GNUNET_MESSAGE_TYPE_SET_RESULT);
569   GNUNET_assert (NULL != ev);
570   rm->result_status = htons (GNUNET_SET_STATUS_OK);
571   rm->request_id = htonl (op->spec->client_request_id);
572   rm->element_type = element->element_type;
573   GNUNET_memcpy (&rm[1],
574           element->data,
575           element->size);
576   GNUNET_MQ_notify_sent (ev,
577                          &send_remaining_elements,
578                          op);
579   GNUNET_MQ_send (op->spec->set->client_mq,
580                   ev);
581 }
582
583
584 /**
585  * Inform the peer that this operation is complete.
586  *
587  * @param op the intersection operation to fail
588  */
589 static void
590 send_peer_done (struct Operation *op)
591 {
592   struct GNUNET_MQ_Envelope *ev;
593   struct IntersectionDoneMessage *idm;
594
595   op->state->phase = PHASE_FINISHED;
596   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
597               "Intersection succeeded, sending DONE\n");
598   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
599   op->state->local_bf = NULL;
600
601   ev = GNUNET_MQ_msg (idm,
602                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
603   idm->final_element_count = htonl (op->state->my_element_count);
604   idm->element_xor_hash = op->state->my_xor;
605   GNUNET_MQ_send (op->mq,
606                   ev);
607 }
608
609
610 /**
611  * Process a Bloomfilter once we got all the chunks.
612  *
613  * @param op the intersection operation
614  */
615 static void
616 process_bf (struct Operation *op)
617 {
618   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619               "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
620               op->state->phase,
621               op->spec->remote_element_count,
622               op->state->my_element_count,
623               GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
624   switch (op->state->phase)
625   {
626   case PHASE_INITIAL:
627     GNUNET_break_op (0);
628     fail_intersection_operation(op);
629     return;
630   case PHASE_COUNT_SENT:
631     /* This is the first BF being sent, build our initial map with
632        filtering in place */
633     op->state->my_element_count = 0;
634     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
635                                            &filtered_map_initialization,
636                                            op);
637     break;
638   case PHASE_BF_EXCHANGE:
639     /* Update our set by reduction */
640     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
641                                            &iterator_bf_reduce,
642                                            op);
643     break;
644   case PHASE_FINISHED:
645     GNUNET_break_op (0);
646     fail_intersection_operation(op);
647     return;
648   }
649   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
650   op->state->remote_bf = NULL;
651
652   if ( (0 == op->state->my_element_count) || /* fully disjoint */
653        ( (op->state->my_element_count == op->spec->remote_element_count) &&
654          (0 == memcmp (&op->state->my_xor,
655                        &op->state->other_xor,
656                        sizeof (struct GNUNET_HashCode))) ) )
657   {
658     /* we are done */
659     send_peer_done (op);
660     return;
661   }
662   op->state->phase = PHASE_BF_EXCHANGE;
663   send_bloomfilter (op);
664 }
665
666
667 /**
668  * Check an BF message from a remote peer.
669  *
670  * @param cls the intersection operation
671  * @param msg the header of the message
672  * @return #GNUNET_OK if @a msg is well-formed
673  */
674 int
675 check_intersection_p2p_bf (void *cls,
676                            const struct BFMessage *msg)
677 {
678   struct Operation *op = cls;
679
680   if (GNUNET_SET_OPERATION_INTERSECTION != op->operation)
681   {
682     GNUNET_break_op (0);
683     return GNUNET_SYSERR;
684   }
685   return GNUNET_OK;
686 }
687
688
689 /**
690  * Handle an BF message from a remote peer.
691  *
692  * @param cls the intersection operation
693  * @param msg the header of the message
694  */
695 void
696 handle_intersection_p2p_bf (void *cls,
697                             const struct BFMessage *msg)
698 {
699   struct Operation *op = cls;
700   uint32_t bf_size;
701   uint32_t chunk_size;
702   uint32_t bf_bits_per_element;
703
704   switch (op->state->phase)
705   {
706   case PHASE_INITIAL:
707     GNUNET_break_op (0);
708     fail_intersection_operation (op);
709     return;
710   case PHASE_COUNT_SENT:
711   case PHASE_BF_EXCHANGE:
712     bf_size = ntohl (msg->bloomfilter_total_length);
713     bf_bits_per_element = ntohl (msg->bits_per_element);
714     chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
715     op->state->other_xor = msg->element_xor_hash;
716     if (bf_size == chunk_size)
717     {
718       if (NULL != op->state->bf_data)
719       {
720         GNUNET_break_op (0);
721         fail_intersection_operation (op);
722         return;
723       }
724       /* single part, done here immediately */
725       op->state->remote_bf
726         = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
727                                              bf_size,
728                                              bf_bits_per_element);
729       op->state->salt = ntohl (msg->sender_mutator);
730       op->spec->remote_element_count = ntohl (msg->sender_element_count);
731       process_bf (op);
732       break;
733     }
734     /* multipart chunk */
735     if (NULL == op->state->bf_data)
736     {
737       /* first chunk, initialize */
738       op->state->bf_data = GNUNET_malloc (bf_size);
739       op->state->bf_data_size = bf_size;
740       op->state->bf_bits_per_element = bf_bits_per_element;
741       op->state->bf_data_offset = 0;
742       op->state->salt = ntohl (msg->sender_mutator);
743       op->spec->remote_element_count = ntohl (msg->sender_element_count);
744     }
745     else
746     {
747       /* increment */
748       if ( (op->state->bf_data_size != bf_size) ||
749            (op->state->bf_bits_per_element != bf_bits_per_element) ||
750            (op->state->bf_data_offset + chunk_size > bf_size) ||
751            (op->state->salt != ntohl (msg->sender_mutator)) ||
752            (op->spec->remote_element_count != ntohl (msg->sender_element_count)) )
753       {
754         GNUNET_break_op (0);
755         fail_intersection_operation (op);
756         return;
757       }
758     }
759     GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset],
760             (const char*) &msg[1],
761             chunk_size);
762     op->state->bf_data_offset += chunk_size;
763     if (op->state->bf_data_offset == bf_size)
764     {
765       /* last chunk, run! */
766       op->state->remote_bf
767         = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
768                                              bf_size,
769                                              bf_bits_per_element);
770       GNUNET_free (op->state->bf_data);
771       op->state->bf_data = NULL;
772       op->state->bf_data_size = 0;
773       process_bf (op);
774     }
775     break;
776   default:
777     GNUNET_break_op (0);
778     fail_intersection_operation (op);
779     return;
780   }
781   GNUNET_CADET_receive_done (op->channel);
782 }
783
784
785 /**
786  * Fills the "my_elements" hashmap with the initial set of
787  * (non-deleted) elements from the set of the specification.
788  *
789  * @param cls closure with the `struct Operation *`
790  * @param key current key code for the element
791  * @param value value in the hash map with the `struct ElementEntry *`
792  * @return #GNUNET_YES (we should continue to iterate)
793  */
794 static int
795 initialize_map_unfiltered (void *cls,
796                            const struct GNUNET_HashCode *key,
797                            void *value)
798 {
799   struct ElementEntry *ee = value;
800   struct Operation *op = cls;
801
802   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
803     return GNUNET_YES; /* element not live in operation's generation */
804   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
805                           &ee->element_hash,
806                           &op->state->my_xor);
807   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808               "Initial full initialization of my_elements, adding %s:%u\n",
809               GNUNET_h2s (&ee->element_hash),
810               ee->element.size);
811   GNUNET_break (GNUNET_YES ==
812                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
813                                                    &ee->element_hash,
814                                                    ee,
815                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
816   return GNUNET_YES;
817 }
818
819
820 /**
821  * Send our element count to the peer, in case our element count is
822  * lower than his.
823  *
824  * @param op intersection operation
825  */
826 static void
827 send_element_count (struct Operation *op)
828 {
829   struct GNUNET_MQ_Envelope *ev;
830   struct IntersectionElementInfoMessage *msg;
831
832   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
833               "Sending our element count (%u)\n",
834               op->state->my_element_count);
835   ev = GNUNET_MQ_msg (msg,
836                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
837   msg->sender_element_count = htonl (op->state->my_element_count);
838   GNUNET_MQ_send (op->mq, ev);
839 }
840
841
842 /**
843  * We go first, initialize our map with all elements and
844  * send the first Bloom filter.
845  *
846  * @param op operation to start exchange for
847  */
848 static void
849 begin_bf_exchange (struct Operation *op)
850 {
851   op->state->phase = PHASE_BF_EXCHANGE;
852   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
853                                          &initialize_map_unfiltered,
854                                          op);
855   send_bloomfilter (op);
856 }
857
858
859 /**
860  * Handle the initial `struct IntersectionElementInfoMessage` from a
861  * remote peer.
862  *
863  * @param cls the intersection operation
864  * @param mh the header of the message
865  */
866 void
867 handle_intersection_p2p_element_info (void *cls,
868                                       const struct IntersectionElementInfoMessage *msg)
869 {
870   struct Operation *op = cls;
871
872   if (GNUNET_SET_OPERATION_INTERSECTION != op->operation)
873   {
874     GNUNET_break_op (0);
875     fail_intersection_operation(op);
876     return;
877   }
878   op->spec->remote_element_count = ntohl (msg->sender_element_count);
879   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880               "Received remote element count (%u), I have %u\n",
881               op->spec->remote_element_count,
882               op->state->my_element_count);
883   if ( ( (PHASE_INITIAL != op->state->phase) &&
884          (PHASE_COUNT_SENT != op->state->phase) ) ||
885        (op->state->my_element_count > op->spec->remote_element_count) ||
886        (0 == op->state->my_element_count) ||
887        (0 == op->spec->remote_element_count) )
888   {
889     GNUNET_break_op (0);
890     fail_intersection_operation(op);
891     return;
892   }
893   GNUNET_break (NULL == op->state->remote_bf);
894   begin_bf_exchange (op);
895   GNUNET_CADET_receive_done (op->channel);
896 }
897
898
899 /**
900  * Send a result message to the client indicating that the operation
901  * is over.  After the result done message has been sent to the
902  * client, destroy the evaluate operation.
903  *
904  * @param op intersection operation
905  */
906 static void
907 finish_and_destroy (struct Operation *op)
908 {
909   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
910
911   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
912   {
913     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914                 "Sending full result set (%u elements)\n",
915                 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
916     op->state->full_result_iter
917       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
918     op->keep++;
919     send_remaining_elements (op);
920     return;
921   }
922   send_client_done_and_destroy (op);
923 }
924
925
926 /**
927  * Remove all elements from our hashmap.
928  *
929  * @param cls closure with the `struct Operation *`
930  * @param key current key code
931  * @param value value in the hash map
932  * @return #GNUNET_YES (we should continue to iterate)
933  */
934 static int
935 filter_all (void *cls,
936             const struct GNUNET_HashCode *key,
937             void *value)
938 {
939   struct Operation *op = cls;
940   struct ElementEntry *ee = value;
941
942   GNUNET_break (0 < op->state->my_element_count);
943   op->state->my_element_count--;
944   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
945                           &ee->element_hash,
946                           &op->state->my_xor);
947   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948               "Final reduction of my_elements, removing %s:%u\n",
949               GNUNET_h2s (&ee->element_hash),
950               ee->element.size);
951   GNUNET_assert (GNUNET_YES ==
952                  GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
953                                                        &ee->element_hash,
954                                                        ee));
955   send_client_removed_element (op,
956                                &ee->element);
957   return GNUNET_YES;
958 }
959
960
961 /**
962  * Handle a done message from a remote peer
963  *
964  * @param cls the intersection operation
965  * @param mh the message
966  */
967 void
968 handle_intersection_p2p_done (void *cls,
969                               const struct IntersectionDoneMessage *idm)
970 {
971   struct Operation *op = cls;
972
973   if (GNUNET_SET_OPERATION_INTERSECTION != op->operation)
974   {
975     GNUNET_break_op (0);
976     fail_intersection_operation(op);
977     return;
978   }
979   if (PHASE_BF_EXCHANGE != op->state->phase)
980   {
981     /* wrong phase to conclude? FIXME: Or should we allow this
982        if the other peer has _initially_ already an empty set? */
983     GNUNET_break_op (0);
984     fail_intersection_operation (op);
985     return;
986   }
987   if (0 == ntohl (idm->final_element_count))
988   {
989     /* other peer determined empty set is the intersection,
990        remove all elements */
991     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
992                                            &filter_all,
993                                            op);
994   }
995   if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
996        (0 != memcmp (&op->state->my_xor,
997                      &idm->element_xor_hash,
998                      sizeof (struct GNUNET_HashCode))) )
999   {
1000     /* Other peer thinks we are done, but we disagree on the result! */
1001     GNUNET_break_op (0);
1002     fail_intersection_operation (op);
1003     return;
1004   }
1005   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006               "Got IntersectionDoneMessage, have %u elements in intersection\n",
1007               op->state->my_element_count);
1008   op->state->phase = PHASE_FINISHED;
1009   finish_and_destroy (op);
1010   GNUNET_CADET_receive_done (op->channel);
1011 }
1012
1013
1014 /**
1015  * Initiate a set intersection operation with a remote peer.
1016  *
1017  * @param op operation that is created, should be initialized to
1018  *        begin the evaluation
1019  * @param opaque_context message to be transmitted to the listener
1020  *        to convince him to accept, may be NULL
1021  */
1022 static void
1023 intersection_evaluate (struct Operation *op,
1024                        const struct GNUNET_MessageHeader *opaque_context)
1025 {
1026   struct GNUNET_MQ_Envelope *ev;
1027   struct OperationRequestMessage *msg;
1028
1029   op->state = GNUNET_new (struct OperationState);
1030   /* we started the operation, thus we have to send the operation request */
1031   op->state->phase = PHASE_INITIAL;
1032   op->state->my_element_count = op->spec->set->state->current_set_element_count;
1033   op->state->my_elements
1034     = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
1035                                             GNUNET_YES);
1036
1037   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038               "Initiating intersection operation evaluation\n");
1039   ev = GNUNET_MQ_msg_nested_mh (msg,
1040                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1041                                 opaque_context);
1042   if (NULL == ev)
1043   {
1044     /* the context message is too large!? */
1045     GNUNET_break (0);
1046     GNUNET_SERVICE_client_drop (op->spec->set->client);
1047     return;
1048   }
1049   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1050   msg->element_count = htonl (op->state->my_element_count);
1051   GNUNET_MQ_send (op->mq,
1052                   ev);
1053   op->state->phase = PHASE_COUNT_SENT;
1054   if (NULL != opaque_context)
1055     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056                 "Sent op request with context message\n");
1057   else
1058     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059                 "Sent op request without context message\n");
1060 }
1061
1062
1063 /**
1064  * Accept an intersection operation request from a remote peer.  Only
1065  * initializes the private operation state.
1066  *
1067  * @param op operation that will be accepted as an intersection operation
1068  */
1069 static void
1070 intersection_accept (struct Operation *op)
1071 {
1072   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073               "Accepting set intersection operation\n");
1074   op->state = GNUNET_new (struct OperationState);
1075   op->state->phase = PHASE_INITIAL;
1076   op->state->my_element_count
1077     = op->spec->set->state->current_set_element_count;
1078   GNUNET_assert (NULL == op->state->my_elements);
1079   op->state->my_elements
1080     = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count,
1081                                                         op->spec->remote_element_count),
1082                                             GNUNET_YES);
1083   if (op->spec->remote_element_count < op->state->my_element_count)
1084   {
1085     /* If the other peer (Alice) has fewer elements than us (Bob),
1086        we just send the count as Alice should send the first BF */
1087     send_element_count (op);
1088     op->state->phase = PHASE_COUNT_SENT;
1089     return;
1090   }
1091   /* We have fewer elements, so we start with the BF */
1092   begin_bf_exchange (op);
1093 }
1094
1095
1096 /**
1097  * Handler for peer-disconnects, notifies the client about the aborted
1098  * operation.  If we did not expect anything from the other peer, we
1099  * gracefully terminate the operation.
1100  *
1101  * @param op the destroyed operation
1102  */
1103 static void
1104 intersection_peer_disconnect (struct Operation *op)
1105 {
1106   if (PHASE_FINISHED != op->state->phase)
1107   {
1108     fail_intersection_operation (op);
1109     return;
1110   }
1111   /* the session has already been concluded */
1112   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113               "Other peer disconnected (finished)\n");
1114   if (GNUNET_NO == op->state->client_done_sent)
1115     finish_and_destroy (op);
1116 }
1117
1118
1119 /**
1120  * Destroy the intersection operation.  Only things specific to the
1121  * intersection operation are destroyed.
1122  *
1123  * @param op intersection operation to destroy
1124  */
1125 static void
1126 intersection_op_cancel (struct Operation *op)
1127 {
1128   /* check if the op was canceled twice */
1129   GNUNET_assert (NULL != op->state);
1130   if (NULL != op->state->remote_bf)
1131   {
1132     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1133     op->state->remote_bf = NULL;
1134   }
1135   if (NULL != op->state->local_bf)
1136   {
1137     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1138     op->state->local_bf = NULL;
1139   }
1140   if (NULL != op->state->my_elements)
1141   {
1142     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1143     op->state->my_elements = NULL;
1144   }
1145   if (NULL != op->state->full_result_iter)
1146   {
1147     GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
1148     op->state->full_result_iter = NULL;
1149   }
1150   GNUNET_free (op->state);
1151   op->state = NULL;
1152   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1153               "Destroying intersection op state done\n");
1154 }
1155
1156
1157 /**
1158  * Create a new set supporting the intersection operation.
1159  *
1160  * @return the newly created set
1161  */
1162 static struct SetState *
1163 intersection_set_create ()
1164 {
1165   struct SetState *set_state;
1166
1167   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1168               "Intersection set created\n");
1169   set_state = GNUNET_new (struct SetState);
1170   set_state->current_set_element_count = 0;
1171
1172   return set_state;
1173 }
1174
1175
1176 /**
1177  * Add the element from the given element message to the set.
1178  *
1179  * @param set_state state of the set want to add to
1180  * @param ee the element to add to the set
1181  */
1182 static void
1183 intersection_add (struct SetState *set_state,
1184                   struct ElementEntry *ee)
1185 {
1186   set_state->current_set_element_count++;
1187 }
1188
1189
1190 /**
1191  * Destroy a set that supports the intersection operation
1192  *
1193  * @param set_state the set to destroy
1194  */
1195 static void
1196 intersection_set_destroy (struct SetState *set_state)
1197 {
1198   GNUNET_free (set_state);
1199 }
1200
1201
1202 /**
1203  * Remove the element given in the element message from the set.
1204  *
1205  * @param set_state state of the set to remove from
1206  * @param element set element to remove
1207  */
1208 static void
1209 intersection_remove (struct SetState *set_state,
1210                      struct ElementEntry *element)
1211 {
1212   GNUNET_assert (0 < set_state->current_set_element_count);
1213   set_state->current_set_element_count--;
1214 }
1215
1216
1217 /**
1218  * Get the table with implementing functions for set intersection.
1219  *
1220  * @return the operation specific VTable
1221  */
1222 const struct SetVT *
1223 _GSS_intersection_vt ()
1224 {
1225   static const struct SetVT intersection_vt = {
1226     .create = &intersection_set_create,
1227     .add = &intersection_add,
1228     .remove = &intersection_remove,
1229     .destroy_set = &intersection_set_destroy,
1230     .evaluate = &intersection_evaluate,
1231     .accept = &intersection_accept,
1232     .peer_disconnect = &intersection_peer_disconnect,
1233     .cancel = &intersection_op_cancel,
1234   };
1235
1236   return &intersection_vt;
1237 }