-bring set/gns into the fold of non-experimental subsystems
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation.
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct OperationState
104 {
105   /**
106    * Tunnel to the remote peer.
107    */
108   struct GNUNET_MESH_Tunnel *tunnel;
109
110   /**
111    * Detail information about the set operation,
112    * including the set to use.
113    */
114   struct OperationSpecification *spec;
115
116   /**
117    * Message queue for the peer.
118    */
119   struct GNUNET_MQ_Handle *mq;
120
121   /**
122    * Number of ibf buckets received
123    */
124   unsigned int ibf_buckets_received;
125
126   /**
127    * Copy of the set's strata estimator at the time of
128    * creation of this operation
129    */
130   struct StrataEstimator *se;
131
132   /**
133    * The ibf we currently receive
134    */
135   struct InvertibleBloomFilter *remote_ibf;
136
137   /**
138    * IBF of the set's element.
139    */
140   struct InvertibleBloomFilter *local_ibf;
141
142   /**
143    * Maps IBF-Keys (specific to the current salt) to elements.
144    */
145   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
146
147   /**
148    * Current state of the operation.
149    */
150   enum UnionOperationPhase phase;
151
152   /**
153    * Generation in which the operation handle
154    * was created.
155    */
156   unsigned int generation_created;
157
158   /**
159    * Set state of the set that this operation
160    * belongs to.
161    */
162   struct SetState *set_state;
163   
164   /**
165    * Evaluate operations are held in
166    * a linked list.
167    */
168   struct OperationState *next;
169   
170    /**
171     * Evaluate operations are held in
172     * a linked list.
173     */
174   struct OperationState *prev;
175 };
176
177
178 /**
179  * Information about an element element in the set.
180  * All elements are stored in a hash-table
181  * from their hash-code to their 'struct Element',
182  * so that the remove and add operations are reasonably
183  * fast.
184  */
185 struct ElementEntry
186 {
187   /**
188    * The actual element. The data for the element
189    * should be allocated at the end of this struct.
190    */
191   struct GNUNET_SET_Element element;
192
193   /**
194    * Hash of the element.
195    * Will be used to derive the different IBF keys
196    * for different salts.
197    */
198   struct GNUNET_HashCode element_hash;
199
200   /**
201    * Generation the element was added by the client.
202    * Operations of earlier generations will not consider the element.
203    */
204   unsigned int generation_added;
205
206   /**
207    * GNUNET_YES if the element has been removed in some generation.
208    */
209   int removed;
210
211   /**
212    * Generation the element was removed by the client. 
213    * Operations of later generations will not consider the element.
214    * Only valid if is_removed is GNUNET_YES.
215    */
216   unsigned int generation_removed;
217
218   /**
219    * GNUNET_YES if the element is a remote element, and does not belong
220    * to the operation's set.
221    */
222   int remote;
223 };
224
225
226 /**
227  * The key entry is used to associate an ibf key with
228  * an element.
229  */
230 struct KeyEntry
231 {
232   /**
233    * IBF key for the entry, derived from the current salt.
234    */
235   struct IBF_Key ibf_key;
236
237   /**
238    * The actual element associated with the key
239    */
240   struct ElementEntry *element;
241
242   /**
243    * Element that collides with this element
244    * on the ibf key
245    */
246   struct KeyEntry *next_colliding;
247 };
248
249
250 /**
251  * Used as a closure for sending elements
252  * with a specific IBF key.
253  */
254 struct SendElementClosure
255 {
256   /**
257    * The IBF key whose matching elements should be
258    * sent.
259    */
260   struct IBF_Key ibf_key;
261
262   /**
263    * Operation for which the elements
264    * should be sent.
265    */
266   struct OperationState *eo;
267 };
268
269
270 /**
271  * Extra state required for efficient set union.
272  */
273 struct SetState
274 {
275   /**
276    * The strata estimator is only generated once for
277    * each set.
278    * The IBF keys are derived from the element hashes with
279    * salt=0.
280    */
281   struct StrataEstimator *se;
282
283   /**
284    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
285    */
286   struct GNUNET_CONTAINER_MultiHashMap *elements;
287
288   /**
289    * Evaluate operations are held in
290    * a linked list.
291    */
292   struct OperationState *ops_head;
293
294   /**
295    * Evaluate operations are held in
296    * a linked list.
297    */
298   struct OperationState *ops_tail;
299
300   /**
301    * Current generation, that is, number of
302    * previously executed operations on this set
303    */
304   unsigned int current_generation;
305 };
306
307
308
309 /**
310  * Iterator over hash map entries.
311  *
312  * @param cls closure
313  * @param key current key code
314  * @param value value in the hash map
315  * @return GNUNET_YES if we should continue to
316  *         iterate,
317  *         GNUNET_NO if not.
318  */
319 static int
320 destroy_elements_iterator (void *cls,
321                            const struct GNUNET_HashCode * key,
322                            void *value)
323 {
324   struct ElementEntry *ee = value;
325
326   GNUNET_free (ee);
327   return GNUNET_YES;
328 }
329
330
331 /**
332  * Iterator over hash map entries.
333  *
334  * @param cls closure
335  * @param key current key code
336  * @param value value in the hash map
337  * @return GNUNET_YES if we should continue to
338  *         iterate,
339  *         GNUNET_NO if not.
340  */
341 static int
342 destroy_key_to_element_iter (void *cls,
343                              uint32_t key,
344                              void *value)
345 {
346   struct KeyEntry *k = value;
347   
348   while (NULL != k)
349   {
350     struct KeyEntry *k_tmp = k;
351     k = k->next_colliding;
352     if (GNUNET_YES == k_tmp->element->remote)
353     {
354       GNUNET_free (k_tmp->element);
355       k_tmp->element = NULL;
356     }
357     GNUNET_free (k_tmp);
358   }
359   return GNUNET_YES;
360 }
361
362
363 /**
364  * Destroy a union operation, and free all resources
365  * associated with it.
366  *
367  * @param eo the union operation to destroy
368  */
369 static void
370 union_operation_destroy (struct OperationState *eo)
371 {
372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
373   GNUNET_CONTAINER_DLL_remove (eo->set_state->ops_head,
374                                eo->set_state->ops_tail,
375                                eo);
376   if (NULL != eo->mq)
377   {
378     GNUNET_MQ_destroy (eo->mq);
379     eo->mq = NULL;
380   }
381   if (NULL != eo->tunnel)
382   {
383     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
384     eo->tunnel = NULL;
385     GNUNET_MESH_tunnel_destroy (t);
386   }
387   if (NULL != eo->remote_ibf)
388   {
389     ibf_destroy (eo->remote_ibf);
390     eo->remote_ibf = NULL;
391   }
392   if (NULL != eo->local_ibf)
393   {
394     ibf_destroy (eo->local_ibf);
395     eo->local_ibf = NULL;
396   }
397   if (NULL != eo->se)
398   {
399     strata_estimator_destroy (eo->se);
400     eo->se = NULL;
401   }
402   if (NULL != eo->key_to_element)
403   {
404     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
405     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
406     eo->key_to_element = NULL;
407   }
408   if (NULL != eo->spec)
409   {
410     if (NULL != eo->spec->context_msg)
411     {
412       GNUNET_free (eo->spec->context_msg);
413       eo->spec->context_msg = NULL;
414     }
415     GNUNET_free (eo->spec);
416     eo->spec = NULL;
417   }
418   GNUNET_free (eo);
419
420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
421
422   /* FIXME: do a garbage collection of the set generations */
423 }
424
425
426 /**
427  * Inform the client that the union operation has failed,
428  * and proceed to destroy the evaluate operation.
429  *
430  * @param eo the union operation to fail
431  */
432 static void
433 fail_union_operation (struct OperationState *eo)
434 {
435   struct GNUNET_MQ_Envelope *ev;
436   struct GNUNET_SET_ResultMessage *msg;
437
438   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
439   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
440   msg->request_id = htonl (eo->spec->client_request_id);
441   msg->element_type = htons (0);
442   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
443   union_operation_destroy (eo);
444 }
445
446
447 /**
448  * Derive the IBF key from a hash code and 
449  * a salt.
450  *
451  * @param src the hash code
452  * @param salt salt to use
453  * @return the derived IBF key
454  */
455 static struct IBF_Key
456 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
457 {
458   struct IBF_Key key;
459
460   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
461                       GCRY_MD_SHA512, GCRY_MD_SHA256,
462                       src, sizeof *src,
463                       &salt, sizeof (salt),
464                       NULL, 0);
465   return key;
466 }
467
468
469 /**
470  * Send a request for the evaluate operation to a remote peer
471  *
472  * @param eo operation with the other peer
473  */
474 static void
475 send_operation_request (struct OperationState *eo)
476 {
477   struct GNUNET_MQ_Envelope *ev;
478   struct OperationRequestMessage *msg;
479
480   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
481                                 eo->spec->context_msg);
482
483   if (NULL == ev)
484   {
485     /* the context message is too large */
486     GNUNET_break (0);
487     GNUNET_SERVER_client_disconnect (eo->spec->set->client);
488     return;
489   }
490   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
491   msg->app_id = eo->spec->app_id;
492   msg->salt = htonl (eo->spec->salt);
493   GNUNET_MQ_send (eo->mq, ev);
494
495   if (NULL != eo->spec->context_msg)
496   {
497     GNUNET_free (eo->spec->context_msg);
498     eo->spec->context_msg = NULL;
499   }
500
501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request\n");
502 }
503
504
505 /**
506  * Iterator to create the mapping between ibf keys
507  * and element entries.
508  *
509  * @param cls closure
510  * @param key current key code
511  * @param value value in the hash map
512  * @return GNUNET_YES if we should continue to
513  *         iterate,
514  *         GNUNET_NO if not.
515  */
516 static int
517 insert_element_iterator (void *cls,
518                          uint32_t key,
519                          void *value)
520 {
521   struct KeyEntry *const new_k = cls;
522   struct KeyEntry *old_k = value;
523
524   GNUNET_assert (NULL != old_k);
525   do
526   {
527     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
528     {
529       new_k->next_colliding = old_k->next_colliding;
530       old_k->next_colliding = new_k;
531       return GNUNET_NO;
532     }
533     old_k = old_k->next_colliding;
534   } while (NULL != old_k);
535   return GNUNET_YES;
536 }
537
538
539 /**
540  * Insert an element into the union operation's
541  * key-to-element mapping. Takes ownership of 'ee'.
542  *
543  * @param eo the union operation
544  * @param ee the element entry
545  */
546 static void
547 insert_element (struct OperationState *eo, struct ElementEntry *ee)
548 {
549   int ret;
550   struct IBF_Key ibf_key;
551   struct KeyEntry *k;
552
553   ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
554   k = GNUNET_new (struct KeyEntry);
555   k->element = ee;
556   k->ibf_key = ibf_key;
557   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
558                                                       (uint32_t) ibf_key.key_val,
559                                                       insert_element_iterator, k);
560
561   /* was the element inserted into a colliding bucket? */
562   if (GNUNET_SYSERR == ret)
563     return;
564
565   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
566                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
567 }
568
569
570 /**
571  * Insert a key into an ibf.
572  *
573  * @param cls the ibf
574  * @param key unused
575  * @param value the key entry to get the key from
576  */
577 static int
578 prepare_ibf_iterator (void *cls,
579                       uint32_t key,
580                       void *value)
581 {
582   struct InvertibleBloomFilter *ibf = cls;
583   struct KeyEntry *ke = value;
584
585   ibf_insert (ibf, ke->ibf_key);
586   return GNUNET_YES;
587 }
588
589
590 /**
591  * Iterator for initializing the
592  * key-to-element mapping of a union operation
593  *
594  * @param cls the union operation
595  * @param key unised
596  * @param value the element entry to insert
597  *        into the key-to-element mapping
598  */
599 static int
600 init_key_to_element_iterator (void *cls,
601                               const struct GNUNET_HashCode *key,
602                               void *value)
603 {
604   struct OperationState *eo = cls;
605   struct ElementEntry *e = value;
606
607   /* make sure that the element belongs to the set at the time
608    * of creating the operation */
609   if ( (e->generation_added > eo->generation_created) ||
610        ( (GNUNET_YES == e->removed) &&
611          (e->generation_removed < eo->generation_created)))
612     return GNUNET_YES;
613
614   e->remote = GNUNET_NO;
615
616   insert_element (eo, e);
617   return GNUNET_YES;
618 }
619
620
621 /**
622  * Create an ibf with the operation's elements
623  * of the specified size
624  *
625  * @param eo the union operation
626  * @param size size of the ibf to create
627  */
628 static void
629 prepare_ibf (struct OperationState *eo, uint16_t size)
630 {
631   if (NULL == eo->key_to_element)
632   {
633     unsigned int len;
634     len = GNUNET_CONTAINER_multihashmap_size (eo->set_state->elements);
635     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
636     GNUNET_CONTAINER_multihashmap_iterate (eo->set_state->elements,
637                                            init_key_to_element_iterator, eo);
638   }
639   if (NULL != eo->local_ibf)
640     ibf_destroy (eo->local_ibf);
641   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
642   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
643                                            prepare_ibf_iterator, eo->local_ibf);
644 }
645
646
647 /**
648  * Send an ibf of appropriate size.
649  *
650  * @param eo the union operation
651  * @param ibf_order order of the ibf to send, size=2^order
652  */
653 static void
654 send_ibf (struct OperationState *eo, uint16_t ibf_order)
655 {
656   unsigned int buckets_sent = 0;
657   struct InvertibleBloomFilter *ibf;
658
659   prepare_ibf (eo, 1<<ibf_order);
660
661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
662
663   ibf = eo->local_ibf;
664
665   while (buckets_sent < (1 << ibf_order))
666   {
667     unsigned int buckets_in_message;
668     struct GNUNET_MQ_Envelope *ev;
669     struct IBFMessage *msg;
670
671     buckets_in_message = (1 << ibf_order) - buckets_sent;
672     /* limit to maximum */
673     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
674       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
675
676     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
677                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
678     msg->reserved = 0;
679     msg->order = ibf_order;
680     msg->offset = htons (buckets_sent);
681     ibf_write_slice (ibf, buckets_sent,
682                      buckets_in_message, &msg[1]);
683     buckets_sent += buckets_in_message;
684     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
685                 buckets_in_message, buckets_sent, 1<<ibf_order);
686     GNUNET_MQ_send (eo->mq, ev);
687   }
688
689   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
690 }
691
692
693 /**
694  * Send a strata estimator to the remote peer.
695  *
696  * @param eo the union operation with the remote peer
697  */
698 static void
699 send_strata_estimator (struct OperationState *eo)
700 {
701   struct GNUNET_MQ_Envelope *ev;
702   struct GNUNET_MessageHeader *strata_msg;
703
704   ev = GNUNET_MQ_msg_header_extra (strata_msg,
705                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
706                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
707   strata_estimator_write (eo->set_state->se, &strata_msg[1]);
708   GNUNET_MQ_send (eo->mq, ev);
709   eo->phase = PHASE_EXPECT_IBF;
710   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
711 }
712
713
714 /**
715  * Compute the necessary order of an ibf
716  * from the size of the symmetric set difference.
717  *
718  * @param diff the difference
719  * @return the required size of the ibf
720  */
721 static unsigned int
722 get_order_from_difference (unsigned int diff)
723 {
724   unsigned int ibf_order;
725
726   ibf_order = 2;
727   while ((1<<ibf_order) < (2 * diff))
728     ibf_order++;
729   if (ibf_order > MAX_IBF_ORDER)
730     ibf_order = MAX_IBF_ORDER;
731   return ibf_order;
732 }
733
734
735 /**
736  * Handle a strata estimator from a remote peer
737  *
738  * @param cls the union operation
739  * @param mh the message
740  */
741 static void
742 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
743 {
744   struct OperationState *eo = cls;
745   struct StrataEstimator *remote_se;
746   int diff;
747
748
749   if (eo->phase != PHASE_EXPECT_SE)
750   {
751     fail_union_operation (eo);
752     GNUNET_break (0);
753     return;
754   }
755   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
756                                        SE_IBF_HASH_NUM);
757   strata_estimator_read (&mh[1], remote_se);
758   GNUNET_assert (NULL != eo->se);
759   diff = strata_estimator_difference (remote_se, eo->se);
760   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se, diff=%d\n", diff);
761   strata_estimator_destroy (remote_se);
762   strata_estimator_destroy (eo->se);
763   eo->se = NULL;
764   send_ibf (eo, get_order_from_difference (diff));
765 }
766
767
768
769 /**
770  * Iterator to send elements to a remote peer
771  *
772  * @param cls closure with the element key and the union operation
773  * @param key ignored
774  * @param value the key entry
775  */
776 static int
777 send_element_iterator (void *cls,
778                        uint32_t key,
779                        void *value)
780 {
781   struct SendElementClosure *sec = cls;
782   struct IBF_Key ibf_key = sec->ibf_key;
783   struct OperationState *eo = sec->eo;
784   struct KeyEntry *ke = value;
785
786   if (ke->ibf_key.key_val != ibf_key.key_val)
787     return GNUNET_YES;
788   while (NULL != ke)
789   {
790     const struct GNUNET_SET_Element *const element = &ke->element->element;
791     struct GNUNET_MQ_Envelope *ev;
792     struct GNUNET_MessageHeader *mh;
793
794     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
795     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
796     if (NULL == ev)
797     {
798       /* element too large */
799       GNUNET_break (0);
800       continue;
801     }
802     memcpy (&mh[1], element->data, element->size);
803     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element to client\n");
804     GNUNET_MQ_send (eo->mq, ev);
805     ke = ke->next_colliding;
806   }
807   return GNUNET_NO;
808 }
809
810 /**
811  * Send all elements that have the specified IBF key
812  * to the remote peer of the union operation
813  *
814  * @param eo union operation
815  * @param ibf_key IBF key of interest
816  */
817 static void
818 send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
819 {
820   struct SendElementClosure send_cls;
821
822   send_cls.ibf_key = ibf_key;
823   send_cls.eo = eo;
824   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
825                                                 &send_element_iterator, &send_cls);
826 }
827
828
829 /**
830  * Decode which elements are missing on each side, and
831  * send the appropriate elemens and requests
832  *
833  * @param eo union operation
834  */
835 static void
836 decode_and_send (struct OperationState *eo)
837 {
838   struct IBF_Key key;
839   int side;
840   struct InvertibleBloomFilter *diff_ibf;
841
842   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
843
844   prepare_ibf (eo, eo->remote_ibf->size);
845   diff_ibf = ibf_dup (eo->local_ibf);
846   ibf_subtract (diff_ibf, eo->remote_ibf);
847   
848   ibf_destroy (eo->remote_ibf);
849   eo->remote_ibf = NULL;
850
851   while (1)
852   {
853     int res;
854
855     res = ibf_decode (diff_ibf, &side, &key);
856     if (GNUNET_SYSERR == res)
857     {
858       int next_order;
859       next_order = 0;
860       while (1<<next_order < diff_ibf->size)
861         next_order++;
862       next_order++;
863       if (next_order <= MAX_IBF_ORDER)
864       {
865         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
866                     "decoding failed, sending larger ibf (size %u)\n",
867                     1<<next_order);
868         send_ibf (eo, next_order);
869       }
870       else
871       {
872         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
873                     "set union failed: reached ibf limit\n");
874       }
875       break;
876     }
877     if (GNUNET_NO == res)
878     {
879       struct GNUNET_MQ_Envelope *ev;
880
881       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
882       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
883       GNUNET_MQ_send (eo->mq, ev);
884       break;
885     }
886     if (1 == side)
887     {
888       send_elements_for_key (eo, key);
889     }
890     else
891     {
892       struct GNUNET_MQ_Envelope *ev;
893       struct GNUNET_MessageHeader *msg;
894
895       /* FIXME: before sending the request, check if we may just have the element */
896       /* FIXME: merge multiple requests */
897       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
898                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
899       *(struct IBF_Key *) &msg[1] = key;
900       GNUNET_MQ_send (eo->mq, ev);
901     }
902   }
903   ibf_destroy (diff_ibf);
904 }
905
906
907 /**
908  * Handle an IBF message from a remote peer.
909  *
910  * @param cls the union operation
911  * @param mh the header of the message
912  */
913 static void
914 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
915 {
916   struct OperationState *eo = cls;
917   struct IBFMessage *msg = (struct IBFMessage *) mh;
918   unsigned int buckets_in_message;
919
920   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
921        (eo->phase == PHASE_EXPECT_IBF) )
922   {
923     eo->phase = PHASE_EXPECT_IBF_CONT;
924     GNUNET_assert (NULL == eo->remote_ibf);
925     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
926     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
927     eo->ibf_buckets_received = 0;
928     if (0 != ntohs (msg->offset))
929     {
930       GNUNET_break (0);
931       fail_union_operation (eo);
932     }
933   }
934   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
935   {
936     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
937          (1<<msg->order != eo->remote_ibf->size) )
938     {
939       GNUNET_break (0);
940       fail_union_operation (eo);
941       return;
942     }
943   }
944
945   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
946
947   if (0 == buckets_in_message)
948   {
949     GNUNET_break_op (0);
950     fail_union_operation (eo);
951     return;
952   }
953
954   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
955   {
956     GNUNET_break (0);
957     fail_union_operation (eo);
958     return;
959   }
960   
961   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
962   eo->ibf_buckets_received += buckets_in_message;
963
964   if (eo->ibf_buckets_received == eo->remote_ibf->size)
965   {
966
967     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
968     eo->phase = PHASE_EXPECT_ELEMENTS;
969     decode_and_send (eo);
970   }
971 }
972
973
974 /**
975  * Send a result message to the client indicating
976  * that there is a new element.
977  *
978  * @param eo union operation
979  * @param element element to send
980  */
981 static void
982 send_client_element (struct OperationState *eo,
983                      struct GNUNET_SET_Element *element)
984 {
985   struct GNUNET_MQ_Envelope *ev;
986   struct GNUNET_SET_ResultMessage *rm;
987
988   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending el of size %u\n", element->size);
989   GNUNET_assert (0 != eo->spec->client_request_id);
990   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
991   if (NULL == ev)
992   {
993     GNUNET_MQ_discard (ev);
994     GNUNET_break (0);
995     return;
996   }
997   rm->result_status = htons (GNUNET_SET_STATUS_OK);
998   rm->request_id = htonl (eo->spec->client_request_id);
999   rm->element_type = element->type;
1000   memcpy (&rm[1], element->data, element->size);
1001   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1002 }
1003
1004
1005 /**
1006  * Send a result message to the client indicating
1007  * that the operation is over.
1008  * After the result done message has been sent to the client,
1009  * destroy the evaluate operation.
1010  *
1011  * @param eo union operation
1012  */
1013 static void
1014 send_client_done_and_destroy (struct OperationState *eo)
1015 {
1016   struct GNUNET_MQ_Envelope *ev;
1017   struct GNUNET_SET_ResultMessage *rm;
1018
1019   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1020   rm->request_id = htonl (eo->spec->client_request_id);
1021   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1022   rm->element_type = htons (0);
1023   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1024
1025 }
1026
1027
1028 /**
1029  * Handle an element message from a remote peer.
1030  *
1031  * @param cls the union operation
1032  * @param mh the message
1033  */
1034 static void
1035 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1036 {
1037   struct OperationState *eo = cls;
1038   struct ElementEntry *ee;
1039   uint16_t element_size;
1040
1041   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1042
1043   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1044        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1045   {
1046     fail_union_operation (eo);
1047     GNUNET_break (0);
1048     return;
1049   }
1050   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1051   ee = GNUNET_malloc (sizeof *eo + element_size);
1052   memcpy (&ee[1], &mh[1], element_size);
1053   ee->element.size = element_size;
1054   ee->element.data = &ee[1];
1055   ee->remote = GNUNET_YES;
1056
1057   insert_element (eo, ee);
1058   send_client_element (eo, &ee->element);
1059 }
1060
1061
1062 /**
1063  * Handle an element request from a remote peer.
1064  *
1065  * @param cls the union operation
1066  * @param mh the message
1067  */
1068 static void
1069 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1070 {
1071   struct OperationState *eo = cls;
1072   struct IBF_Key *ibf_key;
1073   unsigned int num_keys;
1074
1075   /* look up elements and send them */
1076   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1077   {
1078     GNUNET_break (0);
1079     fail_union_operation (eo);
1080     return;
1081   }
1082
1083   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1084
1085   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1086   {
1087     GNUNET_break (0);
1088     fail_union_operation (eo);
1089     return;
1090   }
1091
1092   ibf_key = (struct IBF_Key *) &mh[1];
1093   while (0 != num_keys--)
1094   {
1095     send_elements_for_key (eo, *ibf_key);
1096     ibf_key++;
1097   }
1098 }
1099
1100
1101 /**
1102  * Callback used for notifications
1103  *
1104  * @param cls closure
1105  */
1106 static void
1107 peer_done_sent_cb (void *cls)
1108 {
1109   struct OperationState *eo = cls;
1110
1111   send_client_done_and_destroy (eo);
1112 }
1113
1114
1115 /**
1116  * Handle a done message from a remote peer
1117  * 
1118  * @param cls the union operation
1119  * @param mh the message
1120  */
1121 static void
1122 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1123 {
1124   struct OperationState *eo = cls;
1125
1126   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1127   {
1128     /* we got all requests, but still have to send our elements as response */
1129     struct GNUNET_MQ_Envelope *ev;
1130
1131     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1132     eo->phase = PHASE_FINISHED;
1133     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1134     GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
1135     GNUNET_MQ_send (eo->mq, ev);
1136     return;
1137   }
1138   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1139   {
1140     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1141     eo->phase = PHASE_FINISHED;
1142     send_client_done_and_destroy (eo);
1143     return;
1144   }
1145   GNUNET_break (0);
1146   fail_union_operation (eo);
1147 }
1148
1149
1150 /**
1151  * Evaluate a union operation with
1152  * a remote peer.
1153  *
1154  * @param spec specification of the operation the evaluate
1155  * @param tunnel tunnel already connected to the partner peer
1156  * @param tc tunnel context, passed here so all new incoming
1157  *        messages are directly going to the union operations
1158  * @return a handle to the operation
1159  */
1160 static void
1161 union_evaluate (struct OperationSpecification *spec,
1162                 struct GNUNET_MESH_Tunnel *tunnel,
1163                 struct TunnelContext *tc)
1164 {
1165   struct OperationState *eo;
1166
1167   eo = GNUNET_new (struct OperationState);
1168   tc->vt = _GSS_union_vt ();
1169   tc->op = eo;
1170   eo->se = strata_estimator_dup (spec->set->state->se);
1171   eo->set_state = spec->set->state;
1172   eo->spec = spec;
1173   eo->tunnel = tunnel;
1174   eo->mq = GNUNET_MESH_mq_create (tunnel);
1175
1176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1177               "evaluating union operation, (app %s)\n", 
1178               GNUNET_h2s (&eo->spec->app_id));
1179
1180   /* we started the operation, thus we have to send the operation request */
1181   eo->phase = PHASE_EXPECT_SE;
1182
1183   GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
1184                                eo->set_state->ops_tail,
1185                                eo);
1186
1187   send_operation_request (eo);
1188 }
1189
1190
1191 /**
1192  * Accept an union operation request from a remote peer
1193  *
1194  * @param spec all necessary information about the operation
1195  * @param tunnel open tunnel to the partner's peer
1196  * @param tc tunnel context, passed here so all new incoming
1197  *        messages are directly going to the union operations
1198  * @return operation
1199  */
1200 static void
1201 union_accept (struct OperationSpecification *spec,
1202               struct GNUNET_MESH_Tunnel *tunnel,
1203               struct TunnelContext *tc)
1204 {
1205   struct OperationState *eo;
1206
1207   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1208
1209   eo = GNUNET_new (struct OperationState);
1210   tc->vt = _GSS_union_vt ();
1211   tc->op = eo;
1212   eo->set_state = spec->set->state;
1213   eo->generation_created = eo->set_state->current_generation++;
1214   eo->spec = spec;
1215   eo->tunnel = tunnel;
1216   eo->mq = GNUNET_MESH_mq_create (tunnel);
1217   eo->se = strata_estimator_dup (eo->set_state->se);
1218   /* transfer ownership of mq and socket from incoming to eo */
1219   GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
1220                                eo->set_state->ops_tail,
1221                                eo);
1222   /* kick off the operation */
1223   send_strata_estimator (eo);
1224 }
1225
1226
1227 /**
1228  * Create a new set supporting the union operation
1229  *
1230  * @return the newly created set
1231  */
1232 static struct SetState *
1233 union_set_create (void)
1234 {
1235   struct SetState *set_state;
1236
1237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1238   
1239   set_state = GNUNET_new (struct SetState);
1240   /* keys of the hash map are stored in the element entrys, thus we do not
1241    * want the hash map to copy them */
1242   set_state->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1243   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1244                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1245   return set_state;
1246 }
1247
1248
1249 /**
1250  * Add the element from the given element message to the set.
1251  *
1252  * @param set_state state of the set want to add to
1253  * @param element the element to add to the set
1254  */
1255 static void
1256 union_add (struct SetState *set_state, const struct GNUNET_SET_Element *element)
1257 {
1258   struct ElementEntry *ee;
1259   struct ElementEntry *ee_dup;
1260
1261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "adding union element of size %u\n", element->size);
1262
1263   ee = GNUNET_malloc (element->size + sizeof *ee);
1264   ee->element.size = element->size;
1265   memcpy (&ee[1], element->data, element->size);
1266   ee->element.data = &ee[1];
1267   ee->generation_added = set_state->current_generation;
1268   ee->remote = GNUNET_NO;
1269   GNUNET_CRYPTO_hash (ee->element.data, element->size, &ee->element_hash);
1270   ee_dup = GNUNET_CONTAINER_multihashmap_get (set_state->elements,
1271                                               &ee->element_hash);
1272   if (NULL != ee_dup)
1273   {
1274     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1275     GNUNET_free (ee);
1276     return;
1277   }
1278   GNUNET_CONTAINER_multihashmap_put (set_state->elements, &ee->element_hash, ee,
1279                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1280   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1281 }
1282
1283
1284 /**
1285  * Destroy a set that supports the union operation
1286  *
1287  * @param set_state the set to destroy
1288  */
1289 static void
1290 union_set_destroy (struct SetState *set_state)
1291 {
1292   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1293   /* important to destroy operations before the rest of the set */
1294   while (NULL != set_state->ops_head)
1295     union_operation_destroy (set_state->ops_head);
1296   if (NULL != set_state->se)
1297   {
1298     strata_estimator_destroy (set_state->se);
1299     set_state->se = NULL;
1300   }
1301   if (NULL != set_state->elements)
1302   {
1303     GNUNET_CONTAINER_multihashmap_iterate (set_state->elements,
1304                                            destroy_elements_iterator, NULL);
1305     GNUNET_CONTAINER_multihashmap_destroy (set_state->elements);
1306     set_state->elements = NULL;
1307   }
1308
1309   GNUNET_free (set_state);
1310 }
1311
1312 /**
1313  * Remove the element given in the element message from the set.
1314  * Only marks the element as removed, so that older set operations can still exchange it.
1315  *
1316  * @param set_state state of the set to remove from
1317  * @param element set element to remove
1318  */
1319 static void
1320 union_remove (struct SetState *set_state, const struct GNUNET_SET_Element *element)
1321 {
1322   struct GNUNET_HashCode hash;
1323   struct ElementEntry *ee;
1324
1325   GNUNET_CRYPTO_hash (element->data, element->size, &hash);
1326   ee = GNUNET_CONTAINER_multihashmap_get (set_state->elements, &hash);
1327   if (NULL == ee)
1328   {
1329     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1330     return;
1331   }
1332   if (GNUNET_YES == ee->removed)
1333   {
1334     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1335     return;
1336   }
1337   ee->removed = GNUNET_YES;
1338   ee->generation_removed = set_state->current_generation;
1339 }
1340
1341
1342 /**
1343  * Dispatch messages for a union operation.
1344  *
1345  * @param eo the state of the union evaluate operation
1346  * @param mh the received message
1347  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1348  *         GNUNET_OK otherwise
1349  */
1350 int
1351 union_handle_p2p_message (struct OperationState *eo,
1352                           const struct GNUNET_MessageHeader *mh)
1353 {
1354   switch (ntohs (mh->type))
1355   {
1356     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1357       handle_p2p_ibf (eo, mh);
1358       break;
1359     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1360       handle_p2p_strata_estimator (eo, mh);
1361       break;
1362     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1363       handle_p2p_elements (eo, mh);
1364       break;
1365     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1366       handle_p2p_element_requests (eo, mh);
1367       break;
1368     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1369       handle_p2p_done (eo, mh);
1370       break;
1371     default:
1372       /* something wrong with mesh's message handlers? */
1373       GNUNET_assert (0);
1374   }
1375   return GNUNET_OK;
1376 }
1377
1378
1379 static void
1380 union_peer_disconnect (struct OperationState *op)
1381 {
1382   /* Are we already disconnected? */
1383   if (NULL == op->tunnel)
1384     return;
1385   op->tunnel = NULL;
1386   if (NULL != op->mq)
1387   {
1388     GNUNET_MQ_destroy (op->mq);
1389     op->mq = NULL;
1390   }
1391   if (PHASE_FINISHED != op->phase)
1392   {
1393     struct GNUNET_MQ_Envelope *ev;
1394     struct GNUNET_SET_ResultMessage *msg;
1395     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1396     msg->request_id = htonl (op->spec->client_request_id);
1397     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1398     msg->element_type = htons (0);
1399     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1400     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1401   }
1402   else
1403   {
1404     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1405   }
1406   union_operation_destroy (op);
1407 }
1408
1409
1410 static void
1411 union_op_cancel (struct SetState *set_state, uint32_t op_id)
1412 {
1413   /* FIXME: implement */
1414 }
1415
1416
1417 const struct SetVT *
1418 _GSS_union_vt ()
1419 {
1420   static const struct SetVT union_vt = {
1421     .create = &union_set_create,
1422     .msg_handler = &union_handle_p2p_message,
1423     .add = &union_add,
1424     .remove = &union_remove,
1425     .destroy_set = &union_set_destroy,
1426     .evaluate = &union_evaluate,
1427     .accept = &union_accept,
1428     .peer_disconnect = &union_peer_disconnect,
1429     .cancel = &union_op_cancel
1430   };
1431
1432   return &union_vt;
1433 }