-indentation and comment fixes
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
28 #include "ibf.h"
29 #include "gnunet-service-set_union_strata_estimator.h"
30 #include "gnunet-service-set_protocol.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Number of IBFs in a strata estimator.
36  */
37 #define SE_STRATA_COUNT 32
38 /**
39  * Size of the IBFs in the strata estimator.
40  */
41 #define SE_IBF_SIZE 80
42 /**
43  * hash num parameter for the difference digests and strata estimators
44  */
45 #define SE_IBF_HASH_NUM 4
46
47 /**
48  * Number of buckets that can be transmitted in one message.
49  */
50 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
51
52 /**
53  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
54  * Choose this value so that computing the IBF is still cheaper
55  * than transmitting all values.
56  */
57 #define MAX_IBF_ORDER (16)
58
59 /**
60  * Number of buckets used in the ibf per estimated
61  * difference.
62  */
63 #define IBF_ALPHA 4
64
65
66 /**
67  * Current phase we are in for a union operation.
68  */
69 enum UnionOperationPhase
70 {
71   /**
72    * We sent the request message, and expect a strata estimator
73    */
74   PHASE_EXPECT_SE,
75
76   /**
77    * We sent the strata estimator, and expect an IBF. This phase is entered once
78    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
79    *
80    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
81    */
82   PHASE_EXPECT_IBF,
83
84   /**
85    * Continuation for multi part IBFs.
86    */
87   PHASE_EXPECT_IBF_CONT,
88
89   /**
90    * We are sending request and elements,
91    * and thus only expect elements from the other peer.
92    *
93    * We are currently decoding an IBF until it can no longer be decoded,
94    * we currently send requests and expect elements
95    * The remote peer is in #PHASE_EXPECT_ELEMENTS_AND_REQUESTS
96    */
97   PHASE_EXPECT_ELEMENTS,
98
99   /**
100    * We are expecting elements and requests, and send
101    * requested elements back to the other peer.
102    *
103    * We are in this phase if we have SENT an IBF for the remote peer to decode.
104    * We expect requests, send elements or could receive an new IBF, which takes
105    * us via #PHASE_EXPECT_IBF to phase #PHASE_EXPECT_ELEMENTS
106    *
107    * The remote peer is thus in:
108    * #PHASE_EXPECT_ELEMENTS
109    */
110   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
111
112   /**
113    * The protocol is over.
114    * Results may still have to be sent to the client.
115    */
116   PHASE_FINISHED
117 };
118
119
120 /**
121  * State of an evaluate operation with another peer.
122  */
123 struct OperationState
124 {
125
126   /**
127    * Copy of the set's strata estimator at the time of
128    * creation of this operation
129    */
130   struct StrataEstimator *se;
131
132   /**
133    * The ibf we currently receive
134    */
135   struct InvertibleBloomFilter *remote_ibf;
136
137   /**
138    * IBF of the set's element.
139    */
140   struct InvertibleBloomFilter *local_ibf;
141
142   /**
143    * Maps IBF-Keys (specific to the current salt) to elements.
144    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
145    * Colliding IBF-Keys are linked.
146    */
147   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
148
149   /**
150    * Iterator for sending elements on the key to element mapping to the client.
151    */
152   struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
153
154   /**
155    * Current state of the operation.
156    */
157   enum UnionOperationPhase phase;
158
159   /**
160    * Did we send the client that we are done?
161    */
162   int client_done_sent;
163
164   /**
165    * Number of ibf buckets received
166    */
167   unsigned int ibf_buckets_received;
168
169 };
170
171
172 /**
173  * The key entry is used to associate an ibf key with an element.
174  */
175 struct KeyEntry
176 {
177   /**
178    * IBF key for the entry, derived from the current salt.
179    */
180   struct IBF_Key ibf_key;
181
182   /**
183    * The actual element associated with the key.
184    */
185   struct ElementEntry *element;
186
187   /**
188    * Element that collides with this element
189    * on the ibf key. All colliding entries must have the same ibf key.
190    */
191   struct KeyEntry *next_colliding;
192 };
193
194
195 /**
196  * Used as a closure for sending elements
197  * with a specific IBF key.
198  */
199 struct SendElementClosure
200 {
201   /**
202    * The IBF key whose matching elements should be
203    * sent.
204    */
205   struct IBF_Key ibf_key;
206
207   /**
208    * Operation for which the elements
209    * should be sent.
210    */
211   struct Operation *op;
212 };
213
214
215 /**
216  * Extra state required for efficient set union.
217  */
218 struct SetState
219 {
220   /**
221    * The strata estimator is only generated once for
222    * each set.
223    * The IBF keys are derived from the element hashes with
224    * salt=0.
225    */
226   struct StrataEstimator *se;
227 };
228
229
230 /**
231  * Iterator over hash map entries, called to
232  * destroy the linked list of colliding ibf key entries.
233  *
234  * @param cls closure
235  * @param key current key code
236  * @param value value in the hash map
237  * @return #GNUNET_YES if we should continue to iterate,
238  *         #GNUNET_NO if not.
239  */
240 static int
241 destroy_key_to_element_iter (void *cls,
242                              uint32_t key,
243                              void *value)
244 {
245   struct KeyEntry *k = value;
246
247   while (NULL != k)
248   {
249     struct KeyEntry *k_tmp = k;
250
251     k = k->next_colliding;
252     if (GNUNET_YES == k_tmp->element->remote)
253     {
254       GNUNET_free (k_tmp->element);
255       k_tmp->element = NULL;
256     }
257     GNUNET_free (k_tmp);
258   }
259   return GNUNET_YES;
260 }
261
262
263 /**
264  * Destroy the union operation.  Only things specific to the union
265  * operation are destroyed.
266  *
267  * @param op union operation to destroy
268  */
269 static void
270 union_op_cancel (struct Operation *op)
271 {
272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
273               "destroying union op\n");
274   /* check if the op was canceled twice */
275   GNUNET_assert (NULL != op->state);
276   if (NULL != op->state->remote_ibf)
277   {
278     ibf_destroy (op->state->remote_ibf);
279     op->state->remote_ibf = NULL;
280   }
281   if (NULL != op->state->local_ibf)
282   {
283     ibf_destroy (op->state->local_ibf);
284     op->state->local_ibf = NULL;
285   }
286   if (NULL != op->state->se)
287   {
288     strata_estimator_destroy (op->state->se);
289     op->state->se = NULL;
290   }
291   if (NULL != op->state->key_to_element)
292   {
293     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
294                                              &destroy_key_to_element_iter,
295                                              NULL);
296     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
297     op->state->key_to_element = NULL;
298   }
299   GNUNET_free (op->state);
300   op->state = NULL;
301   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
302               "destroying union op done\n");
303 }
304
305
306 /**
307  * Inform the client that the union operation has failed,
308  * and proceed to destroy the evaluate operation.
309  *
310  * @param op the union operation to fail
311  */
312 static void
313 fail_union_operation (struct Operation *op)
314 {
315   struct GNUNET_MQ_Envelope *ev;
316   struct GNUNET_SET_ResultMessage *msg;
317
318   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
319               "union operation failed\n");
320   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
321   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
322   msg->request_id = htonl (op->spec->client_request_id);
323   msg->element_type = htons (0);
324   GNUNET_MQ_send (op->spec->set->client_mq, ev);
325   _GSS_operation_destroy (op, GNUNET_YES);
326 }
327
328
329 /**
330  * Derive the IBF key from a hash code and
331  * a salt.
332  *
333  * @param src the hash code
334  * @param salt salt to use
335  * @return the derived IBF key
336  */
337 static struct IBF_Key
338 get_ibf_key (const struct GNUNET_HashCode *src,
339              uint16_t salt)
340 {
341   struct IBF_Key key;
342
343   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
344                       GCRY_MD_SHA512, GCRY_MD_SHA256,
345                       src, sizeof *src,
346                       &salt, sizeof (salt),
347                       NULL, 0);
348   return key;
349 }
350
351
352 /**
353  * Iterator to create the mapping between ibf keys
354  * and element entries.
355  *
356  * @param cls closure
357  * @param key current key code
358  * @param value value in the hash map
359  * @return #GNUNET_YES if we should continue to iterate,
360  *         #GNUNET_NO if not.
361  */
362 static int
363 op_register_element_iterator (void *cls,
364                               uint32_t key,
365                               void *value)
366 {
367   struct KeyEntry *const new_k = cls;
368   struct KeyEntry *old_k = value;
369
370   GNUNET_assert (NULL != old_k);
371   /* check if our ibf key collides with the ibf key in the existing entry */
372   if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
373   {
374     /* insert the the new key in the collision chain */
375     new_k->next_colliding = old_k->next_colliding;
376     old_k->next_colliding = new_k;
377     /* signal to the caller that we were able to insert into a colliding bucket */
378     return GNUNET_NO;
379   }
380   return GNUNET_YES;
381 }
382
383
384 /**
385  * Iterator to create the mapping between ibf keys
386  * and element entries.
387  *
388  * @param cls closure
389  * @param key current key code
390  * @param value value in the hash map
391  * @return #GNUNET_YES (we should continue to iterate)
392  */
393 static int
394 op_has_element_iterator (void *cls,
395                          uint32_t key,
396                          void *value)
397 {
398   struct GNUNET_HashCode *element_hash = cls;
399   struct KeyEntry *k = value;
400
401   GNUNET_assert (NULL != k);
402   while (NULL != k)
403   {
404     if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
405                                      element_hash))
406       return GNUNET_NO;
407     k = k->next_colliding;
408   }
409   return GNUNET_YES;
410 }
411
412
413 /**
414  * Determine whether the given element is already in the operation's element
415  * set.
416  *
417  * @param op operation that should be tested for 'element_hash'
418  * @param element_hash hash of the element to look for
419  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
420  */
421 static int
422 op_has_element (struct Operation *op,
423                 const struct GNUNET_HashCode *element_hash)
424 {
425   int ret;
426   struct IBF_Key ibf_key;
427
428   ibf_key = get_ibf_key (element_hash, op->spec->salt);
429   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
430                                                       (uint32_t) ibf_key.key_val,
431                                                       op_has_element_iterator,
432                                                       (void *) element_hash);
433
434   /* was the iteration aborted because we found the element? */
435   if (GNUNET_SYSERR == ret)
436     return GNUNET_YES;
437   return GNUNET_NO;
438 }
439
440
441 /**
442  * Insert an element into the union operation's
443  * key-to-element mapping. Takes ownership of 'ee'.
444  * Note that this does not insert the element in the set,
445  * only in the operation's key-element mapping.
446  * This is done to speed up re-tried operations, if some elements
447  * were transmitted, and then the IBF fails to decode.
448  *
449  * @param op the union operation
450  * @param ee the element entry
451  */
452 static void
453 op_register_element (struct Operation *op,
454                      struct ElementEntry *ee)
455 {
456   int ret;
457   struct IBF_Key ibf_key;
458   struct KeyEntry *k;
459
460   ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
461   k = GNUNET_new (struct KeyEntry);
462   k->element = ee;
463   k->ibf_key = ibf_key;
464   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
465                                                       (uint32_t) ibf_key.key_val,
466                                                       op_register_element_iterator,
467                                                       k);
468
469   /* was the element inserted into a colliding bucket? */
470   if (GNUNET_SYSERR == ret)
471     return;
472   GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
473                                        (uint32_t) ibf_key.key_val,
474                                        k,
475                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
476 }
477
478
479 /**
480  * Insert a key into an ibf.
481  *
482  * @param cls the ibf
483  * @param key unused
484  * @param value the key entry to get the key from
485  */
486 static int
487 prepare_ibf_iterator (void *cls,
488                       uint32_t key,
489                       void *value)
490 {
491   struct InvertibleBloomFilter *ibf = cls;
492   struct KeyEntry *ke = value;
493
494   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
495               "inserting %x into ibf\n",
496               ke->ibf_key.key_val);
497   ibf_insert (ibf, ke->ibf_key);
498   return GNUNET_YES;
499 }
500
501
502 /**
503  * Iterator for initializing the
504  * key-to-element mapping of a union operation
505  *
506  * @param cls the union operation `struct Operation *`
507  * @param key unused
508  * @param value the `struct ElementEntry *` to insert
509  *        into the key-to-element mapping
510  * @return #GNUNET_YES (to continue iterating)
511  */
512 static int
513 init_key_to_element_iterator (void *cls,
514                               const struct GNUNET_HashCode *key,
515                               void *value)
516 {
517   struct Operation *op = cls;
518   struct ElementEntry *e = value;
519
520   /* make sure that the element belongs to the set at the time
521    * of creating the operation */
522   if ( (e->generation_added > op->generation_created) ||
523        ( (GNUNET_YES == e->removed) &&
524          (e->generation_removed < op->generation_created)))
525     return GNUNET_YES;
526
527   GNUNET_assert (GNUNET_NO == e->remote);
528
529   op_register_element (op, e);
530   return GNUNET_YES;
531 }
532
533
534 /**
535  * Create an ibf with the operation's elements
536  * of the specified size
537  *
538  * @param op the union operation
539  * @param size size of the ibf to create
540  */
541 static void
542 prepare_ibf (struct Operation *op,
543              uint16_t size)
544 {
545   if (NULL == op->state->key_to_element)
546   {
547     unsigned int len;
548
549     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
550     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
551     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
552                                            init_key_to_element_iterator, op);
553   }
554   if (NULL != op->state->local_ibf)
555     ibf_destroy (op->state->local_ibf);
556   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
557   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
558                                            &prepare_ibf_iterator,
559                                            op->state->local_ibf);
560 }
561
562
563 /**
564  * Send an ibf of appropriate size.
565  *
566  * @param op the union operation
567  * @param ibf_order order of the ibf to send, size=2^order
568  */
569 static void
570 send_ibf (struct Operation *op,
571           uint16_t ibf_order)
572 {
573   unsigned int buckets_sent = 0;
574   struct InvertibleBloomFilter *ibf;
575
576   prepare_ibf (op, 1<<ibf_order);
577
578   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
579               "sending ibf of size %u\n",
580               1<<ibf_order);
581
582   ibf = op->state->local_ibf;
583
584   while (buckets_sent < (1 << ibf_order))
585   {
586     unsigned int buckets_in_message;
587     struct GNUNET_MQ_Envelope *ev;
588     struct IBFMessage *msg;
589
590     buckets_in_message = (1 << ibf_order) - buckets_sent;
591     /* limit to maximum */
592     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
593       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
594
595     ev = GNUNET_MQ_msg_extra (msg,
596                               buckets_in_message * IBF_BUCKET_SIZE,
597                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
598     msg->reserved = 0;
599     msg->order = ibf_order;
600     msg->offset = htons (buckets_sent);
601     ibf_write_slice (ibf, buckets_sent,
602                      buckets_in_message, &msg[1]);
603     buckets_sent += buckets_in_message;
604     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
605                 "ibf chunk size %u, %u/%u sent\n",
606                 buckets_in_message,
607                 buckets_sent,
608                 1<<ibf_order);
609     GNUNET_MQ_send (op->mq, ev);
610   }
611
612   op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
613 }
614
615
616 /**
617  * Send a strata estimator to the remote peer.
618  *
619  * @param op the union operation with the remote peer
620  */
621 static void
622 send_strata_estimator (struct Operation *op)
623 {
624   struct GNUNET_MQ_Envelope *ev;
625   struct GNUNET_MessageHeader *strata_msg;
626
627   ev = GNUNET_MQ_msg_header_extra (strata_msg,
628                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
629                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
630   strata_estimator_write (op->state->se, &strata_msg[1]);
631   GNUNET_MQ_send (op->mq,
632                   ev);
633   op->state->phase = PHASE_EXPECT_IBF;
634   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635               "sent SE, expecting IBF\n");
636 }
637
638
639 /**
640  * Compute the necessary order of an ibf
641  * from the size of the symmetric set difference.
642  *
643  * @param diff the difference
644  * @return the required size of the ibf
645  */
646 static unsigned int
647 get_order_from_difference (unsigned int diff)
648 {
649   unsigned int ibf_order;
650
651   ibf_order = 2;
652   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
653           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
654     ibf_order++;
655   if (ibf_order > MAX_IBF_ORDER)
656     ibf_order = MAX_IBF_ORDER;
657   return ibf_order;
658 }
659
660
661 /**
662  * Handle a strata estimator from a remote peer
663  *
664  * @param cls the union operation
665  * @param mh the message
666  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
667  *         #GNUNET_OK otherwise
668  */
669 static int
670 handle_p2p_strata_estimator (void *cls,
671                              const struct GNUNET_MessageHeader *mh)
672 {
673   struct Operation *op = cls;
674   struct StrataEstimator *remote_se;
675   int diff;
676
677   if (op->state->phase != PHASE_EXPECT_SE)
678   {
679     fail_union_operation (op);
680     GNUNET_break (0);
681     return GNUNET_SYSERR;
682   }
683   if (ntohs (mh->size) !=
684       SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE +
685       sizeof (struct GNUNET_MessageHeader))
686   {
687     fail_union_operation (op);
688     GNUNET_break (0);
689     return GNUNET_SYSERR;
690   }
691   remote_se = strata_estimator_create (SE_STRATA_COUNT,
692                                        SE_IBF_SIZE,
693                                        SE_IBF_HASH_NUM);
694   strata_estimator_read (&mh[1], remote_se);
695   GNUNET_assert (NULL != op->state->se);
696   diff = strata_estimator_difference (remote_se,
697                                       op->state->se);
698   strata_estimator_destroy (remote_se);
699   strata_estimator_destroy (op->state->se);
700   op->state->se = NULL;
701   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702               "got se diff=%d, using ibf size %d\n",
703               diff,
704               1<<get_order_from_difference (diff));
705   send_ibf (op,
706             get_order_from_difference (diff));
707   return GNUNET_OK;
708 }
709
710
711 /**
712  * Iterator to send elements to a remote peer
713  *
714  * @param cls closure with the element key and the union operation
715  * @param key ignored
716  * @param value the key entry
717  */
718 static int
719 send_element_iterator (void *cls,
720                        uint32_t key,
721                        void *value)
722 {
723   struct SendElementClosure *sec = cls;
724   struct IBF_Key ibf_key = sec->ibf_key;
725   struct Operation *op = sec->op;
726   struct KeyEntry *ke = value;
727
728   if (ke->ibf_key.key_val != ibf_key.key_val)
729     return GNUNET_YES;
730   while (NULL != ke)
731   {
732     const struct GNUNET_SET_Element *const element = &ke->element->element;
733     struct GNUNET_MQ_Envelope *ev;
734     struct GNUNET_MessageHeader *mh;
735
736     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
737     ev = GNUNET_MQ_msg_header_extra (mh,
738                                      element->size,
739                                      GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
740     if (NULL == ev)
741     {
742       /* element too large */
743       GNUNET_break (0);
744       continue;
745     }
746     memcpy (&mh[1],
747             element->data,
748             element->size);
749     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750                 "sending element (%s) to peer\n",
751                 GNUNET_h2s (&ke->element->element_hash));
752     GNUNET_MQ_send (op->mq, ev);
753     ke = ke->next_colliding;
754   }
755   return GNUNET_NO;
756 }
757
758
759 /**
760  * Send all elements that have the specified IBF key
761  * to the remote peer of the union operation
762  *
763  * @param op union operation
764  * @param ibf_key IBF key of interest
765  */
766 static void
767 send_elements_for_key (struct Operation *op,
768                        struct IBF_Key ibf_key)
769 {
770   struct SendElementClosure send_cls;
771
772   send_cls.ibf_key = ibf_key;
773   send_cls.op = op;
774   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
775                                                        (uint32_t) ibf_key.key_val,
776                                                        &send_element_iterator,
777                                                        &send_cls);
778 }
779
780
781 /**
782  * Decode which elements are missing on each side, and
783  * send the appropriate elemens and requests
784  *
785  * @param op union operation
786  */
787 static void
788 decode_and_send (struct Operation *op)
789 {
790   struct IBF_Key key;
791   struct IBF_Key last_key;
792   int side;
793   unsigned int num_decoded;
794   struct InvertibleBloomFilter *diff_ibf;
795
796   GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
797
798   prepare_ibf (op, op->state->remote_ibf->size);
799   diff_ibf = ibf_dup (op->state->local_ibf);
800   ibf_subtract (diff_ibf, op->state->remote_ibf);
801
802   ibf_destroy (op->state->remote_ibf);
803   op->state->remote_ibf = NULL;
804
805   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806               "decoding IBF (size=%u)\n",
807               diff_ibf->size);
808
809   num_decoded = 0;
810   last_key.key_val = 0;
811
812   while (1)
813   {
814     int res;
815     int cycle_detected = GNUNET_NO;
816
817     last_key = key;
818
819     res = ibf_decode (diff_ibf, &side, &key);
820     if (res == GNUNET_OK)
821     {
822       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
823                   "decoded ibf key %lx\n",
824                   key.key_val);
825       num_decoded += 1;
826       if ( (num_decoded > diff_ibf->size) ||
827            (num_decoded > 1 && last_key.key_val == key.key_val) )
828       {
829         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
830                     "detected cyclic ibf (decoded %u/%u)\n",
831                     num_decoded,
832                     diff_ibf->size);
833         cycle_detected = GNUNET_YES;
834       }
835     }
836     if ( (GNUNET_SYSERR == res) ||
837          (GNUNET_YES == cycle_detected) )
838     {
839       int next_order;
840       next_order = 0;
841       while (1<<next_order < diff_ibf->size)
842         next_order++;
843       next_order++;
844       if (next_order <= MAX_IBF_ORDER)
845       {
846         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
847                     "decoding failed, sending larger ibf (size %u)\n",
848                     1<<next_order);
849         send_ibf (op, next_order);
850       }
851       else
852       {
853         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
854                     "set union failed: reached ibf limit\n");
855       }
856       break;
857     }
858     if (GNUNET_NO == res)
859     {
860       struct GNUNET_MQ_Envelope *ev;
861
862       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
863                   "transmitted all values, sending DONE\n");
864       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
865       GNUNET_MQ_send (op->mq, ev);
866       break;
867     }
868     if (1 == side)
869     {
870       send_elements_for_key (op, key);
871     }
872     else if (-1 == side)
873     {
874       struct GNUNET_MQ_Envelope *ev;
875       struct GNUNET_MessageHeader *msg;
876
877       /* It may be nice to merge multiple requests, but with cadet's corking it is not worth
878        * the effort additional complexity. */
879       ev = GNUNET_MQ_msg_header_extra (msg,
880                                        sizeof (struct IBF_Key),
881                                        GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
882
883       memcpy (&msg[1],
884               &key,
885               sizeof (struct IBF_Key));
886       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
887                   "sending element request\n");
888       GNUNET_MQ_send (op->mq, ev);
889     }
890     else
891     {
892       GNUNET_assert (0);
893     }
894   }
895   ibf_destroy (diff_ibf);
896 }
897
898
899 /**
900  * Handle an IBF message from a remote peer.
901  *
902  * @param cls the union operation
903  * @param mh the header of the message
904  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
905  *         #GNUNET_OK otherwise
906  */
907 static int
908 handle_p2p_ibf (void *cls,
909                 const struct GNUNET_MessageHeader *mh)
910 {
911   struct Operation *op = cls;
912   const struct IBFMessage *msg;
913   unsigned int buckets_in_message;
914
915   if (ntohs (mh->size) < sizeof (struct IBFMessage))
916   {
917     GNUNET_break_op (0);
918     fail_union_operation (op);
919     return GNUNET_SYSERR;
920   }
921   msg = (const struct IBFMessage *) mh;
922   if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
923        (op->state->phase == PHASE_EXPECT_IBF) )
924   {
925     op->state->phase = PHASE_EXPECT_IBF_CONT;
926     GNUNET_assert (NULL == op->state->remote_ibf);
927     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928                 "Creating new ibf of size %u\n",
929                 1 << msg->order);
930     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
931     op->state->ibf_buckets_received = 0;
932     if (0 != ntohs (msg->offset))
933     {
934       GNUNET_break_op (0);
935       fail_union_operation (op);
936       return GNUNET_SYSERR;
937     }
938   }
939   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
940   {
941     if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
942          (1<<msg->order != op->state->remote_ibf->size) )
943     {
944       GNUNET_break_op (0);
945       fail_union_operation (op);
946       return GNUNET_SYSERR;
947     }
948   }
949
950   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
951
952   if (0 == buckets_in_message)
953   {
954     GNUNET_break_op (0);
955     fail_union_operation (op);
956     return GNUNET_SYSERR;
957   }
958
959   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
960   {
961     GNUNET_break_op (0);
962     fail_union_operation (op);
963     return GNUNET_SYSERR;
964   }
965
966   ibf_read_slice (&msg[1],
967                   op->state->ibf_buckets_received,
968                   buckets_in_message,
969                   op->state->remote_ibf);
970   op->state->ibf_buckets_received += buckets_in_message;
971
972   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
973   {
974     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975                 "received full ibf\n");
976     op->state->phase = PHASE_EXPECT_ELEMENTS;
977     decode_and_send (op);
978   }
979   return GNUNET_OK;
980 }
981
982
983 /**
984  * Send a result message to the client indicating
985  * that there is a new element.
986  *
987  * @param op union operation
988  * @param element element to send
989  */
990 static void
991 send_client_element (struct Operation *op,
992                      struct GNUNET_SET_Element *element)
993 {
994   struct GNUNET_MQ_Envelope *ev;
995   struct GNUNET_SET_ResultMessage *rm;
996
997   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998               "sending element (size %u) to client\n",
999               element->size);
1000   GNUNET_assert (0 != op->spec->client_request_id);
1001   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1002   if (NULL == ev)
1003   {
1004     GNUNET_MQ_discard (ev);
1005     GNUNET_break (0);
1006     return;
1007   }
1008   rm->result_status = htons (GNUNET_SET_STATUS_OK);
1009   rm->request_id = htonl (op->spec->client_request_id);
1010   rm->element_type = element->element_type;
1011   memcpy (&rm[1], element->data, element->size);
1012   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1013 }
1014
1015
1016 /**
1017  * Signal to the client that the operation has finished and
1018  * destroy the operation.
1019  *
1020  * @param cls operation to destroy
1021  */
1022 static void
1023 send_done_and_destroy (void *cls)
1024 {
1025   struct Operation *op = cls;
1026   struct GNUNET_MQ_Envelope *ev;
1027   struct GNUNET_SET_ResultMessage *rm;
1028
1029   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1030   rm->request_id = htonl (op->spec->client_request_id);
1031   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1032   rm->element_type = htons (0);
1033   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1034   _GSS_operation_destroy (op, GNUNET_YES);
1035   op->keep--;
1036   if (0 == op->keep)
1037     GNUNET_free (op);
1038 }
1039
1040
1041 /**
1042  * Send all remaining elements in the full result iterator.
1043  *
1044  * @param cls operation
1045  */
1046 static void
1047 send_remaining_elements (void *cls)
1048 {
1049   struct Operation *op = cls;
1050   struct KeyEntry *ke;
1051   int res;
1052
1053   res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter,
1054                                                        NULL,
1055                                                        (const void **) &ke);
1056   if (GNUNET_NO == res)
1057   {
1058     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059                 "sending done and destroy because iterator ran out\n");
1060     send_done_and_destroy (op);
1061     return;
1062   }
1063   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064               "sending elements from key entry\n");
1065   while (1)
1066   {
1067     struct GNUNET_MQ_Envelope *ev;
1068     struct GNUNET_SET_ResultMessage *rm;
1069     struct GNUNET_SET_Element *element;
1070
1071     element = &ke->element->element;
1072     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073                 "sending element (size %u) to client (full set)\n",
1074                 element->size);
1075     GNUNET_assert (0 != op->spec->client_request_id);
1076     ev = GNUNET_MQ_msg_extra (rm,
1077                               element->size,
1078                               GNUNET_MESSAGE_TYPE_SET_RESULT);
1079     if (NULL == ev)
1080     {
1081       GNUNET_MQ_discard (ev);
1082       GNUNET_break (0);
1083       continue;
1084     }
1085     rm->result_status = htons (GNUNET_SET_STATUS_OK);
1086     rm->request_id = htonl (op->spec->client_request_id);
1087     rm->element_type = element->element_type;
1088     memcpy (&rm[1], element->data, element->size);
1089     if (NULL == ke->next_colliding)
1090     {
1091       GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1092       GNUNET_MQ_send (op->spec->set->client_mq, ev);
1093       break;
1094     }
1095     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1096     ke = ke->next_colliding;
1097   }
1098 }
1099
1100
1101 /**
1102  * Send a result message to the client indicating
1103  * that the operation is over.
1104  * After the result done message has been sent to the client,
1105  * destroy the evaluate operation.
1106  *
1107  * @param op union operation
1108  */
1109 static void
1110 finish_and_destroy (struct Operation *op)
1111 {
1112   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1113   op->keep++;
1114   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1115   {
1116     /* prevent that the op is free'd by the tunnel end handler */
1117     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1118                 "sending full result set\n");
1119     GNUNET_assert (NULL == op->state->full_result_iter);
1120     op->state->full_result_iter =
1121         GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1122     send_remaining_elements (op);
1123     return;
1124   }
1125   send_done_and_destroy (op);
1126 }
1127
1128
1129 /**
1130  * Handle an element message from a remote peer.
1131  *
1132  * @param cls the union operation
1133  * @param mh the message
1134  */
1135 static void
1136 handle_p2p_elements (void *cls,
1137                      const struct GNUNET_MessageHeader *mh)
1138 {
1139   struct Operation *op = cls;
1140   struct ElementEntry *ee;
1141   uint16_t element_size;
1142
1143   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1144               "Got element from peer\n");
1145   if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1146        (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1147   {
1148     fail_union_operation (op);
1149     GNUNET_break_op (0);
1150     return;
1151   }
1152   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1153   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1154   memcpy (&ee[1], &mh[1], element_size);
1155   ee->element.size = element_size;
1156   ee->element.data = &ee[1];
1157   ee->remote = GNUNET_YES;
1158   GNUNET_CRYPTO_hash (ee->element.data,
1159                       ee->element.size,
1160                       &ee->element_hash);
1161
1162   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1163   {
1164     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165                 "got existing element from peer\n");
1166     GNUNET_free (ee);
1167     return;
1168   }
1169
1170   op_register_element (op, ee);
1171   /* only send results immediately if the client wants it */
1172   if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1173     send_client_element (op, &ee->element);
1174 }
1175
1176
1177 /**
1178  * Handle an element request from a remote peer.
1179  *
1180  * @param cls the union operation
1181  * @param mh the message
1182  */
1183 static void
1184 handle_p2p_element_requests (void *cls,
1185                              const struct GNUNET_MessageHeader *mh)
1186 {
1187   struct Operation *op = cls;
1188   const struct IBF_Key *ibf_key;
1189   unsigned int num_keys;
1190
1191   /* look up elements and send them */
1192   if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1193   {
1194     GNUNET_break_op (0);
1195     fail_union_operation (op);
1196     return;
1197   }
1198   num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1199     / sizeof (struct IBF_Key);
1200   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1201       != num_keys * sizeof (struct IBF_Key))
1202   {
1203     GNUNET_break_op (0);
1204     fail_union_operation (op);
1205     return;
1206   }
1207
1208   ibf_key = (const struct IBF_Key *) &mh[1];
1209   while (0 != num_keys--)
1210   {
1211     send_elements_for_key (op, *ibf_key);
1212     ibf_key++;
1213   }
1214 }
1215
1216
1217 /**
1218  * Handle a done message from a remote peer
1219  *
1220  * @param cls the union operation
1221  * @param mh the message
1222  */
1223 static void
1224 handle_p2p_done (void *cls,
1225                  const struct GNUNET_MessageHeader *mh)
1226 {
1227   struct Operation *op = cls;
1228   struct GNUNET_MQ_Envelope *ev;
1229
1230   if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1231   {
1232     /* we got all requests, but still have to send our elements as response */
1233
1234     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1235                 "got DONE, sending final DONE after elements\n");
1236     op->state->phase = PHASE_FINISHED;
1237     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1238     GNUNET_MQ_send (op->mq, ev);
1239     return;
1240   }
1241   if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1242   {
1243     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1244                 "got final DONE\n");
1245     op->state->phase = PHASE_FINISHED;
1246     finish_and_destroy (op);
1247     return;
1248   }
1249   GNUNET_break_op (0);
1250   fail_union_operation (op);
1251 }
1252
1253
1254 /**
1255  * Initiate operation to evaluate a set union with a remote peer.
1256  *
1257  * @param op operation to perform (to be initialized)
1258  * @param opaque_context message to be transmitted to the listener
1259  *        to convince him to accept, may be NULL
1260  */
1261 static void
1262 union_evaluate (struct Operation *op,
1263                 const struct GNUNET_MessageHeader *opaque_context)
1264 {
1265   struct GNUNET_MQ_Envelope *ev;
1266   struct OperationRequestMessage *msg;
1267
1268   op->state = GNUNET_new (struct OperationState);
1269   /* copy the current generation's strata estimator for this operation */
1270   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1271   /* we started the operation, thus we have to send the operation request */
1272   op->state->phase = PHASE_EXPECT_SE;
1273   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1274               "Initiating union operation evaluation\n");
1275   ev = GNUNET_MQ_msg_nested_mh (msg,
1276                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1277                                 opaque_context);
1278   if (NULL == ev)
1279   {
1280     /* the context message is too large */
1281     GNUNET_break (0);
1282     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1283     return;
1284   }
1285   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1286   msg->app_id = op->spec->app_id;
1287   GNUNET_MQ_send (op->mq,
1288                   ev);
1289
1290   if (NULL != opaque_context)
1291     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1292                 "sent op request with context message\n");
1293   else
1294     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1295                 "sent op request without context message\n");
1296 }
1297
1298
1299 /**
1300  * Accept an union operation request from a remote peer.
1301  * Only initializes the private operation state.
1302  *
1303  * @param op operation that will be accepted as a union operation
1304  */
1305 static void
1306 union_accept (struct Operation *op)
1307 {
1308   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1309               "accepting set union operation\n");
1310   op->state = GNUNET_new (struct OperationState);
1311   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1312   /* kick off the operation */
1313   send_strata_estimator (op);
1314 }
1315
1316
1317 /**
1318  * Create a new set supporting the union operation
1319  *
1320  * We maintain one strata estimator per set and then manipulate it over the
1321  * lifetime of the set, as recreating a strata estimator would be expensive.
1322  *
1323  * @return the newly created set
1324  */
1325 static struct SetState *
1326 union_set_create (void)
1327 {
1328   struct SetState *set_state;
1329
1330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331               "union set created\n");
1332   set_state = GNUNET_new (struct SetState);
1333   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1334                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
1335   return set_state;
1336 }
1337
1338
1339 /**
1340  * Add the element from the given element message to the set.
1341  *
1342  * @param set_state state of the set want to add to
1343  * @param ee the element to add to the set
1344  */
1345 static void
1346 union_add (struct SetState *set_state, struct ElementEntry *ee)
1347 {
1348   strata_estimator_insert (set_state->se,
1349                            get_ibf_key (&ee->element_hash, 0));
1350 }
1351
1352
1353 /**
1354  * Remove the element given in the element message from the set.
1355  * Only marks the element as removed, so that older set operations can still exchange it.
1356  *
1357  * @param set_state state of the set to remove from
1358  * @param ee set element to remove
1359  */
1360 static void
1361 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1362 {
1363   strata_estimator_remove (set_state->se,
1364                            get_ibf_key (&ee->element_hash, 0));
1365 }
1366
1367
1368 /**
1369  * Destroy a set that supports the union operation.
1370  *
1371  * @param set_state the set to destroy
1372  */
1373 static void
1374 union_set_destroy (struct SetState *set_state)
1375 {
1376   if (NULL != set_state->se)
1377   {
1378     strata_estimator_destroy (set_state->se);
1379     set_state->se = NULL;
1380   }
1381   GNUNET_free (set_state);
1382 }
1383
1384
1385 /**
1386  * Dispatch messages for a union operation.
1387  *
1388  * @param op the state of the union evaluate operation
1389  * @param mh the received message
1390  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1391  *         #GNUNET_OK otherwise
1392  */
1393 int
1394 union_handle_p2p_message (struct Operation *op,
1395                           const struct GNUNET_MessageHeader *mh)
1396 {
1397   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1398               "received p2p message (t: %u, s: %u)\n",
1399               ntohs (mh->type),
1400               ntohs (mh->size));
1401   switch (ntohs (mh->type))
1402   {
1403     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1404       return handle_p2p_ibf (op, mh);
1405     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1406       return handle_p2p_strata_estimator (op, mh);
1407     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1408       handle_p2p_elements (op, mh);
1409       break;
1410     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1411       handle_p2p_element_requests (op, mh);
1412       break;
1413     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1414       handle_p2p_done (op, mh);
1415       break;
1416     default:
1417       /* something wrong with cadet's message handlers? */
1418       GNUNET_assert (0);
1419   }
1420   return GNUNET_OK;
1421 }
1422
1423 /**
1424  * handler for peer-disconnects, notifies the client
1425  * about the aborted operation in case the op was not concluded
1426  *
1427  * @param op the destroyed operation
1428  */
1429 static void
1430 union_peer_disconnect (struct Operation *op)
1431 {
1432   if (PHASE_FINISHED != op->state->phase)
1433   {
1434     struct GNUNET_MQ_Envelope *ev;
1435     struct GNUNET_SET_ResultMessage *msg;
1436
1437     ev = GNUNET_MQ_msg (msg,
1438                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1439     msg->request_id = htonl (op->spec->client_request_id);
1440     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1441     msg->element_type = htons (0);
1442     GNUNET_MQ_send (op->spec->set->client_mq,
1443                     ev);
1444     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1445                 "other peer disconnected prematurely\n");
1446     _GSS_operation_destroy (op,
1447                             GNUNET_YES);
1448     return;
1449   }
1450   // else: the session has already been concluded
1451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1452               "other peer disconnected (finished)\n");
1453   if (GNUNET_NO == op->state->client_done_sent)
1454     finish_and_destroy (op);
1455 }
1456
1457
1458 /**
1459  * Get the table with implementing functions for
1460  * set union.
1461  *
1462  * @return the operation specific VTable
1463  */
1464 const struct SetVT *
1465 _GSS_union_vt ()
1466 {
1467   static const struct SetVT union_vt = {
1468     .create = &union_set_create,
1469     .msg_handler = &union_handle_p2p_message,
1470     .add = &union_add,
1471     .remove = &union_remove,
1472     .destroy_set = &union_set_destroy,
1473     .evaluate = &union_evaluate,
1474     .accept = &union_accept,
1475     .peer_disconnect = &union_peer_disconnect,
1476     .cancel = &union_op_cancel,
1477   };
1478
1479   return &union_vt;
1480 }