/*
      This file is part of GNUnet
      (C) 2013 Christian Grothoff (and other contributing authors)

      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
      by the Free Software Foundation; either version 3, or (at your
      option) any later version.

      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      General Public License for more details.

      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
      Boston, MA 02111-1307, USA.
*/
/**
 * @file set/gnunet-service-set_intersection.c
 * @brief two-peer set intersection
 * @author Christian Fuchs
 */
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "strata_estimator.h"
30 #include "set_protocol.h"
/**
 * Current phase we are in for an intersection operation.
 *
 * NOTE(review): the enumerator names below are taken from how the phase
 * is used by handle_p2p_bf(), handle_p2p_done() and
 * intersection_peer_disconnect(); the pairing of each name with its
 * comment should be confirmed.  Other functions in this file also
 * reference PHASE_EXPECT_BF, PHASE_EXPECT_ELEMENTS and
 * PHASE_EXPECT_ELEMENTS_AND_REQUESTS, which are not declared here.
 */
enum IntersectionOperationPhase
{
  /**
   * We get our tunnel but received no message as of now
   */
  PHASE_EXPECT_INITIAL,

  /**
   * We expect a BF + the number of the other peer's elements
   */
  PHASE_BF_EXCHANGE,

  /**
   * The protocol is over.
   * Results may still have to be sent to the client.
   */
  PHASE_FINISHED
};
55 * State of an evaluate operation
61 * Tunnel to the remote peer.
63 struct GNUNET_MESH_Tunnel *tunnel;
66 * Detail information about the set operation,
67 * including the set to use.
69 struct OperationSpecification *spec;
72 * Message queue for the peer.
74 struct GNUNET_MQ_Handle *mq;
77 * The bf we currently receive
79 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
82 * BF of the set's element.
84 struct GNUNET_CONTAINER_BloomFilter *local_bf;
87 * Current state of the operation.
89 enum IntersectionOperationPhase phase;
92 * Generation in which the operation handle
95 unsigned int generation_created;
98 * Set state of the set that this operation
104 * Maps element-id-hashes to 'elements in our set'.
106 struct GNUNET_CONTAINER_MultiHashMap *contained_elements;
109 * Iterator for sending elements on the key to element mapping to the client.
111 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
114 * Evaluate operations are held in
117 struct OperationState *next;
120 * Evaluate operations are held in
123 struct OperationState *prev;
126 * Did we send the client that we are done?
128 int client_done_sent;
/**
 * Extra state required for efficient set intersection.
 */
struct SetState
{
  /**
   * Evaluate operations are held in
   * a linked list; this is its head.
   */
  struct OperationState *ops_head;

  /**
   * Evaluate operations are held in
   * a linked list; this is its tail.
   */
  struct OperationState *ops_tail;
};
152 * Destroy a intersection operation, and free all resources
153 * associated with it.
155 * @param eo the intersection operation to destroy
158 intersection_operation_destroy (struct OperationState *eo)
160 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
161 GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
162 eo->set->state->ops_tail,
166 GNUNET_MQ_destroy (eo->mq);
169 if (NULL != eo->tunnel)
171 struct GNUNET_MESH_Tunnel *t = eo->tunnel;
173 GNUNET_MESH_tunnel_destroy (t);
175 // TODO: destroy set elements?
176 if (NULL != eo->spec)
178 if (NULL != eo->spec->context_msg)
180 GNUNET_free (eo->spec->context_msg);
181 eo->spec->context_msg = NULL;
183 GNUNET_free (eo->spec);
188 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
190 /* FIXME: do a garbage collection of the set generations */
195 * Inform the client that the intersection operation has failed,
196 * and proceed to destroy the evaluate operation.
198 * @param eo the intersection operation to fail
201 fail_intersection_operation (struct OperationState *eo)
203 struct GNUNET_MQ_Envelope *ev;
204 struct GNUNET_SET_ResultMessage *msg;
206 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
207 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
208 msg->request_id = htonl (eo->spec->client_request_id);
209 msg->element_type = htons (0);
210 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
211 intersection_operation_destroy (eo);
216 * Send a request for the evaluate operation to a remote peer
218 * @param eo operation with the other peer
221 send_operation_request (struct Operation *op)
223 struct GNUNET_MQ_Envelope *ev;
224 struct OperationRequestMessage *msg;
226 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
227 op->spec->context_msg);
231 /* the context message is too large */
233 GNUNET_SERVER_client_disconnect (op->spec->set->client);
236 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
237 msg->app_id = op->spec->app_id;
238 msg->salt = htonl (op->spec->salt);
239 GNUNET_MQ_send (op->mq, ev);
241 if (NULL != op->spec->context_msg)
242 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
244 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
246 if (NULL != op->spec->context_msg)
248 GNUNET_free (op->spec->context_msg);
249 op->spec->context_msg = NULL;
256 * Handle an BF message from a remote peer.
258 * @param cls the intersection operation
259 * @param mh the header of the message
262 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
264 struct OperationState *eo = cls;
265 struct BFMessage *msg = (struct BFMessage *) mh;
266 unsigned int buckets_in_message;
268 if (eo->phase == PHASE_EXPECT_INITIAL )
270 eo->phase = PHASE_BF_EXCHANGE;
272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new bf of size %u\n", 1<<msg->order);
274 // if (the remote peer has less elements than us)
275 // run our elements through his bloomfilter
276 // else if (we have the same elements)
279 // evict elements we can exclude through the bloomfilter
281 // create a new bloomfilter over our remaining elements
283 // send our new count and the bloomfilter back
285 else if (eo->phase == PHASE_BF_EXCHANGE)
294 * Send a result message to the client indicating
295 * that there is a new element.
297 * @param eo intersection operation
298 * @param element element to send
301 send_client_element (struct OperationState *eo,
302 struct GNUNET_SET_Element *element)
304 struct GNUNET_MQ_Envelope *ev;
305 struct GNUNET_SET_ResultMessage *rm;
307 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
308 GNUNET_assert (0 != eo->spec->client_request_id);
309 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
312 GNUNET_MQ_discard (ev);
316 rm->result_status = htons (GNUNET_SET_STATUS_OK);
317 rm->request_id = htonl (eo->spec->client_request_id);
318 rm->element_type = element->type;
319 memcpy (&rm[1], element->data, element->size);
320 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
325 * Send a result message to the client indicating
326 * that the operation is over.
327 * After the result done message has been sent to the client,
328 * destroy the evaluate operation.
330 * @param eo intersection operation
333 send_client_done_and_destroy (struct OperationState *eo)
335 struct GNUNET_MQ_Envelope *ev;
336 struct GNUNET_SET_ResultMessage *rm;
338 GNUNET_assert (GNUNET_NO == eo->client_done_sent);
340 eo->client_done_sent = GNUNET_YES;
342 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
343 rm->request_id = htonl (eo->spec->client_request_id);
344 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
345 rm->element_type = htons (0);
346 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
348 intersection_operation_destroy (eo);
352 * Send a bloomfilter to our peer.
353 * that the operation is over.
354 * After the result done message has been sent to the client,
355 * destroy the evaluate operation.
357 * @param eo intersection operation
360 send_bloomfilter (struct OperationState *eo){
361 //get number of all elements still in the set
363 // send the bloomfilter
364 unsigned int buckets_sent = 0;
365 struct BloomFilter *bf;
367 // add all our elements to the bloomfilter
368 // create new bloomfilter for all our elements & count elements
369 //GNUNET_CONTAINER_multihashmap32_remove
370 //eo->local_bf = GNUNET_CONTAINER_multihashmap32_iterate(eo->set->elements, add);
372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n", 1<<ibf_order);
376 while (buckets_sent < (1 << bf_order))
378 unsigned int buckets_in_message;
379 struct GNUNET_MQ_Envelope *ev;
380 struct IBFMessage *msg;
382 buckets_in_message = (1 << bf_order) - buckets_sent;
383 /* limit to maximum */
384 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
385 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
387 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
388 GNUNET_MESSAGE_TYPE_SET_P2P_BF);
390 msg->order = bf_order;
391 msg->offset = htons (buckets_sent);
392 ibf_write_slice (ibf, buckets_sent,
393 buckets_in_message, &msg[1]);
394 buckets_sent += buckets_in_message;
395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
396 buckets_in_message, buckets_sent, 1<<ibf_order);
397 GNUNET_MQ_send (eo->mq, ev);
400 eo->phase = PHASE_EXPECT_BF;
404 * Handle a done message from a remote peer
406 * @param cls the intersection operation
407 * @param mh the message
410 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
412 struct OperationState *eo = cls;
413 struct GNUNET_MQ_Envelope *ev;
415 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
417 /* we got all requests, but still have to send our elements as response */
419 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
420 eo->phase = PHASE_FINISHED;
421 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
422 GNUNET_MQ_send (eo->mq, ev);
425 if (eo->phase == PHASE_EXPECT_ELEMENTS)
427 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
428 eo->phase = PHASE_FINISHED;
429 send_client_done_and_destroy (eo);
433 fail_intersection_operation (eo);
438 * Evaluate a union operation with
441 * @param op operation to evaluate
444 intersection_evaluate (struct Operation *op)
446 op->state = GNUNET_new (struct OperationState);
447 /* we started the operation, thus we have to send the operation request */
448 op->state->phase = PHASE_BF_EXCHANGE;
449 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation");
450 send_operation_request (op);
455 * Accept an union operation request from a remote peer.
456 * Only initializes the private operation state.
458 * @param op operation that will be accepted as a union operation
461 intersection_accept (struct Operation *op)
463 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
464 op->state = GNUNET_new (struct OperationState);
466 op->state->contained_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
468 if (NULL != op->state->remote_bf){
469 // run the set through the remote bloomfilter
476 /* kick off the operation */
477 send_bloomfilter (op);
482 * Create a new set supporting the intersection operation
484 * @return the newly created set
486 static struct SetState *
487 intersection_set_create (void)
489 struct SetState *set_state;
491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
493 set_state = GNUNET_new (struct SetState);
/**
 * Add the element from the given element message to the set.
 *
 * @param set_state state of the set we want to add to
 * @param ee the element to add to the set
 */
static void
intersection_add (struct SetState *set_state, struct ElementEntry *ee)
{
  /* NOTE(review): no statements of this function are visible here; it
     is emitted as a no-op -- confirm against the complete source. */
  (void) set_state;
  (void) ee;
}
/**
 * Destroy a set that supports the intersection operation
 *
 * Only the state structure itself is released here.
 *
 * @param set_state the set to destroy
 */
static void
intersection_set_destroy (struct SetState *set_state)
{
  GNUNET_free (set_state);
}
/**
 * Remove the element given in the element message from the set.
 *
 * @param set_state state of the set to remove from
 * @param element set element to remove
 */
static void
intersection_remove (struct SetState *set_state, struct ElementEntry *element)
{
  /* NOTE(review): no statements of this function are visible here; it
     is emitted as a no-op -- confirm against the complete source. */
  (void) set_state;
  (void) element;
}
538 * Dispatch messages for a intersection operation.
540 * @param eo the state of the intersection evaluate operation
541 * @param mh the received message
542 * @return GNUNET_SYSERR if the tunnel should be disconnected,
543 * GNUNET_OK otherwise
546 intersection_handle_p2p_message (struct OperationState *eo,
547 const struct GNUNET_MessageHeader *mh)
549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
550 ntohs (mh->type), ntohs (mh->size));
551 switch (ntohs (mh->type))
553 /* this message handler is not active until after we received an
554 * operation request message, thus the ops request is not handled here
556 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
557 handle_p2p_bf (eo, mh);
559 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
560 handle_p2p_done (eo, mh);
563 /* something wrong with mesh's message handlers? */
570 * Signal to the client that the operation has finished and
571 * destroy the operation.
573 * @param cls operation to destroy
576 send_done_and_destroy (void *cls)
578 struct Operation *op = cls;
579 struct GNUNET_MQ_Envelope *ev;
580 struct GNUNET_SET_ResultMessage *rm;
581 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
582 rm->request_id = htonl (op->spec->client_request_id);
583 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
584 rm->element_type = htons (0);
585 GNUNET_MQ_send (op->spec->set->client_mq, ev);
586 _GSS_operation_destroy (op);
590 * Send a result message to the client indicating
591 * that the operation is over.
592 * After the result done message has been sent to the client,
593 * destroy the evaluate operation.
595 * @param op union operation
598 finish_and_destroy (struct Operation *op)
600 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
602 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
604 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
605 GNUNET_assert (NULL == op->state->full_result_iter);
606 op->state->full_result_iter =
607 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->contained_elements);
610 send_done_and_destroy (op);
615 intersection_peer_disconnect (struct Operation *op)
617 if (PHASE_FINISHED != op->state->phase)
619 struct GNUNET_MQ_Envelope *ev;
620 struct GNUNET_SET_ResultMessage *msg;
622 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
623 msg->request_id = htonl (op->spec->client_request_id);
624 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
625 msg->element_type = htons (0);
626 GNUNET_MQ_send (op->spec->set->client_mq, ev);
627 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
628 _GSS_operation_destroy (op);
631 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
632 if (GNUNET_NO == op->state->client_done_sent)
633 finish_and_destroy (op);
638 * Destroy the union operation. Only things specific to the union operation are destroyed.
640 * @param op union operation to destroy
643 intersection_op_cancel (struct Operation *op)
645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
646 /* check if the op was canceled twice */
647 GNUNET_assert (NULL != op->state);
648 if (NULL != op->state->remote_bf)
650 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
651 op->state->remote_bf = NULL;
653 if (NULL != op->state->local_bf)
655 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
656 op->state->local_bf = NULL;
658 if (NULL != op->state->contained_elements)
660 // no need to free the elements, they are still part of the set
661 GNUNET_CONTAINER_multihashmap_destroy (op->state->contained_elements);
662 op->state->contained_elements = NULL;
664 GNUNET_free (op->state);
666 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
670 _GSS_intersection_vt ()
672 static const struct SetVT intersection_vt = {
673 .create = &intersection_set_create,
674 .msg_handler = &intersection_handle_p2p_message,
675 .add = &intersection_add,
676 .remove = &intersection_remove,
677 .destroy_set = &intersection_set_destroy,
678 .evaluate = &intersection_evaluate,
679 .accept = &intersection_accept,
680 .peer_disconnect = &intersection_peer_disconnect,
681 .cancel = &intersection_op_cancel,
684 return &intersection_vt;