+/**
+ * Suggest the given request to the listener. The listening client can
+ * then accept or reject the remote request.
+ *
+ * @param incoming the incoming peer with the request to suggest
+ * @param listener the listener to suggest the request to
+ */
+static void
+incoming_suggest (struct Operation *incoming,
+ struct Listener *listener)
+{
+ struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_SET_RequestMessage *cmsg;
+
+ GNUNET_assert (GNUNET_YES == incoming->is_incoming);
+ GNUNET_assert (NULL != incoming->spec);
+ GNUNET_assert (0 == incoming->suggest_id);
+ incoming->suggest_id = suggest_id++;
+ if (0 == suggest_id)
+ suggest_id++;
+ GNUNET_assert (NULL != incoming->timeout_task);
+ GNUNET_SCHEDULER_cancel (incoming->timeout_task);
+ incoming->timeout_task = NULL;
+ mqm = GNUNET_MQ_msg_nested_mh (cmsg,
+ GNUNET_MESSAGE_TYPE_SET_REQUEST,
+ incoming->spec->context_msg);
+ GNUNET_assert (NULL != mqm);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Suggesting incoming request with accept id %u to listener\n",
+ incoming->suggest_id);
+ cmsg->accept_id = htonl (incoming->suggest_id);
+ cmsg->peer_id = incoming->spec->peer;
+ GNUNET_MQ_send (listener->client_mq,
+ mqm);
+}
+
+
+/**
+ * Handle a request for a set operation from another peer. Checks if we
+ * have a listener waiting for such a request (and in that case initiates
+ * asking the listener about accepting the connection). If no listener
+ * is waiting, we queue the operation request in hope that a listener
+ * shows up soon (before timeout).
+ *
+ * This msg is expected as the first and only msg handled through the
+ * non-operation bound virtual table, acceptance of this operation replaces
+ * our virtual table and subsequent msgs would be routed differently (as
+ * we then know what type of operation this is).
+ *
+ * @param op the operation state
+ * @param mh the received message
+ * @return #GNUNET_OK if the channel should be kept alive,
+ * #GNUNET_SYSERR to destroy the channel
+ */
+static int
+handle_incoming_msg (struct Operation *op,
+ const struct GNUNET_MessageHeader *mh)
+{
+ const struct OperationRequestMessage *msg;
+ struct Listener *listener = op->listener;
+ struct OperationSpecification *spec;
+ const struct GNUNET_MessageHeader *nested_context;
+
+ msg = (const struct OperationRequestMessage *) mh;
+ GNUNET_assert (GNUNET_YES == op->is_incoming);
+ if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ /* double operation request */
+ if (NULL != op->spec)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ spec = GNUNET_new (struct OperationSpecification);
+ nested_context = GNUNET_MQ_extract_nested_mh (msg);
+ if ( (NULL != nested_context) &&
+ (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
+ {
+ GNUNET_break_op (0);
+ GNUNET_free (spec);
+ return GNUNET_SYSERR;
+ }
+ /* Make a copy of the nested_context (application-specific context
+ information that is opaque to set) so we can pass it to the
+ listener later on */
+ if (NULL != nested_context)
+ spec->context_msg = GNUNET_copy_message (nested_context);
+ spec->operation = ntohl (msg->operation);
+ spec->app_id = listener->app_id;
+ spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
+ UINT32_MAX);
+ spec->peer = op->peer;
+ spec->remote_element_count = ntohl (msg->element_count);
+ op->spec = spec;
+
+ listener = op->listener;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received P2P operation request (op %u, port %s) for active listener\n",
+ ntohl (msg->operation),
+ GNUNET_h2s (&listener->app_id));
+ incoming_suggest (op,
+ listener);
+ return GNUNET_OK;
+}
+
+
+static void
+execute_add (struct Set *set,
+ const struct GNUNET_MessageHeader *m)
+{
+ const struct GNUNET_SET_ElementMessage *msg;
+ struct GNUNET_SET_Element el;
+ struct ElementEntry *ee;
+ struct GNUNET_HashCode hash;
+
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type));
+
+ msg = (const struct GNUNET_SET_ElementMessage *) m;
+ el.size = ntohs (m->size) - sizeof *msg;
+ el.data = &msg[1];
+ el.element_type = ntohs (msg->element_type);
+ GNUNET_SET_element_hash (&el, &hash);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client inserts element %s of size %u\n",
+ GNUNET_h2s (&hash),
+ el.size);
+
+ ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
+ &hash);
+
+ if (NULL == ee)
+ {
+ ee = GNUNET_malloc (el.size + sizeof *ee);
+ ee->element.size = el.size;
+ GNUNET_memcpy (&ee[1],
+ el.data,
+ el.size);
+ ee->element.data = &ee[1];
+ ee->element.element_type = el.element_type;
+ ee->remote = GNUNET_NO;
+ ee->mutations = NULL;
+ ee->mutations_size = 0;
+ ee->element_hash = hash;
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_put (set->content->elements,
+ &ee->element_hash,
+ ee,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ }
+ else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
+ {
+ /* same element inserted twice */
+ return;
+ }
+
+ {
+ struct MutationEvent mut = {
+ .generation = set->current_generation,
+ .added = GNUNET_YES
+ };
+ GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
+ }
+
+ set->vt->add (set->state, ee);
+}
+
+
+static void
+execute_remove (struct Set *set,
+ const struct GNUNET_MessageHeader *m)
+{
+ const struct GNUNET_SET_ElementMessage *msg;
+ struct GNUNET_SET_Element el;
+ struct ElementEntry *ee;
+ struct GNUNET_HashCode hash;
+
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type));
+
+ msg = (const struct GNUNET_SET_ElementMessage *) m;
+ el.size = ntohs (m->size) - sizeof *msg;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client removes element of size %u\n",
+ el.size);
+ el.data = &msg[1];
+ el.element_type = ntohs (msg->element_type);
+ GNUNET_SET_element_hash (&el, &hash);
+ ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
+ &hash);
+ if (NULL == ee)
+ {
+ /* Client tried to remove non-existing element. */
+ return;
+ }
+ if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
+ {
+ /* Client tried to remove element twice */
+ return;
+ }
+ else
+ {
+ struct MutationEvent mut = {
+ .generation = set->current_generation,
+ .added = GNUNET_NO
+ };
+ GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
+ }
+ set->vt->remove (set->state, ee);
+}
+
+
+
+static void
+execute_mutation (struct Set *set,
+ const struct GNUNET_MessageHeader *m)
+{
+ switch (ntohs (m->type))
+ {
+ case GNUNET_MESSAGE_TYPE_SET_ADD:
+ execute_add (set, m);
+ break;
+ case GNUNET_MESSAGE_TYPE_SET_REMOVE:
+ execute_remove (set, m);
+ break;
+ default:
+ GNUNET_break (0);
+ }
+}
+
+
+
+/**
+ * Send the next element of a set to the set's client. The next element is given by
+ * the set's current hashmap iterator. The set's iterator will be set to NULL if there
+ * are no more elements in the set. The caller must ensure that the set's iterator is
+ * valid.
+ *
+ * The client will acknowledge each received element with a
+ * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
+ * #handle_client_iter_ack() will then trigger the next transmission.
+ * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
+ *
+ * @param set set that should send its next element to its client
+ */
+static void
+send_client_element (struct Set *set)
+{
+ int ret;
+ struct ElementEntry *ee;
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SET_IterResponseMessage *msg;
+
+ GNUNET_assert (NULL != set->iter);
+
+again:
+
+ ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
+ NULL,
+ (const void **) &ee);
+ if (GNUNET_NO == ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Iteration on %p done.\n",
+ (void *) set);
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
+ set->iter = NULL;
+ set->iteration_id++;
+
+ GNUNET_assert (set->content->iterator_count > 0);
+ set->content->iterator_count -= 1;
+
+ if (0 == set->content->iterator_count)
+ {
+ while (NULL != set->content->pending_mutations_head)
+ {
+ struct PendingMutation *pm;
+
+ pm = set->content->pending_mutations_head;
+ GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
+ set->content->pending_mutations_tail,
+ pm);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Executing pending mutation on %p.\n",
+ (void *) pm->set);
+ execute_mutation (pm->set, pm->mutation_message);
+ GNUNET_free (pm->mutation_message);
+ GNUNET_free (pm);
+ }
+ }
+
+ }
+ else
+ {
+ GNUNET_assert (NULL != ee);
+
+ if (GNUNET_NO == is_element_of_iteration (ee, set))
+ goto again;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending iteration element on %p.\n",
+ (void *) set);
+ ev = GNUNET_MQ_msg_extra (msg,
+ ee->element.size,
+ GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
+ GNUNET_memcpy (&msg[1],
+ ee->element.data,
+ ee->element.size);
+ msg->element_type = htons (ee->element.element_type);
+ msg->iteration_id = htons (set->iteration_id);
+ }
+ GNUNET_MQ_send (set->client_mq, ev);
+}
+
+
+/**
+ * Called when a client wants to iterate the elements of a set.
+ * Checks if we have a set associated with the client and if we
+ * can right now start an iteration. If all checks out, starts
+ * sending the elements of the set to the client.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_iterate (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct Set *set;
+
+ set = set_get (client);
+ if (NULL == set)
+ {
+ /* attempt to iterate over a non existing set */
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+ if (NULL != set->iter)
+ {
+ /* Only one concurrent iterate-action allowed per set */
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Iterating set %p in gen %u with %u content elements\n",
+ (void *) set,
+ set->current_generation,
+ GNUNET_CONTAINER_multihashmap_size (set->content->elements));
+ GNUNET_SERVER_receive_done (client,
+ GNUNET_OK);
+ set->content->iterator_count += 1;
+ set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
+ set->iter_generation = set->current_generation;
+ send_client_element (set);
+}
+
+
+/**
+ * Called when a client wants to create a new set. This is typically
+ * the first request from a client, and includes the type of set
+ * operation to be performed.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_create_set (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ const struct GNUNET_SET_CreateMessage *msg;
+ struct Set *set;
+
+ msg = (const struct GNUNET_SET_CreateMessage *) m;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client created new set (operation %u)\n",
+ ntohl (msg->operation));
+ if (NULL != set_get (client))
+ {
+ /* There can only be one set per client */
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+ set = GNUNET_new (struct Set);
+ switch (ntohl (msg->operation))
+ {
+ case GNUNET_SET_OPERATION_INTERSECTION:
+ set->vt = _GSS_intersection_vt ();
+ break;
+ case GNUNET_SET_OPERATION_UNION:
+ set->vt = _GSS_union_vt ();
+ break;
+ default:
+ GNUNET_free (set);
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+ set->operation = ntohl (msg->operation);
+ set->state = set->vt->create ();
+ if (NULL == set->state)
+ {
+ /* initialization failed (i.e. out of memory) */
+ GNUNET_free (set);
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+ set->content = GNUNET_new (struct SetContent);
+ set->content->refcount = 1;
+ set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ set->client = client;
+ set->client_mq = GNUNET_MQ_queue_for_server_client (client);
+ GNUNET_CONTAINER_DLL_insert (sets_head,
+ sets_tail,
+ set);
+ GNUNET_SERVER_receive_done (client,
+ GNUNET_OK);
+}
+
+
+/**
+ * Timeout happens iff:
+ * - we suggested an operation to our listener,
+ * but did not receive a response in time
+ * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
+ *
+ * @param cls channel context
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+incoming_timeout_cb (void *cls)