2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
33 * How long do we hold on to an incoming channel if there is
34 * no local listener before giving up?
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
40 * Lazy copy requests made by a client.
42 struct LazyCopyRequest
47 struct LazyCopyRequest *prev;
52 struct LazyCopyRequest *next;
55 * Which set are we supposed to copy?
57 struct Set *source_set;
60 * Cookie identifying the request.
68 * A listener is inhabited by a client, and waits for evaluation
69 * requests from remote peers.
74 * Listeners are held in a doubly linked list.
76 struct Listener *next;
79 * Listeners are held in a doubly linked list.
81 struct Listener *prev;
84 * Head of DLL of operations this listener is responsible for.
85 * Once the client has accepted/declined the operation, the
86 * operation is moved to the respective set's operation DLLs.
88 struct Operation *op_head;
91 * Tail of DLL of operations this listener is responsible for.
92 * Once the client has accepted/declined the operation, the
93 * operation is moved to the respective set's operation DLLs.
95 struct Operation *op_tail;
98 * Client that owns the listener.
99 * Only one client may own a listener.
101 struct ClientState *cs;
104 * The port we are listening on with CADET.
106 struct GNUNET_CADET_Port *open_port;
109 * Application ID for the operation, used to distinguish
110 * multiple operations of the same type with the same peer.
112 struct GNUNET_HashCode app_id;
115 * The type of the operation.
117 enum GNUNET_SET_OperationType operation;
122 * Handle to the cadet service, used to listen for and connect to
123 * other peers.
125 static struct GNUNET_CADET_Handle *cadet;
128 * Head of DLL of lazy copy requests by this client.
130 static struct LazyCopyRequest *lazy_copy_head;
133 * Tail of DLL of lazy copy requests by this client.
135 static struct LazyCopyRequest *lazy_copy_tail;
138 * Generator for unique cookie we set per lazy copy request.
140 static uint32_t lazy_copy_cookie;
145 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
148 * Listeners are held in a doubly linked list.
150 static struct Listener *listener_head;
153 * Listeners are held in a doubly linked list.
155 static struct Listener *listener_tail;
158 * Number of active clients.
160 static unsigned int num_clients;
163 * Are we in shutdown? if #GNUNET_YES and the number of clients
164 * drops to zero, disconnect from CADET.
166 static int in_shutdown;
169 * Counter for allocating unique IDs for clients, used to identify
170 * incoming operation requests from remote peers, that the client can
171 * choose to accept or refuse. 0 must not be used (reserved for
172 * operations that have not been assigned an ID yet).
174 static uint32_t suggest_id;
178 * Get the incoming socket associated with the given id.
180 * Looks through the operations held by every listener in the listener DLL.
181 * @param id id to look for
182 * @return the incoming socket associated with the id,
183 * or NULL if there is none
185 static struct Operation *
186 get_incoming (uint32_t id)
188 for (struct Listener *listener = listener_head;
190 listener = listener->next)
192 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
193 if (op->suggest_id == id)
201 * Destroy an incoming request from a remote peer
203 * @param op remote request to destroy
206 incoming_destroy (struct Operation *op)
208 struct Listener *listener;
209 struct GNUNET_CADET_Channel *channel;
211 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
212 "Destroying incoming operation %p\n",
214 if (NULL != (listener = op->listener))
216 GNUNET_CONTAINER_DLL_remove (listener->op_head,
221 if (NULL != op->timeout_task)
223 GNUNET_SCHEDULER_cancel (op->timeout_task);
224 op->timeout_task = NULL;
226 if (NULL != (channel = op->channel))
229 GNUNET_CADET_channel_destroy (channel);
235 * Context for the #garbage_collect_cb().
237 struct GarbageContext
241 * Map for which we are garbage collecting removed elements.
243 struct GNUNET_CONTAINER_MultiHashMap *map;
246 * Lowest generation for which an operation is still pending.
248 unsigned int min_op_generation;
251 * Largest generation for which an operation is still pending.
253 unsigned int max_op_generation;
259 * Function invoked to check if an element can be removed from
260 * the set's history because it is no longer needed.
262 * @param cls the `struct GarbageContext *`
263 * @param key key of the element in the map
264 * @param value the `struct ElementEntry *`
265 * @return #GNUNET_OK (continue to iterate)
268 garbage_collect_cb (void *cls,
269 const struct GNUNET_HashCode *key,
272 //struct GarbageContext *gc = cls;
273 //struct ElementEntry *ee = value;
275 //if (GNUNET_YES != ee->removed)
277 //if ( (gc->max_op_generation < ee->generation_added) ||
278 // (ee->generation_removed > gc->min_op_generation) )
280 // GNUNET_assert (GNUNET_YES ==
281 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
291 * Collect and destroy elements that are not needed anymore, because
292 * their lifetime (as determined by their generation) does not overlap
293 * with any active set operation.
295 * @param set set to garbage collect
298 collect_generation_garbage (struct Set *set)
300 struct GarbageContext gc;
302 gc.min_op_generation = UINT_MAX;
303 gc.max_op_generation = 0;
304 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
306 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
307 op->generation_created);
308 gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
309 op->generation_created);
311 gc.map = set->content->elements;
312 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
319 * Is @a generation in the range of exclusions?
321 * @param generation generation to query
322 * @param excluded array of generations where the element is excluded
323 * @param excluded_size length of the @a excluded array
324 * @return #GNUNET_YES if @a generation is in any of the ranges
327 is_excluded_generation (unsigned int generation,
328 struct GenerationRange *excluded,
329 unsigned int excluded_size)
331 for (unsigned int i = 0; i < excluded_size; i++)
332 if ( (generation >= excluded[i].start) &&
333 (generation < excluded[i].end) )
340 * Is element @a ee part of the set during @a query_generation?
342 * @param ee element to test
343 * @param query_generation generation to query
344 * @param excluded array of generations where the element is excluded
345 * @param excluded_size length of the @a excluded array
346 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
349 is_element_of_generation (struct ElementEntry *ee,
350 unsigned int query_generation,
351 struct GenerationRange *excluded,
352 unsigned int excluded_size)
354 struct MutationEvent *mut;
357 GNUNET_assert (NULL != ee->mutations);
359 is_excluded_generation (query_generation,
367 is_present = GNUNET_NO;
369 /* Could be made faster with binary search, but lists
370 are small, so why bother. */
371 for (unsigned int i = 0; i < ee->mutations_size; i++)
373 mut = &ee->mutations[i];
375 if (mut->generation > query_generation)
377 /* The mutation doesn't apply to our generation
378 anymore. We can't break here, since mutations aren't
379 sorted by generation. */
384 is_excluded_generation (mut->generation,
388 /* The generation is excluded (because it belongs to another
389 fork via a lazy copy) and thus mutations aren't considered
390 for membership testing. */
394 /* This would be an inconsistency in how we manage mutations. */
395 if ( (GNUNET_YES == is_present) &&
396 (GNUNET_YES == mut->added) )
399 if ( (GNUNET_NO == is_present) &&
400 (GNUNET_NO == mut->added) )
403 is_present = mut->added;
411 * Is element @a ee part of the set used by @a op?
413 * @param ee element to test
414 * @param op operation that defines the set and its generation
415 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
418 _GSS_is_element_of_operation (struct ElementEntry *ee,
419 struct Operation *op)
421 return is_element_of_generation (ee,
422 op->generation_created,
423 op->set->excluded_generations,
424 op->set->excluded_generations_size);
429 * Destroy the given operation. Used for any operation where both
430 * peers were known and that thus actually had a vt and channel. Must
431 * not be used for operations where 'listener' is still set and we do
432 * not know the other peer.
434 * Call the implementation-specific cancel function of the operation.
435 * Disconnects from the remote peer. Does not disconnect the client,
436 * as there may be multiple operations per set.
438 * @param op operation to destroy
439 * @param gc #GNUNET_YES to perform garbage collection on the set
442 _GSS_operation_destroy (struct Operation *op,
445 struct Set *set = op->set;
446 struct GNUNET_CADET_Channel *channel;
448 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
449 "Destroying operation %p\n",
451 GNUNET_assert (NULL == op->listener);
452 if (NULL != op->state)
454 set->vt->cancel (op);
459 GNUNET_CONTAINER_DLL_remove (set->ops_head,
464 if (NULL != op->context_msg)
466 GNUNET_free (op->context_msg);
467 op->context_msg = NULL;
469 if (NULL != (channel = op->channel))
471 /* This will free op; called conditionally as this helper function
472 is also called from within the channel disconnect handler. */
474 GNUNET_CADET_channel_destroy (channel);
476 if ( (NULL != set) &&
478 collect_generation_garbage (set);
479 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
480 * there was a channel end handler that will free 'op' on the call stack. */
485 * Callback called when a client connects to the service.
487 * @param cls closure for the service
488 * @param c the new client that connected to the service
489 * @param mq the message queue used to send messages to the client
490 * @return a newly allocated `struct ClientState`
493 client_connect_cb (void *cls,
494 struct GNUNET_SERVICE_Client *c,
495 struct GNUNET_MQ_Handle *mq)
497 struct ClientState *cs;
500 cs = GNUNET_new (struct ClientState);
508 * Iterator over hash map entries to free element entries.
511 * @param key current key code
512 * @param value a `struct ElementEntry *` to be free'd
513 * @return #GNUNET_YES (continue to iterate)
516 destroy_elements_iterator (void *cls,
517 const struct GNUNET_HashCode *key,
520 struct ElementEntry *ee = value;
522 GNUNET_free_non_null (ee->mutations);
529 * Clean up after a client has disconnected
531 * @param cls closure, unused
532 * @param client the client to clean up after
533 * @param internal_cls the `struct ClientState`
536 client_disconnect_cb (void *cls,
537 struct GNUNET_SERVICE_Client *client,
540 struct ClientState *cs = internal_cls;
541 struct Operation *op;
542 struct Listener *listener;
545 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546 "Client disconnected, cleaning up\n");
547 if (NULL != (set = cs->set))
549 struct SetContent *content = set->content;
550 struct PendingMutation *pm;
551 struct PendingMutation *pm_current;
552 struct LazyCopyRequest *lcr;
554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555 "Destroying client's set\n");
556 /* Destroy pending set operations */
557 while (NULL != set->ops_head)
558 _GSS_operation_destroy (set->ops_head,
561 /* Destroy operation-specific state */
562 GNUNET_assert (NULL != set->state);
563 set->vt->destroy_set (set->state);
566 /* Clean up ongoing iterations */
567 if (NULL != set->iter)
569 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
574 /* discard any pending mutations that reference this set */
575 pm = content->pending_mutations_head;
580 if (pm_current->set == set)
582 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
583 content->pending_mutations_tail,
585 GNUNET_free (pm_current);
589 /* free set content (or at least decrement RC) */
591 GNUNET_assert (0 != content->refcount);
593 if (0 == content->refcount)
595 GNUNET_assert (NULL != content->elements);
596 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
597 &destroy_elements_iterator,
599 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
600 content->elements = NULL;
601 GNUNET_free (content);
603 GNUNET_free_non_null (set->excluded_generations);
604 set->excluded_generations = NULL;
606 /* remove set from pending copy requests */
607 lcr = lazy_copy_head;
610 struct LazyCopyRequest *lcr_current = lcr;
613 if (lcr_current->source_set == set)
615 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
618 GNUNET_free (lcr_current);
624 if (NULL != (listener = cs->listener))
626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
627 "Destroying client's listener\n");
628 GNUNET_CADET_close_port (listener->open_port);
629 listener->open_port = NULL;
630 while (NULL != (op = listener->op_head))
631 incoming_destroy (op);
632 GNUNET_CONTAINER_DLL_remove (listener_head,
635 GNUNET_free (listener);
639 if ( (GNUNET_YES == in_shutdown) &&
644 GNUNET_CADET_disconnect (cadet);
652 * Check a request for a set operation from another peer.
654 * @param cls the operation state
655 * @param msg the received message
656 * @return #GNUNET_OK if the channel should be kept alive,
657 * #GNUNET_SYSERR to destroy the channel
660 check_incoming_msg (void *cls,
661 const struct OperationRequestMessage *msg)
663 struct Operation *op = cls;
664 struct Listener *listener = op->listener;
665 const struct GNUNET_MessageHeader *nested_context;
667 /* double operation request */
668 if (0 != op->suggest_id)
671 return GNUNET_SYSERR;
673 /* This should be equivalent to the previous condition, but can't hurt to check twice */
674 if (NULL == op->listener)
677 return GNUNET_SYSERR;
679 if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
682 return GNUNET_SYSERR;
684 nested_context = GNUNET_MQ_extract_nested_mh (msg);
685 if ( (NULL != nested_context) &&
686 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
689 return GNUNET_SYSERR;
696 * Handle a request for a set operation from another peer. Checks if we
697 * have a listener waiting for such a request (and in that case initiates
698 * asking the listener about accepting the connection). If no listener
699 * is waiting, we queue the operation request in hope that a listener
700 * shows up soon (before timeout).
702 * This msg is expected as the first and only msg handled through the
703 * non-operation bound virtual table, acceptance of this operation replaces
704 * our virtual table and subsequent msgs would be routed differently (as
705 * we then know what type of operation this is).
707 * @param cls the operation state
708 * @param msg the received message
709 * @return #GNUNET_OK if the channel should be kept alive,
710 * #GNUNET_SYSERR to destroy the channel
713 handle_incoming_msg (void *cls,
714 const struct OperationRequestMessage *msg)
716 struct Operation *op = cls;
717 struct Listener *listener = op->listener;
718 const struct GNUNET_MessageHeader *nested_context;
719 struct GNUNET_MQ_Envelope *env;
720 struct GNUNET_SET_RequestMessage *cmsg;
722 nested_context = GNUNET_MQ_extract_nested_mh (msg);
723 /* Make a copy of the nested_context (application-specific context
724 information that is opaque to set) so we can pass it to the
725 listener's client later on. */
726 if (NULL != nested_context)
727 op->context_msg = GNUNET_copy_message (nested_context);
728 op->remote_element_count = ntohl (msg->element_count);
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "Received P2P operation request (op %u, port %s) for active listener\n",
731 (uint32_t) ntohl (msg->operation),
732 GNUNET_h2s (&op->listener->app_id));
733 GNUNET_assert (0 == op->suggest_id);
736 op->suggest_id = suggest_id++;
737 GNUNET_assert (NULL != op->timeout_task);
738 GNUNET_SCHEDULER_cancel (op->timeout_task);
739 op->timeout_task = NULL;
740 env = GNUNET_MQ_msg_nested_mh (cmsg,
741 GNUNET_MESSAGE_TYPE_SET_REQUEST,
743 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
748 cmsg->accept_id = htonl (op->suggest_id);
749 cmsg->peer_id = op->peer;
750 GNUNET_MQ_send (listener->cs->mq,
752 /* NOTE: GNUNET_CADET_receive_done() will be called in
753 #handle_client_accept() */
758 * Add an element to @a set as specified by @a msg
760 * @param set set to manipulate
761 * @param msg message specifying the change
764 execute_add (struct Set *set,
765 const struct GNUNET_SET_ElementMessage *msg)
767 struct GNUNET_SET_Element el;
768 struct ElementEntry *ee;
769 struct GNUNET_HashCode hash;
771 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
772 el.size = ntohs (msg->header.size) - sizeof (*msg);
774 el.element_type = ntohs (msg->element_type);
775 GNUNET_SET_element_hash (&el,
777 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782 "Client inserts element %s of size %u\n",
785 ee = GNUNET_malloc (el.size + sizeof (*ee));
786 ee->element.size = el.size;
787 GNUNET_memcpy (&ee[1],
790 ee->element.data = &ee[1];
791 ee->element.element_type = el.element_type;
792 ee->remote = GNUNET_NO;
793 ee->mutations = NULL;
794 ee->mutations_size = 0;
795 ee->element_hash = hash;
796 GNUNET_break (GNUNET_YES ==
797 GNUNET_CONTAINER_multihashmap_put (set->content->elements,
800 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
802 else if (GNUNET_YES ==
803 is_element_of_generation (ee,
804 set->current_generation,
805 set->excluded_generations,
806 set->excluded_generations_size))
808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809 "Client inserted element %s of size %u twice (ignored)\n",
813 /* same element inserted twice */
818 struct MutationEvent mut = {
819 .generation = set->current_generation,
822 GNUNET_array_append (ee->mutations,
826 set->vt->add (set->state,
832 * Remove an element from @a set as specified by @a msg
834 * @param set set to manipulate
835 * @param msg message specifying the change
838 execute_remove (struct Set *set,
839 const struct GNUNET_SET_ElementMessage *msg)
841 struct GNUNET_SET_Element el;
842 struct ElementEntry *ee;
843 struct GNUNET_HashCode hash;
845 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
846 el.size = ntohs (msg->header.size) - sizeof (*msg);
848 el.element_type = ntohs (msg->element_type);
849 GNUNET_SET_element_hash (&el, &hash);
850 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
854 /* Client tried to remove non-existing element. */
855 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856 "Client removes non-existing element of size %u\n",
861 is_element_of_generation (ee,
862 set->current_generation,
863 set->excluded_generations,
864 set->excluded_generations_size))
866 /* Client tried to remove element twice */
867 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868 "Client removed element of size %u twice (ignored)\n",
874 struct MutationEvent mut = {
875 .generation = set->current_generation,
879 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880 "Client removes element of size %u\n",
883 GNUNET_array_append (ee->mutations,
887 set->vt->remove (set->state,
893 * Perform a mutation on a set as specified by the @a msg
895 * @param set the set to mutate
896 * @param msg specification of what to change
899 execute_mutation (struct Set *set,
900 const struct GNUNET_SET_ElementMessage *msg)
902 switch (ntohs (msg->header.type))
904 case GNUNET_MESSAGE_TYPE_SET_ADD:
905 execute_add (set, msg);
907 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
908 execute_remove (set, msg);
917 * Execute mutations that were delayed on a set because of
918 * pending operations.
920 * @param set the set to execute mutations on
923 execute_delayed_mutations (struct Set *set)
925 struct PendingMutation *pm;
927 if (0 != set->content->iterator_count)
928 return; /* still cannot do this */
929 while (NULL != (pm = set->content->pending_mutations_head))
931 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
932 set->content->pending_mutations_tail,
934 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935 "Executing pending mutation on %p.\n",
937 execute_mutation (pm->set,
939 GNUNET_free (pm->msg);
946 * Send the next element of a set to the set's client. The next element is given by
947 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
948 * are no more elements in the set. The caller must ensure that the set's iterator is
949 * valid (non-NULL).
951 * The client will acknowledge each received element with a
952 * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
953 * #handle_client_iter_ack() will then trigger the next transmission.
954 * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
956 * @param set set that should send its next element to its client
959 send_client_element (struct Set *set)
962 struct ElementEntry *ee;
963 struct GNUNET_MQ_Envelope *ev;
964 struct GNUNET_SET_IterResponseMessage *msg;
966 GNUNET_assert (NULL != set->iter);
968 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
970 (const void **) &ee);
971 if (GNUNET_NO == ret)
973 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974 "Iteration on %p done.\n",
976 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
977 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
980 GNUNET_assert (set->content->iterator_count > 0);
981 set->content->iterator_count--;
982 execute_delayed_mutations (set);
983 GNUNET_MQ_send (set->cs->mq,
987 GNUNET_assert (NULL != ee);
988 } while (GNUNET_NO ==
989 is_element_of_generation (ee,
990 set->iter_generation,
991 set->excluded_generations,
992 set->excluded_generations_size));
993 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994 "Sending iteration element on %p.\n",
996 ev = GNUNET_MQ_msg_extra (msg,
998 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
999 GNUNET_memcpy (&msg[1],
1002 msg->element_type = htons (ee->element.element_type);
1003 msg->iteration_id = htons (set->iteration_id);
1004 GNUNET_MQ_send (set->cs->mq,
1010 * Called when a client wants to iterate the elements of a set.
1011 * Checks if we have a set associated with the client and if we
1012 * can right now start an iteration. If all checks out, starts
1013 * sending the elements of the set to the client.
1015 * @param cls client that sent the message
1016 * @param m message sent by the client
1019 handle_client_iterate (void *cls,
1020 const struct GNUNET_MessageHeader *m)
1022 struct ClientState *cs = cls;
1025 if (NULL == (set = cs->set))
1027 /* attempt to iterate over a non existing set */
1029 GNUNET_SERVICE_client_drop (cs->client);
1032 if (NULL != set->iter)
1034 /* Only one concurrent iterate-action allowed per set */
1036 GNUNET_SERVICE_client_drop (cs->client);
1039 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1040 "Iterating set %p in gen %u with %u content elements\n",
1042 set->current_generation,
1043 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1044 GNUNET_SERVICE_client_continue (cs->client);
1045 set->content->iterator_count++;
1046 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1047 set->iter_generation = set->current_generation;
1048 send_client_element (set);
1053 * Called when a client wants to create a new set. This is typically
1054 * the first request from a client, and includes the type of set
1055 * operation to be performed.
1057 * @param cls client that sent the message
1058 * @param msg message sent by the client
1061 handle_client_create_set (void *cls,
1062 const struct GNUNET_SET_CreateMessage *msg)
1064 struct ClientState *cs = cls;
1067 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1068 "Client created new set (operation %u)\n",
1069 (uint32_t) ntohl (msg->operation));
1070 if (NULL != cs->set)
1072 /* There can only be one set per client */
1074 GNUNET_SERVICE_client_drop (cs->client);
1077 set = GNUNET_new (struct Set);
1078 switch (ntohl (msg->operation))
1080 case GNUNET_SET_OPERATION_INTERSECTION:
1081 set->vt = _GSS_intersection_vt ();
1083 case GNUNET_SET_OPERATION_UNION:
1084 set->vt = _GSS_union_vt ();
1089 GNUNET_SERVICE_client_drop (cs->client);
1092 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1093 set->state = set->vt->create ();
1094 if (NULL == set->state)
1096 /* initialization failed (i.e. out of memory) */
1098 GNUNET_SERVICE_client_drop (cs->client);
1101 set->content = GNUNET_new (struct SetContent);
1102 set->content->refcount = 1;
1103 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1107 GNUNET_SERVICE_client_continue (cs->client);
1112 * Timeout happens iff:
1113 * - we suggested an operation to our listener,
1114 * but did not receive a response in time
1115 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1117 * @param cls channel context; the `struct Operation *` whose
1118 * incoming request timed out
1121 incoming_timeout_cb (void *cls)
1123 struct Operation *op = cls;
1125 op->timeout_task = NULL;
1126 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127 "Remote peer's incoming request timed out\n");
1128 incoming_destroy (op);
1133 * Method called whenever another peer has added us to a channel the
1134 * other peer initiated. Only called (once) upon reception of data
1135 * from a channel we listen on.
1137 * The channel context represents the operation itself and gets added
1138 * to a DLL, from where it gets looked up when our local listener
1139 * client responds to a proposed/suggested operation or connects and
1140 * associates with this operation.
1142 * @param cls closure
1143 * @param channel new handle to the channel
1144 * @param source peer that started the channel
1145 * @return initial channel context for the channel
1146 * returns NULL on error
1149 channel_new_cb (void *cls,
1150 struct GNUNET_CADET_Channel *channel,
1151 const struct GNUNET_PeerIdentity *source)
1153 struct Listener *listener = cls;
1154 struct Operation *op;
1156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157 "New incoming channel\n");
1158 op = GNUNET_new (struct Operation);
1159 op->listener = listener;
1161 op->channel = channel;
1162 op->mq = GNUNET_CADET_get_mq (op->channel);
1163 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1166 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1167 &incoming_timeout_cb,
1169 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1177 * Function called whenever a channel is destroyed. Should clean up
1178 * any associated state. It must NOT call
1179 * GNUNET_CADET_channel_destroy() on the channel.
1181 * The peer_disconnect function is part of a virtual table set initially either
1182 * when a peer creates a new channel with us, or once we create
1183 * a new channel ourselves (evaluate).
1185 * Once we know the exact type of operation (union/intersection), the vt is
1186 * replaced with an operation specific instance (_GSS_[op]_vt).
1188 * @param channel_ctx place where local state associated
1189 * with the channel is stored
1190 * @param channel connection to the other end (henceforth invalid)
1193 channel_end_cb (void *channel_ctx,
1194 const struct GNUNET_CADET_Channel *channel)
1196 struct Operation *op = channel_ctx;
1198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1199 "channel_end_cb called\n");
1201 if (NULL != op->listener)
1202 incoming_destroy (op);
1203 else if (NULL != op->set)
1204 op->set->vt->channel_death (op);
1206 _GSS_operation_destroy (op,
1213 * Function called whenever an MQ-channel's transmission window size changes.
1215 * The first callback in an outgoing channel will be with a non-zero value
1216 * and will mean the channel is connected to the destination.
1218 * For an incoming channel it will be called immediately after the
1219 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1221 * @param cls Channel closure.
1222 * @param channel Connection to the other end (henceforth invalid).
1223 * @param window_size New window size. If there are more messages than the buffer size,
1224 * this value will be negative.
1227 channel_window_cb (void *cls,
1228 const struct GNUNET_CADET_Channel *channel,
1231 /* FIXME: not implemented, we could do flow control here... */
1236 * Called when a client wants to create a new listener.
1238 * @param cls client that sent the message
1239 * @param msg message sent by the client
1242 handle_client_listen (void *cls,
1243 const struct GNUNET_SET_ListenMessage *msg)
1245 struct ClientState *cs = cls;
1246 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1247 GNUNET_MQ_hd_var_size (incoming_msg,
1248 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1249 struct OperationRequestMessage,
1251 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1252 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1255 GNUNET_MQ_hd_var_size (union_p2p_elements,
1256 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1257 struct GNUNET_SET_ElementMessage,
1259 GNUNET_MQ_hd_var_size (union_p2p_offer,
1260 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1261 struct GNUNET_MessageHeader,
1263 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1264 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1265 struct InquiryMessage,
1267 GNUNET_MQ_hd_var_size (union_p2p_demand,
1268 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1269 struct GNUNET_MessageHeader,
1271 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1272 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1273 struct GNUNET_MessageHeader,
1275 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1276 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1277 struct GNUNET_MessageHeader,
1279 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1280 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1281 struct GNUNET_MessageHeader,
1283 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1284 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1285 struct GNUNET_MessageHeader,
1287 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1288 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1289 struct StrataEstimatorMessage,
1291 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1292 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1293 struct StrataEstimatorMessage,
1295 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1296 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1297 struct GNUNET_SET_ElementMessage,
1299 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1300 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1301 struct IntersectionElementInfoMessage,
1303 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1304 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1307 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1308 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1309 struct IntersectionDoneMessage,
1311 GNUNET_MQ_handler_end ()
1313 struct Listener *listener;
1315 if (NULL != cs->listener)
1317 /* max. one active listener per client! */
1319 GNUNET_SERVICE_client_drop (cs->client);
1322 listener = GNUNET_new (struct Listener);
1324 cs->listener = listener;
1325 listener->app_id = msg->app_id;
1326 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1327 GNUNET_CONTAINER_DLL_insert (listener_head,
1330 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331 "New listener created (op %u, port %s)\n",
1332 listener->operation,
1333 GNUNET_h2s (&listener->app_id));
1335 = GNUNET_CADET_open_port (cadet,
1342 GNUNET_SERVICE_client_continue (cs->client);
1347 * Called when the listening client rejects an operation
1348 * request by another peer.
1350 * @param cls client that sent the message
1351 * @param msg message sent by the client
1354 handle_client_reject (void *cls,
1355 const struct GNUNET_SET_RejectMessage *msg)
/* Resolves the incoming operation named by msg->accept_reject_id and
   destroys its CADET channel; when no such operation is known the reject
   is only logged.  The client is told to continue in both cases. */
1357 struct ClientState *cs = cls;
1358 struct Operation *op;
/* accept_reject_id is converted with ntohl() before the lookup. */
1360 op = get_incoming (ntohl (msg->accept_reject_id));
1363 /* no matching incoming operation for this reject;
1364 could be that the other peer already disconnected... */
1365 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1366 "Client rejected unknown operation %u\n",
1367 (unsigned int) ntohl (msg->accept_reject_id));
1368 GNUNET_SERVICE_client_continue (cs->client);
/* NOTE(review): the operation type is read through op->listener, but the
   application id through cs->listener.  This dereferences cs->listener,
   so it presumably relies on the rejecting client being a listener
   itself -- confirm, or log via op->listener for both values. */
1371 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1372 "Peer request (op %u, app %s) rejected by client\n",
1373 op->listener->operation,
1374 GNUNET_h2s (&cs->listener->app_id));
/* Rejecting is implemented by destroying the operation's channel. */
1375 GNUNET_CADET_channel_destroy (op->channel);
1376 GNUNET_SERVICE_client_continue (cs->client);
1381 * Called when a client wants to add or remove an element to a set it inhabits.
1383 * @param cls client that sent the message
1384 * @param msg message sent by the client
1387 check_client_mutation (void *cls,
1388 const struct GNUNET_SET_ElementMessage *msg)
/* Check counterpart of handle_client_mutation; the client_mutation
   handler is registered as variable-size for both SET_ADD and SET_REMOVE
   messages.  No inspection of the element payload is visible here, as
   the note below acknowledges. */
1390 /* NOTE: Technically, we should probably check with the
1391 block library whether the element we are given is well-formed */
1397 * Called when a client wants to add or remove an element to a set it inhabits.
1399 * @param cls client that sent the message
1400 * @param msg message sent by the client
1403 handle_client_mutation (void *cls,
1404 const struct GNUNET_SET_ElementMessage *msg)
/* Applies an element mutation to the client's set: while the set
   content's iterator_count is non-zero the mutation is queued as a
   PendingMutation, otherwise it is executed right away. */
1406 struct ClientState *cs = cls;
1409 if (NULL == (set = cs->set))
1411 /* client without a set requested an operation */
1413 GNUNET_SERVICE_client_drop (cs->client);
/* The client is released to send its next message before the mutation
   itself is processed below. */
1416 GNUNET_SERVICE_client_continue (cs->client);
1418 if (0 != set->content->iterator_count)
1420 struct PendingMutation *pm;
1422 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1423 "Scheduling mutation on set\n");
/* The pending entry stores its own copy of the message. */
1424 pm = GNUNET_new (struct PendingMutation);
1425 pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
/* Appended at the tail of the content's pending-mutation list. */
1427 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1428 set->content->pending_mutations_tail,
1432 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1433 "Executing mutation on set\n");
1434 execute_mutation (set,
1440 * Advance the current generation of a set,
1441 * adding exclusion ranges if necessary.
1443 * @param set the set where we want to advance the generation
1446 advance_generation (struct Set *set)
/* Moves the set to a new generation.  Two cases are distinguished by
   comparing the set's current_generation with the latest_generation of
   its content. */
1448 struct GenerationRange r;
/* Case 1: the set is at the content's latest generation, so both
   counters are incremented together. */
1450 if (set->current_generation == set->content->latest_generation)
1452 set->content->latest_generation++;
1453 set->current_generation++;
/* Case 2: the set is behind the content's latest generation (asserted
   below). */
1457 GNUNET_assert (set->current_generation < set->content->latest_generation);
/* r spans from current_generation + 1 to latest_generation + 1; both the
   content's latest generation and the set's current generation are then
   set to r.end. */
1459 r.start = set->current_generation + 1;
1460 r.end = set->content->latest_generation + 1;
1461 set->content->latest_generation = r.end;
1462 set->current_generation = r.end;
/* The set's excluded_generations array grows by one entry here;
   presumably that entry is r (the value argument is not visible). */
1463 GNUNET_array_append (set->excluded_generations,
1464 set->excluded_generations_size,
1470 * Called when a client wants to initiate a set operation with another
1471 * peer. Initiates the CADET connection to the listener and sends the request.
1474 * @param cls client that sent the message
1475 * @param msg message sent by the client
1476 * @return #GNUNET_OK if the message is well-formed
1479 check_client_evaluate (void *cls,
1480 const struct GNUNET_SET_EvaluateMessage *msg)
/* Check counterpart of handle_client_evaluate; the client_evaluate
   handler is registered as variable-size for SET_EVALUATE messages.
   The FIXME below records that validation here is incomplete. */
1482 /* FIXME: suboptimal, even if the context below could be NULL,
1483 there are malformed messages this does not check for... */
1489 * Called when a client wants to initiate a set operation with another
1490 * peer. Initiates the CADET connection to the listener and sends the request.
1493 * @param cls client that sent the message
1494 * @param msg message sent by the client
1497 handle_client_evaluate (void *cls,
1498 const struct GNUNET_SET_EvaluateMessage *msg)
/* Starts a set operation requested by the local client: a fresh Operation
   is filled from the message, the set's generation is advanced, a CADET
   channel is created and the operation is handed to the set's evaluate
   callback. */
1500 struct ClientState *cs = cls;
/* NOTE(review): op is allocated before the cs->set check further down;
   confirm that the early-exit path releases it. */
1501 struct Operation *op = GNUNET_new (struct Operation);
/* Handlers for p2p messages of both union and intersection operations;
   presumably passed to GNUNET_CADET_channel_create below (its argument
   lines are not all visible). */
1502 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1503 GNUNET_MQ_hd_var_size (incoming_msg,
1504 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1505 struct OperationRequestMessage,
1507 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1508 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1511 GNUNET_MQ_hd_var_size (union_p2p_elements,
1512 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1513 struct GNUNET_SET_ElementMessage,
1515 GNUNET_MQ_hd_var_size (union_p2p_offer,
1516 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1517 struct GNUNET_MessageHeader,
1519 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1520 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1521 struct InquiryMessage,
1523 GNUNET_MQ_hd_var_size (union_p2p_demand,
1524 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1525 struct GNUNET_MessageHeader,
1527 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1528 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1529 struct GNUNET_MessageHeader,
1531 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1532 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1533 struct GNUNET_MessageHeader,
1535 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1536 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1537 struct GNUNET_MessageHeader,
1539 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1540 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1541 struct GNUNET_MessageHeader,
/* The SE and SEC message types share the strata-estimator handler. */
1543 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1544 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1545 struct StrataEstimatorMessage,
1547 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1548 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1549 struct StrataEstimatorMessage,
1551 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1552 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1553 struct GNUNET_SET_ElementMessage,
1555 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1556 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1557 struct IntersectionElementInfoMessage,
1559 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1560 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1563 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1564 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1565 struct IntersectionDoneMessage,
1567 GNUNET_MQ_handler_end ()
1570 const struct GNUNET_MessageHeader *context;
/* A client that has no set cannot evaluate; it is dropped. */
1572 if (NULL == (set = cs->set))
1576 GNUNET_SERVICE_client_drop (cs->client);
/* Random salt of nonce quality for this operation. */
1579 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
/* result_mode and request_id are converted with ntohl(); the remaining
   fields are copied from the message unchanged. */
1581 op->peer = msg->target_peer;
1582 op->result_mode = ntohl (msg->result_mode);
1583 op->client_request_id = ntohl (msg->request_id);
1584 op->byzantine = msg->byzantine;
1585 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1586 op->force_full = msg->force_full;
1587 op->force_delta = msg->force_delta;
/* Nested message carried inside the evaluate message. */
1588 context = GNUNET_MQ_extract_nested_mh (msg);
1590 /* Advance generation values, so that
1591 mutations won't interfere with the running operation. */
/* generation_created records the generation before it is advanced. */
1593 op->generation_created = set->current_generation;
1594 advance_generation (set);
1595 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1599 "Creating new CADET channel to port %s for set operation type %u\n",
1600 GNUNET_h2s (&msg->app_id),
1602 op->channel = GNUNET_CADET_channel_create (cadet,
1606 GNUNET_CADET_OPTION_RELIABLE,
/* The operation's message queue is the one belonging to the channel. */
1610 op->mq = GNUNET_CADET_get_mq (op->channel);
1611 op->state = set->vt->evaluate (op,
/* A NULL state from the evaluate callback leads to the client being
   dropped. */
1613 if (NULL == op->state)
1616 GNUNET_SERVICE_client_drop (cs->client);
1619 GNUNET_SERVICE_client_continue (cs->client);
1624 * Handle an ack from a client, and send the next element. Note
1625 * that we only expect acks for set elements, not after the
1626 * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1628 * @param cls client the client
1629 * @param ack the message
1632 handle_client_iter_ack (void *cls,
1633 const struct GNUNET_SET_IterAckMessage *ack)
/* Continues or finishes an ongoing iteration over the client's set,
   depending on the send_more field of the ack.  Clients without a set, or
   without an active iterator, are dropped. */
1635 struct ClientState *cs = cls;
1638 if (NULL == (set = cs->set))
1640 /* client without a set acknowledged receiving a value */
1642 GNUNET_SERVICE_client_drop (cs->client);
1645 if (NULL == set->iter)
1647 /* client sent an ack, but we were not expecting one (as
1648 set iteration has finished) */
1650 GNUNET_SERVICE_client_drop (cs->client);
1653 GNUNET_SERVICE_client_continue (cs->client);
/* Non-zero send_more (after ntohl()): the next element is sent. */
1654 if (ntohl (ack->send_more))
1656 send_client_element (set);
/* Presumably the else branch (the branch keyword is not visible): the
   iterator is destroyed and iteration_id is incremented. */
1660 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1662 set->iteration_id++;
1668 * Handle a request from the client to copy a set.
1670 * @param cls the client
1671 * @param mh the message
1674 handle_client_copy_lazy_prepare (void *cls,
1675 const struct GNUNET_MessageHeader *mh)
/* Records a LazyCopyRequest for the client's set under a fresh cookie and
   sends that cookie back in a SET_COPY_LAZY_RESPONSE message. */
1677 struct ClientState *cs = cls;
1679 struct LazyCopyRequest *cr;
1680 struct GNUNET_MQ_Envelope *ev;
1681 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1683 if (NULL == (set = cs->set))
1685 /* client without a set requested an operation */
1687 GNUNET_SERVICE_client_drop (cs->client);
1690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1691 "Client requested creation of lazy copy\n");
1692 cr = GNUNET_new (struct LazyCopyRequest);
/* Pre-increment: the counter is advanced first and the new value becomes
   the cookie. */
1693 cr->cookie = ++lazy_copy_cookie;
1694 cr->source_set = set;
1695 GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1698 ev = GNUNET_MQ_msg (resp_msg,
1699 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
/* NOTE(review): the cookie is placed in the response without byte-order
   conversion; handle_client_copy_lazy_connect likewise compares
   msg->cookie to cr->cookie without conversion. */
1700 resp_msg->cookie = cr->cookie;
/* The reply goes out on the message queue reached through set->cs. */
1701 GNUNET_MQ_send (set->cs->mq,
1703 GNUNET_SERVICE_client_continue (cs->client);
1708 * Handle a request from the client to connect to a copy of a set.
1710 * @param cls the client
1711 * @param msg the message
1714 handle_client_copy_lazy_connect (void *cls,
1715 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
/* Gives this client a new set that is a copy of the source set recorded
   under msg->cookie: the operation-specific state is duplicated through
   the vtable's copy_state, while the content is shared and its refcount
   incremented. */
1717 struct ClientState *cs = cls;
1718 struct LazyCopyRequest *cr;
1722 if (NULL != cs->set)
1724 /* There can only be one set per client */
1726 GNUNET_SERVICE_client_drop (cs->client);
/* Linear search of the pending lazy-copy requests for the cookie. */
1730 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1732 if (cr->cookie == msg->cookie)
1738 if (GNUNET_NO == found)
1740 /* client asked for copy with cookie we don't know */
1742 GNUNET_SERVICE_client_drop (cs->client);
/* The matched request is unlinked from the list. */
1745 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1748 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1749 "Client %p requested use of lazy copy\n",
1751 set = GNUNET_new (struct Set);
/* The vtable is chosen by the source set's operation type. */
1752 switch (cr->source_set->operation)
1754 case GNUNET_SET_OPERATION_INTERSECTION:
1755 set->vt = _GSS_intersection_vt ();
1757 case GNUNET_SET_OPERATION_UNION:
1758 set->vt = _GSS_union_vt ();
1765 if (NULL == set->vt->copy_state)
1767 /* Lazy copy not supported for this set operation */
1771 GNUNET_SERVICE_client_drop (cs->client);
1775 set->operation = cr->source_set->operation;
1776 set->state = set->vt->copy_state (cr->source_set->state);
/* The content pointer is shared with the source set, not duplicated. */
1777 set->content = cr->source_set->content;
1778 set->content->refcount++;
/* The generation and the excluded ranges are taken over from the source;
   the ranges array is duplicated with GNUNET_memdup. */
1780 set->current_generation = cr->source_set->current_generation;
1781 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1782 set->excluded_generations
1783 = GNUNET_memdup (cr->source_set->excluded_generations,
1784 set->excluded_generations_size * sizeof (struct GenerationRange));
1786 /* Advance the generation of the new set, so that mutations to the
1787 cloned set and the source set are independent. */
1788 advance_generation (set);
1792 GNUNET_SERVICE_client_continue (cs->client);
1797 * Handle a request from the client to cancel a running set operation.
1799 * @param cls the client
1800 * @param msg the message
1803 handle_client_cancel (void *cls,
1804 const struct GNUNET_SET_CancelMessage *msg)
/* Cancels the operation of the client's set whose client_request_id
   matches the request id in the message by destroying it; an id that
   matches no operation is only logged.  The client is allowed to continue
   afterwards. */
1806 struct ClientState *cs = cls;
1808 struct Operation *op;
1811 if (NULL == (set = cs->set))
1813 /* client without a set requested an operation */
1815 GNUNET_SERVICE_client_drop (cs->client);
/* Linear scan of the set's operation list; the request id from the
   message is converted with ntohl() for the comparison. */
1819 for (op = set->ops_head; NULL != op; op = op->next)
1821 if (op->client_request_id == ntohl (msg->request_id))
1827 if (GNUNET_NO == found)
1829 /* It may happen that the operation was already destroyed due to
1830 * the other peer disconnecting. The client may not know about this
1831 * yet and try to cancel the (just barely non-existent) operation.
1832 * So this is not a hard error.
1834 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1835 "Client canceled non-existent op %u\n",
1836 (uint32_t) ntohl (msg->request_id));
1840 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1841 "Client requested cancel for op %u\n",
1842 (uint32_t) ntohl (msg->request_id));
1843 _GSS_operation_destroy (op,
1846 GNUNET_SERVICE_client_continue (cs->client);
1851 * Handle a request from the client to accept a set operation that
1852 * came from a remote peer. We forward the accept to the associated
1853 * operation for handling
1855 * @param cls the client
1856 * @param msg the message
1859 handle_client_accept (void *cls,
1860 const struct GNUNET_SET_AcceptMessage *msg)
1862 struct ClientState *cs = cls;
1864 struct Operation *op;
1865 struct GNUNET_SET_ResultMessage *result_message;
1866 struct GNUNET_MQ_Envelope *ev;
1867 struct Listener *listener;
1869 if (NULL == (set = cs->set))
1871 /* client without a set requested to accept */
1873 GNUNET_SERVICE_client_drop (cs->client);
/* Overall flow from here: resolve the incoming operation the client
   refers to, move it from its listener's list to the set's list,
   configure it from the accept message and hand it to the set's accept
   callback. */
1876 op = get_incoming (ntohl (msg->accept_reject_id));
1879 /* It is not an error if the set op does not exist -- it may
1880 * have been destroyed when the partner peer disconnected. */
1881 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1882 "Client %p accepted request %u of listener %p that is no longer active\n",
1884 ntohl (msg->accept_reject_id),
/* The client gets a SET_RESULT message with GNUNET_SET_STATUS_FAILURE for
   this request and is then allowed to continue.  request_id is copied
   from the message without conversion, while the status goes through
   htons(). */
1886 ev = GNUNET_MQ_msg (result_message,
1887 GNUNET_MESSAGE_TYPE_SET_RESULT);
1888 result_message->request_id = msg->request_id;
1889 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1890 GNUNET_MQ_send (set->cs->mq,
1892 GNUNET_SERVICE_client_continue (cs->client);
1895 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1896 "Client accepting request %u\n",
1897 (uint32_t) ntohl (msg->accept_reject_id));
/* The listener is remembered locally because op->listener is cleared
   before the operation is unlinked from the listener's list. */
1898 listener = op->listener;
1899 op->listener = NULL;
1900 GNUNET_CONTAINER_DLL_remove (listener->op_head,
1904 GNUNET_CONTAINER_DLL_insert (set->ops_head,
/* request_id and result_mode are converted with ntohl(); the remaining
   fields are copied from the message unchanged. */
1907 op->client_request_id = ntohl (msg->request_id);
1908 op->result_mode = ntohl (msg->result_mode);
1909 op->byzantine = msg->byzantine;
1910 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1911 op->force_full = msg->force_full;
1912 op->force_delta = msg->force_delta;
1914 /* Advance generation values, so that future mutations do not
1915 interfere with the running operation. */
1916 op->generation_created = set->current_generation;
1917 advance_generation (set);
/* The operation must not have a state yet; the accept callback creates
   it, and a NULL result leads to the client being dropped. */
1918 GNUNET_assert (NULL == op->state);
1919 op->state = set->vt->accept (op);
1920 if (NULL == op->state)
1923 GNUNET_SERVICE_client_drop (cs->client);
1926 /* Now allow CADET to continue, as we did not do this in
1927 #handle_incoming_msg (as we wanted to first see if the
1928 local client would accept the request). */
1929 GNUNET_CADET_receive_done (op->channel);
1930 GNUNET_SERVICE_client_continue (cs->client);
1935 * Called to clean up, after a shutdown has been requested.
1937 * @param cls closure, NULL
1940 shutdown_task (void *cls)
/* Marks the service as shutting down and releases the CADET and
   statistics handles.  NOTE(review): which of the statements below are
   guarded by the num_clients check is not visible here -- confirm. */
1942 /* Delay actual shutdown to allow service to disconnect clients */
1943 in_shutdown = GNUNET_YES;
/* The client count is consulted before tearing down. */
1944 if (0 == num_clients)
1948 GNUNET_CADET_disconnect (cadet);
1952 GNUNET_STATISTICS_destroy (_GSS_statistics,
1954 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1955 "handled shutdown request\n");
1960 * Function called by the service's run
1961 * method to run service-specific setup code.
1963 * @param cls closure
1964 * @param cfg configuration to use
1965 * @param service the initialized service
1969 const struct GNUNET_CONFIGURATION_Handle *cfg,
1970 struct GNUNET_SERVICE_Handle *service)
/* Service start-up: registers shutdown_task with the scheduler, creates
   the "set" statistics handle and connects to CADET using cfg. */
1972 /* FIXME: need to modify SERVICE (!) API to allow
1973 us to run a shutdown task *after* clients were
1974 forcefully disconnected! */
1975 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1977 _GSS_statistics = GNUNET_STATISTICS_create ("set",
1979 cadet = GNUNET_CADET_connect (cfg);
/* A failed CADET connection is logged as an error and a scheduler
   shutdown is requested. */
1982 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1983 _("Could not connect to CADET service\n"));
1984 GNUNET_SCHEDULER_shutdown ();
1991 * Define "main" method using service macro.
1995 GNUNET_SERVICE_OPTION_NONE,
1998 &client_disconnect_cb,
/* Table of client message handlers.  Each entry names the handler, the
   message type and the message struct; fixed-size and variable-size
   entries are both used, and the table is closed by
   GNUNET_MQ_handler_end. */
2000 GNUNET_MQ_hd_fixed_size (client_accept,
2001 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2002 struct GNUNET_SET_AcceptMessage,
2004 GNUNET_MQ_hd_fixed_size (client_iter_ack,
2005 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2006 struct GNUNET_SET_IterAckMessage,
/* SET_ADD shares the client_mutation handler with SET_REMOVE below. */
2008 GNUNET_MQ_hd_var_size (client_mutation,
2009 GNUNET_MESSAGE_TYPE_SET_ADD,
2010 struct GNUNET_SET_ElementMessage,
2012 GNUNET_MQ_hd_fixed_size (client_create_set,
2013 GNUNET_MESSAGE_TYPE_SET_CREATE,
2014 struct GNUNET_SET_CreateMessage,
2016 GNUNET_MQ_hd_fixed_size (client_iterate,
2017 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2018 struct GNUNET_MessageHeader,
2020 GNUNET_MQ_hd_var_size (client_evaluate,
2021 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2022 struct GNUNET_SET_EvaluateMessage,
2024 GNUNET_MQ_hd_fixed_size (client_listen,
2025 GNUNET_MESSAGE_TYPE_SET_LISTEN,
2026 struct GNUNET_SET_ListenMessage,
2028 GNUNET_MQ_hd_fixed_size (client_reject,
2029 GNUNET_MESSAGE_TYPE_SET_REJECT,
2030 struct GNUNET_SET_RejectMessage,
/* Same handler as for SET_ADD above. */
2032 GNUNET_MQ_hd_var_size (client_mutation,
2033 GNUNET_MESSAGE_TYPE_SET_REMOVE,
2034 struct GNUNET_SET_ElementMessage,
2036 GNUNET_MQ_hd_fixed_size (client_cancel,
2037 GNUNET_MESSAGE_TYPE_SET_CANCEL,
2038 struct GNUNET_SET_CancelMessage,
2040 GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2041 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2042 struct GNUNET_MessageHeader,
2044 GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2045 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2046 struct GNUNET_SET_CopyLazyConnectMessage,
2048 GNUNET_MQ_handler_end ());
2051 /* end of gnunet-service-set.c */