b46044c352af779b8f99f64ec1010fac900e1955
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian Fuchs
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "strata_estimator.h"
30 #include "set_protocol.h"
31 #include <gcrypt.h>
32
33 /**
34  * Current phase we are in for a intersection operation.
35  */
36 enum IntersectionOperationPhase
37 {
38   /**
39    * We get our tunnel but received no message as of now
40    */
41   PHASE_EXPECT_INITIAL,
42   /**
43    * We expect a BF + the number of the other peers elements
44    */
45   PHASE_BF_EXCHANGE,
46   /**
47    * The protocol is over.
48    * Results may still have to be sent to the client.
49    */
50   PHASE_FINISHED
51 };
52
53
54 /**
55  * State of an evaluate operation
56  * with another peer.
57  */
58 struct OperationState
59 {
60   /**
61    * Tunnel to the remote peer.
62    */
63   struct GNUNET_MESH_Tunnel *tunnel;
64
65   /**
66    * Detail information about the set operation,
67    * including the set to use.
68    */
69   struct OperationSpecification *spec;
70
71   /**
72    * Message queue for the peer.
73    */
74   struct GNUNET_MQ_Handle *mq;
75
76   /**
77    * The bf we currently receive
78    */
79   struct GNUNET_CONTAINER_BloomFilter *remote_bf;
80
81   /**
82    * BF of the set's element.
83    */
84   struct GNUNET_CONTAINER_BloomFilter *local_bf;
85
86   /**
87    * Current state of the operation.
88    */
89   enum IntersectionOperationPhase phase;
90
91   /**
92    * Generation in which the operation handle
93    * was created.
94    */
95   unsigned int generation_created;
96
97   /**
98    * Set state of the set that this operation
99    * belongs to.
100    */
101   struct Set *set;
102   
103   /**
104    * Maps element-id-hashes to 'elements in our set'.
105    */
106   struct GNUNET_CONTAINER_MultiHashMap *contained_elements;
107
108   /**
109    * Iterator for sending elements on the key to element mapping to the client.
110    */
111   struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
112   
113   /**
114    * Evaluate operations are held in
115    * a linked list.
116    */
117   struct OperationState *next;
118
119    /**
120     * Evaluate operations are held in
121     * a linked list.
122     */
123   struct OperationState *prev;
124
125   /**
126    * Did we send the client that we are done?
127    */
128   int client_done_sent;
129 };
130
131
132 /**
133  * Extra state required for efficient set intersection.
134  */
135 struct SetState
136 {
137   /**
138    * Evaluate operations are held in
139    * a linked list.
140    */
141   struct OperationState *ops_head;
142
143   /**
144    * Evaluate operations are held in
145    * a linked list.
146    */
147   struct OperationState *ops_tail;
148 };
149
150
151 /**
152  * Destroy a intersection operation, and free all resources
153  * associated with it.
154  *
155  * @param eo the intersection operation to destroy
156  */
157 static void
158 intersection_operation_destroy (struct OperationState *eo)
159 {
160   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
161   GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
162                                eo->set->state->ops_tail,
163                                eo);
164   if (NULL != eo->mq)
165   {
166     GNUNET_MQ_destroy (eo->mq);
167     eo->mq = NULL;
168   }
169   if (NULL != eo->tunnel)
170   {
171     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
172     eo->tunnel = NULL;
173     GNUNET_MESH_tunnel_destroy (t);
174   }
175   // TODO: destroy set elements?
176   if (NULL != eo->spec)
177   {
178     if (NULL != eo->spec->context_msg)
179     {
180       GNUNET_free (eo->spec->context_msg);
181       eo->spec->context_msg = NULL;
182     }
183     GNUNET_free (eo->spec);
184     eo->spec = NULL;
185   }
186   GNUNET_free (eo);
187
188   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
189
190   /* FIXME: do a garbage collection of the set generations */
191 }
192
193
194 /**
195  * Inform the client that the intersection operation has failed,
196  * and proceed to destroy the evaluate operation.
197  *
198  * @param eo the intersection operation to fail
199  */
200 static void
201 fail_intersection_operation (struct OperationState *eo)
202 {
203   struct GNUNET_MQ_Envelope *ev;
204   struct GNUNET_SET_ResultMessage *msg;
205
206   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
207   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
208   msg->request_id = htonl (eo->spec->client_request_id);
209   msg->element_type = htons (0);
210   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
211   intersection_operation_destroy (eo);
212 }
213
214
215 /**
216  * Send a request for the evaluate operation to a remote peer
217  *
218  * @param eo operation with the other peer
219  */
220 static void
221 send_operation_request (struct Operation *op)
222 {
223   struct GNUNET_MQ_Envelope *ev;
224   struct OperationRequestMessage *msg;
225
226   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
227                                 op->spec->context_msg);
228
229   if (NULL == ev)
230   {
231     /* the context message is too large */
232     GNUNET_break (0);
233     GNUNET_SERVER_client_disconnect (op->spec->set->client);
234     return;
235   }
236   msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
237   msg->app_id = op->spec->app_id;
238   msg->salt = htonl (op->spec->salt);
239   GNUNET_MQ_send (op->mq, ev);
240
241   if (NULL != op->spec->context_msg)
242     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
243   else
244     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
245
246   if (NULL != op->spec->context_msg)
247   {
248     GNUNET_free (op->spec->context_msg);
249     op->spec->context_msg = NULL;
250   }
251
252 }
253
254
255 /**
256  * Handle an BF message from a remote peer.
257  *
258  * @param cls the intersection operation
259  * @param mh the header of the message
260  */
261 static void
262 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
263 {
264   struct OperationState *eo = cls;
265   struct BFMessage *msg = (struct BFMessage *) mh;
266   unsigned int buckets_in_message;
267
268   if (eo->phase == PHASE_EXPECT_INITIAL )
269   {
270     eo->phase = PHASE_BF_EXCHANGE;
271     
272     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new bf of size %u\n", 1<<msg->order);
273
274     // if (the remote peer has less elements than us)
275     //    run our elements through his bloomfilter
276     // else if (we have the same elements)
277     //    done;
278     // 
279     // evict elements we can exclude through the bloomfilter
280     //
281     // create a new bloomfilter over our remaining elements
282     // 
283     // send our new count and the bloomfilter back
284   }
285   else if (eo->phase == PHASE_BF_EXCHANGE)
286   {
287
288   }
289
290 }
291
292
293 /**
294  * Send a result message to the client indicating
295  * that there is a new element.
296  *
297  * @param eo intersection operation
298  * @param element element to send
299  */
300 static void
301 send_client_element (struct OperationState *eo,
302                      struct GNUNET_SET_Element *element)
303 {
304   struct GNUNET_MQ_Envelope *ev;
305   struct GNUNET_SET_ResultMessage *rm;
306
307   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
308   GNUNET_assert (0 != eo->spec->client_request_id);
309   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
310   if (NULL == ev)
311   {
312     GNUNET_MQ_discard (ev);
313     GNUNET_break (0);
314     return;
315   }
316   rm->result_status = htons (GNUNET_SET_STATUS_OK);
317   rm->request_id = htonl (eo->spec->client_request_id);
318   rm->element_type = element->type;
319   memcpy (&rm[1], element->data, element->size);
320   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
321 }
322
323
324 /**
325  * Send a result message to the client indicating
326  * that the operation is over.
327  * After the result done message has been sent to the client,
328  * destroy the evaluate operation.
329  *
330  * @param eo intersection operation
331  */
332 static void
333 send_client_done_and_destroy (struct OperationState *eo)
334 {
335   struct GNUNET_MQ_Envelope *ev;
336   struct GNUNET_SET_ResultMessage *rm;
337
338   GNUNET_assert (GNUNET_NO == eo->client_done_sent);
339
340   eo->client_done_sent = GNUNET_YES;
341
342   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
343   rm->request_id = htonl (eo->spec->client_request_id);
344   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
345   rm->element_type = htons (0);
346   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
347
348   intersection_operation_destroy (eo);
349 }
350
351 /**
352  * Send a bloomfilter to our peer.
353  * that the operation is over.
354  * After the result done message has been sent to the client,
355  * destroy the evaluate operation.
356  *
357  * @param eo intersection operation
358  */
359 static void
360 send_bloomfilter (struct OperationState *eo){
361   //get number of all elements still in the set
362   
363   // send the bloomfilter
364   unsigned int buckets_sent = 0;
365   struct BloomFilter *bf;
366   //TODO:
367   // add all our elements to the bloomfilter
368   // create new bloomfilter for all our elements & count elements
369   //GNUNET_CONTAINER_multihashmap32_remove
370   //eo->local_bf = GNUNET_CONTAINER_multihashmap32_iterate(eo->set->elements, add);
371
372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n", 1<<ibf_order);
373
374   bf = eo->local_bf;
375
376   while (buckets_sent < (1 << bf_order))
377   {
378     unsigned int buckets_in_message;
379     struct GNUNET_MQ_Envelope *ev;
380     struct IBFMessage *msg;
381
382     buckets_in_message = (1 << bf_order) - buckets_sent;
383     /* limit to maximum */
384     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
385       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
386
387     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
388                                GNUNET_MESSAGE_TYPE_SET_P2P_BF);
389     msg->reserved = 0;
390     msg->order = bf_order;
391     msg->offset = htons (buckets_sent);
392     ibf_write_slice (ibf, buckets_sent,
393                      buckets_in_message, &msg[1]);
394     buckets_sent += buckets_in_message;
395     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
396                 buckets_in_message, buckets_sent, 1<<ibf_order);
397     GNUNET_MQ_send (eo->mq, ev);
398   }
399
400   eo->phase = PHASE_EXPECT_BF;
401 }
402
403 /**
404  * Handle a done message from a remote peer
405  *
406  * @param cls the intersection operation
407  * @param mh the message
408  */
409 static void
410 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
411 {
412   struct OperationState *eo = cls;
413   struct GNUNET_MQ_Envelope *ev;
414
415   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
416   {
417     /* we got all requests, but still have to send our elements as response */
418
419     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
420     eo->phase = PHASE_FINISHED;
421     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
422     GNUNET_MQ_send (eo->mq, ev);
423     return;
424   }
425   if (eo->phase == PHASE_EXPECT_ELEMENTS)
426   {
427     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
428     eo->phase = PHASE_FINISHED;
429     send_client_done_and_destroy (eo);
430     return;
431   }
432   GNUNET_break (0);
433   fail_intersection_operation (eo);
434 }
435
436
437 /**
438  * Evaluate a union operation with
439  * a remote peer.
440  *
441  * @param op operation to evaluate
442  */
443 static void
444 intersection_evaluate (struct Operation *op)
445 {
446   op->state = GNUNET_new (struct OperationState);
447   /* we started the operation, thus we have to send the operation request */
448   op->state->phase = PHASE_BF_EXCHANGE;
449   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation");
450   send_operation_request (op);
451 }
452
453
454 /**
455  * Accept an union operation request from a remote peer.
456  * Only initializes the private operation state.
457  *
458  * @param op operation that will be accepted as a union operation
459  */
460 static void
461 intersection_accept (struct Operation *op)
462 {
463   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
464   op->state = GNUNET_new (struct OperationState);
465   
466   op->state->contained_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
467   
468   if (NULL != op->state->remote_bf){
469     // run the set through the remote bloomfilter
470     ;
471   }
472   
473   // 
474   op->state->local_bf;
475   
476   /* kick off the operation */
477   send_bloomfilter (op);
478 }
479
480
481 /**
482  * Create a new set supporting the intersection operation
483  *
484  * @return the newly created set
485  */
486 static struct SetState *
487 intersection_set_create (void)
488 {
489   struct SetState *set_state;
490
491   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
492
493   set_state = GNUNET_new (struct SetState);
494   
495   return set_state;
496 }
497
498
499 /**
500  * Add the element from the given element message to the set.
501  *
502  * @param set_state state of the set want to add to
503  * @param ee the element to add to the set
504  */
505 static void
506 intersection_add (struct SetState *set_state, struct ElementEntry *ee)
507 {
508   //nothing to do here
509 }
510
511
512 /**
513  * Destroy a set that supports the intersection operation
514  *
515  * @param set_state the set to destroy
516  */
517 static void
518 intersection_set_destroy (struct SetState *set_state)
519 {
520   GNUNET_free (set_state);
521 }
522
523
524 /**
525  * Remove the element given in the element message from the set.
526  *
527  * @param set_state state of the set to remove from
528  * @param element set element to remove
529  */
530 static void
531 intersection_remove (struct SetState *set_state, struct ElementEntry *element)
532 {
533   //nothing to do here
534 }
535
536
537 /**
538  * Dispatch messages for a intersection operation.
539  *
540  * @param eo the state of the intersection evaluate operation
541  * @param mh the received message
542  * @return GNUNET_SYSERR if the tunnel should be disconnected,
543  *         GNUNET_OK otherwise
544  */
545 int
546 intersection_handle_p2p_message (struct OperationState *eo,
547                           const struct GNUNET_MessageHeader *mh)
548 {
549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
550               ntohs (mh->type), ntohs (mh->size));
551   switch (ntohs (mh->type))
552   {
553     /* this message handler is not active until after we received an
554      * operation request message, thus the ops request is not handled here
555      */
556     case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
557       handle_p2p_bf (eo, mh);
558       break;
559     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
560       handle_p2p_done (eo, mh);
561       break;
562     default:
563       /* something wrong with mesh's message handlers? */
564       GNUNET_assert (0);
565   }
566   return GNUNET_OK;
567 }
568
569 /**
570  * Signal to the client that the operation has finished and
571  * destroy the operation.
572  *
573  * @param cls operation to destroy
574  */
575 static void
576 send_done_and_destroy (void *cls)
577 {
578   struct Operation *op = cls;
579   struct GNUNET_MQ_Envelope *ev;
580   struct GNUNET_SET_ResultMessage *rm;
581   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
582   rm->request_id = htonl (op->spec->client_request_id);
583   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
584   rm->element_type = htons (0);
585   GNUNET_MQ_send (op->spec->set->client_mq, ev);
586   _GSS_operation_destroy (op);
587 }
588
589 /**
590  * Send a result message to the client indicating
591  * that the operation is over.
592  * After the result done message has been sent to the client,
593  * destroy the evaluate operation.
594  *
595  * @param op union operation
596  */
597 static void
598 finish_and_destroy (struct Operation *op)
599 {
600   GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
601
602   if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
603   {
604     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
605     GNUNET_assert (NULL == op->state->full_result_iter); 
606     op->state->full_result_iter =
607         GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->contained_elements);
608     return;
609   }
610   send_done_and_destroy (op);
611 }
612
613
614 static void
615 intersection_peer_disconnect (struct Operation *op)
616 {
617   if (PHASE_FINISHED != op->state->phase)
618   {
619     struct GNUNET_MQ_Envelope *ev;
620     struct GNUNET_SET_ResultMessage *msg;
621
622     ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
623     msg->request_id = htonl (op->spec->client_request_id);
624     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
625     msg->element_type = htons (0);
626     GNUNET_MQ_send (op->spec->set->client_mq, ev);
627     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
628     _GSS_operation_destroy (op);
629     return;
630   }
631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
632   if (GNUNET_NO == op->state->client_done_sent)
633     finish_and_destroy (op);
634 }
635
636
637 /**
638  * Destroy the union operation.  Only things specific to the union operation are destroyed.
639  * 
640  * @param op union operation to destroy
641  */
642 static void
643 intersection_op_cancel (struct Operation *op)
644 {
645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
646   /* check if the op was canceled twice */
647   GNUNET_assert (NULL != op->state);
648   if (NULL != op->state->remote_bf)
649   {
650     GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
651     op->state->remote_bf = NULL;
652   }
653   if (NULL != op->state->local_bf)
654   {
655     GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
656     op->state->local_bf = NULL;
657   }
658   if (NULL != op->state->contained_elements)
659   {
660     // no need to free the elements, they are still part of the set
661     GNUNET_CONTAINER_multihashmap_destroy (op->state->contained_elements);
662     op->state->contained_elements = NULL;
663   }
664   GNUNET_free (op->state);
665   op->state = NULL;
666   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
667 }
668
669 const struct SetVT *
670 _GSS_intersection_vt ()
671 {
672   static const struct SetVT intersection_vt = {
673     .create = &intersection_set_create,
674     .msg_handler = &intersection_handle_p2p_message,
675     .add = &intersection_add,
676     .remove = &intersection_remove,
677     .destroy_set = &intersection_set_destroy,
678     .evaluate = &intersection_evaluate,
679     .accept = &intersection_accept,
680     .peer_disconnect = &intersection_peer_disconnect,
681     .cancel = &intersection_op_cancel,
682   };
683
684   return &intersection_vt;
685 }