-fixing misc issues and bugs, including better termination logic for intersection...
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet-service-set_protocol.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Current phase we are in for a intersection operation.
36  */
37 enum IntersectionOperationPhase
38 {
39   /**
40    * Alices has suggested an operation to bob,
41    * and is waiting for a bf or session end.
42    */
43   PHASE_INITIAL,
44
45   /**
46    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
47    * until one notices the their element hashes are equal.
48    */
49   PHASE_BF_EXCHANGE,
50
51   /**
52    * The protocol is over.  Results may still have to be sent to the
53    * client.
54    */
55   PHASE_FINISHED
56 };
57
58
59 /**
60  * State of an evaluate operation with another peer.
61  */
62 struct OperationState
63 {
64   /**
65    * The bf we currently receive
66    */
67   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
68
69   /**
70    * BF of the set's element.
71    */
72   struct GNUNET_CONTAINER_BloomFilter *local_bf;
73
74   /**
75    * Remaining elements in the intersection operation.
76    * Maps element-id-hashes to 'elements in our set'.
77    */
78   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
79
80   /**
81    * Iterator for sending the final set of @e my_elements to the client.
82    */
83   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
84
85   /**
86    * Evaluate operations are held in a linked list.
87    */
88   struct OperationState *next;
89
90   /**
91    * Evaluate operations are held in a linked list.
92    */
93   struct OperationState *prev;
94
95   /**
96    * For multipart BF transmissions, we have to store the
97    * bloomfilter-data until we fully received it.
98    */
99   char *bf_data;
100
101   /**
102    * XOR of the keys of all of the elements (remaining) in my set.
103    * Always updated when elements are added or removed to
104    * @e my_elements.
105    */
106   struct GNUNET_HashCode my_xor;
107
108   /**
109    * XOR of the keys of all of the elements (remaining) in
110    * the other peer's set.  Updated when we receive the
111    * other peer's Bloom filter.
112    */
113   struct GNUNET_HashCode other_xor;
114
115   /**
116    * How many bytes of @e bf_data are valid?
117    */
118   uint32_t bf_data_offset;
119
120   /**
121    * Current element count contained within @e my_elements.
122    * (May differ briefly during initialization.)
123    */
124   uint32_t my_element_count;
125
126   /**
127    * size of the bloomfilter in @e bf_data.
128    */
129   uint32_t bf_data_size;
130
131   /**
132    * size of the bloomfilter
133    */
134   uint32_t bf_bits_per_element;
135
136   /**
137    * Salt currently used for BF construction (by us or the other peer,
138    * depending on where we are in the code).
139    */
140   uint32_t salt;
141
142   /**
143    * Current state of the operation.
144    */
145   enum IntersectionOperationPhase phase;
146
147   /**
148    * Generation in which the operation handle
149    * was created.
150    */
151   unsigned int generation_created;
152
153   /**
154    * Did we send the client that we are done?
155    */
156   int client_done_sent;
157 };
158
159
160 /**
161  * Extra state required for efficient set intersection.
162  * Merely tracks the total number of elements.
163  */
164 struct SetState
165 {
166   /**
167    * Number of currently valid elements in the set which have not been
168    * removed.
169    */
170   uint32_t current_set_element_count;
171 };
172
173
174 /**
175  * If applicable in the current operation mode, send a result message
176  * to the client indicating we removed an element.
177  *
178  * @param op intersection operation
179  * @param element element to send
180  */
181 static void
182 send_client_removed_element (struct Operation *op,
183                              struct GNUNET_SET_Element *element)
184 {
185   struct GNUNET_MQ_Envelope *ev;
186   struct GNUNET_SET_ResultMessage *rm;
187
188   if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode)
189     return; /* Wrong mode for transmitting removed elements */
190   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
191               "Sending removed element (size %u) to client\n",
192               element->size);
193   GNUNET_assert (0 != op->spec->client_request_id);
194   ev = GNUNET_MQ_msg_extra (rm,
195                             element->size,
196                             GNUNET_MESSAGE_TYPE_SET_RESULT);
197   if (NULL == ev)
198   {
199     GNUNET_break (0);
200     return;
201   }
202   rm->result_status = htons (GNUNET_SET_STATUS_OK);
203   rm->request_id = htonl (op->spec->client_request_id);
204   rm->element_type = element->element_type;
205   memcpy (&rm[1],
206           element->data,
207           element->size);
208   GNUNET_MQ_send (op->spec->set->client_mq,
209                   ev);
210 }
211
212
213 /**
214  * Fills the "my_elements" hashmap with all relevant elements.
215  *
216  * @param cls the `struct Operation *` we are performing
217  * @param key current key code
218  * @param value the `struct ElementEntry *` from the hash map
219  * @return #GNUNET_YES (we should continue to iterate)
220  */
221 static int
222 filtered_map_initialization (void *cls,
223                              const struct GNUNET_HashCode *key,
224                              void *value)
225 {
226   struct Operation *op = cls;
227   struct ElementEntry *ee = value;
228   struct GNUNET_HashCode mutated_hash;
229
230   if ( (op->generation_created < ee->generation_removed) &&
231        (op->generation_created >= ee->generation_added) )
232     return GNUNET_YES; /* element not valid in our operation's generation */
233
234   /* Test if element is in Bob's bloomfilter */
235   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
236                             op->state->salt,
237                             &mutated_hash);
238   if (GNUNET_NO ==
239       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
240                                          &mutated_hash))
241   {
242     /* remove this element */
243     send_client_removed_element (op,
244                                  &ee->element);
245     return GNUNET_YES;
246   }
247   op->state->my_element_count++;
248   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
249                           &ee->element_hash,
250                           &op->state->my_xor);
251   GNUNET_break (GNUNET_YES ==
252                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
253                                                    &ee->element_hash,
254                                                    ee,
255                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
256
257   return GNUNET_YES;
258 }
259
260
261 /**
262  * Removes elements from our hashmap if they are not contained within the
263  * provided remote bloomfilter.
264  *
265  * @param cls closure with the `struct Operation *`
266  * @param key current key code
267  * @param value value in the hash map
268  * @return #GNUNET_YES (we should continue to iterate)
269  */
270 static int
271 iterator_bf_reduce (void *cls,
272                    const struct GNUNET_HashCode *key,
273                    void *value)
274 {
275   struct Operation *op = cls;
276   struct ElementEntry *ee = value;
277   struct GNUNET_HashCode mutated_hash;
278
279   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
280                             op->state->salt,
281                             &mutated_hash);
282   if (GNUNET_NO ==
283       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
284                                          &mutated_hash))
285   {
286     GNUNET_break (0 < op->state->my_element_count);
287     op->state->my_element_count--;
288     GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
289                             &ee->element_hash,
290                             &op->state->my_xor);
291     GNUNET_assert (GNUNET_YES ==
292                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
293                                                          &ee->element_hash,
294                                                          ee));
295     send_client_removed_element (op,
296                                  &ee->element);
297   }
298   return GNUNET_YES;
299 }
300
301
302 /**
303  * Create initial bloomfilter based on all the elements given.
304  *
305  * @param cls the `struct Operation *`
306  * @param key current key code
307  * @param value the `struct ElementEntry` to process
308  * @return #GNUNET_YES (we should continue to iterate)
309  */
310 static int
311 iterator_bf_create (void *cls,
312                     const struct GNUNET_HashCode *key,
313                     void *value)
314 {
315   struct Operation *op = cls;
316   struct ElementEntry *ee = value;
317   struct GNUNET_HashCode mutated_hash;
318
319   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
320                             op->state->salt,
321                             &mutated_hash);
322   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
323                                     &mutated_hash);
324   return GNUNET_YES;
325 }
326
327
328 /**
329  * Inform the client that the intersection operation has failed,
330  * and proceed to destroy the evaluate operation.
331  *
332  * @param op the intersection operation to fail
333  */
334 static void
335 fail_intersection_operation (struct Operation *op)
336 {
337   struct GNUNET_MQ_Envelope *ev;
338   struct GNUNET_SET_ResultMessage *msg;
339
340   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
341               "Intersection operation failed\n");
342   if (NULL != op->state->my_elements)
343   {
344     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
345     op->state->my_elements = NULL;
346   }
347   ev = GNUNET_MQ_msg (msg,
348                       GNUNET_MESSAGE_TYPE_SET_RESULT);
349   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
350   msg->request_id = htonl (op->spec->client_request_id);
351   msg->element_type = htons (0);
352   GNUNET_MQ_send (op->spec->set->client_mq,
353                   ev);
354   _GSS_operation_destroy (op,
355                           GNUNET_YES);
356 }
357
358
359 /**
360  * Send a bloomfilter to our peer.  After the result done message has
361  * been sent to the client, destroy the evaluate operation.
362  *
363  * @param op intersection operation
364  */
365 static void
366 send_bloomfilter (struct Operation *op)
367 {
368   struct GNUNET_MQ_Envelope *ev;
369   struct BFMessage *msg;
370   uint32_t bf_size;
371   uint32_t bf_elementbits;
372   uint32_t chunk_size;
373   struct GNUNET_CONTAINER_BloomFilter *local_bf;
374   char *bf_data;
375   uint32_t offset;
376
377   /* We consider the ratio of the set sizes to determine
378      the number of bits per element, as the smaller set
379      should use more bits to maximize its set reduction
380      potential and minimize overall bandwidth consumption. */
381   bf_elementbits = 2 + ceil (log2((double)
382                              (op->spec->remote_element_count /
383                                    (double) op->state->my_element_count)));
384   if (bf_elementbits < 1)
385     bf_elementbits = 1; /* make sure k is not 0 */
386   /* optimize BF-size to ~50% of bits set */
387   bf_size = ceil ((double) (op->state->my_element_count
388                             * bf_elementbits / log(2)));
389   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
390               "Sending bf of size %u\n",
391               (unsigned int) bf_size);
392   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
393                                                 bf_size,
394                                                 bf_elementbits);
395   op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
396                                               UINT32_MAX);
397   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
398                                          &iterator_bf_create,
399                                          op);
400
401   /* send our Bloom filter */
402   chunk_size = 60 * 1024 - sizeof (struct BFMessage);
403   if (bf_size <= chunk_size)
404   {
405     /* singlepart */
406     chunk_size = bf_size;
407     ev = GNUNET_MQ_msg_extra (msg,
408                               chunk_size,
409                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
410     GNUNET_assert (GNUNET_SYSERR !=
411                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
412                                                               (char*) &msg[1],
413                                                               bf_size));
414     msg->sender_element_count = htonl (op->state->my_element_count);
415     msg->bloomfilter_total_length = htonl (bf_size);
416     msg->bits_per_element = htonl (bf_elementbits);
417     msg->sender_mutator = htonl (op->state->salt);
418     msg->element_xor_hash = op->state->my_xor;
419     GNUNET_MQ_send (op->mq, ev);
420   }
421   else
422   {
423     /* multipart */
424     bf_data = GNUNET_malloc (bf_size);
425     GNUNET_assert (GNUNET_SYSERR !=
426                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
427                                                               bf_data,
428                                                               bf_size));
429     offset = 0;
430     while (offset < bf_size)
431     {
432       if (bf_size - chunk_size < offset)
433         chunk_size = bf_size - offset;
434       ev = GNUNET_MQ_msg_extra (msg,
435                                 chunk_size,
436                                 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
437       memcpy (&msg[1],
438               &bf_data[offset],
439               chunk_size);
440       offset += chunk_size;
441       msg->sender_element_count = htonl (op->state->my_element_count);
442       msg->bloomfilter_total_length = htonl (bf_size);
443       msg->bits_per_element = htonl (bf_elementbits);
444       msg->sender_mutator = htonl (op->state->salt);
445       msg->element_xor_hash = op->state->my_xor;
446       GNUNET_MQ_send (op->mq, ev);
447     }
448     GNUNET_free (bf_data);
449   }
450   GNUNET_CONTAINER_bloomfilter_free (local_bf);
451 }
452
453
454 /**
455  * Signal to the client that the operation has finished and
456  * destroy the operation.
457  *
458  * @param cls operation to destroy
459  */
460 static void
461 send_client_done_and_destroy (void *cls)
462 {
463   struct Operation *op = cls;
464   struct GNUNET_MQ_Envelope *ev;
465   struct GNUNET_SET_ResultMessage *rm;
466
467   ev = GNUNET_MQ_msg (rm,
468                       GNUNET_MESSAGE_TYPE_SET_RESULT);
469   rm->request_id = htonl (op->spec->client_request_id);
470   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
471   rm->element_type = htons (0);
472   GNUNET_MQ_send (op->spec->set->client_mq,
473                   ev);
474   _GSS_operation_destroy (op,
475                           GNUNET_YES);
476 }
477
478
479 /**
480  * Send all elements in the full result iterator.
481  *
482  * @param cls the `struct Operation *`
483  */
484 static void
485 send_remaining_elements (void *cls)
486 {
487   struct Operation *op = cls;
488   const void *nxt;
489   const struct ElementEntry *ee;
490   struct GNUNET_MQ_Envelope *ev;
491   struct GNUNET_SET_ResultMessage *rm;
492   const struct GNUNET_SET_Element *element;
493   int res;
494
495   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
496                                                      NULL,
497                                                      &nxt);
498   if (GNUNET_NO == res)
499   {
500     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501                 "Sending done and destroy because iterator ran out\n");
502     send_client_done_and_destroy (op);
503     return;
504   }
505   ee = nxt;
506   element = &ee->element;
507   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
508               "Sending element (size %u) to client (full set)\n",
509               element->size);
510   GNUNET_assert (0 != op->spec->client_request_id);
511   ev = GNUNET_MQ_msg_extra (rm,
512                             element->size,
513                             GNUNET_MESSAGE_TYPE_SET_RESULT);
514   GNUNET_assert (NULL != ev);
515   rm->result_status = htons (GNUNET_SET_STATUS_OK);
516   rm->request_id = htonl (op->spec->client_request_id);
517   rm->element_type = element->element_type;
518   memcpy (&rm[1],
519           element->data,
520           element->size);
521   GNUNET_MQ_notify_sent (ev,
522                          &send_remaining_elements,
523                          op);
524   GNUNET_MQ_send (op->spec->set->client_mq,
525                   ev);
526 }
527
528
529 /**
530  * Inform the peer that this operation is complete.
531  *
532  * @param op the intersection operation to fail
533  */
534 static void
535 send_peer_done (struct Operation *op)
536 {
537   struct GNUNET_MQ_Envelope *ev;
538   struct IntersectionDoneMessage *idm;
539
540   op->state->phase = PHASE_FINISHED;
541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542               "Intersection succeeded, sending DONE\n");
543   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
544   op->state->local_bf = NULL;
545
546   ev = GNUNET_MQ_msg (idm,
547                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
548   idm->final_element_count = htonl (op->state->my_element_count);
549   idm->element_xor_hash = op->state->my_xor;
550   GNUNET_MQ_send (op->mq,
551                   ev);
552 }
553
554
555 /**
556  * Process a Bloomfilter once we got all the chunks.
557  *
558  * @param op the intersection operation
559  */
560 static void
561 process_bf (struct Operation *op)
562 {
563   switch (op->state->phase)
564   {
565   case PHASE_INITIAL:
566     /* This is the first BF being sent, build our initial map with
567        filtering in place */
568     op->state->my_elements
569       = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
570                                               GNUNET_YES);
571     GNUNET_break (0 == op->state->my_element_count);
572     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
573                                            &filtered_map_initialization,
574                                            op);
575     break;
576   case PHASE_BF_EXCHANGE:
577     /* Update our set by reduction */
578     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
579                                            &iterator_bf_reduce,
580                                            op);
581     break;
582   case PHASE_FINISHED:
583     GNUNET_break_op (0);
584     fail_intersection_operation(op);
585     return;
586   }
587   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
588   op->state->remote_bf = NULL;
589
590   if ( (0 == op->state->my_element_count) || /* fully disjoint */
591        ( (op->state->my_element_count == op->spec->remote_element_count) &&
592          (0 == memcmp (&op->state->my_xor,
593                        &op->state->other_xor,
594                        sizeof (struct GNUNET_HashCode))) ) )
595   {
596     /* we are done */
597     send_peer_done (op);
598     return;
599   }
600   op->state->phase = PHASE_BF_EXCHANGE;
601   send_bloomfilter (op);
602 }
603
604
605 /**
606  * Handle an BF message from a remote peer.
607  *
608  * @param cls the intersection operation
609  * @param mh the header of the message
610  */
611 static void
612 handle_p2p_bf (void *cls,
613                const struct GNUNET_MessageHeader *mh)
614 {
615   struct Operation *op = cls;
616   const struct BFMessage *msg;
617   uint32_t bf_size;
618   uint32_t chunk_size;
619   uint32_t bf_bits_per_element;
620   uint16_t msize;
621
622   msize = htons (mh->size);
623   if (msize < sizeof (struct BFMessage))
624   {
625     GNUNET_break_op (0);
626     fail_intersection_operation (op);
627     return;
628   }
629   msg = (const struct BFMessage *) mh;
630   switch (op->state->phase)
631   {
632   case PHASE_INITIAL:
633     GNUNET_break_op (0);
634     fail_intersection_operation (op);
635     break;
636   case PHASE_BF_EXCHANGE:
637     bf_size = ntohl (msg->bloomfilter_total_length);
638     bf_bits_per_element = ntohl (msg->bits_per_element);
639     chunk_size = msize - sizeof (struct BFMessage);
640     op->state->other_xor = msg->element_xor_hash;
641     if (bf_size == chunk_size)
642     {
643       if (NULL != op->state->bf_data)
644       {
645         GNUNET_break_op (0);
646         fail_intersection_operation (op);
647         return;
648       }
649       /* single part, done here immediately */
650       op->state->remote_bf
651         = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
652                                              bf_size,
653                                              bf_bits_per_element);
654       op->state->salt = ntohl (msg->sender_mutator);
655       process_bf (op);
656       return;
657     }
658     /* multipart chunk */
659     if (NULL == op->state->bf_data)
660     {
661       /* first chunk, initialize */
662       op->state->bf_data = GNUNET_malloc (bf_size);
663       op->state->bf_data_size = bf_size;
664       op->state->bf_bits_per_element = bf_bits_per_element;
665       op->state->bf_data_offset = 0;
666       op->state->salt = ntohl (msg->sender_mutator);
667       op->spec->remote_element_count = ntohl (msg->sender_element_count);
668     }
669     else
670     {
671       /* increment */
672       if ( (op->state->bf_data_size != bf_size) ||
673            (op->state->bf_bits_per_element != bf_bits_per_element) ||
674            (op->state->bf_data_offset + chunk_size > bf_size) ||
675            (op->state->salt != ntohl (msg->sender_mutator)) ||
676            (op->spec->remote_element_count != ntohl (msg->sender_element_count)) )
677       {
678         GNUNET_break_op (0);
679         fail_intersection_operation (op);
680         return;
681       }
682     }
683     memcpy (&op->state->bf_data[op->state->bf_data_offset],
684             (const char*) &msg[1],
685             chunk_size);
686     op->state->bf_data_offset += chunk_size;
687     if (op->state->bf_data_offset == bf_size)
688     {
689       /* last chunk, run! */
690       op->state->remote_bf
691         = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
692                                              bf_size,
693                                              bf_bits_per_element);
694       GNUNET_free (op->state->bf_data);
695       op->state->bf_data = NULL;
696       op->state->bf_data_size = 0;
697       process_bf (op);
698     }
699     break;
700   default:
701     GNUNET_break_op (0);
702     fail_intersection_operation (op);
703     break;
704   }
705 }
706
707
708 /**
709  * Fills the "my_elements" hashmap with the initial set of
710  * (non-deleted) elements from the set of the specification.
711  *
712  * @param cls closure with the `struct Operation *`
713  * @param key current key code for the element
714  * @param value value in the hash map with the `struct ElementEntry *`
715  * @return #GNUNET_YES (we should continue to iterate)
716  */
717 static int
718 initialize_map_unfiltered (void *cls,
719                            const struct GNUNET_HashCode *key,
720                            void *value)
721 {
722   struct ElementEntry *ee = value;
723   struct Operation *op = cls;
724
725   if ( (op->generation_created < ee->generation_removed) &&
726        (op->generation_created >= ee->generation_added) )
727     return GNUNET_YES; /* element not live in operation's generation */
728   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
729                           &ee->element_hash,
730                           &op->state->my_xor);
731   GNUNET_break (GNUNET_YES ==
732                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
733                                                    &ee->element_hash,
734                                                    ee,
735                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
736   return GNUNET_YES;
737 }
738
739
740 /**
741  * Send our element count to the peer, in case our element count is
742  * lower than his.
743  *
744  * @param op intersection operation
745  */
746 static void
747 send_element_count (struct Operation *op)
748 {
749   struct GNUNET_MQ_Envelope *ev;
750   struct IntersectionElementInfoMessage *msg;
751
752   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
753               "Sending our element count (bf_msg)\n");
754   ev = GNUNET_MQ_msg (msg,
755                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
756   msg->sender_element_count = htonl (op->state->my_element_count);
757   GNUNET_MQ_send (op->mq, ev);
758 }
759
760
761 /**
762  * We go first, initialize our map with all elements and
763  * send the first Bloom filter.
764  *
765  * @param op operation to start exchange for
766  */
767 static void
768 begin_bf_exchange (struct Operation *op)
769 {
770   GNUNET_break (PHASE_INITIAL == op->state->phase);
771   op->state->phase = PHASE_BF_EXCHANGE;
772   op->state->my_elements
773     = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
774                                             GNUNET_YES);
775   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
776                                          &initialize_map_unfiltered,
777                                          op);
778   send_bloomfilter (op);
779 }
780
781
782 /**
783  * Handle the initial `struct IntersectionElementInfoMessage` from a
784  * remote peer.
785  *
786  * @param cls the intersection operation
787  * @param mh the header of the message
788  */
789 static void
790 handle_p2p_element_info (void *cls,
791                          const struct GNUNET_MessageHeader *mh)
792 {
793   struct Operation *op = cls;
794   const struct IntersectionElementInfoMessage *msg;
795
796   if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
797   {
798     GNUNET_break_op (0);
799     fail_intersection_operation(op);
800     return;
801   }
802   msg = (const struct IntersectionElementInfoMessage *) mh;
803   op->spec->remote_element_count = ntohl (msg->sender_element_count);
804   if ( (PHASE_INITIAL != op->state->phase) ||
805        (op->state->my_element_count > op->spec->remote_element_count) ||
806        (0 == op->state->my_element_count) ||
807        (0 == op->spec->remote_element_count) )
808   {
809     GNUNET_break_op (0);
810     fail_intersection_operation(op);
811     return;
812   }
813   GNUNET_break (NULL == op->state->remote_bf);
814   begin_bf_exchange (op);
815 }
816
817
818 /**
819  * Send a result message to the client indicating that the operation
820  * is over.  After the result done message has been sent to the
821  * client, destroy the evaluate operation.
822  *
823  * @param op intersection operation
824  */
825 static void
826 finish_and_destroy (struct Operation *op)
827 {
828   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
829
830   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
831   {
832     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
833                 "Sending full result set\n");
834     op->state->full_result_iter
835       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
836     send_remaining_elements (op);
837     return;
838   }
839   send_client_done_and_destroy (op);
840 }
841
842
843 /**
844  * Remove all elements from our hashmap.
845  *
846  * @param cls closure with the `struct Operation *`
847  * @param key current key code
848  * @param value value in the hash map
849  * @return #GNUNET_YES (we should continue to iterate)
850  */
851 static int
852 filter_all (void *cls,
853             const struct GNUNET_HashCode *key,
854             void *value)
855 {
856   struct Operation *op = cls;
857   struct ElementEntry *ee = value;
858
859   GNUNET_break (0 < op->state->my_element_count);
860   op->state->my_element_count--;
861   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
862                           &ee->element_hash,
863                           &op->state->my_xor);
864   GNUNET_assert (GNUNET_YES ==
865                  GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
866                                                        &ee->element_hash,
867                                                        ee));
868   send_client_removed_element (op,
869                                &ee->element);
870   return GNUNET_YES;
871 }
872
873
874 /**
875  * Handle a done message from a remote peer
876  *
877  * @param cls the intersection operation
878  * @param mh the message
879  */
880 static void
881 handle_p2p_done (void *cls,
882                  const struct GNUNET_MessageHeader *mh)
883 {
884   struct Operation *op = cls;
885   const struct IntersectionDoneMessage *idm;
886
887   if (PHASE_BF_EXCHANGE != op->state->phase)
888   {
889     /* wrong phase to conclude? FIXME: Or should we allow this
890        if the other peer has _initially_ already an empty set? */
891     GNUNET_break_op (0);
892     fail_intersection_operation (op);
893     return;
894   }
895   if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
896   {
897     GNUNET_break_op (0);
898     fail_intersection_operation (op);
899     return;
900   }
901   idm = (const struct IntersectionDoneMessage *) mh;
902   if (0 == ntohl (idm->final_element_count))
903   {
904     /* other peer determined empty set is the intersection,
905        remove all elements */
906     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
907                                            &filter_all,
908                                            op);
909   }
910   if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
911        (0 != memcmp (&op->state->my_xor,
912                      &idm->element_xor_hash,
913                      sizeof (struct GNUNET_HashCode))) )
914   {
915     /* Other peer thinks we are done, but we disagree on the result! */
916     GNUNET_break_op (0);
917     fail_intersection_operation (op);
918     return;
919   }
920   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921               "Got final DONE\n");
922   op->state->phase = PHASE_FINISHED;
923   finish_and_destroy (op);
924 }
925
926
927 /**
928  * Initiate a set intersection operation with a remote peer.
929  *
930  * @param op operation that is created, should be initialized to
931  *        begin the evaluation
932  * @param opaque_context message to be transmitted to the listener
933  *        to convince him to accept, may be NULL
934  */
935 static void
936 intersection_evaluate (struct Operation *op,
937                        const struct GNUNET_MessageHeader *opaque_context)
938 {
939   struct GNUNET_MQ_Envelope *ev;
940   struct OperationRequestMessage *msg;
941
942   op->state = GNUNET_new (struct OperationState);
943   /* we started the operation, thus we have to send the operation request */
944   op->state->phase = PHASE_INITIAL;
945   op->state->my_element_count = op->spec->set->state->current_set_element_count;
946
947   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948               "Initiating intersection operation evaluation");
949   ev = GNUNET_MQ_msg_nested_mh (msg,
950                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
951                                 opaque_context);
952   if (NULL == ev)
953   {
954     /* the context message is too large!? */
955     GNUNET_break (0);
956     GNUNET_SERVER_client_disconnect (op->spec->set->client);
957     return;
958   }
959   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
960   msg->app_id = op->spec->app_id;
961   msg->element_count = htonl (op->state->my_element_count);
962   GNUNET_MQ_send (op->mq,
963                   ev);
964   if (NULL != opaque_context)
965     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
966                 "Sent op request with context message\n");
967   else
968     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969                 "Sent op request without context message\n");
970 }
971
972
973 /**
974  * Accept an intersection operation request from a remote peer.  Only
975  * initializes the private operation state.
976  *
977  * @param op operation that will be accepted as an intersection operation
978  */
979 static void
980 intersection_accept (struct Operation *op)
981 {
982   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
983               "Accepting set intersection operation\n");
984   op->state = GNUNET_new (struct OperationState);
985   op->state->phase = PHASE_INITIAL;
986   op->state->my_element_count
987     = op->spec->set->state->current_set_element_count;
988   op->state->my_elements
989     = GNUNET_CONTAINER_multihashmap_create
990     (GNUNET_MIN (op->state->my_element_count,
991                  op->spec->remote_element_count),
992      GNUNET_YES);
993   if (op->spec->remote_element_count < op->state->my_element_count)
994   {
995     /* If the other peer (Alice) has fewer elements than us (Bob),
996        we just send the count as Alice should send the first BF */
997     send_element_count (op);
998     return;
999   }
1000   /* We have fewer elements, so we start with the BF */
1001   begin_bf_exchange (op);
1002 }
1003
1004
1005 /**
1006  * Dispatch messages for a intersection operation.
1007  *
1008  * @param op the state of the intersection evaluate operation
1009  * @param mh the received message
1010  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1011  *         #GNUNET_OK otherwise
1012  */
1013 static int
1014 intersection_handle_p2p_message (struct Operation *op,
1015                                  const struct GNUNET_MessageHeader *mh)
1016 {
1017   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018               "Received p2p message (t: %u, s: %u)\n",
1019               ntohs (mh->type), ntohs (mh->size));
1020   switch (ntohs (mh->type))
1021   {
1022     /* this message handler is not active until after we received an
1023      * operation request message, thus the ops request is not handled here
1024      */
1025   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
1026     handle_p2p_element_info (op, mh);
1027     break;
1028   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1029     handle_p2p_bf (op, mh);
1030     break;
1031   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
1032     handle_p2p_done (op, mh);
1033     break;
1034   default:
1035     /* something wrong with cadet's message handlers? */
1036     GNUNET_assert (0);
1037   }
1038   return GNUNET_OK;
1039 }
1040
1041
1042 /**
1043  * Handler for peer-disconnects, notifies the client about the aborted
1044  * operation.  If we did not expect anything from the other peer, we
1045  * gracefully terminate the operation.
1046  *
1047  * @param op the destroyed operation
1048  */
1049 static void
1050 intersection_peer_disconnect (struct Operation *op)
1051 {
1052   if (PHASE_FINISHED != op->state->phase)
1053   {
1054     fail_intersection_operation (op);
1055     return;
1056   }
1057   /* the session has already been concluded */
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059               "Other peer disconnected (finished)\n");
1060   if (GNUNET_NO == op->state->client_done_sent)
1061     finish_and_destroy (op);
1062 }
1063
1064
1065 /**
1066  * Destroy the intersection operation.  Only things specific to the
1067  * intersection operation are destroyed.
1068  *
1069  * @param op intersection operation to destroy
1070  */
1071 static void
1072 intersection_op_cancel (struct Operation *op)
1073 {
1074   /* check if the op was canceled twice */
1075   GNUNET_assert (NULL != op->state);
1076   if (NULL != op->state->remote_bf)
1077   {
1078     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1079     op->state->remote_bf = NULL;
1080   }
1081   if (NULL != op->state->local_bf)
1082   {
1083     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1084     op->state->local_bf = NULL;
1085   }
1086   if (NULL != op->state->my_elements)
1087   {
1088     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1089     op->state->my_elements = NULL;
1090   }
1091   GNUNET_free (op->state);
1092   op->state = NULL;
1093   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1094               "Destroying intersection op state done\n");
1095 }
1096
1097
1098 /**
1099  * Create a new set supporting the intersection operation.
1100  *
1101  * @return the newly created set
1102  */
1103 static struct SetState *
1104 intersection_set_create ()
1105 {
1106   struct SetState *set_state;
1107
1108   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1109               "Intersection set created\n");
1110   set_state = GNUNET_new (struct SetState);
1111   set_state->current_set_element_count = 0;
1112
1113   return set_state;
1114 }
1115
1116
1117 /**
1118  * Add the element from the given element message to the set.
1119  *
1120  * @param set_state state of the set want to add to
1121  * @param ee the element to add to the set
1122  */
1123 static void
1124 intersection_add (struct SetState *set_state,
1125                   struct ElementEntry *ee)
1126 {
1127   set_state->current_set_element_count++;
1128 }
1129
1130
1131 /**
1132  * Destroy a set that supports the intersection operation
1133  *
1134  * @param set_state the set to destroy
1135  */
1136 static void
1137 intersection_set_destroy (struct SetState *set_state)
1138 {
1139   GNUNET_free (set_state);
1140 }
1141
1142
1143 /**
1144  * Remove the element given in the element message from the set.
1145  *
1146  * @param set_state state of the set to remove from
1147  * @param element set element to remove
1148  */
1149 static void
1150 intersection_remove (struct SetState *set_state,
1151                      struct ElementEntry *element)
1152 {
1153   GNUNET_assert (0 < set_state->current_set_element_count);
1154   set_state->current_set_element_count--;
1155 }
1156
1157
1158 /**
1159  * Get the table with implementing functions for set intersection.
1160  *
1161  * @return the operation specific VTable
1162  */
1163 const struct SetVT *
1164 _GSS_intersection_vt ()
1165 {
1166   static const struct SetVT intersection_vt = {
1167     .create = &intersection_set_create,
1168     .msg_handler = &intersection_handle_p2p_message,
1169     .add = &intersection_add,
1170     .remove = &intersection_remove,
1171     .destroy_set = &intersection_set_destroy,
1172     .evaluate = &intersection_evaluate,
1173     .accept = &intersection_accept,
1174     .peer_disconnect = &intersection_peer_disconnect,
1175     .cancel = &intersection_op_cancel,
1176   };
1177
1178   return &intersection_vt;
1179 }