*/
struct GNUNET_HashCode app_id;
+ /**
+ * The port we are listening on with CADET.
+ */
+ struct GNUNET_CADET_Port *open_port;
+
/**
* The type of the operation.
*/
GNUNET_MQ_destroy (listener->client_mq);
listener->client_mq = NULL;
}
+ GNUNET_CADET_close_port (listener->open_port);
GNUNET_CONTAINER_DLL_remove (listeners_head,
listeners_tail,
listener);
&gc);
}
-int
+
+static int
is_excluded_generation (unsigned int generation,
struct GenerationRange *excluded,
unsigned int excluded_size)
}
-int
+static int
is_element_of_generation (struct ElementEntry *ee,
unsigned int query_generation,
struct GenerationRange *excluded,
handle_client_disconnect (void *cls,
struct GNUNET_SERVER_Client *client)
{
- struct Set *set;
struct Listener *listener;
+ struct Set *set;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"client disconnected, cleaning up\n");
}
-/**
- * Find a listener that is interested in the given operation type
- * and application id.
- *
- * @param op operation type to look for
- * @param app_id application id to look for
- * @return a matching listener, or NULL if no listener matches the
- * given operation and application id
- */
-static struct Listener *
-listener_get_by_target (enum GNUNET_SET_OperationType op,
- const struct GNUNET_HashCode *app_id)
-{
- struct Listener *listener;
-
- for (listener = listeners_head; NULL != listener; listener = listener->next)
- if ( (listener->operation == op) &&
- (0 == GNUNET_CRYPTO_hash_cmp (app_id, &listener->app_id)) )
- return listener;
- return NULL;
-}
-
-
/**
* Suggest the given request to the listener. The listening client can
* then accept or reject the remote request.
incoming->suggest_id);
cmsg->accept_id = htonl (incoming->suggest_id);
cmsg->peer_id = incoming->spec->peer;
- GNUNET_MQ_send (listener->client_mq, mqm);
+ GNUNET_MQ_send (listener->client_mq,
+ mqm);
}
const struct GNUNET_MessageHeader *mh)
{
const struct OperationRequestMessage *msg;
- struct Listener *listener;
+ struct Listener *listener = op->listener;
struct OperationSpecification *spec;
const struct GNUNET_MessageHeader *nested_context;
if (NULL != nested_context)
spec->context_msg = GNUNET_copy_message (nested_context);
spec->operation = ntohl (msg->operation);
- spec->app_id = msg->app_id;
+ spec->app_id = listener->app_id;
spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
UINT32_MAX);
spec->peer = op->peer;
spec->remote_element_count = ntohl (msg->element_count);
op->spec = spec;
- listener = listener_get_by_target (ntohl (msg->operation),
- &msg->app_id);
- if (NULL == listener)
- {
- GNUNET_break (NULL != op->timeout_task);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No matching listener for incoming request (op %u, app %s), waiting with timeout\n",
- ntohl (msg->operation),
- GNUNET_h2s (&msg->app_id));
- return GNUNET_OK;
- }
+ listener = op->listener;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received P2P operation request (op %u, app %s) for active listener\n",
+ "Received P2P operation request (op %u, port %s) for active listener\n",
ntohl (msg->operation),
- GNUNET_h2s (&msg->app_id));
- incoming_suggest (op, listener);
+ GNUNET_h2s (&listener->app_id));
+ incoming_suggest (op,
+ listener);
return GNUNET_OK;
}
}
+/**
+ * Timeout happens iff:
+ * - we suggested an operation to our listener,
+ * but did not receive a response in time
+ * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
+ *
+ * @param cls channel context
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+incoming_timeout_cb (void *cls)
+{
+ struct Operation *incoming = cls;
+
+ incoming->timeout_task = NULL;
+ GNUNET_assert (GNUNET_YES == incoming->is_incoming);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Remote peer's incoming request timed out\n");
+ incoming_destroy (incoming);
+}
+
+
+/**
+ * Terminates an incoming operation in case we have not yet received an
+ * operation request. Called by the channel destruction handler.
+ *
+ * @param op the channel context
+ */
+static void
+handle_incoming_disconnect (struct Operation *op)
+{
+ GNUNET_assert (GNUNET_YES == op->is_incoming);
+ /* channel is already dead, incoming_destroy must not
+ * destroy it ... */
+ op->channel = NULL;
+ incoming_destroy (op);
+ op->vt = NULL;
+}
+
+
+/**
+ * Method called whenever another peer has added us to a channel the
+ * other peer initiated. Only called (once) upon reception of data
+ * from a channel we listen on.
+ *
+ * The channel context represents the operation itself and gets added
+ * to a DLL, from where it gets looked up when our local listener
+ * client responds to a proposed/suggested operation or connects and
+ * associates with this operation.
+ *
+ * @param cls closure
+ * @param channel new handle to the channel
+ * @param initiator peer that started the channel
+ * @param port Port this channel is for.
+ * @param options Unused.
+ * @return initial channel context for the channel
+ * returns NULL on error
+ */
+static void *
+channel_new_cb (void *cls,
+ struct GNUNET_CADET_Channel *channel,
+ const struct GNUNET_PeerIdentity *initiator,
+ const struct GNUNET_HashCode *port,
+ enum GNUNET_CADET_ChannelOption options)
+{
+ static const struct SetVT incoming_vt = {
+ .msg_handler = &handle_incoming_msg,
+ .peer_disconnect = &handle_incoming_disconnect
+ };
+ struct Listener *listener = cls;
+ struct Operation *incoming;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "New incoming channel\n");
+ incoming = GNUNET_new (struct Operation);
+ incoming->listener = listener;
+ incoming->is_incoming = GNUNET_YES;
+ incoming->peer = *initiator;
+ incoming->channel = channel;
+ incoming->mq = GNUNET_CADET_mq_create (incoming->channel);
+ incoming->vt = &incoming_vt;
+ incoming->timeout_task
+ = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
+ &incoming_timeout_cb,
+ incoming);
+ GNUNET_CONTAINER_DLL_insert_tail (incoming_head,
+ incoming_tail,
+ incoming);
+ // incoming_suggest (incoming,
+ // listener);
+ return incoming;
+}
+
+
/**
* Called when a client wants to create a new listener.
*
GNUNET_CONTAINER_DLL_insert_tail (listeners_head,
listeners_tail,
listener);
+ listener->open_port = GNUNET_CADET_open_port (cadet,
+ &msg->app_id,
+ &channel_new_cb,
+ listener);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "New listener created (op %u, app %s)\n",
+ "New listener created (op %u, port %s)\n",
listener->operation,
GNUNET_h2s (&listener->app_id));
incoming_suggest (op,
listener);
}
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_SERVER_receive_done (client,
+ GNUNET_OK);
}
r);
}
+
/**
* Called when a client wants to initiate a set operation with another
* peer. Initiates the CADET connection to the listener and sends the
GNUNET_CONTAINER_DLL_insert (set->ops_head,
set->ops_tail,
op);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating new CADET channel to port %s\n",
+ GNUNET_h2s (&msg->app_id));
op->channel = GNUNET_CADET_channel_create (cadet,
op,
&msg->target_peer,
- GC_u2h (GNUNET_APPLICATION_TYPE_SET),
+ &msg->app_id,
GNUNET_CADET_OPTION_RELIABLE);
op->mq = GNUNET_CADET_mq_create (op->channel);
set->vt->evaluate (op,
return;
}
- if (NULL == set->vt->copy_state) {
+ if (NULL == set->vt->copy_state)
+ {
/* Lazy copy not supported for this set operation */
GNUNET_break (0);
GNUNET_free (set);
}
-/**
- * Timeout happens iff:
- * - we suggested an operation to our listener,
- * but did not receive a response in time
- * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
- *
- * @param cls channel context
- * @param tc context information (why was this task triggered now)
- */
-static void
-incoming_timeout_cb (void *cls)
-{
- struct Operation *incoming = cls;
-
- incoming->timeout_task = NULL;
- GNUNET_assert (GNUNET_YES == incoming->is_incoming);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Remote peer's incoming request timed out\n");
- incoming_destroy (incoming);
-}
-
-
-/**
- * Terminates an incoming operation in case we have not yet received an
- * operation request. Called by the channel destruction handler.
- *
- * @param op the channel context
- */
-static void
-handle_incoming_disconnect (struct Operation *op)
-{
- GNUNET_assert (GNUNET_YES == op->is_incoming);
- /* channel is already dead, incoming_destroy must not
- * destroy it ... */
- op->channel = NULL;
- incoming_destroy (op);
- op->vt = NULL;
-}
-
-
-/**
- * Method called whenever another peer has added us to a channel the
- * other peer initiated. Only called (once) upon reception of data
- * with a message type which was subscribed to in
- * GNUNET_CADET_connect().
- *
- * The channel context represents the operation itself and gets added to a DLL,
- * from where it gets looked up when our local listener client responds
- * to a proposed/suggested operation or connects and associates with this operation.
- *
- * @param cls closure
- * @param channel new handle to the channel
- * @param initiator peer that started the channel
- * @param port Port this channel is for.
- * @param options Unused.
- * @return initial channel context for the channel
- * returns NULL on error
- */
-static void *
-channel_new_cb (void *cls,
- struct GNUNET_CADET_Channel *channel,
- const struct GNUNET_PeerIdentity *initiator,
- const struct GNUNET_HashCode *port,
- enum GNUNET_CADET_ChannelOption options)
-{
- static const struct SetVT incoming_vt = {
- .msg_handler = &handle_incoming_msg,
- .peer_disconnect = &handle_incoming_disconnect
- };
- struct Operation *incoming;
-
- if (0 != memcmp (GC_u2h (GNUNET_APPLICATION_TYPE_SET), port, sizeof (*port)))
- {
- GNUNET_break (0);
- GNUNET_CADET_channel_destroy (channel);
- return NULL;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "New incoming channel\n");
- incoming = GNUNET_new (struct Operation);
- incoming->is_incoming = GNUNET_YES;
- incoming->peer = *initiator;
- incoming->channel = channel;
- incoming->mq = GNUNET_CADET_mq_create (incoming->channel);
- incoming->vt = &incoming_vt;
- incoming->timeout_task
- = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
- &incoming_timeout_cb,
- incoming);
- GNUNET_CONTAINER_DLL_insert_tail (incoming_head,
- incoming_tail,
- incoming);
- return incoming;
-}
-
-
/**
* Function called whenever a channel is destroyed. Should clean up
* any associated state. It must NOT call
* GNUNET_CADET_channel_destroy() on the channel.
*
* The peer_disconnect function is part of a a virtual table set initially either
- * when a peer creates a new channel with us (#channel_new_cb()), or once we create
+ * when a peer creates a new channel with us, or once we create
* a new channel ourselves (evaluate).
*
* Once we know the exact type of operation (union/intersection), the vt is
* received via a cadet channel.
*
* The msg_handler is a virtual table set in initially either when a peer
- * creates a new channel with us (channel_new_cb), or once we create a new channel
+ * creates a new channel with us, or once we create a new channel
* ourselves (evaluate).
*
* Once we know the exact type of operation (union/intersection), the vt is
configuration = cfg;
GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
GNUNET_SERVER_disconnect_notify (server,
- &handle_client_disconnect, NULL);
+ &handle_client_disconnect,
+ NULL);
GNUNET_SERVER_add_handlers (server,
server_handlers);
_GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
- cadet = GNUNET_CADET_connect (cfg, NULL,
+ cadet = GNUNET_CADET_connect (cfg,
+ NULL,
&channel_end_cb,
cadet_handlers);
- GNUNET_CADET_open_port (cadet,
- GC_u2h (GNUNET_APPLICATION_TYPE_SET),
- &channel_new_cb, NULL);
if (NULL == cadet)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,