-comments
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "set_protocol.h"
30 #include "ibf.h"
31 #include "strata_estimator.h"
32
33
34 /**
35  * Number of IBFs in a strata estimator.
36  */
37 #define SE_STRATA_COUNT 32
38 /**
39  * Size of the IBFs in the strata estimator.
40  */
41 #define SE_IBF_SIZE 80
42 /**
43  * hash num parameter for the difference digests and strata estimators
44  */
45 #define SE_IBF_HASH_NUM 3
46
47 /**
48  * Number of buckets that can be transmitted in one message.
49  */
50 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
51
52
53 /**
54  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
55  * Choose this value so that computing the IBF is still cheaper
56  * than transmitting all values.
57  */
58 #define MAX_IBF_ORDER (16)
59
60
61 enum UnionOperationState
62 {
63   STATE_EXPECT_SE,
64   STATE_EXPECT_IBF,
65   STATE_EXPECT_IBF_CONT,
66   STATE_EXPECT_ELEMENTS,
67   STATE_EXPECT_ELEMENTS_AND_REQUESTS,
68   STATE_WAIT_SENT_DONE,
69   STATE_FINISHED
70 };
71
72
73 /**
74  * State of an evaluate operation
75  * with another peer.
76  */
77 struct UnionEvaluateOperation
78 {
79   /* last difference estimate */
80   unsigned int diff;
81
82   /**
83    * Number of ibf buckets received
84    */
85   unsigned int ibf_buckets_received;
86
87   /**
88    * Current salt in use, zero unless
89    * we detected a collision
90    */
91   uint8_t salt;
92
93   /**
94    * order of the ibf we receive
95    */
96   unsigned int ibf_order;
97
98   /**
99    * The ibf we currently receive
100    */
101   struct InvertibleBloomFilter *ibf_received;
102
103   struct StrataEstimator *se;
104
105   /**
106    * Current state of the operation
107    */
108   enum UnionOperationState state;
109 };
110
111
112 /**
113  * Element entry, stored in the hash maps from
114  * partial IBF keys to elements.
115  */
116 struct ElementEntry
117 {
118   /**
119    * The actual element
120    */
121   struct GNUNET_SET_Element *element;
122
123   /**
124    * Actual ibf key of the element entry
125    */
126   struct IBF_Key ibf_key;
127
128   /**
129    * Linked list, note that the next element
130    * has to have an ibf_key that is lexicographically
131    * equal or larger.
132    */
133   struct ElementEntry *next;
134
135   /**
136    * GNUNET_YES if the element was received from
137    * the remote peer, and the local peer did not previously
138    * have it
139    */
140   int remote;
141 };
142
143
144 /**
145  * Extra state required for efficient set union.
146  */
147 struct UnionState
148 {
149   /**
150    * Strate estimator of the set we currently have,
151    * used for estimation of the symmetric difference
152    */
153   struct StrataEstimator *se;
154
155   /**
156    * Array of IBFs, some of them pre-allocated
157    */
158   struct InvertibleBloomFilter **ibfs;
159
160   /**
161    * Maps the first 32 bits of the ibf-key to
162    * elements.
163    */
164   struct GNUNET_CONTAINER_MultiHashMap32 *elements;
165 };
166
167
168 static void
169 send_operation_request (struct EvaluateOperation *eo)
170 {
171   struct GNUNET_MQ_Message *mqm;
172   struct OperationRequestMessage *msg;
173
174   mqm = GNUNET_MQ_msg_concat (msg, eo->context_msg,
175                               GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
176   if (NULL == mqm)
177   {
178     /* the context message is too large */
179     client_disconnect (eo->set->client);
180     GNUNET_break (0);
181     return;
182   }
183   msg->operation = eo->operation;
184   msg->app_id = eo->app_id;
185   GNUNET_MQ_send (eo->mq, mqm);
186 }
187
188
189 /**
190  * Iterator to insert values into an ibf.
191  *
192  * @param cls closure
193  * @param key current key code
194  * @param value value in the hash map
195  * @return GNUNET_YES if we should continue to
196  *         iterate,
197  *         GNUNET_NO if not.
198  */
199 static int
200 ibf_insert_iterator (void *cls,
201                      uint32_t key,
202                      void *value)
203 {
204   struct InvertibleBloomFilter *ibf = cls;
205   struct ElementEntry *e = value;
206   struct IBF_Key ibf_key;
207
208   GNUNET_assert (NULL != e);
209   ibf_key = e->ibf_key;
210   ibf_insert (ibf, ibf_key);
211   e = e->next;
212
213   while (NULL != e)
214   {
215     /* only insert keys we haven't seen yet */
216     if (0 != memcmp (&e->ibf_key, &ibf_key, sizeof ibf_key))
217     {
218       ibf_key = e->ibf_key;
219       ibf_insert (ibf, ibf_key);
220     }
221     e = e->next; 
222   }
223
224   return GNUNET_YES;
225 }
226
227
228 /**
229  * Create and populate an IBF for the specified peer,
230  * if it does not already exist.
231  *
232  * @param cpi peer to create the ibf for
233  */
234 static struct InvertibleBloomFilter *
235 prepare_ibf (struct EvaluateOperation *eo, uint16_t order)
236 {
237   struct UnionState *us = eo->set->extra.u;
238
239   GNUNET_assert (order <= MAX_IBF_ORDER);
240   if (NULL == us->ibfs)
241     us->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *));
242   if (NULL == us->ibfs[order])
243   {
244     us->ibfs[order] = ibf_create (1 << order, SE_IBF_HASH_NUM);
245     GNUNET_CONTAINER_multihashmap32_iterate (us->elements, ibf_insert_iterator, us->ibfs[order]);
246   }
247   return us->ibfs[order];
248 }
249
250
251 /**
252  * Send an ibf of appropriate size.
253  *
254  * @param cpi the peer
255  */
256 static void
257 send_ibf (struct EvaluateOperation *eo, uint16_t ibf_order)
258 {
259   unsigned int buckets_sent = 0;
260   struct InvertibleBloomFilter *ibf;
261
262   ibf = prepare_ibf (eo, ibf_order);
263
264   while (buckets_sent < (1 << ibf_order))
265   {
266     unsigned int buckets_in_message;
267     struct GNUNET_MQ_Message *mqm;
268     struct IBFMessage *msg;
269
270     buckets_in_message = (1 << ibf_order) - buckets_sent;
271     /* limit to maximum */
272     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
273       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
274
275     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
276                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
277     msg->order = htons (ibf_order);
278     msg->offset = htons (buckets_sent);
279     ibf_write_slice (ibf, buckets_sent,
280                      buckets_in_message, &msg[1]);
281     buckets_sent += buckets_in_message;
282     GNUNET_MQ_send (eo->mq, mqm);
283   }
284
285   eo->extra.u->state = STATE_EXPECT_ELEMENTS_AND_REQUESTS;
286 }
287
288
289 /**
290  * Send a strata estimator.
291  *
292  * @param cpi the peer
293  */
294 static void
295 send_strata_estimator (struct EvaluateOperation *eo)
296 {
297   struct GNUNET_MQ_Message *mqm;
298   struct GNUNET_MessageHeader *strata_msg;
299
300   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
301                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
302                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
303   strata_estimator_write (eo->set->extra.u->se, &strata_msg[1]);
304   GNUNET_MQ_send (eo->mq, mqm);
305
306   eo->extra.u->state = STATE_EXPECT_IBF;
307 }
308
309
310 static void
311 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
312 {
313   struct EvaluateOperation *eo = cls;
314   int ibf_order;
315   int diff;
316
317   if (eo->extra.u->state != STATE_EXPECT_SE)
318   {
319     /* FIXME: handle */
320     GNUNET_break (0);
321     return;
322   }
323   GNUNET_assert (NULL == eo->extra.u->se);
324   eo->extra.u->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
325   strata_estimator_read (&mh[1], eo->extra.u->se);
326   GNUNET_assert (NULL != eo->set->extra.u->se);
327   diff = strata_estimator_difference (eo->set->extra.u->se, eo->extra.u->se);
328   /* minimum order */
329   ibf_order = 2;
330   while ((1<<ibf_order) < (2 * diff))
331     ibf_order++;
332   if (ibf_order > MAX_IBF_ORDER)
333     ibf_order = MAX_IBF_ORDER;
334   send_ibf (eo, ibf_order);
335 }
336
337
338 /**
339  * FIXME
340  *
341  * @param
342  */
343 static void
344 decode (struct EvaluateOperation *eo)
345 {
346   struct IBF_Key key;
347   int side;
348   struct InvertibleBloomFilter *diff_ibf;
349
350   GNUNET_assert (STATE_EXPECT_ELEMENTS == eo->extra.u->state);
351
352   diff_ibf = ibf_dup (prepare_ibf (eo, eo->extra.u->ibf_order));
353   ibf_subtract (diff_ibf, eo->extra.u->ibf_received);
354
355   while (1)
356   {
357     int res;
358
359     res = ibf_decode (diff_ibf, &side, &key);
360     if (GNUNET_SYSERR == res)
361     {
362       /* decoding failed, we tell the other peer by sending our ibf with a larger order */
363       GNUNET_assert (0);
364       return;
365     }
366     if (GNUNET_NO == res)
367     {
368       struct GNUNET_MQ_Message *mqm;
369       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
370       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
371       GNUNET_MQ_send (eo->mq, mqm);
372       return;
373     }
374     if (1 == side)
375     {
376       struct ElementEntry *e;
377       /* we have the element(s), send it to the other peer */
378       e = GNUNET_CONTAINER_multihashmap32_get (eo->set->extra.u->elements, (uint32_t) key.key_val);
379       if (NULL == e)
380       {
381         /* FIXME */
382         GNUNET_assert (0);
383         return;
384       }
385       while (NULL != e)
386       {
387         /* FIXME: send element */
388         e = e->next;
389       }
390     }
391     else
392     {
393       struct GNUNET_MQ_Message *mqm;
394       struct GNUNET_MessageHeader *msg;
395
396       /* FIXME: before sending the request, check if we may just have the element */
397       /* FIXME: merge multiple requests */
398       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
399       *(struct IBF_Key *) &msg[1] = key;
400       GNUNET_MQ_send (eo->mq, mqm);
401     }
402   }
403 }
404
405
406
407 static void
408 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
409 {
410   struct EvaluateOperation *eo = cls;
411   struct UnionEvaluateOperation *ueo = eo->extra.u;
412   struct IBFMessage *msg = (struct IBFMessage *) mh;
413   unsigned int buckets_in_message;
414
415   if (ueo->state == STATE_EXPECT_ELEMENTS_AND_REQUESTS)
416   {
417     /* check that the ibf is a new one / first part */
418     /* clear outgoing messages */
419     GNUNET_assert (0);
420   }
421   else if (ueo->state == STATE_EXPECT_IBF)
422   {
423     ueo->state = STATE_EXPECT_IBF_CONT;
424     ueo->ibf_order = msg->order;
425     GNUNET_assert (NULL == ueo->ibf_received);
426     ueo->ibf_received = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
427     if (ntohs (msg->offset) != 0)
428     {
429       /* FIXME: handle */
430       GNUNET_assert (0);
431     }
432   }
433   else if (ueo->state == STATE_EXPECT_IBF_CONT)
434   {
435     if ( (ntohs (msg->offset) != ueo->ibf_buckets_received) ||
436          (msg->order != ueo->ibf_order) )
437     {
438       /* FIXME: handle */
439       GNUNET_assert (0);
440     }
441   }
442
443   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
444
445   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
446   {
447     /* FIXME: handle, message was malformed */
448     GNUNET_assert (0);
449   }
450
451   ibf_read_slice (&msg[1], ueo->ibf_buckets_received, buckets_in_message, ueo->ibf_received);
452   ueo->ibf_buckets_received += buckets_in_message;
453
454   if (ueo->ibf_buckets_received == (1<<ueo->ibf_order))
455   {
456     ueo->state = STATE_EXPECT_ELEMENTS;
457     decode (eo);
458   }
459 }
460
461
462 static void
463 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
464 {
465   struct EvaluateOperation *eo = cls;
466
467   if ( (eo->extra.u->state != STATE_EXPECT_ELEMENTS) &&
468        (eo->extra.u->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS) )
469   {
470     /* FIXME: handle */
471     GNUNET_break (0);
472     return;
473   }
474 }
475
476
477 static void
478 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
479 {
480   struct EvaluateOperation *eo = cls;
481
482   /* look up elements and send them */
483   if (eo->extra.u->state != STATE_EXPECT_ELEMENTS_AND_REQUESTS)
484   {
485     /* FIXME: handle */
486     GNUNET_break (0);
487     return;
488   }
489 }
490
491
492 static void
493 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
494 {
495   GNUNET_break (0);
496 }
497
498
499 static const struct GNUNET_MQ_Handler union_handlers[] = {
500   {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
501   {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
502   {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
503   {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
504   {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
505   GNUNET_MQ_HANDLERS_END
506 };
507
508
509 /**
510  * Functions of this type will be called when a stream is established
511  *
512  * @param cls the closure from GNUNET_STREAM_open
513  * @param socket socket to use to communicate with the other side (read/write)
514  */
515 static void
516 stream_open_cb (void *cls,
517                 struct GNUNET_STREAM_Socket *socket)
518 {
519   struct EvaluateOperation *eo = cls;
520
521   GNUNET_assert (NULL == eo->mq);
522   GNUNET_assert (socket == eo->socket);
523
524   eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket, union_handlers, eo);
525   send_operation_request (eo);
526 }
527         
528
529 void
530 union_evaluate (struct EvaluateOperation *eo)
531 {
532   GNUNET_assert (GNUNET_SET_OPERATION_UNION == eo->set->operation);
533   eo->socket = 
534       GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
535                           stream_open_cb, GNUNET_STREAM_OPTION_END);
536 }
537
538
539 static void
540 insert_ibf_key_unchecked (struct UnionState *us, struct IBF_Key ibf_key)
541 {
542   int i;
543
544   strata_estimator_insert (us->se, ibf_key);
545   for (i = 0; i <= MAX_IBF_ORDER; i++)
546   {
547     if (NULL == us->ibfs)
548       break;
549     if (NULL == us->ibfs[i])
550       continue;
551     ibf_insert (us->ibfs[i], ibf_key);
552   }
553 }
554
555
556 /**
557  * Insert an element into the consensus set of the specified session.
558  * The element will not be copied, and freed when destroying the session.
559  *
560  * @param session session for new element
561  * @param element element to insert
562  */
563 static void
564 insert_element (struct Set *set, struct GNUNET_SET_Element *element)
565 {
566   struct UnionState *us = set->extra.u;
567   struct GNUNET_HashCode hash;
568   struct ElementEntry *e;
569   struct ElementEntry *e_old;
570
571   e = GNUNET_new (struct ElementEntry);
572   e->element = element;
573   GNUNET_CRYPTO_hash (e->element->data, e->element->size, &hash);
574   e->ibf_key = ibf_key_from_hashcode (&hash);
575
576   e_old = GNUNET_CONTAINER_multihashmap32_get (us->elements, (uint32_t) e->ibf_key.key_val);
577   if (NULL == e_old)
578   {
579     GNUNET_CONTAINER_multihashmap32_put (us->elements, (uint32_t) e->ibf_key.key_val, e,
580                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
581     return;
582   }
583
584   while (NULL != e_old)
585   {
586     int cmp = memcmp (&e->ibf_key, &e_old->ibf_key, sizeof (struct IBF_Key));
587     if (cmp < 0)
588     {
589       if (NULL == e_old->next)
590       {
591         e_old->next = e;
592         insert_ibf_key_unchecked (us, e->ibf_key);
593         return;
594       }
595       e_old = e_old->next;
596     }
597     else if (cmp == 0)
598     {
599       e->next = e_old->next;
600       e_old->next = e;
601       return;
602     }
603     else
604     {
605       e->next = e_old;
606       insert_ibf_key_unchecked (us, e->ibf_key);
607       return;
608     }
609   } 
610 }
611
612
613 void
614 union_accept (struct EvaluateOperation *eo, struct Incoming *incoming)
615 {
616   GNUNET_assert (NULL != incoming->mq); 
617   eo->mq = incoming->mq;
618   GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
619
620   send_strata_estimator (eo);
621 }
622
623
624 struct Set *
625 union_set_create ()
626 {
627   struct Set *set;
628   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
629   set->extra.u = (struct UnionState *) &set[1];
630   set->operation = GNUNET_SET_OPERATION_UNION;
631   set->extra.u->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
632   return set;
633 }
634
635
636 void
637 union_add (struct Set *set, struct ElementMessage *m)
638 {
639   struct GNUNET_SET_Element *element;
640   uint16_t element_size;
641   element_size = ntohs (m->header.size) - sizeof *m;
642   element = GNUNET_malloc (sizeof *element + element_size);
643   element->size = element_size;
644   element->data = &element[1];
645   memcpy (element->data, &m[1], element_size);
646   insert_element (set, element);
647 }
648