clean up internal set API, avoid copying context message needlessly
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "ibf.h"
30 #include "strata_estimator.h"
31 #include "set_protocol.h"
32 #include <gcrypt.h>
33
34
35 /**
36  * Number of IBFs in a strata estimator.
37  */
38 #define SE_STRATA_COUNT 32
39 /**
40  * Size of the IBFs in the strata estimator.
41  */
42 #define SE_IBF_SIZE 80
43 /**
44  * hash num parameter for the difference digests and strata estimators
45  */
46 #define SE_IBF_HASH_NUM 4
47
48 /**
49  * Number of buckets that can be transmitted in one message.
50  */
51 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
52
53 /**
54  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
55  * Choose this value so that computing the IBF is still cheaper
56  * than transmitting all values.
57  */
58 #define MAX_IBF_ORDER (16)
59
60 /**
61  * Number of buckets used in the ibf per estimated
62  * difference.
63  */
64 #define IBF_ALPHA 4
65
66
67 /**
68  * Current phase we are in for a union operation.
69  */
70 enum UnionOperationPhase
71 {
72   /**
73    * We sent the request message, and expect a strata estimator
74    */
75   PHASE_EXPECT_SE,
76
77   /**
78    * We sent the strata estimator, and expect an IBF. This phase is entered once
79    * upon initialization and later via PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
80    *
81    * After receiving the complete IBF, we enter PHASE_EXPECT_ELEMENTS
82    */
83   PHASE_EXPECT_IBF,
84
85   /**
86    * Continuation for multi part IBFs.
87    */
88   PHASE_EXPECT_IBF_CONT,
89
90   /**
91    * We are sending request and elements,
92    * and thus only expect elements from the other peer.
93    *
94    * We are currently decoding an IBF until it can no longer be decoded,
95    * we currently send requests and expect elements
96    * The remote peer is in PHASE_EXPECT_ELEMENTS_AND_REQUESTS
97    */
98   PHASE_EXPECT_ELEMENTS,
99
100   /**
101    * We are expecting elements and requests, and send
102    * requested elements back to the other peer.
103    *
104    * We are in this phase if we have SENT an IBF for the remote peer to decode.
105    * We expect requests, send elements or could receive an new IBF, which takes
106    * us via PHASE_EXPECT_IBF to phase PHASE_EXPECT_ELEMENTS
107    *
108    * The remote peer is thus in:
109    * PHASE_EXPECT_ELEMENTS
110    */
111   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
112
113   /**
114    * The protocol is over.
115    * Results may still have to be sent to the client.
116    */
117   PHASE_FINISHED
118 };
119
120
121 /**
122  * State of an evaluate operation with another peer.
123  */
124 struct OperationState
125 {
126
127   /**
128    * Copy of the set's strata estimator at the time of
129    * creation of this operation
130    */
131   struct StrataEstimator *se;
132
133   /**
134    * The ibf we currently receive
135    */
136   struct InvertibleBloomFilter *remote_ibf;
137
138   /**
139    * IBF of the set's element.
140    */
141   struct InvertibleBloomFilter *local_ibf;
142
143   /**
144    * Maps IBF-Keys (specific to the current salt) to elements.
145    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
146    * Colliding IBF-Keys are linked.
147    */
148   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
149
150   /**
151    * Iterator for sending elements on the key to element mapping to the client.
152    */
153   struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
154
155   /**
156    * Current state of the operation.
157    */
158   enum UnionOperationPhase phase;
159
160   /**
161    * Did we send the client that we are done?
162    */
163   int client_done_sent;
164
165   /**
166    * Number of ibf buckets received
167    */
168   unsigned int ibf_buckets_received;
169
170 };
171
172
173 /**
174  * The key entry is used to associate an ibf key with
175  * an element.
176  */
177 struct KeyEntry
178 {
179   /**
180    * IBF key for the entry, derived from the current salt.
181    */
182   struct IBF_Key ibf_key;
183
184   /**
185    * The actual element associated with the key.
186    */
187   struct ElementEntry *element;
188
189   /**
190    * Element that collides with this element
191    * on the ibf key. All colliding entries must have the same ibf key.
192    */
193   struct KeyEntry *next_colliding;
194 };
195
196
197 /**
198  * Used as a closure for sending elements
199  * with a specific IBF key.
200  */
201 struct SendElementClosure
202 {
203   /**
204    * The IBF key whose matching elements should be
205    * sent.
206    */
207   struct IBF_Key ibf_key;
208
209   /**
210    * Operation for which the elements
211    * should be sent.
212    */
213   struct Operation *op;
214 };
215
216
217 /**
218  * Extra state required for efficient set union.
219  */
220 struct SetState
221 {
222   /**
223    * The strata estimator is only generated once for
224    * each set.
225    * The IBF keys are derived from the element hashes with
226    * salt=0.
227    */
228   struct StrataEstimator *se;
229 };
230
231
232 /**
233  * Iterator over hash map entries.
234  *
235  * @param cls closure
236  * @param key current key code
237  * @param value value in the hash map
238  * @return #GNUNET_YES if we should continue to
239  *         iterate,
240  *         #GNUNET_NO if not.
241  */
242 static int
243 destroy_key_to_element_iter (void *cls,
244                              uint32_t key,
245                              void *value)
246 {
247   struct KeyEntry *k = value;
248   /* destroy the linked list of colliding ibf key entries */
249   while (NULL != k)
250   {
251     struct KeyEntry *k_tmp = k;
252     k = k->next_colliding;
253     if (GNUNET_YES == k_tmp->element->remote)
254     {
255       GNUNET_free (k_tmp->element);
256       k_tmp->element = NULL;
257     }
258     GNUNET_free (k_tmp);
259   }
260   return GNUNET_YES;
261 }
262
263
264 /**
265  * Destroy the union operation.  Only things specific to the union operation are destroyed.
266  *
267  * @param op union operation to destroy
268  */
269 static void
270 union_op_cancel (struct Operation *op)
271 {
272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
273   /* check if the op was canceled twice */
274   GNUNET_assert (NULL != op->state);
275   if (NULL != op->state->remote_ibf)
276   {
277     ibf_destroy (op->state->remote_ibf);
278     op->state->remote_ibf = NULL;
279   }
280   if (NULL != op->state->local_ibf)
281   {
282     ibf_destroy (op->state->local_ibf);
283     op->state->local_ibf = NULL;
284   }
285   if (NULL != op->state->se)
286   {
287     strata_estimator_destroy (op->state->se);
288     op->state->se = NULL;
289   }
290   if (NULL != op->state->key_to_element)
291   {
292     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL);
293     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
294     op->state->key_to_element = NULL;
295   }
296   GNUNET_free (op->state);
297   op->state = NULL;
298   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
299 }
300
301
302 /**
303  * Inform the client that the union operation has failed,
304  * and proceed to destroy the evaluate operation.
305  *
306  * @param op the union operation to fail
307  */
308 static void
309 fail_union_operation (struct Operation *op)
310 {
311   struct GNUNET_MQ_Envelope *ev;
312   struct GNUNET_SET_ResultMessage *msg;
313
314   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n");
315
316   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
317   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
318   msg->request_id = htonl (op->spec->client_request_id);
319   msg->element_type = htons (0);
320   GNUNET_MQ_send (op->spec->set->client_mq, ev);
321   _GSS_operation_destroy (op, GNUNET_YES);
322 }
323
324
325 /**
326  * Derive the IBF key from a hash code and
327  * a salt.
328  *
329  * @param src the hash code
330  * @param salt salt to use
331  * @return the derived IBF key
332  */
333 static struct IBF_Key
334 get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt)
335 {
336   struct IBF_Key key;
337
338   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
339                       GCRY_MD_SHA512, GCRY_MD_SHA256,
340                       src, sizeof *src,
341                       &salt, sizeof (salt),
342                       NULL, 0);
343   return key;
344 }
345
346
347 /**
348  * Iterator to create the mapping between ibf keys
349  * and element entries.
350  *
351  * @param cls closure
352  * @param key current key code
353  * @param value value in the hash map
354  * @return #GNUNET_YES if we should continue to
355  *         iterate,
356  *         #GNUNET_NO if not.
357  */
358 static int
359 op_register_element_iterator (void *cls,
360                               uint32_t key,
361                               void *value)
362 {
363   struct KeyEntry *const new_k = cls;
364   struct KeyEntry *old_k = value;
365
366   GNUNET_assert (NULL != old_k);
367   /* check if our ibf key collides with the ibf key in the existing entry */
368   if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
369   {
370     /* insert the the new key in the collision chain */
371     new_k->next_colliding = old_k->next_colliding;
372     old_k->next_colliding = new_k;
373     /* signal to the caller that we were able to insert into a colliding bucket */
374     return GNUNET_NO;
375   }
376   return GNUNET_YES;
377 }
378
379
380 /**
381  * Iterator to create the mapping between ibf keys
382  * and element entries.
383  *
384  * @param cls closure
385  * @param key current key code
386  * @param value value in the hash map
387  * @return #GNUNET_YES if we should continue to
388  *         iterate,
389  *         #GNUNET_NO if not.
390  */
391 static int
392 op_has_element_iterator (void *cls,
393                          uint32_t key,
394                          void *value)
395 {
396   struct GNUNET_HashCode *element_hash = cls;
397   struct KeyEntry *k = value;
398
399   GNUNET_assert (NULL != k);
400   while (NULL != k)
401   {
402     if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash))
403       return GNUNET_NO;
404     k = k->next_colliding;
405   }
406   return GNUNET_YES;
407 }
408
409
410 /**
411  * Determine whether the given element is already in the operation's element
412  * set.
413  *
414  * @param op operation that should be tested for 'element_hash'
415  * @param element_hash hash of the element to look for
416  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
417  */
418 static int
419 op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash)
420 {
421   int ret;
422   struct IBF_Key ibf_key;
423
424   ibf_key = get_ibf_key (element_hash, op->spec->salt);
425   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
426                                                       (uint32_t) ibf_key.key_val,
427                                                       op_has_element_iterator, (void *) element_hash);
428
429   /* was the iteration aborted because we found the element? */
430   if (GNUNET_SYSERR == ret)
431     return GNUNET_YES;
432   return GNUNET_NO;
433 }
434
435
436 /**
437  * Insert an element into the union operation's
438  * key-to-element mapping. Takes ownership of 'ee'.
439  * Note that this does not insert the element in the set,
440  * only in the operation's key-element mapping.
441  * This is done to speed up re-tried operations, if some elements
442  * were transmitted, and then the IBF fails to decode.
443  *
444  * @param op the union operation
445  * @param ee the element entry
446  */
447 static void
448 op_register_element (struct Operation *op,
449                      struct ElementEntry *ee)
450 {
451   int ret;
452   struct IBF_Key ibf_key;
453   struct KeyEntry *k;
454
455   ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
456   k = GNUNET_new (struct KeyEntry);
457   k->element = ee;
458   k->ibf_key = ibf_key;
459   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
460                                                       (uint32_t) ibf_key.key_val,
461                                                       op_register_element_iterator, k);
462
463   /* was the element inserted into a colliding bucket? */
464   if (GNUNET_SYSERR == ret)
465     return;
466
467   GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k,
468                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
469 }
470
471
472 /**
473  * Insert a key into an ibf.
474  *
475  * @param cls the ibf
476  * @param key unused
477  * @param value the key entry to get the key from
478  */
479 static int
480 prepare_ibf_iterator (void *cls,
481                       uint32_t key,
482                       void *value)
483 {
484   struct InvertibleBloomFilter *ibf = cls;
485   struct KeyEntry *ke = value;
486
487   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val);
488
489   ibf_insert (ibf, ke->ibf_key);
490   return GNUNET_YES;
491 }
492
493
494 /**
495  * Iterator for initializing the
496  * key-to-element mapping of a union operation
497  *
498  * @param cls the union operation
499  * @param key unised
500  * @param value the element entry to insert
501  *        into the key-to-element mapping
502  * @return GNUNET_YES to continue iterating,
503  *         GNUNET_NO to stop
504  */
505 static int
506 init_key_to_element_iterator (void *cls,
507                               const struct GNUNET_HashCode *key,
508                               void *value)
509 {
510   struct Operation *op = cls;
511   struct ElementEntry *e = value;
512
513   /* make sure that the element belongs to the set at the time
514    * of creating the operation */
515   if ( (e->generation_added > op->generation_created) ||
516        ( (GNUNET_YES == e->removed) &&
517          (e->generation_removed < op->generation_created)))
518     return GNUNET_YES;
519
520   GNUNET_assert (GNUNET_NO == e->remote);
521
522   op_register_element (op, e);
523   return GNUNET_YES;
524 }
525
526
527 /**
528  * Create an ibf with the operation's elements
529  * of the specified size
530  *
531  * @param op the union operation
532  * @param size size of the ibf to create
533  */
534 static void
535 prepare_ibf (struct Operation *op, uint16_t size)
536 {
537   if (NULL == op->state->key_to_element)
538   {
539     unsigned int len;
540     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
541     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
542     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
543                                            init_key_to_element_iterator, op);
544   }
545   if (NULL != op->state->local_ibf)
546     ibf_destroy (op->state->local_ibf);
547   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
548   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
549                                            prepare_ibf_iterator, op->state->local_ibf);
550 }
551
552
553 /**
554  * Send an ibf of appropriate size.
555  *
556  * @param op the union operation
557  * @param ibf_order order of the ibf to send, size=2^order
558  */
559 static void
560 send_ibf (struct Operation *op, uint16_t ibf_order)
561 {
562   unsigned int buckets_sent = 0;
563   struct InvertibleBloomFilter *ibf;
564
565   prepare_ibf (op, 1<<ibf_order);
566
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
568
569   ibf = op->state->local_ibf;
570
571   while (buckets_sent < (1 << ibf_order))
572   {
573     unsigned int buckets_in_message;
574     struct GNUNET_MQ_Envelope *ev;
575     struct IBFMessage *msg;
576
577     buckets_in_message = (1 << ibf_order) - buckets_sent;
578     /* limit to maximum */
579     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
580       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
581
582     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
583                                GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
584     msg->reserved = 0;
585     msg->order = ibf_order;
586     msg->offset = htons (buckets_sent);
587     ibf_write_slice (ibf, buckets_sent,
588                      buckets_in_message, &msg[1]);
589     buckets_sent += buckets_in_message;
590     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
591                 buckets_in_message, buckets_sent, 1<<ibf_order);
592     GNUNET_MQ_send (op->mq, ev);
593   }
594
595   op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
596 }
597
598
599 /**
600  * Send a strata estimator to the remote peer.
601  *
602  * @param op the union operation with the remote peer
603  */
604 static void
605 send_strata_estimator (struct Operation *op)
606 {
607   struct GNUNET_MQ_Envelope *ev;
608   struct GNUNET_MessageHeader *strata_msg;
609
610   ev = GNUNET_MQ_msg_header_extra (strata_msg,
611                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
612                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
613   strata_estimator_write (op->state->se, &strata_msg[1]);
614   GNUNET_MQ_send (op->mq, ev);
615   op->state->phase = PHASE_EXPECT_IBF;
616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
617 }
618
619
620 /**
621  * Compute the necessary order of an ibf
622  * from the size of the symmetric set difference.
623  *
624  * @param diff the difference
625  * @return the required size of the ibf
626  */
627 static unsigned int
628 get_order_from_difference (unsigned int diff)
629 {
630   unsigned int ibf_order;
631
632   ibf_order = 2;
633   while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM)
634     ibf_order++;
635   if (ibf_order > MAX_IBF_ORDER)
636     ibf_order = MAX_IBF_ORDER;
637   return ibf_order;
638 }
639
640
641 /**
642  * Handle a strata estimator from a remote peer
643  *
644  * @param cls the union operation
645  * @param mh the message
646  */
647 static void
648 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
649 {
650   struct Operation *op = cls;
651   struct StrataEstimator *remote_se;
652   int diff;
653
654   if (op->state->phase != PHASE_EXPECT_SE)
655   {
656     fail_union_operation (op);
657     GNUNET_break (0);
658     return;
659   }
660   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
661                                        SE_IBF_HASH_NUM);
662   strata_estimator_read (&mh[1], remote_se);
663   GNUNET_assert (NULL != op->state->se);
664   diff = strata_estimator_difference (remote_se, op->state->se);
665   strata_estimator_destroy (remote_se);
666   strata_estimator_destroy (op->state->se);
667   op->state->se = NULL;
668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
669               diff, 1<<get_order_from_difference (diff));
670   send_ibf (op, get_order_from_difference (diff));
671 }
672
673
674
675 /**
676  * Iterator to send elements to a remote peer
677  *
678  * @param cls closure with the element key and the union operation
679  * @param key ignored
680  * @param value the key entry
681  */
682 static int
683 send_element_iterator (void *cls,
684                        uint32_t key,
685                        void *value)
686 {
687   struct SendElementClosure *sec = cls;
688   struct IBF_Key ibf_key = sec->ibf_key;
689   struct Operation *op = sec->op;
690   struct KeyEntry *ke = value;
691
692   if (ke->ibf_key.key_val != ibf_key.key_val)
693     return GNUNET_YES;
694   while (NULL != ke)
695   {
696     const struct GNUNET_SET_Element *const element = &ke->element->element;
697     struct GNUNET_MQ_Envelope *ev;
698     struct GNUNET_MessageHeader *mh;
699
700     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
701     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
702     if (NULL == ev)
703     {
704       /* element too large */
705       GNUNET_break (0);
706       continue;
707     }
708     memcpy (&mh[1], element->data, element->size);
709     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
710                 GNUNET_h2s (&ke->element->element_hash));
711     GNUNET_MQ_send (op->mq, ev);
712     ke = ke->next_colliding;
713   }
714   return GNUNET_NO;
715 }
716
717 /**
718  * Send all elements that have the specified IBF key
719  * to the remote peer of the union operation
720  *
721  * @param op union operation
722  * @param ibf_key IBF key of interest
723  */
724 static void
725 send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
726 {
727   struct SendElementClosure send_cls;
728
729   send_cls.ibf_key = ibf_key;
730   send_cls.op = op;
731   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
732                                                        (uint32_t) ibf_key.key_val,
733                                                        &send_element_iterator, &send_cls);
734 }
735
736
737 /**
738  * Decode which elements are missing on each side, and
739  * send the appropriate elemens and requests
740  *
741  * @param op union operation
742  */
743 static void
744 decode_and_send (struct Operation *op)
745 {
746   struct IBF_Key key;
747   struct IBF_Key last_key;
748   int side;
749   unsigned int num_decoded;
750   struct InvertibleBloomFilter *diff_ibf;
751
752   GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
753
754   prepare_ibf (op, op->state->remote_ibf->size);
755   diff_ibf = ibf_dup (op->state->local_ibf);
756   ibf_subtract (diff_ibf, op->state->remote_ibf);
757
758   ibf_destroy (op->state->remote_ibf);
759   op->state->remote_ibf = NULL;
760
761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
762
763   num_decoded = 0;
764   last_key.key_val = 0;
765
766   while (1)
767   {
768     int res;
769     int cycle_detected = GNUNET_NO;
770
771     last_key = key;
772
773     res = ibf_decode (diff_ibf, &side, &key);
774     if (res == GNUNET_OK)
775     {
776       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
777                   key.key_val);
778       num_decoded += 1;
779       if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
780       {
781         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
782                     num_decoded, diff_ibf->size);
783         cycle_detected = GNUNET_YES;
784       }
785     }
786     if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
787     {
788       int next_order;
789       next_order = 0;
790       while (1<<next_order < diff_ibf->size)
791         next_order++;
792       next_order++;
793       if (next_order <= MAX_IBF_ORDER)
794       {
795         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
796                     "decoding failed, sending larger ibf (size %u)\n",
797                     1<<next_order);
798         send_ibf (op, next_order);
799       }
800       else
801       {
802         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
803                     "set union failed: reached ibf limit\n");
804       }
805       break;
806     }
807     if (GNUNET_NO == res)
808     {
809       struct GNUNET_MQ_Envelope *ev;
810
811       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
812       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
813       GNUNET_MQ_send (op->mq, ev);
814       break;
815     }
816     if (1 == side)
817     {
818       send_elements_for_key (op, key);
819     }
820     else if (-1 == side)
821     {
822       struct GNUNET_MQ_Envelope *ev;
823       struct GNUNET_MessageHeader *msg;
824
825       /* It may be nice to merge multiple requests, but with cadet's corking it is not worth
826        * the effort additional complexity. */
827       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
828                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
829
830       *(struct IBF_Key *) &msg[1] = key;
831       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
832       GNUNET_MQ_send (op->mq, ev);
833     }
834     else
835     {
836       GNUNET_assert (0);
837     }
838   }
839   ibf_destroy (diff_ibf);
840 }
841
842
843 /**
844  * Handle an IBF message from a remote peer.
845  *
846  * @param cls the union operation
847  * @param mh the header of the message
848  */
849 static void
850 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
851 {
852   struct Operation *op = cls;
853   struct IBFMessage *msg = (struct IBFMessage *) mh;
854   unsigned int buckets_in_message;
855
856   if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
857        (op->state->phase == PHASE_EXPECT_IBF) )
858   {
859     op->state->phase = PHASE_EXPECT_IBF_CONT;
860     GNUNET_assert (NULL == op->state->remote_ibf);
861     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
862     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
863     op->state->ibf_buckets_received = 0;
864     if (0 != ntohs (msg->offset))
865     {
866       GNUNET_break (0);
867       fail_union_operation (op);
868       return;
869     }
870   }
871   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
872   {
873     if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
874          (1<<msg->order != op->state->remote_ibf->size) )
875     {
876       GNUNET_break (0);
877       fail_union_operation (op);
878       return;
879     }
880   }
881
882   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
883
884   if (0 == buckets_in_message)
885   {
886     GNUNET_break_op (0);
887     fail_union_operation (op);
888     return;
889   }
890
891   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
892   {
893     GNUNET_break (0);
894     fail_union_operation (op);
895     return;
896   }
897
898   ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf);
899   op->state->ibf_buckets_received += buckets_in_message;
900
901   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
902   {
903     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
904     op->state->phase = PHASE_EXPECT_ELEMENTS;
905     decode_and_send (op);
906   }
907 }
908
909
910 /**
911  * Send a result message to the client indicating
912  * that there is a new element.
913  *
914  * @param op union operation
915  * @param element element to send
916  */
917 static void
918 send_client_element (struct Operation *op,
919                      struct GNUNET_SET_Element *element)
920 {
921   struct GNUNET_MQ_Envelope *ev;
922   struct GNUNET_SET_ResultMessage *rm;
923
924   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
925   GNUNET_assert (0 != op->spec->client_request_id);
926   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
927   if (NULL == ev)
928   {
929     GNUNET_MQ_discard (ev);
930     GNUNET_break (0);
931     return;
932   }
933   rm->result_status = htons (GNUNET_SET_STATUS_OK);
934   rm->request_id = htonl (op->spec->client_request_id);
935   rm->element_type = element->element_type;
936   memcpy (&rm[1], element->data, element->size);
937   GNUNET_MQ_send (op->spec->set->client_mq, ev);
938 }
939
940
941 /**
942  * Signal to the client that the operation has finished and
943  * destroy the operation.
944  *
945  * @param cls operation to destroy
946  */
947 static void
948 send_done_and_destroy (void *cls)
949 {
950   struct Operation *op = cls;
951   struct GNUNET_MQ_Envelope *ev;
952   struct GNUNET_SET_ResultMessage *rm;
953   int keep = op->keep;
954
955   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
956   rm->request_id = htonl (op->spec->client_request_id);
957   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
958   rm->element_type = htons (0);
959   GNUNET_MQ_send (op->spec->set->client_mq, ev);
960   _GSS_operation_destroy (op, GNUNET_YES);
961   if (GNUNET_YES == keep)
962     GNUNET_free (op);
963 }
964
965
966 /**
967  * Send all remaining elements in the full result iterator.
968  *
969  * @param cls operation
970  */
971 static void
972 send_remaining_elements (void *cls)
973 {
974   struct Operation *op = cls;
975   struct KeyEntry *ke;
976   int res;
977
978   res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke);
979   if (GNUNET_NO == res)
980   {
981     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
982     send_done_and_destroy (op);
983     return;
984   }
985
986   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n");
987
988   while (1)
989   {
990     struct GNUNET_MQ_Envelope *ev;
991     struct GNUNET_SET_ResultMessage *rm;
992     struct GNUNET_SET_Element *element;
993     element = &ke->element->element;
994
995     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
996     GNUNET_assert (0 != op->spec->client_request_id);
997     ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
998     if (NULL == ev)
999     {
1000       GNUNET_MQ_discard (ev);
1001       GNUNET_break (0);
1002       continue;
1003     }
1004     rm->result_status = htons (GNUNET_SET_STATUS_OK);
1005     rm->request_id = htonl (op->spec->client_request_id);
1006     rm->element_type = element->element_type;
1007     memcpy (&rm[1], element->data, element->size);
1008     if (ke->next_colliding == NULL)
1009     {
1010       GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1011       GNUNET_MQ_send (op->spec->set->client_mq, ev);
1012       break;
1013     }
1014     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1015     ke = ke->next_colliding;
1016   }
1017 }
1018
1019
1020 /**
1021  * Send a result message to the client indicating
1022  * that the operation is over.
1023  * After the result done message has been sent to the client,
1024  * destroy the evaluate operation.
1025  *
1026  * @param op union operation
1027  */
1028 static void
1029 finish_and_destroy (struct Operation *op)
1030 {
1031   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1032
1033   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1034   {
1035     /* prevent that the op is free'd by the tunnel end handler */
1036     op->keep = GNUNET_YES;
1037     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
1038     GNUNET_assert (NULL == op->state->full_result_iter);
1039     op->state->full_result_iter =
1040         GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1041     send_remaining_elements (op);
1042     return;
1043   }
1044   send_done_and_destroy (op);
1045 }
1046
1047
1048 /**
1049  * Handle an element message from a remote peer.
1050  *
1051  * @param cls the union operation
1052  * @param mh the message
1053  */
1054 static void
1055 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1056 {
1057   struct Operation *op = cls;
1058   struct ElementEntry *ee;
1059   uint16_t element_size;
1060
1061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062               "got element from peer\n");
1063
1064   if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1065        (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1066   {
1067     fail_union_operation (op);
1068     GNUNET_break (0);
1069     return;
1070   }
1071   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1072   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1073   memcpy (&ee[1], &mh[1], element_size);
1074   ee->element.size = element_size;
1075   ee->element.data = &ee[1];
1076   ee->remote = GNUNET_YES;
1077   GNUNET_CRYPTO_hash (ee->element.data,
1078                       ee->element.size,
1079                       &ee->element_hash);
1080
1081   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1082   {
1083     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1084                 "got existing element from peer\n");
1085     GNUNET_free (ee);
1086     return;
1087   }
1088
1089   op_register_element (op, ee);
1090   /* only send results immediately if the client wants it */
1091   if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1092     send_client_element (op, &ee->element);
1093 }
1094
1095
1096 /**
1097  * Handle an element request from a remote peer.
1098  *
1099  * @param cls the union operation
1100  * @param mh the message
1101  */
1102 static void
1103 handle_p2p_element_requests (void *cls,
1104                              const struct GNUNET_MessageHeader *mh)
1105 {
1106   struct Operation *op = cls;
1107   struct IBF_Key *ibf_key;
1108   unsigned int num_keys;
1109
1110   /* look up elements and send them */
1111   if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1112   {
1113     GNUNET_break (0);
1114     fail_union_operation (op);
1115     return;
1116   }
1117
1118   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1119
1120   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1121   {
1122     GNUNET_break (0);
1123     fail_union_operation (op);
1124     return;
1125   }
1126
1127   ibf_key = (struct IBF_Key *) &mh[1];
1128   while (0 != num_keys--)
1129   {
1130     send_elements_for_key (op, *ibf_key);
1131     ibf_key++;
1132   }
1133 }
1134
1135
1136 /**
1137  * Handle a done message from a remote peer
1138  *
1139  * @param cls the union operation
1140  * @param mh the message
1141  */
1142 static void
1143 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1144 {
1145   struct Operation *op = cls;
1146   struct GNUNET_MQ_Envelope *ev;
1147
1148   if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1149   {
1150     /* we got all requests, but still have to send our elements as response */
1151
1152     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1153     op->state->phase = PHASE_FINISHED;
1154     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1155     GNUNET_MQ_send (op->mq, ev);
1156     return;
1157   }
1158   if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1159   {
1160     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1161     op->state->phase = PHASE_FINISHED;
1162     finish_and_destroy (op);
1163     return;
1164   }
1165   GNUNET_break (0);
1166   fail_union_operation (op);
1167 }
1168
1169
1170 /**
1171  * Initiate operation to evaluate a set union with a remote peer.
1172  *
1173  * @param op operation to perform (to be initialized)
1174  * @param opaque_context message to be transmitted to the listener
1175  *        to convince him to accept, may be NULL
1176  */
1177 static void
1178 union_evaluate (struct Operation *op,
1179                 const struct GNUNET_MessageHeader *opaque_context)
1180 {
1181   struct GNUNET_MQ_Envelope *ev;
1182   struct OperationRequestMessage *msg;
1183
1184   op->state = GNUNET_new (struct OperationState);
1185   /* copy the current generation's strata estimator for this operation */
1186   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1187   /* we started the operation, thus we have to send the operation request */
1188   op->state->phase = PHASE_EXPECT_SE;
1189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190               "Initiating union operation evaluation\n");
1191   ev = GNUNET_MQ_msg_nested_mh (msg,
1192                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1193                                 opaque_context);
1194   if (NULL == ev)
1195   {
1196     /* the context message is too large */
1197     GNUNET_break (0);
1198     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1199     return;
1200   }
1201   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1202   msg->app_id = op->spec->app_id;
1203   msg->salt = htonl (op->spec->salt);
1204   GNUNET_MQ_send (op->mq, ev);
1205
1206   if (NULL != opaque_context)
1207     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208                 "sent op request with context message\n");
1209   else
1210     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1211                 "sent op request without context message\n");
1212 }
1213
1214
1215 /**
1216  * Accept an union operation request from a remote peer.
1217  * Only initializes the private operation state.
1218  *
1219  * @param op operation that will be accepted as a union operation
1220  */
1221 static void
1222 union_accept (struct Operation *op)
1223 {
1224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1225   op->state = GNUNET_new (struct OperationState);
1226   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1227   /* kick off the operation */
1228   send_strata_estimator (op);
1229 }
1230
1231
1232 /**
1233  * Create a new set supporting the union operation
1234  *
1235  * We maintain one strata estimator per set and then manipulate it over the
1236  * lifetime of the set, as recreating a strata estimator would be expensive.
1237  *
1238  * @return the newly created set
1239  */
1240 static struct SetState *
1241 union_set_create (void)
1242 {
1243   struct SetState *set_state;
1244
1245   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1246
1247   set_state = GNUNET_new (struct SetState);
1248   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1249                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);
1250   return set_state;
1251 }
1252
1253
1254 /**
1255  * Add the element from the given element message to the set.
1256  *
1257  * @param set_state state of the set want to add to
1258  * @param ee the element to add to the set
1259  */
1260 static void
1261 union_add (struct SetState *set_state, struct ElementEntry *ee)
1262 {
1263   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1264 }
1265
1266
1267 /**
1268  * Remove the element given in the element message from the set.
1269  * Only marks the element as removed, so that older set operations can still exchange it.
1270  *
1271  * @param set_state state of the set to remove from
1272  * @param ee set element to remove
1273  */
1274 static void
1275 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1276 {
1277   strata_estimator_remove (set_state->se, get_ibf_key (&ee->element_hash, 0));
1278 }
1279
1280
1281 /**
1282  * Destroy a set that supports the union operation.
1283  *
1284  * @param set_state the set to destroy
1285  */
1286 static void
1287 union_set_destroy (struct SetState *set_state)
1288 {
1289   if (NULL != set_state->se)
1290   {
1291     strata_estimator_destroy (set_state->se);
1292     set_state->se = NULL;
1293   }
1294   GNUNET_free (set_state);
1295 }
1296
1297
1298 /**
1299  * Dispatch messages for a union operation.
1300  *
1301  * @param op the state of the union evaluate operation
1302  * @param mh the received message
1303  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1304  *         GNUNET_OK otherwise
1305  */
1306 int
1307 union_handle_p2p_message (struct Operation *op,
1308                           const struct GNUNET_MessageHeader *mh)
1309 {
1310   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1311               ntohs (mh->type), ntohs (mh->size));
1312   switch (ntohs (mh->type))
1313   {
1314     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1315       handle_p2p_ibf (op, mh);
1316       break;
1317     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1318       handle_p2p_strata_estimator (op, mh);
1319       break;
1320     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1321       handle_p2p_elements (op, mh);
1322       break;
1323     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1324       handle_p2p_element_requests (op, mh);
1325       break;
1326     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1327       handle_p2p_done (op, mh);
1328       break;
1329     default:
1330       /* something wrong with cadet's message handlers? */
1331       GNUNET_assert (0);
1332   }
1333   return GNUNET_OK;
1334 }
1335
1336 /**
1337  * handler for peer-disconnects, notifies the client
1338  * about the aborted operation in case the op was not concluded
1339  *
1340  * @param op the destroyed operation
1341  */
1342 static void
1343 union_peer_disconnect (struct Operation *op)
1344 {
1345   if (PHASE_FINISHED != op->state->phase)
1346   {
1347     struct GNUNET_MQ_Envelope *ev;
1348     struct GNUNET_SET_ResultMessage *msg;
1349
1350     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1351     msg->request_id = htonl (op->spec->client_request_id);
1352     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1353     msg->element_type = htons (0);
1354     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1355     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1356                 "other peer disconnected prematurely\n");
1357     _GSS_operation_destroy (op, GNUNET_YES);
1358     return;
1359   }
1360   // else: the session has already been concluded
1361   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1362   if (GNUNET_NO == op->state->client_done_sent)
1363     finish_and_destroy (op);
1364 }
1365
1366
1367 /**
1368  * Get the table with implementing functions for
1369  * set union.
1370  *
1371  * @return the operation specific VTable
1372  */
1373 const struct SetVT *
1374 _GSS_union_vt ()
1375 {
1376   static const struct SetVT union_vt = {
1377     .create = &union_set_create,
1378     .msg_handler = &union_handle_p2p_message,
1379     .add = &union_add,
1380     .remove = &union_remove,
1381     .destroy_set = &union_set_destroy,
1382     .evaluate = &union_evaluate,
1383     .accept = &union_accept,
1384     .peer_disconnect = &union_peer_disconnect,
1385     .cancel = &union_op_cancel,
1386   };
1387
1388   return &union_vt;
1389 }