use c99
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013, 2014 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet-service-set_protocol.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Current phase we are in for a intersection operation.
36  */
37 enum IntersectionOperationPhase
38 {
39   /**
40    * We are just starting.
41    */
42   PHASE_INITIAL,
43
44   /**
45    * We have send the number of our elements to the other
46    * peer, but did not setup our element set yet.
47    */
48   PHASE_COUNT_SENT,
49
50   /**
51    * We have initialized our set and are now reducing it by exchanging
52    * Bloom filters until one party notices the their element hashes
53    * are equal.
54    */
55   PHASE_BF_EXCHANGE,
56
57   /**
58    * The protocol is over.  Results may still have to be sent to the
59    * client.
60    */
61   PHASE_FINISHED
62
63 };
64
65
66 /**
67  * State of an evaluate operation with another peer.
68  */
69 struct OperationState
70 {
71   /**
72    * The bf we currently receive
73    */
74   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
75
76   /**
77    * BF of the set's element.
78    */
79   struct GNUNET_CONTAINER_BloomFilter *local_bf;
80
81   /**
82    * Remaining elements in the intersection operation.
83    * Maps element-id-hashes to 'elements in our set'.
84    */
85   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
86
87   /**
88    * Iterator for sending the final set of @e my_elements to the client.
89    */
90   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
91
92   /**
93    * Evaluate operations are held in a linked list.
94    */
95   struct OperationState *next;
96
97   /**
98    * Evaluate operations are held in a linked list.
99    */
100   struct OperationState *prev;
101
102   /**
103    * For multipart BF transmissions, we have to store the
104    * bloomfilter-data until we fully received it.
105    */
106   char *bf_data;
107
108   /**
109    * XOR of the keys of all of the elements (remaining) in my set.
110    * Always updated when elements are added or removed to
111    * @e my_elements.
112    */
113   struct GNUNET_HashCode my_xor;
114
115   /**
116    * XOR of the keys of all of the elements (remaining) in
117    * the other peer's set.  Updated when we receive the
118    * other peer's Bloom filter.
119    */
120   struct GNUNET_HashCode other_xor;
121
122   /**
123    * How many bytes of @e bf_data are valid?
124    */
125   uint32_t bf_data_offset;
126
127   /**
128    * Current element count contained within @e my_elements.
129    * (May differ briefly during initialization.)
130    */
131   uint32_t my_element_count;
132
133   /**
134    * size of the bloomfilter in @e bf_data.
135    */
136   uint32_t bf_data_size;
137
138   /**
139    * size of the bloomfilter
140    */
141   uint32_t bf_bits_per_element;
142
143   /**
144    * Salt currently used for BF construction (by us or the other peer,
145    * depending on where we are in the code).
146    */
147   uint32_t salt;
148
149   /**
150    * Current state of the operation.
151    */
152   enum IntersectionOperationPhase phase;
153
154   /**
155    * Generation in which the operation handle
156    * was created.
157    */
158   unsigned int generation_created;
159
160   /**
161    * Did we send the client that we are done?
162    */
163   int client_done_sent;
164 };
165
166
167 /**
168  * Extra state required for efficient set intersection.
169  * Merely tracks the total number of elements.
170  */
171 struct SetState
172 {
173   /**
174    * Number of currently valid elements in the set which have not been
175    * removed.
176    */
177   uint32_t current_set_element_count;
178 };
179
180
181 /**
182  * If applicable in the current operation mode, send a result message
183  * to the client indicating we removed an element.
184  *
185  * @param op intersection operation
186  * @param element element to send
187  */
188 static void
189 send_client_removed_element (struct Operation *op,
190                              struct GNUNET_SET_Element *element)
191 {
192   struct GNUNET_MQ_Envelope *ev;
193   struct GNUNET_SET_ResultMessage *rm;
194
195   if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode)
196     return; /* Wrong mode for transmitting removed elements */
197   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
198               "Sending removed element (size %u) to client\n",
199               element->size);
200   GNUNET_assert (0 != op->spec->client_request_id);
201   ev = GNUNET_MQ_msg_extra (rm,
202                             element->size,
203                             GNUNET_MESSAGE_TYPE_SET_RESULT);
204   if (NULL == ev)
205   {
206     GNUNET_break (0);
207     return;
208   }
209   rm->result_status = htons (GNUNET_SET_STATUS_OK);
210   rm->request_id = htonl (op->spec->client_request_id);
211   rm->element_type = element->element_type;
212   memcpy (&rm[1],
213           element->data,
214           element->size);
215   GNUNET_MQ_send (op->spec->set->client_mq,
216                   ev);
217 }
218
219
220 /**
221  * Fills the "my_elements" hashmap with all relevant elements.
222  *
223  * @param cls the `struct Operation *` we are performing
224  * @param key current key code
225  * @param value the `struct ElementEntry *` from the hash map
226  * @return #GNUNET_YES (we should continue to iterate)
227  */
228 static int
229 filtered_map_initialization (void *cls,
230                              const struct GNUNET_HashCode *key,
231                              void *value)
232 {
233   struct Operation *op = cls;
234   struct ElementEntry *ee = value;
235   struct GNUNET_HashCode mutated_hash;
236
237
238   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
239               "FIMA called for %s:%u\n",
240               GNUNET_h2s (&ee->element_hash),
241               ee->element.size);
242
243   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
244   {
245     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
246                 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
247                 GNUNET_h2s (&ee->element_hash),
248                 ee->element.size);
249     return GNUNET_YES; /* element not valid in our operation's generation */
250   }
251
252   /* Test if element is in other peer's bloomfilter */
253   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
254                             op->state->salt,
255                             &mutated_hash);
256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
257               "Testing mingled hash %s with salt %u\n",
258               GNUNET_h2s (&mutated_hash),
259               op->state->salt);
260   if (GNUNET_NO ==
261       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
262                                          &mutated_hash))
263   {
264     /* remove this element */
265     send_client_removed_element (op,
266                                  &ee->element);
267     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
268                 "Reduced initialization, not starting with %s:%u\n",
269                 GNUNET_h2s (&ee->element_hash),
270                 ee->element.size);
271     return GNUNET_YES;
272   }
273   op->state->my_element_count++;
274   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
275                           &ee->element_hash,
276                           &op->state->my_xor);
277   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
278               "Filtered initialization of my_elements, adding %s:%u\n",
279               GNUNET_h2s (&ee->element_hash),
280               ee->element.size);
281   GNUNET_break (GNUNET_YES ==
282                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
283                                                    &ee->element_hash,
284                                                    ee,
285                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
286
287   return GNUNET_YES;
288 }
289
290
291 /**
292  * Removes elements from our hashmap if they are not contained within the
293  * provided remote bloomfilter.
294  *
295  * @param cls closure with the `struct Operation *`
296  * @param key current key code
297  * @param value value in the hash map
298  * @return #GNUNET_YES (we should continue to iterate)
299  */
300 static int
301 iterator_bf_reduce (void *cls,
302                    const struct GNUNET_HashCode *key,
303                    void *value)
304 {
305   struct Operation *op = cls;
306   struct ElementEntry *ee = value;
307   struct GNUNET_HashCode mutated_hash;
308
309   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
310                             op->state->salt,
311                             &mutated_hash);
312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313               "Testing mingled hash %s with salt %u\n",
314               GNUNET_h2s (&mutated_hash),
315               op->state->salt);
316   if (GNUNET_NO ==
317       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
318                                          &mutated_hash))
319   {
320     GNUNET_break (0 < op->state->my_element_count);
321     op->state->my_element_count--;
322     GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
323                             &ee->element_hash,
324                             &op->state->my_xor);
325     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326                 "Bloom filter reduction of my_elements, removing %s:%u\n",
327                 GNUNET_h2s (&ee->element_hash),
328                 ee->element.size);
329     GNUNET_assert (GNUNET_YES ==
330                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
331                                                          &ee->element_hash,
332                                                          ee));
333     send_client_removed_element (op,
334                                  &ee->element);
335   }
336   else
337   {
338     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
339                 "Bloom filter reduction of my_elements, keeping %s:%u\n",
340                 GNUNET_h2s (&ee->element_hash),
341                 ee->element.size);
342   }
343   return GNUNET_YES;
344 }
345
346
347 /**
348  * Create initial bloomfilter based on all the elements given.
349  *
350  * @param cls the `struct Operation *`
351  * @param key current key code
352  * @param value the `struct ElementEntry` to process
353  * @return #GNUNET_YES (we should continue to iterate)
354  */
355 static int
356 iterator_bf_create (void *cls,
357                     const struct GNUNET_HashCode *key,
358                     void *value)
359 {
360   struct Operation *op = cls;
361   struct ElementEntry *ee = value;
362   struct GNUNET_HashCode mutated_hash;
363
364   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
365                             op->state->salt,
366                             &mutated_hash);
367   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368               "Initializing BF with hash %s with salt %u\n",
369               GNUNET_h2s (&mutated_hash),
370               op->state->salt);
371   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
372                                     &mutated_hash);
373   return GNUNET_YES;
374 }
375
376
377 /**
378  * Inform the client that the intersection operation has failed,
379  * and proceed to destroy the evaluate operation.
380  *
381  * @param op the intersection operation to fail
382  */
383 static void
384 fail_intersection_operation (struct Operation *op)
385 {
386   struct GNUNET_MQ_Envelope *ev;
387   struct GNUNET_SET_ResultMessage *msg;
388
389   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
390               "Intersection operation failed\n");
391   if (NULL != op->state->my_elements)
392   {
393     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
394     op->state->my_elements = NULL;
395   }
396   ev = GNUNET_MQ_msg (msg,
397                       GNUNET_MESSAGE_TYPE_SET_RESULT);
398   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
399   msg->request_id = htonl (op->spec->client_request_id);
400   msg->element_type = htons (0);
401   GNUNET_MQ_send (op->spec->set->client_mq,
402                   ev);
403   _GSS_operation_destroy (op,
404                           GNUNET_YES);
405 }
406
407
408 /**
409  * Send a bloomfilter to our peer.  After the result done message has
410  * been sent to the client, destroy the evaluate operation.
411  *
412  * @param op intersection operation
413  */
414 static void
415 send_bloomfilter (struct Operation *op)
416 {
417   struct GNUNET_MQ_Envelope *ev;
418   struct BFMessage *msg;
419   uint32_t bf_size;
420   uint32_t bf_elementbits;
421   uint32_t chunk_size;
422   char *bf_data;
423   uint32_t offset;
424
425   /* We consider the ratio of the set sizes to determine
426      the number of bits per element, as the smaller set
427      should use more bits to maximize its set reduction
428      potential and minimize overall bandwidth consumption. */
429   bf_elementbits = 2 + ceil (log2((double)
430                              (op->spec->remote_element_count /
431                                    (double) op->state->my_element_count)));
432   if (bf_elementbits < 1)
433     bf_elementbits = 1; /* make sure k is not 0 */
434   /* optimize BF-size to ~50% of bits set */
435   bf_size = ceil ((double) (op->state->my_element_count
436                             * bf_elementbits / log(2)));
437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438               "Sending Bloom filter (%u) of size %u bytes\n",
439               (unsigned int) bf_elementbits,
440               (unsigned int) bf_size);
441   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
442                                                            bf_size,
443                                                            bf_elementbits);
444   op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
445                                               UINT32_MAX);
446   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
447                                          &iterator_bf_create,
448                                          op);
449
450   /* send our Bloom filter */
451   chunk_size = 60 * 1024 - sizeof (struct BFMessage);
452   if (bf_size <= chunk_size)
453   {
454     /* singlepart */
455     chunk_size = bf_size;
456     ev = GNUNET_MQ_msg_extra (msg,
457                               chunk_size,
458                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
459     GNUNET_assert (GNUNET_SYSERR !=
460                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
461                                                               (char*) &msg[1],
462                                                               bf_size));
463     msg->sender_element_count = htonl (op->state->my_element_count);
464     msg->bloomfilter_total_length = htonl (bf_size);
465     msg->bits_per_element = htonl (bf_elementbits);
466     msg->sender_mutator = htonl (op->state->salt);
467     msg->element_xor_hash = op->state->my_xor;
468     GNUNET_MQ_send (op->mq, ev);
469   }
470   else
471   {
472     /* multipart */
473     bf_data = GNUNET_malloc (bf_size);
474     GNUNET_assert (GNUNET_SYSERR !=
475                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
476                                                               bf_data,
477                                                               bf_size));
478     offset = 0;
479     while (offset < bf_size)
480     {
481       if (bf_size - chunk_size < offset)
482         chunk_size = bf_size - offset;
483       ev = GNUNET_MQ_msg_extra (msg,
484                                 chunk_size,
485                                 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
486       memcpy (&msg[1],
487               &bf_data[offset],
488               chunk_size);
489       offset += chunk_size;
490       msg->sender_element_count = htonl (op->state->my_element_count);
491       msg->bloomfilter_total_length = htonl (bf_size);
492       msg->bits_per_element = htonl (bf_elementbits);
493       msg->sender_mutator = htonl (op->state->salt);
494       msg->element_xor_hash = op->state->my_xor;
495       GNUNET_MQ_send (op->mq, ev);
496     }
497     GNUNET_free (bf_data);
498   }
499   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
500   op->state->local_bf = NULL;
501 }
502
503
504 /**
505  * Signal to the client that the operation has finished and
506  * destroy the operation.
507  *
508  * @param cls operation to destroy
509  */
510 static void
511 send_client_done_and_destroy (void *cls)
512 {
513   struct Operation *op = cls;
514   struct GNUNET_MQ_Envelope *ev;
515   struct GNUNET_SET_ResultMessage *rm;
516
517   ev = GNUNET_MQ_msg (rm,
518                       GNUNET_MESSAGE_TYPE_SET_RESULT);
519   rm->request_id = htonl (op->spec->client_request_id);
520   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
521   rm->element_type = htons (0);
522   GNUNET_MQ_send (op->spec->set->client_mq,
523                   ev);
524   _GSS_operation_destroy (op,
525                           GNUNET_YES);
526 }
527
528
529 /**
530  * Send all elements in the full result iterator.
531  *
532  * @param cls the `struct Operation *`
533  */
534 static void
535 send_remaining_elements (void *cls)
536 {
537   struct Operation *op = cls;
538   const void *nxt;
539   const struct ElementEntry *ee;
540   struct GNUNET_MQ_Envelope *ev;
541   struct GNUNET_SET_ResultMessage *rm;
542   const struct GNUNET_SET_Element *element;
543   int res;
544
545   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
546                                                      NULL,
547                                                      &nxt);
548   if (GNUNET_NO == res)
549   {
550     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
551                 "Sending done and destroy because iterator ran out\n");
552     op->keep--;
553     send_client_done_and_destroy (op);
554     return;
555   }
556   ee = nxt;
557   element = &ee->element;
558   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
559               "Sending element %s:%u to client (full set)\n",
560               GNUNET_h2s (&ee->element_hash),
561               element->size);
562   GNUNET_assert (0 != op->spec->client_request_id);
563   ev = GNUNET_MQ_msg_extra (rm,
564                             element->size,
565                             GNUNET_MESSAGE_TYPE_SET_RESULT);
566   GNUNET_assert (NULL != ev);
567   rm->result_status = htons (GNUNET_SET_STATUS_OK);
568   rm->request_id = htonl (op->spec->client_request_id);
569   rm->element_type = element->element_type;
570   memcpy (&rm[1],
571           element->data,
572           element->size);
573   GNUNET_MQ_notify_sent (ev,
574                          &send_remaining_elements,
575                          op);
576   GNUNET_MQ_send (op->spec->set->client_mq,
577                   ev);
578 }
579
580
581 /**
582  * Inform the peer that this operation is complete.
583  *
584  * @param op the intersection operation to fail
585  */
586 static void
587 send_peer_done (struct Operation *op)
588 {
589   struct GNUNET_MQ_Envelope *ev;
590   struct IntersectionDoneMessage *idm;
591
592   op->state->phase = PHASE_FINISHED;
593   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594               "Intersection succeeded, sending DONE\n");
595   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
596   op->state->local_bf = NULL;
597
598   ev = GNUNET_MQ_msg (idm,
599                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
600   idm->final_element_count = htonl (op->state->my_element_count);
601   idm->element_xor_hash = op->state->my_xor;
602   GNUNET_MQ_send (op->mq,
603                   ev);
604 }
605
606
607 /**
608  * Process a Bloomfilter once we got all the chunks.
609  *
610  * @param op the intersection operation
611  */
612 static void
613 process_bf (struct Operation *op)
614 {
615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616               "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
617               op->state->phase,
618               op->spec->remote_element_count,
619               op->state->my_element_count,
620               GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
621   switch (op->state->phase)
622   {
623   case PHASE_INITIAL:
624     GNUNET_break_op (0);
625     fail_intersection_operation(op);
626     return;
627   case PHASE_COUNT_SENT:
628     /* This is the first BF being sent, build our initial map with
629        filtering in place */
630     op->state->my_elements
631       = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
632                                               GNUNET_YES);
633     op->state->my_element_count = 0;
634     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
635                                            &filtered_map_initialization,
636                                            op);
637     break;
638   case PHASE_BF_EXCHANGE:
639     /* Update our set by reduction */
640     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
641                                            &iterator_bf_reduce,
642                                            op);
643     break;
644   case PHASE_FINISHED:
645     GNUNET_break_op (0);
646     fail_intersection_operation(op);
647     return;
648   }
649   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
650   op->state->remote_bf = NULL;
651
652   if ( (0 == op->state->my_element_count) || /* fully disjoint */
653        ( (op->state->my_element_count == op->spec->remote_element_count) &&
654          (0 == memcmp (&op->state->my_xor,
655                        &op->state->other_xor,
656                        sizeof (struct GNUNET_HashCode))) ) )
657   {
658     /* we are done */
659     send_peer_done (op);
660     return;
661   }
662   op->state->phase = PHASE_BF_EXCHANGE;
663   send_bloomfilter (op);
664 }
665
666
667 /**
668  * Handle an BF message from a remote peer.
669  *
670  * @param cls the intersection operation
671  * @param mh the header of the message
672  */
673 static void
674 handle_p2p_bf (void *cls,
675                const struct GNUNET_MessageHeader *mh)
676 {
677   struct Operation *op = cls;
678   const struct BFMessage *msg;
679   uint32_t bf_size;
680   uint32_t chunk_size;
681   uint32_t bf_bits_per_element;
682   uint16_t msize;
683
684   msize = htons (mh->size);
685   if (msize < sizeof (struct BFMessage))
686   {
687     GNUNET_break_op (0);
688     fail_intersection_operation (op);
689     return;
690   }
691   msg = (const struct BFMessage *) mh;
692   switch (op->state->phase)
693   {
694   case PHASE_INITIAL:
695     GNUNET_break_op (0);
696     fail_intersection_operation (op);
697     break;
698   case PHASE_COUNT_SENT:
699   case PHASE_BF_EXCHANGE:
700     bf_size = ntohl (msg->bloomfilter_total_length);
701     bf_bits_per_element = ntohl (msg->bits_per_element);
702     chunk_size = msize - sizeof (struct BFMessage);
703     op->state->other_xor = msg->element_xor_hash;
704     if (bf_size == chunk_size)
705     {
706       if (NULL != op->state->bf_data)
707       {
708         GNUNET_break_op (0);
709         fail_intersection_operation (op);
710         return;
711       }
712       /* single part, done here immediately */
713       op->state->remote_bf
714         = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
715                                              bf_size,
716                                              bf_bits_per_element);
717       op->state->salt = ntohl (msg->sender_mutator);
718       op->spec->remote_element_count = ntohl (msg->sender_element_count);
719       process_bf (op);
720       return;
721     }
722     /* multipart chunk */
723     if (NULL == op->state->bf_data)
724     {
725       /* first chunk, initialize */
726       op->state->bf_data = GNUNET_malloc (bf_size);
727       op->state->bf_data_size = bf_size;
728       op->state->bf_bits_per_element = bf_bits_per_element;
729       op->state->bf_data_offset = 0;
730       op->state->salt = ntohl (msg->sender_mutator);
731       op->spec->remote_element_count = ntohl (msg->sender_element_count);
732     }
733     else
734     {
735       /* increment */
736       if ( (op->state->bf_data_size != bf_size) ||
737            (op->state->bf_bits_per_element != bf_bits_per_element) ||
738            (op->state->bf_data_offset + chunk_size > bf_size) ||
739            (op->state->salt != ntohl (msg->sender_mutator)) ||
740            (op->spec->remote_element_count != ntohl (msg->sender_element_count)) )
741       {
742         GNUNET_break_op (0);
743         fail_intersection_operation (op);
744         return;
745       }
746     }
747     memcpy (&op->state->bf_data[op->state->bf_data_offset],
748             (const char*) &msg[1],
749             chunk_size);
750     op->state->bf_data_offset += chunk_size;
751     if (op->state->bf_data_offset == bf_size)
752     {
753       /* last chunk, run! */
754       op->state->remote_bf
755         = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
756                                              bf_size,
757                                              bf_bits_per_element);
758       GNUNET_free (op->state->bf_data);
759       op->state->bf_data = NULL;
760       op->state->bf_data_size = 0;
761       process_bf (op);
762     }
763     break;
764   default:
765     GNUNET_break_op (0);
766     fail_intersection_operation (op);
767     break;
768   }
769 }
770
771
772 /**
773  * Fills the "my_elements" hashmap with the initial set of
774  * (non-deleted) elements from the set of the specification.
775  *
776  * @param cls closure with the `struct Operation *`
777  * @param key current key code for the element
778  * @param value value in the hash map with the `struct ElementEntry *`
779  * @return #GNUNET_YES (we should continue to iterate)
780  */
781 static int
782 initialize_map_unfiltered (void *cls,
783                            const struct GNUNET_HashCode *key,
784                            void *value)
785 {
786   struct ElementEntry *ee = value;
787   struct Operation *op = cls;
788
789   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
790     return GNUNET_YES; /* element not live in operation's generation */
791   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
792                           &ee->element_hash,
793                           &op->state->my_xor);
794   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
795               "Initial full initialization of my_elements, adding %s:%u\n",
796               GNUNET_h2s (&ee->element_hash),
797               ee->element.size);
798   GNUNET_break (GNUNET_YES ==
799                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
800                                                    &ee->element_hash,
801                                                    ee,
802                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
803   return GNUNET_YES;
804 }
805
806
807 /**
808  * Send our element count to the peer, in case our element count is
809  * lower than his.
810  *
811  * @param op intersection operation
812  */
813 static void
814 send_element_count (struct Operation *op)
815 {
816   struct GNUNET_MQ_Envelope *ev;
817   struct IntersectionElementInfoMessage *msg;
818
819   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
820               "Sending our element count (%u)\n",
821               op->state->my_element_count);
822   ev = GNUNET_MQ_msg (msg,
823                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
824   msg->sender_element_count = htonl (op->state->my_element_count);
825   GNUNET_MQ_send (op->mq, ev);
826 }
827
828
829 /**
830  * We go first, initialize our map with all elements and
831  * send the first Bloom filter.
832  *
833  * @param op operation to start exchange for
834  */
835 static void
836 begin_bf_exchange (struct Operation *op)
837 {
838   op->state->phase = PHASE_BF_EXCHANGE;
839   op->state->my_elements
840     = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
841                                             GNUNET_YES);
842   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
843                                          &initialize_map_unfiltered,
844                                          op);
845   send_bloomfilter (op);
846 }
847
848
849 /**
850  * Handle the initial `struct IntersectionElementInfoMessage` from a
851  * remote peer.
852  *
853  * @param cls the intersection operation
854  * @param mh the header of the message
855  */
856 static void
857 handle_p2p_element_info (void *cls,
858                          const struct GNUNET_MessageHeader *mh)
859 {
860   struct Operation *op = cls;
861   const struct IntersectionElementInfoMessage *msg;
862
863   if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
864   {
865     GNUNET_break_op (0);
866     fail_intersection_operation(op);
867     return;
868   }
869   msg = (const struct IntersectionElementInfoMessage *) mh;
870   op->spec->remote_element_count = ntohl (msg->sender_element_count);
871   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872               "Received remote element count (%u), I have %u\n",
873               op->spec->remote_element_count,
874               op->state->my_element_count);
875   if ( ( (PHASE_INITIAL != op->state->phase) &&
876          (PHASE_COUNT_SENT != op->state->phase) ) ||
877        (op->state->my_element_count > op->spec->remote_element_count) ||
878        (0 == op->state->my_element_count) ||
879        (0 == op->spec->remote_element_count) )
880   {
881     GNUNET_break_op (0);
882     fail_intersection_operation(op);
883     return;
884   }
885   GNUNET_break (NULL == op->state->remote_bf);
886   begin_bf_exchange (op);
887 }
888
889
890 /**
891  * Send a result message to the client indicating that the operation
892  * is over.  After the result done message has been sent to the
893  * client, destroy the evaluate operation.
894  *
895  * @param op intersection operation
896  */
897 static void
898 finish_and_destroy (struct Operation *op)
899 {
900   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
901
902   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
903   {
904     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
905                 "Sending full result set (%u elements)\n",
906                 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
907     op->state->full_result_iter
908       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
909     op->keep++;
910     send_remaining_elements (op);
911     return;
912   }
913   send_client_done_and_destroy (op);
914 }
915
916
917 /**
918  * Remove all elements from our hashmap.
919  *
920  * @param cls closure with the `struct Operation *`
921  * @param key current key code
922  * @param value value in the hash map
923  * @return #GNUNET_YES (we should continue to iterate)
924  */
925 static int
926 filter_all (void *cls,
927             const struct GNUNET_HashCode *key,
928             void *value)
929 {
930   struct Operation *op = cls;
931   struct ElementEntry *ee = value;
932
933   GNUNET_break (0 < op->state->my_element_count);
934   op->state->my_element_count--;
935   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
936                           &ee->element_hash,
937                           &op->state->my_xor);
938   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
939               "Final reduction of my_elements, removing %s:%u\n",
940               GNUNET_h2s (&ee->element_hash),
941               ee->element.size);
942   GNUNET_assert (GNUNET_YES ==
943                  GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
944                                                        &ee->element_hash,
945                                                        ee));
946   send_client_removed_element (op,
947                                &ee->element);
948   return GNUNET_YES;
949 }
950
951
952 /**
953  * Handle a done message from a remote peer
954  *
955  * @param cls the intersection operation
956  * @param mh the message
957  */
958 static void
959 handle_p2p_done (void *cls,
960                  const struct GNUNET_MessageHeader *mh)
961 {
962   struct Operation *op = cls;
963   const struct IntersectionDoneMessage *idm;
964
965   if (PHASE_BF_EXCHANGE != op->state->phase)
966   {
967     /* wrong phase to conclude? FIXME: Or should we allow this
968        if the other peer has _initially_ already an empty set? */
969     GNUNET_break_op (0);
970     fail_intersection_operation (op);
971     return;
972   }
973   if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
974   {
975     GNUNET_break_op (0);
976     fail_intersection_operation (op);
977     return;
978   }
979   idm = (const struct IntersectionDoneMessage *) mh;
980   if (0 == ntohl (idm->final_element_count))
981   {
982     /* other peer determined empty set is the intersection,
983        remove all elements */
984     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
985                                            &filter_all,
986                                            op);
987   }
988   if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
989        (0 != memcmp (&op->state->my_xor,
990                      &idm->element_xor_hash,
991                      sizeof (struct GNUNET_HashCode))) )
992   {
993     /* Other peer thinks we are done, but we disagree on the result! */
994     GNUNET_break_op (0);
995     fail_intersection_operation (op);
996     return;
997   }
998   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
999               "Got IntersectionDoneMessage, have %u elements in intersection\n",
1000               op->state->my_element_count);
1001   op->state->phase = PHASE_FINISHED;
1002   finish_and_destroy (op);
1003 }
1004
1005
1006 /**
1007  * Initiate a set intersection operation with a remote peer.
1008  *
1009  * @param op operation that is created, should be initialized to
1010  *        begin the evaluation
1011  * @param opaque_context message to be transmitted to the listener
1012  *        to convince him to accept, may be NULL
1013  */
1014 static void
1015 intersection_evaluate (struct Operation *op,
1016                        const struct GNUNET_MessageHeader *opaque_context)
1017 {
1018   struct GNUNET_MQ_Envelope *ev;
1019   struct OperationRequestMessage *msg;
1020
1021   op->state = GNUNET_new (struct OperationState);
1022   /* we started the operation, thus we have to send the operation request */
1023   op->state->phase = PHASE_INITIAL;
1024   op->state->my_element_count = op->spec->set->state->current_set_element_count;
1025
1026   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027               "Initiating intersection operation evaluation\n");
1028   ev = GNUNET_MQ_msg_nested_mh (msg,
1029                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1030                                 opaque_context);
1031   if (NULL == ev)
1032   {
1033     /* the context message is too large!? */
1034     GNUNET_break (0);
1035     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1036     return;
1037   }
1038   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1039   msg->app_id = op->spec->app_id;
1040   msg->element_count = htonl (op->state->my_element_count);
1041   GNUNET_MQ_send (op->mq,
1042                   ev);
1043   op->state->phase = PHASE_COUNT_SENT;
1044   if (NULL != opaque_context)
1045     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046                 "Sent op request with context message\n");
1047   else
1048     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049                 "Sent op request without context message\n");
1050 }
1051
1052
1053 /**
1054  * Accept an intersection operation request from a remote peer.  Only
1055  * initializes the private operation state.
1056  *
1057  * @param op operation that will be accepted as an intersection operation
1058  */
1059 static void
1060 intersection_accept (struct Operation *op)
1061 {
1062   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1063               "Accepting set intersection operation\n");
1064   op->state = GNUNET_new (struct OperationState);
1065   op->state->phase = PHASE_INITIAL;
1066   op->state->my_element_count
1067     = op->spec->set->state->current_set_element_count;
1068   op->state->my_elements
1069     = GNUNET_CONTAINER_multihashmap_create
1070     (GNUNET_MIN (op->state->my_element_count,
1071                  op->spec->remote_element_count),
1072      GNUNET_YES);
1073   if (op->spec->remote_element_count < op->state->my_element_count)
1074   {
1075     /* If the other peer (Alice) has fewer elements than us (Bob),
1076        we just send the count as Alice should send the first BF */
1077     send_element_count (op);
1078     op->state->phase = PHASE_COUNT_SENT;
1079     return;
1080   }
1081   /* We have fewer elements, so we start with the BF */
1082   begin_bf_exchange (op);
1083 }
1084
1085
1086 /**
1087  * Dispatch messages for a intersection operation.
1088  *
1089  * @param op the state of the intersection evaluate operation
1090  * @param mh the received message
1091  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1092  *         #GNUNET_OK otherwise
1093  */
1094 static int
1095 intersection_handle_p2p_message (struct Operation *op,
1096                                  const struct GNUNET_MessageHeader *mh)
1097 {
1098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1099               "Received p2p message (t: %u, s: %u)\n",
1100               ntohs (mh->type), ntohs (mh->size));
1101   switch (ntohs (mh->type))
1102   {
1103     /* this message handler is not active until after we received an
1104      * operation request message, thus the ops request is not handled here
1105      */
1106   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
1107     handle_p2p_element_info (op, mh);
1108     break;
1109   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1110     handle_p2p_bf (op, mh);
1111     break;
1112   case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
1113     handle_p2p_done (op, mh);
1114     break;
1115   default:
1116     /* something wrong with cadet's message handlers? */
1117     GNUNET_assert (0);
1118   }
1119   return GNUNET_OK;
1120 }
1121
1122
1123 /**
1124  * Handler for peer-disconnects, notifies the client about the aborted
1125  * operation.  If we did not expect anything from the other peer, we
1126  * gracefully terminate the operation.
1127  *
1128  * @param op the destroyed operation
1129  */
1130 static void
1131 intersection_peer_disconnect (struct Operation *op)
1132 {
1133   if (PHASE_FINISHED != op->state->phase)
1134   {
1135     fail_intersection_operation (op);
1136     return;
1137   }
1138   /* the session has already been concluded */
1139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140               "Other peer disconnected (finished)\n");
1141   if (GNUNET_NO == op->state->client_done_sent)
1142     finish_and_destroy (op);
1143 }
1144
1145
1146 /**
1147  * Destroy the intersection operation.  Only things specific to the
1148  * intersection operation are destroyed.
1149  *
1150  * @param op intersection operation to destroy
1151  */
1152 static void
1153 intersection_op_cancel (struct Operation *op)
1154 {
1155   /* check if the op was canceled twice */
1156   GNUNET_assert (NULL != op->state);
1157   if (NULL != op->state->remote_bf)
1158   {
1159     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1160     op->state->remote_bf = NULL;
1161   }
1162   if (NULL != op->state->local_bf)
1163   {
1164     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1165     op->state->local_bf = NULL;
1166   }
1167   if (NULL != op->state->my_elements)
1168   {
1169     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1170     op->state->my_elements = NULL;
1171   }
1172   GNUNET_free (op->state);
1173   op->state = NULL;
1174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1175               "Destroying intersection op state done\n");
1176 }
1177
1178
1179 /**
1180  * Create a new set supporting the intersection operation.
1181  *
1182  * @return the newly created set
1183  */
1184 static struct SetState *
1185 intersection_set_create ()
1186 {
1187   struct SetState *set_state;
1188
1189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190               "Intersection set created\n");
1191   set_state = GNUNET_new (struct SetState);
1192   set_state->current_set_element_count = 0;
1193
1194   return set_state;
1195 }
1196
1197
1198 /**
1199  * Add the element from the given element message to the set.
1200  *
1201  * @param set_state state of the set want to add to
1202  * @param ee the element to add to the set
1203  */
1204 static void
1205 intersection_add (struct SetState *set_state,
1206                   struct ElementEntry *ee)
1207 {
1208   set_state->current_set_element_count++;
1209 }
1210
1211
1212 /**
1213  * Destroy a set that supports the intersection operation
1214  *
1215  * @param set_state the set to destroy
1216  */
1217 static void
1218 intersection_set_destroy (struct SetState *set_state)
1219 {
1220   GNUNET_free (set_state);
1221 }
1222
1223
1224 /**
1225  * Remove the element given in the element message from the set.
1226  *
1227  * @param set_state state of the set to remove from
1228  * @param element set element to remove
1229  */
1230 static void
1231 intersection_remove (struct SetState *set_state,
1232                      struct ElementEntry *element)
1233 {
1234   GNUNET_assert (0 < set_state->current_set_element_count);
1235   set_state->current_set_element_count--;
1236 }
1237
1238
1239 /**
1240  * Get the table with implementing functions for set intersection.
1241  *
1242  * @return the operation specific VTable
1243  */
1244 const struct SetVT *
1245 _GSS_intersection_vt ()
1246 {
1247   static const struct SetVT intersection_vt = {
1248     .create = &intersection_set_create,
1249     .msg_handler = &intersection_handle_p2p_message,
1250     .add = &intersection_add,
1251     .remove = &intersection_remove,
1252     .destroy_set = &intersection_set_destroy,
1253     .evaluate = &intersection_evaluate,
1254     .accept = &intersection_accept,
1255     .peer_disconnect = &intersection_peer_disconnect,
1256     .cancel = &intersection_op_cancel,
1257   };
1258
1259   return &intersection_vt;
1260 }