e512761cc9a23052d84eed3a93a51bba79230a76
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation.
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct OperationState
104 {
105   /**
106    * Tunnel to the remote peer.
107    */
108   struct GNUNET_MESH_Tunnel *tunnel;
109
110   /**
111    * Detail information about the set operation,
112    * including the set to use.
113    */
114   struct OperationSpecification *spec;
115
116   /**
117    * Message queue for the peer.
118    */
119   struct GNUNET_MQ_Handle *mq;
120
121   /**
122    * Number of ibf buckets received
123    */
124   unsigned int ibf_buckets_received;
125
126   /**
127    * Copy of the set's strata estimator at the time of
128    * creation of this operation
129    */
130   struct StrataEstimator *se;
131
132   /**
133    * The ibf we currently receive
134    */
135   struct InvertibleBloomFilter *remote_ibf;
136
137   /**
138    * IBF of the set's element.
139    */
140   struct InvertibleBloomFilter *local_ibf;
141
142   /**
143    * Maps IBF-Keys (specific to the current salt) to elements.
144    */
145   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
146
147   /**
148    * Current state of the operation.
149    */
150   enum UnionOperationPhase phase;
151
152   /**
153    * Generation in which the operation handle
154    * was created.
155    */
156   unsigned int generation_created;
157
158   /**
159    * Set state of the set that this operation
160    * belongs to.
161    */
162   struct SetState *set_state;
163   
164   /**
165    * Evaluate operations are held in
166    * a linked list.
167    */
168   struct OperationState *next;
169   
170    /**
171     * Evaluate operations are held in
172     * a linked list.
173     */
174   struct OperationState *prev;
175 };
176
177
178 /**
179  * Information about an element element in the set.
180  * All elements are stored in a hash-table
181  * from their hash-code to their 'struct Element',
182  * so that the remove and add operations are reasonably
183  * fast.
184  */
185 struct ElementEntry
186 {
187   /**
188    * The actual element. The data for the element
189    * should be allocated at the end of this struct.
190    */
191   struct GNUNET_SET_Element element;
192
193   /**
194    * Hash of the element.
195    * Will be used to derive the different IBF keys
196    * for different salts.
197    */
198   struct GNUNET_HashCode element_hash;
199
200   /**
201    * Generation the element was added by the client.
202    * Operations of earlier generations will not consider the element.
203    */
204   unsigned int generation_added;
205
206   /**
207    * GNUNET_YES if the element has been removed in some generation.
208    */
209   int removed;
210
211   /**
212    * Generation the element was removed by the client. 
213    * Operations of later generations will not consider the element.
214    * Only valid if is_removed is GNUNET_YES.
215    */
216   unsigned int generation_removed;
217
218   /**
219    * GNUNET_YES if the element is a remote element, and does not belong
220    * to the operation's set.
221    */
222   int remote;
223 };
224
225
226 /**
227  * The key entry is used to associate an ibf key with
228  * an element.
229  */
230 struct KeyEntry
231 {
232   /**
233    * IBF key for the entry, derived from the current salt.
234    */
235   struct IBF_Key ibf_key;
236
237   /**
238    * The actual element associated with the key
239    */
240   struct ElementEntry *element;
241
242   /**
243    * Element that collides with this element
244    * on the ibf key
245    */
246   struct KeyEntry *next_colliding;
247 };
248
249
250 /**
251  * Used as a closure for sending elements
252  * with a specific IBF key.
253  */
254 struct SendElementClosure
255 {
256   /**
257    * The IBF key whose matching elements should be
258    * sent.
259    */
260   struct IBF_Key ibf_key;
261
262   /**
263    * Operation for which the elements
264    * should be sent.
265    */
266   struct OperationState *eo;
267 };
268
269
270 /**
271  * Extra state required for efficient set union.
272  */
273 struct SetState
274 {
275   /**
276    * The strata estimator is only generated once for
277    * each set.
278    * The IBF keys are derived from the element hashes with
279    * salt=0.
280    */
281   struct StrataEstimator *se;
282
283   /**
284    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
285    */
286   struct GNUNET_CONTAINER_MultiHashMap *elements;
287
288   /**
289    * Evaluate operations are held in
290    * a linked list.
291    */
292   struct OperationState *ops_head;
293
294   /**
295    * Evaluate operations are held in
296    * a linked list.
297    */
298   struct OperationState *ops_tail;
299
300   /**
301    * Current generation, that is, number of
302    * previously executed operations on this set
303    */
304   unsigned int current_generation;
305 };
306
307
308
309 /**
310  * Iterator over hash map entries.
311  *
312  * @param cls closure
313  * @param key current key code
314  * @param value value in the hash map
315  * @return GNUNET_YES if we should continue to
316  *         iterate,
317  *         GNUNET_NO if not.
318  */
319 static int
320 destroy_elements_iterator (void *cls,
321                            const struct GNUNET_HashCode * key,
322                            void *value)
323 {
324   struct ElementEntry *ee = value;
325
326   GNUNET_free (ee);
327   return GNUNET_YES;
328 }
329
330
331 /**
332  * Iterator over hash map entries.
333  *
334  * @param cls closure
335  * @param key current key code
336  * @param value value in the hash map
337  * @return GNUNET_YES if we should continue to
338  *         iterate,
339  *         GNUNET_NO if not.
340  */
341 static int
342 destroy_key_to_element_iter (void *cls,
343                              uint32_t key,
344                              void *value)
345 {
346   struct KeyEntry *k = value;
347   
348   while (NULL != k)
349   {
350     struct KeyEntry *k_tmp = k;
351     k = k->next_colliding;
352     if (GNUNET_YES == k_tmp->element->remote)
353     {
354       GNUNET_free (k_tmp->element);
355       k_tmp->element = NULL;
356     }
357     GNUNET_free (k_tmp);
358   }
359   return GNUNET_YES;
360 }
361
362
363 /**
364  * Destroy a union operation, and free all resources
365  * associated with it.
366  *
367  * @param eo the union operation to destroy
368  */
369 static void
370 union_operation_destroy (struct OperationState *eo)
371 {
372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
373   GNUNET_CONTAINER_DLL_remove (eo->set_state->ops_head,
374                                eo->set_state->ops_tail,
375                                eo);
376   if (NULL != eo->mq)
377   {
378     GNUNET_MQ_destroy (eo->mq);
379     eo->mq = NULL;
380   }
381   if (NULL != eo->tunnel)
382   {
383     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
384     eo->tunnel = NULL;
385     GNUNET_MESH_tunnel_destroy (t);
386   }
387   if (NULL != eo->remote_ibf)
388   {
389     ibf_destroy (eo->remote_ibf);
390     eo->remote_ibf = NULL;
391   }
392   if (NULL != eo->local_ibf)
393   {
394     ibf_destroy (eo->local_ibf);
395     eo->local_ibf = NULL;
396   }
397   if (NULL != eo->se)
398   {
399     strata_estimator_destroy (eo->se);
400     eo->se = NULL;
401   }
402   if (NULL != eo->key_to_element)
403   {
404     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
405     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
406     eo->key_to_element = NULL;
407   }
408   if (NULL != eo->spec)
409   {
410     if (NULL != eo->spec->context_msg)
411     {
412       GNUNET_free (eo->spec->context_msg);
413       eo->spec->context_msg = NULL;
414     }
415     GNUNET_free (eo->spec);
416     eo->spec = NULL;
417   }
418   GNUNET_free (eo);
419
420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
421
422   /* FIXME: do a garbage collection of the set generations */
423 }
424
425
426 /**
427  * Inform the client that the union operation has failed,
428  * and proceed to destroy the evaluate operation.
429  *
430  * @param eo the union operation to fail
431  */
432 static void
433 fail_union_operation (struct OperationState *eo)
434 {
435   struct GNUNET_MQ_Envelope *ev;
436   struct GNUNET_SET_ResultMessage *msg;
437
438   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
439   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
440   msg->request_id = htonl (eo->spec->client_request_id);
441   msg->element_type = htons (0);
442   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
443   union_operation_destroy (eo);
444 }
445
446
447 /**
448  * Derive the IBF key from a hash code and 
449  * a salt.
450  *
451  * @param src the hash code
452  * @param salt salt to use
453  * @return the derived IBF key
454  */
455 static struct IBF_Key
456 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
457 {
458   struct IBF_Key key;
459
460   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
461                       GCRY_MD_SHA512, GCRY_MD_SHA256,
462                       src, sizeof *src,
463                       &salt, sizeof (salt),
464                       NULL, 0);
465   return key;
466 }
467
468
469 /**
470  * Send a request for the evaluate operation to a remote peer
471  *
472  * @param eo operation with the other peer
473  */
474 static void
475 send_operation_request (struct OperationState *eo)
476 {
477   struct GNUNET_MQ_Envelope *ev;
478   struct OperationRequestMessage *msg;
479
480   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
481                                 eo->spec->context_msg);
482
483   if (NULL == ev)
484   {
485     /* the context message is too large */
486     GNUNET_break (0);
487     GNUNET_SERVER_client_disconnect (eo->spec->set->client);
488     return;
489   }
490   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
491   msg->app_id = eo->spec->app_id;
492   msg->salt = htonl (eo->spec->salt);
493   GNUNET_MQ_send (eo->mq, ev);
494
495   if (NULL != eo->spec->context_msg)
496   {
497     GNUNET_free (eo->spec->context_msg);
498     eo->spec->context_msg = NULL;
499   }
500
501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request\n");
502 }
503
504
505 /**
506  * Iterator to create the mapping between ibf keys
507  * and element entries.
508  *
509  * @param cls closure
510  * @param key current key code
511  * @param value value in the hash map
512  * @return GNUNET_YES if we should continue to
513  *         iterate,
514  *         GNUNET_NO if not.
515  */
516 static int
517 insert_element_iterator (void *cls,
518                          uint32_t key,
519                          void *value)
520 {
521   struct KeyEntry *const new_k = cls;
522   struct KeyEntry *old_k = value;
523
524   GNUNET_assert (NULL != old_k);
525   do
526   {
527     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
528     {
529       new_k->next_colliding = old_k->next_colliding;
530       old_k->next_colliding = new_k;
531       return GNUNET_NO;
532     }
533     old_k = old_k->next_colliding;
534   } while (NULL != old_k);
535   return GNUNET_YES;
536 }
537
538
539 /**
540  * Insert an element into the union operation's
541  * key-to-element mapping. Takes ownership of 'ee'.
542  *
543  * @param eo the union operation
544  * @param ee the element entry
545  */
546 static void
547 insert_element (struct OperationState *eo, struct ElementEntry *ee)
548 {
549   int ret;
550   struct IBF_Key ibf_key;
551   struct KeyEntry *k;
552
553   ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
554   k = GNUNET_new (struct KeyEntry);
555   k->element = ee;
556   k->ibf_key = ibf_key;
557   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
558                                                       (uint32_t) ibf_key.key_val,
559                                                       insert_element_iterator, k);
560
561   /* was the element inserted into a colliding bucket? */
562   if (GNUNET_SYSERR == ret)
563     return;
564
565   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
566                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
567 }
568
569
570 /**
571  * Insert a key into an ibf.
572  *
573  * @param cls the ibf
574  * @param key unused
575  * @param value the key entry to get the key from
576  */
577 static int
578 prepare_ibf_iterator (void *cls,
579                       uint32_t key,
580                       void *value)
581 {
582   struct InvertibleBloomFilter *ibf = cls;
583   struct KeyEntry *ke = value;
584
585   ibf_insert (ibf, ke->ibf_key);
586   return GNUNET_YES;
587 }
588
589
590 /**
591  * Iterator for initializing the
592  * key-to-element mapping of a union operation
593  *
594  * @param cls the union operation
595  * @param key unised
596  * @param value the element entry to insert
597  *        into the key-to-element mapping
598  */
599 static int
600 init_key_to_element_iterator (void *cls,
601                               const struct GNUNET_HashCode *key,
602                               void *value)
603 {
604   struct OperationState *eo = cls;
605   struct ElementEntry *e = value;
606
607   /* make sure that the element belongs to the set at the time
608    * of creating the operation */
609   if ( (e->generation_added > eo->generation_created) ||
610        ( (GNUNET_YES == e->removed) &&
611          (e->generation_removed < eo->generation_created)))
612     return GNUNET_YES;
613
614   e->remote = GNUNET_NO;
615
616   insert_element (eo, e);
617   return GNUNET_YES;
618 }
619
620
621 /**
622  * Create an ibf with the operation's elements
623  * of the specified size
624  *
625  * @param eo the union operation
626  * @param size size of the ibf to create
627  */
628 static void
629 prepare_ibf (struct OperationState *eo, uint16_t size)
630 {
631   if (NULL == eo->key_to_element)
632   {
633     unsigned int len;
634     len = GNUNET_CONTAINER_multihashmap_size (eo->set_state->elements);
635     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
636     GNUNET_CONTAINER_multihashmap_iterate (eo->set_state->elements,
637                                            init_key_to_element_iterator, eo);
638   }
639   if (NULL != eo->local_ibf)
640     ibf_destroy (eo->local_ibf);
641   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
642   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
643                                            prepare_ibf_iterator, eo->local_ibf);
644 }
645
646
647 /**
648  * Send an ibf of appropriate size.
649  *
650  * @param eo the union operation
651  * @param ibf_order order of the ibf to send, size=2^order
652  */
653 static void
654 send_ibf (struct OperationState *eo, uint16_t ibf_order)
655 {
656   unsigned int buckets_sent = 0;
657   struct InvertibleBloomFilter *ibf;
658
659   prepare_ibf (eo, 1<<ibf_order);
660
661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
662
663   ibf = eo->local_ibf;
664
665   while (buckets_sent < (1 << ibf_order))
666   {
667     unsigned int buckets_in_message;
668     struct GNUNET_MQ_Envelope *ev;
669     struct IBFMessage *msg;
670
671     buckets_in_message = (1 << ibf_order) - buckets_sent;
672     /* limit to maximum */
673     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
674       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
675
676     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
677                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
678     msg->reserved = 0;
679     msg->order = ibf_order;
680     msg->offset = htons (buckets_sent);
681     ibf_write_slice (ibf, buckets_sent,
682                      buckets_in_message, &msg[1]);
683     buckets_sent += buckets_in_message;
684     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
685                 buckets_in_message, buckets_sent, 1<<ibf_order);
686     GNUNET_MQ_send (eo->mq, ev);
687   }
688
689   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
690 }
691
692
693 /**
694  * Send a strata estimator to the remote peer.
695  *
696  * @param eo the union operation with the remote peer
697  */
698 static void
699 send_strata_estimator (struct OperationState *eo)
700 {
701   struct GNUNET_MQ_Envelope *ev;
702   struct GNUNET_MessageHeader *strata_msg;
703
704   ev = GNUNET_MQ_msg_header_extra (strata_msg,
705                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
706                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
707   strata_estimator_write (eo->set_state->se, &strata_msg[1]);
708   GNUNET_MQ_send (eo->mq, ev);
709   eo->phase = PHASE_EXPECT_IBF;
710   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
711 }
712
713
714 /**
715  * Compute the necessary order of an ibf
716  * from the size of the symmetric set difference.
717  *
718  * @param diff the difference
719  * @return the required size of the ibf
720  */
721 static unsigned int
722 get_order_from_difference (unsigned int diff)
723 {
724   unsigned int ibf_order;
725
726   ibf_order = 2;
727   while ((1<<ibf_order) < (2 * diff))
728     ibf_order++;
729   if (ibf_order > MAX_IBF_ORDER)
730     ibf_order = MAX_IBF_ORDER;
731   return ibf_order;
732 }
733
734
735 /**
736  * Handle a strata estimator from a remote peer
737  *
738  * @param cls the union operation
739  * @param mh the message
740  */
741 static void
742 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
743 {
744   struct OperationState *eo = cls;
745   struct StrataEstimator *remote_se;
746   int diff;
747
748   if (eo->phase != PHASE_EXPECT_SE)
749   {
750     fail_union_operation (eo);
751     GNUNET_break (0);
752     return;
753   }
754   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
755                                        SE_IBF_HASH_NUM);
756   strata_estimator_read (&mh[1], remote_se);
757   GNUNET_assert (NULL != eo->se);
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se, calculating diff\n");
759   diff = strata_estimator_difference (remote_se, eo->se);
760   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "se diff=%d\n", diff);
761   strata_estimator_destroy (remote_se);
762   strata_estimator_destroy (eo->se);
763   eo->se = NULL;
764   send_ibf (eo, get_order_from_difference (diff));
765 }
766
767
768
769 /**
770  * Iterator to send elements to a remote peer
771  *
772  * @param cls closure with the element key and the union operation
773  * @param key ignored
774  * @param value the key entry
775  */
776 static int
777 send_element_iterator (void *cls,
778                        uint32_t key,
779                        void *value)
780 {
781   struct SendElementClosure *sec = cls;
782   struct IBF_Key ibf_key = sec->ibf_key;
783   struct OperationState *eo = sec->eo;
784   struct KeyEntry *ke = value;
785
786   if (ke->ibf_key.key_val != ibf_key.key_val)
787     return GNUNET_YES;
788   while (NULL != ke)
789   {
790     const struct GNUNET_SET_Element *const element = &ke->element->element;
791     struct GNUNET_MQ_Envelope *ev;
792     struct GNUNET_MessageHeader *mh;
793
794     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
795     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
796     if (NULL == ev)
797     {
798       /* element too large */
799       GNUNET_break (0);
800       continue;
801     }
802     memcpy (&mh[1], element->data, element->size);
803     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element to peer\n");
804     GNUNET_MQ_send (eo->mq, ev);
805     ke = ke->next_colliding;
806   }
807   return GNUNET_NO;
808 }
809
810 /**
811  * Send all elements that have the specified IBF key
812  * to the remote peer of the union operation
813  *
814  * @param eo union operation
815  * @param ibf_key IBF key of interest
816  */
817 static void
818 send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
819 {
820   struct SendElementClosure send_cls;
821
822   send_cls.ibf_key = ibf_key;
823   send_cls.eo = eo;
824   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
825                                                 &send_element_iterator, &send_cls);
826 }
827
828
829 /**
830  * Decode which elements are missing on each side, and
831  * send the appropriate elemens and requests
832  *
833  * @param eo union operation
834  */
835 static void
836 decode_and_send (struct OperationState *eo)
837 {
838   struct IBF_Key key;
839   int side;
840   unsigned int num_decoded;
841   struct InvertibleBloomFilter *diff_ibf;
842
843   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
844
845   prepare_ibf (eo, eo->remote_ibf->size);
846   diff_ibf = ibf_dup (eo->local_ibf);
847   ibf_subtract (diff_ibf, eo->remote_ibf);
848   
849   ibf_destroy (eo->remote_ibf);
850   eo->remote_ibf = NULL;
851
852   num_decoded = 0;
853
854   while (1)
855   {
856     int res;
857
858     res = ibf_decode (diff_ibf, &side, &key);
859     num_decoded += 1;
860     if (num_decoded > diff_ibf->size)
861       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf\n");
862     if ((GNUNET_SYSERR == res) || (num_decoded > diff_ibf->size))
863     {
864       int next_order;
865       next_order = 0;
866       while (1<<next_order < diff_ibf->size)
867         next_order++;
868       next_order++;
869       if (next_order <= MAX_IBF_ORDER)
870       {
871         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
872                     "decoding failed, sending larger ibf (size %u)\n",
873                     1<<next_order);
874         send_ibf (eo, next_order);
875       }
876       else
877       {
878         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
879                     "set union failed: reached ibf limit\n");
880       }
881       break;
882     }
883     if (GNUNET_NO == res)
884     {
885       struct GNUNET_MQ_Envelope *ev;
886
887       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
888       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
889       GNUNET_MQ_send (eo->mq, ev);
890       break;
891     }
892     if (1 == side)
893     {
894       send_elements_for_key (eo, key);
895     }
896     else
897     {
898       struct GNUNET_MQ_Envelope *ev;
899       struct GNUNET_MessageHeader *msg;
900
901       /* FIXME: before sending the request, check if we may just have the element */
902       /* FIXME: merge multiple requests */
903       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
904                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
905       *(struct IBF_Key *) &msg[1] = key;
906       GNUNET_MQ_send (eo->mq, ev);
907     }
908   }
909   ibf_destroy (diff_ibf);
910 }
911
912
913 /**
914  * Handle an IBF message from a remote peer.
915  *
916  * @param cls the union operation
917  * @param mh the header of the message
918  */
919 static void
920 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
921 {
922   struct OperationState *eo = cls;
923   struct IBFMessage *msg = (struct IBFMessage *) mh;
924   unsigned int buckets_in_message;
925
926   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
927        (eo->phase == PHASE_EXPECT_IBF) )
928   {
929     eo->phase = PHASE_EXPECT_IBF_CONT;
930     GNUNET_assert (NULL == eo->remote_ibf);
931     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
932     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
933     eo->ibf_buckets_received = 0;
934     if (0 != ntohs (msg->offset))
935     {
936       GNUNET_break (0);
937       fail_union_operation (eo);
938       return;
939     }
940   }
941   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
942   {
943     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
944          (1<<msg->order != eo->remote_ibf->size) )
945     {
946       GNUNET_break (0);
947       fail_union_operation (eo);
948       return;
949     }
950   }
951
952   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
953
954   if (0 == buckets_in_message)
955   {
956     GNUNET_break_op (0);
957     fail_union_operation (eo);
958     return;
959   }
960
961   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
962   {
963     GNUNET_break (0);
964     fail_union_operation (eo);
965     return;
966   }
967   
968   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
969   eo->ibf_buckets_received += buckets_in_message;
970
971   if (eo->ibf_buckets_received == eo->remote_ibf->size)
972   {
973
974     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
975     eo->phase = PHASE_EXPECT_ELEMENTS;
976     decode_and_send (eo);
977   }
978 }
979
980
981 /**
982  * Send a result message to the client indicating
983  * that there is a new element.
984  *
985  * @param eo union operation
986  * @param element element to send
987  */
988 static void
989 send_client_element (struct OperationState *eo,
990                      struct GNUNET_SET_Element *element)
991 {
992   struct GNUNET_MQ_Envelope *ev;
993   struct GNUNET_SET_ResultMessage *rm;
994
995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending el of size %u\n", element->size);
996   GNUNET_assert (0 != eo->spec->client_request_id);
997   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
998   if (NULL == ev)
999   {
1000     GNUNET_MQ_discard (ev);
1001     GNUNET_break (0);
1002     return;
1003   }
1004   rm->result_status = htons (GNUNET_SET_STATUS_OK);
1005   rm->request_id = htonl (eo->spec->client_request_id);
1006   rm->element_type = element->type;
1007   memcpy (&rm[1], element->data, element->size);
1008   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1009 }
1010
1011
1012 /**
1013  * Send a result message to the client indicating
1014  * that the operation is over.
1015  * After the result done message has been sent to the client,
1016  * destroy the evaluate operation.
1017  *
1018  * @param eo union operation
1019  */
1020 static void
1021 send_client_done_and_destroy (struct OperationState *eo)
1022 {
1023   struct GNUNET_MQ_Envelope *ev;
1024   struct GNUNET_SET_ResultMessage *rm;
1025
1026   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1027   rm->request_id = htonl (eo->spec->client_request_id);
1028   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1029   rm->element_type = htons (0);
1030   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1031
1032 }
1033
1034
1035 /**
1036  * Handle an element message from a remote peer.
1037  *
1038  * @param cls the union operation
1039  * @param mh the message
1040  */
1041 static void
1042 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1043 {
1044   struct OperationState *eo = cls;
1045   struct ElementEntry *ee;
1046   uint16_t element_size;
1047
1048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1049
1050   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1051        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1052   {
1053     fail_union_operation (eo);
1054     GNUNET_break (0);
1055     return;
1056   }
1057   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1058   ee = GNUNET_malloc (sizeof *eo + element_size);
1059   memcpy (&ee[1], &mh[1], element_size);
1060   ee->element.size = element_size;
1061   ee->element.data = &ee[1];
1062   ee->remote = GNUNET_YES;
1063
1064   insert_element (eo, ee);
1065   send_client_element (eo, &ee->element);
1066 }
1067
1068
1069 /**
1070  * Handle an element request from a remote peer.
1071  *
1072  * @param cls the union operation
1073  * @param mh the message
1074  */
1075 static void
1076 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1077 {
1078   struct OperationState *eo = cls;
1079   struct IBF_Key *ibf_key;
1080   unsigned int num_keys;
1081
1082   /* look up elements and send them */
1083   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1084   {
1085     GNUNET_break (0);
1086     fail_union_operation (eo);
1087     return;
1088   }
1089
1090   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1091
1092   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1093   {
1094     GNUNET_break (0);
1095     fail_union_operation (eo);
1096     return;
1097   }
1098
1099   ibf_key = (struct IBF_Key *) &mh[1];
1100   while (0 != num_keys--)
1101   {
1102     send_elements_for_key (eo, *ibf_key);
1103     ibf_key++;
1104   }
1105 }
1106
1107
1108 /**
1109  * Callback used for notifications
1110  *
1111  * @param cls closure
1112  */
1113 static void
1114 peer_done_sent_cb (void *cls)
1115 {
1116   struct OperationState *eo = cls;
1117
1118   send_client_done_and_destroy (eo);
1119 }
1120
1121
1122 /**
1123  * Handle a done message from a remote peer
1124  * 
1125  * @param cls the union operation
1126  * @param mh the message
1127  */
1128 static void
1129 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1130 {
1131   struct OperationState *eo = cls;
1132
1133   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1134   {
1135     /* we got all requests, but still have to send our elements as response */
1136     struct GNUNET_MQ_Envelope *ev;
1137
1138     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1139     eo->phase = PHASE_FINISHED;
1140     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1141     GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
1142     GNUNET_MQ_send (eo->mq, ev);
1143     return;
1144   }
1145   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1146   {
1147     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1148     eo->phase = PHASE_FINISHED;
1149     send_client_done_and_destroy (eo);
1150     return;
1151   }
1152   GNUNET_break (0);
1153   fail_union_operation (eo);
1154 }
1155
1156
1157 /**
1158  * Evaluate a union operation with
1159  * a remote peer.
1160  *
1161  * @param spec specification of the operation the evaluate
1162  * @param tunnel tunnel already connected to the partner peer
1163  * @param tc tunnel context, passed here so all new incoming
1164  *        messages are directly going to the union operations
1165  * @return a handle to the operation
1166  */
1167 static void
1168 union_evaluate (struct OperationSpecification *spec,
1169                 struct GNUNET_MESH_Tunnel *tunnel,
1170                 struct TunnelContext *tc)
1171 {
1172   struct OperationState *eo;
1173
1174   eo = GNUNET_new (struct OperationState);
1175   tc->vt = _GSS_union_vt ();
1176   tc->op = eo;
1177   eo->se = strata_estimator_dup (spec->set->state->se);
1178   eo->set_state = spec->set->state;
1179   eo->spec = spec;
1180   eo->tunnel = tunnel;
1181   eo->mq = GNUNET_MESH_mq_create (tunnel);
1182
1183   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1184               "evaluating union operation, (app %s)\n", 
1185               GNUNET_h2s (&eo->spec->app_id));
1186
1187   /* we started the operation, thus we have to send the operation request */
1188   eo->phase = PHASE_EXPECT_SE;
1189
1190   GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
1191                                eo->set_state->ops_tail,
1192                                eo);
1193
1194   send_operation_request (eo);
1195 }
1196
1197
1198 /**
1199  * Accept an union operation request from a remote peer
1200  *
1201  * @param spec all necessary information about the operation
1202  * @param tunnel open tunnel to the partner's peer
1203  * @param tc tunnel context, passed here so all new incoming
1204  *        messages are directly going to the union operations
1205  * @return operation
1206  */
1207 static void
1208 union_accept (struct OperationSpecification *spec,
1209               struct GNUNET_MESH_Tunnel *tunnel,
1210               struct TunnelContext *tc)
1211 {
1212   struct OperationState *eo;
1213
1214   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1215
1216   eo = GNUNET_new (struct OperationState);
1217   tc->vt = _GSS_union_vt ();
1218   tc->op = eo;
1219   eo->set_state = spec->set->state;
1220   eo->generation_created = eo->set_state->current_generation++;
1221   eo->spec = spec;
1222   eo->tunnel = tunnel;
1223   eo->mq = GNUNET_MESH_mq_create (tunnel);
1224   eo->se = strata_estimator_dup (eo->set_state->se);
1225   /* transfer ownership of mq and socket from incoming to eo */
1226   GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
1227                                eo->set_state->ops_tail,
1228                                eo);
1229   /* kick off the operation */
1230   send_strata_estimator (eo);
1231 }
1232
1233
1234 /**
1235  * Create a new set supporting the union operation
1236  *
1237  * @return the newly created set
1238  */
1239 static struct SetState *
1240 union_set_create (void)
1241 {
1242   struct SetState *set_state;
1243
1244   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1245   
1246   set_state = GNUNET_new (struct SetState);
1247   /* keys of the hash map are stored in the element entrys, thus we do not
1248    * want the hash map to copy them */
1249   set_state->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1250   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1251                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1252   return set_state;
1253 }
1254
1255
1256 /**
1257  * Add the element from the given element message to the set.
1258  *
1259  * @param set_state state of the set want to add to
1260  * @param element the element to add to the set
1261  */
1262 static void
1263 union_add (struct SetState *set_state, const struct GNUNET_SET_Element *element)
1264 {
1265   struct ElementEntry *ee;
1266   struct ElementEntry *ee_dup;
1267
1268   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "adding union element of size %u\n", element->size);
1269
1270   ee = GNUNET_malloc (element->size + sizeof *ee);
1271   ee->element.size = element->size;
1272   memcpy (&ee[1], element->data, element->size);
1273   ee->element.data = &ee[1];
1274   ee->generation_added = set_state->current_generation;
1275   ee->remote = GNUNET_NO;
1276   GNUNET_CRYPTO_hash (ee->element.data, element->size, &ee->element_hash);
1277   ee_dup = GNUNET_CONTAINER_multihashmap_get (set_state->elements,
1278                                               &ee->element_hash);
1279   if (NULL != ee_dup)
1280   {
1281     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1282     GNUNET_free (ee);
1283     return;
1284   }
1285   GNUNET_CONTAINER_multihashmap_put (set_state->elements, &ee->element_hash, ee,
1286                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1287   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1288 }
1289
1290
1291 /**
1292  * Destroy a set that supports the union operation
1293  *
1294  * @param set_state the set to destroy
1295  */
1296 static void
1297 union_set_destroy (struct SetState *set_state)
1298 {
1299   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1300   /* important to destroy operations before the rest of the set */
1301   while (NULL != set_state->ops_head)
1302     union_operation_destroy (set_state->ops_head);
1303   if (NULL != set_state->se)
1304   {
1305     strata_estimator_destroy (set_state->se);
1306     set_state->se = NULL;
1307   }
1308   if (NULL != set_state->elements)
1309   {
1310     GNUNET_CONTAINER_multihashmap_iterate (set_state->elements,
1311                                            destroy_elements_iterator, NULL);
1312     GNUNET_CONTAINER_multihashmap_destroy (set_state->elements);
1313     set_state->elements = NULL;
1314   }
1315
1316   GNUNET_free (set_state);
1317 }
1318
1319 /**
1320  * Remove the element given in the element message from the set.
1321  * Only marks the element as removed, so that older set operations can still exchange it.
1322  *
1323  * @param set_state state of the set to remove from
1324  * @param element set element to remove
1325  */
1326 static void
1327 union_remove (struct SetState *set_state, const struct GNUNET_SET_Element *element)
1328 {
1329   struct GNUNET_HashCode hash;
1330   struct ElementEntry *ee;
1331
1332   GNUNET_CRYPTO_hash (element->data, element->size, &hash);
1333   ee = GNUNET_CONTAINER_multihashmap_get (set_state->elements, &hash);
1334   if (NULL == ee)
1335   {
1336     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1337     return;
1338   }
1339   if (GNUNET_YES == ee->removed)
1340   {
1341     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1342     return;
1343   }
1344   ee->removed = GNUNET_YES;
1345   ee->generation_removed = set_state->current_generation;
1346 }
1347
1348
1349 /**
1350  * Dispatch messages for a union operation.
1351  *
1352  * @param eo the state of the union evaluate operation
1353  * @param mh the received message
1354  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1355  *         GNUNET_OK otherwise
1356  */
1357 int
1358 union_handle_p2p_message (struct OperationState *eo,
1359                           const struct GNUNET_MessageHeader *mh)
1360 {
1361   switch (ntohs (mh->type))
1362   {
1363     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1364       handle_p2p_ibf (eo, mh);
1365       break;
1366     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1367       handle_p2p_strata_estimator (eo, mh);
1368       break;
1369     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1370       handle_p2p_elements (eo, mh);
1371       break;
1372     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1373       handle_p2p_element_requests (eo, mh);
1374       break;
1375     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1376       handle_p2p_done (eo, mh);
1377       break;
1378     default:
1379       /* something wrong with mesh's message handlers? */
1380       GNUNET_assert (0);
1381   }
1382   return GNUNET_OK;
1383 }
1384
1385
1386 static void
1387 union_peer_disconnect (struct OperationState *op)
1388 {
1389   /* Are we already disconnected? */
1390   if (NULL == op->tunnel)
1391     return;
1392   op->tunnel = NULL;
1393   if (NULL != op->mq)
1394   {
1395     GNUNET_MQ_destroy (op->mq);
1396     op->mq = NULL;
1397   }
1398   if (PHASE_FINISHED != op->phase)
1399   {
1400     struct GNUNET_MQ_Envelope *ev;
1401     struct GNUNET_SET_ResultMessage *msg;
1402     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1403     msg->request_id = htonl (op->spec->client_request_id);
1404     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1405     msg->element_type = htons (0);
1406     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1407     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1408   }
1409   else
1410   {
1411     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1412   }
1413   union_operation_destroy (op);
1414 }
1415
1416
1417 static void
1418 union_op_cancel (struct SetState *set_state, uint32_t op_id)
1419 {
1420   /* FIXME: implement */
1421 }
1422
1423
1424 const struct SetVT *
1425 _GSS_union_vt ()
1426 {
1427   static const struct SetVT union_vt = {
1428     .create = &union_set_create,
1429     .msg_handler = &union_handle_p2p_message,
1430     .add = &union_add,
1431     .remove = &union_remove,
1432     .destroy_set = &union_set_destroy,
1433     .evaluate = &union_evaluate,
1434     .accept = &union_accept,
1435     .peer_disconnect = &union_peer_disconnect,
1436     .cancel = &union_op_cancel
1437   };
1438
1439   return &union_vt;
1440 }