749f2e98d7f0a757359b83738b5836f8a6035e26
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
28 #include "gnunet_block_lib.h"
29 #include "gnunet-service-set_protocol.h"
30 #include <gcrypt.h>
31
32 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
33
34 #define CALCULATE_BF_SIZE(A, B, s, k) \
35                           do { \
36                             k = ceil(1 + log2((double) (2*B / (double) A)));\
37                             if (k<1) k=1; /* k can be calculated as 0 */\
38                             s = ceil((double) (A * k / log(2))); \
39                           } while (0)
40
41 /**
42  * Current phase we are in for a intersection operation.
43  */
44 enum IntersectionOperationPhase
45 {
46   /**
47    * Alices has suggested an operation to bob,
48    * and is waiting for a bf or session end.
49    */
50   PHASE_INITIAL,
51   /**
52    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
53    * until one notices the their element count is equal
54    */
55   PHASE_BF_EXCHANGE,
56   /**
57    * if both peers have an equal peercount, they enter this state for
58    * one more turn, to see if they actually have agreed on a correct set.
59    * if a peer finds the same element count after the next iteration,
60    * it ends the the session
61    */
62   PHASE_MAYBE_FINISHED,
63   /**
64    * The protocol is over.
65    * Results may still have to be sent to the client.
66    */
67   PHASE_FINISHED
68 };
69
70
71 /**
72  * State of an evaluate operation with another peer.
73  */
74 struct OperationState
75 {
76   /**
77    * The bf we currently receive
78    */
79   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
80
81   /**
82    * BF of the set's element.
83    */
84   struct GNUNET_CONTAINER_BloomFilter *local_bf;
85
86   /**
87    * Iterator for sending elements on the key to element mapping to the client.
88    */
89   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
90
91   /**
92    * Evaluate operations are held in a linked list.
93    */
94   struct OperationState *next;
95
96   /**
97    * Evaluate operations are held in a linked list.
98    */
99   struct OperationState *prev;
100
101   /**
102    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
103    */
104   char *bf_data;
105
106   /**
107    * Maps element-id-hashes to 'elements in our set'.
108    */
109   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
110
111   /**
112    * Current element count contained within @e my_elements
113    */
114   uint32_t my_element_count;
115
116   /**
117    * size of the bloomfilter in @e bf_data.
118    */
119   uint32_t bf_data_size;
120
121   /**
122    * size of the bloomfilter
123    */
124   uint32_t bf_bits_per_element;
125
126   /**
127    * Current state of the operation.
128    */
129   enum IntersectionOperationPhase phase;
130
131   /**
132    * Generation in which the operation handle
133    * was created.
134    */
135   unsigned int generation_created;
136
137   /**
138    * Did we send the client that we are done?
139    */
140   int client_done_sent;
141 };
142
143
144 /**
145  * Extra state required for efficient set intersection.
146  */
147 struct SetState
148 {
149   /**
150    * Number of currently valid elements in the set which have not been removed
151    */
152   uint32_t current_set_element_count;
153 };
154
155
156 /**
157  * Send a result message to the client indicating
158  * we removed an element
159  *
160  * @param op union operation
161  * @param element element to send
162  */
163 static void
164 send_client_element (struct Operation *op,
165                      struct GNUNET_SET_Element *element)
166 {
167   struct GNUNET_MQ_Envelope *ev;
168   struct GNUNET_SET_ResultMessage *rm;
169
170   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
171               "sending removed element (size %u) to client\n",
172               element->size);
173   GNUNET_assert (0 != op->spec->client_request_id);
174   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
175   if (NULL == ev)
176   {
177     GNUNET_MQ_discard (ev);
178     GNUNET_break (0);
179     return;
180   }
181   rm->result_status = htons (GNUNET_SET_STATUS_OK);
182   rm->request_id = htonl (op->spec->client_request_id);
183   rm->element_type = element->element_type;
184   memcpy (&rm[1], element->data, element->size);
185   GNUNET_MQ_send (op->spec->set->client_mq, ev);
186 }
187
188
189 /**
190  * Alice's version:
191  *
192  * fills the contained-elements hashmap with all relevant
193  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
194  *
195  * @param cls closure
196  * @param key current key code
197  * @param value value in the hash map
198  * @return #GNUNET_YES if we should continue to
199  *         iterate,
200  *         #GNUNET_NO if not.
201  */
202 static int
203 iterator_initialization_by_alice (void *cls,
204                                   const struct GNUNET_HashCode *key,
205                                   void *value)
206 {
207   struct ElementEntry *ee = value;
208   struct Operation *op = cls;
209   struct GNUNET_HashCode mutated_hash;
210
211   //only consider this element, if it is valid for us
212   if ((op->generation_created < ee->generation_removed)
213        && (op->generation_created >= ee->generation_added))
214     return GNUNET_YES;
215
216   // not contained according to bob's bloomfilter
217   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
218                            op->spec->salt,
219                            &mutated_hash);
220   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
221                                                       &mutated_hash)){
222     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
223       send_client_element (op, &ee->element);
224     return GNUNET_YES;
225   }
226
227   op->state->my_element_count++;
228   GNUNET_assert (GNUNET_YES ==
229                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
230                                                     &ee->element_hash, ee,
231                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
232
233   return GNUNET_YES;
234 }
235
236
237 /**
238  * fills the contained-elements hashmap with all relevant
239  * elements and adds their mutated hashes to our local bloomfilter
240  *
241  * @param cls closure
242  * @param key current key code
243  * @param value value in the hash map
244  * @return #GNUNET_YES if we should continue to
245  *         iterate,
246  *         #GNUNET_NO if not.
247  */
248 static int
249 iterator_initialization (void *cls,
250                          const struct GNUNET_HashCode *key,
251                          void *value)
252 {
253   struct ElementEntry *ee = value;
254   struct Operation *op = cls;
255
256   //only consider this element, if it is valid for us
257   if ((op->generation_created < ee->generation_removed)
258        && (op->generation_created >= ee->generation_added))
259     return GNUNET_YES;
260
261   GNUNET_assert (GNUNET_YES ==
262                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
263                                                     &ee->element_hash, ee,
264                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
265   return GNUNET_YES;
266 }
267
268
269 /**
270  * removes element from a hashmap if it is not contained within the
271  * provided remote bloomfilter. Then, fill our new bloomfilter.
272  *
273  * @param cls closure
274  * @param key current key code
275  * @param value value in the hash map
276  * @return #GNUNET_YES if we should continue to
277  *         iterate,
278  *         #GNUNET_NO if not.
279  */
280 static int
281 iterator_bf_reduce (void *cls,
282                    const struct GNUNET_HashCode *key,
283                    void *value)
284 {
285   struct ElementEntry *ee = value;
286   struct Operation *op = cls;
287   struct GNUNET_HashCode mutated_hash;
288
289   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
290
291   if (GNUNET_NO ==
292       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
293                                          &mutated_hash))
294   {
295     op->state->my_element_count--;
296     GNUNET_assert (GNUNET_YES ==
297                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
298                                                          &ee->element_hash,
299                                                          ee));
300     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
301       send_client_element (op, &ee->element);
302   }
303
304   return GNUNET_YES;
305 }
306
307
308 /**
309  * Create a bloomfilter based on the elements given
310  *
311  * @param cls closure
312  * @param key current key code
313  * @param value value in the hash map
314  * @return #GNUNET_YES if we should continue to
315  *         iterate,
316  *         #GNUNET_NO if not.
317  */
318 static int
319 iterator_bf_create (void *cls,
320                     const struct GNUNET_HashCode *key,
321                     void *value)
322 {
323   struct ElementEntry *ee = value;
324   struct Operation *op = cls;
325   struct GNUNET_HashCode mutated_hash;
326
327   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
328                             op->spec->salt,
329                             &mutated_hash);
330   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
331                                     &mutated_hash);
332   return GNUNET_YES;
333 }
334
335
336 /**
337  * Inform the client that the union operation has failed,
338  * and proceed to destroy the evaluate operation.
339  *
340  * @param op the intersection operation to fail
341  */
342 static void
343 fail_intersection_operation (struct Operation *op)
344 {
345   struct GNUNET_MQ_Envelope *ev;
346   struct GNUNET_SET_ResultMessage *msg;
347
348   if (op->state->my_elements)
349   {
350     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
351     op->state->my_elements = NULL;
352   }
353   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
354               "intersection operation failed\n");
355
356   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
357   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
358   msg->request_id = htonl (op->spec->client_request_id);
359   msg->element_type = htons (0);
360   GNUNET_MQ_send (op->spec->set->client_mq, ev);
361   _GSS_operation_destroy (op, GNUNET_YES);
362 }
363
364
365 static void
366 send_bloomfilter_multipart (struct Operation *op,
367                             uint32_t offset)
368 {
369   struct GNUNET_MQ_Envelope *ev;
370   struct BFPart *msg;
371   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
372   uint32_t todo_size = op->state->bf_data_size - offset;
373
374   if (todo_size < chunk_size)
375     chunk_size = todo_size;
376
377   ev = GNUNET_MQ_msg_extra (msg,
378                             chunk_size,
379                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
380
381   msg->chunk_length = htonl (chunk_size);
382   msg->chunk_offset = htonl (offset);
383   memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
384
385   GNUNET_MQ_send (op->mq, ev);
386
387   if (op->state->bf_data_size == offset + chunk_size)
388   {
389     // done
390     GNUNET_free(op->state->bf_data);
391     op->state->bf_data = NULL;
392     return;
393   }
394   send_bloomfilter_multipart (op, offset + chunk_size);
395 }
396
397
398 /**
399  * Send a bloomfilter to our peer.
400  * that the operation is over.
401  * After the result done message has been sent to the client,
402  * destroy the evaluate operation.
403  *
404  * @param op intersection operation
405  */
406 static void
407 send_bloomfilter (struct Operation *op)
408 {
409   struct GNUNET_MQ_Envelope *ev;
410   struct BFMessage *msg;
411   uint32_t bf_size;
412   uint32_t bf_elementbits;
413   uint32_t chunk_size;
414   struct GNUNET_CONTAINER_BloomFilter * local_bf;
415
416   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
417               "sending bf of size %u\n");
418
419   CALCULATE_BF_SIZE(op->state->my_element_count,
420                     op->spec->remote_element_count,
421                     bf_size,
422                     bf_elementbits);
423
424   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
425                                                 bf_size,
426                                                 bf_elementbits);
427
428   op->spec->salt++;
429   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
430                                          &iterator_bf_create,
431                                          op);
432
433   // send our bloomfilter
434   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage))
435   {
436     // singlepart
437     chunk_size = bf_size;
438     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
439     GNUNET_assert (GNUNET_SYSERR !=
440                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
441                                                               (char*)&msg[1],
442                                                               bf_size));
443   }
444   else
445   {
446     //multipart
447     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
448     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
449     op->state->bf_data = (char *) GNUNET_malloc (bf_size);
450     GNUNET_assert (GNUNET_SYSERR !=
451                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
452                                                               op->state->bf_data,
453                                                               bf_size));
454     memcpy (&msg[1], op->state->bf_data, chunk_size);
455     op->state->bf_data_size = bf_size;
456   }
457   GNUNET_CONTAINER_bloomfilter_free (local_bf);
458
459   msg->sender_element_count = htonl (op->state->my_element_count);
460   msg->bloomfilter_total_length = htonl (bf_size);
461   msg->bloomfilter_length = htonl (chunk_size);
462   msg->bits_per_element = htonl (bf_elementbits);
463   msg->sender_mutator = htonl (op->spec->salt);
464
465   GNUNET_MQ_send (op->mq, ev);
466
467   if (op->state->bf_data)
468     send_bloomfilter_multipart (op, chunk_size);
469 }
470
471
472 /**
473  * Signal to the client that the operation has finished and
474  * destroy the operation.
475  *
476  * @param cls operation to destroy
477  */
478 static void
479 send_client_done_and_destroy (void *cls)
480 {
481   struct Operation *op = cls;
482   struct GNUNET_MQ_Envelope *ev;
483   struct GNUNET_SET_ResultMessage *rm;
484
485   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
486   rm->request_id = htonl (op->spec->client_request_id);
487   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
488   rm->element_type = htons (0);
489   GNUNET_MQ_send (op->spec->set->client_mq, ev);
490   _GSS_operation_destroy (op, GNUNET_YES);
491 }
492
493
494 /**
495  * Send all elements in the full result iterator.
496  *
497  * @param cls operation
498  */
499 static void
500 send_remaining_elements (void *cls)
501 {
502   struct Operation *op = cls;
503   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
504   struct GNUNET_MQ_Envelope *ev;
505   struct GNUNET_SET_ResultMessage *rm;
506   struct GNUNET_SET_Element *element;
507   int res;
508
509   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
510                                                      NULL,
511                                                      (const void **) &remaining);
512   if (GNUNET_NO == res)
513   {
514     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
515                 "sending done and destroy because iterator ran out\n");
516     send_client_done_and_destroy (op);
517     return;
518   }
519
520   element = &remaining->element;
521   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
522               "sending element (size %u) to client (full set)\n",
523               element->size);
524   GNUNET_assert (0 != op->spec->client_request_id);
525
526   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
527   GNUNET_assert (NULL != ev);
528
529   rm->result_status = htons (GNUNET_SET_STATUS_OK);
530   rm->request_id = htonl (op->spec->client_request_id);
531   rm->element_type = element->element_type;
532   memcpy (&rm[1], element->data, element->size);
533
534   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
535   GNUNET_MQ_send (op->spec->set->client_mq, ev);
536 }
537
538
539 /**
540  * Inform the peer that this operation is complete.
541  *
542  * @param op the intersection operation to fail
543  */
544 static void
545 send_peer_done (struct Operation *op)
546 {
547   struct GNUNET_MQ_Envelope *ev;
548
549   op->state->phase = PHASE_FINISHED;
550   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
551               "Intersection succeeded, sending DONE\n");
552   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
553   op->state->local_bf = NULL;
554
555   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
556   GNUNET_MQ_send (op->mq, ev);
557 }
558
559
560 /**
561  * Process a Bloomfilter once we got all the chunks
562  *
563  * @param op the intersection operation
564  */
565 static void
566 process_bf (struct Operation *op)
567 {
568   uint32_t old_elements;
569   uint32_t peer_elements;
570
571   old_elements = op->state->my_element_count;
572   peer_elements = op->spec->remote_element_count;
573   switch (op->state->phase)
574   {
575   case PHASE_INITIAL:
576     // If we are ot our first msg
577     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
578
579     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
580                                            &iterator_initialization_by_alice,
581                                            op);
582     break;
583   case PHASE_BF_EXCHANGE:
584   case PHASE_MAYBE_FINISHED:
585     // if we are bob or alice and are continuing operation
586     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
587                                            &iterator_bf_reduce,
588                                            op);
589     break;
590   default:
591     GNUNET_break_op (0);
592     fail_intersection_operation(op);
593   }
594   // the iterators created a new BF with salt+1
595   // the peer needs this information for decoding the next BF
596   // this behavior can be modified at will later on.
597   op->spec->salt++;
598
599   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
600   op->state->remote_bf = NULL;
601
602   if ((0 == op->state->my_element_count) // fully disjoint
603       || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements
604           && (old_elements == op->state->my_element_count)
605           && (op->state->my_element_count == peer_elements)))
606   {
607     // In the last round we though we were finished, we now know this is correct
608     send_peer_done (op);
609     return;
610   }
611
612   op->state->phase = PHASE_BF_EXCHANGE;
613   if (op->state->my_element_count == peer_elements)
614     // maybe we are finished, but we do one more round to make certain
615     // we don't have false positives ...
616     op->state->phase = PHASE_MAYBE_FINISHED;
617
618   send_bloomfilter (op);
619 }
620
621
622 /**
623  * Handle an BF multipart message from a remote peer.
624  *
625  * @param cls the intersection operation
626  * @param mh the header of the message
627  */
628 static void
629 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
630 {
631   struct Operation *op = cls;
632   const struct BFPart *msg = (const struct BFPart *) mh;
633   uint32_t chunk_size;
634   uint32_t chunk_offset;
635
636   chunk_size = ntohl(msg->chunk_length);
637   chunk_offset = ntohl(msg->chunk_offset);
638
639   if ((NULL == op->state->bf_data)
640        || (op->state->bf_data_size < chunk_size + chunk_offset))
641   {
642     // unexpected multipart chunk
643     GNUNET_break_op (0);
644     fail_intersection_operation(op);
645     return;
646   }
647
648   memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
649
650   if (op->state->bf_data_size != chunk_offset + chunk_size)
651     // wait for next chunk
652     return;
653
654   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
655                                                             op->state->bf_data_size,
656                                                             op->state->bf_bits_per_element);
657
658   GNUNET_free (op->state->bf_data);
659   op->state->bf_data = NULL;
660
661   process_bf (op);
662 }
663
664
665 /**
666  * Handle an BF message from a remote peer.
667  *
668  * @param cls the intersection operation
669  * @param mh the header of the message
670  */
671 static void
672 handle_p2p_bf (void *cls,
673                const struct GNUNET_MessageHeader *mh)
674 {
675   struct Operation *op = cls;
676   const struct BFMessage *msg = (const struct BFMessage *) mh;
677   uint32_t bf_size;
678   uint32_t chunk_size;
679   uint32_t bf_bits_per_element;
680
681   switch (op->state->phase)
682   {
683   case PHASE_INITIAL:
684   case PHASE_BF_EXCHANGE:
685   case PHASE_MAYBE_FINISHED:
686     if (NULL == op->state->bf_data)
687     {
688       // no colliding multipart transaction going on currently
689       op->spec->salt = ntohl (msg->sender_mutator);
690       bf_size = ntohl (msg->bloomfilter_total_length);
691       bf_bits_per_element = ntohl (msg->bits_per_element);
692       chunk_size = ntohl (msg->bloomfilter_length);
693       op->spec->remote_element_count = ntohl(msg->sender_element_count);
694       if (bf_size == chunk_size)
695       {
696         // single part, done here
697         op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
698                                                                   bf_size,
699                                                                   bf_bits_per_element);
700         process_bf (op);
701         return;
702       }
703
704       //first multipart chunk
705       op->state->bf_data = GNUNET_malloc (bf_size);
706       op->state->bf_data_size = bf_size;
707       op->state->bf_bits_per_element = bf_bits_per_element;
708       memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
709       return;
710     }
711   default:
712     GNUNET_break_op (0);
713     fail_intersection_operation (op);
714   }
715 }
716
717
718 /**
719  * Handle an BF message from a remote peer.
720  *
721  * @param cls the intersection operation
722  * @param mh the header of the message
723  */
724 static void
725 handle_p2p_element_info (void *cls,
726                          const struct GNUNET_MessageHeader *mh)
727 {
728   struct Operation *op = cls;
729   const struct BFMessage *msg = (const struct BFMessage *) mh;
730
731   op->spec->remote_element_count = ntohl(msg->sender_element_count);
732   if ((op->state->phase != PHASE_INITIAL)
733       || (op->state->my_element_count > op->spec->remote_element_count)
734           || (0 == op->state->my_element_count)
735               || (0 == op->spec->remote_element_count))
736   {
737     GNUNET_break_op (0);
738     fail_intersection_operation(op);
739     return;
740   }
741
742   op->state->phase = PHASE_BF_EXCHANGE;
743   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
744
745   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
746                                          &iterator_initialization,
747                                          op);
748
749   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
750   op->state->remote_bf = NULL;
751
752   if (op->state->my_element_count == ntohl (msg->sender_element_count))
753     op->state->phase = PHASE_MAYBE_FINISHED;
754
755   send_bloomfilter (op);
756 }
757
758
759 /**
760  * Send our element count to the peer, in case our element count is lower than his
761  *
762  * @param op intersection operation
763  */
764 static void
765 send_element_count (struct Operation *op)
766 {
767   struct GNUNET_MQ_Envelope *ev;
768   struct BFMessage *msg;
769
770   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771               "sending element count (bf_msg)\n");
772
773   // just send our element count, as the other peer must start
774   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
775   msg->sender_element_count = htonl (op->state->my_element_count);
776   msg->bloomfilter_length = htonl (0);
777   msg->sender_mutator = htonl (0);
778
779   GNUNET_MQ_send (op->mq, ev);
780 }
781
782
783 /**
784  * Send a result message to the client indicating
785  * that the operation is over.
786  * After the result done message has been sent to the client,
787  * destroy the evaluate operation.
788  *
789  * @param op intersection operation
790  */
791 static void
792 finish_and_destroy (struct Operation *op)
793 {
794   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
795
796   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
797   {
798     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799                 "sending full result set\n");
800     op->state->full_result_iter =
801         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
802     send_remaining_elements (op);
803     return;
804   }
805   send_client_done_and_destroy (op);
806 }
807
808
809 /**
810  * Handle a done message from a remote peer
811  *
812  * @param cls the union operation
813  * @param mh the message
814  */
815 static void
816 handle_p2p_done (void *cls,
817                  const struct GNUNET_MessageHeader *mh)
818 {
819   struct Operation *op = cls;
820
821   if ( (op->state->phase = PHASE_FINISHED) ||
822        (op->state->phase = PHASE_MAYBE_FINISHED) )
823   {
824     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
825                 "got final DONE\n");
826
827     finish_and_destroy (op);
828     return;
829   }
830
831   GNUNET_break_op (0);
832   fail_intersection_operation (op);
833 }
834
835
836 /**
837  * Initiate a set union operation with a remote peer.
838  *
839  * @param op operation that is created, should be initialized to
840  *        begin the evaluation
841  * @param opaque_context message to be transmitted to the listener
842  *        to convince him to accept, may be NULL
843  */
844 static void
845 intersection_evaluate (struct Operation *op,
846                        const struct GNUNET_MessageHeader *opaque_context)
847 {
848   struct GNUNET_MQ_Envelope *ev;
849   struct OperationRequestMessage *msg;
850
851   op->state = GNUNET_new (struct OperationState);
852   /* we started the operation, thus we have to send the operation request */
853   op->state->phase = PHASE_INITIAL;
854   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
855   op->state->my_element_count = op->spec->set->state->current_set_element_count;
856
857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858               "Initiating intersection operation evaluation");
859   ev = GNUNET_MQ_msg_nested_mh (msg,
860                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
861                                 opaque_context);
862   if (NULL == ev)
863   {
864     /* the context message is too large */
865     GNUNET_break (0);
866     GNUNET_SERVER_client_disconnect (op->spec->set->client);
867     return;
868   }
869   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
870   msg->app_id = op->spec->app_id;
871   msg->salt = htonl (op->spec->salt);
872   msg->element_count = htonl(op->state->my_element_count);
873   GNUNET_MQ_send (op->mq, ev);
874   if (NULL != opaque_context)
875     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
876                 "sent op request with context message\n");
877   else
878     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
879                 "sent op request without context message\n");
880 }
881
882
883 /**
884  * Accept an union operation request from a remote peer.
885  * Only initializes the private operation state.
886  *
887  * @param op operation that will be accepted as a union operation
888  */
889 static void
890 intersection_accept (struct Operation *op)
891 {
892   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
893               "accepting set union operation\n");
894   op->state = GNUNET_new (struct OperationState);
895   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
896   op->state->my_element_count = op->spec->set->state->current_set_element_count;
897
898   // if Alice (the peer) has more elements than Bob (us), she should start
899   if (op->spec->remote_element_count < op->state->my_element_count){
900     op->state->phase = PHASE_INITIAL;
901     send_element_count(op);
902     return;
903   }
904   // create a new bloomfilter in case we have fewer elements
905   op->state->phase = PHASE_BF_EXCHANGE;
906   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
907                                                            BLOOMFILTER_SIZE,
908                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
909   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
910                                          &iterator_initialization,
911                                          op);
912   send_bloomfilter (op);
913 }
914
915
916 /**
917  * Create a new set supporting the intersection operation
918  *
919  * @return the newly created set
920  */
921 static struct SetState *
922 intersection_set_create ()
923 {
924   struct SetState *set_state;
925
926   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
927               "intersection set created\n");
928   set_state = GNUNET_new (struct SetState);
929   set_state->current_set_element_count = 0;
930
931   return set_state;
932 }
933
934
935 /**
936  * Add the element from the given element message to the set.
937  *
938  * @param set_state state of the set want to add to
939  * @param ee the element to add to the set
940  */
941 static void
942 intersection_add (struct SetState *set_state,
943                   struct ElementEntry *ee)
944 {
945   set_state->current_set_element_count++;
946 }
947
948
949 /**
950  * Destroy a set that supports the intersection operation
951  *
952  * @param set_state the set to destroy
953  */
954 static void
955 intersection_set_destroy (struct SetState *set_state)
956 {
957   GNUNET_free (set_state);
958 }
959
960
961 /**
962  * Remove the element given in the element message from the set.
963  *
964  * @param set_state state of the set to remove from
965  * @param element set element to remove
966  */
967 static void
968 intersection_remove (struct SetState *set_state,
969                      struct ElementEntry *element)
970 {
971   GNUNET_assert(0 < set_state->current_set_element_count);
972   set_state->current_set_element_count--;
973 }
974
975
976 /**
977  * Dispatch messages for a intersection operation.
978  *
979  * @param op the state of the intersection evaluate operation
980  * @param mh the received message
981  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
982  *         #GNUNET_OK otherwise
983  */
984 static int
985 intersection_handle_p2p_message (struct Operation *op,
986                                  const struct GNUNET_MessageHeader *mh)
987 {
988   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989               "received p2p message (t: %u, s: %u)\n",
990               ntohs (mh->type), ntohs (mh->size));
991   switch (ntohs (mh->type))
992   {
993     /* this message handler is not active until after we received an
994      * operation request message, thus the ops request is not handled here
995      */
996   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
997     handle_p2p_element_info (op, mh);
998     break;
999   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1000     handle_p2p_bf (op, mh);
1001     break;
1002   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
1003     handle_p2p_bf_part (op, mh);
1004     break;
1005   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1006     handle_p2p_done (op, mh);
1007     break;
1008   default:
1009     /* something wrong with cadet's message handlers? */
1010     GNUNET_assert (0);
1011   }
1012   return GNUNET_OK;
1013 }
1014
1015
1016 /**
1017  * handler for peer-disconnects, notifies the client about the aborted operation
1018  *
1019  * @param op the destroyed operation
1020  */
1021 static void
1022 intersection_peer_disconnect (struct Operation *op)
1023 {
1024   if (PHASE_FINISHED != op->state->phase)
1025   {
1026     struct GNUNET_MQ_Envelope *ev;
1027     struct GNUNET_SET_ResultMessage *msg;
1028
1029     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1030     msg->request_id = htonl (op->spec->client_request_id);
1031     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1032     msg->element_type = htons (0);
1033     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1034     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1035                 "other peer disconnected prematurely\n");
1036     _GSS_operation_destroy (op, GNUNET_YES);
1037     return;
1038   }
1039   // else: the session has already been concluded
1040   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041               "other peer disconnected (finished)\n");
1042   if (GNUNET_NO == op->state->client_done_sent)
1043     finish_and_destroy (op);
1044 }
1045
1046
1047 /**
1048  * Destroy the union operation.  Only things specific to the union operation are destroyed.
1049  *
1050  * @param op union operation to destroy
1051  */
1052 static void
1053 intersection_op_cancel (struct Operation *op)
1054 {
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056               "destroying intersection op\n");
1057   /* check if the op was canceled twice */
1058   GNUNET_assert (NULL != op->state);
1059   if (NULL != op->state->remote_bf)
1060   {
1061     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1062     op->state->remote_bf = NULL;
1063   }
1064   if (NULL != op->state->local_bf)
1065   {
1066     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1067     op->state->local_bf = NULL;
1068   }
1069 /*  if (NULL != op->state->my_elements)
1070   {
1071     // no need to free the elements, they are still part of the set
1072     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1073     op->state->my_elements = NULL;
1074   }*/
1075   GNUNET_free (op->state);
1076   op->state = NULL;
1077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1078               "destroying intersection op done\n");
1079 }
1080
1081
1082 /**
1083  * Get the table with implementing functions for set intersection.
1084  *
1085  * @return the operation specific VTable
1086  */
1087 const struct SetVT *
1088 _GSS_intersection_vt ()
1089 {
1090   static const struct SetVT intersection_vt = {
1091     .create = &intersection_set_create,
1092     .msg_handler = &intersection_handle_p2p_message,
1093     .add = &intersection_add,
1094     .remove = &intersection_remove,
1095     .destroy_set = &intersection_set_destroy,
1096     .evaluate = &intersection_evaluate,
1097     .accept = &intersection_accept,
1098     .peer_disconnect = &intersection_peer_disconnect,
1099     .cancel = &intersection_op_cancel,
1100   };
1101
1102   return &intersection_vt;
1103 }