missing fi
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "strata_estimator.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
34 /**
35  * Current phase we are in for a intersection operation.
36  */
37 enum IntersectionOperationPhase
38 {
39   /**
40    * Alices has suggested an operation to bob, 
41    * and is waiting for a bf or session end.
42    */
43   PHASE_INITIAL,
44   /**
45    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
46    * until one notices the their element count is equal
47    */
48   PHASE_BF_EXCHANGE,
49   /**
50    * if both peers have an equal peercount, they enter this state for 
51    * one more turn, to see if they actually have agreed on a correct set.
52    * if a peer finds the same element count after the next iteration, 
53    * it ends the the session
54    */
55   PHASE_MAYBE_FINISHED,
56   /**
57    * The protocol is over.
58    * Results may still have to be sent to the client.
59    */
60   PHASE_FINISHED
61 };
62
63
64 /**
65  * State of an evaluate operation
66  * with another peer.
67  */
68 struct OperationState
69 {
70   /**
71    * The bf we currently receive
72    */
73   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
74
75   /**
76    * BF of the set's element.
77    */
78   struct GNUNET_CONTAINER_BloomFilter *local_bf;
79
80   /**
81    * Current state of the operation.
82    */
83   enum IntersectionOperationPhase phase;
84
85   /**
86    * Generation in which the operation handle
87    * was created.
88    */
89   unsigned int generation_created;
90   
91   /**
92    * Maps element-id-hashes to 'elements in our set'.
93    */
94   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
95   
96   /**
97    * Current element count contained within contained_elements
98    */
99   uint32_t my_elements_count;
100
101   /**
102    * Iterator for sending elements on the key to element mapping to the client.
103    */
104   struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
105   
106   /**
107    * Evaluate operations are held in
108    * a linked list.
109    */
110   struct OperationState *next;
111
112    /**
113     * Evaluate operations are held in
114     * a linked list.
115     */
116   struct OperationState *prev;
117
118   /**
119    * Did we send the client that we are done?
120    */
121   int client_done_sent;
122 };
123
124
125 /**
126  * Alice's version:
127  * 
128  * fills the contained-elements hashmap with all relevant 
129  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
130  * 
131  * @param cls closure
132  * @param key current key code
133  * @param value value in the hash map
134  * @return #GNUNET_YES if we should continue to
135  *         iterate,
136  *         #GNUNET_NO if not.
137  */
138 static int 
139 iterator_initialization_by_alice (void *cls,
140                                       const struct GNUNET_HashCode *key,
141                                       void *value){
142   struct ElementEntry *ee = value;
143   struct Operation *op = cls;
144   struct GNUNET_HashCode mutated_hash;
145   
146   //only consider this element, if it is valid for us
147   if ((op->generation_created >= ee->generation_removed) 
148        || (op->generation_created < ee->generation_added))
149     return GNUNET_YES;
150   
151   // not contained according to bob's bloomfilter
152   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
153   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, 
154                                                       &mutated_hash))
155     return GNUNET_YES;
156   
157   op->state->my_elements_count++;  
158   GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, 
159                                      &ee->element_hash, ee,
160                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
161   
162   // create our own bloomfilter with salt+1
163   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt+1, &mutated_hash);
164   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, 
165                                     &mutated_hash);
166   
167   return GNUNET_YES;
168 }
169
170 /**
171  * fills the contained-elements hashmap with all relevant 
172  * elements and adds their mutated hashes to our local bloomfilter
173  * 
174  * @param cls closure
175  * @param key current key code
176  * @param value value in the hash map
177  * @return #GNUNET_YES if we should continue to
178  *         iterate,
179  *         #GNUNET_NO if not.
180  */
181 static int 
182 iterator_initialization (void *cls,
183                                       const struct GNUNET_HashCode *key,
184                                       void *value){
185   struct ElementEntry *ee = value;
186   struct Operation *op = cls;
187   struct GNUNET_HashCode mutated_hash;
188   
189   //only consider this element, if it is valid for us
190   if ((op->generation_created >= ee->generation_removed) 
191        || (op->generation_created < ee->generation_added))
192     return GNUNET_YES;
193   
194   GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, 
195                                      &ee->element_hash, ee,
196                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
197   
198   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
199   
200   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, 
201                                     &mutated_hash);
202   
203   return GNUNET_YES;
204 }
205
206 /**
207  * Counts all valid elements in the hashmap 
208  * (the ones that are valid in our generation)
209  * 
210  * @param cls closure
211  * @param key current key code
212  * @param value value in the hash map
213  * @return #GNUNET_YES if we should continue to
214  *         iterate,
215  *         #GNUNET_NO if not.
216  */
217 static int 
218 iterator_element_count (void *cls,
219                                       const struct GNUNET_HashCode *key,
220                                       void *value){
221   struct ElementEntry *ee = value;
222   struct Operation *op = cls;
223   
224   //only consider this element, if it is valid for us
225   if ((op->generation_created >= ee->generation_removed) 
226        || (op->generation_created < ee->generation_added))
227     return GNUNET_YES;
228   
229   op->state->my_elements_count++;
230   
231   return GNUNET_YES;
232 }
233
234 /**
235  * removes element from a hashmap if it is not contained within the
236  * provided remote bloomfilter. Then, fill our new bloomfilter.
237  * 
238  * @param cls closure
239  * @param key current key code
240  * @param value value in the hash map
241  * @return #GNUNET_YES if we should continue to
242  *         iterate,
243  *         #GNUNET_NO if not.
244  */
245 static int
246 iterator_bf_round (void *cls,
247                                       const struct GNUNET_HashCode *key,
248                                       void *value){
249   struct ElementEntry *ee = value;
250   struct Operation *op = cls;
251   struct GNUNET_HashCode mutated_hash;
252   
253   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
254   
255   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, 
256                                      &mutated_hash)){
257     op->state->my_elements_count--;
258     GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, 
259                                      &ee->element_hash,
260                                      ee);
261     return GNUNET_YES;
262   }
263   
264   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt+1, &mutated_hash);
265   
266   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, 
267                                     &mutated_hash);
268   
269   return GNUNET_YES;
270 }
271
272 /**
273  * Inform the client that the union operation has failed,
274  * and proceed to destroy the evaluate operation.
275  *
276  * @param op the intersection operation to fail
277  */
278 static void
279 fail_intersection_operation (struct Operation *op)
280 {
281   struct GNUNET_MQ_Envelope *ev;
282   struct GNUNET_SET_ResultMessage *msg;
283
284   if (op->state->my_elements)
285     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
286   
287   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
288
289   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
290   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
291   msg->request_id = htonl (op->spec->client_request_id);
292   msg->element_type = htons (0);
293   GNUNET_MQ_send (op->spec->set->client_mq, ev);
294   _GSS_operation_destroy (op);
295 }
296
297
298
299 /**
300  * Inform the peer that this operation is complete.
301  *
302  * @param eo the intersection operation to fail
303  */
304 static void
305 send_peer_done (struct Operation *op)
306 {
307   struct GNUNET_MQ_Envelope *ev;
308
309   op->state->phase = PHASE_FINISHED;
310   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
311   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
312   op->state->local_bf = NULL;
313
314   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
315   GNUNET_MQ_send (op->mq, ev);
316 }
317
318 /**
319  * Send a request for the evaluate operation to a remote peer
320  *
321  * @param eo operation with the other peer
322  */
323 static void
324 send_operation_request (struct Operation *op)
325 {
326   struct GNUNET_MQ_Envelope *ev;
327   struct OperationRequestMessage *msg;
328
329   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
330                                 op->spec->context_msg);
331
332   if (NULL == ev)
333   {
334     /* the context message is too large */
335     GNUNET_break (0);
336     GNUNET_SERVER_client_disconnect (op->spec->set->client);
337     return;
338   }
339   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
340   msg->app_id = op->spec->app_id;
341   msg->salt = htonl (op->spec->salt);
342   msg->element_count = htonl(op->state->my_elements);
343   
344   GNUNET_MQ_send (op->mq, ev);
345
346   if (NULL != op->spec->context_msg)
347     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
348   else
349     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
350
351   if (NULL != op->spec->context_msg)
352   {
353     GNUNET_free (op->spec->context_msg);
354     op->spec->context_msg = NULL;
355   }
356
357 }
358
359 /**
360  * Handle an BF message from a remote peer.
361  *
362  * @param cls the intersection operation
363  * @param mh the header of the message
364  */
365 static void
366 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
367 {
368   struct Operation *op = cls;
369   struct BFMessage *msg = (struct BFMessage *) mh;
370   uint32_t old_count;
371
372   old_count = op->state->my_elements_count;
373   op->spec->salt = ntohl (msg->sender_mutator);
374   
375   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init (&msg[1],
376                                                             BLOOMFILTER_SIZE,
377                                                             ntohl (msg->bloomfilter_length));
378   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
379                                                            BLOOMFILTER_SIZE,
380                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
381   switch (op->state->phase)
382   {
383   case PHASE_INITIAL:
384     // If we are ot our first msg
385     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_elements_count, GNUNET_YES);
386
387     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
388                                            &iterator_initialization_by_alice,
389                                            op);
390     break;
391   case PHASE_BF_EXCHANGE:
392   case PHASE_MAYBE_FINISHED:
393     // if we are bob or alice and are continuing operation
394     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
395                                            &iterator_bf_round,
396                                            op);
397     break;
398   default:
399     GNUNET_break_op (0);
400     fail_intersection_operation(op);
401   }
402   // the iterators created a new BF with salt+1
403   // the peer needs this information for decoding the next BF
404   // this behavior can be modified at will later on.
405   op->spec->salt++;
406   
407   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
408   op->state->remote_bf = NULL;
409   
410   if ((op->state->phase == PHASE_MAYBE_FINISHED) 
411        && (old_count == op->state->my_elements_count)){
412     // In the last round we though we were finished, we now know this is correct
413     send_peer_done(op);
414     return;
415   }
416   
417   op->state->phase = PHASE_BF_EXCHANGE;
418   // maybe we are finished, but we do one more round to make certain
419   // we don't have false positives ...
420   if (op->state->my_elements_count == ntohl (msg->sender_element_count))
421       op->state->phase = PHASE_MAYBE_FINISHED;
422
423   send_bloomfilter (op);
424 }
425
426
427 /**
428  * Handle an BF message from a remote peer.
429  *
430  * @param cls the intersection operation
431  * @param mh the header of the message
432  */
433 static void
434 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
435 {
436   struct Operation *op = cls;
437   struct BFMessage *msg = (struct BFMessage *) mh;
438   uint32_t remote_element_count;
439
440   remote_element_count = ntohl(msg->sender_element_count);
441   if ((op->state->phase == PHASE_INITIAL)
442       || (op->state->my_elements_count > remote_element_count)){
443     GNUNET_break_op (0);
444     fail_intersection_operation(op);
445   }
446
447   op->state->phase = PHASE_BF_EXCHANGE;
448   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
449
450   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
451                                                            BLOOMFILTER_SIZE,
452                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
453   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
454                                          &iterator_initialization,
455                                          op);
456
457   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
458   op->state->remote_bf = NULL;
459
460   if (op->state->my_elements_count == ntohl (msg->sender_element_count))
461     op->state->phase = PHASE_MAYBE_FINISHED;
462
463   send_bloomfilter (op);
464 }
465
466
467 /**
468  * Send a bloomfilter to our peer.
469  * that the operation is over.
470  * After the result done message has been sent to the client,
471  * destroy the evaluate operation.
472  *
473  * @param eo intersection operation
474  */
475 static void
476 send_bloomfilter (struct Operation *op)
477 {
478   struct GNUNET_MQ_Envelope *ev;
479   struct BFMessage *msg;
480   uint32_t bf_size;
481
482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n",);
483
484   // send our bloomfilter
485   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (op->state->local_bf);
486
487   ev = GNUNET_MQ_msg_extra (msg, bf_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
488   msg->reserved = 0;
489   msg->sender_element_count = htonl (op->state->my_elements_count);
490   msg->bloomfilter_length = htonl (bf_size);
491   msg->sender_mutator = htonl (op->spec->salt);
492   GNUNET_assert (GNUNET_SYSERR !=
493                  GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
494                                                             &msg->sender_bf_data,
495                                                             BLOOMFILTER_SIZE));
496   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
497   op->state->local_bf = NULL;
498   GNUNET_MQ_send (op->mq, ev);
499 }
500
501 /**
502  * Send our element to the peer, in case our element count is lower than his
503  *
504  * @param eo intersection operation
505  */
506 static void
507 send_element_count (struct Operation *op)
508 {
509   struct GNUNET_MQ_Envelope *ev;
510   struct BFMessage *msg;
511
512   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
513
514   // just send our element count, as the other peer must start
515   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
516   msg->reserved = 0;
517   msg->sender_element_count = htonl (op->state->my_elements_count);
518   msg->bloomfilter_length = htonl (0);
519   msg->sender_mutator = htonl (0);
520
521   GNUNET_MQ_send (op->mq, ev);
522 }
523
524 /**
525  * Send a result message to the client indicating
526  * that the operation is over.
527  * After the result done message has been sent to the client,
528  * destroy the evaluate operation.
529  *
530  * @param op intersection operation
531  */
532 static void
533 finish_and_destroy (struct Operation *op)
534 {
535   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
536
537   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
538   {
539     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
540     op->state->full_result_iter =
541         GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->my_elements);
542     send_remaining_elements (op);
543     return;
544   }
545   send_client_done_and_destroy (op);
546 }
547
548 /**
549  * Handle a done message from a remote peer
550  *
551  * @param cls the union operation
552  * @param mh the message
553  */
554 static void
555 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
556 {
557   struct Operation *op = cls;
558   struct GNUNET_MQ_Envelope *ev;
559
560   if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
561     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
562     
563     finish_and_destroy (op);
564     return;
565   }
566   
567   GNUNET_break_op (0);
568   fail_intersection_operation (op);
569 }
570
571
572 /**
573  * Evaluate a union operation with
574  * a remote peer.
575  *
576  * @param op operation to evaluate
577  */
578 static void
579 intersection_evaluate (struct Operation *op)
580 {
581   op->state = GNUNET_new (struct OperationState);
582   /* we started the operation, thus we have to send the operation request */
583   op->state->phase = PHASE_INITIAL;
584   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
585   GNUNET_CONTAINER_multihashmap_iterate(op->spec->set->elements, 
586                                         &iterator_element_count,
587                                         op);
588   
589   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation");
590   send_operation_request (op);
591 }
592
593 /**
594  * Accept an union operation request from a remote peer.
595  * Only initializes the private operation state.
596  *
597  * @param op operation that will be accepted as a union operation
598  */
599 static void
600 intersection_accept (struct Operation *op)
601 {
602   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
603   op->state = GNUNET_new (struct OperationState);
604   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
605   GNUNET_CONTAINER_multihashmap_iterate(op->spec->set->elements, 
606                                         &iterator_element_count,
607                                         op);
608   // if Alice (the peer) has more elements than Bob (us), she should start
609   if (op->spec->element_count < op->state->my_elements_count){
610     op->state->phase = PHASE_INITIAL;
611     send_element_count(op);
612     return;
613   }
614   // create a new bloomfilter in case we have fewer elements
615   op->state->phase = PHASE_BF_EXCHANGE;
616   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
617                                                            BLOOMFILTER_SIZE,
618                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
619   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
620                                          &iterator_initialization,
621                                          op);
622   send_bloomfilter (op);
623 }
624
625
626 /**
627  * Create a new set supporting the intersection operation
628  *
629  * @return the newly created set
630  */
631 static struct SetState *
632 intersection_set_create (void)
633 {
634   struct SetState *set_state;
635
636   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
637
638   set_state = GNUNET_new (struct SetState);
639   
640   return set_state;
641 }
642
643
644 /**
645  * Add the element from the given element message to the set.
646  *
647  * @param set_state state of the set want to add to
648  * @param ee the element to add to the set
649  */
650 static void
651 intersection_add (struct SetState *set_state, struct ElementEntry *ee)
652 {
653   //nothing to do here
654 }
655
656
657 /**
658  * Destroy a set that supports the intersection operation
659  *
660  * @param set_state the set to destroy
661  */
662 static void
663 intersection_set_destroy (struct SetState *set_state)
664 {
665   GNUNET_free (set_state);
666 }
667
668
669 /**
670  * Remove the element given in the element message from the set.
671  *
672  * @param set_state state of the set to remove from
673  * @param element set element to remove
674  */
675 static void
676 intersection_remove (struct SetState *set_state, struct ElementEntry *element)
677 {
678   //nothing to do here
679 }
680
681
682 /**
683  * Dispatch messages for a intersection operation.
684  *
685  * @param eo the state of the intersection evaluate operation
686  * @param mh the received message
687  * @return GNUNET_SYSERR if the tunnel should be disconnected,
688  *         GNUNET_OK otherwise
689  */
690 int
691 intersection_handle_p2p_message (struct Operation *op,
692                                  const struct GNUNET_MessageHeader *mh)
693 {
694   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
695               ntohs (mh->type), ntohs (mh->size));
696   switch (ntohs (mh->type))
697   {
698     /* this message handler is not active until after we received an
699      * operation request message, thus the ops request is not handled here
700      */
701   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
702     handle_p2p_element_info (op, mh);
703     break;
704   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
705     handle_p2p_bf (op, mh);
706     break;
707   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
708     handle_p2p_done (op, mh);
709     break;
710   default:
711     /* something wrong with mesh's message handlers? */
712     GNUNET_assert (0);
713   }
714   return GNUNET_OK;
715 }
716
717 /**
718  * Signal to the client that the operation has finished and
719  * destroy the operation.
720  *
721  * @param cls operation to destroy
722  */
723 static void
724 send_client_done_and_destroy (void *cls)
725 {
726   struct Operation *op = cls;
727   struct GNUNET_MQ_Envelope *ev;
728   struct GNUNET_SET_ResultMessage *rm;
729   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
730   rm->request_id = htonl (op->spec->client_request_id);
731   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
732   rm->element_type = htons (0);
733   GNUNET_MQ_send (op->spec->set->client_mq, ev);
734   _GSS_operation_destroy (op);
735 }
736 /**
737  * Send all elements in the full result iterator.
738  *
739  * @param cls operation
740  */
741 static void
742 send_remaining_elements (void *cls)
743 {
744   struct Operation *op = cls;
745   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
746   struct GNUNET_MQ_Envelope *ev;
747   struct GNUNET_SET_ResultMessage *rm;
748   struct GNUNET_SET_Element *element;
749   int res;
750
751   res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
752   if (GNUNET_NO == res) {
753     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
754     send_client_done_and_destroy (op);
755     return;
756   }
757   
758   element = &remaining->element;
759   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
760   GNUNET_assert (0 != op->spec->client_request_id);
761   
762   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
763   GNUNET_assert (NULL != ev);
764   
765   rm->result_status = htons (GNUNET_SET_STATUS_OK);
766   rm->request_id = htonl (op->spec->client_request_id);
767   rm->element_type = element->type;
768   memcpy (&rm[1], element->data, element->size);
769
770   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
771   GNUNET_MQ_send (op->spec->set->client_mq, ev);
772 }
773
774 /**
775  * handler for peer-disconnects, notifies the client about the aborted operation
776  * 
777  * @param op the destroyed operation
778  */
779 static void
780 intersection_peer_disconnect (struct Operation *op)
781 {
782   if (PHASE_FINISHED != op->state->phase)
783   {
784     struct GNUNET_MQ_Envelope *ev;
785     struct GNUNET_SET_ResultMessage *msg;
786
787     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
788     msg->request_id = htonl (op->spec->client_request_id);
789     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
790     msg->element_type = htons (0);
791     GNUNET_MQ_send (op->spec->set->client_mq, ev);
792     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
793     _GSS_operation_destroy (op);
794     return;
795   }
796   // else: the session has already been concluded
797   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
798   if (GNUNET_NO == op->state->client_done_sent)
799     finish_and_destroy (op);
800 }
801
802
803 /**
804  * Destroy the union operation.  Only things specific to the union operation are destroyed.
805  * 
806  * @param op union operation to destroy
807  */
808 static void
809 intersection_op_cancel (struct Operation *op)
810 {
811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
812   /* check if the op was canceled twice */
813   GNUNET_assert (NULL != op->state);
814   if (NULL != op->state->remote_bf)
815   {
816     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
817     op->state->remote_bf = NULL;
818   }
819   if (NULL != op->state->local_bf)
820   {
821     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
822     op->state->local_bf = NULL;
823   }
824   if (NULL != op->state->my_elements)
825   {
826     // no need to free the elements, they are still part of the set
827     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
828     op->state->my_elements = NULL;
829   }
830   GNUNET_free (op->state);
831   op->state = NULL;
832   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
833 }
834
835 const struct SetVT *
836 _GSS_intersection_vt ()
837 {
838   static const struct SetVT intersection_vt = {
839     .create = &intersection_set_create,
840     .msg_handler = &intersection_handle_p2p_message,
841     .add = &intersection_add,
842     .remove = &intersection_remove,
843     .destroy_set = &intersection_set_destroy,
844     .evaluate = &intersection_evaluate,
845     .accept = &intersection_accept,
846     .peer_disconnect = &intersection_peer_disconnect,
847     .cancel = &intersection_op_cancel,
848   };
849
850   return &intersection_vt;
851 }