- distribute peers equally among island nodes on SuperMUC
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct UnionEvaluateOperation
104 {
105   /**
106    * Local set the operation is evaluated on.
107    */
108   struct Set *set;
109
110   /**
111    * Peer with the remote set
112    */
113   struct GNUNET_PeerIdentity peer;
114
115   /**
116    * Application-specific identifier
117    */
118   struct GNUNET_HashCode app_id;
119
120   /**
121    * Context message, given to us
122    * by the client, may be NULL.
123    */
124   struct GNUNET_MessageHeader *context_msg;
125
126   /**
127    * Stream socket connected to the other peer
128    */
129   struct GNUNET_STREAM_Socket *socket;
130
131   /**
132    * Message queue for the peer on the other
133    * end
134    */
135   struct GNUNET_MQ_MessageQueue *mq;
136
137   /**
138    * Type of this operation
139    */
140   enum GNUNET_SET_OperationType operation;
141
142   /**
143    * Request ID to multiplex set operations to
144    * the client inhabiting the set.
145    */
146   uint32_t request_id;
147
148   /**
149    * Number of ibf buckets received
150    */
151   unsigned int ibf_buckets_received;
152
153   /**
154    * Copy of the set's strata estimator at the time of
155    * creation of this operation
156    */
157   struct StrataEstimator *se;
158
159   /**
160    * The ibf we currently receive
161    */
162   struct InvertibleBloomFilter *remote_ibf;
163
164   /**
165    * IBF of the set's element.
166    */
167   struct InvertibleBloomFilter *local_ibf;
168
169   /**
170    * Maps IBF-Keys (specific to the current salt) to elements.
171    */
172   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
173
174   /**
175    * Current state of the operation.
176    */
177   enum UnionOperationPhase phase;
178
179   /**
180    * Salt to use for this operation.
181    */
182   uint16_t salt;
183
184   /**
185    * Generation in which the operation handle
186    * was created.
187    */
188   unsigned int generation_created;
189   
190   /**
191    * Evaluate operations are held in
192    * a linked list.
193    */
194   struct UnionEvaluateOperation *next;
195   
196    /**
197    * Evaluate operations are held in
198    * a linked list.
199    */
200   struct UnionEvaluateOperation *prev;
201 };
202
203
204 /**
205  * Information about the element in a set.
206  * All elements are stored in a hash-table
207  * from their hash-code to their 'struct Element',
208  * so that the remove and add operations are reasonably
209  * fast.
210  */
211 struct ElementEntry
212 {
213   /**
214    * The actual element. The data for the element
215    * should be allocated at the end of this struct.
216    */
217   struct GNUNET_SET_Element element;
218
219   /**
220    * Hash of the element.
221    * Will be used to derive the different IBF keys
222    * for different salts.
223    */
224   struct GNUNET_HashCode element_hash;
225
226   /**
227    * Generation the element was added by the client.
228    * Operations of earlier generations will not consider the element.
229    */
230   unsigned int generation_added;
231
232   /**
233    * GNUNET_YES if the element has been removed in some generation.
234    */
235   int removed;
236
237   /**
238    * Generation the element was removed by the client. 
239    * Operations of later generations will not consider the element.
240    * Only valid if is_removed is GNUNET_YES.
241    */
242   unsigned int generation_removed;
243
244   /**
245    * GNUNET_YES if the element is a remote element, and does not belong
246    * to the operation's set.
247    */
248   int remote;
249 };
250
251
252 /**
253  * Information about the element used for 
254  * a specific union operation.
255  */
256 struct KeyEntry
257 {
258   /**
259    * IBF key for the entry, derived from the current salt.
260    */
261   struct IBF_Key ibf_key;
262
263   /**
264    * The actual element associated with the key
265    */
266   struct ElementEntry *element;
267
268   /**
269    * Element that collides with this element
270    * on the ibf key
271    */
272   struct KeyEntry *next_colliding;
273 };
274
275 /**
276  * Used as a closure for sending elements
277  * with a specific IBF key.
278  */
279 struct SendElementClosure
280 {
281   /**
282    * The IBF key whose matching elements should be
283    * sent.
284    */
285   struct IBF_Key ibf_key;
286
287   /**
288    * Operation for which the elements
289    * should be sent.
290    */
291   struct UnionEvaluateOperation *eo;
292 };
293
294
295 /**
296  * Extra state required for efficient set union.
297  */
298 struct UnionState
299 {
300   /**
301    * The strata estimator is only generated once for
302    * each set.
303    * The IBF keys are derived from the element hashes with
304    * salt=0.
305    */
306   struct StrataEstimator *se;
307
308   /**
309    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
310    */
311   struct GNUNET_CONTAINER_MultiHashMap *elements;
312
313   /**
314    * Evaluate operations are held in
315    * a linked list.
316    */
317   struct UnionEvaluateOperation *ops_head;
318
319   /**
320    * Evaluate operations are held in
321    * a linked list.
322    */
323   struct UnionEvaluateOperation *ops_tail;
324
325   /**
326    * Current generation, that is, number of
327    * previously executed operations on this set
328    */
329   unsigned int current_generation;
330 };
331
332
333 /**
334  * Destroy a union operation, and free all resources
335  * associated with it.
336  *
337  * @param eo the union operation to destroy
338  */
339 static void
340 destroy_union_operation (struct UnionEvaluateOperation *eo)
341 {
342   GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
343                                eo->set->state.u->ops_tail,
344                                eo);
345   GNUNET_free (eo);
346   /* FIXME: free and destroy everything else */
347 }
348
349
350 /**
351  * Inform the client that the union operation has failed,
352  * and proceed to destroy the evaluate operation.
353  *
354  * @param eo the union operation to fail
355  */
356 static void
357 fail_union_operation (struct UnionEvaluateOperation *eo)
358 {
359   struct GNUNET_MQ_Message *mqm;
360   struct ResultMessage *msg;
361
362   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
363   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
364   msg->request_id = eo->request_id;
365   GNUNET_MQ_send (eo->set->client_mq, mqm);
366   destroy_union_operation (eo);
367 }
368
369
370 /**
371  * Derive the IBF key from a hash code and 
372  * a salt.
373  *
374  * @param src the hash code
375  * @param salt salt to use
376  * @return the derived IBF key
377  */
378 static struct IBF_Key
379 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
380 {
381   struct IBF_Key key;
382
383   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
384                       GCRY_MD_SHA512, GCRY_MD_SHA256,
385                       src, sizeof *src,
386                       &salt, sizeof (salt),
387                       NULL, 0);
388   return key;
389 }
390
391
392 /**
393  * Send a request for the evaluate operation to a remote peer
394  *
395  * @param eo operation with the other peer
396  */
397 static void
398 send_operation_request (struct UnionEvaluateOperation *eo)
399 {
400   struct GNUNET_MQ_Message *mqm;
401   struct OperationRequestMessage *msg;
402
403   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
404   if (NULL != eo->context_msg)
405     if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
406     {
407       /* the context message is too large */
408       _GSS_client_disconnect (eo->set->client);
409       GNUNET_MQ_discard (mqm);
410       GNUNET_break (0);
411       return;
412     }
413   msg->operation = eo->operation;
414   msg->app_id = eo->app_id;
415   GNUNET_MQ_send (eo->mq, mqm);
416
417   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
418 }
419
420
421 /**
422  * Iterator to create the mapping between ibf keys
423  * and element entries.
424  *
425  * @param cls closure
426  * @param key current key code
427  * @param value value in the hash map
428  * @return GNUNET_YES if we should continue to
429  *         iterate,
430  *         GNUNET_NO if not.
431  */
432 static int
433 insert_element_iterator (void *cls,
434                          uint32_t key,
435                          void *value)
436 {
437   struct KeyEntry *const new_k = cls;
438   struct KeyEntry *old_k = value;
439
440   GNUNET_assert (NULL != old_k);
441   do
442   {
443     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
444     {
445       new_k->next_colliding = old_k;
446       old_k->next_colliding = new_k;
447       return GNUNET_NO;
448     }
449     old_k = old_k->next_colliding;
450   } while (NULL != old_k);
451   return GNUNET_YES;
452 }
453
454
455 /**
456  * Insert an element into the union operation's
457  * key-to-element mapping
458  *
459  * @param the union operation
460  * @param ee the element entry
461  */
462 static void
463 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
464 {
465   int ret;
466   struct IBF_Key ibf_key;
467   struct KeyEntry *k;
468
469   ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
470   k = GNUNET_new (struct KeyEntry);
471   k->element = ee;
472   k->ibf_key = ibf_key;
473   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
474                                                       (uint32_t) ibf_key.key_val,
475                                                       insert_element_iterator, k);
476   /* was the element inserted into a colliding bucket? */
477   if (GNUNET_SYSERR == ret)
478   {
479     GNUNET_assert (NULL != k->next_colliding);
480     return;
481   }
482   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
483                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
484 }
485
486
487 /**
488  * Insert a key into an ibf.
489  *
490  * @param cls the ibf
491  * @param key unused
492  * @param value the key entry to get the key from
493  */
494 static int
495 prepare_ibf_iterator (void *cls,
496                       uint32_t key,
497                       void *value)
498 {
499   struct InvertibleBloomFilter *ibf = cls;
500   struct KeyEntry *ke = value;
501
502   ibf_insert (ibf, ke->ibf_key);
503   return GNUNET_YES;
504 }
505
506
507 /**
508  * Iterator for initializing the
509  * key-to-element mapping of a union operation
510  *
511  * @param cls the union operation
512  * @param key unised
513  * @param value the element entry to insert
514  *        into the key-to-element mapping
515  */
516 static int
517 init_key_to_element_iterator (void *cls,
518                               const struct GNUNET_HashCode *key,
519                               void *value)
520 {
521   struct UnionEvaluateOperation *eo = cls;
522   struct ElementEntry *e = value;
523
524   /* make sure that the element belongs to the set at the time
525    * of creating the operation */
526   if ( (e->generation_added > eo->generation_created) ||
527        ( (GNUNET_YES == e->removed) &&
528          (e->generation_removed < eo->generation_created)))
529     return GNUNET_YES;
530
531   insert_element (eo, e);
532   return GNUNET_YES;
533 }
534
535
536 /**
537  * Create an ibf with the operation's elements
538  * of the specified size
539  *
540  * @param eo the union operation
541  * @param size size of the ibf to create
542  */
543 static void
544 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
545 {
546   if (NULL == eo->key_to_element)
547   {
548     unsigned int len;
549     len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
550     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len);
551     GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
552                                              init_key_to_element_iterator, eo);
553   }
554   if (NULL != eo->local_ibf)
555     ibf_destroy (eo->local_ibf);
556   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
557   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
558                                            prepare_ibf_iterator, eo->local_ibf);
559 }
560
561
562 /**
563  * Send an ibf of appropriate size.
564  *
565  * @param eo the union operation
566  * @param ibf_order order of the ibf to send, size=2^order
567  */
568 static void
569 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
570 {
571   unsigned int buckets_sent = 0;
572   struct InvertibleBloomFilter *ibf;
573
574   prepare_ibf (eo, 1<<ibf_order);
575
576   ibf = eo->local_ibf;
577
578   while (buckets_sent < (1 << ibf_order))
579   {
580     unsigned int buckets_in_message;
581     struct GNUNET_MQ_Message *mqm;
582     struct IBFMessage *msg;
583
584     buckets_in_message = (1 << ibf_order) - buckets_sent;
585     /* limit to maximum */
586     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
587       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
588
589     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
590                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
591     msg->order = htons (ibf_order);
592     msg->offset = htons (buckets_sent);
593     ibf_write_slice (ibf, buckets_sent,
594                      buckets_in_message, &msg[1]);
595     buckets_sent += buckets_in_message;
596     GNUNET_MQ_send (eo->mq, mqm);
597   }
598
599   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
600 }
601
602
603 /**
604  * Send a strata estimator to the remote peer.
605  *
606  * @param eo the union operation with the remote peer
607  */
608 static void
609 send_strata_estimator (struct UnionEvaluateOperation *eo)
610 {
611   struct GNUNET_MQ_Message *mqm;
612   struct GNUNET_MessageHeader *strata_msg;
613
614   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
615                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
616                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
617   strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
618   GNUNET_MQ_send (eo->mq, mqm);
619   eo->phase = PHASE_EXPECT_IBF;
620 }
621
622
623 /**
624  * Compute the necessary order of an ibf
625  * from the size of the symmetric set difference.
626  *
627  * @param diff the difference
628  * @return the required size of the ibf
629  */
630 static unsigned int
631 get_order_from_difference (unsigned int diff)
632 {
633   unsigned int ibf_order;
634
635   ibf_order = 2;
636   while ((1<<ibf_order) < (2 * diff))
637     ibf_order++;
638   if (ibf_order > MAX_IBF_ORDER)
639     ibf_order = MAX_IBF_ORDER;
640   return ibf_order;
641 }
642
643
644 /**
645  * Handle a strata estimator from a remote peer
646  *
647  * @param the union operation
648  * @param mh the message
649  */
650 static void
651 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
652 {
653   struct UnionEvaluateOperation *eo = cls;
654   struct StrataEstimator *remote_se;
655   int diff;
656
657   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n");
658
659   if (eo->phase != PHASE_EXPECT_SE)
660   {
661     fail_union_operation (eo);
662     GNUNET_break (0);
663     return;
664   }
665   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
666                                        SE_IBF_HASH_NUM);
667   strata_estimator_read (&mh[1], remote_se);
668   GNUNET_assert (NULL != eo->se);
669   diff = strata_estimator_difference (remote_se, eo->se);
670   strata_estimator_destroy (remote_se);
671   strata_estimator_destroy (eo->se);
672   eo->se = NULL;
673   send_ibf (eo, get_order_from_difference (diff));
674 }
675
676
677
678 /**
679  * Iterator to send elements to a remote peer
680  *
681  * @param cls closure with the element key and the union operation
682  * @param key ignored
683  * @param value the key entry
684  */
685 static int
686 send_element_iterator (void *cls,
687                       uint32_t key,
688                       void *value)
689 {
690   struct SendElementClosure *sec = cls;
691   struct IBF_Key ibf_key = sec->ibf_key;
692   struct UnionEvaluateOperation *eo = sec->eo;
693   struct KeyEntry *ke = value;
694
695   if (ke->ibf_key.key_val != ibf_key.key_val)
696     return GNUNET_YES;
697   while (NULL != ke)
698   {
699     const struct GNUNET_SET_Element *const element = &ke->element->element;
700     struct GNUNET_MQ_Message *mqm;
701
702     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
703     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
704     if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
705     {
706       GNUNET_break (0);
707       GNUNET_MQ_discard (mqm);
708       continue;
709     }
710     GNUNET_MQ_send (eo->mq, mqm);
711   }
712   return GNUNET_NO;
713 }
714
715 /**
716  * Send all elements that have the specified IBF key
717  * to the remote peer of the union operation
718  *
719  * @param eo union operation
720  * @param ibf_key IBF key of interest
721  */
722 static void
723 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
724 {
725   struct SendElementClosure send_cls;
726
727   send_cls.ibf_key = ibf_key;
728   send_cls.eo = eo;
729   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
730                                                 send_element_iterator, &send_cls);
731 }
732
733
734
735 /**
736  * Decode which elements are missing on each side, and
737  * send the appropriate elemens and requests
738  *
739  * @param eo union operation
740  */
741 static void
742 decode_and_send (struct UnionEvaluateOperation *eo)
743 {
744   struct IBF_Key key;
745   int side;
746   struct InvertibleBloomFilter *diff_ibf;
747
748   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
749
750   prepare_ibf (eo, eo->remote_ibf->size);
751   diff_ibf = ibf_dup (eo->local_ibf);
752   ibf_subtract (diff_ibf, eo->remote_ibf);
753
754   while (1)
755   {
756     int res;
757
758     res = ibf_decode (diff_ibf, &side, &key);
759     if (GNUNET_SYSERR == res)
760     {
761       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n",
762                   diff_ibf->size * 2);
763       send_ibf (eo, diff_ibf->size * 2);
764       ibf_destroy (diff_ibf);
765       return;
766     }
767     if (GNUNET_NO == res)
768     {
769       struct GNUNET_MQ_Message *mqm;
770
771       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
772       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
773       GNUNET_MQ_send (eo->mq, mqm);
774       return;
775     }
776     if (1 == side)
777     {
778       send_elements_for_key (eo, key);
779     }
780     else
781     {
782       struct GNUNET_MQ_Message *mqm;
783       struct GNUNET_MessageHeader *msg;
784
785       /* FIXME: before sending the request, check if we may just have the element */
786       /* FIXME: merge multiple requests */
787       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
788                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
789       *(struct IBF_Key *) &msg[1] = key;
790       GNUNET_MQ_send (eo->mq, mqm);
791     }
792   }
793 }
794
795
796 /**
797  * Handle an IBF message from a remote peer.
798  *
799  * @param cls the union operation
800  * @param mh the header of the message
801  */
802 static void
803 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
804 {
805   struct UnionEvaluateOperation *eo = cls;
806   struct IBFMessage *msg = (struct IBFMessage *) mh;
807   unsigned int buckets_in_message;
808
809   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
810        (eo->phase == PHASE_EXPECT_IBF) )
811   {
812     eo->phase = PHASE_EXPECT_IBF_CONT;
813     GNUNET_assert (NULL == eo->remote_ibf);
814     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
815     if (0 != ntohs (msg->offset))
816     {
817       GNUNET_break (0);
818       fail_union_operation (eo);
819     }
820   }
821   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
822   {
823     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
824          (1<<msg->order != eo->remote_ibf->size) )
825     {
826       GNUNET_break (0);
827       fail_union_operation (eo);
828     }
829   }
830
831   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
832
833   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
834   {
835     GNUNET_break (0);
836     fail_union_operation (eo);
837   }
838
839   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
840   eo->ibf_buckets_received += buckets_in_message;
841
842   if (eo->ibf_buckets_received == eo->remote_ibf->size)
843   {
844     eo->phase = PHASE_EXPECT_ELEMENTS;
845     decode_and_send (eo);
846   }
847 }
848
849
850 /**
851  * Send an element to the client of the operations's set.
852  *
853  * @param eo union operation
854  * @param element element to send
855  */
856 static void
857 send_client_element (struct UnionEvaluateOperation *eo,
858                      struct GNUNET_SET_Element *element)
859 {
860   struct GNUNET_MQ_Message *mqm;
861   struct ResultMessage *rm;
862
863   GNUNET_assert (0 != eo->request_id);
864   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
865   if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
866   {
867     GNUNET_MQ_discard (mqm);
868     GNUNET_break (0);
869     return;
870   }
871
872   GNUNET_MQ_send (eo->mq, mqm);
873 }
874
875
876 /**
877  * Handle an element message from a remote peer.
878  *
879  * @param cls the union operation
880  * @param mh the message
881  */
882 static void
883 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
884 {
885   struct UnionEvaluateOperation *eo = cls;
886   struct ElementEntry *ee;
887   uint16_t element_size;
888
889   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
890        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
891   {
892     fail_union_operation (eo);
893     GNUNET_break (0);
894     return;
895   }
896   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
897   ee = GNUNET_malloc (sizeof *eo + element_size);
898   ee->element.data = &ee[1];
899   memcpy (ee->element.data, &mh[1], element_size);
900   ee->remote = GNUNET_YES;
901
902   insert_element (eo, ee);
903   send_client_element (eo, &ee->element);
904 }
905
906
907 /**
908  * Handle an element request from a remote peer.
909  *
910  * @param cls the union operation
911  * @param mh the message
912  */
913 static void
914 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
915 {
916   struct UnionEvaluateOperation *eo = cls;
917   struct IBF_Key *ibf_key;
918   unsigned int num_keys;
919
920   /* look up elements and send them */
921   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
922   {
923     fail_union_operation (eo);
924     GNUNET_break (0);
925     return;
926   }
927
928   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
929
930   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
931   {
932     fail_union_operation (eo);
933     GNUNET_break (0);
934     return;
935   }
936
937   ibf_key = (struct IBF_Key *) &mh[1];
938   while (0 != num_keys--)
939   {
940     send_elements_for_key (eo, *ibf_key);
941     ibf_key++;
942   }
943 }
944
945
946 /**
947  * Handle a done message from a remote peer
948  * 
949  * @param cls the union operation
950  * @param mh the message
951  */
952 static void
953 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
954 {
955   struct UnionEvaluateOperation *eo = cls;
956
957   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
958   {
959     /* we got all requests, but still have to send our elements as response */
960     struct GNUNET_MQ_Message *mqm;
961
962     eo->phase = PHASE_FINISHED;
963     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
964     GNUNET_MQ_send (eo->mq, mqm);
965     return;
966   }
967   if (eo->phase == PHASE_EXPECT_ELEMENTS)
968   {
969     /* it's all over! */
970     eo->phase = PHASE_FINISHED;
971     return;
972   }
973   GNUNET_break (0);
974   fail_union_operation (eo);
975 }
976
977
978 /**
979  * The handlers array, used for both evaluate and accept
980  */
981 static const struct GNUNET_MQ_Handler union_handlers[] = {
982   {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
983   {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
984   {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
985   {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
986   {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
987   GNUNET_MQ_HANDLERS_END
988 };
989
990
991 /**
992  * Functions of this type will be called when a stream is established
993  * 
994  * @param cls the closure from GNUNET_STREAM_open
995  * @param socket socket to use to communicate with the
996  *        other side (read/write)
997  */
998 static void
999 stream_open_cb (void *cls,
1000                 struct GNUNET_STREAM_Socket *socket)
1001 {
1002   struct UnionEvaluateOperation *eo = cls;
1003
1004   GNUNET_assert (NULL == eo->mq);
1005   GNUNET_assert (socket == eo->socket);
1006
1007   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n");
1008
1009   eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket,
1010                                               union_handlers, eo);
1011   /* we started the operation, thus we have to send the operation request */
1012   send_operation_request (eo);
1013   eo->phase = PHASE_EXPECT_SE;
1014 }
1015
1016
1017 /**
1018  * Evaluate a union operation with
1019  * a remote peer.
1020  *
1021  * @param m the evaluate request message from the client
1022  * @parem set the set to evaluate the operation with
1023  */
1024 void
1025 _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
1026 {
1027   struct UnionEvaluateOperation *eo;
1028
1029   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n");
1030
1031   eo = GNUNET_new (struct UnionEvaluateOperation);
1032   eo->peer = m->peer;
1033   eo->set = set;
1034   eo->request_id = htons(m->request_id);
1035   eo->se = strata_estimator_dup (set->state.u->se);
1036   eo->salt = ntohs (m->salt);
1037   eo->app_id = m->app_id;
1038   eo->socket = 
1039       GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
1040                           stream_open_cb, eo,
1041                           GNUNET_STREAM_OPTION_END);
1042   /* the stream open callback will kick off the operation */
1043 }
1044
1045
1046 /**
1047  * Accept an union operation request from a remote peer
1048  *
1049  * @param m the accept message from the client
1050  * @param set the set of the client
1051  * @param incoming information about the requesting remote peer
1052  */
1053 void
1054 _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
1055                    struct Incoming *incoming)
1056 {
1057   struct UnionEvaluateOperation *eo;
1058
1059   eo = GNUNET_new (struct UnionEvaluateOperation);
1060   eo->generation_created = set->state.u->current_generation++;
1061   eo->set = set;
1062   eo->peer = incoming->peer;
1063   eo->salt = ntohs (incoming->salt);
1064   eo->request_id = m->request_id;
1065   eo->se = strata_estimator_dup (set->state.u->se);
1066   eo->set = set;
1067   eo->mq = incoming->mq;
1068   /* the peer's socket is now ours, we'll receive all messages */
1069   GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
1070   /* kick of the operation */
1071   send_strata_estimator (eo);
1072 }
1073
1074
1075 /**
1076  * Create a new set supporting the union operation
1077  *
1078  * @return the newly created set
1079  */
1080 struct Set *
1081 _GSS_union_set_create (void)
1082 {
1083   struct Set *set;
1084
1085   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n");
1086   
1087   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
1088   set->state.u = (struct UnionState *) &set[1];
1089   set->operation = GNUNET_SET_OPERATION_UNION;
1090   /* keys of the hash map are stored in the element entrys, thus we do not
1091    * want the hash map to copy them */
1092   set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1093   set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
1094                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1095   return set;
1096 }
1097
1098
1099 /**
1100  * Add the element from the given element message to the set.
1101  *
1102  * @param m message with the element
1103  * @param set set to add the element to
1104  */
1105 void
1106 _GSS_union_add (struct ElementMessage *m, struct Set *set)
1107 {
1108   struct ElementEntry *ee;
1109   struct ElementEntry *ee_dup;
1110   uint16_t element_size;
1111
1112   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1113   element_size = ntohs (m->header.size) - sizeof *m;
1114   ee = GNUNET_malloc (element_size + sizeof *ee);
1115   ee->element.size = element_size;
1116   ee->element.data = &ee[1];
1117   ee->generation_added = set->state.u->current_generation;
1118   memcpy (ee->element.data, &m[1], element_size);
1119   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1120   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1121   if (NULL != ee_dup)
1122   {
1123     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1124     GNUNET_free (ee);
1125     return;
1126   }
1127   GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
1128                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1129   strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
1130 }
1131
1132
1133 /**
1134  * Remove the element given in the element message from the set.
1135  * Only marks the element as removed, so that older set operations can still exchange it.
1136  *
1137  * @param m message with the element
1138  * @param set set to remove the element from
1139  */
1140 void
1141 _GSS_union_remove (struct ElementMessage *m, struct Set *set)
1142 {
1143   struct GNUNET_HashCode hash;
1144   struct ElementEntry *ee;
1145
1146   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1147   GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1148   ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1149   if (NULL == ee)
1150   {
1151     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1152     return;
1153   }
1154   if (GNUNET_YES == ee->removed)
1155   {
1156     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1157     return;
1158   }
1159   ee->removed = GNUNET_YES;
1160   ee->generation_removed = set->state.u->current_generation;
1161 }
1162