0e2ac4863a06fd006aef3f8e712b99ee9640c1fa
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
34
35 #define CALCULATE_BF_SIZE(A, B, s, k) \
36                           do { \
37                             k = ceil(1 + log2((double) (2*B / (double) A)));\
38                             s = ceil((double) (A * k / log(2))); \
39                           } while (0)
40
41 /**
42  * Current phase we are in for a intersection operation.
43  */
44 enum IntersectionOperationPhase
45 {
46   /**
47    * Alices has suggested an operation to bob,
48    * and is waiting for a bf or session end.
49    */
50   PHASE_INITIAL,
51   /**
52    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
53    * until one notices the their element count is equal
54    */
55   PHASE_BF_EXCHANGE,
56   /**
57    * if both peers have an equal peercount, they enter this state for
58    * one more turn, to see if they actually have agreed on a correct set.
59    * if a peer finds the same element count after the next iteration,
60    * it ends the the session
61    */
62   PHASE_MAYBE_FINISHED,
63   /**
64    * The protocol is over.
65    * Results may still have to be sent to the client.
66    */
67   PHASE_FINISHED
68 };
69
70
71 /**
72  * State of an evaluate operation
73  * with another peer.
74  */
75 struct OperationState
76 {
77   /**
78    * The bf we currently receive
79    */
80   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
81
82   /**
83    * BF of the set's element.
84    */
85   struct GNUNET_CONTAINER_BloomFilter *local_bf;
86
87   /**
88    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
89    */
90   char * local_bf_data;
91
92   /**
93    * size of the bloomfilter
94    */
95   uint32_t local_bf_data_size;
96   
97   /**
98    * Current state of the operation.
99    */
100   enum IntersectionOperationPhase phase;
101
102   /**
103    * Generation in which the operation handle
104    * was created.
105    */
106   unsigned int generation_created;
107
108   /**
109    * Maps element-id-hashes to 'elements in our set'.
110    */
111   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
112
113   /**
114    * Current element count contained within contained_elements
115    */
116   uint32_t my_element_count;
117
118   /**
119    * Iterator for sending elements on the key to element mapping to the client.
120    */
121   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
122
123   /**
124    * Evaluate operations are held in
125    * a linked list.
126    */
127   struct OperationState *next;
128
129    /**
130     * Evaluate operations are held in
131     * a linked list.
132     */
133   struct OperationState *prev;
134
135   /**
136    * Did we send the client that we are done?
137    */
138   int client_done_sent;
139 };
140
141
142 /**
143  * Extra state required for efficient set intersection.
144  */
145 struct SetState
146 {
147   /**
148    * Number of currently valid elements in the set which have not been removed
149    */
150   uint32_t current_set_element_count;
151 };
152
153
154 /**
155  * Alice's version:
156  *
157  * fills the contained-elements hashmap with all relevant
158  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
159  *
160  * @param cls closure
161  * @param key current key code
162  * @param value value in the hash map
163  * @return #GNUNET_YES if we should continue to
164  *         iterate,
165  *         #GNUNET_NO if not.
166  */
167 static int
168 iterator_initialization_by_alice (void *cls,
169                                   const struct GNUNET_HashCode *key,
170                                   void *value)
171 {
172   struct ElementEntry *ee = value;
173   struct Operation *op = cls;
174   struct GNUNET_HashCode mutated_hash;
175
176   //only consider this element, if it is valid for us
177   if ((op->generation_created >= ee->generation_removed)
178        || (op->generation_created < ee->generation_added))
179     return GNUNET_YES;
180
181   // not contained according to bob's bloomfilter
182   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
183                            op->spec->salt,
184                            &mutated_hash);
185   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
186                                                       &mutated_hash))
187     return GNUNET_YES;
188
189   op->state->my_element_count++;
190   GNUNET_assert (GNUNET_YES ==
191                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
192                                                     &ee->element_hash, ee,
193                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
194
195   return GNUNET_YES;
196 }
197
198 /**
199  * fills the contained-elements hashmap with all relevant
200  * elements and adds their mutated hashes to our local bloomfilter
201  *
202  * @param cls closure
203  * @param key current key code
204  * @param value value in the hash map
205  * @return #GNUNET_YES if we should continue to
206  *         iterate,
207  *         #GNUNET_NO if not.
208  */
209 static int
210 iterator_initialization (void *cls,
211                          const struct GNUNET_HashCode *key,
212                          void *value)
213 {
214   struct ElementEntry *ee = value;
215   struct Operation *op = cls;
216   struct GNUNET_HashCode mutated_hash;
217
218   //only consider this element, if it is valid for us
219   if ((op->generation_created >= ee->generation_removed)
220        || (op->generation_created < ee->generation_added))
221     return GNUNET_YES;
222
223   GNUNET_assert (GNUNET_YES ==
224                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
225                                                     &ee->element_hash, ee,
226                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
227   return GNUNET_YES;
228 }
229
230
231 /**
232  * removes element from a hashmap if it is not contained within the
233  * provided remote bloomfilter. Then, fill our new bloomfilter.
234  *
235  * @param cls closure
236  * @param key current key code
237  * @param value value in the hash map
238  * @return #GNUNET_YES if we should continue to
239  *         iterate,
240  *         #GNUNET_NO if not.
241  */
242 static int
243 iterator_bf_reduce (void *cls,
244                    const struct GNUNET_HashCode *key,
245                    void *value)
246 {
247   struct ElementEntry *ee = value;
248   struct Operation *op = cls;
249   struct GNUNET_HashCode mutated_hash;
250
251   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
252
253   if (GNUNET_NO ==
254       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
255                                          &mutated_hash))
256   {
257     op->state->my_element_count--;
258     GNUNET_assert (GNUNET_YES ==
259                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
260                                                          &ee->element_hash,
261                                                          ee));
262   }
263
264   return GNUNET_YES;
265 }
266
267 /**
268  * create a bloomfilter based on the elements given
269  *
270  * @param cls closure
271  * @param key current key code
272  * @param value value in the hash map
273  * @return #GNUNET_YES if we should continue to
274  *         iterate,
275  *         #GNUNET_NO if not.
276  */
277 static int
278 iterator_bf_create (void *cls,
279                    const struct GNUNET_HashCode *key,
280                    void *value)
281 {
282   struct ElementEntry *ee = value;
283   struct Operation *op = cls;
284   struct GNUNET_HashCode mutated_hash;
285
286   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
287
288   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
289                                     &mutated_hash);
290   return GNUNET_YES;
291 }
292
293 /**
294  * Inform the client that the union operation has failed,
295  * and proceed to destroy the evaluate operation.
296  *
297  * @param op the intersection operation to fail
298  */
299 static void
300 fail_intersection_operation (struct Operation *op)
301 {
302   struct GNUNET_MQ_Envelope *ev;
303   struct GNUNET_SET_ResultMessage *msg;
304
305   if (op->state->my_elements)
306     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
307
308   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
309
310   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
311   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
312   msg->request_id = htonl (op->spec->client_request_id);
313   msg->element_type = htons (0);
314   GNUNET_MQ_send (op->spec->set->client_mq, ev);
315   _GSS_operation_destroy (op);
316 }
317
318
319 /**
320  * Send a request for the evaluate operation to a remote peer
321  *
322  * @param op operation with the other peer
323  */
324 static void
325 send_operation_request (struct Operation *op)
326 {
327   struct GNUNET_MQ_Envelope *ev;
328   struct OperationRequestMessage *msg;
329
330   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
331                                 op->spec->context_msg);
332
333   if (NULL == ev)
334   {
335     /* the context message is too large */
336     GNUNET_break (0);
337     GNUNET_SERVER_client_disconnect (op->spec->set->client);
338     return;
339   }
340   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
341   msg->app_id = op->spec->app_id;
342   msg->salt = htonl (op->spec->salt);
343   msg->element_count = htonl(op->state->my_element_count);
344
345   GNUNET_MQ_send (op->mq, ev);
346
347   if (NULL != op->spec->context_msg)
348     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
349   else
350     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
351
352   if (NULL != op->spec->context_msg)
353   {
354     GNUNET_free (op->spec->context_msg);
355     op->spec->context_msg = NULL;
356   }
357 }
358
359 static void
360 send_bloomfilter_multipart (struct Operation *op, uint32_t offset)
361 {
362   struct GNUNET_MQ_Envelope *ev;
363   struct BFPart *msg;
364   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
365   uint32_t todo_size = op->state->local_bf_data_size - offset;
366
367   if (todo_size < chunk_size)
368     chunk_size = todo_size;
369
370   ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
371
372   msg->bloomfilter_length = htonl (chunk_size);
373   msg->bloomfilter_offset = htonl (offset);
374   memcpy(&msg[1], &op->state->local_bf_data[offset], chunk_size);
375
376   GNUNET_MQ_send (op->mq, ev);
377
378   if (op->state->local_bf_data_size == offset + chunk_size)
379   {
380     // done
381     GNUNET_free(op->state->local_bf_data);
382     op->state->local_bf_data = NULL;
383     return;
384   }
385
386   send_bloomfilter_multipart (op, offset + chunk_size);
387 }
388
389 /**
390  * Send a bloomfilter to our peer.
391  * that the operation is over.
392  * After the result done message has been sent to the client,
393  * destroy the evaluate operation.
394  *
395  * @param op intersection operation
396  */
397 static void
398 send_bloomfilter (struct Operation *op)
399 {
400   struct GNUNET_MQ_Envelope *ev;
401   struct BFMessage *msg;
402   uint32_t bf_size;
403   uint32_t bf_elementbits;
404   uint32_t chunk_size;
405   struct GNUNET_CONTAINER_BloomFilter * local_bf;
406
407   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
408   
409   CALCULATE_BF_SIZE(op->state->my_element_count,
410                     op->spec->remote_element_count,
411                     bf_size,
412                     bf_elementbits);
413
414   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
415                                                 bf_size,
416                                                 bf_elementbits);
417
418   op->spec->salt++;
419   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
420                                          &iterator_bf_create,
421                                          op);
422
423   // send our bloomfilter
424   if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= bf_size + sizeof (struct BFMessage)) {
425     // singlepart
426     chunk_size = bf_size;
427     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
428     GNUNET_assert (GNUNET_SYSERR !=
429                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
430                                                               &msg[1],
431                                                               bf_size));
432   }
433   else {
434     //multipart
435     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
436     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
437     op->state->local_bf_data = (char *) GNUNET_malloc (bf_size);
438     GNUNET_assert (GNUNET_SYSERR !=
439                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
440                                                               op->state->local_bf_data,
441                                                               bf_size));
442     memcpy (&msg[1], op->state->local_bf_data, chunk_size);
443     op->state->local_bf_data_size = bf_size;
444   }
445   GNUNET_CONTAINER_bloomfilter_free (local_bf);
446
447   msg->sender_element_count = htonl (op->state->my_element_count);
448   msg->bloomfilter_total_length = htonl (bf_size);
449   msg->bloomfilter_length = htonl (chunk_size);
450   msg->bits_per_element = htonl (bf_elementbits);
451   msg->sender_mutator = htonl (op->spec->salt);
452
453   GNUNET_MQ_send (op->mq, ev);
454
455   if (op->state->local_bf_data) {
456     send_bloomfilter_multipart (op, chunk_size);
457   }
458 }
459
460
461 /**
462  * Signal to the client that the operation has finished and
463  * destroy the operation.
464  *
465  * @param cls operation to destroy
466  */
467 static void
468 send_client_done_and_destroy (void *cls)
469 {
470   struct Operation *op = cls;
471   struct GNUNET_MQ_Envelope *ev;
472   struct GNUNET_SET_ResultMessage *rm;
473   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
474   rm->request_id = htonl (op->spec->client_request_id);
475   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
476   rm->element_type = htons (0);
477   GNUNET_MQ_send (op->spec->set->client_mq, ev);
478   _GSS_operation_destroy (op);
479 }
480
481
482 /**
483  * Send all elements in the full result iterator.
484  *
485  * @param cls operation
486  */
487 static void
488 send_remaining_elements (void *cls)
489 {
490   struct Operation *op = cls;
491   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
492   struct GNUNET_MQ_Envelope *ev;
493   struct GNUNET_SET_ResultMessage *rm;
494   struct GNUNET_SET_Element *element;
495   int res;
496
497   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
498   if (GNUNET_NO == res) {
499     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
500     send_client_done_and_destroy (op);
501     return;
502   }
503
504   element = &remaining->element;
505   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
506   GNUNET_assert (0 != op->spec->client_request_id);
507
508   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
509   GNUNET_assert (NULL != ev);
510
511   rm->result_status = htons (GNUNET_SET_STATUS_OK);
512   rm->request_id = htonl (op->spec->client_request_id);
513   rm->element_type = element->type;
514   memcpy (&rm[1], element->data, element->size);
515
516   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
517   GNUNET_MQ_send (op->spec->set->client_mq, ev);
518 }
519
520
521 /**
522  * Inform the peer that this operation is complete.
523  *
524  * @param op the intersection operation to fail
525  */
526 static void
527 send_peer_done (struct Operation *op)
528 {
529   struct GNUNET_MQ_Envelope *ev;
530
531   op->state->phase = PHASE_FINISHED;
532   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
533   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
534   op->state->local_bf = NULL;
535
536   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
537   GNUNET_MQ_send (op->mq, ev);
538 }
539
540
541 /**
542  * Handle an BF message from a remote peer.
543  *
544  * @param cls the intersection operation
545  * @param mh the header of the message
546  */
547 static void
548 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
549 {
550   struct Operation *op = cls;
551   const struct BFMessage *msg = (const struct BFMessage *) mh;
552   uint32_t old_elements;
553   uint32_t peer_elements;
554
555   old_elements = op->state->my_element_count;
556   op->spec->salt = ntohl (msg->sender_mutator);
557
558   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
559                                                             ntohl (msg->bloomfilter_length),
560                                                             GNUNET_CONSTANTS_BLOOMFILTER_K);
561   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
562                                                            BLOOMFILTER_SIZE,
563                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
564   switch (op->state->phase)
565   {
566   case PHASE_INITIAL:
567     // If we are ot our first msg
568     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
569
570     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
571                                            &iterator_initialization_by_alice,
572                                            op);
573     break;
574   case PHASE_BF_EXCHANGE:
575   case PHASE_MAYBE_FINISHED:
576     // if we are bob or alice and are continuing operation
577     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
578                                            &iterator_bf_reduce,
579                                            op);
580     break;
581   default:
582     GNUNET_break_op (0);
583     fail_intersection_operation(op);
584   }
585   // the iterators created a new BF with salt+1
586   // the peer needs this information for decoding the next BF
587   // this behavior can be modified at will later on.
588   op->spec->salt++;
589
590   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
591   op->state->remote_bf = NULL;
592
593   peer_elements = ntohl(msg->sender_element_count);
594   if ((op->state->phase == PHASE_MAYBE_FINISHED)
595        && (old_elements == op->state->my_element_count)
596        && (op->state->my_element_count == peer_elements)){
597     // In the last round we though we were finished, we now know this is correct
598     send_peer_done(op);
599     return;
600   }
601
602   op->state->phase = PHASE_BF_EXCHANGE;
603   // maybe we are finished, but we do one more round to make certain
604   // we don't have false positives ...
605   if (op->state->my_element_count == peer_elements)
606       op->state->phase = PHASE_MAYBE_FINISHED;
607
608   send_bloomfilter (op);
609 }
610
611
612 /**
613  * Handle an BF message from a remote peer.
614  *
615  * @param cls the intersection operation
616  * @param mh the header of the message
617  */
618 static void
619 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
620 {
621   struct Operation *op = cls;
622   struct BFMessage *msg = (struct BFMessage *) mh;
623
624   op->spec->remote_element_count = ntohl(msg->sender_element_count);
625   if ((op->state->phase != PHASE_INITIAL)
626       || (op->state->my_element_count > op->spec->remote_element_count)){
627     GNUNET_break_op (0);
628     fail_intersection_operation(op);
629   }
630
631   op->state->phase = PHASE_BF_EXCHANGE;
632   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
633   
634   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
635                                          &iterator_initialization,
636                                          op);
637
638   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
639   op->state->remote_bf = NULL;
640
641   if (op->state->my_element_count == ntohl (msg->sender_element_count))
642     op->state->phase = PHASE_MAYBE_FINISHED;
643
644   send_bloomfilter (op);
645 }
646
647
648 /**
649  * Send our element to the peer, in case our element count is lower than his
650  *
651  * @param op intersection operation
652  */
653 static void
654 send_element_count (struct Operation *op)
655 {
656   struct GNUNET_MQ_Envelope *ev;
657   struct BFMessage *msg;
658
659   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
660
661   // just send our element count, as the other peer must start
662   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
663   msg->sender_element_count = htonl (op->state->my_element_count);
664   msg->bloomfilter_length = htonl (0);
665   msg->sender_mutator = htonl (0);
666
667   GNUNET_MQ_send (op->mq, ev);
668 }
669
670
671 /**
672  * Send a result message to the client indicating
673  * that the operation is over.
674  * After the result done message has been sent to the client,
675  * destroy the evaluate operation.
676  *
677  * @param op intersection operation
678  */
679 static void
680 finish_and_destroy (struct Operation *op)
681 {
682   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
683
684   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
685   {
686     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
687     op->state->full_result_iter =
688         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
689     send_remaining_elements (op);
690     return;
691   }
692   send_client_done_and_destroy (op);
693 }
694
695
696 /**
697  * Handle a done message from a remote peer
698  *
699  * @param cls the union operation
700  * @param mh the message
701  */
702 static void
703 handle_p2p_done (void *cls,
704                  const struct GNUNET_MessageHeader *mh)
705 {
706   struct Operation *op = cls;
707
708   if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
709     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
710
711     finish_and_destroy (op);
712     return;
713   }
714
715   GNUNET_break_op (0);
716   fail_intersection_operation (op);
717 }
718
719
720 /**
721  * Evaluate a union operation with
722  * a remote peer.
723  *
724  * @param op operation to evaluate
725  */
726 static void
727 intersection_evaluate (struct Operation *op)
728 {
729   op->state = GNUNET_new (struct OperationState);
730   /* we started the operation, thus we have to send the operation request */
731   op->state->phase = PHASE_INITIAL;
732   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
733   op->state->my_element_count = op->spec->set->state->current_set_element_count;
734
735   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736               "evaluating intersection operation");
737   send_operation_request (op);
738 }
739
740
741 /**
742  * Accept an union operation request from a remote peer.
743  * Only initializes the private operation state.
744  *
745  * @param op operation that will be accepted as a union operation
746  */
747 static void
748 intersection_accept (struct Operation *op)
749 {
750   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
751   op->state = GNUNET_new (struct OperationState);
752   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
753   op->state->my_element_count = op->spec->set->state->current_set_element_count;
754
755   // if Alice (the peer) has more elements than Bob (us), she should start
756   if (op->spec->remote_element_count < op->state->my_element_count){
757     op->state->phase = PHASE_INITIAL;
758     send_element_count(op);
759     return;
760   }
761   // create a new bloomfilter in case we have fewer elements
762   op->state->phase = PHASE_BF_EXCHANGE;
763   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
764                                                            BLOOMFILTER_SIZE,
765                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
766   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
767                                          &iterator_initialization,
768                                          op);
769   send_bloomfilter (op);
770 }
771
772
773 /**
774  * Create a new set supporting the intersection operation
775  *
776  * @return the newly created set
777  */
778 static struct SetState *
779 intersection_set_create ()
780 {
781   struct SetState *set_state;
782
783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784               "intersection set created\n");
785   set_state = GNUNET_new (struct SetState);
786   set_state->current_set_element_count = 0;
787
788   return set_state;
789 }
790
791
792 /**
793  * Add the element from the given element message to the set.
794  *
795  * @param set_state state of the set want to add to
796  * @param ee the element to add to the set
797  */
798 static void
799 intersection_add (struct SetState *set_state,
800                   struct ElementEntry *ee)
801 {
802   GNUNET_assert(0 < set_state->current_set_element_count);
803   set_state->current_set_element_count++;
804 }
805
806
807 /**
808  * Destroy a set that supports the intersection operation
809  *
810  * @param set_state the set to destroy
811  */
812 static void
813 intersection_set_destroy (struct SetState *set_state)
814 {
815   GNUNET_free (set_state);
816 }
817
818
819 /**
820  * Remove the element given in the element message from the set.
821  *
822  * @param set_state state of the set to remove from
823  * @param element set element to remove
824  */
825 static void
826 intersection_remove (struct SetState *set_state,
827                      struct ElementEntry *element)
828 {
829   GNUNET_assert(0 < set_state->current_set_element_count);
830   set_state->current_set_element_count--;
831 }
832
833
834 /**
835  * Dispatch messages for a intersection operation.
836  *
837  * @param op the state of the intersection evaluate operation
838  * @param mh the received message
839  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
840  *         #GNUNET_OK otherwise
841  */
842 static int
843 intersection_handle_p2p_message (struct Operation *op,
844                                  const struct GNUNET_MessageHeader *mh)
845 {
846   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
847               ntohs (mh->type), ntohs (mh->size));
848   switch (ntohs (mh->type))
849   {
850     /* this message handler is not active until after we received an
851      * operation request message, thus the ops request is not handled here
852      */
853   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
854     handle_p2p_element_info (op, mh);
855     break;
856   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
857     handle_p2p_bf (op, mh);
858     break;
859   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
860     handle_p2p_done (op, mh);
861     break;
862   default:
863     /* something wrong with mesh's message handlers? */
864     GNUNET_assert (0);
865   }
866   return GNUNET_OK;
867 }
868
869
870 /**
871  * handler for peer-disconnects, notifies the client about the aborted operation
872  *
873  * @param op the destroyed operation
874  */
875 static void
876 intersection_peer_disconnect (struct Operation *op)
877 {
878   if (PHASE_FINISHED != op->state->phase)
879   {
880     struct GNUNET_MQ_Envelope *ev;
881     struct GNUNET_SET_ResultMessage *msg;
882
883     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
884     msg->request_id = htonl (op->spec->client_request_id);
885     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
886     msg->element_type = htons (0);
887     GNUNET_MQ_send (op->spec->set->client_mq, ev);
888     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
889     _GSS_operation_destroy (op);
890     return;
891   }
892   // else: the session has already been concluded
893   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
894   if (GNUNET_NO == op->state->client_done_sent)
895     finish_and_destroy (op);
896 }
897
898
899 /**
900  * Destroy the union operation.  Only things specific to the union operation are destroyed.
901  *
902  * @param op union operation to destroy
903  */
904 static void
905 intersection_op_cancel (struct Operation *op)
906 {
907   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
908   /* check if the op was canceled twice */
909   GNUNET_assert (NULL != op->state);
910   if (NULL != op->state->remote_bf)
911   {
912     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
913     op->state->remote_bf = NULL;
914   }
915   if (NULL != op->state->local_bf)
916   {
917     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
918     op->state->local_bf = NULL;
919   }
920   if (NULL != op->state->my_elements)
921   {
922     // no need to free the elements, they are still part of the set
923     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
924     op->state->my_elements = NULL;
925   }
926   GNUNET_free (op->state);
927   op->state = NULL;
928   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
929 }
930
931 const struct SetVT *
932 _GSS_intersection_vt ()
933 {
934   static const struct SetVT intersection_vt = {
935     .create = &intersection_set_create,
936     .msg_handler = &intersection_handle_p2p_message,
937     .add = &intersection_add,
938     .remove = &intersection_remove,
939     .destroy_set = &intersection_set_destroy,
940     .evaluate = &intersection_evaluate,
941     .accept = &intersection_accept,
942     .peer_disconnect = &intersection_peer_disconnect,
943     .cancel = &intersection_op_cancel,
944   };
945
946   return &intersection_vt;
947 }