* We sent the request message, and expect a strata estimator
*/
PHASE_EXPECT_SE,
+
/**
- * We sent the strata estimator, and expect an IBF. This phase is entered once
+ * We sent the strata estimator, and expect an IBF. This phase is entered once
* upon initialization and later via PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
- *
+ *
* After receiving the complete IBF, we enter PHASE_EXPECT_ELEMENTS
*/
PHASE_EXPECT_IBF,
+
/**
* Continuation for multi part IBFs.
*/
PHASE_EXPECT_IBF_CONT,
+
/**
* We are sending request and elements,
* and thus only expect elements from the other peer.
- *
+ *
* We are currently decoding an IBF until it can no longer be decoded,
* we currently send requests and expect elements
* The remote peer is in PHASE_EXPECT_ELEMENTS_AND_REQUESTS
*/
PHASE_EXPECT_ELEMENTS,
+
/**
* We are expecting elements and requests, and send
* requested elements back to the other peer.
- *
+ *
* We are in this phase if we have SENT an IBF for the remote peer to decode.
* We expect requests, send elements or could receive an new IBF, which takes
* us via PHASE_EXPECT_IBF to phase PHASE_EXPECT_ELEMENTS
- *
+ *
* The remote peer is thus in:
- * PHASE_EXPECT_ELEMENTS
+ * PHASE_EXPECT_ELEMENTS
*/
PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
+
/**
* The protocol is over.
* Results may still have to be sent to the client.
/**
- * State of an evaluate operation
- * with another peer.
+ * State of an evaluate operation with another peer.
*/
struct OperationState
{
- /**
- * Number of ibf buckets received
- */
- unsigned int ibf_buckets_received;
/**
* Copy of the set's strata estimator at the time of
* Did we send the client that we are done?
*/
int client_done_sent;
+
+ /**
+ * Number of ibf buckets received
+ */
+ unsigned int ibf_buckets_received;
+
};
/**
* Destroy the union operation. Only things specific to the union operation are destroyed.
- *
+ *
* @param op union operation to destroy
*/
static void
msg->request_id = htonl (op->spec->client_request_id);
msg->element_type = htons (0);
GNUNET_MQ_send (op->spec->set->client_mq, ev);
- _GSS_operation_destroy (op);
+ _GSS_operation_destroy (op, GNUNET_YES);
}
}
-/**
- * Send a request for the evaluate operation to a remote peer
- *
- * @param op operation with the other peer
- */
-static void
-send_operation_request (struct Operation *op)
-{
- struct GNUNET_MQ_Envelope *ev;
- struct OperationRequestMessage *msg;
-
- ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
- op->spec->context_msg);
-
- if (NULL == ev)
- {
- /* the context message is too large */
- GNUNET_break (0);
- GNUNET_SERVER_client_disconnect (op->spec->set->client);
- return;
- }
- msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
- msg->app_id = op->spec->app_id;
- msg->salt = htonl (op->spec->salt);
- GNUNET_MQ_send (op->mq, ev);
-
- if (NULL != op->spec->context_msg)
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
- else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
-
- if (NULL != op->spec->context_msg)
- {
- GNUNET_free (op->spec->context_msg);
- op->spec->context_msg = NULL;
- }
-}
-
-
/**
* Iterator to create the mapping between ibf keys
* and element entries.
* @param ee the element entry
*/
static void
-op_register_element (struct Operation *op, struct ElementEntry *ee)
+op_register_element (struct Operation *op,
+ struct ElementEntry *ee)
{
int ret;
struct IBF_Key ibf_key;
send_cls.ibf_key = ibf_key;
send_cls.op = op;
- GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val,
- &send_element_iterator, &send_cls);
+ (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
+ (uint32_t) ibf_key.key_val,
+ &send_element_iterator, &send_cls);
}
next_order++;
if (next_order <= MAX_IBF_ORDER)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"decoding failed, sending larger ibf (size %u)\n",
1<<next_order);
send_ibf (op, next_order);
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_MessageHeader *msg;
- /* It may be nice to merge multiple requests, but with mesh's corking it is not worth
+ /* It may be nice to merge multiple requests, but with cadet's corking it is not worth
* the effort additional complexity. */
ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
}
rm->result_status = htons (GNUNET_SET_STATUS_OK);
rm->request_id = htonl (op->spec->client_request_id);
- rm->element_type = element->type;
+ rm->element_type = element->element_type;
memcpy (&rm[1], element->data, element->size);
GNUNET_MQ_send (op->spec->set->client_mq, ev);
}
struct Operation *op = cls;
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *rm;
+ int keep = op->keep;
+
ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
rm->request_id = htonl (op->spec->client_request_id);
rm->result_status = htons (GNUNET_SET_STATUS_DONE);
rm->element_type = htons (0);
GNUNET_MQ_send (op->spec->set->client_mq, ev);
- _GSS_operation_destroy (op);
+ _GSS_operation_destroy (op, GNUNET_YES);
+ if (GNUNET_YES == keep)
+ GNUNET_free (op);
}
}
rm->result_status = htons (GNUNET_SET_STATUS_OK);
rm->request_id = htonl (op->spec->client_request_id);
- rm->element_type = element->type;
+ rm->element_type = element->element_type;
memcpy (&rm[1], element->data, element->size);
if (ke->next_colliding == NULL)
{
if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
{
+ /* prevent that the op is free'd by the tunnel end handler */
+ op->keep = GNUNET_YES;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
- GNUNET_assert (NULL == op->state->full_result_iter);
+ GNUNET_assert (NULL == op->state->full_result_iter);
op->state->full_result_iter =
GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
send_remaining_elements (op);
struct ElementEntry *ee;
uint16_t element_size;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "got element from peer\n");
if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
(op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
return;
}
element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
- ee = GNUNET_malloc (sizeof *ee + element_size);
+ ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
memcpy (&ee[1], &mh[1], element_size);
ee->element.size = element_size;
ee->element.data = &ee[1];
ee->remote = GNUNET_YES;
- GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
+ GNUNET_CRYPTO_hash (ee->element.data,
+ ee->element.size,
+ &ee->element_hash);
if (GNUNET_YES == op_has_element (op, &ee->element_hash))
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "got existing element from peer\n");
GNUNET_free (ee);
return;
}
* @param mh the message
*/
static void
-handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_p2p_element_requests (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
struct IBF_Key *ibf_key;
/**
- * Evaluate a union operation with
- * a remote peer.
+ * Initiate operation to evaluate a set union with a remote peer.
*
- * @param op operation to evaluate
+ * @param op operation to perform (to be initialized)
+ * @param opaque_context message to be transmitted to the listener
+ * to convince him to accept, may be NULL
*/
static void
-union_evaluate (struct Operation *op)
+union_evaluate (struct Operation *op,
+ const struct GNUNET_MessageHeader *opaque_context)
{
+ struct GNUNET_MQ_Envelope *ev;
+ struct OperationRequestMessage *msg;
+
op->state = GNUNET_new (struct OperationState);
- // copy the current generation's strata estimator for this operation
+ /* copy the current generation's strata estimator for this operation */
op->state->se = strata_estimator_dup (op->spec->set->state->se);
/* we started the operation, thus we have to send the operation request */
op->state->phase = PHASE_EXPECT_SE;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation\n");
- send_operation_request (op);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Initiating union operation evaluation\n");
+ ev = GNUNET_MQ_msg_nested_mh (msg,
+ GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
+ opaque_context);
+ if (NULL == ev)
+ {
+ /* the context message is too large */
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (op->spec->set->client);
+ return;
+ }
+ msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
+ msg->app_id = op->spec->app_id;
+ msg->salt = htonl (op->spec->salt);
+ GNUNET_MQ_send (op->mq, ev);
+
+ if (NULL != opaque_context)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "sent op request with context message\n");
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "sent op request without context message\n");
}
/**
* Create a new set supporting the union operation
- *
+ *
* We maintain one strata estimator per set and then manipulate it over the
* lifetime of the set, as recreating a strata estimator would be expensive.
- *
+ *
* @return the newly created set
*/
static struct SetState *
handle_p2p_done (op, mh);
break;
default:
- /* something wrong with mesh's message handlers? */
+ /* something wrong with cadet's message handlers? */
GNUNET_assert (0);
}
return GNUNET_OK;
}
/**
- * handler for peer-disconnects, notifies the client
+ * handler for peer-disconnects, notifies the client
* about the aborted operation in case the op was not concluded
- *
+ *
* @param op the destroyed operation
*/
static void
msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
msg->element_type = htons (0);
GNUNET_MQ_send (op->spec->set->client_mq, ev);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
- _GSS_operation_destroy (op);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "other peer disconnected prematurely\n");
+ _GSS_operation_destroy (op, GNUNET_YES);
return;
}
// else: the session has already been concluded
/**
* Get the table with implementing functions for
* set union.
- *
+ *
* @return the operation specific VTable
*/
const struct SetVT *