27110077e0b38e20a15f78b56cb688f9410452a1
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
34
35 #define CALCULATE_BF_SIZE(A, B, s, k) \
36                           do { \
37                             k = ceil(1 + log2((double) (2*B / (double) A)));\
38                             if (k<1) k=1; /* k can be calculated as 0 */\
39                             s = ceil((double) (A * k / log(2))); \
40                           } while (0)
41
42 /**
43  * Current phase we are in for a intersection operation.
44  */
45 enum IntersectionOperationPhase
46 {
47   /**
48    * Alices has suggested an operation to bob,
49    * and is waiting for a bf or session end.
50    */
51   PHASE_INITIAL,
52   /**
53    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
54    * until one notices the their element count is equal
55    */
56   PHASE_BF_EXCHANGE,
57   /**
58    * if both peers have an equal peercount, they enter this state for
59    * one more turn, to see if they actually have agreed on a correct set.
60    * if a peer finds the same element count after the next iteration,
61    * it ends the the session
62    */
63   PHASE_MAYBE_FINISHED,
64   /**
65    * The protocol is over.
66    * Results may still have to be sent to the client.
67    */
68   PHASE_FINISHED
69 };
70
71
72 /**
73  * State of an evaluate operation
74  * with another peer.
75  */
76 struct OperationState
77 {
78   /**
79    * The bf we currently receive
80    */
81   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
82
83   /**
84    * BF of the set's element.
85    */
86   struct GNUNET_CONTAINER_BloomFilter *local_bf;
87
88   /**
89    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
90    */
91   char * bf_data;
92
93   /**
94    * size of the bloomfilter
95    */
96   uint32_t bf_data_size;
97
98   /**
99    * size of the bloomfilter
100    */
101   uint32_t bf_bits_per_element;
102
103   /**
104    * Current state of the operation.
105    */
106   enum IntersectionOperationPhase phase;
107
108   /**
109    * Generation in which the operation handle
110    * was created.
111    */
112   unsigned int generation_created;
113
114   /**
115    * Maps element-id-hashes to 'elements in our set'.
116    */
117   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
118
119   /**
120    * Current element count contained within contained_elements
121    */
122   uint32_t my_element_count;
123
124   /**
125    * Iterator for sending elements on the key to element mapping to the client.
126    */
127   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
128
129   /**
130    * Evaluate operations are held in
131    * a linked list.
132    */
133   struct OperationState *next;
134
135    /**
136     * Evaluate operations are held in
137     * a linked list.
138     */
139   struct OperationState *prev;
140
141   /**
142    * Did we send the client that we are done?
143    */
144   int client_done_sent;
145 };
146
147
148 /**
149  * Extra state required for efficient set intersection.
150  */
151 struct SetState
152 {
153   /**
154    * Number of currently valid elements in the set which have not been removed
155    */
156   uint32_t current_set_element_count;
157 };
158
159
160 /**
161  * Send a result message to the client indicating
162  * we removed an element
163  *
164  * @param op union operation
165  * @param element element to send
166  */
167 static void
168 send_client_element (struct Operation *op,
169                      struct GNUNET_SET_Element *element)
170 {
171   struct GNUNET_MQ_Envelope *ev;
172   struct GNUNET_SET_ResultMessage *rm;
173
174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
175               "sending removed element (size %u) to client\n",
176               element->size);
177   GNUNET_assert (0 != op->spec->client_request_id);
178   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
179   if (NULL == ev)
180   {
181     GNUNET_MQ_discard (ev);
182     GNUNET_break (0);
183     return;
184   }
185   rm->result_status = htons (GNUNET_SET_STATUS_OK);
186   rm->request_id = htonl (op->spec->client_request_id);
187   rm->element_type = element->type;
188   memcpy (&rm[1], element->data, element->size);
189   GNUNET_MQ_send (op->spec->set->client_mq, ev);
190 }
191
192
193 /**
194  * Alice's version:
195  *
196  * fills the contained-elements hashmap with all relevant
197  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
198  *
199  * @param cls closure
200  * @param key current key code
201  * @param value value in the hash map
202  * @return #GNUNET_YES if we should continue to
203  *         iterate,
204  *         #GNUNET_NO if not.
205  */
206 static int
207 iterator_initialization_by_alice (void *cls,
208                                   const struct GNUNET_HashCode *key,
209                                   void *value)
210 {
211   struct ElementEntry *ee = value;
212   struct Operation *op = cls;
213   struct GNUNET_HashCode mutated_hash;
214
215   //only consider this element, if it is valid for us
216   if ((op->generation_created < ee->generation_removed)
217        && (op->generation_created >= ee->generation_added))
218     return GNUNET_YES;
219
220   // not contained according to bob's bloomfilter
221   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
222                            op->spec->salt,
223                            &mutated_hash);
224   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
225                                                       &mutated_hash)){
226     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
227       send_client_element (op, &ee->element);
228     return GNUNET_YES;
229   }
230
231   op->state->my_element_count++;
232   GNUNET_assert (GNUNET_YES ==
233                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
234                                                     &ee->element_hash, ee,
235                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
236
237   return GNUNET_YES;
238 }
239
240 /**
241  * fills the contained-elements hashmap with all relevant
242  * elements and adds their mutated hashes to our local bloomfilter
243  *
244  * @param cls closure
245  * @param key current key code
246  * @param value value in the hash map
247  * @return #GNUNET_YES if we should continue to
248  *         iterate,
249  *         #GNUNET_NO if not.
250  */
251 static int
252 iterator_initialization (void *cls,
253                          const struct GNUNET_HashCode *key,
254                          void *value)
255 {
256   struct ElementEntry *ee = value;
257   struct Operation *op = cls;
258
259   //only consider this element, if it is valid for us
260   if ((op->generation_created < ee->generation_removed)
261        && (op->generation_created >= ee->generation_added))
262     return GNUNET_YES;
263
264   GNUNET_assert (GNUNET_YES ==
265                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
266                                                     &ee->element_hash, ee,
267                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
268   return GNUNET_YES;
269 }
270
271
272 /**
273  * removes element from a hashmap if it is not contained within the
274  * provided remote bloomfilter. Then, fill our new bloomfilter.
275  *
276  * @param cls closure
277  * @param key current key code
278  * @param value value in the hash map
279  * @return #GNUNET_YES if we should continue to
280  *         iterate,
281  *         #GNUNET_NO if not.
282  */
283 static int
284 iterator_bf_reduce (void *cls,
285                    const struct GNUNET_HashCode *key,
286                    void *value)
287 {
288   struct ElementEntry *ee = value;
289   struct Operation *op = cls;
290   struct GNUNET_HashCode mutated_hash;
291
292   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
293
294   if (GNUNET_NO ==
295       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
296                                          &mutated_hash))
297   {
298     op->state->my_element_count--;
299     GNUNET_assert (GNUNET_YES ==
300                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
301                                                          &ee->element_hash,
302                                                          ee));
303     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
304       send_client_element (op, &ee->element);
305   }
306
307   return GNUNET_YES;
308 }
309
310
311 /**
312  * Create a bloomfilter based on the elements given
313  *
314  * @param cls closure
315  * @param key current key code
316  * @param value value in the hash map
317  * @return #GNUNET_YES if we should continue to
318  *         iterate,
319  *         #GNUNET_NO if not.
320  */
321 static int
322 iterator_bf_create (void *cls,
323                     const struct GNUNET_HashCode *key,
324                     void *value)
325 {
326   struct ElementEntry *ee = value;
327   struct Operation *op = cls;
328   struct GNUNET_HashCode mutated_hash;
329
330   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
331                             op->spec->salt,
332                             &mutated_hash);
333   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
334                                     &mutated_hash);
335   return GNUNET_YES;
336 }
337
338
339 /**
340  * Inform the client that the union operation has failed,
341  * and proceed to destroy the evaluate operation.
342  *
343  * @param op the intersection operation to fail
344  */
345 static void
346 fail_intersection_operation (struct Operation *op)
347 {
348   struct GNUNET_MQ_Envelope *ev;
349   struct GNUNET_SET_ResultMessage *msg;
350
351   if (op->state->my_elements)
352   {
353     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
354     op->state->my_elements = NULL;
355   }
356   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
357               "intersection operation failed\n");
358
359   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
360   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
361   msg->request_id = htonl (op->spec->client_request_id);
362   msg->element_type = htons (0);
363   GNUNET_MQ_send (op->spec->set->client_mq, ev);
364   _GSS_operation_destroy (op);
365 }
366
367
368 /**
369  * Send a request for the evaluate operation to a remote peer
370  *
371  * @param op operation with the other peer
372  */
373 static void
374 send_operation_request (struct Operation *op)
375 {
376   struct GNUNET_MQ_Envelope *ev;
377   struct OperationRequestMessage *msg;
378
379   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
380                                 op->spec->context_msg);
381
382   if (NULL == ev)
383   {
384     /* the context message is too large */
385     GNUNET_break (0);
386     GNUNET_SERVER_client_disconnect (op->spec->set->client);
387     return;
388   }
389   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
390   msg->app_id = op->spec->app_id;
391   msg->salt = htonl (op->spec->salt);
392   msg->element_count = htonl(op->state->my_element_count);
393
394   GNUNET_MQ_send (op->mq, ev);
395
396   if (NULL != op->spec->context_msg)
397     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
398                 "sent op request with context message\n");
399   else
400     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
401                 "sent op request without context message\n");
402
403   if (NULL != op->spec->context_msg)
404   {
405     GNUNET_free (op->spec->context_msg);
406     op->spec->context_msg = NULL;
407   }
408 }
409
410
411 static void
412 send_bloomfilter_multipart (struct Operation *op,
413                             uint32_t offset)
414 {
415   struct GNUNET_MQ_Envelope *ev;
416   struct BFPart *msg;
417   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
418   uint32_t todo_size = op->state->bf_data_size - offset;
419
420   if (todo_size < chunk_size)
421     chunk_size = todo_size;
422
423   ev = GNUNET_MQ_msg_extra (msg,
424                             chunk_size,
425                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
426
427   msg->chunk_length = htonl (chunk_size);
428   msg->chunk_offset = htonl (offset);
429   memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
430
431   GNUNET_MQ_send (op->mq, ev);
432
433   if (op->state->bf_data_size == offset + chunk_size)
434   {
435     // done
436     GNUNET_free(op->state->bf_data);
437     op->state->bf_data = NULL;
438     return;
439   }
440   send_bloomfilter_multipart (op, offset + chunk_size);
441 }
442
443
444 /**
445  * Send a bloomfilter to our peer.
446  * that the operation is over.
447  * After the result done message has been sent to the client,
448  * destroy the evaluate operation.
449  *
450  * @param op intersection operation
451  */
452 static void
453 send_bloomfilter (struct Operation *op)
454 {
455   struct GNUNET_MQ_Envelope *ev;
456   struct BFMessage *msg;
457   uint32_t bf_size;
458   uint32_t bf_elementbits;
459   uint32_t chunk_size;
460   struct GNUNET_CONTAINER_BloomFilter * local_bf;
461
462   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
463               "sending bf of size %u\n");
464
465   CALCULATE_BF_SIZE(op->state->my_element_count,
466                     op->spec->remote_element_count,
467                     bf_size,
468                     bf_elementbits);
469
470   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
471                                                 bf_size,
472                                                 bf_elementbits);
473
474   op->spec->salt++;
475   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
476                                          &iterator_bf_create,
477                                          op);
478
479   // send our bloomfilter
480   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage))
481   {
482     // singlepart
483     chunk_size = bf_size;
484     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
485     GNUNET_assert (GNUNET_SYSERR !=
486                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
487                                                               (char*)&msg[1],
488                                                               bf_size));
489   }
490   else
491   {
492     //multipart
493     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
494     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
495     op->state->bf_data = (char *) GNUNET_malloc (bf_size);
496     GNUNET_assert (GNUNET_SYSERR !=
497                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
498                                                               op->state->bf_data,
499                                                               bf_size));
500     memcpy (&msg[1], op->state->bf_data, chunk_size);
501     op->state->bf_data_size = bf_size;
502   }
503   GNUNET_CONTAINER_bloomfilter_free (local_bf);
504
505   msg->sender_element_count = htonl (op->state->my_element_count);
506   msg->bloomfilter_total_length = htonl (bf_size);
507   msg->bloomfilter_length = htonl (chunk_size);
508   msg->bits_per_element = htonl (bf_elementbits);
509   msg->sender_mutator = htonl (op->spec->salt);
510
511   GNUNET_MQ_send (op->mq, ev);
512
513   if (op->state->bf_data)
514     send_bloomfilter_multipart (op, chunk_size);
515 }
516
517
518 /**
519  * Signal to the client that the operation has finished and
520  * destroy the operation.
521  *
522  * @param cls operation to destroy
523  */
524 static void
525 send_client_done_and_destroy (void *cls)
526 {
527   struct Operation *op = cls;
528   struct GNUNET_MQ_Envelope *ev;
529   struct GNUNET_SET_ResultMessage *rm;
530   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
531   rm->request_id = htonl (op->spec->client_request_id);
532   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
533   rm->element_type = htons (0);
534   GNUNET_MQ_send (op->spec->set->client_mq, ev);
535   _GSS_operation_destroy (op);
536 }
537
538
539 /**
540  * Send all elements in the full result iterator.
541  *
542  * @param cls operation
543  */
544 static void
545 send_remaining_elements (void *cls)
546 {
547   struct Operation *op = cls;
548   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
549   struct GNUNET_MQ_Envelope *ev;
550   struct GNUNET_SET_ResultMessage *rm;
551   struct GNUNET_SET_Element *element;
552   int res;
553
554   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
555                                                      NULL,
556                                                      (const void **) &remaining);
557   if (GNUNET_NO == res)
558   {
559     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
560                 "sending done and destroy because iterator ran out\n");
561     send_client_done_and_destroy (op);
562     return;
563   }
564
565   element = &remaining->element;
566   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
567               "sending element (size %u) to client (full set)\n",
568               element->size);
569   GNUNET_assert (0 != op->spec->client_request_id);
570
571   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
572   GNUNET_assert (NULL != ev);
573
574   rm->result_status = htons (GNUNET_SET_STATUS_OK);
575   rm->request_id = htonl (op->spec->client_request_id);
576   rm->element_type = element->type;
577   memcpy (&rm[1], element->data, element->size);
578
579   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
580   GNUNET_MQ_send (op->spec->set->client_mq, ev);
581 }
582
583
584 /**
585  * Inform the peer that this operation is complete.
586  *
587  * @param op the intersection operation to fail
588  */
589 static void
590 send_peer_done (struct Operation *op)
591 {
592   struct GNUNET_MQ_Envelope *ev;
593
594   op->state->phase = PHASE_FINISHED;
595   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596               "Intersection succeeded, sending DONE\n");
597   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
598   op->state->local_bf = NULL;
599
600   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
601   GNUNET_MQ_send (op->mq, ev);
602 }
603
604
605 /**
606  * Process a Bloomfilter once we got all the chunks
607  *
608  * @param op the intersection operation
609  */
610 static void
611 process_bf (struct Operation *op)
612 {
613   uint32_t old_elements;
614   uint32_t peer_elements;
615
616   old_elements = op->state->my_element_count;
617   peer_elements = op->spec->remote_element_count;
618   switch (op->state->phase)
619   {
620   case PHASE_INITIAL:
621     // If we are ot our first msg
622     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
623
624     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
625                                            &iterator_initialization_by_alice,
626                                            op);
627     break;
628   case PHASE_BF_EXCHANGE:
629   case PHASE_MAYBE_FINISHED:
630     // if we are bob or alice and are continuing operation
631     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
632                                            &iterator_bf_reduce,
633                                            op);
634     break;
635   default:
636     GNUNET_break_op (0);
637     fail_intersection_operation(op);
638   }
639   // the iterators created a new BF with salt+1
640   // the peer needs this information for decoding the next BF
641   // this behavior can be modified at will later on.
642   op->spec->salt++;
643
644   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
645   op->state->remote_bf = NULL;
646
647   if ((0 == op->state->my_element_count) // fully disjoint
648       || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements
649           && (old_elements == op->state->my_element_count)
650           && (op->state->my_element_count == peer_elements)))
651   {
652     // In the last round we though we were finished, we now know this is correct
653     send_peer_done (op);
654     return;
655   }
656
657   op->state->phase = PHASE_BF_EXCHANGE;
658   if (op->state->my_element_count == peer_elements)
659     // maybe we are finished, but we do one more round to make certain
660     // we don't have false positives ...
661     op->state->phase = PHASE_MAYBE_FINISHED;
662
663   send_bloomfilter (op);
664 }
665
666
667 /**
668  * Handle an BF multipart message from a remote peer.
669  *
670  * @param cls the intersection operation
671  * @param mh the header of the message
672  */
673 static void
674 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
675 {
676   struct Operation *op = cls;
677   const struct BFPart *msg = (const struct BFPart *) mh;
678   uint32_t chunk_size;
679   uint32_t chunk_offset;
680
681   chunk_size = ntohl(msg->chunk_length);
682   chunk_offset = ntohl(msg->chunk_offset);
683
684   if ((NULL == op->state->bf_data)
685        || (op->state->bf_data_size < chunk_size + chunk_offset))
686   {
687     // unexpected multipart chunk
688     GNUNET_break_op (0);
689     fail_intersection_operation(op);
690     return;
691   }
692
693   memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
694
695   if (op->state->bf_data_size != chunk_offset + chunk_size)
696     // wait for next chunk
697     return;
698
699   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
700                                                             op->state->bf_data_size,
701                                                             op->state->bf_bits_per_element);
702
703   GNUNET_free (op->state->bf_data);
704   op->state->bf_data = NULL;
705
706   process_bf (op);
707 }
708
709
710 /**
711  * Handle an BF message from a remote peer.
712  *
713  * @param cls the intersection operation
714  * @param mh the header of the message
715  */
716 static void
717 handle_p2p_bf (void *cls,
718                const struct GNUNET_MessageHeader *mh)
719 {
720   struct Operation *op = cls;
721   const struct BFMessage *msg = (const struct BFMessage *) mh;
722   uint32_t bf_size;
723   uint32_t chunk_size;
724   uint32_t bf_bits_per_element;
725
726   switch (op->state->phase)
727   {
728   case PHASE_INITIAL:
729   case PHASE_BF_EXCHANGE:
730   case PHASE_MAYBE_FINISHED:
731     if (NULL == op->state->bf_data)
732     {
733       // no colliding multipart transaction going on currently
734       op->spec->salt = ntohl (msg->sender_mutator);
735       bf_size = ntohl (msg->bloomfilter_total_length);
736       bf_bits_per_element = ntohl (msg->bits_per_element);
737       chunk_size = ntohl (msg->bloomfilter_length);
738       op->spec->remote_element_count = ntohl(msg->sender_element_count);
739       if (bf_size == chunk_size)
740       {
741         // single part, done here
742         op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
743                                                                   bf_size,
744                                                                   bf_bits_per_element);
745         process_bf (op);
746         return;
747       }
748
749       //first multipart chunk
750       op->state->bf_data = GNUNET_malloc (bf_size);
751       op->state->bf_data_size = bf_size;
752       op->state->bf_bits_per_element = bf_bits_per_element;
753       memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
754       return;
755     }
756   default:
757     GNUNET_break_op (0);
758     fail_intersection_operation (op);
759   }
760 }
761
762
763 /**
764  * Handle an BF message from a remote peer.
765  *
766  * @param cls the intersection operation
767  * @param mh the header of the message
768  */
769 static void
770 handle_p2p_element_info (void *cls,
771                          const struct GNUNET_MessageHeader *mh)
772 {
773   struct Operation *op = cls;
774   const struct BFMessage *msg = (const struct BFMessage *) mh;
775
776   op->spec->remote_element_count = ntohl(msg->sender_element_count);
777   if ((op->state->phase != PHASE_INITIAL)
778       || (op->state->my_element_count > op->spec->remote_element_count)
779           || (0 == op->state->my_element_count)
780               || (0 == op->spec->remote_element_count))
781   {
782     GNUNET_break_op (0);
783     fail_intersection_operation(op);
784     return;
785   }
786
787   op->state->phase = PHASE_BF_EXCHANGE;
788   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
789
790   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
791                                          &iterator_initialization,
792                                          op);
793
794   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
795   op->state->remote_bf = NULL;
796
797   if (op->state->my_element_count == ntohl (msg->sender_element_count))
798     op->state->phase = PHASE_MAYBE_FINISHED;
799
800   send_bloomfilter (op);
801 }
802
803
804 /**
805  * Send our element count to the peer, in case our element count is lower than his
806  *
807  * @param op intersection operation
808  */
809 static void
810 send_element_count (struct Operation *op)
811 {
812   struct GNUNET_MQ_Envelope *ev;
813   struct BFMessage *msg;
814
815   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
816               "sending element count (bf_msg)\n");
817
818   // just send our element count, as the other peer must start
819   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
820   msg->sender_element_count = htonl (op->state->my_element_count);
821   msg->bloomfilter_length = htonl (0);
822   msg->sender_mutator = htonl (0);
823
824   GNUNET_MQ_send (op->mq, ev);
825 }
826
827
828 /**
829  * Send a result message to the client indicating
830  * that the operation is over.
831  * After the result done message has been sent to the client,
832  * destroy the evaluate operation.
833  *
834  * @param op intersection operation
835  */
836 static void
837 finish_and_destroy (struct Operation *op)
838 {
839   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
840
841   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
842   {
843     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844                 "sending full result set\n");
845     op->state->full_result_iter =
846         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
847     send_remaining_elements (op);
848     return;
849   }
850   send_client_done_and_destroy (op);
851 }
852
853
854 /**
855  * Handle a done message from a remote peer
856  *
857  * @param cls the union operation
858  * @param mh the message
859  */
860 static void
861 handle_p2p_done (void *cls,
862                  const struct GNUNET_MessageHeader *mh)
863 {
864   struct Operation *op = cls;
865
866   if ( (op->state->phase = PHASE_FINISHED) ||
867        (op->state->phase = PHASE_MAYBE_FINISHED) )
868   {
869     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
870                 "got final DONE\n");
871
872     finish_and_destroy (op);
873     return;
874   }
875
876   GNUNET_break_op (0);
877   fail_intersection_operation (op);
878 }
879
880
881 /**
882  * Evaluate a union operation with
883  * a remote peer.
884  *
885  * @param op operation to evaluate
886  */
887 static void
888 intersection_evaluate (struct Operation *op)
889 {
890   op->state = GNUNET_new (struct OperationState);
891   /* we started the operation, thus we have to send the operation request */
892   op->state->phase = PHASE_INITIAL;
893   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
894   op->state->my_element_count = op->spec->set->state->current_set_element_count;
895
896   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
897               "evaluating intersection operation");
898   send_operation_request (op);
899 }
900
901
902 /**
903  * Accept an union operation request from a remote peer.
904  * Only initializes the private operation state.
905  *
906  * @param op operation that will be accepted as a union operation
907  */
908 static void
909 intersection_accept (struct Operation *op)
910 {
911   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
912               "accepting set union operation\n");
913   op->state = GNUNET_new (struct OperationState);
914   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
915   op->state->my_element_count = op->spec->set->state->current_set_element_count;
916
917   // if Alice (the peer) has more elements than Bob (us), she should start
918   if (op->spec->remote_element_count < op->state->my_element_count){
919     op->state->phase = PHASE_INITIAL;
920     send_element_count(op);
921     return;
922   }
923   // create a new bloomfilter in case we have fewer elements
924   op->state->phase = PHASE_BF_EXCHANGE;
925   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
926                                                            BLOOMFILTER_SIZE,
927                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
928   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
929                                          &iterator_initialization,
930                                          op);
931   send_bloomfilter (op);
932 }
933
934
935 /**
936  * Create a new set supporting the intersection operation
937  *
938  * @return the newly created set
939  */
940 static struct SetState *
941 intersection_set_create ()
942 {
943   struct SetState *set_state;
944
945   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
946               "intersection set created\n");
947   set_state = GNUNET_new (struct SetState);
948   set_state->current_set_element_count = 0;
949
950   return set_state;
951 }
952
953
954 /**
955  * Add the element from the given element message to the set.
956  *
957  * @param set_state state of the set want to add to
958  * @param ee the element to add to the set
959  */
960 static void
961 intersection_add (struct SetState *set_state,
962                   struct ElementEntry *ee)
963 {
964   set_state->current_set_element_count++;
965 }
966
967
968 /**
969  * Destroy a set that supports the intersection operation
970  *
971  * @param set_state the set to destroy
972  */
973 static void
974 intersection_set_destroy (struct SetState *set_state)
975 {
976   GNUNET_free (set_state);
977 }
978
979
980 /**
981  * Remove the element given in the element message from the set.
982  *
983  * @param set_state state of the set to remove from
984  * @param element set element to remove
985  */
986 static void
987 intersection_remove (struct SetState *set_state,
988                      struct ElementEntry *element)
989 {
990   GNUNET_assert(0 < set_state->current_set_element_count);
991   set_state->current_set_element_count--;
992 }
993
994
995 /**
996  * Dispatch messages for a intersection operation.
997  *
998  * @param op the state of the intersection evaluate operation
999  * @param mh the received message
1000  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1001  *         #GNUNET_OK otherwise
1002  */
1003 static int
1004 intersection_handle_p2p_message (struct Operation *op,
1005                                  const struct GNUNET_MessageHeader *mh)
1006 {
1007   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1008               "received p2p message (t: %u, s: %u)\n",
1009               ntohs (mh->type), ntohs (mh->size));
1010   switch (ntohs (mh->type))
1011   {
1012     /* this message handler is not active until after we received an
1013      * operation request message, thus the ops request is not handled here
1014      */
1015   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
1016     handle_p2p_element_info (op, mh);
1017     break;
1018   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1019     handle_p2p_bf (op, mh);
1020     break;
1021   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
1022     handle_p2p_bf_part (op, mh);
1023     break;
1024   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1025     handle_p2p_done (op, mh);
1026     break;
1027   default:
1028     /* something wrong with cadet's message handlers? */
1029     GNUNET_assert (0);
1030   }
1031   return GNUNET_OK;
1032 }
1033
1034
1035 /**
1036  * handler for peer-disconnects, notifies the client about the aborted operation
1037  *
1038  * @param op the destroyed operation
1039  */
1040 static void
1041 intersection_peer_disconnect (struct Operation *op)
1042 {
1043   if (PHASE_FINISHED != op->state->phase)
1044   {
1045     struct GNUNET_MQ_Envelope *ev;
1046     struct GNUNET_SET_ResultMessage *msg;
1047
1048     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1049     msg->request_id = htonl (op->spec->client_request_id);
1050     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1051     msg->element_type = htons (0);
1052     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1053     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1054                 "other peer disconnected prematurely\n");
1055     _GSS_operation_destroy (op);
1056     return;
1057   }
1058   // else: the session has already been concluded
1059   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060               "other peer disconnected (finished)\n");
1061   if (GNUNET_NO == op->state->client_done_sent)
1062     finish_and_destroy (op);
1063 }
1064
1065
1066 /**
1067  * Destroy the union operation.  Only things specific to the union operation are destroyed.
1068  *
1069  * @param op union operation to destroy
1070  */
1071 static void
1072 intersection_op_cancel (struct Operation *op)
1073 {
1074   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1075               "destroying intersection op\n");
1076   /* check if the op was canceled twice */
1077   GNUNET_assert (NULL != op->state);
1078   if (NULL != op->state->remote_bf)
1079   {
1080     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1081     op->state->remote_bf = NULL;
1082   }
1083   if (NULL != op->state->local_bf)
1084   {
1085     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1086     op->state->local_bf = NULL;
1087   }
1088 /*  if (NULL != op->state->my_elements)
1089   {
1090     // no need to free the elements, they are still part of the set
1091     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1092     op->state->my_elements = NULL;
1093   }*/
1094   GNUNET_free (op->state);
1095   op->state = NULL;
1096   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
1097 }
1098
1099 const struct SetVT *
1100 _GSS_intersection_vt ()
1101 {
1102   static const struct SetVT intersection_vt = {
1103     .create = &intersection_set_create,
1104     .msg_handler = &intersection_handle_p2p_message,
1105     .add = &intersection_add,
1106     .remove = &intersection_remove,
1107     .destroy_set = &intersection_set_destroy,
1108     .evaluate = &intersection_evaluate,
1109     .accept = &intersection_accept,
1110     .peer_disconnect = &intersection_peer_disconnect,
1111     .cancel = &intersection_op_cancel,
1112   };
1113
1114   return &intersection_vt;
1115 }