- extract context msg correctly
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62 /**
63  * Number of buckets used in the ibf per estimated
64  * difference.
65  */
66 #define IBF_ALPHA 1
67
68
69 /**
70  * Current phase we are in for a union operation.
71  */
72 enum UnionOperationPhase
73 {
74   /**
75    * We sent the request message, and expect a strata estimator
76    */
77   PHASE_EXPECT_SE,
78   /**
79    * We sent the strata estimator, and expect an IBF
80    */
81   PHASE_EXPECT_IBF,
82   /**
83    * We know what type of IBF the other peer wants to send us,
84    * and expect the remaining parts
85    */
86   PHASE_EXPECT_IBF_CONT,
87   /**
88    * We are sending request and elements,
89    * and thus only expect elements from the other peer.
90    */
91   PHASE_EXPECT_ELEMENTS,
92   /**
93    * We are expecting elements and requests, and send
94    * requested elements back to the other peer.
95    */
96   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
97   /**
98    * The protocol is over.
99    * Results may still have to be sent to the client.
100    */
101   PHASE_FINISHED
102 };
103
104
105 /**
106  * State of an evaluate operation
107  * with another peer.
108  */
109 struct OperationState
110 {
111   /**
112    * Tunnel to the remote peer.
113    */
114   struct GNUNET_MESH_Tunnel *tunnel;
115
116   /**
117    * Detail information about the set operation,
118    * including the set to use.
119    */
120   struct OperationSpecification *spec;
121
122   /**
123    * Message queue for the peer.
124    */
125   struct GNUNET_MQ_Handle *mq;
126
127   /**
128    * Number of ibf buckets received
129    */
130   unsigned int ibf_buckets_received;
131
132   /**
133    * Copy of the set's strata estimator at the time of
134    * creation of this operation
135    */
136   struct StrataEstimator *se;
137
138   /**
139    * The ibf we currently receive
140    */
141   struct InvertibleBloomFilter *remote_ibf;
142
143   /**
144    * IBF of the set's element.
145    */
146   struct InvertibleBloomFilter *local_ibf;
147
148   /**
149    * Maps IBF-Keys (specific to the current salt) to elements.
150    */
151   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
152
153   /**
154    * Current state of the operation.
155    */
156   enum UnionOperationPhase phase;
157
158   /**
159    * Generation in which the operation handle
160    * was created.
161    */
162   unsigned int generation_created;
163
164   /**
165    * Set state of the set that this operation
166    * belongs to.
167    */
168   struct SetState *set_state;
169   
170   /**
171    * Evaluate operations are held in
172    * a linked list.
173    */
174   struct OperationState *next;
175   
176    /**
177     * Evaluate operations are held in
178     * a linked list.
179     */
180   struct OperationState *prev;
181 };
182
183
184 /**
185  * Information about an element element in the set.
186  * All elements are stored in a hash-table
187  * from their hash-code to their 'struct Element',
188  * so that the remove and add operations are reasonably
189  * fast.
190  */
191 struct ElementEntry
192 {
193   /**
194    * The actual element. The data for the element
195    * should be allocated at the end of this struct.
196    */
197   struct GNUNET_SET_Element element;
198
199   /**
200    * Hash of the element.
201    * Will be used to derive the different IBF keys
202    * for different salts.
203    */
204   struct GNUNET_HashCode element_hash;
205
206   /**
207    * Generation the element was added by the client.
208    * Operations of earlier generations will not consider the element.
209    */
210   unsigned int generation_added;
211
212   /**
213    * GNUNET_YES if the element has been removed in some generation.
214    */
215   int removed;
216
217   /**
218    * Generation the element was removed by the client. 
219    * Operations of later generations will not consider the element.
220    * Only valid if is_removed is GNUNET_YES.
221    */
222   unsigned int generation_removed;
223
224   /**
225    * GNUNET_YES if the element is a remote element, and does not belong
226    * to the operation's set.
227    */
228   int remote;
229 };
230
231
232 /**
233  * The key entry is used to associate an ibf key with
234  * an element.
235  */
236 struct KeyEntry
237 {
238   /**
239    * IBF key for the entry, derived from the current salt.
240    */
241   struct IBF_Key ibf_key;
242
243   /**
244    * The actual element associated with the key
245    */
246   struct ElementEntry *element;
247
248   /**
249    * Element that collides with this element
250    * on the ibf key
251    */
252   struct KeyEntry *next_colliding;
253 };
254
255
256 /**
257  * Used as a closure for sending elements
258  * with a specific IBF key.
259  */
260 struct SendElementClosure
261 {
262   /**
263    * The IBF key whose matching elements should be
264    * sent.
265    */
266   struct IBF_Key ibf_key;
267
268   /**
269    * Operation for which the elements
270    * should be sent.
271    */
272   struct OperationState *eo;
273 };
274
275
276 /**
277  * Extra state required for efficient set union.
278  */
279 struct SetState
280 {
281   /**
282    * The strata estimator is only generated once for
283    * each set.
284    * The IBF keys are derived from the element hashes with
285    * salt=0.
286    */
287   struct StrataEstimator *se;
288
289   /**
290    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
291    */
292   struct GNUNET_CONTAINER_MultiHashMap *elements;
293
294   /**
295    * Evaluate operations are held in
296    * a linked list.
297    */
298   struct OperationState *ops_head;
299
300   /**
301    * Evaluate operations are held in
302    * a linked list.
303    */
304   struct OperationState *ops_tail;
305
306   /**
307    * Current generation, that is, number of
308    * previously executed operations on this set
309    */
310   unsigned int current_generation;
311 };
312
313
314
315 /**
316  * Iterator over hash map entries.
317  *
318  * @param cls closure
319  * @param key current key code
320  * @param value value in the hash map
321  * @return GNUNET_YES if we should continue to
322  *         iterate,
323  *         GNUNET_NO if not.
324  */
325 static int
326 destroy_elements_iterator (void *cls,
327                            const struct GNUNET_HashCode * key,
328                            void *value)
329 {
330   struct ElementEntry *ee = value;
331
332   GNUNET_free (ee);
333   return GNUNET_YES;
334 }
335
336
337 /**
338  * Iterator over hash map entries.
339  *
340  * @param cls closure
341  * @param key current key code
342  * @param value value in the hash map
343  * @return GNUNET_YES if we should continue to
344  *         iterate,
345  *         GNUNET_NO if not.
346  */
347 static int
348 destroy_key_to_element_iter (void *cls,
349                              uint32_t key,
350                              void *value)
351 {
352   struct KeyEntry *k = value;
353   
354   while (NULL != k)
355   {
356     struct KeyEntry *k_tmp = k;
357     k = k->next_colliding;
358     if (GNUNET_YES == k_tmp->element->remote)
359     {
360       GNUNET_free (k_tmp->element);
361       k_tmp->element = NULL;
362     }
363     GNUNET_free (k_tmp);
364   }
365   return GNUNET_YES;
366 }
367
368
369 /**
370  * Destroy a union operation, and free all resources
371  * associated with it.
372  *
373  * @param eo the union operation to destroy
374  */
375 static void
376 union_operation_destroy (struct OperationState *eo)
377 {
378   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
379   GNUNET_CONTAINER_DLL_remove (eo->set_state->ops_head,
380                                eo->set_state->ops_tail,
381                                eo);
382   if (NULL != eo->mq)
383   {
384     GNUNET_MQ_destroy (eo->mq);
385     eo->mq = NULL;
386   }
387   if (NULL != eo->tunnel)
388   {
389     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
390     eo->tunnel = NULL;
391     GNUNET_MESH_tunnel_destroy (t);
392   }
393   if (NULL != eo->remote_ibf)
394   {
395     ibf_destroy (eo->remote_ibf);
396     eo->remote_ibf = NULL;
397   }
398   if (NULL != eo->local_ibf)
399   {
400     ibf_destroy (eo->local_ibf);
401     eo->local_ibf = NULL;
402   }
403   if (NULL != eo->se)
404   {
405     strata_estimator_destroy (eo->se);
406     eo->se = NULL;
407   }
408   if (NULL != eo->key_to_element)
409   {
410     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
411     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
412     eo->key_to_element = NULL;
413   }
414   if (NULL != eo->spec)
415   {
416     if (NULL != eo->spec->context_msg)
417     {
418       GNUNET_free (eo->spec->context_msg);
419       eo->spec->context_msg = NULL;
420     }
421     GNUNET_free (eo->spec);
422     eo->spec = NULL;
423   }
424   GNUNET_free (eo);
425
426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
427
428   /* FIXME: do a garbage collection of the set generations */
429 }
430
431
432 /**
433  * Inform the client that the union operation has failed,
434  * and proceed to destroy the evaluate operation.
435  *
436  * @param eo the union operation to fail
437  */
438 static void
439 fail_union_operation (struct OperationState *eo)
440 {
441   struct GNUNET_MQ_Envelope *ev;
442   struct GNUNET_SET_ResultMessage *msg;
443
444   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
445   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
446   msg->request_id = htonl (eo->spec->client_request_id);
447   msg->element_type = htons (0);
448   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
449   union_operation_destroy (eo);
450 }
451
452
453 /**
454  * Derive the IBF key from a hash code and 
455  * a salt.
456  *
457  * @param src the hash code
458  * @param salt salt to use
459  * @return the derived IBF key
460  */
461 static struct IBF_Key
462 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
463 {
464   struct IBF_Key key;
465
466   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
467                       GCRY_MD_SHA512, GCRY_MD_SHA256,
468                       src, sizeof *src,
469                       &salt, sizeof (salt),
470                       NULL, 0);
471   return key;
472 }
473
474
475 /**
476  * Send a request for the evaluate operation to a remote peer
477  *
478  * @param eo operation with the other peer
479  */
480 static void
481 send_operation_request (struct OperationState *eo)
482 {
483   struct GNUNET_MQ_Envelope *ev;
484   struct OperationRequestMessage *msg;
485
486   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
487                                 eo->spec->context_msg);
488
489   if (NULL == ev)
490   {
491     /* the context message is too large */
492     GNUNET_break (0);
493     GNUNET_SERVER_client_disconnect (eo->spec->set->client);
494     return;
495   }
496   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
497   msg->app_id = eo->spec->app_id;
498   msg->salt = htonl (eo->spec->salt);
499   GNUNET_MQ_send (eo->mq, ev);
500
501   if (NULL != eo->spec->context_msg)
502     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
503   else
504     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
505
506   if (NULL != eo->spec->context_msg)
507   {
508     GNUNET_free (eo->spec->context_msg);
509     eo->spec->context_msg = NULL;
510   }
511
512 }
513
514
515 /**
516  * Iterator to create the mapping between ibf keys
517  * and element entries.
518  *
519  * @param cls closure
520  * @param key current key code
521  * @param value value in the hash map
522  * @return GNUNET_YES if we should continue to
523  *         iterate,
524  *         GNUNET_NO if not.
525  */
526 static int
527 insert_element_iterator (void *cls,
528                          uint32_t key,
529                          void *value)
530 {
531   struct KeyEntry *const new_k = cls;
532   struct KeyEntry *old_k = value;
533
534   GNUNET_assert (NULL != old_k);
535   do
536   {
537     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
538     {
539       new_k->next_colliding = old_k->next_colliding;
540       old_k->next_colliding = new_k;
541       return GNUNET_NO;
542     }
543     old_k = old_k->next_colliding;
544   } while (NULL != old_k);
545   return GNUNET_YES;
546 }
547
548
549 /**
550  * Insert an element into the union operation's
551  * key-to-element mapping. Takes ownership of 'ee'.
552  *
553  * @param eo the union operation
554  * @param ee the element entry
555  */
556 static void
557 insert_element (struct OperationState *eo, struct ElementEntry *ee)
558 {
559   int ret;
560   struct IBF_Key ibf_key;
561   struct KeyEntry *k;
562
563   ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
564   k = GNUNET_new (struct KeyEntry);
565   k->element = ee;
566   k->ibf_key = ibf_key;
567   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
568                                                       (uint32_t) ibf_key.key_val,
569                                                       insert_element_iterator, k);
570
571   /* was the element inserted into a colliding bucket? */
572   if (GNUNET_SYSERR == ret)
573     return;
574
575   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
576                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
577 }
578
579
580 /**
581  * Insert a key into an ibf.
582  *
583  * @param cls the ibf
584  * @param key unused
585  * @param value the key entry to get the key from
586  */
587 static int
588 prepare_ibf_iterator (void *cls,
589                       uint32_t key,
590                       void *value)
591 {
592   struct InvertibleBloomFilter *ibf = cls;
593   struct KeyEntry *ke = value;
594
595   ibf_insert (ibf, ke->ibf_key);
596   return GNUNET_YES;
597 }
598
599
600 /**
601  * Iterator for initializing the
602  * key-to-element mapping of a union operation
603  *
604  * @param cls the union operation
605  * @param key unised
606  * @param value the element entry to insert
607  *        into the key-to-element mapping
608  */
609 static int
610 init_key_to_element_iterator (void *cls,
611                               const struct GNUNET_HashCode *key,
612                               void *value)
613 {
614   struct OperationState *eo = cls;
615   struct ElementEntry *e = value;
616
617   /* make sure that the element belongs to the set at the time
618    * of creating the operation */
619   if ( (e->generation_added > eo->generation_created) ||
620        ( (GNUNET_YES == e->removed) &&
621          (e->generation_removed < eo->generation_created)))
622     return GNUNET_YES;
623
624   e->remote = GNUNET_NO;
625
626   insert_element (eo, e);
627   return GNUNET_YES;
628 }
629
630
631 /**
632  * Create an ibf with the operation's elements
633  * of the specified size
634  *
635  * @param eo the union operation
636  * @param size size of the ibf to create
637  */
638 static void
639 prepare_ibf (struct OperationState *eo, uint16_t size)
640 {
641   if (NULL == eo->key_to_element)
642   {
643     unsigned int len;
644     len = GNUNET_CONTAINER_multihashmap_size (eo->set_state->elements);
645     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
646     GNUNET_CONTAINER_multihashmap_iterate (eo->set_state->elements,
647                                            init_key_to_element_iterator, eo);
648   }
649   if (NULL != eo->local_ibf)
650     ibf_destroy (eo->local_ibf);
651   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
652   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
653                                            prepare_ibf_iterator, eo->local_ibf);
654 }
655
656
657 /**
658  * Send an ibf of appropriate size.
659  *
660  * @param eo the union operation
661  * @param ibf_order order of the ibf to send, size=2^order
662  */
663 static void
664 send_ibf (struct OperationState *eo, uint16_t ibf_order)
665 {
666   unsigned int buckets_sent = 0;
667   struct InvertibleBloomFilter *ibf;
668
669   prepare_ibf (eo, 1<<ibf_order);
670
671   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
672
673   ibf = eo->local_ibf;
674
675   while (buckets_sent < (1 << ibf_order))
676   {
677     unsigned int buckets_in_message;
678     struct GNUNET_MQ_Envelope *ev;
679     struct IBFMessage *msg;
680
681     buckets_in_message = (1 << ibf_order) - buckets_sent;
682     /* limit to maximum */
683     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
684       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
685
686     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
687                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
688     msg->reserved = 0;
689     msg->order = ibf_order;
690     msg->offset = htons (buckets_sent);
691     ibf_write_slice (ibf, buckets_sent,
692                      buckets_in_message, &msg[1]);
693     buckets_sent += buckets_in_message;
694     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
695                 buckets_in_message, buckets_sent, 1<<ibf_order);
696     GNUNET_MQ_send (eo->mq, ev);
697   }
698
699   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
700 }
701
702
703 /**
704  * Send a strata estimator to the remote peer.
705  *
706  * @param eo the union operation with the remote peer
707  */
708 static void
709 send_strata_estimator (struct OperationState *eo)
710 {
711   struct GNUNET_MQ_Envelope *ev;
712   struct GNUNET_MessageHeader *strata_msg;
713
714   ev = GNUNET_MQ_msg_header_extra (strata_msg,
715                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
716                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
717   strata_estimator_write (eo->set_state->se, &strata_msg[1]);
718   GNUNET_MQ_send (eo->mq, ev);
719   eo->phase = PHASE_EXPECT_IBF;
720   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
721 }
722
723
724 /**
725  * Compute the necessary order of an ibf
726  * from the size of the symmetric set difference.
727  *
728  * @param diff the difference
729  * @return the required size of the ibf
730  */
731 static unsigned int
732 get_order_from_difference (unsigned int diff)
733 {
734   unsigned int ibf_order;
735
736   ibf_order = 2;
737   while ((1<<ibf_order) < (IBF_ALPHA * diff))
738     ibf_order++;
739   if (ibf_order > MAX_IBF_ORDER)
740     ibf_order = MAX_IBF_ORDER;
741   return ibf_order;
742 }
743
744
745 /**
746  * Handle a strata estimator from a remote peer
747  *
748  * @param cls the union operation
749  * @param mh the message
750  */
751 static void
752 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
753 {
754   struct OperationState *eo = cls;
755   struct StrataEstimator *remote_se;
756   int diff;
757
758   if (eo->phase != PHASE_EXPECT_SE)
759   {
760     fail_union_operation (eo);
761     GNUNET_break (0);
762     return;
763   }
764   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
765                                        SE_IBF_HASH_NUM);
766   strata_estimator_read (&mh[1], remote_se);
767   GNUNET_assert (NULL != eo->se);
768   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se, calculating diff\n");
769   diff = strata_estimator_difference (remote_se, eo->se);
770   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "se diff=%d\n", diff);
771   strata_estimator_destroy (remote_se);
772   strata_estimator_destroy (eo->se);
773   eo->se = NULL;
774   send_ibf (eo, get_order_from_difference (diff));
775 }
776
777
778
779 /**
780  * Iterator to send elements to a remote peer
781  *
782  * @param cls closure with the element key and the union operation
783  * @param key ignored
784  * @param value the key entry
785  */
786 static int
787 send_element_iterator (void *cls,
788                        uint32_t key,
789                        void *value)
790 {
791   struct SendElementClosure *sec = cls;
792   struct IBF_Key ibf_key = sec->ibf_key;
793   struct OperationState *eo = sec->eo;
794   struct KeyEntry *ke = value;
795
796   if (ke->ibf_key.key_val != ibf_key.key_val)
797     return GNUNET_YES;
798   while (NULL != ke)
799   {
800     const struct GNUNET_SET_Element *const element = &ke->element->element;
801     struct GNUNET_MQ_Envelope *ev;
802     struct GNUNET_MessageHeader *mh;
803
804     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
805     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
806     if (NULL == ev)
807     {
808       /* element too large */
809       GNUNET_break (0);
810       continue;
811     }
812     memcpy (&mh[1], element->data, element->size);
813     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
814                 GNUNET_h2s (&ke->element->element_hash));
815     GNUNET_MQ_send (eo->mq, ev);
816     ke = ke->next_colliding;
817   }
818   return GNUNET_NO;
819 }
820
821 /**
822  * Send all elements that have the specified IBF key
823  * to the remote peer of the union operation
824  *
825  * @param eo union operation
826  * @param ibf_key IBF key of interest
827  */
828 static void
829 send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
830 {
831   struct SendElementClosure send_cls;
832
833   send_cls.ibf_key = ibf_key;
834   send_cls.eo = eo;
835   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
836                                                 &send_element_iterator, &send_cls);
837 }
838
839
840 /**
841  * Decode which elements are missing on each side, and
842  * send the appropriate elemens and requests
843  *
844  * @param eo union operation
845  */
846 static void
847 decode_and_send (struct OperationState *eo)
848 {
849   struct IBF_Key key;
850   struct IBF_Key last_key;
851   int side;
852   unsigned int num_decoded;
853   struct InvertibleBloomFilter *diff_ibf;
854
855   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
856
857   prepare_ibf (eo, eo->remote_ibf->size);
858   diff_ibf = ibf_dup (eo->local_ibf);
859   ibf_subtract (diff_ibf, eo->remote_ibf);
860   
861   ibf_destroy (eo->remote_ibf);
862   eo->remote_ibf = NULL;
863
864   num_decoded = 0;
865
866   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
867
868   while (1)
869   {
870     int res;
871
872     if (num_decoded > 0)
873       last_key = key;
874
875     res = ibf_decode (diff_ibf, &side, &key);
876     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
877                 key.key_val);
878     num_decoded += 1;
879     if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
880       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
881                   num_decoded, diff_ibf->size);
882     if ((GNUNET_SYSERR == res) || (num_decoded > diff_ibf->size) ||
883         (num_decoded > 1 && last_key.key_val == key.key_val))
884     {
885       int next_order;
886       next_order = 0;
887       while (1<<next_order < diff_ibf->size)
888         next_order++;
889       next_order++;
890       if (next_order <= MAX_IBF_ORDER)
891       {
892         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
893                     "decoding failed, sending larger ibf (size %u)\n",
894                     1<<next_order);
895         send_ibf (eo, next_order);
896       }
897       else
898       {
899         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
900                     "set union failed: reached ibf limit\n");
901       }
902       break;
903     }
904     if (GNUNET_NO == res)
905     {
906       struct GNUNET_MQ_Envelope *ev;
907
908       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
909       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
910       GNUNET_MQ_send (eo->mq, ev);
911       break;
912     }
913     if (1 == side)
914     {
915       send_elements_for_key (eo, key);
916     }
917     else if (-1 == side)
918     {
919       struct GNUNET_MQ_Envelope *ev;
920       struct GNUNET_MessageHeader *msg;
921
922       /* FIXME: before sending the request, check if we may just have the element */
923       /* FIXME: merge multiple requests */
924       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
925                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
926       
927       *(struct IBF_Key *) &msg[1] = key;
928       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
929       GNUNET_MQ_send (eo->mq, ev);
930     }
931     else
932     {
933       GNUNET_assert (0);
934     }
935   }
936   ibf_destroy (diff_ibf);
937 }
938
939
940 /**
941  * Handle an IBF message from a remote peer.
942  *
943  * @param cls the union operation
944  * @param mh the header of the message
945  */
946 static void
947 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
948 {
949   struct OperationState *eo = cls;
950   struct IBFMessage *msg = (struct IBFMessage *) mh;
951   unsigned int buckets_in_message;
952
953   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
954        (eo->phase == PHASE_EXPECT_IBF) )
955   {
956     eo->phase = PHASE_EXPECT_IBF_CONT;
957     GNUNET_assert (NULL == eo->remote_ibf);
958     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
959     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
960     eo->ibf_buckets_received = 0;
961     if (0 != ntohs (msg->offset))
962     {
963       GNUNET_break (0);
964       fail_union_operation (eo);
965       return;
966     }
967   }
968   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
969   {
970     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
971          (1<<msg->order != eo->remote_ibf->size) )
972     {
973       GNUNET_break (0);
974       fail_union_operation (eo);
975       return;
976     }
977   }
978
979   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
980
981   if (0 == buckets_in_message)
982   {
983     GNUNET_break_op (0);
984     fail_union_operation (eo);
985     return;
986   }
987
988   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
989   {
990     GNUNET_break (0);
991     fail_union_operation (eo);
992     return;
993   }
994   
995   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
996   eo->ibf_buckets_received += buckets_in_message;
997
998   if (eo->ibf_buckets_received == eo->remote_ibf->size)
999   {
1000     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
1001     eo->phase = PHASE_EXPECT_ELEMENTS;
1002     decode_and_send (eo);
1003   }
1004 }
1005
1006
1007 /**
1008  * Send a result message to the client indicating
1009  * that there is a new element.
1010  *
1011  * @param eo union operation
1012  * @param element element to send
1013  */
1014 static void
1015 send_client_element (struct OperationState *eo,
1016                      struct GNUNET_SET_Element *element)
1017 {
1018   struct GNUNET_MQ_Envelope *ev;
1019   struct GNUNET_SET_ResultMessage *rm;
1020
1021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
1022   GNUNET_assert (0 != eo->spec->client_request_id);
1023   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1024   if (NULL == ev)
1025   {
1026     GNUNET_MQ_discard (ev);
1027     GNUNET_break (0);
1028     return;
1029   }
1030   rm->result_status = htons (GNUNET_SET_STATUS_OK);
1031   rm->request_id = htonl (eo->spec->client_request_id);
1032   rm->element_type = element->type;
1033   memcpy (&rm[1], element->data, element->size);
1034   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1035 }
1036
1037
1038 /**
1039  * Send a result message to the client indicating
1040  * that the operation is over.
1041  * After the result done message has been sent to the client,
1042  * destroy the evaluate operation.
1043  *
1044  * @param eo union operation
1045  */
1046 static void
1047 send_client_done_and_destroy (struct OperationState *eo)
1048 {
1049   struct GNUNET_MQ_Envelope *ev;
1050   struct GNUNET_SET_ResultMessage *rm;
1051
1052   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1053   rm->request_id = htonl (eo->spec->client_request_id);
1054   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1055   rm->element_type = htons (0);
1056   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1057
1058 }
1059
1060
1061 /**
1062  * Handle an element message from a remote peer.
1063  *
1064  * @param cls the union operation
1065  * @param mh the message
1066  */
1067 static void
1068 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1069 {
1070   struct OperationState *eo = cls;
1071   struct ElementEntry *ee;
1072   uint16_t element_size;
1073
1074   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1075
1076   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1077        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1078   {
1079     fail_union_operation (eo);
1080     GNUNET_break (0);
1081     return;
1082   }
1083   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1084   ee = GNUNET_malloc (sizeof *eo + element_size);
1085   memcpy (&ee[1], &mh[1], element_size);
1086   ee->element.size = element_size;
1087   ee->element.data = &ee[1];
1088   ee->remote = GNUNET_YES;
1089   GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1090
1091   insert_element (eo, ee);
1092   send_client_element (eo, &ee->element);
1093 }
1094
1095
1096 /**
1097  * Handle an element request from a remote peer.
1098  *
1099  * @param cls the union operation
1100  * @param mh the message
1101  */
1102 static void
1103 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1104 {
1105   struct OperationState *eo = cls;
1106   struct IBF_Key *ibf_key;
1107   unsigned int num_keys;
1108
1109   /* look up elements and send them */
1110   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1111   {
1112     GNUNET_break (0);
1113     fail_union_operation (eo);
1114     return;
1115   }
1116
1117   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1118
1119   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1120   {
1121     GNUNET_break (0);
1122     fail_union_operation (eo);
1123     return;
1124   }
1125
1126   ibf_key = (struct IBF_Key *) &mh[1];
1127   while (0 != num_keys--)
1128   {
1129     send_elements_for_key (eo, *ibf_key);
1130     ibf_key++;
1131   }
1132 }
1133
1134
1135 /**
1136  * Callback used for notifications
1137  *
1138  * @param cls closure
1139  */
1140 static void
1141 peer_done_sent_cb (void *cls)
1142 {
1143   struct OperationState *eo = cls;
1144
1145   send_client_done_and_destroy (eo);
1146 }
1147
1148
1149 /**
1150  * Handle a done message from a remote peer
1151  * 
1152  * @param cls the union operation
1153  * @param mh the message
1154  */
1155 static void
1156 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1157 {
1158   struct OperationState *eo = cls;
1159
1160   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1161   {
1162     /* we got all requests, but still have to send our elements as response */
1163     struct GNUNET_MQ_Envelope *ev;
1164
1165     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1166     eo->phase = PHASE_FINISHED;
1167     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1168     GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
1169     GNUNET_MQ_send (eo->mq, ev);
1170     return;
1171   }
1172   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1173   {
1174     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1175     eo->phase = PHASE_FINISHED;
1176     send_client_done_and_destroy (eo);
1177     return;
1178   }
1179   GNUNET_break (0);
1180   fail_union_operation (eo);
1181 }
1182
1183
1184 /**
1185  * Evaluate a union operation with
1186  * a remote peer.
1187  *
1188  * @param spec specification of the operation the evaluate
1189  * @param tunnel tunnel already connected to the partner peer
1190  * @param tc tunnel context, passed here so all new incoming
1191  *        messages are directly going to the union operations
1192  * @return a handle to the operation
1193  */
1194 static void
1195 union_evaluate (struct OperationSpecification *spec,
1196                 struct GNUNET_MESH_Tunnel *tunnel,
1197                 struct TunnelContext *tc)
1198 {
1199   struct OperationState *eo;
1200
1201   eo = GNUNET_new (struct OperationState);
1202   tc->vt = _GSS_union_vt ();
1203   tc->op = eo;
1204   eo->se = strata_estimator_dup (spec->set->state->se);
1205   eo->set_state = spec->set->state;
1206   eo->spec = spec;
1207   eo->tunnel = tunnel;
1208   eo->mq = GNUNET_MESH_mq_create (tunnel);
1209
1210   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1211               "evaluating union operation, (app %s)\n", 
1212               GNUNET_h2s (&eo->spec->app_id));
1213
1214   /* we started the operation, thus we have to send the operation request */
1215   eo->phase = PHASE_EXPECT_SE;
1216
1217   GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
1218                                eo->set_state->ops_tail,
1219                                eo);
1220
1221   send_operation_request (eo);
1222 }
1223
1224
1225 /**
1226  * Accept an union operation request from a remote peer
1227  *
1228  * @param spec all necessary information about the operation
1229  * @param tunnel open tunnel to the partner's peer
1230  * @param tc tunnel context, passed here so all new incoming
1231  *        messages are directly going to the union operations
1232  * @return operation
1233  */
1234 static void
1235 union_accept (struct OperationSpecification *spec,
1236               struct GNUNET_MESH_Tunnel *tunnel,
1237               struct TunnelContext *tc)
1238 {
1239   struct OperationState *eo;
1240
1241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1242
1243   eo = GNUNET_new (struct OperationState);
1244   tc->vt = _GSS_union_vt ();
1245   tc->op = eo;
1246   eo->set_state = spec->set->state;
1247   eo->generation_created = eo->set_state->current_generation++;
1248   eo->spec = spec;
1249   eo->tunnel = tunnel;
1250   eo->mq = GNUNET_MESH_mq_create (tunnel);
1251   eo->se = strata_estimator_dup (eo->set_state->se);
1252   /* transfer ownership of mq and socket from incoming to eo */
1253   GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
1254                                eo->set_state->ops_tail,
1255                                eo);
1256   /* kick off the operation */
1257   send_strata_estimator (eo);
1258 }
1259
1260
1261 /**
1262  * Create a new set supporting the union operation
1263  *
1264  * @return the newly created set
1265  */
1266 static struct SetState *
1267 union_set_create (void)
1268 {
1269   struct SetState *set_state;
1270
1271   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1272   
1273   set_state = GNUNET_new (struct SetState);
1274   /* keys of the hash map are stored in the element entrys, thus we do not
1275    * want the hash map to copy them */
1276   set_state->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1277   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1278                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1279   return set_state;
1280 }
1281
1282
1283 /**
1284  * Add the element from the given element message to the set.
1285  *
1286  * @param set_state state of the set want to add to
1287  * @param element the element to add to the set
1288  */
1289 static void
1290 union_add (struct SetState *set_state, const struct GNUNET_SET_Element *element)
1291 {
1292   struct ElementEntry *ee;
1293   struct ElementEntry *ee_dup;
1294
1295   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "adding union element of size %u\n", element->size);
1296
1297   ee = GNUNET_malloc (element->size + sizeof *ee);
1298   ee->element.size = element->size;
1299   memcpy (&ee[1], element->data, element->size);
1300   ee->element.data = &ee[1];
1301   ee->generation_added = set_state->current_generation;
1302   ee->remote = GNUNET_NO;
1303   GNUNET_CRYPTO_hash (ee->element.data, element->size, &ee->element_hash);
1304   ee_dup = GNUNET_CONTAINER_multihashmap_get (set_state->elements,
1305                                               &ee->element_hash);
1306   if (NULL != ee_dup)
1307   {
1308     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1309     GNUNET_free (ee);
1310     return;
1311   }
1312   GNUNET_CONTAINER_multihashmap_put (set_state->elements, &ee->element_hash, ee,
1313                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1314   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1315 }
1316
1317
1318 /**
1319  * Destroy a set that supports the union operation
1320  *
1321  * @param set_state the set to destroy
1322  */
1323 static void
1324 union_set_destroy (struct SetState *set_state)
1325 {
1326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1327   /* important to destroy operations before the rest of the set */
1328   while (NULL != set_state->ops_head)
1329     union_operation_destroy (set_state->ops_head);
1330   if (NULL != set_state->se)
1331   {
1332     strata_estimator_destroy (set_state->se);
1333     set_state->se = NULL;
1334   }
1335   if (NULL != set_state->elements)
1336   {
1337     GNUNET_CONTAINER_multihashmap_iterate (set_state->elements,
1338                                            destroy_elements_iterator, NULL);
1339     GNUNET_CONTAINER_multihashmap_destroy (set_state->elements);
1340     set_state->elements = NULL;
1341   }
1342
1343   GNUNET_free (set_state);
1344 }
1345
1346 /**
1347  * Remove the element given in the element message from the set.
1348  * Only marks the element as removed, so that older set operations can still exchange it.
1349  *
1350  * @param set_state state of the set to remove from
1351  * @param element set element to remove
1352  */
1353 static void
1354 union_remove (struct SetState *set_state, const struct GNUNET_SET_Element *element)
1355 {
1356   struct GNUNET_HashCode hash;
1357   struct ElementEntry *ee;
1358
1359   GNUNET_CRYPTO_hash (element->data, element->size, &hash);
1360   ee = GNUNET_CONTAINER_multihashmap_get (set_state->elements, &hash);
1361   if (NULL == ee)
1362   {
1363     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1364     return;
1365   }
1366   if (GNUNET_YES == ee->removed)
1367   {
1368     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1369     return;
1370   }
1371   ee->removed = GNUNET_YES;
1372   ee->generation_removed = set_state->current_generation;
1373 }
1374
1375
1376 /**
1377  * Dispatch messages for a union operation.
1378  *
1379  * @param eo the state of the union evaluate operation
1380  * @param mh the received message
1381  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1382  *         GNUNET_OK otherwise
1383  */
1384 int
1385 union_handle_p2p_message (struct OperationState *eo,
1386                           const struct GNUNET_MessageHeader *mh)
1387 {
1388   switch (ntohs (mh->type))
1389   {
1390     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1391       handle_p2p_ibf (eo, mh);
1392       break;
1393     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1394       handle_p2p_strata_estimator (eo, mh);
1395       break;
1396     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1397       handle_p2p_elements (eo, mh);
1398       break;
1399     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1400       handle_p2p_element_requests (eo, mh);
1401       break;
1402     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1403       handle_p2p_done (eo, mh);
1404       break;
1405     default:
1406       /* something wrong with mesh's message handlers? */
1407       GNUNET_assert (0);
1408   }
1409   return GNUNET_OK;
1410 }
1411
1412
1413 static void
1414 union_peer_disconnect (struct OperationState *op)
1415 {
1416   /* Are we already disconnected? */
1417   if (NULL == op->tunnel)
1418     return;
1419   op->tunnel = NULL;
1420   if (NULL != op->mq)
1421   {
1422     GNUNET_MQ_destroy (op->mq);
1423     op->mq = NULL;
1424   }
1425   if (PHASE_FINISHED != op->phase)
1426   {
1427     struct GNUNET_MQ_Envelope *ev;
1428     struct GNUNET_SET_ResultMessage *msg;
1429     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1430     msg->request_id = htonl (op->spec->client_request_id);
1431     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1432     msg->element_type = htons (0);
1433     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1434     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1435   }
1436   else
1437   {
1438     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1439   }
1440   union_operation_destroy (op);
1441 }
1442
1443
1444 static void
1445 union_op_cancel (struct SetState *set_state, uint32_t op_id)
1446 {
1447   /* FIXME: implement */
1448 }
1449
1450
1451 /**
1452  * Iterator over hash map entries.
1453  *
1454  * @param cls closure
1455  * @param key current key code
1456  * @param value value in the hash map
1457  * @return GNUNET_YES if we should continue to
1458  *         iterate,
1459  *         GNUNET_NO if not.
1460  */
1461 static int
1462 send_iter_element_iter (void *cls,
1463                         const struct GNUNET_HashCode *key,
1464                         void *value)
1465 {
1466   struct ElementEntry *ee = value;
1467   struct Set *set = cls;
1468   struct GNUNET_SET_IterResponseMessage *m;
1469   struct GNUNET_MQ_Envelope *ev;
1470
1471   ev = GNUNET_MQ_msg_extra (m, ee->element.size,
1472                             GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1473
1474   m->element_type = ee->element.type;
1475   memcpy (&m[1], ee->element.data, ee->element.size);
1476   GNUNET_MQ_send (set->client_mq, ev);
1477
1478   return GNUNET_YES;
1479 }
1480
1481
1482 /**
1483  * Send all elements of the union set to the client.
1484  *
1485  * @param set set to iterate over
1486  */
1487 static void
1488 union_iterate (struct Set *set)
1489 {
1490   struct GNUNET_MQ_Envelope *ev;
1491
1492   GNUNET_CONTAINER_multihashmap_iterate (set->state->elements, send_iter_element_iter, set);
1493   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
1494   GNUNET_MQ_send (set->client_mq, ev);
1495 }
1496
1497
1498 const struct SetVT *
1499 _GSS_union_vt ()
1500 {
1501   static const struct SetVT union_vt = {
1502     .create = &union_set_create,
1503     .msg_handler = &union_handle_p2p_message,
1504     .add = &union_add,
1505     .remove = &union_remove,
1506     .destroy_set = &union_set_destroy,
1507     .evaluate = &union_evaluate,
1508     .accept = &union_accept,
1509     .peer_disconnect = &union_peer_disconnect,
1510     .cancel = &union_op_cancel,
1511     .iterate = &union_iterate
1512   };
1513
1514   return &union_vt;
1515 }