towards multipart msging for bloomfilters in set intersection
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
34 /**
35  * Current phase we are in for a intersection operation.
36  */
37 enum IntersectionOperationPhase
38 {
39   /**
40    * Alices has suggested an operation to bob,
41    * and is waiting for a bf or session end.
42    */
43   PHASE_INITIAL,
44   /**
45    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
46    * until one notices the their element count is equal
47    */
48   PHASE_BF_EXCHANGE,
49   /**
50    * if both peers have an equal peercount, they enter this state for
51    * one more turn, to see if they actually have agreed on a correct set.
52    * if a peer finds the same element count after the next iteration,
53    * it ends the the session
54    */
55   PHASE_MAYBE_FINISHED,
56   /**
57    * The protocol is over.
58    * Results may still have to be sent to the client.
59    */
60   PHASE_FINISHED
61 };
62
63
64 /**
65  * State of an evaluate operation
66  * with another peer.
67  */
68 struct OperationState
69 {
70   /**
71    * The bf we currently receive
72    */
73   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
74
75   /**
76    * BF of the set's element.
77    */
78   struct GNUNET_CONTAINER_BloomFilter *local_bf;
79   
80   /**
81    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
82    */
83   char * local_bf_data;
84   
85   /**
86    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
87    */
88   uint32_t local_bf_data_size;
89
90   /**
91    * Current state of the operation.
92    */
93   enum IntersectionOperationPhase phase;
94
95   /**
96    * Generation in which the operation handle
97    * was created.
98    */
99   unsigned int generation_created;
100
101   /**
102    * Maps element-id-hashes to 'elements in our set'.
103    */
104   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
105
106   /**
107    * Current element count contained within contained_elements
108    */
109   uint32_t my_element_count;
110
111   /**
112    * Iterator for sending elements on the key to element mapping to the client.
113    */
114   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
115
116   /**
117    * Evaluate operations are held in
118    * a linked list.
119    */
120   struct OperationState *next;
121
122    /**
123     * Evaluate operations are held in
124     * a linked list.
125     */
126   struct OperationState *prev;
127
128   /**
129    * Did we send the client that we are done?
130    */
131   int client_done_sent;
132 };
133
134
135 /**
136  * Extra state required for efficient set intersection.
137  */
138 struct SetState
139 {
140   /**
141    * Number of currently valid elements in the set which have not been removed
142    */
143   uint32_t current_set_element_count;
144 };
145
146
147 /**
148  * Alice's version:
149  *
150  * fills the contained-elements hashmap with all relevant
151  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
152  *
153  * @param cls closure
154  * @param key current key code
155  * @param value value in the hash map
156  * @return #GNUNET_YES if we should continue to
157  *         iterate,
158  *         #GNUNET_NO if not.
159  */
160 static int
161 iterator_initialization_by_alice (void *cls,
162                                   const struct GNUNET_HashCode *key,
163                                   void *value)
164 {
165   struct ElementEntry *ee = value;
166   struct Operation *op = cls;
167   struct GNUNET_HashCode mutated_hash;
168
169   //only consider this element, if it is valid for us
170   if ((op->generation_created >= ee->generation_removed)
171        || (op->generation_created < ee->generation_added))
172     return GNUNET_YES;
173
174   // not contained according to bob's bloomfilter
175   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
176                            op->spec->salt,
177                            &mutated_hash);
178   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
179                                                       &mutated_hash))
180     return GNUNET_YES;
181
182   op->state->my_element_count++;
183   GNUNET_assert (GNUNET_YES ==
184                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
185                                                     &ee->element_hash, ee,
186                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
187
188   /* create our own bloomfilter with salt+1 */
189   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
190                             op->spec->salt + 1,
191                             &mutated_hash);
192   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
193                                     &mutated_hash);
194
195   return GNUNET_YES;
196 }
197
198 /**
199  * fills the contained-elements hashmap with all relevant
200  * elements and adds their mutated hashes to our local bloomfilter
201  *
202  * @param cls closure
203  * @param key current key code
204  * @param value value in the hash map
205  * @return #GNUNET_YES if we should continue to
206  *         iterate,
207  *         #GNUNET_NO if not.
208  */
209 static int
210 iterator_initialization (void *cls,
211                          const struct GNUNET_HashCode *key,
212                          void *value)
213 {
214   struct ElementEntry *ee = value;
215   struct Operation *op = cls;
216   struct GNUNET_HashCode mutated_hash;
217
218   //only consider this element, if it is valid for us
219   if ((op->generation_created >= ee->generation_removed)
220        || (op->generation_created < ee->generation_added))
221     return GNUNET_YES;
222
223   GNUNET_assert (GNUNET_YES ==
224                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
225                                                     &ee->element_hash, ee,
226                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
227   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
228                             op->spec->salt,
229                             &mutated_hash);
230   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
231                                     &mutated_hash);
232   return GNUNET_YES;
233 }
234
235
236 /**
237  * removes element from a hashmap if it is not contained within the
238  * provided remote bloomfilter. Then, fill our new bloomfilter.
239  *
240  * @param cls closure
241  * @param key current key code
242  * @param value value in the hash map
243  * @return #GNUNET_YES if we should continue to
244  *         iterate,
245  *         #GNUNET_NO if not.
246  */
247 static int
248 iterator_bf_round (void *cls,
249                    const struct GNUNET_HashCode *key,
250                    void *value)
251 {
252   struct ElementEntry *ee = value;
253   struct Operation *op = cls;
254   struct GNUNET_HashCode mutated_hash;
255
256   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
257
258   if (GNUNET_NO ==
259       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
260                                          &mutated_hash))
261   {
262     op->state->my_element_count--;
263     GNUNET_assert (GNUNET_YES ==
264                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
265                                                          &ee->element_hash,
266                                                          ee));
267     return GNUNET_YES;
268   }
269   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
270                            op->spec->salt+1,
271                            &mutated_hash);
272   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
273                                     &mutated_hash);
274   return GNUNET_YES;
275 }
276
277
278 /**
279  * Inform the client that the union operation has failed,
280  * and proceed to destroy the evaluate operation.
281  *
282  * @param op the intersection operation to fail
283  */
284 static void
285 fail_intersection_operation (struct Operation *op)
286 {
287   struct GNUNET_MQ_Envelope *ev;
288   struct GNUNET_SET_ResultMessage *msg;
289
290   if (op->state->my_elements)
291     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
292
293   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
294
295   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
296   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
297   msg->request_id = htonl (op->spec->client_request_id);
298   msg->element_type = htons (0);
299   GNUNET_MQ_send (op->spec->set->client_mq, ev);
300   _GSS_operation_destroy (op);
301 }
302
303
304 /**
305  * Send a request for the evaluate operation to a remote peer
306  *
307  * @param eo operation with the other peer
308  */
309 static void
310 send_operation_request (struct Operation *op)
311 {
312   struct GNUNET_MQ_Envelope *ev;
313   struct OperationRequestMessage *msg;
314
315   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
316                                 op->spec->context_msg);
317
318   if (NULL == ev)
319   {
320     /* the context message is too large */
321     GNUNET_break (0);
322     GNUNET_SERVER_client_disconnect (op->spec->set->client);
323     return;
324   }
325   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
326   msg->app_id = op->spec->app_id;
327   msg->salt = htonl (op->spec->salt);
328   msg->element_count = htonl(op->state->my_element_count);
329
330   GNUNET_MQ_send (op->mq, ev);
331
332   if (NULL != op->spec->context_msg)
333     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
334   else
335     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
336
337   if (NULL != op->spec->context_msg)
338   {
339     GNUNET_free (op->spec->context_msg);
340     op->spec->context_msg = NULL;
341   }
342 }
343
344
345 /**
346  * Send a bloomfilter to our peer.
347  * that the operation is over.
348  * After the result done message has been sent to the client,
349  * destroy the evaluate operation.
350  *
351  * @param eo intersection operation
352  */
353 static void
354 send_bloomfilter (struct Operation *op)
355 {
356   struct GNUNET_MQ_Envelope *ev;
357   struct BFMessage *msg;
358   uint32_t bf_size;
359
360   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
361
362   // send our bloomfilter
363   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (op->state->local_bf);
364   if ( GNUNET_SERVER_MAX_MESSAGE_SIZE <= bf_size + sizeof(struct BFMessage))
365   {
366     // singlepart
367     ev = GNUNET_MQ_msg_extra (msg, bf_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
368     
369     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
370     op->state->local_bf = NULL;
371     
372     msg->reserved = 0;
373     msg->sender_element_count = htonl (op->state->my_element_count);
374     msg->bloomfilter_length = htonl (bf_size);
375     msg->bloomfilter_offset = htonl (0);
376     msg->sender_mutator = htonl (op->spec->salt);
377
378     GNUNET_MQ_send (op->mq, ev);
379   }
380   else {
381     op->state->local_bf_data = (char *)GNUNET_malloc(bf_size);
382     GNUNET_assert (GNUNET_SYSERR !=
383                  GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
384                                                             op->state->local_bf_data,
385                                                             bf_size));
386     op->state->local_bf_data_size = bf_size;
387     send_bloomfilter_multipart (op, 0);
388   }
389 }
390
391
392 /**
393  * Signal to the client that the operation has finished and
394  * destroy the operation.
395  *
396  * @param cls operation to destroy
397  */
398 static void
399 send_client_done_and_destroy (void *cls)
400 {
401   struct Operation *op = cls;
402   struct GNUNET_MQ_Envelope *ev;
403   struct GNUNET_SET_ResultMessage *rm;
404   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
405   rm->request_id = htonl (op->spec->client_request_id);
406   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
407   rm->element_type = htons (0);
408   GNUNET_MQ_send (op->spec->set->client_mq, ev);
409   _GSS_operation_destroy (op);
410 }
411
412
413 /**
414  * Send all elements in the full result iterator.
415  *
416  * @param cls operation
417  */
418 static void
419 send_remaining_elements (void *cls)
420 {
421   struct Operation *op = cls;
422   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
423   struct GNUNET_MQ_Envelope *ev;
424   struct GNUNET_SET_ResultMessage *rm;
425   struct GNUNET_SET_Element *element;
426   int res;
427
428   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
429   if (GNUNET_NO == res) {
430     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
431     send_client_done_and_destroy (op);
432     return;
433   }
434
435   element = &remaining->element;
436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
437   GNUNET_assert (0 != op->spec->client_request_id);
438
439   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
440   GNUNET_assert (NULL != ev);
441
442   rm->result_status = htons (GNUNET_SET_STATUS_OK);
443   rm->request_id = htonl (op->spec->client_request_id);
444   rm->element_type = element->type;
445   memcpy (&rm[1], element->data, element->size);
446
447   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
448   GNUNET_MQ_send (op->spec->set->client_mq, ev);
449 }
450
451
452 /**
453  * Inform the peer that this operation is complete.
454  *
455  * @param eo the intersection operation to fail
456  */
457 static void
458 send_peer_done (struct Operation *op)
459 {
460   struct GNUNET_MQ_Envelope *ev;
461
462   op->state->phase = PHASE_FINISHED;
463   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
464   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
465   op->state->local_bf = NULL;
466
467   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
468   GNUNET_MQ_send (op->mq, ev);
469 }
470
471
472 /**
473  * Handle an BF message from a remote peer.
474  *
475  * @param cls the intersection operation
476  * @param mh the header of the message
477  */
478 static void
479 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
480 {
481   struct Operation *op = cls;
482   const struct BFMessage *msg = (const struct BFMessage *) mh;
483   uint32_t old_elements;
484   uint32_t peer_elements;
485
486   old_elements = op->state->my_element_count;
487   op->spec->salt = ntohl (msg->sender_mutator);
488
489   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
490                                                             BLOOMFILTER_SIZE,
491                                                             ntohl (msg->bloomfilter_length));
492   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
493                                                            BLOOMFILTER_SIZE,
494                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
495   switch (op->state->phase)
496   {
497   case PHASE_INITIAL:
498     // If we are ot our first msg
499     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
500
501     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
502                                            &iterator_initialization_by_alice,
503                                            op);
504     break;
505   case PHASE_BF_EXCHANGE:
506   case PHASE_MAYBE_FINISHED:
507     // if we are bob or alice and are continuing operation
508     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
509                                            &iterator_bf_round,
510                                            op);
511     break;
512   default:
513     GNUNET_break_op (0);
514     fail_intersection_operation(op);
515   }
516   // the iterators created a new BF with salt+1
517   // the peer needs this information for decoding the next BF
518   // this behavior can be modified at will later on.
519   op->spec->salt++;
520
521   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
522   op->state->remote_bf = NULL;
523
524   peer_elements = ntohl(msg->sender_element_count);
525   if ((op->state->phase == PHASE_MAYBE_FINISHED)
526        && (old_elements == op->state->my_element_count)
527        && (op->state->my_element_count == peer_elements)){
528     // In the last round we though we were finished, we now know this is correct
529     send_peer_done(op);
530     return;
531   }
532
533   op->state->phase = PHASE_BF_EXCHANGE;
534   // maybe we are finished, but we do one more round to make certain
535   // we don't have false positives ...
536   if (op->state->my_element_count == peer_elements)
537       op->state->phase = PHASE_MAYBE_FINISHED;
538
539   send_bloomfilter (op);
540 }
541
542
543 /**
544  * Handle an BF message from a remote peer.
545  *
546  * @param cls the intersection operation
547  * @param mh the header of the message
548  */
549 static void
550 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
551 {
552   struct Operation *op = cls;
553   struct BFMessage *msg = (struct BFMessage *) mh;
554
555   op->spec->remote_element_count = ntohl(msg->sender_element_count);
556   if ((op->state->phase != PHASE_INITIAL)
557       || (op->state->my_element_count > op->spec->remote_element_count)){
558     GNUNET_break_op (0);
559     fail_intersection_operation(op);
560   }
561
562   op->state->phase = PHASE_BF_EXCHANGE;
563   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
564
565   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
566                                                            BLOOMFILTER_SIZE,
567                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
568   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
569                                          &iterator_initialization,
570                                          op);
571
572   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
573   op->state->remote_bf = NULL;
574
575   if (op->state->my_element_count == ntohl (msg->sender_element_count))
576     op->state->phase = PHASE_MAYBE_FINISHED;
577
578   send_bloomfilter (op);
579 }
580
581
582 /**
583  * Send our element to the peer, in case our element count is lower than his
584  *
585  * @param eo intersection operation
586  */
587 static void
588 send_element_count (struct Operation *op)
589 {
590   struct GNUNET_MQ_Envelope *ev;
591   struct BFMessage *msg;
592
593   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
594
595   // just send our element count, as the other peer must start
596   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
597   msg->reserved = 0;
598   msg->sender_element_count = htonl (op->state->my_element_count);
599   msg->bloomfilter_length = htonl (0);
600   msg->sender_mutator = htonl (0);
601
602   GNUNET_MQ_send (op->mq, ev);
603 }
604
605
606 /**
607  * Send a result message to the client indicating
608  * that the operation is over.
609  * After the result done message has been sent to the client,
610  * destroy the evaluate operation.
611  *
612  * @param op intersection operation
613  */
614 static void
615 finish_and_destroy (struct Operation *op)
616 {
617   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
618
619   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
620   {
621     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
622     op->state->full_result_iter =
623         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
624     send_remaining_elements (op);
625     return;
626   }
627   send_client_done_and_destroy (op);
628 }
629
630
631 /**
632  * Handle a done message from a remote peer
633  *
634  * @param cls the union operation
635  * @param mh the message
636  */
637 static void
638 handle_p2p_done (void *cls,
639                  const struct GNUNET_MessageHeader *mh)
640 {
641   struct Operation *op = cls;
642
643   if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
644     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
645
646     finish_and_destroy (op);
647     return;
648   }
649
650   GNUNET_break_op (0);
651   fail_intersection_operation (op);
652 }
653
654
655 /**
656  * Evaluate a union operation with
657  * a remote peer.
658  *
659  * @param op operation to evaluate
660  */
661 static void
662 intersection_evaluate (struct Operation *op)
663 {
664   op->state = GNUNET_new (struct OperationState);
665   /* we started the operation, thus we have to send the operation request */
666   op->state->phase = PHASE_INITIAL;
667   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
668   op->state->my_element_count = op->spec->set->state->current_set_element_count;
669
670   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
671               "evaluating intersection operation");
672   send_operation_request (op);
673 }
674
675
676 /**
677  * Accept an union operation request from a remote peer.
678  * Only initializes the private operation state.
679  *
680  * @param op operation that will be accepted as a union operation
681  */
682 static void
683 intersection_accept (struct Operation *op)
684 {
685   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
686   op->state = GNUNET_new (struct OperationState);
687   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
688   op->state->my_element_count = op->spec->set->state->current_set_element_count;
689
690   // if Alice (the peer) has more elements than Bob (us), she should start
691   if (op->spec->remote_element_count < op->state->my_element_count){
692     op->state->phase = PHASE_INITIAL;
693     send_element_count(op);
694     return;
695   }
696   // create a new bloomfilter in case we have fewer elements
697   op->state->phase = PHASE_BF_EXCHANGE;
698   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
699                                                            BLOOMFILTER_SIZE,
700                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
701   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
702                                          &iterator_initialization,
703                                          op);
704   send_bloomfilter (op);
705 }
706
707
708 /**
709  * Create a new set supporting the intersection operation
710  *
711  * @return the newly created set
712  */
713 static struct SetState *
714 intersection_set_create ()
715 {
716   struct SetState *set_state;
717
718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719               "intersection set created\n");
720   set_state = GNUNET_new (struct SetState);
721   set_state->current_set_element_count = 0;
722
723   return set_state;
724 }
725
726
727 /**
728  * Add the element from the given element message to the set.
729  *
730  * @param set_state state of the set want to add to
731  * @param ee the element to add to the set
732  */
733 static void
734 intersection_add (struct SetState *set_state,
735                   struct ElementEntry *ee)
736 {
737   GNUNET_assert(0 < set_state->current_set_element_count);
738   set_state->current_set_element_count++;
739 }
740
741
742 /**
743  * Destroy a set that supports the intersection operation
744  *
745  * @param set_state the set to destroy
746  */
747 static void
748 intersection_set_destroy (struct SetState *set_state)
749 {
750   GNUNET_free (set_state);
751 }
752
753
754 /**
755  * Remove the element given in the element message from the set.
756  *
757  * @param set_state state of the set to remove from
758  * @param element set element to remove
759  */
760 static void
761 intersection_remove (struct SetState *set_state,
762                      struct ElementEntry *element)
763 {
764   GNUNET_assert(0 < set_state->current_set_element_count);
765   set_state->current_set_element_count--;
766 }
767
768
769 /**
770  * Dispatch messages for a intersection operation.
771  *
772  * @param eo the state of the intersection evaluate operation
773  * @param mh the received message
774  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
775  *         #GNUNET_OK otherwise
776  */
777 static int
778 intersection_handle_p2p_message (struct Operation *op,
779                                  const struct GNUNET_MessageHeader *mh)
780 {
781   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
782               ntohs (mh->type), ntohs (mh->size));
783   switch (ntohs (mh->type))
784   {
785     /* this message handler is not active until after we received an
786      * operation request message, thus the ops request is not handled here
787      */
788   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
789     handle_p2p_element_info (op, mh);
790     break;
791   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
792     handle_p2p_bf (op, mh);
793     break;
794   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
795     handle_p2p_done (op, mh);
796     break;
797   default:
798     /* something wrong with mesh's message handlers? */
799     GNUNET_assert (0);
800   }
801   return GNUNET_OK;
802 }
803
804
805 /**
806  * handler for peer-disconnects, notifies the client about the aborted operation
807  *
808  * @param op the destroyed operation
809  */
810 static void
811 intersection_peer_disconnect (struct Operation *op)
812 {
813   if (PHASE_FINISHED != op->state->phase)
814   {
815     struct GNUNET_MQ_Envelope *ev;
816     struct GNUNET_SET_ResultMessage *msg;
817
818     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
819     msg->request_id = htonl (op->spec->client_request_id);
820     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
821     msg->element_type = htons (0);
822     GNUNET_MQ_send (op->spec->set->client_mq, ev);
823     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
824     _GSS_operation_destroy (op);
825     return;
826   }
827   // else: the session has already been concluded
828   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
829   if (GNUNET_NO == op->state->client_done_sent)
830     finish_and_destroy (op);
831 }
832
833
834 /**
835  * Destroy the union operation.  Only things specific to the union operation are destroyed.
836  *
837  * @param op union operation to destroy
838  */
839 static void
840 intersection_op_cancel (struct Operation *op)
841 {
842   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
843   /* check if the op was canceled twice */
844   GNUNET_assert (NULL != op->state);
845   if (NULL != op->state->remote_bf)
846   {
847     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
848     op->state->remote_bf = NULL;
849   }
850   if (NULL != op->state->local_bf)
851   {
852     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
853     op->state->local_bf = NULL;
854   }
855   if (NULL != op->state->my_elements)
856   {
857     // no need to free the elements, they are still part of the set
858     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
859     op->state->my_elements = NULL;
860   }
861   GNUNET_free (op->state);
862   op->state = NULL;
863   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
864 }
865
866 const struct SetVT *
867 _GSS_intersection_vt ()
868 {
869   static const struct SetVT intersection_vt = {
870     .create = &intersection_set_create,
871     .msg_handler = &intersection_handle_p2p_message,
872     .add = &intersection_add,
873     .remove = &intersection_remove,
874     .destroy_set = &intersection_set_destroy,
875     .evaluate = &intersection_evaluate,
876     .accept = &intersection_accept,
877     .peer_disconnect = &intersection_peer_disconnect,
878     .cancel = &intersection_op_cancel,
879   };
880
881   return &intersection_vt;
882 }