changes
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct UnionEvaluateOperation
104 {
105   /**
106    * Local set the operation is evaluated on.
107    */
108   struct Set *set;
109
110   /**
111    * Peer with the remote set
112    */
113   struct GNUNET_PeerIdentity peer;
114
115   /**
116    * Application-specific identifier
117    */
118   struct GNUNET_HashCode app_id;
119
120   /**
121    * Context message, given to us
122    * by the client, may be NULL.
123    */
124   struct GNUNET_MessageHeader *context_msg;
125
126   /**
127    * Stream socket connected to the other peer
128    */
129   struct GNUNET_STREAM_Socket *socket;
130
131   /**
132    * Message queue for the peer on the other
133    * end
134    */
135   struct GNUNET_MQ_MessageQueue *mq;
136
137   /**
138    * Request ID to multiplex set operations to
139    * the client inhabiting the set.
140    */
141   uint32_t request_id;
142
143   /**
144    * Number of ibf buckets received
145    */
146   unsigned int ibf_buckets_received;
147
148   /**
149    * Copy of the set's strata estimator at the time of
150    * creation of this operation
151    */
152   struct StrataEstimator *se;
153
154   /**
155    * The ibf we currently receive
156    */
157   struct InvertibleBloomFilter *remote_ibf;
158
159   /**
160    * IBF of the set's element.
161    */
162   struct InvertibleBloomFilter *local_ibf;
163
164   /**
165    * Maps IBF-Keys (specific to the current salt) to elements.
166    */
167   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
168
169   /**
170    * Current state of the operation.
171    */
172   enum UnionOperationPhase phase;
173
174   /**
175    * Salt to use for this operation.
176    */
177   uint16_t salt;
178
179   /**
180    * Generation in which the operation handle
181    * was created.
182    */
183   unsigned int generation_created;
184   
185   /**
186    * Evaluate operations are held in
187    * a linked list.
188    */
189   struct UnionEvaluateOperation *next;
190   
191    /**
192    * Evaluate operations are held in
193    * a linked list.
194    */
195   struct UnionEvaluateOperation *prev;
196 };
197
198
199 /**
200  * Information about the element in a set.
201  * All elements are stored in a hash-table
202  * from their hash-code to their 'struct Element',
203  * so that the remove and add operations are reasonably
204  * fast.
205  */
206 struct ElementEntry
207 {
208   /**
209    * The actual element. The data for the element
210    * should be allocated at the end of this struct.
211    */
212   struct GNUNET_SET_Element element;
213
214   /**
215    * Hash of the element.
216    * Will be used to derive the different IBF keys
217    * for different salts.
218    */
219   struct GNUNET_HashCode element_hash;
220
221   /**
222    * Generation the element was added by the client.
223    * Operations of earlier generations will not consider the element.
224    */
225   unsigned int generation_added;
226
227   /**
228    * GNUNET_YES if the element has been removed in some generation.
229    */
230   int removed;
231
232   /**
233    * Generation the element was removed by the client. 
234    * Operations of later generations will not consider the element.
235    * Only valid if is_removed is GNUNET_YES.
236    */
237   unsigned int generation_removed;
238
239   /**
240    * GNUNET_YES if the element is a remote element, and does not belong
241    * to the operation's set.
242    */
243   int remote;
244 };
245
246
247 /**
248  * Entries in the key-to-element map of the union set.
249  */
250 struct KeyEntry
251 {
252   /**
253    * IBF key for the entry, derived from the current salt.
254    */
255   struct IBF_Key ibf_key;
256
257   /**
258    * The actual element associated with the key
259    */
260   struct ElementEntry *element;
261
262   /**
263    * Element that collides with this element
264    * on the ibf key
265    */
266   struct KeyEntry *next_colliding;
267 };
268
269 /**
270  * Used as a closure for sending elements
271  * with a specific IBF key.
272  */
273 struct SendElementClosure
274 {
275   /**
276    * The IBF key whose matching elements should be
277    * sent.
278    */
279   struct IBF_Key ibf_key;
280
281   /**
282    * Operation for which the elements
283    * should be sent.
284    */
285   struct UnionEvaluateOperation *eo;
286 };
287
288
289 /**
290  * Extra state required for efficient set union.
291  */
292 struct UnionState
293 {
294   /**
295    * The strata estimator is only generated once for
296    * each set.
297    * The IBF keys are derived from the element hashes with
298    * salt=0.
299    */
300   struct StrataEstimator *se;
301
302   /**
303    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
304    */
305   struct GNUNET_CONTAINER_MultiHashMap *elements;
306
307   /**
308    * Evaluate operations are held in
309    * a linked list.
310    */
311   struct UnionEvaluateOperation *ops_head;
312
313   /**
314    * Evaluate operations are held in
315    * a linked list.
316    */
317   struct UnionEvaluateOperation *ops_tail;
318
319   /**
320    * Current generation, that is, number of
321    * previously executed operations on this set
322    */
323   unsigned int current_generation;
324 };
325
326
327
328 /**
329  * Iterator over hash map entries.
330  *
331  * @param cls closure
332  * @param key current key code
333  * @param value value in the hash map
334  * @return GNUNET_YES if we should continue to
335  *         iterate,
336  *         GNUNET_NO if not.
337  */
338 static int
339 destroy_elements_iterator (void *cls,
340                            const struct GNUNET_HashCode * key,
341                            void *value)
342 {
343   struct ElementEntry *ee = value;
344
345   GNUNET_free (ee);
346   return GNUNET_YES;
347 }
348
349
350 /**
351  * Destroy the elements belonging to a union set.
352  *
353  * @param us union state that contains the elements
354  */
355 static void
356 destroy_elements (struct UnionState *us)
357 {
358   if (NULL == us->elements)
359     return;
360   GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
361   GNUNET_CONTAINER_multihashmap_destroy (us->elements);
362   us->elements = NULL;
363 }
364
365
366
367 /**
368  * Iterator over hash map entries.
369  *
370  * @param cls closure
371  * @param key current key code
372  * @param value value in the hash map
373  * @return GNUNET_YES if we should continue to
374  *         iterate,
375  *         GNUNET_NO if not.
376  */
377 static int
378 destroy_key_to_element_iter (void *cls,
379                              uint32_t key,
380                              void *value)
381 {
382   struct KeyEntry *k = value;
383   
384   while (NULL != k)
385   {
386     struct KeyEntry *k_tmp = k;
387     k = k->next_colliding;
388     GNUNET_free (k_tmp);
389   }
390   return GNUNET_YES;
391 }
392
393
394 /**
395  * Destroy a union operation, and free all resources
396  * associated with it.
397  *
398  * @param eo the union operation to destroy
399  */
400 static void
401 destroy_union_operation (struct UnionEvaluateOperation *eo)
402 {
403   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
404   
405   if (NULL != eo->mq)
406   {
407     GNUNET_MQ_destroy (eo->mq);
408     eo->mq = NULL;
409   }
410
411   if (NULL != eo->socket)
412   {
413     GNUNET_STREAM_close (eo->socket);
414     eo->socket = NULL;
415   }
416   if (NULL != eo->remote_ibf)
417   {
418     ibf_destroy (eo->remote_ibf);
419     eo->remote_ibf = NULL;
420   }
421   if (NULL != eo->local_ibf)
422   {
423     ibf_destroy (eo->local_ibf);
424     eo->local_ibf = NULL;
425   }
426   if (NULL != eo->se)
427   {
428     strata_estimator_destroy (eo->se);
429     eo->se = NULL;
430   }
431   if (NULL != eo->key_to_element)
432   {
433     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
434     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
435     eo->key_to_element = NULL;
436   }
437
438   GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
439                                eo->set->state.u->ops_tail,
440                                eo);
441   GNUNET_free (eo);
442
443
444   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
445
446
447   /* FIXME: do a garbage collection of the set generations */
448 }
449
450
451 /**
452  * Inform the client that the union operation has failed,
453  * and proceed to destroy the evaluate operation.
454  *
455  * @param eo the union operation to fail
456  */
457 static void
458 fail_union_operation (struct UnionEvaluateOperation *eo)
459 {
460   struct GNUNET_MQ_Message *mqm;
461   struct GNUNET_SET_ResultMessage *msg;
462
463   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
464   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
465   msg->request_id = htonl (eo->request_id);
466   GNUNET_MQ_send (eo->set->client_mq, mqm);
467   destroy_union_operation (eo);
468 }
469
470
471 /**
472  * Derive the IBF key from a hash code and 
473  * a salt.
474  *
475  * @param src the hash code
476  * @param salt salt to use
477  * @return the derived IBF key
478  */
479 static struct IBF_Key
480 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
481 {
482   struct IBF_Key key;
483
484   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
485                       GCRY_MD_SHA512, GCRY_MD_SHA256,
486                       src, sizeof *src,
487                       &salt, sizeof (salt),
488                       NULL, 0);
489   return key;
490 }
491
492
493 /**
494  * Send a request for the evaluate operation to a remote peer
495  *
496  * @param eo operation with the other peer
497  */
498 static void
499 send_operation_request (struct UnionEvaluateOperation *eo)
500 {
501   struct GNUNET_MQ_Message *mqm;
502   struct OperationRequestMessage *msg;
503
504   mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
505
506   if (NULL == mqm)
507   {
508     /* the context message is too large */
509     GNUNET_break (0);
510     GNUNET_SERVER_client_disconnect (eo->set->client);
511     return;
512   }
513   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
514   msg->app_id = eo->app_id;
515   GNUNET_MQ_send (eo->mq, mqm);
516
517   if (NULL != eo->context_msg)
518   {
519     GNUNET_free (eo->context_msg);
520     eo->context_msg = NULL;
521   }
522
523   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
524 }
525
526
527 /**
528  * Iterator to create the mapping between ibf keys
529  * and element entries.
530  *
531  * @param cls closure
532  * @param key current key code
533  * @param value value in the hash map
534  * @return GNUNET_YES if we should continue to
535  *         iterate,
536  *         GNUNET_NO if not.
537  */
538 static int
539 insert_element_iterator (void *cls,
540                          uint32_t key,
541                          void *value)
542 {
543   struct KeyEntry *const new_k = cls;
544   struct KeyEntry *old_k = value;
545
546   GNUNET_assert (NULL != old_k);
547   do
548   {
549     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
550     {
551       new_k->next_colliding = old_k->next_colliding;
552       old_k->next_colliding = new_k;
553       return GNUNET_NO;
554     }
555     old_k = old_k->next_colliding;
556   } while (NULL != old_k);
557   return GNUNET_YES;
558 }
559
560
561 /**
562  * Insert an element into the union operation's
563  * key-to-element mapping
564  *
565  * @param the union operation
566  * @param ee the element entry
567  */
568 static void
569 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
570 {
571   int ret;
572   struct IBF_Key ibf_key;
573   struct KeyEntry *k;
574
575   ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
576   k = GNUNET_new (struct KeyEntry);
577   k->element = ee;
578   k->ibf_key = ibf_key;
579   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
580                                                       (uint32_t) ibf_key.key_val,
581                                                       insert_element_iterator, k);
582
583   /* was the element inserted into a colliding bucket? */
584   if (GNUNET_SYSERR == ret)
585     return;
586
587   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
588                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
589 }
590
591
592 /**
593  * Insert a key into an ibf.
594  *
595  * @param cls the ibf
596  * @param key unused
597  * @param value the key entry to get the key from
598  */
599 static int
600 prepare_ibf_iterator (void *cls,
601                       uint32_t key,
602                       void *value)
603 {
604   struct InvertibleBloomFilter *ibf = cls;
605   struct KeyEntry *ke = value;
606
607   ibf_insert (ibf, ke->ibf_key);
608   return GNUNET_YES;
609 }
610
611
612 /**
613  * Iterator for initializing the
614  * key-to-element mapping of a union operation
615  *
616  * @param cls the union operation
617  * @param key unised
618  * @param value the element entry to insert
619  *        into the key-to-element mapping
620  */
621 static int
622 init_key_to_element_iterator (void *cls,
623                               const struct GNUNET_HashCode *key,
624                               void *value)
625 {
626   struct UnionEvaluateOperation *eo = cls;
627   struct ElementEntry *e = value;
628
629   /* make sure that the element belongs to the set at the time
630    * of creating the operation */
631   if ( (e->generation_added > eo->generation_created) ||
632        ( (GNUNET_YES == e->removed) &&
633          (e->generation_removed < eo->generation_created)))
634     return GNUNET_YES;
635
636   insert_element (eo, e);
637   return GNUNET_YES;
638 }
639
640
641 /**
642  * Create an ibf with the operation's elements
643  * of the specified size
644  *
645  * @param eo the union operation
646  * @param size size of the ibf to create
647  */
648 static void
649 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
650 {
651   if (NULL == eo->key_to_element)
652   {
653     unsigned int len;
654     len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
655     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
656     GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
657                                              init_key_to_element_iterator, eo);
658   }
659   if (NULL != eo->local_ibf)
660     ibf_destroy (eo->local_ibf);
661   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
662   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
663                                            prepare_ibf_iterator, eo->local_ibf);
664 }
665
666
667 /**
668  * Send an ibf of appropriate size.
669  *
670  * @param eo the union operation
671  * @param ibf_order order of the ibf to send, size=2^order
672  */
673 static void
674 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
675 {
676   unsigned int buckets_sent = 0;
677   struct InvertibleBloomFilter *ibf;
678
679   prepare_ibf (eo, 1<<ibf_order);
680
681   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
682
683   ibf = eo->local_ibf;
684
685   while (buckets_sent < (1 << ibf_order))
686   {
687     unsigned int buckets_in_message;
688     struct GNUNET_MQ_Message *mqm;
689     struct IBFMessage *msg;
690
691     buckets_in_message = (1 << ibf_order) - buckets_sent;
692     /* limit to maximum */
693     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
694       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
695
696     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
697                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
698     msg->order = ibf_order;
699     msg->offset = htons (buckets_sent);
700     ibf_write_slice (ibf, buckets_sent,
701                      buckets_in_message, &msg[1]);
702     buckets_sent += buckets_in_message;
703     GNUNET_MQ_send (eo->mq, mqm);
704   }
705
706   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
707 }
708
709
710 /**
711  * Send a strata estimator to the remote peer.
712  *
713  * @param eo the union operation with the remote peer
714  */
715 static void
716 send_strata_estimator (struct UnionEvaluateOperation *eo)
717 {
718   struct GNUNET_MQ_Message *mqm;
719   struct GNUNET_MessageHeader *strata_msg;
720
721   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
722                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
723                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
724   strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
725   GNUNET_MQ_send (eo->mq, mqm);
726   eo->phase = PHASE_EXPECT_IBF;
727 }
728
729
730 /**
731  * Compute the necessary order of an ibf
732  * from the size of the symmetric set difference.
733  *
734  * @param diff the difference
735  * @return the required size of the ibf
736  */
737 static unsigned int
738 get_order_from_difference (unsigned int diff)
739 {
740   unsigned int ibf_order;
741
742   ibf_order = 2;
743   while ((1<<ibf_order) < (2 * diff))
744     ibf_order++;
745   if (ibf_order > MAX_IBF_ORDER)
746     ibf_order = MAX_IBF_ORDER;
747   return ibf_order;
748 }
749
750
751 /**
752  * Handle a strata estimator from a remote peer
753  *
754  * @param the union operation
755  * @param mh the message
756  */
757 static void
758 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
759 {
760   struct UnionEvaluateOperation *eo = cls;
761   struct StrataEstimator *remote_se;
762   int diff;
763
764
765   if (eo->phase != PHASE_EXPECT_SE)
766   {
767     fail_union_operation (eo);
768     GNUNET_break (0);
769     return;
770   }
771   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
772                                        SE_IBF_HASH_NUM);
773   strata_estimator_read (&mh[1], remote_se);
774   GNUNET_assert (NULL != eo->se);
775   diff = strata_estimator_difference (remote_se, eo->se);
776   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
777   strata_estimator_destroy (remote_se);
778   strata_estimator_destroy (eo->se);
779   eo->se = NULL;
780   send_ibf (eo, get_order_from_difference (diff));
781 }
782
783
784
785 /**
786  * Iterator to send elements to a remote peer
787  *
788  * @param cls closure with the element key and the union operation
789  * @param key ignored
790  * @param value the key entry
791  */
792 static int
793 send_element_iterator (void *cls,
794                        uint32_t key,
795                        void *value)
796 {
797   struct SendElementClosure *sec = cls;
798   struct IBF_Key ibf_key = sec->ibf_key;
799   struct UnionEvaluateOperation *eo = sec->eo;
800   struct KeyEntry *ke = value;
801
802   if (ke->ibf_key.key_val != ibf_key.key_val)
803     return GNUNET_YES;
804   while (NULL != ke)
805   {
806     const struct GNUNET_SET_Element *const element = &ke->element->element;
807     struct GNUNET_MQ_Message *mqm;
808     struct GNUNET_MessageHeader *mh;
809
810     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
811     mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
812     if (NULL == mqm)
813     {
814       /* element too large */
815       GNUNET_break (0);
816       continue;
817     }
818     memcpy (&mh[1], element->data, element->size);
819     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
820     GNUNET_MQ_send (eo->mq, mqm);
821     ke = ke->next_colliding;
822   }
823   return GNUNET_NO;
824 }
825
826 /**
827  * Send all elements that have the specified IBF key
828  * to the remote peer of the union operation
829  *
830  * @param eo union operation
831  * @param ibf_key IBF key of interest
832  */
833 static void
834 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
835 {
836   struct SendElementClosure send_cls;
837
838   send_cls.ibf_key = ibf_key;
839   send_cls.eo = eo;
840   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
841                                                 &send_element_iterator, &send_cls);
842 }
843
844
845 /**
846  * Decode which elements are missing on each side, and
847  * send the appropriate elemens and requests
848  *
849  * @param eo union operation
850  */
851 static void
852 decode_and_send (struct UnionEvaluateOperation *eo)
853 {
854   struct IBF_Key key;
855   int side;
856   struct InvertibleBloomFilter *diff_ibf;
857
858   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
859
860   prepare_ibf (eo, eo->remote_ibf->size);
861   diff_ibf = ibf_dup (eo->local_ibf);
862   ibf_subtract (diff_ibf, eo->remote_ibf);
863
864   while (1)
865   {
866     int res;
867
868     res = ibf_decode (diff_ibf, &side, &key);
869     if (GNUNET_SYSERR == res)
870     {
871       int next_order;
872       next_order = 0;
873       while (1<<next_order < diff_ibf->size)
874         next_order++;
875       next_order++;
876       if (next_order <= MAX_IBF_ORDER)
877       {
878         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
879                     "decoding failed, sending larger ibf (size %u)\n",
880                     1<<next_order);
881         send_ibf (eo, next_order);
882       }
883       else
884       {
885         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
886                     "set union failed: reached ibf limit\n");
887       }
888       break;
889     }
890     if (GNUNET_NO == res)
891     {
892       struct GNUNET_MQ_Message *mqm;
893
894       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
895       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
896       GNUNET_MQ_send (eo->mq, mqm);
897       break;
898     }
899     if (1 == side)
900     {
901       send_elements_for_key (eo, key);
902     }
903     else
904     {
905       struct GNUNET_MQ_Message *mqm;
906       struct GNUNET_MessageHeader *msg;
907
908       /* FIXME: before sending the request, check if we may just have the element */
909       /* FIXME: merge multiple requests */
910       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
911                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
912       *(struct IBF_Key *) &msg[1] = key;
913       GNUNET_MQ_send (eo->mq, mqm);
914     }
915   }
916   ibf_destroy (diff_ibf);
917 }
918
919
920 /**
921  * Handle an IBF message from a remote peer.
922  *
923  * @param cls the union operation
924  * @param mh the header of the message
925  */
926 static void
927 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
928 {
929   struct UnionEvaluateOperation *eo = cls;
930   struct IBFMessage *msg = (struct IBFMessage *) mh;
931   unsigned int buckets_in_message;
932
933   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
934        (eo->phase == PHASE_EXPECT_IBF) )
935   {
936     eo->phase = PHASE_EXPECT_IBF_CONT;
937     GNUNET_assert (NULL == eo->remote_ibf);
938     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
939     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
940     if (0 != ntohs (msg->offset))
941     {
942       GNUNET_break (0);
943       fail_union_operation (eo);
944     }
945   }
946   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
947   {
948     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
949          (1<<msg->order != eo->remote_ibf->size) )
950     {
951       GNUNET_break (0);
952       fail_union_operation (eo);
953       return;
954     }
955   }
956
957   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
958
959   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
960   {
961     GNUNET_break (0);
962     fail_union_operation (eo);
963     return;
964   }
965   
966   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
967   eo->ibf_buckets_received += buckets_in_message;
968
969   if (eo->ibf_buckets_received == eo->remote_ibf->size)
970   {
971
972     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
973     eo->phase = PHASE_EXPECT_ELEMENTS;
974     decode_and_send (eo);
975   }
976 }
977
978
979 /**
980  * Send a result message to the client indicating
981  * that there is a new element.
982  *
983  * @param eo union operation
984  * @param element element to send
985  */
986 static void
987 send_client_element (struct UnionEvaluateOperation *eo,
988                      struct GNUNET_SET_Element *element)
989 {
990   struct GNUNET_MQ_Message *mqm;
991   struct GNUNET_SET_ResultMessage *rm;
992
993   GNUNET_assert (0 != eo->request_id);
994   mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
995   if (NULL == mqm)
996   {
997     GNUNET_MQ_discard (mqm);
998     GNUNET_break (0);
999     return;
1000   }
1001   rm->result_status = htons (GNUNET_SET_STATUS_OK);
1002   rm->request_id = htonl (eo->request_id);
1003   memcpy (&rm[1], element->data, element->size);
1004   GNUNET_MQ_send (eo->set->client_mq, mqm);
1005 }
1006
1007
1008 /**
1009  * Completion callback for shutdown
1010  *
1011  * @param cls the closure from GNUNET_STREAM_shutdown call
1012  * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
1013  *          SHUT_RDWR) 
1014  */
1015 /*
1016 static void 
1017 stream_shutdown_cb (void *cls,
1018                     int operation)
1019 {
1020   //struct UnionEvaluateOperation *eo = cls;
1021
1022   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n");
1023
1024   // destroy_union_operation (eo);
1025 }
1026 */
1027
1028
1029 /**
1030  * Send a result message to the client indicating
1031  * that the operation is over.
1032  * After the result done message has been sent to the client,
1033  * destroy the evaluate operation.
1034  *
1035  * @param eo union operation
1036  * @param element element to send
1037  */
1038 static void
1039 send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1040 {
1041   struct GNUNET_MQ_Message *mqm;
1042   struct GNUNET_SET_ResultMessage *rm;
1043
1044   GNUNET_assert (0 != eo->request_id);
1045   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1046   rm->request_id = htonl (eo->request_id);
1047   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1048   GNUNET_MQ_send (eo->set->client_mq, mqm);
1049
1050   // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo);
1051 }
1052
1053
1054 /**
1055  * Handle an element message from a remote peer.
1056  *
1057  * @param cls the union operation
1058  * @param mh the message
1059  */
1060 static void
1061 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1062 {
1063   struct UnionEvaluateOperation *eo = cls;
1064   struct ElementEntry *ee;
1065   uint16_t element_size;
1066
1067   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
1068
1069   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1070        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1071   {
1072     fail_union_operation (eo);
1073     GNUNET_break (0);
1074     return;
1075   }
1076   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1077   ee = GNUNET_malloc (sizeof *eo + element_size);
1078   memcpy (&ee[1], &mh[1], element_size);
1079   ee->element.data = &ee[1];
1080   ee->remote = GNUNET_YES;
1081
1082   insert_element (eo, ee);
1083   send_client_element (eo, &ee->element);
1084
1085   GNUNET_free (ee);
1086 }
1087
1088
1089 /**
1090  * Handle an element request from a remote peer.
1091  *
1092  * @param cls the union operation
1093  * @param mh the message
1094  */
1095 static void
1096 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1097 {
1098   struct UnionEvaluateOperation *eo = cls;
1099   struct IBF_Key *ibf_key;
1100   unsigned int num_keys;
1101
1102   /* look up elements and send them */
1103   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1104   {
1105     GNUNET_break (0);
1106     fail_union_operation (eo);
1107     return;
1108   }
1109
1110   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1111
1112   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1113   {
1114     GNUNET_break (0);
1115     fail_union_operation (eo);
1116     return;
1117   }
1118
1119   ibf_key = (struct IBF_Key *) &mh[1];
1120   while (0 != num_keys--)
1121   {
1122     send_elements_for_key (eo, *ibf_key);
1123     ibf_key++;
1124   }
1125 }
1126
1127
1128 /**
1129  * Callback used for notifications
1130  *
1131  * @param cls closure
1132  */
1133 static void
1134 peer_done_sent_cb (void *cls)
1135 {
1136   struct UnionEvaluateOperation *eo = cls;
1137
1138   send_client_done_and_destroy (eo);
1139 }
1140
1141
1142 /**
1143  * Handle a done message from a remote peer
1144  * 
1145  * @param cls the union operation
1146  * @param mh the message
1147  */
1148 static void
1149 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1150 {
1151   struct UnionEvaluateOperation *eo = cls;
1152
1153   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1154   {
1155     /* we got all requests, but still have to send our elements as response */
1156     struct GNUNET_MQ_Message *mqm;
1157
1158     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
1159     eo->phase = PHASE_FINISHED;
1160     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1161     GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
1162     GNUNET_MQ_send (eo->mq, mqm);
1163     return;
1164   }
1165   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1166   {
1167     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
1168     eo->phase = PHASE_FINISHED;
1169     send_client_done_and_destroy (eo);
1170     return;
1171   }
1172   GNUNET_break (0);
1173   fail_union_operation (eo);
1174 }
1175
1176
1177 /**
1178  * The handlers array, used for both evaluate and accept
1179  */
1180 static const struct GNUNET_MQ_Handler union_handlers[] = {
1181   {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
1182   {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
1183   {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
1184   {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
1185   {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
1186   GNUNET_MQ_HANDLERS_END
1187 };
1188
1189
1190 /**
1191  * Functions of this type will be called when a stream is established
1192  * 
1193  * @param cls the closure from GNUNET_STREAM_open
1194  * @param socket socket to use to communicate with the
1195  *        other side (read/write)
1196  */
1197 static void
1198 stream_open_cb (void *cls,
1199                 struct GNUNET_STREAM_Socket *socket)
1200 {
1201   struct UnionEvaluateOperation *eo = cls;
1202
1203   GNUNET_assert (NULL == eo->mq);
1204   GNUNET_assert (socket == eo->socket);
1205   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
1206               "open cb successful\n");
1207   eo->mq = GNUNET_STREAM_mq_create (eo->socket, union_handlers, NULL, eo);
1208   /* we started the operation, thus we have to send the operation request */
1209   send_operation_request (eo);
1210   eo->phase = PHASE_EXPECT_SE;
1211 }
1212
1213
1214 /**
1215  * Evaluate a union operation with
1216  * a remote peer.
1217  *
1218  * @param m the evaluate request message from the client
1219  * @parem set the set to evaluate the operation with
1220  */
1221 void
1222 _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
1223 {
1224   struct UnionEvaluateOperation *eo;
1225   struct GNUNET_MessageHeader *context_msg;
1226
1227   eo = GNUNET_new (struct UnionEvaluateOperation);
1228   eo->peer = m->target_peer;
1229   eo->set = set;
1230   eo->request_id = htonl (m->request_id);
1231   GNUNET_assert (0 != eo->request_id);
1232   eo->se = strata_estimator_dup (set->state.u->se);
1233   eo->salt = ntohs (m->salt);
1234   eo->app_id = m->app_id;
1235   
1236   context_msg = GNUNET_MQ_extract_nested_mh (m);
1237   if (NULL != context_msg)
1238   {
1239     eo->context_msg = GNUNET_copy_message (context_msg);
1240   }
1241
1242   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
1243               "evaluating union operation, (app %s)\n", 
1244               GNUNET_h2s (&eo->app_id));
1245
1246   eo->socket = 
1247       GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
1248                           &stream_open_cb, eo,
1249                           GNUNET_STREAM_OPTION_END);
1250   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1251                                eo->set->state.u->ops_tail,
1252                                eo);
1253   /* the stream open callback will kick off the operation */
1254 }
1255
1256
1257 /**
1258  * Accept an union operation request from a remote peer
1259  *
1260  * @param m the accept message from the client
1261  * @param set the set of the client
1262  * @param incoming information about the requesting remote peer
1263  */
1264 void
1265 _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
1266                    struct Incoming *incoming)
1267 {
1268   struct UnionEvaluateOperation *eo;
1269
1270   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1271
1272   eo = GNUNET_new (struct UnionEvaluateOperation);
1273   eo->generation_created = set->state.u->current_generation++;
1274   eo->set = set;
1275   eo->peer = incoming->peer;
1276   eo->salt = ntohs (incoming->salt);
1277   GNUNET_assert (0 != ntohl (m->request_id));
1278   eo->request_id = ntohl (m->request_id);
1279   eo->se = strata_estimator_dup (set->state.u->se);
1280   eo->mq = incoming->mq;
1281   /* transfer ownership of mq and socket from incoming to eo */
1282   incoming->mq = NULL;
1283   eo->socket = incoming->socket;
1284   incoming->socket = NULL;
1285   /* the peer's socket is now ours, we'll receive all messages */
1286   GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
1287
1288   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1289                                eo->set->state.u->ops_tail,
1290                                eo);
1291
1292   /* kick off the operation */
1293   send_strata_estimator (eo);
1294 }
1295
1296
1297 /**
1298  * Create a new set supporting the union operation
1299  *
1300  * @return the newly created set
1301  */
1302 struct Set *
1303 _GSS_union_set_create (void)
1304 {
1305   struct Set *set;
1306
1307   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
1308   
1309   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
1310   set->state.u = (struct UnionState *) &set[1];
1311   set->operation = GNUNET_SET_OPERATION_UNION;
1312   /* keys of the hash map are stored in the element entrys, thus we do not
1313    * want the hash map to copy them */
1314   set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1315   set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
1316                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1317   return set;
1318 }
1319
1320
1321 /**
1322  * Add the element from the given element message to the set.
1323  *
1324  * @param m message with the element
1325  * @param set set to add the element to
1326  */
1327 void
1328 _GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1329 {
1330   struct ElementEntry *ee;
1331   struct ElementEntry *ee_dup;
1332   uint16_t element_size;
1333
1334   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
1335
1336   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1337   element_size = ntohs (m->header.size) - sizeof *m;
1338   ee = GNUNET_malloc (element_size + sizeof *ee);
1339   ee->element.size = element_size;
1340   memcpy (&ee[1], &m[1], element_size);
1341   ee->element.data = &ee[1];
1342   ee->generation_added = set->state.u->current_generation;
1343   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1344   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1345   if (NULL != ee_dup)
1346   {
1347     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1348     GNUNET_free (ee);
1349     return;
1350   }
1351   GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
1352                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1353   strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
1354 }
1355
1356
1357 /**
1358  * Destroy a set that supports the union operation
1359  *
1360  * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
1361  */
1362 void
1363 _GSS_union_set_destroy (struct Set *set)
1364 {
1365   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1366   if (NULL != set->client)
1367   {
1368     GNUNET_SERVER_client_drop (set->client);
1369     set->client = NULL;
1370   }
1371   if (NULL != set->client_mq)
1372   {
1373     GNUNET_MQ_destroy (set->client_mq);
1374     set->client_mq = NULL;
1375   }
1376
1377   if (NULL != set->state.u->se)
1378   {
1379     strata_estimator_destroy (set->state.u->se);
1380     set->state.u->se = NULL;
1381   }
1382
1383   destroy_elements (set->state.u);
1384
1385   while (NULL != set->state.u->ops_head)
1386   {
1387     destroy_union_operation (set->state.u->ops_head);
1388   }
1389 }
1390
1391 /**
1392  * Remove the element given in the element message from the set.
1393  * Only marks the element as removed, so that older set operations can still exchange it.
1394  *
1395  * @param m message with the element
1396  * @param set set to remove the element from
1397  */
1398 void
1399 _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1400 {
1401   struct GNUNET_HashCode hash;
1402   struct ElementEntry *ee;
1403
1404   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1405   GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1406   ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1407   if (NULL == ee)
1408   {
1409     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1410     return;
1411   }
1412   if (GNUNET_YES == ee->removed)
1413   {
1414     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1415     return;
1416   }
1417   ee->removed = GNUNET_YES;
1418   ee->generation_removed = set->state.u->current_generation;
1419 }
1420