9b327ec78c5974a42f40dfa96ab6726a3c502871
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
34
35 #define CALCULATE_BF_SIZE(A, B, s, k) \
36                           do { \
37                             k = ceil(1 + log2((double) (2*B / (double) A)));\
38                             if (k<1) k=1; /* k can be calculated as 0 */\
39                             s = ceil((double) (A * k / log(2))); \
40                           } while (0)
41
42 /**
43  * Current phase we are in for a intersection operation.
44  */
45 enum IntersectionOperationPhase
46 {
47   /**
48    * Alices has suggested an operation to bob,
49    * and is waiting for a bf or session end.
50    */
51   PHASE_INITIAL,
52   /**
53    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
54    * until one notices the their element count is equal
55    */
56   PHASE_BF_EXCHANGE,
57   /**
58    * if both peers have an equal peercount, they enter this state for
59    * one more turn, to see if they actually have agreed on a correct set.
60    * if a peer finds the same element count after the next iteration,
61    * it ends the the session
62    */
63   PHASE_MAYBE_FINISHED,
64   /**
65    * The protocol is over.
66    * Results may still have to be sent to the client.
67    */
68   PHASE_FINISHED
69 };
70
71
72 /**
73  * State of an evaluate operation
74  * with another peer.
75  */
76 struct OperationState
77 {
78   /**
79    * The bf we currently receive
80    */
81   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
82
83   /**
84    * BF of the set's element.
85    */
86   struct GNUNET_CONTAINER_BloomFilter *local_bf;
87
88   /**
89    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
90    */
91   char * bf_data;
92
93   /**
94    * size of the bloomfilter
95    */
96   uint32_t bf_data_size;
97
98   /**
99    * size of the bloomfilter
100    */
101   uint32_t bf_bits_per_element;
102
103   /**
104    * Current state of the operation.
105    */
106   enum IntersectionOperationPhase phase;
107
108   /**
109    * Generation in which the operation handle
110    * was created.
111    */
112   unsigned int generation_created;
113
114   /**
115    * Maps element-id-hashes to 'elements in our set'.
116    */
117   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
118
119   /**
120    * Current element count contained within contained_elements
121    */
122   uint32_t my_element_count;
123
124   /**
125    * Iterator for sending elements on the key to element mapping to the client.
126    */
127   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
128
129   /**
130    * Evaluate operations are held in
131    * a linked list.
132    */
133   struct OperationState *next;
134
135    /**
136     * Evaluate operations are held in
137     * a linked list.
138     */
139   struct OperationState *prev;
140
141   /**
142    * Did we send the client that we are done?
143    */
144   int client_done_sent;
145 };
146
147
148 /**
149  * Extra state required for efficient set intersection.
150  */
151 struct SetState
152 {
153   /**
154    * Number of currently valid elements in the set which have not been removed
155    */
156   uint32_t current_set_element_count;
157 };
158
159
160 /**
161  * Send a result message to the client indicating
162  * we removed an element
163  *
164  * @param op union operation
165  * @param element element to send
166  */
167 static void
168 send_client_element (struct Operation *op,
169                      struct GNUNET_SET_Element *element)
170 {
171   struct GNUNET_MQ_Envelope *ev;
172   struct GNUNET_SET_ResultMessage *rm;
173
174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending removed element (size %u) to client\n", element->size);
175   GNUNET_assert (0 != op->spec->client_request_id);
176   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
177   if (NULL == ev)
178   {
179     GNUNET_MQ_discard (ev);
180     GNUNET_break (0);
181     return;
182   }
183   rm->result_status = htons (GNUNET_SET_STATUS_OK);
184   rm->request_id = htonl (op->spec->client_request_id);
185   rm->element_type = element->type;
186   memcpy (&rm[1], element->data, element->size);
187   GNUNET_MQ_send (op->spec->set->client_mq, ev);
188 }
189
190
191 /**
192  * Alice's version:
193  *
194  * fills the contained-elements hashmap with all relevant
195  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
196  *
197  * @param cls closure
198  * @param key current key code
199  * @param value value in the hash map
200  * @return #GNUNET_YES if we should continue to
201  *         iterate,
202  *         #GNUNET_NO if not.
203  */
204 static int
205 iterator_initialization_by_alice (void *cls,
206                                   const struct GNUNET_HashCode *key,
207                                   void *value)
208 {
209   struct ElementEntry *ee = value;
210   struct Operation *op = cls;
211   struct GNUNET_HashCode mutated_hash;
212
213   //only consider this element, if it is valid for us
214   if ((op->generation_created < ee->generation_removed)
215        && (op->generation_created >= ee->generation_added))
216     return GNUNET_YES;
217
218   // not contained according to bob's bloomfilter
219   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
220                            op->spec->salt,
221                            &mutated_hash);
222   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
223                                                       &mutated_hash)){
224     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
225       send_client_element (op, &ee->element);
226     return GNUNET_YES;
227   }
228
229   op->state->my_element_count++;
230   GNUNET_assert (GNUNET_YES ==
231                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
232                                                     &ee->element_hash, ee,
233                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
234
235   return GNUNET_YES;
236 }
237
238 /**
239  * fills the contained-elements hashmap with all relevant
240  * elements and adds their mutated hashes to our local bloomfilter
241  *
242  * @param cls closure
243  * @param key current key code
244  * @param value value in the hash map
245  * @return #GNUNET_YES if we should continue to
246  *         iterate,
247  *         #GNUNET_NO if not.
248  */
249 static int
250 iterator_initialization (void *cls,
251                          const struct GNUNET_HashCode *key,
252                          void *value)
253 {
254   struct ElementEntry *ee = value;
255   struct Operation *op = cls;
256
257   //only consider this element, if it is valid for us
258   if ((op->generation_created < ee->generation_removed)
259        && (op->generation_created >= ee->generation_added))
260     return GNUNET_YES;
261
262   GNUNET_assert (GNUNET_YES ==
263                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
264                                                     &ee->element_hash, ee,
265                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
266   return GNUNET_YES;
267 }
268
269
270 /**
271  * removes element from a hashmap if it is not contained within the
272  * provided remote bloomfilter. Then, fill our new bloomfilter.
273  *
274  * @param cls closure
275  * @param key current key code
276  * @param value value in the hash map
277  * @return #GNUNET_YES if we should continue to
278  *         iterate,
279  *         #GNUNET_NO if not.
280  */
281 static int
282 iterator_bf_reduce (void *cls,
283                    const struct GNUNET_HashCode *key,
284                    void *value)
285 {
286   struct ElementEntry *ee = value;
287   struct Operation *op = cls;
288   struct GNUNET_HashCode mutated_hash;
289
290   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
291
292   if (GNUNET_NO ==
293       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
294                                          &mutated_hash))
295   {
296     op->state->my_element_count--;
297     GNUNET_assert (GNUNET_YES ==
298                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
299                                                          &ee->element_hash,
300                                                          ee));
301     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
302       send_client_element (op, &ee->element);
303   }
304
305   return GNUNET_YES;
306 }
307
308 /**
309  * create a bloomfilter based on the elements given
310  *
311  * @param cls closure
312  * @param key current key code
313  * @param value value in the hash map
314  * @return #GNUNET_YES if we should continue to
315  *         iterate,
316  *         #GNUNET_NO if not.
317  */
318 static int
319 iterator_bf_create (void *cls,
320                    const struct GNUNET_HashCode *key,
321                    void *value)
322 {
323   struct ElementEntry *ee = value;
324   struct Operation *op = cls;
325   struct GNUNET_HashCode mutated_hash;
326
327   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
328
329   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
330                                     &mutated_hash);
331   return GNUNET_YES;
332 }
333
334 /**
335  * Inform the client that the union operation has failed,
336  * and proceed to destroy the evaluate operation.
337  *
338  * @param op the intersection operation to fail
339  */
340 static void
341 fail_intersection_operation (struct Operation *op)
342 {
343   struct GNUNET_MQ_Envelope *ev;
344   struct GNUNET_SET_ResultMessage *msg;
345
346   if (op->state->my_elements)
347     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
348
349   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
350
351   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
352   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
353   msg->request_id = htonl (op->spec->client_request_id);
354   msg->element_type = htons (0);
355   GNUNET_MQ_send (op->spec->set->client_mq, ev);
356   _GSS_operation_destroy (op);
357 }
358
359
360 /**
361  * Send a request for the evaluate operation to a remote peer
362  *
363  * @param op operation with the other peer
364  */
365 static void
366 send_operation_request (struct Operation *op)
367 {
368   struct GNUNET_MQ_Envelope *ev;
369   struct OperationRequestMessage *msg;
370
371   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
372                                 op->spec->context_msg);
373
374   if (NULL == ev)
375   {
376     /* the context message is too large */
377     GNUNET_break (0);
378     GNUNET_SERVER_client_disconnect (op->spec->set->client);
379     return;
380   }
381   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
382   msg->app_id = op->spec->app_id;
383   msg->salt = htonl (op->spec->salt);
384   msg->element_count = htonl(op->state->my_element_count);
385
386   GNUNET_MQ_send (op->mq, ev);
387
388   if (NULL != op->spec->context_msg)
389     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
390   else
391     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
392
393   if (NULL != op->spec->context_msg)
394   {
395     GNUNET_free (op->spec->context_msg);
396     op->spec->context_msg = NULL;
397   }
398 }
399
400 static void
401 send_bloomfilter_multipart (struct Operation *op, uint32_t offset)
402 {
403   struct GNUNET_MQ_Envelope *ev;
404   struct BFPart *msg;
405   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
406   uint32_t todo_size = op->state->bf_data_size - offset;
407
408   if (todo_size < chunk_size)
409     chunk_size = todo_size;
410
411   ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
412
413   msg->chunk_length = htonl (chunk_size);
414   msg->chunk_offset = htonl (offset);
415   memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
416
417   GNUNET_MQ_send (op->mq, ev);
418
419   if (op->state->bf_data_size == offset + chunk_size)
420   {
421     // done
422     GNUNET_free(op->state->bf_data);
423     op->state->bf_data = NULL;
424     return;
425   }
426
427   send_bloomfilter_multipart (op, offset + chunk_size);
428 }
429
430 /**
431  * Send a bloomfilter to our peer.
432  * that the operation is over.
433  * After the result done message has been sent to the client,
434  * destroy the evaluate operation.
435  *
436  * @param op intersection operation
437  */
438 static void
439 send_bloomfilter (struct Operation *op)
440 {
441   struct GNUNET_MQ_Envelope *ev;
442   struct BFMessage *msg;
443   uint32_t bf_size;
444   uint32_t bf_elementbits;
445   uint32_t chunk_size;
446   struct GNUNET_CONTAINER_BloomFilter * local_bf;
447
448   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
449
450   CALCULATE_BF_SIZE(op->state->my_element_count,
451                     op->spec->remote_element_count,
452                     bf_size,
453                     bf_elementbits);
454
455   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
456                                                 bf_size,
457                                                 bf_elementbits);
458
459   op->spec->salt++;
460   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
461                                          &iterator_bf_create,
462                                          op);
463
464   // send our bloomfilter
465   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) {
466     // singlepart
467     chunk_size = bf_size;
468     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
469     GNUNET_assert (GNUNET_SYSERR !=
470                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
471                                                               (char*)&msg[1],
472                                                               bf_size));
473   }
474   else {
475     //multipart
476     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
477     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
478     op->state->bf_data = (char *) GNUNET_malloc (bf_size);
479     GNUNET_assert (GNUNET_SYSERR !=
480                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
481                                                               op->state->bf_data,
482                                                               bf_size));
483     memcpy (&msg[1], op->state->bf_data, chunk_size);
484     op->state->bf_data_size = bf_size;
485   }
486   GNUNET_CONTAINER_bloomfilter_free (local_bf);
487
488   msg->sender_element_count = htonl (op->state->my_element_count);
489   msg->bloomfilter_total_length = htonl (bf_size);
490   msg->bloomfilter_length = htonl (chunk_size);
491   msg->bits_per_element = htonl (bf_elementbits);
492   msg->sender_mutator = htonl (op->spec->salt);
493
494   GNUNET_MQ_send (op->mq, ev);
495
496   if (op->state->bf_data)
497     send_bloomfilter_multipart (op, chunk_size);
498 }
499
500
501 /**
502  * Signal to the client that the operation has finished and
503  * destroy the operation.
504  *
505  * @param cls operation to destroy
506  */
507 static void
508 send_client_done_and_destroy (void *cls)
509 {
510   struct Operation *op = cls;
511   struct GNUNET_MQ_Envelope *ev;
512   struct GNUNET_SET_ResultMessage *rm;
513   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
514   rm->request_id = htonl (op->spec->client_request_id);
515   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
516   rm->element_type = htons (0);
517   GNUNET_MQ_send (op->spec->set->client_mq, ev);
518   _GSS_operation_destroy (op);
519 }
520
521
522 /**
523  * Send all elements in the full result iterator.
524  *
525  * @param cls operation
526  */
527 static void
528 send_remaining_elements (void *cls)
529 {
530   struct Operation *op = cls;
531   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
532   struct GNUNET_MQ_Envelope *ev;
533   struct GNUNET_SET_ResultMessage *rm;
534   struct GNUNET_SET_Element *element;
535   int res;
536
537   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
538   if (GNUNET_NO == res) {
539     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
540     send_client_done_and_destroy (op);
541     return;
542   }
543
544   element = &remaining->element;
545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
546   GNUNET_assert (0 != op->spec->client_request_id);
547
548   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
549   GNUNET_assert (NULL != ev);
550
551   rm->result_status = htons (GNUNET_SET_STATUS_OK);
552   rm->request_id = htonl (op->spec->client_request_id);
553   rm->element_type = element->type;
554   memcpy (&rm[1], element->data, element->size);
555
556   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
557   GNUNET_MQ_send (op->spec->set->client_mq, ev);
558 }
559
560
561 /**
562  * Inform the peer that this operation is complete.
563  *
564  * @param op the intersection operation to fail
565  */
566 static void
567 send_peer_done (struct Operation *op)
568 {
569   struct GNUNET_MQ_Envelope *ev;
570
571   op->state->phase = PHASE_FINISHED;
572   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
573   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
574   op->state->local_bf = NULL;
575
576   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
577   GNUNET_MQ_send (op->mq, ev);
578 }
579
580
581 /**
582  * Process a Bloomfilter once we got all the chunks
583  *
584  * @param op the intersection operation
585  */
586 static void
587 process_bf (struct Operation *op){
588   uint32_t old_elements;
589   uint32_t peer_elements;
590
591   old_elements = op->state->my_element_count;
592   peer_elements = op->spec->remote_element_count;
593   switch (op->state->phase)
594   {
595   case PHASE_INITIAL:
596     // If we are ot our first msg
597     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
598
599     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
600                                            &iterator_initialization_by_alice,
601                                            op);
602     break;
603   case PHASE_BF_EXCHANGE:
604   case PHASE_MAYBE_FINISHED:
605     // if we are bob or alice and are continuing operation
606     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
607                                            &iterator_bf_reduce,
608                                            op);
609     break;
610   default:
611     GNUNET_break_op (0);
612     fail_intersection_operation(op);
613   }
614   // the iterators created a new BF with salt+1
615   // the peer needs this information for decoding the next BF
616   // this behavior can be modified at will later on.
617   op->spec->salt++;
618
619   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
620   op->state->remote_bf = NULL;
621
622   if ((0 == op->state->my_element_count) // fully disjoint
623       || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements
624           && (old_elements == op->state->my_element_count)
625           && (op->state->my_element_count == peer_elements))) {
626     // In the last round we though we were finished, we now know this is correct
627     send_peer_done (op);
628     return;
629   }
630
631   op->state->phase = PHASE_BF_EXCHANGE;
632   if (op->state->my_element_count == peer_elements)
633     // maybe we are finished, but we do one more round to make certain
634     // we don't have false positives ...
635     op->state->phase = PHASE_MAYBE_FINISHED;
636
637   send_bloomfilter (op);
638 }
639
640
641 /**
642  * Handle an BF multipart message from a remote peer.
643  *
644  * @param cls the intersection operation
645  * @param mh the header of the message
646  */
647 static void
648 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
649 {
650   struct Operation *op = cls;
651   const struct BFPart *msg = (const struct BFPart *) mh;
652   uint32_t chunk_size;
653   uint32_t chunk_offset;
654
655   chunk_size = ntohl(msg->chunk_length);
656   chunk_offset = ntohl(msg->chunk_offset);
657
658   if ((NULL == op->state->bf_data)
659        || (op->state->bf_data_size < chunk_size + chunk_offset)){
660     // unexpected multipart chunk
661     GNUNET_break_op (0);
662     fail_intersection_operation(op);
663     return;
664   }
665
666   memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
667
668   if (op->state->bf_data_size != chunk_offset + chunk_size)
669     // wait for next chunk
670     return;
671
672   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
673                                                             op->state->bf_data_size,
674                                                             op->state->bf_bits_per_element);
675
676   GNUNET_free (op->state->bf_data);
677   op->state->bf_data = NULL;
678
679   process_bf (op);
680 }
681
682
683 /**
684  * Handle an BF message from a remote peer.
685  *
686  * @param cls the intersection operation
687  * @param mh the header of the message
688  */
689 static void
690 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
691 {
692   struct Operation *op = cls;
693   const struct BFMessage *msg = (const struct BFMessage *) mh;
694   uint32_t bf_size;
695   uint32_t chunk_size;
696   uint32_t bf_bits_per_element;
697
698   switch (op->state->phase)
699   {
700   case PHASE_INITIAL:
701   case PHASE_BF_EXCHANGE:
702   case PHASE_MAYBE_FINISHED:
703     if (NULL == op->state->bf_data) {
704       // no colliding multipart transaction going on currently
705       op->spec->salt = ntohl (msg->sender_mutator);
706       bf_size = ntohl (msg->bloomfilter_total_length);
707       bf_bits_per_element = ntohl (msg->bits_per_element);
708       chunk_size = ntohl (msg->bloomfilter_length);
709       op->spec->remote_element_count = ntohl(msg->sender_element_count);
710       if (bf_size == chunk_size) {
711         // single part, done here
712         op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
713                                                                   bf_size,
714                                                                   bf_bits_per_element);
715         process_bf (op);
716         return;
717       }
718
719       //first multipart chunk
720       op->state->bf_data = GNUNET_malloc (bf_size);
721       op->state->bf_data_size = bf_size;
722       op->state->bf_bits_per_element = bf_bits_per_element;
723       memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
724       return;
725     }
726   default:
727     GNUNET_break_op (0);
728     fail_intersection_operation (op);
729   }
730 }
731
732
733 /**
734  * Handle an BF message from a remote peer.
735  *
736  * @param cls the intersection operation
737  * @param mh the header of the message
738  */
739 static void
740 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
741 {
742   struct Operation *op = cls;
743   struct BFMessage *msg = (struct BFMessage *) mh;
744
745   op->spec->remote_element_count = ntohl(msg->sender_element_count);
746   if ((op->state->phase != PHASE_INITIAL)
747       || (op->state->my_element_count > op->spec->remote_element_count)){
748     GNUNET_break_op (0);
749     fail_intersection_operation(op);
750   }
751
752   op->state->phase = PHASE_BF_EXCHANGE;
753   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
754
755   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
756                                          &iterator_initialization,
757                                          op);
758
759   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
760   op->state->remote_bf = NULL;
761
762   if (op->state->my_element_count == ntohl (msg->sender_element_count))
763     op->state->phase = PHASE_MAYBE_FINISHED;
764
765   send_bloomfilter (op);
766 }
767
768
769 /**
770  * Send our element count to the peer, in case our element count is lower than his
771  *
772  * @param op intersection operation
773  */
774 static void
775 send_element_count (struct Operation *op)
776 {
777   struct GNUNET_MQ_Envelope *ev;
778   struct BFMessage *msg;
779
780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
781
782   // just send our element count, as the other peer must start
783   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
784   msg->sender_element_count = htonl (op->state->my_element_count);
785   msg->bloomfilter_length = htonl (0);
786   msg->sender_mutator = htonl (0);
787
788   GNUNET_MQ_send (op->mq, ev);
789 }
790
791
792 /**
793  * Send a result message to the client indicating
794  * that the operation is over.
795  * After the result done message has been sent to the client,
796  * destroy the evaluate operation.
797  *
798  * @param op intersection operation
799  */
800 static void
801 finish_and_destroy (struct Operation *op)
802 {
803   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
804
805   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
806   {
807     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
808     op->state->full_result_iter =
809         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
810     send_remaining_elements (op);
811     return;
812   }
813   send_client_done_and_destroy (op);
814 }
815
816
817 /**
818  * Handle a done message from a remote peer
819  *
820  * @param cls the union operation
821  * @param mh the message
822  */
823 static void
824 handle_p2p_done (void *cls,
825                  const struct GNUNET_MessageHeader *mh)
826 {
827   struct Operation *op = cls;
828
829   if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
830     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
831
832     finish_and_destroy (op);
833     return;
834   }
835
836   GNUNET_break_op (0);
837   fail_intersection_operation (op);
838 }
839
840
841 /**
842  * Evaluate a union operation with
843  * a remote peer.
844  *
845  * @param op operation to evaluate
846  */
847 static void
848 intersection_evaluate (struct Operation *op)
849 {
850   op->state = GNUNET_new (struct OperationState);
851   /* we started the operation, thus we have to send the operation request */
852   op->state->phase = PHASE_INITIAL;
853   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
854   op->state->my_element_count = op->spec->set->state->current_set_element_count;
855
856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857               "evaluating intersection operation");
858   send_operation_request (op);
859 }
860
861
862 /**
863  * Accept an union operation request from a remote peer.
864  * Only initializes the private operation state.
865  *
866  * @param op operation that will be accepted as a union operation
867  */
868 static void
869 intersection_accept (struct Operation *op)
870 {
871   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
872   op->state = GNUNET_new (struct OperationState);
873   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
874   op->state->my_element_count = op->spec->set->state->current_set_element_count;
875
876   // if Alice (the peer) has more elements than Bob (us), she should start
877   if (op->spec->remote_element_count < op->state->my_element_count){
878     op->state->phase = PHASE_INITIAL;
879     send_element_count(op);
880     return;
881   }
882   // create a new bloomfilter in case we have fewer elements
883   op->state->phase = PHASE_BF_EXCHANGE;
884   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
885                                                            BLOOMFILTER_SIZE,
886                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
887   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
888                                          &iterator_initialization,
889                                          op);
890   send_bloomfilter (op);
891 }
892
893
894 /**
895  * Create a new set supporting the intersection operation
896  *
897  * @return the newly created set
898  */
899 static struct SetState *
900 intersection_set_create ()
901 {
902   struct SetState *set_state;
903
904   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
905               "intersection set created\n");
906   set_state = GNUNET_new (struct SetState);
907   set_state->current_set_element_count = 0;
908
909   return set_state;
910 }
911
912
913 /**
914  * Add the element from the given element message to the set.
915  *
916  * @param set_state state of the set want to add to
917  * @param ee the element to add to the set
918  */
919 static void
920 intersection_add (struct SetState *set_state,
921                   struct ElementEntry *ee)
922 {
923   set_state->current_set_element_count++;
924 }
925
926
927 /**
928  * Destroy a set that supports the intersection operation
929  *
930  * @param set_state the set to destroy
931  */
932 static void
933 intersection_set_destroy (struct SetState *set_state)
934 {
935   GNUNET_free (set_state);
936 }
937
938
939 /**
940  * Remove the element given in the element message from the set.
941  *
942  * @param set_state state of the set to remove from
943  * @param element set element to remove
944  */
945 static void
946 intersection_remove (struct SetState *set_state,
947                      struct ElementEntry *element)
948 {
949   GNUNET_assert(0 < set_state->current_set_element_count);
950   set_state->current_set_element_count--;
951 }
952
953
954 /**
955  * Dispatch messages for a intersection operation.
956  *
957  * @param op the state of the intersection evaluate operation
958  * @param mh the received message
959  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
960  *         #GNUNET_OK otherwise
961  */
962 static int
963 intersection_handle_p2p_message (struct Operation *op,
964                                  const struct GNUNET_MessageHeader *mh)
965 {
966   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
967               ntohs (mh->type), ntohs (mh->size));
968   switch (ntohs (mh->type))
969   {
970     /* this message handler is not active until after we received an
971      * operation request message, thus the ops request is not handled here
972      */
973   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
974     handle_p2p_element_info (op, mh);
975     break;
976   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
977     handle_p2p_bf (op, mh);
978     break;
979   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
980     handle_p2p_bf_part (op, mh);
981     break;
982   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
983     handle_p2p_done (op, mh);
984     break;
985   default:
986     /* something wrong with cadet's message handlers? */
987     GNUNET_assert (0);
988   }
989   return GNUNET_OK;
990 }
991
992
993 /**
994  * handler for peer-disconnects, notifies the client about the aborted operation
995  *
996  * @param op the destroyed operation
997  */
998 static void
999 intersection_peer_disconnect (struct Operation *op)
1000 {
1001   if (PHASE_FINISHED != op->state->phase)
1002   {
1003     struct GNUNET_MQ_Envelope *ev;
1004     struct GNUNET_SET_ResultMessage *msg;
1005
1006     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1007     msg->request_id = htonl (op->spec->client_request_id);
1008     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1009     msg->element_type = htons (0);
1010     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1011     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1012     _GSS_operation_destroy (op);
1013     return;
1014   }
1015   // else: the session has already been concluded
1016   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1017   if (GNUNET_NO == op->state->client_done_sent)
1018     finish_and_destroy (op);
1019 }
1020
1021
1022 /**
1023  * Destroy the union operation.  Only things specific to the union operation are destroyed.
1024  *
1025  * @param op union operation to destroy
1026  */
1027 static void
1028 intersection_op_cancel (struct Operation *op)
1029 {
1030   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
1031   /* check if the op was canceled twice */
1032   GNUNET_assert (NULL != op->state);
1033   if (NULL != op->state->remote_bf)
1034   {
1035     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1036     op->state->remote_bf = NULL;
1037   }
1038   if (NULL != op->state->local_bf)
1039   {
1040     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1041     op->state->local_bf = NULL;
1042   }
1043   if (NULL != op->state->my_elements)
1044   {
1045     // no need to free the elements, they are still part of the set
1046     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1047     op->state->my_elements = NULL;
1048   }
1049   GNUNET_free (op->state);
1050   op->state = NULL;
1051   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
1052 }
1053
1054 const struct SetVT *
1055 _GSS_intersection_vt ()
1056 {
1057   static const struct SetVT intersection_vt = {
1058     .create = &intersection_set_create,
1059     .msg_handler = &intersection_handle_p2p_message,
1060     .add = &intersection_add,
1061     .remove = &intersection_remove,
1062     .destroy_set = &intersection_set_destroy,
1063     .evaluate = &intersection_evaluate,
1064     .accept = &intersection_accept,
1065     .peer_disconnect = &intersection_peer_disconnect,
1066     .cancel = &intersection_op_cancel,
1067   };
1068
1069   return &intersection_vt;
1070 }