- further work on multipart receiving
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
34
35 #define CALCULATE_BF_SIZE(A, B, s, k) \
36                           do { \
37                             k = ceil(1 + log2((double) (2*B / (double) A)));\
38                             s = ceil((double) (A * k / log(2))); \
39                           } while (0)
40
41 /**
42  * Current phase we are in for a intersection operation.
43  */
44 enum IntersectionOperationPhase
45 {
46   /**
47    * Alices has suggested an operation to bob,
48    * and is waiting for a bf or session end.
49    */
50   PHASE_INITIAL,
51   /**
52    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
53    * until one notices the their element count is equal
54    */
55   PHASE_BF_EXCHANGE,
56   /**
57    * if both peers have an equal peercount, they enter this state for
58    * one more turn, to see if they actually have agreed on a correct set.
59    * if a peer finds the same element count after the next iteration,
60    * it ends the the session
61    */
62   PHASE_MAYBE_FINISHED,
63   /**
64    * The protocol is over.
65    * Results may still have to be sent to the client.
66    */
67   PHASE_FINISHED
68 };
69
70
71 /**
72  * State of an evaluate operation
73  * with another peer.
74  */
75 struct OperationState
76 {
77   /**
78    * The bf we currently receive
79    */
80   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
81
82   /**
83    * BF of the set's element.
84    */
85   struct GNUNET_CONTAINER_BloomFilter *local_bf;
86
87   /**
88    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
89    */
90   char * bf_data;
91
92   /**
93    * size of the bloomfilter
94    */
95   uint32_t bf_data_size;
96   
97   /**
98    * size of the bloomfilter
99    */
100   uint32_t bf_bits_per_element;
101   
102   /**
103    * Current state of the operation.
104    */
105   enum IntersectionOperationPhase phase;
106
107   /**
108    * Generation in which the operation handle
109    * was created.
110    */
111   unsigned int generation_created;
112
113   /**
114    * Maps element-id-hashes to 'elements in our set'.
115    */
116   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
117
118   /**
119    * Current element count contained within contained_elements
120    */
121   uint32_t my_element_count;
122
123   /**
124    * Iterator for sending elements on the key to element mapping to the client.
125    */
126   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
127
128   /**
129    * Evaluate operations are held in
130    * a linked list.
131    */
132   struct OperationState *next;
133
134    /**
135     * Evaluate operations are held in
136     * a linked list.
137     */
138   struct OperationState *prev;
139
140   /**
141    * Did we send the client that we are done?
142    */
143   int client_done_sent;
144 };
145
146
147 /**
148  * Extra state required for efficient set intersection.
149  */
150 struct SetState
151 {
152   /**
153    * Number of currently valid elements in the set which have not been removed
154    */
155   uint32_t current_set_element_count;
156 };
157
158
159 /**
160  * Alice's version:
161  *
162  * fills the contained-elements hashmap with all relevant
163  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
164  *
165  * @param cls closure
166  * @param key current key code
167  * @param value value in the hash map
168  * @return #GNUNET_YES if we should continue to
169  *         iterate,
170  *         #GNUNET_NO if not.
171  */
172 static int
173 iterator_initialization_by_alice (void *cls,
174                                   const struct GNUNET_HashCode *key,
175                                   void *value)
176 {
177   struct ElementEntry *ee = value;
178   struct Operation *op = cls;
179   struct GNUNET_HashCode mutated_hash;
180
181   //only consider this element, if it is valid for us
182   if ((op->generation_created >= ee->generation_removed)
183        || (op->generation_created < ee->generation_added))
184     return GNUNET_YES;
185
186   // not contained according to bob's bloomfilter
187   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
188                            op->spec->salt,
189                            &mutated_hash);
190   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
191                                                       &mutated_hash))
192     return GNUNET_YES;
193
194   op->state->my_element_count++;
195   GNUNET_assert (GNUNET_YES ==
196                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
197                                                     &ee->element_hash, ee,
198                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
199
200   return GNUNET_YES;
201 }
202
203 /**
204  * fills the contained-elements hashmap with all relevant
205  * elements and adds their mutated hashes to our local bloomfilter
206  *
207  * @param cls closure
208  * @param key current key code
209  * @param value value in the hash map
210  * @return #GNUNET_YES if we should continue to
211  *         iterate,
212  *         #GNUNET_NO if not.
213  */
214 static int
215 iterator_initialization (void *cls,
216                          const struct GNUNET_HashCode *key,
217                          void *value)
218 {
219   struct ElementEntry *ee = value;
220   struct Operation *op = cls;
221
222   //only consider this element, if it is valid for us
223   if ((op->generation_created >= ee->generation_removed)
224        || (op->generation_created < ee->generation_added))
225     return GNUNET_YES;
226
227   GNUNET_assert (GNUNET_YES ==
228                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
229                                                     &ee->element_hash, ee,
230                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
231   return GNUNET_YES;
232 }
233
234
235 /**
236  * removes element from a hashmap if it is not contained within the
237  * provided remote bloomfilter. Then, fill our new bloomfilter.
238  *
239  * @param cls closure
240  * @param key current key code
241  * @param value value in the hash map
242  * @return #GNUNET_YES if we should continue to
243  *         iterate,
244  *         #GNUNET_NO if not.
245  */
246 static int
247 iterator_bf_reduce (void *cls,
248                    const struct GNUNET_HashCode *key,
249                    void *value)
250 {
251   struct ElementEntry *ee = value;
252   struct Operation *op = cls;
253   struct GNUNET_HashCode mutated_hash;
254
255   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
256
257   if (GNUNET_NO ==
258       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
259                                          &mutated_hash))
260   {
261     op->state->my_element_count--;
262     GNUNET_assert (GNUNET_YES ==
263                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
264                                                          &ee->element_hash,
265                                                          ee));
266   }
267
268   return GNUNET_YES;
269 }
270
271 /**
272  * create a bloomfilter based on the elements given
273  *
274  * @param cls closure
275  * @param key current key code
276  * @param value value in the hash map
277  * @return #GNUNET_YES if we should continue to
278  *         iterate,
279  *         #GNUNET_NO if not.
280  */
281 static int
282 iterator_bf_create (void *cls,
283                    const struct GNUNET_HashCode *key,
284                    void *value)
285 {
286   struct ElementEntry *ee = value;
287   struct Operation *op = cls;
288   struct GNUNET_HashCode mutated_hash;
289
290   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
291
292   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
293                                     &mutated_hash);
294   return GNUNET_YES;
295 }
296
297 /**
298  * Inform the client that the union operation has failed,
299  * and proceed to destroy the evaluate operation.
300  *
301  * @param op the intersection operation to fail
302  */
303 static void
304 fail_intersection_operation (struct Operation *op)
305 {
306   struct GNUNET_MQ_Envelope *ev;
307   struct GNUNET_SET_ResultMessage *msg;
308
309   if (op->state->my_elements)
310     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
311
312   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
313
314   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
315   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
316   msg->request_id = htonl (op->spec->client_request_id);
317   msg->element_type = htons (0);
318   GNUNET_MQ_send (op->spec->set->client_mq, ev);
319   _GSS_operation_destroy (op);
320 }
321
322
323 /**
324  * Send a request for the evaluate operation to a remote peer
325  *
326  * @param op operation with the other peer
327  */
328 static void
329 send_operation_request (struct Operation *op)
330 {
331   struct GNUNET_MQ_Envelope *ev;
332   struct OperationRequestMessage *msg;
333
334   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
335                                 op->spec->context_msg);
336
337   if (NULL == ev)
338   {
339     /* the context message is too large */
340     GNUNET_break (0);
341     GNUNET_SERVER_client_disconnect (op->spec->set->client);
342     return;
343   }
344   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
345   msg->app_id = op->spec->app_id;
346   msg->salt = htonl (op->spec->salt);
347   msg->element_count = htonl(op->state->my_element_count);
348
349   GNUNET_MQ_send (op->mq, ev);
350
351   if (NULL != op->spec->context_msg)
352     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
353   else
354     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
355
356   if (NULL != op->spec->context_msg)
357   {
358     GNUNET_free (op->spec->context_msg);
359     op->spec->context_msg = NULL;
360   }
361 }
362
363 static void
364 send_bloomfilter_multipart (struct Operation *op, uint32_t offset)
365 {
366   struct GNUNET_MQ_Envelope *ev;
367   struct BFPart *msg;
368   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
369   uint32_t todo_size = op->state->bf_data_size - offset;
370
371   if (todo_size < chunk_size)
372     chunk_size = todo_size;
373
374   ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
375
376   msg->chunk_length = htonl (chunk_size);
377   msg->chunk_offset = htonl (offset);
378   memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
379
380   GNUNET_MQ_send (op->mq, ev);
381
382   if (op->state->bf_data_size == offset + chunk_size)
383   {
384     // done
385     GNUNET_free(op->state->bf_data);
386     op->state->bf_data = NULL;
387     return;
388   }
389
390   send_bloomfilter_multipart (op, offset + chunk_size);
391 }
392
393 /**
394  * Send a bloomfilter to our peer.
395  * that the operation is over.
396  * After the result done message has been sent to the client,
397  * destroy the evaluate operation.
398  *
399  * @param op intersection operation
400  */
401 static void
402 send_bloomfilter (struct Operation *op)
403 {
404   struct GNUNET_MQ_Envelope *ev;
405   struct BFMessage *msg;
406   uint32_t bf_size;
407   uint32_t bf_elementbits;
408   uint32_t chunk_size;
409   struct GNUNET_CONTAINER_BloomFilter * local_bf;
410
411   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
412   
413   CALCULATE_BF_SIZE(op->state->my_element_count,
414                     op->spec->remote_element_count,
415                     bf_size,
416                     bf_elementbits);
417
418   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
419                                                 bf_size,
420                                                 bf_elementbits);
421
422   op->spec->salt++;
423   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
424                                          &iterator_bf_create,
425                                          op);
426
427   // send our bloomfilter
428   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) {
429     // singlepart
430     chunk_size = bf_size;
431     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
432     GNUNET_assert (GNUNET_SYSERR !=
433                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
434                                                               (char*)&msg[1],
435                                                               bf_size));
436   }
437   else {
438     //multipart
439     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
440     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
441     op->state->bf_data = (char *) GNUNET_malloc (bf_size);
442     GNUNET_assert (GNUNET_SYSERR !=
443                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
444                                                               op->state->bf_data,
445                                                               bf_size));
446     memcpy (&msg[1], op->state->bf_data, chunk_size);
447     op->state->bf_data_size = bf_size;
448   }
449   GNUNET_CONTAINER_bloomfilter_free (local_bf);
450
451   msg->sender_element_count = htonl (op->state->my_element_count);
452   msg->bloomfilter_total_length = htonl (bf_size);
453   msg->bloomfilter_length = htonl (chunk_size);
454   msg->bits_per_element = htonl (bf_elementbits);
455   msg->sender_mutator = htonl (op->spec->salt);
456
457   GNUNET_MQ_send (op->mq, ev);
458
459   if (op->state->bf_data)
460     send_bloomfilter_multipart (op, chunk_size);
461 }
462
463
464 /**
465  * Signal to the client that the operation has finished and
466  * destroy the operation.
467  *
468  * @param cls operation to destroy
469  */
470 static void
471 send_client_done_and_destroy (void *cls)
472 {
473   struct Operation *op = cls;
474   struct GNUNET_MQ_Envelope *ev;
475   struct GNUNET_SET_ResultMessage *rm;
476   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
477   rm->request_id = htonl (op->spec->client_request_id);
478   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
479   rm->element_type = htons (0);
480   GNUNET_MQ_send (op->spec->set->client_mq, ev);
481   _GSS_operation_destroy (op);
482 }
483
484
485 /**
486  * Send all elements in the full result iterator.
487  *
488  * @param cls operation
489  */
490 static void
491 send_remaining_elements (void *cls)
492 {
493   struct Operation *op = cls;
494   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
495   struct GNUNET_MQ_Envelope *ev;
496   struct GNUNET_SET_ResultMessage *rm;
497   struct GNUNET_SET_Element *element;
498   int res;
499
500   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
501   if (GNUNET_NO == res) {
502     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
503     send_client_done_and_destroy (op);
504     return;
505   }
506
507   element = &remaining->element;
508   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
509   GNUNET_assert (0 != op->spec->client_request_id);
510
511   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
512   GNUNET_assert (NULL != ev);
513
514   rm->result_status = htons (GNUNET_SET_STATUS_OK);
515   rm->request_id = htonl (op->spec->client_request_id);
516   rm->element_type = element->type;
517   memcpy (&rm[1], element->data, element->size);
518
519   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
520   GNUNET_MQ_send (op->spec->set->client_mq, ev);
521 }
522
523
524 /**
525  * Inform the peer that this operation is complete.
526  *
527  * @param op the intersection operation to fail
528  */
529 static void
530 send_peer_done (struct Operation *op)
531 {
532   struct GNUNET_MQ_Envelope *ev;
533
534   op->state->phase = PHASE_FINISHED;
535   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
536   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
537   op->state->local_bf = NULL;
538
539   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
540   GNUNET_MQ_send (op->mq, ev);
541 }
542
543
544 /**
545  * Process a Bloomfilter once we got all the chunks
546  *
547  * @param op the intersection operation
548  */
549 static void
550 process_bf (struct Operation *op){
551   uint32_t old_elements;
552   uint32_t peer_elements;
553   
554   old_elements = op->state->my_element_count;
555   peer_elements = op->spec->remote_element_count;
556   switch (op->state->phase)
557   {
558   case PHASE_INITIAL:
559     // If we are ot our first msg
560     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
561
562     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
563                                            &iterator_initialization_by_alice,
564                                            op);
565     break;
566   case PHASE_BF_EXCHANGE:
567   case PHASE_MAYBE_FINISHED:
568     // if we are bob or alice and are continuing operation
569     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
570                                            &iterator_bf_reduce,
571                                            op);
572     break;
573   default:
574     GNUNET_break_op (0);
575     fail_intersection_operation(op);
576   }
577   // the iterators created a new BF with salt+1
578   // the peer needs this information for decoding the next BF
579   // this behavior can be modified at will later on.
580   op->spec->salt++;
581
582   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
583   op->state->remote_bf = NULL;
584
585   if ((op->state->phase == PHASE_MAYBE_FINISHED)
586        && (old_elements == op->state->my_element_count)
587        && (op->state->my_element_count == peer_elements)){
588     // In the last round we though we were finished, we now know this is correct
589     send_peer_done (op);
590     return;
591   }
592
593   op->state->phase = PHASE_BF_EXCHANGE;
594   if (op->state->my_element_count == peer_elements)
595     // maybe we are finished, but we do one more round to make certain
596     // we don't have false positives ...
597     op->state->phase = PHASE_MAYBE_FINISHED;
598
599   send_bloomfilter (op);
600 }
601
602
603 /**
604  * Handle an BF multipart message from a remote peer.
605  *
606  * @param cls the intersection operation
607  * @param mh the header of the message
608  */
609 static void
610 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
611 {
612   struct Operation *op = cls;
613   const struct BFPart *msg = (const struct BFPart *) mh;
614   uint32_t chunk_size;
615   uint32_t chunk_offset;
616   
617   chunk_size = ntohl(msg->chunk_length);
618   chunk_offset = ntohl(msg->chunk_offset);
619   
620   if ((NULL == op->state->bf_data) 
621        || (op->state->bf_data_size < chunk_size + chunk_offset)){
622     // unexpected multipart chunk
623     GNUNET_break_op (0);
624     fail_intersection_operation(op);
625     return;
626   }
627   
628   memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
629
630   if (op->state->bf_data_size > chunk_size + chunk_offset)
631     // wait for next chunk
632     return;
633
634   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
635                                                             op->state->bf_data_size,
636                                                             op->state->bf_bits_per_element);
637   
638   GNUNET_free (op->state->bf_data);
639   op->state->bf_data = NULL;
640   
641   process_bf (op);
642 }
643
644
645 /**
646  * Handle an BF message from a remote peer.
647  *
648  * @param cls the intersection operation
649  * @param mh the header of the message
650  */
651 static void
652 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
653 {
654   struct Operation *op = cls;
655   const struct BFMessage *msg = (const struct BFMessage *) mh;
656   uint32_t bf_size;
657   uint32_t chunk_size;
658   uint32_t bf_bits_per_element;
659
660   switch (op->state->phase)
661   {
662   case PHASE_INITIAL:
663   case PHASE_BF_EXCHANGE:
664   case PHASE_MAYBE_FINISHED:
665     if (NULL == op->state->bf_data) {
666       // no colliding multipart transaction going on currently
667       op->spec->salt = ntohl (msg->sender_mutator);
668       bf_size = ntohl (msg->bloomfilter_total_length);
669       bf_bits_per_element = ntohl (msg->bits_per_element);
670       chunk_size = ntohl (msg->bloomfilter_length);
671       op->spec->remote_element_count = ntohl(msg->sender_element_count);
672       if (bf_size == chunk_size) {
673         // single part, done here
674         op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
675                                                                   bf_size,
676                                                                   bf_bits_per_element);
677         process_bf (op);
678         return;
679       }
680
681       //first multipart chunk
682       op->state->bf_data = GNUNET_malloc (bf_size);
683       op->state->bf_data_size = bf_size;
684       op->state->bf_bits_per_element = bf_bits_per_element;
685       memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
686       return;
687     }
688   default:
689     GNUNET_break_op (0);
690     fail_intersection_operation (op);
691   }
692 }
693
694
695 /**
696  * Handle an BF message from a remote peer.
697  *
698  * @param cls the intersection operation
699  * @param mh the header of the message
700  */
701 static void
702 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
703 {
704   struct Operation *op = cls;
705   struct BFMessage *msg = (struct BFMessage *) mh;
706
707   op->spec->remote_element_count = ntohl(msg->sender_element_count);
708   if ((op->state->phase != PHASE_INITIAL)
709       || (op->state->my_element_count > op->spec->remote_element_count)){
710     GNUNET_break_op (0);
711     fail_intersection_operation(op);
712   }
713
714   op->state->phase = PHASE_BF_EXCHANGE;
715   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
716   
717   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
718                                          &iterator_initialization,
719                                          op);
720
721   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
722   op->state->remote_bf = NULL;
723
724   if (op->state->my_element_count == ntohl (msg->sender_element_count))
725     op->state->phase = PHASE_MAYBE_FINISHED;
726
727   send_bloomfilter (op);
728 }
729
730
731 /**
732  * Send our element to the peer, in case our element count is lower than his
733  *
734  * @param op intersection operation
735  */
736 static void
737 send_element_count (struct Operation *op)
738 {
739   struct GNUNET_MQ_Envelope *ev;
740   struct BFMessage *msg;
741
742   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
743
744   // just send our element count, as the other peer must start
745   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
746   msg->sender_element_count = htonl (op->state->my_element_count);
747   msg->bloomfilter_length = htonl (0);
748   msg->sender_mutator = htonl (0);
749
750   GNUNET_MQ_send (op->mq, ev);
751 }
752
753
754 /**
755  * Send a result message to the client indicating
756  * that the operation is over.
757  * After the result done message has been sent to the client,
758  * destroy the evaluate operation.
759  *
760  * @param op intersection operation
761  */
762 static void
763 finish_and_destroy (struct Operation *op)
764 {
765   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
766
767   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
768   {
769     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
770     op->state->full_result_iter =
771         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
772     send_remaining_elements (op);
773     return;
774   }
775   send_client_done_and_destroy (op);
776 }
777
778
779 /**
780  * Handle a done message from a remote peer
781  *
782  * @param cls the union operation
783  * @param mh the message
784  */
785 static void
786 handle_p2p_done (void *cls,
787                  const struct GNUNET_MessageHeader *mh)
788 {
789   struct Operation *op = cls;
790
791   if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
792     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
793
794     finish_and_destroy (op);
795     return;
796   }
797
798   GNUNET_break_op (0);
799   fail_intersection_operation (op);
800 }
801
802
803 /**
804  * Evaluate a union operation with
805  * a remote peer.
806  *
807  * @param op operation to evaluate
808  */
809 static void
810 intersection_evaluate (struct Operation *op)
811 {
812   op->state = GNUNET_new (struct OperationState);
813   /* we started the operation, thus we have to send the operation request */
814   op->state->phase = PHASE_INITIAL;
815   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
816   op->state->my_element_count = op->spec->set->state->current_set_element_count;
817
818   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819               "evaluating intersection operation");
820   send_operation_request (op);
821 }
822
823
824 /**
825  * Accept an union operation request from a remote peer.
826  * Only initializes the private operation state.
827  *
828  * @param op operation that will be accepted as a union operation
829  */
830 static void
831 intersection_accept (struct Operation *op)
832 {
833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
834   op->state = GNUNET_new (struct OperationState);
835   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
836   op->state->my_element_count = op->spec->set->state->current_set_element_count;
837
838   // if Alice (the peer) has more elements than Bob (us), she should start
839   if (op->spec->remote_element_count < op->state->my_element_count){
840     op->state->phase = PHASE_INITIAL;
841     send_element_count(op);
842     return;
843   }
844   // create a new bloomfilter in case we have fewer elements
845   op->state->phase = PHASE_BF_EXCHANGE;
846   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
847                                                            BLOOMFILTER_SIZE,
848                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
849   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
850                                          &iterator_initialization,
851                                          op);
852   send_bloomfilter (op);
853 }
854
855
856 /**
857  * Create a new set supporting the intersection operation
858  *
859  * @return the newly created set
860  */
861 static struct SetState *
862 intersection_set_create ()
863 {
864   struct SetState *set_state;
865
866   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867               "intersection set created\n");
868   set_state = GNUNET_new (struct SetState);
869   set_state->current_set_element_count = 0;
870
871   return set_state;
872 }
873
874
875 /**
876  * Add the element from the given element message to the set.
877  *
878  * @param set_state state of the set want to add to
879  * @param ee the element to add to the set
880  */
881 static void
882 intersection_add (struct SetState *set_state,
883                   struct ElementEntry *ee)
884 {
885   GNUNET_assert(0 < set_state->current_set_element_count);
886   set_state->current_set_element_count++;
887 }
888
889
890 /**
891  * Destroy a set that supports the intersection operation
892  *
893  * @param set_state the set to destroy
894  */
895 static void
896 intersection_set_destroy (struct SetState *set_state)
897 {
898   GNUNET_free (set_state);
899 }
900
901
902 /**
903  * Remove the element given in the element message from the set.
904  *
905  * @param set_state state of the set to remove from
906  * @param element set element to remove
907  */
908 static void
909 intersection_remove (struct SetState *set_state,
910                      struct ElementEntry *element)
911 {
912   GNUNET_assert(0 < set_state->current_set_element_count);
913   set_state->current_set_element_count--;
914 }
915
916
917 /**
918  * Dispatch messages for a intersection operation.
919  *
920  * @param op the state of the intersection evaluate operation
921  * @param mh the received message
922  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
923  *         #GNUNET_OK otherwise
924  */
925 static int
926 intersection_handle_p2p_message (struct Operation *op,
927                                  const struct GNUNET_MessageHeader *mh)
928 {
929   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
930               ntohs (mh->type), ntohs (mh->size));
931   switch (ntohs (mh->type))
932   {
933     /* this message handler is not active until after we received an
934      * operation request message, thus the ops request is not handled here
935      */
936   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
937     handle_p2p_element_info (op, mh);
938     break;
939   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
940     handle_p2p_bf (op, mh);
941     break;
942   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
943     handle_p2p_bf_part (op, mh);
944     break;
945   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
946     handle_p2p_done (op, mh);
947     break;
948   default:
949     /* something wrong with mesh's message handlers? */
950     GNUNET_assert (0);
951   }
952   return GNUNET_OK;
953 }
954
955
956 /**
957  * handler for peer-disconnects, notifies the client about the aborted operation
958  *
959  * @param op the destroyed operation
960  */
961 static void
962 intersection_peer_disconnect (struct Operation *op)
963 {
964   if (PHASE_FINISHED != op->state->phase)
965   {
966     struct GNUNET_MQ_Envelope *ev;
967     struct GNUNET_SET_ResultMessage *msg;
968
969     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
970     msg->request_id = htonl (op->spec->client_request_id);
971     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
972     msg->element_type = htons (0);
973     GNUNET_MQ_send (op->spec->set->client_mq, ev);
974     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
975     _GSS_operation_destroy (op);
976     return;
977   }
978   // else: the session has already been concluded
979   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
980   if (GNUNET_NO == op->state->client_done_sent)
981     finish_and_destroy (op);
982 }
983
984
985 /**
986  * Destroy the union operation.  Only things specific to the union operation are destroyed.
987  *
988  * @param op union operation to destroy
989  */
990 static void
991 intersection_op_cancel (struct Operation *op)
992 {
993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
994   /* check if the op was canceled twice */
995   GNUNET_assert (NULL != op->state);
996   if (NULL != op->state->remote_bf)
997   {
998     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
999     op->state->remote_bf = NULL;
1000   }
1001   if (NULL != op->state->local_bf)
1002   {
1003     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1004     op->state->local_bf = NULL;
1005   }
1006   if (NULL != op->state->my_elements)
1007   {
1008     // no need to free the elements, they are still part of the set
1009     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1010     op->state->my_elements = NULL;
1011   }
1012   GNUNET_free (op->state);
1013   op->state = NULL;
1014   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
1015 }
1016
1017 const struct SetVT *
1018 _GSS_intersection_vt ()
1019 {
1020   static const struct SetVT intersection_vt = {
1021     .create = &intersection_set_create,
1022     .msg_handler = &intersection_handle_p2p_message,
1023     .add = &intersection_add,
1024     .remove = &intersection_remove,
1025     .destroy_set = &intersection_set_destroy,
1026     .evaluate = &intersection_evaluate,
1027     .accept = &intersection_accept,
1028     .peer_disconnect = &intersection_peer_disconnect,
1029     .cancel = &intersection_op_cancel,
1030   };
1031
1032   return &intersection_vt;
1033 }