fix use-after-free on exit
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
28 #include "ibf.h"
29 #include "gnunet-service-set_union_strata_estimator.h"
30 #include "gnunet-service-set_protocol.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Number of IBFs in a strata estimator.
36  */
37 #define SE_STRATA_COUNT 32
38 /**
39  * Size of the IBFs in the strata estimator.
40  */
41 #define SE_IBF_SIZE 80
42 /**
43  * hash num parameter for the difference digests and strata estimators
44  */
45 #define SE_IBF_HASH_NUM 4
46
47 /**
48  * Number of buckets that can be transmitted in one message.
49  */
50 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
51
52 /**
53  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
54  * Choose this value so that computing the IBF is still cheaper
55  * than transmitting all values.
56  */
57 #define MAX_IBF_ORDER (16)
58
59 /**
60  * Number of buckets used in the ibf per estimated
61  * difference.
62  */
63 #define IBF_ALPHA 4
64
65
66 /**
67  * Current phase we are in for a union operation.
68  */
69 enum UnionOperationPhase
70 {
71   /**
72    * We sent the request message, and expect a strata estimator
73    */
74   PHASE_EXPECT_SE,
75
76   /**
77    * We sent the strata estimator, and expect an IBF. This phase is entered once
78    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
79    *
80    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
81    */
82   PHASE_EXPECT_IBF,
83
84   /**
85    * Continuation for multi part IBFs.
86    */
87   PHASE_EXPECT_IBF_CONT,
88
89   /**
90    * We are sending request and elements,
91    * and thus only expect elements from the other peer.
92    *
93    * We are currently decoding an IBF until it can no longer be decoded,
94    * we currently send requests and expect elements
95    * The remote peer is in #PHASE_EXPECT_ELEMENTS_AND_REQUESTS
96    */
97   PHASE_EXPECT_ELEMENTS,
98
99   /**
100    * We are expecting elements and requests, and send
101    * requested elements back to the other peer.
102    *
103    * We are in this phase if we have SENT an IBF for the remote peer to decode.
104    * We expect requests, send elements or could receive an new IBF, which takes
105    * us via #PHASE_EXPECT_IBF to phase #PHASE_EXPECT_ELEMENTS
106    *
107    * The remote peer is thus in:
108    * #PHASE_EXPECT_ELEMENTS
109    */
110   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
111
112   /**
113    * The protocol is over.
114    * Results may still have to be sent to the client.
115    */
116   PHASE_FINISHED
117 };
118
119
120 /**
121  * State of an evaluate operation with another peer.
122  */
123 struct OperationState
124 {
125
126   /**
127    * Copy of the set's strata estimator at the time of
128    * creation of this operation
129    */
130   struct StrataEstimator *se;
131
132   /**
133    * The ibf we currently receive
134    */
135   struct InvertibleBloomFilter *remote_ibf;
136
137   /**
138    * IBF of the set's element.
139    */
140   struct InvertibleBloomFilter *local_ibf;
141
142   /**
143    * Maps IBF-Keys (specific to the current salt) to elements.
144    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
145    * Colliding IBF-Keys are linked.
146    */
147   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
148
149   /**
150    * Iterator for sending elements on the key to element mapping to the client.
151    */
152   struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
153
154   /**
155    * Current state of the operation.
156    */
157   enum UnionOperationPhase phase;
158
159   /**
160    * Did we send the client that we are done?
161    */
162   int client_done_sent;
163
164   /**
165    * Number of ibf buckets received
166    */
167   unsigned int ibf_buckets_received;
168
169 };
170
171
172 /**
173  * The key entry is used to associate an ibf key with an element.
174  */
175 struct KeyEntry
176 {
177   /**
178    * IBF key for the entry, derived from the current salt.
179    */
180   struct IBF_Key ibf_key;
181
182   /**
183    * The actual element associated with the key.
184    */
185   struct ElementEntry *element;
186
187   /**
188    * Element that collides with this element
189    * on the ibf key. All colliding entries must have the same ibf key.
190    */
191   struct KeyEntry *next_colliding;
192 };
193
194
195 /**
196  * Used as a closure for sending elements
197  * with a specific IBF key.
198  */
199 struct SendElementClosure
200 {
201   /**
202    * The IBF key whose matching elements should be
203    * sent.
204    */
205   struct IBF_Key ibf_key;
206
207   /**
208    * Operation for which the elements
209    * should be sent.
210    */
211   struct Operation *op;
212 };
213
214
215 /**
216  * Extra state required for efficient set union.
217  */
218 struct SetState
219 {
220   /**
221    * The strata estimator is only generated once for
222    * each set.
223    * The IBF keys are derived from the element hashes with
224    * salt=0.
225    */
226   struct StrataEstimator *se;
227 };
228
229
230 /**
231  * Iterator over hash map entries, called to
232  * destroy the linked list of colliding ibf key entries.
233  *
234  * @param cls closure
235  * @param key current key code
236  * @param value value in the hash map
237  * @return #GNUNET_YES if we should continue to
238  *         iterate,
239  *         #GNUNET_NO if not.
240  */
241 static int
242 destroy_key_to_element_iter (void *cls,
243                              uint32_t key,
244                              void *value)
245 {
246   struct KeyEntry *k = value;
247
248   while (NULL != k)
249   {
250     struct KeyEntry *k_tmp = k;
251
252     k = k->next_colliding;
253     if (GNUNET_YES == k_tmp->element->remote)
254     {
255       GNUNET_free (k_tmp->element);
256       k_tmp->element = NULL;
257     }
258     GNUNET_free (k_tmp);
259   }
260   return GNUNET_YES;
261 }
262
263
264 /**
265  * Destroy the union operation.  Only things specific to the union
266  * operation are destroyed.
267  *
268  * @param op union operation to destroy
269  */
270 static void
271 union_op_cancel (struct Operation *op)
272 {
273   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
274               "destroying union op\n");
275   /* check if the op was canceled twice */
276   GNUNET_assert (NULL != op->state);
277   if (NULL != op->state->remote_ibf)
278   {
279     ibf_destroy (op->state->remote_ibf);
280     op->state->remote_ibf = NULL;
281   }
282   if (NULL != op->state->local_ibf)
283   {
284     ibf_destroy (op->state->local_ibf);
285     op->state->local_ibf = NULL;
286   }
287   if (NULL != op->state->se)
288   {
289     strata_estimator_destroy (op->state->se);
290     op->state->se = NULL;
291   }
292   if (NULL != op->state->key_to_element)
293   {
294     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
295                                              &destroy_key_to_element_iter,
296                                              NULL);
297     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
298     op->state->key_to_element = NULL;
299   }
300   GNUNET_free (op->state);
301   op->state = NULL;
302   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
303               "destroying union op done\n");
304 }
305
306
307 /**
308  * Inform the client that the union operation has failed,
309  * and proceed to destroy the evaluate operation.
310  *
311  * @param op the union operation to fail
312  */
313 static void
314 fail_union_operation (struct Operation *op)
315 {
316   struct GNUNET_MQ_Envelope *ev;
317   struct GNUNET_SET_ResultMessage *msg;
318
319   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
320               "union operation failed\n");
321   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
322   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
323   msg->request_id = htonl (op->spec->client_request_id);
324   msg->element_type = htons (0);
325   GNUNET_MQ_send (op->spec->set->client_mq, ev);
326   _GSS_operation_destroy (op, GNUNET_YES);
327 }
328
329
330 /**
331  * Derive the IBF key from a hash code and
332  * a salt.
333  *
334  * @param src the hash code
335  * @param salt salt to use
336  * @return the derived IBF key
337  */
338 static struct IBF_Key
339 get_ibf_key (const struct GNUNET_HashCode *src,
340              uint16_t salt)
341 {
342   struct IBF_Key key;
343
344   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
345                       GCRY_MD_SHA512, GCRY_MD_SHA256,
346                       src, sizeof *src,
347                       &salt, sizeof (salt),
348                       NULL, 0);
349   return key;
350 }
351
352
353 /**
354  * Iterator to create the mapping between ibf keys
355  * and element entries.
356  *
357  * @param cls closure
358  * @param key current key code
359  * @param value value in the hash map
360  * @return #GNUNET_YES if we should continue to iterate,
361  *         #GNUNET_NO if not.
362  */
363 static int
364 op_register_element_iterator (void *cls,
365                               uint32_t key,
366                               void *value)
367 {
368   struct KeyEntry *const new_k = cls;
369   struct KeyEntry *old_k = value;
370
371   GNUNET_assert (NULL != old_k);
372   /* check if our ibf key collides with the ibf key in the existing entry */
373   if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
374   {
375     /* insert the the new key in the collision chain */
376     new_k->next_colliding = old_k->next_colliding;
377     old_k->next_colliding = new_k;
378     /* signal to the caller that we were able to insert into a colliding bucket */
379     return GNUNET_NO;
380   }
381   return GNUNET_YES;
382 }
383
384
385 /**
386  * Iterator to create the mapping between ibf keys
387  * and element entries.
388  *
389  * @param cls closure
390  * @param key current key code
391  * @param value value in the hash map
392  * @return #GNUNET_YES (we should continue to iterate)
393  */
394 static int
395 op_has_element_iterator (void *cls,
396                          uint32_t key,
397                          void *value)
398 {
399   struct GNUNET_HashCode *element_hash = cls;
400   struct KeyEntry *k = value;
401
402   GNUNET_assert (NULL != k);
403   while (NULL != k)
404   {
405     if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
406                                      element_hash))
407       return GNUNET_NO;
408     k = k->next_colliding;
409   }
410   return GNUNET_YES;
411 }
412
413
414 /**
415  * Determine whether the given element is already in the operation's element
416  * set.
417  *
418  * @param op operation that should be tested for 'element_hash'
419  * @param element_hash hash of the element to look for
420  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
421  */
422 static int
423 op_has_element (struct Operation *op,
424                 const struct GNUNET_HashCode *element_hash)
425 {
426   int ret;
427   struct IBF_Key ibf_key;
428
429   ibf_key = get_ibf_key (element_hash, op->spec->salt);
430   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
431                                                       (uint32_t) ibf_key.key_val,
432                                                       op_has_element_iterator,
433                                                       (void *) element_hash);
434
435   /* was the iteration aborted because we found the element? */
436   if (GNUNET_SYSERR == ret)
437     return GNUNET_YES;
438   return GNUNET_NO;
439 }
440
441
442 /**
443  * Insert an element into the union operation's
444  * key-to-element mapping. Takes ownership of 'ee'.
445  * Note that this does not insert the element in the set,
446  * only in the operation's key-element mapping.
447  * This is done to speed up re-tried operations, if some elements
448  * were transmitted, and then the IBF fails to decode.
449  *
450  * @param op the union operation
451  * @param ee the element entry
452  */
453 static void
454 op_register_element (struct Operation *op,
455                      struct ElementEntry *ee)
456 {
457   int ret;
458   struct IBF_Key ibf_key;
459   struct KeyEntry *k;
460
461   ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
462   k = GNUNET_new (struct KeyEntry);
463   k->element = ee;
464   k->ibf_key = ibf_key;
465   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
466                                                       (uint32_t) ibf_key.key_val,
467                                                       op_register_element_iterator,
468                                                       k);
469
470   /* was the element inserted into a colliding bucket? */
471   if (GNUNET_SYSERR == ret)
472     return;
473   GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
474                                        (uint32_t) ibf_key.key_val,
475                                        k,
476                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
477 }
478
479
480 /**
481  * Insert a key into an ibf.
482  *
483  * @param cls the ibf
484  * @param key unused
485  * @param value the key entry to get the key from
486  */
487 static int
488 prepare_ibf_iterator (void *cls,
489                       uint32_t key,
490                       void *value)
491 {
492   struct InvertibleBloomFilter *ibf = cls;
493   struct KeyEntry *ke = value;
494
495   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
496               "inserting %x into ibf\n",
497               ke->ibf_key.key_val);
498   ibf_insert (ibf, ke->ibf_key);
499   return GNUNET_YES;
500 }
501
502
503 /**
504  * Iterator for initializing the
505  * key-to-element mapping of a union operation
506  *
507  * @param cls the union operation `struct Operation *`
508  * @param key unused
509  * @param value the `struct ElementEntry *` to insert
510  *        into the key-to-element mapping
511  * @return #GNUNET_YES (to continue iterating)
512  */
513 static int
514 init_key_to_element_iterator (void *cls,
515                               const struct GNUNET_HashCode *key,
516                               void *value)
517 {
518   struct Operation *op = cls;
519   struct ElementEntry *e = value;
520
521   /* make sure that the element belongs to the set at the time
522    * of creating the operation */
523   if ( (e->generation_added > op->generation_created) ||
524        ( (GNUNET_YES == e->removed) &&
525          (e->generation_removed < op->generation_created)))
526     return GNUNET_YES;
527
528   GNUNET_assert (GNUNET_NO == e->remote);
529
530   op_register_element (op, e);
531   return GNUNET_YES;
532 }
533
534
535 /**
536  * Create an ibf with the operation's elements
537  * of the specified size
538  *
539  * @param op the union operation
540  * @param size size of the ibf to create
541  */
542 static void
543 prepare_ibf (struct Operation *op,
544              uint16_t size)
545 {
546   if (NULL == op->state->key_to_element)
547   {
548     unsigned int len;
549     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
550     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
551     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
552                                            init_key_to_element_iterator, op);
553   }
554   if (NULL != op->state->local_ibf)
555     ibf_destroy (op->state->local_ibf);
556   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
557   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
558                                            &prepare_ibf_iterator,
559                                            op->state->local_ibf);
560 }
561
562
563 /**
564  * Send an ibf of appropriate size.
565  *
566  * @param op the union operation
567  * @param ibf_order order of the ibf to send, size=2^order
568  */
569 static void
570 send_ibf (struct Operation *op,
571           uint16_t ibf_order)
572 {
573   unsigned int buckets_sent = 0;
574   struct InvertibleBloomFilter *ibf;
575
576   prepare_ibf (op, 1<<ibf_order);
577
578   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
579               "sending ibf of size %u\n",
580               1<<ibf_order);
581
582   ibf = op->state->local_ibf;
583
584   while (buckets_sent < (1 << ibf_order))
585   {
586     unsigned int buckets_in_message;
587     struct GNUNET_MQ_Envelope *ev;
588     struct IBFMessage *msg;
589
590     buckets_in_message = (1 << ibf_order) - buckets_sent;
591     /* limit to maximum */
592     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
593       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
594
595     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
596                                GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
597     msg->reserved = 0;
598     msg->order = ibf_order;
599     msg->offset = htons (buckets_sent);
600     ibf_write_slice (ibf, buckets_sent,
601                      buckets_in_message, &msg[1]);
602     buckets_sent += buckets_in_message;
603     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
604                 "ibf chunk size %u, %u/%u sent\n",
605                 buckets_in_message,
606                 buckets_sent,
607                 1<<ibf_order);
608     GNUNET_MQ_send (op->mq, ev);
609   }
610
611   op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
612 }
613
614
615 /**
616  * Send a strata estimator to the remote peer.
617  *
618  * @param op the union operation with the remote peer
619  */
620 static void
621 send_strata_estimator (struct Operation *op)
622 {
623   struct GNUNET_MQ_Envelope *ev;
624   struct GNUNET_MessageHeader *strata_msg;
625
626   ev = GNUNET_MQ_msg_header_extra (strata_msg,
627                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
628                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
629   strata_estimator_write (op->state->se, &strata_msg[1]);
630   GNUNET_MQ_send (op->mq, ev);
631   op->state->phase = PHASE_EXPECT_IBF;
632   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
633               "sent SE, expecting IBF\n");
634 }
635
636
637 /**
638  * Compute the necessary order of an ibf
639  * from the size of the symmetric set difference.
640  *
641  * @param diff the difference
642  * @return the required size of the ibf
643  */
644 static unsigned int
645 get_order_from_difference (unsigned int diff)
646 {
647   unsigned int ibf_order;
648
649   ibf_order = 2;
650   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
651           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
652     ibf_order++;
653   if (ibf_order > MAX_IBF_ORDER)
654     ibf_order = MAX_IBF_ORDER;
655   return ibf_order;
656 }
657
658
659 /**
660  * Handle a strata estimator from a remote peer
661  *
662  * @param cls the union operation
663  * @param mh the message
664  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
665  *         #GNUNET_OK otherwise
666  */
667 static int
668 handle_p2p_strata_estimator (void *cls,
669                              const struct GNUNET_MessageHeader *mh)
670 {
671   struct Operation *op = cls;
672   struct StrataEstimator *remote_se;
673   int diff;
674
675   if (op->state->phase != PHASE_EXPECT_SE)
676   {
677     fail_union_operation (op);
678     GNUNET_break (0);
679     return GNUNET_SYSERR;
680   }
681   if (ntohs (mh->size) !=
682       SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE +
683       sizeof (struct GNUNET_MessageHeader))
684   {
685     fail_union_operation (op);
686     GNUNET_break (0);
687     return GNUNET_SYSERR;
688   }
689   remote_se = strata_estimator_create (SE_STRATA_COUNT,
690                                        SE_IBF_SIZE,
691                                        SE_IBF_HASH_NUM);
692   strata_estimator_read (&mh[1], remote_se);
693   GNUNET_assert (NULL != op->state->se);
694   diff = strata_estimator_difference (remote_se,
695                                       op->state->se);
696   strata_estimator_destroy (remote_se);
697   strata_estimator_destroy (op->state->se);
698   op->state->se = NULL;
699   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
700               "got se diff=%d, using ibf size %d\n",
701               diff,
702               1<<get_order_from_difference (diff));
703   send_ibf (op,
704             get_order_from_difference (diff));
705   return GNUNET_OK;
706 }
707
708
709 /**
710  * Iterator to send elements to a remote peer
711  *
712  * @param cls closure with the element key and the union operation
713  * @param key ignored
714  * @param value the key entry
715  */
716 static int
717 send_element_iterator (void *cls,
718                        uint32_t key,
719                        void *value)
720 {
721   struct SendElementClosure *sec = cls;
722   struct IBF_Key ibf_key = sec->ibf_key;
723   struct Operation *op = sec->op;
724   struct KeyEntry *ke = value;
725
726   if (ke->ibf_key.key_val != ibf_key.key_val)
727     return GNUNET_YES;
728   while (NULL != ke)
729   {
730     const struct GNUNET_SET_Element *const element = &ke->element->element;
731     struct GNUNET_MQ_Envelope *ev;
732     struct GNUNET_MessageHeader *mh;
733
734     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
735     ev = GNUNET_MQ_msg_header_extra (mh,
736                                      element->size,
737                                      GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
738     if (NULL == ev)
739     {
740       /* element too large */
741       GNUNET_break (0);
742       continue;
743     }
744     memcpy (&mh[1],
745             element->data,
746             element->size);
747     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
748                 "sending element (%s) to peer\n",
749                 GNUNET_h2s (&ke->element->element_hash));
750     GNUNET_MQ_send (op->mq, ev);
751     ke = ke->next_colliding;
752   }
753   return GNUNET_NO;
754 }
755
756
757 /**
758  * Send all elements that have the specified IBF key
759  * to the remote peer of the union operation
760  *
761  * @param op union operation
762  * @param ibf_key IBF key of interest
763  */
764 static void
765 send_elements_for_key (struct Operation *op,
766                        struct IBF_Key ibf_key)
767 {
768   struct SendElementClosure send_cls;
769
770   send_cls.ibf_key = ibf_key;
771   send_cls.op = op;
772   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
773                                                        (uint32_t) ibf_key.key_val,
774                                                        &send_element_iterator,
775                                                        &send_cls);
776 }
777
778
779 /**
780  * Decode which elements are missing on each side, and
781  * send the appropriate elemens and requests
782  *
783  * @param op union operation
784  */
785 static void
786 decode_and_send (struct Operation *op)
787 {
788   struct IBF_Key key;
789   struct IBF_Key last_key;
790   int side;
791   unsigned int num_decoded;
792   struct InvertibleBloomFilter *diff_ibf;
793
794   GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
795
796   prepare_ibf (op, op->state->remote_ibf->size);
797   diff_ibf = ibf_dup (op->state->local_ibf);
798   ibf_subtract (diff_ibf, op->state->remote_ibf);
799
800   ibf_destroy (op->state->remote_ibf);
801   op->state->remote_ibf = NULL;
802
803   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804               "decoding IBF (size=%u)\n",
805               diff_ibf->size);
806
807   num_decoded = 0;
808   last_key.key_val = 0;
809
810   while (1)
811   {
812     int res;
813     int cycle_detected = GNUNET_NO;
814
815     last_key = key;
816
817     res = ibf_decode (diff_ibf, &side, &key);
818     if (res == GNUNET_OK)
819     {
820       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
821                   "decoded ibf key %lx\n",
822                   key.key_val);
823       num_decoded += 1;
824       if ( (num_decoded > diff_ibf->size) ||
825            (num_decoded > 1 && last_key.key_val == key.key_val) )
826       {
827         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828                     "detected cyclic ibf (decoded %u/%u)\n",
829                     num_decoded,
830                     diff_ibf->size);
831         cycle_detected = GNUNET_YES;
832       }
833     }
834     if ( (GNUNET_SYSERR == res) ||
835          (GNUNET_YES == cycle_detected) )
836     {
837       int next_order;
838       next_order = 0;
839       while (1<<next_order < diff_ibf->size)
840         next_order++;
841       next_order++;
842       if (next_order <= MAX_IBF_ORDER)
843       {
844         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
845                     "decoding failed, sending larger ibf (size %u)\n",
846                     1<<next_order);
847         send_ibf (op, next_order);
848       }
849       else
850       {
851         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
852                     "set union failed: reached ibf limit\n");
853       }
854       break;
855     }
856     if (GNUNET_NO == res)
857     {
858       struct GNUNET_MQ_Envelope *ev;
859
860       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861                   "transmitted all values, sending DONE\n");
862       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
863       GNUNET_MQ_send (op->mq, ev);
864       break;
865     }
866     if (1 == side)
867     {
868       send_elements_for_key (op, key);
869     }
870     else if (-1 == side)
871     {
872       struct GNUNET_MQ_Envelope *ev;
873       struct GNUNET_MessageHeader *msg;
874
875       /* It may be nice to merge multiple requests, but with cadet's corking it is not worth
876        * the effort additional complexity. */
877       ev = GNUNET_MQ_msg_header_extra (msg,
878                                        sizeof (struct IBF_Key),
879                                        GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
880
881       memcpy (&msg[1],
882               &key,
883               sizeof (struct IBF_Key));
884       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885                   "sending element request\n");
886       GNUNET_MQ_send (op->mq, ev);
887     }
888     else
889     {
890       GNUNET_assert (0);
891     }
892   }
893   ibf_destroy (diff_ibf);
894 }
895
896
897 /**
898  * Handle an IBF message from a remote peer.
899  *
900  * @param cls the union operation
901  * @param mh the header of the message
902  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
903  *         #GNUNET_OK otherwise
904  */
905 static int
906 handle_p2p_ibf (void *cls,
907                 const struct GNUNET_MessageHeader *mh)
908 {
909   struct Operation *op = cls;
910   const struct IBFMessage *msg;
911   unsigned int buckets_in_message;
912
913   if (ntohs (mh->size) < sizeof (struct IBFMessage))
914   {
915     GNUNET_break_op (0);
916     fail_union_operation (op);
917     return GNUNET_SYSERR;
918   }
919   msg = (const struct IBFMessage *) mh;
920   if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
921        (op->state->phase == PHASE_EXPECT_IBF) )
922   {
923     op->state->phase = PHASE_EXPECT_IBF_CONT;
924     GNUNET_assert (NULL == op->state->remote_ibf);
925     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926                 "Creating new ibf of size %u\n",
927                 1 << msg->order);
928     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
929     op->state->ibf_buckets_received = 0;
930     if (0 != ntohs (msg->offset))
931     {
932       GNUNET_break_op (0);
933       fail_union_operation (op);
934       return GNUNET_SYSERR;
935     }
936   }
937   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
938   {
939     if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
940          (1<<msg->order != op->state->remote_ibf->size) )
941     {
942       GNUNET_break_op (0);
943       fail_union_operation (op);
944       return GNUNET_SYSERR;
945     }
946   }
947
948   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
949
950   if (0 == buckets_in_message)
951   {
952     GNUNET_break_op (0);
953     fail_union_operation (op);
954     return GNUNET_SYSERR;
955   }
956
957   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
958   {
959     GNUNET_break_op (0);
960     fail_union_operation (op);
961     return GNUNET_SYSERR;
962   }
963
964   ibf_read_slice (&msg[1],
965                   op->state->ibf_buckets_received,
966                   buckets_in_message,
967                   op->state->remote_ibf);
968   op->state->ibf_buckets_received += buckets_in_message;
969
970   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
971   {
972     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
973                 "received full ibf\n");
974     op->state->phase = PHASE_EXPECT_ELEMENTS;
975     decode_and_send (op);
976   }
977   return GNUNET_OK;
978 }
979
980
981 /**
982  * Send a result message to the client indicating
983  * that there is a new element.
984  *
985  * @param op union operation
986  * @param element element to send
987  */
988 static void
989 send_client_element (struct Operation *op,
990                      struct GNUNET_SET_Element *element)
991 {
992   struct GNUNET_MQ_Envelope *ev;
993   struct GNUNET_SET_ResultMessage *rm;
994
995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996               "sending element (size %u) to client\n",
997               element->size);
998   GNUNET_assert (0 != op->spec->client_request_id);
999   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1000   if (NULL == ev)
1001   {
1002     GNUNET_MQ_discard (ev);
1003     GNUNET_break (0);
1004     return;
1005   }
1006   rm->result_status = htons (GNUNET_SET_STATUS_OK);
1007   rm->request_id = htonl (op->spec->client_request_id);
1008   rm->element_type = element->element_type;
1009   memcpy (&rm[1], element->data, element->size);
1010   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1011 }
1012
1013
1014 /**
1015  * Signal to the client that the operation has finished and
1016  * destroy the operation.
1017  *
1018  * @param cls operation to destroy
1019  */
1020 static void
1021 send_done_and_destroy (void *cls)
1022 {
1023   struct Operation *op = cls;
1024   struct GNUNET_MQ_Envelope *ev;
1025   struct GNUNET_SET_ResultMessage *rm;
1026
1027   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1028   rm->request_id = htonl (op->spec->client_request_id);
1029   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1030   rm->element_type = htons (0);
1031   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1032   _GSS_operation_destroy (op, GNUNET_YES);
1033   op->keep--;
1034   if (0 == op->keep)
1035     GNUNET_free (op);
1036 }
1037
1038
1039 /**
1040  * Send all remaining elements in the full result iterator.
1041  *
1042  * @param cls operation
1043  */
1044 static void
1045 send_remaining_elements (void *cls)
1046 {
1047   struct Operation *op = cls;
1048   struct KeyEntry *ke;
1049   int res;
1050
1051   res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter,
1052                                                        NULL,
1053                                                        (const void **) &ke);
1054   if (GNUNET_NO == res)
1055   {
1056     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057                 "sending done and destroy because iterator ran out\n");
1058     send_done_and_destroy (op);
1059     return;
1060   }
1061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062               "sending elements from key entry\n");
1063   while (1)
1064   {
1065     struct GNUNET_MQ_Envelope *ev;
1066     struct GNUNET_SET_ResultMessage *rm;
1067     struct GNUNET_SET_Element *element;
1068     element = &ke->element->element;
1069
1070     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071                 "sending element (size %u) to client (full set)\n",
1072                 element->size);
1073     GNUNET_assert (0 != op->spec->client_request_id);
1074     ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1075     if (NULL == ev)
1076     {
1077       GNUNET_MQ_discard (ev);
1078       GNUNET_break (0);
1079       continue;
1080     }
1081     rm->result_status = htons (GNUNET_SET_STATUS_OK);
1082     rm->request_id = htonl (op->spec->client_request_id);
1083     rm->element_type = element->element_type;
1084     memcpy (&rm[1], element->data, element->size);
1085     if (NULL == ke->next_colliding)
1086     {
1087       GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1088       GNUNET_MQ_send (op->spec->set->client_mq, ev);
1089       break;
1090     }
1091     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1092     ke = ke->next_colliding;
1093   }
1094 }
1095
1096
1097 /**
1098  * Send a result message to the client indicating
1099  * that the operation is over.
1100  * After the result done message has been sent to the client,
1101  * destroy the evaluate operation.
1102  *
1103  * @param op union operation
1104  */
1105 static void
1106 finish_and_destroy (struct Operation *op)
1107 {
1108   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1109   op->keep++;
1110   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1111   {
1112     /* prevent that the op is free'd by the tunnel end handler */
1113     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1114                 "sending full result set\n");
1115     GNUNET_assert (NULL == op->state->full_result_iter);
1116     op->state->full_result_iter =
1117         GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1118     send_remaining_elements (op);
1119     return;
1120   }
1121   send_done_and_destroy (op);
1122 }
1123
1124
1125 /**
1126  * Handle an element message from a remote peer.
1127  *
1128  * @param cls the union operation
1129  * @param mh the message
1130  */
1131 static void
1132 handle_p2p_elements (void *cls,
1133                      const struct GNUNET_MessageHeader *mh)
1134 {
1135   struct Operation *op = cls;
1136   struct ElementEntry *ee;
1137   uint16_t element_size;
1138
1139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140               "Got element from peer\n");
1141   if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1142        (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1143   {
1144     fail_union_operation (op);
1145     GNUNET_break_op (0);
1146     return;
1147   }
1148   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1149   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1150   memcpy (&ee[1], &mh[1], element_size);
1151   ee->element.size = element_size;
1152   ee->element.data = &ee[1];
1153   ee->remote = GNUNET_YES;
1154   GNUNET_CRYPTO_hash (ee->element.data,
1155                       ee->element.size,
1156                       &ee->element_hash);
1157
1158   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1159   {
1160     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1161                 "got existing element from peer\n");
1162     GNUNET_free (ee);
1163     return;
1164   }
1165
1166   op_register_element (op, ee);
1167   /* only send results immediately if the client wants it */
1168   if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1169     send_client_element (op, &ee->element);
1170 }
1171
1172
1173 /**
1174  * Handle an element request from a remote peer.
1175  *
1176  * @param cls the union operation
1177  * @param mh the message
1178  */
1179 static void
1180 handle_p2p_element_requests (void *cls,
1181                              const struct GNUNET_MessageHeader *mh)
1182 {
1183   struct Operation *op = cls;
1184   const struct IBF_Key *ibf_key;
1185   unsigned int num_keys;
1186
1187   /* look up elements and send them */
1188   if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1189   {
1190     GNUNET_break_op (0);
1191     fail_union_operation (op);
1192     return;
1193   }
1194   num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1195     / sizeof (struct IBF_Key);
1196   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1197       != num_keys * sizeof (struct IBF_Key))
1198   {
1199     GNUNET_break_op (0);
1200     fail_union_operation (op);
1201     return;
1202   }
1203
1204   ibf_key = (const struct IBF_Key *) &mh[1];
1205   while (0 != num_keys--)
1206   {
1207     send_elements_for_key (op, *ibf_key);
1208     ibf_key++;
1209   }
1210 }
1211
1212
1213 /**
1214  * Handle a done message from a remote peer
1215  *
1216  * @param cls the union operation
1217  * @param mh the message
1218  */
1219 static void
1220 handle_p2p_done (void *cls,
1221                  const struct GNUNET_MessageHeader *mh)
1222 {
1223   struct Operation *op = cls;
1224   struct GNUNET_MQ_Envelope *ev;
1225
1226   if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1227   {
1228     /* we got all requests, but still have to send our elements as response */
1229
1230     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1231                 "got DONE, sending final DONE after elements\n");
1232     op->state->phase = PHASE_FINISHED;
1233     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1234     GNUNET_MQ_send (op->mq, ev);
1235     return;
1236   }
1237   if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1238   {
1239     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1240                 "got final DONE\n");
1241     op->state->phase = PHASE_FINISHED;
1242     finish_and_destroy (op);
1243     return;
1244   }
1245   GNUNET_break_op (0);
1246   fail_union_operation (op);
1247 }
1248
1249
1250 /**
1251  * Initiate operation to evaluate a set union with a remote peer.
1252  *
1253  * @param op operation to perform (to be initialized)
1254  * @param opaque_context message to be transmitted to the listener
1255  *        to convince him to accept, may be NULL
1256  */
1257 static void
1258 union_evaluate (struct Operation *op,
1259                 const struct GNUNET_MessageHeader *opaque_context)
1260 {
1261   struct GNUNET_MQ_Envelope *ev;
1262   struct OperationRequestMessage *msg;
1263
1264   op->state = GNUNET_new (struct OperationState);
1265   /* copy the current generation's strata estimator for this operation */
1266   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1267   /* we started the operation, thus we have to send the operation request */
1268   op->state->phase = PHASE_EXPECT_SE;
1269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1270               "Initiating union operation evaluation\n");
1271   ev = GNUNET_MQ_msg_nested_mh (msg,
1272                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1273                                 opaque_context);
1274   if (NULL == ev)
1275   {
1276     /* the context message is too large */
1277     GNUNET_break (0);
1278     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1279     return;
1280   }
1281   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1282   msg->app_id = op->spec->app_id;
1283   GNUNET_MQ_send (op->mq, ev);
1284
1285   if (NULL != opaque_context)
1286     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1287                 "sent op request with context message\n");
1288   else
1289     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1290                 "sent op request without context message\n");
1291 }
1292
1293
1294 /**
1295  * Accept an union operation request from a remote peer.
1296  * Only initializes the private operation state.
1297  *
1298  * @param op operation that will be accepted as a union operation
1299  */
1300 static void
1301 union_accept (struct Operation *op)
1302 {
1303   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1304               "accepting set union operation\n");
1305   op->state = GNUNET_new (struct OperationState);
1306   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1307   /* kick off the operation */
1308   send_strata_estimator (op);
1309 }
1310
1311
1312 /**
1313  * Create a new set supporting the union operation
1314  *
1315  * We maintain one strata estimator per set and then manipulate it over the
1316  * lifetime of the set, as recreating a strata estimator would be expensive.
1317  *
1318  * @return the newly created set
1319  */
1320 static struct SetState *
1321 union_set_create (void)
1322 {
1323   struct SetState *set_state;
1324
1325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326               "union set created\n");
1327   set_state = GNUNET_new (struct SetState);
1328   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1329                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
1330   return set_state;
1331 }
1332
1333
1334 /**
1335  * Add the element from the given element message to the set.
1336  *
1337  * @param set_state state of the set want to add to
1338  * @param ee the element to add to the set
1339  */
1340 static void
1341 union_add (struct SetState *set_state, struct ElementEntry *ee)
1342 {
1343   strata_estimator_insert (set_state->se,
1344                            get_ibf_key (&ee->element_hash, 0));
1345 }
1346
1347
1348 /**
1349  * Remove the element given in the element message from the set.
1350  * Only marks the element as removed, so that older set operations can still exchange it.
1351  *
1352  * @param set_state state of the set to remove from
1353  * @param ee set element to remove
1354  */
1355 static void
1356 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1357 {
1358   strata_estimator_remove (set_state->se,
1359                            get_ibf_key (&ee->element_hash, 0));
1360 }
1361
1362
1363 /**
1364  * Destroy a set that supports the union operation.
1365  *
1366  * @param set_state the set to destroy
1367  */
1368 static void
1369 union_set_destroy (struct SetState *set_state)
1370 {
1371   if (NULL != set_state->se)
1372   {
1373     strata_estimator_destroy (set_state->se);
1374     set_state->se = NULL;
1375   }
1376   GNUNET_free (set_state);
1377 }
1378
1379
1380 /**
1381  * Dispatch messages for a union operation.
1382  *
1383  * @param op the state of the union evaluate operation
1384  * @param mh the received message
1385  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1386  *         #GNUNET_OK otherwise
1387  */
1388 int
1389 union_handle_p2p_message (struct Operation *op,
1390                           const struct GNUNET_MessageHeader *mh)
1391 {
1392   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393               "received p2p message (t: %u, s: %u)\n",
1394               ntohs (mh->type),
1395               ntohs (mh->size));
1396   switch (ntohs (mh->type))
1397   {
1398     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1399       return handle_p2p_ibf (op, mh);
1400     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1401       return handle_p2p_strata_estimator (op, mh);
1402     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1403       handle_p2p_elements (op, mh);
1404       break;
1405     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1406       handle_p2p_element_requests (op, mh);
1407       break;
1408     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1409       handle_p2p_done (op, mh);
1410       break;
1411     default:
1412       /* something wrong with cadet's message handlers? */
1413       GNUNET_assert (0);
1414   }
1415   return GNUNET_OK;
1416 }
1417
1418 /**
1419  * handler for peer-disconnects, notifies the client
1420  * about the aborted operation in case the op was not concluded
1421  *
1422  * @param op the destroyed operation
1423  */
1424 static void
1425 union_peer_disconnect (struct Operation *op)
1426 {
1427   if (PHASE_FINISHED != op->state->phase)
1428   {
1429     struct GNUNET_MQ_Envelope *ev;
1430     struct GNUNET_SET_ResultMessage *msg;
1431
1432     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1433     msg->request_id = htonl (op->spec->client_request_id);
1434     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1435     msg->element_type = htons (0);
1436     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1437     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1438                 "other peer disconnected prematurely\n");
1439     _GSS_operation_destroy (op, GNUNET_YES);
1440     return;
1441   }
1442   // else: the session has already been concluded
1443   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1444               "other peer disconnected (finished)\n");
1445   if (GNUNET_NO == op->state->client_done_sent)
1446     finish_and_destroy (op);
1447 }
1448
1449
1450 /**
1451  * Get the table with implementing functions for
1452  * set union.
1453  *
1454  * @return the operation specific VTable
1455  */
1456 const struct SetVT *
1457 _GSS_union_vt ()
1458 {
1459   static const struct SetVT union_vt = {
1460     .create = &union_set_create,
1461     .msg_handler = &union_handle_p2p_message,
1462     .add = &union_add,
1463     .remove = &union_remove,
1464     .destroy_set = &union_set_destroy,
1465     .evaluate = &union_evaluate,
1466     .accept = &union_accept,
1467     .peer_disconnect = &union_peer_disconnect,
1468     .cancel = &union_op_cancel,
1469   };
1470
1471   return &union_vt;
1472 }