more work on set,
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "strata_estimator.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33
34 /**
35  * Number of IBFs in a strata estimator.
36  */
37 #define SE_STRATA_COUNT 32
38 /**
39  * Size of the IBFs in the strata estimator.
40  */
41 #define SE_IBF_SIZE 80
42 /**
43  * hash num parameter for the difference digests and strata estimators
44  */
45 #define SE_IBF_HASH_NUM 4
46
47 /**
48  * Number of buckets that can be transmitted in one message.
49  */
50 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
51
52 /**
53  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
54  * Choose this value so that computing the IBF is still cheaper
55  * than transmitting all values.
56  */
57 #define MAX_IBF_ORDER (16)
58
59 /**
60  * Number of buckets used in the ibf per estimated
61  * difference.
62  */
63 #define IBF_ALPHA 4
64
65
66 /**
67  * Current phase we are in for a intersection operation.
68  */
69 enum IntersectionOperationPhase
70 {
71   /**
72    * We sent the request message, and expect a BF
73    */
74   PHASE_EXPECT_INITIAL,
75   /**
76    * We sent the request message, and expect a BF
77    */
78   PHASE_EXPECT_BF,
79   /**
80    * The protocol is over.
81    * Results may still have to be sent to the client.
82    */
83   PHASE_FINISHED
84 };
85
86
87 /**
88  * State of an evaluate operation
89  * with another peer.
90  */
91 struct OperationState
92 {
93   /**
94    * Tunnel to the remote peer.
95    */
96   struct GNUNET_MESH_Tunnel *tunnel;
97
98   /**
99    * Detail information about the set operation,
100    * including the set to use.
101    */
102   struct OperationSpecification *spec;
103
104   /**
105    * Message queue for the peer.
106    */
107   struct GNUNET_MQ_Handle *mq;
108
109   /**
110    * Number of ibf buckets received
111    */
112   unsigned int ibf_buckets_received;
113
114   /**
115    * Copy of the set's strata estimator at the time of
116    * creation of this operation
117    */
118   struct StrataEstimator *se;
119
120   /**
121    * The ibf we currently receive
122    */
123   struct InvertibleBloomFilter *remote_ibf;
124
125   /**
126    * IBF of the set's element.
127    */
128   struct InvertibleBloomFilter *local_ibf;
129
130   /**
131    * Maps IBF-Keys (specific to the current salt) to elements.
132    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
133    * Colliding IBF-Keys are linked.
134    */
135   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
136
137   /**
138    * Current state of the operation.
139    */
140   enum IntersectionOperationPhase phase;
141
142   /**
143    * Generation in which the operation handle
144    * was created.
145    */
146   unsigned int generation_created;
147
148   /**
149    * Set state of the set that this operation
150    * belongs to.
151    */
152   struct Set *set;
153
154   /**
155    * Evaluate operations are held in
156    * a linked list.
157    */
158   struct OperationState *next;
159
160    /**
161     * Evaluate operations are held in
162     * a linked list.
163     */
164   struct OperationState *prev;
165
166   /**
167    * Did we send the client that we are done?
168    */
169   int client_done_sent;
170 };
171
172
173 /**
174  * The key entry is used to associate an ibf key with
175  * an element.
176  */
177 struct KeyEntry
178 {
179   /**
180    * IBF key for the entry, derived from the current salt.
181    */
182   struct IBF_Key ibf_key;
183
184   /**
185    * The actual element associated with the key
186    */
187   struct ElementEntry *element;
188
189   /**
190    * Element that collides with this element
191    * on the ibf key
192    */
193   struct KeyEntry *next_colliding;
194 };
195
196
197 /**
198  * Used as a closure for sending elements
199  * with a specific IBF key.
200  */
201 struct SendElementClosure
202 {
203   /**
204    * The IBF key whose matching elements should be
205    * sent.
206    */
207   struct IBF_Key ibf_key;
208
209   /**
210    * Operation for which the elements
211    * should be sent.
212    */
213   struct OperationState *eo;
214 };
215
216
217 /**
218  * Extra state required for efficient set intersection.
219  */
220 struct SetState
221 {
222   /**
223    * The strata estimator is only generated once for
224    * each set.
225    * The IBF keys are derived from the element hashes with
226    * salt=0.
227    */
228   struct StrataEstimator *se;
229
230   /**
231    * Evaluate operations are held in
232    * a linked list.
233    */
234   struct OperationState *ops_head;
235
236   /**
237    * Evaluate operations are held in
238    * a linked list.
239    */
240   struct OperationState *ops_tail;
241 };
242
243
244 /**
245  * Iterator over hash map entries.
246  *
247  * @param cls closure
248  * @param key current key code
249  * @param value value in the hash map
250  * @return GNUNET_YES if we should continue to
251  *         iterate,
252  *         GNUNET_NO if not.
253  */
254 static int
255 destroy_key_to_element_iter (void *cls,
256                              uint32_t key,
257                              void *value)
258 {
259   struct KeyEntry *k = value;
260
261   while (NULL != k)
262   {
263     struct KeyEntry *k_tmp = k;
264     k = k->next_colliding;
265     if (GNUNET_YES == k_tmp->element->remote)
266     {
267       GNUNET_free (k_tmp->element);
268       k_tmp->element = NULL;
269     }
270     GNUNET_free (k_tmp);
271   }
272   return GNUNET_YES;
273 }
274
275
276 /**
277  * Destroy a intersection operation, and free all resources
278  * associated with it.
279  *
280  * @param eo the intersection operation to destroy
281  */
282 static void
283 intersection_operation_destroy (struct OperationState *eo)
284 {
285   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
286   GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
287                                eo->set->state->ops_tail,
288                                eo);
289   if (NULL != eo->mq)
290   {
291     GNUNET_MQ_destroy (eo->mq);
292     eo->mq = NULL;
293   }
294   if (NULL != eo->tunnel)
295   {
296     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
297     eo->tunnel = NULL;
298     GNUNET_MESH_tunnel_destroy (t);
299   }
300   if (NULL != eo->remote_ibf)
301   {
302     ibf_destroy (eo->remote_ibf);
303     eo->remote_ibf = NULL;
304   }
305   if (NULL != eo->local_ibf)
306   {
307     ibf_destroy (eo->local_ibf);
308     eo->local_ibf = NULL;
309   }
310   if (NULL != eo->se)
311   {
312     strata_estimator_destroy (eo->se);
313     eo->se = NULL;
314   }
315   if (NULL != eo->key_to_element)
316   {
317     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
318     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
319     eo->key_to_element = NULL;
320   }
321   if (NULL != eo->spec)
322   {
323     if (NULL != eo->spec->context_msg)
324     {
325       GNUNET_free (eo->spec->context_msg);
326       eo->spec->context_msg = NULL;
327     }
328     GNUNET_free (eo->spec);
329     eo->spec = NULL;
330   }
331   GNUNET_free (eo);
332
333   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
334
335   /* FIXME: do a garbage collection of the set generations */
336 }
337
338
339 /**
340  * Inform the client that the intersection operation has failed,
341  * and proceed to destroy the evaluate operation.
342  *
343  * @param eo the intersection operation to fail
344  */
345 static void
346 fail_intersection_operation (struct OperationState *eo)
347 {
348   struct GNUNET_MQ_Envelope *ev;
349   struct GNUNET_SET_ResultMessage *msg;
350
351   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
352   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
353   msg->request_id = htonl (eo->spec->client_request_id);
354   msg->element_type = htons (0);
355   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
356   intersection_operation_destroy (eo);
357 }
358
359
360 /**
361  * Derive the IBF key from a hash code and
362  * a salt.
363  *
364  * @param src the hash code
365  * @param salt salt to use
366  * @return the derived IBF key
367  */
368 static struct IBF_Key
369 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
370 {
371   struct IBF_Key key;
372
373   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
374                       GCRY_MD_SHA512, GCRY_MD_SHA256,
375                       src, sizeof *src,
376                       &salt, sizeof (salt),
377                       NULL, 0);
378   return key;
379 }
380
381
382 /**
383  * Send a request for the evaluate operation to a remote peer
384  *
385  * @param eo operation with the other peer
386  */
387 static void
388 send_operation_request (struct OperationState *eo)
389 {
390   struct GNUNET_MQ_Envelope *ev;
391   struct OperationRequestMessage *msg;
392
393   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
394                                 eo->spec->context_msg);
395
396   if (NULL == ev)
397   {
398     /* the context message is too large */
399     GNUNET_break (0);
400     GNUNET_SERVER_client_disconnect (eo->spec->set->client);
401     return;
402   }
403   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
404   msg->app_id = eo->spec->app_id;
405   msg->salt = htonl (eo->spec->salt);
406   GNUNET_MQ_send (eo->mq, ev);
407
408   if (NULL != eo->spec->context_msg)
409     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
410   else
411     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
412
413   if (NULL != eo->spec->context_msg)
414   {
415     GNUNET_free (eo->spec->context_msg);
416     eo->spec->context_msg = NULL;
417   }
418
419 }
420
421
422 /**
423  * Iterator to create the mapping between ibf keys
424  * and element entries.
425  *
426  * @param cls closure
427  * @param key current key code
428  * @param value value in the hash map
429  * @return GNUNET_YES if we should continue to
430  *         iterate,
431  *         GNUNET_NO if not.
432  */
433 static int
434 op_register_element_iterator (void *cls,
435                          uint32_t key,
436                          void *value)
437 {
438   struct KeyEntry *const new_k = cls;
439   struct KeyEntry *old_k = value;
440
441   GNUNET_assert (NULL != old_k);
442   do
443   {
444     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
445     {
446       new_k->next_colliding = old_k->next_colliding;
447       old_k->next_colliding = new_k;
448       return GNUNET_NO;
449     }
450     old_k = old_k->next_colliding;
451   } while (NULL != old_k);
452   return GNUNET_YES;
453 }
454
455
456 /**
457  * Insert an element into the intersection operation's
458  * key-to-element mapping. Takes ownership of 'ee'.
459  * Note that this does not insert the element in the set,
460  * only in the operation's key-element mapping.
461  * This is done to speed up re-tried operations, if some elements
462  * were transmitted, and then the IBF fails to decode.
463  *
464  * @param eo the intersection operation
465  * @param ee the element entry
466  */
467 static void
468 op_register_element (struct OperationState *eo, struct ElementEntry *ee)
469 {
470   int ret;
471   struct IBF_Key ibf_key;
472   struct KeyEntry *k;
473
474   ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
475   k = GNUNET_new (struct KeyEntry);
476   k->element = ee;
477   k->ibf_key = ibf_key;
478   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
479                                                       (uint32_t) ibf_key.key_val,
480                                                       op_register_element_iterator, k);
481
482   /* was the element inserted into a colliding bucket? */
483   if (GNUNET_SYSERR == ret)
484     return;
485
486   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
487                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
488 }
489
490
491 /**
492  * Insert a key into an ibf.
493  *
494  * @param cls the ibf
495  * @param key unused
496  * @param value the key entry to get the key from
497  */
498 static int
499 prepare_ibf_iterator (void *cls,
500                       uint32_t key,
501                       void *value)
502 {
503   struct InvertibleBloomFilter *ibf = cls;
504   struct KeyEntry *ke = value;
505
506   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val);
507
508   ibf_insert (ibf, ke->ibf_key);
509   return GNUNET_YES;
510 }
511
512
513 /**
514  * Iterator for initializing the
515  * key-to-element mapping of a intersection operation
516  *
517  * @param cls the intersection operation
518  * @param key unised
519  * @param value the element entry to insert
520  *        into the key-to-element mapping
521  * @return GNUNET_YES to continue iterating,
522  *         GNUNET_NO to stop
523  */
524 static int
525 init_key_to_element_iterator (void *cls,
526                               const struct GNUNET_HashCode *key,
527                               void *value)
528 {
529   struct OperationState *eo = cls;
530   struct ElementEntry *e = value;
531
532   /* make sure that the element belongs to the set at the time
533    * of creating the operation */
534   if ( (e->generation_added > eo->generation_created) ||
535        ( (GNUNET_YES == e->removed) &&
536          (e->generation_removed < eo->generation_created)))
537     return GNUNET_YES;
538
539   GNUNET_assert (GNUNET_NO == e->remote);
540
541   op_register_element (eo, e);
542   return GNUNET_YES;
543 }
544
545
546 /**
547  * Create an ibf with the operation's elements
548  * of the specified size
549  *
550  * @param eo the intersection operation
551  * @param size size of the ibf to create
552  */
553 static void
554 prepare_ibf (struct OperationState *eo, uint16_t size)
555 {
556   if (NULL == eo->key_to_element)
557   {
558     unsigned int len;
559     len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements);
560     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
561     GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements,
562                                            init_key_to_element_iterator, eo);
563   }
564   if (NULL != eo->local_ibf)
565     ibf_destroy (eo->local_ibf);
566   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
567   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
568                                            prepare_ibf_iterator, eo->local_ibf);
569 }
570
571
572 /**
573  * Send an ibf of appropriate size.
574  *
575  * @param eo the intersection operation
576  * @param ibf_order order of the ibf to send, size=2^order
577  */
578 static void
579 send_ibf (struct OperationState *eo, uint16_t ibf_order)
580 {
581   unsigned int buckets_sent = 0;
582   struct InvertibleBloomFilter *ibf;
583
584   prepare_ibf (eo, 1<<ibf_order);
585
586   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
587
588   ibf = eo->local_ibf;
589
590   while (buckets_sent < (1 << ibf_order))
591   {
592     unsigned int buckets_in_message;
593     struct GNUNET_MQ_Envelope *ev;
594     struct IBFMessage *msg;
595
596     buckets_in_message = (1 << ibf_order) - buckets_sent;
597     /* limit to maximum */
598     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
599       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
600
601     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
602                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
603     msg->reserved = 0;
604     msg->order = ibf_order;
605     msg->offset = htons (buckets_sent);
606     ibf_write_slice (ibf, buckets_sent,
607                      buckets_in_message, &msg[1]);
608     buckets_sent += buckets_in_message;
609     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
610                 buckets_in_message, buckets_sent, 1<<ibf_order);
611     GNUNET_MQ_send (eo->mq, ev);
612   }
613
614   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
615 }
616
617
618 /**
619  * Send a strata estimator to the remote peer.
620  *
621  * @param eo the intersection operation with the remote peer
622  */
623 static void
624 send_strata_estimator (struct OperationState *eo)
625 {
626   struct GNUNET_MQ_Envelope *ev;
627   struct GNUNET_MessageHeader *strata_msg;
628
629   ev = GNUNET_MQ_msg_header_extra (strata_msg,
630                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
631                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
632   strata_estimator_write (eo->set->state->se, &strata_msg[1]);
633   GNUNET_MQ_send (eo->mq, ev);
634   eo->phase = PHASE_EXPECT_BF;
635   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
636 }
637
638
639 /**
640  * Compute the necessary order of an ibf
641  * from the size of the symmetric set difference.
642  *
643  * @param diff the difference
644  * @return the required size of the ibf
645  */
646 static unsigned int
647 get_order_from_difference (unsigned int diff)
648 {
649   unsigned int ibf_order;
650
651   ibf_order = 2;
652   while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM)
653     ibf_order++;
654   if (ibf_order > MAX_IBF_ORDER)
655     ibf_order = MAX_IBF_ORDER;
656   return ibf_order;
657 }
658
659
660 /**
661  * Handle a strata estimator from a remote peer
662  *
663  * @param cls the intersection operation
664  * @param mh the message
665  */
666 static void
667 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
668 {
669   struct OperationState *eo = cls;
670   struct StrataEstimator *remote_se;
671   int diff;
672
673   if (eo->phase != PHASE_EXPECT_SE)
674   {
675     fail_intersection_operation (eo);
676     GNUNET_break (0);
677     return;
678   }
679   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
680                                        SE_IBF_HASH_NUM);
681   strata_estimator_read (&mh[1], remote_se);
682   GNUNET_assert (NULL != eo->se);
683   diff = strata_estimator_difference (remote_se, eo->se);
684   strata_estimator_destroy (remote_se);
685   strata_estimator_destroy (eo->se);
686   eo->se = NULL;
687   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
688               diff, 1<<get_order_from_difference (diff));
689   send_ibf (eo, get_order_from_difference (diff));
690 }
691
692
693
694 /**
695  * Iterator to send elements to a remote peer
696  *
697  * @param cls closure with the element key and the intersection operation
698  * @param key ignored
699  * @param value the key entry
700  */
701 static int
702 send_element_iterator (void *cls,
703                        uint32_t key,
704                        void *value)
705 {
706   struct SendElementClosure *sec = cls;
707   struct IBF_Key ibf_key = sec->ibf_key;
708   struct OperationState *eo = sec->eo;
709   struct KeyEntry *ke = value;
710
711   if (ke->ibf_key.key_val != ibf_key.key_val)
712     return GNUNET_YES;
713   while (NULL != ke)
714   {
715     const struct GNUNET_SET_Element *const element = &ke->element->element;
716     struct GNUNET_MQ_Envelope *ev;
717     struct GNUNET_MessageHeader *mh;
718
719     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
720     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
721     if (NULL == ev)
722     {
723       /* element too large */
724       GNUNET_break (0);
725       continue;
726     }
727     memcpy (&mh[1], element->data, element->size);
728     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
729                 GNUNET_h2s (&ke->element->element_hash));
730     GNUNET_MQ_send (eo->mq, ev);
731     ke = ke->next_colliding;
732   }
733   return GNUNET_NO;
734 }
735
736 /**
737  * Send all elements that have the specified IBF key
738  * to the remote peer of the intersection operation
739  *
740  * @param eo intersection operation
741  * @param ibf_key IBF key of interest
742  */
743 static void
744 send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
745 {
746   struct SendElementClosure send_cls;
747
748   send_cls.ibf_key = ibf_key;
749   send_cls.eo = eo;
750   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
751                                                 &send_element_iterator, &send_cls);
752 }
753
754
755 /**
756  * Decode which elements are missing on each side, and
757  * send the appropriate elemens and requests
758  *
759  * @param eo intersection operation
760  */
761 static void
762 decode_and_send (struct OperationState *eo)
763 {
764   struct IBF_Key key;
765   struct IBF_Key last_key;
766   int side;
767   unsigned int num_decoded;
768   struct InvertibleBloomFilter *diff_ibf;
769
770   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
771
772   prepare_ibf (eo, eo->remote_ibf->size);
773   diff_ibf = ibf_dup (eo->local_ibf);
774   ibf_subtract (diff_ibf, eo->remote_ibf);
775
776   ibf_destroy (eo->remote_ibf);
777   eo->remote_ibf = NULL;
778
779   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
780
781   num_decoded = 0;
782   last_key.key_val = 0;
783
784   while (1)
785   {
786     int res;
787     int cycle_detected = GNUNET_NO;
788
789     last_key = key;
790
791     res = ibf_decode (diff_ibf, &side, &key);
792     if (res == GNUNET_OK)
793     {
794       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
795                   key.key_val);
796       num_decoded += 1;
797       if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
798       {
799         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
800                     num_decoded, diff_ibf->size);
801         cycle_detected = GNUNET_YES;
802       }
803     }
804     if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
805     {
806       int next_order;
807       next_order = 0;
808       while (1<<next_order < diff_ibf->size)
809         next_order++;
810       next_order++;
811       if (next_order <= MAX_IBF_ORDER)
812       {
813         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
814                     "decoding failed, sending larger ibf (size %u)\n",
815                     1<<next_order);
816         send_ibf (eo, next_order);
817       }
818       else
819       {
820         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
821                     "set intersection failed: reached ibf limit\n");
822       }
823       break;
824     }
825     if (GNUNET_NO == res)
826     {
827       struct GNUNET_MQ_Envelope *ev;
828
829       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
830       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
831       GNUNET_MQ_send (eo->mq, ev);
832       break;
833     }
834     if (1 == side)
835     {
836       send_elements_for_key (eo, key);
837     }
838     else if (-1 == side)
839     {
840       struct GNUNET_MQ_Envelope *ev;
841       struct GNUNET_MessageHeader *msg;
842
843       /* FIXME: before sending the request, check if we may just have the element */
844       /* FIXME: merge multiple requests */
845       /* FIXME: remember somewhere that we already requested the element,
846        * so that we don't request it again with the next ibf if decoding fails */
847       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
848                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
849
850       *(struct IBF_Key *) &msg[1] = key;
851       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
852       GNUNET_MQ_send (eo->mq, ev);
853     }
854     else
855     {
856       GNUNET_assert (0);
857     }
858   }
859   ibf_destroy (diff_ibf);
860 }
861
862
863 /**
864  * Handle an IBF message from a remote peer.
865  *
866  * @param cls the intersection operation
867  * @param mh the header of the message
868  */
869 static void
870 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
871 {
872   struct OperationState *eo = cls;
873   struct IBFMessage *msg = (struct IBFMessage *) mh;
874   unsigned int buckets_in_message;
875
876   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
877        (eo->phase == PHASE_EXPECT_BF) )
878   {
879     eo->phase = PHASE_EXPECT_BF_CONT;
880     GNUNET_assert (NULL == eo->remote_ibf);
881     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
882     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
883     eo->ibf_buckets_received = 0;
884     if (0 != ntohs (msg->offset))
885     {
886       GNUNET_break (0);
887       fail_intersection_operation (eo);
888       return;
889     }
890   }
891   else if (eo->phase == PHASE_EXPECT_BF_CONT)
892   {
893     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
894          (1<<msg->order != eo->remote_ibf->size) )
895     {
896       GNUNET_break (0);
897       fail_intersection_operation (eo);
898       return;
899     }
900   }
901
902   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
903
904   if (0 == buckets_in_message)
905   {
906     GNUNET_break_op (0);
907     fail_intersection_operation (eo);
908     return;
909   }
910
911   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
912   {
913     GNUNET_break (0);
914     fail_intersection_operation (eo);
915     return;
916   }
917
918   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
919   eo->ibf_buckets_received += buckets_in_message;
920
921   if (eo->ibf_buckets_received == eo->remote_ibf->size)
922   {
923     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
924     eo->phase = PHASE_EXPECT_ELEMENTS;
925     decode_and_send (eo);
926   }
927 }
928
929
930 /**
931  * Send a result message to the client indicating
932  * that there is a new element.
933  *
934  * @param eo intersection operation
935  * @param element element to send
936  */
937 static void
938 send_client_element (struct OperationState *eo,
939                      struct GNUNET_SET_Element *element)
940 {
941   struct GNUNET_MQ_Envelope *ev;
942   struct GNUNET_SET_ResultMessage *rm;
943
944   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
945   GNUNET_assert (0 != eo->spec->client_request_id);
946   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
947   if (NULL == ev)
948   {
949     GNUNET_MQ_discard (ev);
950     GNUNET_break (0);
951     return;
952   }
953   rm->result_status = htons (GNUNET_SET_STATUS_OK);
954   rm->request_id = htonl (eo->spec->client_request_id);
955   rm->element_type = element->type;
956   memcpy (&rm[1], element->data, element->size);
957   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
958 }
959
960
961 /**
962  * Send a result message to the client indicating
963  * that the operation is over.
964  * After the result done message has been sent to the client,
965  * destroy the evaluate operation.
966  *
967  * @param eo intersection operation
968  */
969 static void
970 send_client_done_and_destroy (struct OperationState *eo)
971 {
972   struct GNUNET_MQ_Envelope *ev;
973   struct GNUNET_SET_ResultMessage *rm;
974
975   GNUNET_assert (GNUNET_NO == eo->client_done_sent);
976
977   eo->client_done_sent = GNUNET_YES;
978
979   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
980   rm->request_id = htonl (eo->spec->client_request_id);
981   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
982   rm->element_type = htons (0);
983   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
984
985   intersection_operation_destroy (eo);
986 }
987
988
989 /**
990  * Handle an element message from a remote peer.
991  *
992  * @param cls the intersection operation
993  * @param mh the message
994  */
995 static void
996 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
997 {
998   struct OperationState *eo = cls;
999   struct ElementEntry *ee;
1000   uint16_t element_size;
1001
1002   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1003
1004   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1005        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1006   {
1007     fail_intersection_operation (eo);
1008     GNUNET_break (0);
1009     return;
1010   }
1011   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1012   ee = GNUNET_malloc (sizeof *ee + element_size);
1013   memcpy (&ee[1], &mh[1], element_size);
1014   ee->element.size = element_size;
1015   ee->element.data = &ee[1];
1016   ee->remote = GNUNET_YES;
1017   GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1018
1019   /* FIXME: see if the element has already been inserted! */
1020
1021   op_register_element (eo, ee);
1022   send_client_element (eo, &ee->element);
1023 }
1024
1025
1026 /**
1027  * Handle an element request from a remote peer.
1028  *
1029  * @param cls the intersection operation
1030  * @param mh the message
1031  */
1032 static void
1033 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1034 {
1035   struct OperationState *eo = cls;
1036   struct IBF_Key *ibf_key;
1037   unsigned int num_keys;
1038
1039   /* look up elements and send them */
1040   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1041   {
1042     GNUNET_break (0);
1043     fail_intersection_operation (eo);
1044     return;
1045   }
1046
1047   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1048
1049   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1050   {
1051     GNUNET_break (0);
1052     fail_intersection_operation (eo);
1053     return;
1054   }
1055
1056   ibf_key = (struct IBF_Key *) &mh[1];
1057   while (0 != num_keys--)
1058   {
1059     send_elements_for_key (eo, *ibf_key);
1060     ibf_key++;
1061   }
1062 }
1063
1064
1065 /**
1066  * Handle a done message from a remote peer
1067  *
1068  * @param cls the intersection operation
1069  * @param mh the message
1070  */
1071 static void
1072 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1073 {
1074   struct OperationState *eo = cls;
1075   struct GNUNET_MQ_Envelope *ev;
1076
1077   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1078   {
1079     /* we got all requests, but still have to send our elements as response */
1080
1081     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1082     eo->phase = PHASE_FINISHED;
1083     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1084     GNUNET_MQ_send (eo->mq, ev);
1085     return;
1086   }
1087   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1088   {
1089     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1090     eo->phase = PHASE_FINISHED;
1091     send_client_done_and_destroy (eo);
1092     return;
1093   }
1094   GNUNET_break (0);
1095   fail_intersection_operation (eo);
1096 }
1097
1098
1099 /**
1100  * Evaluate a intersection operation with
1101  * a remote peer.
1102  *
1103  * @param spec specification of the operation the evaluate
1104  * @param tunnel tunnel already connected to the partner peer
1105  * @param tc tunnel context, passed here so all new incoming
1106  *        messages are directly going to the intersection operations
1107  * @return a handle to the operation
1108  */
1109 static void
1110 intersection_evaluate (struct OperationSpecification *spec,
1111                 struct GNUNET_MESH_Tunnel *tunnel,
1112                 struct TunnelContext *tc)
1113 {
1114   struct OperationState *eo;
1115
1116   eo = GNUNET_new (struct OperationState);
1117   tc->vt = _GSS_intersection_vt ();
1118   tc->op = eo;
1119   eo->se = strata_estimator_dup (spec->set->state->se);
1120   eo->generation_created = spec->set->current_generation++;
1121   eo->set = spec->set;
1122   eo->spec = spec;
1123   eo->tunnel = tunnel;
1124   eo->mq = GNUNET_MESH_mq_create (tunnel);
1125
1126   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127               "evaluating intersection operation, (app %s)\n",
1128               GNUNET_h2s (&eo->spec->app_id));
1129
1130   /* we started the operation, thus we have to send the operation request */
1131   eo->phase = PHASE_EXPECT_SE;
1132
1133   GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1134                                eo->set->state->ops_tail,
1135                                eo);
1136
1137   send_operation_request (eo);
1138 }
1139
1140
1141 /**
1142  * Accept an intersection operation request from a remote peer
1143  *
1144  * @param spec all necessary information about the operation
1145  * @param tunnel open tunnel to the partner's peer
1146  * @param tc tunnel context, passed here so all new incoming
1147  *        messages are directly going to the intersection operations
1148  * @return operation
1149  */
1150 static void
1151 intersection_accept (struct OperationSpecification *spec,
1152               struct GNUNET_MESH_Tunnel *tunnel,
1153               struct TunnelContext *tc)
1154 {
1155   struct OperationState *eo;
1156
1157   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set intersection operation\n");
1158
1159   eo = GNUNET_new (struct OperationState);
1160   tc->vt = _GSS_intersection_vt ();
1161   tc->op = eo;
1162   eo->set = spec->set;
1163   eo->generation_created = eo->set->current_generation++;
1164   eo->spec = spec;
1165   eo->tunnel = tunnel;
1166   eo->mq = GNUNET_MESH_mq_create (tunnel);
1167   eo->se = strata_estimator_dup (eo->set->state->se);
1168   /* transfer ownership of mq and socket from incoming to eo */
1169   GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1170                                eo->set->state->ops_tail,
1171                                eo);
1172   /* kick off the operation */
1173   send_strata_estimator (eo);
1174 }
1175
1176
1177 /**
1178  * Create a new set supporting the intersection operation
1179  *
1180  * @return the newly created set
1181  */
1182 static struct SetState *
1183 intersection_set_create (void)
1184 {
1185   struct SetState *set_state;
1186
1187   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
1188
1189   set_state = GNUNET_new (struct SetState);
1190   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1191                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);
1192   return set_state;
1193 }
1194
1195
1196 /**
1197  * Add the element from the given element message to the set.
1198  *
1199  * @param set_state state of the set want to add to
1200  * @param ee the element to add to the set
1201  */
1202 static void
1203 intersection_add (struct SetState *set_state, struct ElementEntry *ee)
1204 {
1205   strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1206 }
1207
1208
1209 /**
1210  * Destroy a set that supports the intersection operation
1211  *
1212  * @param set_state the set to destroy
1213  */
1214 static void
1215 intersection_set_destroy (struct SetState *set_state)
1216 {
1217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection set\n");
1218   /* important to destroy operations before the rest of the set */
1219   while (NULL != set_state->ops_head)
1220     intersection_operation_destroy (set_state->ops_head);
1221   if (NULL != set_state->se)
1222   {
1223     strata_estimator_destroy (set_state->se);
1224     set_state->se = NULL;
1225   }
1226   GNUNET_free (set_state);
1227 }
1228
1229
1230 /**
1231  * Remove the element given in the element message from the set.
1232  * Only marks the element as removed, so that older set operations can still exchange it.
1233  *
1234  * @param set_state state of the set to remove from
1235  * @param element set element to remove
1236  */
1237 static void
1238 intersection_remove (struct SetState *set_state, struct ElementEntry *element)
1239 {
1240   /* FIXME: remove from strata estimator */
1241 }
1242
1243
1244 /**
1245  * Dispatch messages for a intersection operation.
1246  *
1247  * @param eo the state of the intersection evaluate operation
1248  * @param mh the received message
1249  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1250  *         GNUNET_OK otherwise
1251  */
1252 int
1253 intersection_handle_p2p_message (struct OperationState *eo,
1254                           const struct GNUNET_MessageHeader *mh)
1255 {
1256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1257               ntohs (mh->type), ntohs (mh->size));
1258   switch (ntohs (mh->type))
1259   {
1260     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1261       handle_p2p_ibf (eo, mh);
1262       break;
1263     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1264       handle_p2p_strata_estimator (eo, mh);
1265       break;
1266     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1267       handle_p2p_elements (eo, mh);
1268       break;
1269     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1270       handle_p2p_element_requests (eo, mh);
1271       break;
1272     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1273       handle_p2p_done (eo, mh);
1274       break;
1275     default:
1276       /* something wrong with mesh's message handlers? */
1277       GNUNET_assert (0);
1278   }
1279   return GNUNET_OK;
1280 }
1281
1282
1283 static void
1284 intersection_peer_disconnect (struct OperationState *op)
1285 {
1286   /* Are we already disconnected? */
1287   if (NULL == op->tunnel)
1288     return;
1289   op->tunnel = NULL;
1290   if (NULL != op->mq)
1291   {
1292     GNUNET_MQ_destroy (op->mq);
1293     op->mq = NULL;
1294   }
1295   if (PHASE_FINISHED != op->phase)
1296   {
1297     struct GNUNET_MQ_Envelope *ev;
1298     struct GNUNET_SET_ResultMessage *msg;
1299
1300     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1301     msg->request_id = htonl (op->spec->client_request_id);
1302     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1303     msg->element_type = htons (0);
1304     GNUNET_MQ_send (op->spec->set->client_mq, ev);
1305     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1306     intersection_operation_destroy (op);
1307     return;
1308   }
1309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1310   if (GNUNET_NO == op->client_done_sent)
1311     send_client_done_and_destroy (op);
1312 }
1313
1314
1315 static void
1316 intersection_op_cancel (struct SetState *set_state, uint32_t op_id)
1317 {
1318   /* FIXME: implement */
1319 }
1320
1321
1322 const struct SetVT *
1323 _GSS_intersection_vt ()
1324 {
1325   static const struct SetVT intersection_vt = {
1326     .create = &intersection_set_create,
1327     .msg_handler = &intersection_handle_p2p_message,
1328     .add = &intersection_add,
1329     .remove = &intersection_remove,
1330     .destroy_set = &intersection_set_destroy,
1331     .evaluate = &intersection_evaluate,
1332     .accept = &intersection_accept,
1333     .peer_disconnect = &intersection_peer_disconnect,
1334     .cancel = &intersection_op_cancel,
1335   };
1336
1337   return &intersection_vt;
1338 }