removed unnecessary malloc
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct UnionEvaluateOperation
104 {
105   /**
106    * Local set the operation is evaluated on.
107    */
108   struct Set *set;
109
110   /**
111    * Peer with the remote set
112    */
113   struct GNUNET_PeerIdentity peer;
114
115   /**
116    * Application-specific identifier
117    */
118   struct GNUNET_HashCode app_id;
119
120   /**
121    * Context message, given to us
122    * by the client, may be NULL.
123    */
124   struct GNUNET_MessageHeader *context_msg;
125
126   /**
127    * Stream socket connected to the other peer
128    */
129   struct GNUNET_STREAM_Socket *socket;
130
131   /**
132    * Message queue for the peer on the other
133    * end
134    */
135   struct GNUNET_MQ_MessageQueue *mq;
136
137   /**
138    * Request ID to multiplex set operations to
139    * the client inhabiting the set.
140    */
141   uint32_t request_id;
142
143   /**
144    * Number of ibf buckets received
145    */
146   unsigned int ibf_buckets_received;
147
148   /**
149    * Copy of the set's strata estimator at the time of
150    * creation of this operation
151    */
152   struct StrataEstimator *se;
153
154   /**
155    * The ibf we currently receive
156    */
157   struct InvertibleBloomFilter *remote_ibf;
158
159   /**
160    * IBF of the set's element.
161    */
162   struct InvertibleBloomFilter *local_ibf;
163
164   /**
165    * Maps IBF-Keys (specific to the current salt) to elements.
166    */
167   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
168
169   /**
170    * Current state of the operation.
171    */
172   enum UnionOperationPhase phase;
173
174   /**
175    * Salt to use for this operation.
176    */
177   uint16_t salt;
178
179   /**
180    * Generation in which the operation handle
181    * was created.
182    */
183   unsigned int generation_created;
184   
185   /**
186    * Evaluate operations are held in
187    * a linked list.
188    */
189   struct UnionEvaluateOperation *next;
190   
191    /**
192    * Evaluate operations are held in
193    * a linked list.
194    */
195   struct UnionEvaluateOperation *prev;
196 };
197
198
199 /**
200  * Information about the element in a set.
201  * All elements are stored in a hash-table
202  * from their hash-code to their 'struct Element',
203  * so that the remove and add operations are reasonably
204  * fast.
205  */
206 struct ElementEntry
207 {
208   /**
209    * The actual element. The data for the element
210    * should be allocated at the end of this struct.
211    */
212   struct GNUNET_SET_Element element;
213
214   /**
215    * Hash of the element.
216    * Will be used to derive the different IBF keys
217    * for different salts.
218    */
219   struct GNUNET_HashCode element_hash;
220
221   /**
222    * Generation the element was added by the client.
223    * Operations of earlier generations will not consider the element.
224    */
225   unsigned int generation_added;
226
227   /**
228    * GNUNET_YES if the element has been removed in some generation.
229    */
230   int removed;
231
232   /**
233    * Generation the element was removed by the client. 
234    * Operations of later generations will not consider the element.
235    * Only valid if is_removed is GNUNET_YES.
236    */
237   unsigned int generation_removed;
238
239   /**
240    * GNUNET_YES if the element is a remote element, and does not belong
241    * to the operation's set.
242    */
243   int remote;
244 };
245
246
247 /**
248  * Information about the element used for 
249  * a specific union operation.
250  */
251 struct KeyEntry
252 {
253   /**
254    * IBF key for the entry, derived from the current salt.
255    */
256   struct IBF_Key ibf_key;
257
258   /**
259    * The actual element associated with the key
260    */
261   struct ElementEntry *element;
262
263   /**
264    * Element that collides with this element
265    * on the ibf key
266    */
267   struct KeyEntry *next_colliding;
268 };
269
270 /**
271  * Used as a closure for sending elements
272  * with a specific IBF key.
273  */
274 struct SendElementClosure
275 {
276   /**
277    * The IBF key whose matching elements should be
278    * sent.
279    */
280   struct IBF_Key ibf_key;
281
282   /**
283    * Operation for which the elements
284    * should be sent.
285    */
286   struct UnionEvaluateOperation *eo;
287 };
288
289
290 /**
291  * Extra state required for efficient set union.
292  */
293 struct UnionState
294 {
295   /**
296    * The strata estimator is only generated once for
297    * each set.
298    * The IBF keys are derived from the element hashes with
299    * salt=0.
300    */
301   struct StrataEstimator *se;
302
303   /**
304    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
305    */
306   struct GNUNET_CONTAINER_MultiHashMap *elements;
307
308   /**
309    * Evaluate operations are held in
310    * a linked list.
311    */
312   struct UnionEvaluateOperation *ops_head;
313
314   /**
315    * Evaluate operations are held in
316    * a linked list.
317    */
318   struct UnionEvaluateOperation *ops_tail;
319
320   /**
321    * Current generation, that is, number of
322    * previously executed operations on this set
323    */
324   unsigned int current_generation;
325 };
326
327
328
329 /**
330  * Iterator over hash map entries.
331  *
332  * @param cls closure
333  * @param key current key code
334  * @param value value in the hash map
335  * @return GNUNET_YES if we should continue to
336  *         iterate,
337  *         GNUNET_NO if not.
338  */
339 static int
340 destroy_elements_iterator (void *cls,
341                            const struct GNUNET_HashCode * key,
342                            void *value)
343 {
344   struct ElementEntry *ee = value;
345
346   GNUNET_free (ee);
347   return GNUNET_YES;
348 }
349
350
351 /**
352  * Destroy the elements belonging to a union set.
353  *
354  * @param us union state that contains the elements
355  */
356 static void
357 destroy_elements (struct UnionState *us)
358 {
359   if (NULL == us->elements)
360     return;
361   GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
362   GNUNET_CONTAINER_multihashmap_destroy (us->elements);
363   us->elements = NULL;
364 }
365
366
367
368 /**
369  * Iterator over hash map entries.
370  *
371  * @param cls closure
372  * @param key current key code
373  * @param value value in the hash map
374  * @return GNUNET_YES if we should continue to
375  *         iterate,
376  *         GNUNET_NO if not.
377  */
378 static int
379 destroy_key_to_element_iter (void *cls,
380                              uint32_t key,
381                              void *value)
382 {
383   struct KeyEntry *k = value;
384   
385   while (NULL != k)
386   {
387     struct KeyEntry *k_tmp = k;
388     k = k->next_colliding;
389     GNUNET_free (k_tmp);
390   }
391   return GNUNET_YES;
392 }
393
394
395 /**
396  * Destroy a union operation, and free all resources
397  * associated with it.
398  *
399  * @param eo the union operation to destroy
400  */
401 static void
402 destroy_union_operation (struct UnionEvaluateOperation *eo)
403 {
404   if (NULL != eo->mq)
405   {
406     GNUNET_MQ_destroy (eo->mq);
407     eo->mq = NULL;
408   }
409   if (NULL != eo->socket)
410   {
411     GNUNET_STREAM_close (eo->socket);
412     eo->socket = NULL;
413   }
414   if (NULL != eo->remote_ibf)
415   {
416     ibf_destroy (eo->remote_ibf);
417     eo->remote_ibf = NULL;
418   }
419   if (NULL != eo->local_ibf)
420   {
421     ibf_destroy (eo->local_ibf);
422     eo->local_ibf = NULL;
423   }
424   if (NULL != eo->se)
425   {
426     strata_estimator_destroy (eo->se);
427     eo->se = NULL;
428   }
429   if (NULL != eo->key_to_element)
430   {
431     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
432     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
433     eo->key_to_element = NULL;
434   }
435
436
437   GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
438                                eo->set->state.u->ops_tail,
439                                eo);
440   GNUNET_free (eo);
441   /* FIXME: free and destroy everything else */
442 }
443
444
445 /**
446  * Inform the client that the union operation has failed,
447  * and proceed to destroy the evaluate operation.
448  *
449  * @param eo the union operation to fail
450  */
451 static void
452 fail_union_operation (struct UnionEvaluateOperation *eo)
453 {
454   struct GNUNET_MQ_Message *mqm;
455   struct ResultMessage *msg;
456
457   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
458   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
459   msg->request_id = htonl (eo->request_id);
460   GNUNET_MQ_send (eo->set->client_mq, mqm);
461   destroy_union_operation (eo);
462 }
463
464
465 /**
466  * Derive the IBF key from a hash code and 
467  * a salt.
468  *
469  * @param src the hash code
470  * @param salt salt to use
471  * @return the derived IBF key
472  */
473 static struct IBF_Key
474 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
475 {
476   struct IBF_Key key;
477
478   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
479                       GCRY_MD_SHA512, GCRY_MD_SHA256,
480                       src, sizeof *src,
481                       &salt, sizeof (salt),
482                       NULL, 0);
483   return key;
484 }
485
486
487 /**
488  * Send a request for the evaluate operation to a remote peer
489  *
490  * @param eo operation with the other peer
491  */
492 static void
493 send_operation_request (struct UnionEvaluateOperation *eo)
494 {
495   struct GNUNET_MQ_Message *mqm;
496   struct OperationRequestMessage *msg;
497
498   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
499   if (NULL != eo->context_msg)
500     if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
501     {
502       /* the context message is too large */
503       GNUNET_break (0);
504       GNUNET_SERVER_client_disconnect (eo->set->client);
505       GNUNET_MQ_discard (mqm);
506       return;
507     }
508   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
509   msg->app_id = eo->app_id;
510   GNUNET_MQ_send (eo->mq, mqm);
511
512   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
513 }
514
515
516 /**
517  * Iterator to create the mapping between ibf keys
518  * and element entries.
519  *
520  * @param cls closure
521  * @param key current key code
522  * @param value value in the hash map
523  * @return GNUNET_YES if we should continue to
524  *         iterate,
525  *         GNUNET_NO if not.
526  */
527 static int
528 insert_element_iterator (void *cls,
529                          uint32_t key,
530                          void *value)
531 {
532   struct KeyEntry *const new_k = cls;
533   struct KeyEntry *old_k = value;
534
535   GNUNET_assert (NULL != old_k);
536   do
537   {
538     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
539     {
540       new_k->next_colliding = old_k;
541       old_k->next_colliding = new_k;
542       return GNUNET_NO;
543     }
544     old_k = old_k->next_colliding;
545   } while (NULL != old_k);
546   return GNUNET_YES;
547 }
548
549
550 /**
551  * Insert an element into the union operation's
552  * key-to-element mapping
553  *
554  * @param the union operation
555  * @param ee the element entry
556  */
557 static void
558 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
559 {
560   int ret;
561   struct IBF_Key ibf_key;
562   struct KeyEntry *k;
563
564   ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
565   k = GNUNET_new (struct KeyEntry);
566   k->element = ee;
567   k->ibf_key = ibf_key;
568   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
569                                                       (uint32_t) ibf_key.key_val,
570                                                       insert_element_iterator, k);
571   /* was the element inserted into a colliding bucket? */
572   if (GNUNET_SYSERR == ret)
573   {
574     GNUNET_assert (NULL != k->next_colliding);
575     return;
576   }
577   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
578                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
579 }
580
581
582 /**
583  * Insert a key into an ibf.
584  *
585  * @param cls the ibf
586  * @param key unused
587  * @param value the key entry to get the key from
588  */
589 static int
590 prepare_ibf_iterator (void *cls,
591                       uint32_t key,
592                       void *value)
593 {
594   struct InvertibleBloomFilter *ibf = cls;
595   struct KeyEntry *ke = value;
596
597   ibf_insert (ibf, ke->ibf_key);
598   return GNUNET_YES;
599 }
600
601
602 /**
603  * Iterator for initializing the
604  * key-to-element mapping of a union operation
605  *
606  * @param cls the union operation
607  * @param key unised
608  * @param value the element entry to insert
609  *        into the key-to-element mapping
610  */
611 static int
612 init_key_to_element_iterator (void *cls,
613                               const struct GNUNET_HashCode *key,
614                               void *value)
615 {
616   struct UnionEvaluateOperation *eo = cls;
617   struct ElementEntry *e = value;
618
619   /* make sure that the element belongs to the set at the time
620    * of creating the operation */
621   if ( (e->generation_added > eo->generation_created) ||
622        ( (GNUNET_YES == e->removed) &&
623          (e->generation_removed < eo->generation_created)))
624     return GNUNET_YES;
625
626   insert_element (eo, e);
627   return GNUNET_YES;
628 }
629
630
631 /**
632  * Create an ibf with the operation's elements
633  * of the specified size
634  *
635  * @param eo the union operation
636  * @param size size of the ibf to create
637  */
638 static void
639 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
640 {
641   if (NULL == eo->key_to_element)
642   {
643     unsigned int len;
644     len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
645     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
646     GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
647                                              init_key_to_element_iterator, eo);
648   }
649   if (NULL != eo->local_ibf)
650     ibf_destroy (eo->local_ibf);
651   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
652   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
653                                            prepare_ibf_iterator, eo->local_ibf);
654 }
655
656
657 /**
658  * Send an ibf of appropriate size.
659  *
660  * @param eo the union operation
661  * @param ibf_order order of the ibf to send, size=2^order
662  */
663 static void
664 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
665 {
666   unsigned int buckets_sent = 0;
667   struct InvertibleBloomFilter *ibf;
668
669   prepare_ibf (eo, 1<<ibf_order);
670
671   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
672
673   ibf = eo->local_ibf;
674
675   while (buckets_sent < (1 << ibf_order))
676   {
677     unsigned int buckets_in_message;
678     struct GNUNET_MQ_Message *mqm;
679     struct IBFMessage *msg;
680
681     buckets_in_message = (1 << ibf_order) - buckets_sent;
682     /* limit to maximum */
683     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
684       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
685
686     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
687                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
688     msg->order = ibf_order;
689     msg->offset = htons (buckets_sent);
690     ibf_write_slice (ibf, buckets_sent,
691                      buckets_in_message, &msg[1]);
692     buckets_sent += buckets_in_message;
693     GNUNET_MQ_send (eo->mq, mqm);
694   }
695
696   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
697 }
698
699
700 /**
701  * Send a strata estimator to the remote peer.
702  *
703  * @param eo the union operation with the remote peer
704  */
705 static void
706 send_strata_estimator (struct UnionEvaluateOperation *eo)
707 {
708   struct GNUNET_MQ_Message *mqm;
709   struct GNUNET_MessageHeader *strata_msg;
710
711   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
712                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
713                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
714   strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
715   GNUNET_MQ_send (eo->mq, mqm);
716   eo->phase = PHASE_EXPECT_IBF;
717 }
718
719
720 /**
721  * Compute the necessary order of an ibf
722  * from the size of the symmetric set difference.
723  *
724  * @param diff the difference
725  * @return the required size of the ibf
726  */
727 static unsigned int
728 get_order_from_difference (unsigned int diff)
729 {
730   unsigned int ibf_order;
731
732   ibf_order = 2;
733   while ((1<<ibf_order) < (2 * diff))
734     ibf_order++;
735   if (ibf_order > MAX_IBF_ORDER)
736     ibf_order = MAX_IBF_ORDER;
737   return ibf_order;
738 }
739
740
741 /**
742  * Handle a strata estimator from a remote peer
743  *
744  * @param the union operation
745  * @param mh the message
746  */
747 static void
748 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
749 {
750   struct UnionEvaluateOperation *eo = cls;
751   struct StrataEstimator *remote_se;
752   int diff;
753
754
755   if (eo->phase != PHASE_EXPECT_SE)
756   {
757     fail_union_operation (eo);
758     GNUNET_break (0);
759     return;
760   }
761   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
762                                        SE_IBF_HASH_NUM);
763   strata_estimator_read (&mh[1], remote_se);
764   GNUNET_assert (NULL != eo->se);
765   diff = strata_estimator_difference (remote_se, eo->se);
766   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
767   strata_estimator_destroy (remote_se);
768   strata_estimator_destroy (eo->se);
769   eo->se = NULL;
770   send_ibf (eo, get_order_from_difference (diff));
771 }
772
773
774
775 /**
776  * Iterator to send elements to a remote peer
777  *
778  * @param cls closure with the element key and the union operation
779  * @param key ignored
780  * @param value the key entry
781  */
782 static int
783 send_element_iterator (void *cls,
784                       uint32_t key,
785                       void *value)
786 {
787   struct SendElementClosure *sec = cls;
788   struct IBF_Key ibf_key = sec->ibf_key;
789   struct UnionEvaluateOperation *eo = sec->eo;
790   struct KeyEntry *ke = value;
791
792   if (ke->ibf_key.key_val != ibf_key.key_val)
793     return GNUNET_YES;
794   while (NULL != ke)
795   {
796     const struct GNUNET_SET_Element *const element = &ke->element->element;
797     struct GNUNET_MQ_Message *mqm;
798
799     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
800     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
801     if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
802     {
803       GNUNET_break (0);
804       GNUNET_MQ_discard (mqm);
805       continue;
806     }
807     GNUNET_MQ_send (eo->mq, mqm);
808     ke = ke->next_colliding;
809   }
810   return GNUNET_NO;
811 }
812
813 /**
814  * Send all elements that have the specified IBF key
815  * to the remote peer of the union operation
816  *
817  * @param eo union operation
818  * @param ibf_key IBF key of interest
819  */
820 static void
821 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
822 {
823   struct SendElementClosure send_cls;
824
825   send_cls.ibf_key = ibf_key;
826   send_cls.eo = eo;
827   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
828                                                 send_element_iterator, &send_cls);
829 }
830
831
832 /**
833  * Decode which elements are missing on each side, and
834  * send the appropriate elemens and requests
835  *
836  * @param eo union operation
837  */
838 static void
839 decode_and_send (struct UnionEvaluateOperation *eo)
840 {
841   struct IBF_Key key;
842   int side;
843   struct InvertibleBloomFilter *diff_ibf;
844
845   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
846
847   prepare_ibf (eo, eo->remote_ibf->size);
848   diff_ibf = ibf_dup (eo->local_ibf);
849   ibf_subtract (diff_ibf, eo->remote_ibf);
850
851   while (1)
852   {
853     int res;
854
855     res = ibf_decode (diff_ibf, &side, &key);
856     if (GNUNET_SYSERR == res)
857     {
858       int next_order;
859       next_order = 0;
860       while (1<<next_order < diff_ibf->size)
861         next_order++;
862       next_order++;
863       if (next_order <= MAX_IBF_ORDER)
864       {
865         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n",
866                     1<<next_order);
867         send_ibf (eo, next_order);
868       }
869       else
870       {
871         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "set union failed: reached ibf limit\n");
872       }
873       break;
874     }
875     if (GNUNET_NO == res)
876     {
877       struct GNUNET_MQ_Message *mqm;
878
879       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
880       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
881       GNUNET_MQ_send (eo->mq, mqm);
882       break;
883     }
884     if (1 == side)
885     {
886       send_elements_for_key (eo, key);
887     }
888     else
889     {
890       struct GNUNET_MQ_Message *mqm;
891       struct GNUNET_MessageHeader *msg;
892
893       /* FIXME: before sending the request, check if we may just have the element */
894       /* FIXME: merge multiple requests */
895       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
896                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
897       *(struct IBF_Key *) &msg[1] = key;
898       GNUNET_MQ_send (eo->mq, mqm);
899     }
900   }
901   ibf_destroy (diff_ibf);
902 }
903
904
905 /**
906  * Handle an IBF message from a remote peer.
907  *
908  * @param cls the union operation
909  * @param mh the header of the message
910  */
911 static void
912 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
913 {
914   struct UnionEvaluateOperation *eo = cls;
915   struct IBFMessage *msg = (struct IBFMessage *) mh;
916   unsigned int buckets_in_message;
917
918   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
919        (eo->phase == PHASE_EXPECT_IBF) )
920   {
921     eo->phase = PHASE_EXPECT_IBF_CONT;
922     GNUNET_assert (NULL == eo->remote_ibf);
923     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
924     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
925     if (0 != ntohs (msg->offset))
926     {
927       GNUNET_break (0);
928       fail_union_operation (eo);
929     }
930   }
931   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
932   {
933     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
934          (1<<msg->order != eo->remote_ibf->size) )
935     {
936       GNUNET_break (0);
937       fail_union_operation (eo);
938       return;
939     }
940   }
941
942   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
943
944   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
945   {
946     GNUNET_break (0);
947     fail_union_operation (eo);
948     return;
949   }
950   
951   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
952   eo->ibf_buckets_received += buckets_in_message;
953
954   if (eo->ibf_buckets_received == eo->remote_ibf->size)
955   {
956
957     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
958     eo->phase = PHASE_EXPECT_ELEMENTS;
959     decode_and_send (eo);
960   }
961 }
962
963
964 /**
965  * Send a result message to the client indicating
966  * that there is a new element.
967  *
968  * @param eo union operation
969  * @param element element to send
970  */
971 static void
972 send_client_element (struct UnionEvaluateOperation *eo,
973                      struct GNUNET_SET_Element *element)
974 {
975   struct GNUNET_MQ_Message *mqm;
976   struct ResultMessage *rm;
977
978   GNUNET_assert (0 != eo->request_id);
979   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
980   rm->result_status = htons (GNUNET_SET_STATUS_OK);
981   rm->request_id = htonl (eo->request_id);
982   if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
983   {
984     GNUNET_MQ_discard (mqm);
985     GNUNET_break (0);
986     return;
987   }
988
989   GNUNET_MQ_send (eo->set->client_mq, mqm);
990 }
991
992
993 /**
994  * Callback used for notifications
995  *
996  * @param cls closure
997  */
998 static void
999 client_done_sent_cb (void *cls)
1000 {
1001   //struct UnionEvaluateOperation *eo = cls;
1002   /* FIXME: destroy eo */
1003 }
1004
1005
1006 /**
1007  * Send a result message to the client indicating
1008  * that the operation is over.
1009  * After the result done message has been sent to the client,
1010  * destroy the evaluate operation.
1011  *
1012  * @param eo union operation
1013  * @param element element to send
1014  */
1015 static void
1016 send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1017 {
1018   struct GNUNET_MQ_Message *mqm;
1019   struct ResultMessage *rm;
1020
1021   GNUNET_assert (0 != eo->request_id);
1022   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1023   rm->request_id = htonl (eo->request_id);
1024   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1025   GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo);
1026   GNUNET_MQ_send (eo->set->client_mq, mqm);
1027
1028   /* FIXME: destroy the eo */
1029 }
1030
1031
1032 /**
1033  * Handle an element message from a remote peer.
1034  *
1035  * @param cls the union operation
1036  * @param mh the message
1037  */
1038 static void
1039 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1040 {
1041   struct UnionEvaluateOperation *eo = cls;
1042   struct ElementEntry *ee;
1043   uint16_t element_size;
1044
1045   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
1046
1047   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1048        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1049   {
1050     fail_union_operation (eo);
1051     GNUNET_break (0);
1052     return;
1053   }
1054   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1055   ee = GNUNET_malloc (sizeof *eo + element_size);
1056   ee->element.data = &ee[1];
1057   memcpy (ee->element.data, &mh[1], element_size);
1058   ee->remote = GNUNET_YES;
1059
1060   insert_element (eo, ee);
1061   send_client_element (eo, &ee->element);
1062
1063   GNUNET_free (ee);
1064 }
1065
1066
1067 /**
1068  * Handle an element request from a remote peer.
1069  *
1070  * @param cls the union operation
1071  * @param mh the message
1072  */
1073 static void
1074 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1075 {
1076   struct UnionEvaluateOperation *eo = cls;
1077   struct IBF_Key *ibf_key;
1078   unsigned int num_keys;
1079
1080   /* look up elements and send them */
1081   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1082   {
1083     GNUNET_break (0);
1084     fail_union_operation (eo);
1085     return;
1086   }
1087
1088   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1089
1090   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1091   {
1092     GNUNET_break (0);
1093     fail_union_operation (eo);
1094     return;
1095   }
1096
1097   ibf_key = (struct IBF_Key *) &mh[1];
1098   while (0 != num_keys--)
1099   {
1100     send_elements_for_key (eo, *ibf_key);
1101     ibf_key++;
1102   }
1103 }
1104
1105
1106 /**
1107  * Callback used for notifications
1108  *
1109  * @param cls closure
1110  */
1111 static void
1112 peer_done_sent_cb (void *cls)
1113 {
1114   struct UnionEvaluateOperation *eo = cls;
1115
1116   send_client_done_and_destroy (eo);
1117 }
1118
1119
1120 /**
1121  * Handle a done message from a remote peer
1122  * 
1123  * @param cls the union operation
1124  * @param mh the message
1125  */
1126 static void
1127 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1128 {
1129   struct UnionEvaluateOperation *eo = cls;
1130
1131   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1132   {
1133     /* we got all requests, but still have to send our elements as response */
1134     struct GNUNET_MQ_Message *mqm;
1135
1136     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
1137     eo->phase = PHASE_FINISHED;
1138     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1139     GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
1140     GNUNET_MQ_send (eo->mq, mqm);
1141     return;
1142   }
1143   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1144   {
1145     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
1146     eo->phase = PHASE_FINISHED;
1147     send_client_done_and_destroy (eo);
1148     return;
1149   }
1150   GNUNET_break (0);
1151   fail_union_operation (eo);
1152 }
1153
1154
1155 /**
1156  * The handlers array, used for both evaluate and accept
1157  */
1158 static const struct GNUNET_MQ_Handler union_handlers[] = {
1159   {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
1160   {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
1161   {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
1162   {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
1163   {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
1164   GNUNET_MQ_HANDLERS_END
1165 };
1166
1167
1168 /**
1169  * Functions of this type will be called when a stream is established
1170  * 
1171  * @param cls the closure from GNUNET_STREAM_open
1172  * @param socket socket to use to communicate with the
1173  *        other side (read/write)
1174  */
1175 static void
1176 stream_open_cb (void *cls,
1177                 struct GNUNET_STREAM_Socket *socket)
1178 {
1179   struct UnionEvaluateOperation *eo = cls;
1180
1181   GNUNET_assert (NULL == eo->mq);
1182   GNUNET_assert (socket == eo->socket);
1183
1184   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n");
1185
1186   eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket,
1187                                               union_handlers, eo);
1188   /* we started the operation, thus we have to send the operation request */
1189   send_operation_request (eo);
1190   eo->phase = PHASE_EXPECT_SE;
1191 }
1192
1193
1194 /**
1195  * Evaluate a union operation with
1196  * a remote peer.
1197  *
1198  * @param m the evaluate request message from the client
1199  * @parem set the set to evaluate the operation with
1200  */
1201 void
1202 _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
1203 {
1204   struct UnionEvaluateOperation *eo;
1205
1206   eo = GNUNET_new (struct UnionEvaluateOperation);
1207   eo->peer = m->peer;
1208   eo->set = set;
1209   eo->request_id = htonl (m->request_id);
1210   GNUNET_assert (0 != eo->request_id);
1211   eo->se = strata_estimator_dup (set->state.u->se);
1212   eo->salt = ntohs (m->salt);
1213   eo->app_id = m->app_id;
1214
1215   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation, (app %s)\n", 
1216               GNUNET_h2s (&eo->app_id));
1217
1218   eo->socket = 
1219       GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
1220                           stream_open_cb, eo,
1221                           GNUNET_STREAM_OPTION_END);
1222
1223
1224   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1225                                eo->set->state.u->ops_tail,
1226                                eo);
1227   /* the stream open callback will kick off the operation */
1228 }
1229
1230
1231 /**
1232  * Accept an union operation request from a remote peer
1233  *
1234  * @param m the accept message from the client
1235  * @param set the set of the client
1236  * @param incoming information about the requesting remote peer
1237  */
1238 void
1239 _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
1240                    struct Incoming *incoming)
1241 {
1242   struct UnionEvaluateOperation *eo;
1243
1244   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1245
1246   eo = GNUNET_new (struct UnionEvaluateOperation);
1247   eo->generation_created = set->state.u->current_generation++;
1248   eo->set = set;
1249   eo->peer = incoming->peer;
1250   eo->salt = ntohs (incoming->salt);
1251   GNUNET_assert (0 != ntohl (m->request_id));
1252   eo->request_id = ntohl (m->request_id);
1253   eo->se = strata_estimator_dup (set->state.u->se);
1254   eo->set = set;
1255   eo->mq = incoming->mq;
1256   /* transfer ownership of mq and socket from incoming to eo */
1257   incoming->mq = NULL;
1258   eo->socket = incoming->socket;
1259   incoming->socket = NULL;
1260   /* the peer's socket is now ours, we'll receive all messages */
1261   GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
1262
1263   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1264                                eo->set->state.u->ops_tail,
1265                                eo);
1266
1267   /* kick off the operation */
1268   send_strata_estimator (eo);
1269 }
1270
1271
1272 /**
1273  * Create a new set supporting the union operation
1274  *
1275  * @return the newly created set
1276  */
1277 struct Set *
1278 _GSS_union_set_create (void)
1279 {
1280   struct Set *set;
1281
1282   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
1283   
1284   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
1285   set->state.u = (struct UnionState *) &set[1];
1286   set->operation = GNUNET_SET_OPERATION_UNION;
1287   /* keys of the hash map are stored in the element entrys, thus we do not
1288    * want the hash map to copy them */
1289   set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1290   set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
1291                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1292   return set;
1293 }
1294
1295
1296 /**
1297  * Add the element from the given element message to the set.
1298  *
1299  * @param m message with the element
1300  * @param set set to add the element to
1301  */
1302 void
1303 _GSS_union_add (struct ElementMessage *m, struct Set *set)
1304 {
1305   struct ElementEntry *ee;
1306   struct ElementEntry *ee_dup;
1307   uint16_t element_size;
1308
1309   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
1310
1311   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1312   element_size = ntohs (m->header.size) - sizeof *m;
1313   ee = GNUNET_malloc (element_size + sizeof *ee);
1314   ee->element.size = element_size;
1315   ee->element.data = &ee[1];
1316   ee->generation_added = set->state.u->current_generation;
1317   memcpy (ee->element.data, &m[1], element_size);
1318   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1319   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1320   if (NULL != ee_dup)
1321   {
1322     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1323     GNUNET_free (ee);
1324     return;
1325   }
1326   GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
1327                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1328   strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
1329 }
1330
1331
1332 /**
1333  * Destroy a set that supports the union operation
1334  *
1335  * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
1336  */
1337 void
1338 _GSS_union_set_destroy (struct Set *set)
1339 {
1340   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1341   if (NULL != set->client)
1342   {
1343     GNUNET_SERVER_client_drop (set->client);
1344     set->client = NULL;
1345   }
1346   if (NULL != set->client_mq)
1347   {
1348     GNUNET_MQ_destroy (set->client_mq);
1349     set->client_mq = NULL;
1350   }
1351
1352   if (NULL != set->state.u->se)
1353   {
1354     strata_estimator_destroy (set->state.u->se);
1355     set->state.u->se = NULL;
1356   }
1357
1358   destroy_elements (set->state.u);
1359
1360   while (NULL != set->state.u->ops_head)
1361     destroy_union_operation (set->state.u->ops_head);
1362 }
1363
1364 /**
1365  * Remove the element given in the element message from the set.
1366  * Only marks the element as removed, so that older set operations can still exchange it.
1367  *
1368  * @param m message with the element
1369  * @param set set to remove the element from
1370  */
1371 void
1372 _GSS_union_remove (struct ElementMessage *m, struct Set *set)
1373 {
1374   struct GNUNET_HashCode hash;
1375   struct ElementEntry *ee;
1376
1377   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1378   GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1379   ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1380   if (NULL == ee)
1381   {
1382     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1383     return;
1384   }
1385   if (GNUNET_YES == ee->removed)
1386   {
1387     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1388     return;
1389   }
1390   ee->removed = GNUNET_YES;
1391   ee->generation_removed = set->state.u->current_generation;
1392 }
1393