19d6498f5453259da9bd6ae05c779a7fe3ded706
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
28 #include "gnunet_block_lib.h"
29 #include "gnunet-service-set_protocol.h"
30 #include <gcrypt.h>
31
32 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
33
34 /**
35  * Calculate the size of the bloom filter.
36  *
37  * @param A
38  * @param B
39  * @param s
40  * @param k
41  * @return
42  */
43 #define CALCULATE_BF_SIZE(A, B, s, k) \
44                           do { \
45                             k = ceil(1 + log2((double) (2*B / (double) A)));\
46                             if (k<1) k=1; /* k can be calculated as 0 */\
47                             s = ceil((double) (A * k / log(2))); \
48                           } while (0)
49
50
51 /**
52  * Current phase we are in for a intersection operation.
53  */
54 enum IntersectionOperationPhase
55 {
56   /**
57    * Alices has suggested an operation to bob,
58    * and is waiting for a bf or session end.
59    */
60   PHASE_INITIAL,
61
62   /**
63    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
64    * until one notices the their element count is equal
65    */
66   PHASE_BF_EXCHANGE,
67
68   /**
69    * if both peers have an equal peercount, they enter this state for
70    * one more turn, to see if they actually have agreed on a correct set.
71    * if a peer finds the same element count after the next iteration,
72    * it ends the the session
73    */
74   PHASE_MAYBE_FINISHED,
75
76   /**
77    * The protocol is over.
78    * Results may still have to be sent to the client.
79    */
80   PHASE_FINISHED
81 };
82
83
84 /**
85  * State of an evaluate operation with another peer.
86  */
87 struct OperationState
88 {
89   /**
90    * The bf we currently receive
91    */
92   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
93
94   /**
95    * BF of the set's element.
96    */
97   struct GNUNET_CONTAINER_BloomFilter *local_bf;
98
99   /**
100    * Remaining elements in the intersection operation.
101    * Maps element-id-hashes to 'elements in our set'.
102    */
103   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
104
105   /**
106    * Iterator for sending the final set of @e my_elements to the client.
107    */
108   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
109
110   /**
111    * Evaluate operations are held in a linked list.
112    */
113   struct OperationState *next;
114
115   /**
116    * Evaluate operations are held in a linked list.
117    */
118   struct OperationState *prev;
119
120   /**
121    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
122    */
123   char *bf_data;
124
125   /**
126    * Current element count contained within @e my_elements
127    */
128   uint32_t my_element_count;
129
130   /**
131    * size of the bloomfilter in @e bf_data.
132    */
133   uint32_t bf_data_size;
134
135   /**
136    * size of the bloomfilter
137    */
138   uint32_t bf_bits_per_element;
139
140   /**
141    * Current state of the operation.
142    */
143   enum IntersectionOperationPhase phase;
144
145   /**
146    * Generation in which the operation handle
147    * was created.
148    */
149   unsigned int generation_created;
150
151   /**
152    * Did we send the client that we are done?
153    */
154   int client_done_sent;
155 };
156
157
158 /**
159  * Extra state required for efficient set intersection.
160  * Merely tracks the total number of elements.
161  */
162 struct SetState
163 {
164   /**
165    * Number of currently valid elements in the set which have not been removed
166    */
167   uint32_t current_set_element_count;
168 };
169
170
171 /**
172  * If applicable in the current operation mode, send a result message
173  * to the client indicating we removed an element.
174  *
175  * @param op intersection operation
176  * @param element element to send
177  */
178 static void
179 send_client_removed_element (struct Operation *op,
180                              struct GNUNET_SET_Element *element)
181 {
182   struct GNUNET_MQ_Envelope *ev;
183   struct GNUNET_SET_ResultMessage *rm;
184
185   if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode)
186     return; /* Wrong mode for transmitting removed elements */
187   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
188               "Sending removed element (size %u) to client\n",
189               element->size);
190   GNUNET_assert (0 != op->spec->client_request_id);
191   ev = GNUNET_MQ_msg_extra (rm,
192                             element->size,
193                             GNUNET_MESSAGE_TYPE_SET_RESULT);
194   if (NULL == ev)
195   {
196     GNUNET_break (0);
197     return;
198   }
199   rm->result_status = htons (GNUNET_SET_STATUS_OK);
200   rm->request_id = htonl (op->spec->client_request_id);
201   rm->element_type = element->element_type;
202   memcpy (&rm[1],
203           element->data,
204           element->size);
205   GNUNET_MQ_send (op->spec->set->client_mq,
206                   ev);
207 }
208
209
210 /**
211  * Fills the "my_elements" hashmap with all relevant elements and
212  * adds their mutated hashes to our local bloomfilter with mutator+1.
213  *
214  * @param cls the `struct Operation *` we are performing
215  * @param key current key code
216  * @param value the `struct ElementEntry *` from the hash map
217  * @return #GNUNET_YES (we should continue to iterate)
218  */
219 static int
220 filtered_map_and_bf_initialization (void *cls,
221                                     const struct GNUNET_HashCode *key,
222                                     void *value)
223 {
224   struct Operation *op = cls;
225   struct ElementEntry *ee = value;
226   struct GNUNET_HashCode mutated_hash;
227
228   if ( (op->generation_created < ee->generation_removed) &&
229        (op->generation_created >= ee->generation_added) )
230     return GNUNET_YES; /* element not valid in our operation's generation */
231
232   /* Test if element is in Bob's bloomfilter */
233   // FIXME: where does this salt come from!?
234   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
235                             op->spec->salt,
236                             &mutated_hash);
237   if (GNUNET_NO ==
238       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
239                                          &mutated_hash))
240   {
241     /* remove this element */
242     send_client_removed_element (op,
243                                  &ee->element);
244     return GNUNET_YES;
245   }
246   op->state->my_element_count++;
247   GNUNET_break (GNUNET_YES ==
248                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
249                                                    &ee->element_hash,
250                                                    ee,
251                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
252
253   return GNUNET_YES;
254 }
255
256
257 /**
258  * Removes elements from our hashmap if they are not contained within the
259  * provided remote bloomfilter.
260  *
261  * @param cls closure with the `struct Operation *`
262  * @param key current key code
263  * @param value value in the hash map
264  * @return #GNUNET_YES (we should continue to iterate)
265  */
266 static int
267 iterator_bf_reduce (void *cls,
268                    const struct GNUNET_HashCode *key,
269                    void *value)
270 {
271   struct Operation *op = cls;
272   struct ElementEntry *ee = value;
273   struct GNUNET_HashCode mutated_hash;
274
275   // FIXME: where does this salt come from!?
276   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
277                             op->spec->salt,
278                             &mutated_hash);
279   if (GNUNET_NO ==
280       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
281                                          &mutated_hash))
282   {
283     op->state->my_element_count--;
284     GNUNET_assert (GNUNET_YES ==
285                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
286                                                          &ee->element_hash,
287                                                          ee));
288     send_client_removed_element (op,
289                                  &ee->element);
290   }
291   return GNUNET_YES;
292 }
293
294
295 /**
296  * Create initial bloomfilter based on all the elements given.
297  *
298  * @param cls the `struct Operation *`
299  * @param key current key code
300  * @param value the `struct ElementEntry` to process
301  * @return #GNUNET_YES (we should continue to iterate)
302  */
303 static int
304 iterator_bf_create (void *cls,
305                     const struct GNUNET_HashCode *key,
306                     void *value)
307 {
308   struct Operation *op = cls;
309   struct ElementEntry *ee = value;
310   struct GNUNET_HashCode mutated_hash;
311
312   // FIXME: where does this salt come from!?
313   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
314                             op->spec->salt,
315                             &mutated_hash);
316   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
317                                     &mutated_hash);
318   return GNUNET_YES;
319 }
320
321
322 /**
323  * Inform the client that the intersection operation has failed,
324  * and proceed to destroy the evaluate operation.
325  *
326  * @param op the intersection operation to fail
327  */
328 static void
329 fail_intersection_operation (struct Operation *op)
330 {
331   struct GNUNET_MQ_Envelope *ev;
332   struct GNUNET_SET_ResultMessage *msg;
333
334   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
335               "Intersection operation failed\n");
336   if (NULL != op->state->my_elements)
337   {
338     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
339     op->state->my_elements = NULL;
340   }
341   ev = GNUNET_MQ_msg (msg,
342                       GNUNET_MESSAGE_TYPE_SET_RESULT);
343   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
344   msg->request_id = htonl (op->spec->client_request_id);
345   msg->element_type = htons (0);
346   GNUNET_MQ_send (op->spec->set->client_mq,
347                   ev);
348   _GSS_operation_destroy (op,
349                           GNUNET_YES);
350 }
351
352
353
354
355
356
357
358 /**
359  *
360  * @param op
361  * @param offset
362  */
363 static void
364 send_bloomfilter_multipart (struct Operation *op,
365                             uint32_t offset)
366 {
367   struct GNUNET_MQ_Envelope *ev;
368   struct BFPart *msg;
369   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
370   uint32_t todo_size = op->state->bf_data_size - offset;
371
372   if (todo_size < chunk_size)
373     chunk_size = todo_size;
374
375   ev = GNUNET_MQ_msg_extra (msg,
376                             chunk_size,
377                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
378
379   msg->chunk_length = htonl (chunk_size);
380   msg->chunk_offset = htonl (offset);
381   memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
382
383   GNUNET_MQ_send (op->mq, ev);
384
385   if (op->state->bf_data_size == offset + chunk_size)
386   {
387     // done
388     GNUNET_free(op->state->bf_data);
389     op->state->bf_data = NULL;
390     return;
391   }
392   send_bloomfilter_multipart (op, offset + chunk_size);
393 }
394
395
396 /**
397  * Send a bloomfilter to our peer.  After the result done message has
398  * been sent to the client, destroy the evaluate operation.
399  *
400  * @param op intersection operation
401  */
402 static void
403 send_bloomfilter (struct Operation *op)
404 {
405   struct GNUNET_MQ_Envelope *ev;
406   struct BFMessage *msg;
407   uint32_t bf_size;
408   uint32_t bf_elementbits;
409   uint32_t chunk_size;
410   struct GNUNET_CONTAINER_BloomFilter *local_bf;
411
412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413               "sending bf of size %u\n");
414
415   CALCULATE_BF_SIZE(op->state->my_element_count,
416                     op->spec->remote_element_count,
417                     bf_size,
418                     bf_elementbits);
419
420   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
421                                                 bf_size,
422                                                 bf_elementbits);
423
424   op->spec->salt++;
425   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
426                                          &iterator_bf_create,
427                                          op);
428
429   // send our bloomfilter
430   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage))
431   {
432     // singlepart
433     chunk_size = bf_size;
434     ev = GNUNET_MQ_msg_extra (msg,
435                               chunk_size,
436                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
437     GNUNET_assert (GNUNET_SYSERR !=
438                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
439                                                               (char*)&msg[1],
440                                                               bf_size));
441   }
442   else
443   {
444     //multipart
445     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
446     ev = GNUNET_MQ_msg_extra (msg,
447                               chunk_size,
448                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
449     op->state->bf_data = (char *) GNUNET_malloc (bf_size);
450     GNUNET_assert (GNUNET_SYSERR !=
451                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
452                                                               op->state->bf_data,
453                                                               bf_size));
454     memcpy (&msg[1], op->state->bf_data, chunk_size);
455     op->state->bf_data_size = bf_size;
456   }
457   GNUNET_CONTAINER_bloomfilter_free (local_bf);
458
459   msg->sender_element_count = htonl (op->state->my_element_count);
460   msg->bloomfilter_total_length = htonl (bf_size);
461   msg->bloomfilter_length = htonl (chunk_size);
462   msg->bits_per_element = htonl (bf_elementbits);
463   msg->sender_mutator = htonl (op->spec->salt);
464
465   GNUNET_MQ_send (op->mq, ev);
466
467   if (op->state->bf_data)
468     send_bloomfilter_multipart (op, chunk_size);
469 }
470
471
472 /**
473  * Signal to the client that the operation has finished and
474  * destroy the operation.
475  *
476  * @param cls operation to destroy
477  */
478 static void
479 send_client_done_and_destroy (void *cls)
480 {
481   struct Operation *op = cls;
482   struct GNUNET_MQ_Envelope *ev;
483   struct GNUNET_SET_ResultMessage *rm;
484
485   ev = GNUNET_MQ_msg (rm,
486                       GNUNET_MESSAGE_TYPE_SET_RESULT);
487   rm->request_id = htonl (op->spec->client_request_id);
488   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
489   rm->element_type = htons (0);
490   GNUNET_MQ_send (op->spec->set->client_mq,
491                   ev);
492   _GSS_operation_destroy (op,
493                           GNUNET_YES);
494 }
495
496
497 /**
498  * Send all elements in the full result iterator.
499  *
500  * @param cls the `struct Operation *`
501  */
502 static void
503 send_remaining_elements (void *cls)
504 {
505   struct Operation *op = cls;
506   const void *nxt;
507   const struct ElementEntry *ee;
508   struct GNUNET_MQ_Envelope *ev;
509   struct GNUNET_SET_ResultMessage *rm;
510   const struct GNUNET_SET_Element *element;
511   int res;
512
513   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
514                                                      NULL,
515                                                      &nxt);
516   if (GNUNET_NO == res)
517   {
518     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519                 "Sending done and destroy because iterator ran out\n");
520     send_client_done_and_destroy (op);
521     return;
522   }
523   ee = nxt;
524   element = &ee->element;
525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
526               "Sending element (size %u) to client (full set)\n",
527               element->size);
528   GNUNET_assert (0 != op->spec->client_request_id);
529   ev = GNUNET_MQ_msg_extra (rm,
530                             element->size,
531                             GNUNET_MESSAGE_TYPE_SET_RESULT);
532   GNUNET_assert (NULL != ev);
533   rm->result_status = htons (GNUNET_SET_STATUS_OK);
534   rm->request_id = htonl (op->spec->client_request_id);
535   rm->element_type = element->element_type;
536   memcpy (&rm[1],
537           element->data,
538           element->size);
539   GNUNET_MQ_notify_sent (ev,
540                          &send_remaining_elements,
541                          op);
542   GNUNET_MQ_send (op->spec->set->client_mq,
543                   ev);
544 }
545
546
547 /**
548  * Inform the peer that this operation is complete.
549  *
550  * @param op the intersection operation to fail
551  */
552 static void
553 send_peer_done (struct Operation *op)
554 {
555   struct GNUNET_MQ_Envelope *ev;
556
557   op->state->phase = PHASE_FINISHED;
558   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
559               "Intersection succeeded, sending DONE\n");
560   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
561   op->state->local_bf = NULL;
562
563   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
564   GNUNET_MQ_send (op->mq, ev);
565 }
566
567
568 /**
569  * Process a Bloomfilter once we got all the chunks.
570  *
571  * @param op the intersection operation
572  */
573 static void
574 process_bf (struct Operation *op)
575 {
576   uint32_t old_elements;
577   uint32_t peer_elements;
578
579   old_elements = op->state->my_element_count;
580   peer_elements = op->spec->remote_element_count;
581   switch (op->state->phase)
582   {
583   case PHASE_INITIAL:
584     /* This is the first BF being sent, build our
585        initial map with filtering in place */
586     op->state->my_elements
587       = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
588                                               GNUNET_YES);
589     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
590                                            &filtered_map_and_bf_initialization,
591                                            op);
592     break;
593   case PHASE_BF_EXCHANGE:
594   case PHASE_MAYBE_FINISHED:
595     /* Update our set by reduction */
596     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
597                                            &iterator_bf_reduce,
598                                            op);
599     break;
600   default:
601     GNUNET_break_op (0);
602     fail_intersection_operation(op);
603   }
604   // the iterators created a new BF with salt+1
605   // the peer needs this information for decoding the next BF
606   // this behavior can be modified at will later on.
607   op->spec->salt++;
608
609   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
610   op->state->remote_bf = NULL;
611
612   if ((0 == op->state->my_element_count) // fully disjoint
613       || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements
614           && (old_elements == op->state->my_element_count)
615           && (op->state->my_element_count == peer_elements)))
616   {
617     // In the last round we though we were finished, we now know this is correct
618     send_peer_done (op);
619     return;
620   }
621
622   op->state->phase = PHASE_BF_EXCHANGE;
623   if (op->state->my_element_count == peer_elements)
624     // maybe we are finished, but we do one more round to make certain
625     // we don't have false positives ...
626     op->state->phase = PHASE_MAYBE_FINISHED;
627
628   send_bloomfilter (op);
629 }
630
631
632 /**
633  * Handle an BF multipart message from a remote peer.
634  *
635  * @param cls the intersection operation
636  * @param mh the header of the message
637  */
638 static void
639 handle_p2p_bf_part (void *cls,
640                     const struct GNUNET_MessageHeader *mh)
641 {
642   struct Operation *op = cls;
643   const struct BFPart *msg = (const struct BFPart *) mh;
644   uint32_t chunk_size;
645   uint32_t chunk_offset;
646
647   chunk_size = ntohl(msg->chunk_length);
648   chunk_offset = ntohl(msg->chunk_offset);
649
650   if ((NULL == op->state->bf_data)
651        || (op->state->bf_data_size < chunk_size + chunk_offset))
652   {
653     // unexpected multipart chunk
654     GNUNET_break_op (0);
655     fail_intersection_operation(op);
656     return;
657   }
658
659   memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
660
661   if (op->state->bf_data_size != chunk_offset + chunk_size)
662     // wait for next chunk
663     return;
664
665   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
666                                                             op->state->bf_data_size,
667                                                             op->state->bf_bits_per_element);
668
669   GNUNET_free (op->state->bf_data);
670   op->state->bf_data = NULL;
671
672   process_bf (op);
673 }
674
675
676 /**
677  * Handle an BF message from a remote peer.
678  *
679  * @param cls the intersection operation
680  * @param mh the header of the message
681  */
682 static void
683 handle_p2p_bf (void *cls,
684                const struct GNUNET_MessageHeader *mh)
685 {
686   struct Operation *op = cls;
687   const struct BFMessage *msg = (const struct BFMessage *) mh;
688   uint32_t bf_size;
689   uint32_t chunk_size;
690   uint32_t bf_bits_per_element;
691
692   switch (op->state->phase)
693   {
694   case PHASE_INITIAL:
695   case PHASE_BF_EXCHANGE:
696   case PHASE_MAYBE_FINISHED:
697     if (NULL == op->state->bf_data)
698     {
699       // no colliding multipart transaction going on currently
700       op->spec->salt = ntohl (msg->sender_mutator);
701       bf_size = ntohl (msg->bloomfilter_total_length);
702       bf_bits_per_element = ntohl (msg->bits_per_element);
703       chunk_size = ntohl (msg->bloomfilter_length);
704       op->spec->remote_element_count = ntohl(msg->sender_element_count);
705       if (bf_size == chunk_size)
706       {
707         // single part, done here
708         op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
709                                                                   bf_size,
710                                                                   bf_bits_per_element);
711         process_bf (op);
712         return;
713       }
714
715       //first multipart chunk
716       op->state->bf_data = GNUNET_malloc (bf_size);
717       op->state->bf_data_size = bf_size;
718       op->state->bf_bits_per_element = bf_bits_per_element;
719       memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
720       return;
721     }
722   default:
723     GNUNET_break_op (0);
724     fail_intersection_operation (op);
725   }
726 }
727
728
729 /**
730  * Fills the "my_elements" hashmap with the initial set of
731  * (non-deleted) elements from the set of the specification.
732  *
733  * @param cls closure with the `struct Operation *`
734  * @param key current key code for the element
735  * @param value value in the hash map with the `struct ElementEntry *`
736  * @return #GNUNET_YES (we should continue to iterate)
737  */
738 static int
739 initialize_map (void *cls,
740                 const struct GNUNET_HashCode *key,
741                 void *value)
742 {
743   struct ElementEntry *ee = value;
744   struct Operation *op = cls;
745
746   if ( (op->generation_created < ee->generation_removed) &&
747        (op->generation_created >= ee->generation_added) )
748     return GNUNET_YES; /* element not live in operation's generation */
749   GNUNET_break (GNUNET_YES ==
750                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
751                                                    &ee->element_hash,
752                                                    ee,
753                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
754   return GNUNET_YES;
755 }
756
757
758 /**
759  * Handle the initial `struct IntersectionElementInfoMessage` from a
760  * remote peer.
761  *
762  * @param cls the intersection operation
763  * @param mh the header of the message
764  */
765 static void
766 handle_p2p_element_info (void *cls,
767                          const struct GNUNET_MessageHeader *mh)
768 {
769   struct Operation *op = cls;
770   const struct IntersectionElementInfoMessage *msg;
771
772   if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
773   {
774     GNUNET_break_op (0);
775     fail_intersection_operation(op);
776     return;
777   }
778   msg = (const struct IntersectionElementInfoMessage *) mh;
779   op->spec->remote_element_count = ntohl (msg->sender_element_count);
780   if ( (PHASE_INITIAL != op->state->phase) ||
781        (op->state->my_element_count > op->spec->remote_element_count) ||
782        (0 == op->state->my_element_count) ||
783        (0 == op->spec->remote_element_count) )
784   {
785     GNUNET_break_op (0);
786     fail_intersection_operation(op);
787     return;
788   }
789
790   op->state->phase = PHASE_BF_EXCHANGE;
791   // FIXME... -- why a new map here!?
792   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1,
793                                                                  GNUNET_YES);
794   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
795                                          &initialize_map, // FIXME: filtering!?
796                                          op);
797   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
798   op->state->remote_bf = NULL;
799
800   if (op->state->my_element_count == ntohl (msg->sender_element_count))
801     op->state->phase = PHASE_MAYBE_FINISHED;
802
803   send_bloomfilter (op);
804 }
805
806
807 /**
808  * Send our element count to the peer, in case our element count is lower than his
809  *
810  * @param op intersection operation
811  */
812 static void
813 send_element_count (struct Operation *op)
814 {
815   struct GNUNET_MQ_Envelope *ev;
816   struct IntersectionElementInfoMessage *msg;
817
818   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819               "Sending our element count (bf_msg)\n");
820   ev = GNUNET_MQ_msg (msg,
821                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
822   msg->sender_element_count = htonl (op->state->my_element_count);
823   GNUNET_MQ_send (op->mq, ev);
824 }
825
826
827 /**
828  * Send a result message to the client indicating that the operation
829  * is over.  After the result done message has been sent to the
830  * client, destroy the evaluate operation.
831  *
832  * @param op intersection operation
833  */
834 static void
835 finish_and_destroy (struct Operation *op)
836 {
837   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
838
839   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
840   {
841     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842                 "Sending full result set\n");
843     op->state->full_result_iter
844       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
845     send_remaining_elements (op);
846     return;
847   }
848   send_client_done_and_destroy (op);
849 }
850
851
852 /**
853  * Handle a done message from a remote peer
854  *
855  * @param cls the intersection operation
856  * @param mh the message
857  */
858 static void
859 handle_p2p_done (void *cls,
860                  const struct GNUNET_MessageHeader *mh)
861 {
862   struct Operation *op = cls;
863
864   if ( (op->state->phase = PHASE_FINISHED) ||
865        (op->state->phase = PHASE_MAYBE_FINISHED) )
866   {
867     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868                 "Got final DONE\n");
869     finish_and_destroy (op);
870     return;
871   }
872   GNUNET_break_op (0);
873   fail_intersection_operation (op);
874 }
875
876
877 /**
878  * Initiate a set intersection operation with a remote peer.
879  *
880  * @param op operation that is created, should be initialized to
881  *        begin the evaluation
882  * @param opaque_context message to be transmitted to the listener
883  *        to convince him to accept, may be NULL
884  */
885 static void
886 intersection_evaluate (struct Operation *op,
887                        const struct GNUNET_MessageHeader *opaque_context)
888 {
889   struct GNUNET_MQ_Envelope *ev;
890   struct OperationRequestMessage *msg;
891
892   op->state = GNUNET_new (struct OperationState);
893   /* we started the operation, thus we have to send the operation request */
894   op->state->phase = PHASE_INITIAL;
895   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
896   op->state->my_element_count = op->spec->set->state->current_set_element_count;
897
898   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
899               "Initiating intersection operation evaluation");
900   ev = GNUNET_MQ_msg_nested_mh (msg,
901                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
902                                 opaque_context);
903   if (NULL == ev)
904   {
905     /* the context message is too large!? */
906     GNUNET_break (0);
907     GNUNET_SERVER_client_disconnect (op->spec->set->client);
908     return;
909   }
910   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
911   msg->app_id = op->spec->app_id;
912   // FIXME: where does this 'salt' come from?
913   msg->salt = htonl (op->spec->salt);
914   msg->element_count = htonl (op->state->my_element_count);
915   GNUNET_MQ_send (op->mq,
916                   ev);
917   if (NULL != opaque_context)
918     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919                 "Sent op request with context message\n");
920   else
921     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
922                 "Sent op request without context message\n");
923 }
924
925
926 /**
927  * Accept an intersection operation request from a remote peer.  Only
928  * initializes the private operation state.
929  *
930  * @param op operation that will be accepted as an intersection operation
931  */
932 static void
933 intersection_accept (struct Operation *op)
934 {
935   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
936               "Accepting set intersection operation\n");
937   op->state = GNUNET_new (struct OperationState);
938   op->state->my_element_count
939     = op->spec->set->state->current_set_element_count;
940   op->state->my_elements
941     = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count,
942                                                         op->spec->remote_element_count),
943                                             GNUNET_YES);
944   if (op->spec->remote_element_count < op->state->my_element_count)
945   {
946     /* If the other peer (Alice) has fewer elements than us (Bob),
947        we just send the count as Alice should send the first BF */
948     op->state->phase = PHASE_INITIAL;
949     send_element_count (op);
950     return;
951   }
952   /* We have fewer elements, so we start with the BF */
953   op->state->phase = PHASE_BF_EXCHANGE;
954   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
955                                                            BLOOMFILTER_SIZE,
956                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
957   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
958                                          &initialize_map,
959                                          op);
960   send_bloomfilter (op);
961 }
962
963
964 /**
965  * Dispatch messages for a intersection operation.
966  *
967  * @param op the state of the intersection evaluate operation
968  * @param mh the received message
969  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
970  *         #GNUNET_OK otherwise
971  */
972 static int
973 intersection_handle_p2p_message (struct Operation *op,
974                                  const struct GNUNET_MessageHeader *mh)
975 {
976   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
977               "Received p2p message (t: %u, s: %u)\n",
978               ntohs (mh->type), ntohs (mh->size));
979   switch (ntohs (mh->type))
980   {
981     /* this message handler is not active until after we received an
982      * operation request message, thus the ops request is not handled here
983      */
984   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
985     handle_p2p_element_info (op, mh);
986     break;
987   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
988     handle_p2p_bf (op, mh);
989     break;
990   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
991     handle_p2p_bf_part (op, mh);
992     break;
993   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
994     handle_p2p_done (op, mh);
995     break;
996   default:
997     /* something wrong with cadet's message handlers? */
998     GNUNET_assert (0);
999   }
1000   return GNUNET_OK;
1001 }
1002
1003
1004 /**
1005  * Handler for peer-disconnects, notifies the client about the aborted
1006  * operation.  If we did not expect anything from the other peer, we
1007  * gracefully terminate the operation.
1008  *
1009  * @param op the destroyed operation
1010  */
1011 static void
1012 intersection_peer_disconnect (struct Operation *op)
1013 {
1014   if (PHASE_FINISHED != op->state->phase)
1015   {
1016     fail_intersection_operation (op);
1017     return;
1018   }
1019   /* the session has already been concluded */
1020   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1021               "Other peer disconnected (finished)\n");
1022   if (GNUNET_NO == op->state->client_done_sent)
1023     finish_and_destroy (op);
1024 }
1025
1026
1027 /**
1028  * Destroy the intersection operation.  Only things specific to the
1029  * intersection operation are destroyed.
1030  *
1031  * @param op intersection operation to destroy
1032  */
1033 static void
1034 intersection_op_cancel (struct Operation *op)
1035 {
1036   /* check if the op was canceled twice */
1037   GNUNET_assert (NULL != op->state);
1038   if (NULL != op->state->remote_bf)
1039   {
1040     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1041     op->state->remote_bf = NULL;
1042   }
1043   if (NULL != op->state->local_bf)
1044   {
1045     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1046     op->state->local_bf = NULL;
1047   }
1048   if (NULL != op->state->my_elements)
1049   {
1050     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1051     op->state->my_elements = NULL;
1052   }
1053   GNUNET_free (op->state);
1054   op->state = NULL;
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056               "Destroying intersection op state done\n");
1057 }
1058
1059
1060 /**
1061  * Create a new set supporting the intersection operation.
1062  *
1063  * @return the newly created set
1064  */
1065 static struct SetState *
1066 intersection_set_create ()
1067 {
1068   struct SetState *set_state;
1069
1070   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071               "Intersection set created\n");
1072   set_state = GNUNET_new (struct SetState);
1073   set_state->current_set_element_count = 0;
1074
1075   return set_state;
1076 }
1077
1078
1079 /**
1080  * Add the element from the given element message to the set.
1081  *
1082  * @param set_state state of the set want to add to
1083  * @param ee the element to add to the set
1084  */
1085 static void
1086 intersection_add (struct SetState *set_state,
1087                   struct ElementEntry *ee)
1088 {
1089   set_state->current_set_element_count++;
1090 }
1091
1092
1093 /**
1094  * Destroy a set that supports the intersection operation
1095  *
1096  * @param set_state the set to destroy
1097  */
1098 static void
1099 intersection_set_destroy (struct SetState *set_state)
1100 {
1101   GNUNET_free (set_state);
1102 }
1103
1104
1105 /**
1106  * Remove the element given in the element message from the set.
1107  *
1108  * @param set_state state of the set to remove from
1109  * @param element set element to remove
1110  */
1111 static void
1112 intersection_remove (struct SetState *set_state,
1113                      struct ElementEntry *element)
1114 {
1115   GNUNET_assert (0 < set_state->current_set_element_count);
1116   set_state->current_set_element_count--;
1117 }
1118
1119
1120 /**
1121  * Get the table with implementing functions for set intersection.
1122  *
1123  * @return the operation specific VTable
1124  */
1125 const struct SetVT *
1126 _GSS_intersection_vt ()
1127 {
1128   static const struct SetVT intersection_vt = {
1129     .create = &intersection_set_create,
1130     .msg_handler = &intersection_handle_p2p_message,
1131     .add = &intersection_add,
1132     .remove = &intersection_remove,
1133     .destroy_set = &intersection_set_destroy,
1134     .evaluate = &intersection_evaluate,
1135     .accept = &intersection_accept,
1136     .peer_disconnect = &intersection_peer_disconnect,
1137     .cancel = &intersection_op_cancel,
1138   };
1139
1140   return &intersection_vt;
1141 }