-rename file
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
28 #include "ibf.h"
29 #include "gnunet-service-set_union_strata_estimator.h"
30 #include "gnunet-service-set_protocol.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Number of IBFs in a strata estimator.
36  */
37 #define SE_STRATA_COUNT 32
38 /**
39  * Size of the IBFs in the strata estimator.
40  */
41 #define SE_IBF_SIZE 80
42 /**
43  * hash num parameter for the difference digests and strata estimators
44  */
45 #define SE_IBF_HASH_NUM 4
46
47 /**
48  * Number of buckets that can be transmitted in one message.
49  */
50 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
51
52 /**
53  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
54  * Choose this value so that computing the IBF is still cheaper
55  * than transmitting all values.
56  */
57 #define MAX_IBF_ORDER (16)
58
59 /**
60  * Number of buckets used in the ibf per estimated
61  * difference.
62  */
63 #define IBF_ALPHA 4
64
65
66 /**
67  * Current phase we are in for a union operation.
68  */
69 enum UnionOperationPhase
70 {
71   /**
72    * We sent the request message, and expect a strata estimator
73    */
74   PHASE_EXPECT_SE,
75
76   /**
77    * We sent the strata estimator, and expect an IBF. This phase is entered once
78    * upon initialization and later via PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
79    *
80    * After receiving the complete IBF, we enter PHASE_EXPECT_ELEMENTS
81    */
82   PHASE_EXPECT_IBF,
83
84   /**
85    * Continuation for multi part IBFs.
86    */
87   PHASE_EXPECT_IBF_CONT,
88
89   /**
90    * We are sending request and elements,
91    * and thus only expect elements from the other peer.
92    *
93    * We are currently decoding an IBF until it can no longer be decoded,
94    * we currently send requests and expect elements
95    * The remote peer is in PHASE_EXPECT_ELEMENTS_AND_REQUESTS
96    */
97   PHASE_EXPECT_ELEMENTS,
98
99   /**
100    * We are expecting elements and requests, and send
101    * requested elements back to the other peer.
102    *
103    * We are in this phase if we have SENT an IBF for the remote peer to decode.
104    * We expect requests, send elements or could receive an new IBF, which takes
105    * us via PHASE_EXPECT_IBF to phase PHASE_EXPECT_ELEMENTS
106    *
107    * The remote peer is thus in:
108    * PHASE_EXPECT_ELEMENTS
109    */
110   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
111
112   /**
113    * The protocol is over.
114    * Results may still have to be sent to the client.
115    */
116   PHASE_FINISHED
117 };
118
119
120 /**
121  * State of an evaluate operation with another peer.
122  */
123 struct OperationState
124 {
125
126   /**
127    * Copy of the set's strata estimator at the time of
128    * creation of this operation
129    */
130   struct StrataEstimator *se;
131
132   /**
133    * The ibf we currently receive
134    */
135   struct InvertibleBloomFilter *remote_ibf;
136
137   /**
138    * IBF of the set's element.
139    */
140   struct InvertibleBloomFilter *local_ibf;
141
142   /**
143    * Maps IBF-Keys (specific to the current salt) to elements.
144    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
145    * Colliding IBF-Keys are linked.
146    */
147   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
148
149   /**
150    * Iterator for sending elements on the key to element mapping to the client.
151    */
152   struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
153
154   /**
155    * Current state of the operation.
156    */
157   enum UnionOperationPhase phase;
158
159   /**
160    * Did we send the client that we are done?
161    */
162   int client_done_sent;
163
164   /**
165    * Number of ibf buckets received
166    */
167   unsigned int ibf_buckets_received;
168
169 };
170
171
172 /**
173  * The key entry is used to associate an ibf key with
174  * an element.
175  */
176 struct KeyEntry
177 {
178   /**
179    * IBF key for the entry, derived from the current salt.
180    */
181   struct IBF_Key ibf_key;
182
183   /**
184    * The actual element associated with the key.
185    */
186   struct ElementEntry *element;
187
188   /**
189    * Element that collides with this element
190    * on the ibf key. All colliding entries must have the same ibf key.
191    */
192   struct KeyEntry *next_colliding;
193 };
194
195
196 /**
197  * Used as a closure for sending elements
198  * with a specific IBF key.
199  */
200 struct SendElementClosure
201 {
202   /**
203    * The IBF key whose matching elements should be
204    * sent.
205    */
206   struct IBF_Key ibf_key;
207
208   /**
209    * Operation for which the elements
210    * should be sent.
211    */
212   struct Operation *op;
213 };
214
215
216 /**
217  * Extra state required for efficient set union.
218  */
219 struct SetState
220 {
221   /**
222    * The strata estimator is only generated once for
223    * each set.
224    * The IBF keys are derived from the element hashes with
225    * salt=0.
226    */
227   struct StrataEstimator *se;
228 };
229
230
231 /**
232  * Iterator over hash map entries.
233  *
234  * @param cls closure
235  * @param key current key code
236  * @param value value in the hash map
237  * @return #GNUNET_YES if we should continue to
238  *         iterate,
239  *         #GNUNET_NO if not.
240  */
241 static int
242 destroy_key_to_element_iter (void *cls,
243                              uint32_t key,
244                              void *value)
245 {
246   struct KeyEntry *k = value;
247   /* destroy the linked list of colliding ibf key entries */
248   while (NULL != k)
249   {
250     struct KeyEntry *k_tmp = k;
251     k = k->next_colliding;
252     if (GNUNET_YES == k_tmp->element->remote)
253     {
254       GNUNET_free (k_tmp->element);
255       k_tmp->element = NULL;
256     }
257     GNUNET_free (k_tmp);
258   }
259   return GNUNET_YES;
260 }
261
262
263 /**
264  * Destroy the union operation.  Only things specific to the union operation are destroyed.
265  *
266  * @param op union operation to destroy
267  */
268 static void
269 union_op_cancel (struct Operation *op)
270 {
271   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
272   /* check if the op was canceled twice */
273   GNUNET_assert (NULL != op->state);
274   if (NULL != op->state->remote_ibf)
275   {
276     ibf_destroy (op->state->remote_ibf);
277     op->state->remote_ibf = NULL;
278   }
279   if (NULL != op->state->local_ibf)
280   {
281     ibf_destroy (op->state->local_ibf);
282     op->state->local_ibf = NULL;
283   }
284   if (NULL != op->state->se)
285   {
286     strata_estimator_destroy (op->state->se);
287     op->state->se = NULL;
288   }
289   if (NULL != op->state->key_to_element)
290   {
291     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL);
292     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
293     op->state->key_to_element = NULL;
294   }
295   GNUNET_free (op->state);
296   op->state = NULL;
297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
298 }
299
300
301 /**
302  * Inform the client that the union operation has failed,
303  * and proceed to destroy the evaluate operation.
304  *
305  * @param op the union operation to fail
306  */
307 static void
308 fail_union_operation (struct Operation *op)
309 {
310   struct GNUNET_MQ_Envelope *ev;
311   struct GNUNET_SET_ResultMessage *msg;
312
313   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n");
314
315   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
316   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
317   msg->request_id = htonl (op->spec->client_request_id);
318   msg->element_type = htons (0);
319   GNUNET_MQ_send (op->spec->set->client_mq, ev);
320   _GSS_operation_destroy (op, GNUNET_YES);
321 }
322
323
324 /**
325  * Derive the IBF key from a hash code and
326  * a salt.
327  *
328  * @param src the hash code
329  * @param salt salt to use
330  * @return the derived IBF key
331  */
332 static struct IBF_Key
333 get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt)
334 {
335   struct IBF_Key key;
336
337   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
338                       GCRY_MD_SHA512, GCRY_MD_SHA256,
339                       src, sizeof *src,
340                       &salt, sizeof (salt),
341                       NULL, 0);
342   return key;
343 }
344
345
346 /**
347  * Iterator to create the mapping between ibf keys
348  * and element entries.
349  *
350  * @param cls closure
351  * @param key current key code
352  * @param value value in the hash map
353  * @return #GNUNET_YES if we should continue to
354  *         iterate,
355  *         #GNUNET_NO if not.
356  */
357 static int
358 op_register_element_iterator (void *cls,
359                               uint32_t key,
360                               void *value)
361 {
362   struct KeyEntry *const new_k = cls;
363   struct KeyEntry *old_k = value;
364
365   GNUNET_assert (NULL != old_k);
366   /* check if our ibf key collides with the ibf key in the existing entry */
367   if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
368   {
369     /* insert the the new key in the collision chain */
370     new_k->next_colliding = old_k->next_colliding;
371     old_k->next_colliding = new_k;
372     /* signal to the caller that we were able to insert into a colliding bucket */
373     return GNUNET_NO;
374   }
375   return GNUNET_YES;
376 }
377
378
379 /**
380  * Iterator to create the mapping between ibf keys
381  * and element entries.
382  *
383  * @param cls closure
384  * @param key current key code
385  * @param value value in the hash map
386  * @return #GNUNET_YES if we should continue to
387  *         iterate,
388  *         #GNUNET_NO if not.
389  */
390 static int
391 op_has_element_iterator (void *cls,
392                          uint32_t key,
393                          void *value)
394 {
395   struct GNUNET_HashCode *element_hash = cls;
396   struct KeyEntry *k = value;
397
398   GNUNET_assert (NULL != k);
399   while (NULL != k)
400   {
401     if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash))
402       return GNUNET_NO;
403     k = k->next_colliding;
404   }
405   return GNUNET_YES;
406 }
407
408
409 /**
410  * Determine whether the given element is already in the operation's element
411  * set.
412  *
413  * @param op operation that should be tested for 'element_hash'
414  * @param element_hash hash of the element to look for
415  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
416  */
417 static int
418 op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash)
419 {
420   int ret;
421   struct IBF_Key ibf_key;
422
423   ibf_key = get_ibf_key (element_hash, op->spec->salt);
424   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
425                                                       (uint32_t) ibf_key.key_val,
426                                                       op_has_element_iterator, (void *) element_hash);
427
428   /* was the iteration aborted because we found the element? */
429   if (GNUNET_SYSERR == ret)
430     return GNUNET_YES;
431   return GNUNET_NO;
432 }
433
434
435 /**
436  * Insert an element into the union operation's
437  * key-to-element mapping. Takes ownership of 'ee'.
438  * Note that this does not insert the element in the set,
439  * only in the operation's key-element mapping.
440  * This is done to speed up re-tried operations, if some elements
441  * were transmitted, and then the IBF fails to decode.
442  *
443  * @param op the union operation
444  * @param ee the element entry
445  */
446 static void
447 op_register_element (struct Operation *op,
448                      struct ElementEntry *ee)
449 {
450   int ret;
451   struct IBF_Key ibf_key;
452   struct KeyEntry *k;
453
454   ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
455   k = GNUNET_new (struct KeyEntry);
456   k->element = ee;
457   k->ibf_key = ibf_key;
458   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
459                                                       (uint32_t) ibf_key.key_val,
460                                                       op_register_element_iterator, k);
461
462   /* was the element inserted into a colliding bucket? */
463   if (GNUNET_SYSERR == ret)
464     return;
465
466   GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k,
467                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
468 }
469
470
471 /**
472  * Insert a key into an ibf.
473  *
474  * @param cls the ibf
475  * @param key unused
476  * @param value the key entry to get the key from
477  */
478 static int
479 prepare_ibf_iterator (void *cls,
480                       uint32_t key,
481                       void *value)
482 {
483   struct InvertibleBloomFilter *ibf = cls;
484   struct KeyEntry *ke = value;
485
486   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val);
487
488   ibf_insert (ibf, ke->ibf_key);
489   return GNUNET_YES;
490 }
491
492
493 /**
494  * Iterator for initializing the
495  * key-to-element mapping of a union operation
496  *
497  * @param cls the union operation
498  * @param key unised
499  * @param value the element entry to insert
500  *        into the key-to-element mapping
501  * @return GNUNET_YES to continue iterating,
502  *         GNUNET_NO to stop
503  */
504 static int
505 init_key_to_element_iterator (void *cls,
506                               const struct GNUNET_HashCode *key,
507                               void *value)
508 {
509   struct Operation *op = cls;
510   struct ElementEntry *e = value;
511
512   /* make sure that the element belongs to the set at the time
513    * of creating the operation */
514   if ( (e->generation_added > op->generation_created) ||
515        ( (GNUNET_YES == e->removed) &&
516          (e->generation_removed < op->generation_created)))
517     return GNUNET_YES;
518
519   GNUNET_assert (GNUNET_NO == e->remote);
520
521   op_register_element (op, e);
522   return GNUNET_YES;
523 }
524
525
526 /**
527  * Create an ibf with the operation's elements
528  * of the specified size
529  *
530  * @param op the union operation
531  * @param size size of the ibf to create
532  */
533 static void
534 prepare_ibf (struct Operation *op, uint16_t size)
535 {
536   if (NULL == op->state->key_to_element)
537   {
538     unsigned int len;
539     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
540     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
541     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
542                                            init_key_to_element_iterator, op);
543   }
544   if (NULL != op->state->local_ibf)
545     ibf_destroy (op->state->local_ibf);
546   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
547   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
548                                            prepare_ibf_iterator, op->state->local_ibf);
549 }
550
551
552 /**
553  * Send an ibf of appropriate size.
554  *
555  * @param op the union operation
556  * @param ibf_order order of the ibf to send, size=2^order
557  */
558 static void
559 send_ibf (struct Operation *op, uint16_t ibf_order)
560 {
561   unsigned int buckets_sent = 0;
562   struct InvertibleBloomFilter *ibf;
563
564   prepare_ibf (op, 1<<ibf_order);
565
566   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
567
568   ibf = op->state->local_ibf;
569
570   while (buckets_sent < (1 << ibf_order))
571   {
572     unsigned int buckets_in_message;
573     struct GNUNET_MQ_Envelope *ev;
574     struct IBFMessage *msg;
575
576     buckets_in_message = (1 << ibf_order) - buckets_sent;
577     /* limit to maximum */
578     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
579       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
580
581     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
582                                GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
583     msg->reserved = 0;
584     msg->order = ibf_order;
585     msg->offset = htons (buckets_sent);
586     ibf_write_slice (ibf, buckets_sent,
587                      buckets_in_message, &msg[1]);
588     buckets_sent += buckets_in_message;
589     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
590                 buckets_in_message, buckets_sent, 1<<ibf_order);
591     GNUNET_MQ_send (op->mq, ev);
592   }
593
594   op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
595 }
596
597
598 /**
599  * Send a strata estimator to the remote peer.
600  *
601  * @param op the union operation with the remote peer
602  */
603 static void
604 send_strata_estimator (struct Operation *op)
605 {
606   struct GNUNET_MQ_Envelope *ev;
607   struct GNUNET_MessageHeader *strata_msg;
608
609   ev = GNUNET_MQ_msg_header_extra (strata_msg,
610                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
611                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
612   strata_estimator_write (op->state->se, &strata_msg[1]);
613   GNUNET_MQ_send (op->mq, ev);
614   op->state->phase = PHASE_EXPECT_IBF;
615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
616 }
617
618
619 /**
620  * Compute the necessary order of an ibf
621  * from the size of the symmetric set difference.
622  *
623  * @param diff the difference
624  * @return the required size of the ibf
625  */
626 static unsigned int
627 get_order_from_difference (unsigned int diff)
628 {
629   unsigned int ibf_order;
630
631   ibf_order = 2;
632   while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM)
633     ibf_order++;
634   if (ibf_order > MAX_IBF_ORDER)
635     ibf_order = MAX_IBF_ORDER;
636   return ibf_order;
637 }
638
639
640 /**
641  * Handle a strata estimator from a remote peer
642  *
643  * @param cls the union operation
644  * @param mh the message
645  */
646 static void
647 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
648 {
649   struct Operation *op = cls;
650   struct StrataEstimator *remote_se;
651   int diff;
652
653   if (op->state->phase != PHASE_EXPECT_SE)
654   {
655     fail_union_operation (op);
656     GNUNET_break (0);
657     return;
658   }
659   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
660                                        SE_IBF_HASH_NUM);
661   strata_estimator_read (&mh[1], remote_se);
662   GNUNET_assert (NULL != op->state->se);
663   diff = strata_estimator_difference (remote_se, op->state->se);
664   strata_estimator_destroy (remote_se);
665   strata_estimator_destroy (op->state->se);
666   op->state->se = NULL;
667   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
668               diff, 1<<get_order_from_difference (diff));
669   send_ibf (op, get_order_from_difference (diff));
670 }
671
672
673
674 /**
675  * Iterator to send elements to a remote peer
676  *
677  * @param cls closure with the element key and the union operation
678  * @param key ignored
679  * @param value the key entry
680  */
681 static int
682 send_element_iterator (void *cls,
683                        uint32_t key,
684                        void *value)
685 {
686   struct SendElementClosure *sec = cls;
687   struct IBF_Key ibf_key = sec->ibf_key;
688   struct Operation *op = sec->op;
689   struct KeyEntry *ke = value;
690
691   if (ke->ibf_key.key_val != ibf_key.key_val)
692     return GNUNET_YES;
693   while (NULL != ke)
694   {
695     const struct GNUNET_SET_Element *const element = &ke->element->element;
696     struct GNUNET_MQ_Envelope *ev;
697     struct GNUNET_MessageHeader *mh;
698
699     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
700     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
701     if (NULL == ev)
702     {
703       /* element too large */
704       GNUNET_break (0);
705       continue;
706     }
707     memcpy (&mh[1], element->data, element->size);
708     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
709                 GNUNET_h2s (&ke->element->element_hash));
710     GNUNET_MQ_send (op->mq, ev);
711     ke = ke->next_colliding;
712   }
713   return GNUNET_NO;
714 }
715
716 /**
717  * Send all elements that have the specified IBF key
718  * to the remote peer of the union operation
719  *
720  * @param op union operation
721  * @param ibf_key IBF key of interest
722  */
723 static void
724 send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
725 {
726   struct SendElementClosure send_cls;
727
728   send_cls.ibf_key = ibf_key;
729   send_cls.op = op;
730   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
731                                                        (uint32_t) ibf_key.key_val,
732                                                        &send_element_iterator, &send_cls);
733 }
734
735
736 /**
737  * Decode which elements are missing on each side, and
738  * send the appropriate elemens and requests
739  *
740  * @param op union operation
741  */
742 static void
743 decode_and_send (struct Operation *op)
744 {
745   struct IBF_Key key;
746   struct IBF_Key last_key;
747   int side;
748   unsigned int num_decoded;
749   struct InvertibleBloomFilter *diff_ibf;
750
751   GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
752
753   prepare_ibf (op, op->state->remote_ibf->size);
754   diff_ibf = ibf_dup (op->state->local_ibf);
755   ibf_subtract (diff_ibf, op->state->remote_ibf);
756
757   ibf_destroy (op->state->remote_ibf);
758   op->state->remote_ibf = NULL;
759
760   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
761
762   num_decoded = 0;
763   last_key.key_val = 0;
764
765   while (1)
766   {
767     int res;
768     int cycle_detected = GNUNET_NO;
769
770     last_key = key;
771
772     res = ibf_decode (diff_ibf, &side, &key);
773     if (res == GNUNET_OK)
774     {
775       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
776                   key.key_val);
777       num_decoded += 1;
778       if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
779       {
780         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
781                     num_decoded, diff_ibf->size);
782         cycle_detected = GNUNET_YES;
783       }
784     }
785     if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
786     {
787       int next_order;
788       next_order = 0;
789       while (1<<next_order < diff_ibf->size)
790         next_order++;
791       next_order++;
792       if (next_order <= MAX_IBF_ORDER)
793       {
794         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
795                     "decoding failed, sending larger ibf (size %u)\n",
796                     1<<next_order);
797         send_ibf (op, next_order);
798       }
799       else
800       {
801         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
802                     "set union failed: reached ibf limit\n");
803       }
804       break;
805     }
806     if (GNUNET_NO == res)
807     {
808       struct GNUNET_MQ_Envelope *ev;
809
810       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
811       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
812       GNUNET_MQ_send (op->mq, ev);
813       break;
814     }
815     if (1 == side)
816     {
817       send_elements_for_key (op, key);
818     }
819     else if (-1 == side)
820     {
821       struct GNUNET_MQ_Envelope *ev;
822       struct GNUNET_MessageHeader *msg;
823
824       /* It may be nice to merge multiple requests, but with cadet's corking it is not worth
825        * the effort additional complexity. */
826       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
827                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
828
829       *(struct IBF_Key *) &msg[1] = key;
830       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
831       GNUNET_MQ_send (op->mq, ev);
832     }
833     else
834     {
835       GNUNET_assert (0);
836     }
837   }
838   ibf_destroy (diff_ibf);
839 }
840
841
842 /**
843  * Handle an IBF message from a remote peer.
844  *
845  * @param cls the union operation
846  * @param mh the header of the message
847  */
848 static void
849 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
850 {
851   struct Operation *op = cls;
852   struct IBFMessage *msg = (struct IBFMessage *) mh;
853   unsigned int buckets_in_message;
854
855   if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
856        (op->state->phase == PHASE_EXPECT_IBF) )
857   {
858     op->state->phase = PHASE_EXPECT_IBF_CONT;
859     GNUNET_assert (NULL == op->state->remote_ibf);
860     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
861     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
862     op->state->ibf_buckets_received = 0;
863     if (0 != ntohs (msg->offset))
864     {
865       GNUNET_break (0);
866       fail_union_operation (op);
867       return;
868     }
869   }
870   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
871   {
872     if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
873          (1<<msg->order != op->state->remote_ibf->size) )
874     {
875       GNUNET_break (0);
876       fail_union_operation (op);
877       return;
878     }
879   }
880
881   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
882
883   if (0 == buckets_in_message)
884   {
885     GNUNET_break_op (0);
886     fail_union_operation (op);
887     return;
888   }
889
890   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
891   {
892     GNUNET_break (0);
893     fail_union_operation (op);
894     return;
895   }
896
897   ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf);
898   op->state->ibf_buckets_received += buckets_in_message;
899
900   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
901   {
902     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
903     op->state->phase = PHASE_EXPECT_ELEMENTS;
904     decode_and_send (op);
905   }
906 }
907
908
909 /**
910  * Send a result message to the client indicating
911  * that there is a new element.
912  *
913  * @param op union operation
914  * @param element element to send
915  */
916 static void
917 send_client_element (struct Operation *op,
918                      struct GNUNET_SET_Element *element)
919 {
920   struct GNUNET_MQ_Envelope *ev;
921   struct GNUNET_SET_ResultMessage *rm;
922
923   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
924   GNUNET_assert (0 != op->spec->client_request_id);
925   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
926   if (NULL == ev)
927   {
928     GNUNET_MQ_discard (ev);
929     GNUNET_break (0);
930     return;
931   }
932   rm->result_status = htons (GNUNET_SET_STATUS_OK);
933   rm->request_id = htonl (op->spec->client_request_id);
934   rm->element_type = element->element_type;
935   memcpy (&rm[1], element->data, element->size);
936   GNUNET_MQ_send (op->spec->set->client_mq, ev);
937 }
938
939
940 /**
941  * Signal to the client that the operation has finished and
942  * destroy the operation.
943  *
944  * @param cls operation to destroy
945  */
946 static void
947 send_done_and_destroy (void *cls)
948 {
949   struct Operation *op = cls;
950   struct GNUNET_MQ_Envelope *ev;
951   struct GNUNET_SET_ResultMessage *rm;
952   int keep = op->keep;
953
954   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
955   rm->request_id = htonl (op->spec->client_request_id);
956   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
957   rm->element_type = htons (0);
958   GNUNET_MQ_send (op->spec->set->client_mq, ev);
959   _GSS_operation_destroy (op, GNUNET_YES);
960   if (GNUNET_YES == keep)
961     GNUNET_free (op);
962 }
963
964
965 /**
966  * Send all remaining elements in the full result iterator.
967  *
968  * @param cls operation
969  */
970 static void
971 send_remaining_elements (void *cls)
972 {
973   struct Operation *op = cls;
974   struct KeyEntry *ke;
975   int res;
976
977   res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke);
978   if (GNUNET_NO == res)
979   {
980     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
981     send_done_and_destroy (op);
982     return;
983   }
984
985   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n");
986
987   while (1)
988   {
989     struct GNUNET_MQ_Envelope *ev;
990     struct GNUNET_SET_ResultMessage *rm;
991     struct GNUNET_SET_Element *element;
992     element = &ke->element->element;
993
994     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
995     GNUNET_assert (0 != op->spec->client_request_id);
996     ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
997     if (NULL == ev)
998     {
999       GNUNET_MQ_discard (ev);
1000       GNUNET_break (0);
1001       continue;
1002     }
1003     rm->result_status = htons (GNUNET_SET_STATUS_OK);
1004     rm->request_id = htonl (op->spec->client_request_id);
1005     rm->element_type = element->element_type;
1006     memcpy (&rm[1], element->data, element->size);
1007     if (ke->next_colliding == NULL)
1008     {
1009       GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1010       GNUNET_MQ_send (op->spec->set->client_mq, ev);
1011       break;
1012     }
1013     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1014     ke = ke->next_colliding;
1015   }
1016 }
1017
1018
1019 /**
1020  * Send a result message to the client indicating
1021  * that the operation is over.
1022  * After the result done message has been sent to the client,
1023  * destroy the evaluate operation.
1024  *
1025  * @param op union operation
1026  */
1027 static void
1028 finish_and_destroy (struct Operation *op)
1029 {
1030   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1031
1032   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1033   {
1034     /* prevent that the op is free'd by the tunnel end handler */
1035     op->keep = GNUNET_YES;
1036     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
1037     GNUNET_assert (NULL == op->state->full_result_iter);
1038     op->state->full_result_iter =
1039         GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1040     send_remaining_elements (op);
1041     return;
1042   }
1043   send_done_and_destroy (op);
1044 }
1045
1046
1047 /**
1048  * Handle an element message from a remote peer.
1049  *
1050  * @param cls the union operation
1051  * @param mh the message
1052  */
1053 static void
1054 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1055 {
1056   struct Operation *op = cls;
1057   struct ElementEntry *ee;
1058   uint16_t element_size;
1059
1060   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061               "got element from peer\n");
1062
1063   if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1064        (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1065   {
1066     fail_union_operation (op);
1067     GNUNET_break (0);
1068     return;
1069   }
1070   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1071   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1072   memcpy (&ee[1], &mh[1], element_size);
1073   ee->element.size = element_size;
1074   ee->element.data = &ee[1];
1075   ee->remote = GNUNET_YES;
1076   GNUNET_CRYPTO_hash (ee->element.data,
1077                       ee->element.size,
1078                       &ee->element_hash);
1079
1080   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1081   {
1082     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083                 "got existing element from peer\n");
1084     GNUNET_free (ee);
1085     return;
1086   }
1087
1088   op_register_element (op, ee);
1089   /* only send results immediately if the client wants it */
1090   if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1091     send_client_element (op, &ee->element);
1092 }
1093
1094
1095 /**
1096  * Handle an element request from a remote peer.
1097  *
1098  * @param cls the union operation
1099  * @param mh the message
1100  */
1101 static void
1102 handle_p2p_element_requests (void *cls,
1103                              const struct GNUNET_MessageHeader *mh)
1104 {
1105   struct Operation *op = cls;
1106   struct IBF_Key *ibf_key;
1107   unsigned int num_keys;
1108
1109   /* look up elements and send them */
1110   if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1111   {
1112     GNUNET_break (0);
1113     fail_union_operation (op);
1114     return;
1115   }
1116
1117   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1118
1119   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1120   {
1121     GNUNET_break (0);
1122     fail_union_operation (op);
1123     return;
1124   }
1125
1126   ibf_key = (struct IBF_Key *) &mh[1];
1127   while (0 != num_keys--)
1128   {
1129     send_elements_for_key (op, *ibf_key);
1130     ibf_key++;
1131   }
1132 }
1133
1134
1135 /**
1136  * Handle a done message from a remote peer
1137  *
1138  * @param cls the union operation
1139  * @param mh the message
1140  */
1141 static void
1142 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1143 {
1144   struct Operation *op = cls;
1145   struct GNUNET_MQ_Envelope *ev;
1146
1147   if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1148   {
1149     /* we got all requests, but still have to send our elements as response */
1150
1151     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1152     op->state->phase = PHASE_FINISHED;
1153     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1154     GNUNET_MQ_send (op->mq, ev);
1155     return;
1156   }
1157   if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1158   {
1159     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1160     op->state->phase = PHASE_FINISHED;
1161     finish_and_destroy (op);
1162     return;
1163   }
1164   GNUNET_break (0);
1165   fail_union_operation (op);
1166 }
1167
1168
1169 /**
1170  * Initiate operation to evaluate a set union with a remote peer.
1171  *
1172  * @param op operation to perform (to be initialized)
1173  * @param opaque_context message to be transmitted to the listener
1174  *        to convince him to accept, may be NULL
1175  */
1176 static void
1177 union_evaluate (struct Operation *op,
1178                 const struct GNUNET_MessageHeader *opaque_context)
1179 {
1180   struct GNUNET_MQ_Envelope *ev;
1181   struct OperationRequestMessage *msg;
1182
1183   op->state = GNUNET_new (struct OperationState);
1184   /* copy the current generation's strata estimator for this operation */
1185   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1186   /* we started the operation, thus we have to send the operation request */
1187   op->state->phase = PHASE_EXPECT_SE;
1188   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189               "Initiating union operation evaluation\n");
1190   ev = GNUNET_MQ_msg_nested_mh (msg,
1191                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1192                                 opaque_context);
1193   if (NULL == ev)
1194   {
1195     /* the context message is too large */
1196     GNUNET_break (0);
1197     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1198     return;
1199   }
1200   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1201   msg->app_id = op->spec->app_id;
1202   msg->salt = htonl (op->spec->salt);
1203   GNUNET_MQ_send (op->mq, ev);
1204
1205   if (NULL != opaque_context)
1206     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1207                 "sent op request with context message\n");
1208   else
1209     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1210                 "sent op request without context message\n");
1211 }
1212
1213
1214 /**
1215  * Accept an union operation request from a remote peer.
1216  * Only initializes the private operation state.
1217  *
1218  * @param op operation that will be accepted as a union operation
1219  */
1220 static void
1221 union_accept (struct Operation *op)
1222 {
1223   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1224   op->state = GNUNET_new (struct OperationState);
1225   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1226   /* kick off the operation */
1227   send_strata_estimator (op);
1228 }
1229
1230
1231 /**
1232  * Create a new set supporting the union operation
1233  *
1234  * We maintain one strata estimator per set and then manipulate it over the
1235  * lifetime of the set, as recreating a strata estimator would be expensive.
1236  *
1237  * @return the newly created set
1238  */
1239 static struct SetState *
1240 union_set_create (void)
1241 {
1242   struct SetState *set_state;
1243
1244   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1245
1246   set_state = GNUNET_new (struct SetState);
1247   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1248                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);
1249   return set_state;
1250 }
1251
1252
1253 /**
1254  * Add the element from the given element message to the set.
1255  *
1256  * @param set_state state of the set want to add to
1257  * @param ee the element to add to the set
1258  */
1259 static void
1260 union_add (struct SetState *set_state, struct ElementEntry *ee)
1261 {
1262   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1263 }
1264
1265
1266 /**
1267  * Remove the element given in the element message from the set.
1268  * Only marks the element as removed, so that older set operations can still exchange it.
1269  *
1270  * @param set_state state of the set to remove from
1271  * @param ee set element to remove
1272  */
1273 static void
1274 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1275 {
1276   strata_estimator_remove (set_state->se, get_ibf_key (&ee->element_hash, 0));
1277 }
1278
1279
1280 /**
1281  * Destroy a set that supports the union operation.
1282  *
1283  * @param set_state the set to destroy
1284  */
1285 static void
1286 union_set_destroy (struct SetState *set_state)
1287 {
1288   if (NULL != set_state->se)
1289   {
1290     strata_estimator_destroy (set_state->se);
1291     set_state->se = NULL;
1292   }
1293   GNUNET_free (set_state);
1294 }
1295
1296
1297 /**
1298  * Dispatch messages for a union operation.
1299  *
1300  * @param op the state of the union evaluate operation
1301  * @param mh the received message
1302  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1303  *         GNUNET_OK otherwise
1304  */
1305 int
1306 union_handle_p2p_message (struct Operation *op,
1307                           const struct GNUNET_MessageHeader *mh)
1308 {
1309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1310               ntohs (mh->type), ntohs (mh->size));
1311   switch (ntohs (mh->type))
1312   {
1313     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1314       handle_p2p_ibf (op, mh);
1315       break;
1316     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1317       handle_p2p_strata_estimator (op, mh);
1318       break;
1319     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1320       handle_p2p_elements (op, mh);
1321       break;
1322     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1323       handle_p2p_element_requests (op, mh);
1324       break;
1325     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1326       handle_p2p_done (op, mh);
1327       break;
1328     default:
1329       /* something wrong with cadet's message handlers? */
1330       GNUNET_assert (0);
1331   }
1332   return GNUNET_OK;
1333 }
1334
1335 /**
1336  * handler for peer-disconnects, notifies the client
1337  * about the aborted operation in case the op was not concluded
1338  *
1339  * @param op the destroyed operation
1340  */
1341 static void
1342 union_peer_disconnect (struct Operation *op)
1343 {
1344   if (PHASE_FINISHED != op->state->phase)
1345   {
1346     struct GNUNET_MQ_Envelope *ev;
1347     struct GNUNET_SET_ResultMessage *msg;
1348
1349     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1350     msg->request_id = htonl (op->spec->client_request_id);
1351     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1352     msg->element_type = htons (0);
1353     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1354     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1355                 "other peer disconnected prematurely\n");
1356     _GSS_operation_destroy (op, GNUNET_YES);
1357     return;
1358   }
1359   // else: the session has already been concluded
1360   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1361   if (GNUNET_NO == op->state->client_done_sent)
1362     finish_and_destroy (op);
1363 }
1364
1365
1366 /**
1367  * Get the table with implementing functions for
1368  * set union.
1369  *
1370  * @return the operation specific VTable
1371  */
1372 const struct SetVT *
1373 _GSS_union_vt ()
1374 {
1375   static const struct SetVT union_vt = {
1376     .create = &union_set_create,
1377     .msg_handler = &union_handle_p2p_message,
1378     .add = &union_add,
1379     .remove = &union_remove,
1380     .destroy_set = &union_set_destroy,
1381     .evaluate = &union_evaluate,
1382     .accept = &union_accept,
1383     .peer_disconnect = &union_peer_disconnect,
1384     .cancel = &union_op_cancel,
1385   };
1386
1387   return &union_vt;
1388 }