missing check
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 4
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62 /**
63  * Number of buckets used in the ibf per estimated
64  * difference.
65  */
66 #define IBF_ALPHA 4
67
68
69 /**
70  * Current phase we are in for a union operation.
71  */
72 enum UnionOperationPhase
73 {
74   /**
75    * We sent the request message, and expect a strata estimator
76    */
77   PHASE_EXPECT_SE,
78   /**
79    * We sent the strata estimator, and expect an IBF
80    */
81   PHASE_EXPECT_IBF,
82   /**
83    * We know what type of IBF the other peer wants to send us,
84    * and expect the remaining parts
85    */
86   PHASE_EXPECT_IBF_CONT,
87   /**
88    * We are sending request and elements,
89    * and thus only expect elements from the other peer.
90    */
91   PHASE_EXPECT_ELEMENTS,
92   /**
93    * We are expecting elements and requests, and send
94    * requested elements back to the other peer.
95    */
96   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
97   /**
98    * The protocol is over.
99    * Results may still have to be sent to the client.
100    */
101   PHASE_FINISHED
102 };
103
104
105 /**
106  * State of an evaluate operation
107  * with another peer.
108  */
109 struct OperationState
110 {
111   /**
112    * Tunnel to the remote peer.
113    */
114   struct GNUNET_MESH_Tunnel *tunnel;
115
116   /**
117    * Detail information about the set operation,
118    * including the set to use.
119    */
120   struct OperationSpecification *spec;
121
122   /**
123    * Message queue for the peer.
124    */
125   struct GNUNET_MQ_Handle *mq;
126
127   /**
128    * Number of ibf buckets received
129    */
130   unsigned int ibf_buckets_received;
131
132   /**
133    * Copy of the set's strata estimator at the time of
134    * creation of this operation
135    */
136   struct StrataEstimator *se;
137
138   /**
139    * The ibf we currently receive
140    */
141   struct InvertibleBloomFilter *remote_ibf;
142
143   /**
144    * IBF of the set's element.
145    */
146   struct InvertibleBloomFilter *local_ibf;
147
148   /**
149    * Maps IBF-Keys (specific to the current salt) to elements.
150    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
151    * Colliding IBF-Keys are linked.
152    */
153   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
154
155   /**
156    * Current state of the operation.
157    */
158   enum UnionOperationPhase phase;
159
160   /**
161    * Generation in which the operation handle
162    * was created.
163    */
164   unsigned int generation_created;
165
166   /**
167    * Set state of the set that this operation
168    * belongs to.
169    */
170   struct Set *set;
171   
172   /**
173    * Evaluate operations are held in
174    * a linked list.
175    */
176   struct OperationState *next;
177   
178    /**
179     * Evaluate operations are held in
180     * a linked list.
181     */
182   struct OperationState *prev;
183
184   /**
185    * Did we send the client that we are done?
186    */
187   int client_done_sent;
188 };
189
190
191 /**
192  * The key entry is used to associate an ibf key with
193  * an element.
194  */
195 struct KeyEntry
196 {
197   /**
198    * IBF key for the entry, derived from the current salt.
199    */
200   struct IBF_Key ibf_key;
201
202   /**
203    * The actual element associated with the key
204    */
205   struct ElementEntry *element;
206
207   /**
208    * Element that collides with this element
209    * on the ibf key
210    */
211   struct KeyEntry *next_colliding;
212 };
213
214
215 /**
216  * Used as a closure for sending elements
217  * with a specific IBF key.
218  */
219 struct SendElementClosure
220 {
221   /**
222    * The IBF key whose matching elements should be
223    * sent.
224    */
225   struct IBF_Key ibf_key;
226
227   /**
228    * Operation for which the elements
229    * should be sent.
230    */
231   struct OperationState *eo;
232 };
233
234
235 /**
236  * Extra state required for efficient set union.
237  */
238 struct SetState
239 {
240   /**
241    * The strata estimator is only generated once for
242    * each set.
243    * The IBF keys are derived from the element hashes with
244    * salt=0.
245    */
246   struct StrataEstimator *se;
247
248   /**
249    * Evaluate operations are held in
250    * a linked list.
251    */
252   struct OperationState *ops_head;
253
254   /**
255    * Evaluate operations are held in
256    * a linked list.
257    */
258   struct OperationState *ops_tail;
259 };
260
261
262 /**
263  * Iterator over hash map entries.
264  *
265  * @param cls closure
266  * @param key current key code
267  * @param value value in the hash map
268  * @return GNUNET_YES if we should continue to
269  *         iterate,
270  *         GNUNET_NO if not.
271  */
272 static int
273 destroy_key_to_element_iter (void *cls,
274                              uint32_t key,
275                              void *value)
276 {
277   struct KeyEntry *k = value;
278   
279   while (NULL != k)
280   {
281     struct KeyEntry *k_tmp = k;
282     k = k->next_colliding;
283     if (GNUNET_YES == k_tmp->element->remote)
284     {
285       GNUNET_free (k_tmp->element);
286       k_tmp->element = NULL;
287     }
288     GNUNET_free (k_tmp);
289   }
290   return GNUNET_YES;
291 }
292
293
294 /**
295  * Destroy a union operation, and free all resources
296  * associated with it.
297  *
298  * @param eo the union operation to destroy
299  */
300 static void
301 union_operation_destroy (struct OperationState *eo)
302 {
303   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
304   GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
305                                eo->set->state->ops_tail,
306                                eo);
307   if (NULL != eo->mq)
308   {
309     GNUNET_MQ_destroy (eo->mq);
310     eo->mq = NULL;
311   }
312   if (NULL != eo->tunnel)
313   {
314     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
315     eo->tunnel = NULL;
316     GNUNET_MESH_tunnel_destroy (t);
317   }
318   if (NULL != eo->remote_ibf)
319   {
320     ibf_destroy (eo->remote_ibf);
321     eo->remote_ibf = NULL;
322   }
323   if (NULL != eo->local_ibf)
324   {
325     ibf_destroy (eo->local_ibf);
326     eo->local_ibf = NULL;
327   }
328   if (NULL != eo->se)
329   {
330     strata_estimator_destroy (eo->se);
331     eo->se = NULL;
332   }
333   if (NULL != eo->key_to_element)
334   {
335     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
336     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
337     eo->key_to_element = NULL;
338   }
339   if (NULL != eo->spec)
340   {
341     if (NULL != eo->spec->context_msg)
342     {
343       GNUNET_free (eo->spec->context_msg);
344       eo->spec->context_msg = NULL;
345     }
346     GNUNET_free (eo->spec);
347     eo->spec = NULL;
348   }
349   GNUNET_free (eo);
350
351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
352
353   /* FIXME: do a garbage collection of the set generations */
354 }
355
356
357 /**
358  * Inform the client that the union operation has failed,
359  * and proceed to destroy the evaluate operation.
360  *
361  * @param eo the union operation to fail
362  */
363 static void
364 fail_union_operation (struct OperationState *eo)
365 {
366   struct GNUNET_MQ_Envelope *ev;
367   struct GNUNET_SET_ResultMessage *msg;
368
369   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371   msg->request_id = htonl (eo->spec->client_request_id);
372   msg->element_type = htons (0);
373   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
374   union_operation_destroy (eo);
375 }
376
377
378 /**
379  * Derive the IBF key from a hash code and 
380  * a salt.
381  *
382  * @param src the hash code
383  * @param salt salt to use
384  * @return the derived IBF key
385  */
386 static struct IBF_Key
387 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
388 {
389   struct IBF_Key key;
390
391   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
392                       GCRY_MD_SHA512, GCRY_MD_SHA256,
393                       src, sizeof *src,
394                       &salt, sizeof (salt),
395                       NULL, 0);
396   return key;
397 }
398
399
400 /**
401  * Send a request for the evaluate operation to a remote peer
402  *
403  * @param eo operation with the other peer
404  */
405 static void
406 send_operation_request (struct OperationState *eo)
407 {
408   struct GNUNET_MQ_Envelope *ev;
409   struct OperationRequestMessage *msg;
410
411   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
412                                 eo->spec->context_msg);
413
414   if (NULL == ev)
415   {
416     /* the context message is too large */
417     GNUNET_break (0);
418     GNUNET_SERVER_client_disconnect (eo->spec->set->client);
419     return;
420   }
421   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
422   msg->app_id = eo->spec->app_id;
423   msg->salt = htonl (eo->spec->salt);
424   GNUNET_MQ_send (eo->mq, ev);
425
426   if (NULL != eo->spec->context_msg)
427     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
428   else
429     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
430
431   if (NULL != eo->spec->context_msg)
432   {
433     GNUNET_free (eo->spec->context_msg);
434     eo->spec->context_msg = NULL;
435   }
436
437 }
438
439
440 /**
441  * Iterator to create the mapping between ibf keys
442  * and element entries.
443  *
444  * @param cls closure
445  * @param key current key code
446  * @param value value in the hash map
447  * @return GNUNET_YES if we should continue to
448  *         iterate,
449  *         GNUNET_NO if not.
450  */
451 static int
452 op_register_element_iterator (void *cls,
453                          uint32_t key,
454                          void *value)
455 {
456   struct KeyEntry *const new_k = cls;
457   struct KeyEntry *old_k = value;
458
459   GNUNET_assert (NULL != old_k);
460   do
461   {
462     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
463     {
464       new_k->next_colliding = old_k->next_colliding;
465       old_k->next_colliding = new_k;
466       return GNUNET_NO;
467     }
468     old_k = old_k->next_colliding;
469   } while (NULL != old_k);
470   return GNUNET_YES;
471 }
472
473
474 /**
475  * Insert an element into the union operation's
476  * key-to-element mapping. Takes ownership of 'ee'.
477  * Note that this does not insert the element in the set,
478  * only in the operation's key-element mapping.
479  * This is done to speed up re-tried operations, if some elements
480  * were transmitted, and then the IBF fails to decode.
481  *
482  * @param eo the union operation
483  * @param ee the element entry
484  */
485 static void
486 op_register_element (struct OperationState *eo, struct ElementEntry *ee)
487 {
488   int ret;
489   struct IBF_Key ibf_key;
490   struct KeyEntry *k;
491
492   ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
493   k = GNUNET_new (struct KeyEntry);
494   k->element = ee;
495   k->ibf_key = ibf_key;
496   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
497                                                       (uint32_t) ibf_key.key_val,
498                                                       op_register_element_iterator, k);
499
500   /* was the element inserted into a colliding bucket? */
501   if (GNUNET_SYSERR == ret)
502     return;
503
504   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
505                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
506 }
507
508
509 /**
510  * Insert a key into an ibf.
511  *
512  * @param cls the ibf
513  * @param key unused
514  * @param value the key entry to get the key from
515  */
516 static int
517 prepare_ibf_iterator (void *cls,
518                       uint32_t key,
519                       void *value)
520 {
521   struct InvertibleBloomFilter *ibf = cls;
522   struct KeyEntry *ke = value;
523
524   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val);
525
526   ibf_insert (ibf, ke->ibf_key);
527   return GNUNET_YES;
528 }
529
530
531 /**
532  * Iterator for initializing the
533  * key-to-element mapping of a union operation
534  *
535  * @param cls the union operation
536  * @param key unised
537  * @param value the element entry to insert
538  *        into the key-to-element mapping
539  * @return GNUNET_YES to continue iterating,
540  *         GNUNET_NO to stop
541  */
542 static int
543 init_key_to_element_iterator (void *cls,
544                               const struct GNUNET_HashCode *key,
545                               void *value)
546 {
547   struct OperationState *eo = cls;
548   struct ElementEntry *e = value;
549
550   /* make sure that the element belongs to the set at the time
551    * of creating the operation */
552   if ( (e->generation_added > eo->generation_created) ||
553        ( (GNUNET_YES == e->removed) &&
554          (e->generation_removed < eo->generation_created)))
555     return GNUNET_YES;
556
557   GNUNET_assert (GNUNET_NO == e->remote);
558
559   op_register_element (eo, e);
560   return GNUNET_YES;
561 }
562
563
564 /**
565  * Create an ibf with the operation's elements
566  * of the specified size
567  *
568  * @param eo the union operation
569  * @param size size of the ibf to create
570  */
571 static void
572 prepare_ibf (struct OperationState *eo, uint16_t size)
573 {
574   if (NULL == eo->key_to_element)
575   {
576     unsigned int len;
577     len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements);
578     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
579     GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements,
580                                            init_key_to_element_iterator, eo);
581   }
582   if (NULL != eo->local_ibf)
583     ibf_destroy (eo->local_ibf);
584   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
585   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
586                                            prepare_ibf_iterator, eo->local_ibf);
587 }
588
589
590 /**
591  * Send an ibf of appropriate size.
592  *
593  * @param eo the union operation
594  * @param ibf_order order of the ibf to send, size=2^order
595  */
596 static void
597 send_ibf (struct OperationState *eo, uint16_t ibf_order)
598 {
599   unsigned int buckets_sent = 0;
600   struct InvertibleBloomFilter *ibf;
601
602   prepare_ibf (eo, 1<<ibf_order);
603
604   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
605
606   ibf = eo->local_ibf;
607
608   while (buckets_sent < (1 << ibf_order))
609   {
610     unsigned int buckets_in_message;
611     struct GNUNET_MQ_Envelope *ev;
612     struct IBFMessage *msg;
613
614     buckets_in_message = (1 << ibf_order) - buckets_sent;
615     /* limit to maximum */
616     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
617       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
618
619     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
620                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
621     msg->reserved = 0;
622     msg->order = ibf_order;
623     msg->offset = htons (buckets_sent);
624     ibf_write_slice (ibf, buckets_sent,
625                      buckets_in_message, &msg[1]);
626     buckets_sent += buckets_in_message;
627     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
628                 buckets_in_message, buckets_sent, 1<<ibf_order);
629     GNUNET_MQ_send (eo->mq, ev);
630   }
631
632   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
633 }
634
635
636 /**
637  * Send a strata estimator to the remote peer.
638  *
639  * @param eo the union operation with the remote peer
640  */
641 static void
642 send_strata_estimator (struct OperationState *eo)
643 {
644   struct GNUNET_MQ_Envelope *ev;
645   struct GNUNET_MessageHeader *strata_msg;
646
647   ev = GNUNET_MQ_msg_header_extra (strata_msg,
648                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
649                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
650   strata_estimator_write (eo->set->state->se, &strata_msg[1]);
651   GNUNET_MQ_send (eo->mq, ev);
652   eo->phase = PHASE_EXPECT_IBF;
653   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
654 }
655
656
657 /**
658  * Compute the necessary order of an ibf
659  * from the size of the symmetric set difference.
660  *
661  * @param diff the difference
662  * @return the required size of the ibf
663  */
664 static unsigned int
665 get_order_from_difference (unsigned int diff)
666 {
667   unsigned int ibf_order;
668
669   ibf_order = 2;
670   while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM)
671     ibf_order++;
672   if (ibf_order > MAX_IBF_ORDER)
673     ibf_order = MAX_IBF_ORDER;
674   return ibf_order;
675 }
676
677
678 /**
679  * Handle a strata estimator from a remote peer
680  *
681  * @param cls the union operation
682  * @param mh the message
683  */
684 static void
685 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
686 {
687   struct OperationState *eo = cls;
688   struct StrataEstimator *remote_se;
689   int diff;
690
691   if (eo->phase != PHASE_EXPECT_SE)
692   {
693     fail_union_operation (eo);
694     GNUNET_break (0);
695     return;
696   }
697   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
698                                        SE_IBF_HASH_NUM);
699   strata_estimator_read (&mh[1], remote_se);
700   GNUNET_assert (NULL != eo->se);
701   diff = strata_estimator_difference (remote_se, eo->se);
702   strata_estimator_destroy (remote_se);
703   strata_estimator_destroy (eo->se);
704   eo->se = NULL;
705   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
706               diff, 1<<get_order_from_difference (diff));
707   send_ibf (eo, get_order_from_difference (diff));
708 }
709
710
711
712 /**
713  * Iterator to send elements to a remote peer
714  *
715  * @param cls closure with the element key and the union operation
716  * @param key ignored
717  * @param value the key entry
718  */
719 static int
720 send_element_iterator (void *cls,
721                        uint32_t key,
722                        void *value)
723 {
724   struct SendElementClosure *sec = cls;
725   struct IBF_Key ibf_key = sec->ibf_key;
726   struct OperationState *eo = sec->eo;
727   struct KeyEntry *ke = value;
728
729   if (ke->ibf_key.key_val != ibf_key.key_val)
730     return GNUNET_YES;
731   while (NULL != ke)
732   {
733     const struct GNUNET_SET_Element *const element = &ke->element->element;
734     struct GNUNET_MQ_Envelope *ev;
735     struct GNUNET_MessageHeader *mh;
736
737     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
738     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
739     if (NULL == ev)
740     {
741       /* element too large */
742       GNUNET_break (0);
743       continue;
744     }
745     memcpy (&mh[1], element->data, element->size);
746     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
747                 GNUNET_h2s (&ke->element->element_hash));
748     GNUNET_MQ_send (eo->mq, ev);
749     ke = ke->next_colliding;
750   }
751   return GNUNET_NO;
752 }
753
754 /**
755  * Send all elements that have the specified IBF key
756  * to the remote peer of the union operation
757  *
758  * @param eo union operation
759  * @param ibf_key IBF key of interest
760  */
761 static void
762 send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
763 {
764   struct SendElementClosure send_cls;
765
766   send_cls.ibf_key = ibf_key;
767   send_cls.eo = eo;
768   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
769                                                 &send_element_iterator, &send_cls);
770 }
771
772
773 /**
774  * Decode which elements are missing on each side, and
775  * send the appropriate elemens and requests
776  *
777  * @param eo union operation
778  */
779 static void
780 decode_and_send (struct OperationState *eo)
781 {
782   struct IBF_Key key;
783   struct IBF_Key last_key;
784   int side;
785   unsigned int num_decoded;
786   struct InvertibleBloomFilter *diff_ibf;
787
788   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
789
790   prepare_ibf (eo, eo->remote_ibf->size);
791   diff_ibf = ibf_dup (eo->local_ibf);
792   ibf_subtract (diff_ibf, eo->remote_ibf);
793   
794   ibf_destroy (eo->remote_ibf);
795   eo->remote_ibf = NULL;
796
797   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
798
799   num_decoded = 0;
800   last_key.key_val = 0;
801
802   while (1)
803   {
804     int res;
805     int cycle_detected = GNUNET_NO;
806
807     last_key = key;
808
809     res = ibf_decode (diff_ibf, &side, &key);
810     if (res == GNUNET_OK)
811     {
812       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
813                   key.key_val);
814       num_decoded += 1;
815       if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
816       {
817         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
818                     num_decoded, diff_ibf->size);
819         cycle_detected = GNUNET_YES;
820       }
821     }
822     if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
823     {
824       int next_order;
825       next_order = 0;
826       while (1<<next_order < diff_ibf->size)
827         next_order++;
828       next_order++;
829       if (next_order <= MAX_IBF_ORDER)
830       {
831         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
832                     "decoding failed, sending larger ibf (size %u)\n",
833                     1<<next_order);
834         send_ibf (eo, next_order);
835       }
836       else
837       {
838         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
839                     "set union failed: reached ibf limit\n");
840       }
841       break;
842     }
843     if (GNUNET_NO == res)
844     {
845       struct GNUNET_MQ_Envelope *ev;
846
847       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
848       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
849       GNUNET_MQ_send (eo->mq, ev);
850       break;
851     }
852     if (1 == side)
853     {
854       send_elements_for_key (eo, key);
855     }
856     else if (-1 == side)
857     {
858       struct GNUNET_MQ_Envelope *ev;
859       struct GNUNET_MessageHeader *msg;
860
861       /* FIXME: before sending the request, check if we may just have the element */
862       /* FIXME: merge multiple requests */
863       /* FIXME: remember somewhere that we already requested the element,
864        * so that we don't request it again with the next ibf if decoding fails */
865       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
866                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
867       
868       *(struct IBF_Key *) &msg[1] = key;
869       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
870       GNUNET_MQ_send (eo->mq, ev);
871     }
872     else
873     {
874       GNUNET_assert (0);
875     }
876   }
877   ibf_destroy (diff_ibf);
878 }
879
880
881 /**
882  * Handle an IBF message from a remote peer.
883  *
884  * @param cls the union operation
885  * @param mh the header of the message
886  */
887 static void
888 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
889 {
890   struct OperationState *eo = cls;
891   struct IBFMessage *msg = (struct IBFMessage *) mh;
892   unsigned int buckets_in_message;
893
894   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
895        (eo->phase == PHASE_EXPECT_IBF) )
896   {
897     eo->phase = PHASE_EXPECT_IBF_CONT;
898     GNUNET_assert (NULL == eo->remote_ibf);
899     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
900     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
901     eo->ibf_buckets_received = 0;
902     if (0 != ntohs (msg->offset))
903     {
904       GNUNET_break (0);
905       fail_union_operation (eo);
906       return;
907     }
908   }
909   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
910   {
911     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
912          (1<<msg->order != eo->remote_ibf->size) )
913     {
914       GNUNET_break (0);
915       fail_union_operation (eo);
916       return;
917     }
918   }
919
920   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
921
922   if (0 == buckets_in_message)
923   {
924     GNUNET_break_op (0);
925     fail_union_operation (eo);
926     return;
927   }
928
929   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
930   {
931     GNUNET_break (0);
932     fail_union_operation (eo);
933     return;
934   }
935   
936   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
937   eo->ibf_buckets_received += buckets_in_message;
938
939   if (eo->ibf_buckets_received == eo->remote_ibf->size)
940   {
941     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
942     eo->phase = PHASE_EXPECT_ELEMENTS;
943     decode_and_send (eo);
944   }
945 }
946
947
948 /**
949  * Send a result message to the client indicating
950  * that there is a new element.
951  *
952  * @param eo union operation
953  * @param element element to send
954  */
955 static void
956 send_client_element (struct OperationState *eo,
957                      struct GNUNET_SET_Element *element)
958 {
959   struct GNUNET_MQ_Envelope *ev;
960   struct GNUNET_SET_ResultMessage *rm;
961
962   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
963   GNUNET_assert (0 != eo->spec->client_request_id);
964   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
965   if (NULL == ev)
966   {
967     GNUNET_MQ_discard (ev);
968     GNUNET_break (0);
969     return;
970   }
971   rm->result_status = htons (GNUNET_SET_STATUS_OK);
972   rm->request_id = htonl (eo->spec->client_request_id);
973   rm->element_type = element->type;
974   memcpy (&rm[1], element->data, element->size);
975   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
976 }
977
978
979 /**
980  * Send a result message to the client indicating
981  * that the operation is over.
982  * After the result done message has been sent to the client,
983  * destroy the evaluate operation.
984  *
985  * @param eo union operation
986  */
987 static void
988 send_client_done_and_destroy (struct OperationState *eo)
989 {
990   struct GNUNET_MQ_Envelope *ev;
991   struct GNUNET_SET_ResultMessage *rm;
992
993   GNUNET_assert (GNUNET_NO == eo->client_done_sent);
994
995   eo->client_done_sent = GNUNET_YES;
996
997   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
998   rm->request_id = htonl (eo->spec->client_request_id);
999   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1000   rm->element_type = htons (0);
1001   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1002
1003   union_operation_destroy (eo);
1004 }
1005
1006
1007 /**
1008  * Handle an element message from a remote peer.
1009  *
1010  * @param cls the union operation
1011  * @param mh the message
1012  */
1013 static void
1014 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1015 {
1016   struct OperationState *eo = cls;
1017   struct ElementEntry *ee;
1018   uint16_t element_size;
1019
1020   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1021
1022   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1023        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1024   {
1025     fail_union_operation (eo);
1026     GNUNET_break (0);
1027     return;
1028   }
1029   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1030   ee = GNUNET_malloc (sizeof *eo + element_size);
1031   memcpy (&ee[1], &mh[1], element_size);
1032   ee->element.size = element_size;
1033   ee->element.data = &ee[1];
1034   ee->remote = GNUNET_YES;
1035   GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1036
1037   /* FIXME: see if the element has already been inserted! */
1038
1039   op_register_element (eo, ee);
1040   send_client_element (eo, &ee->element);
1041 }
1042
1043
1044 /**
1045  * Handle an element request from a remote peer.
1046  *
1047  * @param cls the union operation
1048  * @param mh the message
1049  */
1050 static void
1051 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1052 {
1053   struct OperationState *eo = cls;
1054   struct IBF_Key *ibf_key;
1055   unsigned int num_keys;
1056
1057   /* look up elements and send them */
1058   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1059   {
1060     GNUNET_break (0);
1061     fail_union_operation (eo);
1062     return;
1063   }
1064
1065   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1066
1067   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1068   {
1069     GNUNET_break (0);
1070     fail_union_operation (eo);
1071     return;
1072   }
1073
1074   ibf_key = (struct IBF_Key *) &mh[1];
1075   while (0 != num_keys--)
1076   {
1077     send_elements_for_key (eo, *ibf_key);
1078     ibf_key++;
1079   }
1080 }
1081
1082
1083 /**
1084  * Handle a done message from a remote peer
1085  * 
1086  * @param cls the union operation
1087  * @param mh the message
1088  */
1089 static void
1090 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1091 {
1092   struct OperationState *eo = cls;
1093   struct GNUNET_MQ_Envelope *ev;
1094
1095   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1096   {
1097     /* we got all requests, but still have to send our elements as response */
1098
1099     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1100     eo->phase = PHASE_FINISHED;
1101     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1102     GNUNET_MQ_send (eo->mq, ev);
1103     return;
1104   }
1105   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1106   {
1107     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1108     eo->phase = PHASE_FINISHED;
1109     send_client_done_and_destroy (eo);
1110     return;
1111   }
1112   GNUNET_break (0);
1113   fail_union_operation (eo);
1114 }
1115
1116
1117 /**
1118  * Evaluate a union operation with
1119  * a remote peer.
1120  *
1121  * @param spec specification of the operation the evaluate
1122  * @param tunnel tunnel already connected to the partner peer
1123  * @param tc tunnel context, passed here so all new incoming
1124  *        messages are directly going to the union operations
1125  * @return a handle to the operation
1126  */
1127 static void
1128 union_evaluate (struct OperationSpecification *spec,
1129                 struct GNUNET_MESH_Tunnel *tunnel,
1130                 struct TunnelContext *tc)
1131 {
1132   struct OperationState *eo;
1133
1134   eo = GNUNET_new (struct OperationState);
1135   tc->vt = _GSS_union_vt ();
1136   tc->op = eo;
1137   eo->se = strata_estimator_dup (spec->set->state->se);
1138   eo->generation_created = spec->set->current_generation++;
1139   eo->set = spec->set;
1140   eo->spec = spec;
1141   eo->tunnel = tunnel;
1142   eo->mq = GNUNET_MESH_mq_create (tunnel);
1143
1144   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1145               "evaluating union operation, (app %s)\n", 
1146               GNUNET_h2s (&eo->spec->app_id));
1147
1148   /* we started the operation, thus we have to send the operation request */
1149   eo->phase = PHASE_EXPECT_SE;
1150
1151   GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1152                                eo->set->state->ops_tail,
1153                                eo);
1154
1155   send_operation_request (eo);
1156 }
1157
1158
1159 /**
1160  * Accept an union operation request from a remote peer
1161  *
1162  * @param spec all necessary information about the operation
1163  * @param tunnel open tunnel to the partner's peer
1164  * @param tc tunnel context, passed here so all new incoming
1165  *        messages are directly going to the union operations
1166  * @return operation
1167  */
1168 static void
1169 union_accept (struct OperationSpecification *spec,
1170               struct GNUNET_MESH_Tunnel *tunnel,
1171               struct TunnelContext *tc)
1172 {
1173   struct OperationState *eo;
1174
1175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1176
1177   eo = GNUNET_new (struct OperationState);
1178   tc->vt = _GSS_union_vt ();
1179   tc->op = eo;
1180   eo->set = spec->set;
1181   eo->generation_created = eo->set->current_generation++;
1182   eo->spec = spec;
1183   eo->tunnel = tunnel;
1184   eo->mq = GNUNET_MESH_mq_create (tunnel);
1185   eo->se = strata_estimator_dup (eo->set->state->se);
1186   /* transfer ownership of mq and socket from incoming to eo */
1187   GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1188                                eo->set->state->ops_tail,
1189                                eo);
1190   /* kick off the operation */
1191   send_strata_estimator (eo);
1192 }
1193
1194
1195 /**
1196  * Create a new set supporting the union operation
1197  *
1198  * @return the newly created set
1199  */
1200 static struct SetState *
1201 union_set_create (void)
1202 {
1203   struct SetState *set_state;
1204
1205   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1206   
1207   set_state = GNUNET_new (struct SetState);
1208   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1209                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1210   return set_state;
1211 }
1212
1213
1214 /**
1215  * Add the element from the given element message to the set.
1216  *
1217  * @param set_state state of the set want to add to
1218  * @param element the element to add to the set
1219  */
1220 static void
1221 union_add (struct SetState *set_state, struct ElementEntry *ee)
1222 {
1223   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1224 }
1225
1226
1227 /**
1228  * Destroy a set that supports the union operation
1229  *
1230  * @param set_state the set to destroy
1231  */
1232 static void
1233 union_set_destroy (struct SetState *set_state)
1234 {
1235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1236   /* important to destroy operations before the rest of the set */
1237   while (NULL != set_state->ops_head)
1238     union_operation_destroy (set_state->ops_head);
1239   if (NULL != set_state->se)
1240   {
1241     strata_estimator_destroy (set_state->se);
1242     set_state->se = NULL;
1243   }
1244   GNUNET_free (set_state);
1245 }
1246
1247
1248 /**
1249  * Remove the element given in the element message from the set.
1250  * Only marks the element as removed, so that older set operations can still exchange it.
1251  *
1252  * @param set_state state of the set to remove from
1253  * @param element set element to remove
1254  */
1255 static void
1256 union_remove (struct SetState *set_state, struct ElementEntry *element)
1257 {
1258   /* FIXME: remove from strata estimator */
1259 }
1260
1261
1262 /**
1263  * Dispatch messages for a union operation.
1264  *
1265  * @param eo the state of the union evaluate operation
1266  * @param mh the received message
1267  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1268  *         GNUNET_OK otherwise
1269  */
1270 int
1271 union_handle_p2p_message (struct OperationState *eo,
1272                           const struct GNUNET_MessageHeader *mh)
1273 {
1274   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1275               ntohs (mh->type), ntohs (mh->size));
1276   switch (ntohs (mh->type))
1277   {
1278     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1279       handle_p2p_ibf (eo, mh);
1280       break;
1281     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1282       handle_p2p_strata_estimator (eo, mh);
1283       break;
1284     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1285       handle_p2p_elements (eo, mh);
1286       break;
1287     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1288       handle_p2p_element_requests (eo, mh);
1289       break;
1290     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1291       handle_p2p_done (eo, mh);
1292       break;
1293     default:
1294       /* something wrong with mesh's message handlers? */
1295       GNUNET_assert (0);
1296   }
1297   return GNUNET_OK;
1298 }
1299
1300
1301 static void
1302 union_peer_disconnect (struct OperationState *op)
1303 {
1304   /* Are we already disconnected? */
1305   if (NULL == op->tunnel)
1306     return;
1307   op->tunnel = NULL;
1308   if (NULL != op->mq)
1309   {
1310     GNUNET_MQ_destroy (op->mq);
1311     op->mq = NULL;
1312   }
1313   if (PHASE_FINISHED != op->phase)
1314   {
1315     struct GNUNET_MQ_Envelope *ev;
1316     struct GNUNET_SET_ResultMessage *msg;
1317
1318     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1319     msg->request_id = htonl (op->spec->client_request_id);
1320     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1321     msg->element_type = htons (0);
1322     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1323     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1324     union_operation_destroy (op);
1325     return;
1326   }
1327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1328   if (GNUNET_NO == op->client_done_sent)
1329     send_client_done_and_destroy (op);
1330 }
1331
1332
1333 static void
1334 union_op_cancel (struct SetState *set_state, uint32_t op_id)
1335 {
1336   /* FIXME: implement */
1337 }
1338
1339
1340 const struct SetVT *
1341 _GSS_union_vt ()
1342 {
1343   static const struct SetVT union_vt = {
1344     .create = &union_set_create,
1345     .msg_handler = &union_handle_p2p_message,
1346     .add = &union_add,
1347     .remove = &union_remove,
1348     .destroy_set = &union_set_destroy,
1349     .evaluate = &union_evaluate,
1350     .accept = &union_accept,
1351     .peer_disconnect = &union_peer_disconnect,
1352     .cancel = &union_op_cancel,
1353   };
1354
1355   return &union_vt;
1356 }