Rename mesh->cadet
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
34
35 #define CALCULATE_BF_SIZE(A, B, s, k) \
36                           do { \
37                             k = ceil(1 + log2((double) (2*B / (double) A)));\
38                             s = ceil((double) (A * k / log(2))); \
39                           } while (0)
40
41 /**
42  * Current phase we are in for a intersection operation.
43  */
44 enum IntersectionOperationPhase
45 {
46   /**
47    * Alices has suggested an operation to bob,
48    * and is waiting for a bf or session end.
49    */
50   PHASE_INITIAL,
51   /**
52    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
53    * until one notices the their element count is equal
54    */
55   PHASE_BF_EXCHANGE,
56   /**
57    * if both peers have an equal peercount, they enter this state for
58    * one more turn, to see if they actually have agreed on a correct set.
59    * if a peer finds the same element count after the next iteration,
60    * it ends the the session
61    */
62   PHASE_MAYBE_FINISHED,
63   /**
64    * The protocol is over.
65    * Results may still have to be sent to the client.
66    */
67   PHASE_FINISHED
68 };
69
70
71 /**
72  * State of an evaluate operation
73  * with another peer.
74  */
75 struct OperationState
76 {
77   /**
78    * The bf we currently receive
79    */
80   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
81
82   /**
83    * BF of the set's element.
84    */
85   struct GNUNET_CONTAINER_BloomFilter *local_bf;
86
87   /**
88    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
89    */
90   char * bf_data;
91
92   /**
93    * size of the bloomfilter
94    */
95   uint32_t bf_data_size;
96
97   /**
98    * size of the bloomfilter
99    */
100   uint32_t bf_bits_per_element;
101
102   /**
103    * Current state of the operation.
104    */
105   enum IntersectionOperationPhase phase;
106
107   /**
108    * Generation in which the operation handle
109    * was created.
110    */
111   unsigned int generation_created;
112
113   /**
114    * Maps element-id-hashes to 'elements in our set'.
115    */
116   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
117
118   /**
119    * Current element count contained within contained_elements
120    */
121   uint32_t my_element_count;
122
123   /**
124    * Iterator for sending elements on the key to element mapping to the client.
125    */
126   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
127
128   /**
129    * Evaluate operations are held in
130    * a linked list.
131    */
132   struct OperationState *next;
133
134    /**
135     * Evaluate operations are held in
136     * a linked list.
137     */
138   struct OperationState *prev;
139
140   /**
141    * Did we send the client that we are done?
142    */
143   int client_done_sent;
144 };
145
146
147 /**
148  * Extra state required for efficient set intersection.
149  */
150 struct SetState
151 {
152   /**
153    * Number of currently valid elements in the set which have not been removed
154    */
155   uint32_t current_set_element_count;
156 };
157
158
159 /**
160  * Send a result message to the client indicating
161  * we removed an element
162  *
163  * @param op union operation
164  * @param element element to send
165  */
166 static void
167 send_client_element (struct Operation *op,
168                      struct GNUNET_SET_Element *element)
169 {
170   struct GNUNET_MQ_Envelope *ev;
171   struct GNUNET_SET_ResultMessage *rm;
172
173   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending removed element (size %u) to client\n", element->size);
174   GNUNET_assert (0 != op->spec->client_request_id);
175   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
176   if (NULL == ev)
177   {
178     GNUNET_MQ_discard (ev);
179     GNUNET_break (0);
180     return;
181   }
182   rm->result_status = htons (GNUNET_SET_STATUS_OK);
183   rm->request_id = htonl (op->spec->client_request_id);
184   rm->element_type = element->type;
185   memcpy (&rm[1], element->data, element->size);
186   GNUNET_MQ_send (op->spec->set->client_mq, ev);
187 }
188
189
190 /**
191  * Alice's version:
192  *
193  * fills the contained-elements hashmap with all relevant
194  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
195  *
196  * @param cls closure
197  * @param key current key code
198  * @param value value in the hash map
199  * @return #GNUNET_YES if we should continue to
200  *         iterate,
201  *         #GNUNET_NO if not.
202  */
203 static int
204 iterator_initialization_by_alice (void *cls,
205                                   const struct GNUNET_HashCode *key,
206                                   void *value)
207 {
208   struct ElementEntry *ee = value;
209   struct Operation *op = cls;
210   struct GNUNET_HashCode mutated_hash;
211
212   //only consider this element, if it is valid for us
213   if ((op->generation_created < ee->generation_removed)
214        && (op->generation_created >= ee->generation_added))
215     return GNUNET_YES;
216
217   // not contained according to bob's bloomfilter
218   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
219                            op->spec->salt,
220                            &mutated_hash);
221   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
222                                                       &mutated_hash)){
223     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
224       send_client_element (op, &ee->element);
225     return GNUNET_YES;
226   }
227
228   op->state->my_element_count++;
229   GNUNET_assert (GNUNET_YES ==
230                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
231                                                     &ee->element_hash, ee,
232                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
233
234   return GNUNET_YES;
235 }
236
237 /**
238  * fills the contained-elements hashmap with all relevant
239  * elements and adds their mutated hashes to our local bloomfilter
240  *
241  * @param cls closure
242  * @param key current key code
243  * @param value value in the hash map
244  * @return #GNUNET_YES if we should continue to
245  *         iterate,
246  *         #GNUNET_NO if not.
247  */
248 static int
249 iterator_initialization (void *cls,
250                          const struct GNUNET_HashCode *key,
251                          void *value)
252 {
253   struct ElementEntry *ee = value;
254   struct Operation *op = cls;
255
256   //only consider this element, if it is valid for us
257   if ((op->generation_created < ee->generation_removed)
258        && (op->generation_created >= ee->generation_added))
259     return GNUNET_YES;
260
261   GNUNET_assert (GNUNET_YES ==
262                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
263                                                     &ee->element_hash, ee,
264                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
265   return GNUNET_YES;
266 }
267
268
269 /**
270  * removes element from a hashmap if it is not contained within the
271  * provided remote bloomfilter. Then, fill our new bloomfilter.
272  *
273  * @param cls closure
274  * @param key current key code
275  * @param value value in the hash map
276  * @return #GNUNET_YES if we should continue to
277  *         iterate,
278  *         #GNUNET_NO if not.
279  */
280 static int
281 iterator_bf_reduce (void *cls,
282                    const struct GNUNET_HashCode *key,
283                    void *value)
284 {
285   struct ElementEntry *ee = value;
286   struct Operation *op = cls;
287   struct GNUNET_HashCode mutated_hash;
288
289   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
290
291   if (GNUNET_NO ==
292       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
293                                          &mutated_hash))
294   {
295     op->state->my_element_count--;
296     GNUNET_assert (GNUNET_YES ==
297                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
298                                                          &ee->element_hash,
299                                                          ee));
300     if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
301       send_client_element (op, &ee->element);
302   }
303
304   return GNUNET_YES;
305 }
306
307 /**
308  * create a bloomfilter based on the elements given
309  *
310  * @param cls closure
311  * @param key current key code
312  * @param value value in the hash map
313  * @return #GNUNET_YES if we should continue to
314  *         iterate,
315  *         #GNUNET_NO if not.
316  */
317 static int
318 iterator_bf_create (void *cls,
319                    const struct GNUNET_HashCode *key,
320                    void *value)
321 {
322   struct ElementEntry *ee = value;
323   struct Operation *op = cls;
324   struct GNUNET_HashCode mutated_hash;
325
326   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
327
328   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
329                                     &mutated_hash);
330   return GNUNET_YES;
331 }
332
333 /**
334  * Inform the client that the union operation has failed,
335  * and proceed to destroy the evaluate operation.
336  *
337  * @param op the intersection operation to fail
338  */
339 static void
340 fail_intersection_operation (struct Operation *op)
341 {
342   struct GNUNET_MQ_Envelope *ev;
343   struct GNUNET_SET_ResultMessage *msg;
344
345   if (op->state->my_elements)
346     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
347
348   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
349
350   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
351   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
352   msg->request_id = htonl (op->spec->client_request_id);
353   msg->element_type = htons (0);
354   GNUNET_MQ_send (op->spec->set->client_mq, ev);
355   _GSS_operation_destroy (op);
356 }
357
358
359 /**
360  * Send a request for the evaluate operation to a remote peer
361  *
362  * @param op operation with the other peer
363  */
364 static void
365 send_operation_request (struct Operation *op)
366 {
367   struct GNUNET_MQ_Envelope *ev;
368   struct OperationRequestMessage *msg;
369
370   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
371                                 op->spec->context_msg);
372
373   if (NULL == ev)
374   {
375     /* the context message is too large */
376     GNUNET_break (0);
377     GNUNET_SERVER_client_disconnect (op->spec->set->client);
378     return;
379   }
380   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
381   msg->app_id = op->spec->app_id;
382   msg->salt = htonl (op->spec->salt);
383   msg->element_count = htonl(op->state->my_element_count);
384
385   GNUNET_MQ_send (op->mq, ev);
386
387   if (NULL != op->spec->context_msg)
388     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
389   else
390     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
391
392   if (NULL != op->spec->context_msg)
393   {
394     GNUNET_free (op->spec->context_msg);
395     op->spec->context_msg = NULL;
396   }
397 }
398
399 static void
400 send_bloomfilter_multipart (struct Operation *op, uint32_t offset)
401 {
402   struct GNUNET_MQ_Envelope *ev;
403   struct BFPart *msg;
404   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
405   uint32_t todo_size = op->state->bf_data_size - offset;
406
407   if (todo_size < chunk_size)
408     chunk_size = todo_size;
409
410   ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
411
412   msg->chunk_length = htonl (chunk_size);
413   msg->chunk_offset = htonl (offset);
414   memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
415
416   GNUNET_MQ_send (op->mq, ev);
417
418   if (op->state->bf_data_size == offset + chunk_size)
419   {
420     // done
421     GNUNET_free(op->state->bf_data);
422     op->state->bf_data = NULL;
423     return;
424   }
425
426   send_bloomfilter_multipart (op, offset + chunk_size);
427 }
428
429 /**
430  * Send a bloomfilter to our peer.
431  * that the operation is over.
432  * After the result done message has been sent to the client,
433  * destroy the evaluate operation.
434  *
435  * @param op intersection operation
436  */
437 static void
438 send_bloomfilter (struct Operation *op)
439 {
440   struct GNUNET_MQ_Envelope *ev;
441   struct BFMessage *msg;
442   uint32_t bf_size;
443   uint32_t bf_elementbits;
444   uint32_t chunk_size;
445   struct GNUNET_CONTAINER_BloomFilter * local_bf;
446
447   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
448
449   CALCULATE_BF_SIZE(op->state->my_element_count,
450                     op->spec->remote_element_count,
451                     bf_size,
452                     bf_elementbits);
453
454   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
455                                                 bf_size,
456                                                 bf_elementbits);
457
458   op->spec->salt++;
459   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
460                                          &iterator_bf_create,
461                                          op);
462
463   // send our bloomfilter
464   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) {
465     // singlepart
466     chunk_size = bf_size;
467     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
468     GNUNET_assert (GNUNET_SYSERR !=
469                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
470                                                               (char*)&msg[1],
471                                                               bf_size));
472   }
473   else {
474     //multipart
475     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
476     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
477     op->state->bf_data = (char *) GNUNET_malloc (bf_size);
478     GNUNET_assert (GNUNET_SYSERR !=
479                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
480                                                               op->state->bf_data,
481                                                               bf_size));
482     memcpy (&msg[1], op->state->bf_data, chunk_size);
483     op->state->bf_data_size = bf_size;
484   }
485   GNUNET_CONTAINER_bloomfilter_free (local_bf);
486
487   msg->sender_element_count = htonl (op->state->my_element_count);
488   msg->bloomfilter_total_length = htonl (bf_size);
489   msg->bloomfilter_length = htonl (chunk_size);
490   msg->bits_per_element = htonl (bf_elementbits);
491   msg->sender_mutator = htonl (op->spec->salt);
492
493   GNUNET_MQ_send (op->mq, ev);
494
495   if (op->state->bf_data)
496     send_bloomfilter_multipart (op, chunk_size);
497 }
498
499
500 /**
501  * Signal to the client that the operation has finished and
502  * destroy the operation.
503  *
504  * @param cls operation to destroy
505  */
506 static void
507 send_client_done_and_destroy (void *cls)
508 {
509   struct Operation *op = cls;
510   struct GNUNET_MQ_Envelope *ev;
511   struct GNUNET_SET_ResultMessage *rm;
512   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
513   rm->request_id = htonl (op->spec->client_request_id);
514   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
515   rm->element_type = htons (0);
516   GNUNET_MQ_send (op->spec->set->client_mq, ev);
517   _GSS_operation_destroy (op);
518 }
519
520
521 /**
522  * Send all elements in the full result iterator.
523  *
524  * @param cls operation
525  */
526 static void
527 send_remaining_elements (void *cls)
528 {
529   struct Operation *op = cls;
530   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
531   struct GNUNET_MQ_Envelope *ev;
532   struct GNUNET_SET_ResultMessage *rm;
533   struct GNUNET_SET_Element *element;
534   int res;
535
536   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
537   if (GNUNET_NO == res) {
538     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
539     send_client_done_and_destroy (op);
540     return;
541   }
542
543   element = &remaining->element;
544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
545   GNUNET_assert (0 != op->spec->client_request_id);
546
547   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
548   GNUNET_assert (NULL != ev);
549
550   rm->result_status = htons (GNUNET_SET_STATUS_OK);
551   rm->request_id = htonl (op->spec->client_request_id);
552   rm->element_type = element->type;
553   memcpy (&rm[1], element->data, element->size);
554
555   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
556   GNUNET_MQ_send (op->spec->set->client_mq, ev);
557 }
558
559
560 /**
561  * Inform the peer that this operation is complete.
562  *
563  * @param op the intersection operation to fail
564  */
565 static void
566 send_peer_done (struct Operation *op)
567 {
568   struct GNUNET_MQ_Envelope *ev;
569
570   op->state->phase = PHASE_FINISHED;
571   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
572   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
573   op->state->local_bf = NULL;
574
575   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
576   GNUNET_MQ_send (op->mq, ev);
577 }
578
579
580 /**
581  * Process a Bloomfilter once we got all the chunks
582  *
583  * @param op the intersection operation
584  */
585 static void
586 process_bf (struct Operation *op){
587   uint32_t old_elements;
588   uint32_t peer_elements;
589
590   old_elements = op->state->my_element_count;
591   peer_elements = op->spec->remote_element_count;
592   switch (op->state->phase)
593   {
594   case PHASE_INITIAL:
595     // If we are ot our first msg
596     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
597
598     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
599                                            &iterator_initialization_by_alice,
600                                            op);
601     break;
602   case PHASE_BF_EXCHANGE:
603   case PHASE_MAYBE_FINISHED:
604     // if we are bob or alice and are continuing operation
605     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
606                                            &iterator_bf_reduce,
607                                            op);
608     break;
609   default:
610     GNUNET_break_op (0);
611     fail_intersection_operation(op);
612   }
613   // the iterators created a new BF with salt+1
614   // the peer needs this information for decoding the next BF
615   // this behavior can be modified at will later on.
616   op->spec->salt++;
617
618   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
619   op->state->remote_bf = NULL;
620
621   if ((0 == op->state->my_element_count) // fully disjoint
622       || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements
623           && (old_elements == op->state->my_element_count)
624           && (op->state->my_element_count == peer_elements))) {
625     // In the last round we though we were finished, we now know this is correct
626     send_peer_done (op);
627     return;
628   }
629
630   op->state->phase = PHASE_BF_EXCHANGE;
631   if (op->state->my_element_count == peer_elements)
632     // maybe we are finished, but we do one more round to make certain
633     // we don't have false positives ...
634     op->state->phase = PHASE_MAYBE_FINISHED;
635
636   send_bloomfilter (op);
637 }
638
639
640 /**
641  * Handle an BF multipart message from a remote peer.
642  *
643  * @param cls the intersection operation
644  * @param mh the header of the message
645  */
646 static void
647 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
648 {
649   struct Operation *op = cls;
650   const struct BFPart *msg = (const struct BFPart *) mh;
651   uint32_t chunk_size;
652   uint32_t chunk_offset;
653
654   chunk_size = ntohl(msg->chunk_length);
655   chunk_offset = ntohl(msg->chunk_offset);
656
657   if ((NULL == op->state->bf_data)
658        || (op->state->bf_data_size < chunk_size + chunk_offset)){
659     // unexpected multipart chunk
660     GNUNET_break_op (0);
661     fail_intersection_operation(op);
662     return;
663   }
664
665   memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
666
667   if (op->state->bf_data_size != chunk_offset + chunk_size)
668     // wait for next chunk
669     return;
670
671   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
672                                                             op->state->bf_data_size,
673                                                             op->state->bf_bits_per_element);
674
675   GNUNET_free (op->state->bf_data);
676   op->state->bf_data = NULL;
677
678   process_bf (op);
679 }
680
681
682 /**
683  * Handle an BF message from a remote peer.
684  *
685  * @param cls the intersection operation
686  * @param mh the header of the message
687  */
688 static void
689 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
690 {
691   struct Operation *op = cls;
692   const struct BFMessage *msg = (const struct BFMessage *) mh;
693   uint32_t bf_size;
694   uint32_t chunk_size;
695   uint32_t bf_bits_per_element;
696
697   switch (op->state->phase)
698   {
699   case PHASE_INITIAL:
700   case PHASE_BF_EXCHANGE:
701   case PHASE_MAYBE_FINISHED:
702     if (NULL == op->state->bf_data) {
703       // no colliding multipart transaction going on currently
704       op->spec->salt = ntohl (msg->sender_mutator);
705       bf_size = ntohl (msg->bloomfilter_total_length);
706       bf_bits_per_element = ntohl (msg->bits_per_element);
707       chunk_size = ntohl (msg->bloomfilter_length);
708       op->spec->remote_element_count = ntohl(msg->sender_element_count);
709       if (bf_size == chunk_size) {
710         // single part, done here
711         op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
712                                                                   bf_size,
713                                                                   bf_bits_per_element);
714         process_bf (op);
715         return;
716       }
717
718       //first multipart chunk
719       op->state->bf_data = GNUNET_malloc (bf_size);
720       op->state->bf_data_size = bf_size;
721       op->state->bf_bits_per_element = bf_bits_per_element;
722       memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
723       return;
724     }
725   default:
726     GNUNET_break_op (0);
727     fail_intersection_operation (op);
728   }
729 }
730
731
732 /**
733  * Handle an BF message from a remote peer.
734  *
735  * @param cls the intersection operation
736  * @param mh the header of the message
737  */
738 static void
739 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
740 {
741   struct Operation *op = cls;
742   struct BFMessage *msg = (struct BFMessage *) mh;
743
744   op->spec->remote_element_count = ntohl(msg->sender_element_count);
745   if ((op->state->phase != PHASE_INITIAL)
746       || (op->state->my_element_count > op->spec->remote_element_count)){
747     GNUNET_break_op (0);
748     fail_intersection_operation(op);
749   }
750
751   op->state->phase = PHASE_BF_EXCHANGE;
752   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
753
754   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
755                                          &iterator_initialization,
756                                          op);
757
758   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
759   op->state->remote_bf = NULL;
760
761   if (op->state->my_element_count == ntohl (msg->sender_element_count))
762     op->state->phase = PHASE_MAYBE_FINISHED;
763
764   send_bloomfilter (op);
765 }
766
767
768 /**
769  * Send our element count to the peer, in case our element count is lower than his
770  *
771  * @param op intersection operation
772  */
773 static void
774 send_element_count (struct Operation *op)
775 {
776   struct GNUNET_MQ_Envelope *ev;
777   struct BFMessage *msg;
778
779   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
780
781   // just send our element count, as the other peer must start
782   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
783   msg->sender_element_count = htonl (op->state->my_element_count);
784   msg->bloomfilter_length = htonl (0);
785   msg->sender_mutator = htonl (0);
786
787   GNUNET_MQ_send (op->mq, ev);
788 }
789
790
791 /**
792  * Send a result message to the client indicating
793  * that the operation is over.
794  * After the result done message has been sent to the client,
795  * destroy the evaluate operation.
796  *
797  * @param op intersection operation
798  */
799 static void
800 finish_and_destroy (struct Operation *op)
801 {
802   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
803
804   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
805   {
806     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
807     op->state->full_result_iter =
808         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
809     send_remaining_elements (op);
810     return;
811   }
812   send_client_done_and_destroy (op);
813 }
814
815
816 /**
817  * Handle a done message from a remote peer
818  *
819  * @param cls the union operation
820  * @param mh the message
821  */
822 static void
823 handle_p2p_done (void *cls,
824                  const struct GNUNET_MessageHeader *mh)
825 {
826   struct Operation *op = cls;
827
828   if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
829     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
830
831     finish_and_destroy (op);
832     return;
833   }
834
835   GNUNET_break_op (0);
836   fail_intersection_operation (op);
837 }
838
839
840 /**
841  * Evaluate a union operation with
842  * a remote peer.
843  *
844  * @param op operation to evaluate
845  */
846 static void
847 intersection_evaluate (struct Operation *op)
848 {
849   op->state = GNUNET_new (struct OperationState);
850   /* we started the operation, thus we have to send the operation request */
851   op->state->phase = PHASE_INITIAL;
852   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
853   op->state->my_element_count = op->spec->set->state->current_set_element_count;
854
855   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856               "evaluating intersection operation");
857   send_operation_request (op);
858 }
859
860
861 /**
862  * Accept an union operation request from a remote peer.
863  * Only initializes the private operation state.
864  *
865  * @param op operation that will be accepted as a union operation
866  */
867 static void
868 intersection_accept (struct Operation *op)
869 {
870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
871   op->state = GNUNET_new (struct OperationState);
872   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
873   op->state->my_element_count = op->spec->set->state->current_set_element_count;
874
875   // if Alice (the peer) has more elements than Bob (us), she should start
876   if (op->spec->remote_element_count < op->state->my_element_count){
877     op->state->phase = PHASE_INITIAL;
878     send_element_count(op);
879     return;
880   }
881   // create a new bloomfilter in case we have fewer elements
882   op->state->phase = PHASE_BF_EXCHANGE;
883   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
884                                                            BLOOMFILTER_SIZE,
885                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
886   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
887                                          &iterator_initialization,
888                                          op);
889   send_bloomfilter (op);
890 }
891
892
893 /**
894  * Create a new set supporting the intersection operation
895  *
896  * @return the newly created set
897  */
898 static struct SetState *
899 intersection_set_create ()
900 {
901   struct SetState *set_state;
902
903   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904               "intersection set created\n");
905   set_state = GNUNET_new (struct SetState);
906   set_state->current_set_element_count = 0;
907
908   return set_state;
909 }
910
911
912 /**
913  * Add the element from the given element message to the set.
914  *
915  * @param set_state state of the set want to add to
916  * @param ee the element to add to the set
917  */
918 static void
919 intersection_add (struct SetState *set_state,
920                   struct ElementEntry *ee)
921 {
922   set_state->current_set_element_count++;
923 }
924
925
926 /**
927  * Destroy a set that supports the intersection operation
928  *
929  * @param set_state the set to destroy
930  */
931 static void
932 intersection_set_destroy (struct SetState *set_state)
933 {
934   GNUNET_free (set_state);
935 }
936
937
938 /**
939  * Remove the element given in the element message from the set.
940  *
941  * @param set_state state of the set to remove from
942  * @param element set element to remove
943  */
944 static void
945 intersection_remove (struct SetState *set_state,
946                      struct ElementEntry *element)
947 {
948   GNUNET_assert(0 < set_state->current_set_element_count);
949   set_state->current_set_element_count--;
950 }
951
952
953 /**
954  * Dispatch messages for a intersection operation.
955  *
956  * @param op the state of the intersection evaluate operation
957  * @param mh the received message
958  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
959  *         #GNUNET_OK otherwise
960  */
961 static int
962 intersection_handle_p2p_message (struct Operation *op,
963                                  const struct GNUNET_MessageHeader *mh)
964 {
965   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
966               ntohs (mh->type), ntohs (mh->size));
967   switch (ntohs (mh->type))
968   {
969     /* this message handler is not active until after we received an
970      * operation request message, thus the ops request is not handled here
971      */
972   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
973     handle_p2p_element_info (op, mh);
974     break;
975   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
976     handle_p2p_bf (op, mh);
977     break;
978   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
979     handle_p2p_bf_part (op, mh);
980     break;
981   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
982     handle_p2p_done (op, mh);
983     break;
984   default:
985     /* something wrong with cadet's message handlers? */
986     GNUNET_assert (0);
987   }
988   return GNUNET_OK;
989 }
990
991
992 /**
993  * handler for peer-disconnects, notifies the client about the aborted operation
994  *
995  * @param op the destroyed operation
996  */
997 static void
998 intersection_peer_disconnect (struct Operation *op)
999 {
1000   if (PHASE_FINISHED != op->state->phase)
1001   {
1002     struct GNUNET_MQ_Envelope *ev;
1003     struct GNUNET_SET_ResultMessage *msg;
1004
1005     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1006     msg->request_id = htonl (op->spec->client_request_id);
1007     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1008     msg->element_type = htons (0);
1009     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1010     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1011     _GSS_operation_destroy (op);
1012     return;
1013   }
1014   // else: the session has already been concluded
1015   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1016   if (GNUNET_NO == op->state->client_done_sent)
1017     finish_and_destroy (op);
1018 }
1019
1020
1021 /**
1022  * Destroy the union operation.  Only things specific to the union operation are destroyed.
1023  *
1024  * @param op union operation to destroy
1025  */
1026 static void
1027 intersection_op_cancel (struct Operation *op)
1028 {
1029   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
1030   /* check if the op was canceled twice */
1031   GNUNET_assert (NULL != op->state);
1032   if (NULL != op->state->remote_bf)
1033   {
1034     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1035     op->state->remote_bf = NULL;
1036   }
1037   if (NULL != op->state->local_bf)
1038   {
1039     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1040     op->state->local_bf = NULL;
1041   }
1042   if (NULL != op->state->my_elements)
1043   {
1044     // no need to free the elements, they are still part of the set
1045     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1046     op->state->my_elements = NULL;
1047   }
1048   GNUNET_free (op->state);
1049   op->state = NULL;
1050   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
1051 }
1052
1053 const struct SetVT *
1054 _GSS_intersection_vt ()
1055 {
1056   static const struct SetVT intersection_vt = {
1057     .create = &intersection_set_create,
1058     .msg_handler = &intersection_handle_p2p_message,
1059     .add = &intersection_add,
1060     .remove = &intersection_remove,
1061     .destroy_set = &intersection_set_destroy,
1062     .evaluate = &intersection_evaluate,
1063     .accept = &intersection_accept,
1064     .peer_disconnect = &intersection_peer_disconnect,
1065     .cancel = &intersection_op_cancel,
1066   };
1067
1068   return &intersection_vt;
1069 }