2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 * @file set/gnunet-service-set.c
20 * @brief two-peer set operations
21 * @author Florian Dold
22 * @author Christian Grothoff
24 #include "gnunet-service-set.h"
25 #include "gnunet-service-set_union.h"
26 #include "gnunet-service-set_intersection.h"
27 #include "gnunet-service-set_protocol.h"
28 #include "gnunet_statistics_service.h"
31 * How long do we hold on to an incoming channel if there is
32 * no local listener before giving up?
34 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
38 * Lazy copy requests made by a client.
40 struct LazyCopyRequest
45 struct LazyCopyRequest *prev;
50 struct LazyCopyRequest *next;
53 * Which set are we supposed to copy?
55 struct Set *source_set;
58 * Cookie identifying the request.
66 * A listener is inhabited by a client, and waits for evaluation
67 * requests from remote peers.
72 * Listeners are held in a doubly linked list.
74 struct Listener *next;
77 * Listeners are held in a doubly linked list.
79 struct Listener *prev;
82 * Head of DLL of operations this listener is responsible for.
83 * Once the client has accepted/declined the operation, the
84 * operation is moved to the respective set's operation DLLs.
86 struct Operation *op_head;
89 * Tail of DLL of operations this listener is responsible for.
90 * Once the client has accepted/declined the operation, the
91 * operation is moved to the respective set's operation DLLs.
93 struct Operation *op_tail;
96 * Client that owns the listener.
97 * Only one client may own a listener.
99 struct ClientState *cs;
102 * The port we are listening on with CADET.
104 struct GNUNET_CADET_Port *open_port;
107 * Application ID for the operation, used to distinguish
108 * multiple operations of the same type with the same peer.
110 struct GNUNET_HashCode app_id;
113 * The type of the operation.
115 enum GNUNET_SET_OperationType operation;
120 * Handle to the cadet service, used to listen for and connect to
123 static struct GNUNET_CADET_Handle *cadet;
126 * Head of the DLL of lazy copy requests by this client.
128 static struct LazyCopyRequest *lazy_copy_head;
131 * Tail of the DLL of lazy copy requests by this client.
133 static struct LazyCopyRequest *lazy_copy_tail;
136 * Generator for unique cookie we set per lazy copy request.
138 static uint32_t lazy_copy_cookie;
143 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
146 * Head of the doubly linked list of listeners.
148 static struct Listener *listener_head;
151 * Tail of the doubly linked list of listeners.
153 static struct Listener *listener_tail;
156 * Number of active clients.
158 static unsigned int num_clients;
161 * Are we in shutdown? If #GNUNET_YES and the number of clients
162 * drops to zero, disconnect from CADET.
164 static int in_shutdown;
167 * Counter for allocating unique IDs for clients, used to identify
168 * incoming operation requests from remote peers, that the client can
169 * choose to accept or refuse. 0 must not be used (reserved for
172 static uint32_t suggest_id;
176 * Get the incoming socket associated with the given id.
178 * Searches the pending operations of all listeners.
179 * @param id id to look for
180 * @return the incoming socket associated with the id,
181 * or NULL if there is none
183 static struct Operation *
184 get_incoming (uint32_t id)
186 for (struct Listener *listener = listener_head;
188 listener = listener->next)
190 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
191 if (op->suggest_id == id)
199 * Destroy an incoming request from a remote peer
201 * @param op remote request to destroy
204 incoming_destroy (struct Operation *op)
206 struct Listener *listener;
207 struct GNUNET_CADET_Channel *channel;
209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
210 "Destroying incoming operation %p\n",
212 if (NULL != (listener = op->listener))
214 GNUNET_CONTAINER_DLL_remove (listener->op_head,
219 if (NULL != op->timeout_task)
221 GNUNET_SCHEDULER_cancel (op->timeout_task);
222 op->timeout_task = NULL;
224 _GSS_operation_destroy2 (op);
229 * Context for the #garbage_collect_cb().
231 struct GarbageContext
235 * Map for which we are garbage collecting removed elements.
237 struct GNUNET_CONTAINER_MultiHashMap *map;
240 * Lowest generation for which an operation is still pending.
242 unsigned int min_op_generation;
245 * Largest generation for which an operation is still pending.
247 unsigned int max_op_generation;
253 * Function invoked to check if an element can be removed from
254 * the set's history because it is no longer needed.
256 * @param cls the `struct GarbageContext *`
257 * @param key key of the element in the map
258 * @param value the `struct ElementEntry *`
259 * @return #GNUNET_OK (continue to iterate)
262 garbage_collect_cb (void *cls,
263 const struct GNUNET_HashCode *key,
266 //struct GarbageContext *gc = cls;
267 //struct ElementEntry *ee = value;
269 //if (GNUNET_YES != ee->removed)
271 //if ( (gc->max_op_generation < ee->generation_added) ||
272 // (ee->generation_removed > gc->min_op_generation) )
274 // GNUNET_assert (GNUNET_YES ==
275 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
285 * Collect and destroy elements that are not needed anymore, because
286 * their lifetime (as determined by their generation) does not overlap
287 * with any active set operation.
289 * @param set set to garbage collect
292 collect_generation_garbage (struct Set *set)
294 struct GarbageContext gc;
296 gc.min_op_generation = UINT_MAX;
297 gc.max_op_generation = 0;
298 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
300 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
301 op->generation_created);
302 gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
303 op->generation_created);
305 gc.map = set->content->elements;
306 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
313 * Is @a generation in the range of exclusions?
315 * @param generation generation to query
316 * @param excluded array of generations where the element is excluded
317 * @param excluded_size length of the @a excluded array
318 * @return #GNUNET_YES if @a generation is in any of the ranges
321 is_excluded_generation (unsigned int generation,
322 struct GenerationRange *excluded,
323 unsigned int excluded_size)
325 for (unsigned int i = 0; i < excluded_size; i++)
326 if ( (generation >= excluded[i].start) &&
327 (generation < excluded[i].end) )
334 * Is element @a ee part of the set during @a query_generation?
336 * @param ee element to test
337 * @param query_generation generation to query
338 * @param excluded array of generations where the element is excluded
339 * @param excluded_size length of the @a excluded array
340 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
343 is_element_of_generation (struct ElementEntry *ee,
344 unsigned int query_generation,
345 struct GenerationRange *excluded,
346 unsigned int excluded_size)
348 struct MutationEvent *mut;
351 GNUNET_assert (NULL != ee->mutations);
353 is_excluded_generation (query_generation,
361 is_present = GNUNET_NO;
363 /* Could be made faster with binary search, but lists
364 are small, so why bother. */
365 for (unsigned int i = 0; i < ee->mutations_size; i++)
367 mut = &ee->mutations[i];
369 if (mut->generation > query_generation)
371 /* The mutation doesn't apply to our generation
372 anymore. We can't break here, since mutations aren't
373 sorted by generation. */
378 is_excluded_generation (mut->generation,
382 /* The generation is excluded (because it belongs to another
383 fork via a lazy copy) and thus mutations aren't considered
384 for membership testing. */
388 /* This would be an inconsistency in how we manage mutations. */
389 if ( (GNUNET_YES == is_present) &&
390 (GNUNET_YES == mut->added) )
393 if ( (GNUNET_NO == is_present) &&
394 (GNUNET_NO == mut->added) )
397 is_present = mut->added;
405 * Is element @a ee part of the set used by @a op?
407 * @param ee element to test
408 * @param op operation that defines the set and its generation
409 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
412 _GSS_is_element_of_operation (struct ElementEntry *ee,
413 struct Operation *op)
415 return is_element_of_generation (ee,
416 op->generation_created,
417 op->set->excluded_generations,
418 op->set->excluded_generations_size);
423 * Destroy the given operation. Used for any operation where both
424 * peers were known and that thus actually had a vt and channel. Must
425 * not be used for operations where 'listener' is still set and we do
426 * not know the other peer.
428 * Call the implementation-specific cancel function of the operation.
429 * Disconnects from the remote peer. Does not disconnect the client,
430 * as there may be multiple operations per set.
432 * @param op operation to destroy
433 * @param gc #GNUNET_YES to perform garbage collection on the set
436 _GSS_operation_destroy (struct Operation *op,
439 struct Set *set = op->set;
440 struct GNUNET_CADET_Channel *channel;
442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443 "Destroying operation %p\n",
445 GNUNET_assert (NULL == op->listener);
446 if (NULL != op->state)
448 set->vt->cancel (op);
453 GNUNET_CONTAINER_DLL_remove (set->ops_head,
458 if (NULL != op->context_msg)
460 GNUNET_free (op->context_msg);
461 op->context_msg = NULL;
463 if (NULL != (channel = op->channel))
465 /* This will free op; called conditionally as this helper function
466 is also called from within the channel disconnect handler. */
468 GNUNET_CADET_channel_destroy (channel);
470 if ( (NULL != set) &&
472 collect_generation_garbage (set);
473 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
474 * there was a channel end handler that will free 'op' on the call stack. */
479 * Callback called when a client connects to the service.
481 * @param cls closure for the service
482 * @param c the new client that connected to the service
483 * @param mq the message queue used to send messages to the client
484 * @return @a `struct ClientState`
487 client_connect_cb (void *cls,
488 struct GNUNET_SERVICE_Client *c,
489 struct GNUNET_MQ_Handle *mq)
491 struct ClientState *cs;
494 cs = GNUNET_new (struct ClientState);
502 * Iterator over hash map entries to free element entries.
505 * @param key current key code
506 * @param value a `struct ElementEntry *` to be free'd
507 * @return #GNUNET_YES (continue to iterate)
510 destroy_elements_iterator (void *cls,
511 const struct GNUNET_HashCode *key,
514 struct ElementEntry *ee = value;
516 GNUNET_free_non_null (ee->mutations);
523 * Clean up after a client has disconnected
525 * @param cls closure, unused
526 * @param client the client to clean up after
527 * @param internal_cls the `struct ClientState`
530 client_disconnect_cb (void *cls,
531 struct GNUNET_SERVICE_Client *client,
534 struct ClientState *cs = internal_cls;
535 struct Operation *op;
536 struct Listener *listener;
539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
540 "Client disconnected, cleaning up\n");
541 if (NULL != (set = cs->set))
543 struct SetContent *content = set->content;
544 struct PendingMutation *pm;
545 struct PendingMutation *pm_current;
546 struct LazyCopyRequest *lcr;
548 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
549 "Destroying client's set\n");
550 /* Destroy pending set operations */
551 while (NULL != set->ops_head)
552 _GSS_operation_destroy (set->ops_head,
555 /* Destroy operation-specific state */
556 GNUNET_assert (NULL != set->state);
557 set->vt->destroy_set (set->state);
560 /* Clean up ongoing iterations */
561 if (NULL != set->iter)
563 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
568 /* discard any pending mutations that reference this set */
569 pm = content->pending_mutations_head;
574 if (pm_current->set == set)
576 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
577 content->pending_mutations_tail,
579 GNUNET_free (pm_current);
583 /* free set content (or at least decrement RC) */
585 GNUNET_assert (0 != content->refcount);
587 if (0 == content->refcount)
589 GNUNET_assert (NULL != content->elements);
590 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
591 &destroy_elements_iterator,
593 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
594 content->elements = NULL;
595 GNUNET_free (content);
597 GNUNET_free_non_null (set->excluded_generations);
598 set->excluded_generations = NULL;
600 /* remove set from pending copy requests */
601 lcr = lazy_copy_head;
604 struct LazyCopyRequest *lcr_current = lcr;
607 if (lcr_current->source_set == set)
609 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
612 GNUNET_free (lcr_current);
618 if (NULL != (listener = cs->listener))
620 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
621 "Destroying client's listener\n");
622 GNUNET_CADET_close_port (listener->open_port);
623 listener->open_port = NULL;
624 while (NULL != (op = listener->op_head))
626 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
627 "Destroying incoming operation `%u' from peer `%s'\n",
628 (unsigned int) op->client_request_id,
629 GNUNET_i2s (&op->peer));
630 incoming_destroy (op);
632 GNUNET_CONTAINER_DLL_remove (listener_head,
635 GNUNET_free (listener);
639 if ( (GNUNET_YES == in_shutdown) &&
644 GNUNET_CADET_disconnect (cadet);
652 * Check a request for a set operation from another peer.
654 * @param cls the operation state
655 * @param msg the received message
656 * @return #GNUNET_OK if the channel should be kept alive,
657 * #GNUNET_SYSERR to destroy the channel
660 check_incoming_msg (void *cls,
661 const struct OperationRequestMessage *msg)
663 struct Operation *op = cls;
664 struct Listener *listener = op->listener;
665 const struct GNUNET_MessageHeader *nested_context;
667 /* double operation request */
668 if (0 != op->suggest_id)
671 return GNUNET_SYSERR;
673 /* This should be equivalent to the previous condition, but can't hurt to check twice */
674 if (NULL == op->listener)
677 return GNUNET_SYSERR;
679 if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
682 return GNUNET_SYSERR;
684 nested_context = GNUNET_MQ_extract_nested_mh (msg);
685 if ( (NULL != nested_context) &&
686 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
689 return GNUNET_SYSERR;
696 * Handle a request for a set operation from another peer. Checks if we
697 * have a listener waiting for such a request (and in that case initiates
698 * asking the listener about accepting the connection). If no listener
699 * is waiting, we queue the operation request in hope that a listener
700 * shows up soon (before timeout).
702 * This msg is expected as the first and only msg handled through the
703 * non-operation bound virtual table, acceptance of this operation replaces
704 * our virtual table and subsequent msgs would be routed differently (as
705 * we then know what type of operation this is).
707 * @param cls the operation state
708 * @param msg the received message
709 * @return #GNUNET_OK if the channel should be kept alive,
710 * #GNUNET_SYSERR to destroy the channel
713 handle_incoming_msg (void *cls,
714 const struct OperationRequestMessage *msg)
716 struct Operation *op = cls;
717 struct Listener *listener = op->listener;
718 const struct GNUNET_MessageHeader *nested_context;
719 struct GNUNET_MQ_Envelope *env;
720 struct GNUNET_SET_RequestMessage *cmsg;
722 nested_context = GNUNET_MQ_extract_nested_mh (msg);
723 /* Make a copy of the nested_context (application-specific context
724 information that is opaque to set) so we can pass it to the
726 if (NULL != nested_context)
727 op->context_msg = GNUNET_copy_message (nested_context);
728 op->remote_element_count = ntohl (msg->element_count);
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "Received P2P operation request (op %u, port %s) for active listener\n",
731 (uint32_t) ntohl (msg->operation),
732 GNUNET_h2s (&op->listener->app_id));
733 GNUNET_assert (0 == op->suggest_id);
736 op->suggest_id = suggest_id++;
737 GNUNET_assert (NULL != op->timeout_task);
738 GNUNET_SCHEDULER_cancel (op->timeout_task);
739 op->timeout_task = NULL;
740 env = GNUNET_MQ_msg_nested_mh (cmsg,
741 GNUNET_MESSAGE_TYPE_SET_REQUEST,
743 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
748 cmsg->accept_id = htonl (op->suggest_id);
749 cmsg->peer_id = op->peer;
750 GNUNET_MQ_send (listener->cs->mq,
752 /* NOTE: GNUNET_CADET_receive_done() will be called in
753 #handle_client_accept() */
758 * Add an element to @a set as specified by @a msg
760 * @param set set to manipulate
761 * @param msg message specifying the change
764 execute_add (struct Set *set,
765 const struct GNUNET_SET_ElementMessage *msg)
767 struct GNUNET_SET_Element el;
768 struct ElementEntry *ee;
769 struct GNUNET_HashCode hash;
771 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
772 el.size = ntohs (msg->header.size) - sizeof (*msg);
774 el.element_type = ntohs (msg->element_type);
775 GNUNET_SET_element_hash (&el,
777 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782 "Client inserts element %s of size %u\n",
785 ee = GNUNET_malloc (el.size + sizeof (*ee));
786 ee->element.size = el.size;
787 GNUNET_memcpy (&ee[1],
790 ee->element.data = &ee[1];
791 ee->element.element_type = el.element_type;
792 ee->remote = GNUNET_NO;
793 ee->mutations = NULL;
794 ee->mutations_size = 0;
795 ee->element_hash = hash;
796 GNUNET_break (GNUNET_YES ==
797 GNUNET_CONTAINER_multihashmap_put (set->content->elements,
800 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
802 else if (GNUNET_YES ==
803 is_element_of_generation (ee,
804 set->current_generation,
805 set->excluded_generations,
806 set->excluded_generations_size))
808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809 "Client inserted element %s of size %u twice (ignored)\n",
813 /* same element inserted twice */
818 struct MutationEvent mut = {
819 .generation = set->current_generation,
822 GNUNET_array_append (ee->mutations,
826 set->vt->add (set->state,
832 * Remove an element from @a set as specified by @a msg
834 * @param set set to manipulate
835 * @param msg message specifying the change
838 execute_remove (struct Set *set,
839 const struct GNUNET_SET_ElementMessage *msg)
841 struct GNUNET_SET_Element el;
842 struct ElementEntry *ee;
843 struct GNUNET_HashCode hash;
845 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
846 el.size = ntohs (msg->header.size) - sizeof (*msg);
848 el.element_type = ntohs (msg->element_type);
849 GNUNET_SET_element_hash (&el, &hash);
850 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
854 /* Client tried to remove non-existing element. */
855 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856 "Client removes non-existing element of size %u\n",
861 is_element_of_generation (ee,
862 set->current_generation,
863 set->excluded_generations,
864 set->excluded_generations_size))
866 /* Client tried to remove element twice */
867 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868 "Client removed element of size %u twice (ignored)\n",
874 struct MutationEvent mut = {
875 .generation = set->current_generation,
879 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880 "Client removes element of size %u\n",
883 GNUNET_array_append (ee->mutations,
887 set->vt->remove (set->state,
893 * Perform a mutation on a set as specified by the @a msg
895 * @param set the set to mutate
896 * @param msg specification of what to change
899 execute_mutation (struct Set *set,
900 const struct GNUNET_SET_ElementMessage *msg)
902 switch (ntohs (msg->header.type))
904 case GNUNET_MESSAGE_TYPE_SET_ADD:
905 execute_add (set, msg);
907 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
908 execute_remove (set, msg);
917 * Execute mutations that were delayed on a set because of
918 * pending operations.
920 * @param set the set to execute mutations on
923 execute_delayed_mutations (struct Set *set)
925 struct PendingMutation *pm;
927 if (0 != set->content->iterator_count)
928 return; /* still cannot do this */
929 while (NULL != (pm = set->content->pending_mutations_head))
931 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
932 set->content->pending_mutations_tail,
934 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935 "Executing pending mutation on %p.\n",
937 execute_mutation (pm->set,
939 GNUNET_free (pm->msg);
946 * Send the next element of a set to the set's client. The next element is given by
947 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
948 * are no more elements in the set. The caller must ensure that the set's iterator is
951 * The client will acknowledge each received element with a
952 * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
953 * #handle_client_iter_ack() will then trigger the next transmission.
954 * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
956 * @param set set that should send its next element to its client
959 send_client_element (struct Set *set)
962 struct ElementEntry *ee;
963 struct GNUNET_MQ_Envelope *ev;
964 struct GNUNET_SET_IterResponseMessage *msg;
966 GNUNET_assert (NULL != set->iter);
968 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
970 (const void **) &ee);
971 if (GNUNET_NO == ret)
973 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974 "Iteration on %p done.\n",
976 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
977 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
980 GNUNET_assert (set->content->iterator_count > 0);
981 set->content->iterator_count--;
982 execute_delayed_mutations (set);
983 GNUNET_MQ_send (set->cs->mq,
987 GNUNET_assert (NULL != ee);
988 } while (GNUNET_NO ==
989 is_element_of_generation (ee,
990 set->iter_generation,
991 set->excluded_generations,
992 set->excluded_generations_size));
993 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994 "Sending iteration element on %p.\n",
996 ev = GNUNET_MQ_msg_extra (msg,
998 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
999 GNUNET_memcpy (&msg[1],
1002 msg->element_type = htons (ee->element.element_type);
1003 msg->iteration_id = htons (set->iteration_id);
1004 GNUNET_MQ_send (set->cs->mq,
1010 * Called when a client wants to iterate the elements of a set.
1011 * Checks if we have a set associated with the client and if we
1012 * can right now start an iteration. If all checks out, starts
1013 * sending the elements of the set to the client.
1015 * @param cls client that sent the message
1016 * @param m message sent by the client
1019 handle_client_iterate (void *cls,
1020 const struct GNUNET_MessageHeader *m)
1022 struct ClientState *cs = cls;
1025 if (NULL == (set = cs->set))
1027 /* attempt to iterate over a non existing set */
1029 GNUNET_SERVICE_client_drop (cs->client);
1032 if (NULL != set->iter)
1034 /* Only one concurrent iterate-action allowed per set */
1036 GNUNET_SERVICE_client_drop (cs->client);
1039 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1040 "Iterating set %p in gen %u with %u content elements\n",
1042 set->current_generation,
1043 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1044 GNUNET_SERVICE_client_continue (cs->client);
1045 set->content->iterator_count++;
1046 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1047 set->iter_generation = set->current_generation;
1048 send_client_element (set);
1053 * Called when a client wants to create a new set. This is typically
1054 * the first request from a client, and includes the type of set
1055 * operation to be performed.
1057 * @param cls client that sent the message
1058 * @param msg message sent by the client
1061 handle_client_create_set (void *cls,
1062 const struct GNUNET_SET_CreateMessage *msg)
1064 struct ClientState *cs = cls;
1067 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1068 "Client created new set (operation %u)\n",
1069 (uint32_t) ntohl (msg->operation));
1070 if (NULL != cs->set)
1072 /* There can only be one set per client */
1074 GNUNET_SERVICE_client_drop (cs->client);
1077 set = GNUNET_new (struct Set);
1078 switch (ntohl (msg->operation))
1080 case GNUNET_SET_OPERATION_INTERSECTION:
1081 set->vt = _GSS_intersection_vt ();
1083 case GNUNET_SET_OPERATION_UNION:
1084 set->vt = _GSS_union_vt ();
1089 GNUNET_SERVICE_client_drop (cs->client);
1092 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1093 set->state = set->vt->create ();
1094 if (NULL == set->state)
1096 /* initialization failed (i.e. out of memory) */
1098 GNUNET_SERVICE_client_drop (cs->client);
1101 set->content = GNUNET_new (struct SetContent);
1102 set->content->refcount = 1;
1103 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1107 GNUNET_SERVICE_client_continue (cs->client);
1112 * Timeout happens iff:
1113 * - we suggested an operation to our listener,
1114 * but did not receive a response in time
1115 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1117 * @param cls channel context (the `struct Operation *` that timed out)
1121 incoming_timeout_cb (void *cls)
1123 struct Operation *op = cls;
1125 op->timeout_task = NULL;
1126 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127 "Remote peer's incoming request timed out\n");
1128 incoming_destroy (op);
1133 * Method called whenever another peer has added us to a channel the
1134 * other peer initiated. Only called (once) upon reception of data
1135 * from a channel we listen on.
1137 * The channel context represents the operation itself and gets added
1138 * to a DLL, from where it gets looked up when our local listener
1139 * client responds to a proposed/suggested operation or connects and
1140 * associates with this operation.
1142 * @param cls closure
1143 * @param channel new handle to the channel
1144 * @param source peer that started the channel
1145 * @return initial channel context for the channel
1146 * returns NULL on error
1149 channel_new_cb (void *cls,
1150 struct GNUNET_CADET_Channel *channel,
1151 const struct GNUNET_PeerIdentity *source)
1153 struct Listener *listener = cls;
1154 struct Operation *op;
1156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157 "New incoming channel\n");
1158 op = GNUNET_new (struct Operation);
1159 op->listener = listener;
1161 op->channel = channel;
1162 op->mq = GNUNET_CADET_get_mq (op->channel);
1163 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1166 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1167 &incoming_timeout_cb,
1169 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1177 * Function called whenever a channel is destroyed. Should clean up
1178 * any associated state. It must NOT call
1179 * GNUNET_CADET_channel_destroy() on the channel.
1181 * The peer_disconnect function is part of a virtual table set initially either
1182 * when a peer creates a new channel with us, or once we create
1183 * a new channel ourselves (evaluate).
1185 * Once we know the exact type of operation (union/intersection), the vt is
1186 * replaced with an operation specific instance (_GSS_[op]_vt).
1188 * @param channel_ctx place where local state associated
1189 * with the channel is stored
1190 * @param channel connection to the other end (henceforth invalid)
1193 channel_end_cb (void *channel_ctx,
1194 const struct GNUNET_CADET_Channel *channel)
1196 struct Operation *op = channel_ctx;
1199 _GSS_operation_destroy2 (op);
1204 * This function probably should not exist
1205 * and should be replaced by inlining more specific
1206 * logic in the various places where it is called.
1209 _GSS_operation_destroy2 (struct Operation *op)
1211 struct GNUNET_CADET_Channel *channel;
1213 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214 "channel_end_cb called\n");
1215 if (NULL != (channel = op->channel))
1217 /* This will free op; called conditionally as this helper function
1218 is also called from within the channel disconnect handler. */
1220 GNUNET_CADET_channel_destroy (channel);
1222 if (NULL != op->listener)
1223 incoming_destroy (op);
1224 else if (NULL != op->set)
1225 op->set->vt->channel_death (op);
1227 _GSS_operation_destroy (op,
1234 * Function called whenever an MQ-channel's transmission window size changes.
1236 * The first callback in an outgoing channel will be with a non-zero value
1237 * and will mean the channel is connected to the destination.
1239 * For an incoming channel it will be called immediately after the
1240 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1242 * @param cls Channel closure.
1243 * @param channel Connection to the other end (henceforth invalid).
1244 * @param window_size New window size. If there are more messages than buffer size,
1245 * this value will be negative.
1248 channel_window_cb (void *cls,
1249 const struct GNUNET_CADET_Channel *channel,
1252 /* FIXME: not implemented, we could do flow control here... */
1257 * Called when a client wants to create a new listener.
1259 * @param cls client that sent the message
1260 * @param msg message sent by the client
1263 handle_client_listen (void *cls,
1264 const struct GNUNET_SET_ListenMessage *msg)
1266 struct ClientState *cs = cls;
1267 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1268 GNUNET_MQ_hd_var_size (incoming_msg,
1269 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1270 struct OperationRequestMessage,
1272 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1273 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1276 GNUNET_MQ_hd_var_size (union_p2p_elements,
1277 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1278 struct GNUNET_SET_ElementMessage,
1280 GNUNET_MQ_hd_var_size (union_p2p_offer,
1281 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1282 struct GNUNET_MessageHeader,
1284 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1285 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1286 struct InquiryMessage,
1288 GNUNET_MQ_hd_var_size (union_p2p_demand,
1289 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1290 struct GNUNET_MessageHeader,
1292 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1293 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1294 struct GNUNET_MessageHeader,
1296 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1297 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1298 struct GNUNET_MessageHeader,
1300 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1301 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1302 struct GNUNET_MessageHeader,
1304 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1305 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1306 struct GNUNET_MessageHeader,
1308 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1309 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1310 struct StrataEstimatorMessage,
1312 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1313 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1314 struct StrataEstimatorMessage,
1316 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1317 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1318 struct GNUNET_SET_ElementMessage,
1320 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1321 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1322 struct IntersectionElementInfoMessage,
1324 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1325 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1328 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1329 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1330 struct IntersectionDoneMessage,
1332 GNUNET_MQ_handler_end ()
1334 struct Listener *listener;
1336 if (NULL != cs->listener)
1338 /* max. one active listener per client! */
1340 GNUNET_SERVICE_client_drop (cs->client);
1343 listener = GNUNET_new (struct Listener);
1345 cs->listener = listener;
1346 listener->app_id = msg->app_id;
1347 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1348 GNUNET_CONTAINER_DLL_insert (listener_head,
1351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1352 "New listener created (op %u, port %s)\n",
1353 listener->operation,
1354 GNUNET_h2s (&listener->app_id));
1356 = GNUNET_CADET_open_port (cadet,
1363 GNUNET_SERVICE_client_continue (cs->client);
1368 * Called when the listening client rejects an operation
1369 * request by another peer.
1371 * @param cls client that sent the message
1372 * @param msg message sent by the client
/* Handler for a client's reject message: looks up the incoming operation
   named by msg->accept_reject_id (converted from network byte order) and,
   if found, destroys it with _GSS_operation_destroy2().  Both visible paths
   end by calling GNUNET_SERVICE_client_continue() on the client.
   NOTE(review): this listing is missing lines (the embedded numbering jumps,
   e.g. 1381 -> 1384), so the condition guarding the "unknown operation"
   branch and its early exit are not visible -- presumably a NULL check on
   op followed by a return; confirm against the full file. */
1375 handle_client_reject (void *cls,
1376 const struct GNUNET_SET_RejectMessage *msg)
1378 struct ClientState *cs = cls;
1379 struct Operation *op;
1381 op = get_incoming (ntohl (msg->accept_reject_id));
1384 /* no matching incoming operation for this reject;
1385 could be that the other peer already disconnected... */
1386 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1387 "Client rejected unknown operation %u\n",
1388 (unsigned int) ntohl (msg->accept_reject_id));
1389 GNUNET_SERVICE_client_continue (cs->client);
/* Known operation: log the rejection, destroy the operation and resume
   processing messages from this client. */
/* NOTE(review): the operation type is read through op->listener while the
   app id is read through cs->listener; cs->listener is not checked for NULL
   in the visible code.  Presumably both point at the same listener --
   verify, and consider using op->listener for both. */
1392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393 "Peer request (op %u, app %s) rejected by client\n",
1394 op->listener->operation,
1395 GNUNET_h2s (&cs->listener->app_id));
1396 _GSS_operation_destroy2 (op);
1397 GNUNET_SERVICE_client_continue (cs->client);
1402 * Called when a client wants to add or remove an element to a set it inhabits.
1404 * @param cls client that sent the message
1405 * @param msg message sent by the client
/* Validation counterpart of handle_client_mutation; presumably wired in by
   the GNUNET_MQ_hd_var_size (client_mutation, ...) handler entries at the
   end of the file -- TODO confirm.  No validation logic is visible here
   beyond the note below.
   NOTE(review): the return statement is on a line missing from this
   listing. */
1408 check_client_mutation (void *cls,
1409 const struct GNUNET_SET_ElementMessage *msg)
1411 /* NOTE: Technically, we should probably check with the
1412 block library whether the element we are given is well-formed */
1418 * Called when a client wants to add or remove an element to a set it inhabits.
1420 * @param cls client that sent the message
1421 * @param msg message sent by the client
/* Applies a client's element mutation to the client's set, or defers it.
   A client without a set is dropped.  Otherwise the client is told to
   continue right away, and the mutation is either queued as a
   PendingMutation (when set->content->iterator_count is non-zero) or
   passed to execute_mutation().
   NOTE(review): braces, the declaration of `set', early returns, the
   branch separator between "Scheduling" and "Executing", and the trailing
   call arguments are on lines missing from this listing. */
1424 handle_client_mutation (void *cls,
1425 const struct GNUNET_SET_ElementMessage *msg)
1427 struct ClientState *cs = cls;
1430 if (NULL == (set = cs->set))
1432 /* client without a set requested an operation */
1434 GNUNET_SERVICE_client_drop (cs->client);
1437 GNUNET_SERVICE_client_continue (cs->client);
/* A non-zero iterator count means the mutation is not applied now: the
   message is copied (GNUNET_copy_message) and appended to the tail of the
   content's pending-mutation list. */
1439 if (0 != set->content->iterator_count)
1441 struct PendingMutation *pm;
1443 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1444 "Scheduling mutation on set\n");
1445 pm = GNUNET_new (struct PendingMutation);
1446 pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1448 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1449 set->content->pending_mutations_tail,
/* Presumably the alternative branch (no iterator active): apply the
   mutation immediately. */
1453 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1454 "Executing mutation on set\n");
1455 execute_mutation (set,
1461 * Advance the current generation of a set,
1462 * adding exclusion ranges if necessary.
1464 * @param set the set where we want to advance the generation
/* Moves `set' to a fresh generation.
   - If the set is already at the content's latest generation, both
     counters are incremented.
   - Otherwise (asserted: current < latest) a GenerationRange r with
     start = current + 1 and end = latest + 1 is built, both counters are
     set to r.end, and an entry is appended to set->excluded_generations.
   NOTE(review): the early return after the first case and the value
   appended to the array (presumably r) are on lines missing from this
   listing. */
1467 advance_generation (struct Set *set)
1469 struct GenerationRange r;
1471 if (set->current_generation == set->content->latest_generation)
1473 set->content->latest_generation++;
1474 set->current_generation++;
1478 GNUNET_assert (set->current_generation < set->content->latest_generation);
1480 r.start = set->current_generation + 1;
1481 r.end = set->content->latest_generation + 1;
1482 set->content->latest_generation = r.end;
1483 set->current_generation = r.end;
1484 GNUNET_array_append (set->excluded_generations,
1485 set->excluded_generations_size,
1491 * Called when a client wants to initiate a set operation with another
1492 * peer. Initiates the CADET connection to the listener and sends the request.
1495 * @param cls client that sent the message
1496 * @param msg message sent by the client
1497 * @return #GNUNET_OK if the message is well-formed
/* Validation counterpart of handle_client_evaluate.  The only visible
   content is the FIXME below, which states that the check is incomplete.
   NOTE(review): the actual check/return statement is on a line missing
   from this listing. */
1500 check_client_evaluate (void *cls,
1501 const struct GNUNET_SET_EvaluateMessage *msg)
1503 /* FIXME: suboptimal, even if the context below could be NULL,
1504 there are malformed messages this does not check for... */
1510 * Called when a client wants to initiate a set operation with another
1511 * peer. Initiates the CADET connection to the listener and sends the request.
1514 * @param cls client that sent the message
1515 * @param msg message sent by the client
/* Starts a set operation requested by a local client: allocates an
   Operation, fills it from the evaluate message, advances the set's
   generation, links the operation into the set's operation list, creates a
   CADET channel and hands the operation to set->vt->evaluate().
   NOTE(review): many lines are missing from this listing (braces, the
   declaration of `set', early returns, per-handler type/closure arguments
   and the trailing arguments of several calls); comments below describe
   only what is visible. */
1518 handle_client_evaluate (void *cls,
1519 const struct GNUNET_SET_EvaluateMessage *msg)
1521 struct ClientState *cs = cls;
/* The operation is allocated before the client's set is checked; cleanup
   on the "no set" path is not visible in this listing -- TODO confirm it
   is released there. */
1522 struct Operation *op = GNUNET_new (struct Operation);
/* Handler table for the P2P message types of both the union and the
   intersection protocol; presumably passed to
   GNUNET_CADET_channel_create() below (its trailing arguments are not
   visible). */
1523 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1524 GNUNET_MQ_hd_var_size (incoming_msg,
1525 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1526 struct OperationRequestMessage,
1528 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1529 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1532 GNUNET_MQ_hd_var_size (union_p2p_elements,
1533 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1534 struct GNUNET_SET_ElementMessage,
1536 GNUNET_MQ_hd_var_size (union_p2p_offer,
1537 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1538 struct GNUNET_MessageHeader,
1540 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1541 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1542 struct InquiryMessage,
1544 GNUNET_MQ_hd_var_size (union_p2p_demand,
1545 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1546 struct GNUNET_MessageHeader,
1548 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1549 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1550 struct GNUNET_MessageHeader,
1552 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1553 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1554 struct GNUNET_MessageHeader,
1556 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1557 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1558 struct GNUNET_MessageHeader,
1560 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1561 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1562 struct GNUNET_MessageHeader,
/* The SE and SEC message types are both routed to the same
   union_p2p_strata_estimator handler. */
1564 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1565 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1566 struct StrataEstimatorMessage,
1568 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1569 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1570 struct StrataEstimatorMessage,
1572 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1573 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1574 struct GNUNET_SET_ElementMessage,
1576 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1577 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1578 struct IntersectionElementInfoMessage,
1580 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1581 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1584 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1585 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1586 struct IntersectionDoneMessage,
1588 GNUNET_MQ_handler_end ()
1591 const struct GNUNET_MessageHeader *context;
/* A client that has no set cannot start an operation and is dropped. */
1593 if (NULL == (set = cs->set))
1597 GNUNET_SERVICE_client_drop (cs->client);
/* Populate the operation from the request.  result_mode and request_id are
   converted from network byte order; the byzantine/force_* fields and the
   target peer are copied as-is. */
1600 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1602 op->peer = msg->target_peer;
1603 op->result_mode = ntohl (msg->result_mode);
1604 op->client_request_id = ntohl (msg->request_id);
1605 op->byzantine = msg->byzantine;
1606 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1607 op->force_full = msg->force_full;
1608 op->force_delta = msg->force_delta;
/* Optional message nested after the fixed-size part of the evaluate
   message. */
1609 context = GNUNET_MQ_extract_nested_mh (msg);
1611 /* Advance generation values, so that
1612 mutations won't interfere with the running operation. */
/* The operation records the generation it was created in *before* the set
   moves on to the next one. */
1614 op->generation_created = set->current_generation;
1615 advance_generation (set);
1616 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1620 "Creating new CADET channel to port %s for set operation type %u\n",
1621 GNUNET_h2s (&msg->app_id),
1623 op->channel = GNUNET_CADET_channel_create (cadet,
1627 GNUNET_CADET_OPTION_RELIABLE,
1631 op->mq = GNUNET_CADET_get_mq (op->channel);
/* Operation-type specific start; a NULL state is treated as failure and the
   client is dropped (any further cleanup on that path is not visible in
   this listing). */
1632 op->state = set->vt->evaluate (op,
1634 if (NULL == op->state)
1637 GNUNET_SERVICE_client_drop (cs->client);
1640 GNUNET_SERVICE_client_continue (cs->client);
1645 * Handle an ack from a client, and send the next element. Note
1646 * that we only expect acks for set elements, not after the
1647 * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1649 * @param cls client the client
1650 * @param ack the message
/* Handles a client's acknowledgement during set iteration.  The client is
   dropped if it has no set or if no iteration is in progress
   (set->iter is NULL).  Otherwise the client is continued and, depending on
   ack->send_more, either the next element is sent or the iterator is
   destroyed and set->iteration_id is incremented.
   NOTE(review): braces, the declaration of `set', early returns, the
   else between the two outcomes and anything on the skipped lines between
   the iterator destroy and the id increment are missing from this
   listing. */
1653 handle_client_iter_ack (void *cls,
1654 const struct GNUNET_SET_IterAckMessage *ack)
1656 struct ClientState *cs = cls;
1659 if (NULL == (set = cs->set))
1661 /* client without a set acknowledged receiving a value */
1663 GNUNET_SERVICE_client_drop (cs->client);
1666 if (NULL == set->iter)
1668 /* client sent an ack, but we were not expecting one (as
1669 set iteration has finished) */
1671 GNUNET_SERVICE_client_drop (cs->client);
1674 GNUNET_SERVICE_client_continue (cs->client);
/* send_more is converted from network byte order; non-zero requests the
   next element. */
1675 if (ntohl (ack->send_more))
1677 send_client_element (set);
/* Presumably the "client wants no more" branch: end the iteration. */
1681 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1683 set->iteration_id++;
1689 * Handle a request from the client to copy a set.
1691 * @param cls the client
1692 * @param mh the message
/* First step of a lazy set copy: records a LazyCopyRequest for the
   client's set under a freshly incremented cookie, links it into the global
   lazy-copy list, and sends the cookie back to the client in a
   COPY_LAZY_RESPONSE message.  A client without a set is dropped.
   NOTE(review): braces, the declaration of `set', the early return and
   the trailing arguments of the DLL insert and MQ send are on lines missing
   from this listing. */
1695 handle_client_copy_lazy_prepare (void *cls,
1696 const struct GNUNET_MessageHeader *mh)
1698 struct ClientState *cs = cls;
1700 struct LazyCopyRequest *cr;
1701 struct GNUNET_MQ_Envelope *ev;
1702 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1704 if (NULL == (set = cs->set))
1706 /* client without a set requested an operation */
1708 GNUNET_SERVICE_client_drop (cs->client);
1711 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1712 "Client requested creation of lazy copy\n");
1713 cr = GNUNET_new (struct LazyCopyRequest);
/* Pre-increment: the counter is bumped first, so the stored cookie is the
   new value. */
1714 cr->cookie = ++lazy_copy_cookie;
1715 cr->source_set = set;
1716 GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1719 ev = GNUNET_MQ_msg (resp_msg,
1720 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
/* The cookie is written without byte-order conversion;
   handle_client_copy_lazy_connect compares msg->cookie the same way. */
1721 resp_msg->cookie = cr->cookie;
1722 GNUNET_MQ_send (set->cs->mq,
1724 GNUNET_SERVICE_client_continue (cs->client);
1729 * Handle a request from the client to connect to a copy of a set.
1731 * @param cls the client
1732 * @param msg the message
/* Second step of a lazy set copy: the client presents a cookie, the
   matching LazyCopyRequest is looked up and unlinked, and a new Set is
   built that shares the source set's content (reference count incremented)
   while getting its own copied state and its own copy of the
   excluded-generation ranges.  The client is dropped if it already has a
   set, if the cookie is unknown, or if the operation's vtable has no
   copy_state function.
   NOTE(review): braces, declarations of `set' and `found', early
   returns, the loop body's match handling, the default switch case, cleanup
   on the error paths and the statements that attach the new set to the
   client are on lines missing from this listing. */
1735 handle_client_copy_lazy_connect (void *cls,
1736 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1738 struct ClientState *cs = cls;
1739 struct LazyCopyRequest *cr;
1743 if (NULL != cs->set)
1745 /* There can only be one set per client */
1747 GNUNET_SERVICE_client_drop (cs->client);
/* Linear search of the pending lazy-copy requests for the cookie. */
1751 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1753 if (cr->cookie == msg->cookie)
1759 if (GNUNET_NO == found)
1761 /* client asked for copy with cookie we don't know */
1763 GNUNET_SERVICE_client_drop (cs->client);
/* The request is unlinked from the list once it has been matched. */
1766 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1769 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1770 "Client %p requested use of lazy copy\n",
1772 set = GNUNET_new (struct Set);
/* Pick the vtable that matches the source set's operation type. */
1773 switch (cr->source_set->operation)
1775 case GNUNET_SET_OPERATION_INTERSECTION:
1776 set->vt = _GSS_intersection_vt ();
1778 case GNUNET_SET_OPERATION_UNION:
1779 set->vt = _GSS_union_vt ();
1786 if (NULL == set->vt->copy_state)
1788 /* Lazy copy not supported for this set operation */
1792 GNUNET_SERVICE_client_drop (cs->client);
/* State is copied via the vtable; the content object is shared, not
   copied, hence the refcount increment. */
1796 set->operation = cr->source_set->operation;
1797 set->state = set->vt->copy_state (cr->source_set->state);
1798 set->content = cr->source_set->content;
1799 set->content->refcount++;
1801 set->current_generation = cr->source_set->current_generation;
1802 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1803 set->excluded_generations
1804 = GNUNET_memdup (cr->source_set->excluded_generations,
1805 set->excluded_generations_size * sizeof (struct GenerationRange));
1807 /* Advance the generation of the new set, so that mutations of the
1808 cloned set and of the source set are independent. */
1809 advance_generation (set);
1813 GNUNET_SERVICE_client_continue (cs->client);
1818 * Handle a request from the client to cancel a running set operation.
1820 * @param cls the client
1821 * @param msg the message
/* Cancels a running operation on behalf of the client.  A client without a
   set is dropped.  The set's operation list is searched for an entry whose
   client_request_id equals msg->request_id (converted from network byte
   order); a miss is only logged at INFO level, a hit is logged at DEBUG
   level and the operation is passed to _GSS_operation_destroy().  The
   client is continued at the end.
   NOTE(review): braces, declarations of `set' and `found', the loop
   body's match handling, the early return, the else between the two
   outcomes, the closing of the multi-line comment inside the "not found"
   branch and the second argument of _GSS_operation_destroy() are on lines
   missing from this listing. */
1824 handle_client_cancel (void *cls,
1825 const struct GNUNET_SET_CancelMessage *msg)
1827 struct ClientState *cs = cls;
1829 struct Operation *op;
1832 if (NULL == (set = cs->set))
1834 /* client without a set requested an operation */
1836 GNUNET_SERVICE_client_drop (cs->client);
1840 for (op = set->ops_head; NULL != op; op = op->next)
1842 if (op->client_request_id == ntohl (msg->request_id))
1848 if (GNUNET_NO == found)
1850 /* It may happen that the operation was already destroyed due to
1851 * the other peer disconnecting. The client may not know about this
1852 * yet and try to cancel the (just barely non-existent) operation.
1853 * So this is not a hard error.
1855 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1856 "Client canceled non-existent op %u\n",
1857 (uint32_t) ntohl (msg->request_id));
1861 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1862 "Client requested cancel for op %u\n",
1863 (uint32_t) ntohl (msg->request_id));
1864 _GSS_operation_destroy (op,
1867 GNUNET_SERVICE_client_continue (cs->client);
1872 * Handle a request from the client to accept a set operation that
1873 * came from a remote peer. We forward the accept to the associated
1874 * operation for handling
1876 * @param cls the client
1877 * @param msg the message
/* Handles a client's decision to accept an incoming operation.  A client
   without a set is dropped.  If the incoming operation named by
   msg->accept_reject_id no longer exists, a RESULT message with status
   GNUNET_SET_STATUS_FAILURE is sent to the client.  Otherwise the operation
   is detached from its listener, attached to the client's set, configured
   from the accept message, and started through set->vt->accept(); finally
   CADET is told it may deliver further messages on the channel.
   NOTE(review): braces, the declaration of `set', the condition guarding
   the "no longer active" branch (presumably a NULL check on op), early
   returns, several call arguments and any cleanup on the failure path are
   on lines missing from this listing. */
1880 handle_client_accept (void *cls,
1881 const struct GNUNET_SET_AcceptMessage *msg)
1883 struct ClientState *cs = cls;
1885 struct Operation *op;
1886 struct GNUNET_SET_ResultMessage *result_message;
1887 struct GNUNET_MQ_Envelope *ev;
1888 struct Listener *listener;
1890 if (NULL == (set = cs->set))
1892 /* client without a set requested to accept */
1894 GNUNET_SERVICE_client_drop (cs->client);
1897 op = get_incoming (ntohl (msg->accept_reject_id));
1900 /* It is not an error if the set op does not exist -- it may
1901 * have been destroyed when the partner peer disconnected. */
1902 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1903 "Client %p accepted request %u of listener %p that is no longer active\n",
1905 ntohl (msg->accept_reject_id),
/* Report the failure to the client.  request_id is copied from the accept
   message without conversion; the status is written with htons(). */
1907 ev = GNUNET_MQ_msg (result_message,
1908 GNUNET_MESSAGE_TYPE_SET_RESULT);
1909 result_message->request_id = msg->request_id;
1910 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1911 GNUNET_MQ_send (set->cs->mq,
1913 GNUNET_SERVICE_client_continue (cs->client);
1916 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917 "Client accepting request %u\n",
1918 (uint32_t) ntohl (msg->accept_reject_id));
/* Move the operation from the listener's list to the set's list; the
   listener back-pointer is cleared before the unlink, so the listener is
   kept in a local. */
1919 listener = op->listener;
1920 op->listener = NULL;
1921 GNUNET_CONTAINER_DLL_remove (listener->op_head,
1925 GNUNET_CONTAINER_DLL_insert (set->ops_head,
/* request_id and result_mode are converted from network byte order; the
   remaining fields are copied as-is. */
1928 op->client_request_id = ntohl (msg->request_id);
1929 op->result_mode = ntohl (msg->result_mode);
1930 op->byzantine = msg->byzantine;
1931 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1932 op->force_full = msg->force_full;
1933 op->force_delta = msg->force_delta;
1935 /* Advance generation values, so that future mutations do not
1936 interfere with the running operation. */
1937 op->generation_created = set->current_generation;
1938 advance_generation (set);
/* An operation must not have state yet when it is accepted; a NULL state
   returned by the vtable's accept function leads to the client being
   dropped. */
1939 GNUNET_assert (NULL == op->state);
1940 op->state = set->vt->accept (op);
1941 if (NULL == op->state)
1944 GNUNET_SERVICE_client_drop (cs->client);
1947 /* Now allow CADET to continue, as we did not do this in
1948 #handle_incoming_msg (as we wanted to first see if the
1949 local client would accept the request). */
1950 GNUNET_CADET_receive_done (op->channel);
1951 GNUNET_SERVICE_client_continue (cs->client);
1956 * Called to clean up, after a shutdown has been requested.
1958 * @param cls closure, NULL
/* Shutdown hook: sets the in_shutdown flag and, in the visible code,
   disconnects from CADET and destroys the statistics handle before logging
   completion.
   NOTE(review): braces, the body of the `0 == num_clients' check, any
   guards around the CADET/statistics teardown and the second argument of
   GNUNET_STATISTICS_destroy() are on lines missing from this listing, so it
   cannot be told from here which of these statements are conditional. */
1961 shutdown_task (void *cls)
1963 /* Delay actual shutdown to allow service to disconnect clients */
1964 in_shutdown = GNUNET_YES;
1965 if (0 == num_clients)
1969 GNUNET_CADET_disconnect (cadet);
1973 GNUNET_STATISTICS_destroy (_GSS_statistics,
1975 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1976 "handled shutdown request\n");
1981 * Function called by the service's run
1982 * method to run service-specific setup code.
1984 * @param cls closure
1985 * @param cfg configuration to use
1986 * @param service the initialized service
/* Service initialisation callback (the line carrying the function name and
   first parameter is missing from this listing; presumably `run').
   Registers shutdown_task with the scheduler, creates the "set" statistics
   handle and connects to CADET; a failed CADET connection is logged as an
   error and triggers GNUNET_SCHEDULER_shutdown().
   NOTE(review): the condition guarding the error branch (presumably
   a NULL check on cadet), braces and trailing call arguments are on lines
   missing from this listing. */
1990 const struct GNUNET_CONFIGURATION_Handle *cfg,
1991 struct GNUNET_SERVICE_Handle *service)
1993 /* FIXME: need to modify SERVICE (!) API to allow
1994 us to run a shutdown task *after* clients were
1995 forcefully disconnected! */
1996 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1998 _GSS_statistics = GNUNET_STATISTICS_create ("set",
2000 cadet = GNUNET_CADET_connect (cfg);
2003 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2004 _("Could not connect to CADET service\n"));
2005 GNUNET_SCHEDULER_shutdown ();
2012 * Define "main" method using service macro.
/* Argument list of the service "main" macro invocation (the line carrying
   the macro name and service name is missing from this listing).  It lists
   the client disconnect callback and one handler entry per client message
   type, terminated by GNUNET_MQ_handler_end().
   NOTE(review): the init/connect callbacks and the closure argument of
   each handler entry are on lines missing from this listing. */
2016 GNUNET_SERVICE_OPTION_NONE,
2019 &client_disconnect_cb,
2021 GNUNET_MQ_hd_fixed_size (client_accept,
2022 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2023 struct GNUNET_SET_AcceptMessage,
2025 GNUNET_MQ_hd_fixed_size (client_iter_ack,
2026 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2027 struct GNUNET_SET_IterAckMessage,
/* SET_ADD here and SET_REMOVE further down are both routed to the
   client_mutation handler. */
2029 GNUNET_MQ_hd_var_size (client_mutation,
2030 GNUNET_MESSAGE_TYPE_SET_ADD,
2031 struct GNUNET_SET_ElementMessage,
2033 GNUNET_MQ_hd_fixed_size (client_create_set,
2034 GNUNET_MESSAGE_TYPE_SET_CREATE,
2035 struct GNUNET_SET_CreateMessage,
2037 GNUNET_MQ_hd_fixed_size (client_iterate,
2038 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2039 struct GNUNET_MessageHeader,
2041 GNUNET_MQ_hd_var_size (client_evaluate,
2042 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2043 struct GNUNET_SET_EvaluateMessage,
2045 GNUNET_MQ_hd_fixed_size (client_listen,
2046 GNUNET_MESSAGE_TYPE_SET_LISTEN,
2047 struct GNUNET_SET_ListenMessage,
2049 GNUNET_MQ_hd_fixed_size (client_reject,
2050 GNUNET_MESSAGE_TYPE_SET_REJECT,
2051 struct GNUNET_SET_RejectMessage,
2053 GNUNET_MQ_hd_var_size (client_mutation,
2054 GNUNET_MESSAGE_TYPE_SET_REMOVE,
2055 struct GNUNET_SET_ElementMessage,
2057 GNUNET_MQ_hd_fixed_size (client_cancel,
2058 GNUNET_MESSAGE_TYPE_SET_CANCEL,
2059 struct GNUNET_SET_CancelMessage,
2061 GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2062 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2063 struct GNUNET_MessageHeader,
2065 GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2066 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2067 struct GNUNET_SET_CopyLazyConnectMessage,
2069 GNUNET_MQ_handler_end ());
2072 /* end of gnunet-service-set.c */