2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
16 * @file set/gnunet-service-set.c
17 * @brief two-peer set operations
18 * @author Florian Dold
19 * @author Christian Grothoff
21 #include "gnunet-service-set.h"
22 #include "gnunet-service-set_union.h"
23 #include "gnunet-service-set_intersection.h"
24 #include "gnunet-service-set_protocol.h"
25 #include "gnunet_statistics_service.h"
28 * How long do we hold on to an incoming channel if there is
29 * no local listener before giving up?
31 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
35 * Lazy copy requests made by a client.
37 struct LazyCopyRequest
42 struct LazyCopyRequest *prev;
47 struct LazyCopyRequest *next;
50 * Which set are we supposed to copy?
52 struct Set *source_set;
55 * Cookie identifying the request.
63 * A listener is inhabited by a client, and waits for evaluation
64 * requests from remote peers.
69 * Listeners are held in a doubly linked list.
71 struct Listener *next;
74 * Listeners are held in a doubly linked list.
76 struct Listener *prev;
79 * Head of DLL of operations this listener is responsible for.
80 * Once the client has accepted/declined the operation, the
81 * operation is moved to the respective set's operation DLLs.
83 struct Operation *op_head;
86 * Tail of DLL of operations this listener is responsible for.
87 * Once the client has accepted/declined the operation, the
88 * operation is moved to the respective set's operation DLLs.
90 struct Operation *op_tail;
93 * Client that owns the listener.
94 * Only one client may own a listener.
96 struct ClientState *cs;
99 * The port we are listening on with CADET.
101 struct GNUNET_CADET_Port *open_port;
104 * Application ID for the operation, used to distinguish
105 * multiple operations of the same type with the same peer.
107 struct GNUNET_HashCode app_id;
110 * The type of the operation.
112 enum GNUNET_SET_OperationType operation;
117 * Handle to the cadet service, used to listen for and connect to
120 static struct GNUNET_CADET_Handle *cadet;
123 * Head of the DLL of lazy copy requests by this client.
125 static struct LazyCopyRequest *lazy_copy_head;
128 * Tail of the DLL of lazy copy requests by this client.
130 static struct LazyCopyRequest *lazy_copy_tail;
133 * Generator for unique cookie we set per lazy copy request.
135 static uint32_t lazy_copy_cookie;
140 struct GNUNET_STATISTICS_Handle *_GSS_statistics;
143 * Listeners are held in a doubly linked list.
145 static struct Listener *listener_head;
148 * Listeners are held in a doubly linked list.
150 static struct Listener *listener_tail;
153 * Number of active clients.
155 static unsigned int num_clients;
158 * Are we in shutdown? If #GNUNET_YES and the number of clients
159 * drops to zero, disconnect from CADET.
161 static int in_shutdown;
164 * Counter for allocating unique IDs for clients, used to identify
165 * incoming operation requests from remote peers, that the client can
166 * choose to accept or refuse. 0 must not be used (reserved for
169 static uint32_t suggest_id;
173 * Get the incoming socket associated with the given id.
175 * The pending operations of every listener are searched.
176 * @param id id to look for
177 * @return the incoming socket associated with the id,
178 * or NULL if there is none
180 static struct Operation *
181 get_incoming (uint32_t id)
183 for (struct Listener *listener = listener_head;
185 listener = listener->next)
187 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
188 if (op->suggest_id == id)
196 * Destroy an incoming request from a remote peer
198 * @param op remote request to destroy
201 incoming_destroy (struct Operation *op)
203 struct Listener *listener;
204 struct GNUNET_CADET_Channel *channel;
206 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
207 "Destroying incoming operation %p\n",
209 if (NULL != (listener = op->listener))
211 GNUNET_CONTAINER_DLL_remove (listener->op_head,
216 if (NULL != op->timeout_task)
218 GNUNET_SCHEDULER_cancel (op->timeout_task);
219 op->timeout_task = NULL;
221 if (NULL != (channel = op->channel))
224 GNUNET_CADET_channel_destroy (channel);
230 * Context for the #garbage_collect_cb().
232 struct GarbageContext
236 * Map for which we are garbage collecting removed elements.
238 struct GNUNET_CONTAINER_MultiHashMap *map;
241 * Lowest generation for which an operation is still pending.
243 unsigned int min_op_generation;
246 * Largest generation for which an operation is still pending.
248 unsigned int max_op_generation;
254 * Function invoked to check if an element can be removed from
255 * the set's history because it is no longer needed.
257 * @param cls the `struct GarbageContext *`
258 * @param key key of the element in the map
259 * @param value the `struct ElementEntry *`
260 * @return #GNUNET_OK (continue to iterate)
263 garbage_collect_cb (void *cls,
264 const struct GNUNET_HashCode *key,
267 //struct GarbageContext *gc = cls;
268 //struct ElementEntry *ee = value;
270 //if (GNUNET_YES != ee->removed)
272 //if ( (gc->max_op_generation < ee->generation_added) ||
273 // (ee->generation_removed > gc->min_op_generation) )
275 // GNUNET_assert (GNUNET_YES ==
276 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
286 * Collect and destroy elements that are not needed anymore, because
287 * their lifetime (as determined by their generation) does not overlap
288 * with any active set operation.
290 * @param set set to garbage collect
293 collect_generation_garbage (struct Set *set)
295 struct GarbageContext gc;
297 gc.min_op_generation = UINT_MAX;
298 gc.max_op_generation = 0;
299 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
301 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
302 op->generation_created);
303 gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
304 op->generation_created);
306 gc.map = set->content->elements;
307 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
314 * Is @a generation in the range of exclusions?
316 * @param generation generation to query
317 * @param excluded array of generations where the element is excluded
318 * @param excluded_size length of the @a excluded array
319 * @return #GNUNET_YES if @a generation is in any of the ranges
322 is_excluded_generation (unsigned int generation,
323 struct GenerationRange *excluded,
324 unsigned int excluded_size)
326 for (unsigned int i = 0; i < excluded_size; i++)
327 if ( (generation >= excluded[i].start) &&
328 (generation < excluded[i].end) )
335 * Is element @a ee part of the set during @a query_generation?
337 * @param ee element to test
338 * @param query_generation generation to query
339 * @param excluded array of generations where the element is excluded
340 * @param excluded_size length of the @a excluded array
341 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
344 is_element_of_generation (struct ElementEntry *ee,
345 unsigned int query_generation,
346 struct GenerationRange *excluded,
347 unsigned int excluded_size)
349 struct MutationEvent *mut;
352 GNUNET_assert (NULL != ee->mutations);
354 is_excluded_generation (query_generation,
362 is_present = GNUNET_NO;
364 /* Could be made faster with binary search, but lists
365 are small, so why bother. */
366 for (unsigned int i = 0; i < ee->mutations_size; i++)
368 mut = &ee->mutations[i];
370 if (mut->generation > query_generation)
372 /* The mutation doesn't apply to our generation
373 anymore. We can't break here, since mutations aren't
374 sorted by generation. */
379 is_excluded_generation (mut->generation,
383 /* The generation is excluded (because it belongs to another
384 fork via a lazy copy) and thus mutations aren't considered
385 for membership testing. */
389 /* This would be an inconsistency in how we manage mutations. */
390 if ( (GNUNET_YES == is_present) &&
391 (GNUNET_YES == mut->added) )
394 if ( (GNUNET_NO == is_present) &&
395 (GNUNET_NO == mut->added) )
398 is_present = mut->added;
406 * Is element @a ee part of the set used by @a op?
408 * @param ee element to test
409 * @param op operation that defines the set and its generation
410 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
413 _GSS_is_element_of_operation (struct ElementEntry *ee,
414 struct Operation *op)
416 return is_element_of_generation (ee,
417 op->generation_created,
418 op->set->excluded_generations,
419 op->set->excluded_generations_size);
424 * Destroy the given operation. Used for any operation where both
425 * peers were known and that thus actually had a vt and channel. Must
426 * not be used for operations where 'listener' is still set and we do
427 * not know the other peer.
429 * Call the implementation-specific cancel function of the operation.
430 * Disconnects from the remote peer. Does not disconnect the client,
431 * as there may be multiple operations per set.
433 * @param op operation to destroy
434 * @param gc #GNUNET_YES to perform garbage collection on the set
437 _GSS_operation_destroy (struct Operation *op,
440 struct Set *set = op->set;
441 struct GNUNET_CADET_Channel *channel;
443 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
444 "Destroying operation %p\n",
446 GNUNET_assert (NULL == op->listener);
447 if (NULL != op->state)
449 set->vt->cancel (op);
454 GNUNET_CONTAINER_DLL_remove (set->ops_head,
459 if (NULL != op->context_msg)
461 GNUNET_free (op->context_msg);
462 op->context_msg = NULL;
464 if (NULL != (channel = op->channel))
466 /* This will free op; called conditionally as this helper function
467 is also called from within the channel disconnect handler. */
469 GNUNET_CADET_channel_destroy (channel);
471 if ( (NULL != set) &&
473 collect_generation_garbage (set);
474 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
475 * there was a channel end handler that will free 'op' on the call stack. */
480 * Callback called when a client connects to the service.
482 * @param cls closure for the service
483 * @param c the new client that connected to the service
484 * @param mq the message queue used to send messages to the client
485 * @return the newly allocated `struct ClientState`
488 client_connect_cb (void *cls,
489 struct GNUNET_SERVICE_Client *c,
490 struct GNUNET_MQ_Handle *mq)
492 struct ClientState *cs;
495 cs = GNUNET_new (struct ClientState);
503 * Iterator over hash map entries to free element entries.
506 * @param key current key code
507 * @param value a `struct ElementEntry *` to be free'd
508 * @return #GNUNET_YES (continue to iterate)
511 destroy_elements_iterator (void *cls,
512 const struct GNUNET_HashCode *key,
515 struct ElementEntry *ee = value;
517 GNUNET_free_non_null (ee->mutations);
524 * Clean up after a client has disconnected
526 * @param cls closure, unused
527 * @param client the client to clean up after
528 * @param internal_cls the `struct ClientState`
531 client_disconnect_cb (void *cls,
532 struct GNUNET_SERVICE_Client *client,
535 struct ClientState *cs = internal_cls;
536 struct Operation *op;
537 struct Listener *listener;
540 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
541 "Client disconnected, cleaning up\n");
542 if (NULL != (set = cs->set))
544 struct SetContent *content = set->content;
545 struct PendingMutation *pm;
546 struct PendingMutation *pm_current;
547 struct LazyCopyRequest *lcr;
549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550 "Destroying client's set\n");
551 /* Destroy pending set operations */
552 while (NULL != set->ops_head)
553 _GSS_operation_destroy (set->ops_head,
556 /* Destroy operation-specific state */
557 GNUNET_assert (NULL != set->state);
558 set->vt->destroy_set (set->state);
561 /* Clean up ongoing iterations */
562 if (NULL != set->iter)
564 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
569 /* discard any pending mutations that reference this set */
570 pm = content->pending_mutations_head;
575 if (pm_current->set == set)
577 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
578 content->pending_mutations_tail,
580 GNUNET_free (pm_current);
584 /* free set content (or at least decrement RC) */
586 GNUNET_assert (0 != content->refcount);
588 if (0 == content->refcount)
590 GNUNET_assert (NULL != content->elements);
591 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
592 &destroy_elements_iterator,
594 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
595 content->elements = NULL;
596 GNUNET_free (content);
598 GNUNET_free_non_null (set->excluded_generations);
599 set->excluded_generations = NULL;
601 /* remove set from pending copy requests */
602 lcr = lazy_copy_head;
605 struct LazyCopyRequest *lcr_current = lcr;
608 if (lcr_current->source_set == set)
610 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
613 GNUNET_free (lcr_current);
619 if (NULL != (listener = cs->listener))
621 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622 "Destroying client's listener\n");
623 GNUNET_CADET_close_port (listener->open_port);
624 listener->open_port = NULL;
625 while (NULL != (op = listener->op_head))
627 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
628 "Destroying incoming operation `%u' from peer `%s'\n",
629 (unsigned int) op->client_request_id,
630 GNUNET_i2s (&op->peer));
631 incoming_destroy (op);
633 GNUNET_CONTAINER_DLL_remove (listener_head,
636 GNUNET_free (listener);
640 if ( (GNUNET_YES == in_shutdown) &&
645 GNUNET_CADET_disconnect (cadet);
653 * Check a request for a set operation from another peer.
655 * @param cls the operation state
656 * @param msg the received message
657 * @return #GNUNET_OK if the channel should be kept alive,
658 * #GNUNET_SYSERR to destroy the channel
661 check_incoming_msg (void *cls,
662 const struct OperationRequestMessage *msg)
664 struct Operation *op = cls;
665 struct Listener *listener = op->listener;
666 const struct GNUNET_MessageHeader *nested_context;
668 /* double operation request */
669 if (0 != op->suggest_id)
672 return GNUNET_SYSERR;
674 /* This should be equivalent to the previous condition, but can't hurt to check twice */
675 if (NULL == op->listener)
678 return GNUNET_SYSERR;
680 if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
683 return GNUNET_SYSERR;
685 nested_context = GNUNET_MQ_extract_nested_mh (msg);
686 if ( (NULL != nested_context) &&
687 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
690 return GNUNET_SYSERR;
697 * Handle a request for a set operation from another peer. Checks if we
698 * have a listener waiting for such a request (and in that case initiates
699 * asking the listener about accepting the connection). If no listener
700 * is waiting, we queue the operation request in hope that a listener
701 * shows up soon (before timeout).
703 * This msg is expected as the first and only msg handled through the
704 * non-operation bound virtual table, acceptance of this operation replaces
705 * our virtual table and subsequent msgs would be routed differently (as
706 * we then know what type of operation this is).
708 * @param cls the operation state
709 * @param msg the received message
710 * @return #GNUNET_OK if the channel should be kept alive,
711 * #GNUNET_SYSERR to destroy the channel
714 handle_incoming_msg (void *cls,
715 const struct OperationRequestMessage *msg)
717 struct Operation *op = cls;
718 struct Listener *listener = op->listener;
719 const struct GNUNET_MessageHeader *nested_context;
720 struct GNUNET_MQ_Envelope *env;
721 struct GNUNET_SET_RequestMessage *cmsg;
723 nested_context = GNUNET_MQ_extract_nested_mh (msg);
724 /* Make a copy of the nested_context (application-specific context
725 information that is opaque to set) so we can pass it to the
727 if (NULL != nested_context)
728 op->context_msg = GNUNET_copy_message (nested_context);
729 op->remote_element_count = ntohl (msg->element_count);
730 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
731 "Received P2P operation request (op %u, port %s) for active listener\n",
732 (uint32_t) ntohl (msg->operation),
733 GNUNET_h2s (&op->listener->app_id));
734 GNUNET_assert (0 == op->suggest_id);
737 op->suggest_id = suggest_id++;
738 GNUNET_assert (NULL != op->timeout_task);
739 GNUNET_SCHEDULER_cancel (op->timeout_task);
740 op->timeout_task = NULL;
741 env = GNUNET_MQ_msg_nested_mh (cmsg,
742 GNUNET_MESSAGE_TYPE_SET_REQUEST,
744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
749 cmsg->accept_id = htonl (op->suggest_id);
750 cmsg->peer_id = op->peer;
751 GNUNET_MQ_send (listener->cs->mq,
753 /* NOTE: GNUNET_CADET_receive_done() will be called in
754 #handle_client_accept() */
759 * Add an element to @a set as specified by @a msg
761 * @param set set to manipulate
762 * @param msg message specifying the change
765 execute_add (struct Set *set,
766 const struct GNUNET_SET_ElementMessage *msg)
768 struct GNUNET_SET_Element el;
769 struct ElementEntry *ee;
770 struct GNUNET_HashCode hash;
772 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
773 el.size = ntohs (msg->header.size) - sizeof (*msg);
775 el.element_type = ntohs (msg->element_type);
776 GNUNET_SET_element_hash (&el,
778 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783 "Client inserts element %s of size %u\n",
786 ee = GNUNET_malloc (el.size + sizeof (*ee));
787 ee->element.size = el.size;
788 GNUNET_memcpy (&ee[1],
791 ee->element.data = &ee[1];
792 ee->element.element_type = el.element_type;
793 ee->remote = GNUNET_NO;
794 ee->mutations = NULL;
795 ee->mutations_size = 0;
796 ee->element_hash = hash;
797 GNUNET_break (GNUNET_YES ==
798 GNUNET_CONTAINER_multihashmap_put (set->content->elements,
801 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
803 else if (GNUNET_YES ==
804 is_element_of_generation (ee,
805 set->current_generation,
806 set->excluded_generations,
807 set->excluded_generations_size))
809 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810 "Client inserted element %s of size %u twice (ignored)\n",
814 /* same element inserted twice */
819 struct MutationEvent mut = {
820 .generation = set->current_generation,
823 GNUNET_array_append (ee->mutations,
827 set->vt->add (set->state,
833 * Remove an element from @a set as specified by @a msg
835 * @param set set to manipulate
836 * @param msg message specifying the change
839 execute_remove (struct Set *set,
840 const struct GNUNET_SET_ElementMessage *msg)
842 struct GNUNET_SET_Element el;
843 struct ElementEntry *ee;
844 struct GNUNET_HashCode hash;
846 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
847 el.size = ntohs (msg->header.size) - sizeof (*msg);
849 el.element_type = ntohs (msg->element_type);
850 GNUNET_SET_element_hash (&el, &hash);
851 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
855 /* Client tried to remove non-existing element. */
856 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857 "Client removes non-existing element of size %u\n",
862 is_element_of_generation (ee,
863 set->current_generation,
864 set->excluded_generations,
865 set->excluded_generations_size))
867 /* Client tried to remove element twice */
868 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
869 "Client removed element of size %u twice (ignored)\n",
875 struct MutationEvent mut = {
876 .generation = set->current_generation,
880 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881 "Client removes element of size %u\n",
884 GNUNET_array_append (ee->mutations,
888 set->vt->remove (set->state,
894 * Perform a mutation on a set as specified by the @a msg
896 * @param set the set to mutate
897 * @param msg specification of what to change
900 execute_mutation (struct Set *set,
901 const struct GNUNET_SET_ElementMessage *msg)
903 switch (ntohs (msg->header.type))
905 case GNUNET_MESSAGE_TYPE_SET_ADD:
906 execute_add (set, msg);
908 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
909 execute_remove (set, msg);
918 * Execute mutations that were delayed on a set because of
919 * pending operations.
921 * @param set the set to execute mutations on
924 execute_delayed_mutations (struct Set *set)
926 struct PendingMutation *pm;
928 if (0 != set->content->iterator_count)
929 return; /* still cannot do this */
930 while (NULL != (pm = set->content->pending_mutations_head))
932 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
933 set->content->pending_mutations_tail,
935 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
936 "Executing pending mutation on %p.\n",
938 execute_mutation (pm->set,
940 GNUNET_free (pm->msg);
947 * Send the next element of a set to the set's client. The next element is given by
948 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
949 * are no more elements in the set. The caller must ensure that the set's iterator is
952 * The client will acknowledge each received element with a
953 * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
954 * #handle_client_iter_ack() will then trigger the next transmission.
955 * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
957 * @param set set that should send its next element to its client
960 send_client_element (struct Set *set)
963 struct ElementEntry *ee;
964 struct GNUNET_MQ_Envelope *ev;
965 struct GNUNET_SET_IterResponseMessage *msg;
967 GNUNET_assert (NULL != set->iter);
969 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
971 (const void **) &ee);
972 if (GNUNET_NO == ret)
974 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975 "Iteration on %p done.\n",
977 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
978 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
981 GNUNET_assert (set->content->iterator_count > 0);
982 set->content->iterator_count--;
983 execute_delayed_mutations (set);
984 GNUNET_MQ_send (set->cs->mq,
988 GNUNET_assert (NULL != ee);
989 } while (GNUNET_NO ==
990 is_element_of_generation (ee,
991 set->iter_generation,
992 set->excluded_generations,
993 set->excluded_generations_size));
994 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995 "Sending iteration element on %p.\n",
997 ev = GNUNET_MQ_msg_extra (msg,
999 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1000 GNUNET_memcpy (&msg[1],
1003 msg->element_type = htons (ee->element.element_type);
1004 msg->iteration_id = htons (set->iteration_id);
1005 GNUNET_MQ_send (set->cs->mq,
1011 * Called when a client wants to iterate the elements of a set.
1012 * Checks if we have a set associated with the client and if we
1013 * can right now start an iteration. If all checks out, starts
1014 * sending the elements of the set to the client.
1016 * @param cls client that sent the message
1017 * @param m message sent by the client
1020 handle_client_iterate (void *cls,
1021 const struct GNUNET_MessageHeader *m)
1023 struct ClientState *cs = cls;
1026 if (NULL == (set = cs->set))
1028 /* attempt to iterate over a non existing set */
1030 GNUNET_SERVICE_client_drop (cs->client);
1033 if (NULL != set->iter)
1035 /* Only one concurrent iterate-action allowed per set */
1037 GNUNET_SERVICE_client_drop (cs->client);
1040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041 "Iterating set %p in gen %u with %u content elements\n",
1043 set->current_generation,
1044 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1045 GNUNET_SERVICE_client_continue (cs->client);
1046 set->content->iterator_count++;
1047 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1048 set->iter_generation = set->current_generation;
1049 send_client_element (set);
1054 * Called when a client wants to create a new set. This is typically
1055 * the first request from a client, and includes the type of set
1056 * operation to be performed.
1058 * @param cls client that sent the message
1059 * @param msg message sent by the client
1062 handle_client_create_set (void *cls,
1063 const struct GNUNET_SET_CreateMessage *msg)
1065 struct ClientState *cs = cls;
1068 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069 "Client created new set (operation %u)\n",
1070 (uint32_t) ntohl (msg->operation));
1071 if (NULL != cs->set)
1073 /* There can only be one set per client */
1075 GNUNET_SERVICE_client_drop (cs->client);
1078 set = GNUNET_new (struct Set);
1079 switch (ntohl (msg->operation))
1081 case GNUNET_SET_OPERATION_INTERSECTION:
1082 set->vt = _GSS_intersection_vt ();
1084 case GNUNET_SET_OPERATION_UNION:
1085 set->vt = _GSS_union_vt ();
1090 GNUNET_SERVICE_client_drop (cs->client);
1093 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1094 set->state = set->vt->create ();
1095 if (NULL == set->state)
1097 /* initialization failed (i.e. out of memory) */
1099 GNUNET_SERVICE_client_drop (cs->client);
1102 set->content = GNUNET_new (struct SetContent);
1103 set->content->refcount = 1;
1104 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1108 GNUNET_SERVICE_client_continue (cs->client);
1113 * Timeout happens iff:
1114 * - we suggested an operation to our listener,
1115 * but did not receive a response in time
1116 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1118 * @param cls channel context, i.e. the `struct Operation *`
1119 * whose timeout task fired
1122 incoming_timeout_cb (void *cls)
1124 struct Operation *op = cls;
1126 op->timeout_task = NULL;
1127 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1128 "Remote peer's incoming request timed out\n");
1129 incoming_destroy (op);
1134 * Method called whenever another peer has added us to a channel the
1135 * other peer initiated. Only called (once) upon reception of data
1136 * from a channel we listen on.
1138 * The channel context represents the operation itself and gets added
1139 * to a DLL, from where it gets looked up when our local listener
1140 * client responds to a proposed/suggested operation or connects and
1141 * associates with this operation.
1143 * @param cls closure
1144 * @param channel new handle to the channel
1145 * @param source peer that started the channel
1146 * @return initial channel context for the channel
1147 * returns NULL on error
1150 channel_new_cb (void *cls,
1151 struct GNUNET_CADET_Channel *channel,
1152 const struct GNUNET_PeerIdentity *source)
1154 struct Listener *listener = cls;
1155 struct Operation *op;
1157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1158 "New incoming channel\n");
1159 op = GNUNET_new (struct Operation);
1160 op->listener = listener;
1162 op->channel = channel;
1163 op->mq = GNUNET_CADET_get_mq (op->channel);
1164 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1167 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1168 &incoming_timeout_cb,
1170 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1178 * Function called whenever a channel is destroyed. Should clean up
1179 * any associated state. It must NOT call
1180 * GNUNET_CADET_channel_destroy() on the channel.
1182 * The peer_disconnect function is part of a virtual table set initially either
1183 * when a peer creates a new channel with us, or once we create
1184 * a new channel ourselves (evaluate).
1186 * Once we know the exact type of operation (union/intersection), the vt is
1187 * replaced with an operation specific instance (_GSS_[op]_vt).
1189 * @param channel_ctx place where local state associated
1190 * with the channel is stored
1191 * @param channel connection to the other end (henceforth invalid)
1194 channel_end_cb (void *channel_ctx,
1195 const struct GNUNET_CADET_Channel *channel)
1197 struct Operation *op = channel_ctx;
1199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1200 "channel_end_cb called\n");
1202 if (NULL != op->listener)
1203 incoming_destroy (op);
1204 else if (NULL != op->set)
1205 op->set->vt->channel_death (op);
1207 _GSS_operation_destroy (op,
1214 * Function called whenever an MQ-channel's transmission window size changes.
1216 * The first callback in an outgoing channel will be with a non-zero value
1217 * and will mean the channel is connected to the destination.
1219 * For an incoming channel it will be called immediately after the
1220 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1222 * @param cls Channel closure.
1223 * @param channel Connection to the other end (henceforth invalid).
1224 * @param window_size New window size. If there are more messages than buffer size,
1225 * this value will be negative.
1228 channel_window_cb (void *cls,
1229 const struct GNUNET_CADET_Channel *channel,
1232 /* FIXME: not implemented, we could do flow control here... */
1237 * Called when a client wants to create a new listener.
1239 * @param cls client that sent the message
1240 * @param msg message sent by the client
1243 handle_client_listen (void *cls,
1244 const struct GNUNET_SET_ListenMessage *msg)
1246 struct ClientState *cs = cls;
1247 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1248 GNUNET_MQ_hd_var_size (incoming_msg,
1249 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1250 struct OperationRequestMessage,
1252 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1253 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1256 GNUNET_MQ_hd_var_size (union_p2p_elements,
1257 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1258 struct GNUNET_SET_ElementMessage,
1260 GNUNET_MQ_hd_var_size (union_p2p_offer,
1261 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1262 struct GNUNET_MessageHeader,
1264 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1265 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1266 struct InquiryMessage,
1268 GNUNET_MQ_hd_var_size (union_p2p_demand,
1269 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1270 struct GNUNET_MessageHeader,
1272 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1273 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1274 struct GNUNET_MessageHeader,
1276 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1277 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1278 struct GNUNET_MessageHeader,
1280 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1281 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1282 struct GNUNET_MessageHeader,
1284 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1285 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1286 struct GNUNET_MessageHeader,
1288 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1289 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1290 struct StrataEstimatorMessage,
1292 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1293 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1294 struct StrataEstimatorMessage,
1296 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1297 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1298 struct GNUNET_SET_ElementMessage,
1300 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1301 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1302 struct IntersectionElementInfoMessage,
1304 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1305 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1308 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1309 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1310 struct IntersectionDoneMessage,
1312 GNUNET_MQ_handler_end ()
1314 struct Listener *listener;
1316 if (NULL != cs->listener)
1318 /* max. one active listener per client! */
1320 GNUNET_SERVICE_client_drop (cs->client);
1323 listener = GNUNET_new (struct Listener);
1325 cs->listener = listener;
1326 listener->app_id = msg->app_id;
1327 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1328 GNUNET_CONTAINER_DLL_insert (listener_head,
1331 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1332 "New listener created (op %u, port %s)\n",
1333 listener->operation,
1334 GNUNET_h2s (&listener->app_id));
1336 = GNUNET_CADET_open_port (cadet,
1343 GNUNET_SERVICE_client_continue (cs->client);
1348 * Called when the listening client rejects an operation
1349 * request by another peer.
1351 * @param cls client that sent the message
1352 * @param msg message sent by the client
1355 handle_client_reject (void *cls,
1356 const struct GNUNET_SET_RejectMessage *msg)
1358 struct ClientState *cs = cls;
1359 struct Operation *op;
1361 op = get_incoming (ntohl (msg->accept_reject_id));
1364 /* no matching incoming operation for this reject;
1365 could be that the other peer already disconnected... */
1366 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1367 "Client rejected unknown operation %u\n",
1368 (unsigned int) ntohl (msg->accept_reject_id));
1369 GNUNET_SERVICE_client_continue (cs->client);
1372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373 "Peer request (op %u, app %s) rejected by client\n",
1374 op->listener->operation,
1375 GNUNET_h2s (&cs->listener->app_id));
1376 GNUNET_CADET_channel_destroy (op->channel);
1377 GNUNET_SERVICE_client_continue (cs->client);
/* Validation hook paired with handle_client_mutation; the service
   handler table registers client_mutation for both the SET_ADD and
   the SET_REMOVE message types.  The visible body holds only the
   note below and does not inspect the element payload.
   NOTE(review): the return type, braces and return statement are
   not visible in this listing. */
1382 * Called when a client wants to add or remove an element to a set it inhabits.
1384 * @param cls client that sent the message
1385 * @param msg message sent by the client
1388 check_client_mutation (void *cls,
1389 const struct GNUNET_SET_ElementMessage *msg)
1391 /* NOTE: Technically, we should probably check with the
1392 block library whether the element we are given is well-formed */
/* Handler for add/remove element requests.
   - A client that has no set (cs->set is NULL) is dropped.
   - Otherwise the client is told to continue before the mutation
     itself is processed.
   - If set->content->iterator_count is non-zero, the message is
     copied (GNUNET_copy_message) into a freshly allocated
     PendingMutation which is appended to the tail of the pending
     mutation list of the set content.
   - The remaining visible path logs and calls execute_mutation().
   NOTE(review): the return statements that separate these paths and
   several short statements are not visible in this listing. */
1398 * Called when a client wants to add or remove an element to a set it inhabits.
1400 * @param cls client that sent the message
1401 * @param msg message sent by the client
1404 handle_client_mutation (void *cls,
1405 const struct GNUNET_SET_ElementMessage *msg)
1407 struct ClientState *cs = cls;
1410 if (NULL == (set = cs->set))
1412 /* client without a set requested an operation */
1414 GNUNET_SERVICE_client_drop (cs->client);
1417 GNUNET_SERVICE_client_continue (cs->client);
1419 if (0 != set->content->iterator_count)
1421 struct PendingMutation *pm;
1423 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1424 "Scheduling mutation on set\n");
1425 pm = GNUNET_new (struct PendingMutation);
1426 pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1428 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1429 set->content->pending_mutations_tail,
1433 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1434 "Executing mutation on set\n");
1435 execute_mutation (set,
1441 * Advance the current generation of a set,
1442 * adding exclusion ranges if necessary.
1444 * @param set the set where we want to advance the generation
1447 advance_generation (struct Set *set)
1449 struct GenerationRange r;
1451 if (set->current_generation == set->content->latest_generation)
1453 set->content->latest_generation++;
1454 set->current_generation++;
1458 GNUNET_assert (set->current_generation < set->content->latest_generation);
1460 r.start = set->current_generation + 1;
1461 r.end = set->content->latest_generation + 1;
1462 set->content->latest_generation = r.end;
1463 set->current_generation = r.end;
1464 GNUNET_array_append (set->excluded_generations,
1465 set->excluded_generations_size,
/* Validation hook paired with handle_client_evaluate.  The visible
   body contains only the FIXME below, i.e. no field of the message
   is inspected here.
   NOTE(review): the description that follows reads like the one of
   the handler itself (it talks about initiating the CADET
   connection); this function only validates.  The return type,
   braces and return statement are not visible in this listing. */
1471 * Called when a client wants to initiate a set operation with another
1472 * peer. Initiates the CADET connection to the listener and sends the
1475 * @param cls client that sent the message
1476 * @param msg message sent by the client
1477 * @return #GNUNET_OK if the message is well-formed
1480 check_client_evaluate (void *cls,
1481 const struct GNUNET_SET_EvaluateMessage *msg)
1483 /* FIXME: suboptimal, even if the context below could be NULL,
1484 there are malformed messages this does not check for... */
/* Handler for an evaluate request from a client.
   Steps visible in this block:
   1. A new Operation is allocated up front and a CADET handler
      table covering the P2P request, union and intersection message
      types is built.
   2. A client without a set is dropped.
   3. The operation is filled from the message: salt comes from
      GNUNET_CRYPTO_random_u32; result_mode and request_id are
      converted with ntohl; the byzantine and force flags are copied
      as they are; the nested context message is extracted with
      GNUNET_MQ_extract_nested_mh.
   4. generation_created records the current generation of the set
      before advance_generation() is called, and the operation is
      inserted into the operation list of the set.
   5. A CADET channel is created with GNUNET_CADET_OPTION_RELIABLE,
      its message queue is stored in op->mq, and the evaluate
      callback of the set vtable produces op->state.
   6. A NULL op->state leads to the client being dropped; the final
      visible statement lets the client continue.
   NOTE(review): many short lines (closing macro arguments, braces,
   return statements) are not visible in this listing. */
1490 * Called when a client wants to initiate a set operation with another
1491 * peer. Initiates the CADET connection to the listener and sends the
1494 * @param cls client that sent the message
1495 * @param msg message sent by the client
1498 handle_client_evaluate (void *cls,
1499 const struct GNUNET_SET_EvaluateMessage *msg)
1501 struct ClientState *cs = cls;
1502 struct Operation *op = GNUNET_new (struct Operation);
1503 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1504 GNUNET_MQ_hd_var_size (incoming_msg,
1505 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1506 struct OperationRequestMessage,
1508 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1509 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1512 GNUNET_MQ_hd_var_size (union_p2p_elements,
1513 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1514 struct GNUNET_SET_ElementMessage,
1516 GNUNET_MQ_hd_var_size (union_p2p_offer,
1517 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1518 struct GNUNET_MessageHeader,
1520 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1521 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1522 struct InquiryMessage,
1524 GNUNET_MQ_hd_var_size (union_p2p_demand,
1525 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1526 struct GNUNET_MessageHeader,
1528 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1529 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1530 struct GNUNET_MessageHeader,
1532 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1533 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1534 struct GNUNET_MessageHeader,
1536 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1537 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1538 struct GNUNET_MessageHeader,
1540 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1541 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1542 struct GNUNET_MessageHeader,
1544 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1545 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1546 struct StrataEstimatorMessage,
1548 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1549 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1550 struct StrataEstimatorMessage,
1552 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1553 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1554 struct GNUNET_SET_ElementMessage,
1556 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1557 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1558 struct IntersectionElementInfoMessage,
1560 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1561 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1564 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1565 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1566 struct IntersectionDoneMessage,
1568 GNUNET_MQ_handler_end ()
1571 const struct GNUNET_MessageHeader *context;
1573 if (NULL == (set = cs->set))
1577 GNUNET_SERVICE_client_drop (cs->client);
1580 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1582 op->peer = msg->target_peer;
1583 op->result_mode = ntohl (msg->result_mode);
1584 op->client_request_id = ntohl (msg->request_id);
1585 op->byzantine = msg->byzantine;
1586 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1587 op->force_full = msg->force_full;
1588 op->force_delta = msg->force_delta;
1589 context = GNUNET_MQ_extract_nested_mh (msg);
1591 /* Advance generation values, so that
1592 mutations won't interfere with the running operation. */
1594 op->generation_created = set->current_generation;
1595 advance_generation (set);
1596 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1599 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1600 "Creating new CADET channel to port %s for set operation type %u\n",
1601 GNUNET_h2s (&msg->app_id),
1603 op->channel = GNUNET_CADET_channel_create (cadet,
1607 GNUNET_CADET_OPTION_RELIABLE,
1611 op->mq = GNUNET_CADET_get_mq (op->channel);
1612 op->state = set->vt->evaluate (op,
1614 if (NULL == op->state)
1617 GNUNET_SERVICE_client_drop (cs->client);
1620 GNUNET_SERVICE_client_continue (cs->client);
/* Handler for an iteration acknowledgement.
   - A client without a set is dropped.
   - A client whose set has no active iterator (set->iter is NULL)
     is dropped as well, since no ack was expected.
   - Otherwise the client may continue, and ack->send_more
     (converted with ntohl) selects between sending the next element
     via send_client_element() and tearing the iteration down
     (iterator destroyed, set->iteration_id incremented).
   NOTE(review): the else keyword, braces, return statements and
   other short lines are not visible in this listing. */
1625 * Handle an ack from a client, and send the next element. Note
1626 * that we only expect acks for set elements, not after the
1627 * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1629 * @param cls client the client
1630 * @param ack the message
1633 handle_client_iter_ack (void *cls,
1634 const struct GNUNET_SET_IterAckMessage *ack)
1636 struct ClientState *cs = cls;
1639 if (NULL == (set = cs->set))
1641 /* client without a set acknowledged receiving a value */
1643 GNUNET_SERVICE_client_drop (cs->client);
1646 if (NULL == set->iter)
1648 /* client sent an ack, but we were not expecting one (as
1649 set iteration has finished) */
1651 GNUNET_SERVICE_client_drop (cs->client);
1654 GNUNET_SERVICE_client_continue (cs->client);
1655 if (ntohl (ack->send_more))
1657 send_client_element (set);
1661 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1663 set->iteration_id++;
/* First half of the lazy copy handshake.
   - A client without a set is dropped.
   - A LazyCopyRequest is allocated, given a cookie obtained by
     pre-incrementing the global lazy_copy_cookie, pointed at the
     set of the client, and inserted into the lazy copy list.
   - The cookie is returned to the client in a
     GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE message; it is
     written into the response without a byte order conversion.
   - The client is then allowed to continue.
   NOTE(review): the trailing arguments of the list insert and of
   the send call are not visible in this listing. */
1669 * Handle a request from the client to copy a set.
1671 * @param cls the client
1672 * @param mh the message
1675 handle_client_copy_lazy_prepare (void *cls,
1676 const struct GNUNET_MessageHeader *mh)
1678 struct ClientState *cs = cls;
1680 struct LazyCopyRequest *cr;
1681 struct GNUNET_MQ_Envelope *ev;
1682 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1684 if (NULL == (set = cs->set))
1686 /* client without a set requested an operation */
1688 GNUNET_SERVICE_client_drop (cs->client);
1691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1692 "Client requested creation of lazy copy\n");
1693 cr = GNUNET_new (struct LazyCopyRequest);
1694 cr->cookie = ++lazy_copy_cookie;
1695 cr->source_set = set;
1696 GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1699 ev = GNUNET_MQ_msg (resp_msg,
1700 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1701 resp_msg->cookie = cr->cookie;
1702 GNUNET_MQ_send (set->cs->mq,
1704 GNUNET_SERVICE_client_continue (cs->client);
/* Second half of the lazy copy handshake.
   - The client must not own a set yet; otherwise it is dropped.
   - The lazy copy list is searched for a request whose cookie
     equals msg->cookie (compared without byte order conversion,
     matching how the cookie is written into the response by the
     prepare handler).  An unknown cookie drops the client.
   - The matching request is removed from the list and a new Set is
     allocated; its vtable is picked from the operation type of the
     source set (intersection or union).
   - If the vtable has no copy_state callback, the client is dropped.
   - Otherwise the new set takes over operation type, a copy of the
     state, a shared reference to the content (refcount incremented),
     the current generation and a duplicate of the excluded
     generation ranges, and then advances its own generation.
   NOTE(review): short lines (declarations, braces, break and return
   statements, cleanup calls) are not visible in this listing. */
1709 * Handle a request from the client to connect to a copy of a set.
1711 * @param cls the client
1712 * @param msg the message
1715 handle_client_copy_lazy_connect (void *cls,
1716 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1718 struct ClientState *cs = cls;
1719 struct LazyCopyRequest *cr;
1723 if (NULL != cs->set)
1725 /* There can only be one set per client */
1727 GNUNET_SERVICE_client_drop (cs->client);
1731 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1733 if (cr->cookie == msg->cookie)
1739 if (GNUNET_NO == found)
1741 /* client asked for copy with cookie we don't know */
1743 GNUNET_SERVICE_client_drop (cs->client);
1746 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1749 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1750 "Client %p requested use of lazy copy\n",
1752 set = GNUNET_new (struct Set);
1753 switch (cr->source_set->operation)
1755 case GNUNET_SET_OPERATION_INTERSECTION:
1756 set->vt = _GSS_intersection_vt ();
1758 case GNUNET_SET_OPERATION_UNION:
1759 set->vt = _GSS_union_vt ();
1766 if (NULL == set->vt->copy_state)
1768 /* Lazy copy not supported for this set operation */
1772 GNUNET_SERVICE_client_drop (cs->client);
1776 set->operation = cr->source_set->operation;
1777 set->state = set->vt->copy_state (cr->source_set->state);
1778 set->content = cr->source_set->content;
1779 set->content->refcount++;
1781 set->current_generation = cr->source_set->current_generation;
1782 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1783 set->excluded_generations
1784 = GNUNET_memdup (cr->source_set->excluded_generations,
1785 set->excluded_generations_size * sizeof (struct GenerationRange));
1787 /* Advance the generation of the new set, so that mutations to the
1788 cloned set and the source set are independent. */
1789 advance_generation (set);
1793 GNUNET_SERVICE_client_continue (cs->client);
/* Handler for a cancel request.
   - A client without a set is dropped.
   - The operation list of the set is scanned for an entry whose
     client_request_id equals ntohl (msg->request_id).
   - If none is found this is only logged at INFO level (see the
     explanation inside the function); if one is found it is handed
     to _GSS_operation_destroy().
   - The last visible statement lets the client continue.
   NOTE(review): the found flag handling, the else keyword, the
   second argument of _GSS_operation_destroy and the closing of the
   inner comment are not visible in this listing. */
1798 * Handle a request from the client to cancel a running set operation.
1800 * @param cls the client
1801 * @param msg the message
1804 handle_client_cancel (void *cls,
1805 const struct GNUNET_SET_CancelMessage *msg)
1807 struct ClientState *cs = cls;
1809 struct Operation *op;
1812 if (NULL == (set = cs->set))
1814 /* client without a set requested an operation */
1816 GNUNET_SERVICE_client_drop (cs->client);
1820 for (op = set->ops_head; NULL != op; op = op->next)
1822 if (op->client_request_id == ntohl (msg->request_id))
1828 if (GNUNET_NO == found)
1830 /* It may happen that the operation was already destroyed due to
1831 * the other peer disconnecting. The client may not know about this
1832 * yet and try to cancel the (just barely non-existent) operation.
1833 * So this is not a hard error.
1835 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1836 "Client canceled non-existent op %u\n",
1837 (uint32_t) ntohl (msg->request_id));
1841 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1842 "Client requested cancel for op %u\n",
1843 (uint32_t) ntohl (msg->request_id));
1844 _GSS_operation_destroy (op,
1847 GNUNET_SERVICE_client_continue (cs->client);
/* Handler for an accept message from a listening client.
   - A client without a set is dropped.
   - The incoming operation is looked up with
     get_incoming (ntohl (msg->accept_reject_id)).
   - Fallback path (operation no longer active): logged at INFO, a
     GNUNET_MESSAGE_TYPE_SET_RESULT message is sent to the client
     with status GNUNET_SET_STATUS_FAILURE (htons) and with
     msg->request_id copied through unchanged, and the client may
     continue.
   - Main path: the operation is detached from its listener
     (op->listener cleared, removed from the listener list) and
     inserted into the operation list of the set.  Request id and
     result mode are taken from the message via ntohl; the byzantine
     and force flags are copied as they are.  generation_created
     records the current generation before advance_generation().
   - op->state is asserted to be NULL and then produced by the
     accept callback of the set vtable; a NULL result drops the
     client.
   - Finally GNUNET_CADET_receive_done() is called on the channel
     and the client may continue.
   NOTE(review): short lines (the NULL test on op, braces, return
   statements, trailing macro arguments) are not visible in this
   listing. */
1852 * Handle a request from the client to accept a set operation that
1853 * came from a remote peer. We forward the accept to the associated
1854 * operation for handling
1856 * @param cls the client
1857 * @param msg the message
1860 handle_client_accept (void *cls,
1861 const struct GNUNET_SET_AcceptMessage *msg)
1863 struct ClientState *cs = cls;
1865 struct Operation *op;
1866 struct GNUNET_SET_ResultMessage *result_message;
1867 struct GNUNET_MQ_Envelope *ev;
1868 struct Listener *listener;
1870 if (NULL == (set = cs->set))
1872 /* client without a set requested to accept */
1874 GNUNET_SERVICE_client_drop (cs->client);
1877 op = get_incoming (ntohl (msg->accept_reject_id));
1880 /* It is not an error if the set op does not exist -- it may
1881 * have been destroyed when the partner peer disconnected. */
1882 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1883 "Client %p accepted request %u of listener %p that is no longer active\n",
1885 ntohl (msg->accept_reject_id),
1887 ev = GNUNET_MQ_msg (result_message,
1888 GNUNET_MESSAGE_TYPE_SET_RESULT);
1889 result_message->request_id = msg->request_id;
1890 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1891 GNUNET_MQ_send (set->cs->mq,
1893 GNUNET_SERVICE_client_continue (cs->client);
1896 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1897 "Client accepting request %u\n",
1898 (uint32_t) ntohl (msg->accept_reject_id));
1899 listener = op->listener;
1900 op->listener = NULL;
1901 GNUNET_CONTAINER_DLL_remove (listener->op_head,
1905 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1908 op->client_request_id = ntohl (msg->request_id);
1909 op->result_mode = ntohl (msg->result_mode);
1910 op->byzantine = msg->byzantine;
1911 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1912 op->force_full = msg->force_full;
1913 op->force_delta = msg->force_delta;
1915 /* Advance generation values, so that future mutations do not
1916 interfere with the running operation. */
1917 op->generation_created = set->current_generation;
1918 advance_generation (set);
1919 GNUNET_assert (NULL == op->state);
1920 op->state = set->vt->accept (op);
1921 if (NULL == op->state)
1924 GNUNET_SERVICE_client_drop (cs->client);
1927 /* Now allow CADET to continue, as we did not do this in
1928 #handle_incoming_msg (as we wanted to first see if the
1929 local client would accept the request). */
1930 GNUNET_CADET_receive_done (op->channel);
1931 GNUNET_SERVICE_client_continue (cs->client);
/* Shutdown hook registered from the service setup code via
   GNUNET_SCHEDULER_add_shutdown.  It sets in_shutdown to GNUNET_YES
   and tests whether num_clients is zero; the visible cleanup calls
   are GNUNET_CADET_disconnect on the global cadet handle and
   GNUNET_STATISTICS_destroy on _GSS_statistics, followed by a debug
   log line.
   NOTE(review): braces and short statements are not visible in this
   listing, so which of the cleanup calls are guarded by the
   num_clients test cannot be told from here. */
1936 * Called to clean up, after a shutdown has been requested.
1938 * @param cls closure, NULL
1941 shutdown_task (void *cls)
1943 /* Delay actual shutdown to allow service to disconnect clients */
1944 in_shutdown = GNUNET_YES;
1945 if (0 == num_clients)
1949 GNUNET_CADET_disconnect (cadet);
1953 GNUNET_STATISTICS_destroy (_GSS_statistics,
1955 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1956 "handled shutdown request\n");
/* Service setup code.  Visible steps: shutdown_task is registered
   with the scheduler, a statistics handle is created under the
   subsystem name set and stored in _GSS_statistics, and a CADET
   connection is opened from the given configuration and stored in
   the global cadet handle.  An error is logged and a scheduler
   shutdown is requested on the failure path.
   NOTE(review): the function name line and the condition guarding
   the error path are not visible in this listing; presumably the
   error path is taken when the CADET connect returns NULL. */
1961 * Function called by the service's run
1962 * method to run service-specific setup code.
1964 * @param cls closure
1965 * @param cfg configuration to use
1966 * @param service the initialized service
1970 const struct GNUNET_CONFIGURATION_Handle *cfg,
1971 struct GNUNET_SERVICE_Handle *service)
1973 /* FIXME: need to modify SERVICE (!) API to allow
1974 us to run a shutdown task *after* clients were
1975 forcefully disconnected! */
1976 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1978 _GSS_statistics = GNUNET_STATISTICS_create ("set",
1980 cadet = GNUNET_CADET_connect (cfg);
1983 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1984 _("Could not connect to CADET service\n"));
1985 GNUNET_SCHEDULER_shutdown ();
/* Service entry point definition.  The handler list maps client
   message types to the handlers of this file:
     SET_ACCEPT            -> client_accept            (fixed size)
     SET_ITER_ACK          -> client_iter_ack          (fixed size)
     SET_ADD               -> client_mutation          (variable size)
     SET_CREATE            -> client_create_set        (fixed size)
     SET_ITER_REQUEST      -> client_iterate           (fixed size)
     SET_EVALUATE          -> client_evaluate          (variable size)
     SET_LISTEN            -> client_listen            (fixed size)
     SET_REJECT            -> client_reject            (fixed size)
     SET_REMOVE            -> client_mutation          (variable size)
     SET_CANCEL            -> client_cancel            (fixed size)
     SET_COPY_LAZY_PREPARE -> client_copy_lazy_prepare (fixed size)
     SET_COPY_LAZY_CONNECT -> client_copy_lazy_connect (fixed size)
   Add and remove share one handler.  Variable size entries also
   require a matching check_ function.
   NOTE(review): the macro name line, the service name, the run and
   connect callbacks and the closing argument of every handler entry
   are not visible in this listing. */
1992 * Define "main" method using service macro.
1996 GNUNET_SERVICE_OPTION_NONE,
1999 &client_disconnect_cb,
2001 GNUNET_MQ_hd_fixed_size (client_accept,
2002 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
2003 struct GNUNET_SET_AcceptMessage,
2005 GNUNET_MQ_hd_fixed_size (client_iter_ack,
2006 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
2007 struct GNUNET_SET_IterAckMessage,
2009 GNUNET_MQ_hd_var_size (client_mutation,
2010 GNUNET_MESSAGE_TYPE_SET_ADD,
2011 struct GNUNET_SET_ElementMessage,
2013 GNUNET_MQ_hd_fixed_size (client_create_set,
2014 GNUNET_MESSAGE_TYPE_SET_CREATE,
2015 struct GNUNET_SET_CreateMessage,
2017 GNUNET_MQ_hd_fixed_size (client_iterate,
2018 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
2019 struct GNUNET_MessageHeader,
2021 GNUNET_MQ_hd_var_size (client_evaluate,
2022 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
2023 struct GNUNET_SET_EvaluateMessage,
2025 GNUNET_MQ_hd_fixed_size (client_listen,
2026 GNUNET_MESSAGE_TYPE_SET_LISTEN,
2027 struct GNUNET_SET_ListenMessage,
2029 GNUNET_MQ_hd_fixed_size (client_reject,
2030 GNUNET_MESSAGE_TYPE_SET_REJECT,
2031 struct GNUNET_SET_RejectMessage,
2033 GNUNET_MQ_hd_var_size (client_mutation,
2034 GNUNET_MESSAGE_TYPE_SET_REMOVE,
2035 struct GNUNET_SET_ElementMessage,
2037 GNUNET_MQ_hd_fixed_size (client_cancel,
2038 GNUNET_MESSAGE_TYPE_SET_CANCEL,
2039 struct GNUNET_SET_CancelMessage,
2041 GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
2042 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
2043 struct GNUNET_MessageHeader,
2045 GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
2046 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
2047 struct GNUNET_SET_CopyLazyConnectMessage,
2049 GNUNET_MQ_handler_end ());
2052 /* end of gnunet-service-set.c */