remove additional variant of operation state
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
28 #include "gnunet_block_lib.h"
29 #include "set_protocol.h"
30 #include <gcrypt.h>
31
32 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
33
34 #define CALCULATE_BF_SIZE(A, B, s, k) \
35                           do { \
36                             k = ceil(1 + log2((double) (2*B / (double) A)));\
37                             if (k<1) k=1; /* k can be calculated as 0 */\
38                             s = ceil((double) (A * k / log(2))); \
39                           } while (0)
40
41 /**
42  * Current phase we are in for a intersection operation.
43  */
44 enum IntersectionOperationPhase
45 {
46   /**
47    * Alices has suggested an operation to bob,
48    * and is waiting for a bf or session end.
49    */
50   PHASE_INITIAL,
51   /**
52    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
53    * until one notices the their element count is equal
54    */
55   PHASE_BF_EXCHANGE,
56   /**
57    * if both peers have an equal peercount, they enter this state for
58    * one more turn, to see if they actually have agreed on a correct set.
59    * if a peer finds the same element count after the next iteration,
60    * it ends the the session
61    */
62   PHASE_MAYBE_FINISHED,
63   /**
64    * The protocol is over.
65    * Results may still have to be sent to the client.
66    */
67   PHASE_FINISHED
68 };
69
70
71 /**
72  * State of an evaluate operation with another peer.
73  */
74 struct OperationState
75 {
76   /**
77    * The bf we currently receive
78    */
79   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
80
81   /**
82    * BF of the set's element.
83    */
84   struct GNUNET_CONTAINER_BloomFilter *local_bf;
85
86   /**
87    * Iterator for sending elements on the key to element mapping to the client.
88    */
89   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
90
91   /**
92    * Evaluate operations are held in a linked list.
93    */
94   struct OperationState *next;
95
96   /**
97    * Evaluate operations are held in a linked list.
98    */
99   struct OperationState *prev;
100
101   /**
102    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
103    */
104   char *bf_data;
105
106   /**
107    * Maps element-id-hashes to 'elements in our set'.
108    */
109   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
110
111   /**
112    * Current element count contained within @e my_elements
113    */
114   uint32_t my_element_count;
115
116   /**
117    * size of the bloomfilter in @e bf_data.
118    */
119   uint32_t bf_data_size;
120
121   /**
122    * size of the bloomfilter
123    */
124   uint32_t bf_bits_per_element;
125
126   /**
127    * Current state of the operation.
128    */
129   enum IntersectionOperationPhase phase;
130
131   /**
132    * Generation in which the operation handle
133    * was created.
134    */
135   unsigned int generation_created;
136
137   /**
138    * Did we send the client that we are done?
139    */
140   int client_done_sent;
141 };
142
143
144 /**
145  * Extra state required for efficient set intersection.
146  */
147 struct SetState
148 {
149   /**
150    * Number of currently valid elements in the set which have not been removed
151    */
152   uint32_t current_set_element_count;
153 };
154
155
156 /**
157  * Send a result message to the client indicating
158  * we removed an element
159  *
160  * @param op union operation
161  * @param element element to send
162  */
163 static void
164 send_client_element (struct Operation *op,
165                      struct GNUNET_SET_Element *element)
166 {
167   struct GNUNET_MQ_Envelope *ev;
168   struct GNUNET_SET_ResultMessage *rm;
169
170   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
171               "sending removed element (size %u) to client\n",
172               element->size);
173   GNUNET_assert (0 != op->spec->client_request_id);
174   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
175   if (NULL == ev)
176   {
177     GNUNET_MQ_discard (ev);
178     GNUNET_break (0);
179     return;
180   }
181   rm->result_status = htons (GNUNET_SET_STATUS_OK);
182   rm->request_id = htonl (op->spec->client_request_id);
183   rm->element_type = element->element_type;
184   memcpy (&rm[1], element->data, element->size);
185   GNUNET_MQ_send (op->spec->set->client_mq, ev);
186 }
187
188
189 /**
190  * Alice's version:
191  *
192  * fills the contained-elements hashmap with all relevant
193  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
194  *
195  * @param cls closure
196  * @param key current key code
197  * @param value value in the hash map
198  * @return #GNUNET_YES if we should continue to
199  *         iterate,
200  *         #GNUNET_NO if not.
201  */
202 static int
203 iterator_initialization_by_alice (void *cls,
204                                   const struct GNUNET_HashCode *key,
205                                   void *value)
206 {
207   struct ElementEntry *ee = value;
208   struct Operation *op = cls;
209   struct GNUNET_HashCode mutated_hash;
210
211   //only consider this element, if it is valid for us
212   if ((op->generation_created < ee->generation_removed)
213        && (op->generation_created >= ee->generation_added))
214     return GNUNET_YES;
215
216   // not contained according to bob's bloomfilter
217   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
218                            op->spec->salt,
219                            &mutated_hash);
220   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
221                                                       &mutated_hash)){
222     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
223       send_client_element (op, &ee->element);
224     return GNUNET_YES;
225   }
226
227   op->state->my_element_count++;
228   GNUNET_assert (GNUNET_YES ==
229                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
230                                                     &ee->element_hash, ee,
231                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
232
233   return GNUNET_YES;
234 }
235
236
237 /**
238  * fills the contained-elements hashmap with all relevant
239  * elements and adds their mutated hashes to our local bloomfilter
240  *
241  * @param cls closure
242  * @param key current key code
243  * @param value value in the hash map
244  * @return #GNUNET_YES if we should continue to
245  *         iterate,
246  *         #GNUNET_NO if not.
247  */
248 static int
249 iterator_initialization (void *cls,
250                          const struct GNUNET_HashCode *key,
251                          void *value)
252 {
253   struct ElementEntry *ee = value;
254   struct Operation *op = cls;
255
256   //only consider this element, if it is valid for us
257   if ((op->generation_created < ee->generation_removed)
258        && (op->generation_created >= ee->generation_added))
259     return GNUNET_YES;
260
261   GNUNET_assert (GNUNET_YES ==
262                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
263                                                     &ee->element_hash, ee,
264                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
265   return GNUNET_YES;
266 }
267
268
269 /**
270  * removes element from a hashmap if it is not contained within the
271  * provided remote bloomfilter. Then, fill our new bloomfilter.
272  *
273  * @param cls closure
274  * @param key current key code
275  * @param value value in the hash map
276  * @return #GNUNET_YES if we should continue to
277  *         iterate,
278  *         #GNUNET_NO if not.
279  */
280 static int
281 iterator_bf_reduce (void *cls,
282                    const struct GNUNET_HashCode *key,
283                    void *value)
284 {
285   struct ElementEntry *ee = value;
286   struct Operation *op = cls;
287   struct GNUNET_HashCode mutated_hash;
288
289   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
290
291   if (GNUNET_NO ==
292       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
293                                          &mutated_hash))
294   {
295     op->state->my_element_count--;
296     GNUNET_assert (GNUNET_YES ==
297                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
298                                                          &ee->element_hash,
299                                                          ee));
300     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
301       send_client_element (op, &ee->element);
302   }
303
304   return GNUNET_YES;
305 }
306
307
308 /**
309  * Create a bloomfilter based on the elements given
310  *
311  * @param cls closure
312  * @param key current key code
313  * @param value value in the hash map
314  * @return #GNUNET_YES if we should continue to
315  *         iterate,
316  *         #GNUNET_NO if not.
317  */
318 static int
319 iterator_bf_create (void *cls,
320                     const struct GNUNET_HashCode *key,
321                     void *value)
322 {
323   struct ElementEntry *ee = value;
324   struct Operation *op = cls;
325   struct GNUNET_HashCode mutated_hash;
326
327   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
328                             op->spec->salt,
329                             &mutated_hash);
330   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
331                                     &mutated_hash);
332   return GNUNET_YES;
333 }
334
335
336 /**
337  * Inform the client that the union operation has failed,
338  * and proceed to destroy the evaluate operation.
339  *
340  * @param op the intersection operation to fail
341  */
342 static void
343 fail_intersection_operation (struct Operation *op)
344 {
345   struct GNUNET_MQ_Envelope *ev;
346   struct GNUNET_SET_ResultMessage *msg;
347
348   if (op->state->my_elements)
349   {
350     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
351     op->state->my_elements = NULL;
352   }
353   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
354               "intersection operation failed\n");
355
356   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
357   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
358   msg->request_id = htonl (op->spec->client_request_id);
359   msg->element_type = htons (0);
360   GNUNET_MQ_send (op->spec->set->client_mq, ev);
361   _GSS_operation_destroy (op, GNUNET_YES);
362 }
363
364
365 /**
366  * Send a request for the evaluate operation to a remote peer
367  *
368  * @param op operation with the other peer
369  */
370 static void
371 send_operation_request (struct Operation *op)
372 {
373   struct GNUNET_MQ_Envelope *ev;
374   struct OperationRequestMessage *msg;
375
376   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
377                                 op->spec->context_msg);
378
379   if (NULL == ev)
380   {
381     /* the context message is too large */
382     GNUNET_break (0);
383     GNUNET_SERVER_client_disconnect (op->spec->set->client);
384     return;
385   }
386   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
387   msg->app_id = op->spec->app_id;
388   msg->salt = htonl (op->spec->salt);
389   msg->element_count = htonl(op->state->my_element_count);
390
391   GNUNET_MQ_send (op->mq, ev);
392
393   if (NULL != op->spec->context_msg)
394     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
395                 "sent op request with context message\n");
396   else
397     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
398                 "sent op request without context message\n");
399
400   if (NULL != op->spec->context_msg)
401   {
402     GNUNET_free (op->spec->context_msg);
403     op->spec->context_msg = NULL;
404   }
405 }
406
407
408 static void
409 send_bloomfilter_multipart (struct Operation *op,
410                             uint32_t offset)
411 {
412   struct GNUNET_MQ_Envelope *ev;
413   struct BFPart *msg;
414   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
415   uint32_t todo_size = op->state->bf_data_size - offset;
416
417   if (todo_size < chunk_size)
418     chunk_size = todo_size;
419
420   ev = GNUNET_MQ_msg_extra (msg,
421                             chunk_size,
422                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
423
424   msg->chunk_length = htonl (chunk_size);
425   msg->chunk_offset = htonl (offset);
426   memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
427
428   GNUNET_MQ_send (op->mq, ev);
429
430   if (op->state->bf_data_size == offset + chunk_size)
431   {
432     // done
433     GNUNET_free(op->state->bf_data);
434     op->state->bf_data = NULL;
435     return;
436   }
437   send_bloomfilter_multipart (op, offset + chunk_size);
438 }
439
440
441 /**
442  * Send a bloomfilter to our peer.
443  * that the operation is over.
444  * After the result done message has been sent to the client,
445  * destroy the evaluate operation.
446  *
447  * @param op intersection operation
448  */
449 static void
450 send_bloomfilter (struct Operation *op)
451 {
452   struct GNUNET_MQ_Envelope *ev;
453   struct BFMessage *msg;
454   uint32_t bf_size;
455   uint32_t bf_elementbits;
456   uint32_t chunk_size;
457   struct GNUNET_CONTAINER_BloomFilter * local_bf;
458
459   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
460               "sending bf of size %u\n");
461
462   CALCULATE_BF_SIZE(op->state->my_element_count,
463                     op->spec->remote_element_count,
464                     bf_size,
465                     bf_elementbits);
466
467   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
468                                                 bf_size,
469                                                 bf_elementbits);
470
471   op->spec->salt++;
472   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
473                                          &iterator_bf_create,
474                                          op);
475
476   // send our bloomfilter
477   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage))
478   {
479     // singlepart
480     chunk_size = bf_size;
481     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
482     GNUNET_assert (GNUNET_SYSERR !=
483                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
484                                                               (char*)&msg[1],
485                                                               bf_size));
486   }
487   else
488   {
489     //multipart
490     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
491     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
492     op->state->bf_data = (char *) GNUNET_malloc (bf_size);
493     GNUNET_assert (GNUNET_SYSERR !=
494                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
495                                                               op->state->bf_data,
496                                                               bf_size));
497     memcpy (&msg[1], op->state->bf_data, chunk_size);
498     op->state->bf_data_size = bf_size;
499   }
500   GNUNET_CONTAINER_bloomfilter_free (local_bf);
501
502   msg->sender_element_count = htonl (op->state->my_element_count);
503   msg->bloomfilter_total_length = htonl (bf_size);
504   msg->bloomfilter_length = htonl (chunk_size);
505   msg->bits_per_element = htonl (bf_elementbits);
506   msg->sender_mutator = htonl (op->spec->salt);
507
508   GNUNET_MQ_send (op->mq, ev);
509
510   if (op->state->bf_data)
511     send_bloomfilter_multipart (op, chunk_size);
512 }
513
514
515 /**
516  * Signal to the client that the operation has finished and
517  * destroy the operation.
518  *
519  * @param cls operation to destroy
520  */
521 static void
522 send_client_done_and_destroy (void *cls)
523 {
524   struct Operation *op = cls;
525   struct GNUNET_MQ_Envelope *ev;
526   struct GNUNET_SET_ResultMessage *rm;
527
528   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
529   rm->request_id = htonl (op->spec->client_request_id);
530   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
531   rm->element_type = htons (0);
532   GNUNET_MQ_send (op->spec->set->client_mq, ev);
533   _GSS_operation_destroy (op, GNUNET_YES);
534 }
535
536
537 /**
538  * Send all elements in the full result iterator.
539  *
540  * @param cls operation
541  */
542 static void
543 send_remaining_elements (void *cls)
544 {
545   struct Operation *op = cls;
546   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
547   struct GNUNET_MQ_Envelope *ev;
548   struct GNUNET_SET_ResultMessage *rm;
549   struct GNUNET_SET_Element *element;
550   int res;
551
552   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
553                                                      NULL,
554                                                      (const void **) &remaining);
555   if (GNUNET_NO == res)
556   {
557     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558                 "sending done and destroy because iterator ran out\n");
559     send_client_done_and_destroy (op);
560     return;
561   }
562
563   element = &remaining->element;
564   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565               "sending element (size %u) to client (full set)\n",
566               element->size);
567   GNUNET_assert (0 != op->spec->client_request_id);
568
569   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
570   GNUNET_assert (NULL != ev);
571
572   rm->result_status = htons (GNUNET_SET_STATUS_OK);
573   rm->request_id = htonl (op->spec->client_request_id);
574   rm->element_type = element->element_type;
575   memcpy (&rm[1], element->data, element->size);
576
577   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
578   GNUNET_MQ_send (op->spec->set->client_mq, ev);
579 }
580
581
582 /**
583  * Inform the peer that this operation is complete.
584  *
585  * @param op the intersection operation to fail
586  */
587 static void
588 send_peer_done (struct Operation *op)
589 {
590   struct GNUNET_MQ_Envelope *ev;
591
592   op->state->phase = PHASE_FINISHED;
593   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594               "Intersection succeeded, sending DONE\n");
595   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
596   op->state->local_bf = NULL;
597
598   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
599   GNUNET_MQ_send (op->mq, ev);
600 }
601
602
603 /**
604  * Process a Bloomfilter once we got all the chunks
605  *
606  * @param op the intersection operation
607  */
608 static void
609 process_bf (struct Operation *op)
610 {
611   uint32_t old_elements;
612   uint32_t peer_elements;
613
614   old_elements = op->state->my_element_count;
615   peer_elements = op->spec->remote_element_count;
616   switch (op->state->phase)
617   {
618   case PHASE_INITIAL:
619     // If we are ot our first msg
620     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
621
622     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
623                                            &iterator_initialization_by_alice,
624                                            op);
625     break;
626   case PHASE_BF_EXCHANGE:
627   case PHASE_MAYBE_FINISHED:
628     // if we are bob or alice and are continuing operation
629     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
630                                            &iterator_bf_reduce,
631                                            op);
632     break;
633   default:
634     GNUNET_break_op (0);
635     fail_intersection_operation(op);
636   }
637   // the iterators created a new BF with salt+1
638   // the peer needs this information for decoding the next BF
639   // this behavior can be modified at will later on.
640   op->spec->salt++;
641
642   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
643   op->state->remote_bf = NULL;
644
645   if ((0 == op->state->my_element_count) // fully disjoint
646       || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements
647           && (old_elements == op->state->my_element_count)
648           && (op->state->my_element_count == peer_elements)))
649   {
650     // In the last round we though we were finished, we now know this is correct
651     send_peer_done (op);
652     return;
653   }
654
655   op->state->phase = PHASE_BF_EXCHANGE;
656   if (op->state->my_element_count == peer_elements)
657     // maybe we are finished, but we do one more round to make certain
658     // we don't have false positives ...
659     op->state->phase = PHASE_MAYBE_FINISHED;
660
661   send_bloomfilter (op);
662 }
663
664
665 /**
666  * Handle an BF multipart message from a remote peer.
667  *
668  * @param cls the intersection operation
669  * @param mh the header of the message
670  */
671 static void
672 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
673 {
674   struct Operation *op = cls;
675   const struct BFPart *msg = (const struct BFPart *) mh;
676   uint32_t chunk_size;
677   uint32_t chunk_offset;
678
679   chunk_size = ntohl(msg->chunk_length);
680   chunk_offset = ntohl(msg->chunk_offset);
681
682   if ((NULL == op->state->bf_data)
683        || (op->state->bf_data_size < chunk_size + chunk_offset))
684   {
685     // unexpected multipart chunk
686     GNUNET_break_op (0);
687     fail_intersection_operation(op);
688     return;
689   }
690
691   memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
692
693   if (op->state->bf_data_size != chunk_offset + chunk_size)
694     // wait for next chunk
695     return;
696
697   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
698                                                             op->state->bf_data_size,
699                                                             op->state->bf_bits_per_element);
700
701   GNUNET_free (op->state->bf_data);
702   op->state->bf_data = NULL;
703
704   process_bf (op);
705 }
706
707
708 /**
709  * Handle an BF message from a remote peer.
710  *
711  * @param cls the intersection operation
712  * @param mh the header of the message
713  */
714 static void
715 handle_p2p_bf (void *cls,
716                const struct GNUNET_MessageHeader *mh)
717 {
718   struct Operation *op = cls;
719   const struct BFMessage *msg = (const struct BFMessage *) mh;
720   uint32_t bf_size;
721   uint32_t chunk_size;
722   uint32_t bf_bits_per_element;
723
724   switch (op->state->phase)
725   {
726   case PHASE_INITIAL:
727   case PHASE_BF_EXCHANGE:
728   case PHASE_MAYBE_FINISHED:
729     if (NULL == op->state->bf_data)
730     {
731       // no colliding multipart transaction going on currently
732       op->spec->salt = ntohl (msg->sender_mutator);
733       bf_size = ntohl (msg->bloomfilter_total_length);
734       bf_bits_per_element = ntohl (msg->bits_per_element);
735       chunk_size = ntohl (msg->bloomfilter_length);
736       op->spec->remote_element_count = ntohl(msg->sender_element_count);
737       if (bf_size == chunk_size)
738       {
739         // single part, done here
740         op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
741                                                                   bf_size,
742                                                                   bf_bits_per_element);
743         process_bf (op);
744         return;
745       }
746
747       //first multipart chunk
748       op->state->bf_data = GNUNET_malloc (bf_size);
749       op->state->bf_data_size = bf_size;
750       op->state->bf_bits_per_element = bf_bits_per_element;
751       memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
752       return;
753     }
754   default:
755     GNUNET_break_op (0);
756     fail_intersection_operation (op);
757   }
758 }
759
760
761 /**
762  * Handle an BF message from a remote peer.
763  *
764  * @param cls the intersection operation
765  * @param mh the header of the message
766  */
767 static void
768 handle_p2p_element_info (void *cls,
769                          const struct GNUNET_MessageHeader *mh)
770 {
771   struct Operation *op = cls;
772   const struct BFMessage *msg = (const struct BFMessage *) mh;
773
774   op->spec->remote_element_count = ntohl(msg->sender_element_count);
775   if ((op->state->phase != PHASE_INITIAL)
776       || (op->state->my_element_count > op->spec->remote_element_count)
777           || (0 == op->state->my_element_count)
778               || (0 == op->spec->remote_element_count))
779   {
780     GNUNET_break_op (0);
781     fail_intersection_operation(op);
782     return;
783   }
784
785   op->state->phase = PHASE_BF_EXCHANGE;
786   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
787
788   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
789                                          &iterator_initialization,
790                                          op);
791
792   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
793   op->state->remote_bf = NULL;
794
795   if (op->state->my_element_count == ntohl (msg->sender_element_count))
796     op->state->phase = PHASE_MAYBE_FINISHED;
797
798   send_bloomfilter (op);
799 }
800
801
802 /**
803  * Send our element count to the peer, in case our element count is lower than his
804  *
805  * @param op intersection operation
806  */
807 static void
808 send_element_count (struct Operation *op)
809 {
810   struct GNUNET_MQ_Envelope *ev;
811   struct BFMessage *msg;
812
813   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
814               "sending element count (bf_msg)\n");
815
816   // just send our element count, as the other peer must start
817   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
818   msg->sender_element_count = htonl (op->state->my_element_count);
819   msg->bloomfilter_length = htonl (0);
820   msg->sender_mutator = htonl (0);
821
822   GNUNET_MQ_send (op->mq, ev);
823 }
824
825
826 /**
827  * Send a result message to the client indicating
828  * that the operation is over.
829  * After the result done message has been sent to the client,
830  * destroy the evaluate operation.
831  *
832  * @param op intersection operation
833  */
834 static void
835 finish_and_destroy (struct Operation *op)
836 {
837   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
838
839   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
840   {
841     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842                 "sending full result set\n");
843     op->state->full_result_iter =
844         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
845     send_remaining_elements (op);
846     return;
847   }
848   send_client_done_and_destroy (op);
849 }
850
851
852 /**
853  * Handle a done message from a remote peer
854  *
855  * @param cls the union operation
856  * @param mh the message
857  */
858 static void
859 handle_p2p_done (void *cls,
860                  const struct GNUNET_MessageHeader *mh)
861 {
862   struct Operation *op = cls;
863
864   if ( (op->state->phase = PHASE_FINISHED) ||
865        (op->state->phase = PHASE_MAYBE_FINISHED) )
866   {
867     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868                 "got final DONE\n");
869
870     finish_and_destroy (op);
871     return;
872   }
873
874   GNUNET_break_op (0);
875   fail_intersection_operation (op);
876 }
877
878
879 /**
880  * Evaluate a union operation with
881  * a remote peer.
882  *
883  * @param op operation to evaluate
884  */
885 static void
886 intersection_evaluate (struct Operation *op)
887 {
888   op->state = GNUNET_new (struct OperationState);
889   /* we started the operation, thus we have to send the operation request */
890   op->state->phase = PHASE_INITIAL;
891   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
892   op->state->my_element_count = op->spec->set->state->current_set_element_count;
893
894   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895               "evaluating intersection operation");
896   send_operation_request (op);
897 }
898
899
900 /**
901  * Accept an union operation request from a remote peer.
902  * Only initializes the private operation state.
903  *
904  * @param op operation that will be accepted as a union operation
905  */
906 static void
907 intersection_accept (struct Operation *op)
908 {
909   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
910               "accepting set union operation\n");
911   op->state = GNUNET_new (struct OperationState);
912   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
913   op->state->my_element_count = op->spec->set->state->current_set_element_count;
914
915   // if Alice (the peer) has more elements than Bob (us), she should start
916   if (op->spec->remote_element_count < op->state->my_element_count){
917     op->state->phase = PHASE_INITIAL;
918     send_element_count(op);
919     return;
920   }
921   // create a new bloomfilter in case we have fewer elements
922   op->state->phase = PHASE_BF_EXCHANGE;
923   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
924                                                            BLOOMFILTER_SIZE,
925                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
926   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
927                                          &iterator_initialization,
928                                          op);
929   send_bloomfilter (op);
930 }
931
932
933 /**
934  * Create a new set supporting the intersection operation
935  *
936  * @return the newly created set
937  */
938 static struct SetState *
939 intersection_set_create ()
940 {
941   struct SetState *set_state;
942
943   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944               "intersection set created\n");
945   set_state = GNUNET_new (struct SetState);
946   set_state->current_set_element_count = 0;
947
948   return set_state;
949 }
950
951
952 /**
953  * Add the element from the given element message to the set.
954  *
955  * @param set_state state of the set want to add to
956  * @param ee the element to add to the set
957  */
958 static void
959 intersection_add (struct SetState *set_state,
960                   struct ElementEntry *ee)
961 {
962   set_state->current_set_element_count++;
963 }
964
965
966 /**
967  * Destroy a set that supports the intersection operation
968  *
969  * @param set_state the set to destroy
970  */
971 static void
972 intersection_set_destroy (struct SetState *set_state)
973 {
974   GNUNET_free (set_state);
975 }
976
977
978 /**
979  * Remove the element given in the element message from the set.
980  *
981  * @param set_state state of the set to remove from
982  * @param element set element to remove
983  */
984 static void
985 intersection_remove (struct SetState *set_state,
986                      struct ElementEntry *element)
987 {
988   GNUNET_assert(0 < set_state->current_set_element_count);
989   set_state->current_set_element_count--;
990 }
991
992
993 /**
994  * Dispatch messages for a intersection operation.
995  *
996  * @param op the state of the intersection evaluate operation
997  * @param mh the received message
998  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
999  *         #GNUNET_OK otherwise
1000  */
1001 static int
1002 intersection_handle_p2p_message (struct Operation *op,
1003                                  const struct GNUNET_MessageHeader *mh)
1004 {
1005   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006               "received p2p message (t: %u, s: %u)\n",
1007               ntohs (mh->type), ntohs (mh->size));
1008   switch (ntohs (mh->type))
1009   {
1010     /* this message handler is not active until after we received an
1011      * operation request message, thus the ops request is not handled here
1012      */
1013   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
1014     handle_p2p_element_info (op, mh);
1015     break;
1016   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1017     handle_p2p_bf (op, mh);
1018     break;
1019   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
1020     handle_p2p_bf_part (op, mh);
1021     break;
1022   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1023     handle_p2p_done (op, mh);
1024     break;
1025   default:
1026     /* something wrong with cadet's message handlers? */
1027     GNUNET_assert (0);
1028   }
1029   return GNUNET_OK;
1030 }
1031
1032
1033 /**
1034  * handler for peer-disconnects, notifies the client about the aborted operation
1035  *
1036  * @param op the destroyed operation
1037  */
1038 static void
1039 intersection_peer_disconnect (struct Operation *op)
1040 {
1041   if (PHASE_FINISHED != op->state->phase)
1042   {
1043     struct GNUNET_MQ_Envelope *ev;
1044     struct GNUNET_SET_ResultMessage *msg;
1045
1046     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1047     msg->request_id = htonl (op->spec->client_request_id);
1048     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1049     msg->element_type = htons (0);
1050     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1051     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1052                 "other peer disconnected prematurely\n");
1053     _GSS_operation_destroy (op, GNUNET_YES);
1054     return;
1055   }
1056   // else: the session has already been concluded
1057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058               "other peer disconnected (finished)\n");
1059   if (GNUNET_NO == op->state->client_done_sent)
1060     finish_and_destroy (op);
1061 }
1062
1063
1064 /**
1065  * Destroy the union operation.  Only things specific to the union operation are destroyed.
1066  *
1067  * @param op union operation to destroy
1068  */
1069 static void
1070 intersection_op_cancel (struct Operation *op)
1071 {
1072   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073               "destroying intersection op\n");
1074   /* check if the op was canceled twice */
1075   GNUNET_assert (NULL != op->state);
1076   if (NULL != op->state->remote_bf)
1077   {
1078     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1079     op->state->remote_bf = NULL;
1080   }
1081   if (NULL != op->state->local_bf)
1082   {
1083     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1084     op->state->local_bf = NULL;
1085   }
1086 /*  if (NULL != op->state->my_elements)
1087   {
1088     // no need to free the elements, they are still part of the set
1089     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1090     op->state->my_elements = NULL;
1091   }*/
1092   GNUNET_free (op->state);
1093   op->state = NULL;
1094   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095               "destroying intersection op done\n");
1096 }
1097
1098
1099 /**
1100  * Get the table with implementing functions for set intersection.
1101  *
1102  * @return the operation specific VTable
1103  */
1104 const struct SetVT *
1105 _GSS_intersection_vt ()
1106 {
1107   static const struct SetVT intersection_vt = {
1108     .create = &intersection_set_create,
1109     .msg_handler = &intersection_handle_p2p_message,
1110     .add = &intersection_add,
1111     .remove = &intersection_remove,
1112     .destroy_set = &intersection_set_destroy,
1113     .evaluate = &intersection_evaluate,
1114     .accept = &intersection_accept,
1115     .peer_disconnect = &intersection_peer_disconnect,
1116     .cancel = &intersection_op_cancel,
1117   };
1118
1119   return &intersection_vt;
1120 }