8a183391e01f75401d4a42e4a253f44713c9f7bf
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation.
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct OperationState
104 {
105   /**
106    * Tunnel to the remote peer.
107    */
108   struct GNUNET_MESH_Tunnel *tunnel;
109
110   /**
111    * Detail information about the set operation,
112    * including the set to use.
113    */
114   struct OperationSpecification *spec;
115
116   /**
117    * Message queue for the peer.
118    */
119   struct GNUNET_MQ_Handle *mq;
120
121   /**
122    * Number of ibf buckets received
123    */
124   unsigned int ibf_buckets_received;
125
126   /**
127    * Copy of the set's strata estimator at the time of
128    * creation of this operation
129    */
130   struct StrataEstimator *se;
131
132   /**
133    * The ibf we currently receive
134    */
135   struct InvertibleBloomFilter *remote_ibf;
136
137   /**
138    * IBF of the set's element.
139    */
140   struct InvertibleBloomFilter *local_ibf;
141
142   /**
143    * Maps IBF-Keys (specific to the current salt) to elements.
144    */
145   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
146
147   /**
148    * Current state of the operation.
149    */
150   enum UnionOperationPhase phase;
151
152   /**
153    * Generation in which the operation handle
154    * was created.
155    */
156   unsigned int generation_created;
157
158   /**
159    * Set state of the set that this operation
160    * belongs to.
161    */
162   struct SetState *set_state;
163   
164   /**
165    * Evaluate operations are held in
166    * a linked list.
167    */
168   struct OperationState *next;
169   
170    /**
171     * Evaluate operations are held in
172     * a linked list.
173     */
174   struct OperationState *prev;
175 };
176
177
178 /**
179  * Information about an element element in the set.
180  * All elements are stored in a hash-table
181  * from their hash-code to their 'struct Element',
182  * so that the remove and add operations are reasonably
183  * fast.
184  */
185 struct ElementEntry
186 {
187   /**
188    * The actual element. The data for the element
189    * should be allocated at the end of this struct.
190    */
191   struct GNUNET_SET_Element element;
192
193   /**
194    * Hash of the element.
195    * Will be used to derive the different IBF keys
196    * for different salts.
197    */
198   struct GNUNET_HashCode element_hash;
199
200   /**
201    * Generation the element was added by the client.
202    * Operations of earlier generations will not consider the element.
203    */
204   unsigned int generation_added;
205
206   /**
207    * GNUNET_YES if the element has been removed in some generation.
208    */
209   int removed;
210
211   /**
212    * Generation the element was removed by the client. 
213    * Operations of later generations will not consider the element.
214    * Only valid if is_removed is GNUNET_YES.
215    */
216   unsigned int generation_removed;
217
218   /**
219    * GNUNET_YES if the element is a remote element, and does not belong
220    * to the operation's set.
221    */
222   int remote;
223 };
224
225
226 /**
227  * The key entry is used to associate an ibf key with
228  * an element.
229  */
230 struct KeyEntry
231 {
232   /**
233    * IBF key for the entry, derived from the current salt.
234    */
235   struct IBF_Key ibf_key;
236
237   /**
238    * The actual element associated with the key
239    */
240   struct ElementEntry *element;
241
242   /**
243    * Element that collides with this element
244    * on the ibf key
245    */
246   struct KeyEntry *next_colliding;
247 };
248
249
250 /**
251  * Used as a closure for sending elements
252  * with a specific IBF key.
253  */
254 struct SendElementClosure
255 {
256   /**
257    * The IBF key whose matching elements should be
258    * sent.
259    */
260   struct IBF_Key ibf_key;
261
262   /**
263    * Operation for which the elements
264    * should be sent.
265    */
266   struct OperationState *eo;
267 };
268
269
270 /**
271  * Extra state required for efficient set union.
272  */
273 struct SetState
274 {
275   /**
276    * The strata estimator is only generated once for
277    * each set.
278    * The IBF keys are derived from the element hashes with
279    * salt=0.
280    */
281   struct StrataEstimator *se;
282
283   /**
284    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
285    */
286   struct GNUNET_CONTAINER_MultiHashMap *elements;
287
288   /**
289    * Evaluate operations are held in
290    * a linked list.
291    */
292   struct OperationState *ops_head;
293
294   /**
295    * Evaluate operations are held in
296    * a linked list.
297    */
298   struct OperationState *ops_tail;
299
300   /**
301    * Current generation, that is, number of
302    * previously executed operations on this set
303    */
304   unsigned int current_generation;
305 };
306
307
308
309 /**
310  * Iterator over hash map entries.
311  *
312  * @param cls closure
313  * @param key current key code
314  * @param value value in the hash map
315  * @return GNUNET_YES if we should continue to
316  *         iterate,
317  *         GNUNET_NO if not.
318  */
319 static int
320 destroy_elements_iterator (void *cls,
321                            const struct GNUNET_HashCode * key,
322                            void *value)
323 {
324   struct ElementEntry *ee = value;
325
326   GNUNET_free (ee);
327   return GNUNET_YES;
328 }
329
330
331 /**
332  * Iterator over hash map entries.
333  *
334  * @param cls closure
335  * @param key current key code
336  * @param value value in the hash map
337  * @return GNUNET_YES if we should continue to
338  *         iterate,
339  *         GNUNET_NO if not.
340  */
341 static int
342 destroy_key_to_element_iter (void *cls,
343                              uint32_t key,
344                              void *value)
345 {
346   struct KeyEntry *k = value;
347   
348   while (NULL != k)
349   {
350     struct KeyEntry *k_tmp = k;
351     k = k->next_colliding;
352     if (GNUNET_YES == k_tmp->element->remote)
353     {
354       GNUNET_free (k_tmp->element);
355       k_tmp->element = NULL;
356     }
357     GNUNET_free (k_tmp);
358   }
359   return GNUNET_YES;
360 }
361
362
363 /**
364  * Destroy a union operation, and free all resources
365  * associated with it.
366  *
367  * @param eo the union operation to destroy
368  */
369 static void
370 union_operation_destroy (struct OperationState *eo)
371 {
372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
373   GNUNET_CONTAINER_DLL_remove (eo->set_state->ops_head,
374                                eo->set_state->ops_tail,
375                                eo);
376   if (NULL != eo->mq)
377   {
378     GNUNET_MQ_destroy (eo->mq);
379     eo->mq = NULL;
380   }
381   if (NULL != eo->tunnel)
382   {
383     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
384     eo->tunnel = NULL;
385     GNUNET_MESH_tunnel_destroy (t);
386   }
387   if (NULL != eo->remote_ibf)
388   {
389     ibf_destroy (eo->remote_ibf);
390     eo->remote_ibf = NULL;
391   }
392   if (NULL != eo->local_ibf)
393   {
394     ibf_destroy (eo->local_ibf);
395     eo->local_ibf = NULL;
396   }
397   if (NULL != eo->se)
398   {
399     strata_estimator_destroy (eo->se);
400     eo->se = NULL;
401   }
402   if (NULL != eo->key_to_element)
403   {
404     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
405     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
406     eo->key_to_element = NULL;
407   }
408   if (NULL != eo->spec)
409   {
410     if (NULL != eo->spec->context_msg)
411     {
412       GNUNET_free (eo->spec->context_msg);
413       eo->spec->context_msg = NULL;
414     }
415     GNUNET_free (eo->spec);
416     eo->spec = NULL;
417   }
418   GNUNET_free (eo);
419
420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
421
422   /* FIXME: do a garbage collection of the set generations */
423 }
424
425
426 /**
427  * Inform the client that the union operation has failed,
428  * and proceed to destroy the evaluate operation.
429  *
430  * @param eo the union operation to fail
431  */
432 static void
433 fail_union_operation (struct OperationState *eo)
434 {
435   struct GNUNET_MQ_Envelope *ev;
436   struct GNUNET_SET_ResultMessage *msg;
437
438   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
439   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
440   msg->request_id = htonl (eo->spec->client_request_id);
441   msg->element_type = htons (0);
442   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
443   union_operation_destroy (eo);
444 }
445
446
447 /**
448  * Derive the IBF key from a hash code and 
449  * a salt.
450  *
451  * @param src the hash code
452  * @param salt salt to use
453  * @return the derived IBF key
454  */
455 static struct IBF_Key
456 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
457 {
458   struct IBF_Key key;
459
460   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
461                       GCRY_MD_SHA512, GCRY_MD_SHA256,
462                       src, sizeof *src,
463                       &salt, sizeof (salt),
464                       NULL, 0);
465   return key;
466 }
467
468
469 /**
470  * Send a request for the evaluate operation to a remote peer
471  *
472  * @param eo operation with the other peer
473  */
474 static void
475 send_operation_request (struct OperationState *eo)
476 {
477   struct GNUNET_MQ_Envelope *ev;
478   struct OperationRequestMessage *msg;
479
480   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
481                                 eo->spec->context_msg);
482
483   if (NULL == ev)
484   {
485     /* the context message is too large */
486     GNUNET_break (0);
487     GNUNET_SERVER_client_disconnect (eo->spec->set->client);
488     return;
489   }
490   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
491   msg->app_id = eo->spec->app_id;
492   msg->salt = htonl (eo->spec->salt);
493   GNUNET_MQ_send (eo->mq, ev);
494
495   if (NULL != eo->spec->context_msg)
496   {
497     GNUNET_free (eo->spec->context_msg);
498     eo->spec->context_msg = NULL;
499   }
500
501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request\n");
502 }
503
504
505 /**
506  * Iterator to create the mapping between ibf keys
507  * and element entries.
508  *
509  * @param cls closure
510  * @param key current key code
511  * @param value value in the hash map
512  * @return GNUNET_YES if we should continue to
513  *         iterate,
514  *         GNUNET_NO if not.
515  */
516 static int
517 insert_element_iterator (void *cls,
518                          uint32_t key,
519                          void *value)
520 {
521   struct KeyEntry *const new_k = cls;
522   struct KeyEntry *old_k = value;
523
524   GNUNET_assert (NULL != old_k);
525   do
526   {
527     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
528     {
529       new_k->next_colliding = old_k->next_colliding;
530       old_k->next_colliding = new_k;
531       return GNUNET_NO;
532     }
533     old_k = old_k->next_colliding;
534   } while (NULL != old_k);
535   return GNUNET_YES;
536 }
537
538
539 /**
540  * Insert an element into the union operation's
541  * key-to-element mapping. Takes ownership of 'ee'.
542  *
543  * @param eo the union operation
544  * @param ee the element entry
545  */
546 static void
547 insert_element (struct OperationState *eo, struct ElementEntry *ee)
548 {
549   int ret;
550   struct IBF_Key ibf_key;
551   struct KeyEntry *k;
552
553   ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
554   k = GNUNET_new (struct KeyEntry);
555   k->element = ee;
556   k->ibf_key = ibf_key;
557   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
558                                                       (uint32_t) ibf_key.key_val,
559                                                       insert_element_iterator, k);
560
561   /* was the element inserted into a colliding bucket? */
562   if (GNUNET_SYSERR == ret)
563     return;
564
565   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
566                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
567 }
568
569
570 /**
571  * Insert a key into an ibf.
572  *
573  * @param cls the ibf
574  * @param key unused
575  * @param value the key entry to get the key from
576  */
577 static int
578 prepare_ibf_iterator (void *cls,
579                       uint32_t key,
580                       void *value)
581 {
582   struct InvertibleBloomFilter *ibf = cls;
583   struct KeyEntry *ke = value;
584
585   ibf_insert (ibf, ke->ibf_key);
586   return GNUNET_YES;
587 }
588
589
590 /**
591  * Iterator for initializing the
592  * key-to-element mapping of a union operation
593  *
594  * @param cls the union operation
595  * @param key unised
596  * @param value the element entry to insert
597  *        into the key-to-element mapping
598  */
599 static int
600 init_key_to_element_iterator (void *cls,
601                               const struct GNUNET_HashCode *key,
602                               void *value)
603 {
604   struct OperationState *eo = cls;
605   struct ElementEntry *e = value;
606
607   /* make sure that the element belongs to the set at the time
608    * of creating the operation */
609   if ( (e->generation_added > eo->generation_created) ||
610        ( (GNUNET_YES == e->removed) &&
611          (e->generation_removed < eo->generation_created)))
612     return GNUNET_YES;
613
614   e->remote = GNUNET_NO;
615
616   insert_element (eo, e);
617   return GNUNET_YES;
618 }
619
620
621 /**
622  * Create an ibf with the operation's elements
623  * of the specified size
624  *
625  * @param eo the union operation
626  * @param size size of the ibf to create
627  */
628 static void
629 prepare_ibf (struct OperationState *eo, uint16_t size)
630 {
631   if (NULL == eo->key_to_element)
632   {
633     unsigned int len;
634     len = GNUNET_CONTAINER_multihashmap_size (eo->set_state->elements);
635     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
636     GNUNET_CONTAINER_multihashmap_iterate (eo->set_state->elements,
637                                            init_key_to_element_iterator, eo);
638   }
639   if (NULL != eo->local_ibf)
640     ibf_destroy (eo->local_ibf);
641   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
642   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
643                                            prepare_ibf_iterator, eo->local_ibf);
644 }
645
646
647 /**
648  * Send an ibf of appropriate size.
649  *
650  * @param eo the union operation
651  * @param ibf_order order of the ibf to send, size=2^order
652  */
653 static void
654 send_ibf (struct OperationState *eo, uint16_t ibf_order)
655 {
656   unsigned int buckets_sent = 0;
657   struct InvertibleBloomFilter *ibf;
658
659   prepare_ibf (eo, 1<<ibf_order);
660
661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
662
663   ibf = eo->local_ibf;
664
665   while (buckets_sent < (1 << ibf_order))
666   {
667     unsigned int buckets_in_message;
668     struct GNUNET_MQ_Envelope *ev;
669     struct IBFMessage *msg;
670
671     buckets_in_message = (1 << ibf_order) - buckets_sent;
672     /* limit to maximum */
673     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
674       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
675
676     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
677                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
678     msg->reserved = 0;
679     msg->order = ibf_order;
680     msg->offset = htons (buckets_sent);
681     ibf_write_slice (ibf, buckets_sent,
682                      buckets_in_message, &msg[1]);
683     buckets_sent += buckets_in_message;
684     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
685                 buckets_in_message, buckets_sent, 1<<ibf_order);
686     GNUNET_MQ_send (eo->mq, ev);
687   }
688
689   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
690 }
691
692
693 /**
694  * Send a strata estimator to the remote peer.
695  *
696  * @param eo the union operation with the remote peer
697  */
698 static void
699 send_strata_estimator (struct OperationState *eo)
700 {
701   struct GNUNET_MQ_Envelope *ev;
702   struct GNUNET_MessageHeader *strata_msg;
703
704   ev = GNUNET_MQ_msg_header_extra (strata_msg,
705                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
706                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
707   strata_estimator_write (eo->set_state->se, &strata_msg[1]);
708   GNUNET_MQ_send (eo->mq, ev);
709   eo->phase = PHASE_EXPECT_IBF;
710   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
711 }
712
713
714 /**
715  * Compute the necessary order of an ibf
716  * from the size of the symmetric set difference.
717  *
718  * @param diff the difference
719  * @return the required size of the ibf
720  */
721 static unsigned int
722 get_order_from_difference (unsigned int diff)
723 {
724   unsigned int ibf_order;
725
726   ibf_order = 2;
727   while ((1<<ibf_order) < (2 * diff))
728     ibf_order++;
729   if (ibf_order > MAX_IBF_ORDER)
730     ibf_order = MAX_IBF_ORDER;
731   return ibf_order;
732 }
733
734
735 /**
736  * Handle a strata estimator from a remote peer
737  *
738  * @param cls the union operation
739  * @param mh the message
740  */
741 static void
742 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
743 {
744   struct OperationState *eo = cls;
745   struct StrataEstimator *remote_se;
746   int diff;
747
748   if (eo->phase != PHASE_EXPECT_SE)
749   {
750     fail_union_operation (eo);
751     GNUNET_break (0);
752     return;
753   }
754   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
755                                        SE_IBF_HASH_NUM);
756   strata_estimator_read (&mh[1], remote_se);
757   GNUNET_assert (NULL != eo->se);
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se, calculating diff\n");
759   diff = strata_estimator_difference (remote_se, eo->se);
760   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "se diff=%d\n", diff);
761   strata_estimator_destroy (remote_se);
762   strata_estimator_destroy (eo->se);
763   eo->se = NULL;
764   send_ibf (eo, get_order_from_difference (diff));
765 }
766
767
768
769 /**
770  * Iterator to send elements to a remote peer
771  *
772  * @param cls closure with the element key and the union operation
773  * @param key ignored
774  * @param value the key entry
775  */
776 static int
777 send_element_iterator (void *cls,
778                        uint32_t key,
779                        void *value)
780 {
781   struct SendElementClosure *sec = cls;
782   struct IBF_Key ibf_key = sec->ibf_key;
783   struct OperationState *eo = sec->eo;
784   struct KeyEntry *ke = value;
785
786   if (ke->ibf_key.key_val != ibf_key.key_val)
787     return GNUNET_YES;
788   while (NULL != ke)
789   {
790     const struct GNUNET_SET_Element *const element = &ke->element->element;
791     struct GNUNET_MQ_Envelope *ev;
792     struct GNUNET_MessageHeader *mh;
793
794     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
795     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
796     if (NULL == ev)
797     {
798       /* element too large */
799       GNUNET_break (0);
800       continue;
801     }
802     memcpy (&mh[1], element->data, element->size);
803     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element to peer\n");
804     GNUNET_MQ_send (eo->mq, ev);
805     ke = ke->next_colliding;
806   }
807   return GNUNET_NO;
808 }
809
810 /**
811  * Send all elements that have the specified IBF key
812  * to the remote peer of the union operation
813  *
814  * @param eo union operation
815  * @param ibf_key IBF key of interest
816  */
817 static void
818 send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
819 {
820   struct SendElementClosure send_cls;
821
822   send_cls.ibf_key = ibf_key;
823   send_cls.eo = eo;
824   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
825                                                 &send_element_iterator, &send_cls);
826 }
827
828
829 /**
830  * Decode which elements are missing on each side, and
831  * send the appropriate elemens and requests
832  *
833  * @param eo union operation
834  */
835 static void
836 decode_and_send (struct OperationState *eo)
837 {
838   struct IBF_Key key;
839   int side;
840   unsigned int num_decoded;
841   struct InvertibleBloomFilter *diff_ibf;
842
843   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
844
845   prepare_ibf (eo, eo->remote_ibf->size);
846   diff_ibf = ibf_dup (eo->local_ibf);
847   ibf_subtract (diff_ibf, eo->remote_ibf);
848   
849   ibf_destroy (eo->remote_ibf);
850   eo->remote_ibf = NULL;
851
852   num_decoded = 0;
853
854   while (1)
855   {
856     int res;
857
858     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF for elements\n");
859     res = ibf_decode (diff_ibf, &side, &key);
860     num_decoded += 1;
861     if (num_decoded > diff_ibf->size)
862       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf\n");
863     if ((GNUNET_SYSERR == res) || (num_decoded > diff_ibf->size))
864     {
865       int next_order;
866       next_order = 0;
867       while (1<<next_order < diff_ibf->size)
868         next_order++;
869       next_order++;
870       if (next_order <= MAX_IBF_ORDER)
871       {
872         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
873                     "decoding failed, sending larger ibf (size %u)\n",
874                     1<<next_order);
875         send_ibf (eo, next_order);
876       }
877       else
878       {
879         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
880                     "set union failed: reached ibf limit\n");
881       }
882       break;
883     }
884     if (GNUNET_NO == res)
885     {
886       struct GNUNET_MQ_Envelope *ev;
887
888       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
889       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
890       GNUNET_MQ_send (eo->mq, ev);
891       break;
892     }
893     if (1 == side)
894     {
895       send_elements_for_key (eo, key);
896     }
897     else
898     {
899       struct GNUNET_MQ_Envelope *ev;
900       struct GNUNET_MessageHeader *msg;
901
902       /* FIXME: before sending the request, check if we may just have the element */
903       /* FIXME: merge multiple requests */
904       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
905                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
906       *(struct IBF_Key *) &msg[1] = key;
907       GNUNET_MQ_send (eo->mq, ev);
908     }
909   }
910   ibf_destroy (diff_ibf);
911 }
912
913
914 /**
915  * Handle an IBF message from a remote peer.
916  *
917  * @param cls the union operation
918  * @param mh the header of the message
919  */
920 static void
921 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
922 {
923   struct OperationState *eo = cls;
924   struct IBFMessage *msg = (struct IBFMessage *) mh;
925   unsigned int buckets_in_message;
926
927   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
928        (eo->phase == PHASE_EXPECT_IBF) )
929   {
930     eo->phase = PHASE_EXPECT_IBF_CONT;
931     GNUNET_assert (NULL == eo->remote_ibf);
932     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
933     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
934     eo->ibf_buckets_received = 0;
935     if (0 != ntohs (msg->offset))
936     {
937       GNUNET_break (0);
938       fail_union_operation (eo);
939       return;
940     }
941   }
942   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
943   {
944     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
945          (1<<msg->order != eo->remote_ibf->size) )
946     {
947       GNUNET_break (0);
948       fail_union_operation (eo);
949       return;
950     }
951   }
952
953   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
954
955   if (0 == buckets_in_message)
956   {
957     GNUNET_break_op (0);
958     fail_union_operation (eo);
959     return;
960   }
961
962   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
963   {
964     GNUNET_break (0);
965     fail_union_operation (eo);
966     return;
967   }
968   
969   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
970   eo->ibf_buckets_received += buckets_in_message;
971
972   if (eo->ibf_buckets_received == eo->remote_ibf->size)
973   {
974
975     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
976     eo->phase = PHASE_EXPECT_ELEMENTS;
977     decode_and_send (eo);
978   }
979 }
980
981
982 /**
983  * Send a result message to the client indicating
984  * that there is a new element.
985  *
986  * @param eo union operation
987  * @param element element to send
988  */
989 static void
990 send_client_element (struct OperationState *eo,
991                      struct GNUNET_SET_Element *element)
992 {
993   struct GNUNET_MQ_Envelope *ev;
994   struct GNUNET_SET_ResultMessage *rm;
995
996   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending el of size %u\n", element->size);
997   GNUNET_assert (0 != eo->spec->client_request_id);
998   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
999   if (NULL == ev)
1000   {
1001     GNUNET_MQ_discard (ev);
1002     GNUNET_break (0);
1003     return;
1004   }
1005   rm->result_status = htons (GNUNET_SET_STATUS_OK);
1006   rm->request_id = htonl (eo->spec->client_request_id);
1007   rm->element_type = element->type;
1008   memcpy (&rm[1], element->data, element->size);
1009   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1010 }
1011
1012
1013 /**
1014  * Send a result message to the client indicating
1015  * that the operation is over.
1016  * After the result done message has been sent to the client,
1017  * destroy the evaluate operation.
1018  *
1019  * @param eo union operation
1020  */
1021 static void
1022 send_client_done_and_destroy (struct OperationState *eo)
1023 {
1024   struct GNUNET_MQ_Envelope *ev;
1025   struct GNUNET_SET_ResultMessage *rm;
1026
1027   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1028   rm->request_id = htonl (eo->spec->client_request_id);
1029   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1030   rm->element_type = htons (0);
1031   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1032
1033 }
1034
1035
1036 /**
1037  * Handle an element message from a remote peer.
1038  *
1039  * @param cls the union operation
1040  * @param mh the message
1041  */
1042 static void
1043 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1044 {
1045   struct OperationState *eo = cls;
1046   struct ElementEntry *ee;
1047   uint16_t element_size;
1048
1049   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1050
1051   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1052        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1053   {
1054     fail_union_operation (eo);
1055     GNUNET_break (0);
1056     return;
1057   }
1058   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1059   ee = GNUNET_malloc (sizeof *eo + element_size);
1060   memcpy (&ee[1], &mh[1], element_size);
1061   ee->element.size = element_size;
1062   ee->element.data = &ee[1];
1063   ee->remote = GNUNET_YES;
1064
1065   insert_element (eo, ee);
1066   send_client_element (eo, &ee->element);
1067 }
1068
1069
1070 /**
1071  * Handle an element request from a remote peer.
1072  *
1073  * @param cls the union operation
1074  * @param mh the message
1075  */
1076 static void
1077 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1078 {
1079   struct OperationState *eo = cls;
1080   struct IBF_Key *ibf_key;
1081   unsigned int num_keys;
1082
1083   /* look up elements and send them */
1084   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1085   {
1086     GNUNET_break (0);
1087     fail_union_operation (eo);
1088     return;
1089   }
1090
1091   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1092
1093   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1094   {
1095     GNUNET_break (0);
1096     fail_union_operation (eo);
1097     return;
1098   }
1099
1100   ibf_key = (struct IBF_Key *) &mh[1];
1101   while (0 != num_keys--)
1102   {
1103     send_elements_for_key (eo, *ibf_key);
1104     ibf_key++;
1105   }
1106 }
1107
1108
1109 /**
1110  * Callback used for notifications
1111  *
1112  * @param cls closure
1113  */
1114 static void
1115 peer_done_sent_cb (void *cls)
1116 {
1117   struct OperationState *eo = cls;
1118
1119   send_client_done_and_destroy (eo);
1120 }
1121
1122
1123 /**
1124  * Handle a done message from a remote peer
1125  * 
1126  * @param cls the union operation
1127  * @param mh the message
1128  */
1129 static void
1130 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1131 {
1132   struct OperationState *eo = cls;
1133
1134   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1135   {
1136     /* we got all requests, but still have to send our elements as response */
1137     struct GNUNET_MQ_Envelope *ev;
1138
1139     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1140     eo->phase = PHASE_FINISHED;
1141     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1142     GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
1143     GNUNET_MQ_send (eo->mq, ev);
1144     return;
1145   }
1146   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1147   {
1148     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1149     eo->phase = PHASE_FINISHED;
1150     send_client_done_and_destroy (eo);
1151     return;
1152   }
1153   GNUNET_break (0);
1154   fail_union_operation (eo);
1155 }
1156
1157
1158 /**
1159  * Evaluate a union operation with
1160  * a remote peer.
1161  *
1162  * @param spec specification of the operation the evaluate
1163  * @param tunnel tunnel already connected to the partner peer
1164  * @param tc tunnel context, passed here so all new incoming
1165  *        messages are directly going to the union operations
1166  * @return a handle to the operation
1167  */
1168 static void
1169 union_evaluate (struct OperationSpecification *spec,
1170                 struct GNUNET_MESH_Tunnel *tunnel,
1171                 struct TunnelContext *tc)
1172 {
1173   struct OperationState *eo;
1174
1175   eo = GNUNET_new (struct OperationState);
1176   tc->vt = _GSS_union_vt ();
1177   tc->op = eo;
1178   eo->se = strata_estimator_dup (spec->set->state->se);
1179   eo->set_state = spec->set->state;
1180   eo->spec = spec;
1181   eo->tunnel = tunnel;
1182   eo->mq = GNUNET_MESH_mq_create (tunnel);
1183
1184   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1185               "evaluating union operation, (app %s)\n", 
1186               GNUNET_h2s (&eo->spec->app_id));
1187
1188   /* we started the operation, thus we have to send the operation request */
1189   eo->phase = PHASE_EXPECT_SE;
1190
1191   GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
1192                                eo->set_state->ops_tail,
1193                                eo);
1194
1195   send_operation_request (eo);
1196 }
1197
1198
1199 /**
1200  * Accept an union operation request from a remote peer
1201  *
1202  * @param spec all necessary information about the operation
1203  * @param tunnel open tunnel to the partner's peer
1204  * @param tc tunnel context, passed here so all new incoming
1205  *        messages are directly going to the union operations
1206  * @return operation
1207  */
1208 static void
1209 union_accept (struct OperationSpecification *spec,
1210               struct GNUNET_MESH_Tunnel *tunnel,
1211               struct TunnelContext *tc)
1212 {
1213   struct OperationState *eo;
1214
1215   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1216
1217   eo = GNUNET_new (struct OperationState);
1218   tc->vt = _GSS_union_vt ();
1219   tc->op = eo;
1220   eo->set_state = spec->set->state;
1221   eo->generation_created = eo->set_state->current_generation++;
1222   eo->spec = spec;
1223   eo->tunnel = tunnel;
1224   eo->mq = GNUNET_MESH_mq_create (tunnel);
1225   eo->se = strata_estimator_dup (eo->set_state->se);
1226   /* transfer ownership of mq and socket from incoming to eo */
1227   GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
1228                                eo->set_state->ops_tail,
1229                                eo);
1230   /* kick off the operation */
1231   send_strata_estimator (eo);
1232 }
1233
1234
1235 /**
1236  * Create a new set supporting the union operation
1237  *
1238  * @return the newly created set
1239  */
1240 static struct SetState *
1241 union_set_create (void)
1242 {
1243   struct SetState *set_state;
1244
1245   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1246   
1247   set_state = GNUNET_new (struct SetState);
1248   /* keys of the hash map are stored in the element entrys, thus we do not
1249    * want the hash map to copy them */
1250   set_state->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1251   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1252                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1253   return set_state;
1254 }
1255
1256
1257 /**
1258  * Add the element from the given element message to the set.
1259  *
1260  * @param set_state state of the set want to add to
1261  * @param element the element to add to the set
1262  */
1263 static void
1264 union_add (struct SetState *set_state, const struct GNUNET_SET_Element *element)
1265 {
1266   struct ElementEntry *ee;
1267   struct ElementEntry *ee_dup;
1268
1269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "adding union element of size %u\n", element->size);
1270
1271   ee = GNUNET_malloc (element->size + sizeof *ee);
1272   ee->element.size = element->size;
1273   memcpy (&ee[1], element->data, element->size);
1274   ee->element.data = &ee[1];
1275   ee->generation_added = set_state->current_generation;
1276   ee->remote = GNUNET_NO;
1277   GNUNET_CRYPTO_hash (ee->element.data, element->size, &ee->element_hash);
1278   ee_dup = GNUNET_CONTAINER_multihashmap_get (set_state->elements,
1279                                               &ee->element_hash);
1280   if (NULL != ee_dup)
1281   {
1282     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1283     GNUNET_free (ee);
1284     return;
1285   }
1286   GNUNET_CONTAINER_multihashmap_put (set_state->elements, &ee->element_hash, ee,
1287                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1288   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1289 }
1290
1291
1292 /**
1293  * Destroy a set that supports the union operation
1294  *
1295  * @param set_state the set to destroy
1296  */
1297 static void
1298 union_set_destroy (struct SetState *set_state)
1299 {
1300   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1301   /* important to destroy operations before the rest of the set */
1302   while (NULL != set_state->ops_head)
1303     union_operation_destroy (set_state->ops_head);
1304   if (NULL != set_state->se)
1305   {
1306     strata_estimator_destroy (set_state->se);
1307     set_state->se = NULL;
1308   }
1309   if (NULL != set_state->elements)
1310   {
1311     GNUNET_CONTAINER_multihashmap_iterate (set_state->elements,
1312                                            destroy_elements_iterator, NULL);
1313     GNUNET_CONTAINER_multihashmap_destroy (set_state->elements);
1314     set_state->elements = NULL;
1315   }
1316
1317   GNUNET_free (set_state);
1318 }
1319
1320 /**
1321  * Remove the element given in the element message from the set.
1322  * Only marks the element as removed, so that older set operations can still exchange it.
1323  *
1324  * @param set_state state of the set to remove from
1325  * @param element set element to remove
1326  */
1327 static void
1328 union_remove (struct SetState *set_state, const struct GNUNET_SET_Element *element)
1329 {
1330   struct GNUNET_HashCode hash;
1331   struct ElementEntry *ee;
1332
1333   GNUNET_CRYPTO_hash (element->data, element->size, &hash);
1334   ee = GNUNET_CONTAINER_multihashmap_get (set_state->elements, &hash);
1335   if (NULL == ee)
1336   {
1337     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1338     return;
1339   }
1340   if (GNUNET_YES == ee->removed)
1341   {
1342     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1343     return;
1344   }
1345   ee->removed = GNUNET_YES;
1346   ee->generation_removed = set_state->current_generation;
1347 }
1348
1349
1350 /**
1351  * Dispatch messages for a union operation.
1352  *
1353  * @param eo the state of the union evaluate operation
1354  * @param mh the received message
1355  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1356  *         GNUNET_OK otherwise
1357  */
1358 int
1359 union_handle_p2p_message (struct OperationState *eo,
1360                           const struct GNUNET_MessageHeader *mh)
1361 {
1362   switch (ntohs (mh->type))
1363   {
1364     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1365       handle_p2p_ibf (eo, mh);
1366       break;
1367     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1368       handle_p2p_strata_estimator (eo, mh);
1369       break;
1370     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1371       handle_p2p_elements (eo, mh);
1372       break;
1373     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1374       handle_p2p_element_requests (eo, mh);
1375       break;
1376     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1377       handle_p2p_done (eo, mh);
1378       break;
1379     default:
1380       /* something wrong with mesh's message handlers? */
1381       GNUNET_assert (0);
1382   }
1383   return GNUNET_OK;
1384 }
1385
1386
1387 static void
1388 union_peer_disconnect (struct OperationState *op)
1389 {
1390   /* Are we already disconnected? */
1391   if (NULL == op->tunnel)
1392     return;
1393   op->tunnel = NULL;
1394   if (NULL != op->mq)
1395   {
1396     GNUNET_MQ_destroy (op->mq);
1397     op->mq = NULL;
1398   }
1399   if (PHASE_FINISHED != op->phase)
1400   {
1401     struct GNUNET_MQ_Envelope *ev;
1402     struct GNUNET_SET_ResultMessage *msg;
1403     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1404     msg->request_id = htonl (op->spec->client_request_id);
1405     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1406     msg->element_type = htons (0);
1407     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1408     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1409   }
1410   else
1411   {
1412     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1413   }
1414   union_operation_destroy (op);
1415 }
1416
1417
1418 static void
1419 union_op_cancel (struct SetState *set_state, uint32_t op_id)
1420 {
1421   /* FIXME: implement */
1422 }
1423
1424
1425 const struct SetVT *
1426 _GSS_union_vt ()
1427 {
1428   static const struct SetVT union_vt = {
1429     .create = &union_set_create,
1430     .msg_handler = &union_handle_p2p_message,
1431     .add = &union_add,
1432     .remove = &union_remove,
1433     .destroy_set = &union_set_destroy,
1434     .evaluate = &union_evaluate,
1435     .accept = &union_accept,
1436     .peer_disconnect = &union_peer_disconnect,
1437     .cancel = &union_op_cancel
1438   };
1439
1440   return &union_vt;
1441 }