-preparations for replacement of try_connect call
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2015 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_statistics_service.h"
28 #include "gnunet-service-set.h"
29 #include "ibf.h"
30 #include "gnunet-service-set_union_strata_estimator.h"
31 #include "gnunet-service-set_protocol.h"
32 #include <gcrypt.h>
33
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
36
37
38 /**
39  * Number of IBFs in a strata estimator.
40  */
41 #define SE_STRATA_COUNT 32
42
43 /**
44  * Size of the IBFs in the strata estimator.
45  */
46 #define SE_IBF_SIZE 80
47
48 /**
49  * The hash num parameter for the difference digests and strata estimators.
50  */
51 #define SE_IBF_HASH_NUM 4
52
53 /**
54  * Number of buckets that can be transmitted in one message.
55  */
56 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
57
58 /**
59  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60  * Choose this value so that computing the IBF is still cheaper
61  * than transmitting all values.
62  */
63 #define MAX_IBF_ORDER (16)
64
65 /**
66  * Number of buckets used in the ibf per estimated
67  * difference.
68  */
69 #define IBF_ALPHA 4
70
71
72 /**
73  * Current phase we are in for a union operation.
74  */
75 enum UnionOperationPhase
76 {
77   /**
78    * We sent the request message, and expect a strata estimator.
79    */
80   PHASE_EXPECT_SE,
81
82   /**
83    * We sent the strata estimator, and expect an IBF. This phase is entered once
84    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
85    *
86    * XXX: could use better wording.
87    *
88    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
89    */
90   PHASE_EXPECT_IBF,
91
92   /**
93    * Continuation for multi part IBFs.
94    */
95   PHASE_EXPECT_IBF_CONT,
96
97   /**
98    * We are decoding an IBF.
99    */
100   PHASE_INVENTORY_ACTIVE,
101
102   /**
103    * The other peer is decoding the IBF we just sent.
104    */
105   PHASE_INVENTORY_PASSIVE,
106
107   /**
108    * The protocol is almost finished, but we still have to flush our message
109    * queue and/or expect some elements.
110    */
111   PHASE_FINISH_CLOSING,
112
113   /**
114    * In the penultimate phase,
115    * we wait until all our demands
116    * are satisfied.  Then we send a done
117    * message, and wait for another done message.*/
118   PHASE_FINISH_WAITING,
119
120   /**
121    * In the ultimate phase, we wait until
122    * our demands are satisfied and then
123    * quit (sending another DONE message). */
124   PHASE_DONE
125 };
126
127
128 /**
129  * State of an evaluate operation with another peer.
130  */
131 struct OperationState
132 {
133   /**
134    * Copy of the set's strata estimator at the time of
135    * creation of this operation.
136    */
137   struct StrataEstimator *se;
138
139   /**
140    * The IBF we currently receive.
141    */
142   struct InvertibleBloomFilter *remote_ibf;
143
144   /**
145    * The IBF with the local set's element.
146    */
147   struct InvertibleBloomFilter *local_ibf;
148
149   /**
150    * Maps IBF-Keys (specific to the current salt) to elements.
151    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
152    * Colliding IBF-Keys are linked.
153    */
154   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
155
156   /**
157    * Current state of the operation.
158    */
159   enum UnionOperationPhase phase;
160
161   /**
162    * Did we send the client that we are done?
163    */
164   int client_done_sent;
165
166   /**
167    * Number of ibf buckets already received into the @a remote_ibf.
168    */
169   unsigned int ibf_buckets_received;
170
171   /**
172    * Hashes for elements that we have demanded from the other peer.
173    */
174   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
175 };
176
177
178 /**
179  * The key entry is used to associate an ibf key with an element.
180  */
181 struct KeyEntry
182 {
183   /**
184    * IBF key for the entry, derived from the current salt.
185    */
186   struct IBF_Key ibf_key;
187
188   /**
189    * The actual element associated with the key.
190    *
191    * Only owned by the union operation if element->operation
192    * is #GNUNET_YES.
193    */
194   struct ElementEntry *element;
195 };
196
197
198 /**
199  * Used as a closure for sending elements
200  * with a specific IBF key.
201  */
202 struct SendElementClosure
203 {
204   /**
205    * The IBF key whose matching elements should be
206    * sent.
207    */
208   struct IBF_Key ibf_key;
209
210   /**
211    * Operation for which the elements
212    * should be sent.
213    */
214   struct Operation *op;
215 };
216
217
218 /**
219  * Extra state required for efficient set union.
220  */
221 struct SetState
222 {
223   /**
224    * The strata estimator is only generated once for
225    * each set.
226    * The IBF keys are derived from the element hashes with
227    * salt=0.
228    */
229   struct StrataEstimator *se;
230 };
231
232
233 /**
234  * Iterator over hash map entries, called to
235  * destroy the linked list of colliding ibf key entries.
236  *
237  * @param cls closure
238  * @param key current key code
239  * @param value value in the hash map
240  * @return #GNUNET_YES if we should continue to iterate,
241  *         #GNUNET_NO if not.
242  */
243 static int
244 destroy_key_to_element_iter (void *cls,
245                              uint32_t key,
246                              void *value)
247 {
248   struct KeyEntry *k = value;
249
250   GNUNET_assert (NULL != k);
251   if (GNUNET_YES == k->element->remote)
252   {
253     GNUNET_free (k->element);
254     k->element = NULL;
255   }
256   GNUNET_free (k);
257   return GNUNET_YES;
258 }
259
260
261 /**
262  * Destroy the union operation.  Only things specific to the union
263  * operation are destroyed.
264  *
265  * @param op union operation to destroy
266  */
267 static void
268 union_op_cancel (struct Operation *op)
269 {
270   LOG (GNUNET_ERROR_TYPE_DEBUG,
271        "destroying union op\n");
272   /* check if the op was canceled twice */
273   GNUNET_assert (NULL != op->state);
274   if (NULL != op->state->remote_ibf)
275   {
276     ibf_destroy (op->state->remote_ibf);
277     op->state->remote_ibf = NULL;
278   }
279   if (NULL != op->state->demanded_hashes)
280   {
281     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
282     op->state->demanded_hashes = NULL;
283   }
284   if (NULL != op->state->local_ibf)
285   {
286     ibf_destroy (op->state->local_ibf);
287     op->state->local_ibf = NULL;
288   }
289   if (NULL != op->state->se)
290   {
291     strata_estimator_destroy (op->state->se);
292     op->state->se = NULL;
293   }
294   if (NULL != op->state->key_to_element)
295   {
296     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
297                                              &destroy_key_to_element_iter,
298                                              NULL);
299     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
300     op->state->key_to_element = NULL;
301   }
302   GNUNET_free (op->state);
303   op->state = NULL;
304   LOG (GNUNET_ERROR_TYPE_DEBUG,
305        "destroying union op done\n");
306 }
307
308
309 /**
310  * Inform the client that the union operation has failed,
311  * and proceed to destroy the evaluate operation.
312  *
313  * @param op the union operation to fail
314  */
315 static void
316 fail_union_operation (struct Operation *op)
317 {
318   struct GNUNET_MQ_Envelope *ev;
319   struct GNUNET_SET_ResultMessage *msg;
320
321   LOG (GNUNET_ERROR_TYPE_ERROR,
322        "union operation failed\n");
323   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
324   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
325   msg->request_id = htonl (op->spec->client_request_id);
326   msg->element_type = htons (0);
327   GNUNET_MQ_send (op->spec->set->client_mq, ev);
328   _GSS_operation_destroy (op, GNUNET_YES);
329 }
330
331
332 /**
333  * Derive the IBF key from a hash code and
334  * a salt.
335  *
336  * @param src the hash code
337  * @param salt salt to use
338  * @return the derived IBF key
339  */
340 static struct IBF_Key
341 get_ibf_key (const struct GNUNET_HashCode *src,
342              uint16_t salt)
343 {
344   struct IBF_Key key;
345
346   /* FIXME: Ensure that the salt is handled correctly.
347      This is a quick fix so that consensus works for now. */
348   salt = 0;
349
350   GNUNET_CRYPTO_kdf (&key, sizeof (key),
351                      src, sizeof *src,
352                      &salt, sizeof (salt),
353                      NULL, 0);
354   return key;
355 }
356
357
358 /**
359  * Iterator over the mapping from IBF keys to element entries.  Checks if we
360  * have an element with a given GNUNET_HashCode.
361  *
362  * @param cls closure
363  * @param key current key code
364  * @param value value in the hash map
365  * @return #GNUNET_YES if we should search further,
366  *         #GNUNET_NO if we've found the element.
367  */
368 static int
369 op_has_element_iterator (void *cls,
370                          uint32_t key,
371                          void *value)
372 {
373   struct GNUNET_HashCode *element_hash = cls;
374   struct KeyEntry *k = value;
375
376   GNUNET_assert (NULL != k);
377   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
378                                    element_hash))
379     return GNUNET_NO;
380   return GNUNET_YES;
381 }
382
383
384 /**
385  * Determine whether the given element is already in the operation's element
386  * set.
387  *
388  * @param op operation that should be tested for 'element_hash'
389  * @param element_hash hash of the element to look for
390  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
391  */
392 static int
393 op_has_element (struct Operation *op,
394                 const struct GNUNET_HashCode *element_hash)
395 {
396   int ret;
397   struct IBF_Key ibf_key;
398
399   ibf_key = get_ibf_key (element_hash, op->spec->salt);
400   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
401                                                       (uint32_t) ibf_key.key_val,
402                                                       op_has_element_iterator,
403                                                       (void *) element_hash);
404
405   /* was the iteration aborted because we found the element? */
406   if (GNUNET_SYSERR == ret)
407     return GNUNET_YES;
408   return GNUNET_NO;
409 }
410
411
412 /**
413  * Insert an element into the union operation's
414  * key-to-element mapping. Takes ownership of 'ee'.
415  * Note that this does not insert the element in the set,
416  * only in the operation's key-element mapping.
417  * This is done to speed up re-tried operations, if some elements
418  * were transmitted, and then the IBF fails to decode.
419  *
420  * XXX: clarify ownership, doesn't sound right.
421  *
422  * @param op the union operation
423  * @param ee the element entry
424  */
425 static void
426 op_register_element (struct Operation *op,
427                      struct ElementEntry *ee)
428 {
429   struct IBF_Key ibf_key;
430   struct KeyEntry *k;
431
432   ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
433   k = GNUNET_new (struct KeyEntry);
434   k->element = ee;
435   k->ibf_key = ibf_key;
436   GNUNET_assert (GNUNET_OK ==
437                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
438                                                       (uint32_t) ibf_key.key_val,
439                                                       k,
440                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
441 }
442
443
444 /**
445  * Insert a key into an ibf.
446  *
447  * @param cls the ibf
448  * @param key unused
449  * @param value the key entry to get the key from
450  */
451 static int
452 prepare_ibf_iterator (void *cls,
453                       uint32_t key,
454                       void *value)
455 {
456   struct Operation *op = cls;
457   struct KeyEntry *ke = value;
458
459   LOG (GNUNET_ERROR_TYPE_DEBUG,
460        "[OP %x] inserting %lx (hash %s) into ibf\n",
461        (void *) op,
462        (unsigned long) ke->ibf_key.key_val,
463        GNUNET_h2s (&ke->element->element_hash));
464   ibf_insert (op->state->local_ibf, ke->ibf_key);
465   return GNUNET_YES;
466 }
467
468
469 /**
470  * Iterator for initializing the
471  * key-to-element mapping of a union operation
472  *
473  * @param cls the union operation `struct Operation *`
474  * @param key unused
475  * @param value the `struct ElementEntry *` to insert
476  *        into the key-to-element mapping
477  * @return #GNUNET_YES (to continue iterating)
478  */
479 static int
480 init_key_to_element_iterator (void *cls,
481                               const struct GNUNET_HashCode *key,
482                               void *value)
483 {
484   struct Operation *op = cls;
485   struct ElementEntry *ee = value;
486
487   /* make sure that the element belongs to the set at the time
488    * of creating the operation */
489   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
490     return GNUNET_YES;
491
492   GNUNET_assert (GNUNET_NO == ee->remote);
493
494   op_register_element (op, ee);
495   return GNUNET_YES;
496 }
497
498
499 /**
500  * Create an ibf with the operation's elements
501  * of the specified size
502  *
503  * @param op the union operation
504  * @param size size of the ibf to create
505  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
506  */
507 static int
508 prepare_ibf (struct Operation *op,
509              uint32_t size)
510 {
511   if (NULL == op->state->key_to_element)
512   {
513     unsigned int len;
514
515     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
516     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
517     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
518                                            init_key_to_element_iterator, op);
519   }
520   if (NULL != op->state->local_ibf)
521     ibf_destroy (op->state->local_ibf);
522   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
523   if (NULL == op->state->local_ibf)
524   {
525     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
526                 "Failed to allocate local IBF\n");
527     return GNUNET_SYSERR;
528   }
529   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
530                                            &prepare_ibf_iterator,
531                                            op);
532   return GNUNET_OK;
533 }
534
535
536 /**
537  * Send an ibf of appropriate size.
538  *
539  * Fragments the IBF into multiple messages if necessary.
540  *
541  * @param op the union operation
542  * @param ibf_order order of the ibf to send, size=2^order
543  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
544  */
545 static int
546 send_ibf (struct Operation *op,
547           uint16_t ibf_order)
548 {
549   unsigned int buckets_sent = 0;
550   struct InvertibleBloomFilter *ibf;
551
552   if (GNUNET_OK !=
553       prepare_ibf (op, 1<<ibf_order))
554   {
555     /* allocation failed */
556     return GNUNET_SYSERR;
557   }
558
559   LOG (GNUNET_ERROR_TYPE_DEBUG,
560        "sending ibf of size %u\n",
561        1<<ibf_order);
562
563   {
564     char name[64] = { 0 };
565     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
566     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
567   }
568
569   ibf = op->state->local_ibf;
570
571   while (buckets_sent < (1 << ibf_order))
572   {
573     unsigned int buckets_in_message;
574     struct GNUNET_MQ_Envelope *ev;
575     struct IBFMessage *msg;
576
577     buckets_in_message = (1 << ibf_order) - buckets_sent;
578     /* limit to maximum */
579     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
580       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
581
582     ev = GNUNET_MQ_msg_extra (msg,
583                               buckets_in_message * IBF_BUCKET_SIZE,
584                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
585     msg->reserved = 0;
586     msg->order = ibf_order;
587     msg->offset = htons (buckets_sent);
588     ibf_write_slice (ibf, buckets_sent,
589                      buckets_in_message, &msg[1]);
590     buckets_sent += buckets_in_message;
591     LOG (GNUNET_ERROR_TYPE_DEBUG,
592          "ibf chunk size %u, %u/%u sent\n",
593          buckets_in_message,
594          buckets_sent,
595          1<<ibf_order);
596     GNUNET_MQ_send (op->mq, ev);
597   }
598
599   /* The other peer must decode the IBF, so
600    * we're passive. */
601   op->state->phase = PHASE_INVENTORY_PASSIVE;
602   return GNUNET_OK;
603 }
604
605
606 /**
607  * Send a strata estimator to the remote peer.
608  *
609  * @param op the union operation with the remote peer
610  */
611 static void
612 send_strata_estimator (struct Operation *op)
613 {
614   const struct StrataEstimator *se = op->state->se;
615   struct GNUNET_MQ_Envelope *ev;
616   struct GNUNET_MessageHeader *strata_msg;
617   char *buf;
618   size_t len;
619   uint16_t type;
620
621   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
622   len = strata_estimator_write (op->state->se,
623                                 buf);
624   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
625     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
626   else
627     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
628   ev = GNUNET_MQ_msg_header_extra (strata_msg,
629                                    len,
630                                    type);
631   memcpy (&strata_msg[1],
632           buf,
633           len);
634   GNUNET_free (buf);
635   GNUNET_MQ_send (op->mq,
636                   ev);
637   op->state->phase = PHASE_EXPECT_IBF;
638   LOG (GNUNET_ERROR_TYPE_DEBUG,
639        "sent SE, expecting IBF\n");
640 }
641
642
643 /**
644  * Compute the necessary order of an ibf
645  * from the size of the symmetric set difference.
646  *
647  * @param diff the difference
648  * @return the required size of the ibf
649  */
650 static unsigned int
651 get_order_from_difference (unsigned int diff)
652 {
653   unsigned int ibf_order;
654
655   ibf_order = 2;
656   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
657           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
658     ibf_order++;
659   if (ibf_order > MAX_IBF_ORDER)
660     ibf_order = MAX_IBF_ORDER;
661   return ibf_order;
662 }
663
664
665 /**
666  * Handle a strata estimator from a remote peer
667  *
668  * @param cls the union operation
669  * @param mh the message
670  * @param is_compressed #GNUNET_YES if the estimator is compressed
671  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
672  *         #GNUNET_OK otherwise
673  */
674 static int
675 handle_p2p_strata_estimator (void *cls,
676                              const struct GNUNET_MessageHeader *mh,
677                              int is_compressed)
678 {
679   struct Operation *op = cls;
680   struct StrataEstimator *remote_se;
681   int diff;
682   size_t len;
683
684   if (op->state->phase != PHASE_EXPECT_SE)
685   {
686     fail_union_operation (op);
687     GNUNET_break (0);
688     return GNUNET_SYSERR;
689   }
690   len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
691   if ( (GNUNET_NO == is_compressed) &&
692        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
693   {
694     fail_union_operation (op);
695     GNUNET_break (0);
696     return GNUNET_SYSERR;
697   }
698   remote_se = strata_estimator_create (SE_STRATA_COUNT,
699                                        SE_IBF_SIZE,
700                                        SE_IBF_HASH_NUM);
701   if (NULL == remote_se)
702   {
703     /* insufficient resources, fail */
704     fail_union_operation (op);
705     return GNUNET_SYSERR;
706   }
707   if (GNUNET_OK !=
708       strata_estimator_read (&mh[1],
709                              len,
710                              is_compressed,
711                              remote_se))
712   {
713     /* decompression failed */
714     fail_union_operation (op);
715     return GNUNET_SYSERR;
716   }
717   GNUNET_assert (NULL != op->state->se);
718   diff = strata_estimator_difference (remote_se,
719                                       op->state->se);
720   strata_estimator_destroy (remote_se);
721   strata_estimator_destroy (op->state->se);
722   op->state->se = NULL;
723   LOG (GNUNET_ERROR_TYPE_DEBUG,
724        "got se diff=%d, using ibf size %d\n",
725        diff,
726        1<<get_order_from_difference (diff));
727   if (GNUNET_OK !=
728       send_ibf (op,
729                 get_order_from_difference (diff)))
730   {
731     /* Internal error, best we can do is shut the connection */
732     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
733                 "Failed to send IBF, closing connection\n");
734     fail_union_operation (op);
735     return GNUNET_SYSERR;
736   }
737   return GNUNET_OK;
738 }
739
740
741 /**
742  * Iterator to send elements to a remote peer
743  *
744  * @param cls closure with the element key and the union operation
745  * @param key ignored
746  * @param value the key entry
747  */
748 static int
749 send_offers_iterator (void *cls,
750                       uint32_t key,
751                       void *value)
752 {
753   struct SendElementClosure *sec = cls;
754   struct Operation *op = sec->op;
755   struct KeyEntry *ke = value;
756   struct GNUNET_MQ_Envelope *ev;
757   struct GNUNET_MessageHeader *mh;
758
759   /* Detect 32-bit key collision for the 64-bit IBF keys. */
760   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
761     return GNUNET_YES;
762
763   ev = GNUNET_MQ_msg_header_extra (mh,
764                                    sizeof (struct GNUNET_HashCode),
765                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
766
767   GNUNET_assert (NULL != ev);
768   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
769   LOG (GNUNET_ERROR_TYPE_DEBUG,
770        "[OP %x] sending element offer (%s) to peer\n",
771        (void *) op,
772        GNUNET_h2s (&ke->element->element_hash));
773   GNUNET_MQ_send (op->mq, ev);
774   return GNUNET_YES;
775 }
776
777
778 /**
779  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
780  *
781  * @param op union operation
782  * @param ibf_key IBF key of interest
783  */
784 static void
785 send_offers_for_key (struct Operation *op,
786                      struct IBF_Key ibf_key)
787 {
788   struct SendElementClosure send_cls;
789
790   send_cls.ibf_key = ibf_key;
791   send_cls.op = op;
792   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
793                                                        (uint32_t) ibf_key.key_val,
794                                                        &send_offers_iterator,
795                                                        &send_cls);
796 }
797
798
799 /**
800  * Decode which elements are missing on each side, and
801  * send the appropriate offers and inquiries.
802  *
803  * @param op union operation
804  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
805  */
806 static int
807 decode_and_send (struct Operation *op)
808 {
809   struct IBF_Key key;
810   struct IBF_Key last_key;
811   int side;
812   unsigned int num_decoded;
813   struct InvertibleBloomFilter *diff_ibf;
814
815   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
816
817   if (GNUNET_OK !=
818       prepare_ibf (op, op->state->remote_ibf->size))
819   {
820     /* allocation failed */
821     return GNUNET_SYSERR;
822   }
823   diff_ibf = ibf_dup (op->state->local_ibf);
824   ibf_subtract (diff_ibf, op->state->remote_ibf);
825
826   ibf_destroy (op->state->remote_ibf);
827   op->state->remote_ibf = NULL;
828
829   LOG (GNUNET_ERROR_TYPE_DEBUG,
830        "decoding IBF (size=%u)\n",
831        diff_ibf->size);
832
833   num_decoded = 0;
834   last_key.key_val = 0;
835
836   while (1)
837   {
838     int res;
839     int cycle_detected = GNUNET_NO;
840
841     last_key = key;
842
843     res = ibf_decode (diff_ibf, &side, &key);
844     if (res == GNUNET_OK)
845     {
846       LOG (GNUNET_ERROR_TYPE_DEBUG,
847            "decoded ibf key %lx\n",
848            (unsigned long) key.key_val);
849       num_decoded += 1;
850       if ( (num_decoded > diff_ibf->size) ||
851            (num_decoded > 1 && last_key.key_val == key.key_val) )
852       {
853         LOG (GNUNET_ERROR_TYPE_DEBUG,
854              "detected cyclic ibf (decoded %u/%u)\n",
855              num_decoded,
856              diff_ibf->size);
857         cycle_detected = GNUNET_YES;
858       }
859     }
860     if ( (GNUNET_SYSERR == res) ||
861          (GNUNET_YES == cycle_detected) )
862     {
863       int next_order;
864       next_order = 0;
865       while (1<<next_order < diff_ibf->size)
866         next_order++;
867       next_order++;
868       if (next_order <= MAX_IBF_ORDER)
869       {
870         LOG (GNUNET_ERROR_TYPE_DEBUG,
871              "decoding failed, sending larger ibf (size %u)\n",
872              1<<next_order);
873         GNUNET_STATISTICS_update (_GSS_statistics,
874                                   "# of IBF retries",
875                                   1,
876                                   GNUNET_NO);
877         if (GNUNET_OK !=
878             send_ibf (op, next_order))
879         {
880           /* Internal error, best we can do is shut the connection */
881           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
882                       "Failed to send IBF, closing connection\n");
883           fail_union_operation (op);
884           ibf_destroy (diff_ibf);
885           return GNUNET_SYSERR;
886         }
887       }
888       else
889       {
890         GNUNET_STATISTICS_update (_GSS_statistics,
891                                   "# of failed union operations (too large)",
892                                   1,
893                                   GNUNET_NO);
894         // XXX: Send the whole set, element-by-element
895         LOG (GNUNET_ERROR_TYPE_ERROR,
896              "set union failed: reached ibf limit\n");
897         fail_union_operation (op);
898         ibf_destroy (diff_ibf);
899         return GNUNET_SYSERR;
900       }
901       break;
902     }
903     if (GNUNET_NO == res)
904     {
905       struct GNUNET_MQ_Envelope *ev;
906
907       LOG (GNUNET_ERROR_TYPE_DEBUG,
908            "transmitted all values, sending DONE\n");
909       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
910       GNUNET_MQ_send (op->mq, ev);
911       /* We now wait until we get a DONE message back
912        * and then wait for our MQ to be flushed and all our
913        * demands be delivered. */
914       break;
915     }
916     if (1 == side)
917     {
918       send_offers_for_key (op, key);
919     }
920     else if (-1 == side)
921     {
922       struct GNUNET_MQ_Envelope *ev;
923       struct GNUNET_MessageHeader *msg;
924
925       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
926        * the effort additional complexity. */
927       ev = GNUNET_MQ_msg_header_extra (msg,
928                                        sizeof (struct IBF_Key),
929                                        GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
930
931       memcpy (&msg[1],
932               &key,
933               sizeof (struct IBF_Key));
934       LOG (GNUNET_ERROR_TYPE_DEBUG,
935            "sending element inquiry for IBF key %lx\n",
936            (unsigned long) key.key_val);
937       GNUNET_MQ_send (op->mq, ev);
938     }
939     else
940     {
941       GNUNET_assert (0);
942     }
943   }
944   ibf_destroy (diff_ibf);
945   return GNUNET_OK;
946 }
947
948
949 /**
950  * Handle an IBF message from a remote peer.
951  *
952  * Reassemble the IBF from multiple pieces, and
953  * process the whole IBF once possible.
954  *
955  * @param cls the union operation
956  * @param mh the header of the message
957  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
958  *         #GNUNET_OK otherwise
959  */
960 static int
961 handle_p2p_ibf (void *cls,
962                 const struct GNUNET_MessageHeader *mh)
963 {
964   struct Operation *op = cls;
965   const struct IBFMessage *msg;
966   unsigned int buckets_in_message;
967
968   if (ntohs (mh->size) < sizeof (struct IBFMessage))
969   {
970     GNUNET_break_op (0);
971     fail_union_operation (op);
972     return GNUNET_SYSERR;
973   }
974   msg = (const struct IBFMessage *) mh;
975   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
976        (op->state->phase == PHASE_EXPECT_IBF) )
977   {
978     op->state->phase = PHASE_EXPECT_IBF_CONT;
979     GNUNET_assert (NULL == op->state->remote_ibf);
980     LOG (GNUNET_ERROR_TYPE_DEBUG,
981          "Creating new ibf of size %u\n",
982          1 << msg->order);
983     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
984     if (NULL == op->state->remote_ibf)
985     {
986       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
987                   "Failed to parse remote IBF, closing connection\n");
988       fail_union_operation (op);
989       return GNUNET_SYSERR;
990     }
991     op->state->ibf_buckets_received = 0;
992     if (0 != ntohs (msg->offset))
993     {
994       GNUNET_break_op (0);
995       fail_union_operation (op);
996       return GNUNET_SYSERR;
997     }
998   }
999   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1000   {
1001     if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
1002          (1<<msg->order != op->state->remote_ibf->size) )
1003     {
1004       GNUNET_break_op (0);
1005       fail_union_operation (op);
1006       return GNUNET_SYSERR;
1007     }
1008   }
1009   else
1010   {
1011     GNUNET_assert (0);
1012   }
1013
1014   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1015
1016   if (0 == buckets_in_message)
1017   {
1018     GNUNET_break_op (0);
1019     fail_union_operation (op);
1020     return GNUNET_SYSERR;
1021   }
1022
1023   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1024   {
1025     GNUNET_break_op (0);
1026     fail_union_operation (op);
1027     return GNUNET_SYSERR;
1028   }
1029
1030   GNUNET_assert (NULL != op->state->remote_ibf);
1031
1032   ibf_read_slice (&msg[1],
1033                   op->state->ibf_buckets_received,
1034                   buckets_in_message,
1035                   op->state->remote_ibf);
1036   op->state->ibf_buckets_received += buckets_in_message;
1037
1038   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1039   {
1040     LOG (GNUNET_ERROR_TYPE_DEBUG,
1041          "received full ibf\n");
1042     op->state->phase = PHASE_INVENTORY_ACTIVE;
1043     if (GNUNET_OK !=
1044         decode_and_send (op))
1045     {
1046       /* Internal error, best we can do is shut down */
1047       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1048                   "Failed to decode IBF, closing connection\n");
1049       return GNUNET_SYSERR;
1050     }
1051   }
1052   return GNUNET_OK;
1053 }
1054
1055
1056 /**
1057  * Send a result message to the client indicating
1058  * that there is a new element.
1059  *
1060  * @param op union operation
1061  * @param element element to send
1062  * @param status status to send with the new element
1063  */
1064 static void
1065 send_client_element (struct Operation *op,
1066                      struct GNUNET_SET_Element *element,
1067                      int status)
1068 {
1069   struct GNUNET_MQ_Envelope *ev;
1070   struct GNUNET_SET_ResultMessage *rm;
1071
1072   LOG (GNUNET_ERROR_TYPE_DEBUG,
1073        "sending element (size %u) to client\n",
1074        element->size);
1075   GNUNET_assert (0 != op->spec->client_request_id);
1076   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1077   if (NULL == ev)
1078   {
1079     GNUNET_MQ_discard (ev);
1080     GNUNET_break (0);
1081     return;
1082   }
1083   rm->result_status = htons (status);
1084   rm->request_id = htonl (op->spec->client_request_id);
1085   rm->element_type = element->element_type;
1086   memcpy (&rm[1], element->data, element->size);
1087   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1088 }
1089
1090
1091 /**
1092  * Signal to the client that the operation has finished and
1093  * destroy the operation.
1094  *
1095  * @param cls operation to destroy
1096  */
1097 static void
1098 send_done_and_destroy (void *cls)
1099 {
1100   struct Operation *op = cls;
1101   struct GNUNET_MQ_Envelope *ev;
1102   struct GNUNET_SET_ResultMessage *rm;
1103
1104   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1105   rm->request_id = htonl (op->spec->client_request_id);
1106   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1107   rm->element_type = htons (0);
1108   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1109   /* Will also call the union-specific cancel function. */
1110   _GSS_operation_destroy (op, GNUNET_YES);
1111 }
1112
1113
1114 static void
1115 maybe_finish (struct Operation *op)
1116 {
1117   unsigned int num_demanded;
1118
1119   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1120
1121   if (PHASE_FINISH_WAITING == op->state->phase)
1122   {
1123     LOG (GNUNET_ERROR_TYPE_DEBUG,
1124          "In PHASE_FINISH_WAITING, pending %u demands\n",
1125          num_demanded);
1126     if (0 == num_demanded)
1127     {
1128       struct GNUNET_MQ_Envelope *ev;
1129
1130       op->state->phase = PHASE_DONE;
1131       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1132       GNUNET_MQ_send (op->mq, ev);
1133
1134       /* We now wait until the other peer closes the channel
1135        * after it got all elements from us. */
1136     }
1137   }
1138   if (PHASE_FINISH_CLOSING == op->state->phase)
1139   {
1140     LOG (GNUNET_ERROR_TYPE_DEBUG,
1141          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1142          num_demanded);
1143     if (0 == num_demanded)
1144     {
1145       op->state->phase = PHASE_DONE;
1146       send_done_and_destroy (op);
1147     }
1148   }
1149 }
1150
1151
1152 /**
1153  * Handle an element message from a remote peer.
1154  *
1155  * @param cls the union operation
1156  * @param mh the message
1157  */
1158 static void
1159 handle_p2p_elements (void *cls,
1160                      const struct GNUNET_MessageHeader *mh)
1161 {
1162   struct Operation *op = cls;
1163   struct ElementEntry *ee;
1164   const struct GNUNET_SET_ElementMessage *emsg;
1165   uint16_t element_size;
1166
1167   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1168   {
1169     GNUNET_break_op (0);
1170     fail_union_operation (op);
1171     return;
1172   }
1173   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1174   {
1175     GNUNET_break_op (0);
1176     fail_union_operation (op);
1177     return;
1178   }
1179
1180   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1181
1182   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1183   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1184   memcpy (&ee[1], &emsg[1], element_size);
1185   ee->element.size = element_size;
1186   ee->element.data = &ee[1];
1187   ee->element.element_type = ntohs (emsg->element_type);
1188   ee->remote = GNUNET_YES;
1189   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1190
1191   if (GNUNET_NO ==
1192       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1193                                             &ee->element_hash,
1194                                             NULL))
1195   {
1196     /* We got something we didn't demand, since it's not in our map. */
1197     GNUNET_break_op (0);
1198     GNUNET_free (ee);
1199     fail_union_operation (op);
1200     return;
1201   }
1202
1203   LOG (GNUNET_ERROR_TYPE_DEBUG,
1204        "Got element (size %u, hash %s) from peer\n",
1205        (unsigned int) element_size,
1206        GNUNET_h2s (&ee->element_hash));
1207
1208   GNUNET_STATISTICS_update (_GSS_statistics,
1209                             "# received elements",
1210                             1,
1211                             GNUNET_NO);
1212
1213   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1214   {
1215     /* Got repeated element.  Should not happen since
1216      * we track demands. */
1217     GNUNET_STATISTICS_update (_GSS_statistics,
1218                               "# repeated elements",
1219                               1,
1220                               GNUNET_NO);
1221     GNUNET_free (ee);
1222   }
1223   else
1224   {
1225     LOG (GNUNET_ERROR_TYPE_DEBUG,
1226          "Registering new element from remote peer\n");
1227     op_register_element (op, ee);
1228     /* only send results immediately if the client wants it */
1229     switch (op->spec->result_mode)
1230     {
1231       case GNUNET_SET_RESULT_ADDED:
1232         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1233         break;
1234       case GNUNET_SET_RESULT_SYMMETRIC:
1235         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1236         break;
1237       default:
1238         /* Result mode not supported, should have been caught earlier. */
1239         GNUNET_break (0);
1240         break;
1241     }
1242   }
1243
1244   maybe_finish (op);
1245 }
1246
1247
1248 /**
1249  * Send offers (for GNUNET_Hash-es) in response
1250  * to inquiries (for IBF_Key-s).
1251  *
1252  * @param cls the union operation
1253  * @param mh the message
1254  */
1255 static void
1256 handle_p2p_inquiry (void *cls,
1257                     const struct GNUNET_MessageHeader *mh)
1258 {
1259   struct Operation *op = cls;
1260   const struct IBF_Key *ibf_key;
1261   unsigned int num_keys;
1262
1263   /* look up elements and send them */
1264   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1265   {
1266     GNUNET_break_op (0);
1267     fail_union_operation (op);
1268     return;
1269   }
1270   num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1271       / sizeof (struct IBF_Key);
1272   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1273       != num_keys * sizeof (struct IBF_Key))
1274   {
1275     GNUNET_break_op (0);
1276     fail_union_operation (op);
1277     return;
1278   }
1279
1280   ibf_key = (const struct IBF_Key *) &mh[1];
1281   while (0 != num_keys--)
1282   {
1283     send_offers_for_key (op, *ibf_key);
1284     ibf_key++;
1285   }
1286 }
1287
1288
1289 /**
1290  * FIXME
1291  */
1292 static void
1293 handle_p2p_demand (void *cls,
1294                    const struct GNUNET_MessageHeader *mh)
1295 {
1296   struct Operation *op = cls;
1297   struct ElementEntry *ee;
1298   struct GNUNET_SET_ElementMessage *emsg;
1299   const struct GNUNET_HashCode *hash;
1300   unsigned int num_hashes;
1301   struct GNUNET_MQ_Envelope *ev;
1302
1303   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1304     / sizeof (struct GNUNET_HashCode);
1305   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1306       != num_hashes * sizeof (struct GNUNET_HashCode))
1307   {
1308     GNUNET_break_op (0);
1309     fail_union_operation (op);
1310     return;
1311   }
1312
1313   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1314        num_hashes > 0;
1315        hash++, num_hashes--)
1316   {
1317     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1318     if (NULL == ee)
1319     {
1320       /* Demand for non-existing element. */
1321       GNUNET_break_op (0);
1322       fail_union_operation (op);
1323       return;
1324     }
1325     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1326     {
1327       /* Probably confused lazily copied sets. */
1328       GNUNET_break_op (0);
1329       fail_union_operation (op);
1330       return;
1331     }
1332     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1333     memcpy (&emsg[1], ee->element.data, ee->element.size);
1334     emsg->reserved = htons (0);
1335     emsg->element_type = htons (ee->element.element_type);
1336     LOG (GNUNET_ERROR_TYPE_DEBUG,
1337          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1338          (void *) op,
1339          (unsigned int) ee->element.size,
1340          GNUNET_h2s (&ee->element_hash));
1341     GNUNET_MQ_send (op->mq, ev);
1342
1343     switch (op->spec->result_mode)
1344     {
1345       case GNUNET_SET_RESULT_ADDED:
1346         /* Nothing to do. */
1347         break;
1348       case GNUNET_SET_RESULT_SYMMETRIC:
1349         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1350         break;
1351       default:
1352         /* Result mode not supported, should have been caught earlier. */
1353         GNUNET_break (0);
1354         break;
1355     }
1356   }
1357 }
1358
1359
1360 /**
1361  * Handle offers (of GNUNET_HashCode-s) and
1362  * respond with demands (of GNUNET_HashCode-s).
1363  *
1364  * @param cls the union operation
1365  * @param mh the message
1366  */
1367 static void
1368 handle_p2p_offer (void *cls,
1369                     const struct GNUNET_MessageHeader *mh)
1370 {
1371   struct Operation *op = cls;
1372   const struct GNUNET_HashCode *hash;
1373   unsigned int num_hashes;
1374
1375   /* look up elements and send them */
1376   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1377        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1378   {
1379     GNUNET_break_op (0);
1380     fail_union_operation (op);
1381     return;
1382   }
1383   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1384     / sizeof (struct GNUNET_HashCode);
1385   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1386       != num_hashes * sizeof (struct GNUNET_HashCode))
1387   {
1388     GNUNET_break_op (0);
1389     fail_union_operation (op);
1390     return;
1391   }
1392
1393   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1394        num_hashes > 0;
1395        hash++, num_hashes--)
1396   {
1397     struct ElementEntry *ee;
1398     struct GNUNET_MessageHeader *demands;
1399     struct GNUNET_MQ_Envelope *ev;
1400
1401     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1402                                             hash);
1403     if (NULL != ee)
1404       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1405         continue;
1406
1407     if (GNUNET_YES ==
1408         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1409                                                 hash))
1410     {
1411       LOG (GNUNET_ERROR_TYPE_DEBUG,
1412            "Skipped sending duplicate demand\n");
1413       continue;
1414     }
1415
1416     GNUNET_assert (GNUNET_OK ==
1417                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1418                                                       hash,
1419                                                       NULL,
1420                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1421
1422     LOG (GNUNET_ERROR_TYPE_DEBUG,
1423          "[OP %x] Requesting element (hash %s)\n",
1424          (void *) op, GNUNET_h2s (hash));
1425     ev = GNUNET_MQ_msg_header_extra (demands,
1426                                      sizeof (struct GNUNET_HashCode),
1427                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1428     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1429     GNUNET_MQ_send (op->mq, ev);
1430   }
1431 }
1432
1433
1434 /**
1435  * Handle a done message from a remote peer
1436  *
1437  * @param cls the union operation
1438  * @param mh the message
1439  */
1440 static void
1441 handle_p2p_done (void *cls,
1442                  const struct GNUNET_MessageHeader *mh)
1443 {
1444   struct Operation *op = cls;
1445
1446   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1447   {
1448     /* We got all requests, but still have to send our elements in response. */
1449
1450     op->state->phase = PHASE_FINISH_WAITING;
1451
1452     LOG (GNUNET_ERROR_TYPE_DEBUG,
1453          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1454     /* The active peer is done sending offers
1455      * and inquiries.  This means that all
1456      * our responses to that (demands and offers)
1457      * must be in flight (queued or in mesh).
1458      *
1459      * We should notify the active peer once
1460      * all our demands are satisfied, so that the active
1461      * peer can quit if we gave him everything.
1462      */
1463     maybe_finish (op);
1464     return;
1465   }
1466   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1467   {
1468     LOG (GNUNET_ERROR_TYPE_DEBUG,
1469          "got DONE (as active partner), waiting to finish\n");
1470     /* All demands of the other peer are satisfied,
1471      * and we processed all offers, thus we know
1472      * exactly what our demands must be.
1473      *
1474      * We'll close the channel
1475      * to the other peer once our demands are met.
1476      */
1477     op->state->phase = PHASE_FINISH_CLOSING;
1478     maybe_finish (op);
1479     return;
1480   }
1481   GNUNET_break_op (0);
1482   fail_union_operation (op);
1483 }
1484
1485
1486 /**
1487  * Initiate operation to evaluate a set union with a remote peer.
1488  *
1489  * @param op operation to perform (to be initialized)
1490  * @param opaque_context message to be transmitted to the listener
1491  *        to convince him to accept, may be NULL
1492  */
1493 static void
1494 union_evaluate (struct Operation *op,
1495                 const struct GNUNET_MessageHeader *opaque_context)
1496 {
1497   struct GNUNET_MQ_Envelope *ev;
1498   struct OperationRequestMessage *msg;
1499
1500   GNUNET_assert (NULL == op->state);
1501   op->state = GNUNET_new (struct OperationState);
1502   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1503   /* copy the current generation's strata estimator for this operation */
1504   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1505   /* we started the operation, thus we have to send the operation request */
1506   op->state->phase = PHASE_EXPECT_SE;
1507   LOG (GNUNET_ERROR_TYPE_DEBUG,
1508        "Initiating union operation evaluation\n");
1509   GNUNET_STATISTICS_update (_GSS_statistics,
1510                             "# of initiated union operations",
1511                             1,
1512                             GNUNET_NO);
1513   ev = GNUNET_MQ_msg_nested_mh (msg,
1514                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1515                                 opaque_context);
1516   if (NULL == ev)
1517   {
1518     /* the context message is too large */
1519     GNUNET_break (0);
1520     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1521     return;
1522   }
1523   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1524   msg->app_id = op->spec->app_id;
1525   GNUNET_MQ_send (op->mq,
1526                   ev);
1527
1528   if (NULL != opaque_context)
1529     LOG (GNUNET_ERROR_TYPE_DEBUG,
1530          "sent op request with context message\n");
1531   else
1532     LOG (GNUNET_ERROR_TYPE_DEBUG,
1533          "sent op request without context message\n");
1534 }
1535
1536
1537 /**
1538  * Accept an union operation request from a remote peer.
1539  * Only initializes the private operation state.
1540  *
1541  * @param op operation that will be accepted as a union operation
1542  */
1543 static void
1544 union_accept (struct Operation *op)
1545 {
1546   LOG (GNUNET_ERROR_TYPE_DEBUG,
1547        "accepting set union operation\n");
1548   GNUNET_assert (NULL == op->state);
1549
1550   GNUNET_STATISTICS_update (_GSS_statistics,
1551                             "# of accepted union operations",
1552                             1,
1553                             GNUNET_NO);
1554
1555   op->state = GNUNET_new (struct OperationState);
1556   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1557   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1558   /* kick off the operation */
1559   send_strata_estimator (op);
1560 }
1561
1562
1563 /**
1564  * Create a new set supporting the union operation
1565  *
1566  * We maintain one strata estimator per set and then manipulate it over the
1567  * lifetime of the set, as recreating a strata estimator would be expensive.
1568  *
1569  * @return the newly created set, NULL on error
1570  */
1571 static struct SetState *
1572 union_set_create (void)
1573 {
1574   struct SetState *set_state;
1575
1576   LOG (GNUNET_ERROR_TYPE_DEBUG,
1577        "union set created\n");
1578   set_state = GNUNET_new (struct SetState);
1579   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1580                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
1581   if (NULL == set_state->se)
1582   {
1583     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1584                 "Failed to allocate strata estimator\n");
1585     GNUNET_free (set_state);
1586     return NULL;
1587   }
1588   return set_state;
1589 }
1590
1591
1592 /**
1593  * Add the element from the given element message to the set.
1594  *
1595  * @param set_state state of the set want to add to
1596  * @param ee the element to add to the set
1597  */
1598 static void
1599 union_add (struct SetState *set_state, struct ElementEntry *ee)
1600 {
1601   strata_estimator_insert (set_state->se,
1602                            get_ibf_key (&ee->element_hash, 0));
1603 }
1604
1605
1606 /**
1607  * Remove the element given in the element message from the set.
1608  * Only marks the element as removed, so that older set operations can still exchange it.
1609  *
1610  * @param set_state state of the set to remove from
1611  * @param ee set element to remove
1612  */
1613 static void
1614 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1615 {
1616   strata_estimator_remove (set_state->se,
1617                            get_ibf_key (&ee->element_hash, 0));
1618 }
1619
1620
1621 /**
1622  * Destroy a set that supports the union operation.
1623  *
1624  * @param set_state the set to destroy
1625  */
1626 static void
1627 union_set_destroy (struct SetState *set_state)
1628 {
1629   if (NULL != set_state->se)
1630   {
1631     strata_estimator_destroy (set_state->se);
1632     set_state->se = NULL;
1633   }
1634   GNUNET_free (set_state);
1635 }
1636
1637
1638 /**
1639  * Dispatch messages for a union operation.
1640  *
1641  * @param op the state of the union evaluate operation
1642  * @param mh the received message
1643  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1644  *         #GNUNET_OK otherwise
1645  */
1646 int
1647 union_handle_p2p_message (struct Operation *op,
1648                           const struct GNUNET_MessageHeader *mh)
1649 {
1650   //LOG (GNUNET_ERROR_TYPE_DEBUG,
1651   //            "received p2p message (t: %u, s: %u)\n",
1652   //            ntohs (mh->type),
1653   //            ntohs (mh->size));
1654   switch (ntohs (mh->type))
1655   {
1656     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1657       return handle_p2p_ibf (op, mh);
1658     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1659       return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1660     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1661       return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
1662     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1663       handle_p2p_elements (op, mh);
1664       break;
1665     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1666       handle_p2p_inquiry (op, mh);
1667       break;
1668     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1669       handle_p2p_done (op, mh);
1670       break;
1671     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1672       handle_p2p_offer (op, mh);
1673       break;
1674     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1675       handle_p2p_demand (op, mh);
1676       break;
1677     default:
1678       /* Something wrong with cadet's message handlers? */
1679       GNUNET_assert (0);
1680   }
1681   return GNUNET_OK;
1682 }
1683
1684
1685 /**
1686  * Handler for peer-disconnects, notifies the client
1687  * about the aborted operation in case the op was not concluded.
1688  *
1689  * @param op the destroyed operation
1690  */
1691 static void
1692 union_peer_disconnect (struct Operation *op)
1693 {
1694   if (PHASE_DONE != op->state->phase)
1695   {
1696     struct GNUNET_MQ_Envelope *ev;
1697     struct GNUNET_SET_ResultMessage *msg;
1698
1699     ev = GNUNET_MQ_msg (msg,
1700                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1701     msg->request_id = htonl (op->spec->client_request_id);
1702     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1703     msg->element_type = htons (0);
1704     GNUNET_MQ_send (op->spec->set->client_mq,
1705                     ev);
1706     LOG (GNUNET_ERROR_TYPE_WARNING,
1707          "other peer disconnected prematurely, phase %u\n",
1708          op->state->phase);
1709     _GSS_operation_destroy (op,
1710                             GNUNET_YES);
1711     return;
1712   }
1713   // else: the session has already been concluded
1714   LOG (GNUNET_ERROR_TYPE_DEBUG,
1715        "other peer disconnected (finished)\n");
1716   if (GNUNET_NO == op->state->client_done_sent)
1717     send_done_and_destroy (op);
1718 }
1719
1720
1721 /**
1722  * Copy union-specific set state.
1723  *
1724  * @param set source set for copying the union state
1725  * @return a copy of the union-specific set state
1726  */
1727 static struct SetState *
1728 union_copy_state (struct Set *set)
1729 {
1730   struct SetState *new_state;
1731
1732   new_state = GNUNET_new (struct SetState);
1733   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1734   new_state->se = strata_estimator_dup (set->state->se);
1735
1736   return new_state;
1737 }
1738
1739
1740 /**
1741  * Get the table with implementing functions for
1742  * set union.
1743  *
1744  * @return the operation specific VTable
1745  */
1746 const struct SetVT *
1747 _GSS_union_vt ()
1748 {
1749   static const struct SetVT union_vt = {
1750     .create = &union_set_create,
1751     .msg_handler = &union_handle_p2p_message,
1752     .add = &union_add,
1753     .remove = &union_remove,
1754     .destroy_set = &union_set_destroy,
1755     .evaluate = &union_evaluate,
1756     .accept = &union_accept,
1757     .peer_disconnect = &union_peer_disconnect,
1758     .cancel = &union_op_cancel,
1759     .copy_state = &union_copy_state,
1760   };
1761
1762   return &union_vt;
1763 }