work work
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "strata_estimator.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
34 /**
35  * Current phase we are in for a intersection operation.
36  */
37 enum IntersectionOperationPhase
38 {
39   /**
40    * Alices has suggested an operation to bob, 
41    * and is waiting for a bf or session end.
42    */
43   PHASE_INITIAL,
44   /**
45    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
46    * until one notices the their element count is equal
47    */
48   PHASE_BF_EXCHANGE,
49   /**
50    * if both peers have an equal peercount, they enter this state for 
51    * one more turn, to see if they actually have agreed on a correct set.
52    * if a peer finds the same element count after the next iteration, 
53    * it ends the the session
54    */
55   PHASE_MAYBE_FINISHED,
56   /**
57    * The protocol is over.
58    * Results may still have to be sent to the client.
59    */
60   PHASE_FINISHED
61 };
62
63
64 /**
65  * State of an evaluate operation
66  * with another peer.
67  */
68 struct OperationState
69 {
70   /**
71    * The bf we currently receive
72    */
73   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
74
75   /**
76    * BF of the set's element.
77    */
78   struct GNUNET_CONTAINER_BloomFilter *local_bf;
79
80   /**
81    * Current state of the operation.
82    */
83   enum IntersectionOperationPhase phase;
84
85   /**
86    * Generation in which the operation handle
87    * was created.
88    */
89   unsigned int generation_created;
90   
91   /**
92    * Maps element-id-hashes to 'elements in our set'.
93    */
94   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
95   
96   /**
97    * Current element count contained within contained_elements
98    */
99   uint32_t my_elements_count;
100
101   /**
102    * Iterator for sending elements on the key to element mapping to the client.
103    */
104   struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
105   
106   /**
107    * Evaluate operations are held in
108    * a linked list.
109    */
110   struct OperationState *next;
111
112    /**
113     * Evaluate operations are held in
114     * a linked list.
115     */
116   struct OperationState *prev;
117
118   /**
119    * Did we send the client that we are done?
120    */
121   int client_done_sent;
122 };
123
124
125 /**
126  * Alice's version:
127  * 
128  * fills the contained-elements hashmap with all relevant 
129  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
130  * 
131  * @param cls closure
132  * @param key current key code
133  * @param value value in the hash map
134  * @return #GNUNET_YES if we should continue to
135  *         iterate,
136  *         #GNUNET_NO if not.
137  */
138 static int 
139 iterator_initialization_by_alice (void *cls,
140                                       const struct GNUNET_HashCode *key,
141                                       void *value){
142   struct ElementEntry *ee = value;
143   struct Operation *op = cls;
144   struct GNUNET_HashCode mutated_hash;
145   
146   //only consider this element, if it is valid for us
147   if ((op->generation_created >= ee->generation_removed) 
148        || (op->generation_created < ee->generation_added))
149     return GNUNET_YES;
150   
151   // not contained according to bob's bloomfilter
152   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
153   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, 
154                                                       &mutated_hash))
155     return GNUNET_YES;
156   
157   op->state->my_elements_count++;  
158   GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, 
159                                      &ee->element_hash, ee,
160                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
161   
162   // create our own bloomfilter with salt+1
163   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt+1, &mutated_hash);
164   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, 
165                                     &mutated_hash);
166   
167   return GNUNET_YES;
168 }
169
170 /**
171  * fills the contained-elements hashmap with all relevant 
172  * elements and adds their mutated hashes to our local bloomfilter
173  * 
174  * @param cls closure
175  * @param key current key code
176  * @param value value in the hash map
177  * @return #GNUNET_YES if we should continue to
178  *         iterate,
179  *         #GNUNET_NO if not.
180  */
181 static int 
182 iterator_initialization (void *cls,
183                                       const struct GNUNET_HashCode *key,
184                                       void *value){
185   struct ElementEntry *ee = value;
186   struct Operation *op = cls;
187   struct GNUNET_HashCode mutated_hash;
188   
189   //only consider this element, if it is valid for us
190   if ((op->generation_created >= ee->generation_removed) 
191        || (op->generation_created < ee->generation_added))
192     return GNUNET_YES;
193   
194   GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, 
195                                      &ee->element_hash, ee,
196                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
197   
198   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
199   
200   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, 
201                                     &mutated_hash);
202   
203   return GNUNET_YES;
204 }
205
206 /**
207  * Counts all valid elements in the hashmap 
208  * (the ones that are valid in our generation)
209  * 
210  * @param cls closure
211  * @param key current key code
212  * @param value value in the hash map
213  * @return #GNUNET_YES if we should continue to
214  *         iterate,
215  *         #GNUNET_NO if not.
216  */
217 static int 
218 iterator_element_count (void *cls,
219                                       const struct GNUNET_HashCode *key,
220                                       void *value){
221   struct ElementEntry *ee = value;
222   struct Operation *op = cls;
223   
224   //only consider this element, if it is valid for us
225   if ((op->generation_created >= ee->generation_removed) 
226        || (op->generation_created < ee->generation_added))
227     return GNUNET_YES;
228   
229   op->state->my_elements_count++;
230   
231   return GNUNET_YES;
232 }
233
234 /**
235  * removes element from a hashmap if it is not contained within the
236  * provided remote bloomfilter. Then, fill our new bloomfilter.
237  * 
238  * @param cls closure
239  * @param key current key code
240  * @param value value in the hash map
241  * @return #GNUNET_YES if we should continue to
242  *         iterate,
243  *         #GNUNET_NO if not.
244  */
245 static int
246 iterator_bf_round (void *cls,
247                                       const struct GNUNET_HashCode *key,
248                                       void *value){
249   struct ElementEntry *ee = value;
250   struct Operation *op = cls;
251   struct GNUNET_HashCode mutated_hash;
252   
253   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
254   
255   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, 
256                                      &mutated_hash)){
257     op->state->my_elements_count--;
258     GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, 
259                                      &ee->element_hash,
260                                      ee);
261     return GNUNET_YES;
262   }
263   
264   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt+1, &mutated_hash);
265   
266   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, 
267                                     &mutated_hash);
268   
269   return GNUNET_YES;
270 }
271
272 /**
273  * Destroy a intersection operation, and free all resources
274  * associated with it.
275  *
276  * @param eo the intersection operation to destroy
277  */
278 static void
279 intersection_operation_destroy (struct OperationState *eo)
280 {
281   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
282   GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
283                                eo->set->state->ops_tail,
284                                eo);
285   if (NULL != eo->mq)
286   {
287     GNUNET_MQ_destroy (eo->mq);
288     eo->mq = NULL;
289   }
290   if (NULL != eo->tunnel)
291   {
292     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
293     eo->tunnel = NULL;
294     GNUNET_MESH_tunnel_destroy (t);
295   }
296   // TODO: destroy set elements?
297   if (NULL != eo->spec)
298   {
299     if (NULL != eo->spec->context_msg)
300     {
301       GNUNET_free (eo->spec->context_msg);
302       eo->spec->context_msg = NULL;
303     }
304     GNUNET_free (eo->spec);
305     eo->spec = NULL;
306   }
307   GNUNET_free (eo);
308
309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
310
311   /* FIXME: do a garbage collection of the set generations */
312 }
313
314
315 /**
316  * Inform the client that the union operation has failed,
317  * and proceed to destroy the evaluate operation.
318  *
319  * @param op the intersection operation to fail
320  */
321 static void
322 fail_intersection_operation (struct Operation *op)
323 {
324   struct GNUNET_MQ_Envelope *ev;
325   struct GNUNET_SET_ResultMessage *msg;
326
327   if (op->state->my_elements)
328     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
329   
330   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
331
332   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
333   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
334   msg->request_id = htonl (op->spec->client_request_id);
335   msg->element_type = htons (0);
336   GNUNET_MQ_send (op->spec->set->client_mq, ev);
337   _GSS_operation_destroy (op);
338 }
339
340
341
342 /**
343  * Inform the peer that this operation is complete.
344  *
345  * @param eo the intersection operation to fail
346  */
347 static void
348 send_peer_done (struct Operation *op)
349 {
350   struct GNUNET_MQ_Envelope *ev;
351
352   op->state->phase = PHASE_FINISHED;
353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
354   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
355   op->state->local_bf = NULL;
356
357   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
358   GNUNET_MQ_send (op->mq, ev);
359 }
360
361 /**
362  * Send a request for the evaluate operation to a remote peer
363  *
364  * @param eo operation with the other peer
365  */
366 static void
367 send_operation_request (struct Operation *op)
368 {
369   struct GNUNET_MQ_Envelope *ev;
370   struct OperationRequestMessage *msg;
371
372   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
373                                 op->spec->context_msg);
374
375   if (NULL == ev)
376   {
377     /* the context message is too large */
378     GNUNET_break (0);
379     GNUNET_SERVER_client_disconnect (op->spec->set->client);
380     return;
381   }
382   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
383   msg->app_id = op->spec->app_id;
384   msg->salt = htonl (op->spec->salt);
385   msg->element_count = htonl(op->state->my_elements);
386   
387   GNUNET_MQ_send (op->mq, ev);
388
389   if (NULL != op->spec->context_msg)
390     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
391   else
392     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
393
394   if (NULL != op->spec->context_msg)
395   {
396     GNUNET_free (op->spec->context_msg);
397     op->spec->context_msg = NULL;
398   }
399
400 }
401
402 /**
403  * Handle an BF message from a remote peer.
404  *
405  * @param cls the intersection operation
406  * @param mh the header of the message
407  */
408 static void
409 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
410 {
411   struct Operation *op = cls;
412   struct BFMessage *msg = (struct BFMessage *) mh;
413   uint32_t old_count;
414
415   old_count = op->state->my_elements_count;
416   op->spec->salt = ntohl (msg->sender_mutator);
417   
418   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init (&msg[1],
419                                                             BLOOMFILTER_SIZE,
420                                                             ntohl (msg->bloomfilter_length));
421   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
422                                                            BLOOMFILTER_SIZE,
423                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
424   switch (op->state->phase)
425   {
426   case PHASE_INITIAL:
427     // If we are ot our first msg
428     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_elements_count, GNUNET_YES);
429
430     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
431                                            &iterator_initialization_by_alice,
432                                            op);
433     break;
434   case PHASE_BF_EXCHANGE:
435   case PHASE_MAYBE_FINISHED:
436     // if we are bob or alice and are continuing operation
437     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
438                                            &iterator_bf_round,
439                                            op);
440     break;
441   default:
442     GNUNET_break_op (0);
443     fail_intersection_operation(op);
444   }
445   // the iterators created a new BF with salt+1
446   // the peer needs this information for decoding the next BF
447   // this behavior can be modified at will later on.
448   op->spec->salt++;
449   
450   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
451   op->state->remote_bf = NULL;
452   
453   if ((op->state->phase == PHASE_MAYBE_FINISHED) 
454        && (old_count == op->state->my_elements_count)){
455     // In the last round we though we were finished, we now know this is correct
456     send_peer_done(op);
457     return;
458   }
459   
460   op->state->phase = PHASE_BF_EXCHANGE;
461   // maybe we are finished, but we do one more round to make certain
462   // we don't have false positives ...
463   if (op->state->my_elements_count == ntohl (msg->sender_element_count))
464       op->state->phase = PHASE_MAYBE_FINISHED;
465
466   send_bloomfilter (op);
467 }
468
469
470 /**
471  * Handle an BF message from a remote peer.
472  *
473  * @param cls the intersection operation
474  * @param mh the header of the message
475  */
476 static void
477 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
478 {
479   struct Operation *op = cls;
480   struct BFMessage *msg = (struct BFMessage *) mh;
481   uint32_t remote_element_count;
482
483   remote_element_count = ntohl(msg->sender_element_count);
484   if ((op->state->phase == PHASE_INITIAL)
485       || (op->state->my_elements_count > remote_element_count)){
486     GNUNET_break_op (0);
487     fail_intersection_operation(op);
488   }
489
490   op->state->phase = PHASE_BF_EXCHANGE;
491   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
492
493   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
494                                                            BLOOMFILTER_SIZE,
495                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
496   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
497                                          &iterator_initialization,
498                                          op);
499
500   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
501   op->state->remote_bf = NULL;
502
503   if (op->state->my_elements_count == ntohl (msg->sender_element_count))
504     op->state->phase = PHASE_MAYBE_FINISHED;
505
506   send_bloomfilter (op);
507 }
508
509
510 /**
511  * Send a bloomfilter to our peer.
512  * that the operation is over.
513  * After the result done message has been sent to the client,
514  * destroy the evaluate operation.
515  *
516  * @param eo intersection operation
517  */
518 static void
519 send_bloomfilter (struct Operation *op)
520 {
521   struct GNUNET_MQ_Envelope *ev;
522   struct BFMessage *msg;
523   uint32_t bf_size;
524
525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n",);
526
527   // send our bloomfilter
528   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (op->state->local_bf);
529
530   ev = GNUNET_MQ_msg_extra (msg, bf_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
531   msg->reserved = 0;
532   msg->sender_element_count = htonl (op->state->my_elements_count);
533   msg->bloomfilter_length = htonl (bf_size);
534   msg->sender_mutator = htonl (op->spec->salt);
535   GNUNET_assert (GNUNET_SYSERR !=
536                  GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
537                                                             &msg->sender_bf_data,
538                                                             BLOOMFILTER_SIZE));
539   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
540   op->state->local_bf = NULL;
541   GNUNET_MQ_send (op->mq, ev);
542 }
543
544 /**
545  * Send our element to the peer, in case our element count is lower than his
546  *
547  * @param eo intersection operation
548  */
549 static void
550 send_element_count (struct Operation *op)
551 {
552   struct GNUNET_MQ_Envelope *ev;
553   struct BFMessage *msg;
554
555   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
556
557   // just send our element count, as the other peer must start
558   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
559   msg->reserved = 0;
560   msg->sender_element_count = htonl (op->state->my_elements_count);
561   msg->bloomfilter_length = htonl (0);
562   msg->sender_mutator = htonl (0);
563
564   GNUNET_MQ_send (op->mq, ev);
565 }
566
567 /**
568  * Send a result message to the client indicating
569  * that the operation is over.
570  * After the result done message has been sent to the client,
571  * destroy the evaluate operation.
572  *
573  * @param op intersection operation
574  */
575 static void
576 finish_and_destroy (struct Operation *op)
577 {
578   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
579
580   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
581   {
582     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
583     op->state->full_result_iter =
584         GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->my_elements);
585     send_remaining_elements (op);
586     return;
587   }
588   send_client_done_and_destroy (op);
589 }
590
591 /**
592  * Handle a done message from a remote peer
593  *
594  * @param cls the union operation
595  * @param mh the message
596  */
597 static void
598 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
599 {
600   struct Operation *op = cls;
601   struct GNUNET_MQ_Envelope *ev;
602
603   if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
604     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
605     
606     finish_and_destroy (op);
607     return;
608   }
609   
610   GNUNET_break_op (0);
611   fail_intersection_operation (op);
612 }
613
614
615 /**
616  * Evaluate a union operation with
617  * a remote peer.
618  *
619  * @param op operation to evaluate
620  */
621 static void
622 intersection_evaluate (struct Operation *op)
623 {
624   op->state = GNUNET_new (struct OperationState);
625   /* we started the operation, thus we have to send the operation request */
626   op->state->phase = PHASE_INITIAL;
627   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
628   GNUNET_CONTAINER_multihashmap_iterate(op->spec->set->elements, 
629                                         &iterator_element_count,
630                                         op);
631   
632   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation");
633   send_operation_request (op);
634 }
635
636 /**
637  * Accept an union operation request from a remote peer.
638  * Only initializes the private operation state.
639  *
640  * @param op operation that will be accepted as a union operation
641  */
642 static void
643 intersection_accept (struct Operation *op)
644 {
645   struct OperationRequestMessage * context = 1;
646   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
647   
648   op->state = GNUNET_new (struct OperationState);
649   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
650   
651   GNUNET_CONTAINER_multihashmap_iterate(op->spec->set->elements, 
652                                         &iterator_element_count,
653                                         op);
654   
655   // if Alice (the peer) has more elements than Bob (us), she should start
656   if (op->spec->element_count < op->state->my_elements_count){
657     op->state->phase = PHASE_INITIAL;
658     send_element_count(op);
659     return;
660   }
661
662   // create a new bloomfilter in case we have fewer elements
663   op->state->phase = PHASE_BF_EXCHANGE;
664   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
665                                                            BLOOMFILTER_SIZE,
666                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
667
668   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
669                                          &iterator_initialization,
670                                          op);
671
672   send_bloomfilter (op);
673 }
674
675
676 /**
677  * Create a new set supporting the intersection operation
678  *
679  * @return the newly created set
680  */
681 static struct SetState *
682 intersection_set_create (void)
683 {
684   struct SetState *set_state;
685
686   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
687
688   set_state = GNUNET_new (struct SetState);
689   
690   return set_state;
691 }
692
693
694 /**
695  * Add the element from the given element message to the set.
696  *
697  * @param set_state state of the set want to add to
698  * @param ee the element to add to the set
699  */
700 static void
701 intersection_add (struct SetState *set_state, struct ElementEntry *ee)
702 {
703   //nothing to do here
704 }
705
706
707 /**
708  * Destroy a set that supports the intersection operation
709  *
710  * @param set_state the set to destroy
711  */
712 static void
713 intersection_set_destroy (struct SetState *set_state)
714 {
715   GNUNET_free (set_state);
716 }
717
718
719 /**
720  * Remove the element given in the element message from the set.
721  *
722  * @param set_state state of the set to remove from
723  * @param element set element to remove
724  */
725 static void
726 intersection_remove (struct SetState *set_state, struct ElementEntry *element)
727 {
728   //nothing to do here
729 }
730
731
732 /**
733  * Dispatch messages for a intersection operation.
734  *
735  * @param eo the state of the intersection evaluate operation
736  * @param mh the received message
737  * @return GNUNET_SYSERR if the tunnel should be disconnected,
738  *         GNUNET_OK otherwise
739  */
740 int
741 intersection_handle_p2p_message (struct Operation *op,
742                                  const struct GNUNET_MessageHeader *mh)
743 {
744   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
745               ntohs (mh->type), ntohs (mh->size));
746   switch (ntohs (mh->type))
747   {
748     /* this message handler is not active until after we received an
749      * operation request message, thus the ops request is not handled here
750      */
751   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
752     handle_p2p_element_info (op, mh);
753     break;
754   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
755     handle_p2p_bf (op, mh);
756     break;
757   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
758     handle_p2p_done (op, mh);
759     break;
760   default:
761     /* something wrong with mesh's message handlers? */
762     GNUNET_assert (0);
763   }
764   return GNUNET_OK;
765 }
766
767 /**
768  * Signal to the client that the operation has finished and
769  * destroy the operation.
770  *
771  * @param cls operation to destroy
772  */
773 static void
774 send_client_done_and_destroy (void *cls)
775 {
776   struct Operation *op = cls;
777   struct GNUNET_MQ_Envelope *ev;
778   struct GNUNET_SET_ResultMessage *rm;
779   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
780   rm->request_id = htonl (op->spec->client_request_id);
781   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
782   rm->element_type = htons (0);
783   GNUNET_MQ_send (op->spec->set->client_mq, ev);
784   _GSS_operation_destroy (op);
785 }
786
787 /**
788  * Send all elements in the full result iterator.
789  *
790  * @param cls operation
791  */
792 static void
793 send_remaining_elements (void *cls)
794 {
795   struct Operation *op = cls;
796   struct KeyEntry *remaining; //TODO rework this, key entry does not exist here
797   int res;
798
799   res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
800   res = GNUNET_NO;
801   if (GNUNET_NO == res)
802   {
803     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
804     send_client_done_and_destroy (op);
805     return;
806   }
807
808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n");
809
810   while (1)
811   {
812     struct GNUNET_MQ_Envelope *ev;
813     struct GNUNET_SET_ResultMessage *rm;
814     struct GNUNET_SET_Element *element;
815     element = &remaining->element->element;
816
817     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
818     GNUNET_assert (0 != op->spec->client_request_id);
819     ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
820     if (NULL == ev)
821     {
822       GNUNET_MQ_discard (ev);
823       GNUNET_break (0);
824       continue;
825     }
826     rm->result_status = htons (GNUNET_SET_STATUS_OK);
827     rm->request_id = htonl (op->spec->client_request_id);
828     rm->element_type = element->type;
829     memcpy (&rm[1], element->data, element->size);
830     if (remaining->next_colliding == NULL)
831     {
832       GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
833       GNUNET_MQ_send (op->spec->set->client_mq, ev);
834       break;
835     }
836     GNUNET_MQ_send (op->spec->set->client_mq, ev);
837     remaining = remaining->next_colliding;
838   }
839 }
840
841 /**
842  * handler for peer-disconnects, notifies the client about the aborted operation
843  * 
844  * @param op the destroyed operation
845  */
846 static void
847 intersection_peer_disconnect (struct Operation *op)
848 {
849   if (PHASE_FINISHED != op->state->phase)
850   {
851     struct GNUNET_MQ_Envelope *ev;
852     struct GNUNET_SET_ResultMessage *msg;
853
854     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
855     msg->request_id = htonl (op->spec->client_request_id);
856     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
857     msg->element_type = htons (0);
858     GNUNET_MQ_send (op->spec->set->client_mq, ev);
859     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
860     _GSS_operation_destroy (op);
861     return;
862   }
863   // else: the session has already been concluded
864   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
865   if (GNUNET_NO == op->state->client_done_sent)
866     finish_and_destroy (op);
867 }
868
869
870 /**
871  * Destroy the union operation.  Only things specific to the union operation are destroyed.
872  * 
873  * @param op union operation to destroy
874  */
875 static void
876 intersection_op_cancel (struct Operation *op)
877 {
878   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
879   /* check if the op was canceled twice */
880   GNUNET_assert (NULL != op->state);
881   if (NULL != op->state->remote_bf)
882   {
883     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
884     op->state->remote_bf = NULL;
885   }
886   if (NULL != op->state->local_bf)
887   {
888     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
889     op->state->local_bf = NULL;
890   }
891   if (NULL != op->state->my_elements)
892   {
893     // no need to free the elements, they are still part of the set
894     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
895     op->state->my_elements = NULL;
896   }
897   GNUNET_free (op->state);
898   op->state = NULL;
899   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
900 }
901
902 const struct SetVT *
903 _GSS_intersection_vt ()
904 {
905   static const struct SetVT intersection_vt = {
906     .create = &intersection_set_create,
907     .msg_handler = &intersection_handle_p2p_message,
908     .add = &intersection_add,
909     .remove = &intersection_remove,
910     .destroy_set = &intersection_set_destroy,
911     .evaluate = &intersection_evaluate,
912     .accept = &intersection_accept,
913     .peer_disconnect = &intersection_peer_disconnect,
914     .cancel = &intersection_op_cancel,
915   };
916
917   return &intersection_vt;
918 }