2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file set/gnunet-service-set.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
33 * How long do we hold on to an incoming channel if there is
34 * no local listener before giving up?
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
40 * Lazy copy requests made by a client.
42 struct LazyCopyRequest {
46 struct LazyCopyRequest *prev;
51 struct LazyCopyRequest *next;
54 * Which set are we supposed to copy?
56 struct Set *source_set;
59 * Cookie identifying the request.
66 * A listener is inhabited by a client, and waits for evaluation
67 * requests from remote peers.
71 * Listeners are held in a doubly linked list.
73 struct Listener *next;
76 * Listeners are held in a doubly linked list.
78 struct Listener *prev;
81 * Head of DLL of operations this listener is responsible for.
82 * Once the client has accepted/declined the operation, the
83 * operation is moved to the respective set's operation DLLS.
85 struct Operation *op_head;
88 * Tail of DLL of operations this listener is responsible for.
89 * Once the client has accepted/declined the operation, the
90 * operation is moved to the respective set's operation DLLS.
92 struct Operation *op_tail;
95 * Client that owns the listener.
96 * Only one client may own a listener.
98 struct ClientState *cs;
101 * The port we are listening on with CADET.
103 struct GNUNET_CADET_Port *open_port;
106 * Application ID for the operation, used to distinguish
107 * multiple operations of the same type with the same peer.
109 struct GNUNET_HashCode app_id;
112 * The type of the operation.
114 enum GNUNET_SET_OperationType operation;
119 * Handle to the cadet service, used to listen for and connect to
122 static struct GNUNET_CADET_Handle *cadet;
125 * DLL of lazy copy requests by this client.
127 static struct LazyCopyRequest *lazy_copy_head;
130 * DLL of lazy copy requests by this client.
132 static struct LazyCopyRequest *lazy_copy_tail;
135 * Generator for unique cookie we set per lazy copy request.
137 static uint32_t lazy_copy_cookie;
142 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
145 * Listeners are held in a doubly linked list.
147 static struct Listener *listener_head;
150 * Listeners are held in a doubly linked list.
152 static struct Listener *listener_tail;
155 * Number of active clients.
157 static unsigned int num_clients;
160 * Are we in shutdown? if #GNUNET_YES and the number of clients
161 * drops to zero, disconnect from CADET.
163 static int in_shutdown;
166 * Counter for allocating unique IDs for clients, used to identify
167 * incoming operation requests from remote peers, that the client can
168 * choose to accept or refuse. 0 must not be used (reserved for
171 static uint32_t suggest_id;
175 * Get the incoming socket associated with the given id.
177 * Every listener's list of incoming operations is searched.
178 * @param id id to look for
179 * @return the incoming socket associated with the id,
180 * or NULL if there is none
/* Linear search over every listener (starting at listener_head) and every
   operation queued on it (op_head) for an operation whose suggest_id
   equals the requested id. */
182 static struct Operation *
183 get_incoming(uint32_t id)
185 for (struct Listener *listener = listener_head; NULL != listener;
186 listener = listener->next)
188 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
189 if (op->suggest_id == id)
197 * Destroy an incoming request from a remote peer
199 * @param op remote request to destroy
/* Unlinks op from its listener's operation list when a listener is set,
   cancels the timeout task when one is pending, and then passes op on to
   _GSS_operation_destroy2(). */
202 incoming_destroy(struct Operation *op)
204 struct Listener *listener;
206 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
207 "Destroying incoming operation %p\n",
209 if (NULL != (listener = op->listener))
211 GNUNET_CONTAINER_DLL_remove(listener->op_head, listener->op_tail, op);
/* Make sure the timeout cannot fire for an operation being destroyed. */
214 if (NULL != op->timeout_task)
216 GNUNET_SCHEDULER_cancel(op->timeout_task);
217 op->timeout_task = NULL;
219 _GSS_operation_destroy2(op);
224 * Context for the #garbage_collect_cb().
226 struct GarbageContext {
228 * Map for which we are garbage collecting removed elements.
230 struct GNUNET_CONTAINER_MultiHashMap *map;
233 * Lowest generation for which an operation is still pending.
235 unsigned int min_op_generation;
238 * Largest generation for which an operation is still pending.
240 unsigned int max_op_generation;
245 * Function invoked to check if an element can be removed from
246 * the set's history because it is no longer needed.
248 * @param cls the `struct GarbageContext *`
249 * @param key key of the element in the map
250 * @param value the `struct ElementEntry *`
251 * @return #GNUNET_OK (continue to iterate)
/* NOTE(review): every statement of the body visible here is disabled with
   `//`, so as shown this callback does not remove anything from the map. */
254 garbage_collect_cb(void *cls, const struct GNUNET_HashCode *key, void *value)
256 //struct GarbageContext *gc = cls;
257 //struct ElementEntry *ee = value;
259 //if (GNUNET_YES != ee->removed)
261 //if ( (gc->max_op_generation < ee->generation_added) ||
262 // (ee->generation_removed > gc->min_op_generation) )
264 // GNUNET_assert (GNUNET_YES ==
265 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
275 * Collect and destroy elements that are not needed anymore, because
276 * their lifetime (as determined by their generation) does not overlap
277 * with any active set operation.
279 * @param set set to garbage collect
/* Computes the smallest and largest generation_created among the set's
   operations (set->ops_head) and then iterates over all elements of the
   set's content (the callback arguments are not visible in this excerpt). */
282 collect_generation_garbage(struct Set *set)
284 struct GarbageContext gc;
/* Begin with an empty range; it stays that way if the set has no operations. */
286 gc.min_op_generation = UINT_MAX;
287 gc.max_op_generation = 0;
288 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
290 gc.min_op_generation =
291 GNUNET_MIN(gc.min_op_generation, op->generation_created);
292 gc.max_op_generation =
293 GNUNET_MAX(gc.max_op_generation, op->generation_created);
295 gc.map = set->content->elements;
296 GNUNET_CONTAINER_multihashmap_iterate(set->content->elements,
303 * Is @a generation in the range of exclusions?
305 * @param generation generation to query
306 * @param excluded array of generations where the element is excluded
307 * @param excluded_size length of the @a excluded array
308 * @return #GNUNET_YES if @a generation is in any of the ranges
/* Tests generation against each of the excluded_size ranges in turn.
   A range is half-open: start is inclusive, end is exclusive. */
311 is_excluded_generation(unsigned int generation,
312 struct GenerationRange *excluded,
313 unsigned int excluded_size)
315 for (unsigned int i = 0; i < excluded_size; i++)
316 if ((generation >= excluded[i].start) && (generation < excluded[i].end))
323 * Is element @a ee part of the set during @a query_generation?
325 * @param ee element to test
326 * @param query_generation generation to query
327 * @param excluded array of generations where the element is excluded
328 * @param excluded_size length of the @a excluded array
329 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
/* Determines membership by walking ee's mutations in array order:
   mutations from generations after query_generation or from excluded
   generations are skipped; each remaining one sets the presence flag to
   its `added` value. */
332 is_element_of_generation(struct ElementEntry *ee,
333 unsigned int query_generation,
334 struct GenerationRange *excluded,
335 unsigned int excluded_size)
337 struct MutationEvent *mut;
340 GNUNET_assert(NULL != ee->mutations);
342 is_excluded_generation(query_generation, excluded, excluded_size))
/* Until a mutation says otherwise, the element counts as absent. */
348 is_present = GNUNET_NO;
350 /* Could be made faster with binary search, but lists
351 are small, so why bother. */
352 for (unsigned int i = 0; i < ee->mutations_size; i++)
354 mut = &ee->mutations[i];
356 if (mut->generation > query_generation)
358 /* The mutation doesn't apply to our generation
359 anymore. We can't break here, since mutations aren't
360 sorted by generation. */
365 is_excluded_generation(mut->generation, excluded, excluded_size))
367 /* The generation is excluded (because it belongs to another
368 fork via a lazy copy) and thus mutations aren't considered
369 for membership testing. */
373 /* This would be an inconsistency in how we manage mutations. */
374 if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added))
377 if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added))
380 is_present = mut->added;
388 * Is element @a ee part of the set used by @a op?
390 * @param ee element to test
391 * @param op operation that defines the set and its generation
392 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
/* Thin wrapper around is_element_of_generation(): queries the generation at
   which op was created, using the exclusion ranges of op's set. */
395 _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
397 return is_element_of_generation(ee,
398 op->generation_created,
399 op->set->excluded_generations,
400 op->set->excluded_generations_size);
405 * Destroy the given operation. Used for any operation where both
406 * peers were known and that thus actually had a vt and channel. Must
407 * not be used for operations where 'listener' is still set and we do
408 * not know the other peer.
410 * Call the implementation-specific cancel function of the operation.
411 * Disconnects from the remote peer. Does not disconnect the client,
412 * as there may be multiple operations per set.
414 * @param op operation to destroy
415 * @param gc #GNUNET_YES to perform garbage collection on the set
/* Precondition (asserted below): op->listener is already NULL.
   Unlinks op from its set's operation list, frees the context message,
   destroys the CADET channel if one is still attached and, when gc is
   GNUNET_YES and the operation had a set, runs garbage collection on it. */
418 _GSS_operation_destroy(struct Operation *op, int gc)
/* The set is read up front and consulted again at the end for the
   optional garbage collection. */
420 struct Set *set = op->set;
421 struct GNUNET_CADET_Channel *channel;
423 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
424 GNUNET_assert(NULL == op->listener);
425 if (NULL != op->state)
432 GNUNET_CONTAINER_DLL_remove(set->ops_head, set->ops_tail, op);
435 if (NULL != op->context_msg)
437 GNUNET_free(op->context_msg);
438 op->context_msg = NULL;
440 if (NULL != (channel = op->channel))
442 /* This will free op; called conditionally as this helper function
443 is also called from within the channel disconnect handler. */
445 GNUNET_CADET_channel_destroy(channel);
447 if ((NULL != set) && (GNUNET_YES == gc))
448 collect_generation_garbage(set);
449 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
450 * there was a channel end handler that will free 'op' on the call stack. */
455 * Callback called when a client connects to the service.
457 * @param cls closure for the service
458 * @param c the new client that connected to the service
459 * @param mq the message queue used to send messages to the client
460 * @return the `struct ClientState` allocated for this client
/* Allocates the per-client state for a newly connected client. */
463 client_connect_cb(void *cls,
464 struct GNUNET_SERVICE_Client *c,
465 struct GNUNET_MQ_Handle *mq)
467 struct ClientState *cs;
470 cs = GNUNET_new(struct ClientState);
478 * Iterator over hash map entries to free element entries.
481 * @param key current key code
482 * @param value a `struct ElementEntry *` to be free'd
483 * @return #GNUNET_YES (continue to iterate)
/* Releases the mutation array of one element entry; the _non_null variant
   is presumably used because entries are created with mutations == NULL. */
486 destroy_elements_iterator(void *cls,
487 const struct GNUNET_HashCode *key,
490 struct ElementEntry *ee = value;
492 GNUNET_free_non_null(ee->mutations);
499 * Clean up after a client has disconnected
501 * @param cls closure, unused
502 * @param client the client to clean up after
503 * @param internal_cls the `struct ClientState`
/* Releases everything tied to the disconnecting client:
   - its set: pending operations, operation-specific state, an active
     iterator, pending mutations that reference the set, the set content
     (once unreferenced), the exclusion ranges and lazy-copy requests that
     use the set as source;
   - its listener: the CADET port and all still-queued incoming operations.
   When the service is shutting down and no client is left, the CADET
   connection is closed as well. */
506 client_disconnect_cb(void *cls,
507 struct GNUNET_SERVICE_Client *client,
510 struct ClientState *cs = internal_cls;
511 struct Operation *op;
512 struct Listener *listener;
515 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
516 if (NULL != (set = cs->set))
518 struct SetContent *content = set->content;
519 struct PendingMutation *pm;
520 struct PendingMutation *pm_current;
521 struct LazyCopyRequest *lcr;
523 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
524 /* Destroy pending set operations */
/* Garbage collection is not requested (GNUNET_NO) for these operations. */
525 while (NULL != set->ops_head)
526 _GSS_operation_destroy(set->ops_head, GNUNET_NO);
528 /* Destroy operation-specific state */
529 GNUNET_assert(NULL != set->state);
530 set->vt->destroy_set(set->state);
533 /* Clean up ongoing iterations */
534 if (NULL != set->iter)
536 GNUNET_CONTAINER_multihashmap_iterator_destroy(set->iter);
541 /* discard any pending mutations that reference this set */
542 pm = content->pending_mutations_head;
547 if (pm_current->set == set)
549 GNUNET_CONTAINER_DLL_remove(content->pending_mutations_head,
550 content->pending_mutations_tail,
552 GNUNET_free(pm_current);
556 /* free set content (or at least decrement RC) */
558 GNUNET_assert(0 != content->refcount);
/* The content itself is torn down only when no reference is left. */
560 if (0 == content->refcount)
562 GNUNET_assert(NULL != content->elements);
563 GNUNET_CONTAINER_multihashmap_iterate(content->elements,
564 &destroy_elements_iterator,
566 GNUNET_CONTAINER_multihashmap_destroy(content->elements);
567 content->elements = NULL;
568 GNUNET_free(content);
570 GNUNET_free_non_null(set->excluded_generations);
571 set->excluded_generations = NULL;
573 /* remove set from pending copy requests */
574 lcr = lazy_copy_head;
577 struct LazyCopyRequest *lcr_current = lcr;
580 if (lcr_current->source_set == set)
582 GNUNET_CONTAINER_DLL_remove(lazy_copy_head,
585 GNUNET_free(lcr_current);
591 if (NULL != (listener = cs->listener))
593 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
594 GNUNET_CADET_close_port(listener->open_port);
595 listener->open_port = NULL;
/* incoming_destroy() unlinks op from listener->op_head, so this loop
   makes progress until the list is empty. */
596 while (NULL != (op = listener->op_head))
598 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
599 "Destroying incoming operation `%u' from peer `%s'\n",
600 (unsigned int)op->client_request_id,
601 GNUNET_i2s(&op->peer));
602 incoming_destroy(op);
604 GNUNET_CONTAINER_DLL_remove(listener_head, listener_tail, listener);
605 GNUNET_free(listener);
/* Disconnect from CADET only during shutdown and once no client is left. */
609 if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
613 GNUNET_CADET_disconnect(cadet);
621 * Check a request for a set operation from another peer.
623 * @param cls the operation state
624 * @param msg the received message
625 * @return #GNUNET_OK if the channel should be kept alive,
626 * #GNUNET_SYSERR to destroy the channel
/* Validation of an operation request. GNUNET_SYSERR is returned when
   - the operation already has a suggest_id (request sent twice),
   - the operation has no listener,
   - the requested operation type differs from the listener's, or
   - a nested context message exceeds GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE. */
629 check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
631 struct Operation *op = cls;
632 struct Listener *listener = op->listener;
633 const struct GNUNET_MessageHeader *nested_context;
635 /* double operation request */
636 if (0 != op->suggest_id)
639 return GNUNET_SYSERR;
641 /* This should be equivalent to the previous condition, but can't hurt to check twice */
642 if (NULL == op->listener)
645 return GNUNET_SYSERR;
/* msg->operation is converted with ntohl() before the comparison. */
647 if (listener->operation !=
648 (enum GNUNET_SET_OperationType)ntohl(msg->operation))
651 return GNUNET_SYSERR;
/* The nested context message is optional; only its size is checked. */
653 nested_context = GNUNET_MQ_extract_nested_mh(msg);
654 if ((NULL != nested_context) &&
655 (ntohs(nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE))
658 return GNUNET_SYSERR;
665 * Handle a request for a set operation from another peer. Checks if we
666 * have a listener waiting for such a request (and in that case initiates
667 * asking the listener about accepting the connection). If no listener
668 * is waiting, we queue the operation request in hope that a listener
669 * shows up soon (before timeout).
671 * This msg is expected as the first and only msg handled through the
672 * non-operation bound virtual table, acceptance of this operation replaces
673 * our virtual table and subsequent msgs would be routed differently (as
674 * we then know what type of operation this is).
676 * @param cls the operation state
677 * @param msg the received message
678 * @return #GNUNET_OK if the channel should be kept alive,
679 * #GNUNET_SYSERR to destroy the channel
/* Records the peer's request in op (copy of the nested context message,
   remote element count), assigns the next suggest_id, cancels the
   incoming-channel timeout and forwards a GNUNET_MESSAGE_TYPE_SET_REQUEST
   carrying that id and the peer identity to the listener's client. */
682 handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
684 struct Operation *op = cls;
685 struct Listener *listener = op->listener;
686 const struct GNUNET_MessageHeader *nested_context;
687 struct GNUNET_MQ_Envelope *env;
688 struct GNUNET_SET_RequestMessage *cmsg;
690 nested_context = GNUNET_MQ_extract_nested_mh(msg);
691 /* Make a copy of the nested_context (application-specific context
692 information that is opaque to set) so we can pass it to the
694 if (NULL != nested_context)
695 op->context_msg = GNUNET_copy_message(nested_context);
696 op->remote_element_count = ntohl(msg->element_count);
698 GNUNET_ERROR_TYPE_DEBUG,
699 "Received P2P operation request (op %u, port %s) for active listener\n",
700 (uint32_t)ntohl(msg->operation),
701 GNUNET_h2s(&op->listener->app_id));
702 GNUNET_assert(0 == op->suggest_id);
705 op->suggest_id = suggest_id++;
706 GNUNET_assert(NULL != op->timeout_task);
707 GNUNET_SCHEDULER_cancel(op->timeout_task);
708 op->timeout_task = NULL;
709 env = GNUNET_MQ_msg_nested_mh(cmsg,
710 GNUNET_MESSAGE_TYPE_SET_REQUEST,
713 GNUNET_ERROR_TYPE_DEBUG,
714 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
718 cmsg->accept_id = htonl(op->suggest_id);
719 cmsg->peer_id = op->peer;
720 GNUNET_MQ_send(listener->cs->mq, env);
721 /* NOTE: GNUNET_CADET_receive_done() will be called in
722 #handle_client_accept() */
727 * Add an element to @a set as specified by @a msg
729 * @param set set to manipulate
730 * @param msg message specifying the change
/* Adds the element carried by msg to the set. The element is looked up
   by its hash in the set content: an unknown element gets a new entry,
   an element already present in the current generation is ignored, and
   otherwise an "added" mutation for the current generation is recorded. */
733 execute_add(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
735 struct GNUNET_SET_Element el;
736 struct ElementEntry *ee;
737 struct GNUNET_HashCode hash;
739 GNUNET_assert(GNUNET_MESSAGE_TYPE_SET_ADD == ntohs(msg->header.type));
/* Payload length = total message size minus the fixed-size message struct. */
740 el.size = ntohs(msg->header.size) - sizeof(*msg);
742 el.element_type = ntohs(msg->element_type);
743 GNUNET_SET_element_hash(&el, &hash);
744 ee = GNUNET_CONTAINER_multihashmap_get(set->content->elements, &hash);
747 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
748 "Client inserts element %s of size %u\n",
/* Entry and payload share one allocation: the payload is copied directly
   behind the struct (&ee[1]) and element.data points there. */
751 ee = GNUNET_malloc(el.size + sizeof(*ee));
752 ee->element.size = el.size;
753 GNUNET_memcpy(&ee[1], el.data, el.size);
754 ee->element.data = &ee[1];
755 ee->element.element_type = el.element_type;
756 ee->remote = GNUNET_NO;
757 ee->mutations = NULL;
758 ee->mutations_size = 0;
759 ee->element_hash = hash;
760 GNUNET_break(GNUNET_YES ==
761 GNUNET_CONTAINER_multihashmap_put(
762 set->content->elements,
765 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
767 else if (GNUNET_YES ==
768 is_element_of_generation(ee,
769 set->current_generation,
770 set->excluded_generations,
771 set->excluded_generations_size))
773 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
774 "Client inserted element %s of size %u twice (ignored)\n",
778 /* same element inserted twice */
/* Record the addition as a mutation tagged with the current generation. */
783 struct MutationEvent mut = { .generation = set->current_generation,
784 .added = GNUNET_YES };
785 GNUNET_array_append(ee->mutations, ee->mutations_size, mut);
/* Let the operation-specific implementation know about the element. */
787 set->vt->add(set->state, ee);
792 * Remove an element from @a set as specified by @a msg
794 * @param set set to manipulate
795 * @param msg message specifying the change
/* Removes the element carried by msg from the set. Removal is recorded as
   a "not added" mutation for the current generation; the entry itself
   stays in the content map. Unknown elements and elements not present in
   the current generation are only logged. */
798 execute_remove(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
800 struct GNUNET_SET_Element el;
801 struct ElementEntry *ee;
802 struct GNUNET_HashCode hash;
804 GNUNET_assert(GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs(msg->header.type));
/* Payload length = total message size minus the fixed-size message struct. */
805 el.size = ntohs(msg->header.size) - sizeof(*msg);
807 el.element_type = ntohs(msg->element_type);
808 GNUNET_SET_element_hash(&el, &hash);
809 ee = GNUNET_CONTAINER_multihashmap_get(set->content->elements, &hash);
812 /* Client tried to remove non-existing element. */
813 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
814 "Client removes non-existing element of size %u\n",
818 if (GNUNET_NO == is_element_of_generation(ee,
819 set->current_generation,
820 set->excluded_generations,
821 set->excluded_generations_size))
823 /* Client tried to remove element twice */
824 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
825 "Client removed element of size %u twice (ignored)\n",
831 struct MutationEvent mut = { .generation = set->current_generation,
832 .added = GNUNET_NO };
834 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
835 "Client removes element of size %u\n",
838 GNUNET_array_append(ee->mutations, ee->mutations_size, mut);
/* Let the operation-specific implementation know about the removal. */
840 set->vt->remove(set->state, ee);
845 * Perform a mutation on a set as specified by the @a msg
847 * @param set the set to mutate
848 * @param msg specification of what to change
/* Dispatches on the message type (host byte order) to execute_add() or
   execute_remove(). */
851 execute_mutation(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
853 switch (ntohs(msg->header.type))
855 case GNUNET_MESSAGE_TYPE_SET_ADD:
856 execute_add(set, msg);
859 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
860 execute_remove(set, msg);
870 * Execute mutations that were delayed on a set because of
871 * pending operations.
873 * @param set the set to execute mutations on
/* Drains the content's pending-mutation list in order, provided no
   iterator is currently active on the content. */
876 execute_delayed_mutations(struct Set *set)
878 struct PendingMutation *pm;
880 if (0 != set->content->iterator_count)
881 return; /* still cannot do this */
882 while (NULL != (pm = set->content->pending_mutations_head))
884 GNUNET_CONTAINER_DLL_remove(set->content->pending_mutations_head,
885 set->content->pending_mutations_tail,
887 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
888 "Executing pending mutation on %p.\n",
/* The mutation is applied to the set stored in the entry (pm->set),
   not to the `set` argument. */
890 execute_mutation(pm->set, pm->msg);
891 GNUNET_free(pm->msg);
898 * Send the next element of a set to the set's client. The next element is given by
899 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
900 * are no more elements in the set. The caller must ensure that the set's iterator is
903 * The client will acknowledge each received element with a
904 * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
905 * #handle_client_iter_ack() will then trigger the next transmission.
906 * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
908 * @param set set that should send its next element to its client
/* Advances set->iter by one step. When the iterator is exhausted, a
   GNUNET_MESSAGE_TYPE_SET_ITER_DONE message is sent and the iterator is
   released; otherwise the element is sent to the client in a
   GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT message. */
911 send_client_element(struct Set *set)
914 struct ElementEntry *ee;
915 struct GNUNET_MQ_Envelope *ev;
916 struct GNUNET_SET_IterResponseMessage *msg;
918 GNUNET_assert(NULL != set->iter);
921 ret = GNUNET_CONTAINER_multihashmap_iterator_next(set->iter,
924 if (GNUNET_NO == ret)
926 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set);
927 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
928 GNUNET_CONTAINER_multihashmap_iterator_destroy(set->iter);
/* One iterator fewer on the content: mutations that had to wait for
   iterators may now be executable. */
931 GNUNET_assert(set->content->iterator_count > 0);
932 set->content->iterator_count--;
933 execute_delayed_mutations(set);
934 GNUNET_MQ_send(set->cs->mq, ev);
937 GNUNET_assert(NULL != ee);
940 is_element_of_generation(ee,
941 set->iter_generation,
942 set->excluded_generations,
943 set->excluded_generations_size));
944 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
945 "Sending iteration element on %p.\n",
947 ev = GNUNET_MQ_msg_extra(msg,
949 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
/* The element payload is placed directly behind the response struct. */
950 GNUNET_memcpy(&msg[1], ee->element.data, ee->element.size);
951 msg->element_type = htons(ee->element.element_type);
952 msg->iteration_id = htons(set->iteration_id);
953 GNUNET_MQ_send(set->cs->mq, ev);
958 * Called when a client wants to iterate the elements of a set.
959 * Checks if we have a set associated with the client and if we
960 * can right now start an iteration. If all checks out, starts
961 * sending the elements of the set to the client.
963 * @param cls client that sent the message
964 * @param m message sent by the client
/* Starts an iteration over the client's set. The client is dropped when
   it has no set or when an iteration is already running on the set. */
967 handle_client_iterate(void *cls, const struct GNUNET_MessageHeader *m)
969 struct ClientState *cs = cls;
972 if (NULL == (set = cs->set))
974 /* attempt to iterate over a non existing set */
976 GNUNET_SERVICE_client_drop(cs->client);
979 if (NULL != set->iter)
981 /* Only one concurrent iterate-action allowed per set */
983 GNUNET_SERVICE_client_drop(cs->client);
986 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
987 "Iterating set %p in gen %u with %u content elements\n",
989 set->current_generation,
990 GNUNET_CONTAINER_multihashmap_size(set->content->elements));
991 GNUNET_SERVICE_client_continue(cs->client);
/* Count the iterator on the content; execute_delayed_mutations() does
   nothing while this count is non-zero. */
992 set->content->iterator_count++;
994 GNUNET_CONTAINER_multihashmap_iterator_create(set->content->elements);
/* Remember the generation at which the iteration started;
   send_client_element() evaluates membership against it. */
995 set->iter_generation = set->current_generation;
996 send_client_element(set);
1001 * Called when a client wants to create a new set. This is typically
1002 * the first request from a client, and includes the type of set
1003 * operation to be performed.
1005 * @param cls client that sent the message
1006 * @param msg message sent by the client
/* Creates the set for a client: picks the virtual table matching the
   requested operation type, creates the operation-specific state and a
   fresh set content. The client is dropped when it already has a set,
   when the operation type is not handled, or when state creation fails. */
1009 handle_client_create_set(void *cls, const struct GNUNET_SET_CreateMessage *msg)
1011 struct ClientState *cs = cls;
1014 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1015 "Client created new set (operation %u)\n",
1016 (uint32_t)ntohl(msg->operation));
1017 if (NULL != cs->set)
1019 /* There can only be one set per client */
1021 GNUNET_SERVICE_client_drop(cs->client);
1024 set = GNUNET_new(struct Set);
/* Select the operation-specific virtual table. */
1025 switch (ntohl(msg->operation))
1027 case GNUNET_SET_OPERATION_INTERSECTION:
1028 set->vt = _GSS_intersection_vt();
1031 case GNUNET_SET_OPERATION_UNION:
1032 set->vt = _GSS_union_vt();
1038 GNUNET_SERVICE_client_drop(cs->client);
1041 set->operation = (enum GNUNET_SET_OperationType)ntohl(msg->operation);
1042 set->state = set->vt->create();
1043 if (NULL == set->state)
1045 /* initialization failed (i.e. out of memory) */
1047 GNUNET_SERVICE_client_drop(cs->client);
/* A new set starts with its own content, referenced exactly once. */
1050 set->content = GNUNET_new(struct SetContent);
1051 set->content->refcount = 1;
1052 set->content->elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
1055 GNUNET_SERVICE_client_continue(cs->client);
1060 * Timeout happens iff:
1061 * - we suggested an operation to our listener,
1062 * but did not receive a response in time
1063 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1065 * @param cls channel context
1066 * @param tc context information (why was this task triggered now)
/* Scheduler task run when an incoming operation timed out; cls is the
   `struct Operation *`. */
1069 incoming_timeout_cb(void *cls)
1071 struct Operation *op = cls;
/* Clear the handle so that incoming_destroy() does not try to cancel
   the task that is running right now. */
1073 op->timeout_task = NULL;
1074 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1075 "Remote peer's incoming request timed out\n");
1076 incoming_destroy(op);
1081 * Method called whenever another peer has added us to a channel the
1082 * other peer initiated. Only called (once) upon reception of data
1083 * from a channel we listen on.
1085 * The channel context represents the operation itself and gets added
1086 * to a DLL, from where it gets looked up when our local listener
1087 * client responds to a proposed/suggested operation or connects and
1088 * associates with this operation.
1090 * @param cls closure
1091 * @param channel new handle to the channel
1092 * @param source peer that started the channel
1093 * @return initial channel context for the channel
1094 * returns NULL on error
/* Creates the operation for a new incoming channel; cls is the listener
   the channel arrived on. */
1097 channel_new_cb(void *cls,
1098 struct GNUNET_CADET_Channel *channel,
1099 const struct GNUNET_PeerIdentity *source)
1101 struct Listener *listener = cls;
1102 struct Operation *op;
1104 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n");
1105 op = GNUNET_new(struct Operation);
1106 op->listener = listener;
1108 op->channel = channel;
1109 op->mq = GNUNET_CADET_get_mq(op->channel);
1110 op->salt = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
/* incoming_timeout_cb() runs after INCOMING_CHANNEL_TIMEOUT unless
   handle_incoming_msg() cancels the task first. */
1111 op->timeout_task = GNUNET_SCHEDULER_add_delayed(INCOMING_CHANNEL_TIMEOUT,
1112 &incoming_timeout_cb,
/* Queue the operation on the listener's operation list. */
1114 GNUNET_CONTAINER_DLL_insert(listener->op_head, listener->op_tail, op);
1120 * Function called whenever a channel is destroyed. Should clean up
1121 * any associated state. It must NOT call
1122 * GNUNET_CADET_channel_destroy() on the channel.
1124 * The peer_disconnect function is part of a virtual table set initially either
1125 * when a peer creates a new channel with us, or once we create
1126 * a new channel ourselves (evaluate).
1128 * Once we know the exact type of operation (union/intersection), the vt is
1129 * replaced with an operation specific instance (_GSS_[op]_vt).
1131 * @param channel_ctx place where local state associated
1132 * with the channel is stored
1133 * @param channel connection to the other end (henceforth invalid)
/* The channel context is the operation itself; it is passed on to
   _GSS_operation_destroy2(). */
1136 channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
1138 struct Operation *op = channel_ctx;
1141 _GSS_operation_destroy2(op);
1146 * This function probably should not exist
1147 * and be replaced by inlining more specific
1148 * logic in the various places where it is called.
/* Common teardown path: destroys the CADET channel if one is still
   attached, routes operations that still have a listener to
   incoming_destroy(), notifies the set's vt (channel_death) when a set is
   attached, and calls _GSS_operation_destroy() with garbage collection. */
1151 _GSS_operation_destroy2(struct Operation *op)
1153 struct GNUNET_CADET_Channel *channel;
/* NOTE(review): the message names channel_end_cb although it is emitted
   from _GSS_operation_destroy2(), which has several callers. */
1155 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n");
1156 if (NULL != (channel = op->channel))
1158 /* This will free op; called conditionally as this helper function
1159 is also called from within the channel disconnect handler. */
1161 GNUNET_CADET_channel_destroy(channel);
1163 if (NULL != op->listener)
1165 incoming_destroy(op);
1168 if (NULL != op->set)
1169 op->set->vt->channel_death(op);
1171 _GSS_operation_destroy(op, GNUNET_YES);
1177 * Function called whenever an MQ-channel's transmission window size changes.
1179 * The first callback in an outgoing channel will be with a non-zero value
1180 * and will mean the channel is connected to the destination.
1182 * For an incoming channel it will be called immediately after the
1183 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1185 * @param cls Channel closure.
1186 * @param channel Connection to the other end (henceforth invalid).
1187 * @param window_size New window size. If there are more messages than buffer size,
1188 * this value will be negative.
/* Window-size callback; no statement is visible in the body here, only
   the FIXME below. */
1191 channel_window_cb(void *cls,
1192 const struct GNUNET_CADET_Channel *channel,
1195 /* FIXME: not implemented, we could do flow control here... */
1200 * Called when a client wants to create a new listener.
1202 * @param cls client that sent the message
1203 * @param msg message sent by the client
/* Creates the listener for a client: builds the table of CADET message
   handlers (operation request plus the union and intersection P2P
   messages), registers the listener in the global list and opens a CADET
   port for it. A client that already has a listener is dropped. */
1206 handle_client_listen(void *cls, const struct GNUNET_SET_ListenMessage *msg)
1208 struct ClientState *cs = cls;
1209 struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1210 { GNUNET_MQ_hd_var_size(incoming_msg,
1211 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1212 struct OperationRequestMessage,
1214 GNUNET_MQ_hd_var_size(union_p2p_ibf,
1215 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1218 GNUNET_MQ_hd_var_size(union_p2p_elements,
1219 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1220 struct GNUNET_SET_ElementMessage,
1222 GNUNET_MQ_hd_var_size(union_p2p_offer,
1223 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1224 struct GNUNET_MessageHeader,
1226 GNUNET_MQ_hd_var_size(union_p2p_inquiry,
1227 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1228 struct InquiryMessage,
1230 GNUNET_MQ_hd_var_size(union_p2p_demand,
1231 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1232 struct GNUNET_MessageHeader,
1234 GNUNET_MQ_hd_fixed_size(union_p2p_done,
1235 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1236 struct GNUNET_MessageHeader,
1238 GNUNET_MQ_hd_fixed_size(union_p2p_over,
1239 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1240 struct GNUNET_MessageHeader,
1242 GNUNET_MQ_hd_fixed_size(union_p2p_full_done,
1243 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1244 struct GNUNET_MessageHeader,
1246 GNUNET_MQ_hd_fixed_size(union_p2p_request_full,
1247 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1248 struct GNUNET_MessageHeader,
1250 GNUNET_MQ_hd_var_size(union_p2p_strata_estimator,
1251 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1252 struct StrataEstimatorMessage,
/* The same handler name is registered again for the _SEC message type. */
1254 GNUNET_MQ_hd_var_size(union_p2p_strata_estimator,
1255 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1256 struct StrataEstimatorMessage,
1258 GNUNET_MQ_hd_var_size(union_p2p_full_element,
1259 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1260 struct GNUNET_SET_ElementMessage,
1262 GNUNET_MQ_hd_fixed_size(intersection_p2p_element_info,
1263 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1264 struct IntersectionElementInfoMessage,
1266 GNUNET_MQ_hd_var_size(intersection_p2p_bf,
1267 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1270 GNUNET_MQ_hd_fixed_size(intersection_p2p_done,
1271 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1272 struct IntersectionDoneMessage,
1274 GNUNET_MQ_handler_end() };
1275 struct Listener *listener;
1277 if (NULL != cs->listener)
1279 /* max. one active listener per client! */
1281 GNUNET_SERVICE_client_drop(cs->client);
1284 listener = GNUNET_new(struct Listener);
1286 cs->listener = listener;
/* app_id is copied as-is; the operation type is converted with ntohl(). */
1287 listener->app_id = msg->app_id;
1288 listener->operation = (enum GNUNET_SET_OperationType)ntohl(msg->operation);
1289 GNUNET_CONTAINER_DLL_insert(listener_head, listener_tail, listener);
1290 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1291 "New listener created (op %u, port %s)\n",
1292 listener->operation,
1293 GNUNET_h2s(&listener->app_id));
1294 listener->open_port = GNUNET_CADET_open_port(cadet,
1301 GNUNET_SERVICE_client_continue(cs->client);
1306 * Called when the listening client rejects an operation
1307 * request by another peer.
1309 * @param cls client that sent the message
1310 * @param msg message sent by the client
1313 handle_client_reject(void *cls, const struct GNUNET_SET_RejectMessage *msg)
1315 struct ClientState *cs = cls;
1316 struct Operation *op;
1318 op = get_incoming(ntohl(msg->accept_reject_id));
1321 /* no matching incoming operation for this reject;
1322 could be that the other peer already disconnected... */
1323 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1324 "Client rejected unknown operation %u\n",
1325 (unsigned int)ntohl(msg->accept_reject_id));
1326 GNUNET_SERVICE_client_continue(cs->client);
1329 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1330 "Peer request (op %u, app %s) rejected by client\n",
1331 op->listener->operation,
1332 GNUNET_h2s(&cs->listener->app_id));
1333 _GSS_operation_destroy2(op);
1334 GNUNET_SERVICE_client_continue(cs->client);
1339 * Called when a client wants to add or remove an element to a set it inhabits.
1341 * @param cls client that sent the message
1342 * @param msg message sent by the client
1345 check_client_mutation(void *cls, const struct GNUNET_SET_ElementMessage *msg)
1347 /* NOTE: Technically, we should probably check with the
1348 block library whether the element we are given is well-formed */
1354 * Called when a client wants to add or remove an element to a set it inhabits.
1356 * @param cls client that sent the message
1357 * @param msg message sent by the client
1360 handle_client_mutation(void *cls, const struct GNUNET_SET_ElementMessage *msg)
1362 struct ClientState *cs = cls;
1365 if (NULL == (set = cs->set))
1367 /* client without a set requested an operation */
1369 GNUNET_SERVICE_client_drop(cs->client);
1372 GNUNET_SERVICE_client_continue(cs->client);
1374 if (0 != set->content->iterator_count)
1376 struct PendingMutation *pm;
1378 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n");
1379 pm = GNUNET_new(struct PendingMutation);
1381 (struct GNUNET_SET_ElementMessage *)GNUNET_copy_message(&msg->header);
1383 GNUNET_CONTAINER_DLL_insert_tail(set->content->pending_mutations_head,
1384 set->content->pending_mutations_tail,
1388 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
1389 execute_mutation(set, msg);
1394 * Advance the current generation of a set,
1395 * adding exclusion ranges if necessary.
1397 * @param set the set where we want to advance the generation
1400 advance_generation(struct Set *set)
1402 struct GenerationRange r;
1404 if (set->current_generation == set->content->latest_generation)
1406 set->content->latest_generation++;
1407 set->current_generation++;
1411 GNUNET_assert(set->current_generation < set->content->latest_generation);
1413 r.start = set->current_generation + 1;
1414 r.end = set->content->latest_generation + 1;
1415 set->content->latest_generation = r.end;
1416 set->current_generation = r.end;
1417 GNUNET_array_append(set->excluded_generations,
1418 set->excluded_generations_size,
1424 * Called when a client wants to initiate a set operation with another
1425 * peer. Initiates the CADET connection to the listener and sends the
1428 * @param cls client that sent the message
1429 * @param msg message sent by the client
1430 * @return #GNUNET_OK if the message is well-formed
1433 check_client_evaluate(void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1435 /* FIXME: suboptimal, even if the context below could be NULL,
1436 there are malformed messages this does not check for... */
1442 * Called when a client wants to initiate a set operation with another
1443 * peer. Initiates the CADET connection to the listener and sends the
1446 * @param cls client that sent the message
1447 * @param msg message sent by the client
1450 handle_client_evaluate(void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1452 struct ClientState *cs = cls;
1453 struct Operation *op = GNUNET_new(struct Operation);
1454 const struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1455 { GNUNET_MQ_hd_var_size(incoming_msg,
1456 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1457 struct OperationRequestMessage,
1459 GNUNET_MQ_hd_var_size(union_p2p_ibf,
1460 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1463 GNUNET_MQ_hd_var_size(union_p2p_elements,
1464 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1465 struct GNUNET_SET_ElementMessage,
1467 GNUNET_MQ_hd_var_size(union_p2p_offer,
1468 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1469 struct GNUNET_MessageHeader,
1471 GNUNET_MQ_hd_var_size(union_p2p_inquiry,
1472 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1473 struct InquiryMessage,
1475 GNUNET_MQ_hd_var_size(union_p2p_demand,
1476 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1477 struct GNUNET_MessageHeader,
1479 GNUNET_MQ_hd_fixed_size(union_p2p_done,
1480 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1481 struct GNUNET_MessageHeader,
1483 GNUNET_MQ_hd_fixed_size(union_p2p_over,
1484 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1485 struct GNUNET_MessageHeader,
1487 GNUNET_MQ_hd_fixed_size(union_p2p_full_done,
1488 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1489 struct GNUNET_MessageHeader,
1491 GNUNET_MQ_hd_fixed_size(union_p2p_request_full,
1492 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1493 struct GNUNET_MessageHeader,
1495 GNUNET_MQ_hd_var_size(union_p2p_strata_estimator,
1496 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1497 struct StrataEstimatorMessage,
1499 GNUNET_MQ_hd_var_size(union_p2p_strata_estimator,
1500 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1501 struct StrataEstimatorMessage,
1503 GNUNET_MQ_hd_var_size(union_p2p_full_element,
1504 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1505 struct GNUNET_SET_ElementMessage,
1507 GNUNET_MQ_hd_fixed_size(intersection_p2p_element_info,
1508 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1509 struct IntersectionElementInfoMessage,
1511 GNUNET_MQ_hd_var_size(intersection_p2p_bf,
1512 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1515 GNUNET_MQ_hd_fixed_size(intersection_p2p_done,
1516 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1517 struct IntersectionDoneMessage,
1519 GNUNET_MQ_handler_end() };
1521 const struct GNUNET_MessageHeader *context;
1523 if (NULL == (set = cs->set))
1527 GNUNET_SERVICE_client_drop(cs->client);
1530 op->salt = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1531 op->peer = msg->target_peer;
1532 op->result_mode = ntohl(msg->result_mode);
1533 op->client_request_id = ntohl(msg->request_id);
1534 op->byzantine = msg->byzantine;
1535 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1536 op->force_full = msg->force_full;
1537 op->force_delta = msg->force_delta;
1538 context = GNUNET_MQ_extract_nested_mh(msg);
1540 /* Advance generation values, so that
1541 mutations won't interfer with the running operation. */
1543 op->generation_created = set->current_generation;
1544 advance_generation(set);
1545 GNUNET_CONTAINER_DLL_insert(set->ops_head, set->ops_tail, op);
1546 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1547 "Creating new CADET channel to port %s for set operation type %u\n",
1548 GNUNET_h2s(&msg->app_id),
1550 op->channel = GNUNET_CADET_channel_create(cadet,
1557 op->mq = GNUNET_CADET_get_mq(op->channel);
1558 op->state = set->vt->evaluate(op, context);
1559 if (NULL == op->state)
1562 GNUNET_SERVICE_client_drop(cs->client);
1565 GNUNET_SERVICE_client_continue(cs->client);
1570 * Handle an ack from a client, and send the next element. Note
1571 * that we only expect acks for set elements, not after the
1572 * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1574 * @param cls client the client
1575 * @param ack the message
1578 handle_client_iter_ack(void *cls, const struct GNUNET_SET_IterAckMessage *ack)
1580 struct ClientState *cs = cls;
1583 if (NULL == (set = cs->set))
1585 /* client without a set acknowledged receiving a value */
1587 GNUNET_SERVICE_client_drop(cs->client);
1590 if (NULL == set->iter)
1592 /* client sent an ack, but we were not expecting one (as
1593 set iteration has finished) */
1595 GNUNET_SERVICE_client_drop(cs->client);
1598 GNUNET_SERVICE_client_continue(cs->client);
1599 if (ntohl(ack->send_more))
1601 send_client_element(set);
1605 GNUNET_CONTAINER_multihashmap_iterator_destroy(set->iter);
1607 set->iteration_id++;
1613 * Handle a request from the client to copy a set.
1615 * @param cls the client
1616 * @param mh the message
1619 handle_client_copy_lazy_prepare(void *cls,
1620 const struct GNUNET_MessageHeader *mh)
1622 struct ClientState *cs = cls;
1624 struct LazyCopyRequest *cr;
1625 struct GNUNET_MQ_Envelope *ev;
1626 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1628 if (NULL == (set = cs->set))
1630 /* client without a set requested an operation */
1632 GNUNET_SERVICE_client_drop(cs->client);
1635 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1636 "Client requested creation of lazy copy\n");
1637 cr = GNUNET_new(struct LazyCopyRequest);
1638 cr->cookie = ++lazy_copy_cookie;
1639 cr->source_set = set;
1640 GNUNET_CONTAINER_DLL_insert(lazy_copy_head, lazy_copy_tail, cr);
1641 ev = GNUNET_MQ_msg(resp_msg, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1642 resp_msg->cookie = cr->cookie;
1643 GNUNET_MQ_send(set->cs->mq, ev);
1644 GNUNET_SERVICE_client_continue(cs->client);
1649 * Handle a request from the client to connect to a copy of a set.
1651 * @param cls the client
1652 * @param msg the message
1655 handle_client_copy_lazy_connect(
1657 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1659 struct ClientState *cs = cls;
1660 struct LazyCopyRequest *cr;
1664 if (NULL != cs->set)
1666 /* There can only be one set per client */
1668 GNUNET_SERVICE_client_drop(cs->client);
1672 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1674 if (cr->cookie == msg->cookie)
1680 if (GNUNET_NO == found)
1682 /* client asked for copy with cookie we don't know */
1684 GNUNET_SERVICE_client_drop(cs->client);
1687 GNUNET_CONTAINER_DLL_remove(lazy_copy_head, lazy_copy_tail, cr);
1688 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1689 "Client %p requested use of lazy copy\n",
1691 set = GNUNET_new(struct Set);
1692 switch (cr->source_set->operation)
1694 case GNUNET_SET_OPERATION_INTERSECTION:
1695 set->vt = _GSS_intersection_vt();
1698 case GNUNET_SET_OPERATION_UNION:
1699 set->vt = _GSS_union_vt();
1707 if (NULL == set->vt->copy_state)
1709 /* Lazy copy not supported for this set operation */
1713 GNUNET_SERVICE_client_drop(cs->client);
1717 set->operation = cr->source_set->operation;
1718 set->state = set->vt->copy_state(cr->source_set->state);
1719 set->content = cr->source_set->content;
1720 set->content->refcount++;
1722 set->current_generation = cr->source_set->current_generation;
1723 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1724 set->excluded_generations =
1725 GNUNET_memdup(cr->source_set->excluded_generations,
1726 set->excluded_generations_size *
1727 sizeof(struct GenerationRange));
1729 /* Advance the generation of the new set, so that mutations to the
1730 of the cloned set and the source set are independent. */
1731 advance_generation(set);
1735 GNUNET_SERVICE_client_continue(cs->client);
1740 * Handle a request from the client to cancel a running set operation.
1742 * @param cls the client
1743 * @param msg the message
1746 handle_client_cancel(void *cls, const struct GNUNET_SET_CancelMessage *msg)
1748 struct ClientState *cs = cls;
1750 struct Operation *op;
1753 if (NULL == (set = cs->set))
1755 /* client without a set requested an operation */
1757 GNUNET_SERVICE_client_drop(cs->client);
1761 for (op = set->ops_head; NULL != op; op = op->next)
1763 if (op->client_request_id == ntohl(msg->request_id))
1769 if (GNUNET_NO == found)
1771 /* It may happen that the operation was already destroyed due to
1772 * the other peer disconnecting. The client may not know about this
1773 * yet and try to cancel the (just barely non-existent) operation.
1774 * So this is not a hard error.
1776 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1777 "Client canceled non-existent op %u\n",
1778 (uint32_t)ntohl(msg->request_id));
1782 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1783 "Client requested cancel for op %u\n",
1784 (uint32_t)ntohl(msg->request_id));
1785 _GSS_operation_destroy(op, GNUNET_YES);
1787 GNUNET_SERVICE_client_continue(cs->client);
1792 * Handle a request from the client to accept a set operation that
1793 * came from a remote peer. We forward the accept to the associated
1794 * operation for handling
1796 * @param cls the client
1797 * @param msg the message
1800 handle_client_accept(void *cls, const struct GNUNET_SET_AcceptMessage *msg)
1802 struct ClientState *cs = cls;
1804 struct Operation *op;
1805 struct GNUNET_SET_ResultMessage *result_message;
1806 struct GNUNET_MQ_Envelope *ev;
1807 struct Listener *listener;
1809 if (NULL == (set = cs->set))
1811 /* client without a set requested to accept */
1813 GNUNET_SERVICE_client_drop(cs->client);
1816 op = get_incoming(ntohl(msg->accept_reject_id));
1819 /* It is not an error if the set op does not exist -- it may
1820 * have been destroyed when the partner peer disconnected. */
1822 GNUNET_ERROR_TYPE_INFO,
1823 "Client %p accepted request %u of listener %p that is no longer active\n",
1825 ntohl(msg->accept_reject_id),
1827 ev = GNUNET_MQ_msg(result_message, GNUNET_MESSAGE_TYPE_SET_RESULT);
1828 result_message->request_id = msg->request_id;
1829 result_message->result_status = htons(GNUNET_SET_STATUS_FAILURE);
1830 GNUNET_MQ_send(set->cs->mq, ev);
1831 GNUNET_SERVICE_client_continue(cs->client);
1834 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1835 "Client accepting request %u\n",
1836 (uint32_t)ntohl(msg->accept_reject_id));
1837 listener = op->listener;
1838 op->listener = NULL;
1839 GNUNET_CONTAINER_DLL_remove(listener->op_head, listener->op_tail, op);
1841 GNUNET_CONTAINER_DLL_insert(set->ops_head, set->ops_tail, op);
1842 op->client_request_id = ntohl(msg->request_id);
1843 op->result_mode = ntohl(msg->result_mode);
1844 op->byzantine = msg->byzantine;
1845 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1846 op->force_full = msg->force_full;
1847 op->force_delta = msg->force_delta;
1849 /* Advance generation values, so that future mutations do not
1850 interfer with the running operation. */
1851 op->generation_created = set->current_generation;
1852 advance_generation(set);
1853 GNUNET_assert(NULL == op->state);
1854 op->state = set->vt->accept(op);
1855 if (NULL == op->state)
1858 GNUNET_SERVICE_client_drop(cs->client);
1861 /* Now allow CADET to continue, as we did not do this in
1862 #handle_incoming_msg (as we wanted to first see if the
1863 local client would accept the request). */
1864 GNUNET_CADET_receive_done(op->channel);
1865 GNUNET_SERVICE_client_continue(cs->client);
1870 * Called to clean up, after a shutdown has been requested.
1872 * @param cls closure, NULL
1875 shutdown_task(void *cls)
1877 /* Delay actual shutdown to allow service to disconnect clients */
1878 in_shutdown = GNUNET_YES;
1879 if (0 == num_clients)
1883 GNUNET_CADET_disconnect(cadet);
1887 GNUNET_STATISTICS_destroy(_GSS_statistics, GNUNET_YES);
1888 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1893 * Function called by the service's run
1894 * method to run service-specific setup code.
1896 * @param cls closure
1897 * @param cfg configuration to use
1898 * @param service the initialized service
1902 const struct GNUNET_CONFIGURATION_Handle *cfg,
1903 struct GNUNET_SERVICE_Handle *service)
1905 /* FIXME: need to modify SERVICE (!) API to allow
1906 us to run a shutdown task *after* clients were
1907 forcefully disconnected! */
1908 GNUNET_SCHEDULER_add_shutdown(&shutdown_task, NULL);
1909 _GSS_statistics = GNUNET_STATISTICS_create("set", cfg);
1910 cadet = GNUNET_CADET_connect(cfg);
1913 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1914 _("Could not connect to CADET service\n"));
1915 GNUNET_SCHEDULER_shutdown();
1922 * Define "main" method using service macro.
1924 GNUNET_SERVICE_MAIN(
1926 GNUNET_SERVICE_OPTION_NONE,
1929 &client_disconnect_cb,
1931 GNUNET_MQ_hd_fixed_size(client_accept,
1932 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1933 struct GNUNET_SET_AcceptMessage,
1935 GNUNET_MQ_hd_fixed_size(client_iter_ack,
1936 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1937 struct GNUNET_SET_IterAckMessage,
1939 GNUNET_MQ_hd_var_size(client_mutation,
1940 GNUNET_MESSAGE_TYPE_SET_ADD,
1941 struct GNUNET_SET_ElementMessage,
1943 GNUNET_MQ_hd_fixed_size(client_create_set,
1944 GNUNET_MESSAGE_TYPE_SET_CREATE,
1945 struct GNUNET_SET_CreateMessage,
1947 GNUNET_MQ_hd_fixed_size(client_iterate,
1948 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1949 struct GNUNET_MessageHeader,
1951 GNUNET_MQ_hd_var_size(client_evaluate,
1952 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1953 struct GNUNET_SET_EvaluateMessage,
1955 GNUNET_MQ_hd_fixed_size(client_listen,
1956 GNUNET_MESSAGE_TYPE_SET_LISTEN,
1957 struct GNUNET_SET_ListenMessage,
1959 GNUNET_MQ_hd_fixed_size(client_reject,
1960 GNUNET_MESSAGE_TYPE_SET_REJECT,
1961 struct GNUNET_SET_RejectMessage,
1963 GNUNET_MQ_hd_var_size(client_mutation,
1964 GNUNET_MESSAGE_TYPE_SET_REMOVE,
1965 struct GNUNET_SET_ElementMessage,
1967 GNUNET_MQ_hd_fixed_size(client_cancel,
1968 GNUNET_MESSAGE_TYPE_SET_CANCEL,
1969 struct GNUNET_SET_CancelMessage,
1971 GNUNET_MQ_hd_fixed_size(client_copy_lazy_prepare,
1972 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
1973 struct GNUNET_MessageHeader,
1975 GNUNET_MQ_hd_fixed_size(client_copy_lazy_connect,
1976 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
1977 struct GNUNET_SET_CopyLazyConnectMessage,
1979 GNUNET_MQ_handler_end());
1982 /* end of gnunet-service-set.c */