fix / documentation
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct UnionEvaluateOperation
104 {
105   /**
106    * Local set the operation is evaluated on.
107    */
108   struct Set *set;
109
110   /**
111    * Peer with the remote set
112    */
113   struct GNUNET_PeerIdentity peer;
114
115   /**
116    * Application-specific identifier
117    */
118   struct GNUNET_HashCode app_id;
119
120   /**
121    * Context message, given to us
122    * by the client, may be NULL.
123    */
124   struct GNUNET_MessageHeader *context_msg;
125
126   /**
127    * Stream socket connected to the other peer
128    */
129   struct GNUNET_STREAM_Socket *socket;
130
131   /**
132    * Message queue for the peer on the other
133    * end
134    */
135   struct GNUNET_MQ_MessageQueue *mq;
136
137   /**
138    * Type of this operation
139    */
140   enum GNUNET_SET_OperationType operation;
141
142   /**
143    * Request ID to multiplex set operations to
144    * the client inhabiting the set.
145    */
146   uint32_t request_id;
147
148   /**
149    * Number of ibf buckets received
150    */
151   unsigned int ibf_buckets_received;
152
153   /**
154    * Copy of the set's strata estimator at the time of
155    * creation of this operation
156    */
157   struct StrataEstimator *se;
158
159   /**
160    * The ibf we currently receive
161    */
162   struct InvertibleBloomFilter *remote_ibf;
163
164   /**
165    * IBF of the set's element.
166    */
167   struct InvertibleBloomFilter *local_ibf;
168
169   /**
170    * Maps IBF-Keys (specific to the current salt) to elements.
171    */
172   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
173
174   /**
175    * Current state of the operation.
176    */
177   enum UnionOperationPhase phase;
178
179   /**
180    * Salt to use for this operation.
181    */
182   uint16_t salt;
183
184   /**
185    * Generation in which the operation handle
186    * was created.
187    */
188   unsigned int generation_created;
189   
190   /**
191    * Evaluate operations are held in
192    * a linked list.
193    */
194   struct UnionEvaluateOperation *next;
195   
196    /**
197    * Evaluate operations are held in
198    * a linked list.
199    */
200   struct UnionEvaluateOperation *prev;
201 };
202
203
204 /**
205  * Information about the element in a set.
206  * All elements are stored in a hash-table
207  * from their hash-code to their 'struct Element',
208  * so that the remove and add operations are reasonably
209  * fast.
210  */
211 struct ElementEntry
212 {
213   /**
214    * The actual element. The data for the element
215    * should be allocated at the end of this struct.
216    */
217   struct GNUNET_SET_Element element;
218
219   /**
220    * Hash of the element.
221    * Will be used to derive the different IBF keys
222    * for different salts.
223    */
224   struct GNUNET_HashCode element_hash;
225
226   /**
227    * Generation the element was added by the client.
228    * Operations of earlier generations will not consider the element.
229    */
230   unsigned int generation_added;
231
232   /**
233    * GNUNET_YES if the element has been removed in some generation.
234    */
235   int removed;
236
237   /**
238    * Generation the element was removed by the client. 
239    * Operations of later generations will not consider the element.
240    * Only valid if is_removed is GNUNET_YES.
241    */
242   unsigned int generation_removed;
243
244   /**
245    * GNUNET_YES if the element is a remote element, and does not belong
246    * to the operation's set.
247    */
248   int remote;
249 };
250
251
252 /**
253  * Information about the element used for 
254  * a specific union operation.
255  */
256 struct KeyEntry
257 {
258   /**
259    * IBF key for the entry, derived from the current salt.
260    */
261   struct IBF_Key ibf_key;
262
263   /**
264    * The actual element associated with the key
265    */
266   struct ElementEntry *element;
267
268   /**
269    * Element that collides with this element
270    * on the ibf key
271    */
272   struct KeyEntry *next_colliding;
273 };
274
275 /**
276  * Used as a closure for sending elements
277  * with a specific IBF key.
278  */
279 struct SendElementClosure
280 {
281   /**
282    * The IBF key whose matching elements should be
283    * sent.
284    */
285   struct IBF_Key ibf_key;
286
287   /**
288    * Operation for which the elements
289    * should be sent.
290    */
291   struct UnionEvaluateOperation *eo;
292 };
293
294
295 /**
296  * Extra state required for efficient set union.
297  */
298 struct UnionState
299 {
300   /**
301    * The strata estimator is only generated once for
302    * each set.
303    * The IBF keys are derived from the element hashes with
304    * salt=0.
305    */
306   struct StrataEstimator *se;
307
308   /**
309    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
310    */
311   struct GNUNET_CONTAINER_MultiHashMap *elements;
312
313   /**
314    * Evaluate operations are held in
315    * a linked list.
316    */
317   struct UnionEvaluateOperation *ops_head;
318
319   /**
320    * Evaluate operations are held in
321    * a linked list.
322    */
323   struct UnionEvaluateOperation *ops_tail;
324
325   /**
326    * Current generation, that is, number of
327    * previously executed operations on this set
328    */
329   unsigned int current_generation;
330 };
331
332
333
334
335 /**
336  * Destroy a union operation, and free all resources
337  * associated with it.
338  *
339  * @param eo the union operation to destroy
340  */
341 static void
342 destroy_union_operation (struct UnionEvaluateOperation *eo)
343 {
344   GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
345                                eo->set->state.u->ops_tail,
346                                eo);
347   GNUNET_free (eo);
348   /* FIXME: free and destroy everything else */
349 }
350
351
352 /**
353  * Inform the client that the union operation has failed,
354  * and proceed to destroy the evaluate operation.
355  *
356  * @param eo the union operation to fail
357  */
358 static void
359 fail_union_operation (struct UnionEvaluateOperation *eo)
360 {
361   struct GNUNET_MQ_Message *mqm;
362   struct ResultMessage *msg;
363
364   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
365   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
366   msg->request_id = eo->request_id;
367   GNUNET_MQ_send (eo->set->client_mq, mqm);
368   destroy_union_operation (eo);
369 }
370
371
372 /**
373  * Derive the IBF key from a hash code and 
374  * a salt.
375  *
376  * @param src the hash code
377  * @param salt salt to use
378  * @return the derived IBF key
379  */
380 static struct IBF_Key
381 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
382 {
383   struct IBF_Key key;
384
385   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
386                       GCRY_MD_SHA512, GCRY_MD_SHA256,
387                       src, sizeof *src,
388                       &salt, sizeof (salt),
389                       NULL, 0);
390   return key;
391 }
392
393
394 /**
395  * Send a request for the evaluate operation to a remote peer
396  *
397  * @param eo operation with the other peer
398  */
399 static void
400 send_operation_request (struct UnionEvaluateOperation *eo)
401 {
402   struct GNUNET_MQ_Message *mqm;
403   struct OperationRequestMessage *msg;
404
405   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
406   if (NULL != eo->context_msg)
407     if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
408     {
409       /* the context message is too large */
410       _GSS_client_disconnect (eo->set->client);
411       GNUNET_MQ_discard (mqm);
412       GNUNET_break (0);
413       return;
414     }
415   msg->operation = eo->operation;
416   msg->app_id = eo->app_id;
417   GNUNET_MQ_send (eo->mq, mqm);
418
419   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
420 }
421
422
423 /**
424  * Iterator to create the mapping between ibf keys
425  * and element entries.
426  *
427  * @param cls closure
428  * @param key current key code
429  * @param value value in the hash map
430  * @return GNUNET_YES if we should continue to
431  *         iterate,
432  *         GNUNET_NO if not.
433  */
434 static int
435 insert_element_iterator (void *cls,
436                          uint32_t key,
437                          void *value)
438 {
439   struct KeyEntry *const new_k = cls;
440   struct KeyEntry *old_k = value;
441
442   GNUNET_assert (NULL != old_k);
443   do
444   {
445     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
446     {
447       new_k->next_colliding = old_k;
448       old_k->next_colliding = new_k;
449       return GNUNET_NO;
450     }
451     old_k = old_k->next_colliding;
452   } while (NULL != old_k);
453   return GNUNET_YES;
454 }
455
456
457 /**
458  * Insert an element into the union operation's
459  * key-to-element mapping
460  *
461  * @param the union operation
462  * @param ee the element entry
463  */
464 static void
465 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
466 {
467   int ret;
468   struct IBF_Key ibf_key;
469   struct KeyEntry *k;
470
471   ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
472   k = GNUNET_new (struct KeyEntry);
473   k->element = ee;
474   k->ibf_key = ibf_key;
475   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
476                                                       (uint32_t) ibf_key.key_val,
477                                                       insert_element_iterator, k);
478   /* was the element inserted into a colliding bucket? */
479   if (GNUNET_SYSERR == ret)
480   {
481     GNUNET_assert (NULL != k->next_colliding);
482     return;
483   }
484   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
485                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
486 }
487
488
489 /**
490  * Insert a key into an ibf.
491  *
492  * @param cls the ibf
493  * @param key unused
494  * @param value the key entry to get the key from
495  */
496 static int
497 prepare_ibf_iterator (void *cls,
498                       uint32_t key,
499                       void *value)
500 {
501   struct InvertibleBloomFilter *ibf = cls;
502   struct KeyEntry *ke = value;
503
504   ibf_insert (ibf, ke->ibf_key);
505   return GNUNET_YES;
506 }
507
508
509 /**
510  * Iterator for initializing the
511  * key-to-element mapping of a union operation
512  *
513  * @param cls the union operation
514  * @param key unised
515  * @param value the element entry to insert
516  *        into the key-to-element mapping
517  */
518 static int
519 init_key_to_element_iterator (void *cls,
520                               const struct GNUNET_HashCode *key,
521                               void *value)
522 {
523   struct UnionEvaluateOperation *eo = cls;
524   struct ElementEntry *e = value;
525
526   /* make sure that the element belongs to the set at the time
527    * of creating the operation */
528   if ( (e->generation_added > eo->generation_created) ||
529        ( (GNUNET_YES == e->removed) &&
530          (e->generation_removed < eo->generation_created)))
531     return GNUNET_YES;
532
533   insert_element (eo, e);
534   return GNUNET_YES;
535 }
536
537
538 /**
539  * Create an ibf with the operation's elements
540  * of the specified size
541  *
542  * @param eo the union operation
543  * @param size size of the ibf to create
544  */
545 static void
546 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
547 {
548   if (NULL == eo->key_to_element)
549   {
550     unsigned int len;
551     len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
552     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len);
553     GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
554                                              init_key_to_element_iterator, eo);
555   }
556   if (NULL != eo->local_ibf)
557     ibf_destroy (eo->local_ibf);
558   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
559   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
560                                            prepare_ibf_iterator, eo->local_ibf);
561 }
562
563
564 /**
565  * Send an ibf of appropriate size.
566  *
567  * @param eo the union operation
568  * @param ibf_order order of the ibf to send, size=2^order
569  */
570 static void
571 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
572 {
573   unsigned int buckets_sent = 0;
574   struct InvertibleBloomFilter *ibf;
575
576   prepare_ibf (eo, 1<<ibf_order);
577
578   ibf = eo->local_ibf;
579
580   while (buckets_sent < (1 << ibf_order))
581   {
582     unsigned int buckets_in_message;
583     struct GNUNET_MQ_Message *mqm;
584     struct IBFMessage *msg;
585
586     buckets_in_message = (1 << ibf_order) - buckets_sent;
587     /* limit to maximum */
588     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
589       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
590
591     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
592                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
593     msg->order = htons (ibf_order);
594     msg->offset = htons (buckets_sent);
595     ibf_write_slice (ibf, buckets_sent,
596                      buckets_in_message, &msg[1]);
597     buckets_sent += buckets_in_message;
598     GNUNET_MQ_send (eo->mq, mqm);
599   }
600
601   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
602 }
603
604
605 /**
606  * Send a strata estimator to the remote peer.
607  *
608  * @param eo the union operation with the remote peer
609  */
610 static void
611 send_strata_estimator (struct UnionEvaluateOperation *eo)
612 {
613   struct GNUNET_MQ_Message *mqm;
614   struct GNUNET_MessageHeader *strata_msg;
615
616   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
617                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
618                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
619   strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
620   GNUNET_MQ_send (eo->mq, mqm);
621   eo->phase = PHASE_EXPECT_IBF;
622 }
623
624
625 /**
626  * Compute the necessary order of an ibf
627  * from the size of the symmetric set difference.
628  *
629  * @param diff the difference
630  * @return the required size of the ibf
631  */
632 static unsigned int
633 get_order_from_difference (unsigned int diff)
634 {
635   unsigned int ibf_order;
636
637   ibf_order = 2;
638   while ((1<<ibf_order) < (2 * diff))
639     ibf_order++;
640   if (ibf_order > MAX_IBF_ORDER)
641     ibf_order = MAX_IBF_ORDER;
642   return ibf_order;
643 }
644
645
646 /**
647  * Handle a strata estimator from a remote peer
648  *
649  * @param the union operation
650  * @param mh the message
651  */
652 static void
653 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
654 {
655   struct UnionEvaluateOperation *eo = cls;
656   struct StrataEstimator *remote_se;
657   int diff;
658
659   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n");
660
661   if (eo->phase != PHASE_EXPECT_SE)
662   {
663     fail_union_operation (eo);
664     GNUNET_break (0);
665     return;
666   }
667   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
668                                        SE_IBF_HASH_NUM);
669   strata_estimator_read (&mh[1], remote_se);
670   GNUNET_assert (NULL != eo->se);
671   diff = strata_estimator_difference (remote_se, eo->se);
672   strata_estimator_destroy (remote_se);
673   strata_estimator_destroy (eo->se);
674   eo->se = NULL;
675   send_ibf (eo, get_order_from_difference (diff));
676 }
677
678
679
680 /**
681  * Iterator to send elements to a remote peer
682  *
683  * @param cls closure with the element key and the union operation
684  * @param key ignored
685  * @param value the key entry
686  */
687 static int
688 send_element_iterator (void *cls,
689                       uint32_t key,
690                       void *value)
691 {
692   struct SendElementClosure *sec = cls;
693   struct IBF_Key ibf_key = sec->ibf_key;
694   struct UnionEvaluateOperation *eo = sec->eo;
695   struct KeyEntry *ke = value;
696
697   if (ke->ibf_key.key_val != ibf_key.key_val)
698     return GNUNET_YES;
699   while (NULL != ke)
700   {
701     const struct GNUNET_SET_Element *const element = &ke->element->element;
702     struct GNUNET_MQ_Message *mqm;
703
704     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
705     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
706     if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
707     {
708       GNUNET_break (0);
709       GNUNET_MQ_discard (mqm);
710       continue;
711     }
712     GNUNET_MQ_send (eo->mq, mqm);
713   }
714   return GNUNET_NO;
715 }
716
717 /**
718  * Send all elements that have the specified IBF key
719  * to the remote peer of the union operation
720  *
721  * @param eo union operation
722  * @param ibf_key IBF key of interest
723  */
724 static void
725 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
726 {
727   struct SendElementClosure send_cls;
728
729   send_cls.ibf_key = ibf_key;
730   send_cls.eo = eo;
731   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
732                                                 send_element_iterator, &send_cls);
733 }
734
735
736
737 /**
738  * Decode which elements are missing on each side, and
739  * send the appropriate elemens and requests
740  *
741  * @param eo union operation
742  */
743 static void
744 decode_and_send (struct UnionEvaluateOperation *eo)
745 {
746   struct IBF_Key key;
747   int side;
748   struct InvertibleBloomFilter *diff_ibf;
749
750   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
751
752   prepare_ibf (eo, eo->remote_ibf->size);
753   diff_ibf = ibf_dup (eo->local_ibf);
754   ibf_subtract (diff_ibf, eo->remote_ibf);
755
756   while (1)
757   {
758     int res;
759
760     res = ibf_decode (diff_ibf, &side, &key);
761     if (GNUNET_SYSERR == res)
762     {
763       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n",
764                   diff_ibf->size * 2);
765       send_ibf (eo, diff_ibf->size * 2);
766       ibf_destroy (diff_ibf);
767       return;
768     }
769     if (GNUNET_NO == res)
770     {
771       struct GNUNET_MQ_Message *mqm;
772
773       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
774       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
775       GNUNET_MQ_send (eo->mq, mqm);
776       return;
777     }
778     if (1 == side)
779     {
780       send_elements_for_key (eo, key);
781     }
782     else
783     {
784       struct GNUNET_MQ_Message *mqm;
785       struct GNUNET_MessageHeader *msg;
786
787       /* FIXME: before sending the request, check if we may just have the element */
788       /* FIXME: merge multiple requests */
789       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
790                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
791       *(struct IBF_Key *) &msg[1] = key;
792       GNUNET_MQ_send (eo->mq, mqm);
793     }
794   }
795 }
796
797
798 /**
799  * Handle an IBF message from a remote peer.
800  *
801  * @param cls the union operation
802  * @param mh the header of the message
803  */
804 static void
805 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
806 {
807   struct UnionEvaluateOperation *eo = cls;
808   struct IBFMessage *msg = (struct IBFMessage *) mh;
809   unsigned int buckets_in_message;
810
811   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
812        (eo->phase == PHASE_EXPECT_IBF) )
813   {
814     eo->phase = PHASE_EXPECT_IBF_CONT;
815     GNUNET_assert (NULL == eo->remote_ibf);
816     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
817     if (0 != ntohs (msg->offset))
818     {
819       GNUNET_break (0);
820       fail_union_operation (eo);
821     }
822   }
823   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
824   {
825     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
826          (1<<msg->order != eo->remote_ibf->size) )
827     {
828       GNUNET_break (0);
829       fail_union_operation (eo);
830     }
831   }
832
833   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
834
835   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
836   {
837     GNUNET_break (0);
838     fail_union_operation (eo);
839   }
840
841   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
842   eo->ibf_buckets_received += buckets_in_message;
843
844   if (eo->ibf_buckets_received == eo->remote_ibf->size)
845   {
846     eo->phase = PHASE_EXPECT_ELEMENTS;
847     decode_and_send (eo);
848   }
849 }
850
851
852 /**
853  * Send an element to the client of the operations's set.
854  *
855  * @param eo union operation
856  * @param element element to send
857  */
858 static void
859 send_client_element (struct UnionEvaluateOperation *eo,
860                      struct GNUNET_SET_Element *element)
861 {
862   struct GNUNET_MQ_Message *mqm;
863   struct ResultMessage *rm;
864
865   GNUNET_assert (0 != eo->request_id);
866   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
867   if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
868   {
869     GNUNET_MQ_discard (mqm);
870     GNUNET_break (0);
871     return;
872   }
873
874   GNUNET_MQ_send (eo->mq, mqm);
875 }
876
877
878 /**
879  * Handle an element message from a remote peer.
880  *
881  * @param cls the union operation
882  * @param mh the message
883  */
884 static void
885 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
886 {
887   struct UnionEvaluateOperation *eo = cls;
888   struct ElementEntry *ee;
889   uint16_t element_size;
890
891   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
892        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
893   {
894     fail_union_operation (eo);
895     GNUNET_break (0);
896     return;
897   }
898   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
899   ee = GNUNET_malloc (sizeof *eo + element_size);
900   ee->element.data = &ee[1];
901   memcpy (ee->element.data, &mh[1], element_size);
902   ee->remote = GNUNET_YES;
903
904   insert_element (eo, ee);
905   send_client_element (eo, &ee->element);
906 }
907
908
909 /**
910  * Handle an element request from a remote peer.
911  *
912  * @param cls the union operation
913  * @param mh the message
914  */
915 static void
916 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
917 {
918   struct UnionEvaluateOperation *eo = cls;
919   struct IBF_Key *ibf_key;
920   unsigned int num_keys;
921
922   /* look up elements and send them */
923   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
924   {
925     fail_union_operation (eo);
926     GNUNET_break (0);
927     return;
928   }
929
930   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
931
932   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
933   {
934     fail_union_operation (eo);
935     GNUNET_break (0);
936     return;
937   }
938
939   ibf_key = (struct IBF_Key *) &mh[1];
940   while (0 != num_keys--)
941   {
942     send_elements_for_key (eo, *ibf_key);
943     ibf_key++;
944   }
945 }
946
947
948 static void
949 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
950 {
951   struct UnionEvaluateOperation *eo = cls;
952
953   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
954   {
955     /* we got all requests, but still have to send our elements as response */
956     struct GNUNET_MQ_Message *mqm;
957
958     eo->phase = PHASE_FINISHED;
959     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
960     GNUNET_MQ_send (eo->mq, mqm);
961     return;
962   }
963   if (eo->phase == PHASE_EXPECT_ELEMENTS)
964   {
965     /* it's all over! */
966     eo->phase = PHASE_FINISHED;
967     return;
968   }
969   GNUNET_break (0);
970   fail_union_operation (eo);
971 }
972
973
974 /**
975  * The handlers array, used for both evaluate and accept
976  */
977 static const struct GNUNET_MQ_Handler union_handlers[] = {
978   {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
979   {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
980   {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
981   {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
982   {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
983   GNUNET_MQ_HANDLERS_END
984 };
985
986
987 /**
988  * Functions of this type will be called when a stream is established
989  * 
990  * @param cls the closure from GNUNET_STREAM_open
991  * @param socket socket to use to communicate with the
992  *        other side (read/write)
993  */
994 static void
995 stream_open_cb (void *cls,
996                 struct GNUNET_STREAM_Socket *socket)
997 {
998   struct UnionEvaluateOperation *eo = cls;
999
1000   GNUNET_assert (NULL == eo->mq);
1001   GNUNET_assert (socket == eo->socket);
1002
1003   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n");
1004
1005   eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket,
1006                                               union_handlers, eo);
1007   /* we started the operation, thus we have to send the operation request */
1008   send_operation_request (eo);
1009   eo->phase = PHASE_EXPECT_SE;
1010 }
1011
1012         
1013
1014 void
1015 _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
1016 {
1017   struct UnionEvaluateOperation *eo;
1018
1019   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n");
1020
1021   eo = GNUNET_new (struct UnionEvaluateOperation);
1022   eo->peer = m->peer;
1023   eo->set = set;
1024   eo->request_id = htons(m->request_id);
1025   eo->se = strata_estimator_dup (set->state.u->se);
1026   eo->salt = ntohs (m->salt);
1027   eo->app_id = m->app_id;
1028   eo->socket = 
1029       GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
1030                           stream_open_cb, eo,
1031                           GNUNET_STREAM_OPTION_END);
1032 }
1033
1034
1035 void
1036 _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
1037                    struct Incoming *incoming)
1038 {
1039   struct UnionEvaluateOperation *eo;
1040
1041   eo = GNUNET_new (struct UnionEvaluateOperation);
1042   eo->generation_created = set->state.u->current_generation++;
1043   eo->set = set;
1044   eo->peer = incoming->peer;
1045   eo->salt = ntohs (incoming->salt);
1046   eo->request_id = m->request_id;
1047   eo->se = strata_estimator_dup (set->state.u->se);
1048   eo->set = set;
1049   eo->mq = incoming->mq;
1050   /* the peer's socket is now ours, we'll receive all messages */
1051   GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
1052   /* kick of the operation */
1053   send_strata_estimator (eo);
1054 }
1055
1056
1057 struct Set *
1058 _GSS_union_set_create (void)
1059 {
1060   struct Set *set;
1061
1062   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n");
1063   
1064   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
1065   set->state.u = (struct UnionState *) &set[1];
1066   set->operation = GNUNET_SET_OPERATION_UNION;
1067   set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
1068                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1069   return set;
1070 }
1071
1072
1073 void
1074 _GSS_union_add (struct ElementMessage *m, struct Set *set)
1075 {
1076   struct ElementEntry *ee;
1077   struct ElementEntry *ee_dup;
1078   uint16_t element_size;
1079   
1080   element_size = ntohs (m->header.size) - sizeof *m;
1081   ee = GNUNET_malloc (element_size + sizeof *ee);
1082   ee->element.size = element_size;
1083   ee->element.data = &ee[1];
1084   memcpy (ee->element.data, &m[1], element_size);
1085   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1086   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1087   if (NULL != ee_dup)
1088   {
1089     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1090     GNUNET_free (ee);
1091     return;
1092   }
1093   GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
1094                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1095   strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
1096 }
1097
1098
1099 /**
1100  * Remove the element given in the element message from the set.
1101  * Only marks the element as removed, so that older set operations can still exchange it.
1102  *
1103  * @param m message with the element
1104  * @param set set to remove the element from
1105  */
1106 void
1107 _GSS_union_remove (struct ElementMessage *m, struct Set *set)
1108 {
1109   struct GNUNET_HashCode hash;
1110   struct ElementEntry *ee;
1111
1112   GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1113   
1114   ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1115   if (NULL == ee)
1116   {
1117     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1118     return;
1119   }
1120   if (GNUNET_YES == ee->removed)
1121   {
1122     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1123     return;
1124   }
1125   ee->removed = GNUNET_YES;
1126   ee->generation_removed = set->state.u->current_generation;
1127 }
1128