-dce
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 4
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62 /**
63  * Number of buckets used in the ibf per estimated
64  * difference.
65  */
66 #define IBF_ALPHA 3
67
68
69 /**
70  * Current phase we are in for a union operation.
71  */
72 enum UnionOperationPhase
73 {
74   /**
75    * We sent the request message, and expect a strata estimator
76    */
77   PHASE_EXPECT_SE,
78   /**
79    * We sent the strata estimator, and expect an IBF
80    */
81   PHASE_EXPECT_IBF,
82   /**
83    * We know what type of IBF the other peer wants to send us,
84    * and expect the remaining parts
85    */
86   PHASE_EXPECT_IBF_CONT,
87   /**
88    * We are sending request and elements,
89    * and thus only expect elements from the other peer.
90    */
91   PHASE_EXPECT_ELEMENTS,
92   /**
93    * We are expecting elements and requests, and send
94    * requested elements back to the other peer.
95    */
96   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
97   /**
98    * The protocol is over.
99    * Results may still have to be sent to the client.
100    */
101   PHASE_FINISHED
102 };
103
104
105 /**
106  * State of an evaluate operation
107  * with another peer.
108  */
109 struct OperationState
110 {
111   /**
112    * Tunnel to the remote peer.
113    */
114   struct GNUNET_MESH_Tunnel *tunnel;
115
116   /**
117    * Detail information about the set operation,
118    * including the set to use.
119    */
120   struct OperationSpecification *spec;
121
122   /**
123    * Message queue for the peer.
124    */
125   struct GNUNET_MQ_Handle *mq;
126
127   /**
128    * Number of ibf buckets received
129    */
130   unsigned int ibf_buckets_received;
131
132   /**
133    * Copy of the set's strata estimator at the time of
134    * creation of this operation
135    */
136   struct StrataEstimator *se;
137
138   /**
139    * The ibf we currently receive
140    */
141   struct InvertibleBloomFilter *remote_ibf;
142
143   /**
144    * IBF of the set's element.
145    */
146   struct InvertibleBloomFilter *local_ibf;
147
148   /**
149    * Maps IBF-Keys (specific to the current salt) to elements.
150    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
151    * Colliding IBF-Keys are linked.
152    */
153   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
154
155   /**
156    * Current state of the operation.
157    */
158   enum UnionOperationPhase phase;
159
160   /**
161    * Generation in which the operation handle
162    * was created.
163    */
164   unsigned int generation_created;
165
166   /**
167    * Set state of the set that this operation
168    * belongs to.
169    */
170   struct Set *set;
171   
172   /**
173    * Evaluate operations are held in
174    * a linked list.
175    */
176   struct OperationState *next;
177   
178    /**
179     * Evaluate operations are held in
180     * a linked list.
181     */
182   struct OperationState *prev;
183
184   /**
185    * Did we send the client that we are done?
186    */
187   int client_done_sent;
188 };
189
190
191 /**
192  * The key entry is used to associate an ibf key with
193  * an element.
194  */
195 struct KeyEntry
196 {
197   /**
198    * IBF key for the entry, derived from the current salt.
199    */
200   struct IBF_Key ibf_key;
201
202   /**
203    * The actual element associated with the key
204    */
205   struct ElementEntry *element;
206
207   /**
208    * Element that collides with this element
209    * on the ibf key
210    */
211   struct KeyEntry *next_colliding;
212 };
213
214
215 /**
216  * Used as a closure for sending elements
217  * with a specific IBF key.
218  */
219 struct SendElementClosure
220 {
221   /**
222    * The IBF key whose matching elements should be
223    * sent.
224    */
225   struct IBF_Key ibf_key;
226
227   /**
228    * Operation for which the elements
229    * should be sent.
230    */
231   struct OperationState *eo;
232 };
233
234
235 /**
236  * Extra state required for efficient set union.
237  */
238 struct SetState
239 {
240   /**
241    * The strata estimator is only generated once for
242    * each set.
243    * The IBF keys are derived from the element hashes with
244    * salt=0.
245    */
246   struct StrataEstimator *se;
247
248   /**
249    * Evaluate operations are held in
250    * a linked list.
251    */
252   struct OperationState *ops_head;
253
254   /**
255    * Evaluate operations are held in
256    * a linked list.
257    */
258   struct OperationState *ops_tail;
259 };
260
261
262 /**
263  * Iterator over hash map entries.
264  *
265  * @param cls closure
266  * @param key current key code
267  * @param value value in the hash map
268  * @return GNUNET_YES if we should continue to
269  *         iterate,
270  *         GNUNET_NO if not.
271  */
272 static int
273 destroy_key_to_element_iter (void *cls,
274                              uint32_t key,
275                              void *value)
276 {
277   struct KeyEntry *k = value;
278   
279   while (NULL != k)
280   {
281     struct KeyEntry *k_tmp = k;
282     k = k->next_colliding;
283     if (GNUNET_YES == k_tmp->element->remote)
284     {
285       GNUNET_free (k_tmp->element);
286       k_tmp->element = NULL;
287     }
288     GNUNET_free (k_tmp);
289   }
290   return GNUNET_YES;
291 }
292
293
294 /**
295  * Destroy a union operation, and free all resources
296  * associated with it.
297  *
298  * @param eo the union operation to destroy
299  */
300 static void
301 union_operation_destroy (struct OperationState *eo)
302 {
303   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
304   GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
305                                eo->set->state->ops_tail,
306                                eo);
307   if (NULL != eo->mq)
308   {
309     GNUNET_MQ_destroy (eo->mq);
310     eo->mq = NULL;
311   }
312   if (NULL != eo->tunnel)
313   {
314     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
315     eo->tunnel = NULL;
316     GNUNET_MESH_tunnel_destroy (t);
317   }
318   if (NULL != eo->remote_ibf)
319   {
320     ibf_destroy (eo->remote_ibf);
321     eo->remote_ibf = NULL;
322   }
323   if (NULL != eo->local_ibf)
324   {
325     ibf_destroy (eo->local_ibf);
326     eo->local_ibf = NULL;
327   }
328   if (NULL != eo->se)
329   {
330     strata_estimator_destroy (eo->se);
331     eo->se = NULL;
332   }
333   if (NULL != eo->key_to_element)
334   {
335     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
336     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
337     eo->key_to_element = NULL;
338   }
339   if (NULL != eo->spec)
340   {
341     if (NULL != eo->spec->context_msg)
342     {
343       GNUNET_free (eo->spec->context_msg);
344       eo->spec->context_msg = NULL;
345     }
346     GNUNET_free (eo->spec);
347     eo->spec = NULL;
348   }
349   GNUNET_free (eo);
350
351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
352
353   /* FIXME: do a garbage collection of the set generations */
354 }
355
356
357 /**
358  * Inform the client that the union operation has failed,
359  * and proceed to destroy the evaluate operation.
360  *
361  * @param eo the union operation to fail
362  */
363 static void
364 fail_union_operation (struct OperationState *eo)
365 {
366   struct GNUNET_MQ_Envelope *ev;
367   struct GNUNET_SET_ResultMessage *msg;
368
369   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371   msg->request_id = htonl (eo->spec->client_request_id);
372   msg->element_type = htons (0);
373   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
374   union_operation_destroy (eo);
375 }
376
377
378 /**
379  * Derive the IBF key from a hash code and 
380  * a salt.
381  *
382  * @param src the hash code
383  * @param salt salt to use
384  * @return the derived IBF key
385  */
386 static struct IBF_Key
387 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
388 {
389   struct IBF_Key key;
390
391   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
392                       GCRY_MD_SHA512, GCRY_MD_SHA256,
393                       src, sizeof *src,
394                       &salt, sizeof (salt),
395                       NULL, 0);
396   return key;
397 }
398
399
400 /**
401  * Send a request for the evaluate operation to a remote peer
402  *
403  * @param eo operation with the other peer
404  */
405 static void
406 send_operation_request (struct OperationState *eo)
407 {
408   struct GNUNET_MQ_Envelope *ev;
409   struct OperationRequestMessage *msg;
410
411   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
412                                 eo->spec->context_msg);
413
414   if (NULL == ev)
415   {
416     /* the context message is too large */
417     GNUNET_break (0);
418     GNUNET_SERVER_client_disconnect (eo->spec->set->client);
419     return;
420   }
421   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
422   msg->app_id = eo->spec->app_id;
423   msg->salt = htonl (eo->spec->salt);
424   GNUNET_MQ_send (eo->mq, ev);
425
426   if (NULL != eo->spec->context_msg)
427     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
428   else
429     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
430
431   if (NULL != eo->spec->context_msg)
432   {
433     GNUNET_free (eo->spec->context_msg);
434     eo->spec->context_msg = NULL;
435   }
436
437 }
438
439
440 /**
441  * Iterator to create the mapping between ibf keys
442  * and element entries.
443  *
444  * @param cls closure
445  * @param key current key code
446  * @param value value in the hash map
447  * @return GNUNET_YES if we should continue to
448  *         iterate,
449  *         GNUNET_NO if not.
450  */
451 static int
452 op_register_element_iterator (void *cls,
453                          uint32_t key,
454                          void *value)
455 {
456   struct KeyEntry *const new_k = cls;
457   struct KeyEntry *old_k = value;
458
459   GNUNET_assert (NULL != old_k);
460   do
461   {
462     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
463     {
464       new_k->next_colliding = old_k->next_colliding;
465       old_k->next_colliding = new_k;
466       return GNUNET_NO;
467     }
468     old_k = old_k->next_colliding;
469   } while (NULL != old_k);
470   return GNUNET_YES;
471 }
472
473
474 /**
475  * Insert an element into the union operation's
476  * key-to-element mapping. Takes ownership of 'ee'.
477  * Note that this does not insert the element in the set,
478  * only in the operation's key-element mapping.
479  * This is done to speed up re-tried operations, if some elements
480  * were transmitted, and then the IBF fails to decode.
481  *
482  * @param eo the union operation
483  * @param ee the element entry
484  */
485 static void
486 op_register_element (struct OperationState *eo, struct ElementEntry *ee)
487 {
488   int ret;
489   struct IBF_Key ibf_key;
490   struct KeyEntry *k;
491
492   ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
493   k = GNUNET_new (struct KeyEntry);
494   k->element = ee;
495   k->ibf_key = ibf_key;
496   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
497                                                       (uint32_t) ibf_key.key_val,
498                                                       op_register_element_iterator, k);
499
500   /* was the element inserted into a colliding bucket? */
501   if (GNUNET_SYSERR == ret)
502     return;
503
504   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
505                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
506 }
507
508
509 /**
510  * Insert a key into an ibf.
511  *
512  * @param cls the ibf
513  * @param key unused
514  * @param value the key entry to get the key from
515  */
516 static int
517 prepare_ibf_iterator (void *cls,
518                       uint32_t key,
519                       void *value)
520 {
521   struct InvertibleBloomFilter *ibf = cls;
522   struct KeyEntry *ke = value;
523
524   ibf_insert (ibf, ke->ibf_key);
525   return GNUNET_YES;
526 }
527
528
529 /**
530  * Iterator for initializing the
531  * key-to-element mapping of a union operation
532  *
533  * @param cls the union operation
534  * @param key unised
535  * @param value the element entry to insert
536  *        into the key-to-element mapping
537  */
538 static int
539 init_key_to_element_iterator (void *cls,
540                               const struct GNUNET_HashCode *key,
541                               void *value)
542 {
543   struct OperationState *eo = cls;
544   struct ElementEntry *e = value;
545
546   /* make sure that the element belongs to the set at the time
547    * of creating the operation */
548   if ( (e->generation_added > eo->generation_created) ||
549        ( (GNUNET_YES == e->removed) &&
550          (e->generation_removed < eo->generation_created)))
551     return GNUNET_YES;
552
553   e->remote = GNUNET_NO;
554
555   op_register_element (eo, e);
556   return GNUNET_YES;
557 }
558
559
560 /**
561  * Create an ibf with the operation's elements
562  * of the specified size
563  *
564  * @param eo the union operation
565  * @param size size of the ibf to create
566  */
567 static void
568 prepare_ibf (struct OperationState *eo, uint16_t size)
569 {
570   if (NULL == eo->key_to_element)
571   {
572     unsigned int len;
573     len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements);
574     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
575     GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements,
576                                            init_key_to_element_iterator, eo);
577   }
578   if (NULL != eo->local_ibf)
579     ibf_destroy (eo->local_ibf);
580   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
581   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
582                                            prepare_ibf_iterator, eo->local_ibf);
583 }
584
585
586 /**
587  * Send an ibf of appropriate size.
588  *
589  * @param eo the union operation
590  * @param ibf_order order of the ibf to send, size=2^order
591  */
592 static void
593 send_ibf (struct OperationState *eo, uint16_t ibf_order)
594 {
595   unsigned int buckets_sent = 0;
596   struct InvertibleBloomFilter *ibf;
597
598   prepare_ibf (eo, 1<<ibf_order);
599
600   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
601
602   ibf = eo->local_ibf;
603
604   while (buckets_sent < (1 << ibf_order))
605   {
606     unsigned int buckets_in_message;
607     struct GNUNET_MQ_Envelope *ev;
608     struct IBFMessage *msg;
609
610     buckets_in_message = (1 << ibf_order) - buckets_sent;
611     /* limit to maximum */
612     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
613       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
614
615     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
616                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
617     msg->reserved = 0;
618     msg->order = ibf_order;
619     msg->offset = htons (buckets_sent);
620     ibf_write_slice (ibf, buckets_sent,
621                      buckets_in_message, &msg[1]);
622     buckets_sent += buckets_in_message;
623     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
624                 buckets_in_message, buckets_sent, 1<<ibf_order);
625     GNUNET_MQ_send (eo->mq, ev);
626   }
627
628   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
629 }
630
631
632 /**
633  * Send a strata estimator to the remote peer.
634  *
635  * @param eo the union operation with the remote peer
636  */
637 static void
638 send_strata_estimator (struct OperationState *eo)
639 {
640   struct GNUNET_MQ_Envelope *ev;
641   struct GNUNET_MessageHeader *strata_msg;
642
643   ev = GNUNET_MQ_msg_header_extra (strata_msg,
644                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
645                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
646   strata_estimator_write (eo->set->state->se, &strata_msg[1]);
647   GNUNET_MQ_send (eo->mq, ev);
648   eo->phase = PHASE_EXPECT_IBF;
649   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
650 }
651
652
653 /**
654  * Compute the necessary order of an ibf
655  * from the size of the symmetric set difference.
656  *
657  * @param diff the difference
658  * @return the required size of the ibf
659  */
660 static unsigned int
661 get_order_from_difference (unsigned int diff)
662 {
663   unsigned int ibf_order;
664
665   ibf_order = 2;
666   while ((1<<ibf_order) < (IBF_ALPHA * diff))
667     ibf_order++;
668   if (ibf_order > MAX_IBF_ORDER)
669     ibf_order = MAX_IBF_ORDER;
670   return ibf_order;
671 }
672
673
674 /**
675  * Handle a strata estimator from a remote peer
676  *
677  * @param cls the union operation
678  * @param mh the message
679  */
680 static void
681 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
682 {
683   struct OperationState *eo = cls;
684   struct StrataEstimator *remote_se;
685   int diff;
686
687   if (eo->phase != PHASE_EXPECT_SE)
688   {
689     fail_union_operation (eo);
690     GNUNET_break (0);
691     return;
692   }
693   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
694                                        SE_IBF_HASH_NUM);
695   strata_estimator_read (&mh[1], remote_se);
696   GNUNET_assert (NULL != eo->se);
697   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se, calculating diff\n");
698   diff = strata_estimator_difference (remote_se, eo->se);
699   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "se diff=%d\n", diff);
700   strata_estimator_destroy (remote_se);
701   strata_estimator_destroy (eo->se);
702   eo->se = NULL;
703   send_ibf (eo, get_order_from_difference (diff));
704 }
705
706
707
708 /**
709  * Iterator to send elements to a remote peer
710  *
711  * @param cls closure with the element key and the union operation
712  * @param key ignored
713  * @param value the key entry
714  */
715 static int
716 send_element_iterator (void *cls,
717                        uint32_t key,
718                        void *value)
719 {
720   struct SendElementClosure *sec = cls;
721   struct IBF_Key ibf_key = sec->ibf_key;
722   struct OperationState *eo = sec->eo;
723   struct KeyEntry *ke = value;
724
725   if (ke->ibf_key.key_val != ibf_key.key_val)
726     return GNUNET_YES;
727   while (NULL != ke)
728   {
729     const struct GNUNET_SET_Element *const element = &ke->element->element;
730     struct GNUNET_MQ_Envelope *ev;
731     struct GNUNET_MessageHeader *mh;
732
733     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
734     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
735     if (NULL == ev)
736     {
737       /* element too large */
738       GNUNET_break (0);
739       continue;
740     }
741     memcpy (&mh[1], element->data, element->size);
742     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
743                 GNUNET_h2s (&ke->element->element_hash));
744     GNUNET_MQ_send (eo->mq, ev);
745     ke = ke->next_colliding;
746   }
747   return GNUNET_NO;
748 }
749
750 /**
751  * Send all elements that have the specified IBF key
752  * to the remote peer of the union operation
753  *
754  * @param eo union operation
755  * @param ibf_key IBF key of interest
756  */
757 static void
758 send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
759 {
760   struct SendElementClosure send_cls;
761
762   send_cls.ibf_key = ibf_key;
763   send_cls.eo = eo;
764   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
765                                                 &send_element_iterator, &send_cls);
766 }
767
768
769 /**
770  * Decode which elements are missing on each side, and
771  * send the appropriate elemens and requests
772  *
773  * @param eo union operation
774  */
775 static void
776 decode_and_send (struct OperationState *eo)
777 {
778   struct IBF_Key key;
779   struct IBF_Key last_key;
780   int side;
781   unsigned int num_decoded;
782   struct InvertibleBloomFilter *diff_ibf;
783
784   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
785
786   prepare_ibf (eo, eo->remote_ibf->size);
787   diff_ibf = ibf_dup (eo->local_ibf);
788   ibf_subtract (diff_ibf, eo->remote_ibf);
789   
790   ibf_destroy (eo->remote_ibf);
791   eo->remote_ibf = NULL;
792
793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
794
795   num_decoded = 0;
796   last_key.key_val = 0;
797
798   while (1)
799   {
800     int res;
801     int cycle_detected = GNUNET_NO;
802
803     last_key = key;
804
805     res = ibf_decode (diff_ibf, &side, &key);
806     if (res == GNUNET_OK)
807     {
808       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
809                   key.key_val);
810       num_decoded += 1;
811       if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
812       {
813         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
814                     num_decoded, diff_ibf->size);
815         cycle_detected = GNUNET_YES;
816       }
817     }
818     if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
819     {
820       int next_order;
821       next_order = 0;
822       while (1<<next_order < diff_ibf->size)
823         next_order++;
824       next_order++;
825       if (next_order <= MAX_IBF_ORDER)
826       {
827         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
828                     "decoding failed, sending larger ibf (size %u)\n",
829                     1<<next_order);
830         send_ibf (eo, next_order);
831       }
832       else
833       {
834         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
835                     "set union failed: reached ibf limit\n");
836       }
837       break;
838     }
839     if (GNUNET_NO == res)
840     {
841       struct GNUNET_MQ_Envelope *ev;
842
843       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
844       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
845       GNUNET_MQ_send (eo->mq, ev);
846       break;
847     }
848     if (1 == side)
849     {
850       send_elements_for_key (eo, key);
851     }
852     else if (-1 == side)
853     {
854       struct GNUNET_MQ_Envelope *ev;
855       struct GNUNET_MessageHeader *msg;
856
857       /* FIXME: before sending the request, check if we may just have the element */
858       /* FIXME: merge multiple requests */
859       /* FIXME: remember somewhere that we already requested the element,
860        * so that we don't request it again with the next ibf if decoding fails */
861       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
862                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
863       
864       *(struct IBF_Key *) &msg[1] = key;
865       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
866       GNUNET_MQ_send (eo->mq, ev);
867     }
868     else
869     {
870       GNUNET_assert (0);
871     }
872   }
873   ibf_destroy (diff_ibf);
874 }
875
876
877 /**
878  * Handle an IBF message from a remote peer.
879  *
880  * @param cls the union operation
881  * @param mh the header of the message
882  */
883 static void
884 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
885 {
886   struct OperationState *eo = cls;
887   struct IBFMessage *msg = (struct IBFMessage *) mh;
888   unsigned int buckets_in_message;
889
890   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
891        (eo->phase == PHASE_EXPECT_IBF) )
892   {
893     eo->phase = PHASE_EXPECT_IBF_CONT;
894     GNUNET_assert (NULL == eo->remote_ibf);
895     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
896     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
897     eo->ibf_buckets_received = 0;
898     if (0 != ntohs (msg->offset))
899     {
900       GNUNET_break (0);
901       fail_union_operation (eo);
902       return;
903     }
904   }
905   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
906   {
907     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
908          (1<<msg->order != eo->remote_ibf->size) )
909     {
910       GNUNET_break (0);
911       fail_union_operation (eo);
912       return;
913     }
914   }
915
916   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
917
918   if (0 == buckets_in_message)
919   {
920     GNUNET_break_op (0);
921     fail_union_operation (eo);
922     return;
923   }
924
925   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
926   {
927     GNUNET_break (0);
928     fail_union_operation (eo);
929     return;
930   }
931   
932   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
933   eo->ibf_buckets_received += buckets_in_message;
934
935   if (eo->ibf_buckets_received == eo->remote_ibf->size)
936   {
937     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
938     eo->phase = PHASE_EXPECT_ELEMENTS;
939     decode_and_send (eo);
940   }
941 }
942
943
944 /**
945  * Send a result message to the client indicating
946  * that there is a new element.
947  *
948  * @param eo union operation
949  * @param element element to send
950  */
951 static void
952 send_client_element (struct OperationState *eo,
953                      struct GNUNET_SET_Element *element)
954 {
955   struct GNUNET_MQ_Envelope *ev;
956   struct GNUNET_SET_ResultMessage *rm;
957
958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
959   GNUNET_assert (0 != eo->spec->client_request_id);
960   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
961   if (NULL == ev)
962   {
963     GNUNET_MQ_discard (ev);
964     GNUNET_break (0);
965     return;
966   }
967   rm->result_status = htons (GNUNET_SET_STATUS_OK);
968   rm->request_id = htonl (eo->spec->client_request_id);
969   rm->element_type = element->type;
970   memcpy (&rm[1], element->data, element->size);
971   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
972 }
973
974
975 /**
976  * Send a result message to the client indicating
977  * that the operation is over.
978  * After the result done message has been sent to the client,
979  * destroy the evaluate operation.
980  *
981  * @param eo union operation
982  */
983 static void
984 send_client_done_and_destroy (struct OperationState *eo)
985 {
986   struct GNUNET_MQ_Envelope *ev;
987   struct GNUNET_SET_ResultMessage *rm;
988
989   GNUNET_assert (GNUNET_NO == eo->client_done_sent);
990
991   eo->client_done_sent = GNUNET_YES;
992
993   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
994   rm->request_id = htonl (eo->spec->client_request_id);
995   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
996   rm->element_type = htons (0);
997   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
998
999   union_operation_destroy (eo);
1000 }
1001
1002
1003 /**
1004  * Handle an element message from a remote peer.
1005  *
1006  * @param cls the union operation
1007  * @param mh the message
1008  */
1009 static void
1010 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1011 {
1012   struct OperationState *eo = cls;
1013   struct ElementEntry *ee;
1014   uint16_t element_size;
1015
1016   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1017
1018   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1019        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1020   {
1021     fail_union_operation (eo);
1022     GNUNET_break (0);
1023     return;
1024   }
1025   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1026   ee = GNUNET_malloc (sizeof *eo + element_size);
1027   memcpy (&ee[1], &mh[1], element_size);
1028   ee->element.size = element_size;
1029   ee->element.data = &ee[1];
1030   ee->remote = GNUNET_YES;
1031   GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1032
1033   /* FIXME: see if the element has already been inserted! */
1034
1035   op_register_element (eo, ee);
1036   send_client_element (eo, &ee->element);
1037 }
1038
1039
1040 /**
1041  * Handle an element request from a remote peer.
1042  *
1043  * @param cls the union operation
1044  * @param mh the message
1045  */
1046 static void
1047 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1048 {
1049   struct OperationState *eo = cls;
1050   struct IBF_Key *ibf_key;
1051   unsigned int num_keys;
1052
1053   /* look up elements and send them */
1054   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1055   {
1056     GNUNET_break (0);
1057     fail_union_operation (eo);
1058     return;
1059   }
1060
1061   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1062
1063   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1064   {
1065     GNUNET_break (0);
1066     fail_union_operation (eo);
1067     return;
1068   }
1069
1070   ibf_key = (struct IBF_Key *) &mh[1];
1071   while (0 != num_keys--)
1072   {
1073     send_elements_for_key (eo, *ibf_key);
1074     ibf_key++;
1075   }
1076 }
1077
1078
1079 /**
1080  * Handle a 'DIE' message from the remote peer.
1081  * This indicates that the other peer is terminated.
1082  * 
1083  * @param cls the union operation
1084  * @param mh the message
1085  */
1086 static void
1087 handle_p2p_die (void *cls, const struct GNUNET_MessageHeader *mh)
1088 {
1089   struct OperationState *eo = cls;
1090
1091   send_client_done_and_destroy (eo);
1092 }
1093
1094
1095 /**
1096  * Handle a done message from a remote peer
1097  * 
1098  * @param cls the union operation
1099  * @param mh the message
1100  */
1101 static void
1102 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1103 {
1104   struct OperationState *eo = cls;
1105   struct GNUNET_MQ_Envelope *ev;
1106
1107   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1108   {
1109     /* we got all requests, but still have to send our elements as response */
1110
1111     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1112     eo->phase = PHASE_FINISHED;
1113     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1114     GNUNET_MQ_send (eo->mq, ev);
1115     return;
1116   }
1117   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1118   {
1119     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE, trying to send DIE\n");
1120     /* send the die message, which might not even be delivered,
1121      * as we could have shut down before that */
1122     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DIE);
1123     eo->phase = PHASE_FINISHED;
1124     send_client_done_and_destroy (eo);
1125     return;
1126   }
1127   GNUNET_break (0);
1128   fail_union_operation (eo);
1129 }
1130
1131
1132 /**
1133  * Evaluate a union operation with
1134  * a remote peer.
1135  *
1136  * @param spec specification of the operation the evaluate
1137  * @param tunnel tunnel already connected to the partner peer
1138  * @param tc tunnel context, passed here so all new incoming
1139  *        messages are directly going to the union operations
1140  * @return a handle to the operation
1141  */
1142 static void
1143 union_evaluate (struct OperationSpecification *spec,
1144                 struct GNUNET_MESH_Tunnel *tunnel,
1145                 struct TunnelContext *tc)
1146 {
1147   struct OperationState *eo;
1148
1149   eo = GNUNET_new (struct OperationState);
1150   tc->vt = _GSS_union_vt ();
1151   tc->op = eo;
1152   eo->se = strata_estimator_dup (spec->set->state->se);
1153   eo->set = spec->set;
1154   eo->spec = spec;
1155   eo->tunnel = tunnel;
1156   eo->mq = GNUNET_MESH_mq_create (tunnel);
1157
1158   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1159               "evaluating union operation, (app %s)\n", 
1160               GNUNET_h2s (&eo->spec->app_id));
1161
1162   /* we started the operation, thus we have to send the operation request */
1163   eo->phase = PHASE_EXPECT_SE;
1164
1165   GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1166                                eo->set->state->ops_tail,
1167                                eo);
1168
1169   send_operation_request (eo);
1170 }
1171
1172
1173 /**
1174  * Accept an union operation request from a remote peer
1175  *
1176  * @param spec all necessary information about the operation
1177  * @param tunnel open tunnel to the partner's peer
1178  * @param tc tunnel context, passed here so all new incoming
1179  *        messages are directly going to the union operations
1180  * @return operation
1181  */
1182 static void
1183 union_accept (struct OperationSpecification *spec,
1184               struct GNUNET_MESH_Tunnel *tunnel,
1185               struct TunnelContext *tc)
1186 {
1187   struct OperationState *eo;
1188
1189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1190
1191   eo = GNUNET_new (struct OperationState);
1192   tc->vt = _GSS_union_vt ();
1193   tc->op = eo;
1194   eo->set = spec->set;
1195   eo->generation_created = eo->set->current_generation++;
1196   eo->spec = spec;
1197   eo->tunnel = tunnel;
1198   eo->mq = GNUNET_MESH_mq_create (tunnel);
1199   eo->se = strata_estimator_dup (eo->set->state->se);
1200   /* transfer ownership of mq and socket from incoming to eo */
1201   GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1202                                eo->set->state->ops_tail,
1203                                eo);
1204   /* kick off the operation */
1205   send_strata_estimator (eo);
1206 }
1207
1208
1209 /**
1210  * Create a new set supporting the union operation
1211  *
1212  * @return the newly created set
1213  */
1214 static struct SetState *
1215 union_set_create (void)
1216 {
1217   struct SetState *set_state;
1218
1219   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1220   
1221   set_state = GNUNET_new (struct SetState);
1222   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1223                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1224   return set_state;
1225 }
1226
1227
1228 /**
1229  * Add the element from the given element message to the set.
1230  *
1231  * @param set_state state of the set want to add to
1232  * @param element the element to add to the set
1233  */
1234 static void
1235 union_add (struct SetState *set_state, struct ElementEntry *ee)
1236 {
1237   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1238 }
1239
1240
1241 /**
1242  * Destroy a set that supports the union operation
1243  *
1244  * @param set_state the set to destroy
1245  */
1246 static void
1247 union_set_destroy (struct SetState *set_state)
1248 {
1249   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1250   /* important to destroy operations before the rest of the set */
1251   while (NULL != set_state->ops_head)
1252     union_operation_destroy (set_state->ops_head);
1253   if (NULL != set_state->se)
1254   {
1255     strata_estimator_destroy (set_state->se);
1256     set_state->se = NULL;
1257   }
1258   GNUNET_free (set_state);
1259 }
1260
1261
1262 /**
1263  * Remove the element given in the element message from the set.
1264  * Only marks the element as removed, so that older set operations can still exchange it.
1265  *
1266  * @param set_state state of the set to remove from
1267  * @param element set element to remove
1268  */
1269 static void
1270 union_remove (struct SetState *set_state, struct ElementEntry *element)
1271 {
1272   /* FIXME: remove from strata estimator */
1273 }
1274
1275
1276 /**
1277  * Dispatch messages for a union operation.
1278  *
1279  * @param eo the state of the union evaluate operation
1280  * @param mh the received message
1281  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1282  *         GNUNET_OK otherwise
1283  */
1284 int
1285 union_handle_p2p_message (struct OperationState *eo,
1286                           const struct GNUNET_MessageHeader *mh)
1287 {
1288   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1289               ntohs (mh->type), ntohs (mh->size));
1290   switch (ntohs (mh->type))
1291   {
1292     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1293       handle_p2p_ibf (eo, mh);
1294       break;
1295     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1296       handle_p2p_strata_estimator (eo, mh);
1297       break;
1298     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1299       handle_p2p_elements (eo, mh);
1300       break;
1301     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1302       handle_p2p_element_requests (eo, mh);
1303       break;
1304     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1305       handle_p2p_done (eo, mh);
1306       break;
1307     case GNUNET_MESSAGE_TYPE_SET_P2P_DIE:
1308       handle_p2p_die (eo, mh);
1309       break;
1310     default:
1311       /* something wrong with mesh's message handlers? */
1312       GNUNET_assert (0);
1313   }
1314   return GNUNET_OK;
1315 }
1316
1317
1318 static void
1319 union_peer_disconnect (struct OperationState *op)
1320 {
1321   /* Are we already disconnected? */
1322   if (NULL == op->tunnel)
1323     return;
1324   op->tunnel = NULL;
1325   if (NULL != op->mq)
1326   {
1327     GNUNET_MQ_destroy (op->mq);
1328     op->mq = NULL;
1329   }
1330   if (PHASE_FINISHED != op->phase)
1331   {
1332     struct GNUNET_MQ_Envelope *ev;
1333     struct GNUNET_SET_ResultMessage *msg;
1334
1335     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1336     msg->request_id = htonl (op->spec->client_request_id);
1337     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1338     msg->element_type = htons (0);
1339     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1340     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1341     union_operation_destroy (op);
1342     return;
1343   }
1344   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1345   /* maybe the other peer did not get to send his 'DIE' message before he died? */
1346   if (GNUNET_NO == op->client_done_sent)
1347     send_client_done_and_destroy (op);
1348 }
1349
1350
1351 static void
1352 union_op_cancel (struct SetState *set_state, uint32_t op_id)
1353 {
1354   /* FIXME: implement */
1355 }
1356
1357
1358 const struct SetVT *
1359 _GSS_union_vt ()
1360 {
1361   static const struct SetVT union_vt = {
1362     .create = &union_set_create,
1363     .msg_handler = &union_handle_p2p_message,
1364     .add = &union_add,
1365     .remove = &union_remove,
1366     .destroy_set = &union_set_destroy,
1367     .evaluate = &union_evaluate,
1368     .accept = &union_accept,
1369     .peer_disconnect = &union_peer_disconnect,
1370     .cancel = &union_op_cancel,
1371   };
1372
1373   return &union_vt;
1374 }