session time out for http client/server
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "ibf.h"
30 #include "strata_estimator.h"
31 #include "set_protocol.h"
32 #include <gcrypt.h>
33
34
35 /**
36  * Number of IBFs in a strata estimator.
37  */
38 #define SE_STRATA_COUNT 32
39 /**
40  * Size of the IBFs in the strata estimator.
41  */
42 #define SE_IBF_SIZE 80
43 /**
44  * hash num parameter for the difference digests and strata estimators
45  */
46 #define SE_IBF_HASH_NUM 4
47
48 /**
49  * Number of buckets that can be transmitted in one message.
50  */
51 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
52
53 /**
54  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
55  * Choose this value so that computing the IBF is still cheaper
56  * than transmitting all values.
57  */
58 #define MAX_IBF_ORDER (16)
59
60 /**
61  * Number of buckets used in the ibf per estimated
62  * difference.
63  */
64 #define IBF_ALPHA 4
65
66
67 /**
68  * Current phase we are in for a union operation.
69  */
70 enum UnionOperationPhase
71 {
72   /**
73    * We sent the request message, and expect a strata estimator
74    */
75   PHASE_EXPECT_SE,
76   /**
77    * We sent the strata estimator, and expect an IBF. This phase is entered once
78    * upon initialization and later via PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
79    *
80    * After receiving the complete IBF, we enter PHASE_EXPECT_ELEMENTS
81    */
82   PHASE_EXPECT_IBF,
83   /**
84    * Continuation for multi part IBFs.
85    */
86   PHASE_EXPECT_IBF_CONT,
87   /**
88    * We are sending request and elements,
89    * and thus only expect elements from the other peer.
90    *
91    * We are currently decoding an IBF until it can no longer be decoded,
92    * we currently send requests and expect elements
93    * The remote peer is in PHASE_EXPECT_ELEMENTS_AND_REQUESTS
94    */
95   PHASE_EXPECT_ELEMENTS,
96   /**
97    * We are expecting elements and requests, and send
98    * requested elements back to the other peer.
99    *
100    * We are in this phase if we have SENT an IBF for the remote peer to decode.
101    * We expect requests, send elements or could receive an new IBF, which takes
102    * us via PHASE_EXPECT_IBF to phase PHASE_EXPECT_ELEMENTS
103    *
104    * The remote peer is thus in:
105    * PHASE_EXPECT_ELEMENTS
106    */
107   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
108   /**
109    * The protocol is over.
110    * Results may still have to be sent to the client.
111    */
112   PHASE_FINISHED
113 };
114
115
116 /**
117  * State of an evaluate operation
118  * with another peer.
119  */
120 struct OperationState
121 {
122   /**
123    * Number of ibf buckets received
124    */
125   unsigned int ibf_buckets_received;
126
127   /**
128    * Copy of the set's strata estimator at the time of
129    * creation of this operation
130    */
131   struct StrataEstimator *se;
132
133   /**
134    * The ibf we currently receive
135    */
136   struct InvertibleBloomFilter *remote_ibf;
137
138   /**
139    * IBF of the set's element.
140    */
141   struct InvertibleBloomFilter *local_ibf;
142
143   /**
144    * Maps IBF-Keys (specific to the current salt) to elements.
145    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
146    * Colliding IBF-Keys are linked.
147    */
148   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
149
150   /**
151    * Iterator for sending elements on the key to element mapping to the client.
152    */
153   struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
154
155   /**
156    * Current state of the operation.
157    */
158   enum UnionOperationPhase phase;
159
160   /**
161    * Did we send the client that we are done?
162    */
163   int client_done_sent;
164 };
165
166
167 /**
168  * The key entry is used to associate an ibf key with
169  * an element.
170  */
171 struct KeyEntry
172 {
173   /**
174    * IBF key for the entry, derived from the current salt.
175    */
176   struct IBF_Key ibf_key;
177
178   /**
179    * The actual element associated with the key.
180    */
181   struct ElementEntry *element;
182
183   /**
184    * Element that collides with this element
185    * on the ibf key. All colliding entries must have the same ibf key.
186    */
187   struct KeyEntry *next_colliding;
188 };
189
190
191 /**
192  * Used as a closure for sending elements
193  * with a specific IBF key.
194  */
195 struct SendElementClosure
196 {
197   /**
198    * The IBF key whose matching elements should be
199    * sent.
200    */
201   struct IBF_Key ibf_key;
202
203   /**
204    * Operation for which the elements
205    * should be sent.
206    */
207   struct Operation *op;
208 };
209
210
211 /**
212  * Extra state required for efficient set union.
213  */
214 struct SetState
215 {
216   /**
217    * The strata estimator is only generated once for
218    * each set.
219    * The IBF keys are derived from the element hashes with
220    * salt=0.
221    */
222   struct StrataEstimator *se;
223 };
224
225
226 /**
227  * Iterator over hash map entries.
228  *
229  * @param cls closure
230  * @param key current key code
231  * @param value value in the hash map
232  * @return #GNUNET_YES if we should continue to
233  *         iterate,
234  *         #GNUNET_NO if not.
235  */
236 static int
237 destroy_key_to_element_iter (void *cls,
238                              uint32_t key,
239                              void *value)
240 {
241   struct KeyEntry *k = value;
242   /* destroy the linked list of colliding ibf key entries */
243   while (NULL != k)
244   {
245     struct KeyEntry *k_tmp = k;
246     k = k->next_colliding;
247     if (GNUNET_YES == k_tmp->element->remote)
248     {
249       GNUNET_free (k_tmp->element);
250       k_tmp->element = NULL;
251     }
252     GNUNET_free (k_tmp);
253   }
254   return GNUNET_YES;
255 }
256
257
258 /**
259  * Destroy the union operation.  Only things specific to the union operation are destroyed.
260  *
261  * @param op union operation to destroy
262  */
263 static void
264 union_op_cancel (struct Operation *op)
265 {
266   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
267   /* check if the op was canceled twice */
268   GNUNET_assert (NULL != op->state);
269   if (NULL != op->state->remote_ibf)
270   {
271     ibf_destroy (op->state->remote_ibf);
272     op->state->remote_ibf = NULL;
273   }
274   if (NULL != op->state->local_ibf)
275   {
276     ibf_destroy (op->state->local_ibf);
277     op->state->local_ibf = NULL;
278   }
279   if (NULL != op->state->se)
280   {
281     strata_estimator_destroy (op->state->se);
282     op->state->se = NULL;
283   }
284   if (NULL != op->state->key_to_element)
285   {
286     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL);
287     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
288     op->state->key_to_element = NULL;
289   }
290   GNUNET_free (op->state);
291   op->state = NULL;
292   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
293 }
294
295
296 /**
297  * Inform the client that the union operation has failed,
298  * and proceed to destroy the evaluate operation.
299  *
300  * @param op the union operation to fail
301  */
302 static void
303 fail_union_operation (struct Operation *op)
304 {
305   struct GNUNET_MQ_Envelope *ev;
306   struct GNUNET_SET_ResultMessage *msg;
307
308   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n");
309
310   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
311   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
312   msg->request_id = htonl (op->spec->client_request_id);
313   msg->element_type = htons (0);
314   GNUNET_MQ_send (op->spec->set->client_mq, ev);
315   _GSS_operation_destroy (op);
316 }
317
318
319 /**
320  * Derive the IBF key from a hash code and
321  * a salt.
322  *
323  * @param src the hash code
324  * @param salt salt to use
325  * @return the derived IBF key
326  */
327 static struct IBF_Key
328 get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt)
329 {
330   struct IBF_Key key;
331
332   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
333                       GCRY_MD_SHA512, GCRY_MD_SHA256,
334                       src, sizeof *src,
335                       &salt, sizeof (salt),
336                       NULL, 0);
337   return key;
338 }
339
340
341 /**
342  * Send a request for the evaluate operation to a remote peer
343  *
344  * @param op operation with the other peer
345  */
346 static void
347 send_operation_request (struct Operation *op)
348 {
349   struct GNUNET_MQ_Envelope *ev;
350   struct OperationRequestMessage *msg;
351
352   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
353                                 op->spec->context_msg);
354
355   if (NULL == ev)
356   {
357     /* the context message is too large */
358     GNUNET_break (0);
359     GNUNET_SERVER_client_disconnect (op->spec->set->client);
360     return;
361   }
362   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
363   msg->app_id = op->spec->app_id;
364   msg->salt = htonl (op->spec->salt);
365   GNUNET_MQ_send (op->mq, ev);
366
367   if (NULL != op->spec->context_msg)
368     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
369   else
370     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
371
372   if (NULL != op->spec->context_msg)
373   {
374     GNUNET_free (op->spec->context_msg);
375     op->spec->context_msg = NULL;
376   }
377 }
378
379
380 /**
381  * Iterator to create the mapping between ibf keys
382  * and element entries.
383  *
384  * @param cls closure
385  * @param key current key code
386  * @param value value in the hash map
387  * @return #GNUNET_YES if we should continue to
388  *         iterate,
389  *         #GNUNET_NO if not.
390  */
391 static int
392 op_register_element_iterator (void *cls,
393                               uint32_t key,
394                               void *value)
395 {
396   struct KeyEntry *const new_k = cls;
397   struct KeyEntry *old_k = value;
398
399   GNUNET_assert (NULL != old_k);
400   /* check if our ibf key collides with the ibf key in the existing entry */
401   if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
402   {
403     /* insert the the new key in the collision chain */
404     new_k->next_colliding = old_k->next_colliding;
405     old_k->next_colliding = new_k;
406     /* signal to the caller that we were able to insert into a colliding bucket */
407     return GNUNET_NO;
408   }
409   return GNUNET_YES;
410 }
411
412
413 /**
414  * Iterator to create the mapping between ibf keys
415  * and element entries.
416  *
417  * @param cls closure
418  * @param key current key code
419  * @param value value in the hash map
420  * @return #GNUNET_YES if we should continue to
421  *         iterate,
422  *         #GNUNET_NO if not.
423  */
424 static int
425 op_has_element_iterator (void *cls,
426                          uint32_t key,
427                          void *value)
428 {
429   struct GNUNET_HashCode *element_hash = cls;
430   struct KeyEntry *k = value;
431
432   GNUNET_assert (NULL != k);
433   while (NULL != k)
434   {
435     if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash))
436       return GNUNET_NO;
437     k = k->next_colliding;
438   }
439   return GNUNET_YES;
440 }
441
442
443 /**
444  * Determine whether the given element is already in the operation's element
445  * set.
446  *
447  * @param op operation that should be tested for 'element_hash'
448  * @param element_hash hash of the element to look for
449  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
450  */
451 static int
452 op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash)
453 {
454   int ret;
455   struct IBF_Key ibf_key;
456
457   ibf_key = get_ibf_key (element_hash, op->spec->salt);
458   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
459                                                       (uint32_t) ibf_key.key_val,
460                                                       op_has_element_iterator, (void *) element_hash);
461
462   /* was the iteration aborted because we found the element? */
463   if (GNUNET_SYSERR == ret)
464     return GNUNET_YES;
465   return GNUNET_NO;
466 }
467
468
469 /**
470  * Insert an element into the union operation's
471  * key-to-element mapping. Takes ownership of 'ee'.
472  * Note that this does not insert the element in the set,
473  * only in the operation's key-element mapping.
474  * This is done to speed up re-tried operations, if some elements
475  * were transmitted, and then the IBF fails to decode.
476  *
477  * @param op the union operation
478  * @param ee the element entry
479  */
480 static void
481 op_register_element (struct Operation *op, struct ElementEntry *ee)
482 {
483   int ret;
484   struct IBF_Key ibf_key;
485   struct KeyEntry *k;
486
487   ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
488   k = GNUNET_new (struct KeyEntry);
489   k->element = ee;
490   k->ibf_key = ibf_key;
491   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
492                                                       (uint32_t) ibf_key.key_val,
493                                                       op_register_element_iterator, k);
494
495   /* was the element inserted into a colliding bucket? */
496   if (GNUNET_SYSERR == ret)
497     return;
498
499   GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k,
500                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
501 }
502
503
504 /**
505  * Insert a key into an ibf.
506  *
507  * @param cls the ibf
508  * @param key unused
509  * @param value the key entry to get the key from
510  */
511 static int
512 prepare_ibf_iterator (void *cls,
513                       uint32_t key,
514                       void *value)
515 {
516   struct InvertibleBloomFilter *ibf = cls;
517   struct KeyEntry *ke = value;
518
519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val);
520
521   ibf_insert (ibf, ke->ibf_key);
522   return GNUNET_YES;
523 }
524
525
526 /**
527  * Iterator for initializing the
528  * key-to-element mapping of a union operation
529  *
530  * @param cls the union operation
531  * @param key unised
532  * @param value the element entry to insert
533  *        into the key-to-element mapping
534  * @return GNUNET_YES to continue iterating,
535  *         GNUNET_NO to stop
536  */
537 static int
538 init_key_to_element_iterator (void *cls,
539                               const struct GNUNET_HashCode *key,
540                               void *value)
541 {
542   struct Operation *op = cls;
543   struct ElementEntry *e = value;
544
545   /* make sure that the element belongs to the set at the time
546    * of creating the operation */
547   if ( (e->generation_added > op->generation_created) ||
548        ( (GNUNET_YES == e->removed) &&
549          (e->generation_removed < op->generation_created)))
550     return GNUNET_YES;
551
552   GNUNET_assert (GNUNET_NO == e->remote);
553
554   op_register_element (op, e);
555   return GNUNET_YES;
556 }
557
558
559 /**
560  * Create an ibf with the operation's elements
561  * of the specified size
562  *
563  * @param op the union operation
564  * @param size size of the ibf to create
565  */
566 static void
567 prepare_ibf (struct Operation *op, uint16_t size)
568 {
569   if (NULL == op->state->key_to_element)
570   {
571     unsigned int len;
572     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
573     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
574     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
575                                            init_key_to_element_iterator, op);
576   }
577   if (NULL != op->state->local_ibf)
578     ibf_destroy (op->state->local_ibf);
579   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
580   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
581                                            prepare_ibf_iterator, op->state->local_ibf);
582 }
583
584
585 /**
586  * Send an ibf of appropriate size.
587  *
588  * @param op the union operation
589  * @param ibf_order order of the ibf to send, size=2^order
590  */
591 static void
592 send_ibf (struct Operation *op, uint16_t ibf_order)
593 {
594   unsigned int buckets_sent = 0;
595   struct InvertibleBloomFilter *ibf;
596
597   prepare_ibf (op, 1<<ibf_order);
598
599   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
600
601   ibf = op->state->local_ibf;
602
603   while (buckets_sent < (1 << ibf_order))
604   {
605     unsigned int buckets_in_message;
606     struct GNUNET_MQ_Envelope *ev;
607     struct IBFMessage *msg;
608
609     buckets_in_message = (1 << ibf_order) - buckets_sent;
610     /* limit to maximum */
611     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
612       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
613
614     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
615                                GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
616     msg->reserved = 0;
617     msg->order = ibf_order;
618     msg->offset = htons (buckets_sent);
619     ibf_write_slice (ibf, buckets_sent,
620                      buckets_in_message, &msg[1]);
621     buckets_sent += buckets_in_message;
622     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
623                 buckets_in_message, buckets_sent, 1<<ibf_order);
624     GNUNET_MQ_send (op->mq, ev);
625   }
626
627   op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
628 }
629
630
631 /**
632  * Send a strata estimator to the remote peer.
633  *
634  * @param op the union operation with the remote peer
635  */
636 static void
637 send_strata_estimator (struct Operation *op)
638 {
639   struct GNUNET_MQ_Envelope *ev;
640   struct GNUNET_MessageHeader *strata_msg;
641
642   ev = GNUNET_MQ_msg_header_extra (strata_msg,
643                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
644                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
645   strata_estimator_write (op->state->se, &strata_msg[1]);
646   GNUNET_MQ_send (op->mq, ev);
647   op->state->phase = PHASE_EXPECT_IBF;
648   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
649 }
650
651
652 /**
653  * Compute the necessary order of an ibf
654  * from the size of the symmetric set difference.
655  *
656  * @param diff the difference
657  * @return the required size of the ibf
658  */
659 static unsigned int
660 get_order_from_difference (unsigned int diff)
661 {
662   unsigned int ibf_order;
663
664   ibf_order = 2;
665   while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM)
666     ibf_order++;
667   if (ibf_order > MAX_IBF_ORDER)
668     ibf_order = MAX_IBF_ORDER;
669   return ibf_order;
670 }
671
672
673 /**
674  * Handle a strata estimator from a remote peer
675  *
676  * @param cls the union operation
677  * @param mh the message
678  */
679 static void
680 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
681 {
682   struct Operation *op = cls;
683   struct StrataEstimator *remote_se;
684   int diff;
685
686   if (op->state->phase != PHASE_EXPECT_SE)
687   {
688     fail_union_operation (op);
689     GNUNET_break (0);
690     return;
691   }
692   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
693                                        SE_IBF_HASH_NUM);
694   strata_estimator_read (&mh[1], remote_se);
695   GNUNET_assert (NULL != op->state->se);
696   diff = strata_estimator_difference (remote_se, op->state->se);
697   strata_estimator_destroy (remote_se);
698   strata_estimator_destroy (op->state->se);
699   op->state->se = NULL;
700   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
701               diff, 1<<get_order_from_difference (diff));
702   send_ibf (op, get_order_from_difference (diff));
703 }
704
705
706
707 /**
708  * Iterator to send elements to a remote peer
709  *
710  * @param cls closure with the element key and the union operation
711  * @param key ignored
712  * @param value the key entry
713  */
714 static int
715 send_element_iterator (void *cls,
716                        uint32_t key,
717                        void *value)
718 {
719   struct SendElementClosure *sec = cls;
720   struct IBF_Key ibf_key = sec->ibf_key;
721   struct Operation *op = sec->op;
722   struct KeyEntry *ke = value;
723
724   if (ke->ibf_key.key_val != ibf_key.key_val)
725     return GNUNET_YES;
726   while (NULL != ke)
727   {
728     const struct GNUNET_SET_Element *const element = &ke->element->element;
729     struct GNUNET_MQ_Envelope *ev;
730     struct GNUNET_MessageHeader *mh;
731
732     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
733     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
734     if (NULL == ev)
735     {
736       /* element too large */
737       GNUNET_break (0);
738       continue;
739     }
740     memcpy (&mh[1], element->data, element->size);
741     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
742                 GNUNET_h2s (&ke->element->element_hash));
743     GNUNET_MQ_send (op->mq, ev);
744     ke = ke->next_colliding;
745   }
746   return GNUNET_NO;
747 }
748
749 /**
750  * Send all elements that have the specified IBF key
751  * to the remote peer of the union operation
752  *
753  * @param op union operation
754  * @param ibf_key IBF key of interest
755  */
756 static void
757 send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
758 {
759   struct SendElementClosure send_cls;
760
761   send_cls.ibf_key = ibf_key;
762   send_cls.op = op;
763   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
764                                                        (uint32_t) ibf_key.key_val,
765                                                        &send_element_iterator, &send_cls);
766 }
767
768
769 /**
770  * Decode which elements are missing on each side, and
771  * send the appropriate elemens and requests
772  *
773  * @param op union operation
774  */
775 static void
776 decode_and_send (struct Operation *op)
777 {
778   struct IBF_Key key;
779   struct IBF_Key last_key;
780   int side;
781   unsigned int num_decoded;
782   struct InvertibleBloomFilter *diff_ibf;
783
784   GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
785
786   prepare_ibf (op, op->state->remote_ibf->size);
787   diff_ibf = ibf_dup (op->state->local_ibf);
788   ibf_subtract (diff_ibf, op->state->remote_ibf);
789
790   ibf_destroy (op->state->remote_ibf);
791   op->state->remote_ibf = NULL;
792
793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
794
795   num_decoded = 0;
796   last_key.key_val = 0;
797
798   while (1)
799   {
800     int res;
801     int cycle_detected = GNUNET_NO;
802
803     last_key = key;
804
805     res = ibf_decode (diff_ibf, &side, &key);
806     if (res == GNUNET_OK)
807     {
808       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
809                   key.key_val);
810       num_decoded += 1;
811       if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
812       {
813         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
814                     num_decoded, diff_ibf->size);
815         cycle_detected = GNUNET_YES;
816       }
817     }
818     if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
819     {
820       int next_order;
821       next_order = 0;
822       while (1<<next_order < diff_ibf->size)
823         next_order++;
824       next_order++;
825       if (next_order <= MAX_IBF_ORDER)
826       {
827         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828                     "decoding failed, sending larger ibf (size %u)\n",
829                     1<<next_order);
830         send_ibf (op, next_order);
831       }
832       else
833       {
834         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
835                     "set union failed: reached ibf limit\n");
836       }
837       break;
838     }
839     if (GNUNET_NO == res)
840     {
841       struct GNUNET_MQ_Envelope *ev;
842
843       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
844       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
845       GNUNET_MQ_send (op->mq, ev);
846       break;
847     }
848     if (1 == side)
849     {
850       send_elements_for_key (op, key);
851     }
852     else if (-1 == side)
853     {
854       struct GNUNET_MQ_Envelope *ev;
855       struct GNUNET_MessageHeader *msg;
856
857       /* It may be nice to merge multiple requests, but with mesh's corking it is not worth
858        * the effort additional complexity. */
859       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
860                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
861
862       *(struct IBF_Key *) &msg[1] = key;
863       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
864       GNUNET_MQ_send (op->mq, ev);
865     }
866     else
867     {
868       GNUNET_assert (0);
869     }
870   }
871   ibf_destroy (diff_ibf);
872 }
873
874
875 /**
876  * Handle an IBF message from a remote peer.
877  *
878  * @param cls the union operation
879  * @param mh the header of the message
880  */
881 static void
882 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
883 {
884   struct Operation *op = cls;
885   struct IBFMessage *msg = (struct IBFMessage *) mh;
886   unsigned int buckets_in_message;
887
888   if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
889        (op->state->phase == PHASE_EXPECT_IBF) )
890   {
891     op->state->phase = PHASE_EXPECT_IBF_CONT;
892     GNUNET_assert (NULL == op->state->remote_ibf);
893     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
894     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
895     op->state->ibf_buckets_received = 0;
896     if (0 != ntohs (msg->offset))
897     {
898       GNUNET_break (0);
899       fail_union_operation (op);
900       return;
901     }
902   }
903   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
904   {
905     if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
906          (1<<msg->order != op->state->remote_ibf->size) )
907     {
908       GNUNET_break (0);
909       fail_union_operation (op);
910       return;
911     }
912   }
913
914   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
915
916   if (0 == buckets_in_message)
917   {
918     GNUNET_break_op (0);
919     fail_union_operation (op);
920     return;
921   }
922
923   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
924   {
925     GNUNET_break (0);
926     fail_union_operation (op);
927     return;
928   }
929
930   ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf);
931   op->state->ibf_buckets_received += buckets_in_message;
932
933   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
934   {
935     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
936     op->state->phase = PHASE_EXPECT_ELEMENTS;
937     decode_and_send (op);
938   }
939 }
940
941
942 /**
943  * Send a result message to the client indicating
944  * that there is a new element.
945  *
946  * @param op union operation
947  * @param element element to send
948  */
949 static void
950 send_client_element (struct Operation *op,
951                      struct GNUNET_SET_Element *element)
952 {
953   struct GNUNET_MQ_Envelope *ev;
954   struct GNUNET_SET_ResultMessage *rm;
955
956   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
957   GNUNET_assert (0 != op->spec->client_request_id);
958   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
959   if (NULL == ev)
960   {
961     GNUNET_MQ_discard (ev);
962     GNUNET_break (0);
963     return;
964   }
965   rm->result_status = htons (GNUNET_SET_STATUS_OK);
966   rm->request_id = htonl (op->spec->client_request_id);
967   rm->element_type = element->type;
968   memcpy (&rm[1], element->data, element->size);
969   GNUNET_MQ_send (op->spec->set->client_mq, ev);
970 }
971
972
973 /**
974  * Signal to the client that the operation has finished and
975  * destroy the operation.
976  *
977  * @param cls operation to destroy
978  */
979 static void
980 send_done_and_destroy (void *cls)
981 {
982   struct Operation *op = cls;
983   struct GNUNET_MQ_Envelope *ev;
984   struct GNUNET_SET_ResultMessage *rm;
985   int keep = op->keep;
986   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
987   rm->request_id = htonl (op->spec->client_request_id);
988   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
989   rm->element_type = htons (0);
990   GNUNET_MQ_send (op->spec->set->client_mq, ev);
991   _GSS_operation_destroy (op);
992   if (GNUNET_YES == keep)
993     GNUNET_free (op);
994 }
995
996
997 /**
998  * Send all remaining elements in the full result iterator.
999  *
1000  * @param cls operation
1001  */
1002 static void
1003 send_remaining_elements (void *cls)
1004 {
1005   struct Operation *op = cls;
1006   struct KeyEntry *ke;
1007   int res;
1008
1009   res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke);
1010   if (GNUNET_NO == res)
1011   {
1012     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
1013     send_done_and_destroy (op);
1014     return;
1015   }
1016
1017   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n");
1018
1019   while (1)
1020   {
1021     struct GNUNET_MQ_Envelope *ev;
1022     struct GNUNET_SET_ResultMessage *rm;
1023     struct GNUNET_SET_Element *element;
1024     element = &ke->element->element;
1025
1026     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
1027     GNUNET_assert (0 != op->spec->client_request_id);
1028     ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1029     if (NULL == ev)
1030     {
1031       GNUNET_MQ_discard (ev);
1032       GNUNET_break (0);
1033       continue;
1034     }
1035     rm->result_status = htons (GNUNET_SET_STATUS_OK);
1036     rm->request_id = htonl (op->spec->client_request_id);
1037     rm->element_type = element->type;
1038     memcpy (&rm[1], element->data, element->size);
1039     if (ke->next_colliding == NULL)
1040     {
1041       GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1042       GNUNET_MQ_send (op->spec->set->client_mq, ev);
1043       break;
1044     }
1045     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1046     ke = ke->next_colliding;
1047   }
1048 }
1049
1050
1051 /**
1052  * Send a result message to the client indicating
1053  * that the operation is over.
1054  * After the result done message has been sent to the client,
1055  * destroy the evaluate operation.
1056  *
1057  * @param op union operation
1058  */
1059 static void
1060 finish_and_destroy (struct Operation *op)
1061 {
1062   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1063
1064   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1065   {
1066     /* prevent that the op is free'd by the tunnel end handler */
1067     op->keep = GNUNET_YES;
1068     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
1069     GNUNET_assert (NULL == op->state->full_result_iter);
1070     op->state->full_result_iter =
1071         GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1072     send_remaining_elements (op);
1073     return;
1074   }
1075   send_done_and_destroy (op);
1076 }
1077
1078
1079 /**
1080  * Handle an element message from a remote peer.
1081  *
1082  * @param cls the union operation
1083  * @param mh the message
1084  */
1085 static void
1086 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1087 {
1088   struct Operation *op = cls;
1089   struct ElementEntry *ee;
1090   uint16_t element_size;
1091
1092   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1093
1094   if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1095        (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1096   {
1097     fail_union_operation (op);
1098     GNUNET_break (0);
1099     return;
1100   }
1101   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1102   ee = GNUNET_malloc (sizeof *ee + element_size);
1103   memcpy (&ee[1], &mh[1], element_size);
1104   ee->element.size = element_size;
1105   ee->element.data = &ee[1];
1106   ee->remote = GNUNET_YES;
1107   GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1108
1109   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1110   {
1111     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n");
1112     GNUNET_free (ee);
1113     return;
1114   }
1115
1116   op_register_element (op, ee);
1117   /* only send results immediately if the client wants it */
1118   if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1119     send_client_element (op, &ee->element);
1120 }
1121
1122
1123 /**
1124  * Handle an element request from a remote peer.
1125  *
1126  * @param cls the union operation
1127  * @param mh the message
1128  */
1129 static void
1130 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1131 {
1132   struct Operation *op = cls;
1133   struct IBF_Key *ibf_key;
1134   unsigned int num_keys;
1135
1136   /* look up elements and send them */
1137   if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1138   {
1139     GNUNET_break (0);
1140     fail_union_operation (op);
1141     return;
1142   }
1143
1144   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1145
1146   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1147   {
1148     GNUNET_break (0);
1149     fail_union_operation (op);
1150     return;
1151   }
1152
1153   ibf_key = (struct IBF_Key *) &mh[1];
1154   while (0 != num_keys--)
1155   {
1156     send_elements_for_key (op, *ibf_key);
1157     ibf_key++;
1158   }
1159 }
1160
1161
1162 /**
1163  * Handle a done message from a remote peer
1164  *
1165  * @param cls the union operation
1166  * @param mh the message
1167  */
1168 static void
1169 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1170 {
1171   struct Operation *op = cls;
1172   struct GNUNET_MQ_Envelope *ev;
1173
1174   if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1175   {
1176     /* we got all requests, but still have to send our elements as response */
1177
1178     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1179     op->state->phase = PHASE_FINISHED;
1180     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1181     GNUNET_MQ_send (op->mq, ev);
1182     return;
1183   }
1184   if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1185   {
1186     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1187     op->state->phase = PHASE_FINISHED;
1188     finish_and_destroy (op);
1189     return;
1190   }
1191   GNUNET_break (0);
1192   fail_union_operation (op);
1193 }
1194
1195
1196 /**
1197  * Evaluate a union operation with
1198  * a remote peer.
1199  *
1200  * @param op operation to evaluate
1201  */
1202 static void
1203 union_evaluate (struct Operation *op)
1204 {
1205   op->state = GNUNET_new (struct OperationState);
1206   // copy the current generation's strata estimator for this operation
1207   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1208   /* we started the operation, thus we have to send the operation request */
1209   op->state->phase = PHASE_EXPECT_SE;
1210   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation\n");
1211   send_operation_request (op);
1212 }
1213
1214
1215 /**
1216  * Accept an union operation request from a remote peer.
1217  * Only initializes the private operation state.
1218  *
1219  * @param op operation that will be accepted as a union operation
1220  */
1221 static void
1222 union_accept (struct Operation *op)
1223 {
1224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1225   op->state = GNUNET_new (struct OperationState);
1226   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1227   /* kick off the operation */
1228   send_strata_estimator (op);
1229 }
1230
1231
1232 /**
1233  * Create a new set supporting the union operation
1234  *
1235  * We maintain one strata estimator per set and then manipulate it over the
1236  * lifetime of the set, as recreating a strata estimator would be expensive.
1237  *
1238  * @return the newly created set
1239  */
1240 static struct SetState *
1241 union_set_create (void)
1242 {
1243   struct SetState *set_state;
1244
1245   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1246
1247   set_state = GNUNET_new (struct SetState);
1248   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1249                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);
1250   return set_state;
1251 }
1252
1253
1254 /**
1255  * Add the element from the given element message to the set.
1256  *
1257  * @param set_state state of the set want to add to
1258  * @param ee the element to add to the set
1259  */
1260 static void
1261 union_add (struct SetState *set_state, struct ElementEntry *ee)
1262 {
1263   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1264 }
1265
1266
1267 /**
1268  * Remove the element given in the element message from the set.
1269  * Only marks the element as removed, so that older set operations can still exchange it.
1270  *
1271  * @param set_state state of the set to remove from
1272  * @param ee set element to remove
1273  */
1274 static void
1275 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1276 {
1277   strata_estimator_remove (set_state->se, get_ibf_key (&ee->element_hash, 0));
1278 }
1279
1280
1281 /**
1282  * Destroy a set that supports the union operation.
1283  *
1284  * @param set_state the set to destroy
1285  */
1286 static void
1287 union_set_destroy (struct SetState *set_state)
1288 {
1289   if (NULL != set_state->se)
1290   {
1291     strata_estimator_destroy (set_state->se);
1292     set_state->se = NULL;
1293   }
1294   GNUNET_free (set_state);
1295 }
1296
1297
1298 /**
1299  * Dispatch messages for a union operation.
1300  *
1301  * @param op the state of the union evaluate operation
1302  * @param mh the received message
1303  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1304  *         GNUNET_OK otherwise
1305  */
1306 int
1307 union_handle_p2p_message (struct Operation *op,
1308                           const struct GNUNET_MessageHeader *mh)
1309 {
1310   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1311               ntohs (mh->type), ntohs (mh->size));
1312   switch (ntohs (mh->type))
1313   {
1314     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1315       handle_p2p_ibf (op, mh);
1316       break;
1317     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1318       handle_p2p_strata_estimator (op, mh);
1319       break;
1320     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1321       handle_p2p_elements (op, mh);
1322       break;
1323     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1324       handle_p2p_element_requests (op, mh);
1325       break;
1326     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1327       handle_p2p_done (op, mh);
1328       break;
1329     default:
1330       /* something wrong with mesh's message handlers? */
1331       GNUNET_assert (0);
1332   }
1333   return GNUNET_OK;
1334 }
1335
1336 /**
1337  * handler for peer-disconnects, notifies the client
1338  * about the aborted operation in case the op was not concluded
1339  *
1340  * @param op the destroyed operation
1341  */
1342 static void
1343 union_peer_disconnect (struct Operation *op)
1344 {
1345   if (PHASE_FINISHED != op->state->phase)
1346   {
1347     struct GNUNET_MQ_Envelope *ev;
1348     struct GNUNET_SET_ResultMessage *msg;
1349
1350     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1351     msg->request_id = htonl (op->spec->client_request_id);
1352     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1353     msg->element_type = htons (0);
1354     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1355     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1356     _GSS_operation_destroy (op);
1357     return;
1358   }
1359   // else: the session has already been concluded
1360   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1361   if (GNUNET_NO == op->state->client_done_sent)
1362     finish_and_destroy (op);
1363 }
1364
1365
1366 /**
1367  * Get the table with implementing functions for
1368  * set union.
1369  *
1370  * @return the operation specific VTable
1371  */
1372 const struct SetVT *
1373 _GSS_union_vt ()
1374 {
1375   static const struct SetVT union_vt = {
1376     .create = &union_set_create,
1377     .msg_handler = &union_handle_p2p_message,
1378     .add = &union_add,
1379     .remove = &union_remove,
1380     .destroy_set = &union_set_destroy,
1381     .evaluate = &union_evaluate,
1382     .accept = &union_accept,
1383     .peer_disconnect = &union_peer_disconnect,
1384     .cancel = &union_op_cancel,
1385   };
1386
1387   return &union_vt;
1388 }