added seeding function to the api
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet-service-set_protocol.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Current phase we are in for a intersection operation.
36  */
37 enum IntersectionOperationPhase
38 {
39   /**
40    * We are just starting.
41    */
42   PHASE_INITIAL,
43
44   /**
45    * We have send the number of our elements to the other
46    * peer, but did not setup our element set yet.
47    */
48   PHASE_COUNT_SENT,
49
50   /**
51    * We have initialized our set and are now reducing it by exchanging
52    * Bloom filters until one party notices the their element hashes
53    * are equal.
54    */
55   PHASE_BF_EXCHANGE,
56
57   /**
58    * The protocol is over.  Results may still have to be sent to the
59    * client.
60    */
61   PHASE_FINISHED
62 };
63
64
65 /**
66  * State of an evaluate operation with another peer.
67  */
68 struct OperationState
69 {
70   /**
71    * The bf we currently receive
72    */
73   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
74
75   /**
76    * BF of the set's element.
77    */
78   struct GNUNET_CONTAINER_BloomFilter *local_bf;
79
80   /**
81    * Remaining elements in the intersection operation.
82    * Maps element-id-hashes to 'elements in our set'.
83    */
84   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
85
86   /**
87    * Iterator for sending the final set of @e my_elements to the client.
88    */
89   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
90
91   /**
92    * Evaluate operations are held in a linked list.
93    */
94   struct OperationState *next;
95
96   /**
97    * Evaluate operations are held in a linked list.
98    */
99   struct OperationState *prev;
100
101   /**
102    * For multipart BF transmissions, we have to store the
103    * bloomfilter-data until we fully received it.
104    */
105   char *bf_data;
106
107   /**
108    * XOR of the keys of all of the elements (remaining) in my set.
109    * Always updated when elements are added or removed to
110    * @e my_elements.
111    */
112   struct GNUNET_HashCode my_xor;
113
114   /**
115    * XOR of the keys of all of the elements (remaining) in
116    * the other peer's set.  Updated when we receive the
117    * other peer's Bloom filter.
118    */
119   struct GNUNET_HashCode other_xor;
120
121   /**
122    * How many bytes of @e bf_data are valid?
123    */
124   uint32_t bf_data_offset;
125
126   /**
127    * Current element count contained within @e my_elements.
128    * (May differ briefly during initialization.)
129    */
130   uint32_t my_element_count;
131
132   /**
133    * size of the bloomfilter in @e bf_data.
134    */
135   uint32_t bf_data_size;
136
137   /**
138    * size of the bloomfilter
139    */
140   uint32_t bf_bits_per_element;
141
142   /**
143    * Salt currently used for BF construction (by us or the other peer,
144    * depending on where we are in the code).
145    */
146   uint32_t salt;
147
148   /**
149    * Current state of the operation.
150    */
151   enum IntersectionOperationPhase phase;
152
153   /**
154    * Generation in which the operation handle
155    * was created.
156    */
157   unsigned int generation_created;
158
159   /**
160    * Did we send the client that we are done?
161    */
162   int client_done_sent;
163 };
164
165
166 /**
167  * Extra state required for efficient set intersection.
168  * Merely tracks the total number of elements.
169  */
170 struct SetState
171 {
172   /**
173    * Number of currently valid elements in the set which have not been
174    * removed.
175    */
176   uint32_t current_set_element_count;
177 };
178
179
180 /**
181  * If applicable in the current operation mode, send a result message
182  * to the client indicating we removed an element.
183  *
184  * @param op intersection operation
185  * @param element element to send
186  */
187 static void
188 send_client_removed_element (struct Operation *op,
189                              struct GNUNET_SET_Element *element)
190 {
191   struct GNUNET_MQ_Envelope *ev;
192   struct GNUNET_SET_ResultMessage *rm;
193
194   if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode)
195     return; /* Wrong mode for transmitting removed elements */
196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
197               "Sending removed element (size %u) to client\n",
198               element->size);
199   GNUNET_assert (0 != op->spec->client_request_id);
200   ev = GNUNET_MQ_msg_extra (rm,
201                             element->size,
202                             GNUNET_MESSAGE_TYPE_SET_RESULT);
203   if (NULL == ev)
204   {
205     GNUNET_break (0);
206     return;
207   }
208   rm->result_status = htons (GNUNET_SET_STATUS_OK);
209   rm->request_id = htonl (op->spec->client_request_id);
210   rm->element_type = element->element_type;
211   memcpy (&rm[1],
212           element->data,
213           element->size);
214   GNUNET_MQ_send (op->spec->set->client_mq,
215                   ev);
216 }
217
218
219 /**
220  * Fills the "my_elements" hashmap with all relevant elements.
221  *
222  * @param cls the `struct Operation *` we are performing
223  * @param key current key code
224  * @param value the `struct ElementEntry *` from the hash map
225  * @return #GNUNET_YES (we should continue to iterate)
226  */
227 static int
228 filtered_map_initialization (void *cls,
229                              const struct GNUNET_HashCode *key,
230                              void *value)
231 {
232   struct Operation *op = cls;
233   struct ElementEntry *ee = value;
234   struct GNUNET_HashCode mutated_hash;
235
236
237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238               "FIMA called for %s:%u\n",
239               GNUNET_h2s (&ee->element_hash),
240               ee->element.size);
241
242   if ( (op->generation_created < ee->generation_removed) &&
243        (op->generation_created >= ee->generation_added) )
244   {
245     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
246                 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
247                 GNUNET_h2s (&ee->element_hash),
248                 ee->element.size);
249     return GNUNET_YES; /* element not valid in our operation's generation */
250   }
251
252   /* Test if element is in other peer's bloomfilter */
253   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
254                             op->state->salt,
255                             &mutated_hash);
256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
257               "Testing mingled hash %s with salt %u\n",
258               GNUNET_h2s (&mutated_hash),
259               op->state->salt);
260   if (GNUNET_NO ==
261       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
262                                          &mutated_hash))
263   {
264     /* remove this element */
265     send_client_removed_element (op,
266                                  &ee->element);
267     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
268                 "Reduced initialization, not starting with %s:%u\n",
269                 GNUNET_h2s (&ee->element_hash),
270                 ee->element.size);
271     return GNUNET_YES;
272   }
273   op->state->my_element_count++;
274   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
275                           &ee->element_hash,
276                           &op->state->my_xor);
277   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
278               "Filtered initialization of my_elements, adding %s:%u\n",
279               GNUNET_h2s (&ee->element_hash),
280               ee->element.size);
281   GNUNET_break (GNUNET_YES ==
282                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
283                                                    &ee->element_hash,
284                                                    ee,
285                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
286
287   return GNUNET_YES;
288 }
289
290
291 /**
292  * Removes elements from our hashmap if they are not contained within the
293  * provided remote bloomfilter.
294  *
295  * @param cls closure with the `struct Operation *`
296  * @param key current key code
297  * @param value value in the hash map
298  * @return #GNUNET_YES (we should continue to iterate)
299  */
300 static int
301 iterator_bf_reduce (void *cls,
302                    const struct GNUNET_HashCode *key,
303                    void *value)
304 {
305   struct Operation *op = cls;
306   struct ElementEntry *ee = value;
307   struct GNUNET_HashCode mutated_hash;
308
309   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
310                             op->state->salt,
311                             &mutated_hash);
312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313               "Testing mingled hash %s with salt %u\n",
314               GNUNET_h2s (&mutated_hash),
315               op->state->salt);
316   if (GNUNET_NO ==
317       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
318                                          &mutated_hash))
319   {
320     GNUNET_break (0 < op->state->my_element_count);
321     op->state->my_element_count--;
322     GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
323                             &ee->element_hash,
324                             &op->state->my_xor);
325     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326                 "Bloom filter reduction of my_elements, removing %s:%u\n",
327                 GNUNET_h2s (&ee->element_hash),
328                 ee->element.size);
329     GNUNET_assert (GNUNET_YES ==
330                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
331                                                          &ee->element_hash,
332                                                          ee));
333     send_client_removed_element (op,
334                                  &ee->element);
335   }
336   else
337   {
338     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
339                 "Bloom filter reduction of my_elements, keeping %s:%u\n",
340                 GNUNET_h2s (&ee->element_hash),
341                 ee->element.size);
342   }
343   return GNUNET_YES;
344 }
345
346
347 /**
348  * Create initial bloomfilter based on all the elements given.
349  *
350  * @param cls the `struct Operation *`
351  * @param key current key code
352  * @param value the `struct ElementEntry` to process
353  * @return #GNUNET_YES (we should continue to iterate)
354  */
355 static int
356 iterator_bf_create (void *cls,
357                     const struct GNUNET_HashCode *key,
358                     void *value)
359 {
360   struct Operation *op = cls;
361   struct ElementEntry *ee = value;
362   struct GNUNET_HashCode mutated_hash;
363
364   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
365                             op->state->salt,
366                             &mutated_hash);
367   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368               "Initializing BF with hash %s with salt %u\n",
369               GNUNET_h2s (&mutated_hash),
370               op->state->salt);
371   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
372                                     &mutated_hash);
373   return GNUNET_YES;
374 }
375
376
377 /**
378  * Inform the client that the intersection operation has failed,
379  * and proceed to destroy the evaluate operation.
380  *
381  * @param op the intersection operation to fail
382  */
383 static void
384 fail_intersection_operation (struct Operation *op)
385 {
386   struct GNUNET_MQ_Envelope *ev;
387   struct GNUNET_SET_ResultMessage *msg;
388
389   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
390               "Intersection operation failed\n");
391   if (NULL != op->state->my_elements)
392   {
393     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
394     op->state->my_elements = NULL;
395   }
396   ev = GNUNET_MQ_msg (msg,
397                       GNUNET_MESSAGE_TYPE_SET_RESULT);
398   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
399   msg->request_id = htonl (op->spec->client_request_id);
400   msg->element_type = htons (0);
401   GNUNET_MQ_send (op->spec->set->client_mq,
402                   ev);
403   _GSS_operation_destroy (op,
404                           GNUNET_YES);
405 }
406
407
408 /**
409  * Send a bloomfilter to our peer.  After the result done message has
410  * been sent to the client, destroy the evaluate operation.
411  *
412  * @param op intersection operation
413  */
414 static void
415 send_bloomfilter (struct Operation *op)
416 {
417   struct GNUNET_MQ_Envelope *ev;
418   struct BFMessage *msg;
419   uint32_t bf_size;
420   uint32_t bf_elementbits;
421   uint32_t chunk_size;
422   char *bf_data;
423   uint32_t offset;
424
425   /* We consider the ratio of the set sizes to determine
426      the number of bits per element, as the smaller set
427      should use more bits to maximize its set reduction
428      potential and minimize overall bandwidth consumption. */
429   bf_elementbits = 2 + ceil (log2((double)
430                              (op->spec->remote_element_count /
431                                    (double) op->state->my_element_count)));
432   if (bf_elementbits < 1)
433     bf_elementbits = 1; /* make sure k is not 0 */
434   /* optimize BF-size to ~50% of bits set */
435   bf_size = ceil ((double) (op->state->my_element_count
436                             * bf_elementbits / log(2)));
437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438               "Sending Bloom filter (%u) of size %u bytes\n",
439               (unsigned int) bf_elementbits,
440               (unsigned int) bf_size);
441   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
442                                                            bf_size,
443                                                            bf_elementbits);
444   op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
445                                               UINT32_MAX);
446   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
447                                          &iterator_bf_create,
448                                          op);
449
450   /* send our Bloom filter */
451   chunk_size = 60 * 1024 - sizeof (struct BFMessage);
452   if (bf_size <= chunk_size)
453   {
454     /* singlepart */
455     chunk_size = bf_size;
456     ev = GNUNET_MQ_msg_extra (msg,
457                               chunk_size,
458                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
459     GNUNET_assert (GNUNET_SYSERR !=
460                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
461                                                               (char*) &msg[1],
462                                                               bf_size));
463     msg->sender_element_count = htonl (op->state->my_element_count);
464     msg->bloomfilter_total_length = htonl (bf_size);
465     msg->bits_per_element = htonl (bf_elementbits);
466     msg->sender_mutator = htonl (op->state->salt);
467     msg->element_xor_hash = op->state->my_xor;
468     GNUNET_MQ_send (op->mq, ev);
469   }
470   else
471   {
472     /* multipart */
473     bf_data = GNUNET_malloc (bf_size);
474     GNUNET_assert (GNUNET_SYSERR !=
475                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
476                                                               bf_data,
477                                                               bf_size));
478     offset = 0;
479     while (offset < bf_size)
480     {
481       if (bf_size - chunk_size < offset)
482         chunk_size = bf_size - offset;
483       ev = GNUNET_MQ_msg_extra (msg,
484                                 chunk_size,
485                                 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
486       memcpy (&msg[1],
487               &bf_data[offset],
488               chunk_size);
489       offset += chunk_size;
490       msg->sender_element_count = htonl (op->state->my_element_count);
491       msg->bloomfilter_total_length = htonl (bf_size);
492       msg->bits_per_element = htonl (bf_elementbits);
493       msg->sender_mutator = htonl (op->state->salt);
494       msg->element_xor_hash = op->state->my_xor;
495       GNUNET_MQ_send (op->mq, ev);
496     }
497     GNUNET_free (bf_data);
498   }
499   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
500   op->state->local_bf = NULL;
501 }
502
503
504 /**
505  * Signal to the client that the operation has finished and
506  * destroy the operation.
507  *
508  * @param cls operation to destroy
509  */
510 static void
511 send_client_done_and_destroy (void *cls)
512 {
513   struct Operation *op = cls;
514   struct GNUNET_MQ_Envelope *ev;
515   struct GNUNET_SET_ResultMessage *rm;
516
517   ev = GNUNET_MQ_msg (rm,
518                       GNUNET_MESSAGE_TYPE_SET_RESULT);
519   rm->request_id = htonl (op->spec->client_request_id);
520   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
521   rm->element_type = htons (0);
522   GNUNET_MQ_send (op->spec->set->client_mq,
523                   ev);
524   _GSS_operation_destroy (op,
525                           GNUNET_YES);
526 }
527
528
529 /**
530  * Send all elements in the full result iterator.
531  *
532  * @param cls the `struct Operation *`
533  */
534 static void
535 send_remaining_elements (void *cls)
536 {
537   struct Operation *op = cls;
538   const void *nxt;
539   const struct ElementEntry *ee;
540   struct GNUNET_MQ_Envelope *ev;
541   struct GNUNET_SET_ResultMessage *rm;
542   const struct GNUNET_SET_Element *element;
543   int res;
544
545   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
546                                                      NULL,
547                                                      &nxt);
548   if (GNUNET_NO == res)
549   {
550     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
551                 "Sending done and destroy because iterator ran out\n");
552     op->keep--;
553     send_client_done_and_destroy (op);
554     return;
555   }
556   ee = nxt;
557   element = &ee->element;
558   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
559               "Sending element %s:%u to client (full set)\n",
560               GNUNET_h2s (&ee->element_hash),
561               element->size);
562   GNUNET_assert (0 != op->spec->client_request_id);
563   ev = GNUNET_MQ_msg_extra (rm,
564                             element->size,
565                             GNUNET_MESSAGE_TYPE_SET_RESULT);
566   GNUNET_assert (NULL != ev);
567   rm->result_status = htons (GNUNET_SET_STATUS_OK);
568   rm->request_id = htonl (op->spec->client_request_id);
569   rm->element_type = element->element_type;
570   memcpy (&rm[1],
571           element->data,
572           element->size);
573   GNUNET_MQ_notify_sent (ev,
574                          &send_remaining_elements,
575                          op);
576   GNUNET_MQ_send (op->spec->set->client_mq,
577                   ev);
578 }
579
580
581 /**
582  * Inform the peer that this operation is complete.
583  *
584  * @param op the intersection operation to fail
585  */
586 static void
587 send_peer_done (struct Operation *op)
588 {
589   struct GNUNET_MQ_Envelope *ev;
590   struct IntersectionDoneMessage *idm;
591
592   op->state->phase = PHASE_FINISHED;
593   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594               "Intersection succeeded, sending DONE\n");
595   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
596   op->state->local_bf = NULL;
597
598   ev = GNUNET_MQ_msg (idm,
599                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
600   idm->final_element_count = htonl (op->state->my_element_count);
601   idm->element_xor_hash = op->state->my_xor;
602   GNUNET_MQ_send (op->mq,
603                   ev);
604 }
605
606
607 /**
608  * Process a Bloomfilter once we got all the chunks.
609  *
610  * @param op the intersection operation
611  */
612 static void
613 process_bf (struct Operation *op)
614 {
615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616               "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
617               op->state->phase,
618               op->spec->remote_element_count,
619               op->state->my_element_count,
620               GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements));
621   switch (op->state->phase)
622   {
623   case PHASE_INITIAL:
624     GNUNET_break_op (0);
625     fail_intersection_operation(op);
626     return;
627   case PHASE_COUNT_SENT:
628     /* This is the first BF being sent, build our initial map with
629        filtering in place */
630     op->state->my_elements
631       = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
632                                               GNUNET_YES);
633     op->state->my_element_count = 0;
634     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
635                                            &filtered_map_initialization,
636                                            op);
637     break;
638   case PHASE_BF_EXCHANGE:
639     /* Update our set by reduction */
640     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
641                                            &iterator_bf_reduce,
642                                            op);
643     break;
644   case PHASE_FINISHED:
645     GNUNET_break_op (0);
646     fail_intersection_operation(op);
647     return;
648   }
649   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
650   op->state->remote_bf = NULL;
651
652   if ( (0 == op->state->my_element_count) || /* fully disjoint */
653        ( (op->state->my_element_count == op->spec->remote_element_count) &&
654          (0 == memcmp (&op->state->my_xor,
655                        &op->state->other_xor,
656                        sizeof (struct GNUNET_HashCode))) ) )
657   {
658     /* we are done */
659     send_peer_done (op);
660     return;
661   }
662   op->state->phase = PHASE_BF_EXCHANGE;
663   send_bloomfilter (op);
664 }
665
666
667 /**
668  * Handle an BF message from a remote peer.
669  *
670  * @param cls the intersection operation
671  * @param mh the header of the message
672  */
673 static void
674 handle_p2p_bf (void *cls,
675                const struct GNUNET_MessageHeader *mh)
676 {
677   struct Operation *op = cls;
678   const struct BFMessage *msg;
679   uint32_t bf_size;
680   uint32_t chunk_size;
681   uint32_t bf_bits_per_element;
682   uint16_t msize;
683
684   msize = htons (mh->size);
685   if (msize < sizeof (struct BFMessage))
686   {
687     GNUNET_break_op (0);
688     fail_intersection_operation (op);
689     return;
690   }
691   msg = (const struct BFMessage *) mh;
692   switch (op->state->phase)
693   {
694   case PHASE_INITIAL:
695     GNUNET_break_op (0);
696     fail_intersection_operation (op);
697     break;
698   case PHASE_COUNT_SENT:
699   case PHASE_BF_EXCHANGE:
700     bf_size = ntohl (msg->bloomfilter_total_length);
701     bf_bits_per_element = ntohl (msg->bits_per_element);
702     chunk_size = msize - sizeof (struct BFMessage);
703     op->state->other_xor = msg->element_xor_hash;
704     if (bf_size == chunk_size)
705     {
706       if (NULL != op->state->bf_data)
707       {
708         GNUNET_break_op (0);
709         fail_intersection_operation (op);
710         return;
711       }
712       /* single part, done here immediately */
713       op->state->remote_bf
714         = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
715                                              bf_size,
716                                              bf_bits_per_element);
717       op->state->salt = ntohl (msg->sender_mutator);
718       op->spec->remote_element_count = ntohl (msg->sender_element_count);
719       process_bf (op);
720       return;
721     }
722     /* multipart chunk */
723     if (NULL == op->state->bf_data)
724     {
725       /* first chunk, initialize */
726       op->state->bf_data = GNUNET_malloc (bf_size);
727       op->state->bf_data_size = bf_size;
728       op->state->bf_bits_per_element = bf_bits_per_element;
729       op->state->bf_data_offset = 0;
730       op->state->salt = ntohl (msg->sender_mutator);
731       op->spec->remote_element_count = ntohl (msg->sender_element_count);
732     }
733     else
734     {
735       /* increment */
736       if ( (op->state->bf_data_size != bf_size) ||
737            (op->state->bf_bits_per_element != bf_bits_per_element) ||
738            (op->state->bf_data_offset + chunk_size > bf_size) ||
739            (op->state->salt != ntohl (msg->sender_mutator)) ||
740            (op->spec->remote_element_count != ntohl (msg->sender_element_count)) )
741       {
742         GNUNET_break_op (0);
743         fail_intersection_operation (op);
744         return;
745       }
746     }
747     memcpy (&op->state->bf_data[op->state->bf_data_offset],
748             (const char*) &msg[1],
749             chunk_size);
750     op->state->bf_data_offset += chunk_size;
751     if (op->state->bf_data_offset == bf_size)
752     {
753       /* last chunk, run! */
754       op->state->remote_bf
755         = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
756                                              bf_size,
757                                              bf_bits_per_element);
758       GNUNET_free (op->state->bf_data);
759       op->state->bf_data = NULL;
760       op->state->bf_data_size = 0;
761       process_bf (op);
762     }
763     break;
764   default:
765     GNUNET_break_op (0);
766     fail_intersection_operation (op);
767     break;
768   }
769 }
770
771
772 /**
773  * Fills the "my_elements" hashmap with the initial set of
774  * (non-deleted) elements from the set of the specification.
775  *
776  * @param cls closure with the `struct Operation *`
777  * @param key current key code for the element
778  * @param value value in the hash map with the `struct ElementEntry *`
779  * @return #GNUNET_YES (we should continue to iterate)
780  */
781 static int
782 initialize_map_unfiltered (void *cls,
783                            const struct GNUNET_HashCode *key,
784                            void *value)
785 {
786   struct ElementEntry *ee = value;
787   struct Operation *op = cls;
788
789   if ( (op->generation_created < ee->generation_removed) &&
790        (op->generation_created >= ee->generation_added) )
791     return GNUNET_YES; /* element not live in operation's generation */
792   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
793                           &ee->element_hash,
794                           &op->state->my_xor);
795   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
796               "Initial full initialization of my_elements, adding %s:%u\n",
797               GNUNET_h2s (&ee->element_hash),
798               ee->element.size);
799   GNUNET_break (GNUNET_YES ==
800                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
801                                                    &ee->element_hash,
802                                                    ee,
803                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
804   return GNUNET_YES;
805 }
806
807
808 /**
809  * Send our element count to the peer, in case our element count is
810  * lower than his.
811  *
812  * @param op intersection operation
813  */
814 static void
815 send_element_count (struct Operation *op)
816 {
817   struct GNUNET_MQ_Envelope *ev;
818   struct IntersectionElementInfoMessage *msg;
819
820   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
821               "Sending our element count (%u)\n",
822               op->state->my_element_count);
823   ev = GNUNET_MQ_msg (msg,
824                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
825   msg->sender_element_count = htonl (op->state->my_element_count);
826   GNUNET_MQ_send (op->mq, ev);
827 }
828
829
830 /**
831  * We go first, initialize our map with all elements and
832  * send the first Bloom filter.
833  *
834  * @param op operation to start exchange for
835  */
836 static void
837 begin_bf_exchange (struct Operation *op)
838 {
839   op->state->phase = PHASE_BF_EXCHANGE;
840   op->state->my_elements
841     = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
842                                             GNUNET_YES);
843   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
844                                          &initialize_map_unfiltered,
845                                          op);
846   send_bloomfilter (op);
847 }
848
849
850 /**
851  * Handle the initial `struct IntersectionElementInfoMessage` from a
852  * remote peer.
853  *
854  * @param cls the intersection operation
855  * @param mh the header of the message
856  */
857 static void
858 handle_p2p_element_info (void *cls,
859                          const struct GNUNET_MessageHeader *mh)
860 {
861   struct Operation *op = cls;
862   const struct IntersectionElementInfoMessage *msg;
863
864   if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
865   {
866     GNUNET_break_op (0);
867     fail_intersection_operation(op);
868     return;
869   }
870   msg = (const struct IntersectionElementInfoMessage *) mh;
871   op->spec->remote_element_count = ntohl (msg->sender_element_count);
872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873               "Received remote element count (%u), I have %u\n",
874               op->spec->remote_element_count,
875               op->state->my_element_count);
876   if ( ( (PHASE_INITIAL != op->state->phase) &&
877          (PHASE_COUNT_SENT != op->state->phase) ) ||
878        (op->state->my_element_count > op->spec->remote_element_count) ||
879        (0 == op->state->my_element_count) ||
880        (0 == op->spec->remote_element_count) )
881   {
882     GNUNET_break_op (0);
883     fail_intersection_operation(op);
884     return;
885   }
886   GNUNET_break (NULL == op->state->remote_bf);
887   begin_bf_exchange (op);
888 }
889
890
891 /**
892  * Send a result message to the client indicating that the operation
893  * is over.  After the result done message has been sent to the
894  * client, destroy the evaluate operation.
895  *
896  * @param op intersection operation
897  */
898 static void
899 finish_and_destroy (struct Operation *op)
900 {
901   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
902
903   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
904   {
905     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906                 "Sending full result set (%u elements)\n",
907                 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
908     op->state->full_result_iter
909       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
910     op->keep++;
911     send_remaining_elements (op);
912     return;
913   }
914   send_client_done_and_destroy (op);
915 }
916
917
918 /**
919  * Remove all elements from our hashmap.
920  *
921  * @param cls closure with the `struct Operation *`
922  * @param key current key code
923  * @param value value in the hash map
924  * @return #GNUNET_YES (we should continue to iterate)
925  */
926 static int
927 filter_all (void *cls,
928             const struct GNUNET_HashCode *key,
929             void *value)
930 {
931   struct Operation *op = cls;
932   struct ElementEntry *ee = value;
933
934   GNUNET_break (0 < op->state->my_element_count);
935   op->state->my_element_count--;
936   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
937                           &ee->element_hash,
938                           &op->state->my_xor);
939   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
940               "Final reduction of my_elements, removing %s:%u\n",
941               GNUNET_h2s (&ee->element_hash),
942               ee->element.size);
943   GNUNET_assert (GNUNET_YES ==
944                  GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
945                                                        &ee->element_hash,
946                                                        ee));
947   send_client_removed_element (op,
948                                &ee->element);
949   return GNUNET_YES;
950 }
951
952
953 /**
954  * Handle a done message from a remote peer
955  *
956  * @param cls the intersection operation
957  * @param mh the message
958  */
959 static void
960 handle_p2p_done (void *cls,
961                  const struct GNUNET_MessageHeader *mh)
962 {
963   struct Operation *op = cls;
964   const struct IntersectionDoneMessage *idm;
965
966   if (PHASE_BF_EXCHANGE != op->state->phase)
967   {
968     /* wrong phase to conclude? FIXME: Or should we allow this
969        if the other peer has _initially_ already an empty set? */
970     GNUNET_break_op (0);
971     fail_intersection_operation (op);
972     return;
973   }
974   if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
975   {
976     GNUNET_break_op (0);
977     fail_intersection_operation (op);
978     return;
979   }
980   idm = (const struct IntersectionDoneMessage *) mh;
981   if (0 == ntohl (idm->final_element_count))
982   {
983     /* other peer determined empty set is the intersection,
984        remove all elements */
985     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
986                                            &filter_all,
987                                            op);
988   }
989   if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
990        (0 != memcmp (&op->state->my_xor,
991                      &idm->element_xor_hash,
992                      sizeof (struct GNUNET_HashCode))) )
993   {
994     /* Other peer thinks we are done, but we disagree on the result! */
995     GNUNET_break_op (0);
996     fail_intersection_operation (op);
997     return;
998   }
999   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000               "Got IntersectionDoneMessage, have %u elements in intersection\n",
1001               op->state->my_element_count);
1002   op->state->phase = PHASE_FINISHED;
1003   finish_and_destroy (op);
1004 }
1005
1006
1007 /**
1008  * Initiate a set intersection operation with a remote peer.
1009  *
1010  * @param op operation that is created, should be initialized to
1011  *        begin the evaluation
1012  * @param opaque_context message to be transmitted to the listener
1013  *        to convince him to accept, may be NULL
1014  */
1015 static void
1016 intersection_evaluate (struct Operation *op,
1017                        const struct GNUNET_MessageHeader *opaque_context)
1018 {
1019   struct GNUNET_MQ_Envelope *ev;
1020   struct OperationRequestMessage *msg;
1021
1022   op->state = GNUNET_new (struct OperationState);
1023   /* we started the operation, thus we have to send the operation request */
1024   op->state->phase = PHASE_INITIAL;
1025   op->state->my_element_count = op->spec->set->state->current_set_element_count;
1026
1027   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1028               "Initiating intersection operation evaluation\n");
1029   ev = GNUNET_MQ_msg_nested_mh (msg,
1030                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1031                                 opaque_context);
1032   if (NULL == ev)
1033   {
1034     /* the context message is too large!? */
1035     GNUNET_break (0);
1036     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1037     return;
1038   }
1039   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1040   msg->app_id = op->spec->app_id;
1041   msg->element_count = htonl (op->state->my_element_count);
1042   GNUNET_MQ_send (op->mq,
1043                   ev);
1044   op->state->phase = PHASE_COUNT_SENT;
1045   if (NULL != opaque_context)
1046     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1047                 "Sent op request with context message\n");
1048   else
1049     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1050                 "Sent op request without context message\n");
1051 }
1052
1053
1054 /**
1055  * Accept an intersection operation request from a remote peer.  Only
1056  * initializes the private operation state.
1057  *
1058  * @param op operation that will be accepted as an intersection operation
1059  */
1060 static void
1061 intersection_accept (struct Operation *op)
1062 {
1063   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064               "Accepting set intersection operation\n");
1065   op->state = GNUNET_new (struct OperationState);
1066   op->state->phase = PHASE_INITIAL;
1067   op->state->my_element_count
1068     = op->spec->set->state->current_set_element_count;
1069   op->state->my_elements
1070     = GNUNET_CONTAINER_multihashmap_create
1071     (GNUNET_MIN (op->state->my_element_count,
1072                  op->spec->remote_element_count),
1073      GNUNET_YES);
1074   if (op->spec->remote_element_count < op->state->my_element_count)
1075   {
1076     /* If the other peer (Alice) has fewer elements than us (Bob),
1077        we just send the count as Alice should send the first BF */
1078     send_element_count (op);
1079     op->state->phase = PHASE_COUNT_SENT;
1080     return;
1081   }
1082   /* We have fewer elements, so we start with the BF */
1083   begin_bf_exchange (op);
1084 }
1085
1086
1087 /**
1088  * Dispatch messages for a intersection operation.
1089  *
1090  * @param op the state of the intersection evaluate operation
1091  * @param mh the received message
1092  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1093  *         #GNUNET_OK otherwise
1094  */
1095 static int
1096 intersection_handle_p2p_message (struct Operation *op,
1097                                  const struct GNUNET_MessageHeader *mh)
1098 {
1099   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1100               "Received p2p message (t: %u, s: %u)\n",
1101               ntohs (mh->type), ntohs (mh->size));
1102   switch (ntohs (mh->type))
1103   {
1104     /* this message handler is not active until after we received an
1105      * operation request message, thus the ops request is not handled here
1106      */
1107   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
1108     handle_p2p_element_info (op, mh);
1109     break;
1110   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1111     handle_p2p_bf (op, mh);
1112     break;
1113   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
1114     handle_p2p_done (op, mh);
1115     break;
1116   default:
1117     /* something wrong with cadet's message handlers? */
1118     GNUNET_assert (0);
1119   }
1120   return GNUNET_OK;
1121 }
1122
1123
1124 /**
1125  * Handler for peer-disconnects, notifies the client about the aborted
1126  * operation.  If we did not expect anything from the other peer, we
1127  * gracefully terminate the operation.
1128  *
1129  * @param op the destroyed operation
1130  */
1131 static void
1132 intersection_peer_disconnect (struct Operation *op)
1133 {
1134   if (PHASE_FINISHED != op->state->phase)
1135   {
1136     fail_intersection_operation (op);
1137     return;
1138   }
1139   /* the session has already been concluded */
1140   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1141               "Other peer disconnected (finished)\n");
1142   if (GNUNET_NO == op->state->client_done_sent)
1143     finish_and_destroy (op);
1144 }
1145
1146
1147 /**
1148  * Destroy the intersection operation.  Only things specific to the
1149  * intersection operation are destroyed.
1150  *
1151  * @param op intersection operation to destroy
1152  */
1153 static void
1154 intersection_op_cancel (struct Operation *op)
1155 {
1156   /* check if the op was canceled twice */
1157   GNUNET_assert (NULL != op->state);
1158   if (NULL != op->state->remote_bf)
1159   {
1160     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1161     op->state->remote_bf = NULL;
1162   }
1163   if (NULL != op->state->local_bf)
1164   {
1165     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1166     op->state->local_bf = NULL;
1167   }
1168   if (NULL != op->state->my_elements)
1169   {
1170     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1171     op->state->my_elements = NULL;
1172   }
1173   GNUNET_free (op->state);
1174   op->state = NULL;
1175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1176               "Destroying intersection op state done\n");
1177 }
1178
1179
1180 /**
1181  * Create a new set supporting the intersection operation.
1182  *
1183  * @return the newly created set
1184  */
1185 static struct SetState *
1186 intersection_set_create ()
1187 {
1188   struct SetState *set_state;
1189
1190   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1191               "Intersection set created\n");
1192   set_state = GNUNET_new (struct SetState);
1193   set_state->current_set_element_count = 0;
1194
1195   return set_state;
1196 }
1197
1198
1199 /**
1200  * Add the element from the given element message to the set.
1201  *
1202  * @param set_state state of the set want to add to
1203  * @param ee the element to add to the set
1204  */
1205 static void
1206 intersection_add (struct SetState *set_state,
1207                   struct ElementEntry *ee)
1208 {
1209   set_state->current_set_element_count++;
1210 }
1211
1212
1213 /**
1214  * Destroy a set that supports the intersection operation
1215  *
1216  * @param set_state the set to destroy
1217  */
1218 static void
1219 intersection_set_destroy (struct SetState *set_state)
1220 {
1221   GNUNET_free (set_state);
1222 }
1223
1224
1225 /**
1226  * Remove the element given in the element message from the set.
1227  *
1228  * @param set_state state of the set to remove from
1229  * @param element set element to remove
1230  */
1231 static void
1232 intersection_remove (struct SetState *set_state,
1233                      struct ElementEntry *element)
1234 {
1235   GNUNET_assert (0 < set_state->current_set_element_count);
1236   set_state->current_set_element_count--;
1237 }
1238
1239
1240 /**
1241  * Get the table with implementing functions for set intersection.
1242  *
1243  * @return the operation specific VTable
1244  */
1245 const struct SetVT *
1246 _GSS_intersection_vt ()
1247 {
1248   static const struct SetVT intersection_vt = {
1249     .create = &intersection_set_create,
1250     .msg_handler = &intersection_handle_p2p_message,
1251     .add = &intersection_add,
1252     .remove = &intersection_remove,
1253     .destroy_set = &intersection_set_destroy,
1254     .evaluate = &intersection_evaluate,
1255     .accept = &intersection_accept,
1256     .peer_disconnect = &intersection_peer_disconnect,
1257     .cancel = &intersection_op_cancel,
1258   };
1259
1260   return &intersection_vt;
1261 }