2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 * @file set/gnunet-service-set.c
20 * @brief two-peer set operations
21 * @author Florian Dold
22 * @author Christian Grothoff
24 #include "gnunet-service-set.h"
25 #include "gnunet-service-set_union.h"
26 #include "gnunet-service-set_intersection.h"
27 #include "gnunet-service-set_protocol.h"
28 #include "gnunet_statistics_service.h"
31 * How long do we hold on to an incoming channel if there is
32 * no local listener before giving up?
34 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
38 * Lazy copy requests made by a client.
40 struct LazyCopyRequest
45 struct LazyCopyRequest *prev;
50 struct LazyCopyRequest *next;
53 * Which set are we supposed to copy?
55 struct Set *source_set;
58 * Cookie identifying the request.
66 * A listener is inhabited by a client, and waits for evaluation
67 * requests from remote peers.
72 * Listeners are held in a doubly linked list.
74 struct Listener *next;
77 * Listeners are held in a doubly linked list.
79 struct Listener *prev;
82 * Head of DLL of operations this listener is responsible for.
83 * Once the client has accepted/declined the operation, the
84 * operation is moved to the respective set's operation DLLs.
86 struct Operation *op_head;
89 * Tail of DLL of operations this listener is responsible for.
90 * Once the client has accepted/declined the operation, the
91 * operation is moved to the respective set's operation DLLs.
93 struct Operation *op_tail;
96 * Client that owns the listener.
97 * Only one client may own a listener.
99 struct ClientState *cs;
102 * The port we are listening on with CADET.
104 struct GNUNET_CADET_Port *open_port;
107 * Application ID for the operation, used to distinguish
108 * multiple operations of the same type with the same peer.
110 struct GNUNET_HashCode app_id;
113 * The type of the operation.
115 enum GNUNET_SET_OperationType operation;
120 * Handle to the cadet service, used to listen for and connect to
123 static struct GNUNET_CADET_Handle *cadet;
126 * DLL of lazy copy requests by this client.
128 static struct LazyCopyRequest *lazy_copy_head;
131 * DLL of lazy copy requests by this client.
133 static struct LazyCopyRequest *lazy_copy_tail;
136 * Generator for unique cookie we set per lazy copy request.
138 static uint32_t lazy_copy_cookie;
143 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
146 * Listeners are held in a doubly linked list.
148 static struct Listener *listener_head;
151 * Listeners are held in a doubly linked list.
153 static struct Listener *listener_tail;
156 * Number of active clients.
158 static unsigned int num_clients;
161 * Are we in shutdown? if #GNUNET_YES and the number of clients
162 * drops to zero, disconnect from CADET.
164 static int in_shutdown;
167 * Counter for allocating unique IDs for clients, used to identify
168 * incoming operation requests from remote peers, that the client can
169 * choose to accept or refuse. 0 must not be used (reserved for
172 static uint32_t suggest_id;
176 * Get the incoming socket associated with the given id.
178 * Searches the pending incoming operations of all listeners.
179 * @param id id to look for
180 * @return the incoming socket associated with the id,
181 * or NULL if there is none
183 static struct Operation *
184 get_incoming (uint32_t id)
186 for (struct Listener *listener = listener_head;
188 listener = listener->next)
190 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
191 if (op->suggest_id == id)
199 * Destroy an incoming request from a remote peer
201 * @param op remote request to destroy
204 incoming_destroy (struct Operation *op)
206 struct Listener *listener;
207 struct GNUNET_CADET_Channel *channel;
209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
210 "Destroying incoming operation %p\n",
212 if (NULL != (listener = op->listener))
214 GNUNET_CONTAINER_DLL_remove (listener->op_head,
219 if (NULL != op->timeout_task)
221 GNUNET_SCHEDULER_cancel (op->timeout_task);
222 op->timeout_task = NULL;
224 if (NULL != (channel = op->channel))
227 GNUNET_CADET_channel_destroy (channel);
233 * Context for the #garbage_collect_cb().
235 struct GarbageContext
239 * Map for which we are garbage collecting removed elements.
241 struct GNUNET_CONTAINER_MultiHashMap *map;
244 * Lowest generation for which an operation is still pending.
246 unsigned int min_op_generation;
249 * Largest generation for which an operation is still pending.
251 unsigned int max_op_generation;
257 * Function invoked to check if an element can be removed from
258 * the set's history because it is no longer needed.
260 * @param cls the `struct GarbageContext *`
261 * @param key key of the element in the map
262 * @param value the `struct ElementEntry *`
263 * @return #GNUNET_OK (continue to iterate)
266 garbage_collect_cb (void *cls,
267 const struct GNUNET_HashCode *key,
270 //struct GarbageContext *gc = cls;
271 //struct ElementEntry *ee = value;
273 //if (GNUNET_YES != ee->removed)
275 //if ( (gc->max_op_generation < ee->generation_added) ||
276 // (ee->generation_removed > gc->min_op_generation) )
278 // GNUNET_assert (GNUNET_YES ==
279 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
289 * Collect and destroy elements that are not needed anymore, because
290 * their lifetime (as determined by their generation) does not overlap
291 * with any active set operation.
293 * @param set set to garbage collect
296 collect_generation_garbage (struct Set *set)
298 struct GarbageContext gc;
300 gc.min_op_generation = UINT_MAX;
301 gc.max_op_generation = 0;
302 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
304 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
305 op->generation_created);
306 gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
307 op->generation_created);
309 gc.map = set->content->elements;
310 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
317 * Is @a generation in the range of exclusions?
319 * @param generation generation to query
320 * @param excluded array of generations where the element is excluded
321 * @param excluded_size length of the @a excluded array
322 * @return #GNUNET_YES if @a generation is in any of the ranges
325 is_excluded_generation (unsigned int generation,
326 struct GenerationRange *excluded,
327 unsigned int excluded_size)
329 for (unsigned int i = 0; i < excluded_size; i++)
330 if ( (generation >= excluded[i].start) &&
331 (generation < excluded[i].end) )
338 * Is element @a ee part of the set during @a query_generation?
340 * @param ee element to test
341 * @param query_generation generation to query
342 * @param excluded array of generations where the element is excluded
343 * @param excluded_size length of the @a excluded array
344 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
347 is_element_of_generation (struct ElementEntry *ee,
348 unsigned int query_generation,
349 struct GenerationRange *excluded,
350 unsigned int excluded_size)
352 struct MutationEvent *mut;
355 GNUNET_assert (NULL != ee->mutations);
357 is_excluded_generation (query_generation,
365 is_present = GNUNET_NO;
367 /* Could be made faster with binary search, but lists
368 are small, so why bother. */
369 for (unsigned int i = 0; i < ee->mutations_size; i++)
371 mut = &ee->mutations[i];
373 if (mut->generation > query_generation)
375 /* The mutation doesn't apply to our generation
376 anymore. We can't break here, since mutations aren't
377 sorted by generation. */
382 is_excluded_generation (mut->generation,
386 /* The generation is excluded (because it belongs to another
387 fork via a lazy copy) and thus mutations aren't considered
388 for membership testing. */
392 /* This would be an inconsistency in how we manage mutations. */
393 if ( (GNUNET_YES == is_present) &&
394 (GNUNET_YES == mut->added) )
397 if ( (GNUNET_NO == is_present) &&
398 (GNUNET_NO == mut->added) )
401 is_present = mut->added;
409 * Is element @a ee part of the set used by @a op?
411 * @param ee element to test
412 * @param op operation that defines the set and its generation
413 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
416 _GSS_is_element_of_operation (struct ElementEntry *ee,
417 struct Operation *op)
419 return is_element_of_generation (ee,
420 op->generation_created,
421 op->set->excluded_generations,
422 op->set->excluded_generations_size);
427 * Destroy the given operation. Used for any operation where both
428 * peers were known and that thus actually had a vt and channel. Must
429 * not be used for operations where 'listener' is still set and we do
430 * not know the other peer.
432 * Call the implementation-specific cancel function of the operation.
433 * Disconnects from the remote peer. Does not disconnect the client,
434 * as there may be multiple operations per set.
436 * @param op operation to destroy
437 * @param gc #GNUNET_YES to perform garbage collection on the set
440 _GSS_operation_destroy (struct Operation *op,
443 struct Set *set = op->set;
444 struct GNUNET_CADET_Channel *channel;
446 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
447 "Destroying operation %p\n",
449 GNUNET_assert (NULL == op->listener);
450 if (NULL != op->state)
452 set->vt->cancel (op);
457 GNUNET_CONTAINER_DLL_remove (set->ops_head,
462 if (NULL != op->context_msg)
464 GNUNET_free (op->context_msg);
465 op->context_msg = NULL;
467 if (NULL != (channel = op->channel))
469 /* This will free op; called conditionally as this helper function
470 is also called from within the channel disconnect handler. */
472 GNUNET_CADET_channel_destroy (channel);
474 if ( (NULL != set) &&
476 collect_generation_garbage (set);
477 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
478 * there was a channel end handler that will free 'op' on the call stack. */
483 * Callback called when a client connects to the service.
485 * @param cls closure for the service
486 * @param c the new client that connected to the service
487 * @param mq the message queue used to send messages to the client
488 * @return the `struct ClientState`
491 client_connect_cb (void *cls,
492 struct GNUNET_SERVICE_Client *c,
493 struct GNUNET_MQ_Handle *mq)
495 struct ClientState *cs;
498 cs = GNUNET_new (struct ClientState);
506 * Iterator over hash map entries to free element entries.
509 * @param key current key code
510 * @param value a `struct ElementEntry *` to be free'd
511 * @return #GNUNET_YES (continue to iterate)
514 destroy_elements_iterator (void *cls,
515 const struct GNUNET_HashCode *key,
518 struct ElementEntry *ee = value;
520 GNUNET_free_non_null (ee->mutations);
527 * Clean up after a client has disconnected
529 * @param cls closure, unused
530 * @param client the client to clean up after
531 * @param internal_cls the `struct ClientState`
534 client_disconnect_cb (void *cls,
535 struct GNUNET_SERVICE_Client *client,
538 struct ClientState *cs = internal_cls;
539 struct Operation *op;
540 struct Listener *listener;
543 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544 "Client disconnected, cleaning up\n");
545 if (NULL != (set = cs->set))
547 struct SetContent *content = set->content;
548 struct PendingMutation *pm;
549 struct PendingMutation *pm_current;
550 struct LazyCopyRequest *lcr;
552 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553 "Destroying client's set\n");
554 /* Destroy pending set operations */
555 while (NULL != set->ops_head)
556 _GSS_operation_destroy (set->ops_head,
559 /* Destroy operation-specific state */
560 GNUNET_assert (NULL != set->state);
561 set->vt->destroy_set (set->state);
564 /* Clean up ongoing iterations */
565 if (NULL != set->iter)
567 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
572 /* discard any pending mutations that reference this set */
573 pm = content->pending_mutations_head;
578 if (pm_current->set == set)
580 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
581 content->pending_mutations_tail,
583 GNUNET_free (pm_current);
587 /* free set content (or at least decrement RC) */
589 GNUNET_assert (0 != content->refcount);
591 if (0 == content->refcount)
593 GNUNET_assert (NULL != content->elements);
594 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
595 &destroy_elements_iterator,
597 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
598 content->elements = NULL;
599 GNUNET_free (content);
601 GNUNET_free_non_null (set->excluded_generations);
602 set->excluded_generations = NULL;
604 /* remove set from pending copy requests */
605 lcr = lazy_copy_head;
608 struct LazyCopyRequest *lcr_current = lcr;
611 if (lcr_current->source_set == set)
613 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
616 GNUNET_free (lcr_current);
622 if (NULL != (listener = cs->listener))
624 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
625 "Destroying client's listener\n");
626 GNUNET_CADET_close_port (listener->open_port);
627 listener->open_port = NULL;
628 while (NULL != (op = listener->op_head))
630 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
631 "Destroying incoming operation `%u' from peer `%s'\n",
632 (unsigned int) op->client_request_id,
633 GNUNET_i2s (&op->peer));
634 incoming_destroy (op);
636 GNUNET_CONTAINER_DLL_remove (listener_head,
639 GNUNET_free (listener);
643 if ( (GNUNET_YES == in_shutdown) &&
648 GNUNET_CADET_disconnect (cadet);
656 * Check a request for a set operation from another peer.
658 * @param cls the operation state
659 * @param msg the received message
660 * @return #GNUNET_OK if the channel should be kept alive,
661 * #GNUNET_SYSERR to destroy the channel
664 check_incoming_msg (void *cls,
665 const struct OperationRequestMessage *msg)
667 struct Operation *op = cls;
668 struct Listener *listener = op->listener;
669 const struct GNUNET_MessageHeader *nested_context;
671 /* double operation request */
672 if (0 != op->suggest_id)
675 return GNUNET_SYSERR;
677 /* This should be equivalent to the previous condition, but can't hurt to check twice */
678 if (NULL == op->listener)
681 return GNUNET_SYSERR;
683 if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
686 return GNUNET_SYSERR;
688 nested_context = GNUNET_MQ_extract_nested_mh (msg);
689 if ( (NULL != nested_context) &&
690 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
693 return GNUNET_SYSERR;
700 * Handle a request for a set operation from another peer. Checks if we
701 * have a listener waiting for such a request (and in that case initiates
702 * asking the listener about accepting the connection). If no listener
703 * is waiting, we queue the operation request in hope that a listener
704 * shows up soon (before timeout).
706 * This msg is expected as the first and only msg handled through the
707 * non-operation bound virtual table, acceptance of this operation replaces
708 * our virtual table and subsequent msgs would be routed differently (as
709 * we then know what type of operation this is).
711 * @param cls the operation state
712 * @param msg the received message
713 * @return #GNUNET_OK if the channel should be kept alive,
714 * #GNUNET_SYSERR to destroy the channel
717 handle_incoming_msg (void *cls,
718 const struct OperationRequestMessage *msg)
720 struct Operation *op = cls;
721 struct Listener *listener = op->listener;
722 const struct GNUNET_MessageHeader *nested_context;
723 struct GNUNET_MQ_Envelope *env;
724 struct GNUNET_SET_RequestMessage *cmsg;
726 nested_context = GNUNET_MQ_extract_nested_mh (msg);
727 /* Make a copy of the nested_context (application-specific context
728 information that is opaque to set) so we can pass it to the
730 if (NULL != nested_context)
731 op->context_msg = GNUNET_copy_message (nested_context);
732 op->remote_element_count = ntohl (msg->element_count);
733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734 "Received P2P operation request (op %u, port %s) for active listener\n",
735 (uint32_t) ntohl (msg->operation),
736 GNUNET_h2s (&op->listener->app_id));
737 GNUNET_assert (0 == op->suggest_id);
740 op->suggest_id = suggest_id++;
741 GNUNET_assert (NULL != op->timeout_task);
742 GNUNET_SCHEDULER_cancel (op->timeout_task);
743 op->timeout_task = NULL;
744 env = GNUNET_MQ_msg_nested_mh (cmsg,
745 GNUNET_MESSAGE_TYPE_SET_REQUEST,
747 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
748 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
752 cmsg->accept_id = htonl (op->suggest_id);
753 cmsg->peer_id = op->peer;
754 GNUNET_MQ_send (listener->cs->mq,
756 /* NOTE: GNUNET_CADET_receive_done() will be called in
757 #handle_client_accept() */
762 * Add an element to @a set as specified by @a msg
764 * @param set set to manipulate
765 * @param msg message specifying the change
768 execute_add (struct Set *set,
769 const struct GNUNET_SET_ElementMessage *msg)
771 struct GNUNET_SET_Element el;
772 struct ElementEntry *ee;
773 struct GNUNET_HashCode hash;
775 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
776 el.size = ntohs (msg->header.size) - sizeof (*msg);
778 el.element_type = ntohs (msg->element_type);
779 GNUNET_SET_element_hash (&el,
781 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786 "Client inserts element %s of size %u\n",
789 ee = GNUNET_malloc (el.size + sizeof (*ee));
790 ee->element.size = el.size;
791 GNUNET_memcpy (&ee[1],
794 ee->element.data = &ee[1];
795 ee->element.element_type = el.element_type;
796 ee->remote = GNUNET_NO;
797 ee->mutations = NULL;
798 ee->mutations_size = 0;
799 ee->element_hash = hash;
800 GNUNET_break (GNUNET_YES ==
801 GNUNET_CONTAINER_multihashmap_put (set->content->elements,
804 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
806 else if (GNUNET_YES ==
807 is_element_of_generation (ee,
808 set->current_generation,
809 set->excluded_generations,
810 set->excluded_generations_size))
812 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813 "Client inserted element %s of size %u twice (ignored)\n",
817 /* same element inserted twice */
822 struct MutationEvent mut = {
823 .generation = set->current_generation,
826 GNUNET_array_append (ee->mutations,
830 set->vt->add (set->state,
836 * Remove an element from @a set as specified by @a msg
838 * @param set set to manipulate
839 * @param msg message specifying the change
842 execute_remove (struct Set *set,
843 const struct GNUNET_SET_ElementMessage *msg)
845 struct GNUNET_SET_Element el;
846 struct ElementEntry *ee;
847 struct GNUNET_HashCode hash;
849 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
850 el.size = ntohs (msg->header.size) - sizeof (*msg);
852 el.element_type = ntohs (msg->element_type);
853 GNUNET_SET_element_hash (&el, &hash);
854 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
858 /* Client tried to remove non-existing element. */
859 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
860 "Client removes non-existing element of size %u\n",
865 is_element_of_generation (ee,
866 set->current_generation,
867 set->excluded_generations,
868 set->excluded_generations_size))
870 /* Client tried to remove element twice */
871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872 "Client removed element of size %u twice (ignored)\n",
878 struct MutationEvent mut = {
879 .generation = set->current_generation,
883 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
884 "Client removes element of size %u\n",
887 GNUNET_array_append (ee->mutations,
891 set->vt->remove (set->state,
897 * Perform a mutation on a set as specified by the @a msg
899 * @param set the set to mutate
900 * @param msg specification of what to change
903 execute_mutation (struct Set *set,
904 const struct GNUNET_SET_ElementMessage *msg)
906 switch (ntohs (msg->header.type))
908 case GNUNET_MESSAGE_TYPE_SET_ADD:
909 execute_add (set, msg);
911 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
912 execute_remove (set, msg);
921 * Execute mutations that were delayed on a set because of
922 * pending operations.
924 * @param set the set to execute mutations on
927 execute_delayed_mutations (struct Set *set)
929 struct PendingMutation *pm;
931 if (0 != set->content->iterator_count)
932 return; /* still cannot do this */
933 while (NULL != (pm = set->content->pending_mutations_head))
935 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
936 set->content->pending_mutations_tail,
938 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
939 "Executing pending mutation on %p.\n",
941 execute_mutation (pm->set,
943 GNUNET_free (pm->msg);
950 * Send the next element of a set to the set's client. The next element is given by
951 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
952 * are no more elements in the set. The caller must ensure that the set's iterator is
955 * The client will acknowledge each received element with a
956 * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
957 * #handle_client_iter_ack() will then trigger the next transmission.
958 * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
960 * @param set set that should send its next element to its client
963 send_client_element (struct Set *set)
966 struct ElementEntry *ee;
967 struct GNUNET_MQ_Envelope *ev;
968 struct GNUNET_SET_IterResponseMessage *msg;
970 GNUNET_assert (NULL != set->iter);
972 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
974 (const void **) &ee);
975 if (GNUNET_NO == ret)
977 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
978 "Iteration on %p done.\n",
980 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
981 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
984 GNUNET_assert (set->content->iterator_count > 0);
985 set->content->iterator_count--;
986 execute_delayed_mutations (set);
987 GNUNET_MQ_send (set->cs->mq,
991 GNUNET_assert (NULL != ee);
992 } while (GNUNET_NO ==
993 is_element_of_generation (ee,
994 set->iter_generation,
995 set->excluded_generations,
996 set->excluded_generations_size));
997 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998 "Sending iteration element on %p.\n",
1000 ev = GNUNET_MQ_msg_extra (msg,
1002 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1003 GNUNET_memcpy (&msg[1],
1006 msg->element_type = htons (ee->element.element_type);
1007 msg->iteration_id = htons (set->iteration_id);
1008 GNUNET_MQ_send (set->cs->mq,
1014 * Called when a client wants to iterate the elements of a set.
1015 * Checks if we have a set associated with the client and if we
1016 * can right now start an iteration. If all checks out, starts
1017 * sending the elements of the set to the client.
1019 * @param cls client that sent the message
1020 * @param m message sent by the client
1023 handle_client_iterate (void *cls,
1024 const struct GNUNET_MessageHeader *m)
1026 struct ClientState *cs = cls;
1029 if (NULL == (set = cs->set))
1031 /* attempt to iterate over a non existing set */
1033 GNUNET_SERVICE_client_drop (cs->client);
1036 if (NULL != set->iter)
1038 /* Only one concurrent iterate-action allowed per set */
1040 GNUNET_SERVICE_client_drop (cs->client);
1043 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044 "Iterating set %p in gen %u with %u content elements\n",
1046 set->current_generation,
1047 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1048 GNUNET_SERVICE_client_continue (cs->client);
1049 set->content->iterator_count++;
1050 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1051 set->iter_generation = set->current_generation;
1052 send_client_element (set);
1057 * Called when a client wants to create a new set. This is typically
1058 * the first request from a client, and includes the type of set
1059 * operation to be performed.
1061 * @param cls client that sent the message
1062 * @param msg message sent by the client
1065 handle_client_create_set (void *cls,
1066 const struct GNUNET_SET_CreateMessage *msg)
1068 struct ClientState *cs = cls;
1071 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072 "Client created new set (operation %u)\n",
1073 (uint32_t) ntohl (msg->operation));
1074 if (NULL != cs->set)
1076 /* There can only be one set per client */
1078 GNUNET_SERVICE_client_drop (cs->client);
1081 set = GNUNET_new (struct Set);
1082 switch (ntohl (msg->operation))
1084 case GNUNET_SET_OPERATION_INTERSECTION:
1085 set->vt = _GSS_intersection_vt ();
1087 case GNUNET_SET_OPERATION_UNION:
1088 set->vt = _GSS_union_vt ();
1093 GNUNET_SERVICE_client_drop (cs->client);
1096 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1097 set->state = set->vt->create ();
1098 if (NULL == set->state)
1100 /* initialization failed (i.e. out of memory) */
1102 GNUNET_SERVICE_client_drop (cs->client);
1105 set->content = GNUNET_new (struct SetContent);
1106 set->content->refcount = 1;
1107 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1111 GNUNET_SERVICE_client_continue (cs->client);
1116 * Timeout happens iff:
1117 * - we suggested an operation to our listener,
1118 * but did not receive a response in time
1119 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1121 * @param cls channel context
1122 * @param tc context information (why was this task triggered now)
1125 incoming_timeout_cb (void *cls)
1127 struct Operation *op = cls;
1129 op->timeout_task = NULL;
1130 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1131 "Remote peer's incoming request timed out\n");
1132 incoming_destroy (op);
1137 * Method called whenever another peer has added us to a channel the
1138 * other peer initiated. Only called (once) upon reception of data
1139 * from a channel we listen on.
1141 * The channel context represents the operation itself and gets added
1142 * to a DLL, from where it gets looked up when our local listener
1143 * client responds to a proposed/suggested operation or connects and
1144 * associates with this operation.
1146 * @param cls closure
1147 * @param channel new handle to the channel
1148 * @param source peer that started the channel
1149 * @return initial channel context for the channel
1150 * returns NULL on error
1153 channel_new_cb (void *cls,
1154 struct GNUNET_CADET_Channel *channel,
1155 const struct GNUNET_PeerIdentity *source)
1157 struct Listener *listener = cls;
1158 struct Operation *op;
1160 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1161 "New incoming channel\n");
1162 op = GNUNET_new (struct Operation);
1163 op->listener = listener;
1165 op->channel = channel;
1166 op->mq = GNUNET_CADET_get_mq (op->channel);
1167 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1170 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1171 &incoming_timeout_cb,
1173 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1181 * Function called whenever a channel is destroyed. Should clean up
1182 * any associated state. It must NOT call
1183 * GNUNET_CADET_channel_destroy() on the channel.
1185 * The peer_disconnect function is part of a virtual table set initially either
1186 * when a peer creates a new channel with us, or once we create
1187 * a new channel ourselves (evaluate).
1189 * Once we know the exact type of operation (union/intersection), the vt is
1190 * replaced with an operation specific instance (_GSS_[op]_vt).
1192 * @param channel_ctx place where local state associated
1193 * with the channel is stored
1194 * @param channel connection to the other end (henceforth invalid)
1197 channel_end_cb (void *channel_ctx,
1198 const struct GNUNET_CADET_Channel *channel)
1200 struct Operation *op = channel_ctx;
1202 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1203 "channel_end_cb called\n");
1205 if (NULL != op->listener)
1206 incoming_destroy (op);
1207 else if (NULL != op->set)
1208 op->set->vt->channel_death (op);
1210 _GSS_operation_destroy (op,
1217 * Function called whenever an MQ-channel's transmission window size changes.
1219 * The first callback in an outgoing channel will be with a non-zero value
1220 * and will mean the channel is connected to the destination.
1222 * For an incoming channel it will be called immediately after the
1223 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1225 * @param cls Channel closure.
1226 * @param channel Connection to the other end (henceforth invalid).
1227 * @param window_size New window size. If there are more messages than buffer size,
1228 * this value will be negative.
1231 channel_window_cb (void *cls,
1232 const struct GNUNET_CADET_Channel *channel,
1235 /* FIXME: not implemented, we could do flow control here... */
1240 * Called when a client wants to create a new listener.
1242 * @param cls client that sent the message
1243 * @param msg message sent by the client
1246 handle_client_listen (void *cls,
1247 const struct GNUNET_SET_ListenMessage *msg)
1249 struct ClientState *cs = cls;
1250 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1251 GNUNET_MQ_hd_var_size (incoming_msg,
1252 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1253 struct OperationRequestMessage,
1255 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1256 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1259 GNUNET_MQ_hd_var_size (union_p2p_elements,
1260 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1261 struct GNUNET_SET_ElementMessage,
1263 GNUNET_MQ_hd_var_size (union_p2p_offer,
1264 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1265 struct GNUNET_MessageHeader,
1267 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1268 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1269 struct InquiryMessage,
1271 GNUNET_MQ_hd_var_size (union_p2p_demand,
1272 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1273 struct GNUNET_MessageHeader,
1275 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1276 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1277 struct GNUNET_MessageHeader,
1279 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1280 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1281 struct GNUNET_MessageHeader,
1283 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1284 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1285 struct GNUNET_MessageHeader,
1287 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1288 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1289 struct GNUNET_MessageHeader,
1291 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1292 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1293 struct StrataEstimatorMessage,
1295 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1296 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1297 struct StrataEstimatorMessage,
1299 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1300 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1301 struct GNUNET_SET_ElementMessage,
1303 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1304 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1305 struct IntersectionElementInfoMessage,
1307 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1308 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1311 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1312 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1313 struct IntersectionDoneMessage,
1315 GNUNET_MQ_handler_end ()
1317 struct Listener *listener;
1319 if (NULL != cs->listener)
1321 /* max. one active listener per client! */
1323 GNUNET_SERVICE_client_drop (cs->client);
1326 listener = GNUNET_new (struct Listener);
1328 cs->listener = listener;
1329 listener->app_id = msg->app_id;
1330 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1331 GNUNET_CONTAINER_DLL_insert (listener_head,
1334 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1335 "New listener created (op %u, port %s)\n",
1336 listener->operation,
1337 GNUNET_h2s (&listener->app_id));
1339 = GNUNET_CADET_open_port (cadet,
1346 GNUNET_SERVICE_client_continue (cs->client);
1351 * Called when the listening client rejects an operation
1352 * request by another peer.
1354 * @param cls client that sent the message
1355 * @param msg message sent by the client
1358 handle_client_reject (void *cls,
1359 const struct GNUNET_SET_RejectMessage *msg)
1361 struct ClientState *cs = cls;
/* NOTE(review): this listing skips some original lines (the embedded
   line numbers jump, e.g. 1364 -> 1367 and 1372 -> 1375), so the test
   guarding the "unknown operation" path below is not shown. */
1362 struct Operation *op;
/* The id from the message is converted with ntohl() before it is used
   to look up the matching incoming operation. */
1364 op = get_incoming (ntohl (msg->accept_reject_id));
1367 /* no matching incoming operation for this reject;
1368 could be that the other peer already disconnected... */
1369 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1370 "Client rejected unknown operation %u\n",
1371 (unsigned int) ntohl (msg->accept_reject_id));
1372 GNUNET_SERVICE_client_continue (cs->client);
/* NOTE(review): the log call below reads the operation type through
   op->listener but the application id through cs->listener; confirm
   that cs->listener is always set (and is the same listener) for a
   client that sends a reject. */
1375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1376 "Peer request (op %u, app %s) rejected by client\n",
1377 op->listener->operation,
1378 GNUNET_h2s (&cs->listener->app_id));
/* The rejection is carried out by destroying the operation's CADET
   channel; any further cleanup of op is not visible in this function. */
1379 GNUNET_CADET_channel_destroy (op->channel);
1380 GNUNET_SERVICE_client_continue (cs->client);
1385 * Called when a client wants to add or remove an element to a set it inhabits.
1387 * @param cls client that sent the message
1388 * @param msg message sent by the client
1391 check_client_mutation (void *cls,
1392 const struct GNUNET_SET_ElementMessage *msg)
1394 /* NOTE: Technically, we should probably check with the
1395 block library whether the element we are given is well-formed */
/* NOTE(review): no validation logic is visible in this listing; the
   return statement is among the omitted lines, so whether every message
   is accepted unconditionally needs to be confirmed.  "client_mutation"
   is registered for both SET_ADD and SET_REMOVE in the handler table at
   the end of this file. */
1401 * Called when a client wants to add or remove an element to a set it inhabits.
1403 * @param cls client that sent the message
1404 * @param msg message sent by the client
1407 handle_client_mutation (void *cls,
1408 const struct GNUNET_SET_ElementMessage *msg)
1410 struct ClientState *cs = cls;
/* A client that has no set attached cannot mutate anything and is
   dropped. */
1413 if (NULL == (set = cs->set))
1415 /* client without a set requested an operation */
1417 GNUNET_SERVICE_client_drop (cs->client);
/* The client is told to continue before the mutation itself is applied
   or queued below. */
1420 GNUNET_SERVICE_client_continue (cs->client);
/* While iterators are registered on the set's content
   (iterator_count != 0) the mutation is not applied directly: a copy of
   the message is stored in a PendingMutation that is appended to the
   content's pending-mutation list. */
1422 if (0 != set->content->iterator_count)
1424 struct PendingMutation *pm;
1426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1427 "Scheduling mutation on set\n");
1428 pm = GNUNET_new (struct PendingMutation);
1429 pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1431 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1432 set->content->pending_mutations_tail,
/* Presumably the path taken when no iterator is active (the lines that
   separate the two paths are omitted in this listing): the mutation is
   executed right away. */
1436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1437 "Executing mutation on set\n");
1438 execute_mutation (set,
1444 * Advance the current generation of a set,
1445 * adding exclusion ranges if necessary.
1447 * @param set the set where we want to advance the generation
1450 advance_generation (struct Set *set)
1452 struct GenerationRange r;
/* Case 1: the set is already at the latest generation of its content,
   so both counters are simply incremented by one. */
1454 if (set->current_generation == set->content->latest_generation)
1456 set->content->latest_generation++;
1457 set->current_generation++;
/* Case 2: the content's latest generation is ahead of this set's
   current generation (asserted below). */
1461 GNUNET_assert (set->current_generation < set->content->latest_generation);
/* r starts one past the set's current generation and ends one past the
   content's latest generation; both counters then jump to r.end and r
   is appended to the set's excluded_generations array.
   NOTE(review): whether r.end is meant as inclusive or exclusive is not
   visible here. */
1463 r.start = set->current_generation + 1;
1464 r.end = set->content->latest_generation + 1;
1465 set->content->latest_generation = r.end;
1466 set->current_generation = r.end;
1467 GNUNET_array_append (set->excluded_generations,
1468 set->excluded_generations_size,
1474 * Called when a client wants to initiate a set operation with another
1475 * peer. Initiates the CADET connection to the listener and sends the
1478 * @param cls client that sent the message
1479 * @param msg message sent by the client
1480 * @return #GNUNET_OK if the message is well-formed
1483 check_client_evaluate (void *cls,
1484 const struct GNUNET_SET_EvaluateMessage *msg)
1486 /* FIXME: suboptimal, even if the context below could be NULL,
1487 there are malformed messages this does not check for... */
/* NOTE(review): the statements of this function are among the lines
   omitted from this listing, so the exact check performed cannot be
   described from here.  handle_client_evaluate() extracts a nested
   message header from the same message type. */
1493 * Called when a client wants to initiate a set operation with another
1494 * peer. Initiates the CADET connection to the listener and sends the
1497 * @param cls client that sent the message
1498 * @param msg message sent by the client
1501 handle_client_evaluate (void *cls,
1502 const struct GNUNET_SET_EvaluateMessage *msg)
1504 struct ClientState *cs = cls;
/* The operation object is allocated up front, before the client's set
   is checked further down. */
1505 struct Operation *op = GNUNET_new (struct Operation);
1506 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
/* Handler table for P2P messages: the operation request, the union
   protocol messages and the intersection protocol messages.  Presumably
   handed to GNUNET_CADET_channel_create() below -- the argument lines of
   that call are omitted in this listing. */
1507 GNUNET_MQ_hd_var_size (incoming_msg,
1508 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1509 struct OperationRequestMessage,
1511 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1512 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1515 GNUNET_MQ_hd_var_size (union_p2p_elements,
1516 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1517 struct GNUNET_SET_ElementMessage,
1519 GNUNET_MQ_hd_var_size (union_p2p_offer,
1520 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1521 struct GNUNET_MessageHeader,
1523 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1524 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1525 struct InquiryMessage,
1527 GNUNET_MQ_hd_var_size (union_p2p_demand,
1528 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1529 struct GNUNET_MessageHeader,
1531 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1532 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1533 struct GNUNET_MessageHeader,
1535 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1536 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1537 struct GNUNET_MessageHeader,
1539 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1540 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1541 struct GNUNET_MessageHeader,
1543 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1544 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1545 struct GNUNET_MessageHeader,
/* The strata-estimator handler is registered twice, once for the SE
   message type and once for the SEC message type. */
1547 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1548 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1549 struct StrataEstimatorMessage,
1551 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1552 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1553 struct StrataEstimatorMessage,
1555 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1556 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1557 struct GNUNET_SET_ElementMessage,
1559 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1560 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1561 struct IntersectionElementInfoMessage,
1563 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1564 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1567 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1568 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1569 struct IntersectionDoneMessage,
1571 GNUNET_MQ_handler_end ()
1574 const struct GNUNET_MessageHeader *context;
/* A client without a set cannot start an operation and is dropped.
   NOTE(review): op was already allocated above; its release on this
   path is not visible in this listing -- confirm. */
1576 if (NULL == (set = cs->set))
1580 GNUNET_SERVICE_client_drop (cs->client);
/* The salt is drawn from the nonce-quality random source. */
1583 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
/* Copy the request parameters into the operation; result_mode and
   request_id are converted with ntohl(), the remaining fields are
   copied unchanged. */
1585 op->peer = msg->target_peer;
1586 op->result_mode = ntohl (msg->result_mode);
1587 op->client_request_id = ntohl (msg->request_id);
1588 op->byzantine = msg->byzantine;
1589 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1590 op->force_full = msg->force_full;
1591 op->force_delta = msg->force_delta;
/* Optional message nested inside the evaluate request; per the FIXME
   on check_client_evaluate() it may be NULL. */
1592 context = GNUNET_MQ_extract_nested_mh (msg);
1594 /* Advance generation values, so that
1595 mutations won't interfere with the running operation. */
/* The operation remembers the generation that was current when it was
   created; only afterwards is the set's generation advanced. */
1597 op->generation_created = set->current_generation;
1598 advance_generation (set);
1599 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603 "Creating new CADET channel to port %s for set operation type %u\n",
1604 GNUNET_h2s (&msg->app_id),
1606 op->channel = GNUNET_CADET_channel_create (cadet,
1610 GNUNET_CADET_OPTION_RELIABLE,
/* The message queue of the new channel is stored on the operation
   before the operation-specific evaluate callback is invoked. */
1614 op->mq = GNUNET_CADET_get_mq (op->channel);
1615 op->state = set->vt->evaluate (op,
/* A NULL state from evaluate() is treated as failure: the client is
   dropped instead of being told to continue. */
1617 if (NULL == op->state)
1620 GNUNET_SERVICE_client_drop (cs->client);
1623 GNUNET_SERVICE_client_continue (cs->client);
1628 * Handle an ack from a client, and send the next element. Note
1629 * that we only expect acks for set elements, not after the
1630 * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1632 * @param cls client the client
1633 * @param ack the message
1636 handle_client_iter_ack (void *cls,
1637 const struct GNUNET_SET_IterAckMessage *ack)
1639 struct ClientState *cs = cls;
/* Two protocol violations lead to the client being dropped: an ack from
   a client that has no set, and an ack while no iteration is active
   (set->iter is NULL). */
1642 if (NULL == (set = cs->set))
1644 /* client without a set acknowledged receiving a value */
1646 GNUNET_SERVICE_client_drop (cs->client);
1649 if (NULL == set->iter)
1651 /* client sent an ack, but we were not expecting one (as
1652 set iteration has finished) */
1654 GNUNET_SERVICE_client_drop (cs->client);
1657 GNUNET_SERVICE_client_continue (cs->client);
/* A non-zero send_more asks for the next element.  Presumably the
   remaining statements form the alternative branch (the separating
   lines are omitted in this listing): the iterator is destroyed and the
   iteration id is incremented. */
1658 if (ntohl (ack->send_more))
1660 send_client_element (set);
1664 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1666 set->iteration_id++;
1672 * Handle a request from the client to copy a set.
1674 * @param cls the client
1675 * @param mh the message
1678 handle_client_copy_lazy_prepare (void *cls,
1679 const struct GNUNET_MessageHeader *mh)
1681 struct ClientState *cs = cls;
1683 struct LazyCopyRequest *cr;
1684 struct GNUNET_MQ_Envelope *ev;
1685 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1687 if (NULL == (set = cs->set))
1689 /* client without a set requested an operation */
1691 GNUNET_SERVICE_client_drop (cs->client);
1694 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1695 "Client requested creation of lazy copy\n");
/* Record the request: it gets the next value of the lazy_copy_cookie
   counter (pre-incremented), remembers the client's set as the copy
   source, and is inserted into the lazy-copy request list. */
1696 cr = GNUNET_new (struct LazyCopyRequest);
1697 cr->cookie = ++lazy_copy_cookie;
1698 cr->source_set = set;
1699 GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
/* Reply to the client with the cookie.  The cookie is written without
   byte-order conversion; handle_client_copy_lazy_connect() likewise
   compares the received cookie without conversion. */
1702 ev = GNUNET_MQ_msg (resp_msg,
1703 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1704 resp_msg->cookie = cr->cookie;
1705 GNUNET_MQ_send (set->cs->mq,
1707 GNUNET_SERVICE_client_continue (cs->client);
1712 * Handle a request from the client to connect to a copy of a set.
1714 * @param cls the client
1715 * @param msg the message
1718 handle_client_copy_lazy_connect (void *cls,
1719 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1721 struct ClientState *cs = cls;
1722 struct LazyCopyRequest *cr;
1726 if (NULL != cs->set)
1728 /* There can only be one set per client */
1730 GNUNET_SERVICE_client_drop (cs->client);
/* Search the pending lazy-copy requests for the cookie named in the
   message; the cookie is compared as received, without byte-order
   conversion. */
1734 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1736 if (cr->cookie == msg->cookie)
1742 if (GNUNET_NO == found)
1744 /* client asked for copy with cookie we don't know */
1746 GNUNET_SERVICE_client_drop (cs->client);
/* The matching request is taken off the list before the copy is built. */
1749 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1752 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1753 "Client %p requested use of lazy copy\n",
/* The new set uses the vtable that matches the source set's operation
   type (intersection or union). */
1755 set = GNUNET_new (struct Set);
1756 switch (cr->source_set->operation)
1758 case GNUNET_SET_OPERATION_INTERSECTION:
1759 set->vt = _GSS_intersection_vt ();
1761 case GNUNET_SET_OPERATION_UNION:
1762 set->vt = _GSS_union_vt ();
1769 if (NULL == set->vt->copy_state)
1771 /* Lazy copy not supported for this set operation */
1775 GNUNET_SERVICE_client_drop (cs->client);
/* The operation-specific state is copied through the vtable, while the
   content object itself is shared with the source set and only its
   reference count is incremented. */
1779 set->operation = cr->source_set->operation;
1780 set->state = set->vt->copy_state (cr->source_set->state);
1781 set->content = cr->source_set->content;
1782 set->content->refcount++;
/* The generation bookkeeping is duplicated: the current generation is
   copied and the excluded-generation array is cloned with
   GNUNET_memdup(). */
1784 set->current_generation = cr->source_set->current_generation;
1785 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1786 set->excluded_generations
1787 = GNUNET_memdup (cr->source_set->excluded_generations,
1788 set->excluded_generations_size * sizeof (struct GenerationRange));
1790 /* Advance the generation of the new set, so that mutations to the
1791 cloned set and the source set are independent. */
1792 advance_generation (set);
1796 GNUNET_SERVICE_client_continue (cs->client);
1801 * Handle a request from the client to cancel a running set operation.
1803 * @param cls the client
1804 * @param msg the message
1807 handle_client_cancel (void *cls,
1808 const struct GNUNET_SET_CancelMessage *msg)
1810 struct ClientState *cs = cls;
/* Overview: the set's operation list is searched for the operation
   whose client_request_id equals the request id from the message
   (converted with ntohl()).  If none is found this is only logged at
   INFO level; otherwise the operation is passed to
   _GSS_operation_destroy().  The final statement visible in this
   listing tells the client to continue.
   NOTE(review): the terminator of the multi-line comment further down
   ("It may happen that ...") is among the lines omitted from this
   listing. */
1812 struct Operation *op;
1815 if (NULL == (set = cs->set))
1817 /* client without a set requested an operation */
1819 GNUNET_SERVICE_client_drop (cs->client);
1823 for (op = set->ops_head; NULL != op; op = op->next)
1825 if (op->client_request_id == ntohl (msg->request_id))
1831 if (GNUNET_NO == found)
1833 /* It may happen that the operation was already destroyed due to
1834 * the other peer disconnecting. The client may not know about this
1835 * yet and try to cancel the (just barely non-existent) operation.
1836 * So this is not a hard error.
1838 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1839 "Client canceled non-existent op %u\n",
1840 (uint32_t) ntohl (msg->request_id));
1844 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1845 "Client requested cancel for op %u\n",
1846 (uint32_t) ntohl (msg->request_id));
1847 _GSS_operation_destroy (op,
1850 GNUNET_SERVICE_client_continue (cs->client);
1855 * Handle a request from the client to accept a set operation that
1856 * came from a remote peer. We forward the accept to the associated
1857 * operation for handling
1859 * @param cls the client
1860 * @param msg the message
1863 handle_client_accept (void *cls,
1864 const struct GNUNET_SET_AcceptMessage *msg)
1866 struct ClientState *cs = cls;
1868 struct Operation *op;
1869 struct GNUNET_SET_ResultMessage *result_message;
1870 struct GNUNET_MQ_Envelope *ev;
1871 struct Listener *listener;
1873 if (NULL == (set = cs->set))
1875 /* client without a set requested to accept */
1877 GNUNET_SERVICE_client_drop (cs->client);
1880 op = get_incoming (ntohl (msg->accept_reject_id));
1883 /* It is not an error if the set op does not exist -- it may
1884 * have been destroyed when the partner peer disconnected. */
/* Unknown-operation path (the guarding test is among the lines omitted
   from this listing): the client receives a result message with status
   GNUNET_SET_STATUS_FAILURE.  The request id is copied from the accept
   message without byte-order conversion; the status is converted with
   htons(). */
1885 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1886 "Client %p accepted request %u of listener %p that is no longer active\n",
1888 ntohl (msg->accept_reject_id),
1890 ev = GNUNET_MQ_msg (result_message,
1891 GNUNET_MESSAGE_TYPE_SET_RESULT);
1892 result_message->request_id = msg->request_id;
1893 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1894 GNUNET_MQ_send (set->cs->mq,
1896 GNUNET_SERVICE_client_continue (cs->client);
1899 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1900 "Client accepting request %u\n",
1901 (uint32_t) ntohl (msg->accept_reject_id));
/* Move the operation from the listener's operation list to the set's
   operation list; the listener pointer is saved first because
   op->listener is cleared before the removal. */
1902 listener = op->listener;
1903 op->listener = NULL;
1904 GNUNET_CONTAINER_DLL_remove (listener->op_head,
1908 GNUNET_CONTAINER_DLL_insert (set->ops_head,
/* Take over the parameters the client supplied with the accept;
   request_id and result_mode are converted with ntohl(), the remaining
   fields are copied unchanged. */
1911 op->client_request_id = ntohl (msg->request_id);
1912 op->result_mode = ntohl (msg->result_mode);
1913 op->byzantine = msg->byzantine;
1914 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1915 op->force_full = msg->force_full;
1916 op->force_delta = msg->force_delta;
1918 /* Advance generation values, so that future mutations do not
1919 interfere with the running operation. */
1920 op->generation_created = set->current_generation;
1921 advance_generation (set);
/* The operation must not have operation-specific state yet; that state
   is created by the accept callback of the set's vtable, and a NULL
   result leads to the client being dropped. */
1922 GNUNET_assert (NULL == op->state);
1923 op->state = set->vt->accept (op);
1924 if (NULL == op->state)
1927 GNUNET_SERVICE_client_drop (cs->client);
1930 /* Now allow CADET to continue, as we did not do this in
1931 #handle_incoming_msg (as we wanted to first see if the
1932 local client would accept the request). */
1933 GNUNET_CADET_receive_done (op->channel);
1934 GNUNET_SERVICE_client_continue (cs->client);
1939 * Called to clean up, after a shutdown has been requested.
1941 * @param cls closure, NULL
1944 shutdown_task (void *cls)
1946 /* Delay actual shutdown to allow service to disconnect clients */
1947 in_shutdown = GNUNET_YES;
/* NOTE(review): the CADET disconnect appears to depend on no clients
   being connected any more (num_clients == 0), but the braces and any
   further conditions are among the lines omitted from this listing --
   confirm the exact nesting. */
1948 if (0 == num_clients)
1952 GNUNET_CADET_disconnect (cadet);
/* The statistics handle is released and the shutdown is logged. */
1956 GNUNET_STATISTICS_destroy (_GSS_statistics,
1958 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1959 "handled shutdown request\n");
1964 * Function called by the service's run
1965 * method to run service-specific setup code.
1967 * @param cls closure
1968 * @param cfg configuration to use
1969 * @param service the initialized service
1973 const struct GNUNET_CONFIGURATION_Handle *cfg,
1974 struct GNUNET_SERVICE_Handle *service)
1976 /* FIXME: need to modify SERVICE (!) API to allow
1977 us to run a shutdown task *after* clients were
1978 forcefully disconnected! */
/* Setup order: register shutdown_task with the scheduler, create the
   statistics handle under the name "set", then connect to CADET using
   the given configuration. */
1979 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1981 _GSS_statistics = GNUNET_STATISTICS_create ("set",
1983 cadet = GNUNET_CADET_connect (cfg);
/* Error path -- presumably taken when the connect call above failed
   (the test is among the lines omitted from this listing): log the
   failure and request scheduler shutdown. */
1986 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1987 _("Could not connect to CADET service\n"));
1988 GNUNET_SCHEDULER_shutdown ();
1995 * Define "main" method using service macro.
1999 GNUNET_SERVICE_OPTION_NONE,
2002 &client_disconnect_cb,
/* NOTE(review): the line naming the macro and several of its leading
   arguments are omitted from this listing.  What follows is the table
   of client message handlers: one entry per client message type, with
   "fixed_size" entries naming the exact message struct and "var_size"
   entries used for evaluate and for add/remove. */
2004 GNUNET_MQ_hd_fixed_size (client_accept,
2005 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2006 struct GNUNET_SET_AcceptMessage,
2008 GNUNET_MQ_hd_fixed_size (client_iter_ack,
2009 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2010 struct GNUNET_SET_IterAckMessage,
/* client_mutation is registered twice: here for SET_ADD and further
   down for SET_REMOVE. */
2012 GNUNET_MQ_hd_var_size (client_mutation,
2013 GNUNET_MESSAGE_TYPE_SET_ADD,
2014 struct GNUNET_SET_ElementMessage,
2016 GNUNET_MQ_hd_fixed_size (client_create_set,
2017 GNUNET_MESSAGE_TYPE_SET_CREATE,
2018 struct GNUNET_SET_CreateMessage,
2020 GNUNET_MQ_hd_fixed_size (client_iterate,
2021 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2022 struct GNUNET_MessageHeader,
2024 GNUNET_MQ_hd_var_size (client_evaluate,
2025 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2026 struct GNUNET_SET_EvaluateMessage,
2028 GNUNET_MQ_hd_fixed_size (client_listen,
2029 GNUNET_MESSAGE_TYPE_SET_LISTEN,
2030 struct GNUNET_SET_ListenMessage,
2032 GNUNET_MQ_hd_fixed_size (client_reject,
2033 GNUNET_MESSAGE_TYPE_SET_REJECT,
2034 struct GNUNET_SET_RejectMessage,
2036 GNUNET_MQ_hd_var_size (client_mutation,
2037 GNUNET_MESSAGE_TYPE_SET_REMOVE,
2038 struct GNUNET_SET_ElementMessage,
2040 GNUNET_MQ_hd_fixed_size (client_cancel,
2041 GNUNET_MESSAGE_TYPE_SET_CANCEL,
2042 struct GNUNET_SET_CancelMessage,
2044 GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2045 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2046 struct GNUNET_MessageHeader,
2048 GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2049 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2050 struct GNUNET_SET_CopyLazyConnectMessage,
2052 GNUNET_MQ_handler_end ());
2055 /* end of gnunet-service-set.c */