More statistics.
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2015 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_statistics_service.h"
28 #include "gnunet-service-set.h"
29 #include "ibf.h"
30 #include "gnunet-service-set_union_strata_estimator.h"
31 #include "gnunet-service-set_protocol.h"
32 #include <gcrypt.h>
33
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
36
37
38 /**
39  * Number of IBFs in a strata estimator.
40  */
41 #define SE_STRATA_COUNT 32
42
43 /**
44  * Size of the IBFs in the strata estimator.
45  */
46 #define SE_IBF_SIZE 80
47
48 /**
49  * The hash num parameter for the difference digests and strata estimators.
50  */
51 #define SE_IBF_HASH_NUM 4
52
53 /**
54  * Number of buckets that can be transmitted in one message.
55  */
56 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
57
58 /**
59  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60  * Choose this value so that computing the IBF is still cheaper
61  * than transmitting all values.
62  */
63 #define MAX_IBF_ORDER (16)
64
65 /**
66  * Number of buckets used in the ibf per estimated
67  * difference.
68  */
69 #define IBF_ALPHA 4
70
71
72 /**
73  * Current phase we are in for a union operation.
74  */
75 enum UnionOperationPhase
76 {
77   /**
78    * We sent the request message, and expect a strata estimator.
79    */
80   PHASE_EXPECT_SE,
81
82   /**
83    * We sent the strata estimator, and expect an IBF. This phase is entered once
84    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
85    *
86    * XXX: could use better wording.
87    *
88    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
89    */
90   PHASE_EXPECT_IBF,
91
92   /**
93    * Continuation for multi part IBFs.
94    */
95   PHASE_EXPECT_IBF_CONT,
96
97   /**
98    * We are decoding an IBF.
99    */
100   PHASE_INVENTORY_ACTIVE,
101
102   /**
103    * The other peer is decoding the IBF we just sent.
104    */
105   PHASE_INVENTORY_PASSIVE,
106
107   /**
108    * The protocol is almost finished, but we still have to flush our message
109    * queue and/or expect some elements.
110    */
111   PHASE_FINISH_CLOSING,
112
113   /**
114    * In the penultimate phase,
115    * we wait until all our demands
116    * are satisfied.  Then we send a done
117    * message, and wait for another done message.*/
118   PHASE_FINISH_WAITING,
119
120   /**
121    * In the ultimate phase, we wait until
122    * our demands are satisfied and then
123    * quit (sending another DONE message). */
124   PHASE_DONE
125 };
126
127
128 /**
129  * State of an evaluate operation with another peer.
130  */
131 struct OperationState
132 {
133   /**
134    * Copy of the set's strata estimator at the time of
135    * creation of this operation.
136    */
137   struct StrataEstimator *se;
138
139   /**
140    * The IBF we currently receive.
141    */
142   struct InvertibleBloomFilter *remote_ibf;
143
144   /**
145    * The IBF with the local set's element.
146    */
147   struct InvertibleBloomFilter *local_ibf;
148
149   /**
150    * Maps IBF-Keys (specific to the current salt) to elements.
151    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
152    * Colliding IBF-Keys are linked.
153    */
154   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
155
156   /**
157    * Current state of the operation.
158    */
159   enum UnionOperationPhase phase;
160
161   /**
162    * Did we send the client that we are done?
163    */
164   int client_done_sent;
165
166   /**
167    * Number of ibf buckets already received into the @a remote_ibf.
168    */
169   unsigned int ibf_buckets_received;
170
171   /**
172    * Hashes for elements that we have demanded from the other peer.
173    */
174   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
175 };
176
177
178 /**
179  * The key entry is used to associate an ibf key with an element.
180  */
181 struct KeyEntry
182 {
183   /**
184    * IBF key for the entry, derived from the current salt.
185    */
186   struct IBF_Key ibf_key;
187
188   /**
189    * The actual element associated with the key.
190    *
191    * Only owned by the union operation if element->operation
192    * is #GNUNET_YES.
193    */
194   struct ElementEntry *element;
195 };
196
197
198 /**
199  * Used as a closure for sending elements
200  * with a specific IBF key.
201  */
202 struct SendElementClosure
203 {
204   /**
205    * The IBF key whose matching elements should be
206    * sent.
207    */
208   struct IBF_Key ibf_key;
209
210   /**
211    * Operation for which the elements
212    * should be sent.
213    */
214   struct Operation *op;
215 };
216
217
218 /**
219  * Extra state required for efficient set union.
220  */
221 struct SetState
222 {
223   /**
224    * The strata estimator is only generated once for
225    * each set.
226    * The IBF keys are derived from the element hashes with
227    * salt=0.
228    */
229   struct StrataEstimator *se;
230 };
231
232
233 /**
234  * Iterator over hash map entries, called to
235  * destroy the linked list of colliding ibf key entries.
236  *
237  * @param cls closure
238  * @param key current key code
239  * @param value value in the hash map
240  * @return #GNUNET_YES if we should continue to iterate,
241  *         #GNUNET_NO if not.
242  */
243 static int
244 destroy_key_to_element_iter (void *cls,
245                              uint32_t key,
246                              void *value)
247 {
248   struct KeyEntry *k = value;
249
250   GNUNET_assert (NULL != k);
251   if (GNUNET_YES == k->element->remote)
252   {
253     GNUNET_free (k->element);
254     k->element = NULL;
255   }
256   GNUNET_free (k);
257   return GNUNET_YES;
258 }
259
260
261 /**
262  * Destroy the union operation.  Only things specific to the union
263  * operation are destroyed.
264  *
265  * @param op union operation to destroy
266  */
267 static void
268 union_op_cancel (struct Operation *op)
269 {
270   LOG (GNUNET_ERROR_TYPE_DEBUG,
271        "destroying union op\n");
272   /* check if the op was canceled twice */
273   GNUNET_assert (NULL != op->state);
274   if (NULL != op->state->remote_ibf)
275   {
276     ibf_destroy (op->state->remote_ibf);
277     op->state->remote_ibf = NULL;
278   }
279   if (NULL != op->state->demanded_hashes)
280   {
281     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
282     op->state->demanded_hashes = NULL;
283   }
284   if (NULL != op->state->local_ibf)
285   {
286     ibf_destroy (op->state->local_ibf);
287     op->state->local_ibf = NULL;
288   }
289   if (NULL != op->state->se)
290   {
291     strata_estimator_destroy (op->state->se);
292     op->state->se = NULL;
293   }
294   if (NULL != op->state->key_to_element)
295   {
296     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
297                                              &destroy_key_to_element_iter,
298                                              NULL);
299     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
300     op->state->key_to_element = NULL;
301   }
302   GNUNET_free (op->state);
303   op->state = NULL;
304   LOG (GNUNET_ERROR_TYPE_DEBUG,
305        "destroying union op done\n");
306 }
307
308
309 /**
310  * Inform the client that the union operation has failed,
311  * and proceed to destroy the evaluate operation.
312  *
313  * @param op the union operation to fail
314  */
315 static void
316 fail_union_operation (struct Operation *op)
317 {
318   struct GNUNET_MQ_Envelope *ev;
319   struct GNUNET_SET_ResultMessage *msg;
320
321   LOG (GNUNET_ERROR_TYPE_ERROR,
322        "union operation failed\n");
323   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
324   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
325   msg->request_id = htonl (op->spec->client_request_id);
326   msg->element_type = htons (0);
327   GNUNET_MQ_send (op->spec->set->client_mq, ev);
328   _GSS_operation_destroy (op, GNUNET_YES);
329 }
330
331
332 /**
333  * Derive the IBF key from a hash code and
334  * a salt.
335  *
336  * @param src the hash code
337  * @param salt salt to use
338  * @return the derived IBF key
339  */
340 static struct IBF_Key
341 get_ibf_key (const struct GNUNET_HashCode *src,
342              uint16_t salt)
343 {
344   struct IBF_Key key;
345
346   /* FIXME: Ensure that the salt is handled correctly.
347      This is a quick fix so that consensus works for now. */
348   salt = 0;
349
350   GNUNET_CRYPTO_kdf (&key, sizeof (key),
351                      src, sizeof *src,
352                      &salt, sizeof (salt),
353                      NULL, 0);
354   return key;
355 }
356
357
358 /**
359  * Iterator over the mapping from IBF keys to element entries.  Checks if we
360  * have an element with a given GNUNET_HashCode.
361  *
362  * @param cls closure
363  * @param key current key code
364  * @param value value in the hash map
365  * @return #GNUNET_YES if we should search further,
366  *         #GNUNET_NO if we've found the element.
367  */
368 static int
369 op_has_element_iterator (void *cls,
370                          uint32_t key,
371                          void *value)
372 {
373   struct GNUNET_HashCode *element_hash = cls;
374   struct KeyEntry *k = value;
375
376   GNUNET_assert (NULL != k);
377   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
378                                    element_hash))
379     return GNUNET_NO;
380   return GNUNET_YES;
381 }
382
383
384 /**
385  * Determine whether the given element is already in the operation's element
386  * set.
387  *
388  * @param op operation that should be tested for 'element_hash'
389  * @param element_hash hash of the element to look for
390  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
391  */
392 static int
393 op_has_element (struct Operation *op,
394                 const struct GNUNET_HashCode *element_hash)
395 {
396   int ret;
397   struct IBF_Key ibf_key;
398
399   ibf_key = get_ibf_key (element_hash, op->spec->salt);
400   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
401                                                       (uint32_t) ibf_key.key_val,
402                                                       op_has_element_iterator,
403                                                       (void *) element_hash);
404
405   /* was the iteration aborted because we found the element? */
406   if (GNUNET_SYSERR == ret)
407     return GNUNET_YES;
408   return GNUNET_NO;
409 }
410
411
412 /**
413  * Insert an element into the union operation's
414  * key-to-element mapping. Takes ownership of 'ee'.
415  * Note that this does not insert the element in the set,
416  * only in the operation's key-element mapping.
417  * This is done to speed up re-tried operations, if some elements
418  * were transmitted, and then the IBF fails to decode.
419  *
420  * XXX: clarify ownership, doesn't sound right.
421  *
422  * @param op the union operation
423  * @param ee the element entry
424  */
425 static void
426 op_register_element (struct Operation *op,
427                      struct ElementEntry *ee)
428 {
429   struct IBF_Key ibf_key;
430   struct KeyEntry *k;
431
432   ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
433   k = GNUNET_new (struct KeyEntry);
434   k->element = ee;
435   k->ibf_key = ibf_key;
436   GNUNET_assert (GNUNET_OK ==
437                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
438                                                       (uint32_t) ibf_key.key_val,
439                                                       k,
440                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
441 }
442
443
444 /**
445  * Insert a key into an ibf.
446  *
447  * @param cls the ibf
448  * @param key unused
449  * @param value the key entry to get the key from
450  */
451 static int
452 prepare_ibf_iterator (void *cls,
453                       uint32_t key,
454                       void *value)
455 {
456   struct Operation *op = cls;
457   struct KeyEntry *ke = value;
458
459   LOG (GNUNET_ERROR_TYPE_DEBUG,
460        "[OP %x] inserting %lx (hash %s) into ibf\n",
461        (void *) op,
462        (unsigned long) ke->ibf_key.key_val,
463        GNUNET_h2s (&ke->element->element_hash));
464   ibf_insert (op->state->local_ibf, ke->ibf_key);
465   return GNUNET_YES;
466 }
467
468
469 /**
470  * Iterator for initializing the
471  * key-to-element mapping of a union operation
472  *
473  * @param cls the union operation `struct Operation *`
474  * @param key unused
475  * @param value the `struct ElementEntry *` to insert
476  *        into the key-to-element mapping
477  * @return #GNUNET_YES (to continue iterating)
478  */
479 static int
480 init_key_to_element_iterator (void *cls,
481                               const struct GNUNET_HashCode *key,
482                               void *value)
483 {
484   struct Operation *op = cls;
485   struct ElementEntry *ee = value;
486
487   /* make sure that the element belongs to the set at the time
488    * of creating the operation */
489   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
490     return GNUNET_YES;
491
492   GNUNET_assert (GNUNET_NO == ee->remote);
493
494   op_register_element (op, ee);
495   return GNUNET_YES;
496 }
497
498
499 /**
500  * Create an ibf with the operation's elements
501  * of the specified size
502  *
503  * @param op the union operation
504  * @param size size of the ibf to create
505  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
506  */
507 static int
508 prepare_ibf (struct Operation *op,
509              uint32_t size)
510 {
511   if (NULL == op->state->key_to_element)
512   {
513     unsigned int len;
514
515     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
516     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
517     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
518                                            init_key_to_element_iterator, op);
519   }
520   if (NULL != op->state->local_ibf)
521     ibf_destroy (op->state->local_ibf);
522   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
523   if (NULL == op->state->local_ibf)
524   {
525     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
526                 "Failed to allocate local IBF\n");
527     return GNUNET_SYSERR;
528   }
529   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
530                                            &prepare_ibf_iterator,
531                                            op);
532   return GNUNET_OK;
533 }
534
535
536 /**
537  * Send an ibf of appropriate size.
538  *
539  * Fragments the IBF into multiple messages if necessary.
540  *
541  * @param op the union operation
542  * @param ibf_order order of the ibf to send, size=2^order
543  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
544  */
545 static int
546 send_ibf (struct Operation *op,
547           uint16_t ibf_order)
548 {
549   unsigned int buckets_sent = 0;
550   struct InvertibleBloomFilter *ibf;
551
552   if (GNUNET_OK !=
553       prepare_ibf (op, 1<<ibf_order))
554   {
555     /* allocation failed */
556     return GNUNET_SYSERR;
557   }
558
559   LOG (GNUNET_ERROR_TYPE_DEBUG,
560        "sending ibf of size %u\n",
561        1<<ibf_order);
562
563   {
564     char name[64] = { 0 };
565     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
566     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
567   }
568
569   ibf = op->state->local_ibf;
570
571   while (buckets_sent < (1 << ibf_order))
572   {
573     unsigned int buckets_in_message;
574     struct GNUNET_MQ_Envelope *ev;
575     struct IBFMessage *msg;
576
577     buckets_in_message = (1 << ibf_order) - buckets_sent;
578     /* limit to maximum */
579     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
580       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
581
582     ev = GNUNET_MQ_msg_extra (msg,
583                               buckets_in_message * IBF_BUCKET_SIZE,
584                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
585     msg->reserved = 0;
586     msg->order = ibf_order;
587     msg->offset = htons (buckets_sent);
588     ibf_write_slice (ibf, buckets_sent,
589                      buckets_in_message, &msg[1]);
590     buckets_sent += buckets_in_message;
591     LOG (GNUNET_ERROR_TYPE_DEBUG,
592          "ibf chunk size %u, %u/%u sent\n",
593          buckets_in_message,
594          buckets_sent,
595          1<<ibf_order);
596     GNUNET_MQ_send (op->mq, ev);
597   }
598
599   /* The other peer must decode the IBF, so
600    * we're passive. */
601   op->state->phase = PHASE_INVENTORY_PASSIVE;
602   return GNUNET_OK;
603 }
604
605
606 /**
607  * Send a strata estimator to the remote peer.
608  *
609  * @param op the union operation with the remote peer
610  */
611 static void
612 send_strata_estimator (struct Operation *op)
613 {
614   const struct StrataEstimator *se = op->state->se;
615   struct GNUNET_MQ_Envelope *ev;
616   struct GNUNET_MessageHeader *strata_msg;
617   char *buf;
618   size_t len;
619   uint16_t type;
620
621   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
622   len = strata_estimator_write (op->state->se,
623                                 buf);
624   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
625     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
626   else
627     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
628   ev = GNUNET_MQ_msg_header_extra (strata_msg,
629                                    len,
630                                    type);
631   memcpy (&strata_msg[1],
632           buf,
633           len);
634   GNUNET_free (buf);
635   GNUNET_MQ_send (op->mq,
636                   ev);
637   op->state->phase = PHASE_EXPECT_IBF;
638   LOG (GNUNET_ERROR_TYPE_DEBUG,
639        "sent SE, expecting IBF\n");
640 }
641
642
643 /**
644  * Compute the necessary order of an ibf
645  * from the size of the symmetric set difference.
646  *
647  * @param diff the difference
648  * @return the required size of the ibf
649  */
650 static unsigned int
651 get_order_from_difference (unsigned int diff)
652 {
653   unsigned int ibf_order;
654
655   ibf_order = 2;
656   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
657           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
658     ibf_order++;
659   if (ibf_order > MAX_IBF_ORDER)
660     ibf_order = MAX_IBF_ORDER;
661   return ibf_order;
662 }
663
664
665 /**
666  * Handle a strata estimator from a remote peer
667  *
668  * @param cls the union operation
669  * @param mh the message
670  * @param is_compressed #GNUNET_YES if the estimator is compressed
671  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
672  *         #GNUNET_OK otherwise
673  */
674 static int
675 handle_p2p_strata_estimator (void *cls,
676                              const struct GNUNET_MessageHeader *mh,
677                              int is_compressed)
678 {
679   struct Operation *op = cls;
680   struct StrataEstimator *remote_se;
681   int diff;
682   size_t len;
683
684   if (op->state->phase != PHASE_EXPECT_SE)
685   {
686     fail_union_operation (op);
687     GNUNET_break (0);
688     return GNUNET_SYSERR;
689   }
690   len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
691   if ( (GNUNET_NO == is_compressed) &&
692        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
693   {
694     fail_union_operation (op);
695     GNUNET_break (0);
696     return GNUNET_SYSERR;
697   }
698   remote_se = strata_estimator_create (SE_STRATA_COUNT,
699                                        SE_IBF_SIZE,
700                                        SE_IBF_HASH_NUM);
701   if (NULL == remote_se)
702   {
703     /* insufficient resources, fail */
704     fail_union_operation (op);
705     return GNUNET_SYSERR;
706   }
707   if (GNUNET_OK !=
708       strata_estimator_read (&mh[1],
709                              len,
710                              is_compressed,
711                              remote_se))
712   {
713     /* decompression failed */
714     fail_union_operation (op);
715     return GNUNET_SYSERR;
716   }
717   GNUNET_assert (NULL != op->state->se);
718   diff = strata_estimator_difference (remote_se,
719                                       op->state->se);
720   strata_estimator_destroy (remote_se);
721   strata_estimator_destroy (op->state->se);
722   op->state->se = NULL;
723   LOG (GNUNET_ERROR_TYPE_DEBUG,
724        "got se diff=%d, using ibf size %d\n",
725        diff,
726        1<<get_order_from_difference (diff));
727   if (GNUNET_OK !=
728       send_ibf (op,
729                 get_order_from_difference (diff)))
730   {
731     /* Internal error, best we can do is shut the connection */
732     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
733                 "Failed to send IBF, closing connection\n");
734     fail_union_operation (op);
735     return GNUNET_SYSERR;
736   }
737   return GNUNET_OK;
738 }
739
740
741 /**
742  * Iterator to send elements to a remote peer
743  *
744  * @param cls closure with the element key and the union operation
745  * @param key ignored
746  * @param value the key entry
747  */
748 static int
749 send_offers_iterator (void *cls,
750                       uint32_t key,
751                       void *value)
752 {
753   struct SendElementClosure *sec = cls;
754   struct Operation *op = sec->op;
755   struct KeyEntry *ke = value;
756   struct GNUNET_MQ_Envelope *ev;
757   struct GNUNET_MessageHeader *mh;
758
759   /* Detect 32-bit key collision for the 64-bit IBF keys. */
760   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
761     return GNUNET_YES;
762
763   ev = GNUNET_MQ_msg_header_extra (mh,
764                                    sizeof (struct GNUNET_HashCode),
765                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
766
767   GNUNET_assert (NULL != ev);
768   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
769   LOG (GNUNET_ERROR_TYPE_DEBUG,
770        "[OP %x] sending element offer (%s) to peer\n",
771        (void *) op,
772        GNUNET_h2s (&ke->element->element_hash));
773   GNUNET_MQ_send (op->mq, ev);
774   return GNUNET_YES;
775 }
776
777
778 /**
779  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
780  *
781  * @param op union operation
782  * @param ibf_key IBF key of interest
783  */
784 static void
785 send_offers_for_key (struct Operation *op,
786                      struct IBF_Key ibf_key)
787 {
788   struct SendElementClosure send_cls;
789
790   send_cls.ibf_key = ibf_key;
791   send_cls.op = op;
792   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
793                                                        (uint32_t) ibf_key.key_val,
794                                                        &send_offers_iterator,
795                                                        &send_cls);
796 }
797
798
799 /**
800  * Decode which elements are missing on each side, and
801  * send the appropriate offers and inquiries.
802  *
803  * @param op union operation
804  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
805  */
806 static int
807 decode_and_send (struct Operation *op)
808 {
809   struct IBF_Key key;
810   struct IBF_Key last_key;
811   int side;
812   unsigned int num_decoded;
813   struct InvertibleBloomFilter *diff_ibf;
814
815   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
816
817   if (GNUNET_OK !=
818       prepare_ibf (op, op->state->remote_ibf->size))
819   {
820     /* allocation failed */
821     return GNUNET_SYSERR;
822   }
823   diff_ibf = ibf_dup (op->state->local_ibf);
824   ibf_subtract (diff_ibf, op->state->remote_ibf);
825
826   ibf_destroy (op->state->remote_ibf);
827   op->state->remote_ibf = NULL;
828
829   LOG (GNUNET_ERROR_TYPE_DEBUG,
830        "decoding IBF (size=%u)\n",
831        diff_ibf->size);
832
833   num_decoded = 0;
834   last_key.key_val = 0;
835
836   while (1)
837   {
838     int res;
839     int cycle_detected = GNUNET_NO;
840
841     last_key = key;
842
843     res = ibf_decode (diff_ibf, &side, &key);
844     if (res == GNUNET_OK)
845     {
846       LOG (GNUNET_ERROR_TYPE_DEBUG,
847            "decoded ibf key %lx\n",
848            (unsigned long) key.key_val);
849       num_decoded += 1;
850       if ( (num_decoded > diff_ibf->size) ||
851            (num_decoded > 1 && last_key.key_val == key.key_val) )
852       {
853         LOG (GNUNET_ERROR_TYPE_DEBUG,
854              "detected cyclic ibf (decoded %u/%u)\n",
855              num_decoded,
856              diff_ibf->size);
857         cycle_detected = GNUNET_YES;
858       }
859     }
860     if ( (GNUNET_SYSERR == res) ||
861          (GNUNET_YES == cycle_detected) )
862     {
863       int next_order;
864       next_order = 0;
865       while (1<<next_order < diff_ibf->size)
866         next_order++;
867       next_order++;
868       if (next_order <= MAX_IBF_ORDER)
869       {
870         LOG (GNUNET_ERROR_TYPE_DEBUG,
871              "decoding failed, sending larger ibf (size %u)\n",
872              1<<next_order);
873         GNUNET_STATISTICS_update (_GSS_statistics,
874                                   "# of IBF retries",
875                                   1,
876                                   GNUNET_NO);
877         if (GNUNET_OK !=
878             send_ibf (op, next_order))
879         {
880           /* Internal error, best we can do is shut the connection */
881           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
882                       "Failed to send IBF, closing connection\n");
883           fail_union_operation (op);
884           ibf_destroy (diff_ibf);
885           return GNUNET_SYSERR;
886         }
887       }
888       else
889       {
890         GNUNET_STATISTICS_update (_GSS_statistics,
891                                   "# of failed union operations (too large)",
892                                   1,
893                                   GNUNET_NO);
894         // XXX: Send the whole set, element-by-element
895         LOG (GNUNET_ERROR_TYPE_ERROR,
896              "set union failed: reached ibf limit\n");
897         fail_union_operation (op);
898         ibf_destroy (diff_ibf);
899         return GNUNET_SYSERR;
900       }
901       break;
902     }
903     if (GNUNET_NO == res)
904     {
905       struct GNUNET_MQ_Envelope *ev;
906
907       LOG (GNUNET_ERROR_TYPE_DEBUG,
908            "transmitted all values, sending DONE\n");
909       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
910       GNUNET_MQ_send (op->mq, ev);
911       /* We now wait until we get a DONE message back
912        * and then wait for our MQ to be flushed and all our
913        * demands be delivered. */
914       break;
915     }
916     if (1 == side)
917     {
918       send_offers_for_key (op, key);
919     }
920     else if (-1 == side)
921     {
922       struct GNUNET_MQ_Envelope *ev;
923       struct GNUNET_MessageHeader *msg;
924
925       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
926        * the effort additional complexity. */
927       ev = GNUNET_MQ_msg_header_extra (msg,
928                                        sizeof (struct IBF_Key),
929                                        GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
930
931       memcpy (&msg[1],
932               &key,
933               sizeof (struct IBF_Key));
934       LOG (GNUNET_ERROR_TYPE_DEBUG,
935            "sending element inquiry for IBF key %lx\n",
936            (unsigned long) key.key_val);
937       GNUNET_MQ_send (op->mq, ev);
938     }
939     else
940     {
941       GNUNET_assert (0);
942     }
943   }
944   ibf_destroy (diff_ibf);
945   return GNUNET_OK;
946 }
947
948
949 /**
950  * Handle an IBF message from a remote peer.
951  *
952  * Reassemble the IBF from multiple pieces, and
953  * process the whole IBF once possible.
954  *
955  * @param cls the union operation
956  * @param mh the header of the message
957  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
958  *         #GNUNET_OK otherwise
959  */
960 static int
961 handle_p2p_ibf (void *cls,
962                 const struct GNUNET_MessageHeader *mh)
963 {
964   struct Operation *op = cls;
965   const struct IBFMessage *msg;
966   unsigned int buckets_in_message;
967
968   if (ntohs (mh->size) < sizeof (struct IBFMessage))
969   {
970     GNUNET_break_op (0);
971     fail_union_operation (op);
972     return GNUNET_SYSERR;
973   }
974   msg = (const struct IBFMessage *) mh;
975   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
976        (op->state->phase == PHASE_EXPECT_IBF) )
977   {
978     op->state->phase = PHASE_EXPECT_IBF_CONT;
979     GNUNET_assert (NULL == op->state->remote_ibf);
980     LOG (GNUNET_ERROR_TYPE_DEBUG,
981          "Creating new ibf of size %u\n",
982          1 << msg->order);
983     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
984     if (NULL == op->state->remote_ibf)
985     {
986       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
987                   "Failed to parse remote IBF, closing connection\n");
988       fail_union_operation (op);
989       return GNUNET_SYSERR;
990     }
991     op->state->ibf_buckets_received = 0;
992     if (0 != ntohs (msg->offset))
993     {
994       GNUNET_break_op (0);
995       fail_union_operation (op);
996       return GNUNET_SYSERR;
997     }
998   }
999   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1000   {
1001     if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
1002          (1<<msg->order != op->state->remote_ibf->size) )
1003     {
1004       GNUNET_break_op (0);
1005       fail_union_operation (op);
1006       return GNUNET_SYSERR;
1007     }
1008   }
1009   else
1010   {
1011     GNUNET_assert (0);
1012   }
1013
1014   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1015
1016   if (0 == buckets_in_message)
1017   {
1018     GNUNET_break_op (0);
1019     fail_union_operation (op);
1020     return GNUNET_SYSERR;
1021   }
1022
1023   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1024   {
1025     GNUNET_break_op (0);
1026     fail_union_operation (op);
1027     return GNUNET_SYSERR;
1028   }
1029
1030   GNUNET_assert (NULL != op->state->remote_ibf);
1031
1032   ibf_read_slice (&msg[1],
1033                   op->state->ibf_buckets_received,
1034                   buckets_in_message,
1035                   op->state->remote_ibf);
1036   op->state->ibf_buckets_received += buckets_in_message;
1037
1038   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1039   {
1040     LOG (GNUNET_ERROR_TYPE_DEBUG,
1041          "received full ibf\n");
1042     op->state->phase = PHASE_INVENTORY_ACTIVE;
1043     if (GNUNET_OK !=
1044         decode_and_send (op))
1045     {
1046       /* Internal error, best we can do is shut down */
1047       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1048                   "Failed to decode IBF, closing connection\n");
1049       return GNUNET_SYSERR;
1050     }
1051   }
1052   return GNUNET_OK;
1053 }
1054
1055
1056 /**
1057  * Send a result message to the client indicating
1058  * that there is a new element.
1059  *
1060  * @param op union operation
1061  * @param element element to send
1062  * @param status status to send with the new element
1063  */
1064 static void
1065 send_client_element (struct Operation *op,
1066                      struct GNUNET_SET_Element *element,
1067                      int status)
1068 {
1069   struct GNUNET_MQ_Envelope *ev;
1070   struct GNUNET_SET_ResultMessage *rm;
1071
1072   LOG (GNUNET_ERROR_TYPE_DEBUG,
1073        "sending element (size %u) to client\n",
1074        element->size);
1075   GNUNET_assert (0 != op->spec->client_request_id);
1076   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1077   if (NULL == ev)
1078   {
1079     GNUNET_MQ_discard (ev);
1080     GNUNET_break (0);
1081     return;
1082   }
1083   rm->result_status = htons (status);
1084   rm->request_id = htonl (op->spec->client_request_id);
1085   rm->element_type = element->element_type;
1086   memcpy (&rm[1], element->data, element->size);
1087   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1088 }
1089
1090
1091 /**
1092  * Signal to the client that the operation has finished and
1093  * destroy the operation.
1094  *
1095  * @param cls operation to destroy
1096  */
1097 static void
1098 send_done_and_destroy (void *cls)
1099 {
1100   struct Operation *op = cls;
1101   struct GNUNET_MQ_Envelope *ev;
1102   struct GNUNET_SET_ResultMessage *rm;
1103
1104   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1105   rm->request_id = htonl (op->spec->client_request_id);
1106   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1107   rm->element_type = htons (0);
1108   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1109   /* Will also call the union-specific cancel function. */
1110   _GSS_operation_destroy (op, GNUNET_YES);
1111 }
1112
1113
1114 static void
1115 maybe_finish (struct Operation *op)
1116 {
1117   unsigned int num_demanded;
1118
1119   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1120
1121   if (PHASE_FINISH_WAITING == op->state->phase)
1122   {
1123     LOG (GNUNET_ERROR_TYPE_DEBUG,
1124          "In PHASE_FINISH_WAITING, pending %u demands\n",
1125          num_demanded);
1126     if (0 == num_demanded)
1127     {
1128       struct GNUNET_MQ_Envelope *ev;
1129
1130       op->state->phase = PHASE_DONE;
1131       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1132       GNUNET_MQ_send (op->mq, ev);
1133
1134       /* We now wait until the other peer closes the channel
1135        * after it got all elements from us. */
1136     }
1137   }
1138   if (PHASE_FINISH_CLOSING == op->state->phase)
1139   {
1140     LOG (GNUNET_ERROR_TYPE_DEBUG,
1141          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1142          num_demanded);
1143     if (0 == num_demanded)
1144     {
1145       op->state->phase = PHASE_DONE;
1146       send_done_and_destroy (op);
1147     }
1148   }
1149 }
1150
1151
1152 /**
1153  * Handle an element message from a remote peer.
1154  *
1155  * @param cls the union operation
1156  * @param mh the message
1157  */
1158 static void
1159 handle_p2p_elements (void *cls,
1160                      const struct GNUNET_MessageHeader *mh)
1161 {
1162   struct Operation *op = cls;
1163   struct ElementEntry *ee;
1164   const struct GNUNET_SET_ElementMessage *emsg;
1165   uint16_t element_size;
1166
1167   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1168   {
1169     GNUNET_break_op (0);
1170     fail_union_operation (op);
1171     return;
1172   }
1173   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1174   {
1175     GNUNET_break_op (0);
1176     fail_union_operation (op);
1177     return;
1178   }
1179
1180   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1181
1182   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1183   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1184   memcpy (&ee[1], &emsg[1], element_size);
1185   ee->element.size = element_size;
1186   ee->element.data = &ee[1];
1187   ee->element.element_type = ntohs (emsg->element_type);
1188   ee->remote = GNUNET_YES;
1189   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1190
1191   if (GNUNET_NO ==
1192       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1193                                             &ee->element_hash,
1194                                             NULL))
1195   {
1196     /* We got something we didn't demand, since it's not in our map. */
1197     GNUNET_break_op (0);
1198     GNUNET_free (ee);
1199     fail_union_operation (op);
1200     return;
1201   }
1202
1203   LOG (GNUNET_ERROR_TYPE_DEBUG,
1204        "Got element (size %u, hash %s) from peer\n",
1205        (unsigned int) element_size,
1206        GNUNET_h2s (&ee->element_hash));
1207
1208   GNUNET_STATISTICS_update (_GSS_statistics,
1209                             "# received elements",
1210                             1,
1211                             GNUNET_NO);
1212   GNUNET_STATISTICS_update (_GSS_statistics,
1213                             "# exchanged elements",
1214                             1,
1215                             GNUNET_NO);
1216
1217   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1218   {
1219     /* Got repeated element.  Should not happen since
1220      * we track demands. */
1221     GNUNET_STATISTICS_update (_GSS_statistics,
1222                               "# repeated elements",
1223                               1,
1224                               GNUNET_NO);
1225     GNUNET_free (ee);
1226   }
1227   else
1228   {
1229     LOG (GNUNET_ERROR_TYPE_DEBUG,
1230          "Registering new element from remote peer\n");
1231     op_register_element (op, ee);
1232     /* only send results immediately if the client wants it */
1233     switch (op->spec->result_mode)
1234     {
1235       case GNUNET_SET_RESULT_ADDED:
1236         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1237         break;
1238       case GNUNET_SET_RESULT_SYMMETRIC:
1239         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1240         break;
1241       default:
1242         /* Result mode not supported, should have been caught earlier. */
1243         GNUNET_break (0);
1244         break;
1245     }
1246   }
1247
1248   maybe_finish (op);
1249 }
1250
1251
1252 /**
1253  * Send offers (for GNUNET_Hash-es) in response
1254  * to inquiries (for IBF_Key-s).
1255  *
1256  * @param cls the union operation
1257  * @param mh the message
1258  */
1259 static void
1260 handle_p2p_inquiry (void *cls,
1261                     const struct GNUNET_MessageHeader *mh)
1262 {
1263   struct Operation *op = cls;
1264   const struct IBF_Key *ibf_key;
1265   unsigned int num_keys;
1266
1267   /* look up elements and send them */
1268   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1269   {
1270     GNUNET_break_op (0);
1271     fail_union_operation (op);
1272     return;
1273   }
1274   num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1275       / sizeof (struct IBF_Key);
1276   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1277       != num_keys * sizeof (struct IBF_Key))
1278   {
1279     GNUNET_break_op (0);
1280     fail_union_operation (op);
1281     return;
1282   }
1283
1284   ibf_key = (const struct IBF_Key *) &mh[1];
1285   while (0 != num_keys--)
1286   {
1287     send_offers_for_key (op, *ibf_key);
1288     ibf_key++;
1289   }
1290 }
1291
1292
1293 /**
1294  * FIXME
1295  */
1296 static void
1297 handle_p2p_demand (void *cls,
1298                    const struct GNUNET_MessageHeader *mh)
1299 {
1300   struct Operation *op = cls;
1301   struct ElementEntry *ee;
1302   struct GNUNET_SET_ElementMessage *emsg;
1303   const struct GNUNET_HashCode *hash;
1304   unsigned int num_hashes;
1305   struct GNUNET_MQ_Envelope *ev;
1306
1307   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1308     / sizeof (struct GNUNET_HashCode);
1309   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1310       != num_hashes * sizeof (struct GNUNET_HashCode))
1311   {
1312     GNUNET_break_op (0);
1313     fail_union_operation (op);
1314     return;
1315   }
1316
1317   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1318        num_hashes > 0;
1319        hash++, num_hashes--)
1320   {
1321     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1322     if (NULL == ee)
1323     {
1324       /* Demand for non-existing element. */
1325       GNUNET_break_op (0);
1326       fail_union_operation (op);
1327       return;
1328     }
1329     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1330     {
1331       /* Probably confused lazily copied sets. */
1332       GNUNET_break_op (0);
1333       fail_union_operation (op);
1334       return;
1335     }
1336     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1337     memcpy (&emsg[1], ee->element.data, ee->element.size);
1338     emsg->reserved = htons (0);
1339     emsg->element_type = htons (ee->element.element_type);
1340     LOG (GNUNET_ERROR_TYPE_DEBUG,
1341          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1342          (void *) op,
1343          (unsigned int) ee->element.size,
1344          GNUNET_h2s (&ee->element_hash));
1345     GNUNET_MQ_send (op->mq, ev);
1346     GNUNET_STATISTICS_update (_GSS_statistics,
1347                               "# exchanged elements",
1348                               1,
1349                               GNUNET_NO);
1350
1351     switch (op->spec->result_mode)
1352     {
1353       case GNUNET_SET_RESULT_ADDED:
1354         /* Nothing to do. */
1355         break;
1356       case GNUNET_SET_RESULT_SYMMETRIC:
1357         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1358         break;
1359       default:
1360         /* Result mode not supported, should have been caught earlier. */
1361         GNUNET_break (0);
1362         break;
1363     }
1364   }
1365 }
1366
1367
1368 /**
1369  * Handle offers (of GNUNET_HashCode-s) and
1370  * respond with demands (of GNUNET_HashCode-s).
1371  *
1372  * @param cls the union operation
1373  * @param mh the message
1374  */
1375 static void
1376 handle_p2p_offer (void *cls,
1377                     const struct GNUNET_MessageHeader *mh)
1378 {
1379   struct Operation *op = cls;
1380   const struct GNUNET_HashCode *hash;
1381   unsigned int num_hashes;
1382
1383   /* look up elements and send them */
1384   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1385        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1386   {
1387     GNUNET_break_op (0);
1388     fail_union_operation (op);
1389     return;
1390   }
1391   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1392     / sizeof (struct GNUNET_HashCode);
1393   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1394       != num_hashes * sizeof (struct GNUNET_HashCode))
1395   {
1396     GNUNET_break_op (0);
1397     fail_union_operation (op);
1398     return;
1399   }
1400
1401   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1402        num_hashes > 0;
1403        hash++, num_hashes--)
1404   {
1405     struct ElementEntry *ee;
1406     struct GNUNET_MessageHeader *demands;
1407     struct GNUNET_MQ_Envelope *ev;
1408
1409     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1410                                             hash);
1411     if (NULL != ee)
1412       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1413         continue;
1414
1415     if (GNUNET_YES ==
1416         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1417                                                 hash))
1418     {
1419       LOG (GNUNET_ERROR_TYPE_DEBUG,
1420            "Skipped sending duplicate demand\n");
1421       continue;
1422     }
1423
1424     GNUNET_assert (GNUNET_OK ==
1425                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1426                                                       hash,
1427                                                       NULL,
1428                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1429
1430     LOG (GNUNET_ERROR_TYPE_DEBUG,
1431          "[OP %x] Requesting element (hash %s)\n",
1432          (void *) op, GNUNET_h2s (hash));
1433     ev = GNUNET_MQ_msg_header_extra (demands,
1434                                      sizeof (struct GNUNET_HashCode),
1435                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1436     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1437     GNUNET_MQ_send (op->mq, ev);
1438   }
1439 }
1440
1441
1442 /**
1443  * Handle a done message from a remote peer
1444  *
1445  * @param cls the union operation
1446  * @param mh the message
1447  */
1448 static void
1449 handle_p2p_done (void *cls,
1450                  const struct GNUNET_MessageHeader *mh)
1451 {
1452   struct Operation *op = cls;
1453
1454   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1455   {
1456     /* We got all requests, but still have to send our elements in response. */
1457
1458     op->state->phase = PHASE_FINISH_WAITING;
1459
1460     LOG (GNUNET_ERROR_TYPE_DEBUG,
1461          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1462     /* The active peer is done sending offers
1463      * and inquiries.  This means that all
1464      * our responses to that (demands and offers)
1465      * must be in flight (queued or in mesh).
1466      *
1467      * We should notify the active peer once
1468      * all our demands are satisfied, so that the active
1469      * peer can quit if we gave him everything.
1470      */
1471     maybe_finish (op);
1472     return;
1473   }
1474   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1475   {
1476     LOG (GNUNET_ERROR_TYPE_DEBUG,
1477          "got DONE (as active partner), waiting to finish\n");
1478     /* All demands of the other peer are satisfied,
1479      * and we processed all offers, thus we know
1480      * exactly what our demands must be.
1481      *
1482      * We'll close the channel
1483      * to the other peer once our demands are met.
1484      */
1485     op->state->phase = PHASE_FINISH_CLOSING;
1486     maybe_finish (op);
1487     return;
1488   }
1489   GNUNET_break_op (0);
1490   fail_union_operation (op);
1491 }
1492
1493
1494 /**
1495  * Initiate operation to evaluate a set union with a remote peer.
1496  *
1497  * @param op operation to perform (to be initialized)
1498  * @param opaque_context message to be transmitted to the listener
1499  *        to convince him to accept, may be NULL
1500  */
1501 static void
1502 union_evaluate (struct Operation *op,
1503                 const struct GNUNET_MessageHeader *opaque_context)
1504 {
1505   struct GNUNET_MQ_Envelope *ev;
1506   struct OperationRequestMessage *msg;
1507
1508   GNUNET_assert (NULL == op->state);
1509   op->state = GNUNET_new (struct OperationState);
1510   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1511   /* copy the current generation's strata estimator for this operation */
1512   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1513   /* we started the operation, thus we have to send the operation request */
1514   op->state->phase = PHASE_EXPECT_SE;
1515   LOG (GNUNET_ERROR_TYPE_DEBUG,
1516        "Initiating union operation evaluation\n");
1517   GNUNET_STATISTICS_update (_GSS_statistics,
1518                             "# of total union operations",
1519                             1,
1520                             GNUNET_NO);
1521   GNUNET_STATISTICS_update (_GSS_statistics,
1522                             "# of initiated union operations",
1523                             1,
1524                             GNUNET_NO);
1525   ev = GNUNET_MQ_msg_nested_mh (msg,
1526                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1527                                 opaque_context);
1528   if (NULL == ev)
1529   {
1530     /* the context message is too large */
1531     GNUNET_break (0);
1532     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1533     return;
1534   }
1535   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1536   msg->app_id = op->spec->app_id;
1537   GNUNET_MQ_send (op->mq,
1538                   ev);
1539
1540   if (NULL != opaque_context)
1541     LOG (GNUNET_ERROR_TYPE_DEBUG,
1542          "sent op request with context message\n");
1543   else
1544     LOG (GNUNET_ERROR_TYPE_DEBUG,
1545          "sent op request without context message\n");
1546 }
1547
1548
1549 /**
1550  * Accept an union operation request from a remote peer.
1551  * Only initializes the private operation state.
1552  *
1553  * @param op operation that will be accepted as a union operation
1554  */
1555 static void
1556 union_accept (struct Operation *op)
1557 {
1558   LOG (GNUNET_ERROR_TYPE_DEBUG,
1559        "accepting set union operation\n");
1560   GNUNET_assert (NULL == op->state);
1561
1562   GNUNET_STATISTICS_update (_GSS_statistics,
1563                             "# of accepted union operations",
1564                             1,
1565                             GNUNET_NO);
1566   GNUNET_STATISTICS_update (_GSS_statistics,
1567                             "# of total union operations",
1568                             1,
1569                             GNUNET_NO);
1570
1571   op->state = GNUNET_new (struct OperationState);
1572   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1573   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1574   /* kick off the operation */
1575   send_strata_estimator (op);
1576 }
1577
1578
1579 /**
1580  * Create a new set supporting the union operation
1581  *
1582  * We maintain one strata estimator per set and then manipulate it over the
1583  * lifetime of the set, as recreating a strata estimator would be expensive.
1584  *
1585  * @return the newly created set, NULL on error
1586  */
1587 static struct SetState *
1588 union_set_create (void)
1589 {
1590   struct SetState *set_state;
1591
1592   LOG (GNUNET_ERROR_TYPE_DEBUG,
1593        "union set created\n");
1594   set_state = GNUNET_new (struct SetState);
1595   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1596                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
1597   if (NULL == set_state->se)
1598   {
1599     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1600                 "Failed to allocate strata estimator\n");
1601     GNUNET_free (set_state);
1602     return NULL;
1603   }
1604   return set_state;
1605 }
1606
1607
1608 /**
1609  * Add the element from the given element message to the set.
1610  *
1611  * @param set_state state of the set want to add to
1612  * @param ee the element to add to the set
1613  */
1614 static void
1615 union_add (struct SetState *set_state, struct ElementEntry *ee)
1616 {
1617   strata_estimator_insert (set_state->se,
1618                            get_ibf_key (&ee->element_hash, 0));
1619 }
1620
1621
1622 /**
1623  * Remove the element given in the element message from the set.
1624  * Only marks the element as removed, so that older set operations can still exchange it.
1625  *
1626  * @param set_state state of the set to remove from
1627  * @param ee set element to remove
1628  */
1629 static void
1630 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1631 {
1632   strata_estimator_remove (set_state->se,
1633                            get_ibf_key (&ee->element_hash, 0));
1634 }
1635
1636
1637 /**
1638  * Destroy a set that supports the union operation.
1639  *
1640  * @param set_state the set to destroy
1641  */
1642 static void
1643 union_set_destroy (struct SetState *set_state)
1644 {
1645   if (NULL != set_state->se)
1646   {
1647     strata_estimator_destroy (set_state->se);
1648     set_state->se = NULL;
1649   }
1650   GNUNET_free (set_state);
1651 }
1652
1653
1654 /**
1655  * Dispatch messages for a union operation.
1656  *
1657  * @param op the state of the union evaluate operation
1658  * @param mh the received message
1659  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1660  *         #GNUNET_OK otherwise
1661  */
1662 int
1663 union_handle_p2p_message (struct Operation *op,
1664                           const struct GNUNET_MessageHeader *mh)
1665 {
1666   //LOG (GNUNET_ERROR_TYPE_DEBUG,
1667   //            "received p2p message (t: %u, s: %u)\n",
1668   //            ntohs (mh->type),
1669   //            ntohs (mh->size));
1670   switch (ntohs (mh->type))
1671   {
1672     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1673       return handle_p2p_ibf (op, mh);
1674     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1675       return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1676     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1677       return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
1678     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1679       handle_p2p_elements (op, mh);
1680       break;
1681     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1682       handle_p2p_inquiry (op, mh);
1683       break;
1684     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1685       handle_p2p_done (op, mh);
1686       break;
1687     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1688       handle_p2p_offer (op, mh);
1689       break;
1690     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1691       handle_p2p_demand (op, mh);
1692       break;
1693     default:
1694       /* Something wrong with cadet's message handlers? */
1695       GNUNET_assert (0);
1696   }
1697   return GNUNET_OK;
1698 }
1699
1700
1701 /**
1702  * Handler for peer-disconnects, notifies the client
1703  * about the aborted operation in case the op was not concluded.
1704  *
1705  * @param op the destroyed operation
1706  */
1707 static void
1708 union_peer_disconnect (struct Operation *op)
1709 {
1710   if (PHASE_DONE != op->state->phase)
1711   {
1712     struct GNUNET_MQ_Envelope *ev;
1713     struct GNUNET_SET_ResultMessage *msg;
1714
1715     ev = GNUNET_MQ_msg (msg,
1716                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1717     msg->request_id = htonl (op->spec->client_request_id);
1718     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1719     msg->element_type = htons (0);
1720     GNUNET_MQ_send (op->spec->set->client_mq,
1721                     ev);
1722     LOG (GNUNET_ERROR_TYPE_WARNING,
1723          "other peer disconnected prematurely, phase %u\n",
1724          op->state->phase);
1725     _GSS_operation_destroy (op,
1726                             GNUNET_YES);
1727     return;
1728   }
1729   // else: the session has already been concluded
1730   LOG (GNUNET_ERROR_TYPE_DEBUG,
1731        "other peer disconnected (finished)\n");
1732   if (GNUNET_NO == op->state->client_done_sent)
1733     send_done_and_destroy (op);
1734 }
1735
1736
1737 /**
1738  * Copy union-specific set state.
1739  *
1740  * @param set source set for copying the union state
1741  * @return a copy of the union-specific set state
1742  */
1743 static struct SetState *
1744 union_copy_state (struct Set *set)
1745 {
1746   struct SetState *new_state;
1747
1748   new_state = GNUNET_new (struct SetState);
1749   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1750   new_state->se = strata_estimator_dup (set->state->se);
1751
1752   return new_state;
1753 }
1754
1755
1756 /**
1757  * Get the table with implementing functions for
1758  * set union.
1759  *
1760  * @return the operation specific VTable
1761  */
1762 const struct SetVT *
1763 _GSS_union_vt ()
1764 {
1765   static const struct SetVT union_vt = {
1766     .create = &union_set_create,
1767     .msg_handler = &union_handle_p2p_message,
1768     .add = &union_add,
1769     .remove = &union_remove,
1770     .destroy_set = &union_set_destroy,
1771     .evaluate = &union_evaluate,
1772     .accept = &union_accept,
1773     .peer_disconnect = &union_peer_disconnect,
1774     .cancel = &union_op_cancel,
1775     .copy_state = &union_copy_state,
1776   };
1777
1778   return &union_vt;
1779 }