2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file set/gnunet-service-set.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
33 * How long do we hold on to an incoming channel if there is
34 * no local listener before giving up?
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
40 * Lazy copy requests made by a client.
42 struct LazyCopyRequest
47 struct LazyCopyRequest *prev;
52 struct LazyCopyRequest *next;
55 * Which set are we supposed to copy?
57 struct Set *source_set;
60 * Cookie identifying the request.
67 * A listener is inhabited by a client, and waits for evaluation
68 * requests from remote peers.
73 * Listeners are held in a doubly linked list.
75 struct Listener *next;
78 * Listeners are held in a doubly linked list.
80 struct Listener *prev;
83 * Head of DLL of operations this listener is responsible for.
84 * Once the client has accepted/declined the operation, the
85 * operation is moved to the respective set's operation DLLS.
87 struct Operation *op_head;
90 * Tail of DLL of operations this listener is responsible for.
91 * Once the client has accepted/declined the operation, the
92 * operation is moved to the respective set's operation DLLS.
94 struct Operation *op_tail;
97 * Client that owns the listener.
98 * Only one client may own a listener.
100 struct ClientState *cs;
103 * The port we are listening on with CADET.
105 struct GNUNET_CADET_Port *open_port;
108 * Application ID for the operation, used to distinguish
109 * multiple operations of the same type with the same peer.
111 struct GNUNET_HashCode app_id;
114 * The type of the operation.
116 enum GNUNET_SET_OperationType operation;
121 * Handle to the cadet service, used to listen for and connect to
124 static struct GNUNET_CADET_Handle *cadet;
127 * DLL of lazy copy requests by this client.
129 static struct LazyCopyRequest *lazy_copy_head;
132 * DLL of lazy copy requests by this client.
134 static struct LazyCopyRequest *lazy_copy_tail;
137 * Generator for unique cookie we set per lazy copy request.
139 static uint32_t lazy_copy_cookie;
144 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
147 * Listeners are held in a doubly linked list.
149 static struct Listener *listener_head;
152 * Listeners are held in a doubly linked list.
154 static struct Listener *listener_tail;
157 * Number of active clients.
159 static unsigned int num_clients;
162 * Are we in shutdown? if #GNUNET_YES and the number of clients
163 * drops to zero, disconnect from CADET.
165 static int in_shutdown;
168 * Counter for allocating unique IDs for clients, used to identify
169 * incoming operation requests from remote peers, that the client can
170 * choose to accept or refuse. 0 must not be used (reserved for
173 static uint32_t suggest_id;
177 * Get the incoming socket associated with the given id.
179 * @param listener the listener to look in
180 * @param id id to look for
181 * @return the incoming socket associated with the id,
182 * or NULL if there is none
184 static struct Operation *
185 get_incoming (uint32_t id)
187 for (struct Listener *listener = listener_head; NULL != listener;
188 listener = listener->next)
190 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
191 if (op->suggest_id == id)
199 * Destroy an incoming request from a remote peer
201 * @param op remote request to destroy
204 incoming_destroy (struct Operation *op)
206 struct Listener *listener;
208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
209 "Destroying incoming operation %p\n",
211 if (NULL != (listener = op->listener))
213 GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
216 if (NULL != op->timeout_task)
218 GNUNET_SCHEDULER_cancel (op->timeout_task);
219 op->timeout_task = NULL;
221 _GSS_operation_destroy2 (op);
226 * Context for the #garbage_collect_cb().
228 struct GarbageContext
231 * Map for which we are garbage collecting removed elements.
233 struct GNUNET_CONTAINER_MultiHashMap *map;
236 * Lowest generation for which an operation is still pending.
238 unsigned int min_op_generation;
241 * Largest generation for which an operation is still pending.
243 unsigned int max_op_generation;
248 * Function invoked to check if an element can be removed from
249 * the set's history because it is no longer needed.
251 * @param cls the `struct GarbageContext *`
252 * @param key key of the element in the map
253 * @param value the `struct ElementEntry *`
254 * @return #GNUNET_OK (continue to iterate)
257 garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value)
259 // struct GarbageContext *gc = cls;
260 // struct ElementEntry *ee = value;
262 // if (GNUNET_YES != ee->removed)
264 // if ( (gc->max_op_generation < ee->generation_added) ||
265 // (ee->generation_removed > gc->min_op_generation) )
267 // GNUNET_assert (GNUNET_YES ==
268 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
278 * Collect and destroy elements that are not needed anymore, because
279 * their lifetime (as determined by their generation) does not overlap
280 * with any active set operation.
282 * @param set set to garbage collect
285 collect_generation_garbage (struct Set *set)
287 struct GarbageContext gc;
289 gc.min_op_generation = UINT_MAX;
290 gc.max_op_generation = 0;
291 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
293 gc.min_op_generation =
294 GNUNET_MIN (gc.min_op_generation, op->generation_created);
295 gc.max_op_generation =
296 GNUNET_MAX (gc.max_op_generation, op->generation_created);
298 gc.map = set->content->elements;
299 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
306 * Is @a generation in the range of exclusions?
308 * @param generation generation to query
309 * @param excluded array of generations where the element is excluded
310 * @param excluded_size length of the @a excluded array
311 * @return #GNUNET_YES if @a generation is in any of the ranges
314 is_excluded_generation (unsigned int generation,
315 struct GenerationRange *excluded,
316 unsigned int excluded_size)
318 for (unsigned int i = 0; i < excluded_size; i++)
319 if ((generation >= excluded[i].start) && (generation < excluded[i].end))
326 * Is element @a ee part of the set during @a query_generation?
328 * @param ee element to test
329 * @param query_generation generation to query
330 * @param excluded array of generations where the element is excluded
331 * @param excluded_size length of the @a excluded array
332 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
335 is_element_of_generation (struct ElementEntry *ee,
336 unsigned int query_generation,
337 struct GenerationRange *excluded,
338 unsigned int excluded_size)
340 struct MutationEvent *mut;
343 GNUNET_assert (NULL != ee->mutations);
345 is_excluded_generation (query_generation, excluded, excluded_size))
351 is_present = GNUNET_NO;
353 /* Could be made faster with binary search, but lists
354 are small, so why bother. */
355 for (unsigned int i = 0; i < ee->mutations_size; i++)
357 mut = &ee->mutations[i];
359 if (mut->generation > query_generation)
361 /* The mutation doesn't apply to our generation
362 anymore. We can'b break here, since mutations aren't
363 sorted by generation. */
368 is_excluded_generation (mut->generation, excluded, excluded_size))
370 /* The generation is excluded (because it belongs to another
371 fork via a lazy copy) and thus mutations aren't considered
372 for membership testing. */
376 /* This would be an inconsistency in how we manage mutations. */
377 if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added))
380 if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added))
383 is_present = mut->added;
391 * Is element @a ee part of the set used by @a op?
393 * @param ee element to test
394 * @param op operation the defines the set and its generation
395 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
398 _GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op)
400 return is_element_of_generation (ee,
401 op->generation_created,
402 op->set->excluded_generations,
403 op->set->excluded_generations_size);
408 * Destroy the given operation. Used for any operation where both
409 * peers were known and that thus actually had a vt and channel. Must
410 * not be used for operations where 'listener' is still set and we do
411 * not know the other peer.
413 * Call the implementation-specific cancel function of the operation.
414 * Disconnects from the remote peer. Does not disconnect the client,
415 * as there may be multiple operations per set.
417 * @param op operation to destroy
418 * @param gc #GNUNET_YES to perform garbage collection on the set
421 _GSS_operation_destroy (struct Operation *op, int gc)
423 struct Set *set = op->set;
424 struct GNUNET_CADET_Channel *channel;
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
427 GNUNET_assert (NULL == op->listener);
428 if (NULL != op->state)
430 set->vt->cancel (op);
435 GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op);
438 if (NULL != op->context_msg)
440 GNUNET_free (op->context_msg);
441 op->context_msg = NULL;
443 if (NULL != (channel = op->channel))
445 /* This will free op; called conditionally as this helper function
446 is also called from within the channel disconnect handler. */
448 GNUNET_CADET_channel_destroy (channel);
450 if ((NULL != set) && (GNUNET_YES == gc))
451 collect_generation_garbage (set);
452 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
453 * there was a channel end handler that will free 'op' on the call stack. */
458 * Callback called when a client connects to the service.
460 * @param cls closure for the service
461 * @param c the new client that connected to the service
462 * @param mq the message queue used to send messages to the client
463 * @return @a `struct ClientState`
466 client_connect_cb (void *cls,
467 struct GNUNET_SERVICE_Client *c,
468 struct GNUNET_MQ_Handle *mq)
470 struct ClientState *cs;
473 cs = GNUNET_new (struct ClientState);
481 * Iterator over hash map entries to free element entries.
484 * @param key current key code
485 * @param value a `struct ElementEntry *` to be free'd
486 * @return #GNUNET_YES (continue to iterate)
489 destroy_elements_iterator (void *cls,
490 const struct GNUNET_HashCode *key,
493 struct ElementEntry *ee = value;
495 GNUNET_free_non_null (ee->mutations);
502 * Clean up after a client has disconnected
504 * @param cls closure, unused
505 * @param client the client to clean up after
506 * @param internal_cls the `struct ClientState`
509 client_disconnect_cb (void *cls,
510 struct GNUNET_SERVICE_Client *client,
513 struct ClientState *cs = internal_cls;
514 struct Operation *op;
515 struct Listener *listener;
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
519 if (NULL != (set = cs->set))
521 struct SetContent *content = set->content;
522 struct PendingMutation *pm;
523 struct PendingMutation *pm_current;
524 struct LazyCopyRequest *lcr;
526 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
527 /* Destroy pending set operations */
528 while (NULL != set->ops_head)
529 _GSS_operation_destroy (set->ops_head, GNUNET_NO);
531 /* Destroy operation-specific state */
532 GNUNET_assert (NULL != set->state);
533 set->vt->destroy_set (set->state);
536 /* Clean up ongoing iterations */
537 if (NULL != set->iter)
539 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
544 /* discard any pending mutations that reference this set */
545 pm = content->pending_mutations_head;
550 if (pm_current->set == set)
552 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
553 content->pending_mutations_tail,
555 GNUNET_free (pm_current);
559 /* free set content (or at least decrement RC) */
561 GNUNET_assert (0 != content->refcount);
563 if (0 == content->refcount)
565 GNUNET_assert (NULL != content->elements);
566 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
567 &destroy_elements_iterator,
569 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
570 content->elements = NULL;
571 GNUNET_free (content);
573 GNUNET_free_non_null (set->excluded_generations);
574 set->excluded_generations = NULL;
576 /* remove set from pending copy requests */
577 lcr = lazy_copy_head;
580 struct LazyCopyRequest *lcr_current = lcr;
583 if (lcr_current->source_set == set)
585 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
588 GNUNET_free (lcr_current);
594 if (NULL != (listener = cs->listener))
596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
597 GNUNET_CADET_close_port (listener->open_port);
598 listener->open_port = NULL;
599 while (NULL != (op = listener->op_head))
601 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
602 "Destroying incoming operation `%u' from peer `%s'\n",
603 (unsigned int) op->client_request_id,
604 GNUNET_i2s (&op->peer));
605 incoming_destroy (op);
607 GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener);
608 GNUNET_free (listener);
612 if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
616 GNUNET_CADET_disconnect (cadet);
624 * Check a request for a set operation from another peer.
626 * @param cls the operation state
627 * @param msg the received message
628 * @return #GNUNET_OK if the channel should be kept alive,
629 * #GNUNET_SYSERR to destroy the channel
632 check_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
634 struct Operation *op = cls;
635 struct Listener *listener = op->listener;
636 const struct GNUNET_MessageHeader *nested_context;
638 /* double operation request */
639 if (0 != op->suggest_id)
642 return GNUNET_SYSERR;
644 /* This should be equivalent to the previous condition, but can't hurt to check twice */
645 if (NULL == op->listener)
648 return GNUNET_SYSERR;
650 if (listener->operation !=
651 (enum GNUNET_SET_OperationType) ntohl (msg->operation))
654 return GNUNET_SYSERR;
656 nested_context = GNUNET_MQ_extract_nested_mh (msg);
657 if ((NULL != nested_context) &&
658 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE))
661 return GNUNET_SYSERR;
668 * Handle a request for a set operation from another peer. Checks if we
669 * have a listener waiting for such a request (and in that case initiates
670 * asking the listener about accepting the connection). If no listener
671 * is waiting, we queue the operation request in hope that a listener
672 * shows up soon (before timeout).
674 * This msg is expected as the first and only msg handled through the
675 * non-operation bound virtual table, acceptance of this operation replaces
676 * our virtual table and subsequent msgs would be routed differently (as
677 * we then know what type of operation this is).
679 * @param cls the operation state
680 * @param msg the received message
681 * @return #GNUNET_OK if the channel should be kept alive,
682 * #GNUNET_SYSERR to destroy the channel
685 handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
687 struct Operation *op = cls;
688 struct Listener *listener = op->listener;
689 const struct GNUNET_MessageHeader *nested_context;
690 struct GNUNET_MQ_Envelope *env;
691 struct GNUNET_SET_RequestMessage *cmsg;
693 nested_context = GNUNET_MQ_extract_nested_mh (msg);
694 /* Make a copy of the nested_context (application-specific context
695 information that is opaque to set) so we can pass it to the
697 if (NULL != nested_context)
698 op->context_msg = GNUNET_copy_message (nested_context);
699 op->remote_element_count = ntohl (msg->element_count);
701 GNUNET_ERROR_TYPE_DEBUG,
702 "Received P2P operation request (op %u, port %s) for active listener\n",
703 (uint32_t) ntohl (msg->operation),
704 GNUNET_h2s (&op->listener->app_id));
705 GNUNET_assert (0 == op->suggest_id);
708 op->suggest_id = suggest_id++;
709 GNUNET_assert (NULL != op->timeout_task);
710 GNUNET_SCHEDULER_cancel (op->timeout_task);
711 op->timeout_task = NULL;
712 env = GNUNET_MQ_msg_nested_mh (cmsg,
713 GNUNET_MESSAGE_TYPE_SET_REQUEST,
716 GNUNET_ERROR_TYPE_DEBUG,
717 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
721 cmsg->accept_id = htonl (op->suggest_id);
722 cmsg->peer_id = op->peer;
723 GNUNET_MQ_send (listener->cs->mq, env);
724 /* NOTE: GNUNET_CADET_receive_done() will be called in
725 #handle_client_accept() */
730 * Add an element to @a set as specified by @a msg
732 * @param set set to manipulate
733 * @param msg message specifying the change
736 execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
738 struct GNUNET_SET_Element el;
739 struct ElementEntry *ee;
740 struct GNUNET_HashCode hash;
742 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
743 el.size = ntohs (msg->header.size) - sizeof(*msg);
745 el.element_type = ntohs (msg->element_type);
746 GNUNET_SET_element_hash (&el, &hash);
747 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash);
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751 "Client inserts element %s of size %u\n",
754 ee = GNUNET_malloc (el.size + sizeof(*ee));
755 ee->element.size = el.size;
756 GNUNET_memcpy (&ee[1], el.data, el.size);
757 ee->element.data = &ee[1];
758 ee->element.element_type = el.element_type;
759 ee->remote = GNUNET_NO;
760 ee->mutations = NULL;
761 ee->mutations_size = 0;
762 ee->element_hash = hash;
763 GNUNET_break (GNUNET_YES ==
764 GNUNET_CONTAINER_multihashmap_put (
765 set->content->elements,
768 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
770 else if (GNUNET_YES ==
771 is_element_of_generation (ee,
772 set->current_generation,
773 set->excluded_generations,
774 set->excluded_generations_size))
776 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
777 "Client inserted element %s of size %u twice (ignored)\n",
781 /* same element inserted twice */
786 struct MutationEvent mut = { .generation = set->current_generation,
787 .added = GNUNET_YES };
788 GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
790 set->vt->add (set->state, ee);
795 * Remove an element from @a set as specified by @a msg
797 * @param set set to manipulate
798 * @param msg message specifying the change
801 execute_remove (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
803 struct GNUNET_SET_Element el;
804 struct ElementEntry *ee;
805 struct GNUNET_HashCode hash;
807 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
808 el.size = ntohs (msg->header.size) - sizeof(*msg);
810 el.element_type = ntohs (msg->element_type);
811 GNUNET_SET_element_hash (&el, &hash);
812 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash);
815 /* Client tried to remove non-existing element. */
816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817 "Client removes non-existing element of size %u\n",
821 if (GNUNET_NO == is_element_of_generation (ee,
822 set->current_generation,
823 set->excluded_generations,
824 set->excluded_generations_size))
826 /* Client tried to remove element twice */
827 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828 "Client removed element of size %u twice (ignored)\n",
834 struct MutationEvent mut = { .generation = set->current_generation,
835 .added = GNUNET_NO };
837 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838 "Client removes element of size %u\n",
841 GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
843 set->vt->remove (set->state, ee);
848 * Perform a mutation on a set as specified by the @a msg
850 * @param set the set to mutate
851 * @param msg specification of what to change
854 execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
856 switch (ntohs (msg->header.type))
858 case GNUNET_MESSAGE_TYPE_SET_ADD:
859 execute_add (set, msg);
862 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
863 execute_remove (set, msg);
873 * Execute mutations that were delayed on a set because of
874 * pending operations.
876 * @param set the set to execute mutations on
879 execute_delayed_mutations (struct Set *set)
881 struct PendingMutation *pm;
883 if (0 != set->content->iterator_count)
884 return; /* still cannot do this */
885 while (NULL != (pm = set->content->pending_mutations_head))
887 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
888 set->content->pending_mutations_tail,
890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
891 "Executing pending mutation on %p.\n",
893 execute_mutation (pm->set, pm->msg);
894 GNUNET_free (pm->msg);
901 * Send the next element of a set to the set's client. The next element is given by
902 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
903 * are no more elements in the set. The caller must ensure that the set's iterator is
906 * The client will acknowledge each received element with a
907 * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
908 * #handle_client_iter_ack() will then trigger the next transmission.
909 * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
911 * @param set set that should send its next element to its client
914 send_client_element (struct Set *set)
917 struct ElementEntry *ee;
918 struct GNUNET_MQ_Envelope *ev;
919 struct GNUNET_SET_IterResponseMessage *msg;
921 GNUNET_assert (NULL != set->iter);
924 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
926 (const void **) &ee);
927 if (GNUNET_NO == ret)
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set);
930 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
931 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
934 GNUNET_assert (set->content->iterator_count > 0);
935 set->content->iterator_count--;
936 execute_delayed_mutations (set);
937 GNUNET_MQ_send (set->cs->mq, ev);
940 GNUNET_assert (NULL != ee);
943 is_element_of_generation (ee,
944 set->iter_generation,
945 set->excluded_generations,
946 set->excluded_generations_size));
947 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948 "Sending iteration element on %p.\n",
950 ev = GNUNET_MQ_msg_extra (msg,
952 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
953 GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size);
954 msg->element_type = htons (ee->element.element_type);
955 msg->iteration_id = htons (set->iteration_id);
956 GNUNET_MQ_send (set->cs->mq, ev);
961 * Called when a client wants to iterate the elements of a set.
962 * Checks if we have a set associated with the client and if we
963 * can right now start an iteration. If all checks out, starts
964 * sending the elements of the set to the client.
966 * @param cls client that sent the message
967 * @param m message sent by the client
970 handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m)
972 struct ClientState *cs = cls;
975 if (NULL == (set = cs->set))
977 /* attempt to iterate over a non existing set */
979 GNUNET_SERVICE_client_drop (cs->client);
982 if (NULL != set->iter)
984 /* Only one concurrent iterate-action allowed per set */
986 GNUNET_SERVICE_client_drop (cs->client);
989 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990 "Iterating set %p in gen %u with %u content elements\n",
992 set->current_generation,
993 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
994 GNUNET_SERVICE_client_continue (cs->client);
995 set->content->iterator_count++;
997 GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
998 set->iter_generation = set->current_generation;
999 send_client_element (set);
1004 * Called when a client wants to create a new set. This is typically
1005 * the first request from a client, and includes the type of set
1006 * operation to be performed.
1008 * @param cls client that sent the message
1009 * @param m message sent by the client
1012 handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg)
1014 struct ClientState *cs = cls;
1017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018 "Client created new set (operation %u)\n",
1019 (uint32_t) ntohl (msg->operation));
1020 if (NULL != cs->set)
1022 /* There can only be one set per client */
1024 GNUNET_SERVICE_client_drop (cs->client);
1027 set = GNUNET_new (struct Set);
1028 switch (ntohl (msg->operation))
1030 case GNUNET_SET_OPERATION_INTERSECTION:
1031 set->vt = _GSS_intersection_vt ();
1034 case GNUNET_SET_OPERATION_UNION:
1035 set->vt = _GSS_union_vt ();
1041 GNUNET_SERVICE_client_drop (cs->client);
1044 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1045 set->state = set->vt->create ();
1046 if (NULL == set->state)
1048 /* initialization failed (i.e. out of memory) */
1050 GNUNET_SERVICE_client_drop (cs->client);
1053 set->content = GNUNET_new (struct SetContent);
1054 set->content->refcount = 1;
1055 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1058 GNUNET_SERVICE_client_continue (cs->client);
1063 * Timeout happens iff:
1064 * - we suggested an operation to our listener,
1065 * but did not receive a response in time
1066 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1068 * @param cls channel context
1069 * @param tc context information (why was this task triggered now)
1072 incoming_timeout_cb (void *cls)
1074 struct Operation *op = cls;
1076 op->timeout_task = NULL;
1077 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1078 "Remote peer's incoming request timed out\n");
1079 incoming_destroy (op);
1084 * Method called whenever another peer has added us to a channel the
1085 * other peer initiated. Only called (once) upon reception of data
1086 * from a channel we listen on.
1088 * The channel context represents the operation itself and gets added
1089 * to a DLL, from where it gets looked up when our local listener
1090 * client responds to a proposed/suggested operation or connects and
1091 * associates with this operation.
1093 * @param cls closure
1094 * @param channel new handle to the channel
1095 * @param source peer that started the channel
1096 * @return initial channel context for the channel
1097 * returns NULL on error
1100 channel_new_cb (void *cls,
1101 struct GNUNET_CADET_Channel *channel,
1102 const struct GNUNET_PeerIdentity *source)
1104 struct Listener *listener = cls;
1105 struct Operation *op;
1107 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n");
1108 op = GNUNET_new (struct Operation);
1109 op->listener = listener;
1111 op->channel = channel;
1112 op->mq = GNUNET_CADET_get_mq (op->channel);
1113 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1114 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1115 &incoming_timeout_cb,
1117 GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op);
1123 * Function called whenever a channel is destroyed. Should clean up
1124 * any associated state. It must NOT call
1125 * GNUNET_CADET_channel_destroy() on the channel.
1127 * The peer_disconnect function is part of a a virtual table set initially either
1128 * when a peer creates a new channel with us, or once we create
1129 * a new channel ourselves (evaluate).
1131 * Once we know the exact type of operation (union/intersection), the vt is
1132 * replaced with an operation specific instance (_GSS_[op]_vt).
1134 * @param channel_ctx place where local state associated
1135 * with the channel is stored
1136 * @param channel connection to the other end (henceforth invalid)
1139 channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
1141 struct Operation *op = channel_ctx;
1144 _GSS_operation_destroy2 (op);
1149 * This function probably should not exist
1150 * and be replaced by inlining more specific
1151 * logic in the various places where it is called.
1154 _GSS_operation_destroy2 (struct Operation *op)
1156 struct GNUNET_CADET_Channel *channel;
1158 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n");
1159 if (NULL != (channel = op->channel))
1161 /* This will free op; called conditionally as this helper function
1162 is also called from within the channel disconnect handler. */
1164 GNUNET_CADET_channel_destroy (channel);
1166 if (NULL != op->listener)
1168 incoming_destroy (op);
1171 if (NULL != op->set)
1172 op->set->vt->channel_death (op);
1174 _GSS_operation_destroy (op, GNUNET_YES);
1180 * Function called whenever an MQ-channel's transmission window size changes.
1182 * The first callback in an outgoing channel will be with a non-zero value
1183 * and will mean the channel is connected to the destination.
1185 * For an incoming channel it will be called immediately after the
1186 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1188 * @param cls Channel closure.
1189 * @param channel Connection to the other end (henceforth invalid).
1190 * @param window_size New window size. If the is more messages than buffer size
1191 * this value will be negative..
1194 channel_window_cb (void *cls,
1195 const struct GNUNET_CADET_Channel *channel,
1198 /* FIXME: not implemented, we could do flow control here... */
1203 * Called when a client wants to create a new listener.
1205 * @param cls client that sent the message
1206 * @param msg message sent by the client
1209 handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg)
1211 struct ClientState *cs = cls;
1212 struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1213 { GNUNET_MQ_hd_var_size (incoming_msg,
1214 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1215 struct OperationRequestMessage,
1217 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1218 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1221 GNUNET_MQ_hd_var_size (union_p2p_elements,
1222 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1223 struct GNUNET_SET_ElementMessage,
1225 GNUNET_MQ_hd_var_size (union_p2p_offer,
1226 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1227 struct GNUNET_MessageHeader,
1229 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1230 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1231 struct InquiryMessage,
1233 GNUNET_MQ_hd_var_size (union_p2p_demand,
1234 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1235 struct GNUNET_MessageHeader,
1237 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1238 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1239 struct GNUNET_MessageHeader,
1241 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1242 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1243 struct GNUNET_MessageHeader,
1245 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1246 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1247 struct GNUNET_MessageHeader,
1249 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1250 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1251 struct GNUNET_MessageHeader,
1253 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1254 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1255 struct StrataEstimatorMessage,
1257 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1258 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1259 struct StrataEstimatorMessage,
1261 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1262 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1263 struct GNUNET_SET_ElementMessage,
1265 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1266 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1267 struct IntersectionElementInfoMessage,
1269 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1270 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1273 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1274 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1275 struct IntersectionDoneMessage,
1277 GNUNET_MQ_handler_end () };
1278 struct Listener *listener;
1280 if (NULL != cs->listener)
1282 /* max. one active listener per client! */
1284 GNUNET_SERVICE_client_drop (cs->client);
1287 listener = GNUNET_new (struct Listener);
1289 cs->listener = listener;
1290 listener->app_id = msg->app_id;
1291 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1292 GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener);
1293 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1294 "New listener created (op %u, port %s)\n",
1295 listener->operation,
1296 GNUNET_h2s (&listener->app_id));
1297 listener->open_port = GNUNET_CADET_open_port (cadet,
1304 GNUNET_SERVICE_client_continue (cs->client);
1309 * Called when the listening client rejects an operation
1310 * request by another peer.
1312 * @param cls client that sent the message
1313 * @param msg message sent by the client
1316 handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg)
1318 struct ClientState *cs = cls;
1319 struct Operation *op;
1321 op = get_incoming (ntohl (msg->accept_reject_id));
1324 /* no matching incoming operation for this reject;
1325 could be that the other peer already disconnected... */
1326 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1327 "Client rejected unknown operation %u\n",
1328 (unsigned int) ntohl (msg->accept_reject_id));
1329 GNUNET_SERVICE_client_continue (cs->client);
1332 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1333 "Peer request (op %u, app %s) rejected by client\n",
1334 op->listener->operation,
1335 GNUNET_h2s (&cs->listener->app_id));
1336 _GSS_operation_destroy2 (op);
1337 GNUNET_SERVICE_client_continue (cs->client);
1342 * Called when a client wants to add or remove an element to a set it inhabits.
1344 * @param cls client that sent the message
1345 * @param msg message sent by the client
1348 check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
1350 /* NOTE: Technically, we should probably check with the
1351 block library whether the element we are given is well-formed */
1357 * Called when a client wants to add or remove an element to a set it inhabits.
1359 * @param cls client that sent the message
1360 * @param msg message sent by the client
1363 handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
1365 struct ClientState *cs = cls;
1368 if (NULL == (set = cs->set))
1370 /* client without a set requested an operation */
1372 GNUNET_SERVICE_client_drop (cs->client);
1375 GNUNET_SERVICE_client_continue (cs->client);
1377 if (0 != set->content->iterator_count)
1379 struct PendingMutation *pm;
1381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n");
1382 pm = GNUNET_new (struct PendingMutation);
1384 (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1386 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1387 set->content->pending_mutations_tail,
1391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
1392 execute_mutation (set, msg);
1397 * Advance the current generation of a set,
1398 * adding exclusion ranges if necessary.
1400 * @param set the set where we want to advance the generation
1403 advance_generation (struct Set *set)
1405 struct GenerationRange r;
1407 if (set->current_generation == set->content->latest_generation)
1409 set->content->latest_generation++;
1410 set->current_generation++;
1414 GNUNET_assert (set->current_generation < set->content->latest_generation);
1416 r.start = set->current_generation + 1;
1417 r.end = set->content->latest_generation + 1;
1418 set->content->latest_generation = r.end;
1419 set->current_generation = r.end;
1420 GNUNET_array_append (set->excluded_generations,
1421 set->excluded_generations_size,
1427 * Called when a client wants to initiate a set operation with another
1428 * peer. Initiates the CADET connection to the listener and sends the
1431 * @param cls client that sent the message
1432 * @param msg message sent by the client
1433 * @return #GNUNET_OK if the message is well-formed
1436 check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1438 /* FIXME: suboptimal, even if the context below could be NULL,
1439 there are malformed messages this does not check for... */
1445 * Called when a client wants to initiate a set operation with another
1446 * peer. Initiates the CADET connection to the listener and sends the
1449 * @param cls client that sent the message
1450 * @param msg message sent by the client
1453 handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1455 struct ClientState *cs = cls;
1456 struct Operation *op = GNUNET_new (struct Operation);
1457 const struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1458 { GNUNET_MQ_hd_var_size (incoming_msg,
1459 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1460 struct OperationRequestMessage,
1462 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1463 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1466 GNUNET_MQ_hd_var_size (union_p2p_elements,
1467 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1468 struct GNUNET_SET_ElementMessage,
1470 GNUNET_MQ_hd_var_size (union_p2p_offer,
1471 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1472 struct GNUNET_MessageHeader,
1474 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1475 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1476 struct InquiryMessage,
1478 GNUNET_MQ_hd_var_size (union_p2p_demand,
1479 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1480 struct GNUNET_MessageHeader,
1482 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1483 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1484 struct GNUNET_MessageHeader,
1486 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1487 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1488 struct GNUNET_MessageHeader,
1490 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1491 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1492 struct GNUNET_MessageHeader,
1494 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1495 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1496 struct GNUNET_MessageHeader,
1498 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1499 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1500 struct StrataEstimatorMessage,
1502 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1503 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1504 struct StrataEstimatorMessage,
1506 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1507 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1508 struct GNUNET_SET_ElementMessage,
1510 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1511 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1512 struct IntersectionElementInfoMessage,
1514 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1515 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1518 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1519 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1520 struct IntersectionDoneMessage,
1522 GNUNET_MQ_handler_end () };
1524 const struct GNUNET_MessageHeader *context;
1526 if (NULL == (set = cs->set))
1530 GNUNET_SERVICE_client_drop (cs->client);
1533 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1534 op->peer = msg->target_peer;
1535 op->result_mode = ntohl (msg->result_mode);
1536 op->client_request_id = ntohl (msg->request_id);
1537 op->byzantine = msg->byzantine;
1538 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1539 op->force_full = msg->force_full;
1540 op->force_delta = msg->force_delta;
1541 context = GNUNET_MQ_extract_nested_mh (msg);
1543 /* Advance generation values, so that
1544 mutations won't interfer with the running operation. */
1546 op->generation_created = set->current_generation;
1547 advance_generation (set);
1548 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
1549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1550 "Creating new CADET channel to port %s for set operation type %u\n",
1551 GNUNET_h2s (&msg->app_id),
1553 op->channel = GNUNET_CADET_channel_create (cadet,
1560 op->mq = GNUNET_CADET_get_mq (op->channel);
1561 op->state = set->vt->evaluate (op, context);
1562 if (NULL == op->state)
1565 GNUNET_SERVICE_client_drop (cs->client);
1568 GNUNET_SERVICE_client_continue (cs->client);
1573 * Handle an ack from a client, and send the next element. Note
1574 * that we only expect acks for set elements, not after the
1575 * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1577 * @param cls client the client
1578 * @param ack the message
1581 handle_client_iter_ack (void *cls, const struct GNUNET_SET_IterAckMessage *ack)
1583 struct ClientState *cs = cls;
1586 if (NULL == (set = cs->set))
1588 /* client without a set acknowledged receiving a value */
1590 GNUNET_SERVICE_client_drop (cs->client);
1593 if (NULL == set->iter)
1595 /* client sent an ack, but we were not expecting one (as
1596 set iteration has finished) */
1598 GNUNET_SERVICE_client_drop (cs->client);
1601 GNUNET_SERVICE_client_continue (cs->client);
1602 if (ntohl (ack->send_more))
1604 send_client_element (set);
1608 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1610 set->iteration_id++;
1616 * Handle a request from the client to copy a set.
1618 * @param cls the client
1619 * @param mh the message
1622 handle_client_copy_lazy_prepare (void *cls,
1623 const struct GNUNET_MessageHeader *mh)
1625 struct ClientState *cs = cls;
1627 struct LazyCopyRequest *cr;
1628 struct GNUNET_MQ_Envelope *ev;
1629 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1631 if (NULL == (set = cs->set))
1633 /* client without a set requested an operation */
1635 GNUNET_SERVICE_client_drop (cs->client);
1638 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1639 "Client requested creation of lazy copy\n");
1640 cr = GNUNET_new (struct LazyCopyRequest);
1641 cr->cookie = ++lazy_copy_cookie;
1642 cr->source_set = set;
1643 GNUNET_CONTAINER_DLL_insert (lazy_copy_head, lazy_copy_tail, cr);
1644 ev = GNUNET_MQ_msg (resp_msg, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1645 resp_msg->cookie = cr->cookie;
1646 GNUNET_MQ_send (set->cs->mq, ev);
1647 GNUNET_SERVICE_client_continue (cs->client);
1652 * Handle a request from the client to connect to a copy of a set.
1654 * @param cls the client
1655 * @param msg the message
1658 handle_client_copy_lazy_connect (
1660 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1662 struct ClientState *cs = cls;
1663 struct LazyCopyRequest *cr;
1667 if (NULL != cs->set)
1669 /* There can only be one set per client */
1671 GNUNET_SERVICE_client_drop (cs->client);
1675 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1677 if (cr->cookie == msg->cookie)
1683 if (GNUNET_NO == found)
1685 /* client asked for copy with cookie we don't know */
1687 GNUNET_SERVICE_client_drop (cs->client);
1690 GNUNET_CONTAINER_DLL_remove (lazy_copy_head, lazy_copy_tail, cr);
1691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1692 "Client %p requested use of lazy copy\n",
1694 set = GNUNET_new (struct Set);
1695 switch (cr->source_set->operation)
1697 case GNUNET_SET_OPERATION_INTERSECTION:
1698 set->vt = _GSS_intersection_vt ();
1701 case GNUNET_SET_OPERATION_UNION:
1702 set->vt = _GSS_union_vt ();
1710 if (NULL == set->vt->copy_state)
1712 /* Lazy copy not supported for this set operation */
1716 GNUNET_SERVICE_client_drop (cs->client);
1720 set->operation = cr->source_set->operation;
1721 set->state = set->vt->copy_state (cr->source_set->state);
1722 set->content = cr->source_set->content;
1723 set->content->refcount++;
1725 set->current_generation = cr->source_set->current_generation;
1726 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1727 set->excluded_generations =
1728 GNUNET_memdup (cr->source_set->excluded_generations,
1729 set->excluded_generations_size
1730 * sizeof(struct GenerationRange));
1732 /* Advance the generation of the new set, so that mutations to the
1733 of the cloned set and the source set are independent. */
1734 advance_generation (set);
1738 GNUNET_SERVICE_client_continue (cs->client);
1743 * Handle a request from the client to cancel a running set operation.
1745 * @param cls the client
1746 * @param msg the message
1749 handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg)
1751 struct ClientState *cs = cls;
1753 struct Operation *op;
1756 if (NULL == (set = cs->set))
1758 /* client without a set requested an operation */
1760 GNUNET_SERVICE_client_drop (cs->client);
1764 for (op = set->ops_head; NULL != op; op = op->next)
1766 if (op->client_request_id == ntohl (msg->request_id))
1772 if (GNUNET_NO == found)
1774 /* It may happen that the operation was already destroyed due to
1775 * the other peer disconnecting. The client may not know about this
1776 * yet and try to cancel the (just barely non-existent) operation.
1777 * So this is not a hard error.
1778 */GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1779 "Client canceled non-existent op %u\n",
1780 (uint32_t) ntohl (msg->request_id));
1784 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1785 "Client requested cancel for op %u\n",
1786 (uint32_t) ntohl (msg->request_id));
1787 _GSS_operation_destroy (op, GNUNET_YES);
1789 GNUNET_SERVICE_client_continue (cs->client);
1794 * Handle a request from the client to accept a set operation that
1795 * came from a remote peer. We forward the accept to the associated
1796 * operation for handling
1798 * @param cls the client
1799 * @param msg the message
1802 handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg)
1804 struct ClientState *cs = cls;
1806 struct Operation *op;
1807 struct GNUNET_SET_ResultMessage *result_message;
1808 struct GNUNET_MQ_Envelope *ev;
1809 struct Listener *listener;
1811 if (NULL == (set = cs->set))
1813 /* client without a set requested to accept */
1815 GNUNET_SERVICE_client_drop (cs->client);
1818 op = get_incoming (ntohl (msg->accept_reject_id));
1821 /* It is not an error if the set op does not exist -- it may
1822 * have been destroyed when the partner peer disconnected. */
1824 GNUNET_ERROR_TYPE_INFO,
1825 "Client %p accepted request %u of listener %p that is no longer active\n",
1827 ntohl (msg->accept_reject_id),
1829 ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SET_RESULT);
1830 result_message->request_id = msg->request_id;
1831 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1832 GNUNET_MQ_send (set->cs->mq, ev);
1833 GNUNET_SERVICE_client_continue (cs->client);
1836 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1837 "Client accepting request %u\n",
1838 (uint32_t) ntohl (msg->accept_reject_id));
1839 listener = op->listener;
1840 op->listener = NULL;
1841 GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
1843 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
1844 op->client_request_id = ntohl (msg->request_id);
1845 op->result_mode = ntohl (msg->result_mode);
1846 op->byzantine = msg->byzantine;
1847 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1848 op->force_full = msg->force_full;
1849 op->force_delta = msg->force_delta;
1851 /* Advance generation values, so that future mutations do not
1852 interfer with the running operation. */
1853 op->generation_created = set->current_generation;
1854 advance_generation (set);
1855 GNUNET_assert (NULL == op->state);
1856 op->state = set->vt->accept (op);
1857 if (NULL == op->state)
1860 GNUNET_SERVICE_client_drop (cs->client);
1863 /* Now allow CADET to continue, as we did not do this in
1864 #handle_incoming_msg (as we wanted to first see if the
1865 local client would accept the request). */
1866 GNUNET_CADET_receive_done (op->channel);
1867 GNUNET_SERVICE_client_continue (cs->client);
1872 * Called to clean up, after a shutdown has been requested.
1874 * @param cls closure, NULL
1877 shutdown_task (void *cls)
1879 /* Delay actual shutdown to allow service to disconnect clients */
1880 in_shutdown = GNUNET_YES;
1881 if (0 == num_clients)
1885 GNUNET_CADET_disconnect (cadet);
1889 GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
1890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1895 * Function called by the service's run
1896 * method to run service-specific setup code.
1898 * @param cls closure
1899 * @param cfg configuration to use
1900 * @param service the initialized service
1904 const struct GNUNET_CONFIGURATION_Handle *cfg,
1905 struct GNUNET_SERVICE_Handle *service)
1907 /* FIXME: need to modify SERVICE (!) API to allow
1908 us to run a shutdown task *after* clients were
1909 forcefully disconnected! */
1910 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
1911 _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
1912 cadet = GNUNET_CADET_connect (cfg);
1915 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1916 _ ("Could not connect to CADET service\n"));
1917 GNUNET_SCHEDULER_shutdown ();
1924 * Define "main" method using service macro.
1926 GNUNET_SERVICE_MAIN (
1928 GNUNET_SERVICE_OPTION_NONE,
1931 &client_disconnect_cb,
1933 GNUNET_MQ_hd_fixed_size (client_accept,
1934 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1935 struct GNUNET_SET_AcceptMessage,
1937 GNUNET_MQ_hd_fixed_size (client_iter_ack,
1938 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1939 struct GNUNET_SET_IterAckMessage,
1941 GNUNET_MQ_hd_var_size (client_mutation,
1942 GNUNET_MESSAGE_TYPE_SET_ADD,
1943 struct GNUNET_SET_ElementMessage,
1945 GNUNET_MQ_hd_fixed_size (client_create_set,
1946 GNUNET_MESSAGE_TYPE_SET_CREATE,
1947 struct GNUNET_SET_CreateMessage,
1949 GNUNET_MQ_hd_fixed_size (client_iterate,
1950 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1951 struct GNUNET_MessageHeader,
1953 GNUNET_MQ_hd_var_size (client_evaluate,
1954 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1955 struct GNUNET_SET_EvaluateMessage,
1957 GNUNET_MQ_hd_fixed_size (client_listen,
1958 GNUNET_MESSAGE_TYPE_SET_LISTEN,
1959 struct GNUNET_SET_ListenMessage,
1961 GNUNET_MQ_hd_fixed_size (client_reject,
1962 GNUNET_MESSAGE_TYPE_SET_REJECT,
1963 struct GNUNET_SET_RejectMessage,
1965 GNUNET_MQ_hd_var_size (client_mutation,
1966 GNUNET_MESSAGE_TYPE_SET_REMOVE,
1967 struct GNUNET_SET_ElementMessage,
1969 GNUNET_MQ_hd_fixed_size (client_cancel,
1970 GNUNET_MESSAGE_TYPE_SET_CANCEL,
1971 struct GNUNET_SET_CancelMessage,
1973 GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
1974 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
1975 struct GNUNET_MessageHeader,
1977 GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
1978 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
1979 struct GNUNET_SET_CopyLazyConnectMessage,
1981 GNUNET_MQ_handler_end ());
1984 /* end of gnunet-service-set.c */