work on gnunet-set, isolated bug in stream
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 enum UnionOperationState
64 {
65   STATE_EXPECT_SE,
66   STATE_EXPECT_IBF,
67   STATE_EXPECT_IBF_CONT,
68   STATE_EXPECT_ELEMENTS,
69   STATE_EXPECT_ELEMENTS_AND_REQUESTS,
70   STATE_WAIT_SENT_DONE,
71   STATE_FINISHED
72 };
73
74
75 /**
76  * State of an evaluate operation
77  * with another peer.
78  */
79 struct UnionEvaluateOperation
80 {
81   /**
82    * Local set the operation is evaluated on
83    */
84   struct Set *set;
85
86   /**
87    * Peer with the remote set
88    */
89   struct GNUNET_PeerIdentity peer;
90
91   /**
92    * Application-specific identifier
93    */
94   struct GNUNET_HashCode app_id;
95
96   /**
97    * Context message, given to us
98    * by the client, may be NULL.
99    */
100   struct GNUNET_MessageHeader *context_msg;
101
102   /**
103    * Stream socket connected to the other peer
104    */
105   struct GNUNET_STREAM_Socket *socket;
106
107   /**
108    * Message queue for the peer on the other
109    * end
110    */
111   struct GNUNET_MQ_MessageQueue *mq;
112
113   /**
114    * Type of this operation
115    */
116   enum GNUNET_SET_OperationType operation;
117
118   /**
119    * GNUNET_YES if we started the operation,
120    * GNUNET_NO if the other peer started it.
121    */
122   int is_outgoing;
123
124   /**
125    * Request id, so we can use one client handle
126    * for multiple operations
127    */
128   uint32_t request_id;
129
130   /* last difference estimate */
131   unsigned int diff;
132
133   /**
134    * Number of ibf buckets received
135    */
136   unsigned int ibf_buckets_received;
137
138   /**
139    * Current salt in use, zero unless
140    * we detected a collision
141    */
142   uint8_t salt;
143
144   /**
145    * order of the ibf we receive
146    */
147   unsigned int ibf_order;
148
149   struct StrataEstimator *se;
150
151   /**
152    * The ibf we currently receive
153    */
154   struct InvertibleBloomFilter *remote_ibf;
155
156   /**
157    * Array of IBFs, some of them pre-allocated
158    */
159   struct InvertibleBloomFilter *local_ibf;
160
161   /**
162    * Elements we received from the other peer.
163    */
164   struct GNUNET_CONTAINER_MultiHashMap *received_elements;
165
166   /**
167    * Maps IBF-Keys (specific to the current salt) to elements.
168    */
169   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
170
171   /**
172    * Current state of the operation
173    */
174   enum UnionOperationState state;
175   
176   /**
177    * Evaluate operations are held in
178    * a linked list.
179    */
180   struct UnionEvaluateOperation *next;
181   
182    /**
183    * Evaluate operations are held in
184    * a linked list.
185    */
186   struct UnionEvaluateOperation *prev;
187 };
188
189 /**
190  * Information about the element in a set.
191  * All elements are stored in a hash-table
192  * from their hash-code to their 'struct Element',
193  * so that the remove and add operations are reasonably
194  * fast.
195  */
196 struct ElementEntry
197 {
198   /**
199    * The actual element. The data for the element
200    * should be allocated at the end of this struct.
201    */
202   struct GNUNET_SET_Element element;
203
204   /**
205    * Hash of the element.
206    * Will be used to derive the different IBF keys
207    * for different salts.
208    */
209   struct GNUNET_HashCode element_hash;
210
211   /**
212    * Generation the element was added.
213    * Operations of earlier generations will not consider the element.
214    */
215   int generation_add;
216
217   /**
218    * Generation this element was removed.
219    * Operations of later generations will not consider the element.
220    */
221   int generation_remove;
222
223   /**
224    * GNUNET_YES if we received the element from a remote peer, and not
225    * from the local peer.  Note that if the local client inserts an
226    * element *after* we got it from a remote peer, the element is
227    * considered local.
228    */
229   int remote;
230 };
231
232 /**
233  * Information about the element used for 
234  * a specific union operation.
235  */
236 struct KeyEntry
237 {
238   struct IBF_Key ibf_key;
239
240   /**
241    * The actual element associated with the key
242    */
243   struct ElementEntry *element;
244
245   /**
246    * Element that collides with this element
247    * on the ibf key
248    */
249   struct KeyEntry *next_colliding;
250 };
251
252
253 /**
254  * Extra state required for efficient set union.
255  */
256 struct UnionState
257 {
258   /**
259    * The strata estimator is only generated once for
260    * each set.
261    */
262   struct StrataEstimator *se;
263
264   /**
265    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
266    */
267   struct GNUNET_CONTAINER_MultiHashMap *elements;
268
269   /**
270    * Evaluate operations are held in
271    * a linked list.
272    */
273   struct UnionEvaluateOperation *ops_head;
274
275   /**
276    * Evaluate operations are held in
277    * a linked list.
278    */
279   struct UnionEvaluateOperation *ops_tail;
280 };
281
282
283 static struct IBF_Key
284 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
285 {
286    
287   struct IBF_Key key;
288   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
289                       GCRY_MD_SHA512, GCRY_MD_SHA256,
290                       src, sizeof *src,
291                       &salt, sizeof (salt),
292                       NULL, 0);
293   return key;
294 }
295
296
297 static void
298 send_operation_request (struct UnionEvaluateOperation *eo)
299 {
300   struct GNUNET_MQ_Message *mqm;
301   struct OperationRequestMessage *msg;
302   int ret;
303
304   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
305   ret = GNUNET_MQ_nest (mqm, eo->context_msg);
306   if (GNUNET_OK != ret)
307   {
308     /* the context message is too large */
309     _GSS_client_disconnect (eo->set->client);
310     GNUNET_MQ_discard (mqm);
311     GNUNET_break (0);
312     return;
313   }
314   msg->operation = eo->operation;
315   msg->app_id = eo->app_id;
316   GNUNET_MQ_send (eo->mq, mqm);
317
318   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
319 }
320
321
322 /**
323  * Iterator to create the mapping between ibf keys
324  * and element entries.
325  *
326  * @param cls closure
327  * @param key current key code
328  * @param value value in the hash map
329  * @return GNUNET_YES if we should continue to
330  *         iterate,
331  *         GNUNET_NO if not.
332  */
333 static int
334 insert_element_iterator (void *cls,
335                          uint32_t key,
336                          void *value)
337 {
338   struct KeyEntry *const new_k = cls;
339   struct KeyEntry *old_k = value;
340
341   GNUNET_assert (NULL != old_k);
342   do
343   {
344     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
345     {
346       new_k->next_colliding = old_k;
347       old_k->next_colliding = new_k;
348       return GNUNET_NO;
349     }
350     old_k = old_k->next_colliding;
351   } while (NULL != old_k);
352   return GNUNET_YES;
353 }
354
355
356 static void
357 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
358 {
359   int ret;
360   struct IBF_Key ibf_key;
361   struct KeyEntry *k;
362
363   ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
364   k = GNUNET_new (struct KeyEntry);
365   k->element = ee;
366   k->ibf_key = ibf_key;
367   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
368                                                     insert_element_iterator, k);
369   /* was the element inserted into a colliding bucket? */
370   if (GNUNET_SYSERR == ret)
371   {
372     GNUNET_assert (NULL != k->next_colliding);
373     return;
374   }
375   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
376                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
377   if (NULL != eo->local_ibf)
378     ibf_insert (eo->local_ibf, ibf_key);
379 }
380
381
382 static int
383 prepare_ibf_iterator (void *cls,
384                      uint32_t key,
385                      void *value)
386 {
387   struct InvertibleBloomFilter *ibf = cls;
388   struct KeyEntry *ke = value;
389
390   ibf_insert (ibf, ke->ibf_key);
391   return GNUNET_YES;
392 }
393
394 static int
395 init_key_to_element_iterator (void *cls,
396                               const struct GNUNET_HashCode *key,
397                               void *value)
398 {
399   struct UnionEvaluateOperation *eo = cls;
400   struct ElementEntry *e = value;
401
402   insert_element (eo, e);
403   return GNUNET_YES;
404 }
405
406 static void
407 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
408 {
409   if (NULL == eo->key_to_element)
410   {
411     unsigned int len;
412     len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
413     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len);
414     GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
415                                              init_key_to_element_iterator, eo);
416   }
417   if (NULL != eo->local_ibf)
418     ibf_destroy (eo->local_ibf);
419   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
420   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, prepare_ibf_iterator, eo->local_ibf);
421 }
422
423
424 /**
425  * Send an ibf of appropriate size.
426  *
427  * @param cpi the peer
428  */
429 static void
430 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
431 {
432   unsigned int buckets_sent = 0;
433   struct InvertibleBloomFilter *ibf;
434
435   prepare_ibf (eo, ibf_order);
436
437   ibf = eo->local_ibf;
438
439   while (buckets_sent < (1 << ibf_order))
440   {
441     unsigned int buckets_in_message;
442     struct GNUNET_MQ_Message *mqm;
443     struct IBFMessage *msg;
444
445     buckets_in_message = (1 << ibf_order) - buckets_sent;
446     /* limit to maximum */
447     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
448       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
449
450     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
451                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
452     msg->order = htons (ibf_order);
453     msg->offset = htons (buckets_sent);
454     ibf_write_slice (ibf, buckets_sent,
455                      buckets_in_message, &msg[1]);
456     buckets_sent += buckets_in_message;
457     GNUNET_MQ_send (eo->mq, mqm);
458   }
459
460   eo->state = STATE_EXPECT_ELEMENTS_AND_REQUESTS;
461 }
462
463
464 /**
465  * Send a strata estimator.
466  *
467  * @param cpi the peer
468  */
469 static void
470 send_strata_estimator (struct UnionEvaluateOperation *eo)
471 {
472   struct GNUNET_MQ_Message *mqm;
473   struct GNUNET_MessageHeader *strata_msg;
474
475   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
476                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
477                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
478   strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
479   GNUNET_MQ_send (eo->mq, mqm);
480   eo->state = STATE_EXPECT_IBF;
481 }
482
483
484 static void
485 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
486 {
487   struct UnionEvaluateOperation *eo = cls;
488   struct StrataEstimator *remote_se;
489   int ibf_order;
490   int diff;
491
492   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n");
493
494   if (eo->state != STATE_EXPECT_SE)
495   {
496     /* FIXME: handle */
497     GNUNET_break (0);
498     return;
499   }
500   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
501                                        SE_IBF_HASH_NUM);
502   strata_estimator_read (&mh[1], remote_se);
503   GNUNET_assert (NULL != eo->se);
504   diff = strata_estimator_difference (remote_se, eo->se);
505   strata_estimator_destroy (remote_se);
506   strata_estimator_destroy (eo->se);
507   eo->se = NULL;
508   /* minimum order */
509   ibf_order = 2;
510   while ((1<<ibf_order) < (2 * diff))
511     ibf_order++;
512   if (ibf_order > MAX_IBF_ORDER)
513     ibf_order = MAX_IBF_ORDER;
514   send_ibf (eo, ibf_order);
515 }
516
517
518 /**
519  * FIXME
520  *
521  * @param
522  */
523 static void
524 decode (struct UnionEvaluateOperation *eo)
525 {
526   struct IBF_Key key;
527   int side;
528   struct InvertibleBloomFilter *diff_ibf;
529
530   GNUNET_assert (STATE_EXPECT_ELEMENTS == eo->state);
531
532   prepare_ibf (eo, eo->ibf_order);
533   diff_ibf = ibf_dup (eo->local_ibf);
534   ibf_subtract (diff_ibf, eo->remote_ibf);
535
536   while (1)
537   {
538     int res;
539
540     res = ibf_decode (diff_ibf, &side, &key);
541     if (GNUNET_SYSERR == res)
542     {
543       /* decoding failed, we tell the other peer by sending our ibf
544        * with a larger order */
545       GNUNET_assert (0);
546       return;
547     }
548     if (GNUNET_NO == res)
549     {
550       struct GNUNET_MQ_Message *mqm;
551       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
552       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
553       GNUNET_MQ_send (eo->mq, mqm);
554       return;
555     }
556     if (1 == side)
557     {
558       //struct ElementEntry *e;
559       /* we have the element(s), send it to the other peer */
560       //GNUNET_CONTAINER_multihashmap32_get_multiple (eo->set->state.u->elements,
561       //                                              (uint32_t) key.key_val);
562       /* FIXME */
563     }
564     else
565     {
566       struct GNUNET_MQ_Message *mqm;
567       struct GNUNET_MessageHeader *msg;
568
569       /* FIXME: before sending the request, check if we may just have the element */
570       /* FIXME: merge multiple requests */
571       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
572       *(struct IBF_Key *) &msg[1] = key;
573       GNUNET_MQ_send (eo->mq, mqm);
574     }
575   }
576 }
577
578
579 static void
580 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
581 {
582   struct UnionEvaluateOperation *eo = cls;
583   struct IBFMessage *msg = (struct IBFMessage *) mh;
584   unsigned int buckets_in_message;
585
586   if (eo->state == STATE_EXPECT_ELEMENTS_AND_REQUESTS)
587   {
588     /* check that the ibf is a new one / first part */
589     /* clear outgoing messages */
590     GNUNET_assert (0);
591   }
592   else if (eo->state == STATE_EXPECT_IBF)
593   {
594     eo->state = STATE_EXPECT_IBF_CONT;
595     eo->ibf_order = msg->order;
596     GNUNET_assert (NULL == eo->remote_ibf);
597     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
598     if (ntohs (msg->offset) != 0)
599     {
600       /* FIXME: handle */
601       GNUNET_assert (0);
602     }
603   }
604   else if (eo->state == STATE_EXPECT_IBF_CONT)
605   {
606     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
607          (msg->order != eo->ibf_order) )
608     {
609       /* FIXME: handle */
610       GNUNET_assert (0);
611     }
612   }
613
614   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
615
616   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
617   {
618     /* FIXME: handle, message was malformed */
619     GNUNET_assert (0);
620   }
621
622   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
623   eo->ibf_buckets_received += buckets_in_message;
624
625   if (eo->ibf_buckets_received == (1<<eo->ibf_order))
626   {
627     eo->state = STATE_EXPECT_ELEMENTS;
628     decode (eo);
629   }
630 }
631
632
633 static void
634 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
635 {
636   struct UnionEvaluateOperation *eo = cls;
637
638   if ( (eo->state != STATE_EXPECT_ELEMENTS) &&
639        (eo->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS) )
640   {
641     /* FIXME: handle */
642     GNUNET_break (0);
643     return;
644   }
645 }
646
647
648 static void
649 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
650 {
651   struct UnionEvaluateOperation *eo = cls;
652
653   /* look up elements and send them */
654   if (eo->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS)
655   {
656     /* FIXME: handle */
657     GNUNET_break (0);
658     return;
659   }
660 }
661
662
663 static void
664 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
665 {
666   GNUNET_break (0);
667 }
668
669
670 static const struct GNUNET_MQ_Handler union_handlers[] = {
671   {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
672   {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
673   {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
674   {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
675   {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
676   GNUNET_MQ_HANDLERS_END
677 };
678
679
680 /**
681  * Functions of this type will be called when a stream is established
682  * 
683  * @param cls the closure from GNUNET_STREAM_open
684  * @param socket socket to use to communicate with the
685  *        other side (read/write)
686  */
687 static void
688 stream_open_cb (void *cls,
689                 struct GNUNET_STREAM_Socket *socket)
690 {
691   struct UnionEvaluateOperation *eo = cls;
692
693   GNUNET_assert (NULL == eo->mq);
694   GNUNET_assert (socket == eo->socket);
695
696   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n");
697
698   eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket,
699                                               union_handlers, eo);
700   /* we started the operation, thus we have to send the operation request */
701   send_operation_request (eo);
702   eo->state = STATE_EXPECT_SE;
703 }
704         
705
706 void
707 _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
708 {
709   struct UnionEvaluateOperation *eo;
710
711   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n");
712
713   eo = GNUNET_new (struct UnionEvaluateOperation);
714   eo->peer = m->peer;
715   eo->set = set;
716   eo->socket = 
717       GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
718                           stream_open_cb, eo,
719                           GNUNET_STREAM_OPTION_END);
720 }
721
722
723 void
724 _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
725                    struct Incoming *incoming)
726 {
727   struct UnionEvaluateOperation *eo;
728
729   eo = GNUNET_new (struct UnionEvaluateOperation);
730   eo->set = set;
731   eo->peer = incoming->peer;
732   eo->app_id = incoming->app_id;
733   eo->salt = ntohs (incoming->salt);
734   eo->request_id = m->request_id;
735   eo->set = set;
736   eo->mq = incoming->mq;
737   /* the peer's socket is now ours, we'll receive all messages */
738   GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
739   /* kick of the operation */
740   send_strata_estimator (eo);
741 }
742
743
744 struct Set *
745 _GSS_union_set_create (void)
746 {
747   struct Set *set;
748
749   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n");
750   
751   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
752   set->state.u = (struct UnionState *) &set[1];
753   set->operation = GNUNET_SET_OPERATION_UNION;
754   set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
755                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
756   return set;
757 }
758
759
760
761 void
762 _GSS_union_add (struct ElementMessage *m, struct Set *set)
763 {
764   struct ElementEntry *ee;
765   struct ElementEntry *ee_dup;
766   uint16_t element_size;
767   
768   element_size = ntohs (m->header.size) - sizeof *m;
769   ee = GNUNET_malloc (element_size + sizeof *ee);
770   ee->element.size = element_size;
771   ee->element.data = &ee[1];
772   memcpy (ee->element.data, &m[1], element_size);
773   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
774   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
775   if (NULL != ee_dup)
776   {
777     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
778     GNUNET_free (ee);
779     return;
780   }
781   GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
782                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
783   strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
784 }
785