implemented most parts of the set service
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct UnionEvaluateOperation
104 {
105   /**
106    * Local set the operation is evaluated on.
107    */
108   struct Set *set;
109
110   /**
111    * Peer with the remote set
112    */
113   struct GNUNET_PeerIdentity peer;
114
115   /**
116    * Application-specific identifier
117    */
118   struct GNUNET_HashCode app_id;
119
120   /**
121    * Context message, given to us
122    * by the client, may be NULL.
123    */
124   struct GNUNET_MessageHeader *context_msg;
125
126   /**
127    * Stream socket connected to the other peer
128    */
129   struct GNUNET_STREAM_Socket *socket;
130
131   /**
132    * Message queue for the peer on the other
133    * end
134    */
135   struct GNUNET_MQ_MessageQueue *mq;
136
137   /**
138    * Type of this operation
139    */
140   enum GNUNET_SET_OperationType operation;
141
142   /**
143    * Request ID to multiplex set operations to
144    * the client inhabiting the set.
145    */
146   uint32_t request_id;
147
148   /**
149    * Number of ibf buckets received
150    */
151   unsigned int ibf_buckets_received;
152
153   /**
154    * Copy of the set's strata estimator at the time of
155    * creation of this operation
156    */
157   struct StrataEstimator *se;
158
159   /**
160    * The ibf we currently receive
161    */
162   struct InvertibleBloomFilter *remote_ibf;
163
164   /**
165    * IBF of the set's element.
166    */
167   struct InvertibleBloomFilter *local_ibf;
168
169   /**
170    * Maps IBF-Keys (specific to the current salt) to elements.
171    */
172   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
173
174   /**
175    * Current state of the operation.
176    */
177   enum UnionOperationPhase phase;
178
179   /**
180    * Salt to use for this operation.
181    */
182   uint16_t salt;
183
184   /**
185    * Generation in which the operation handle
186    * was created.
187    */
188   unsigned int generation_created;
189   
190   /**
191    * Evaluate operations are held in
192    * a linked list.
193    */
194   struct UnionEvaluateOperation *next;
195   
196    /**
197    * Evaluate operations are held in
198    * a linked list.
199    */
200   struct UnionEvaluateOperation *prev;
201 };
202
203
204 /**
205  * Information about the element in a set.
206  * All elements are stored in a hash-table
207  * from their hash-code to their 'struct Element',
208  * so that the remove and add operations are reasonably
209  * fast.
210  */
211 struct ElementEntry
212 {
213   /**
214    * The actual element. The data for the element
215    * should be allocated at the end of this struct.
216    */
217   struct GNUNET_SET_Element element;
218
219   /**
220    * Hash of the element.
221    * Will be used to derive the different IBF keys
222    * for different salts.
223    */
224   struct GNUNET_HashCode element_hash;
225
226   /**
227    * Generation the element was added by the client.
228    * Operations of earlier generations will not consider the element.
229    */
230   unsigned int generation_added;
231
232   /**
233    * GNUNET_YES if the element has been removed in some generation.
234    */
235   int removed;
236
237   /**
238    * Generation the element was removed by the client. 
239    * Operations of later generations will not consider the element.
240    * Only valid if is_removed is GNUNET_YES.
241    */
242   unsigned int generation_removed;
243
244   /**
245    * GNUNET_YES if the element is a remote element, and does not belong
246    * to the operation's set.
247    */
248   int remote;
249 };
250
251
252 /**
253  * Information about the element used for 
254  * a specific union operation.
255  */
256 struct KeyEntry
257 {
258   /**
259    * IBF key for the entry, derived from the current salt.
260    */
261   struct IBF_Key ibf_key;
262
263   /**
264    * The actual element associated with the key
265    */
266   struct ElementEntry *element;
267
268   /**
269    * Element that collides with this element
270    * on the ibf key
271    */
272   struct KeyEntry *next_colliding;
273 };
274
275 /**
276  * Used as a closure for sending elements
277  * with a specific IBF key.
278  */
279 struct SendElementClosure
280 {
281   /**
282    * The IBF key whose matching elements should be
283    * sent.
284    */
285   struct IBF_Key ibf_key;
286
287   /**
288    * Operation for which the elements
289    * should be sent.
290    */
291   struct UnionEvaluateOperation *eo;
292 };
293
294
295 /**
296  * Extra state required for efficient set union.
297  */
298 struct UnionState
299 {
300   /**
301    * The strata estimator is only generated once for
302    * each set.
303    * The IBF keys are derived from the element hashes with
304    * salt=0.
305    */
306   struct StrataEstimator *se;
307
308   /**
309    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
310    */
311   struct GNUNET_CONTAINER_MultiHashMap *elements;
312
313   /**
314    * Evaluate operations are held in
315    * a linked list.
316    */
317   struct UnionEvaluateOperation *ops_head;
318
319   /**
320    * Evaluate operations are held in
321    * a linked list.
322    */
323   struct UnionEvaluateOperation *ops_tail;
324
325   /**
326    * Current generation, that is, number of
327    * previously executed operations on this set
328    */
329   unsigned int current_generation;
330 };
331
332
333
334
335 /**
336  * Destroy a union operation, and free all resources
337  * associated with it.
338  *
339  * @param eo the union operation to destroy
340  */
341 static void
342 destroy_union_operation (struct UnionEvaluateOperation *eo)
343 {
344   GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
345                                eo->set->state.u->ops_tail,
346                                eo);
347   GNUNET_free (eo);
348   /* FIXME: free and destroy everything else */
349 }
350
351
352 /**
353  * Inform the client that the union operation has failed,
354  * and proceed to destroy the evaluate operation.
355  *
356  * @param eo the union operation to fail
357  */
358 static void
359 fail_union_operation (struct UnionEvaluateOperation *eo)
360 {
361   struct GNUNET_MQ_Message *mqm;
362   struct ResultMessage *msg;
363
364   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
365   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
366   msg->request_id = eo->request_id;
367   GNUNET_MQ_send (eo->set->client_mq, mqm);
368   destroy_union_operation (eo);
369 }
370
371
372 /**
373  * Derive the IBF key from a hash code and 
374  * a salt.
375  *
376  * @param src the hash code
377  * @param salt salt to use
378  * @return the derived IBF key
379  */
380 static struct IBF_Key
381 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
382 {
383   struct IBF_Key key;
384
385   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
386                       GCRY_MD_SHA512, GCRY_MD_SHA256,
387                       src, sizeof *src,
388                       &salt, sizeof (salt),
389                       NULL, 0);
390   return key;
391 }
392
393
394 /**
395  * Send a request for the evaluate operation to a remote peer
396  *
397  * @param eo operation with the other peer
398  */
399 static void
400 send_operation_request (struct UnionEvaluateOperation *eo)
401 {
402   struct GNUNET_MQ_Message *mqm;
403   struct OperationRequestMessage *msg;
404
405   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
406   if (NULL != eo->context_msg)
407     if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
408     {
409       /* the context message is too large */
410       _GSS_client_disconnect (eo->set->client);
411       GNUNET_MQ_discard (mqm);
412       GNUNET_break (0);
413       return;
414     }
415   msg->operation = eo->operation;
416   msg->app_id = eo->app_id;
417   GNUNET_MQ_send (eo->mq, mqm);
418
419   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
420 }
421
422
423 /**
424  * Iterator to create the mapping between ibf keys
425  * and element entries.
426  *
427  * @param cls closure
428  * @param key current key code
429  * @param value value in the hash map
430  * @return GNUNET_YES if we should continue to
431  *         iterate,
432  *         GNUNET_NO if not.
433  */
434 static int
435 insert_element_iterator (void *cls,
436                          uint32_t key,
437                          void *value)
438 {
439   struct KeyEntry *const new_k = cls;
440   struct KeyEntry *old_k = value;
441
442   GNUNET_assert (NULL != old_k);
443   do
444   {
445     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
446     {
447       new_k->next_colliding = old_k;
448       old_k->next_colliding = new_k;
449       return GNUNET_NO;
450     }
451     old_k = old_k->next_colliding;
452   } while (NULL != old_k);
453   return GNUNET_YES;
454 }
455
456
457 static void
458 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
459 {
460   int ret;
461   struct IBF_Key ibf_key;
462   struct KeyEntry *k;
463
464   ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
465   k = GNUNET_new (struct KeyEntry);
466   k->element = ee;
467   k->ibf_key = ibf_key;
468   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
469                                                       (uint32_t) ibf_key.key_val,
470                                                       insert_element_iterator, k);
471   /* was the element inserted into a colliding bucket? */
472   if (GNUNET_SYSERR == ret)
473   {
474     GNUNET_assert (NULL != k->next_colliding);
475     return;
476   }
477   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
478                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
479   if (NULL != eo->local_ibf)
480     ibf_insert (eo->local_ibf, ibf_key);
481 }
482
483
484 static int
485 prepare_ibf_iterator (void *cls,
486                       uint32_t key,
487                       void *value)
488 {
489   struct InvertibleBloomFilter *ibf = cls;
490   struct KeyEntry *ke = value;
491
492   ibf_insert (ibf, ke->ibf_key);
493   return GNUNET_YES;
494 }
495
496
497 static int
498 init_key_to_element_iterator (void *cls,
499                               const struct GNUNET_HashCode *key,
500                               void *value)
501 {
502   struct UnionEvaluateOperation *eo = cls;
503   struct ElementEntry *e = value;
504
505   /* make sure that the element belongs to the set at the time
506    * of creating the operation */
507   if ( (e->generation_added > eo->generation_created) ||
508        ( (GNUNET_YES == e->removed) &&
509          (e->generation_removed < eo->generation_created)))
510     return GNUNET_YES;
511
512   insert_element (eo, e);
513   return GNUNET_YES;
514 }
515
516
517 static void
518 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
519 {
520   if (NULL == eo->key_to_element)
521   {
522     unsigned int len;
523     len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
524     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len);
525     GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
526                                              init_key_to_element_iterator, eo);
527   }
528   if (NULL != eo->local_ibf)
529     ibf_destroy (eo->local_ibf);
530   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
531   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
532                                            prepare_ibf_iterator, eo->local_ibf);
533 }
534
535
536 /**
537  * Send an ibf of appropriate size.
538  *
539  * @param eo the union operation
540  * @param ibf_order order of the ibf to send, size=2^order
541  */
542 static void
543 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
544 {
545   unsigned int buckets_sent = 0;
546   struct InvertibleBloomFilter *ibf;
547
548   prepare_ibf (eo, 1<<ibf_order);
549
550   ibf = eo->local_ibf;
551
552   while (buckets_sent < (1 << ibf_order))
553   {
554     unsigned int buckets_in_message;
555     struct GNUNET_MQ_Message *mqm;
556     struct IBFMessage *msg;
557
558     buckets_in_message = (1 << ibf_order) - buckets_sent;
559     /* limit to maximum */
560     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
561       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
562
563     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
564                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
565     msg->order = htons (ibf_order);
566     msg->offset = htons (buckets_sent);
567     ibf_write_slice (ibf, buckets_sent,
568                      buckets_in_message, &msg[1]);
569     buckets_sent += buckets_in_message;
570     GNUNET_MQ_send (eo->mq, mqm);
571   }
572
573   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
574 }
575
576
577 /**
578  * Send a strata estimator to the remote peer.
579  *
580  * @param eo the union operation with the remote peer
581  */
582 static void
583 send_strata_estimator (struct UnionEvaluateOperation *eo)
584 {
585   struct GNUNET_MQ_Message *mqm;
586   struct GNUNET_MessageHeader *strata_msg;
587
588   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
589                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
590                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
591   strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
592   GNUNET_MQ_send (eo->mq, mqm);
593   eo->phase = PHASE_EXPECT_IBF;
594 }
595
596 static unsigned int
597 get_order_from_difference (unsigned int diff)
598 {
599   unsigned int ibf_order;
600
601   ibf_order = 2;
602   while ((1<<ibf_order) < (2 * diff))
603     ibf_order++;
604   if (ibf_order > MAX_IBF_ORDER)
605     ibf_order = MAX_IBF_ORDER;
606   return ibf_order;
607 }
608
609
610 static void
611 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
612 {
613   struct UnionEvaluateOperation *eo = cls;
614   struct StrataEstimator *remote_se;
615   int diff;
616
617   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n");
618
619   if (eo->phase != PHASE_EXPECT_SE)
620   {
621     fail_union_operation (eo);
622     GNUNET_break (0);
623     return;
624   }
625   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
626                                        SE_IBF_HASH_NUM);
627   strata_estimator_read (&mh[1], remote_se);
628   GNUNET_assert (NULL != eo->se);
629   diff = strata_estimator_difference (remote_se, eo->se);
630   strata_estimator_destroy (remote_se);
631   strata_estimator_destroy (eo->se);
632   eo->se = NULL;
633   send_ibf (eo, get_order_from_difference (diff));
634 }
635
636
637
638 static int
639 send_element_iterator (void *cls,
640                       uint32_t key,
641                       void *value)
642 {
643   struct SendElementClosure *sec = cls;
644   struct IBF_Key ibf_key = sec->ibf_key;
645   struct UnionEvaluateOperation *eo = sec->eo;
646   struct KeyEntry *ke = value;
647
648   if (ke->ibf_key.key_val != ibf_key.key_val)
649     return GNUNET_YES;
650   while (NULL != ke)
651   {
652     const struct GNUNET_SET_Element *const element = &ke->element->element;
653     struct GNUNET_MQ_Message *mqm;
654
655     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
656     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
657     if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
658     {
659       GNUNET_break (0);
660       GNUNET_MQ_discard (mqm);
661       continue;
662     }
663     GNUNET_MQ_send (eo->mq, mqm);
664   }
665   return GNUNET_NO;
666 }
667
668 /**
669  * Send all elements that have the specified IBF key
670  * to the remote peer of the union operation
671  *
672  * @param eo union operation
673  * @param ibf_key IBF key of interest
674  */
675 static void
676 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
677 {
678   struct SendElementClosure send_cls;
679
680   send_cls.ibf_key = ibf_key;
681   send_cls.eo = eo;
682   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
683                                                 send_element_iterator, &send_cls);
684 }
685
686
687
688 /**
689  * Decode which elements are missing on each side, and
690  * send the appropriate elemens and requests
691  *
692  * @param eo union operation
693  */
694 static void
695 decode_and_send (struct UnionEvaluateOperation *eo)
696 {
697   struct IBF_Key key;
698   int side;
699   struct InvertibleBloomFilter *diff_ibf;
700
701   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
702
703   prepare_ibf (eo, eo->remote_ibf->size);
704   diff_ibf = ibf_dup (eo->local_ibf);
705   ibf_subtract (diff_ibf, eo->remote_ibf);
706
707   while (1)
708   {
709     int res;
710
711     res = ibf_decode (diff_ibf, &side, &key);
712     if (GNUNET_SYSERR == res)
713     {
714       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n",
715                   diff_ibf->size * 2);
716       send_ibf (eo, diff_ibf->size * 2);
717       ibf_destroy (diff_ibf);
718       return;
719     }
720     if (GNUNET_NO == res)
721     {
722       struct GNUNET_MQ_Message *mqm;
723
724       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
725       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
726       GNUNET_MQ_send (eo->mq, mqm);
727       return;
728     }
729     if (1 == side)
730     {
731       send_elements_for_key (eo, key);
732     }
733     else
734     {
735       struct GNUNET_MQ_Message *mqm;
736       struct GNUNET_MessageHeader *msg;
737
738       /* FIXME: before sending the request, check if we may just have the element */
739       /* FIXME: merge multiple requests */
740       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
741                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
742       *(struct IBF_Key *) &msg[1] = key;
743       GNUNET_MQ_send (eo->mq, mqm);
744     }
745   }
746 }
747
748
749 static void
750 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
751 {
752   struct UnionEvaluateOperation *eo = cls;
753   struct IBFMessage *msg = (struct IBFMessage *) mh;
754   unsigned int buckets_in_message;
755
756   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
757        (eo->phase == PHASE_EXPECT_IBF) )
758   {
759     eo->phase = PHASE_EXPECT_IBF_CONT;
760     GNUNET_assert (NULL == eo->remote_ibf);
761     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
762     if (0 != ntohs (msg->offset))
763     {
764       GNUNET_break (0);
765       fail_union_operation (eo);
766     }
767   }
768   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
769   {
770     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
771          (1<<msg->order != eo->remote_ibf->size) )
772     {
773       GNUNET_break (0);
774       fail_union_operation (eo);
775     }
776   }
777
778   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
779
780   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
781   {
782     GNUNET_break (0);
783     fail_union_operation (eo);
784   }
785
786   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
787   eo->ibf_buckets_received += buckets_in_message;
788
789   if (eo->ibf_buckets_received == eo->remote_ibf->size)
790   {
791     eo->phase = PHASE_EXPECT_ELEMENTS;
792     decode_and_send (eo);
793   }
794 }
795
796
797 /**
798  * Send an element to the client of the operations's set.
799  *
800  * @param eo union operation
801  * @param element element to send
802  */
803 static void
804 send_client_element (struct UnionEvaluateOperation *eo,
805                      struct GNUNET_SET_Element *element)
806 {
807   struct GNUNET_MQ_Message *mqm;
808   struct ResultMessage *rm;
809
810   GNUNET_assert (0 != eo->request_id);
811   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
812   if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
813   {
814     GNUNET_MQ_discard (mqm);
815     GNUNET_break (0);
816     return;
817   }
818
819   GNUNET_MQ_send (eo->mq, mqm);
820 }
821
822
823 static void
824 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
825 {
826   struct UnionEvaluateOperation *eo = cls;
827   struct ElementEntry *ee;
828   uint16_t element_size;
829
830   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
831        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
832   {
833     fail_union_operation (eo);
834     GNUNET_break (0);
835     return;
836   }
837   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
838   ee = GNUNET_malloc (sizeof *eo + element_size);
839   ee->element.data = &ee[1];
840   memcpy (ee->element.data, &mh[1], element_size);
841   ee->remote = GNUNET_YES;
842
843   insert_element (eo, ee);
844   send_client_element (eo, &ee->element);
845 }
846
847
848 static void
849 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
850 {
851   struct UnionEvaluateOperation *eo = cls;
852   struct IBF_Key *ibf_key;
853   unsigned int num_keys;
854
855   /* look up elements and send them */
856   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
857   {
858     fail_union_operation (eo);
859     GNUNET_break (0);
860     return;
861   }
862
863   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
864
865   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
866   {
867     fail_union_operation (eo);
868     GNUNET_break (0);
869     return;
870   }
871
872   ibf_key = (struct IBF_Key *) &mh[1];
873   while (0 != num_keys--)
874   {
875     send_elements_for_key (eo, *ibf_key);
876     ibf_key++;
877   }
878 }
879
880
881 static void
882 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
883 {
884   struct UnionEvaluateOperation *eo = cls;
885
886   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
887   {
888     /* we got all requests, but still have to send our elements as response */
889     struct GNUNET_MQ_Message *mqm;
890
891     eo->phase = PHASE_FINISHED;
892     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
893     GNUNET_MQ_send (eo->mq, mqm);
894     return;
895   }
896   if (eo->phase == PHASE_EXPECT_ELEMENTS)
897   {
898     /* it's all over! */
899     eo->phase = PHASE_FINISHED;
900     return;
901   }
902   GNUNET_break (0);
903   fail_union_operation (eo);
904 }
905
906
907 /**
908  * The handlers array, used for both evaluate and accept
909  */
910 static const struct GNUNET_MQ_Handler union_handlers[] = {
911   {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
912   {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
913   {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
914   {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
915   {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
916   GNUNET_MQ_HANDLERS_END
917 };
918
919
920 /**
921  * Functions of this type will be called when a stream is established
922  * 
923  * @param cls the closure from GNUNET_STREAM_open
924  * @param socket socket to use to communicate with the
925  *        other side (read/write)
926  */
927 static void
928 stream_open_cb (void *cls,
929                 struct GNUNET_STREAM_Socket *socket)
930 {
931   struct UnionEvaluateOperation *eo = cls;
932
933   GNUNET_assert (NULL == eo->mq);
934   GNUNET_assert (socket == eo->socket);
935
936   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n");
937
938   eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket,
939                                               union_handlers, eo);
940   /* we started the operation, thus we have to send the operation request */
941   send_operation_request (eo);
942   eo->phase = PHASE_EXPECT_SE;
943 }
944
945         
946
947 void
948 _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
949 {
950   struct UnionEvaluateOperation *eo;
951
952   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n");
953
954   eo = GNUNET_new (struct UnionEvaluateOperation);
955   eo->peer = m->peer;
956   eo->set = set;
957   eo->socket = 
958       GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
959                           stream_open_cb, eo,
960                           GNUNET_STREAM_OPTION_END);
961 }
962
963
964 void
965 _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
966                    struct Incoming *incoming)
967 {
968   struct UnionEvaluateOperation *eo;
969
970   eo = GNUNET_new (struct UnionEvaluateOperation);
971   eo->generation_created = set->state.u->current_generation++;
972   eo->set = set;
973   eo->peer = incoming->peer;
974   eo->app_id = incoming->app_id;
975   eo->salt = ntohs (incoming->salt);
976   eo->request_id = m->request_id;
977   eo->set = set;
978   eo->mq = incoming->mq;
979   /* the peer's socket is now ours, we'll receive all messages */
980   GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
981   /* kick of the operation */
982   send_strata_estimator (eo);
983 }
984
985
986 struct Set *
987 _GSS_union_set_create (void)
988 {
989   struct Set *set;
990
991   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n");
992   
993   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
994   set->state.u = (struct UnionState *) &set[1];
995   set->operation = GNUNET_SET_OPERATION_UNION;
996   set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
997                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
998   return set;
999 }
1000
1001
1002 void
1003 _GSS_union_add (struct ElementMessage *m, struct Set *set)
1004 {
1005   struct ElementEntry *ee;
1006   struct ElementEntry *ee_dup;
1007   uint16_t element_size;
1008   
1009   element_size = ntohs (m->header.size) - sizeof *m;
1010   ee = GNUNET_malloc (element_size + sizeof *ee);
1011   ee->element.size = element_size;
1012   ee->element.data = &ee[1];
1013   memcpy (ee->element.data, &m[1], element_size);
1014   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1015   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1016   if (NULL != ee_dup)
1017   {
1018     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1019     GNUNET_free (ee);
1020     return;
1021   }
1022   GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
1023                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1024   strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
1025 }
1026
1027
1028 /**
1029  * Remove the element given in the element message from the set.
1030  * Only marks the element as removed, so that older set operations can still exchange it.
1031  *
1032  * @param m message with the element
1033  * @param set set to remove the element from
1034  */
1035 void
1036 _GSS_union_remove (struct ElementMessage *m, struct Set *set)
1037 {
1038   struct GNUNET_HashCode hash;
1039   struct ElementEntry *ee;
1040
1041   GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1042   
1043   ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1044   if (NULL == ee)
1045   {
1046     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1047     return;
1048   }
1049   if (GNUNET_YES == ee->removed)
1050   {
1051     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1052     return;
1053   }
1054   ee->removed = GNUNET_YES;
1055   ee->generation_removed = set->state.u->current_generation;
1056 }
1057