ed29033b451e3f9fd529a54f0b408ed2b10eac35
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet-service-set_protocol.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Current phase we are in for a intersection operation.
36  */
37 enum IntersectionOperationPhase
38 {
39   /**
40    * We are just starting.
41    */
42   PHASE_INITIAL,
43
44   /**
45    * We have send the number of our elements to the other
46    * peer, but did not setup our element set yet.
47    */
48   PHASE_COUNT_SENT,
49
50   /**
51    * We have initialized our set and are now reducing it by exchanging
52    * Bloom filters until one party notices the their element hashes
53    * are equal.
54    */
55   PHASE_BF_EXCHANGE,
56
57   /**
58    * The protocol is over.  Results may still have to be sent to the
59    * client.
60    */
61   PHASE_FINISHED
62 };
63
64
65 /**
66  * State of an evaluate operation with another peer.
67  */
68 struct OperationState
69 {
70   /**
71    * The bf we currently receive
72    */
73   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
74
75   /**
76    * BF of the set's element.
77    */
78   struct GNUNET_CONTAINER_BloomFilter *local_bf;
79
80   /**
81    * Remaining elements in the intersection operation.
82    * Maps element-id-hashes to 'elements in our set'.
83    */
84   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
85
86   /**
87    * Iterator for sending the final set of @e my_elements to the client.
88    */
89   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
90
91   /**
92    * Evaluate operations are held in a linked list.
93    */
94   struct OperationState *next;
95
96   /**
97    * Evaluate operations are held in a linked list.
98    */
99   struct OperationState *prev;
100
101   /**
102    * For multipart BF transmissions, we have to store the
103    * bloomfilter-data until we fully received it.
104    */
105   char *bf_data;
106
107   /**
108    * XOR of the keys of all of the elements (remaining) in my set.
109    * Always updated when elements are added or removed to
110    * @e my_elements.
111    */
112   struct GNUNET_HashCode my_xor;
113
114   /**
115    * XOR of the keys of all of the elements (remaining) in
116    * the other peer's set.  Updated when we receive the
117    * other peer's Bloom filter.
118    */
119   struct GNUNET_HashCode other_xor;
120
121   /**
122    * How many bytes of @e bf_data are valid?
123    */
124   uint32_t bf_data_offset;
125
126   /**
127    * Current element count contained within @e my_elements.
128    * (May differ briefly during initialization.)
129    */
130   uint32_t my_element_count;
131
132   /**
133    * size of the bloomfilter in @e bf_data.
134    */
135   uint32_t bf_data_size;
136
137   /**
138    * size of the bloomfilter
139    */
140   uint32_t bf_bits_per_element;
141
142   /**
143    * Salt currently used for BF construction (by us or the other peer,
144    * depending on where we are in the code).
145    */
146   uint32_t salt;
147
148   /**
149    * Current state of the operation.
150    */
151   enum IntersectionOperationPhase phase;
152
153   /**
154    * Generation in which the operation handle
155    * was created.
156    */
157   unsigned int generation_created;
158
159   /**
160    * Did we send the client that we are done?
161    */
162   int client_done_sent;
163 };
164
165
166 /**
167  * Extra state required for efficient set intersection.
168  * Merely tracks the total number of elements.
169  */
170 struct SetState
171 {
172   /**
173    * Number of currently valid elements in the set which have not been
174    * removed.
175    */
176   uint32_t current_set_element_count;
177 };
178
179
180 /**
181  * If applicable in the current operation mode, send a result message
182  * to the client indicating we removed an element.
183  *
184  * @param op intersection operation
185  * @param element element to send
186  */
187 static void
188 send_client_removed_element (struct Operation *op,
189                              struct GNUNET_SET_Element *element)
190 {
191   struct GNUNET_MQ_Envelope *ev;
192   struct GNUNET_SET_ResultMessage *rm;
193
194   if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode)
195     return; /* Wrong mode for transmitting removed elements */
196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
197               "Sending removed element (size %u) to client\n",
198               element->size);
199   GNUNET_assert (0 != op->spec->client_request_id);
200   ev = GNUNET_MQ_msg_extra (rm,
201                             element->size,
202                             GNUNET_MESSAGE_TYPE_SET_RESULT);
203   if (NULL == ev)
204   {
205     GNUNET_break (0);
206     return;
207   }
208   rm->result_status = htons (GNUNET_SET_STATUS_OK);
209   rm->request_id = htonl (op->spec->client_request_id);
210   rm->element_type = element->element_type;
211   memcpy (&rm[1],
212           element->data,
213           element->size);
214   GNUNET_MQ_send (op->spec->set->client_mq,
215                   ev);
216 }
217
218
219 /**
220  * Fills the "my_elements" hashmap with all relevant elements.
221  *
222  * @param cls the `struct Operation *` we are performing
223  * @param key current key code
224  * @param value the `struct ElementEntry *` from the hash map
225  * @return #GNUNET_YES (we should continue to iterate)
226  */
227 static int
228 filtered_map_initialization (void *cls,
229                              const struct GNUNET_HashCode *key,
230                              void *value)
231 {
232   struct Operation *op = cls;
233   struct ElementEntry *ee = value;
234   struct GNUNET_HashCode mutated_hash;
235
236
237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238               "FIMA called for %s:%u\n",
239               GNUNET_h2s (&ee->element_hash),
240               ee->element.size);
241
242   if ( (op->generation_created < ee->generation_removed) &&
243        (op->generation_created >= ee->generation_added) )
244   {
245     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
246                 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
247                 GNUNET_h2s (&ee->element_hash),
248                 ee->element.size);
249     return GNUNET_YES; /* element not valid in our operation's generation */
250   }
251
252   /* Test if element is in other peer's bloomfilter */
253   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
254                             op->state->salt,
255                             &mutated_hash);
256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
257               "Testing mingled hash %s with salt %u\n",
258               GNUNET_h2s (&mutated_hash),
259               op->state->salt);
260   if (GNUNET_NO ==
261       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
262                                          &mutated_hash))
263   {
264     /* remove this element */
265     send_client_removed_element (op,
266                                  &ee->element);
267     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
268                 "Reduced initialization, not starting with %s:%u\n",
269                 GNUNET_h2s (&ee->element_hash),
270                 ee->element.size);
271     return GNUNET_YES;
272   }
273   op->state->my_element_count++;
274   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
275                           &ee->element_hash,
276                           &op->state->my_xor);
277   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
278               "Filtered initialization of my_elements, adding %s:%u\n",
279               GNUNET_h2s (&ee->element_hash),
280               ee->element.size);
281   GNUNET_break (GNUNET_YES ==
282                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
283                                                    &ee->element_hash,
284                                                    ee,
285                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
286
287   return GNUNET_YES;
288 }
289
290
291 /**
292  * Removes elements from our hashmap if they are not contained within the
293  * provided remote bloomfilter.
294  *
295  * @param cls closure with the `struct Operation *`
296  * @param key current key code
297  * @param value value in the hash map
298  * @return #GNUNET_YES (we should continue to iterate)
299  */
300 static int
301 iterator_bf_reduce (void *cls,
302                    const struct GNUNET_HashCode *key,
303                    void *value)
304 {
305   struct Operation *op = cls;
306   struct ElementEntry *ee = value;
307   struct GNUNET_HashCode mutated_hash;
308
309   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
310                             op->state->salt,
311                             &mutated_hash);
312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313               "Testing mingled hash %s with salt %u\n",
314               GNUNET_h2s (&mutated_hash),
315               op->state->salt);
316   if (GNUNET_NO ==
317       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
318                                          &mutated_hash))
319   {
320     GNUNET_break (0 < op->state->my_element_count);
321     op->state->my_element_count--;
322     GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
323                             &ee->element_hash,
324                             &op->state->my_xor);
325     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326                 "Bloom filter reduction of my_elements, removing %s:%u\n",
327                 GNUNET_h2s (&ee->element_hash),
328                 ee->element.size);
329     GNUNET_assert (GNUNET_YES ==
330                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
331                                                          &ee->element_hash,
332                                                          ee));
333     send_client_removed_element (op,
334                                  &ee->element);
335   }
336   else
337   {
338     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
339                 "Bloom filter reduction of my_elements, keeping %s:%u\n",
340                 GNUNET_h2s (&ee->element_hash),
341                 ee->element.size);
342   }
343   return GNUNET_YES;
344 }
345
346
347 /**
348  * Create initial bloomfilter based on all the elements given.
349  *
350  * @param cls the `struct Operation *`
351  * @param key current key code
352  * @param value the `struct ElementEntry` to process
353  * @return #GNUNET_YES (we should continue to iterate)
354  */
355 static int
356 iterator_bf_create (void *cls,
357                     const struct GNUNET_HashCode *key,
358                     void *value)
359 {
360   struct Operation *op = cls;
361   struct ElementEntry *ee = value;
362   struct GNUNET_HashCode mutated_hash;
363
364   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
365                             op->state->salt,
366                             &mutated_hash);
367   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368               "Initializing BF with hash %s with salt %u\n",
369               GNUNET_h2s (&mutated_hash),
370               op->state->salt);
371   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
372                                     &mutated_hash);
373   return GNUNET_YES;
374 }
375
376
377 /**
378  * Inform the client that the intersection operation has failed,
379  * and proceed to destroy the evaluate operation.
380  *
381  * @param op the intersection operation to fail
382  */
383 static void
384 fail_intersection_operation (struct Operation *op)
385 {
386   struct GNUNET_MQ_Envelope *ev;
387   struct GNUNET_SET_ResultMessage *msg;
388
389   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
390               "Intersection operation failed\n");
391   if (NULL != op->state->my_elements)
392   {
393     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
394     op->state->my_elements = NULL;
395   }
396   ev = GNUNET_MQ_msg (msg,
397                       GNUNET_MESSAGE_TYPE_SET_RESULT);
398   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
399   msg->request_id = htonl (op->spec->client_request_id);
400   msg->element_type = htons (0);
401   GNUNET_MQ_send (op->spec->set->client_mq,
402                   ev);
403   _GSS_operation_destroy (op,
404                           GNUNET_YES);
405 }
406
407
408 /**
409  * Send a bloomfilter to our peer.  After the result done message has
410  * been sent to the client, destroy the evaluate operation.
411  *
412  * @param op intersection operation
413  */
414 static void
415 send_bloomfilter (struct Operation *op)
416 {
417   struct GNUNET_MQ_Envelope *ev;
418   struct BFMessage *msg;
419   uint32_t bf_size;
420   uint32_t bf_elementbits;
421   uint32_t chunk_size;
422   char *bf_data;
423   uint32_t offset;
424
425   /* We consider the ratio of the set sizes to determine
426      the number of bits per element, as the smaller set
427      should use more bits to maximize its set reduction
428      potential and minimize overall bandwidth consumption. */
429   bf_elementbits = 2 + ceil (log2((double)
430                              (op->spec->remote_element_count /
431                                    (double) op->state->my_element_count)));
432   if (bf_elementbits < 1)
433     bf_elementbits = 1; /* make sure k is not 0 */
434   /* optimize BF-size to ~50% of bits set */
435   bf_size = ceil ((double) (op->state->my_element_count
436                             * bf_elementbits / log(2)));
437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438               "Sending Bloom filter (%u) of size %u bytes\n",
439               (unsigned int) bf_elementbits,
440               (unsigned int) bf_size);
441   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
442                                                            bf_size,
443                                                            bf_elementbits);
444   op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
445                                               UINT32_MAX);
446   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
447                                          &iterator_bf_create,
448                                          op);
449
450   /* send our Bloom filter */
451   chunk_size = 60 * 1024 - sizeof (struct BFMessage);
452   if (bf_size <= chunk_size)
453   {
454     /* singlepart */
455     chunk_size = bf_size;
456     ev = GNUNET_MQ_msg_extra (msg,
457                               chunk_size,
458                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
459     GNUNET_assert (GNUNET_SYSERR !=
460                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
461                                                               (char*) &msg[1],
462                                                               bf_size));
463     msg->sender_element_count = htonl (op->state->my_element_count);
464     msg->bloomfilter_total_length = htonl (bf_size);
465     msg->bits_per_element = htonl (bf_elementbits);
466     msg->sender_mutator = htonl (op->state->salt);
467     msg->element_xor_hash = op->state->my_xor;
468     GNUNET_MQ_send (op->mq, ev);
469   }
470   else
471   {
472     /* multipart */
473     bf_data = GNUNET_malloc (bf_size);
474     GNUNET_assert (GNUNET_SYSERR !=
475                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
476                                                               bf_data,
477                                                               bf_size));
478     offset = 0;
479     while (offset < bf_size)
480     {
481       if (bf_size - chunk_size < offset)
482         chunk_size = bf_size - offset;
483       ev = GNUNET_MQ_msg_extra (msg,
484                                 chunk_size,
485                                 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
486       memcpy (&msg[1],
487               &bf_data[offset],
488               chunk_size);
489       offset += chunk_size;
490       msg->sender_element_count = htonl (op->state->my_element_count);
491       msg->bloomfilter_total_length = htonl (bf_size);
492       msg->bits_per_element = htonl (bf_elementbits);
493       msg->sender_mutator = htonl (op->state->salt);
494       msg->element_xor_hash = op->state->my_xor;
495       GNUNET_MQ_send (op->mq, ev);
496     }
497     GNUNET_free (bf_data);
498   }
499   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
500   op->state->local_bf = NULL;
501 }
502
503
504 /**
505  * Signal to the client that the operation has finished and
506  * destroy the operation.
507  *
508  * @param cls operation to destroy
509  */
510 static void
511 send_client_done_and_destroy (void *cls)
512 {
513   struct Operation *op = cls;
514   struct GNUNET_MQ_Envelope *ev;
515   struct GNUNET_SET_ResultMessage *rm;
516
517   ev = GNUNET_MQ_msg (rm,
518                       GNUNET_MESSAGE_TYPE_SET_RESULT);
519   rm->request_id = htonl (op->spec->client_request_id);
520   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
521   rm->element_type = htons (0);
522   GNUNET_MQ_send (op->spec->set->client_mq,
523                   ev);
524   _GSS_operation_destroy (op,
525                           GNUNET_YES);
526 }
527
528
529 /**
530  * Send all elements in the full result iterator.
531  *
532  * @param cls the `struct Operation *`
533  */
534 static void
535 send_remaining_elements (void *cls)
536 {
537   struct Operation *op = cls;
538   const void *nxt;
539   const struct ElementEntry *ee;
540   struct GNUNET_MQ_Envelope *ev;
541   struct GNUNET_SET_ResultMessage *rm;
542   const struct GNUNET_SET_Element *element;
543   int res;
544
545   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
546                                                      NULL,
547                                                      &nxt);
548   if (GNUNET_NO == res)
549   {
550     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
551                 "Sending done and destroy because iterator ran out\n");
552     send_client_done_and_destroy (op);
553     return;
554   }
555   ee = nxt;
556   element = &ee->element;
557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558               "Sending element (size %u) to client (full set)\n",
559               element->size);
560   GNUNET_assert (0 != op->spec->client_request_id);
561   ev = GNUNET_MQ_msg_extra (rm,
562                             element->size,
563                             GNUNET_MESSAGE_TYPE_SET_RESULT);
564   GNUNET_assert (NULL != ev);
565   rm->result_status = htons (GNUNET_SET_STATUS_OK);
566   rm->request_id = htonl (op->spec->client_request_id);
567   rm->element_type = element->element_type;
568   memcpy (&rm[1],
569           element->data,
570           element->size);
571   GNUNET_MQ_notify_sent (ev,
572                          &send_remaining_elements,
573                          op);
574   GNUNET_MQ_send (op->spec->set->client_mq,
575                   ev);
576 }
577
578
579 /**
580  * Inform the peer that this operation is complete.
581  *
582  * @param op the intersection operation to fail
583  */
584 static void
585 send_peer_done (struct Operation *op)
586 {
587   struct GNUNET_MQ_Envelope *ev;
588   struct IntersectionDoneMessage *idm;
589
590   op->state->phase = PHASE_FINISHED;
591   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
592               "Intersection succeeded, sending DONE\n");
593   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
594   op->state->local_bf = NULL;
595
596   ev = GNUNET_MQ_msg (idm,
597                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
598   idm->final_element_count = htonl (op->state->my_element_count);
599   idm->element_xor_hash = op->state->my_xor;
600   GNUNET_MQ_send (op->mq,
601                   ev);
602 }
603
604
605 /**
606  * Process a Bloomfilter once we got all the chunks.
607  *
608  * @param op the intersection operation
609  */
610 static void
611 process_bf (struct Operation *op)
612 {
613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
614               "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
615               op->state->phase,
616               op->spec->remote_element_count,
617               op->state->my_element_count,
618               GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements));
619   switch (op->state->phase)
620   {
621   case PHASE_INITIAL:
622     GNUNET_break_op (0);
623     fail_intersection_operation(op);
624     return;
625   case PHASE_COUNT_SENT:
626     /* This is the first BF being sent, build our initial map with
627        filtering in place */
628     op->state->my_elements
629       = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
630                                               GNUNET_YES);
631     op->state->my_element_count = 0;
632     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
633                                            &filtered_map_initialization,
634                                            op);
635     break;
636   case PHASE_BF_EXCHANGE:
637     /* Update our set by reduction */
638     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
639                                            &iterator_bf_reduce,
640                                            op);
641     break;
642   case PHASE_FINISHED:
643     GNUNET_break_op (0);
644     fail_intersection_operation(op);
645     return;
646   }
647   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
648   op->state->remote_bf = NULL;
649
650   if ( (0 == op->state->my_element_count) || /* fully disjoint */
651        ( (op->state->my_element_count == op->spec->remote_element_count) &&
652          (0 == memcmp (&op->state->my_xor,
653                        &op->state->other_xor,
654                        sizeof (struct GNUNET_HashCode))) ) )
655   {
656     /* we are done */
657     send_peer_done (op);
658     return;
659   }
660   op->state->phase = PHASE_BF_EXCHANGE;
661   send_bloomfilter (op);
662 }
663
664
665 /**
666  * Handle an BF message from a remote peer.
667  *
668  * @param cls the intersection operation
669  * @param mh the header of the message
670  */
671 static void
672 handle_p2p_bf (void *cls,
673                const struct GNUNET_MessageHeader *mh)
674 {
675   struct Operation *op = cls;
676   const struct BFMessage *msg;
677   uint32_t bf_size;
678   uint32_t chunk_size;
679   uint32_t bf_bits_per_element;
680   uint16_t msize;
681
682   msize = htons (mh->size);
683   if (msize < sizeof (struct BFMessage))
684   {
685     GNUNET_break_op (0);
686     fail_intersection_operation (op);
687     return;
688   }
689   msg = (const struct BFMessage *) mh;
690   switch (op->state->phase)
691   {
692   case PHASE_INITIAL:
693     GNUNET_break_op (0);
694     fail_intersection_operation (op);
695     break;
696   case PHASE_COUNT_SENT:
697   case PHASE_BF_EXCHANGE:
698     bf_size = ntohl (msg->bloomfilter_total_length);
699     bf_bits_per_element = ntohl (msg->bits_per_element);
700     chunk_size = msize - sizeof (struct BFMessage);
701     op->state->other_xor = msg->element_xor_hash;
702     if (bf_size == chunk_size)
703     {
704       if (NULL != op->state->bf_data)
705       {
706         GNUNET_break_op (0);
707         fail_intersection_operation (op);
708         return;
709       }
710       /* single part, done here immediately */
711       op->state->remote_bf
712         = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
713                                              bf_size,
714                                              bf_bits_per_element);
715       op->state->salt = ntohl (msg->sender_mutator);
716       op->spec->remote_element_count = ntohl (msg->sender_element_count);
717       process_bf (op);
718       return;
719     }
720     /* multipart chunk */
721     if (NULL == op->state->bf_data)
722     {
723       /* first chunk, initialize */
724       op->state->bf_data = GNUNET_malloc (bf_size);
725       op->state->bf_data_size = bf_size;
726       op->state->bf_bits_per_element = bf_bits_per_element;
727       op->state->bf_data_offset = 0;
728       op->state->salt = ntohl (msg->sender_mutator);
729       op->spec->remote_element_count = ntohl (msg->sender_element_count);
730     }
731     else
732     {
733       /* increment */
734       if ( (op->state->bf_data_size != bf_size) ||
735            (op->state->bf_bits_per_element != bf_bits_per_element) ||
736            (op->state->bf_data_offset + chunk_size > bf_size) ||
737            (op->state->salt != ntohl (msg->sender_mutator)) ||
738            (op->spec->remote_element_count != ntohl (msg->sender_element_count)) )
739       {
740         GNUNET_break_op (0);
741         fail_intersection_operation (op);
742         return;
743       }
744     }
745     memcpy (&op->state->bf_data[op->state->bf_data_offset],
746             (const char*) &msg[1],
747             chunk_size);
748     op->state->bf_data_offset += chunk_size;
749     if (op->state->bf_data_offset == bf_size)
750     {
751       /* last chunk, run! */
752       op->state->remote_bf
753         = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
754                                              bf_size,
755                                              bf_bits_per_element);
756       GNUNET_free (op->state->bf_data);
757       op->state->bf_data = NULL;
758       op->state->bf_data_size = 0;
759       process_bf (op);
760     }
761     break;
762   default:
763     GNUNET_break_op (0);
764     fail_intersection_operation (op);
765     break;
766   }
767 }
768
769
770 /**
771  * Fills the "my_elements" hashmap with the initial set of
772  * (non-deleted) elements from the set of the specification.
773  *
774  * @param cls closure with the `struct Operation *`
775  * @param key current key code for the element
776  * @param value value in the hash map with the `struct ElementEntry *`
777  * @return #GNUNET_YES (we should continue to iterate)
778  */
779 static int
780 initialize_map_unfiltered (void *cls,
781                            const struct GNUNET_HashCode *key,
782                            void *value)
783 {
784   struct ElementEntry *ee = value;
785   struct Operation *op = cls;
786
787   if ( (op->generation_created < ee->generation_removed) &&
788        (op->generation_created >= ee->generation_added) )
789     return GNUNET_YES; /* element not live in operation's generation */
790   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
791                           &ee->element_hash,
792                           &op->state->my_xor);
793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794               "Initial full initialization of my_elements, adding %s:%u\n",
795               GNUNET_h2s (&ee->element_hash),
796               ee->element.size);
797   GNUNET_break (GNUNET_YES ==
798                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
799                                                    &ee->element_hash,
800                                                    ee,
801                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
802   return GNUNET_YES;
803 }
804
805
806 /**
807  * Send our element count to the peer, in case our element count is
808  * lower than his.
809  *
810  * @param op intersection operation
811  */
812 static void
813 send_element_count (struct Operation *op)
814 {
815   struct GNUNET_MQ_Envelope *ev;
816   struct IntersectionElementInfoMessage *msg;
817
818   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819               "Sending our element count (%u)\n",
820               op->state->my_element_count);
821   ev = GNUNET_MQ_msg (msg,
822                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
823   msg->sender_element_count = htonl (op->state->my_element_count);
824   GNUNET_MQ_send (op->mq, ev);
825 }
826
827
828 /**
829  * We go first, initialize our map with all elements and
830  * send the first Bloom filter.
831  *
832  * @param op operation to start exchange for
833  */
834 static void
835 begin_bf_exchange (struct Operation *op)
836 {
837   op->state->phase = PHASE_BF_EXCHANGE;
838   op->state->my_elements
839     = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
840                                             GNUNET_YES);
841   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
842                                          &initialize_map_unfiltered,
843                                          op);
844   send_bloomfilter (op);
845 }
846
847
848 /**
849  * Handle the initial `struct IntersectionElementInfoMessage` from a
850  * remote peer.
851  *
852  * @param cls the intersection operation
853  * @param mh the header of the message
854  */
855 static void
856 handle_p2p_element_info (void *cls,
857                          const struct GNUNET_MessageHeader *mh)
858 {
859   struct Operation *op = cls;
860   const struct IntersectionElementInfoMessage *msg;
861
862   if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
863   {
864     GNUNET_break_op (0);
865     fail_intersection_operation(op);
866     return;
867   }
868   msg = (const struct IntersectionElementInfoMessage *) mh;
869   op->spec->remote_element_count = ntohl (msg->sender_element_count);
870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871               "Received remote element count (%u), I have %u\n",
872               op->spec->remote_element_count,
873               op->state->my_element_count);
874   if ( ( (PHASE_INITIAL != op->state->phase) &&
875          (PHASE_COUNT_SENT != op->state->phase) ) ||
876        (op->state->my_element_count > op->spec->remote_element_count) ||
877        (0 == op->state->my_element_count) ||
878        (0 == op->spec->remote_element_count) )
879   {
880     GNUNET_break_op (0);
881     fail_intersection_operation(op);
882     return;
883   }
884   GNUNET_break (NULL == op->state->remote_bf);
885   begin_bf_exchange (op);
886 }
887
888
889 /**
890  * Send a result message to the client indicating that the operation
891  * is over.  After the result done message has been sent to the
892  * client, destroy the evaluate operation.
893  *
894  * @param op intersection operation
895  */
896 static void
897 finish_and_destroy (struct Operation *op)
898 {
899   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
900
901   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
902   {
903     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904                 "Sending full result set\n");
905     op->state->full_result_iter
906       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
907     send_remaining_elements (op);
908     return;
909   }
910   send_client_done_and_destroy (op);
911 }
912
913
914 /**
915  * Remove all elements from our hashmap.
916  *
917  * @param cls closure with the `struct Operation *`
918  * @param key current key code
919  * @param value value in the hash map
920  * @return #GNUNET_YES (we should continue to iterate)
921  */
922 static int
923 filter_all (void *cls,
924             const struct GNUNET_HashCode *key,
925             void *value)
926 {
927   struct Operation *op = cls;
928   struct ElementEntry *ee = value;
929
930   GNUNET_break (0 < op->state->my_element_count);
931   op->state->my_element_count--;
932   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
933                           &ee->element_hash,
934                           &op->state->my_xor);
935   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
936               "Final reduction of my_elements, removing %s:%u\n",
937               GNUNET_h2s (&ee->element_hash),
938               ee->element.size);
939   GNUNET_assert (GNUNET_YES ==
940                  GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
941                                                        &ee->element_hash,
942                                                        ee));
943   send_client_removed_element (op,
944                                &ee->element);
945   return GNUNET_YES;
946 }
947
948
949 /**
950  * Handle a done message from a remote peer
951  *
952  * @param cls the intersection operation
953  * @param mh the message
954  */
955 static void
956 handle_p2p_done (void *cls,
957                  const struct GNUNET_MessageHeader *mh)
958 {
959   struct Operation *op = cls;
960   const struct IntersectionDoneMessage *idm;
961
962   if (PHASE_BF_EXCHANGE != op->state->phase)
963   {
964     /* wrong phase to conclude? FIXME: Or should we allow this
965        if the other peer has _initially_ already an empty set? */
966     GNUNET_break_op (0);
967     fail_intersection_operation (op);
968     return;
969   }
970   if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
971   {
972     GNUNET_break_op (0);
973     fail_intersection_operation (op);
974     return;
975   }
976   idm = (const struct IntersectionDoneMessage *) mh;
977   if (0 == ntohl (idm->final_element_count))
978   {
979     /* other peer determined empty set is the intersection,
980        remove all elements */
981     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
982                                            &filter_all,
983                                            op);
984   }
985   if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
986        (0 != memcmp (&op->state->my_xor,
987                      &idm->element_xor_hash,
988                      sizeof (struct GNUNET_HashCode))) )
989   {
990     /* Other peer thinks we are done, but we disagree on the result! */
991     GNUNET_break_op (0);
992     fail_intersection_operation (op);
993     return;
994   }
995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996               "Got final DONE\n");
997   op->state->phase = PHASE_FINISHED;
998   finish_and_destroy (op);
999 }
1000
1001
1002 /**
1003  * Initiate a set intersection operation with a remote peer.
1004  *
1005  * @param op operation that is created, should be initialized to
1006  *        begin the evaluation
1007  * @param opaque_context message to be transmitted to the listener
1008  *        to convince him to accept, may be NULL
1009  */
1010 static void
1011 intersection_evaluate (struct Operation *op,
1012                        const struct GNUNET_MessageHeader *opaque_context)
1013 {
1014   struct GNUNET_MQ_Envelope *ev;
1015   struct OperationRequestMessage *msg;
1016
1017   op->state = GNUNET_new (struct OperationState);
1018   /* we started the operation, thus we have to send the operation request */
1019   op->state->phase = PHASE_INITIAL;
1020   op->state->my_element_count = op->spec->set->state->current_set_element_count;
1021
1022   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1023               "Initiating intersection operation evaluation\n");
1024   ev = GNUNET_MQ_msg_nested_mh (msg,
1025                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1026                                 opaque_context);
1027   if (NULL == ev)
1028   {
1029     /* the context message is too large!? */
1030     GNUNET_break (0);
1031     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1032     return;
1033   }
1034   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1035   msg->app_id = op->spec->app_id;
1036   msg->element_count = htonl (op->state->my_element_count);
1037   GNUNET_MQ_send (op->mq,
1038                   ev);
1039   if (NULL != opaque_context)
1040     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041                 "Sent op request with context message\n");
1042   else
1043     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044                 "Sent op request without context message\n");
1045 }
1046
1047
1048 /**
1049  * Accept an intersection operation request from a remote peer.  Only
1050  * initializes the private operation state.
1051  *
1052  * @param op operation that will be accepted as an intersection operation
1053  */
1054 static void
1055 intersection_accept (struct Operation *op)
1056 {
1057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058               "Accepting set intersection operation\n");
1059   op->state = GNUNET_new (struct OperationState);
1060   op->state->phase = PHASE_INITIAL;
1061   op->state->my_element_count
1062     = op->spec->set->state->current_set_element_count;
1063   op->state->my_elements
1064     = GNUNET_CONTAINER_multihashmap_create
1065     (GNUNET_MIN (op->state->my_element_count,
1066                  op->spec->remote_element_count),
1067      GNUNET_YES);
1068   if (op->spec->remote_element_count < op->state->my_element_count)
1069   {
1070     /* If the other peer (Alice) has fewer elements than us (Bob),
1071        we just send the count as Alice should send the first BF */
1072     send_element_count (op);
1073     op->state->phase = PHASE_COUNT_SENT;
1074     return;
1075   }
1076   /* We have fewer elements, so we start with the BF */
1077   begin_bf_exchange (op);
1078 }
1079
1080
1081 /**
1082  * Dispatch messages for a intersection operation.
1083  *
1084  * @param op the state of the intersection evaluate operation
1085  * @param mh the received message
1086  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1087  *         #GNUNET_OK otherwise
1088  */
1089 static int
1090 intersection_handle_p2p_message (struct Operation *op,
1091                                  const struct GNUNET_MessageHeader *mh)
1092 {
1093   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1094               "Received p2p message (t: %u, s: %u)\n",
1095               ntohs (mh->type), ntohs (mh->size));
1096   switch (ntohs (mh->type))
1097   {
1098     /* this message handler is not active until after we received an
1099      * operation request message, thus the ops request is not handled here
1100      */
1101   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
1102     handle_p2p_element_info (op, mh);
1103     break;
1104   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1105     handle_p2p_bf (op, mh);
1106     break;
1107   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
1108     handle_p2p_done (op, mh);
1109     break;
1110   default:
1111     /* something wrong with cadet's message handlers? */
1112     GNUNET_assert (0);
1113   }
1114   return GNUNET_OK;
1115 }
1116
1117
1118 /**
1119  * Handler for peer-disconnects, notifies the client about the aborted
1120  * operation.  If we did not expect anything from the other peer, we
1121  * gracefully terminate the operation.
1122  *
1123  * @param op the destroyed operation
1124  */
1125 static void
1126 intersection_peer_disconnect (struct Operation *op)
1127 {
1128   if (PHASE_FINISHED != op->state->phase)
1129   {
1130     fail_intersection_operation (op);
1131     return;
1132   }
1133   /* the session has already been concluded */
1134   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135               "Other peer disconnected (finished)\n");
1136   if (GNUNET_NO == op->state->client_done_sent)
1137     finish_and_destroy (op);
1138 }
1139
1140
1141 /**
1142  * Destroy the intersection operation.  Only things specific to the
1143  * intersection operation are destroyed.
1144  *
1145  * @param op intersection operation to destroy
1146  */
1147 static void
1148 intersection_op_cancel (struct Operation *op)
1149 {
1150   /* check if the op was canceled twice */
1151   GNUNET_assert (NULL != op->state);
1152   if (NULL != op->state->remote_bf)
1153   {
1154     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1155     op->state->remote_bf = NULL;
1156   }
1157   if (NULL != op->state->local_bf)
1158   {
1159     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1160     op->state->local_bf = NULL;
1161   }
1162   if (NULL != op->state->my_elements)
1163   {
1164     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1165     op->state->my_elements = NULL;
1166   }
1167   GNUNET_free (op->state);
1168   op->state = NULL;
1169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170               "Destroying intersection op state done\n");
1171 }
1172
1173
1174 /**
1175  * Create a new set supporting the intersection operation.
1176  *
1177  * @return the newly created set
1178  */
1179 static struct SetState *
1180 intersection_set_create ()
1181 {
1182   struct SetState *set_state;
1183
1184   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185               "Intersection set created\n");
1186   set_state = GNUNET_new (struct SetState);
1187   set_state->current_set_element_count = 0;
1188
1189   return set_state;
1190 }
1191
1192
1193 /**
1194  * Add the element from the given element message to the set.
1195  *
1196  * @param set_state state of the set want to add to
1197  * @param ee the element to add to the set
1198  */
1199 static void
1200 intersection_add (struct SetState *set_state,
1201                   struct ElementEntry *ee)
1202 {
1203   set_state->current_set_element_count++;
1204 }
1205
1206
1207 /**
1208  * Destroy a set that supports the intersection operation
1209  *
1210  * @param set_state the set to destroy
1211  */
1212 static void
1213 intersection_set_destroy (struct SetState *set_state)
1214 {
1215   GNUNET_free (set_state);
1216 }
1217
1218
1219 /**
1220  * Remove the element given in the element message from the set.
1221  *
1222  * @param set_state state of the set to remove from
1223  * @param element set element to remove
1224  */
1225 static void
1226 intersection_remove (struct SetState *set_state,
1227                      struct ElementEntry *element)
1228 {
1229   GNUNET_assert (0 < set_state->current_set_element_count);
1230   set_state->current_set_element_count--;
1231 }
1232
1233
1234 /**
1235  * Get the table with implementing functions for set intersection.
1236  *
1237  * @return the operation specific VTable
1238  */
1239 const struct SetVT *
1240 _GSS_intersection_vt ()
1241 {
1242   static const struct SetVT intersection_vt = {
1243     .create = &intersection_set_create,
1244     .msg_handler = &intersection_handle_p2p_message,
1245     .add = &intersection_add,
1246     .remove = &intersection_remove,
1247     .destroy_set = &intersection_set_destroy,
1248     .evaluate = &intersection_evaluate,
1249     .accept = &intersection_accept,
1250     .peer_disconnect = &intersection_peer_disconnect,
1251     .cancel = &intersection_op_cancel,
1252   };
1253
1254   return &intersection_vt;
1255 }