+ * Handle a request for a set operation from another peer. Checks if we
+ * have a listener waiting for such a request (and in that case initiates
+ * asking the listener about accepting the connection). If no listener
+ * is waiting, we queue the operation request in hope that a listener
+ * shows up soon (before timeout).
+ *
+ * This msg is expected as the first and only msg handled through the
+ * non-operation bound virtual table, acceptance of this operation replaces
+ * our virtual table and subsequent msgs would be routed differently (as
+ * we then know what type of operation this is).
+ *
+ * @param op the operation state
+ * @param mh the received message
+ * @return #GNUNET_OK if the channel should be kept alive,
+ * #GNUNET_SYSERR to destroy the channel
+ */
+static int
+handle_incoming_msg (struct Operation *op,
+ const struct GNUNET_MessageHeader *mh)
+{
+ const struct OperationRequestMessage *msg;
+ struct Listener *listener = op->listener;
+ struct OperationSpecification *spec;
+ const struct GNUNET_MessageHeader *nested_context;
+
+ msg = (const struct OperationRequestMessage *) mh;
+ GNUNET_assert (GNUNET_YES == op->is_incoming);
+ if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ /* double operation request */
+ if (NULL != op->spec)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ spec = GNUNET_new (struct OperationSpecification);
+ nested_context = GNUNET_MQ_extract_nested_mh (msg);
+ if ( (NULL != nested_context) &&
+ (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
+ {
+ GNUNET_break_op (0);
+ GNUNET_free (spec);
+ return GNUNET_SYSERR;
+ }
+ /* Make a copy of the nested_context (application-specific context
+ information that is opaque to set) so we can pass it to the
+ listener later on */
+ if (NULL != nested_context)
+ spec->context_msg = GNUNET_copy_message (nested_context);
+ spec->operation = ntohl (msg->operation);
+ spec->app_id = listener->app_id;
+ spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
+ UINT32_MAX);
+ spec->peer = op->peer;
+ spec->remote_element_count = ntohl (msg->element_count);
+ op->spec = spec;
+
+ listener = op->listener;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received P2P operation request (op %u, port %s) for active listener\n",
+ ntohl (msg->operation),
+ GNUNET_h2s (&listener->app_id));
+ incoming_suggest (op,
+ listener);
+ return GNUNET_OK;
+}
+
+
+static void
+execute_add (struct Set *set,
+ const struct GNUNET_MessageHeader *m)
+{
+ const struct GNUNET_SET_ElementMessage *msg;
+ struct GNUNET_SET_Element el;
+ struct ElementEntry *ee;
+ struct GNUNET_HashCode hash;
+
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type));
+
+ msg = (const struct GNUNET_SET_ElementMessage *) m;
+ el.size = ntohs (m->size) - sizeof *msg;
+ el.data = &msg[1];
+ el.element_type = ntohs (msg->element_type);
+ GNUNET_SET_element_hash (&el, &hash);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client inserts element %s of size %u\n",
+ GNUNET_h2s (&hash),
+ el.size);
+
+ ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
+ &hash);
+
+ if (NULL == ee)
+ {
+ ee = GNUNET_malloc (el.size + sizeof *ee);
+ ee->element.size = el.size;
+ GNUNET_memcpy (&ee[1],
+ el.data,
+ el.size);
+ ee->element.data = &ee[1];
+ ee->element.element_type = el.element_type;
+ ee->remote = GNUNET_NO;
+ ee->mutations = NULL;
+ ee->mutations_size = 0;
+ ee->element_hash = hash;
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_put (set->content->elements,
+ &ee->element_hash,
+ ee,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ }
+ else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
+ {
+ /* same element inserted twice */
+ return;
+ }
+
+ {
+ struct MutationEvent mut = {
+ .generation = set->current_generation,
+ .added = GNUNET_YES
+ };
+ GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
+ }
+
+ set->vt->add (set->state, ee);
+}
+
+
+static void
+execute_remove (struct Set *set,
+ const struct GNUNET_MessageHeader *m)
+{
+ const struct GNUNET_SET_ElementMessage *msg;
+ struct GNUNET_SET_Element el;
+ struct ElementEntry *ee;
+ struct GNUNET_HashCode hash;
+
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type));
+
+ msg = (const struct GNUNET_SET_ElementMessage *) m;
+ el.size = ntohs (m->size) - sizeof *msg;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client removes element of size %u\n",
+ el.size);
+ el.data = &msg[1];
+ el.element_type = ntohs (msg->element_type);
+ GNUNET_SET_element_hash (&el, &hash);
+ ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
+ &hash);
+ if (NULL == ee)
+ {
+ /* Client tried to remove non-existing element. */
+ return;
+ }
+ if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
+ {
+ /* Client tried to remove element twice */
+ return;
+ }
+ else
+ {
+ struct MutationEvent mut = {
+ .generation = set->current_generation,
+ .added = GNUNET_NO
+ };
+ GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
+ }
+ set->vt->remove (set->state, ee);
+}
+
+
+
+static void
+execute_mutation (struct Set *set,
+ const struct GNUNET_MessageHeader *m)
+{
+ switch (ntohs (m->type))
+ {
+ case GNUNET_MESSAGE_TYPE_SET_ADD:
+ execute_add (set, m);
+ break;
+ case GNUNET_MESSAGE_TYPE_SET_REMOVE:
+ execute_remove (set, m);
+ break;
+ default:
+ GNUNET_break (0);
+ }
+}
+
+
+
+/**
+ * Send the next element of a set to the set's client. The next element is given by
+ * the set's current hashmap iterator. The set's iterator will be set to NULL if there
+ * are no more elements in the set. The caller must ensure that the set's iterator is
+ * valid.
+ *
+ * The client will acknowledge each received element with a
+ * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
+ * #handle_client_iter_ack() will then trigger the next transmission.
+ * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
+ *
+ * @param set set that should send its next element to its client
+ */
+static void
+send_client_element (struct Set *set)
+{
+ int ret;
+ struct ElementEntry *ee;
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SET_IterResponseMessage *msg;
+
+ GNUNET_assert (NULL != set->iter);
+
+again:
+
+ ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
+ NULL,
+ (const void **) &ee);
+ if (GNUNET_NO == ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Iteration on %p done.\n",
+ (void *) set);
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
+ set->iter = NULL;
+ set->iteration_id++;
+
+ GNUNET_assert (set->content->iterator_count > 0);
+ set->content->iterator_count -= 1;
+
+ if (0 == set->content->iterator_count)
+ {
+ while (NULL != set->content->pending_mutations_head)
+ {
+ struct PendingMutation *pm;
+
+ pm = set->content->pending_mutations_head;
+ GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
+ set->content->pending_mutations_tail,
+ pm);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Executing pending mutation on %p.\n",
+ (void *) pm->set);
+ execute_mutation (pm->set, pm->mutation_message);
+ GNUNET_free (pm->mutation_message);
+ GNUNET_free (pm);
+ }
+ }
+
+ }
+ else
+ {
+ GNUNET_assert (NULL != ee);
+
+ if (GNUNET_NO == is_element_of_iteration (ee, set))
+ goto again;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending iteration element on %p.\n",
+ (void *) set);
+ ev = GNUNET_MQ_msg_extra (msg,
+ ee->element.size,
+ GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
+ GNUNET_memcpy (&msg[1],
+ ee->element.data,
+ ee->element.size);
+ msg->element_type = htons (ee->element.element_type);
+ msg->iteration_id = htons (set->iteration_id);
+ }
+ GNUNET_MQ_send (set->client_mq, ev);
+}
+
+
+/**
+ * Called when a client wants to iterate the elements of a set.
+ * Checks if we have a set associated with the client and if we
+ * can right now start an iteration. If all checks out, starts
+ * sending the elements of the set to the client.