-do not run tests if disabled
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
28 #include "gnunet_block_lib.h"
29 #include "set_protocol.h"
30 #include <gcrypt.h>
31
32 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
33
34 #define CALCULATE_BF_SIZE(A, B, s, k) \
35                           do { \
36                             k = ceil(1 + log2((double) (2*B / (double) A)));\
37                             if (k<1) k=1; /* k can be calculated as 0 */\
38                             s = ceil((double) (A * k / log(2))); \
39                           } while (0)
40
41 /**
42  * Current phase we are in for a intersection operation.
43  */
44 enum IntersectionOperationPhase
45 {
46   /**
47    * Alices has suggested an operation to bob,
48    * and is waiting for a bf or session end.
49    */
50   PHASE_INITIAL,
51   /**
52    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
53    * until one notices the their element count is equal
54    */
55   PHASE_BF_EXCHANGE,
56   /**
57    * if both peers have an equal peercount, they enter this state for
58    * one more turn, to see if they actually have agreed on a correct set.
59    * if a peer finds the same element count after the next iteration,
60    * it ends the the session
61    */
62   PHASE_MAYBE_FINISHED,
63   /**
64    * The protocol is over.
65    * Results may still have to be sent to the client.
66    */
67   PHASE_FINISHED
68 };
69
70
71 /**
72  * State of an evaluate operation
73  * with another peer.
74  */
75 struct OperationState
76 {
77   /**
78    * The bf we currently receive
79    */
80   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
81
82   /**
83    * BF of the set's element.
84    */
85   struct GNUNET_CONTAINER_BloomFilter *local_bf;
86
87   /**
88    * Iterator for sending elements on the key to element mapping to the client.
89    */
90   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
91
92   /**
93    * Evaluate operations are held in a linked list.
94    */
95   struct OperationState *next;
96
97   /**
98    * Evaluate operations are held in a linked list.
99    */
100   struct OperationState *prev;
101
102   /**
103    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
104    */
105   char *bf_data;
106
107   /**
108    * Maps element-id-hashes to 'elements in our set'.
109    */
110   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
111
112   /**
113    * Current element count contained within @e my_elements
114    */
115   uint32_t my_element_count;
116
117   /**
118    * size of the bloomfilter in @e bf_data.
119    */
120   uint32_t bf_data_size;
121
122   /**
123    * size of the bloomfilter
124    */
125   uint32_t bf_bits_per_element;
126
127   /**
128    * Current state of the operation.
129    */
130   enum IntersectionOperationPhase phase;
131
132   /**
133    * Generation in which the operation handle
134    * was created.
135    */
136   unsigned int generation_created;
137
138   /**
139    * Did we send the client that we are done?
140    */
141   int client_done_sent;
142 };
143
144
145 /**
146  * Extra state required for efficient set intersection.
147  */
148 struct SetState
149 {
150   /**
151    * Number of currently valid elements in the set which have not been removed
152    */
153   uint32_t current_set_element_count;
154 };
155
156
157 /**
158  * Send a result message to the client indicating
159  * we removed an element
160  *
161  * @param op union operation
162  * @param element element to send
163  */
164 static void
165 send_client_element (struct Operation *op,
166                      struct GNUNET_SET_Element *element)
167 {
168   struct GNUNET_MQ_Envelope *ev;
169   struct GNUNET_SET_ResultMessage *rm;
170
171   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
172               "sending removed element (size %u) to client\n",
173               element->size);
174   GNUNET_assert (0 != op->spec->client_request_id);
175   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
176   if (NULL == ev)
177   {
178     GNUNET_MQ_discard (ev);
179     GNUNET_break (0);
180     return;
181   }
182   rm->result_status = htons (GNUNET_SET_STATUS_OK);
183   rm->request_id = htonl (op->spec->client_request_id);
184   rm->element_type = element->type;
185   memcpy (&rm[1], element->data, element->size);
186   GNUNET_MQ_send (op->spec->set->client_mq, ev);
187 }
188
189
190 /**
191  * Alice's version:
192  *
193  * fills the contained-elements hashmap with all relevant
194  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
195  *
196  * @param cls closure
197  * @param key current key code
198  * @param value value in the hash map
199  * @return #GNUNET_YES if we should continue to
200  *         iterate,
201  *         #GNUNET_NO if not.
202  */
203 static int
204 iterator_initialization_by_alice (void *cls,
205                                   const struct GNUNET_HashCode *key,
206                                   void *value)
207 {
208   struct ElementEntry *ee = value;
209   struct Operation *op = cls;
210   struct GNUNET_HashCode mutated_hash;
211
212   //only consider this element, if it is valid for us
213   if ((op->generation_created < ee->generation_removed)
214        && (op->generation_created >= ee->generation_added))
215     return GNUNET_YES;
216
217   // not contained according to bob's bloomfilter
218   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
219                            op->spec->salt,
220                            &mutated_hash);
221   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
222                                                       &mutated_hash)){
223     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
224       send_client_element (op, &ee->element);
225     return GNUNET_YES;
226   }
227
228   op->state->my_element_count++;
229   GNUNET_assert (GNUNET_YES ==
230                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
231                                                     &ee->element_hash, ee,
232                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
233
234   return GNUNET_YES;
235 }
236
237
238 /**
239  * fills the contained-elements hashmap with all relevant
240  * elements and adds their mutated hashes to our local bloomfilter
241  *
242  * @param cls closure
243  * @param key current key code
244  * @param value value in the hash map
245  * @return #GNUNET_YES if we should continue to
246  *         iterate,
247  *         #GNUNET_NO if not.
248  */
249 static int
250 iterator_initialization (void *cls,
251                          const struct GNUNET_HashCode *key,
252                          void *value)
253 {
254   struct ElementEntry *ee = value;
255   struct Operation *op = cls;
256
257   //only consider this element, if it is valid for us
258   if ((op->generation_created < ee->generation_removed)
259        && (op->generation_created >= ee->generation_added))
260     return GNUNET_YES;
261
262   GNUNET_assert (GNUNET_YES ==
263                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
264                                                     &ee->element_hash, ee,
265                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
266   return GNUNET_YES;
267 }
268
269
270 /**
271  * removes element from a hashmap if it is not contained within the
272  * provided remote bloomfilter. Then, fill our new bloomfilter.
273  *
274  * @param cls closure
275  * @param key current key code
276  * @param value value in the hash map
277  * @return #GNUNET_YES if we should continue to
278  *         iterate,
279  *         #GNUNET_NO if not.
280  */
281 static int
282 iterator_bf_reduce (void *cls,
283                    const struct GNUNET_HashCode *key,
284                    void *value)
285 {
286   struct ElementEntry *ee = value;
287   struct Operation *op = cls;
288   struct GNUNET_HashCode mutated_hash;
289
290   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
291
292   if (GNUNET_NO ==
293       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
294                                          &mutated_hash))
295   {
296     op->state->my_element_count--;
297     GNUNET_assert (GNUNET_YES ==
298                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
299                                                          &ee->element_hash,
300                                                          ee));
301     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
302       send_client_element (op, &ee->element);
303   }
304
305   return GNUNET_YES;
306 }
307
308
309 /**
310  * Create a bloomfilter based on the elements given
311  *
312  * @param cls closure
313  * @param key current key code
314  * @param value value in the hash map
315  * @return #GNUNET_YES if we should continue to
316  *         iterate,
317  *         #GNUNET_NO if not.
318  */
319 static int
320 iterator_bf_create (void *cls,
321                     const struct GNUNET_HashCode *key,
322                     void *value)
323 {
324   struct ElementEntry *ee = value;
325   struct Operation *op = cls;
326   struct GNUNET_HashCode mutated_hash;
327
328   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
329                             op->spec->salt,
330                             &mutated_hash);
331   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
332                                     &mutated_hash);
333   return GNUNET_YES;
334 }
335
336
337 /**
338  * Inform the client that the union operation has failed,
339  * and proceed to destroy the evaluate operation.
340  *
341  * @param op the intersection operation to fail
342  */
343 static void
344 fail_intersection_operation (struct Operation *op)
345 {
346   struct GNUNET_MQ_Envelope *ev;
347   struct GNUNET_SET_ResultMessage *msg;
348
349   if (op->state->my_elements)
350   {
351     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
352     op->state->my_elements = NULL;
353   }
354   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
355               "intersection operation failed\n");
356
357   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
358   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
359   msg->request_id = htonl (op->spec->client_request_id);
360   msg->element_type = htons (0);
361   GNUNET_MQ_send (op->spec->set->client_mq, ev);
362   _GSS_operation_destroy (op, GNUNET_YES);
363 }
364
365
366 /**
367  * Send a request for the evaluate operation to a remote peer
368  *
369  * @param op operation with the other peer
370  */
371 static void
372 send_operation_request (struct Operation *op)
373 {
374   struct GNUNET_MQ_Envelope *ev;
375   struct OperationRequestMessage *msg;
376
377   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
378                                 op->spec->context_msg);
379
380   if (NULL == ev)
381   {
382     /* the context message is too large */
383     GNUNET_break (0);
384     GNUNET_SERVER_client_disconnect (op->spec->set->client);
385     return;
386   }
387   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
388   msg->app_id = op->spec->app_id;
389   msg->salt = htonl (op->spec->salt);
390   msg->element_count = htonl(op->state->my_element_count);
391
392   GNUNET_MQ_send (op->mq, ev);
393
394   if (NULL != op->spec->context_msg)
395     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
396                 "sent op request with context message\n");
397   else
398     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399                 "sent op request without context message\n");
400
401   if (NULL != op->spec->context_msg)
402   {
403     GNUNET_free (op->spec->context_msg);
404     op->spec->context_msg = NULL;
405   }
406 }
407
408
409 static void
410 send_bloomfilter_multipart (struct Operation *op,
411                             uint32_t offset)
412 {
413   struct GNUNET_MQ_Envelope *ev;
414   struct BFPart *msg;
415   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
416   uint32_t todo_size = op->state->bf_data_size - offset;
417
418   if (todo_size < chunk_size)
419     chunk_size = todo_size;
420
421   ev = GNUNET_MQ_msg_extra (msg,
422                             chunk_size,
423                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
424
425   msg->chunk_length = htonl (chunk_size);
426   msg->chunk_offset = htonl (offset);
427   memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
428
429   GNUNET_MQ_send (op->mq, ev);
430
431   if (op->state->bf_data_size == offset + chunk_size)
432   {
433     // done
434     GNUNET_free(op->state->bf_data);
435     op->state->bf_data = NULL;
436     return;
437   }
438   send_bloomfilter_multipart (op, offset + chunk_size);
439 }
440
441
442 /**
443  * Send a bloomfilter to our peer.
444  * that the operation is over.
445  * After the result done message has been sent to the client,
446  * destroy the evaluate operation.
447  *
448  * @param op intersection operation
449  */
450 static void
451 send_bloomfilter (struct Operation *op)
452 {
453   struct GNUNET_MQ_Envelope *ev;
454   struct BFMessage *msg;
455   uint32_t bf_size;
456   uint32_t bf_elementbits;
457   uint32_t chunk_size;
458   struct GNUNET_CONTAINER_BloomFilter * local_bf;
459
460   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
461               "sending bf of size %u\n");
462
463   CALCULATE_BF_SIZE(op->state->my_element_count,
464                     op->spec->remote_element_count,
465                     bf_size,
466                     bf_elementbits);
467
468   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
469                                                 bf_size,
470                                                 bf_elementbits);
471
472   op->spec->salt++;
473   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
474                                          &iterator_bf_create,
475                                          op);
476
477   // send our bloomfilter
478   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage))
479   {
480     // singlepart
481     chunk_size = bf_size;
482     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
483     GNUNET_assert (GNUNET_SYSERR !=
484                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
485                                                               (char*)&msg[1],
486                                                               bf_size));
487   }
488   else
489   {
490     //multipart
491     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
492     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
493     op->state->bf_data = (char *) GNUNET_malloc (bf_size);
494     GNUNET_assert (GNUNET_SYSERR !=
495                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
496                                                               op->state->bf_data,
497                                                               bf_size));
498     memcpy (&msg[1], op->state->bf_data, chunk_size);
499     op->state->bf_data_size = bf_size;
500   }
501   GNUNET_CONTAINER_bloomfilter_free (local_bf);
502
503   msg->sender_element_count = htonl (op->state->my_element_count);
504   msg->bloomfilter_total_length = htonl (bf_size);
505   msg->bloomfilter_length = htonl (chunk_size);
506   msg->bits_per_element = htonl (bf_elementbits);
507   msg->sender_mutator = htonl (op->spec->salt);
508
509   GNUNET_MQ_send (op->mq, ev);
510
511   if (op->state->bf_data)
512     send_bloomfilter_multipart (op, chunk_size);
513 }
514
515
516 /**
517  * Signal to the client that the operation has finished and
518  * destroy the operation.
519  *
520  * @param cls operation to destroy
521  */
522 static void
523 send_client_done_and_destroy (void *cls)
524 {
525   struct Operation *op = cls;
526   struct GNUNET_MQ_Envelope *ev;
527   struct GNUNET_SET_ResultMessage *rm;
528
529   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
530   rm->request_id = htonl (op->spec->client_request_id);
531   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
532   rm->element_type = htons (0);
533   GNUNET_MQ_send (op->spec->set->client_mq, ev);
534   _GSS_operation_destroy (op, GNUNET_YES);
535 }
536
537
538 /**
539  * Send all elements in the full result iterator.
540  *
541  * @param cls operation
542  */
543 static void
544 send_remaining_elements (void *cls)
545 {
546   struct Operation *op = cls;
547   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
548   struct GNUNET_MQ_Envelope *ev;
549   struct GNUNET_SET_ResultMessage *rm;
550   struct GNUNET_SET_Element *element;
551   int res;
552
553   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
554                                                      NULL,
555                                                      (const void **) &remaining);
556   if (GNUNET_NO == res)
557   {
558     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
559                 "sending done and destroy because iterator ran out\n");
560     send_client_done_and_destroy (op);
561     return;
562   }
563
564   element = &remaining->element;
565   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
566               "sending element (size %u) to client (full set)\n",
567               element->size);
568   GNUNET_assert (0 != op->spec->client_request_id);
569
570   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
571   GNUNET_assert (NULL != ev);
572
573   rm->result_status = htons (GNUNET_SET_STATUS_OK);
574   rm->request_id = htonl (op->spec->client_request_id);
575   rm->element_type = element->type;
576   memcpy (&rm[1], element->data, element->size);
577
578   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
579   GNUNET_MQ_send (op->spec->set->client_mq, ev);
580 }
581
582
583 /**
584  * Inform the peer that this operation is complete.
585  *
586  * @param op the intersection operation to fail
587  */
588 static void
589 send_peer_done (struct Operation *op)
590 {
591   struct GNUNET_MQ_Envelope *ev;
592
593   op->state->phase = PHASE_FINISHED;
594   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595               "Intersection succeeded, sending DONE\n");
596   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
597   op->state->local_bf = NULL;
598
599   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
600   GNUNET_MQ_send (op->mq, ev);
601 }
602
603
604 /**
605  * Process a Bloomfilter once we got all the chunks
606  *
607  * @param op the intersection operation
608  */
609 static void
610 process_bf (struct Operation *op)
611 {
612   uint32_t old_elements;
613   uint32_t peer_elements;
614
615   old_elements = op->state->my_element_count;
616   peer_elements = op->spec->remote_element_count;
617   switch (op->state->phase)
618   {
619   case PHASE_INITIAL:
620     // If we are ot our first msg
621     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
622
623     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
624                                            &iterator_initialization_by_alice,
625                                            op);
626     break;
627   case PHASE_BF_EXCHANGE:
628   case PHASE_MAYBE_FINISHED:
629     // if we are bob or alice and are continuing operation
630     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
631                                            &iterator_bf_reduce,
632                                            op);
633     break;
634   default:
635     GNUNET_break_op (0);
636     fail_intersection_operation(op);
637   }
638   // the iterators created a new BF with salt+1
639   // the peer needs this information for decoding the next BF
640   // this behavior can be modified at will later on.
641   op->spec->salt++;
642
643   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
644   op->state->remote_bf = NULL;
645
646   if ((0 == op->state->my_element_count) // fully disjoint
647       || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements
648           && (old_elements == op->state->my_element_count)
649           && (op->state->my_element_count == peer_elements)))
650   {
651     // In the last round we though we were finished, we now know this is correct
652     send_peer_done (op);
653     return;
654   }
655
656   op->state->phase = PHASE_BF_EXCHANGE;
657   if (op->state->my_element_count == peer_elements)
658     // maybe we are finished, but we do one more round to make certain
659     // we don't have false positives ...
660     op->state->phase = PHASE_MAYBE_FINISHED;
661
662   send_bloomfilter (op);
663 }
664
665
666 /**
667  * Handle an BF multipart message from a remote peer.
668  *
669  * @param cls the intersection operation
670  * @param mh the header of the message
671  */
672 static void
673 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
674 {
675   struct Operation *op = cls;
676   const struct BFPart *msg = (const struct BFPart *) mh;
677   uint32_t chunk_size;
678   uint32_t chunk_offset;
679
680   chunk_size = ntohl(msg->chunk_length);
681   chunk_offset = ntohl(msg->chunk_offset);
682
683   if ((NULL == op->state->bf_data)
684        || (op->state->bf_data_size < chunk_size + chunk_offset))
685   {
686     // unexpected multipart chunk
687     GNUNET_break_op (0);
688     fail_intersection_operation(op);
689     return;
690   }
691
692   memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
693
694   if (op->state->bf_data_size != chunk_offset + chunk_size)
695     // wait for next chunk
696     return;
697
698   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
699                                                             op->state->bf_data_size,
700                                                             op->state->bf_bits_per_element);
701
702   GNUNET_free (op->state->bf_data);
703   op->state->bf_data = NULL;
704
705   process_bf (op);
706 }
707
708
709 /**
710  * Handle an BF message from a remote peer.
711  *
712  * @param cls the intersection operation
713  * @param mh the header of the message
714  */
715 static void
716 handle_p2p_bf (void *cls,
717                const struct GNUNET_MessageHeader *mh)
718 {
719   struct Operation *op = cls;
720   const struct BFMessage *msg = (const struct BFMessage *) mh;
721   uint32_t bf_size;
722   uint32_t chunk_size;
723   uint32_t bf_bits_per_element;
724
725   switch (op->state->phase)
726   {
727   case PHASE_INITIAL:
728   case PHASE_BF_EXCHANGE:
729   case PHASE_MAYBE_FINISHED:
730     if (NULL == op->state->bf_data)
731     {
732       // no colliding multipart transaction going on currently
733       op->spec->salt = ntohl (msg->sender_mutator);
734       bf_size = ntohl (msg->bloomfilter_total_length);
735       bf_bits_per_element = ntohl (msg->bits_per_element);
736       chunk_size = ntohl (msg->bloomfilter_length);
737       op->spec->remote_element_count = ntohl(msg->sender_element_count);
738       if (bf_size == chunk_size)
739       {
740         // single part, done here
741         op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
742                                                                   bf_size,
743                                                                   bf_bits_per_element);
744         process_bf (op);
745         return;
746       }
747
748       //first multipart chunk
749       op->state->bf_data = GNUNET_malloc (bf_size);
750       op->state->bf_data_size = bf_size;
751       op->state->bf_bits_per_element = bf_bits_per_element;
752       memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
753       return;
754     }
755   default:
756     GNUNET_break_op (0);
757     fail_intersection_operation (op);
758   }
759 }
760
761
762 /**
763  * Handle an BF message from a remote peer.
764  *
765  * @param cls the intersection operation
766  * @param mh the header of the message
767  */
768 static void
769 handle_p2p_element_info (void *cls,
770                          const struct GNUNET_MessageHeader *mh)
771 {
772   struct Operation *op = cls;
773   const struct BFMessage *msg = (const struct BFMessage *) mh;
774
775   op->spec->remote_element_count = ntohl(msg->sender_element_count);
776   if ((op->state->phase != PHASE_INITIAL)
777       || (op->state->my_element_count > op->spec->remote_element_count)
778           || (0 == op->state->my_element_count)
779               || (0 == op->spec->remote_element_count))
780   {
781     GNUNET_break_op (0);
782     fail_intersection_operation(op);
783     return;
784   }
785
786   op->state->phase = PHASE_BF_EXCHANGE;
787   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
788
789   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
790                                          &iterator_initialization,
791                                          op);
792
793   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
794   op->state->remote_bf = NULL;
795
796   if (op->state->my_element_count == ntohl (msg->sender_element_count))
797     op->state->phase = PHASE_MAYBE_FINISHED;
798
799   send_bloomfilter (op);
800 }
801
802
803 /**
804  * Send our element count to the peer, in case our element count is lower than his
805  *
806  * @param op intersection operation
807  */
808 static void
809 send_element_count (struct Operation *op)
810 {
811   struct GNUNET_MQ_Envelope *ev;
812   struct BFMessage *msg;
813
814   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815               "sending element count (bf_msg)\n");
816
817   // just send our element count, as the other peer must start
818   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
819   msg->sender_element_count = htonl (op->state->my_element_count);
820   msg->bloomfilter_length = htonl (0);
821   msg->sender_mutator = htonl (0);
822
823   GNUNET_MQ_send (op->mq, ev);
824 }
825
826
827 /**
828  * Send a result message to the client indicating
829  * that the operation is over.
830  * After the result done message has been sent to the client,
831  * destroy the evaluate operation.
832  *
833  * @param op intersection operation
834  */
835 static void
836 finish_and_destroy (struct Operation *op)
837 {
838   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
839
840   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
841   {
842     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843                 "sending full result set\n");
844     op->state->full_result_iter =
845         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
846     send_remaining_elements (op);
847     return;
848   }
849   send_client_done_and_destroy (op);
850 }
851
852
853 /**
854  * Handle a done message from a remote peer
855  *
856  * @param cls the union operation
857  * @param mh the message
858  */
859 static void
860 handle_p2p_done (void *cls,
861                  const struct GNUNET_MessageHeader *mh)
862 {
863   struct Operation *op = cls;
864
865   if ( (op->state->phase = PHASE_FINISHED) ||
866        (op->state->phase = PHASE_MAYBE_FINISHED) )
867   {
868     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
869                 "got final DONE\n");
870
871     finish_and_destroy (op);
872     return;
873   }
874
875   GNUNET_break_op (0);
876   fail_intersection_operation (op);
877 }
878
879
880 /**
881  * Evaluate a union operation with
882  * a remote peer.
883  *
884  * @param op operation to evaluate
885  */
886 static void
887 intersection_evaluate (struct Operation *op)
888 {
889   op->state = GNUNET_new (struct OperationState);
890   /* we started the operation, thus we have to send the operation request */
891   op->state->phase = PHASE_INITIAL;
892   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
893   op->state->my_element_count = op->spec->set->state->current_set_element_count;
894
895   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
896               "evaluating intersection operation");
897   send_operation_request (op);
898 }
899
900
901 /**
902  * Accept an union operation request from a remote peer.
903  * Only initializes the private operation state.
904  *
905  * @param op operation that will be accepted as a union operation
906  */
907 static void
908 intersection_accept (struct Operation *op)
909 {
910   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
911               "accepting set union operation\n");
912   op->state = GNUNET_new (struct OperationState);
913   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
914   op->state->my_element_count = op->spec->set->state->current_set_element_count;
915
916   // if Alice (the peer) has more elements than Bob (us), she should start
917   if (op->spec->remote_element_count < op->state->my_element_count){
918     op->state->phase = PHASE_INITIAL;
919     send_element_count(op);
920     return;
921   }
922   // create a new bloomfilter in case we have fewer elements
923   op->state->phase = PHASE_BF_EXCHANGE;
924   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
925                                                            BLOOMFILTER_SIZE,
926                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
927   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
928                                          &iterator_initialization,
929                                          op);
930   send_bloomfilter (op);
931 }
932
933
934 /**
935  * Create a new set supporting the intersection operation
936  *
937  * @return the newly created set
938  */
939 static struct SetState *
940 intersection_set_create ()
941 {
942   struct SetState *set_state;
943
944   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
945               "intersection set created\n");
946   set_state = GNUNET_new (struct SetState);
947   set_state->current_set_element_count = 0;
948
949   return set_state;
950 }
951
952
953 /**
954  * Add the element from the given element message to the set.
955  *
956  * @param set_state state of the set want to add to
957  * @param ee the element to add to the set
958  */
959 static void
960 intersection_add (struct SetState *set_state,
961                   struct ElementEntry *ee)
962 {
963   set_state->current_set_element_count++;
964 }
965
966
967 /**
968  * Destroy a set that supports the intersection operation
969  *
970  * @param set_state the set to destroy
971  */
972 static void
973 intersection_set_destroy (struct SetState *set_state)
974 {
975   GNUNET_free (set_state);
976 }
977
978
979 /**
980  * Remove the element given in the element message from the set.
981  *
982  * @param set_state state of the set to remove from
983  * @param element set element to remove
984  */
985 static void
986 intersection_remove (struct SetState *set_state,
987                      struct ElementEntry *element)
988 {
989   GNUNET_assert(0 < set_state->current_set_element_count);
990   set_state->current_set_element_count--;
991 }
992
993
994 /**
995  * Dispatch messages for a intersection operation.
996  *
997  * @param op the state of the intersection evaluate operation
998  * @param mh the received message
999  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1000  *         #GNUNET_OK otherwise
1001  */
1002 static int
1003 intersection_handle_p2p_message (struct Operation *op,
1004                                  const struct GNUNET_MessageHeader *mh)
1005 {
1006   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1007               "received p2p message (t: %u, s: %u)\n",
1008               ntohs (mh->type), ntohs (mh->size));
1009   switch (ntohs (mh->type))
1010   {
1011     /* this message handler is not active until after we received an
1012      * operation request message, thus the ops request is not handled here
1013      */
1014   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
1015     handle_p2p_element_info (op, mh);
1016     break;
1017   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1018     handle_p2p_bf (op, mh);
1019     break;
1020   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
1021     handle_p2p_bf_part (op, mh);
1022     break;
1023   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1024     handle_p2p_done (op, mh);
1025     break;
1026   default:
1027     /* something wrong with cadet's message handlers? */
1028     GNUNET_assert (0);
1029   }
1030   return GNUNET_OK;
1031 }
1032
1033
1034 /**
1035  * handler for peer-disconnects, notifies the client about the aborted operation
1036  *
1037  * @param op the destroyed operation
1038  */
1039 static void
1040 intersection_peer_disconnect (struct Operation *op)
1041 {
1042   if (PHASE_FINISHED != op->state->phase)
1043   {
1044     struct GNUNET_MQ_Envelope *ev;
1045     struct GNUNET_SET_ResultMessage *msg;
1046
1047     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1048     msg->request_id = htonl (op->spec->client_request_id);
1049     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1050     msg->element_type = htons (0);
1051     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1052     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1053                 "other peer disconnected prematurely\n");
1054     _GSS_operation_destroy (op, GNUNET_YES);
1055     return;
1056   }
1057   // else: the session has already been concluded
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059               "other peer disconnected (finished)\n");
1060   if (GNUNET_NO == op->state->client_done_sent)
1061     finish_and_destroy (op);
1062 }
1063
1064
1065 /**
1066  * Destroy the union operation.  Only things specific to the union operation are destroyed.
1067  *
1068  * @param op union operation to destroy
1069  */
1070 static void
1071 intersection_op_cancel (struct Operation *op)
1072 {
1073   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074               "destroying intersection op\n");
1075   /* check if the op was canceled twice */
1076   GNUNET_assert (NULL != op->state);
1077   if (NULL != op->state->remote_bf)
1078   {
1079     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1080     op->state->remote_bf = NULL;
1081   }
1082   if (NULL != op->state->local_bf)
1083   {
1084     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1085     op->state->local_bf = NULL;
1086   }
1087 /*  if (NULL != op->state->my_elements)
1088   {
1089     // no need to free the elements, they are still part of the set
1090     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1091     op->state->my_elements = NULL;
1092   }*/
1093   GNUNET_free (op->state);
1094   op->state = NULL;
1095   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1096               "destroying intersection op done\n");
1097 }
1098
1099
1100 /**
1101  * Get the table with implementing functions for set intersection.
1102  *
1103  * @return the operation specific VTable
1104  */
1105 const struct SetVT *
1106 _GSS_intersection_vt ()
1107 {
1108   static const struct SetVT intersection_vt = {
1109     .create = &intersection_set_create,
1110     .msg_handler = &intersection_handle_p2p_message,
1111     .add = &intersection_add,
1112     .remove = &intersection_remove,
1113     .destroy_set = &intersection_set_destroy,
1114     .evaluate = &intersection_evaluate,
1115     .accept = &intersection_accept,
1116     .peer_disconnect = &intersection_peer_disconnect,
1117     .cancel = &intersection_op_cancel,
1118   };
1119
1120   return &intersection_vt;
1121 }