-ftbfs
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "ibf.h"
30 #include "strata_estimator.h"
31 #include "set_protocol.h"
32 #include <gcrypt.h>
33
34
35 /**
36  * Number of IBFs in a strata estimator.
37  */
38 #define SE_STRATA_COUNT 32
39 /**
40  * Size of the IBFs in the strata estimator.
41  */
42 #define SE_IBF_SIZE 80
43 /**
44  * hash num parameter for the difference digests and strata estimators
45  */
46 #define SE_IBF_HASH_NUM 4
47
48 /**
49  * Number of buckets that can be transmitted in one message.
50  */
51 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
52
53 /**
54  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
55  * Choose this value so that computing the IBF is still cheaper
56  * than transmitting all values.
57  */
58 #define MAX_IBF_ORDER (16)
59
60 /**
61  * Number of buckets used in the ibf per estimated
62  * difference.
63  */
64 #define IBF_ALPHA 4
65
66
67 /**
68  * Current phase we are in for a union operation.
69  */
70 enum UnionOperationPhase
71 {
72   /**
73    * We sent the request message, and expect a strata estimator
74    */
75   PHASE_EXPECT_SE,
76   /**
77    * We sent the strata estimator, and expect an IBF
78    */
79   PHASE_EXPECT_IBF,
80   /**
81    * We know what type of IBF the other peer wants to send us,
82    * and expect the remaining parts
83    */
84   PHASE_EXPECT_IBF_CONT,
85   /**
86    * We are sending request and elements,
87    * and thus only expect elements from the other peer.
88    */
89   PHASE_EXPECT_ELEMENTS,
90   /**
91    * We are expecting elements and requests, and send
92    * requested elements back to the other peer.
93    */
94   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
95   /**
96    * The protocol is over.
97    * Results may still have to be sent to the client.
98    */
99   PHASE_FINISHED
100 };
101
102
103 /**
104  * State of an evaluate operation
105  * with another peer.
106  */
107 struct OperationState
108 {
109   /**
110    * Tunnel to the remote peer.
111    */
112   struct GNUNET_MESH_Tunnel *tunnel;
113
114   /**
115    * Detail information about the set operation,
116    * including the set to use.
117    */
118   struct OperationSpecification *spec;
119
120   /**
121    * Message queue for the peer.
122    */
123   struct GNUNET_MQ_Handle *mq;
124
125   /**
126    * Number of ibf buckets received
127    */
128   unsigned int ibf_buckets_received;
129
130   /**
131    * Copy of the set's strata estimator at the time of
132    * creation of this operation
133    */
134   struct StrataEstimator *se;
135
136   /**
137    * The ibf we currently receive
138    */
139   struct InvertibleBloomFilter *remote_ibf;
140
141   /**
142    * IBF of the set's element.
143    */
144   struct InvertibleBloomFilter *local_ibf;
145
146   /**
147    * Maps IBF-Keys (specific to the current salt) to elements.
148    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
149    * Colliding IBF-Keys are linked.
150    */
151   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
152
153   /**
154    * Current state of the operation.
155    */
156   enum UnionOperationPhase phase;
157
158   /**
159    * Generation in which the operation handle
160    * was created.
161    */
162   unsigned int generation_created;
163
164   /**
165    * Set state of the set that this operation
166    * belongs to.
167    */
168   struct Set *set;
169
170   /**
171    * Evaluate operations are held in
172    * a linked list.
173    */
174   struct OperationState *next;
175
176    /**
177     * Evaluate operations are held in
178     * a linked list.
179     */
180   struct OperationState *prev;
181
182   /**
183    * Did we send the client that we are done?
184    */
185   int client_done_sent;
186 };
187
188
189 /**
190  * The key entry is used to associate an ibf key with
191  * an element.
192  */
193 struct KeyEntry
194 {
195   /**
196    * IBF key for the entry, derived from the current salt.
197    */
198   struct IBF_Key ibf_key;
199
200   /**
201    * The actual element associated with the key
202    */
203   struct ElementEntry *element;
204
205   /**
206    * Element that collides with this element
207    * on the ibf key
208    */
209   struct KeyEntry *next_colliding;
210 };
211
212
213 /**
214  * Used as a closure for sending elements
215  * with a specific IBF key.
216  */
217 struct SendElementClosure
218 {
219   /**
220    * The IBF key whose matching elements should be
221    * sent.
222    */
223   struct IBF_Key ibf_key;
224
225   /**
226    * Operation for which the elements
227    * should be sent.
228    */
229   struct OperationState *eo;
230 };
231
232
233 /**
234  * Extra state required for efficient set union.
235  */
236 struct SetState
237 {
238   /**
239    * The strata estimator is only generated once for
240    * each set.
241    * The IBF keys are derived from the element hashes with
242    * salt=0.
243    */
244   struct StrataEstimator *se;
245
246   /**
247    * Evaluate operations are held in
248    * a linked list.
249    */
250   struct OperationState *ops_head;
251
252   /**
253    * Evaluate operations are held in
254    * a linked list.
255    */
256   struct OperationState *ops_tail;
257 };
258
259
260 /**
261  * Iterator over hash map entries.
262  *
263  * @param cls closure
264  * @param key current key code
265  * @param value value in the hash map
266  * @return GNUNET_YES if we should continue to
267  *         iterate,
268  *         GNUNET_NO if not.
269  */
270 static int
271 destroy_key_to_element_iter (void *cls,
272                              uint32_t key,
273                              void *value)
274 {
275   struct KeyEntry *k = value;
276
277   while (NULL != k)
278   {
279     struct KeyEntry *k_tmp = k;
280     k = k->next_colliding;
281     if (GNUNET_YES == k_tmp->element->remote)
282     {
283       GNUNET_free (k_tmp->element);
284       k_tmp->element = NULL;
285     }
286     GNUNET_free (k_tmp);
287   }
288   return GNUNET_YES;
289 }
290
291
292 /**
293  * Destroy a union operation, and free all resources
294  * associated with it.
295  *
296  * @param eo the union operation to destroy
297  */
298 static void
299 union_operation_destroy (struct OperationState *eo)
300 {
301   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
302   GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
303                                eo->set->state->ops_tail,
304                                eo);
305   if (NULL != eo->mq)
306   {
307     GNUNET_MQ_destroy (eo->mq);
308     eo->mq = NULL;
309   }
310   if (NULL != eo->tunnel)
311   {
312     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
313     eo->tunnel = NULL;
314     GNUNET_MESH_tunnel_destroy (t);
315   }
316   if (NULL != eo->remote_ibf)
317   {
318     ibf_destroy (eo->remote_ibf);
319     eo->remote_ibf = NULL;
320   }
321   if (NULL != eo->local_ibf)
322   {
323     ibf_destroy (eo->local_ibf);
324     eo->local_ibf = NULL;
325   }
326   if (NULL != eo->se)
327   {
328     strata_estimator_destroy (eo->se);
329     eo->se = NULL;
330   }
331   if (NULL != eo->key_to_element)
332   {
333     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
334     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
335     eo->key_to_element = NULL;
336   }
337   if (NULL != eo->spec)
338   {
339     if (NULL != eo->spec->context_msg)
340     {
341       GNUNET_free (eo->spec->context_msg);
342       eo->spec->context_msg = NULL;
343     }
344     GNUNET_free (eo->spec);
345     eo->spec = NULL;
346   }
347   GNUNET_free (eo);
348
349   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
350
351   /* FIXME: do a garbage collection of the set generations */
352 }
353
354
355 /**
356  * Inform the client that the union operation has failed,
357  * and proceed to destroy the evaluate operation.
358  *
359  * @param eo the union operation to fail
360  */
361 static void
362 fail_union_operation (struct OperationState *eo)
363 {
364   struct GNUNET_MQ_Envelope *ev;
365   struct GNUNET_SET_ResultMessage *msg;
366
367   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
368   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
369   msg->request_id = htonl (eo->spec->client_request_id);
370   msg->element_type = htons (0);
371   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
372   union_operation_destroy (eo);
373 }
374
375
376 /**
377  * Derive the IBF key from a hash code and
378  * a salt.
379  *
380  * @param src the hash code
381  * @param salt salt to use
382  * @return the derived IBF key
383  */
384 static struct IBF_Key
385 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
386 {
387   struct IBF_Key key;
388
389   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
390                       GCRY_MD_SHA512, GCRY_MD_SHA256,
391                       src, sizeof *src,
392                       &salt, sizeof (salt),
393                       NULL, 0);
394   return key;
395 }
396
397
398 /**
399  * Send a request for the evaluate operation to a remote peer
400  *
401  * @param eo operation with the other peer
402  */
403 static void
404 send_operation_request (struct OperationState *eo)
405 {
406   struct GNUNET_MQ_Envelope *ev;
407   struct OperationRequestMessage *msg;
408
409   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
410                                 eo->spec->context_msg);
411
412   if (NULL == ev)
413   {
414     /* the context message is too large */
415     GNUNET_break (0);
416     GNUNET_SERVER_client_disconnect (eo->spec->set->client);
417     return;
418   }
419   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
420   msg->app_id = eo->spec->app_id;
421   msg->salt = htonl (eo->spec->salt);
422   GNUNET_MQ_send (eo->mq, ev);
423
424   if (NULL != eo->spec->context_msg)
425     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
426   else
427     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
428
429   if (NULL != eo->spec->context_msg)
430   {
431     GNUNET_free (eo->spec->context_msg);
432     eo->spec->context_msg = NULL;
433   }
434
435 }
436
437
438 /**
439  * Iterator to create the mapping between ibf keys
440  * and element entries.
441  *
442  * @param cls closure
443  * @param key current key code
444  * @param value value in the hash map
445  * @return GNUNET_YES if we should continue to
446  *         iterate,
447  *         GNUNET_NO if not.
448  */
449 static int
450 op_register_element_iterator (void *cls,
451                          uint32_t key,
452                          void *value)
453 {
454   struct KeyEntry *const new_k = cls;
455   struct KeyEntry *old_k = value;
456
457   GNUNET_assert (NULL != old_k);
458   do
459   {
460     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
461     {
462       new_k->next_colliding = old_k->next_colliding;
463       old_k->next_colliding = new_k;
464       return GNUNET_NO;
465     }
466     old_k = old_k->next_colliding;
467   } while (NULL != old_k);
468   return GNUNET_YES;
469 }
470
471
472 /**
473  * Insert an element into the union operation's
474  * key-to-element mapping. Takes ownership of 'ee'.
475  * Note that this does not insert the element in the set,
476  * only in the operation's key-element mapping.
477  * This is done to speed up re-tried operations, if some elements
478  * were transmitted, and then the IBF fails to decode.
479  *
480  * @param eo the union operation
481  * @param ee the element entry
482  */
483 static void
484 op_register_element (struct OperationState *eo, struct ElementEntry *ee)
485 {
486   int ret;
487   struct IBF_Key ibf_key;
488   struct KeyEntry *k;
489
490   ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
491   k = GNUNET_new (struct KeyEntry);
492   k->element = ee;
493   k->ibf_key = ibf_key;
494   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
495                                                       (uint32_t) ibf_key.key_val,
496                                                       op_register_element_iterator, k);
497
498   /* was the element inserted into a colliding bucket? */
499   if (GNUNET_SYSERR == ret)
500     return;
501
502   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
503                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
504 }
505
506
507 /**
508  * Insert a key into an ibf.
509  *
510  * @param cls the ibf
511  * @param key unused
512  * @param value the key entry to get the key from
513  */
514 static int
515 prepare_ibf_iterator (void *cls,
516                       uint32_t key,
517                       void *value)
518 {
519   struct InvertibleBloomFilter *ibf = cls;
520   struct KeyEntry *ke = value;
521
522   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val);
523
524   ibf_insert (ibf, ke->ibf_key);
525   return GNUNET_YES;
526 }
527
528
529 /**
530  * Iterator for initializing the
531  * key-to-element mapping of a union operation
532  *
533  * @param cls the union operation
534  * @param key unised
535  * @param value the element entry to insert
536  *        into the key-to-element mapping
537  * @return GNUNET_YES to continue iterating,
538  *         GNUNET_NO to stop
539  */
540 static int
541 init_key_to_element_iterator (void *cls,
542                               const struct GNUNET_HashCode *key,
543                               void *value)
544 {
545   struct OperationState *eo = cls;
546   struct ElementEntry *e = value;
547
548   /* make sure that the element belongs to the set at the time
549    * of creating the operation */
550   if ( (e->generation_added > eo->generation_created) ||
551        ( (GNUNET_YES == e->removed) &&
552          (e->generation_removed < eo->generation_created)))
553     return GNUNET_YES;
554
555   GNUNET_assert (GNUNET_NO == e->remote);
556
557   op_register_element (eo, e);
558   return GNUNET_YES;
559 }
560
561
562 /**
563  * Create an ibf with the operation's elements
564  * of the specified size
565  *
566  * @param eo the union operation
567  * @param size size of the ibf to create
568  */
569 static void
570 prepare_ibf (struct OperationState *eo, uint16_t size)
571 {
572   if (NULL == eo->key_to_element)
573   {
574     unsigned int len;
575     len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements);
576     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
577     GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements,
578                                            init_key_to_element_iterator, eo);
579   }
580   if (NULL != eo->local_ibf)
581     ibf_destroy (eo->local_ibf);
582   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
583   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
584                                            prepare_ibf_iterator, eo->local_ibf);
585 }
586
587
588 /**
589  * Send an ibf of appropriate size.
590  *
591  * @param eo the union operation
592  * @param ibf_order order of the ibf to send, size=2^order
593  */
594 static void
595 send_ibf (struct OperationState *eo, uint16_t ibf_order)
596 {
597   unsigned int buckets_sent = 0;
598   struct InvertibleBloomFilter *ibf;
599
600   prepare_ibf (eo, 1<<ibf_order);
601
602   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
603
604   ibf = eo->local_ibf;
605
606   while (buckets_sent < (1 << ibf_order))
607   {
608     unsigned int buckets_in_message;
609     struct GNUNET_MQ_Envelope *ev;
610     struct IBFMessage *msg;
611
612     buckets_in_message = (1 << ibf_order) - buckets_sent;
613     /* limit to maximum */
614     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
615       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
616
617     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
618                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
619     msg->reserved = 0;
620     msg->order = ibf_order;
621     msg->offset = htons (buckets_sent);
622     ibf_write_slice (ibf, buckets_sent,
623                      buckets_in_message, &msg[1]);
624     buckets_sent += buckets_in_message;
625     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
626                 buckets_in_message, buckets_sent, 1<<ibf_order);
627     GNUNET_MQ_send (eo->mq, ev);
628   }
629
630   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
631 }
632
633
634 /**
635  * Send a strata estimator to the remote peer.
636  *
637  * @param eo the union operation with the remote peer
638  */
639 static void
640 send_strata_estimator (struct OperationState *eo)
641 {
642   struct GNUNET_MQ_Envelope *ev;
643   struct GNUNET_MessageHeader *strata_msg;
644
645   ev = GNUNET_MQ_msg_header_extra (strata_msg,
646                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
647                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
648   strata_estimator_write (eo->set->state->se, &strata_msg[1]);
649   GNUNET_MQ_send (eo->mq, ev);
650   eo->phase = PHASE_EXPECT_IBF;
651   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
652 }
653
654
655 /**
656  * Compute the necessary order of an ibf
657  * from the size of the symmetric set difference.
658  *
659  * @param diff the difference
660  * @return the required size of the ibf
661  */
662 static unsigned int
663 get_order_from_difference (unsigned int diff)
664 {
665   unsigned int ibf_order;
666
667   ibf_order = 2;
668   while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM)
669     ibf_order++;
670   if (ibf_order > MAX_IBF_ORDER)
671     ibf_order = MAX_IBF_ORDER;
672   return ibf_order;
673 }
674
675
676 /**
677  * Handle a strata estimator from a remote peer
678  *
679  * @param cls the union operation
680  * @param mh the message
681  */
682 static void
683 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
684 {
685   struct OperationState *eo = cls;
686   struct StrataEstimator *remote_se;
687   int diff;
688
689   if (eo->phase != PHASE_EXPECT_SE)
690   {
691     fail_union_operation (eo);
692     GNUNET_break (0);
693     return;
694   }
695   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
696                                        SE_IBF_HASH_NUM);
697   strata_estimator_read (&mh[1], remote_se);
698   GNUNET_assert (NULL != eo->se);
699   diff = strata_estimator_difference (remote_se, eo->se);
700   strata_estimator_destroy (remote_se);
701   strata_estimator_destroy (eo->se);
702   eo->se = NULL;
703   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
704               diff, 1<<get_order_from_difference (diff));
705   send_ibf (eo, get_order_from_difference (diff));
706 }
707
708
709
710 /**
711  * Iterator to send elements to a remote peer
712  *
713  * @param cls closure with the element key and the union operation
714  * @param key ignored
715  * @param value the key entry
716  */
717 static int
718 send_element_iterator (void *cls,
719                        uint32_t key,
720                        void *value)
721 {
722   struct SendElementClosure *sec = cls;
723   struct IBF_Key ibf_key = sec->ibf_key;
724   struct OperationState *eo = sec->eo;
725   struct KeyEntry *ke = value;
726
727   if (ke->ibf_key.key_val != ibf_key.key_val)
728     return GNUNET_YES;
729   while (NULL != ke)
730   {
731     const struct GNUNET_SET_Element *const element = &ke->element->element;
732     struct GNUNET_MQ_Envelope *ev;
733     struct GNUNET_MessageHeader *mh;
734
735     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
736     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
737     if (NULL == ev)
738     {
739       /* element too large */
740       GNUNET_break (0);
741       continue;
742     }
743     memcpy (&mh[1], element->data, element->size);
744     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
745                 GNUNET_h2s (&ke->element->element_hash));
746     GNUNET_MQ_send (eo->mq, ev);
747     ke = ke->next_colliding;
748   }
749   return GNUNET_NO;
750 }
751
752 /**
753  * Send all elements that have the specified IBF key
754  * to the remote peer of the union operation
755  *
756  * @param eo union operation
757  * @param ibf_key IBF key of interest
758  */
759 static void
760 send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
761 {
762   struct SendElementClosure send_cls;
763
764   send_cls.ibf_key = ibf_key;
765   send_cls.eo = eo;
766   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
767                                                 &send_element_iterator, &send_cls);
768 }
769
770
771 /**
772  * Decode which elements are missing on each side, and
773  * send the appropriate elemens and requests
774  *
775  * @param eo union operation
776  */
777 static void
778 decode_and_send (struct OperationState *eo)
779 {
780   struct IBF_Key key;
781   struct IBF_Key last_key;
782   int side;
783   unsigned int num_decoded;
784   struct InvertibleBloomFilter *diff_ibf;
785
786   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
787
788   prepare_ibf (eo, eo->remote_ibf->size);
789   diff_ibf = ibf_dup (eo->local_ibf);
790   ibf_subtract (diff_ibf, eo->remote_ibf);
791
792   ibf_destroy (eo->remote_ibf);
793   eo->remote_ibf = NULL;
794
795   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
796
797   num_decoded = 0;
798   last_key.key_val = 0;
799
800   while (1)
801   {
802     int res;
803     int cycle_detected = GNUNET_NO;
804
805     last_key = key;
806
807     res = ibf_decode (diff_ibf, &side, &key);
808     if (res == GNUNET_OK)
809     {
810       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
811                   key.key_val);
812       num_decoded += 1;
813       if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
814       {
815         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
816                     num_decoded, diff_ibf->size);
817         cycle_detected = GNUNET_YES;
818       }
819     }
820     if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
821     {
822       int next_order;
823       next_order = 0;
824       while (1<<next_order < diff_ibf->size)
825         next_order++;
826       next_order++;
827       if (next_order <= MAX_IBF_ORDER)
828       {
829         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
830                     "decoding failed, sending larger ibf (size %u)\n",
831                     1<<next_order);
832         send_ibf (eo, next_order);
833       }
834       else
835       {
836         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
837                     "set union failed: reached ibf limit\n");
838       }
839       break;
840     }
841     if (GNUNET_NO == res)
842     {
843       struct GNUNET_MQ_Envelope *ev;
844
845       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
846       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
847       GNUNET_MQ_send (eo->mq, ev);
848       break;
849     }
850     if (1 == side)
851     {
852       send_elements_for_key (eo, key);
853     }
854     else if (-1 == side)
855     {
856       struct GNUNET_MQ_Envelope *ev;
857       struct GNUNET_MessageHeader *msg;
858
859       /* FIXME: before sending the request, check if we may just have the element */
860       /* FIXME: merge multiple requests */
861       /* FIXME: remember somewhere that we already requested the element,
862        * so that we don't request it again with the next ibf if decoding fails */
863       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
864                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
865
866       *(struct IBF_Key *) &msg[1] = key;
867       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
868       GNUNET_MQ_send (eo->mq, ev);
869     }
870     else
871     {
872       GNUNET_assert (0);
873     }
874   }
875   ibf_destroy (diff_ibf);
876 }
877
878
879 /**
880  * Handle an IBF message from a remote peer.
881  *
882  * @param cls the union operation
883  * @param mh the header of the message
884  */
885 static void
886 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
887 {
888   struct OperationState *eo = cls;
889   struct IBFMessage *msg = (struct IBFMessage *) mh;
890   unsigned int buckets_in_message;
891
892   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
893        (eo->phase == PHASE_EXPECT_IBF) )
894   {
895     eo->phase = PHASE_EXPECT_IBF_CONT;
896     GNUNET_assert (NULL == eo->remote_ibf);
897     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
898     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
899     eo->ibf_buckets_received = 0;
900     if (0 != ntohs (msg->offset))
901     {
902       GNUNET_break (0);
903       fail_union_operation (eo);
904       return;
905     }
906   }
907   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
908   {
909     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
910          (1<<msg->order != eo->remote_ibf->size) )
911     {
912       GNUNET_break (0);
913       fail_union_operation (eo);
914       return;
915     }
916   }
917
918   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
919
920   if (0 == buckets_in_message)
921   {
922     GNUNET_break_op (0);
923     fail_union_operation (eo);
924     return;
925   }
926
927   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
928   {
929     GNUNET_break (0);
930     fail_union_operation (eo);
931     return;
932   }
933
934   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
935   eo->ibf_buckets_received += buckets_in_message;
936
937   if (eo->ibf_buckets_received == eo->remote_ibf->size)
938   {
939     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
940     eo->phase = PHASE_EXPECT_ELEMENTS;
941     decode_and_send (eo);
942   }
943 }
944
945
946 /**
947  * Send a result message to the client indicating
948  * that there is a new element.
949  *
950  * @param eo union operation
951  * @param element element to send
952  */
953 static void
954 send_client_element (struct OperationState *eo,
955                      struct GNUNET_SET_Element *element)
956 {
957   struct GNUNET_MQ_Envelope *ev;
958   struct GNUNET_SET_ResultMessage *rm;
959
960   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
961   GNUNET_assert (0 != eo->spec->client_request_id);
962   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
963   if (NULL == ev)
964   {
965     GNUNET_MQ_discard (ev);
966     GNUNET_break (0);
967     return;
968   }
969   rm->result_status = htons (GNUNET_SET_STATUS_OK);
970   rm->request_id = htonl (eo->spec->client_request_id);
971   rm->element_type = element->type;
972   memcpy (&rm[1], element->data, element->size);
973   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
974 }
975
976
977 /**
978  * Send a result message to the client indicating
979  * that the operation is over.
980  * After the result done message has been sent to the client,
981  * destroy the evaluate operation.
982  *
983  * @param eo union operation
984  */
985 static void
986 send_client_done_and_destroy (struct OperationState *eo)
987 {
988   struct GNUNET_MQ_Envelope *ev;
989   struct GNUNET_SET_ResultMessage *rm;
990
991   GNUNET_assert (GNUNET_NO == eo->client_done_sent);
992
993   eo->client_done_sent = GNUNET_YES;
994
995   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
996   rm->request_id = htonl (eo->spec->client_request_id);
997   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
998   rm->element_type = htons (0);
999   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1000
1001   union_operation_destroy (eo);
1002 }
1003
1004
1005 /**
1006  * Handle an element message from a remote peer.
1007  *
1008  * @param cls the union operation
1009  * @param mh the message
1010  */
1011 static void
1012 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1013 {
1014   struct OperationState *eo = cls;
1015   struct ElementEntry *ee;
1016   uint16_t element_size;
1017
1018   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1019
1020   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1021        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1022   {
1023     fail_union_operation (eo);
1024     GNUNET_break (0);
1025     return;
1026   }
1027   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1028   ee = GNUNET_malloc (sizeof *ee + element_size);
1029   memcpy (&ee[1], &mh[1], element_size);
1030   ee->element.size = element_size;
1031   ee->element.data = &ee[1];
1032   ee->remote = GNUNET_YES;
1033   GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1034
1035   /* FIXME: see if the element has already been inserted! */
1036
1037   op_register_element (eo, ee);
1038   send_client_element (eo, &ee->element);
1039 }
1040
1041
1042 /**
1043  * Handle an element request from a remote peer.
1044  *
1045  * @param cls the union operation
1046  * @param mh the message
1047  */
1048 static void
1049 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1050 {
1051   struct OperationState *eo = cls;
1052   struct IBF_Key *ibf_key;
1053   unsigned int num_keys;
1054
1055   /* look up elements and send them */
1056   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1057   {
1058     GNUNET_break (0);
1059     fail_union_operation (eo);
1060     return;
1061   }
1062
1063   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1064
1065   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1066   {
1067     GNUNET_break (0);
1068     fail_union_operation (eo);
1069     return;
1070   }
1071
1072   ibf_key = (struct IBF_Key *) &mh[1];
1073   while (0 != num_keys--)
1074   {
1075     send_elements_for_key (eo, *ibf_key);
1076     ibf_key++;
1077   }
1078 }
1079
1080
1081 /**
1082  * Handle a done message from a remote peer
1083  *
1084  * @param cls the union operation
1085  * @param mh the message
1086  */
1087 static void
1088 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1089 {
1090   struct OperationState *eo = cls;
1091   struct GNUNET_MQ_Envelope *ev;
1092
1093   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1094   {
1095     /* we got all requests, but still have to send our elements as response */
1096
1097     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1098     eo->phase = PHASE_FINISHED;
1099     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1100     GNUNET_MQ_send (eo->mq, ev);
1101     return;
1102   }
1103   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1104   {
1105     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1106     eo->phase = PHASE_FINISHED;
1107     send_client_done_and_destroy (eo);
1108     return;
1109   }
1110   GNUNET_break (0);
1111   fail_union_operation (eo);
1112 }
1113
1114
1115 /**
1116  * Evaluate a union operation with
1117  * a remote peer.
1118  *
1119  * @param spec specification of the operation the evaluate
1120  * @param tunnel tunnel already connected to the partner peer
1121  * @param tc tunnel context, passed here so all new incoming
1122  *        messages are directly going to the union operations
1123  * @return a handle to the operation
1124  */
1125 static void
1126 union_evaluate (struct OperationSpecification *spec,
1127                 struct GNUNET_MESH_Tunnel *tunnel,
1128                 struct TunnelContext *tc)
1129 {
1130   struct OperationState *eo;
1131
1132   eo = GNUNET_new (struct OperationState);
1133   tc->vt = _GSS_union_vt ();
1134   tc->op = eo;
1135   eo->se = strata_estimator_dup (spec->set->state->se);
1136   eo->generation_created = spec->set->current_generation++;
1137   eo->set = spec->set;
1138   eo->spec = spec;
1139   eo->tunnel = tunnel;
1140   eo->mq = GNUNET_MESH_mq_create (tunnel);
1141
1142   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1143               "evaluating union operation, (app %s)\n",
1144               GNUNET_h2s (&eo->spec->app_id));
1145
1146   /* we started the operation, thus we have to send the operation request */
1147   eo->phase = PHASE_EXPECT_SE;
1148
1149   GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1150                                eo->set->state->ops_tail,
1151                                eo);
1152
1153   send_operation_request (eo);
1154 }
1155
1156
1157 /**
1158  * Accept an union operation request from a remote peer
1159  *
1160  * @param spec all necessary information about the operation
1161  * @param tunnel open tunnel to the partner's peer
1162  * @param tc tunnel context, passed here so all new incoming
1163  *        messages are directly going to the union operations
1164  * @return operation
1165  */
1166 static void
1167 union_accept (struct OperationSpecification *spec,
1168               struct GNUNET_MESH_Tunnel *tunnel,
1169               struct TunnelContext *tc)
1170 {
1171   struct OperationState *eo;
1172
1173   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1174
1175   eo = GNUNET_new (struct OperationState);
1176   tc->vt = _GSS_union_vt ();
1177   tc->op = eo;
1178   eo->set = spec->set;
1179   eo->generation_created = eo->set->current_generation++;
1180   eo->spec = spec;
1181   eo->tunnel = tunnel;
1182   eo->mq = GNUNET_MESH_mq_create (tunnel);
1183   eo->se = strata_estimator_dup (eo->set->state->se);
1184   /* transfer ownership of mq and socket from incoming to eo */
1185   GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1186                                eo->set->state->ops_tail,
1187                                eo);
1188   /* kick off the operation */
1189   send_strata_estimator (eo);
1190 }
1191
1192
1193 /**
1194  * Create a new set supporting the union operation
1195  *
1196  * @return the newly created set
1197  */
1198 static struct SetState *
1199 union_set_create (void)
1200 {
1201   struct SetState *set_state;
1202
1203   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1204
1205   set_state = GNUNET_new (struct SetState);
1206   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1207                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);
1208   return set_state;
1209 }
1210
1211
1212 /**
1213  * Add the element from the given element message to the set.
1214  *
1215  * @param set_state state of the set want to add to
1216  * @param ee the element to add to the set
1217  */
1218 static void
1219 union_add (struct SetState *set_state, struct ElementEntry *ee)
1220 {
1221   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1222 }
1223
1224
1225 /**
1226  * Destroy a set that supports the union operation
1227  *
1228  * @param set_state the set to destroy
1229  */
1230 static void
1231 union_set_destroy (struct SetState *set_state)
1232 {
1233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1234   /* important to destroy operations before the rest of the set */
1235   while (NULL != set_state->ops_head)
1236     union_operation_destroy (set_state->ops_head);
1237   if (NULL != set_state->se)
1238   {
1239     strata_estimator_destroy (set_state->se);
1240     set_state->se = NULL;
1241   }
1242   GNUNET_free (set_state);
1243 }
1244
1245
1246 /**
1247  * Remove the element given in the element message from the set.
1248  * Only marks the element as removed, so that older set operations can still exchange it.
1249  *
1250  * @param set_state state of the set to remove from
1251  * @param element set element to remove
1252  */
1253 static void
1254 union_remove (struct SetState *set_state, struct ElementEntry *element)
1255 {
1256   /* FIXME: remove from strata estimator */
1257 }
1258
1259
1260 /**
1261  * Dispatch messages for a union operation.
1262  *
1263  * @param eo the state of the union evaluate operation
1264  * @param mh the received message
1265  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1266  *         GNUNET_OK otherwise
1267  */
1268 int
1269 union_handle_p2p_message (struct OperationState *eo,
1270                           const struct GNUNET_MessageHeader *mh)
1271 {
1272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1273               ntohs (mh->type), ntohs (mh->size));
1274   switch (ntohs (mh->type))
1275   {
1276     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1277       handle_p2p_ibf (eo, mh);
1278       break;
1279     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1280       handle_p2p_strata_estimator (eo, mh);
1281       break;
1282     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1283       handle_p2p_elements (eo, mh);
1284       break;
1285     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1286       handle_p2p_element_requests (eo, mh);
1287       break;
1288     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1289       handle_p2p_done (eo, mh);
1290       break;
1291     default:
1292       /* something wrong with mesh's message handlers? */
1293       GNUNET_assert (0);
1294   }
1295   return GNUNET_OK;
1296 }
1297
1298
1299 static void
1300 union_peer_disconnect (struct OperationState *op)
1301 {
1302   /* Are we already disconnected? */
1303   if (NULL == op->tunnel)
1304     return;
1305   op->tunnel = NULL;
1306   if (NULL != op->mq)
1307   {
1308     GNUNET_MQ_destroy (op->mq);
1309     op->mq = NULL;
1310   }
1311   if (PHASE_FINISHED != op->phase)
1312   {
1313     struct GNUNET_MQ_Envelope *ev;
1314     struct GNUNET_SET_ResultMessage *msg;
1315
1316     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1317     msg->request_id = htonl (op->spec->client_request_id);
1318     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1319     msg->element_type = htons (0);
1320     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1321     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1322     union_operation_destroy (op);
1323     return;
1324   }
1325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1326   if (GNUNET_NO == op->client_done_sent)
1327     send_client_done_and_destroy (op);
1328 }
1329
1330
1331 static void
1332 union_op_cancel (struct SetState *set_state, uint32_t op_id)
1333 {
1334   /* FIXME: implement */
1335 }
1336
1337
1338 const struct SetVT *
1339 _GSS_union_vt ()
1340 {
1341   static const struct SetVT union_vt = {
1342     .create = &union_set_create,
1343     .msg_handler = &union_handle_p2p_message,
1344     .add = &union_add,
1345     .remove = &union_remove,
1346     .destroy_set = &union_set_destroy,
1347     .evaluate = &union_evaluate,
1348     .accept = &union_accept,
1349     .peer_disconnect = &union_peer_disconnect,
1350     .cancel = &union_op_cancel,
1351   };
1352
1353   return &union_vt;
1354 }