2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
33 * How long do we hold on to an incoming channel if there is
34 * no local listener before giving up?
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
40 * Lazy copy requests made by a client.
42 struct LazyCopyRequest
47 struct LazyCopyRequest *prev;
52 struct LazyCopyRequest *next;
55 * Which set are we supposed to copy?
57 struct Set *source_set;
60 * Cookie identifying the request.
68 * A listener is inhabited by a client, and waits for evaluation
69 * requests from remote peers.
74 * Listeners are held in a doubly linked list.
76 struct Listener *next;
79 * Listeners are held in a doubly linked list.
81 struct Listener *prev;
84 * Head of DLL of operations this listener is responsible for.
85 * Once the client has accepted/declined the operation, the
86 * operation is moved to the respective set's operation DLLS.
88 struct Operation *op_head;
91 * Tail of DLL of operations this listener is responsible for.
92 * Once the client has accepted/declined the operation, the
93 * operation is moved to the respective set's operation DLLS.
95 struct Operation *op_tail;
98 * Client that owns the listener.
99 * Only one client may own a listener.
101 struct ClientState *cs;
104 * The port we are listening on with CADET.
106 struct GNUNET_CADET_Port *open_port;
109 * Application ID for the operation, used to distinguish
110 * multiple operations of the same type with the same peer.
112 struct GNUNET_HashCode app_id;
115 * The type of the operation.
117 enum GNUNET_SET_OperationType operation;
122 * Handle to the cadet service, used to listen for and connect to
125 static struct GNUNET_CADET_Handle *cadet;
128 * DLL of lazy copy requests by this client.
130 static struct LazyCopyRequest *lazy_copy_head;
133 * DLL of lazy copy requests by this client.
135 static struct LazyCopyRequest *lazy_copy_tail;
138 * Generator for unique cookie we set per lazy copy request.
140 static uint32_t lazy_copy_cookie;
145 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
148 * Listeners are held in a doubly linked list.
150 static struct Listener *listener_head;
153 * Listeners are held in a doubly linked list.
155 static struct Listener *listener_tail;
158 * Number of active clients.
160 static unsigned int num_clients;
163 * Are we in shutdown? if #GNUNET_YES and the number of clients
164 * drops to zero, disconnect from CADET.
166 static int in_shutdown;
169 * Counter for allocating unique IDs for clients, used to identify
170 * incoming operation requests from remote peers, that the client can
171 * choose to accept or refuse. 0 must not be used (reserved for
174 static uint32_t suggest_id;
178 * Get the incoming socket associated with the given id.
180 * All listeners in the global listener DLL are searched.
181 * @param id id to look for
182 * @return the incoming socket associated with the id,
183 * or NULL if there is none
185 static struct Operation *
186 get_incoming (uint32_t id)
188 for (struct Listener *listener = listener_head;
190 listener = listener->next)
192 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
193 if (op->suggest_id == id)
201 * Destroy an incoming request from a remote peer
203 * @param op remote request to destroy
206 incoming_destroy (struct Operation *op)
208 struct Listener *listener;
209 struct GNUNET_CADET_Channel *channel;
211 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
212 "Destroying incoming operation %p\n",
214 if (NULL != (listener = op->listener))
216 GNUNET_CONTAINER_DLL_remove (listener->op_head,
221 if (NULL != op->timeout_task)
223 GNUNET_SCHEDULER_cancel (op->timeout_task);
224 op->timeout_task = NULL;
226 if (NULL != (channel = op->channel))
229 GNUNET_CADET_channel_destroy (channel);
235 * Context for the #garbage_collect_cb().
237 struct GarbageContext
241 * Map for which we are garbage collecting removed elements.
243 struct GNUNET_CONTAINER_MultiHashMap *map;
246 * Lowest generation for which an operation is still pending.
248 unsigned int min_op_generation;
251 * Largest generation for which an operation is still pending.
253 unsigned int max_op_generation;
259 * Function invoked to check if an element can be removed from
260 * the set's history because it is no longer needed.
262 * @param cls the `struct GarbageContext *`
263 * @param key key of the element in the map
264 * @param value the `struct ElementEntry *`
265 * @return #GNUNET_OK (continue to iterate)
268 garbage_collect_cb (void *cls,
269 const struct GNUNET_HashCode *key,
272 //struct GarbageContext *gc = cls;
273 //struct ElementEntry *ee = value;
275 //if (GNUNET_YES != ee->removed)
277 //if ( (gc->max_op_generation < ee->generation_added) ||
278 // (ee->generation_removed > gc->min_op_generation) )
280 // GNUNET_assert (GNUNET_YES ==
281 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
291 * Collect and destroy elements that are not needed anymore, because
292 * their lifetime (as determined by their generation) does not overlap
293 * with any active set operation.
295 * @param set set to garbage collect
298 collect_generation_garbage (struct Set *set)
300 struct GarbageContext gc;
302 gc.min_op_generation = UINT_MAX;
303 gc.max_op_generation = 0;
304 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
306 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
307 op->generation_created);
308 gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
309 op->generation_created);
311 gc.map = set->content->elements;
312 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
319 * Is @a generation in the range of exclusions?
321 * @param generation generation to query
322 * @param excluded array of generations where the element is excluded
323 * @param excluded_size length of the @a excluded array
324 * @return #GNUNET_YES if @a generation is in any of the ranges
327 is_excluded_generation (unsigned int generation,
328 struct GenerationRange *excluded,
329 unsigned int excluded_size)
331 for (unsigned int i = 0; i < excluded_size; i++)
332 if ( (generation >= excluded[i].start) &&
333 (generation < excluded[i].end) )
340 * Is element @a ee part of the set during @a query_generation?
342 * @param ee element to test
343 * @param query_generation generation to query
344 * @param excluded array of generations where the element is excluded
345 * @param excluded_size length of the @a excluded array
346 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
349 is_element_of_generation (struct ElementEntry *ee,
350 unsigned int query_generation,
351 struct GenerationRange *excluded,
352 unsigned int excluded_size)
354 struct MutationEvent *mut;
357 GNUNET_assert (NULL != ee->mutations);
359 is_excluded_generation (query_generation,
367 is_present = GNUNET_NO;
369 /* Could be made faster with binary search, but lists
370 are small, so why bother. */
371 for (unsigned int i = 0; i < ee->mutations_size; i++)
373 mut = &ee->mutations[i];
375 if (mut->generation > query_generation)
377 /* The mutation doesn't apply to our generation
378 anymore. We can't break here, since mutations aren't
379 sorted by generation. */
384 is_excluded_generation (mut->generation,
388 /* The generation is excluded (because it belongs to another
389 fork via a lazy copy) and thus mutations aren't considered
390 for membership testing. */
394 /* This would be an inconsistency in how we manage mutations. */
395 if ( (GNUNET_YES == is_present) &&
396 (GNUNET_YES == mut->added) )
399 if ( (GNUNET_NO == is_present) &&
400 (GNUNET_NO == mut->added) )
403 is_present = mut->added;
411 * Is element @a ee part of the set used by @a op?
413 * @param ee element to test
414 * @param op operation that defines the set and its generation
415 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
418 _GSS_is_element_of_operation (struct ElementEntry *ee,
419 struct Operation *op)
421 return is_element_of_generation (ee,
422 op->generation_created,
423 op->set->excluded_generations,
424 op->set->excluded_generations_size);
429 * Destroy the given operation. Used for any operation where both
430 * peers were known and that thus actually had a vt and channel. Must
431 * not be used for operations where 'listener' is still set and we do
432 * not know the other peer.
434 * Call the implementation-specific cancel function of the operation.
435 * Disconnects from the remote peer. Does not disconnect the client,
436 * as there may be multiple operations per set.
438 * @param op operation to destroy
439 * @param gc #GNUNET_YES to perform garbage collection on the set
442 _GSS_operation_destroy (struct Operation *op,
445 struct Set *set = op->set;
446 struct GNUNET_CADET_Channel *channel;
448 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
449 "Destroying operation %p\n",
451 GNUNET_assert (NULL == op->listener);
452 if (NULL != op->state)
454 set->vt->cancel (op);
459 GNUNET_CONTAINER_DLL_remove (set->ops_head,
464 if (NULL != op->context_msg)
466 GNUNET_free (op->context_msg);
467 op->context_msg = NULL;
469 if (NULL != (channel = op->channel))
471 /* This will free op; called conditionally as this helper function
472 is also called from within the channel disconnect handler. */
474 GNUNET_CADET_channel_destroy (channel);
476 if ( (NULL != set) &&
478 collect_generation_garbage (set);
479 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
480 * there was a channel end handler that will free 'op' on the call stack. */
485 * Callback called when a client connects to the service.
487 * @param cls closure for the service
488 * @param c the new client that connected to the service
489 * @param mq the message queue used to send messages to the client
490 * @return @a `struct ClientState`
493 client_connect_cb (void *cls,
494 struct GNUNET_SERVICE_Client *c,
495 struct GNUNET_MQ_Handle *mq)
497 struct ClientState *cs;
500 cs = GNUNET_new (struct ClientState);
508 * Iterator over hash map entries to free element entries.
511 * @param key current key code
512 * @param value a `struct ElementEntry *` to be free'd
513 * @return #GNUNET_YES (continue to iterate)
516 destroy_elements_iterator (void *cls,
517 const struct GNUNET_HashCode *key,
520 struct ElementEntry *ee = value;
522 GNUNET_free_non_null (ee->mutations);
529 * Clean up after a client has disconnected
531 * @param cls closure, unused
532 * @param client the client to clean up after
533 * @param internal_cls the `struct ClientState`
536 client_disconnect_cb (void *cls,
537 struct GNUNET_SERVICE_Client *client,
540 struct ClientState *cs = internal_cls;
541 struct Operation *op;
542 struct Listener *listener;
545 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546 "Client disconnected, cleaning up\n");
547 if (NULL != (set = cs->set))
549 struct SetContent *content = set->content;
550 struct PendingMutation *pm;
551 struct PendingMutation *pm_current;
552 struct LazyCopyRequest *lcr;
554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555 "Destroying client's set\n");
556 /* Destroy pending set operations */
557 while (NULL != set->ops_head)
558 _GSS_operation_destroy (set->ops_head,
561 /* Destroy operation-specific state */
562 GNUNET_assert (NULL != set->state);
563 set->vt->destroy_set (set->state);
566 /* Clean up ongoing iterations */
567 if (NULL != set->iter)
569 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
574 /* discard any pending mutations that reference this set */
575 pm = content->pending_mutations_head;
580 if (pm_current->set == set)
582 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
583 content->pending_mutations_tail,
585 GNUNET_free (pm_current);
589 /* free set content (or at least decrement RC) */
591 GNUNET_assert (0 != content->refcount);
593 if (0 == content->refcount)
595 GNUNET_assert (NULL != content->elements);
596 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
597 &destroy_elements_iterator,
599 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
600 content->elements = NULL;
601 GNUNET_free (content);
603 GNUNET_free_non_null (set->excluded_generations);
604 set->excluded_generations = NULL;
606 /* remove set from pending copy requests */
607 lcr = lazy_copy_head;
610 struct LazyCopyRequest *lcr_current = lcr;
613 if (lcr_current->source_set == set)
615 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
618 GNUNET_free (lcr_current);
624 if (NULL != (listener = cs->listener))
626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
627 "Destroying client's listener\n");
628 GNUNET_CADET_close_port (listener->open_port);
629 listener->open_port = NULL;
630 while (NULL != (op = listener->op_head))
632 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
633 "Destroying incoming operation `%u' from peer `%s'\n",
634 (unsigned int) op->client_request_id,
635 GNUNET_i2s (&op->peer));
636 incoming_destroy (op);
638 GNUNET_CONTAINER_DLL_remove (listener_head,
641 GNUNET_free (listener);
645 if ( (GNUNET_YES == in_shutdown) &&
650 GNUNET_CADET_disconnect (cadet);
658 * Check a request for a set operation from another peer.
660 * @param cls the operation state
661 * @param msg the received message
662 * @return #GNUNET_OK if the channel should be kept alive,
663 * #GNUNET_SYSERR to destroy the channel
666 check_incoming_msg (void *cls,
667 const struct OperationRequestMessage *msg)
669 struct Operation *op = cls;
670 struct Listener *listener = op->listener;
671 const struct GNUNET_MessageHeader *nested_context;
673 /* double operation request */
674 if (0 != op->suggest_id)
677 return GNUNET_SYSERR;
679 /* This should be equivalent to the previous condition, but can't hurt to check twice */
680 if (NULL == op->listener)
683 return GNUNET_SYSERR;
685 if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
688 return GNUNET_SYSERR;
690 nested_context = GNUNET_MQ_extract_nested_mh (msg);
691 if ( (NULL != nested_context) &&
692 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
695 return GNUNET_SYSERR;
702 * Handle a request for a set operation from another peer. Checks if we
703 * have a listener waiting for such a request (and in that case initiates
704 * asking the listener about accepting the connection). If no listener
705 * is waiting, we queue the operation request in hope that a listener
706 * shows up soon (before timeout).
708 * This msg is expected as the first and only msg handled through the
709 * non-operation bound virtual table, acceptance of this operation replaces
710 * our virtual table and subsequent msgs would be routed differently (as
711 * we then know what type of operation this is).
713 * @param cls the operation state
714 * @param msg the received message
715 * @return #GNUNET_OK if the channel should be kept alive,
716 * #GNUNET_SYSERR to destroy the channel
719 handle_incoming_msg (void *cls,
720 const struct OperationRequestMessage *msg)
722 struct Operation *op = cls;
723 struct Listener *listener = op->listener;
724 const struct GNUNET_MessageHeader *nested_context;
725 struct GNUNET_MQ_Envelope *env;
726 struct GNUNET_SET_RequestMessage *cmsg;
728 nested_context = GNUNET_MQ_extract_nested_mh (msg);
729 /* Make a copy of the nested_context (application-specific context
730 information that is opaque to set) so we can pass it to the
732 if (NULL != nested_context)
733 op->context_msg = GNUNET_copy_message (nested_context);
734 op->remote_element_count = ntohl (msg->element_count);
735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736 "Received P2P operation request (op %u, port %s) for active listener\n",
737 (uint32_t) ntohl (msg->operation),
738 GNUNET_h2s (&op->listener->app_id));
739 GNUNET_assert (0 == op->suggest_id);
742 op->suggest_id = suggest_id++;
743 GNUNET_assert (NULL != op->timeout_task);
744 GNUNET_SCHEDULER_cancel (op->timeout_task);
745 op->timeout_task = NULL;
746 env = GNUNET_MQ_msg_nested_mh (cmsg,
747 GNUNET_MESSAGE_TYPE_SET_REQUEST,
749 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
754 cmsg->accept_id = htonl (op->suggest_id);
755 cmsg->peer_id = op->peer;
756 GNUNET_MQ_send (listener->cs->mq,
758 /* NOTE: GNUNET_CADET_receive_done() will be called in
759 #handle_client_accept() */
764 * Add an element to @a set as specified by @a msg
766 * @param set set to manipulate
767 * @param msg message specifying the change
770 execute_add (struct Set *set,
771 const struct GNUNET_SET_ElementMessage *msg)
773 struct GNUNET_SET_Element el;
774 struct ElementEntry *ee;
775 struct GNUNET_HashCode hash;
777 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
778 el.size = ntohs (msg->header.size) - sizeof (*msg);
780 el.element_type = ntohs (msg->element_type);
781 GNUNET_SET_element_hash (&el,
783 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788 "Client inserts element %s of size %u\n",
791 ee = GNUNET_malloc (el.size + sizeof (*ee));
792 ee->element.size = el.size;
793 GNUNET_memcpy (&ee[1],
796 ee->element.data = &ee[1];
797 ee->element.element_type = el.element_type;
798 ee->remote = GNUNET_NO;
799 ee->mutations = NULL;
800 ee->mutations_size = 0;
801 ee->element_hash = hash;
802 GNUNET_break (GNUNET_YES ==
803 GNUNET_CONTAINER_multihashmap_put (set->content->elements,
806 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
808 else if (GNUNET_YES ==
809 is_element_of_generation (ee,
810 set->current_generation,
811 set->excluded_generations,
812 set->excluded_generations_size))
814 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815 "Client inserted element %s of size %u twice (ignored)\n",
819 /* same element inserted twice */
824 struct MutationEvent mut = {
825 .generation = set->current_generation,
828 GNUNET_array_append (ee->mutations,
832 set->vt->add (set->state,
838 * Remove an element from @a set as specified by @a msg
840 * @param set set to manipulate
841 * @param msg message specifying the change
844 execute_remove (struct Set *set,
845 const struct GNUNET_SET_ElementMessage *msg)
847 struct GNUNET_SET_Element el;
848 struct ElementEntry *ee;
849 struct GNUNET_HashCode hash;
851 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
852 el.size = ntohs (msg->header.size) - sizeof (*msg);
854 el.element_type = ntohs (msg->element_type);
855 GNUNET_SET_element_hash (&el, &hash);
856 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
860 /* Client tried to remove non-existing element. */
861 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862 "Client removes non-existing element of size %u\n",
867 is_element_of_generation (ee,
868 set->current_generation,
869 set->excluded_generations,
870 set->excluded_generations_size))
872 /* Client tried to remove element twice */
873 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874 "Client removed element of size %u twice (ignored)\n",
880 struct MutationEvent mut = {
881 .generation = set->current_generation,
885 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886 "Client removes element of size %u\n",
889 GNUNET_array_append (ee->mutations,
893 set->vt->remove (set->state,
899 * Perform a mutation on a set as specified by the @a msg
901 * @param set the set to mutate
902 * @param msg specification of what to change
905 execute_mutation (struct Set *set,
906 const struct GNUNET_SET_ElementMessage *msg)
908 switch (ntohs (msg->header.type))
910 case GNUNET_MESSAGE_TYPE_SET_ADD:
911 execute_add (set, msg);
913 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
914 execute_remove (set, msg);
923 * Execute mutations that were delayed on a set because of
924 * pending operations.
926 * @param set the set to execute mutations on
929 execute_delayed_mutations (struct Set *set)
931 struct PendingMutation *pm;
933 if (0 != set->content->iterator_count)
934 return; /* still cannot do this */
935 while (NULL != (pm = set->content->pending_mutations_head))
937 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
938 set->content->pending_mutations_tail,
940 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941 "Executing pending mutation on %p.\n",
943 execute_mutation (pm->set,
945 GNUNET_free (pm->msg);
952 * Send the next element of a set to the set's client. The next element is given by
953 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
954 * are no more elements in the set. The caller must ensure that the set's iterator is
957 * The client will acknowledge each received element with a
958 * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
959 * #handle_client_iter_ack() will then trigger the next transmission.
960 * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
962 * @param set set that should send its next element to its client
965 send_client_element (struct Set *set)
968 struct ElementEntry *ee;
969 struct GNUNET_MQ_Envelope *ev;
970 struct GNUNET_SET_IterResponseMessage *msg;
972 GNUNET_assert (NULL != set->iter);
974 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
976 (const void **) &ee);
977 if (GNUNET_NO == ret)
979 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980 "Iteration on %p done.\n",
982 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
983 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
986 GNUNET_assert (set->content->iterator_count > 0);
987 set->content->iterator_count--;
988 execute_delayed_mutations (set);
989 GNUNET_MQ_send (set->cs->mq,
993 GNUNET_assert (NULL != ee);
994 } while (GNUNET_NO ==
995 is_element_of_generation (ee,
996 set->iter_generation,
997 set->excluded_generations,
998 set->excluded_generations_size));
999 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000 "Sending iteration element on %p.\n",
1002 ev = GNUNET_MQ_msg_extra (msg,
1004 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1005 GNUNET_memcpy (&msg[1],
1008 msg->element_type = htons (ee->element.element_type);
1009 msg->iteration_id = htons (set->iteration_id);
1010 GNUNET_MQ_send (set->cs->mq,
1016 * Called when a client wants to iterate the elements of a set.
1017 * Checks if we have a set associated with the client and if we
1018 * can right now start an iteration. If all checks out, starts
1019 * sending the elements of the set to the client.
1021 * @param cls client that sent the message
1022 * @param m message sent by the client
1025 handle_client_iterate (void *cls,
1026 const struct GNUNET_MessageHeader *m)
1028 struct ClientState *cs = cls;
1031 if (NULL == (set = cs->set))
1033 /* attempt to iterate over a non existing set */
1035 GNUNET_SERVICE_client_drop (cs->client);
1038 if (NULL != set->iter)
1040 /* Only one concurrent iterate-action allowed per set */
1042 GNUNET_SERVICE_client_drop (cs->client);
1045 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046 "Iterating set %p in gen %u with %u content elements\n",
1048 set->current_generation,
1049 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1050 GNUNET_SERVICE_client_continue (cs->client);
1051 set->content->iterator_count++;
1052 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1053 set->iter_generation = set->current_generation;
1054 send_client_element (set);
1059 * Called when a client wants to create a new set. This is typically
1060 * the first request from a client, and includes the type of set
1061 * operation to be performed.
1063 * @param cls client that sent the message
1064 * @param msg message sent by the client
1067 handle_client_create_set (void *cls,
1068 const struct GNUNET_SET_CreateMessage *msg)
1070 struct ClientState *cs = cls;
1073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074 "Client created new set (operation %u)\n",
1075 (uint32_t) ntohl (msg->operation));
1076 if (NULL != cs->set)
1078 /* There can only be one set per client */
1080 GNUNET_SERVICE_client_drop (cs->client);
1083 set = GNUNET_new (struct Set);
1084 switch (ntohl (msg->operation))
1086 case GNUNET_SET_OPERATION_INTERSECTION:
1087 set->vt = _GSS_intersection_vt ();
1089 case GNUNET_SET_OPERATION_UNION:
1090 set->vt = _GSS_union_vt ();
1095 GNUNET_SERVICE_client_drop (cs->client);
1098 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1099 set->state = set->vt->create ();
1100 if (NULL == set->state)
1102 /* initialization failed (i.e. out of memory) */
1104 GNUNET_SERVICE_client_drop (cs->client);
1107 set->content = GNUNET_new (struct SetContent);
1108 set->content->refcount = 1;
1109 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1113 GNUNET_SERVICE_client_continue (cs->client);
1118 * Timeout happens iff:
1119 * - we suggested an operation to our listener,
1120 * but did not receive a response in time
1121 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1123 * @param cls channel context, the `struct Operation *` whose timeout fired
1127 incoming_timeout_cb (void *cls)
1129 struct Operation *op = cls;
1131 op->timeout_task = NULL;
1132 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133 "Remote peer's incoming request timed out\n");
1134 incoming_destroy (op);
1139 * Method called whenever another peer has added us to a channel the
1140 * other peer initiated. Only called (once) upon reception of data
1141 * from a channel we listen on.
1143 * The channel context represents the operation itself and gets added
1144 * to a DLL, from where it gets looked up when our local listener
1145 * client responds to a proposed/suggested operation or connects and
1146 * associates with this operation.
1148 * @param cls closure
1149 * @param channel new handle to the channel
1150 * @param source peer that started the channel
1151 * @return initial channel context for the channel
1152 * returns NULL on error
1155 channel_new_cb (void *cls,
1156 struct GNUNET_CADET_Channel *channel,
1157 const struct GNUNET_PeerIdentity *source)
1159 struct Listener *listener = cls;
1160 struct Operation *op;
1162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163 "New incoming channel\n");
1164 op = GNUNET_new (struct Operation);
1165 op->listener = listener;
1167 op->channel = channel;
1168 op->mq = GNUNET_CADET_get_mq (op->channel);
1169 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1172 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1173 &incoming_timeout_cb,
1175 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1183 * Function called whenever a channel is destroyed. Should clean up
1184 * any associated state. It must NOT call
1185 * GNUNET_CADET_channel_destroy() on the channel.
1187 * The peer_disconnect function is part of a virtual table set initially either
1188 * when a peer creates a new channel with us, or once we create
1189 * a new channel ourselves (evaluate).
1191 * Once we know the exact type of operation (union/intersection), the vt is
1192 * replaced with an operation specific instance (_GSS_[op]_vt).
1194 * @param channel_ctx place where local state associated
1195 * with the channel is stored
1196 * @param channel connection to the other end (henceforth invalid)
1199 channel_end_cb (void *channel_ctx,
1200 const struct GNUNET_CADET_Channel *channel)
1202 struct Operation *op = channel_ctx;
1204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205 "channel_end_cb called\n");
1207 if (NULL != op->listener)
1208 incoming_destroy (op);
1209 else if (NULL != op->set)
1210 op->set->vt->channel_death (op);
1212 _GSS_operation_destroy (op,
1219 * Function called whenever an MQ-channel's transmission window size changes.
1221 * The first callback in an outgoing channel will be with a non-zero value
1222 * and will mean the channel is connected to the destination.
1224 * For an incoming channel it will be called immediately after the
1225 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1227 * @param cls Channel closure.
1228 * @param channel Connection to the other end (henceforth invalid).
1229 * @param window_size New window size. If there are more messages than buffer size
1230 * this value will be negative.
1233 channel_window_cb (void *cls,
1234 const struct GNUNET_CADET_Channel *channel,
1237 /* FIXME: not implemented, we could do flow control here... */
1242 * Called when a client wants to create a new listener.
1244 * @param cls client that sent the message
1245 * @param msg message sent by the client
1248 handle_client_listen (void *cls,
1249 const struct GNUNET_SET_ListenMessage *msg)
1251 struct ClientState *cs = cls;
1252 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1253 GNUNET_MQ_hd_var_size (incoming_msg,
1254 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1255 struct OperationRequestMessage,
1257 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1258 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1261 GNUNET_MQ_hd_var_size (union_p2p_elements,
1262 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1263 struct GNUNET_SET_ElementMessage,
1265 GNUNET_MQ_hd_var_size (union_p2p_offer,
1266 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1267 struct GNUNET_MessageHeader,
1269 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1270 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1271 struct InquiryMessage,
1273 GNUNET_MQ_hd_var_size (union_p2p_demand,
1274 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1275 struct GNUNET_MessageHeader,
1277 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1278 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1279 struct GNUNET_MessageHeader,
1281 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1282 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1283 struct GNUNET_MessageHeader,
1285 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1286 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1287 struct GNUNET_MessageHeader,
1289 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1290 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1291 struct GNUNET_MessageHeader,
1293 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1294 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1295 struct StrataEstimatorMessage,
1297 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1298 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1299 struct StrataEstimatorMessage,
1301 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1302 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1303 struct GNUNET_SET_ElementMessage,
1305 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1306 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1307 struct IntersectionElementInfoMessage,
1309 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1310 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1313 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1314 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1315 struct IntersectionDoneMessage,
1317 GNUNET_MQ_handler_end ()
1319 struct Listener *listener;
1321 if (NULL != cs->listener)
1323 /* max. one active listener per client! */
1325 GNUNET_SERVICE_client_drop (cs->client);
1328 listener = GNUNET_new (struct Listener);
1330 cs->listener = listener;
1331 listener->app_id = msg->app_id;
1332 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1333 GNUNET_CONTAINER_DLL_insert (listener_head,
1336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337 "New listener created (op %u, port %s)\n",
1338 listener->operation,
1339 GNUNET_h2s (&listener->app_id));
1341 = GNUNET_CADET_open_port (cadet,
1348 GNUNET_SERVICE_client_continue (cs->client);
1353 * Called when the listening client rejects an operation
1354 * request by another peer.
1356 * @param cls client that sent the message
1357 * @param msg message sent by the client
1360 handle_client_reject (void *cls,
1361 const struct GNUNET_SET_RejectMessage *msg)
1363 struct ClientState *cs = cls;
1364 struct Operation *op;
1366 op = get_incoming (ntohl (msg->accept_reject_id));
1369 /* no matching incoming operation for this reject;
1370 could be that the other peer already disconnected... */
1371 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1372 "Client rejected unknown operation %u\n",
1373 (unsigned int) ntohl (msg->accept_reject_id));
1374 GNUNET_SERVICE_client_continue (cs->client);
1377 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1378 "Peer request (op %u, app %s) rejected by client\n",
1379 op->listener->operation,
1380 GNUNET_h2s (&cs->listener->app_id));
1381 GNUNET_CADET_channel_destroy (op->channel);
1382 GNUNET_SERVICE_client_continue (cs->client);
/* Validation hook for client element messages; the service's handler table
   registers client_mutation for both GNUNET_MESSAGE_TYPE_SET_ADD and
   GNUNET_MESSAGE_TYPE_SET_REMOVE.  The visible body carries only the note
   below and no inspection of the element.
   NOTE(review): confirm the returned status against the complete file. */
1387 * Called when a client wants to add or remove an element to a set it inhabits.
1389 * @param cls client that sent the message
1390 * @param msg message sent by the client
1393 check_client_mutation (void *cls,
1394 const struct GNUNET_SET_ElementMessage *msg)
1396 /* NOTE: Technically, we should probably check with the
1397 block library whether the element we are given is well-formed */
/* Handler for a client's add/remove element message.  A client that has no
   set is dropped.  Otherwise the client is allowed to continue and the
   mutation is either recorded as pending (while iterators are active on the
   set's content) or passed to execute_mutation(). */
1403 * Called when a client wants to add or remove an element to a set it inhabits.
1405 * @param cls client that sent the message
1406 * @param msg message sent by the client
1409 handle_client_mutation (void *cls,
1410 const struct GNUNET_SET_ElementMessage *msg)
1412 struct ClientState *cs = cls;
1415 if (NULL == (set = cs->set))
1417 /* client without a set requested an operation */
1419 GNUNET_SERVICE_client_drop (cs->client);
/* The client is released to continue before the mutation itself is processed. */
1422 GNUNET_SERVICE_client_continue (cs->client);
/* A non-zero iterator_count defers the mutation: it is recorded as a
   PendingMutation at the tail of the content's pending list. */
1424 if (0 != set->content->iterator_count)
1426 struct PendingMutation *pm;
1428 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1429 "Scheduling mutation on set\n");
1430 pm = GNUNET_new (struct PendingMutation);
/* Keep a private copy of the message, since it is stored in the pending list. */
1431 pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1433 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1434 set->content->pending_mutations_tail,
1438 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1439 "Executing mutation on set\n");
1440 execute_mutation (set,
1446 * Advance the current generation of a set,
1447 * adding exclusion ranges if necessary.
1449 * @param set the set where we want to advance the generation
1452 advance_generation (struct Set *set)
1454 struct GenerationRange r;
1456 if (set->current_generation == set->content->latest_generation)
1458 set->content->latest_generation++;
1459 set->current_generation++;
1463 GNUNET_assert (set->current_generation < set->content->latest_generation);
1465 r.start = set->current_generation + 1;
1466 r.end = set->content->latest_generation + 1;
1467 set->content->latest_generation = r.end;
1468 set->current_generation = r.end;
1469 GNUNET_array_append (set->excluded_generations,
1470 set->excluded_generations_size,
/* Validation counterpart of handle_client_evaluate(); the service's handler
   table registers client_evaluate as a variable-size handler for
   GNUNET_MESSAGE_TYPE_SET_EVALUATE.  As the FIXME below states, not every
   malformed evaluate message is detected here. */
1476 * Called when a client wants to initiate a set operation with another
1477 * peer. Initiates the CADET connection to the listener and sends the
1480 * @param cls client that sent the message
1481 * @param msg message sent by the client
1482 * @return #GNUNET_OK if the message is well-formed
1485 check_client_evaluate (void *cls,
1486 const struct GNUNET_SET_EvaluateMessage *msg)
1488 /* FIXME: suboptimal, even if the context below could be NULL,
1489 there are malformed messages this does not check for... */
/* Handler for a client's evaluate request.  Allocates an Operation, fills it
   from the message (target peer, result mode, request id, byzantine and
   force flags), links it into the client's set, creates a reliable CADET
   channel for it and asks the set type's vtable to start the evaluation.
   A client without a set, or an evaluation that yields no state, results in
   the client being dropped. */
1495 * Called when a client wants to initiate a set operation with another
1496 * peer. Initiates the CADET connection to the listener and sends the
1499 * @param cls client that sent the message
1500 * @param msg message sent by the client
1503 handle_client_evaluate (void *cls,
1504 const struct GNUNET_SET_EvaluateMessage *msg)
1506 struct ClientState *cs = cls;
/* NOTE(review): op is allocated before the cs->set check further down;
   confirm that the early-exit path releases it. */
1507 struct Operation *op = GNUNET_new (struct Operation);
/* Handler table for the messages arriving on the new channel; it lists the
   union as well as the intersection P2P message types. */
1508 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1509 GNUNET_MQ_hd_var_size (incoming_msg,
1510 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1511 struct OperationRequestMessage,
1513 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1514 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1517 GNUNET_MQ_hd_var_size (union_p2p_elements,
1518 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1519 struct GNUNET_SET_ElementMessage,
1521 GNUNET_MQ_hd_var_size (union_p2p_offer,
1522 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1523 struct GNUNET_MessageHeader,
1525 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1526 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1527 struct InquiryMessage,
1529 GNUNET_MQ_hd_var_size (union_p2p_demand,
1530 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1531 struct GNUNET_MessageHeader,
1533 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1534 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1535 struct GNUNET_MessageHeader,
1537 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1538 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1539 struct GNUNET_MessageHeader,
1541 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1542 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1543 struct GNUNET_MessageHeader,
1545 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1546 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1547 struct GNUNET_MessageHeader,
/* The strata estimator handler is registered twice, once for
   ..._P2P_SE and once for ..._P2P_SEC. */
1549 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1550 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1551 struct StrataEstimatorMessage,
1553 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1554 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1555 struct StrataEstimatorMessage,
1557 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1558 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1559 struct GNUNET_SET_ElementMessage,
1561 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1562 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1563 struct IntersectionElementInfoMessage,
1565 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1566 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1569 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1570 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1571 struct IntersectionDoneMessage,
1573 GNUNET_MQ_handler_end ()
1576 const struct GNUNET_MessageHeader *context;
/* An evaluate request is only valid for a client that already has a set. */
1578 if (NULL == (set = cs->set))
1582 GNUNET_SERVICE_client_drop (cs->client);
/* Each operation gets a freshly drawn random 32-bit salt. */
1585 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1587 op->peer = msg->target_peer;
/* result_mode and request_id are converted from network byte order;
   the byzantine/force fields are copied as they are. */
1588 op->result_mode = ntohl (msg->result_mode);
1589 op->client_request_id = ntohl (msg->request_id);
1590 op->byzantine = msg->byzantine;
1591 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1592 op->force_full = msg->force_full;
1593 op->force_delta = msg->force_delta;
/* Optional nested message following the fixed part of the request;
   it is handed to the vtable's evaluate call below. */
1594 context = GNUNET_MQ_extract_nested_mh (msg);
1596 /* Advance generation values, so that
1597 mutations won't interfere with the running operation. */
/* The operation remembers the generation it was created in before the
   set moves on to the next one. */
1599 op->generation_created = set->current_generation;
1600 advance_generation (set);
1601 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1604 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1605 "Creating new CADET channel to port %s for set operation type %u\n",
1606 GNUNET_h2s (&msg->app_id),
1608 op->channel = GNUNET_CADET_channel_create (cadet,
1612 GNUNET_CADET_OPTION_RELIABLE,
/* All P2P traffic for this operation goes through the channel's queue. */
1616 op->mq = GNUNET_CADET_get_mq (op->channel);
1617 op->state = set->vt->evaluate (op,
/* A NULL state from evaluate is treated as failure: the client is dropped. */
1619 if (NULL == op->state)
1622 GNUNET_SERVICE_client_drop (cs->client);
1625 GNUNET_SERVICE_client_continue (cs->client);
/* Handler for a client's acknowledgement of an iterated set element.
   The client is dropped if it has no set, or if no iteration is in progress
   (set->iter is NULL).  Otherwise the client may continue; a non-zero
   ack->send_more leads to send_client_element(), and the lines after it
   destroy the iterator and increment set->iteration_id
   (presumably the send_more == 0 case - confirm against the full file). */
1630 * Handle an ack from a client, and send the next element. Note
1631 * that we only expect acks for set elements, not after the
1632 * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1634 * @param cls client the client
1635 * @param ack the message
1638 handle_client_iter_ack (void *cls,
1639 const struct GNUNET_SET_IterAckMessage *ack)
1641 struct ClientState *cs = cls;
1644 if (NULL == (set = cs->set))
1646 /* client without a set acknowledged receiving a value */
1648 GNUNET_SERVICE_client_drop (cs->client);
1651 if (NULL == set->iter)
1653 /* client sent an ack, but we were not expecting one (as
1654 set iteration has finished) */
1656 GNUNET_SERVICE_client_drop (cs->client);
1659 GNUNET_SERVICE_client_continue (cs->client);
/* send_more arrives in network byte order; only zero vs. non-zero matters. */
1660 if (ntohl (ack->send_more))
1662 send_client_element (set);
1666 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1668 set->iteration_id++;
/* First step of a lazy set copy.  For a client that has a set, a
   LazyCopyRequest is allocated, given a fresh cookie, pointed at the
   client's set and linked into the lazy_copy list.  The cookie is then
   returned to the client in a GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE.
   A client without a set is dropped. */
1674 * Handle a request from the client to copy a set.
1676 * @param cls the client
1677 * @param mh the message
1680 handle_client_copy_lazy_prepare (void *cls,
1681 const struct GNUNET_MessageHeader *mh)
1683 struct ClientState *cs = cls;
1685 struct LazyCopyRequest *cr;
1686 struct GNUNET_MQ_Envelope *ev;
1687 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1689 if (NULL == (set = cs->set))
1691 /* client without a set requested an operation */
1693 GNUNET_SERVICE_client_drop (cs->client);
1696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697 "Client requested creation of lazy copy\n");
1698 cr = GNUNET_new (struct LazyCopyRequest);
/* Cookies come from a counter that is incremented before use. */
1699 cr->cookie = ++lazy_copy_cookie;
1700 cr->source_set = set;
1701 GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1704 ev = GNUNET_MQ_msg (resp_msg,
1705 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
/* The cookie is written without byte-order conversion;
   handle_client_copy_lazy_connect() compares it the same way. */
1706 resp_msg->cookie = cr->cookie;
1707 GNUNET_MQ_send (set->cs->mq,
1709 GNUNET_SERVICE_client_continue (cs->client);
/* Second step of a lazy set copy: attach the requesting client to a copy
   that was prepared earlier under msg->cookie.  The client is dropped if it
   already has a set, if no pending request carries that cookie, or if the
   set type provides no copy_state function.  The new Set takes the vtable
   matching the source set's operation, a copy of the source's state, a
   shared reference to the source's content and a duplicate of its
   excluded-generation array, and is then moved to a new generation. */
1714 * Handle a request from the client to connect to a copy of a set.
1716 * @param cls the client
1717 * @param msg the message
1720 handle_client_copy_lazy_connect (void *cls,
1721 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1723 struct ClientState *cs = cls;
1724 struct LazyCopyRequest *cr;
1728 if (NULL != cs->set)
1730 /* There can only be one set per client */
1732 GNUNET_SERVICE_client_drop (cs->client);
/* Linear search of the pending lazy-copy requests for the given cookie. */
1736 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1738 if (cr->cookie == msg->cookie)
1744 if (GNUNET_NO == found)
1746 /* client asked for copy with cookie we don't know */
1748 GNUNET_SERVICE_client_drop (cs->client);
/* The request is unlinked from the pending list once it has been matched. */
1751 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1754 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1755 "Client %p requested use of lazy copy\n",
1757 set = GNUNET_new (struct Set);
/* Pick the operation-specific vtable according to the source set. */
1758 switch (cr->source_set->operation)
1760 case GNUNET_SET_OPERATION_INTERSECTION:
1761 set->vt = _GSS_intersection_vt ();
1763 case GNUNET_SET_OPERATION_UNION:
1764 set->vt = _GSS_union_vt ();
1771 if (NULL == set->vt->copy_state)
1773 /* Lazy copy not supported for this set operation */
1777 GNUNET_SERVICE_client_drop (cs->client);
1781 set->operation = cr->source_set->operation;
1782 set->state = set->vt->copy_state (cr->source_set->state);
/* The content is shared with the source set rather than duplicated;
   only its reference count is raised. */
1783 set->content = cr->source_set->content;
1784 set->content->refcount++;
1786 set->current_generation = cr->source_set->current_generation;
1787 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1788 set->excluded_generations
1789 = GNUNET_memdup (cr->source_set->excluded_generations,
1790 set->excluded_generations_size * sizeof (struct GenerationRange));
1792 /* Advance the generation of the new set, so that mutations to the
1793 cloned set and the source set are independent. */
1794 advance_generation (set);
1798 GNUNET_SERVICE_client_continue (cs->client);
/* Handler for a client's request to cancel one of its running operations.
   A client without a set is dropped.  The operation is searched by request
   id in the set's operation list; an unknown id is only reported at INFO
   level, a known one is reported at DEBUG level and handed to
   _GSS_operation_destroy().  The client continues at the end. */
1803 * Handle a request from the client to cancel a running set operation.
1805 * @param cls the client
1806 * @param msg the message
1809 handle_client_cancel (void *cls,
1810 const struct GNUNET_SET_CancelMessage *msg)
1812 struct ClientState *cs = cls;
1814 struct Operation *op;
1817 if (NULL == (set = cs->set))
1819 /* client without a set requested an operation */
1821 GNUNET_SERVICE_client_drop (cs->client);
/* Walk the set's operations; the id in the message is in network byte
   order and is converted for every comparison. */
1825 for (op = set->ops_head; NULL != op; op = op->next)
1827 if (op->client_request_id == ntohl (msg->request_id))
1833 if (GNUNET_NO == found)
1835 /* It may happen that the operation was already destroyed due to
1836 * the other peer disconnecting. The client may not know about this
1837 * yet and try to cancel the (just barely non-existent) operation.
1838 * So this is not a hard error.
1840 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1841 "Client canceled non-existent op %u\n",
1842 (uint32_t) ntohl (msg->request_id));
1846 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1847 "Client requested cancel for op %u\n",
1848 (uint32_t) ntohl (msg->request_id));
1849 _GSS_operation_destroy (op,
1852 GNUNET_SERVICE_client_continue (cs->client);
/* Handler for a client accepting an operation that a remote peer requested.
   A client without a set is dropped.  If the operation is no longer known,
   a GNUNET_MESSAGE_TYPE_SET_RESULT with status GNUNET_SET_STATUS_FAILURE is
   sent back instead.  Otherwise the operation is detached from its
   listener, attached to the client's set, configured from the message and
   started through the set type's accept function; finally CADET is told
   that receiving may proceed on the operation's channel. */
1857 * Handle a request from the client to accept a set operation that
1858 * came from a remote peer. We forward the accept to the associated
1859 * operation for handling
1861 * @param cls the client
1862 * @param msg the message
1865 handle_client_accept (void *cls,
1866 const struct GNUNET_SET_AcceptMessage *msg)
1868 struct ClientState *cs = cls;
1870 struct Operation *op;
1871 struct GNUNET_SET_ResultMessage *result_message;
1872 struct GNUNET_MQ_Envelope *ev;
1873 struct Listener *listener;
1875 if (NULL == (set = cs->set))
1877 /* client without a set requested to accept */
1879 GNUNET_SERVICE_client_drop (cs->client);
1882 op = get_incoming (ntohl (msg->accept_reject_id));
1885 /* It is not an error if the set op does not exist -- it may
1886 * have been destroyed when the partner peer disconnected. */
1887 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1888 "Client %p accepted request %u of listener %p that is no longer active\n",
1890 ntohl (msg->accept_reject_id),
1892 ev = GNUNET_MQ_msg (result_message,
1893 GNUNET_MESSAGE_TYPE_SET_RESULT);
/* The request id is echoed exactly as received (no byte-order conversion);
   the status is converted with htons(). */
1894 result_message->request_id = msg->request_id;
1895 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1896 GNUNET_MQ_send (set->cs->mq,
1898 GNUNET_SERVICE_client_continue (cs->client);
1901 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1902 "Client accepting request %u\n",
1903 (uint32_t) ntohl (msg->accept_reject_id));
/* Ownership of the operation moves from the listener's list to the set's list. */
1904 listener = op->listener;
1905 op->listener = NULL;
1906 GNUNET_CONTAINER_DLL_remove (listener->op_head,
1910 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1913 op->client_request_id = ntohl (msg->request_id);
1914 op->result_mode = ntohl (msg->result_mode);
1915 op->byzantine = msg->byzantine;
1916 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1917 op->force_full = msg->force_full;
1918 op->force_delta = msg->force_delta;
1920 /* Advance generation values, so that future mutations do not
1921 interfere with the running operation. */
1922 op->generation_created = set->current_generation;
1923 advance_generation (set);
/* An incoming operation must not carry state before it has been accepted. */
1924 GNUNET_assert (NULL == op->state);
1925 op->state = set->vt->accept (op);
/* A NULL state from accept is treated as failure: the client is dropped. */
1926 if (NULL == op->state)
1929 GNUNET_SERVICE_client_drop (cs->client);
1932 /* Now allow CADET to continue, as we did not do this in
1933 #handle_incoming_msg (as we wanted to first see if the
1934 local client would accept the request). */
1935 GNUNET_CADET_receive_done (op->channel);
1936 GNUNET_SERVICE_client_continue (cs->client);
/* Shutdown hook; run() registers it with GNUNET_SCHEDULER_add_shutdown().
   It marks the service as shutting down, disconnects from CADET and
   destroys the statistics handle.
   NOTE(review): the CADET disconnect appears under the 0 == num_clients
   check - confirm the exact nesting against the complete file. */
1941 * Called to clean up, after a shutdown has been requested.
1943 * @param cls closure, NULL
1946 shutdown_task (void *cls)
1948 /* Delay actual shutdown to allow service to disconnect clients */
1949 in_shutdown = GNUNET_YES;
1950 if (0 == num_clients)
1954 GNUNET_CADET_disconnect (cadet);
1958 GNUNET_STATISTICS_destroy (_GSS_statistics,
1960 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1961 "handled shutdown request\n");
/* Service initialisation: registers shutdown_task() as the shutdown hook,
   creates the statistics handle for the "set" subsystem and connects to
   CADET using the given configuration.  The error message and the
   GNUNET_SCHEDULER_shutdown() call below form the failure path for the
   CADET connection. */
1966 * Function called by the service's run
1967 * method to run service-specific setup code.
1969 * @param cls closure
1970 * @param cfg configuration to use
1971 * @param service the initialized service
1975 const struct GNUNET_CONFIGURATION_Handle *cfg,
1976 struct GNUNET_SERVICE_Handle *service)
1978 /* FIXME: need to modify SERVICE (!) API to allow
1979 us to run a shutdown task *after* clients were
1980 forcefully disconnected! */
1981 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1983 _GSS_statistics = GNUNET_STATISTICS_create ("set",
1985 cadet = GNUNET_CADET_connect (cfg);
1988 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1989 _("Could not connect to CADET service\n"));
1990 GNUNET_SCHEDULER_shutdown ();
/* Service definition and client message dispatch table.  Each entry binds a
   client message type and its message struct to a handler name; the
   fixed-size entries are accept, iter_ack, create_set, iterate, listen,
   reject, cancel, copy_lazy_prepare and copy_lazy_connect, the
   variable-size entries are mutation and evaluate. */
1997 * Define "main" method using service macro.
2001 GNUNET_SERVICE_OPTION_NONE,
2004 &client_disconnect_cb,
2006 GNUNET_MQ_hd_fixed_size (client_accept,
2007 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2008 struct GNUNET_SET_AcceptMessage,
2010 GNUNET_MQ_hd_fixed_size (client_iter_ack,
2011 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2012 struct GNUNET_SET_IterAckMessage,
/* client_mutation serves element additions here and removals further down. */
2014 GNUNET_MQ_hd_var_size (client_mutation,
2015 GNUNET_MESSAGE_TYPE_SET_ADD,
2016 struct GNUNET_SET_ElementMessage,
2018 GNUNET_MQ_hd_fixed_size (client_create_set,
2019 GNUNET_MESSAGE_TYPE_SET_CREATE,
2020 struct GNUNET_SET_CreateMessage,
2022 GNUNET_MQ_hd_fixed_size (client_iterate,
2023 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2024 struct GNUNET_MessageHeader,
2026 GNUNET_MQ_hd_var_size (client_evaluate,
2027 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2028 struct GNUNET_SET_EvaluateMessage,
2030 GNUNET_MQ_hd_fixed_size (client_listen,
2031 GNUNET_MESSAGE_TYPE_SET_LISTEN,
2032 struct GNUNET_SET_ListenMessage,
2034 GNUNET_MQ_hd_fixed_size (client_reject,
2035 GNUNET_MESSAGE_TYPE_SET_REJECT,
2036 struct GNUNET_SET_RejectMessage,
2038 GNUNET_MQ_hd_var_size (client_mutation,
2039 GNUNET_MESSAGE_TYPE_SET_REMOVE,
2040 struct GNUNET_SET_ElementMessage,
2042 GNUNET_MQ_hd_fixed_size (client_cancel,
2043 GNUNET_MESSAGE_TYPE_SET_CANCEL,
2044 struct GNUNET_SET_CancelMessage,
2046 GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2047 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2048 struct GNUNET_MessageHeader,
2050 GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2051 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2052 struct GNUNET_SET_CopyLazyConnectMessage,
2054 GNUNET_MQ_handler_end ());
2057 /* end of gnunet-service-set.c */