+
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_intersection.c
22  * @brief two-peer set intersection
23  * @author Christian Fuchs
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet-service-set_protocol.h"
31 #include "gnunet-service-set_intersection.h"
32 #include <gcrypt.h>
33
34
35 /**
36  * Current phase we are in for a intersection operation.
37  */
38 enum IntersectionOperationPhase
39 {
40   /**
41    * We are just starting.
42    */
43   PHASE_INITIAL,
44
45   /**
46    * We have send the number of our elements to the other
47    * peer, but did not setup our element set yet.
48    */
49   PHASE_COUNT_SENT,
50
51   /**
52    * We have initialized our set and are now reducing it by exchanging
53    * Bloom filters until one party notices the their element hashes
54    * are equal.
55    */
56   PHASE_BF_EXCHANGE,
57
58   /**
59    * We must next send the P2P DONE message (after finishing mostly
60    * with the local client).  Then we will wait for the channel to close.
61    */
62   PHASE_MUST_SEND_DONE,
63
64   /**
65    * We have received the P2P DONE message, and must finish with the
66    * local client before terminating the channel.
67    */
68   PHASE_DONE_RECEIVED,
69
70   /**
71    * The protocol is over.  Results may still have to be sent to the
72    * client.
73    */
74   PHASE_FINISHED
75
76 };
77
78
79 /**
80  * State of an evaluate operation with another peer.
81  */
82 struct OperationState
83 {
84   /**
85    * The bf we currently receive
86    */
87   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
88
89   /**
90    * BF of the set's element.
91    */
92   struct GNUNET_CONTAINER_BloomFilter *local_bf;
93
94   /**
95    * Remaining elements in the intersection operation.
96    * Maps element-id-hashes to 'elements in our set'.
97    */
98   struct GNUNET_CONTAINER_MultiHashMap *my_elements;
99
100   /**
101    * Iterator for sending the final set of @e my_elements to the client.
102    */
103   struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
104
105   /**
106    * Evaluate operations are held in a linked list.
107    */
108   struct OperationState *next;
109
110   /**
111    * Evaluate operations are held in a linked list.
112    */
113   struct OperationState *prev;
114
115   /**
116    * For multipart BF transmissions, we have to store the
117    * bloomfilter-data until we fully received it.
118    */
119   char *bf_data;
120
121   /**
122    * XOR of the keys of all of the elements (remaining) in my set.
123    * Always updated when elements are added or removed to
124    * @e my_elements.
125    */
126   struct GNUNET_HashCode my_xor;
127
128   /**
129    * XOR of the keys of all of the elements (remaining) in
130    * the other peer's set.  Updated when we receive the
131    * other peer's Bloom filter.
132    */
133   struct GNUNET_HashCode other_xor;
134
135   /**
136    * How many bytes of @e bf_data are valid?
137    */
138   uint32_t bf_data_offset;
139
140   /**
141    * Current element count contained within @e my_elements.
142    * (May differ briefly during initialization.)
143    */
144   uint32_t my_element_count;
145
146   /**
147    * size of the bloomfilter in @e bf_data.
148    */
149   uint32_t bf_data_size;
150
151   /**
152    * size of the bloomfilter
153    */
154   uint32_t bf_bits_per_element;
155
156   /**
157    * Salt currently used for BF construction (by us or the other peer,
158    * depending on where we are in the code).
159    */
160   uint32_t salt;
161
162   /**
163    * Current state of the operation.
164    */
165   enum IntersectionOperationPhase phase;
166
167   /**
168    * Generation in which the operation handle
169    * was created.
170    */
171   unsigned int generation_created;
172
173   /**
174    * Did we send the client that we are done?
175    */
176   int client_done_sent;
177
178   /**
179    * Set whenever we reach the state where the death of the
180    * channel is perfectly find and should NOT result in the
181    * operation being cancelled.
182    */
183   int channel_death_expected;
184 };
185
186
187 /**
188  * Extra state required for efficient set intersection.
189  * Merely tracks the total number of elements.
190  */
191 struct SetState
192 {
193   /**
194    * Number of currently valid elements in the set which have not been
195    * removed.
196    */
197   uint32_t current_set_element_count;
198 };
199
200
201 /**
202  * If applicable in the current operation mode, send a result message
203  * to the client indicating we removed an element.
204  *
205  * @param op intersection operation
206  * @param element element to send
207  */
208 static void
209 send_client_removed_element (struct Operation *op,
210                              struct GNUNET_SET_Element *element)
211 {
212   struct GNUNET_MQ_Envelope *ev;
213   struct GNUNET_SET_ResultMessage *rm;
214
215   if (GNUNET_SET_RESULT_REMOVED != op->result_mode)
216     return; /* Wrong mode for transmitting removed elements */
217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
218               "Sending removed element (size %u) to client\n",
219               element->size);
220   GNUNET_assert (0 != op->client_request_id);
221   ev = GNUNET_MQ_msg_extra (rm,
222                             element->size,
223                             GNUNET_MESSAGE_TYPE_SET_RESULT);
224   if (NULL == ev)
225   {
226     GNUNET_break (0);
227     return;
228   }
229   rm->result_status = htons (GNUNET_SET_STATUS_OK);
230   rm->request_id = htonl (op->client_request_id);
231   rm->element_type = element->element_type;
232   GNUNET_memcpy (&rm[1],
233                  element->data,
234                  element->size);
235   GNUNET_MQ_send (op->set->cs->mq,
236                   ev);
237 }
238
239
240 /**
241  * Fills the "my_elements" hashmap with all relevant elements.
242  *
243  * @param cls the `struct Operation *` we are performing
244  * @param key current key code
245  * @param value the `struct ElementEntry *` from the hash map
246  * @return #GNUNET_YES (we should continue to iterate)
247  */
248 static int
249 filtered_map_initialization (void *cls,
250                              const struct GNUNET_HashCode *key,
251                              void *value)
252 {
253   struct Operation *op = cls;
254   struct ElementEntry *ee = value;
255   struct GNUNET_HashCode mutated_hash;
256
257
258   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
259               "FIMA called for %s:%u\n",
260               GNUNET_h2s (&ee->element_hash),
261               ee->element.size);
262
263   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
264   {
265     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266                 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
267                 GNUNET_h2s (&ee->element_hash),
268                 ee->element.size);
269     return GNUNET_YES; /* element not valid in our operation's generation */
270   }
271
272   /* Test if element is in other peer's bloomfilter */
273   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
274                             op->state->salt,
275                             &mutated_hash);
276   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
277               "Testing mingled hash %s with salt %u\n",
278               GNUNET_h2s (&mutated_hash),
279               op->state->salt);
280   if (GNUNET_NO ==
281       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
282                                          &mutated_hash))
283   {
284     /* remove this element */
285     send_client_removed_element (op,
286                                  &ee->element);
287     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
288                 "Reduced initialization, not starting with %s:%u\n",
289                 GNUNET_h2s (&ee->element_hash),
290                 ee->element.size);
291     return GNUNET_YES;
292   }
293   op->state->my_element_count++;
294   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
295                           &ee->element_hash,
296                           &op->state->my_xor);
297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298               "Filtered initialization of my_elements, adding %s:%u\n",
299               GNUNET_h2s (&ee->element_hash),
300               ee->element.size);
301   GNUNET_break (GNUNET_YES ==
302                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
303                                                    &ee->element_hash,
304                                                    ee,
305                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
306
307   return GNUNET_YES;
308 }
309
310
311 /**
312  * Removes elements from our hashmap if they are not contained within the
313  * provided remote bloomfilter.
314  *
315  * @param cls closure with the `struct Operation *`
316  * @param key current key code
317  * @param value value in the hash map
318  * @return #GNUNET_YES (we should continue to iterate)
319  */
320 static int
321 iterator_bf_reduce (void *cls,
322                    const struct GNUNET_HashCode *key,
323                    void *value)
324 {
325   struct Operation *op = cls;
326   struct ElementEntry *ee = value;
327   struct GNUNET_HashCode mutated_hash;
328
329   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
330                             op->state->salt,
331                             &mutated_hash);
332   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
333               "Testing mingled hash %s with salt %u\n",
334               GNUNET_h2s (&mutated_hash),
335               op->state->salt);
336   if (GNUNET_NO ==
337       GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
338                                          &mutated_hash))
339   {
340     GNUNET_break (0 < op->state->my_element_count);
341     op->state->my_element_count--;
342     GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
343                             &ee->element_hash,
344                             &op->state->my_xor);
345     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
346                 "Bloom filter reduction of my_elements, removing %s:%u\n",
347                 GNUNET_h2s (&ee->element_hash),
348                 ee->element.size);
349     GNUNET_assert (GNUNET_YES ==
350                    GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
351                                                          &ee->element_hash,
352                                                          ee));
353     send_client_removed_element (op,
354                                  &ee->element);
355   }
356   else
357   {
358     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
359                 "Bloom filter reduction of my_elements, keeping %s:%u\n",
360                 GNUNET_h2s (&ee->element_hash),
361                 ee->element.size);
362   }
363   return GNUNET_YES;
364 }
365
366
367 /**
368  * Create initial bloomfilter based on all the elements given.
369  *
370  * @param cls the `struct Operation *`
371  * @param key current key code
372  * @param value the `struct ElementEntry` to process
373  * @return #GNUNET_YES (we should continue to iterate)
374  */
375 static int
376 iterator_bf_create (void *cls,
377                     const struct GNUNET_HashCode *key,
378                     void *value)
379 {
380   struct Operation *op = cls;
381   struct ElementEntry *ee = value;
382   struct GNUNET_HashCode mutated_hash;
383
384   GNUNET_BLOCK_mingle_hash (&ee->element_hash,
385                             op->state->salt,
386                             &mutated_hash);
387   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
388               "Initializing BF with hash %s with salt %u\n",
389               GNUNET_h2s (&mutated_hash),
390               op->state->salt);
391   GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
392                                     &mutated_hash);
393   return GNUNET_YES;
394 }
395
396
397 /**
398  * Inform the client that the intersection operation has failed,
399  * and proceed to destroy the evaluate operation.
400  *
401  * @param op the intersection operation to fail
402  */
403 static void
404 fail_intersection_operation (struct Operation *op)
405 {
406   struct GNUNET_MQ_Envelope *ev;
407   struct GNUNET_SET_ResultMessage *msg;
408
409   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
410               "Intersection operation failed\n");
411   if (NULL != op->state->my_elements)
412   {
413     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
414     op->state->my_elements = NULL;
415   }
416   ev = GNUNET_MQ_msg (msg,
417                       GNUNET_MESSAGE_TYPE_SET_RESULT);
418   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
419   msg->request_id = htonl (op->client_request_id);
420   msg->element_type = htons (0);
421   GNUNET_MQ_send (op->set->cs->mq,
422                   ev);
423   _GSS_operation_destroy (op,
424                           GNUNET_YES);
425 }
426
427
428 /**
429  * Send a bloomfilter to our peer.  After the result done message has
430  * been sent to the client, destroy the evaluate operation.
431  *
432  * @param op intersection operation
433  */
434 static void
435 send_bloomfilter (struct Operation *op)
436 {
437   struct GNUNET_MQ_Envelope *ev;
438   struct BFMessage *msg;
439   uint32_t bf_size;
440   uint32_t bf_elementbits;
441   uint32_t chunk_size;
442   char *bf_data;
443   uint32_t offset;
444
445   /* We consider the ratio of the set sizes to determine
446      the number of bits per element, as the smaller set
447      should use more bits to maximize its set reduction
448      potential and minimize overall bandwidth consumption. */
449   bf_elementbits = 2 + ceil (log2((double)
450                              (op->remote_element_count /
451                               (double) op->state->my_element_count)));
452   if (bf_elementbits < 1)
453     bf_elementbits = 1; /* make sure k is not 0 */
454   /* optimize BF-size to ~50% of bits set */
455   bf_size = ceil ((double) (op->state->my_element_count
456                             * bf_elementbits / log(2)));
457   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
458               "Sending Bloom filter (%u) of size %u bytes\n",
459               (unsigned int) bf_elementbits,
460               (unsigned int) bf_size);
461   op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
462                                                            bf_size,
463                                                            bf_elementbits);
464   op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
465                                               UINT32_MAX);
466   GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
467                                          &iterator_bf_create,
468                                          op);
469
470   /* send our Bloom filter */
471   chunk_size = 60 * 1024 - sizeof (struct BFMessage);
472   if (bf_size <= chunk_size)
473   {
474     /* singlepart */
475     chunk_size = bf_size;
476     ev = GNUNET_MQ_msg_extra (msg,
477                               chunk_size,
478                               GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
479     GNUNET_assert (GNUNET_SYSERR !=
480                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
481                                                               (char*) &msg[1],
482                                                               bf_size));
483     msg->sender_element_count = htonl (op->state->my_element_count);
484     msg->bloomfilter_total_length = htonl (bf_size);
485     msg->bits_per_element = htonl (bf_elementbits);
486     msg->sender_mutator = htonl (op->state->salt);
487     msg->element_xor_hash = op->state->my_xor;
488     GNUNET_MQ_send (op->mq, ev);
489   }
490   else
491   {
492     /* multipart */
493     bf_data = GNUNET_malloc (bf_size);
494     GNUNET_assert (GNUNET_SYSERR !=
495                    GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
496                                                               bf_data,
497                                                               bf_size));
498     offset = 0;
499     while (offset < bf_size)
500     {
501       if (bf_size - chunk_size < offset)
502         chunk_size = bf_size - offset;
503       ev = GNUNET_MQ_msg_extra (msg,
504                                 chunk_size,
505                                 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
506       GNUNET_memcpy (&msg[1],
507               &bf_data[offset],
508               chunk_size);
509       offset += chunk_size;
510       msg->sender_element_count = htonl (op->state->my_element_count);
511       msg->bloomfilter_total_length = htonl (bf_size);
512       msg->bits_per_element = htonl (bf_elementbits);
513       msg->sender_mutator = htonl (op->state->salt);
514       msg->element_xor_hash = op->state->my_xor;
515       GNUNET_MQ_send (op->mq, ev);
516     }
517     GNUNET_free (bf_data);
518   }
519   GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
520   op->state->local_bf = NULL;
521 }
522
523
524 /**
525  * Signal to the client that the operation has finished and
526  * destroy the operation.
527  *
528  * @param cls operation to destroy
529  */
530 static void
531 send_client_done_and_destroy (void *cls)
532 {
533   struct Operation *op = cls;
534   struct GNUNET_MQ_Envelope *ev;
535   struct GNUNET_SET_ResultMessage *rm;
536
537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
538               "Intersection succeeded, sending DONE to local client\n");
539   ev = GNUNET_MQ_msg (rm,
540                       GNUNET_MESSAGE_TYPE_SET_RESULT);
541   rm->request_id = htonl (op->client_request_id);
542   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
543   rm->element_type = htons (0);
544   GNUNET_MQ_send (op->set->cs->mq,
545                   ev);
546   _GSS_operation_destroy (op,
547                           GNUNET_YES);
548 }
549
550
551 /**
552  * Remember that we are done dealing with the local client
553  * AND have sent the other peer our message that we are done,
554  * so we are not just waiting for the channel to die before
555  * telling the local client that we are done as our last act.
556  *
557  * @param cls the `struct Operation`.
558  */
559 static void
560 finished_local_operations (void *cls)
561 {
562   struct Operation *op = cls;
563
564   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565               "DONE sent to other peer, now waiting for other end to close the channel\n");
566   op->state->phase = PHASE_FINISHED;
567   op->state->channel_death_expected = GNUNET_YES;
568 }
569
570
571 /**
572  * Notify the other peer that we are done.  Once this message
573  * is out, we still need to notify the local client that we
574  * are done.
575  *
576  * @param op operation to notify for.
577  */
578 static void
579 send_p2p_done (struct Operation *op)
580 {
581   struct GNUNET_MQ_Envelope *ev;
582   struct IntersectionDoneMessage *idm;
583
584   GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
585   GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
586   ev = GNUNET_MQ_msg (idm,
587                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
588   idm->final_element_count = htonl (op->state->my_element_count);
589   idm->element_xor_hash = op->state->my_xor;
590   GNUNET_MQ_notify_sent (ev,
591                          &finished_local_operations,
592                          op);
593   GNUNET_MQ_send (op->mq,
594                   ev);
595 }
596
597
598 /**
599  * Send all elements in the full result iterator.
600  *
601  * @param cls the `struct Operation *`
602  */
603 static void
604 send_remaining_elements (void *cls)
605 {
606   struct Operation *op = cls;
607   const void *nxt;
608   const struct ElementEntry *ee;
609   struct GNUNET_MQ_Envelope *ev;
610   struct GNUNET_SET_ResultMessage *rm;
611   const struct GNUNET_SET_Element *element;
612   int res;
613
614   res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
615                                                      NULL,
616                                                      &nxt);
617   if (GNUNET_NO == res)
618   {
619     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
620                 "Sending done and destroy because iterator ran out\n");
621     GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
622     op->state->full_result_iter = NULL;
623     if (PHASE_DONE_RECEIVED == op->state->phase)
624     {
625       op->state->phase = PHASE_FINISHED;
626       send_client_done_and_destroy (op);
627     }
628     else if (PHASE_MUST_SEND_DONE == op->state->phase)
629     {
630       send_p2p_done (op);
631     }
632     else
633     {
634       GNUNET_assert (0);
635     }
636     return;
637   }
638   ee = nxt;
639   element = &ee->element;
640   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
641               "Sending element %s:%u to client (full set)\n",
642               GNUNET_h2s (&ee->element_hash),
643               element->size);
644   GNUNET_assert (0 != op->client_request_id);
645   ev = GNUNET_MQ_msg_extra (rm,
646                             element->size,
647                             GNUNET_MESSAGE_TYPE_SET_RESULT);
648   GNUNET_assert (NULL != ev);
649   rm->result_status = htons (GNUNET_SET_STATUS_OK);
650   rm->request_id = htonl (op->client_request_id);
651   rm->element_type = element->element_type;
652   GNUNET_memcpy (&rm[1],
653                  element->data,
654                  element->size);
655   GNUNET_MQ_notify_sent (ev,
656                          &send_remaining_elements,
657                          op);
658   GNUNET_MQ_send (op->set->cs->mq,
659                   ev);
660 }
661
662
663 /**
664  * Fills the "my_elements" hashmap with the initial set of
665  * (non-deleted) elements from the set of the specification.
666  *
667  * @param cls closure with the `struct Operation *`
668  * @param key current key code for the element
669  * @param value value in the hash map with the `struct ElementEntry *`
670  * @return #GNUNET_YES (we should continue to iterate)
671  */
672 static int
673 initialize_map_unfiltered (void *cls,
674                            const struct GNUNET_HashCode *key,
675                            void *value)
676 {
677   struct ElementEntry *ee = value;
678   struct Operation *op = cls;
679
680   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
681     return GNUNET_YES; /* element not live in operation's generation */
682   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
683                           &ee->element_hash,
684                           &op->state->my_xor);
685   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686               "Initial full initialization of my_elements, adding %s:%u\n",
687               GNUNET_h2s (&ee->element_hash),
688               ee->element.size);
689   GNUNET_break (GNUNET_YES ==
690                 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
691                                                    &ee->element_hash,
692                                                    ee,
693                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
694   return GNUNET_YES;
695 }
696
697
698 /**
699  * Send our element count to the peer, in case our element count is
700  * lower than his.
701  *
702  * @param op intersection operation
703  */
704 static void
705 send_element_count (struct Operation *op)
706 {
707   struct GNUNET_MQ_Envelope *ev;
708   struct IntersectionElementInfoMessage *msg;
709
710   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
711               "Sending our element count (%u)\n",
712               op->state->my_element_count);
713   ev = GNUNET_MQ_msg (msg,
714                       GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
715   msg->sender_element_count = htonl (op->state->my_element_count);
716   GNUNET_MQ_send (op->mq, ev);
717 }
718
719
720 /**
721  * We go first, initialize our map with all elements and
722  * send the first Bloom filter.
723  *
724  * @param op operation to start exchange for
725  */
726 static void
727 begin_bf_exchange (struct Operation *op)
728 {
729   op->state->phase = PHASE_BF_EXCHANGE;
730   GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
731                                          &initialize_map_unfiltered,
732                                          op);
733   send_bloomfilter (op);
734 }
735
736
737 /**
738  * Handle the initial `struct IntersectionElementInfoMessage` from a
739  * remote peer.
740  *
741  * @param cls the intersection operation
742  * @param mh the header of the message
743  */
744 void
745 handle_intersection_p2p_element_info (void *cls,
746                                       const struct IntersectionElementInfoMessage *msg)
747 {
748   struct Operation *op = cls;
749
750   if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
751   {
752     GNUNET_break_op (0);
753     fail_intersection_operation(op);
754     return;
755   }
756   op->remote_element_count = ntohl (msg->sender_element_count);
757   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
758               "Received remote element count (%u), I have %u\n",
759               op->remote_element_count,
760               op->state->my_element_count);
761   if ( ( (PHASE_INITIAL != op->state->phase) &&
762          (PHASE_COUNT_SENT != op->state->phase) ) ||
763        (op->state->my_element_count > op->remote_element_count) ||
764        (0 == op->state->my_element_count) ||
765        (0 == op->remote_element_count) )
766   {
767     GNUNET_break_op (0);
768     fail_intersection_operation(op);
769     return;
770   }
771   GNUNET_break (NULL == op->state->remote_bf);
772   begin_bf_exchange (op);
773   GNUNET_CADET_receive_done (op->channel);
774 }
775
776
777 /**
778  * Process a Bloomfilter once we got all the chunks.
779  *
780  * @param op the intersection operation
781  */
782 static void
783 process_bf (struct Operation *op)
784 {
785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786               "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
787               op->state->phase,
788               op->remote_element_count,
789               op->state->my_element_count,
790               GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
791   switch (op->state->phase)
792   {
793   case PHASE_INITIAL:
794     GNUNET_break_op (0);
795     fail_intersection_operation(op);
796     return;
797   case PHASE_COUNT_SENT:
798     /* This is the first BF being sent, build our initial map with
799        filtering in place */
800     op->state->my_element_count = 0;
801     GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
802                                            &filtered_map_initialization,
803                                            op);
804     break;
805   case PHASE_BF_EXCHANGE:
806     /* Update our set by reduction */
807     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
808                                            &iterator_bf_reduce,
809                                            op);
810     break;
811   case PHASE_MUST_SEND_DONE:
812     GNUNET_break_op (0);
813     fail_intersection_operation(op);
814     return;
815   case PHASE_DONE_RECEIVED:
816     GNUNET_break_op (0);
817     fail_intersection_operation(op);
818     return;
819   case PHASE_FINISHED:
820     GNUNET_break_op (0);
821     fail_intersection_operation(op);
822     return;
823   }
824   GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
825   op->state->remote_bf = NULL;
826
827   if ( (0 == op->state->my_element_count) || /* fully disjoint */
828        ( (op->state->my_element_count == op->remote_element_count) &&
829          (0 == memcmp (&op->state->my_xor,
830                        &op->state->other_xor,
831                        sizeof (struct GNUNET_HashCode))) ) )
832   {
833     /* we are done */
834     op->state->phase = PHASE_MUST_SEND_DONE;
835     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836                 "Intersection succeeded, sending DONE to other peer\n");
837     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
838     op->state->local_bf = NULL;
839     if (GNUNET_SET_RESULT_FULL == op->result_mode)
840     {
841       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842                   "Sending full result set (%u elements)\n",
843                   GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
844       op->state->full_result_iter
845         = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
846       send_remaining_elements (op);
847       return;
848     }
849     send_p2p_done (op);
850     return;
851   }
852   op->state->phase = PHASE_BF_EXCHANGE;
853   send_bloomfilter (op);
854 }
855
856
857 /**
858  * Check an BF message from a remote peer.
859  *
860  * @param cls the intersection operation
861  * @param msg the header of the message
862  * @return #GNUNET_OK if @a msg is well-formed
863  */
864 int
865 check_intersection_p2p_bf (void *cls,
866                            const struct BFMessage *msg)
867 {
868   struct Operation *op = cls;
869
870   if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
871   {
872     GNUNET_break_op (0);
873     return GNUNET_SYSERR;
874   }
875   return GNUNET_OK;
876 }
877
878
879 /**
880  * Handle an BF message from a remote peer.
881  *
882  * @param cls the intersection operation
883  * @param msg the header of the message
884  */
885 void
886 handle_intersection_p2p_bf (void *cls,
887                             const struct BFMessage *msg)
888 {
889   struct Operation *op = cls;
890   uint32_t bf_size;
891   uint32_t chunk_size;
892   uint32_t bf_bits_per_element;
893
894   switch (op->state->phase)
895   {
896   case PHASE_INITIAL:
897     GNUNET_break_op (0);
898     fail_intersection_operation (op);
899     return;
900   case PHASE_COUNT_SENT:
901   case PHASE_BF_EXCHANGE:
902     bf_size = ntohl (msg->bloomfilter_total_length);
903     bf_bits_per_element = ntohl (msg->bits_per_element);
904     chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
905     op->state->other_xor = msg->element_xor_hash;
906     if (bf_size == chunk_size)
907     {
908       if (NULL != op->state->bf_data)
909       {
910         GNUNET_break_op (0);
911         fail_intersection_operation (op);
912         return;
913       }
914       /* single part, done here immediately */
915       op->state->remote_bf
916         = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
917                                              bf_size,
918                                              bf_bits_per_element);
919       op->state->salt = ntohl (msg->sender_mutator);
920       op->remote_element_count = ntohl (msg->sender_element_count);
921       process_bf (op);
922       break;
923     }
924     /* multipart chunk */
925     if (NULL == op->state->bf_data)
926     {
927       /* first chunk, initialize */
928       op->state->bf_data = GNUNET_malloc (bf_size);
929       op->state->bf_data_size = bf_size;
930       op->state->bf_bits_per_element = bf_bits_per_element;
931       op->state->bf_data_offset = 0;
932       op->state->salt = ntohl (msg->sender_mutator);
933       op->remote_element_count = ntohl (msg->sender_element_count);
934     }
935     else
936     {
937       /* increment */
938       if ( (op->state->bf_data_size != bf_size) ||
939            (op->state->bf_bits_per_element != bf_bits_per_element) ||
940            (op->state->bf_data_offset + chunk_size > bf_size) ||
941            (op->state->salt != ntohl (msg->sender_mutator)) ||
942            (op->remote_element_count != ntohl (msg->sender_element_count)) )
943       {
944         GNUNET_break_op (0);
945         fail_intersection_operation (op);
946         return;
947       }
948     }
949     GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset],
950             (const char*) &msg[1],
951             chunk_size);
952     op->state->bf_data_offset += chunk_size;
953     if (op->state->bf_data_offset == bf_size)
954     {
955       /* last chunk, run! */
956       op->state->remote_bf
957         = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
958                                              bf_size,
959                                              bf_bits_per_element);
960       GNUNET_free (op->state->bf_data);
961       op->state->bf_data = NULL;
962       op->state->bf_data_size = 0;
963       process_bf (op);
964     }
965     break;
966   default:
967     GNUNET_break_op (0);
968     fail_intersection_operation (op);
969     return;
970   }
971   GNUNET_CADET_receive_done (op->channel);
972 }
973
974
975 /**
976  * Remove all elements from our hashmap.
977  *
978  * @param cls closure with the `struct Operation *`
979  * @param key current key code
980  * @param value value in the hash map
981  * @return #GNUNET_YES (we should continue to iterate)
982  */
983 static int
984 filter_all (void *cls,
985             const struct GNUNET_HashCode *key,
986             void *value)
987 {
988   struct Operation *op = cls;
989   struct ElementEntry *ee = value;
990
991   GNUNET_break (0 < op->state->my_element_count);
992   op->state->my_element_count--;
993   GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
994                           &ee->element_hash,
995                           &op->state->my_xor);
996   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997               "Final reduction of my_elements, removing %s:%u\n",
998               GNUNET_h2s (&ee->element_hash),
999               ee->element.size);
1000   GNUNET_assert (GNUNET_YES ==
1001                  GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
1002                                                        &ee->element_hash,
1003                                                        ee));
1004   send_client_removed_element (op,
1005                                &ee->element);
1006   return GNUNET_YES;
1007 }
1008
1009
1010 /**
1011  * Handle a done message from a remote peer
1012  *
1013  * @param cls the intersection operation
1014  * @param mh the message
1015  */
1016 void
1017 handle_intersection_p2p_done (void *cls,
1018                               const struct IntersectionDoneMessage *idm)
1019 {
1020   struct Operation *op = cls;
1021
1022   if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
1023   {
1024     GNUNET_break_op (0);
1025     fail_intersection_operation (op);
1026     return;
1027   }
1028   if (PHASE_BF_EXCHANGE != op->state->phase)
1029   {
1030     /* wrong phase to conclude? FIXME: Or should we allow this
1031        if the other peer has _initially_ already an empty set? */
1032     GNUNET_break_op (0);
1033     fail_intersection_operation (op);
1034     return;
1035   }
1036   if (0 == ntohl (idm->final_element_count))
1037   {
1038     /* other peer determined empty set is the intersection,
1039        remove all elements */
1040     GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
1041                                            &filter_all,
1042                                            op);
1043   }
1044   if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
1045        (0 != memcmp (&op->state->my_xor,
1046                      &idm->element_xor_hash,
1047                      sizeof (struct GNUNET_HashCode))) )
1048   {
1049     /* Other peer thinks we are done, but we disagree on the result! */
1050     GNUNET_break_op (0);
1051     fail_intersection_operation (op);
1052     return;
1053   }
1054   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1055               "Got IntersectionDoneMessage, have %u elements in intersection\n",
1056               op->state->my_element_count);
1057   op->state->phase = PHASE_DONE_RECEIVED;
1058   GNUNET_CADET_receive_done (op->channel);
1059
1060   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1061   if (GNUNET_SET_RESULT_FULL == op->result_mode)
1062   {
1063     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064                 "Sending full result set to client (%u elements)\n",
1065                 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
1066     op->state->full_result_iter
1067       = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
1068     send_remaining_elements (op);
1069     return;
1070   }
1071   op->state->phase = PHASE_FINISHED;
1072   send_client_done_and_destroy (op);
1073 }
1074
1075
1076 /**
1077  * Initiate a set intersection operation with a remote peer.
1078  *
1079  * @param op operation that is created, should be initialized to
1080  *        begin the evaluation
1081  * @param opaque_context message to be transmitted to the listener
1082  *        to convince him to accept, may be NULL
1083  * @return operation-specific state to keep in @a op
1084  */
1085 static struct OperationState *
1086 intersection_evaluate (struct Operation *op,
1087                        const struct GNUNET_MessageHeader *opaque_context)
1088 {
1089   struct OperationState *state;
1090   struct GNUNET_MQ_Envelope *ev;
1091   struct OperationRequestMessage *msg;
1092
1093   ev = GNUNET_MQ_msg_nested_mh (msg,
1094                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1095                                 opaque_context);
1096   if (NULL == ev)
1097   {
1098     /* the context message is too large!? */
1099     GNUNET_break (0);
1100     return NULL;
1101   }
1102   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103               "Initiating intersection operation evaluation\n");
1104   state = GNUNET_new (struct OperationState);
1105   /* we started the operation, thus we have to send the operation request */
1106   state->phase = PHASE_INITIAL;
1107   state->my_element_count = op->set->state->current_set_element_count;
1108   state->my_elements
1109     = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
1110                                             GNUNET_YES);
1111
1112   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1113   msg->element_count = htonl (state->my_element_count);
1114   GNUNET_MQ_send (op->mq,
1115                   ev);
1116   state->phase = PHASE_COUNT_SENT;
1117   if (NULL != opaque_context)
1118     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1119                 "Sent op request with context message\n");
1120   else
1121     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1122                 "Sent op request without context message\n");
1123   return state;
1124 }
1125
1126
1127 /**
1128  * Accept an intersection operation request from a remote peer.  Only
1129  * initializes the private operation state.
1130  *
1131  * @param op operation that will be accepted as an intersection operation
1132  */
1133 static struct OperationState *
1134 intersection_accept (struct Operation *op)
1135 {
1136   struct OperationState *state;
1137
1138   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139               "Accepting set intersection operation\n");
1140   state = GNUNET_new (struct OperationState);
1141   state->phase = PHASE_INITIAL;
1142   state->my_element_count
1143     = op->set->state->current_set_element_count;
1144   state->my_elements
1145     = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
1146                                                         op->remote_element_count),
1147                                             GNUNET_YES);
1148   op->state = state;
1149   if (op->remote_element_count < state->my_element_count)
1150   {
1151     /* If the other peer (Alice) has fewer elements than us (Bob),
1152        we just send the count as Alice should send the first BF */
1153     send_element_count (op);
1154     state->phase = PHASE_COUNT_SENT;
1155     return state;
1156   }
1157   /* We have fewer elements, so we start with the BF */
1158   begin_bf_exchange (op);
1159   return state;
1160 }
1161
1162
1163 /**
1164  * Destroy the intersection operation.  Only things specific to the
1165  * intersection operation are destroyed.
1166  *
1167  * @param op intersection operation to destroy
1168  */
1169 static void
1170 intersection_op_cancel (struct Operation *op)
1171 {
1172   /* check if the op was canceled twice */
1173   GNUNET_assert (NULL != op->state);
1174   if (NULL != op->state->remote_bf)
1175   {
1176     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1177     op->state->remote_bf = NULL;
1178   }
1179   if (NULL != op->state->local_bf)
1180   {
1181     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1182     op->state->local_bf = NULL;
1183   }
1184   if (NULL != op->state->my_elements)
1185   {
1186     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1187     op->state->my_elements = NULL;
1188   }
1189   if (NULL != op->state->full_result_iter)
1190   {
1191     GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
1192     op->state->full_result_iter = NULL;
1193   }
1194   GNUNET_free (op->state);
1195   op->state = NULL;
1196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197               "Destroying intersection op state done\n");
1198 }
1199
1200
1201 /**
1202  * Create a new set supporting the intersection operation.
1203  *
1204  * @return the newly created set
1205  */
1206 static struct SetState *
1207 intersection_set_create ()
1208 {
1209   struct SetState *set_state;
1210
1211   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212               "Intersection set created\n");
1213   set_state = GNUNET_new (struct SetState);
1214   set_state->current_set_element_count = 0;
1215
1216   return set_state;
1217 }
1218
1219
1220 /**
1221  * Add the element from the given element message to the set.
1222  *
1223  * @param set_state state of the set want to add to
1224  * @param ee the element to add to the set
1225  */
1226 static void
1227 intersection_add (struct SetState *set_state,
1228                   struct ElementEntry *ee)
1229 {
1230   set_state->current_set_element_count++;
1231 }
1232
1233
1234 /**
1235  * Destroy a set that supports the intersection operation
1236  *
1237  * @param set_state the set to destroy
1238  */
1239 static void
1240 intersection_set_destroy (struct SetState *set_state)
1241 {
1242   GNUNET_free (set_state);
1243 }
1244
1245
1246 /**
1247  * Remove the element given in the element message from the set.
1248  *
1249  * @param set_state state of the set to remove from
1250  * @param element set element to remove
1251  */
1252 static void
1253 intersection_remove (struct SetState *set_state,
1254                      struct ElementEntry *element)
1255 {
1256   GNUNET_assert (0 < set_state->current_set_element_count);
1257   set_state->current_set_element_count--;
1258 }
1259
1260
1261 /**
1262  * Callback for channel death for the intersection operation.
1263  *
1264  * @param op operation that lost the channel
1265  */
1266 static void
1267 intersection_channel_death (struct Operation *op)
1268 {
1269   if (GNUNET_YES == op->state->channel_death_expected)
1270   {
1271     /* oh goodie, we are done! */
1272     send_client_done_and_destroy (op);
1273   }
1274   else
1275   {
1276     /* sorry, channel went down early, too bad. */
1277     _GSS_operation_destroy (op,
1278                             GNUNET_YES);
1279   }
1280 }
1281
1282
1283 /**
1284  * Get the table with implementing functions for set intersection.
1285  *
1286  * @return the operation specific VTable
1287  */
1288 const struct SetVT *
1289 _GSS_intersection_vt ()
1290 {
1291   static const struct SetVT intersection_vt = {
1292     .create = &intersection_set_create,
1293     .add = &intersection_add,
1294     .remove = &intersection_remove,
1295     .destroy_set = &intersection_set_destroy,
1296     .evaluate = &intersection_evaluate,
1297     .accept = &intersection_accept,
1298     .cancel = &intersection_op_cancel,
1299     .channel_death = &intersection_channel_death,
1300   };
1301
1302   return &intersection_vt;
1303 }