-comments: the world ain't all male
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14      
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file set/gnunet-service-set_intersection.c
20  * @brief two-peer set intersection
21  * @author Christian Fuchs
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet-service-set.h"
27 #include "gnunet_block_lib.h"
28 #include "gnunet-service-set_protocol.h"
29 #include "gnunet-service-set_intersection.h"
30 #include <gcrypt.h>
31
32
33 /**
34  * Current phase we are in for a intersection operation.
35  */
36 enum IntersectionOperationPhase
37 {
38   /**
39    * We are just starting.
40    */
41   PHASE_INITIAL,
42
43   /**
44    * We have send the number of our elements to the other
45    * peer, but did not setup our element set yet.
46    */
47   PHASE_COUNT_SENT,
48
49   /**
50    * We have initialized our set and are now reducing it by exchanging
51    * Bloom filters until one party notices the their element hashes
52    * are equal.
53    */
54   PHASE_BF_EXCHANGE,
55
56   /**
57    * We must next send the P2P DONE message (after finishing mostly
58    * with the local client).  Then we will wait for the channel to close.
59    */
60   PHASE_MUST_SEND_DONE,
61
62   /**
63    * We have received the P2P DONE message, and must finish with the
64    * local client before terminating the channel.
65    */
66   PHASE_DONE_RECEIVED,
67
68   /**
69    * The protocol is over.  Results may still have to be sent to the
70    * client.
71    */
72   PHASE_FINISHED
73
74 };
75
76
77 /**
78  * State of an evaluate operation with another peer.
79  */
80 struct OperationState
81 {
82   /**
83    * The bf we currently receive
84    */
85   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
86
87   /**
88    * BF of the set's element.
89    */
90   struct GNUNET_CONTAINER_BloomFilter *local_bf;
91
92   /**
93    * Remaining elements in the intersection operation.
94    * Maps element-id-hashes to 'elements in our set'.
95    */
96   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
97
98   /**
99    * Iterator for sending the final set of @e my_elements to the client.
100    */
101   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
102
103   /**
104    * Evaluate operations are held in a linked list.
105    */
106   struct OperationState *next;
107
108   /**
109    * Evaluate operations are held in a linked list.
110    */
111   struct OperationState *prev;
112
113   /**
114    * For multipart BF transmissions, we have to store the
115    * bloomfilter-data until we fully received it.
116    */
117   char *bf_data;
118
119   /**
120    * XOR of the keys of all of the elements (remaining) in my set.
121    * Always updated when elements are added or removed to
122    * @e my_elements.
123    */
124   struct GNUNET_HashCode my_xor;
125
126   /**
127    * XOR of the keys of all of the elements (remaining) in
128    * the other peer's set.  Updated when we receive the
129    * other peer's Bloom filter.
130    */
131   struct GNUNET_HashCode other_xor;
132
133   /**
134    * How many bytes of @e bf_data are valid?
135    */
136   uint32_t bf_data_offset;
137
138   /**
139    * Current element count contained within @e my_elements.
140    * (May differ briefly during initialization.)
141    */
142   uint32_t my_element_count;
143
144   /**
145    * size of the bloomfilter in @e bf_data.
146    */
147   uint32_t bf_data_size;
148
149   /**
150    * size of the bloomfilter
151    */
152   uint32_t bf_bits_per_element;
153
154   /**
155    * Salt currently used for BF construction (by us or the other peer,
156    * depending on where we are in the code).
157    */
158   uint32_t salt;
159
160   /**
161    * Current state of the operation.
162    */
163   enum IntersectionOperationPhase phase;
164
165   /**
166    * Generation in which the operation handle
167    * was created.
168    */
169   unsigned int generation_created;
170
171   /**
172    * Did we send the client that we are done?
173    */
174   int client_done_sent;
175
176   /**
177    * Set whenever we reach the state where the death of the
178    * channel is perfectly find and should NOT result in the
179    * operation being cancelled.
180    */
181   int channel_death_expected;
182 };
183
184
185 /**
186  * Extra state required for efficient set intersection.
187  * Merely tracks the total number of elements.
188  */
189 struct SetState
190 {
191   /**
192    * Number of currently valid elements in the set which have not been
193    * removed.
194    */
195   uint32_t current_set_element_count;
196 };
197
198
199 /**
200  * If applicable in the current operation mode, send a result message
201  * to the client indicating we removed an element.
202  *
203  * @param op intersection operation
204  * @param element element to send
205  */
206 static void
207 send_client_removed_element (struct Operation *op,
208                              struct GNUNET_SET_Element *element)
209 {
210   struct GNUNET_MQ_Envelope *ev;
211   struct GNUNET_SET_ResultMessage *rm;
212
213   if (GNUNET_SET_RESULT_REMOVED != op->result_mode)
214     return; /* Wrong mode for transmitting removed elements */
215   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
216               "Sending removed element (size %u) to client\n",
217               element->size);
218   GNUNET_assert (0 != op->client_request_id);
219   ev = GNUNET_MQ_msg_extra (rm,
220                             element->size,
221                             GNUNET_MESSAGE_TYPE_SET_RESULT);
222   if (NULL == ev)
223   {
224     GNUNET_break (0);
225     return;
226   }
227   rm->result_status = htons (GNUNET_SET_STATUS_OK);
228   rm->request_id = htonl (op->client_request_id);
229   rm->element_type = element->element_type;
230   GNUNET_memcpy (&rm[1],
231                  element->data,
232                  element->size);
233   GNUNET_MQ_send (op->set->cs->mq,
234                   ev);
235 }
236
237
238 /**
239  * Fills the "my_elements" hashmap with all relevant elements.
240  *
241  * @param cls the `struct Operation *` we are performing
242  * @param key current key code
243  * @param value the `struct ElementEntry *` from the hash map
244  * @return #GNUNET_YES (we should continue to iterate)
245  */
246 static int
247 filtered_map_initialization (void *cls,
248                              const struct GNUNET_HashCode *key,
249                              void *value)
250 {
251   struct Operation *op = cls;
252   struct ElementEntry *ee = value;
253   struct GNUNET_HashCode mutated_hash;
254
255
256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
257               "FIMA called for %s:%u\n",
258               GNUNET_h2s (&ee->element_hash),
259               ee->element.size);
260
261   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
262   {
263     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
264                 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
265                 GNUNET_h2s (&ee->element_hash),
266                 ee->element.size);
267     return GNUNET_YES; /* element not valid in our operation's generation */
268   }
269
270   /* Test if element is in other peer's bloomfilter */
271   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
272                             op->state->salt,
273                             &mutated_hash);
274   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
275               "Testing mingled hash %s with salt %u\n",
276               GNUNET_h2s (&mutated_hash),
277               op->state->salt);
278   if (GNUNET_NO ==
279       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
280                                          &mutated_hash))
281   {
282     /* remove this element */
283     send_client_removed_element (op,
284                                  &ee->element);
285     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
286                 "Reduced initialization, not starting with %s:%u\n",
287                 GNUNET_h2s (&ee->element_hash),
288                 ee->element.size);
289     return GNUNET_YES;
290   }
291   op->state->my_element_count++;
292   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
293                           &ee->element_hash,
294                           &op->state->my_xor);
295   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
296               "Filtered initialization of my_elements, adding %s:%u\n",
297               GNUNET_h2s (&ee->element_hash),
298               ee->element.size);
299   GNUNET_break (GNUNET_YES ==
300                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
301                                                    &ee->element_hash,
302                                                    ee,
303                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
304
305   return GNUNET_YES;
306 }
307
308
309 /**
310  * Removes elements from our hashmap if they are not contained within the
311  * provided remote bloomfilter.
312  *
313  * @param cls closure with the `struct Operation *`
314  * @param key current key code
315  * @param value value in the hash map
316  * @return #GNUNET_YES (we should continue to iterate)
317  */
318 static int
319 iterator_bf_reduce (void *cls,
320                    const struct GNUNET_HashCode *key,
321                    void *value)
322 {
323   struct Operation *op = cls;
324   struct ElementEntry *ee = value;
325   struct GNUNET_HashCode mutated_hash;
326
327   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
328                             op->state->salt,
329                             &mutated_hash);
330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
331               "Testing mingled hash %s with salt %u\n",
332               GNUNET_h2s (&mutated_hash),
333               op->state->salt);
334   if (GNUNET_NO ==
335       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
336                                          &mutated_hash))
337   {
338     GNUNET_break (0 < op->state->my_element_count);
339     op->state->my_element_count--;
340     GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
341                             &ee->element_hash,
342                             &op->state->my_xor);
343     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
344                 "Bloom filter reduction of my_elements, removing %s:%u\n",
345                 GNUNET_h2s (&ee->element_hash),
346                 ee->element.size);
347     GNUNET_assert (GNUNET_YES ==
348                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
349                                                          &ee->element_hash,
350                                                          ee));
351     send_client_removed_element (op,
352                                  &ee->element);
353   }
354   else
355   {
356     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
357                 "Bloom filter reduction of my_elements, keeping %s:%u\n",
358                 GNUNET_h2s (&ee->element_hash),
359                 ee->element.size);
360   }
361   return GNUNET_YES;
362 }
363
364
365 /**
366  * Create initial bloomfilter based on all the elements given.
367  *
368  * @param cls the `struct Operation *`
369  * @param key current key code
370  * @param value the `struct ElementEntry` to process
371  * @return #GNUNET_YES (we should continue to iterate)
372  */
373 static int
374 iterator_bf_create (void *cls,
375                     const struct GNUNET_HashCode *key,
376                     void *value)
377 {
378   struct Operation *op = cls;
379   struct ElementEntry *ee = value;
380   struct GNUNET_HashCode mutated_hash;
381
382   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
383                             op->state->salt,
384                             &mutated_hash);
385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386               "Initializing BF with hash %s with salt %u\n",
387               GNUNET_h2s (&mutated_hash),
388               op->state->salt);
389   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
390                                     &mutated_hash);
391   return GNUNET_YES;
392 }
393
394
395 /**
396  * Inform the client that the intersection operation has failed,
397  * and proceed to destroy the evaluate operation.
398  *
399  * @param op the intersection operation to fail
400  */
401 static void
402 fail_intersection_operation (struct Operation *op)
403 {
404   struct GNUNET_MQ_Envelope *ev;
405   struct GNUNET_SET_ResultMessage *msg;
406
407   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
408               "Intersection operation failed\n");
409   if (NULL != op->state->my_elements)
410   {
411     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
412     op->state->my_elements = NULL;
413   }
414   ev = GNUNET_MQ_msg (msg,
415                       GNUNET_MESSAGE_TYPE_SET_RESULT);
416   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
417   msg->request_id = htonl (op->client_request_id);
418   msg->element_type = htons (0);
419   GNUNET_MQ_send (op->set->cs->mq,
420                   ev);
421   _GSS_operation_destroy (op,
422                           GNUNET_YES);
423 }
424
425
426 /**
427  * Send a bloomfilter to our peer.  After the result done message has
428  * been sent to the client, destroy the evaluate operation.
429  *
430  * @param op intersection operation
431  */
432 static void
433 send_bloomfilter (struct Operation *op)
434 {
435   struct GNUNET_MQ_Envelope *ev;
436   struct BFMessage *msg;
437   uint32_t bf_size;
438   uint32_t bf_elementbits;
439   uint32_t chunk_size;
440   char *bf_data;
441   uint32_t offset;
442
443   /* We consider the ratio of the set sizes to determine
444      the number of bits per element, as the smaller set
445      should use more bits to maximize its set reduction
446      potential and minimize overall bandwidth consumption. */
447   bf_elementbits = 2 + ceil (log2((double)
448                              (op->remote_element_count /
449                               (double) op->state->my_element_count)));
450   if (bf_elementbits < 1)
451     bf_elementbits = 1; /* make sure k is not 0 */
452   /* optimize BF-size to ~50% of bits set */
453   bf_size = ceil ((double) (op->state->my_element_count
454                             * bf_elementbits / log(2)));
455   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
456               "Sending Bloom filter (%u) of size %u bytes\n",
457               (unsigned int) bf_elementbits,
458               (unsigned int) bf_size);
459   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
460                                                            bf_size,
461                                                            bf_elementbits);
462   op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
463                                               UINT32_MAX);
464   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
465                                          &iterator_bf_create,
466                                          op);
467
468   /* send our Bloom filter */
469   chunk_size = 60 * 1024 - sizeof (struct BFMessage);
470   if (bf_size <= chunk_size)
471   {
472     /* singlepart */
473     chunk_size = bf_size;
474     ev = GNUNET_MQ_msg_extra (msg,
475                               chunk_size,
476                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
477     GNUNET_assert (GNUNET_SYSERR !=
478                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
479                                                               (char*) &msg[1],
480                                                               bf_size));
481     msg->sender_element_count = htonl (op->state->my_element_count);
482     msg->bloomfilter_total_length = htonl (bf_size);
483     msg->bits_per_element = htonl (bf_elementbits);
484     msg->sender_mutator = htonl (op->state->salt);
485     msg->element_xor_hash = op->state->my_xor;
486     GNUNET_MQ_send (op->mq, ev);
487   }
488   else
489   {
490     /* multipart */
491     bf_data = GNUNET_malloc (bf_size);
492     GNUNET_assert (GNUNET_SYSERR !=
493                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
494                                                               bf_data,
495                                                               bf_size));
496     offset = 0;
497     while (offset < bf_size)
498     {
499       if (bf_size - chunk_size < offset)
500         chunk_size = bf_size - offset;
501       ev = GNUNET_MQ_msg_extra (msg,
502                                 chunk_size,
503                                 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
504       GNUNET_memcpy (&msg[1],
505               &bf_data[offset],
506               chunk_size);
507       offset += chunk_size;
508       msg->sender_element_count = htonl (op->state->my_element_count);
509       msg->bloomfilter_total_length = htonl (bf_size);
510       msg->bits_per_element = htonl (bf_elementbits);
511       msg->sender_mutator = htonl (op->state->salt);
512       msg->element_xor_hash = op->state->my_xor;
513       GNUNET_MQ_send (op->mq, ev);
514     }
515     GNUNET_free (bf_data);
516   }
517   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
518   op->state->local_bf = NULL;
519 }
520
521
522 /**
523  * Signal to the client that the operation has finished and
524  * destroy the operation.
525  *
526  * @param cls operation to destroy
527  */
528 static void
529 send_client_done_and_destroy (void *cls)
530 {
531   struct Operation *op = cls;
532   struct GNUNET_MQ_Envelope *ev;
533   struct GNUNET_SET_ResultMessage *rm;
534
535   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
536               "Intersection succeeded, sending DONE to local client\n");
537   ev = GNUNET_MQ_msg (rm,
538                       GNUNET_MESSAGE_TYPE_SET_RESULT);
539   rm->request_id = htonl (op->client_request_id);
540   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
541   rm->element_type = htons (0);
542   GNUNET_MQ_send (op->set->cs->mq,
543                   ev);
544   _GSS_operation_destroy (op,
545                           GNUNET_YES);
546 }
547
548
549 /**
550  * Remember that we are done dealing with the local client
551  * AND have sent the other peer our message that we are done,
552  * so we are not just waiting for the channel to die before
553  * telling the local client that we are done as our last act.
554  *
555  * @param cls the `struct Operation`.
556  */
557 static void
558 finished_local_operations (void *cls)
559 {
560   struct Operation *op = cls;
561
562   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563               "DONE sent to other peer, now waiting for other end to close the channel\n");
564   op->state->phase = PHASE_FINISHED;
565   op->state->channel_death_expected = GNUNET_YES;
566 }
567
568
569 /**
570  * Notify the other peer that we are done.  Once this message
571  * is out, we still need to notify the local client that we
572  * are done.
573  *
574  * @param op operation to notify for.
575  */
576 static void
577 send_p2p_done (struct Operation *op)
578 {
579   struct GNUNET_MQ_Envelope *ev;
580   struct IntersectionDoneMessage *idm;
581
582   GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
583   GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
584   ev = GNUNET_MQ_msg (idm,
585                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
586   idm->final_element_count = htonl (op->state->my_element_count);
587   idm->element_xor_hash = op->state->my_xor;
588   GNUNET_MQ_notify_sent (ev,
589                          &finished_local_operations,
590                          op);
591   GNUNET_MQ_send (op->mq,
592                   ev);
593 }
594
595
596 /**
597  * Send all elements in the full result iterator.
598  *
599  * @param cls the `struct Operation *`
600  */
601 static void
602 send_remaining_elements (void *cls)
603 {
604   struct Operation *op = cls;
605   const void *nxt;
606   const struct ElementEntry *ee;
607   struct GNUNET_MQ_Envelope *ev;
608   struct GNUNET_SET_ResultMessage *rm;
609   const struct GNUNET_SET_Element *element;
610   int res;
611
612   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
613                                                      NULL,
614                                                      &nxt);
615   if (GNUNET_NO == res)
616   {
617     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618                 "Sending done and destroy because iterator ran out\n");
619     GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
620     op->state->full_result_iter = NULL;
621     if (PHASE_DONE_RECEIVED == op->state->phase)
622     {
623       op->state->phase = PHASE_FINISHED;
624       send_client_done_and_destroy (op);
625     }
626     else if (PHASE_MUST_SEND_DONE == op->state->phase)
627     {
628       send_p2p_done (op);
629     }
630     else
631     {
632       GNUNET_assert (0);
633     }
634     return;
635   }
636   ee = nxt;
637   element = &ee->element;
638   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
639               "Sending element %s:%u to client (full set)\n",
640               GNUNET_h2s (&ee->element_hash),
641               element->size);
642   GNUNET_assert (0 != op->client_request_id);
643   ev = GNUNET_MQ_msg_extra (rm,
644                             element->size,
645                             GNUNET_MESSAGE_TYPE_SET_RESULT);
646   GNUNET_assert (NULL != ev);
647   rm->result_status = htons (GNUNET_SET_STATUS_OK);
648   rm->request_id = htonl (op->client_request_id);
649   rm->element_type = element->element_type;
650   GNUNET_memcpy (&rm[1],
651                  element->data,
652                  element->size);
653   GNUNET_MQ_notify_sent (ev,
654                          &send_remaining_elements,
655                          op);
656   GNUNET_MQ_send (op->set->cs->mq,
657                   ev);
658 }
659
660
661 /**
662  * Fills the "my_elements" hashmap with the initial set of
663  * (non-deleted) elements from the set of the specification.
664  *
665  * @param cls closure with the `struct Operation *`
666  * @param key current key code for the element
667  * @param value value in the hash map with the `struct ElementEntry *`
668  * @return #GNUNET_YES (we should continue to iterate)
669  */
670 static int
671 initialize_map_unfiltered (void *cls,
672                            const struct GNUNET_HashCode *key,
673                            void *value)
674 {
675   struct ElementEntry *ee = value;
676   struct Operation *op = cls;
677
678   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
679     return GNUNET_YES; /* element not live in operation's generation */
680   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
681                           &ee->element_hash,
682                           &op->state->my_xor);
683   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
684               "Initial full initialization of my_elements, adding %s:%u\n",
685               GNUNET_h2s (&ee->element_hash),
686               ee->element.size);
687   GNUNET_break (GNUNET_YES ==
688                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
689                                                    &ee->element_hash,
690                                                    ee,
691                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
692   return GNUNET_YES;
693 }
694
695
696 /**
697  * Send our element count to the peer, in case our element count is
698  * lower than theirs.
699  *
700  * @param op intersection operation
701  */
702 static void
703 send_element_count (struct Operation *op)
704 {
705   struct GNUNET_MQ_Envelope *ev;
706   struct IntersectionElementInfoMessage *msg;
707
708   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709               "Sending our element count (%u)\n",
710               op->state->my_element_count);
711   ev = GNUNET_MQ_msg (msg,
712                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
713   msg->sender_element_count = htonl (op->state->my_element_count);
714   GNUNET_MQ_send (op->mq, ev);
715 }
716
717
718 /**
719  * We go first, initialize our map with all elements and
720  * send the first Bloom filter.
721  *
722  * @param op operation to start exchange for
723  */
724 static void
725 begin_bf_exchange (struct Operation *op)
726 {
727   op->state->phase = PHASE_BF_EXCHANGE;
728   GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
729                                          &initialize_map_unfiltered,
730                                          op);
731   send_bloomfilter (op);
732 }
733
734
735 /**
736  * Handle the initial `struct IntersectionElementInfoMessage` from a
737  * remote peer.
738  *
739  * @param cls the intersection operation
740  * @param mh the header of the message
741  */
742 void
743 handle_intersection_p2p_element_info (void *cls,
744                                       const struct IntersectionElementInfoMessage *msg)
745 {
746   struct Operation *op = cls;
747
748   if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
749   {
750     GNUNET_break_op (0);
751     fail_intersection_operation(op);
752     return;
753   }
754   op->remote_element_count = ntohl (msg->sender_element_count);
755   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
756               "Received remote element count (%u), I have %u\n",
757               op->remote_element_count,
758               op->state->my_element_count);
759   if ( ( (PHASE_INITIAL != op->state->phase) &&
760          (PHASE_COUNT_SENT != op->state->phase) ) ||
761        (op->state->my_element_count > op->remote_element_count) ||
762        (0 == op->state->my_element_count) ||
763        (0 == op->remote_element_count) )
764   {
765     GNUNET_break_op (0);
766     fail_intersection_operation(op);
767     return;
768   }
769   GNUNET_break (NULL == op->state->remote_bf);
770   begin_bf_exchange (op);
771   GNUNET_CADET_receive_done (op->channel);
772 }
773
774
775 /**
776  * Process a Bloomfilter once we got all the chunks.
777  *
778  * @param op the intersection operation
779  */
780 static void
781 process_bf (struct Operation *op)
782 {
783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784               "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
785               op->state->phase,
786               op->remote_element_count,
787               op->state->my_element_count,
788               GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
789   switch (op->state->phase)
790   {
791   case PHASE_INITIAL:
792     GNUNET_break_op (0);
793     fail_intersection_operation(op);
794     return;
795   case PHASE_COUNT_SENT:
796     /* This is the first BF being sent, build our initial map with
797        filtering in place */
798     op->state->my_element_count = 0;
799     GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
800                                            &filtered_map_initialization,
801                                            op);
802     break;
803   case PHASE_BF_EXCHANGE:
804     /* Update our set by reduction */
805     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
806                                            &iterator_bf_reduce,
807                                            op);
808     break;
809   case PHASE_MUST_SEND_DONE:
810     GNUNET_break_op (0);
811     fail_intersection_operation(op);
812     return;
813   case PHASE_DONE_RECEIVED:
814     GNUNET_break_op (0);
815     fail_intersection_operation(op);
816     return;
817   case PHASE_FINISHED:
818     GNUNET_break_op (0);
819     fail_intersection_operation(op);
820     return;
821   }
822   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
823   op->state->remote_bf = NULL;
824
825   if ( (0 == op->state->my_element_count) || /* fully disjoint */
826        ( (op->state->my_element_count == op->remote_element_count) &&
827          (0 == memcmp (&op->state->my_xor,
828                        &op->state->other_xor,
829                        sizeof (struct GNUNET_HashCode))) ) )
830   {
831     /* we are done */
832     op->state->phase = PHASE_MUST_SEND_DONE;
833     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834                 "Intersection succeeded, sending DONE to other peer\n");
835     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
836     op->state->local_bf = NULL;
837     if (GNUNET_SET_RESULT_FULL == op->result_mode)
838     {
839       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
840                   "Sending full result set (%u elements)\n",
841                   GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
842       op->state->full_result_iter
843         = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
844       send_remaining_elements (op);
845       return;
846     }
847     send_p2p_done (op);
848     return;
849   }
850   op->state->phase = PHASE_BF_EXCHANGE;
851   send_bloomfilter (op);
852 }
853
854
855 /**
856  * Check an BF message from a remote peer.
857  *
858  * @param cls the intersection operation
859  * @param msg the header of the message
860  * @return #GNUNET_OK if @a msg is well-formed
861  */
862 int
863 check_intersection_p2p_bf (void *cls,
864                            const struct BFMessage *msg)
865 {
866   struct Operation *op = cls;
867
868   if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
869   {
870     GNUNET_break_op (0);
871     return GNUNET_SYSERR;
872   }
873   return GNUNET_OK;
874 }
875
876
877 /**
878  * Handle an BF message from a remote peer.
879  *
880  * @param cls the intersection operation
881  * @param msg the header of the message
882  */
883 void
884 handle_intersection_p2p_bf (void *cls,
885                             const struct BFMessage *msg)
886 {
887   struct Operation *op = cls;
888   uint32_t bf_size;
889   uint32_t chunk_size;
890   uint32_t bf_bits_per_element;
891
892   switch (op->state->phase)
893   {
894   case PHASE_INITIAL:
895     GNUNET_break_op (0);
896     fail_intersection_operation (op);
897     return;
898   case PHASE_COUNT_SENT:
899   case PHASE_BF_EXCHANGE:
900     bf_size = ntohl (msg->bloomfilter_total_length);
901     bf_bits_per_element = ntohl (msg->bits_per_element);
902     chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
903     op->state->other_xor = msg->element_xor_hash;
904     if (bf_size == chunk_size)
905     {
906       if (NULL != op->state->bf_data)
907       {
908         GNUNET_break_op (0);
909         fail_intersection_operation (op);
910         return;
911       }
912       /* single part, done here immediately */
913       op->state->remote_bf
914         = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
915                                              bf_size,
916                                              bf_bits_per_element);
917       op->state->salt = ntohl (msg->sender_mutator);
918       op->remote_element_count = ntohl (msg->sender_element_count);
919       process_bf (op);
920       break;
921     }
922     /* multipart chunk */
923     if (NULL == op->state->bf_data)
924     {
925       /* first chunk, initialize */
926       op->state->bf_data = GNUNET_malloc (bf_size);
927       op->state->bf_data_size = bf_size;
928       op->state->bf_bits_per_element = bf_bits_per_element;
929       op->state->bf_data_offset = 0;
930       op->state->salt = ntohl (msg->sender_mutator);
931       op->remote_element_count = ntohl (msg->sender_element_count);
932     }
933     else
934     {
935       /* increment */
936       if ( (op->state->bf_data_size != bf_size) ||
937            (op->state->bf_bits_per_element != bf_bits_per_element) ||
938            (op->state->bf_data_offset + chunk_size > bf_size) ||
939            (op->state->salt != ntohl (msg->sender_mutator)) ||
940            (op->remote_element_count != ntohl (msg->sender_element_count)) )
941       {
942         GNUNET_break_op (0);
943         fail_intersection_operation (op);
944         return;
945       }
946     }
947     GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset],
948             (const char*) &msg[1],
949             chunk_size);
950     op->state->bf_data_offset += chunk_size;
951     if (op->state->bf_data_offset == bf_size)
952     {
953       /* last chunk, run! */
954       op->state->remote_bf
955         = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
956                                              bf_size,
957                                              bf_bits_per_element);
958       GNUNET_free (op->state->bf_data);
959       op->state->bf_data = NULL;
960       op->state->bf_data_size = 0;
961       process_bf (op);
962     }
963     break;
964   default:
965     GNUNET_break_op (0);
966     fail_intersection_operation (op);
967     return;
968   }
969   GNUNET_CADET_receive_done (op->channel);
970 }
971
972
973 /**
974  * Remove all elements from our hashmap.
975  *
976  * @param cls closure with the `struct Operation *`
977  * @param key current key code
978  * @param value value in the hash map
979  * @return #GNUNET_YES (we should continue to iterate)
980  */
981 static int
982 filter_all (void *cls,
983             const struct GNUNET_HashCode *key,
984             void *value)
985 {
986   struct Operation *op = cls;
987   struct ElementEntry *ee = value;
988
989   GNUNET_break (0 < op->state->my_element_count);
990   op->state->my_element_count--;
991   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
992                           &ee->element_hash,
993                           &op->state->my_xor);
994   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995               "Final reduction of my_elements, removing %s:%u\n",
996               GNUNET_h2s (&ee->element_hash),
997               ee->element.size);
998   GNUNET_assert (GNUNET_YES ==
999                  GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
1000                                                        &ee->element_hash,
1001                                                        ee));
1002   send_client_removed_element (op,
1003                                &ee->element);
1004   return GNUNET_YES;
1005 }
1006
1007
1008 /**
1009  * Handle a done message from a remote peer
1010  *
1011  * @param cls the intersection operation
1012  * @param mh the message
1013  */
1014 void
1015 handle_intersection_p2p_done (void *cls,
1016                               const struct IntersectionDoneMessage *idm)
1017 {
1018   struct Operation *op = cls;
1019
1020   if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
1021   {
1022     GNUNET_break_op (0);
1023     fail_intersection_operation (op);
1024     return;
1025   }
1026   if (PHASE_BF_EXCHANGE != op->state->phase)
1027   {
1028     /* wrong phase to conclude? FIXME: Or should we allow this
1029        if the other peer has _initially_ already an empty set? */
1030     GNUNET_break_op (0);
1031     fail_intersection_operation (op);
1032     return;
1033   }
1034   if (0 == ntohl (idm->final_element_count))
1035   {
1036     /* other peer determined empty set is the intersection,
1037        remove all elements */
1038     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
1039                                            &filter_all,
1040                                            op);
1041   }
1042   if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
1043        (0 != memcmp (&op->state->my_xor,
1044                      &idm->element_xor_hash,
1045                      sizeof (struct GNUNET_HashCode))) )
1046   {
1047     /* Other peer thinks we are done, but we disagree on the result! */
1048     GNUNET_break_op (0);
1049     fail_intersection_operation (op);
1050     return;
1051   }
1052   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1053               "Got IntersectionDoneMessage, have %u elements in intersection\n",
1054               op->state->my_element_count);
1055   op->state->phase = PHASE_DONE_RECEIVED;
1056   GNUNET_CADET_receive_done (op->channel);
1057
1058   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1059   if (GNUNET_SET_RESULT_FULL == op->result_mode)
1060   {
1061     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062                 "Sending full result set to client (%u elements)\n",
1063                 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
1064     op->state->full_result_iter
1065       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
1066     send_remaining_elements (op);
1067     return;
1068   }
1069   op->state->phase = PHASE_FINISHED;
1070   send_client_done_and_destroy (op);
1071 }
1072
1073
1074 /**
1075  * Initiate a set intersection operation with a remote peer.
1076  *
1077  * @param op operation that is created, should be initialized to
1078  *        begin the evaluation
1079  * @param opaque_context message to be transmitted to the listener
1080  *        to convince it to accept, may be NULL
1081  * @return operation-specific state to keep in @a op
1082  */
1083 static struct OperationState *
1084 intersection_evaluate (struct Operation *op,
1085                        const struct GNUNET_MessageHeader *opaque_context)
1086 {
1087   struct OperationState *state;
1088   struct GNUNET_MQ_Envelope *ev;
1089   struct OperationRequestMessage *msg;
1090
1091   ev = GNUNET_MQ_msg_nested_mh (msg,
1092                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1093                                 opaque_context);
1094   if (NULL == ev)
1095   {
1096     /* the context message is too large!? */
1097     GNUNET_break (0);
1098     return NULL;
1099   }
1100   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1101               "Initiating intersection operation evaluation\n");
1102   state = GNUNET_new (struct OperationState);
1103   /* we started the operation, thus we have to send the operation request */
1104   state->phase = PHASE_INITIAL;
1105   state->my_element_count = op->set->state->current_set_element_count;
1106   state->my_elements
1107     = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
1108                                             GNUNET_YES);
1109
1110   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1111   msg->element_count = htonl (state->my_element_count);
1112   GNUNET_MQ_send (op->mq,
1113                   ev);
1114   state->phase = PHASE_COUNT_SENT;
1115   if (NULL != opaque_context)
1116     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1117                 "Sent op request with context message\n");
1118   else
1119     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1120                 "Sent op request without context message\n");
1121   return state;
1122 }
1123
1124
1125 /**
1126  * Accept an intersection operation request from a remote peer.  Only
1127  * initializes the private operation state.
1128  *
1129  * @param op operation that will be accepted as an intersection operation
1130  */
1131 static struct OperationState *
1132 intersection_accept (struct Operation *op)
1133 {
1134   struct OperationState *state;
1135
1136   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137               "Accepting set intersection operation\n");
1138   state = GNUNET_new (struct OperationState);
1139   state->phase = PHASE_INITIAL;
1140   state->my_element_count
1141     = op->set->state->current_set_element_count;
1142   state->my_elements
1143     = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
1144                                                         op->remote_element_count),
1145                                             GNUNET_YES);
1146   op->state = state;
1147   if (op->remote_element_count < state->my_element_count)
1148   {
1149     /* If the other peer (Alice) has fewer elements than us (Bob),
1150        we just send the count as Alice should send the first BF */
1151     send_element_count (op);
1152     state->phase = PHASE_COUNT_SENT;
1153     return state;
1154   }
1155   /* We have fewer elements, so we start with the BF */
1156   begin_bf_exchange (op);
1157   return state;
1158 }
1159
1160
1161 /**
1162  * Destroy the intersection operation.  Only things specific to the
1163  * intersection operation are destroyed.
1164  *
1165  * @param op intersection operation to destroy
1166  */
1167 static void
1168 intersection_op_cancel (struct Operation *op)
1169 {
1170   /* check if the op was canceled twice */
1171   GNUNET_assert (NULL != op->state);
1172   if (NULL != op->state->remote_bf)
1173   {
1174     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1175     op->state->remote_bf = NULL;
1176   }
1177   if (NULL != op->state->local_bf)
1178   {
1179     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1180     op->state->local_bf = NULL;
1181   }
1182   if (NULL != op->state->my_elements)
1183   {
1184     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1185     op->state->my_elements = NULL;
1186   }
1187   if (NULL != op->state->full_result_iter)
1188   {
1189     GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
1190     op->state->full_result_iter = NULL;
1191   }
1192   GNUNET_free (op->state);
1193   op->state = NULL;
1194   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1195               "Destroying intersection op state done\n");
1196 }
1197
1198
1199 /**
1200  * Create a new set supporting the intersection operation.
1201  *
1202  * @return the newly created set
1203  */
1204 static struct SetState *
1205 intersection_set_create ()
1206 {
1207   struct SetState *set_state;
1208
1209   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1210               "Intersection set created\n");
1211   set_state = GNUNET_new (struct SetState);
1212   set_state->current_set_element_count = 0;
1213
1214   return set_state;
1215 }
1216
1217
1218 /**
1219  * Add the element from the given element message to the set.
1220  *
1221  * @param set_state state of the set want to add to
1222  * @param ee the element to add to the set
1223  */
1224 static void
1225 intersection_add (struct SetState *set_state,
1226                   struct ElementEntry *ee)
1227 {
1228   set_state->current_set_element_count++;
1229 }
1230
1231
1232 /**
1233  * Destroy a set that supports the intersection operation
1234  *
1235  * @param set_state the set to destroy
1236  */
1237 static void
1238 intersection_set_destroy (struct SetState *set_state)
1239 {
1240   GNUNET_free (set_state);
1241 }
1242
1243
1244 /**
1245  * Remove the element given in the element message from the set.
1246  *
1247  * @param set_state state of the set to remove from
1248  * @param element set element to remove
1249  */
1250 static void
1251 intersection_remove (struct SetState *set_state,
1252                      struct ElementEntry *element)
1253 {
1254   GNUNET_assert (0 < set_state->current_set_element_count);
1255   set_state->current_set_element_count--;
1256 }
1257
1258
1259 /**
1260  * Callback for channel death for the intersection operation.
1261  *
1262  * @param op operation that lost the channel
1263  */
1264 static void
1265 intersection_channel_death (struct Operation *op)
1266 {
1267   if (GNUNET_YES == op->state->channel_death_expected)
1268   {
1269     /* oh goodie, we are done! */
1270     send_client_done_and_destroy (op);
1271   }
1272   else
1273   {
1274     /* sorry, channel went down early, too bad. */
1275     _GSS_operation_destroy (op,
1276                             GNUNET_YES);
1277   }
1278 }
1279
1280
1281 /**
1282  * Get the table with implementing functions for set intersection.
1283  *
1284  * @return the operation specific VTable
1285  */
1286 const struct SetVT *
1287 _GSS_intersection_vt ()
1288 {
1289   static const struct SetVT intersection_vt = {
1290     .create = &intersection_set_create,
1291     .add = &intersection_add,
1292     .remove = &intersection_remove,
1293     .destroy_set = &intersection_set_destroy,
1294     .evaluate = &intersection_evaluate,
1295     .accept = &intersection_accept,
1296     .cancel = &intersection_op_cancel,
1297     .channel_death = &intersection_channel_death,
1298   };
1299
1300   return &intersection_vt;
1301 }