- another fix to generation handling and lazy copying
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet-service-set_protocol.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Current phase we are in for a intersection operation.
36  */
37 enum IntersectionOperationPhase
38 {
39   /**
40    * We are just starting.
41    */
42   PHASE_INITIAL,
43
44   /**
45    * We have send the number of our elements to the other
46    * peer, but did not setup our element set yet.
47    */
48   PHASE_COUNT_SENT,
49
50   /**
51    * We have initialized our set and are now reducing it by exchanging
52    * Bloom filters until one party notices the their element hashes
53    * are equal.
54    */
55   PHASE_BF_EXCHANGE,
56
57   /**
58    * The protocol is over.  Results may still have to be sent to the
59    * client.
60    */
61   PHASE_FINISHED
62 };
63
64
65 /**
66  * State of an evaluate operation with another peer.
67  */
68 struct OperationState
69 {
70   /**
71    * The bf we currently receive
72    */
73   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
74
75   /**
76    * BF of the set's element.
77    */
78   struct GNUNET_CONTAINER_BloomFilter *local_bf;
79
80   /**
81    * Remaining elements in the intersection operation.
82    * Maps element-id-hashes to 'elements in our set'.
83    */
84   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
85
86   /**
87    * Iterator for sending the final set of @e my_elements to the client.
88    */
89   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
90
91   /**
92    * Evaluate operations are held in a linked list.
93    */
94   struct OperationState *next;
95
96   /**
97    * Evaluate operations are held in a linked list.
98    */
99   struct OperationState *prev;
100
101   /**
102    * For multipart BF transmissions, we have to store the
103    * bloomfilter-data until we fully received it.
104    */
105   char *bf_data;
106
107   /**
108    * XOR of the keys of all of the elements (remaining) in my set.
109    * Always updated when elements are added or removed to
110    * @e my_elements.
111    */
112   struct GNUNET_HashCode my_xor;
113
114   /**
115    * XOR of the keys of all of the elements (remaining) in
116    * the other peer's set.  Updated when we receive the
117    * other peer's Bloom filter.
118    */
119   struct GNUNET_HashCode other_xor;
120
121   /**
122    * How many bytes of @e bf_data are valid?
123    */
124   uint32_t bf_data_offset;
125
126   /**
127    * Current element count contained within @e my_elements.
128    * (May differ briefly during initialization.)
129    */
130   uint32_t my_element_count;
131
132   /**
133    * size of the bloomfilter in @e bf_data.
134    */
135   uint32_t bf_data_size;
136
137   /**
138    * size of the bloomfilter
139    */
140   uint32_t bf_bits_per_element;
141
142   /**
143    * Salt currently used for BF construction (by us or the other peer,
144    * depending on where we are in the code).
145    */
146   uint32_t salt;
147
148   /**
149    * Current state of the operation.
150    */
151   enum IntersectionOperationPhase phase;
152
153   /**
154    * Generation in which the operation handle
155    * was created.
156    */
157   unsigned int generation_created;
158
159   /**
160    * Did we send the client that we are done?
161    */
162   int client_done_sent;
163 };
164
165
166 /**
167  * Extra state required for efficient set intersection.
168  * Merely tracks the total number of elements.
169  */
170 struct SetState
171 {
172   /**
173    * Number of currently valid elements in the set which have not been
174    * removed.
175    */
176   uint32_t current_set_element_count;
177 };
178
179
180 /**
181  * If applicable in the current operation mode, send a result message
182  * to the client indicating we removed an element.
183  *
184  * @param op intersection operation
185  * @param element element to send
186  */
187 static void
188 send_client_removed_element (struct Operation *op,
189                              struct GNUNET_SET_Element *element)
190 {
191   struct GNUNET_MQ_Envelope *ev;
192   struct GNUNET_SET_ResultMessage *rm;
193
194   if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode)
195     return; /* Wrong mode for transmitting removed elements */
196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
197               "Sending removed element (size %u) to client\n",
198               element->size);
199   GNUNET_assert (0 != op->spec->client_request_id);
200   ev = GNUNET_MQ_msg_extra (rm,
201                             element->size,
202                             GNUNET_MESSAGE_TYPE_SET_RESULT);
203   if (NULL == ev)
204   {
205     GNUNET_break (0);
206     return;
207   }
208   rm->result_status = htons (GNUNET_SET_STATUS_OK);
209   rm->request_id = htonl (op->spec->client_request_id);
210   rm->element_type = element->element_type;
211   memcpy (&rm[1],
212           element->data,
213           element->size);
214   GNUNET_MQ_send (op->spec->set->client_mq,
215                   ev);
216 }
217
218
219 /**
220  * Fills the "my_elements" hashmap with all relevant elements.
221  *
222  * @param cls the `struct Operation *` we are performing
223  * @param key current key code
224  * @param value the `struct ElementEntry *` from the hash map
225  * @return #GNUNET_YES (we should continue to iterate)
226  */
227 static int
228 filtered_map_initialization (void *cls,
229                              const struct GNUNET_HashCode *key,
230                              void *value)
231 {
232   struct Operation *op = cls;
233   struct ElementEntry *ee = value;
234   struct GNUNET_HashCode mutated_hash;
235
236
237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238               "FIMA called for %s:%u\n",
239               GNUNET_h2s (&ee->element_hash),
240               ee->element.size);
241
242   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
243   {
244     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
245                 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
246                 GNUNET_h2s (&ee->element_hash),
247                 ee->element.size);
248     return GNUNET_YES; /* element not valid in our operation's generation */
249   }
250
251   /* Test if element is in other peer's bloomfilter */
252   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
253                             op->state->salt,
254                             &mutated_hash);
255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
256               "Testing mingled hash %s with salt %u\n",
257               GNUNET_h2s (&mutated_hash),
258               op->state->salt);
259   if (GNUNET_NO ==
260       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
261                                          &mutated_hash))
262   {
263     /* remove this element */
264     send_client_removed_element (op,
265                                  &ee->element);
266     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
267                 "Reduced initialization, not starting with %s:%u\n",
268                 GNUNET_h2s (&ee->element_hash),
269                 ee->element.size);
270     return GNUNET_YES;
271   }
272   op->state->my_element_count++;
273   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
274                           &ee->element_hash,
275                           &op->state->my_xor);
276   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
277               "Filtered initialization of my_elements, adding %s:%u\n",
278               GNUNET_h2s (&ee->element_hash),
279               ee->element.size);
280   GNUNET_break (GNUNET_YES ==
281                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
282                                                    &ee->element_hash,
283                                                    ee,
284                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
285
286   return GNUNET_YES;
287 }
288
289
290 /**
291  * Removes elements from our hashmap if they are not contained within the
292  * provided remote bloomfilter.
293  *
294  * @param cls closure with the `struct Operation *`
295  * @param key current key code
296  * @param value value in the hash map
297  * @return #GNUNET_YES (we should continue to iterate)
298  */
299 static int
300 iterator_bf_reduce (void *cls,
301                    const struct GNUNET_HashCode *key,
302                    void *value)
303 {
304   struct Operation *op = cls;
305   struct ElementEntry *ee = value;
306   struct GNUNET_HashCode mutated_hash;
307
308   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
309                             op->state->salt,
310                             &mutated_hash);
311   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
312               "Testing mingled hash %s with salt %u\n",
313               GNUNET_h2s (&mutated_hash),
314               op->state->salt);
315   if (GNUNET_NO ==
316       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
317                                          &mutated_hash))
318   {
319     GNUNET_break (0 < op->state->my_element_count);
320     op->state->my_element_count--;
321     GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
322                             &ee->element_hash,
323                             &op->state->my_xor);
324     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
325                 "Bloom filter reduction of my_elements, removing %s:%u\n",
326                 GNUNET_h2s (&ee->element_hash),
327                 ee->element.size);
328     GNUNET_assert (GNUNET_YES ==
329                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
330                                                          &ee->element_hash,
331                                                          ee));
332     send_client_removed_element (op,
333                                  &ee->element);
334   }
335   else
336   {
337     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
338                 "Bloom filter reduction of my_elements, keeping %s:%u\n",
339                 GNUNET_h2s (&ee->element_hash),
340                 ee->element.size);
341   }
342   return GNUNET_YES;
343 }
344
345
346 /**
347  * Create initial bloomfilter based on all the elements given.
348  *
349  * @param cls the `struct Operation *`
350  * @param key current key code
351  * @param value the `struct ElementEntry` to process
352  * @return #GNUNET_YES (we should continue to iterate)
353  */
354 static int
355 iterator_bf_create (void *cls,
356                     const struct GNUNET_HashCode *key,
357                     void *value)
358 {
359   struct Operation *op = cls;
360   struct ElementEntry *ee = value;
361   struct GNUNET_HashCode mutated_hash;
362
363   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
364                             op->state->salt,
365                             &mutated_hash);
366   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367               "Initializing BF with hash %s with salt %u\n",
368               GNUNET_h2s (&mutated_hash),
369               op->state->salt);
370   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
371                                     &mutated_hash);
372   return GNUNET_YES;
373 }
374
375
376 /**
377  * Inform the client that the intersection operation has failed,
378  * and proceed to destroy the evaluate operation.
379  *
380  * @param op the intersection operation to fail
381  */
382 static void
383 fail_intersection_operation (struct Operation *op)
384 {
385   struct GNUNET_MQ_Envelope *ev;
386   struct GNUNET_SET_ResultMessage *msg;
387
388   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
389               "Intersection operation failed\n");
390   if (NULL != op->state->my_elements)
391   {
392     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
393     op->state->my_elements = NULL;
394   }
395   ev = GNUNET_MQ_msg (msg,
396                       GNUNET_MESSAGE_TYPE_SET_RESULT);
397   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
398   msg->request_id = htonl (op->spec->client_request_id);
399   msg->element_type = htons (0);
400   GNUNET_MQ_send (op->spec->set->client_mq,
401                   ev);
402   _GSS_operation_destroy (op,
403                           GNUNET_YES);
404 }
405
406
407 /**
408  * Send a bloomfilter to our peer.  After the result done message has
409  * been sent to the client, destroy the evaluate operation.
410  *
411  * @param op intersection operation
412  */
413 static void
414 send_bloomfilter (struct Operation *op)
415 {
416   struct GNUNET_MQ_Envelope *ev;
417   struct BFMessage *msg;
418   uint32_t bf_size;
419   uint32_t bf_elementbits;
420   uint32_t chunk_size;
421   char *bf_data;
422   uint32_t offset;
423
424   /* We consider the ratio of the set sizes to determine
425      the number of bits per element, as the smaller set
426      should use more bits to maximize its set reduction
427      potential and minimize overall bandwidth consumption. */
428   bf_elementbits = 2 + ceil (log2((double)
429                              (op->spec->remote_element_count /
430                                    (double) op->state->my_element_count)));
431   if (bf_elementbits < 1)
432     bf_elementbits = 1; /* make sure k is not 0 */
433   /* optimize BF-size to ~50% of bits set */
434   bf_size = ceil ((double) (op->state->my_element_count
435                             * bf_elementbits / log(2)));
436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
437               "Sending Bloom filter (%u) of size %u bytes\n",
438               (unsigned int) bf_elementbits,
439               (unsigned int) bf_size);
440   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
441                                                            bf_size,
442                                                            bf_elementbits);
443   op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
444                                               UINT32_MAX);
445   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
446                                          &iterator_bf_create,
447                                          op);
448
449   /* send our Bloom filter */
450   chunk_size = 60 * 1024 - sizeof (struct BFMessage);
451   if (bf_size <= chunk_size)
452   {
453     /* singlepart */
454     chunk_size = bf_size;
455     ev = GNUNET_MQ_msg_extra (msg,
456                               chunk_size,
457                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
458     GNUNET_assert (GNUNET_SYSERR !=
459                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
460                                                               (char*) &msg[1],
461                                                               bf_size));
462     msg->sender_element_count = htonl (op->state->my_element_count);
463     msg->bloomfilter_total_length = htonl (bf_size);
464     msg->bits_per_element = htonl (bf_elementbits);
465     msg->sender_mutator = htonl (op->state->salt);
466     msg->element_xor_hash = op->state->my_xor;
467     GNUNET_MQ_send (op->mq, ev);
468   }
469   else
470   {
471     /* multipart */
472     bf_data = GNUNET_malloc (bf_size);
473     GNUNET_assert (GNUNET_SYSERR !=
474                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
475                                                               bf_data,
476                                                               bf_size));
477     offset = 0;
478     while (offset < bf_size)
479     {
480       if (bf_size - chunk_size < offset)
481         chunk_size = bf_size - offset;
482       ev = GNUNET_MQ_msg_extra (msg,
483                                 chunk_size,
484                                 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
485       memcpy (&msg[1],
486               &bf_data[offset],
487               chunk_size);
488       offset += chunk_size;
489       msg->sender_element_count = htonl (op->state->my_element_count);
490       msg->bloomfilter_total_length = htonl (bf_size);
491       msg->bits_per_element = htonl (bf_elementbits);
492       msg->sender_mutator = htonl (op->state->salt);
493       msg->element_xor_hash = op->state->my_xor;
494       GNUNET_MQ_send (op->mq, ev);
495     }
496     GNUNET_free (bf_data);
497   }
498   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
499   op->state->local_bf = NULL;
500 }
501
502
503 /**
504  * Signal to the client that the operation has finished and
505  * destroy the operation.
506  *
507  * @param cls operation to destroy
508  */
509 static void
510 send_client_done_and_destroy (void *cls)
511 {
512   struct Operation *op = cls;
513   struct GNUNET_MQ_Envelope *ev;
514   struct GNUNET_SET_ResultMessage *rm;
515
516   ev = GNUNET_MQ_msg (rm,
517                       GNUNET_MESSAGE_TYPE_SET_RESULT);
518   rm->request_id = htonl (op->spec->client_request_id);
519   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
520   rm->element_type = htons (0);
521   GNUNET_MQ_send (op->spec->set->client_mq,
522                   ev);
523   _GSS_operation_destroy (op,
524                           GNUNET_YES);
525 }
526
527
528 /**
529  * Send all elements in the full result iterator.
530  *
531  * @param cls the `struct Operation *`
532  */
533 static void
534 send_remaining_elements (void *cls)
535 {
536   struct Operation *op = cls;
537   const void *nxt;
538   const struct ElementEntry *ee;
539   struct GNUNET_MQ_Envelope *ev;
540   struct GNUNET_SET_ResultMessage *rm;
541   const struct GNUNET_SET_Element *element;
542   int res;
543
544   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
545                                                      NULL,
546                                                      &nxt);
547   if (GNUNET_NO == res)
548   {
549     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550                 "Sending done and destroy because iterator ran out\n");
551     op->keep--;
552     send_client_done_and_destroy (op);
553     return;
554   }
555   ee = nxt;
556   element = &ee->element;
557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558               "Sending element %s:%u to client (full set)\n",
559               GNUNET_h2s (&ee->element_hash),
560               element->size);
561   GNUNET_assert (0 != op->spec->client_request_id);
562   ev = GNUNET_MQ_msg_extra (rm,
563                             element->size,
564                             GNUNET_MESSAGE_TYPE_SET_RESULT);
565   GNUNET_assert (NULL != ev);
566   rm->result_status = htons (GNUNET_SET_STATUS_OK);
567   rm->request_id = htonl (op->spec->client_request_id);
568   rm->element_type = element->element_type;
569   memcpy (&rm[1],
570           element->data,
571           element->size);
572   GNUNET_MQ_notify_sent (ev,
573                          &send_remaining_elements,
574                          op);
575   GNUNET_MQ_send (op->spec->set->client_mq,
576                   ev);
577 }
578
579
580 /**
581  * Inform the peer that this operation is complete.
582  *
583  * @param op the intersection operation to fail
584  */
585 static void
586 send_peer_done (struct Operation *op)
587 {
588   struct GNUNET_MQ_Envelope *ev;
589   struct IntersectionDoneMessage *idm;
590
591   op->state->phase = PHASE_FINISHED;
592   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
593               "Intersection succeeded, sending DONE\n");
594   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
595   op->state->local_bf = NULL;
596
597   ev = GNUNET_MQ_msg (idm,
598                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
599   idm->final_element_count = htonl (op->state->my_element_count);
600   idm->element_xor_hash = op->state->my_xor;
601   GNUNET_MQ_send (op->mq,
602                   ev);
603 }
604
605
606 /**
607  * Process a Bloomfilter once we got all the chunks.
608  *
609  * @param op the intersection operation
610  */
611 static void
612 process_bf (struct Operation *op)
613 {
614   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
615               "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
616               op->state->phase,
617               op->spec->remote_element_count,
618               op->state->my_element_count,
619               GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
620   switch (op->state->phase)
621   {
622   case PHASE_INITIAL:
623     GNUNET_break_op (0);
624     fail_intersection_operation(op);
625     return;
626   case PHASE_COUNT_SENT:
627     /* This is the first BF being sent, build our initial map with
628        filtering in place */
629     op->state->my_elements
630       = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
631                                               GNUNET_YES);
632     op->state->my_element_count = 0;
633     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
634                                            &filtered_map_initialization,
635                                            op);
636     break;
637   case PHASE_BF_EXCHANGE:
638     /* Update our set by reduction */
639     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
640                                            &iterator_bf_reduce,
641                                            op);
642     break;
643   case PHASE_FINISHED:
644     GNUNET_break_op (0);
645     fail_intersection_operation(op);
646     return;
647   }
648   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
649   op->state->remote_bf = NULL;
650
651   if ( (0 == op->state->my_element_count) || /* fully disjoint */
652        ( (op->state->my_element_count == op->spec->remote_element_count) &&
653          (0 == memcmp (&op->state->my_xor,
654                        &op->state->other_xor,
655                        sizeof (struct GNUNET_HashCode))) ) )
656   {
657     /* we are done */
658     send_peer_done (op);
659     return;
660   }
661   op->state->phase = PHASE_BF_EXCHANGE;
662   send_bloomfilter (op);
663 }
664
665
666 /**
667  * Handle an BF message from a remote peer.
668  *
669  * @param cls the intersection operation
670  * @param mh the header of the message
671  */
672 static void
673 handle_p2p_bf (void *cls,
674                const struct GNUNET_MessageHeader *mh)
675 {
676   struct Operation *op = cls;
677   const struct BFMessage *msg;
678   uint32_t bf_size;
679   uint32_t chunk_size;
680   uint32_t bf_bits_per_element;
681   uint16_t msize;
682
683   msize = htons (mh->size);
684   if (msize < sizeof (struct BFMessage))
685   {
686     GNUNET_break_op (0);
687     fail_intersection_operation (op);
688     return;
689   }
690   msg = (const struct BFMessage *) mh;
691   switch (op->state->phase)
692   {
693   case PHASE_INITIAL:
694     GNUNET_break_op (0);
695     fail_intersection_operation (op);
696     break;
697   case PHASE_COUNT_SENT:
698   case PHASE_BF_EXCHANGE:
699     bf_size = ntohl (msg->bloomfilter_total_length);
700     bf_bits_per_element = ntohl (msg->bits_per_element);
701     chunk_size = msize - sizeof (struct BFMessage);
702     op->state->other_xor = msg->element_xor_hash;
703     if (bf_size == chunk_size)
704     {
705       if (NULL != op->state->bf_data)
706       {
707         GNUNET_break_op (0);
708         fail_intersection_operation (op);
709         return;
710       }
711       /* single part, done here immediately */
712       op->state->remote_bf
713         = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
714                                              bf_size,
715                                              bf_bits_per_element);
716       op->state->salt = ntohl (msg->sender_mutator);
717       op->spec->remote_element_count = ntohl (msg->sender_element_count);
718       process_bf (op);
719       return;
720     }
721     /* multipart chunk */
722     if (NULL == op->state->bf_data)
723     {
724       /* first chunk, initialize */
725       op->state->bf_data = GNUNET_malloc (bf_size);
726       op->state->bf_data_size = bf_size;
727       op->state->bf_bits_per_element = bf_bits_per_element;
728       op->state->bf_data_offset = 0;
729       op->state->salt = ntohl (msg->sender_mutator);
730       op->spec->remote_element_count = ntohl (msg->sender_element_count);
731     }
732     else
733     {
734       /* increment */
735       if ( (op->state->bf_data_size != bf_size) ||
736            (op->state->bf_bits_per_element != bf_bits_per_element) ||
737            (op->state->bf_data_offset + chunk_size > bf_size) ||
738            (op->state->salt != ntohl (msg->sender_mutator)) ||
739            (op->spec->remote_element_count != ntohl (msg->sender_element_count)) )
740       {
741         GNUNET_break_op (0);
742         fail_intersection_operation (op);
743         return;
744       }
745     }
746     memcpy (&op->state->bf_data[op->state->bf_data_offset],
747             (const char*) &msg[1],
748             chunk_size);
749     op->state->bf_data_offset += chunk_size;
750     if (op->state->bf_data_offset == bf_size)
751     {
752       /* last chunk, run! */
753       op->state->remote_bf
754         = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
755                                              bf_size,
756                                              bf_bits_per_element);
757       GNUNET_free (op->state->bf_data);
758       op->state->bf_data = NULL;
759       op->state->bf_data_size = 0;
760       process_bf (op);
761     }
762     break;
763   default:
764     GNUNET_break_op (0);
765     fail_intersection_operation (op);
766     break;
767   }
768 }
769
770
771 /**
772  * Fills the "my_elements" hashmap with the initial set of
773  * (non-deleted) elements from the set of the specification.
774  *
775  * @param cls closure with the `struct Operation *`
776  * @param key current key code for the element
777  * @param value value in the hash map with the `struct ElementEntry *`
778  * @return #GNUNET_YES (we should continue to iterate)
779  */
780 static int
781 initialize_map_unfiltered (void *cls,
782                            const struct GNUNET_HashCode *key,
783                            void *value)
784 {
785   struct ElementEntry *ee = value;
786   struct Operation *op = cls;
787
788   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
789     return GNUNET_YES; /* element not live in operation's generation */
790   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
791                           &ee->element_hash,
792                           &op->state->my_xor);
793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794               "Initial full initialization of my_elements, adding %s:%u\n",
795               GNUNET_h2s (&ee->element_hash),
796               ee->element.size);
797   GNUNET_break (GNUNET_YES ==
798                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
799                                                    &ee->element_hash,
800                                                    ee,
801                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
802   return GNUNET_YES;
803 }
804
805
806 /**
807  * Send our element count to the peer, in case our element count is
808  * lower than his.
809  *
810  * @param op intersection operation
811  */
812 static void
813 send_element_count (struct Operation *op)
814 {
815   struct GNUNET_MQ_Envelope *ev;
816   struct IntersectionElementInfoMessage *msg;
817
818   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819               "Sending our element count (%u)\n",
820               op->state->my_element_count);
821   ev = GNUNET_MQ_msg (msg,
822                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
823   msg->sender_element_count = htonl (op->state->my_element_count);
824   GNUNET_MQ_send (op->mq, ev);
825 }
826
827
828 /**
829  * We go first, initialize our map with all elements and
830  * send the first Bloom filter.
831  *
832  * @param op operation to start exchange for
833  */
834 static void
835 begin_bf_exchange (struct Operation *op)
836 {
837   op->state->phase = PHASE_BF_EXCHANGE;
838   op->state->my_elements
839     = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
840                                             GNUNET_YES);
841   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
842                                          &initialize_map_unfiltered,
843                                          op);
844   send_bloomfilter (op);
845 }
846
847
848 /**
849  * Handle the initial `struct IntersectionElementInfoMessage` from a
850  * remote peer.
851  *
852  * @param cls the intersection operation
853  * @param mh the header of the message
854  */
855 static void
856 handle_p2p_element_info (void *cls,
857                          const struct GNUNET_MessageHeader *mh)
858 {
859   struct Operation *op = cls;
860   const struct IntersectionElementInfoMessage *msg;
861
862   if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
863   {
864     GNUNET_break_op (0);
865     fail_intersection_operation(op);
866     return;
867   }
868   msg = (const struct IntersectionElementInfoMessage *) mh;
869   op->spec->remote_element_count = ntohl (msg->sender_element_count);
870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871               "Received remote element count (%u), I have %u\n",
872               op->spec->remote_element_count,
873               op->state->my_element_count);
874   if ( ( (PHASE_INITIAL != op->state->phase) &&
875          (PHASE_COUNT_SENT != op->state->phase) ) ||
876        (op->state->my_element_count > op->spec->remote_element_count) ||
877        (0 == op->state->my_element_count) ||
878        (0 == op->spec->remote_element_count) )
879   {
880     GNUNET_break_op (0);
881     fail_intersection_operation(op);
882     return;
883   }
884   GNUNET_break (NULL == op->state->remote_bf);
885   begin_bf_exchange (op);
886 }
887
888
889 /**
890  * Send a result message to the client indicating that the operation
891  * is over.  After the result done message has been sent to the
892  * client, destroy the evaluate operation.
893  *
894  * @param op intersection operation
895  */
896 static void
897 finish_and_destroy (struct Operation *op)
898 {
899   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
900
901   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
902   {
903     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904                 "Sending full result set (%u elements)\n",
905                 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
906     op->state->full_result_iter
907       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
908     op->keep++;
909     send_remaining_elements (op);
910     return;
911   }
912   send_client_done_and_destroy (op);
913 }
914
915
916 /**
917  * Remove all elements from our hashmap.
918  *
919  * @param cls closure with the `struct Operation *`
920  * @param key current key code
921  * @param value value in the hash map
922  * @return #GNUNET_YES (we should continue to iterate)
923  */
924 static int
925 filter_all (void *cls,
926             const struct GNUNET_HashCode *key,
927             void *value)
928 {
929   struct Operation *op = cls;
930   struct ElementEntry *ee = value;
931
932   GNUNET_break (0 < op->state->my_element_count);
933   op->state->my_element_count--;
934   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
935                           &ee->element_hash,
936                           &op->state->my_xor);
937   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938               "Final reduction of my_elements, removing %s:%u\n",
939               GNUNET_h2s (&ee->element_hash),
940               ee->element.size);
941   GNUNET_assert (GNUNET_YES ==
942                  GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
943                                                        &ee->element_hash,
944                                                        ee));
945   send_client_removed_element (op,
946                                &ee->element);
947   return GNUNET_YES;
948 }
949
950
951 /**
952  * Handle a done message from a remote peer
953  *
954  * @param cls the intersection operation
955  * @param mh the message
956  */
957 static void
958 handle_p2p_done (void *cls,
959                  const struct GNUNET_MessageHeader *mh)
960 {
961   struct Operation *op = cls;
962   const struct IntersectionDoneMessage *idm;
963
964   if (PHASE_BF_EXCHANGE != op->state->phase)
965   {
966     /* wrong phase to conclude? FIXME: Or should we allow this
967        if the other peer has _initially_ already an empty set? */
968     GNUNET_break_op (0);
969     fail_intersection_operation (op);
970     return;
971   }
972   if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
973   {
974     GNUNET_break_op (0);
975     fail_intersection_operation (op);
976     return;
977   }
978   idm = (const struct IntersectionDoneMessage *) mh;
979   if (0 == ntohl (idm->final_element_count))
980   {
981     /* other peer determined empty set is the intersection,
982        remove all elements */
983     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
984                                            &filter_all,
985                                            op);
986   }
987   if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
988        (0 != memcmp (&op->state->my_xor,
989                      &idm->element_xor_hash,
990                      sizeof (struct GNUNET_HashCode))) )
991   {
992     /* Other peer thinks we are done, but we disagree on the result! */
993     GNUNET_break_op (0);
994     fail_intersection_operation (op);
995     return;
996   }
997   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998               "Got IntersectionDoneMessage, have %u elements in intersection\n",
999               op->state->my_element_count);
1000   op->state->phase = PHASE_FINISHED;
1001   finish_and_destroy (op);
1002 }
1003
1004
1005 /**
1006  * Initiate a set intersection operation with a remote peer.
1007  *
1008  * @param op operation that is created, should be initialized to
1009  *        begin the evaluation
1010  * @param opaque_context message to be transmitted to the listener
1011  *        to convince him to accept, may be NULL
1012  */
1013 static void
1014 intersection_evaluate (struct Operation *op,
1015                        const struct GNUNET_MessageHeader *opaque_context)
1016 {
1017   struct GNUNET_MQ_Envelope *ev;
1018   struct OperationRequestMessage *msg;
1019
1020   op->state = GNUNET_new (struct OperationState);
1021   /* we started the operation, thus we have to send the operation request */
1022   op->state->phase = PHASE_INITIAL;
1023   op->state->my_element_count = op->spec->set->state->current_set_element_count;
1024
1025   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026               "Initiating intersection operation evaluation\n");
1027   ev = GNUNET_MQ_msg_nested_mh (msg,
1028                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1029                                 opaque_context);
1030   if (NULL == ev)
1031   {
1032     /* the context message is too large!? */
1033     GNUNET_break (0);
1034     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1035     return;
1036   }
1037   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1038   msg->app_id = op->spec->app_id;
1039   msg->element_count = htonl (op->state->my_element_count);
1040   GNUNET_MQ_send (op->mq,
1041                   ev);
1042   op->state->phase = PHASE_COUNT_SENT;
1043   if (NULL != opaque_context)
1044     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1045                 "Sent op request with context message\n");
1046   else
1047     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1048                 "Sent op request without context message\n");
1049 }
1050
1051
1052 /**
1053  * Accept an intersection operation request from a remote peer.  Only
1054  * initializes the private operation state.
1055  *
1056  * @param op operation that will be accepted as an intersection operation
1057  */
1058 static void
1059 intersection_accept (struct Operation *op)
1060 {
1061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062               "Accepting set intersection operation\n");
1063   op->state = GNUNET_new (struct OperationState);
1064   op->state->phase = PHASE_INITIAL;
1065   op->state->my_element_count
1066     = op->spec->set->state->current_set_element_count;
1067   op->state->my_elements
1068     = GNUNET_CONTAINER_multihashmap_create
1069     (GNUNET_MIN (op->state->my_element_count,
1070                  op->spec->remote_element_count),
1071      GNUNET_YES);
1072   if (op->spec->remote_element_count < op->state->my_element_count)
1073   {
1074     /* If the other peer (Alice) has fewer elements than us (Bob),
1075        we just send the count as Alice should send the first BF */
1076     send_element_count (op);
1077     op->state->phase = PHASE_COUNT_SENT;
1078     return;
1079   }
1080   /* We have fewer elements, so we start with the BF */
1081   begin_bf_exchange (op);
1082 }
1083
1084
1085 /**
1086  * Dispatch messages for a intersection operation.
1087  *
1088  * @param op the state of the intersection evaluate operation
1089  * @param mh the received message
1090  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1091  *         #GNUNET_OK otherwise
1092  */
1093 static int
1094 intersection_handle_p2p_message (struct Operation *op,
1095                                  const struct GNUNET_MessageHeader *mh)
1096 {
1097   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098               "Received p2p message (t: %u, s: %u)\n",
1099               ntohs (mh->type), ntohs (mh->size));
1100   switch (ntohs (mh->type))
1101   {
1102     /* this message handler is not active until after we received an
1103      * operation request message, thus the ops request is not handled here
1104      */
1105   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
1106     handle_p2p_element_info (op, mh);
1107     break;
1108   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1109     handle_p2p_bf (op, mh);
1110     break;
1111   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
1112     handle_p2p_done (op, mh);
1113     break;
1114   default:
1115     /* something wrong with cadet's message handlers? */
1116     GNUNET_assert (0);
1117   }
1118   return GNUNET_OK;
1119 }
1120
1121
1122 /**
1123  * Handler for peer-disconnects, notifies the client about the aborted
1124  * operation.  If we did not expect anything from the other peer, we
1125  * gracefully terminate the operation.
1126  *
1127  * @param op the destroyed operation
1128  */
1129 static void
1130 intersection_peer_disconnect (struct Operation *op)
1131 {
1132   if (PHASE_FINISHED != op->state->phase)
1133   {
1134     fail_intersection_operation (op);
1135     return;
1136   }
1137   /* the session has already been concluded */
1138   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139               "Other peer disconnected (finished)\n");
1140   if (GNUNET_NO == op->state->client_done_sent)
1141     finish_and_destroy (op);
1142 }
1143
1144
1145 /**
1146  * Destroy the intersection operation.  Only things specific to the
1147  * intersection operation are destroyed.
1148  *
1149  * @param op intersection operation to destroy
1150  */
1151 static void
1152 intersection_op_cancel (struct Operation *op)
1153 {
1154   /* check if the op was canceled twice */
1155   GNUNET_assert (NULL != op->state);
1156   if (NULL != op->state->remote_bf)
1157   {
1158     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1159     op->state->remote_bf = NULL;
1160   }
1161   if (NULL != op->state->local_bf)
1162   {
1163     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1164     op->state->local_bf = NULL;
1165   }
1166   if (NULL != op->state->my_elements)
1167   {
1168     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1169     op->state->my_elements = NULL;
1170   }
1171   GNUNET_free (op->state);
1172   op->state = NULL;
1173   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1174               "Destroying intersection op state done\n");
1175 }
1176
1177
1178 /**
1179  * Create a new set supporting the intersection operation.
1180  *
1181  * @return the newly created set
1182  */
1183 static struct SetState *
1184 intersection_set_create ()
1185 {
1186   struct SetState *set_state;
1187
1188   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189               "Intersection set created\n");
1190   set_state = GNUNET_new (struct SetState);
1191   set_state->current_set_element_count = 0;
1192
1193   return set_state;
1194 }
1195
1196
1197 /**
1198  * Add the element from the given element message to the set.
1199  *
1200  * @param set_state state of the set want to add to
1201  * @param ee the element to add to the set
1202  */
1203 static void
1204 intersection_add (struct SetState *set_state,
1205                   struct ElementEntry *ee)
1206 {
1207   set_state->current_set_element_count++;
1208 }
1209
1210
1211 /**
1212  * Destroy a set that supports the intersection operation
1213  *
1214  * @param set_state the set to destroy
1215  */
1216 static void
1217 intersection_set_destroy (struct SetState *set_state)
1218 {
1219   GNUNET_free (set_state);
1220 }
1221
1222
1223 /**
1224  * Remove the element given in the element message from the set.
1225  *
1226  * @param set_state state of the set to remove from
1227  * @param element set element to remove
1228  */
1229 static void
1230 intersection_remove (struct SetState *set_state,
1231                      struct ElementEntry *element)
1232 {
1233   GNUNET_assert (0 < set_state->current_set_element_count);
1234   set_state->current_set_element_count--;
1235 }
1236
1237
1238 /**
1239  * Get the table with implementing functions for set intersection.
1240  *
1241  * @return the operation specific VTable
1242  */
1243 const struct SetVT *
1244 _GSS_intersection_vt ()
1245 {
1246   static const struct SetVT intersection_vt = {
1247     .create = &intersection_set_create,
1248     .msg_handler = &intersection_handle_p2p_message,
1249     .add = &intersection_add,
1250     .remove = &intersection_remove,
1251     .destroy_set = &intersection_set_destroy,
1252     .evaluate = &intersection_evaluate,
1253     .accept = &intersection_accept,
1254     .peer_disconnect = &intersection_peer_disconnect,
1255     .cancel = &intersection_op_cancel,
1256   };
1257
1258   return &intersection_vt;
1259 }