2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
26 #include "gnunet-service-set.h"
27 #include "gnunet-service-set_union.h"
28 #include "gnunet-service-set_intersection.h"
29 #include "gnunet-service-set_protocol.h"
30 #include "gnunet_statistics_service.h"
33 * How long do we hold on to an incoming channel if there is
34 * no local listener before giving up?
36 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
40 * Lazy copy requests made by a client.
42 struct LazyCopyRequest
47 struct LazyCopyRequest *prev;
52 struct LazyCopyRequest *next;
55 * Which set are we supposed to copy?
57 struct Set *source_set;
60 * Cookie identifying the request.
68 * A listener is inhabited by a client, and waits for evaluation
69 * requests from remote peers.
74 * Listeners are held in a doubly linked list.
76 struct Listener *next;
79 * Listeners are held in a doubly linked list.
81 struct Listener *prev;
84 * Head of DLL of operations this listener is responsible for.
85 * Once the client has accepted/declined the operation, the
86 * operation is moved to the respective set's operation DLLS.
88 struct Operation *op_head;
91 * Tail of DLL of operations this listener is responsible for.
92 * Once the client has accepted/declined the operation, the
93 * operation is moved to the respective set's operation DLLS.
95 struct Operation *op_tail;
98 * Client that owns the listener.
99 * Only one client may own a listener.
101 struct ClientState *cs;
104 * The port we are listening on with CADET.
106 struct GNUNET_CADET_Port *open_port;
109 * Application ID for the operation, used to distinguish
110 * multiple operations of the same type with the same peer.
112 struct GNUNET_HashCode app_id;
115 * The type of the operation.
117 enum GNUNET_SET_OperationType operation;
122 * Handle to the cadet service, used to listen for and connect to
125 static struct GNUNET_CADET_Handle *cadet;
128 * DLL of lazy copy requests by this client.
130 static struct LazyCopyRequest *lazy_copy_head;
133 * DLL of lazy copy requests by this client.
135 static struct LazyCopyRequest *lazy_copy_tail;
138 * Generator for unique cookie we set per lazy copy request.
140 static uint32_t lazy_copy_cookie;
145 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
148 * Listeners are held in a doubly linked list.
150 static struct Listener *listener_head;
153 * Listeners are held in a doubly linked list.
155 static struct Listener *listener_tail;
158 * Counter for allocating unique IDs for clients, used to identify
159 * incoming operation requests from remote peers, that the client can
160 * choose to accept or refuse. 0 must not be used (reserved for
163 static uint32_t suggest_id;
167 * Get the incoming socket associated with the given id.
169 * @param listener the listener to look in
170 * @param id id to look for
171 * @return the incoming socket associated with the id,
172 * or NULL if there is none
174 static struct Operation *
175 get_incoming (uint32_t id)
177 for (struct Listener *listener = listener_head;
179 listener = listener->next)
181 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
182 if (op->suggest_id == id)
190 * Destroy an incoming request from a remote peer
192 * @param op remote request to destroy
195 incoming_destroy (struct Operation *op)
197 struct Listener *listener;
198 struct GNUNET_CADET_Channel *channel;
200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
201 "Destroying incoming operation %p\n",
203 if (NULL != (listener = op->listener))
205 GNUNET_CONTAINER_DLL_remove (listener->op_head,
210 if (NULL != op->timeout_task)
212 GNUNET_SCHEDULER_cancel (op->timeout_task);
213 op->timeout_task = NULL;
215 if (NULL != (channel = op->channel))
218 GNUNET_CADET_channel_destroy (channel);
224 * Context for the #garbage_collect_cb().
226 struct GarbageContext
230 * Map for which we are garbage collecting removed elements.
232 struct GNUNET_CONTAINER_MultiHashMap *map;
235 * Lowest generation for which an operation is still pending.
237 unsigned int min_op_generation;
240 * Largest generation for which an operation is still pending.
242 unsigned int max_op_generation;
248 * Function invoked to check if an element can be removed from
249 * the set's history because it is no longer needed.
251 * @param cls the `struct GarbageContext *`
252 * @param key key of the element in the map
253 * @param value the `struct ElementEntry *`
254 * @return #GNUNET_OK (continue to iterate)
257 garbage_collect_cb (void *cls,
258 const struct GNUNET_HashCode *key,
261 //struct GarbageContext *gc = cls;
262 //struct ElementEntry *ee = value;
264 //if (GNUNET_YES != ee->removed)
266 //if ( (gc->max_op_generation < ee->generation_added) ||
267 // (ee->generation_removed > gc->min_op_generation) )
269 // GNUNET_assert (GNUNET_YES ==
270 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
280 * Collect and destroy elements that are not needed anymore, because
281 * their lifetime (as determined by their generation) does not overlap
282 * with any active set operation.
284 * @param set set to garbage collect
287 collect_generation_garbage (struct Set *set)
289 struct GarbageContext gc;
291 gc.min_op_generation = UINT_MAX;
292 gc.max_op_generation = 0;
293 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
295 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
296 op->generation_created);
297 gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
298 op->generation_created);
300 gc.map = set->content->elements;
301 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
308 * Is @a generation in the range of exclusions?
310 * @param generation generation to query
311 * @param excluded array of generations where the element is excluded
312 * @param excluded_size length of the @a excluded array
313 * @return #GNUNET_YES if @a generation is in any of the ranges
316 is_excluded_generation (unsigned int generation,
317 struct GenerationRange *excluded,
318 unsigned int excluded_size)
320 for (unsigned int i = 0; i < excluded_size; i++)
321 if ( (generation >= excluded[i].start) &&
322 (generation < excluded[i].end) )
329 * Is element @a ee part of the set during @a query_generation?
331 * @param ee element to test
332 * @param query_generation generation to query
333 * @param excluded array of generations where the element is excluded
334 * @param excluded_size length of the @a excluded array
335 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
338 is_element_of_generation (struct ElementEntry *ee,
339 unsigned int query_generation,
340 struct GenerationRange *excluded,
341 unsigned int excluded_size)
343 struct MutationEvent *mut;
346 GNUNET_assert (NULL != ee->mutations);
348 is_excluded_generation (query_generation,
356 is_present = GNUNET_NO;
358 /* Could be made faster with binary search, but lists
359 are small, so why bother. */
360 for (unsigned int i = 0; i < ee->mutations_size; i++)
362 mut = &ee->mutations[i];
364 if (mut->generation > query_generation)
366 /* The mutation doesn't apply to our generation
367 anymore. We can'b break here, since mutations aren't
368 sorted by generation. */
373 is_excluded_generation (mut->generation,
377 /* The generation is excluded (because it belongs to another
378 fork via a lazy copy) and thus mutations aren't considered
379 for membership testing. */
383 /* This would be an inconsistency in how we manage mutations. */
384 if ( (GNUNET_YES == is_present) &&
385 (GNUNET_YES == mut->added) )
388 if ( (GNUNET_NO == is_present) &&
389 (GNUNET_NO == mut->added) )
392 is_present = mut->added;
400 * Is element @a ee part of the set used by @a op?
402 * @param ee element to test
403 * @param op operation the defines the set and its generation
404 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
407 _GSS_is_element_of_operation (struct ElementEntry *ee,
408 struct Operation *op)
410 return is_element_of_generation (ee,
411 op->generation_created,
412 op->set->excluded_generations,
413 op->set->excluded_generations_size);
418 * Destroy the given operation. Used for any operation where both
419 * peers were known and that thus actually had a vt and channel. Must
420 * not be used for operations where 'listener' is still set and we do
421 * not know the other peer.
423 * Call the implementation-specific cancel function of the operation.
424 * Disconnects from the remote peer. Does not disconnect the client,
425 * as there may be multiple operations per set.
427 * @param op operation to destroy
428 * @param gc #GNUNET_YES to perform garbage collection on the set
431 _GSS_operation_destroy (struct Operation *op,
434 struct Set *set = op->set;
435 struct GNUNET_CADET_Channel *channel;
437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438 "Destroying operation %p\n",
440 GNUNET_assert (NULL == op->listener);
441 if (NULL != op->state)
443 set->vt->cancel (op);
448 GNUNET_CONTAINER_DLL_remove (set->ops_head,
453 if (NULL != op->context_msg)
455 GNUNET_free (op->context_msg);
456 op->context_msg = NULL;
458 if (NULL != (channel = op->channel))
460 /* This will free op; called conditionally as this helper function
461 is also called from within the channel disconnect handler. */
463 GNUNET_CADET_channel_destroy (channel);
465 if ( (NULL != set) &&
467 collect_generation_garbage (set);
468 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
469 * there was a channel end handler that will free 'op' on the call stack. */
474 * Callback called when a client connects to the service.
476 * @param cls closure for the service
477 * @param c the new client that connected to the service
478 * @param mq the message queue used to send messages to the client
479 * @return @a `struct ClientState`
482 client_connect_cb (void *cls,
483 struct GNUNET_SERVICE_Client *c,
484 struct GNUNET_MQ_Handle *mq)
486 struct ClientState *cs;
488 cs = GNUNET_new (struct ClientState);
496 * Iterator over hash map entries to free element entries.
499 * @param key current key code
500 * @param value a `struct ElementEntry *` to be free'd
501 * @return #GNUNET_YES (continue to iterate)
504 destroy_elements_iterator (void *cls,
505 const struct GNUNET_HashCode *key,
508 struct ElementEntry *ee = value;
510 GNUNET_free_non_null (ee->mutations);
517 * Clean up after a client has disconnected
519 * @param cls closure, unused
520 * @param client the client to clean up after
521 * @param internal_cls the `struct ClientState`
524 client_disconnect_cb (void *cls,
525 struct GNUNET_SERVICE_Client *client,
528 struct ClientState *cs = internal_cls;
529 struct Operation *op;
530 struct Listener *listener;
533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
534 "Client disconnected, cleaning up\n");
535 if (NULL != (set = cs->set))
537 struct SetContent *content = set->content;
538 struct PendingMutation *pm;
539 struct PendingMutation *pm_current;
540 struct LazyCopyRequest *lcr;
542 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543 "Destroying client's set\n");
544 /* Destroy pending set operations */
545 while (NULL != set->ops_head)
546 _GSS_operation_destroy (set->ops_head,
549 /* Destroy operation-specific state */
550 GNUNET_assert (NULL != set->state);
551 set->vt->destroy_set (set->state);
554 /* Clean up ongoing iterations */
555 if (NULL != set->iter)
557 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
562 /* discard any pending mutations that reference this set */
563 pm = content->pending_mutations_head;
568 if (pm_current->set == set)
570 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
571 content->pending_mutations_tail,
573 GNUNET_free (pm_current);
577 /* free set content (or at least decrement RC) */
579 GNUNET_assert (0 != content->refcount);
581 if (0 == content->refcount)
583 GNUNET_assert (NULL != content->elements);
584 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
585 &destroy_elements_iterator,
587 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
588 content->elements = NULL;
589 GNUNET_free (content);
591 GNUNET_free_non_null (set->excluded_generations);
592 set->excluded_generations = NULL;
594 /* remove set from pending copy requests */
595 lcr = lazy_copy_head;
598 struct LazyCopyRequest *lcr_current = lcr;
601 if (lcr_current->source_set == set)
603 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
606 GNUNET_free (lcr_current);
612 if (NULL != (listener = cs->listener))
614 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
615 "Destroying client's listener\n");
616 GNUNET_CADET_close_port (listener->open_port);
617 listener->open_port = NULL;
618 while (NULL != (op = listener->op_head))
619 incoming_destroy (op);
620 GNUNET_CONTAINER_DLL_remove (listener_head,
623 GNUNET_free (listener);
630 * Check a request for a set operation from another peer.
632 * @param cls the operation state
633 * @param msg the received message
634 * @return #GNUNET_OK if the channel should be kept alive,
635 * #GNUNET_SYSERR to destroy the channel
638 check_incoming_msg (void *cls,
639 const struct OperationRequestMessage *msg)
641 struct Operation *op = cls;
642 struct Listener *listener = op->listener;
643 const struct GNUNET_MessageHeader *nested_context;
645 /* double operation request */
646 if (0 != op->suggest_id)
649 return GNUNET_SYSERR;
651 /* This should be equivalent to the previous condition, but can't hurt to check twice */
652 if (NULL == op->listener)
655 return GNUNET_SYSERR;
657 if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
660 return GNUNET_SYSERR;
662 nested_context = GNUNET_MQ_extract_nested_mh (msg);
663 if ( (NULL != nested_context) &&
664 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
667 return GNUNET_SYSERR;
674 * Handle a request for a set operation from another peer. Checks if we
675 * have a listener waiting for such a request (and in that case initiates
676 * asking the listener about accepting the connection). If no listener
677 * is waiting, we queue the operation request in hope that a listener
678 * shows up soon (before timeout).
680 * This msg is expected as the first and only msg handled through the
681 * non-operation bound virtual table, acceptance of this operation replaces
682 * our virtual table and subsequent msgs would be routed differently (as
683 * we then know what type of operation this is).
685 * @param cls the operation state
686 * @param msg the received message
687 * @return #GNUNET_OK if the channel should be kept alive,
688 * #GNUNET_SYSERR to destroy the channel
691 handle_incoming_msg (void *cls,
692 const struct OperationRequestMessage *msg)
694 struct Operation *op = cls;
695 struct Listener *listener = op->listener;
696 const struct GNUNET_MessageHeader *nested_context;
697 struct GNUNET_MQ_Envelope *env;
698 struct GNUNET_SET_RequestMessage *cmsg;
700 nested_context = GNUNET_MQ_extract_nested_mh (msg);
701 /* Make a copy of the nested_context (application-specific context
702 information that is opaque to set) so we can pass it to the
704 if (NULL != nested_context)
705 op->context_msg = GNUNET_copy_message (nested_context);
706 op->remote_element_count = ntohl (msg->element_count);
707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
708 "Received P2P operation request (op %u, port %s) for active listener\n",
709 (uint32_t) ntohl (msg->operation),
710 GNUNET_h2s (&op->listener->app_id));
711 GNUNET_assert (0 == op->suggest_id);
714 op->suggest_id = suggest_id++;
715 GNUNET_assert (NULL != op->timeout_task);
716 GNUNET_SCHEDULER_cancel (op->timeout_task);
717 op->timeout_task = NULL;
718 env = GNUNET_MQ_msg_nested_mh (cmsg,
719 GNUNET_MESSAGE_TYPE_SET_REQUEST,
721 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
722 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
726 cmsg->accept_id = htonl (op->suggest_id);
727 cmsg->peer_id = op->peer;
728 GNUNET_MQ_send (listener->cs->mq,
730 /* NOTE: GNUNET_CADET_receive_done() will be called in
731 #handle_client_accept() */
736 * Add an element to @a set as specified by @a msg
738 * @param set set to manipulate
739 * @param msg message specifying the change
742 execute_add (struct Set *set,
743 const struct GNUNET_SET_ElementMessage *msg)
745 struct GNUNET_SET_Element el;
746 struct ElementEntry *ee;
747 struct GNUNET_HashCode hash;
749 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
750 el.size = ntohs (msg->header.size) - sizeof (*msg);
752 el.element_type = ntohs (msg->element_type);
753 GNUNET_SET_element_hash (&el,
755 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760 "Client inserts element %s of size %u\n",
763 ee = GNUNET_malloc (el.size + sizeof (*ee));
764 ee->element.size = el.size;
765 GNUNET_memcpy (&ee[1],
768 ee->element.data = &ee[1];
769 ee->element.element_type = el.element_type;
770 ee->remote = GNUNET_NO;
771 ee->mutations = NULL;
772 ee->mutations_size = 0;
773 ee->element_hash = hash;
774 GNUNET_break (GNUNET_YES ==
775 GNUNET_CONTAINER_multihashmap_put (set->content->elements,
778 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
780 else if (GNUNET_YES ==
781 is_element_of_generation (ee,
782 set->current_generation,
783 set->excluded_generations,
784 set->excluded_generations_size))
786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787 "Client inserted element %s of size %u twice (ignored)\n",
791 /* same element inserted twice */
796 struct MutationEvent mut = {
797 .generation = set->current_generation,
800 GNUNET_array_append (ee->mutations,
804 set->vt->add (set->state,
810 * Remove an element from @a set as specified by @a msg
812 * @param set set to manipulate
813 * @param msg message specifying the change
816 execute_remove (struct Set *set,
817 const struct GNUNET_SET_ElementMessage *msg)
819 struct GNUNET_SET_Element el;
820 struct ElementEntry *ee;
821 struct GNUNET_HashCode hash;
823 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
824 el.size = ntohs (msg->header.size) - sizeof (*msg);
826 el.element_type = ntohs (msg->element_type);
827 GNUNET_SET_element_hash (&el, &hash);
828 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
832 /* Client tried to remove non-existing element. */
833 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834 "Client removes non-existing element of size %u\n",
839 is_element_of_generation (ee,
840 set->current_generation,
841 set->excluded_generations,
842 set->excluded_generations_size))
844 /* Client tried to remove element twice */
845 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
846 "Client removed element of size %u twice (ignored)\n",
852 struct MutationEvent mut = {
853 .generation = set->current_generation,
857 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858 "Client removes element of size %u\n",
861 GNUNET_array_append (ee->mutations,
865 set->vt->remove (set->state,
871 * Perform a mutation on a set as specified by the @a msg
873 * @param set the set to mutate
874 * @param msg specification of what to change
877 execute_mutation (struct Set *set,
878 const struct GNUNET_SET_ElementMessage *msg)
880 switch (ntohs (msg->header.type))
882 case GNUNET_MESSAGE_TYPE_SET_ADD:
883 execute_add (set, msg);
885 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
886 execute_remove (set, msg);
895 * Execute mutations that were delayed on a set because of
896 * pending operations.
898 * @param set the set to execute mutations on
901 execute_delayed_mutations (struct Set *set)
903 struct PendingMutation *pm;
905 if (0 != set->content->iterator_count)
906 return; /* still cannot do this */
907 while (NULL != (pm = set->content->pending_mutations_head))
909 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
910 set->content->pending_mutations_tail,
912 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913 "Executing pending mutation on %p.\n",
915 execute_mutation (pm->set,
917 GNUNET_free (pm->msg);
924 * Send the next element of a set to the set's client. The next element is given by
925 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
926 * are no more elements in the set. The caller must ensure that the set's iterator is
929 * The client will acknowledge each received element with a
930 * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
931 * #handle_client_iter_ack() will then trigger the next transmission.
932 * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
934 * @param set set that should send its next element to its client
937 send_client_element (struct Set *set)
940 struct ElementEntry *ee;
941 struct GNUNET_MQ_Envelope *ev;
942 struct GNUNET_SET_IterResponseMessage *msg;
944 GNUNET_assert (NULL != set->iter);
946 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
948 (const void **) &ee);
949 if (GNUNET_NO == ret)
951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952 "Iteration on %p done.\n",
954 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
955 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
958 GNUNET_assert (set->content->iterator_count > 0);
959 set->content->iterator_count--;
960 execute_delayed_mutations (set);
961 GNUNET_MQ_send (set->cs->mq,
965 GNUNET_assert (NULL != ee);
966 } while (GNUNET_NO ==
967 is_element_of_generation (ee,
968 set->iter_generation,
969 set->excluded_generations,
970 set->excluded_generations_size));
971 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972 "Sending iteration element on %p.\n",
974 ev = GNUNET_MQ_msg_extra (msg,
976 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
977 GNUNET_memcpy (&msg[1],
980 msg->element_type = htons (ee->element.element_type);
981 msg->iteration_id = htons (set->iteration_id);
982 GNUNET_MQ_send (set->cs->mq,
988 * Called when a client wants to iterate the elements of a set.
989 * Checks if we have a set associated with the client and if we
990 * can right now start an iteration. If all checks out, starts
991 * sending the elements of the set to the client.
993 * @param cls client that sent the message
994 * @param m message sent by the client
997 handle_client_iterate (void *cls,
998 const struct GNUNET_MessageHeader *m)
1000 struct ClientState *cs = cls;
1003 if (NULL == (set = cs->set))
1005 /* attempt to iterate over a non existing set */
1007 GNUNET_SERVICE_client_drop (cs->client);
1010 if (NULL != set->iter)
1012 /* Only one concurrent iterate-action allowed per set */
1014 GNUNET_SERVICE_client_drop (cs->client);
1017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018 "Iterating set %p in gen %u with %u content elements\n",
1020 set->current_generation,
1021 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1022 GNUNET_SERVICE_client_continue (cs->client);
1023 set->content->iterator_count++;
1024 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1025 set->iter_generation = set->current_generation;
1026 send_client_element (set);
1031 * Called when a client wants to create a new set. This is typically
1032 * the first request from a client, and includes the type of set
1033 * operation to be performed.
1035 * @param cls client that sent the message
1036 * @param m message sent by the client
1039 handle_client_create_set (void *cls,
1040 const struct GNUNET_SET_CreateMessage *msg)
1042 struct ClientState *cs = cls;
1045 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046 "Client created new set (operation %u)\n",
1047 (uint32_t) ntohl (msg->operation));
1048 if (NULL != cs->set)
1050 /* There can only be one set per client */
1052 GNUNET_SERVICE_client_drop (cs->client);
1055 set = GNUNET_new (struct Set);
1056 switch (ntohl (msg->operation))
1058 case GNUNET_SET_OPERATION_INTERSECTION:
1059 set->vt = _GSS_intersection_vt ();
1061 case GNUNET_SET_OPERATION_UNION:
1062 set->vt = _GSS_union_vt ();
1067 GNUNET_SERVICE_client_drop (cs->client);
1070 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1071 set->state = set->vt->create ();
1072 if (NULL == set->state)
1074 /* initialization failed (i.e. out of memory) */
1076 GNUNET_SERVICE_client_drop (cs->client);
1079 set->content = GNUNET_new (struct SetContent);
1080 set->content->refcount = 1;
1081 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1085 GNUNET_SERVICE_client_continue (cs->client);
1090 * Timeout happens iff:
1091 * - we suggested an operation to our listener,
1092 * but did not receive a response in time
1093 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1095 * @param cls channel context
1096 * @param tc context information (why was this task triggered now)
1099 incoming_timeout_cb (void *cls)
1101 struct Operation *op = cls;
1103 op->timeout_task = NULL;
1104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1105 "Remote peer's incoming request timed out\n");
1106 incoming_destroy (op);
1111 * Method called whenever another peer has added us to a channel the
1112 * other peer initiated. Only called (once) upon reception of data
1113 * from a channel we listen on.
1115 * The channel context represents the operation itself and gets added
1116 * to a DLL, from where it gets looked up when our local listener
1117 * client responds to a proposed/suggested operation or connects and
1118 * associates with this operation.
1120 * @param cls closure
1121 * @param channel new handle to the channel
1122 * @param source peer that started the channel
1123 * @return initial channel context for the channel
1124 * returns NULL on error
1127 channel_new_cb (void *cls,
1128 struct GNUNET_CADET_Channel *channel,
1129 const struct GNUNET_PeerIdentity *source)
1131 struct Listener *listener = cls;
1132 struct Operation *op;
1134 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135 "New incoming channel\n");
1136 op = GNUNET_new (struct Operation);
1137 op->listener = listener;
1139 op->channel = channel;
1140 op->mq = GNUNET_CADET_get_mq (op->channel);
1141 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1144 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1145 &incoming_timeout_cb,
1147 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1155 * Function called whenever a channel is destroyed. Should clean up
1156 * any associated state. It must NOT call
1157 * GNUNET_CADET_channel_destroy() on the channel.
1159 * The peer_disconnect function is part of a a virtual table set initially either
1160 * when a peer creates a new channel with us, or once we create
1161 * a new channel ourselves (evaluate).
1163 * Once we know the exact type of operation (union/intersection), the vt is
1164 * replaced with an operation specific instance (_GSS_[op]_vt).
1166 * @param channel_ctx place where local state associated
1167 * with the channel is stored
1168 * @param channel connection to the other end (henceforth invalid)
1171 channel_end_cb (void *channel_ctx,
1172 const struct GNUNET_CADET_Channel *channel)
1174 struct Operation *op = channel_ctx;
1176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177 "channel_end_cb called\n");
1179 if (NULL != op->listener)
1180 incoming_destroy (op);
1181 else if (NULL != op->set)
1182 op->set->vt->channel_death (op);
1184 _GSS_operation_destroy (op,
1191 * Function called whenever an MQ-channel's transmission window size changes.
1193 * The first callback in an outgoing channel will be with a non-zero value
1194 * and will mean the channel is connected to the destination.
1196 * For an incoming channel it will be called immediately after the
1197 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1199 * @param cls Channel closure.
1200 * @param channel Connection to the other end (henceforth invalid).
1201 * @param window_size New window size. If the is more messages than buffer size
1202 * this value will be negative..
1205 channel_window_cb (void *cls,
1206 const struct GNUNET_CADET_Channel *channel,
1209 /* FIXME: not implemented, we could do flow control here... */
1214 * Called when a client wants to create a new listener.
1216 * @param cls client that sent the message
1217 * @param msg message sent by the client
1220 handle_client_listen (void *cls,
1221 const struct GNUNET_SET_ListenMessage *msg)
1223 struct ClientState *cs = cls;
1224 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1225 GNUNET_MQ_hd_var_size (incoming_msg,
1226 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1227 struct OperationRequestMessage,
1229 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1230 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1233 GNUNET_MQ_hd_var_size (union_p2p_elements,
1234 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1235 struct GNUNET_SET_ElementMessage,
1237 GNUNET_MQ_hd_var_size (union_p2p_offer,
1238 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1239 struct GNUNET_MessageHeader,
1241 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1242 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1243 struct InquiryMessage,
1245 GNUNET_MQ_hd_var_size (union_p2p_demand,
1246 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1247 struct GNUNET_MessageHeader,
1249 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1250 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1251 struct GNUNET_MessageHeader,
1253 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1254 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1255 struct GNUNET_MessageHeader,
1257 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1258 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1259 struct GNUNET_MessageHeader,
1261 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1262 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1263 struct GNUNET_MessageHeader,
1265 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1266 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1267 struct StrataEstimatorMessage,
1269 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1270 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1271 struct StrataEstimatorMessage,
1273 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1274 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1275 struct GNUNET_SET_ElementMessage,
1277 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1278 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1279 struct IntersectionElementInfoMessage,
1281 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1282 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1285 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1286 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1287 struct IntersectionDoneMessage,
1289 GNUNET_MQ_handler_end ()
1291 struct Listener *listener;
1293 if (NULL != cs->listener)
1295 /* max. one active listener per client! */
1297 GNUNET_SERVICE_client_drop (cs->client);
1300 listener = GNUNET_new (struct Listener);
1302 listener->app_id = msg->app_id;
1303 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1304 GNUNET_CONTAINER_DLL_insert (listener_head,
1307 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1308 "New listener created (op %u, port %s)\n",
1309 listener->operation,
1310 GNUNET_h2s (&listener->app_id));
1312 = GNUNET_CADET_open_port (cadet,
1319 GNUNET_SERVICE_client_continue (cs->client);
1324 * Called when the listening client rejects an operation
1325 * request by another peer.
1327 * @param cls client that sent the message
1328 * @param msg message sent by the client
1331 handle_client_reject (void *cls,
1332 const struct GNUNET_SET_RejectMessage *msg)
1334 struct ClientState *cs = cls;
1335 struct Operation *op;
1337 op = get_incoming (ntohl (msg->accept_reject_id));
1340 /* no matching incoming operation for this reject;
1341 could be that the other peer already disconnected... */
1342 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1343 "Client rejected unknown operation %u\n",
1344 (unsigned int) ntohl (msg->accept_reject_id));
1345 GNUNET_SERVICE_client_continue (cs->client);
1348 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349 "Peer request (op %u, app %s) rejected by client\n",
1350 op->listener->operation,
1351 GNUNET_h2s (&cs->listener->app_id));
1352 GNUNET_CADET_channel_destroy (op->channel);
1353 GNUNET_SERVICE_client_continue (cs->client);
1358 * Called when a client wants to add or remove an element to a set it inhabits.
1360 * @param cls client that sent the message
1361 * @param msg message sent by the client
1364 check_client_mutation (void *cls,
1365 const struct GNUNET_SET_ElementMessage *msg)
1367 /* NOTE: Technically, we should probably check with the
1368 block library whether the element we are given is well-formed */
1374 * Called when a client wants to add or remove an element to a set it inhabits.
1376 * @param cls client that sent the message
1377 * @param msg message sent by the client
1380 handle_client_mutation (void *cls,
1381 const struct GNUNET_SET_ElementMessage *msg)
1383 struct ClientState *cs = cls;
1386 if (NULL == (set = cs->set))
1388 /* client without a set requested an operation */
1390 GNUNET_SERVICE_client_drop (cs->client);
1393 GNUNET_SERVICE_client_continue (cs->client);
1395 if (0 != set->content->iterator_count)
1397 struct PendingMutation *pm;
1399 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1400 "Scheduling mutation on set\n");
1401 pm = GNUNET_new (struct PendingMutation);
1402 pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1404 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1405 set->content->pending_mutations_tail,
1409 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1410 "Executing mutation on set\n");
1411 execute_mutation (set,
1417 * Advance the current generation of a set,
1418 * adding exclusion ranges if necessary.
1420 * @param set the set where we want to advance the generation
1423 advance_generation (struct Set *set)
1425 struct GenerationRange r;
1427 if (set->current_generation == set->content->latest_generation)
1429 set->content->latest_generation++;
1430 set->current_generation++;
1434 GNUNET_assert (set->current_generation < set->content->latest_generation);
1436 r.start = set->current_generation + 1;
1437 r.end = set->content->latest_generation + 1;
1438 set->content->latest_generation = r.end;
1439 set->current_generation = r.end;
1440 GNUNET_array_append (set->excluded_generations,
1441 set->excluded_generations_size,
1447 * Called when a client wants to initiate a set operation with another
1448 * peer. Initiates the CADET connection to the listener and sends the
1451 * @param cls client that sent the message
1452 * @param msg message sent by the client
1453 * @return #GNUNET_OK if the message is well-formed
1456 check_client_evaluate (void *cls,
1457 const struct GNUNET_SET_EvaluateMessage *msg)
1459 /* FIXME: suboptimal, even if the context below could be NULL,
1460 there are malformed messages this does not check for... */
1466 * Called when a client wants to initiate a set operation with another
1467 * peer. Initiates the CADET connection to the listener and sends the
1470 * @param cls client that sent the message
1471 * @param msg message sent by the client
1474 handle_client_evaluate (void *cls,
1475 const struct GNUNET_SET_EvaluateMessage *msg)
1477 struct ClientState *cs = cls;
1478 struct Operation *op = GNUNET_new (struct Operation);
1479 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1480 GNUNET_MQ_hd_var_size (incoming_msg,
1481 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1482 struct OperationRequestMessage,
1484 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1485 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1488 GNUNET_MQ_hd_var_size (union_p2p_elements,
1489 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1490 struct GNUNET_SET_ElementMessage,
1492 GNUNET_MQ_hd_var_size (union_p2p_offer,
1493 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1494 struct GNUNET_MessageHeader,
1496 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1497 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1498 struct InquiryMessage,
1500 GNUNET_MQ_hd_var_size (union_p2p_demand,
1501 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1502 struct GNUNET_MessageHeader,
1504 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1505 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1506 struct GNUNET_MessageHeader,
1508 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1509 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1510 struct GNUNET_MessageHeader,
1512 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1513 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1514 struct GNUNET_MessageHeader,
1516 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1517 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1518 struct GNUNET_MessageHeader,
1520 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1521 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1522 struct StrataEstimatorMessage,
1524 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1525 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1526 struct StrataEstimatorMessage,
1528 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1529 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1530 struct GNUNET_SET_ElementMessage,
1532 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1533 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1534 struct IntersectionElementInfoMessage,
1536 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1537 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1540 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1541 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1542 struct IntersectionDoneMessage,
1544 GNUNET_MQ_handler_end ()
1547 const struct GNUNET_MessageHeader *context;
1549 if (NULL == (set = cs->set))
1553 GNUNET_SERVICE_client_drop (cs->client);
1556 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1558 op->peer = msg->target_peer;
1559 op->result_mode = ntohl (msg->result_mode);
1560 op->client_request_id = ntohl (msg->request_id);
1561 op->byzantine = msg->byzantine;
1562 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1563 op->force_full = msg->force_full;
1564 op->force_delta = msg->force_delta;
1565 context = GNUNET_MQ_extract_nested_mh (msg);
1567 /* Advance generation values, so that
1568 mutations won't interfer with the running operation. */
1570 op->generation_created = set->current_generation;
1571 advance_generation (set);
1572 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1575 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1576 "Creating new CADET channel to port %s for set operation type %u\n",
1577 GNUNET_h2s (&msg->app_id),
1579 op->channel = GNUNET_CADET_channel_create (cadet,
1583 GNUNET_CADET_OPTION_RELIABLE,
1587 op->mq = GNUNET_CADET_get_mq (op->channel);
1588 op->state = set->vt->evaluate (op,
1590 if (NULL == op->state)
1593 GNUNET_SERVICE_client_drop (cs->client);
1596 GNUNET_SERVICE_client_continue (cs->client);
1601 * Handle an ack from a client, and send the next element. Note
1602 * that we only expect acks for set elements, not after the
1603 * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1605 * @param cls client the client
1606 * @param ack the message
1609 handle_client_iter_ack (void *cls,
1610 const struct GNUNET_SET_IterAckMessage *ack)
1612 struct ClientState *cs = cls;
1615 if (NULL == (set = cs->set))
1617 /* client without a set acknowledged receiving a value */
1619 GNUNET_SERVICE_client_drop (cs->client);
1622 if (NULL == set->iter)
1624 /* client sent an ack, but we were not expecting one (as
1625 set iteration has finished) */
1627 GNUNET_SERVICE_client_drop (cs->client);
1630 GNUNET_SERVICE_client_continue (cs->client);
1631 if (ntohl (ack->send_more))
1633 send_client_element (set);
1637 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1639 set->iteration_id++;
1645 * Handle a request from the client to copy a set.
1647 * @param cls the client
1648 * @param mh the message
1651 handle_client_copy_lazy_prepare (void *cls,
1652 const struct GNUNET_MessageHeader *mh)
1654 struct ClientState *cs = cls;
1656 struct LazyCopyRequest *cr;
1657 struct GNUNET_MQ_Envelope *ev;
1658 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1660 if (NULL == (set = cs->set))
1662 /* client without a set requested an operation */
1664 GNUNET_SERVICE_client_drop (cs->client);
1667 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1668 "Client requested creation of lazy copy\n");
1669 cr = GNUNET_new (struct LazyCopyRequest);
1670 cr->cookie = ++lazy_copy_cookie;
1671 cr->source_set = set;
1672 GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1675 ev = GNUNET_MQ_msg (resp_msg,
1676 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1677 resp_msg->cookie = cr->cookie;
1678 GNUNET_MQ_send (set->cs->mq,
1680 GNUNET_SERVICE_client_continue (cs->client);
1685 * Handle a request from the client to connect to a copy of a set.
1687 * @param cls the client
1688 * @param msg the message
1691 handle_client_copy_lazy_connect (void *cls,
1692 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1694 struct ClientState *cs = cls;
1695 struct LazyCopyRequest *cr;
1699 if (NULL != cs->set)
1701 /* There can only be one set per client */
1703 GNUNET_SERVICE_client_drop (cs->client);
1707 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1709 if (cr->cookie == msg->cookie)
1715 if (GNUNET_NO == found)
1717 /* client asked for copy with cookie we don't know */
1719 GNUNET_SERVICE_client_drop (cs->client);
1722 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1726 "Client %p requested use of lazy copy\n",
1728 set = GNUNET_new (struct Set);
1729 switch (cr->source_set->operation)
1731 case GNUNET_SET_OPERATION_INTERSECTION:
1732 set->vt = _GSS_intersection_vt ();
1734 case GNUNET_SET_OPERATION_UNION:
1735 set->vt = _GSS_union_vt ();
1742 if (NULL == set->vt->copy_state)
1744 /* Lazy copy not supported for this set operation */
1748 GNUNET_SERVICE_client_drop (cs->client);
1752 set->operation = cr->source_set->operation;
1753 set->state = set->vt->copy_state (cr->source_set->state);
1754 set->content = cr->source_set->content;
1755 set->content->refcount++;
1757 set->current_generation = cr->source_set->current_generation;
1758 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1759 set->excluded_generations
1760 = GNUNET_memdup (cr->source_set->excluded_generations,
1761 set->excluded_generations_size * sizeof (struct GenerationRange));
1763 /* Advance the generation of the new set, so that mutations to the
1764 of the cloned set and the source set are independent. */
1765 advance_generation (set);
1769 GNUNET_SERVICE_client_continue (cs->client);
1774 * Handle a request from the client to cancel a running set operation.
1776 * @param cls the client
1777 * @param msg the message
1780 handle_client_cancel (void *cls,
1781 const struct GNUNET_SET_CancelMessage *msg)
1783 struct ClientState *cs = cls;
1785 struct Operation *op;
1788 if (NULL == (set = cs->set))
1790 /* client without a set requested an operation */
1792 GNUNET_SERVICE_client_drop (cs->client);
1796 for (op = set->ops_head; NULL != op; op = op->next)
1798 if (op->client_request_id == ntohl (msg->request_id))
1804 if (GNUNET_NO == found)
1806 /* It may happen that the operation was already destroyed due to
1807 * the other peer disconnecting. The client may not know about this
1808 * yet and try to cancel the (just barely non-existent) operation.
1809 * So this is not a hard error.
1811 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1812 "Client canceled non-existent op %u\n",
1813 (uint32_t) ntohl (msg->request_id));
1817 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1818 "Client requested cancel for op %u\n",
1819 (uint32_t) ntohl (msg->request_id));
1820 _GSS_operation_destroy (op,
1823 GNUNET_SERVICE_client_continue (cs->client);
1828 * Handle a request from the client to accept a set operation that
1829 * came from a remote peer. We forward the accept to the associated
1830 * operation for handling
1832 * @param cls the client
1833 * @param msg the message
1836 handle_client_accept (void *cls,
1837 const struct GNUNET_SET_AcceptMessage *msg)
1839 struct ClientState *cs = cls;
1841 struct Operation *op;
1842 struct GNUNET_SET_ResultMessage *result_message;
1843 struct GNUNET_MQ_Envelope *ev;
1844 struct Listener *listener;
1846 if (NULL == (set = cs->set))
1848 /* client without a set requested to accept */
1850 GNUNET_SERVICE_client_drop (cs->client);
1853 op = get_incoming (ntohl (msg->accept_reject_id));
1856 /* It is not an error if the set op does not exist -- it may
1857 * have been destroyed when the partner peer disconnected. */
1858 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1859 "Client %p accepted request %u of listener %p that is no longer active\n",
1861 ntohl (msg->accept_reject_id),
1863 ev = GNUNET_MQ_msg (result_message,
1864 GNUNET_MESSAGE_TYPE_SET_RESULT);
1865 result_message->request_id = msg->request_id;
1866 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1867 GNUNET_MQ_send (set->cs->mq,
1869 GNUNET_SERVICE_client_continue (cs->client);
1872 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1873 "Client accepting request %u\n",
1874 (uint32_t) ntohl (msg->accept_reject_id));
1875 listener = op->listener;
1876 op->listener = NULL;
1877 GNUNET_CONTAINER_DLL_remove (listener->op_head,
1881 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1884 op->client_request_id = ntohl (msg->request_id);
1885 op->result_mode = ntohl (msg->result_mode);
1886 op->byzantine = msg->byzantine;
1887 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1888 op->force_full = msg->force_full;
1889 op->force_delta = msg->force_delta;
1891 /* Advance generation values, so that future mutations do not
1892 interfer with the running operation. */
1893 op->generation_created = set->current_generation;
1894 advance_generation (set);
1895 GNUNET_assert (NULL == op->state);
1896 op->state = set->vt->accept (op);
1897 if (NULL == op->state)
1900 GNUNET_SERVICE_client_drop (cs->client);
1903 /* Now allow CADET to continue, as we did not do this in
1904 #handle_incoming_msg (as we wanted to first see if the
1905 local client would accept the request). */
1906 GNUNET_CADET_receive_done (op->channel);
1907 GNUNET_SERVICE_client_continue (cs->client);
1912 * Called to clean up, after a shutdown has been requested.
1914 * @param cls closure, NULL
1917 shutdown_task (void *cls)
1919 /* Delay actual shutdown to allow service to disconnect clients */
1922 GNUNET_CADET_disconnect (cadet);
1925 GNUNET_STATISTICS_destroy (_GSS_statistics,
1927 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1928 "handled shutdown request\n");
1933 * Function called by the service's run
1934 * method to run service-specific setup code.
1936 * @param cls closure
1937 * @param cfg configuration to use
1938 * @param service the initialized service
1942 const struct GNUNET_CONFIGURATION_Handle *cfg,
1943 struct GNUNET_SERVICE_Handle *service)
1945 /* FIXME: need to modify SERVICE (!) API to allow
1946 us to run a shutdown task *after* clients were
1947 forcefully disconnected! */
1948 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1950 _GSS_statistics = GNUNET_STATISTICS_create ("set",
1952 cadet = GNUNET_CADET_connect (cfg);
1955 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1956 _("Could not connect to CADET service\n"));
1957 GNUNET_SCHEDULER_shutdown ();
1964 * Define "main" method using service macro.
1968 GNUNET_SERVICE_OPTION_NONE,
1971 &client_disconnect_cb,
1973 GNUNET_MQ_hd_fixed_size (client_accept,
1974 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1975 struct GNUNET_SET_AcceptMessage,
1977 GNUNET_MQ_hd_fixed_size (client_iter_ack,
1978 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1979 struct GNUNET_SET_IterAckMessage,
1981 GNUNET_MQ_hd_var_size (client_mutation,
1982 GNUNET_MESSAGE_TYPE_SET_ADD,
1983 struct GNUNET_SET_ElementMessage,
1985 GNUNET_MQ_hd_fixed_size (client_create_set,
1986 GNUNET_MESSAGE_TYPE_SET_CREATE,
1987 struct GNUNET_SET_CreateMessage,
1989 GNUNET_MQ_hd_fixed_size (client_iterate,
1990 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1991 struct GNUNET_MessageHeader,
1993 GNUNET_MQ_hd_var_size (client_evaluate,
1994 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1995 struct GNUNET_SET_EvaluateMessage,
1997 GNUNET_MQ_hd_fixed_size (client_listen,
1998 GNUNET_MESSAGE_TYPE_SET_LISTEN,
1999 struct GNUNET_SET_ListenMessage,
2001 GNUNET_MQ_hd_fixed_size (client_reject,
2002 GNUNET_MESSAGE_TYPE_SET_REJECT,
2003 struct GNUNET_SET_RejectMessage,
2005 GNUNET_MQ_hd_var_size (client_mutation,
2006 GNUNET_MESSAGE_TYPE_SET_REMOVE,
2007 struct GNUNET_SET_ElementMessage,
2009 GNUNET_MQ_hd_fixed_size (client_cancel,
2010 GNUNET_MESSAGE_TYPE_SET_CANCEL,
2011 struct GNUNET_SET_CancelMessage,
2013 GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2014 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2015 struct GNUNET_MessageHeader,
2017 GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2018 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2019 struct GNUNET_SET_CopyLazyConnectMessage,
2021 GNUNET_MQ_handler_end ());
2024 /* end of gnunet-service-set.c */