-fixing misc issues and bugs, including better termination logic for intersection...
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
28 #include "ibf.h"
29 #include "gnunet-service-set_union_strata_estimator.h"
30 #include "gnunet-service-set_protocol.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Number of IBFs in a strata estimator.
36  */
37 #define SE_STRATA_COUNT 32
38 /**
39  * Size of the IBFs in the strata estimator.
40  */
41 #define SE_IBF_SIZE 80
42 /**
43  * hash num parameter for the difference digests and strata estimators
44  */
45 #define SE_IBF_HASH_NUM 4
46
47 /**
48  * Number of buckets that can be transmitted in one message.
49  */
50 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
51
52 /**
53  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
54  * Choose this value so that computing the IBF is still cheaper
55  * than transmitting all values.
56  */
57 #define MAX_IBF_ORDER (16)
58
59 /**
60  * Number of buckets used in the ibf per estimated
61  * difference.
62  */
63 #define IBF_ALPHA 4
64
65
66 /**
67  * Current phase we are in for a union operation.
68  */
69 enum UnionOperationPhase
70 {
71   /**
72    * We sent the request message, and expect a strata estimator
73    */
74   PHASE_EXPECT_SE,
75
76   /**
77    * We sent the strata estimator, and expect an IBF. This phase is entered once
78    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
79    *
80    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
81    */
82   PHASE_EXPECT_IBF,
83
84   /**
85    * Continuation for multi part IBFs.
86    */
87   PHASE_EXPECT_IBF_CONT,
88
89   /**
90    * We are sending request and elements,
91    * and thus only expect elements from the other peer.
92    *
93    * We are currently decoding an IBF until it can no longer be decoded,
94    * we currently send requests and expect elements
95    * The remote peer is in #PHASE_EXPECT_ELEMENTS_AND_REQUESTS
96    */
97   PHASE_EXPECT_ELEMENTS,
98
99   /**
100    * We are expecting elements and requests, and send
101    * requested elements back to the other peer.
102    *
103    * We are in this phase if we have SENT an IBF for the remote peer to decode.
104    * We expect requests, send elements or could receive an new IBF, which takes
105    * us via #PHASE_EXPECT_IBF to phase #PHASE_EXPECT_ELEMENTS
106    *
107    * The remote peer is thus in:
108    * #PHASE_EXPECT_ELEMENTS
109    */
110   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
111
112   /**
113    * The protocol is over.
114    * Results may still have to be sent to the client.
115    */
116   PHASE_FINISHED
117 };
118
119
120 /**
121  * State of an evaluate operation with another peer.
122  */
123 struct OperationState
124 {
125
126   /**
127    * Copy of the set's strata estimator at the time of
128    * creation of this operation
129    */
130   struct StrataEstimator *se;
131
132   /**
133    * The ibf we currently receive
134    */
135   struct InvertibleBloomFilter *remote_ibf;
136
137   /**
138    * IBF of the set's element.
139    */
140   struct InvertibleBloomFilter *local_ibf;
141
142   /**
143    * Maps IBF-Keys (specific to the current salt) to elements.
144    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
145    * Colliding IBF-Keys are linked.
146    */
147   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
148
149   /**
150    * Iterator for sending elements on the key to element mapping to the client.
151    */
152   struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
153
154   /**
155    * Current state of the operation.
156    */
157   enum UnionOperationPhase phase;
158
159   /**
160    * Did we send the client that we are done?
161    */
162   int client_done_sent;
163
164   /**
165    * Number of ibf buckets received
166    */
167   unsigned int ibf_buckets_received;
168
169 };
170
171
172 /**
173  * The key entry is used to associate an ibf key with an element.
174  */
175 struct KeyEntry
176 {
177   /**
178    * IBF key for the entry, derived from the current salt.
179    */
180   struct IBF_Key ibf_key;
181
182   /**
183    * The actual element associated with the key.
184    */
185   struct ElementEntry *element;
186
187   /**
188    * Element that collides with this element
189    * on the ibf key. All colliding entries must have the same ibf key.
190    */
191   struct KeyEntry *next_colliding;
192 };
193
194
195 /**
196  * Used as a closure for sending elements
197  * with a specific IBF key.
198  */
199 struct SendElementClosure
200 {
201   /**
202    * The IBF key whose matching elements should be
203    * sent.
204    */
205   struct IBF_Key ibf_key;
206
207   /**
208    * Operation for which the elements
209    * should be sent.
210    */
211   struct Operation *op;
212 };
213
214
215 /**
216  * Extra state required for efficient set union.
217  */
218 struct SetState
219 {
220   /**
221    * The strata estimator is only generated once for
222    * each set.
223    * The IBF keys are derived from the element hashes with
224    * salt=0.
225    */
226   struct StrataEstimator *se;
227 };
228
229
230 /**
231  * Iterator over hash map entries, called to
232  * destroy the linked list of colliding ibf key entries.
233  *
234  * @param cls closure
235  * @param key current key code
236  * @param value value in the hash map
237  * @return #GNUNET_YES if we should continue to
238  *         iterate,
239  *         #GNUNET_NO if not.
240  */
241 static int
242 destroy_key_to_element_iter (void *cls,
243                              uint32_t key,
244                              void *value)
245 {
246   struct KeyEntry *k = value;
247
248   while (NULL != k)
249   {
250     struct KeyEntry *k_tmp = k;
251     k = k->next_colliding;
252     if (GNUNET_YES == k_tmp->element->remote)
253     {
254       GNUNET_free (k_tmp->element);
255       k_tmp->element = NULL;
256     }
257     GNUNET_free (k_tmp);
258   }
259   return GNUNET_YES;
260 }
261
262
263 /**
264  * Destroy the union operation.  Only things specific to the union
265  * operation are destroyed.
266  *
267  * @param op union operation to destroy
268  */
269 static void
270 union_op_cancel (struct Operation *op)
271 {
272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
273               "destroying union op\n");
274   /* check if the op was canceled twice */
275   GNUNET_assert (NULL != op->state);
276   if (NULL != op->state->remote_ibf)
277   {
278     ibf_destroy (op->state->remote_ibf);
279     op->state->remote_ibf = NULL;
280   }
281   if (NULL != op->state->local_ibf)
282   {
283     ibf_destroy (op->state->local_ibf);
284     op->state->local_ibf = NULL;
285   }
286   if (NULL != op->state->se)
287   {
288     strata_estimator_destroy (op->state->se);
289     op->state->se = NULL;
290   }
291   if (NULL != op->state->key_to_element)
292   {
293     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
294                                              &destroy_key_to_element_iter,
295                                              NULL);
296     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
297     op->state->key_to_element = NULL;
298   }
299   GNUNET_free (op->state);
300   op->state = NULL;
301   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
302               "destroying union op done\n");
303 }
304
305
306 /**
307  * Inform the client that the union operation has failed,
308  * and proceed to destroy the evaluate operation.
309  *
310  * @param op the union operation to fail
311  */
312 static void
313 fail_union_operation (struct Operation *op)
314 {
315   struct GNUNET_MQ_Envelope *ev;
316   struct GNUNET_SET_ResultMessage *msg;
317
318   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
319               "union operation failed\n");
320   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
321   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
322   msg->request_id = htonl (op->spec->client_request_id);
323   msg->element_type = htons (0);
324   GNUNET_MQ_send (op->spec->set->client_mq, ev);
325   _GSS_operation_destroy (op, GNUNET_YES);
326 }
327
328
329 /**
330  * Derive the IBF key from a hash code and
331  * a salt.
332  *
333  * @param src the hash code
334  * @param salt salt to use
335  * @return the derived IBF key
336  */
337 static struct IBF_Key
338 get_ibf_key (const struct GNUNET_HashCode *src,
339              uint16_t salt)
340 {
341   struct IBF_Key key;
342
343   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
344                       GCRY_MD_SHA512, GCRY_MD_SHA256,
345                       src, sizeof *src,
346                       &salt, sizeof (salt),
347                       NULL, 0);
348   return key;
349 }
350
351
352 /**
353  * Iterator to create the mapping between ibf keys
354  * and element entries.
355  *
356  * @param cls closure
357  * @param key current key code
358  * @param value value in the hash map
359  * @return #GNUNET_YES if we should continue to iterate,
360  *         #GNUNET_NO if not.
361  */
362 static int
363 op_register_element_iterator (void *cls,
364                               uint32_t key,
365                               void *value)
366 {
367   struct KeyEntry *const new_k = cls;
368   struct KeyEntry *old_k = value;
369
370   GNUNET_assert (NULL != old_k);
371   /* check if our ibf key collides with the ibf key in the existing entry */
372   if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
373   {
374     /* insert the the new key in the collision chain */
375     new_k->next_colliding = old_k->next_colliding;
376     old_k->next_colliding = new_k;
377     /* signal to the caller that we were able to insert into a colliding bucket */
378     return GNUNET_NO;
379   }
380   return GNUNET_YES;
381 }
382
383
384 /**
385  * Iterator to create the mapping between ibf keys
386  * and element entries.
387  *
388  * @param cls closure
389  * @param key current key code
390  * @param value value in the hash map
391  * @return #GNUNET_YES (we should continue to iterate)
392  */
393 static int
394 op_has_element_iterator (void *cls,
395                          uint32_t key,
396                          void *value)
397 {
398   struct GNUNET_HashCode *element_hash = cls;
399   struct KeyEntry *k = value;
400
401   GNUNET_assert (NULL != k);
402   while (NULL != k)
403   {
404     if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
405                                      element_hash))
406       return GNUNET_NO;
407     k = k->next_colliding;
408   }
409   return GNUNET_YES;
410 }
411
412
413 /**
414  * Determine whether the given element is already in the operation's element
415  * set.
416  *
417  * @param op operation that should be tested for 'element_hash'
418  * @param element_hash hash of the element to look for
419  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
420  */
421 static int
422 op_has_element (struct Operation *op,
423                 const struct GNUNET_HashCode *element_hash)
424 {
425   int ret;
426   struct IBF_Key ibf_key;
427
428   ibf_key = get_ibf_key (element_hash, op->spec->salt);
429   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
430                                                       (uint32_t) ibf_key.key_val,
431                                                       op_has_element_iterator,
432                                                       (void *) element_hash);
433
434   /* was the iteration aborted because we found the element? */
435   if (GNUNET_SYSERR == ret)
436     return GNUNET_YES;
437   return GNUNET_NO;
438 }
439
440
441 /**
442  * Insert an element into the union operation's
443  * key-to-element mapping. Takes ownership of 'ee'.
444  * Note that this does not insert the element in the set,
445  * only in the operation's key-element mapping.
446  * This is done to speed up re-tried operations, if some elements
447  * were transmitted, and then the IBF fails to decode.
448  *
449  * @param op the union operation
450  * @param ee the element entry
451  */
452 static void
453 op_register_element (struct Operation *op,
454                      struct ElementEntry *ee)
455 {
456   int ret;
457   struct IBF_Key ibf_key;
458   struct KeyEntry *k;
459
460   ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
461   k = GNUNET_new (struct KeyEntry);
462   k->element = ee;
463   k->ibf_key = ibf_key;
464   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
465                                                       (uint32_t) ibf_key.key_val,
466                                                       op_register_element_iterator,
467                                                       k);
468
469   /* was the element inserted into a colliding bucket? */
470   if (GNUNET_SYSERR == ret)
471     return;
472   GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
473                                        (uint32_t) ibf_key.key_val,
474                                        k,
475                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
476 }
477
478
479 /**
480  * Insert a key into an ibf.
481  *
482  * @param cls the ibf
483  * @param key unused
484  * @param value the key entry to get the key from
485  */
486 static int
487 prepare_ibf_iterator (void *cls,
488                       uint32_t key,
489                       void *value)
490 {
491   struct InvertibleBloomFilter *ibf = cls;
492   struct KeyEntry *ke = value;
493
494   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
495               "inserting %x into ibf\n",
496               ke->ibf_key.key_val);
497   ibf_insert (ibf, ke->ibf_key);
498   return GNUNET_YES;
499 }
500
501
502 /**
503  * Iterator for initializing the
504  * key-to-element mapping of a union operation
505  *
506  * @param cls the union operation `struct Operation *`
507  * @param key unused
508  * @param value the `struct ElementEntry *` to insert
509  *        into the key-to-element mapping
510  * @return #GNUNET_YES (to continue iterating)
511  */
512 static int
513 init_key_to_element_iterator (void *cls,
514                               const struct GNUNET_HashCode *key,
515                               void *value)
516 {
517   struct Operation *op = cls;
518   struct ElementEntry *e = value;
519
520   /* make sure that the element belongs to the set at the time
521    * of creating the operation */
522   if ( (e->generation_added > op->generation_created) ||
523        ( (GNUNET_YES == e->removed) &&
524          (e->generation_removed < op->generation_created)))
525     return GNUNET_YES;
526
527   GNUNET_assert (GNUNET_NO == e->remote);
528
529   op_register_element (op, e);
530   return GNUNET_YES;
531 }
532
533
534 /**
535  * Create an ibf with the operation's elements
536  * of the specified size
537  *
538  * @param op the union operation
539  * @param size size of the ibf to create
540  */
541 static void
542 prepare_ibf (struct Operation *op,
543              uint16_t size)
544 {
545   if (NULL == op->state->key_to_element)
546   {
547     unsigned int len;
548     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
549     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
550     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
551                                            init_key_to_element_iterator, op);
552   }
553   if (NULL != op->state->local_ibf)
554     ibf_destroy (op->state->local_ibf);
555   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
556   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
557                                            &prepare_ibf_iterator,
558                                            op->state->local_ibf);
559 }
560
561
562 /**
563  * Send an ibf of appropriate size.
564  *
565  * @param op the union operation
566  * @param ibf_order order of the ibf to send, size=2^order
567  */
568 static void
569 send_ibf (struct Operation *op,
570           uint16_t ibf_order)
571 {
572   unsigned int buckets_sent = 0;
573   struct InvertibleBloomFilter *ibf;
574
575   prepare_ibf (op, 1<<ibf_order);
576
577   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578               "sending ibf of size %u\n",
579               1<<ibf_order);
580
581   ibf = op->state->local_ibf;
582
583   while (buckets_sent < (1 << ibf_order))
584   {
585     unsigned int buckets_in_message;
586     struct GNUNET_MQ_Envelope *ev;
587     struct IBFMessage *msg;
588
589     buckets_in_message = (1 << ibf_order) - buckets_sent;
590     /* limit to maximum */
591     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
592       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
593
594     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
595                                GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
596     msg->reserved = 0;
597     msg->order = ibf_order;
598     msg->offset = htons (buckets_sent);
599     ibf_write_slice (ibf, buckets_sent,
600                      buckets_in_message, &msg[1]);
601     buckets_sent += buckets_in_message;
602     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603                 "ibf chunk size %u, %u/%u sent\n",
604                 buckets_in_message,
605                 buckets_sent,
606                 1<<ibf_order);
607     GNUNET_MQ_send (op->mq, ev);
608   }
609
610   op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
611 }
612
613
614 /**
615  * Send a strata estimator to the remote peer.
616  *
617  * @param op the union operation with the remote peer
618  */
619 static void
620 send_strata_estimator (struct Operation *op)
621 {
622   struct GNUNET_MQ_Envelope *ev;
623   struct GNUNET_MessageHeader *strata_msg;
624
625   ev = GNUNET_MQ_msg_header_extra (strata_msg,
626                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
627                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
628   strata_estimator_write (op->state->se, &strata_msg[1]);
629   GNUNET_MQ_send (op->mq, ev);
630   op->state->phase = PHASE_EXPECT_IBF;
631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632               "sent SE, expecting IBF\n");
633 }
634
635
636 /**
637  * Compute the necessary order of an ibf
638  * from the size of the symmetric set difference.
639  *
640  * @param diff the difference
641  * @return the required size of the ibf
642  */
643 static unsigned int
644 get_order_from_difference (unsigned int diff)
645 {
646   unsigned int ibf_order;
647
648   ibf_order = 2;
649   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
650           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
651     ibf_order++;
652   if (ibf_order > MAX_IBF_ORDER)
653     ibf_order = MAX_IBF_ORDER;
654   return ibf_order;
655 }
656
657
658 /**
659  * Handle a strata estimator from a remote peer
660  *
661  * @param cls the union operation
662  * @param mh the message
663  */
664 static void
665 handle_p2p_strata_estimator (void *cls,
666                              const struct GNUNET_MessageHeader *mh)
667 {
668   struct Operation *op = cls;
669   struct StrataEstimator *remote_se;
670   int diff;
671
672   if (op->state->phase != PHASE_EXPECT_SE)
673   {
674     fail_union_operation (op);
675     GNUNET_break (0);
676     return;
677   }
678   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
679                                        SE_IBF_HASH_NUM);
680   strata_estimator_read (&mh[1], remote_se);
681   GNUNET_assert (NULL != op->state->se);
682   diff = strata_estimator_difference (remote_se, op->state->se);
683   strata_estimator_destroy (remote_se);
684   strata_estimator_destroy (op->state->se);
685   op->state->se = NULL;
686   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687               "got se diff=%d, using ibf size %d\n",
688               diff,
689               1<<get_order_from_difference (diff));
690   send_ibf (op,
691             get_order_from_difference (diff));
692 }
693
694
695
696 /**
697  * Iterator to send elements to a remote peer
698  *
699  * @param cls closure with the element key and the union operation
700  * @param key ignored
701  * @param value the key entry
702  */
703 static int
704 send_element_iterator (void *cls,
705                        uint32_t key,
706                        void *value)
707 {
708   struct SendElementClosure *sec = cls;
709   struct IBF_Key ibf_key = sec->ibf_key;
710   struct Operation *op = sec->op;
711   struct KeyEntry *ke = value;
712
713   if (ke->ibf_key.key_val != ibf_key.key_val)
714     return GNUNET_YES;
715   while (NULL != ke)
716   {
717     const struct GNUNET_SET_Element *const element = &ke->element->element;
718     struct GNUNET_MQ_Envelope *ev;
719     struct GNUNET_MessageHeader *mh;
720
721     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
722     ev = GNUNET_MQ_msg_header_extra (mh,
723                                      element->size,
724                                      GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
725     if (NULL == ev)
726     {
727       /* element too large */
728       GNUNET_break (0);
729       continue;
730     }
731     memcpy (&mh[1],
732             element->data,
733             element->size);
734     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735                 "sending element (%s) to peer\n",
736                 GNUNET_h2s (&ke->element->element_hash));
737     GNUNET_MQ_send (op->mq, ev);
738     ke = ke->next_colliding;
739   }
740   return GNUNET_NO;
741 }
742
743
744 /**
745  * Send all elements that have the specified IBF key
746  * to the remote peer of the union operation
747  *
748  * @param op union operation
749  * @param ibf_key IBF key of interest
750  */
751 static void
752 send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
753 {
754   struct SendElementClosure send_cls;
755
756   send_cls.ibf_key = ibf_key;
757   send_cls.op = op;
758   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
759                                                        (uint32_t) ibf_key.key_val,
760                                                        &send_element_iterator, &send_cls);
761 }
762
763
764 /**
765  * Decode which elements are missing on each side, and
766  * send the appropriate elemens and requests
767  *
768  * @param op union operation
769  */
770 static void
771 decode_and_send (struct Operation *op)
772 {
773   struct IBF_Key key;
774   struct IBF_Key last_key;
775   int side;
776   unsigned int num_decoded;
777   struct InvertibleBloomFilter *diff_ibf;
778
779   GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
780
781   prepare_ibf (op, op->state->remote_ibf->size);
782   diff_ibf = ibf_dup (op->state->local_ibf);
783   ibf_subtract (diff_ibf, op->state->remote_ibf);
784
785   ibf_destroy (op->state->remote_ibf);
786   op->state->remote_ibf = NULL;
787
788   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
789               "decoding IBF (size=%u)\n",
790               diff_ibf->size);
791
792   num_decoded = 0;
793   last_key.key_val = 0;
794
795   while (1)
796   {
797     int res;
798     int cycle_detected = GNUNET_NO;
799
800     last_key = key;
801
802     res = ibf_decode (diff_ibf, &side, &key);
803     if (res == GNUNET_OK)
804     {
805       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806                   "decoded ibf key %lx\n",
807                   key.key_val);
808       num_decoded += 1;
809       if ( (num_decoded > diff_ibf->size) ||
810            (num_decoded > 1 && last_key.key_val == key.key_val) )
811       {
812         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813                     "detected cyclic ibf (decoded %u/%u)\n",
814                     num_decoded,
815                     diff_ibf->size);
816         cycle_detected = GNUNET_YES;
817       }
818     }
819     if ( (GNUNET_SYSERR == res) ||
820          (GNUNET_YES == cycle_detected) )
821     {
822       int next_order;
823       next_order = 0;
824       while (1<<next_order < diff_ibf->size)
825         next_order++;
826       next_order++;
827       if (next_order <= MAX_IBF_ORDER)
828       {
829         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
830                     "decoding failed, sending larger ibf (size %u)\n",
831                     1<<next_order);
832         send_ibf (op, next_order);
833       }
834       else
835       {
836         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
837                     "set union failed: reached ibf limit\n");
838       }
839       break;
840     }
841     if (GNUNET_NO == res)
842     {
843       struct GNUNET_MQ_Envelope *ev;
844
845       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
846                   "transmitted all values, sending DONE\n");
847       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
848       GNUNET_MQ_send (op->mq, ev);
849       break;
850     }
851     if (1 == side)
852     {
853       send_elements_for_key (op, key);
854     }
855     else if (-1 == side)
856     {
857       struct GNUNET_MQ_Envelope *ev;
858       struct GNUNET_MessageHeader *msg;
859
860       /* It may be nice to merge multiple requests, but with cadet's corking it is not worth
861        * the effort additional complexity. */
862       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
863                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
864
865       *(struct IBF_Key *) &msg[1] = key;
866       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867                   "sending element request\n");
868       GNUNET_MQ_send (op->mq, ev);
869     }
870     else
871     {
872       GNUNET_assert (0);
873     }
874   }
875   ibf_destroy (diff_ibf);
876 }
877
878
879 /**
880  * Handle an IBF message from a remote peer.
881  *
882  * @param cls the union operation
883  * @param mh the header of the message
884  */
885 static void
886 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
887 {
888   struct Operation *op = cls;
889   struct IBFMessage *msg = (struct IBFMessage *) mh;
890   unsigned int buckets_in_message;
891
892   if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
893        (op->state->phase == PHASE_EXPECT_IBF) )
894   {
895     op->state->phase = PHASE_EXPECT_IBF_CONT;
896     GNUNET_assert (NULL == op->state->remote_ibf);
897     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
898                 "creating new ibf of size %u\n",
899                 1<<msg->order);
900     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
901     op->state->ibf_buckets_received = 0;
902     if (0 != ntohs (msg->offset))
903     {
904       GNUNET_break (0);
905       fail_union_operation (op);
906       return;
907     }
908   }
909   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
910   {
911     if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
912          (1<<msg->order != op->state->remote_ibf->size) )
913     {
914       GNUNET_break (0);
915       fail_union_operation (op);
916       return;
917     }
918   }
919
920   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
921
922   if (0 == buckets_in_message)
923   {
924     GNUNET_break_op (0);
925     fail_union_operation (op);
926     return;
927   }
928
929   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
930   {
931     GNUNET_break (0);
932     fail_union_operation (op);
933     return;
934   }
935
936   ibf_read_slice (&msg[1],
937                   op->state->ibf_buckets_received,
938                   buckets_in_message,
939                   op->state->remote_ibf);
940   op->state->ibf_buckets_received += buckets_in_message;
941
942   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
943   {
944     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
945                 "received full ibf\n");
946     op->state->phase = PHASE_EXPECT_ELEMENTS;
947     decode_and_send (op);
948   }
949 }
950
951
952 /**
953  * Send a result message to the client indicating
954  * that there is a new element.
955  *
956  * @param op union operation
957  * @param element element to send
958  */
959 static void
960 send_client_element (struct Operation *op,
961                      struct GNUNET_SET_Element *element)
962 {
963   struct GNUNET_MQ_Envelope *ev;
964   struct GNUNET_SET_ResultMessage *rm;
965
966   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
967               "sending element (size %u) to client\n",
968               element->size);
969   GNUNET_assert (0 != op->spec->client_request_id);
970   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
971   if (NULL == ev)
972   {
973     GNUNET_MQ_discard (ev);
974     GNUNET_break (0);
975     return;
976   }
977   rm->result_status = htons (GNUNET_SET_STATUS_OK);
978   rm->request_id = htonl (op->spec->client_request_id);
979   rm->element_type = element->element_type;
980   memcpy (&rm[1], element->data, element->size);
981   GNUNET_MQ_send (op->spec->set->client_mq, ev);
982 }
983
984
985 /**
986  * Signal to the client that the operation has finished and
987  * destroy the operation.
988  *
989  * @param cls operation to destroy
990  */
991 static void
992 send_done_and_destroy (void *cls)
993 {
994   struct Operation *op = cls;
995   struct GNUNET_MQ_Envelope *ev;
996   struct GNUNET_SET_ResultMessage *rm;
997   int keep = op->keep;
998
999   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1000   rm->request_id = htonl (op->spec->client_request_id);
1001   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1002   rm->element_type = htons (0);
1003   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1004   _GSS_operation_destroy (op, GNUNET_YES);
1005   if (GNUNET_YES == keep)
1006     GNUNET_free (op);
1007 }
1008
1009
1010 /**
1011  * Send all remaining elements in the full result iterator.
1012  *
1013  * @param cls operation
1014  */
1015 static void
1016 send_remaining_elements (void *cls)
1017 {
1018   struct Operation *op = cls;
1019   struct KeyEntry *ke;
1020   int res;
1021
1022   res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter,
1023                                                        NULL,
1024                                                        (const void **) &ke);
1025   if (GNUNET_NO == res)
1026   {
1027     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1028                 "sending done and destroy because iterator ran out\n");
1029     send_done_and_destroy (op);
1030     return;
1031   }
1032   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033               "sending elements from key entry\n");
1034   while (1)
1035   {
1036     struct GNUNET_MQ_Envelope *ev;
1037     struct GNUNET_SET_ResultMessage *rm;
1038     struct GNUNET_SET_Element *element;
1039     element = &ke->element->element;
1040
1041     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1042                 "sending element (size %u) to client (full set)\n",
1043                 element->size);
1044     GNUNET_assert (0 != op->spec->client_request_id);
1045     ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1046     if (NULL == ev)
1047     {
1048       GNUNET_MQ_discard (ev);
1049       GNUNET_break (0);
1050       continue;
1051     }
1052     rm->result_status = htons (GNUNET_SET_STATUS_OK);
1053     rm->request_id = htonl (op->spec->client_request_id);
1054     rm->element_type = element->element_type;
1055     memcpy (&rm[1], element->data, element->size);
1056     if (NULL == ke->next_colliding)
1057     {
1058       GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1059       GNUNET_MQ_send (op->spec->set->client_mq, ev);
1060       break;
1061     }
1062     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1063     ke = ke->next_colliding;
1064   }
1065 }
1066
1067
1068 /**
1069  * Send a result message to the client indicating
1070  * that the operation is over.
1071  * After the result done message has been sent to the client,
1072  * destroy the evaluate operation.
1073  *
1074  * @param op union operation
1075  */
1076 static void
1077 finish_and_destroy (struct Operation *op)
1078 {
1079   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1080
1081   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1082   {
1083     /* prevent that the op is free'd by the tunnel end handler */
1084     op->keep = GNUNET_YES;
1085     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1086                 "sending full result set\n");
1087     GNUNET_assert (NULL == op->state->full_result_iter);
1088     op->state->full_result_iter =
1089         GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1090     send_remaining_elements (op);
1091     return;
1092   }
1093   send_done_and_destroy (op);
1094 }
1095
1096
1097 /**
1098  * Handle an element message from a remote peer.
1099  *
1100  * @param cls the union operation
1101  * @param mh the message
1102  */
1103 static void
1104 handle_p2p_elements (void *cls,
1105                      const struct GNUNET_MessageHeader *mh)
1106 {
1107   struct Operation *op = cls;
1108   struct ElementEntry *ee;
1109   uint16_t element_size;
1110
1111   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112               "got element from peer\n");
1113
1114   if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1115        (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1116   {
1117     fail_union_operation (op);
1118     GNUNET_break (0);
1119     return;
1120   }
1121   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1122   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1123   memcpy (&ee[1], &mh[1], element_size);
1124   ee->element.size = element_size;
1125   ee->element.data = &ee[1];
1126   ee->remote = GNUNET_YES;
1127   GNUNET_CRYPTO_hash (ee->element.data,
1128                       ee->element.size,
1129                       &ee->element_hash);
1130
1131   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1132   {
1133     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1134                 "got existing element from peer\n");
1135     GNUNET_free (ee);
1136     return;
1137   }
1138
1139   op_register_element (op, ee);
1140   /* only send results immediately if the client wants it */
1141   if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1142     send_client_element (op, &ee->element);
1143 }
1144
1145
1146 /**
1147  * Handle an element request from a remote peer.
1148  *
1149  * @param cls the union operation
1150  * @param mh the message
1151  */
1152 static void
1153 handle_p2p_element_requests (void *cls,
1154                              const struct GNUNET_MessageHeader *mh)
1155 {
1156   struct Operation *op = cls;
1157   struct IBF_Key *ibf_key;
1158   unsigned int num_keys;
1159
1160   /* look up elements and send them */
1161   if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1162   {
1163     GNUNET_break (0);
1164     fail_union_operation (op);
1165     return;
1166   }
1167
1168   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1169
1170   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1171   {
1172     GNUNET_break (0);
1173     fail_union_operation (op);
1174     return;
1175   }
1176
1177   ibf_key = (struct IBF_Key *) &mh[1];
1178   while (0 != num_keys--)
1179   {
1180     send_elements_for_key (op, *ibf_key);
1181     ibf_key++;
1182   }
1183 }
1184
1185
1186 /**
1187  * Handle a done message from a remote peer
1188  *
1189  * @param cls the union operation
1190  * @param mh the message
1191  */
1192 static void
1193 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1194 {
1195   struct Operation *op = cls;
1196   struct GNUNET_MQ_Envelope *ev;
1197
1198   if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1199   {
1200     /* we got all requests, but still have to send our elements as response */
1201
1202     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1203                 "got DONE, sending final DONE after elements\n");
1204     op->state->phase = PHASE_FINISHED;
1205     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1206     GNUNET_MQ_send (op->mq, ev);
1207     return;
1208   }
1209   if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1210   {
1211     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212                 "got final DONE\n");
1213     op->state->phase = PHASE_FINISHED;
1214     finish_and_destroy (op);
1215     return;
1216   }
1217   GNUNET_break (0);
1218   fail_union_operation (op);
1219 }
1220
1221
1222 /**
1223  * Initiate operation to evaluate a set union with a remote peer.
1224  *
1225  * @param op operation to perform (to be initialized)
1226  * @param opaque_context message to be transmitted to the listener
1227  *        to convince him to accept, may be NULL
1228  */
1229 static void
1230 union_evaluate (struct Operation *op,
1231                 const struct GNUNET_MessageHeader *opaque_context)
1232 {
1233   struct GNUNET_MQ_Envelope *ev;
1234   struct OperationRequestMessage *msg;
1235
1236   op->state = GNUNET_new (struct OperationState);
1237   /* copy the current generation's strata estimator for this operation */
1238   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1239   /* we started the operation, thus we have to send the operation request */
1240   op->state->phase = PHASE_EXPECT_SE;
1241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1242               "Initiating union operation evaluation\n");
1243   ev = GNUNET_MQ_msg_nested_mh (msg,
1244                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1245                                 opaque_context);
1246   if (NULL == ev)
1247   {
1248     /* the context message is too large */
1249     GNUNET_break (0);
1250     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1251     return;
1252   }
1253   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1254   msg->app_id = op->spec->app_id;
1255   GNUNET_MQ_send (op->mq, ev);
1256
1257   if (NULL != opaque_context)
1258     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259                 "sent op request with context message\n");
1260   else
1261     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262                 "sent op request without context message\n");
1263 }
1264
1265
1266 /**
1267  * Accept an union operation request from a remote peer.
1268  * Only initializes the private operation state.
1269  *
1270  * @param op operation that will be accepted as a union operation
1271  */
1272 static void
1273 union_accept (struct Operation *op)
1274 {
1275   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1276               "accepting set union operation\n");
1277   op->state = GNUNET_new (struct OperationState);
1278   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1279   /* kick off the operation */
1280   send_strata_estimator (op);
1281 }
1282
1283
1284 /**
1285  * Create a new set supporting the union operation
1286  *
1287  * We maintain one strata estimator per set and then manipulate it over the
1288  * lifetime of the set, as recreating a strata estimator would be expensive.
1289  *
1290  * @return the newly created set
1291  */
1292 static struct SetState *
1293 union_set_create (void)
1294 {
1295   struct SetState *set_state;
1296
1297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1298               "union set created\n");
1299   set_state = GNUNET_new (struct SetState);
1300   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1301                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
1302   return set_state;
1303 }
1304
1305
1306 /**
1307  * Add the element from the given element message to the set.
1308  *
1309  * @param set_state state of the set want to add to
1310  * @param ee the element to add to the set
1311  */
1312 static void
1313 union_add (struct SetState *set_state, struct ElementEntry *ee)
1314 {
1315   strata_estimator_insert (set_state->se,
1316                            get_ibf_key (&ee->element_hash, 0));
1317 }
1318
1319
1320 /**
1321  * Remove the element given in the element message from the set.
1322  * Only marks the element as removed, so that older set operations can still exchange it.
1323  *
1324  * @param set_state state of the set to remove from
1325  * @param ee set element to remove
1326  */
1327 static void
1328 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1329 {
1330   strata_estimator_remove (set_state->se,
1331                            get_ibf_key (&ee->element_hash, 0));
1332 }
1333
1334
1335 /**
1336  * Destroy a set that supports the union operation.
1337  *
1338  * @param set_state the set to destroy
1339  */
1340 static void
1341 union_set_destroy (struct SetState *set_state)
1342 {
1343   if (NULL != set_state->se)
1344   {
1345     strata_estimator_destroy (set_state->se);
1346     set_state->se = NULL;
1347   }
1348   GNUNET_free (set_state);
1349 }
1350
1351
1352 /**
1353  * Dispatch messages for a union operation.
1354  *
1355  * @param op the state of the union evaluate operation
1356  * @param mh the received message
1357  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1358  *         GNUNET_OK otherwise
1359  */
1360 int
1361 union_handle_p2p_message (struct Operation *op,
1362                           const struct GNUNET_MessageHeader *mh)
1363 {
1364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1365               "received p2p message (t: %u, s: %u)\n",
1366               ntohs (mh->type),
1367               ntohs (mh->size));
1368   switch (ntohs (mh->type))
1369   {
1370     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1371       handle_p2p_ibf (op, mh);
1372       break;
1373     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1374       handle_p2p_strata_estimator (op, mh);
1375       break;
1376     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1377       handle_p2p_elements (op, mh);
1378       break;
1379     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1380       handle_p2p_element_requests (op, mh);
1381       break;
1382     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1383       handle_p2p_done (op, mh);
1384       break;
1385     default:
1386       /* something wrong with cadet's message handlers? */
1387       GNUNET_assert (0);
1388   }
1389   return GNUNET_OK;
1390 }
1391
1392 /**
1393  * handler for peer-disconnects, notifies the client
1394  * about the aborted operation in case the op was not concluded
1395  *
1396  * @param op the destroyed operation
1397  */
1398 static void
1399 union_peer_disconnect (struct Operation *op)
1400 {
1401   if (PHASE_FINISHED != op->state->phase)
1402   {
1403     struct GNUNET_MQ_Envelope *ev;
1404     struct GNUNET_SET_ResultMessage *msg;
1405
1406     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1407     msg->request_id = htonl (op->spec->client_request_id);
1408     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1409     msg->element_type = htons (0);
1410     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1411     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1412                 "other peer disconnected prematurely\n");
1413     _GSS_operation_destroy (op, GNUNET_YES);
1414     return;
1415   }
1416   // else: the session has already been concluded
1417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1418               "other peer disconnected (finished)\n");
1419   if (GNUNET_NO == op->state->client_done_sent)
1420     finish_and_destroy (op);
1421 }
1422
1423
1424 /**
1425  * Get the table with implementing functions for
1426  * set union.
1427  *
1428  * @return the operation specific VTable
1429  */
1430 const struct SetVT *
1431 _GSS_union_vt ()
1432 {
1433   static const struct SetVT union_vt = {
1434     .create = &union_set_create,
1435     .msg_handler = &union_handle_p2p_message,
1436     .add = &union_add,
1437     .remove = &union_remove,
1438     .destroy_set = &union_set_destroy,
1439     .evaluate = &union_evaluate,
1440     .accept = &union_accept,
1441     .peer_disconnect = &union_peer_disconnect,
1442     .cancel = &union_op_cancel,
1443   };
1444
1445   return &union_vt;
1446 }