session time out for http client/server
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
34
35 #define CALCULATE_BF_SIZE(A, B, s, k) \
36                           do { \
37                             k = ceil(1 + log2((double) (2*B / (double) A)));\
38                             s = ceil((double) (A * k / log(2))); \
39                           } while (0)
40
41 /**
42  * Current phase we are in for a intersection operation.
43  */
44 enum IntersectionOperationPhase
45 {
46   /**
47    * Alices has suggested an operation to bob,
48    * and is waiting for a bf or session end.
49    */
50   PHASE_INITIAL,
51   /**
52    * Bob has accepted the operation, Bob and Alice are now exchanging bfs
53    * until one notices the their element count is equal
54    */
55   PHASE_BF_EXCHANGE,
56   /**
57    * Multipart continuation of BF_exchange
58    */
59   PHASE_BF_AWAIT_MULTIPART,
60   /**
61    * if both peers have an equal peercount, they enter this state for
62    * one more turn, to see if they actually have agreed on a correct set.
63    * if a peer finds the same element count after the next iteration,
64    * it ends the the session
65    */
66   PHASE_MAYBE_FINISHED,
67   /**
68    * The protocol is over.
69    * Results may still have to be sent to the client.
70    */
71   PHASE_FINISHED
72 };
73
74
75 /**
76  * State of an evaluate operation
77  * with another peer.
78  */
79 struct OperationState
80 {
81   /**
82    * The bf we currently receive
83    */
84   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
85
86   /**
87    * BF of the set's element.
88    */
89   struct GNUNET_CONTAINER_BloomFilter *local_bf;
90
91   /**
92    * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
93    */
94   char * local_bf_data;
95
96   /**
97    * size of the bloomfilter
98    */
99   uint32_t local_bf_data_size;
100   
101   /**
102    * Current state of the operation.
103    */
104   enum IntersectionOperationPhase phase;
105
106   /**
107    * Generation in which the operation handle
108    * was created.
109    */
110   unsigned int generation_created;
111
112   /**
113    * Maps element-id-hashes to 'elements in our set'.
114    */
115   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
116
117   /**
118    * Current element count contained within contained_elements
119    */
120   uint32_t my_element_count;
121
122   /**
123    * Iterator for sending elements on the key to element mapping to the client.
124    */
125   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
126
127   /**
128    * Evaluate operations are held in
129    * a linked list.
130    */
131   struct OperationState *next;
132
133    /**
134     * Evaluate operations are held in
135     * a linked list.
136     */
137   struct OperationState *prev;
138
139   /**
140    * Did we send the client that we are done?
141    */
142   int client_done_sent;
143 };
144
145
146 /**
147  * Extra state required for efficient set intersection.
148  */
149 struct SetState
150 {
151   /**
152    * Number of currently valid elements in the set which have not been removed
153    */
154   uint32_t current_set_element_count;
155 };
156
157
158 /**
159  * Alice's version:
160  *
161  * fills the contained-elements hashmap with all relevant
162  * elements and adds their mutated hashes to our local bloomfilter with mutator+1
163  *
164  * @param cls closure
165  * @param key current key code
166  * @param value value in the hash map
167  * @return #GNUNET_YES if we should continue to
168  *         iterate,
169  *         #GNUNET_NO if not.
170  */
171 static int
172 iterator_initialization_by_alice (void *cls,
173                                   const struct GNUNET_HashCode *key,
174                                   void *value)
175 {
176   struct ElementEntry *ee = value;
177   struct Operation *op = cls;
178   struct GNUNET_HashCode mutated_hash;
179
180   //only consider this element, if it is valid for us
181   if ((op->generation_created >= ee->generation_removed)
182        || (op->generation_created < ee->generation_added))
183     return GNUNET_YES;
184
185   // not contained according to bob's bloomfilter
186   GNUNET_BLOCK_mingle_hash(&ee->element_hash,
187                            op->spec->salt,
188                            &mutated_hash);
189   if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
190                                                       &mutated_hash))
191     return GNUNET_YES;
192
193   op->state->my_element_count++;
194   GNUNET_assert (GNUNET_YES ==
195                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
196                                                     &ee->element_hash, ee,
197                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
198
199   return GNUNET_YES;
200 }
201
202 /**
203  * fills the contained-elements hashmap with all relevant
204  * elements and adds their mutated hashes to our local bloomfilter
205  *
206  * @param cls closure
207  * @param key current key code
208  * @param value value in the hash map
209  * @return #GNUNET_YES if we should continue to
210  *         iterate,
211  *         #GNUNET_NO if not.
212  */
213 static int
214 iterator_initialization (void *cls,
215                          const struct GNUNET_HashCode *key,
216                          void *value)
217 {
218   struct ElementEntry *ee = value;
219   struct Operation *op = cls;
220   struct GNUNET_HashCode mutated_hash;
221
222   //only consider this element, if it is valid for us
223   if ((op->generation_created >= ee->generation_removed)
224        || (op->generation_created < ee->generation_added))
225     return GNUNET_YES;
226
227   GNUNET_assert (GNUNET_YES ==
228                  GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
229                                                     &ee->element_hash, ee,
230                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
231   return GNUNET_YES;
232 }
233
234
235 /**
236  * removes element from a hashmap if it is not contained within the
237  * provided remote bloomfilter. Then, fill our new bloomfilter.
238  *
239  * @param cls closure
240  * @param key current key code
241  * @param value value in the hash map
242  * @return #GNUNET_YES if we should continue to
243  *         iterate,
244  *         #GNUNET_NO if not.
245  */
246 static int
247 iterator_bf_reduce (void *cls,
248                    const struct GNUNET_HashCode *key,
249                    void *value)
250 {
251   struct ElementEntry *ee = value;
252   struct Operation *op = cls;
253   struct GNUNET_HashCode mutated_hash;
254
255   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
256
257   if (GNUNET_NO ==
258       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
259                                          &mutated_hash))
260   {
261     op->state->my_element_count--;
262     GNUNET_assert (GNUNET_YES ==
263                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
264                                                          &ee->element_hash,
265                                                          ee));
266   }
267
268   return GNUNET_YES;
269 }
270
271 /**
272  * create a bloomfilter based on the elements given
273  *
274  * @param cls closure
275  * @param key current key code
276  * @param value value in the hash map
277  * @return #GNUNET_YES if we should continue to
278  *         iterate,
279  *         #GNUNET_NO if not.
280  */
281 static int
282 iterator_bf_create (void *cls,
283                    const struct GNUNET_HashCode *key,
284                    void *value)
285 {
286   struct ElementEntry *ee = value;
287   struct Operation *op = cls;
288   struct GNUNET_HashCode mutated_hash;
289
290   GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
291
292   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
293                                     &mutated_hash);
294   return GNUNET_YES;
295 }
296
297 /**
298  * Inform the client that the union operation has failed,
299  * and proceed to destroy the evaluate operation.
300  *
301  * @param op the intersection operation to fail
302  */
303 static void
304 fail_intersection_operation (struct Operation *op)
305 {
306   struct GNUNET_MQ_Envelope *ev;
307   struct GNUNET_SET_ResultMessage *msg;
308
309   if (op->state->my_elements)
310     GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
311
312   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
313
314   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
315   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
316   msg->request_id = htonl (op->spec->client_request_id);
317   msg->element_type = htons (0);
318   GNUNET_MQ_send (op->spec->set->client_mq, ev);
319   _GSS_operation_destroy (op);
320 }
321
322
323 /**
324  * Send a request for the evaluate operation to a remote peer
325  *
326  * @param op operation with the other peer
327  */
328 static void
329 send_operation_request (struct Operation *op)
330 {
331   struct GNUNET_MQ_Envelope *ev;
332   struct OperationRequestMessage *msg;
333
334   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
335                                 op->spec->context_msg);
336
337   if (NULL == ev)
338   {
339     /* the context message is too large */
340     GNUNET_break (0);
341     GNUNET_SERVER_client_disconnect (op->spec->set->client);
342     return;
343   }
344   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
345   msg->app_id = op->spec->app_id;
346   msg->salt = htonl (op->spec->salt);
347   msg->element_count = htonl(op->state->my_element_count);
348
349   GNUNET_MQ_send (op->mq, ev);
350
351   if (NULL != op->spec->context_msg)
352     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
353   else
354     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
355
356   if (NULL != op->spec->context_msg)
357   {
358     GNUNET_free (op->spec->context_msg);
359     op->spec->context_msg = NULL;
360   }
361 }
362
363 static void
364 send_bloomfilter_multipart (struct Operation *op, uint32_t offset)
365 {
366   struct GNUNET_MQ_Envelope *ev;
367   struct BFPart *msg;
368   uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
369   uint32_t todo_size = op->state->local_bf_data_size - offset;
370
371   if (todo_size < chunk_size)
372     chunk_size = todo_size;
373
374   ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
375
376   msg->bloomfilter_length = htonl (chunk_size);
377   msg->bloomfilter_offset = htonl (offset);
378   memcpy(&msg[1], &op->state->local_bf_data[offset], chunk_size);
379
380   GNUNET_MQ_send (op->mq, ev);
381
382   if (op->state->local_bf_data_size == offset + chunk_size)
383   {
384     // done
385     GNUNET_free(op->state->local_bf_data);
386     op->state->local_bf_data = NULL;
387     return;
388   }
389
390   send_bloomfilter_multipart (op, offset + chunk_size);
391 }
392
393 /**
394  * Send a bloomfilter to our peer.
395  * that the operation is over.
396  * After the result done message has been sent to the client,
397  * destroy the evaluate operation.
398  *
399  * @param op intersection operation
400  */
401 static void
402 send_bloomfilter (struct Operation *op)
403 {
404   struct GNUNET_MQ_Envelope *ev;
405   struct BFMessage *msg;
406   uint32_t bf_size;
407   uint32_t bf_elementbits;
408   uint32_t chunk_size;
409   struct GNUNET_CONTAINER_BloomFilter * local_bf;
410
411   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
412   
413   CALCULATE_BF_SIZE(op->state->my_element_count,
414                     op->spec->remote_element_count,
415                     bf_size,
416                     bf_elementbits);
417
418   local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
419                                                 bf_size,
420                                                 bf_elementbits);
421
422   op->spec->salt++;
423   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
424                                          &iterator_bf_create,
425                                          op);
426
427   // send our bloomfilter
428   if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) {
429     // singlepart
430     chunk_size = bf_size;
431     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
432     GNUNET_assert (GNUNET_SYSERR !=
433                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
434                                                               &msg[1],
435                                                               bf_size));
436   }
437   else {
438     //multipart
439     chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
440     ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
441     op->state->local_bf_data = (char *) GNUNET_malloc (bf_size);
442     GNUNET_assert (GNUNET_SYSERR !=
443                    GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
444                                                               op->state->local_bf_data,
445                                                               bf_size));
446     memcpy (&msg[1], op->state->local_bf_data, chunk_size);
447     op->state->local_bf_data_size = bf_size;
448   }
449   GNUNET_CONTAINER_bloomfilter_free (local_bf);
450
451   msg->sender_element_count = htonl (op->state->my_element_count);
452   msg->bloomfilter_total_length = htonl (bf_size);
453   msg->bloomfilter_length = htonl (chunk_size);
454   msg->bits_per_element = htonl (bf_elementbits);
455   msg->sender_mutator = htonl (op->spec->salt);
456
457   GNUNET_MQ_send (op->mq, ev);
458
459   if (op->state->local_bf_data)
460     send_bloomfilter_multipart (op, chunk_size);
461 }
462
463
464 /**
465  * Signal to the client that the operation has finished and
466  * destroy the operation.
467  *
468  * @param cls operation to destroy
469  */
470 static void
471 send_client_done_and_destroy (void *cls)
472 {
473   struct Operation *op = cls;
474   struct GNUNET_MQ_Envelope *ev;
475   struct GNUNET_SET_ResultMessage *rm;
476   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
477   rm->request_id = htonl (op->spec->client_request_id);
478   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
479   rm->element_type = htons (0);
480   GNUNET_MQ_send (op->spec->set->client_mq, ev);
481   _GSS_operation_destroy (op);
482 }
483
484
485 /**
486  * Send all elements in the full result iterator.
487  *
488  * @param cls operation
489  */
490 static void
491 send_remaining_elements (void *cls)
492 {
493   struct Operation *op = cls;
494   struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
495   struct GNUNET_MQ_Envelope *ev;
496   struct GNUNET_SET_ResultMessage *rm;
497   struct GNUNET_SET_Element *element;
498   int res;
499
500   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
501   if (GNUNET_NO == res) {
502     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
503     send_client_done_and_destroy (op);
504     return;
505   }
506
507   element = &remaining->element;
508   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
509   GNUNET_assert (0 != op->spec->client_request_id);
510
511   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
512   GNUNET_assert (NULL != ev);
513
514   rm->result_status = htons (GNUNET_SET_STATUS_OK);
515   rm->request_id = htonl (op->spec->client_request_id);
516   rm->element_type = element->type;
517   memcpy (&rm[1], element->data, element->size);
518
519   GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
520   GNUNET_MQ_send (op->spec->set->client_mq, ev);
521 }
522
523
524 /**
525  * Inform the peer that this operation is complete.
526  *
527  * @param op the intersection operation to fail
528  */
529 static void
530 send_peer_done (struct Operation *op)
531 {
532   struct GNUNET_MQ_Envelope *ev;
533
534   op->state->phase = PHASE_FINISHED;
535   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
536   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
537   op->state->local_bf = NULL;
538
539   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
540   GNUNET_MQ_send (op->mq, ev);
541 }
542
543 /**
544  * Handle an BF multipart message from a remote peer.
545  *
546  * @param cls the intersection operation
547  * @param mh the header of the message
548  */
549 static void
550 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
551 {
552   struct Operation *op = cls;
553   const struct BFPart *msg = (const struct BFPart *) mh;
554   
555   if (op->state->phase != PHASE_BF_AWAIT_MULTIPART){
556     GNUNET_break_op (0);
557     fail_intersection_operation(op);
558     return;
559   }
560   
561   
562 }
563
564 /**
565  * Handle an BF message from a remote peer.
566  *
567  * @param cls the intersection operation
568  * @param mh the header of the message
569  */
570 static void
571 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
572 {
573   struct Operation *op = cls;
574   const struct BFMessage *msg = (const struct BFMessage *) mh;
575   uint32_t old_elements;
576   uint32_t peer_elements;
577
578   old_elements = op->state->my_element_count;
579   op->spec->salt = ntohl (msg->sender_mutator);
580
581   op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
582                                                             ntohl (msg->bloomfilter_total_length),
583                                                             ntohl (msg->bits_per_element));
584   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
585                                                            BLOOMFILTER_SIZE,
586                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
587   switch (op->state->phase)
588   {
589   case PHASE_INITIAL:
590     // If we are ot our first msg
591     op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
592
593     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
594                                            &iterator_initialization_by_alice,
595                                            op);
596     break;
597   case PHASE_BF_EXCHANGE:
598   case PHASE_MAYBE_FINISHED:
599     // if we are bob or alice and are continuing operation
600     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
601                                            &iterator_bf_reduce,
602                                            op);
603     break;
604   default:
605     GNUNET_break_op (0);
606     fail_intersection_operation(op);
607   }
608   // the iterators created a new BF with salt+1
609   // the peer needs this information for decoding the next BF
610   // this behavior can be modified at will later on.
611   op->spec->salt++;
612
613   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
614   op->state->remote_bf = NULL;
615
616   peer_elements = ntohl(msg->sender_element_count);
617   if ((op->state->phase == PHASE_MAYBE_FINISHED)
618        && (old_elements == op->state->my_element_count)
619        && (op->state->my_element_count == peer_elements)){
620     // In the last round we though we were finished, we now know this is correct
621     send_peer_done(op);
622     return;
623   }
624
625   op->state->phase = PHASE_BF_EXCHANGE;
626   // maybe we are finished, but we do one more round to make certain
627   // we don't have false positives ...
628   if (op->state->my_element_count == peer_elements)
629       op->state->phase = PHASE_MAYBE_FINISHED;
630
631   send_bloomfilter (op);
632 }
633
634
635 /**
636  * Handle an BF message from a remote peer.
637  *
638  * @param cls the intersection operation
639  * @param mh the header of the message
640  */
641 static void
642 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
643 {
644   struct Operation *op = cls;
645   struct BFMessage *msg = (struct BFMessage *) mh;
646
647   op->spec->remote_element_count = ntohl(msg->sender_element_count);
648   if ((op->state->phase != PHASE_INITIAL)
649       || (op->state->my_element_count > op->spec->remote_element_count)){
650     GNUNET_break_op (0);
651     fail_intersection_operation(op);
652   }
653
654   op->state->phase = PHASE_BF_EXCHANGE;
655   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
656   
657   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
658                                          &iterator_initialization,
659                                          op);
660
661   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
662   op->state->remote_bf = NULL;
663
664   if (op->state->my_element_count == ntohl (msg->sender_element_count))
665     op->state->phase = PHASE_MAYBE_FINISHED;
666
667   send_bloomfilter (op);
668 }
669
670
671 /**
672  * Send our element to the peer, in case our element count is lower than his
673  *
674  * @param op intersection operation
675  */
676 static void
677 send_element_count (struct Operation *op)
678 {
679   struct GNUNET_MQ_Envelope *ev;
680   struct BFMessage *msg;
681
682   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
683
684   // just send our element count, as the other peer must start
685   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
686   msg->sender_element_count = htonl (op->state->my_element_count);
687   msg->bloomfilter_length = htonl (0);
688   msg->sender_mutator = htonl (0);
689
690   GNUNET_MQ_send (op->mq, ev);
691 }
692
693
694 /**
695  * Send a result message to the client indicating
696  * that the operation is over.
697  * After the result done message has been sent to the client,
698  * destroy the evaluate operation.
699  *
700  * @param op intersection operation
701  */
702 static void
703 finish_and_destroy (struct Operation *op)
704 {
705   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
706
707   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
708   {
709     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
710     op->state->full_result_iter =
711         GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
712     send_remaining_elements (op);
713     return;
714   }
715   send_client_done_and_destroy (op);
716 }
717
718
719 /**
720  * Handle a done message from a remote peer
721  *
722  * @param cls the union operation
723  * @param mh the message
724  */
725 static void
726 handle_p2p_done (void *cls,
727                  const struct GNUNET_MessageHeader *mh)
728 {
729   struct Operation *op = cls;
730
731   if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
732     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
733
734     finish_and_destroy (op);
735     return;
736   }
737
738   GNUNET_break_op (0);
739   fail_intersection_operation (op);
740 }
741
742
743 /**
744  * Evaluate a union operation with
745  * a remote peer.
746  *
747  * @param op operation to evaluate
748  */
749 static void
750 intersection_evaluate (struct Operation *op)
751 {
752   op->state = GNUNET_new (struct OperationState);
753   /* we started the operation, thus we have to send the operation request */
754   op->state->phase = PHASE_INITIAL;
755   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
756   op->state->my_element_count = op->spec->set->state->current_set_element_count;
757
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759               "evaluating intersection operation");
760   send_operation_request (op);
761 }
762
763
764 /**
765  * Accept an union operation request from a remote peer.
766  * Only initializes the private operation state.
767  *
768  * @param op operation that will be accepted as a union operation
769  */
770 static void
771 intersection_accept (struct Operation *op)
772 {
773   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
774   op->state = GNUNET_new (struct OperationState);
775   op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
776   op->state->my_element_count = op->spec->set->state->current_set_element_count;
777
778   // if Alice (the peer) has more elements than Bob (us), she should start
779   if (op->spec->remote_element_count < op->state->my_element_count){
780     op->state->phase = PHASE_INITIAL;
781     send_element_count(op);
782     return;
783   }
784   // create a new bloomfilter in case we have fewer elements
785   op->state->phase = PHASE_BF_EXCHANGE;
786   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
787                                                            BLOOMFILTER_SIZE,
788                                                            GNUNET_CONSTANTS_BLOOMFILTER_K);
789   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
790                                          &iterator_initialization,
791                                          op);
792   send_bloomfilter (op);
793 }
794
795
796 /**
797  * Create a new set supporting the intersection operation
798  *
799  * @return the newly created set
800  */
801 static struct SetState *
802 intersection_set_create ()
803 {
804   struct SetState *set_state;
805
806   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807               "intersection set created\n");
808   set_state = GNUNET_new (struct SetState);
809   set_state->current_set_element_count = 0;
810
811   return set_state;
812 }
813
814
815 /**
816  * Add the element from the given element message to the set.
817  *
818  * @param set_state state of the set want to add to
819  * @param ee the element to add to the set
820  */
821 static void
822 intersection_add (struct SetState *set_state,
823                   struct ElementEntry *ee)
824 {
825   GNUNET_assert(0 < set_state->current_set_element_count);
826   set_state->current_set_element_count++;
827 }
828
829
830 /**
831  * Destroy a set that supports the intersection operation
832  *
833  * @param set_state the set to destroy
834  */
835 static void
836 intersection_set_destroy (struct SetState *set_state)
837 {
838   GNUNET_free (set_state);
839 }
840
841
842 /**
843  * Remove the element given in the element message from the set.
844  *
845  * @param set_state state of the set to remove from
846  * @param element set element to remove
847  */
848 static void
849 intersection_remove (struct SetState *set_state,
850                      struct ElementEntry *element)
851 {
852   GNUNET_assert(0 < set_state->current_set_element_count);
853   set_state->current_set_element_count--;
854 }
855
856
857 /**
858  * Dispatch messages for a intersection operation.
859  *
860  * @param op the state of the intersection evaluate operation
861  * @param mh the received message
862  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
863  *         #GNUNET_OK otherwise
864  */
865 static int
866 intersection_handle_p2p_message (struct Operation *op,
867                                  const struct GNUNET_MessageHeader *mh)
868 {
869   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
870               ntohs (mh->type), ntohs (mh->size));
871   switch (ntohs (mh->type))
872   {
873     /* this message handler is not active until after we received an
874      * operation request message, thus the ops request is not handled here
875      */
876   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
877     handle_p2p_element_info (op, mh);
878     break;
879   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
880     handle_p2p_bf (op, mh);
881     break;
882   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
883     handle_p2p_bf_part (op, mh);
884     break;
885   case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
886     handle_p2p_done (op, mh);
887     break;
888   default:
889     /* something wrong with mesh's message handlers? */
890     GNUNET_assert (0);
891   }
892   return GNUNET_OK;
893 }
894
895
896 /**
897  * handler for peer-disconnects, notifies the client about the aborted operation
898  *
899  * @param op the destroyed operation
900  */
901 static void
902 intersection_peer_disconnect (struct Operation *op)
903 {
904   if (PHASE_FINISHED != op->state->phase)
905   {
906     struct GNUNET_MQ_Envelope *ev;
907     struct GNUNET_SET_ResultMessage *msg;
908
909     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
910     msg->request_id = htonl (op->spec->client_request_id);
911     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
912     msg->element_type = htons (0);
913     GNUNET_MQ_send (op->spec->set->client_mq, ev);
914     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
915     _GSS_operation_destroy (op);
916     return;
917   }
918   // else: the session has already been concluded
919   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
920   if (GNUNET_NO == op->state->client_done_sent)
921     finish_and_destroy (op);
922 }
923
924
925 /**
926  * Destroy the union operation.  Only things specific to the union operation are destroyed.
927  *
928  * @param op union operation to destroy
929  */
930 static void
931 intersection_op_cancel (struct Operation *op)
932 {
933   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
934   /* check if the op was canceled twice */
935   GNUNET_assert (NULL != op->state);
936   if (NULL != op->state->remote_bf)
937   {
938     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
939     op->state->remote_bf = NULL;
940   }
941   if (NULL != op->state->local_bf)
942   {
943     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
944     op->state->local_bf = NULL;
945   }
946   if (NULL != op->state->my_elements)
947   {
948     // no need to free the elements, they are still part of the set
949     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
950     op->state->my_elements = NULL;
951   }
952   GNUNET_free (op->state);
953   op->state = NULL;
954   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
955 }
956
957 const struct SetVT *
958 _GSS_intersection_vt ()
959 {
960   static const struct SetVT intersection_vt = {
961     .create = &intersection_set_create,
962     .msg_handler = &intersection_handle_p2p_message,
963     .add = &intersection_add,
964     .remove = &intersection_remove,
965     .destroy_set = &intersection_set_destroy,
966     .evaluate = &intersection_evaluate,
967     .accept = &intersection_accept,
968     .peer_disconnect = &intersection_peer_disconnect,
969     .cancel = &intersection_op_cancel,
970   };
971
972   return &intersection_vt;
973 }